Refactor Compaction file cut `ShouldStopBefore()` (#10629)

Summary:
Consolidate compaction output cut logic to `ShouldStopBefore()` and move
it inside of CompactionOutputs class.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10629

Reviewed By: cbi42

Differential Revision: D39315536

Pulled By: jay-zhuang

fbshipit-source-id: 7d81037babbd35c276bbaad02dbc2bb555fdac18
main
Jay Zhuang 2 years ago committed by Facebook GitHub Bot
parent ce2c11d848
commit 849cf1bf68
  1. 17
      db/compaction/compaction_job.cc
  2. 183
      db/compaction/compaction_outputs.cc
  3. 69
      db/compaction/compaction_outputs.h
  4. 117
      db/compaction/subcompaction_state.cc
  5. 55
      db/compaction/subcompaction_state.h

@ -1221,13 +1221,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
// it only output to single level // it only output to single level
sub_compact->AssignRangeDelAggregator(std::move(range_del_agg)); sub_compact->AssignRangeDelAggregator(std::move(range_del_agg));
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
sub_compact->FillFilesToCutForTtl();
// ShouldStopBefore() maintains state based on keys processed so far. The
// compaction loop always calls it on the "next" key, thus won't tell it the
// first key. So we do that here.
sub_compact->ShouldStopBefore(c_iter->key());
}
const auto& c_iter_stats = c_iter->iter_stats(); const auto& c_iter_stats = c_iter->iter_stats();
// define the open and close functions for the compaction files, which will be // define the open and close functions for the compaction files, which will be
@ -1276,16 +1269,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (c_iter->status().IsManualCompactionPaused()) { if (c_iter->status().IsManualCompactionPaused()) {
break; break;
} }
// TODO: Support earlier file cut for the penultimate level files. Maybe by
// moving `ShouldStopBefore()` to `CompactionOutputs` class. Currently
// the penultimate level output is only cut when it reaches the size limit.
if (!sub_compact->Current().IsPendingClose() &&
sub_compact->compaction->output_level() != 0 &&
!sub_compact->compaction->SupportsPerKeyPlacement() &&
sub_compact->ShouldStopBefore(c_iter->key())) {
sub_compact->Current().SetPendingClose();
}
} }
sub_compact->compaction_job_stats.num_blobs_read = sub_compact->compaction_job_stats.num_blobs_read =

