diff --git a/db/compaction.cc b/db/compaction.cc index 0aa17c09c..882f05d1a 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -101,8 +101,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage, score_(_score), bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)), is_full_compaction_(IsFullCompaction(vstorage, inputs_)), - is_manual_compaction_(_manual_compaction), - level_ptrs_(std::vector(number_levels_, 0)) { + is_manual_compaction_(_manual_compaction) { MarkFilesBeingCompacted(true); #ifndef NDEBUG @@ -187,8 +186,11 @@ void Compaction::AddInputDeletions(VersionEdit* out_edit) { } } -bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) { +bool Compaction::KeyNotExistsBeyondOutputLevel( + const Slice& user_key, std::vector* level_ptrs) const { assert(input_version_ != nullptr); + assert(level_ptrs != nullptr); + assert(level_ptrs->size() == static_cast(number_levels_)); assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO); if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) { return bottommost_level_; @@ -198,8 +200,8 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) { for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) { const std::vector& files = input_version_->storage_info()->LevelFiles(lvl); - for (; level_ptrs_[lvl] < files.size(); ) { - FileMetaData* f = files[level_ptrs_[lvl]]; + for (; level_ptrs->at(lvl) < files.size(); level_ptrs->at(lvl)++) { + auto* f = files[level_ptrs->at(lvl)]; if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) { // We've advanced far enough if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) { @@ -209,7 +211,6 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) { } break; } - level_ptrs_[lvl]++; } } return true; diff --git a/db/compaction.h b/db/compaction.h index c2069a707..1fed0a31d 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -128,7 +128,8 @@ class Compaction { // Returns true if the available information we have guarantees that // the input "user_key" does not exist in any level beyond "output_level()". - bool KeyNotExistsBeyondOutputLevel(const Slice& user_key); + bool KeyNotExistsBeyondOutputLevel(const Slice& user_key, + std::vector* level_ptrs) const; // Returns true iff we should stop building the current output // before processing "internal_key". @@ -168,6 +169,9 @@ class Compaction { // are non-overlapping and can be trivially moved. bool is_trivial_move() { return is_trivial_move_; } + // How many total levels are there? + int number_levels() const { return number_levels_; } + // Return the MutableCFOptions that should be used throughout the compaction // procedure const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; } @@ -258,16 +262,8 @@ class Compaction { // True if we can do trivial move in Universal multi level // compaction - bool is_trivial_move_; - // "level_ptrs_" holds indices into "input_version_->levels_", where each - // index remembers which file of an associated level we are currently used - // to check KeyNotExistsBeyondOutputLevel() for deletion operation. - // As it is for checking KeyNotExistsBeyondOutputLevel(), it only - // records indices for all levels beyond "output_level_". - std::vector level_ptrs_; - // Does input compression match the output compression? bool InputCompressionMatchesOutput() const; }; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index dd68b2147..61bbb5746 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -57,9 +57,18 @@ namespace rocksdb { -struct CompactionJob::CompactionState { +// Maintains state for each sub-compaction +struct CompactionJob::SubCompactionState { Compaction* const compaction; + // The boundaries of the key-range this compaction is interested in. No two + // subcompactions may have overlapping key-ranges. + // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded + Slice *start, *end; + + // The return status of this compaction + Status status; + // Files produced by compaction struct Output { uint64_t number; @@ -69,15 +78,64 @@ struct CompactionJob::CompactionState { SequenceNumber smallest_seqno, largest_seqno; bool need_compaction; }; - std::vector outputs; // State kept for output being generated + std::vector outputs; std::unique_ptr outfile; std::unique_ptr builder; + Output* current_output() { + assert(!outputs.empty()); + return &outputs.back(); + } + // State during the sub-compaction uint64_t total_bytes; + uint64_t num_input_records; + uint64_t num_output_records; + SequenceNumber earliest_snapshot; + SequenceNumber visible_at_tip; + SequenceNumber latest_snapshot; + CompactionJobStats compaction_job_stats; + + // "level_ptrs" holds indices that remember which file of an associated + // level we were last checking during the last call to compaction-> + // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function + // to pick off where it left off since each subcompaction's key range is + // increasing so a later call to the function must be looking for a key that + // is in or beyond the last file checked during the previous call + std::vector level_ptrs; + + explicit SubCompactionState(Compaction* c, Slice* _start, Slice* _end, + SequenceNumber earliest, SequenceNumber visible, + SequenceNumber latest) + : compaction(c), + start(_start), + end(_end), + outfile(nullptr), + builder(nullptr), + total_bytes(0), + num_input_records(0), + num_output_records(0), + earliest_snapshot(earliest), + visible_at_tip(visible), + latest_snapshot(latest) { + assert(compaction != nullptr); + level_ptrs = std::vector(compaction->number_levels(), 0); + } +}; + +// Maintains state for the entire compaction +struct CompactionJob::CompactionState { + Compaction* const compaction; - Output* current_output() { return &outputs[outputs.size() - 1]; } + // REQUIRED: subcompaction states are stored in order of increasing + // key-range + std::vector sub_compact_states; + Status status; + + uint64_t total_bytes; + uint64_t num_input_records; + uint64_t num_output_records; explicit CompactionState(Compaction* c) : compaction(c), @@ -85,10 +143,39 @@ struct CompactionJob::CompactionState { num_input_records(0), num_output_records(0) {} - uint64_t num_input_records; - uint64_t num_output_records; + size_t NumOutputFiles() { + size_t total = 0; + for (auto& s : sub_compact_states) { + total += s.outputs.size(); + } + return total; + } + + Slice SmallestUserKey() { + assert(!sub_compact_states.empty() && + sub_compact_states[0].start == nullptr); + return sub_compact_states[0].outputs[0].smallest.user_key(); + } + + Slice LargestUserKey() { + assert(!sub_compact_states.empty() && + sub_compact_states.back().end == nullptr); + return sub_compact_states.back().current_output()->largest.user_key(); + } }; +void CompactionJob::AggregateStatistics() { + for (SubCompactionState& sc : compact_->sub_compact_states) { + compact_->total_bytes += sc.total_bytes; + compact_->num_input_records += sc.num_input_records; + compact_->num_output_records += sc.num_output_records; + + if (compaction_job_stats_) { + compaction_job_stats_->Add(sc.compaction_job_stats); + } + } +} + CompactionJob::CompactionJob( int job_id, Compaction* compaction, const DBOptions& db_options, const EnvOptions& env_options, VersionSet* versions, @@ -179,59 +266,89 @@ void CompactionJob::Prepare() { ThreadStatus::STAGE_COMPACTION_PREPARE); // Generate file_levels_ for compaction berfore making Iterator - ColumnFamilyData* cfd __attribute__((unused)) = - compact_->compaction->column_family_data(); + ColumnFamilyData* cfd = compact_->compaction->column_family_data(); assert(cfd != nullptr); assert(cfd->current()->storage_info()->NumLevelFiles( compact_->compaction->level()) > 0); - assert(compact_->builder == nullptr); - assert(!compact_->outfile); - visible_at_tip_ = 0; - latest_snapshot_ = 0; + // Is this compaction producing files at the bottommost level? + bottommost_level_ = compact_->compaction->bottommost_level(); + + // Initialize subcompaction states + SequenceNumber earliest_snapshot; + SequenceNumber latest_snapshot = 0; + SequenceNumber visible_at_tip = 0; + if (existing_snapshots_.size() == 0) { // optimize for fast path if there are no snapshots - visible_at_tip_ = versions_->LastSequence(); - earliest_snapshot_ = visible_at_tip_; + visible_at_tip = versions_->LastSequence(); + earliest_snapshot = visible_at_tip; } else { - latest_snapshot_ = existing_snapshots_.back(); + latest_snapshot = existing_snapshots_.back(); // Add the current seqno as the 'latest' virtual // snapshot to the end of this list. existing_snapshots_.push_back(versions_->LastSequence()); - earliest_snapshot_ = existing_snapshots_[0]; + earliest_snapshot = existing_snapshots_[0]; } - // Is this compaction producing files at the bottommost level? - bottommost_level_ = compact_->compaction->bottommost_level(); - - GetSubCompactionBoundaries(); + InitializeSubCompactions(earliest_snapshot, visible_at_tip, latest_snapshot); } // For L0-L1 compaction, iterators work in parallel by processing -// different subsets of the full key range. This function returns -// the Slices that designate the boundaries of these ranges. Now -// these boundaries are defined the key ranges of the files in L1, -// and the first and last entries are always nullptr (unrestricted) -void CompactionJob::GetSubCompactionBoundaries() { - auto* c = compact_->compaction; - auto& slices = sub_compaction_boundaries_; +// different subsets of the full key range. This function sets up +// the local states used by each of these subcompactions during +// their execution +void CompactionJob::InitializeSubCompactions(const SequenceNumber& earliest, + const SequenceNumber& visible, + const SequenceNumber& latest) { + Compaction* c = compact_->compaction; + auto& bounds = sub_compaction_boundaries_; if (c->IsSubCompaction()) { // TODO(aekmekji): take the option num_subcompactions into account // when dividing up the key range between multiple iterators instead // of just assigning each iterator one L1 file's key range + auto* cmp = c->column_family_data()->user_comparator(); for (size_t which = 0; which < c->num_input_levels(); which++) { if (c->level(which) == 1) { - if (c->input_levels(which)->num_files > 1) { - const LevelFilesBrief* flevel = c->input_levels(which); - for (size_t i = 1; i < flevel->num_files; i++) { - slices.emplace_back(flevel->files[i].smallest_key); + const LevelFilesBrief* flevel = c->input_levels(which); + size_t num_files = flevel->num_files; + + if (num_files > 1) { + auto& files = flevel->files; + Slice global_min = ExtractUserKey(files[0].smallest_key); + Slice global_max = ExtractUserKey(files[num_files - 1].largest_key); + + for (size_t i = 1; i < num_files; i++) { + // Make sure the smallest key in two consecutive L1 files are + // unique before adding the smallest key as a boundary. Also ensure + // that the boundary won't lead to an empty subcompaction (happens + // if the boundary == the smallest or largest key) + Slice s1 = ExtractUserKey(files[i].smallest_key); + Slice s2 = i == num_files - 1 + ? Slice() + : ExtractUserKey(files[i + 1].smallest_key); + + if ( (i == num_files - 1 && cmp->Compare(s1, global_max) < 0) + || (i < num_files - 1 && cmp->Compare(s1, s2) < 0 && + cmp->Compare(s1, global_min) > 0)) { + bounds.emplace_back(s1); + } } } break; } } } + + // Note: it's necessary for the first iterator sub-range to have + // start == nullptr and for the last to have end == nullptr + for (size_t i = 0; i <= bounds.size(); i++) { + Slice *start = i == 0 ? nullptr : &bounds[i - 1]; + Slice *end = i == bounds.size() ? nullptr : &bounds[i]; + compact_->sub_compact_states.emplace_back(compact_->compaction, start, + end, earliest, visible, latest); + } } Status CompactionJob::Run() { @@ -239,100 +356,48 @@ Status CompactionJob::Run() { ThreadStatus::STAGE_COMPACTION_RUN); TEST_SYNC_POINT("CompactionJob::Run():Start"); log_buffer_->FlushBufferToLog(); - auto* compaction = compact_->compaction; - LogCompaction(compaction->column_family_data(), compaction); + LogCompaction(); - Status status; - Slice *start, *end; - for (size_t i = 0; i < sub_compaction_boundaries_.size() + 1; i++) { - if (i == 0) { - start = nullptr; - } else { - start = &sub_compaction_boundaries_[i - 1]; - } - if (i == sub_compaction_boundaries_.size()) { - end = nullptr; - } else { - end = &sub_compaction_boundaries_[i]; - } + // Run each subcompaction sequentially + const uint64_t start_micros = env_->NowMicros(); + for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) { + ProcessKeyValueCompaction(&compact_->sub_compact_states[i]); + } + compaction_stats_.micros = env_->NowMicros() - start_micros; + MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros); - status = SubCompactionRun(start, end); - if (!status.ok()) { + // Determine if any of the subcompactions failed + Status status; + for (const auto& state : compact_->sub_compact_states) { + if (!state.status.ok()) { + status = state.status; break; } } + // Finish up all book-keeping to unify the subcompaction results + AggregateStatistics(); UpdateCompactionStats(); RecordCompactionIOStats(); LogFlush(db_options_.info_log); TEST_SYNC_POINT("CompactionJob::Run():End"); + compact_->status = status; return status; } -Status CompactionJob::SubCompactionRun(Slice* start, Slice* end) { - PerfLevel prev_perf_level = PerfLevel::kEnableTime; - uint64_t prev_write_nanos = 0; - uint64_t prev_fsync_nanos = 0; - uint64_t prev_range_sync_nanos = 0; - uint64_t prev_prepare_write_nanos = 0; - bool enabled_io_stats = false; - if (measure_io_stats_ && compaction_job_stats_ != nullptr) { - prev_perf_level = GetPerfLevel(); - SetPerfLevel(PerfLevel::kEnableTime); - prev_write_nanos = iostats_context.write_nanos; - prev_fsync_nanos = iostats_context.fsync_nanos; - prev_range_sync_nanos = iostats_context.range_sync_nanos; - prev_prepare_write_nanos = iostats_context.prepare_write_nanos; - enabled_io_stats = true; - } - - auto* compaction = compact_->compaction; - const uint64_t start_micros = env_->NowMicros(); - int64_t imm_micros = 0; // Micros spent doing imm_ compactions - - std::unique_ptr input(versions_->MakeInputIterator(compaction)); - Status status = ProcessKeyValueCompaction(&imm_micros, input.get(), - start, end); - - input.reset(); - - if (output_directory_ && !db_options_.disableDataSync) { - output_directory_->Fsync(); - } - - compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros; - MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros); - - if (enabled_io_stats) { - // Not supporting parallel running yet - compaction_job_stats_->file_write_nanos += - iostats_context.write_nanos - prev_write_nanos; - compaction_job_stats_->file_fsync_nanos += - iostats_context.fsync_nanos - prev_fsync_nanos; - compaction_job_stats_->file_range_sync_nanos += - iostats_context.range_sync_nanos - prev_range_sync_nanos; - compaction_job_stats_->file_prepare_write_nanos += - iostats_context.prepare_write_nanos - prev_prepare_write_nanos; - if (prev_perf_level != PerfLevel::kEnableTime) { - SetPerfLevel(prev_perf_level); - } - } - return status; -} - -void CompactionJob::Install(Status* status, - const MutableCFOptions& mutable_cf_options, - InstrumentedMutex* db_mutex) { +Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options, + InstrumentedMutex* db_mutex) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_INSTALL); db_mutex->AssertHeld(); + Status status = compact_->status; ColumnFamilyData* cfd = compact_->compaction->column_family_data(); cfd->internal_stats()->AddCompactionStats( compact_->compaction->output_level(), compaction_stats_); - if (status->ok()) { - *status = InstallCompactionResults(db_mutex, mutable_cf_options); + if (status.ok()) { + status = InstallCompactionResults(mutable_cf_options, db_mutex); } VersionStorageInfo::LevelSummaryStorage tmp; auto vstorage = cfd->current()->storage_info(); @@ -359,7 +424,7 @@ void CompactionJob::Install(Status* status, static_cast(stats.bytes_read_non_output_levels), stats.bytes_written / static_cast(stats.bytes_read_non_output_levels), - status->ToString().c_str(), stats.num_input_records, + status.ToString().c_str(), stats.num_input_records, stats.num_dropped_records); UpdateCompactionJobStats(stats); @@ -368,7 +433,7 @@ void CompactionJob::Install(Status* status, stream << "job" << job_id_ << "event" << "compaction_finished" << "output_level" << compact_->compaction->output_level() - << "num_output_files" << compact_->outputs.size() + << "num_output_files" << compact_->NumOutputFiles() << "total_output_size" << compact_->total_bytes << "num_input_records" << compact_->num_input_records << "num_output_records" << compact_->num_output_records; @@ -389,24 +454,45 @@ void CompactionJob::Install(Status* status, } stream.EndArray(); - CleanupCompaction(*status); + CleanupCompaction(); + return status; } -Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, - Iterator* input, - Slice* start, Slice* end) { +void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) { + assert(sub_compact != nullptr); + std::unique_ptr input_ptr( + versions_->MakeInputIterator(sub_compact->compaction)); + Iterator* input = input_ptr.get(); + AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_PROCESS_KV); + + // I/O measurement variables + PerfLevel prev_perf_level = PerfLevel::kEnableTime; + uint64_t prev_write_nanos = 0; + uint64_t prev_fsync_nanos = 0; + uint64_t prev_range_sync_nanos = 0; + uint64_t prev_prepare_write_nanos = 0; + if (measure_io_stats_) { + prev_perf_level = GetPerfLevel(); + SetPerfLevel(PerfLevel::kEnableTime); + prev_write_nanos = iostats_context.write_nanos; + prev_fsync_nanos = iostats_context.fsync_nanos; + prev_range_sync_nanos = iostats_context.range_sync_nanos; + prev_prepare_write_nanos = iostats_context.prepare_write_nanos; + } + + // Variables used inside the loop Status status; std::string compaction_filter_value; ParsedInternalKey ikey; IterKey current_user_key; bool has_current_user_key = false; IterKey delete_key; - SequenceNumber last_sequence_for_key __attribute__((unused)) = - kMaxSequenceNumber; + + SequenceNumber last_sequence_for_key = kMaxSequenceNumber; SequenceNumber visible_in_snapshot = kMaxSequenceNumber; - ColumnFamilyData* cfd = compact_->compaction->column_family_data(); + ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); MergeHelper merge(cfd->user_comparator(), cfd->ioptions()->merge_operator, db_options_.info_log.get(), cfd->ioptions()->min_partial_merge_operands, @@ -415,7 +501,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, std::unique_ptr compaction_filter_from_factory = nullptr; if (compaction_filter == nullptr) { compaction_filter_from_factory = - compact_->compaction->CreateCompactionFilter(); + sub_compact->compaction->CreateCompactionFilter(); compaction_filter = compaction_filter_from_factory.get(); } @@ -429,6 +515,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, StopWatchNano timer(env_, stats_ != nullptr); uint64_t total_filter_time = 0; + Slice* start = sub_compact->start; + Slice* end = sub_compact->end; if (start != nullptr) { IterKey start_iter; start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek); @@ -446,7 +534,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, Slice value = input->value(); // First check that the key is parseable before performing the comparison - // to determine if it's within the range we want + // to determine if it's within the range we want. Parsing may fail if the + // key being passed in is a user key without any internal key component if (!ParseInternalKey(key, &ikey)) { // Do not hide error keys // TODO: error key stays in db forever? Figure out the rationale @@ -455,46 +544,43 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; visible_in_snapshot = kMaxSequenceNumber; + sub_compact->compaction_job_stats.num_corrupt_keys++; - if (compaction_job_stats_ != nullptr) { - compaction_job_stats_->num_corrupt_keys++; - } - - status = WriteKeyValue(key, value, ikey, input->status()); + status = WriteKeyValue(key, value, ikey, input->status(), sub_compact); input->Next(); continue; } - // If an end key is specified, check if the current key is >= than it - // and exit if it is because the iterator is out of the range desired + // If an end key (exclusive) is specified, check if the current key is + // >= than it and exit if it is because the iterator is out of its range if (end != nullptr && cfd->user_comparator()->Compare(ikey.user_key, *end) >= 0) { break; } - compact_->num_input_records++; + sub_compact->num_input_records++; if (++loop_cnt > 1000) { - RecordDroppedKeys( - &key_drop_user, &key_drop_newer_entry, &key_drop_obsolete); + RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, + &key_drop_obsolete, + &sub_compact->compaction_job_stats); RecordCompactionIOStats(); loop_cnt = 0; } - if (compaction_job_stats_ != nullptr) { - compaction_job_stats_->total_input_raw_key_bytes += key.size(); - compaction_job_stats_->total_input_raw_value_bytes += value.size(); - } + sub_compact->compaction_job_stats.total_input_raw_key_bytes += key.size(); + sub_compact->compaction_job_stats.total_input_raw_value_bytes += + value.size(); - if (compact_->compaction->ShouldStopBefore(key) && - compact_->builder != nullptr) { - status = FinishCompactionOutputFile(input->status()); + if (sub_compact->compaction->ShouldStopBefore(key) && + sub_compact->builder != nullptr) { + status = FinishCompactionOutputFile(input->status(), sub_compact); if (!status.ok()) { break; } } - if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) { - compaction_job_stats_->num_input_deletion_records++; + if (ikey.type == kTypeDeletion) { + sub_compact->compaction_job_stats.num_input_deletion_records++; } if (!has_current_user_key || @@ -507,19 +593,19 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, visible_in_snapshot = kMaxSequenceNumber; // apply the compaction filter to the first occurrence of the user key if (compaction_filter && ikey.type == kTypeValue && - (visible_at_tip_ || ikey.sequence > latest_snapshot_)) { + (sub_compact->visible_at_tip || + ikey.sequence > sub_compact->latest_snapshot)) { // If the user has specified a compaction filter and the sequence // number is greater than any external snapshot, then invoke the - // filter. - // If the return value of the compaction filter is true, replace - // the entry with a delete marker. + // filter. If the return value of the compaction filter is true, + // replace the entry with a deletion marker. bool value_changed = false; compaction_filter_value.clear(); if (stats_ != nullptr) { timer.Start(); } bool to_delete = compaction_filter->Filter( - compact_->compaction->level(), ikey.user_key, value, + sub_compact->compaction->level(), ikey.user_key, value, &compaction_filter_value, &value_changed); total_filter_time += timer.ElapsedNanos(); if (to_delete) { @@ -544,10 +630,9 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, // the earlist snapshot that is affected by this kv. SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot SequenceNumber visible = - visible_at_tip_ - ? visible_at_tip_ - : findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_, - &prev_snapshot); + sub_compact->visible_at_tip + ? sub_compact->visible_at_tip + : findEarliestVisibleSnapshot(ikey.sequence, &prev_snapshot); if (visible_in_snapshot == visible) { // If the earliest snapshot is which this key is visible in @@ -559,9 +644,9 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, ++key_drop_newer_entry; input->Next(); // (A) } else if (ikey.type == kTypeDeletion && - ikey.sequence <= earliest_snapshot_ && - compact_->compaction->KeyNotExistsBeyondOutputLevel( - ikey.user_key)) { + ikey.sequence <= sub_compact->earliest_snapshot && + sub_compact->compaction->KeyNotExistsBeyondOutputLevel( + ikey.user_key, &sub_compact->level_ptrs)) { // For this user key: // (1) there is no data in higher levels // (2) data in lower levels will have larger sequence numbers @@ -604,10 +689,10 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, // MergeUntil stops when it encounters a corrupt key and does not // include them in the result, so we expect the keys here to valid. assert(valid_key); - status = WriteKeyValue(key, value, ikey, input->status()); + status = WriteKeyValue(key, value, ikey, input->status(), sub_compact); } } else { - status = WriteKeyValue(key, value, ikey, input->status()); + status = WriteKeyValue(key, value, ikey, input->status(), sub_compact); input->Next(); } @@ -616,7 +701,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, } RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time); - RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete); + RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete, + &sub_compact->compaction_job_stats); RecordCompactionIOStats(); if (status.ok() && @@ -624,18 +710,39 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, status = Status::ShutdownInProgress( "Database shutdown or Column family drop during compaction"); } - if (status.ok() && compact_->builder != nullptr) { - status = FinishCompactionOutputFile(input->status()); + if (status.ok() && sub_compact->builder != nullptr) { + status = FinishCompactionOutputFile(input->status(), sub_compact); } if (status.ok()) { status = input->status(); } + if (output_directory_ && !db_options_.disableDataSync) { + // TODO(aekmekji): Maybe only call once after all subcompactions complete? + output_directory_->Fsync(); + } - return status; + if (measure_io_stats_) { + sub_compact->compaction_job_stats.file_write_nanos += + iostats_context.write_nanos - prev_write_nanos; + sub_compact->compaction_job_stats.file_fsync_nanos += + iostats_context.fsync_nanos - prev_fsync_nanos; + sub_compact->compaction_job_stats.file_range_sync_nanos += + iostats_context.range_sync_nanos - prev_range_sync_nanos; + sub_compact->compaction_job_stats.file_prepare_write_nanos += + iostats_context.prepare_write_nanos - prev_prepare_write_nanos; + if (prev_perf_level != PerfLevel::kEnableTime) { + SetPerfLevel(prev_perf_level); + } + } + + input_ptr.reset(); + sub_compact->status = status; } Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value, - const ParsedInternalKey& ikey, const Status& input_status) { + const ParsedInternalKey& ikey, const Status& input_status, + SubCompactionState* sub_compact) { + Slice newkey(key.data(), key.size()); std::string kstr; @@ -643,7 +750,7 @@ Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value, // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. - if (bottommost_level_ && ikey.sequence < earliest_snapshot_ && + if (bottommost_level_ && ikey.sequence < sub_compact->earliest_snapshot && ikey.type != kTypeMerge) { assert(ikey.type != kTypeDeletion); // make a copy because updating in place would cause problems @@ -654,33 +761,33 @@ Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value, } // Open output file if necessary - if (compact_->builder == nullptr) { - Status status = OpenCompactionOutputFile(); + if (sub_compact->builder == nullptr) { + Status status = OpenCompactionOutputFile(sub_compact); if (!status.ok()) { return status; } } - assert(compact_->builder != nullptr); + assert(sub_compact->builder != nullptr); SequenceNumber seqno = GetInternalKeySeqno(newkey); - if (compact_->builder->NumEntries() == 0) { - compact_->current_output()->smallest.DecodeFrom(newkey); - compact_->current_output()->smallest_seqno = seqno; + if (sub_compact->builder->NumEntries() == 0) { + sub_compact->current_output()->smallest.DecodeFrom(newkey); + sub_compact->current_output()->smallest_seqno = seqno; } else { - compact_->current_output()->smallest_seqno = - std::min(compact_->current_output()->smallest_seqno, seqno); + sub_compact->current_output()->smallest_seqno = + std::min(sub_compact->current_output()->smallest_seqno, seqno); } - compact_->current_output()->largest.DecodeFrom(newkey); - compact_->builder->Add(newkey, value); - compact_->num_output_records++; - compact_->current_output()->largest_seqno = - std::max(compact_->current_output()->largest_seqno, seqno); + sub_compact->current_output()->largest.DecodeFrom(newkey); + sub_compact->builder->Add(newkey, value); + sub_compact->num_output_records++; + sub_compact->current_output()->largest_seqno = + std::max(sub_compact->current_output()->largest_seqno, seqno); // Close output file if it is big enough Status status; - if (compact_->builder->FileSize() >= - compact_->compaction->max_output_file_size()) { - status = FinishCompactionOutputFile(input_status); + if (sub_compact->builder->FileSize() >= + sub_compact->compaction->max_output_file_size()) { + status = FinishCompactionOutputFile(input_status, sub_compact); } return status; @@ -689,67 +796,68 @@ Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value, void CompactionJob::RecordDroppedKeys( int64_t* key_drop_user, int64_t* key_drop_newer_entry, - int64_t* key_drop_obsolete) { + int64_t* key_drop_obsolete, + CompactionJobStats* compaction_job_stats) { if (*key_drop_user > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_USER, *key_drop_user); *key_drop_user = 0; } if (*key_drop_newer_entry > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, *key_drop_newer_entry); - if (compaction_job_stats_) { - compaction_job_stats_->num_records_replaced += *key_drop_newer_entry; + if (compaction_job_stats) { + compaction_job_stats->num_records_replaced += *key_drop_newer_entry; } *key_drop_newer_entry = 0; } if (*key_drop_obsolete > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, *key_drop_obsolete); - if (compaction_job_stats_) { - compaction_job_stats_->num_expired_deletion_records - += *key_drop_obsolete; + if (compaction_job_stats) { + compaction_job_stats->num_expired_deletion_records += *key_drop_obsolete; } *key_drop_obsolete = 0; } } -Status CompactionJob::FinishCompactionOutputFile(const Status& input_status) { +Status CompactionJob::FinishCompactionOutputFile(const Status& input_status, + SubCompactionState* sub_compact) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_SYNC_FILE); - assert(compact_ != nullptr); - assert(compact_->outfile); - assert(compact_->builder != nullptr); + assert(sub_compact != nullptr); + assert(sub_compact->outfile); + assert(sub_compact->builder != nullptr); - const uint64_t output_number = compact_->current_output()->number; - const uint32_t output_path_id = compact_->current_output()->path_id; + const uint64_t output_number = sub_compact->current_output()->number; + const uint32_t output_path_id = sub_compact->current_output()->path_id; assert(output_number != 0); TableProperties table_properties; // Check for iterator errors Status s = input_status; - const uint64_t current_entries = compact_->builder->NumEntries(); - compact_->current_output()->need_compaction = - compact_->builder->NeedCompact(); + const uint64_t current_entries = sub_compact->builder->NumEntries(); + sub_compact->current_output()->need_compaction = + sub_compact->builder->NeedCompact(); if (s.ok()) { - s = compact_->builder->Finish(); + s = sub_compact->builder->Finish(); } else { - compact_->builder->Abandon(); + sub_compact->builder->Abandon(); } - const uint64_t current_bytes = compact_->builder->FileSize(); - compact_->current_output()->file_size = current_bytes; - compact_->total_bytes += current_bytes; + const uint64_t current_bytes = sub_compact->builder->FileSize(); + sub_compact->current_output()->file_size = current_bytes; + sub_compact->total_bytes += current_bytes; // Finish and check for file errors if (s.ok() && !db_options_.disableDataSync) { StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS); - s = compact_->outfile->Sync(db_options_.use_fsync); + s = sub_compact->outfile->Sync(db_options_.use_fsync); } if (s.ok()) { - s = compact_->outfile->Close(); + s = sub_compact->outfile->Close(); } - compact_->outfile.reset(); + sub_compact->outfile.reset(); if (s.ok() && current_entries > 0) { // Verify that the table is usable - ColumnFamilyData* cfd = compact_->compaction->column_family_data(); + ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); FileDescriptor fd(output_number, output_path_id, current_bytes); Iterator* iter = cfd->table_cache()->NewIterator( ReadOptions(), env_options_, cfd->internal_comparator(), fd, nullptr, @@ -765,7 +873,7 @@ Status CompactionJob::FinishCompactionOutputFile(const Status& input_status) { delete iter; if (s.ok()) { - TableFileCreationInfo info(compact_->builder->GetTableProperties()); + TableFileCreationInfo info(sub_compact->builder->GetTableProperties()); info.db_name = dbname_; info.cf_name = cfd->GetName(); info.file_path = TableFileName(cfd->ioptions()->db_paths, @@ -777,18 +885,18 @@ Status CompactionJob::FinishCompactionOutputFile(const Status& input_status) { " keys, %" PRIu64 " bytes%s", cfd->GetName().c_str(), job_id_, output_number, current_entries, current_bytes, - compact_->current_output()->need_compaction ? " (need compaction)" + sub_compact->current_output()->need_compaction ? " (need compaction)" : ""); EventHelpers::LogAndNotifyTableFileCreation( event_logger_, cfd->ioptions()->listeners, fd, info); } } - compact_->builder.reset(); + sub_compact->builder.reset(); return s; } Status CompactionJob::InstallCompactionResults( - InstrumentedMutex* db_mutex, const MutableCFOptions& mutable_cf_options) { + const MutableCFOptions& mutable_cf_options, InstrumentedMutex* db_mutex) { db_mutex->AssertHeld(); auto* compaction = compact_->compaction; @@ -816,12 +924,15 @@ Status CompactionJob::InstallCompactionResults( // Add compaction outputs compaction->AddInputDeletions(compact_->compaction->edit()); - for (size_t i = 0; i < compact_->outputs.size(); i++) { - const CompactionState::Output& out = compact_->outputs[i]; - compaction->edit()->AddFile(compaction->output_level(), out.number, - out.path_id, out.file_size, out.smallest, - out.largest, out.smallest_seqno, - out.largest_seqno, out.need_compaction); + + for (SubCompactionState& sub_compact : compact_->sub_compact_states) { + for (size_t i = 0; i < sub_compact.outputs.size(); i++) { + const SubCompactionState::Output& out = sub_compact.outputs[i]; + compaction->edit()->AddFile(compaction->output_level(), out.number, + out.path_id, out.file_size, out.smallest, + out.largest, out.smallest_seqno, + out.largest_seqno, out.need_compaction); + } } return versions_->LogAndApply(compaction->column_family_data(), mutable_cf_options, compaction->edit(), @@ -835,11 +946,10 @@ Status CompactionJob::InstallCompactionResults( // Employ a sequential search because the total number of // snapshots are typically small. inline SequenceNumber CompactionJob::findEarliestVisibleSnapshot( - SequenceNumber in, const std::vector& snapshots, - SequenceNumber* prev_snapshot) { - assert(snapshots.size()); + SequenceNumber in, SequenceNumber* prev_snapshot) { + assert(existing_snapshots_.size()); SequenceNumber prev __attribute__((unused)) = 0; - for (const auto cur : snapshots) { + for (const auto cur : existing_snapshots_) { assert(prev <= cur); if (cur >= in) { *prev_snapshot = prev; @@ -852,7 +962,7 @@ inline SequenceNumber CompactionJob::findEarliestVisibleSnapshot( "CompactionJob is not able to find snapshot" " with SeqId later than %" PRIu64 ": current MaxSeqId is %" PRIu64 "", - in, snapshots[snapshots.size() - 1]); + in, existing_snapshots_[existing_snapshots_.size() - 1]); assert(0); return 0; } @@ -868,40 +978,41 @@ void CompactionJob::RecordCompactionIOStats() { IOSTATS_RESET(bytes_written); } -Status CompactionJob::OpenCompactionOutputFile() { - assert(compact_ != nullptr); - assert(compact_->builder == nullptr); +Status CompactionJob::OpenCompactionOutputFile(SubCompactionState* + sub_compact) { + assert(sub_compact != nullptr); + assert(sub_compact->builder == nullptr); // no need to lock because VersionSet::next_file_number_ is atomic uint64_t file_number = versions_->NewFileNumber(); // Make the output file unique_ptr writable_file; std::string fname = TableFileName(db_options_.db_paths, file_number, - compact_->compaction->output_path_id()); + sub_compact->compaction->output_path_id()); Status s = env_->NewWritableFile(fname, &writable_file, env_options_); if (!s.ok()) { Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64 " fails at NewWritableFile with status %s", - compact_->compaction->column_family_data()->GetName().c_str(), job_id_, - file_number, s.ToString().c_str()); + sub_compact->compaction->column_family_data()->GetName().c_str(), + job_id_, file_number, s.ToString().c_str()); LogFlush(db_options_.info_log); return s; } - CompactionState::Output out; + SubCompactionState::Output out; out.number = file_number; - out.path_id = compact_->compaction->output_path_id(); + out.path_id = sub_compact->compaction->output_path_id(); out.smallest.Clear(); out.largest.Clear(); out.smallest_seqno = out.largest_seqno = 0; - compact_->outputs.push_back(out); + sub_compact->outputs.push_back(out); writable_file->SetIOPriority(Env::IO_LOW); - writable_file->SetPreallocationBlockSize( - static_cast(compact_->compaction->OutputFilePreallocationSize())); - compact_->outfile.reset( + writable_file->SetPreallocationBlockSize(static_cast( + sub_compact->compaction->OutputFilePreallocationSize())); + sub_compact->outfile.reset( new WritableFileWriter(std::move(writable_file), env_options_)); - ColumnFamilyData* cfd = compact_->compaction->column_family_data(); + ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); bool skip_filters = false; // If the Column family flag is to only optimize filters for hits, @@ -912,30 +1023,34 @@ Status CompactionJob::OpenCompactionOutputFile() { skip_filters = true; } - compact_->builder.reset(NewTableBuilder( + sub_compact->builder.reset(NewTableBuilder( *cfd->ioptions(), cfd->internal_comparator(), - cfd->int_tbl_prop_collector_factories(), compact_->outfile.get(), - compact_->compaction->output_compression(), + cfd->int_tbl_prop_collector_factories(), sub_compact->outfile.get(), + sub_compact->compaction->output_compression(), cfd->ioptions()->compression_opts, skip_filters)); LogFlush(db_options_.info_log); return s; } -void CompactionJob::CleanupCompaction(const Status& status) { - if (compact_->builder != nullptr) { - // May happen if we get a shutdown call in the middle of compaction - compact_->builder->Abandon(); - compact_->builder.reset(); - } else { - assert(!status.ok() || compact_->outfile == nullptr); - } - for (size_t i = 0; i < compact_->outputs.size(); i++) { - const CompactionState::Output& out = compact_->outputs[i]; +void CompactionJob::CleanupCompaction() { + for (SubCompactionState& sub_compact : compact_->sub_compact_states) { + const auto& sub_status = sub_compact.status; - // If this file was inserted into the table cache then remove - // them here because this compaction was not committed. - if (!status.ok()) { - TableCache::Evict(table_cache_.get(), out.number); + if (sub_compact.builder != nullptr) { + // May happen if we get a shutdown call in the middle of compaction + sub_compact.builder->Abandon(); + sub_compact.builder.reset(); + } else { + assert(!sub_status.ok() || sub_compact.outfile == nullptr); + } + for (size_t i = 0; i < sub_compact.outputs.size(); i++) { + const SubCompactionState::Output& out = sub_compact.outputs[i]; + + // If this file was inserted into the table cache then remove + // them here because this compaction was not committed. + if (!sub_status.ok()) { + TableCache::Evict(table_cache_.get(), out.number); + } } } delete compact_; @@ -955,14 +1070,6 @@ void CopyPrefix( #endif // !ROCKSDB_LITE void CompactionJob::UpdateCompactionStats() { - size_t num_output_files = compact_->outputs.size(); - if (compact_->builder != nullptr) { - // An error occurred so ignore the last output. - assert(num_output_files > 0); - --num_output_files; - } - compaction_stats_.num_output_files = static_cast(num_output_files); - Compaction* compaction = compact_->compaction; compaction_stats_.num_input_files_in_non_output_levels = 0; compaction_stats_.num_input_files_in_output_level = 0; @@ -983,12 +1090,22 @@ void CompactionJob::UpdateCompactionStats() { } } - for (size_t i = 0; i < num_output_files; i++) { - compaction_stats_.bytes_written += compact_->outputs[i].file_size; - } - if (compact_->num_input_records > compact_->num_output_records) { - compaction_stats_.num_dropped_records += - compact_->num_input_records - compact_->num_output_records; + for (const auto& sub_compact : compact_->sub_compact_states) { + size_t num_output_files = sub_compact.outputs.size(); + if (sub_compact.builder != nullptr) { + // An error occurred so ignore the last output. + assert(num_output_files > 0); + --num_output_files; + } + compaction_stats_.num_output_files += static_cast(num_output_files); + + for (size_t i = 0; i < num_output_files; i++) { + compaction_stats_.bytes_written += sub_compact.outputs[i].file_size; + } + if (sub_compact.num_input_records > sub_compact.num_output_records) { + compaction_stats_.num_dropped_records += + sub_compact.num_input_records - sub_compact.num_output_records; + } } } @@ -1030,13 +1147,13 @@ void CompactionJob::UpdateCompactionJobStats( compact_->num_output_records; compaction_job_stats_->num_output_files = stats.num_output_files; - if (compact_->outputs.size() > 0U) { + if (compact_->NumOutputFiles() > 0U) { CopyPrefix( - compact_->outputs[0].smallest.user_key(), + compact_->SmallestUserKey(), CompactionJobStats::kMaxPrefixLength, &compaction_job_stats_->smallest_output_key_prefix); CopyPrefix( - compact_->current_output()->largest.user_key(), + compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength, &compaction_job_stats_->largest_output_key_prefix); } @@ -1044,8 +1161,10 @@ void CompactionJob::UpdateCompactionJobStats( #endif // !ROCKSDB_LITE } -void CompactionJob::LogCompaction( - ColumnFamilyData* cfd, Compaction* compaction) { +void CompactionJob::LogCompaction() { + Compaction* compaction = compact_->compaction; + ColumnFamilyData* cfd = compaction->column_family_data(); + // Let's check if anything will get logged. Don't prepare all the info if // we're not logging if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) { diff --git a/db/compaction_job.h b/db/compaction_job.h index ea546299c..eadd89fea 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -75,48 +75,51 @@ class CompactionJob { Status Run(); // REQUIRED: mutex held - // status is the return of Run() - void Install(Status* status, const MutableCFOptions& mutable_cf_options, - InstrumentedMutex* db_mutex); + Status Install(const MutableCFOptions& mutable_cf_options, + InstrumentedMutex* db_mutex); private: - // REQUIRED: mutex not held - Status SubCompactionRun(Slice* start, Slice* end); + struct SubCompactionState; + + void AggregateStatistics(); + // Set up the individual states used by each subcompaction + void InitializeSubCompactions(const SequenceNumber& earliest, + const SequenceNumber& visible, + const SequenceNumber& latest); - void GetSubCompactionBoundaries(); // update the thread status for starting a compaction. void ReportStartedCompaction(Compaction* compaction); void AllocateCompactionOutputFileNumbers(); // Call compaction filter. Then iterate through input and compact the // kv-pairs - Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input, - Slice* start = nullptr, - Slice* end = nullptr); + void ProcessKeyValueCompaction(SubCompactionState* sub_compact); Status WriteKeyValue(const Slice& key, const Slice& value, const ParsedInternalKey& ikey, - const Status& input_status); - - Status FinishCompactionOutputFile(const Status& input_status); - Status InstallCompactionResults(InstrumentedMutex* db_mutex, - const MutableCFOptions& mutable_cf_options); - SequenceNumber findEarliestVisibleSnapshot( - SequenceNumber in, const std::vector& snapshots, - SequenceNumber* prev_snapshot); + const Status& input_status, + SubCompactionState* sub_compact); + + Status FinishCompactionOutputFile(const Status& input_status, + SubCompactionState* sub_compact); + Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options, + InstrumentedMutex* db_mutex); + SequenceNumber findEarliestVisibleSnapshot(SequenceNumber in, + SequenceNumber* prev_snapshot); void RecordCompactionIOStats(); - Status OpenCompactionOutputFile(); - void CleanupCompaction(const Status& status); + Status OpenCompactionOutputFile(SubCompactionState* sub_compact); + void CleanupCompaction(); void UpdateCompactionJobStats( const InternalStats::CompactionStats& stats) const; void RecordDroppedKeys(int64_t* key_drop_user, int64_t* key_drop_newer_entry, - int64_t* key_drop_obsolete); + int64_t* key_drop_obsolete, + CompactionJobStats* compaction_job_stats = nullptr); void UpdateCompactionStats(); void UpdateCompactionInputStatsHelper( int* num_files, uint64_t* bytes_read, int input_level); - void LogCompaction(ColumnFamilyData* cfd, Compaction* compaction); + void LogCompaction(); int job_id_; @@ -126,9 +129,6 @@ class CompactionJob { CompactionJobStats* compaction_job_stats_; bool bottommost_level_; - SequenceNumber earliest_snapshot_; - SequenceNumber visible_at_tip_; - SequenceNumber latest_snapshot_; InternalStats::CompactionStats compaction_stats_; diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index a7140b595..a3360e822 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -253,8 +253,8 @@ class CompactionJobTest : public testing::Test { s = compaction_job.Run(); ASSERT_OK(s); mutex_.Lock(); - compaction_job.Install(&s, *cfd->GetLatestMutableCFOptions(), &mutex_); - ASSERT_OK(s); + ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions(), + &mutex_)); mutex_.Unlock(); ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U); diff --git a/db/db_impl.cc b/db/db_impl.cc index 1c3883eae..4b6585ff2 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1687,10 +1687,10 @@ Status DBImpl::CompactFilesImpl( compaction_job.Prepare(); mutex_.Unlock(); - Status status = compaction_job.Run(); + compaction_job.Run(); mutex_.Lock(); - compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); + Status status = compaction_job.Install(*c->mutable_cf_options(), &mutex_); if (status.ok()) { InstallSuperVersionAndScheduleWorkWrapper( c->column_family_data(), job_context, *c->mutable_cf_options()); @@ -2689,11 +2689,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, compaction_job.Prepare(); mutex_.Unlock(); - status = compaction_job.Run(); + compaction_job.Run(); TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun"); mutex_.Lock(); - compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); + status = compaction_job.Install(*c->mutable_cf_options(), &mutex_); if (status.ok()) { InstallSuperVersionAndScheduleWorkWrapper( c->column_family_data(), job_context, *c->mutable_cf_options()); diff --git a/include/rocksdb/compaction_job_stats.h b/include/rocksdb/compaction_job_stats.h index b62a88ca3..533190015 100644 --- a/include/rocksdb/compaction_job_stats.h +++ b/include/rocksdb/compaction_job_stats.h @@ -12,6 +12,8 @@ namespace rocksdb { struct CompactionJobStats { CompactionJobStats() { Reset(); } void Reset(); + // Aggregate the CompactionJobStats from another instance with this one + void Add(const CompactionJobStats& stats); // the elapsed time in micro of this compaction. uint64_t elapsed_micros; diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 084cb5e79..a013981ab 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -226,6 +226,12 @@ DEFINE_int32(set_in_place_one_in, 0, DEFINE_int64(cache_size, 2 * KB * KB * KB, "Number of bytes to use as a cache of uncompressed data."); +DEFINE_uint64(subcompactions, 1, + "Maximum number of subcompactions to divide L0-L1 compactions " + "into."); +static const bool FLAGS_subcompactions_dummy __attribute__((unused)) = + RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range); + static bool ValidateInt32Positive(const char* flagname, int32_t value) { if (value < 0) { fprintf(stderr, "Invalid value for --%s: %d, must be >=0\n", @@ -1877,6 +1883,7 @@ class StressTest { options_.max_manifest_file_size = 10 * 1024; options_.filter_deletes = FLAGS_filter_deletes; options_.inplace_update_support = FLAGS_in_place_update; + options_.num_subcompactions = static_cast(FLAGS_subcompactions); if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kHashSkipList)) { fprintf(stderr, "prefix_size should be non-zero iff memtablerep == prefix_hash\n"); diff --git a/util/compaction_job_stats_impl.cc b/util/compaction_job_stats_impl.cc index 4c3b94694..f64db3dda 100644 --- a/util/compaction_job_stats_impl.cc +++ b/util/compaction_job_stats_impl.cc @@ -40,6 +40,35 @@ void CompactionJobStats::Reset() { file_prepare_write_nanos = 0; } +void CompactionJobStats::Add(const CompactionJobStats& stats) { + elapsed_micros += stats.elapsed_micros; + + num_input_records += stats.num_input_records; + num_input_files += stats.num_input_files; + num_input_files_at_output_level += stats.num_input_files_at_output_level; + + num_output_records += stats.num_output_records; + num_output_files += stats.num_output_files; + + total_input_bytes += stats.total_input_bytes; + total_output_bytes += stats.total_output_bytes; + + num_records_replaced += stats.num_records_replaced; + + total_input_raw_key_bytes += stats.total_input_raw_key_bytes; + total_input_raw_value_bytes += stats.total_input_raw_value_bytes; + + num_input_deletion_records += stats.num_input_deletion_records; + num_expired_deletion_records += stats.num_expired_deletion_records; + + num_corrupt_keys += stats.num_corrupt_keys; + + file_write_nanos += stats.file_write_nanos; + file_range_sync_nanos += stats.file_range_sync_nanos; + file_fsync_nanos += stats.file_fsync_nanos; + file_prepare_write_nanos += stats.file_prepare_write_nanos; +} + #else void CompactionJobStats::Reset() {}