From f0da6977a35927406134fa51c87572732133f7c3 Mon Sep 17 00:00:00 2001 From: Ari Ekmekji Date: Tue, 18 Aug 2015 11:06:23 -0700 Subject: [PATCH] [Parallel L0-L1 Compaction Prep]: Giving Subcompactions Their Own State Summary: In preparation for running multiple threads at the same time during a compaction job, this patch assigns each subcompaction its own state (instead of sharing the one global CompactionState). Each subcompaction then uses this state to update its statistics, keep track of its snapshots, etc. during the course of execution. Then, at the end of all the executions, the statistics are aggregated across the subcompactions so that the final result is the same as if only one larger compaction had run. Test Plan: ./db_test ./db_compaction_test ./compaction_job_test Reviewers: sdong, anthony, igor, noetzli, yhchiang Reviewed By: yhchiang Subscribers: MarkCallaghan, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D43239 --- db/compaction.cc | 13 +- db/compaction.h | 14 +- db/compaction_job.cc | 633 +++++++++++++++---------- db/compaction_job.h | 48 +- db/compaction_job_test.cc | 4 +- db/db_impl.cc | 8 +- include/rocksdb/compaction_job_stats.h | 2 + tools/db_stress.cc | 7 + util/compaction_job_stats_impl.cc | 29 ++ 9 files changed, 456 insertions(+), 302 deletions(-) diff --git a/db/compaction.cc b/db/compaction.cc index 0aa17c09c..882f05d1a 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -101,8 +101,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage, score_(_score), bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)), is_full_compaction_(IsFullCompaction(vstorage, inputs_)), - is_manual_compaction_(_manual_compaction), - level_ptrs_(std::vector<size_t>(number_levels_, 0)) { + is_manual_compaction_(_manual_compaction) { MarkFilesBeingCompacted(true); #ifndef NDEBUG @@ -187,8 +186,11 @@ void Compaction::AddInputDeletions(VersionEdit* out_edit) { } } -bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) { +bool Compaction::KeyNotExistsBeyondOutputLevel( + const Slice& user_key, std::vector<size_t>* level_ptrs) const { assert(input_version_ != nullptr); + assert(level_ptrs != nullptr); + assert(level_ptrs->size() == static_cast<size_t>(number_levels_)); assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO); if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) { return bottommost_level_; } @@ -198,8 +200,8 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) { for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) { const std::vector<FileMetaData*>& files = input_version_->storage_info()->LevelFiles(lvl); - for (; level_ptrs_[lvl] < files.size(); ) { - FileMetaData* f = files[level_ptrs_[lvl]]; + for (; level_ptrs->at(lvl) < files.size(); level_ptrs->at(lvl)++) { + auto* f = files[level_ptrs->at(lvl)]; if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) { // We've advanced far enough if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) { @@ -209,7 +211,6 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) { } break; } - level_ptrs_[lvl]++; } } return true; diff --git a/db/compaction.h b/db/compaction.h index c2069a707..1fed0a31d 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -128,7 +128,8 @@ class Compaction { // Returns true if the available information we have guarantees that // the input "user_key" does not exist in any level beyond "output_level()".
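As a sketch of the new calling convention introduced by the signature change below (hypothetical caller code; in the patch itself the caller is ProcessKeyValueCompaction in compaction_job.cc):

    // Each subcompaction keeps a private cursor per level, sized to
    // number_levels() and zero-initialized, and passes it by pointer on
    // every call so the scan resumes where the previous call stopped.
    std::vector<size_t> level_ptrs(compaction->number_levels(), 0);
    if (ikey.type == kTypeDeletion &&
        compaction->KeyNotExistsBeyondOutputLevel(ikey.user_key,
                                                  &level_ptrs)) {
      // Safe to drop this deletion marker: the key cannot exist in any
      // level beyond the output level.
    }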
- bool KeyNotExistsBeyondOutputLevel(const Slice& user_key); + bool KeyNotExistsBeyondOutputLevel(const Slice& user_key, + std::vector<size_t>* level_ptrs) const; // Returns true iff we should stop building the current output // before processing "internal_key". @@ -168,6 +169,9 @@ class Compaction { // are non-overlapping and can be trivially moved. bool is_trivial_move() { return is_trivial_move_; } + // How many total levels are there? + int number_levels() const { return number_levels_; } + // Return the MutableCFOptions that should be used throughout the compaction // procedure const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; } @@ -258,16 +262,8 @@ class Compaction { // True if we can do trivial move in Universal multi level // compaction - bool is_trivial_move_; - // "level_ptrs_" holds indices into "input_version_->levels_", where each - // index remembers which file of an associated level we are currently used - // to check KeyNotExistsBeyondOutputLevel() for deletion operation. - // As it is for checking KeyNotExistsBeyondOutputLevel(), it only - // records indices for all levels beyond "output_level_". - std::vector<size_t> level_ptrs_; - // Does input compression match the output compression? bool InputCompressionMatchesOutput() const; }; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index dd68b2147..61bbb5746 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -57,9 +57,18 @@ namespace rocksdb { -struct CompactionJob::CompactionState { +// Maintains state for each sub-compaction +struct CompactionJob::SubCompactionState { Compaction* const compaction; + // The boundaries of the key-range this subcompaction is interested in. No + // two subcompactions may have overlapping key-ranges. + // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded + Slice *start, *end; + + // The return status of this subcompaction + Status status; + // Files produced by compaction struct Output { uint64_t number; @@ -69,15 +78,64 @@ struct CompactionJob::CompactionState { SequenceNumber smallest_seqno, largest_seqno; bool need_compaction; }; - std::vector<Output> outputs; // State kept for output being generated + std::vector<Output> outputs; std::unique_ptr<WritableFileWriter> outfile; std::unique_ptr<TableBuilder> builder; + Output* current_output() { + assert(!outputs.empty()); + return &outputs.back(); + } + // State during the sub-compaction uint64_t total_bytes; + uint64_t num_input_records; + uint64_t num_output_records; + SequenceNumber earliest_snapshot; + SequenceNumber visible_at_tip; + SequenceNumber latest_snapshot; + CompactionJobStats compaction_job_stats; + + // "level_ptrs" holds indices that remember which file of an associated + // level we were last checking during the last call to compaction-> + // KeyNotExistsBeyondOutputLevel().
This allows future calls to the function + // to pick up where it left off, since each subcompaction's key range is + // increasing, so a later call to the function must be looking for a key + // that is in or beyond the last file checked during the previous call + std::vector<size_t> level_ptrs; + explicit SubCompactionState(Compaction* c, Slice* _start, Slice* _end, + SequenceNumber earliest, SequenceNumber visible, + SequenceNumber latest) + : compaction(c), + start(_start), + end(_end), + outfile(nullptr), + builder(nullptr), + total_bytes(0), + num_input_records(0), + num_output_records(0), + earliest_snapshot(earliest), + visible_at_tip(visible), + latest_snapshot(latest) { + assert(compaction != nullptr); + level_ptrs = std::vector<size_t>(compaction->number_levels(), 0); + } +}; + +// Maintains state for the entire compaction +struct CompactionJob::CompactionState { + Compaction* const compaction; - Output* current_output() { return &outputs[outputs.size() - 1]; } + // REQUIRED: subcompaction states are stored in order of increasing + // key-range + std::vector<SubCompactionState> sub_compact_states; + Status status; + + uint64_t total_bytes; + uint64_t num_input_records; + uint64_t num_output_records; explicit CompactionState(Compaction* c) : compaction(c), @@ -85,10 +143,39 @@ struct CompactionJob::CompactionState { num_input_records(0), num_output_records(0) {} - uint64_t num_input_records; - uint64_t num_output_records; + size_t NumOutputFiles() { + size_t total = 0; + for (auto& s : sub_compact_states) { + total += s.outputs.size(); + } + return total; + } + + Slice SmallestUserKey() { + assert(!sub_compact_states.empty() && + sub_compact_states[0].start == nullptr); + return sub_compact_states[0].outputs[0].smallest.user_key(); + } + + Slice LargestUserKey() { + assert(!sub_compact_states.empty() && + sub_compact_states.back().end == nullptr); + return sub_compact_states.back().current_output()->largest.user_key(); + } }; +void CompactionJob::AggregateStatistics() { + for (SubCompactionState& sc : compact_->sub_compact_states) { + compact_->total_bytes += sc.total_bytes; + compact_->num_input_records += sc.num_input_records; + compact_->num_output_records += sc.num_output_records; + + if (compaction_job_stats_) { + compaction_job_stats_->Add(sc.compaction_job_stats); + } + } +} + CompactionJob::CompactionJob( int job_id, Compaction* compaction, const DBOptions& db_options, const EnvOptions& env_options, VersionSet* versions, @@ -179,59 +266,89 @@ void CompactionJob::Prepare() { ThreadStatus::STAGE_COMPACTION_PREPARE); // Generate file_levels_ for compaction berfore making Iterator - ColumnFamilyData* cfd __attribute__((unused)) = - compact_->compaction->column_family_data(); + ColumnFamilyData* cfd = compact_->compaction->column_family_data(); assert(cfd != nullptr); assert(cfd->current()->storage_info()->NumLevelFiles( compact_->compaction->level()) > 0); - assert(compact_->builder == nullptr); - assert(!compact_->outfile); - visible_at_tip_ = 0; - latest_snapshot_ = 0; + // Is this compaction producing files at the bottommost level?
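To illustrate what Prepare() will build here, with hypothetical boundary values:

    // If InitializeSubCompactions picks the boundaries {"k2", "k5"}, the
    // job gets three SubCompactionStates covering, in order:
    //   [nullptr, "k2")    -- start unbounded
    //   ["k2",    "k5")
    //   ["k5",    nullptr) -- end unbounded
    // Each owns its own builder, outfile, counters, snapshot copies, and
    // level_ptrs, so no mutable state is shared across subcompactions.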
+ bottommost_level_ = compact_->compaction->bottommost_level(); + + // Initialize subcompaction states + SequenceNumber earliest_snapshot; + SequenceNumber latest_snapshot = 0; + SequenceNumber visible_at_tip = 0; + if (existing_snapshots_.size() == 0) { // optimize for fast path if there are no snapshots - visible_at_tip_ = versions_->LastSequence(); - earliest_snapshot_ = visible_at_tip_; + visible_at_tip = versions_->LastSequence(); + earliest_snapshot = visible_at_tip; } else { - latest_snapshot_ = existing_snapshots_.back(); + latest_snapshot = existing_snapshots_.back(); // Add the current seqno as the 'latest' virtual // snapshot to the end of this list. existing_snapshots_.push_back(versions_->LastSequence()); - earliest_snapshot_ = existing_snapshots_[0]; + earliest_snapshot = existing_snapshots_[0]; } - // Is this compaction producing files at the bottommost level? - bottommost_level_ = compact_->compaction->bottommost_level(); - - GetSubCompactionBoundaries(); + InitializeSubCompactions(earliest_snapshot, visible_at_tip, latest_snapshot); } // For L0-L1 compaction, iterators work in parallel by processing -// different subsets of the full key range. This function returns -// the Slices that designate the boundaries of these ranges. Now -// these boundaries are defined the key ranges of the files in L1, -// and the first and last entries are always nullptr (unrestricted) -void CompactionJob::GetSubCompactionBoundaries() { - auto* c = compact_->compaction; - auto& slices = sub_compaction_boundaries_; +// different subsets of the full key range. This function sets up +// the local states used by each of these subcompactions during +// their execution +void CompactionJob::InitializeSubCompactions(const SequenceNumber& earliest, + const SequenceNumber& visible, + const SequenceNumber& latest) { + Compaction* c = compact_->compaction; + auto& bounds = sub_compaction_boundaries_; if (c->IsSubCompaction()) { // TODO(aekmekji): take the option num_subcompactions into account // when dividing up the key range between multiple iterators instead // of just assigning each iterator one L1 file's key range + auto* cmp = c->column_family_data()->user_comparator(); for (size_t which = 0; which < c->num_input_levels(); which++) { if (c->level(which) == 1) { - if (c->input_levels(which)->num_files > 1) { - const LevelFilesBrief* flevel = c->input_levels(which); - for (size_t i = 1; i < flevel->num_files; i++) { - slices.emplace_back(flevel->files[i].smallest_key); + const LevelFilesBrief* flevel = c->input_levels(which); + size_t num_files = flevel->num_files; + + if (num_files > 1) { + auto& files = flevel->files; + Slice global_min = ExtractUserKey(files[0].smallest_key); + Slice global_max = ExtractUserKey(files[num_files - 1].largest_key); + + for (size_t i = 1; i < num_files; i++) { + // Make sure the smallest keys in two consecutive L1 files are + // unique before adding the smallest key as a boundary. Also ensure + // that the boundary won't lead to an empty subcompaction (happens + // if the boundary == the smallest or largest key) + Slice s1 = ExtractUserKey(files[i].smallest_key); + Slice s2 = i == num_files - 1 + ?
Slice() + : ExtractUserKey(files[i + 1].smallest_key); + + if ( (i == num_files - 1 && cmp->Compare(s1, global_max) < 0) + || (i < num_files - 1 && cmp->Compare(s1, s2) < 0 && + cmp->Compare(s1, global_min) > 0)) { + bounds.emplace_back(s1); + } } } break; } } } + + // Note: it's necessary for the first iterator sub-range to have + // start == nullptr and for the last to have end == nullptr + for (size_t i = 0; i <= bounds.size(); i++) { + Slice *start = i == 0 ? nullptr : &bounds[i - 1]; + Slice *end = i == bounds.size() ? nullptr : &bounds[i]; + compact_->sub_compact_states.emplace_back(compact_->compaction, start, + end, earliest, visible, latest); + } } Status CompactionJob::Run() { @@ -239,100 +356,48 @@ Status CompactionJob::Run() { ThreadStatus::STAGE_COMPACTION_RUN); TEST_SYNC_POINT("CompactionJob::Run():Start"); log_buffer_->FlushBufferToLog(); - auto* compaction = compact_->compaction; - LogCompaction(compaction->column_family_data(), compaction); + LogCompaction(); - Status status; - Slice *start, *end; - for (size_t i = 0; i < sub_compaction_boundaries_.size() + 1; i++) { - if (i == 0) { - start = nullptr; - } else { - start = &sub_compaction_boundaries_[i - 1]; - } - if (i == sub_compaction_boundaries_.size()) { - end = nullptr; - } else { - end = &sub_compaction_boundaries_[i]; - } + // Run each subcompaction sequentially + const uint64_t start_micros = env_->NowMicros(); + for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) { + ProcessKeyValueCompaction(&compact_->sub_compact_states[i]); + } + compaction_stats_.micros = env_->NowMicros() - start_micros; + MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros); - status = SubCompactionRun(start, end); - if (!status.ok()) { + // Determine if any of the subcompactions failed + Status status; + for (const auto& state : compact_->sub_compact_states) { + if (!state.status.ok()) { + status = state.status; break; } } + // Finish up all book-keeping to unify the subcompaction results + AggregateStatistics(); UpdateCompactionStats(); RecordCompactionIOStats(); LogFlush(db_options_.info_log); TEST_SYNC_POINT("CompactionJob::Run():End"); + compact_->status = status; return status; } -Status CompactionJob::SubCompactionRun(Slice* start, Slice* end) { - PerfLevel prev_perf_level = PerfLevel::kEnableTime; - uint64_t prev_write_nanos = 0; - uint64_t prev_fsync_nanos = 0; - uint64_t prev_range_sync_nanos = 0; - uint64_t prev_prepare_write_nanos = 0; - bool enabled_io_stats = false; - if (measure_io_stats_ && compaction_job_stats_ != nullptr) { - prev_perf_level = GetPerfLevel(); - SetPerfLevel(PerfLevel::kEnableTime); - prev_write_nanos = iostats_context.write_nanos; - prev_fsync_nanos = iostats_context.fsync_nanos; - prev_range_sync_nanos = iostats_context.range_sync_nanos; - prev_prepare_write_nanos = iostats_context.prepare_write_nanos; - enabled_io_stats = true; - } - - auto* compaction = compact_->compaction; - const uint64_t start_micros = env_->NowMicros(); - int64_t imm_micros = 0; // Micros spent doing imm_ compactions - - std::unique_ptr input(versions_->MakeInputIterator(compaction)); - Status status = ProcessKeyValueCompaction(&imm_micros, input.get(), - start, end); - - input.reset(); - - if (output_directory_ && !db_options_.disableDataSync) { - output_directory_->Fsync(); - } - - compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros; - MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros); - - if (enabled_io_stats) { - // Not supporting parallel running yet - 
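Stepping back to the boundary selection in InitializeSubCompactions above, here is a simplified, self-contained model of the idea (hypothetical helper, std::string user keys instead of Slice/LevelFilesBrief; not part of the patch):

    #include <string>
    #include <vector>

    // Candidate boundaries are the smallest user keys of every L1 file
    // after the first, skipping duplicates and values equal to the global
    // min or max so no resulting subcompaction has an empty key range.
    std::vector<std::string> PickBoundaries(
        const std::vector<std::string>& file_smallest,  // sorted, per L1 file
        const std::string& global_min, const std::string& global_max) {
      std::vector<std::string> bounds;
      for (size_t i = 1; i < file_smallest.size(); i++) {
        const std::string& s = file_smallest[i];
        if (s > global_min && s < global_max &&
            (bounds.empty() || bounds.back() != s)) {
          bounds.push_back(s);
        }
      }
      return bounds;  // N boundaries yield N + 1 subcompactions
    }

For example, smallest keys {"a", "c", "c", "f"} with global_min "a" and global_max "g" yield boundaries {"c", "f"} and therefore three subcompactions.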
compaction_job_stats_->file_write_nanos += - iostats_context.write_nanos - prev_write_nanos; - compaction_job_stats_->file_fsync_nanos += - iostats_context.fsync_nanos - prev_fsync_nanos; - compaction_job_stats_->file_range_sync_nanos += - iostats_context.range_sync_nanos - prev_range_sync_nanos; - compaction_job_stats_->file_prepare_write_nanos += - iostats_context.prepare_write_nanos - prev_prepare_write_nanos; - if (prev_perf_level != PerfLevel::kEnableTime) { - SetPerfLevel(prev_perf_level); - } - } - return status; -} - -void CompactionJob::Install(Status* status, - const MutableCFOptions& mutable_cf_options, - InstrumentedMutex* db_mutex) { +Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options, + InstrumentedMutex* db_mutex) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_INSTALL); db_mutex->AssertHeld(); + Status status = compact_->status; ColumnFamilyData* cfd = compact_->compaction->column_family_data(); cfd->internal_stats()->AddCompactionStats( compact_->compaction->output_level(), compaction_stats_); - if (status->ok()) { - *status = InstallCompactionResults(db_mutex, mutable_cf_options); + if (status.ok()) { + status = InstallCompactionResults(mutable_cf_options, db_mutex); } VersionStorageInfo::LevelSummaryStorage tmp; auto vstorage = cfd->current()->storage_info(); @@ -359,7 +424,7 @@ void CompactionJob::Install(Status* status, static_cast(stats.bytes_read_non_output_levels), stats.bytes_written / static_cast(stats.bytes_read_non_output_levels), - status->ToString().c_str(), stats.num_input_records, + status.ToString().c_str(), stats.num_input_records, stats.num_dropped_records); UpdateCompactionJobStats(stats); @@ -368,7 +433,7 @@ void CompactionJob::Install(Status* status, stream << "job" << job_id_ << "event" << "compaction_finished" << "output_level" << compact_->compaction->output_level() - << "num_output_files" << compact_->outputs.size() + << "num_output_files" << compact_->NumOutputFiles() << "total_output_size" << compact_->total_bytes << "num_input_records" << compact_->num_input_records << "num_output_records" << compact_->num_output_records; @@ -389,24 +454,45 @@ void CompactionJob::Install(Status* status, } stream.EndArray(); - CleanupCompaction(*status); + CleanupCompaction(); + return status; } -Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, - Iterator* input, - Slice* start, Slice* end) { +void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) { + assert(sub_compact != nullptr); + std::unique_ptr input_ptr( + versions_->MakeInputIterator(sub_compact->compaction)); + Iterator* input = input_ptr.get(); + AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_PROCESS_KV); + + // I/O measurement variables + PerfLevel prev_perf_level = PerfLevel::kEnableTime; + uint64_t prev_write_nanos = 0; + uint64_t prev_fsync_nanos = 0; + uint64_t prev_range_sync_nanos = 0; + uint64_t prev_prepare_write_nanos = 0; + if (measure_io_stats_) { + prev_perf_level = GetPerfLevel(); + SetPerfLevel(PerfLevel::kEnableTime); + prev_write_nanos = iostats_context.write_nanos; + prev_fsync_nanos = iostats_context.fsync_nanos; + prev_range_sync_nanos = iostats_context.range_sync_nanos; + prev_prepare_write_nanos = iostats_context.prepare_write_nanos; + } + + // Variables used inside the loop Status status; std::string compaction_filter_value; ParsedInternalKey ikey; IterKey current_user_key; bool has_current_user_key = false; IterKey delete_key; - SequenceNumber 
last_sequence_for_key __attribute__((unused)) = - kMaxSequenceNumber; + + SequenceNumber last_sequence_for_key = kMaxSequenceNumber; SequenceNumber visible_in_snapshot = kMaxSequenceNumber; - ColumnFamilyData* cfd = compact_->compaction->column_family_data(); + ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); MergeHelper merge(cfd->user_comparator(), cfd->ioptions()->merge_operator, db_options_.info_log.get(), cfd->ioptions()->min_partial_merge_operands, @@ -415,7 +501,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, std::unique_ptr compaction_filter_from_factory = nullptr; if (compaction_filter == nullptr) { compaction_filter_from_factory = - compact_->compaction->CreateCompactionFilter(); + sub_compact->compaction->CreateCompactionFilter(); compaction_filter = compaction_filter_from_factory.get(); } @@ -429,6 +515,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, StopWatchNano timer(env_, stats_ != nullptr); uint64_t total_filter_time = 0; + Slice* start = sub_compact->start; + Slice* end = sub_compact->end; if (start != nullptr) { IterKey start_iter; start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek); @@ -446,7 +534,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, Slice value = input->value(); // First check that the key is parseable before performing the comparison - // to determine if it's within the range we want + // to determine if it's within the range we want. Parsing may fail if the + // key being passed in is a user key without any internal key component if (!ParseInternalKey(key, &ikey)) { // Do not hide error keys // TODO: error key stays in db forever? Figure out the rationale @@ -455,46 +544,43 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; visible_in_snapshot = kMaxSequenceNumber; + sub_compact->compaction_job_stats.num_corrupt_keys++; - if (compaction_job_stats_ != nullptr) { - compaction_job_stats_->num_corrupt_keys++; - } - - status = WriteKeyValue(key, value, ikey, input->status()); + status = WriteKeyValue(key, value, ikey, input->status(), sub_compact); input->Next(); continue; } - // If an end key is specified, check if the current key is >= than it - // and exit if it is because the iterator is out of the range desired + // If an end key (exclusive) is specified, check if the current key is + // >= than it and exit if it is because the iterator is out of its range if (end != nullptr && cfd->user_comparator()->Compare(ikey.user_key, *end) >= 0) { break; } - compact_->num_input_records++; + sub_compact->num_input_records++; if (++loop_cnt > 1000) { - RecordDroppedKeys( - &key_drop_user, &key_drop_newer_entry, &key_drop_obsolete); + RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, + &key_drop_obsolete, + &sub_compact->compaction_job_stats); RecordCompactionIOStats(); loop_cnt = 0; } - if (compaction_job_stats_ != nullptr) { - compaction_job_stats_->total_input_raw_key_bytes += key.size(); - compaction_job_stats_->total_input_raw_value_bytes += value.size(); - } + sub_compact->compaction_job_stats.total_input_raw_key_bytes += key.size(); + sub_compact->compaction_job_stats.total_input_raw_value_bytes += + value.size(); - if (compact_->compaction->ShouldStopBefore(key) && - compact_->builder != nullptr) { - status = FinishCompactionOutputFile(input->status()); + if (sub_compact->compaction->ShouldStopBefore(key) && + sub_compact->builder != nullptr) 
{ + status = FinishCompactionOutputFile(input->status(), sub_compact); if (!status.ok()) { break; } } - if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) { - compaction_job_stats_->num_input_deletion_records++; + if (ikey.type == kTypeDeletion) { + sub_compact->compaction_job_stats.num_input_deletion_records++; } if (!has_current_user_key || @@ -507,19 +593,19 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, visible_in_snapshot = kMaxSequenceNumber; // apply the compaction filter to the first occurrence of the user key if (compaction_filter && ikey.type == kTypeValue && - (visible_at_tip_ || ikey.sequence > latest_snapshot_)) { + (sub_compact->visible_at_tip || + ikey.sequence > sub_compact->latest_snapshot)) { // If the user has specified a compaction filter and the sequence // number is greater than any external snapshot, then invoke the - // filter. - // If the return value of the compaction filter is true, replace - // the entry with a delete marker. + // filter. If the return value of the compaction filter is true, + // replace the entry with a deletion marker. bool value_changed = false; compaction_filter_value.clear(); if (stats_ != nullptr) { timer.Start(); } bool to_delete = compaction_filter->Filter( - compact_->compaction->level(), ikey.user_key, value, + sub_compact->compaction->level(), ikey.user_key, value, &compaction_filter_value, &value_changed); total_filter_time += timer.ElapsedNanos(); if (to_delete) { @@ -544,10 +630,9 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, // the earlist snapshot that is affected by this kv. SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot SequenceNumber visible = - visible_at_tip_ - ? visible_at_tip_ - : findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_, - &prev_snapshot); + sub_compact->visible_at_tip + ? sub_compact->visible_at_tip + : findEarliestVisibleSnapshot(ikey.sequence, &prev_snapshot); if (visible_in_snapshot == visible) { // If the earliest snapshot is which this key is visible in @@ -559,9 +644,9 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, ++key_drop_newer_entry; input->Next(); // (A) } else if (ikey.type == kTypeDeletion && - ikey.sequence <= earliest_snapshot_ && - compact_->compaction->KeyNotExistsBeyondOutputLevel( - ikey.user_key)) { + ikey.sequence <= sub_compact->earliest_snapshot && + sub_compact->compaction->KeyNotExistsBeyondOutputLevel( + ikey.user_key, &sub_compact->level_ptrs)) { // For this user key: // (1) there is no data in higher levels // (2) data in lower levels will have larger sequence numbers @@ -604,10 +689,10 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, // MergeUntil stops when it encounters a corrupt key and does not // include them in the result, so we expect the keys here to valid. 
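As an aside on the snapshot bookkeeping earlier in this loop, a worked example with hypothetical sequence numbers:

    // With existing_snapshots_ = {5, 12, 20} (ascending), plus the virtual
    // snapshot appended at the current last sequence number:
    //   key@3  -> earliest visible snapshot 5,  prev_snapshot 0
    //   key@9  -> earliest visible snapshot 12, prev_snapshot 5
    //   key@15 -> earliest visible snapshot 20, prev_snapshot 12
    // Two versions of the same user key collapse only when they map to the
    // same earliest visible snapshot; the older version is then dropped as
    // hidden by a newer entry (counted in key_drop_newer_entry above).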
assert(valid_key); - status = WriteKeyValue(key, value, ikey, input->status()); + status = WriteKeyValue(key, value, ikey, input->status(), sub_compact); } } else { - status = WriteKeyValue(key, value, ikey, input->status()); + status = WriteKeyValue(key, value, ikey, input->status(), sub_compact); input->Next(); } @@ -616,7 +701,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, } RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time); - RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete); + RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete, + &sub_compact->compaction_job_stats); RecordCompactionIOStats(); if (status.ok() && @@ -624,18 +710,39 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, status = Status::ShutdownInProgress( "Database shutdown or Column family drop during compaction"); } - if (status.ok() && compact_->builder != nullptr) { - status = FinishCompactionOutputFile(input->status()); + if (status.ok() && sub_compact->builder != nullptr) { + status = FinishCompactionOutputFile(input->status(), sub_compact); } if (status.ok()) { status = input->status(); } + if (output_directory_ && !db_options_.disableDataSync) { + // TODO(aekmekji): Maybe only call once after all subcompactions complete? + output_directory_->Fsync(); + } - return status; + if (measure_io_stats_) { + sub_compact->compaction_job_stats.file_write_nanos += + iostats_context.write_nanos - prev_write_nanos; + sub_compact->compaction_job_stats.file_fsync_nanos += + iostats_context.fsync_nanos - prev_fsync_nanos; + sub_compact->compaction_job_stats.file_range_sync_nanos += + iostats_context.range_sync_nanos - prev_range_sync_nanos; + sub_compact->compaction_job_stats.file_prepare_write_nanos += + iostats_context.prepare_write_nanos - prev_prepare_write_nanos; + if (prev_perf_level != PerfLevel::kEnableTime) { + SetPerfLevel(prev_perf_level); + } + } + + input_ptr.reset(); + sub_compact->status = status; } Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value, - const ParsedInternalKey& ikey, const Status& input_status) { + const ParsedInternalKey& ikey, const Status& input_status, + SubCompactionState* sub_compact) { + Slice newkey(key.data(), key.size()); std::string kstr; @@ -643,7 +750,7 @@ Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value, // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. 
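Concretely, with hypothetical key and snapshot values, the squash rule described in the comment above behaves like this:

    // bottommost_level_ == true, sub_compact->earliest_snapshot == 100
    //   in : user_key="foo", seq=42, type=kTypeValue
    //   out: user_key="foo", seq=0,  type=kTypeValue
    // seq can be zeroed because no snapshot will ever need to distinguish
    // this version from an older one at the bottommost level. kTypeMerge is
    // excluded since merge operands still rely on their sequence numbers,
    // and deletions cannot appear here (asserted just below).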
- if (bottommost_level_ && ikey.sequence < earliest_snapshot_ && + if (bottommost_level_ && ikey.sequence < sub_compact->earliest_snapshot && ikey.type != kTypeMerge) { assert(ikey.type != kTypeDeletion); // make a copy because updating in place would cause problems @@ -654,33 +761,33 @@ Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value, } // Open output file if necessary - if (compact_->builder == nullptr) { - Status status = OpenCompactionOutputFile(); + if (sub_compact->builder == nullptr) { + Status status = OpenCompactionOutputFile(sub_compact); if (!status.ok()) { return status; } } - assert(compact_->builder != nullptr); + assert(sub_compact->builder != nullptr); SequenceNumber seqno = GetInternalKeySeqno(newkey); - if (compact_->builder->NumEntries() == 0) { - compact_->current_output()->smallest.DecodeFrom(newkey); - compact_->current_output()->smallest_seqno = seqno; + if (sub_compact->builder->NumEntries() == 0) { + sub_compact->current_output()->smallest.DecodeFrom(newkey); + sub_compact->current_output()->smallest_seqno = seqno; } else { - compact_->current_output()->smallest_seqno = - std::min(compact_->current_output()->smallest_seqno, seqno); + sub_compact->current_output()->smallest_seqno = + std::min(sub_compact->current_output()->smallest_seqno, seqno); } - compact_->current_output()->largest.DecodeFrom(newkey); - compact_->builder->Add(newkey, value); - compact_->num_output_records++; - compact_->current_output()->largest_seqno = - std::max(compact_->current_output()->largest_seqno, seqno); + sub_compact->current_output()->largest.DecodeFrom(newkey); + sub_compact->builder->Add(newkey, value); + sub_compact->num_output_records++; + sub_compact->current_output()->largest_seqno = + std::max(sub_compact->current_output()->largest_seqno, seqno); // Close output file if it is big enough Status status; - if (compact_->builder->FileSize() >= - compact_->compaction->max_output_file_size()) { - status = FinishCompactionOutputFile(input_status); + if (sub_compact->builder->FileSize() >= + sub_compact->compaction->max_output_file_size()) { + status = FinishCompactionOutputFile(input_status, sub_compact); } return status; @@ -689,67 +796,68 @@ Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value, void CompactionJob::RecordDroppedKeys( int64_t* key_drop_user, int64_t* key_drop_newer_entry, - int64_t* key_drop_obsolete) { + int64_t* key_drop_obsolete, + CompactionJobStats* compaction_job_stats) { if (*key_drop_user > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_USER, *key_drop_user); *key_drop_user = 0; } if (*key_drop_newer_entry > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, *key_drop_newer_entry); - if (compaction_job_stats_) { - compaction_job_stats_->num_records_replaced += *key_drop_newer_entry; + if (compaction_job_stats) { + compaction_job_stats->num_records_replaced += *key_drop_newer_entry; } *key_drop_newer_entry = 0; } if (*key_drop_obsolete > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, *key_drop_obsolete); - if (compaction_job_stats_) { - compaction_job_stats_->num_expired_deletion_records - += *key_drop_obsolete; + if (compaction_job_stats) { + compaction_job_stats->num_expired_deletion_records += *key_drop_obsolete; } *key_drop_obsolete = 0; } } -Status CompactionJob::FinishCompactionOutputFile(const Status& input_status) { +Status CompactionJob::FinishCompactionOutputFile(const Status& input_status, + SubCompactionState* sub_compact) { AutoThreadOperationStageUpdater stage_updater( 
ThreadStatus::STAGE_COMPACTION_SYNC_FILE); - assert(compact_ != nullptr); - assert(compact_->outfile); - assert(compact_->builder != nullptr); + assert(sub_compact != nullptr); + assert(sub_compact->outfile); + assert(sub_compact->builder != nullptr); - const uint64_t output_number = compact_->current_output()->number; - const uint32_t output_path_id = compact_->current_output()->path_id; + const uint64_t output_number = sub_compact->current_output()->number; + const uint32_t output_path_id = sub_compact->current_output()->path_id; assert(output_number != 0); TableProperties table_properties; // Check for iterator errors Status s = input_status; - const uint64_t current_entries = compact_->builder->NumEntries(); - compact_->current_output()->need_compaction = - compact_->builder->NeedCompact(); + const uint64_t current_entries = sub_compact->builder->NumEntries(); + sub_compact->current_output()->need_compaction = + sub_compact->builder->NeedCompact(); if (s.ok()) { - s = compact_->builder->Finish(); + s = sub_compact->builder->Finish(); } else { - compact_->builder->Abandon(); + sub_compact->builder->Abandon(); } - const uint64_t current_bytes = compact_->builder->FileSize(); - compact_->current_output()->file_size = current_bytes; - compact_->total_bytes += current_bytes; + const uint64_t current_bytes = sub_compact->builder->FileSize(); + sub_compact->current_output()->file_size = current_bytes; + sub_compact->total_bytes += current_bytes; // Finish and check for file errors if (s.ok() && !db_options_.disableDataSync) { StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS); - s = compact_->outfile->Sync(db_options_.use_fsync); + s = sub_compact->outfile->Sync(db_options_.use_fsync); } if (s.ok()) { - s = compact_->outfile->Close(); + s = sub_compact->outfile->Close(); } - compact_->outfile.reset(); + sub_compact->outfile.reset(); if (s.ok() && current_entries > 0) { // Verify that the table is usable - ColumnFamilyData* cfd = compact_->compaction->column_family_data(); + ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); FileDescriptor fd(output_number, output_path_id, current_bytes); Iterator* iter = cfd->table_cache()->NewIterator( ReadOptions(), env_options_, cfd->internal_comparator(), fd, nullptr, @@ -765,7 +873,7 @@ Status CompactionJob::FinishCompactionOutputFile(const Status& input_status) { delete iter; if (s.ok()) { - TableFileCreationInfo info(compact_->builder->GetTableProperties()); + TableFileCreationInfo info(sub_compact->builder->GetTableProperties()); info.db_name = dbname_; info.cf_name = cfd->GetName(); info.file_path = TableFileName(cfd->ioptions()->db_paths, @@ -777,18 +885,18 @@ Status CompactionJob::FinishCompactionOutputFile(const Status& input_status) { " keys, %" PRIu64 " bytes%s", cfd->GetName().c_str(), job_id_, output_number, current_entries, current_bytes, - compact_->current_output()->need_compaction ? " (need compaction)" + sub_compact->current_output()->need_compaction ? 
" (need compaction)" : ""); EventHelpers::LogAndNotifyTableFileCreation( event_logger_, cfd->ioptions()->listeners, fd, info); } } - compact_->builder.reset(); + sub_compact->builder.reset(); return s; } Status CompactionJob::InstallCompactionResults( - InstrumentedMutex* db_mutex, const MutableCFOptions& mutable_cf_options) { + const MutableCFOptions& mutable_cf_options, InstrumentedMutex* db_mutex) { db_mutex->AssertHeld(); auto* compaction = compact_->compaction; @@ -816,12 +924,15 @@ Status CompactionJob::InstallCompactionResults( // Add compaction outputs compaction->AddInputDeletions(compact_->compaction->edit()); - for (size_t i = 0; i < compact_->outputs.size(); i++) { - const CompactionState::Output& out = compact_->outputs[i]; - compaction->edit()->AddFile(compaction->output_level(), out.number, - out.path_id, out.file_size, out.smallest, - out.largest, out.smallest_seqno, - out.largest_seqno, out.need_compaction); + + for (SubCompactionState& sub_compact : compact_->sub_compact_states) { + for (size_t i = 0; i < sub_compact.outputs.size(); i++) { + const SubCompactionState::Output& out = sub_compact.outputs[i]; + compaction->edit()->AddFile(compaction->output_level(), out.number, + out.path_id, out.file_size, out.smallest, + out.largest, out.smallest_seqno, + out.largest_seqno, out.need_compaction); + } } return versions_->LogAndApply(compaction->column_family_data(), mutable_cf_options, compaction->edit(), @@ -835,11 +946,10 @@ Status CompactionJob::InstallCompactionResults( // Employ a sequential search because the total number of // snapshots are typically small. inline SequenceNumber CompactionJob::findEarliestVisibleSnapshot( - SequenceNumber in, const std::vector& snapshots, - SequenceNumber* prev_snapshot) { - assert(snapshots.size()); + SequenceNumber in, SequenceNumber* prev_snapshot) { + assert(existing_snapshots_.size()); SequenceNumber prev __attribute__((unused)) = 0; - for (const auto cur : snapshots) { + for (const auto cur : existing_snapshots_) { assert(prev <= cur); if (cur >= in) { *prev_snapshot = prev; @@ -852,7 +962,7 @@ inline SequenceNumber CompactionJob::findEarliestVisibleSnapshot( "CompactionJob is not able to find snapshot" " with SeqId later than %" PRIu64 ": current MaxSeqId is %" PRIu64 "", - in, snapshots[snapshots.size() - 1]); + in, existing_snapshots_[existing_snapshots_.size() - 1]); assert(0); return 0; } @@ -868,40 +978,41 @@ void CompactionJob::RecordCompactionIOStats() { IOSTATS_RESET(bytes_written); } -Status CompactionJob::OpenCompactionOutputFile() { - assert(compact_ != nullptr); - assert(compact_->builder == nullptr); +Status CompactionJob::OpenCompactionOutputFile(SubCompactionState* + sub_compact) { + assert(sub_compact != nullptr); + assert(sub_compact->builder == nullptr); // no need to lock because VersionSet::next_file_number_ is atomic uint64_t file_number = versions_->NewFileNumber(); // Make the output file unique_ptr writable_file; std::string fname = TableFileName(db_options_.db_paths, file_number, - compact_->compaction->output_path_id()); + sub_compact->compaction->output_path_id()); Status s = env_->NewWritableFile(fname, &writable_file, env_options_); if (!s.ok()) { Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64 " fails at NewWritableFile with status %s", - compact_->compaction->column_family_data()->GetName().c_str(), job_id_, - file_number, s.ToString().c_str()); + sub_compact->compaction->column_family_data()->GetName().c_str(), + job_id_, 
file_number, s.ToString().c_str()); LogFlush(db_options_.info_log); return s; } - CompactionState::Output out; + SubCompactionState::Output out; out.number = file_number; - out.path_id = compact_->compaction->output_path_id(); + out.path_id = sub_compact->compaction->output_path_id(); out.smallest.Clear(); out.largest.Clear(); out.smallest_seqno = out.largest_seqno = 0; - compact_->outputs.push_back(out); + sub_compact->outputs.push_back(out); writable_file->SetIOPriority(Env::IO_LOW); - writable_file->SetPreallocationBlockSize( - static_cast(compact_->compaction->OutputFilePreallocationSize())); - compact_->outfile.reset( + writable_file->SetPreallocationBlockSize(static_cast( + sub_compact->compaction->OutputFilePreallocationSize())); + sub_compact->outfile.reset( new WritableFileWriter(std::move(writable_file), env_options_)); - ColumnFamilyData* cfd = compact_->compaction->column_family_data(); + ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); bool skip_filters = false; // If the Column family flag is to only optimize filters for hits, @@ -912,30 +1023,34 @@ Status CompactionJob::OpenCompactionOutputFile() { skip_filters = true; } - compact_->builder.reset(NewTableBuilder( + sub_compact->builder.reset(NewTableBuilder( *cfd->ioptions(), cfd->internal_comparator(), - cfd->int_tbl_prop_collector_factories(), compact_->outfile.get(), - compact_->compaction->output_compression(), + cfd->int_tbl_prop_collector_factories(), sub_compact->outfile.get(), + sub_compact->compaction->output_compression(), cfd->ioptions()->compression_opts, skip_filters)); LogFlush(db_options_.info_log); return s; } -void CompactionJob::CleanupCompaction(const Status& status) { - if (compact_->builder != nullptr) { - // May happen if we get a shutdown call in the middle of compaction - compact_->builder->Abandon(); - compact_->builder.reset(); - } else { - assert(!status.ok() || compact_->outfile == nullptr); - } - for (size_t i = 0; i < compact_->outputs.size(); i++) { - const CompactionState::Output& out = compact_->outputs[i]; +void CompactionJob::CleanupCompaction() { + for (SubCompactionState& sub_compact : compact_->sub_compact_states) { + const auto& sub_status = sub_compact.status; - // If this file was inserted into the table cache then remove - // them here because this compaction was not committed. - if (!status.ok()) { - TableCache::Evict(table_cache_.get(), out.number); + if (sub_compact.builder != nullptr) { + // May happen if we get a shutdown call in the middle of compaction + sub_compact.builder->Abandon(); + sub_compact.builder.reset(); + } else { + assert(!sub_status.ok() || sub_compact.outfile == nullptr); + } + for (size_t i = 0; i < sub_compact.outputs.size(); i++) { + const SubCompactionState::Output& out = sub_compact.outputs[i]; + + // If this file was inserted into the table cache then remove + // them here because this compaction was not committed. + if (!sub_status.ok()) { + TableCache::Evict(table_cache_.get(), out.number); + } } } delete compact_; @@ -955,14 +1070,6 @@ void CopyPrefix( #endif // !ROCKSDB_LITE void CompactionJob::UpdateCompactionStats() { - size_t num_output_files = compact_->outputs.size(); - if (compact_->builder != nullptr) { - // An error occurred so ignore the last output. 
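For the per-subcompaction accounting that replaces this block (see the rewritten loop below), a worked example with hypothetical numbers:

    // Three subcompactions report:
    //   sub 0: 4 output files, 1000 input records, 800 output records
    //   sub 1: 2 output files,  500 input records, 500 output records
    //   sub 2: 3 output files,  700 input records, 650 output records
    // UpdateCompactionStats then records num_output_files = 9 and
    // num_dropped_records = (1000-800) + 0 + (700-650) = 250, the same
    // totals a single undivided compaction over the full range would report.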
- assert(num_output_files > 0); - --num_output_files; - } - compaction_stats_.num_output_files = static_cast(num_output_files); - Compaction* compaction = compact_->compaction; compaction_stats_.num_input_files_in_non_output_levels = 0; compaction_stats_.num_input_files_in_output_level = 0; @@ -983,12 +1090,22 @@ void CompactionJob::UpdateCompactionStats() { } } - for (size_t i = 0; i < num_output_files; i++) { - compaction_stats_.bytes_written += compact_->outputs[i].file_size; - } - if (compact_->num_input_records > compact_->num_output_records) { - compaction_stats_.num_dropped_records += - compact_->num_input_records - compact_->num_output_records; + for (const auto& sub_compact : compact_->sub_compact_states) { + size_t num_output_files = sub_compact.outputs.size(); + if (sub_compact.builder != nullptr) { + // An error occurred so ignore the last output. + assert(num_output_files > 0); + --num_output_files; + } + compaction_stats_.num_output_files += static_cast(num_output_files); + + for (size_t i = 0; i < num_output_files; i++) { + compaction_stats_.bytes_written += sub_compact.outputs[i].file_size; + } + if (sub_compact.num_input_records > sub_compact.num_output_records) { + compaction_stats_.num_dropped_records += + sub_compact.num_input_records - sub_compact.num_output_records; + } } } @@ -1030,13 +1147,13 @@ void CompactionJob::UpdateCompactionJobStats( compact_->num_output_records; compaction_job_stats_->num_output_files = stats.num_output_files; - if (compact_->outputs.size() > 0U) { + if (compact_->NumOutputFiles() > 0U) { CopyPrefix( - compact_->outputs[0].smallest.user_key(), + compact_->SmallestUserKey(), CompactionJobStats::kMaxPrefixLength, &compaction_job_stats_->smallest_output_key_prefix); CopyPrefix( - compact_->current_output()->largest.user_key(), + compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength, &compaction_job_stats_->largest_output_key_prefix); } @@ -1044,8 +1161,10 @@ void CompactionJob::UpdateCompactionJobStats( #endif // !ROCKSDB_LITE } -void CompactionJob::LogCompaction( - ColumnFamilyData* cfd, Compaction* compaction) { +void CompactionJob::LogCompaction() { + Compaction* compaction = compact_->compaction; + ColumnFamilyData* cfd = compaction->column_family_data(); + // Let's check if anything will get logged. Don't prepare all the info if // we're not logging if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) { diff --git a/db/compaction_job.h b/db/compaction_job.h index ea546299c..eadd89fea 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -75,48 +75,51 @@ class CompactionJob { Status Run(); // REQUIRED: mutex held - // status is the return of Run() - void Install(Status* status, const MutableCFOptions& mutable_cf_options, - InstrumentedMutex* db_mutex); + Status Install(const MutableCFOptions& mutable_cf_options, + InstrumentedMutex* db_mutex); private: - // REQUIRED: mutex not held - Status SubCompactionRun(Slice* start, Slice* end); + struct SubCompactionState; + + void AggregateStatistics(); + // Set up the individual states used by each subcompaction + void InitializeSubCompactions(const SequenceNumber& earliest, + const SequenceNumber& visible, + const SequenceNumber& latest); - void GetSubCompactionBoundaries(); // update the thread status for starting a compaction. void ReportStartedCompaction(Compaction* compaction); void AllocateCompactionOutputFileNumbers(); // Call compaction filter. 
Then iterate through input and compact the // kv-pairs - Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input, - Slice* start = nullptr, - Slice* end = nullptr); + void ProcessKeyValueCompaction(SubCompactionState* sub_compact); Status WriteKeyValue(const Slice& key, const Slice& value, const ParsedInternalKey& ikey, - const Status& input_status); - - Status FinishCompactionOutputFile(const Status& input_status); - Status InstallCompactionResults(InstrumentedMutex* db_mutex, - const MutableCFOptions& mutable_cf_options); - SequenceNumber findEarliestVisibleSnapshot( - SequenceNumber in, const std::vector& snapshots, - SequenceNumber* prev_snapshot); + const Status& input_status, + SubCompactionState* sub_compact); + + Status FinishCompactionOutputFile(const Status& input_status, + SubCompactionState* sub_compact); + Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options, + InstrumentedMutex* db_mutex); + SequenceNumber findEarliestVisibleSnapshot(SequenceNumber in, + SequenceNumber* prev_snapshot); void RecordCompactionIOStats(); - Status OpenCompactionOutputFile(); - void CleanupCompaction(const Status& status); + Status OpenCompactionOutputFile(SubCompactionState* sub_compact); + void CleanupCompaction(); void UpdateCompactionJobStats( const InternalStats::CompactionStats& stats) const; void RecordDroppedKeys(int64_t* key_drop_user, int64_t* key_drop_newer_entry, - int64_t* key_drop_obsolete); + int64_t* key_drop_obsolete, + CompactionJobStats* compaction_job_stats = nullptr); void UpdateCompactionStats(); void UpdateCompactionInputStatsHelper( int* num_files, uint64_t* bytes_read, int input_level); - void LogCompaction(ColumnFamilyData* cfd, Compaction* compaction); + void LogCompaction(); int job_id_; @@ -126,9 +129,6 @@ class CompactionJob { CompactionJobStats* compaction_job_stats_; bool bottommost_level_; - SequenceNumber earliest_snapshot_; - SequenceNumber visible_at_tip_; - SequenceNumber latest_snapshot_; InternalStats::CompactionStats compaction_stats_; diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index a7140b595..a3360e822 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -253,8 +253,8 @@ class CompactionJobTest : public testing::Test { s = compaction_job.Run(); ASSERT_OK(s); mutex_.Lock(); - compaction_job.Install(&s, *cfd->GetLatestMutableCFOptions(), &mutex_); - ASSERT_OK(s); + ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions(), + &mutex_)); mutex_.Unlock(); ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U); diff --git a/db/db_impl.cc b/db/db_impl.cc index 1c3883eae..4b6585ff2 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1687,10 +1687,10 @@ Status DBImpl::CompactFilesImpl( compaction_job.Prepare(); mutex_.Unlock(); - Status status = compaction_job.Run(); + compaction_job.Run(); mutex_.Lock(); - compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); + Status status = compaction_job.Install(*c->mutable_cf_options(), &mutex_); if (status.ok()) { InstallSuperVersionAndScheduleWorkWrapper( c->column_family_data(), job_context, *c->mutable_cf_options()); @@ -2689,11 +2689,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, compaction_job.Prepare(); mutex_.Unlock(); - status = compaction_job.Run(); + compaction_job.Run(); TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun"); mutex_.Lock(); - compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); + status = 
compaction_job.Install(*c->mutable_cf_options(), &mutex_); if (status.ok()) { InstallSuperVersionAndScheduleWorkWrapper( c->column_family_data(), job_context, *c->mutable_cf_options()); diff --git a/include/rocksdb/compaction_job_stats.h b/include/rocksdb/compaction_job_stats.h index b62a88ca3..533190015 100644 --- a/include/rocksdb/compaction_job_stats.h +++ b/include/rocksdb/compaction_job_stats.h @@ -12,6 +12,8 @@ namespace rocksdb { struct CompactionJobStats { CompactionJobStats() { Reset(); } void Reset(); + // Aggregate the CompactionJobStats from another instance with this one + void Add(const CompactionJobStats& stats); // the elapsed time in micro of this compaction. uint64_t elapsed_micros; diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 084cb5e79..a013981ab 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -226,6 +226,12 @@ DEFINE_int32(set_in_place_one_in, 0, DEFINE_int64(cache_size, 2 * KB * KB * KB, "Number of bytes to use as a cache of uncompressed data."); +DEFINE_uint64(subcompactions, 1, + "Maximum number of subcompactions to divide L0-L1 compactions " + "into."); +static const bool FLAGS_subcompactions_dummy __attribute__((unused)) = + RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range); + static bool ValidateInt32Positive(const char* flagname, int32_t value) { if (value < 0) { fprintf(stderr, "Invalid value for --%s: %d, must be >=0\n", @@ -1877,6 +1883,7 @@ class StressTest { options_.max_manifest_file_size = 10 * 1024; options_.filter_deletes = FLAGS_filter_deletes; options_.inplace_update_support = FLAGS_in_place_update; + options_.num_subcompactions = static_cast(FLAGS_subcompactions); if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kHashSkipList)) { fprintf(stderr, "prefix_size should be non-zero iff memtablerep == prefix_hash\n"); diff --git a/util/compaction_job_stats_impl.cc b/util/compaction_job_stats_impl.cc index 4c3b94694..f64db3dda 100644 --- a/util/compaction_job_stats_impl.cc +++ b/util/compaction_job_stats_impl.cc @@ -40,6 +40,35 @@ void CompactionJobStats::Reset() { file_prepare_write_nanos = 0; } +void CompactionJobStats::Add(const CompactionJobStats& stats) { + elapsed_micros += stats.elapsed_micros; + + num_input_records += stats.num_input_records; + num_input_files += stats.num_input_files; + num_input_files_at_output_level += stats.num_input_files_at_output_level; + + num_output_records += stats.num_output_records; + num_output_files += stats.num_output_files; + + total_input_bytes += stats.total_input_bytes; + total_output_bytes += stats.total_output_bytes; + + num_records_replaced += stats.num_records_replaced; + + total_input_raw_key_bytes += stats.total_input_raw_key_bytes; + total_input_raw_value_bytes += stats.total_input_raw_value_bytes; + + num_input_deletion_records += stats.num_input_deletion_records; + num_expired_deletion_records += stats.num_expired_deletion_records; + + num_corrupt_keys += stats.num_corrupt_keys; + + file_write_nanos += stats.file_write_nanos; + file_range_sync_nanos += stats.file_range_sync_nanos; + file_fsync_nanos += stats.file_fsync_nanos; + file_prepare_write_nanos += stats.file_prepare_write_nanos; +} + #else void CompactionJobStats::Reset() {}
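Looking ahead to the parallelism this patch prepares for, the sequential loop in Run() could later dispatch one thread per subcompaction. A minimal sketch of that future step (hypothetical; not part of this patch):

    #include <thread>
    #include <vector>

    // Each worker touches only its own SubCompactionState, which is exactly
    // why this patch removed all shared per-compaction mutable state; the
    // job-wide totals are still folded in afterwards by AggregateStatistics().
    std::vector<std::thread> workers;
    for (auto& sub : compact_->sub_compact_states) {
      workers.emplace_back([this, &sub] { ProcessKeyValueCompaction(&sub); });
    }
    for (auto& t : workers) {
      t.join();
    }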