Support subcmpct using reserved resources for round-robin priority (#10341)

Summary:
Earlier implementation of round-robin priority can only pick one file at a time and disallows parallel compactions within the same level. In this PR, round-robin compaction policy will expand towards more input files with respecting some additional constraints, which are summarized as follows:
 * Constraint 1: We can only pick consecutive files
   - Constraint 1a: When a file is being compacted (or some input files are being compacted after expanding), we cannot choose it and have to stop choosing more files
   - Constraint 1b: When we reach the last file (with the largest keys), we cannot choose more files (the next file will be the first one with small keys)
 * Constraint 2: We should ensure the total compaction bytes (including the overlapped files from the next level) is no more than `mutable_cf_options_.max_compaction_bytes`
 * Constraint 3: We try our best to pick as many files as possible so that the post-compaction level size can be just less than `MaxBytesForLevel(start_level_)`
 * Constraint 4: If trivial move is allowed, we reuse the logic of `TryNonL0TrivialMove()` instead of expanding files with Constraint 3

More details can be found in `LevelCompactionBuilder::SetupOtherFilesWithRoundRobinExpansion()`.

The above optimization accelerates the process of moving the compaction cursor, in which the write-amp can be further reduced. While a large compaction may lead to high write stall, we break this large compaction into several subcompactions **regardless of** the `max_subcompactions` limit.  The number of subcompactions for round-robin compaction priority is determined through the following steps:
* Step 1: Initialized against `max_output_file_limit`, the number of input files in the start level, and also the range size limit `ranges.size()`
* Step 2: Call `AcquireSubcompactionResources()`when max subcompactions is not sufficient, but we may or may not obtain desired resources, additional number of resources is stored in `extra_num_subcompaction_threads_reserved_`). Subcompaction limit is changed and update `num_planned_subcompactions` with `GetSubcompactionLimit()`
* Step 3: Call `ShrinkSubcompactionResources()` to ensure extra resources can be released (extra resources may exist for round-robin compaction when the number of actual number of subcompactions is less than the number of planned subcompactions)

More details can be found in `CompactionJob::AcquireSubcompactionResources()`,`CompactionJob::ShrinkSubcompactionResources()`, and `CompactionJob::ReleaseSubcompactionResources()`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10341

Test Plan: Add `CompactionPriMultipleFilesRoundRobin[1-3]` unit test in `compaction_picker_test.cc` and `RoundRobinSubcompactionsAgainstResources.SubcompactionsUsingResources/[0-4]`, `RoundRobinSubcompactionsAgainstPressureToken.PressureTokenTest/[0-1]` in `db_compaction_test.cc`

Reviewed By: ajkr, hx235

Differential Revision: D37792644

Pulled By: littlepig2013

fbshipit-source-id: 7fecb7c4ffd97b34bbf6e3b760b2c35a772a0657
main
Zichen Zhu 2 years ago committed by Facebook GitHub Bot
parent 252bea405e
commit 8860fc902a
  1. 13
      db/compaction/compaction.cc
  2. 162
      db/compaction/compaction_job.cc
  3. 33
      db/compaction/compaction_job.h
  4. 12
      db/compaction/compaction_picker.cc
  5. 3
      db/compaction/compaction_picker.h
  6. 138
      db/compaction/compaction_picker_level.cc
  7. 131
      db/compaction/compaction_picker_test.cc
  8. 8
      db/compaction/subcompaction_state.h
  9. 207
      db/db_compaction_test.cc
  10. 9
      db/db_impl/db_impl_compaction_flush.cc
  11. 5
      db/version_set.cc
  12. 10
      db/version_set.h

