diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc
index 135cdd2e1..9e97845c3 100644
--- a/file/file_prefetch_buffer.cc
+++ b/file/file_prefetch_buffer.cc
@@ -282,8 +282,17 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(
       bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
     offset += length;
     length = 0;
+
+    // Since the async request was submitted directly by calling PrefetchAsync
+    // in the last call, we don't need to prefetch further, as this call only
+    // polls the data submitted in the previous call.
+    if (async_request_submitted_) {
+      return Status::OK();
+    }
   }
+  async_request_submitted_ = false;
+
   Status s;
   size_t prefetch_size = length + readahead_size;
   size_t alignment = reader->file()->GetRequiredBufferAlignment();
@@ -442,14 +451,10 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
   return true;
 }
 
-// TODO akanksha: Merge this function with TryReadFromCache once async
-// functionality is stable.
 bool FilePrefetchBuffer::TryReadFromCacheAsync(
     const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
     size_t n, Slice* result, Status* status,
     Env::IOPriority rate_limiter_priority) {
-  assert(async_io_);
-
   if (track_min_offset_ && offset < min_offset_read_) {
     min_offset_read_ = static_cast<size_t>(offset);
   }
@@ -503,7 +508,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
 #endif
       return false;
     }
-    prefetched = true;
+    prefetched = async_request_submitted_ ? false : true;
   } else {
     return false;
   }
@@ -519,6 +524,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
   if (prefetched) {
     readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
   }
+  async_request_submitted_ = false;
   return true;
 }
 
@@ -635,6 +641,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
   // the data. It will return without polling if blocks are not sequential.
   UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false);
   prev_len_ = 0;
+  async_request_submitted_ = true;
   return Status::TryAgain();
 }
 
diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h
index 6d32c8314..04303aa30 100644
--- a/file/file_prefetch_buffer.h
+++ b/file/file_prefetch_buffer.h
@@ -65,7 +65,7 @@ class FilePrefetchBuffer {
   FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0,
                      bool enable = true, bool track_min_offset = false,
                      bool implicit_auto_readahead = false,
-                     bool async_io = false, FileSystem* fs = nullptr,
+                     uint64_t num_file_reads = 0, FileSystem* fs = nullptr,
                      SystemClock* clock = nullptr, Statistics* stats = nullptr)
       : curr_(0),
         readahead_size_(readahead_size),
@@ -77,19 +77,20 @@ class FilePrefetchBuffer {
         implicit_auto_readahead_(implicit_auto_readahead),
         prev_offset_(0),
         prev_len_(0),
-        num_file_reads_(kMinNumFileReadsToStartAutoReadahead + 1),
+        num_file_reads_(num_file_reads),
         io_handle_(nullptr),
         del_fn_(nullptr),
         async_read_in_progress_(false),
-        async_io_(async_io),
+        async_request_submitted_(false),
         fs_(fs),
         clock_(clock),
         stats_(stats) {
+    assert((num_file_reads_ >= kMinNumFileReadsToStartAutoReadahead + 1) ||
+           (num_file_reads_ == 0));
     // If async_io_ is enabled, data is asynchronously filled in second buffer
     // while curr_ is being consumed. If data is overlapping in two buffers,
     // data is copied to third buffer to return continuous buffer.
     bufs_.resize(3);
-    (void)async_io_;
   }
 
   ~FilePrefetchBuffer() {
@@ -262,6 +263,7 @@ class FilePrefetchBuffer {
     readahead_size_ = initial_auto_readahead_size_;
   }
 
+  // Called in case of implicit auto prefetching.
   bool IsEligibleForPrefetch(uint64_t offset, size_t n) {
     // Prefetch only if this read is sequential otherwise reset readahead_size_
     // to initial value.
@@ -271,6 +273,13 @@ class FilePrefetchBuffer {
       return false;
     }
     num_file_reads_++;
+
+    // Since the async request was submitted in the last call directly by
+    // calling PrefetchAsync, skip the num_file_reads_ check: this call only
+    // polls the data submitted in the previous call.
+    if (async_request_submitted_) {
+      return true;
+    }
     if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) {
       UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
       return false;
@@ -301,14 +310,21 @@ class FilePrefetchBuffer {
   bool implicit_auto_readahead_;
   uint64_t prev_offset_;
   size_t prev_len_;
-  int64_t num_file_reads_;
+  // num_file_reads_ is only used when implicit_auto_readahead_ is set.
+  uint64_t num_file_reads_;
 
   // io_handle_ is allocated and used by underlying file system in case of
   // asynchronous reads.
   void* io_handle_;
   IOHandleDeleter del_fn_;
   bool async_read_in_progress_;
-  bool async_io_;
+
+  // If async_request_submitted_ is set, it indicates that RocksDB called
+  // PrefetchAsync to submit a request. TryReadFromCacheAsync then needs to
+  // poll the submitted request without checking whether the data is
+  // sequential or checking num_file_reads_.
+  bool async_request_submitted_;
+
   FileSystem* fs_;
   SystemClock* clock_;
   Statistics* stats_;
diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc
index de896a99e..88e74cc35 100644
--- a/file/prefetch_test.cc
+++ b/file/prefetch_test.cc
@@ -1157,9 +1157,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
   size_t expected_current_readahead_size = 8 * 1024;
   size_t decrease_readahead_size = 8 * 1024;
 
-  SyncPoint::GetInstance()->SetCallBack(
-      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
-      [&](void*) { buff_prefetch_count++; });
+  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
+                                        [&](void*) { buff_prefetch_count++; });
   SyncPoint::GetInstance()->SetCallBack(
       "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
         current_readahead_size = *reinterpret_cast<size_t*>(arg);
@@ -1168,7 +1167,7 @@
   SyncPoint::GetInstance()->EnableProcessing();
   ReadOptions ro;
   ro.adaptive_readahead = true;
-  ro.async_io = true;
+  // ro.async_io = true;
   {
     /*
      * Reseek keys from sequential Data Blocks within same partitioned
@@ -1200,17 +1199,15 @@
             ? (expected_current_readahead_size - decrease_readahead_size)
             : 0));
 
-    iter->Seek(BuildKey(1000));  // Prefetch the block.
+    iter->Seek(BuildKey(1000));  // Won't prefetch the block.
     ASSERT_TRUE(iter->Valid());
     ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
-    expected_current_readahead_size *= 2;
 
     iter->Seek(BuildKey(1004));  // Prefetch the block.
     ASSERT_TRUE(iter->Valid());
     ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
     expected_current_readahead_size *= 2;
 
-    // 1011 is already in cache but won't reset??
     iter->Seek(BuildKey(1011));
     ASSERT_TRUE(iter->Valid());
 
@@ -1244,7 +1241,101 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
     iter->Seek(BuildKey(1022));
     ASSERT_TRUE(iter->Valid());
     ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
-    ASSERT_EQ(buff_prefetch_count, 3);
+    ASSERT_EQ(buff_prefetch_count, 2);
+
+    buff_prefetch_count = 0;
+  }
+  Close();
+}
+
+TEST_P(PrefetchTest2, SeekParallelizationTest) {
+  const int kNumKeys = 2000;
+  // Set options
+  std::shared_ptr<MockFS> fs =
+      std::make_shared<MockFS>(env_->GetFileSystem(), false);
+  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
+
+  Options options = CurrentOptions();
+  options.write_buffer_size = 1024;
+  options.create_if_missing = true;
+  options.compression = kNoCompression;
+  options.env = env.get();
+  if (GetParam()) {
+    options.use_direct_reads = true;
+    options.use_direct_io_for_flush_and_compaction = true;
+  }
+
+  options.statistics = CreateDBStatistics();
+  BlockBasedTableOptions table_options;
+  table_options.no_block_cache = true;
+  table_options.cache_index_and_filter_blocks = false;
+  table_options.metadata_block_size = 1024;
+  table_options.index_type =
+      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  Status s = TryReopen(options);
+  if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
+    // If direct IO is not supported, skip the test.
+    return;
+  } else {
+    ASSERT_OK(s);
+  }
+
+  WriteBatch batch;
+  Random rnd(309);
+  for (int i = 0; i < kNumKeys; i++) {
+    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
+  }
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+  std::string start_key = BuildKey(0);
+  std::string end_key = BuildKey(kNumKeys - 1);
+  Slice least(start_key.data(), start_key.size());
+  Slice greatest(end_key.data(), end_key.size());
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
+
+  int buff_prefetch_count = 0;
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
+      [&](void*) { buff_prefetch_count++; });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+  ReadOptions ro;
+  ro.adaptive_readahead = true;
+  ro.async_io = true;
+
+  {
+    ASSERT_OK(options.statistics->Reset());
+    // Each block contains around 4 keys.
+    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
+    iter->Seek(BuildKey(0));  // Prefetch data because of seek parallelization.
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+
+    // New data block. Since num_file_reads in FilePrefetchBuffer after this
+    // read is 2, it won't go for prefetching.
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+
+    // Prefetch data.
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+
+    ASSERT_EQ(buff_prefetch_count, 2);
 
     // Check stats to make sure async prefetch is done.
     {
diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc
index 52734f7ca..0e7e378b7 100644
--- a/table/block_based/block_based_table_iterator.cc
+++ b/table/block_based/block_based_table_iterator.cc
@@ -257,7 +257,7 @@ void BlockBasedTableIterator::InitDataBlock() {
     //   Enabled from the very first IO when ReadOptions.readahead_size is set.
     block_prefetcher_.PrefetchIfNeeded(
         rep, data_block_handle, read_options_.readahead_size, is_for_compaction,
-        read_options_.async_io, read_options_.rate_limiter_priority);
+        /*async_io=*/false, read_options_.rate_limiter_priority);
     Status s;
     table_->NewDataBlockIterator<DataBlockIter>(
         read_options_, data_block_handle, &block_iter_, BlockType::kData,
diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h
index 2d624cd00..692e519d0 100644
--- a/table/block_based/block_based_table_reader.h
+++ b/table/block_based/block_based_table_reader.h
@@ -662,21 +662,21 @@ struct BlockBasedTable::Rep {
                                 size_t max_readahead_size,
                                 std::unique_ptr<FilePrefetchBuffer>* fpb,
                                 bool implicit_auto_readahead,
-                                bool async_io) const {
+                                uint64_t num_file_reads) const {
     fpb->reset(new FilePrefetchBuffer(
         readahead_size, max_readahead_size,
         !ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */,
-        implicit_auto_readahead, async_io, ioptions.fs.get(), ioptions.clock,
-        ioptions.stats));
+        implicit_auto_readahead, num_file_reads, ioptions.fs.get(),
+        ioptions.clock, ioptions.stats));
   }
 
   void CreateFilePrefetchBufferIfNotExists(
       size_t readahead_size, size_t max_readahead_size,
       std::unique_ptr<FilePrefetchBuffer>* fpb, bool implicit_auto_readahead,
-      bool async_io) const {
+      uint64_t num_file_reads) const {
     if (!(*fpb)) {
       CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb,
-                               implicit_auto_readahead, async_io);
+                               implicit_auto_readahead, num_file_reads);
     }
   }
diff --git a/table/block_based/block_prefetcher.cc b/table/block_based/block_prefetcher.cc
index 1f08c161b..981e2043c 100644
--- a/table/block_based/block_prefetcher.cc
+++ b/table/block_based/block_prefetcher.cc
@@ -16,17 +16,21 @@ void BlockPrefetcher::PrefetchIfNeeded(
     const BlockBasedTable::Rep* rep, const BlockHandle& handle,
     const size_t readahead_size, bool is_for_compaction, const bool async_io,
     const Env::IOPriority rate_limiter_priority) {
+  // num_file_reads is used by FilePrefetchBuffer only when
+  // implicit_auto_readahead is set.
   if (is_for_compaction) {
     rep->CreateFilePrefetchBufferIfNotExists(
         compaction_readahead_size_, compaction_readahead_size_,
-        &prefetch_buffer_, false, async_io);
+        &prefetch_buffer_, /*implicit_auto_readahead=*/false,
+        /*num_file_reads=*/0);
     return;
   }
 
   // Explicit user requested readahead.
   if (readahead_size > 0) {
     rep->CreateFilePrefetchBufferIfNotExists(
-        readahead_size, readahead_size, &prefetch_buffer_, false, async_io);
+        readahead_size, readahead_size, &prefetch_buffer_,
+        /*implicit_auto_readahead=*/false, /*num_file_reads=*/0);
     return;
   }
 
@@ -39,11 +43,13 @@ void BlockPrefetcher::PrefetchIfNeeded(
     return;
   }
 
-  // In case of async_io, it always creates the PrefetchBuffer.
+  // In case of async_io, always create the PrefetchBuffer irrespective of
+  // num_file_reads_.
   if (async_io) {
     rep->CreateFilePrefetchBufferIfNotExists(
         initial_auto_readahead_size_, max_auto_readahead_size,
-        &prefetch_buffer_, /*implicit_auto_readahead=*/true, async_io);
+        &prefetch_buffer_, /*implicit_auto_readahead=*/true,
+        /*num_file_reads=*/0);
     return;
   }
 
@@ -78,9 +84,9 @@ void BlockPrefetcher::PrefetchIfNeeded(
   }
 
   if (rep->file->use_direct_io()) {
-    rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_,
-                                             max_auto_readahead_size,
-                                             &prefetch_buffer_, true, async_io);
+    rep->CreateFilePrefetchBufferIfNotExists(
+        initial_auto_readahead_size_, max_auto_readahead_size,
+        &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_);
     return;
   }
 
@@ -96,9 +102,9 @@ void BlockPrefetcher::PrefetchIfNeeded(
       BlockBasedTable::BlockSizeWithTrailer(handle) + readahead_size_,
       rate_limiter_priority);
   if (s.IsNotSupported()) {
-    rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_,
-                                             max_auto_readahead_size,
-                                             &prefetch_buffer_, true, async_io);
+    rep->CreateFilePrefetchBufferIfNotExists(
+        initial_auto_readahead_size_, max_auto_readahead_size,
+        &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_);
     return;
   }
diff --git a/table/block_based/block_prefetcher.h b/table/block_based/block_prefetcher.h
index 285903511..4fae7f0bb 100644
--- a/table/block_based/block_prefetcher.h
+++ b/table/block_based/block_prefetcher.h
@@ -63,7 +63,7 @@ class BlockPrefetcher {
   // initial_auto_readahead_size_ is used if RocksDB uses internal prefetch
   // buffer.
   uint64_t initial_auto_readahead_size_;
-  int64_t num_file_reads_ = 0;
+  uint64_t num_file_reads_ = 0;
   uint64_t prev_offset_ = 0;
   size_t prev_len_ = 0;
   std::unique_ptr<FilePrefetchBuffer> prefetch_buffer_;
diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc
index 73b8c5716..6fc6ddd1e 100644
--- a/table/block_based/partitioned_filter_block.cc
+++ b/table/block_based/partitioned_filter_block.cc
@@ -503,7 +503,7 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
   std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
   rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
                                 false /* Implicit autoreadahead */,
-                                false /*async_io*/);
+                                0 /*num_file_reads*/);
 
   IOOptions opts;
   s = rep->file->PrepareIOOptions(ro, opts);
diff --git a/table/block_based/partitioned_index_iterator.cc b/table/block_based/partitioned_index_iterator.cc
index 9e8c06268..94a023133 100644
--- a/table/block_based/partitioned_index_iterator.cc
+++ b/table/block_based/partitioned_index_iterator.cc
@@ -89,10 +89,10 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() {
     //   Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0.
     // Explicit user requested readahead:
     //   Enabled from the very first IO when ReadOptions.readahead_size is set.
-    block_prefetcher_.PrefetchIfNeeded(
-        rep, partitioned_index_handle, read_options_.readahead_size,
-        is_for_compaction, read_options_.async_io,
-        read_options_.rate_limiter_priority);
+    block_prefetcher_.PrefetchIfNeeded(rep, partitioned_index_handle,
+                                       read_options_.readahead_size,
+                                       is_for_compaction, /*async_io=*/false,
+                                       read_options_.rate_limiter_priority);
     Status s;
     table_->NewDataBlockIterator<IndexBlockIter>(
         read_options_, partitioned_index_handle, &block_iter_,
diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc
index 14e3f7484..53d892097 100644
--- a/table/block_based/partitioned_index_reader.cc
+++ b/table/block_based/partitioned_index_reader.cc
@@ -158,7 +158,7 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
   std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
   rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
                                 false /*Implicit auto readahead*/,
-                                false /*async_io*/);
+                                0 /*num_file_reads*/);
   IOOptions opts;
   {
     Status s = rep->file->PrepareIOOptions(ro, opts);
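
Taken together, the file_prefetch_buffer changes implement a small handshake around `async_request_submitted_`: `PrefetchAsync` sets the flag and returns `TryAgain`, the next `TryReadFromCacheAsync` call polls the in-flight request instead of treating the read as a fresh prefetch, and `IsEligibleForPrefetch` skips the sequential/`num_file_reads_` gating while the flag is set. Below is a compilable sketch of that handshake under stated assumptions: `PrefetchBufferModel` is not the real `FilePrefetchBuffer`, buffer and IO details are elided, and the threshold constant mirrors what `kMinNumFileReadsToStartAutoReadahead` is in RocksDB (2) purely for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <iostream>

class PrefetchBufferModel {
 public:
  static constexpr uint64_t kMinNumFileReadsToStartAutoReadahead = 2;

  explicit PrefetchBufferModel(uint64_t num_file_reads)
      : num_file_reads_(num_file_reads) {
    // Contract from the new constructor: callers either seed the counter past
    // the auto-readahead threshold or start it at zero.
    assert(num_file_reads_ >= kMinNumFileReadsToStartAutoReadahead + 1 ||
           num_file_reads_ == 0);
  }

  // Models FilePrefetchBuffer::PrefetchAsync submitting an async read and
  // returning TryAgain to the caller.
  void PrefetchAsync() { async_request_submitted_ = true; }

  // Models IsEligibleForPrefetch: a pending async request bypasses the
  // num_file_reads_ check because this call only polls the earlier submission.
  bool IsEligibleForPrefetch() {
    num_file_reads_++;
    if (async_request_submitted_) return true;
    return num_file_reads_ > kMinNumFileReadsToStartAutoReadahead;
  }

  // Models the tail of TryReadFromCacheAsync: the flag is consumed so the
  // next read goes through the normal implicit-readahead path again.
  void FinishRead() { async_request_submitted_ = false; }

 private:
  uint64_t num_file_reads_;
  bool async_request_submitted_ = false;
};

int main() {
  PrefetchBufferModel buf(/*num_file_reads=*/0);
  buf.PrefetchAsync();                               // seek submits a request
  std::cout << buf.IsEligibleForPrefetch() << "\n";  // 1: poll, gating skipped
  buf.FinishRead();                                  // flag consumed
  std::cout << buf.IsEligibleForPrefetch() << "\n";  // 0: counter still low
}
```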
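The BlockPrefetcher changes all follow one call-site convention for the new `num_file_reads` parameter. A minimal sketch of that convention, assuming illustrative stand-in types (`BufferParams` and `BlockPrefetcherModel` are not RocksDB classes):

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>

// Stand-in for the (implicit_auto_readahead, num_file_reads) pair handed to
// CreateFilePrefetchBufferIfNotExists in the patch.
struct BufferParams {
  bool implicit_auto_readahead;
  uint64_t num_file_reads;
};

struct BlockPrefetcherModel {
  uint64_t num_file_reads_ = 0;  // now uint64_t, matching the buffer's counter

  BufferParams PrefetchIfNeeded(bool is_for_compaction, size_t readahead_size,
                                bool async_io) {
    // Compaction and explicit user readahead never use implicit auto
    // readahead, so the counter is ignored; passing 0 satisfies the new
    // constructor assert.
    if (is_for_compaction || readahead_size > 0) {
      return {/*implicit_auto_readahead=*/false, /*num_file_reads=*/0};
    }
    // Async IO creates the buffer up front, before any reads are counted.
    if (async_io) {
      return {/*implicit_auto_readahead=*/true, /*num_file_reads=*/0};
    }
    // Implicit auto readahead: seed the buffer with the reads already
    // observed so its counter continues where the prefetcher left off.
    return {/*implicit_auto_readahead=*/true, num_file_reads_};
  }
};

int main() {
  BlockPrefetcherModel prefetcher;
  prefetcher.num_file_reads_ = 3;  // already past the auto-readahead threshold
  BufferParams p = prefetcher.PrefetchIfNeeded(
      /*is_for_compaction=*/false, /*readahead_size=*/0, /*async_io=*/false);
  std::cout << p.implicit_auto_readahead << " " << p.num_file_reads << "\n";
  // Prints: 1 3
}
```

The assert in the new constructor encodes exactly these two cases: 0 from paths that never auto-readahead or that create the buffer up front, and a value already past the threshold from the implicit path, which only creates the buffer after enough sequential reads have been observed.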
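For completeness, this is the user-visible surface the new SeekParallelizationTest exercises. A hedged usage sketch using the public RocksDB API (the database path and option values here are illustrative, not taken from the patch):

```cpp
#include <cassert>
#include <memory>

#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/prefetch_demo", &db);
  assert(s.ok());

  rocksdb::ReadOptions ro;
  ro.adaptive_readahead = true;  // scale readahead with the observed pattern
  ro.async_io = true;            // let seeks submit prefetches asynchronously

  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    // Sequential scan; sequential block reads become eligible for async
    // readahead once the prefetch buffer has seen enough file reads.
  }
  assert(it->status().ok());

  it.reset();
  delete db;
  return 0;
}
```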