Periodically flush info log out of application buffer (#7488)

Summary:
This PR schedules a background thread (shared across all DB instances)
to flush info log every ten seconds. This improves debuggability in case
of RocksDB hanging since it ensures the log messages leading up to the hang
will eventually become visible in the log.

The bulk of this PR is moving monitoring/stats_dump_scheduler* to db/periodic_work_scheduler*
and making the corresponding name changes since now the scheduler handles info
log flushing, not just stats dumping.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7488

Reviewed By: riversand963

Differential Revision: D24065165

Pulled By: ajkr

fbshipit-source-id: 339c47a0ff43b79fdbd055fbd9fefbb6f9d8d3b5
main
Andrew Kryczka 4 years ago committed by Facebook GitHub Bot
parent 94fc676d3f
commit 1e00909730
  1. 4
      CMakeLists.txt
  2. 2
      Makefile
  3. 18
      TARGETS
  4. 2
      db/compacted_db_impl.cc
  5. 41
      db/db_impl/db_impl.cc
  6. 20
      db/db_impl/db_impl.h
  7. 10
      db/db_impl/db_impl_debug.cc
  8. 4
      db/db_impl/db_impl_open.cc
  9. 35
      db/periodic_work_scheduler.cc
  10. 70
      db/periodic_work_scheduler.h
  11. 83
      db/periodic_work_scheduler_test.cc
  12. 64
      monitoring/stats_dump_scheduler.h
  13. 12
      monitoring/stats_history_test.cc
  14. 4
      src.mk

@ -621,6 +621,7 @@ set(SOURCES
db/merge_helper.cc db/merge_helper.cc
db/merge_operator.cc db/merge_operator.cc
db/output_validator.cc db/output_validator.cc
db/periodic_work_scheduler.cc
db/range_del_aggregator.cc db/range_del_aggregator.cc
db/range_tombstone_fragmenter.cc db/range_tombstone_fragmenter.cc
db/repair.cc db/repair.cc
@ -678,7 +679,6 @@ set(SOURCES
monitoring/perf_level.cc monitoring/perf_level.cc
monitoring/persistent_stats_history.cc monitoring/persistent_stats_history.cc
monitoring/statistics.cc monitoring/statistics.cc
monitoring/stats_dump_scheduler.cc
monitoring/thread_status_impl.cc monitoring/thread_status_impl.cc
monitoring/thread_status_updater.cc monitoring/thread_status_updater.cc
monitoring/thread_status_util.cc monitoring/thread_status_util.cc
@ -1103,6 +1103,7 @@ if(WITH_TESTS)
db/merge_test.cc db/merge_test.cc
db/options_file_test.cc db/options_file_test.cc
db/perf_context_test.cc db/perf_context_test.cc
db/periodic_work_scheduler_test.cc
db/plain_table_db_test.cc db/plain_table_db_test.cc
db/prefix_test.cc db/prefix_test.cc
db/range_del_aggregator_test.cc db/range_del_aggregator_test.cc
@ -1134,7 +1135,6 @@ if(WITH_TESTS)
monitoring/histogram_test.cc monitoring/histogram_test.cc
monitoring/iostats_context_test.cc monitoring/iostats_context_test.cc
monitoring/statistics_test.cc monitoring/statistics_test.cc
monitoring/stats_dump_scheduler_test.cc
monitoring/stats_history_test.cc monitoring/stats_history_test.cc
options/configurable_test.cc options/configurable_test.cc
options/options_settable_test.cc options/options_settable_test.cc

@ -1904,7 +1904,7 @@ blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRA
timer_test: $(OBJ_DIR)/util/timer_test.o $(TEST_LIBRARY) $(LIBRARY) timer_test: $(OBJ_DIR)/util/timer_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK) $(AM_LINK)
stats_dump_scheduler_test: $(OBJ_DIR)/monitoring/stats_dump_scheduler_test.o $(TEST_LIBRARY) $(LIBRARY) periodic_work_scheduler_test: $(OBJ_DIR)/db/periodic_work_scheduler_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK) $(AM_LINK)
testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY) testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY)

