From f34782a67d4b372421c1a3d292bb5de86f6488ab Mon Sep 17 00:00:00 2001
From: Levi Tamasi
Date: Thu, 23 Jan 2020 15:25:23 -0800
Subject: [PATCH] Fix the "records dropped" statistics (#6325)

Summary:
The earlier code used two conflicting definitions for the number of input
records going into a compaction, one based on the `rocksdb.num.entries` table
property and one based on `CompactionIterationStats`. The first one is correct
and in line with how output records are counted, while the second one
incorrectly ignores input records in various cases when the
`CompactionIterator` advances or reseeks the input iterator (this can happen,
amongst other cases, when dealing with `SingleDelete`s, regular `Delete`s,
`Merge`s, and compaction filters). This can result in the code undercounting
the input records and computing an incorrect value for "records dropped"
during the compaction. The patch fixes this by switching over to the correct
(table property based) input record count for "records dropped".
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6325

Test Plan: Tested using `make check` and `db_bench`.

Differential Revision: D19525491

Pulled By: ltamasi

fbshipit-source-id: 4340b0b2f41546db8e356db70ca02199e48fa636
---
 HISTORY.md                      |  1 +
 db/compaction/compaction_job.cc | 34 ++++++++++++++++-----------------
 2 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index 020ada336..5000378d5 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -21,6 +21,7 @@
 * Fix incorrect results while block-based table uses kHashSearch, together with Prev()/SeekForPrev().
 * Fixed an issue where the thread pools were not resized upon setting `max_background_jobs` dynamically through the `SetDBOptions` interface.
 * Fix a bug that can cause write threads to hang when a slowdown/stall happens and there is a mix of writers with WriteOptions::no_slowdown set/unset.
+* Fixed an issue where an incorrect "number of input records" value was used to compute the "records dropped" statistics for compactions.
 
 ### New Features
 * It is now possible to enable periodic compactions for the base DB when using BlobDB.
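To make the undercounting concrete, here is a small self-contained C++ sketch (the struct, variable names, and record counts are hypothetical, not RocksDB code). It contrasts the old per-subcompaction computation, which relied on the iterator-based count and silently clamped negative differences to zero, with the new job-level computation based on the table-property count:

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical per-subcompaction figures. "iter_input" mimics the
// CompactionIterationStats-based count, which can miss records the
// CompactionIterator skips over (SingleDeletes, Deletes, Merges,
// compaction filters); "prop_input" mimics the count derived from the
// rocksdb.num.entries table property.
struct SubcompactionFigures {
  uint64_t prop_input;  // actual number of input records
  uint64_t iter_input;  // undercounted input records
  uint64_t num_output;  // number of output records
};

int main() {
  const std::vector<SubcompactionFigures> subs = {
      {1000, 900, 950},  // the iterator-based count missed 100 records
      {500, 500, 200},   // both counts agree here
  };

  // Old behavior: per-subcompaction differences of the undercounted
  // figure, silently clamped to zero whenever input < output.
  uint64_t old_dropped = 0;
  for (const auto& s : subs) {
    if (s.iter_input > s.num_output) {
      old_dropped += s.iter_input - s.num_output;
    }
  }

  // New behavior: a single job-level subtraction using the
  // table-property-based totals.
  uint64_t total_input = 0;
  uint64_t total_output = 0;
  for (const auto& s : subs) {
    total_input += s.prop_input;
    total_output += s.num_output;
  }
  const uint64_t new_dropped =
      total_input > total_output ? total_input - total_output : 0;

  std::cout << "old (undercounted): " << old_dropped << "\n";  // prints 300
  std::cout << "new (correct):      " << new_dropped << "\n";  // prints 350
  return 0;
}
```

Note how the clamping in the old code also masked the bug: a subcompaction whose undercounted input fell below its output count simply contributed nothing to "records dropped" instead of failing loudly.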
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index da29b1c53..8c7680b7e 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -144,7 +144,6 @@ struct CompactionJob::SubcompactionState {
 
   // State during the subcompaction
   uint64_t total_bytes;
-  uint64_t num_input_records;
   uint64_t num_output_records;
   CompactionJobStats compaction_job_stats;
   uint64_t approx_size;
@@ -165,7 +164,6 @@ struct CompactionJob::SubcompactionState {
         builder(nullptr),
         current_output_file_size(0),
         total_bytes(0),
-        num_input_records(0),
         num_output_records(0),
         approx_size(size),
         grandparent_index(0),
@@ -186,7 +184,6 @@ struct CompactionJob::SubcompactionState {
     builder = std::move(o.builder);
     current_output_file_size = std::move(o.current_output_file_size);
     total_bytes = std::move(o.total_bytes);
-    num_input_records = std::move(o.num_input_records);
     num_output_records = std::move(o.num_output_records);
     compaction_job_stats = std::move(o.compaction_job_stats);
     approx_size = std::move(o.approx_size);
@@ -245,13 +242,11 @@ struct CompactionJob::CompactionState {
   Status status;
 
   uint64_t total_bytes;
-  uint64_t num_input_records;
   uint64_t num_output_records;
 
   explicit CompactionState(Compaction* c)
       : compaction(c),
         total_bytes(0),
-        num_input_records(0),
         num_output_records(0) {}
 
   size_t NumOutputFiles() {
@@ -289,7 +284,6 @@ struct CompactionJob::CompactionState {
 void CompactionJob::AggregateStatistics() {
   for (SubcompactionState& sc : compact_->sub_compact_states) {
     compact_->total_bytes += sc.total_bytes;
-    compact_->num_input_records += sc.num_input_records;
     compact_->num_output_records += sc.num_output_records;
   }
   if (compaction_job_stats_) {
@@ -770,12 +764,12 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
   auto stream = event_logger_->LogToBuffer(log_buffer_);
   stream << "job" << job_id_ << "event"
          << "compaction_finished"
-         << "compaction_time_micros" << compaction_stats_.micros
-         << "compaction_time_cpu_micros" << compaction_stats_.cpu_micros
-         << "output_level" << compact_->compaction->output_level()
-         << "num_output_files" << compact_->NumOutputFiles()
-         << "total_output_size" << compact_->total_bytes << "num_input_records"
-         << compact_->num_input_records << "num_output_records"
+         << "compaction_time_micros" << stats.micros
+         << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
+         << compact_->compaction->output_level() << "num_output_files"
+         << compact_->NumOutputFiles() << "total_output_size"
+         << compact_->total_bytes << "num_input_records"
+         << stats.num_input_records << "num_output_records"
          << compact_->num_output_records << "num_subcompactions"
          << compact_->sub_compact_states.size() << "output_compression"
          << CompressionTypeToString(compact_->compaction->output_compression());
@@ -991,7 +985,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     }
   }
 
-  sub_compact->num_input_records = c_iter_stats.num_input_records;
   sub_compact->compaction_job_stats.num_input_deletion_records =
       c_iter_stats.num_input_deletion_records;
   sub_compact->compaction_job_stats.num_corrupt_keys =
@@ -1589,6 +1582,8 @@ void CompactionJob::UpdateCompactionStats() {
     }
   }
 
+  uint64_t num_output_records = 0;
+
   for (const auto& sub_compact : compact_->sub_compact_states) {
     size_t num_output_files = sub_compact.outputs.size();
     if (sub_compact.builder != nullptr) {
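With the per-subcompaction counter gone, the input record count that feeds the statistics comes from the table properties of the compaction's input files. As a rough illustration of the property-based approach (the types and helper below are stand-ins, not RocksDB's actual internals, which read the recorded `rocksdb.num.entries` value per SST file):

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Stand-in for an input file's relevant metadata; RocksDB records the
// entry count in the rocksdb.num.entries table property when the SST
// file is written (exposed as TableProperties::num_entries).
struct InputFile {
  uint64_t num_entries;  // rocksdb.num.entries for this file
};

// Property-based input count: a plain sum over the input files. Unlike
// an iterator-based count, it cannot be skewed by the compaction
// iterator advancing or reseeking past records.
uint64_t CountInputRecords(const std::vector<InputFile>& inputs) {
  uint64_t total = 0;
  for (const auto& file : inputs) {
    total += file.num_entries;
  }
  return total;
}

int main() {
  std::cout << CountInputRecords({{1000}, {500}}) << "\n";  // prints 1500
  return 0;
}
```

Counting this way keeps the input side symmetric with the output side, since `num_output_records` is likewise a count of the records actually written to the output files.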
@@ -1598,13 +1593,16 @@
     }
     compaction_stats_.num_output_files += static_cast<int>(num_output_files);
 
+    num_output_records += sub_compact.num_output_records;
+
     for (const auto& out : sub_compact.outputs) {
       compaction_stats_.bytes_written += out.meta.fd.file_size;
     }
-    if (sub_compact.num_input_records > sub_compact.num_output_records) {
-      compaction_stats_.num_dropped_records +=
-          sub_compact.num_input_records - sub_compact.num_output_records;
-    }
+  }
+
+  if (compaction_stats_.num_input_records > num_output_records) {
+    compaction_stats_.num_dropped_records =
+        compaction_stats_.num_input_records - num_output_records;
   }
 }
 
@@ -1632,7 +1630,7 @@ void CompactionJob::UpdateCompactionJobStats(
     // input information
     compaction_job_stats_->total_input_bytes =
         stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
-    compaction_job_stats_->num_input_records = compact_->num_input_records;
+    compaction_job_stats_->num_input_records = stats.num_input_records;
     compaction_job_stats_->num_input_files =
         stats.num_input_files_in_non_output_levels +
         stats.num_input_files_in_output_level;
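Since the last hunk makes `CompactionJobStats::num_input_records` consistent with `num_output_records`, user code can now derive a meaningful dropped-record figure through the public listener API. A minimal sketch (the class name and output format are illustrative; `CompactionJobInfo::stats` is only populated in non-LITE builds):

```cpp
#include <cstdint>
#include <iostream>

#include "rocksdb/listener.h"

// Logs an approximate dropped-record count after each compaction, using
// the counters fixed by this patch.
class DroppedRecordsLogger : public rocksdb::EventListener {
 public:
  void OnCompactionCompleted(rocksdb::DB* /*db*/,
                             const rocksdb::CompactionJobInfo& info) override {
    const auto& stats = info.stats;
    const uint64_t dropped =
        stats.num_input_records > stats.num_output_records
            ? stats.num_input_records - stats.num_output_records
            : 0;
    std::cout << "compaction at output level " << info.output_level
              << " dropped " << dropped << " records\n";
  }
};
```

Such a listener is registered through `Options::listeners` before opening the database; with the fix, the figure it reports agrees with the `num_dropped_records` value computed in `UpdateCompactionStats` above.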