diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 23b1c60d0..35535db0b 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1341,6 +1341,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(), existing_snapshots_); + // TODO: since we already use C++17, should use + // std::optional instead. const Slice* const start = sub_compact->start; const Slice* const end = sub_compact->end; @@ -1362,9 +1364,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // Although the v2 aggregator is what the level iterator(s) know about, // the AddTombstones calls will be propagated down to the v1 aggregator. - std::unique_ptr raw_input( - versions_->MakeInputIterator(read_options, sub_compact->compaction, - &range_del_agg, file_options_for_read_)); + std::unique_ptr raw_input(versions_->MakeInputIterator( + read_options, sub_compact->compaction, &range_del_agg, + file_options_for_read_, + (start == nullptr) ? std::optional{} + : std::optional{*start}, + (end == nullptr) ? std::optional{} + : std::optional{*end})); InternalIterator* input = raw_input.get(); IterKey start_ikey; diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 03502157d..1c8de5576 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -1354,6 +1354,74 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) { } } +TEST_P(DBCompactionTestWithParam, PartialOverlappingL0) { + class SubCompactionEventListener : public EventListener { + public: + void OnSubcompactionCompleted(const SubcompactionJobInfo&) override { + sub_compaction_finished_++; + } + std::atomic sub_compaction_finished_{0}; + }; + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.write_buffer_size = 10 * 1024 * 1024; + options.max_subcompactions = max_subcompactions_; + SubCompactionEventListener* listener = new SubCompactionEventListener(); + options.listeners.emplace_back(listener); + + DestroyAndReopen(options); + + // For subcompactino to trigger, output level needs to be non-empty. + ASSERT_OK(Put("key", "")); + ASSERT_OK(Put("kez", "")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("key", "")); + ASSERT_OK(Put("kez", "")); + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + // Ranges that are only briefly overlapping so that they won't be trivially + // moved but subcompaction ranges would only contain a subset of files. + std::vector> ranges = { + {100, 199}, {198, 399}, {397, 600}, {598, 800}, {799, 900}, {895, 999}, + }; + int32_t value_size = 10 * 1024; // 10 KB + + Random rnd(301); + std::map values; + for (size_t i = 0; i < ranges.size(); i++) { + for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { + values[j] = rnd.RandomString(value_size); + ASSERT_OK(Put(Key(j), values[j])); + } + ASSERT_OK(Flush()); + } + + int32_t level0_files = NumTableFilesAtLevel(0, 0); + ASSERT_EQ(level0_files, ranges.size()); // Multiple files in L0 + ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1); // One file in L1 + + listener->sub_compaction_finished_ = 0; + ASSERT_OK(db_->EnableAutoCompaction({db_->DefaultColumnFamily()})); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + if (max_subcompactions_ > 3) { + // RocksDB might not generate the exact number of sub compactions. + // Here we validate that at least subcompaction happened. + ASSERT_GT(listener->sub_compaction_finished_.load(), 2); + } + + // We expect that all the files were compacted to L1 + ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); + ASSERT_GT(NumTableFilesAtLevel(1, 0), 1); + + for (size_t i = 0; i < ranges.size(); i++) { + for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { + ASSERT_EQ(Get(Key(j)), values[j]); + } + } +} + TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { int32_t trivial_move = 0; int32_t non_trivial_move = 0; diff --git a/db/version_set.cc b/db/version_set.cc index eabdb86bc..ec4edfc25 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -27,6 +27,7 @@ #include "db/blob/blob_log_format.h" #include "db/compaction/compaction.h" #include "db/compaction/file_pri.h" +#include "db/dbformat.h" #include "db/internal_stats.h" #include "db/log_reader.h" #include "db/log_writer.h" @@ -5816,7 +5817,9 @@ void VersionSet::AddLiveFiles(std::vector* live_table_files, InternalIterator* VersionSet::MakeInputIterator( const ReadOptions& read_options, const Compaction* c, RangeDelAggregator* range_del_agg, - const FileOptions& file_options_compactions) { + const FileOptions& file_options_compactions, + const std::optional& start, + const std::optional& end) { auto cfd = c->column_family_data(); // Level-0 files have to be merged together. For other levels, // we will make a concatenating iterator per level. @@ -5831,10 +5834,25 @@ InternalIterator* VersionSet::MakeInputIterator( if (c->level(which) == 0) { const LevelFilesBrief* flevel = c->input_levels(which); for (size_t i = 0; i < flevel->num_files; i++) { + const FileMetaData& fmd = *flevel->files[i].file_metadata; + if (start.has_value() && + cfd->user_comparator()->Compare(start.value(), + fmd.largest.user_key()) > 0) { + continue; + } + // We should be able to filter out the case where the end key + // equals to the end boundary, since the end key is exclusive. + // We try to be extra safe here. + if (end.has_value() && + cfd->user_comparator()->Compare(end.value(), + fmd.smallest.user_key()) < 0) { + continue; + } + list[num++] = cfd->table_cache()->NewIterator( read_options, file_options_compactions, - cfd->internal_comparator(), *flevel->files[i].file_metadata, - range_del_agg, c->mutable_cf_options()->prefix_extractor, + cfd->internal_comparator(), fmd, range_del_agg, + c->mutable_cf_options()->prefix_extractor, /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction, /*arena=*/nullptr, diff --git a/db/version_set.h b/db/version_set.h index 1fe7c0556..eeabb53a0 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -1261,10 +1262,13 @@ class VersionSet { // Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed. // @param read_options Must outlive the returned iterator. + // @param start, end indicates compaction range InternalIterator* MakeInputIterator( const ReadOptions& read_options, const Compaction* c, RangeDelAggregator* range_del_agg, - const FileOptions& file_options_compactions); + const FileOptions& file_options_compactions, + const std::optional& start, + const std::optional& end); // Add all files listed in any live version to *live_table_files and // *live_blob_files. Note that these lists may contain duplicates.