diff --git a/HISTORY.md b/HISTORY.md
index f4e11c68c..234d3da1f 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -14,6 +14,7 @@
 * Removed Customizable support for RateLimiter and removed its CreateFromString() and Type() functions.
 * `CompactRangeOptions::exclusive_manual_compaction` is now false by default. This ensures RocksDB does not introduce artificial parallelism limitations by default.
 * Tiered Storage: change `bottommost_temperture` to `last_level_temperture`. The old option name is kept only for migration; please use the new option. The behavior is changed to apply the temperature to the `last_level` SST files only.
+* Added a new experimental ReadOptions flag called `optimize_multiget_for_io`, which, when set, attempts to reduce MultiGet latency by spawning coroutines for keys in multiple levels.

 ### Bug Fixes
 * Fix a bug starting in 7.4.0 in which some fsync operations might be skipped in a DB after any DropColumnFamily on that DB, until it is re-opened. This can lead to data loss on power loss. (For custom FileSystem implementations, this could lead to `FSDirectory::Fsync` or `FSDirectory::Close` after the first `FSDirectory::Close`; also, valgrind could report a call to `close()` with `fd=-1`.)
@@ -42,6 +43,7 @@
 ### Performance Improvements
 * Instead of constructing `FragmentedRangeTombstoneList` during every read operation, it is now constructed once and stored in immutable memtables. This improves speed of querying range tombstones from immutable memtables.
 * When using iterators with the integrated BlobDB implementation, blob cache handles are now released immediately when the iterator's position changes.
+* MultiGet can now do more IO in parallel by reading data blocks from SST files in multiple levels, if the `optimize_multiget_for_io` ReadOptions flag is set.

 ## Behavior Change
 * Block cache keys have changed, which will cause any persistent caches to miss between versions.
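
Note: the two changelog entries above describe the user-facing switch. Here is a minimal sketch of how a caller might opt in; the database path and keys are hypothetical, and the optimization only engages in builds with coroutine support (USE_COROUTINES) and when `async_io` is also set.

```cpp
#include <cassert>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

using namespace ROCKSDB_NAMESPACE;

int main() {
  DB* db = nullptr;
  Options options;
  options.create_if_missing = true;
  // Hypothetical database path, for illustration only.
  Status s = DB::Open(options, "/tmp/optimize_multiget_demo", &db);
  assert(s.ok());

  std::vector<std::string> key_strs{"k1", "k2", "k3"};
  std::vector<Slice> keys(key_strs.begin(), key_strs.end());
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());

  ReadOptions ro;
  ro.async_io = true;                  // prerequisite for the new flag
  ro.optimize_multiget_for_io = true;  // opt in to cross-level parallel reads
  db->MultiGet(ro, db->DefaultColumnFamily(), keys.size(), keys.data(),
               values.data(), statuses.data());

  delete db;
  return 0;
}
```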
diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index efb864c73..9f866a91a 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -2013,12 +2013,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSize) {
 }
 
 TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeMultiLevelMerge) {
-#ifndef USE_COROUTINES
   if (std::get<1>(GetParam())) {
-    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+    ROCKSDB_GTEST_SKIP("This test needs to be fixed for async IO");
     return;
   }
-#endif  // USE_COROUTINES
   // Skip for unbatched MultiGet
   if (!std::get<0>(GetParam())) {
     ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
@@ -2131,7 +2129,8 @@ INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
                         testing::Combine(testing::Bool(), testing::Bool()));
 
 #if USE_COROUTINES
-class DBMultiGetAsyncIOTest : public DBBasicTest {
+class DBMultiGetAsyncIOTest : public DBBasicTest,
+                              public ::testing::WithParamInterface<bool> {
  public:
   DBMultiGetAsyncIOTest()
       : DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) {
@@ -2210,7 +2209,7 @@ class DBMultiGetAsyncIOTest : public DBBasicTest {
   std::shared_ptr<Statistics> statistics_;
 };
 
-TEST_F(DBMultiGetAsyncIOTest, GetFromL0) {
+TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
   // All 3 keys in L0. The L0 files should be read serially.
   std::vector<std::string> key_strs{Key(0), Key(40), Key(80)};
   std::vector<Slice> keys{key_strs[0], key_strs[1], key_strs[2]};
@@ -2219,6 +2218,7 @@
 
   ReadOptions ro;
   ro.async_io = true;
+  ro.optimize_multiget_for_io = GetParam();
   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                      keys.data(), values.data(), statuses.data());
   ASSERT_EQ(values.size(), 3);
@@ -2233,13 +2233,17 @@
 
   statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
 
-  // No async IO in this case since we don't do parallel lookup in L0
-  ASSERT_EQ(multiget_io_batch_size.count, 0);
-  ASSERT_EQ(multiget_io_batch_size.max, 0);
-  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0);
+  // With async IO, lookups will happen in parallel for each key
+  if (GetParam()) {
+    ASSERT_EQ(multiget_io_batch_size.count, 1);
+    ASSERT_EQ(multiget_io_batch_size.max, 3);
+    ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
+  } else {
+    ASSERT_EQ(multiget_io_batch_size.count, 0);
+  }
 }
 
-TEST_F(DBMultiGetAsyncIOTest, GetFromL1) {
+TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
   std::vector<std::string> key_strs;
   std::vector<Slice> keys;
   std::vector<PinnableSlice> values;
@@ -2256,6 +2260,7 @@
 
   ReadOptions ro;
   ro.async_io = true;
+  ro.optimize_multiget_for_io = GetParam();
   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                      keys.data(), values.data(), statuses.data());
   ASSERT_EQ(values.size(), 3);
@@ -2276,7 +2281,7 @@
   ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
 }
 
-TEST_F(DBMultiGetAsyncIOTest, LastKeyInFile) {
+TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
   std::vector<std::string> key_strs;
   std::vector<Slice> keys;
   std::vector<PinnableSlice> values;
@@ -2294,6 +2299,7 @@
 
   ReadOptions ro;
   ro.async_io = true;
+  ro.optimize_multiget_for_io = GetParam();
   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                      keys.data(), values.data(), statuses.data());
   ASSERT_EQ(values.size(), 3);
@@ -2316,7 +2322,7 @@
   ASSERT_EQ(multiget_io_batch_size.max, 2);
 }
 
-TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
+TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
   std::vector<std::string> key_strs;
   std::vector<Slice> keys;
   std::vector<PinnableSlice> values;
@@ -2334,6 +2340,7 @@
 
   ReadOptions ro;
   ro.async_io = true;
+  ro.optimize_multiget_for_io = GetParam();
   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                      keys.data(), values.data(), statuses.data());
   ASSERT_EQ(values.size(), 3);
@@ -2348,13 +2355,13 @@
 
   statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
 
-  // There is only one MultiGet key in the bottommost level - 56. Thus
-  // the bottommost level will not use async IO.
+  // There are 2 keys in L1 in two separate files, and 1 in L2. With
+  // async IO, all three lookups will happen in parallel
   ASSERT_EQ(multiget_io_batch_size.count, 1);
-  ASSERT_EQ(multiget_io_batch_size.max, 2);
+  ASSERT_EQ(multiget_io_batch_size.max, GetParam() ? 3 : 2);
 }
 
-TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
+TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
   std::vector<std::string> key_strs;
   std::vector<Slice> keys;
   std::vector<PinnableSlice> values;
@@ -2370,6 +2377,7 @@
 
   ReadOptions ro;
   ro.async_io = true;
+  ro.optimize_multiget_for_io = GetParam();
   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                      keys.data(), values.data(), statuses.data());
   ASSERT_EQ(values.size(), 2);
@@ -2382,7 +2390,7 @@
   ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
 }
 
-TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) {
+TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) {
   std::vector<std::string> key_strs;
   std::vector<Slice> keys;
   std::vector<PinnableSlice> values;
@@ -2398,6 +2406,7 @@
 
   ReadOptions ro;
   ro.async_io = true;
+  ro.optimize_multiget_for_io = GetParam();
   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                      keys.data(), values.data(), statuses.data());
   ASSERT_EQ(values.size(), 2);
@@ -2408,7 +2417,7 @@
   ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
 }
 
-TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) {
+TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) {
   std::vector<std::string> key_strs;
   std::vector<Slice> keys;
   std::vector<PinnableSlice> values;
@@ -2426,6 +2435,7 @@
 
   ReadOptions ro;
   ro.async_io = true;
+  ro.optimize_multiget_for_io = GetParam();
   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                      keys.data(), values.data(), statuses.data());
   ASSERT_EQ(values.size(), keys.size());
@@ -2437,6 +2447,9 @@
   // Bloom filters in L0/L1 will avoid the coroutine calls in those levels
   ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
 }
+
+INSTANTIATE_TEST_CASE_P(DBMultiGetAsyncIOTest, DBMultiGetAsyncIOTest,
+                        testing::Bool());
 #endif  // USE_COROUTINES
 
 TEST_F(DBBasicTest, MultiGetStats) {
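
Note: the assertions in these tests read RocksDB statistics objects directly. For reference, a hedged sketch of how the same counters could be inspected in application code, using only APIs that appear in this diff; the `stats` object is assumed to have been installed via `options.statistics = CreateDBStatistics()`.

```cpp
#include <iostream>
#include <memory>

#include "rocksdb/statistics.h"

using namespace ROCKSDB_NAMESPACE;

// Sketch: dump the MultiGet IO counters that the tests above assert on.
void DumpMultiGetIOStats(const std::shared_ptr<Statistics>& stats) {
  HistogramData io_batch_size;
  // Number of IO batches issued, and the largest batch (i.e. how many
  // SST file reads were in flight at once).
  stats->histogramData(MULTIGET_IO_BATCH_SIZE, &io_batch_size);
  std::cout << "io batches: " << io_batch_size.count
            << ", max parallel reads: " << io_batch_size.max << "\n";
  // Total coroutines spawned for SST file lookups.
  std::cout << "coroutines: "
            << stats->getTickerCount(MULTIGET_COROUTINE_COUNT) << "\n";
}
```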
diff --git a/db/version_set.cc b/db/version_set.cc
index 0c459a56b..a3f37c85b 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -347,6 +347,7 @@ class FilePicker {
     return false;
   }
 };
+}  // anonymous namespace
 
 class FilePickerMultiGet {
  private:
@@ -362,20 +363,21 @@ class FilePickerMultiGet {
         curr_level_(static_cast<unsigned int>(-1)),
         returned_file_level_(static_cast<unsigned int>(-1)),
         hit_file_level_(static_cast<unsigned int>(-1)),
-        range_(range),
-        batch_iter_(range->begin()),
-        batch_iter_prev_(range->begin()),
-        upper_key_(range->begin()),
+        range_(*range, range->begin(), range->end()),
         maybe_repeat_key_(false),
         current_level_range_(*range, range->begin(), range->end()),
         current_file_range_(*range, range->begin(), range->end()),
+        batch_iter_(range->begin()),
+        batch_iter_prev_(range->begin()),
+        upper_key_(range->begin()),
         level_files_brief_(file_levels),
         is_hit_file_last_in_level_(false),
         curr_file_level_(nullptr),
         file_indexer_(file_indexer),
         user_comparator_(user_comparator),
-        internal_comparator_(internal_comparator) {
-    for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
+        internal_comparator_(internal_comparator),
+        hit_file_(nullptr) {
+    for (auto iter = range_.begin(); iter != range_.end(); ++iter) {
       fp_ctx_array_[iter.index()] =
           FilePickerContext(0, FileIndexer::kLevelMaxIndex);
     }
@@ -391,7 +393,7 @@ class FilePickerMultiGet {
       for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
         auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
         if (r) {
-          for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
+          for (auto iter = range_.begin(); iter != range_.end(); ++iter) {
             r->Prepare(iter->ikey);
           }
         }
@@ -399,8 +401,186 @@ class FilePickerMultiGet {
     }
   }
 
+  FilePickerMultiGet(MultiGetRange* range, const FilePickerMultiGet& other)
+      : num_levels_(other.num_levels_),
+        curr_level_(other.curr_level_),
+        returned_file_level_(other.returned_file_level_),
+        hit_file_level_(other.hit_file_level_),
+        fp_ctx_array_(other.fp_ctx_array_),
+        range_(*range, range->begin(), range->end()),
+        maybe_repeat_key_(false),
+        current_level_range_(*range, range->begin(), range->end()),
+        current_file_range_(*range, range->begin(), range->end()),
+        batch_iter_(range->begin()),
+        batch_iter_prev_(range->begin()),
+        upper_key_(range->begin()),
+        level_files_brief_(other.level_files_brief_),
+        is_hit_file_last_in_level_(false),
+        curr_file_level_(other.curr_file_level_),
+        file_indexer_(other.file_indexer_),
+        user_comparator_(other.user_comparator_),
+        internal_comparator_(other.internal_comparator_),
+        hit_file_(nullptr) {
+    PrepareNextLevelForSearch();
+  }
+
+  int GetCurrentLevel() const { return curr_level_; }
+
+  void PrepareNextLevelForSearch() { search_ended_ = !PrepareNextLevel(); }
+
+  FdWithKeyRange* GetNextFileInLevel() {
+    if (batch_iter_ == current_level_range_.end() || search_ended_) {
+      hit_file_ = nullptr;
+      return nullptr;
+    } else {
+      if (maybe_repeat_key_) {
+        maybe_repeat_key_ = false;
+        // Check if we found the final value for the last key in the
+        // previous lookup range. If we did, then there's no need to look
+        // any further for that key, so advance batch_iter_. Else, keep
+        // batch_iter_ positioned on that key so we look it up again in
+        // the next file.
+        // For L0, always advance the key because we will look in the next
+        // file regardless for all keys not found yet
+        if (current_level_range_.CheckKeyDone(batch_iter_) ||
+            curr_level_ == 0) {
+          batch_iter_ = upper_key_;
+        }
+      }
+      // batch_iter_prev_ will become the start key for the next file
+      // lookup
+      batch_iter_prev_ = batch_iter_;
+    }
+
+    MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
+                                  current_level_range_.end());
+    size_t curr_file_index =
+        (batch_iter_ != current_level_range_.end())
+            ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
+            : curr_file_level_->num_files;
+    FdWithKeyRange* f;
+    bool is_last_key_in_file;
+    if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
+                                    &is_last_key_in_file)) {
+      hit_file_ = nullptr;
+      return nullptr;
+    } else {
+      if (is_last_key_in_file) {
+        // Since cmp_largest is 0, batch_iter_ still points to the last key
+        // that falls in this file, instead of the next one. Increment
+        // the file index for all keys between batch_iter_ and upper_key_
+        auto tmp_iter = batch_iter_;
+        while (tmp_iter != upper_key_) {
+          ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
+          ++tmp_iter;
+        }
+        maybe_repeat_key_ = true;
+      }
+      // Set the range for this file
+      current_file_range_ =
+          MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
+      returned_file_level_ = curr_level_;
+      hit_file_level_ = curr_level_;
+      is_hit_file_last_in_level_ =
+          curr_file_index == curr_file_level_->num_files - 1;
+      hit_file_ = f;
+      return f;
+    }
+  }
+
+  // getter for current file level
+  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
+  unsigned int GetHitFileLevel() { return hit_file_level_; }
+
+  FdWithKeyRange* GetHitFile() { return hit_file_; }
+
+  // Returns true if the most recent "hit file" (i.e., one returned by
+  // GetNextFile()) is at the last index in its level.
+  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
+
+  bool KeyMaySpanNextFile() { return maybe_repeat_key_; }
+
+  bool IsSearchEnded() { return search_ended_; }
+
+  const MultiGetRange& CurrentFileRange() { return current_file_range_; }
+
+  bool RemainingOverlapInLevel() {
+    return !current_level_range_.Suffix(current_file_range_).empty();
+  }
+
+  MultiGetRange& GetRange() { return range_; }
+
+  void ReplaceRange(const MultiGetRange& other) {
+    range_ = other;
+    current_level_range_ = other;
+  }
+
+  FilePickerMultiGet(FilePickerMultiGet&& other)
+      : num_levels_(other.num_levels_),
+        curr_level_(other.curr_level_),
+        returned_file_level_(other.returned_file_level_),
+        hit_file_level_(other.hit_file_level_),
+        fp_ctx_array_(std::move(other.fp_ctx_array_)),
+        range_(std::move(other.range_)),
+        maybe_repeat_key_(other.maybe_repeat_key_),
+        current_level_range_(std::move(other.current_level_range_)),
+        current_file_range_(std::move(other.current_file_range_)),
+        batch_iter_(other.batch_iter_, &current_level_range_),
+        batch_iter_prev_(other.batch_iter_prev_, &current_level_range_),
+        upper_key_(other.upper_key_, &current_level_range_),
+        level_files_brief_(other.level_files_brief_),
+        search_ended_(other.search_ended_),
+        is_hit_file_last_in_level_(other.is_hit_file_last_in_level_),
+        curr_file_level_(other.curr_file_level_),
+        file_indexer_(other.file_indexer_),
+        user_comparator_(other.user_comparator_),
+        internal_comparator_(other.internal_comparator_),
+        hit_file_(other.hit_file_) {}
+
+ private:
+  unsigned int num_levels_;
+  unsigned int curr_level_;
+  unsigned int returned_file_level_;
+  unsigned int hit_file_level_;
+
+  struct FilePickerContext {
+    int32_t search_left_bound;
+    int32_t search_right_bound;
+    unsigned int curr_index_in_curr_level;
+    unsigned int start_index_in_curr_level;
+
+    FilePickerContext(int32_t left, int32_t right)
+        : search_left_bound(left),
+          search_right_bound(right),
+          curr_index_in_curr_level(0),
+          start_index_in_curr_level(0) {}
+
+    FilePickerContext() = default;
+  };
+  std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
+  MultiGetRange range_;
+  bool maybe_repeat_key_;
+  MultiGetRange current_level_range_;
+  MultiGetRange current_file_range_;
+  // Iterator to iterate through the keys in a MultiGet batch, that gets reset
+  // at the beginning of each level. Each call to GetNextFile() will position
+  // batch_iter_ at or right after the last key that was found in the returned
+  // SST file
+  MultiGetRange::Iterator batch_iter_;
+  // An iterator that records the previous position of batch_iter_, i.e. the
+  // last key found in the previous SST file, in order to serve as the start of
+  // the batch key range for the next SST file
+  MultiGetRange::Iterator batch_iter_prev_;
+  MultiGetRange::Iterator upper_key_;
+  autovector<LevelFilesBrief>* level_files_brief_;
+  bool search_ended_;
+  bool is_hit_file_last_in_level_;
+  LevelFilesBrief* curr_file_level_;
+  FileIndexer* file_indexer_;
+  const Comparator* user_comparator_;
+  const InternalKeyComparator* internal_comparator_;
+  FdWithKeyRange* hit_file_;
+
   // Iterates through files in the current level until it finds a file that
   // contains at least one key from the MultiGet batch
   bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
@@ -524,124 +704,6 @@ class FilePickerMultiGet {
     return file_hit;
   }
 
-  void PrepareNextLevelForSearch() { search_ended_ = !PrepareNextLevel(); }
-
-  FdWithKeyRange* GetNextFileInLevel() {
-    if (batch_iter_ == current_level_range_.end() || search_ended_) {
-      return nullptr;
-    } else {
-      if (maybe_repeat_key_) {
-        maybe_repeat_key_ = false;
-        // Check if we found the final value for the last key in the
-        // previous lookup range. If we did, then there's no need to look
-        // any further for that key, so advance batch_iter_. Else, keep
-        // batch_iter_ positioned on that key so we look it up again in
-        // the next file.
-        // For L0, always advance the key because we will look in the next
-        // file regardless for all keys not found yet
-        if (current_level_range_.CheckKeyDone(batch_iter_) ||
-            curr_level_ == 0) {
-          batch_iter_ = upper_key_;
-        }
-      }
-      // batch_iter_prev_ will become the start key for the next file
-      // lookup
-      batch_iter_prev_ = batch_iter_;
-    }
-
-    MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
-                                  current_level_range_.end());
-    size_t curr_file_index =
-        (batch_iter_ != current_level_range_.end())
-            ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
-            : curr_file_level_->num_files;
-    FdWithKeyRange* f;
-    bool is_last_key_in_file;
-    if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
-                                    &is_last_key_in_file)) {
-      return nullptr;
-    } else {
-      if (is_last_key_in_file) {
-        // Since cmp_largest is 0, batch_iter_ still points to the last key
-        // that falls in this file, instead of the next one. Increment
-        // the file index for all keys between batch_iter_ and upper_key_
-        auto tmp_iter = batch_iter_;
-        while (tmp_iter != upper_key_) {
-          ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
-          ++tmp_iter;
-        }
-        maybe_repeat_key_ = true;
-      }
-      // Set the range for this file
-      current_file_range_ =
-          MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
-      returned_file_level_ = curr_level_;
-      hit_file_level_ = curr_level_;
-      is_hit_file_last_in_level_ =
-          curr_file_index == curr_file_level_->num_files - 1;
-      return f;
-    }
-  }
-
-  // getter for current file level
-  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
-  unsigned int GetHitFileLevel() { return hit_file_level_; }
-
-  // Returns true if the most recent "hit file" (i.e., one returned by
-  // GetNextFile()) is at the last index in its level.
-  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
-
-  bool KeyMaySpanNextFile() { return maybe_repeat_key_; }
-
-  bool IsSearchEnded() { return search_ended_; }
-
-  const MultiGetRange& CurrentFileRange() { return current_file_range_; }
-
-  bool RemainingOverlapInLevel() {
-    return !current_level_range_.Suffix(current_file_range_).empty();
-  }
-
- private:
-  unsigned int num_levels_;
-  unsigned int curr_level_;
-  unsigned int returned_file_level_;
-  unsigned int hit_file_level_;
-
-  struct FilePickerContext {
-    int32_t search_left_bound;
-    int32_t search_right_bound;
-    unsigned int curr_index_in_curr_level;
-    unsigned int start_index_in_curr_level;
-
-    FilePickerContext(int32_t left, int32_t right)
-        : search_left_bound(left), search_right_bound(right),
-          curr_index_in_curr_level(0), start_index_in_curr_level(0) {}
-
-    FilePickerContext() = default;
-  };
-  std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
-  MultiGetRange* range_;
-  // Iterator to iterate through the keys in a MultiGet batch, that gets reset
-  // at the beginning of each level. Each call to GetNextFile() will position
-  // batch_iter_ at or right after the last key that was found in the returned
-  // SST file
-  MultiGetRange::Iterator batch_iter_;
-  // An iterator that records the previous position of batch_iter_, i.e. the
-  // last key found in the previous SST file, in order to serve as the start of
-  // the batch key range for the next SST file
-  MultiGetRange::Iterator batch_iter_prev_;
-  MultiGetRange::Iterator upper_key_;
-  bool maybe_repeat_key_;
-  MultiGetRange current_level_range_;
-  MultiGetRange current_file_range_;
-  autovector<LevelFilesBrief>* level_files_brief_;
-  bool search_ended_;
-  bool is_hit_file_last_in_level_;
-  LevelFilesBrief* curr_file_level_;
-  FileIndexer* file_indexer_;
-  const Comparator* user_comparator_;
-  const InternalKeyComparator* internal_comparator_;
-
   // Setup local variables to search next level.
   // Returns false if there are no more levels to search.
   bool PrepareNextLevel() {
@@ -692,7 +754,7 @@ class FilePickerMultiGet {
     // are always compacted into a single entry).
     int32_t start_index = -1;
     current_level_range_ =
-        MultiGetRange(*range_, range_->begin(), range_->end());
+        MultiGetRange(range_, range_.begin(), range_.end());
     for (auto mget_iter = current_level_range_.begin();
          mget_iter != current_level_range_.end(); ++mget_iter) {
       struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
@@ -754,7 +816,6 @@ class FilePickerMultiGet {
     return false;
   }
 };
-}  // anonymous namespace
 
 VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
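
Note: with the getters above in place, the picker's intended call pattern, used by Version::MultiGet in the next hunk, looks roughly like the following. This is an illustrative sketch rather than code from the patch; the per-file lookup and error handling are elided.

```cpp
// Illustrative driver, assuming an already-constructed FilePickerMultiGet fp.
// Version::MultiGet below layers status checks and statistics on top of this.
FdWithKeyRange* f = fp.GetNextFileInLevel();
while (!fp.IsSearchEnded()) {
  if (f) {
    // Look up fp.CurrentFileRange() in file f, e.g. via MultiGetFromSST(),
    // then advance to the next overlapping file in this level.
    f = fp.GetNextFileInLevel();
  } else {
    // Level exhausted: set up the next level and fetch its first file.
    fp.PrepareNextLevelForSearch();
    if (!fp.IsSearchEnded()) {
      f = fp.GetNextFileInLevel();
    }
  }
}
```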
@@ -2190,151 +2251,162 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
     iter->get_context = &(get_ctx[get_ctx_index]);
   }
 
-  MultiGetRange file_picker_range(*range, range->begin(), range->end());
-  FilePickerMultiGet fp(
-      &file_picker_range,
-      &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
-      &storage_info_.file_indexer_, user_comparator(), internal_comparator());
-  FdWithKeyRange* f = fp.GetNextFileInLevel();
   Status s;
-  uint64_t num_index_read = 0;
-  uint64_t num_filter_read = 0;
-  uint64_t num_sst_read = 0;
-  uint64_t num_level_read = 0;
-
-  MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
   // blob_file => [[blob_idx, it], ...]
   std::unordered_map<uint64_t, BlobReadContexts> blob_ctxs;
-  int prev_level = -1;
-
-  while (!fp.IsSearchEnded()) {
-    // This will be set to true later if we actually look up in a file in L0.
-    // For per level stats purposes, an L0 file is treated as a level
-    bool dump_stats_for_l0_file = false;
-
-    // Avoid using the coroutine version if we're looking in an L0 file, since
-    // L0 files won't be parallelized anyway. The regular synchronous version
-    // is faster.
-    if (!read_options.async_io || !using_coroutines() ||
-        fp.GetHitFileLevel() == 0 || !fp.RemainingOverlapInLevel()) {
-      if (f) {
-        bool skip_filters = IsFilterSkipped(
-            static_cast<int>(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel());
-        // Call MultiGetFromSST for looking up a single file
-        s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
-                            fp.GetHitFileLevel(), skip_filters,
-                            /*skip_range_deletions=*/false, f, blob_ctxs,
-                            /*table_handle=*/nullptr, num_filter_read,
-                            num_index_read, num_sst_read);
-        if (fp.GetHitFileLevel() == 0) {
-          dump_stats_for_l0_file = true;
-        }
-      }
-      if (s.ok()) {
-        f = fp.GetNextFileInLevel();
-      }
+  MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
 #if USE_COROUTINES
-    } else {
-      std::vector<folly::coro::Task<Status>> mget_tasks;
-      while (f != nullptr) {
-        MultiGetRange file_range = fp.CurrentFileRange();
-        Cache::Handle* table_handle = nullptr;
-        bool skip_filters = IsFilterSkipped(
-            static_cast<int>(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel());
-        bool skip_range_deletions = false;
-        if (!skip_filters) {
-          Status status = table_cache_->MultiGetFilter(
-              read_options, *internal_comparator(), *f->file_metadata,
-              mutable_cf_options_.prefix_extractor,
-              cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
-              fp.GetHitFileLevel(), &file_range, &table_handle);
-          skip_range_deletions = true;
-          if (status.ok()) {
-            skip_filters = true;
-          } else if (!status.IsNotSupported()) {
-            s = status;
+  if (read_options.async_io && read_options.optimize_multiget_for_io &&
+      using_coroutines()) {
+    s = MultiGetAsync(read_options, range, &blob_ctxs);
+  } else
+#endif  // USE_COROUTINES
+  {
+    MultiGetRange file_picker_range(*range, range->begin(), range->end());
+    FilePickerMultiGet fp(&file_picker_range, &storage_info_.level_files_brief_,
+                          storage_info_.num_non_empty_levels_,
+                          &storage_info_.file_indexer_, user_comparator(),
+                          internal_comparator());
+    FdWithKeyRange* f = fp.GetNextFileInLevel();
+    uint64_t num_index_read = 0;
+    uint64_t num_filter_read = 0;
+    uint64_t num_sst_read = 0;
+    uint64_t num_level_read = 0;
+
+    int prev_level = -1;
+
+    while (!fp.IsSearchEnded()) {
+      // This will be set to true later if we actually look up in a file in L0.
+      // For per level stats purposes, an L0 file is treated as a level
+      bool dump_stats_for_l0_file = false;
+
+      // Avoid using the coroutine version if we're looking in an L0 file, since
+      // L0 files won't be parallelized anyway. The regular synchronous version
+      // is faster.
+      if (!read_options.async_io || !using_coroutines() ||
+          fp.GetHitFileLevel() == 0 || !fp.RemainingOverlapInLevel()) {
+        if (f) {
+          bool skip_filters =
+              IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
+                              fp.IsHitFileLastInLevel());
+          // Call MultiGetFromSST for looking up a single file
+          s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
+                              fp.GetHitFileLevel(), skip_filters,
+                              /*skip_range_deletions=*/false, f, blob_ctxs,
+                              /*table_handle=*/nullptr, num_filter_read,
+                              num_index_read, num_sst_read);
+          if (fp.GetHitFileLevel() == 0) {
+            dump_stats_for_l0_file = true;
+          }
+        }
+        if (s.ok()) {
+          f = fp.GetNextFileInLevel();
+        }
+#if USE_COROUTINES
+      } else {
+        std::vector<folly::coro::Task<Status>> mget_tasks;
+        while (f != nullptr) {
+          MultiGetRange file_range = fp.CurrentFileRange();
+          Cache::Handle* table_handle = nullptr;
+          bool skip_filters =
+              IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
+                              fp.IsHitFileLastInLevel());
+          bool skip_range_deletions = false;
+          if (!skip_filters) {
+            Status status = table_cache_->MultiGetFilter(
+                read_options, *internal_comparator(), *f->file_metadata,
+                mutable_cf_options_.prefix_extractor,
+                cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
+                fp.GetHitFileLevel(), &file_range, &table_handle);
+            skip_range_deletions = true;
+            if (status.ok()) {
+              skip_filters = true;
+            } else if (!status.IsNotSupported()) {
+              s = status;
+            }
+          }
 
-        if (!s.ok()) {
-          break;
-        }
+          if (!s.ok()) {
+            break;
+          }
 
-        if (!file_range.empty()) {
-          mget_tasks.emplace_back(MultiGetFromSSTCoroutine(
-              read_options, file_range, fp.GetHitFileLevel(), skip_filters,
-              skip_range_deletions, f, blob_ctxs, table_handle, num_filter_read,
-              num_index_read, num_sst_read));
-        }
-        if (fp.KeyMaySpanNextFile()) {
-          break;
-        }
-        f = fp.GetNextFileInLevel();
-      }
-      if (s.ok() && mget_tasks.size() > 0) {
-        RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size());
-        // Collect all results so far
-        std::vector<Status> statuses = folly::coro::blockingWait(
-            folly::coro::collectAllRange(std::move(mget_tasks))
-                .scheduleOn(&range->context()->executor()));
-        for (Status stat : statuses) {
-          if (!stat.ok()) {
-            s = stat;
+          if (!file_range.empty()) {
+            mget_tasks.emplace_back(MultiGetFromSSTCoroutine(
+                read_options, file_range, fp.GetHitFileLevel(), skip_filters,
+                skip_range_deletions, f, blob_ctxs, table_handle,
+                num_filter_read, num_index_read, num_sst_read));
+          }
+          if (fp.KeyMaySpanNextFile()) {
+            break;
+          }
+          f = fp.GetNextFileInLevel();
+        }
+        if (s.ok() && mget_tasks.size() > 0) {
+          RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT,
+                     mget_tasks.size());
+          // Collect all results so far
+          std::vector<Status> statuses = folly::coro::blockingWait(
+              folly::coro::collectAllRange(std::move(mget_tasks))
+                  .scheduleOn(&range->context()->executor()));
+          for (Status stat : statuses) {
+            if (!stat.ok()) {
+              s = stat;
+            }
           }
-        }
 
-        if (s.ok() && fp.KeyMaySpanNextFile()) {
-          f = fp.GetNextFileInLevel();
-        }
-      }
+          if (s.ok() && fp.KeyMaySpanNextFile()) {
+            f = fp.GetNextFileInLevel();
+          }
+        }
 #endif  // USE_COROUTINES
-    }
-    // If bad status or we found final result for all the keys
-    if (!s.ok() || file_picker_range.empty()) {
-      break;
-    }
-    if (!f) {
-      // Reached the end of this level. Prepare the next level
-      fp.PrepareNextLevelForSearch();
-      if (!fp.IsSearchEnded()) {
-        // It's possible there is no overlap on this level and f is nullptr
-        f = fp.GetNextFileInLevel();
-      }
-      if (dump_stats_for_l0_file ||
-          (prev_level != 0 && prev_level != (int)fp.GetHitFileLevel())) {
-        // Dump the stats if the search has moved to the next level and
-        // reset for next level.
-        if (num_filter_read + num_index_read) {
-          RecordInHistogram(db_statistics_,
-                            NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
-                            num_index_read + num_filter_read);
+      }
+      // If bad status or we found final result for all the keys
+      if (!s.ok() || file_picker_range.empty()) {
+        break;
+      }
+      if (!f) {
+        // Reached the end of this level. Prepare the next level
+        fp.PrepareNextLevelForSearch();
+        if (!fp.IsSearchEnded()) {
+          // It's possible there is no overlap on this level and f is nullptr
+          f = fp.GetNextFileInLevel();
         }
-        if (num_sst_read) {
-          RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL,
-                            num_sst_read);
-          num_level_read++;
+        if (dump_stats_for_l0_file ||
+            (prev_level != 0 && prev_level != (int)fp.GetHitFileLevel())) {
+          // Dump the stats if the search has moved to the next level and
+          // reset for next level.
+          if (num_filter_read + num_index_read) {
+            RecordInHistogram(db_statistics_,
+                              NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
+                              num_index_read + num_filter_read);
+          }
+          if (num_sst_read) {
+            RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL,
+                              num_sst_read);
+            num_level_read++;
+          }
+          num_filter_read = 0;
+          num_index_read = 0;
+          num_sst_read = 0;
         }
-        num_filter_read = 0;
-        num_index_read = 0;
-        num_sst_read = 0;
+        prev_level = fp.GetHitFileLevel();
       }
-      prev_level = fp.GetHitFileLevel();
     }
-  }
-  // Dump stats for most recent level
-  if (num_filter_read + num_index_read) {
-    RecordInHistogram(db_statistics_,
-                      NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
-                      num_index_read + num_filter_read);
-  }
-  if (num_sst_read) {
-    RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);
-    num_level_read++;
-  }
-  if (num_level_read) {
-    RecordInHistogram(db_statistics_, NUM_LEVEL_READ_PER_MULTIGET,
-                      num_level_read);
+    // Dump stats for most recent level
+    if (num_filter_read + num_index_read) {
+      RecordInHistogram(db_statistics_,
+                        NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
+                        num_index_read + num_filter_read);
+    }
+    if (num_sst_read) {
+      RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);
+      num_level_read++;
+    }
+    if (num_level_read) {
+      RecordInHistogram(db_statistics_, NUM_LEVEL_READ_PER_MULTIGET,
+                        num_level_read);
+    }
   }
 
   if (s.ok() && !blob_ctxs.empty()) {
@@ -2386,6 +2458,201 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
   }
 }
 
+#ifdef USE_COROUTINES
+Status Version::ProcessBatch(
+    const ReadOptions& read_options, FilePickerMultiGet* batch,
+    std::vector<folly::coro::Task<Status>>& mget_tasks,
+    std::unordered_map<uint64_t, BlobReadContexts>* blob_ctxs,
+    autovector<FilePickerMultiGet>& batches, std::deque<size_t>& waiting,
+    std::deque<size_t>& to_process, unsigned int& num_tasks_queued,
+    uint64_t& num_filter_read, uint64_t& num_index_read,
+    uint64_t& num_sst_read) {
+  FilePickerMultiGet& fp = *batch;
+  MultiGetRange range = fp.GetRange();
+  // Initialize a new empty range. Any keys that are not in this level will
+  // eventually become part of the new range.
+  MultiGetRange leftover(range, range.begin(), range.begin());
+  FdWithKeyRange* f = nullptr;
+  Status s;
+
+  f = fp.GetNextFileInLevel();
+  while (!f) {
+    fp.PrepareNextLevelForSearch();
+    if (!fp.IsSearchEnded()) {
+      f = fp.GetNextFileInLevel();
+    } else {
+      break;
+    }
+  }
+  while (f) {
+    MultiGetRange file_range = fp.CurrentFileRange();
+    Cache::Handle* table_handle = nullptr;
+    bool skip_filters = IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
+                                        fp.IsHitFileLastInLevel());
+    bool skip_range_deletions = false;
+    if (!skip_filters) {
+      Status status = table_cache_->MultiGetFilter(
+          read_options, *internal_comparator(), *f->file_metadata,
+          mutable_cf_options_.prefix_extractor,
+          cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
+          fp.GetHitFileLevel(), &file_range, &table_handle);
+      if (status.ok()) {
+        skip_filters = true;
+        skip_range_deletions = true;
+      } else if (!status.IsNotSupported()) {
+        s = status;
+      }
+    }
+    if (!s.ok()) {
+      break;
+    }
+    // At this point, file_range contains any keys that are likely in this
+    // file. It may have false positives, but that's ok since higher level
+    // lookups for the key are dependent on this lookup anyway.
+    // Add the complement of file_range to leftover. That's the set of keys
+    // definitely not in this level.
+    // Subtract the complement of file_range from range, since they will be
+    // processed in a separate batch in parallel.
+    leftover += ~file_range;
+    range -= ~file_range;
+    if (!file_range.empty()) {
+      if (waiting.empty() && to_process.empty() &&
+          !fp.RemainingOverlapInLevel() && leftover.empty() &&
+          mget_tasks.empty()) {
+        // All keys are in one SST file, so take the fast path
+        s = MultiGetFromSST(read_options, file_range, fp.GetHitFileLevel(),
+                            skip_filters, skip_range_deletions, f, *blob_ctxs,
+                            table_handle, num_filter_read, num_index_read,
+                            num_sst_read);
+      } else {
+        mget_tasks.emplace_back(MultiGetFromSSTCoroutine(
+            read_options, file_range, fp.GetHitFileLevel(), skip_filters,
+            skip_range_deletions, f, *blob_ctxs, table_handle, num_filter_read,
+            num_index_read, num_sst_read));
+        ++num_tasks_queued;
+      }
+    }
+    if (fp.KeyMaySpanNextFile() && !file_range.empty()) {
+      break;
+    }
+    f = fp.GetNextFileInLevel();
+  }
+  // Split the current batch only if some keys are likely in this level and
+  // some are not.
+  if (s.ok() && !leftover.empty() && !range.empty()) {
+    fp.ReplaceRange(range);
+    batches.emplace_back(&leftover, fp);
+    to_process.emplace_back(batches.size() - 1);
+  }
+  // 1. If f is non-null, that means we might not be done with this level.
+  //    This can happen if one of the keys is the last key in the file, i.e.
+  //    fp.KeyMaySpanNextFile() is true.
+  // 2. If range is empty, then we're done with this range and there is no
+  //    need to prepare the next level.
+  // 3. If some tasks were queued for this range, then the next level will be
+  //    prepared after executing those tasks.
+  if (!f && !range.empty() && !num_tasks_queued) {
+    fp.PrepareNextLevelForSearch();
+  }
+  return s;
+}
+
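
Note: the key step in ProcessBatch is the pair `leftover += ~file_range; range -= ~file_range;`. A hedged illustration of the underlying mask arithmetic with plain integers (the real code uses MultiGetRange, whose operators are added to table/multiget_context.h later in this diff): suppose the batch holds keys 0..4 and the filter check decided that only keys 1 and 3 may be in the current file.

```cpp
#include <cassert>
#include <cstdint>

int main() {
  // Bit i set => key i is still "live" in the corresponding range.
  uint64_t range = 0b11111;       // keys 0..4 being looked up in this level
  uint64_t file_range = 0b01010;  // keys 1 and 3 may be in this file
  uint64_t leftover = 0;          // keys definitely not in this level

  uint64_t complement = ~file_range & 0b11111;  // keys 0, 2 and 4

  leftover |= complement;  // leftover += ~file_range
  range &= ~complement;    // range -= ~file_range

  assert(leftover == 0b10101);  // split off; probed in the next level
  assert(range == 0b01010);     // stays with this level's coroutine lookups
  return 0;
}
```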
+Status Version::MultiGetAsync(
+    const ReadOptions& options, MultiGetRange* range,
+    std::unordered_map<uint64_t, BlobReadContexts>* blob_ctxs) {
+  autovector<FilePickerMultiGet> batches;
+  std::deque<size_t> waiting;
+  std::deque<size_t> to_process;
+  Status s;
+  std::vector<folly::coro::Task<Status>> mget_tasks;
+  uint64_t num_filter_read = 0;
+  uint64_t num_index_read = 0;
+  uint64_t num_sst_read = 0;
+
+  // Create the initial batch with the input range
+  batches.emplace_back(range, &storage_info_.level_files_brief_,
+                       storage_info_.num_non_empty_levels_,
+                       &storage_info_.file_indexer_, user_comparator(),
+                       internal_comparator());
+  to_process.emplace_back(0);
+
+  while (!to_process.empty()) {
+    size_t idx = to_process.front();
+    FilePickerMultiGet* batch = &batches.at(idx);
+    unsigned int num_tasks_queued = 0;
+    to_process.pop_front();
+    if (batch->IsSearchEnded() || batch->GetRange().empty()) {
+      if (!to_process.empty()) {
+        continue;
+      }
+    } else {
+      // Look through one level. This may split the batch and enqueue it to
+      // to_process
+      s = ProcessBatch(options, batch, mget_tasks, blob_ctxs, batches, waiting,
+                       to_process, num_tasks_queued, num_filter_read,
+                       num_index_read, num_sst_read);
+      if (!s.ok()) {
+        break;
+      }
+      // Dump the stats since the search has moved to the next level
+      if (num_filter_read + num_index_read) {
+        RecordInHistogram(db_statistics_,
+                          NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
+                          num_index_read + num_filter_read);
+      }
+      if (num_sst_read) {
+        RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL,
+                          num_sst_read);
+      }
+      // If ProcessBatch didn't enqueue any coroutine tasks, it means all
+      // keys were filtered out. So put the batch back in to_process to
+      // look it up in the next level
+      if (!num_tasks_queued && !batch->IsSearchEnded()) {
+        // Put this back in the processing queue
+        to_process.emplace_back(idx);
+      } else if (num_tasks_queued) {
+        waiting.emplace_back(idx);
+      }
+    }
+    if (to_process.empty()) {
+      if (s.ok() && mget_tasks.size() > 0) {
+        assert(waiting.size());
+        RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT,
+                   mget_tasks.size());
+        // Collect all results so far
+        std::vector<Status> statuses = folly::coro::blockingWait(
+            folly::coro::collectAllRange(std::move(mget_tasks))
+                .scheduleOn(&range->context()->executor()));
+        for (Status stat : statuses) {
+          if (!stat.ok()) {
+            s = stat;
+            break;
+          }
+        }
+
+        if (!s.ok()) {
+          break;
+        }
+
+        for (size_t wait_idx : waiting) {
+          FilePickerMultiGet& fp = batches.at(wait_idx);
+          // 1. If fp.GetHitFile() is non-null, then there could be more
+          //    overlap in this level. So skip preparing next level.
+          // 2. If fp.GetRange() is empty, then this batch is completed
+          //    and there is no need to prepare the next level.
+          if (!fp.GetHitFile() && !fp.GetRange().empty()) {
+            fp.PrepareNextLevelForSearch();
+          }
+        }
+        to_process.swap(waiting);
+      } else {
+        assert(!s.ok() || waiting.size() == 0);
+      }
+    }
+  }
+
+  return s;
+}
+#endif
+
 bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
   // Reaching the bottom level implies misses at all upper levels, so we'll
   // skip checking the filters when we predict a hit.
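
Note: MultiGetAsync drains the queued lookups with the folly coroutine primitives named above. A stripped-down sketch of that gather pattern follows; it compiles only against a folly build with coroutine support, and the `Lookup` task is hypothetical, standing in for MultiGetFromSSTCoroutine().

```cpp
#include <vector>

#include "folly/experimental/coro/BlockingWait.h"
#include "folly/experimental/coro/Collect.h"
#include "folly/experimental/coro/Task.h"

// Hypothetical coroutine standing in for MultiGetFromSSTCoroutine().
folly::coro::Task<int> Lookup(int file_id) { co_return file_id * 2; }

int main() {
  std::vector<folly::coro::Task<int>> tasks;
  for (int i = 0; i < 3; ++i) {
    tasks.emplace_back(Lookup(i));  // lazily created, not yet running
  }
  // collectAllRange() awaits every task; blockingWait() drives them to
  // completion (the patch additionally schedules the collection on the
  // MultiGetContext executor via scheduleOn()).
  std::vector<int> results = folly::coro::blockingWait(
      folly::coro::collectAllRange(std::move(tasks)));
  return results.size() == 3 ? 0 : 1;
}
```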
diff --git a/db/version_set.h b/db/version_set.h
index c164d8e4c..90257b434 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -46,6 +46,10 @@
 #include "db/version_edit.h"
 #include "db/write_controller.h"
 #include "env/file_system_tracer.h"
+#if USE_COROUTINES
+#include "folly/experimental/coro/BlockingWait.h"
+#include "folly/experimental/coro/Collect.h"
+#endif
 #include "monitoring/instrumented_mutex.h"
 #include "options/db_options.h"
 #include "port/port.h"
@@ -54,6 +58,7 @@
 #include "table/get_context.h"
 #include "table/multiget_context.h"
 #include "trace_replay/block_cache_tracer.h"
+#include "util/autovector.h"
 #include "util/coro_utils.h"
 #include "util/hash_containers.h"
 
@@ -76,6 +81,7 @@ class ColumnFamilySet;
 class MergeIteratorBuilder;
 class SystemClock;
 class ManifestTailer;
+class FilePickerMultiGet;
 
 // VersionEdit is always supposed to be valid and it is used to point at
 // entries in Manifest. Ideally it should not be used as a container to
@@ -997,6 +1003,28 @@ class Version {
                         Cache::Handle* table_handle, uint64_t& num_filter_read,
                         uint64_t& num_index_read, uint64_t& num_sst_read);
 
+#ifdef USE_COROUTINES
+  // MultiGet using async IO to read data blocks from SST files in parallel
+  // within and across levels
+  Status MultiGetAsync(
+      const ReadOptions& options, MultiGetRange* range,
+      std::unordered_map<uint64_t, BlobReadContexts>* blob_ctxs);
+
+  // A helper function to look up a batch of keys in a single level. It will
+  // queue coroutine tasks to mget_tasks. It may also split the input batch
+  // by creating a new batch with keys definitely not in this level and
+  // enqueuing it to to_process.
+  Status ProcessBatch(const ReadOptions& read_options,
+                      FilePickerMultiGet* batch,
+                      std::vector<folly::coro::Task<Status>>& mget_tasks,
+                      std::unordered_map<uint64_t, BlobReadContexts>* blob_ctxs,
+                      autovector<FilePickerMultiGet>& batches,
+                      std::deque<size_t>& waiting,
+                      std::deque<size_t>& to_process,
+                      unsigned int& num_tasks_queued, uint64_t& num_filter_read,
+                      uint64_t& num_index_read, uint64_t& num_sst_read);
+#endif
+
   ColumnFamilyData* cfd_;  // ColumnFamilyData to which this Version belongs
   Logger* info_log_;
   Statistics* db_statistics_;
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index b0d21f62c..0338c7697 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1686,6 +1686,17 @@ struct ReadOptions {
   // Default: false
   bool async_io;
 
+  // Experimental
+  //
+  // If async_io is set, then this flag controls whether we read SST files
+  // in multiple levels asynchronously. Enabling this flag can help reduce
+  // MultiGet latency by maximizing the number of SST files read in
+  // parallel if the keys in the MultiGet batch are in different levels. It
+  // comes at the expense of slightly higher CPU overhead.
+  //
+  // Default: false
+  bool optimize_multiget_for_io;
+
   ReadOptions();
   ReadOptions(bool cksum, bool cache);
 };
diff --git a/options/options.cc b/options/options.cc
index 6dff5e62f..6f5691515 100644
--- a/options/options.cc
+++ b/options/options.cc
@@ -696,7 +696,8 @@ ReadOptions::ReadOptions()
       io_timeout(std::chrono::microseconds::zero()),
       value_size_soft_limit(std::numeric_limits<uint64_t>::max()),
       adaptive_readahead(false),
-      async_io(false) {}
+      async_io(false),
+      optimize_multiget_for_io(false) {}
 
 ReadOptions::ReadOptions(bool cksum, bool cache)
     : snapshot(nullptr),
@@ -721,6 +722,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
       io_timeout(std::chrono::microseconds::zero()),
       value_size_soft_limit(std::numeric_limits<uint64_t>::max()),
       adaptive_readahead(false),
-      async_io(false) {}
+      async_io(false),
+      optimize_multiget_for_io(false) {}
 
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/table/multiget_context.h b/table/multiget_context.h
index 188681480..933f4e17d 100644
--- a/table/multiget_context.h
+++ b/table/multiget_context.h
@@ -199,17 +199,24 @@ class MultiGetContext {
         : range_(range), ctx_(range->ctx_), index_(idx) {
       while (index_ < range_->end_ &&
              (Mask{1} << index_) &
-                 (range_->ctx_->value_mask_ | range_->skip_mask_))
+                 (range_->ctx_->value_mask_ | range_->skip_mask_ |
+                  range_->invalid_mask_))
         index_++;
     }
 
     Iterator(const Iterator&) = default;
+
+    Iterator(const Iterator& other, const Range* range)
+        : range_(range), ctx_(other.ctx_), index_(other.index_) {
+      assert(range->ctx_ == other.ctx_);
+    }
     Iterator& operator=(const Iterator&) = default;
 
     Iterator& operator++() {
       while (++index_ < range_->end_ &&
              (Mask{1} << index_) &
-                 (range_->ctx_->value_mask_ | range_->skip_mask_))
+                 (range_->ctx_->value_mask_ | range_->skip_mask_ |
+                  range_->invalid_mask_))
         ;
       return *this;
     }
@@ -247,9 +254,17 @@ class MultiGetContext {
           const Iterator& first, const Iterator& last) {
       ctx_ = mget_range.ctx_;
-      start_ = first.index_;
-      end_ = last.index_;
+      if (first == last) {
+        // This means create an empty range based on mget_range. So just
+        // set start_ and end_ to the same value.
+        start_ = mget_range.start_;
+        end_ = start_;
+      } else {
+        start_ = first.index_;
+        end_ = last.index_;
+      }
       skip_mask_ = mget_range.skip_mask_;
+      invalid_mask_ = mget_range.invalid_mask_;
       assert(start_ < 64);
       assert(end_ < 64);
     }
@@ -305,18 +320,67 @@ class MultiGetContext {
       }
     }
 
+    // The += operator expands the number of keys in this range. The expansion
+    // is always to the right, i.e. the start of the additional range >= the
+    // end of the current range. There should be no overlap. Any skipped keys
+    // in rhs are marked as invalid in the invalid_mask_.
+    Range& operator+=(const Range& rhs) {
+      assert(rhs.start_ >= end_);
+      // Check for non-overlapping ranges and adjust invalid_mask_ accordingly
+      if (end_ < rhs.start_) {
+        invalid_mask_ |= RangeMask(end_, rhs.start_);
+        skip_mask_ |= RangeMask(end_, rhs.start_);
+      }
+      start_ = std::min(start_, rhs.start_);
+      end_ = std::max(end_, rhs.end_);
+      skip_mask_ |= rhs.skip_mask_ & RangeMask(rhs.start_, rhs.end_);
+      invalid_mask_ |= (rhs.invalid_mask_ | rhs.skip_mask_) &
+                       RangeMask(rhs.start_, rhs.end_);
+      assert(start_ < 64);
+      assert(end_ < 64);
+      return *this;
+    }
+
+    // The -= operator removes keys from this range. The removed keys should
+    // come from a range completely overlapping the current range. The removed
+    // keys are marked invalid in the invalid_mask_.
+    Range& operator-=(const Range& rhs) {
+      assert(start_ <= rhs.start_ && end_ >= rhs.end_);
+      skip_mask_ |= (~rhs.skip_mask_ | rhs.invalid_mask_) &
+                    RangeMask(rhs.start_, rhs.end_);
+      invalid_mask_ |= (~rhs.skip_mask_ | rhs.invalid_mask_) &
+                       RangeMask(rhs.start_, rhs.end_);
+      return *this;
+    }
+
+    // Return a complement of the current range
+    Range operator~() {
+      Range res = *this;
+      res.skip_mask_ = ~skip_mask_ & RangeMask(start_, end_);
+      return res;
+    }
+
    private:
     friend MultiGetContext;
     MultiGetContext* ctx_;
     size_t start_;
     size_t end_;
     Mask skip_mask_;
+    Mask invalid_mask_;
 
     Range(MultiGetContext* ctx, size_t num_keys)
-        : ctx_(ctx), start_(0), end_(num_keys), skip_mask_(0) {
+        : ctx_(ctx),
+          start_(0),
+          end_(num_keys),
+          skip_mask_(0),
+          invalid_mask_(0) {
       assert(num_keys < 64);
     }
 
+    static Mask RangeMask(size_t start, size_t end) {
+      return (((Mask{1} << (end - start)) - 1) << start);
+    }
+
     Mask RemainingMask() const {
       return (((Mask{1} << end_) - 1) & ~((Mask{1} << start_) - 1) &
               ~(ctx_->value_mask_ | skip_mask_));
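
Note: all of the new Range operators reduce to RangeMask(), which selects the bit positions [start, end) of the batch. A small worked example with plain integers, mirroring the formula above outside the class:

```cpp
#include <cassert>
#include <cstdint>

using Mask = uint64_t;

// Same formula as Range::RangeMask() in the diff above.
static Mask RangeMask(size_t start, size_t end) {
  return (((Mask{1} << (end - start)) - 1) << start);
}

int main() {
  // Bits 2, 3 and 4 selected: keys 2..4 of the batch.
  assert(RangeMask(2, 5) == 0b11100);

  // operator~ keeps the same [start_, end_) window but flips which keys
  // are skipped: with skip_mask_ = 0b00100 over keys 0..4, the complement
  // skips exactly the keys the original range kept.
  Mask skip_mask = 0b00100;
  Mask complement_skip = ~skip_mask & RangeMask(0, 5);
  assert(complement_skip == 0b11011);
  return 0;
}
```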
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index cd7596e31..ddee9c057 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -1177,6 +1177,10 @@ DEFINE_bool(async_io, false,
             "When set true, RocksDB does asynchronous reads for internal auto "
             "readahead prefetching.");
 
+DEFINE_bool(optimize_multiget_for_io, true,
+            "When set true, RocksDB does asynchronous reads for SST files in "
+            "multiple levels for MultiGet.");
+
 DEFINE_bool(charge_compression_dictionary_building_buffer, false,
             "Setting for "
             "CacheEntryRoleOptions::charged of "
@@ -3364,6 +3368,7 @@ class Benchmark {
     read_options_.readahead_size = FLAGS_readahead_size;
     read_options_.adaptive_readahead = FLAGS_adaptive_readahead;
     read_options_.async_io = FLAGS_async_io;
+    read_options_.optimize_multiget_for_io = FLAGS_optimize_multiget_for_io;
 
     void (Benchmark::*method)(ThreadState*) = nullptr;
     void (Benchmark::*post_process_method)() = nullptr;
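
Note: with the db_bench wiring above, the feature can be exercised from the command line in a coroutine-enabled build. For example, a run along the lines of `db_bench -benchmarks=multireadrandom -use_existing_db=true -async_io=true -optimize_multiget_for_io=true` (the remaining flags are left to the reader) should take the parallel cross-level path, while passing `-optimize_multiget_for_io=false` falls back to level-by-level lookups. Note that the flag defaults to true in db_bench but to false in ReadOptions.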