diff --git a/CMakeLists.txt b/CMakeLists.txt index 3737398d7..40cdd26bb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -501,6 +501,7 @@ set(SOURCES db/flush_scheduler.cc db/forward_iterator.cc db/internal_stats.cc + db/in_memory_stats_history.cc db/logs_with_prep_tracker.cc db/log_reader.cc db/log_writer.cc diff --git a/HISTORY.md b/HISTORY.md index 7070b819b..14502d37a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * For users of dictionary compression with ZSTD v0.7.0+, we now reuse the same digested dictionary when compressing each of an SST file's data blocks for faster compression speeds. * For all users of dictionary compression who set `cache_index_and_filter_blocks == true`, we now store dictionary data used for decompression in the block cache for better control over memory usage. For users of ZSTD v1.1.4+ who compile with -DZSTD_STATIC_LINKING_ONLY, this includes a digested dictionary, which is used to increase decompression speed. * Add support for block checksums verification for external SST files before ingestion. +* Introduce stats history which periodically saves Statistics snapshots and added `GetStatsHistory` API to retrieve these snapshots. * Add a place holder in manifest which indicate a record from future that can be safely ignored. * Add support for trace sampling. * Enable properties block checksum verification for block-based tables. diff --git a/TARGETS b/TARGETS index f7182e2f2..4590560f1 100644 --- a/TARGETS +++ b/TARGETS @@ -110,6 +110,7 @@ cpp_library( "db/flush_job.cc", "db/flush_scheduler.cc", "db/forward_iterator.cc", + "db/in_memory_stats_history.cc", "db/internal_stats.cc", "db/log_reader.cc", "db/log_writer.cc", diff --git a/db/db_impl.cc b/db/db_impl.cc index 2497011a4..821abba0e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -37,6 +37,7 @@ #include "db/external_sst_file_ingestion_job.h" #include "db/flush_job.h" #include "db/forward_iterator.h" +#include "db/in_memory_stats_history.h" #include "db/job_context.h" #include "db/log_reader.h" #include "db/log_writer.h" @@ -69,6 +70,7 @@ #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/statistics.h" +#include "rocksdb/stats_history.h" #include "rocksdb/status.h" #include "rocksdb/table.h" #include "rocksdb/write_buffer_manager.h" @@ -387,16 +389,15 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown: canceling all background work"); - InstrumentedMutexLock l(&mutex_); - // To avoid deadlock, `thread_dump_stats_->cancel()` needs to be called - // before grabbing db mutex because the actual worker function - // `DBImpl::DumpStats()` also holds db mutex if (thread_dump_stats_ != nullptr) { - mutex_.Unlock(); thread_dump_stats_->cancel(); - mutex_.Lock(); thread_dump_stats_.reset(); } + if (thread_persist_stats_ != nullptr) { + thread_persist_stats_->cancel(); + thread_persist_stats_.reset(); + } + InstrumentedMutexLock l(&mutex_); if (!shutting_down_.load(std::memory_order_acquire) && has_unpersisted_data_.load(std::memory_order_relaxed) && !mutable_db_options_.avoid_flush_during_shutdown) { @@ -620,6 +621,7 @@ void DBImpl::PrintStatistics() { void DBImpl::StartTimedTasks() { unsigned int stats_dump_period_sec = 0; + unsigned int stats_persist_period_sec = 0; { InstrumentedMutexLock l(&mutex_); stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec; @@ -630,7 +632,112 @@ void DBImpl::StartTimedTasks() { stats_dump_period_sec * 1000000)); } } + stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec; + if (stats_persist_period_sec > 0) { + if (!thread_persist_stats_) { + thread_persist_stats_.reset(new rocksdb::RepeatableThread( + [this]() { DBImpl::PersistStats(); }, "pst_st", env_, + stats_persist_period_sec * 1000000)); + } + } + } +} + +// esitmate the total size of stats_history_ +size_t DBImpl::EstiamteStatsHistorySize() const { + size_t size_total = + sizeof(std::map>); + if (stats_history_.size() == 0) return size_total; + size_t size_per_slice = + sizeof(uint64_t) + sizeof(std::map); + // non-empty map, stats_history_.begin() guaranteed to exist + std::map sample_slice(stats_history_.begin()->second); + for (const auto& pairs : sample_slice) { + size_per_slice += + pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second); } + size_total = size_per_slice * stats_history_.size(); + return size_total; +} + +void DBImpl::PersistStats() { + TEST_SYNC_POINT("DBImpl::PersistStats:Entry"); +#ifndef ROCKSDB_LITE + if (shutdown_initiated_) { + return; + } + uint64_t now_micros = env_->NowMicros(); + Statistics* statistics = immutable_db_options_.statistics.get(); + if (!statistics) { + return; + } + size_t stats_history_size_limit = 0; + { + InstrumentedMutexLock l(&mutex_); + stats_history_size_limit = mutable_db_options_.stats_history_buffer_size; + } + + // TODO(Zhongyi): also persist immutable_db_options_.statistics + { + std::map stats_map; + if (!statistics->getTickerMap(&stats_map)) { + return; + } + InstrumentedMutexLock l(&stats_history_mutex_); + // calculate the delta from last time + if (stats_slice_initialized_) { + std::map stats_delta; + for (const auto& stat : stats_map) { + if (stats_slice_.find(stat.first) != stats_slice_.end()) { + stats_delta[stat.first] = stat.second - stats_slice_[stat.first]; + } + } + stats_history_[now_micros] = stats_delta; + } + stats_slice_initialized_ = true; + std::swap(stats_slice_, stats_map); + TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied"); + + // delete older stats snapshots to control memory consumption + bool purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit; + while (purge_needed && !stats_history_.empty()) { + stats_history_.erase(stats_history_.begin()); + purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit; + } + } + // TODO: persist stats to disk +#endif // !ROCKSDB_LITE +} + +bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time, + uint64_t* new_time, + std::map* stats_map) { + assert(new_time); + assert(stats_map); + if (!new_time || !stats_map) return false; + // lock when search for start_time + { + InstrumentedMutexLock l(&stats_history_mutex_); + auto it = stats_history_.lower_bound(start_time); + if (it != stats_history_.end() && it->first < end_time) { + // make a copy for timestamp and stats_map + *new_time = it->first; + *stats_map = it->second; + return true; + } else { + return false; + } + } +} + +Status DBImpl::GetStatsHistory(uint64_t start_time, uint64_t end_time, + std::unique_ptr* stats_iterator) { + if (!stats_iterator) { + return Status::InvalidArgument("stats_iterator not preallocated."); + } + stats_iterator->reset( + new InMemoryStatsHistoryIterator(start_time, end_time, this)); + return (*stats_iterator)->status(); } void DBImpl::DumpStats() { @@ -819,6 +926,21 @@ Status DBImpl::SetDBOptions( thread_dump_stats_.reset(); } } + if (new_options.stats_persist_period_sec != + mutable_db_options_.stats_persist_period_sec) { + if (thread_persist_stats_) { + mutex_.Unlock(); + thread_persist_stats_->cancel(); + mutex_.Lock(); + } + if (new_options.stats_persist_period_sec > 0) { + thread_persist_stats_.reset(new rocksdb::RepeatableThread( + [this]() { DBImpl::PersistStats(); }, "pst_st", env_, + new_options.stats_persist_period_sec * 1000000)); + } else { + thread_persist_stats_.reset(); + } + } write_controller_.set_max_delayed_write_rate( new_options.delayed_write_rate); table_cache_.get()->SetCapacity(new_options.max_open_files == -1 diff --git a/db/db_impl.h b/db/db_impl.h index 86acaa24d..39ac83c1b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -63,6 +63,7 @@ namespace rocksdb { class Arena; class ArenaWrappedDBIter; +class InMemoryStatsHistoryIterator; class MemTable; class TableCache; class TaskLimiterToken; @@ -482,7 +483,10 @@ class DBImpl : public DB { int TEST_BGCompactionsAllowed() const; int TEST_BGFlushesAllowed() const; size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; - void TEST_WaitForTimedTaskRun(std::function callback) const; + void TEST_WaitForDumpStatsRun(std::function callback) const; + void TEST_WaitForPersistStatsRun(std::function callback) const; + bool TEST_IsPersistentStatsEnabled() const; + size_t TEST_EstiamteStatsHistorySize() const; #endif // NDEBUG @@ -728,6 +732,17 @@ class DBImpl : public DB { static Status CreateAndNewDirectory(Env* env, const std::string& dirname, std::unique_ptr* directory); + // Given a time window, return an iterator for accessing stats history + Status GetStatsHistory( + uint64_t start_time, uint64_t end_time, + std::unique_ptr* stats_iterator) override; + + // find stats map from stats_history_ with smallest timestamp in + // the range of [start_time, end_time) + bool FindStatsByTime(uint64_t start_time, uint64_t end_time, + uint64_t* new_time, + std::map* stats_map); + protected: Env* const env_; const std::string dbname_; @@ -1135,6 +1150,11 @@ class DBImpl : public DB { void PrintStatistics(); + size_t EstiamteStatsHistorySize() const; + + // persist stats to column family "_persistent_stats" + void PersistStats(); + // dump rocksdb.stats to LOG void DumpStats(); @@ -1177,6 +1197,8 @@ class DBImpl : public DB { // Lock over the persistent DB state. Non-nullptr iff successfully acquired. FileLock* db_lock_; + // In addition to mutex_, log_write_mutex_ protected writes to stats_history_ + InstrumentedMutex stats_history_mutex_; // In addition to mutex_, log_write_mutex_ protected writes to logs_ and // logfile_number_. With two_write_queues it also protects alive_log_files_, // and log_empty_. Refer to the definition of each variable below for more @@ -1299,6 +1321,12 @@ class DBImpl : public DB { bool is_snapshot_supported_; + std::map> stats_history_; + + std::map stats_slice_; + + bool stats_slice_initialized_ = false; + // Class to maintain directories for all database paths other than main one. class Directories { public: @@ -1544,10 +1572,14 @@ class DBImpl : public DB { // Only to be set during initialization std::unique_ptr recoverable_state_pre_release_callback_; - // handle for scheduling jobs at fixed intervals + // handle for scheduling stats dumping at fixed intervals // REQUIRES: mutex locked std::unique_ptr thread_dump_stats_; + // handle for scheduling stats snapshoting at fixed intervals + // REQUIRES: mutex locked + std::unique_ptr thread_persist_stats_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index bbdd5df37..d2e4a7d38 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -243,10 +243,24 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize( return GetWalPreallocateBlockSize(write_buffer_size); } -void DBImpl::TEST_WaitForTimedTaskRun(std::function callback) const { +void DBImpl::TEST_WaitForDumpStatsRun(std::function callback) const { if (thread_dump_stats_ != nullptr) { thread_dump_stats_->TEST_WaitForRun(callback); } } + +void DBImpl::TEST_WaitForPersistStatsRun(std::function callback) const { + if (thread_persist_stats_ != nullptr) { + thread_persist_stats_->TEST_WaitForRun(callback); + } +} + +bool DBImpl::TEST_IsPersistentStatsEnabled() const { + return thread_persist_stats_ && thread_persist_stats_->IsRunning(); +} + +size_t DBImpl::TEST_EstiamteStatsHistorySize() const { + return EstiamteStatsHistorySize(); +} } // namespace rocksdb #endif // NDEBUG diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 892ee60f9..d3a1f7a43 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -18,12 +18,15 @@ #include "rocksdb/cache.h" #include "rocksdb/convenience.h" #include "rocksdb/rate_limiter.h" +#include "rocksdb/stats_history.h" #include "util/random.h" #include "util/sync_point.h" #include "util/testutil.h" namespace rocksdb { +const int kMicrosInSec = 1000000; + class DBOptionsTest : public DBTestBase { public: DBOptionsTest() : DBTestBase("/db_options_test") {} @@ -508,10 +511,11 @@ TEST_F(DBOptionsTest, SetStatsDumpPeriodSec) { for (int i = 0; i < 20; i++) { int num = rand() % 5000 + 1; - ASSERT_OK(dbfull()->SetDBOptions( - {{"stats_dump_period_sec", std::to_string(num)}})); + ASSERT_OK( + dbfull()->SetDBOptions({{"stats_dump_period_sec", ToString(num)}})); ASSERT_EQ(num, dbfull()->GetDBOptions().stats_dump_period_sec); } + Close(); } TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) { @@ -530,17 +534,210 @@ TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); Reopen(options); ASSERT_EQ(5, dbfull()->GetDBOptions().stats_dump_period_sec); - dbfull()->TEST_WaitForTimedTaskRun([&] { mock_env->set_current_time(5); }); + dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(5); }); ASSERT_GE(counter, 1); // Test cacel job through SetOptions ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "0"}})); int old_val = counter; - env_->SleepForMicroseconds(10000000); + for (int i = 6; i < 20; ++i) { + dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(i); }); + } ASSERT_EQ(counter, old_val); Close(); } +// Test persistent stats background thread scheduling and cancelling +TEST_F(DBOptionsTest, StatsPersistScheduling) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + int counter = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); + ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec); + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + ASSERT_GE(counter, 1); + + // Test cacel job through SetOptions + ASSERT_TRUE(dbfull()->TEST_IsPersistentStatsEnabled()); + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); + ASSERT_FALSE(dbfull()->TEST_IsPersistentStatsEnabled()); + Close(); +} + +// Test enabling persistent stats for the first time +TEST_F(DBOptionsTest, PersistentStatsFreshInstall) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 0; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + int counter = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "5"}})); + ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec); + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + ASSERT_GE(counter, 1); + Close(); +} + +TEST_F(DBOptionsTest, SetOptionsStatsPersistPeriodSec) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + options.env = env_; + Reopen(options); + ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec); + + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "12345"}})); + ASSERT_EQ(12345, dbfull()->GetDBOptions().stats_persist_period_sec); + ASSERT_NOK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "abcde"}})); + ASSERT_EQ(12345, dbfull()->GetDBOptions().stats_persist_period_sec); +} + +TEST_F(DBOptionsTest, GetStatsHistory) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + options.statistics = rocksdb::CreateDBStatistics(); + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + CreateColumnFamilies({"pikachu"}, options); + ASSERT_OK(Put("foo", "bar")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + + int mock_time = 1; + // Wait for stats persist to finish + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + std::unique_ptr stats_iter; + db_->GetStatsHistory(0, 6 * kMicrosInSec, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + // disabled stats snapshots + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); + size_t stats_count = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + auto stats_map = stats_iter->GetStatsMap(); + stats_count += stats_map.size(); + } + ASSERT_GT(stats_count, 0); + // Wait a bit and verify no more stats are found + for (mock_time = 6; mock_time < 20; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + db_->GetStatsHistory(0, 20 * kMicrosInSec, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count_new = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + stats_count_new += stats_iter->GetStatsMap().size(); + } + ASSERT_EQ(stats_count_new, stats_count); + Close(); +} + +TEST_F(DBOptionsTest, InMemoryStatsHistoryPurging) { + Options options; + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + options.stats_persist_period_sec = 1; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + CreateColumnFamilies({"pikachu"}, options); + ASSERT_OK(Put("foo", "bar")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + // some random operation to populate statistics + ASSERT_OK(Delete("foo")); + ASSERT_OK(Put("sol", "sol")); + ASSERT_OK(Put("epic", "epic")); + ASSERT_OK(Put("ltd", "ltd")); + ASSERT_EQ("sol", Get("sol")); + ASSERT_EQ("epic", Get("epic")); + ASSERT_EQ("ltd", Get("ltd")); + Iterator* iterator = db_->NewIterator(ReadOptions()); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + ASSERT_TRUE(iterator->key() == iterator->value()); + } + delete iterator; + ASSERT_OK(Flush()); + ASSERT_OK(Delete("sol")); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + int mock_time = 1; + // Wait for stats persist to finish + for (; mock_time < 5; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + + // second round of ops + ASSERT_OK(Put("saigon", "saigon")); + ASSERT_OK(Put("noodle talk", "noodle talk")); + ASSERT_OK(Put("ping bistro", "ping bistro")); + iterator = db_->NewIterator(ReadOptions()); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + ASSERT_TRUE(iterator->key() == iterator->value()); + } + delete iterator; + ASSERT_OK(Flush()); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + for (; mock_time < 10; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + std::unique_ptr stats_iter; + db_->GetStatsHistory(0, 10 * kMicrosInSec, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count = 0; + int slice_count = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + slice_count++; + auto stats_map = stats_iter->GetStatsMap(); + stats_count += stats_map.size(); + } + size_t stats_history_size = dbfull()->TEST_EstiamteStatsHistorySize(); + ASSERT_GE(slice_count, 9); + ASSERT_GE(stats_history_size, 12000); + // capping memory cost at 12000 bytes since one slice is around 10000~12000 + ASSERT_OK(dbfull()->SetDBOptions({{"stats_history_buffer_size", "12000"}})); + ASSERT_EQ(12000, dbfull()->GetDBOptions().stats_history_buffer_size); + // Wait for stats persist to finish + for (; mock_time < 20; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + db_->GetStatsHistory(0, 20 * kMicrosInSec, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count_reopen = 0; + slice_count = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + slice_count++; + auto stats_map = stats_iter->GetStatsMap(); + stats_count_reopen += stats_map.size(); + } + size_t stats_history_size_reopen = dbfull()->TEST_EstiamteStatsHistorySize(); + // only one slice can fit under the new stats_history_buffer_size + ASSERT_LT(slice_count, 2); + ASSERT_TRUE(stats_history_size_reopen < 12000 && + stats_history_size_reopen > 0); + ASSERT_TRUE(stats_count_reopen < stats_count && stats_count_reopen > 0); + Close(); +} + static void assert_candidate_files_empty(DBImpl* dbfull, const bool empty) { dbfull->TEST_LockMutex(); JobContext job_context(0); diff --git a/db/in_memory_stats_history.cc b/db/in_memory_stats_history.cc new file mode 100644 index 000000000..39355cfbe --- /dev/null +++ b/db/in_memory_stats_history.cc @@ -0,0 +1,45 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_impl.h" +#include "db/in_memory_stats_history.h" + +namespace rocksdb { + +InMemoryStatsHistoryIterator::~InMemoryStatsHistoryIterator() {} + +bool InMemoryStatsHistoryIterator::Valid() const { return valid_; } + +Status InMemoryStatsHistoryIterator::status() const { return status_; } + +void InMemoryStatsHistoryIterator::Next() { + // increment start_time by 1 to avoid infinite loop + AdvanceIteratorByTime(GetStatsTime() + 1, end_time_); +} + +uint64_t InMemoryStatsHistoryIterator::GetStatsTime() const { return time_; } + +const std::map& +InMemoryStatsHistoryIterator::GetStatsMap() const { + return stats_map_; +} + +// advance the iterator to the next time between [start_time, end_time) +// if success, update time_ and stats_map_ with new_time and stats_map +void InMemoryStatsHistoryIterator::AdvanceIteratorByTime(uint64_t start_time, + uint64_t end_time) { + // try to find next entry in stats_history_ map + if (db_impl_ != nullptr) { + valid_ = + db_impl_->FindStatsByTime(start_time, end_time, &time_, &stats_map_); + } else { + valid_ = false; + } +} + +} // namespace rocksdb diff --git a/db/in_memory_stats_history.h b/db/in_memory_stats_history.h new file mode 100644 index 000000000..4b52e23ff --- /dev/null +++ b/db/in_memory_stats_history.h @@ -0,0 +1,55 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "rocksdb/stats_history.h" + +namespace rocksdb { + +class InMemoryStatsHistoryIterator final : public StatsHistoryIterator { + public: + InMemoryStatsHistoryIterator(uint64_t start_time, uint64_t end_time, + DBImpl* db_impl) + : start_time_(start_time), + end_time_(end_time), + valid_(true), + db_impl_(db_impl) { + AdvanceIteratorByTime(start_time_, end_time_); + } + ~InMemoryStatsHistoryIterator() override; + bool Valid() const override; + Status status() const override; + + void Next() override; + uint64_t GetStatsTime() const override; + + const std::map& GetStatsMap() const override; + + private: + // advance the iterator to the next stats history record with timestamp + // between [start_time, end_time) + void AdvanceIteratorByTime(uint64_t start_time, uint64_t end_time); + + // No copying allowed + InMemoryStatsHistoryIterator(const InMemoryStatsHistoryIterator&) = delete; + void operator=(const InMemoryStatsHistoryIterator&) = delete; + InMemoryStatsHistoryIterator(InMemoryStatsHistoryIterator&&) = delete; + InMemoryStatsHistoryIterator& operator=(InMemoryStatsHistoryIterator&&) = + delete; + + uint64_t time_; + uint64_t start_time_; + uint64_t end_time_; + std::map stats_map_; + Status status_; + bool valid_; + DBImpl* db_impl_; +}; + +} // namespace rocksdb diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 0ba9dda02..9d5316546 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -52,6 +52,7 @@ struct ExternalSstFileInfo; class WriteBatch; class Env; class EventListener; +class StatsHistoryIterator; class TraceWriter; #ifdef ROCKSDB_LITE class CompactionJobInfo; @@ -1226,6 +1227,14 @@ class DB { // Needed for StackableDB virtual DB* GetRootDB() { return this; } + // Given a time window, return an iterator for accessing stats history + // User is responsible for deleting StatsHistoryIterator after use + virtual Status GetStatsHistory(uint64_t /*start_time*/, + uint64_t /*end_time*/, + std::unique_ptr* /*stats_iterator*/) { + return Status::NotSupported("GetStatsHistory() is not implemented."); + } + private: // No copying allowed DB(const DB&); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 3b3bd88c3..311d78983 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -677,6 +677,15 @@ struct DBOptions { // Dynamically changeable through SetDBOptions() API. unsigned int stats_dump_period_sec = 600; + // if not zero, dump rocksdb.stats to RocksDB every stats_persist_period_sec + // Default: 600 + unsigned int stats_persist_period_sec = 600; + + // if not zero, periodically take stats snapshots and store in memory, the + // memory size for stats snapshots is capped at stats_history_buffer_size + // Default: 1MB + size_t stats_history_buffer_size = 1024 * 1024; + // If set true, will hint the underlying file system that the file // access pattern is random, when a sst file is opened. // Default: true diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index dc7750283..c1cadde7b 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -8,8 +8,9 @@ #include #include #include -#include +#include #include +#include #include #include "rocksdb/status.h" @@ -479,6 +480,11 @@ class Statistics { return std::string("ToString(): not implemented"); } + virtual bool getTickerMap(std::map*) const { + // Do nothing by default + return false; + }; + // Override this function to disable particular histogram collection virtual bool HistEnabledForType(uint32_t type) const { return type < HISTOGRAM_ENUM_MAX; diff --git a/include/rocksdb/stats_history.h b/include/rocksdb/stats_history.h new file mode 100644 index 000000000..40ea51d1f --- /dev/null +++ b/include/rocksdb/stats_history.h @@ -0,0 +1,49 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include +#include + +// #include "db/db_impl.h" +#include "rocksdb/statistics.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +class DBImpl; + +class StatsHistoryIterator { + public: + StatsHistoryIterator() {} + virtual ~StatsHistoryIterator() {} + + virtual bool Valid() const = 0; + + // Moves to the next stats history record. After this call, Valid() is + // true iff the iterator was not positioned at the last entry in the source. + // REQUIRES: Valid() + virtual void Next() = 0; + + // Return the time stamp (in microseconds) when stats history is recorded. + // REQUIRES: Valid() + virtual uint64_t GetStatsTime() const = 0; + + // Return the current stats history as an std::map which specifies the + // mapping from stats name to stats value . The underlying storage + // for the returned map is valid only until the next modification of + // the iterator. + // REQUIRES: Valid() + virtual const std::map& GetStatsMap() const = 0; + + // If an error has occurred, return it. Else return an ok status. + virtual Status status() const = 0; +}; + +} // namespace rocksdb diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index f739886d2..c24ff1688 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -4522,7 +4522,7 @@ class JniUtil { * if an OutOfMemoryError or ArrayIndexOutOfBoundsException * exception occurs * - * @return A std::vector containing copies of the Java strings + * @return A std::vector containing copies of the Java strings */ static std::vector copyStrings(JNIEnv* env, jobjectArray jss, jboolean* has_exception) { @@ -4540,7 +4540,7 @@ class JniUtil { * if an OutOfMemoryError or ArrayIndexOutOfBoundsException * exception occurs * - * @return A std::vector containing copies of the Java strings + * @return A std::vector containing copies of the Java strings */ static std::vector copyStrings(JNIEnv* env, jobjectArray jss, const jsize jss_len, jboolean* has_exception) { @@ -4624,7 +4624,7 @@ class JniUtil { * @param has_exception (OUT) will be set to JNI_TRUE * if an OutOfMemoryError exception occurs * - * @return A std:string copy of the jstring, or an + * @return A std::string copy of the jstring, or an * empty std::string if has_exception == JNI_TRUE */ static std::string copyStdString(JNIEnv* env, jstring js, @@ -4655,8 +4655,8 @@ class JniUtil { * @param bytes The bytes to copy * * @return the Java byte[] or nullptr if an exception occurs - * - * @throws RocksDBException thrown + * + * @throws RocksDBException thrown * if memory size to copy exceeds general java specific array size limitation. */ static jbyteArray copyBytes(JNIEnv* env, std::string bytes) { @@ -4823,7 +4823,7 @@ class JniUtil { return jbyte_strings; } - + /** * Copies bytes to a new jByteArray with the check of java array size limitation. * @@ -4831,29 +4831,29 @@ class JniUtil { * @param size number of bytes to copy * * @return the Java byte[] or nullptr if an exception occurs - * - * @throws RocksDBException thrown + * + * @throws RocksDBException thrown * if memory size to copy exceeds general java array size limitation to avoid overflow. */ static jbyteArray createJavaByteArrayWithSizeCheck(JNIEnv* env, const char* bytes, const size_t size) { // Limitation for java array size is vm specific // In general it cannot exceed Integer.MAX_VALUE (2^31 - 1) // Current HotSpot VM limitation for array size is Integer.MAX_VALUE - 5 (2^31 - 1 - 5) - // It means that the next call to env->NewByteArray can still end with + // It means that the next call to env->NewByteArray can still end with // OutOfMemoryError("Requested array size exceeds VM limit") coming from VM static const size_t MAX_JARRAY_SIZE = (static_cast(1)) << 31; if(size > MAX_JARRAY_SIZE) { rocksdb::RocksDBExceptionJni::ThrowNew(env, "Requested array size exceeds VM limit"); return nullptr; } - + const jsize jlen = static_cast(size); jbyteArray jbytes = env->NewByteArray(jlen); if(jbytes == nullptr) { - // exception thrown: OutOfMemoryError + // exception thrown: OutOfMemoryError return nullptr; } - + env->SetByteArrayRegion(jbytes, 0, jlen, const_cast(reinterpret_cast(bytes))); if(env->ExceptionCheck()) { @@ -4872,8 +4872,8 @@ class JniUtil { * @param bytes The bytes to copy * * @return the Java byte[] or nullptr if an exception occurs - * - * @throws RocksDBException thrown + * + * @throws RocksDBException thrown * if memory size to copy exceeds general java specific array size limitation. */ static jbyteArray copyBytes(JNIEnv* env, const Slice& bytes) { diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index a9479269d..0501ee207 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -384,6 +384,19 @@ std::string StatisticsImpl::ToString() const { return res; } +bool StatisticsImpl::getTickerMap( + std::map* stats_map) const { + assert(stats_map); + if (!stats_map) return false; + stats_map->clear(); + MutexLock lock(&aggregate_lock_); + for (const auto& t : TickersNameMap) { + assert(t.first < TICKER_ENUM_MAX); + (*stats_map)[t.second.c_str()] = getTickerCountLocked(t.first); + } + return true; +} + bool StatisticsImpl::HistEnabledForType(uint32_t type) const { return type < HISTOGRAM_ENUM_MAX; } diff --git a/monitoring/statistics.h b/monitoring/statistics.h index dcd5f7a01..55dd1fad7 100644 --- a/monitoring/statistics.h +++ b/monitoring/statistics.h @@ -6,9 +6,10 @@ #pragma once #include "rocksdb/statistics.h" -#include #include +#include #include +#include #include "monitoring/histogram.h" #include "port/likely.h" @@ -56,6 +57,7 @@ class StatisticsImpl : public Statistics { virtual Status Reset() override; virtual std::string ToString() const override; + virtual bool getTickerMap(std::map*) const override; virtual bool HistEnabledForType(uint32_t type) const override; private: diff --git a/options/db_options.cc b/options/db_options.cc index 4e8134511..b90c27606 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -229,6 +229,8 @@ MutableDBOptions::MutableDBOptions() max_total_wal_size(0), delete_obsolete_files_period_micros(6ULL * 60 * 60 * 1000000), stats_dump_period_sec(600), + stats_persist_period_sec(600), + stats_history_buffer_size(1024 * 1024), max_open_files(-1), bytes_per_sync(0), wal_bytes_per_sync(0), @@ -245,6 +247,8 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options) delete_obsolete_files_period_micros( options.delete_obsolete_files_period_micros), stats_dump_period_sec(options.stats_dump_period_sec), + stats_persist_period_sec(options.stats_persist_period_sec), + stats_history_buffer_size(options.stats_history_buffer_size), max_open_files(options.max_open_files), bytes_per_sync(options.bytes_per_sync), wal_bytes_per_sync(options.wal_bytes_per_sync), @@ -269,6 +273,10 @@ void MutableDBOptions::Dump(Logger* log) const { delete_obsolete_files_period_micros); ROCKS_LOG_HEADER(log, " Options.stats_dump_period_sec: %u", stats_dump_period_sec); + ROCKS_LOG_HEADER(log, " Options.stats_persist_period_sec: %d", + stats_persist_period_sec); + ROCKS_LOG_HEADER(log, " Options.stats_history_buffer_size: %d", + stats_history_buffer_size); ROCKS_LOG_HEADER(log, " Options.max_open_files: %d", max_open_files); ROCKS_LOG_HEADER(log, diff --git a/options/db_options.h b/options/db_options.h index 2cd83b55d..360848851 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -97,6 +97,8 @@ struct MutableDBOptions { uint64_t max_total_wal_size; uint64_t delete_obsolete_files_period_micros; unsigned int stats_dump_period_sec; + unsigned int stats_persist_period_sec; + size_t stats_history_buffer_size; int max_open_files; uint64_t bytes_per_sync; uint64_t wal_bytes_per_sync; diff --git a/options/options_helper.cc b/options/options_helper.cc index 05ea8d67b..94410e530 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -79,6 +79,10 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.allow_fallocate = immutable_db_options.allow_fallocate; options.is_fd_close_on_exec = immutable_db_options.is_fd_close_on_exec; options.stats_dump_period_sec = mutable_db_options.stats_dump_period_sec; + options.stats_persist_period_sec = + mutable_db_options.stats_persist_period_sec; + options.stats_history_buffer_size = + mutable_db_options.stats_history_buffer_size; options.advise_random_on_open = immutable_db_options.advise_random_on_open; options.db_write_buffer_size = immutable_db_options.db_write_buffer_size; options.write_buffer_manager = immutable_db_options.write_buffer_manager; @@ -1495,6 +1499,14 @@ std::unordered_map {offsetof(struct DBOptions, stats_dump_period_sec), OptionType::kUInt, OptionVerificationType::kNormal, true, offsetof(struct MutableDBOptions, stats_dump_period_sec)}}, + {"stats_persist_period_sec", + {offsetof(struct DBOptions, stats_persist_period_sec), + OptionType::kUInt, OptionVerificationType::kNormal, true, + offsetof(struct MutableDBOptions, stats_persist_period_sec)}}, + {"stats_history_buffer_size", + {offsetof(struct DBOptions, stats_history_buffer_size), + OptionType::kSizeT, OptionVerificationType::kNormal, true, + offsetof(struct MutableDBOptions, stats_history_buffer_size)}}, {"fail_if_options_file_error", {offsetof(struct DBOptions, fail_if_options_file_error), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 9d37fc186..7da8380cf 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -266,6 +266,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "manifest_preallocation_size=1222;" "allow_mmap_writes=false;" "stats_dump_period_sec=70127;" + "stats_persist_period_sec=54321;" + "stats_history_buffer_size=14159;" "allow_fallocate=true;" "allow_mmap_reads=false;" "use_direct_reads=false;" diff --git a/options/options_test.cc b/options/options_test.cc index cebad4938..f700d8d65 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -128,6 +128,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"is_fd_close_on_exec", "true"}, {"skip_log_error_on_recovery", "false"}, {"stats_dump_period_sec", "46"}, + {"stats_persist_period_sec", "57"}, + {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, {"use_adaptive_mutex", "false"}, {"new_table_reader_for_compaction_inputs", "true"}, @@ -262,6 +264,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.is_fd_close_on_exec, true); ASSERT_EQ(new_db_opt.skip_log_error_on_recovery, false); ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U); + ASSERT_EQ(new_db_opt.stats_persist_period_sec, 57U); + ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); diff --git a/src.mk b/src.mk index 19e01ec3a..39ba3f99b 100644 --- a/src.mk +++ b/src.mk @@ -34,6 +34,7 @@ LIB_SOURCES = \ db/flush_job.cc \ db/flush_scheduler.cc \ db/forward_iterator.cc \ + db/in_memory_stats_history.cc \ db/internal_stats.cc \ db/logs_with_prep_tracker.cc \ db/log_reader.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 502366236..ec9cdcab4 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1095,6 +1095,12 @@ DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo " DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG "); DEFINE_uint64(stats_dump_period_sec, rocksdb::Options().stats_dump_period_sec, "Gap between printing stats to log in seconds"); +DEFINE_uint64(stats_persist_period_sec, + rocksdb::Options().stats_persist_period_sec, + "Gap between persisting stats in seconds"); +DEFINE_uint64(stats_history_buffer_size, + rocksdb::Options().stats_history_buffer_size, + "Max number of stats snapshots to keep in memory"); enum RepFactory { kSkipList, @@ -3565,6 +3571,10 @@ void VerifyDBFromDB(std::string& truth_db_name) { options.dump_malloc_stats = FLAGS_dump_malloc_stats; options.stats_dump_period_sec = static_cast(FLAGS_stats_dump_period_sec); + options.stats_persist_period_sec = + static_cast(FLAGS_stats_persist_period_sec); + options.stats_history_buffer_size = + static_cast(FLAGS_stats_history_buffer_size); options.compression_opts.level = FLAGS_compression_level; options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes; diff --git a/util/repeatable_thread.h b/util/repeatable_thread.h index 3506234f9..770ba5772 100644 --- a/util/repeatable_thread.h +++ b/util/repeatable_thread.h @@ -46,6 +46,8 @@ class RepeatableThread { thread_.join(); } + bool IsRunning() { return running_; } + ~RepeatableThread() { cancel(); } #ifndef NDEBUG