diff --git a/db/compaction.cc b/db/compaction.cc
index 882f05d1a..4dd6d163b 100644
--- a/db/compaction.cc
+++ b/db/compaction.cc
@@ -377,4 +377,21 @@ std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
       context);
 }
 
+bool Compaction::IsOutputLevelEmpty() const {
+  return inputs_.back().level != output_level_ || inputs_.back().empty();
+}
+
+bool Compaction::ShouldFormSubcompactions() const {
+  if (mutable_cf_options_.max_subcompactions <= 1 || cfd_ == nullptr) {
+    return false;
+  }
+  if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
+    return start_level_ == 0 && !IsOutputLevelEmpty();
+  } else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
+    return number_levels_ > 1 && output_level_ > 0;
+  } else {
+    return false;
+  }
+}
+
 }  // namespace rocksdb
diff --git a/db/compaction.h b/db/compaction.h
index ba1ccc1a1..e660a9e53 100644
--- a/db/compaction.h
+++ b/db/compaction.h
@@ -198,16 +198,11 @@ class Compaction {
   // Create a CompactionFilter from compaction_filter_factory
   std::unique_ptr<CompactionFilter> CreateCompactionFilter() const;
 
-  // Should this compaction be broken up into smaller ones run in parallel?
-  bool IsSubCompaction() const {
-    return start_level_ == 0 && output_level_ == 1 &&
-           mutable_cf_options_.max_subcompactions > 1;
-  }
+  // Is the input level corresponding to output_level_ empty?
+  bool IsOutputLevelEmpty() const;
 
-  // If is_sub_compaction == true, how many smaller compactions should execute
-  int NumSubCompactions() const {
-    return mutable_cf_options_.max_subcompactions;
-  }
+  // Should this compaction be broken up into smaller ones run in parallel?
+  bool ShouldFormSubcompactions() const;
 
  private:
   // mark (or clear) all files that are being compacted
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index ffb294edb..b1a0c4878 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -15,9 +15,13 @@
 #include <algorithm>
+#include <cmath>
+#include <functional>
 #include <list>
 #include <memory>
+#include <set>
+#include <thread>
 #include <utility>
 #include <vector>
 
 #include "db/builder.h"
 #include "db/db_iter.h"
@@ -58,7 +62,7 @@ namespace rocksdb {
 
 // Maintains state for each sub-compaction
-struct CompactionJob::SubCompactionState {
+struct CompactionJob::SubcompactionState {
   Compaction* compaction;
 
   // The boundaries of the key-range this compaction is interested in. No two
   // subcompactions may have overlapping key-ranges.
   // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
   Slice *start, *end;
 
-  // The return status of this compaction
+  // The return status of this subcompaction
   Status status;
 
-  // Files produced by compaction
+  // Files produced by this subcompaction
   struct Output {
     uint64_t number;
     uint32_t path_id;
@@ -88,7 +92,7 @@ struct CompactionJob::SubCompactionState {
   // This subcompaction's ouptut could be empty if compaction was aborted
   // before this subcompaction had a chance to generate any output files.
   // When subcompactions are executed sequentially this is more likely and
-  // will be particulalry likely for the last subcompaction to be empty.
+  // will be particularly likely for the later subcompactions to be empty.
   // Once they are run in parallel however it should be much rarer.
       return nullptr;
     } else {
@@ -96,11 +100,12 @@ struct CompactionJob::SubCompactionState {
     }
   }
 
-  // State during the sub-compaction
+  // State during the subcompaction
   uint64_t total_bytes;
   uint64_t num_input_records;
   uint64_t num_output_records;
   CompactionJobStats compaction_job_stats;
+  uint64_t approx_size;
 
   // "level_ptrs" holds indices that remember which file of an associated
   // level we were last checking during the last call to compaction->
@@ -110,7 +115,8 @@ struct CompactionJob::SubCompactionState {
   // is in or beyond the last file checked during the previous call
   std::vector<size_t> level_ptrs;
 
-  SubCompactionState(Compaction* c, Slice* _start, Slice* _end)
+  SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
+                     uint64_t size = 0)
       : compaction(c),
         start(_start),
         end(_end),
@@ -118,16 +124,15 @@ struct CompactionJob::SubCompactionState {
         builder(nullptr),
         total_bytes(0),
         num_input_records(0),
-        num_output_records(0) {
+        num_output_records(0),
+        approx_size(size) {
     assert(compaction != nullptr);
     level_ptrs = std::vector<size_t>(compaction->number_levels(), 0);
   }
 
-  SubCompactionState(SubCompactionState&& o) {
-    *this = std::move(o);
-  }
+  SubcompactionState(SubcompactionState&& o) { *this = std::move(o); }
 
-  SubCompactionState& operator=(SubCompactionState&& o) {
+  SubcompactionState& operator=(SubcompactionState&& o) {
     compaction = std::move(o.compaction);
     start = std::move(o.start);
     end = std::move(o.end);
@@ -138,14 +143,16 @@ struct CompactionJob::SubCompactionState {
     total_bytes = std::move(o.total_bytes);
     num_input_records = std::move(o.num_input_records);
     num_output_records = std::move(o.num_output_records);
+    compaction_job_stats = std::move(o.compaction_job_stats);
+    approx_size = std::move(o.approx_size);
     level_ptrs = std::move(o.level_ptrs);
     return *this;
   }
 
   // Because member unique_ptrs do not have these.
-  SubCompactionState(const SubCompactionState&) = delete;
+  SubcompactionState(const SubcompactionState&) = delete;
 
-  SubCompactionState& operator=(const SubCompactionState&) = delete;
+  SubcompactionState& operator=(const SubcompactionState&) = delete;
 };
 
 // Maintains state for the entire compaction
@@ -154,7 +161,7 @@ struct CompactionJob::CompactionState {
 
   // REQUIRED: subcompaction states are stored in order of increasing
   // key-range
-  std::vector<SubCompactionState> sub_compact_states;
+  std::vector<SubcompactionState> sub_compact_states;
   Status status;
 
   uint64_t total_bytes;
@@ -176,13 +183,11 @@ struct CompactionJob::CompactionState {
   }
 
   Slice SmallestUserKey() {
-    for (size_t i = 0; i < sub_compact_states.size(); i++) {
-      if (!sub_compact_states[i].outputs.empty()) {
-        return sub_compact_states[i].outputs[0].smallest.user_key();
+    for (auto& s : sub_compact_states) {
+      if (!s.outputs.empty()) {
+        return s.outputs[0].smallest.user_key();
       }
     }
-    // TODO(aekmekji): should we exit with an error if it reaches here?
-    assert(0);
     return Slice(nullptr, 0);
   }
 
@@ -193,19 +198,18 @@
         return sub_compact_states[i].current_output()->largest.user_key();
      }
    }
-    // TODO(aekmekji): should we exit with an error if it reaches here?
-    assert(0);
     return Slice(nullptr, 0);
   }
 };
 
 void CompactionJob::AggregateStatistics() {
-  for (SubCompactionState& sc : compact_->sub_compact_states) {
+  for (SubcompactionState& sc : compact_->sub_compact_states) {
     compact_->total_bytes += sc.total_bytes;
     compact_->num_input_records += sc.num_input_records;
     compact_->num_output_records += sc.num_output_records;
-
-    if (compaction_job_stats_) {
+  }
+  if (compaction_job_stats_) {
+    for (SubcompactionState& sc : compact_->sub_compact_states) {
       compaction_job_stats_->Add(sc.compaction_job_stats);
     }
   }
@@ -325,78 +329,141 @@ void CompactionJob::Prepare() {
     earliest_snapshot_ = existing_snapshots_[0];
   }
 
-  InitializeSubCompactions();
+  if (c->ShouldFormSubcompactions()) {
+    const uint64_t start_micros = env_->NowMicros();
+    GenSubcompactionBoundaries();
+    MeasureTime(stats_, SUBCOMPACTION_SETUP_TIME,
+                env_->NowMicros() - start_micros);
+
+    assert(sizes_.size() == boundaries_.size() + 1);
+
+    for (size_t i = 0; i <= boundaries_.size(); i++) {
+      Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
+      Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
+      compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
+    }
+  } else {
+    compact_->sub_compact_states.emplace_back(c, nullptr, nullptr);
+  }
 }
 
-// For L0-L1 compaction, iterators work in parallel by processing
-// different subsets of the full key range. This function sets up
-// the local states used by each of these subcompactions during
-// their execution
-void CompactionJob::InitializeSubCompactions() {
-  Compaction* c = compact_->compaction;
-  auto& bounds = sub_compaction_boundaries_;
-  if (c->IsSubCompaction()) {
-    auto* cmp = c->column_family_data()->user_comparator();
-    for (size_t which = 0; which < c->num_input_levels(); which++) {
-      if (c->level(which) == 1) {
-        const LevelFilesBrief* flevel = c->input_levels(which);
-        size_t num_files = flevel->num_files;
-
-        if (num_files > 1) {
-          std::vector<Slice> candidates;
-          auto& files = flevel->files;
-          Slice global_min = ExtractUserKey(files[0].smallest_key);
-          Slice global_max = ExtractUserKey(files[num_files - 1].largest_key);
+struct RangeWithSize {
+  Range range;
+  uint64_t size;
 
-          for (size_t i = 1; i < num_files; i++) {
-            // Make sure the smallest key in two consecutive L1 files are
-            // unique before adding the smallest key as a boundary. Also ensure
-            // that the boundary won't lead to an empty subcompaction (happens
-            // if the boundary == the smallest or largest key)
-            Slice s1 = ExtractUserKey(files[i].smallest_key);
-            Slice s2 = i == num_files - 1
-                           ? Slice()
-                           : ExtractUserKey(files[i + 1].smallest_key);
-
-            if ( (i == num_files - 1 && cmp->Compare(s1, global_max) < 0)
-                || (i < num_files - 1 && cmp->Compare(s1, s2) < 0 &&
-                    cmp->Compare(s1, global_min) > 0)) {
-              candidates.emplace_back(s1);
-            }
-          }
+  RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0)
+      : range(a, b), size(s) {}
+};
+
+bool SliceCompare(const Comparator* cmp, const Slice& a, const Slice& b) {
+  // Returns true if a < b
+  return cmp->Compare(ExtractUserKey(a), ExtractUserKey(b)) < 0;
+}
+
+// Generates a histogram representing potential divisions of key ranges from
+// the input. It adds the starting and/or ending keys of certain input files
+// to the working set and then finds the approximate size of data in between
+// each consecutive pair of slices. Then it divides these ranges into
+// consecutive groups such that each group has a similar size.
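+//
+// For example (all sizes hypothetical): if the chosen boundary keys split the
+// input into five consecutive ranges of roughly 8MB, 2MB, 6MB, 4MB and 10MB
+// (30MB total) and three subcompactions are allowed, the mean is 10MB, so the
+// greedy pass below closes a group each time its running sum reaches that
+// mean, producing the groups {8+2 | 6+4 | 10}.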
+void CompactionJob::GenSubcompactionBoundaries() {
+  auto* c = compact_->compaction;
+  auto* cfd = c->column_family_data();
+  std::set<Slice, std::function<bool(const Slice& a, const Slice& b)> > bounds(
+      std::bind(&SliceCompare, cfd->user_comparator(), std::placeholders::_1,
+                std::placeholders::_2));
+  int start_lvl = c->start_level();
+  int out_lvl = c->output_level();
+
+  // Add the starting and/or ending key of certain input files as a potential
+  // boundary (because we're inserting into a set, it avoids duplicates)
+  for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
+    int lvl = c->level(lvl_idx);
+    if (lvl >= start_lvl && lvl <= out_lvl) {
+      const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
+      size_t num_files = flevel->num_files;
+
+      if (num_files == 0) {
+        break;
+      }
-          // Divide the potential L1 file boundaries (those that passed the
-          // checks above) into 'max_subcompactions' groups such that each have
-          // as close to an equal number of files in it as possible
-          // TODO(aekmekji): refine this later to depend on file size
-          size_t files_left = candidates.size();
-          size_t subcompactions_left =
-              static_cast<size_t>(db_options_.max_subcompactions) < files_left
                   ? db_options_.max_subcompactions
                   : files_left;
-
-          size_t num_to_include;
-          size_t index = 0;
-
-          while (files_left > 1 && subcompactions_left > 1) {
-            num_to_include = files_left / subcompactions_left;
-            index += num_to_include;
-            sub_compaction_boundaries_.emplace_back(candidates[index]);
-            files_left -= num_to_include;
-            subcompactions_left--;
+
+      if (lvl == 0) {
+        // For level 0 add the starting and ending key of each file since the
+        // files may have greatly differing key ranges (not range-partitioned)
+        for (size_t i = 0; i < num_files; i++) {
+          bounds.emplace(flevel->files[i].smallest_key);
+          bounds.emplace(flevel->files[i].largest_key);
+        }
+      } else {
+        // For all other levels add the smallest/largest key in the level to
+        // encompass the range covered by that level
+        bounds.emplace(flevel->files[0].smallest_key);
+        bounds.emplace(flevel->files[num_files - 1].largest_key);
+        if (lvl == out_lvl) {
+          // For the last level include the starting keys of all files since
+          // the last level is the largest and probably has the widest key
+          // range. Since it's range partitioned, the ending key of one file
+          // and the starting key of the next are very close (or identical).
+          for (size_t i = 1; i < num_files; i++) {
+            bounds.emplace(flevel->files[i].smallest_key);
           }
         }
-        break;
       }
     }
   }
 
-  // Note: it's necessary for the first iterator sub-range to have
-  // start == nullptr and for the last to have end == nullptr
-  for (size_t i = 0; i <= bounds.size(); i++) {
-    Slice *start = i == 0 ? nullptr : &bounds[i - 1];
-    Slice *end = i == bounds.size() ? nullptr : &bounds[i];
-    compact_->sub_compact_states.emplace_back(compact_->compaction, start, end);
+  // Combine consecutive pairs of boundaries into ranges with an approximate
+  // size of data covered by keys in that range
+  uint64_t sum = 0;
+  std::vector<RangeWithSize> ranges;
+  auto* v = cfd->current();
+  for (auto it = bounds.begin();;) {
+    const Slice a = *it;
+    it++;
+
+    if (it == bounds.end()) {
+      break;
+    }
+
+    const Slice b = *it;
+    uint64_t size = versions_->ApproximateSize(v, a, b, start_lvl, out_lvl + 1);
+    ranges.emplace_back(a, b, size);
+    sum += size;
+  }
+
+  // Group the ranges into subcompactions
+  const double min_file_fill_percent = 4.0 / 5;
+  uint64_t max_output_files = std::ceil(
+      sum / min_file_fill_percent /
+      cfd->GetCurrentMutableCFOptions()->MaxFileSizeForLevel(out_lvl));
+  uint64_t subcompactions =
+      std::min({static_cast<uint64_t>(ranges.size()),
+                static_cast<uint64_t>(db_options_.max_subcompactions),
+                max_output_files});
+
+  double mean = sum * 1.0 / subcompactions;
+
+  if (subcompactions > 1) {
+    // Greedily add ranges to the subcompaction until the sum of the ranges'
+    // sizes becomes >= the expected mean size of a subcompaction
+    sum = 0;
+    for (size_t i = 0; i < ranges.size() - 1; i++) {
+      if (subcompactions == 1) {
+        // If there's only one left to schedule then it goes to the end so no
+        // need to put an end boundary
+        break;
+      }
+      sum += ranges[i].size;
+      if (sum >= mean) {
+        boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
+        sizes_.emplace_back(sum);
+        subcompactions--;
+        sum = 0;
+      }
+    }
+    sizes_.emplace_back(sum + ranges.back().size);
+  } else {
+    // Only one range so its size is the total sum of sizes computed above
+    sizes_.emplace_back(sum);
   }
 }
@@ -407,15 +474,35 @@ Status CompactionJob::Run() {
   log_buffer_->FlushBufferToLog();
   LogCompaction();
 
-  // Run each subcompaction sequentially
+  const size_t num_threads = compact_->sub_compact_states.size();
+  assert(num_threads > 0);
   const uint64_t start_micros = env_->NowMicros();
-  for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
-    ProcessKeyValueCompaction(&compact_->sub_compact_states[i]);
+
+  // Launch a thread for each of subcompactions 1...num_threads-1
+  std::vector<std::thread> thread_pool;
+  thread_pool.reserve(num_threads - 1);
+  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
+    thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
+                             &compact_->sub_compact_states[i]);
   }
+
+  // Always schedule the first subcompaction (whether or not there are also
+  // others) in the current thread to be efficient with resources
+  ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);
+
+  // Wait for all other threads (if there are any) to finish execution
+  for (auto& thread : thread_pool) {
+    thread.join();
+  }
+
+  if (output_directory_ && !db_options_.disableDataSync) {
+    output_directory_->Fsync();
+  }
+
   compaction_stats_.micros = env_->NowMicros() - start_micros;
   MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
 
-  // Determine if any of the subcompactions failed
+  // Check if any thread encountered an error during execution
   Status status;
   for (const auto& state : compact_->sub_compact_states) {
     if (!state.status.ok()) {
@@ -485,7 +572,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
          << "num_output_files" << compact_->NumOutputFiles()
          << "total_output_size" << compact_->total_bytes
          << "num_input_records" << compact_->num_input_records
-         << "num_output_records" << compact_->num_output_records;
+         << "num_output_records" << compact_->num_output_records
+         << "num_subcompactions" << compact_->sub_compact_states.size();
 
   if (measure_io_stats_ && compaction_job_stats_ != nullptr) {
     stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
@@ -507,7 +595,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
   return status;
 }
 
-void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) {
+void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   assert(sub_compact != nullptr);
   std::unique_ptr<Iterator> input_ptr(
       versions_->MakeInputIterator(sub_compact->compaction));
@@ -764,10 +852,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) {
   if (status.ok()) {
     status = input->status();
   }
-  if (output_directory_ && !db_options_.disableDataSync) {
-    // TODO(aekmekji): Maybe only call once after all subcompactions complete?
-    output_directory_->Fsync();
-  }
 
   if (measure_io_stats_) {
     sub_compact->compaction_job_stats.file_write_nanos +=
@@ -788,9 +872,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) {
 }
 
 Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value,
-    const ParsedInternalKey& ikey, const Status& input_status,
-    SubCompactionState* sub_compact) {
-
+                                    const ParsedInternalKey& ikey,
+                                    const Status& input_status,
+                                    SubcompactionState* sub_compact) {
   Slice newkey(key.data(), key.size());
   std::string kstr;
@@ -833,6 +917,10 @@ Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value,
       std::max(sub_compact->current_output()->largest_seqno, seqno);
 
   // Close output file if it is big enough
+  // TODO(aekmekji): determine if file should be closed earlier than this
+  // during subcompactions (i.e. if output size, estimated by input size, is
+  // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
+  // and 0.6MB instead of 1MB and 0.2MB)
   Status status;
   if (sub_compact->builder->FileSize() >=
       sub_compact->compaction->max_output_file_size()) {
@@ -867,8 +955,8 @@ void CompactionJob::RecordDroppedKeys(
   }
 }
 
-Status CompactionJob::FinishCompactionOutputFile(const Status& input_status,
-                                                 SubCompactionState* sub_compact) {
+Status CompactionJob::FinishCompactionOutputFile(
+    const Status& input_status, SubcompactionState* sub_compact) {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
   assert(sub_compact != nullptr);
@@ -975,9 +1063,9 @@ Status CompactionJob::InstallCompactionResults(
 
   // Add compaction outputs
   compaction->AddInputDeletions(compact_->compaction->edit());
-  for (SubCompactionState& sub_compact : compact_->sub_compact_states) {
+  for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
     for (size_t i = 0; i < sub_compact.outputs.size(); i++) {
-      const SubCompactionState::Output& out = sub_compact.outputs[i];
+      const SubcompactionState::Output& out = sub_compact.outputs[i];
       compaction->edit()->AddFile(compaction->output_level(), out.number,
                                   out.path_id, out.file_size, out.smallest,
                                   out.largest, out.smallest_seqno,
@@ -1028,8 +1116,8 @@ void CompactionJob::RecordCompactionIOStats() {
   IOSTATS_RESET(bytes_written);
 }
 
-Status CompactionJob::OpenCompactionOutputFile(SubCompactionState*
-                                                   sub_compact) {
+Status CompactionJob::OpenCompactionOutputFile(
+    SubcompactionState* sub_compact) {
   assert(sub_compact != nullptr);
   assert(sub_compact->builder == nullptr);
   // no need to lock because VersionSet::next_file_number_ is atomic
@@ -1048,7 +1136,7 @@ Status CompactionJob::OpenCompactionOutputFile(SubCompactionState*
     LogFlush(db_options_.info_log);
     return s;
   }
-  SubCompactionState::Output out;
+  SubcompactionState::Output out;
   out.number = file_number;
   out.path_id = sub_compact->compaction->output_path_id();
   out.smallest.Clear();
@@ -1083,7 +1171,7 @@ Status CompactionJob::OpenCompactionOutputFile(SubCompactionState*
 }
 
 void CompactionJob::CleanupCompaction() {
-  for (SubCompactionState& sub_compact : compact_->sub_compact_states) {
+  for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
     const auto& sub_status = sub_compact.status;
 
     if (sub_compact.builder != nullptr) {
@@ -1094,7 +1182,7 @@ void CompactionJob::CleanupCompaction() {
       assert(!sub_status.ok() || sub_compact.outfile == nullptr);
     }
     for (size_t i = 0; i < sub_compact.outputs.size(); i++) {
-      const SubCompactionState::Output& out = sub_compact.outputs[i];
+      const SubcompactionState::Output& out = sub_compact.outputs[i];
 
       // If this file was inserted into the table cache then remove
       // them here because this compaction was not committed.
diff --git a/db/compaction_job.h b/db/compaction_job.h
index fc0dc471d..b3dd3e717 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -79,32 +79,31 @@ class CompactionJob {
                  InstrumentedMutex* db_mutex);
 
  private:
-  struct SubCompactionState;
+  struct SubcompactionState;
 
   void AggregateStatistics();
-  // Set up the individual states used by each subcompaction
-  void InitializeSubCompactions();
+  void GenSubcompactionBoundaries();
 
   // update the thread status for starting a compaction.
   void ReportStartedCompaction(Compaction* compaction);
   void AllocateCompactionOutputFileNumbers();
   // Call compaction filter. Then iterate through input and compact the
   // kv-pairs
-  void ProcessKeyValueCompaction(SubCompactionState* sub_compact);
+  void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
   Status WriteKeyValue(const Slice& key, const Slice& value,
                        const ParsedInternalKey& ikey,
                        const Status& input_status,
-                       SubCompactionState* sub_compact);
+                       SubcompactionState* sub_compact);
   Status FinishCompactionOutputFile(const Status& input_status,
-                                    SubCompactionState* sub_compact);
+                                    SubcompactionState* sub_compact);
   Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options,
                                   InstrumentedMutex* db_mutex);
   SequenceNumber findEarliestVisibleSnapshot(SequenceNumber in,
                                              SequenceNumber* prev_snapshot);
   void RecordCompactionIOStats();
-  Status OpenCompactionOutputFile(SubCompactionState* sub_compact);
+  Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
   void CleanupCompaction();
   void UpdateCompactionJobStats(
       const InternalStats::CompactionStats& stats) const;
@@ -156,7 +155,10 @@ class CompactionJob {
   bool paranoid_file_checks_;
   bool measure_io_stats_;
-  std::vector<Slice> sub_compaction_boundaries_;
+  // Stores the Slices that designate the boundaries for each subcompaction
+  std::vector<Slice> boundaries_;
+  // Stores the approx size of keys covered in the range of each subcompaction
+  std::vector<uint64_t> sizes_;
 };
 
 }  // namespace rocksdb
diff --git a/db/compaction_job_stats_test.cc b/db/compaction_job_stats_test.cc
index f0b1a3413..516b75d5c 100644
--- a/db/compaction_job_stats_test.cc
+++ b/db/compaction_job_stats_test.cc
@@ -758,8 +758,6 @@ TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
         num_keys_per_L0_file));
     ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
     Compact(1, smallest_key, largest_key);
-    // TODO(aekmekji): account for whether parallel L0-L1 compaction is
-    // enabled or not. If so then num_L1_files will increase by 1
     if (options.max_subcompactions == 1) {
       --num_L1_files;
     }
diff --git a/db/db_test.cc b/db/db_test.cc
index 4e4ab2426..f84c31904 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -7224,7 +7224,7 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
   ASSERT_GT(num_zlib.load(), 0);
 }
 
-TEST_P(DBTestWithParam, DynamicCompactionOptions) {
+TEST_F(DBTest, DynamicCompactionOptions) {
   // minimum write buffer size is enforced at 64KB
   const uint64_t k32KB = 1 << 15;
   const uint64_t k64KB = 1 << 16;
@@ -7250,7 +7250,6 @@
   options.target_file_size_multiplier = 1;
   options.max_bytes_for_level_base = k128KB;
   options.max_bytes_for_level_multiplier = 4;
-  options.max_subcompactions = max_subcompactions_;
 
   // Block flush thread and disable compaction thread
   env_->SetBackgroundThreads(1, Env::LOW);
diff --git a/db/version_set.cc b/db/version_set.cc
index 1a649e24f..cedaa3e29 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -3002,15 +3002,27 @@ bool VersionSet::ManifestContains(uint64_t manifest_file_num,
   return result;
 }
 
+// TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
+// function is called repeatedly with consecutive pairs of slices. For example
+// if the slice list is [a, b, c, d] this function is called with arguments
+// (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
+// we avoid doing binary search for the keys b and c twice and instead somehow
+// maintain state of where they first appear in the files.
 uint64_t VersionSet::ApproximateSize(Version* v, const Slice& start,
-                                     const Slice& end) {
+                                     const Slice& end, int start_level,
+                                     int end_level) {
   // pre-condition
   assert(v->cfd_->internal_comparator().Compare(start, end) <= 0);
 
   uint64_t size = 0;
   const auto* vstorage = v->storage_info();
+  end_level = end_level == -1
+                  ? vstorage->num_non_empty_levels()
+                  : std::min(end_level, vstorage->num_non_empty_levels());
 
-  for (int level = 0; level < vstorage->num_non_empty_levels(); level++) {
+  assert(start_level <= end_level);
+
+  for (int level = start_level; level < end_level; level++) {
     const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
     if (!files_brief.num_files) {
       // empty level, skip exploration
@@ -3142,7 +3154,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
   read_options.verify_checksums =
       c->mutable_cf_options()->verify_checksums_in_compaction;
   read_options.fill_cache = false;
-  if (c->IsSubCompaction()) {
+  if (c->ShouldFormSubcompactions()) {
     read_options.total_order_seek = true;
   }
diff --git a/db/version_set.h b/db/version_set.h
index 069342da9..7707bb1ac 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -646,7 +646,10 @@ class VersionSet {
   void AddLiveFiles(std::vector<FileDescriptor>* live_list);
 
   // Return the approximate size of data to be scanned for range [start, end)
-  uint64_t ApproximateSize(Version* v, const Slice& start, const Slice& end);
+  // in levels [start_level, end_level).
+  // If end_level == -1 it will search through all non-empty levels
+  uint64_t ApproximateSize(Version* v, const Slice& start, const Slice& end,
+                           int start_level = 0, int end_level = -1);
 
   // Return the size of the current manifest file
   uint64_t manifest_file_size() const { return manifest_file_size_; }
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 537366905..23b8507e1 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -907,12 +907,9 @@ struct DBOptions {
   int max_background_compactions;
 
   // This integer represents the maximum number of threads that will
-  // concurrently perform a level-based compaction from L0 to L1. A value
-  // of 1 means there is no parallelism, and a greater number enables a
-  // multi-threaded version of the L0-L1 compaction that divides the compaction
-  // into multiple, smaller ones that are run simultaneously. This is still
-  // under development and is only available for level-based compaction.
-  // Default: 1
+  // concurrently perform a compaction job by breaking it into multiple,
+  // smaller ones that are run simultaneously.
+  // Default: 1 (i.e. no subcompactions)
   uint32_t max_subcompactions;
 
   // Maximum number of concurrent background memtable flush jobs, submitted to
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index 79c77766a..2aa1ce7a0 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -233,6 +233,7 @@ enum Histograms : uint32_t {
   DB_GET = 0,
   DB_WRITE,
   COMPACTION_TIME,
+  SUBCOMPACTION_SETUP_TIME,
   TABLE_SYNC_MICROS,
   COMPACTION_OUTFILE_SYNC_MICROS,
   WAL_FILE_SYNC_MICROS,
@@ -259,6 +260,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
    {DB_GET, "rocksdb.db.get.micros"},
    {DB_WRITE, "rocksdb.db.write.micros"},
    {COMPACTION_TIME, "rocksdb.compaction.times.micros"},
+   {SUBCOMPACTION_SETUP_TIME, "rocksdb.subcompaction.setup.times.micros"},
    {TABLE_SYNC_MICROS, "rocksdb.table.sync.micros"},
    {COMPACTION_OUTFILE_SYNC_MICROS, "rocksdb.compaction.outfile.sync.micros"},
    {WAL_FILE_SYNC_MICROS, "rocksdb.wal.file.sync.micros"},
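
For reference, below is a minimal standalone sketch (not part of the patch) of
the greedy grouping that GenSubcompactionBoundaries() performs. Plain integers
stand in for VersionSet::ApproximateSize() results, std::string keys stand in
for Slices, and the two constants are hypothetical stand-ins for
DBOptions::max_subcompactions and MutableCFOptions::MaxFileSizeForLevel():

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct RangeWithSize {
  std::string start, limit;  // key range [start, limit)
  uint64_t size;             // approximate bytes covered by this range
};

int main() {
  // Hypothetical consecutive ranges between candidate boundary keys
  std::vector<RangeWithSize> ranges = {{"a", "c", 10}, {"c", "f", 30},
                                       {"f", "k", 20}, {"k", "p", 15},
                                       {"p", "t", 25}, {"t", "z", 20}};
  const uint64_t max_subcompactions = 3;  // stand-in for the DBOptions knob
  const uint64_t max_file_size = 40;      // stand-in for MaxFileSizeForLevel
  uint64_t sum = 0;
  for (const auto& r : ranges) sum += r.size;

  // Cap the subcompaction count the same three ways the patch does
  const double min_file_fill_percent = 4.0 / 5;
  uint64_t max_output_files = static_cast<uint64_t>(
      std::ceil(sum / min_file_fill_percent / max_file_size));
  uint64_t subcompactions =
      std::min({static_cast<uint64_t>(ranges.size()), max_subcompactions,
                max_output_files});

  // Greedily close a group once its running sum reaches the mean size;
  // as in the patch, stop early when only one subcompaction remains, since
  // the final group implicitly extends to the end of the key space
  double mean = static_cast<double>(sum) / subcompactions;
  std::vector<std::string> boundaries;
  std::vector<uint64_t> sizes;
  uint64_t acc = 0;
  for (size_t i = 0; i + 1 < ranges.size() && subcompactions > 1; i++) {
    acc += ranges[i].size;
    if (acc >= mean) {
      boundaries.push_back(ranges[i].limit);
      sizes.push_back(acc);
      subcompactions--;
      acc = 0;
    }
  }
  sizes.push_back(acc + ranges.back().size);

  // Prints: ~40 ending at "f", ~60 ending at "t", ~20 to the end
  for (size_t i = 0; i < sizes.size(); i++) {
    std::cout << "subcompaction " << i << ": ~" << sizes[i] << " bytes, "
              << (i < boundaries.size() ? "ending at \"" + boundaries[i] + "\""
                                        : std::string("to the end"))
              << "\n";
  }
  return 0;
}

Note that, as in the patch itself, the size recorded for the last group is only
approximate when the loop stops early; the key ranges still cover the whole
input, so correctness of the compaction does not depend on these estimates.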