From 671d15cbdd3839acb54cb21a2aa82efca4917155 Mon Sep 17 00:00:00 2001 From: Zhongyi Xie Date: Mon, 17 Jun 2019 15:17:43 -0700 Subject: [PATCH] Persistent Stats: persist stats history to disk (#5046) Summary: This PR continues the work in https://github.com/facebook/rocksdb/pull/4748 and https://github.com/facebook/rocksdb/pull/4535 by adding a new DBOption `persist_stats_to_disk` which instructs RocksDB to persist stats history to RocksDB itself. When statistics is enabled, and both options `stats_persist_period_sec` and `persist_stats_to_disk` are set, RocksDB will periodically write stats to a built-in column family in the following form: key -> (timestamp in microseconds)#(stats name), value -> stats value. The existing API `GetStatsHistory` will detect the current value of `persist_stats_to_disk` and either read from in-memory data structure or from the hidden column family on disk. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5046 Differential Revision: D15863138 Pulled By: miasantreble fbshipit-source-id: bb82abdb3f2ca581aa42531734ac799f113e931b --- CMakeLists.txt | 4 +- Makefile | 4 + TARGETS | 8 +- db/db_impl/db_impl.cc | 91 ++- db/db_impl/db_impl.h | 26 +- db/db_impl/db_impl_debug.cc | 4 +- db/db_impl/db_impl_open.cc | 117 +++- db/db_options_test.cc | 265 -------- db/version_set.cc | 19 +- include/rocksdb/db.h | 3 +- include/rocksdb/options.h | 12 + include/rocksdb/stats_history.h | 4 +- {db => monitoring}/in_memory_stats_history.cc | 2 +- {db => monitoring}/in_memory_stats_history.h | 2 +- monitoring/persistent_stats_history.cc | 171 ++++++ monitoring/persistent_stats_history.h | 83 +++ monitoring/stats_history_test.cc | 576 ++++++++++++++++++ options/db_options.cc | 5 +- options/db_options.h | 1 + options/options.cc | 1 - options/options_helper.cc | 5 + options/options_settable_test.cc | 1 + options/options_test.cc | 2 + src.mk | 66 +- tools/db_bench_tool.cc | 3 + 25 files changed, 1143 insertions(+), 332 deletions(-) rename {db => monitoring}/in_memory_stats_history.cc (97%) rename {db => monitoring}/in_memory_stats_history.h (98%) create mode 100644 monitoring/persistent_stats_history.cc create mode 100644 monitoring/persistent_stats_history.h create mode 100644 monitoring/stats_history_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index eda1281e1..7ff61dca9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -521,7 +521,6 @@ set(SOURCES db/flush_scheduler.cc db/forward_iterator.cc db/internal_stats.cc - db/in_memory_stats_history.cc db/logs_with_prep_tracker.cc db/log_reader.cc db/log_writer.cc @@ -568,10 +567,12 @@ set(SOURCES memtable/write_buffer_manager.cc monitoring/histogram.cc monitoring/histogram_windowing.cc + monitoring/in_memory_stats_history.cc monitoring/instrumented_mutex.cc monitoring/iostats_context.cc monitoring/perf_context.cc monitoring/perf_level.cc + monitoring/persistent_stats_history.cc monitoring/statistics.cc monitoring/thread_status_impl.cc monitoring/thread_status_updater.cc @@ -955,6 +956,7 @@ if(WITH_TESTS) monitoring/histogram_test.cc monitoring/iostats_context_test.cc monitoring/statistics_test.cc + monitoring/stats_history_test.cc options/options_settable_test.cc options/options_test.cc table/block_based/block_based_filter_block_test.cc diff --git a/Makefile b/Makefile index 5944325aa..a499cbbed 100644 --- a/Makefile +++ b/Makefile @@ -548,6 +548,7 @@ TESTS = \ ldb_cmd_test \ persistent_cache_test \ statistics_test \ + stats_history_test \ lru_cache_test \ object_registry_test \ repair_test \ @@ -1566,6 +1567,9 @@ persistent_cache_test: utilities/persistent_cache/persistent_cache_test.o db/db statistics_test: monitoring/statistics_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +stats_history_test: monitoring/stats_history_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + lru_cache_test: cache/lru_cache_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 45a99a55d..a43ed6b10 100644 --- a/TARGETS +++ b/TARGETS @@ -113,7 +113,6 @@ cpp_library( "db/flush_job.cc", "db/flush_scheduler.cc", "db/forward_iterator.cc", - "db/in_memory_stats_history.cc", "db/internal_stats.cc", "db/log_reader.cc", "db/log_writer.cc", @@ -163,10 +162,12 @@ cpp_library( "memtable/write_buffer_manager.cc", "monitoring/histogram.cc", "monitoring/histogram_windowing.cc", + "monitoring/in_memory_stats_history.cc", "monitoring/instrumented_mutex.cc", "monitoring/iostats_context.cc", "monitoring/perf_context.cc", "monitoring/perf_level.cc", + "monitoring/persistent_stats_history.cc", "monitoring/statistics.cc", "monitoring/thread_status_impl.cc", "monitoring/thread_status_updater.cc", @@ -971,6 +972,11 @@ ROCKS_TESTS = [ "monitoring/statistics_test.cc", "serial", ], + [ + "stats_history_test", + "monitoring/stats_history_test.cc", + "serial", + ], [ "stringappend_test", "utilities/merge_operators/string_append/stringappend_test.cc", diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 154e6dd23..21b8f3d91 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -34,7 +34,6 @@ #include "db/external_sst_file_ingestion_job.h" #include "db/flush_job.h" #include "db/forward_iterator.h" -#include "db/in_memory_stats_history.h" #include "db/job_context.h" #include "db/log_reader.h" #include "db/log_writer.h" @@ -58,8 +57,10 @@ #include "logging/logging.h" #include "memtable/hash_linklist_rep.h" #include "memtable/hash_skiplist_rep.h" +#include "monitoring/in_memory_stats_history.h" #include "monitoring/iostats_context_imp.h" #include "monitoring/perf_context_imp.h" +#include "monitoring/persistent_stats_history.h" #include "monitoring/thread_status_updater.h" #include "monitoring/thread_status_util.h" #include "options/cf_options.h" @@ -98,6 +99,9 @@ namespace rocksdb { const std::string kDefaultColumnFamilyName("default"); +const std::string kPersistentStatsColumnFamilyName( + "___rocksdb_stats_history___"); +const int kMicrosInSecond = 1000 * 1000; void DumpRocksDBBuildVersion(Logger* log); CompressionType GetCompressionFlush( @@ -162,6 +166,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, logfile_number_(0), log_dir_synced_(false), log_empty_(true), + persist_stats_cf_handle_(nullptr), log_sync_cv_(&mutex_), total_log_size_(0), is_snapshot_supported_(true), @@ -482,10 +487,17 @@ Status DBImpl::CloseHelper() { } } - if (default_cf_handle_ != nullptr) { + if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) { // we need to delete handle outside of lock because it does its own locking mutex_.Unlock(); - delete default_cf_handle_; + if (default_cf_handle_) { + delete default_cf_handle_; + default_cf_handle_ = nullptr; + } + if (persist_stats_cf_handle_) { + delete persist_stats_cf_handle_; + persist_stats_cf_handle_ = nullptr; + } mutex_.Lock(); } @@ -634,7 +646,7 @@ void DBImpl::StartTimedTasks() { if (!thread_dump_stats_) { thread_dump_stats_.reset(new rocksdb::RepeatableThread( [this]() { DBImpl::DumpStats(); }, "dump_st", env_, - stats_dump_period_sec * 1000000)); + static_cast(stats_dump_period_sec) * kMicrosInSecond)); } } stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec; @@ -642,14 +654,14 @@ void DBImpl::StartTimedTasks() { if (!thread_persist_stats_) { thread_persist_stats_.reset(new rocksdb::RepeatableThread( [this]() { DBImpl::PersistStats(); }, "pst_st", env_, - stats_persist_period_sec * 1000000)); + static_cast(stats_persist_period_sec) * kMicrosInSecond)); } } } } // esitmate the total size of stats_history_ -size_t DBImpl::EstiamteStatsHistorySize() const { +size_t DBImpl::EstimateInMemoryStatsHistorySize() const { size_t size_total = sizeof(std::map>); if (stats_history_.size() == 0) return size_total; @@ -671,7 +683,7 @@ void DBImpl::PersistStats() { if (shutdown_initiated_) { return; } - uint64_t now_micros = env_->NowMicros(); + uint64_t now_seconds = env_->NowMicros() / kMicrosInSecond; Statistics* statistics = immutable_db_options_.statistics.get(); if (!statistics) { return; @@ -682,12 +694,40 @@ void DBImpl::PersistStats() { stats_history_size_limit = mutable_db_options_.stats_history_buffer_size; } - // TODO(Zhongyi): also persist immutable_db_options_.statistics - { - std::map stats_map; - if (!statistics->getTickerMap(&stats_map)) { - return; + std::map stats_map; + if (!statistics->getTickerMap(&stats_map)) { + return; + } + + if (immutable_db_options_.persist_stats_to_disk) { + WriteBatch batch; + if (stats_slice_initialized_) { + for (const auto& stat : stats_map) { + char key[100]; + int length = + EncodePersistentStatsKey(now_seconds, stat.first, 100, key); + // calculate the delta from last time + if (stats_slice_.find(stat.first) != stats_slice_.end()) { + uint64_t delta = stat.second - stats_slice_[stat.first]; + batch.Put(persist_stats_cf_handle_, Slice(key, std::min(100, length)), + ToString(delta)); + } + } } + stats_slice_initialized_ = true; + std::swap(stats_slice_, stats_map); + WriteOptions wo; + wo.low_pri = true; + wo.no_slowdown = true; + wo.sync = false; + Status s = Write(wo, &batch); + if (!s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Writing to persistent stats CF failed -- %s\n", + s.ToString().c_str()); + } + // TODO(Zhongyi): add purging for persisted data + } else { InstrumentedMutexLock l(&stats_history_mutex_); // calculate the delta from last time if (stats_slice_initialized_) { @@ -697,17 +737,19 @@ void DBImpl::PersistStats() { stats_delta[stat.first] = stat.second - stats_slice_[stat.first]; } } - stats_history_[now_micros] = stats_delta; + stats_history_[now_seconds] = stats_delta; } stats_slice_initialized_ = true; std::swap(stats_slice_, stats_map); TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied"); // delete older stats snapshots to control memory consumption - bool purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit; + bool purge_needed = + EstimateInMemoryStatsHistorySize() > stats_history_size_limit; while (purge_needed && !stats_history_.empty()) { stats_history_.erase(stats_history_.begin()); - purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit; + purge_needed = + EstimateInMemoryStatsHistorySize() > stats_history_size_limit; } } // TODO: persist stats to disk @@ -741,8 +783,13 @@ Status DBImpl::GetStatsHistory( if (!stats_iterator) { return Status::InvalidArgument("stats_iterator not preallocated."); } - stats_iterator->reset( - new InMemoryStatsHistoryIterator(start_time, end_time, this)); + if (immutable_db_options_.persist_stats_to_disk) { + stats_iterator->reset( + new PersistentStatsHistoryIterator(start_time, end_time, this)); + } else { + stats_iterator->reset( + new InMemoryStatsHistoryIterator(start_time, end_time, this)); + } return (*stats_iterator)->status(); } @@ -946,7 +993,8 @@ Status DBImpl::SetDBOptions( if (new_options.stats_dump_period_sec > 0) { thread_dump_stats_.reset(new rocksdb::RepeatableThread( [this]() { DBImpl::DumpStats(); }, "dump_st", env_, - new_options.stats_dump_period_sec * 1000000)); + static_cast(new_options.stats_dump_period_sec) * + kMicrosInSecond)); } else { thread_dump_stats_.reset(); } @@ -961,7 +1009,8 @@ Status DBImpl::SetDBOptions( if (new_options.stats_persist_period_sec > 0) { thread_persist_stats_.reset(new rocksdb::RepeatableThread( [this]() { DBImpl::PersistStats(); }, "pst_st", env_, - new_options.stats_persist_period_sec * 1000000)); + static_cast(new_options.stats_persist_period_sec) * + kMicrosInSecond)); } else { thread_persist_stats_.reset(); } @@ -1373,6 +1422,10 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const { return default_cf_handle_; } +ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const { + return persist_stats_cf_handle_; +} + Status DBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 942c36ff6..e6d5a56e2 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -66,6 +66,7 @@ class Arena; class ArenaWrappedDBIter; class InMemoryStatsHistoryIterator; class MemTable; +class PersistentStatsHistoryIterator; class TableCache; class TaskLimiterToken; class Version; @@ -268,6 +269,8 @@ class DBImpl : public DB { ColumnFamilyHandle* DefaultColumnFamily() const override; + ColumnFamilyHandle* PersistentStatsColumnFamily() const; + virtual Status Close() override; Status GetStatsHistory( @@ -822,7 +825,7 @@ class DBImpl : public DB { void TEST_WaitForDumpStatsRun(std::function callback) const; void TEST_WaitForPersistStatsRun(std::function callback) const; bool TEST_IsPersistentStatsEnabled() const; - size_t TEST_EstiamteStatsHistorySize() const; + size_t TEST_EstimateInMemoryStatsHistorySize() const; #endif // NDEBUG @@ -1016,6 +1019,7 @@ class DBImpl : public DB { friend class DBTest_MixedSlowdownOptionsStop_Test; friend class DBCompactionTest_CompactBottomLevelFilesWithDeletions_Test; friend class DBCompactionTest_CompactionDuringShutdown_Test; + friend class StatsHistoryTest_PersistentStatsCreateColumnFamilies_Test; #ifndef NDEBUG friend class DBTest2_ReadCallbackTest_Test; friend class WriteCallbackTest_WriteWithCallbackTest_Test; @@ -1176,6 +1180,21 @@ class DBImpl : public DB { PrepickedCompaction* prepicked_compaction; }; + // Initialize the built-in column family for persistent stats. Depending on + // whether on-disk persistent stats have been enabled before, it may either + // create a new column family and column family handle or just a column family + // handle. + // Required: DB mutex held + Status InitPersistStatsColumnFamily(); + + // Persistent Stats column family has two format version key which are used + // for compatibility check. Write format version if it's created for the + // first time, read format version and check compatibility if recovering + // from disk. This function requires DB mutex held at entrance but may + // release and re-acquire DB mutex in the process. + // Required: DB mutex held + Status PersistentStatsProcessFormatVersion(); + Status ResumeImpl(); void MaybeIgnoreError(Status* s) const; @@ -1424,7 +1443,7 @@ class DBImpl : public DB { void PrintStatistics(); - size_t EstiamteStatsHistorySize() const; + size_t EstimateInMemoryStatsHistorySize() const; // persist stats to column family "_persistent_stats" void PersistStats(); @@ -1571,6 +1590,9 @@ class DBImpl : public DB { // expesnive mutex_ lock during WAL write, which update log_empty_. bool log_empty_; + ColumnFamilyHandleImpl* persist_stats_cf_handle_; + + bool persistent_stats_cfd_exists_ = true; // Without two_write_queues, read and writes to alive_log_files_ are // protected by mutex_. However since back() is never popped, and push_back() diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 4b558facb..ec1e1b477 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -262,8 +262,8 @@ bool DBImpl::TEST_IsPersistentStatsEnabled() const { return thread_persist_stats_ && thread_persist_stats_->IsRunning(); } -size_t DBImpl::TEST_EstiamteStatsHistorySize() const { - return EstiamteStatsHistorySize(); +size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const { + return EstimateInMemoryStatsHistorySize(); } } // namespace rocksdb #endif // NDEBUG diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index baa4fe707..eec7cf16a 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -13,6 +13,7 @@ #include "db/builder.h" #include "db/error_handler.h" #include "file/sst_file_manager_impl.h" +#include "monitoring/persistent_stats_history.h" #include "options/options_helper.h" #include "rocksdb/wal_filter.h" #include "table/block_based/block_based_table_factory.h" @@ -375,6 +376,7 @@ Status DBImpl::Recover( } Status s = versions_->Recover(column_families, read_only); + if (immutable_db_options_.paranoid_checks && s.ok()) { s = CheckConsistency(); } @@ -386,6 +388,10 @@ Status DBImpl::Recover( } } } + // DB mutex is already held + if (s.ok() && immutable_db_options_.persist_stats_to_disk) { + s = InitPersistStatsColumnFamily(); + } // Initial max_total_in_memory_state_ before recovery logs. Log recovery // may check this value to decide whether to flush. @@ -401,6 +407,8 @@ Status DBImpl::Recover( default_cf_handle_ = new ColumnFamilyHandleImpl( versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); + // TODO(Zhongyi): handle single_column_family_mode_ when + // persistent_stats is enabled single_column_family_mode_ = versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1; @@ -496,6 +504,98 @@ Status DBImpl::Recover( return s; } +Status DBImpl::PersistentStatsProcessFormatVersion() { + mutex_.AssertHeld(); + Status s; + // persist version when stats CF doesn't exist + bool should_persist_format_version = !persistent_stats_cfd_exists_; + mutex_.Unlock(); + if (persistent_stats_cfd_exists_) { + // Check persistent stats format version compatibility. Drop and recreate + // persistent stats CF if format version is incompatible + uint64_t format_version_recovered = 0; + Status s_format = DecodePersistentStatsVersionNumber( + this, StatsVersionKeyType::kFormatVersion, &format_version_recovered); + uint64_t compatible_version_recovered = 0; + Status s_compatible = DecodePersistentStatsVersionNumber( + this, StatsVersionKeyType::kCompatibleVersion, + &compatible_version_recovered); + // abort reading from existing stats CF if any of following is true: + // 1. failed to read format version or compatible version from disk + // 2. sst's format version is greater than current format version, meaning + // this sst is encoded with a newer RocksDB release, and current compatible + // version is below the sst's compatible version + if (!s_format.ok() || !s_compatible.ok() || + (kStatsCFCurrentFormatVersion < format_version_recovered && + kStatsCFCompatibleFormatVersion < compatible_version_recovered)) { + if (!s_format.ok() || !s_compatible.ok()) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Reading persistent stats version key failed. Format key: %s, " + "compatible key: %s", + s_format.ToString().c_str(), s_compatible.ToString().c_str()); + } else { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Disable persistent stats due to corrupted or incompatible format " + "version\n"); + } + DropColumnFamily(persist_stats_cf_handle_); + DestroyColumnFamilyHandle(persist_stats_cf_handle_); + ColumnFamilyHandle* handle = nullptr; + ColumnFamilyOptions cfo; + OptimizeForPersistentStats(&cfo); + s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle); + persist_stats_cf_handle_ = static_cast(handle); + // should also persist version here because old stats CF is discarded + should_persist_format_version = true; + } + } + if (s.ok() && should_persist_format_version) { + // Persistent stats CF being created for the first time, need to write + // format version key + WriteBatch batch; + batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString, + ToString(kStatsCFCurrentFormatVersion)); + batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString, + ToString(kStatsCFCompatibleFormatVersion)); + WriteOptions wo; + wo.low_pri = true; + wo.no_slowdown = true; + wo.sync = false; + s = Write(wo, &batch); + } + mutex_.Lock(); + return s; +} + +Status DBImpl::InitPersistStatsColumnFamily() { + mutex_.AssertHeld(); + assert(!persist_stats_cf_handle_); + ColumnFamilyData* persistent_stats_cfd = + versions_->GetColumnFamilySet()->GetColumnFamily( + kPersistentStatsColumnFamilyName); + persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr; + + Status s; + if (persistent_stats_cfd != nullptr) { + // We are recovering from a DB which already contains persistent stats CF, + // the CF is already created in VersionSet::ApplyOneVersionEdit, but + // column family handle was not. Need to explicitly create handle here. + persist_stats_cf_handle_ = + new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_); + } else { + mutex_.Unlock(); + ColumnFamilyHandle* handle = nullptr; + ColumnFamilyOptions cfo; + OptimizeForPersistentStats(&cfo); + s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle); + persist_stats_cf_handle_ = static_cast(handle); + mutex_.Lock(); + } + return s; +} + // REQUIRES: log_numbers are sorted in ascending order Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, SequenceNumber* next_sequence, bool read_only) { @@ -1065,12 +1165,23 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { std::vector column_families; column_families.push_back( ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); + if (db_options.persist_stats_to_disk) { + column_families.push_back( + ColumnFamilyDescriptor(kPersistentStatsColumnFamilyName, cf_options)); + } std::vector handles; Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr); if (s.ok()) { - assert(handles.size() == 1); + if (db_options.persist_stats_to_disk) { + assert(handles.size() == 2); + } else { + assert(handles.size() == 1); + } // i can delete the handle since DBImpl is always holding a reference to // default column family + if (db_options.persist_stats_to_disk && handles[1] != nullptr) { + delete handles[1]; + } delete handles[0]; } return s; @@ -1247,6 +1358,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, s = impl->directories_.GetDbDir()->Fsync(); } } + if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) { + // try to read format version but no need to fail Open() even if it fails + s = impl->PersistentStatsProcessFormatVersion(); + } if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { diff --git a/db/db_options_test.cc b/db/db_options_test.cc index bf3315328..7dd672646 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -518,114 +518,6 @@ TEST_F(DBOptionsTest, SetStatsDumpPeriodSec) { Close(); } -TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) { - Options options; - options.create_if_missing = true; - options.stats_dump_period_sec = 5; - std::unique_ptr mock_env; - mock_env.reset(new rocksdb::MockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); - int counter = 0; - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); -#if defined(OS_MACOSX) && !defined(NDEBUG) - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { - uint64_t time_us = *reinterpret_cast(arg); - if (time_us < mock_env->RealNowMicros()) { - *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; - } - }); -#endif // OS_MACOSX && !NDEBUG - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::DumpStats:1", [&](void* /*arg*/) { - counter++; - }); - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - Reopen(options); - ASSERT_EQ(5, dbfull()->GetDBOptions().stats_dump_period_sec); - dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(5); }); - ASSERT_GE(counter, 1); - - // Test cacel job through SetOptions - ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "0"}})); - int old_val = counter; - for (int i = 6; i < 20; ++i) { - dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(i); }); - } - ASSERT_EQ(counter, old_val); - Close(); -} - -// Test persistent stats background thread scheduling and cancelling -TEST_F(DBOptionsTest, StatsPersistScheduling) { - Options options; - options.create_if_missing = true; - options.stats_persist_period_sec = 5; - std::unique_ptr mock_env; - mock_env.reset(new rocksdb::MockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); -#if defined(OS_MACOSX) && !defined(NDEBUG) - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { - uint64_t time_us = *reinterpret_cast(arg); - if (time_us < mock_env->RealNowMicros()) { - *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; - } - }); -#endif // OS_MACOSX && !NDEBUG - int counter = 0; - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - Reopen(options); - ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec); - dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); - ASSERT_GE(counter, 1); - - // Test cacel job through SetOptions - ASSERT_TRUE(dbfull()->TEST_IsPersistentStatsEnabled()); - ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); - ASSERT_FALSE(dbfull()->TEST_IsPersistentStatsEnabled()); - Close(); -} - -// Test enabling persistent stats for the first time -TEST_F(DBOptionsTest, PersistentStatsFreshInstall) { - Options options; - options.create_if_missing = true; - options.stats_persist_period_sec = 0; - std::unique_ptr mock_env; - mock_env.reset(new rocksdb::MockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); -#if defined(OS_MACOSX) && !defined(NDEBUG) - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { - uint64_t time_us = *reinterpret_cast(arg); - if (time_us < mock_env->RealNowMicros()) { - *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; - } - }); -#endif // OS_MACOSX && !NDEBUG - int counter = 0; - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - Reopen(options); - ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "5"}})); - ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec); - dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); - ASSERT_GE(counter, 1); - Close(); -} - TEST_F(DBOptionsTest, SetOptionsStatsPersistPeriodSec) { Options options; options.create_if_missing = true; @@ -640,163 +532,6 @@ TEST_F(DBOptionsTest, SetOptionsStatsPersistPeriodSec) { ASSERT_EQ(12345, dbfull()->GetDBOptions().stats_persist_period_sec); } -TEST_F(DBOptionsTest, GetStatsHistory) { - Options options; - options.create_if_missing = true; - options.stats_persist_period_sec = 5; - options.statistics = rocksdb::CreateDBStatistics(); - std::unique_ptr mock_env; - mock_env.reset(new rocksdb::MockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); -#if defined(OS_MACOSX) && !defined(NDEBUG) - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { - uint64_t time_us = *reinterpret_cast(arg); - if (time_us < mock_env->RealNowMicros()) { - *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; - } - }); - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); -#endif // OS_MACOSX && !NDEBUG - - CreateColumnFamilies({"pikachu"}, options); - ASSERT_OK(Put("foo", "bar")); - ReopenWithColumnFamilies({"default", "pikachu"}, options); - - int mock_time = 1; - // Wait for stats persist to finish - dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); - std::unique_ptr stats_iter; - db_->GetStatsHistory(0, 6 * kMicrosInSec, &stats_iter); - ASSERT_TRUE(stats_iter != nullptr); - // disabled stats snapshots - ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); - size_t stats_count = 0; - for (; stats_iter->Valid(); stats_iter->Next()) { - auto stats_map = stats_iter->GetStatsMap(); - stats_count += stats_map.size(); - } - ASSERT_GT(stats_count, 0); - // Wait a bit and verify no more stats are found - for (mock_time = 6; mock_time < 20; ++mock_time) { - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(mock_time); }); - } - db_->GetStatsHistory(0, 20 * kMicrosInSec, &stats_iter); - ASSERT_TRUE(stats_iter != nullptr); - size_t stats_count_new = 0; - for (; stats_iter->Valid(); stats_iter->Next()) { - stats_count_new += stats_iter->GetStatsMap().size(); - } - ASSERT_EQ(stats_count_new, stats_count); - Close(); -} - -TEST_F(DBOptionsTest, InMemoryStatsHistoryPurging) { - Options options; - options.create_if_missing = true; - options.statistics = rocksdb::CreateDBStatistics(); - options.stats_persist_period_sec = 1; - std::unique_ptr mock_env; - mock_env.reset(new rocksdb::MockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); -#if defined(OS_MACOSX) && !defined(NDEBUG) - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { - uint64_t time_us = *reinterpret_cast(arg); - if (time_us < mock_env->RealNowMicros()) { - *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; - } - }); - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); -#endif // OS_MACOSX && !NDEBUG - - CreateColumnFamilies({"pikachu"}, options); - ASSERT_OK(Put("foo", "bar")); - ReopenWithColumnFamilies({"default", "pikachu"}, options); - // some random operation to populate statistics - ASSERT_OK(Delete("foo")); - ASSERT_OK(Put("sol", "sol")); - ASSERT_OK(Put("epic", "epic")); - ASSERT_OK(Put("ltd", "ltd")); - ASSERT_EQ("sol", Get("sol")); - ASSERT_EQ("epic", Get("epic")); - ASSERT_EQ("ltd", Get("ltd")); - Iterator* iterator = db_->NewIterator(ReadOptions()); - for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { - ASSERT_TRUE(iterator->key() == iterator->value()); - } - delete iterator; - ASSERT_OK(Flush()); - ASSERT_OK(Delete("sol")); - db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); - int mock_time = 1; - // Wait for stats persist to finish - for (; mock_time < 5; ++mock_time) { - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(mock_time); }); - } - - // second round of ops - ASSERT_OK(Put("saigon", "saigon")); - ASSERT_OK(Put("noodle talk", "noodle talk")); - ASSERT_OK(Put("ping bistro", "ping bistro")); - iterator = db_->NewIterator(ReadOptions()); - for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { - ASSERT_TRUE(iterator->key() == iterator->value()); - } - delete iterator; - ASSERT_OK(Flush()); - db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); - for (; mock_time < 10; ++mock_time) { - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(mock_time); }); - } - std::unique_ptr stats_iter; - db_->GetStatsHistory(0, 10 * kMicrosInSec, &stats_iter); - ASSERT_TRUE(stats_iter != nullptr); - size_t stats_count = 0; - int slice_count = 0; - for (; stats_iter->Valid(); stats_iter->Next()) { - slice_count++; - auto stats_map = stats_iter->GetStatsMap(); - stats_count += stats_map.size(); - } - size_t stats_history_size = dbfull()->TEST_EstiamteStatsHistorySize(); - ASSERT_GE(slice_count, 9); - ASSERT_GE(stats_history_size, 12000); - // capping memory cost at 12000 bytes since one slice is around 10000~12000 - ASSERT_OK(dbfull()->SetDBOptions({{"stats_history_buffer_size", "12000"}})); - ASSERT_EQ(12000, dbfull()->GetDBOptions().stats_history_buffer_size); - // Wait for stats persist to finish - for (; mock_time < 20; ++mock_time) { - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(mock_time); }); - } - db_->GetStatsHistory(0, 20 * kMicrosInSec, &stats_iter); - ASSERT_TRUE(stats_iter != nullptr); - size_t stats_count_reopen = 0; - slice_count = 0; - for (; stats_iter->Valid(); stats_iter->Next()) { - slice_count++; - auto stats_map = stats_iter->GetStatsMap(); - stats_count_reopen += stats_map.size(); - } - size_t stats_history_size_reopen = dbfull()->TEST_EstiamteStatsHistorySize(); - // only one slice can fit under the new stats_history_buffer_size - ASSERT_LT(slice_count, 2); - ASSERT_TRUE(stats_history_size_reopen < 12000 && - stats_history_size_reopen > 0); - ASSERT_TRUE(stats_count_reopen < stats_count && stats_count_reopen > 0); - Close(); -} - static void assert_candidate_files_empty(DBImpl* dbfull, const bool empty) { dbfull->TEST_LockMutex(); JobContext job_context(0); diff --git a/db/version_set.cc b/db/version_set.cc index 30fc744c9..ccedca794 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -9,10 +9,10 @@ #include "db/version_set.h" -#include #include #include #include +#include #include #include #include @@ -32,6 +32,7 @@ #include "file/filename.h" #include "monitoring/file_read_sample.h" #include "monitoring/perf_context_imp.h" +#include "monitoring/persistent_stats_history.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/write_buffer_manager.h" @@ -3962,11 +3963,23 @@ Status VersionSet::ApplyOneVersionEditToBuilder( edit.column_family_name_); } auto cf_options = name_to_options.find(edit.column_family_name_); - if (cf_options == name_to_options.end()) { + // implicitly add persistent_stats column family without requiring user + // to specify + bool is_persistent_stats_column_family = + edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0; + if (cf_options == name_to_options.end() && + !is_persistent_stats_column_family) { column_families_not_found.insert( {edit.column_family_, edit.column_family_name_}); } else { - cfd = CreateColumnFamily(cf_options->second, &edit); + // recover persistent_stats CF from a DB that already contains it + if (is_persistent_stats_column_family) { + ColumnFamilyOptions cfo; + OptimizeForPersistentStats(&cfo); + cfd = CreateColumnFamily(cfo, &edit); + } else { + cfd = CreateColumnFamily(cf_options->second, &edit); + } cfd->set_initialized(); builders.insert(std::make_pair( edit.column_family_, std::unique_ptr( diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 3a32d6f82..0f8573e43 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -59,6 +59,7 @@ class CompactionJobInfo; #endif extern const std::string kDefaultColumnFamilyName; +extern const std::string kPersistentStatsColumnFamilyName; struct ColumnFamilyDescriptor { std::string name; ColumnFamilyOptions options; @@ -1335,7 +1336,7 @@ class DB { // Given a window [start_time, end_time), setup a StatsHistoryIterator // to access stats history. Note the start_time and end_time are epoch - // time measured in microsecond, and end_time is an exclusive bound. + // time measured in seconds, and end_time is an exclusive bound. virtual Status GetStatsHistory( uint64_t /*start_time*/, uint64_t /*end_time*/, std::unique_ptr* /*stats_iterator*/) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 307582fe6..fe5617fb5 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -694,6 +694,18 @@ struct DBOptions { // Default: 600 unsigned int stats_persist_period_sec = 600; + // If true, automatically persist stats to a hidden column family (column + // family name: ___rocksdb_stats_history___) every + // stats_persist_period_sec seconds; otherwise, write to an in-memory + // struct. User can query through `GetStatsHistory` API. + // If user attempts to create a column family with the same name on a DB + // which have previously set persist_stats_to_disk to true, the column family + // creation will fail, but the hidden column family will survive, as well as + // the previously persisted statistics. + // When peristing stats to disk, the stat name will be limited at 100 bytes. + // Default: false + bool persist_stats_to_disk = false; + // if not zero, periodically take stats snapshots and store in memory, the // memory size for stats snapshots is capped at stats_history_buffer_size // Default: 1MB diff --git a/include/rocksdb/stats_history.h b/include/rocksdb/stats_history.h index 1a8419081..c6634ae68 100644 --- a/include/rocksdb/stats_history.h +++ b/include/rocksdb/stats_history.h @@ -49,10 +49,12 @@ class StatsHistoryIterator { // REQUIRES: Valid() virtual void Next() = 0; - // Return the time stamp (in microseconds) when stats history is recorded. + // Return the time stamp (in seconds) when stats history is recorded. // REQUIRES: Valid() virtual uint64_t GetStatsTime() const = 0; + virtual int GetFormatVersion() const { return -1; } + // Return the current stats history as an std::map which specifies the // mapping from stats name to stats value . The underlying storage // for the returned map is valid only until the next modification of diff --git a/db/in_memory_stats_history.cc b/monitoring/in_memory_stats_history.cc similarity index 97% rename from db/in_memory_stats_history.cc rename to monitoring/in_memory_stats_history.cc index 41fdb71c8..22ecde0ab 100644 --- a/db/in_memory_stats_history.cc +++ b/monitoring/in_memory_stats_history.cc @@ -6,7 +6,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "db/in_memory_stats_history.h" +#include "monitoring/in_memory_stats_history.h" #include "db/db_impl/db_impl.h" namespace rocksdb { diff --git a/db/in_memory_stats_history.h b/monitoring/in_memory_stats_history.h similarity index 98% rename from db/in_memory_stats_history.h rename to monitoring/in_memory_stats_history.h index eeb679cc0..8ccec146a 100644 --- a/db/in_memory_stats_history.h +++ b/monitoring/in_memory_stats_history.h @@ -25,7 +25,7 @@ namespace rocksdb { class InMemoryStatsHistoryIterator final : public StatsHistoryIterator { public: // Setup InMemoryStatsHistoryIterator to return stats snapshots between - // microsecond timestamps [start_time, end_time) + // seconds timestamps [start_time, end_time) InMemoryStatsHistoryIterator(uint64_t start_time, uint64_t end_time, DBImpl* db_impl) : start_time_(start_time), diff --git a/monitoring/persistent_stats_history.cc b/monitoring/persistent_stats_history.cc new file mode 100644 index 000000000..c1704f567 --- /dev/null +++ b/monitoring/persistent_stats_history.cc @@ -0,0 +1,171 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "monitoring/persistent_stats_history.h" + +#include +#include +#include +#include "db/db_impl/db_impl.h" +#include "port/likely.h" +#include "util/string_util.h" + +namespace rocksdb { +// 10 digit seconds timestamp => [Sep 9, 2001 ~ Nov 20, 2286] +const int kNowSecondsStringLength = 10; +const std::string kFormatVersionKeyString = + "__persistent_stats_format_version__"; +const std::string kCompatibleVersionKeyString = + "__persistent_stats_compatible_version__"; +// Every release maintains two versions numbers for persistents stats: Current +// format version and compatible format version. Current format version +// designates what type of encoding will be used when writing to stats CF; +// compatible format version designates the minimum format version that +// can decode the stats CF encoded using the current format version. +const uint64_t kStatsCFCurrentFormatVersion = 1; +const uint64_t kStatsCFCompatibleFormatVersion = 1; + +Status DecodePersistentStatsVersionNumber(DBImpl* db, StatsVersionKeyType type, + uint64_t* version_number) { + if (type >= StatsVersionKeyType::kKeyTypeMax) { + return Status::InvalidArgument("Invalid stats version key type provided"); + } + std::string key; + if (type == StatsVersionKeyType::kFormatVersion) { + key = kFormatVersionKeyString; + } else if (type == StatsVersionKeyType::kCompatibleVersion) { + key = kCompatibleVersionKeyString; + } + ReadOptions options; + options.verify_checksums = true; + std::string result; + Status s = db->Get(options, db->PersistentStatsColumnFamily(), key, &result); + if (!s.ok() || result.empty()) { + return Status::NotFound("Persistent stats version key " + key + + " not found."); + } + + // read version_number but do nothing in current version + *version_number = ParseUint64(result); + return Status::OK(); +} + +int EncodePersistentStatsKey(uint64_t now_seconds, const std::string& key, + int size, char* buf) { + char timestamp[kNowSecondsStringLength + 1]; + // make time stamp string equal in length to allow sorting by time + snprintf(timestamp, sizeof(timestamp), "%010d", + static_cast(now_seconds)); + timestamp[kNowSecondsStringLength] = '\0'; + return snprintf(buf, size, "%s#%s", timestamp, key.c_str()); +} + +void OptimizeForPersistentStats(ColumnFamilyOptions* cfo) { + cfo->write_buffer_size = 2 << 20; + cfo->target_file_size_base = 2 * 1048576; + cfo->max_bytes_for_level_base = 10 * 1048576; + cfo->snap_refresh_nanos = 0; + cfo->soft_pending_compaction_bytes_limit = 256 * 1048576; + cfo->hard_pending_compaction_bytes_limit = 1073741824ul; + cfo->compression = kNoCompression; +} + +PersistentStatsHistoryIterator::~PersistentStatsHistoryIterator() {} + +bool PersistentStatsHistoryIterator::Valid() const { return valid_; } + +Status PersistentStatsHistoryIterator::status() const { return status_; } + +void PersistentStatsHistoryIterator::Next() { + // increment start_time by 1 to avoid infinite loop + AdvanceIteratorByTime(GetStatsTime() + 1, end_time_); +} + +uint64_t PersistentStatsHistoryIterator::GetStatsTime() const { return time_; } + +const std::map& +PersistentStatsHistoryIterator::GetStatsMap() const { + return stats_map_; +} + +std::pair parseKey(const Slice& key, + uint64_t start_time) { + std::pair result; + std::string key_str = key.ToString(); + std::string::size_type pos = key_str.find("#"); + // TODO(Zhongyi): add counters to track parse failures? + if (pos == std::string::npos) { + result.first = port::kMaxUint64; + result.second.clear(); + } else { + uint64_t parsed_time = ParseUint64(key_str.substr(0, pos)); + // skip entries with timestamp smaller than start_time + if (parsed_time < start_time) { + result.first = port::kMaxUint64; + result.second = ""; + } else { + result.first = parsed_time; + std::string key_resize = key_str.substr(pos + 1); + result.second = key_resize; + } + } + return result; +} + +// advance the iterator to the next time between [start_time, end_time) +// if success, update time_ and stats_map_ with new_time and stats_map +void PersistentStatsHistoryIterator::AdvanceIteratorByTime(uint64_t start_time, + uint64_t end_time) { + // try to find next entry in stats_history_ map + if (db_impl_ != nullptr) { + ReadOptions ro; + Iterator* iter = + db_impl_->NewIterator(ro, db_impl_->PersistentStatsColumnFamily()); + + char timestamp[kNowSecondsStringLength + 1]; + snprintf(timestamp, sizeof(timestamp), "%010d", + static_cast(std::max(time_, start_time))); + timestamp[kNowSecondsStringLength] = '\0'; + + iter->Seek(timestamp); + // no more entries with timestamp >= start_time is found or version key + // is found to be incompatible + if (!iter->Valid()) { + valid_ = false; + delete iter; + return; + } + time_ = parseKey(iter->key(), start_time).first; + valid_ = true; + // check parsed time and invalid if it exceeds end_time + if (time_ > end_time) { + valid_ = false; + delete iter; + return; + } + // find all entries with timestamp equal to time_ + std::map new_stats_map; + std::pair kv; + for (; iter->Valid(); iter->Next()) { + kv = parseKey(iter->key(), start_time); + if (kv.first != time_) { + break; + } + if (kv.second.compare(kFormatVersionKeyString) == 0) { + continue; + } + new_stats_map[kv.second] = ParseUint64(iter->value().ToString()); + } + stats_map_.swap(new_stats_map); + delete iter; + } else { + valid_ = false; + } +} + +} // namespace rocksdb diff --git a/monitoring/persistent_stats_history.h b/monitoring/persistent_stats_history.h new file mode 100644 index 000000000..9a6885987 --- /dev/null +++ b/monitoring/persistent_stats_history.h @@ -0,0 +1,83 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "db/db_impl/db_impl.h" +#include "rocksdb/stats_history.h" + +namespace rocksdb { + +extern const std::string kFormatVersionKeyString; +extern const std::string kCompatibleVersionKeyString; +extern const uint64_t kStatsCFCurrentFormatVersion; +extern const uint64_t kStatsCFCompatibleFormatVersion; + +enum StatsVersionKeyType : uint32_t { + kFormatVersion = 1, + kCompatibleVersion = 2, + kKeyTypeMax = 3 +}; + +// Read the version number from persitent stats cf depending on type provided +// stores the version number in `*version_number` +// returns Status::OK() on success, or other status code on failure +Status DecodePersistentStatsVersionNumber(DBImpl* db, StatsVersionKeyType type, + uint64_t* version_number); + +// Encode timestamp and stats key into buf +// Format: timestamp(10 digit) + '#' + key +// Total length of encoded key will be capped at 100 bytes +int EncodePersistentStatsKey(uint64_t timestamp, const std::string& key, + int size, char* buf); + +void OptimizeForPersistentStats(ColumnFamilyOptions* cfo); + +class PersistentStatsHistoryIterator final : public StatsHistoryIterator { + public: + PersistentStatsHistoryIterator(uint64_t start_time, uint64_t end_time, + DBImpl* db_impl) + : time_(0), + start_time_(start_time), + end_time_(end_time), + valid_(true), + db_impl_(db_impl) { + AdvanceIteratorByTime(start_time_, end_time_); + } + ~PersistentStatsHistoryIterator() override; + bool Valid() const override; + Status status() const override; + + void Next() override; + uint64_t GetStatsTime() const override; + + const std::map& GetStatsMap() const override; + + private: + // advance the iterator to the next stats history record with timestamp + // between [start_time, end_time) + void AdvanceIteratorByTime(uint64_t start_time, uint64_t end_time); + + // No copying allowed + PersistentStatsHistoryIterator(const PersistentStatsHistoryIterator&) = + delete; + void operator=(const PersistentStatsHistoryIterator&) = delete; + PersistentStatsHistoryIterator(PersistentStatsHistoryIterator&&) = delete; + PersistentStatsHistoryIterator& operator=(PersistentStatsHistoryIterator&&) = + delete; + + uint64_t time_; + uint64_t start_time_; + uint64_t end_time_; + std::map stats_map_; + Status status_; + bool valid_; + DBImpl* db_impl_; +}; + +} // namespace rocksdb diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc new file mode 100644 index 000000000..a66043da1 --- /dev/null +++ b/monitoring/stats_history_test.cc @@ -0,0 +1,576 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include +#include +#include + +#include "db/column_family.h" +#include "db/db_impl/db_impl.h" +#include "db/db_test_util.h" +#include "monitoring/persistent_stats_history.h" +#include "options/options_helper.h" +#include "port/stack_trace.h" +#include "rocksdb/cache.h" +#include "rocksdb/convenience.h" +#include "rocksdb/rate_limiter.h" +#include "rocksdb/stats_history.h" +#include "test_util/sync_point.h" +#include "test_util/testutil.h" +#include "util/random.h" + +namespace rocksdb { + +class StatsHistoryTest : public DBTestBase { + public: + StatsHistoryTest() : DBTestBase("/stats_history_test") {} +}; + +TEST_F(StatsHistoryTest, RunStatsDumpPeriodSec) { + Options options; + options.create_if_missing = true; + options.stats_dump_period_sec = 5; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + int counter = 0; + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::DumpStats:1", [&](void* /*arg*/) { counter++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); + ASSERT_EQ(5, dbfull()->GetDBOptions().stats_dump_period_sec); + dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(5); }); + ASSERT_GE(counter, 1); + + // Test cacel job through SetOptions + ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "0"}})); + int old_val = counter; + for (int i = 6; i < 20; ++i) { + dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(i); }); + } + ASSERT_EQ(counter, old_val); + Close(); +} + +// Test persistent stats background thread scheduling and cancelling +TEST_F(StatsHistoryTest, StatsPersistScheduling) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG + int counter = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); + ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec); + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + ASSERT_GE(counter, 1); + + // Test cacel job through SetOptions + ASSERT_TRUE(dbfull()->TEST_IsPersistentStatsEnabled()); + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); + ASSERT_FALSE(dbfull()->TEST_IsPersistentStatsEnabled()); + Close(); +} + +// Test enabling persistent stats for the first time +TEST_F(StatsHistoryTest, PersistentStatsFreshInstall) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 0; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG + int counter = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "5"}})); + ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec); + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + ASSERT_GE(counter, 1); + Close(); +} + +// TODO(Zhongyi): Move persistent stats related tests to a separate file +TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + options.statistics = rocksdb::CreateDBStatistics(); + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); +#endif // OS_MACOSX && !NDEBUG + + CreateColumnFamilies({"pikachu"}, options); + ASSERT_OK(Put("foo", "bar")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + + int mock_time = 1; + // Wait for stats persist to finish + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + std::unique_ptr stats_iter; + db_->GetStatsHistory(0 /*start_time*/, 6 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + // disabled stats snapshots + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); + size_t stats_count = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + auto stats_map = stats_iter->GetStatsMap(); + ASSERT_EQ(stats_iter->GetStatsTime(), 5); + stats_count += stats_map.size(); + } + ASSERT_GT(stats_count, 0); + // Wait a bit and verify no more stats are found + for (mock_time = 6; mock_time < 20; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + db_->GetStatsHistory(0 /*start_time*/, 20 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count_new = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + stats_count_new += stats_iter->GetStatsMap().size(); + } + ASSERT_EQ(stats_count_new, stats_count); + Close(); +} + +TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { + Options options; + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + options.stats_persist_period_sec = 1; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); +#endif // OS_MACOSX && !NDEBUG + + CreateColumnFamilies({"pikachu"}, options); + ASSERT_OK(Put("foo", "bar")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + // some random operation to populate statistics + ASSERT_OK(Delete("foo")); + ASSERT_OK(Put("sol", "sol")); + ASSERT_OK(Put("epic", "epic")); + ASSERT_OK(Put("ltd", "ltd")); + ASSERT_EQ("sol", Get("sol")); + ASSERT_EQ("epic", Get("epic")); + ASSERT_EQ("ltd", Get("ltd")); + Iterator* iterator = db_->NewIterator(ReadOptions()); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + ASSERT_TRUE(iterator->key() == iterator->value()); + } + delete iterator; + ASSERT_OK(Flush()); + ASSERT_OK(Delete("sol")); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + int mock_time = 1; + // Wait for stats persist to finish + for (; mock_time < 5; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + + // second round of ops + ASSERT_OK(Put("saigon", "saigon")); + ASSERT_OK(Put("noodle talk", "noodle talk")); + ASSERT_OK(Put("ping bistro", "ping bistro")); + iterator = db_->NewIterator(ReadOptions()); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + ASSERT_TRUE(iterator->key() == iterator->value()); + } + delete iterator; + ASSERT_OK(Flush()); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + for (; mock_time < 10; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + std::unique_ptr stats_iter; + db_->GetStatsHistory(0 /*start_time*/, 10 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count = 0; + int slice_count = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + slice_count++; + auto stats_map = stats_iter->GetStatsMap(); + stats_count += stats_map.size(); + } + size_t stats_history_size = dbfull()->TEST_EstimateInMemoryStatsHistorySize(); + ASSERT_GE(slice_count, 9); + ASSERT_GE(stats_history_size, 12000); + // capping memory cost at 12000 bytes since one slice is around 10000~12000 + ASSERT_OK(dbfull()->SetDBOptions({{"stats_history_buffer_size", "12000"}})); + ASSERT_EQ(12000, dbfull()->GetDBOptions().stats_history_buffer_size); + // Wait for stats persist to finish + for (; mock_time < 20; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + db_->GetStatsHistory(0 /*start_time*/, 20 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count_reopen = 0; + slice_count = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + slice_count++; + auto stats_map = stats_iter->GetStatsMap(); + stats_count_reopen += stats_map.size(); + } + size_t stats_history_size_reopen = + dbfull()->TEST_EstimateInMemoryStatsHistorySize(); + // only one slice can fit under the new stats_history_buffer_size + ASSERT_LT(slice_count, 2); + ASSERT_TRUE(stats_history_size_reopen < 12000 && + stats_history_size_reopen > 0); + ASSERT_TRUE(stats_count_reopen < stats_count && stats_count_reopen > 0); + Close(); + // TODO: may also want to verify stats timestamp to make sure we are purging + // the correct stats snapshot +} + +int countkeys(Iterator* iter) { + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + count++; + } + return count; +} + +TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + options.statistics = rocksdb::CreateDBStatistics(); + options.persist_stats_to_disk = true; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + CreateColumnFamilies({"pikachu"}, options); + ASSERT_OK(Put("foo", "bar")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ(Get("foo"), "bar"); + + // Wait for stats persist to finish + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + auto iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + int key_count1 = countkeys(iter); + delete iter; + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(10); }); + iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + int key_count2 = countkeys(iter); + delete iter; + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(15); }); + iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + int key_count3 = countkeys(iter); + delete iter; + ASSERT_GE(key_count2, key_count1); + ASSERT_GE(key_count3, key_count2); + ASSERT_EQ(key_count3 - key_count2, key_count2 - key_count1); + std::unique_ptr stats_iter; + db_->GetStatsHistory(0 /*start_time*/, 16 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count = 0; + int slice_count = 0; + int non_zero_count = 0; + for (int i = 1; stats_iter->Valid(); stats_iter->Next(), i++) { + slice_count++; + auto stats_map = stats_iter->GetStatsMap(); + ASSERT_EQ(stats_iter->GetStatsTime(), 5 * i); + for (auto& stat : stats_map) { + if (stat.second != 0) { + non_zero_count++; + } + } + stats_count += stats_map.size(); + } + ASSERT_EQ(slice_count, 3); + // 2 extra keys for format version + ASSERT_EQ(stats_count, key_count3 - 2); + // verify reopen will not cause data loss + ReopenWithColumnFamilies({"default", "pikachu"}, options); + db_->GetStatsHistory(0 /*start_time*/, 16 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count_reopen = 0; + int slice_count_reopen = 0; + int non_zero_count_recover = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + slice_count_reopen++; + auto stats_map = stats_iter->GetStatsMap(); + for (auto& stat : stats_map) { + if (stat.second != 0) { + non_zero_count_recover++; + } + } + stats_count_reopen += stats_map.size(); + } + ASSERT_EQ(non_zero_count, non_zero_count_recover); + ASSERT_EQ(slice_count, slice_count_reopen); + ASSERT_EQ(stats_count, stats_count_reopen); + Close(); +} + +// Test persisted stats matches the value found in options.statistics and +// the stats value retains after DB reopen +TEST_F(StatsHistoryTest, PersitentStatsVerifyValue) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + options.statistics = rocksdb::CreateDBStatistics(); + options.persist_stats_to_disk = true; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + std::map stats_map_before; + ASSERT_TRUE(options.statistics->getTickerMap(&stats_map_before)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + CreateColumnFamilies({"pikachu"}, options); + ASSERT_OK(Put("foo", "bar")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ(Get("foo"), "bar"); + + // Wait for stats persist to finish + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + auto iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + countkeys(iter); + delete iter; + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(10); }); + iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + countkeys(iter); + delete iter; + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(15); }); + iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + countkeys(iter); + delete iter; + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(20); }); + + std::map stats_map_after; + ASSERT_TRUE(options.statistics->getTickerMap(&stats_map_after)); + std::unique_ptr stats_iter; + db_->GetStatsHistory(0 /*start_time*/, 21 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + std::string sample = "rocksdb.num.iterator.deleted"; + uint64_t recovered_value = 0; + for (int i = 1; stats_iter->Valid(); stats_iter->Next(), ++i) { + auto stats_map = stats_iter->GetStatsMap(); + ASSERT_EQ(stats_iter->GetStatsTime(), 5 * i); + for (const auto& stat : stats_map) { + if (sample.compare(stat.first) == 0) { + recovered_value += stat.second; + } + } + } + ASSERT_EQ(recovered_value, stats_map_after[sample]); + + // test stats value retains after recovery + ReopenWithColumnFamilies({"default", "pikachu"}, options); + db_->GetStatsHistory(0 /*start_time*/, 21 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + uint64_t new_recovered_value = 0; + for (int i = 1; stats_iter->Valid(); stats_iter->Next(), i++) { + auto stats_map = stats_iter->GetStatsMap(); + ASSERT_EQ(stats_iter->GetStatsTime(), 5 * i); + for (const auto& stat : stats_map) { + if (sample.compare(stat.first) == 0) { + new_recovered_value += stat.second; + } + } + } + ASSERT_EQ(recovered_value, new_recovered_value); + + // TODO(Zhongyi): also add test to read raw values from disk and verify + // correctness + Close(); +} + +// TODO(Zhongyi): add test for different format versions + +TEST_F(StatsHistoryTest, PersistentStatsCreateColumnFamilies) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + options.statistics = rocksdb::CreateDBStatistics(); + options.persist_stats_to_disk = true; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + ASSERT_OK(TryReopen(options)); + CreateColumnFamilies({"one", "two", "three"}, options); + ASSERT_OK(Put(1, "foo", "bar")); + ReopenWithColumnFamilies({"default", "one", "two", "three"}, options); + ASSERT_EQ(Get(2, "foo"), "bar"); + CreateColumnFamilies({"four"}, options); + ReopenWithColumnFamilies({"default", "one", "two", "three", "four"}, options); + ASSERT_EQ(Get(2, "foo"), "bar"); + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + auto iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + int key_count = countkeys(iter); + delete iter; + ASSERT_GE(key_count, 0); + uint64_t num_write_wal = 0; + std::string sample = "rocksdb.write.wal"; + std::unique_ptr stats_iter; + db_->GetStatsHistory(0 /*start_time*/, 5 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + for (; stats_iter->Valid(); stats_iter->Next()) { + auto stats_map = stats_iter->GetStatsMap(); + for (const auto& stat : stats_map) { + if (sample.compare(stat.first) == 0) { + num_write_wal += stat.second; + } + } + } + stats_iter.reset(); + ASSERT_EQ(num_write_wal, 2); + + options.persist_stats_to_disk = false; + ReopenWithColumnFamilies({"default", "one", "two", "three", "four"}, options); + int cf_count = 0; + for (auto cfd : *dbfull()->versions_->GetColumnFamilySet()) { + (void)cfd; + cf_count++; + } + // persistent stats cf will be implicitly opened even if + // persist_stats_to_disk is false + ASSERT_EQ(cf_count, 6); + ASSERT_EQ(Get(2, "foo"), "bar"); + + // attempt to create column family using same name, should fail + ColumnFamilyOptions cf_opts(options); + ColumnFamilyHandle* handle; + ASSERT_NOK(db_->CreateColumnFamily(cf_opts, kPersistentStatsColumnFamilyName, + &handle)); + + options.persist_stats_to_disk = true; + ReopenWithColumnFamilies({"default", "one", "two", "three", "four"}, options); + ASSERT_NOK(db_->CreateColumnFamily(cf_opts, kPersistentStatsColumnFamilyName, + &handle)); + // verify stats is not affected by prior failed CF creation + db_->GetStatsHistory(0 /*start_time*/, 5 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + num_write_wal = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + auto stats_map = stats_iter->GetStatsMap(); + for (const auto& stat : stats_map) { + if (sample.compare(stat.first) == 0) { + num_write_wal += stat.second; + } + } + } + ASSERT_EQ(num_write_wal, 2); + + Close(); + Destroy(options); +} + +TEST_F(StatsHistoryTest, PersistentStatsReadOnly) { + ASSERT_OK(Put("bar", "v2")); + Close(); + + auto options = CurrentOptions(); + options.stats_persist_period_sec = 5; + options.persist_stats_to_disk = true; + assert(options.env == env_); + ASSERT_OK(ReadOnlyReopen(options)); + ASSERT_EQ("v2", Get("bar")); + Close(); + + // Reopen and flush memtable. + Reopen(options); + Flush(); + Close(); + // Now check keys in read only mode. + ASSERT_OK(ReadOnlyReopen(options)); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/options/db_options.cc b/options/db_options.cc index bdcdd250a..490a37080 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -84,7 +84,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) two_write_queues(options.two_write_queues), manual_wal_flush(options.manual_wal_flush), atomic_flush(options.atomic_flush), - avoid_unnecessary_blocking_io(options.avoid_unnecessary_blocking_io) { + avoid_unnecessary_blocking_io(options.avoid_unnecessary_blocking_io), + persist_stats_to_disk(options.persist_stats_to_disk) { } void ImmutableDBOptions::Dump(Logger* log) const { @@ -222,6 +223,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " Options.avoid_unnecessary_blocking_io: %d", avoid_unnecessary_blocking_io); + ROCKS_LOG_HEADER(log, " Options.persist_stats_to_disk: %u", + persist_stats_to_disk); } MutableDBOptions::MutableDBOptions() diff --git a/options/db_options.h b/options/db_options.h index 67b26786f..92eea4ecf 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -81,6 +81,7 @@ struct ImmutableDBOptions { bool manual_wal_flush; bool atomic_flush; bool avoid_unnecessary_blocking_io; + bool persist_stats_to_disk; }; struct MutableDBOptions { diff --git a/options/options.cc b/options/options.cc index 1d2b6193c..5efd3ce57 100644 --- a/options/options.cc +++ b/options/options.cc @@ -502,7 +502,6 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeForSmallDb( BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; table_factory.reset(new BlockBasedTableFactory(table_options)); - return this; } diff --git a/options/options_helper.cc b/options/options_helper.cc index 388256abd..71a7f9b2f 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -84,6 +84,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.stats_dump_period_sec = mutable_db_options.stats_dump_period_sec; options.stats_persist_period_sec = mutable_db_options.stats_persist_period_sec; + options.persist_stats_to_disk = immutable_db_options.persist_stats_to_disk; options.stats_history_buffer_size = mutable_db_options.stats_history_buffer_size; options.advise_random_on_open = immutable_db_options.advise_random_on_open; @@ -1580,6 +1581,10 @@ std::unordered_map {offsetof(struct DBOptions, stats_persist_period_sec), OptionType::kUInt, OptionVerificationType::kNormal, true, offsetof(struct MutableDBOptions, stats_persist_period_sec)}}, + {"persist_stats_to_disk", + {offsetof(struct DBOptions, persist_stats_to_disk), + OptionType::kBoolean, OptionVerificationType::kNormal, false, + offsetof(struct ImmutableDBOptions, persist_stats_to_disk)}}, {"stats_history_buffer_size", {offsetof(struct DBOptions, stats_history_buffer_size), OptionType::kSizeT, OptionVerificationType::kNormal, true, diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 6044cc4b1..f0b79e372 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -265,6 +265,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "allow_mmap_writes=false;" "stats_dump_period_sec=70127;" "stats_persist_period_sec=54321;" + "persist_stats_to_disk=true;" "stats_history_buffer_size=14159;" "allow_fallocate=true;" "allow_mmap_reads=false;" diff --git a/options/options_test.cc b/options/options_test.cc index 9fcd241d7..24aeec99e 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -129,6 +129,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"skip_log_error_on_recovery", "false"}, {"stats_dump_period_sec", "46"}, {"stats_persist_period_sec", "57"}, + {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, {"use_adaptive_mutex", "false"}, @@ -267,6 +268,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.skip_log_error_on_recovery, false); ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U); ASSERT_EQ(new_db_opt.stats_persist_period_sec, 57U); + ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); diff --git a/src.mk b/src.mk index 150b1c10a..e48a69595 100644 --- a/src.mk +++ b/src.mk @@ -36,7 +36,6 @@ LIB_SOURCES = \ db/flush_job.cc \ db/flush_scheduler.cc \ db/forward_iterator.cc \ - db/in_memory_stats_history.cc \ db/internal_stats.cc \ db/logs_with_prep_tracker.cc \ db/log_reader.cc \ @@ -86,10 +85,12 @@ LIB_SOURCES = \ memtable/write_buffer_manager.cc \ monitoring/histogram.cc \ monitoring/histogram_windowing.cc \ + monitoring/in_memory_stats_history.cc \ monitoring/instrumented_mutex.cc \ monitoring/iostats_context.cc \ monitoring/perf_context.cc \ monitoring/perf_level.cc \ + monitoring/persistent_stats_history.cc \ monitoring/statistics.cc \ monitoring/thread_status_impl.cc \ monitoring/thread_status_updater.cc \ @@ -105,21 +106,21 @@ LIB_SOURCES = \ port/port_posix.cc \ port/stack_trace.cc \ table/adaptive/adaptive_table_factory.cc \ - table/block_based/block.cc \ - table/block_based/block_based_filter_block.cc \ - table/block_based/block_based_table_builder.cc \ - table/block_based/block_based_table_factory.cc \ - table/block_based/block_based_table_reader.cc \ - table/block_based/block_builder.cc \ - table/block_based/block_prefix_index.cc \ - table/block_based/data_block_hash_index.cc \ - table/block_based/data_block_footer.cc \ - table/block_based/flush_block_policy.cc \ - table/block_based/full_filter_block.cc \ - table/block_based/index_builder.cc \ - table/block_based/partitioned_filter_block.cc \ - table/block_fetcher.cc \ - table/bloom_block.cc \ + table/block_based/block.cc \ + table/block_based/block_based_filter_block.cc \ + table/block_based/block_based_table_builder.cc \ + table/block_based/block_based_table_factory.cc \ + table/block_based/block_based_table_reader.cc \ + table/block_based/block_builder.cc \ + table/block_based/block_prefix_index.cc \ + table/block_based/data_block_hash_index.cc \ + table/block_based/data_block_footer.cc \ + table/block_based/flush_block_policy.cc \ + table/block_based/full_filter_block.cc \ + table/block_based/index_builder.cc \ + table/block_based/partitioned_filter_block.cc \ + table/block_fetcher.cc \ + table/bloom_block.cc \ table/cuckoo/cuckoo_table_builder.cc \ table/cuckoo/cuckoo_table_factory.cc \ table/cuckoo/cuckoo_table_reader.cc \ @@ -233,27 +234,27 @@ LIB_SOURCES_ASM = LIB_SOURCES_C = endif -TOOL_LIB_SOURCES = \ +TOOL_LIB_SOURCES = \ tools/ldb_cmd.cc \ tools/ldb_tool.cc \ tools/sst_dump_tool.cc \ utilities/blob_db/blob_dump_tool.cc \ -ANALYZER_LIB_SOURCES = \ +ANALYZER_LIB_SOURCES = \ tools/block_cache_trace_analyzer.cc \ - tools/trace_analyzer_tool.cc \ + tools/trace_analyzer_tool.cc \ -MOCK_LIB_SOURCES = \ - table/mock_table.cc \ +MOCK_LIB_SOURCES = \ + table/mock_table.cc \ test_util/fault_injection_test_env.cc -BENCH_LIB_SOURCES = \ +BENCH_LIB_SOURCES = \ tools/db_bench_tool.cc \ -TEST_LIB_SOURCES = \ +TEST_LIB_SOURCES = \ db/db_test_util.cc \ - test_util/testharness.cc \ - test_util/testutil.cc \ + test_util/testharness.cc \ + test_util/testutil.cc \ utilities/cassandra/test_utils.cc \ MAIN_SOURCES = \ @@ -301,7 +302,7 @@ MAIN_SOURCES = \ db/dbformat_test.cc \ db/deletefile_test.cc \ db/env_timed_test.cc \ - db/error_handler_test.cc \ + db/error_handler_test.cc \ db/external_sst_file_basic_test.cc \ db/external_sst_file_test.cc \ db/fault_injection_test.cc \ @@ -352,12 +353,13 @@ MAIN_SOURCES = \ monitoring/histogram_test.cc \ monitoring/iostats_context_test.cc \ monitoring/statistics_test.cc \ + monitoring/stats_history_test.cc \ options/options_test.cc \ - table/block_based/block_based_filter_block_test.cc \ - table/block_based/block_test.cc \ - table/block_based/data_block_hash_index_test.cc \ - table/block_based/full_filter_block_test.cc \ - table/block_based/partitioned_filter_block_test.cc \ + table/block_based/block_based_filter_block_test.cc \ + table/block_based/block_test.cc \ + table/block_based/data_block_hash_index_test.cc \ + table/block_based/full_filter_block_test.cc \ + table/block_based/partitioned_filter_block_test.cc \ table/cleanable_test.cc \ table/cuckoo/cuckoo_table_builder_test.cc \ table/cuckoo/cuckoo_table_reader_test.cc \ @@ -373,7 +375,7 @@ MAIN_SOURCES = \ tools/ldb_cmd_test.cc \ tools/reduce_levels_test.cc \ tools/sst_dump_test.cc \ - tools/trace_analyzer_test.cc \ + tools/trace_analyzer_test.cc \ trace_replay/block_cache_tracer_test.cc \ util/autovector_test.cc \ util/bloom_test.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index a14758418..9b3e2cac3 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1146,6 +1146,8 @@ DEFINE_uint64(stats_dump_period_sec, rocksdb::Options().stats_dump_period_sec, DEFINE_uint64(stats_persist_period_sec, rocksdb::Options().stats_persist_period_sec, "Gap between persisting stats in seconds"); +DEFINE_bool(persist_stats_to_disk, rocksdb::Options().persist_stats_to_disk, + "whether to persist stats to disk"); DEFINE_uint64(stats_history_buffer_size, rocksdb::Options().stats_history_buffer_size, "Max number of stats snapshots to keep in memory"); @@ -3727,6 +3729,7 @@ class Benchmark { static_cast(FLAGS_stats_dump_period_sec); options.stats_persist_period_sec = static_cast(FLAGS_stats_persist_period_sec); + options.persist_stats_to_disk = FLAGS_persist_stats_to_disk; options.stats_history_buffer_size = static_cast(FLAGS_stats_history_buffer_size);