diff --git a/db/db_impl.cc b/db/db_impl.cc index 85dab9f9e..5d6eaf197 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2694,7 +2694,10 @@ Status DBImpl::ProcessKeyValueCompaction( Iterator* input, CompactionState* compact, bool is_compaction_v2, + int* num_output_records, LogBuffer* log_buffer) { + assert(num_output_records != nullptr); + size_t combined_idx = 0; Status status; std::string compaction_filter_value; @@ -2965,6 +2968,7 @@ Status DBImpl::ProcessKeyValueCompaction( } compact->current_output()->largest.DecodeFrom(newkey); compact->builder->Add(newkey, value); + (*num_output_records)++, compact->current_output()->largest_seqno = std::max(compact->current_output()->largest_seqno, seqno); @@ -3140,6 +3144,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, mutex_.Unlock(); log_buffer->FlushBufferToLog(); + int num_output_records = 0; const uint64_t start_micros = env_->NowMicros(); unique_ptr input(versions_->MakeInputIterator(compact->compaction)); input->SeekToFirst(); @@ -3168,6 +3173,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, input.get(), compact, false, + &num_output_records, log_buffer); } else { // temp_backup_input always point to the start of the current buffer @@ -3249,6 +3255,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, input.get(), compact, true, + &num_output_records, log_buffer); if (!status.ok()) { @@ -3286,6 +3293,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, input.get(), compact, true, + &num_output_records, log_buffer); compact->CleanupBatchBuffer(); @@ -3309,6 +3317,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, input.get(), compact, true, + &num_output_records, log_buffer); } // checking for compaction filter v2 @@ -3342,17 +3351,24 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } stats.files_out_levelnp1 = num_output_files; + uint64_t num_input_records = 0; + for (int i = 0; i < compact->compaction->num_input_files(0); i++) { stats.bytes_readn += compact->compaction->input(0, i)->fd.GetFileSize(); + stats.num_input_records += compact->compaction->input(0, i)->num_entries; + num_input_records += compact->compaction->input(0, i)->num_entries; } for (int i = 0; i < compact->compaction->num_input_files(1); i++) { stats.bytes_readnp1 += compact->compaction->input(1, i)->fd.GetFileSize(); + num_input_records += compact->compaction->input(1, i)->num_entries; } for (int i = 0; i < num_output_files; i++) { stats.bytes_written += compact->outputs[i].file_size; } + stats.num_dropped_records = + static_cast(num_input_records) - num_output_records; RecordCompactionIOStats(); @@ -3375,7 +3391,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " "files in(%d, %d) out(%d) " "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " - "write-amplify(%.1f) %s\n", + "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n", cfd->GetName().c_str(), cfd->current()->LevelSummary(&tmp), (stats.bytes_readn + stats.bytes_readnp1) / static_cast(stats.micros), @@ -3387,7 +3403,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, (stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) / (double)stats.bytes_readn, stats.bytes_written / (double)stats.bytes_readn, - status.ToString().c_str()); + status.ToString().c_str(), stats.num_input_records, + stats.num_dropped_records); return status; } diff --git a/db/db_impl.h b/db/db_impl.h index f1a81e00c..622df4293 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -419,6 +419,7 @@ class DBImpl : public DB { Iterator* input, CompactionState* compact, bool is_compaction_v2, + int* num_output_records, LogBuffer* log_buffer); // Call compaction_filter_v2->Filter() on kv-pairs in compact diff --git a/db/internal_stats.cc b/db/internal_stats.cc index c9f9306e2..3f60d72ce 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -30,7 +30,7 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name) { "Level Files Size(MB) Score Read(GB) Rn(GB) Rnp1(GB) " "Write(GB) Wnew(GB) RW-Amp W-Amp Rd(MB/s) Wr(MB/s) Rn(cnt) " "Rnp1(cnt) Wnp1(cnt) Wnew(cnt) Comp(sec) Comp(cnt) Avg(sec) " - "Stall(sec) Stall(cnt) Avg(ms)\n" + "Stall(sec) Stall(cnt) Avg(ms) RecordIn RecordDrop\n" "--------------------------------------------------------------------" "--------------------------------------------------------------------" "--------------------------------------------------------------------\n", @@ -65,7 +65,9 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name, "%8.3f " /* Avg(sec) */ "%10.2f " /* Stall(sec) */ "%10" PRIu64 " " /* Stall(cnt) */ - "%7.2f\n" /* Avg(ms) */, + "%7.2f" /* Avg(ms) */ + "%8d " /* input entries */ + "%10d\n" /* number of records reduced */, name.c_str(), num_files, being_compacted, total_file_size / kMB, score, bytes_read / kGB, stats.bytes_readn / kGB, @@ -85,7 +87,9 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name, stats.count == 0 ? 0 : stats.micros / 1000000.0 / stats.count, stall_us / 1000000.0, stalls, - stalls == 0 ? 0 : stall_us / 1000.0 / stalls); + stalls == 0 ? 0 : stall_us / 1000.0 / stalls, + stats.num_input_records, + stats.num_dropped_records); } diff --git a/db/internal_stats.h b/db/internal_stats.h index 2e04f24e7..18d67de5c 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -123,6 +123,13 @@ class InternalStats { // Files written during compaction between levels N and N+1 int files_out_levelnp1; + // Total incoming entries during compaction between levels N and N+1 + int num_input_records; + + // Accumulated diff number of entries + // (num input entries - num output entires) for compaction levels N and N+1 + int num_dropped_records; + // Number of compactions done int count; @@ -134,6 +141,8 @@ class InternalStats { files_in_leveln(0), files_in_levelnp1(0), files_out_levelnp1(0), + num_input_records(0), + num_dropped_records(0), count(count) {} explicit CompactionStats(const CompactionStats& c) @@ -144,6 +153,8 @@ class InternalStats { files_in_leveln(c.files_in_leveln), files_in_levelnp1(c.files_in_levelnp1), files_out_levelnp1(c.files_out_levelnp1), + num_input_records(c.num_input_records), + num_dropped_records(c.num_dropped_records), count(c.count) {} void Add(const CompactionStats& c) { @@ -154,6 +165,8 @@ class InternalStats { this->files_in_leveln += c.files_in_leveln; this->files_in_levelnp1 += c.files_in_levelnp1; this->files_out_levelnp1 += c.files_out_levelnp1; + this->num_input_records += c.num_input_records; + this->num_dropped_records += c.num_dropped_records; this->count += c.count; } @@ -165,6 +178,8 @@ class InternalStats { this->files_in_leveln -= c.files_in_leveln; this->files_in_levelnp1 -= c.files_in_levelnp1; this->files_out_levelnp1 -= c.files_out_levelnp1; + this->num_input_records -= c.num_input_records; + this->num_dropped_records -= c.num_dropped_records; this->count -= c.count; } };