From 0355d14dd9c9e41d37bb3a462c5a1ddbc8962392 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Tue, 7 Apr 2020 11:53:00 -0700 Subject: [PATCH] Add a simple timer support to schedule work at fixed times/intervals (#6543) Summary: Adding a simple timer support to schedule work at a fixed time. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6543 Test Plan: TODO: clean up the unit tests, and make them better. Reviewed By: siying Differential Revision: D20465390 Pulled By: sagar0 fbshipit-source-id: cba143f70b6339863e1d0f8b8bf92e51c2b3d678 --- CMakeLists.txt | 1 + Makefile | 4 + TARGETS | 7 ++ src.mk | 1 + util/timer.h | 222 +++++++++++++++++++++++++++++++++++++++++++ util/timer_test.cc | 230 +++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 465 insertions(+) create mode 100644 util/timer.h create mode 100644 util/timer_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 3bdd6909f..ee310f8e8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1070,6 +1070,7 @@ if(WITH_TESTS) util/slice_test.cc util/slice_transform_test.cc util/timer_queue_test.cc + util/timer_test.cc util/thread_list_test.cc util/thread_local_test.cc util/work_queue_test.cc diff --git a/Makefile b/Makefile index fc2501998..b94b02b7a 100644 --- a/Makefile +++ b/Makefile @@ -606,6 +606,7 @@ TESTS = \ defer_test \ blob_file_addition_test \ blob_file_garbage_test \ + timer_test \ ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1) TESTS += folly_synchronization_distributed_mutex_test @@ -1748,6 +1749,9 @@ blob_file_addition_test: db/blob/blob_file_addition_test.o $(LIBOBJECTS) $(TESTH blob_file_garbage_test: db/blob/blob_file_garbage_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +timer_test: util/timer_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + #------------------------------------------------- # make install related stuff INSTALL_PATH ?= /usr/local diff --git a/TARGETS b/TARGETS index 8a0c7a9c8..34d8d6a3a 100644 --- a/TARGETS +++ b/TARGETS @@ -1456,6 +1456,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "timer_test", + "util/timer_test.cc", + "serial", + [], + [], + ], [ "trace_analyzer_test", "tools/trace_analyzer_test.cc", diff --git a/src.mk b/src.mk index cbd2d0e6d..ffcfe4293 100644 --- a/src.mk +++ b/src.mk @@ -448,6 +448,7 @@ MAIN_SOURCES = \ util/slice_test.cc \ util/slice_transform_test.cc \ util/timer_queue_test.cc \ + util/timer_test.cc \ util/thread_list_test.cc \ util/thread_local_test.cc \ util/work_queue_test.cc \ diff --git a/util/timer.h b/util/timer.h new file mode 100644 index 000000000..aee55816a --- /dev/null +++ b/util/timer.h @@ -0,0 +1,222 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "port/port.h" +#include "rocksdb/env.h" +#include "util/mutexlock.h" + +namespace ROCKSDB_NAMESPACE { + +// A Timer class to handle repeated work. +// +// A single timer instance can handle multiple functions via a single thread. +// It is better to leave long running work to a dedicated thread pool. +// +// Timer can be started by calling `Start()`, and ended by calling `Shutdown()`. +// Work (in terms of a `void function`) can be scheduled by calling `Add` with +// a unique function name and de-scheduled by calling `Cancel`. +// Many functions can be added. +// +// Impl Details: +// A heap is used to keep track of when the next timer goes off. +// A map from a function name to the function keeps track of all the functions. +class Timer { + public: + Timer(Env* env) + : env_(env), + mutex_(env), + cond_var_(&mutex_), + running_(false) { + } + + ~Timer() {} + + void Add(std::function fn, + const std::string& fn_name, + uint64_t start_after_us, + uint64_t repeat_every_us) { + std::unique_ptr fn_info(new FunctionInfo( + std::move(fn), + fn_name, + env_->NowMicros() + start_after_us, + repeat_every_us)); + + MutexLock l(&mutex_); + heap_.push(fn_info.get()); + map_.emplace(std::make_pair(fn_name, std::move(fn_info))); + } + + void Cancel(const std::string& fn_name) { + MutexLock l(&mutex_); + + auto it = map_.find(fn_name); + if (it != map_.end()) { + if (it->second) { + it->second->Cancel(); + } + } + } + + void CancelAll() { + MutexLock l(&mutex_); + CancelAllWithLock(); + } + + // Start the Timer + bool Start() { + MutexLock l(&mutex_); + if (running_) { + return false; + } + + thread_.reset(new port::Thread(&Timer::Run, this)); + running_ = true; + return true; + } + + // Shutdown the Timer + bool Shutdown() { + { + MutexLock l(&mutex_); + if (!running_) { + return false; + } + CancelAllWithLock(); + running_ = false; + cond_var_.SignalAll(); + } + + if (thread_) { + thread_->join(); + } + return true; + } + + private: + + void Run() { + MutexLock l(&mutex_); + + while (running_) { + if (heap_.empty()) { + // wait + cond_var_.Wait(); + continue; + } + + FunctionInfo* current_fn = heap_.top(); + + if (!current_fn->IsValid()) { + heap_.pop(); + map_.erase(current_fn->name); + continue; + } + + if (current_fn->next_run_time_us <= env_->NowMicros()) { + // Execute the work + current_fn->fn(); + + // Remove the work from the heap once it is done executing. + // Note that we are just removing the pointer from the heap. Its + // memory is still managed in the map (as it holds a unique ptr). + // So current_fn is still a valid ptr. + heap_.pop(); + + if (current_fn->repeat_every_us > 0) { + current_fn->next_run_time_us = env_->NowMicros() + + current_fn->repeat_every_us; + + // Schedule new work into the heap with new time. + heap_.push(current_fn); + } + } else { + cond_var_.TimedWait(current_fn->next_run_time_us); + } + } + } + + void CancelAllWithLock() { + if (map_.empty() && heap_.empty()) { + return; + } + + while (!heap_.empty()) { + heap_.pop(); + } + + map_.clear(); + } + + // A wrapper around std::function to keep track when it should run next + // and at what frequency. + struct FunctionInfo { + // the actual work + std::function fn; + // name of the function + std::string name; + // when the function should run next + uint64_t next_run_time_us; + // repeat interval + uint64_t repeat_every_us; + // controls whether this function is valid. + // A function is valid upon construction and until someone explicitly + // calls `Cancel()`. + bool valid; + + FunctionInfo(std::function&& _fn, + const std::string& _name, + const uint64_t _next_run_time_us, + uint64_t _repeat_every_us) + : fn(std::move(_fn)), + name(_name), + next_run_time_us(_next_run_time_us), + repeat_every_us(_repeat_every_us), + valid(true) {} + + void Cancel() { + valid = false; + } + + bool IsValid() { + return valid; + } + }; + + struct RunTimeOrder { + bool operator()(const FunctionInfo* f1, + const FunctionInfo* f2) { + return f1->next_run_time_us > f2->next_run_time_us; + } + }; + + Env* const env_; + // This mutex controls both the heap_ and the map_. It needs to be held for + // making any changes in them. + port::Mutex mutex_; + port::CondVar cond_var_; + std::unique_ptr thread_; + bool running_; + + + std::priority_queue, + RunTimeOrder> heap_; + + // In addition to providing a mapping from a function name to a function, + // it is also responsible for memory management. + std::unordered_map> map_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/util/timer_test.cc b/util/timer_test.cc new file mode 100644 index 000000000..0c5a04710 --- /dev/null +++ b/util/timer_test.cc @@ -0,0 +1,230 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "util/timer.h" + +#include "db/db_test_util.h" + +namespace ROCKSDB_NAMESPACE { + +class TimerTest : public testing::Test { + public: + TimerTest() : mock_env_(new MockTimeEnv(Env::Default())) {} + + protected: + std::unique_ptr mock_env_; +}; + +TEST_F(TimerTest, SingleScheduleOnceTest) { + const uint64_t kSecond = 1000000; // 1sec = 1000000us + const int kIterations = 1; + uint64_t time_counter = 0; + mock_env_->set_current_time(0); + port::Mutex mutex; + port::CondVar test_cv(&mutex); + + Timer timer(mock_env_.get()); + int count = 0; + timer.Add( + [&] { + MutexLock l(&mutex); + count++; + if (count >= kIterations) { + test_cv.SignalAll(); + } + }, + "fn_sch_test", 1 * kSecond, 0); + + ASSERT_TRUE(timer.Start()); + + // Wait for execution to finish + { + MutexLock l(&mutex); + while(count < kIterations) { + time_counter += kSecond; + mock_env_->set_current_time(time_counter); + test_cv.TimedWait(time_counter); + } + } + + ASSERT_TRUE(timer.Shutdown()); + + ASSERT_EQ(1, count); +} + +TEST_F(TimerTest, MultipleScheduleOnceTest) { + const uint64_t kSecond = 1000000; // 1sec = 1000000us + const int kIterations = 1; + uint64_t time_counter = 0; + mock_env_->set_current_time(0); + port::Mutex mutex1; + port::CondVar test_cv1(&mutex1); + + Timer timer(mock_env_.get()); + int count1 = 0; + timer.Add( + [&] { + MutexLock l(&mutex1); + count1++; + if (count1 >= kIterations) { + test_cv1.SignalAll(); + } + }, + "fn_sch_test1", 1 * kSecond, 0); + + port::Mutex mutex2; + port::CondVar test_cv2(&mutex2); + int count2 = 0; + timer.Add( + [&] { + MutexLock l(&mutex2); + count2 += 5; + if (count2 >= kIterations) { + test_cv2.SignalAll(); + } + }, + "fn_sch_test2", 3 * kSecond, 0); + + ASSERT_TRUE(timer.Start()); + + // Wait for execution to finish + { + MutexLock l(&mutex1); + while (count1 < kIterations) { + time_counter += kSecond; + mock_env_->set_current_time(time_counter); + test_cv1.TimedWait(time_counter); + } + } + + // Wait for execution to finish + { + MutexLock l(&mutex2); + while(count2 < kIterations) { + time_counter += kSecond; + mock_env_->set_current_time(time_counter); + test_cv2.TimedWait(time_counter); + } + } + + ASSERT_TRUE(timer.Shutdown()); + + ASSERT_EQ(1, count1); + ASSERT_EQ(5, count2); +} + +TEST_F(TimerTest, SingleScheduleRepeatedlyTest) { + const uint64_t kSecond = 1000000; // 1sec = 1000000us + const int kIterations = 5; + uint64_t time_counter = 0; + mock_env_->set_current_time(0); + port::Mutex mutex; + port::CondVar test_cv(&mutex); + + Timer timer(mock_env_.get()); + int count = 0; + timer.Add( + [&] { + MutexLock l(&mutex); + count++; + fprintf(stderr, "%d\n", count); + if (count >= kIterations) { + test_cv.SignalAll(); + } + }, + "fn_sch_test", 1 * kSecond, 1 * kSecond); + + ASSERT_TRUE(timer.Start()); + + // Wait for execution to finish + { + MutexLock l(&mutex); + while(count < kIterations) { + time_counter += kSecond; + mock_env_->set_current_time(time_counter); + test_cv.TimedWait(time_counter); + } + } + + ASSERT_TRUE(timer.Shutdown()); + + ASSERT_EQ(5, count); +} + +TEST_F(TimerTest, MultipleScheduleRepeatedlyTest) { + const uint64_t kSecond = 1000000; // 1sec = 1000000us + uint64_t time_counter = 0; + mock_env_->set_current_time(0); + Timer timer(mock_env_.get()); + + port::Mutex mutex1; + port::CondVar test_cv1(&mutex1); + const int kIterations1 = 5; + int count1 = 0; + timer.Add( + [&] { + MutexLock l(&mutex1); + count1++; + fprintf(stderr, "hello\n"); + if (count1 >= kIterations1) { + test_cv1.SignalAll(); + } + }, + "fn_sch_test1", 0, 2 * kSecond); + + port::Mutex mutex2; + port::CondVar test_cv2(&mutex2); + const int kIterations2 = 5; + int count2 = 0; + timer.Add( + [&] { + MutexLock l(&mutex2); + count2++; + fprintf(stderr, "world\n"); + if (count2 >= kIterations2) { + test_cv2.SignalAll(); + } + }, + "fn_sch_test2", 1 * kSecond, 2 * kSecond); + + ASSERT_TRUE(timer.Start()); + + // Wait for execution to finish + { + MutexLock l(&mutex1); + while(count1 < kIterations1) { + time_counter += kSecond; + mock_env_->set_current_time(time_counter); + test_cv1.TimedWait(time_counter); + } + } + + timer.Cancel("fn_sch_test1"); + + // Wait for execution to finish + { + MutexLock l(&mutex2); + while(count2 < kIterations2) { + time_counter += kSecond; + mock_env_->set_current_time(time_counter); + test_cv2.TimedWait(time_counter); + } + } + + timer.Cancel("fn_sch_test2"); + + ASSERT_TRUE(timer.Shutdown()); + + ASSERT_EQ(count1, 5); + ASSERT_EQ(count2, 5); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +}