Utility to run task periodically in a thread (#4423)

Summary:
Introduce `RepeatableThread` utility to run task periodically in a separate thread. It is basically the same as the the same class in fbcode, and in addition provide a helper method to let tests mock time and trigger execution one at a time.

We can use this class to replace `TimerQueue` in #4382 and `BlobDB`.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4423

Differential Revision: D10020932

Pulled By: yiwu-arbug

fbshipit-source-id: 3616bef108c39a33c92eedb1256de424b7c04087
main
Yi Wu 6 years ago committed by Facebook Github Bot
parent 75ca13875c
commit d6f2ecf49c
  1. 1
      CMakeLists.txt
  2. 4
      Makefile
  3. 5
      TARGETS
  4. 1
      src.mk
  5. 142
      util/repeatable_thread.h
  6. 76
      util/repeatable_thread_test.cc

@ -946,6 +946,7 @@ if(WITH_TESTS)
util/hash_test.cc
util/heap_test.cc
util/rate_limiter_test.cc
util/repeatable_thread_test.cc
util/slice_transform_test.cc
util/timer_queue_test.cc
util/thread_list_test.cc

@ -533,6 +533,7 @@ TESTS = \
write_unprepared_transaction_test \
db_universal_compaction_test \
trace_analyzer_test \
repeatable_thread_test \
PARALLEL_TEST = \
backupable_db_test \
@ -1556,6 +1557,9 @@ range_del_aggregator_bench: db/range_del_aggregator_bench.o $(LIBOBJECTS) $(TEST
blob_db_test: utilities/blob_db/blob_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
repeatable_thread_test: util/repeatable_thread_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
#-------------------------------------------------
# make install related stuff
INSTALL_PATH ?= /usr/local

@ -935,6 +935,11 @@ ROCKS_TESTS = [
"db/repair_test.cc",
"serial",
],
[
"repeatable_thread_test",
"util/repeatable_thread_test.cc",
"serial",
],
[
"sim_cache_test",
"utilities/simulator_cache/sim_cache_test.cc",

@ -378,6 +378,7 @@ MAIN_SOURCES = \
util/filelock_test.cc \
util/log_write_bench.cc \
util/rate_limiter_test.cc \
util/repeatable_thread_test.cc \
util/slice_transform_test.cc \
util/timer_queue_test.cc \
util/thread_list_test.cc \

@ -0,0 +1,142 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <functional>
#include <string>
#include "port/port.h"
#include "rocksdb/env.h"
#include "util/mutexlock.h"
namespace rocksdb {
class RepeatableThread {
public:
RepeatableThread(std::function<void()> function,
const std::string& thread_name, Env* env, uint64_t delay_us,
uint64_t initial_delay_us = 0)
: function_(function),
thread_name_("rocksdb:" + thread_name),
env_(env),
delay_us_(delay_us),
initial_delay_us_(initial_delay_us),
cond_var_(&mutex_),
running_(true),
#ifndef NDEBUG
waiting_(false),
run_count_(0),
#endif
thread_([this] { thread(); }) {
}
void cancel() {
{
MutexLock l(&mutex_);
if (!running_) {
return;
}
running_ = false;
cond_var_.SignalAll();
}
thread_.join();
}
~RepeatableThread() { cancel(); }
#ifndef NDEBUG
// Wait until RepeatableThread starting waiting, call the optional callback,
// then wait for one run of RepeatableThread. Tests can use provide a
// custom env object to mock time, and use the callback here to bump current
// time and trigger RepeatableThread. See repeatable_thread_test for example.
//
// Note: only support one caller of this method.
void TEST_WaitForRun(std::function<void()> callback = nullptr) {
MutexLock l(&mutex_);
while (!waiting_) {
cond_var_.Wait();
}
uint64_t prev_count = run_count_;
if (callback != nullptr) {
callback();
}
cond_var_.SignalAll();
while (!(run_count_ > prev_count)) {
cond_var_.Wait();
}
}
#endif
private:
bool wait(uint64_t delay) {
MutexLock l(&mutex_);
if (running_ && delay > 0) {
uint64_t wait_until = env_->NowMicros() + delay;
#ifndef NDEBUG
waiting_ = true;
cond_var_.SignalAll();
#endif
while (running_) {
cond_var_.TimedWait(wait_until);
if (env_->NowMicros() >= wait_until) {
break;
}
}
#ifndef NDEBUG
waiting_ = false;
#endif
}
return running_;
}
void thread() {
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
// Set thread name.
auto thread_handle = thread_.native_handle();
int ret __attribute__((__unused__)) =
pthread_setname_np(thread_handle, thread_name_.c_str());
assert(ret == 0);
#endif
#endif
assert(delay_us_ > 0);
if (!wait(initial_delay_us_)) {
return;
}
do {
function_();
#ifndef NDEBUG
{
MutexLock l(&mutex_);
run_count_++;
cond_var_.SignalAll();
}
#endif
} while (wait(delay_us_));
}
const std::function<void()> function_;
const std::string thread_name_;
Env* const env_;
const uint64_t delay_us_;
const uint64_t initial_delay_us_;
// Mutex lock should be held when accessing running_, waiting_
// and run_count_.
port::Mutex mutex_;
port::CondVar cond_var_;
bool running_;
#ifndef NDEBUG
// RepeatableThread waiting for timeout.
bool waiting_;
// Times function_ had run.
uint64_t run_count_;
#endif
port::Thread thread_;
};
} // namespace rocksdb

@ -0,0 +1,76 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <atomic>
#include <memory>
#include "db/db_test_util.h"
#include "util/repeatable_thread.h"
#include "util/testharness.h"
class RepeatableThreadTest : public testing::Test {
public:
RepeatableThreadTest()
: mock_env_(new rocksdb::MockTimeEnv(rocksdb::Env::Default())) {}
protected:
std::unique_ptr<rocksdb::MockTimeEnv> mock_env_;
};
TEST_F(RepeatableThreadTest, TimedTest) {
constexpr uint64_t kSecond = 1000000; // 1s = 1000000us
constexpr int kIteration = 3;
rocksdb::Env* env = rocksdb::Env::Default();
rocksdb::port::Mutex mutex;
rocksdb::port::CondVar test_cv(&mutex);
int count = 0;
uint64_t prev_time = env->NowMicros();
rocksdb::RepeatableThread thread(
[&] {
rocksdb::MutexLock l(&mutex);
count++;
uint64_t now = env->NowMicros();
assert(count == 1 || prev_time + 1 * kSecond <= now);
prev_time = now;
if (count >= kIteration) {
test_cv.SignalAll();
}
},
"rt_test", env, 1 * kSecond);
// Wait for execution finish.
{
rocksdb::MutexLock l(&mutex);
while (count < kIteration) {
test_cv.Wait();
}
}
// Test cancel
thread.cancel();
}
TEST_F(RepeatableThreadTest, MockEnvTest) {
constexpr uint64_t kSecond = 1000000; // 1s = 1000000us
constexpr int kIteration = 3;
mock_env_->set_current_time(0); // in seconds
std::atomic<int> count{0};
rocksdb::RepeatableThread thread([&] { count++; }, "rt_test", mock_env_.get(),
1 * kSecond, 1 * kSecond);
for (int i = 1; i <= kIteration; i++) {
// Bump current time
thread.TEST_WaitForRun([&] { mock_env_->set_current_time(i); });
}
// Test function should be exectued exactly kIteraion times.
ASSERT_EQ(kIteration, count.load());
// Test cancel
thread.cancel();
}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Loading…
Cancel
Save