diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 8a5216d8d..7ac73671f 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -823,16 +823,29 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker() { min_time_duration / SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF; } - Status s = periodic_work_scheduler_->RegisterRecordSeqnoTimeWorker( - this, seqno_time_cadence); - if (s.IsNotSupported()) { - // TODO: Fix the timer cannot cancel and re-add the same task - ROCKS_LOG_WARN( - immutable_db_options_.info_log, - "Updating seqno to time worker cadence is not supported yet, to make " - "the change effective, please reopen the DB instance."); - s = Status::OK(); + Status s; + if (seqno_time_cadence != record_seqno_time_cadence_) { + if (seqno_time_cadence == 0) { + periodic_work_scheduler_->UnregisterRecordSeqnoTimeWorker(this); + } else { + s = periodic_work_scheduler_->RegisterRecordSeqnoTimeWorker( + this, seqno_time_cadence); + } + + if (s.ok()) { + record_seqno_time_cadence_ = seqno_time_cadence; + } + + if (s.IsNotSupported()) { + // TODO: Fix the timer cannot cancel and re-add the same task + ROCKS_LOG_WARN( + immutable_db_options_.info_log, + "Updating seqno to time worker cadence is not supported yet, to make " + "the change effective, please reopen the DB instance."); + s = Status::OK(); + } } + return s; #else return Status::OK(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 9315b3737..cecff3d1d 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1160,7 +1160,7 @@ class DBImpl : public DB { int TEST_BGFlushesAllowed() const; size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; void TEST_WaitForPeridicWorkerRun(std::function callback) const; - const SeqnoToTimeMapping& TEST_GetSeqnoToTimeMapping() const; + SeqnoToTimeMapping TEST_GetSeqnoToTimeMapping() const; size_t TEST_EstimateInMemoryStatsHistorySize() const; uint64_t TEST_GetCurrentLogNumber() const { @@ -2544,6 +2544,10 @@ class DBImpl : public DB { // PeriodicWorkScheduler::Default(). Only in unittest, it can be overrided by // PeriodicWorkTestScheduler. PeriodicWorkScheduler* periodic_work_scheduler_; + + // Current cadence of the periodic worker for recording sequence number to + // time. + uint64_t record_seqno_time_cadence_ = 0; #endif // When set, we use a separate queue for writes that don't write to memtable. diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index b70959c35..8ea09e883 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -314,7 +314,8 @@ PeriodicWorkTestScheduler* DBImpl::TEST_GetPeriodicWorkScheduler() const { return static_cast(periodic_work_scheduler_); } -const SeqnoToTimeMapping& DBImpl::TEST_GetSeqnoToTimeMapping() const { +SeqnoToTimeMapping DBImpl::TEST_GetSeqnoToTimeMapping() const { + InstrumentedMutexLock l(&mutex_); return seqno_time_mapping_; } diff --git a/db/periodic_work_scheduler.cc b/db/periodic_work_scheduler.cc index c2742cb49..4763f63b4 100644 --- a/db/periodic_work_scheduler.cc +++ b/db/periodic_work_scheduler.cc @@ -64,14 +64,6 @@ Status PeriodicWorkScheduler::Register(DBImpl* dbi, Status PeriodicWorkScheduler::RegisterRecordSeqnoTimeWorker( DBImpl* dbi, uint64_t record_cadence_sec) { MutexLock l(&timer_mu_); - if (record_seqno_time_cadence_ == record_cadence_sec) { - return Status::OK(); - } - if (record_cadence_sec == 0) { - timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kRecordSeqnoTime)); - record_seqno_time_cadence_ = record_cadence_sec; - return Status::OK(); - } timer->Start(); static std::atomic_uint64_t initial_delay(0); bool succeeded = timer->Add( @@ -83,7 +75,6 @@ Status PeriodicWorkScheduler::RegisterRecordSeqnoTimeWorker( return Status::NotSupported( "Updating seqno to time worker cadence is not supported yet"); } - record_seqno_time_cadence_ = record_cadence_sec; return Status::OK(); } @@ -93,7 +84,6 @@ void PeriodicWorkScheduler::UnregisterRecordSeqnoTimeWorker(DBImpl* dbi) { if (!timer->HasPendingTask()) { timer->Shutdown(); } - record_seqno_time_cadence_ = 0; } void PeriodicWorkScheduler::Unregister(DBImpl* dbi) { diff --git a/db/periodic_work_scheduler.h b/db/periodic_work_scheduler.h index 8b4eb1959..acfb24396 100644 --- a/db/periodic_work_scheduler.h +++ b/db/periodic_work_scheduler.h @@ -57,9 +57,6 @@ class PeriodicWorkScheduler { // Get the unique task name (prefix with db session id) std::string GetTaskName(const DBImpl* dbi, const std::string& func_name) const; - - private: - uint64_t record_seqno_time_cadence_ = 0; }; #ifndef NDEBUG diff --git a/db/seqno_time_test.cc b/db/seqno_time_test.cc index 48fa4ae83..0d6262175 100644 --- a/db/seqno_time_test.cc +++ b/db/seqno_time_test.cc @@ -639,6 +639,38 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { Close(); } +TEST_F(SeqnoTimeTest, MultiInstancesBasic) { + const int kInstanceNum = 2; + + Options options = CurrentOptions(); + options.preclude_last_level_data_seconds = 10000; + options.env = mock_env_.get(); + options.stats_dump_period_sec = 0; + options.stats_persist_period_sec = 0; + + auto dbs = std::vector(kInstanceNum); + for (int i = 0; i < kInstanceNum; i++) { + ASSERT_OK( + DB::Open(options, test::PerThreadDBPath(std::to_string(i)), &(dbs[i]))); + } + + // Make sure the second instance has the worker enabled + auto dbi = static_cast_with_check(dbs[1]); + WriteOptions wo; + for (int i = 0; i < 200; i++) { + ASSERT_OK(dbi->Put(wo, Key(i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); + } + SeqnoToTimeMapping seqno_to_time_mapping = dbi->TEST_GetSeqnoToTimeMapping(); + ASSERT_GT(seqno_to_time_mapping.Size(), 10); + + for (int i = 0; i < kInstanceNum; i++) { + ASSERT_OK(dbs[i]->Close()); + delete dbs[i]; + } +} + TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { Options options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal;