Add monitoring for universal compaction and add counters for compaction IO

Summary:
Adds these counters
{ WAL_FILE_SYNCED, "rocksdb.wal.synced" }
  number of writes that request a WAL sync
{ WAL_FILE_BYTES, "rocksdb.wal.bytes" },
  number of bytes written to the WAL
{ WRITE_DONE_BY_SELF, "rocksdb.write.self" },
  number of writes processed by the calling thread
{ WRITE_DONE_BY_OTHER, "rocksdb.write.other" },
  number of writes not processed by the calling thread. Instead these were
  processed by the current holder of the write lock
{ WRITE_WITH_WAL, "rocksdb.write.wal" },
  number of writes that request WAL logging
{ COMPACT_READ_BYTES, "rocksdb.compact.read.bytes" },
  number of bytes read during compaction
{ COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes" },
  number of bytes written during compaction

Per-interval stats output was updated with WAL stats and correct stats for universal compaction
including a correct value for write-amplification. It now looks like:
                               Compactions
Level  Files Size(MB) Score Time(sec)  Read(MB) Write(MB)    Rn(MB)  Rnp1(MB)  Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s)      Rn     Rnp1     Wnp1     NewW    Count  Ln-stall Stall-cnt
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  0        7      464  46.4       281      3411      3875      3411         0      3875        2.1      12.1        13.8      621        0      240      240      628       0.0         0
Uptime(secs): 310.8 total, 2.0 interval
Writes cumulative: 9999999 total, 9999999 batches, 1.0 per batch, 1.22 ingest GB
WAL cumulative: 9999999 WAL writes, 9999999 WAL syncs, 1.00 writes per sync, 1.22 GB written
Compaction IO cumulative (GB): 1.22 new, 3.33 read, 3.78 write, 7.12 read+write
Compaction IO cumulative (MB/sec): 4.0 new, 11.0 read, 12.5 write, 23.4 read+write
Amplification cumulative: 4.1 write, 6.8 compaction
Writes interval: 100000 total, 100000 batches, 1.0 per batch, 12.5 ingest MB
WAL interval: 100000 WAL writes, 100000 WAL syncs, 1.00 writes per sync, 0.01 MB written
Compaction IO interval (MB): 12.49 new, 14.98 read, 21.50 write, 36.48 read+write
Compaction IO interval (MB/sec): 6.4 new, 7.6 read, 11.0 write, 18.6 read+write
Amplification interval: 101.7 write, 102.9 compaction
Stalls(secs): 142.924 level0_slowdown, 0.000 level0_numfiles, 0.805 memtable_compaction, 0.000 leveln_slowdown
Stalls(count): 132461 level0_slowdown, 0 level0_numfiles, 3 memtable_compaction, 0 leveln_slowdown

Task ID: #3329644, #3301695

Blame Rev:

Test Plan:
Revert Plan:

Database Impact:

Memcache Impact:

Other Notes:

EImportant:

- begin *PUBLIC* platform impact section -
Bugzilla: #
- end platform impact -

Reviewers: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D14583
main
Mark Callaghan 11 years ago
parent 249e736bc5
commit e9e6b00d29
  1. 2
      db/db_bench.cc
  2. 126
      db/db_impl.cc
  3. 20
      db/db_impl.h
  4. 22
      include/rocksdb/statistics.h

@ -149,7 +149,7 @@ DEFINE_double(compression_ratio, 0.5, "Arrange to generate values that shrink"
DEFINE_bool(histogram, false, "Print histogram of operation timings"); DEFINE_bool(histogram, false, "Print histogram of operation timings");
DEFINE_int32(write_buffer_size, rocksdb::Options().write_buffer_size, DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size,
"Number of bytes to buffer in memtable before compacting"); "Number of bytes to buffer in memtable before compacting");
DEFINE_int32(max_write_buffer_number, DEFINE_int32(max_write_buffer_number,

@ -1041,6 +1041,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
stats.bytes_written = meta.file_size; stats.bytes_written = meta.file_size;
stats.files_out_levelnp1 = 1; stats.files_out_levelnp1 = 1;
stats_[level].Add(stats); stats_[level].Add(stats);
RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size);
return s; return s;
} }
@ -1129,6 +1130,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
stats.micros = env_->NowMicros() - start_micros; stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size; stats.bytes_written = meta.file_size;
stats_[level].Add(stats); stats_[level].Add(stats);
RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size);
return s; return s;
} }
@ -2454,14 +2456,22 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
} }
stats.files_out_levelnp1 = num_output_files; stats.files_out_levelnp1 = num_output_files;
for (int i = 0; i < compact->compaction->num_input_files(0); i++) for (int i = 0; i < compact->compaction->num_input_files(0); i++) {
stats.bytes_readn += compact->compaction->input(0, i)->file_size; stats.bytes_readn += compact->compaction->input(0, i)->file_size;
RecordTick(options_.statistics.get(), COMPACT_READ_BYTES,
compact->compaction->input(0, i)->file_size);
}
for (int i = 0; i < compact->compaction->num_input_files(1); i++) for (int i = 0; i < compact->compaction->num_input_files(1); i++) {
stats.bytes_readnp1 += compact->compaction->input(1, i)->file_size; stats.bytes_readnp1 += compact->compaction->input(1, i)->file_size;
RecordTick(options_.statistics.get(), COMPACT_READ_BYTES,
compact->compaction->input(1, i)->file_size);
}
for (int i = 0; i < num_output_files; i++) { for (int i = 0; i < num_output_files; i++) {
stats.bytes_written += compact->outputs[i].file_size; stats.bytes_written += compact->outputs[i].file_size;
RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES,
compact->outputs[i].file_size);
} }
LogFlush(options_.info_log); LogFlush(options_.info_log);
@ -2810,8 +2820,16 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
while (!w.done && &w != writers_.front()) { while (!w.done && &w != writers_.front()) {
w.cv.Wait(); w.cv.Wait();
} }
if (!options.disableWAL) {
RecordTick(options_.statistics.get(), WRITE_WITH_WAL, 1);
}
if (w.done) { if (w.done) {
RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1);
return w.status; return w.status;
} else {
RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1);
} }
// May temporarily unlock and wait. // May temporarily unlock and wait.
@ -2849,7 +2867,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (!options.disableWAL) { if (!options.disableWAL) {
StopWatchNano timer(env_); StopWatchNano timer(env_);
StartPerfTimer(&timer); StartPerfTimer(&timer);
status = log_->AddRecord(WriteBatchInternal::Contents(updates)); Slice log_entry = WriteBatchInternal::Contents(updates);
status = log_->AddRecord(log_entry);
RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1);
RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_entry.size());
BumpPerfTime(&perf_context.wal_write_time, &timer); BumpPerfTime(&perf_context.wal_write_time, &timer);
if (status.ok() && options.sync) { if (status.ok() && options.sync) {
if (options_.use_fsync) { if (options_.use_fsync) {
@ -3225,6 +3246,13 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
} else if (in == "stats") { } else if (in == "stats") {
char buf[1000]; char buf[1000];
uint64_t wal_bytes = 0;
uint64_t wal_synced = 0;
uint64_t user_bytes_written = 0;
uint64_t write_other = 0;
uint64_t write_self = 0;
uint64_t write_with_wal = 0;
uint64_t total_bytes_written = 0; uint64_t total_bytes_written = 0;
uint64_t total_bytes_read = 0; uint64_t total_bytes_read = 0;
uint64_t micros_up = env_->NowMicros() - started_at_; uint64_t micros_up = env_->NowMicros() - started_at_;
@ -3237,6 +3265,16 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
uint64_t interval_bytes_new = 0; uint64_t interval_bytes_new = 0;
double interval_seconds_up = 0; double interval_seconds_up = 0;
Statistics* s = options_.statistics.get();
if (s) {
wal_bytes = s->getTickerCount(WAL_FILE_BYTES);
wal_synced = s->getTickerCount(WAL_FILE_SYNCED);
user_bytes_written = s->getTickerCount(BYTES_WRITTEN);
write_other = s->getTickerCount(WRITE_DONE_BY_OTHER);
write_self = s->getTickerCount(WRITE_DONE_BY_SELF);
write_with_wal = s->getTickerCount(WRITE_WITH_WAL);
}
// Pardon the long line but I think it is easier to read this way. // Pardon the long line but I think it is easier to read this way.
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
" Compactions\n" " Compactions\n"
@ -3293,19 +3331,38 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
} }
} }
interval_bytes_new = stats_[0].bytes_written - last_stats_.bytes_new_; interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_;
interval_bytes_read = total_bytes_read - last_stats_.bytes_read_; interval_bytes_read = total_bytes_read - last_stats_.compaction_bytes_read_;
interval_bytes_written = total_bytes_written - last_stats_.bytes_written_; interval_bytes_written =
total_bytes_written - last_stats_.compaction_bytes_written_;
interval_seconds_up = seconds_up - last_stats_.seconds_up_; interval_seconds_up = seconds_up - last_stats_.seconds_up_;
snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n", snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n",
seconds_up, interval_seconds_up); seconds_up, interval_seconds_up);
value->append(buf); value->append(buf);
snprintf(buf, sizeof(buf),
"Writes cumulative: %llu total, %llu batches, "
"%.1f per batch, %.2f ingest GB\n",
(unsigned long long) (write_other + write_self),
(unsigned long long) write_self,
(write_other + write_self) / (double) (write_self + 1),
user_bytes_written / (1048576.0 * 1024));
value->append(buf);
snprintf(buf, sizeof(buf),
"WAL cumulative: %llu WAL writes, %llu WAL syncs, "
"%.2f writes per sync, %.2f GB written\n",
(unsigned long long) write_with_wal,
(unsigned long long ) wal_synced,
write_with_wal / (double) (wal_synced + 1),
wal_bytes / (1048576.0 * 1024));
value->append(buf);
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
"Compaction IO cumulative (GB): " "Compaction IO cumulative (GB): "
"%.2f new, %.2f read, %.2f write, %.2f read+write\n", "%.2f new, %.2f read, %.2f write, %.2f read+write\n",
stats_[0].bytes_written / (1048576.0 * 1024), user_bytes_written / (1048576.0 * 1024),
total_bytes_read / (1048576.0 * 1024), total_bytes_read / (1048576.0 * 1024),
total_bytes_written / (1048576.0 * 1024), total_bytes_written / (1048576.0 * 1024),
(total_bytes_read + total_bytes_written) / (1048576.0 * 1024)); (total_bytes_read + total_bytes_written) / (1048576.0 * 1024));
@ -3314,7 +3371,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
"Compaction IO cumulative (MB/sec): " "Compaction IO cumulative (MB/sec): "
"%.1f new, %.1f read, %.1f write, %.1f read+write\n", "%.1f new, %.1f read, %.1f write, %.1f read+write\n",
stats_[0].bytes_written / 1048576.0 / seconds_up, user_bytes_written / 1048576.0 / seconds_up,
total_bytes_read / 1048576.0 / seconds_up, total_bytes_read / 1048576.0 / seconds_up,
total_bytes_written / 1048576.0 / seconds_up, total_bytes_written / 1048576.0 / seconds_up,
(total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up); (total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up);
@ -3323,9 +3380,38 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
// +1 to avoid divide by 0 and NaN // +1 to avoid divide by 0 and NaN
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
"Amplification cumulative: %.1f write, %.1f compaction\n", "Amplification cumulative: %.1f write, %.1f compaction\n",
(double) total_bytes_written / (stats_[0].bytes_written+1), (double) (total_bytes_written + wal_bytes)
(double) (total_bytes_written + total_bytes_read) / (user_bytes_written + 1),
/ (stats_[0].bytes_written+1)); (double) (total_bytes_written + total_bytes_read + wal_bytes)
/ (user_bytes_written + 1));
value->append(buf);
uint64_t interval_write_other = write_other - last_stats_.write_other_;
uint64_t interval_write_self = write_self - last_stats_.write_self_;
snprintf(buf, sizeof(buf),
"Writes interval: %llu total, %llu batches, "
"%.1f per batch, %.1f ingest MB\n",
(unsigned long long) (interval_write_other + interval_write_self),
(unsigned long long) interval_write_self,
(double) (interval_write_other + interval_write_self)
/ (interval_write_self + 1),
(user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0);
value->append(buf);
uint64_t interval_write_with_wal =
write_with_wal - last_stats_.write_with_wal_;
uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_;
uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_;
snprintf(buf, sizeof(buf),
"WAL interval: %llu WAL writes, %llu WAL syncs, "
"%.2f writes per sync, %.2f MB written\n",
(unsigned long long) interval_write_with_wal,
(unsigned long long ) interval_wal_synced,
interval_write_with_wal / (double) (interval_wal_synced + 1),
interval_wal_bytes / (1048576.0 * 1024));
value->append(buf); value->append(buf);
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
@ -3350,9 +3436,10 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
// +1 to avoid divide by 0 and NaN // +1 to avoid divide by 0 and NaN
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
"Amplification interval: %.1f write, %.1f compaction\n", "Amplification interval: %.1f write, %.1f compaction\n",
(double) interval_bytes_written / (interval_bytes_new+1), (double) (interval_bytes_written + wal_bytes)
(double) (interval_bytes_written + interval_bytes_read) / / (interval_bytes_new + 1),
(interval_bytes_new+1)); (double) (interval_bytes_written + interval_bytes_read + wal_bytes)
/ (interval_bytes_new + 1));
value->append(buf); value->append(buf);
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
@ -3373,10 +3460,15 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
(unsigned long) total_slowdown_count); (unsigned long) total_slowdown_count);
value->append(buf); value->append(buf);
last_stats_.bytes_read_ = total_bytes_read; last_stats_.compaction_bytes_read_ = total_bytes_read;
last_stats_.bytes_written_ = total_bytes_written; last_stats_.compaction_bytes_written_ = total_bytes_written;
last_stats_.bytes_new_ = stats_[0].bytes_written; last_stats_.ingest_bytes_ = user_bytes_written;
last_stats_.seconds_up_ = seconds_up; last_stats_.seconds_up_ = seconds_up;
last_stats_.wal_bytes_ = wal_bytes;
last_stats_.wal_synced_ = wal_synced;
last_stats_.write_with_wal_ = write_with_wal;
last_stats_.write_other_ = write_other;
last_stats_.write_self_ = write_self;
return true; return true;
} else if (in == "sstables") { } else if (in == "sstables") {

@ -441,15 +441,25 @@ class DBImpl : public DB {
// Used to compute per-interval statistics // Used to compute per-interval statistics
struct StatsSnapshot { struct StatsSnapshot {
uint64_t bytes_read_; uint64_t compaction_bytes_read_; // Bytes read by compaction
uint64_t bytes_written_; uint64_t compaction_bytes_written_; // Bytes written by compaction
uint64_t bytes_new_; uint64_t ingest_bytes_; // Bytes written by user
uint64_t wal_bytes_; // Bytes written to WAL
uint64_t wal_synced_; // Number of times WAL is synced
uint64_t write_with_wal_; // Number of writes that request WAL
// These count the number of writes processed by the calling thread or
// another thread.
uint64_t write_other_;
uint64_t write_self_;
double seconds_up_; double seconds_up_;
StatsSnapshot() : bytes_read_(0), bytes_written_(0), StatsSnapshot() : compaction_bytes_read_(0), compaction_bytes_written_(0),
bytes_new_(0), seconds_up_(0) {} ingest_bytes_(0), wal_bytes_(0), wal_synced_(0),
write_with_wal_(0), write_other_(0), write_self_(0),
seconds_up_(0) {}
}; };
// Counters from the previous time per-interval stats were computed
StatsSnapshot last_stats_; StatsSnapshot last_stats_;
static const int KEEP_LOG_FILE_NUM = 1000; static const int KEEP_LOG_FILE_NUM = 1000;

@ -114,6 +114,19 @@ enum Tickers {
BLOCK_CACHE_COMPRESSED_MISS, // miss in the compressed block cache BLOCK_CACHE_COMPRESSED_MISS, // miss in the compressed block cache
BLOCK_CACHE_COMPRESSED_HIT, // hit in the compressed block cache BLOCK_CACHE_COMPRESSED_HIT, // hit in the compressed block cache
WAL_FILE_SYNCED, // Number of times WAL sync is done
WAL_FILE_BYTES, // Number of bytes written to WAL
// Writes can be processed by requesting thread or by the thread at the
// head of the writers queue.
WRITE_DONE_BY_SELF,
WRITE_DONE_BY_OTHER,
WRITE_WITH_WAL, // Number of Write calls that request WAL
COMPACT_READ_BYTES, // Bytes read during compaction
COMPACT_WRITE_BYTES, // Bytes written during compaction
TICKER_ENUM_MAX TICKER_ENUM_MAX
}; };
@ -159,7 +172,14 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{ NUMBER_OF_RESEEKS_IN_ITERATION, "rocksdb.number.reseeks.iteration" }, { NUMBER_OF_RESEEKS_IN_ITERATION, "rocksdb.number.reseeks.iteration" },
{ GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls" }, { GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls" },
{ BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss" }, { BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss" },
{ BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit" } { BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit" },
{ WAL_FILE_SYNCED, "rocksdb.wal.synced" },
{ WAL_FILE_BYTES, "rocksdb.wal.bytes" },
{ WRITE_DONE_BY_SELF, "rocksdb.write.self" },
{ WRITE_DONE_BY_OTHER, "rocksdb.write.other" },
{ WRITE_WITH_WAL, "rocksdb.write.wal" },
{ COMPACT_READ_BYTES, "rocksdb.compact.read.bytes" },
{ COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes" },
}; };
/** /**

Loading…
Cancel
Save