diff --git a/HISTORY.md b/HISTORY.md index 48b0c90be..21ec36ba1 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,7 @@ * Fixed a bug that `rocksdb.read.block.compaction.micros` cannot track compaction stats (#9722). * Fixed `file_type`, `relative_filename` and `directory` fields returned by `GetLiveFilesMetaData()`, which were added in inheriting from `FileStorageInfo`. * Fixed a bug affecting `track_and_verify_wals_in_manifest`. Without the fix, application may see "open error: Corruption: Missing WAL with log number" while trying to open the db. The corruption is a false alarm but prevents DB open (#9766). +* Fixed a segfault in FilePrefetchBuffer with async_io enabled, caused by not waiting for pending async read jobs to complete on destruction. ### New Features * For db_bench when --seed=0 or --seed is not set then it uses the current time as the seed value. Previously it used the value 1000. diff --git a/env/env_test.cc b/env/env_test.cc index e8fdd31bc..1b30f093a 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -85,6 +85,8 @@ struct Deleter { void (*fn_)(void*); }; +extern "C" bool RocksDbIOUringEnable() { return true; } + std::unique_ptr NewAligned(const size_t size, const char ch) { char* ptr = nullptr; #ifdef OS_WIN @@ -1256,7 +1258,7 @@ TEST_P(EnvPosixTestWithParam, MultiRead) { // Random Read Random rnd(301 + attempt); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "UpdateResults:io_uring_result", [&](void* arg) { + "UpdateResults::io_uring_result", [&](void* arg) { if (attempt > 0) { // No failure in the first attempt. size_t& bytes_read = *static_cast(arg); @@ -1326,7 +1328,7 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) { const int num_reads = rnd.Uniform(512) + 1; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "UpdateResults:io_uring_result", [&](void* arg) { + "UpdateResults::io_uring_result", [&](void* arg) { if (attempt > 5) { // Improve partial result rates in second half of the run to // cover the case of repeated partial results. 
@@ -3308,7 +3310,6 @@ TEST_F(TestAsyncRead, ReadAsync) { } } } - } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/env/fs_posix.cc b/env/fs_posix.cc index cd23b26a9..17095e92c 100644 --- a/env/fs_posix.cc +++ b/env/fs_posix.cc @@ -1045,9 +1045,12 @@ class PosixFileSystem : public FileSystem { // EXPERIMENTAL // - // TODO akankshamahajan: Update Poll API to take into account min_completions + // TODO akankshamahajan: + // 1. Update Poll API to take into account min_completions // and returns if number of handles in io_handles (any order) completed is // equal to atleast min_completions. + // 2. Currently in case of direct_io, Read API is called because of which call + // to Poll API fails as it expects IOHandle to be populated. virtual IOStatus Poll(std::vector& io_handles, size_t /*min_completions*/) override { #if defined(ROCKSDB_IOURING_PRESENT) @@ -1094,12 +1097,14 @@ class PosixFileSystem : public FileSystem { req.offset = posix_handle->offset; req.len = posix_handle->len; size_t finished_len = 0; + size_t bytes_read = 0; UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len, - true /*async_read*/, finished_len, &req); + true /*async_read*/, finished_len, &req, bytes_read); posix_handle->is_finished = true; io_uring_cqe_seen(iu, cqe); posix_handle->cb(req, posix_handle->cb_arg); (void)finished_len; + (void)bytes_read; if (static_cast(io_handles[i]) == posix_handle) { break; diff --git a/env/io_posix.cc b/env/io_posix.cc index be85a7c6d..012026cbc 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -744,31 +744,36 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, wrap_cache.erase(wrap_check); FSReadRequest* req = req_wrap->req; + size_t bytes_read = 0; UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len, - false /*async_read*/, req_wrap->finished_len, req); + false /*async_read*/, req_wrap->finished_len, req, + bytes_read); int32_t res = cqe->res; - if (res == 0) { - /// cqe->res == 0 can means EOF, or 
can mean partial results. See - // comment - // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435 - // Fall back to pread in this case. - if (use_direct_io() && !IsSectorAligned(req_wrap->finished_len, - GetRequiredBufferAlignment())) { - // Bytes reads don't fill sectors. Should only happen at the end - // of the file. - req->result = Slice(req->scratch, req_wrap->finished_len); - req->status = IOStatus::OK(); - } else { - Slice tmp_slice; - req->status = - Read(req->offset + req_wrap->finished_len, - req->len - req_wrap->finished_len, options, &tmp_slice, - req->scratch + req_wrap->finished_len, dbg); - req->result = - Slice(req->scratch, req_wrap->finished_len + tmp_slice.size()); + if (res >= 0) { + if (bytes_read == 0) { + /// cqe->res == 0 can means EOF, or can mean partial results. See + // comment + // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435 + // Fall back to pread in this case. + if (use_direct_io() && + !IsSectorAligned(req_wrap->finished_len, + GetRequiredBufferAlignment())) { + // Bytes reads don't fill sectors. Should only happen at the end + // of the file. + req->result = Slice(req->scratch, req_wrap->finished_len); + req->status = IOStatus::OK(); + } else { + Slice tmp_slice; + req->status = + Read(req->offset + req_wrap->finished_len, + req->len - req_wrap->finished_len, options, &tmp_slice, + req->scratch + req_wrap->finished_len, dbg); + req->result = + Slice(req->scratch, req_wrap->finished_len + tmp_slice.size()); + } + } else if (bytes_read < req_wrap->iov.iov_len) { + incomplete_rq_list.push_back(req_wrap); } - } else if (res > 0 && res < static_cast(req_wrap->iov.iov_len)) { - incomplete_rq_list.push_back(req_wrap); } io_uring_cqe_seen(iu, cqe); } @@ -896,8 +901,8 @@ IOStatus PosixRandomAccessFile::ReadAsync( // Initialize Posix_IOHandle. 
posix_handle->iu = iu; - posix_handle->iov.iov_base = posix_handle->scratch; - posix_handle->iov.iov_len = posix_handle->len; + posix_handle->iov.iov_base = req.scratch; + posix_handle->iov.iov_len = req.len; posix_handle->cb = cb; posix_handle->cb_arg = cb_arg; posix_handle->offset = req.offset; diff --git a/env/io_posix.h b/env/io_posix.h index 65c74758c..0ff787a70 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -66,12 +66,13 @@ struct Posix_IOHandle { inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name, size_t len, size_t iov_len, bool async_read, - size_t& finished_len, FSReadRequest* req) { + size_t& finished_len, FSReadRequest* req, + size_t& bytes_read) { if (cqe->res < 0) { req->result = Slice(req->scratch, 0); req->status = IOError("Req failed", file_name, cqe->res); } else { - size_t bytes_read = static_cast(cqe->res); + bytes_read = static_cast(cqe->res); TEST_SYNC_POINT_CALLBACK("UpdateResults::io_uring_result", &bytes_read); if (bytes_read == iov_len) { req->result = Slice(req->scratch, req->len); diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index cef339196..ef349c2b8 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -111,13 +111,6 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, Env::IOPriority rate_limiter_priority, uint64_t read_len, uint64_t chunk_len, uint64_t rounddown_start, uint32_t index) { - // Reset io_handle. - if (io_handle_ != nullptr && del_fn_ != nullptr) { - del_fn_(io_handle_); - io_handle_ = nullptr; - del_fn_ = nullptr; - } - // callback for async read request. 
auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this, std::placeholders::_1, std::placeholders::_2); @@ -129,6 +122,7 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, req.scratch = bufs_[index].buffer_.BufferStart() + chunk_len; Status s = reader->ReadAsync(req, opts, fp, nullptr /*cb_arg*/, &io_handle_, &del_fn_, rate_limiter_priority); + req.status.PermitUncheckedError(); if (s.ok()) { async_read_in_progress_ = true; } @@ -221,24 +215,31 @@ void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset, // the asynchronous prefetching in second buffer. Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader, - FileSystem* fs, uint64_t offset, - size_t length, size_t readahead_size, + uint64_t offset, size_t length, + size_t readahead_size, Env::IOPriority rate_limiter_priority, bool& copy_to_third_buffer) { if (!enable_) { return Status::OK(); } - if (async_read_in_progress_ && fs != nullptr) { + if (async_read_in_progress_ && fs_ != nullptr) { // Wait for prefetch data to complete. // No mutex is needed as PrefetchAsyncCallback updates the result in second // buffer and FilePrefetchBuffer should wait for Poll before accessing the // second buffer. std::vector handles; handles.emplace_back(io_handle_); - fs->Poll(handles, 1).PermitUncheckedError(); + fs_->Poll(handles, 1).PermitUncheckedError(); + } + // Release io_handle_ after the Poll API as request has been completed. + if (io_handle_ != nullptr && del_fn_ != nullptr) { + del_fn_(io_handle_); + io_handle_ = nullptr; + del_fn_ = nullptr; } - // TODO akanksha: Update TEST_SYNC_POINT after new tests are added. + // TODO akanksha: Update TEST_SYNC_POINT after Async APIs are merged with + // normal prefetching. 
TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start"); Status s; size_t prefetch_size = length + readahead_size; @@ -438,8 +439,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, bool FilePrefetchBuffer::TryReadFromCacheAsync( const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, Slice* result, Status* status, - Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */, - FileSystem* fs) { + Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */ +) { if (track_min_offset_ && offset < min_offset_read_) { min_offset_read_ = static_cast(offset); } @@ -474,7 +475,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync( if (implicit_auto_readahead_ && async_io_) { // Prefetch n + readahead_size_/2 synchronously as remaining // readahead_size_/2 will be prefetched asynchronously. - s = PrefetchAsync(opts, reader, fs, offset, n, readahead_size_ / 2, + s = PrefetchAsync(opts, reader, offset, n, readahead_size_ / 2, rate_limiter_priority, copy_to_third_buffer); } else { s = Prefetch(opts, reader, offset, n + readahead_size_, @@ -527,12 +528,5 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, size_t current_size = bufs_[index].buffer_.CurrentSize(); bufs_[index].buffer_.Size(current_size + req.result.size()); } - - // Release io_handle_. 
- if (io_handle_ != nullptr && del_fn_ != nullptr) { - del_fn_(io_handle_); - io_handle_ = nullptr; - del_fn_ = nullptr; - } } } // namespace ROCKSDB_NAMESPACE diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index c4713782c..8f2f09c73 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -65,7 +65,7 @@ class FilePrefetchBuffer { FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0, bool enable = true, bool track_min_offset = false, bool implicit_auto_readahead = false, - bool async_io = false) + bool async_io = false, FileSystem* fs = nullptr) : curr_(0), readahead_size_(readahead_size), max_readahead_size_(max_readahead_size), @@ -79,13 +79,29 @@ class FilePrefetchBuffer { io_handle_(nullptr), del_fn_(nullptr), async_read_in_progress_(false), - async_io_(async_io) { + async_io_(async_io), + fs_(fs) { // If async_io_ is enabled, data is asynchronously filled in second buffer // while curr_ is being consumed. If data is overlapping in two buffers, // data is copied to third buffer to return continuous buffer. bufs_.resize(3); } + ~FilePrefetchBuffer() { + // Wait for any pending async job before destroying the class object. + if (async_read_in_progress_ && fs_ != nullptr) { + std::vector handles; + handles.emplace_back(io_handle_); + fs_->Poll(handles, 1).PermitUncheckedError(); + } + // Release io_handle_. + if (io_handle_ != nullptr && del_fn_ != nullptr) { + del_fn_(io_handle_); + io_handle_ = nullptr; + del_fn_ = nullptr; + } + } + // Load data into the buffer from a file. // reader : the file reader. // offset : the file offset to start reading from. 
@@ -100,8 +116,7 @@ class FilePrefetchBuffer { Env::IOPriority rate_limiter_priority); Status PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader, - FileSystem* fs, uint64_t offset, size_t length, - size_t readahead_size, + uint64_t offset, size_t length, size_t readahead_size, Env::IOPriority rate_limiter_priority, bool& copy_to_third_buffer); @@ -129,7 +144,7 @@ class FilePrefetchBuffer { RandomAccessFileReader* reader, uint64_t offset, size_t n, Slice* result, Status* status, Env::IOPriority rate_limiter_priority, - bool for_compaction /* = false */, FileSystem* fs); + bool for_compaction /* = false */); // The minimum `offset` ever passed to TryReadFromCache(). This will nly be // tracked if track_min_offset = true. @@ -256,5 +271,6 @@ class FilePrefetchBuffer { IOHandleDeleter del_fn_; bool async_read_in_progress_; bool async_io_; + FileSystem* fs_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index e13fc5d8d..9319b9973 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -1013,6 +1013,87 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { Close(); } +extern "C" bool RocksDbIOUringEnable() { return true; } + +// Tests the default implementation of ReadAsync API with PosixFileSystem. 
+TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + + const int kNumKeys = 1000; + std::shared_ptr fs = std::make_shared( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = false; + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + int total_keys = 0; + // Write the keys. + { + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + } + + int buff_prefetch_count = 0; + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Read the keys. 
+ { + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; + + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + num_keys++; + } + ASSERT_EQ(num_keys, total_keys); + ASSERT_GT(buff_prefetch_count, 0); + } + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + Close(); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index e9fea1d08..9d056f73e 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -655,10 +655,10 @@ struct BlockBasedTable::Rep { std::unique_ptr* fpb, bool implicit_auto_readahead, bool async_io) const { - fpb->reset(new FilePrefetchBuffer(readahead_size, max_readahead_size, - !ioptions.allow_mmap_reads /* enable */, - false /* track_min_offset */, - implicit_auto_readahead, async_io)); + fpb->reset(new FilePrefetchBuffer( + readahead_size, max_readahead_size, + !ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */, + implicit_auto_readahead, async_io, ioptions.fs.get())); } void CreateFilePrefetchBufferIfNotExists( diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 889ec8115..110c4bfe8 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -75,8 +75,7 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() { if (read_options_.async_io) { read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCacheAsync( opts, file_, handle_.offset(), block_size_with_trailer_, &slice_, - &io_s, read_options_.rate_limiter_priority, for_compaction_, - ioptions_.fs.get()); + &io_s, read_options_.rate_limiter_priority, for_compaction_); } else { read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache( opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,