|
|
@ -491,10 +491,6 @@ Status CompactionJob::Run() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros; |
|
|
|
compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros; |
|
|
|
compaction_stats_.files_in_leveln = |
|
|
|
|
|
|
|
static_cast<int>(compact_->compaction->num_input_files(0)); |
|
|
|
|
|
|
|
compaction_stats_.files_in_levelnp1 = |
|
|
|
|
|
|
|
static_cast<int>(compact_->compaction->num_input_files(1)); |
|
|
|
|
|
|
|
MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros); |
|
|
|
MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros); |
|
|
|
|
|
|
|
|
|
|
|
size_t num_output_files = compact_->outputs.size(); |
|
|
|
size_t num_output_files = compact_->outputs.size(); |
|
|
@ -503,19 +499,9 @@ Status CompactionJob::Run() { |
|
|
|
assert(num_output_files > 0); |
|
|
|
assert(num_output_files > 0); |
|
|
|
--num_output_files; |
|
|
|
--num_output_files; |
|
|
|
} |
|
|
|
} |
|
|
|
compaction_stats_.files_out_levelnp1 = static_cast<int>(num_output_files); |
|
|
|
compaction_stats_.num_output_files = static_cast<int>(num_output_files); |
|
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < compact_->compaction->num_input_files(0); i++) { |
|
|
|
UpdateCompactionInputStats(); |
|
|
|
compaction_stats_.bytes_readn += |
|
|
|
|
|
|
|
compact_->compaction->input(0, i)->fd.GetFileSize(); |
|
|
|
|
|
|
|
compaction_stats_.num_input_records += |
|
|
|
|
|
|
|
static_cast<uint64_t>(compact_->compaction->input(0, i)->num_entries); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < compact_->compaction->num_input_files(1); i++) { |
|
|
|
|
|
|
|
compaction_stats_.bytes_readnp1 += |
|
|
|
|
|
|
|
compact_->compaction->input(1, i)->fd.GetFileSize(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num_output_files; i++) { |
|
|
|
for (size_t i = 0; i < num_output_files; i++) { |
|
|
|
compaction_stats_.bytes_written += compact_->outputs[i].file_size; |
|
|
|
compaction_stats_.bytes_written += compact_->outputs[i].file_size; |
|
|
@ -548,24 +534,30 @@ void CompactionJob::Install(Status* status, |
|
|
|
VersionStorageInfo::LevelSummaryStorage tmp; |
|
|
|
VersionStorageInfo::LevelSummaryStorage tmp; |
|
|
|
auto vstorage = cfd->current()->storage_info(); |
|
|
|
auto vstorage = cfd->current()->storage_info(); |
|
|
|
const auto& stats = compaction_stats_; |
|
|
|
const auto& stats = compaction_stats_; |
|
|
|
LogToBuffer(log_buffer_, |
|
|
|
LogToBuffer( |
|
|
|
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " |
|
|
|
log_buffer_, |
|
|
|
"files in(%d, %d) out(%d) " |
|
|
|
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " |
|
|
|
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " |
|
|
|
"files in(%d, %d) out(%d) " |
|
|
|
"write-amplify(%.1f) %s, records in: %d, records dropped: %d\n", |
|
|
|
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " |
|
|
|
cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), |
|
|
|
"write-amplify(%.1f) %s, records in: %d, records dropped: %d\n", |
|
|
|
(stats.bytes_readn + stats.bytes_readnp1) / |
|
|
|
cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), |
|
|
|
static_cast<double>(stats.micros), |
|
|
|
(stats.bytes_read_non_output_levels + stats.bytes_read_output_level) / |
|
|
|
stats.bytes_written / static_cast<double>(stats.micros), |
|
|
|
static_cast<double>(stats.micros), |
|
|
|
compact_->compaction->output_level(), stats.files_in_leveln, |
|
|
|
stats.bytes_written / static_cast<double>(stats.micros), |
|
|
|
stats.files_in_levelnp1, stats.files_out_levelnp1, |
|
|
|
compact_->compaction->output_level(), |
|
|
|
stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0, |
|
|
|
stats.num_input_files_in_non_output_levels, |
|
|
|
stats.bytes_written / 1048576.0, |
|
|
|
stats.num_input_files_in_output_level, |
|
|
|
(stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) / |
|
|
|
stats.num_output_files, |
|
|
|
static_cast<double>(stats.bytes_readn), |
|
|
|
stats.bytes_read_non_output_levels / 1048576.0, |
|
|
|
stats.bytes_written / static_cast<double>(stats.bytes_readn), |
|
|
|
stats.bytes_read_output_level / 1048576.0, |
|
|
|
status->ToString().c_str(), stats.num_input_records, |
|
|
|
stats.bytes_written / 1048576.0, |
|
|
|
stats.num_dropped_records); |
|
|
|
(stats.bytes_written + stats.bytes_read_output_level + |
|
|
|
|
|
|
|
stats.bytes_read_non_output_levels) / |
|
|
|
|
|
|
|
static_cast<double>(stats.bytes_read_non_output_levels), |
|
|
|
|
|
|
|
stats.bytes_written / |
|
|
|
|
|
|
|
static_cast<double>(stats.bytes_read_non_output_levels), |
|
|
|
|
|
|
|
status->ToString().c_str(), stats.num_input_records, |
|
|
|
|
|
|
|
stats.num_dropped_records); |
|
|
|
|
|
|
|
|
|
|
|
UpdateCompactionJobStats(stats); |
|
|
|
UpdateCompactionJobStats(stats); |
|
|
|
|
|
|
|
|
|
|
@ -574,9 +566,9 @@ void CompactionJob::Install(Status* status, |
|
|
|
<< "compaction_finished" |
|
|
|
<< "compaction_finished" |
|
|
|
<< "output_level" << compact_->compaction->output_level() |
|
|
|
<< "output_level" << compact_->compaction->output_level() |
|
|
|
<< "num_output_files" << compact_->outputs.size() |
|
|
|
<< "num_output_files" << compact_->outputs.size() |
|
|
|
<< "total_output_size" << compact_->total_bytes << "num_input_records" |
|
|
|
<< "total_output_size" << compact_->total_bytes |
|
|
|
<< compact_->num_input_records << "num_output_records" |
|
|
|
<< "num_input_records" << compact_->num_input_records |
|
|
|
<< compact_->num_output_records; |
|
|
|
<< "num_output_records" << compact_->num_output_records; |
|
|
|
stream << "lsm_state"; |
|
|
|
stream << "lsm_state"; |
|
|
|
stream.StartArray(); |
|
|
|
stream.StartArray(); |
|
|
|
for (int level = 0; level < vstorage->num_levels(); ++level) { |
|
|
|
for (int level = 0; level < vstorage->num_levels(); ++level) { |
|
|
@ -1245,6 +1237,42 @@ void CopyPrefix( |
|
|
|
|
|
|
|
|
|
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void CompactionJob::UpdateCompactionInputStats() { |
|
|
|
|
|
|
|
Compaction* compaction = compact_->compaction; |
|
|
|
|
|
|
|
compaction_stats_.num_input_files_in_non_output_levels = 0; |
|
|
|
|
|
|
|
compaction_stats_.num_input_files_in_output_level = 0; |
|
|
|
|
|
|
|
for (int input_level = 0; |
|
|
|
|
|
|
|
input_level < static_cast<int>(compaction->num_input_levels()); |
|
|
|
|
|
|
|
++input_level) { |
|
|
|
|
|
|
|
if (compaction->start_level() + input_level |
|
|
|
|
|
|
|
!= compaction->output_level()) { |
|
|
|
|
|
|
|
UpdateCompactionInputStatsHelper( |
|
|
|
|
|
|
|
&compaction_stats_.num_input_files_in_non_output_levels, |
|
|
|
|
|
|
|
&compaction_stats_.bytes_read_non_output_levels, |
|
|
|
|
|
|
|
input_level); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
UpdateCompactionInputStatsHelper( |
|
|
|
|
|
|
|
&compaction_stats_.num_input_files_in_output_level, |
|
|
|
|
|
|
|
&compaction_stats_.bytes_read_output_level, |
|
|
|
|
|
|
|
input_level); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void CompactionJob::UpdateCompactionInputStatsHelper( |
|
|
|
|
|
|
|
int* num_files, uint64_t* bytes_read, int input_level) { |
|
|
|
|
|
|
|
const Compaction* compaction = compact_->compaction; |
|
|
|
|
|
|
|
auto num_input_files = compaction->num_input_files(input_level); |
|
|
|
|
|
|
|
*num_files += static_cast<int>(num_input_files); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num_input_files; ++i) { |
|
|
|
|
|
|
|
const auto* file_meta = compaction->input(input_level, i); |
|
|
|
|
|
|
|
*bytes_read += file_meta->fd.GetFileSize(); |
|
|
|
|
|
|
|
compaction_stats_.num_input_records += |
|
|
|
|
|
|
|
static_cast<uint64_t>(file_meta->num_entries); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void CompactionJob::UpdateCompactionJobStats( |
|
|
|
void CompactionJob::UpdateCompactionJobStats( |
|
|
|
const InternalStats::CompactionStats& stats) const { |
|
|
|
const InternalStats::CompactionStats& stats) const { |
|
|
|
#ifndef ROCKSDB_LITE |
|
|
|
#ifndef ROCKSDB_LITE |
|
|
@ -1253,18 +1281,21 @@ void CompactionJob::UpdateCompactionJobStats( |
|
|
|
|
|
|
|
|
|
|
|
// input information
|
|
|
|
// input information
|
|
|
|
compaction_job_stats_->total_input_bytes = |
|
|
|
compaction_job_stats_->total_input_bytes = |
|
|
|
stats.bytes_readn + stats.bytes_readnp1; |
|
|
|
stats.bytes_read_non_output_levels + |
|
|
|
compaction_job_stats_->num_input_records = compact_->num_input_records; |
|
|
|
stats.bytes_read_output_level; |
|
|
|
|
|
|
|
compaction_job_stats_->num_input_records = |
|
|
|
|
|
|
|
compact_->num_input_records; |
|
|
|
compaction_job_stats_->num_input_files = |
|
|
|
compaction_job_stats_->num_input_files = |
|
|
|
stats.files_in_leveln + stats.files_in_levelnp1; |
|
|
|
stats.num_input_files_in_non_output_levels + |
|
|
|
|
|
|
|
stats.num_input_files_in_output_level; |
|
|
|
compaction_job_stats_->num_input_files_at_output_level = |
|
|
|
compaction_job_stats_->num_input_files_at_output_level = |
|
|
|
stats.files_in_levelnp1; |
|
|
|
stats.num_input_files_in_output_level; |
|
|
|
|
|
|
|
|
|
|
|
// output information
|
|
|
|
// output information
|
|
|
|
compaction_job_stats_->total_output_bytes = stats.bytes_written; |
|
|
|
compaction_job_stats_->total_output_bytes = stats.bytes_written; |
|
|
|
compaction_job_stats_->num_output_records = |
|
|
|
compaction_job_stats_->num_output_records = |
|
|
|
compact_->num_output_records; |
|
|
|
compact_->num_output_records; |
|
|
|
compaction_job_stats_->num_output_files = stats.files_out_levelnp1; |
|
|
|
compaction_job_stats_->num_output_files = stats.num_output_files; |
|
|
|
|
|
|
|
|
|
|
|
if (compact_->outputs.size() > 0U) { |
|
|
|
if (compact_->outputs.size() > 0U) { |
|
|
|
CopyPrefix( |
|
|
|
CopyPrefix( |
|
|
|