diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 0b04a3d8e..9a4b7bc91 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -491,10 +491,6 @@ Status CompactionJob::Run() { } compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros; - compaction_stats_.files_in_leveln = - static_cast(compact_->compaction->num_input_files(0)); - compaction_stats_.files_in_levelnp1 = - static_cast(compact_->compaction->num_input_files(1)); MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros); size_t num_output_files = compact_->outputs.size(); @@ -503,19 +499,9 @@ Status CompactionJob::Run() { assert(num_output_files > 0); --num_output_files; } - compaction_stats_.files_out_levelnp1 = static_cast(num_output_files); + compaction_stats_.num_output_files = static_cast(num_output_files); - for (size_t i = 0; i < compact_->compaction->num_input_files(0); i++) { - compaction_stats_.bytes_readn += - compact_->compaction->input(0, i)->fd.GetFileSize(); - compaction_stats_.num_input_records += - static_cast(compact_->compaction->input(0, i)->num_entries); - } - - for (size_t i = 0; i < compact_->compaction->num_input_files(1); i++) { - compaction_stats_.bytes_readnp1 += - compact_->compaction->input(1, i)->fd.GetFileSize(); - } + UpdateCompactionInputStats(); for (size_t i = 0; i < num_output_files; i++) { compaction_stats_.bytes_written += compact_->outputs[i].file_size; @@ -548,24 +534,30 @@ void CompactionJob::Install(Status* status, VersionStorageInfo::LevelSummaryStorage tmp; auto vstorage = cfd->current()->storage_info(); const auto& stats = compaction_stats_; - LogToBuffer(log_buffer_, - "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " - "files in(%d, %d) out(%d) " - "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " - "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n", - cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), - (stats.bytes_readn + stats.bytes_readnp1) / - static_cast(stats.micros), - stats.bytes_written / static_cast(stats.micros), - compact_->compaction->output_level(), stats.files_in_leveln, - stats.files_in_levelnp1, stats.files_out_levelnp1, - stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0, - stats.bytes_written / 1048576.0, - (stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) / - static_cast(stats.bytes_readn), - stats.bytes_written / static_cast(stats.bytes_readn), - status->ToString().c_str(), stats.num_input_records, - stats.num_dropped_records); + LogToBuffer( + log_buffer_, + "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " + "files in(%d, %d) out(%d) " + "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " + "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n", + cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), + (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) / + static_cast(stats.micros), + stats.bytes_written / static_cast(stats.micros), + compact_->compaction->output_level(), + stats.num_input_files_in_non_output_levels, + stats.num_input_files_in_output_level, + stats.num_output_files, + stats.bytes_read_non_output_levels / 1048576.0, + stats.bytes_read_output_level / 1048576.0, + stats.bytes_written / 1048576.0, + (stats.bytes_written + stats.bytes_read_output_level + + stats.bytes_read_non_output_levels) / + static_cast(stats.bytes_read_non_output_levels), + stats.bytes_written / + static_cast(stats.bytes_read_non_output_levels), + status->ToString().c_str(), stats.num_input_records, + stats.num_dropped_records); UpdateCompactionJobStats(stats); @@ -574,9 +566,9 @@ void CompactionJob::Install(Status* status, << "compaction_finished" << "output_level" << compact_->compaction->output_level() << "num_output_files" << compact_->outputs.size() - << "total_output_size" << compact_->total_bytes << "num_input_records" - << compact_->num_input_records << "num_output_records" - << compact_->num_output_records; + << "total_output_size" << compact_->total_bytes + << "num_input_records" << compact_->num_input_records + << "num_output_records" << compact_->num_output_records; stream << "lsm_state"; stream.StartArray(); for (int level = 0; level < vstorage->num_levels(); ++level) { @@ -1245,6 +1237,42 @@ void CopyPrefix( #endif // !ROCKSDB_LITE +void CompactionJob::UpdateCompactionInputStats() { + Compaction* compaction = compact_->compaction; + compaction_stats_.num_input_files_in_non_output_levels = 0; + compaction_stats_.num_input_files_in_output_level = 0; + for (int input_level = 0; + input_level < static_cast(compaction->num_input_levels()); + ++input_level) { + if (compaction->start_level() + input_level + != compaction->output_level()) { + UpdateCompactionInputStatsHelper( + &compaction_stats_.num_input_files_in_non_output_levels, + &compaction_stats_.bytes_read_non_output_levels, + input_level); + } else { + UpdateCompactionInputStatsHelper( + &compaction_stats_.num_input_files_in_output_level, + &compaction_stats_.bytes_read_output_level, + input_level); + } + } +} + +void CompactionJob::UpdateCompactionInputStatsHelper( + int* num_files, uint64_t* bytes_read, int input_level) { + const Compaction* compaction = compact_->compaction; + auto num_input_files = compaction->num_input_files(input_level); + *num_files += static_cast(num_input_files); + + for (size_t i = 0; i < num_input_files; ++i) { + const auto* file_meta = compaction->input(input_level, i); + *bytes_read += file_meta->fd.GetFileSize(); + compaction_stats_.num_input_records += + static_cast(file_meta->num_entries); + } +} + void CompactionJob::UpdateCompactionJobStats( const InternalStats::CompactionStats& stats) const { #ifndef ROCKSDB_LITE @@ -1253,18 +1281,21 @@ void CompactionJob::UpdateCompactionJobStats( // input information compaction_job_stats_->total_input_bytes = - stats.bytes_readn + stats.bytes_readnp1; - compaction_job_stats_->num_input_records = compact_->num_input_records; + stats.bytes_read_non_output_levels + + stats.bytes_read_output_level; + compaction_job_stats_->num_input_records = + compact_->num_input_records; compaction_job_stats_->num_input_files = - stats.files_in_leveln + stats.files_in_levelnp1; + stats.num_input_files_in_non_output_levels + + stats.num_input_files_in_output_level; compaction_job_stats_->num_input_files_at_output_level = - stats.files_in_levelnp1; + stats.num_input_files_in_output_level; // output information compaction_job_stats_->total_output_bytes = stats.bytes_written; compaction_job_stats_->num_output_records = compact_->num_output_records; - compaction_job_stats_->num_output_files = stats.files_out_levelnp1; + compaction_job_stats_->num_output_files = stats.num_output_files; if (compact_->outputs.size() > 0U) { CopyPrefix( diff --git a/db/compaction_job.h b/db/compaction_job.h index 5a285d4eb..fdf13e8b2 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -105,6 +105,10 @@ class CompactionJob { int64_t* key_drop_newer_entry, int64_t* key_drop_obsolete); + void UpdateCompactionInputStats(); + void UpdateCompactionInputStatsHelper( + int* num_files, uint64_t* bytes_read, int input_level); + int job_id_; // CompactionJob state diff --git a/db/compaction_job_stats_test.cc b/db/compaction_job_stats_test.cc index 3efbbfdeb..3fe4afcd7 100644 --- a/db/compaction_job_stats_test.cc +++ b/db/compaction_job_stats_test.cc @@ -475,7 +475,8 @@ CompactionJobStats NewManualCompactionJobStats( size_t num_input_files, size_t num_input_files_at_output_level, uint64_t num_input_records, size_t key_size, size_t value_size, size_t num_output_files, uint64_t num_output_records, - double compression_ratio, uint64_t num_records_replaced) { + double compression_ratio, uint64_t num_records_replaced, + bool is_manual = true) { CompactionJobStats stats; stats.Reset(); @@ -499,7 +500,7 @@ CompactionJobStats NewManualCompactionJobStats( stats.total_input_raw_value_bytes = num_input_records * value_size; - stats.is_manual_compaction = true; + stats.is_manual_compaction = is_manual; stats.num_records_replaced = num_records_replaced; @@ -671,7 +672,7 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) { stats_checker->AddExpectedStats( NewManualCompactionJobStats( smallest_key, largest_key, - 4, 0, num_keys_per_L0_file * 8, + 4, 4, num_keys_per_L0_file * 8, kKeySize, kValueSize, 1, num_keys_per_L0_file * 8, compression_ratio, 0)); @@ -688,6 +689,90 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) { ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U); } +namespace { +int GetUniversalCompactionInputUnits(uint32_t num_flushes) { + uint32_t compaction_input_units; + for (compaction_input_units = 1; + num_flushes >= compaction_input_units; + compaction_input_units *= 2) { + if ((num_flushes & compaction_input_units) != 0) { + return compaction_input_units > 1 ? compaction_input_units : 0; + } + } + return 0; +} +} // namespace + +TEST_F(CompactionJobStatsTest, UniversalCompactionTest) { + Random rnd(301); + uint64_t key_base = 100000000l; + // Note: key_base must be multiple of num_keys_per_L0_file + int num_keys_per_table = 100; + const uint32_t kTestScale = 8; + const int kKeySize = 10; + const int kValueSize = 900; + double compression_ratio = 1.0; + uint64_t key_interval = key_base / num_keys_per_table; + + auto* stats_checker = new CompactionJobStatsChecker(); + Options options; + options.listeners.emplace_back(stats_checker); + options.create_if_missing = true; + options.max_background_flushes = 0; + options.max_mem_compaction_level = 0; + options.num_levels = 3; + options.compression = kNoCompression; + options.level0_file_num_compaction_trigger = 2; + options.target_file_size_base = num_keys_per_table * 1000; + options.compaction_style = kCompactionStyleUniversal; + options.compaction_options_universal.size_ratio = 1; + options.compaction_options_universal.max_size_amplification_percent = 1000; + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + + // Generates the expected CompactionJobStats for each compaction + for (uint32_t num_flushes = 2; num_flushes <= kTestScale; num_flushes++) { + // Here we treat one newly flushed file as an unit. + // + // For example, if a newly flushed file is 100k, and a compaction has + // 4 input units, then this compaction inputs 400k. + uint32_t num_input_units = GetUniversalCompactionInputUnits(num_flushes); + if (num_input_units == 0) { + continue; + } + // The following statement determines the expected smallest key + // based on whether it is a full compaction. A full compaction only + // happens when the number of flushes equals to the number of compaction + // input runs. + uint64_t smallest_key = + (num_flushes == num_input_units) ? + key_base : key_base * (num_flushes - 1); + + stats_checker->AddExpectedStats( + NewManualCompactionJobStats( + Key(smallest_key, 10), + Key(smallest_key + key_base * num_input_units - key_interval, 10), + num_input_units, + num_input_units > 2 ? num_input_units / 2 : 0, + num_keys_per_table * num_input_units, + kKeySize, kValueSize, + num_input_units, + num_keys_per_table * num_input_units, + 1.0, 0, false)); + } + ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 4U); + + for (uint64_t start_key = key_base; + start_key <= key_base * kTestScale; + start_key += key_base) { + MakeTableWithKeyValues( + &rnd, start_key, start_key + key_base - 1, + kKeySize, kValueSize, key_interval, + compression_ratio, 1); + } + ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index a8a1389f9..680f5dd50 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1215,7 +1215,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, InternalStats::CompactionStats stats(1); stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.fd.GetFileSize(); - stats.files_out_levelnp1 = 1; + stats.num_output_files = 1; cfd->internal_stats()->AddCompactionStats(level, stats); cfd->internal_stats()->AddCFStats( InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize()); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index bfcd3f718..319232c41 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -45,8 +45,10 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name, int num_files, int being_compacted, double total_file_size, double score, double w_amp, uint64_t stalls, const InternalStats::CompactionStats& stats) { - uint64_t bytes_read = stats.bytes_readn + stats.bytes_readnp1; - int64_t bytes_new = stats.bytes_written - stats.bytes_readnp1; + uint64_t bytes_read = + stats.bytes_read_non_output_levels + stats.bytes_read_output_level; + int64_t bytes_new = + stats.bytes_written - stats.bytes_read_output_level; double elapsed = (stats.micros + 1) / 1000000.0; std::string num_input_records = NumberToHumanString(stats.num_input_records); std::string num_dropped_records = @@ -71,8 +73,8 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name, "%7s " /* KeyIn */ "%6s\n", /* KeyDrop */ name.c_str(), num_files, being_compacted, total_file_size / kMB, - score, bytes_read / kGB, stats.bytes_readn / kGB, - stats.bytes_readnp1 / kGB, stats.bytes_written / kGB, + score, bytes_read / kGB, stats.bytes_read_non_output_levels / kGB, + stats.bytes_read_output_level / kGB, stats.bytes_written / kGB, bytes_new / kGB, stats.bytes_moved / kGB, w_amp, bytes_read / kMB / elapsed, stats.bytes_written / kMB / elapsed, stats.micros / 1000000.0, @@ -440,8 +442,8 @@ void InternalStats::DumpDBStats(std::string* value) { value->append(buf); // Compact for (int level = 0; level < number_levels_; level++) { - compact_bytes_read += comp_stats_[level].bytes_readnp1 + - comp_stats_[level].bytes_readn; + compact_bytes_read += comp_stats_[level].bytes_read_output_level + + comp_stats_[level].bytes_read_non_output_levels; compact_bytes_write += comp_stats_[level].bytes_written; compact_micros += comp_stats_[level].micros; } @@ -598,9 +600,10 @@ void InternalStats::DumpCFStats(std::string* value) { total_stall_count += stalls; total_slowdown_count_soft += stall_leveln_slowdown_count_soft_[level]; total_slowdown_count_hard += stall_leveln_slowdown_count_hard_[level]; - double w_amp = (comp_stats_[level].bytes_readn == 0) ? 0.0 - : comp_stats_[level].bytes_written / - static_cast(comp_stats_[level].bytes_readn); + double w_amp = + (comp_stats_[level].bytes_read_non_output_levels == 0) ? 0.0 + : static_cast(comp_stats_[level].bytes_written) / + comp_stats_[level].bytes_read_non_output_levels; PrintLevelStats(buf, sizeof(buf), "L" + ToString(level), files, files_being_compacted[level], vstorage->NumLevelBytes(level), compaction_score[level], diff --git a/db/internal_stats.h b/db/internal_stats.h index 3a6c77362..392b5ddae 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -120,26 +120,26 @@ class InternalStats { struct CompactionStats { uint64_t micros; - // Bytes read from level N during compaction between levels N and N+1 - uint64_t bytes_readn; + // The number of bytes read from all non-output levels + uint64_t bytes_read_non_output_levels; - // Bytes read from level N+1 during compaction between levels N and N+1 - uint64_t bytes_readnp1; + // The number of bytes read from the compaction output level. + uint64_t bytes_read_output_level; - // Total bytes written during compaction between levels N and N+1 + // Total number of bytes written during compaction uint64_t bytes_written; - // Total bytes moved to this level + // Total number of bytes moved to the output level uint64_t bytes_moved; - // Files read from level N during compaction between levels N and N+1 - int files_in_leveln; + // The number of compaction input files in all non-output levels. + int num_input_files_in_non_output_levels; - // Files read from level N+1 during compaction between levels N and N+1 - int files_in_levelnp1; + // The number of compaction input files in the output level. + int num_input_files_in_output_level; - // Files written during compaction between levels N and N+1 - int files_out_levelnp1; + // The number of compaction output files. + int num_output_files; // Total incoming entries during compaction between levels N and N+1 uint64_t num_input_records; @@ -153,39 +153,43 @@ class InternalStats { explicit CompactionStats(int _count = 0) : micros(0), - bytes_readn(0), - bytes_readnp1(0), + bytes_read_non_output_levels(0), + bytes_read_output_level(0), bytes_written(0), bytes_moved(0), - files_in_leveln(0), - files_in_levelnp1(0), - files_out_levelnp1(0), + num_input_files_in_non_output_levels(0), + num_input_files_in_output_level(0), + num_output_files(0), num_input_records(0), num_dropped_records(0), count(_count) {} explicit CompactionStats(const CompactionStats& c) : micros(c.micros), - bytes_readn(c.bytes_readn), - bytes_readnp1(c.bytes_readnp1), + bytes_read_non_output_levels(c.bytes_read_non_output_levels), + bytes_read_output_level(c.bytes_read_output_level), bytes_written(c.bytes_written), bytes_moved(c.bytes_moved), - files_in_leveln(c.files_in_leveln), - files_in_levelnp1(c.files_in_levelnp1), - files_out_levelnp1(c.files_out_levelnp1), + num_input_files_in_non_output_levels( + c.num_input_files_in_non_output_levels), + num_input_files_in_output_level( + c.num_input_files_in_output_level), + num_output_files(c.num_output_files), num_input_records(c.num_input_records), num_dropped_records(c.num_dropped_records), count(c.count) {} void Add(const CompactionStats& c) { this->micros += c.micros; - this->bytes_readn += c.bytes_readn; - this->bytes_readnp1 += c.bytes_readnp1; + this->bytes_read_non_output_levels += c.bytes_read_non_output_levels; + this->bytes_read_output_level += c.bytes_read_output_level; this->bytes_written += c.bytes_written; this->bytes_moved += c.bytes_moved; - this->files_in_leveln += c.files_in_leveln; - this->files_in_levelnp1 += c.files_in_levelnp1; - this->files_out_levelnp1 += c.files_out_levelnp1; + this->num_input_files_in_non_output_levels += + c.num_input_files_in_non_output_levels; + this->num_input_files_in_output_level += + c.num_input_files_in_output_level; + this->num_output_files += c.num_output_files; this->num_input_records += c.num_input_records; this->num_dropped_records += c.num_dropped_records; this->count += c.count; @@ -193,13 +197,15 @@ class InternalStats { void Subtract(const CompactionStats& c) { this->micros -= c.micros; - this->bytes_readn -= c.bytes_readn; - this->bytes_readnp1 -= c.bytes_readnp1; + this->bytes_read_non_output_levels -= c.bytes_read_non_output_levels; + this->bytes_read_output_level -= c.bytes_read_output_level; this->bytes_written -= c.bytes_written; this->bytes_moved -= c.bytes_moved; - this->files_in_leveln -= c.files_in_leveln; - this->files_in_levelnp1 -= c.files_in_levelnp1; - this->files_out_levelnp1 -= c.files_out_levelnp1; + this->num_input_files_in_non_output_levels -= + c.num_input_files_in_non_output_levels; + this->num_input_files_in_output_level -= + c.num_input_files_in_output_level; + this->num_output_files -= c.num_output_files; this->num_input_records -= c.num_input_records; this->num_dropped_records -= c.num_dropped_records; this->count -= c.count; @@ -352,13 +358,13 @@ class InternalStats { struct CompactionStats { uint64_t micros; - uint64_t bytes_readn; - uint64_t bytes_readnp1; + uint64_t bytes_read_non_output_levels; + uint64_t bytes_read_output_level; uint64_t bytes_written; uint64_t bytes_moved; - int files_in_leveln; - int files_in_levelnp1; - int files_out_levelnp1; + int num_input_files_in_non_output_levels; + int num_input_files_in_output_level; + int num_output_files; uint64_t num_input_records; uint64_t num_dropped_records; int count;