@ -185,6 +185,7 @@ cpp_library(
"db/merge_helper.cc", "db/merge_helper.cc",
"db/merge_operator.cc", "db/merge_operator.cc",
"db/output_validator.cc", "db/output_validator.cc",
"db/periodic_work_scheduler.cc",
"db/range_del_aggregator.cc", "db/range_del_aggregator.cc",
"db/range_tombstone_fragmenter.cc", "db/range_tombstone_fragmenter.cc",
"db/repair.cc", "db/repair.cc",
@ -245,7 +246,6 @@ cpp_library(
"monitoring/perf_level.cc", "monitoring/perf_level.cc",
"monitoring/persistent_stats_history.cc", "monitoring/persistent_stats_history.cc",
"monitoring/statistics.cc", "monitoring/statistics.cc",
"monitoring/stats_dump_scheduler.cc",
"monitoring/thread_status_impl.cc", "monitoring/thread_status_impl.cc",
"monitoring/thread_status_updater.cc", "monitoring/thread_status_updater.cc",
"monitoring/thread_status_updater_debug.cc", "monitoring/thread_status_updater_debug.cc",
@ -472,6 +472,7 @@ cpp_library(
"db/merge_helper.cc", "db/merge_helper.cc",
"db/merge_operator.cc", "db/merge_operator.cc",
"db/output_validator.cc", "db/output_validator.cc",
"db/periodic_work_scheduler.cc",
"db/range_del_aggregator.cc", "db/range_del_aggregator.cc",
"db/range_tombstone_fragmenter.cc", "db/range_tombstone_fragmenter.cc",
"db/repair.cc", "db/repair.cc",
@ -532,7 +533,6 @@ cpp_library(
"monitoring/perf_level.cc", "monitoring/perf_level.cc",
"monitoring/persistent_stats_history.cc", "monitoring/persistent_stats_history.cc",
"monitoring/statistics.cc", "monitoring/statistics.cc",
"monitoring/stats_dump_scheduler.cc",
"monitoring/thread_status_impl.cc", "monitoring/thread_status_impl.cc",
"monitoring/thread_status_updater.cc", "monitoring/thread_status_updater.cc",
"monitoring/thread_status_updater_debug.cc", "monitoring/thread_status_updater_debug.cc",
@ -1681,6 +1681,13 @@ ROCKS_TESTS = [
[], [],
[], [],
], ],
[
"periodic_work_scheduler_test",
"db/periodic_work_scheduler_test.cc",
"serial",
[],
[],
],
[ [
"persistent_cache_test", "persistent_cache_test",
"utilities/persistent_cache/persistent_cache_test.cc", "utilities/persistent_cache/persistent_cache_test.cc",
@ -1814,13 +1821,6 @@ ROCKS_TESTS = [
[], [],
[], [],
], ],
[
"stats_dump_scheduler_test",
"monitoring/stats_dump_scheduler_test.cc",
"serial",
[],
[],
],
[ [
"stats_history_test", "stats_history_test",
"monitoring/stats_history_test.cc", "monitoring/stats_history_test.cc",

@ -156,7 +156,7 @@ Status CompactedDBImpl::Open(const Options& options,
std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname)); std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
Status s = db->Init(options); Status s = db->Init(options);
if (s.ok()) { if (s.ok()) {
db->StartStatsDumpScheduler(); db->StartPeriodicWorkScheduler();
ROCKS_LOG_INFO(db->immutable_db_options_.info_log, ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
"Opened the db as fully compacted mode"); "Opened the db as fully compacted mode");
LogFlush(db->immutable_db_options_.info_log); LogFlush(db->immutable_db_options_.info_log);

@ -44,6 +44,7 @@
#include "db/memtable_list.h" #include "db/memtable_list.h"
#include "db/merge_context.h" #include "db/merge_context.h"
#include "db/merge_helper.h" #include "db/merge_helper.h"
#include "db/periodic_work_scheduler.h"
#include "db/range_tombstone_fragmenter.h" #include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/table_properties_collector.h" #include "db/table_properties_collector.h"
@ -65,7 +66,6 @@
#include "monitoring/iostats_context_imp.h" #include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h" #include "monitoring/persistent_stats_history.h"
#include "monitoring/stats_dump_scheduler.h"
#include "monitoring/thread_status_updater.h" #include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h" #include "monitoring/thread_status_util.h"
#include "options/cf_options.h" #include "options/cf_options.h"
@ -207,7 +207,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
refitting_level_(false), refitting_level_(false),
opened_successfully_(false), opened_successfully_(false),
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
stats_dump_scheduler_(nullptr), periodic_work_scheduler_(nullptr),
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
two_write_queues_(options.two_write_queues), two_write_queues_(options.two_write_queues),
manual_wal_flush_(options.manual_wal_flush), manual_wal_flush_(options.manual_wal_flush),
@ -446,8 +446,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
"Shutdown: canceling all background work"); "Shutdown: canceling all background work");
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (stats_dump_scheduler_ != nullptr) { if (periodic_work_scheduler_ != nullptr) {
stats_dump_scheduler_->Unregister(this); periodic_work_scheduler_->Unregister(this);
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
@ -685,17 +685,17 @@ void DBImpl::PrintStatistics() {
} }
} }
void DBImpl::StartStatsDumpScheduler() { void DBImpl::StartPeriodicWorkScheduler() {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
stats_dump_scheduler_ = StatsDumpScheduler::Default(); periodic_work_scheduler_ = PeriodicWorkScheduler::Default();
TEST_SYNC_POINT_CALLBACK("DBImpl::StartStatsDumpScheduler:Init", TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicWorkScheduler:Init",
&stats_dump_scheduler_); &periodic_work_scheduler_);
} }
stats_dump_scheduler_->Register(this, periodic_work_scheduler_->Register(
mutable_db_options_.stats_dump_period_sec, this, mutable_db_options_.stats_dump_period_sec,
mutable_db_options_.stats_persist_period_sec); mutable_db_options_.stats_persist_period_sec);
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
} }
@ -907,6 +907,14 @@ void DBImpl::DumpStats() {
PrintStatistics(); PrintStatistics();
} }
void DBImpl::FlushInfoLog() {
if (shutdown_initiated_) {
return;
}
TEST_SYNC_POINT("DBImpl::FlushInfoLog:StartRunning");
LogFlush(immutable_db_options_.info_log);
}
Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family, Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
int max_entries_to_print, int max_entries_to_print,
std::string* out_str) { std::string* out_str) {
@ -1082,20 +1090,13 @@ Status DBImpl::SetDBOptions(
mutable_db_options_.stats_dump_period_sec || mutable_db_options_.stats_dump_period_sec ||
new_options.stats_persist_period_sec != new_options.stats_persist_period_sec !=
mutable_db_options_.stats_persist_period_sec) { mutable_db_options_.stats_persist_period_sec) {
if (stats_dump_scheduler_) {
mutex_.Unlock(); mutex_.Unlock();
stats_dump_scheduler_->Unregister(this); periodic_work_scheduler_->Unregister(this);
mutex_.Lock(); periodic_work_scheduler_->Register(
} this, new_options.stats_dump_period_sec,
if (new_options.stats_dump_period_sec > 0 ||
new_options.stats_persist_period_sec > 0) {
mutex_.Unlock();
stats_dump_scheduler_->Register(this,
new_options.stats_dump_period_sec,
new_options.stats_persist_period_sec); new_options.stats_persist_period_sec);
mutex_.Lock(); mutex_.Lock();
} }
}
write_controller_.set_max_delayed_write_rate( write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate); new_options.delayed_write_rate);
table_cache_.get()->SetCapacity(new_options.max_open_files == -1 table_cache_.get()->SetCapacity(new_options.max_open_files == -1

@ -70,9 +70,9 @@ class ArenaWrappedDBIter;
class InMemoryStatsHistoryIterator; class InMemoryStatsHistoryIterator;
class MemTable; class MemTable;
class PersistentStatsHistoryIterator; class PersistentStatsHistoryIterator;
class StatsDumpScheduler; class PeriodicWorkScheduler;
#ifndef NDEBUG #ifndef NDEBUG
class StatsDumpTestScheduler; class PeriodicWorkTestScheduler;
#endif // !NDEBUG #endif // !NDEBUG
class TableCache; class TableCache;
class TaskLimiterToken; class TaskLimiterToken;
@ -1002,7 +1002,7 @@ class DBImpl : public DB {
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
StatsDumpTestScheduler* TEST_GetStatsDumpScheduler() const; PeriodicWorkTestScheduler* TEST_GetPeriodicWorkScheduler() const;
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
#endif // NDEBUG #endif // NDEBUG
@ -1013,6 +1013,9 @@ class DBImpl : public DB {
// dump rocksdb.stats to LOG // dump rocksdb.stats to LOG
void DumpStats(); void DumpStats();
// flush LOG out of application buffer
void FlushInfoLog();
protected: protected:
const std::string dbname_; const std::string dbname_;
std::string db_id_; std::string db_id_;
@ -1652,7 +1655,7 @@ class DBImpl : public DB {
LogBuffer* log_buffer); LogBuffer* log_buffer);
// Schedule background tasks // Schedule background tasks
void StartStatsDumpScheduler(); void StartPeriodicWorkScheduler();
void PrintStatistics(); void PrintStatistics();
@ -2111,10 +2114,11 @@ class DBImpl : public DB {
std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_; std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
// Scheduler to run DumpStats() and PersistStats(). Currently, it always use // Scheduler to run DumpStats(), PersistStats(), and FlushInfoLog().
// a global instance from StatsDumpScheduler::Default(). Only in unittest, it // Currently, it always use a global instance from
// can be overrided by StatsDumpTestSchduler. // PeriodicWorkScheduler::Default(). Only in unittest, it can be overrided by
StatsDumpScheduler* stats_dump_scheduler_; // PeriodicWorkTestScheduler.
PeriodicWorkScheduler* periodic_work_scheduler_;
#endif #endif
// When set, we use a separate queue for writes that don't write to memtable. // When set, we use a separate queue for writes that don't write to memtable.

@ -12,7 +12,7 @@
#include "db/column_family.h" #include "db/column_family.h"
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/error_handler.h" #include "db/error_handler.h"
#include "monitoring/stats_dump_scheduler.h" #include "db/periodic_work_scheduler.h"
#include "monitoring/thread_status_updater.h" #include "monitoring/thread_status_updater.h"
#include "util/cast_util.h" #include "util/cast_util.h"
@ -274,14 +274,14 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize(
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
void DBImpl::TEST_WaitForStatsDumpRun(std::function<void()> callback) const { void DBImpl::TEST_WaitForStatsDumpRun(std::function<void()> callback) const {
if (stats_dump_scheduler_ != nullptr) { if (periodic_work_scheduler_ != nullptr) {
static_cast<StatsDumpTestScheduler*>(stats_dump_scheduler_) static_cast<PeriodicWorkTestScheduler*>(periodic_work_scheduler_)
->TEST_WaitForRun(callback); ->TEST_WaitForRun(callback);
} }
} }
StatsDumpTestScheduler* DBImpl::TEST_GetStatsDumpScheduler() const { PeriodicWorkTestScheduler* DBImpl::TEST_GetPeriodicWorkScheduler() const {
return static_cast<StatsDumpTestScheduler*>(stats_dump_scheduler_); return static_cast<PeriodicWorkTestScheduler*>(periodic_work_scheduler_);
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE

@ -11,12 +11,12 @@
#include "db/builder.h" #include "db/builder.h"
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/error_handler.h" #include "db/error_handler.h"
#include "db/periodic_work_scheduler.h"
#include "env/composite_env_wrapper.h" #include "env/composite_env_wrapper.h"
#include "file/read_write_util.h" #include "file/read_write_util.h"
#include "file/sst_file_manager_impl.h" #include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h" #include "file/writable_file_writer.h"
#include "monitoring/persistent_stats_history.h" #include "monitoring/persistent_stats_history.h"
#include "monitoring/stats_dump_scheduler.h"
#include "options/options_helper.h" #include "options/options_helper.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "rocksdb/wal_filter.h" #include "rocksdb/wal_filter.h"
@ -1763,7 +1763,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
persist_options_status.ToString().c_str()); persist_options_status.ToString().c_str());
} }
if (s.ok()) { if (s.ok()) {
impl->StartStatsDumpScheduler(); impl->StartPeriodicWorkScheduler();
} else { } else {
for (auto* h : *handles) { for (auto* h : *handles) {
delete h; delete h;

@ -3,7 +3,7 @@
// COPYING file in the root directory) and Apache 2.0 License // COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
#include "monitoring/stats_dump_scheduler.h" #include "db/periodic_work_scheduler.h"
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "util/cast_util.h" #include "util/cast_util.h"
@ -11,16 +11,16 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
StatsDumpScheduler::StatsDumpScheduler(Env* env) { PeriodicWorkScheduler::PeriodicWorkScheduler(Env* env) {
timer = std::unique_ptr<Timer>(new Timer(env)); timer = std::unique_ptr<Timer>(new Timer(env));
} }
void StatsDumpScheduler::Register(DBImpl* dbi, void PeriodicWorkScheduler::Register(DBImpl* dbi,
unsigned int stats_dump_period_sec, unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec) { unsigned int stats_persist_period_sec) {
static std::atomic<uint64_t> initial_delay(0); static std::atomic<uint64_t> initial_delay(0);
if (stats_dump_period_sec > 0) {
timer->Start(); timer->Start();
if (stats_dump_period_sec > 0) {
timer->Add([dbi]() { dbi->DumpStats(); }, GetTaskName(dbi, "dump_st"), timer->Add([dbi]() { dbi->DumpStats(); }, GetTaskName(dbi, "dump_st"),
initial_delay.fetch_add(1) % initial_delay.fetch_add(1) %
static_cast<uint64_t>(stats_dump_period_sec) * static_cast<uint64_t>(stats_dump_period_sec) *
@ -28,32 +28,37 @@ void StatsDumpScheduler::Register(DBImpl* dbi,
static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond); static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond);
} }
if (stats_persist_period_sec > 0) { if (stats_persist_period_sec > 0) {
timer->Start();
timer->Add( timer->Add(
[dbi]() { dbi->PersistStats(); }, GetTaskName(dbi, "pst_st"), [dbi]() { dbi->PersistStats(); }, GetTaskName(dbi, "pst_st"),
initial_delay.fetch_add(1) % initial_delay.fetch_add(1) %
static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond, static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond,
static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond); static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond);
} }
timer->Add([dbi]() { dbi->FlushInfoLog(); },
GetTaskName(dbi, "flush_info_log"),
initial_delay.fetch_add(1) % kDefaultFlushInfoLogPeriodSec *
kMicrosInSecond,
kDefaultFlushInfoLogPeriodSec * kMicrosInSecond);
} }
void StatsDumpScheduler::Unregister(DBImpl* dbi) { void PeriodicWorkScheduler::Unregister(DBImpl* dbi) {
timer->Cancel(GetTaskName(dbi, "dump_st")); timer->Cancel(GetTaskName(dbi, "dump_st"));
timer->Cancel(GetTaskName(dbi, "pst_st")); timer->Cancel(GetTaskName(dbi, "pst_st"));
timer->Cancel(GetTaskName(dbi, "flush_info_log"));
if (!timer->HasPendingTask()) { if (!timer->HasPendingTask()) {
timer->Shutdown(); timer->Shutdown();
} }
} }
StatsDumpScheduler* StatsDumpScheduler::Default() { PeriodicWorkScheduler* PeriodicWorkScheduler::Default() {
// Always use the default Env for the scheduler, as we only use the NowMicros // Always use the default Env for the scheduler, as we only use the NowMicros
// which is the same for all env. // which is the same for all env.
// The Env could only be overridden in test. // The Env could only be overridden in test.
static StatsDumpScheduler scheduler(Env::Default()); static PeriodicWorkScheduler scheduler(Env::Default());
return &scheduler; return &scheduler;
} }
std::string StatsDumpScheduler::GetTaskName(DBImpl* dbi, std::string PeriodicWorkScheduler::GetTaskName(DBImpl* dbi,
const std::string& func_name) { const std::string& func_name) {
std::string db_session_id; std::string db_session_id;
// TODO: Should this error be ignored? // TODO: Should this error be ignored?
@ -67,8 +72,8 @@ std::string StatsDumpScheduler::GetTaskName(DBImpl* dbi,
// timer, so only re-create it when there's no running task. Otherwise, return // timer, so only re-create it when there's no running task. Otherwise, return
// the existing scheduler. Which means if the unittest needs to update MockEnv, // the existing scheduler. Which means if the unittest needs to update MockEnv,
// Close all db instances and then re-open them. // Close all db instances and then re-open them.
StatsDumpTestScheduler* StatsDumpTestScheduler::Default(Env* env) { PeriodicWorkTestScheduler* PeriodicWorkTestScheduler::Default(Env* env) {
static StatsDumpTestScheduler scheduler(env); static PeriodicWorkTestScheduler scheduler(env);
static port::Mutex mutex; static port::Mutex mutex;
{ {
MutexLock l(&mutex); MutexLock l(&mutex);
@ -81,22 +86,22 @@ StatsDumpTestScheduler* StatsDumpTestScheduler::Default(Env* env) {
return &scheduler; return &scheduler;
} }
void StatsDumpTestScheduler::TEST_WaitForRun( void PeriodicWorkTestScheduler::TEST_WaitForRun(
std::function<void()> callback) const { std::function<void()> callback) const {
if (timer != nullptr) { if (timer != nullptr) {
timer->TEST_WaitForRun(callback); timer->TEST_WaitForRun(callback);
} }
} }
size_t StatsDumpTestScheduler::TEST_GetValidTaskNum() const { size_t PeriodicWorkTestScheduler::TEST_GetValidTaskNum() const {
if (timer != nullptr) { if (timer != nullptr) {
return timer->TEST_GetPendingTaskNum(); return timer->TEST_GetPendingTaskNum();
} }
return 0; return 0;
} }
StatsDumpTestScheduler::StatsDumpTestScheduler(Env* env) PeriodicWorkTestScheduler::PeriodicWorkTestScheduler(Env* env)
: StatsDumpScheduler(env) {} : PeriodicWorkScheduler(env) {}
#endif // !NDEBUG #endif // !NDEBUG
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,70 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include "db/db_impl/db_impl.h"
#include "util/timer.h"
namespace ROCKSDB_NAMESPACE {
// PeriodicWorkScheduler is a singleton object, which is scheduling/running
// DumpStats(), PersistStats(), and FlushInfoLog() for all DB instances. All DB
// instances use the same object from `Default()`.
//
// Internally, it uses a single threaded timer object to run the periodic work
// functions. Timer thread will always be started since the info log flushing
// cannot be disabled.
class PeriodicWorkScheduler {
public:
static PeriodicWorkScheduler* Default();
PeriodicWorkScheduler() = delete;
PeriodicWorkScheduler(const PeriodicWorkScheduler&) = delete;
PeriodicWorkScheduler(PeriodicWorkScheduler&&) = delete;
PeriodicWorkScheduler& operator=(const PeriodicWorkScheduler&) = delete;
PeriodicWorkScheduler& operator=(PeriodicWorkScheduler&&) = delete;
void Register(DBImpl* dbi, unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec);
void Unregister(DBImpl* dbi);
// Periodically flush info log out of application buffer at a low frequency.
// This improves debuggability in case of RocksDB hanging since it ensures the
// log messages leading up to the hang will eventually become visible in the
// log.
static const uint64_t kDefaultFlushInfoLogPeriodSec = 10;
protected:
std::unique_ptr<Timer> timer;
explicit PeriodicWorkScheduler(Env* env);
private:
std::string GetTaskName(DBImpl* dbi, const std::string& func_name);
};
#ifndef NDEBUG
// PeriodicWorkTestScheduler is for unittest, which can specify the Env like
// SafeMockTimeEnv. It also contains functions for unittest.
class PeriodicWorkTestScheduler : public PeriodicWorkScheduler {
public:
static PeriodicWorkTestScheduler* Default(Env* env);
void TEST_WaitForRun(std::function<void()> callback) const;
size_t TEST_GetValidTaskNum() const;
private:
explicit PeriodicWorkTestScheduler(Env* env);
};
#endif // !NDEBUG
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -3,17 +3,17 @@
// COPYING file in the root directory) and Apache 2.0 License // COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
#include "monitoring/stats_dump_scheduler.h" #include "db/periodic_work_scheduler.h"
#include "db/db_test_util.h" #include "db/db_test_util.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
class StatsDumpSchedulerTest : public DBTestBase { class PeriodicWorkSchedulerTest : public DBTestBase {
public: public:
StatsDumpSchedulerTest() PeriodicWorkSchedulerTest()
: DBTestBase("/stats_dump_scheduler_test", /*env_do_fsync=*/true), : DBTestBase("/periodic_work_scheduler_test", /*env_do_fsync=*/true),
mock_env_(new MockTimeEnv(Env::Default())) {} mock_env_(new MockTimeEnv(Env::Default())) {}
protected: protected:
@ -22,17 +22,18 @@ class StatsDumpSchedulerTest : public DBTestBase {
void SetUp() override { void SetUp() override {
mock_env_->InstallTimedWaitFixCallback(); mock_env_->InstallTimedWaitFixCallback();
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"DBImpl::StartStatsDumpScheduler:Init", [&](void* arg) { "DBImpl::StartPeriodicWorkScheduler:Init", [&](void* arg) {
auto* stats_dump_scheduler_ptr = auto* periodic_work_scheduler_ptr =
reinterpret_cast<StatsDumpScheduler**>(arg); reinterpret_cast<PeriodicWorkScheduler**>(arg);
*stats_dump_scheduler_ptr = *periodic_work_scheduler_ptr =
StatsDumpTestScheduler::Default(mock_env_.get()); PeriodicWorkTestScheduler::Default(mock_env_.get());
}); });
} }
}; };
TEST_F(StatsDumpSchedulerTest, Basic) { TEST_F(PeriodicWorkSchedulerTest, Basic) {
constexpr int kPeriodSec = 5; constexpr unsigned int kPeriodSec =
PeriodicWorkScheduler::kDefaultFlushInfoLogPeriodSec;
Close(); Close();
Options options; Options options;
options.stats_dump_period_sec = kPeriodSec; options.stats_dump_period_sec = kPeriodSec;
@ -47,34 +48,44 @@ TEST_F(StatsDumpSchedulerTest, Basic) {
int pst_st_counter = 0; int pst_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning", SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning",
[&](void*) { pst_st_counter++; }); [&](void*) { pst_st_counter++; });
int flush_info_log_counter = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushInfoLog:StartRunning",
[&](void*) { flush_info_log_counter++; });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
Reopen(options); Reopen(options);
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec); ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec); ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_persist_period_sec);
dbfull()->TEST_WaitForStatsDumpRun( ASSERT_GT(kPeriodSec, 1u);
[&] { mock_env_->MockSleepForSeconds(kPeriodSec - 1); }); dbfull()->TEST_WaitForStatsDumpRun([&] {
mock_env_->MockSleepForSeconds(static_cast<int>(kPeriodSec) - 1);
});
auto scheduler = dbfull()->TEST_GetStatsDumpScheduler(); auto scheduler = dbfull()->TEST_GetPeriodicWorkScheduler();
ASSERT_NE(nullptr, scheduler); ASSERT_NE(nullptr, scheduler);
ASSERT_EQ(2, scheduler->TEST_GetValidTaskNum()); ASSERT_EQ(3, scheduler->TEST_GetValidTaskNum());
ASSERT_EQ(1, dump_st_counter); ASSERT_EQ(1, dump_st_counter);
ASSERT_EQ(1, pst_st_counter); ASSERT_EQ(1, pst_st_counter);
ASSERT_EQ(1, flush_info_log_counter);
dbfull()->TEST_WaitForStatsDumpRun( dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->MockSleepForSeconds(kPeriodSec); }); [&] { mock_env_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(2, dump_st_counter); ASSERT_EQ(2, dump_st_counter);
ASSERT_EQ(2, pst_st_counter); ASSERT_EQ(2, pst_st_counter);
ASSERT_EQ(2, flush_info_log_counter);
dbfull()->TEST_WaitForStatsDumpRun( dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->MockSleepForSeconds(kPeriodSec); }); [&] { mock_env_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(3, dump_st_counter); ASSERT_EQ(3, dump_st_counter);
ASSERT_EQ(3, pst_st_counter); ASSERT_EQ(3, pst_st_counter);
ASSERT_EQ(3, flush_info_log_counter);
// Disable scheduler with SetOption // Disable scheduler with SetOption
ASSERT_OK(dbfull()->SetDBOptions( ASSERT_OK(dbfull()->SetDBOptions(
@ -82,27 +93,35 @@ TEST_F(StatsDumpSchedulerTest, Basic) {
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_dump_period_sec); ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec); ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
scheduler = dbfull()->TEST_GetStatsDumpScheduler(); // Info log flush should still run.
ASSERT_EQ(0u, scheduler->TEST_GetValidTaskNum()); dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(3, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
ASSERT_EQ(4, flush_info_log_counter);
scheduler = dbfull()->TEST_GetPeriodicWorkScheduler();
ASSERT_EQ(1u, scheduler->TEST_GetValidTaskNum());
// Re-enable one task // Re-enable one task
ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "5"}})); ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "5"}}));
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec); ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec); ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
scheduler = dbfull()->TEST_GetStatsDumpScheduler(); scheduler = dbfull()->TEST_GetPeriodicWorkScheduler();
ASSERT_NE(nullptr, scheduler); ASSERT_NE(nullptr, scheduler);
ASSERT_EQ(1, scheduler->TEST_GetValidTaskNum()); ASSERT_EQ(2, scheduler->TEST_GetValidTaskNum());
dump_st_counter = 0;
dbfull()->TEST_WaitForStatsDumpRun( dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->MockSleepForSeconds(kPeriodSec); }); [&] { mock_env_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(1, dump_st_counter); ASSERT_EQ(4, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
ASSERT_EQ(5, flush_info_log_counter);
Close(); Close();
} }
TEST_F(StatsDumpSchedulerTest, MultiInstances) { TEST_F(PeriodicWorkSchedulerTest, MultiInstances) {
constexpr int kPeriodSec = 5; constexpr int kPeriodSec = 5;
const int kInstanceNum = 10; const int kInstanceNum = 10;
@ -129,8 +148,8 @@ TEST_F(StatsDumpSchedulerTest, MultiInstances) {
} }
auto dbi = static_cast_with_check<DBImpl>(dbs[kInstanceNum - 1]); auto dbi = static_cast_with_check<DBImpl>(dbs[kInstanceNum - 1]);
auto scheduler = dbi->TEST_GetStatsDumpScheduler(); auto scheduler = dbi->TEST_GetPeriodicWorkScheduler();
ASSERT_EQ(kInstanceNum * 2, scheduler->TEST_GetValidTaskNum()); ASSERT_EQ(kInstanceNum * 3, scheduler->TEST_GetValidTaskNum());
int expected_run = kInstanceNum; int expected_run = kInstanceNum;
dbi->TEST_WaitForStatsDumpRun( dbi->TEST_WaitForStatsDumpRun(
@ -170,7 +189,7 @@ TEST_F(StatsDumpSchedulerTest, MultiInstances) {
} }
} }
TEST_F(StatsDumpSchedulerTest, MultiEnv) { TEST_F(PeriodicWorkSchedulerTest, MultiEnv) {
constexpr int kDumpPeriodSec = 5; constexpr int kDumpPeriodSec = 5;
constexpr int kPersistPeriodSec = 10; constexpr int kPersistPeriodSec = 10;
Close(); Close();
@ -194,8 +213,8 @@ TEST_F(StatsDumpSchedulerTest, MultiEnv) {
ASSERT_OK(DB::Open(options2, dbname, &db)); ASSERT_OK(DB::Open(options2, dbname, &db));
DBImpl* dbi = static_cast_with_check<DBImpl>(db); DBImpl* dbi = static_cast_with_check<DBImpl>(db);
ASSERT_EQ(dbi->TEST_GetStatsDumpScheduler(), ASSERT_EQ(dbi->TEST_GetPeriodicWorkScheduler(),
dbfull()->TEST_GetStatsDumpScheduler()); dbfull()->TEST_GetPeriodicWorkScheduler());
db->Close(); db->Close();
delete db; delete db;

@ -1,64 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include "db/db_impl/db_impl.h"
#include "util/timer.h"
namespace ROCKSDB_NAMESPACE {
// StatsDumpScheduler is a singleton object, which is scheduling/running
// DumpStats() and PersistStats() for all DB instances. All DB instances uses
// the same object from `Default()`.
// Internally, it uses a single threaded timer object to run the stats dump
// functions. Timer thread won't be started if there's no function needs to run,
// for example, option.stats_dump_period_sec and option.stats_persist_period_sec
// are set to 0.
class StatsDumpScheduler {
public:
static StatsDumpScheduler* Default();
StatsDumpScheduler() = delete;
StatsDumpScheduler(const StatsDumpScheduler&) = delete;
StatsDumpScheduler(StatsDumpScheduler&&) = delete;
StatsDumpScheduler& operator=(const StatsDumpScheduler&) = delete;
StatsDumpScheduler& operator=(StatsDumpScheduler&&) = delete;
void Register(DBImpl* dbi, unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec);
void Unregister(DBImpl* dbi);
protected:
std::unique_ptr<Timer> timer;
explicit StatsDumpScheduler(Env* env);
private:
std::string GetTaskName(DBImpl* dbi, const std::string& func_name);
};
#ifndef NDEBUG
// StatsDumpTestScheduler is for unittest, which can specify the Env like
// SafeMockTimeEnv. It also contains functions for unittest.
class StatsDumpTestScheduler : public StatsDumpScheduler {
public:
static StatsDumpTestScheduler* Default(Env* env);
void TEST_WaitForRun(std::function<void()> callback) const;
size_t TEST_GetValidTaskNum() const;
private:
explicit StatsDumpTestScheduler(Env* env);
};
#endif // !NDEBUG
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -15,8 +15,8 @@
#include "db/column_family.h" #include "db/column_family.h"
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "db/periodic_work_scheduler.h"
#include "monitoring/persistent_stats_history.h" #include "monitoring/persistent_stats_history.h"
#include "monitoring/stats_dump_scheduler.h"
#include "options/options_helper.h" #include "options/options_helper.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
@ -41,11 +41,11 @@ class StatsHistoryTest : public DBTestBase {
void SetUp() override { void SetUp() override {
mock_env_->InstallTimedWaitFixCallback(); mock_env_->InstallTimedWaitFixCallback();
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"DBImpl::StartStatsDumpScheduler:Init", [&](void* arg) { "DBImpl::StartPeriodicWorkScheduler:Init", [&](void* arg) {
auto* stats_dump_scheduler_ptr = auto* periodic_work_scheduler_ptr =
reinterpret_cast<StatsDumpScheduler**>(arg); reinterpret_cast<PeriodicWorkScheduler**>(arg);
*stats_dump_scheduler_ptr = *periodic_work_scheduler_ptr =
StatsDumpTestScheduler::Default(mock_env_.get()); PeriodicWorkTestScheduler::Default(mock_env_.get());
}); });
} }
}; };

@ -57,6 +57,7 @@ LIB_SOURCES = \
db/merge_helper.cc \ db/merge_helper.cc \
db/merge_operator.cc \ db/merge_operator.cc \
db/output_validator.cc \ db/output_validator.cc \
db/periodic_work_scheduler.cc \
db/range_del_aggregator.cc \ db/range_del_aggregator.cc \
db/range_tombstone_fragmenter.cc \ db/range_tombstone_fragmenter.cc \
db/repair.cc \ db/repair.cc \
@ -117,7 +118,6 @@ LIB_SOURCES = \
monitoring/perf_level.cc \ monitoring/perf_level.cc \
monitoring/persistent_stats_history.cc \ monitoring/persistent_stats_history.cc \
monitoring/statistics.cc \ monitoring/statistics.cc \
monitoring/stats_dump_scheduler.cc \
monitoring/thread_status_impl.cc \ monitoring/thread_status_impl.cc \
monitoring/thread_status_updater.cc \ monitoring/thread_status_updater.cc \
monitoring/thread_status_updater_debug.cc \ monitoring/thread_status_updater_debug.cc \
@ -419,6 +419,7 @@ TEST_MAIN_SOURCES = \
db/obsolete_files_test.cc \ db/obsolete_files_test.cc \
db/options_file_test.cc \ db/options_file_test.cc \
db/perf_context_test.cc \ db/perf_context_test.cc \
db/periodic_work_scheduler_test.cc \
db/plain_table_db_test.cc \ db/plain_table_db_test.cc \
db/prefix_test.cc \ db/prefix_test.cc \
db/repair_test.cc \ db/repair_test.cc \
@ -450,7 +451,6 @@ TEST_MAIN_SOURCES = \
monitoring/histogram_test.cc \ monitoring/histogram_test.cc \
monitoring/iostats_context_test.cc \ monitoring/iostats_context_test.cc \
monitoring/statistics_test.cc \ monitoring/statistics_test.cc \
monitoring/stats_dump_scheduler_test.cc \
monitoring/stats_history_test.cc \ monitoring/stats_history_test.cc \
options/configurable_test.cc \ options/configurable_test.cc \
options/options_settable_test.cc \ options/options_settable_test.cc \

Loading…
Cancel
Save