diff --git a/db/db_test_util.h b/db/db_test_util.h index 80b9895a3..5dfe231f7 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -1150,28 +1150,4 @@ class DBTestBase : public testing::Test { bool time_elapse_only_sleep_on_reopen_ = false; }; -class SafeMockTimeEnv : public MockTimeEnv { - public: - explicit SafeMockTimeEnv(Env* base) : MockTimeEnv(base) { - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); -#if defined(OS_MACOSX) && !defined(NDEBUG) - // This is an alternate way (vs. SpecialEnv) of dealing with the fact - // that on some platforms, pthread_cond_timedwait does not appear to - // release the lock for other threads to operate if the deadline time - // is already passed. (TimedWait calls are currently a bad abstraction - // because the deadline parameter is usually computed from Env time, - // but is interpreted in real clock time.) - SyncPoint::GetInstance()->SetCallBack( - "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { - uint64_t time_us = *reinterpret_cast(arg); - if (time_us < this->RealNowMicros()) { - *reinterpret_cast(arg) = this->RealNowMicros() + 1000; - } - }); -#endif // OS_MACOSX && !NDEBUG - SyncPoint::GetInstance()->EnableProcessing(); - } -}; - } // namespace ROCKSDB_NAMESPACE diff --git a/monitoring/stats_dump_scheduler_test.cc b/monitoring/stats_dump_scheduler_test.cc index 3c7f30f18..ac4a83703 100644 --- a/monitoring/stats_dump_scheduler_test.cc +++ b/monitoring/stats_dump_scheduler_test.cc @@ -14,12 +14,13 @@ class StatsDumpSchedulerTest : public DBTestBase { public: StatsDumpSchedulerTest() : DBTestBase("/stats_dump_scheduler_test", /*env_do_fsync=*/true), - mock_env_(new SafeMockTimeEnv(Env::Default())) {} + mock_env_(new MockTimeEnv(Env::Default())) {} protected: - std::unique_ptr mock_env_; + std::unique_ptr mock_env_; void SetUp() override { + mock_env_->InstallTimedWaitFixCallback(); SyncPoint::GetInstance()->SetCallBack( "DBImpl::StartStatsDumpScheduler:Init", [&](void* arg) { auto* stats_dump_scheduler_ptr = diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc index 65ac934a5..f5674f5d0 100644 --- a/monitoring/stats_history_test.cc +++ b/monitoring/stats_history_test.cc @@ -33,12 +33,13 @@ class StatsHistoryTest : public DBTestBase { public: StatsHistoryTest() : DBTestBase("/stats_history_test", /*env_do_fsync=*/true), - mock_env_(new SafeMockTimeEnv(Env::Default())) {} + mock_env_(new MockTimeEnv(Env::Default())) {} protected: - std::unique_ptr mock_env_; + std::unique_ptr mock_env_; void SetUp() override { + mock_env_->InstallTimedWaitFixCallback(); SyncPoint::GetInstance()->SetCallBack( "DBImpl::StartStatsDumpScheduler:Init", [&](void* arg) { auto* stats_dump_scheduler_ptr = diff --git a/test_util/mock_time_env.h b/test_util/mock_time_env.h index 8c60b4615..0e479e116 100644 --- a/test_util/mock_time_env.h +++ b/test_util/mock_time_env.h @@ -41,6 +41,30 @@ class MockTimeEnv : public EnvWrapper { current_time_ = time; } + // TODO: this is a workaround for the different behavior on different platform + // for timedwait timeout. Ideally timedwait API should be moved to env. + // details: PR #7101. + void InstallTimedWaitFixCallback() { + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + // This is an alternate way (vs. SpecialEnv) of dealing with the fact + // that on some platforms, pthread_cond_timedwait does not appear to + // release the lock for other threads to operate if the deadline time + // is already passed. (TimedWait calls are currently a bad abstraction + // because the deadline parameter is usually computed from Env time, + // but is interpreted in real clock time.) + SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < this->RealNowMicros()) { + *reinterpret_cast(arg) = this->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG + SyncPoint::GetInstance()->EnableProcessing(); + } + private: std::atomic current_time_{0}; }; diff --git a/util/repeatable_thread.h b/util/repeatable_thread.h index 1ac8edee6..f78b1a8c2 100644 --- a/util/repeatable_thread.h +++ b/util/repeatable_thread.h @@ -10,7 +10,9 @@ #include "port/port.h" #include "rocksdb/env.h" +#ifndef NDEBUG #include "test_util/mock_time_env.h" +#endif // !NDEBUG #include "util/mutexlock.h" namespace ROCKSDB_NAMESPACE { diff --git a/util/timer.h b/util/timer.h index de9e9d1ab..a789fcb1e 100644 --- a/util/timer.h +++ b/util/timer.h @@ -42,9 +42,15 @@ class Timer { running_(false), executing_task_(false) {} - // Add a new function. If the fn_name already exists, overriding it, - // regardless if the function is pending removed (invalid) or not. - // repeat_every_us == 0 means do not repeat + // Add a new function to run. + // fn_name has to be identical, otherwise, the new one overrides the existing + // one, regardless if the function is pending removed (invalid) or not. + // start_after_us is the initial delay. + // repeat_every_us is the interval between ending time of the last call and + // starting time of the next call. For example, repeat_every_us = 2000 and + // the function takes 1000us to run. If it starts at time [now]us, then it + // finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us. + // repeat_every_us == 0 means do not repeat. void Add(std::function fn, const std::string& fn_name, uint64_t start_after_us, @@ -138,10 +144,18 @@ class Timer { } #ifndef NDEBUG + // Wait until Timer starting waiting, call the optional callback, then wait + // for Timer waiting again. + // Tests can provide a custom env object to mock time, and use the callback + // here to bump current time and trigger Timer. See timer_test for example. + // + // Note: only support one caller of this method. void TEST_WaitForRun(std::function callback = nullptr) { InstrumentedMutexLock l(&mutex_); - while (!heap_.empty() && - heap_.top()->next_run_time_us <= env_->NowMicros()) { + // It act as a spin lock + while (executing_task_ || + (!heap_.empty() && + heap_.top()->next_run_time_us <= env_->NowMicros())) { cond_var_.TimedWait(env_->NowMicros() + 1000); } if (callback != nullptr) { @@ -150,8 +164,9 @@ class Timer { cond_var_.SignalAll(); do { cond_var_.TimedWait(env_->NowMicros() + 1000); - } while (!heap_.empty() && - heap_.top()->next_run_time_us <= env_->NowMicros()); + } while ( + executing_task_ || + (!heap_.empty() && heap_.top()->next_run_time_us <= env_->NowMicros())); } size_t TEST_GetPendingTaskNum() const { diff --git a/util/timer_test.cc b/util/timer_test.cc index 7e02516c6..e12208c74 100644 --- a/util/timer_test.cc +++ b/util/timer_test.cc @@ -15,272 +15,187 @@ class TimerTest : public testing::Test { protected: std::unique_ptr mock_env_; - -#if defined(OS_MACOSX) && !defined(NDEBUG) - // On some platforms (MacOS) pthread_cond_timedwait does not appear - // to release the lock for other threads to operate if the deadline time - // is already passed. This is a problem for tests in general because - // TimedWait calls are a bad abstraction: the deadline parameter is - // usually computed from Env time, but is interpreted in real clock time. - // Since this test doesn't even pretend to use clock times, we have - // to mock TimedWait to ensure it yields. - void SetUp() override { - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { - uint64_t* time_us = reinterpret_cast(arg); - if (*time_us < mock_env_->RealNowMicros()) { - *time_us = mock_env_->RealNowMicros() + 1000; - } - }); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); - } -#endif // OS_MACOSX && !NDEBUG - const uint64_t kSecond = 1000000; // 1sec = 1000000us + + void SetUp() override { mock_env_->InstallTimedWaitFixCallback(); } }; TEST_F(TimerTest, SingleScheduleOnceTest) { - const int kIterations = 1; - uint64_t time_counter = 0; - mock_env_->set_current_time(0); - - InstrumentedMutex mutex; - InstrumentedCondVar test_cv(&mutex); - + const int kInitDelaySec = 1; + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); Timer timer(mock_env_.get()); + int count = 0; - timer.Add( - [&] { - InstrumentedMutexLock l(&mutex); - count++; - if (count >= kIterations) { - test_cv.SignalAll(); - } - }, - "fn_sch_test", 1 * kSecond, 0); + timer.Add([&] { count++; }, "fn_sch_test", kInitDelaySec * kSecond, 0); ASSERT_TRUE(timer.Start()); + ASSERT_EQ(0, count); // Wait for execution to finish - { - InstrumentedMutexLock l(&mutex); - while(count < kIterations) { - time_counter += kSecond; - mock_env_->set_current_time(time_counter); - test_cv.TimedWait(time_counter); - } - } + mock_time_sec += kInitDelaySec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(1, count); ASSERT_TRUE(timer.Shutdown()); - - ASSERT_EQ(1, count); } TEST_F(TimerTest, MultipleScheduleOnceTest) { - const int kIterations = 1; - uint64_t time_counter = 0; - mock_env_->set_current_time(0); - InstrumentedMutex mutex1; - InstrumentedCondVar test_cv1(&mutex1); - + const int kInitDelay1Sec = 1; + const int kInitDelay2Sec = 3; + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); Timer timer(mock_env_.get()); + int count1 = 0; - timer.Add( - [&] { - InstrumentedMutexLock l(&mutex1); - count1++; - if (count1 >= kIterations) { - test_cv1.SignalAll(); - } - }, - "fn_sch_test1", 1 * kSecond, 0); + timer.Add([&] { count1++; }, "fn_sch_test1", kInitDelay1Sec * kSecond, 0); - InstrumentedMutex mutex2; - InstrumentedCondVar test_cv2(&mutex2); int count2 = 0; - timer.Add( - [&] { - InstrumentedMutexLock l(&mutex2); - count2 += 5; - if (count2 >= kIterations) { - test_cv2.SignalAll(); - } - }, - "fn_sch_test2", 3 * kSecond, 0); + timer.Add([&] { count2++; }, "fn_sch_test2", kInitDelay2Sec * kSecond, 0); ASSERT_TRUE(timer.Start()); + ASSERT_EQ(0, count1); + ASSERT_EQ(0, count2); - // Wait for execution to finish - { - InstrumentedMutexLock l(&mutex1); - while (count1 < kIterations) { - time_counter += kSecond; - mock_env_->set_current_time(time_counter); - test_cv1.TimedWait(time_counter); - } - } + mock_time_sec = kInitDelay1Sec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); - // Wait for execution to finish - { - InstrumentedMutexLock l(&mutex2); - while(count2 < kIterations) { - time_counter += kSecond; - mock_env_->set_current_time(time_counter); - test_cv2.TimedWait(time_counter); - } - } + ASSERT_EQ(1, count1); + ASSERT_EQ(0, count2); - ASSERT_TRUE(timer.Shutdown()); + mock_time_sec = kInitDelay2Sec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); ASSERT_EQ(1, count1); - ASSERT_EQ(5, count2); + ASSERT_EQ(1, count2); + + ASSERT_TRUE(timer.Shutdown()); } TEST_F(TimerTest, SingleScheduleRepeatedlyTest) { const int kIterations = 5; - uint64_t time_counter = 0; - mock_env_->set_current_time(0); - - InstrumentedMutex mutex; - InstrumentedCondVar test_cv(&mutex); + const int kInitDelaySec = 1; + const int kRepeatSec = 1; + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); Timer timer(mock_env_.get()); int count = 0; - timer.Add( - [&] { - InstrumentedMutexLock l(&mutex); - count++; - if (count >= kIterations) { - test_cv.SignalAll(); - } - }, - "fn_sch_test", 1 * kSecond, 1 * kSecond); + timer.Add([&] { count++; }, "fn_sch_test", kInitDelaySec * kSecond, + kRepeatSec * kSecond); ASSERT_TRUE(timer.Start()); + ASSERT_EQ(0, count); + + mock_time_sec += kInitDelaySec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); + + ASSERT_EQ(1, count); // Wait for execution to finish - { - InstrumentedMutexLock l(&mutex); - while(count < kIterations) { - time_counter += kSecond; - mock_env_->set_current_time(time_counter); - test_cv.TimedWait(time_counter); - } + for (int i = 1; i < kIterations; i++) { + mock_time_sec += kRepeatSec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); } + ASSERT_EQ(kIterations, count); ASSERT_TRUE(timer.Shutdown()); - - ASSERT_EQ(5, count); } TEST_F(TimerTest, MultipleScheduleRepeatedlyTest) { - uint64_t time_counter = 0; - mock_env_->set_current_time(0); + const int kInitDelay1Sec = 0; + const int kInitDelay2Sec = 1; + const int kInitDelay3Sec = 0; + const int kRepeatSec = 2; + const int kLargeRepeatSec = 100; + const int kIterations = 5; + + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); Timer timer(mock_env_.get()); - InstrumentedMutex mutex1; - InstrumentedCondVar test_cv1(&mutex1); - const int kIterations1 = 5; int count1 = 0; - timer.Add( - [&] { - InstrumentedMutexLock l(&mutex1); - count1++; - if (count1 >= kIterations1) { - test_cv1.SignalAll(); - } - }, - "fn_sch_test1", 0, 2 * kSecond); + timer.Add([&] { count1++; }, "fn_sch_test1", kInitDelay1Sec * kSecond, + kRepeatSec * kSecond); - InstrumentedMutex mutex2; - InstrumentedCondVar test_cv2(&mutex2); - const int kIterations2 = 5; int count2 = 0; - timer.Add( - [&] { - InstrumentedMutexLock l(&mutex2); - count2++; - if (count2 >= kIterations2) { - test_cv2.SignalAll(); - } - }, - "fn_sch_test2", 1 * kSecond, 2 * kSecond); + timer.Add([&] { count2++; }, "fn_sch_test2", kInitDelay2Sec * kSecond, + kRepeatSec * kSecond); + + // Add a function with relatively large repeat interval + int count3 = 0; + timer.Add([&] { count3++; }, "fn_sch_test3", kInitDelay3Sec * kSecond, + kLargeRepeatSec * kSecond); ASSERT_TRUE(timer.Start()); + ASSERT_EQ(0, count2); + ASSERT_EQ(0, count3); // Wait for execution to finish - { - InstrumentedMutexLock l(&mutex1); - while(count1 < kIterations1) { - time_counter += kSecond; - mock_env_->set_current_time(time_counter); - test_cv1.TimedWait(time_counter); - } + for (; count1 < kIterations; mock_time_sec++) { + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ((mock_time_sec + 2) / kRepeatSec, count1); + ASSERT_EQ((mock_time_sec + 1) / kRepeatSec, count2); + + // large interval function should only run once (the first one). + ASSERT_EQ(1, count3); } timer.Cancel("fn_sch_test1"); // Wait for execution to finish - { - InstrumentedMutexLock l(&mutex2); - while(count2 < kIterations2) { - time_counter += kSecond; - mock_env_->set_current_time(time_counter); - test_cv2.TimedWait(time_counter); - } - } + mock_time_sec++; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(kIterations, count1); + ASSERT_EQ(kIterations, count2); + ASSERT_EQ(1, count3); timer.Cancel("fn_sch_test2"); - ASSERT_TRUE(timer.Shutdown()); + ASSERT_EQ(kIterations, count1); + ASSERT_EQ(kIterations, count2); - ASSERT_EQ(count1, 5); - ASSERT_EQ(count2, 5); + // execute the long interval one + mock_time_sec = kLargeRepeatSec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(2, count3); + + ASSERT_TRUE(timer.Shutdown()); } TEST_F(TimerTest, AddAfterStartTest) { const int kIterations = 5; - InstrumentedMutex mutex; - InstrumentedCondVar test_cv(&mutex); + const int kInitDelaySec = 1; + const int kRepeatSec = 1; // wait timer to run and then add a new job SyncPoint::GetInstance()->LoadDependency( {{"Timer::Run::Waiting", "TimerTest:AddAfterStartTest:1"}}); SyncPoint::GetInstance()->EnableProcessing(); - mock_env_->set_current_time(0); + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); Timer timer(mock_env_.get()); ASSERT_TRUE(timer.Start()); TEST_SYNC_POINT("TimerTest:AddAfterStartTest:1"); int count = 0; - timer.Add( - [&] { - InstrumentedMutexLock l(&mutex); - count++; - if (count >= kIterations) { - test_cv.SignalAll(); - } - }, - "fn_sch_test", 1 * kSecond, 1 * kSecond); - + timer.Add([&] { count++; }, "fn_sch_test", kInitDelaySec * kSecond, + kRepeatSec * kSecond); + ASSERT_EQ(0, count); // Wait for execution to finish - uint64_t time_counter = 0; - { - InstrumentedMutexLock l(&mutex); - while (count < kIterations) { - time_counter += kSecond; - mock_env_->set_current_time(time_counter); - test_cv.TimedWait(time_counter); - } + mock_time_sec += kInitDelaySec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(1, count); + + for (int i = 1; i < kIterations; i++) { + mock_time_sec += kRepeatSec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); } + ASSERT_EQ(kIterations, count); ASSERT_TRUE(timer.Shutdown()); - - ASSERT_EQ(kIterations, count); } TEST_F(TimerTest, CancelRunningTask) { @@ -356,35 +271,86 @@ TEST_F(TimerTest, ShutdownRunningTask) { delete value; } -TEST_F(TimerTest, AddSameFuncNameTest) { - mock_env_->set_current_time(0); +TEST_F(TimerTest, AddSameFuncName) { + const int kInitDelaySec = 1; + const int kRepeat1Sec = 5; + const int kRepeat2Sec = 4; + + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); Timer timer(mock_env_.get()); ASSERT_TRUE(timer.Start()); int func_counter1 = 0; - timer.Add([&] { func_counter1++; }, "duplicated_func", 1 * kSecond, - 5 * kSecond); + timer.Add([&] { func_counter1++; }, "duplicated_func", + kInitDelaySec * kSecond, kRepeat1Sec * kSecond); int func2_counter = 0; - timer.Add([&] { func2_counter++; }, "func2", 1 * kSecond, 4 * kSecond); + timer.Add([&] { func2_counter++; }, "func2", kInitDelaySec * kSecond, + kRepeat2Sec * kSecond); // New function with the same name should override the existing one int func_counter2 = 0; - timer.Add([&] { func_counter2++; }, "duplicated_func", 1 * kSecond, - 5 * kSecond); + timer.Add([&] { func_counter2++; }, "duplicated_func", + kInitDelaySec * kSecond, kRepeat1Sec * kSecond); + + ASSERT_EQ(0, func_counter1); + ASSERT_EQ(0, func2_counter); + ASSERT_EQ(0, func_counter2); + + mock_time_sec += kInitDelaySec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); + + ASSERT_EQ(0, func_counter1); + ASSERT_EQ(1, func2_counter); + ASSERT_EQ(1, func_counter2); - timer.TEST_WaitForRun([&] { mock_env_->set_current_time(1); }); + mock_time_sec += kRepeat1Sec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); - ASSERT_EQ(func_counter1, 0); - ASSERT_EQ(func2_counter, 1); - ASSERT_EQ(func_counter2, 1); + ASSERT_EQ(0, func_counter1); + ASSERT_EQ(2, func2_counter); + ASSERT_EQ(2, func_counter2); - timer.TEST_WaitForRun([&] { mock_env_->set_current_time(6); }); + ASSERT_TRUE(timer.Shutdown()); +} + +TEST_F(TimerTest, RepeatIntervalWithFuncRunningTime) { + const int kInitDelaySec = 1; + const int kRepeatSec = 5; + const int kFuncRunningTimeSec = 1; + + int mock_time_sec = 0; + mock_env_->set_current_time(mock_time_sec); + Timer timer(mock_env_.get()); - ASSERT_EQ(func_counter1, 0); - ASSERT_EQ(func2_counter, 2); - ASSERT_EQ(func_counter2, 2); + ASSERT_TRUE(timer.Start()); + + int func_counter = 0; + timer.Add( + [&] { + mock_env_->set_current_time(mock_time_sec + kFuncRunningTimeSec); + func_counter++; + }, + "func", kInitDelaySec * kSecond, kRepeatSec * kSecond); + + ASSERT_EQ(0, func_counter); + mock_time_sec += kInitDelaySec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(1, func_counter); + + // After repeat interval time, the function is not executed, as running + // the function takes some time (`kFuncRunningTimeSec`). The repeat interval + // is the time between ending time of the last call and starting time of the + // next call. + mock_time_sec += kRepeatSec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(1, func_counter); + + mock_time_sec += kFuncRunningTimeSec; + timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); }); + ASSERT_EQ(2, func_counter); ASSERT_TRUE(timer.Shutdown()); }