diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index a3c8bda41..e052fc7e1 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -66,51 +66,6 @@ class OnFileDeletionListener : public EventListener { std::string expected_file_name_; }; -class SleepingBackgroundTask { - public: - SleepingBackgroundTask() - : bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {} - void DoSleep() { - MutexLock l(&mutex_); - while (should_sleep_) { - bg_cv_.Wait(); - } - done_with_sleep_ = true; - bg_cv_.SignalAll(); - } - void WakeUp() { - MutexLock l(&mutex_); - should_sleep_ = false; - bg_cv_.SignalAll(); - } - void WaitUntilDone() { - MutexLock l(&mutex_); - while (!done_with_sleep_) { - bg_cv_.Wait(); - } - } - bool WokenUp() { - MutexLock l(&mutex_); - return should_sleep_ == false; - } - - void Reset() { - MutexLock l(&mutex_); - should_sleep_ = true; - done_with_sleep_ = false; - } - - static void DoSleepTask(void* arg) { - reinterpret_cast(arg)->DoSleep(); - } - - private: - port::Mutex mutex_; - port::CondVar bg_cv_; // Signalled when background work finishes - bool should_sleep_; - bool done_with_sleep_; -}; - static const int kCDTValueSize = 1000; static const int kCDTKeysPerBuffer = 4; static const int kCDTNumLevels = 8; @@ -1493,8 +1448,8 @@ TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) { env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::LOW); // stop the compaction thread until we simulate the file creation failure. - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); options.env = env_; @@ -1586,8 +1541,8 @@ TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) { ASSERT_EQ("0,1", FilesPerLevel(0)); // block compactions - SleepingBackgroundTask sleeping_task; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task, + test::SleepingBackgroundTask sleeping_task; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, Env::Priority::LOW); options.max_bytes_for_level_base = 1024 * 1024; // 1 MB diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index 7ce18c8e7..84a6e9a52 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -779,44 +779,12 @@ TEST_P(FaultInjectionTest, FaultTest) { } while (ChangeOptions()); } -class SleepingBackgroundTask { - public: - SleepingBackgroundTask() - : bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {} - void DoSleep() { - MutexLock l(&mutex_); - while (should_sleep_) { - bg_cv_.Wait(); - } - done_with_sleep_ = true; - bg_cv_.SignalAll(); - } - void WakeUp() { - MutexLock l(&mutex_); - should_sleep_ = false; - bg_cv_.SignalAll(); - while (!done_with_sleep_) { - bg_cv_.Wait(); - } - } - - static void DoSleepTask(void* arg) { - reinterpret_cast(arg)->DoSleep(); - } - - private: - port::Mutex mutex_; - port::CondVar bg_cv_; // Signalled when background work finishes - bool should_sleep_; - bool done_with_sleep_; -}; - // Previous log file is not fsynced if sync is forced after log rolling. TEST_P(FaultInjectionTest, WriteOptionSyncTest) { - SleepingBackgroundTask sleeping_task_low; + test::SleepingBackgroundTask sleeping_task_low; env_->SetBackgroundThreads(1, Env::HIGH); // Block the job queue to prevent flush job from running. - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::HIGH); WriteOptions write_options; @@ -896,10 +864,10 @@ TEST_P(FaultInjectionTest, UninstalledCompaction) { } TEST_P(FaultInjectionTest, ManualLogSyncTest) { - SleepingBackgroundTask sleeping_task_low; + test::SleepingBackgroundTask sleeping_task_low; env_->SetBackgroundThreads(1, Env::HIGH); // Block the job queue to prevent flush job from running. - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::HIGH); WriteOptions write_options; diff --git a/util/env_test.cc b/util/env_test.cc index 6fe8ed80b..7f5e4b93b 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -36,6 +36,7 @@ #include "util/mutexlock.h" #include "util/string_util.h" #include "util/testharness.h" +#include "util/testutil.h" namespace rocksdb { @@ -56,46 +57,6 @@ static void SetBool(void* ptr) { ->store(true, std::memory_order_relaxed); } -class SleepingBackgroundTask { - public: - explicit SleepingBackgroundTask() - : bg_cv_(&mutex_), should_sleep_(true), sleeping_(false) {} - void DoSleep() { - MutexLock l(&mutex_); - sleeping_ = true; - while (should_sleep_) { - bg_cv_.Wait(); - } - sleeping_ = false; - bg_cv_.SignalAll(); - } - - void WakeUp() { - MutexLock l(&mutex_); - should_sleep_ = false; - bg_cv_.SignalAll(); - - while (sleeping_) { - bg_cv_.Wait(); - } - } - - bool IsSleeping() { - MutexLock l(&mutex_); - return sleeping_; - } - - static void DoSleepTask(void* arg) { - reinterpret_cast(arg)->DoSleep(); - } - - private: - port::Mutex mutex_; - port::CondVar bg_cv_; // Signalled when background work finishes - bool should_sleep_; - bool sleeping_; -}; - TEST_F(EnvPosixTest, RunImmediately) { std::atomic called(false); env_->Schedule(&SetBool, &called); @@ -108,12 +69,12 @@ TEST_F(EnvPosixTest, UnSchedule) { env_->SetBackgroundThreads(1, Env::LOW); /* Block the low priority queue */ - SleepingBackgroundTask sleeping_task, sleeping_task1; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task, + test::SleepingBackgroundTask sleeping_task, sleeping_task1; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, Env::Priority::LOW); /* Schedule another task */ - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task1, + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task1, Env::Priority::LOW, &sleeping_task1); /* Remove it with a different tag */ @@ -321,7 +282,7 @@ TEST_F(EnvPosixTest, TwoPools) { } TEST_F(EnvPosixTest, DecreaseNumBgThreads) { - std::vector tasks(10); + std::vector tasks(10); // Set number of thread to 1 first. env_->SetBackgroundThreads(1, Env::Priority::HIGH); @@ -329,7 +290,7 @@ TEST_F(EnvPosixTest, DecreaseNumBgThreads) { // Schedule 3 tasks. 0 running; Task 1, 2 waiting. for (size_t i = 0; i < 3; i++) { - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i], + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i], Env::Priority::HIGH); Env::Default()->SleepForMicroseconds(kDelayMicros); } @@ -393,7 +354,7 @@ TEST_F(EnvPosixTest, DecreaseNumBgThreads) { // Enqueue 5 more tasks. Thread pool size now is 4. // Task 0, 3, 4, 5 running;6, 7 waiting. for (size_t i = 3; i < 8; i++) { - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i], + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i], Env::Priority::HIGH); } Env::Default()->SleepForMicroseconds(kDelayMicros); @@ -435,9 +396,9 @@ TEST_F(EnvPosixTest, DecreaseNumBgThreads) { ASSERT_TRUE(!tasks[7].IsSleeping()); // Enqueue thread 8 and 9. Task 5 running; one of 8, 9 might be running. - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[8], + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[8], Env::Priority::HIGH); - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[9], + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[9], Env::Priority::HIGH); Env::Default()->SleepForMicroseconds(kDelayMicros); ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), (unsigned int)0); diff --git a/util/testutil.h b/util/testutil.h index 45c822e82..1729b3ee0 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -284,12 +284,22 @@ extern std::string KeyStr(const std::string& user_key, class SleepingBackgroundTask { public: SleepingBackgroundTask() - : bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {} + : bg_cv_(&mutex_), + should_sleep_(true), + done_with_sleep_(false), + sleeping_(false) {} + + bool IsSleeping() { + MutexLock l(&mutex_); + return sleeping_; + } void DoSleep() { MutexLock l(&mutex_); + sleeping_ = true; while (should_sleep_) { bg_cv_.Wait(); } + sleeping_ = false; done_with_sleep_ = true; bg_cv_.SignalAll(); } @@ -324,6 +334,7 @@ class SleepingBackgroundTask { port::CondVar bg_cv_; // Signalled when background work finishes bool should_sleep_; bool done_with_sleep_; + bool sleeping_; }; } // namespace test