From 849cf1bf68673ae114638276194e12cca583c6c6 Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Wed, 14 Sep 2022 22:09:12 -0700 Subject: [PATCH] Refactor Compaction file cut `ShouldStopBefore()` (#10629) Summary: Consolidate compaction output cut logic to `ShouldStopBefore()` and move it inside of CompactionOutputs class. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10629 Reviewed By: cbi42 Differential Revision: D39315536 Pulled By: jay-zhuang fbshipit-source-id: 7d81037babbd35c276bbaad02dbc2bb555fdac18 --- db/compaction/compaction_job.cc | 17 --- db/compaction/compaction_outputs.cc | 191 +++++++++++++++++++++++---- db/compaction/compaction_outputs.h | 69 +++++++--- db/compaction/subcompaction_state.cc | 117 ---------------- db/compaction/subcompaction_state.h | 55 +------- 5 files changed, 221 insertions(+), 228 deletions(-) diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 15acde694..4e62c2f04 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1221,13 +1221,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // it only output to single level sub_compact->AssignRangeDelAggregator(std::move(range_del_agg)); - if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { - sub_compact->FillFilesToCutForTtl(); - // ShouldStopBefore() maintains state based on keys processed so far. The - // compaction loop always calls it on the "next" key, thus won't tell it the - // first key. So we do that here. - sub_compact->ShouldStopBefore(c_iter->key()); - } const auto& c_iter_stats = c_iter->iter_stats(); // define the open and close functions for the compaction files, which will be @@ -1276,16 +1269,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (c_iter->status().IsManualCompactionPaused()) { break; } - - // TODO: Support earlier file cut for the penultimate level files. 
Maybe by - // moving `ShouldStopBefore()` to `CompactionOutputs` class. Currently - // the penultimate level output is only cut when it reaches the size limit. - if (!sub_compact->Current().IsPendingClose() && - sub_compact->compaction->output_level() != 0 && - !sub_compact->compaction->SupportsPerKeyPlacement() && - sub_compact->ShouldStopBefore(c_iter->key())) { - sub_compact->Current().SetPendingClose(); - } } sub_compact->compaction_job_stats.num_blobs_read = diff --git a/db/compaction/compaction_outputs.cc b/db/compaction/compaction_outputs.cc index 849a583fb..677b47530 100644 --- a/db/compaction/compaction_outputs.cc +++ b/db/compaction/compaction_outputs.cc @@ -76,6 +76,107 @@ IOStatus CompactionOutputs::WriterSyncClose(const Status& input_status, return io_s; } +bool CompactionOutputs::ShouldStopBefore(const CompactionIterator& c_iter) { + assert(c_iter.Valid()); + + // If there's user defined partitioner, check that first + if (HasBuilder() && partitioner_ && + partitioner_->ShouldPartition( + PartitionerRequest(last_key_for_partitioner_, c_iter.user_key(), + current_output_file_size_)) == kRequired) { + return true; + } + + // files output to Level 0 won't be split + if (compaction_->output_level() == 0) { + return false; + } + + // reach the target file size + if (current_output_file_size_ >= compaction_->max_output_file_size()) { + return true; + } + + const Slice& internal_key = c_iter.key(); + const InternalKeyComparator* icmp = + &compaction_->column_family_data()->internal_comparator(); + + // Check if it needs to split for RoundRobin + // Invalid local_output_split_key indicates that we do not need to split + if (local_output_split_key_ != nullptr && !is_split_) { + // Split occurs when the next key is larger than/equal to the cursor + if (icmp->Compare(internal_key, local_output_split_key_->Encode()) >= 0) { + is_split_ = true; + return true; + } + } + + // Update grandparent information + const std::vector& grandparents = 
compaction_->grandparents(); + bool grandparant_file_switched = false; + // Scan to find the earliest grandparent file that contains key. + while (grandparent_index_ < grandparents.size() && + icmp->Compare(internal_key, + grandparents[grandparent_index_]->largest.Encode()) > + 0) { + if (seen_key_) { + overlapped_bytes_ += grandparents[grandparent_index_]->fd.GetFileSize(); + grandparant_file_switched = true; + } + assert(grandparent_index_ + 1 >= grandparents.size() || + icmp->Compare( + grandparents[grandparent_index_]->largest.Encode(), + grandparents[grandparent_index_ + 1]->smallest.Encode()) <= 0); + grandparent_index_++; + } + seen_key_ = true; + + if (grandparant_file_switched && + overlapped_bytes_ + current_output_file_size_ > + compaction_->max_compaction_bytes()) { + // Too much overlap for current output; start new output + overlapped_bytes_ = 0; + return true; + } + + // check ttl file boundaries if there's any + if (!files_to_cut_for_ttl_.empty()) { + if (cur_files_to_cut_for_ttl_ != -1) { + // Previous key is inside the range of a file + if (icmp->Compare(internal_key, + files_to_cut_for_ttl_[cur_files_to_cut_for_ttl_] + ->largest.Encode()) > 0) { + next_files_to_cut_for_ttl_ = cur_files_to_cut_for_ttl_ + 1; + cur_files_to_cut_for_ttl_ = -1; + return true; + } + } else { + // Look for the key position + while (next_files_to_cut_for_ttl_ < + static_cast(files_to_cut_for_ttl_.size())) { + if (icmp->Compare(internal_key, + files_to_cut_for_ttl_[next_files_to_cut_for_ttl_] + ->smallest.Encode()) >= 0) { + if (icmp->Compare(internal_key, + files_to_cut_for_ttl_[next_files_to_cut_for_ttl_] + ->largest.Encode()) <= 0) { + // With in the current file + cur_files_to_cut_for_ttl_ = next_files_to_cut_for_ttl_; + return true; + } + // Beyond the current file + next_files_to_cut_for_ttl_++; + } else { + // Still fall into the gap + break; + } + } + } + } + + return false; +} + Status CompactionOutputs::AddToOutput( const CompactionIterator& c_iter, const 
CompactionFileOpenFunc& open_file_func, @@ -83,27 +184,19 @@ Status CompactionOutputs::AddToOutput( Status s; const Slice& key = c_iter.key(); - if (!pending_close_ && c_iter.Valid() && partitioner_ && HasBuilder() && - partitioner_->ShouldPartition( - PartitionerRequest(last_key_for_partitioner_, c_iter.user_key(), - current_output_file_size_)) == kRequired) { - pending_close_ = true; - } - - if (pending_close_) { + if (ShouldStopBefore(c_iter) && HasBuilder()) { s = close_file_func(*this, c_iter.InputStatus(), key); - pending_close_ = false; - } - if (!s.ok()) { - return s; + if (!s.ok()) { + return s; + } } // Open output file if necessary if (!HasBuilder()) { s = open_file_func(*this); - } - if (!s.ok()) { - return s; + if (!s.ok()) { + return s; + } } Output& curr = current_output(); @@ -130,19 +223,6 @@ Status CompactionOutputs::AddToOutput( s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence, ikey.type); - // Close output file if it is big enough. Two possibilities determine it's - // time to close it: (1) the current key should be this file's last key, (2) - // the next key should not be in this file. - // - // TODO(aekmekji): determine if file should be closed earlier than this - // during subcompactions (i.e. 
if output size, estimated by input size, is - // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB - // and 0.6MB instead of 1MB and 0.2MB) - if (compaction_->output_level() != 0 && - current_output_file_size_ >= compaction_->max_output_file_size()) { - pending_close_ = true; - } - if (partitioner_) { last_key_for_partitioner_.assign(c_iter.user_key().data_, c_iter.user_key().size_); @@ -318,4 +398,59 @@ Status CompactionOutputs::AddRangeDels( } return Status::OK(); } + +void CompactionOutputs::FillFilesToCutForTtl() { + if (compaction_->immutable_options()->compaction_style != + kCompactionStyleLevel || + compaction_->immutable_options()->compaction_pri != + kMinOverlappingRatio || + compaction_->mutable_cf_options()->ttl == 0 || + compaction_->num_input_levels() < 2 || compaction_->bottommost_level()) { + return; + } + + // We define new file with the oldest ancestor time to be younger than 1/4 + // TTL, and an old one to be older than 1/2 TTL time. + int64_t temp_current_time; + auto get_time_status = + compaction_->immutable_options()->clock->GetCurrentTime( + &temp_current_time); + if (!get_time_status.ok()) { + return; + } + + auto current_time = static_cast(temp_current_time); + if (current_time < compaction_->mutable_cf_options()->ttl) { + return; + } + + uint64_t old_age_thres = + current_time - compaction_->mutable_cf_options()->ttl / 2; + const std::vector& olevel = + *(compaction_->inputs(compaction_->num_input_levels() - 1)); + for (FileMetaData* file : olevel) { + // Worth filtering out by start and end? + uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime(); + // We put old files if they are not too small to prevent a flood + // of small files. 
+ if (oldest_ancester_time < old_age_thres && + file->fd.GetFileSize() > + compaction_->mutable_cf_options()->target_file_size_base / 2) { + files_to_cut_for_ttl_.push_back(file); + } + } +} + +CompactionOutputs::CompactionOutputs(const Compaction* compaction, + const bool is_penultimate_level) + : compaction_(compaction), is_penultimate_level_(is_penultimate_level) { + partitioner_ = compaction->output_level() == 0 + ? nullptr + : compaction->CreateSstPartitioner(); + + if (compaction->output_level() != 0) { + FillFilesToCutForTtl(); + } +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_outputs.h b/db/compaction/compaction_outputs.h index 635924989..107044a0d 100644 --- a/db/compaction/compaction_outputs.h +++ b/db/compaction/compaction_outputs.h @@ -45,12 +45,7 @@ class CompactionOutputs { CompactionOutputs() = delete; explicit CompactionOutputs(const Compaction* compaction, - const bool is_penultimate_level) - : compaction_(compaction), is_penultimate_level_(is_penultimate_level) { - partitioner_ = compaction->output_level() == 0 - ? 
nullptr - : compaction->CreateSstPartitioner(); - } + const bool is_penultimate_level); // Add generated output to the list void AddOutput(FileMetaData&& meta, const InternalKeyComparator& icmp, @@ -179,12 +174,6 @@ class CompactionOutputs { SequenceNumber earliest_snapshot, const Slice& next_table_min_key); - // Is the current file is already pending for close - bool IsPendingClose() const { return pending_close_; } - - // Current file should close before adding a new key - void SetPendingClose() { pending_close_ = true; } - // if the outputs have range delete, range delete is also data bool HasRangeDel() const { return range_del_agg_ && !range_del_agg_->IsEmpty(); @@ -193,6 +182,32 @@ class CompactionOutputs { private: friend class SubcompactionState; + void FillFilesToCutForTtl(); + + void SetOutputSlitKey(const std::optional start, + const std::optional end) { + const InternalKeyComparator* icmp = + &compaction_->column_family_data()->internal_comparator(); + + const InternalKey* output_split_key = compaction_->GetOutputSplitKey(); + // Invalid output_split_key indicates that we do not need to split + if (output_split_key != nullptr) { + // We may only split the output when the cursor is in the range. Split + if ((!end.has_value() || + icmp->user_comparator()->Compare( + ExtractUserKey(output_split_key->Encode()), end.value()) < 0) && + (!start.has_value() || icmp->user_comparator()->Compare( + ExtractUserKey(output_split_key->Encode()), + start.value()) > 0)) { + local_output_split_key_ = output_split_key; + } + } + } + + // Returns true iff we should stop building the current output + // before processing the current key in compaction iterator. + bool ShouldStopBefore(const CompactionIterator& c_iter); + void Cleanup() { if (builder_ != nullptr) { // May happen if we get a shutdown call in the middle of compaction @@ -205,7 +220,7 @@ class CompactionOutputs { return current_output_file_size_; } - // Add curent key from compaction_iterator to the output file. 
If needed + // Add current key from compaction_iterator to the output file. If needed // close and open new compaction output with the functions provided. Status AddToOutput(const CompactionIterator& c_iter, const CompactionFileOpenFunc& open_file_func, @@ -255,10 +270,6 @@ class CompactionOutputs { const Compaction* compaction_; - // The current file is pending close, which needs to run `close_file_func()` - // first to add a new key. - bool pending_close_ = false; - // current output builder and writer std::unique_ptr builder_; std::unique_ptr file_writer_; @@ -282,6 +293,30 @@ class CompactionOutputs { // partitioner information std::string last_key_for_partitioner_; std::unique_ptr partitioner_; + + // A flag determines if this subcompaction has been split by the cursor + bool is_split_ = false; + + // We also maintain the output split key for each subcompaction to avoid + // repetitive comparison in ShouldStopBefore() + const InternalKey* local_output_split_key_ = nullptr; + + // Some identified files with old oldest ancester time and the range should be + // isolated out so that the output file(s) in that range can be merged down + // for TTL and clear the timestamps for the range. + std::vector files_to_cut_for_ttl_; + int cur_files_to_cut_for_ttl_ = -1; + int next_files_to_cut_for_ttl_ = 0; + + // An index that used to speed up ShouldStopBefore(). + size_t grandparent_index_ = 0; + + // The number of bytes overlapping between the current output and + // grandparent files used in ShouldStopBefore(). 
+ uint64_t overlapped_bytes_ = 0; + + // A flag determines whether the key has been seen in ShouldStopBefore() + bool seen_key_ = false; }; // helper struct to concatenate the last level and penultimate level outputs diff --git a/db/compaction/subcompaction_state.cc b/db/compaction/subcompaction_state.cc index 89914f479..0c56471e9 100644 --- a/db/compaction/subcompaction_state.cc +++ b/db/compaction/subcompaction_state.cc @@ -23,46 +23,6 @@ void SubcompactionState::AggregateCompactionStats( } } -void SubcompactionState::FillFilesToCutForTtl() { - if (compaction->immutable_options()->compaction_style != - CompactionStyle::kCompactionStyleLevel || - compaction->immutable_options()->compaction_pri != - CompactionPri::kMinOverlappingRatio || - compaction->mutable_cf_options()->ttl == 0 || - compaction->num_input_levels() < 2 || compaction->bottommost_level()) { - return; - } - - // We define new file with the oldest ancestor time to be younger than 1/4 - // TTL, and an old one to be older than 1/2 TTL time. - int64_t temp_current_time; - auto get_time_status = compaction->immutable_options()->clock->GetCurrentTime( - &temp_current_time); - if (!get_time_status.ok()) { - return; - } - auto current_time = static_cast(temp_current_time); - if (current_time < compaction->mutable_cf_options()->ttl) { - return; - } - uint64_t old_age_thres = - current_time - compaction->mutable_cf_options()->ttl / 2; - - const std::vector& olevel = - *(compaction->inputs(compaction->num_input_levels() - 1)); - for (FileMetaData* file : olevel) { - // Worth filtering out by start and end? - uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime(); - // We put old files if they are not too small to prevent a flood - // of small files. 
- if (oldest_ancester_time < old_age_thres && - file->fd.GetFileSize() > - compaction->mutable_cf_options()->target_file_size_base / 2) { - files_to_cut_for_ttl_.push_back(file); - } - } -} - OutputIterator SubcompactionState::GetOutputs() const { return OutputIterator(penultimate_level_outputs_.outputs_, compaction_outputs_.outputs_); @@ -128,83 +88,6 @@ Slice SubcompactionState::LargestUserKey() const { } } -bool SubcompactionState::ShouldStopBefore(const Slice& internal_key) { - uint64_t curr_file_size = Current().GetCurrentOutputFileSize(); - const InternalKeyComparator* icmp = - &compaction->column_family_data()->internal_comparator(); - - // Invalid local_output_split_key indicates that we do not need to split - if (local_output_split_key_ != nullptr && !is_split_) { - // Split occurs when the next key is larger than/equal to the cursor - if (icmp->Compare(internal_key, local_output_split_key_->Encode()) >= 0) { - is_split_ = true; - return true; - } - } - - const std::vector& grandparents = compaction->grandparents(); - bool grandparant_file_switched = false; - // Scan to find the earliest grandparent file that contains key. 
- while (grandparent_index_ < grandparents.size() && - icmp->Compare(internal_key, - grandparents[grandparent_index_]->largest.Encode()) > - 0) { - if (seen_key_) { - overlapped_bytes_ += grandparents[grandparent_index_]->fd.GetFileSize(); - grandparant_file_switched = true; - } - assert(grandparent_index_ + 1 >= grandparents.size() || - icmp->Compare( - grandparents[grandparent_index_]->largest.Encode(), - grandparents[grandparent_index_ + 1]->smallest.Encode()) <= 0); - grandparent_index_++; - } - seen_key_ = true; - - if (grandparant_file_switched && - overlapped_bytes_ + curr_file_size > compaction->max_compaction_bytes()) { - // Too much overlap for current output; start new output - overlapped_bytes_ = 0; - return true; - } - - if (!files_to_cut_for_ttl_.empty()) { - if (cur_files_to_cut_for_ttl_ != -1) { - // Previous key is inside the range of a file - if (icmp->Compare(internal_key, - files_to_cut_for_ttl_[cur_files_to_cut_for_ttl_] - ->largest.Encode()) > 0) { - next_files_to_cut_for_ttl_ = cur_files_to_cut_for_ttl_ + 1; - cur_files_to_cut_for_ttl_ = -1; - return true; - } - } else { - // Look for the key position - while (next_files_to_cut_for_ttl_ < - static_cast(files_to_cut_for_ttl_.size())) { - if (icmp->Compare(internal_key, - files_to_cut_for_ttl_[next_files_to_cut_for_ttl_] - ->smallest.Encode()) >= 0) { - if (icmp->Compare(internal_key, - files_to_cut_for_ttl_[next_files_to_cut_for_ttl_] - ->largest.Encode()) <= 0) { - // With in the current file - cur_files_to_cut_for_ttl_ = next_files_to_cut_for_ttl_; - return true; - } - // Beyond the current file - next_files_to_cut_for_ttl_++; - } else { - // Still fall into the gap - break; - } - } - } - } - - return false; -} - Status SubcompactionState::AddToOutput( const CompactionIterator& iter, const CompactionFileOpenFunc& open_file_func, diff --git a/db/compaction/subcompaction_state.h b/db/compaction/subcompaction_state.h index e25b47e6b..13e63120f 100644 --- a/db/compaction/subcompaction_state.h 
+++ b/db/compaction/subcompaction_state.h @@ -128,21 +128,12 @@ class SubcompactionState { compaction_outputs_(c, /*is_penultimate_level=*/false), penultimate_level_outputs_(c, /*is_penultimate_level=*/true) { assert(compaction != nullptr); - const InternalKeyComparator* icmp = - &compaction->column_family_data()->internal_comparator(); - const InternalKey* output_split_key = compaction->GetOutputSplitKey(); - // Invalid output_split_key indicates that we do not need to split - if (output_split_key != nullptr) { - // We may only split the output when the cursor is in the range. Split - if ((!end.has_value() || - icmp->user_comparator()->Compare( - ExtractUserKey(output_split_key->Encode()), end.value()) < 0) && - (!start.has_value() || icmp->user_comparator()->Compare( - ExtractUserKey(output_split_key->Encode()), - start.value()) > 0)) { - local_output_split_key_ = output_split_key; - } - } + // Set output split key (used for RoundRobin feature) only for normal + // compaction_outputs; the output to penultimate_level feature doesn't support + // the RoundRobin feature (and may never be supported, because for + // RoundRobin the data time is mostly naturally sorted, so there is no need for + // per-key placement with output_to_penultimate_level). 
+ compaction_outputs_.SetOutputSlitKey(start, end); } SubcompactionState(SubcompactionState&& state) noexcept @@ -155,12 +146,6 @@ class SubcompactionState { state.notify_on_subcompaction_completion), compaction_job_stats(std::move(state.compaction_job_stats)), sub_job_id(state.sub_job_id), - files_to_cut_for_ttl_(std::move(state.files_to_cut_for_ttl_)), - cur_files_to_cut_for_ttl_(state.cur_files_to_cut_for_ttl_), - next_files_to_cut_for_ttl_(state.next_files_to_cut_for_ttl_), - grandparent_index_(state.grandparent_index_), - overlapped_bytes_(state.overlapped_bytes_), - seen_key_(state.seen_key_), compaction_outputs_(std::move(state.compaction_outputs_)), penultimate_level_outputs_(std::move(state.penultimate_level_outputs_)), is_current_penultimate_level_(state.is_current_penultimate_level_), @@ -175,12 +160,6 @@ class SubcompactionState { penultimate_level_outputs_.HasRangeDel(); } - void FillFilesToCutForTtl(); - - // Returns true iff we should stop building the current output - // before processing "internal_key". - bool ShouldStopBefore(const Slice& internal_key); - bool IsCurrentPenultimateLevel() const { return is_current_penultimate_level_; } @@ -224,28 +203,6 @@ class SubcompactionState { } private: - // Some identified files with old oldest ancester time and the range should be - // isolated out so that the output file(s) in that range can be merged down - // for TTL and clear the timestamps for the range. - std::vector files_to_cut_for_ttl_; - int cur_files_to_cut_for_ttl_ = -1; - int next_files_to_cut_for_ttl_ = 0; - - // An index that used to speed up ShouldStopBefore(). - size_t grandparent_index_ = 0; - // The number of bytes overlapping between the current output and - // grandparent files used in ShouldStopBefore(). 
- uint64_t overlapped_bytes_ = 0; - // A flag determines whether the key has been seen in ShouldStopBefore() - bool seen_key_ = false; - - // A flag determines if this subcompaction has been split by the cursor - bool is_split_ = false; - - // We also maintain the output split key for each subcompaction to avoid - // repetitive comparison in ShouldStopBefore() - const InternalKey* local_output_split_key_ = nullptr; - // State kept for output being generated CompactionOutputs compaction_outputs_; CompactionOutputs penultimate_level_outputs_;