From e03f8a0c12b93250e4c5c508f90aaf87d9494833 Mon Sep 17 00:00:00 2001
From: sdong
Date: Wed, 6 Apr 2022 18:19:19 -0700
Subject: [PATCH] L0 Subcompaction to trim input files (#9802)

Summary:
When subcompactions are used for an L0->L1 compaction, in most cases all L0
files are involved in every subcompaction. However, that is not always true:
when files are generally (but not strictly) inserted in sequential order, a
subcompaction may only need a subset of the L0 files. Yet RocksDB always opens
all of those L0 files, builds an iterator over each, and reads many of the
files' first or last blocks with expensive readahead. We trim the input files
of each subcompaction to its key range to reduce this overhead a little bit.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9802

Test Plan: Add a unit test to cover this case and manually validate the
behavior while running the test.

Reviewed By: ajkr

Differential Revision: D35371031

fbshipit-source-id: 701ed7375b5cbe41672e93b38fe8a1503dad08b6
---
 db/compaction/compaction_job.cc | 12 ++++--
 db/db_compaction_test.cc        | 68 +++++++++++++++++++++++++++++++++
 db/version_set.cc               | 24 ++++++++++--
 db/version_set.h                |  6 ++-
 4 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index 23b1c60d0..35535db0b 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -1341,6 +1341,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
                                              existing_snapshots_);
 
+  // TODO: since we already use C++17, should use
+  // std::optional instead.
   const Slice* const start = sub_compact->start;
   const Slice* const end = sub_compact->end;
 
@@ -1362,9 +1364,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
 
   // Although the v2 aggregator is what the level iterator(s) know about,
   // the AddTombstones calls will be propagated down to the v1 aggregator.
-  std::unique_ptr<InternalIterator> raw_input(
-      versions_->MakeInputIterator(read_options, sub_compact->compaction,
-                                   &range_del_agg, file_options_for_read_));
+  std::unique_ptr<InternalIterator> raw_input(versions_->MakeInputIterator(
+      read_options, sub_compact->compaction, &range_del_agg,
+      file_options_for_read_,
+      (start == nullptr) ? std::optional<const Slice>{}
+                         : std::optional<const Slice>{*start},
+      (end == nullptr) ? std::optional<const Slice>{}
+                       : std::optional<const Slice>{*end}));
   InternalIterator* input = raw_input.get();
 
   IterKey start_ikey;
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index 03502157d..1c8de5576 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -1354,6 +1354,74 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) {
   }
 }
 
+TEST_P(DBCompactionTestWithParam, PartialOverlappingL0) {
+  class SubCompactionEventListener : public EventListener {
+   public:
+    void OnSubcompactionCompleted(const SubcompactionJobInfo&) override {
+      sub_compaction_finished_++;
+    }
+    std::atomic<int> sub_compaction_finished_{0};
+  };
+
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  options.write_buffer_size = 10 * 1024 * 1024;
+  options.max_subcompactions = max_subcompactions_;
+  SubCompactionEventListener* listener = new SubCompactionEventListener();
+  options.listeners.emplace_back(listener);
+
+  DestroyAndReopen(options);
+
+  // For subcompaction to trigger, output level needs to be non-empty.
+ ASSERT_OK(Put("key", "")); + ASSERT_OK(Put("kez", "")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("key", "")); + ASSERT_OK(Put("kez", "")); + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + // Ranges that are only briefly overlapping so that they won't be trivially + // moved but subcompaction ranges would only contain a subset of files. + std::vector> ranges = { + {100, 199}, {198, 399}, {397, 600}, {598, 800}, {799, 900}, {895, 999}, + }; + int32_t value_size = 10 * 1024; // 10 KB + + Random rnd(301); + std::map values; + for (size_t i = 0; i < ranges.size(); i++) { + for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { + values[j] = rnd.RandomString(value_size); + ASSERT_OK(Put(Key(j), values[j])); + } + ASSERT_OK(Flush()); + } + + int32_t level0_files = NumTableFilesAtLevel(0, 0); + ASSERT_EQ(level0_files, ranges.size()); // Multiple files in L0 + ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1); // One file in L1 + + listener->sub_compaction_finished_ = 0; + ASSERT_OK(db_->EnableAutoCompaction({db_->DefaultColumnFamily()})); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + if (max_subcompactions_ > 3) { + // RocksDB might not generate the exact number of sub compactions. + // Here we validate that at least subcompaction happened. + ASSERT_GT(listener->sub_compaction_finished_.load(), 2); + } + + // We expect that all the files were compacted to L1 + ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); + ASSERT_GT(NumTableFilesAtLevel(1, 0), 1); + + for (size_t i = 0; i < ranges.size(); i++) { + for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { + ASSERT_EQ(Get(Key(j)), values[j]); + } + } +} + TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { int32_t trivial_move = 0; int32_t non_trivial_move = 0; diff --git a/db/version_set.cc b/db/version_set.cc index eabdb86bc..ec4edfc25 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -27,6 +27,7 @@ #include "db/blob/blob_log_format.h" #include "db/compaction/compaction.h" #include "db/compaction/file_pri.h" +#include "db/dbformat.h" #include "db/internal_stats.h" #include "db/log_reader.h" #include "db/log_writer.h" @@ -5816,7 +5817,9 @@ void VersionSet::AddLiveFiles(std::vector* live_table_files, InternalIterator* VersionSet::MakeInputIterator( const ReadOptions& read_options, const Compaction* c, RangeDelAggregator* range_del_agg, - const FileOptions& file_options_compactions) { + const FileOptions& file_options_compactions, + const std::optional& start, + const std::optional& end) { auto cfd = c->column_family_data(); // Level-0 files have to be merged together. For other levels, // we will make a concatenating iterator per level. @@ -5831,10 +5834,25 @@ InternalIterator* VersionSet::MakeInputIterator( if (c->level(which) == 0) { const LevelFilesBrief* flevel = c->input_levels(which); for (size_t i = 0; i < flevel->num_files; i++) { + const FileMetaData& fmd = *flevel->files[i].file_metadata; + if (start.has_value() && + cfd->user_comparator()->Compare(start.value(), + fmd.largest.user_key()) > 0) { + continue; + } + // We should be able to filter out the case where the end key + // equals to the end boundary, since the end key is exclusive. + // We try to be extra safe here. 
+          if (end.has_value() &&
+              cfd->user_comparator()->Compare(end.value(),
+                                              fmd.smallest.user_key()) < 0) {
+            continue;
+          }
+
           list[num++] = cfd->table_cache()->NewIterator(
               read_options, file_options_compactions,
-              cfd->internal_comparator(), *flevel->files[i].file_metadata,
-              range_del_agg, c->mutable_cf_options()->prefix_extractor,
+              cfd->internal_comparator(), fmd, range_del_agg,
+              c->mutable_cf_options()->prefix_extractor,
               /*table_reader_ptr=*/nullptr,
               /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
               /*arena=*/nullptr,
diff --git a/db/version_set.h b/db/version_set.h
index 1fe7c0556..eeabb53a0 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -24,6 +24,7 @@
 #include <limits>
 #include <map>
 #include <memory>
+#include <optional>
 #include <set>
 #include <string>
 #include <utility>
@@ -1261,10 +1262,13 @@ class VersionSet {
   // Create an iterator that reads over the compaction inputs for "*c".
   // The caller should delete the iterator when no longer needed.
   // @param read_options Must outlive the returned iterator.
+  // @param start, end indicate the compaction range
   InternalIterator* MakeInputIterator(
       const ReadOptions& read_options, const Compaction* c,
       RangeDelAggregator* range_del_agg,
-      const FileOptions& file_options_compactions);
+      const FileOptions& file_options_compactions,
+      const std::optional<const Slice>& start,
+      const std::optional<const Slice>& end);
 
   // Add all files listed in any live version to *live_table_files and
   // *live_blob_files. Note that these lists may contain duplicates.
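
Illustration (not part of the patch): the heart of the change is the per-file
boundary check added to VersionSet::MakeInputIterator above. The standalone
C++ sketch below mimics that check with plain std::string keys and
std::optional boundaries instead of RocksDB's Slice, user comparator, and
FileMetaData; the names FileRange and SkipFileForSubcompaction are invented
here for illustration only. A file is skipped when it lies entirely before the
subcompaction's start key or entirely after its end key; like the patch, the
end-key check stays conservative and keeps a file whose smallest key equals
the (exclusive) end boundary.

// Standalone sketch of the L0 input-trimming predicate (assumed names, not
// RocksDB code). Requires C++17 for std::optional.
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct FileRange {
  std::string smallest;  // smallest user key in the file
  std::string largest;   // largest user key in the file
};

// Returns true if the file can be skipped for a subcompaction over
// [start, end): skip when the file ends before `start` or begins after `end`.
bool SkipFileForSubcompaction(const FileRange& f,
                              const std::optional<std::string>& start,
                              const std::optional<std::string>& end) {
  if (start.has_value() && *start > f.largest) {
    return true;  // file lies entirely below the subcompaction range
  }
  if (end.has_value() && *end < f.smallest) {
    return true;  // file lies entirely above the subcompaction range
  }
  return false;
}

int main() {
  // Key ranges shaped like the ones in the new PartialOverlappingL0 test.
  std::vector<FileRange> l0 = {{"100", "199"}, {"198", "399"}, {"397", "600"}};
  std::optional<std::string> start{"200"};
  std::optional<std::string> end{"380"};
  for (const FileRange& f : l0) {
    std::cout << "[" << f.smallest << ", " << f.largest << "] -> "
              << (SkipFileForSubcompaction(f, start, end) ? "skipped"
                                                          : "opened")
              << "\n";
  }
  return 0;
}

With the subcompaction range [200, 380), the first and third files are skipped
and only the middle file is opened, which is the saving the patch targets when
L0 files are written in roughly sequential key order.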