From 36bc3da97fcb6aa2df6e43df27552f49b762d61e Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Mon, 4 Apr 2022 15:35:43 -0700 Subject: [PATCH] Fix segfault in FilePrefetchBuffer with async_io enabled (#9777) Summary: If a FilePrefetchBuffer object is destroyed and Poll() later calls the callback on that destroyed object, a segfault occurs when the destroyed object is accessed. It was caught after adding unit tests that test the Posix implementation of the ReadAsync and Poll APIs. This PR also updates and fixes the existing IOUring tests, which were not running locally because the RocksDbIOUringEnable function wasn't defined and IOUring was disabled for those tests. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9777 Test Plan: Added new unit test Reviewed By: anand1976 Differential Revision: D35254002 Pulled By: akankshamahajan15 fbshipit-source-id: 68e80054ffb14ae25c255920ebc6548ca5f130a1 --- HISTORY.md | 1 + env/env_test.cc | 7 +- env/fs_posix.cc | 9 ++- env/io_posix.cc | 53 +++++++------ env/io_posix.h | 5 +- file/file_prefetch_buffer.cc | 38 ++++----- file/file_prefetch_buffer.h | 26 +++++-- file/prefetch_test.cc | 81 ++++++++++++++++++++ table/block_based/block_based_table_reader.h | 8 +- table/block_fetcher.cc | 3 +- 10 files changed, 167 insertions(+), 64 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 48b0c90be..21ec36ba1 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,7 @@ * Fixed a bug that `rocksdb.read.block.compaction.micros` cannot track compaction stats (#9722). * Fixed `file_type`, `relative_filename` and `directory` fields returned by `GetLiveFilesMetaData()`, which were added in inheriting from `FileStorageInfo`. * Fixed a bug affecting `track_and_verify_wals_in_manifest`. Without the fix, application may see "open error: Corruption: Missing WAL with log number" while trying to open the db. The corruption is a false alarm but prevents DB open (#9766). 
+* Fix segfault in FilePrefetchBuffer with async_io as it doesn't wait for pending jobs to complete on destruction. ### New Features * For db_bench when --seed=0 or --seed is not set then it uses the current time as the seed value. Previously it used the value 1000. diff --git a/env/env_test.cc b/env/env_test.cc index e8fdd31bc..1b30f093a 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -85,6 +85,8 @@ struct Deleter { void (*fn_)(void*); }; +extern "C" bool RocksDbIOUringEnable() { return true; } + std::unique_ptr NewAligned(const size_t size, const char ch) { char* ptr = nullptr; #ifdef OS_WIN @@ -1256,7 +1258,7 @@ TEST_P(EnvPosixTestWithParam, MultiRead) { // Random Read Random rnd(301 + attempt); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "UpdateResults:io_uring_result", [&](void* arg) { + "UpdateResults::io_uring_result", [&](void* arg) { if (attempt > 0) { // No failure in the first attempt. size_t& bytes_read = *static_cast(arg); @@ -1326,7 +1328,7 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) { const int num_reads = rnd.Uniform(512) + 1; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "UpdateResults:io_uring_result", [&](void* arg) { + "UpdateResults::io_uring_result", [&](void* arg) { if (attempt > 5) { // Improve partial result rates in second half of the run to // cover the case of repeated partial results. @@ -3308,7 +3310,6 @@ TEST_F(TestAsyncRead, ReadAsync) { } } } - } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/env/fs_posix.cc b/env/fs_posix.cc index cd23b26a9..17095e92c 100644 --- a/env/fs_posix.cc +++ b/env/fs_posix.cc @@ -1045,9 +1045,12 @@ class PosixFileSystem : public FileSystem { // EXPERIMENTAL // - // TODO akankshamahajan: Update Poll API to take into account min_completions + // TODO akankshamahajan: + // 1. 
Update Poll API to take into account min_completions // and returns if number of handles in io_handles (any order) completed is // equal to atleast min_completions. + // 2. Currently in case of direct_io, Read API is called because of which call + // to Poll API fails as it expects IOHandle to be populated. virtual IOStatus Poll(std::vector& io_handles, size_t /*min_completions*/) override { #if defined(ROCKSDB_IOURING_PRESENT) @@ -1094,12 +1097,14 @@ class PosixFileSystem : public FileSystem { req.offset = posix_handle->offset; req.len = posix_handle->len; size_t finished_len = 0; + size_t bytes_read = 0; UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len, - true /*async_read*/, finished_len, &req); + true /*async_read*/, finished_len, &req, bytes_read); posix_handle->is_finished = true; io_uring_cqe_seen(iu, cqe); posix_handle->cb(req, posix_handle->cb_arg); (void)finished_len; + (void)bytes_read; if (static_cast(io_handles[i]) == posix_handle) { break; diff --git a/env/io_posix.cc b/env/io_posix.cc index be85a7c6d..012026cbc 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -744,31 +744,36 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, wrap_cache.erase(wrap_check); FSReadRequest* req = req_wrap->req; + size_t bytes_read = 0; UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len, - false /*async_read*/, req_wrap->finished_len, req); + false /*async_read*/, req_wrap->finished_len, req, + bytes_read); int32_t res = cqe->res; - if (res == 0) { - /// cqe->res == 0 can means EOF, or can mean partial results. See - // comment - // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435 - // Fall back to pread in this case. - if (use_direct_io() && !IsSectorAligned(req_wrap->finished_len, - GetRequiredBufferAlignment())) { - // Bytes reads don't fill sectors. Should only happen at the end - // of the file. 
- req->result = Slice(req->scratch, req_wrap->finished_len); - req->status = IOStatus::OK(); - } else { - Slice tmp_slice; - req->status = - Read(req->offset + req_wrap->finished_len, - req->len - req_wrap->finished_len, options, &tmp_slice, - req->scratch + req_wrap->finished_len, dbg); - req->result = - Slice(req->scratch, req_wrap->finished_len + tmp_slice.size()); + if (res >= 0) { + if (bytes_read == 0) { + /// cqe->res == 0 can means EOF, or can mean partial results. See + // comment + // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435 + // Fall back to pread in this case. + if (use_direct_io() && + !IsSectorAligned(req_wrap->finished_len, + GetRequiredBufferAlignment())) { + // Bytes reads don't fill sectors. Should only happen at the end + // of the file. + req->result = Slice(req->scratch, req_wrap->finished_len); + req->status = IOStatus::OK(); + } else { + Slice tmp_slice; + req->status = + Read(req->offset + req_wrap->finished_len, + req->len - req_wrap->finished_len, options, &tmp_slice, + req->scratch + req_wrap->finished_len, dbg); + req->result = + Slice(req->scratch, req_wrap->finished_len + tmp_slice.size()); + } + } else if (bytes_read < req_wrap->iov.iov_len) { + incomplete_rq_list.push_back(req_wrap); } - } else if (res > 0 && res < static_cast(req_wrap->iov.iov_len)) { - incomplete_rq_list.push_back(req_wrap); } io_uring_cqe_seen(iu, cqe); } @@ -896,8 +901,8 @@ IOStatus PosixRandomAccessFile::ReadAsync( // Initialize Posix_IOHandle. 
posix_handle->iu = iu; - posix_handle->iov.iov_base = posix_handle->scratch; - posix_handle->iov.iov_len = posix_handle->len; + posix_handle->iov.iov_base = req.scratch; + posix_handle->iov.iov_len = req.len; posix_handle->cb = cb; posix_handle->cb_arg = cb_arg; posix_handle->offset = req.offset; diff --git a/env/io_posix.h b/env/io_posix.h index 65c74758c..0ff787a70 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -66,12 +66,13 @@ struct Posix_IOHandle { inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name, size_t len, size_t iov_len, bool async_read, - size_t& finished_len, FSReadRequest* req) { + size_t& finished_len, FSReadRequest* req, + size_t& bytes_read) { if (cqe->res < 0) { req->result = Slice(req->scratch, 0); req->status = IOError("Req failed", file_name, cqe->res); } else { - size_t bytes_read = static_cast(cqe->res); + bytes_read = static_cast(cqe->res); TEST_SYNC_POINT_CALLBACK("UpdateResults::io_uring_result", &bytes_read); if (bytes_read == iov_len) { req->result = Slice(req->scratch, req->len); diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index cef339196..ef349c2b8 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -111,13 +111,6 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, Env::IOPriority rate_limiter_priority, uint64_t read_len, uint64_t chunk_len, uint64_t rounddown_start, uint32_t index) { - // Reset io_handle. - if (io_handle_ != nullptr && del_fn_ != nullptr) { - del_fn_(io_handle_); - io_handle_ = nullptr; - del_fn_ = nullptr; - } - // callback for async read request. 
auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this, std::placeholders::_1, std::placeholders::_2); @@ -129,6 +122,7 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, req.scratch = bufs_[index].buffer_.BufferStart() + chunk_len; Status s = reader->ReadAsync(req, opts, fp, nullptr /*cb_arg*/, &io_handle_, &del_fn_, rate_limiter_priority); + req.status.PermitUncheckedError(); if (s.ok()) { async_read_in_progress_ = true; } @@ -221,24 +215,31 @@ void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset, // the asynchronous prefetching in second buffer. Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader, - FileSystem* fs, uint64_t offset, - size_t length, size_t readahead_size, + uint64_t offset, size_t length, + size_t readahead_size, Env::IOPriority rate_limiter_priority, bool& copy_to_third_buffer) { if (!enable_) { return Status::OK(); } - if (async_read_in_progress_ && fs != nullptr) { + if (async_read_in_progress_ && fs_ != nullptr) { // Wait for prefetch data to complete. // No mutex is needed as PrefetchAsyncCallback updates the result in second // buffer and FilePrefetchBuffer should wait for Poll before accessing the // second buffer. std::vector handles; handles.emplace_back(io_handle_); - fs->Poll(handles, 1).PermitUncheckedError(); + fs_->Poll(handles, 1).PermitUncheckedError(); + } + // Release io_handle_ after the Poll API as request has been completed. + if (io_handle_ != nullptr && del_fn_ != nullptr) { + del_fn_(io_handle_); + io_handle_ = nullptr; + del_fn_ = nullptr; } - // TODO akanksha: Update TEST_SYNC_POINT after new tests are added. + // TODO akanksha: Update TEST_SYNC_POINT after Async APIs are merged with + // normal prefetching. 
TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start"); Status s; size_t prefetch_size = length + readahead_size; @@ -438,8 +439,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, bool FilePrefetchBuffer::TryReadFromCacheAsync( const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, Slice* result, Status* status, - Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */, - FileSystem* fs) { + Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */ +) { if (track_min_offset_ && offset < min_offset_read_) { min_offset_read_ = static_cast(offset); } @@ -474,7 +475,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync( if (implicit_auto_readahead_ && async_io_) { // Prefetch n + readahead_size_/2 synchronously as remaining // readahead_size_/2 will be prefetched asynchronously. - s = PrefetchAsync(opts, reader, fs, offset, n, readahead_size_ / 2, + s = PrefetchAsync(opts, reader, offset, n, readahead_size_ / 2, rate_limiter_priority, copy_to_third_buffer); } else { s = Prefetch(opts, reader, offset, n + readahead_size_, @@ -527,12 +528,5 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, size_t current_size = bufs_[index].buffer_.CurrentSize(); bufs_[index].buffer_.Size(current_size + req.result.size()); } - - // Release io_handle_. 
- if (io_handle_ != nullptr && del_fn_ != nullptr) { - del_fn_(io_handle_); - io_handle_ = nullptr; - del_fn_ = nullptr; - } } } // namespace ROCKSDB_NAMESPACE diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index c4713782c..8f2f09c73 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -65,7 +65,7 @@ class FilePrefetchBuffer { FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0, bool enable = true, bool track_min_offset = false, bool implicit_auto_readahead = false, - bool async_io = false) + bool async_io = false, FileSystem* fs = nullptr) : curr_(0), readahead_size_(readahead_size), max_readahead_size_(max_readahead_size), @@ -79,13 +79,29 @@ class FilePrefetchBuffer { io_handle_(nullptr), del_fn_(nullptr), async_read_in_progress_(false), - async_io_(async_io) { + async_io_(async_io), + fs_(fs) { // If async_io_ is enabled, data is asynchronously filled in second buffer // while curr_ is being consumed. If data is overlapping in two buffers, // data is copied to third buffer to return continuous buffer. bufs_.resize(3); } + ~FilePrefetchBuffer() { + // Wait for any pending async job before destroying the class object. + if (async_read_in_progress_ && fs_ != nullptr) { + std::vector handles; + handles.emplace_back(io_handle_); + fs_->Poll(handles, 1).PermitUncheckedError(); + } + // Release io_handle_. + if (io_handle_ != nullptr && del_fn_ != nullptr) { + del_fn_(io_handle_); + io_handle_ = nullptr; + del_fn_ = nullptr; + } + } + // Load data into the buffer from a file. // reader : the file reader. // offset : the file offset to start reading from. 
@@ -100,8 +116,7 @@ class FilePrefetchBuffer { Env::IOPriority rate_limiter_priority); Status PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader, - FileSystem* fs, uint64_t offset, size_t length, - size_t readahead_size, + uint64_t offset, size_t length, size_t readahead_size, Env::IOPriority rate_limiter_priority, bool& copy_to_third_buffer); @@ -129,7 +144,7 @@ class FilePrefetchBuffer { RandomAccessFileReader* reader, uint64_t offset, size_t n, Slice* result, Status* status, Env::IOPriority rate_limiter_priority, - bool for_compaction /* = false */, FileSystem* fs); + bool for_compaction /* = false */); // The minimum `offset` ever passed to TryReadFromCache(). This will nly be // tracked if track_min_offset = true. @@ -256,5 +271,6 @@ class FilePrefetchBuffer { IOHandleDeleter del_fn_; bool async_read_in_progress_; bool async_io_; + FileSystem* fs_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index e13fc5d8d..9319b9973 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -1013,6 +1013,87 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { Close(); } +extern "C" bool RocksDbIOUringEnable() { return true; } + +// Tests the default implementation of ReadAsync API with PosixFileSystem. 
+TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + + const int kNumKeys = 1000; + std::shared_ptr fs = std::make_shared( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = false; + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + int total_keys = 0; + // Write the keys. + { + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + } + + int buff_prefetch_count = 0; + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Read the keys. 
+ { + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; + + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + num_keys++; + } + ASSERT_EQ(num_keys, total_keys); + ASSERT_GT(buff_prefetch_count, 0); + } + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + Close(); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index e9fea1d08..9d056f73e 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -655,10 +655,10 @@ struct BlockBasedTable::Rep { std::unique_ptr* fpb, bool implicit_auto_readahead, bool async_io) const { - fpb->reset(new FilePrefetchBuffer(readahead_size, max_readahead_size, - !ioptions.allow_mmap_reads /* enable */, - false /* track_min_offset */, - implicit_auto_readahead, async_io)); + fpb->reset(new FilePrefetchBuffer( + readahead_size, max_readahead_size, + !ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */, + implicit_auto_readahead, async_io, ioptions.fs.get())); } void CreateFilePrefetchBufferIfNotExists( diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 889ec8115..110c4bfe8 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -75,8 +75,7 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() { if (read_options_.async_io) { read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCacheAsync( opts, file_, handle_.offset(), block_size_with_trailer_, &slice_, - &io_s, read_options_.rate_limiter_priority, for_compaction_, - ioptions_.fs.get()); + &io_s, read_options_.rate_limiter_priority, for_compaction_); } else { read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache( opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,