From d6f2ecf49c28fee225477d39e2a1535a87919afe Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Thu, 27 Sep 2018 15:25:47 -0700 Subject: [PATCH] Utility to run task periodically in a thread (#4423) Summary: Introduce `RepeatableThread` utility to run task periodically in a separate thread. It is basically the same as the the same class in fbcode, and in addition provide a helper method to let tests mock time and trigger execution one at a time. We can use this class to replace `TimerQueue` in #4382 and `BlobDB`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4423 Differential Revision: D10020932 Pulled By: yiwu-arbug fbshipit-source-id: 3616bef108c39a33c92eedb1256de424b7c04087 --- CMakeLists.txt | 1 + Makefile | 4 + TARGETS | 5 ++ src.mk | 1 + util/repeatable_thread.h | 142 +++++++++++++++++++++++++++++++++ util/repeatable_thread_test.cc | 76 ++++++++++++++++++ 6 files changed, 229 insertions(+) create mode 100644 util/repeatable_thread.h create mode 100644 util/repeatable_thread_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index e86e728f9..b1fdf913d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -946,6 +946,7 @@ if(WITH_TESTS) util/hash_test.cc util/heap_test.cc util/rate_limiter_test.cc + util/repeatable_thread_test.cc util/slice_transform_test.cc util/timer_queue_test.cc util/thread_list_test.cc diff --git a/Makefile b/Makefile index 70f4c9abd..0fc43d0ff 100644 --- a/Makefile +++ b/Makefile @@ -533,6 +533,7 @@ TESTS = \ write_unprepared_transaction_test \ db_universal_compaction_test \ trace_analyzer_test \ + repeatable_thread_test \ PARALLEL_TEST = \ backupable_db_test \ @@ -1556,6 +1557,9 @@ range_del_aggregator_bench: db/range_del_aggregator_bench.o $(LIBOBJECTS) $(TEST blob_db_test: utilities/blob_db/blob_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +repeatable_thread_test: util/repeatable_thread_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + #------------------------------------------------- # make install related stuff INSTALL_PATH ?= /usr/local diff --git a/TARGETS b/TARGETS index 8944e4167..bccdc48bc 100644 --- a/TARGETS +++ b/TARGETS @@ -935,6 +935,11 @@ ROCKS_TESTS = [ "db/repair_test.cc", "serial", ], + [ + "repeatable_thread_test", + "util/repeatable_thread_test.cc", + "serial", + ], [ "sim_cache_test", "utilities/simulator_cache/sim_cache_test.cc", diff --git a/src.mk b/src.mk index cd66cbd0e..9a0ce92ba 100644 --- a/src.mk +++ b/src.mk @@ -378,6 +378,7 @@ MAIN_SOURCES = \ util/filelock_test.cc \ util/log_write_bench.cc \ util/rate_limiter_test.cc \ + util/repeatable_thread_test.cc \ util/slice_transform_test.cc \ util/timer_queue_test.cc \ util/thread_list_test.cc \ diff --git a/util/repeatable_thread.h b/util/repeatable_thread.h new file mode 100644 index 000000000..34164ca56 --- /dev/null +++ b/util/repeatable_thread.h @@ -0,0 +1,142 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include + +#include "port/port.h" +#include "rocksdb/env.h" +#include "util/mutexlock.h" + +namespace rocksdb { + +class RepeatableThread { + public: + RepeatableThread(std::function function, + const std::string& thread_name, Env* env, uint64_t delay_us, + uint64_t initial_delay_us = 0) + : function_(function), + thread_name_("rocksdb:" + thread_name), + env_(env), + delay_us_(delay_us), + initial_delay_us_(initial_delay_us), + cond_var_(&mutex_), + running_(true), +#ifndef NDEBUG + waiting_(false), + run_count_(0), +#endif + thread_([this] { thread(); }) { + } + + void cancel() { + { + MutexLock l(&mutex_); + if (!running_) { + return; + } + running_ = false; + cond_var_.SignalAll(); + } + thread_.join(); + } + + ~RepeatableThread() { cancel(); } + +#ifndef NDEBUG + // Wait until RepeatableThread starting waiting, call the optional callback, + // then wait for one run of RepeatableThread. Tests can use provide a + // custom env object to mock time, and use the callback here to bump current + // time and trigger RepeatableThread. See repeatable_thread_test for example. + // + // Note: only support one caller of this method. + void TEST_WaitForRun(std::function callback = nullptr) { + MutexLock l(&mutex_); + while (!waiting_) { + cond_var_.Wait(); + } + uint64_t prev_count = run_count_; + if (callback != nullptr) { + callback(); + } + cond_var_.SignalAll(); + while (!(run_count_ > prev_count)) { + cond_var_.Wait(); + } + } +#endif + + private: + bool wait(uint64_t delay) { + MutexLock l(&mutex_); + if (running_ && delay > 0) { + uint64_t wait_until = env_->NowMicros() + delay; +#ifndef NDEBUG + waiting_ = true; + cond_var_.SignalAll(); +#endif + while (running_) { + cond_var_.TimedWait(wait_until); + if (env_->NowMicros() >= wait_until) { + break; + } + } +#ifndef NDEBUG + waiting_ = false; +#endif + } + return running_; + } + + void thread() { +#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) +#if __GLIBC_PREREQ(2, 12) + // Set thread name. + auto thread_handle = thread_.native_handle(); + int ret __attribute__((__unused__)) = + pthread_setname_np(thread_handle, thread_name_.c_str()); + assert(ret == 0); +#endif +#endif + + assert(delay_us_ > 0); + if (!wait(initial_delay_us_)) { + return; + } + do { + function_(); +#ifndef NDEBUG + { + MutexLock l(&mutex_); + run_count_++; + cond_var_.SignalAll(); + } +#endif + } while (wait(delay_us_)); + } + + const std::function function_; + const std::string thread_name_; + Env* const env_; + const uint64_t delay_us_; + const uint64_t initial_delay_us_; + + // Mutex lock should be held when accessing running_, waiting_ + // and run_count_. + port::Mutex mutex_; + port::CondVar cond_var_; + bool running_; +#ifndef NDEBUG + // RepeatableThread waiting for timeout. + bool waiting_; + // Times function_ had run. + uint64_t run_count_; +#endif + port::Thread thread_; +}; + +} // namespace rocksdb diff --git a/util/repeatable_thread_test.cc b/util/repeatable_thread_test.cc new file mode 100644 index 000000000..dec437da3 --- /dev/null +++ b/util/repeatable_thread_test.cc @@ -0,0 +1,76 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include +#include + +#include "db/db_test_util.h" +#include "util/repeatable_thread.h" +#include "util/testharness.h" + +class RepeatableThreadTest : public testing::Test { + public: + RepeatableThreadTest() + : mock_env_(new rocksdb::MockTimeEnv(rocksdb::Env::Default())) {} + + protected: + std::unique_ptr mock_env_; +}; + +TEST_F(RepeatableThreadTest, TimedTest) { + constexpr uint64_t kSecond = 1000000; // 1s = 1000000us + constexpr int kIteration = 3; + rocksdb::Env* env = rocksdb::Env::Default(); + rocksdb::port::Mutex mutex; + rocksdb::port::CondVar test_cv(&mutex); + int count = 0; + uint64_t prev_time = env->NowMicros(); + rocksdb::RepeatableThread thread( + [&] { + rocksdb::MutexLock l(&mutex); + count++; + uint64_t now = env->NowMicros(); + assert(count == 1 || prev_time + 1 * kSecond <= now); + prev_time = now; + if (count >= kIteration) { + test_cv.SignalAll(); + } + }, + "rt_test", env, 1 * kSecond); + // Wait for execution finish. + { + rocksdb::MutexLock l(&mutex); + while (count < kIteration) { + test_cv.Wait(); + } + } + + // Test cancel + thread.cancel(); +} + +TEST_F(RepeatableThreadTest, MockEnvTest) { + constexpr uint64_t kSecond = 1000000; // 1s = 1000000us + constexpr int kIteration = 3; + mock_env_->set_current_time(0); // in seconds + std::atomic count{0}; + rocksdb::RepeatableThread thread([&] { count++; }, "rt_test", mock_env_.get(), + 1 * kSecond, 1 * kSecond); + for (int i = 1; i <= kIteration; i++) { + // Bump current time + thread.TEST_WaitForRun([&] { mock_env_->set_current_time(i); }); + } + // Test function should be exectued exactly kIteraion times. + ASSERT_EQ(kIteration, count.load()); + + // Test cancel + thread.cancel(); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +}