add GetStatsHistory to retrieve stats snapshots (#4748)

Summary:
This PR adds public `GetStatsHistory` API to retrieve stats history in the form of an std map. The key of the map is the timestamp in microseconds when the stats snapshot is taken, the value is another std map from stats name to stats value (stored in std string). Two DBOptions are introduced: `stats_persist_period_sec` (default 10 minutes) controls the intervals between two snapshots are taken; `max_stats_history_count` (default 10) controls the max number of history snapshots to keep in memory. RocksDB will stop collecting stats snapshots if `stats_persist_period_sec` is set to 0.

(This PR is the in-memory part of https://github.com/facebook/rocksdb/pull/4535)
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4748

Differential Revision: D13961471

Pulled By: miasantreble

fbshipit-source-id: ac836d401ecb84ea92216bf9966f969dedf4ad04
main
Zhongyi Xie 5 years ago committed by Facebook Github Bot
parent 48c8d8445e
commit c4f5d0aa15
  1. 1
      CMakeLists.txt
  2. 1
      HISTORY.md
  3. 1
      TARGETS
  4. 134
      db/db_impl.cc
  5. 36
      db/db_impl.h
  6. 16
      db/db_impl_debug.cc
  7. 205
      db/db_options_test.cc
  8. 45
      db/in_memory_stats_history.cc
  9. 55
      db/in_memory_stats_history.h
  10. 9
      include/rocksdb/db.h
  11. 9
      include/rocksdb/options.h
  12. 8
      include/rocksdb/statistics.h
  13. 49
      include/rocksdb/stats_history.h
  14. 28
      java/rocksjni/portal.h
  15. 13
      monitoring/statistics.cc
  16. 4
      monitoring/statistics.h
  17. 8
      options/db_options.cc
  18. 2
      options/db_options.h
  19. 12
      options/options_helper.cc
  20. 2
      options/options_settable_test.cc
  21. 4
      options/options_test.cc
  22. 1
      src.mk
  23. 10
      tools/db_bench_tool.cc
  24. 2
      util/repeatable_thread.h

@ -501,6 +501,7 @@ set(SOURCES
db/flush_scheduler.cc
db/forward_iterator.cc
db/internal_stats.cc
db/in_memory_stats_history.cc
db/logs_with_prep_tracker.cc
db/log_reader.cc
db/log_writer.cc

@ -8,6 +8,7 @@
* For users of dictionary compression with ZSTD v0.7.0+, we now reuse the same digested dictionary when compressing each of an SST file's data blocks for faster compression speeds.
* For all users of dictionary compression who set `cache_index_and_filter_blocks == true`, we now store dictionary data used for decompression in the block cache for better control over memory usage. For users of ZSTD v1.1.4+ who compile with -DZSTD_STATIC_LINKING_ONLY, this includes a digested dictionary, which is used to increase decompression speed.
* Add support for block checksums verification for external SST files before ingestion.
* Introduce stats history which periodically saves Statistics snapshots and added `GetStatsHistory` API to retrieve these snapshots.
* Add a place holder in manifest which indicate a record from future that can be safely ignored.
* Add support for trace sampling.
* Enable properties block checksum verification for block-based tables.

@ -110,6 +110,7 @@ cpp_library(
"db/flush_job.cc",
"db/flush_scheduler.cc",
"db/forward_iterator.cc",
"db/in_memory_stats_history.cc",
"db/internal_stats.cc",
"db/log_reader.cc",
"db/log_writer.cc",

@ -37,6 +37,7 @@
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/in_memory_stats_history.h"
#include "db/job_context.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
@ -69,6 +70,7 @@
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/write_buffer_manager.h"
@ -387,16 +389,15 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Shutdown: canceling all background work");
InstrumentedMutexLock l(&mutex_);
// To avoid deadlock, `thread_dump_stats_->cancel()` needs to be called
// before grabbing db mutex because the actual worker function
// `DBImpl::DumpStats()` also holds db mutex
if (thread_dump_stats_ != nullptr) {
mutex_.Unlock();
thread_dump_stats_->cancel();
mutex_.Lock();
thread_dump_stats_.reset();
}
if (thread_persist_stats_ != nullptr) {
thread_persist_stats_->cancel();
thread_persist_stats_.reset();
}
InstrumentedMutexLock l(&mutex_);
if (!shutting_down_.load(std::memory_order_acquire) &&
has_unpersisted_data_.load(std::memory_order_relaxed) &&
!mutable_db_options_.avoid_flush_during_shutdown) {
@ -620,6 +621,7 @@ void DBImpl::PrintStatistics() {
void DBImpl::StartTimedTasks() {
unsigned int stats_dump_period_sec = 0;
unsigned int stats_persist_period_sec = 0;
{
InstrumentedMutexLock l(&mutex_);
stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec;
@ -630,7 +632,112 @@ void DBImpl::StartTimedTasks() {
stats_dump_period_sec * 1000000));
}
}
stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec;
if (stats_persist_period_sec > 0) {
if (!thread_persist_stats_) {
thread_persist_stats_.reset(new rocksdb::RepeatableThread(
[this]() { DBImpl::PersistStats(); }, "pst_st", env_,
stats_persist_period_sec * 1000000));
}
}
}
}
// esitmate the total size of stats_history_
size_t DBImpl::EstiamteStatsHistorySize() const {
size_t size_total =
sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
if (stats_history_.size() == 0) return size_total;
size_t size_per_slice =
sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
// non-empty map, stats_history_.begin() guaranteed to exist
std::map<std::string, uint64_t> sample_slice(stats_history_.begin()->second);
for (const auto& pairs : sample_slice) {
size_per_slice +=
pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second);
}
size_total = size_per_slice * stats_history_.size();
return size_total;
}
void DBImpl::PersistStats() {
TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
#ifndef ROCKSDB_LITE
if (shutdown_initiated_) {
return;
}
uint64_t now_micros = env_->NowMicros();
Statistics* statistics = immutable_db_options_.statistics.get();
if (!statistics) {
return;
}
size_t stats_history_size_limit = 0;
{
InstrumentedMutexLock l(&mutex_);
stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
}
// TODO(Zhongyi): also persist immutable_db_options_.statistics
{
std::map<std::string, uint64_t> stats_map;
if (!statistics->getTickerMap(&stats_map)) {
return;
}
InstrumentedMutexLock l(&stats_history_mutex_);
// calculate the delta from last time
if (stats_slice_initialized_) {
std::map<std::string, uint64_t> stats_delta;
for (const auto& stat : stats_map) {
if (stats_slice_.find(stat.first) != stats_slice_.end()) {
stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
}
}
stats_history_[now_micros] = stats_delta;
}
stats_slice_initialized_ = true;
std::swap(stats_slice_, stats_map);
TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");
// delete older stats snapshots to control memory consumption
bool purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit;
while (purge_needed && !stats_history_.empty()) {
stats_history_.erase(stats_history_.begin());
purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit;
}
}
// TODO: persist stats to disk
#endif // !ROCKSDB_LITE
}
bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
uint64_t* new_time,
std::map<std::string, uint64_t>* stats_map) {
assert(new_time);
assert(stats_map);
if (!new_time || !stats_map) return false;
// lock when search for start_time
{
InstrumentedMutexLock l(&stats_history_mutex_);
auto it = stats_history_.lower_bound(start_time);
if (it != stats_history_.end() && it->first < end_time) {
// make a copy for timestamp and stats_map
*new_time = it->first;
*stats_map = it->second;
return true;
} else {
return false;
}
}
}
Status DBImpl::GetStatsHistory(uint64_t start_time, uint64_t end_time,
std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
if (!stats_iterator) {
return Status::InvalidArgument("stats_iterator not preallocated.");
}
stats_iterator->reset(
new InMemoryStatsHistoryIterator(start_time, end_time, this));
return (*stats_iterator)->status();
}
void DBImpl::DumpStats() {
@ -819,6 +926,21 @@ Status DBImpl::SetDBOptions(
thread_dump_stats_.reset();
}
}
if (new_options.stats_persist_period_sec !=
mutable_db_options_.stats_persist_period_sec) {
if (thread_persist_stats_) {
mutex_.Unlock();
thread_persist_stats_->cancel();
mutex_.Lock();
}
if (new_options.stats_persist_period_sec > 0) {
thread_persist_stats_.reset(new rocksdb::RepeatableThread(
[this]() { DBImpl::PersistStats(); }, "pst_st", env_,
new_options.stats_persist_period_sec * 1000000));
} else {
thread_persist_stats_.reset();
}
}
write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate);
table_cache_.get()->SetCapacity(new_options.max_open_files == -1

@ -63,6 +63,7 @@ namespace rocksdb {
class Arena;
class ArenaWrappedDBIter;
class InMemoryStatsHistoryIterator;
class MemTable;
class TableCache;
class TaskLimiterToken;
@ -482,7 +483,10 @@ class DBImpl : public DB {
int TEST_BGCompactionsAllowed() const;
int TEST_BGFlushesAllowed() const;
size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
void TEST_WaitForTimedTaskRun(std::function<void()> callback) const;
void TEST_WaitForDumpStatsRun(std::function<void()> callback) const;
void TEST_WaitForPersistStatsRun(std::function<void()> callback) const;
bool TEST_IsPersistentStatsEnabled() const;
size_t TEST_EstiamteStatsHistorySize() const;
#endif // NDEBUG
@ -728,6 +732,17 @@ class DBImpl : public DB {
static Status CreateAndNewDirectory(Env* env, const std::string& dirname,
std::unique_ptr<Directory>* directory);
// Given a time window, return an iterator for accessing stats history
Status GetStatsHistory(
uint64_t start_time, uint64_t end_time,
std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;
// find stats map from stats_history_ with smallest timestamp in
// the range of [start_time, end_time)
bool FindStatsByTime(uint64_t start_time, uint64_t end_time,
uint64_t* new_time,
std::map<std::string, uint64_t>* stats_map);
protected:
Env* const env_;
const std::string dbname_;
@ -1135,6 +1150,11 @@ class DBImpl : public DB {
void PrintStatistics();
size_t EstiamteStatsHistorySize() const;
// persist stats to column family "_persistent_stats"
void PersistStats();
// dump rocksdb.stats to LOG
void DumpStats();
@ -1177,6 +1197,8 @@ class DBImpl : public DB {
// Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_;
// In addition to mutex_, log_write_mutex_ protected writes to stats_history_
InstrumentedMutex stats_history_mutex_;
// In addition to mutex_, log_write_mutex_ protected writes to logs_ and
// logfile_number_. With two_write_queues it also protects alive_log_files_,
// and log_empty_. Refer to the definition of each variable below for more
@ -1299,6 +1321,12 @@ class DBImpl : public DB {
bool is_snapshot_supported_;
std::map<uint64_t, std::map<std::string, uint64_t>> stats_history_;
std::map<std::string, uint64_t> stats_slice_;
bool stats_slice_initialized_ = false;
// Class to maintain directories for all database paths other than main one.
class Directories {
public:
@ -1544,10 +1572,14 @@ class DBImpl : public DB {
// Only to be set during initialization
std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;
// handle for scheduling jobs at fixed intervals
// handle for scheduling stats dumping at fixed intervals
// REQUIRES: mutex locked
std::unique_ptr<rocksdb::RepeatableThread> thread_dump_stats_;
// handle for scheduling stats snapshoting at fixed intervals
// REQUIRES: mutex locked
std::unique_ptr<rocksdb::RepeatableThread> thread_persist_stats_;
// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);

@ -243,10 +243,24 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize(
return GetWalPreallocateBlockSize(write_buffer_size);
}
void DBImpl::TEST_WaitForTimedTaskRun(std::function<void()> callback) const {
void DBImpl::TEST_WaitForDumpStatsRun(std::function<void()> callback) const {
if (thread_dump_stats_ != nullptr) {
thread_dump_stats_->TEST_WaitForRun(callback);
}
}
void DBImpl::TEST_WaitForPersistStatsRun(std::function<void()> callback) const {
if (thread_persist_stats_ != nullptr) {
thread_persist_stats_->TEST_WaitForRun(callback);
}
}
bool DBImpl::TEST_IsPersistentStatsEnabled() const {
return thread_persist_stats_ && thread_persist_stats_->IsRunning();
}
size_t DBImpl::TEST_EstiamteStatsHistorySize() const {
return EstiamteStatsHistorySize();
}
} // namespace rocksdb
#endif // NDEBUG

@ -18,12 +18,15 @@
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/stats_history.h"
#include "util/random.h"
#include "util/sync_point.h"
#include "util/testutil.h"
namespace rocksdb {
const int kMicrosInSec = 1000000;
class DBOptionsTest : public DBTestBase {
public:
DBOptionsTest() : DBTestBase("/db_options_test") {}
@ -508,10 +511,11 @@ TEST_F(DBOptionsTest, SetStatsDumpPeriodSec) {
for (int i = 0; i < 20; i++) {
int num = rand() % 5000 + 1;
ASSERT_OK(dbfull()->SetDBOptions(
{{"stats_dump_period_sec", std::to_string(num)}}));
ASSERT_OK(
dbfull()->SetDBOptions({{"stats_dump_period_sec", ToString(num)}}));
ASSERT_EQ(num, dbfull()->GetDBOptions().stats_dump_period_sec);
}
Close();
}
TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) {
@ -530,17 +534,210 @@ TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_EQ(5, dbfull()->GetDBOptions().stats_dump_period_sec);
dbfull()->TEST_WaitForTimedTaskRun([&] { mock_env->set_current_time(5); });
dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(5); });
ASSERT_GE(counter, 1);
// Test cacel job through SetOptions
ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "0"}}));
int old_val = counter;
env_->SleepForMicroseconds(10000000);
for (int i = 6; i < 20; ++i) {
dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(i); });
}
ASSERT_EQ(counter, old_val);
Close();
}
// Test persistent stats background thread scheduling and cancelling
TEST_F(DBOptionsTest, StatsPersistScheduling) {
Options options;
options.create_if_missing = true;
options.stats_persist_period_sec = 5;
std::unique_ptr<rocksdb::MockTimeEnv> mock_env;
mock_env.reset(new rocksdb::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds
options.env = mock_env.get();
int counter = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec);
dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
ASSERT_GE(counter, 1);
// Test cacel job through SetOptions
ASSERT_TRUE(dbfull()->TEST_IsPersistentStatsEnabled());
ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}}));
ASSERT_FALSE(dbfull()->TEST_IsPersistentStatsEnabled());
Close();
}
// Test enabling persistent stats for the first time
TEST_F(DBOptionsTest, PersistentStatsFreshInstall) {
Options options;
options.create_if_missing = true;
options.stats_persist_period_sec = 0;
std::unique_ptr<rocksdb::MockTimeEnv> mock_env;
mock_env.reset(new rocksdb::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds
options.env = mock_env.get();
int counter = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "5"}}));
ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec);
dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
ASSERT_GE(counter, 1);
Close();
}
TEST_F(DBOptionsTest, SetOptionsStatsPersistPeriodSec) {
Options options;
options.create_if_missing = true;
options.stats_persist_period_sec = 5;
options.env = env_;
Reopen(options);
ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec);
ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "12345"}}));
ASSERT_EQ(12345, dbfull()->GetDBOptions().stats_persist_period_sec);
ASSERT_NOK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "abcde"}}));
ASSERT_EQ(12345, dbfull()->GetDBOptions().stats_persist_period_sec);
}
TEST_F(DBOptionsTest, GetStatsHistory) {
Options options;
options.create_if_missing = true;
options.stats_persist_period_sec = 5;
options.statistics = rocksdb::CreateDBStatistics();
std::unique_ptr<rocksdb::MockTimeEnv> mock_env;
mock_env.reset(new rocksdb::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds
options.env = mock_env.get();
CreateColumnFamilies({"pikachu"}, options);
ASSERT_OK(Put("foo", "bar"));
ReopenWithColumnFamilies({"default", "pikachu"}, options);
int mock_time = 1;
// Wait for stats persist to finish
dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
std::unique_ptr<StatsHistoryIterator> stats_iter;
db_->GetStatsHistory(0, 6 * kMicrosInSec, &stats_iter);
ASSERT_TRUE(stats_iter != nullptr);
// disabled stats snapshots
ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}}));
size_t stats_count = 0;
for (; stats_iter->Valid(); stats_iter->Next()) {
auto stats_map = stats_iter->GetStatsMap();
stats_count += stats_map.size();
}
ASSERT_GT(stats_count, 0);
// Wait a bit and verify no more stats are found
for (mock_time = 6; mock_time < 20; ++mock_time) {
dbfull()->TEST_WaitForPersistStatsRun(
[&] { mock_env->set_current_time(mock_time); });
}
db_->GetStatsHistory(0, 20 * kMicrosInSec, &stats_iter);
ASSERT_TRUE(stats_iter != nullptr);
size_t stats_count_new = 0;
for (; stats_iter->Valid(); stats_iter->Next()) {
stats_count_new += stats_iter->GetStatsMap().size();
}
ASSERT_EQ(stats_count_new, stats_count);
Close();
}
TEST_F(DBOptionsTest, InMemoryStatsHistoryPurging) {
Options options;
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
options.stats_persist_period_sec = 1;
std::unique_ptr<rocksdb::MockTimeEnv> mock_env;
mock_env.reset(new rocksdb::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds
options.env = mock_env.get();
CreateColumnFamilies({"pikachu"}, options);
ASSERT_OK(Put("foo", "bar"));
ReopenWithColumnFamilies({"default", "pikachu"}, options);
// some random operation to populate statistics
ASSERT_OK(Delete("foo"));
ASSERT_OK(Put("sol", "sol"));
ASSERT_OK(Put("epic", "epic"));
ASSERT_OK(Put("ltd", "ltd"));
ASSERT_EQ("sol", Get("sol"));
ASSERT_EQ("epic", Get("epic"));
ASSERT_EQ("ltd", Get("ltd"));
Iterator* iterator = db_->NewIterator(ReadOptions());
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
ASSERT_TRUE(iterator->key() == iterator->value());
}
delete iterator;
ASSERT_OK(Flush());
ASSERT_OK(Delete("sol"));
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
int mock_time = 1;
// Wait for stats persist to finish
for (; mock_time < 5; ++mock_time) {
dbfull()->TEST_WaitForPersistStatsRun(
[&] { mock_env->set_current_time(mock_time); });
}
// second round of ops
ASSERT_OK(Put("saigon", "saigon"));
ASSERT_OK(Put("noodle talk", "noodle talk"));
ASSERT_OK(Put("ping bistro", "ping bistro"));
iterator = db_->NewIterator(ReadOptions());
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
ASSERT_TRUE(iterator->key() == iterator->value());
}
delete iterator;
ASSERT_OK(Flush());
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
for (; mock_time < 10; ++mock_time) {
dbfull()->TEST_WaitForPersistStatsRun(
[&] { mock_env->set_current_time(mock_time); });
}
std::unique_ptr<StatsHistoryIterator> stats_iter;
db_->GetStatsHistory(0, 10 * kMicrosInSec, &stats_iter);
ASSERT_TRUE(stats_iter != nullptr);
size_t stats_count = 0;
int slice_count = 0;
for (; stats_iter->Valid(); stats_iter->Next()) {
slice_count++;
auto stats_map = stats_iter->GetStatsMap();
stats_count += stats_map.size();
}
size_t stats_history_size = dbfull()->TEST_EstiamteStatsHistorySize();
ASSERT_GE(slice_count, 9);
ASSERT_GE(stats_history_size, 12000);
// capping memory cost at 12000 bytes since one slice is around 10000~12000
ASSERT_OK(dbfull()->SetDBOptions({{"stats_history_buffer_size", "12000"}}));
ASSERT_EQ(12000, dbfull()->GetDBOptions().stats_history_buffer_size);
// Wait for stats persist to finish
for (; mock_time < 20; ++mock_time) {
dbfull()->TEST_WaitForPersistStatsRun(
[&] { mock_env->set_current_time(mock_time); });
}
db_->GetStatsHistory(0, 20 * kMicrosInSec, &stats_iter);
ASSERT_TRUE(stats_iter != nullptr);
size_t stats_count_reopen = 0;
slice_count = 0;
for (; stats_iter->Valid(); stats_iter->Next()) {
slice_count++;
auto stats_map = stats_iter->GetStatsMap();
stats_count_reopen += stats_map.size();
}
size_t stats_history_size_reopen = dbfull()->TEST_EstiamteStatsHistorySize();
// only one slice can fit under the new stats_history_buffer_size
ASSERT_LT(slice_count, 2);
ASSERT_TRUE(stats_history_size_reopen < 12000 &&
stats_history_size_reopen > 0);
ASSERT_TRUE(stats_count_reopen < stats_count && stats_count_reopen > 0);
Close();
}
static void assert_candidate_files_empty(DBImpl* dbfull, const bool empty) {
dbfull->TEST_LockMutex();
JobContext job_context(0);

@ -0,0 +1,45 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"
#include "db/in_memory_stats_history.h"
namespace rocksdb {
InMemoryStatsHistoryIterator::~InMemoryStatsHistoryIterator() {}
bool InMemoryStatsHistoryIterator::Valid() const { return valid_; }
Status InMemoryStatsHistoryIterator::status() const { return status_; }
void InMemoryStatsHistoryIterator::Next() {
// increment start_time by 1 to avoid infinite loop
AdvanceIteratorByTime(GetStatsTime() + 1, end_time_);
}
uint64_t InMemoryStatsHistoryIterator::GetStatsTime() const { return time_; }
const std::map<std::string, uint64_t>&
InMemoryStatsHistoryIterator::GetStatsMap() const {
return stats_map_;
}
// advance the iterator to the next time between [start_time, end_time)
// if success, update time_ and stats_map_ with new_time and stats_map
void InMemoryStatsHistoryIterator::AdvanceIteratorByTime(uint64_t start_time,
uint64_t end_time) {
// try to find next entry in stats_history_ map
if (db_impl_ != nullptr) {
valid_ =
db_impl_->FindStatsByTime(start_time, end_time, &time_, &stats_map_);
} else {
valid_ = false;
}
}
} // namespace rocksdb

@ -0,0 +1,55 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/stats_history.h"
namespace rocksdb {
class InMemoryStatsHistoryIterator final : public StatsHistoryIterator {
public:
InMemoryStatsHistoryIterator(uint64_t start_time, uint64_t end_time,
DBImpl* db_impl)
: start_time_(start_time),
end_time_(end_time),
valid_(true),
db_impl_(db_impl) {
AdvanceIteratorByTime(start_time_, end_time_);
}
~InMemoryStatsHistoryIterator() override;
bool Valid() const override;
Status status() const override;
void Next() override;
uint64_t GetStatsTime() const override;
const std::map<std::string, uint64_t>& GetStatsMap() const override;
private:
// advance the iterator to the next stats history record with timestamp
// between [start_time, end_time)
void AdvanceIteratorByTime(uint64_t start_time, uint64_t end_time);
// No copying allowed
InMemoryStatsHistoryIterator(const InMemoryStatsHistoryIterator&) = delete;
void operator=(const InMemoryStatsHistoryIterator&) = delete;
InMemoryStatsHistoryIterator(InMemoryStatsHistoryIterator&&) = delete;
InMemoryStatsHistoryIterator& operator=(InMemoryStatsHistoryIterator&&) =
delete;
uint64_t time_;
uint64_t start_time_;
uint64_t end_time_;
std::map<std::string, uint64_t> stats_map_;
Status status_;
bool valid_;
DBImpl* db_impl_;
};
} // namespace rocksdb

@ -52,6 +52,7 @@ struct ExternalSstFileInfo;
class WriteBatch;
class Env;
class EventListener;
class StatsHistoryIterator;
class TraceWriter;
#ifdef ROCKSDB_LITE
class CompactionJobInfo;
@ -1226,6 +1227,14 @@ class DB {
// Needed for StackableDB
virtual DB* GetRootDB() { return this; }
// Given a time window, return an iterator for accessing stats history
// User is responsible for deleting StatsHistoryIterator after use
virtual Status GetStatsHistory(uint64_t /*start_time*/,
uint64_t /*end_time*/,
std::unique_ptr<StatsHistoryIterator>* /*stats_iterator*/) {
return Status::NotSupported("GetStatsHistory() is not implemented.");
}
private:
// No copying allowed
DB(const DB&);

@ -677,6 +677,15 @@ struct DBOptions {
// Dynamically changeable through SetDBOptions() API.
unsigned int stats_dump_period_sec = 600;
// if not zero, dump rocksdb.stats to RocksDB every stats_persist_period_sec
// Default: 600
unsigned int stats_persist_period_sec = 600;
// if not zero, periodically take stats snapshots and store in memory, the
// memory size for stats snapshots is capped at stats_history_buffer_size
// Default: 1MB
size_t stats_history_buffer_size = 1024 * 1024;
// If set true, will hint the underlying file system that the file
// access pattern is random, when a sst file is opened.
// Default: true

@ -8,8 +8,9 @@
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <string>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "rocksdb/status.h"
@ -479,6 +480,11 @@ class Statistics {
return std::string("ToString(): not implemented");
}
virtual bool getTickerMap(std::map<std::string, uint64_t>*) const {
// Do nothing by default
return false;
};
// Override this function to disable particular histogram collection
virtual bool HistEnabledForType(uint32_t type) const {
return type < HISTOGRAM_ENUM_MAX;

@ -0,0 +1,49 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <map>
#include <string>
// #include "db/db_impl.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
namespace rocksdb {
class DBImpl;
class StatsHistoryIterator {
public:
StatsHistoryIterator() {}
virtual ~StatsHistoryIterator() {}
virtual bool Valid() const = 0;
// Moves to the next stats history record. After this call, Valid() is
// true iff the iterator was not positioned at the last entry in the source.
// REQUIRES: Valid()
virtual void Next() = 0;
// Return the time stamp (in microseconds) when stats history is recorded.
// REQUIRES: Valid()
virtual uint64_t GetStatsTime() const = 0;
// Return the current stats history as an std::map which specifies the
// mapping from stats name to stats value . The underlying storage
// for the returned map is valid only until the next modification of
// the iterator.
// REQUIRES: Valid()
virtual const std::map<std::string, uint64_t>& GetStatsMap() const = 0;
// If an error has occurred, return it. Else return an ok status.
virtual Status status() const = 0;
};
} // namespace rocksdb

@ -4522,7 +4522,7 @@ class JniUtil {
* if an OutOfMemoryError or ArrayIndexOutOfBoundsException
* exception occurs
*
* @return A std::vector<std:string> containing copies of the Java strings
* @return A std::vector<std::string> containing copies of the Java strings
*/
static std::vector<std::string> copyStrings(JNIEnv* env,
jobjectArray jss, jboolean* has_exception) {
@ -4540,7 +4540,7 @@ class JniUtil {
* if an OutOfMemoryError or ArrayIndexOutOfBoundsException
* exception occurs
*
* @return A std::vector<std:string> containing copies of the Java strings
* @return A std::vector<std::string> containing copies of the Java strings
*/
static std::vector<std::string> copyStrings(JNIEnv* env,
jobjectArray jss, const jsize jss_len, jboolean* has_exception) {
@ -4624,7 +4624,7 @@ class JniUtil {
* @param has_exception (OUT) will be set to JNI_TRUE
* if an OutOfMemoryError exception occurs
*
* @return A std:string copy of the jstring, or an
* @return A std::string copy of the jstring, or an
* empty std::string if has_exception == JNI_TRUE
*/
static std::string copyStdString(JNIEnv* env, jstring js,
@ -4655,8 +4655,8 @@ class JniUtil {
* @param bytes The bytes to copy
*
* @return the Java byte[] or nullptr if an exception occurs
*
* @throws RocksDBException thrown
*
* @throws RocksDBException thrown
* if memory size to copy exceeds general java specific array size limitation.
*/
static jbyteArray copyBytes(JNIEnv* env, std::string bytes) {
@ -4823,7 +4823,7 @@ class JniUtil {
return jbyte_strings;
}
/**
* Copies bytes to a new jByteArray with the check of java array size limitation.
*
@ -4831,29 +4831,29 @@ class JniUtil {
* @param size number of bytes to copy
*
* @return the Java byte[] or nullptr if an exception occurs
*
* @throws RocksDBException thrown
*
* @throws RocksDBException thrown
* if memory size to copy exceeds general java array size limitation to avoid overflow.
*/
static jbyteArray createJavaByteArrayWithSizeCheck(JNIEnv* env, const char* bytes, const size_t size) {
// Limitation for java array size is vm specific
// In general it cannot exceed Integer.MAX_VALUE (2^31 - 1)
// Current HotSpot VM limitation for array size is Integer.MAX_VALUE - 5 (2^31 - 1 - 5)
// It means that the next call to env->NewByteArray can still end with
// It means that the next call to env->NewByteArray can still end with
// OutOfMemoryError("Requested array size exceeds VM limit") coming from VM
static const size_t MAX_JARRAY_SIZE = (static_cast<size_t>(1)) << 31;
if(size > MAX_JARRAY_SIZE) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, "Requested array size exceeds VM limit");
return nullptr;
}
const jsize jlen = static_cast<jsize>(size);
jbyteArray jbytes = env->NewByteArray(jlen);
if(jbytes == nullptr) {
// exception thrown: OutOfMemoryError
// exception thrown: OutOfMemoryError
return nullptr;
}
env->SetByteArrayRegion(jbytes, 0, jlen,
const_cast<jbyte*>(reinterpret_cast<const jbyte*>(bytes)));
if(env->ExceptionCheck()) {
@ -4872,8 +4872,8 @@ class JniUtil {
* @param bytes The bytes to copy
*
* @return the Java byte[] or nullptr if an exception occurs
*
* @throws RocksDBException thrown
*
* @throws RocksDBException thrown
* if memory size to copy exceeds general java specific array size limitation.
*/
static jbyteArray copyBytes(JNIEnv* env, const Slice& bytes) {

@ -384,6 +384,19 @@ std::string StatisticsImpl::ToString() const {
return res;
}
bool StatisticsImpl::getTickerMap(
std::map<std::string, uint64_t>* stats_map) const {
assert(stats_map);
if (!stats_map) return false;
stats_map->clear();
MutexLock lock(&aggregate_lock_);
for (const auto& t : TickersNameMap) {
assert(t.first < TICKER_ENUM_MAX);
(*stats_map)[t.second.c_str()] = getTickerCountLocked(t.first);
}
return true;
}
bool StatisticsImpl::HistEnabledForType(uint32_t type) const {
return type < HISTOGRAM_ENUM_MAX;
}

@ -6,9 +6,10 @@
#pragma once
#include "rocksdb/statistics.h"
#include <vector>
#include <atomic>
#include <map>
#include <string>
#include <vector>
#include "monitoring/histogram.h"
#include "port/likely.h"
@ -56,6 +57,7 @@ class StatisticsImpl : public Statistics {
virtual Status Reset() override;
virtual std::string ToString() const override;
virtual bool getTickerMap(std::map<std::string, uint64_t>*) const override;
virtual bool HistEnabledForType(uint32_t type) const override;
private:

@ -229,6 +229,8 @@ MutableDBOptions::MutableDBOptions()
max_total_wal_size(0),
delete_obsolete_files_period_micros(6ULL * 60 * 60 * 1000000),
stats_dump_period_sec(600),
stats_persist_period_sec(600),
stats_history_buffer_size(1024 * 1024),
max_open_files(-1),
bytes_per_sync(0),
wal_bytes_per_sync(0),
@ -245,6 +247,8 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options)
delete_obsolete_files_period_micros(
options.delete_obsolete_files_period_micros),
stats_dump_period_sec(options.stats_dump_period_sec),
stats_persist_period_sec(options.stats_persist_period_sec),
stats_history_buffer_size(options.stats_history_buffer_size),
max_open_files(options.max_open_files),
bytes_per_sync(options.bytes_per_sync),
wal_bytes_per_sync(options.wal_bytes_per_sync),
@ -269,6 +273,10 @@ void MutableDBOptions::Dump(Logger* log) const {
delete_obsolete_files_period_micros);
ROCKS_LOG_HEADER(log, " Options.stats_dump_period_sec: %u",
stats_dump_period_sec);
ROCKS_LOG_HEADER(log, " Options.stats_persist_period_sec: %d",
stats_persist_period_sec);
ROCKS_LOG_HEADER(log, " Options.stats_history_buffer_size: %d",
stats_history_buffer_size);
ROCKS_LOG_HEADER(log, " Options.max_open_files: %d",
max_open_files);
ROCKS_LOG_HEADER(log,

@ -97,6 +97,8 @@ struct MutableDBOptions {
uint64_t max_total_wal_size;
uint64_t delete_obsolete_files_period_micros;
unsigned int stats_dump_period_sec;
unsigned int stats_persist_period_sec;
size_t stats_history_buffer_size;
int max_open_files;
uint64_t bytes_per_sync;
uint64_t wal_bytes_per_sync;

@ -79,6 +79,10 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
options.allow_fallocate = immutable_db_options.allow_fallocate;
options.is_fd_close_on_exec = immutable_db_options.is_fd_close_on_exec;
options.stats_dump_period_sec = mutable_db_options.stats_dump_period_sec;
options.stats_persist_period_sec =
mutable_db_options.stats_persist_period_sec;
options.stats_history_buffer_size =
mutable_db_options.stats_history_buffer_size;
options.advise_random_on_open = immutable_db_options.advise_random_on_open;
options.db_write_buffer_size = immutable_db_options.db_write_buffer_size;
options.write_buffer_manager = immutable_db_options.write_buffer_manager;
@ -1495,6 +1499,14 @@ std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct DBOptions, stats_dump_period_sec), OptionType::kUInt,
OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, stats_dump_period_sec)}},
{"stats_persist_period_sec",
{offsetof(struct DBOptions, stats_persist_period_sec),
OptionType::kUInt, OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, stats_persist_period_sec)}},
{"stats_history_buffer_size",
{offsetof(struct DBOptions, stats_history_buffer_size),
OptionType::kSizeT, OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, stats_history_buffer_size)}},
{"fail_if_options_file_error",
{offsetof(struct DBOptions, fail_if_options_file_error),
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},

@ -266,6 +266,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"manifest_preallocation_size=1222;"
"allow_mmap_writes=false;"
"stats_dump_period_sec=70127;"
"stats_persist_period_sec=54321;"
"stats_history_buffer_size=14159;"
"allow_fallocate=true;"
"allow_mmap_reads=false;"
"use_direct_reads=false;"

@ -128,6 +128,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"is_fd_close_on_exec", "true"},
{"skip_log_error_on_recovery", "false"},
{"stats_dump_period_sec", "46"},
{"stats_persist_period_sec", "57"},
{"stats_history_buffer_size", "69"},
{"advise_random_on_open", "true"},
{"use_adaptive_mutex", "false"},
{"new_table_reader_for_compaction_inputs", "true"},
@ -262,6 +264,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.is_fd_close_on_exec, true);
ASSERT_EQ(new_db_opt.skip_log_error_on_recovery, false);
ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U);
ASSERT_EQ(new_db_opt.stats_persist_period_sec, 57U);
ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U);
ASSERT_EQ(new_db_opt.advise_random_on_open, true);
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);

@ -34,6 +34,7 @@ LIB_SOURCES = \
db/flush_job.cc \
db/flush_scheduler.cc \
db/forward_iterator.cc \
db/in_memory_stats_history.cc \
db/internal_stats.cc \
db/logs_with_prep_tracker.cc \
db/log_reader.cc \

@ -1095,6 +1095,12 @@ DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");
DEFINE_uint64(stats_dump_period_sec, rocksdb::Options().stats_dump_period_sec,
"Gap between printing stats to log in seconds");
DEFINE_uint64(stats_persist_period_sec,
rocksdb::Options().stats_persist_period_sec,
"Gap between persisting stats in seconds");
DEFINE_uint64(stats_history_buffer_size,
rocksdb::Options().stats_history_buffer_size,
"Max number of stats snapshots to keep in memory");
enum RepFactory {
kSkipList,
@ -3565,6 +3571,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
options.dump_malloc_stats = FLAGS_dump_malloc_stats;
options.stats_dump_period_sec =
static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
options.stats_persist_period_sec =
static_cast<unsigned int>(FLAGS_stats_persist_period_sec);
options.stats_history_buffer_size =
static_cast<size_t>(FLAGS_stats_history_buffer_size);
options.compression_opts.level = FLAGS_compression_level;
options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;

@ -46,6 +46,8 @@ class RepeatableThread {
thread_.join();
}
bool IsRunning() { return running_; }
~RepeatableThread() { cancel(); }
#ifndef NDEBUG

Loading…
Cancel
Save