@ -660,7 +660,7 @@ bool Compaction::IsOutputLevelEmpty() const {
} }
bool Compaction::ShouldFormSubcompactions() const { bool Compaction::ShouldFormSubcompactions() const {
if (max_subcompactions_ <= 1 || cfd_ == nullptr) { if (cfd_ == nullptr) {
return false; return false;
} }
@ -671,6 +671,17 @@ bool Compaction::ShouldFormSubcompactions() const {
return false; return false;
} }
// Round-Robin pri under leveled compaction allows subcompactions by default
// and the number of subcompactions can be larger than max_subcompactions_
if (cfd_->ioptions()->compaction_pri == kRoundRobin &&
cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
return output_level_ > 0;
}
if (max_subcompactions_ <= 1) {
return false;
}
if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) { if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
return (start_level_ == 0 || is_manual_compaction_) && output_level_ > 0; return (start_level_ == 0 || is_manual_compaction_) && output_level_ > 0;
} else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) { } else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {

@ -123,7 +123,8 @@ CompactionJob::CompactionJob(
const std::atomic<bool>& manual_compaction_canceled, const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id, const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low, std::string trim_ts, std::string full_history_ts_low, std::string trim_ts,
BlobFileCompletionCallback* blob_callback) BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
int* bg_bottom_compaction_scheduled)
: compact_(new CompactionState(compaction)), : compact_(new CompactionState(compaction)),
compaction_stats_(compaction->compaction_reason(), 1), compaction_stats_(compaction->compaction_reason(), 1),
db_options_(db_options), db_options_(db_options),
@ -162,9 +163,13 @@ CompactionJob::CompactionJob(
thread_pri_(thread_pri), thread_pri_(thread_pri),
full_history_ts_low_(std::move(full_history_ts_low)), full_history_ts_low_(std::move(full_history_ts_low)),
trim_ts_(std::move(trim_ts)), trim_ts_(std::move(trim_ts)),
blob_callback_(blob_callback) { blob_callback_(blob_callback),
extra_num_subcompaction_threads_reserved_(0),
bg_compaction_scheduled_(bg_compaction_scheduled),
bg_bottom_compaction_scheduled_(bg_bottom_compaction_scheduled) {
assert(compaction_job_stats_ != nullptr); assert(compaction_job_stats_ != nullptr);
assert(log_buffer_ != nullptr); assert(log_buffer_ != nullptr);
const auto* cfd = compact_->compaction->column_family_data(); const auto* cfd = compact_->compaction->column_family_data();
ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env, ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
db_options_.enable_thread_tracking); db_options_.enable_thread_tracking);
@ -291,6 +296,99 @@ void CompactionJob::Prepare() {
} }
} }
uint64_t CompactionJob::GetSubcompactionsLimit() {
return extra_num_subcompaction_threads_reserved_ +
std::max(
std::uint64_t(1),
static_cast<uint64_t>(compact_->compaction->max_subcompactions()));
}
void CompactionJob::AcquireSubcompactionResources(
int num_extra_required_subcompactions) {
TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:0");
TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:1");
int max_db_compactions =
DBImpl::GetBGJobLimits(
mutable_db_options_copy_.max_background_flushes,
mutable_db_options_copy_.max_background_compactions,
mutable_db_options_copy_.max_background_jobs,
versions_->GetColumnFamilySet()
->write_controller()
->NeedSpeedupCompaction())
.max_compactions;
// Apply min function first since We need to compute the extra subcompaction
// against compaction limits. And then try to reserve threads for extra
// subcompactions. The actual number of reserved threads could be less than
// the desired number.
int available_bg_compactions_against_db_limit =
std::max(max_db_compactions - *bg_compaction_scheduled_ -
*bg_bottom_compaction_scheduled_,
0);
db_mutex_->Lock();
// Reservation only supports backgrdoun threads of which the priority is
// between BOTTOM and HIGH. Need to degrade the priority to HIGH if the
// origin thread_pri_ is higher than that. Similar to ReleaseThreads().
extra_num_subcompaction_threads_reserved_ =
env_->ReserveThreads(std::min(num_extra_required_subcompactions,
available_bg_compactions_against_db_limit),
std::min(thread_pri_, Env::Priority::HIGH));
// Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
// depending on if this compaction has the bottommost priority
if (thread_pri_ == Env::Priority::BOTTOM) {
*bg_bottom_compaction_scheduled_ +=
extra_num_subcompaction_threads_reserved_;
} else {
*bg_compaction_scheduled_ += extra_num_subcompaction_threads_reserved_;
}
db_mutex_->Unlock();
}
void CompactionJob::ShrinkSubcompactionResources(uint64_t num_extra_resources) {
// Do nothing when we have zero resources to shrink
if (num_extra_resources == 0) return;
db_mutex_->Lock();
// We cannot release threads more than what we reserved before
int extra_num_subcompaction_threads_released = env_->ReleaseThreads(
(int)num_extra_resources, std::min(thread_pri_, Env::Priority::HIGH));
// Update the number of reserved threads and the number of background
// scheduled compactions for this compaction job
extra_num_subcompaction_threads_reserved_ -=
extra_num_subcompaction_threads_released;
// TODO (zichen): design a test case with new subcompaction partitioning
// when the number of actual partitions is less than the number of planned
// partitions
assert(extra_num_subcompaction_threads_released == (int)num_extra_resources);
// Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
// depending on if this compaction has the bottommost priority
if (thread_pri_ == Env::Priority::BOTTOM) {
*bg_bottom_compaction_scheduled_ -=
extra_num_subcompaction_threads_released;
} else {
*bg_compaction_scheduled_ -= extra_num_subcompaction_threads_released;
}
db_mutex_->Unlock();
TEST_SYNC_POINT("CompactionJob::ShrinkSubcompactionResources:0");
}
void CompactionJob::ReleaseSubcompactionResources() {
if (extra_num_subcompaction_threads_reserved_ == 0) {
return;
}
// The number of reserved threads becomes larger than 0 only if the
// compaction prioity is round robin and there is no sufficient
// sub-compactions available
// The scheduled compaction must be no less than 1 + extra number
// subcompactions using acquired resources since this compaction job has not
// finished yet
assert(*bg_bottom_compaction_scheduled_ >=
1 + extra_num_subcompaction_threads_reserved_ ||
*bg_compaction_scheduled_ >=
1 + extra_num_subcompaction_threads_reserved_);
ShrinkSubcompactionResources(extra_num_subcompaction_threads_reserved_);
}
struct RangeWithSize { struct RangeWithSize {
Range range; Range range;
uint64_t size; uint64_t size;
@ -327,7 +425,9 @@ void CompactionJob::GenSubcompactionBoundaries() {
// cause relatively small inaccuracy. // cause relatively small inaccuracy.
auto* c = compact_->compaction; auto* c = compact_->compaction;
if (c->max_subcompactions() <= 1) { if (c->max_subcompactions() <= 1 &&
!(c->immutable_options()->compaction_pri == kRoundRobin &&
c->immutable_options()->compaction_style == kCompactionStyleLevel)) {
return; return;
} }
auto* cfd = c->column_family_data(); auto* cfd = c->column_family_data();
@ -342,6 +442,7 @@ void CompactionJob::GenSubcompactionBoundaries() {
std::vector<TableReader::Anchor> all_anchors; std::vector<TableReader::Anchor> all_anchors;
int start_lvl = c->start_level(); int start_lvl = c->start_level();
int out_lvl = c->output_level(); int out_lvl = c->output_level();
for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) { for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
int lvl = c->level(lvl_idx); int lvl = c->level(lvl_idx);
if (lvl >= start_lvl && lvl <= out_lvl) { if (lvl >= start_lvl && lvl <= out_lvl) {
@ -381,9 +482,44 @@ void CompactionJob::GenSubcompactionBoundaries() {
return cfd_comparator->Compare(a.user_key, b.user_key) < 0; return cfd_comparator->Compare(a.user_key, b.user_key) < 0;
}); });
// Get the number of planned subcompactions, may update reserve threads
// and update extra_num_subcompaction_threads_reserved_ for round-robin
uint64_t num_planned_subcompactions;
if (c->immutable_options()->compaction_pri == kRoundRobin &&
c->immutable_options()->compaction_style == kCompactionStyleLevel) {
// For round-robin compaction prioity, we need to employ more
// subcompactions (may exceed the max_subcompaction limit). The extra
// subcompactions will be executed using reserved threads and taken into
// account bg_compaction_scheduled or bg_bottom_compaction_scheduled.
// Initialized by the number of input files
num_planned_subcompactions = static_cast<uint64_t>(c->num_input_files(0));
uint64_t max_subcompactions_limit = GetSubcompactionsLimit();
if (max_subcompactions_limit < num_planned_subcompactions) {
// Assert two pointers are not empty so that we can use extra
// subcompactions against db compaction limits
assert(bg_bottom_compaction_scheduled_ != nullptr);
assert(bg_compaction_scheduled_ != nullptr);
// Reserve resources when max_subcompaction is not sufficient
AcquireSubcompactionResources(
(int)(num_planned_subcompactions - max_subcompactions_limit));
// Subcompactions limit changes after acquiring additional resources.
// Need to call GetSubcompactionsLimit() again to update the number
// of planned subcompactions
num_planned_subcompactions =
std::min(num_planned_subcompactions, GetSubcompactionsLimit());
}
} else {
num_planned_subcompactions = GetSubcompactionsLimit();
}
TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:0",
&num_planned_subcompactions);
if (num_planned_subcompactions == 1) return;
// Group the ranges into subcompactions // Group the ranges into subcompactions
uint64_t target_range_size = std::max( uint64_t target_range_size = std::max(
total_size / static_cast<uint64_t>(c->max_subcompactions()), total_size / num_planned_subcompactions,
MaxFileSizeForLevel( MaxFileSizeForLevel(
*(c->mutable_cf_options()), out_lvl, *(c->mutable_cf_options()), out_lvl,
c->immutable_options()->compaction_style, base_level, c->immutable_options()->compaction_style, base_level,
@ -395,16 +531,24 @@ void CompactionJob::GenSubcompactionBoundaries() {
uint64_t next_threshold = target_range_size; uint64_t next_threshold = target_range_size;
uint64_t cumulative_size = 0; uint64_t cumulative_size = 0;
uint64_t num_actual_subcompactions = 1U;
for (TableReader::Anchor& anchor : all_anchors) { for (TableReader::Anchor& anchor : all_anchors) {
cumulative_size += anchor.range_size; cumulative_size += anchor.range_size;
if (cumulative_size > next_threshold) { if (cumulative_size > next_threshold) {
next_threshold += target_range_size; next_threshold += target_range_size;
num_actual_subcompactions++;
boundaries_.push_back(anchor.user_key); boundaries_.push_back(anchor.user_key);
} }
if (boundaries_.size() + 1 >= uint64_t{c->max_subcompactions()}) { if (num_actual_subcompactions == num_planned_subcompactions) {
break; break;
} }
} }
TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:1",
&num_actual_subcompactions);
// Shrink extra subcompactions resources when extra resrouces are acquired
ShrinkSubcompactionResources(
std::min((int)(num_planned_subcompactions - num_actual_subcompactions),
extra_num_subcompaction_threads_reserved_));
} }
Status CompactionJob::Run() { Status CompactionJob::Run() {
@ -567,6 +711,7 @@ Status CompactionJob::Run() {
for (auto& thread : thread_pool) { for (auto& thread : thread_pool) {
thread.join(); thread.join();
} }
for (const auto& state : compact_->sub_compact_states) { for (const auto& state : compact_->sub_compact_states) {
if (!state.status.ok()) { if (!state.status.ok()) {
status = state.status; status = state.status;
@ -575,6 +720,10 @@ Status CompactionJob::Run() {
} }
} }
ReleaseSubcompactionResources();
TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources:0");
TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources:1");
TablePropertiesCollection tp; TablePropertiesCollection tp;
for (const auto& state : compact_->sub_compact_states) { for (const auto& state : compact_->sub_compact_states) {
for (const auto& output : state.GetOutputs()) { for (const auto& output : state.GetOutputs()) {
@ -1484,7 +1633,8 @@ Status CompactionJob::InstallCompactionResults(
if (start_level > 0) { if (start_level > 0) {
auto vstorage = compaction->input_version()->storage_info(); auto vstorage = compaction->input_version()->storage_info();
edit->AddCompactCursor(start_level, edit->AddCompactCursor(start_level,
vstorage->GetNextCompactCursor(start_level)); vstorage->GetNextCompactCursor(
start_level, compaction->num_input_files(0)));
} }
} }

@ -164,7 +164,9 @@ class CompactionJob {
const std::atomic<bool>& manual_compaction_canceled, const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id = "", const std::string& db_session_id = "", const std::string& db_id = "", const std::string& db_session_id = "",
std::string full_history_ts_low = "", std::string trim_ts = "", std::string full_history_ts_low = "", std::string trim_ts = "",
BlobFileCompletionCallback* blob_callback = nullptr); BlobFileCompletionCallback* blob_callback = nullptr,
int* bg_compaction_scheduled = nullptr,
int* bg_bottom_compaction_scheduled = nullptr);
virtual ~CompactionJob(); virtual ~CompactionJob();
@ -225,6 +227,26 @@ class CompactionJob {
// consecutive groups such that each group has a similar size. // consecutive groups such that each group has a similar size.
void GenSubcompactionBoundaries(); void GenSubcompactionBoundaries();
// Get the number of planned subcompactions based on max_subcompactions and
// extra reserved resources
uint64_t GetSubcompactionsLimit();
// Additional reserved threads are reserved and the number is stored in
// extra_num_subcompaction_threads_reserved__. For now, this happens only if
// the compaction priority is round-robin and max_subcompactions is not
// sufficient (extra resources may be needed)
void AcquireSubcompactionResources(int num_extra_required_subcompactions);
// Additional threads may be reserved during IncreaseSubcompactionResources()
// if num_actual_subcompactions is less than num_planned_subcompactions.
// Additional threads will be released and the bg_compaction_scheduled_ or
// bg_bottom_compaction_scheduled_ will be updated if they are used.
// DB Mutex lock is required.
void ShrinkSubcompactionResources(uint64_t num_extra_resources);
// Release all reserved threads and update the compaction limits.
void ReleaseSubcompactionResources();
CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService( CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService(
SubcompactionState* sub_compact); SubcompactionState* sub_compact);
@ -299,6 +321,15 @@ class CompactionJob {
BlobFileCompletionCallback* blob_callback_; BlobFileCompletionCallback* blob_callback_;
uint64_t GetCompactionId(SubcompactionState* sub_compact) const; uint64_t GetCompactionId(SubcompactionState* sub_compact) const;
// Stores the number of reserved threads in shared env_ for the number of
// extra subcompaction in kRoundRobin compaction priority
int extra_num_subcompaction_threads_reserved_;
// Stores the pointer to bg_compaction_scheduled_,
// bg_bottom_compaction_scheduled_ in DBImpl. Mutex is required when accessing
// or updating it.
int* bg_compaction_scheduled_;
int* bg_bottom_compaction_scheduled_;
// Stores the sequence number to time mapping gathered from all input files // Stores the sequence number to time mapping gathered from all input files
// it also collects the smallest_seqno -> oldest_ancester_time from the SST. // it also collects the smallest_seqno -> oldest_ancester_time from the SST.

@ -462,7 +462,7 @@ bool CompactionPicker::SetupOtherInputs(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, CompactionInputFiles* inputs, VersionStorageInfo* vstorage, CompactionInputFiles* inputs,
CompactionInputFiles* output_level_inputs, int* parent_index, CompactionInputFiles* output_level_inputs, int* parent_index,
int base_index) { int base_index, bool only_expand_towards_right) {
assert(!inputs->empty()); assert(!inputs->empty());
assert(output_level_inputs->empty()); assert(output_level_inputs->empty());
const int input_level = inputs->level; const int input_level = inputs->level;
@ -515,8 +515,16 @@ bool CompactionPicker::SetupOtherInputs(
InternalKey all_start, all_limit; InternalKey all_start, all_limit;
GetRange(*inputs, *output_level_inputs, &all_start, &all_limit); GetRange(*inputs, *output_level_inputs, &all_start, &all_limit);
bool try_overlapping_inputs = true; bool try_overlapping_inputs = true;
if (only_expand_towards_right) {
// Round-robin compaction only allows expansion towards the larger side.
vstorage->GetOverlappingInputs(input_level, &smallest, &all_limit,
&expanded_inputs.files, base_index,
nullptr);
} else {
vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit, vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit,
&expanded_inputs.files, base_index, nullptr); &expanded_inputs.files, base_index,
nullptr);
}
uint64_t expanded_inputs_size = uint64_t expanded_inputs_size =
TotalCompensatedFileSize(expanded_inputs.files); TotalCompensatedFileSize(expanded_inputs.files);
if (!ExpandInputsToCleanCut(cf_name, vstorage, &expanded_inputs)) { if (!ExpandInputsToCleanCut(cf_name, vstorage, &expanded_inputs)) {

@ -189,7 +189,8 @@ class CompactionPicker {
VersionStorageInfo* vstorage, VersionStorageInfo* vstorage,
CompactionInputFiles* inputs, CompactionInputFiles* inputs,
CompactionInputFiles* output_level_inputs, CompactionInputFiles* output_level_inputs,
int* parent_index, int base_index); int* parent_index, int base_index,
bool only_expand_towards_right = false);
void GetGrandparents(VersionStorageInfo* vstorage, void GetGrandparents(VersionStorageInfo* vstorage,
const CompactionInputFiles& inputs, const CompactionInputFiles& inputs,

@ -76,6 +76,9 @@ class LevelCompactionBuilder {
// files if needed. // files if needed.
bool SetupOtherL0FilesIfNeeded(); bool SetupOtherL0FilesIfNeeded();
// Compaction with round-robin compaction priority allows more files to be
// picked to form a large compaction
void SetupOtherFilesWithRoundRobinExpansion();
// Based on initial files, setup other files need to be compacted // Based on initial files, setup other files need to be compacted
// in this compaction, accordingly. // in this compaction, accordingly.
bool SetupOtherInputsIfNeeded(); bool SetupOtherInputsIfNeeded();
@ -84,7 +87,9 @@ class LevelCompactionBuilder {
// For the specfied level, pick a file that we want to compact. // For the specfied level, pick a file that we want to compact.
// Returns false if there is no file to compact. // Returns false if there is no file to compact.
// If it returns true, inputs->files.size() will be exactly one. // If it returns true, inputs->files.size() will be exactly one for
// all compaction priorities except round-robin. For round-robin,
// multiple consecutive files may be put into inputs->files.
// If level is 0 and there is already a compaction on that level, this // If level is 0 and there is already a compaction on that level, this
// function will return false. // function will return false.
bool PickFileToCompact(); bool PickFileToCompact();
@ -278,16 +283,141 @@ bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() {
return true; return true;
} }
void LevelCompactionBuilder::SetupOtherFilesWithRoundRobinExpansion() {
// We only expand when the start level is not L0 under round robin
assert(start_level_ >= 1);
// For round-robin compaction priority, we have 3 constraints when picking
// multiple files.
// Constraint 1: We can only pick consecutive files
// -> Constraint 1a: When a file is being compacted (or some input files
// are being compacted after expanding, we cannot
// choose it and have to stop choosing more files
// -> Constraint 1b: When we reach the last file (with largest keys), we
// cannot choose more files (the next file will be the
// first one)
// Constraint 2: We should ensure the total compaction bytes (including the
// overlapped files from the next level) is no more than
// mutable_cf_options_.max_compaction_bytes
// Constraint 3: We try our best to pick as many files as possible so that
// the post-compaction level size is less than
// MaxBytesForLevel(start_level_)
// Constraint 4: We do not expand if it is possible to apply a trivial move
// Constraint 5 (TODO): Try to pick minimal files to split into the target
// number of subcompactions
TEST_SYNC_POINT("LevelCompactionPicker::RoundRobin");
// Only expand the inputs when we have selected a file in start_level_inputs_
if (start_level_inputs_.size() == 0) return;
uint64_t start_lvl_bytes_no_compacting = 0;
uint64_t curr_bytes_to_compact = 0;
uint64_t start_lvl_max_bytes_to_compact = 0;
const std::vector<FileMetaData*>& level_files =
vstorage_->LevelFiles(start_level_);
// Constraint 3 (pre-calculate the ideal max bytes to compact)
for (auto f : level_files) {
if (!f->being_compacted) {
start_lvl_bytes_no_compacting += f->compensated_file_size;
}
}
if (start_lvl_bytes_no_compacting >
vstorage_->MaxBytesForLevel(start_level_)) {
start_lvl_max_bytes_to_compact = start_lvl_bytes_no_compacting -
vstorage_->MaxBytesForLevel(start_level_);
}
size_t start_index = vstorage_->FilesByCompactionPri(start_level_)[0];
InternalKey smallest, largest;
// Constraint 4 (No need to check again later)
compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
CompactionInputFiles output_level_inputs;
output_level_inputs.level = output_level_;
vstorage_->GetOverlappingInputs(output_level_, &smallest, &largest,
&output_level_inputs.files);
if (output_level_inputs.empty()) {
if (TryExtendNonL0TrivialMove((int)start_index)) {
return;
}
}
// Constraint 3
if (start_level_inputs_[0]->compensated_file_size >=
start_lvl_max_bytes_to_compact) {
return;
}
CompactionInputFiles tmp_start_level_inputs;
tmp_start_level_inputs = start_level_inputs_;
// TODO (zichen): Future parallel round-robin may also need to update this
// Constraint 1b (only expand till the end)
for (size_t i = start_index + 1; i < level_files.size(); i++) {
auto* f = level_files[i];
if (f->being_compacted) {
// Constraint 1a
return;
}
tmp_start_level_inputs.files.push_back(f);
if (!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&tmp_start_level_inputs) ||
compaction_picker_->FilesRangeOverlapWithCompaction(
{tmp_start_level_inputs}, output_level_)) {
// Constraint 1a
tmp_start_level_inputs.clear();
return;
}
curr_bytes_to_compact = 0;
for (auto start_lvl_f : tmp_start_level_inputs.files) {
curr_bytes_to_compact += start_lvl_f->compensated_file_size;
}
// Check whether any output level files are locked
compaction_picker_->GetRange(tmp_start_level_inputs, &smallest, &largest);
vstorage_->GetOverlappingInputs(output_level_, &smallest, &largest,
&output_level_inputs.files);
if (!output_level_inputs.empty() &&
!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&output_level_inputs)) {
// Constraint 1a
tmp_start_level_inputs.clear();
return;
}
uint64_t start_lvl_curr_bytes_to_compact = curr_bytes_to_compact;
for (auto output_lvl_f : output_level_inputs.files) {
curr_bytes_to_compact += output_lvl_f->compensated_file_size;
}
if (curr_bytes_to_compact > mutable_cf_options_.max_compaction_bytes) {
// Constraint 2
tmp_start_level_inputs.clear();
return;
}
start_level_inputs_.files = tmp_start_level_inputs.files;
// Constraint 3
if (start_lvl_curr_bytes_to_compact > start_lvl_max_bytes_to_compact) {
return;
}
}
}
bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() { bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
// Setup input files from output level. For output to L0, we only compact // Setup input files from output level. For output to L0, we only compact
// spans of files that do not interact with any pending compactions, so don't // spans of files that do not interact with any pending compactions, so don't
// need to consider other levels. // need to consider other levels.
if (output_level_ != 0) { if (output_level_ != 0) {
output_level_inputs_.level = output_level_; output_level_inputs_.level = output_level_;
bool round_robin_expanding =
ioptions_.compaction_pri == kRoundRobin &&
compaction_reason_ == CompactionReason::kLevelMaxLevelSize;
if (round_robin_expanding) {
SetupOtherFilesWithRoundRobinExpansion();
}
if (!is_l0_trivial_move_ && if (!is_l0_trivial_move_ &&
!compaction_picker_->SetupOtherInputs( !compaction_picker_->SetupOtherInputs(
cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_, cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_,
&output_level_inputs_, &parent_index_, base_index_)) { &output_level_inputs_, &parent_index_, base_index_,
round_robin_expanding)) {
return false; return false;
} }
@ -606,9 +736,6 @@ bool LevelCompactionBuilder::PickFileToCompact() {
// user-key overlap. // user-key overlap.
start_level_inputs_.clear(); start_level_inputs_.clear();
// To ensure every file is selcted in a round-robin manner, we cannot
// skip the current file. So we return false and wait for the next time
// we can pick this file to compact
if (ioptions_.compaction_pri == kRoundRobin) { if (ioptions_.compaction_pri == kRoundRobin) {
return false; return false;
} }
@ -641,6 +768,7 @@ bool LevelCompactionBuilder::PickFileToCompact() {
continue; continue;
} }
} }
base_index_ = index; base_index_ = index;
break; break;
} }

@ -1318,7 +1318,7 @@ TEST_F(CompactionPickerTest, CompactionPriRoundRobin) {
std::vector<uint32_t> selected_files = {8U, 6U, 6U}; std::vector<uint32_t> selected_files = {8U, 6U, 6U};
ioptions_.compaction_pri = kRoundRobin; ioptions_.compaction_pri = kRoundRobin;
mutable_cf_options_.max_bytes_for_level_base = 10000000; mutable_cf_options_.max_bytes_for_level_base = 12000000;
mutable_cf_options_.max_bytes_for_level_multiplier = 10; mutable_cf_options_.max_bytes_for_level_multiplier = 10;
for (size_t i = 0; i < test_cursors.size(); i++) { for (size_t i = 0; i < test_cursors.size(); i++) {
// start a brand new version in each test. // start a brand new version in each test.
@ -1342,6 +1342,9 @@ TEST_F(CompactionPickerTest, CompactionPriRoundRobin) {
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_)); &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr); ASSERT_TRUE(compaction.get() != nullptr);
// Since the max bytes for level 2 is 120M, picking one file to compact
// makes the post-compaction level size less than 120M, there is exactly one
// file picked for round-robin compaction
ASSERT_EQ(1U, compaction->num_input_files(0)); ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(selected_files[i], compaction->input(0, 0)->fd.GetNumber()); ASSERT_EQ(selected_files[i], compaction->input(0, 0)->fd.GetNumber());
// release the version storage // release the version storage
@ -1349,6 +1352,132 @@ TEST_F(CompactionPickerTest, CompactionPriRoundRobin) {
} }
} }
TEST_F(CompactionPickerTest, CompactionPriMultipleFilesRoundRobin1) {
ioptions_.compaction_pri = kRoundRobin;
mutable_cf_options_.max_compaction_bytes = 100000000u;
mutable_cf_options_.max_bytes_for_level_base = 120;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
// start a brand new version in each test.
NewVersionStorage(6, kCompactionStyleLevel);
vstorage_->ResizeCompactCursors(6);
// Set the cursor (file picking should start with 7U)
vstorage_->AddCursorForOneLevel(2, InternalKey("199", 100, kTypeValue));
Add(2, 6U, "150", "199", 500U);
Add(2, 7U, "200", "249", 500U);
Add(2, 8U, "300", "600", 500U);
Add(2, 9U, "700", "800", 500U);
Add(2, 10U, "850", "950", 500U);
Add(3, 26U, "130", "165", 600U);
Add(3, 27U, "166", "170", 600U);
Add(3, 28U, "270", "340", 600U);
Add(3, 29U, "401", "500", 600U);
Add(3, 30U, "601", "800", 600U);
Add(3, 31U, "830", "890", 600U);
UpdateVersionStorageInfo();
LevelCompactionPicker local_level_compaction_picker =
LevelCompactionPicker(ioptions_, &icmp_);
std::unique_ptr<Compaction> compaction(
local_level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
// The maximum compaction bytes is very large in this case so we can igore its
// constraint in this test case. The maximum bytes for level 2 is 1200
// bytes, and thus at least 3 files should be picked so that the bytes in
// level 2 is less than the maximum
ASSERT_EQ(3U, compaction->num_input_files(0));
ASSERT_EQ(7U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(8U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(9U, compaction->input(0, 2)->fd.GetNumber());
// release the version storage
DeleteVersionStorage();
}
TEST_F(CompactionPickerTest, CompactionPriMultipleFilesRoundRobin2) {
ioptions_.compaction_pri = kRoundRobin;
mutable_cf_options_.max_compaction_bytes = 2500u;
mutable_cf_options_.max_bytes_for_level_base = 120;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
// start a brand new version in each test.
NewVersionStorage(6, kCompactionStyleLevel);
vstorage_->ResizeCompactCursors(6);
// Set the cursor (file picking should start with 6U)
vstorage_->AddCursorForOneLevel(2, InternalKey("1000", 100, kTypeValue));
Add(2, 6U, "150", "199", 500U); // Overlap with 26U, 27U
Add(2, 7U, "200", "249", 500U); // Overlap with 27U
Add(2, 8U, "300", "600", 500U); // Overlap with 28U, 29U
Add(2, 9U, "700", "800", 500U);
Add(2, 10U, "850", "950", 500U);
Add(3, 26U, "130", "165", 600U);
Add(3, 27U, "166", "230", 600U);
Add(3, 28U, "270", "340", 600U);
Add(3, 29U, "401", "500", 600U);
Add(3, 30U, "601", "800", 600U);
Add(3, 31U, "830", "890", 600U);
UpdateVersionStorageInfo();
LevelCompactionPicker local_level_compaction_picker =
LevelCompactionPicker(ioptions_, &icmp_);
std::unique_ptr<Compaction> compaction(
local_level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
// The maximum compaction bytes is only 2500 bytes now. Even though we are
// required to choose 3 files so that the post-compaction level size is less
// than 1200 bytes. We cannot pick 3 files to compact since the maximum
// compaction size is 2500. After picking files 6U and 7U, the number of
// compaction bytes has reached 2200, and thus no more space to add another
// input file with 50M bytes.
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(6U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(7U, compaction->input(0, 1)->fd.GetNumber());
// release the version storage
DeleteVersionStorage();
}
TEST_F(CompactionPickerTest, CompactionPriMultipleFilesRoundRobin3) {
ioptions_.compaction_pri = kRoundRobin;
mutable_cf_options_.max_compaction_bytes = 1000000u;
mutable_cf_options_.max_bytes_for_level_base = 120;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
// start a brand new version in each test.
NewVersionStorage(6, kCompactionStyleLevel);
vstorage_->ResizeCompactCursors(6);
// Set the cursor (file picking should start with 9U)
vstorage_->AddCursorForOneLevel(2, InternalKey("700", 100, kTypeValue));
Add(2, 6U, "150", "199", 500U);
Add(2, 7U, "200", "249", 500U);
Add(2, 8U, "300", "600", 500U);
Add(2, 9U, "700", "800", 500U);
Add(2, 10U, "850", "950", 500U);
Add(3, 26U, "130", "165", 600U);
Add(3, 27U, "166", "170", 600U);
Add(3, 28U, "270", "340", 600U);
Add(3, 29U, "401", "500", 600U);
Add(3, 30U, "601", "800", 600U);
Add(3, 31U, "830", "890", 600U);
UpdateVersionStorageInfo();
LevelCompactionPicker local_level_compaction_picker =
LevelCompactionPicker(ioptions_, &icmp_);
std::unique_ptr<Compaction> compaction(
local_level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
// Cannot pick more files since we reach the last file in level 2
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(9U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(10U, compaction->input(0, 1)->fd.GetNumber());
// release the version storage
DeleteVersionStorage();
}
TEST_F(CompactionPickerTest, CompactionPriMinOverlappingManyFiles) { TEST_F(CompactionPickerTest, CompactionPriMinOverlappingManyFiles) {
NewVersionStorage(6, kCompactionStyleLevel); NewVersionStorage(6, kCompactionStyleLevel);
ioptions_.compaction_pri = kMinOverlappingRatio; ioptions_.compaction_pri = kMinOverlappingRatio;

@ -134,12 +134,12 @@ class SubcompactionState {
// Invalid output_split_key indicates that we do not need to split // Invalid output_split_key indicates that we do not need to split
if (output_split_key != nullptr) { if (output_split_key != nullptr) {
// We may only split the output when the cursor is in the range. Split // We may only split the output when the cursor is in the range. Split
if ((!end.has_value() || icmp->user_comparator()->Compare( if ((!end.has_value() ||
ExtractUserKey(output_split_key->Encode()), icmp->user_comparator()->Compare(
ExtractUserKey(end.value())) < 0) && ExtractUserKey(output_split_key->Encode()), end.value()) < 0) &&
(!start.has_value() || icmp->user_comparator()->Compare( (!start.has_value() || icmp->user_comparator()->Compare(
ExtractUserKey(output_split_key->Encode()), ExtractUserKey(output_split_key->Encode()),
ExtractUserKey(start.value())) > 0)) { start.value()) > 0)) {
local_output_split_key_ = output_split_key; local_output_split_key_ = output_split_key;
} }
} }

@ -161,8 +161,32 @@ class ChangeLevelConflictsWithAuto
ChangeLevelConflictsWithAuto() : DBCompactionTest() {} ChangeLevelConflictsWithAuto() : DBCompactionTest() {}
}; };
namespace { // Param = true: grab the compaction pressure token (enable
// parallel compactions)
// Param = false: Not grab the token (no parallel compactions)
class RoundRobinSubcompactionsAgainstPressureToken
: public DBCompactionTest,
public ::testing::WithParamInterface<bool> {
public:
RoundRobinSubcompactionsAgainstPressureToken() {
grab_pressure_token_ = GetParam();
}
bool grab_pressure_token_;
};
class RoundRobinSubcompactionsAgainstResources
: public DBCompactionTest,
public ::testing::WithParamInterface<std::tuple<int, int>> {
public:
RoundRobinSubcompactionsAgainstResources() {
total_low_pri_threads_ = std::get<0>(GetParam());
max_compaction_limits_ = std::get<1>(GetParam());
}
int total_low_pri_threads_;
int max_compaction_limits_;
};
namespace {
class FlushedFileCollector : public EventListener { class FlushedFileCollector : public EventListener {
public: public:
FlushedFileCollector() {} FlushedFileCollector() {}
@ -5306,6 +5330,187 @@ TEST_F(DBCompactionTest, PersistRoundRobinCompactCursor) {
} }
} }
TEST_P(RoundRobinSubcompactionsAgainstPressureToken, PressureTokenTest) {
const int kKeysPerBuffer = 100;
Options options = CurrentOptions();
options.num_levels = 4;
options.max_bytes_for_level_multiplier = 2;
options.level0_file_num_compaction_trigger = 4;
options.target_file_size_base = kKeysPerBuffer * 1024;
options.compaction_pri = CompactionPri::kRoundRobin;
options.max_bytes_for_level_base = 8 * kKeysPerBuffer * 1024;
options.disable_auto_compactions = true;
// Setup 7 threads but limited subcompactions so that
// RoundRobin requires extra compactions from reserved threads
options.max_subcompactions = 1;
options.max_background_compactions = 7;
options.max_compaction_bytes = 100000000;
DestroyAndReopen(options);
env_->SetBackgroundThreads(7, Env::LOW);
Random rnd(301);
const std::vector<int> files_per_level = {0, 15, 25};
for (int lvl = 2; lvl > 0; lvl--) {
for (int i = 0; i < files_per_level[lvl]; i++) {
for (int j = 0; j < kKeysPerBuffer; j++) {
// Add (lvl-1) to ensure nearly equivallent number of files
// in L2 are overlapped with fils selected to compact from
// L1
ASSERT_OK(Put(Key(2 * i * kKeysPerBuffer + 2 * j + (lvl - 1)),
rnd.RandomString(1010)));
}
ASSERT_OK(Flush());
}
MoveFilesToLevel(lvl);
ASSERT_EQ(files_per_level[lvl], NumTableFilesAtLevel(lvl, 0));
}
// 15 files in L1; 25 files in L2
// This is a variable for making sure the following callback is called
// and the assertions in it are indeed excuted.
bool num_planned_subcompactions_verified = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::GenSubcompactionBoundaries:0", [&](void* arg) {
uint64_t num_planned_subcompactions = *(static_cast<uint64_t*>(arg));
if (grab_pressure_token_) {
// 7 files are selected for round-robin under auto
// compaction. The number of planned subcompaction is restricted by
// the limited number of max_background_compactions
ASSERT_EQ(num_planned_subcompactions, 7);
} else {
ASSERT_EQ(num_planned_subcompactions, 1);
}
num_planned_subcompactions_verified = true;
});
// The following 3 dependencies have to be added to ensure the auto
// compaction and the pressure token is correctly enabled. Same for
// RoundRobinSubcompactionsUsingResources and
// DBCompactionTest.RoundRobinSubcompactionsShrinkResources
SyncPoint::GetInstance()->LoadDependency(
{{"RoundRobinSubcompactionsAgainstPressureToken:0",
"BackgroundCallCompaction:0"},
{"CompactionJob::AcquireSubcompactionResources:0",
"RoundRobinSubcompactionsAgainstPressureToken:1"},
{"RoundRobinSubcompactionsAgainstPressureToken:2",
"CompactionJob::AcquireSubcompactionResources:1"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()}));
TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:0");
TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:1");
std::unique_ptr<WriteControllerToken> pressure_token;
if (grab_pressure_token_) {
pressure_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
}
TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:2");
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_TRUE(num_planned_subcompactions_verified);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
INSTANTIATE_TEST_CASE_P(RoundRobinSubcompactionsAgainstPressureToken,
RoundRobinSubcompactionsAgainstPressureToken,
testing::Bool());
TEST_P(RoundRobinSubcompactionsAgainstResources, SubcompactionsUsingResources) {
const int kKeysPerBuffer = 200;
Options options = CurrentOptions();
options.num_levels = 4;
options.level0_file_num_compaction_trigger = 3;
options.target_file_size_base = kKeysPerBuffer * 1024;
options.compaction_pri = CompactionPri::kRoundRobin;
options.max_bytes_for_level_base = 30 * kKeysPerBuffer * 1024;
options.disable_auto_compactions = true;
options.max_subcompactions = 1;
options.max_background_compactions = max_compaction_limits_;
// Set a large number for max_compaction_bytes so that one round-robin
// compaction is enough to make post-compaction L1 size less than
// the maximum size (this test assumes only one round-robin compaction
// is triggered by kLevelMaxLevelSize)
options.max_compaction_bytes = 100000000;
DestroyAndReopen(options);
env_->SetBackgroundThreads(total_low_pri_threads_, Env::LOW);
Random rnd(301);
const std::vector<int> files_per_level = {0, 40, 100};
for (int lvl = 2; lvl > 0; lvl--) {
for (int i = 0; i < files_per_level[lvl]; i++) {
for (int j = 0; j < kKeysPerBuffer; j++) {
// Add (lvl-1) to ensure nearly equivallent number of files
// in L2 are overlapped with fils selected to compact from
// L1
ASSERT_OK(Put(Key(2 * i * kKeysPerBuffer + 2 * j + (lvl - 1)),
rnd.RandomString(1010)));
}
ASSERT_OK(Flush());
}
MoveFilesToLevel(lvl);
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(files_per_level[lvl], NumTableFilesAtLevel(lvl, 0));
}
// 40 files in L1; 100 files in L2
// This is a variable for making sure the following callback is called
// and the assertions in it are indeed excuted.
bool num_planned_subcompactions_verified = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::GenSubcompactionBoundaries:0", [&](void* arg) {
uint64_t num_planned_subcompactions = *(static_cast<uint64_t*>(arg));
// More than 10 files are selected for round-robin under auto
// compaction. The number of planned subcompaction is restricted by
// the minimum number between available threads and compaction limits
ASSERT_EQ(num_planned_subcompactions - options.max_subcompactions,
std::min(total_low_pri_threads_, max_compaction_limits_) - 1);
num_planned_subcompactions_verified = true;
});
SyncPoint::GetInstance()->LoadDependency(
{{"RoundRobinSubcompactionsAgainstResources:0",
"BackgroundCallCompaction:0"},
{"CompactionJob::AcquireSubcompactionResources:0",
"RoundRobinSubcompactionsAgainstResources:1"},
{"RoundRobinSubcompactionsAgainstResources:2",
"CompactionJob::AcquireSubcompactionResources:1"},
{"CompactionJob::ReleaseSubcompactionResources:0",
"RoundRobinSubcompactionsAgainstResources:3"},
{"RoundRobinSubcompactionsAgainstResources:4",
"CompactionJob::ReleaseSubcompactionResources:1"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()}));
TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:0");
TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:1");
auto pressure_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:2");
TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:3");
// We can reserve more threads now except one is being used
ASSERT_EQ(total_low_pri_threads_ - 1,
env_->ReserveThreads(total_low_pri_threads_, Env::Priority::LOW));
ASSERT_EQ(
total_low_pri_threads_ - 1,
env_->ReleaseThreads(total_low_pri_threads_ - 1, Env::Priority::LOW));
TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:4");
ASSERT_OK(dbfull()->WaitForCompact());
ASSERT_TRUE(num_planned_subcompactions_verified);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
INSTANTIATE_TEST_CASE_P(RoundRobinSubcompactionsAgainstResources,
RoundRobinSubcompactionsAgainstResources,
::testing::Values(std::make_tuple(1, 5),
std::make_tuple(5, 1),
std::make_tuple(10, 5),
std::make_tuple(5, 10),
std::make_tuple(10, 10)));
TEST_F(DBCompactionTest, RoundRobinCutOutputAtCompactCursor) { TEST_F(DBCompactionTest, RoundRobinCutOutputAtCompactCursor) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.num_levels = 3; options.num_levels = 3;

@ -1408,7 +1408,8 @@ Status DBImpl::CompactFilesImpl(
&compaction_job_stats, Env::Priority::USER, io_tracer_, &compaction_job_stats, Env::Priority::USER, io_tracer_,
kManualCompactionCanceledFalse_, db_id_, db_session_id_, kManualCompactionCanceledFalse_, db_id_, db_session_id_,
c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(), c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(),
&blob_callback_); &blob_callback_, &bg_compaction_scheduled_,
&bg_bottom_compaction_scheduled_);
// Creating a compaction influences the compaction score because the score // Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already // takes running compactions into account (by skipping files that are already
@ -3330,7 +3331,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (start_level > 0) { if (start_level > 0) {
auto vstorage = c->input_version()->storage_info(); auto vstorage = c->input_version()->storage_info();
c->edit()->AddCompactCursor( c->edit()->AddCompactCursor(
start_level, vstorage->GetNextCompactCursor(start_level)); start_level,
vstorage->GetNextCompactCursor(start_level, c->num_input_files(0)));
} }
} }
status = versions_->LogAndApply(c->column_family_data(), status = versions_->LogAndApply(c->column_family_data(),
@ -3415,7 +3417,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
is_manual ? manual_compaction->canceled is_manual ? manual_compaction->canceled
: kManualCompactionCanceledFalse_, : kManualCompactionCanceledFalse_,
db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
c->trim_ts(), &blob_callback_); c->trim_ts(), &blob_callback_, &bg_compaction_scheduled_,
&bg_bottom_compaction_scheduled_);
compaction_job.Prepare(); compaction_job.Prepare();
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,

@ -3209,7 +3209,7 @@ void SortFileByRoundRobin(const InternalKeyComparator& icmp,
} }
bool should_move_files = bool should_move_files =
compact_cursor->at(level).Valid() && temp->size() > 1; compact_cursor->at(level).size() > 0 && temp->size() > 1;
// The iterator points to the Fsize with smallest key larger than or equal to // The iterator points to the Fsize with smallest key larger than or equal to
// the given cursor // the given cursor
@ -3225,7 +3225,8 @@ void SortFileByRoundRobin(const InternalKeyComparator& icmp,
return icmp.Compare(cursor, f.file->smallest) > 0; return icmp.Compare(cursor, f.file->smallest) > 0;
}); });
should_move_files = current_file_iter != temp->end(); should_move_files =
current_file_iter != temp->end() && current_file_iter != temp->begin();
} }
if (should_move_files) { if (should_move_files) {
// Construct a local temporary vector // Construct a local temporary vector

@ -146,10 +146,12 @@ class VersionStorageInfo {
} }
// REQUIRES: lock is held // REQUIRES: lock is held
// Update the compact cursor and advance the file index so that it can point // Update the compact cursor and advance the file index using increment
// to the next cursor // so that it can point to the next cursor (increment means the number of
const InternalKey& GetNextCompactCursor(int level) { // input files in this level of the last compaction)
int cmp_idx = next_file_to_compact_by_size_[level] + 1; const InternalKey& GetNextCompactCursor(int level, size_t increment) {
int cmp_idx = next_file_to_compact_by_size_[level] + (int)increment;
assert(cmp_idx <= (int)files_by_compaction_pri_[level].size());
// TODO(zichen): may need to update next_file_to_compact_by_size_ // TODO(zichen): may need to update next_file_to_compact_by_size_
// for parallel compaction. // for parallel compaction.
InternalKey new_cursor; InternalKey new_cursor;

Loading…
Cancel
Save