From a479c2c2b2ea6ed141c11fca7130db72665f91b8 Mon Sep 17 00:00:00 2001
From: Akanksha Mahajan <43301668+akankshamahajan15@users.noreply.github.com>
Date: Mon, 23 May 2022 12:15:26 -0700
Subject: [PATCH] Fix stress test failure "Corruption: checksum mismatch" or
 "Iterator Diverged" with async_io enabled (#10032)

Summary:
In case of non-sequential reads with `async_io`, `FilePrefetchBuffer::TryReadFromCacheAsync` can be called for previous blocks with `offset < bufs_[curr_].offset_`, which wasn't handled correctly, resulting in wrong data being returned from the buffer.

Since `FilePrefetchBuffer::PrefetchAsync` can be called for any data block, it sets `prev_len_` to 0 to signal `FilePrefetchBuffer::TryReadFromCacheAsync` to go ahead with the prefetch even though `offset < bufs_[curr_].offset_`. This is because async prefetching is always done in the second buffer (to avoid a mutex), even when `curr_` is empty, which leads to `offset < bufs_[curr_].offset_` in some cases.
If `prev_len_` is non-zero, `TryReadFromCacheAsync` now returns false when `offset < bufs_[curr_].offset_ && prev_len_ != 0`, indicating that the reads are not sequential and the previous call wasn't `PrefetchAsync`. A sketch of this guard follows.
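To illustrate the intended semantics, here is a stripped-down, self-contained model of the new guard. `PrefetchModel`, `BufferInfo`, and the hard-coded sizes are hypothetical stand-ins, not the actual `FilePrefetchBuffer` API:

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>

// Minimal stand-in for one prefetch buffer's bookkeeping.
struct BufferInfo {
  uint64_t offset = 0;
  size_t size = 0;  // plays the role of buffer_.CurrentSize()
};

struct PrefetchModel {
  BufferInfo bufs[2];
  uint32_t curr = 0;
  size_t prev_len = 0;  // 0 => previous call was PrefetchAsync

  // Async prefetch always lands in the second buffer, so curr_ may stay
  // behind the requested offset; prev_len = 0 tells the follow-up
  // TryReadFromCacheAsync to proceed anyway.
  void PrefetchAsync(uint64_t offset) {
    bufs[curr ^ 1] = {offset, /*size=*/4096};
    prev_len = 0;
  }

  // The fixed guard: bail out only when the read is non-sequential AND the
  // previous call wasn't PrefetchAsync (prev_len != 0), instead of serving
  // stale bytes from an older buffer.
  bool TryReadFromCacheAsync(uint64_t offset, size_t n) {
    if (bufs[curr].size > 0 && offset < bufs[curr].offset &&
        prev_len != 0) {
      return false;
    }
    prev_len = n;
    return true;  // prefetch/copy-out path elided in this model
  }
};

int main() {
  PrefetchModel m;
  m.bufs[m.curr] = {/*offset=*/8192, /*size=*/4096};
  m.prev_len = 4096;  // previous call was a plain sequential read
  std::cout << m.TryReadFromCacheAsync(0, 4096) << "\n";  // 0: rejected
  m.PrefetchAsync(0);  // explicit async prefetch for the earlier block
  std::cout << m.TryReadFromCacheAsync(0, 4096) << "\n";  // 1: allowed
}
```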
- This PR also simplifies `FilePrefetchBuffer::TryReadFromCacheAsync`, which had become complicated by covering different scenarios depending on whether `async_io` is enabled or disabled. If `for_compaction` is true, the caller now invokes `FilePrefetchBuffer::TryReadFromCache`, following the synchronous flow as before. That decision is made in BlockFetcher.cc, as sketched below.
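A rough sketch of that caller-side dispatch, assuming toy stubs in place of the real methods (which additionally take IOOptions, a RandomAccessFileReader, a Slice, and a Status):

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdio>

// Hypothetical stubs for the two prefetch-buffer entry points.
bool TryReadFromCache(uint64_t offset, size_t n) {
  std::printf("sync path: offset=%llu n=%zu\n",
              static_cast<unsigned long long>(offset), n);
  return true;
}
bool TryReadFromCacheAsync(uint64_t offset, size_t n) {
  std::printf("async path: offset=%llu n=%zu\n",
              static_cast<unsigned long long>(offset), n);
  return true;
}

// Mirrors the dispatch in BlockFetcher::TryGetFromPrefetchBuffer after this
// patch: compaction reads always take the synchronous path, which is why
// TryReadFromCacheAsync no longer needs a for_compaction parameter.
bool TryGetFromPrefetchBuffer(bool async_io, bool for_compaction,
                              uint64_t offset, size_t n) {
  if (async_io && !for_compaction) {
    return TryReadFromCacheAsync(offset, n);
  }
  return TryReadFromCache(offset, n);
}

int main() {
  TryGetFromPrefetchBuffer(/*async_io=*/true, /*for_compaction=*/false, 0, 4096);
  TryGetFromPrefetchBuffer(/*async_io=*/true, /*for_compaction=*/true, 0, 4096);
}
```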
Pull Request resolved: https://github.com/facebook/rocksdb/pull/10032

Test Plan:
1. export CRASH_TEST_EXT_ARGS=" --async_io=1"; make crash_test -j
   completed successfully locally
2. make crash_test -j completed successfully locally
3. Reran the CircleCI mini crashtest job 4-5 times.
4. Updated prefetch_test for more coverage.

Reviewed By: anand1976

Differential Revision: D36579858

Pulled By: akankshamahajan15

fbshipit-source-id: 0c428d62b45e12e082a83acf533a5e37a584bedf
---
 file/file_prefetch_buffer.cc | 51 +++++++++++++++-------------------
 file/file_prefetch_buffer.h  |  4 +--
 file/prefetch_test.cc        | 53 +++++++++++++++++++++++++++++++++++-
 table/block_fetcher.cc       | 31 ++++++++++++---------
 4 files changed, 94 insertions(+), 45 deletions(-)

diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc
index f75261781..135cdd2e1 100644
--- a/file/file_prefetch_buffer.cc
+++ b/file/file_prefetch_buffer.cc
@@ -447,18 +447,23 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
 bool FilePrefetchBuffer::TryReadFromCacheAsync(
     const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
     size_t n, Slice* result, Status* status,
-    Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */
-) {
+    Env::IOPriority rate_limiter_priority) {
+  assert(async_io_);
+
   if (track_min_offset_ && offset < min_offset_read_) {
     min_offset_read_ = static_cast<size_t>(offset);
   }
+
+  if (!enable_) {
+    return false;
+  }
+
   // In case of async_io_, offset can be less than bufs_[curr_].offset_ because
   // of reads not sequential and PrefetchAsync can be called for any block and
   // RocksDB will call TryReadFromCacheAsync after PrefetchAsync to Poll for
-  // requested bytes. IsEligibleForPrefetch API will return false in case reads
-  // are not sequential and Non sequential reads will be handled there.
-  if (!enable_ || (offset < bufs_[curr_].offset_ && async_io_ == false)) {
+  // requested bytes.
+  if (bufs_[curr_].buffer_.CurrentSize() > 0 && offset < bufs_[curr_].offset_ &&
+      prev_len_ != 0) {
     return false;
   }
 
@@ -476,31 +481,19 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
   Status s;
   assert(reader != nullptr);
   assert(max_readahead_size_ >= readahead_size_);
-  if (for_compaction) {
-    s = Prefetch(opts, reader, offset, std::max(n, readahead_size_),
-                 rate_limiter_priority);
-  } else {
-    if (implicit_auto_readahead_) {
-      if (!IsEligibleForPrefetch(offset, n)) {
-        // Ignore status as Prefetch is not called.
-        s.PermitUncheckedError();
-        return false;
-      }
-    }
-    // async prefetching is enabled if it's implicit_auto_readahead_ or
-    // explicit readahead_size_ is passed along with ReadOptions.async_io =
-    // true.
-    if (async_io_) {
-      // Prefetch n + readahead_size_/2 synchronously as remaining
-      // readahead_size_/2 will be prefetched asynchronously.
-      s = PrefetchAsyncInternal(opts, reader, offset, n,
-                                readahead_size_ / 2, rate_limiter_priority,
-                                copy_to_third_buffer);
-    } else {
-      s = Prefetch(opts, reader, offset, n + readahead_size_,
-                   rate_limiter_priority);
+
+  if (implicit_auto_readahead_) {
+    if (!IsEligibleForPrefetch(offset, n)) {
+      // Ignore status as Prefetch is not called.
+      s.PermitUncheckedError();
+      return false;
     }
   }
+
+  // Prefetch n + readahead_size_/2 synchronously as remaining
+  // readahead_size_/2 will be prefetched asynchronously.
+  s = PrefetchAsyncInternal(opts, reader, offset, n, readahead_size_ / 2,
+                            rate_limiter_priority, copy_to_third_buffer);
   if (!s.ok()) {
     if (status) {
       *status = s;
@@ -574,7 +567,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
   // Index of second buffer.
   uint32_t second = curr_ ^ 1;
 
-  // Since PrefetchAsync can be called on non sequqential reads. So offset can
+  // Since PrefetchAsync can be called on non sequential reads. So offset can
   // be less than buffers' offset. In that case it clears the buffer and
   // prefetch that block.
   if (bufs_[curr_].buffer_.CurrentSize() > 0 && offset < bufs_[curr_].offset_) {
diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h
index 2347479a8..6d32c8314 100644
--- a/file/file_prefetch_buffer.h
+++ b/file/file_prefetch_buffer.h
@@ -89,6 +89,7 @@ class FilePrefetchBuffer {
     // while curr_ is being consumed. If data is overlapping in two buffers,
     // data is copied to third buffer to return continuous buffer.
     bufs_.resize(3);
+    (void)async_io_;
   }
 
   ~FilePrefetchBuffer() {
@@ -170,8 +171,7 @@ class FilePrefetchBuffer {
   bool TryReadFromCacheAsync(const IOOptions& opts,
                              RandomAccessFileReader* reader, uint64_t offset,
                              size_t n, Slice* result, Status* status,
-                             Env::IOPriority rate_limiter_priority,
-                             bool for_compaction);
+                             Env::IOPriority rate_limiter_priority);
 
   // The minimum `offset` ever passed to TryReadFromCache(). This will only be
   // tracked if track_min_offset = true.
diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc
index a9a0dd353..47ddb431a 100644
--- a/file/prefetch_test.cc
+++ b/file/prefetch_test.cc
@@ -1252,7 +1252,6 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
     } else {
       ASSERT_GT(async_read_bytes.count, 0);
       ASSERT_GT(get_perf_context()->number_async_seek, 0);
-      //"ASSERT_EQ(expected_hits, get_perf_context()->bloom_sst_hit_count);")
     }
   }
 
@@ -1349,12 +1348,15 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
   // Read the keys.
   {
     ASSERT_OK(options.statistics->Reset());
+    get_perf_context()->Reset();
+
     auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
     int num_keys = 0;
     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
       ASSERT_OK(iter->status());
       num_keys++;
     }
+
     ASSERT_EQ(num_keys, total_keys);
     ASSERT_GT(buff_prefetch_count, 0);
@@ -1375,6 +1377,55 @@
       }
       ASSERT_GT(prefetched_bytes_discarded.count, 0);
     }
+    ASSERT_EQ(get_perf_context()->number_async_seek, 0);
+  }
+
+  {
+    // Read the keys using seek.
+    {
+      ASSERT_OK(options.statistics->Reset());
+      get_perf_context()->Reset();
+
+      auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
+      int num_keys = 0;
+      iter->Seek(BuildKey(450));
+      while (iter->Valid()) {
+        ASSERT_OK(iter->status());
+        num_keys++;
+        iter->Next();
+      }
+      ASSERT_OK(iter->status());
+
+      iter->Seek(BuildKey(450));
+      while (iter->Valid()) {
+        ASSERT_OK(iter->status());
+        num_keys++;
+        iter->Prev();
+      }
+
+      ASSERT_EQ(num_keys, total_keys + 1);
+      ASSERT_GT(buff_prefetch_count, 0);
+
+      // Check stats to make sure async prefetch is done.
+      {
+        HistogramData async_read_bytes;
+        options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
+        HistogramData prefetched_bytes_discarded;
+        options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED,
+                                          &prefetched_bytes_discarded);
+
+        // Not all platforms support iouring. In that case, ReadAsync in posix
+        // won't submit async requests.
+        if (read_async_called) {
+          ASSERT_GT(async_read_bytes.count, 0);
+          ASSERT_GT(get_perf_context()->number_async_seek, 0);
+        } else {
+          ASSERT_EQ(async_read_bytes.count, 0);
+          ASSERT_EQ(get_perf_context()->number_async_seek, 0);
+        }
+        ASSERT_GT(prefetched_bytes_discarded.count, 0);
+      }
+    }
   }
 
   SyncPoint::GetInstance()->DisableProcessing();
diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc
index 974075b69..5047e4781 100644
--- a/table/block_fetcher.cc
+++ b/table/block_fetcher.cc
@@ -9,6 +9,7 @@
 
 #include "table/block_fetcher.h"
 
+#include <cassert>
 #include <cinttypes>
 #include <string>
 
@@ -72,10 +73,10 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
     IOStatus io_s = file_->PrepareIOOptions(read_options_, opts);
     if (io_s.ok()) {
       bool read_from_prefetch_buffer = false;
-      if (read_options_.async_io) {
+      if (read_options_.async_io && !for_compaction_) {
         read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCacheAsync(
             opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
-            &io_s, read_options_.rate_limiter_priority, for_compaction_);
+            &io_s, read_options_.rate_limiter_priority);
       } else {
         read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache(
             opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
@@ -349,20 +350,20 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() {
 #endif  // NDEBUG
     return IOStatus::OK();
   } else if (!TryGetCompressedBlockFromPersistentCache()) {
-    if (prefetch_buffer_ != nullptr) {
+    assert(prefetch_buffer_ != nullptr);
+    if (!for_compaction_) {
       IOOptions opts;
       IOStatus io_s = file_->PrepareIOOptions(read_options_, opts);
+      if (!io_s.ok()) {
+        return io_s;
+      }
+      io_s = status_to_io_status(prefetch_buffer_->PrefetchAsync(
+          opts, file_, handle_.offset(), block_size_with_trailer_,
+          read_options_.rate_limiter_priority, &slice_));
+      if (io_s.IsTryAgain()) {
+        return io_s;
+      }
       if (io_s.ok()) {
-        io_s = status_to_io_status(prefetch_buffer_->PrefetchAsync(
-            opts, file_, handle_.offset(), block_size_with_trailer_,
-            read_options_.rate_limiter_priority, &slice_));
-        if (io_s.IsTryAgain()) {
-          return io_s;
-        }
-        if (!io_s.ok()) {
-          // Fallback to sequential reading of data blocks.
-          return ReadBlockContents();
-        }
         // Data Block is already in prefetch.
         got_from_prefetch_buffer_ = true;
         ProcessTrailerIfPresent();
@@ -388,8 +389,12 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() {
           GetBlockContents();
         }
         InsertUncompressedBlockToPersistentCacheIfNeeded();
+        return io_status_;
       }
     }
+    // Fallback to sequential reading of data blocks in case io_s returns an
+    // error or for_compaction_ is true.
+    return ReadBlockContents();
   }
   return io_status_;
 }
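For reference, the restructured ReadAsyncBlockContents control flow reduces to the shape below. This is a simplified, self-contained sketch with a toy status type and hypothetical stand-in functions, not the actual BlockFetcher code: TryAgain propagates to the caller (which will Poll the async request and retry), and a single fallback point at the end handles both async-read errors and compaction reads.

```cpp
#include <iostream>

// Toy status type standing in for rocksdb::IOStatus.
struct IOStatus {
  enum Code { kOk, kTryAgain, kError };
  Code code = kOk;
  bool ok() const { return code == kOk; }
  bool IsTryAgain() const { return code == kTryAgain; }
};

// Hypothetical stand-ins for the real prefetch and synchronous-read calls.
IOStatus PrefetchAsync(bool platform_supports_async) {
  return {platform_supports_async ? IOStatus::kTryAgain : IOStatus::kError};
}
IOStatus ReadBlockContents() {
  std::cout << "fallback: synchronous block read\n";
  return {};
}

IOStatus ReadAsyncBlockContents(bool for_compaction, bool supports_async) {
  if (!for_compaction) {
    IOStatus io_s = PrefetchAsync(supports_async);
    if (io_s.IsTryAgain()) {
      return io_s;  // caller will Poll for the async request and retry
    }
    if (io_s.ok()) {
      std::cout << "block served from prefetch buffer\n";
      return io_s;
    }
  }
  // Fallback to sequential reading when PrefetchAsync failed or this is a
  // compaction read.
  return ReadBlockContents();
}

int main() {
  ReadAsyncBlockContents(/*for_compaction=*/true, /*supports_async=*/true);
  ReadAsyncBlockContents(/*for_compaction=*/false, /*supports_async=*/false);
  ReadAsyncBlockContents(/*for_compaction=*/false, /*supports_async=*/true);
}
```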