From 8353ae8b279f54b7329a3d6d7ee060d9fa657270 Mon Sep 17 00:00:00 2001
From: Akanksha Mahajan
Date: Wed, 15 Jun 2022 20:17:35 -0700
Subject: [PATCH] Add few optimizations in async_io for short scans (#10140)

Summary:
This PR adds a few optimizations to async_io for shorter scans.

1. If async_io is enabled, a seek creates a FilePrefetchBuffer object to fetch
   the data asynchronously. However, `FilePrefetchBuffer::num_file_reads_` was
   not taken into consideration when Next was called after Seek, so Next would
   always go for prefetching. This PR fixes that: Next now prefetches only if
   `FilePrefetchBuffer::num_file_reads_` is greater than 2 and the blocks being
   read are sequential. This applies only to implicit auto readahead.
2. For a seek that calls TryReadFromCacheAsync to poll, an additional async
   prefetch call was also issued because the TryReadFromCacheAsync flow wasn't
   changed. It has been updated to return right after the poll instead of
   prefetching any further data (see the sketch below).
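A simplified, self-contained sketch of the decision logic described above.
This is not the actual FilePrefetchBuffer API: PrefetchDecision, ShouldPrefetch
and kMinReadsBeforeAutoReadahead are illustrative names, the threshold of 2
mirrors kMinNumFileReadsToStartAutoReadahead, and the "poll" path from point 2
is reduced to a single flag check.

// Standalone model of the gating above; compiles on its own and does not
// depend on RocksDB.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iostream>

struct PrefetchDecision {
  // Implicit auto readahead starts only after enough sequential reads.
  static constexpr uint64_t kMinReadsBeforeAutoReadahead = 2;

  uint64_t num_file_reads = 0;
  uint64_t prev_offset = 0;
  size_t prev_len = 0;
  // Set when a Seek already submitted an async request that only needs
  // to be polled.
  bool async_request_submitted = false;

  bool IsBlockSequential(uint64_t offset) const {
    return prev_len == 0 || (prev_offset + prev_len == offset);
  }

  // Returns true if a prefetch should be issued for the read at
  // [offset, offset + n).
  bool ShouldPrefetch(uint64_t offset, size_t n) {
    if (!IsBlockSequential(offset)) {
      // Non-sequential read: record the pattern and skip prefetching.
      prev_offset = offset;
      prev_len = n;
      return false;
    }
    num_file_reads++;
    if (async_request_submitted) {
      // This call only polls data submitted by a previous Seek, so the
      // num_file_reads check is skipped.
      return true;
    }
    if (num_file_reads <= kMinReadsBeforeAutoReadahead) {
      prev_offset = offset;
      prev_len = n;
      return false;
    }
    return true;
  }
};

int main() {
  PrefetchDecision d;
  // Two sequential reads are not enough to start implicit readahead.
  assert(!d.ShouldPrefetch(0, 4096));
  assert(!d.ShouldPrefetch(4096, 4096));
  // The third sequential read crosses the threshold and prefetches.
  assert(d.ShouldPrefetch(8192, 4096));
  std::cout << "prefetch gating behaves as described" << std::endl;
  return 0;
}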
Pull Request resolved: https://github.com/facebook/rocksdb/pull/10140

Test Plan:
1. Added a unit test.
2. Ran crash_test with async_io = 1 to make sure nothing crashes.

A minimal sketch of the read path these tests exercise follows.
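For context only (not part of the change): roughly how a reader opts into the
asynchronous seek/prefetch path that the unit test and crash_test cover. The
database path is a placeholder and error handling is reduced to asserts.

#include <cassert>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/prefetch_demo", &db);
  assert(s.ok());

  rocksdb::ReadOptions ro;
  ro.adaptive_readahead = true;  // implicit auto readahead, as in the tests
  ro.async_io = true;            // exercises the async seek/prefetch path

  {
    std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      // Short sequential scans are where the new gating avoids issuing
      // unnecessary prefetch requests.
    }
    assert(it->status().ok());
  }

  delete db;
  return 0;
}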
Reviewed By: anand1976

Differential Revision: D37042242

Pulled By: akankshamahajan15

fbshipit-source-id: b8e6b7cb2ee0886f37a8f53951948b9084e8ffda
---
 file/file_prefetch_buffer.cc                    |  17 ++-
 file/file_prefetch_buffer.h                     |  28 ++++-
 file/prefetch_test.cc                           | 107 ++++++++++++++++--
 .../block_based/block_based_table_iterator.cc   |   2 +-
 table/block_based/block_based_table_reader.h    |  10 +-
 table/block_based/block_prefetcher.cc           |  26 +++--
 table/block_based/block_prefetcher.h            |   2 +-
 table/block_based/partitioned_filter_block.cc   |   2 +-
 .../block_based/partitioned_index_iterator.cc   |   8 +-
 table/block_based/partitioned_index_reader.cc   |   2 +-
 10 files changed, 162 insertions(+), 42 deletions(-)

diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc
index 135cdd2e1..9e97845c3 100644
--- a/file/file_prefetch_buffer.cc
+++ b/file/file_prefetch_buffer.cc
@@ -282,8 +282,17 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(
       bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
     offset += length;
     length = 0;
+
+    // Since async request was submitted directly by calling PrefetchAsync in
+    // last call, we don't need to prefetch further as this call is to poll the
+    // data submitted in previous call.
+    if (async_request_submitted_) {
+      return Status::OK();
+    }
   }

+  async_request_submitted_ = false;
+
   Status s;
   size_t prefetch_size = length + readahead_size;
   size_t alignment = reader->file()->GetRequiredBufferAlignment();
@@ -442,14 +451,10 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
   return true;
 }

-// TODO akanksha: Merge this function with TryReadFromCache once async
-// functionality is stable.
 bool FilePrefetchBuffer::TryReadFromCacheAsync(
     const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
     size_t n, Slice* result, Status* status,
     Env::IOPriority rate_limiter_priority) {
-  assert(async_io_);
-
   if (track_min_offset_ && offset < min_offset_read_) {
     min_offset_read_ = static_cast<size_t>(offset);
   }
@@ -503,7 +508,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
 #endif
         return false;
       }
-      prefetched = true;
+      prefetched = async_request_submitted_ ? false : true;
     } else {
       return false;
     }
@@ -519,6 +524,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(

   if (prefetched) {
     readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
   }
+  async_request_submitted_ = false;
   return true;
 }
@@ -635,6 +641,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
   // the data. It will return without polling if blocks are not sequential.
   UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false);
   prev_len_ = 0;
+  async_request_submitted_ = true;

   return Status::TryAgain();
 }
diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h
index 6d32c8314..04303aa30 100644
--- a/file/file_prefetch_buffer.h
+++ b/file/file_prefetch_buffer.h
@@ -65,7 +65,7 @@ class FilePrefetchBuffer {
   FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0,
                      bool enable = true, bool track_min_offset = false,
                      bool implicit_auto_readahead = false,
-                     bool async_io = false, FileSystem* fs = nullptr,
+                     uint64_t num_file_reads = 0, FileSystem* fs = nullptr,
                      SystemClock* clock = nullptr, Statistics* stats = nullptr)
       : curr_(0),
         readahead_size_(readahead_size),
@@ -77,19 +77,20 @@ class FilePrefetchBuffer {
         implicit_auto_readahead_(implicit_auto_readahead),
         prev_offset_(0),
         prev_len_(0),
-        num_file_reads_(kMinNumFileReadsToStartAutoReadahead + 1),
+        num_file_reads_(num_file_reads),
         io_handle_(nullptr),
         del_fn_(nullptr),
         async_read_in_progress_(false),
-        async_io_(async_io),
+        async_request_submitted_(false),
         fs_(fs),
         clock_(clock),
         stats_(stats) {
+    assert((num_file_reads_ >= kMinNumFileReadsToStartAutoReadahead + 1) ||
+           (num_file_reads_ == 0));
     // If async_io_ is enabled, data is asynchronously filled in second buffer
     // while curr_ is being consumed. If data is overlapping in two buffers,
     // data is copied to third buffer to return continuous buffer.
     bufs_.resize(3);
-    (void)async_io_;
   }

   ~FilePrefetchBuffer() {
@@ -262,6 +263,7 @@ class FilePrefetchBuffer {
     readahead_size_ = initial_auto_readahead_size_;
   }

+  // Called in case of implicit auto prefetching.
   bool IsEligibleForPrefetch(uint64_t offset, size_t n) {
     // Prefetch only if this read is sequential otherwise reset readahead_size_
     // to initial value.
@@ -271,6 +273,13 @@ class FilePrefetchBuffer {
       return false;
     }
     num_file_reads_++;
+
+    // Since async request was submitted in last call directly by calling
+    // PrefetchAsync, it skips num_file_reads_ check as this call is to poll the
+    // data submitted in previous call.
+    if (async_request_submitted_) {
+      return true;
+    }
     if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) {
       UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
       return false;
     }
@@ -301,14 +310,21 @@ class FilePrefetchBuffer {
   bool implicit_auto_readahead_;
   uint64_t prev_offset_;
   size_t prev_len_;
-  int64_t num_file_reads_;
+  // num_file_reads_ is only used when implicit_auto_readahead_ is set.
+  uint64_t num_file_reads_;

   // io_handle_ is allocated and used by underlying file system in case of
   // asynchronous reads.
   void* io_handle_;
   IOHandleDeleter del_fn_;
   bool async_read_in_progress_;
-  bool async_io_;
+
+  // If async_request_submitted_ is set then it indicates RocksDB called
+  // PrefetchAsync to submit request. It needs to TryReadFromCacheAsync to poll
+  // the submitted request without checking if data is sequential and
+  // num_file_reads_.
+  bool async_request_submitted_;
+
   FileSystem* fs_;
   SystemClock* clock_;
   Statistics* stats_;
diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc
index de896a99e..88e74cc35 100644
--- a/file/prefetch_test.cc
+++ b/file/prefetch_test.cc
@@ -1157,9 +1157,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
   size_t expected_current_readahead_size = 8 * 1024;
   size_t decrease_readahead_size = 8 * 1024;

-  SyncPoint::GetInstance()->SetCallBack(
-      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
-      [&](void*) { buff_prefetch_count++; });
+  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
+                                        [&](void*) { buff_prefetch_count++; });
   SyncPoint::GetInstance()->SetCallBack(
       "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
         current_readahead_size = *reinterpret_cast<size_t*>(arg);
@@ -1168,7 +1167,7 @@
   SyncPoint::GetInstance()->EnableProcessing();
   ReadOptions ro;
   ro.adaptive_readahead = true;
-  ro.async_io = true;
+  //  ro.async_io = true;
   {
     /*
      * Reseek keys from sequential Data Blocks within same partitioned
@@ -1200,17 +1199,15 @@
                   ? (expected_current_readahead_size - decrease_readahead_size)
                   : 0));

-    iter->Seek(BuildKey(1000));  // Prefetch the block.
+    iter->Seek(BuildKey(1000));  // Won't prefetch the block.
     ASSERT_TRUE(iter->Valid());
     ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
-    expected_current_readahead_size *= 2;

     iter->Seek(BuildKey(1004));  // Prefetch the block.
     ASSERT_TRUE(iter->Valid());
     ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
     expected_current_readahead_size *= 2;

-    // 1011 is already in cache but won't reset??
     iter->Seek(BuildKey(1011));
     ASSERT_TRUE(iter->Valid());

@@ -1244,7 +1241,101 @@
     iter->Seek(BuildKey(1022));
     ASSERT_TRUE(iter->Valid());
     ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
-    ASSERT_EQ(buff_prefetch_count, 3);
+    ASSERT_EQ(buff_prefetch_count, 2);
+
+    buff_prefetch_count = 0;
+  }
+  Close();
+}
+
+TEST_P(PrefetchTest2, SeekParallelizationTest) {
+  const int kNumKeys = 2000;
+  // Set options
+  std::shared_ptr<MockFS> fs =
+      std::make_shared<MockFS>(env_->GetFileSystem(), false);
+  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
+
+  Options options = CurrentOptions();
+  options.write_buffer_size = 1024;
+  options.create_if_missing = true;
+  options.compression = kNoCompression;
+  options.env = env.get();
+  if (GetParam()) {
+    options.use_direct_reads = true;
+    options.use_direct_io_for_flush_and_compaction = true;
+  }
+
+  options.statistics = CreateDBStatistics();
+  BlockBasedTableOptions table_options;
+  table_options.no_block_cache = true;
+  table_options.cache_index_and_filter_blocks = false;
+  table_options.metadata_block_size = 1024;
+  table_options.index_type =
+      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  Status s = TryReopen(options);
+  if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
+    // If direct IO is not supported, skip the test
+    return;
+  } else {
+    ASSERT_OK(s);
+  }
+
+  WriteBatch batch;
+  Random rnd(309);
+  for (int i = 0; i < kNumKeys; i++) {
+    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
+  }
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+  std::string start_key = BuildKey(0);
+  std::string end_key = BuildKey(kNumKeys - 1);
+  Slice least(start_key.data(), start_key.size());
+  Slice greatest(end_key.data(), end_key.size());
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
+
+  int buff_prefetch_count = 0;
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
+      [&](void*) { buff_prefetch_count++; });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+  ReadOptions ro;
+  ro.adaptive_readahead = true;
+  ro.async_io = true;
+
+  {
+    ASSERT_OK(options.statistics->Reset());
+    // Each block contains around 4 keys.
+    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
+    iter->Seek(BuildKey(0));  // Prefetch data because of seek parallelization.
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+
+    // New data block. Since num_file_reads in FilePrefetch after this read is
+    // 2, it won't go for prefetching.
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+
+    // Prefetch data.
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+
+    ASSERT_EQ(buff_prefetch_count, 2);

     // Check stats to make sure async prefetch is done.
     {
diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc
index 52734f7ca..0e7e378b7 100644
--- a/table/block_based/block_based_table_iterator.cc
+++ b/table/block_based/block_based_table_iterator.cc
@@ -257,7 +257,7 @@ void BlockBasedTableIterator::InitDataBlock() {
     // Enabled from the very first IO when ReadOptions.readahead_size is set.
     block_prefetcher_.PrefetchIfNeeded(
         rep, data_block_handle, read_options_.readahead_size, is_for_compaction,
-        read_options_.async_io, read_options_.rate_limiter_priority);
+        /*async_io=*/false, read_options_.rate_limiter_priority);
     Status s;
     table_->NewDataBlockIterator<DataBlockIter>(
         read_options_, data_block_handle, &block_iter_, BlockType::kData,
diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h
index 2d624cd00..692e519d0 100644
--- a/table/block_based/block_based_table_reader.h
+++ b/table/block_based/block_based_table_reader.h
@@ -662,21 +662,21 @@ struct BlockBasedTable::Rep {
                                 size_t max_readahead_size,
                                 std::unique_ptr<FilePrefetchBuffer>* fpb,
                                 bool implicit_auto_readahead,
-                                bool async_io) const {
+                                uint64_t num_file_reads) const {
     fpb->reset(new FilePrefetchBuffer(
         readahead_size, max_readahead_size,
         !ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */,
-        implicit_auto_readahead, async_io, ioptions.fs.get(), ioptions.clock,
-        ioptions.stats));
+        implicit_auto_readahead, num_file_reads, ioptions.fs.get(),
+        ioptions.clock, ioptions.stats));
   }

   void CreateFilePrefetchBufferIfNotExists(
       size_t readahead_size, size_t max_readahead_size,
       std::unique_ptr<FilePrefetchBuffer>* fpb, bool implicit_auto_readahead,
-      bool async_io) const {
+      uint64_t num_file_reads) const {
     if (!(*fpb)) {
       CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb,
-                               implicit_auto_readahead, async_io);
+                               implicit_auto_readahead, num_file_reads);
     }
   }

diff --git a/table/block_based/block_prefetcher.cc b/table/block_based/block_prefetcher.cc
index 1f08c161b..981e2043c 100644
--- a/table/block_based/block_prefetcher.cc
+++ b/table/block_based/block_prefetcher.cc
@@ -16,17 +16,21 @@ void BlockPrefetcher::PrefetchIfNeeded(
     const BlockBasedTable::Rep* rep, const BlockHandle& handle,
     const size_t readahead_size, bool is_for_compaction, const bool async_io,
     const Env::IOPriority rate_limiter_priority) {
+  // num_file_reads is used by FilePrefetchBuffer only when
+  // implicit_auto_readahead is set.
   if (is_for_compaction) {
     rep->CreateFilePrefetchBufferIfNotExists(
         compaction_readahead_size_, compaction_readahead_size_,
-        &prefetch_buffer_, false, async_io);
+        &prefetch_buffer_, /*implicit_auto_readahead=*/false,
+        /*num_file_reads=*/0);
     return;
   }

   // Explicit user requested readahead.
   if (readahead_size > 0) {
     rep->CreateFilePrefetchBufferIfNotExists(
-        readahead_size, readahead_size, &prefetch_buffer_, false, async_io);
+        readahead_size, readahead_size, &prefetch_buffer_,
+        /*implicit_auto_readahead=*/false, /*num_file_reads=*/0);
     return;
   }

@@ -39,11 +43,13 @@ void BlockPrefetcher::PrefetchIfNeeded(
     return;
   }

-  // In case of async_io, it always creates the PrefetchBuffer.
+  // In case of async_io, always creates the PrefetchBuffer irrespective of
+  // num_file_reads_.
   if (async_io) {
     rep->CreateFilePrefetchBufferIfNotExists(
         initial_auto_readahead_size_, max_auto_readahead_size,
-        &prefetch_buffer_, /*implicit_auto_readahead=*/true, async_io);
+        &prefetch_buffer_, /*implicit_auto_readahead=*/true,
+        /*num_file_reads=*/0);
     return;
   }

@@ -78,9 +84,9 @@ void BlockPrefetcher::PrefetchIfNeeded(
   }

   if (rep->file->use_direct_io()) {
-    rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_,
-                                             max_auto_readahead_size,
-                                             &prefetch_buffer_, true, async_io);
+    rep->CreateFilePrefetchBufferIfNotExists(
+        initial_auto_readahead_size_, max_auto_readahead_size,
+        &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_);
     return;
   }

@@ -96,9 +102,9 @@ void BlockPrefetcher::PrefetchIfNeeded(
       BlockBasedTable::BlockSizeWithTrailer(handle) + readahead_size_,
       rate_limiter_priority);
   if (s.IsNotSupported()) {
-    rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_,
-                                             max_auto_readahead_size,
-                                             &prefetch_buffer_, true, async_io);
+    rep->CreateFilePrefetchBufferIfNotExists(
+        initial_auto_readahead_size_, max_auto_readahead_size,
+        &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_);
     return;
   }
diff --git a/table/block_based/block_prefetcher.h b/table/block_based/block_prefetcher.h
index 285903511..4fae7f0bb 100644
--- a/table/block_based/block_prefetcher.h
+++ b/table/block_based/block_prefetcher.h
@@ -63,7 +63,7 @@ class BlockPrefetcher {
   // initial_auto_readahead_size_ is used if RocksDB uses internal prefetch
   // buffer.
   uint64_t initial_auto_readahead_size_;
-  int64_t num_file_reads_ = 0;
+  uint64_t num_file_reads_ = 0;
   uint64_t prev_offset_ = 0;
   size_t prev_len_ = 0;
   std::unique_ptr<FilePrefetchBuffer> prefetch_buffer_;
diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc
index 73b8c5716..6fc6ddd1e 100644
--- a/table/block_based/partitioned_filter_block.cc
+++ b/table/block_based/partitioned_filter_block.cc
@@ -503,7 +503,7 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
   std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
   rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
                                 false /* Implicit autoreadahead */,
-                                false /*async_io*/);
+                                0 /*num_reads_*/);

   IOOptions opts;
   s = rep->file->PrepareIOOptions(ro, opts);
diff --git a/table/block_based/partitioned_index_iterator.cc b/table/block_based/partitioned_index_iterator.cc
index 9e8c06268..94a023133 100644
--- a/table/block_based/partitioned_index_iterator.cc
+++ b/table/block_based/partitioned_index_iterator.cc
@@ -89,10 +89,10 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() {
     // Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0.
     // Explicit user requested readahead:
     // Enabled from the very first IO when ReadOptions.readahead_size is set.
-    block_prefetcher_.PrefetchIfNeeded(
-        rep, partitioned_index_handle, read_options_.readahead_size,
-        is_for_compaction, read_options_.async_io,
-        read_options_.rate_limiter_priority);
+    block_prefetcher_.PrefetchIfNeeded(rep, partitioned_index_handle,
+                                       read_options_.readahead_size,
+                                       is_for_compaction, /*async_io=*/false,
+                                       read_options_.rate_limiter_priority);
     Status s;
     table_->NewDataBlockIterator<IndexBlockIter>(
         read_options_, partitioned_index_handle, &block_iter_,
diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc
index 14e3f7484..53d892097 100644
--- a/table/block_based/partitioned_index_reader.cc
+++ b/table/block_based/partitioned_index_reader.cc
@@ -158,7 +158,7 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
   std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
   rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
                                 false /*Implicit auto readahead*/,
-                                false /*async_io*/);
+                                0 /*num_reads_*/);
   IOOptions opts;
   {
     Status s = rep->file->PrepareIOOptions(ro, opts);