From bf4532eb5c52646f022f935a20923738c44cf1d0 Mon Sep 17 00:00:00 2001 From: anand76 Date: Thu, 4 Aug 2022 12:51:57 -0700 Subject: [PATCH] Break TableReader MultiGet into filter and lookup stages (#10432) Summary: This PR is the first step in enhancing the coroutines MultiGet to be able to lookup a batch in parallel across levels. By having a separate TableReader function for probing the bloom filters, we can quickly figure out which overlapping keys from a batch are definitely not in the file and can move on to the next level. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10432 Reviewed By: akankshamahajan15 Differential Revision: D38245910 Pulled By: anand1976 fbshipit-source-id: 3d20db2350378c3fe6f086f0c7ba5ff01d7f04de --- db/db_basic_test.cc | 28 +++++++++++++ db/table_cache.cc | 42 +++++++++++++++++++ db/table_cache.h | 15 ++++++- db/table_cache_sync_and_async.h | 9 +++- db/version_set.cc | 41 ++++++++++++++---- db/version_set.h | 6 +-- db/version_set_sync_and_async.h | 11 +++-- table/block_based/block_based_table_reader.cc | 30 +++++++++++++ table/block_based/block_based_table_reader.h | 4 ++ table/table_reader.h | 9 ++++ 10 files changed, 175 insertions(+), 20 deletions(-) diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index bfdf9880b..1acf64da6 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -2338,6 +2338,34 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2) { ASSERT_EQ(multiget_io_batch_size.count, 1); ASSERT_EQ(multiget_io_batch_size.max, 2); } + +TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) { + std::vector key_strs; + std::vector keys; + std::vector values; + std::vector statuses; + + // 19 and 26 are in L2, but overlap with L0 and L1 file ranges + key_strs.push_back(Key(19)); + key_strs.push_back(Key(26)); + keys.push_back(key_strs[0]); + keys.push_back(key_strs[1]); + values.resize(keys.size()); + statuses.resize(keys.size()); + + ReadOptions ro; + ro.async_io = true; + dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), + keys.data(), values.data(), statuses.data()); + ASSERT_EQ(values.size(), 2); + ASSERT_EQ(statuses[0], Status::OK()); + ASSERT_EQ(statuses[1], Status::OK()); + ASSERT_EQ(values[0], "val_l2_" + std::to_string(19)); + ASSERT_EQ(values[1], "val_l2_" + std::to_string(26)); + + // Bloom filters in L0/L1 will avoid the coroutine calls in those levels + ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2); +} #endif // USE_COROUTINES TEST_F(DBBasicTest, MultiGetStats) { diff --git a/db/table_cache.cc b/db/table_cache.cc index 3356d3b19..8265575d4 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -501,6 +501,48 @@ Status TableCache::Get( return s; } +Status TableCache::MultiGetFilter( + const ReadOptions& options, + const InternalKeyComparator& internal_comparator, + const FileMetaData& file_meta, + const std::shared_ptr& prefix_extractor, + HistogramImpl* file_read_hist, int level, + MultiGetContext::Range* mget_range, Cache::Handle** table_handle) { + auto& fd = file_meta.fd; +#ifndef ROCKSDB_LITE + IterKey row_cache_key; + std::string row_cache_entry_buffer; + + // Check if we need to use the row cache. If yes, then we cannot do the + // filtering here, since the filtering needs to happen after the row cache + // lookup. + KeyContext& first_key = *mget_range->begin(); + if (ioptions_.row_cache && !first_key.get_context->NeedToReadSequence()) { + return Status::NotSupported(); + } +#endif // ROCKSDB_LITE + Status s; + TableReader* t = fd.table_reader; + Cache::Handle* handle = nullptr; + if (t == nullptr) { + s = FindTable( + options, file_options_, internal_comparator, fd, &handle, + prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */, + true /* record_read_stats */, file_read_hist, /*skip_filters=*/false, + level, true /* prefetch_index_and_filter_in_cache */, + /*max_file_size_for_l0_meta_pin=*/0, file_meta.temperature); + if (s.ok()) { + t = GetTableReaderFromHandle(handle); + } + *table_handle = handle; + } + if (s.ok()) { + s = t->MultiGetFilter(options, prefix_extractor.get(), mget_range); + } + + return s; +} + Status TableCache::GetTableProperties( const FileOptions& file_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, diff --git a/db/table_cache.h b/db/table_cache.h index d7a05200f..9c9342309 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -107,6 +107,19 @@ class TableCache { const FileMetaData& file_meta, std::unique_ptr* out_iter); + // Call table reader's MultiGetFilter to use the bloom filter to filter out + // keys. Returns Status::NotSupported() if row cache needs to be checked. + // If the table cache is looked up to get the table reader, the cache handle + // is returned in table_handle. This handle should be passed back to + // MultiGet() so it can be released. + Status MultiGetFilter( + const ReadOptions& options, + const InternalKeyComparator& internal_comparator, + const FileMetaData& file_meta, + const std::shared_ptr& prefix_extractor, + HistogramImpl* file_read_hist, int level, + MultiGetContext::Range* mget_range, Cache::Handle** table_handle); + // If a seek to internal key "k" in specified file finds an entry, // call get_context->SaveValue() repeatedly until // it returns false. As a side effect, it will insert the TableReader @@ -122,7 +135,7 @@ class TableCache { const FileMetaData& file_meta, const MultiGetContext::Range* mget_range, const std::shared_ptr& prefix_extractor = nullptr, HistogramImpl* file_read_hist = nullptr, bool skip_filters = false, - int level = -1); + int level = -1, Cache::Handle* table_handle = nullptr); // Evict any entry for the specified file number static void Evict(Cache* cache, uint64_t file_number); diff --git a/db/table_cache_sync_and_async.h b/db/table_cache_sync_and_async.h index c6ae5f9b7..77bcd8447 100644 --- a/db/table_cache_sync_and_async.h +++ b/db/table_cache_sync_and_async.h @@ -17,13 +17,17 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet) (const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, const MultiGetContext::Range* mget_range, const std::shared_ptr& prefix_extractor, - HistogramImpl* file_read_hist, bool skip_filters, int level) { + HistogramImpl* file_read_hist, bool skip_filters, int level, + Cache::Handle* table_handle) { auto& fd = file_meta.fd; Status s; TableReader* t = fd.table_reader; - Cache::Handle* handle = nullptr; + Cache::Handle* handle = table_handle; MultiGetRange table_range(*mget_range, mget_range->begin(), mget_range->end()); + if (handle != nullptr && t == nullptr) { + t = GetTableReaderFromHandle(handle); + } #ifndef ROCKSDB_LITE autovector row_cache_entries; IterKey row_cache_key; @@ -61,6 +65,7 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet) // found in the row cache and thus the range may now be empty if (s.ok() && !table_range.empty()) { if (t == nullptr) { + assert(handle == nullptr); s = FindTable(options, file_options_, internal_comparator, fd, &handle, prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */, diff --git a/db/version_set.cc b/db/version_set.cc index 720c05abc..74040270f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2215,11 +2215,13 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, if (!read_options.async_io || !using_coroutines() || fp.GetHitFileLevel() == 0 || !fp.RemainingOverlapInLevel()) { if (f) { + bool skip_filters = IsFilterSkipped( + static_cast(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel()); // Call MultiGetFromSST for looking up a single file s = MultiGetFromSST(read_options, fp.CurrentFileRange(), - fp.GetHitFileLevel(), fp.IsHitFileLastInLevel(), f, - blob_ctxs, num_filter_read, num_index_read, - num_sst_read); + fp.GetHitFileLevel(), skip_filters, f, blob_ctxs, + /*table_handle=*/nullptr, num_filter_read, + num_index_read, num_sst_read); if (fp.GetHitFileLevel() == 0) { dump_stats_for_l0_file = true; } @@ -2231,16 +2233,39 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, } else { std::vector> mget_tasks; while (f != nullptr) { - mget_tasks.emplace_back(MultiGetFromSSTCoroutine( - read_options, fp.CurrentFileRange(), fp.GetHitFileLevel(), - fp.IsHitFileLastInLevel(), f, blob_ctxs, num_filter_read, - num_index_read, num_sst_read)); + MultiGetRange file_range = fp.CurrentFileRange(); + Cache::Handle* table_handle = nullptr; + bool skip_filters = IsFilterSkipped( + static_cast(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel()); + if (!skip_filters) { + Status status = table_cache_->MultiGetFilter( + read_options, *internal_comparator(), *f->file_metadata, + mutable_cf_options_.prefix_extractor, + cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), + fp.GetHitFileLevel(), &file_range, &table_handle); + if (status.ok()) { + skip_filters = true; + } else if (!status.IsNotSupported()) { + s = status; + } + } + + if (!s.ok()) { + break; + } + + if (!file_range.empty()) { + mget_tasks.emplace_back(MultiGetFromSSTCoroutine( + read_options, file_range, fp.GetHitFileLevel(), skip_filters, f, + blob_ctxs, table_handle, num_filter_read, num_index_read, + num_sst_read)); + } if (fp.KeyMaySpanNextFile()) { break; } f = fp.GetNextFileInLevel(); } - if (mget_tasks.size() > 0) { + if (s.ok() && mget_tasks.size() > 0) { RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size()); // Collect all results so far std::vector statuses = folly::coro::blockingWait( diff --git a/db/version_set.h b/db/version_set.h index cb8e6ed07..2b967c1c7 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -990,10 +990,10 @@ class Version { DECLARE_SYNC_AND_ASYNC( /* ret_type */ Status, /* func_name */ MultiGetFromSST, const ReadOptions& read_options, MultiGetRange file_range, - int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f, + int hit_file_level, bool skip_filters, FdWithKeyRange* f, std::unordered_map& blob_ctxs, - uint64_t& num_filter_read, uint64_t& num_index_read, - uint64_t& num_sst_read); + Cache::Handle* table_handle, uint64_t& num_filter_read, + uint64_t& num_index_read, uint64_t& num_sst_read); ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs Logger* info_log_; diff --git a/db/version_set_sync_and_async.h b/db/version_set_sync_and_async.h index af281d415..33ddcffe8 100644 --- a/db/version_set_sync_and_async.h +++ b/db/version_set_sync_and_async.h @@ -13,9 +13,10 @@ namespace ROCKSDB_NAMESPACE { // Lookup a batch of keys in a single SST file DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST) (const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level, - bool is_hit_file_last_in_level, FdWithKeyRange* f, + bool skip_filters, FdWithKeyRange* f, std::unordered_map& blob_ctxs, - uint64_t& num_filter_read, uint64_t& num_index_read, uint64_t& num_sst_read) { + Cache::Handle* table_handle, uint64_t& num_filter_read, + uint64_t& num_index_read, uint64_t& num_sst_read) { bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex && get_perf_context()->per_level_perf_context_enabled; @@ -24,10 +25,8 @@ DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST) s = CO_AWAIT(table_cache_->MultiGet)( read_options, *internal_comparator(), *f->file_metadata, &file_range, mutable_cf_options_.prefix_extractor, - cfd_->internal_stats()->GetFileReadHist(hit_file_level), - IsFilterSkipped(static_cast(hit_file_level), - is_hit_file_last_in_level), - hit_file_level); + cfd_->internal_stats()->GetFileReadHist(hit_file_level), skip_filters, + hit_file_level, table_handle); // TODO: examine the behavior for corrupted key if (timer_enabled) { PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(), diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 024932a70..4d53997ba 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -2273,6 +2273,36 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, return s; } +Status BlockBasedTable::MultiGetFilter(const ReadOptions& read_options, + const SliceTransform* prefix_extractor, + MultiGetRange* mget_range) { + if (mget_range->empty()) { + // Caller should ensure non-empty (performance bug) + assert(false); + return Status::OK(); // Nothing to do + } + + FilterBlockReader* const filter = rep_->filter.get(); + if (!filter) { + return Status::OK(); + } + + // First check the full filter + // If full filter not useful, Then go into each block + const bool no_io = read_options.read_tier == kBlockCacheTier; + uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId; + if (mget_range->begin()->get_context) { + tracing_mget_id = mget_range->begin()->get_context->get_tracing_get_id(); + } + BlockCacheLookupContext lookup_context{ + TableReaderCaller::kUserMultiGet, tracing_mget_id, + /*_get_from_user_specified_snapshot=*/read_options.snapshot != nullptr}; + FullFilterKeysMayMatch(filter, mget_range, no_io, prefix_extractor, + &lookup_context, read_options.rate_limiter_priority); + + return Status::OK(); +} + Status BlockBasedTable::Prefetch(const Slice* const begin, const Slice* const end) { auto& comparator = rep_->internal_comparator; diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index dc362b2f3..fee26f3c7 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -141,6 +141,10 @@ class BlockBasedTable : public TableReader { GetContext* get_context, const SliceTransform* prefix_extractor, bool skip_filters = false) override; + Status MultiGetFilter(const ReadOptions& read_options, + const SliceTransform* prefix_extractor, + MultiGetRange* mget_range) override; + DECLARE_SYNC_AND_ASYNC_OVERRIDE(void, MultiGet, const ReadOptions& readOptions, const MultiGetContext::Range* mget_range, diff --git a/table/table_reader.h b/table/table_reader.h index 4b4902f12..5374edf7c 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -128,6 +128,15 @@ class TableReader { const SliceTransform* prefix_extractor, bool skip_filters = false) = 0; + // Use bloom filters in the table file, if present, to filter out keys. The + // mget_range will be updated to skip keys that get a negative result from + // the filter lookup. + virtual Status MultiGetFilter(const ReadOptions& /*readOptions*/, + const SliceTransform* /*prefix_extractor*/, + MultiGetContext::Range* /*mget_range*/) { + return Status::NotSupported(); + } + virtual void MultiGet(const ReadOptions& readOptions, const MultiGetContext::Range* mget_range, const SliceTransform* prefix_extractor,