From 1ec72b37b1e8b001c61e71f532ee6a10213c5e0c Mon Sep 17 00:00:00 2001 From: Kai Liu Date: Tue, 18 Mar 2014 12:37:42 -0700 Subject: [PATCH] Several easy-to-add properties related to compaction and flushes Summary: To partly address the request @nkg- raised, add three easy-to-add properties to compactions and flushes. Test Plan: run unit tests and add a new unit test to cover new properties. Reviewers: haobo, dhruba Reviewed By: dhruba CC: nkg-, leveldb Differential Revision: https://reviews.facebook.net/D13677 --- db/db_impl.cc | 5 +- db/db_test.cc | 88 +++++++ db/internal_stats.cc | 583 +++++++++++++++++++++++-------------------- db/internal_stats.h | 22 +- db/memtable_list.cc | 2 +- db/memtable_list.h | 2 +- 6 files changed, 429 insertions(+), 273 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 4c161be1e..7b77cbaaa 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3696,9 +3696,10 @@ const Options& DBImpl::GetOptions() const { bool DBImpl::GetProperty(const Slice& property, std::string* value) { value->clear(); + DBPropertyType property_type = GetPropertyType(property); MutexLock l(&mutex_); - return internal_stats_.GetProperty(property, value, versions_.get(), - imm_.size()); + return internal_stats_.GetProperty(property_type, property, value, + versions_.get(), imm_); } void DBImpl::GetApproximateSizes( diff --git a/db/db_test.cc b/db/db_test.cc index 19b95540b..8e99bcec9 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2055,6 +2055,94 @@ TEST(DBTest, NumImmutableMemTable) { } while (ChangeCompactOptions()); } +class SleepingBackgroundTask { + public: + explicit SleepingBackgroundTask(Env* env) + : env_(env), bg_cv_(&mutex_), should_sleep_(true) {} + void DoSleep() { + MutexLock l(&mutex_); + while (should_sleep_) { + bg_cv_.Wait(); + } + } + void WakeUp() { + MutexLock l(&mutex_); + should_sleep_ = false; + bg_cv_.SignalAll(); + } + + static void DoSleepTask(void* arg) { + reinterpret_cast(arg)->DoSleep(); + } + + private: + const Env* env_; + 
port::Mutex mutex_; + port::CondVar bg_cv_; // Signalled when background work finishes + bool should_sleep_; +}; + +TEST(DBTest, GetProperty) { + // Set sizes to both background thread pool to be 1 and block them. + env_->SetBackgroundThreads(1, Env::HIGH); + env_->SetBackgroundThreads(1, Env::LOW); + SleepingBackgroundTask sleeping_task_low(env_); + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + Env::Priority::LOW); + SleepingBackgroundTask sleeping_task_high(env_); + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, + Env::Priority::HIGH); + + Options options = CurrentOptions(); + WriteOptions writeOpt = WriteOptions(); + writeOpt.disableWAL = true; + options.compaction_style = kCompactionStyleUniversal; + options.level0_file_num_compaction_trigger = 1; + options.compaction_options_universal.size_ratio = 50; + options.max_background_compactions = 1; + options.max_background_flushes = 1; + options.max_write_buffer_number = 10; + options.min_write_buffer_number_to_merge = 1; + options.write_buffer_size = 1000000; + Reopen(&options); + + std::string big_value(1000000 * 2, 'x'); + std::string num; + SetPerfLevel(kEnableTime); + + ASSERT_OK(dbfull()->Put(writeOpt, "k1", big_value)); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num)); + ASSERT_EQ(num, "0"); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.mem-table-flush-pending", &num)); + ASSERT_EQ(num, "0"); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.compaction-pending", &num)); + ASSERT_EQ(num, "0"); + perf_context.Reset(); + + ASSERT_OK(dbfull()->Put(writeOpt, "k2", big_value)); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num)); + ASSERT_EQ(num, "1"); + ASSERT_OK(dbfull()->Put(writeOpt, "k3", big_value)); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num)); + ASSERT_EQ(num, "2"); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.mem-table-flush-pending", &num)); + ASSERT_EQ(num, "1"); + 
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.compaction-pending", &num)); + ASSERT_EQ(num, "0"); + + sleeping_task_high.WakeUp(); + dbfull()->TEST_WaitForFlushMemTable(); + + ASSERT_OK(dbfull()->Put(writeOpt, "k4", big_value)); + ASSERT_OK(dbfull()->Put(writeOpt, "k5", big_value)); + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.mem-table-flush-pending", &num)); + ASSERT_EQ(num, "0"); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.compaction-pending", &num)); + ASSERT_EQ(num, "1"); + sleeping_task_low.WakeUp(); +} + TEST(DBTest, FLUSH) { do { Options options = CurrentOptions(); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 7dcfd86c3..1f81023f6 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -1,4 +1,3 @@ - // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. @@ -8,291 +7,341 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#include "db/internal_stats.h" +#include "db/memtable_list.h" #include namespace rocksdb { -bool InternalStats::GetProperty(const Slice& property, std::string* value, - VersionSet* version_set, int immsize) { - Version* current = version_set->current(); +DBPropertyType GetPropertyType(const Slice& property) { Slice in = property; Slice prefix("rocksdb."); - if (!in.starts_with(prefix)) return false; + if (!in.starts_with(prefix)) return kUnknown; in.remove_prefix(prefix.size()); if (in.starts_with("num-files-at-level")) { - in.remove_prefix(strlen("num-files-at-level")); - uint64_t level; - bool ok = ConsumeDecimalNumber(&in, &level) && in.empty(); - if (!ok || (int)level >= number_levels_) { - return false; - } else { - char buf[100]; - snprintf(buf, sizeof(buf), "%d", - current->NumLevelFiles(static_cast(level))); - *value = buf; - return true; - } + return kNumFilesAtLevel; } else if (in == "levelstats") { - char buf[1000]; - snprintf(buf, sizeof(buf), - "Level Files Size(MB)\n" - "--------------------\n"); - value->append(buf); - - for (int level = 0; level < number_levels_; level++) { - snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level, - current->NumLevelFiles(level), - current->NumLevelBytes(level) / 1048576.0); - value->append(buf); - } - return true; - + return kLevelStats; } else if (in == "stats") { - char buf[1000]; - - uint64_t wal_bytes = 0; - uint64_t wal_synced = 0; - uint64_t user_bytes_written = 0; - uint64_t write_other = 0; - uint64_t write_self = 0; - uint64_t write_with_wal = 0; - uint64_t total_bytes_written = 0; - uint64_t total_bytes_read = 0; - uint64_t micros_up = env_->NowMicros() - started_at_; - // Add "+1" to make sure seconds_up is > 0 and avoid NaN later - double seconds_up = (micros_up + 1) / 1000000.0; - uint64_t total_slowdown = 0; - uint64_t total_slowdown_count = 0; - uint64_t interval_bytes_written = 0; - uint64_t interval_bytes_read = 0; - uint64_t interval_bytes_new = 0; - double interval_seconds_up = 0; - - if 
(statistics_) { - wal_bytes = statistics_->getTickerCount(WAL_FILE_BYTES); - wal_synced = statistics_->getTickerCount(WAL_FILE_SYNCED); - user_bytes_written = statistics_->getTickerCount(BYTES_WRITTEN); - write_other = statistics_->getTickerCount(WRITE_DONE_BY_OTHER); - write_self = statistics_->getTickerCount(WRITE_DONE_BY_SELF); - write_with_wal = statistics_->getTickerCount(WRITE_WITH_WAL); + return kStats; + } else if (in == "sstables") { + return kSsTables; + } else if (in == "num-immutable-mem-table") { + return kNumImmutableMemTable; + } else if (in == "mem-table-flush-pending") { + return MemtableFlushPending; + } else if (in == "compaction-pending") { + return CompactionPending; + } + return kUnknown; +} + +bool InternalStats::GetProperty(DBPropertyType property_type, + const Slice& property, std::string* value, + VersionSet* version_set, + const MemTableList& imm) { + Version* current = version_set->current(); + Slice in = property; + + switch (property_type) { + case kNumFilesAtLevel: { + in.remove_prefix(strlen("rocksdb.num-files-at-level")); + uint64_t level; + bool ok = ConsumeDecimalNumber(&in, &level) && in.empty(); + if (!ok || (int)level >= number_levels_) { + return false; + } else { + char buf[100]; + snprintf(buf, sizeof(buf), "%d", + current->NumLevelFiles(static_cast(level))); + *value = buf; + return true; + } } + case kLevelStats: { + char buf[1000]; + snprintf(buf, sizeof(buf), + "Level Files Size(MB)\n" + "--------------------\n"); + value->append(buf); - // Pardon the long line but I think it is easier to read this way. 
- snprintf(buf, sizeof(buf), - " Compactions\n" - "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n" - "------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" - ); - value->append(buf); - for (int level = 0; level < number_levels_; level++) { - int files = current->NumLevelFiles(level); - if (compaction_stats_[level].micros > 0 || files > 0) { - int64_t bytes_read = compaction_stats_[level].bytes_readn + - compaction_stats_[level].bytes_readnp1; - int64_t bytes_new = compaction_stats_[level].bytes_written - - compaction_stats_[level].bytes_readnp1; - double amplify = (compaction_stats_[level].bytes_readn == 0) - ? 0.0 - : (compaction_stats_[level].bytes_written + - compaction_stats_[level].bytes_readnp1 + - compaction_stats_[level].bytes_readn) / - (double)compaction_stats_[level].bytes_readn; - - total_bytes_read += bytes_read; - total_bytes_written += compaction_stats_[level].bytes_written; - - uint64_t stalls = level == 0 ? (stall_counts_[LEVEL0_SLOWDOWN] + - stall_counts_[LEVEL0_NUM_FILES] + - stall_counts_[MEMTABLE_COMPACTION]) - : stall_leveln_slowdown_count_[level]; - - double stall_us = level == 0 ? 
(stall_micros_[LEVEL0_SLOWDOWN] + - stall_micros_[LEVEL0_NUM_FILES] + - stall_micros_[MEMTABLE_COMPACTION]) - : stall_leveln_slowdown_[level]; - - snprintf(buf, sizeof(buf), - "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f " - "%10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f " - "%9lu\n", - level, files, current->NumLevelBytes(level) / 1048576.0, - current->NumLevelBytes(level) / - version_set->MaxBytesForLevel(level), - compaction_stats_[level].micros / 1e6, bytes_read / 1048576.0, - compaction_stats_[level].bytes_written / 1048576.0, - compaction_stats_[level].bytes_readn / 1048576.0, - compaction_stats_[level].bytes_readnp1 / 1048576.0, - bytes_new / 1048576.0, amplify, - // +1 to avoid division by 0 - (bytes_read / 1048576.0) / - ((compaction_stats_[level].micros + 1) / 1000000.0), - (compaction_stats_[level].bytes_written / 1048576.0) / - ((compaction_stats_[level].micros + 1) / 1000000.0), - compaction_stats_[level].files_in_leveln, - compaction_stats_[level].files_in_levelnp1, - compaction_stats_[level].files_out_levelnp1, - compaction_stats_[level].files_out_levelnp1 - - compaction_stats_[level].files_in_levelnp1, - compaction_stats_[level].count, - (int)((double)compaction_stats_[level].micros / 1000.0 / - (compaction_stats_[level].count + 1)), - (double)stall_us / 1000.0 / (stalls + 1), stall_us / 1000000.0, - (unsigned long)stalls); - total_slowdown += stall_leveln_slowdown_[level]; - total_slowdown_count += stall_leveln_slowdown_count_[level]; + for (int level = 0; level < number_levels_; level++) { + snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level, + current->NumLevelFiles(level), + current->NumLevelBytes(level) / 1048576.0); value->append(buf); } + return true; } + case kStats: { + char buf[1000]; + + uint64_t wal_bytes = 0; + uint64_t wal_synced = 0; + uint64_t user_bytes_written = 0; + uint64_t write_other = 0; + uint64_t write_self = 0; + uint64_t write_with_wal = 0; + uint64_t total_bytes_written = 0; + uint64_t 
total_bytes_read = 0; + uint64_t micros_up = env_->NowMicros() - started_at_; + // Add "+1" to make sure seconds_up is > 0 and avoid NaN later + double seconds_up = (micros_up + 1) / 1000000.0; + uint64_t total_slowdown = 0; + uint64_t total_slowdown_count = 0; + uint64_t interval_bytes_written = 0; + uint64_t interval_bytes_read = 0; + uint64_t interval_bytes_new = 0; + double interval_seconds_up = 0; + + if (statistics_) { + wal_bytes = statistics_->getTickerCount(WAL_FILE_BYTES); + wal_synced = statistics_->getTickerCount(WAL_FILE_SYNCED); + user_bytes_written = statistics_->getTickerCount(BYTES_WRITTEN); + write_other = statistics_->getTickerCount(WRITE_DONE_BY_OTHER); + write_self = statistics_->getTickerCount(WRITE_DONE_BY_SELF); + write_with_wal = statistics_->getTickerCount(WRITE_WITH_WAL); + } - interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_; - interval_bytes_read = total_bytes_read - last_stats_.compaction_bytes_read_; - interval_bytes_written = - total_bytes_written - last_stats_.compaction_bytes_written_; - interval_seconds_up = seconds_up - last_stats_.seconds_up_; - - snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n", - seconds_up, interval_seconds_up); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Writes cumulative: %llu total, %llu batches, " - "%.1f per batch, %.2f ingest GB\n", - (unsigned long long)(write_other + write_self), - (unsigned long long)write_self, - (write_other + write_self) / (double)(write_self + 1), - user_bytes_written / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "WAL cumulative: %llu WAL writes, %llu WAL syncs, " - "%.2f writes per sync, %.2f GB written\n", - (unsigned long long)write_with_wal, (unsigned long long)wal_synced, - write_with_wal / (double)(wal_synced + 1), - wal_bytes / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO cumulative (GB): " - "%.2f new, %.2f read, %.2f write, %.2f 
read+write\n", - user_bytes_written / (1048576.0 * 1024), - total_bytes_read / (1048576.0 * 1024), - total_bytes_written / (1048576.0 * 1024), - (total_bytes_read + total_bytes_written) / (1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO cumulative (MB/sec): " - "%.1f new, %.1f read, %.1f write, %.1f read+write\n", - user_bytes_written / 1048576.0 / seconds_up, - total_bytes_read / 1048576.0 / seconds_up, - total_bytes_written / 1048576.0 / seconds_up, - (total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up); - value->append(buf); - - // +1 to avoid divide by 0 and NaN - snprintf( - buf, sizeof(buf), - "Amplification cumulative: %.1f write, %.1f compaction\n", - (double)(total_bytes_written + wal_bytes) / (user_bytes_written + 1), - (double)(total_bytes_written + total_bytes_read + wal_bytes) / - (user_bytes_written + 1)); - value->append(buf); - - uint64_t interval_write_other = write_other - last_stats_.write_other_; - uint64_t interval_write_self = write_self - last_stats_.write_self_; - - snprintf(buf, sizeof(buf), - "Writes interval: %llu total, %llu batches, " - "%.1f per batch, %.1f ingest MB\n", - (unsigned long long)(interval_write_other + interval_write_self), - (unsigned long long)interval_write_self, - (double)(interval_write_other + interval_write_self) / - (interval_write_self + 1), - (user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0); - value->append(buf); - - uint64_t interval_write_with_wal = - write_with_wal - last_stats_.write_with_wal_; - - uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_; - uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_; - - snprintf(buf, sizeof(buf), - "WAL interval: %llu WAL writes, %llu WAL syncs, " - "%.2f writes per sync, %.2f MB written\n", - (unsigned long long)interval_write_with_wal, - (unsigned long long)interval_wal_synced, - interval_write_with_wal / (double)(interval_wal_synced + 1), - interval_wal_bytes / 
(1048576.0 * 1024)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO interval (MB): " - "%.2f new, %.2f read, %.2f write, %.2f read+write\n", - interval_bytes_new / 1048576.0, interval_bytes_read / 1048576.0, - interval_bytes_written / 1048576.0, - (interval_bytes_read + interval_bytes_written) / 1048576.0); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Compaction IO interval (MB/sec): " - "%.1f new, %.1f read, %.1f write, %.1f read+write\n", - interval_bytes_new / 1048576.0 / interval_seconds_up, - interval_bytes_read / 1048576.0 / interval_seconds_up, - interval_bytes_written / 1048576.0 / interval_seconds_up, - (interval_bytes_read + interval_bytes_written) / 1048576.0 / - interval_seconds_up); - value->append(buf); - - // +1 to avoid divide by 0 and NaN - snprintf( - buf, sizeof(buf), - "Amplification interval: %.1f write, %.1f compaction\n", - (double)(interval_bytes_written + wal_bytes) / (interval_bytes_new + 1), - (double)(interval_bytes_written + interval_bytes_read + wal_bytes) / - (interval_bytes_new + 1)); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, " - "%.3f memtable_compaction, %.3f leveln_slowdown\n", - stall_micros_[LEVEL0_SLOWDOWN] / 1000000.0, - stall_micros_[LEVEL0_NUM_FILES] / 1000000.0, - stall_micros_[MEMTABLE_COMPACTION] / 1000000.0, - total_slowdown / 1000000.0); - value->append(buf); - - snprintf(buf, sizeof(buf), - "Stalls(count): %lu level0_slowdown, %lu level0_numfiles, " - "%lu memtable_compaction, %lu leveln_slowdown\n", - (unsigned long)stall_counts_[LEVEL0_SLOWDOWN], - (unsigned long)stall_counts_[LEVEL0_NUM_FILES], - (unsigned long)stall_counts_[MEMTABLE_COMPACTION], - (unsigned long)total_slowdown_count); - value->append(buf); - - last_stats_.compaction_bytes_read_ = total_bytes_read; - last_stats_.compaction_bytes_written_ = total_bytes_written; - last_stats_.ingest_bytes_ = user_bytes_written; - last_stats_.seconds_up_ = 
seconds_up; - last_stats_.wal_bytes_ = wal_bytes; - last_stats_.wal_synced_ = wal_synced; - last_stats_.write_with_wal_ = write_with_wal; - last_stats_.write_other_ = write_other; - last_stats_.write_self_ = write_self; - - return true; - } else if (in == "sstables") { - *value = current->DebugString(); - return true; - } else if (in == "num-immutable-mem-table") { - *value = std::to_string(immsize); - return true; - } + // Pardon the long line but I think it is easier to read this way. + snprintf( + buf, sizeof(buf), + " Compactions\n" + "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) " + " " + "Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn " + "Rnp1 " + " Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n" + "--------------------------------------------------------------------" + "--" + "--------------------------------------------------------------------" + "--" + "----------------------------------------------------------------\n"); + value->append(buf); + for (int level = 0; level < number_levels_; level++) { + int files = current->NumLevelFiles(level); + if (compaction_stats_[level].micros > 0 || files > 0) { + int64_t bytes_read = compaction_stats_[level].bytes_readn + + compaction_stats_[level].bytes_readnp1; + int64_t bytes_new = compaction_stats_[level].bytes_written - + compaction_stats_[level].bytes_readnp1; + double amplify = + (compaction_stats_[level].bytes_readn == 0) + ? 0.0 + : (compaction_stats_[level].bytes_written + + compaction_stats_[level].bytes_readnp1 + + compaction_stats_[level].bytes_readn) / + (double)compaction_stats_[level].bytes_readn; + + total_bytes_read += bytes_read; + total_bytes_written += compaction_stats_[level].bytes_written; + + uint64_t stalls = level == 0 ? (stall_counts_[LEVEL0_SLOWDOWN] + + stall_counts_[LEVEL0_NUM_FILES] + + stall_counts_[MEMTABLE_COMPACTION]) + : stall_leveln_slowdown_count_[level]; + + double stall_us = level == 0 ? 
(stall_micros_[LEVEL0_SLOWDOWN] + + stall_micros_[LEVEL0_NUM_FILES] + + stall_micros_[MEMTABLE_COMPACTION]) + : stall_leveln_slowdown_[level]; + + snprintf(buf, sizeof(buf), + "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f " + "%10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f " + "%9lu\n", + level, files, current->NumLevelBytes(level) / 1048576.0, + current->NumLevelBytes(level) / + version_set->MaxBytesForLevel(level), + compaction_stats_[level].micros / 1e6, + bytes_read / 1048576.0, + compaction_stats_[level].bytes_written / 1048576.0, + compaction_stats_[level].bytes_readn / 1048576.0, + compaction_stats_[level].bytes_readnp1 / 1048576.0, + bytes_new / 1048576.0, amplify, + // +1 to avoid division by 0 + (bytes_read / 1048576.0) / + ((compaction_stats_[level].micros + 1) / 1000000.0), + (compaction_stats_[level].bytes_written / 1048576.0) / + ((compaction_stats_[level].micros + 1) / 1000000.0), + compaction_stats_[level].files_in_leveln, + compaction_stats_[level].files_in_levelnp1, + compaction_stats_[level].files_out_levelnp1, + compaction_stats_[level].files_out_levelnp1 - + compaction_stats_[level].files_in_levelnp1, + compaction_stats_[level].count, + (int)((double)compaction_stats_[level].micros / 1000.0 / + (compaction_stats_[level].count + 1)), + (double)stall_us / 1000.0 / (stalls + 1), + stall_us / 1000000.0, (unsigned long)stalls); + total_slowdown += stall_leveln_slowdown_[level]; + total_slowdown_count += stall_leveln_slowdown_count_[level]; + value->append(buf); + } + } + + interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_; + interval_bytes_read = + total_bytes_read - last_stats_.compaction_bytes_read_; + interval_bytes_written = + total_bytes_written - last_stats_.compaction_bytes_written_; + interval_seconds_up = seconds_up - last_stats_.seconds_up_; + + snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n", + seconds_up, interval_seconds_up); + value->append(buf); + + snprintf(buf, 
sizeof(buf), + "Writes cumulative: %llu total, %llu batches, " + "%.1f per batch, %.2f ingest GB\n", + (unsigned long long)(write_other + write_self), + (unsigned long long)write_self, + (write_other + write_self) / (double)(write_self + 1), + user_bytes_written / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "WAL cumulative: %llu WAL writes, %llu WAL syncs, " + "%.2f writes per sync, %.2f GB written\n", + (unsigned long long)write_with_wal, + (unsigned long long)wal_synced, + write_with_wal / (double)(wal_synced + 1), + wal_bytes / (1048576.0 * 1024)); + value->append(buf); - return false; + snprintf(buf, sizeof(buf), + "Compaction IO cumulative (GB): " + "%.2f new, %.2f read, %.2f write, %.2f read+write\n", + user_bytes_written / (1048576.0 * 1024), + total_bytes_read / (1048576.0 * 1024), + total_bytes_written / (1048576.0 * 1024), + (total_bytes_read + total_bytes_written) / (1048576.0 * 1024)); + value->append(buf); + + snprintf( + buf, sizeof(buf), + "Compaction IO cumulative (MB/sec): " + "%.1f new, %.1f read, %.1f write, %.1f read+write\n", + user_bytes_written / 1048576.0 / seconds_up, + total_bytes_read / 1048576.0 / seconds_up, + total_bytes_written / 1048576.0 / seconds_up, + (total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up); + value->append(buf); + + // +1 to avoid divide by 0 and NaN + snprintf( + buf, sizeof(buf), + "Amplification cumulative: %.1f write, %.1f compaction\n", + (double)(total_bytes_written + wal_bytes) / (user_bytes_written + 1), + (double)(total_bytes_written + total_bytes_read + wal_bytes) / + (user_bytes_written + 1)); + value->append(buf); + + uint64_t interval_write_other = write_other - last_stats_.write_other_; + uint64_t interval_write_self = write_self - last_stats_.write_self_; + + snprintf(buf, sizeof(buf), + "Writes interval: %llu total, %llu batches, " + "%.1f per batch, %.1f ingest MB\n", + (unsigned long long)(interval_write_other + interval_write_self), + (unsigned long 
long)interval_write_self, + (double)(interval_write_other + interval_write_self) / + (interval_write_self + 1), + (user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0); + value->append(buf); + + uint64_t interval_write_with_wal = + write_with_wal - last_stats_.write_with_wal_; + + uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_; + uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_; + + snprintf(buf, sizeof(buf), + "WAL interval: %llu WAL writes, %llu WAL syncs, " + "%.2f writes per sync, %.2f MB written\n", + (unsigned long long)interval_write_with_wal, + (unsigned long long)interval_wal_synced, + interval_write_with_wal / (double)(interval_wal_synced + 1), + interval_wal_bytes / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Compaction IO interval (MB): " + "%.2f new, %.2f read, %.2f write, %.2f read+write\n", + interval_bytes_new / 1048576.0, interval_bytes_read / 1048576.0, + interval_bytes_written / 1048576.0, + (interval_bytes_read + interval_bytes_written) / 1048576.0); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Compaction IO interval (MB/sec): " + "%.1f new, %.1f read, %.1f write, %.1f read+write\n", + interval_bytes_new / 1048576.0 / interval_seconds_up, + interval_bytes_read / 1048576.0 / interval_seconds_up, + interval_bytes_written / 1048576.0 / interval_seconds_up, + (interval_bytes_read + interval_bytes_written) / 1048576.0 / + interval_seconds_up); + value->append(buf); + + // +1 to avoid divide by 0 and NaN + snprintf( + buf, sizeof(buf), + "Amplification interval: %.1f write, %.1f compaction\n", + (double)(interval_bytes_written + wal_bytes) / + (interval_bytes_new + 1), + (double)(interval_bytes_written + interval_bytes_read + wal_bytes) / + (interval_bytes_new + 1)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, " + "%.3f memtable_compaction, %.3f leveln_slowdown\n", + 
stall_micros_[LEVEL0_SLOWDOWN] / 1000000.0, + stall_micros_[LEVEL0_NUM_FILES] / 1000000.0, + stall_micros_[MEMTABLE_COMPACTION] / 1000000.0, + total_slowdown / 1000000.0); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Stalls(count): %lu level0_slowdown, %lu level0_numfiles, " + "%lu memtable_compaction, %lu leveln_slowdown\n", + (unsigned long)stall_counts_[LEVEL0_SLOWDOWN], + (unsigned long)stall_counts_[LEVEL0_NUM_FILES], + (unsigned long)stall_counts_[MEMTABLE_COMPACTION], + (unsigned long)total_slowdown_count); + value->append(buf); + + last_stats_.compaction_bytes_read_ = total_bytes_read; + last_stats_.compaction_bytes_written_ = total_bytes_written; + last_stats_.ingest_bytes_ = user_bytes_written; + last_stats_.seconds_up_ = seconds_up; + last_stats_.wal_bytes_ = wal_bytes; + last_stats_.wal_synced_ = wal_synced; + last_stats_.write_with_wal_ = write_with_wal; + last_stats_.write_other_ = write_other; + last_stats_.write_self_ = write_self; + + return true; + } + case kSsTables: + *value = current->DebugString(); + return true; + case kNumImmutableMemTable: + *value = std::to_string(imm.size()); + return true; + case MemtableFlushPending: + // Return number of mem tables that are ready to flush (made immutable) + *value = std::to_string(imm.IsFlushPending() ? 1 : 0); + return true; + case CompactionPending: + // 1 if the system already determines at least one compaction is needed. + // 0 otherwise. + *value = std::to_string(current->NeedsCompaction() ? 
1 : 0); + return true; + default: + return false; + } } } // namespace rocksdb diff --git a/db/internal_stats.h b/db/internal_stats.h index 8e99a9d7e..5f1a6263a 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -17,6 +17,23 @@ #include namespace rocksdb { + +class MemTableList; + +enum DBPropertyType { + kNumFilesAtLevel, // Number of files at a specific level + kLevelStats, // Return number of files and total sizes of each level + kStats, // Return general statistics of DB + kSsTables, // Return a human readable string of current SST files + kNumImmutableMemTable, // Return number of immutable mem tables + MemtableFlushPending, // Return 1 if mem table flushing is pending, otherwise + // 0. + CompactionPending, // Return 1 if a compaction is pending. Otherwise 0. + kUnknown, +}; + +extern DBPropertyType GetPropertyType(const Slice& property); + class InternalStats { public: enum WriteStallType { @@ -99,8 +116,9 @@ class InternalStats { stall_leveln_slowdown_count_[level] += micros; } - bool GetProperty(const Slice& property, std::string* value, - VersionSet* version_set, int immsize); + bool GetProperty(DBPropertyType property_type, const Slice& property, + std::string* value, VersionSet* version_set, + const MemTableList& imm); private: std::vector compaction_stats_; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index d58fe5048..ebda34802 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -92,7 +92,7 @@ void MemTableListVersion::Remove(MemTable* m) { // Returns true if there is at least one memtable on which flush has // not yet started. 
-bool MemTableList::IsFlushPending() { +bool MemTableList::IsFlushPending() const { if ((flush_requested_ && num_flush_not_started_ >= 1) || (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) { assert(imm_flush_needed.NoBarrier_Load() != nullptr); diff --git a/db/memtable_list.h b/db/memtable_list.h index 0bd54235a..3c87d4eee 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -86,7 +86,7 @@ class MemTableList { // Returns true if there is at least one memtable on which flush has // not yet started. - bool IsFlushPending(); + bool IsFlushPending() const; // Returns the earliest memtables that needs to be flushed. The returned // memtables are guaranteed to be in the ascending order of created time.