From 69760b4d05a6e96bdf1175c72e4ac8356deacfde Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Fri, 14 Aug 2020 20:11:35 -0700 Subject: [PATCH] Introduce a global StatsDumpScheduler for stats dumping (#7223) Summary: Have a global StatsDumpScheduler for all DB instance stats dumping, including `DumpStats()` and `PersistStats()`. Before this, there're 2 dedicate threads for every DB instance, one for DumpStats() one for PersistStats(), which could create lots of threads if there're hundreds DB instances. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7223 Reviewed By: riversand963 Differential Revision: D23056737 Pulled By: jay-zhuang fbshipit-source-id: 0faa2311142a73433ebb3317361db7cbf43faeba --- CMakeLists.txt | 2 + HISTORY.md | 3 + Makefile | 3 + TARGETS | 8 + db/compacted_db_impl.cc | 2 +- db/db_impl/db_impl.cc | 100 +++----- db/db_impl/db_impl.h | 40 +-- db/db_impl/db_impl_debug.cc | 20 +- db/db_impl/db_impl_open.cc | 10 +- db/db_test_util.h | 24 ++ monitoring/stats_dump_scheduler.cc | 103 ++++++++ monitoring/stats_dump_scheduler.h | 64 +++++ monitoring/stats_dump_scheduler_test.cc | 225 +++++++++++++++++ monitoring/stats_history_test.cc | 317 +++++++++++++++--------- src.mk | 2 + util/timer.h | 78 ++++-- util/timer_test.cc | 33 +++ 17 files changed, 802 insertions(+), 232 deletions(-) create mode 100644 monitoring/stats_dump_scheduler.cc create mode 100644 monitoring/stats_dump_scheduler.h create mode 100644 monitoring/stats_dump_scheduler_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 81010f972..87e2ce738 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -652,6 +652,7 @@ set(SOURCES monitoring/perf_level.cc monitoring/persistent_stats_history.cc monitoring/statistics.cc + monitoring/stats_dump_scheduler.cc monitoring/thread_status_impl.cc monitoring/thread_status_updater.cc monitoring/thread_status_util.cc @@ -1101,6 +1102,7 @@ if(WITH_TESTS) monitoring/histogram_test.cc monitoring/iostats_context_test.cc 
monitoring/statistics_test.cc + monitoring/stats_dump_scheduler_test.cc monitoring/stats_history_test.cc options/options_settable_test.cc options/options_test.cc diff --git a/HISTORY.md b/HISTORY.md index 72c8d1b63..937d73552 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,9 @@ * A new option `std::shared_ptr file_checksum_gen_factory` is added to `BackupableDBOptions`. The default value for this option is `nullptr`. If this option is null, the default backup engine checksum function (crc32c) will be used for creating, verifying, or restoring backups. If it is not null and is set to the DB custom checksum factory, the custom checksum function used in DB will also be used for creating, verifying, or restoring backups, in addition to the default checksum function (crc32c). If it is not null and is set to a custom checksum factory different than the DB custom checksum factory (which may be null), BackupEngine will return `Status::InvalidArgument()`. * A new field `std::string requested_checksum_func_name` is added to `FileChecksumGenContext`, which enables the checksum factory to create generators for a suite of different functions. +### Performance Improvements +* Reduce thread number for multiple DB instances by re-using one global thread for statistics dumping and persisting. 
+ ## 6.12 (2020-07-28) ### Public API Change * Encryption file classes now exposed for inheritance in env_encryption.h diff --git a/Makefile b/Makefile index ea55088e5..ad2a977b2 100644 --- a/Makefile +++ b/Makefile @@ -1808,6 +1808,9 @@ blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRA timer_test: $(OBJ_DIR)/util/timer_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +stats_dump_scheduler_test: $(OBJ_DIR)/monitoring/stats_dump_scheduler_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 4fcb54fe0..0ffe1cbb2 100644 --- a/TARGETS +++ b/TARGETS @@ -231,6 +231,7 @@ cpp_library( "monitoring/perf_level.cc", "monitoring/persistent_stats_history.cc", "monitoring/statistics.cc", + "monitoring/stats_dump_scheduler.cc", "monitoring/thread_status_impl.cc", "monitoring/thread_status_updater.cc", "monitoring/thread_status_updater_debug.cc", @@ -1470,6 +1471,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "stats_dump_scheduler_test", + "monitoring/stats_dump_scheduler_test.cc", + "serial", + [], + [], + ], [ "stats_history_test", "monitoring/stats_history_test.cc", diff --git a/db/compacted_db_impl.cc b/db/compacted_db_impl.cc index fddccb11f..dc7e8cc38 100644 --- a/db/compacted_db_impl.cc +++ b/db/compacted_db_impl.cc @@ -149,7 +149,7 @@ Status CompactedDBImpl::Open(const Options& options, std::unique_ptr db(new CompactedDBImpl(db_options, dbname)); Status s = db->Init(options); if (s.ok()) { - db->StartTimedTasks(); + db->StartStatsDumpScheduler(); ROCKS_LOG_INFO(db->immutable_db_options_.info_log, "Opened the db as fully compacted mode"); LogFlush(db->immutable_db_options_.info_log); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 97ce58a3e..df75c7c04 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -65,6 +65,7 @@ #include "monitoring/iostats_context_imp.h" #include "monitoring/perf_context_imp.h" 
#include "monitoring/persistent_stats_history.h" +#include "monitoring/stats_dump_scheduler.h" #include "monitoring/thread_status_updater.h" #include "monitoring/thread_status_util.h" #include "options/cf_options.h" @@ -205,6 +206,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, bg_compaction_paused_(0), refitting_level_(false), opened_successfully_(false), +#ifndef ROCKSDB_LITE + stats_dump_scheduler_(nullptr), +#endif // ROCKSDB_LITE two_write_queues_(options.two_write_queues), manual_wal_flush_(options.manual_wal_flush), // last_sequencee_ is always maintained by the main queue that also writes @@ -440,14 +444,12 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown: canceling all background work"); - if (thread_dump_stats_ != nullptr) { - thread_dump_stats_->cancel(); - thread_dump_stats_.reset(); - } - if (thread_persist_stats_ != nullptr) { - thread_persist_stats_->cancel(); - thread_persist_stats_.reset(); +#ifndef ROCKSDB_LITE + if (stats_dump_scheduler_ != nullptr) { + stats_dump_scheduler_->Unregister(this); } +#endif // !ROCKSDB_LITE + InstrumentedMutexLock l(&mutex_); if (!shutting_down_.load(std::memory_order_acquire) && has_unpersisted_data_.load(std::memory_order_relaxed) && @@ -680,36 +682,19 @@ void DBImpl::PrintStatistics() { } } -void DBImpl::StartTimedTasks() { - unsigned int stats_dump_period_sec = 0; - unsigned int stats_persist_period_sec = 0; +void DBImpl::StartStatsDumpScheduler() { +#ifndef ROCKSDB_LITE { InstrumentedMutexLock l(&mutex_); - stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec; - if (stats_dump_period_sec > 0) { - if (!thread_dump_stats_) { - // In case of many `DB::Open()` in rapid succession we can have all - // threads dumping at once, which causes severe lock contention in - // jemalloc. Ensure successive `DB::Open()`s are staggered by at least - // one second in the common case. 
- static std::atomic stats_dump_threads_started(0); - thread_dump_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread( - [this]() { DBImpl::DumpStats(); }, "dump_st", env_, - static_cast(stats_dump_period_sec) * kMicrosInSecond, - stats_dump_threads_started.fetch_add(1) % - static_cast(stats_dump_period_sec) * - kMicrosInSecond)); - } - } - stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec; - if (stats_persist_period_sec > 0) { - if (!thread_persist_stats_) { - thread_persist_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread( - [this]() { DBImpl::PersistStats(); }, "pst_st", env_, - static_cast(stats_persist_period_sec) * kMicrosInSecond)); - } - } + stats_dump_scheduler_ = StatsDumpScheduler::Default(); + TEST_SYNC_POINT_CALLBACK("DBImpl::StartStatsDumpScheduler:Init", + &stats_dump_scheduler_); } + + stats_dump_scheduler_->Register(this, + mutable_db_options_.stats_dump_period_sec, + mutable_db_options_.stats_persist_period_sec); +#endif // !ROCKSDB_LITE } // esitmate the total size of stats_history_ @@ -735,7 +720,9 @@ void DBImpl::PersistStats() { if (shutdown_initiated_) { return; } + TEST_SYNC_POINT("DBImpl::PersistStats:StartRunning"); uint64_t now_seconds = env_->NowMicros() / kMicrosInSecond; + Statistics* statistics = immutable_db_options_.statistics.get(); if (!statistics) { return; @@ -826,6 +813,7 @@ void DBImpl::PersistStats() { " bytes, slice count: %" ROCKSDB_PRIszt, stats_history_size, stats_history_.size()); } + TEST_SYNC_POINT("DBImpl::PersistStats:End"); #endif // !ROCKSDB_LITE } @@ -880,6 +868,7 @@ void DBImpl::DumpStats() { if (shutdown_initiated_) { return; } + TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning"); { InstrumentedMutexLock l(&mutex_); default_cf_internal_stats_->GetStringProperty( @@ -1087,45 +1076,22 @@ Status DBImpl::SetDBOptions( } if (new_options.stats_dump_period_sec != - mutable_db_options_.stats_dump_period_sec) { - if (thread_dump_stats_) { + mutable_db_options_.stats_dump_period_sec || + 
new_options.stats_persist_period_sec != + mutable_db_options_.stats_persist_period_sec) { + if (stats_dump_scheduler_) { mutex_.Unlock(); - thread_dump_stats_->cancel(); + stats_dump_scheduler_->Unregister(this); mutex_.Lock(); } - if (new_options.stats_dump_period_sec > 0) { - // In case many DBs have `stats_dump_period_sec` enabled in rapid - // succession, we can have all threads dumping at once, which causes - // severe lock contention in jemalloc. Ensure successive enabling of - // `stats_dump_period_sec` are staggered by at least one second in the - // common case. - static std::atomic stats_dump_threads_started(0); - thread_dump_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread( - [this]() { DBImpl::DumpStats(); }, "dump_st", env_, - static_cast(new_options.stats_dump_period_sec) * - kMicrosInSecond, - stats_dump_threads_started.fetch_add(1) % - static_cast(new_options.stats_dump_period_sec) * - kMicrosInSecond)); - } else { - thread_dump_stats_.reset(); - } - } - if (new_options.stats_persist_period_sec != - mutable_db_options_.stats_persist_period_sec) { - if (thread_persist_stats_) { + if (new_options.stats_dump_period_sec > 0 || + new_options.stats_persist_period_sec > 0) { mutex_.Unlock(); - thread_persist_stats_->cancel(); + stats_dump_scheduler_->Register(this, + new_options.stats_dump_period_sec, + new_options.stats_persist_period_sec); mutex_.Lock(); } - if (new_options.stats_persist_period_sec > 0) { - thread_persist_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread( - [this]() { DBImpl::PersistStats(); }, "pst_st", env_, - static_cast(new_options.stats_persist_period_sec) * - kMicrosInSecond)); - } else { - thread_persist_stats_.reset(); - } } write_controller_.set_max_delayed_write_rate( new_options.delayed_write_rate); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 13b22b543..48607c7b1 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -70,6 +70,10 @@ class ArenaWrappedDBIter; class 
InMemoryStatsHistoryIterator; class MemTable; class PersistentStatsHistoryIterator; +class StatsDumpScheduler; +#ifndef NDEBUG +class StatsDumpTestScheduler; +#endif // !NDEBUG class TableCache; class TaskLimiterToken; class Version; @@ -988,9 +992,7 @@ class DBImpl : public DB { int TEST_BGCompactionsAllowed() const; int TEST_BGFlushesAllowed() const; size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; - void TEST_WaitForDumpStatsRun(std::function callback) const; - void TEST_WaitForPersistStatsRun(std::function callback) const; - bool TEST_IsPersistentStatsEnabled() const; + void TEST_WaitForStatsDumpRun(std::function callback) const; size_t TEST_EstimateInMemoryStatsHistorySize() const; VersionSet* TEST_GetVersionSet() const { return versions_.get(); } @@ -998,8 +1000,19 @@ class DBImpl : public DB { const std::unordered_set& TEST_GetFilesGrabbedForPurge() const { return files_grabbed_for_purge_; } + +#ifndef ROCKSDB_LITE + StatsDumpTestScheduler* TEST_GetStatsDumpScheduler() const; +#endif // !ROCKSDB_LITE + #endif // NDEBUG + // persist stats to column family "_persistent_stats" + void PersistStats(); + + // dump rocksdb.stats to LOG + void DumpStats(); + protected: const std::string dbname_; std::string db_id_; @@ -1639,18 +1652,12 @@ class DBImpl : public DB { LogBuffer* log_buffer); // Schedule background tasks - void StartTimedTasks(); + void StartStatsDumpScheduler(); void PrintStatistics(); size_t EstimateInMemoryStatsHistorySize() const; - // persist stats to column family "_persistent_stats" - void PersistStats(); - - // dump rocksdb.stats to LOG - void DumpStats(); - // Return the minimum empty level that could hold the total data in the // input level. Return the input level, if such level could not be found. 
int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, @@ -2103,13 +2110,12 @@ class DBImpl : public DB { // Only to be set during initialization std::unique_ptr recoverable_state_pre_release_callback_; - // handle for scheduling stats dumping at fixed intervals - // REQUIRES: mutex locked - std::unique_ptr thread_dump_stats_; - - // handle for scheduling stats snapshoting at fixed intervals - // REQUIRES: mutex locked - std::unique_ptr thread_persist_stats_; +#ifndef ROCKSDB_LITE + // Scheduler to run DumpStats() and PersistStats(). Currently, it always uses + // a global instance from StatsDumpScheduler::Default(). Only in unittest, it + // can be overridden by StatsDumpTestScheduler. + StatsDumpScheduler* stats_dump_scheduler_; +#endif // When set, we use a separate queue for writes that don't write to memtable. // In 2PC these are the writes at Prepare phase. diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 332040851..7669d1e9e 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -12,6 +12,7 @@ #include "db/column_family.h" #include "db/db_impl/db_impl.h" #include "db/error_handler.h" +#include "monitoring/stats_dump_scheduler.h" #include "monitoring/thread_status_updater.h" #include "util/cast_util.h" @@ -271,21 +272,18 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize( return GetWalPreallocateBlockSize(write_buffer_size); } -void DBImpl::TEST_WaitForDumpStatsRun(std::function callback) const { - if (thread_dump_stats_ != nullptr) { - thread_dump_stats_->TEST_WaitForRun(callback); +#ifndef ROCKSDB_LITE +void DBImpl::TEST_WaitForStatsDumpRun(std::function callback) const { + if (stats_dump_scheduler_ != nullptr) { + static_cast(stats_dump_scheduler_) + ->TEST_WaitForRun(callback); } } -void DBImpl::TEST_WaitForPersistStatsRun(std::function callback) const { - if (thread_persist_stats_ != nullptr) { - thread_persist_stats_->TEST_WaitForRun(callback); - } -} - -bool DBImpl::TEST_IsPersistentStatsEnabled() const {
- return thread_persist_stats_ && thread_persist_stats_->IsRunning(); +StatsDumpTestScheduler* DBImpl::TEST_GetStatsDumpScheduler() const { + return static_cast(stats_dump_scheduler_); } +#endif // !ROCKSDB_LITE size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const { return EstimateInMemoryStatsHistorySize(); diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 286bc5648..97618fac3 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -6,17 +6,17 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "db/db_impl/db_impl.h" - #include #include "db/builder.h" +#include "db/db_impl/db_impl.h" #include "db/error_handler.h" #include "env/composite_env_wrapper.h" #include "file/read_write_util.h" #include "file/sst_file_manager_impl.h" #include "file/writable_file_writer.h" #include "monitoring/persistent_stats_history.h" +#include "monitoring/stats_dump_scheduler.h" #include "options/options_helper.h" #include "rocksdb/wal_filter.h" #include "table/block_based/block_based_table_factory.h" @@ -1700,6 +1700,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, sfm->ReserveDiskBuffer(max_write_buffer_size, impl->immutable_db_options_.db_paths[0].path); } + #endif // !ROCKSDB_LITE if (s.ok()) { @@ -1716,9 +1717,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } } if (s.ok()) { - impl->StartTimedTasks(); - } - if (!s.ok()) { + impl->StartStatsDumpScheduler(); + } else { for (auto* h : *handles) { delete h; } diff --git a/db/db_test_util.h b/db/db_test_util.h index f86cbffe7..a76243aef 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -1147,4 +1147,28 @@ class DBTestBase : public testing::Test { bool time_elapse_only_sleep_on_reopen_ = false; }; +class SafeMockTimeEnv : public MockTimeEnv 
{ + public: + explicit SafeMockTimeEnv(Env* base) : MockTimeEnv(base) { + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + // This is an alternate way (vs. SpecialEnv) of dealing with the fact + // that on some platforms, pthread_cond_timedwait does not appear to + // release the lock for other threads to operate if the deadline time + // is already passed. (TimedWait calls are currently a bad abstraction + // because the deadline parameter is usually computed from Env time, + // but is interpreted in real clock time.) + SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < this->RealNowMicros()) { + *reinterpret_cast(arg) = this->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG + SyncPoint::GetInstance()->EnableProcessing(); + } +}; + } // namespace ROCKSDB_NAMESPACE diff --git a/monitoring/stats_dump_scheduler.cc b/monitoring/stats_dump_scheduler.cc new file mode 100644 index 000000000..1dd641aa3 --- /dev/null +++ b/monitoring/stats_dump_scheduler.cc @@ -0,0 +1,103 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include "monitoring/stats_dump_scheduler.h" + +#include "db/db_impl/db_impl.h" +#include "util/cast_util.h" + +#ifndef ROCKSDB_LITE +namespace ROCKSDB_NAMESPACE { + +StatsDumpScheduler::StatsDumpScheduler(Env* env) { + timer = std::unique_ptr(new Timer(env)); +} + +void StatsDumpScheduler::Register(DBImpl* dbi, + unsigned int stats_dump_period_sec, + unsigned int stats_persist_period_sec) { + static std::atomic initial_delay(0); + if (stats_dump_period_sec > 0) { + timer->Start(); + timer->Add([dbi]() { dbi->DumpStats(); }, GetTaskName(dbi, "dump_st"), + initial_delay.fetch_add(1) % + static_cast(stats_dump_period_sec) * + kMicrosInSecond, + static_cast(stats_dump_period_sec) * kMicrosInSecond); + } + if (stats_persist_period_sec > 0) { + timer->Start(); + timer->Add( + [dbi]() { dbi->PersistStats(); }, GetTaskName(dbi, "pst_st"), + initial_delay.fetch_add(1) % + static_cast(stats_persist_period_sec) * kMicrosInSecond, + static_cast(stats_persist_period_sec) * kMicrosInSecond); + } +} + +void StatsDumpScheduler::Unregister(DBImpl* dbi) { + timer->Cancel(GetTaskName(dbi, "dump_st")); + timer->Cancel(GetTaskName(dbi, "pst_st")); + if (!timer->HasPendingTask()) { + timer->Shutdown(); + } +} + +StatsDumpScheduler* StatsDumpScheduler::Default() { + // Always use the default Env for the scheduler, as we only use the NowMicros + // which is the same for all env. + // The Env could only be overridden in test. + static StatsDumpScheduler scheduler(Env::Default()); + return &scheduler; +} + +std::string StatsDumpScheduler::GetTaskName(DBImpl* dbi, + const std::string& func_name) { + std::string db_session_id; + dbi->GetDbSessionId(db_session_id); + return db_session_id + ":" + func_name; +} + +#ifndef NDEBUG + +// Get the static scheduler. For a new env, it needs to re-create the internal +// timer, so only re-create it when there's no running task. Otherwise, return +// the existing scheduler. 
Which means if the unittest needs to update MockEnv, +// Close all db instances and then re-open them. +StatsDumpTestScheduler* StatsDumpTestScheduler::Default(Env* env) { + static StatsDumpTestScheduler scheduler(env); + static port::Mutex mutex; + { + MutexLock l(&mutex); + if (scheduler.timer.get() != nullptr && + scheduler.timer->TEST_GetPendingTaskNum() == 0) { + scheduler.timer->Shutdown(); + scheduler.timer.reset(new Timer(env)); + } + } + return &scheduler; +} + +void StatsDumpTestScheduler::TEST_WaitForRun( + std::function callback) const { + if (timer != nullptr) { + timer->TEST_WaitForRun(callback); + } +} + +size_t StatsDumpTestScheduler::TEST_GetValidTaskNum() const { + if (timer != nullptr) { + return timer->TEST_GetPendingTaskNum(); + } + return 0; +} + +StatsDumpTestScheduler::StatsDumpTestScheduler(Env* env) + : StatsDumpScheduler(env) {} + +#endif // !NDEBUG +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/monitoring/stats_dump_scheduler.h b/monitoring/stats_dump_scheduler.h new file mode 100644 index 000000000..1045991d4 --- /dev/null +++ b/monitoring/stats_dump_scheduler.h @@ -0,0 +1,64 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include "db/db_impl/db_impl.h" +#include "util/timer.h" + +namespace ROCKSDB_NAMESPACE { + +// StatsDumpScheduler is a singleton object, which is scheduling/running +// DumpStats() and PersistStats() for all DB instances. All DB instances uses +// the same object from `Default()`. +// Internally, it uses a single threaded timer object to run the stats dump +// functions. Timer thread won't be started if there's no function needs to run, +// for example, option.stats_dump_period_sec and option.stats_persist_period_sec +// are set to 0. 
+class StatsDumpScheduler { + public: + static StatsDumpScheduler* Default(); + + StatsDumpScheduler() = delete; + StatsDumpScheduler(const StatsDumpScheduler&) = delete; + StatsDumpScheduler(StatsDumpScheduler&&) = delete; + StatsDumpScheduler& operator=(const StatsDumpScheduler&) = delete; + StatsDumpScheduler& operator=(StatsDumpScheduler&&) = delete; + + void Register(DBImpl* dbi, unsigned int stats_dump_period_sec, + unsigned int stats_persist_period_sec); + + void Unregister(DBImpl* dbi); + + protected: + std::unique_ptr timer; + + explicit StatsDumpScheduler(Env* env); + + private: + std::string GetTaskName(DBImpl* dbi, const std::string& func_name); +}; + +#ifndef NDEBUG +// StatsDumpTestScheduler is for unittest, which can specify the Env like +// SafeMockTimeEnv. It also contains functions for unittest. +class StatsDumpTestScheduler : public StatsDumpScheduler { + public: + static StatsDumpTestScheduler* Default(Env* env); + + void TEST_WaitForRun(std::function callback) const; + + size_t TEST_GetValidTaskNum() const; + + private: + explicit StatsDumpTestScheduler(Env* env); +}; +#endif // !NDEBUG + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/monitoring/stats_dump_scheduler_test.cc b/monitoring/stats_dump_scheduler_test.cc new file mode 100644 index 000000000..ac81b0700 --- /dev/null +++ b/monitoring/stats_dump_scheduler_test.cc @@ -0,0 +1,225 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include "monitoring/stats_dump_scheduler.h" + +#include "db/db_test_util.h" + +namespace ROCKSDB_NAMESPACE { + +#ifndef ROCKSDB_LITE +class StatsDumpSchedulerTest : public DBTestBase { + public: + StatsDumpSchedulerTest() + : DBTestBase("/stats_dump_scheduler_test"), + mock_env_(new SafeMockTimeEnv(Env::Default())) {} + + protected: + std::unique_ptr mock_env_; + + void SetUp() override { + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::StartStatsDumpScheduler:Init", [&](void* arg) { + auto* stats_dump_scheduler_ptr = + reinterpret_cast(arg); + *stats_dump_scheduler_ptr = + StatsDumpTestScheduler::Default(mock_env_.get()); + }); + } +}; + +TEST_F(StatsDumpSchedulerTest, Basic) { + constexpr int kPeriodSec = 5; + Close(); + Options options; + options.stats_dump_period_sec = kPeriodSec; + options.stats_persist_period_sec = kPeriodSec; + options.create_if_missing = true; + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); + options.env = mock_env_.get(); + + int dump_st_counter = 0; + SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:StartRunning", + [&](void*) { dump_st_counter++; }); + + int pst_st_counter = 0; + SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning", + [&](void*) { pst_st_counter++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Reopen(options); + + ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec); + ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec); + + mock_time_sec += kPeriodSec - 1; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + + auto scheduler = dbfull()->TEST_GetStatsDumpScheduler(); + ASSERT_NE(nullptr, scheduler); + ASSERT_EQ(2, scheduler->TEST_GetValidTaskNum()); + + ASSERT_EQ(1, dump_st_counter); + ASSERT_EQ(1, pst_st_counter); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + + ASSERT_EQ(2, dump_st_counter); + ASSERT_EQ(2, 
pst_st_counter); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + + ASSERT_EQ(3, dump_st_counter); + ASSERT_EQ(3, pst_st_counter); + + // Disable scheduler with SetOption + ASSERT_OK(dbfull()->SetDBOptions( + {{"stats_dump_period_sec", "0"}, {"stats_persist_period_sec", "0"}})); + ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_dump_period_sec); + ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec); + + scheduler = dbfull()->TEST_GetStatsDumpScheduler(); + ASSERT_EQ(0u, scheduler->TEST_GetValidTaskNum()); + + // Re-enable one task + ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "5"}})); + ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec); + ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec); + + scheduler = dbfull()->TEST_GetStatsDumpScheduler(); + ASSERT_NE(nullptr, scheduler); + ASSERT_EQ(1, scheduler->TEST_GetValidTaskNum()); + + dump_st_counter = 0; + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(1, dump_st_counter); + + Close(); +} + +TEST_F(StatsDumpSchedulerTest, MultiInstances) { + constexpr int kPeriodSec = 5; + const int kInstanceNum = 10; + + Close(); + Options options; + options.stats_dump_period_sec = kPeriodSec; + options.stats_persist_period_sec = kPeriodSec; + options.create_if_missing = true; + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); + options.env = mock_env_.get(); + + int dump_st_counter = 0; + SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:2", + [&](void*) { dump_st_counter++; }); + + int pst_st_counter = 0; + SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning", + [&](void*) { pst_st_counter++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + auto dbs = std::vector(kInstanceNum); + for (int i = 0; i < kInstanceNum; i++) { + ASSERT_OK( + DB::Open(options, 
test::PerThreadDBPath(std::to_string(i)), &(dbs[i]))); + } + + auto dbi = static_cast_with_check(dbs[kInstanceNum - 1]); + auto scheduler = dbi->TEST_GetStatsDumpScheduler(); + ASSERT_EQ(kInstanceNum * 2, scheduler->TEST_GetValidTaskNum()); + + int expected_run = kInstanceNum; + mock_time_sec += kPeriodSec - 1; + dbi->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(expected_run, dump_st_counter); + ASSERT_EQ(expected_run, pst_st_counter); + + expected_run += kInstanceNum; + mock_time_sec += kPeriodSec; + dbi->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(expected_run, dump_st_counter); + ASSERT_EQ(expected_run, pst_st_counter); + + expected_run += kInstanceNum; + mock_time_sec += kPeriodSec; + dbi->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(expected_run, dump_st_counter); + ASSERT_EQ(expected_run, pst_st_counter); + + int half = kInstanceNum / 2; + for (int i = 0; i < half; i++) { + delete dbs[i]; + } + + expected_run += (kInstanceNum - half) * 2; + + mock_time_sec += kPeriodSec; + dbi->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + mock_time_sec += kPeriodSec; + dbi->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(expected_run, dump_st_counter); + ASSERT_EQ(expected_run, pst_st_counter); + + for (int i = half; i < kInstanceNum; i++) { + dbs[i]->Close(); + delete dbs[i]; + } +} + +TEST_F(StatsDumpSchedulerTest, MultiEnv) { + constexpr int kDumpPeriodSec = 5; + constexpr int kPersistPeriodSec = 10; + Close(); + Options options1; + options1.stats_dump_period_sec = kDumpPeriodSec; + options1.stats_persist_period_sec = kPersistPeriodSec; + options1.create_if_missing = true; + mock_env_->set_current_time(0); + options1.env = mock_env_.get(); + + Reopen(options1); + + std::unique_ptr mock_env2(new MockTimeEnv(Env::Default())); + Options options2; 
+ options2.stats_dump_period_sec = kDumpPeriodSec; + options2.stats_persist_period_sec = kPersistPeriodSec; + options2.create_if_missing = true; + mock_env2->set_current_time(0); + options1.env = mock_env2.get(); + + std::string dbname = test::PerThreadDBPath("multi_env_test"); + DB* db; + ASSERT_OK(DB::Open(options2, dbname, &db)); + DBImpl* dbi = static_cast_with_check(db); + + ASSERT_EQ(dbi->TEST_GetStatsDumpScheduler(), + dbfull()->TEST_GetStatsDumpScheduler()); + + db->Close(); + delete db; + Close(); +} +#endif // !ROCKSDB_LITE +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc index dec18c203..13edd2ac8 100644 --- a/monitoring/stats_history_test.cc +++ b/monitoring/stats_history_test.cc @@ -6,6 +6,8 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#include "rocksdb/stats_history.h" + #include #include #include @@ -14,69 +16,69 @@ #include "db/db_impl/db_impl.h" #include "db/db_test_util.h" #include "monitoring/persistent_stats_history.h" +#include "monitoring/stats_dump_scheduler.h" #include "options/options_helper.h" #include "port/stack_trace.h" #include "rocksdb/cache.h" #include "rocksdb/convenience.h" #include "rocksdb/rate_limiter.h" -#include "rocksdb/stats_history.h" #include "test_util/sync_point.h" #include "test_util/testutil.h" #include "util/random.h" namespace ROCKSDB_NAMESPACE { +#ifndef ROCKSDB_LITE class StatsHistoryTest : public DBTestBase { public: - StatsHistoryTest() : DBTestBase("/stats_history_test") {} -}; + StatsHistoryTest() + : DBTestBase("/stats_history_test"), + mock_env_(new SafeMockTimeEnv(Env::Default())) {} -class SafeMockTimeEnv : public MockTimeEnv { - public: - explicit SafeMockTimeEnv(Env* base) : MockTimeEnv(base) { - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); -#if defined(OS_MACOSX) && !defined(NDEBUG) - // This is an alternate way (vs. SpecialEnv) of dealing with the fact - // that on some platforms, pthread_cond_timedwait does not appear to - // release the lock for other threads to operate if the deadline time - // is already passed. (TimedWait calls are currently a bad abstraction - // because the deadline parameter is usually computed from Env time, - // but is interpreted in real clock time.) 
+ protected: + std::unique_ptr mock_env_; + + void SetUp() override { SyncPoint::GetInstance()->SetCallBack( - "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { - uint64_t time_us = *reinterpret_cast(arg); - if (time_us < this->RealNowMicros()) { - *reinterpret_cast(arg) = this->RealNowMicros() + 1000; - } + "DBImpl::StartStatsDumpScheduler:Init", [&](void* arg) { + auto* stats_dump_scheduler_ptr = + reinterpret_cast(arg); + *stats_dump_scheduler_ptr = + StatsDumpTestScheduler::Default(mock_env_.get()); }); -#endif // OS_MACOSX && !NDEBUG - SyncPoint::GetInstance()->EnableProcessing(); } }; -#ifndef ROCKSDB_LITE - TEST_F(StatsHistoryTest, RunStatsDumpPeriodSec) { + constexpr int kPeriodSec = 5; Options options; options.create_if_missing = true; - options.stats_dump_period_sec = 5; - std::unique_ptr mock_env(new SafeMockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); + options.stats_dump_period_sec = kPeriodSec; + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); + options.env = mock_env_.get(); int counter = 0; SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:1", [&](void* /*arg*/) { counter++; }); Reopen(options); ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec); - dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(5); }); + + // Wait for the first stats persist to finish, as the initial delay could be + // different. 
+ mock_time_sec += kPeriodSec - 1; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); ASSERT_GE(counter, 1); - // Test cacel job through SetOptions + // Test cancel job through SetOptions ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "0"}})); int old_val = counter; - for (int i = 6; i < 20; ++i) { - dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(i); }); + for (int i = 1; i < 20; ++i) { + mock_env_->set_current_time(i + mock_time_sec); } ASSERT_EQ(counter, old_val); Close(); @@ -84,24 +86,38 @@ TEST_F(StatsHistoryTest, RunStatsDumpPeriodSec) { // Test persistent stats background thread scheduling and cancelling TEST_F(StatsHistoryTest, StatsPersistScheduling) { + constexpr int kPeriodSec = 5; Options options; options.create_if_missing = true; - options.stats_persist_period_sec = 5; - std::unique_ptr mock_env(new SafeMockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); + options.stats_persist_period_sec = kPeriodSec; + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); + options.env = mock_env_.get(); int counter = 0; SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); Reopen(options); ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec); - dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + + // Wait for the first stats persist to finish, as the initial delay could be + // different. 
+ mock_time_sec += kPeriodSec - 1; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); ASSERT_GE(counter, 1); // Test cacel job through SetOptions - ASSERT_TRUE(dbfull()->TEST_IsPersistentStatsEnabled()); ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); - ASSERT_FALSE(dbfull()->TEST_IsPersistentStatsEnabled()); + int old_val = counter; + mock_time_sec += kPeriodSec * 2; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(counter, old_val); + Close(); } @@ -110,54 +126,62 @@ TEST_F(StatsHistoryTest, PersistentStatsFreshInstall) { Options options; options.create_if_missing = true; options.stats_persist_period_sec = 0; - std::unique_ptr mock_env(new SafeMockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); + mock_env_->set_current_time(0); // in seconds + options.env = mock_env_.get(); int counter = 0; SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); Reopen(options); ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "5"}})); ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec); - dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + + dbfull()->TEST_WaitForStatsDumpRun([&] { mock_env_->set_current_time(5); }); ASSERT_GE(counter, 1); Close(); } // TODO(Zhongyi): Move persistent stats related tests to a separate file TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) { + constexpr int kPeriodSec = 5; Options options; options.create_if_missing = true; - options.stats_persist_period_sec = 5; + options.stats_persist_period_sec = kPeriodSec; options.statistics = CreateDBStatistics(); - std::unique_ptr mock_env(new SafeMockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds 
- options.env = mock_env.get(); + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); + options.env = mock_env_.get(); CreateColumnFamilies({"pikachu"}, options); ASSERT_OK(Put("foo", "bar")); ReopenWithColumnFamilies({"default", "pikachu"}, options); - int mock_time = 1; + // Wait for the first stats persist to finish + mock_time_sec += kPeriodSec - 1; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + // Wait for stats persist to finish - dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + std::unique_ptr stats_iter; - db_->GetStatsHistory(0 /*start_time*/, 6 /*end_time*/, &stats_iter); + db_->GetStatsHistory(0, mock_time_sec + 1, &stats_iter); ASSERT_TRUE(stats_iter != nullptr); // disabled stats snapshots ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); size_t stats_count = 0; for (; stats_iter->Valid(); stats_iter->Next()) { auto stats_map = stats_iter->GetStatsMap(); - ASSERT_EQ(stats_iter->GetStatsTime(), 5); + ASSERT_EQ(stats_iter->GetStatsTime(), mock_time_sec); stats_count += stats_map.size(); } ASSERT_GT(stats_count, 0); // Wait a bit and verify no more stats are found - for (mock_time = 6; mock_time < 20; ++mock_time) { - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(mock_time); }); + for (; mock_time_sec < 30; ++mock_time_sec) { + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); } - db_->GetStatsHistory(0 /*start_time*/, 20 /*end_time*/, &stats_iter); + db_->GetStatsHistory(0, mock_time_sec, &stats_iter); ASSERT_TRUE(stats_iter != nullptr); size_t stats_count_new = 0; for (; stats_iter->Valid(); stats_iter->Next()) { @@ -168,13 +192,14 @@ TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) { } TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { 
+ constexpr int kPeriodSec = 1; Options options; options.create_if_missing = true; options.statistics = CreateDBStatistics(); - options.stats_persist_period_sec = 1; - std::unique_ptr mock_env(new SafeMockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); + options.stats_persist_period_sec = kPeriodSec; + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); + options.env = mock_env_.get(); CreateColumnFamilies({"pikachu"}, options); ASSERT_OK(Put("foo", "bar")); @@ -195,11 +220,10 @@ TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { ASSERT_OK(Flush()); ASSERT_OK(Delete("sol")); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); - int mock_time = 1; // Wait for stats persist to finish - for (; mock_time < 5; ++mock_time) { - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(mock_time); }); + for (mock_time_sec = 1; mock_time_sec < kPeriodSec; mock_time_sec++) { + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); } // second round of ops @@ -213,12 +237,14 @@ TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { delete iterator; ASSERT_OK(Flush()); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); - for (; mock_time < 10; ++mock_time) { - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(mock_time); }); + + for (; mock_time_sec < 10; mock_time_sec++) { + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); } + std::unique_ptr stats_iter; - db_->GetStatsHistory(0 /*start_time*/, 10 /*end_time*/, &stats_iter); + db_->GetStatsHistory(0, 10, &stats_iter); ASSERT_TRUE(stats_iter != nullptr); size_t stats_count = 0; int slice_count = 0; @@ -233,12 +259,14 @@ TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { // capping memory cost at 13000 bytes since one slice is around 10000~13000 ASSERT_OK(dbfull()->SetDBOptions({{"stats_history_buffer_size", 
"13000"}})); ASSERT_EQ(13000, dbfull()->GetDBOptions().stats_history_buffer_size); + // Wait for stats persist to finish - for (; mock_time < 20; ++mock_time) { - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(mock_time); }); + for (; mock_time_sec < 20; mock_time_sec++) { + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); } - db_->GetStatsHistory(0 /*start_time*/, 20 /*end_time*/, &stats_iter); + + db_->GetStatsHistory(0, 20, &stats_iter); ASSERT_TRUE(stats_iter != nullptr); size_t stats_count_reopen = 0; slice_count = 0; @@ -268,33 +296,47 @@ int countkeys(Iterator* iter) { } TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { + constexpr int kPeriodSec = 5; Options options; options.create_if_missing = true; - options.stats_persist_period_sec = 5; + options.stats_persist_period_sec = kPeriodSec; options.statistics = CreateDBStatistics(); options.persist_stats_to_disk = true; - std::unique_ptr mock_env(new SafeMockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); + options.env = mock_env_.get(); CreateColumnFamilies({"pikachu"}, options); ASSERT_OK(Put("foo", "bar")); ReopenWithColumnFamilies({"default", "pikachu"}, options); ASSERT_EQ(Get("foo"), "bar"); + // Wait for the first stats persist to finish, as the initial delay could be + // different. 
+ mock_time_sec += kPeriodSec - 1; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + // Wait for stats persist to finish - dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + auto iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); int key_count1 = countkeys(iter); delete iter; - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(10); }); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); int key_count2 = countkeys(iter); delete iter; - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(15); }); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); int key_count3 = countkeys(iter); @@ -303,15 +345,15 @@ TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { ASSERT_GE(key_count3, key_count2); ASSERT_EQ(key_count3 - key_count2, key_count2 - key_count1); std::unique_ptr stats_iter; - db_->GetStatsHistory(0 /*start_time*/, 16 /*end_time*/, &stats_iter); + db_->GetStatsHistory(0, mock_time_sec + 1, &stats_iter); ASSERT_TRUE(stats_iter != nullptr); size_t stats_count = 0; int slice_count = 0; int non_zero_count = 0; - for (int i = 1; stats_iter->Valid(); stats_iter->Next(), i++) { + for (int i = 2; stats_iter->Valid(); stats_iter->Next(), i++) { slice_count++; auto stats_map = stats_iter->GetStatsMap(); - ASSERT_EQ(stats_iter->GetStatsTime(), 5 * i); + ASSERT_EQ(stats_iter->GetStatsTime(), kPeriodSec * i - 1); for (auto& stat : stats_map) { if (stat.second != 0) { non_zero_count++; @@ -324,7 +366,7 
@@ TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { ASSERT_EQ(stats_count, key_count3 - 2); // verify reopen will not cause data loss ReopenWithColumnFamilies({"default", "pikachu"}, options); - db_->GetStatsHistory(0 /*start_time*/, 16 /*end_time*/, &stats_iter); + db_->GetStatsHistory(0, mock_time_sec + 1, &stats_iter); ASSERT_TRUE(stats_iter != nullptr); size_t stats_count_reopen = 0; int slice_count_reopen = 0; @@ -339,6 +381,7 @@ TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { } stats_count_reopen += stats_map.size(); } + ASSERT_EQ(non_zero_count, non_zero_count_recover); ASSERT_EQ(slice_count, slice_count_reopen); ASSERT_EQ(stats_count, stats_count_reopen); @@ -348,52 +391,68 @@ TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { // Test persisted stats matches the value found in options.statistics and // the stats value retains after DB reopen TEST_F(StatsHistoryTest, PersitentStatsVerifyValue) { + constexpr int kPeriodSec = 5; Options options; options.create_if_missing = true; - options.stats_persist_period_sec = 5; + options.stats_persist_period_sec = kPeriodSec; options.statistics = CreateDBStatistics(); options.persist_stats_to_disk = true; - std::unique_ptr mock_env(new SafeMockTimeEnv(env_)); + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); std::map stats_map_before; ASSERT_TRUE(options.statistics->getTickerMap(&stats_map_before)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); + mock_env_->set_current_time(mock_time_sec); + options.env = mock_env_.get(); CreateColumnFamilies({"pikachu"}, options); ASSERT_OK(Put("foo", "bar")); ReopenWithColumnFamilies({"default", "pikachu"}, options); ASSERT_EQ(Get("foo"), "bar"); + // Wait for the first stats persist to finish, as the initial delay could be + // different. 
+ mock_time_sec += kPeriodSec - 1; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + // Wait for stats persist to finish - dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); auto iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); countkeys(iter); delete iter; - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(10); }); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); countkeys(iter); delete iter; - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(15); }); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); countkeys(iter); delete iter; - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(20); }); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); std::map stats_map_after; ASSERT_TRUE(options.statistics->getTickerMap(&stats_map_after)); std::unique_ptr stats_iter; - db_->GetStatsHistory(0 /*start_time*/, 21 /*end_time*/, &stats_iter); + db_->GetStatsHistory(0, mock_time_sec + 1, &stats_iter); ASSERT_TRUE(stats_iter != nullptr); std::string sample = "rocksdb.num.iterator.deleted"; uint64_t recovered_value = 0; - for (int i = 1; stats_iter->Valid(); stats_iter->Next(), ++i) { + for (int i = 2; stats_iter->Valid(); stats_iter->Next(), ++i) { auto stats_map = stats_iter->GetStatsMap(); - ASSERT_EQ(stats_iter->GetStatsTime(), 5 * i); + ASSERT_EQ(stats_iter->GetStatsTime(), kPeriodSec * i - 
1); for (const auto& stat : stats_map) { if (sample.compare(stat.first) == 0) { recovered_value += stat.second; @@ -404,12 +463,12 @@ TEST_F(StatsHistoryTest, PersitentStatsVerifyValue) { // test stats value retains after recovery ReopenWithColumnFamilies({"default", "pikachu"}, options); - db_->GetStatsHistory(0 /*start_time*/, 21 /*end_time*/, &stats_iter); + db_->GetStatsHistory(0, mock_time_sec + 1, &stats_iter); ASSERT_TRUE(stats_iter != nullptr); uint64_t new_recovered_value = 0; - for (int i = 1; stats_iter->Valid(); stats_iter->Next(), i++) { + for (int i = 2; stats_iter->Valid(); stats_iter->Next(), i++) { auto stats_map = stats_iter->GetStatsMap(); - ASSERT_EQ(stats_iter->GetStatsTime(), 5 * i); + ASSERT_EQ(stats_iter->GetStatsTime(), kPeriodSec * i - 1); for (const auto& stat : stats_map) { if (sample.compare(stat.first) == 0) { new_recovered_value += stat.second; @@ -426,14 +485,15 @@ TEST_F(StatsHistoryTest, PersitentStatsVerifyValue) { // TODO(Zhongyi): add test for different format versions TEST_F(StatsHistoryTest, PersistentStatsCreateColumnFamilies) { + constexpr int kPeriodSec = 5; Options options; options.create_if_missing = true; - options.stats_persist_period_sec = 5; + options.stats_persist_period_sec = kPeriodSec; options.statistics = CreateDBStatistics(); options.persist_stats_to_disk = true; - std::unique_ptr mock_env(new SafeMockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); + options.env = mock_env_.get(); ASSERT_OK(TryReopen(options)); CreateColumnFamilies({"one", "two", "three"}, options); ASSERT_OK(Put(1, "foo", "bar")); @@ -442,7 +502,15 @@ TEST_F(StatsHistoryTest, PersistentStatsCreateColumnFamilies) { CreateColumnFamilies({"four"}, options); ReopenWithColumnFamilies({"default", "one", "two", "three", "four"}, options); ASSERT_EQ(Get(2, "foo"), "bar"); - dbfull()->TEST_WaitForPersistStatsRun([&] { 
mock_env->set_current_time(5); }); + + // Wait for the first stats persist to finish + mock_time_sec += kPeriodSec - 1; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); auto iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); int key_count = countkeys(iter); @@ -451,7 +519,7 @@ TEST_F(StatsHistoryTest, PersistentStatsCreateColumnFamilies) { uint64_t num_write_wal = 0; std::string sample = "rocksdb.write.wal"; std::unique_ptr stats_iter; - db_->GetStatsHistory(0 /*start_time*/, 5 /*end_time*/, &stats_iter); + db_->GetStatsHistory(0, mock_time_sec, &stats_iter); ASSERT_TRUE(stats_iter != nullptr); for (; stats_iter->Valid(); stats_iter->Next()) { auto stats_map = stats_iter->GetStatsMap(); @@ -487,7 +555,7 @@ TEST_F(StatsHistoryTest, PersistentStatsCreateColumnFamilies) { ASSERT_NOK(db_->CreateColumnFamily(cf_opts, kPersistentStatsColumnFamilyName, &handle)); // verify stats is not affected by prior failed CF creation - db_->GetStatsHistory(0 /*start_time*/, 5 /*end_time*/, &stats_iter); + db_->GetStatsHistory(0, mock_time_sec, &stats_iter); ASSERT_TRUE(stats_iter != nullptr); num_write_wal = 0; for (; stats_iter->Valid(); stats_iter->Next()) { @@ -525,17 +593,25 @@ TEST_F(StatsHistoryTest, PersistentStatsReadOnly) { } TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { + constexpr int kPeriodSec = 5; Options options; options.create_if_missing = true; options.write_buffer_size = 1024 * 1024 * 10; // 10 Mb - options.stats_persist_period_sec = 5; + options.stats_persist_period_sec = kPeriodSec; options.statistics = CreateDBStatistics(); options.persist_stats_to_disk = true; - std::unique_ptr mock_env(new SafeMockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds - options.env = mock_env.get(); + int mock_time_sec = 0; + 
mock_env_->set_current_time(mock_time_sec); + options.env = mock_env_.get(); CreateColumnFamilies({"pikachu"}, options); ReopenWithColumnFamilies({"default", "pikachu"}, options); + + // Wait for the first stats persist to finish, as the initial delay could be + // different. + mock_time_sec += kPeriodSec - 1; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); + ColumnFamilyData* cfd_default = static_cast(dbfull()->DefaultColumnFamily()) ->cfd(); @@ -551,7 +627,10 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { ASSERT_EQ("v0", Get("foo")); ASSERT_OK(Put(1, "Eevee", "v0")); ASSERT_EQ("v0", Get(1, "Eevee")); - dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); // writing to all three cf, flush default cf // LogNumbers: default: 14, stats: 4, pikachu: 4 ASSERT_OK(Flush()); @@ -574,8 +653,10 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { ASSERT_OK(Put("bar2", "v2")); ASSERT_EQ("v2", Get("bar2")); ASSERT_EQ("v2", Get("foo2")); - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(10); }); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); // writing to default and stats cf, flushing default cf // LogNumbers: default: 19, stats: 19, pikachu: 19 ASSERT_OK(Flush()); @@ -588,8 +669,10 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { ASSERT_EQ("v3", Get("foo3")); ASSERT_OK(Put(1, "Jolteon", "v3")); ASSERT_EQ("v3", Get(1, "Jolteon")); - dbfull()->TEST_WaitForPersistStatsRun( - [&] { mock_env->set_current_time(15); }); + + mock_time_sec += kPeriodSec; + dbfull()->TEST_WaitForStatsDumpRun( + [&] { mock_env_->set_current_time(mock_time_sec); }); // writing to all three cf, flushing test cf // LogNumbers: default: 19, stats: 19, pikachu: 22 ASSERT_OK(Flush(1)); diff 
--git a/src.mk b/src.mk index a684311be..c7bd65fcc 100644 --- a/src.mk +++ b/src.mk @@ -115,6 +115,7 @@ LIB_SOURCES = \ monitoring/perf_level.cc \ monitoring/persistent_stats_history.cc \ monitoring/statistics.cc \ + monitoring/stats_dump_scheduler.cc \ monitoring/thread_status_impl.cc \ monitoring/thread_status_updater.cc \ monitoring/thread_status_updater_debug.cc \ @@ -433,6 +434,7 @@ TEST_MAIN_SOURCES = \ monitoring/histogram_test.cc \ monitoring/iostats_context_test.cc \ monitoring/statistics_test.cc \ + monitoring/stats_dump_scheduler_test.cc \ monitoring/stats_history_test.cc \ options/options_settable_test.cc \ options/options_test.cc \ diff --git a/util/timer.h b/util/timer.h index 7a38c3260..de9e9d1ab 100644 --- a/util/timer.h +++ b/util/timer.h @@ -42,23 +42,31 @@ class Timer { running_(false), executing_task_(false) {} - ~Timer() {} - + // Add a new function. If the fn_name already exists, override it, + // regardless of whether the function is pending removal (invalid) or not. // repeat_every_us == 0 means do not repeat void Add(std::function fn, const std::string& fn_name, uint64_t start_after_us, uint64_t repeat_every_us) { - std::unique_ptr fn_info(new FunctionInfo( - std::move(fn), - fn_name, - env_->NowMicros() + start_after_us, - repeat_every_us)); - - InstrumentedMutexLock l(&mutex_); - heap_.push(fn_info.get()); - map_.emplace(std::make_pair(fn_name, std::move(fn_info))); - cond_var_.Signal(); + std::unique_ptr fn_info( + new FunctionInfo(std::move(fn), fn_name, + env_->NowMicros() + start_after_us, repeat_every_us)); + { + InstrumentedMutexLock l(&mutex_); + auto it = map_.find(fn_name); + if (it == map_.end()) { + heap_.push(fn_info.get()); + map_.emplace(std::make_pair(fn_name, std::move(fn_info))); + } else { + // If it already exists, override it. 
+ it->second->fn = std::move(fn_info->fn); + it->second->valid = true; + it->second->next_run_time_us = env_->NowMicros() + start_after_us; + it->second->repeat_every_us = repeat_every_us; + } + } + cond_var_.SignalAll(); } void Cancel(const std::string& fn_name) { @@ -119,6 +127,45 @@ class Timer { return true; } + bool HasPendingTask() const { + InstrumentedMutexLock l(&mutex_); + for (auto it = map_.begin(); it != map_.end(); it++) { + if (it->second->IsValid()) { + return true; + } + } + return false; + } + +#ifndef NDEBUG + void TEST_WaitForRun(std::function callback = nullptr) { + InstrumentedMutexLock l(&mutex_); + while (!heap_.empty() && + heap_.top()->next_run_time_us <= env_->NowMicros()) { + cond_var_.TimedWait(env_->NowMicros() + 1000); + } + if (callback != nullptr) { + callback(); + } + cond_var_.SignalAll(); + do { + cond_var_.TimedWait(env_->NowMicros() + 1000); + } while (!heap_.empty() && + heap_.top()->next_run_time_us <= env_->NowMicros()); + } + + size_t TEST_GetPendingTaskNum() const { + InstrumentedMutexLock l(&mutex_); + size_t ret = 0; + for (auto it = map_.begin(); it != map_.end(); it++) { + if (it->second->IsValid()) { + ret++; + } + } + return ret; + } +#endif // NDEBUG + private: void Run() { @@ -142,10 +189,13 @@ class Timer { } if (current_fn->next_run_time_us <= env_->NowMicros()) { + // make a copy of the function so it won't be changed after + // mutex_.unlock. + std::function fn = current_fn->fn; executing_task_ = true; mutex_.Unlock(); // Execute the work - current_fn->fn(); + fn(); mutex_.Lock(); executing_task_ = false; cond_var_.SignalAll(); @@ -243,7 +293,7 @@ class Timer { Env* const env_; // This mutex controls both the heap_ and the map_. It needs to be held for // making any changes in them. 
- InstrumentedMutex mutex_; + mutable InstrumentedMutex mutex_; InstrumentedCondVar cond_var_; std::unique_ptr thread_; bool running_; diff --git a/util/timer_test.cc b/util/timer_test.cc index 18b0b90c9..7e02516c6 100644 --- a/util/timer_test.cc +++ b/util/timer_test.cc @@ -356,6 +356,39 @@ TEST_F(TimerTest, ShutdownRunningTask) { delete value; } +TEST_F(TimerTest, AddSameFuncNameTest) { + mock_env_->set_current_time(0); + Timer timer(mock_env_.get()); + + ASSERT_TRUE(timer.Start()); + + int func_counter1 = 0; + timer.Add([&] { func_counter1++; }, "duplicated_func", 1 * kSecond, + 5 * kSecond); + + int func2_counter = 0; + timer.Add([&] { func2_counter++; }, "func2", 1 * kSecond, 4 * kSecond); + + // New function with the same name should override the existing one + int func_counter2 = 0; + timer.Add([&] { func_counter2++; }, "duplicated_func", 1 * kSecond, + 5 * kSecond); + + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(1); }); + + ASSERT_EQ(func_counter1, 0); + ASSERT_EQ(func2_counter, 1); + ASSERT_EQ(func_counter2, 1); + + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(6); }); + + ASSERT_EQ(func_counter1, 0); + ASSERT_EQ(func2_counter, 2); + ASSERT_EQ(func_counter2, 2); + + ASSERT_TRUE(timer.Shutdown()); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {