diff --git a/db/db_impl.cc b/db/db_impl.cc index b7c01b0f3..e3b95ad5f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -262,7 +262,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) mutex_(options.use_adaptive_mutex), shutting_down_(nullptr), bg_cv_(&mutex_), - mem_rep_factory_(options_.memtable_factory.get()), mem_(new MemTable(internal_comparator_, options_)), imm_(options_.min_write_buffer_number_to_merge), logfile_number_(0), @@ -280,15 +279,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) purge_wal_files_last_run_(0), last_stats_dump_time_microsec_(0), default_interval_to_delete_obsolete_WAL_(600), - stall_level0_slowdown_(0), - stall_memtable_compaction_(0), - stall_level0_num_files_(0), - stall_level0_slowdown_count_(0), - stall_memtable_compaction_count_(0), - stall_level0_num_files_count_(0), - started_at_(options.env->NowMicros()), flush_on_destroy_(false), - stats_(options.num_levels), + internal_stats_(options.num_levels, options.env, + options.statistics.get()), delayed_writes_(0), storage_options_(options), bg_work_gate_closed_(false), @@ -298,13 +291,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) env_->GetAbsolutePath(dbname, &db_absolute_path_); - stall_leveln_slowdown_.resize(options.num_levels); - stall_leveln_slowdown_count_.resize(options.num_levels); - for (int i = 0; i < options.num_levels; ++i) { - stall_leveln_slowdown_[i] = 0; - stall_leveln_slowdown_count_[i] = 0; - } - // Reserve ten files or so for other uses and give the rest to TableCache. const int table_cache_size = options_.max_open_files - 10; table_cache_.reset(new TableCache(dbname_, &options_, @@ -1128,11 +1114,11 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { meta.smallest_seqno, meta.largest_seqno); } - CompactionStats stats; + InternalStats::CompactionStats stats; stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.file_size; stats.files_out_levelnp1 = 1; - stats_[level].Add(stats); + internal_stats_.AddCompactionStats(level, stats); RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size); return s; } @@ -1217,10 +1203,10 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, meta.smallest_seqno, meta.largest_seqno); } - CompactionStats stats; + InternalStats::CompactionStats stats; stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.file_size; - stats_[level].Add(stats); + internal_stats_.AddCompactionStats(level, stats); RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size); return s; } @@ -2610,7 +2596,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, if (!options_.disableDataSync) { db_directory_->Fsync(); } - CompactionStats stats; + + InternalStats::CompactionStats stats; stats.micros = env_->NowMicros() - start_micros - imm_micros; MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros); stats.files_in_leveln = compact->compaction->num_input_files(0); @@ -2644,7 +2631,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, LogFlush(options_.info_log); mutex_.Lock(); - stats_[compact->compaction->output_level()].Add(stats); + internal_stats_.AddCompactionStats(compact->compaction->output_level(), + stats); // if there were any unused file number (mostly in case of // compaction error), free up the entry from pending_putputs @@ -3327,8 +3315,7 @@ Status DBImpl::MakeRoomForWrite(bool force, delayed = sw.ElapsedMicros(); } RecordTick(options_.statistics.get(), STALL_L0_SLOWDOWN_MICROS, delayed); - stall_level0_slowdown_ += delayed; - stall_level0_slowdown_count_++; + internal_stats_.RecordWriteStall(InternalStats::LEVEL0_SLOWDOWN, delayed); allow_delay = false; // Do not delay a single write more than once mutex_.Lock(); delayed_writes_++; @@ -3353,8 +3340,8 @@ Status DBImpl::MakeRoomForWrite(bool force, } RecordTick(options_.statistics.get(), STALL_MEMTABLE_COMPACTION_MICROS, stall); - stall_memtable_compaction_ += stall; - stall_memtable_compaction_count_++; + internal_stats_.RecordWriteStall(InternalStats::MEMTABLE_COMPACTION, + stall); } else if (versions_->current()->NumLevelFiles(0) >= options_.level0_stop_writes_trigger) { // There are too many level-0 files. @@ -3368,8 +3355,7 @@ Status DBImpl::MakeRoomForWrite(bool force, stall = sw.ElapsedMicros(); } RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall); - stall_level0_num_files_ += stall; - stall_level0_num_files_count_++; + internal_stats_.RecordWriteStall(InternalStats::LEVEL0_NUM_FILES, stall); } else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 && (score = versions_->current()->MaxCompactionScore()) > options_.hard_rate_limit) { @@ -3383,8 +3369,7 @@ Status DBImpl::MakeRoomForWrite(bool force, env_->SleepForMicroseconds(1000); delayed = sw.ElapsedMicros(); } - stall_leveln_slowdown_[max_level] += delayed; - stall_leveln_slowdown_count_[max_level]++; + internal_stats_.RecordLevelNSlowdown(max_level, delayed); // Make sure the following value doesn't round to zero. uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1); rate_limit_delay_millis += rate_limit; @@ -3481,297 +3466,9 @@ const Options& DBImpl::GetOptions() const { bool DBImpl::GetProperty(const Slice& property, std::string* value) { value->clear(); - MutexLock l(&mutex_); - Version* current = versions_->current(); - Slice in = property; - Slice prefix("rocksdb."); - if (!in.starts_with(prefix)) return false; - in.remove_prefix(prefix.size()); - - if (in.starts_with("num-files-at-level")) { - in.remove_prefix(strlen("num-files-at-level")); - uint64_t level; - bool ok = ConsumeDecimalNumber(&in, &level) && in.empty(); - if (!ok || (int)level >= NumberLevels()) { - return false; - } else { - char buf[100]; - snprintf(buf, sizeof(buf), "%d", - current->NumLevelFiles(static_cast(level))); - *value = buf; - return true; - } - } else if (in == "levelstats") { - char buf[1000]; - snprintf(buf, sizeof(buf), - "Level Files Size(MB)\n" - "--------------------\n"); - value->append(buf); - - for (int level = 0; level < NumberLevels(); level++) { - snprintf(buf, sizeof(buf), - "%3d %8d %8.0f\n", - level, - current->NumLevelFiles(level), - current->NumLevelBytes(level) / 1048576.0); - value->append(buf); - } - return true; - - } else if (in == "stats") { - char buf[1000]; - - uint64_t wal_bytes = 0; - uint64_t wal_synced = 0; - uint64_t user_bytes_written = 0; - uint64_t write_other = 0; - uint64_t write_self = 0; - uint64_t write_with_wal = 0; - uint64_t total_bytes_written = 0; - uint64_t total_bytes_read = 0; - uint64_t micros_up = env_->NowMicros() - started_at_; - // Add "+1" to make sure seconds_up is > 0 and avoid NaN later - double seconds_up = (micros_up + 1) / 1000000.0; - uint64_t total_slowdown = 0; - uint64_t total_slowdown_count = 0; - uint64_t interval_bytes_written = 0; - uint64_t interval_bytes_read = 0; - uint64_t interval_bytes_new = 0; - double interval_seconds_up = 0; - - Statistics* s = options_.statistics.get(); - if (s) { - wal_bytes = s->getTickerCount(WAL_FILE_BYTES); - wal_synced = s->getTickerCount(WAL_FILE_SYNCED); - user_bytes_written = s->getTickerCount(BYTES_WRITTEN); - write_other = s->getTickerCount(WRITE_DONE_BY_OTHER); - write_self = s->getTickerCount(WRITE_DONE_BY_SELF); - write_with_wal = s->getTickerCount(WRITE_WITH_WAL); - } - - // Pardon the long line but I think it is easier to read this way. - snprintf(buf, sizeof(buf), - " Compactions\n" - "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n" - "------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" - ); - value->append(buf); - for (int level = 0; level < current->NumberLevels(); level++) { - int files = current->NumLevelFiles(level); - if (stats_[level].micros > 0 || files > 0) { - int64_t bytes_read = stats_[level].bytes_readn + - stats_[level].bytes_readnp1; - int64_t bytes_new = stats_[level].bytes_written - - stats_[level].bytes_readnp1; - double amplify = (stats_[level].bytes_readn == 0) - ? 0.0 - : (stats_[level].bytes_written + - stats_[level].bytes_readnp1 + - stats_[level].bytes_readn) / - (double) stats_[level].bytes_readn; - - total_bytes_read += bytes_read; - total_bytes_written += stats_[level].bytes_written; - - uint64_t stalls = level == 0 ? - (stall_level0_slowdown_count_ + - stall_level0_num_files_count_ + - stall_memtable_compaction_count_) : - stall_leveln_slowdown_count_[level]; - - double stall_us = level == 0 ? - (stall_level0_slowdown_ + - stall_level0_num_files_ + - stall_memtable_compaction_) : - stall_leveln_slowdown_[level]; - - snprintf( - buf, sizeof(buf), - "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f %9lu\n", - level, - files, - current->NumLevelBytes(level) / 1048576.0, - current->NumLevelBytes(level) / - versions_->MaxBytesForLevel(level), - stats_[level].micros / 1e6, - bytes_read / 1048576.0, - stats_[level].bytes_written / 1048576.0, - stats_[level].bytes_readn / 1048576.0, - stats_[level].bytes_readnp1 / 1048576.0, - bytes_new / 1048576.0, - amplify, - // +1 to avoid division by 0 - (bytes_read / 1048576.0) / ((stats_[level].micros+1) / 1000000.0), - (stats_[level].bytes_written / 1048576.0) / - ((stats_[level].micros+1) / 1000000.0), - stats_[level].files_in_leveln, - stats_[level].files_in_levelnp1, - stats_[level].files_out_levelnp1, - stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1, - stats_[level].count, - (int) ((double) stats_[level].micros / - 1000.0 / - (stats_[level].count + 1)), - (double) stall_us / 1000.0 / (stalls + 1), - stall_us / 1000000.0, - (unsigned long) stalls); - - total_slowdown += stall_leveln_slowdown_[level]; - total_slowdown_count += stall_leveln_slowdown_count_[level]; - value->append(buf); - } - } - - interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_; - interval_bytes_read = total_bytes_read - last_stats_.compaction_bytes_read_; - interval_bytes_written = - total_bytes_written - last_stats_.compaction_bytes_written_; - interval_seconds_up = seconds_up - last_stats_.seconds_up_; - - snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n", - seconds_up, interval_seconds_up); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Writes cumulative: %llu total, %llu batches, " - "%.1f per batch, %.2f ingest GB\n", - (unsigned long long) (write_other + write_self), - (unsigned long long) write_self, - (write_other + write_self) / (double) (write_self + 1), - user_bytes_written / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "WAL cumulative: %llu WAL writes, %llu WAL syncs, " - "%.2f writes per sync, %.2f GB written\n", - (unsigned long long) write_with_wal, - (unsigned long long ) wal_synced, - write_with_wal / (double) (wal_synced + 1), - wal_bytes / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO cumulative (GB): " - "%.2f new, %.2f read, %.2f write, %.2f read+write\n", - user_bytes_written / (1048576.0 * 1024), - total_bytes_read / (1048576.0 * 1024), - total_bytes_written / (1048576.0 * 1024), - (total_bytes_read + total_bytes_written) / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO cumulative (MB/sec): " - "%.1f new, %.1f read, %.1f write, %.1f read+write\n", - user_bytes_written / 1048576.0 / seconds_up, - total_bytes_read / 1048576.0 / seconds_up, - total_bytes_written / 1048576.0 / seconds_up, - (total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up); - value->append(buf); - - // +1 to avoid divide by 0 and NaN - snprintf(buf, sizeof(buf), - "Amplification cumulative: %.1f write, %.1f compaction\n", - (double) (total_bytes_written + wal_bytes) - / (user_bytes_written + 1), - (double) (total_bytes_written + total_bytes_read + wal_bytes) - / (user_bytes_written + 1)); - value->append(buf); - - uint64_t interval_write_other = write_other - last_stats_.write_other_; - uint64_t interval_write_self = write_self - last_stats_.write_self_; - - snprintf(buf, sizeof(buf), - "Writes interval: %llu total, %llu batches, " - "%.1f per batch, %.1f ingest MB\n", - (unsigned long long) (interval_write_other + interval_write_self), - (unsigned long long) interval_write_self, - (double) (interval_write_other + interval_write_self) - / (interval_write_self + 1), - (user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0); - value->append(buf); - - uint64_t interval_write_with_wal = - write_with_wal - last_stats_.write_with_wal_; - - uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_; - uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_; - - snprintf(buf, sizeof(buf), - "WAL interval: %llu WAL writes, %llu WAL syncs, " - "%.2f writes per sync, %.2f MB written\n", - (unsigned long long) interval_write_with_wal, - (unsigned long long ) interval_wal_synced, - interval_write_with_wal / (double) (interval_wal_synced + 1), - interval_wal_bytes / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO interval (MB): " - "%.2f new, %.2f read, %.2f write, %.2f read+write\n", - interval_bytes_new / 1048576.0, - interval_bytes_read/ 1048576.0, - interval_bytes_written / 1048576.0, - (interval_bytes_read + interval_bytes_written) / 1048576.0); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO interval (MB/sec): " - "%.1f new, %.1f read, %.1f write, %.1f read+write\n", - interval_bytes_new / 1048576.0 / interval_seconds_up, - interval_bytes_read / 1048576.0 / interval_seconds_up, - interval_bytes_written / 1048576.0 / interval_seconds_up, - (interval_bytes_read + interval_bytes_written) - / 1048576.0 / interval_seconds_up); - value->append(buf); - - // +1 to avoid divide by 0 and NaN - snprintf(buf, sizeof(buf), - "Amplification interval: %.1f write, %.1f compaction\n", - (double) (interval_bytes_written + wal_bytes) - / (interval_bytes_new + 1), - (double) (interval_bytes_written + interval_bytes_read + wal_bytes) - / (interval_bytes_new + 1)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, " - "%.3f memtable_compaction, %.3f leveln_slowdown\n", - stall_level0_slowdown_ / 1000000.0, - stall_level0_num_files_ / 1000000.0, - stall_memtable_compaction_ / 1000000.0, - total_slowdown / 1000000.0); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Stalls(count): %lu level0_slowdown, %lu level0_numfiles, " - "%lu memtable_compaction, %lu leveln_slowdown\n", - (unsigned long) stall_level0_slowdown_count_, - (unsigned long) stall_level0_num_files_count_, - (unsigned long) stall_memtable_compaction_count_, - (unsigned long) total_slowdown_count); - value->append(buf); - - last_stats_.compaction_bytes_read_ = total_bytes_read; - last_stats_.compaction_bytes_written_ = total_bytes_written; - last_stats_.ingest_bytes_ = user_bytes_written; - last_stats_.seconds_up_ = seconds_up; - last_stats_.wal_bytes_ = wal_bytes; - last_stats_.wal_synced_ = wal_synced; - last_stats_.write_with_wal_ = write_with_wal; - last_stats_.write_other_ = write_other; - last_stats_.write_self_ = write_self; - - return true; - } else if (in == "sstables") { - *value = versions_->current()->DebugString(); - return true; - } else if (in == "num-immutable-mem-table") { - *value = std::to_string(imm_.size()); - return true; - } - - return false; + return internal_stats_.GetProperty(property, value, versions_.get(), + imm_.size()); } void DBImpl::GetApproximateSizes( diff --git a/db/db_impl.h b/db/db_impl.h index fb17a41b7..545b367aa 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -24,6 +24,7 @@ #include "util/stats_logger.h" #include "memtablelist.h" #include "util/autovector.h" +#include "db/internal_stats.h" namespace rocksdb { @@ -386,7 +387,6 @@ class DBImpl : public DB { port::Mutex mutex_; port::AtomicPointer shutting_down_; port::CondVar bg_cv_; // Signalled when background work finishes - MemTableRepFactory* mem_rep_factory_; MemTable* mem_; MemTableList imm_; // Memtable that are not changing uint64_t logfile_number_; @@ -468,88 +468,9 @@ class DBImpl : public DB { // enabled and archive size_limit is disabled. uint64_t default_interval_to_delete_obsolete_WAL_; - // These count the number of microseconds for which MakeRoomForWrite stalls. - uint64_t stall_level0_slowdown_; - uint64_t stall_memtable_compaction_; - uint64_t stall_level0_num_files_; - std::vector stall_leveln_slowdown_; - uint64_t stall_level0_slowdown_count_; - uint64_t stall_memtable_compaction_count_; - uint64_t stall_level0_num_files_count_; - std::vector stall_leveln_slowdown_count_; - - // Time at which this instance was started. - const uint64_t started_at_; - bool flush_on_destroy_; // Used when disableWAL is true. - // Per level compaction stats. stats_[level] stores the stats for - // compactions that produced data for the specified "level". - struct CompactionStats { - uint64_t micros; - - // Bytes read from level N during compaction between levels N and N+1 - int64_t bytes_readn; - - // Bytes read from level N+1 during compaction between levels N and N+1 - int64_t bytes_readnp1; - - // Total bytes written during compaction between levels N and N+1 - int64_t bytes_written; - - // Files read from level N during compaction between levels N and N+1 - int files_in_leveln; - - // Files read from level N+1 during compaction between levels N and N+1 - int files_in_levelnp1; - - // Files written during compaction between levels N and N+1 - int files_out_levelnp1; - - // Number of compactions done - int count; - - CompactionStats() : micros(0), bytes_readn(0), bytes_readnp1(0), - bytes_written(0), files_in_leveln(0), - files_in_levelnp1(0), files_out_levelnp1(0), - count(0) { } - - void Add(const CompactionStats& c) { - this->micros += c.micros; - this->bytes_readn += c.bytes_readn; - this->bytes_readnp1 += c.bytes_readnp1; - this->bytes_written += c.bytes_written; - this->files_in_leveln += c.files_in_leveln; - this->files_in_levelnp1 += c.files_in_levelnp1; - this->files_out_levelnp1 += c.files_out_levelnp1; - this->count += 1; - } - }; - - std::vector stats_; - - // Used to compute per-interval statistics - struct StatsSnapshot { - uint64_t compaction_bytes_read_; // Bytes read by compaction - uint64_t compaction_bytes_written_; // Bytes written by compaction - uint64_t ingest_bytes_; // Bytes written by user - uint64_t wal_bytes_; // Bytes written to WAL - uint64_t wal_synced_; // Number of times WAL is synced - uint64_t write_with_wal_; // Number of writes that request WAL - // These count the number of writes processed by the calling thread or - // another thread. - uint64_t write_other_; - uint64_t write_self_; - double seconds_up_; - - StatsSnapshot() : compaction_bytes_read_(0), compaction_bytes_written_(0), - ingest_bytes_(0), wal_bytes_(0), wal_synced_(0), - write_with_wal_(0), write_other_(0), write_self_(0), - seconds_up_(0) {} - }; - - // Counters from the previous time per-interval stats were computed - StatsSnapshot last_stats_; + InternalStats internal_stats_; static const int KEEP_LOG_FILE_NUM = 1000; std::string db_absolute_path_; diff --git a/db/internal_stats.cc b/db/internal_stats.cc new file mode 100644 index 000000000..7dcfd86c3 --- /dev/null +++ b/db/internal_stats.cc @@ -0,0 +1,298 @@ + +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/internal_stats.h" + +#include + +namespace rocksdb { + +bool InternalStats::GetProperty(const Slice& property, std::string* value, + VersionSet* version_set, int immsize) { + Version* current = version_set->current(); + Slice in = property; + Slice prefix("rocksdb."); + if (!in.starts_with(prefix)) return false; + in.remove_prefix(prefix.size()); + + if (in.starts_with("num-files-at-level")) { + in.remove_prefix(strlen("num-files-at-level")); + uint64_t level; + bool ok = ConsumeDecimalNumber(&in, &level) && in.empty(); + if (!ok || (int)level >= number_levels_) { + return false; + } else { + char buf[100]; + snprintf(buf, sizeof(buf), "%d", + current->NumLevelFiles(static_cast(level))); + *value = buf; + return true; + } + } else if (in == "levelstats") { + char buf[1000]; + snprintf(buf, sizeof(buf), + "Level Files Size(MB)\n" + "--------------------\n"); + value->append(buf); + + for (int level = 0; level < number_levels_; level++) { + snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level, + current->NumLevelFiles(level), + current->NumLevelBytes(level) / 1048576.0); + value->append(buf); + } + return true; + + } else if (in == "stats") { + char buf[1000]; + + uint64_t wal_bytes = 0; + uint64_t wal_synced = 0; + uint64_t user_bytes_written = 0; + uint64_t write_other = 0; + uint64_t write_self = 0; + uint64_t write_with_wal = 0; + uint64_t total_bytes_written = 0; + uint64_t total_bytes_read = 0; + uint64_t micros_up = env_->NowMicros() - started_at_; + // Add "+1" to make sure seconds_up is > 0 and avoid NaN later + double seconds_up = (micros_up + 1) / 1000000.0; + uint64_t total_slowdown = 0; + uint64_t total_slowdown_count = 0; + uint64_t interval_bytes_written = 0; + uint64_t interval_bytes_read = 0; + uint64_t interval_bytes_new = 0; + double interval_seconds_up = 0; + + if (statistics_) { + wal_bytes = statistics_->getTickerCount(WAL_FILE_BYTES); + wal_synced = statistics_->getTickerCount(WAL_FILE_SYNCED); + user_bytes_written = statistics_->getTickerCount(BYTES_WRITTEN); + write_other = statistics_->getTickerCount(WRITE_DONE_BY_OTHER); + write_self = statistics_->getTickerCount(WRITE_DONE_BY_SELF); + write_with_wal = statistics_->getTickerCount(WRITE_WITH_WAL); + } + + // Pardon the long line but I think it is easier to read this way. + snprintf(buf, sizeof(buf), + " Compactions\n" + "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n" + "------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" + ); + value->append(buf); + for (int level = 0; level < number_levels_; level++) { + int files = current->NumLevelFiles(level); + if (compaction_stats_[level].micros > 0 || files > 0) { + int64_t bytes_read = compaction_stats_[level].bytes_readn + + compaction_stats_[level].bytes_readnp1; + int64_t bytes_new = compaction_stats_[level].bytes_written - + compaction_stats_[level].bytes_readnp1; + double amplify = (compaction_stats_[level].bytes_readn == 0) + ? 0.0 + : (compaction_stats_[level].bytes_written + + compaction_stats_[level].bytes_readnp1 + + compaction_stats_[level].bytes_readn) / + (double)compaction_stats_[level].bytes_readn; + + total_bytes_read += bytes_read; + total_bytes_written += compaction_stats_[level].bytes_written; + + uint64_t stalls = level == 0 ? (stall_counts_[LEVEL0_SLOWDOWN] + + stall_counts_[LEVEL0_NUM_FILES] + + stall_counts_[MEMTABLE_COMPACTION]) + : stall_leveln_slowdown_count_[level]; + + double stall_us = level == 0 ? (stall_micros_[LEVEL0_SLOWDOWN] + + stall_micros_[LEVEL0_NUM_FILES] + + stall_micros_[MEMTABLE_COMPACTION]) + : stall_leveln_slowdown_[level]; + + snprintf(buf, sizeof(buf), + "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f " + "%10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f " + "%9lu\n", + level, files, current->NumLevelBytes(level) / 1048576.0, + current->NumLevelBytes(level) / + version_set->MaxBytesForLevel(level), + compaction_stats_[level].micros / 1e6, bytes_read / 1048576.0, + compaction_stats_[level].bytes_written / 1048576.0, + compaction_stats_[level].bytes_readn / 1048576.0, + compaction_stats_[level].bytes_readnp1 / 1048576.0, + bytes_new / 1048576.0, amplify, + // +1 to avoid division by 0 + (bytes_read / 1048576.0) / + ((compaction_stats_[level].micros + 1) / 1000000.0), + (compaction_stats_[level].bytes_written / 1048576.0) / + ((compaction_stats_[level].micros + 1) / 1000000.0), + compaction_stats_[level].files_in_leveln, + compaction_stats_[level].files_in_levelnp1, + compaction_stats_[level].files_out_levelnp1, + compaction_stats_[level].files_out_levelnp1 - + compaction_stats_[level].files_in_levelnp1, + compaction_stats_[level].count, + (int)((double)compaction_stats_[level].micros / 1000.0 / + (compaction_stats_[level].count + 1)), + (double)stall_us / 1000.0 / (stalls + 1), stall_us / 1000000.0, + (unsigned long)stalls); + total_slowdown += stall_leveln_slowdown_[level]; + total_slowdown_count += stall_leveln_slowdown_count_[level]; + value->append(buf); + } + } + + interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_; + interval_bytes_read = total_bytes_read - last_stats_.compaction_bytes_read_; + interval_bytes_written = + total_bytes_written - last_stats_.compaction_bytes_written_; + interval_seconds_up = seconds_up - last_stats_.seconds_up_; + + snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n", + seconds_up, interval_seconds_up); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Writes cumulative: %llu total, %llu batches, " + "%.1f per batch, %.2f ingest GB\n", + (unsigned long long)(write_other + write_self), + (unsigned long long)write_self, + (write_other + write_self) / (double)(write_self + 1), + user_bytes_written / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "WAL cumulative: %llu WAL writes, %llu WAL syncs, " + "%.2f writes per sync, %.2f GB written\n", + (unsigned long long)write_with_wal, (unsigned long long)wal_synced, + write_with_wal / (double)(wal_synced + 1), + wal_bytes / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Compaction IO cumulative (GB): " + "%.2f new, %.2f read, %.2f write, %.2f read+write\n", + user_bytes_written / (1048576.0 * 1024), + total_bytes_read / (1048576.0 * 1024), + total_bytes_written / (1048576.0 * 1024), + (total_bytes_read + total_bytes_written) / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Compaction IO cumulative (MB/sec): " + "%.1f new, %.1f read, %.1f write, %.1f read+write\n", + user_bytes_written / 1048576.0 / seconds_up, + total_bytes_read / 1048576.0 / seconds_up, + total_bytes_written / 1048576.0 / seconds_up, + (total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up); + value->append(buf); + + // +1 to avoid divide by 0 and NaN + snprintf( + buf, sizeof(buf), + "Amplification cumulative: %.1f write, %.1f compaction\n", + (double)(total_bytes_written + wal_bytes) / (user_bytes_written + 1), + (double)(total_bytes_written + total_bytes_read + wal_bytes) / + (user_bytes_written + 1)); + value->append(buf); + + uint64_t interval_write_other = write_other - last_stats_.write_other_; + uint64_t interval_write_self = write_self - last_stats_.write_self_; + + snprintf(buf, sizeof(buf), + "Writes interval: %llu total, %llu batches, " + "%.1f per batch, %.1f ingest MB\n", + (unsigned long long)(interval_write_other + interval_write_self), + (unsigned long long)interval_write_self, + (double)(interval_write_other + interval_write_self) / + (interval_write_self + 1), + (user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0); + value->append(buf); + + uint64_t interval_write_with_wal = + write_with_wal - last_stats_.write_with_wal_; + + uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_; + uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_; + + snprintf(buf, sizeof(buf), + "WAL interval: %llu WAL writes, %llu WAL syncs, " + "%.2f writes per sync, %.2f MB written\n", + (unsigned long long)interval_write_with_wal, + (unsigned long long)interval_wal_synced, + interval_write_with_wal / (double)(interval_wal_synced + 1), + interval_wal_bytes / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Compaction IO interval (MB): " + "%.2f new, %.2f read, %.2f write, %.2f read+write\n", + interval_bytes_new / 1048576.0, interval_bytes_read / 1048576.0, + interval_bytes_written / 1048576.0, + (interval_bytes_read + interval_bytes_written) / 1048576.0); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Compaction IO interval (MB/sec): " + "%.1f new, %.1f read, %.1f write, %.1f read+write\n", + interval_bytes_new / 1048576.0 / interval_seconds_up, + interval_bytes_read / 1048576.0 / interval_seconds_up, + interval_bytes_written / 1048576.0 / interval_seconds_up, + (interval_bytes_read + interval_bytes_written) / 1048576.0 / + interval_seconds_up); + value->append(buf); + + // +1 to avoid divide by 0 and NaN + snprintf( + buf, sizeof(buf), + "Amplification interval: %.1f write, %.1f compaction\n", + (double)(interval_bytes_written + wal_bytes) / (interval_bytes_new + 1), + (double)(interval_bytes_written + interval_bytes_read + wal_bytes) / + (interval_bytes_new + 1)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, " + "%.3f memtable_compaction, %.3f leveln_slowdown\n", + stall_micros_[LEVEL0_SLOWDOWN] / 1000000.0, + stall_micros_[LEVEL0_NUM_FILES] / 1000000.0, + stall_micros_[MEMTABLE_COMPACTION] / 1000000.0, + total_slowdown / 1000000.0); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Stalls(count): %lu level0_slowdown, %lu level0_numfiles, " + "%lu memtable_compaction, %lu leveln_slowdown\n", + (unsigned long)stall_counts_[LEVEL0_SLOWDOWN], + (unsigned long)stall_counts_[LEVEL0_NUM_FILES], + (unsigned long)stall_counts_[MEMTABLE_COMPACTION], + (unsigned long)total_slowdown_count); + value->append(buf); + + last_stats_.compaction_bytes_read_ = total_bytes_read; + last_stats_.compaction_bytes_written_ = total_bytes_written; + last_stats_.ingest_bytes_ = user_bytes_written; + last_stats_.seconds_up_ = seconds_up; + last_stats_.wal_bytes_ = wal_bytes; + last_stats_.wal_synced_ = wal_synced; + last_stats_.write_with_wal_ = write_with_wal; + last_stats_.write_other_ = write_other; + last_stats_.write_self_ = write_self; + + return true; + } else if (in == "sstables") { + *value = current->DebugString(); + return true; + } else if (in == "num-immutable-mem-table") { + *value = std::to_string(immsize); + return true; + } + + return false; +} + +} // namespace rocksdb diff --git a/db/internal_stats.h b/db/internal_stats.h new file mode 100644 index 000000000..8e99a9d7e --- /dev/null +++ b/db/internal_stats.h @@ -0,0 +1,149 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// + +#pragma once +#include "rocksdb/statistics.h" +#include "util/statistics.h" +#include "db/version_set.h" + +#include +#include + +namespace rocksdb { +class InternalStats { + public: + enum WriteStallType { + LEVEL0_SLOWDOWN, + MEMTABLE_COMPACTION, + LEVEL0_NUM_FILES, + WRITE_STALLS_ENUM_MAX, + }; + + InternalStats(int num_levels, Env* env, Statistics* statistics) + : compaction_stats_(num_levels), + stall_micros_(WRITE_STALLS_ENUM_MAX, 0), + stall_counts_(WRITE_STALLS_ENUM_MAX, 0), + stall_leveln_slowdown_(num_levels, 0), + stall_leveln_slowdown_count_(num_levels, 0), + number_levels_(num_levels), + statistics_(statistics), + env_(env), + started_at_(env->NowMicros()) {} + + // Per level compaction stats. compaction_stats_[level] stores the stats for + // compactions that produced data for the specified "level". + struct CompactionStats { + uint64_t micros; + + // Bytes read from level N during compaction between levels N and N+1 + int64_t bytes_readn; + + // Bytes read from level N+1 during compaction between levels N and N+1 + int64_t bytes_readnp1; + + // Total bytes written during compaction between levels N and N+1 + int64_t bytes_written; + + // Files read from level N during compaction between levels N and N+1 + int files_in_leveln; + + // Files read from level N+1 during compaction between levels N and N+1 + int files_in_levelnp1; + + // Files written during compaction between levels N and N+1 + int files_out_levelnp1; + + // Number of compactions done + int count; + + CompactionStats() + : micros(0), + bytes_readn(0), + bytes_readnp1(0), + bytes_written(0), + files_in_leveln(0), + files_in_levelnp1(0), + files_out_levelnp1(0), + count(0) {} + + void Add(const CompactionStats& c) { + this->micros += c.micros; + this->bytes_readn += c.bytes_readn; + this->bytes_readnp1 += c.bytes_readnp1; + this->bytes_written += c.bytes_written; + this->files_in_leveln += c.files_in_leveln; + this->files_in_levelnp1 += c.files_in_levelnp1; + this->files_out_levelnp1 += c.files_out_levelnp1; + this->count += 1; + } + }; + + void AddCompactionStats(int level, const CompactionStats& stats) { + compaction_stats_[level].Add(stats); + } + + void RecordWriteStall(WriteStallType write_stall_type, uint64_t micros) { + stall_micros_[write_stall_type] += micros; + stall_counts_[write_stall_type]++; + } + + void RecordLevelNSlowdown(int level, uint64_t micros) { + stall_leveln_slowdown_[level] += micros; + stall_leveln_slowdown_count_[level] += micros; + } + + bool GetProperty(const Slice& property, std::string* value, + VersionSet* version_set, int immsize); + + private: + std::vector compaction_stats_; + + // Used to compute per-interval statistics + struct StatsSnapshot { + uint64_t compaction_bytes_read_; // Bytes read by compaction + uint64_t compaction_bytes_written_; // Bytes written by compaction + uint64_t ingest_bytes_; // Bytes written by user + uint64_t wal_bytes_; // Bytes written to WAL + uint64_t wal_synced_; // Number of times WAL is synced + uint64_t write_with_wal_; // Number of writes that request WAL + // These count the number of writes processed by the calling thread or + // another thread. + uint64_t write_other_; + uint64_t write_self_; + double seconds_up_; + + StatsSnapshot() + : compaction_bytes_read_(0), + compaction_bytes_written_(0), + ingest_bytes_(0), + wal_bytes_(0), + wal_synced_(0), + write_with_wal_(0), + write_other_(0), + write_self_(0), + seconds_up_(0) {} + }; + + // Counters from the previous time per-interval stats were computed + StatsSnapshot last_stats_; + + // These count the number of microseconds for which MakeRoomForWrite stalls. + std::vector stall_micros_; + std::vector stall_counts_; + std::vector stall_leveln_slowdown_; + std::vector stall_leveln_slowdown_count_; + + int number_levels_; + Statistics* statistics_; + Env* env_; + uint64_t started_at_; +}; + +} // namespace rocksdb