From 8860fc902a7c8fababcb44a6373a677135a63d14 Mon Sep 17 00:00:00 2001 From: Zichen Zhu Date: Sun, 24 Jul 2022 11:12:44 -0700 Subject: [PATCH] Support subcmpct using reserved resources for round-robin priority (#10341) Summary: The earlier implementation of round-robin priority could only pick one file at a time and disallowed parallel compactions within the same level. In this PR, the round-robin compaction policy expands to include more input files while respecting some additional constraints, which are summarized as follows: * Constraint 1: We can only pick consecutive files - Constraint 1a: When a file is being compacted (or some input files are being compacted after expanding), we cannot choose it and have to stop choosing more files - Constraint 1b: When we reach the last file (with the largest keys), we cannot choose more files (the next file will be the first one, with the smallest keys) * Constraint 2: We should ensure the total compaction bytes (including the overlapped files from the next level) is no more than `mutable_cf_options_.max_compaction_bytes` * Constraint 3: We try our best to pick as many files as possible so that the post-compaction level size can be just less than `MaxBytesForLevel(start_level_)` * Constraint 4: If trivial move is allowed, we reuse the logic of `TryExtendNonL0TrivialMove()` instead of expanding files with Constraint 3 More details can be found in `LevelCompactionBuilder::SetupOtherFilesWithRoundRobinExpansion()`. The above optimization accelerates the process of moving the compaction cursor, which can further reduce write amplification. Because a large compaction may lead to long write stalls, we break this large compaction into several subcompactions **regardless of** the `max_subcompactions` limit. 
The number of subcompactions for round-robin compaction priority is determined through the following steps: * Step 1: Initialized against `max_output_file_limit`, the number of input files in the start level, and also the range size limit `ranges.size()` * Step 2: Call `AcquireSubcompactionResources()` when max subcompactions is not sufficient (we may or may not obtain the desired resources; the number of additional resources actually reserved is stored in `extra_num_subcompaction_threads_reserved_`). The subcompaction limit changes as a result, so `num_planned_subcompactions` is updated with `GetSubcompactionsLimit()` * Step 3: Call `ShrinkSubcompactionResources()` to ensure extra resources can be released (extra resources may exist for round-robin compaction when the actual number of subcompactions is less than the number of planned subcompactions) More details can be found in `CompactionJob::AcquireSubcompactionResources()`,`CompactionJob::ShrinkSubcompactionResources()`, and `CompactionJob::ReleaseSubcompactionResources()`. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/10341 Test Plan: Add `CompactionPriMultipleFilesRoundRobin[1-3]` unit test in `compaction_picker_test.cc` and `RoundRobinSubcompactionsAgainstResources.SubcompactionsUsingResources/[0-4]`, `RoundRobinSubcompactionsAgainstPressureToken.PressureTokenTest/[0-1]` in `db_compaction_test.cc` Reviewed By: ajkr, hx235 Differential Revision: D37792644 Pulled By: littlepig2013 fbshipit-source-id: 7fecb7c4ffd97b34bbf6e3b760b2c35a772a0657 --- db/compaction/compaction.cc | 13 +- db/compaction/compaction_job.cc | 162 +++++++++++++++++- db/compaction/compaction_job.h | 33 +++- db/compaction/compaction_picker.cc | 14 +- db/compaction/compaction_picker.h | 3 +- db/compaction/compaction_picker_level.cc | 138 ++++++++++++++- db/compaction/compaction_picker_test.cc | 131 +++++++++++++- db/compaction/subcompaction_state.h | 8 +- db/db_compaction_test.cc | 207 ++++++++++++++++++++++- db/db_impl/db_impl_compaction_flush.cc | 9 +- db/version_set.cc | 5 +- db/version_set.h | 10 +- 12 files changed, 701 insertions(+), 32 deletions(-) diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index c883716a0..0983bd70f 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -660,7 +660,7 @@ bool Compaction::IsOutputLevelEmpty() const { } bool Compaction::ShouldFormSubcompactions() const { - if (max_subcompactions_ <= 1 || cfd_ == nullptr) { + if (cfd_ == nullptr) { return false; } @@ -671,6 +671,17 @@ bool Compaction::ShouldFormSubcompactions() const { return false; } + // Round-Robin pri under leveled compaction allows subcompactions by default + // and the number of subcompactions can be larger than max_subcompactions_ + if (cfd_->ioptions()->compaction_pri == kRoundRobin && + cfd_->ioptions()->compaction_style == kCompactionStyleLevel) { + return output_level_ > 0; + } + + if (max_subcompactions_ <= 1) { + return false; + } + if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) { 
return (start_level_ == 0 || is_manual_compaction_) && output_level_ > 0; } else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) { diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index c78bf2adc..18537203c 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -123,7 +123,8 @@ CompactionJob::CompactionJob( const std::atomic& manual_compaction_canceled, const std::string& db_id, const std::string& db_session_id, std::string full_history_ts_low, std::string trim_ts, - BlobFileCompletionCallback* blob_callback) + BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled, + int* bg_bottom_compaction_scheduled) : compact_(new CompactionState(compaction)), compaction_stats_(compaction->compaction_reason(), 1), db_options_(db_options), @@ -162,9 +163,13 @@ CompactionJob::CompactionJob( thread_pri_(thread_pri), full_history_ts_low_(std::move(full_history_ts_low)), trim_ts_(std::move(trim_ts)), - blob_callback_(blob_callback) { + blob_callback_(blob_callback), + extra_num_subcompaction_threads_reserved_(0), + bg_compaction_scheduled_(bg_compaction_scheduled), + bg_bottom_compaction_scheduled_(bg_bottom_compaction_scheduled) { assert(compaction_job_stats_ != nullptr); assert(log_buffer_ != nullptr); + const auto* cfd = compact_->compaction->column_family_data(); ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env, db_options_.enable_thread_tracking); @@ -291,6 +296,99 @@ void CompactionJob::Prepare() { } } +uint64_t CompactionJob::GetSubcompactionsLimit() { + return extra_num_subcompaction_threads_reserved_ + + std::max( + std::uint64_t(1), + static_cast(compact_->compaction->max_subcompactions())); +} + +void CompactionJob::AcquireSubcompactionResources( + int num_extra_required_subcompactions) { + TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:0"); + TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:1"); + int max_db_compactions = + 
DBImpl::GetBGJobLimits( + mutable_db_options_copy_.max_background_flushes, + mutable_db_options_copy_.max_background_compactions, + mutable_db_options_copy_.max_background_jobs, + versions_->GetColumnFamilySet() + ->write_controller() + ->NeedSpeedupCompaction()) + .max_compactions; + // Apply min function first since We need to compute the extra subcompaction + // against compaction limits. And then try to reserve threads for extra + // subcompactions. The actual number of reserved threads could be less than + // the desired number. + int available_bg_compactions_against_db_limit = + std::max(max_db_compactions - *bg_compaction_scheduled_ - + *bg_bottom_compaction_scheduled_, + 0); + db_mutex_->Lock(); + // Reservation only supports backgrdoun threads of which the priority is + // between BOTTOM and HIGH. Need to degrade the priority to HIGH if the + // origin thread_pri_ is higher than that. Similar to ReleaseThreads(). + extra_num_subcompaction_threads_reserved_ = + env_->ReserveThreads(std::min(num_extra_required_subcompactions, + available_bg_compactions_against_db_limit), + std::min(thread_pri_, Env::Priority::HIGH)); + + // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_ + // depending on if this compaction has the bottommost priority + if (thread_pri_ == Env::Priority::BOTTOM) { + *bg_bottom_compaction_scheduled_ += + extra_num_subcompaction_threads_reserved_; + } else { + *bg_compaction_scheduled_ += extra_num_subcompaction_threads_reserved_; + } + db_mutex_->Unlock(); +} + +void CompactionJob::ShrinkSubcompactionResources(uint64_t num_extra_resources) { + // Do nothing when we have zero resources to shrink + if (num_extra_resources == 0) return; + db_mutex_->Lock(); + // We cannot release threads more than what we reserved before + int extra_num_subcompaction_threads_released = env_->ReleaseThreads( + (int)num_extra_resources, std::min(thread_pri_, Env::Priority::HIGH)); + // Update the number of reserved threads and the number of 
background + // scheduled compactions for this compaction job + extra_num_subcompaction_threads_reserved_ -= + extra_num_subcompaction_threads_released; + // TODO (zichen): design a test case with new subcompaction partitioning + // when the number of actual partitions is less than the number of planned + // partitions + assert(extra_num_subcompaction_threads_released == (int)num_extra_resources); + // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_ + // depending on if this compaction has the bottommost priority + if (thread_pri_ == Env::Priority::BOTTOM) { + *bg_bottom_compaction_scheduled_ -= + extra_num_subcompaction_threads_released; + } else { + *bg_compaction_scheduled_ -= extra_num_subcompaction_threads_released; + } + db_mutex_->Unlock(); + TEST_SYNC_POINT("CompactionJob::ShrinkSubcompactionResources:0"); +} + +void CompactionJob::ReleaseSubcompactionResources() { + if (extra_num_subcompaction_threads_reserved_ == 0) { + return; + } + // The number of reserved threads becomes larger than 0 only if the + // compaction prioity is round robin and there is no sufficient + // sub-compactions available + + // The scheduled compaction must be no less than 1 + extra number + // subcompactions using acquired resources since this compaction job has not + // finished yet + assert(*bg_bottom_compaction_scheduled_ >= + 1 + extra_num_subcompaction_threads_reserved_ || + *bg_compaction_scheduled_ >= + 1 + extra_num_subcompaction_threads_reserved_); + ShrinkSubcompactionResources(extra_num_subcompaction_threads_reserved_); +} + struct RangeWithSize { Range range; uint64_t size; @@ -327,7 +425,9 @@ void CompactionJob::GenSubcompactionBoundaries() { // cause relatively small inaccuracy. 
auto* c = compact_->compaction; - if (c->max_subcompactions() <= 1) { + if (c->max_subcompactions() <= 1 && + !(c->immutable_options()->compaction_pri == kRoundRobin && + c->immutable_options()->compaction_style == kCompactionStyleLevel)) { return; } auto* cfd = c->column_family_data(); @@ -342,6 +442,7 @@ void CompactionJob::GenSubcompactionBoundaries() { std::vector all_anchors; int start_lvl = c->start_level(); int out_lvl = c->output_level(); + for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) { int lvl = c->level(lvl_idx); if (lvl >= start_lvl && lvl <= out_lvl) { @@ -381,9 +482,44 @@ void CompactionJob::GenSubcompactionBoundaries() { return cfd_comparator->Compare(a.user_key, b.user_key) < 0; }); + // Get the number of planned subcompactions, may update reserve threads + // and update extra_num_subcompaction_threads_reserved_ for round-robin + uint64_t num_planned_subcompactions; + if (c->immutable_options()->compaction_pri == kRoundRobin && + c->immutable_options()->compaction_style == kCompactionStyleLevel) { + // For round-robin compaction prioity, we need to employ more + // subcompactions (may exceed the max_subcompaction limit). The extra + // subcompactions will be executed using reserved threads and taken into + // account bg_compaction_scheduled or bg_bottom_compaction_scheduled. 
+ + // Initialized by the number of input files + num_planned_subcompactions = static_cast(c->num_input_files(0)); + uint64_t max_subcompactions_limit = GetSubcompactionsLimit(); + if (max_subcompactions_limit < num_planned_subcompactions) { + // Assert two pointers are not empty so that we can use extra + // subcompactions against db compaction limits + assert(bg_bottom_compaction_scheduled_ != nullptr); + assert(bg_compaction_scheduled_ != nullptr); + // Reserve resources when max_subcompaction is not sufficient + AcquireSubcompactionResources( + (int)(num_planned_subcompactions - max_subcompactions_limit)); + // Subcompactions limit changes after acquiring additional resources. + // Need to call GetSubcompactionsLimit() again to update the number + // of planned subcompactions + num_planned_subcompactions = + std::min(num_planned_subcompactions, GetSubcompactionsLimit()); + } + } else { + num_planned_subcompactions = GetSubcompactionsLimit(); + } + + TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:0", + &num_planned_subcompactions); + if (num_planned_subcompactions == 1) return; + // Group the ranges into subcompactions uint64_t target_range_size = std::max( - total_size / static_cast(c->max_subcompactions()), + total_size / num_planned_subcompactions, MaxFileSizeForLevel( *(c->mutable_cf_options()), out_lvl, c->immutable_options()->compaction_style, base_level, @@ -395,16 +531,24 @@ void CompactionJob::GenSubcompactionBoundaries() { uint64_t next_threshold = target_range_size; uint64_t cumulative_size = 0; + uint64_t num_actual_subcompactions = 1U; for (TableReader::Anchor& anchor : all_anchors) { cumulative_size += anchor.range_size; if (cumulative_size > next_threshold) { next_threshold += target_range_size; + num_actual_subcompactions++; boundaries_.push_back(anchor.user_key); } - if (boundaries_.size() + 1 >= uint64_t{c->max_subcompactions()}) { + if (num_actual_subcompactions == num_planned_subcompactions) { break; } } + 
TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:1", + &num_actual_subcompactions); + // Shrink extra subcompactions resources when extra resrouces are acquired + ShrinkSubcompactionResources( + std::min((int)(num_planned_subcompactions - num_actual_subcompactions), + extra_num_subcompaction_threads_reserved_)); } Status CompactionJob::Run() { @@ -567,6 +711,7 @@ Status CompactionJob::Run() { for (auto& thread : thread_pool) { thread.join(); } + for (const auto& state : compact_->sub_compact_states) { if (!state.status.ok()) { status = state.status; @@ -575,6 +720,10 @@ Status CompactionJob::Run() { } } + ReleaseSubcompactionResources(); + TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources:0"); + TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources:1"); + TablePropertiesCollection tp; for (const auto& state : compact_->sub_compact_states) { for (const auto& output : state.GetOutputs()) { @@ -1484,7 +1633,8 @@ Status CompactionJob::InstallCompactionResults( if (start_level > 0) { auto vstorage = compaction->input_version()->storage_info(); edit->AddCompactCursor(start_level, - vstorage->GetNextCompactCursor(start_level)); + vstorage->GetNextCompactCursor( + start_level, compaction->num_input_files(0))); } } diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index cbe913540..d281b4c79 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -164,7 +164,9 @@ class CompactionJob { const std::atomic& manual_compaction_canceled, const std::string& db_id = "", const std::string& db_session_id = "", std::string full_history_ts_low = "", std::string trim_ts = "", - BlobFileCompletionCallback* blob_callback = nullptr); + BlobFileCompletionCallback* blob_callback = nullptr, + int* bg_compaction_scheduled = nullptr, + int* bg_bottom_compaction_scheduled = nullptr); virtual ~CompactionJob(); @@ -225,6 +227,26 @@ class CompactionJob { // consecutive groups such that each group has a 
similar size. void GenSubcompactionBoundaries(); + // Get the number of planned subcompactions based on max_subcompactions and + // extra reserved resources + uint64_t GetSubcompactionsLimit(); + + // Additional reserved threads are reserved and the number is stored in + // extra_num_subcompaction_threads_reserved__. For now, this happens only if + // the compaction priority is round-robin and max_subcompactions is not + // sufficient (extra resources may be needed) + void AcquireSubcompactionResources(int num_extra_required_subcompactions); + + // Additional threads may be reserved during IncreaseSubcompactionResources() + // if num_actual_subcompactions is less than num_planned_subcompactions. + // Additional threads will be released and the bg_compaction_scheduled_ or + // bg_bottom_compaction_scheduled_ will be updated if they are used. + // DB Mutex lock is required. + void ShrinkSubcompactionResources(uint64_t num_extra_resources); + + // Release all reserved threads and update the compaction limits. + void ReleaseSubcompactionResources(); + CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService( SubcompactionState* sub_compact); @@ -299,6 +321,15 @@ class CompactionJob { BlobFileCompletionCallback* blob_callback_; uint64_t GetCompactionId(SubcompactionState* sub_compact) const; + // Stores the number of reserved threads in shared env_ for the number of + // extra subcompaction in kRoundRobin compaction priority + int extra_num_subcompaction_threads_reserved_; + + // Stores the pointer to bg_compaction_scheduled_, + // bg_bottom_compaction_scheduled_ in DBImpl. Mutex is required when accessing + // or updating it. + int* bg_compaction_scheduled_; + int* bg_bottom_compaction_scheduled_; // Stores the sequence number to time mapping gathered from all input files // it also collects the smallest_seqno -> oldest_ancester_time from the SST. 
diff --git a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc index cc13d044a..90443cee7 100644 --- a/db/compaction/compaction_picker.cc +++ b/db/compaction/compaction_picker.cc @@ -462,7 +462,7 @@ bool CompactionPicker::SetupOtherInputs( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, CompactionInputFiles* inputs, CompactionInputFiles* output_level_inputs, int* parent_index, - int base_index) { + int base_index, bool only_expand_towards_right) { assert(!inputs->empty()); assert(output_level_inputs->empty()); const int input_level = inputs->level; @@ -515,8 +515,16 @@ bool CompactionPicker::SetupOtherInputs( InternalKey all_start, all_limit; GetRange(*inputs, *output_level_inputs, &all_start, &all_limit); bool try_overlapping_inputs = true; - vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit, - &expanded_inputs.files, base_index, nullptr); + if (only_expand_towards_right) { + // Round-robin compaction only allows expansion towards the larger side. 
+ vstorage->GetOverlappingInputs(input_level, &smallest, &all_limit, + &expanded_inputs.files, base_index, + nullptr); + } else { + vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit, + &expanded_inputs.files, base_index, + nullptr); + } uint64_t expanded_inputs_size = TotalCompensatedFileSize(expanded_inputs.files); if (!ExpandInputsToCleanCut(cf_name, vstorage, &expanded_inputs)) { diff --git a/db/compaction/compaction_picker.h b/db/compaction/compaction_picker.h index 389ba8174..e879db5c2 100644 --- a/db/compaction/compaction_picker.h +++ b/db/compaction/compaction_picker.h @@ -189,7 +189,8 @@ class CompactionPicker { VersionStorageInfo* vstorage, CompactionInputFiles* inputs, CompactionInputFiles* output_level_inputs, - int* parent_index, int base_index); + int* parent_index, int base_index, + bool only_expand_towards_right = false); void GetGrandparents(VersionStorageInfo* vstorage, const CompactionInputFiles& inputs, diff --git a/db/compaction/compaction_picker_level.cc b/db/compaction/compaction_picker_level.cc index 3c82fa5f8..827ba80fa 100644 --- a/db/compaction/compaction_picker_level.cc +++ b/db/compaction/compaction_picker_level.cc @@ -76,6 +76,9 @@ class LevelCompactionBuilder { // files if needed. bool SetupOtherL0FilesIfNeeded(); + // Compaction with round-robin compaction priority allows more files to be + // picked to form a large compaction + void SetupOtherFilesWithRoundRobinExpansion(); // Based on initial files, setup other files need to be compacted // in this compaction, accordingly. bool SetupOtherInputsIfNeeded(); @@ -84,7 +87,9 @@ class LevelCompactionBuilder { // For the specfied level, pick a file that we want to compact. // Returns false if there is no file to compact. - // If it returns true, inputs->files.size() will be exactly one. + // If it returns true, inputs->files.size() will be exactly one for + // all compaction priorities except round-robin. 
For round-robin, + // multiple consecutive files may be put into inputs->files. // If level is 0 and there is already a compaction on that level, this // function will return false. bool PickFileToCompact(); @@ -278,16 +283,141 @@ bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() { return true; } +void LevelCompactionBuilder::SetupOtherFilesWithRoundRobinExpansion() { + // We only expand when the start level is not L0 under round robin + assert(start_level_ >= 1); + + // For round-robin compaction priority, we have 3 constraints when picking + // multiple files. + // Constraint 1: We can only pick consecutive files + // -> Constraint 1a: When a file is being compacted (or some input files + // are being compacted after expanding, we cannot + // choose it and have to stop choosing more files + // -> Constraint 1b: When we reach the last file (with largest keys), we + // cannot choose more files (the next file will be the + // first one) + // Constraint 2: We should ensure the total compaction bytes (including the + // overlapped files from the next level) is no more than + // mutable_cf_options_.max_compaction_bytes + // Constraint 3: We try our best to pick as many files as possible so that + // the post-compaction level size is less than + // MaxBytesForLevel(start_level_) + // Constraint 4: We do not expand if it is possible to apply a trivial move + // Constraint 5 (TODO): Try to pick minimal files to split into the target + // number of subcompactions + TEST_SYNC_POINT("LevelCompactionPicker::RoundRobin"); + + // Only expand the inputs when we have selected a file in start_level_inputs_ + if (start_level_inputs_.size() == 0) return; + + uint64_t start_lvl_bytes_no_compacting = 0; + uint64_t curr_bytes_to_compact = 0; + uint64_t start_lvl_max_bytes_to_compact = 0; + const std::vector& level_files = + vstorage_->LevelFiles(start_level_); + // Constraint 3 (pre-calculate the ideal max bytes to compact) + for (auto f : level_files) { + if 
(!f->being_compacted) { + start_lvl_bytes_no_compacting += f->compensated_file_size; + } + } + if (start_lvl_bytes_no_compacting > + vstorage_->MaxBytesForLevel(start_level_)) { + start_lvl_max_bytes_to_compact = start_lvl_bytes_no_compacting - + vstorage_->MaxBytesForLevel(start_level_); + } + + size_t start_index = vstorage_->FilesByCompactionPri(start_level_)[0]; + InternalKey smallest, largest; + // Constraint 4 (No need to check again later) + compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest); + CompactionInputFiles output_level_inputs; + output_level_inputs.level = output_level_; + vstorage_->GetOverlappingInputs(output_level_, &smallest, &largest, + &output_level_inputs.files); + if (output_level_inputs.empty()) { + if (TryExtendNonL0TrivialMove((int)start_index)) { + return; + } + } + // Constraint 3 + if (start_level_inputs_[0]->compensated_file_size >= + start_lvl_max_bytes_to_compact) { + return; + } + CompactionInputFiles tmp_start_level_inputs; + tmp_start_level_inputs = start_level_inputs_; + // TODO (zichen): Future parallel round-robin may also need to update this + // Constraint 1b (only expand till the end) + for (size_t i = start_index + 1; i < level_files.size(); i++) { + auto* f = level_files[i]; + if (f->being_compacted) { + // Constraint 1a + return; + } + + tmp_start_level_inputs.files.push_back(f); + if (!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, + &tmp_start_level_inputs) || + compaction_picker_->FilesRangeOverlapWithCompaction( + {tmp_start_level_inputs}, output_level_)) { + // Constraint 1a + tmp_start_level_inputs.clear(); + return; + } + + curr_bytes_to_compact = 0; + for (auto start_lvl_f : tmp_start_level_inputs.files) { + curr_bytes_to_compact += start_lvl_f->compensated_file_size; + } + + // Check whether any output level files are locked + compaction_picker_->GetRange(tmp_start_level_inputs, &smallest, &largest); + vstorage_->GetOverlappingInputs(output_level_, &smallest, &largest, + 
&output_level_inputs.files); + if (!output_level_inputs.empty() && + !compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, + &output_level_inputs)) { + // Constraint 1a + tmp_start_level_inputs.clear(); + return; + } + + uint64_t start_lvl_curr_bytes_to_compact = curr_bytes_to_compact; + for (auto output_lvl_f : output_level_inputs.files) { + curr_bytes_to_compact += output_lvl_f->compensated_file_size; + } + if (curr_bytes_to_compact > mutable_cf_options_.max_compaction_bytes) { + // Constraint 2 + tmp_start_level_inputs.clear(); + return; + } + + start_level_inputs_.files = tmp_start_level_inputs.files; + // Constraint 3 + if (start_lvl_curr_bytes_to_compact > start_lvl_max_bytes_to_compact) { + return; + } + } +} + bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() { // Setup input files from output level. For output to L0, we only compact // spans of files that do not interact with any pending compactions, so don't // need to consider other levels. if (output_level_ != 0) { output_level_inputs_.level = output_level_; + bool round_robin_expanding = + ioptions_.compaction_pri == kRoundRobin && + compaction_reason_ == CompactionReason::kLevelMaxLevelSize; + if (round_robin_expanding) { + SetupOtherFilesWithRoundRobinExpansion(); + } if (!is_l0_trivial_move_ && !compaction_picker_->SetupOtherInputs( cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_, - &output_level_inputs_, &parent_index_, base_index_)) { + &output_level_inputs_, &parent_index_, base_index_, + round_robin_expanding)) { return false; } @@ -606,9 +736,6 @@ bool LevelCompactionBuilder::PickFileToCompact() { // user-key overlap. start_level_inputs_.clear(); - // To ensure every file is selcted in a round-robin manner, we cannot - // skip the current file. 
So we return false and wait for the next time - // we can pick this file to compact if (ioptions_.compaction_pri == kRoundRobin) { return false; } @@ -641,6 +768,7 @@ bool LevelCompactionBuilder::PickFileToCompact() { continue; } } + base_index_ = index; break; } diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index 6f214b3a3..a112f19d2 100644 --- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -1318,7 +1318,7 @@ TEST_F(CompactionPickerTest, CompactionPriRoundRobin) { std::vector selected_files = {8U, 6U, 6U}; ioptions_.compaction_pri = kRoundRobin; - mutable_cf_options_.max_bytes_for_level_base = 10000000; + mutable_cf_options_.max_bytes_for_level_base = 12000000; mutable_cf_options_.max_bytes_for_level_multiplier = 10; for (size_t i = 0; i < test_cursors.size(); i++) { // start a brand new version in each test. @@ -1342,6 +1342,9 @@ TEST_F(CompactionPickerTest, CompactionPriRoundRobin) { cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), &log_buffer_)); ASSERT_TRUE(compaction.get() != nullptr); + // Since the max bytes for level 2 is 120M, picking one file to compact + // makes the post-compaction level size less than 120M, there is exactly one + // file picked for round-robin compaction ASSERT_EQ(1U, compaction->num_input_files(0)); ASSERT_EQ(selected_files[i], compaction->input(0, 0)->fd.GetNumber()); // release the version storage @@ -1349,6 +1352,132 @@ TEST_F(CompactionPickerTest, CompactionPriRoundRobin) { } } +TEST_F(CompactionPickerTest, CompactionPriMultipleFilesRoundRobin1) { + ioptions_.compaction_pri = kRoundRobin; + mutable_cf_options_.max_compaction_bytes = 100000000u; + mutable_cf_options_.max_bytes_for_level_base = 120; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + // start a brand new version in each test. 
+ NewVersionStorage(6, kCompactionStyleLevel); + vstorage_->ResizeCompactCursors(6); + // Set the cursor (file picking should start with 7U) + vstorage_->AddCursorForOneLevel(2, InternalKey("199", 100, kTypeValue)); + Add(2, 6U, "150", "199", 500U); + Add(2, 7U, "200", "249", 500U); + Add(2, 8U, "300", "600", 500U); + Add(2, 9U, "700", "800", 500U); + Add(2, 10U, "850", "950", 500U); + + Add(3, 26U, "130", "165", 600U); + Add(3, 27U, "166", "170", 600U); + Add(3, 28U, "270", "340", 600U); + Add(3, 29U, "401", "500", 600U); + Add(3, 30U, "601", "800", 600U); + Add(3, 31U, "830", "890", 600U); + UpdateVersionStorageInfo(); + LevelCompactionPicker local_level_compaction_picker = + LevelCompactionPicker(ioptions_, &icmp_); + std::unique_ptr compaction( + local_level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + + // The maximum compaction bytes is very large in this case so we can igore its + // constraint in this test case. The maximum bytes for level 2 is 1200 + // bytes, and thus at least 3 files should be picked so that the bytes in + // level 2 is less than the maximum + ASSERT_EQ(3U, compaction->num_input_files(0)); + ASSERT_EQ(7U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(8U, compaction->input(0, 1)->fd.GetNumber()); + ASSERT_EQ(9U, compaction->input(0, 2)->fd.GetNumber()); + // release the version storage + DeleteVersionStorage(); +} + +TEST_F(CompactionPickerTest, CompactionPriMultipleFilesRoundRobin2) { + ioptions_.compaction_pri = kRoundRobin; + mutable_cf_options_.max_compaction_bytes = 2500u; + mutable_cf_options_.max_bytes_for_level_base = 120; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + // start a brand new version in each test. 
+ NewVersionStorage(6, kCompactionStyleLevel); + vstorage_->ResizeCompactCursors(6); + // Set the cursor (file picking should start with 6U) + vstorage_->AddCursorForOneLevel(2, InternalKey("1000", 100, kTypeValue)); + Add(2, 6U, "150", "199", 500U); // Overlap with 26U, 27U + Add(2, 7U, "200", "249", 500U); // Overlap with 27U + Add(2, 8U, "300", "600", 500U); // Overlap with 28U, 29U + Add(2, 9U, "700", "800", 500U); + Add(2, 10U, "850", "950", 500U); + + Add(3, 26U, "130", "165", 600U); + Add(3, 27U, "166", "230", 600U); + Add(3, 28U, "270", "340", 600U); + Add(3, 29U, "401", "500", 600U); + Add(3, 30U, "601", "800", 600U); + Add(3, 31U, "830", "890", 600U); + UpdateVersionStorageInfo(); + LevelCompactionPicker local_level_compaction_picker = + LevelCompactionPicker(ioptions_, &icmp_); + std::unique_ptr compaction( + local_level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + + // The maximum compaction bytes is only 2500 bytes now. Even though we are + // required to choose 3 files so that the post-compaction level size is less + // than 1200 bytes. We cannot pick 3 files to compact since the maximum + // compaction size is 2500. After picking files 6U and 7U, the number of + // compaction bytes has reached 2200, and thus no more space to add another + // input file with 50M bytes. + ASSERT_EQ(2U, compaction->num_input_files(0)); + ASSERT_EQ(6U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(7U, compaction->input(0, 1)->fd.GetNumber()); + // release the version storage + DeleteVersionStorage(); +} + +TEST_F(CompactionPickerTest, CompactionPriMultipleFilesRoundRobin3) { + ioptions_.compaction_pri = kRoundRobin; + mutable_cf_options_.max_compaction_bytes = 1000000u; + mutable_cf_options_.max_bytes_for_level_base = 120; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + // start a brand new version in each test. 
+ NewVersionStorage(6, kCompactionStyleLevel); + vstorage_->ResizeCompactCursors(6); + // Set the cursor (file picking should start with 9U) + vstorage_->AddCursorForOneLevel(2, InternalKey("700", 100, kTypeValue)); + Add(2, 6U, "150", "199", 500U); + Add(2, 7U, "200", "249", 500U); + Add(2, 8U, "300", "600", 500U); + Add(2, 9U, "700", "800", 500U); + Add(2, 10U, "850", "950", 500U); + + Add(3, 26U, "130", "165", 600U); + Add(3, 27U, "166", "170", 600U); + Add(3, 28U, "270", "340", 600U); + Add(3, 29U, "401", "500", 600U); + Add(3, 30U, "601", "800", 600U); + Add(3, 31U, "830", "890", 600U); + UpdateVersionStorageInfo(); + LevelCompactionPicker local_level_compaction_picker = + LevelCompactionPicker(ioptions_, &icmp_); + std::unique_ptr compaction( + local_level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + + // Cannot pick more files since we reach the last file in level 2 + ASSERT_EQ(2U, compaction->num_input_files(0)); + ASSERT_EQ(9U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(10U, compaction->input(0, 1)->fd.GetNumber()); + // release the version storage + DeleteVersionStorage(); +} + TEST_F(CompactionPickerTest, CompactionPriMinOverlappingManyFiles) { NewVersionStorage(6, kCompactionStyleLevel); ioptions_.compaction_pri = kMinOverlappingRatio; diff --git a/db/compaction/subcompaction_state.h b/db/compaction/subcompaction_state.h index 4570662d8..e25b47e6b 100644 --- a/db/compaction/subcompaction_state.h +++ b/db/compaction/subcompaction_state.h @@ -134,12 +134,12 @@ class SubcompactionState { // Invalid output_split_key indicates that we do not need to split if (output_split_key != nullptr) { // We may only split the output when the cursor is in the range. 
Split - if ((!end.has_value() || icmp->user_comparator()->Compare( - ExtractUserKey(output_split_key->Encode()), - ExtractUserKey(end.value())) < 0) && + if ((!end.has_value() || + icmp->user_comparator()->Compare( + ExtractUserKey(output_split_key->Encode()), end.value()) < 0) && (!start.has_value() || icmp->user_comparator()->Compare( ExtractUserKey(output_split_key->Encode()), - ExtractUserKey(start.value())) > 0)) { + start.value()) > 0)) { local_output_split_key_ = output_split_key; } } diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 0820fe288..3186baf30 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -161,8 +161,32 @@ class ChangeLevelConflictsWithAuto ChangeLevelConflictsWithAuto() : DBCompactionTest() {} }; -namespace { +// Param = true: grab the compaction pressure token (enable +// parallel compactions) +// Param = false: Not grab the token (no parallel compactions) +class RoundRobinSubcompactionsAgainstPressureToken + : public DBCompactionTest, + public ::testing::WithParamInterface { + public: + RoundRobinSubcompactionsAgainstPressureToken() { + grab_pressure_token_ = GetParam(); + } + bool grab_pressure_token_; +}; + +class RoundRobinSubcompactionsAgainstResources + : public DBCompactionTest, + public ::testing::WithParamInterface> { + public: + RoundRobinSubcompactionsAgainstResources() { + total_low_pri_threads_ = std::get<0>(GetParam()); + max_compaction_limits_ = std::get<1>(GetParam()); + } + int total_low_pri_threads_; + int max_compaction_limits_; +}; +namespace { class FlushedFileCollector : public EventListener { public: FlushedFileCollector() {} @@ -5306,6 +5330,187 @@ TEST_F(DBCompactionTest, PersistRoundRobinCompactCursor) { } } +TEST_P(RoundRobinSubcompactionsAgainstPressureToken, PressureTokenTest) { + const int kKeysPerBuffer = 100; + Options options = CurrentOptions(); + options.num_levels = 4; + options.max_bytes_for_level_multiplier = 2; + options.level0_file_num_compaction_trigger = 4; + 
options.target_file_size_base = kKeysPerBuffer * 1024; + options.compaction_pri = CompactionPri::kRoundRobin; + options.max_bytes_for_level_base = 8 * kKeysPerBuffer * 1024; + options.disable_auto_compactions = true; + // Setup 7 threads but limited subcompactions so that + // RoundRobin requires extra compactions from reserved threads + options.max_subcompactions = 1; + options.max_background_compactions = 7; + options.max_compaction_bytes = 100000000; + DestroyAndReopen(options); + env_->SetBackgroundThreads(7, Env::LOW); + + Random rnd(301); + const std::vector files_per_level = {0, 15, 25}; + for (int lvl = 2; lvl > 0; lvl--) { + for (int i = 0; i < files_per_level[lvl]; i++) { + for (int j = 0; j < kKeysPerBuffer; j++) { + // Add (lvl-1) to ensure nearly equivalent number of files + // in L2 are overlapped with files selected to compact from + // L1 + ASSERT_OK(Put(Key(2 * i * kKeysPerBuffer + 2 * j + (lvl - 1)), + rnd.RandomString(1010))); + } + ASSERT_OK(Flush()); + } + MoveFilesToLevel(lvl); + ASSERT_EQ(files_per_level[lvl], NumTableFilesAtLevel(lvl, 0)); + } + // 15 files in L1; 25 files in L2 + + // This is a variable for making sure the following callback is called + // and the assertions in it are indeed executed. + bool num_planned_subcompactions_verified = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::GenSubcompactionBoundaries:0", [&](void* arg) { + uint64_t num_planned_subcompactions = *(static_cast(arg)); + if (grab_pressure_token_) { + // 7 files are selected for round-robin under auto + // compaction. The number of planned subcompaction is restricted by + // the limited number of max_background_compactions + ASSERT_EQ(num_planned_subcompactions, 7); + } else { + ASSERT_EQ(num_planned_subcompactions, 1); + } + num_planned_subcompactions_verified = true; + }); + + // The following 3 dependencies have to be added to ensure the auto + // compaction and the pressure token is correctly enabled. 
Same for + // RoundRobinSubcompactionsUsingResources and + // DBCompactionTest.RoundRobinSubcompactionsShrinkResources + SyncPoint::GetInstance()->LoadDependency( + {{"RoundRobinSubcompactionsAgainstPressureToken:0", + "BackgroundCallCompaction:0"}, + {"CompactionJob::AcquireSubcompactionResources:0", + "RoundRobinSubcompactionsAgainstPressureToken:1"}, + {"RoundRobinSubcompactionsAgainstPressureToken:2", + "CompactionJob::AcquireSubcompactionResources:1"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()})); + TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:0"); + TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:1"); + std::unique_ptr pressure_token; + if (grab_pressure_token_) { + pressure_token = + dbfull()->TEST_write_controler().GetCompactionPressureToken(); + } + TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:2"); + + ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_TRUE(num_planned_subcompactions_verified); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +INSTANTIATE_TEST_CASE_P(RoundRobinSubcompactionsAgainstPressureToken, + RoundRobinSubcompactionsAgainstPressureToken, + testing::Bool()); + +TEST_P(RoundRobinSubcompactionsAgainstResources, SubcompactionsUsingResources) { + const int kKeysPerBuffer = 200; + Options options = CurrentOptions(); + options.num_levels = 4; + options.level0_file_num_compaction_trigger = 3; + options.target_file_size_base = kKeysPerBuffer * 1024; + options.compaction_pri = CompactionPri::kRoundRobin; + options.max_bytes_for_level_base = 30 * kKeysPerBuffer * 1024; + options.disable_auto_compactions = true; + options.max_subcompactions = 1; + options.max_background_compactions = max_compaction_limits_; + // Set a large number for max_compaction_bytes so that one round-robin + // compaction is enough to make post-compaction L1 size less than + // the maximum 
size (this test assumes only one round-robin compaction + // is triggered by kLevelMaxLevelSize) + options.max_compaction_bytes = 100000000; + + DestroyAndReopen(options); + env_->SetBackgroundThreads(total_low_pri_threads_, Env::LOW); + + Random rnd(301); + const std::vector files_per_level = {0, 40, 100}; + for (int lvl = 2; lvl > 0; lvl--) { + for (int i = 0; i < files_per_level[lvl]; i++) { + for (int j = 0; j < kKeysPerBuffer; j++) { + // Add (lvl-1) to ensure nearly equivalent number of files + // in L2 are overlapped with files selected to compact from + // L1 + ASSERT_OK(Put(Key(2 * i * kKeysPerBuffer + 2 * j + (lvl - 1)), + rnd.RandomString(1010))); + } + ASSERT_OK(Flush()); + } + MoveFilesToLevel(lvl); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(files_per_level[lvl], NumTableFilesAtLevel(lvl, 0)); + } + + // 40 files in L1; 100 files in L2 + // This is a variable for making sure the following callback is called + // and the assertions in it are indeed executed. + bool num_planned_subcompactions_verified = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::GenSubcompactionBoundaries:0", [&](void* arg) { + uint64_t num_planned_subcompactions = *(static_cast(arg)); + // More than 10 files are selected for round-robin under auto + // compaction. 
The number of planned subcompaction is restricted by + // the minimum number between available threads and compaction limits + ASSERT_EQ(num_planned_subcompactions - options.max_subcompactions, + std::min(total_low_pri_threads_, max_compaction_limits_) - 1); + num_planned_subcompactions_verified = true; + }); + SyncPoint::GetInstance()->LoadDependency( + {{"RoundRobinSubcompactionsAgainstResources:0", + "BackgroundCallCompaction:0"}, + {"CompactionJob::AcquireSubcompactionResources:0", + "RoundRobinSubcompactionsAgainstResources:1"}, + {"RoundRobinSubcompactionsAgainstResources:2", + "CompactionJob::AcquireSubcompactionResources:1"}, + {"CompactionJob::ReleaseSubcompactionResources:0", + "RoundRobinSubcompactionsAgainstResources:3"}, + {"RoundRobinSubcompactionsAgainstResources:4", + "CompactionJob::ReleaseSubcompactionResources:1"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()})); + TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:0"); + TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:1"); + auto pressure_token = + dbfull()->TEST_write_controler().GetCompactionPressureToken(); + + TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:2"); + TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:3"); + // We can reserve more threads now except one is being used + ASSERT_EQ(total_low_pri_threads_ - 1, + env_->ReserveThreads(total_low_pri_threads_, Env::Priority::LOW)); + ASSERT_EQ( + total_low_pri_threads_ - 1, + env_->ReleaseThreads(total_low_pri_threads_ - 1, Env::Priority::LOW)); + TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:4"); + ASSERT_OK(dbfull()->WaitForCompact()); + ASSERT_TRUE(num_planned_subcompactions_verified); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +INSTANTIATE_TEST_CASE_P(RoundRobinSubcompactionsAgainstResources, + 
RoundRobinSubcompactionsAgainstResources, + ::testing::Values(std::make_tuple(1, 5), + std::make_tuple(5, 1), + std::make_tuple(10, 5), + std::make_tuple(5, 10), + std::make_tuple(10, 10))); + TEST_F(DBCompactionTest, RoundRobinCutOutputAtCompactCursor) { Options options = CurrentOptions(); options.num_levels = 3; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 351431e52..23abb5a34 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1408,7 +1408,8 @@ Status DBImpl::CompactFilesImpl( &compaction_job_stats, Env::Priority::USER, io_tracer_, kManualCompactionCanceledFalse_, db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(), - &blob_callback_); + &blob_callback_, &bg_compaction_scheduled_, + &bg_bottom_compaction_scheduled_); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -3330,7 +3331,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (start_level > 0) { auto vstorage = c->input_version()->storage_info(); c->edit()->AddCompactCursor( - start_level, vstorage->GetNextCompactCursor(start_level)); + start_level, + vstorage->GetNextCompactCursor(start_level, c->num_input_files(0))); } } status = versions_->LogAndApply(c->column_family_data(), @@ -3415,7 +3417,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, is_manual ? 
manual_compaction->canceled : kManualCompactionCanceledFalse_, db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), - c->trim_ts(), &blob_callback_); + c->trim_ts(), &blob_callback_, &bg_compaction_scheduled_, + &bg_bottom_compaction_scheduled_); compaction_job.Prepare(); NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, diff --git a/db/version_set.cc b/db/version_set.cc index be2f156d5..762af16f1 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3209,7 +3209,7 @@ void SortFileByRoundRobin(const InternalKeyComparator& icmp, } bool should_move_files = - compact_cursor->at(level).Valid() && temp->size() > 1; + compact_cursor->at(level).size() > 0 && temp->size() > 1; // The iterator points to the Fsize with smallest key larger than or equal to // the given cursor @@ -3225,7 +3225,8 @@ void SortFileByRoundRobin(const InternalKeyComparator& icmp, return icmp.Compare(cursor, f.file->smallest) > 0; }); - should_move_files = current_file_iter != temp->end(); + should_move_files = + current_file_iter != temp->end() && current_file_iter != temp->begin(); } if (should_move_files) { // Construct a local temporary vector diff --git a/db/version_set.h b/db/version_set.h index 93c670706..cb8e6ed07 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -146,10 +146,12 @@ class VersionStorageInfo { } // REQUIRES: lock is held - // Update the compact cursor and advance the file index so that it can point - // to the next cursor - const InternalKey& GetNextCompactCursor(int level) { - int cmp_idx = next_file_to_compact_by_size_[level] + 1; + // Update the compact cursor and advance the file index using increment + // so that it can point to the next cursor (increment means the number of + // input files in this level of the last compaction) + const InternalKey& GetNextCompactCursor(int level, size_t increment) { + int cmp_idx = next_file_to_compact_by_size_[level] + (int)increment; + assert(cmp_idx <= 
(int)files_by_compaction_pri_[level].size()); // TODO(zichen): may need to update next_file_to_compact_by_size_ // for parallel compaction. InternalKey new_cursor;