diff --git a/HISTORY.md b/HISTORY.md index fd25c9659..c3f1acfe0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -27,6 +27,8 @@ * In IOOptions, mark `prio` as deprecated for future removal. * In `file_system.h`, mark `IOPriority` as deprecated for future removal. * Add an option, `CompressionOptions::use_zstd_dict_trainer`, to indicate whether zstd dictionary trainer should be used for generating zstd compression dictionaries. The default value of this option is true for backward compatibility. When this option is set to false, zstd API `ZDICT_finalizeDictionary` is used to generate compression dictionaries. +* During Seek, every LevelIterator now positions itself on the correct data block in the correct SST file; these per-level seeks can be parallelized when the `ReadOptions.async_io` option is enabled. +* Add a new stat, `number_async_seek`, in PerfContext that indicates the number of asynchronous calls made by Seek to prefetch data. ### Bug Fixes * RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requets completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue. diff --git a/db/c.cc b/db/c.cc index 9d1a89ec4..a62a81c04 100644 --- a/db/c.cc +++ b/db/c.cc @@ -3697,6 +3697,8 @@ uint64_t rocksdb_perfcontext_metric(rocksdb_perfcontext_t* context, return rep->env_unlock_file_nanos; case rocksdb_env_new_logger_nanos: return rep->env_new_logger_nanos; + case rocksdb_number_async_seek: + return rep->number_async_seek; default: break; } diff --git a/db/version_set.cc b/db/version_set.cc index 108347745..223bdba6a 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1078,7 +1078,15 @@ void LevelIterator::Seek(const Slice& target) { if (file_iter_.iter() != nullptr) { file_iter_.Seek(target); + // Status::TryAgain indicates an asynchronous request for retrieval of data + // blocks has been submitted. So it should return at this point and Seek + // should be called again to retrieve the requested block and execute the + // remaining code. + if (file_iter_.status() == Status::TryAgain()) { + return; + } } + if (SkipEmptyFileForward() && prefix_extractor_ != nullptr && !read_options_.total_order_seek && !read_options_.auto_prefix_mode && file_iter_.iter() != nullptr && file_iter_.Valid()) { diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index 222166876..f75261781 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -194,34 +194,7 @@ void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset, } } -// If async_read = true: -// async_read is enabled in case of sequential reads. So when -// buffers are switched, we clear the curr_ buffer as we assume the data has -// been consumed because of sequential reads. -// -// Scenarios for prefetching asynchronously: -// Case1: If both buffers are empty, prefetch n bytes -// synchronously in curr_ -// and prefetch readahead_size_/2 async in second buffer. -// Case2: If second buffer has partial or full data, make it current and -// prefetch readahead_size_/2 async in second buffer. In case of -// partial data, prefetch remaining bytes from size n synchronously to -// fulfill the requested bytes request. -// Case3: If curr_ has partial data, prefetch remaining bytes from size n -// synchronously in curr_ to fulfill the requested bytes request and -// prefetch readahead_size_/2 bytes async in second buffer.
-// Case4: If data is in both buffers, copy requested data from curr_ and second -// buffer to third buffer. If all requested bytes have been copied, do -// the asynchronous prefetching in second buffer. -Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, - RandomAccessFileReader* reader, - uint64_t offset, size_t length, - size_t readahead_size, - Env::IOPriority rate_limiter_priority, - bool& copy_to_third_buffer) { - if (!enable_) { - return Status::OK(); - } +void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset) { if (async_read_in_progress_ && fs_ != nullptr) { // Wait for prefetch data to complete. // No mutex is needed as PrefetchAsyncCallback updates the result in second @@ -242,11 +215,6 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, del_fn_ = nullptr; } - TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:Start"); - Status s; - size_t prefetch_size = length + readahead_size; - - size_t alignment = reader->file()->GetRequiredBufferAlignment(); // Index of second buffer. uint32_t second = curr_ ^ 1; @@ -273,17 +241,55 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, // outdated data and switch the buffers. bufs_[curr_].buffer_.Clear(); curr_ = curr_ ^ 1; - second = curr_ ^ 1; } - // After swap check if all the requested bytes are in curr_, it will go for - // async prefetching only. +} + +// If async_read = true: +// async_read is enabled in case of sequential reads. So when +// buffers are switched, we clear the curr_ buffer as we assume the data has +// been consumed because of sequential reads. +// +// Scenarios for prefetching asynchronously: +// Case1: If both buffers are empty, prefetch n bytes +// synchronously in curr_ +// and prefetch readahead_size_/2 async in second buffer. +// Case2: If second buffer has partial or full data, make it current and +// prefetch readahead_size_/2 async in second buffer. In case of +// partial data, prefetch remaining bytes from size n synchronously to +// fulfill the requested bytes request. +// Case3: If curr_ has partial data, prefetch remaining bytes from size n +// synchronously in curr_ to fulfill the requested bytes request and +// prefetch readahead_size_/2 bytes async in second buffer. +// Case4: If data is in both buffers, copy requested data from curr_ and second +// buffer to third buffer. If all requested bytes have been copied, do +// the asynchronous prefetching in second buffer. +Status FilePrefetchBuffer::PrefetchAsyncInternal( + const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, + size_t length, size_t readahead_size, Env::IOPriority rate_limiter_priority, + bool& copy_to_third_buffer) { + if (!enable_) { + return Status::OK(); + } + + TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsyncInternal:Start"); + + PollAndUpdateBuffersIfNeeded(offset); + + // If all the requested bytes are in curr_, it will go for async prefetching + // only. if (bufs_[curr_].buffer_.CurrentSize() > 0 && offset + length <= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { offset += length; length = 0; - prefetch_size = readahead_size; } + + Status s; + size_t prefetch_size = length + readahead_size; + size_t alignment = reader->file()->GetRequiredBufferAlignment(); + // Index of second buffer. + uint32_t second = curr_ ^ 1; + // Data is overlapping i.e. some of the data is in curr_ buffer and remaining // in second buffer. 
if (bufs_[curr_].buffer_.CurrentSize() > 0 && @@ -315,9 +321,8 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, prefetch_size = length + readahead_size; } - // Update second again if swap happened. - second = curr_ ^ 1; size_t _offset = static_cast<size_t>(offset); + second = curr_ ^ 1; // offset and size alignment for curr_ buffer with synchronous prefetching uint64_t rounddown_start1 = Rounddown(_offset, alignment); @@ -447,7 +452,13 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync( if (track_min_offset_ && offset < min_offset_read_) { min_offset_read_ = static_cast<size_t>(offset); } - if (!enable_ || (offset < bufs_[curr_].offset_)) { + + // In case of async_io_, offset can be less than bufs_[curr_].offset_ because + // reads may not be sequential: PrefetchAsync can be called for any block, and + // RocksDB will call TryReadFromCacheAsync after PrefetchAsync to Poll for the + // requested bytes. The IsEligibleForPrefetch API will return false when reads + // are not sequential, and non-sequential reads will be handled there. + if (!enable_ || (offset < bufs_[curr_].offset_ && async_io_ == false)) { return false; } @@ -459,7 +470,8 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync( // If readahead is not enabled: return false. TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache", &readahead_size_); - if (offset + n > bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { + if (offset < bufs_[curr_].offset_ || + offset + n > bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { if (readahead_size_ > 0) { Status s; assert(reader != nullptr); @@ -481,8 +493,9 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync( if (async_io_) { // Prefetch n + readahead_size_/2 synchronously as remaining // readahead_size_/2 will be prefetched asynchronously. - s = PrefetchAsync(opts, reader, offset, n, readahead_size_ / 2, - rate_limiter_priority, copy_to_third_buffer); + s = PrefetchAsyncInternal(opts, reader, offset, n, + readahead_size_ / 2, rate_limiter_priority, + copy_to_third_buffer); } else { s = Prefetch(opts, reader, offset, n + readahead_size_, rate_limiter_priority); @@ -544,4 +557,92 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, bufs_[index].buffer_.Size(current_size + req.result.size()); } } + +Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, + RandomAccessFileReader* reader, + uint64_t offset, size_t n, + Env::IOPriority rate_limiter_priority, + Slice* result) { + assert(reader != nullptr); + if (!enable_) { + return Status::NotSupported(); + } + TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:Start"); + + PollAndUpdateBuffersIfNeeded(offset); + + // Index of second buffer. + uint32_t second = curr_ ^ 1; + + // Since PrefetchAsync can be called on non-sequential reads, offset can + // be less than the buffers' offset. In that case, it clears the buffer and + // prefetches that block. + if (bufs_[curr_].buffer_.CurrentSize() > 0 && offset < bufs_[curr_].offset_) { + bufs_[curr_].buffer_.Clear(); + } + + // All requested bytes are already in the curr_ buffer. So no need to Read + // again. + if (bufs_[curr_].buffer_.CurrentSize() > 0 && + offset + n <= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { + uint64_t offset_in_buffer = offset - bufs_[curr_].offset_; + *result = Slice(bufs_[curr_].buffer_.BufferStart() + offset_in_buffer, n); + return Status::OK(); + } + + Status s; + size_t alignment = reader->file()->GetRequiredBufferAlignment(); + + // TODO akanksha: Handle the scenario if data is overlapping in 2 buffers.
+ // Currently, it covers 2 scenarios: either one buffer (curr_) has no data or + // it has partial data. It ignores the contents in the second buffer (overlapping + // data in 2 buffers) and sends the request to re-read that data. + + // Clear the second buffer in order to do asynchronous prefetching. + bufs_[second].buffer_.Clear(); + + size_t offset_to_read = static_cast<size_t>(offset); + uint64_t rounddown_start = 0; + uint64_t roundup_end = 0; + + if (bufs_[curr_].buffer_.CurrentSize() == 0) { + // Prefetch full data. + rounddown_start = Rounddown(offset_to_read, alignment); + roundup_end = Roundup(offset_to_read + n, alignment); + } else { + // Prefetch remaining data. + size_t rem_length = n - (bufs_[curr_].buffer_.CurrentSize() - + (offset - bufs_[curr_].offset_)); + rounddown_start = bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize(); + roundup_end = Roundup(rounddown_start + rem_length, alignment); + } + + uint64_t roundup_len = roundup_end - rounddown_start; + assert(roundup_len >= alignment); + assert(roundup_len % alignment == 0); + + uint64_t chunk_len = 0; + CalculateOffsetAndLen(alignment, rounddown_start, roundup_len, second, false, + chunk_len); + + // Update the buffer offset. + bufs_[second].offset_ = rounddown_start; + assert(roundup_len >= chunk_len); + + size_t read_len = static_cast<size_t>(roundup_len - chunk_len); + + s = ReadAsync(opts, reader, rate_limiter_priority, read_len, chunk_len, + rounddown_start, second); + + if (!s.ok()) { + return s; + } + + // Update read pattern so that TryReadFromCacheAsync can be called to Poll + // the data. It will return without polling if blocks are not sequential. + UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false); + prev_len_ = 0; + + return Status::TryAgain(); +} } // namespace ROCKSDB_NAMESPACE diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index 88b350ceb..2347479a8 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -131,10 +131,21 @@ class FilePrefetchBuffer { uint64_t offset, size_t n, Env::IOPriority rate_limiter_priority); + // Request for reading the data from a file asynchronously. + // If data already exists in the buffer, result will be updated. + // reader : the file reader. + // offset : the file offset to start reading from. + // n : the number of bytes to read. + // rate_limiter_priority : rate limiting priority, or `Env::IO_TOTAL` to + // bypass. + // result : if data already exists in the buffer, result will + // be updated with the data. + // + // If data already exists in the buffer, it will return Status::OK; otherwise + // it will send an asynchronous request and return Status::TryAgain. Status PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader, - uint64_t offset, size_t length, size_t readahead_size, - Env::IOPriority rate_limiter_priority, - bool& copy_to_third_buffer); + uint64_t offset, size_t n, + Env::IOPriority rate_limiter_priority, Slice* result); // Tries returning the data for a file read from this buffer if that data is // in the buffer. @@ -160,7 +171,7 @@ class FilePrefetchBuffer { RandomAccessFileReader* reader, uint64_t offset, size_t n, Slice* result, Status* status, Env::IOPriority rate_limiter_priority, - bool for_compaction /* = false */); + bool for_compaction); // The minimum `offset` ever passed to TryReadFromCache(). This will only be // tracked if track_min_offset = true.
@@ -207,22 +218,6 @@ class FilePrefetchBuffer { } } - bool IsEligibleForPrefetch(uint64_t offset, size_t n) { - // Prefetch only if this read is sequential otherwise reset readahead_size_ - // to initial value. - if (!IsBlockSequential(offset)) { - UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); - ResetValues(); - return false; - } - num_file_reads_++; - if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) { - UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); - return false; - } - return true; - } - // Callback function passed to underlying FS in case of asynchronous reads. void PrefetchAsyncCallback(const FSReadRequest& req, void* cb_arg); @@ -234,6 +229,17 @@ class FilePrefetchBuffer { size_t roundup_len, size_t index, bool refit_tail, uint64_t& chunk_len); + // It calls the Poll API if there is any pending asynchronous request. It then + // checks if the data is in any buffer. It clears the outdated data and swaps + // the buffers if required. + void PollAndUpdateBuffersIfNeeded(uint64_t offset); + + Status PrefetchAsyncInternal(const IOOptions& opts, + RandomAccessFileReader* reader, uint64_t offset, + size_t length, size_t readahead_size, + Env::IOPriority rate_limiter_priority, + bool& copy_to_third_buffer); + Status Read(const IOOptions& opts, RandomAccessFileReader* reader, Env::IOPriority rate_limiter_priority, uint64_t read_len, uint64_t chunk_len, uint64_t rounddown_start, uint32_t index); @@ -256,6 +262,22 @@ class FilePrefetchBuffer { readahead_size_ = initial_auto_readahead_size_; } + bool IsEligibleForPrefetch(uint64_t offset, size_t n) { + // Prefetch only if this read is sequential otherwise reset readahead_size_ + // to initial value. + if (!IsBlockSequential(offset)) { + UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); + ResetValues(); + return false; + } + num_file_reads_++; + if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) { + UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); + return false; + } + return true; + } + std::vector bufs_; // curr_ represents the index for bufs_ indicating which buffer is being // consumed currently. diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index c6287961c..a9a0dd353 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -534,15 +534,24 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) { * initially (2 more data blocks). */ iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1000)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1004)); // Prefetch Data + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1008)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1011)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1015)); // Prefetch Data + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1019)); + ASSERT_TRUE(iter->Valid()); // Missed 2 blocks but they are already in buffer so no reset. iter->Seek(BuildKey(103)); // Already in buffer.
+ ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1033)); // Prefetch Data + ASSERT_TRUE(iter->Valid()); if (support_prefetch && !use_direct_io) { ASSERT_EQ(fs->GetPrefetchCount(), 3); fs->ClearPrefetchCount(); @@ -558,10 +567,15 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) { */ auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1008)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1019)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1033)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1048)); + ASSERT_TRUE(iter->Valid()); if (support_prefetch && !use_direct_io) { ASSERT_EQ(fs->GetPrefetchCount(), 0); fs->ClearPrefetchCount(); @@ -576,9 +590,13 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) { */ auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(10)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(100)); + ASSERT_TRUE(iter->Valid()); if (support_prefetch && !use_direct_io) { ASSERT_EQ(fs->GetPrefetchCount(), 0); fs->ClearPrefetchCount(); @@ -596,14 +614,21 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) { */ auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1000)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1004)); // This iteration will prefetch buffer + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1008)); + ASSERT_TRUE(iter->Valid()); iter->Seek( BuildKey(996)); // Reseek won't prefetch any data and // readahead_size will be initiallized to 8*1024. + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(992)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(989)); + ASSERT_TRUE(iter->Valid()); if (support_prefetch && !use_direct_io) { ASSERT_EQ(fs->GetPrefetchCount(), 1); fs->ClearPrefetchCount(); @@ -615,11 +640,17 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) { // Read sequentially to confirm readahead_size is reset to initial value (2 // more data blocks) iter->Seek(BuildKey(1011)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1015)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1019)); // Prefetch Data + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1022)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1026)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(103)); // Prefetch Data + ASSERT_TRUE(iter->Valid()); if (support_prefetch && !use_direct_io) { ASSERT_EQ(fs->GetPrefetchCount(), 2); fs->ClearPrefetchCount(); @@ -634,12 +665,19 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) { */ auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1167)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1334)); // This iteration will prefetch buffer + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1499)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1667)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1847)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1999)); + ASSERT_TRUE(iter->Valid()); if (support_prefetch && !use_direct_io) { ASSERT_EQ(fs->GetPrefetchCount(), 1); fs->ClearPrefetchCount(); @@ -766,8 +804,11 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) { auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); // Warm up the cache iter->Seek(BuildKey(1011)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1015)); + ASSERT_TRUE(iter->Valid()); 
iter->Seek(BuildKey(1019)); + ASSERT_TRUE(iter->Valid()); if (support_prefetch && !use_direct_io) { ASSERT_EQ(fs->GetPrefetchCount(), 1); fs->ClearPrefetchCount(); @@ -780,20 +821,31 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) { // After caching, blocks will be read from cache (Sequential blocks) auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1000)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1004)); // Prefetch data (not in cache). + ASSERT_TRUE(iter->Valid()); // Missed one sequential block but next is in already in buffer so readahead // will not be reset. iter->Seek(BuildKey(1011)); + ASSERT_TRUE(iter->Valid()); // Prefetch data but blocks are in cache so no prefetch and reset. iter->Seek(BuildKey(1015)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1019)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1022)); + ASSERT_TRUE(iter->Valid()); // Prefetch data with readahead_size = 4 blocks. iter->Seek(BuildKey(1026)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(103)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1033)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1037)); + ASSERT_TRUE(iter->Valid()); if (support_prefetch && !use_direct_io) { ASSERT_EQ(fs->GetPrefetchCount(), 3); @@ -881,7 +933,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { [&](void*) { buff_prefetch_count++; }); SyncPoint::GetInstance()->SetCallBack( - "FilePrefetchBuffer::PrefetchAsync:Start", + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", [&](void*) { buff_async_prefetch_count++; }); // The callback checks, since reads are sequential, readahead_size doesn't @@ -955,7 +1007,7 @@ class PrefetchTest2 : public DBTestBase, INSTANTIATE_TEST_CASE_P(PrefetchTest2, PrefetchTest2, ::testing::Bool()); #ifndef ROCKSDB_LITE -TEST_P(PrefetchTest2, NonSequentialReads) { +TEST_P(PrefetchTest2, NonSequentialReadsWithAdaptiveReadahead) { const int kNumKeys = 1000; // Set options std::shared_ptr fs = @@ -1002,9 +1054,8 @@ TEST_P(PrefetchTest2, NonSequentialReads) { int set_readahead = 0; size_t readahead_size = 0; - SyncPoint::GetInstance()->SetCallBack( - "FilePrefetchBuffer::PrefetchAsync:Start", - [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); SyncPoint::GetInstance()->SetCallBack( "BlockPrefetcher::SetReadaheadState", [&](void* /*arg*/) { set_readahead++; }); @@ -1018,13 +1069,15 @@ TEST_P(PrefetchTest2, NonSequentialReads) { // Iterate until prefetch is done. ReadOptions ro; ro.adaptive_readahead = true; - // TODO akanksha: Remove after adding new units. - ro.async_io = true; auto iter = std::unique_ptr(db_->NewIterator(ro)); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + while (iter->Valid() && buff_prefetch_count == 0) { iter->Next(); } + ASSERT_EQ(readahead_size, 8 * 1024); ASSERT_EQ(buff_prefetch_count, 1); ASSERT_EQ(set_readahead, 0); @@ -1033,9 +1086,12 @@ TEST_P(PrefetchTest2, NonSequentialReads) { // Move to last file and check readahead size fallbacks to 8KB. 
So next // readahead size after prefetch should be 8 * 1024; iter->Seek(BuildKey(4004)); + ASSERT_TRUE(iter->Valid()); + while (iter->Valid() && buff_prefetch_count == 0) { iter->Next(); } + ASSERT_EQ(readahead_size, 8 * 1024); ASSERT_EQ(set_readahead, 0); ASSERT_EQ(buff_prefetch_count, 1); @@ -1099,7 +1155,7 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { size_t decrease_readahead_size = 8 * 1024; SyncPoint::GetInstance()->SetCallBack( - "FilePrefetchBuffer::PrefetchAsync:Start", + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", [&](void*) { buff_prefetch_count++; }); SyncPoint::GetInstance()->SetCallBack( "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) { @@ -1120,8 +1176,11 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { auto iter = std::unique_ptr(db_->NewIterator(ro)); // Warm up the cache iter->Seek(BuildKey(1011)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1015)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1019)); + ASSERT_TRUE(iter->Valid()); buff_prefetch_count = 0; } @@ -1129,26 +1188,39 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { ASSERT_OK(options.statistics->Reset()); // After caching, blocks will be read from cache (Sequential blocks) auto iter = std::unique_ptr(db_->NewIterator(ro)); - iter->Seek(BuildKey(0)); + iter->Seek( + BuildKey(0)); // In cache so it will decrease the readahead_size. ASSERT_TRUE(iter->Valid()); - iter->Seek(BuildKey(1000)); + expected_current_readahead_size = std::max( + decrease_readahead_size, + (expected_current_readahead_size >= decrease_readahead_size + ? (expected_current_readahead_size - decrease_readahead_size) + : 0)); + + iter->Seek(BuildKey(1000)); // Prefetch the block. ASSERT_TRUE(iter->Valid()); - iter->Seek(BuildKey(1004)); // Prefetch data (not in cache). + ASSERT_EQ(current_readahead_size, expected_current_readahead_size); + expected_current_readahead_size *= 2; + + iter->Seek(BuildKey(1004)); // Prefetch the block. ASSERT_TRUE(iter->Valid()); ASSERT_EQ(current_readahead_size, expected_current_readahead_size); + expected_current_readahead_size *= 2; - // Missed one sequential block but 1011 is already in buffer so - // readahead will not be reset. + // 1011 is already in cache but won't reset?? iter->Seek(BuildKey(1011)); ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(current_readahead_size, expected_current_readahead_size); // Eligible to Prefetch data (not in buffer) but block is in cache so no // prefetch will happen and will result in decrease in readahead_size. // readahead_size will be 8 * 1024 iter->Seek(BuildKey(1015)); ASSERT_TRUE(iter->Valid()); - expected_current_readahead_size -= decrease_readahead_size; + expected_current_readahead_size = std::max( + decrease_readahead_size, + (expected_current_readahead_size >= decrease_readahead_size + ? (expected_current_readahead_size - decrease_readahead_size) + : 0)); // 1016 is the same block as 1015. So no change in readahead_size. iter->Seek(BuildKey(1016)); @@ -1169,7 +1241,7 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { iter->Seek(BuildKey(1022)); ASSERT_TRUE(iter->Valid()); ASSERT_EQ(current_readahead_size, expected_current_readahead_size); - ASSERT_EQ(buff_prefetch_count, 2); + ASSERT_EQ(buff_prefetch_count, 3); // Check stats to make sure async prefetch is done. 
{ @@ -1179,6 +1251,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { ASSERT_EQ(async_read_bytes.count, 0); } else { ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + //"ASSERT_EQ(expected_hits, get_perf_context()->bloom_sst_hit_count);") } } @@ -1264,7 +1338,7 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) { } SyncPoint::GetInstance()->SetCallBack( - "FilePrefetchBuffer::PrefetchAsync:Start", + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", [&](void*) { buff_prefetch_count++; }); SyncPoint::GetInstance()->SetCallBack( diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 86e5dfea8..cbc7e1c4b 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -1553,7 +1553,8 @@ enum { rocksdb_env_lock_file_nanos, rocksdb_env_unlock_file_nanos, rocksdb_env_new_logger_nanos, - rocksdb_total_metric_count = 68 + rocksdb_number_async_seek, + rocksdb_total_metric_count = 69 }; extern ROCKSDB_LIBRARY_API void rocksdb_set_perf_level(int); diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index f3058416e..a971725a4 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -229,6 +229,8 @@ struct PerfContext { // Time spent in decrypting data. Populated when EncryptedEnv is used. uint64_t decrypt_data_nanos; + uint64_t number_async_seek; + std::map* level_to_perf_context = nullptr; bool per_level_perf_context_enabled = false; }; diff --git a/monitoring/perf_context.cc b/monitoring/perf_context.cc index 493642f82..d252468b2 100644 --- a/monitoring/perf_context.cc +++ b/monitoring/perf_context.cc @@ -114,6 +114,7 @@ PerfContext::PerfContext(const PerfContext& other) { iter_next_cpu_nanos = other.iter_next_cpu_nanos; iter_prev_cpu_nanos = other.iter_prev_cpu_nanos; iter_seek_cpu_nanos = other.iter_seek_cpu_nanos; + number_async_seek = other.number_async_seek; if (per_level_perf_context_enabled && level_to_perf_context != nullptr) { ClearPerLevelPerfContext(); } @@ -212,6 +213,7 @@ PerfContext::PerfContext(PerfContext&& other) noexcept { iter_next_cpu_nanos = other.iter_next_cpu_nanos; iter_prev_cpu_nanos = other.iter_prev_cpu_nanos; iter_seek_cpu_nanos = other.iter_seek_cpu_nanos; + number_async_seek = other.number_async_seek; if (per_level_perf_context_enabled && level_to_perf_context != nullptr) { ClearPerLevelPerfContext(); } @@ -312,6 +314,7 @@ PerfContext& PerfContext::operator=(const PerfContext& other) { iter_next_cpu_nanos = other.iter_next_cpu_nanos; iter_prev_cpu_nanos = other.iter_prev_cpu_nanos; iter_seek_cpu_nanos = other.iter_seek_cpu_nanos; + number_async_seek = other.number_async_seek; if (per_level_perf_context_enabled && level_to_perf_context != nullptr) { ClearPerLevelPerfContext(); } @@ -407,6 +410,7 @@ void PerfContext::Reset() { iter_next_cpu_nanos = 0; iter_prev_cpu_nanos = 0; iter_seek_cpu_nanos = 0; + number_async_seek = 0; if (per_level_perf_context_enabled && level_to_perf_context) { for (auto& kv : *level_to_perf_context) { kv.second.Reset(); @@ -526,6 +530,7 @@ std::string PerfContext::ToString(bool exclude_zero_counters) const { PERF_CONTEXT_OUTPUT(iter_next_cpu_nanos); PERF_CONTEXT_OUTPUT(iter_prev_cpu_nanos); PERF_CONTEXT_OUTPUT(iter_seek_cpu_nanos); + PERF_CONTEXT_OUTPUT(number_async_seek); PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(bloom_filter_useful); PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(bloom_filter_full_positive); PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(bloom_filter_full_true_positive); diff --git a/table/block_based/block_based_table_iterator.cc 
b/table/block_based/block_based_table_iterator.cc index 7daf47204..52734f7ca 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -9,11 +9,21 @@ #include "table/block_based/block_based_table_iterator.h" namespace ROCKSDB_NAMESPACE { -void BlockBasedTableIterator::Seek(const Slice& target) { SeekImpl(&target); } -void BlockBasedTableIterator::SeekToFirst() { SeekImpl(nullptr); } +void BlockBasedTableIterator::SeekToFirst() { SeekImpl(nullptr, false); } + +void BlockBasedTableIterator::Seek(const Slice& target) { + SeekImpl(&target, true); +} + +void BlockBasedTableIterator::SeekImpl(const Slice* target, + bool async_prefetch) { + bool is_first_pass = true; + if (async_read_in_progress_) { + AsyncInitDataBlock(false); + is_first_pass = false; + } -void BlockBasedTableIterator::SeekImpl(const Slice* target) { is_out_of_bound_ = false; is_at_first_key_from_index_ = false; if (target && !CheckPrefixMayMatch(*target, IterDirection::kForward)) { @@ -74,7 +84,20 @@ void BlockBasedTableIterator::SeekImpl(const Slice* target) { } else { // Need to use the data block. if (!same_block) { - InitDataBlock(); + if (read_options_.async_io && async_prefetch) { + if (is_first_pass) { + AsyncInitDataBlock(is_first_pass); + } + if (async_read_in_progress_) { + // Status::TryAgain indicates asynchronous request for retrieval of + // data blocks has been submitted. So it should return at this point + // and Seek should be called again to retrieve the requested block and + // execute the remaining code. + return; + } + } else { + InitDataBlock(); + } } else { // When the user does a reseek, the iterate_upper_bound might have // changed. CheckDataBlockWithinUpperBound() needs to be called @@ -238,14 +261,65 @@ void BlockBasedTableIterator::InitDataBlock() { Status s; table_->NewDataBlockIterator( read_options_, data_block_handle, &block_iter_, BlockType::kData, - /*get_context=*/nullptr, &lookup_context_, s, + /*get_context=*/nullptr, &lookup_context_, block_prefetcher_.prefetch_buffer(), - /*for_compaction=*/is_for_compaction); + /*for_compaction=*/is_for_compaction, /*async_read=*/false, s); block_iter_points_to_real_block_ = true; CheckDataBlockWithinUpperBound(); } } +void BlockBasedTableIterator::AsyncInitDataBlock(bool is_first_pass) { + BlockHandle data_block_handle = index_iter_->value().handle; + bool is_for_compaction = + lookup_context_.caller == TableReaderCaller::kCompaction; + if (is_first_pass) { + if (!block_iter_points_to_real_block_ || + data_block_handle.offset() != prev_block_offset_ || + // if previous attempt of reading the block missed cache, try again + block_iter_.status().IsIncomplete()) { + if (block_iter_points_to_real_block_) { + ResetDataIter(); + } + auto* rep = table_->get_rep(); + // Prefetch additional data for range scans (iterators). + // Implicit auto readahead: + // Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0. + // Explicit user requested readahead: + // Enabled from the very first IO when ReadOptions.readahead_size is + // set. 
+ block_prefetcher_.PrefetchIfNeeded( + rep, data_block_handle, read_options_.readahead_size, + is_for_compaction, read_options_.async_io, + read_options_.rate_limiter_priority); + + Status s; + table_->NewDataBlockIterator( + read_options_, data_block_handle, &block_iter_, BlockType::kData, + /*get_context=*/nullptr, &lookup_context_, + block_prefetcher_.prefetch_buffer(), + /*for_compaction=*/is_for_compaction, /*async_read=*/true, s); + + if (s.IsTryAgain()) { + async_read_in_progress_ = true; + return; + } + } + } else { + // Second pass will call the Poll to get the data block which has been + // requested asynchronously. + Status s; + table_->NewDataBlockIterator( + read_options_, data_block_handle, &block_iter_, BlockType::kData, + /*get_context=*/nullptr, &lookup_context_, + block_prefetcher_.prefetch_buffer(), + /*for_compaction=*/is_for_compaction, /*async_read=*/false, s); + } + block_iter_points_to_real_block_ = true; + CheckDataBlockWithinUpperBound(); + async_read_in_progress_ = false; +} + bool BlockBasedTableIterator::MaterializeCurrentBlock() { assert(is_at_first_key_from_index_); assert(!block_iter_points_to_real_block_); diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index fe659d9d0..9ba8ecdcc 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -41,7 +41,8 @@ class BlockBasedTableIterator : public InternalIteratorBase { allow_unprepared_value_(allow_unprepared_value), block_iter_points_to_real_block_(false), check_filter_(check_filter), - need_upper_bound_check_(need_upper_bound_check) {} + need_upper_bound_check_(need_upper_bound_check), + async_read_in_progress_(false) {} ~BlockBasedTableIterator() {} @@ -96,6 +97,8 @@ class BlockBasedTableIterator : public InternalIteratorBase { return index_iter_->status(); } else if (block_iter_points_to_real_block_) { return block_iter_.status(); + } else if (async_read_in_progress_) { + return Status::TryAgain(); } else { return Status::OK(); } @@ -236,10 +239,13 @@ class BlockBasedTableIterator : public InternalIteratorBase { // TODO(Zhongyi): pick a better name bool need_upper_bound_check_; + bool async_read_in_progress_; + // If `target` is null, seek to first. 
- void SeekImpl(const Slice* target); + void SeekImpl(const Slice* target, bool async_prefetch); void InitDataBlock(); + void AsyncInitDataBlock(bool is_first_pass); bool MaterializeCurrentBlock(); void FindKeyForward(); void FindBlockForward(); diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 50ad9c245..c40223151 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -125,7 +125,7 @@ Status ReadBlockFromFile( const UncompressionDict& uncompression_dict, const PersistentCacheOptions& cache_options, size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator, bool for_compaction, bool using_zstd, - const FilterPolicy* filter_policy) { + const FilterPolicy* filter_policy, bool async_read) { assert(result); BlockContents contents; @@ -133,7 +133,17 @@ Status ReadBlockFromFile( file, prefetch_buffer, footer, options, handle, &contents, ioptions, do_uncompress, maybe_compressed, block_type, uncompression_dict, cache_options, memory_allocator, nullptr, for_compaction); - Status s = block_fetcher.ReadBlockContents(); + Status s; + // If prefetch_buffer is not allocated, it will fallback to synchronous + // reading of block contents. + if (async_read && prefetch_buffer != nullptr) { + s = block_fetcher.ReadAsyncBlockContents(); + if (!s.ok()) { + return s; + } + } else { + s = block_fetcher.ReadBlockContents(); + } if (s.ok()) { result->reset(BlocklikeTraits::Create( std::move(contents), read_amp_bytes_per_bit, ioptions.stats, using_zstd, @@ -904,10 +914,12 @@ Status BlockBasedTable::ReadRangeDelBlock( "Error when seeking to range delete tombstones block from file: %s", s.ToString().c_str()); } else if (!range_del_handle.IsNull()) { + Status tmp_status; std::unique_ptr iter(NewDataBlockIterator( read_options, range_del_handle, /*input_iter=*/nullptr, BlockType::kRangeDeletion, - /*get_context=*/nullptr, lookup_context, Status(), prefetch_buffer)); + /*get_context=*/nullptr, lookup_context, prefetch_buffer, + /*for_compaction= */ false, /*async_read= */ false, tmp_status)); assert(iter != nullptr); s = iter->status(); if (!s.ok()) { @@ -1177,7 +1189,7 @@ Status BlockBasedTable::ReadMetaIndexBlock( UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options, 0 /* read_amp_bytes_per_bit */, GetMemoryAllocator(rep_->table_options), false /* for_compaction */, rep_->blocks_definitely_zstd_compressed, - nullptr /* filter_policy */); + nullptr /* filter_policy */, false /* async_read */); if (!s.ok()) { ROCKS_LOG_ERROR(rep_->ioptions.logger, @@ -1508,7 +1520,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( const bool wait, const bool for_compaction, CachableEntry* block_entry, BlockType block_type, GetContext* get_context, BlockCacheLookupContext* lookup_context, - BlockContents* contents) const { + BlockContents* contents, bool async_read) const { assert(block_entry != nullptr); const bool no_io = (ro.read_tier == kBlockCacheTier); Cache* block_cache = rep_->table_options.block_cache.get(); @@ -1571,7 +1583,18 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( rep_->persistent_cache_options, GetMemoryAllocator(rep_->table_options), GetMemoryAllocatorForCompressedBlock(rep_->table_options)); - s = block_fetcher.ReadBlockContents(); + + // If prefetch_buffer is not allocated, it will fallback to synchronous + // reading of block contents. 
+ if (async_read && prefetch_buffer != nullptr) { + s = block_fetcher.ReadAsyncBlockContents(); + if (!s.ok()) { + return s; + } + } else { + s = block_fetcher.ReadBlockContents(); + } + raw_block_comp_type = block_fetcher.get_compression_type(); contents = &raw_block_contents; if (get_context) { @@ -1678,7 +1701,8 @@ Status BlockBasedTable::RetrieveBlock( const BlockHandle& handle, const UncompressionDict& uncompression_dict, CachableEntry* block_entry, BlockType block_type, GetContext* get_context, BlockCacheLookupContext* lookup_context, - bool for_compaction, bool use_cache, bool wait_for_cache) const { + bool for_compaction, bool use_cache, bool wait_for_cache, + bool async_read) const { assert(block_entry); assert(block_entry->IsEmpty()); @@ -1687,7 +1711,7 @@ Status BlockBasedTable::RetrieveBlock( s = MaybeReadBlockAndLoadToCache( prefetch_buffer, ro, handle, uncompression_dict, wait_for_cache, for_compaction, block_entry, block_type, get_context, lookup_context, - /*contents=*/nullptr); + /*contents=*/nullptr, async_read); if (!s.ok()) { return s; @@ -1727,7 +1751,7 @@ Status BlockBasedTable::RetrieveBlock( : 0, GetMemoryAllocator(rep_->table_options), for_compaction, rep_->blocks_definitely_zstd_compressed, - rep_->table_options.filter_policy.get()); + rep_->table_options.filter_policy.get(), async_read); if (get_context) { switch (block_type) { @@ -1763,28 +1787,32 @@ template Status BlockBasedTable::RetrieveBlock( const BlockHandle& handle, const UncompressionDict& uncompression_dict, CachableEntry* block_entry, BlockType block_type, GetContext* get_context, BlockCacheLookupContext* lookup_context, - bool for_compaction, bool use_cache, bool wait_for_cache) const; + bool for_compaction, bool use_cache, bool wait_for_cache, + bool async_read) const; template Status BlockBasedTable::RetrieveBlock( FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro, const BlockHandle& handle, const UncompressionDict& uncompression_dict, CachableEntry* block_entry, BlockType block_type, GetContext* get_context, BlockCacheLookupContext* lookup_context, - bool for_compaction, bool use_cache, bool wait_for_cache) const; + bool for_compaction, bool use_cache, bool wait_for_cache, + bool async_read) const; template Status BlockBasedTable::RetrieveBlock( FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro, const BlockHandle& handle, const UncompressionDict& uncompression_dict, CachableEntry* block_entry, BlockType block_type, GetContext* get_context, BlockCacheLookupContext* lookup_context, - bool for_compaction, bool use_cache, bool wait_for_cache) const; + bool for_compaction, bool use_cache, bool wait_for_cache, + bool async_read) const; template Status BlockBasedTable::RetrieveBlock( FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro, const BlockHandle& handle, const UncompressionDict& uncompression_dict, CachableEntry* block_entry, BlockType block_type, GetContext* get_context, BlockCacheLookupContext* lookup_context, - bool for_compaction, bool use_cache, bool wait_for_cache) const; + bool for_compaction, bool use_cache, bool wait_for_cache, + bool async_read) const; BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState( const BlockBasedTable* table, @@ -2153,10 +2181,11 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, bool does_referenced_key_exist = false; DataBlockIter biter; uint64_t referenced_data_size = 0; + Status tmp_status; NewDataBlockIterator( read_options, v.handle, &biter, BlockType::kData, get_context, - 
&lookup_data_block_context, - /*s=*/Status(), /*prefetch_buffer*/ nullptr); + &lookup_data_block_context, /*prefetch_buffer=*/nullptr, + /*for_compaction=*/false, /*async_read=*/false, tmp_status); if (no_io && biter.status().IsIncomplete()) { // couldn't get block from block_cache @@ -2297,11 +2326,12 @@ Status BlockBasedTable::Prefetch(const Slice* const begin, // Load the block specified by the block_handle into the block cache DataBlockIter biter; - + Status tmp_status; NewDataBlockIterator( ReadOptions(), block_handle, &biter, /*type=*/BlockType::kData, - /*get_context=*/nullptr, &lookup_context, Status(), - /*prefetch_buffer=*/nullptr); + /*get_context=*/nullptr, &lookup_context, + /*prefetch_buffer=*/nullptr, /*for_compaction=*/false, + /*async_read=*/false, tmp_status); if (!biter.status().ok()) { // there was an unexpected error while pre-fetching @@ -2696,11 +2726,13 @@ Status BlockBasedTable::GetKVPairsFromDataBlocks( } std::unique_ptr datablock_iter; + Status tmp_status; datablock_iter.reset(NewDataBlockIterator( ReadOptions(), blockhandles_iter->value().handle, /*input_iter=*/nullptr, /*type=*/BlockType::kData, - /*get_context=*/nullptr, /*lookup_context=*/nullptr, Status(), - /*prefetch_buffer=*/nullptr)); + /*get_context=*/nullptr, /*lookup_context=*/nullptr, + /*prefetch_buffer=*/nullptr, /*for_compaction=*/false, + /*async_read=*/false, tmp_status)); s = datablock_iter->status(); if (!s.ok()) { @@ -2927,11 +2959,13 @@ Status BlockBasedTable::DumpDataBlocks(std::ostream& out_stream) { out_stream << "--------------------------------------\n"; std::unique_ptr datablock_iter; + Status tmp_status; datablock_iter.reset(NewDataBlockIterator( ReadOptions(), blockhandles_iter->value().handle, /*input_iter=*/nullptr, /*type=*/BlockType::kData, - /*get_context=*/nullptr, /*lookup_context=*/nullptr, Status(), - /*prefetch_buffer=*/nullptr)); + /*get_context=*/nullptr, /*lookup_context=*/nullptr, + /*prefetch_buffer=*/nullptr, /*for_compaction=*/false, + /*async_read=*/false, tmp_status)); s = datablock_iter->status(); if (!s.ok()) { diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 385130d29..82c780386 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -275,11 +275,14 @@ class BlockBasedTable : public TableReader { // input_iter: if it is not null, update this one and return it as Iterator template - TBlockIter* NewDataBlockIterator( - const ReadOptions& ro, const BlockHandle& block_handle, - TBlockIter* input_iter, BlockType block_type, GetContext* get_context, - BlockCacheLookupContext* lookup_context, Status s, - FilePrefetchBuffer* prefetch_buffer, bool for_compaction = false) const; + TBlockIter* NewDataBlockIterator(const ReadOptions& ro, + const BlockHandle& block_handle, + TBlockIter* input_iter, BlockType block_type, + GetContext* get_context, + BlockCacheLookupContext* lookup_context, + FilePrefetchBuffer* prefetch_buffer, + bool for_compaction, bool async_read, + Status& s) const; // input_iter: if it is not null, update this one and return it as Iterator template @@ -353,7 +356,7 @@ class BlockBasedTable : public TableReader { const bool wait, const bool for_compaction, CachableEntry* block_entry, BlockType block_type, GetContext* get_context, BlockCacheLookupContext* lookup_context, - BlockContents* contents) const; + BlockContents* contents, bool async_read) const; // Similar to the above, with one crucial difference: it will retrieve the // block 
from the file even if there are no caches configured (assuming the @@ -365,8 +368,8 @@ class BlockBasedTable : public TableReader { CachableEntry* block_entry, BlockType block_type, GetContext* get_context, BlockCacheLookupContext* lookup_context, - bool for_compaction, bool use_cache, - bool wait_for_cache) const; + bool for_compaction, bool use_cache, bool wait_for_cache, + bool async_read) const; DECLARE_SYNC_AND_ASYNC_CONST( void, RetrieveMultipleBlocks, const ReadOptions& options, diff --git a/table/block_based/block_based_table_reader_impl.h b/table/block_based/block_based_table_reader_impl.h index 6bb384e52..034dd78e0 100644 --- a/table/block_based/block_based_table_reader_impl.h +++ b/table/block_based/block_based_table_reader_impl.h @@ -25,8 +25,9 @@ template TBlockIter* BlockBasedTable::NewDataBlockIterator( const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter, BlockType block_type, GetContext* get_context, - BlockCacheLookupContext* lookup_context, Status s, - FilePrefetchBuffer* prefetch_buffer, bool for_compaction) const { + BlockCacheLookupContext* lookup_context, + FilePrefetchBuffer* prefetch_buffer, bool for_compaction, bool async_read, + Status& s) const { PERF_TIMER_GUARD(new_table_block_iter_nanos); TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter; @@ -54,7 +55,12 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator( CachableEntry block; s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type, get_context, lookup_context, for_compaction, - /* use_cache */ true, /* wait_for_cache */ true); + /* use_cache */ true, /* wait_for_cache */ true, + async_read); + + if (s.IsTryAgain() && async_read) { + return iter; + } if (!s.ok()) { assert(block.IsEmpty()); diff --git a/table/block_based/block_based_table_reader_sync_and_async.h b/table/block_based/block_based_table_reader_sync_and_async.h index 2ab9882fc..ab42220c3 100644 --- a/table/block_based/block_based_table_reader_sync_and_async.h +++ b/table/block_based/block_based_table_reader_sync_and_async.h @@ -56,7 +56,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks) &(*results)[idx_in_batch], BlockType::kData, mget_iter->get_context, &lookup_data_block_context, /* for_compaction */ false, /* use_cache */ true, - /* wait_for_cache */ true); + /* wait_for_cache */ true, /* async_read */ false); } CO_RETURN; } @@ -272,7 +272,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks) nullptr, options, handle, uncompression_dict, /*wait=*/true, /*for_compaction=*/false, block_entry, BlockType::kData, mget_iter->get_context, &lookup_data_block_context, - &raw_block_contents); + &raw_block_contents, /*async_read=*/false); // block_entry value could be null if no block cache is present, i.e // BlockBasedTableOptions::no_block_cache is true and no compressed @@ -445,7 +445,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet) nullptr, ro, handle, dict, &(results.back()), BlockType::kData, miter->get_context, &lookup_data_block_context, /* for_compaction */ false, /* use_cache */ true, - /* wait_for_cache */ false); + /* wait_for_cache */ false, /* async_read */ false); if (s.IsIncomplete()) { s = Status::OK(); } @@ -589,10 +589,12 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet) } next_biter.Invalidate(Status::OK()); + Status tmp_s; NewDataBlockIterator( read_options, iiter->value().handle, &next_biter, BlockType::kData, get_context, &lookup_data_block_context, - Status(), nullptr); + /* prefetch_buffer= */ nullptr, /* 
for_compaction = */ false, + /*async_read = */ false, tmp_s); biter = &next_biter; reusing_prev_block = false; later_reused = false; diff --git a/table/block_based/block_prefetcher.cc b/table/block_based/block_prefetcher.cc index f7c7b94fe..1f08c161b 100644 --- a/table/block_based/block_prefetcher.cc +++ b/table/block_based/block_prefetcher.cc @@ -39,6 +39,14 @@ void BlockPrefetcher::PrefetchIfNeeded( return; } + // In case of async_io, it always creates the PrefetchBuffer. + if (async_io) { + rep->CreateFilePrefetchBufferIfNotExists( + initial_auto_readahead_size_, max_auto_readahead_size, + &prefetch_buffer_, /*implicit_auto_readahead=*/true, async_io); + return; + } + size_t len = BlockBasedTable::BlockSizeWithTrailer(handle); size_t offset = handle.offset(); diff --git a/table/block_based/filter_block_reader_common.cc b/table/block_based/filter_block_reader_common.cc index 5795e6017..a81a944e8 100644 --- a/table/block_based/filter_block_reader_common.cc +++ b/table/block_based/filter_block_reader_common.cc @@ -31,7 +31,7 @@ Status FilterBlockReaderCommon::ReadFilterBlock( UncompressionDict::GetEmptyDict(), filter_block, BlockType::kFilter, get_context, lookup_context, /* for_compaction */ false, use_cache, - /* wait_for_cache */ true); + /* wait_for_cache */ true, /* async_read */ false); return s; } diff --git a/table/block_based/index_reader_common.cc b/table/block_based/index_reader_common.cc index 58fdfe4b6..6584586c9 100644 --- a/table/block_based/index_reader_common.cc +++ b/table/block_based/index_reader_common.cc @@ -27,7 +27,7 @@ Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock( prefetch_buffer, read_options, rep->footer.index_handle(), UncompressionDict::GetEmptyDict(), index_block, BlockType::kIndex, get_context, lookup_context, /* for_compaction */ false, use_cache, - /* wait_for_cache */ true); + /* wait_for_cache */ true, /* async_read */ false); return s; } diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc index d11f1620d..c145d956c 100644 --- a/table/block_based/partitioned_filter_block.cc +++ b/table/block_based/partitioned_filter_block.cc @@ -323,7 +323,7 @@ Status PartitionedFilterBlockReader::GetFilterPartitionBlock( UncompressionDict::GetEmptyDict(), filter_block, BlockType::kFilter, get_context, lookup_context, /* for_compaction */ false, /* use_cache */ true, - /* wait_for_cache */ true); + /* wait_for_cache */ true, /* async_read */ false); return s; } @@ -521,7 +521,8 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro, s = table()->MaybeReadBlockAndLoadToCache( prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(), /* wait */ true, /* for_compaction */ false, &block, BlockType::kFilter, - nullptr /* get_context */, &lookup_context, nullptr /* contents */); + nullptr /* get_context */, &lookup_context, nullptr /* contents */, + false); if (!s.ok()) { return s; } diff --git a/table/block_based/partitioned_index_iterator.cc b/table/block_based/partitioned_index_iterator.cc index 3e93787a9..9e8c06268 100644 --- a/table/block_based/partitioned_index_iterator.cc +++ b/table/block_based/partitioned_index_iterator.cc @@ -97,9 +97,9 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() { table_->NewDataBlockIterator( read_options_, partitioned_index_handle, &block_iter_, BlockType::kIndex, - /*get_context=*/nullptr, &lookup_context_, s, + /*get_context=*/nullptr, &lookup_context_, block_prefetcher_.prefetch_buffer(), - 
/*for_compaction=*/is_for_compaction); + /*for_compaction=*/is_for_compaction, /*async_read=*/false, s); block_iter_points_to_real_block_ = true; // We could check upper bound here but it is complicated to reason about // upper bound in index iterator. On the other than, in large scans, index diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc index 13f732ab7..14e3f7484 100644 --- a/table/block_based/partitioned_index_reader.cc +++ b/table/block_based/partitioned_index_reader.cc @@ -187,7 +187,8 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro, Status s = table()->MaybeReadBlockAndLoadToCache( prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(), /*wait=*/true, /*for_compaction=*/false, &block, BlockType::kIndex, - /*get_context=*/nullptr, &lookup_context, /*contents=*/nullptr); + /*get_context=*/nullptr, &lookup_context, /*contents=*/nullptr, + /*async_read=*/false); if (!s.ok()) { return s; diff --git a/table/block_based/uncompression_dict_reader.cc b/table/block_based/uncompression_dict_reader.cc index 756422af0..dc9a47ec7 100644 --- a/table/block_based/uncompression_dict_reader.cc +++ b/table/block_based/uncompression_dict_reader.cc @@ -62,7 +62,8 @@ Status UncompressionDictReader::ReadUncompressionDictionary( prefetch_buffer, read_options, rep->compression_dict_handle, UncompressionDict::GetEmptyDict(), uncompression_dict, BlockType::kCompressionDictionary, get_context, lookup_context, - /* for_compaction */ false, use_cache, /* wait_for_cache */ true); + /* for_compaction */ false, use_cache, /* wait_for_cache */ true, + /* async_read */ false); if (!s.ok()) { ROCKS_LOG_WARN( diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index d11b25c44..974075b69 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -341,4 +341,57 @@ IOStatus BlockFetcher::ReadBlockContents() { return io_status_; } +IOStatus BlockFetcher::ReadAsyncBlockContents() { + if (TryGetUncompressBlockFromPersistentCache()) { + compression_type_ = kNoCompression; +#ifndef NDEBUG + contents_->is_raw_block = true; +#endif // NDEBUG + return IOStatus::OK(); + } else if (!TryGetCompressedBlockFromPersistentCache()) { + if (prefetch_buffer_ != nullptr) { + IOOptions opts; + IOStatus io_s = file_->PrepareIOOptions(read_options_, opts); + if (io_s.ok()) { + io_s = status_to_io_status(prefetch_buffer_->PrefetchAsync( + opts, file_, handle_.offset(), block_size_with_trailer_, + read_options_.rate_limiter_priority, &slice_)); + if (io_s.IsTryAgain()) { + return io_s; + } + if (!io_s.ok()) { + // Fallback to sequential reading of data blocks. + return ReadBlockContents(); + } + // Data Block is already in prefetch. 
+ got_from_prefetch_buffer_ = true; + ProcessTrailerIfPresent(); + if (!io_status_.ok()) { + return io_status_; + } + used_buf_ = const_cast(slice_.data()); + + if (do_uncompress_ && compression_type_ != kNoCompression) { + PERF_TIMER_GUARD(block_decompress_time); + // compressed page, uncompress, update cache + UncompressionContext context(compression_type_); + UncompressionInfo info(context, uncompression_dict_, + compression_type_); + io_status_ = status_to_io_status(UncompressBlockContents( + info, slice_.data(), block_size_, contents_, + footer_.format_version(), ioptions_, memory_allocator_)); +#ifndef NDEBUG + num_heap_buf_memcpy_++; +#endif + compression_type_ = kNoCompression; + } else { + GetBlockContents(); + } + InsertUncompressedBlockToPersistentCacheIfNeeded(); + } + } + } + return io_status_; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_fetcher.h b/table/block_fetcher.h index 355cb53d0..2111186c4 100644 --- a/table/block_fetcher.h +++ b/table/block_fetcher.h @@ -71,6 +71,8 @@ class BlockFetcher { } IOStatus ReadBlockContents(); + IOStatus ReadAsyncBlockContents(); + inline CompressionType get_compression_type() const { return compression_type_; } diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc index a221a5b25..10dda3c66 100644 --- a/table/merging_iterator.cc +++ b/table/merging_iterator.cc @@ -112,6 +112,14 @@ class MergingIterator : public InternalIterator { } PERF_COUNTER_ADD(seek_child_seek_count, 1); + + // child.status() is set to Status::TryAgain indicating asynchronous + // request for retrieval of data blocks has been submitted. So it should + // return at this point and Seek should be called again to retrieve the + // requested block and add the child to min heap. + if (child.status() == Status::TryAgain()) { + continue; + } { // Strictly, we timed slightly more than min heap operation, // but these operations are very cheap. @@ -119,6 +127,18 @@ class MergingIterator : public InternalIterator { AddToMinHeapOrCheckStatus(&child); } } + + for (auto& child : children_) { + if (child.status() == Status::TryAgain()) { + child.Seek(target); + { + PERF_TIMER_GUARD(seek_min_heap_time); + AddToMinHeapOrCheckStatus(&child); + } + PERF_COUNTER_ADD(number_async_seek, 1); + } + } + direction_ = kForward; { PERF_TIMER_GUARD(seek_min_heap_time); @@ -359,6 +379,13 @@ void MergingIterator::SwitchToForward() { for (auto& child : children_) { if (&child != current_) { child.Seek(target); + // child.status() is set to Status::TryAgain indicating asynchronous + // request for retrieval of data blocks has been submitted. So it should + // return at this point and Seek should be called again to retrieve the + // requested block and add the child to min heap. + if (child.status() == Status::TryAgain()) { + continue; + } if (child.Valid() && comparator_->Equal(target, child.key())) { assert(child.status().ok()); child.Next(); @@ -366,6 +393,18 @@ void MergingIterator::SwitchToForward() { } AddToMinHeapOrCheckStatus(&child); } + + for (auto& child : children_) { + if (child.status() == Status::TryAgain()) { + child.Seek(target); + if (child.Valid() && comparator_->Equal(target, child.key())) { + assert(child.status().ok()); + child.Next(); + } + AddToMinHeapOrCheckStatus(&child); + } + } + direction_ = kForward; }
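
Usage sketch (illustrative, not part of the patch): the snippet below shows how the new async Seek path and the number_async_seek PerfContext counter added above could be exercised from application code. It assumes a build that includes this patch, an existing database with some data, and a FileSystem whose ReadAsync is supported (e.g. the POSIX one exercised by ReadAsyncWithPosixFS); the database path, key, loop bound, and readahead value are arbitrary placeholders.

// async_seek_example.cc -- a minimal sketch, assuming the patch above is applied.
#include <iostream>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/perf_level.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  // Hypothetical path used only for this sketch.
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/async_seek_example", &db);
  if (!s.ok()) {
    std::cerr << s.ToString() << std::endl;
    return 1;
  }

  // Enable PerfContext counters so number_async_seek gets populated.
  rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableCount);
  rocksdb::get_perf_context()->Reset();

  rocksdb::ReadOptions ro;
  ro.async_io = true;            // opt in to asynchronous prefetching in Seek
  ro.adaptive_readahead = true;  // optional; matches the tests in this patch
  ro.readahead_size = 16 << 10;  // arbitrary explicit readahead for the sketch

  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
  it->Seek("some_key");  // may submit async block reads and re-seek internally
  for (int i = 0; i < 100 && it->Valid(); ++i) {
    it->Next();
  }

  // number_async_seek (added in this patch) counts async requests made by Seek.
  std::cout << "number_async_seek = "
            << rocksdb::get_perf_context()->number_async_seek << std::endl;

  it.reset();  // release the iterator before closing the DB
  delete db;
  return 0;
}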