Update RepeatableThreadTest with MockTimeEnv (#5107)

Summary:
**This PR updates RepeatableThread::wait, breaking some tests on OS X. The rest of the PR fixes the tests on OS X.**
`RepeatableThreadTest.MockEnvTest` uses `MockTimeEnv` and `RepeatableThread`. If `RepeatableThread::wait` calls `TimedWait` with a time smaller than or equal to the current (real) time, `TimedWait` returns immediately on certain platforms, e.g. OS X. #4560 addresses this issue by replacing `TimedWait` with `Wait` in test. This fixes the test but makes test/production code diverge, which is not optimal for test coverage. This PR proposes an alternative fix which unifies test and production code path for `RepeatableThread::wait`. We obtain the current (real) time in seconds and add 10 extra seconds to ensure that `RepeatableThread::wait` invokes `TimedWait` with a time greater than (real) current time. This is to prevent the `TimedWait` function from returning immediately without sleeping and releasing the mutex. If `TimedWait` returns immediately, the mutex will not be released, and `RepeatableThread::TEST_WaitForRun` never has a chance to execute the callback which, in this case, updates the result returned by `mock_env->NowMicros()`. Consequently, `RepeatableThread::wait` cannot break out of the loop, causing test to hang. The extra 10 seconds is a best-effort approach because there seems no reliable and deterministic way to provide the aforementioned guarantee. By the time `RepeatableThread::wait` is called, there is no guarantee that the `delay + mock_env->NowMicros()` will be greater than the current real time. However, 10 seconds should be sufficient in most cases. We will keep an eye for possible flakiness of this test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5107

Differential Revision: D14680885

Pulled By: riversand963

fbshipit-source-id: d1ecbe10e1dacd110bd464cd01e188bfee72b89e
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent d77476ef55
commit 09957ded1d
  1. 59
      db/db_options_test.cc
  2. 8
      db/db_sst_test.cc
  3. 2
      util/mock_time_env.h
  4. 23
      util/repeatable_thread.h
  5. 30
      util/repeatable_thread_test.cc

@ -527,6 +527,17 @@ TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) {
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
int counter = 0; int counter = 0;
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
#if defined(OS_MACOSX) && !defined(NDEBUG)
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < mock_env->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
}
});
#endif // OS_MACOSX && !NDEBUG
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DumpStats:1", [&](void* /*arg*/) { "DBImpl::DumpStats:1", [&](void* /*arg*/) {
counter++; counter++;
@ -556,6 +567,17 @@ TEST_F(DBOptionsTest, StatsPersistScheduling) {
mock_env.reset(new rocksdb::MockTimeEnv(env_)); mock_env.reset(new rocksdb::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
#if defined(OS_MACOSX) && !defined(NDEBUG)
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < mock_env->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
}
});
#endif // OS_MACOSX && !NDEBUG
int counter = 0; int counter = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; });
@ -581,6 +603,17 @@ TEST_F(DBOptionsTest, PersistentStatsFreshInstall) {
mock_env.reset(new rocksdb::MockTimeEnv(env_)); mock_env.reset(new rocksdb::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
#if defined(OS_MACOSX) && !defined(NDEBUG)
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < mock_env->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
}
});
#endif // OS_MACOSX && !NDEBUG
int counter = 0; int counter = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; });
@ -616,6 +649,19 @@ TEST_F(DBOptionsTest, GetStatsHistory) {
mock_env.reset(new rocksdb::MockTimeEnv(env_)); mock_env.reset(new rocksdb::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
#if defined(OS_MACOSX) && !defined(NDEBUG)
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < mock_env->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
#endif // OS_MACOSX && !NDEBUG
CreateColumnFamilies({"pikachu"}, options); CreateColumnFamilies({"pikachu"}, options);
ASSERT_OK(Put("foo", "bar")); ASSERT_OK(Put("foo", "bar"));
ReopenWithColumnFamilies({"default", "pikachu"}, options); ReopenWithColumnFamilies({"default", "pikachu"}, options);
@ -658,6 +704,19 @@ TEST_F(DBOptionsTest, InMemoryStatsHistoryPurging) {
mock_env.reset(new rocksdb::MockTimeEnv(env_)); mock_env.reset(new rocksdb::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
#if defined(OS_MACOSX) && !defined(NDEBUG)
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < mock_env->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
#endif // OS_MACOSX && !NDEBUG
CreateColumnFamilies({"pikachu"}, options); CreateColumnFamilies({"pikachu"}, options);
ASSERT_OK(Put("foo", "bar")); ASSERT_OK(Put("foo", "bar"));
ReopenWithColumnFamilies({"default", "pikachu"}, options); ReopenWithColumnFamilies({"default", "pikachu"}, options);

@ -356,6 +356,14 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
env_->time_elapse_only_sleep_ = true; env_->time_elapse_only_sleep_ = true;
Options options = CurrentOptions(); Options options = CurrentOptions();
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
// Need to disable stats dumping and persisting which also use
// RepeatableThread, one of whose member variables is of type
// InstrumentedCondVar. The callback for
// InstrumentedCondVar::TimedWaitInternal can be triggered by stats dumping
// and persisting threads and cause time_spent_deleting measurement to become
// incorrect.
options.stats_dump_period_sec = 0;
options.stats_persist_period_sec = 0;
options.env = env_; options.env = env_;
int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec

@ -31,6 +31,8 @@ class MockTimeEnv : public EnvWrapper {
return current_time_ * 1000000000; return current_time_ * 1000000000;
} }
uint64_t RealNowMicros() { return target()->NowMicros(); }
void set_current_time(uint64_t time) { void set_current_time(uint64_t time) {
assert(time >= current_time_); assert(time >= current_time_);
current_time_ = time; current_time_ = time;

@ -25,6 +25,7 @@ class RepeatableThread {
env_(env), env_(env),
delay_us_(delay_us), delay_us_(delay_us),
initial_delay_us_(initial_delay_us), initial_delay_us_(initial_delay_us),
mutex_(env),
cond_var_(&mutex_), cond_var_(&mutex_),
running_(true), running_(true),
#ifndef NDEBUG #ifndef NDEBUG
@ -36,7 +37,7 @@ class RepeatableThread {
void cancel() { void cancel() {
{ {
MutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
if (!running_) { if (!running_) {
return; return;
} }
@ -58,7 +59,7 @@ class RepeatableThread {
// //
// Note: only support one caller of this method. // Note: only support one caller of this method.
void TEST_WaitForRun(std::function<void()> callback = nullptr) { void TEST_WaitForRun(std::function<void()> callback = nullptr) {
MutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
while (!waiting_) { while (!waiting_) {
cond_var_.Wait(); cond_var_.Wait();
} }
@ -75,7 +76,7 @@ class RepeatableThread {
private: private:
bool wait(uint64_t delay) { bool wait(uint64_t delay) {
MutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
if (running_ && delay > 0) { if (running_ && delay > 0) {
uint64_t wait_until = env_->NowMicros() + delay; uint64_t wait_until = env_->NowMicros() + delay;
#ifndef NDEBUG #ifndef NDEBUG
@ -83,17 +84,7 @@ class RepeatableThread {
cond_var_.SignalAll(); cond_var_.SignalAll();
#endif #endif
while (running_) { while (running_) {
#ifndef NDEBUG
if (dynamic_cast<MockTimeEnv*>(env_) != nullptr) {
// MockTimeEnv is used. Since it is not easy to mock TimedWait,
// we wait without timeout to wait for TEST_WaitForRun to wake us up.
cond_var_.Wait();
} else {
cond_var_.TimedWait(wait_until); cond_var_.TimedWait(wait_until);
}
#else
cond_var_.TimedWait(wait_until);
#endif
if (env_->NowMicros() >= wait_until) { if (env_->NowMicros() >= wait_until) {
break; break;
} }
@ -124,7 +115,7 @@ class RepeatableThread {
function_(); function_();
#ifndef NDEBUG #ifndef NDEBUG
{ {
MutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
run_count_++; run_count_++;
cond_var_.SignalAll(); cond_var_.SignalAll();
} }
@ -140,8 +131,8 @@ class RepeatableThread {
// Mutex lock should be held when accessing running_, waiting_ // Mutex lock should be held when accessing running_, waiting_
// and run_count_. // and run_count_.
port::Mutex mutex_; InstrumentedMutex mutex_;
port::CondVar cond_var_; InstrumentedCondVar cond_var_;
bool running_; bool running_;
#ifndef NDEBUG #ifndef NDEBUG
// RepeatableThread waiting for timeout. // RepeatableThread waiting for timeout.

@ -8,6 +8,7 @@
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "util/repeatable_thread.h" #include "util/repeatable_thread.h"
#include "util/sync_point.h"
#include "util/testharness.h" #include "util/testharness.h"
class RepeatableThreadTest : public testing::Test { class RepeatableThreadTest : public testing::Test {
@ -56,6 +57,35 @@ TEST_F(RepeatableThreadTest, MockEnvTest) {
constexpr int kIteration = 3; constexpr int kIteration = 3;
mock_env_->set_current_time(0); // in seconds mock_env_->set_current_time(0); // in seconds
std::atomic<int> count{0}; std::atomic<int> count{0};
#if defined(OS_MACOSX) && !defined(NDEBUG)
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
// Obtain the current (real) time in seconds and add 1000 extra seconds
// to ensure that RepeatableThread::wait invokes TimedWait with a time
// greater than (real) current time. This is to prevent the TimedWait
// function from returning immediately without sleeping and releasing
// the mutex on certain platforms, e.g. OS X. If TimedWait returns
// immediately, the mutex will not be released, and
// RepeatableThread::TEST_WaitForRun never has a chance to execute the
// callback which, in this case, updates the result returned by
// mock_env->NowMicros. Consequently, RepeatableThread::wait cannot
// break out of the loop, causing test to hang. The extra 1000 seconds
// is a best-effort approach because there seems no reliable and
// deterministic way to provide the aforementioned guarantee. By the
// time RepeatableThread::wait is called, it is no guarantee that the
// delay + mock_env->NowMicros will be greater than the current real
// time. However, 1000 seconds should be sufficient in most cases.
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < mock_env_->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = mock_env_->RealNowMicros() + 1000;
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
#endif // OS_MACOSX && !NDEBUG
rocksdb::RepeatableThread thread([&] { count++; }, "rt_test", mock_env_.get(), rocksdb::RepeatableThread thread([&] { count++; }, "rt_test", mock_env_.get(),
1 * kSecond, 1 * kSecond); 1 * kSecond, 1 * kSecond);
for (int i = 1; i <= kIteration; i++) { for (int i = 1; i <= kIteration; i++) {

Loading…
Cancel
Save