Add InternalStats and logging for AddFile()

Summary:
We dont report the bytes that we ingested from AddFile which make the write amplification numbers incorrect
Update InternalStats and add logging for AddFile()

Test Plan: Make sure the code compile and existing tests pass

Reviewers: lightmark, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D59763
main
Islam AbdelRahman 8 years ago
parent d26a848074
commit 30a24f2d3d
  1. 36
      db/db_impl.cc
  2. 20
      db/internal_stats.cc
  3. 8
      db/internal_stats.h

@ -3851,6 +3851,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
Status status;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
ColumnFamilyData* cfd = cfh->cfd();
const uint64_t start_micros = env_->NowMicros();
if (file_info->num_entries == 0) {
return Status::InvalidArgument("File contain no entries");
@ -3960,6 +3961,15 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
if (status.ok()) {
delete InstallSuperVersionAndScheduleWork(cfd, nullptr,
mutable_cf_options);
// Update internal stats
InternalStats::CompactionStats stats(1);
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.fd.GetFileSize();
stats.num_output_files = 1;
cfd->internal_stats()->AddCompactionStats(0 /* L0 */, stats);
cfd->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE,
meta.fd.GetFileSize());
}
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
}
@ -3972,14 +3982,24 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
"AddFile() clean up for file %s failed : %s", db_fname.c_str(),
s.ToString().c_str());
}
} else if (status.ok() && move_file) {
// The file was moved and added successfully, remove original file link
Status s = env_->DeleteFile(file_info->file_path);
if (!s.ok()) {
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"%s was added to DB successfully but failed to remove original file "
"link : %s",
file_info->file_path.c_str(), s.ToString().c_str());
} else {
// File was ingested successfully
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"New file %" PRIu64 " was added to L0 (Size: %.2f MB, "
"entries: %" PRIu64 ")",
meta.fd.GetNumber(),
static_cast<double>(meta.fd.GetFileSize()) / 1048576.0,
file_info->num_entries);
if (move_file) {
// The file was moved and added successfully, remove original file link
Status s = env_->DeleteFile(file_info->file_path);
if (!s.ok()) {
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"%s was added to DB successfully but failed to remove original "
"file link : %s",
file_info->file_path.c_str(), s.ToString().c_str());
}
}
}
return status;

@ -770,7 +770,9 @@ void InternalStats::DumpCFStats(std::string* value) {
}
}
uint64_t curr_ingest = cf_stats_value_[BYTES_FLUSHED];
uint64_t flush_ingest = cf_stats_value_[BYTES_FLUSHED];
uint64_t add_file_ingest = cf_stats_value_[BYTES_INGESTED_ADD_FILE];
uint64_t curr_ingest = flush_ingest + add_file_ingest;
// Cumulative summary
double w_amp = stats_sum.bytes_written / static_cast<double>(curr_ingest + 1);
uint64_t total_stall_count =
@ -785,8 +787,12 @@ void InternalStats::DumpCFStats(std::string* value) {
stats_sum);
value->append(buf);
// Interval summary
uint64_t interval_flush_ingest =
flush_ingest - cf_stats_snapshot_.ingest_bytes_flush;
uint64_t interval_add_file_inget =
add_file_ingest - cf_stats_snapshot_.ingest_bytes_add_file;
uint64_t interval_ingest =
curr_ingest - cf_stats_snapshot_.ingest_bytes + 1;
interval_flush_ingest + interval_add_file_inget + 1;
CompactionStats interval_stats(stats_sum);
interval_stats.Subtract(cf_stats_snapshot_.comp_stats);
w_amp = interval_stats.bytes_written / static_cast<double>(interval_ingest);
@ -799,9 +805,10 @@ void InternalStats::DumpCFStats(std::string* value) {
seconds_up, interval_seconds_up);
value->append(buf);
snprintf(buf, sizeof(buf),
"Flush(GB): cumulative %.3f, interval %.3f\n",
curr_ingest / kGB, interval_ingest / kGB);
snprintf(buf, sizeof(buf), "Flush(GB): cumulative %.3f, interval %.3f\n",
flush_ingest / kGB, interval_flush_ingest / kGB);
snprintf(buf, sizeof(buf), "AddFile(GB): cumulative %.3f, interval %.3f\n",
add_file_ingest / kGB, interval_add_file_inget / kGB);
value->append(buf);
// Compact
@ -873,7 +880,8 @@ void InternalStats::DumpCFStats(std::string* value) {
total_stall_count - cf_stats_snapshot_.stall_count);
value->append(buf);
cf_stats_snapshot_.ingest_bytes = curr_ingest;
cf_stats_snapshot_.ingest_bytes_flush = flush_ingest;
cf_stats_snapshot_.ingest_bytes_add_file = add_file_ingest;
cf_stats_snapshot_.comp_stats = stats_sum;
cf_stats_snapshot_.stall_count = total_stall_count;
}

@ -59,6 +59,7 @@ class InternalStats {
HARD_PENDING_COMPACTION_BYTES_LIMIT,
WRITE_STALLS_ENUM_MAX,
BYTES_FLUSHED,
BYTES_INGESTED_ADD_FILE,
INTERNAL_CF_STATS_ENUM_MAX,
};
@ -244,7 +245,8 @@ class InternalStats {
struct CFStatsSnapshot {
// ColumnFamily-level stats
CompactionStats comp_stats;
uint64_t ingest_bytes; // Bytes written to L0
uint64_t ingest_bytes_flush; // Bytes written to L0 (Flush)
uint64_t ingest_bytes_add_file; // Bytes written to L0 (AddFile)
uint64_t stall_count; // Stall count
// Stats from compaction jobs - bytes written, bytes read, duration.
uint64_t compact_bytes_write;
@ -254,7 +256,8 @@ class InternalStats {
CFStatsSnapshot()
: comp_stats(0),
ingest_bytes(0),
ingest_bytes_flush(0),
ingest_bytes_add_file(0),
stall_count(0),
compact_bytes_write(0),
compact_bytes_read(0),
@ -372,6 +375,7 @@ class InternalStats {
HARD_PENDING_COMPACTION_BYTES_LIMIT,
WRITE_STALLS_ENUM_MAX,
BYTES_FLUSHED,
BYTES_INGESTED_ADD_FILE,
INTERNAL_CF_STATS_ENUM_MAX,
};

Loading…
Cancel
Save