From 35cdd3e71e0627af2b0306e4322efd134906a74e Mon Sep 17 00:00:00 2001 From: anand76 Date: Fri, 19 Aug 2022 16:52:52 -0700 Subject: [PATCH] MultiGet async IO across multiple levels (#10535) Summary: This PR exploits parallelism in MultiGet across levels. It applies only to the coroutine version of MultiGet. Previously, MultiGet file reads from SST files in the same level were parallelized. With this PR, MultiGet batches with keys distributed across multiple levels are read in parallel. This is accomplished by splitting the keys not present in a level (determined by bloom filtering) into a separate batch, and processing the new batch in parallel with the original batch. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10535 Test Plan: 1. Ensure existing MultiGet unit tests pass, updating them as necessary 2. New unit tests - TODO 3. Run stress test - TODO No noticeable regression (<1%) without async IO - Without PR: `multireadrandom : 7.261 micros/op 1101724 ops/sec 60.007 seconds 66110936 operations; 571.6 MB/s (8168992 of 8168992 found)` With PR: `multireadrandom : 7.305 micros/op 1095167 ops/sec 60.007 seconds 65717936 operations; 568.2 MB/s (8271992 of 8271992 found)` For a fully cached DB, but with async IO option on, no regression observed (<1%) - Without PR: `multireadrandom : 5.201 micros/op 1538027 ops/sec 60.005 seconds 92288936 operations; 797.9 MB/s (11540992 of 11540992 found) ` With PR: `multireadrandom : 5.249 micros/op 1524097 ops/sec 60.005 seconds 91452936 operations; 790.7 MB/s (11649992 of 11649992 found) ` Reviewed By: akankshamahajan15 Differential Revision: D38774009 Pulled By: anand1976 fbshipit-source-id: c955e259749f1c091590ade73105b3ee46cd0007 --- HISTORY.md | 2 + db/db_basic_test.cc | 49 ++- db/version_set.cc | 775 +++++++++++++++++++++++++------------- db/version_set.h | 28 ++ include/rocksdb/options.h | 11 + options/options.cc | 6 +- table/multiget_context.h | 74 +++- tools/db_bench_tool.cc | 5 + 8 files changed, 671 insertions(+), 279 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index f4e11c68c..234d3da1f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -14,6 +14,7 @@ * Removed Customizable support for RateLimiter and removed its CreateFromString() and Type() functions. * `CompactRangeOptions::exclusive_manual_compaction` is now false by default. This ensures RocksDB does not introduce artificial parallelism limitations by default. * Tiered Storage: change `bottommost_temperture` to `last_level_temperture`. The old option name is kept only for migration, please use the new option. The behavior is changed to apply temperature for the `last_level` SST files only. +* Added a new experimental ReadOption flag called optimize_multiget_for_io, which when set attempts to reduce MultiGet latency by spawning coroutines for keys in multiple levels. ### Bug Fixes * Fix a bug starting in 7.4.0 in which some fsync operations might be skipped in a DB after any DropColumnFamily on that DB, until it is re-opened. This can lead to data loss on power loss. (For custom FileSystem implementations, this could lead to `FSDirectory::Fsync` or `FSDirectory::Close` after the first `FSDirectory::Close`; Also, valgrind could report call to `close()` with `fd=-1`.) @@ -42,6 +43,7 @@ ### Performance Improvements * Instead of constructing `FragmentedRangeTombstoneList` during every read operation, it is now constructed once and stored in immutable memtables. This improves speed of querying range tombstones from immutable memtables. * When using iterators with the integrated BlobDB implementation, blob cache handles are now released immediately when the iterator's position changes. +* MultiGet can now do more IO in parallel by reading data blocks from SST files in multiple levels, if the optimize_multiget_for_io ReadOption flag is set. ## Behavior Change * Block cache keys have changed, which will cause any persistent caches to miss between versions. diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index efb864c73..9f866a91a 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -2013,12 +2013,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSize) { } TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeMultiLevelMerge) { -#ifndef USE_COROUTINES if (std::get<1>(GetParam())) { - ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + ROCKSDB_GTEST_SKIP("This test needs to be fixed for async IO"); return; } -#endif // USE_COROUTINES // Skip for unbatched MultiGet if (!std::get<0>(GetParam())) { ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet"); @@ -2131,7 +2129,8 @@ INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam, testing::Combine(testing::Bool(), testing::Bool())); #if USE_COROUTINES -class DBMultiGetAsyncIOTest : public DBBasicTest { +class DBMultiGetAsyncIOTest : public DBBasicTest, + public ::testing::WithParamInterface { public: DBMultiGetAsyncIOTest() : DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) { @@ -2210,7 +2209,7 @@ class DBMultiGetAsyncIOTest : public DBBasicTest { std::shared_ptr statistics_; }; -TEST_F(DBMultiGetAsyncIOTest, GetFromL0) { +TEST_P(DBMultiGetAsyncIOTest, GetFromL0) { // All 3 keys in L0. The L0 files should be read serially. std::vector key_strs{Key(0), Key(40), Key(80)}; std::vector keys{key_strs[0], key_strs[1], key_strs[2]}; @@ -2219,6 +2218,7 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL0) { ReadOptions ro; ro.async_io = true; + ro.optimize_multiget_for_io = GetParam(); dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data()); ASSERT_EQ(values.size(), 3); @@ -2233,13 +2233,17 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL0) { statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); - // No async IO in this case since we don't do parallel lookup in L0 - ASSERT_EQ(multiget_io_batch_size.count, 0); - ASSERT_EQ(multiget_io_batch_size.max, 0); - ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0); + // With async IO, lookups will happen in parallel for each key + if (GetParam()) { + ASSERT_EQ(multiget_io_batch_size.count, 1); + ASSERT_EQ(multiget_io_batch_size.max, 3); + ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3); + } else { + ASSERT_EQ(multiget_io_batch_size.count, 0); + } } -TEST_F(DBMultiGetAsyncIOTest, GetFromL1) { +TEST_P(DBMultiGetAsyncIOTest, GetFromL1) { std::vector key_strs; std::vector keys; std::vector values; @@ -2256,6 +2260,7 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL1) { ReadOptions ro; ro.async_io = true; + ro.optimize_multiget_for_io = GetParam(); dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data()); ASSERT_EQ(values.size(), 3); @@ -2276,7 +2281,7 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL1) { ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3); } -TEST_F(DBMultiGetAsyncIOTest, LastKeyInFile) { +TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) { std::vector key_strs; std::vector keys; std::vector values; @@ -2294,6 +2299,7 @@ TEST_F(DBMultiGetAsyncIOTest, LastKeyInFile) { ReadOptions ro; ro.async_io = true; + ro.optimize_multiget_for_io = GetParam(); dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data()); ASSERT_EQ(values.size(), 3); @@ -2316,7 +2322,7 @@ TEST_F(DBMultiGetAsyncIOTest, LastKeyInFile) { ASSERT_EQ(multiget_io_batch_size.max, 2); } -TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2) { +TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) { std::vector key_strs; std::vector keys; std::vector values; @@ -2334,6 +2340,7 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2) { ReadOptions ro; ro.async_io = true; + ro.optimize_multiget_for_io = GetParam(); dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data()); ASSERT_EQ(values.size(), 3); @@ -2348,13 +2355,13 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2) { statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); - // There is only one MultiGet key in the bottommost level - 56. Thus - // the bottommost level will not use async IO. + // There are 2 keys in L1 in twp separate files, and 1 in L2. With + // async IO, all three lookups will happen in parallel ASSERT_EQ(multiget_io_batch_size.count, 1); - ASSERT_EQ(multiget_io_batch_size.max, 2); + ASSERT_EQ(multiget_io_batch_size.max, GetParam() ? 3 : 2); } -TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) { +TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) { std::vector key_strs; std::vector keys; std::vector values; @@ -2370,6 +2377,7 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) { ReadOptions ro; ro.async_io = true; + ro.optimize_multiget_for_io = GetParam(); dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data()); ASSERT_EQ(values.size(), 2); @@ -2382,7 +2390,7 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) { ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2); } -TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) { +TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) { std::vector key_strs; std::vector keys; std::vector values; @@ -2398,6 +2406,7 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) { ReadOptions ro; ro.async_io = true; + ro.optimize_multiget_for_io = GetParam(); dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data()); ASSERT_EQ(values.size(), 2); @@ -2408,7 +2417,7 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) { ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2); } -TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) { +TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) { std::vector key_strs; std::vector keys; std::vector values; @@ -2426,6 +2435,7 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) { ReadOptions ro; ro.async_io = true; + ro.optimize_multiget_for_io = GetParam(); dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data()); ASSERT_EQ(values.size(), keys.size()); @@ -2437,6 +2447,9 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) { // Bloom filters in L0/L1 will avoid the coroutine calls in those levels ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3); } + +INSTANTIATE_TEST_CASE_P(DBMultiGetAsyncIOTest, DBMultiGetAsyncIOTest, + testing::Bool()); #endif // USE_COROUTINES TEST_F(DBBasicTest, MultiGetStats) { diff --git a/db/version_set.cc b/db/version_set.cc index 0c459a56b..a3f37c85b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -347,6 +347,7 @@ class FilePicker { return false; } }; +} // anonymous namespace class FilePickerMultiGet { private: @@ -362,20 +363,21 @@ class FilePickerMultiGet { curr_level_(static_cast(-1)), returned_file_level_(static_cast(-1)), hit_file_level_(static_cast(-1)), - range_(range), - batch_iter_(range->begin()), - batch_iter_prev_(range->begin()), - upper_key_(range->begin()), + range_(*range, range->begin(), range->end()), maybe_repeat_key_(false), current_level_range_(*range, range->begin(), range->end()), current_file_range_(*range, range->begin(), range->end()), + batch_iter_(range->begin()), + batch_iter_prev_(range->begin()), + upper_key_(range->begin()), level_files_brief_(file_levels), is_hit_file_last_in_level_(false), curr_file_level_(nullptr), file_indexer_(file_indexer), user_comparator_(user_comparator), - internal_comparator_(internal_comparator) { - for (auto iter = range_->begin(); iter != range_->end(); ++iter) { + internal_comparator_(internal_comparator), + hit_file_(nullptr) { + for (auto iter = range_.begin(); iter != range_.end(); ++iter) { fp_ctx_array_[iter.index()] = FilePickerContext(0, FileIndexer::kLevelMaxIndex); } @@ -391,7 +393,7 @@ class FilePickerMultiGet { for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) { auto* r = (*level_files_brief_)[0].files[i].fd.table_reader; if (r) { - for (auto iter = range_->begin(); iter != range_->end(); ++iter) { + for (auto iter = range_.begin(); iter != range_.end(); ++iter) { r->Prepare(iter->ikey); } } @@ -399,8 +401,186 @@ class FilePickerMultiGet { } } + FilePickerMultiGet(MultiGetRange* range, const FilePickerMultiGet& other) + : num_levels_(other.num_levels_), + curr_level_(other.curr_level_), + returned_file_level_(other.returned_file_level_), + hit_file_level_(other.hit_file_level_), + fp_ctx_array_(other.fp_ctx_array_), + range_(*range, range->begin(), range->end()), + maybe_repeat_key_(false), + current_level_range_(*range, range->begin(), range->end()), + current_file_range_(*range, range->begin(), range->end()), + batch_iter_(range->begin()), + batch_iter_prev_(range->begin()), + upper_key_(range->begin()), + level_files_brief_(other.level_files_brief_), + is_hit_file_last_in_level_(false), + curr_file_level_(other.curr_file_level_), + file_indexer_(other.file_indexer_), + user_comparator_(other.user_comparator_), + internal_comparator_(other.internal_comparator_), + hit_file_(nullptr) { + PrepareNextLevelForSearch(); + } + int GetCurrentLevel() const { return curr_level_; } + void PrepareNextLevelForSearch() { search_ended_ = !PrepareNextLevel(); } + + FdWithKeyRange* GetNextFileInLevel() { + if (batch_iter_ == current_level_range_.end() || search_ended_) { + hit_file_ = nullptr; + return nullptr; + } else { + if (maybe_repeat_key_) { + maybe_repeat_key_ = false; + // Check if we found the final value for the last key in the + // previous lookup range. If we did, then there's no need to look + // any further for that key, so advance batch_iter_. Else, keep + // batch_iter_ positioned on that key so we look it up again in + // the next file + // For L0, always advance the key because we will look in the next + // file regardless for all keys not found yet + if (current_level_range_.CheckKeyDone(batch_iter_) || + curr_level_ == 0) { + batch_iter_ = upper_key_; + } + } + // batch_iter_prev_ will become the start key for the next file + // lookup + batch_iter_prev_ = batch_iter_; + } + + MultiGetRange next_file_range(current_level_range_, batch_iter_prev_, + current_level_range_.end()); + size_t curr_file_index = + (batch_iter_ != current_level_range_.end()) + ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level + : curr_file_level_->num_files; + FdWithKeyRange* f; + bool is_last_key_in_file; + if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f, + &is_last_key_in_file)) { + hit_file_ = nullptr; + return nullptr; + } else { + if (is_last_key_in_file) { + // Since cmp_largest is 0, batch_iter_ still points to the last key + // that falls in this file, instead of the next one. Increment + // the file index for all keys between batch_iter_ and upper_key_ + auto tmp_iter = batch_iter_; + while (tmp_iter != upper_key_) { + ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level); + ++tmp_iter; + } + maybe_repeat_key_ = true; + } + // Set the range for this file + current_file_range_ = + MultiGetRange(next_file_range, batch_iter_prev_, upper_key_); + returned_file_level_ = curr_level_; + hit_file_level_ = curr_level_; + is_hit_file_last_in_level_ = + curr_file_index == curr_file_level_->num_files - 1; + hit_file_ = f; + return f; + } + } + + // getter for current file level + // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts + unsigned int GetHitFileLevel() { return hit_file_level_; } + + FdWithKeyRange* GetHitFile() { return hit_file_; } + + // Returns true if the most recent "hit file" (i.e., one returned by + // GetNextFile()) is at the last index in its level. + bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; } + + bool KeyMaySpanNextFile() { return maybe_repeat_key_; } + + bool IsSearchEnded() { return search_ended_; } + + const MultiGetRange& CurrentFileRange() { return current_file_range_; } + + bool RemainingOverlapInLevel() { + return !current_level_range_.Suffix(current_file_range_).empty(); + } + + MultiGetRange& GetRange() { return range_; } + + void ReplaceRange(const MultiGetRange& other) { + range_ = other; + current_level_range_ = other; + } + + FilePickerMultiGet(FilePickerMultiGet&& other) + : num_levels_(other.num_levels_), + curr_level_(other.curr_level_), + returned_file_level_(other.returned_file_level_), + hit_file_level_(other.hit_file_level_), + fp_ctx_array_(std::move(other.fp_ctx_array_)), + range_(std::move(other.range_)), + maybe_repeat_key_(other.maybe_repeat_key_), + current_level_range_(std::move(other.current_level_range_)), + current_file_range_(std::move(other.current_file_range_)), + batch_iter_(other.batch_iter_, ¤t_level_range_), + batch_iter_prev_(other.batch_iter_prev_, ¤t_level_range_), + upper_key_(other.upper_key_, ¤t_level_range_), + level_files_brief_(other.level_files_brief_), + search_ended_(other.search_ended_), + is_hit_file_last_in_level_(other.is_hit_file_last_in_level_), + curr_file_level_(other.curr_file_level_), + file_indexer_(other.file_indexer_), + user_comparator_(other.user_comparator_), + internal_comparator_(other.internal_comparator_), + hit_file_(other.hit_file_) {} + + private: + unsigned int num_levels_; + unsigned int curr_level_; + unsigned int returned_file_level_; + unsigned int hit_file_level_; + + struct FilePickerContext { + int32_t search_left_bound; + int32_t search_right_bound; + unsigned int curr_index_in_curr_level; + unsigned int start_index_in_curr_level; + + FilePickerContext(int32_t left, int32_t right) + : search_left_bound(left), + search_right_bound(right), + curr_index_in_curr_level(0), + start_index_in_curr_level(0) {} + + FilePickerContext() = default; + }; + std::array fp_ctx_array_; + MultiGetRange range_; + bool maybe_repeat_key_; + MultiGetRange current_level_range_; + MultiGetRange current_file_range_; + // Iterator to iterate through the keys in a MultiGet batch, that gets reset + // at the beginning of each level. Each call to GetNextFile() will position + // batch_iter_ at or right after the last key that was found in the returned + // SST file + MultiGetRange::Iterator batch_iter_; + // An iterator that records the previous position of batch_iter_, i.e last + // key found in the previous SST file, in order to serve as the start of + // the batch key range for the next SST file + MultiGetRange::Iterator batch_iter_prev_; + MultiGetRange::Iterator upper_key_; + autovector* level_files_brief_; + bool search_ended_; + bool is_hit_file_last_in_level_; + LevelFilesBrief* curr_file_level_; + FileIndexer* file_indexer_; + const Comparator* user_comparator_; + const InternalKeyComparator* internal_comparator_; + FdWithKeyRange* hit_file_; + // Iterates through files in the current level until it finds a file that // contains at least one key from the MultiGet batch bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range, @@ -524,124 +704,6 @@ class FilePickerMultiGet { return file_hit; } - void PrepareNextLevelForSearch() { search_ended_ = !PrepareNextLevel(); } - - FdWithKeyRange* GetNextFileInLevel() { - if (batch_iter_ == current_level_range_.end() || search_ended_) { - return nullptr; - } else { - if (maybe_repeat_key_) { - maybe_repeat_key_ = false; - // Check if we found the final value for the last key in the - // previous lookup range. If we did, then there's no need to look - // any further for that key, so advance batch_iter_. Else, keep - // batch_iter_ positioned on that key so we look it up again in - // the next file - // For L0, always advance the key because we will look in the next - // file regardless for all keys not found yet - if (current_level_range_.CheckKeyDone(batch_iter_) || - curr_level_ == 0) { - batch_iter_ = upper_key_; - } - } - // batch_iter_prev_ will become the start key for the next file - // lookup - batch_iter_prev_ = batch_iter_; - } - - MultiGetRange next_file_range(current_level_range_, batch_iter_prev_, - current_level_range_.end()); - size_t curr_file_index = - (batch_iter_ != current_level_range_.end()) - ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level - : curr_file_level_->num_files; - FdWithKeyRange* f; - bool is_last_key_in_file; - if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f, - &is_last_key_in_file)) { - return nullptr; - } else { - if (is_last_key_in_file) { - // Since cmp_largest is 0, batch_iter_ still points to the last key - // that falls in this file, instead of the next one. Increment - // the file index for all keys between batch_iter_ and upper_key_ - auto tmp_iter = batch_iter_; - while (tmp_iter != upper_key_) { - ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level); - ++tmp_iter; - } - maybe_repeat_key_ = true; - } - // Set the range for this file - current_file_range_ = - MultiGetRange(next_file_range, batch_iter_prev_, upper_key_); - returned_file_level_ = curr_level_; - hit_file_level_ = curr_level_; - is_hit_file_last_in_level_ = - curr_file_index == curr_file_level_->num_files - 1; - return f; - } - } - - // getter for current file level - // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts - unsigned int GetHitFileLevel() { return hit_file_level_; } - - // Returns true if the most recent "hit file" (i.e., one returned by - // GetNextFile()) is at the last index in its level. - bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; } - - bool KeyMaySpanNextFile() { return maybe_repeat_key_; } - - bool IsSearchEnded() { return search_ended_; } - - const MultiGetRange& CurrentFileRange() { return current_file_range_; } - - bool RemainingOverlapInLevel() { - return !current_level_range_.Suffix(current_file_range_).empty(); - } - - private: - unsigned int num_levels_; - unsigned int curr_level_; - unsigned int returned_file_level_; - unsigned int hit_file_level_; - - struct FilePickerContext { - int32_t search_left_bound; - int32_t search_right_bound; - unsigned int curr_index_in_curr_level; - unsigned int start_index_in_curr_level; - - FilePickerContext(int32_t left, int32_t right) - : search_left_bound(left), search_right_bound(right), - curr_index_in_curr_level(0), start_index_in_curr_level(0) {} - - FilePickerContext() = default; - }; - std::array fp_ctx_array_; - MultiGetRange* range_; - // Iterator to iterate through the keys in a MultiGet batch, that gets reset - // at the beginning of each level. Each call to GetNextFile() will position - // batch_iter_ at or right after the last key that was found in the returned - // SST file - MultiGetRange::Iterator batch_iter_; - // An iterator that records the previous position of batch_iter_, i.e last - // key found in the previous SST file, in order to serve as the start of - // the batch key range for the next SST file - MultiGetRange::Iterator batch_iter_prev_; - MultiGetRange::Iterator upper_key_; - bool maybe_repeat_key_; - MultiGetRange current_level_range_; - MultiGetRange current_file_range_; - autovector* level_files_brief_; - bool search_ended_; - bool is_hit_file_last_in_level_; - LevelFilesBrief* curr_file_level_; - FileIndexer* file_indexer_; - const Comparator* user_comparator_; - const InternalKeyComparator* internal_comparator_; - // Setup local variables to search next level. // Returns false if there are no more levels to search. bool PrepareNextLevel() { @@ -692,7 +754,7 @@ class FilePickerMultiGet { // are always compacted into a single entry). int32_t start_index = -1; current_level_range_ = - MultiGetRange(*range_, range_->begin(), range_->end()); + MultiGetRange(range_, range_.begin(), range_.end()); for (auto mget_iter = current_level_range_.begin(); mget_iter != current_level_range_.end(); ++mget_iter) { struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()]; @@ -754,7 +816,6 @@ class FilePickerMultiGet { return false; } }; -} // anonymous namespace VersionStorageInfo::~VersionStorageInfo() { delete[] files_; } @@ -2190,151 +2251,162 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, iter->get_context = &(get_ctx[get_ctx_index]); } - MultiGetRange file_picker_range(*range, range->begin(), range->end()); - FilePickerMultiGet fp( - &file_picker_range, - &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_, - &storage_info_.file_indexer_, user_comparator(), internal_comparator()); - FdWithKeyRange* f = fp.GetNextFileInLevel(); Status s; - uint64_t num_index_read = 0; - uint64_t num_filter_read = 0; - uint64_t num_sst_read = 0; - uint64_t num_level_read = 0; - - MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end()); // blob_file => [[blob_idx, it], ...] std::unordered_map blob_ctxs; - int prev_level = -1; - - while (!fp.IsSearchEnded()) { - // This will be set to true later if we actually look up in a file in L0. - // For per level stats purposes, an L0 file is treated as a level - bool dump_stats_for_l0_file = false; - - // Avoid using the coroutine version if we're looking in a L0 file, since - // L0 files won't be parallelized anyway. The regular synchronous version - // is faster. - if (!read_options.async_io || !using_coroutines() || - fp.GetHitFileLevel() == 0 || !fp.RemainingOverlapInLevel()) { - if (f) { - bool skip_filters = IsFilterSkipped( - static_cast(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel()); - // Call MultiGetFromSST for looking up a single file - s = MultiGetFromSST(read_options, fp.CurrentFileRange(), - fp.GetHitFileLevel(), skip_filters, - /*skip_range_deletions=*/false, f, blob_ctxs, - /*table_handle=*/nullptr, num_filter_read, - num_index_read, num_sst_read); - if (fp.GetHitFileLevel() == 0) { - dump_stats_for_l0_file = true; - } - } - if (s.ok()) { - f = fp.GetNextFileInLevel(); - } + MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end()); #if USE_COROUTINES - } else { - std::vector> mget_tasks; - while (f != nullptr) { - MultiGetRange file_range = fp.CurrentFileRange(); - Cache::Handle* table_handle = nullptr; - bool skip_filters = IsFilterSkipped( - static_cast(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel()); - bool skip_range_deletions = false; - if (!skip_filters) { - Status status = table_cache_->MultiGetFilter( - read_options, *internal_comparator(), *f->file_metadata, - mutable_cf_options_.prefix_extractor, - cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), - fp.GetHitFileLevel(), &file_range, &table_handle); - skip_range_deletions = true; - if (status.ok()) { - skip_filters = true; - } else if (!status.IsNotSupported()) { - s = status; + if (read_options.async_io && read_options.optimize_multiget_for_io && + using_coroutines()) { + s = MultiGetAsync(read_options, range, &blob_ctxs); + } else +#endif // USE_COROUTINES + { + MultiGetRange file_picker_range(*range, range->begin(), range->end()); + FilePickerMultiGet fp(&file_picker_range, &storage_info_.level_files_brief_, + storage_info_.num_non_empty_levels_, + &storage_info_.file_indexer_, user_comparator(), + internal_comparator()); + FdWithKeyRange* f = fp.GetNextFileInLevel(); + uint64_t num_index_read = 0; + uint64_t num_filter_read = 0; + uint64_t num_sst_read = 0; + uint64_t num_level_read = 0; + + int prev_level = -1; + + while (!fp.IsSearchEnded()) { + // This will be set to true later if we actually look up in a file in L0. + // For per level stats purposes, an L0 file is treated as a level + bool dump_stats_for_l0_file = false; + + // Avoid using the coroutine version if we're looking in a L0 file, since + // L0 files won't be parallelized anyway. The regular synchronous version + // is faster. + if (!read_options.async_io || !using_coroutines() || + fp.GetHitFileLevel() == 0 || !fp.RemainingOverlapInLevel()) { + if (f) { + bool skip_filters = + IsFilterSkipped(static_cast(fp.GetHitFileLevel()), + fp.IsHitFileLastInLevel()); + // Call MultiGetFromSST for looking up a single file + s = MultiGetFromSST(read_options, fp.CurrentFileRange(), + fp.GetHitFileLevel(), skip_filters, + /*skip_range_deletions=*/false, f, blob_ctxs, + /*table_handle=*/nullptr, num_filter_read, + num_index_read, num_sst_read); + if (fp.GetHitFileLevel() == 0) { + dump_stats_for_l0_file = true; } } - - if (!s.ok()) { - break; + if (s.ok()) { + f = fp.GetNextFileInLevel(); } +#if USE_COROUTINES + } else { + std::vector> mget_tasks; + while (f != nullptr) { + MultiGetRange file_range = fp.CurrentFileRange(); + Cache::Handle* table_handle = nullptr; + bool skip_filters = + IsFilterSkipped(static_cast(fp.GetHitFileLevel()), + fp.IsHitFileLastInLevel()); + bool skip_range_deletions = false; + if (!skip_filters) { + Status status = table_cache_->MultiGetFilter( + read_options, *internal_comparator(), *f->file_metadata, + mutable_cf_options_.prefix_extractor, + cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), + fp.GetHitFileLevel(), &file_range, &table_handle); + skip_range_deletions = true; + if (status.ok()) { + skip_filters = true; + } else if (!status.IsNotSupported()) { + s = status; + } + } - if (!file_range.empty()) { - mget_tasks.emplace_back(MultiGetFromSSTCoroutine( - read_options, file_range, fp.GetHitFileLevel(), skip_filters, - skip_range_deletions, f, blob_ctxs, table_handle, num_filter_read, - num_index_read, num_sst_read)); - } - if (fp.KeyMaySpanNextFile()) { - break; - } - f = fp.GetNextFileInLevel(); - } - if (s.ok() && mget_tasks.size() > 0) { - RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size()); - // Collect all results so far - std::vector statuses = folly::coro::blockingWait( - folly::coro::collectAllRange(std::move(mget_tasks)) - .scheduleOn(&range->context()->executor())); - for (Status stat : statuses) { - if (!stat.ok()) { - s = stat; + if (!s.ok()) { + break; } - } - if (s.ok() && fp.KeyMaySpanNextFile()) { + if (!file_range.empty()) { + mget_tasks.emplace_back(MultiGetFromSSTCoroutine( + read_options, file_range, fp.GetHitFileLevel(), skip_filters, + skip_range_deletions, f, blob_ctxs, table_handle, + num_filter_read, num_index_read, num_sst_read)); + } + if (fp.KeyMaySpanNextFile()) { + break; + } f = fp.GetNextFileInLevel(); } - } + if (s.ok() && mget_tasks.size() > 0) { + RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, + mget_tasks.size()); + // Collect all results so far + std::vector statuses = folly::coro::blockingWait( + folly::coro::collectAllRange(std::move(mget_tasks)) + .scheduleOn(&range->context()->executor())); + for (Status stat : statuses) { + if (!stat.ok()) { + s = stat; + } + } + + if (s.ok() && fp.KeyMaySpanNextFile()) { + f = fp.GetNextFileInLevel(); + } + } #endif // USE_COROUTINES - } - // If bad status or we found final result for all the keys - if (!s.ok() || file_picker_range.empty()) { - break; - } - if (!f) { - // Reached the end of this level. Prepare the next level - fp.PrepareNextLevelForSearch(); - if (!fp.IsSearchEnded()) { - // Its possible there is no overlap on this level and f is nullptr - f = fp.GetNextFileInLevel(); - } - if (dump_stats_for_l0_file || - (prev_level != 0 && prev_level != (int)fp.GetHitFileLevel())) { - // Dump the stats if the search has moved to the next level and - // reset for next level. - if (num_filter_read + num_index_read) { - RecordInHistogram(db_statistics_, - NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL, - num_index_read + num_filter_read); + } + // If bad status or we found final result for all the keys + if (!s.ok() || file_picker_range.empty()) { + break; + } + if (!f) { + // Reached the end of this level. Prepare the next level + fp.PrepareNextLevelForSearch(); + if (!fp.IsSearchEnded()) { + // Its possible there is no overlap on this level and f is nullptr + f = fp.GetNextFileInLevel(); } - if (num_sst_read) { - RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, - num_sst_read); - num_level_read++; + if (dump_stats_for_l0_file || + (prev_level != 0 && prev_level != (int)fp.GetHitFileLevel())) { + // Dump the stats if the search has moved to the next level and + // reset for next level. + if (num_filter_read + num_index_read) { + RecordInHistogram(db_statistics_, + NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL, + num_index_read + num_filter_read); + } + if (num_sst_read) { + RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, + num_sst_read); + num_level_read++; + } + num_filter_read = 0; + num_index_read = 0; + num_sst_read = 0; } - num_filter_read = 0; - num_index_read = 0; - num_sst_read = 0; + prev_level = fp.GetHitFileLevel(); } - prev_level = fp.GetHitFileLevel(); } - } - // Dump stats for most recent level - if (num_filter_read + num_index_read) { - RecordInHistogram(db_statistics_, - NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL, - num_index_read + num_filter_read); - } - if (num_sst_read) { - RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read); - num_level_read++; - } - if (num_level_read) { - RecordInHistogram(db_statistics_, NUM_LEVEL_READ_PER_MULTIGET, - num_level_read); + // Dump stats for most recent level + if (num_filter_read + num_index_read) { + RecordInHistogram(db_statistics_, + NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL, + num_index_read + num_filter_read); + } + if (num_sst_read) { + RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read); + num_level_read++; + } + if (num_level_read) { + RecordInHistogram(db_statistics_, NUM_LEVEL_READ_PER_MULTIGET, + num_level_read); + } } if (s.ok() && !blob_ctxs.empty()) { @@ -2386,6 +2458,201 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, } } +#ifdef USE_COROUTINES +Status Version::ProcessBatch( + const ReadOptions& read_options, FilePickerMultiGet* batch, + std::vector>& mget_tasks, + std::unordered_map* blob_ctxs, + autovector& batches, std::deque& waiting, + std::deque& to_process, unsigned int& num_tasks_queued, + uint64_t& num_filter_read, uint64_t& num_index_read, + uint64_t& num_sst_read) { + FilePickerMultiGet& fp = *batch; + MultiGetRange range = fp.GetRange(); + // Initialize a new empty range. Any keys that are not in this level will + // eventually become part of the new range. + MultiGetRange leftover(range, range.begin(), range.begin()); + FdWithKeyRange* f = nullptr; + Status s; + + f = fp.GetNextFileInLevel(); + while (!f) { + fp.PrepareNextLevelForSearch(); + if (!fp.IsSearchEnded()) { + f = fp.GetNextFileInLevel(); + } else { + break; + } + } + while (f) { + MultiGetRange file_range = fp.CurrentFileRange(); + Cache::Handle* table_handle = nullptr; + bool skip_filters = IsFilterSkipped(static_cast(fp.GetHitFileLevel()), + fp.IsHitFileLastInLevel()); + bool skip_range_deletions = false; + if (!skip_filters) { + Status status = table_cache_->MultiGetFilter( + read_options, *internal_comparator(), *f->file_metadata, + mutable_cf_options_.prefix_extractor, + cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), + fp.GetHitFileLevel(), &file_range, &table_handle); + if (status.ok()) { + skip_filters = true; + skip_range_deletions = true; + } else if (!status.IsNotSupported()) { + s = status; + } + } + if (!s.ok()) { + break; + } + // At this point, file_range contains any keys that are likely in this + // file. It may have false positives, but that's ok since higher level + // lookups for the key are dependent on this lookup anyway. + // Add the complement of file_range to leftover. That's the set of keys + // definitely not in this level. + // Subtract the complement of file_range from range, since they will be + // processed in a separate batch in parallel. + leftover += ~file_range; + range -= ~file_range; + if (!file_range.empty()) { + if (waiting.empty() && to_process.empty() && + !fp.RemainingOverlapInLevel() && leftover.empty() && + mget_tasks.empty()) { + // All keys are in one SST file, so take the fast path + s = MultiGetFromSST(read_options, file_range, fp.GetHitFileLevel(), + skip_filters, skip_range_deletions, f, *blob_ctxs, + table_handle, num_filter_read, num_index_read, + num_sst_read); + } else { + mget_tasks.emplace_back(MultiGetFromSSTCoroutine( + read_options, file_range, fp.GetHitFileLevel(), skip_filters, + skip_range_deletions, f, *blob_ctxs, table_handle, num_filter_read, + num_index_read, num_sst_read)); + ++num_tasks_queued; + } + } + if (fp.KeyMaySpanNextFile() && !file_range.empty()) { + break; + } + f = fp.GetNextFileInLevel(); + } + // Split the current batch only if some keys are likely in this level and + // some are not. + if (s.ok() && !leftover.empty() && !range.empty()) { + fp.ReplaceRange(range); + batches.emplace_back(&leftover, fp); + to_process.emplace_back(batches.size() - 1); + } + // 1. If f is non-null, that means we might not be done with this level. + // This can happen if one of the keys is the last key in the file, i.e + // fp.KeyMaySpanNextFile() is true. + // 2. If range is empty, then we're done with this range and no need to + // prepare the next level + // 3. If some tasks were queued for this range, then the next level will be + // prepared after executing those tasks + if (!f && !range.empty() && !num_tasks_queued) { + fp.PrepareNextLevelForSearch(); + } + return s; +} + +Status Version::MultiGetAsync( + const ReadOptions& options, MultiGetRange* range, + std::unordered_map* blob_ctxs) { + autovector batches; + std::deque waiting; + std::deque to_process; + Status s; + std::vector> mget_tasks; + uint64_t num_filter_read = 0; + uint64_t num_index_read = 0; + uint64_t num_sst_read = 0; + + // Create the initial batch with the input range + batches.emplace_back(range, &storage_info_.level_files_brief_, + storage_info_.num_non_empty_levels_, + &storage_info_.file_indexer_, user_comparator(), + internal_comparator()); + to_process.emplace_back(0); + + while (!to_process.empty()) { + size_t idx = to_process.front(); + FilePickerMultiGet* batch = &batches.at(idx); + unsigned int num_tasks_queued = 0; + to_process.pop_front(); + if (batch->IsSearchEnded() || batch->GetRange().empty()) { + if (!to_process.empty()) { + continue; + } + } else { + // Look through one level. This may split the batch and enqueue it to + // to_process + s = ProcessBatch(options, batch, mget_tasks, blob_ctxs, batches, waiting, + to_process, num_tasks_queued, num_filter_read, + num_index_read, num_sst_read); + if (!s.ok()) { + break; + } + // Dump the stats since the search has moved to the next level + if (num_filter_read + num_index_read) { + RecordInHistogram(db_statistics_, + NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL, + num_index_read + num_filter_read); + } + if (num_sst_read) { + RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read); + } + // If ProcessBatch didn't enqueue any coroutine tasks, it means all + // keys were filtered out. So put the batch back in to_process to + // lookup in the next level + if (!num_tasks_queued && !batch->IsSearchEnded()) { + // Put this back in the processing queue + to_process.emplace_back(idx); + } else if (num_tasks_queued) { + waiting.emplace_back(idx); + } + } + if (to_process.empty()) { + if (s.ok() && mget_tasks.size() > 0) { + assert(waiting.size()); + RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size()); + // Collect all results so far + std::vector statuses = folly::coro::blockingWait( + folly::coro::collectAllRange(std::move(mget_tasks)) + .scheduleOn(&range->context()->executor())); + for (Status stat : statuses) { + if (!stat.ok()) { + s = stat; + break; + } + } + + if (!s.ok()) { + break; + } + + for (size_t wait_idx : waiting) { + FilePickerMultiGet& fp = batches.at(wait_idx); + // 1. If fp.GetHitFile() is non-null, then there could be more + // overlap in this level. So skip preparing next level. + // 2. If fp.GetRange() is empty, then this batch is completed + // and no need to prepare the next level. + if (!fp.GetHitFile() && !fp.GetRange().empty()) { + fp.PrepareNextLevelForSearch(); + } + } + to_process.swap(waiting); + } else { + assert(!s.ok() || waiting.size() == 0); + } + } + } + + return s; +} +#endif + bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) { // Reaching the bottom level implies misses at all upper levels, so we'll // skip checking the filters when we predict a hit. diff --git a/db/version_set.h b/db/version_set.h index c164d8e4c..90257b434 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -46,6 +46,10 @@ #include "db/version_edit.h" #include "db/write_controller.h" #include "env/file_system_tracer.h" +#if USE_COROUTINES +#include "folly/experimental/coro/BlockingWait.h" +#include "folly/experimental/coro/Collect.h" +#endif #include "monitoring/instrumented_mutex.h" #include "options/db_options.h" #include "port/port.h" @@ -54,6 +58,7 @@ #include "table/get_context.h" #include "table/multiget_context.h" #include "trace_replay/block_cache_tracer.h" +#include "util/autovector.h" #include "util/coro_utils.h" #include "util/hash_containers.h" @@ -76,6 +81,7 @@ class ColumnFamilySet; class MergeIteratorBuilder; class SystemClock; class ManifestTailer; +class FilePickerMultiGet; // VersionEdit is always supposed to be valid and it is used to point at // entries in Manifest. Ideally it should not be used as a container to @@ -997,6 +1003,28 @@ class Version { Cache::Handle* table_handle, uint64_t& num_filter_read, uint64_t& num_index_read, uint64_t& num_sst_read); +#ifdef USE_COROUTINES + // MultiGet using async IO to read data blocks from SST files in parallel + // within and across levels + Status MultiGetAsync( + const ReadOptions& options, MultiGetRange* range, + std::unordered_map* blob_ctxs); + + // A helper function to lookup a batch of keys in a single level. It will + // queue coroutine tasks to mget_tasks. It may also split the input batch + // by creating a new batch with keys definitely not in this level and + // enqueuing it to to_process. + Status ProcessBatch(const ReadOptions& read_options, + FilePickerMultiGet* batch, + std::vector>& mget_tasks, + std::unordered_map* blob_ctxs, + autovector& batches, + std::deque& waiting, + std::deque& to_process, + unsigned int& num_tasks_queued, uint64_t& num_filter_read, + uint64_t& num_index_read, uint64_t& num_sst_read); +#endif + ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs Logger* info_log_; Statistics* db_statistics_; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b0d21f62c..0338c7697 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1686,6 +1686,17 @@ struct ReadOptions { // Default: false bool async_io; + // Experimental + // + // If async_io is set, then this flag controls whether we read SST files + // in multiple levels asynchronously. Enabling this flag can help reduce + // MultiGet latency by maximizing the number of SST files read in + // parallel if the keys in the MultiGet batch are in different levels. It + // comes at the expense of slightly higher CPU overhead. + // + // Default: false + bool optimize_multiget_for_io; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/options/options.cc b/options/options.cc index 6dff5e62f..6f5691515 100644 --- a/options/options.cc +++ b/options/options.cc @@ -696,7 +696,8 @@ ReadOptions::ReadOptions() io_timeout(std::chrono::microseconds::zero()), value_size_soft_limit(std::numeric_limits::max()), adaptive_readahead(false), - async_io(false) {} + async_io(false), + optimize_multiget_for_io(false) {} ReadOptions::ReadOptions(bool cksum, bool cache) : snapshot(nullptr), @@ -721,6 +722,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache) io_timeout(std::chrono::microseconds::zero()), value_size_soft_limit(std::numeric_limits::max()), adaptive_readahead(false), - async_io(false) {} + async_io(false), + optimize_multiget_for_io(false) {} } // namespace ROCKSDB_NAMESPACE diff --git a/table/multiget_context.h b/table/multiget_context.h index 188681480..933f4e17d 100644 --- a/table/multiget_context.h +++ b/table/multiget_context.h @@ -199,17 +199,24 @@ class MultiGetContext { : range_(range), ctx_(range->ctx_), index_(idx) { while (index_ < range_->end_ && (Mask{1} << index_) & - (range_->ctx_->value_mask_ | range_->skip_mask_)) + (range_->ctx_->value_mask_ | range_->skip_mask_ | + range_->invalid_mask_)) index_++; } Iterator(const Iterator&) = default; + + Iterator(const Iterator& other, const Range* range) + : range_(range), ctx_(other.ctx_), index_(other.index_) { + assert(range->ctx_ == other.ctx_); + } Iterator& operator=(const Iterator&) = default; Iterator& operator++() { while (++index_ < range_->end_ && (Mask{1} << index_) & - (range_->ctx_->value_mask_ | range_->skip_mask_)) + (range_->ctx_->value_mask_ | range_->skip_mask_ | + range_->invalid_mask_)) ; return *this; } @@ -247,9 +254,17 @@ class MultiGetContext { const Iterator& first, const Iterator& last) { ctx_ = mget_range.ctx_; - start_ = first.index_; - end_ = last.index_; + if (first == last) { + // This means create an empty range based on mget_range. So just + // set start_ and and_ to the same value + start_ = mget_range.start_; + end_ = start_; + } else { + start_ = first.index_; + end_ = last.index_; + } skip_mask_ = mget_range.skip_mask_; + invalid_mask_ = mget_range.invalid_mask_; assert(start_ < 64); assert(end_ < 64); } @@ -305,18 +320,67 @@ class MultiGetContext { } } + // The += operator expands the number of keys in this range. The expansion + // is always to the right, i.e start of the additional range >= end of + // current range. There should be no overlap. Any skipped keys in rhs are + // marked as invalid in the invalid_mask_. + Range& operator+=(const Range& rhs) { + assert(rhs.start_ >= end_); + // Check for non-overlapping ranges and adjust invalid_mask_ accordingly + if (end_ < rhs.start_) { + invalid_mask_ |= RangeMask(end_, rhs.start_); + skip_mask_ |= RangeMask(end_, rhs.start_); + } + start_ = std::min(start_, rhs.start_); + end_ = std::max(end_, rhs.end_); + skip_mask_ |= rhs.skip_mask_ & RangeMask(rhs.start_, rhs.end_); + invalid_mask_ |= (rhs.invalid_mask_ | rhs.skip_mask_) & + RangeMask(rhs.start_, rhs.end_); + assert(start_ < 64); + assert(end_ < 64); + return *this; + } + + // The -= operator removes keys from this range. The removed keys should + // come from a range completely overlapping the current range. The removed + // keys are marked invalid in the invalid_mask_. + Range& operator-=(const Range& rhs) { + assert(start_ <= rhs.start_ && end_ >= rhs.end_); + skip_mask_ |= (~rhs.skip_mask_ | rhs.invalid_mask_) & + RangeMask(rhs.start_, rhs.end_); + invalid_mask_ |= (~rhs.skip_mask_ | rhs.invalid_mask_) & + RangeMask(rhs.start_, rhs.end_); + return *this; + } + + // Return a complement of the current range + Range operator~() { + Range res = *this; + res.skip_mask_ = ~skip_mask_ & RangeMask(start_, end_); + return res; + } + private: friend MultiGetContext; MultiGetContext* ctx_; size_t start_; size_t end_; Mask skip_mask_; + Mask invalid_mask_; Range(MultiGetContext* ctx, size_t num_keys) - : ctx_(ctx), start_(0), end_(num_keys), skip_mask_(0) { + : ctx_(ctx), + start_(0), + end_(num_keys), + skip_mask_(0), + invalid_mask_(0) { assert(num_keys < 64); } + static Mask RangeMask(size_t start, size_t end) { + return (((Mask{1} << (end - start)) - 1) << start); + } + Mask RemainingMask() const { return (((Mask{1} << end_) - 1) & ~((Mask{1} << start_) - 1) & ~(ctx_->value_mask_ | skip_mask_)); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index cd7596e31..ddee9c057 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1177,6 +1177,10 @@ DEFINE_bool(async_io, false, "When set true, RocksDB does asynchronous reads for internal auto " "readahead prefetching."); +DEFINE_bool(optimize_multiget_for_io, true, + "When set true, RocksDB does asynchronous reads for SST files in " + "multiple levels for MultiGet."); + DEFINE_bool(charge_compression_dictionary_building_buffer, false, "Setting for " "CacheEntryRoleOptions::charged of " @@ -3364,6 +3368,7 @@ class Benchmark { read_options_.readahead_size = FLAGS_readahead_size; read_options_.adaptive_readahead = FLAGS_adaptive_readahead; read_options_.async_io = FLAGS_async_io; + read_options_.optimize_multiget_for_io = FLAGS_optimize_multiget_for_io; void (Benchmark::*method)(ThreadState*) = nullptr; void (Benchmark::*post_process_method)() = nullptr;