From d9e71fb2c53726d9c5ed73b4ec962a7ed6ef15ec Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Thu, 25 Aug 2022 18:52:37 -0700 Subject: [PATCH] Fix periodic_task unable to re-register the same task type (#10379) Summary: Timer has a limitation that it cannot re-register a task with the same name, because the cancel only mark the task as invalid and wait for the Timer thread to clean it up later, before the task is cleaned up, the same task name cannot be added. Which makes the task option update likely to fail, which basically cancel and re-register the same task name. Change the periodic task name to a random unique id and store it in periodic_task_scheduler. Also refactor the `periodic_work` to `periodic_task` to make each job function as a `task`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10379 Test Plan: unittests Reviewed By: ajkr Differential Revision: D38000615 Pulled By: jay-zhuang fbshipit-source-id: e4135f9422e3b53aaec8eda54f4e18ce633a279e --- CMakeLists.txt | 4 +- HISTORY.md | 1 + Makefile | 2 +- TARGETS | 8 +- db/db_impl/compacted_db_impl.cc | 2 +- db/db_impl/db_impl.cc | 141 +++++++++------ db/db_impl/db_impl.h | 24 +-- db/db_impl/db_impl_debug.cc | 14 +- db/db_impl/db_impl_open.cc | 4 +- db/db_sst_test.cc | 4 +- db/internal_stats.cc | 3 +- db/periodic_task_scheduler.cc | 113 ++++++++++++ db/periodic_task_scheduler.h | 110 ++++++++++++ ...est.cc => periodic_task_scheduler_test.cc} | 72 ++++---- db/periodic_work_scheduler.cc | 168 ------------------ db/periodic_work_scheduler.h | 90 ---------- db/seqno_time_test.cc | 115 +++++------- monitoring/stats_history_test.cc | 63 ++++--- src.mk | 4 +- util/timer.h | 5 +- 20 files changed, 457 insertions(+), 490 deletions(-) create mode 100644 db/periodic_task_scheduler.cc create mode 100644 db/periodic_task_scheduler.h rename db/{periodic_work_scheduler_test.cc => periodic_task_scheduler_test.cc} (76%) delete mode 100644 db/periodic_work_scheduler.cc delete mode 100644 db/periodic_work_scheduler.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 6bfe8605e..e9e506951 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -668,7 +668,7 @@ set(SOURCES db/merge_helper.cc db/merge_operator.cc db/output_validator.cc - db/periodic_work_scheduler.cc + db/periodic_task_scheduler.cc db/range_del_aggregator.cc db/range_tombstone_fragmenter.cc db/repair.cc @@ -1297,7 +1297,7 @@ if(WITH_TESTS) db/merge_test.cc db/options_file_test.cc db/perf_context_test.cc - db/periodic_work_scheduler_test.cc + db/periodic_task_scheduler_test.cc db/plain_table_db_test.cc db/seqno_time_test.cc db/prefix_test.cc diff --git a/HISTORY.md b/HISTORY.md index e76975a99..261ffb012 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Bug Fixes * Fixed bug where `FlushWAL(true /* sync */)` (used by `GetLiveFilesStorageInfo()`, which is used by checkpoint and backup) could cause parallel writes at the tail of a WAL file to never be synced. +* Fix periodic_task unable to re-register the same task type, which may cause `SetOptions()` fail to update periodical_task time like: `stats_dump_period_sec`, `stats_persist_period_sec`. ### Public API changes * Add `rocksdb_column_family_handle_get_id`, `rocksdb_column_family_handle_get_name` to get name, id of column family in C API diff --git a/Makefile b/Makefile index 4a39fe09a..c7662a6ce 100644 --- a/Makefile +++ b/Makefile @@ -1882,7 +1882,7 @@ blob_garbage_meter_test: $(OBJ_DIR)/db/blob/blob_garbage_meter_test.o $(TEST_LIB timer_test: $(OBJ_DIR)/util/timer_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) -periodic_work_scheduler_test: $(OBJ_DIR)/db/periodic_work_scheduler_test.o $(TEST_LIBRARY) $(LIBRARY) +periodic_task_scheduler_test: $(OBJ_DIR)/db/periodic_task_scheduler_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY) diff --git a/TARGETS b/TARGETS index 3d0033d87..71f32bc01 100644 --- a/TARGETS +++ b/TARGETS @@ -83,7 +83,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/merge_helper.cc", "db/merge_operator.cc", "db/output_validator.cc", - "db/periodic_work_scheduler.cc", + "db/periodic_task_scheduler.cc", "db/range_del_aggregator.cc", "db/range_tombstone_fragmenter.cc", "db/repair.cc", @@ -421,7 +421,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "db/merge_helper.cc", "db/merge_operator.cc", "db/output_validator.cc", - "db/periodic_work_scheduler.cc", + "db/periodic_task_scheduler.cc", "db/range_del_aggregator.cc", "db/range_tombstone_fragmenter.cc", "db/repair.cc", @@ -5600,8 +5600,8 @@ cpp_unittest_wrapper(name="perf_context_test", extra_compiler_flags=[]) -cpp_unittest_wrapper(name="periodic_work_scheduler_test", - srcs=["db/periodic_work_scheduler_test.cc"], +cpp_unittest_wrapper(name="periodic_task_scheduler_test", + srcs=["db/periodic_task_scheduler_test.cc"], deps=[":rocksdb_test_lib"], extra_compiler_flags=[]) diff --git a/db/db_impl/compacted_db_impl.cc b/db/db_impl/compacted_db_impl.cc index e5c755bfb..c5947b06b 100644 --- a/db/db_impl/compacted_db_impl.cc +++ b/db/db_impl/compacted_db_impl.cc @@ -242,7 +242,7 @@ Status CompactedDBImpl::Open(const Options& options, std::unique_ptr db(new CompactedDBImpl(db_options, dbname)); Status s = db->Init(options); if (s.ok()) { - s = db->StartPeriodicWorkScheduler(); + s = db->StartPeriodicTaskScheduler(); } if (s.ok()) { ROCKS_LOG_INFO(db->immutable_db_options_.info_log, diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 74925a942..08fa75597 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -45,7 +45,7 @@ #include "db/memtable_list.h" #include "db/merge_context.h" #include "db/merge_helper.h" -#include "db/periodic_work_scheduler.h" +#include "db/periodic_task_scheduler.h" #include "db/range_tombstone_fragmenter.h" #include "db/table_cache.h" #include "db/table_properties_collector.h" @@ -217,7 +217,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, refitting_level_(false), opened_successfully_(false), #ifndef ROCKSDB_LITE - periodic_work_scheduler_(nullptr), + periodic_task_scheduler_(), #endif // ROCKSDB_LITE two_write_queues_(options.two_write_queues), manual_wal_flush_(options.manual_wal_flush), @@ -260,6 +260,18 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, SetDbSessionId(); assert(!db_session_id_.empty()); +#ifndef ROCKSDB_LITE + periodic_task_functions_.emplace(PeriodicTaskType::kDumpStats, + [this]() { this->DumpStats(); }); + periodic_task_functions_.emplace(PeriodicTaskType::kPersistStats, + [this]() { this->PersistStats(); }); + periodic_task_functions_.emplace(PeriodicTaskType::kFlushInfoLog, + [this]() { this->FlushInfoLog(); }); + periodic_task_functions_.emplace( + PeriodicTaskType::kRecordSeqnoTime, + [this]() { this->RecordSeqnoToTimeMapping(); }); +#endif // ROCKSDB_LITE + versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_, table_cache_.get(), write_buffer_manager_, &write_controller_, &block_cache_tracer_, @@ -480,9 +492,15 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { "Shutdown: canceling all background work"); #ifndef ROCKSDB_LITE - if (periodic_work_scheduler_ != nullptr) { - periodic_work_scheduler_->Unregister(this); - periodic_work_scheduler_->UnregisterRecordSeqnoTimeWorker(this); + for (uint8_t task_type = 0; + task_type < static_cast(PeriodicTaskType::kMax); task_type++) { + Status s = periodic_task_scheduler_.Unregister( + static_cast(task_type)); + if (!s.ok()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "Failed to unregister periodic task %d, status: %s", + task_type, s.ToString().c_str()); + } } #endif // !ROCKSDB_LITE @@ -767,30 +785,50 @@ void DBImpl::PrintStatistics() { } } -Status DBImpl::StartPeriodicWorkScheduler() { +Status DBImpl::StartPeriodicTaskScheduler() { #ifndef ROCKSDB_LITE #ifndef NDEBUG // It only used by test to disable scheduler bool disable_scheduler = false; TEST_SYNC_POINT_CALLBACK( - "DBImpl::StartPeriodicWorkScheduler:DisableScheduler", + "DBImpl::StartPeriodicTaskScheduler:DisableScheduler", &disable_scheduler); if (disable_scheduler) { return Status::OK(); } -#endif // !NDEBUG { InstrumentedMutexLock l(&mutex_); - periodic_work_scheduler_ = PeriodicWorkScheduler::Default(); - TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicWorkScheduler:Init", - &periodic_work_scheduler_); + TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicTaskScheduler:Init", + &periodic_task_scheduler_); } - return periodic_work_scheduler_->Register( - this, mutable_db_options_.stats_dump_period_sec, - mutable_db_options_.stats_persist_period_sec); +#endif // !NDEBUG + if (mutable_db_options_.stats_dump_period_sec > 0) { + Status s = periodic_task_scheduler_.Register( + PeriodicTaskType::kDumpStats, + periodic_task_functions_.at(PeriodicTaskType::kDumpStats), + mutable_db_options_.stats_dump_period_sec); + if (!s.ok()) { + return s; + } + } + if (mutable_db_options_.stats_persist_period_sec > 0) { + Status s = periodic_task_scheduler_.Register( + PeriodicTaskType::kPersistStats, + periodic_task_functions_.at(PeriodicTaskType::kPersistStats), + mutable_db_options_.stats_persist_period_sec); + if (!s.ok()) { + return s; + } + } + + Status s = periodic_task_scheduler_.Register( + PeriodicTaskType::kFlushInfoLog, + periodic_task_functions_.at(PeriodicTaskType::kFlushInfoLog)); + + return s; #else return Status::OK(); #endif // !ROCKSDB_LITE @@ -798,9 +836,6 @@ Status DBImpl::StartPeriodicWorkScheduler() { Status DBImpl::RegisterRecordSeqnoTimeWorker() { #ifndef ROCKSDB_LITE - if (!periodic_work_scheduler_) { - return Status::OK(); - } uint64_t min_time_duration = std::numeric_limits::max(); uint64_t max_time_duration = std::numeric_limits::min(); { @@ -828,26 +863,13 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker() { } Status s; - if (seqno_time_cadence != record_seqno_time_cadence_) { - if (seqno_time_cadence == 0) { - periodic_work_scheduler_->UnregisterRecordSeqnoTimeWorker(this); - } else { - s = periodic_work_scheduler_->RegisterRecordSeqnoTimeWorker( - this, seqno_time_cadence); - } - - if (s.ok()) { - record_seqno_time_cadence_ = seqno_time_cadence; - } - - if (s.IsNotSupported()) { - // TODO: Fix the timer cannot cancel and re-add the same task - ROCKS_LOG_WARN( - immutable_db_options_.info_log, - "Updating seqno to time worker cadence is not supported yet, to make " - "the change effective, please reopen the DB instance."); - s = Status::OK(); - } + if (seqno_time_cadence == 0) { + s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kRecordSeqnoTime); + } else { + s = periodic_task_scheduler_.Register( + PeriodicTaskType::kRecordSeqnoTime, + periodic_task_functions_.at(PeriodicTaskType::kRecordSeqnoTime), + seqno_time_cadence); } return s; @@ -1087,6 +1109,10 @@ void DBImpl::DumpStats() { PrintStatistics(); } +// Periodically flush info log out of application buffer at a low frequency. +// This improves debuggability in case of RocksDB hanging since it ensures the +// log messages leading up to the hang will eventually become visible in the +// log. void DBImpl::FlushInfoLog() { if (shutdown_initiated_) { return; @@ -1279,22 +1305,36 @@ Status DBImpl::SetDBOptions( MaybeScheduleFlushOrCompaction(); } - if (new_options.stats_dump_period_sec != - mutable_db_options_.stats_dump_period_sec || - new_options.stats_persist_period_sec != - mutable_db_options_.stats_persist_period_sec) { - mutex_.Unlock(); - periodic_work_scheduler_->Unregister(this); - s = periodic_work_scheduler_->Register( - this, new_options.stats_dump_period_sec, - new_options.stats_persist_period_sec); - mutex_.Lock(); + mutex_.Unlock(); + if (new_options.stats_dump_period_sec == 0) { + s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kDumpStats); + } else { + s = periodic_task_scheduler_.Register( + PeriodicTaskType::kDumpStats, + periodic_task_functions_.at(PeriodicTaskType::kDumpStats), + new_options.stats_dump_period_sec); } if (new_options.max_total_wal_size != mutable_db_options_.max_total_wal_size) { max_total_wal_size_.store(new_options.max_total_wal_size, std::memory_order_release); } + if (s.ok()) { + if (new_options.stats_persist_period_sec == 0) { + s = periodic_task_scheduler_.Unregister( + PeriodicTaskType::kPersistStats); + } else { + s = periodic_task_scheduler_.Register( + PeriodicTaskType::kPersistStats, + periodic_task_functions_.at(PeriodicTaskType::kPersistStats), + new_options.stats_persist_period_sec); + } + } + mutex_.Lock(); + if (!s.ok()) { + return s; + } + write_controller_.set_max_delayed_write_rate( new_options.delayed_write_rate); table_cache_.get()->SetCapacity(new_options.max_open_files == -1 @@ -3043,12 +3083,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, } // InstrumentedMutexLock l(&mutex_) if (cf_options.preclude_last_level_data_seconds > 0) { - // TODO(zjay): Fix the timer issue and re-enable this. - ROCKS_LOG_ERROR( - immutable_db_options_.info_log, - "Creating column family with `preclude_last_level_data_seconds` needs " - "to restart DB to take effect"); - // s = RegisterRecordSeqnoTimeWorker(); + s = RegisterRecordSeqnoTimeWorker(); } sv_context.Clean(); // this is outside the mutex diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 4481e81db..3053d64ac 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -32,6 +32,7 @@ #include "db/log_writer.h" #include "db/logs_with_prep_tracker.h" #include "db/memtable_list.h" +#include "db/periodic_task_scheduler.h" #include "db/post_memtable_callback.h" #include "db/pre_release_callback.h" #include "db/range_del_aggregator.h" @@ -75,10 +76,6 @@ class ArenaWrappedDBIter; class InMemoryStatsHistoryIterator; class MemTable; class PersistentStatsHistoryIterator; -class PeriodicWorkScheduler; -#ifndef NDEBUG -class PeriodicWorkTestScheduler; -#endif // !NDEBUG class TableCache; class TaskLimiterToken; class Version; @@ -1147,7 +1144,7 @@ class DBImpl : public DB { int TEST_BGCompactionsAllowed() const; int TEST_BGFlushesAllowed() const; size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; - void TEST_WaitForPeridicWorkerRun(std::function callback) const; + void TEST_WaitForPeridicTaskRun(std::function callback) const; SeqnoToTimeMapping TEST_GetSeqnoToTimeMapping() const; size_t TEST_EstimateInMemoryStatsHistorySize() const; @@ -1162,7 +1159,7 @@ class DBImpl : public DB { } #ifndef ROCKSDB_LITE - PeriodicWorkTestScheduler* TEST_GetPeriodicWorkScheduler() const; + const PeriodicTaskScheduler& TEST_GetPeriodicTaskScheduler() const; #endif // !ROCKSDB_LITE #endif // NDEBUG @@ -2069,7 +2066,7 @@ class DBImpl : public DB { LogBuffer* log_buffer); // Schedule background tasks - Status StartPeriodicWorkScheduler(); + Status StartPeriodicTaskScheduler(); Status RegisterRecordSeqnoTimeWorker(); @@ -2611,14 +2608,11 @@ class DBImpl : public DB { #ifndef ROCKSDB_LITE // Scheduler to run DumpStats(), PersistStats(), and FlushInfoLog(). - // Currently, it always use a global instance from - // PeriodicWorkScheduler::Default(). Only in unittest, it can be overrided by - // PeriodicWorkTestScheduler. - PeriodicWorkScheduler* periodic_work_scheduler_; - - // Current cadence of the periodic worker for recording sequence number to - // time. - uint64_t record_seqno_time_cadence_ = 0; + // Currently, internally it has a global timer instance for running the tasks. + PeriodicTaskScheduler periodic_task_scheduler_; + + // It contains the implementations for each periodic task. + std::map periodic_task_functions_; #endif // When set, we use a separate queue for writes that don't write to memtable. diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 0252a3524..52b0b67a2 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -12,7 +12,7 @@ #include "db/column_family.h" #include "db/db_impl/db_impl.h" #include "db/error_handler.h" -#include "db/periodic_work_scheduler.h" +#include "db/periodic_task_scheduler.h" #include "monitoring/thread_status_updater.h" #include "util/cast_util.h" @@ -302,16 +302,12 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize( } #ifndef ROCKSDB_LITE -void DBImpl::TEST_WaitForPeridicWorkerRun( - std::function callback) const { - if (periodic_work_scheduler_ != nullptr) { - static_cast(periodic_work_scheduler_) - ->TEST_WaitForRun(callback); - } +void DBImpl::TEST_WaitForPeridicTaskRun(std::function callback) const { + periodic_task_scheduler_.TEST_WaitForRun(callback); } -PeriodicWorkTestScheduler* DBImpl::TEST_GetPeriodicWorkScheduler() const { - return static_cast(periodic_work_scheduler_); +const PeriodicTaskScheduler& DBImpl::TEST_GetPeriodicTaskScheduler() const { + return periodic_task_scheduler_; } SeqnoToTimeMapping DBImpl::TEST_GetSeqnoToTimeMapping() const { diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index f85778fe0..9168e3ecb 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -11,7 +11,7 @@ #include "db/builder.h" #include "db/db_impl/db_impl.h" #include "db/error_handler.h" -#include "db/periodic_work_scheduler.h" +#include "db/periodic_task_scheduler.h" #include "env/composite_env_wrapper.h" #include "file/filename.h" #include "file/read_write_util.h" @@ -2105,7 +2105,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, "DB::Open() failed: %s", s.ToString().c_str()); } if (s.ok()) { - s = impl->StartPeriodicWorkScheduler(); + s = impl->StartPeriodicTaskScheduler(); } if (s.ok()) { diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 3fb0f99a1..7f031444a 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -695,10 +695,10 @@ TEST_P(DBSSTTestRateLimit, RateLimitedDelete) { *abs_time_us = Env::Default()->NowMicros(); }); - // Disable PeriodicWorkScheduler as it also has TimedWait, which could update + // Disable PeriodicTaskScheduler as it also has TimedWait, which could update // the simulated sleep time ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::StartPeriodicWorkScheduler:DisableScheduler", [&](void* arg) { + "DBImpl::StartPeriodicTaskScheduler:DisableScheduler", [&](void* arg) { bool* disable_scheduler = static_cast(arg); *disable_scheduler = true; }); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index a27357e4f..3b2aba22f 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -1592,7 +1592,8 @@ void InternalStats::DumpCFMapStats( int files = vstorage->NumLevelFiles(level); total_files += files; total_files_being_compacted += files_being_compacted[level]; - if (comp_stats_[level].micros > 0 || files > 0) { + if (comp_stats_[level].micros > 0 || comp_stats_[level].cpu_micros > 0 || + files > 0) { compaction_stats_sum->Add(comp_stats_[level]); total_file_size += vstorage->NumLevelBytes(level); uint64_t input_bytes; diff --git a/db/periodic_task_scheduler.cc b/db/periodic_task_scheduler.cc new file mode 100644 index 000000000..2024510dd --- /dev/null +++ b/db/periodic_task_scheduler.cc @@ -0,0 +1,113 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/periodic_task_scheduler.h" + +#include "rocksdb/system_clock.h" + +#ifndef ROCKSDB_LITE +namespace ROCKSDB_NAMESPACE { + +// `timer_mutex` is a global mutex serves 3 purposes currently: +// (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as +// they are currently not implemented in a thread-safe way; and +// (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and +// the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically. +// (3) protect tasks_map_ in PeriodicTaskScheduler +// Note: It's not efficient to have a static global mutex, for +// PeriodicTaskScheduler it should be okay, as the operations are called +// infrequently. +static port::Mutex timer_mutex; + +static const std::map kDefaultPeriodSeconds = { + {PeriodicTaskType::kDumpStats, kInvalidPeriodSec}, + {PeriodicTaskType::kPersistStats, kInvalidPeriodSec}, + {PeriodicTaskType::kFlushInfoLog, 10}, + {PeriodicTaskType::kRecordSeqnoTime, kInvalidPeriodSec}, +}; + +static const std::map kPeriodicTaskTypeNames = { + {PeriodicTaskType::kDumpStats, "dump_st"}, + {PeriodicTaskType::kPersistStats, "pst_st"}, + {PeriodicTaskType::kFlushInfoLog, "flush_info_log"}, + {PeriodicTaskType::kRecordSeqnoTime, "record_seq_time"}, +}; + +Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type, + const PeriodicTaskFunc& fn) { + return Register(task_type, fn, kDefaultPeriodSeconds.at(task_type)); +} + +Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type, + const PeriodicTaskFunc& fn, + uint64_t repeat_period_seconds) { + MutexLock l(&timer_mutex); + static std::atomic initial_delay(0); + + if (repeat_period_seconds == kInvalidPeriodSec) { + return Status::InvalidArgument("Invalid task repeat period"); + } + auto it = tasks_map_.find(task_type); + if (it != tasks_map_.end()) { + // the task already exists and it's the same, no update needed + if (it->second.repeat_every_sec == repeat_period_seconds) { + return Status::OK(); + } + // cancel the existing one before register new one + timer_->Cancel(it->second.name); + tasks_map_.erase(it); + } + + timer_->Start(); + // put task type name as prefix, for easy debug + std::string unique_id = + kPeriodicTaskTypeNames.at(task_type) + std::to_string(id_++); + + bool succeeded = timer_->Add( + fn, unique_id, + (initial_delay.fetch_add(1) % repeat_period_seconds) * kMicrosInSecond, + repeat_period_seconds * kMicrosInSecond); + if (!succeeded) { + return Status::Aborted("Failed to register periodic task"); + } + auto result = tasks_map_.try_emplace( + task_type, TaskInfo{unique_id, repeat_period_seconds}); + if (!result.second) { + return Status::Aborted("Failed to add periodic task"); + }; + return Status::OK(); +} + +Status PeriodicTaskScheduler::Unregister(PeriodicTaskType task_type) { + MutexLock l(&timer_mutex); + auto it = tasks_map_.find(task_type); + if (it != tasks_map_.end()) { + timer_->Cancel(it->second.name); + tasks_map_.erase(it); + } + if (!timer_->HasPendingTask()) { + timer_->Shutdown(); + } + return Status::OK(); +} + +Timer* PeriodicTaskScheduler::Default() { + static Timer timer(SystemClock::Default().get()); + return &timer; +} + +#ifndef NDEBUG +void PeriodicTaskScheduler::TEST_OverrideTimer(SystemClock* clock) { + static Timer test_timer(clock); + test_timer.TEST_OverrideTimer(clock); + MutexLock l(&timer_mutex); + timer_ = &test_timer; +} +#endif // NDEBUG + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/db/periodic_task_scheduler.h b/db/periodic_task_scheduler.h new file mode 100644 index 000000000..f45b80c4d --- /dev/null +++ b/db/periodic_task_scheduler.h @@ -0,0 +1,110 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include "util/timer.h" + +namespace ROCKSDB_NAMESPACE { +class SystemClock; + +using PeriodicTaskFunc = std::function; + +constexpr uint64_t kInvalidPeriodSec = 0; + +// List of task types +enum class PeriodicTaskType : uint8_t { + kDumpStats = 0, + kPersistStats, + kFlushInfoLog, + kRecordSeqnoTime, + kMax, +}; + +// PeriodicTaskScheduler contains the periodic task scheduled from the DB +// instance. It's used to schedule/unschedule DumpStats(), PersistStats(), +// FlushInfoLog(), etc. Each type of the task can only have one instance, +// re-register the same task type would only update the repeat period. +// +// Internally, it uses a global single threaded timer object to run the periodic +// task functions. Timer thread will always be started since the info log +// flushing cannot be disabled. +class PeriodicTaskScheduler { + public: + explicit PeriodicTaskScheduler() = default; + + PeriodicTaskScheduler(const PeriodicTaskScheduler&) = delete; + PeriodicTaskScheduler(PeriodicTaskScheduler&&) = delete; + PeriodicTaskScheduler& operator=(const PeriodicTaskScheduler&) = delete; + PeriodicTaskScheduler& operator=(PeriodicTaskScheduler&&) = delete; + + // Register a task with its default repeat period + Status Register(PeriodicTaskType task_type, const PeriodicTaskFunc& fn); + + // Register a task with specified repeat period. 0 is an invalid argument + // (kInvalidPeriodSec). To stop the task, please use Unregister() specifically + Status Register(PeriodicTaskType task_type, const PeriodicTaskFunc& fn, + uint64_t repeat_period_seconds); + + // Unregister the task + Status Unregister(PeriodicTaskType task_type); + +#ifndef NDEBUG + // Override the timer for the unittest + void TEST_OverrideTimer(SystemClock* clock); + + // Call Timer TEST_WaitForRun() which wait until Timer starting waiting. + void TEST_WaitForRun(const std::function& callback) const { + if (timer_ != nullptr) { + timer_->TEST_WaitForRun(callback); + } + } + + // Get global valid task number in the Timer + size_t TEST_GetValidTaskNum() const { + if (timer_ != nullptr) { + return timer_->TEST_GetPendingTaskNum(); + } + return 0; + } + + // If it has the specified task type registered + bool TEST_HasTask(PeriodicTaskType task_type) const { + auto it = tasks_map_.find(task_type); + return it != tasks_map_.end(); + } +#endif // NDEBUG + + private: + // default global Timer instance + static Timer* Default(); + + // Internal structure to store task information + struct TaskInfo { + TaskInfo(std::string _name, uint64_t _repeat_every_sec) + : name(std::move(_name)), repeat_every_sec(_repeat_every_sec) {} + std::string name; + uint64_t repeat_every_sec; + }; + + // Internal tasks map + std::map tasks_map_; + + // Global timer pointer, which doesn't support synchronous add/cancel tasks + // so having a global `timer_mutex` for add/cancel task. + Timer* timer_ = Default(); + + // Global task id, protected by the global `timer_mutex` + inline static uint64_t id_; + + static constexpr uint64_t kMicrosInSecond = 1000U * 1000U; +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/db/periodic_work_scheduler_test.cc b/db/periodic_task_scheduler_test.cc similarity index 76% rename from db/periodic_work_scheduler_test.cc rename to db/periodic_task_scheduler_test.cc index 04771035c..b0922c0f8 100644 --- a/db/periodic_work_scheduler_test.cc +++ b/db/periodic_task_scheduler_test.cc @@ -1,9 +1,10 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// Copyright (c) Meta Platforms, Inc. and affiliates. +// // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -#include "db/periodic_work_scheduler.h" +#include "db/periodic_task_scheduler.h" #include "db/db_test_util.h" #include "env/composite_env_wrapper.h" @@ -12,10 +13,10 @@ namespace ROCKSDB_NAMESPACE { #ifndef ROCKSDB_LITE -class PeriodicWorkSchedulerTest : public DBTestBase { +class PeriodicTaskSchedulerTest : public DBTestBase { public: - PeriodicWorkSchedulerTest() - : DBTestBase("periodic_work_scheduler_test", /*env_do_fsync=*/true) { + PeriodicTaskSchedulerTest() + : DBTestBase("periodic_task_scheduler_test", /*env_do_fsync=*/true) { mock_clock_ = std::make_shared(env_->GetSystemClock()); mock_env_.reset(new CompositeEnvWrapper(env_, mock_clock_)); } @@ -27,18 +28,16 @@ class PeriodicWorkSchedulerTest : public DBTestBase { void SetUp() override { mock_clock_->InstallTimedWaitFixCallback(); SyncPoint::GetInstance()->SetCallBack( - "DBImpl::StartPeriodicWorkScheduler:Init", [&](void* arg) { - auto* periodic_work_scheduler_ptr = - reinterpret_cast(arg); - *periodic_work_scheduler_ptr = - PeriodicWorkTestScheduler::Default(mock_clock_); + "DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) { + auto periodic_task_scheduler_ptr = + reinterpret_cast(arg); + periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get()); }); } }; -TEST_F(PeriodicWorkSchedulerTest, Basic) { - constexpr unsigned int kPeriodSec = - PeriodicWorkScheduler::kDefaultFlushInfoLogPeriodSec; +TEST_F(PeriodicTaskSchedulerTest, Basic) { + constexpr unsigned int kPeriodSec = 10; Close(); Options options; options.stats_dump_period_sec = kPeriodSec; @@ -66,26 +65,26 @@ TEST_F(PeriodicWorkSchedulerTest, Basic) { ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_persist_period_sec); ASSERT_GT(kPeriodSec, 1u); - dbfull()->TEST_WaitForPeridicWorkerRun([&] { + dbfull()->TEST_WaitForPeridicTaskRun([&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec) - 1); }); - auto scheduler = dbfull()->TEST_GetPeriodicWorkScheduler(); - ASSERT_NE(nullptr, scheduler); - ASSERT_EQ(3, scheduler->TEST_GetValidTaskNum()); + const PeriodicTaskScheduler& scheduler = + dbfull()->TEST_GetPeriodicTaskScheduler(); + ASSERT_EQ(3, scheduler.TEST_GetValidTaskNum()); ASSERT_EQ(1, dump_st_counter); ASSERT_EQ(1, pst_st_counter); ASSERT_EQ(1, flush_info_log_counter); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec)); }); ASSERT_EQ(2, dump_st_counter); ASSERT_EQ(2, pst_st_counter); ASSERT_EQ(2, flush_info_log_counter); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec)); }); ASSERT_EQ(3, dump_st_counter); @@ -99,25 +98,22 @@ TEST_F(PeriodicWorkSchedulerTest, Basic) { ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec); // Info log flush should still run. - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec)); }); ASSERT_EQ(3, dump_st_counter); ASSERT_EQ(3, pst_st_counter); ASSERT_EQ(4, flush_info_log_counter); - scheduler = dbfull()->TEST_GetPeriodicWorkScheduler(); - ASSERT_EQ(1u, scheduler->TEST_GetValidTaskNum()); + ASSERT_EQ(1u, scheduler.TEST_GetValidTaskNum()); // Re-enable one task ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "5"}})); ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec); ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec); - scheduler = dbfull()->TEST_GetPeriodicWorkScheduler(); - ASSERT_NE(nullptr, scheduler); - ASSERT_EQ(2, scheduler->TEST_GetValidTaskNum()); + ASSERT_EQ(2, scheduler.TEST_GetValidTaskNum()); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec)); }); ASSERT_EQ(4, dump_st_counter); ASSERT_EQ(3, pst_st_counter); @@ -126,7 +122,7 @@ TEST_F(PeriodicWorkSchedulerTest, Basic) { Close(); } -TEST_F(PeriodicWorkSchedulerTest, MultiInstances) { +TEST_F(PeriodicTaskSchedulerTest, MultiInstances) { constexpr int kPeriodSec = 5; const int kInstanceNum = 10; @@ -153,23 +149,24 @@ TEST_F(PeriodicWorkSchedulerTest, MultiInstances) { } auto dbi = static_cast_with_check(dbs[kInstanceNum - 1]); - auto scheduler = dbi->TEST_GetPeriodicWorkScheduler(); - ASSERT_EQ(kInstanceNum * 3, scheduler->TEST_GetValidTaskNum()); + + const PeriodicTaskScheduler& scheduler = dbi->TEST_GetPeriodicTaskScheduler(); + ASSERT_EQ(kInstanceNum * 3, scheduler.TEST_GetValidTaskNum()); int expected_run = kInstanceNum; - dbi->TEST_WaitForPeridicWorkerRun( + dbi->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); ASSERT_EQ(expected_run, dump_st_counter); ASSERT_EQ(expected_run, pst_st_counter); expected_run += kInstanceNum; - dbi->TEST_WaitForPeridicWorkerRun( + dbi->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_EQ(expected_run, dump_st_counter); ASSERT_EQ(expected_run, pst_st_counter); expected_run += kInstanceNum; - dbi->TEST_WaitForPeridicWorkerRun( + dbi->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_EQ(expected_run, dump_st_counter); ASSERT_EQ(expected_run, pst_st_counter); @@ -181,9 +178,9 @@ TEST_F(PeriodicWorkSchedulerTest, MultiInstances) { expected_run += (kInstanceNum - half) * 2; - dbi->TEST_WaitForPeridicWorkerRun( + dbi->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); - dbi->TEST_WaitForPeridicWorkerRun( + dbi->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_EQ(expected_run, dump_st_counter); ASSERT_EQ(expected_run, pst_st_counter); @@ -194,7 +191,7 @@ TEST_F(PeriodicWorkSchedulerTest, MultiInstances) { } } -TEST_F(PeriodicWorkSchedulerTest, MultiEnv) { +TEST_F(PeriodicTaskSchedulerTest, MultiEnv) { constexpr int kDumpPeriodSec = 5; constexpr int kPersistPeriodSec = 10; Close(); @@ -217,15 +214,12 @@ TEST_F(PeriodicWorkSchedulerTest, MultiEnv) { std::string dbname = test::PerThreadDBPath("multi_env_test"); DB* db; ASSERT_OK(DB::Open(options2, dbname, &db)); - DBImpl* dbi = static_cast_with_check(db); - - ASSERT_EQ(dbi->TEST_GetPeriodicWorkScheduler(), - dbfull()->TEST_GetPeriodicWorkScheduler()); ASSERT_OK(db->Close()); delete db; Close(); } + #endif // !ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/db/periodic_work_scheduler.cc b/db/periodic_work_scheduler.cc deleted file mode 100644 index 4763f63b4..000000000 --- a/db/periodic_work_scheduler.cc +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). - -#include "db/periodic_work_scheduler.h" - -#include "db/db_impl/db_impl.h" -#include "rocksdb/system_clock.h" - -#ifndef ROCKSDB_LITE -namespace ROCKSDB_NAMESPACE { - -const std::string PeriodicWorkTaskNames::kDumpStats = "dump_st"; -const std::string PeriodicWorkTaskNames::kPersistStats = "pst_st"; -const std::string PeriodicWorkTaskNames::kFlushInfoLog = "flush_info_log"; -const std::string PeriodicWorkTaskNames::kRecordSeqnoTime = "record_seq_time"; - -PeriodicWorkScheduler::PeriodicWorkScheduler( - const std::shared_ptr& clock) { - timer = std::unique_ptr(new Timer(clock.get())); -} - -Status PeriodicWorkScheduler::Register(DBImpl* dbi, - unsigned int stats_dump_period_sec, - unsigned int stats_persist_period_sec) { - MutexLock l(&timer_mu_); - static std::atomic initial_delay(0); - timer->Start(); - if (stats_dump_period_sec > 0) { - bool succeeded = timer->Add( - [dbi]() { dbi->DumpStats(); }, - GetTaskName(dbi, PeriodicWorkTaskNames::kDumpStats), - initial_delay.fetch_add(1) % - static_cast(stats_dump_period_sec) * kMicrosInSecond, - static_cast(stats_dump_period_sec) * kMicrosInSecond); - if (!succeeded) { - return Status::Aborted("Unable to add periodic task DumpStats"); - } - } - if (stats_persist_period_sec > 0) { - bool succeeded = timer->Add( - [dbi]() { dbi->PersistStats(); }, - GetTaskName(dbi, PeriodicWorkTaskNames::kPersistStats), - initial_delay.fetch_add(1) % - static_cast(stats_persist_period_sec) * kMicrosInSecond, - static_cast(stats_persist_period_sec) * kMicrosInSecond); - if (!succeeded) { - return Status::Aborted("Unable to add periodic task PersistStats"); - } - } - bool succeeded = - timer->Add([dbi]() { dbi->FlushInfoLog(); }, - GetTaskName(dbi, PeriodicWorkTaskNames::kFlushInfoLog), - initial_delay.fetch_add(1) % kDefaultFlushInfoLogPeriodSec * - kMicrosInSecond, - kDefaultFlushInfoLogPeriodSec * kMicrosInSecond); - if (!succeeded) { - return Status::Aborted("Unable to add periodic task FlushInfoLog"); - } - return Status::OK(); -} - -Status PeriodicWorkScheduler::RegisterRecordSeqnoTimeWorker( - DBImpl* dbi, uint64_t record_cadence_sec) { - MutexLock l(&timer_mu_); - timer->Start(); - static std::atomic_uint64_t initial_delay(0); - bool succeeded = timer->Add( - [dbi]() { dbi->RecordSeqnoToTimeMapping(); }, - GetTaskName(dbi, PeriodicWorkTaskNames::kRecordSeqnoTime), - initial_delay.fetch_add(1) % record_cadence_sec * kMicrosInSecond, - record_cadence_sec * kMicrosInSecond); - if (!succeeded) { - return Status::NotSupported( - "Updating seqno to time worker cadence is not supported yet"); - } - return Status::OK(); -} - -void PeriodicWorkScheduler::UnregisterRecordSeqnoTimeWorker(DBImpl* dbi) { - MutexLock l(&timer_mu_); - timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kRecordSeqnoTime)); - if (!timer->HasPendingTask()) { - timer->Shutdown(); - } -} - -void PeriodicWorkScheduler::Unregister(DBImpl* dbi) { - MutexLock l(&timer_mu_); - timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kDumpStats)); - timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kPersistStats)); - timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kFlushInfoLog)); - if (!timer->HasPendingTask()) { - timer->Shutdown(); - } -} - -PeriodicWorkScheduler* PeriodicWorkScheduler::Default() { - // Always use the default SystemClock for the scheduler, as we only use the - // NowMicros which is the same for all clocks. The Env could only be - // overridden in test. - static PeriodicWorkScheduler scheduler(SystemClock::Default()); - return &scheduler; -} - -std::string PeriodicWorkScheduler::GetTaskName( - const DBImpl* dbi, const std::string& func_name) const { - std::string db_session_id; - // TODO: Should this error be ignored? - dbi->GetDbSessionId(db_session_id).PermitUncheckedError(); - return db_session_id + ":" + func_name; -} - -#ifndef NDEBUG - -// Get the static scheduler. For a new SystemClock, it needs to re-create the -// internal timer, so only re-create it when there's no running task. Otherwise, -// return the existing scheduler. Which means if the unittest needs to update -// MockClock, Close all db instances and then re-open them. -PeriodicWorkTestScheduler* PeriodicWorkTestScheduler::Default( - const std::shared_ptr& clock) { - static PeriodicWorkTestScheduler scheduler(clock); - static port::Mutex mutex; - { - MutexLock l(&mutex); - if (scheduler.timer.get() != nullptr && - scheduler.timer->TEST_GetPendingTaskNum() == 0) { - { - MutexLock timer_mu_guard(&scheduler.timer_mu_); - scheduler.timer->Shutdown(); - } - scheduler.timer.reset(new Timer(clock.get())); - } - } - return &scheduler; -} - -void PeriodicWorkTestScheduler::TEST_WaitForRun( - std::function callback) const { - if (timer != nullptr) { - timer->TEST_WaitForRun(callback); - } -} - -size_t PeriodicWorkTestScheduler::TEST_GetValidTaskNum() const { - if (timer != nullptr) { - return timer->TEST_GetPendingTaskNum(); - } - return 0; -} - -bool PeriodicWorkTestScheduler::TEST_HasValidTask( - const DBImpl* dbi, const std::string& func_name) const { - if (timer == nullptr) { - return false; - } - return timer->TEST_HasVaildTask(GetTaskName(dbi, func_name)); -} - -PeriodicWorkTestScheduler::PeriodicWorkTestScheduler( - const std::shared_ptr& clock) - : PeriodicWorkScheduler(clock) {} - -#endif // !NDEBUG -} // namespace ROCKSDB_NAMESPACE - -#endif // ROCKSDB_LITE diff --git a/db/periodic_work_scheduler.h b/db/periodic_work_scheduler.h deleted file mode 100644 index acfb24396..000000000 --- a/db/periodic_work_scheduler.h +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). - -#pragma once - -#ifndef ROCKSDB_LITE - -#include "db/db_impl/db_impl.h" -#include "util/timer.h" - -namespace ROCKSDB_NAMESPACE { -class SystemClock; - -// PeriodicWorkScheduler is a singleton object, which is scheduling/running -// DumpStats(), PersistStats(), and FlushInfoLog() for all DB instances. All DB -// instances use the same object from `Default()`. -// -// Internally, it uses a single threaded timer object to run the periodic work -// functions. Timer thread will always be started since the info log flushing -// cannot be disabled. -class PeriodicWorkScheduler { - public: - static PeriodicWorkScheduler* Default(); - - PeriodicWorkScheduler() = delete; - PeriodicWorkScheduler(const PeriodicWorkScheduler&) = delete; - PeriodicWorkScheduler(PeriodicWorkScheduler&&) = delete; - PeriodicWorkScheduler& operator=(const PeriodicWorkScheduler&) = delete; - PeriodicWorkScheduler& operator=(PeriodicWorkScheduler&&) = delete; - - Status Register(DBImpl* dbi, unsigned int stats_dump_period_sec, - unsigned int stats_persist_period_sec); - Status RegisterRecordSeqnoTimeWorker(DBImpl* dbi, uint64_t record_cadence); - - void Unregister(DBImpl* dbi); - void UnregisterRecordSeqnoTimeWorker(DBImpl* dbi); - - // Periodically flush info log out of application buffer at a low frequency. - // This improves debuggability in case of RocksDB hanging since it ensures the - // log messages leading up to the hang will eventually become visible in the - // log. - static const uint64_t kDefaultFlushInfoLogPeriodSec = 10; - - protected: - std::unique_ptr timer; - // `timer_mu_` serves two purposes currently: - // (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as - // they are currently not implemented in a thread-safe way; and - // (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and - // the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically. - port::Mutex timer_mu_; - - explicit PeriodicWorkScheduler(const std::shared_ptr& clock); - - // Get the unique task name (prefix with db session id) - std::string GetTaskName(const DBImpl* dbi, - const std::string& func_name) const; -}; - -#ifndef NDEBUG -// PeriodicWorkTestScheduler is for unittest, which can specify the SystemClock -// It also contains functions for unittest. -class PeriodicWorkTestScheduler : public PeriodicWorkScheduler { - public: - static PeriodicWorkTestScheduler* Default( - const std::shared_ptr& clock); - - void TEST_WaitForRun(std::function callback) const; - - size_t TEST_GetValidTaskNum() const; - - bool TEST_HasValidTask(const DBImpl* dbi, const std::string& func_name) const; - - private: - explicit PeriodicWorkTestScheduler(const std::shared_ptr& clock); -}; -#endif // !NDEBUG - -struct PeriodicWorkTaskNames { - static const std::string kDumpStats; - static const std::string kPersistStats; - static const std::string kFlushInfoLog; - static const std::string kRecordSeqnoTime; -}; - -} // namespace ROCKSDB_NAMESPACE - -#endif // ROCKSDB_LITE diff --git a/db/seqno_time_test.cc b/db/seqno_time_test.cc index c9feac766..ad0ac5f8a 100644 --- a/db/seqno_time_test.cc +++ b/db/seqno_time_test.cc @@ -5,7 +5,7 @@ // (found in the LICENSE.Apache file in the root directory). #include "db/db_test_util.h" -#include "db/periodic_work_scheduler.h" +#include "db/periodic_task_scheduler.h" #include "db/seqno_to_time_mapping.h" #include "port/stack_trace.h" #include "rocksdb/iostats_context.h" @@ -29,11 +29,10 @@ class SeqnoTimeTest : public DBTestBase { void SetUp() override { mock_clock_->InstallTimedWaitFixCallback(); SyncPoint::GetInstance()->SetCallBack( - "DBImpl::StartPeriodicWorkScheduler:Init", [&](void* arg) { - auto* periodic_work_scheduler_ptr = - reinterpret_cast(arg); - *periodic_work_scheduler_ptr = - PeriodicWorkTestScheduler::Default(mock_clock_); + "DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) { + auto periodic_task_scheduler_ptr = + reinterpret_cast(arg); + periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get()); }); } @@ -80,7 +79,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { // pass some time first, otherwise the first a few keys write time are going // to be zero, and internally zero has special meaning: kUnknownSeqnoTime - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kKeyPerSec)); }); int sst_num = 0; @@ -88,7 +87,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { for (; sst_num < kNumTrigger; sst_num++) { for (int i = 0; i < kNumKeys; i++) { ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun([&] { + dbfull()->TEST_WaitForPeridicTaskRun([&] { mock_clock_->MockSleepForSeconds(static_cast(kKeyPerSec)); }); } @@ -110,7 +109,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { for (; sst_num < kNumTrigger * 2; sst_num++) { for (int i = 0; i < kNumKeys; i++) { ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun([&] { + dbfull()->TEST_WaitForPeridicTaskRun([&] { mock_clock_->MockSleepForSeconds(static_cast(kKeyPerSec)); }); } @@ -124,7 +123,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { for (; sst_num < kNumTrigger * 3; sst_num++) { for (int i = 0; i < kNumKeys; i++) { ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun([&] { + dbfull()->TEST_WaitForPeridicTaskRun([&] { mock_clock_->MockSleepForSeconds(static_cast(kKeyPerSec)); }); } @@ -142,36 +141,31 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { // the first a few key should be cold AssertKetTemperature(20, Temperature::kCold); - // Wait some time, each time after compaction, the cold data size is - // increasing and hot data size is decreasing for (int i = 0; i < 30; i++) { - dbfull()->TEST_WaitForPeridicWorkerRun([&] { + dbfull()->TEST_WaitForPeridicTaskRun([&] { mock_clock_->MockSleepForSeconds(static_cast(20 * kKeyPerSec)); }); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); - uint64_t pre_hot = hot_data_size; - uint64_t pre_cold = cold_data_size; - hot_data_size = GetSstSizeHelper(Temperature::kUnknown); - cold_data_size = GetSstSizeHelper(Temperature::kCold); - ASSERT_LT(hot_data_size, pre_hot); - ASSERT_GT(cold_data_size, pre_cold); // the hot/cold data cut off range should be between i * 20 + 200 -> 250 AssertKetTemperature(i * 20 + 250, Temperature::kUnknown); AssertKetTemperature(i * 20 + 200, Temperature::kCold); } - // Wait again, all data should be cold after that + ASSERT_LT(GetSstSizeHelper(Temperature::kUnknown), hot_data_size); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), cold_data_size); + + // Wait again, the most of the data should be cold after that + // but it may not be all cold, because if there's no new data write to SST, + // the compaction will not get the new seqno->time sampling to decide the last + // a few data's time. for (int i = 0; i < 5; i++) { - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(1000)); }); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); } - ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); - ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); - - // any random data should be cold + // any random data close to the end should be cold AssertKetTemperature(1000, Temperature::kCold); // close explicitly, because the env is local variable which will be released @@ -197,7 +191,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { // pass some time first, otherwise the first a few keys write time are going // to be zero, and internally zero has special meaning: kUnknownSeqnoTime - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); int sst_num = 0; @@ -205,7 +199,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { for (; sst_num < 4; sst_num++) { for (int i = 0; i < kNumKeys; i++) { ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } ASSERT_OK(Flush()); @@ -227,7 +221,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { for (; sst_num < 14; sst_num++) { for (int i = 0; i < kNumKeys; i++) { ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } ASSERT_OK(Flush()); @@ -248,7 +242,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { // Wait some time, with each wait, the cold data is increasing and hot data is // decreasing for (int i = 0; i < 30; i++) { - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(200)); }); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); uint64_t pre_hot = hot_data_size; @@ -263,17 +257,16 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { AssertKetTemperature(i * 20 + 400, Temperature::kCold); } - // Wait again, all data should be cold after that + // Wait again, the most of the data should be cold after that + // hot data might not be empty, because if we don't write new data, there's + // no seqno->time sampling available to the compaction for (int i = 0; i < 5; i++) { - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(1000)); }); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); } - ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); - ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); - - // any random data should be cold + // any random data close to the end should be cold AssertKetTemperature(1000, Temperature::kCold); Close(); @@ -291,7 +284,7 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { // Write a key every 10 seconds for (int i = 0; i < 200; i++) { ASSERT_OK(Put(Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } ASSERT_OK(Flush()); @@ -322,7 +315,7 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { // Write a key every 1 seconds for (int i = 0; i < 200; i++) { ASSERT_OK(Put(Key(i + 190), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(1)); }); } seq_end = dbfull()->GetLatestSequenceNumber(); @@ -358,7 +351,7 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { // Write a key every 200 seconds for (int i = 0; i < 200; i++) { ASSERT_OK(Put(Key(i + 380), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(200)); }); } seq_end = dbfull()->GetLatestSequenceNumber(); @@ -400,7 +393,7 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { // Write a key every 100 seconds for (int i = 0; i < 200; i++) { ASSERT_OK(Put(Key(i + 570), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } seq_end = dbfull()->GetLatestSequenceNumber(); @@ -464,9 +457,7 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { ASSERT_OK(db_->Close()); } -// TODO(zjay): Disabled, until New CF bug with preclude_last_level_data_seconds -// is fixed -TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { +TEST_F(SeqnoTimeTest, MultiCFs) { Options options = CurrentOptions(); options.preclude_last_level_data_seconds = 0; options.env = mock_env_.get(); @@ -474,14 +465,14 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { options.stats_persist_period_sec = 0; ReopenWithColumnFamilies({"default"}, options); - auto scheduler = dbfull()->TEST_GetPeriodicWorkScheduler(); - ASSERT_FALSE(scheduler->TEST_HasValidTask( - dbfull(), PeriodicWorkTaskNames::kRecordSeqnoTime)); + const PeriodicTaskScheduler& scheduler = + dbfull()->TEST_GetPeriodicTaskScheduler(); + ASSERT_FALSE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime)); // Write some data and increase the current time for (int i = 0; i < 200; i++) { ASSERT_OK(Put(Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } ASSERT_OK(Flush()); @@ -496,26 +487,20 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { Options options_1 = options; options_1.preclude_last_level_data_seconds = 10000; // 10k CreateColumnFamilies({"one"}, options_1); - ASSERT_TRUE(scheduler->TEST_HasValidTask( - dbfull(), PeriodicWorkTaskNames::kRecordSeqnoTime)); + ASSERT_TRUE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime)); // Write some data to the default CF (without preclude_last_level feature) for (int i = 0; i < 200; i++) { ASSERT_OK(Put(Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } ASSERT_OK(Flush()); - // in memory mapping won't increase because CFs with preclude_last_level - // feature doesn't have memtable - auto queue = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping(); - ASSERT_LT(queue.size(), 5); - // Write some data to the CF one for (int i = 0; i < 20; i++) { ASSERT_OK(Put(1, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } ASSERT_OK(Flush(1)); @@ -539,7 +524,7 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { // Add more data to CF "two" to fill the in memory mapping for (int i = 0; i < 2000; i++) { ASSERT_OK(Put(2, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } seqs = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping(); @@ -563,11 +548,10 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { // enabled have flushed, the in-memory seqno->time mapping should be cleared for (int i = 0; i < 10; i++) { ASSERT_OK(Put(0, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } seqs = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping(); - ASSERT_LE(seqs.size(), 5); ASSERT_OK(Flush(0)); // trigger compaction for CF "two" and make sure the compaction output has @@ -575,7 +559,7 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { for (int j = 0; j < 3; j++) { for (int i = 0; i < 200; i++) { ASSERT_OK(Put(2, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } ASSERT_OK(Flush(2)); @@ -595,7 +579,7 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { for (int j = 0; j < 2; j++) { for (int i = 0; i < 200; i++) { ASSERT_OK(Put(0, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } ASSERT_OK(Flush(0)); @@ -610,7 +594,7 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { // Write some data to CF "two", but don't flush to accumulate for (int i = 0; i < 1000; i++) { ASSERT_OK(Put(2, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } ASSERT_GE( @@ -630,8 +614,7 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { 0); // And the timer worker is stopped - ASSERT_FALSE(scheduler->TEST_HasValidTask( - dbfull(), PeriodicWorkTaskNames::kRecordSeqnoTime)); + ASSERT_FALSE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime)); Close(); } @@ -655,7 +638,7 @@ TEST_F(SeqnoTimeTest, MultiInstancesBasic) { WriteOptions wo; for (int i = 0; i < 200; i++) { ASSERT_OK(dbi->Put(wo, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } SeqnoToTimeMapping seqno_to_time_mapping = dbi->TEST_GetSeqnoToTimeMapping(); @@ -678,7 +661,7 @@ TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { for (int j = 0; j < 3; j++) { for (int i = 0; i < 100; i++) { ASSERT_OK(Put(Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } ASSERT_OK(Flush()); @@ -700,7 +683,7 @@ TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { // Trigger a compaction for (int i = 0; i < 100; i++) { ASSERT_OK(Put(Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } ASSERT_OK(Flush()); diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc index d968735e2..21ac786b4 100644 --- a/monitoring/stats_history_test.cc +++ b/monitoring/stats_history_test.cc @@ -15,7 +15,7 @@ #include "db/column_family.h" #include "db/db_impl/db_impl.h" #include "db/db_test_util.h" -#include "db/periodic_work_scheduler.h" +#include "db/periodic_task_scheduler.h" #include "monitoring/persistent_stats_history.h" #include "options/options_helper.h" #include "port/stack_trace.h" @@ -44,11 +44,10 @@ class StatsHistoryTest : public DBTestBase { void SetUp() override { mock_clock_->InstallTimedWaitFixCallback(); SyncPoint::GetInstance()->SetCallBack( - "DBImpl::StartPeriodicWorkScheduler:Init", [&](void* arg) { - auto* periodic_work_scheduler_ptr = - reinterpret_cast(arg); - *periodic_work_scheduler_ptr = - PeriodicWorkTestScheduler::Default(mock_clock_); + "DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) { + auto periodic_task_scheduler_ptr = + reinterpret_cast(arg); + periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get()); }); } }; @@ -67,10 +66,10 @@ TEST_F(StatsHistoryTest, RunStatsDumpPeriodSec) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_GE(counter, 1); @@ -99,17 +98,17 @@ TEST_F(StatsHistoryTest, StatsPersistScheduling) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_GE(counter, 1); // Test cancel job through SetOptions ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); int old_val = counter; - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec * 2); }); ASSERT_EQ(counter, old_val); @@ -131,7 +130,7 @@ TEST_F(StatsHistoryTest, PersistentStatsFreshInstall) { {{"stats_persist_period_sec", std::to_string(kPeriodSec)}})); ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_persist_period_sec); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_GE(counter, 1); Close(); @@ -150,11 +149,11 @@ TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) { ReopenWithColumnFamilies({"default", "pikachu"}, options); // make sure the first stats persist to finish - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); // Wait for stats persist to finish - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); std::unique_ptr stats_iter; @@ -172,7 +171,7 @@ TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) { ASSERT_GT(stats_count, 0); // Wait a bit and verify no more stats are found for (int i = 0; i < 10; ++i) { - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(1); }); } ASSERT_OK(db_->GetStatsHistory(0, mock_clock_->NowSeconds(), &stats_iter)); @@ -227,7 +226,7 @@ TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { const int kIterations = 10; for (int i = 0; i < kIterations; ++i) { - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); } @@ -251,7 +250,7 @@ TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { // Wait for stats persist to finish for (int i = 0; i < kIterations; ++i) { - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); } @@ -300,11 +299,11 @@ TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); // Wait for stats persist to finish - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); auto iter = @@ -312,14 +311,14 @@ TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { int key_count1 = countkeys(iter); delete iter; - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); int key_count2 = countkeys(iter); delete iter; - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); @@ -393,32 +392,32 @@ TEST_F(StatsHistoryTest, PersitentStatsVerifyValue) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); // Wait for stats persist to finish - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); auto iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); countkeys(iter); delete iter; - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); countkeys(iter); delete iter; - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); countkeys(iter); delete iter; - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); std::map stats_map_after; @@ -482,10 +481,10 @@ TEST_F(StatsHistoryTest, PersistentStatsCreateColumnFamilies) { ASSERT_EQ(Get(2, "foo"), "bar"); // make sure the first stats persist to finish - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); auto iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); @@ -582,7 +581,7 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); ColumnFamilyData* cfd_default = @@ -601,7 +600,7 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { ASSERT_OK(Put(1, "Eevee", "v0")); ASSERT_EQ("v0", Get(1, "Eevee")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); // writing to all three cf, flush default cf // LogNumbers: default: 16, stats: 10, pikachu: 5 @@ -630,7 +629,7 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { ASSERT_EQ("v2", Get("bar2")); ASSERT_EQ("v2", Get("foo2")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); // writing to default and stats cf, flushing default cf // LogNumbers: default: 19, stats: 19, pikachu: 19 @@ -645,7 +644,7 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { ASSERT_OK(Put(1, "Jolteon", "v3")); ASSERT_EQ("v3", Get(1, "Jolteon")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); // writing to all three cf, flushing test cf // LogNumbers: default: 19, stats: 19, pikachu: 22 diff --git a/src.mk b/src.mk index c8b470213..a39f9a7f4 100644 --- a/src.mk +++ b/src.mk @@ -74,7 +74,7 @@ LIB_SOURCES = \ db/merge_helper.cc \ db/merge_operator.cc \ db/output_validator.cc \ - db/periodic_work_scheduler.cc \ + db/periodic_task_scheduler.cc \ db/range_del_aggregator.cc \ db/range_tombstone_fragmenter.cc \ db/repair.cc \ @@ -501,7 +501,7 @@ TEST_MAIN_SOURCES = \ db/obsolete_files_test.cc \ db/options_file_test.cc \ db/perf_context_test.cc \ - db/periodic_work_scheduler_test.cc \ + db/periodic_task_scheduler_test.cc \ db/plain_table_db_test.cc \ db/prefix_test.cc \ db/repair_test.cc \ diff --git a/util/timer.h b/util/timer.h index 4b9ab668e..c2fbf869b 100644 --- a/util/timer.h +++ b/util/timer.h @@ -187,10 +187,9 @@ class Timer { return ret; } - bool TEST_HasVaildTask(const std::string& func_name) const { + void TEST_OverrideTimer(SystemClock* clock) { InstrumentedMutexLock l(&mutex_); - auto it = map_.find(func_name); - return it != map_.end() && it->second->IsValid(); + clock_ = clock; } #endif // NDEBUG