From 09957ded1df3de32eacfe4c2b2079b3e32064775 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 29 Mar 2019 10:03:02 -0700 Subject: [PATCH] Update RepeatableThreadTest with MockTimeEnv (#5107) Summary: **This PR updates RepeatableThread::wait, breaking some tests on OS X. The rest of the PR fixes the tests on OS X.** `RepeatableThreadTest.MockEnvTest` uses `MockTimeEnv` and `RepeatableThread`. If `RepeatableThread::wait` calls `TimedWait` with a time smaller than or equal to the current (real) time, `TimedWait` returns immediately on certain platforms, e.g. OS X. #4560 addresses this issue by replacing `TimedWait` with `Wait` in test. This fixes the test but makes test/production code diverge, which is not optimal for test coverage. This PR proposes an alternative fix which unifies test and production code path for `RepeatableThread::wait`. We obtain the current (real) time in seconds and add 10 extra seconds to ensure that `RepeatableThread::wait` invokes `TimedWait` with a time greater than (real) current time. This is to prevent the `TimedWait` function from returning immediately without sleeping and releasing the mutex. If `TimedWait` returns immediately, the mutex will not be released, and `RepeatableThread::TEST_WaitForRun` never has a chance to execute the callback which, in this case, updates the result returned by `mock_env->NowMicros()`. Consequently, `RepeatableThread::wait` cannot break out of the loop, causing test to hang. The extra 10 seconds is a best-effort approach because there seems no reliable and deterministic way to provide the aforementioned guarantee. By the time `RepeatableThread::wait` is called, there is no guarantee that the `delay + mock_env->NowMicros()` will be greater than the current real time. However, 10 seconds should be sufficient in most cases. We will keep an eye for possible flakiness of this test. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5107 Differential Revision: D14680885 Pulled By: riversand963 fbshipit-source-id: d1ecbe10e1dacd110bd464cd01e188bfee72b89e --- db/db_options_test.cc | 59 ++++++++++++++++++++++++++++++++++ db/db_sst_test.cc | 8 +++++ util/mock_time_env.h | 2 ++ util/repeatable_thread.h | 23 ++++--------- util/repeatable_thread_test.cc | 30 +++++++++++++++++ 5 files changed, 106 insertions(+), 16 deletions(-) diff --git a/db/db_options_test.cc b/db/db_options_test.cc index d3a1f7a43..a7ecf1274 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -527,6 +527,17 @@ TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) { mock_env->set_current_time(0); // in seconds options.env = mock_env.get(); int counter = 0; + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::DumpStats:1", [&](void* /*arg*/) { counter++; @@ -556,6 +567,17 @@ TEST_F(DBOptionsTest, StatsPersistScheduling) { mock_env.reset(new rocksdb::MockTimeEnv(env_)); mock_env->set_current_time(0); // in seconds options.env = mock_env.get(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG int counter = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); @@ -581,6 +603,17 @@ TEST_F(DBOptionsTest, PersistentStatsFreshInstall) { mock_env.reset(new rocksdb::MockTimeEnv(env_)); mock_env->set_current_time(0); // in seconds options.env = mock_env.get(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG int counter = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); @@ -616,6 +649,19 @@ TEST_F(DBOptionsTest, GetStatsHistory) { mock_env.reset(new rocksdb::MockTimeEnv(env_)); mock_env->set_current_time(0); // in seconds options.env = mock_env.get(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); +#endif // OS_MACOSX && !NDEBUG + CreateColumnFamilies({"pikachu"}, options); ASSERT_OK(Put("foo", "bar")); ReopenWithColumnFamilies({"default", "pikachu"}, options); @@ -658,6 +704,19 @@ TEST_F(DBOptionsTest, InMemoryStatsHistoryPurging) { mock_env.reset(new rocksdb::MockTimeEnv(env_)); mock_env->set_current_time(0); // in seconds options.env = mock_env.get(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); +#endif // OS_MACOSX && !NDEBUG + CreateColumnFamilies({"pikachu"}, options); ASSERT_OK(Put("foo", "bar")); ReopenWithColumnFamilies({"default", "pikachu"}, options); diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index cd6cde19a..dcd5847eb 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -356,6 +356,14 @@ TEST_F(DBSSTTest, RateLimitedDelete) { env_->time_elapse_only_sleep_ = true; Options options = CurrentOptions(); options.disable_auto_compactions = true; + // Need to disable stats dumping and persisting which also use + // RepeatableThread, one of whose member variables is of type + // InstrumentedCondVar. The callback for + // InstrumentedCondVar::TimedWaitInternal can be triggered by stats dumping + // and persisting threads and cause time_spent_deleting measurement to become + // incorrect. + options.stats_dump_period_sec = 0; + options.stats_persist_period_sec = 0; options.env = env_; int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec diff --git a/util/mock_time_env.h b/util/mock_time_env.h index c6ab8a748..feada4777 100644 --- a/util/mock_time_env.h +++ b/util/mock_time_env.h @@ -31,6 +31,8 @@ class MockTimeEnv : public EnvWrapper { return current_time_ * 1000000000; } + uint64_t RealNowMicros() { return target()->NowMicros(); } + void set_current_time(uint64_t time) { assert(time >= current_time_); current_time_ = time; diff --git a/util/repeatable_thread.h b/util/repeatable_thread.h index 770ba5772..967cc4994 100644 --- a/util/repeatable_thread.h +++ b/util/repeatable_thread.h @@ -25,6 +25,7 @@ class RepeatableThread { env_(env), delay_us_(delay_us), initial_delay_us_(initial_delay_us), + mutex_(env), cond_var_(&mutex_), running_(true), #ifndef NDEBUG @@ -36,7 +37,7 @@ class RepeatableThread { void cancel() { { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); if (!running_) { return; } @@ -58,7 +59,7 @@ class RepeatableThread { // // Note: only support one caller of this method. void TEST_WaitForRun(std::function callback = nullptr) { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); while (!waiting_) { cond_var_.Wait(); } @@ -75,7 +76,7 @@ class RepeatableThread { private: bool wait(uint64_t delay) { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); if (running_ && delay > 0) { uint64_t wait_until = env_->NowMicros() + delay; #ifndef NDEBUG @@ -83,17 +84,7 @@ class RepeatableThread { cond_var_.SignalAll(); #endif while (running_) { -#ifndef NDEBUG - if (dynamic_cast(env_) != nullptr) { - // MockTimeEnv is used. Since it is not easy to mock TimedWait, - // we wait without timeout to wait for TEST_WaitForRun to wake us up. - cond_var_.Wait(); - } else { - cond_var_.TimedWait(wait_until); - } -#else cond_var_.TimedWait(wait_until); -#endif if (env_->NowMicros() >= wait_until) { break; } @@ -124,7 +115,7 @@ class RepeatableThread { function_(); #ifndef NDEBUG { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); run_count_++; cond_var_.SignalAll(); } @@ -140,8 +131,8 @@ class RepeatableThread { // Mutex lock should be held when accessing running_, waiting_ // and run_count_. - port::Mutex mutex_; - port::CondVar cond_var_; + InstrumentedMutex mutex_; + InstrumentedCondVar cond_var_; bool running_; #ifndef NDEBUG // RepeatableThread waiting for timeout. diff --git a/util/repeatable_thread_test.cc b/util/repeatable_thread_test.cc index dec437da3..ee853c105 100644 --- a/util/repeatable_thread_test.cc +++ b/util/repeatable_thread_test.cc @@ -8,6 +8,7 @@ #include "db/db_test_util.h" #include "util/repeatable_thread.h" +#include "util/sync_point.h" #include "util/testharness.h" class RepeatableThreadTest : public testing::Test { @@ -56,6 +57,35 @@ TEST_F(RepeatableThreadTest, MockEnvTest) { constexpr int kIteration = 3; mock_env_->set_current_time(0); // in seconds std::atomic count{0}; + +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + // Obtain the current (real) time in seconds and add 1000 extra seconds + // to ensure that RepeatableThread::wait invokes TimedWait with a time + // greater than (real) current time. This is to prevent the TimedWait + // function from returning immediately without sleeping and releasing + // the mutex on certain platforms, e.g. OS X. If TimedWait returns + // immediately, the mutex will not be released, and + // RepeatableThread::TEST_WaitForRun never has a chance to execute the + // callback which, in this case, updates the result returned by + // mock_env->NowMicros. Consequently, RepeatableThread::wait cannot + // break out of the loop, causing test to hang. The extra 1000 seconds + // is a best-effort approach because there seems no reliable and + // deterministic way to provide the aforementioned guarantee. By the + // time RepeatableThread::wait is called, it is no guarantee that the + // delay + mock_env->NowMicros will be greater than the current real + // time. However, 1000 seconds should be sufficient in most cases. + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env_->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env_->RealNowMicros() + 1000; + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); +#endif // OS_MACOSX && !NDEBUG + rocksdb::RepeatableThread thread([&] { count++; }, "rt_test", mock_env_.get(), 1 * kSecond, 1 * kSecond); for (int i = 1; i <= kIteration; i++) {