diff --git a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc index 021964e82..4d40ab503 100644 --- a/db/compaction/compaction_picker.cc +++ b/db/compaction/compaction_picker.cc @@ -611,23 +611,21 @@ Compaction* CompactionPicker::CompactRange( // Universal compaction with more than one level always compacts all the // files together to the last level. assert(vstorage->num_levels() > 1); + int max_output_level = + vstorage->MaxOutputLevel(ioptions_.allow_ingest_behind); // DBImpl::CompactRange() set output level to be the last level - if (ioptions_.allow_ingest_behind) { - assert(output_level == vstorage->num_levels() - 2); - } else { - assert(output_level == vstorage->num_levels() - 1); - } + assert(output_level == max_output_level); // DBImpl::RunManualCompaction will make full range for universal compaction assert(begin == nullptr); assert(end == nullptr); *compaction_end = nullptr; int start_level = 0; - for (; start_level < vstorage->num_levels() && + for (; start_level <= max_output_level && vstorage->NumLevelFiles(start_level) == 0; start_level++) { } - if (start_level == vstorage->num_levels()) { + if (start_level > max_output_level) { return nullptr; } @@ -637,9 +635,9 @@ Compaction* CompactionPicker::CompactRange( return nullptr; } - std::vector inputs(vstorage->num_levels() - + std::vector inputs(max_output_level + 1 - start_level); - for (int level = start_level; level < vstorage->num_levels(); level++) { + for (int level = start_level; level <= max_output_level; level++) { inputs[level - start_level].level = level; auto& files = inputs[level - start_level].files; for (FileMetaData* f : vstorage->LevelFiles(level)) { diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index 8f6aac076..799f288ca 100644 --- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -505,7 +505,7 @@ TEST_F(CompactionPickerTest, NeedsCompactionUniversal) { TEST_F(CompactionPickerTest, CompactionUniversalIngestBehindReservedLevel) { const uint64_t kFileSize = 100000; - NewVersionStorage(1, kCompactionStyleUniversal); + NewVersionStorage(3 /* num_levels */, kCompactionStyleUniversal); ioptions_.allow_ingest_behind = true; ioptions_.num_levels = 3; UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_); @@ -532,6 +532,14 @@ TEST_F(CompactionPickerTest, CompactionUniversalIngestBehindReservedLevel) { // output level should be the one above the bottom-most ASSERT_EQ(1, compaction->output_level()); + + // input should not include the reserved level + const std::vector* inputs = compaction->inputs(); + for (const auto& compaction_input : *inputs) { + if (!compaction_input.empty()) { + ASSERT_LT(compaction_input.level, 2); + } + } } // Tests if the files can be trivially moved in multi level // universal compaction when allow_trivial_move option is set diff --git a/db/compaction/compaction_picker_universal.cc b/db/compaction/compaction_picker_universal.cc index 8ad43efcc..9eaf39546 100644 --- a/db/compaction/compaction_picker_universal.cc +++ b/db/compaction/compaction_picker_universal.cc @@ -133,8 +133,8 @@ class UniversalCompactionBuilder { UniversalCompactionPicker* picker_; LogBuffer* log_buffer_; - static std::vector CalculateSortedRuns( - const VersionStorageInfo& vstorage); + static std::vector CalculateSortedRuns( + const VersionStorageInfo& vstorage, int last_level); // Pick a path ID to place a newly generated file, with its estimated file // size. @@ -339,13 +339,13 @@ void UniversalCompactionBuilder::SortedRun::DumpSizeInfo( std::vector UniversalCompactionBuilder::CalculateSortedRuns( - const VersionStorageInfo& vstorage) { + const VersionStorageInfo& vstorage, int last_level) { std::vector ret; for (FileMetaData* f : vstorage.LevelFiles(0)) { ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size, f->being_compacted); } - for (int level = 1; level < vstorage.num_levels(); level++) { + for (int level = 1; level <= last_level; level++) { uint64_t total_compensated_size = 0U; uint64_t total_size = 0U; bool being_compacted = false; @@ -374,7 +374,9 @@ UniversalCompactionBuilder::CalculateSortedRuns( Compaction* UniversalCompactionBuilder::PickCompaction() { const int kLevel0 = 0; score_ = vstorage_->CompactionScore(kLevel0); - sorted_runs_ = CalculateSortedRuns(*vstorage_); + int max_output_level = + vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind); + sorted_runs_ = CalculateSortedRuns(*vstorage_, max_output_level); if (sorted_runs_.size() == 0 || (vstorage_->FilesMarkedForPeriodicCompaction().empty() && @@ -471,6 +473,8 @@ Compaction* UniversalCompactionBuilder::PickCompaction() { "UniversalCompactionBuilder::PickCompaction:Return", nullptr); return nullptr; } + assert(c->output_level() <= + vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind)); if (mutable_cf_options_.compaction_options_universal.allow_trivial_move == true && @@ -698,22 +702,18 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns( GetPathId(ioptions_, mutable_cf_options_, estimated_total_size); int start_level = sorted_runs_[start_index].level; int output_level; + // last level is reserved for the files ingested behind + int max_output_level = + vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind); if (first_index_after == sorted_runs_.size()) { - output_level = vstorage_->num_levels() - 1; + output_level = max_output_level; } else if (sorted_runs_[first_index_after].level == 0) { output_level = 0; } else { output_level = sorted_runs_[first_index_after].level - 1; } - // last level is reserved for the files ingested behind - if (ioptions_.allow_ingest_behind && - (output_level == vstorage_->num_levels() - 1)) { - assert(output_level > 1); - output_level--; - } - - std::vector inputs(vstorage_->num_levels()); + std::vector inputs(max_output_level + 1); for (size_t i = 0; i < inputs.size(); ++i) { inputs[i].level = start_level + static_cast(i); } @@ -1192,8 +1192,10 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() { return nullptr; } + int max_output_level = + vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind); // Pick the first non-empty level after the start_level - for (output_level = start_level + 1; output_level < vstorage_->num_levels(); + for (output_level = start_level + 1; output_level <= max_output_level; output_level++) { if (vstorage_->NumLevelFiles(output_level) != 0) { break; @@ -1201,9 +1203,9 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() { } // If all higher levels are empty, pick the highest level as output level - if (output_level == vstorage_->num_levels()) { + if (output_level > max_output_level) { if (start_level == 0) { - output_level = vstorage_->num_levels() - 1; + output_level = max_output_level; } else { // If start level is non-zero and all higher levels are empty, this // compaction will translate into a trivial move. Since the idea is @@ -1212,11 +1214,7 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() { return nullptr; } } - if (ioptions_.allow_ingest_behind && - output_level == vstorage_->num_levels() - 1) { - assert(output_level > 1); - output_level--; - } + assert(output_level <= max_output_level); if (output_level != 0) { if (start_level == 0) { @@ -1293,8 +1291,9 @@ Compaction* UniversalCompactionBuilder::PickCompactionWithSortedRunRange( uint32_t path_id = GetPathId(ioptions_, mutable_cf_options_, estimated_total_size); int start_level = sorted_runs_[start_index].level; - - std::vector inputs(vstorage_->num_levels()); + int max_output_level = + vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind); + std::vector inputs(max_output_level + 1); for (size_t i = 0; i < inputs.size(); ++i) { inputs[i].level = start_level + static_cast(i); } @@ -1331,13 +1330,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionWithSortedRunRange( int output_level; if (end_index == sorted_runs_.size() - 1) { - // output files at the last level, unless it's reserved - output_level = vstorage_->num_levels() - 1; - // last level is reserved for the files ingested behind - if (ioptions_.allow_ingest_behind) { - assert(output_level > 1); - output_level--; - } + output_level = max_output_level; } else { // if it's not including all sorted_runs, it can only output to the level // above the `end_index + 1` sorted_run. diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 4e313af81..2ace6159b 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1386,6 +1386,14 @@ Status DBImpl::CompactFilesImpl( } } + if (cfd->ioptions()->allow_ingest_behind && + output_level >= cfd->ioptions()->num_levels - 1) { + return Status::InvalidArgument( + "Exceed the maximum output level defined by " + "the current compaction algorithm with ingest_behind --- " + + std::to_string(cfd->ioptions()->num_levels - 1)); + } + Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles( &input_set, cf_meta, output_level); TEST_SYNC_POINT("DBImpl::CompactFilesImpl::PostSanitizeCompactionInputFiles"); diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 9a3de4e12..4249b2fc6 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -2160,13 +2160,13 @@ TEST_P(ExternalSSTFileTest, IngestBehind) { // Insert 100 -> 200 into the memtable for (int i = 100; i <= 200; i++) { ASSERT_OK(Put(Key(i), "memtable")); - true_data[Key(i)] = "memtable"; } // Insert 100 -> 200 using IngestExternalFile file_data.clear(); for (int i = 0; i <= 20; i++) { file_data.emplace_back(Key(i), "ingest_behind"); + true_data[Key(i)] = "ingest_behind"; } bool allow_global_seqno = true; @@ -2188,6 +2188,7 @@ TEST_P(ExternalSSTFileTest, IngestBehind) { options.num_levels = 3; DestroyAndReopen(options); + true_data.clear(); // Insert 100 -> 200 into the memtable for (int i = 100; i <= 200; i++) { ASSERT_OK(Put(Key(i), "memtable")); @@ -2207,12 +2208,43 @@ TEST_P(ExternalSSTFileTest, IngestBehind) { verify_checksums_before_ingest, true /*ingest_behind*/, false /*sort_data*/, &true_data)); ASSERT_EQ("0,1,1", FilesPerLevel()); + std::vector> level_to_files; + dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &level_to_files); + uint64_t ingested_file_number = level_to_files[2][0].fd.GetNumber(); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - // bottom level should be empty - ASSERT_EQ("0,1", FilesPerLevel()); - + // Last level should not be compacted + ASSERT_EQ("0,1,1", FilesPerLevel()); + dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &level_to_files); + ASSERT_EQ(ingested_file_number, level_to_files[2][0].fd.GetNumber()); size_t kcnt = 0; VerifyDBFromMap(true_data, &kcnt, false); + + // Auto-compaction should not include the last level. + // Trigger compaction if size amplification exceeds 110%. + options.compaction_options_universal.max_size_amplification_percent = 110; + options.level0_file_num_compaction_trigger = 4; + TryReopen(options); + Random rnd(301); + for (int i = 0; i < 4; ++i) { + for (int j = 0; j < 10; j++) { + true_data[Key(j)] = rnd.RandomString(1000); + ASSERT_OK(Put(Key(j), true_data[Key(j)])); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &level_to_files); + ASSERT_EQ(1, level_to_files[2].size()); + ASSERT_EQ(ingested_file_number, level_to_files[2][0].fd.GetNumber()); + + // Turning off the option allows DB to compact ingested files. + options.allow_ingest_behind = false; + TryReopen(options); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &level_to_files); + ASSERT_EQ(1, level_to_files[2].size()); + ASSERT_NE(ingested_file_number, level_to_files[2][0].fd.GetNumber()); + VerifyDBFromMap(true_data, &kcnt, false); } TEST_F(ExternalSSTFileTest, SkipBloomFilter) { diff --git a/db/version_set.cc b/db/version_set.cc index c1a5555f1..7b070892c 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3390,6 +3390,7 @@ void VersionStorageInfo::ComputeCompactionScore( // maintaining it to be over 1.0, we scale the original score by 10x // if it is larger than 1.0. const double kScoreScale = 10.0; + int max_output_level = MaxOutputLevel(immutable_options.allow_ingest_behind); for (int level = 0; level <= MaxInputLevel(); level++) { double score; if (level == 0) { @@ -3417,7 +3418,7 @@ void VersionStorageInfo::ComputeCompactionScore( // For universal compaction, we use level0 score to indicate // compaction score for the whole DB. Adding other levels as if // they are L0 files. - for (int i = 1; i < num_levels(); i++) { + for (int i = 1; i <= max_output_level; i++) { // It's possible that a subset of the files in a level may be in a // compaction, due to delete triggered compaction or trivial move. // In that case, the below check may not catch a level being @@ -3561,16 +3562,18 @@ void VersionStorageInfo::ComputeCompactionScore( } } } - ComputeFilesMarkedForCompaction(); + ComputeFilesMarkedForCompaction(max_output_level); if (!immutable_options.allow_ingest_behind) { ComputeBottommostFilesMarkedForCompaction(); } - if (mutable_cf_options.ttl > 0) { + if (mutable_cf_options.ttl > 0 && + compaction_style_ == kCompactionStyleLevel) { ComputeExpiredTtlFiles(immutable_options, mutable_cf_options.ttl); } if (mutable_cf_options.periodic_compaction_seconds > 0) { ComputeFilesMarkedForPeriodicCompaction( - immutable_options, mutable_cf_options.periodic_compaction_seconds); + immutable_options, mutable_cf_options.periodic_compaction_seconds, + max_output_level); } if (mutable_cf_options.enable_blob_garbage_collection && @@ -3584,14 +3587,14 @@ void VersionStorageInfo::ComputeCompactionScore( EstimateCompactionBytesNeeded(mutable_cf_options); } -void VersionStorageInfo::ComputeFilesMarkedForCompaction() { +void VersionStorageInfo::ComputeFilesMarkedForCompaction(int last_level) { files_marked_for_compaction_.clear(); int last_qualify_level = 0; // Do not include files from the last level with data // If table properties collector suggests a file on the last level, // we should not move it to a new level. - for (int level = num_levels() - 1; level >= 1; level--) { + for (int level = last_level; level >= 1; level--) { if (!files_[level].empty()) { last_qualify_level = level - 1; break; @@ -3635,7 +3638,7 @@ void VersionStorageInfo::ComputeExpiredTtlFiles( void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction( const ImmutableOptions& ioptions, - const uint64_t periodic_compaction_seconds) { + const uint64_t periodic_compaction_seconds, int last_level) { assert(periodic_compaction_seconds > 0); files_marked_for_periodic_compaction_.clear(); @@ -3656,7 +3659,7 @@ void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction( const uint64_t allowed_time_limit = current_time - periodic_compaction_seconds; - for (int level = 0; level < num_levels(); level++) { + for (int level = 0; level <= last_level; level++) { for (auto f : files_[level]) { if (!f->being_compacted) { // Compute a file's modification time in the following order: diff --git a/db/version_set.h b/db/version_set.h index f82aaa0fa..c3e48bb4e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -204,7 +204,7 @@ class VersionStorageInfo { // This computes files_marked_for_compaction_ and is called by // ComputeCompactionScore() - void ComputeFilesMarkedForCompaction(); + void ComputeFilesMarkedForCompaction(int last_level); // This computes ttl_expired_files_ and is called by // ComputeCompactionScore() @@ -215,7 +215,7 @@ class VersionStorageInfo { // ComputeCompactionScore() void ComputeFilesMarkedForPeriodicCompaction( const ImmutableOptions& ioptions, - const uint64_t periodic_compaction_seconds); + const uint64_t periodic_compaction_seconds, int last_level); // This computes bottommost_files_marked_for_compaction_ and is called by // ComputeCompactionScore() or UpdateOldestSnapshot(). @@ -465,6 +465,7 @@ class VersionStorageInfo { // REQUIRES: ComputeCompactionScore has been called // REQUIRES: DB mutex held during access + // Used by Leveled Compaction only. const autovector>& ExpiredTtlFiles() const { assert(finalized_); return expired_ttl_files_; @@ -472,6 +473,7 @@ class VersionStorageInfo { // REQUIRES: ComputeCompactionScore has been called // REQUIRES: DB mutex held during access + // Used by Leveled and Universal Compaction. const autovector>& FilesMarkedForPeriodicCompaction() const { assert(finalized_); @@ -680,7 +682,7 @@ class VersionStorageInfo { // This vector contains list of files marked for compaction and also not // currently being compacted. It is protected by DB mutex. It is calculated in - // ComputeCompactionScore() + // ComputeCompactionScore(). Used by Leveled and Universal Compaction. autovector> files_marked_for_compaction_; autovector> expired_ttl_files_; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 97999fcc1..d37af823c 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1198,11 +1198,11 @@ struct DBOptions { // Set this option to true during creation of database if you want // to be able to ingest behind (call IngestExternalFile() skipping keys // that already exist, rather than overwriting matching keys). - // Setting this option to true will affect 2 things: - // 1) Disable some internal optimizations around SST file compression - // 2) Reserve bottom-most level for ingested files only. - // Note that only universal compaction supports reserving last level - // for file ingestion only. + // Setting this option to true has the following effects: + // 1) Disable some internal optimizations around SST file compression. + // 2) Reserve the last level for ingested files only. + // 3) Compaction will not include any file from the last level. + // Note that only Universal Compaction supports allow_ingest_behind. // `num_levels` should be >= 3 if this option is turned on. // // diff --git a/unreleased_history/behavior_changes/ingest_behind_universal.md b/unreleased_history/behavior_changes/ingest_behind_universal.md new file mode 100644 index 000000000..50d120562 --- /dev/null +++ b/unreleased_history/behavior_changes/ingest_behind_universal.md @@ -0,0 +1 @@ +When a DB is openend with `allow_ingest_behind=true` (currently only Universal compaction is supported), files in the last level, i.e. the ingested files, will not be included in any compaction. (#11489)