From c4f5d0aa152a0f0d8d603c91cbc2999993562c17 Mon Sep 17 00:00:00 2001 From: Zhongyi Xie Date: Wed, 20 Feb 2019 15:46:59 -0800 Subject: [PATCH] add GetStatsHistory to retrieve stats snapshots (#4748) Summary: This PR adds public `GetStatsHistory` API to retrieve stats history in the form of an std map. The key of the map is the timestamp in microseconds when the stats snapshot is taken, the value is another std map from stats name to stats value (stored in std string). Two DBOptions are introduced: `stats_persist_period_sec` (default 10 minutes) controls the interval between two consecutive snapshots; `stats_history_buffer_size` (default 1MB) caps the estimated memory used by the history snapshots kept in memory, with the oldest snapshots purged first. RocksDB will stop collecting stats snapshots if `stats_persist_period_sec` is set to 0. (This PR is the in-memory part of https://github.com/facebook/rocksdb/pull/4535) Pull Request resolved: https://github.com/facebook/rocksdb/pull/4748 Differential Revision: D13961471 Pulled By: miasantreble fbshipit-source-id: ac836d401ecb84ea92216bf9966f969dedf4ad04 --- CMakeLists.txt | 1 + HISTORY.md | 1 + TARGETS | 1 + db/db_impl.cc | 134 +++++++++++++++++++- db/db_impl.h | 36 +++++- db/db_impl_debug.cc | 16 ++- db/db_options_test.cc | 205 ++++++++++++++++++++++++++++++- db/in_memory_stats_history.cc | 45 +++++++ db/in_memory_stats_history.h | 55 +++++++++ include/rocksdb/db.h | 9 ++ include/rocksdb/options.h | 9 ++ include/rocksdb/statistics.h | 8 +- include/rocksdb/stats_history.h | 49 ++++++++ java/rocksjni/portal.h | 28 ++--- monitoring/statistics.cc | 13 ++ monitoring/statistics.h | 4 +- options/db_options.cc | 8 ++ options/db_options.h | 2 + options/options_helper.cc | 12 ++ options/options_settable_test.cc | 2 + options/options_test.cc | 4 + src.mk | 1 + tools/db_bench_tool.cc | 10 ++ util/repeatable_thread.h | 2 + 24 files changed, 626 insertions(+), 29 deletions(-) create mode 100644 db/in_memory_stats_history.cc create mode 100644 db/in_memory_stats_history.h create 
mode 100644 include/rocksdb/stats_history.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 3737398d7..40cdd26bb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -501,6 +501,7 @@ set(SOURCES db/flush_scheduler.cc db/forward_iterator.cc db/internal_stats.cc + db/in_memory_stats_history.cc db/logs_with_prep_tracker.cc db/log_reader.cc db/log_writer.cc diff --git a/HISTORY.md b/HISTORY.md index 7070b819b..14502d37a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * For users of dictionary compression with ZSTD v0.7.0+, we now reuse the same digested dictionary when compressing each of an SST file's data blocks for faster compression speeds. * For all users of dictionary compression who set `cache_index_and_filter_blocks == true`, we now store dictionary data used for decompression in the block cache for better control over memory usage. For users of ZSTD v1.1.4+ who compile with -DZSTD_STATIC_LINKING_ONLY, this includes a digested dictionary, which is used to increase decompression speed. * Add support for block checksums verification for external SST files before ingestion. +* Introduce stats history which periodically saves Statistics snapshots and added `GetStatsHistory` API to retrieve these snapshots. * Add a place holder in manifest which indicate a record from future that can be safely ignored. * Add support for trace sampling. * Enable properties block checksum verification for block-based tables. 
diff --git a/TARGETS b/TARGETS index f7182e2f2..4590560f1 100644 --- a/TARGETS +++ b/TARGETS @@ -110,6 +110,7 @@ cpp_library( "db/flush_job.cc", "db/flush_scheduler.cc", "db/forward_iterator.cc", + "db/in_memory_stats_history.cc", "db/internal_stats.cc", "db/log_reader.cc", "db/log_writer.cc", diff --git a/db/db_impl.cc b/db/db_impl.cc index 2497011a4..821abba0e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -37,6 +37,7 @@ #include "db/external_sst_file_ingestion_job.h" #include "db/flush_job.h" #include "db/forward_iterator.h" +#include "db/in_memory_stats_history.h" #include "db/job_context.h" #include "db/log_reader.h" #include "db/log_writer.h" @@ -69,6 +70,7 @@ #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/statistics.h" +#include "rocksdb/stats_history.h" #include "rocksdb/status.h" #include "rocksdb/table.h" #include "rocksdb/write_buffer_manager.h" @@ -387,16 +389,15 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown: canceling all background work"); - InstrumentedMutexLock l(&mutex_); - // To avoid deadlock, `thread_dump_stats_->cancel()` needs to be called - // before grabbing db mutex because the actual worker function - // `DBImpl::DumpStats()` also holds db mutex if (thread_dump_stats_ != nullptr) { - mutex_.Unlock(); thread_dump_stats_->cancel(); - mutex_.Lock(); thread_dump_stats_.reset(); } + if (thread_persist_stats_ != nullptr) { + thread_persist_stats_->cancel(); + thread_persist_stats_.reset(); + } + InstrumentedMutexLock l(&mutex_); if (!shutting_down_.load(std::memory_order_acquire) && has_unpersisted_data_.load(std::memory_order_relaxed) && !mutable_db_options_.avoid_flush_during_shutdown) { @@ -620,6 +621,7 @@ void DBImpl::PrintStatistics() { void DBImpl::StartTimedTasks() { unsigned int stats_dump_period_sec = 0; + unsigned int stats_persist_period_sec = 0; { InstrumentedMutexLock l(&mutex_); stats_dump_period_sec = 
mutable_db_options_.stats_dump_period_sec; @@ -630,7 +632,112 @@ void DBImpl::StartTimedTasks() { stats_dump_period_sec * 1000000)); } } + stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec; + if (stats_persist_period_sec > 0) { + if (!thread_persist_stats_) { + thread_persist_stats_.reset(new rocksdb::RepeatableThread( + [this]() { DBImpl::PersistStats(); }, "pst_st", env_, + stats_persist_period_sec * 1000000)); + } + } + } +} + +// esitmate the total size of stats_history_ +size_t DBImpl::EstiamteStatsHistorySize() const { + size_t size_total = + sizeof(std::map>); + if (stats_history_.size() == 0) return size_total; + size_t size_per_slice = + sizeof(uint64_t) + sizeof(std::map); + // non-empty map, stats_history_.begin() guaranteed to exist + std::map sample_slice(stats_history_.begin()->second); + for (const auto& pairs : sample_slice) { + size_per_slice += + pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second); } + size_total = size_per_slice * stats_history_.size(); + return size_total; +} + +void DBImpl::PersistStats() { + TEST_SYNC_POINT("DBImpl::PersistStats:Entry"); +#ifndef ROCKSDB_LITE + if (shutdown_initiated_) { + return; + } + uint64_t now_micros = env_->NowMicros(); + Statistics* statistics = immutable_db_options_.statistics.get(); + if (!statistics) { + return; + } + size_t stats_history_size_limit = 0; + { + InstrumentedMutexLock l(&mutex_); + stats_history_size_limit = mutable_db_options_.stats_history_buffer_size; + } + + // TODO(Zhongyi): also persist immutable_db_options_.statistics + { + std::map stats_map; + if (!statistics->getTickerMap(&stats_map)) { + return; + } + InstrumentedMutexLock l(&stats_history_mutex_); + // calculate the delta from last time + if (stats_slice_initialized_) { + std::map stats_delta; + for (const auto& stat : stats_map) { + if (stats_slice_.find(stat.first) != stats_slice_.end()) { + stats_delta[stat.first] = stat.second - stats_slice_[stat.first]; + } + } + 
stats_history_[now_micros] = stats_delta; + } + stats_slice_initialized_ = true; + std::swap(stats_slice_, stats_map); + TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied"); + + // delete older stats snapshots to control memory consumption + bool purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit; + while (purge_needed && !stats_history_.empty()) { + stats_history_.erase(stats_history_.begin()); + purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit; + } + } + // TODO: persist stats to disk +#endif // !ROCKSDB_LITE +} + +bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time, + uint64_t* new_time, + std::map* stats_map) { + assert(new_time); + assert(stats_map); + if (!new_time || !stats_map) return false; + // lock when search for start_time + { + InstrumentedMutexLock l(&stats_history_mutex_); + auto it = stats_history_.lower_bound(start_time); + if (it != stats_history_.end() && it->first < end_time) { + // make a copy for timestamp and stats_map + *new_time = it->first; + *stats_map = it->second; + return true; + } else { + return false; + } + } +} + +Status DBImpl::GetStatsHistory(uint64_t start_time, uint64_t end_time, + std::unique_ptr* stats_iterator) { + if (!stats_iterator) { + return Status::InvalidArgument("stats_iterator not preallocated."); + } + stats_iterator->reset( + new InMemoryStatsHistoryIterator(start_time, end_time, this)); + return (*stats_iterator)->status(); } void DBImpl::DumpStats() { @@ -819,6 +926,21 @@ Status DBImpl::SetDBOptions( thread_dump_stats_.reset(); } } + if (new_options.stats_persist_period_sec != + mutable_db_options_.stats_persist_period_sec) { + if (thread_persist_stats_) { + mutex_.Unlock(); + thread_persist_stats_->cancel(); + mutex_.Lock(); + } + if (new_options.stats_persist_period_sec > 0) { + thread_persist_stats_.reset(new rocksdb::RepeatableThread( + [this]() { DBImpl::PersistStats(); }, "pst_st", env_, + new_options.stats_persist_period_sec * 1000000)); + } else { + 
thread_persist_stats_.reset(); + } + } write_controller_.set_max_delayed_write_rate( new_options.delayed_write_rate); table_cache_.get()->SetCapacity(new_options.max_open_files == -1 diff --git a/db/db_impl.h b/db/db_impl.h index 86acaa24d..39ac83c1b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -63,6 +63,7 @@ namespace rocksdb { class Arena; class ArenaWrappedDBIter; +class InMemoryStatsHistoryIterator; class MemTable; class TableCache; class TaskLimiterToken; @@ -482,7 +483,10 @@ class DBImpl : public DB { int TEST_BGCompactionsAllowed() const; int TEST_BGFlushesAllowed() const; size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; - void TEST_WaitForTimedTaskRun(std::function callback) const; + void TEST_WaitForDumpStatsRun(std::function callback) const; + void TEST_WaitForPersistStatsRun(std::function callback) const; + bool TEST_IsPersistentStatsEnabled() const; + size_t TEST_EstiamteStatsHistorySize() const; #endif // NDEBUG @@ -728,6 +732,17 @@ class DBImpl : public DB { static Status CreateAndNewDirectory(Env* env, const std::string& dirname, std::unique_ptr* directory); + // Given a time window, return an iterator for accessing stats history + Status GetStatsHistory( + uint64_t start_time, uint64_t end_time, + std::unique_ptr* stats_iterator) override; + + // find stats map from stats_history_ with smallest timestamp in + // the range of [start_time, end_time) + bool FindStatsByTime(uint64_t start_time, uint64_t end_time, + uint64_t* new_time, + std::map* stats_map); + protected: Env* const env_; const std::string dbname_; @@ -1135,6 +1150,11 @@ class DBImpl : public DB { void PrintStatistics(); + size_t EstiamteStatsHistorySize() const; + + // persist stats to column family "_persistent_stats" + void PersistStats(); + // dump rocksdb.stats to LOG void DumpStats(); @@ -1177,6 +1197,8 @@ class DBImpl : public DB { // Lock over the persistent DB state. Non-nullptr iff successfully acquired. 
FileLock* db_lock_; + // stats_history_mutex_ protects accesses to stats_history_ and stats_slice_ + InstrumentedMutex stats_history_mutex_; // In addition to mutex_, log_write_mutex_ protected writes to logs_ and // logfile_number_. With two_write_queues it also protects alive_log_files_, // and log_empty_. Refer to the definition of each variable below for more @@ -1299,6 +1321,12 @@ class DBImpl : public DB { bool is_snapshot_supported_; + std::map> stats_history_; + + std::map stats_slice_; + + bool stats_slice_initialized_ = false; + // Class to maintain directories for all database paths other than main one. class Directories { public: @@ -1544,10 +1572,14 @@ class DBImpl : public DB { // Only to be set during initialization std::unique_ptr recoverable_state_pre_release_callback_; - // handle for scheduling jobs at fixed intervals + // handle for scheduling stats dumping at fixed intervals // REQUIRES: mutex locked std::unique_ptr thread_dump_stats_; + // handle for scheduling stats snapshoting at fixed intervals + // REQUIRES: mutex locked + std::unique_ptr thread_persist_stats_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index bbdd5df37..d2e4a7d38 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -243,10 +243,24 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize( return GetWalPreallocateBlockSize(write_buffer_size); } -void DBImpl::TEST_WaitForTimedTaskRun(std::function callback) const { +void DBImpl::TEST_WaitForDumpStatsRun(std::function callback) const { if (thread_dump_stats_ != nullptr) { thread_dump_stats_->TEST_WaitForRun(callback); } } + +void DBImpl::TEST_WaitForPersistStatsRun(std::function callback) const { + if (thread_persist_stats_ != nullptr) { + thread_persist_stats_->TEST_WaitForRun(callback); + } +} + +bool DBImpl::TEST_IsPersistentStatsEnabled() const { + return thread_persist_stats_ && thread_persist_stats_->IsRunning(); +} + +size_t 
DBImpl::TEST_EstiamteStatsHistorySize() const { + return EstiamteStatsHistorySize(); +} } // namespace rocksdb #endif // NDEBUG diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 892ee60f9..d3a1f7a43 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -18,12 +18,15 @@ #include "rocksdb/cache.h" #include "rocksdb/convenience.h" #include "rocksdb/rate_limiter.h" +#include "rocksdb/stats_history.h" #include "util/random.h" #include "util/sync_point.h" #include "util/testutil.h" namespace rocksdb { +const int kMicrosInSec = 1000000; + class DBOptionsTest : public DBTestBase { public: DBOptionsTest() : DBTestBase("/db_options_test") {} @@ -508,10 +511,11 @@ TEST_F(DBOptionsTest, SetStatsDumpPeriodSec) { for (int i = 0; i < 20; i++) { int num = rand() % 5000 + 1; - ASSERT_OK(dbfull()->SetDBOptions( - {{"stats_dump_period_sec", std::to_string(num)}})); + ASSERT_OK( + dbfull()->SetDBOptions({{"stats_dump_period_sec", ToString(num)}})); ASSERT_EQ(num, dbfull()->GetDBOptions().stats_dump_period_sec); } + Close(); } TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) { @@ -530,17 +534,210 @@ TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); Reopen(options); ASSERT_EQ(5, dbfull()->GetDBOptions().stats_dump_period_sec); - dbfull()->TEST_WaitForTimedTaskRun([&] { mock_env->set_current_time(5); }); + dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(5); }); ASSERT_GE(counter, 1); // Test cacel job through SetOptions ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "0"}})); int old_val = counter; - env_->SleepForMicroseconds(10000000); + for (int i = 6; i < 20; ++i) { + dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(i); }); + } ASSERT_EQ(counter, old_val); Close(); } +// Test persistent stats background thread scheduling and cancelling +TEST_F(DBOptionsTest, StatsPersistScheduling) { + Options options; + options.create_if_missing = true; + 
options.stats_persist_period_sec = 5; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + int counter = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); + ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec); + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + ASSERT_GE(counter, 1); + + // Test cacel job through SetOptions + ASSERT_TRUE(dbfull()->TEST_IsPersistentStatsEnabled()); + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); + ASSERT_FALSE(dbfull()->TEST_IsPersistentStatsEnabled()); + Close(); +} + +// Test enabling persistent stats for the first time +TEST_F(DBOptionsTest, PersistentStatsFreshInstall) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 0; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + int counter = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "5"}})); + ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec); + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + ASSERT_GE(counter, 1); + Close(); +} + +TEST_F(DBOptionsTest, SetOptionsStatsPersistPeriodSec) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + options.env = env_; + Reopen(options); + ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec); + + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", 
"12345"}})); + ASSERT_EQ(12345, dbfull()->GetDBOptions().stats_persist_period_sec); + ASSERT_NOK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "abcde"}})); + ASSERT_EQ(12345, dbfull()->GetDBOptions().stats_persist_period_sec); +} + +TEST_F(DBOptionsTest, GetStatsHistory) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + options.statistics = rocksdb::CreateDBStatistics(); + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + CreateColumnFamilies({"pikachu"}, options); + ASSERT_OK(Put("foo", "bar")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + + int mock_time = 1; + // Wait for stats persist to finish + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + std::unique_ptr stats_iter; + db_->GetStatsHistory(0, 6 * kMicrosInSec, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + // disabled stats snapshots + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); + size_t stats_count = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + auto stats_map = stats_iter->GetStatsMap(); + stats_count += stats_map.size(); + } + ASSERT_GT(stats_count, 0); + // Wait a bit and verify no more stats are found + for (mock_time = 6; mock_time < 20; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + db_->GetStatsHistory(0, 20 * kMicrosInSec, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count_new = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + stats_count_new += stats_iter->GetStatsMap().size(); + } + ASSERT_EQ(stats_count_new, stats_count); + Close(); +} + +TEST_F(DBOptionsTest, InMemoryStatsHistoryPurging) { + Options options; + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + options.stats_persist_period_sec = 1; + 
std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + CreateColumnFamilies({"pikachu"}, options); + ASSERT_OK(Put("foo", "bar")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + // some random operation to populate statistics + ASSERT_OK(Delete("foo")); + ASSERT_OK(Put("sol", "sol")); + ASSERT_OK(Put("epic", "epic")); + ASSERT_OK(Put("ltd", "ltd")); + ASSERT_EQ("sol", Get("sol")); + ASSERT_EQ("epic", Get("epic")); + ASSERT_EQ("ltd", Get("ltd")); + Iterator* iterator = db_->NewIterator(ReadOptions()); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + ASSERT_TRUE(iterator->key() == iterator->value()); + } + delete iterator; + ASSERT_OK(Flush()); + ASSERT_OK(Delete("sol")); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + int mock_time = 1; + // Wait for stats persist to finish + for (; mock_time < 5; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + + // second round of ops + ASSERT_OK(Put("saigon", "saigon")); + ASSERT_OK(Put("noodle talk", "noodle talk")); + ASSERT_OK(Put("ping bistro", "ping bistro")); + iterator = db_->NewIterator(ReadOptions()); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + ASSERT_TRUE(iterator->key() == iterator->value()); + } + delete iterator; + ASSERT_OK(Flush()); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + for (; mock_time < 10; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + std::unique_ptr stats_iter; + db_->GetStatsHistory(0, 10 * kMicrosInSec, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count = 0; + int slice_count = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + slice_count++; + auto stats_map = stats_iter->GetStatsMap(); + stats_count += stats_map.size(); + } + size_t 
stats_history_size = dbfull()->TEST_EstiamteStatsHistorySize(); + ASSERT_GE(slice_count, 9); + ASSERT_GE(stats_history_size, 12000); + // capping memory cost at 12000 bytes since one slice is around 10000~12000 + ASSERT_OK(dbfull()->SetDBOptions({{"stats_history_buffer_size", "12000"}})); + ASSERT_EQ(12000, dbfull()->GetDBOptions().stats_history_buffer_size); + // Wait for stats persist to finish + for (; mock_time < 20; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + db_->GetStatsHistory(0, 20 * kMicrosInSec, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count_reopen = 0; + slice_count = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + slice_count++; + auto stats_map = stats_iter->GetStatsMap(); + stats_count_reopen += stats_map.size(); + } + size_t stats_history_size_reopen = dbfull()->TEST_EstiamteStatsHistorySize(); + // only one slice can fit under the new stats_history_buffer_size + ASSERT_LT(slice_count, 2); + ASSERT_TRUE(stats_history_size_reopen < 12000 && + stats_history_size_reopen > 0); + ASSERT_TRUE(stats_count_reopen < stats_count && stats_count_reopen > 0); + Close(); +} + static void assert_candidate_files_empty(DBImpl* dbfull, const bool empty) { dbfull->TEST_LockMutex(); JobContext job_context(0); diff --git a/db/in_memory_stats_history.cc b/db/in_memory_stats_history.cc new file mode 100644 index 000000000..39355cfbe --- /dev/null +++ b/db/in_memory_stats_history.cc @@ -0,0 +1,45 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#include "db/db_impl.h" +#include "db/in_memory_stats_history.h" + +namespace rocksdb { + +InMemoryStatsHistoryIterator::~InMemoryStatsHistoryIterator() {} + +bool InMemoryStatsHistoryIterator::Valid() const { return valid_; } + +Status InMemoryStatsHistoryIterator::status() const { return status_; } + +void InMemoryStatsHistoryIterator::Next() { + // increment start_time by 1 to avoid infinite loop + AdvanceIteratorByTime(GetStatsTime() + 1, end_time_); +} + +uint64_t InMemoryStatsHistoryIterator::GetStatsTime() const { return time_; } + +const std::map& +InMemoryStatsHistoryIterator::GetStatsMap() const { + return stats_map_; +} + +// advance the iterator to the next time between [start_time, end_time) +// if success, update time_ and stats_map_ with new_time and stats_map +void InMemoryStatsHistoryIterator::AdvanceIteratorByTime(uint64_t start_time, + uint64_t end_time) { + // try to find next entry in stats_history_ map + if (db_impl_ != nullptr) { + valid_ = + db_impl_->FindStatsByTime(start_time, end_time, &time_, &stats_map_); + } else { + valid_ = false; + } +} + +} // namespace rocksdb diff --git a/db/in_memory_stats_history.h b/db/in_memory_stats_history.h new file mode 100644 index 000000000..4b52e23ff --- /dev/null +++ b/db/in_memory_stats_history.h @@ -0,0 +1,55 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#pragma once + +#include "rocksdb/stats_history.h" + +namespace rocksdb { + +class InMemoryStatsHistoryIterator final : public StatsHistoryIterator { + public: + InMemoryStatsHistoryIterator(uint64_t start_time, uint64_t end_time, + DBImpl* db_impl) + : start_time_(start_time), + end_time_(end_time), + valid_(true), + db_impl_(db_impl) { + AdvanceIteratorByTime(start_time_, end_time_); + } + ~InMemoryStatsHistoryIterator() override; + bool Valid() const override; + Status status() const override; + + void Next() override; + uint64_t GetStatsTime() const override; + + const std::map& GetStatsMap() const override; + + private: + // advance the iterator to the next stats history record with timestamp + // between [start_time, end_time) + void AdvanceIteratorByTime(uint64_t start_time, uint64_t end_time); + + // No copying allowed + InMemoryStatsHistoryIterator(const InMemoryStatsHistoryIterator&) = delete; + void operator=(const InMemoryStatsHistoryIterator&) = delete; + InMemoryStatsHistoryIterator(InMemoryStatsHistoryIterator&&) = delete; + InMemoryStatsHistoryIterator& operator=(InMemoryStatsHistoryIterator&&) = + delete; + + uint64_t time_; + uint64_t start_time_; + uint64_t end_time_; + std::map stats_map_; + Status status_; + bool valid_; + DBImpl* db_impl_; +}; + +} // namespace rocksdb diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 0ba9dda02..9d5316546 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -52,6 +52,7 @@ struct ExternalSstFileInfo; class WriteBatch; class Env; class EventListener; +class StatsHistoryIterator; class TraceWriter; #ifdef ROCKSDB_LITE class CompactionJobInfo; @@ -1226,6 +1227,14 @@ class DB { // Needed for StackableDB virtual DB* GetRootDB() { return this; } + // Given a time window, return an iterator for accessing stats history + // User is responsible for deleting StatsHistoryIterator after use + virtual Status GetStatsHistory(uint64_t /*start_time*/, + uint64_t /*end_time*/, + std::unique_ptr* 
/*stats_iterator*/) { + return Status::NotSupported("GetStatsHistory() is not implemented."); + } + private: // No copying allowed DB(const DB&); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 3b3bd88c3..311d78983 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -677,6 +677,15 @@ struct DBOptions { // Dynamically changeable through SetDBOptions() API. unsigned int stats_dump_period_sec = 600; + // if not zero, dump rocksdb.stats to RocksDB every stats_persist_period_sec + // Default: 600 + unsigned int stats_persist_period_sec = 600; + + // if not zero, periodically take stats snapshots and store in memory, the + // memory size for stats snapshots is capped at stats_history_buffer_size + // Default: 1MB + size_t stats_history_buffer_size = 1024 * 1024; + // If set true, will hint the underlying file system that the file // access pattern is random, when a sst file is opened. // Default: true diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index dc7750283..c1cadde7b 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -8,8 +8,9 @@ #include #include #include -#include +#include #include +#include #include #include "rocksdb/status.h" @@ -479,6 +480,11 @@ class Statistics { return std::string("ToString(): not implemented"); } + virtual bool getTickerMap(std::map*) const { + // Do nothing by default + return false; + }; + // Override this function to disable particular histogram collection virtual bool HistEnabledForType(uint32_t type) const { return type < HISTOGRAM_ENUM_MAX; diff --git a/include/rocksdb/stats_history.h b/include/rocksdb/stats_history.h new file mode 100644 index 000000000..40ea51d1f --- /dev/null +++ b/include/rocksdb/stats_history.h @@ -0,0 +1,49 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include +#include + +// #include "db/db_impl.h" +#include "rocksdb/statistics.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +class DBImpl; + +class StatsHistoryIterator { + public: + StatsHistoryIterator() {} + virtual ~StatsHistoryIterator() {} + + virtual bool Valid() const = 0; + + // Moves to the next stats history record. After this call, Valid() is + // true iff the iterator was not positioned at the last entry in the source. + // REQUIRES: Valid() + virtual void Next() = 0; + + // Return the time stamp (in microseconds) when stats history is recorded. + // REQUIRES: Valid() + virtual uint64_t GetStatsTime() const = 0; + + // Return the current stats history as an std::map which specifies the + // mapping from stats name to stats value . The underlying storage + // for the returned map is valid only until the next modification of + // the iterator. + // REQUIRES: Valid() + virtual const std::map& GetStatsMap() const = 0; + + // If an error has occurred, return it. Else return an ok status. 
+ virtual Status status() const = 0; +}; + +} // namespace rocksdb diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index f739886d2..c24ff1688 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -4522,7 +4522,7 @@ class JniUtil { * if an OutOfMemoryError or ArrayIndexOutOfBoundsException * exception occurs * - * @return A std::vector containing copies of the Java strings + * @return A std::vector containing copies of the Java strings */ static std::vector copyStrings(JNIEnv* env, jobjectArray jss, jboolean* has_exception) { @@ -4540,7 +4540,7 @@ class JniUtil { * if an OutOfMemoryError or ArrayIndexOutOfBoundsException * exception occurs * - * @return A std::vector containing copies of the Java strings + * @return A std::vector containing copies of the Java strings */ static std::vector copyStrings(JNIEnv* env, jobjectArray jss, const jsize jss_len, jboolean* has_exception) { @@ -4624,7 +4624,7 @@ class JniUtil { * @param has_exception (OUT) will be set to JNI_TRUE * if an OutOfMemoryError exception occurs * - * @return A std:string copy of the jstring, or an + * @return A std::string copy of the jstring, or an * empty std::string if has_exception == JNI_TRUE */ static std::string copyStdString(JNIEnv* env, jstring js, @@ -4655,8 +4655,8 @@ class JniUtil { * @param bytes The bytes to copy * * @return the Java byte[] or nullptr if an exception occurs - * - * @throws RocksDBException thrown + * + * @throws RocksDBException thrown * if memory size to copy exceeds general java specific array size limitation. */ static jbyteArray copyBytes(JNIEnv* env, std::string bytes) { @@ -4823,7 +4823,7 @@ class JniUtil { return jbyte_strings; } - + /** * Copies bytes to a new jByteArray with the check of java array size limitation. 
* @@ -4831,29 +4831,29 @@ class JniUtil { * @param size number of bytes to copy * * @return the Java byte[] or nullptr if an exception occurs - * - * @throws RocksDBException thrown + * + * @throws RocksDBException thrown * if memory size to copy exceeds general java array size limitation to avoid overflow. */ static jbyteArray createJavaByteArrayWithSizeCheck(JNIEnv* env, const char* bytes, const size_t size) { // Limitation for java array size is vm specific // In general it cannot exceed Integer.MAX_VALUE (2^31 - 1) // Current HotSpot VM limitation for array size is Integer.MAX_VALUE - 5 (2^31 - 1 - 5) - // It means that the next call to env->NewByteArray can still end with + // It means that the next call to env->NewByteArray can still end with // OutOfMemoryError("Requested array size exceeds VM limit") coming from VM static const size_t MAX_JARRAY_SIZE = (static_cast(1)) << 31; if(size > MAX_JARRAY_SIZE) { rocksdb::RocksDBExceptionJni::ThrowNew(env, "Requested array size exceeds VM limit"); return nullptr; } - + const jsize jlen = static_cast(size); jbyteArray jbytes = env->NewByteArray(jlen); if(jbytes == nullptr) { - // exception thrown: OutOfMemoryError + // exception thrown: OutOfMemoryError return nullptr; } - + env->SetByteArrayRegion(jbytes, 0, jlen, const_cast(reinterpret_cast(bytes))); if(env->ExceptionCheck()) { @@ -4872,8 +4872,8 @@ class JniUtil { * @param bytes The bytes to copy * * @return the Java byte[] or nullptr if an exception occurs - * - * @throws RocksDBException thrown + * + * @throws RocksDBException thrown * if memory size to copy exceeds general java specific array size limitation. 
*/ static jbyteArray copyBytes(JNIEnv* env, const Slice& bytes) { diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index a9479269d..0501ee207 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -384,6 +384,19 @@ std::string StatisticsImpl::ToString() const { return res; } +bool StatisticsImpl::getTickerMap( + std::map* stats_map) const { + assert(stats_map); + if (!stats_map) return false; + stats_map->clear(); + MutexLock lock(&aggregate_lock_); + for (const auto& t : TickersNameMap) { + assert(t.first < TICKER_ENUM_MAX); + (*stats_map)[t.second] = getTickerCountLocked(t.first); + } + return true; +} + bool StatisticsImpl::HistEnabledForType(uint32_t type) const { return type < HISTOGRAM_ENUM_MAX; } diff --git a/monitoring/statistics.h b/monitoring/statistics.h index dcd5f7a01..55dd1fad7 100644 --- a/monitoring/statistics.h +++ b/monitoring/statistics.h @@ -6,9 +6,10 @@ #pragma once #include "rocksdb/statistics.h" -#include #include +#include #include +#include #include "monitoring/histogram.h" #include "port/likely.h" @@ -56,6 +57,7 @@ class StatisticsImpl : public Statistics { virtual Status Reset() override; virtual std::string ToString() const override; + virtual bool getTickerMap(std::map*) const override; virtual bool HistEnabledForType(uint32_t type) const override; private: diff --git a/options/db_options.cc b/options/db_options.cc index 4e8134511..b90c27606 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -229,6 +229,8 @@ MutableDBOptions::MutableDBOptions() max_total_wal_size(0), delete_obsolete_files_period_micros(6ULL * 60 * 60 * 1000000), stats_dump_period_sec(600), + stats_persist_period_sec(600), + stats_history_buffer_size(1024 * 1024), max_open_files(-1), bytes_per_sync(0), wal_bytes_per_sync(0), @@ -245,6 +247,8 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options) delete_obsolete_files_period_micros( options.delete_obsolete_files_period_micros),
stats_dump_period_sec(options.stats_dump_period_sec), + stats_persist_period_sec(options.stats_persist_period_sec), + stats_history_buffer_size(options.stats_history_buffer_size), max_open_files(options.max_open_files), bytes_per_sync(options.bytes_per_sync), wal_bytes_per_sync(options.wal_bytes_per_sync), @@ -269,6 +273,10 @@ void MutableDBOptions::Dump(Logger* log) const { delete_obsolete_files_period_micros); ROCKS_LOG_HEADER(log, " Options.stats_dump_period_sec: %u", stats_dump_period_sec); + ROCKS_LOG_HEADER(log, " Options.stats_persist_period_sec: %u", + stats_persist_period_sec); + ROCKS_LOG_HEADER(log, " Options.stats_history_buffer_size: %zu", + stats_history_buffer_size); ROCKS_LOG_HEADER(log, " Options.max_open_files: %d", max_open_files); ROCKS_LOG_HEADER(log, diff --git a/options/db_options.h b/options/db_options.h index 2cd83b55d..360848851 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -97,6 +97,8 @@ struct MutableDBOptions { uint64_t max_total_wal_size; uint64_t delete_obsolete_files_period_micros; unsigned int stats_dump_period_sec; + unsigned int stats_persist_period_sec; + size_t stats_history_buffer_size; int max_open_files; uint64_t bytes_per_sync; uint64_t wal_bytes_per_sync; diff --git a/options/options_helper.cc b/options/options_helper.cc index 05ea8d67b..94410e530 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -79,6 +79,10 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.allow_fallocate = immutable_db_options.allow_fallocate; options.is_fd_close_on_exec = immutable_db_options.is_fd_close_on_exec; options.stats_dump_period_sec = mutable_db_options.stats_dump_period_sec; + options.stats_persist_period_sec = + mutable_db_options.stats_persist_period_sec; + options.stats_history_buffer_size = + mutable_db_options.stats_history_buffer_size; options.advise_random_on_open = immutable_db_options.advise_random_on_open; options.db_write_buffer_size =
immutable_db_options.db_write_buffer_size; options.write_buffer_manager = immutable_db_options.write_buffer_manager; @@ -1495,6 +1499,14 @@ std::unordered_map {offsetof(struct DBOptions, stats_dump_period_sec), OptionType::kUInt, OptionVerificationType::kNormal, true, offsetof(struct MutableDBOptions, stats_dump_period_sec)}}, + {"stats_persist_period_sec", + {offsetof(struct DBOptions, stats_persist_period_sec), + OptionType::kUInt, OptionVerificationType::kNormal, true, + offsetof(struct MutableDBOptions, stats_persist_period_sec)}}, + {"stats_history_buffer_size", + {offsetof(struct DBOptions, stats_history_buffer_size), + OptionType::kSizeT, OptionVerificationType::kNormal, true, + offsetof(struct MutableDBOptions, stats_history_buffer_size)}}, {"fail_if_options_file_error", {offsetof(struct DBOptions, fail_if_options_file_error), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 9d37fc186..7da8380cf 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -266,6 +266,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "manifest_preallocation_size=1222;" "allow_mmap_writes=false;" "stats_dump_period_sec=70127;" + "stats_persist_period_sec=54321;" + "stats_history_buffer_size=14159;" "allow_fallocate=true;" "allow_mmap_reads=false;" "use_direct_reads=false;" diff --git a/options/options_test.cc b/options/options_test.cc index cebad4938..f700d8d65 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -128,6 +128,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"is_fd_close_on_exec", "true"}, {"skip_log_error_on_recovery", "false"}, {"stats_dump_period_sec", "46"}, + {"stats_persist_period_sec", "57"}, + {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, {"use_adaptive_mutex", "false"}, {"new_table_reader_for_compaction_inputs", "true"}, @@ -262,6 +264,8 @@ TEST_F(OptionsTest, 
GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.is_fd_close_on_exec, true); ASSERT_EQ(new_db_opt.skip_log_error_on_recovery, false); ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U); + ASSERT_EQ(new_db_opt.stats_persist_period_sec, 57U); + ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); diff --git a/src.mk b/src.mk index 19e01ec3a..39ba3f99b 100644 --- a/src.mk +++ b/src.mk @@ -34,6 +34,7 @@ LIB_SOURCES = \ db/flush_job.cc \ db/flush_scheduler.cc \ db/forward_iterator.cc \ + db/in_memory_stats_history.cc \ db/internal_stats.cc \ db/logs_with_prep_tracker.cc \ db/log_reader.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 502366236..ec9cdcab4 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1095,6 +1095,12 @@ DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo " DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG "); DEFINE_uint64(stats_dump_period_sec, rocksdb::Options().stats_dump_period_sec, "Gap between printing stats to log in seconds"); +DEFINE_uint64(stats_persist_period_sec, + rocksdb::Options().stats_persist_period_sec, + "Gap between persisting stats in seconds"); +DEFINE_uint64(stats_history_buffer_size, + rocksdb::Options().stats_history_buffer_size, + "Max size of the in-memory stats snapshot buffer"); enum RepFactory { kSkipList, @@ -3565,6 +3571,10 @@ void VerifyDBFromDB(std::string& truth_db_name) { options.dump_malloc_stats = FLAGS_dump_malloc_stats; options.stats_dump_period_sec = static_cast(FLAGS_stats_dump_period_sec); + options.stats_persist_period_sec = + static_cast(FLAGS_stats_persist_period_sec); + options.stats_history_buffer_size = + static_cast(FLAGS_stats_history_buffer_size); options.compression_opts.level = FLAGS_compression_level; options.compression_opts.max_dict_bytes =
FLAGS_compression_max_dict_bytes; diff --git a/util/repeatable_thread.h b/util/repeatable_thread.h index 3506234f9..770ba5772 100644 --- a/util/repeatable_thread.h +++ b/util/repeatable_thread.h @@ -46,6 +46,8 @@ class RepeatableThread { thread_.join(); } + bool IsRunning() const { return running_; } + ~RepeatableThread() { cancel(); } #ifndef NDEBUG