Fixed a bug of CompactionStats in multi-level universal compaction case

Summary:
Universal compaction can involves in multiple levels.  However,
the current implementation of bytes_readn and bytes_readnp1
(and some other stats with postfix `n` and `np1`) assumes compaction
can only have two levels.

This patch fixes this bug and redefines bytes_readn and bytes_readnp1:
* bytes_readnp1: the number of bytes read in the compaction output level.
* bytes_readn: the total number of bytes read minus bytes_readnp1

Test Plan: Add a test in compaction_job_stats_test

Reviewers: igor, sdong, rven, anthony, kradhakrishnan, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D40239
main
Yueh-Hsuan Chiang 10 years ago
parent 2dc3910b5e
commit bb1c74ce18
  1. 115
      db/compaction_job.cc
  2. 4
      db/compaction_job.h
  3. 91
      db/compaction_job_stats_test.cc
  4. 2
      db/db_impl.cc
  5. 21
      db/internal_stats.cc
  6. 80
      db/internal_stats.h

@ -491,10 +491,6 @@ Status CompactionJob::Run() {
}
compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros;
compaction_stats_.files_in_leveln =
static_cast<int>(compact_->compaction->num_input_files(0));
compaction_stats_.files_in_levelnp1 =
static_cast<int>(compact_->compaction->num_input_files(1));
MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
size_t num_output_files = compact_->outputs.size();
@ -503,19 +499,9 @@ Status CompactionJob::Run() {
assert(num_output_files > 0);
--num_output_files;
}
compaction_stats_.files_out_levelnp1 = static_cast<int>(num_output_files);
compaction_stats_.num_output_files = static_cast<int>(num_output_files);
for (size_t i = 0; i < compact_->compaction->num_input_files(0); i++) {
compaction_stats_.bytes_readn +=
compact_->compaction->input(0, i)->fd.GetFileSize();
compaction_stats_.num_input_records +=
static_cast<uint64_t>(compact_->compaction->input(0, i)->num_entries);
}
for (size_t i = 0; i < compact_->compaction->num_input_files(1); i++) {
compaction_stats_.bytes_readnp1 +=
compact_->compaction->input(1, i)->fd.GetFileSize();
}
UpdateCompactionInputStats();
for (size_t i = 0; i < num_output_files; i++) {
compaction_stats_.bytes_written += compact_->outputs[i].file_size;
@ -548,24 +534,30 @@ void CompactionJob::Install(Status* status,
VersionStorageInfo::LevelSummaryStorage tmp;
auto vstorage = cfd->current()->storage_info();
const auto& stats = compaction_stats_;
LogToBuffer(log_buffer_,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
"files in(%d, %d) out(%d) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
cfd->GetName().c_str(), vstorage->LevelSummary(&tmp),
(stats.bytes_readn + stats.bytes_readnp1) /
static_cast<double>(stats.micros),
stats.bytes_written / static_cast<double>(stats.micros),
compact_->compaction->output_level(), stats.files_in_leveln,
stats.files_in_levelnp1, stats.files_out_levelnp1,
stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0,
stats.bytes_written / 1048576.0,
(stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) /
static_cast<double>(stats.bytes_readn),
stats.bytes_written / static_cast<double>(stats.bytes_readn),
status->ToString().c_str(), stats.num_input_records,
stats.num_dropped_records);
LogToBuffer(
log_buffer_,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
"files in(%d, %d) out(%d) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
cfd->GetName().c_str(), vstorage->LevelSummary(&tmp),
(stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
static_cast<double>(stats.micros),
stats.bytes_written / static_cast<double>(stats.micros),
compact_->compaction->output_level(),
stats.num_input_files_in_non_output_levels,
stats.num_input_files_in_output_level,
stats.num_output_files,
stats.bytes_read_non_output_levels / 1048576.0,
stats.bytes_read_output_level / 1048576.0,
stats.bytes_written / 1048576.0,
(stats.bytes_written + stats.bytes_read_output_level +
stats.bytes_read_non_output_levels) /
static_cast<double>(stats.bytes_read_non_output_levels),
stats.bytes_written /
static_cast<double>(stats.bytes_read_non_output_levels),
status->ToString().c_str(), stats.num_input_records,
stats.num_dropped_records);
UpdateCompactionJobStats(stats);
@ -574,9 +566,9 @@ void CompactionJob::Install(Status* status,
<< "compaction_finished"
<< "output_level" << compact_->compaction->output_level()
<< "num_output_files" << compact_->outputs.size()
<< "total_output_size" << compact_->total_bytes << "num_input_records"
<< compact_->num_input_records << "num_output_records"
<< compact_->num_output_records;
<< "total_output_size" << compact_->total_bytes
<< "num_input_records" << compact_->num_input_records
<< "num_output_records" << compact_->num_output_records;
stream << "lsm_state";
stream.StartArray();
for (int level = 0; level < vstorage->num_levels(); ++level) {
@ -1245,6 +1237,42 @@ void CopyPrefix(
#endif // !ROCKSDB_LITE
void CompactionJob::UpdateCompactionInputStats() {
Compaction* compaction = compact_->compaction;
compaction_stats_.num_input_files_in_non_output_levels = 0;
compaction_stats_.num_input_files_in_output_level = 0;
for (int input_level = 0;
input_level < static_cast<int>(compaction->num_input_levels());
++input_level) {
if (compaction->start_level() + input_level
!= compaction->output_level()) {
UpdateCompactionInputStatsHelper(
&compaction_stats_.num_input_files_in_non_output_levels,
&compaction_stats_.bytes_read_non_output_levels,
input_level);
} else {
UpdateCompactionInputStatsHelper(
&compaction_stats_.num_input_files_in_output_level,
&compaction_stats_.bytes_read_output_level,
input_level);
}
}
}
void CompactionJob::UpdateCompactionInputStatsHelper(
int* num_files, uint64_t* bytes_read, int input_level) {
const Compaction* compaction = compact_->compaction;
auto num_input_files = compaction->num_input_files(input_level);
*num_files += static_cast<int>(num_input_files);
for (size_t i = 0; i < num_input_files; ++i) {
const auto* file_meta = compaction->input(input_level, i);
*bytes_read += file_meta->fd.GetFileSize();
compaction_stats_.num_input_records +=
static_cast<uint64_t>(file_meta->num_entries);
}
}
void CompactionJob::UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const {
#ifndef ROCKSDB_LITE
@ -1253,18 +1281,21 @@ void CompactionJob::UpdateCompactionJobStats(
// input information
compaction_job_stats_->total_input_bytes =
stats.bytes_readn + stats.bytes_readnp1;
compaction_job_stats_->num_input_records = compact_->num_input_records;
stats.bytes_read_non_output_levels +
stats.bytes_read_output_level;
compaction_job_stats_->num_input_records =
compact_->num_input_records;
compaction_job_stats_->num_input_files =
stats.files_in_leveln + stats.files_in_levelnp1;
stats.num_input_files_in_non_output_levels +
stats.num_input_files_in_output_level;
compaction_job_stats_->num_input_files_at_output_level =
stats.files_in_levelnp1;
stats.num_input_files_in_output_level;
// output information
compaction_job_stats_->total_output_bytes = stats.bytes_written;
compaction_job_stats_->num_output_records =
compact_->num_output_records;
compaction_job_stats_->num_output_files = stats.files_out_levelnp1;
compaction_job_stats_->num_output_files = stats.num_output_files;
if (compact_->outputs.size() > 0U) {
CopyPrefix(

@ -105,6 +105,10 @@ class CompactionJob {
int64_t* key_drop_newer_entry,
int64_t* key_drop_obsolete);
void UpdateCompactionInputStats();
void UpdateCompactionInputStatsHelper(
int* num_files, uint64_t* bytes_read, int input_level);
int job_id_;
// CompactionJob state

@ -475,7 +475,8 @@ CompactionJobStats NewManualCompactionJobStats(
size_t num_input_files, size_t num_input_files_at_output_level,
uint64_t num_input_records, size_t key_size, size_t value_size,
size_t num_output_files, uint64_t num_output_records,
double compression_ratio, uint64_t num_records_replaced) {
double compression_ratio, uint64_t num_records_replaced,
bool is_manual = true) {
CompactionJobStats stats;
stats.Reset();
@ -499,7 +500,7 @@ CompactionJobStats NewManualCompactionJobStats(
stats.total_input_raw_value_bytes =
num_input_records * value_size;
stats.is_manual_compaction = true;
stats.is_manual_compaction = is_manual;
stats.num_records_replaced = num_records_replaced;
@ -671,7 +672,7 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
stats_checker->AddExpectedStats(
NewManualCompactionJobStats(
smallest_key, largest_key,
4, 0, num_keys_per_L0_file * 8,
4, 4, num_keys_per_L0_file * 8,
kKeySize, kValueSize,
1, num_keys_per_L0_file * 8,
compression_ratio, 0));
@ -688,6 +689,90 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}
namespace {
int GetUniversalCompactionInputUnits(uint32_t num_flushes) {
uint32_t compaction_input_units;
for (compaction_input_units = 1;
num_flushes >= compaction_input_units;
compaction_input_units *= 2) {
if ((num_flushes & compaction_input_units) != 0) {
return compaction_input_units > 1 ? compaction_input_units : 0;
}
}
return 0;
}
} // namespace
TEST_F(CompactionJobStatsTest, UniversalCompactionTest) {
Random rnd(301);
uint64_t key_base = 100000000l;
// Note: key_base must be multiple of num_keys_per_L0_file
int num_keys_per_table = 100;
const uint32_t kTestScale = 8;
const int kKeySize = 10;
const int kValueSize = 900;
double compression_ratio = 1.0;
uint64_t key_interval = key_base / num_keys_per_table;
auto* stats_checker = new CompactionJobStatsChecker();
Options options;
options.listeners.emplace_back(stats_checker);
options.create_if_missing = true;
options.max_background_flushes = 0;
options.max_mem_compaction_level = 0;
options.num_levels = 3;
options.compression = kNoCompression;
options.level0_file_num_compaction_trigger = 2;
options.target_file_size_base = num_keys_per_table * 1000;
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.size_ratio = 1;
options.compaction_options_universal.max_size_amplification_percent = 1000;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// Generates the expected CompactionJobStats for each compaction
for (uint32_t num_flushes = 2; num_flushes <= kTestScale; num_flushes++) {
// Here we treat one newly flushed file as an unit.
//
// For example, if a newly flushed file is 100k, and a compaction has
// 4 input units, then this compaction inputs 400k.
uint32_t num_input_units = GetUniversalCompactionInputUnits(num_flushes);
if (num_input_units == 0) {
continue;
}
// The following statement determines the expected smallest key
// based on whether it is a full compaction. A full compaction only
// happens when the number of flushes equals to the number of compaction
// input runs.
uint64_t smallest_key =
(num_flushes == num_input_units) ?
key_base : key_base * (num_flushes - 1);
stats_checker->AddExpectedStats(
NewManualCompactionJobStats(
Key(smallest_key, 10),
Key(smallest_key + key_base * num_input_units - key_interval, 10),
num_input_units,
num_input_units > 2 ? num_input_units / 2 : 0,
num_keys_per_table * num_input_units,
kKeySize, kValueSize,
num_input_units,
num_keys_per_table * num_input_units,
1.0, 0, false));
}
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 4U);
for (uint64_t start_key = key_base;
start_key <= key_base * kTestScale;
start_key += key_base) {
MakeTableWithKeyValues(
&rnd, start_key, start_key + key_base - 1,
kKeySize, kValueSize, key_interval,
compression_ratio, 1);
}
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -1215,7 +1215,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
InternalStats::CompactionStats stats(1);
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.fd.GetFileSize();
stats.files_out_levelnp1 = 1;
stats.num_output_files = 1;
cfd->internal_stats()->AddCompactionStats(level, stats);
cfd->internal_stats()->AddCFStats(
InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize());

@ -45,8 +45,10 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name,
int num_files, int being_compacted, double total_file_size, double score,
double w_amp, uint64_t stalls,
const InternalStats::CompactionStats& stats) {
uint64_t bytes_read = stats.bytes_readn + stats.bytes_readnp1;
int64_t bytes_new = stats.bytes_written - stats.bytes_readnp1;
uint64_t bytes_read =
stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
int64_t bytes_new =
stats.bytes_written - stats.bytes_read_output_level;
double elapsed = (stats.micros + 1) / 1000000.0;
std::string num_input_records = NumberToHumanString(stats.num_input_records);
std::string num_dropped_records =
@ -71,8 +73,8 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name,
"%7s " /* KeyIn */
"%6s\n", /* KeyDrop */
name.c_str(), num_files, being_compacted, total_file_size / kMB,
score, bytes_read / kGB, stats.bytes_readn / kGB,
stats.bytes_readnp1 / kGB, stats.bytes_written / kGB,
score, bytes_read / kGB, stats.bytes_read_non_output_levels / kGB,
stats.bytes_read_output_level / kGB, stats.bytes_written / kGB,
bytes_new / kGB, stats.bytes_moved / kGB,
w_amp, bytes_read / kMB / elapsed,
stats.bytes_written / kMB / elapsed, stats.micros / 1000000.0,
@ -440,8 +442,8 @@ void InternalStats::DumpDBStats(std::string* value) {
value->append(buf);
// Compact
for (int level = 0; level < number_levels_; level++) {
compact_bytes_read += comp_stats_[level].bytes_readnp1 +
comp_stats_[level].bytes_readn;
compact_bytes_read += comp_stats_[level].bytes_read_output_level +
comp_stats_[level].bytes_read_non_output_levels;
compact_bytes_write += comp_stats_[level].bytes_written;
compact_micros += comp_stats_[level].micros;
}
@ -598,9 +600,10 @@ void InternalStats::DumpCFStats(std::string* value) {
total_stall_count += stalls;
total_slowdown_count_soft += stall_leveln_slowdown_count_soft_[level];
total_slowdown_count_hard += stall_leveln_slowdown_count_hard_[level];
double w_amp = (comp_stats_[level].bytes_readn == 0) ? 0.0
: comp_stats_[level].bytes_written /
static_cast<double>(comp_stats_[level].bytes_readn);
double w_amp =
(comp_stats_[level].bytes_read_non_output_levels == 0) ? 0.0
: static_cast<double>(comp_stats_[level].bytes_written) /
comp_stats_[level].bytes_read_non_output_levels;
PrintLevelStats(buf, sizeof(buf), "L" + ToString(level), files,
files_being_compacted[level],
vstorage->NumLevelBytes(level), compaction_score[level],

@ -120,26 +120,26 @@ class InternalStats {
struct CompactionStats {
uint64_t micros;
// Bytes read from level N during compaction between levels N and N+1
uint64_t bytes_readn;
// The number of bytes read from all non-output levels
uint64_t bytes_read_non_output_levels;
// Bytes read from level N+1 during compaction between levels N and N+1
uint64_t bytes_readnp1;
// The number of bytes read from the compaction output level.
uint64_t bytes_read_output_level;
// Total bytes written during compaction between levels N and N+1
// Total number of bytes written during compaction
uint64_t bytes_written;
// Total bytes moved to this level
// Total number of bytes moved to the output level
uint64_t bytes_moved;
// Files read from level N during compaction between levels N and N+1
int files_in_leveln;
// The number of compaction input files in all non-output levels.
int num_input_files_in_non_output_levels;
// Files read from level N+1 during compaction between levels N and N+1
int files_in_levelnp1;
// The number of compaction input files in the output level.
int num_input_files_in_output_level;
// Files written during compaction between levels N and N+1
int files_out_levelnp1;
// The number of compaction output files.
int num_output_files;
// Total incoming entries during compaction between levels N and N+1
uint64_t num_input_records;
@ -153,39 +153,43 @@ class InternalStats {
explicit CompactionStats(int _count = 0)
: micros(0),
bytes_readn(0),
bytes_readnp1(0),
bytes_read_non_output_levels(0),
bytes_read_output_level(0),
bytes_written(0),
bytes_moved(0),
files_in_leveln(0),
files_in_levelnp1(0),
files_out_levelnp1(0),
num_input_files_in_non_output_levels(0),
num_input_files_in_output_level(0),
num_output_files(0),
num_input_records(0),
num_dropped_records(0),
count(_count) {}
explicit CompactionStats(const CompactionStats& c)
: micros(c.micros),
bytes_readn(c.bytes_readn),
bytes_readnp1(c.bytes_readnp1),
bytes_read_non_output_levels(c.bytes_read_non_output_levels),
bytes_read_output_level(c.bytes_read_output_level),
bytes_written(c.bytes_written),
bytes_moved(c.bytes_moved),
files_in_leveln(c.files_in_leveln),
files_in_levelnp1(c.files_in_levelnp1),
files_out_levelnp1(c.files_out_levelnp1),
num_input_files_in_non_output_levels(
c.num_input_files_in_non_output_levels),
num_input_files_in_output_level(
c.num_input_files_in_output_level),
num_output_files(c.num_output_files),
num_input_records(c.num_input_records),
num_dropped_records(c.num_dropped_records),
count(c.count) {}
void Add(const CompactionStats& c) {
this->micros += c.micros;
this->bytes_readn += c.bytes_readn;
this->bytes_readnp1 += c.bytes_readnp1;
this->bytes_read_non_output_levels += c.bytes_read_non_output_levels;
this->bytes_read_output_level += c.bytes_read_output_level;
this->bytes_written += c.bytes_written;
this->bytes_moved += c.bytes_moved;
this->files_in_leveln += c.files_in_leveln;
this->files_in_levelnp1 += c.files_in_levelnp1;
this->files_out_levelnp1 += c.files_out_levelnp1;
this->num_input_files_in_non_output_levels +=
c.num_input_files_in_non_output_levels;
this->num_input_files_in_output_level +=
c.num_input_files_in_output_level;
this->num_output_files += c.num_output_files;
this->num_input_records += c.num_input_records;
this->num_dropped_records += c.num_dropped_records;
this->count += c.count;
@ -193,13 +197,15 @@ class InternalStats {
void Subtract(const CompactionStats& c) {
this->micros -= c.micros;
this->bytes_readn -= c.bytes_readn;
this->bytes_readnp1 -= c.bytes_readnp1;
this->bytes_read_non_output_levels -= c.bytes_read_non_output_levels;
this->bytes_read_output_level -= c.bytes_read_output_level;
this->bytes_written -= c.bytes_written;
this->bytes_moved -= c.bytes_moved;
this->files_in_leveln -= c.files_in_leveln;
this->files_in_levelnp1 -= c.files_in_levelnp1;
this->files_out_levelnp1 -= c.files_out_levelnp1;
this->num_input_files_in_non_output_levels -=
c.num_input_files_in_non_output_levels;
this->num_input_files_in_output_level -=
c.num_input_files_in_output_level;
this->num_output_files -= c.num_output_files;
this->num_input_records -= c.num_input_records;
this->num_dropped_records -= c.num_dropped_records;
this->count -= c.count;
@ -352,13 +358,13 @@ class InternalStats {
struct CompactionStats {
uint64_t micros;
uint64_t bytes_readn;
uint64_t bytes_readnp1;
uint64_t bytes_read_non_output_levels;
uint64_t bytes_read_output_level;
uint64_t bytes_written;
uint64_t bytes_moved;
int files_in_leveln;
int files_in_levelnp1;
int files_out_levelnp1;
int num_input_files_in_non_output_levels;
int num_input_files_in_output_level;
int num_output_files;
uint64_t num_input_records;
uint64_t num_dropped_records;
int count;

Loading…
Cancel
Save