move dump stats to a separate thread (#4382)

Summary:
Currently statistics are supposed to be dumped to info log at intervals of `options.stats_dump_period_sec`. However, the implementation choice was to bind it to the compaction thread, meaning that if the database has been serving very light traffic, the stats may not get dumped at all.
We decided to separate stats dumping into a new timed thread using `TimerQueue`, which is already used in blob_db. This will allow us to schedule new timed tasks with more deterministic behavior.

Tested with db_bench using `--stats_dump_period_sec=20` in command line:
> LOG:2018/09/17-14:07:45.575025 7fe99fbfe700 [WARN] [db/db_impl.cc:605] ------- DUMPING STATS -------
LOG:2018/09/17-14:08:05.643286 7fe99fbfe700 [WARN] [db/db_impl.cc:605] ------- DUMPING STATS -------
LOG:2018/09/17-14:08:25.691325 7fe99fbfe700 [WARN] [db/db_impl.cc:605] ------- DUMPING STATS -------
LOG:2018/09/17-14:08:45.740989 7fe99fbfe700 [WARN] [db/db_impl.cc:605] ------- DUMPING STATS -------

LOG content:
> 2018/09/17-14:07:45.575025 7fe99fbfe700 [WARN] [db/db_impl.cc:605] ------- DUMPING STATS -------
2018/09/17-14:07:45.575080 7fe99fbfe700 [WARN] [db/db_impl.cc:606]
** DB Stats **
Uptime(secs): 20.0 total, 20.0 interval
Cumulative writes: 4447K writes, 4447K keys, 4447K commit groups, 1.0 writes per commit group, ingest: 5.57 GB, 285.01 MB/s
Cumulative WAL: 4447K writes, 0 syncs, 4447638.00 writes per sync, written: 5.57 GB, 285.01 MB/s
Cumulative stall: 00:00:0.012 H:M:S, 0.1 percent
Interval writes: 4447K writes, 4447K keys, 4447K commit groups, 1.0 writes per commit group, ingest: 5700.71 MB, 285.01 MB/s
Interval WAL: 4447K writes, 0 syncs, 4447638.00 writes per sync, written: 5.57 MB, 285.01 MB/s
Interval stall: 00:00:0.012 H:M:S, 0.1 percent
** Compaction Stats [default] **
Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4382

Differential Revision: D9933051

Pulled By: miasantreble

fbshipit-source-id: 6d12bb1e4977674eea4bf2d2ac6d486b814bb2fa
main
Zhongyi Xie 6 years ago committed by Facebook Github Bot
parent 35f26beca5
commit cac87fcf57
  1. 1
      db/compacted_db_impl.cc
  2. 134
      db/db_impl.cc
  3. 13
      db/db_impl.h
  4. 1
      db/db_impl_compaction_flush.cc
  5. 5
      db/db_impl_debug.cc
  6. 3
      db/db_impl_open.cc
  7. 27
      db/db_options_test.cc
  8. 2
      db/db_test_util.h
  9. 2
      env/mock_env.cc

@ -147,6 +147,7 @@ Status CompactedDBImpl::Open(const Options& options,
std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname)); std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
Status s = db->Init(options); Status s = db->Init(options);
if (s.ok()) { if (s.ok()) {
db->StartTimedTasks();
ROCKS_LOG_INFO(db->immutable_db_options_.info_log, ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
"Opened the db as fully compacted mode"); "Opened the db as fully compacted mode");
LogFlush(db->immutable_db_options_.info_log); LogFlush(db->immutable_db_options_.info_log);

@ -359,11 +359,20 @@ void DBImpl::WaitForBackgroundWork() {
// Will lock the mutex_, will wait for completion if wait is true // Will lock the mutex_, will wait for completion if wait is true
void DBImpl::CancelAllBackgroundWork(bool wait) { void DBImpl::CancelAllBackgroundWork(bool wait) {
InstrumentedMutexLock l(&mutex_);
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Shutdown: canceling all background work"); "Shutdown: canceling all background work");
InstrumentedMutexLock l(&mutex_);
// To avoid deadlock, `thread_dump_stats_->cancel()` needs to be called
// before grabbing db mutex because the actual worker function
// `DBImpl::DumpStats()` also holds db mutex
if (thread_dump_stats_ != nullptr) {
mutex_.Unlock();
thread_dump_stats_->cancel();
mutex_.Lock();
thread_dump_stats_.reset();
}
if (!shutting_down_.load(std::memory_order_acquire) && if (!shutting_down_.load(std::memory_order_acquire) &&
has_unpersisted_data_.load(std::memory_order_relaxed) && has_unpersisted_data_.load(std::memory_order_relaxed) &&
!mutable_db_options_.avoid_flush_during_shutdown) { !mutable_db_options_.avoid_flush_during_shutdown) {
@ -577,66 +586,68 @@ void DBImpl::PrintStatistics() {
} }
} }
void DBImpl::MaybeDumpStats() { void DBImpl::StartTimedTasks() {
mutex_.Lock(); unsigned int stats_dump_period_sec = 0;
unsigned int stats_dump_period_sec = {
mutable_db_options_.stats_dump_period_sec; InstrumentedMutexLock l(&mutex_);
mutex_.Unlock(); stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec;
if (stats_dump_period_sec == 0) return; if (stats_dump_period_sec > 0) {
if (!thread_dump_stats_) {
const uint64_t now_micros = env_->NowMicros(); thread_dump_stats_.reset(new rocksdb::RepeatableThread(
[this]() { DBImpl::DumpStats(); }, "dump_st", env_,
if (last_stats_dump_time_microsec_ + stats_dump_period_sec * 1000000 <= stats_dump_period_sec * 1000000));
now_micros) { }
// Multiple threads could race in here simultaneously. }
// However, the last one will update last_stats_dump_time_microsec_ }
// atomically. We could see more than one dump during one dump }
// period in rare cases.
last_stats_dump_time_microsec_ = now_micros;
void DBImpl::DumpStats() {
TEST_SYNC_POINT("DBImpl::DumpStats:1");
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
const DBPropertyInfo* cf_property_info = const DBPropertyInfo* cf_property_info =
GetPropertyInfo(DB::Properties::kCFStats); GetPropertyInfo(DB::Properties::kCFStats);
assert(cf_property_info != nullptr); assert(cf_property_info != nullptr);
const DBPropertyInfo* db_property_info = const DBPropertyInfo* db_property_info =
GetPropertyInfo(DB::Properties::kDBStats); GetPropertyInfo(DB::Properties::kDBStats);
assert(db_property_info != nullptr); assert(db_property_info != nullptr);
std::string stats; std::string stats;
{ if (shutdown_initiated_) {
InstrumentedMutexLock l(&mutex_); return;
default_cf_internal_stats_->GetStringProperty( }
*db_property_info, DB::Properties::kDBStats, &stats); {
for (auto cfd : *versions_->GetColumnFamilySet()) { InstrumentedMutexLock l(&mutex_);
if (cfd->initialized()) { default_cf_internal_stats_->GetStringProperty(
cfd->internal_stats()->GetStringProperty( *db_property_info, DB::Properties::kDBStats, &stats);
*cf_property_info, DB::Properties::kCFStatsNoFileHistogram, for (auto cfd : *versions_->GetColumnFamilySet()) {
&stats); if (cfd->initialized()) {
} cfd->internal_stats()->GetStringProperty(
} *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->initialized()) {
cfd->internal_stats()->GetStringProperty(
*cf_property_info, DB::Properties::kCFFileHistogram, &stats);
}
} }
} }
ROCKS_LOG_WARN(immutable_db_options_.info_log, for (auto cfd : *versions_->GetColumnFamilySet()) {
"------- DUMPING STATS -------"); if (cfd->initialized()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str()); cfd->internal_stats()->GetStringProperty(
if (immutable_db_options_.dump_malloc_stats) { *cf_property_info, DB::Properties::kCFFileHistogram, &stats);
stats.clear();
DumpMallocStats(&stats);
if (!stats.empty()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"------- Malloc STATS -------");
ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str());
} }
} }
}
TEST_SYNC_POINT("DBImpl::DumpStats:2");
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"------- DUMPING STATS -------");
ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str());
if (immutable_db_options_.dump_malloc_stats) {
stats.clear();
DumpMallocStats(&stats);
if (!stats.empty()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"------- Malloc STATS -------");
ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str());
}
}
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
PrintStatistics(); PrintStatistics();
}
} }
void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) { void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
@ -762,7 +773,22 @@ Status DBImpl::SetDBOptions(
new_options.max_background_compactions, Env::Priority::LOW); new_options.max_background_compactions, Env::Priority::LOW);
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();
} }
if (new_options.stats_dump_period_sec !=
mutable_db_options_.stats_dump_period_sec) {
if (thread_dump_stats_) {
mutex_.Unlock();
thread_dump_stats_->cancel();
mutex_.Lock();
}
if (new_options.stats_dump_period_sec > 0) {
thread_dump_stats_.reset(new rocksdb::RepeatableThread(
[this]() { DBImpl::DumpStats(); }, "dump_st", env_,
new_options.stats_dump_period_sec * 1000000));
}
else {
thread_dump_stats_.reset();
}
}
write_controller_.set_max_delayed_write_rate( write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate); new_options.delayed_write_rate);
table_cache_.get()->SetCapacity(new_options.max_open_files == -1 table_cache_.get()->SetCapacity(new_options.max_open_files == -1

@ -53,6 +53,7 @@
#include "util/autovector.h" #include "util/autovector.h"
#include "util/event_logger.h" #include "util/event_logger.h"
#include "util/hash.h" #include "util/hash.h"
#include "util/repeatable_thread.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/thread_local.h" #include "util/thread_local.h"
#include "util/trace_replay.h" #include "util/trace_replay.h"
@ -465,6 +466,7 @@ class DBImpl : public DB {
int TEST_BGCompactionsAllowed() const; int TEST_BGCompactionsAllowed() const;
int TEST_BGFlushesAllowed() const; int TEST_BGFlushesAllowed() const;
size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
void TEST_WaitForTimedTaskRun(std::function<void()> callback) const;
#endif // NDEBUG #endif // NDEBUG
@ -1063,10 +1065,13 @@ class DBImpl : public DB {
const std::vector<CompactionInputFiles>& inputs, const std::vector<CompactionInputFiles>& inputs,
bool* sfm_bookkeeping, LogBuffer* log_buffer); bool* sfm_bookkeeping, LogBuffer* log_buffer);
// Schedule background tasks
void StartTimedTasks();
void PrintStatistics(); void PrintStatistics();
// dump rocksdb.stats to LOG // dump rocksdb.stats to LOG
void MaybeDumpStats(); void DumpStats();
// Return the minimum empty level that could hold the total data in the // Return the minimum empty level that could hold the total data in the
// input level. Return the input level, if such level could not be found. // input level. Return the input level, if such level could not be found.
@ -1469,6 +1474,10 @@ class DBImpl : public DB {
// Only to be set during initialization // Only to be set during initialization
std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_; std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;
// handle for scheduling jobs at fixed intervals
// REQUIRES: mutex locked
std::unique_ptr<rocksdb::RepeatableThread> thread_dump_stats_;
// No copying allowed // No copying allowed
DBImpl(const DBImpl&); DBImpl(const DBImpl&);
void operator=(const DBImpl&); void operator=(const DBImpl&);
@ -1548,7 +1557,7 @@ class DBImpl : public DB {
// error recovery from going on in parallel. The latter, shutting_down_, // error recovery from going on in parallel. The latter, shutting_down_,
// is set a little later during the shutdown after scheduling memtable // is set a little later during the shutdown after scheduling memtable
// flushes // flushes
bool shutdown_initiated_; std::atomic<bool> shutdown_initiated_;
// Flag to indicate whether sst_file_manager object was allocated in // Flag to indicate whether sst_file_manager object was allocated in
// DB::Open() or passed to us // DB::Open() or passed to us
bool own_sfm_; bool own_sfm_;

@ -1656,7 +1656,6 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
bool made_progress = false; bool made_progress = false;
JobContext job_context(next_job_id_.fetch_add(1), true); JobContext job_context(next_job_id_.fetch_add(1), true);
TEST_SYNC_POINT("BackgroundCallCompaction:0"); TEST_SYNC_POINT("BackgroundCallCompaction:0");
MaybeDumpStats();
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
immutable_db_options_.info_log.get()); immutable_db_options_.info_log.get());
{ {

@ -243,5 +243,10 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize(
return GetWalPreallocateBlockSize(write_buffer_size); return GetWalPreallocateBlockSize(write_buffer_size);
} }
void DBImpl::TEST_WaitForTimedTaskRun(std::function<void()> callback) const {
if (thread_dump_stats_ != nullptr) {
thread_dump_stats_->TEST_WaitForRun(callback);
}
}
} // namespace rocksdb } // namespace rocksdb
#endif // NDEBUG #endif // NDEBUG

@ -1295,6 +1295,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
persist_options_status.ToString()); persist_options_status.ToString());
} }
} }
if (s.ok()) {
impl->StartTimedTasks();
}
if (!s.ok()) { if (!s.ok()) {
for (auto* h : *handles) { for (auto* h : *handles) {
delete h; delete h;

@ -514,6 +514,33 @@ TEST_F(DBOptionsTest, SetStatsDumpPeriodSec) {
} }
} }
TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) {
Options options;
options.create_if_missing = true;
options.stats_dump_period_sec = 5;
std::unique_ptr<rocksdb::MockTimeEnv> mock_env;
mock_env.reset(new rocksdb::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds
options.env = mock_env.get();
int counter = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DumpStats:1", [&](void* /*arg*/) {
counter++;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_EQ(5, dbfull()->GetDBOptions().stats_dump_period_sec);
dbfull()->TEST_WaitForTimedTaskRun([&] { mock_env->set_current_time(5); });
ASSERT_GE(counter, 1);
// Test cancel job through SetOptions
ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "0"}}));
int old_val = counter;
env_->SleepForMicroseconds(10000000);
ASSERT_EQ(counter, old_val);
Close();
}
static void assert_candidate_files_empty(DBImpl* dbfull, const bool empty) { static void assert_candidate_files_empty(DBImpl* dbfull, const bool empty) {
dbfull->TEST_LockMutex(); dbfull->TEST_LockMutex();
JobContext job_context(0); JobContext job_context(0);

@ -573,7 +573,7 @@ class SpecialEnv : public EnvWrapper {
std::atomic<int> delete_count_; std::atomic<int> delete_count_;
bool time_elapse_only_sleep_; std::atomic<bool> time_elapse_only_sleep_;
bool no_slowdown_; bool no_slowdown_;

2
env/mock_env.cc vendored

@ -319,7 +319,7 @@ class TestMemLogger : public Logger {
static const uint64_t flush_every_seconds_ = 5; static const uint64_t flush_every_seconds_ = 5;
std::atomic_uint_fast64_t last_flush_micros_; std::atomic_uint_fast64_t last_flush_micros_;
Env* env_; Env* env_;
bool flush_pending_; std::atomic<bool> flush_pending_;
public: public:
TestMemLogger(std::unique_ptr<WritableFile> f, Env* env, TestMemLogger(std::unique_ptr<WritableFile> f, Env* env,

Loading…
Cancel
Save