@ -76,6 +76,107 @@ IOStatus CompactionOutputs::WriterSyncClose(const Status& input_status,
return io_s; return io_s;
} }
bool CompactionOutputs::ShouldStopBefore(const CompactionIterator& c_iter) {
assert(c_iter.Valid());
// If there's user defined partitioner, check that first
if (HasBuilder() && partitioner_ &&
partitioner_->ShouldPartition(
PartitionerRequest(last_key_for_partitioner_, c_iter.user_key(),
current_output_file_size_)) == kRequired) {
return true;
}
// files output to Level 0 won't be split
if (compaction_->output_level() == 0) {
return false;
}
// reach the target file size
if (current_output_file_size_ >= compaction_->max_output_file_size()) {
return true;
}
const Slice& internal_key = c_iter.key();
const InternalKeyComparator* icmp =
&compaction_->column_family_data()->internal_comparator();
// Check if it needs to split for RoundRobin
// Invalid local_output_split_key indicates that we do not need to split
if (local_output_split_key_ != nullptr && !is_split_) {
// Split occurs when the next key is larger than/equal to the cursor
if (icmp->Compare(internal_key, local_output_split_key_->Encode()) >= 0) {
is_split_ = true;
return true;
}
}
// Update grandparent information
const std::vector<FileMetaData*>& grandparents = compaction_->grandparents();
bool grandparant_file_switched = false;
// Scan to find the earliest grandparent file that contains key.
while (grandparent_index_ < grandparents.size() &&
icmp->Compare(internal_key,
grandparents[grandparent_index_]->largest.Encode()) >
0) {
if (seen_key_) {
overlapped_bytes_ += grandparents[grandparent_index_]->fd.GetFileSize();
grandparant_file_switched = true;
}
assert(grandparent_index_ + 1 >= grandparents.size() ||
icmp->Compare(
grandparents[grandparent_index_]->largest.Encode(),
grandparents[grandparent_index_ + 1]->smallest.Encode()) <= 0);
grandparent_index_++;
}
seen_key_ = true;
if (grandparant_file_switched &&
overlapped_bytes_ + current_output_file_size_ >
compaction_->max_compaction_bytes()) {
// Too much overlap for current output; start new output
overlapped_bytes_ = 0;
return true;
}
// check ttl file boundaries if there's any
if (!files_to_cut_for_ttl_.empty()) {
if (cur_files_to_cut_for_ttl_ != -1) {
// Previous key is inside the range of a file
if (icmp->Compare(internal_key,
files_to_cut_for_ttl_[cur_files_to_cut_for_ttl_]
->largest.Encode()) > 0) {
next_files_to_cut_for_ttl_ = cur_files_to_cut_for_ttl_ + 1;
cur_files_to_cut_for_ttl_ = -1;
return true;
}
} else {
// Look for the key position
while (next_files_to_cut_for_ttl_ <
static_cast<int>(files_to_cut_for_ttl_.size())) {
if (icmp->Compare(internal_key,
files_to_cut_for_ttl_[next_files_to_cut_for_ttl_]
->smallest.Encode()) >= 0) {
if (icmp->Compare(internal_key,
files_to_cut_for_ttl_[next_files_to_cut_for_ttl_]
->largest.Encode()) <= 0) {
// With in the current file
cur_files_to_cut_for_ttl_ = next_files_to_cut_for_ttl_;
return true;
}
// Beyond the current file
next_files_to_cut_for_ttl_++;
} else {
// Still fall into the gap
break;
}
}
}
}
return false;
}
Status CompactionOutputs::AddToOutput( Status CompactionOutputs::AddToOutput(
const CompactionIterator& c_iter, const CompactionIterator& c_iter,
const CompactionFileOpenFunc& open_file_func, const CompactionFileOpenFunc& open_file_func,
@ -83,28 +184,20 @@ Status CompactionOutputs::AddToOutput(
Status s; Status s;
const Slice& key = c_iter.key(); const Slice& key = c_iter.key();
if (!pending_close_ && c_iter.Valid() && partitioner_ && HasBuilder() && if (ShouldStopBefore(c_iter) && HasBuilder()) {
partitioner_->ShouldPartition(
PartitionerRequest(last_key_for_partitioner_, c_iter.user_key(),
current_output_file_size_)) == kRequired) {
pending_close_ = true;
}
if (pending_close_) {
s = close_file_func(*this, c_iter.InputStatus(), key); s = close_file_func(*this, c_iter.InputStatus(), key);
pending_close_ = false;
}
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
}
// Open output file if necessary // Open output file if necessary
if (!HasBuilder()) { if (!HasBuilder()) {
s = open_file_func(*this); s = open_file_func(*this);
}
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
}
Output& curr = current_output(); Output& curr = current_output();
assert(builder_ != nullptr); assert(builder_ != nullptr);
@ -130,19 +223,6 @@ Status CompactionOutputs::AddToOutput(
s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence, s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence,
ikey.type); ikey.type);
// Close output file if it is big enough. Two possibilities determine it's
// time to close it: (1) the current key should be this file's last key, (2)
// the next key should not be in this file.
//
// TODO(aekmekji): determine if file should be closed earlier than this
// during subcompactions (i.e. if output size, estimated by input size, is
// going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
// and 0.6MB instead of 1MB and 0.2MB)
if (compaction_->output_level() != 0 &&
current_output_file_size_ >= compaction_->max_output_file_size()) {
pending_close_ = true;
}
if (partitioner_) { if (partitioner_) {
last_key_for_partitioner_.assign(c_iter.user_key().data_, last_key_for_partitioner_.assign(c_iter.user_key().data_,
c_iter.user_key().size_); c_iter.user_key().size_);
@ -318,4 +398,59 @@ Status CompactionOutputs::AddRangeDels(
} }
return Status::OK(); return Status::OK();
} }
void CompactionOutputs::FillFilesToCutForTtl() {
if (compaction_->immutable_options()->compaction_style !=
kCompactionStyleLevel ||
compaction_->immutable_options()->compaction_pri !=
kMinOverlappingRatio ||
compaction_->mutable_cf_options()->ttl == 0 ||
compaction_->num_input_levels() < 2 || compaction_->bottommost_level()) {
return;
}
// We define new file with the oldest ancestor time to be younger than 1/4
// TTL, and an old one to be older than 1/2 TTL time.
int64_t temp_current_time;
auto get_time_status =
compaction_->immutable_options()->clock->GetCurrentTime(
&temp_current_time);
if (!get_time_status.ok()) {
return;
}
auto current_time = static_cast<uint64_t>(temp_current_time);
if (current_time < compaction_->mutable_cf_options()->ttl) {
return;
}
uint64_t old_age_thres =
current_time - compaction_->mutable_cf_options()->ttl / 2;
const std::vector<FileMetaData*>& olevel =
*(compaction_->inputs(compaction_->num_input_levels() - 1));
for (FileMetaData* file : olevel) {
// Worth filtering out by start and end?
uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
// We put old files if they are not too small to prevent a flood
// of small files.
if (oldest_ancester_time < old_age_thres &&
file->fd.GetFileSize() >
compaction_->mutable_cf_options()->target_file_size_base / 2) {
files_to_cut_for_ttl_.push_back(file);
}
}
}
CompactionOutputs::CompactionOutputs(const Compaction* compaction,
const bool is_penultimate_level)
: compaction_(compaction), is_penultimate_level_(is_penultimate_level) {
partitioner_ = compaction->output_level() == 0
? nullptr
: compaction->CreateSstPartitioner();
if (compaction->output_level() != 0) {
FillFilesToCutForTtl();
}
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -45,12 +45,7 @@ class CompactionOutputs {
CompactionOutputs() = delete; CompactionOutputs() = delete;
explicit CompactionOutputs(const Compaction* compaction, explicit CompactionOutputs(const Compaction* compaction,
const bool is_penultimate_level) const bool is_penultimate_level);
: compaction_(compaction), is_penultimate_level_(is_penultimate_level) {
partitioner_ = compaction->output_level() == 0
? nullptr
: compaction->CreateSstPartitioner();
}
// Add generated output to the list // Add generated output to the list
void AddOutput(FileMetaData&& meta, const InternalKeyComparator& icmp, void AddOutput(FileMetaData&& meta, const InternalKeyComparator& icmp,
@ -179,12 +174,6 @@ class CompactionOutputs {
SequenceNumber earliest_snapshot, SequenceNumber earliest_snapshot,
const Slice& next_table_min_key); const Slice& next_table_min_key);
// Is the current file is already pending for close
bool IsPendingClose() const { return pending_close_; }
// Current file should close before adding a new key
void SetPendingClose() { pending_close_ = true; }
// if the outputs have range delete, range delete is also data // if the outputs have range delete, range delete is also data
bool HasRangeDel() const { bool HasRangeDel() const {
return range_del_agg_ && !range_del_agg_->IsEmpty(); return range_del_agg_ && !range_del_agg_->IsEmpty();
@ -193,6 +182,32 @@ class CompactionOutputs {
private: private:
friend class SubcompactionState; friend class SubcompactionState;
void FillFilesToCutForTtl();
void SetOutputSlitKey(const std::optional<Slice> start,
const std::optional<Slice> end) {
const InternalKeyComparator* icmp =
&compaction_->column_family_data()->internal_comparator();
const InternalKey* output_split_key = compaction_->GetOutputSplitKey();
// Invalid output_split_key indicates that we do not need to split
if (output_split_key != nullptr) {
// We may only split the output when the cursor is in the range. Split
if ((!end.has_value() ||
icmp->user_comparator()->Compare(
ExtractUserKey(output_split_key->Encode()), end.value()) < 0) &&
(!start.has_value() || icmp->user_comparator()->Compare(
ExtractUserKey(output_split_key->Encode()),
start.value()) > 0)) {
local_output_split_key_ = output_split_key;
}
}
}
// Returns true iff we should stop building the current output
// before processing the current key in compaction iterator.
bool ShouldStopBefore(const CompactionIterator& c_iter);
void Cleanup() { void Cleanup() {
if (builder_ != nullptr) { if (builder_ != nullptr) {
// May happen if we get a shutdown call in the middle of compaction // May happen if we get a shutdown call in the middle of compaction
@ -205,7 +220,7 @@ class CompactionOutputs {
return current_output_file_size_; return current_output_file_size_;
} }
// Add curent key from compaction_iterator to the output file. If needed // Add current key from compaction_iterator to the output file. If needed
// close and open new compaction output with the functions provided. // close and open new compaction output with the functions provided.
Status AddToOutput(const CompactionIterator& c_iter, Status AddToOutput(const CompactionIterator& c_iter,
const CompactionFileOpenFunc& open_file_func, const CompactionFileOpenFunc& open_file_func,
@ -255,10 +270,6 @@ class CompactionOutputs {
const Compaction* compaction_; const Compaction* compaction_;
// The current file is pending close, which needs to run `close_file_func()`
// first to add a new key.
bool pending_close_ = false;
// current output builder and writer // current output builder and writer
std::unique_ptr<TableBuilder> builder_; std::unique_ptr<TableBuilder> builder_;
std::unique_ptr<WritableFileWriter> file_writer_; std::unique_ptr<WritableFileWriter> file_writer_;
@ -282,6 +293,30 @@ class CompactionOutputs {
// partitioner information // partitioner information
std::string last_key_for_partitioner_; std::string last_key_for_partitioner_;
std::unique_ptr<SstPartitioner> partitioner_; std::unique_ptr<SstPartitioner> partitioner_;
// A flag determines if this subcompaction has been split by the cursor
bool is_split_ = false;
// We also maintain the output split key for each subcompaction to avoid
// repetitive comparison in ShouldStopBefore()
const InternalKey* local_output_split_key_ = nullptr;
// Some identified files with old oldest ancester time and the range should be
// isolated out so that the output file(s) in that range can be merged down
// for TTL and clear the timestamps for the range.
std::vector<FileMetaData*> files_to_cut_for_ttl_;
int cur_files_to_cut_for_ttl_ = -1;
int next_files_to_cut_for_ttl_ = 0;
// An index that used to speed up ShouldStopBefore().
size_t grandparent_index_ = 0;
// The number of bytes overlapping between the current output and
// grandparent files used in ShouldStopBefore().
uint64_t overlapped_bytes_ = 0;
// A flag determines whether the key has been seen in ShouldStopBefore()
bool seen_key_ = false;
}; };
// helper struct to concatenate the last level and penultimate level outputs // helper struct to concatenate the last level and penultimate level outputs

@ -23,46 +23,6 @@ void SubcompactionState::AggregateCompactionStats(
} }
} }
void SubcompactionState::FillFilesToCutForTtl() {
if (compaction->immutable_options()->compaction_style !=
CompactionStyle::kCompactionStyleLevel ||
compaction->immutable_options()->compaction_pri !=
CompactionPri::kMinOverlappingRatio ||
compaction->mutable_cf_options()->ttl == 0 ||
compaction->num_input_levels() < 2 || compaction->bottommost_level()) {
return;
}
// We define new file with the oldest ancestor time to be younger than 1/4
// TTL, and an old one to be older than 1/2 TTL time.
int64_t temp_current_time;
auto get_time_status = compaction->immutable_options()->clock->GetCurrentTime(
&temp_current_time);
if (!get_time_status.ok()) {
return;
}
auto current_time = static_cast<uint64_t>(temp_current_time);
if (current_time < compaction->mutable_cf_options()->ttl) {
return;
}
uint64_t old_age_thres =
current_time - compaction->mutable_cf_options()->ttl / 2;
const std::vector<FileMetaData*>& olevel =
*(compaction->inputs(compaction->num_input_levels() - 1));
for (FileMetaData* file : olevel) {
// Worth filtering out by start and end?
uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
// We put old files if they are not too small to prevent a flood
// of small files.
if (oldest_ancester_time < old_age_thres &&
file->fd.GetFileSize() >
compaction->mutable_cf_options()->target_file_size_base / 2) {
files_to_cut_for_ttl_.push_back(file);
}
}
}
OutputIterator SubcompactionState::GetOutputs() const { OutputIterator SubcompactionState::GetOutputs() const {
return OutputIterator(penultimate_level_outputs_.outputs_, return OutputIterator(penultimate_level_outputs_.outputs_,
compaction_outputs_.outputs_); compaction_outputs_.outputs_);
@ -128,83 +88,6 @@ Slice SubcompactionState::LargestUserKey() const {
} }
} }
bool SubcompactionState::ShouldStopBefore(const Slice& internal_key) {
uint64_t curr_file_size = Current().GetCurrentOutputFileSize();
const InternalKeyComparator* icmp =
&compaction->column_family_data()->internal_comparator();
// Invalid local_output_split_key indicates that we do not need to split
if (local_output_split_key_ != nullptr && !is_split_) {
// Split occurs when the next key is larger than/equal to the cursor
if (icmp->Compare(internal_key, local_output_split_key_->Encode()) >= 0) {
is_split_ = true;
return true;
}
}
const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
bool grandparant_file_switched = false;
// Scan to find the earliest grandparent file that contains key.
while (grandparent_index_ < grandparents.size() &&
icmp->Compare(internal_key,
grandparents[grandparent_index_]->largest.Encode()) >
0) {
if (seen_key_) {
overlapped_bytes_ += grandparents[grandparent_index_]->fd.GetFileSize();
grandparant_file_switched = true;
}
assert(grandparent_index_ + 1 >= grandparents.size() ||
icmp->Compare(
grandparents[grandparent_index_]->largest.Encode(),
grandparents[grandparent_index_ + 1]->smallest.Encode()) <= 0);
grandparent_index_++;
}
seen_key_ = true;
if (grandparant_file_switched &&
overlapped_bytes_ + curr_file_size > compaction->max_compaction_bytes()) {
// Too much overlap for current output; start new output
overlapped_bytes_ = 0;
return true;
}
if (!files_to_cut_for_ttl_.empty()) {
if (cur_files_to_cut_for_ttl_ != -1) {
// Previous key is inside the range of a file
if (icmp->Compare(internal_key,
files_to_cut_for_ttl_[cur_files_to_cut_for_ttl_]
->largest.Encode()) > 0) {
next_files_to_cut_for_ttl_ = cur_files_to_cut_for_ttl_ + 1;
cur_files_to_cut_for_ttl_ = -1;
return true;
}
} else {
// Look for the key position
while (next_files_to_cut_for_ttl_ <
static_cast<int>(files_to_cut_for_ttl_.size())) {
if (icmp->Compare(internal_key,
files_to_cut_for_ttl_[next_files_to_cut_for_ttl_]
->smallest.Encode()) >= 0) {
if (icmp->Compare(internal_key,
files_to_cut_for_ttl_[next_files_to_cut_for_ttl_]
->largest.Encode()) <= 0) {
// With in the current file
cur_files_to_cut_for_ttl_ = next_files_to_cut_for_ttl_;
return true;
}
// Beyond the current file
next_files_to_cut_for_ttl_++;
} else {
// Still fall into the gap
break;
}
}
}
}
return false;
}
Status SubcompactionState::AddToOutput( Status SubcompactionState::AddToOutput(
const CompactionIterator& iter, const CompactionIterator& iter,
const CompactionFileOpenFunc& open_file_func, const CompactionFileOpenFunc& open_file_func,

@ -128,21 +128,12 @@ class SubcompactionState {
compaction_outputs_(c, /*is_penultimate_level=*/false), compaction_outputs_(c, /*is_penultimate_level=*/false),
penultimate_level_outputs_(c, /*is_penultimate_level=*/true) { penultimate_level_outputs_(c, /*is_penultimate_level=*/true) {
assert(compaction != nullptr); assert(compaction != nullptr);
const InternalKeyComparator* icmp = // Set output split key (used for RoundRobin feature) only for normal
&compaction->column_family_data()->internal_comparator(); // compaction_outputs, output to penultimate_level feature doesn't support
const InternalKey* output_split_key = compaction->GetOutputSplitKey(); // RoundRobin feature (and may never going to be supported, because for
// Invalid output_split_key indicates that we do not need to split // RoundRobin, the data time is mostly naturally sorted, no need to have
if (output_split_key != nullptr) { // per-key placement with output_to_penultimate_level).
// We may only split the output when the cursor is in the range. Split compaction_outputs_.SetOutputSlitKey(start, end);
if ((!end.has_value() ||
icmp->user_comparator()->Compare(
ExtractUserKey(output_split_key->Encode()), end.value()) < 0) &&
(!start.has_value() || icmp->user_comparator()->Compare(
ExtractUserKey(output_split_key->Encode()),
start.value()) > 0)) {
local_output_split_key_ = output_split_key;
}
}
} }
SubcompactionState(SubcompactionState&& state) noexcept SubcompactionState(SubcompactionState&& state) noexcept
@ -155,12 +146,6 @@ class SubcompactionState {
state.notify_on_subcompaction_completion), state.notify_on_subcompaction_completion),
compaction_job_stats(std::move(state.compaction_job_stats)), compaction_job_stats(std::move(state.compaction_job_stats)),
sub_job_id(state.sub_job_id), sub_job_id(state.sub_job_id),
files_to_cut_for_ttl_(std::move(state.files_to_cut_for_ttl_)),
cur_files_to_cut_for_ttl_(state.cur_files_to_cut_for_ttl_),
next_files_to_cut_for_ttl_(state.next_files_to_cut_for_ttl_),
grandparent_index_(state.grandparent_index_),
overlapped_bytes_(state.overlapped_bytes_),
seen_key_(state.seen_key_),
compaction_outputs_(std::move(state.compaction_outputs_)), compaction_outputs_(std::move(state.compaction_outputs_)),
penultimate_level_outputs_(std::move(state.penultimate_level_outputs_)), penultimate_level_outputs_(std::move(state.penultimate_level_outputs_)),
is_current_penultimate_level_(state.is_current_penultimate_level_), is_current_penultimate_level_(state.is_current_penultimate_level_),
@ -175,12 +160,6 @@ class SubcompactionState {
penultimate_level_outputs_.HasRangeDel(); penultimate_level_outputs_.HasRangeDel();
} }
void FillFilesToCutForTtl();
// Returns true iff we should stop building the current output
// before processing "internal_key".
bool ShouldStopBefore(const Slice& internal_key);
bool IsCurrentPenultimateLevel() const { bool IsCurrentPenultimateLevel() const {
return is_current_penultimate_level_; return is_current_penultimate_level_;
} }
@ -224,28 +203,6 @@ class SubcompactionState {
} }
private: private:
// Some identified files with old oldest ancester time and the range should be
// isolated out so that the output file(s) in that range can be merged down
// for TTL and clear the timestamps for the range.
std::vector<FileMetaData*> files_to_cut_for_ttl_;
int cur_files_to_cut_for_ttl_ = -1;
int next_files_to_cut_for_ttl_ = 0;
// An index that used to speed up ShouldStopBefore().
size_t grandparent_index_ = 0;
// The number of bytes overlapping between the current output and
// grandparent files used in ShouldStopBefore().
uint64_t overlapped_bytes_ = 0;
// A flag determines whether the key has been seen in ShouldStopBefore()
bool seen_key_ = false;
// A flag determines if this subcompaction has been split by the cursor
bool is_split_ = false;
// We also maintain the output split key for each subcompaction to avoid
// repetitive comparison in ShouldStopBefore()
const InternalKey* local_output_split_key_ = nullptr;
// State kept for output being generated // State kept for output being generated
CompactionOutputs compaction_outputs_; CompactionOutputs compaction_outputs_;
CompactionOutputs penultimate_level_outputs_; CompactionOutputs penultimate_level_outputs_;

Loading…
Cancel
Save