diff --git a/CMakeLists.txt b/CMakeLists.txt index 6bfe8605e..e9e506951 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -668,7 +668,7 @@ set(SOURCES db/merge_helper.cc db/merge_operator.cc db/output_validator.cc - db/periodic_work_scheduler.cc + db/periodic_task_scheduler.cc db/range_del_aggregator.cc db/range_tombstone_fragmenter.cc db/repair.cc @@ -1297,7 +1297,7 @@ if(WITH_TESTS) db/merge_test.cc db/options_file_test.cc db/perf_context_test.cc - db/periodic_work_scheduler_test.cc + db/periodic_task_scheduler_test.cc db/plain_table_db_test.cc db/seqno_time_test.cc db/prefix_test.cc diff --git a/HISTORY.md b/HISTORY.md index e76975a99..261ffb012 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Bug Fixes * Fixed bug where `FlushWAL(true /* sync */)` (used by `GetLiveFilesStorageInfo()`, which is used by checkpoint and backup) could cause parallel writes at the tail of a WAL file to never be synced. +* Fix periodic_task unable to re-register the same task type, which may cause `SetOptions()` fail to update periodical_task time like: `stats_dump_period_sec`, `stats_persist_period_sec`. ### Public API changes * Add `rocksdb_column_family_handle_get_id`, `rocksdb_column_family_handle_get_name` to get name, id of column family in C API diff --git a/Makefile b/Makefile index 4a39fe09a..c7662a6ce 100644 --- a/Makefile +++ b/Makefile @@ -1882,7 +1882,7 @@ blob_garbage_meter_test: $(OBJ_DIR)/db/blob/blob_garbage_meter_test.o $(TEST_LIB timer_test: $(OBJ_DIR)/util/timer_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) -periodic_work_scheduler_test: $(OBJ_DIR)/db/periodic_work_scheduler_test.o $(TEST_LIBRARY) $(LIBRARY) +periodic_task_scheduler_test: $(OBJ_DIR)/db/periodic_task_scheduler_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY) diff --git a/TARGETS b/TARGETS index 3d0033d87..71f32bc01 100644 --- a/TARGETS +++ b/TARGETS @@ -83,7 +83,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/merge_helper.cc", "db/merge_operator.cc", "db/output_validator.cc", - "db/periodic_work_scheduler.cc", + "db/periodic_task_scheduler.cc", "db/range_del_aggregator.cc", "db/range_tombstone_fragmenter.cc", "db/repair.cc", @@ -421,7 +421,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "db/merge_helper.cc", "db/merge_operator.cc", "db/output_validator.cc", - "db/periodic_work_scheduler.cc", + "db/periodic_task_scheduler.cc", "db/range_del_aggregator.cc", "db/range_tombstone_fragmenter.cc", "db/repair.cc", @@ -5600,8 +5600,8 @@ cpp_unittest_wrapper(name="perf_context_test", extra_compiler_flags=[]) -cpp_unittest_wrapper(name="periodic_work_scheduler_test", - srcs=["db/periodic_work_scheduler_test.cc"], +cpp_unittest_wrapper(name="periodic_task_scheduler_test", + srcs=["db/periodic_task_scheduler_test.cc"], deps=[":rocksdb_test_lib"], extra_compiler_flags=[]) diff --git a/db/db_impl/compacted_db_impl.cc b/db/db_impl/compacted_db_impl.cc index e5c755bfb..c5947b06b 100644 --- a/db/db_impl/compacted_db_impl.cc +++ b/db/db_impl/compacted_db_impl.cc @@ -242,7 +242,7 @@ Status CompactedDBImpl::Open(const Options& options, std::unique_ptr db(new CompactedDBImpl(db_options, dbname)); Status s = db->Init(options); if (s.ok()) { - s = db->StartPeriodicWorkScheduler(); + s = db->StartPeriodicTaskScheduler(); } if (s.ok()) { ROCKS_LOG_INFO(db->immutable_db_options_.info_log, diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 74925a942..08fa75597 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -45,7 +45,7 @@ #include "db/memtable_list.h" #include "db/merge_context.h" #include "db/merge_helper.h" -#include "db/periodic_work_scheduler.h" +#include "db/periodic_task_scheduler.h" #include "db/range_tombstone_fragmenter.h" #include "db/table_cache.h" #include "db/table_properties_collector.h" @@ -217,7 +217,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, refitting_level_(false), opened_successfully_(false), #ifndef ROCKSDB_LITE - periodic_work_scheduler_(nullptr), + periodic_task_scheduler_(), #endif // ROCKSDB_LITE two_write_queues_(options.two_write_queues), manual_wal_flush_(options.manual_wal_flush), @@ -260,6 +260,18 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, SetDbSessionId(); assert(!db_session_id_.empty()); +#ifndef ROCKSDB_LITE + periodic_task_functions_.emplace(PeriodicTaskType::kDumpStats, + [this]() { this->DumpStats(); }); + periodic_task_functions_.emplace(PeriodicTaskType::kPersistStats, + [this]() { this->PersistStats(); }); + periodic_task_functions_.emplace(PeriodicTaskType::kFlushInfoLog, + [this]() { this->FlushInfoLog(); }); + periodic_task_functions_.emplace( + PeriodicTaskType::kRecordSeqnoTime, + [this]() { this->RecordSeqnoToTimeMapping(); }); +#endif // ROCKSDB_LITE + versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_, table_cache_.get(), write_buffer_manager_, &write_controller_, &block_cache_tracer_, @@ -480,9 +492,15 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { "Shutdown: canceling all background work"); #ifndef ROCKSDB_LITE - if (periodic_work_scheduler_ != nullptr) { - periodic_work_scheduler_->Unregister(this); - periodic_work_scheduler_->UnregisterRecordSeqnoTimeWorker(this); + for (uint8_t task_type = 0; + task_type < static_cast(PeriodicTaskType::kMax); task_type++) { + Status s = periodic_task_scheduler_.Unregister( + static_cast(task_type)); + if (!s.ok()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "Failed to unregister periodic task %d, status: %s", + task_type, s.ToString().c_str()); + } } #endif // !ROCKSDB_LITE @@ -767,30 +785,50 @@ void DBImpl::PrintStatistics() { } } -Status DBImpl::StartPeriodicWorkScheduler() { +Status DBImpl::StartPeriodicTaskScheduler() { #ifndef ROCKSDB_LITE #ifndef NDEBUG // It only used by test to disable scheduler bool disable_scheduler = false; TEST_SYNC_POINT_CALLBACK( - "DBImpl::StartPeriodicWorkScheduler:DisableScheduler", + "DBImpl::StartPeriodicTaskScheduler:DisableScheduler", &disable_scheduler); if (disable_scheduler) { return Status::OK(); } -#endif // !NDEBUG { InstrumentedMutexLock l(&mutex_); - periodic_work_scheduler_ = PeriodicWorkScheduler::Default(); - TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicWorkScheduler:Init", - &periodic_work_scheduler_); + TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicTaskScheduler:Init", + &periodic_task_scheduler_); } - return periodic_work_scheduler_->Register( - this, mutable_db_options_.stats_dump_period_sec, - mutable_db_options_.stats_persist_period_sec); +#endif // !NDEBUG + if (mutable_db_options_.stats_dump_period_sec > 0) { + Status s = periodic_task_scheduler_.Register( + PeriodicTaskType::kDumpStats, + periodic_task_functions_.at(PeriodicTaskType::kDumpStats), + mutable_db_options_.stats_dump_period_sec); + if (!s.ok()) { + return s; + } + } + if (mutable_db_options_.stats_persist_period_sec > 0) { + Status s = periodic_task_scheduler_.Register( + PeriodicTaskType::kPersistStats, + periodic_task_functions_.at(PeriodicTaskType::kPersistStats), + mutable_db_options_.stats_persist_period_sec); + if (!s.ok()) { + return s; + } + } + + Status s = periodic_task_scheduler_.Register( + PeriodicTaskType::kFlushInfoLog, + periodic_task_functions_.at(PeriodicTaskType::kFlushInfoLog)); + + return s; #else return Status::OK(); #endif // !ROCKSDB_LITE @@ -798,9 +836,6 @@ Status DBImpl::StartPeriodicWorkScheduler() { Status DBImpl::RegisterRecordSeqnoTimeWorker() { #ifndef ROCKSDB_LITE - if (!periodic_work_scheduler_) { - return Status::OK(); - } uint64_t min_time_duration = std::numeric_limits::max(); uint64_t max_time_duration = std::numeric_limits::min(); { @@ -828,26 +863,13 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker() { } Status s; - if (seqno_time_cadence != record_seqno_time_cadence_) { - if (seqno_time_cadence == 0) { - periodic_work_scheduler_->UnregisterRecordSeqnoTimeWorker(this); - } else { - s = periodic_work_scheduler_->RegisterRecordSeqnoTimeWorker( - this, seqno_time_cadence); - } - - if (s.ok()) { - record_seqno_time_cadence_ = seqno_time_cadence; - } - - if (s.IsNotSupported()) { - // TODO: Fix the timer cannot cancel and re-add the same task - ROCKS_LOG_WARN( - immutable_db_options_.info_log, - "Updating seqno to time worker cadence is not supported yet, to make " - "the change effective, please reopen the DB instance."); - s = Status::OK(); - } + if (seqno_time_cadence == 0) { + s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kRecordSeqnoTime); + } else { + s = periodic_task_scheduler_.Register( + PeriodicTaskType::kRecordSeqnoTime, + periodic_task_functions_.at(PeriodicTaskType::kRecordSeqnoTime), + seqno_time_cadence); } return s; @@ -1087,6 +1109,10 @@ void DBImpl::DumpStats() { PrintStatistics(); } +// Periodically flush info log out of application buffer at a low frequency. +// This improves debuggability in case of RocksDB hanging since it ensures the +// log messages leading up to the hang will eventually become visible in the +// log. void DBImpl::FlushInfoLog() { if (shutdown_initiated_) { return; @@ -1279,22 +1305,36 @@ Status DBImpl::SetDBOptions( MaybeScheduleFlushOrCompaction(); } - if (new_options.stats_dump_period_sec != - mutable_db_options_.stats_dump_period_sec || - new_options.stats_persist_period_sec != - mutable_db_options_.stats_persist_period_sec) { - mutex_.Unlock(); - periodic_work_scheduler_->Unregister(this); - s = periodic_work_scheduler_->Register( - this, new_options.stats_dump_period_sec, - new_options.stats_persist_period_sec); - mutex_.Lock(); + mutex_.Unlock(); + if (new_options.stats_dump_period_sec == 0) { + s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kDumpStats); + } else { + s = periodic_task_scheduler_.Register( + PeriodicTaskType::kDumpStats, + periodic_task_functions_.at(PeriodicTaskType::kDumpStats), + new_options.stats_dump_period_sec); } if (new_options.max_total_wal_size != mutable_db_options_.max_total_wal_size) { max_total_wal_size_.store(new_options.max_total_wal_size, std::memory_order_release); } + if (s.ok()) { + if (new_options.stats_persist_period_sec == 0) { + s = periodic_task_scheduler_.Unregister( + PeriodicTaskType::kPersistStats); + } else { + s = periodic_task_scheduler_.Register( + PeriodicTaskType::kPersistStats, + periodic_task_functions_.at(PeriodicTaskType::kPersistStats), + new_options.stats_persist_period_sec); + } + } + mutex_.Lock(); + if (!s.ok()) { + return s; + } + write_controller_.set_max_delayed_write_rate( new_options.delayed_write_rate); table_cache_.get()->SetCapacity(new_options.max_open_files == -1 @@ -3043,12 +3083,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, } // InstrumentedMutexLock l(&mutex_) if (cf_options.preclude_last_level_data_seconds > 0) { - // TODO(zjay): Fix the timer issue and re-enable this. - ROCKS_LOG_ERROR( - immutable_db_options_.info_log, - "Creating column family with `preclude_last_level_data_seconds` needs " - "to restart DB to take effect"); - // s = RegisterRecordSeqnoTimeWorker(); + s = RegisterRecordSeqnoTimeWorker(); } sv_context.Clean(); // this is outside the mutex diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 4481e81db..3053d64ac 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -32,6 +32,7 @@ #include "db/log_writer.h" #include "db/logs_with_prep_tracker.h" #include "db/memtable_list.h" +#include "db/periodic_task_scheduler.h" #include "db/post_memtable_callback.h" #include "db/pre_release_callback.h" #include "db/range_del_aggregator.h" @@ -75,10 +76,6 @@ class ArenaWrappedDBIter; class InMemoryStatsHistoryIterator; class MemTable; class PersistentStatsHistoryIterator; -class PeriodicWorkScheduler; -#ifndef NDEBUG -class PeriodicWorkTestScheduler; -#endif // !NDEBUG class TableCache; class TaskLimiterToken; class Version; @@ -1147,7 +1144,7 @@ class DBImpl : public DB { int TEST_BGCompactionsAllowed() const; int TEST_BGFlushesAllowed() const; size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; - void TEST_WaitForPeridicWorkerRun(std::function callback) const; + void TEST_WaitForPeridicTaskRun(std::function callback) const; SeqnoToTimeMapping TEST_GetSeqnoToTimeMapping() const; size_t TEST_EstimateInMemoryStatsHistorySize() const; @@ -1162,7 +1159,7 @@ class DBImpl : public DB { } #ifndef ROCKSDB_LITE - PeriodicWorkTestScheduler* TEST_GetPeriodicWorkScheduler() const; + const PeriodicTaskScheduler& TEST_GetPeriodicTaskScheduler() const; #endif // !ROCKSDB_LITE #endif // NDEBUG @@ -2069,7 +2066,7 @@ class DBImpl : public DB { LogBuffer* log_buffer); // Schedule background tasks - Status StartPeriodicWorkScheduler(); + Status StartPeriodicTaskScheduler(); Status RegisterRecordSeqnoTimeWorker(); @@ -2611,14 +2608,11 @@ class DBImpl : public DB { #ifndef ROCKSDB_LITE // Scheduler to run DumpStats(), PersistStats(), and FlushInfoLog(). - // Currently, it always use a global instance from - // PeriodicWorkScheduler::Default(). Only in unittest, it can be overrided by - // PeriodicWorkTestScheduler. - PeriodicWorkScheduler* periodic_work_scheduler_; - - // Current cadence of the periodic worker for recording sequence number to - // time. - uint64_t record_seqno_time_cadence_ = 0; + // Currently, internally it has a global timer instance for running the tasks. + PeriodicTaskScheduler periodic_task_scheduler_; + + // It contains the implementations for each periodic task. + std::map periodic_task_functions_; #endif // When set, we use a separate queue for writes that don't write to memtable. diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 0252a3524..52b0b67a2 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -12,7 +12,7 @@ #include "db/column_family.h" #include "db/db_impl/db_impl.h" #include "db/error_handler.h" -#include "db/periodic_work_scheduler.h" +#include "db/periodic_task_scheduler.h" #include "monitoring/thread_status_updater.h" #include "util/cast_util.h" @@ -302,16 +302,12 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize( } #ifndef ROCKSDB_LITE -void DBImpl::TEST_WaitForPeridicWorkerRun( - std::function callback) const { - if (periodic_work_scheduler_ != nullptr) { - static_cast(periodic_work_scheduler_) - ->TEST_WaitForRun(callback); - } +void DBImpl::TEST_WaitForPeridicTaskRun(std::function callback) const { + periodic_task_scheduler_.TEST_WaitForRun(callback); } -PeriodicWorkTestScheduler* DBImpl::TEST_GetPeriodicWorkScheduler() const { - return static_cast(periodic_work_scheduler_); +const PeriodicTaskScheduler& DBImpl::TEST_GetPeriodicTaskScheduler() const { + return periodic_task_scheduler_; } SeqnoToTimeMapping DBImpl::TEST_GetSeqnoToTimeMapping() const { diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index f85778fe0..9168e3ecb 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -11,7 +11,7 @@ #include "db/builder.h" #include "db/db_impl/db_impl.h" #include "db/error_handler.h" -#include "db/periodic_work_scheduler.h" +#include "db/periodic_task_scheduler.h" #include "env/composite_env_wrapper.h" #include "file/filename.h" #include "file/read_write_util.h" @@ -2105,7 +2105,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, "DB::Open() failed: %s", s.ToString().c_str()); } if (s.ok()) { - s = impl->StartPeriodicWorkScheduler(); + s = impl->StartPeriodicTaskScheduler(); } if (s.ok()) { diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 3fb0f99a1..7f031444a 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -695,10 +695,10 @@ TEST_P(DBSSTTestRateLimit, RateLimitedDelete) { *abs_time_us = Env::Default()->NowMicros(); }); - // Disable PeriodicWorkScheduler as it also has TimedWait, which could update + // Disable PeriodicTaskScheduler as it also has TimedWait, which could update // the simulated sleep time ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::StartPeriodicWorkScheduler:DisableScheduler", [&](void* arg) { + "DBImpl::StartPeriodicTaskScheduler:DisableScheduler", [&](void* arg) { bool* disable_scheduler = static_cast(arg); *disable_scheduler = true; }); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index a27357e4f..3b2aba22f 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -1592,7 +1592,8 @@ void InternalStats::DumpCFMapStats( int files = vstorage->NumLevelFiles(level); total_files += files; total_files_being_compacted += files_being_compacted[level]; - if (comp_stats_[level].micros > 0 || files > 0) { + if (comp_stats_[level].micros > 0 || comp_stats_[level].cpu_micros > 0 || + files > 0) { compaction_stats_sum->Add(comp_stats_[level]); total_file_size += vstorage->NumLevelBytes(level); uint64_t input_bytes; diff --git a/db/periodic_task_scheduler.cc b/db/periodic_task_scheduler.cc new file mode 100644 index 000000000..2024510dd --- /dev/null +++ b/db/periodic_task_scheduler.cc @@ -0,0 +1,113 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/periodic_task_scheduler.h" + +#include "rocksdb/system_clock.h" + +#ifndef ROCKSDB_LITE +namespace ROCKSDB_NAMESPACE { + +// `timer_mutex` is a global mutex serves 3 purposes currently: +// (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as +// they are currently not implemented in a thread-safe way; and +// (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and +// the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically. +// (3) protect tasks_map_ in PeriodicTaskScheduler +// Note: It's not efficient to have a static global mutex, for +// PeriodicTaskScheduler it should be okay, as the operations are called +// infrequently. +static port::Mutex timer_mutex; + +static const std::map kDefaultPeriodSeconds = { + {PeriodicTaskType::kDumpStats, kInvalidPeriodSec}, + {PeriodicTaskType::kPersistStats, kInvalidPeriodSec}, + {PeriodicTaskType::kFlushInfoLog, 10}, + {PeriodicTaskType::kRecordSeqnoTime, kInvalidPeriodSec}, +}; + +static const std::map kPeriodicTaskTypeNames = { + {PeriodicTaskType::kDumpStats, "dump_st"}, + {PeriodicTaskType::kPersistStats, "pst_st"}, + {PeriodicTaskType::kFlushInfoLog, "flush_info_log"}, + {PeriodicTaskType::kRecordSeqnoTime, "record_seq_time"}, +}; + +Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type, + const PeriodicTaskFunc& fn) { + return Register(task_type, fn, kDefaultPeriodSeconds.at(task_type)); +} + +Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type, + const PeriodicTaskFunc& fn, + uint64_t repeat_period_seconds) { + MutexLock l(&timer_mutex); + static std::atomic initial_delay(0); + + if (repeat_period_seconds == kInvalidPeriodSec) { + return Status::InvalidArgument("Invalid task repeat period"); + } + auto it = tasks_map_.find(task_type); + if (it != tasks_map_.end()) { + // the task already exists and it's the same, no update needed + if (it->second.repeat_every_sec == repeat_period_seconds) { + return Status::OK(); + } + // cancel the existing one before register new one + timer_->Cancel(it->second.name); + tasks_map_.erase(it); + } + + timer_->Start(); + // put task type name as prefix, for easy debug + std::string unique_id = + kPeriodicTaskTypeNames.at(task_type) + std::to_string(id_++); + + bool succeeded = timer_->Add( + fn, unique_id, + (initial_delay.fetch_add(1) % repeat_period_seconds) * kMicrosInSecond, + repeat_period_seconds * kMicrosInSecond); + if (!succeeded) { + return Status::Aborted("Failed to register periodic task"); + } + auto result = tasks_map_.try_emplace( + task_type, TaskInfo{unique_id, repeat_period_seconds}); + if (!result.second) { + return Status::Aborted("Failed to add periodic task"); + }; + return Status::OK(); +} + +Status PeriodicTaskScheduler::Unregister(PeriodicTaskType task_type) { + MutexLock l(&timer_mutex); + auto it = tasks_map_.find(task_type); + if (it != tasks_map_.end()) { + timer_->Cancel(it->second.name); + tasks_map_.erase(it); + } + if (!timer_->HasPendingTask()) { + timer_->Shutdown(); + } + return Status::OK(); +} + +Timer* PeriodicTaskScheduler::Default() { + static Timer timer(SystemClock::Default().get()); + return &timer; +} + +#ifndef NDEBUG +void PeriodicTaskScheduler::TEST_OverrideTimer(SystemClock* clock) { + static Timer test_timer(clock); + test_timer.TEST_OverrideTimer(clock); + MutexLock l(&timer_mutex); + timer_ = &test_timer; +} +#endif // NDEBUG + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/db/periodic_task_scheduler.h b/db/periodic_task_scheduler.h new file mode 100644 index 000000000..f45b80c4d --- /dev/null +++ b/db/periodic_task_scheduler.h @@ -0,0 +1,110 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include "util/timer.h" + +namespace ROCKSDB_NAMESPACE { +class SystemClock; + +using PeriodicTaskFunc = std::function; + +constexpr uint64_t kInvalidPeriodSec = 0; + +// List of task types +enum class PeriodicTaskType : uint8_t { + kDumpStats = 0, + kPersistStats, + kFlushInfoLog, + kRecordSeqnoTime, + kMax, +}; + +// PeriodicTaskScheduler contains the periodic task scheduled from the DB +// instance. It's used to schedule/unschedule DumpStats(), PersistStats(), +// FlushInfoLog(), etc. Each type of the task can only have one instance, +// re-register the same task type would only update the repeat period. +// +// Internally, it uses a global single threaded timer object to run the periodic +// task functions. Timer thread will always be started since the info log +// flushing cannot be disabled. +class PeriodicTaskScheduler { + public: + explicit PeriodicTaskScheduler() = default; + + PeriodicTaskScheduler(const PeriodicTaskScheduler&) = delete; + PeriodicTaskScheduler(PeriodicTaskScheduler&&) = delete; + PeriodicTaskScheduler& operator=(const PeriodicTaskScheduler&) = delete; + PeriodicTaskScheduler& operator=(PeriodicTaskScheduler&&) = delete; + + // Register a task with its default repeat period + Status Register(PeriodicTaskType task_type, const PeriodicTaskFunc& fn); + + // Register a task with specified repeat period. 0 is an invalid argument + // (kInvalidPeriodSec). To stop the task, please use Unregister() specifically + Status Register(PeriodicTaskType task_type, const PeriodicTaskFunc& fn, + uint64_t repeat_period_seconds); + + // Unregister the task + Status Unregister(PeriodicTaskType task_type); + +#ifndef NDEBUG + // Override the timer for the unittest + void TEST_OverrideTimer(SystemClock* clock); + + // Call Timer TEST_WaitForRun() which wait until Timer starting waiting. + void TEST_WaitForRun(const std::function& callback) const { + if (timer_ != nullptr) { + timer_->TEST_WaitForRun(callback); + } + } + + // Get global valid task number in the Timer + size_t TEST_GetValidTaskNum() const { + if (timer_ != nullptr) { + return timer_->TEST_GetPendingTaskNum(); + } + return 0; + } + + // If it has the specified task type registered + bool TEST_HasTask(PeriodicTaskType task_type) const { + auto it = tasks_map_.find(task_type); + return it != tasks_map_.end(); + } +#endif // NDEBUG + + private: + // default global Timer instance + static Timer* Default(); + + // Internal structure to store task information + struct TaskInfo { + TaskInfo(std::string _name, uint64_t _repeat_every_sec) + : name(std::move(_name)), repeat_every_sec(_repeat_every_sec) {} + std::string name; + uint64_t repeat_every_sec; + }; + + // Internal tasks map + std::map tasks_map_; + + // Global timer pointer, which doesn't support synchronous add/cancel tasks + // so having a global `timer_mutex` for add/cancel task. + Timer* timer_ = Default(); + + // Global task id, protected by the global `timer_mutex` + inline static uint64_t id_; + + static constexpr uint64_t kMicrosInSecond = 1000U * 1000U; +}; + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/db/periodic_work_scheduler_test.cc b/db/periodic_task_scheduler_test.cc similarity index 76% rename from db/periodic_work_scheduler_test.cc rename to db/periodic_task_scheduler_test.cc index 04771035c..b0922c0f8 100644 --- a/db/periodic_work_scheduler_test.cc +++ b/db/periodic_task_scheduler_test.cc @@ -1,9 +1,10 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// Copyright (c) Meta Platforms, Inc. and affiliates. +// // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -#include "db/periodic_work_scheduler.h" +#include "db/periodic_task_scheduler.h" #include "db/db_test_util.h" #include "env/composite_env_wrapper.h" @@ -12,10 +13,10 @@ namespace ROCKSDB_NAMESPACE { #ifndef ROCKSDB_LITE -class PeriodicWorkSchedulerTest : public DBTestBase { +class PeriodicTaskSchedulerTest : public DBTestBase { public: - PeriodicWorkSchedulerTest() - : DBTestBase("periodic_work_scheduler_test", /*env_do_fsync=*/true) { + PeriodicTaskSchedulerTest() + : DBTestBase("periodic_task_scheduler_test", /*env_do_fsync=*/true) { mock_clock_ = std::make_shared(env_->GetSystemClock()); mock_env_.reset(new CompositeEnvWrapper(env_, mock_clock_)); } @@ -27,18 +28,16 @@ class PeriodicWorkSchedulerTest : public DBTestBase { void SetUp() override { mock_clock_->InstallTimedWaitFixCallback(); SyncPoint::GetInstance()->SetCallBack( - "DBImpl::StartPeriodicWorkScheduler:Init", [&](void* arg) { - auto* periodic_work_scheduler_ptr = - reinterpret_cast(arg); - *periodic_work_scheduler_ptr = - PeriodicWorkTestScheduler::Default(mock_clock_); + "DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) { + auto periodic_task_scheduler_ptr = + reinterpret_cast(arg); + periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get()); }); } }; -TEST_F(PeriodicWorkSchedulerTest, Basic) { - constexpr unsigned int kPeriodSec = - PeriodicWorkScheduler::kDefaultFlushInfoLogPeriodSec; +TEST_F(PeriodicTaskSchedulerTest, Basic) { + constexpr unsigned int kPeriodSec = 10; Close(); Options options; options.stats_dump_period_sec = kPeriodSec; @@ -66,26 +65,26 @@ TEST_F(PeriodicWorkSchedulerTest, Basic) { ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_persist_period_sec); ASSERT_GT(kPeriodSec, 1u); - dbfull()->TEST_WaitForPeridicWorkerRun([&] { + dbfull()->TEST_WaitForPeridicTaskRun([&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec) - 1); }); - auto scheduler = dbfull()->TEST_GetPeriodicWorkScheduler(); - ASSERT_NE(nullptr, scheduler); - ASSERT_EQ(3, scheduler->TEST_GetValidTaskNum()); + const PeriodicTaskScheduler& scheduler = + dbfull()->TEST_GetPeriodicTaskScheduler(); + ASSERT_EQ(3, scheduler.TEST_GetValidTaskNum()); ASSERT_EQ(1, dump_st_counter); ASSERT_EQ(1, pst_st_counter); ASSERT_EQ(1, flush_info_log_counter); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec)); }); ASSERT_EQ(2, dump_st_counter); ASSERT_EQ(2, pst_st_counter); ASSERT_EQ(2, flush_info_log_counter); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec)); }); ASSERT_EQ(3, dump_st_counter); @@ -99,25 +98,22 @@ TEST_F(PeriodicWorkSchedulerTest, Basic) { ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec); // Info log flush should still run. - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec)); }); ASSERT_EQ(3, dump_st_counter); ASSERT_EQ(3, pst_st_counter); ASSERT_EQ(4, flush_info_log_counter); - scheduler = dbfull()->TEST_GetPeriodicWorkScheduler(); - ASSERT_EQ(1u, scheduler->TEST_GetValidTaskNum()); + ASSERT_EQ(1u, scheduler.TEST_GetValidTaskNum()); // Re-enable one task ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "5"}})); ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec); ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec); - scheduler = dbfull()->TEST_GetPeriodicWorkScheduler(); - ASSERT_NE(nullptr, scheduler); - ASSERT_EQ(2, scheduler->TEST_GetValidTaskNum()); + ASSERT_EQ(2, scheduler.TEST_GetValidTaskNum()); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec)); }); ASSERT_EQ(4, dump_st_counter); ASSERT_EQ(3, pst_st_counter); @@ -126,7 +122,7 @@ TEST_F(PeriodicWorkSchedulerTest, Basic) { Close(); } -TEST_F(PeriodicWorkSchedulerTest, MultiInstances) { +TEST_F(PeriodicTaskSchedulerTest, MultiInstances) { constexpr int kPeriodSec = 5; const int kInstanceNum = 10; @@ -153,23 +149,24 @@ TEST_F(PeriodicWorkSchedulerTest, MultiInstances) { } auto dbi = static_cast_with_check(dbs[kInstanceNum - 1]); - auto scheduler = dbi->TEST_GetPeriodicWorkScheduler(); - ASSERT_EQ(kInstanceNum * 3, scheduler->TEST_GetValidTaskNum()); + + const PeriodicTaskScheduler& scheduler = dbi->TEST_GetPeriodicTaskScheduler(); + ASSERT_EQ(kInstanceNum * 3, scheduler.TEST_GetValidTaskNum()); int expected_run = kInstanceNum; - dbi->TEST_WaitForPeridicWorkerRun( + dbi->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); ASSERT_EQ(expected_run, dump_st_counter); ASSERT_EQ(expected_run, pst_st_counter); expected_run += kInstanceNum; - dbi->TEST_WaitForPeridicWorkerRun( + dbi->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_EQ(expected_run, dump_st_counter); ASSERT_EQ(expected_run, pst_st_counter); expected_run += kInstanceNum; - dbi->TEST_WaitForPeridicWorkerRun( + dbi->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_EQ(expected_run, dump_st_counter); ASSERT_EQ(expected_run, pst_st_counter); @@ -181,9 +178,9 @@ TEST_F(PeriodicWorkSchedulerTest, MultiInstances) { expected_run += (kInstanceNum - half) * 2; - dbi->TEST_WaitForPeridicWorkerRun( + dbi->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); - dbi->TEST_WaitForPeridicWorkerRun( + dbi->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_EQ(expected_run, dump_st_counter); ASSERT_EQ(expected_run, pst_st_counter); @@ -194,7 +191,7 @@ TEST_F(PeriodicWorkSchedulerTest, MultiInstances) { } } -TEST_F(PeriodicWorkSchedulerTest, MultiEnv) { +TEST_F(PeriodicTaskSchedulerTest, MultiEnv) { constexpr int kDumpPeriodSec = 5; constexpr int kPersistPeriodSec = 10; Close(); @@ -217,15 +214,12 @@ TEST_F(PeriodicWorkSchedulerTest, MultiEnv) { std::string dbname = test::PerThreadDBPath("multi_env_test"); DB* db; ASSERT_OK(DB::Open(options2, dbname, &db)); - DBImpl* dbi = static_cast_with_check(db); - - ASSERT_EQ(dbi->TEST_GetPeriodicWorkScheduler(), - dbfull()->TEST_GetPeriodicWorkScheduler()); ASSERT_OK(db->Close()); delete db; Close(); } + #endif // !ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/db/periodic_work_scheduler.cc b/db/periodic_work_scheduler.cc deleted file mode 100644 index 4763f63b4..000000000 --- a/db/periodic_work_scheduler.cc +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). - -#include "db/periodic_work_scheduler.h" - -#include "db/db_impl/db_impl.h" -#include "rocksdb/system_clock.h" - -#ifndef ROCKSDB_LITE -namespace ROCKSDB_NAMESPACE { - -const std::string PeriodicWorkTaskNames::kDumpStats = "dump_st"; -const std::string PeriodicWorkTaskNames::kPersistStats = "pst_st"; -const std::string PeriodicWorkTaskNames::kFlushInfoLog = "flush_info_log"; -const std::string PeriodicWorkTaskNames::kRecordSeqnoTime = "record_seq_time"; - -PeriodicWorkScheduler::PeriodicWorkScheduler( - const std::shared_ptr& clock) { - timer = std::unique_ptr(new Timer(clock.get())); -} - -Status PeriodicWorkScheduler::Register(DBImpl* dbi, - unsigned int stats_dump_period_sec, - unsigned int stats_persist_period_sec) { - MutexLock l(&timer_mu_); - static std::atomic initial_delay(0); - timer->Start(); - if (stats_dump_period_sec > 0) { - bool succeeded = timer->Add( - [dbi]() { dbi->DumpStats(); }, - GetTaskName(dbi, PeriodicWorkTaskNames::kDumpStats), - initial_delay.fetch_add(1) % - static_cast(stats_dump_period_sec) * kMicrosInSecond, - static_cast(stats_dump_period_sec) * kMicrosInSecond); - if (!succeeded) { - return Status::Aborted("Unable to add periodic task DumpStats"); - } - } - if (stats_persist_period_sec > 0) { - bool succeeded = timer->Add( - [dbi]() { dbi->PersistStats(); }, - GetTaskName(dbi, PeriodicWorkTaskNames::kPersistStats), - initial_delay.fetch_add(1) % - static_cast(stats_persist_period_sec) * kMicrosInSecond, - static_cast(stats_persist_period_sec) * kMicrosInSecond); - if (!succeeded) { - return Status::Aborted("Unable to add periodic task PersistStats"); - } - } - bool succeeded = - timer->Add([dbi]() { dbi->FlushInfoLog(); }, - GetTaskName(dbi, PeriodicWorkTaskNames::kFlushInfoLog), - initial_delay.fetch_add(1) % kDefaultFlushInfoLogPeriodSec * - kMicrosInSecond, - kDefaultFlushInfoLogPeriodSec * kMicrosInSecond); - if (!succeeded) { - return Status::Aborted("Unable to add periodic task FlushInfoLog"); - } - return Status::OK(); -} - -Status PeriodicWorkScheduler::RegisterRecordSeqnoTimeWorker( - DBImpl* dbi, uint64_t record_cadence_sec) { - MutexLock l(&timer_mu_); - timer->Start(); - static std::atomic_uint64_t initial_delay(0); - bool succeeded = timer->Add( - [dbi]() { dbi->RecordSeqnoToTimeMapping(); }, - GetTaskName(dbi, PeriodicWorkTaskNames::kRecordSeqnoTime), - initial_delay.fetch_add(1) % record_cadence_sec * kMicrosInSecond, - record_cadence_sec * kMicrosInSecond); - if (!succeeded) { - return Status::NotSupported( - "Updating seqno to time worker cadence is not supported yet"); - } - return Status::OK(); -} - -void PeriodicWorkScheduler::UnregisterRecordSeqnoTimeWorker(DBImpl* dbi) { - MutexLock l(&timer_mu_); - timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kRecordSeqnoTime)); - if (!timer->HasPendingTask()) { - timer->Shutdown(); - } -} - -void PeriodicWorkScheduler::Unregister(DBImpl* dbi) { - MutexLock l(&timer_mu_); - timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kDumpStats)); - timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kPersistStats)); - timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kFlushInfoLog)); - if (!timer->HasPendingTask()) { - timer->Shutdown(); - } -} - -PeriodicWorkScheduler* PeriodicWorkScheduler::Default() { - // Always use the default SystemClock for the scheduler, as we only use the - // NowMicros which is the same for all clocks. The Env could only be - // overridden in test. - static PeriodicWorkScheduler scheduler(SystemClock::Default()); - return &scheduler; -} - -std::string PeriodicWorkScheduler::GetTaskName( - const DBImpl* dbi, const std::string& func_name) const { - std::string db_session_id; - // TODO: Should this error be ignored? - dbi->GetDbSessionId(db_session_id).PermitUncheckedError(); - return db_session_id + ":" + func_name; -} - -#ifndef NDEBUG - -// Get the static scheduler. For a new SystemClock, it needs to re-create the -// internal timer, so only re-create it when there's no running task. Otherwise, -// return the existing scheduler. Which means if the unittest needs to update -// MockClock, Close all db instances and then re-open them. -PeriodicWorkTestScheduler* PeriodicWorkTestScheduler::Default( - const std::shared_ptr& clock) { - static PeriodicWorkTestScheduler scheduler(clock); - static port::Mutex mutex; - { - MutexLock l(&mutex); - if (scheduler.timer.get() != nullptr && - scheduler.timer->TEST_GetPendingTaskNum() == 0) { - { - MutexLock timer_mu_guard(&scheduler.timer_mu_); - scheduler.timer->Shutdown(); - } - scheduler.timer.reset(new Timer(clock.get())); - } - } - return &scheduler; -} - -void PeriodicWorkTestScheduler::TEST_WaitForRun( - std::function callback) const { - if (timer != nullptr) { - timer->TEST_WaitForRun(callback); - } -} - -size_t PeriodicWorkTestScheduler::TEST_GetValidTaskNum() const { - if (timer != nullptr) { - return timer->TEST_GetPendingTaskNum(); - } - return 0; -} - -bool PeriodicWorkTestScheduler::TEST_HasValidTask( - const DBImpl* dbi, const std::string& func_name) const { - if (timer == nullptr) { - return false; - } - return timer->TEST_HasVaildTask(GetTaskName(dbi, func_name)); -} - -PeriodicWorkTestScheduler::PeriodicWorkTestScheduler( - const std::shared_ptr& clock) - : PeriodicWorkScheduler(clock) {} - -#endif // !NDEBUG -} // namespace ROCKSDB_NAMESPACE - -#endif // ROCKSDB_LITE diff --git a/db/periodic_work_scheduler.h b/db/periodic_work_scheduler.h deleted file mode 100644 index acfb24396..000000000 --- a/db/periodic_work_scheduler.h +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). - -#pragma once - -#ifndef ROCKSDB_LITE - -#include "db/db_impl/db_impl.h" -#include "util/timer.h" - -namespace ROCKSDB_NAMESPACE { -class SystemClock; - -// PeriodicWorkScheduler is a singleton object, which is scheduling/running -// DumpStats(), PersistStats(), and FlushInfoLog() for all DB instances. All DB -// instances use the same object from `Default()`. -// -// Internally, it uses a single threaded timer object to run the periodic work -// functions. Timer thread will always be started since the info log flushing -// cannot be disabled. -class PeriodicWorkScheduler { - public: - static PeriodicWorkScheduler* Default(); - - PeriodicWorkScheduler() = delete; - PeriodicWorkScheduler(const PeriodicWorkScheduler&) = delete; - PeriodicWorkScheduler(PeriodicWorkScheduler&&) = delete; - PeriodicWorkScheduler& operator=(const PeriodicWorkScheduler&) = delete; - PeriodicWorkScheduler& operator=(PeriodicWorkScheduler&&) = delete; - - Status Register(DBImpl* dbi, unsigned int stats_dump_period_sec, - unsigned int stats_persist_period_sec); - Status RegisterRecordSeqnoTimeWorker(DBImpl* dbi, uint64_t record_cadence); - - void Unregister(DBImpl* dbi); - void UnregisterRecordSeqnoTimeWorker(DBImpl* dbi); - - // Periodically flush info log out of application buffer at a low frequency. - // This improves debuggability in case of RocksDB hanging since it ensures the - // log messages leading up to the hang will eventually become visible in the - // log. - static const uint64_t kDefaultFlushInfoLogPeriodSec = 10; - - protected: - std::unique_ptr timer; - // `timer_mu_` serves two purposes currently: - // (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as - // they are currently not implemented in a thread-safe way; and - // (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and - // the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically. - port::Mutex timer_mu_; - - explicit PeriodicWorkScheduler(const std::shared_ptr& clock); - - // Get the unique task name (prefix with db session id) - std::string GetTaskName(const DBImpl* dbi, - const std::string& func_name) const; -}; - -#ifndef NDEBUG -// PeriodicWorkTestScheduler is for unittest, which can specify the SystemClock -// It also contains functions for unittest. -class PeriodicWorkTestScheduler : public PeriodicWorkScheduler { - public: - static PeriodicWorkTestScheduler* Default( - const std::shared_ptr& clock); - - void TEST_WaitForRun(std::function callback) const; - - size_t TEST_GetValidTaskNum() const; - - bool TEST_HasValidTask(const DBImpl* dbi, const std::string& func_name) const; - - private: - explicit PeriodicWorkTestScheduler(const std::shared_ptr& clock); -}; -#endif // !NDEBUG - -struct PeriodicWorkTaskNames { - static const std::string kDumpStats; - static const std::string kPersistStats; - static const std::string kFlushInfoLog; - static const std::string kRecordSeqnoTime; -}; - -} // namespace ROCKSDB_NAMESPACE - -#endif // ROCKSDB_LITE diff --git a/db/seqno_time_test.cc b/db/seqno_time_test.cc index c9feac766..ad0ac5f8a 100644 --- a/db/seqno_time_test.cc +++ b/db/seqno_time_test.cc @@ -5,7 +5,7 @@ // (found in the LICENSE.Apache file in the root directory). #include "db/db_test_util.h" -#include "db/periodic_work_scheduler.h" +#include "db/periodic_task_scheduler.h" #include "db/seqno_to_time_mapping.h" #include "port/stack_trace.h" #include "rocksdb/iostats_context.h" @@ -29,11 +29,10 @@ class SeqnoTimeTest : public DBTestBase { void SetUp() override { mock_clock_->InstallTimedWaitFixCallback(); SyncPoint::GetInstance()->SetCallBack( - "DBImpl::StartPeriodicWorkScheduler:Init", [&](void* arg) { - auto* periodic_work_scheduler_ptr = - reinterpret_cast(arg); - *periodic_work_scheduler_ptr = - PeriodicWorkTestScheduler::Default(mock_clock_); + "DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) { + auto periodic_task_scheduler_ptr = + reinterpret_cast(arg); + periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get()); }); } @@ -80,7 +79,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { // pass some time first, otherwise the first a few keys write time are going // to be zero, and internally zero has special meaning: kUnknownSeqnoTime - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kKeyPerSec)); }); int sst_num = 0; @@ -88,7 +87,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { for (; sst_num < kNumTrigger; sst_num++) { for (int i = 0; i < kNumKeys; i++) { ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun([&] { + dbfull()->TEST_WaitForPeridicTaskRun([&] { mock_clock_->MockSleepForSeconds(static_cast(kKeyPerSec)); }); } @@ -110,7 +109,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { for (; sst_num < kNumTrigger * 2; sst_num++) { for (int i = 0; i < kNumKeys; i++) { ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun([&] { + dbfull()->TEST_WaitForPeridicTaskRun([&] { mock_clock_->MockSleepForSeconds(static_cast(kKeyPerSec)); }); } @@ -124,7 +123,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { for (; sst_num < kNumTrigger * 3; sst_num++) { for (int i = 0; i < kNumKeys; i++) { ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun([&] { + dbfull()->TEST_WaitForPeridicTaskRun([&] { mock_clock_->MockSleepForSeconds(static_cast(kKeyPerSec)); }); } @@ -142,36 +141,31 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { // the first a few key should be cold AssertKetTemperature(20, Temperature::kCold); - // Wait some time, each time after compaction, the cold data size is - // increasing and hot data size is decreasing for (int i = 0; i < 30; i++) { - dbfull()->TEST_WaitForPeridicWorkerRun([&] { + dbfull()->TEST_WaitForPeridicTaskRun([&] { mock_clock_->MockSleepForSeconds(static_cast(20 * kKeyPerSec)); }); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); - uint64_t pre_hot = hot_data_size; - uint64_t pre_cold = cold_data_size; - hot_data_size = GetSstSizeHelper(Temperature::kUnknown); - cold_data_size = GetSstSizeHelper(Temperature::kCold); - ASSERT_LT(hot_data_size, pre_hot); - ASSERT_GT(cold_data_size, pre_cold); // the hot/cold data cut off range should be between i * 20 + 200 -> 250 AssertKetTemperature(i * 20 + 250, Temperature::kUnknown); AssertKetTemperature(i * 20 + 200, Temperature::kCold); } - // Wait again, all data should be cold after that + ASSERT_LT(GetSstSizeHelper(Temperature::kUnknown), hot_data_size); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), cold_data_size); + + // Wait again, the most of the data should be cold after that + // but it may not be all cold, because if there's no new data write to SST, + // the compaction will not get the new seqno->time sampling to decide the last + // a few data's time. for (int i = 0; i < 5; i++) { - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(1000)); }); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); } - ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); - ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); - - // any random data should be cold + // any random data close to the end should be cold AssertKetTemperature(1000, Temperature::kCold); // close explicitly, because the env is local variable which will be released @@ -197,7 +191,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { // pass some time first, otherwise the first a few keys write time are going // to be zero, and internally zero has special meaning: kUnknownSeqnoTime - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); int sst_num = 0; @@ -205,7 +199,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { for (; sst_num < 4; sst_num++) { for (int i = 0; i < kNumKeys; i++) { ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } ASSERT_OK(Flush()); @@ -227,7 +221,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { for (; sst_num < 14; sst_num++) { for (int i = 0; i < kNumKeys; i++) { ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } ASSERT_OK(Flush()); @@ -248,7 +242,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { // Wait some time, with each wait, the cold data is increasing and hot data is // decreasing for (int i = 0; i < 30; i++) { - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(200)); }); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); uint64_t pre_hot = hot_data_size; @@ -263,17 +257,16 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { AssertKetTemperature(i * 20 + 400, Temperature::kCold); } - // Wait again, all data should be cold after that + // Wait again, the most of the data should be cold after that + // hot data might not be empty, because if we don't write new data, there's + // no seqno->time sampling available to the compaction for (int i = 0; i < 5; i++) { - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(1000)); }); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); } - ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); - ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); - - // any random data should be cold + // any random data close to the end should be cold AssertKetTemperature(1000, Temperature::kCold); Close(); @@ -291,7 +284,7 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { // Write a key every 10 seconds for (int i = 0; i < 200; i++) { ASSERT_OK(Put(Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } ASSERT_OK(Flush()); @@ -322,7 +315,7 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { // Write a key every 1 seconds for (int i = 0; i < 200; i++) { ASSERT_OK(Put(Key(i + 190), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(1)); }); } seq_end = dbfull()->GetLatestSequenceNumber(); @@ -358,7 +351,7 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { // Write a key every 200 seconds for (int i = 0; i < 200; i++) { ASSERT_OK(Put(Key(i + 380), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(200)); }); } seq_end = dbfull()->GetLatestSequenceNumber(); @@ -400,7 +393,7 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { // Write a key every 100 seconds for (int i = 0; i < 200; i++) { ASSERT_OK(Put(Key(i + 570), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } seq_end = dbfull()->GetLatestSequenceNumber(); @@ -464,9 +457,7 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { ASSERT_OK(db_->Close()); } -// TODO(zjay): Disabled, until New CF bug with preclude_last_level_data_seconds -// is fixed -TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { +TEST_F(SeqnoTimeTest, MultiCFs) { Options options = CurrentOptions(); options.preclude_last_level_data_seconds = 0; options.env = mock_env_.get(); @@ -474,14 +465,14 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { options.stats_persist_period_sec = 0; ReopenWithColumnFamilies({"default"}, options); - auto scheduler = dbfull()->TEST_GetPeriodicWorkScheduler(); - ASSERT_FALSE(scheduler->TEST_HasValidTask( - dbfull(), PeriodicWorkTaskNames::kRecordSeqnoTime)); + const PeriodicTaskScheduler& scheduler = + dbfull()->TEST_GetPeriodicTaskScheduler(); + ASSERT_FALSE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime)); // Write some data and increase the current time for (int i = 0; i < 200; i++) { ASSERT_OK(Put(Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } ASSERT_OK(Flush()); @@ -496,26 +487,20 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { Options options_1 = options; options_1.preclude_last_level_data_seconds = 10000; // 10k CreateColumnFamilies({"one"}, options_1); - ASSERT_TRUE(scheduler->TEST_HasValidTask( - dbfull(), PeriodicWorkTaskNames::kRecordSeqnoTime)); + ASSERT_TRUE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime)); // Write some data to the default CF (without preclude_last_level feature) for (int i = 0; i < 200; i++) { ASSERT_OK(Put(Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } ASSERT_OK(Flush()); - // in memory mapping won't increase because CFs with preclude_last_level - // feature doesn't have memtable - auto queue = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping(); - ASSERT_LT(queue.size(), 5); - // Write some data to the CF one for (int i = 0; i < 20; i++) { ASSERT_OK(Put(1, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } ASSERT_OK(Flush(1)); @@ -539,7 +524,7 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { // Add more data to CF "two" to fill the in memory mapping for (int i = 0; i < 2000; i++) { ASSERT_OK(Put(2, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } seqs = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping(); @@ -563,11 +548,10 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { // enabled have flushed, the in-memory seqno->time mapping should be cleared for (int i = 0; i < 10; i++) { ASSERT_OK(Put(0, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } seqs = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping(); - ASSERT_LE(seqs.size(), 5); ASSERT_OK(Flush(0)); // trigger compaction for CF "two" and make sure the compaction output has @@ -575,7 +559,7 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { for (int j = 0; j < 3; j++) { for (int i = 0; i < 200; i++) { ASSERT_OK(Put(2, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } ASSERT_OK(Flush(2)); @@ -595,7 +579,7 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { for (int j = 0; j < 2; j++) { for (int i = 0; i < 200; i++) { ASSERT_OK(Put(0, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } ASSERT_OK(Flush(0)); @@ -610,7 +594,7 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { // Write some data to CF "two", but don't flush to accumulate for (int i = 0; i < 1000; i++) { ASSERT_OK(Put(2, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } ASSERT_GE( @@ -630,8 +614,7 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { 0); // And the timer worker is stopped - ASSERT_FALSE(scheduler->TEST_HasValidTask( - dbfull(), PeriodicWorkTaskNames::kRecordSeqnoTime)); + ASSERT_FALSE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime)); Close(); } @@ -655,7 +638,7 @@ TEST_F(SeqnoTimeTest, MultiInstancesBasic) { WriteOptions wo; for (int i = 0; i < 200; i++) { ASSERT_OK(dbi->Put(wo, Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } SeqnoToTimeMapping seqno_to_time_mapping = dbi->TEST_GetSeqnoToTimeMapping(); @@ -678,7 +661,7 @@ TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { for (int j = 0; j < 3; j++) { for (int i = 0; i < 100; i++) { ASSERT_OK(Put(Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } ASSERT_OK(Flush()); @@ -700,7 +683,7 @@ TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { // Trigger a compaction for (int i = 0; i < 100; i++) { ASSERT_OK(Put(Key(i), "value")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } ASSERT_OK(Flush()); diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc index d968735e2..21ac786b4 100644 --- a/monitoring/stats_history_test.cc +++ b/monitoring/stats_history_test.cc @@ -15,7 +15,7 @@ #include "db/column_family.h" #include "db/db_impl/db_impl.h" #include "db/db_test_util.h" -#include "db/periodic_work_scheduler.h" +#include "db/periodic_task_scheduler.h" #include "monitoring/persistent_stats_history.h" #include "options/options_helper.h" #include "port/stack_trace.h" @@ -44,11 +44,10 @@ class StatsHistoryTest : public DBTestBase { void SetUp() override { mock_clock_->InstallTimedWaitFixCallback(); SyncPoint::GetInstance()->SetCallBack( - "DBImpl::StartPeriodicWorkScheduler:Init", [&](void* arg) { - auto* periodic_work_scheduler_ptr = - reinterpret_cast(arg); - *periodic_work_scheduler_ptr = - PeriodicWorkTestScheduler::Default(mock_clock_); + "DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) { + auto periodic_task_scheduler_ptr = + reinterpret_cast(arg); + periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get()); }); } }; @@ -67,10 +66,10 @@ TEST_F(StatsHistoryTest, RunStatsDumpPeriodSec) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_GE(counter, 1); @@ -99,17 +98,17 @@ TEST_F(StatsHistoryTest, StatsPersistScheduling) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_GE(counter, 1); // Test cancel job through SetOptions ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); int old_val = counter; - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec * 2); }); ASSERT_EQ(counter, old_val); @@ -131,7 +130,7 @@ TEST_F(StatsHistoryTest, PersistentStatsFreshInstall) { {{"stats_persist_period_sec", std::to_string(kPeriodSec)}})); ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_persist_period_sec); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_GE(counter, 1); Close(); @@ -150,11 +149,11 @@ TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) { ReopenWithColumnFamilies({"default", "pikachu"}, options); // make sure the first stats persist to finish - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); // Wait for stats persist to finish - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); std::unique_ptr stats_iter; @@ -172,7 +171,7 @@ TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) { ASSERT_GT(stats_count, 0); // Wait a bit and verify no more stats are found for (int i = 0; i < 10; ++i) { - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(1); }); } ASSERT_OK(db_->GetStatsHistory(0, mock_clock_->NowSeconds(), &stats_iter)); @@ -227,7 +226,7 @@ TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { const int kIterations = 10; for (int i = 0; i < kIterations; ++i) { - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); } @@ -251,7 +250,7 @@ TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { // Wait for stats persist to finish for (int i = 0; i < kIterations; ++i) { - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); } @@ -300,11 +299,11 @@ TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); // Wait for stats persist to finish - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); auto iter = @@ -312,14 +311,14 @@ TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { int key_count1 = countkeys(iter); delete iter; - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); int key_count2 = countkeys(iter); delete iter; - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); @@ -393,32 +392,32 @@ TEST_F(StatsHistoryTest, PersitentStatsVerifyValue) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); // Wait for stats persist to finish - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); auto iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); countkeys(iter); delete iter; - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); countkeys(iter); delete iter; - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); countkeys(iter); delete iter; - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); std::map stats_map_after; @@ -482,10 +481,10 @@ TEST_F(StatsHistoryTest, PersistentStatsCreateColumnFamilies) { ASSERT_EQ(Get(2, "foo"), "bar"); // make sure the first stats persist to finish - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); auto iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); @@ -582,7 +581,7 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); ColumnFamilyData* cfd_default = @@ -601,7 +600,7 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { ASSERT_OK(Put(1, "Eevee", "v0")); ASSERT_EQ("v0", Get(1, "Eevee")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); // writing to all three cf, flush default cf // LogNumbers: default: 16, stats: 10, pikachu: 5 @@ -630,7 +629,7 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { ASSERT_EQ("v2", Get("bar2")); ASSERT_EQ("v2", Get("foo2")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); // writing to default and stats cf, flushing default cf // LogNumbers: default: 19, stats: 19, pikachu: 19 @@ -645,7 +644,7 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { ASSERT_OK(Put(1, "Jolteon", "v3")); ASSERT_EQ("v3", Get(1, "Jolteon")); - dbfull()->TEST_WaitForPeridicWorkerRun( + dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); // writing to all three cf, flushing test cf // LogNumbers: default: 19, stats: 19, pikachu: 22 diff --git a/src.mk b/src.mk index c8b470213..a39f9a7f4 100644 --- a/src.mk +++ b/src.mk @@ -74,7 +74,7 @@ LIB_SOURCES = \ db/merge_helper.cc \ db/merge_operator.cc \ db/output_validator.cc \ - db/periodic_work_scheduler.cc \ + db/periodic_task_scheduler.cc \ db/range_del_aggregator.cc \ db/range_tombstone_fragmenter.cc \ db/repair.cc \ @@ -501,7 +501,7 @@ TEST_MAIN_SOURCES = \ db/obsolete_files_test.cc \ db/options_file_test.cc \ db/perf_context_test.cc \ - db/periodic_work_scheduler_test.cc \ + db/periodic_task_scheduler_test.cc \ db/plain_table_db_test.cc \ db/prefix_test.cc \ db/repair_test.cc \ diff --git a/util/timer.h b/util/timer.h index 4b9ab668e..c2fbf869b 100644 --- a/util/timer.h +++ b/util/timer.h @@ -187,10 +187,9 @@ class Timer { return ret; } - bool TEST_HasVaildTask(const std::string& func_name) const { + void TEST_OverrideTimer(SystemClock* clock) { InstrumentedMutexLock l(&mutex_); - auto it = map_.find(func_name); - return it != map_.end() && it->second->IsValid(); + clock_ = clock; } #endif // NDEBUG