From 02db03af8db7af03b8b026bc40d86e04ac741958 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Tue, 9 Jun 2020 16:49:07 -0700 Subject: [PATCH] make L0 index/filter pinned memory usage predictable (#6911) Summary: Memory pinned by `pin_l0_filter_and_index_blocks_in_cache` needs to be predictable based on user config. This PR makes sure we do not pin extra memory for large files generated by intra-L0 (see https://github.com/facebook/rocksdb/issues/6889). Pull Request resolved: https://github.com/facebook/rocksdb/pull/6911 Test Plan: unit test Reviewed By: siying Differential Revision: D21835818 Pulled By: ajkr fbshipit-source-id: a11a088549d06bed8aacc2548d266e5983f0ead4 --- HISTORY.md | 1 + db/builder.cc | 4 +- db/compaction/compaction_job.cc | 2 + db/db_compaction_test.cc | 20 ++++++++++ db/forward_iterator.cc | 16 ++++---- db/repair.cc | 3 +- db/table_cache.cc | 30 ++++++++------ db/table_cache.h | 11 ++++-- db/version_builder.cc | 14 ++++--- db/version_builder.h | 3 +- db/version_edit_handler.cc | 3 +- db/version_set.cc | 39 +++++++++++-------- db/version_set.h | 2 + options/cf_options.cc | 11 ++++++ options/cf_options.h | 5 +++ .../block_based/block_based_table_factory.cc | 3 +- table/block_based/block_based_table_reader.cc | 12 ++++-- table/block_based/block_based_table_reader.h | 4 +- table/table_builder.h | 15 +++++-- table/table_test.cc | 3 +- 20 files changed, 141 insertions(+), 60 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 1ed89a76a..7d485a276 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Behavior Changes * Disable delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode and num_levels = 1 in order to avoid a corruption bug. +* `pin_l0_filter_and_index_blocks_in_cache` no longer applies to L0 files larger than `1.5 * write_buffer_size` to give more predictable memory usage. Such L0 files may exist due to intra-L0 compaction, external file ingestion, or user dynamically changing `write_buffer_size` (note, however, that files that are already pinned will continue being pinned, even after such a dynamic change). ### Bug Fixes * Fix consistency checking error swallowing in some cases when options.force_consistency_checks = true. diff --git a/db/builder.cc b/db/builder.cc index 8568f8139..309a10ff7 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -239,7 +239,9 @@ Status BuildTable( (internal_stats == nullptr) ? 
nullptr : internal_stats->GetFileReadHist(0), TableReaderCaller::kFlush, /*arena=*/nullptr, - /*skip_filter=*/false, level, /*smallest_compaction_key=*/nullptr, + /*skip_filter=*/false, level, + MaxFileSizeForL0MetaPin(mutable_cf_options), + /*smallest_compaction_key=*/nullptr, /*largest_compaction_key*/ nullptr, /*allow_unprepared_value*/ false)); s = it->status(); diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index b9d3fa313..aabff4fd6 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -654,6 +654,8 @@ Status CompactionJob::Run() { compact_->compaction->output_level()), TableReaderCaller::kCompactionRefill, /*arena=*/nullptr, /*skip_filters=*/false, compact_->compaction->output_level(), + MaxFileSizeForL0MetaPin( + *compact_->compaction->mutable_cf_options()), /*smallest_compaction_key=*/nullptr, /*largest_compaction_key=*/nullptr, /*allow_unprepared_value=*/false); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index da5e46600..c1ca17b1d 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -3184,6 +3184,15 @@ TEST_P(DBCompactionTestWithParam, IntraL0Compaction) { options.level0_file_num_compaction_trigger = 5; options.max_background_compactions = 2; options.max_subcompactions = max_subcompactions_; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + options.write_buffer_size = 2 << 20; // 2MB + + BlockBasedTableOptions table_options; + table_options.block_cache = NewLRUCache(64 << 20); // 64MB + table_options.cache_index_and_filter_blocks = true; + table_options.pin_l0_filter_and_index_blocks_in_cache = true; + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + DestroyAndReopen(options); const size_t kValueSize = 1 << 20; @@ -3214,6 +3223,7 @@ TEST_P(DBCompactionTestWithParam, IntraL0Compaction) { ASSERT_OK(Put(Key(i + 1), value)); } ASSERT_OK(Flush()); + ASSERT_EQ(i + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); } dbfull()->TEST_WaitForCompact(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); @@ -3228,6 +3238,16 @@ TEST_P(DBCompactionTestWithParam, IntraL0Compaction) { for (int i = 0; i < 2; ++i) { ASSERT_GE(level_to_files[0][i].fd.file_size, 1 << 21); } + + // The index/filter in the file produced by intra-L0 should not be pinned. + // That means clearing unref'd entries in block cache and re-accessing the + // file produced by intra-L0 should bump the index block miss count. 
+ uint64_t prev_index_misses =
+ TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
+ table_options.block_cache->EraseUnRefEntries();
+ ASSERT_EQ("", Get(Key(0)));
+ ASSERT_EQ(prev_index_misses + 1,
+ TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
}
TEST_P(DBCompactionTestWithParam, IntraL0CompactionDoesNotObsoleteDeletions) {
diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc
index 156a23a45..bec48376e 100644
--- a/db/forward_iterator.cc
+++ b/db/forward_iterator.cc
@@ -84,9 +84,9 @@ class ForwardLevelIterator : public InternalIterator {
prefix_extractor_, /*table_reader_ptr=*/nullptr,
/*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
/*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
+ /*max_file_size_for_l0_meta_pin=*/0,
/*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr,
- allow_unprepared_value_);
+ /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
valid_ = false;
if (!range_del_agg.IsEmpty()) {
@@ -686,9 +686,9 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
+ MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr,
- allow_unprepared_value_));
+ /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
}
BuildLevelIterators(vstorage);
current_ = nullptr;
@@ -764,9 +764,9 @@ void ForwardIterator::RenewIterators() {
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
+ MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr,
- allow_unprepared_value_));
+ /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
}
for (auto* f : l0_iters_) {
@@ -830,9 +830,9 @@ void ForwardIterator::ResetIncompleteIterators() {
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
+ MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr,
- allow_unprepared_value_);
+ /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
}
diff --git a/db/repair.cc b/db/repair.cc
index e964f6884..f82db9fbb 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -528,7 +528,8 @@ class Repairer {
cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kRepair, /*arena=*/nullptr, /*skip_filters=*/false,
- /*level=*/-1, /*smallest_compaction_key=*/nullptr,
+ /*level=*/-1, /*max_file_size_for_l0_meta_pin=*/0,
+ /*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, /*allow_unprepared_value=*/false);
ParsedInternalKey parsed;
diff --git a/db/table_cache.cc b/db/table_cache.cc
index d4b0c6518..6b29349f3 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -97,7 +97,8 @@ Status TableCache::GetTableReader(
bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
std::unique_ptr<TableReader>* table_reader,
const SliceTransform* prefix_extractor, bool skip_filters, int level,
- bool prefetch_index_and_filter_in_cache) {
+ bool prefetch_index_and_filter_in_cache,
+ size_t
max_file_size_for_l0_meta_pin) { std::string fname = TableFileName(ioptions_.cf_paths, fd.GetNumber(), fd.GetPathId()); std::unique_ptr file; @@ -124,7 +125,8 @@ Status TableCache::GetTableReader( TableReaderOptions(ioptions_, prefix_extractor, file_options, internal_comparator, skip_filters, immortal_tables_, false /* force_direct_prefetch */, level, - fd.largest_seqno, block_cache_tracer_), + fd.largest_seqno, block_cache_tracer_, + max_file_size_for_l0_meta_pin), std::move(file_reader), fd.GetFileSize(), table_reader, prefetch_index_and_filter_in_cache); TEST_SYNC_POINT("TableCache::GetTableReader:0"); @@ -145,8 +147,8 @@ Status TableCache::FindTable(const FileOptions& file_options, const SliceTransform* prefix_extractor, const bool no_io, bool record_read_stats, HistogramImpl* file_read_hist, bool skip_filters, - int level, - bool prefetch_index_and_filter_in_cache) { + int level, bool prefetch_index_and_filter_in_cache, + size_t max_file_size_for_l0_meta_pin) { PERF_TIMER_GUARD_WITH_ENV(find_table_nanos, ioptions_.env); Status s; uint64_t number = fd.GetNumber(); @@ -170,7 +172,8 @@ Status TableCache::FindTable(const FileOptions& file_options, s = GetTableReader(file_options, internal_comparator, fd, false /* sequential mode */, record_read_stats, file_read_hist, &table_reader, prefix_extractor, - skip_filters, level, prefetch_index_and_filter_in_cache); + skip_filters, level, prefetch_index_and_filter_in_cache, + max_file_size_for_l0_meta_pin); if (!s.ok()) { assert(table_reader == nullptr); RecordTick(ioptions_.statistics, NO_FILE_ERRORS); @@ -194,6 +197,7 @@ InternalIterator* TableCache::NewIterator( RangeDelAggregator* range_del_agg, const SliceTransform* prefix_extractor, TableReader** table_reader_ptr, HistogramImpl* file_read_hist, TableReaderCaller caller, Arena* arena, bool skip_filters, int level, + size_t max_file_size_for_l0_meta_pin, const InternalKey* smallest_compaction_key, const InternalKey* largest_compaction_key, bool allow_unprepared_value) { PERF_TIMER_GUARD(new_table_iterator_nanos); @@ -211,7 +215,9 @@ InternalIterator* TableCache::NewIterator( s = FindTable(file_options, icomparator, fd, &handle, prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */, !for_compaction /* record_read_stats */, file_read_hist, - skip_filters, level); + skip_filters, level, + true /* prefetch_index_and_filter_in_cache */, + max_file_size_for_l0_meta_pin); if (s.ok()) { table_reader = GetTableReaderFromHandle(handle); } @@ -372,7 +378,7 @@ Status TableCache::Get(const ReadOptions& options, GetContext* get_context, const SliceTransform* prefix_extractor, HistogramImpl* file_read_hist, bool skip_filters, - int level) { + int level, size_t max_file_size_for_l0_meta_pin) { auto& fd = file_meta.fd; std::string* row_cache_entry = nullptr; bool done = false; @@ -397,10 +403,12 @@ Status TableCache::Get(const ReadOptions& options, Cache::Handle* handle = nullptr; if (!done && s.ok()) { if (t == nullptr) { - s = FindTable( - file_options_, internal_comparator, fd, &handle, prefix_extractor, - options.read_tier == kBlockCacheTier /* no_io */, - true /* record_read_stats */, file_read_hist, skip_filters, level); + s = FindTable(file_options_, internal_comparator, fd, &handle, + prefix_extractor, + options.read_tier == kBlockCacheTier /* no_io */, + true /* record_read_stats */, file_read_hist, skip_filters, + level, true /* prefetch_index_and_filter_in_cache */, + max_file_size_for_l0_meta_pin); if (s.ok()) { t = GetTableReaderFromHandle(handle); } diff --git a/db/table_cache.h 
b/db/table_cache.h
index 71724808e..35b432c6b 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -72,7 +72,8 @@ class TableCache {
const FileMetaData& file_meta, RangeDelAggregator* range_del_agg,
const SliceTransform* prefix_extractor, TableReader** table_reader_ptr,
HistogramImpl* file_read_hist, TableReaderCaller caller, Arena* arena,
- bool skip_filters, int level, const InternalKey* smallest_compaction_key,
+ bool skip_filters, int level, size_t max_file_size_for_l0_meta_pin,
+ const InternalKey* smallest_compaction_key,
const InternalKey* largest_compaction_key, bool allow_unprepared_value);
// If a seek to internal key "k" in specified file finds an entry,
@@ -91,7 +92,7 @@ class TableCache {
GetContext* get_context,
const SliceTransform* prefix_extractor = nullptr,
HistogramImpl* file_read_hist = nullptr, bool skip_filters = false,
- int level = -1);
+ int level = -1, size_t max_file_size_for_l0_meta_pin = 0);
// Return the range delete tombstone iterator of the file specified by
// `file_meta`.
@@ -135,7 +136,8 @@ class TableCache {
const bool no_io = false, bool record_read_stats = true,
HistogramImpl* file_read_hist = nullptr, bool skip_filters = false,
int level = -1,
- bool prefetch_index_and_filter_in_cache = true);
+ bool prefetch_index_and_filter_in_cache = true,
+ size_t max_file_size_for_l0_meta_pin = 0);
// Get TableReader from a cache handle.
TableReader* GetTableReaderFromHandle(Cache::Handle* handle);
@@ -200,7 +202,8 @@ class TableCache {
std::unique_ptr<TableReader>* table_reader,
const SliceTransform* prefix_extractor = nullptr,
bool skip_filters = false, int level = -1,
- bool prefetch_index_and_filter_in_cache = true);
+ bool prefetch_index_and_filter_in_cache = true,
+ size_t max_file_size_for_l0_meta_pin = 0);
// Create a key prefix for looking up the row cache. The prefix is of the
// format row_cache_id + fd_number + seq_no.
Later, the user key can be diff --git a/db/version_builder.cc b/db/version_builder.cc index 6a98ba864..d614ee8f9 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -754,7 +754,8 @@ class VersionBuilder::Rep { Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, bool prefetch_index_and_filter_in_cache, bool is_initial_load, - const SliceTransform* prefix_extractor) { + const SliceTransform* prefix_extractor, + size_t max_file_size_for_l0_meta_pin) { assert(table_cache_ != nullptr); size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity(); @@ -823,7 +824,7 @@ class VersionBuilder::Rep { file_meta->fd, &file_meta->table_reader_handle, prefix_extractor, false /*no_io */, true /* record_read_stats */, internal_stats->GetFileReadHist(level), false, level, - prefetch_index_and_filter_in_cache); + prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin); if (file_meta->table_reader_handle != nullptr) { // Load table_reader file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle( @@ -882,10 +883,11 @@ Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { Status VersionBuilder::LoadTableHandlers( InternalStats* internal_stats, int max_threads, bool prefetch_index_and_filter_in_cache, bool is_initial_load, - const SliceTransform* prefix_extractor) { - return rep_->LoadTableHandlers(internal_stats, max_threads, - prefetch_index_and_filter_in_cache, - is_initial_load, prefix_extractor); + const SliceTransform* prefix_extractor, + size_t max_file_size_for_l0_meta_pin) { + return rep_->LoadTableHandlers( + internal_stats, max_threads, prefetch_index_and_filter_in_cache, + is_initial_load, prefix_extractor, max_file_size_for_l0_meta_pin); } BaseReferencedVersionBuilder::BaseReferencedVersionBuilder( diff --git a/db/version_builder.h b/db/version_builder.h index fc7abe888..a4e1c0d63 100644 --- a/db/version_builder.h +++ b/db/version_builder.h @@ -42,7 +42,8 @@ class VersionBuilder { Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, bool prefetch_index_and_filter_in_cache, bool is_initial_load, - const SliceTransform* prefix_extractor); + const SliceTransform* prefix_extractor, + size_t max_file_size_for_l0_meta_pin); private: class Rep; diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 3d03614c4..62296e76f 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -414,7 +414,8 @@ Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd, cfd->internal_stats(), version_set_->db_options_->max_file_opening_threads, prefetch_index_and_filter_in_cache, is_initial_load, - cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); + cfd->GetLatestMutableCFOptions()->prefix_extractor.get(), + MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions())); if ((s.IsPathNotFound() || s.IsCorruption()) && no_error_if_table_files_missing_) { s = Status::OK(); diff --git a/db/version_set.cc b/db/version_set.cc index ce69afe34..732fb175c 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1004,7 +1004,8 @@ class LevelIterator final : public InternalIterator { read_options_, file_options_, icomparator_, *file_meta.file_metadata, range_del_agg_, prefix_extractor_, nullptr /* don't need reference to table */, file_read_hist_, caller_, - /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key, + /*arena=*/nullptr, skip_filters_, level_, + /*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key, largest_compaction_key, allow_unprepared_value_); } @@ -1599,10 
+1600,9 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
mutable_cf_options_.prefix_extractor.get(), nullptr,
cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, arena,
- /*skip_filters=*/false, /*level=*/0,
+ /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
/*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr,
- allow_unprepared_value));
+ /*largest_compaction_key=*/nullptr, allow_unprepared_value));
}
if (should_sample) {
// Count ones for every L0 files. This is done per iterator creation
@@ -1624,8 +1624,8 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
- range_del_agg, /*largest_compaction_key=*/nullptr,
- allow_unprepared_value));
+ range_del_agg,
+ /*compaction_boundaries=*/nullptr, allow_unprepared_value));
}
}
@@ -1659,7 +1659,7 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
mutable_cf_options_.prefix_extractor.get(), nullptr,
cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, &arena,
- /*skip_filters=*/false, /*level=*/0,
+ /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
/*allow_unprepared_value=*/false));
@@ -1763,6 +1763,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
refs_(0),
file_options_(file_opt),
mutable_cf_options_(mutable_cf_options),
+ max_file_size_for_l0_meta_pin_(
+ MaxFileSizeForL0MetaPin(mutable_cf_options_)),
version_number_(version_number) {}
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
@@ -1826,7 +1828,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
- fp.GetHitFileLevel());
+ fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_);
// TODO: examine the behavior for corrupted key
if (timer_enabled) {
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
@@ -3922,7 +3924,8 @@ Status VersionSet::ProcessManifestWrites(
cfd->internal_stats(), 1 /* max_threads */,
true /* prefetch_index_and_filter_in_cache */,
false /* is_initial_load */,
- mutable_cf_options_ptrs[i]->prefix_extractor.get());
+ mutable_cf_options_ptrs[i]->prefix_extractor.get(),
+ MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs[i]));
if (!s.ok()) {
if (db_options_->paranoid_checks) {
break;
@@ -4649,7 +4652,8 @@ Status VersionSet::Recover(
cfd->internal_stats(), db_options_->max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */,
true /* is_initial_load */,
- cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+ cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
+ MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions()));
if (!s.ok()) {
if (db_options_->paranoid_checks) {
return s;
@@ -5686,13 +5690,14 @@ InternalIterator* VersionSet::MakeInputIterator(
for (size_t i = 0; i < flevel->num_files; i++) {
list[num++] = cfd->table_cache()->NewIterator(
read_options, file_options_compactions,
- cfd->internal_comparator(),
- *flevel->files[i].file_metadata, range_del_agg,
- c->mutable_cf_options()->prefix_extractor.get(),
+ cfd->internal_comparator(), *flevel->files[i].file_metadata,
+ range_del_agg, c->mutable_cf_options()->prefix_extractor.get(),
/*table_reader_ptr=*/nullptr,
/*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
/*arena=*/nullptr,
- /*skip_filters=*/false, /*level=*/static_cast<int>(c->level(which)),
+ /*skip_filters=*/false,
+ /*level=*/static_cast<int>(c->level(which)),
+ MaxFileSizeForL0MetaPin(*c->mutable_cf_options()),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
/*allow_unprepared_value=*/false);
@@ -6023,7 +6028,8 @@ Status ReactiveVersionSet::Recover(
cfd->internal_stats(), db_options_->max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */,
true /* is_initial_load */,
- cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+ cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
+ MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions()));
if (!s.ok()) {
enough = false;
if (s.IsPathNotFound()) {
@@ -6299,7 +6305,8 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
cfd->internal_stats(), db_options_->max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */,
false /* is_initial_load */,
- cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+ cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
+ MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions()));
TEST_SYNC_POINT_CALLBACK(
"ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
"AfterLoadTableHandlers",
diff --git a/db/version_set.h b/db/version_set.h
index 5b8a77523..bac1c50f8 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -803,6 +803,8 @@ class Version {
int refs_; // Number of live refs to this version
const FileOptions file_options_;
const MutableCFOptions mutable_cf_options_;
+ // Cached value to avoid recomputing it on every read.
+ const size_t max_file_size_for_l0_meta_pin_;
// A version number that uniquely represents this version. This is
// used for debugging and logging purposes only.
diff --git a/options/cf_options.cc b/options/cf_options.cc
index ef782f6a8..008c12641 100644
--- a/options/cf_options.cc
+++ b/options/cf_options.cc
@@ -720,6 +720,17 @@ uint64_t MaxFileSizeForLevel(const MutableCFOptions& cf_options,
}
}
+size_t MaxFileSizeForL0MetaPin(const MutableCFOptions& cf_options) {
+ // We do not want to pin meta-blocks that almost certainly came from intra-L0
+ // or a former larger `write_buffer_size` value to avoid surprising users with
+ // pinned memory usage. We use a factor of 1.5 to account for overhead
+ // introduced during flush in most cases.
+ if (port::kMaxSizet / 3 < cf_options.write_buffer_size / 2) {
+ return port::kMaxSizet;
+ }
+ return cf_options.write_buffer_size / 2 * 3;
+}
+
void MutableCFOptions::RefreshDerivedOptions(int num_levels,
CompactionStyle compaction_style) {
max_file_size.resize(num_levels);
diff --git a/options/cf_options.h b/options/cf_options.h
index d0e784779..0a68321b1 100644
--- a/options/cf_options.h
+++ b/options/cf_options.h
@@ -268,4 +268,9 @@ uint64_t MultiplyCheckOverflow(uint64_t op1, double op2);
uint64_t MaxFileSizeForLevel(const MutableCFOptions& cf_options, int level,
CompactionStyle compaction_style, int base_level = 1,
bool level_compaction_dynamic_level_bytes = false);
+
+// Get the max size of an L0 file for which we will pin its meta-blocks when
+// `pin_l0_filter_and_index_blocks_in_cache` is set.
+size_t MaxFileSizeForL0MetaPin(const MutableCFOptions& cf_options); + } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_based_table_factory.cc b/table/block_based/block_based_table_factory.cc index af2b55676..3bfc0cb2e 100644 --- a/table/block_based/block_based_table_factory.cc +++ b/table/block_based/block_based_table_factory.cc @@ -420,7 +420,8 @@ Status BlockBasedTableFactory::NewTableReader( table_reader_options.level, table_reader_options.immortal, table_reader_options.largest_seqno, table_reader_options.force_direct_prefetch, &tail_prefetch_stats_, - table_reader_options.block_cache_tracer); + table_reader_options.block_cache_tracer, + table_reader_options.max_file_size_for_l0_meta_pin); } TableBuilder* BlockBasedTableFactory::NewTableBuilder( diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index e32000e88..7ce7ca55e 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -610,7 +610,8 @@ Status BlockBasedTable::Open( const int level, const bool immortal_table, const SequenceNumber largest_seqno, const bool force_direct_prefetch, TailPrefetchStats* tail_prefetch_stats, - BlockCacheTracer* const block_cache_tracer) { + BlockCacheTracer* const block_cache_tracer, + size_t max_file_size_for_l0_meta_pin) { table_reader->reset(); Status s; @@ -706,7 +707,8 @@ Status BlockBasedTable::Open( } s = new_table->PrefetchIndexAndFilterBlocks( prefetch_buffer.get(), metaindex_iter.get(), new_table.get(), - prefetch_all, table_options, level, &lookup_context); + prefetch_all, table_options, level, file_size, + max_file_size_for_l0_meta_pin, &lookup_context); if (s.ok()) { // Update tail prefetch stats @@ -943,6 +945,7 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, BlockBasedTable* new_table, bool prefetch_all, const BlockBasedTableOptions& table_options, const int level, + size_t file_size, size_t max_file_size_for_l0_meta_pin, BlockCacheLookupContext* lookup_context) { Status s; @@ -987,9 +990,10 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( const bool use_cache = table_options.cache_index_and_filter_blocks; - // pin both index and filters, down to all partitions + // pin both index and filters, down to all partitions. 
const bool pin_all = - rep_->table_options.pin_l0_filter_and_index_blocks_in_cache && level == 0; + rep_->table_options.pin_l0_filter_and_index_blocks_in_cache && + level == 0 && file_size <= max_file_size_for_l0_meta_pin; // prefetch the first level of index const bool prefetch_index = diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 497168076..94f659530 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -101,7 +101,8 @@ class BlockBasedTable : public TableReader { const SequenceNumber largest_seqno = 0, bool force_direct_prefetch = false, TailPrefetchStats* tail_prefetch_stats = nullptr, - BlockCacheTracer* const block_cache_tracer = nullptr); + BlockCacheTracer* const block_cache_tracer = nullptr, + size_t max_file_size_for_l0_meta_pin = 0); bool PrefixMayMatch(const Slice& internal_key, const ReadOptions& read_options, @@ -421,6 +422,7 @@ class BlockBasedTable : public TableReader { FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, BlockBasedTable* new_table, bool prefetch_all, const BlockBasedTableOptions& table_options, const int level, + size_t file_size, size_t max_file_size_for_l0_meta_pin, BlockCacheLookupContext* lookup_context); static BlockType GetBlockTypeForMetaBlockByName(const Slice& meta_block_name); diff --git a/table/table_builder.h b/table/table_builder.h index afbd179b1..1b56d1a9c 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -34,11 +34,13 @@ struct TableReaderOptions { const InternalKeyComparator& _internal_comparator, bool _skip_filters = false, bool _immortal = false, bool _force_direct_prefetch = false, int _level = -1, - BlockCacheTracer* const _block_cache_tracer = nullptr) + BlockCacheTracer* const _block_cache_tracer = nullptr, + size_t _max_file_size_for_l0_meta_pin = 0) : TableReaderOptions(_ioptions, _prefix_extractor, _env_options, _internal_comparator, _skip_filters, _immortal, _force_direct_prefetch, _level, - 0 /* _largest_seqno */, _block_cache_tracer) {} + 0 /* _largest_seqno */, _block_cache_tracer, + _max_file_size_for_l0_meta_pin) {} // @param skip_filters Disables loading/accessing the filter block TableReaderOptions(const ImmutableCFOptions& _ioptions, @@ -48,7 +50,8 @@ struct TableReaderOptions { bool _skip_filters, bool _immortal, bool _force_direct_prefetch, int _level, SequenceNumber _largest_seqno, - BlockCacheTracer* const _block_cache_tracer) + BlockCacheTracer* const _block_cache_tracer, + size_t _max_file_size_for_l0_meta_pin) : ioptions(_ioptions), prefix_extractor(_prefix_extractor), env_options(_env_options), @@ -58,7 +61,8 @@ struct TableReaderOptions { force_direct_prefetch(_force_direct_prefetch), level(_level), largest_seqno(_largest_seqno), - block_cache_tracer(_block_cache_tracer) {} + block_cache_tracer(_block_cache_tracer), + max_file_size_for_l0_meta_pin(_max_file_size_for_l0_meta_pin) {} const ImmutableCFOptions& ioptions; const SliceTransform* prefix_extractor; @@ -77,6 +81,9 @@ struct TableReaderOptions { // largest seqno in the table SequenceNumber largest_seqno; BlockCacheTracer* const block_cache_tracer; + // Largest L0 file size whose meta-blocks may be pinned (can be zero when + // unknown). 
+ const size_t max_file_size_for_l0_meta_pin; }; struct TableBuilderOptions { diff --git a/table/table_test.cc b/table/table_test.cc index 90d032d9c..ef944d32a 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -378,7 +378,8 @@ class TableConstructor: public Constructor { return ioptions.table_factory->NewTableReader( TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions, internal_comparator, !kSkipFilters, !kImmortal, - false, level_, largest_seqno_, &block_cache_tracer_), + false, level_, largest_seqno_, &block_cache_tracer_, + moptions.write_buffer_size), std::move(file_reader_), TEST_GetSink()->contents().size(), &table_reader_); }
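Note (editorial illustration, not part of the commit): the cutoff introduced above is 1.5 * write_buffer_size, computed with an overflow guard, and pinning additionally requires level == 0 and a file size at or below that cutoff. A minimal standalone C++ sketch of that arithmetic follows; the helper names MaxL0MetaPinSize and ShouldPinL0Meta are illustrative stand-ins for the patch's MaxFileSizeForL0MetaPin and the pin_all condition in PrefetchIndexAndFilterBlocks, not RocksDB API.

#include <cstddef>
#include <limits>

// 1.5 * write_buffer_size, saturating at SIZE_MAX if 1.5x would overflow
// (mirrors MaxFileSizeForL0MetaPin in options/cf_options.cc above).
size_t MaxL0MetaPinSize(size_t write_buffer_size) {
  const size_t kMax = std::numeric_limits<size_t>::max();
  if (kMax / 3 < write_buffer_size / 2) {
    return kMax;
  }
  return write_buffer_size / 2 * 3;
}

// Mirrors the pin_all condition: the option must be on, the file must be on
// L0, and the file must not exceed the cutoff.
bool ShouldPinL0Meta(bool pin_l0_index_and_filter, int level, size_t file_size,
                     size_t write_buffer_size) {
  return pin_l0_index_and_filter && level == 0 &&
         file_size <= MaxL0MetaPinSize(write_buffer_size);
}

// Example: with write_buffer_size = 2MB (as in the test above), the cutoff is
// 3MB, so a 4MB intra-L0 output is not pinned while a 2MB flush output is:
//   ShouldPinL0Meta(true, 0, 4u << 20, 2u << 20) == false
//   ShouldPinL0Meta(true, 0, 2u << 20, 2u << 20) == true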