diff --git a/db/column_family.cc b/db/column_family.cc index d35d6fbfc..9b5c3ed39 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -995,11 +995,13 @@ const int ColumnFamilyData::kCompactToBaseLevel = -2; Compaction* ColumnFamilyData::CompactRange( const MutableCFOptions& mutable_cf_options, int input_level, - int output_level, uint32_t output_path_id, const InternalKey* begin, - const InternalKey* end, InternalKey** compaction_end, bool* conflict) { + int output_level, uint32_t output_path_id, uint32_t max_subcompactions, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end, bool* conflict) { auto* result = compaction_picker_->CompactRange( GetName(), mutable_cf_options, current_->storage_info(), input_level, - output_level, output_path_id, begin, end, compaction_end, conflict); + output_level, output_path_id, max_subcompactions, begin, end, + compaction_end, conflict); if (result != nullptr) { result->SetInputVersion(current_); } diff --git a/db/column_family.h b/db/column_family.h index d7fb5db74..d570afaef 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -296,9 +296,9 @@ class ColumnFamilyData { // REQUIRES: DB mutex held Compaction* CompactRange(const MutableCFOptions& mutable_cf_options, int input_level, int output_level, - uint32_t output_path_id, const InternalKey* begin, - const InternalKey* end, InternalKey** compaction_end, - bool* manual_conflict); + uint32_t output_path_id, uint32_t max_subcompactions, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end, bool* manual_conflict); CompactionPicker* compaction_picker() { return compaction_picker_.get(); } // thread-safe diff --git a/db/compaction.cc b/db/compaction.cc index 9db41139b..de87a62c4 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -134,6 +134,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage, int _output_level, uint64_t _target_file_size, uint64_t _max_compaction_bytes, uint32_t _output_path_id, CompressionType _compression, + uint32_t _max_subcompactions, std::vector _grandparents, bool _manual_compaction, double _score, bool _deletion_compaction, @@ -143,6 +144,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage, output_level_(_output_level), max_output_file_size_(_target_file_size), max_compaction_bytes_(_max_compaction_bytes), + max_subcompactions_(_max_subcompactions), immutable_cf_options_(_immutable_cf_options), mutable_cf_options_(_mutable_cf_options), input_version_(nullptr), @@ -163,6 +165,9 @@ Compaction::Compaction(VersionStorageInfo* vstorage, if (is_manual_compaction_) { compaction_reason_ = CompactionReason::kManualCompaction; } + if (max_subcompactions_ == 0) { + max_subcompactions_ = immutable_cf_options_.max_subcompactions; + } #ifndef NDEBUG for (size_t i = 1; i < inputs_.size(); ++i) { @@ -442,7 +447,7 @@ bool Compaction::IsOutputLevelEmpty() const { } bool Compaction::ShouldFormSubcompactions() const { - if (immutable_cf_options_.max_subcompactions <= 1 || cfd_ == nullptr) { + if (max_subcompactions_ <= 1 || cfd_ == nullptr) { return false; } if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) { diff --git a/db/compaction.h b/db/compaction.h index 7be6df2c1..e1ddd170e 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -40,6 +40,7 @@ class Compaction { std::vector inputs, int output_level, uint64_t target_file_size, uint64_t max_compaction_bytes, uint32_t output_path_id, CompressionType compression, + uint32_t max_subcompactions, std::vector grandparents, bool manual_compaction = false, double score = -1, bool deletion_compaction = false, @@ -241,6 +242,8 @@ class Compaction { uint64_t max_compaction_bytes() const { return max_compaction_bytes_; } + uint32_t max_subcompactions() const { return max_subcompactions_; } + uint64_t MaxInputFileCreationTime() const; private: @@ -267,6 +270,7 @@ class Compaction { const int output_level_; // levels to which output files are stored uint64_t max_output_file_size_; uint64_t max_compaction_bytes_; + uint32_t max_subcompactions_; const ImmutableCFOptions immutable_cf_options_; const MutableCFOptions mutable_cf_options_; Version* input_version_; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 5c7f157ec..a58883cf8 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -531,7 +531,7 @@ void CompactionJob::GenSubcompactionBoundaries() { c->mutable_cf_options()->MaxFileSizeForLevel(out_lvl))); uint64_t subcompactions = std::min({static_cast(ranges.size()), - static_cast(db_options_.max_subcompactions), + static_cast(c->max_subcompactions()), max_output_files}); if (subcompactions > 1) { diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 08c018a08..61f7b7792 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -246,7 +246,7 @@ class CompactionJobTest : public testing::Test { Compaction compaction(cfd->current()->storage_info(), *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), compaction_input_files, 1, 1024 * 1024, - 10 * 1024 * 1024, 0, kNoCompression, {}, true); + 10 * 1024 * 1024, 0, kNoCompression, 0, {}, true); compaction.SetInputVersion(cfd->current()); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 337072ea9..3ee98564b 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -301,7 +301,9 @@ Compaction* CompactionPicker::CompactFiles( new Compaction(vstorage, ioptions_, mutable_cf_options, input_files, output_level, compact_options.output_file_size_limit, mutable_cf_options.max_compaction_bytes, output_path_id, - compact_options.compression, /* grandparents */ {}, true); + compact_options.compression, + compact_options.max_subcompactions, + /* grandparents */ {}, true); RegisterCompaction(c); return c; } @@ -504,7 +506,8 @@ void CompactionPicker::GetGrandparents( Compaction* CompactionPicker::CompactRange( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int input_level, int output_level, - uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, + uint32_t output_path_id, uint32_t max_subcompactions, + const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict) { // CompactionPickerFIFO has its own implementation of compact range assert(ioptions_.compaction_style != kCompactionStyleFIFO); @@ -570,7 +573,7 @@ Compaction* CompactionPicker::CompactRange( /* max_compaction_bytes */ LLONG_MAX, output_path_id, GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, 1), - /* grandparents */ {}, /* is manual */ true); + max_subcompactions, /* grandparents */ {}, /* is manual */ true); RegisterCompaction(c); return c; } @@ -677,7 +680,8 @@ Compaction* CompactionPicker::CompactRange( mutable_cf_options.max_compaction_bytes, output_path_id, GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, vstorage->base_level()), - std::move(grandparents), /* is manual compaction */ true); + /* max_subcompactions */ 0, std::move(grandparents), + /* is manual compaction */ true); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); RegisterCompaction(compaction); @@ -1312,8 +1316,8 @@ Compaction* LevelCompactionBuilder::GetCompaction() { GetPathId(ioptions_, mutable_cf_options_, output_level_), GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, output_level_, vstorage_->base_level()), - std::move(grandparents_), is_manual_, start_level_score_, - false /* deletion_compaction */, compaction_reason_); + /* max_subcompactions */ 0, std::move(grandparents_), is_manual_, + start_level_score_, false /* deletion_compaction */, compaction_reason_); // If it's level 0 compaction, make sure we don't execute any other level 0 // compactions in parallel @@ -1561,8 +1565,9 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, - kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), - /* is deletion compaction */ true, CompactionReason::kFIFOTtl); + kNoCompression, /* max_subcompactions */ 0, {}, /* is manual */ false, + vstorage->CompactionScore(0), /* is deletion compaction */ true, + CompactionReason::kFIFOTtl); return c; } @@ -1599,9 +1604,9 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction( vstorage, ioptions_, mutable_cf_options, {comp_inputs}, 0, 16 * 1024 * 1024 /* output file size limit */, 0 /* max compaction bytes, not applicable */, - 0 /* output path ID */, mutable_cf_options.compression, {}, - /* is manual */ false, vstorage->CompactionScore(0), - /* is deletion compaction */ false, + 0 /* output path ID */, mutable_cf_options.compression, + 0 /* max_subcompactions */, {}, /* is manual */ false, + vstorage->CompactionScore(0), /* is deletion compaction */ false, CompactionReason::kFIFOReduceNumFiles); return c; } @@ -1647,8 +1652,9 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction( Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, - kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), - /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize); + kNoCompression, /* max_subcompactions */ 0, {}, /* is manual */ false, + vstorage->CompactionScore(0), /* is deletion compaction */ true, + CompactionReason::kFIFOMaxSize); return c; } @@ -1671,9 +1677,9 @@ Compaction* FIFOCompactionPicker::PickCompaction( Compaction* FIFOCompactionPicker::CompactRange( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int input_level, int output_level, - uint32_t /*output_path_id*/, const InternalKey* /*begin*/, - const InternalKey* /*end*/, InternalKey** compaction_end, - bool* /*manual_conflict*/) { + uint32_t /*output_path_id*/, uint32_t /*max_subcompactions*/, + const InternalKey* /*begin*/, const InternalKey* /*end*/, + InternalKey** compaction_end, bool* /*manual_conflict*/) { #ifdef NDEBUG (void)input_level; (void)output_level; diff --git a/db/compaction_picker.h b/db/compaction_picker.h index a6a551881..3e4c12e8c 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -58,7 +58,8 @@ class CompactionPicker { virtual Compaction* CompactRange( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int input_level, int output_level, - uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, + uint32_t output_path_id, uint32_t max_subcompactions, + const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict); // The maximum allowed output level. Default value is NumberLevels() - 1. @@ -238,7 +239,8 @@ class FIFOCompactionPicker : public CompactionPicker { virtual Compaction* CompactRange( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int input_level, int output_level, - uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, + uint32_t output_path_id, uint32_t max_subcompactions, + const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict) override; // The maximum allowed output level. Always returns 0. @@ -280,6 +282,7 @@ class NullCompactionPicker : public CompactionPicker { VersionStorageInfo* /*vstorage*/, int /*input_level*/, int /*output_level*/, uint32_t /*output_path_id*/, + uint32_t /*max_subcompactions*/, const InternalKey* /*begin*/, const InternalKey* /*end*/, InternalKey** /*compaction_end*/, diff --git a/db/compaction_picker_universal.cc b/db/compaction_picker_universal.cc index 655fc1050..bf46916e0 100644 --- a/db/compaction_picker_universal.cc +++ b/db/compaction_picker_universal.cc @@ -612,8 +612,8 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns( mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id, GetCompressionType(ioptions_, vstorage, mutable_cf_options, start_level, 1, enable_compression), - /* grandparents */ {}, /* is manual */ false, score, - false /* deletion_compaction */, compaction_reason); + /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, + score, false /* deletion_compaction */, compaction_reason); } // Look at overall size amplification. If size amplification @@ -746,8 +746,8 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp( /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id, GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, 1), - /* grandparents */ {}, /* is manual */ false, score, - false /* deletion_compaction */, + /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, + score, false /* deletion_compaction */, CompactionReason::kUniversalSizeAmplification); } } // namespace rocksdb diff --git a/db/db_impl.h b/db/db_impl.h index 25455f2b7..076c4e17a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -342,6 +342,7 @@ class DBImpl : public DB { Status RunManualCompaction(ColumnFamilyData* cfd, int input_level, int output_level, uint32_t output_path_id, + uint32_t max_subcompactions, const Slice* begin, const Slice* end, bool exclusive, bool disallow_trivial_move = false); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 7c15e2e2c..a4095de6a 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -427,8 +427,8 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, final_output_level--; } s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels, - final_output_level, options.target_path_id, begin, - end, exclusive); + final_output_level, options.target_path_id, + options.max_subcompactions, begin, end, exclusive); } else { for (int level = 0; level <= max_level_with_files; level++) { int output_level; @@ -462,7 +462,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, } } s = RunManualCompaction(cfd, level, output_level, options.target_path_id, - begin, end, exclusive); + options.max_subcompactions, begin, end, exclusive); if (!s.ok()) { break; } @@ -974,6 +974,7 @@ Status DBImpl::Flush(const FlushOptions& flush_options, Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, int output_level, uint32_t output_path_id, + uint32_t max_subcompactions, const Slice* begin, const Slice* end, bool exclusive, bool disallow_trivial_move) { assert(input_level == ColumnFamilyData::kCompactAllLevels || @@ -1061,8 +1062,9 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, (((manual.manual_end = &manual.tmp_storage1) != nullptr) && ((compaction = manual.cfd->CompactRange( *manual.cfd->GetLatestMutableCFOptions(), manual.input_level, - manual.output_level, manual.output_path_id, manual.begin, - manual.end, &manual.manual_end, &manual_conflict)) == nullptr && + manual.output_level, manual.output_path_id, max_subcompactions, + manual.begin, manual.end, &manual.manual_end, + &manual_conflict)) == nullptr && manual_conflict))) { // exclusive manual compactions should not see a conflict during // CompactRange diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 9d87f5c29..88b752b4c 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -80,7 +80,7 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin, cfd->ioptions()->compaction_style == kCompactionStyleFIFO) ? level : level + 1; - return RunManualCompaction(cfd, level, output_level, 0, begin, end, true, + return RunManualCompaction(cfd, level, output_level, 0, 0, begin, end, true, disallow_trivial_move); } diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc index 982cbb85a..39ee46d77 100644 --- a/db/db_range_del_test.cc +++ b/db/db_range_del_test.cc @@ -433,8 +433,8 @@ TEST_F(DBRangeDelTest, ValidUniversalSubcompactionBoundaries) { reinterpret_cast(db_->DefaultColumnFamily()) ->cfd(), 1 /* input_level */, 2 /* output_level */, 0 /* output_path_id */, - nullptr /* begin */, nullptr /* end */, true /* exclusive */, - true /* disallow_trivial_move */)); + 0 /* max_subcompactions */, nullptr /* begin */, nullptr /* end */, + true /* exclusive */, true /* disallow_trivial_move */)); } #endif // ROCKSDB_LITE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e09e161c9..6d44e2435 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1185,10 +1185,13 @@ struct CompactionOptions { // Compaction will create files of size `output_file_size_limit`. // Default: MAX, which means that compaction will create a single file uint64_t output_file_size_limit; + // If > 0, it will replace the option in the DBOptions for this compaction. + uint32_t max_subcompactions; CompactionOptions() : compression(kSnappyCompression), - output_file_size_limit(std::numeric_limits::max()) {} + output_file_size_limit(std::numeric_limits::max()), + max_subcompactions(0) {} }; // For level based compaction, we can configure if we want to skip/force @@ -1224,6 +1227,8 @@ struct CompactRangeOptions { // If true, will execute immediately even if doing so would cause the DB to // enter write stall mode. Otherwise, it'll sleep until load is low enough. bool allow_write_stall = false; + // If > 0, it will replace the option in the DBOptions for this compaction. + uint32_t max_subcompactions = 0; }; // IngestExternalFileOptions is used by IngestExternalFile()