Fix seqno->time worker not scheduled with multi DB instances (#10383)

Summary:
`PeriodicWorkScheduler` is a global singleton, which were used to store per-instance setting `record_seqno_time_cadence_`. Move that to db_impl.h which is per-instance.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10383

Reviewed By: siying

Differential Revision: D37928009

Pulled By: jay-zhuang

fbshipit-source-id: e517754f4a9db98798ac04f72033d4b517f734e9
main
Jay Zhuang 2 years ago committed by Facebook GitHub Bot
parent 5b5144deb2
commit 18a61a1734
  1. 15
      db/db_impl/db_impl.cc
  2. 6
      db/db_impl/db_impl.h
  3. 3
      db/db_impl/db_impl_debug.cc
  4. 10
      db/periodic_work_scheduler.cc
  5. 3
      db/periodic_work_scheduler.h
  6. 32
      db/seqno_time_test.cc

@ -823,8 +823,19 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker() {
min_time_duration / SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF; min_time_duration / SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF;
} }
Status s = periodic_work_scheduler_->RegisterRecordSeqnoTimeWorker( Status s;
if (seqno_time_cadence != record_seqno_time_cadence_) {
if (seqno_time_cadence == 0) {
periodic_work_scheduler_->UnregisterRecordSeqnoTimeWorker(this);
} else {
s = periodic_work_scheduler_->RegisterRecordSeqnoTimeWorker(
this, seqno_time_cadence); this, seqno_time_cadence);
}
if (s.ok()) {
record_seqno_time_cadence_ = seqno_time_cadence;
}
if (s.IsNotSupported()) { if (s.IsNotSupported()) {
// TODO: Fix the timer cannot cancel and re-add the same task // TODO: Fix the timer cannot cancel and re-add the same task
ROCKS_LOG_WARN( ROCKS_LOG_WARN(
@ -833,6 +844,8 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker() {
"the change effective, please reopen the DB instance."); "the change effective, please reopen the DB instance.");
s = Status::OK(); s = Status::OK();
} }
}
return s; return s;
#else #else
return Status::OK(); return Status::OK();

@ -1160,7 +1160,7 @@ class DBImpl : public DB {
int TEST_BGFlushesAllowed() const; int TEST_BGFlushesAllowed() const;
size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
void TEST_WaitForPeridicWorkerRun(std::function<void()> callback) const; void TEST_WaitForPeridicWorkerRun(std::function<void()> callback) const;
const SeqnoToTimeMapping& TEST_GetSeqnoToTimeMapping() const; SeqnoToTimeMapping TEST_GetSeqnoToTimeMapping() const;
size_t TEST_EstimateInMemoryStatsHistorySize() const; size_t TEST_EstimateInMemoryStatsHistorySize() const;
uint64_t TEST_GetCurrentLogNumber() const { uint64_t TEST_GetCurrentLogNumber() const {
@ -2544,6 +2544,10 @@ class DBImpl : public DB {
// PeriodicWorkScheduler::Default(). Only in unittest, it can be overrided by // PeriodicWorkScheduler::Default(). Only in unittest, it can be overrided by
// PeriodicWorkTestScheduler. // PeriodicWorkTestScheduler.
PeriodicWorkScheduler* periodic_work_scheduler_; PeriodicWorkScheduler* periodic_work_scheduler_;
// Current cadence of the periodic worker for recording sequence number to
// time.
uint64_t record_seqno_time_cadence_ = 0;
#endif #endif
// When set, we use a separate queue for writes that don't write to memtable. // When set, we use a separate queue for writes that don't write to memtable.

@ -314,7 +314,8 @@ PeriodicWorkTestScheduler* DBImpl::TEST_GetPeriodicWorkScheduler() const {
return static_cast<PeriodicWorkTestScheduler*>(periodic_work_scheduler_); return static_cast<PeriodicWorkTestScheduler*>(periodic_work_scheduler_);
} }
const SeqnoToTimeMapping& DBImpl::TEST_GetSeqnoToTimeMapping() const { SeqnoToTimeMapping DBImpl::TEST_GetSeqnoToTimeMapping() const {
InstrumentedMutexLock l(&mutex_);
return seqno_time_mapping_; return seqno_time_mapping_;
} }

@ -64,14 +64,6 @@ Status PeriodicWorkScheduler::Register(DBImpl* dbi,
Status PeriodicWorkScheduler::RegisterRecordSeqnoTimeWorker( Status PeriodicWorkScheduler::RegisterRecordSeqnoTimeWorker(
DBImpl* dbi, uint64_t record_cadence_sec) { DBImpl* dbi, uint64_t record_cadence_sec) {
MutexLock l(&timer_mu_); MutexLock l(&timer_mu_);
if (record_seqno_time_cadence_ == record_cadence_sec) {
return Status::OK();
}
if (record_cadence_sec == 0) {
timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kRecordSeqnoTime));
record_seqno_time_cadence_ = record_cadence_sec;
return Status::OK();
}
timer->Start(); timer->Start();
static std::atomic_uint64_t initial_delay(0); static std::atomic_uint64_t initial_delay(0);
bool succeeded = timer->Add( bool succeeded = timer->Add(
@ -83,7 +75,6 @@ Status PeriodicWorkScheduler::RegisterRecordSeqnoTimeWorker(
return Status::NotSupported( return Status::NotSupported(
"Updating seqno to time worker cadence is not supported yet"); "Updating seqno to time worker cadence is not supported yet");
} }
record_seqno_time_cadence_ = record_cadence_sec;
return Status::OK(); return Status::OK();
} }
@ -93,7 +84,6 @@ void PeriodicWorkScheduler::UnregisterRecordSeqnoTimeWorker(DBImpl* dbi) {
if (!timer->HasPendingTask()) { if (!timer->HasPendingTask()) {
timer->Shutdown(); timer->Shutdown();
} }
record_seqno_time_cadence_ = 0;
} }
void PeriodicWorkScheduler::Unregister(DBImpl* dbi) { void PeriodicWorkScheduler::Unregister(DBImpl* dbi) {

@ -57,9 +57,6 @@ class PeriodicWorkScheduler {
// Get the unique task name (prefix with db session id) // Get the unique task name (prefix with db session id)
std::string GetTaskName(const DBImpl* dbi, std::string GetTaskName(const DBImpl* dbi,
const std::string& func_name) const; const std::string& func_name) const;
private:
uint64_t record_seqno_time_cadence_ = 0;
}; };
#ifndef NDEBUG #ifndef NDEBUG

@ -639,6 +639,38 @@ TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) {
Close(); Close();
} }
TEST_F(SeqnoTimeTest, MultiInstancesBasic) {
const int kInstanceNum = 2;
Options options = CurrentOptions();
options.preclude_last_level_data_seconds = 10000;
options.env = mock_env_.get();
options.stats_dump_period_sec = 0;
options.stats_persist_period_sec = 0;
auto dbs = std::vector<DB*>(kInstanceNum);
for (int i = 0; i < kInstanceNum; i++) {
ASSERT_OK(
DB::Open(options, test::PerThreadDBPath(std::to_string(i)), &(dbs[i])));
}
// Make sure the second instance has the worker enabled
auto dbi = static_cast_with_check<DBImpl>(dbs[1]);
WriteOptions wo;
for (int i = 0; i < 200; i++) {
ASSERT_OK(dbi->Put(wo, Key(i), "value"));
dbfull()->TEST_WaitForPeridicWorkerRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); });
}
SeqnoToTimeMapping seqno_to_time_mapping = dbi->TEST_GetSeqnoToTimeMapping();
ASSERT_GT(seqno_to_time_mapping.Size(), 10);
for (int i = 0; i < kInstanceNum; i++) {
ASSERT_OK(dbs[i]->Close());
delete dbs[i];
}
}
TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal; options.compaction_style = kCompactionStyleUniversal;

Loading…
Cancel
Save