diff --git a/table/block_test.cc b/table/block_test.cc index 712a430de..b720b8dad 100644 --- a/table/block_test.cc +++ b/table/block_test.cc @@ -232,7 +232,8 @@ class BlockReadAmpBitmapSlowAndAccurate { // Return true if any byte in this range was Marked bool IsAnyInRangeMarked(size_t start_offset, size_t end_offset) { - auto it = marked_ranges_.lower_bound(std::make_pair(start_offset, static_cast(0))); + auto it = marked_ranges_.lower_bound( + std::make_pair(start_offset, static_cast(0))); if (it == marked_ranges_.end()) { return false; } @@ -307,7 +308,8 @@ TEST_F(BlockTest, BlockReadAmpBitmap) { for (size_t i = 0; i < random_entries.size(); i++) { auto ¤t_entry = random_entries[rnd.Next() % random_entries.size()]; - read_amp_bitmap.Mark(static_cast(current_entry.first), static_cast(current_entry.second)); + read_amp_bitmap.Mark(static_cast(current_entry.first), + static_cast(current_entry.second)); read_amp_slow_and_accurate.Mark(current_entry.first, current_entry.second); diff --git a/util/env_test.cc b/util/env_test.cc index 9fe884f6a..7746a9dfd 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -206,14 +206,23 @@ TEST_P(EnvPosixTestWithParam, StartThread) { } TEST_P(EnvPosixTestWithParam, TwoPools) { + // Data structures to signal tasks to run. + port::Mutex mutex; + port::CondVar cv(&mutex); + bool should_start = false; + class CB { public: - CB(const std::string& pool_name, int pool_size) + CB(const std::string& pool_name, int pool_size, port::Mutex* trigger_mu, + port::CondVar* trigger_cv, bool* _should_start) : mu_(), num_running_(0), num_finished_(0), pool_size_(pool_size), - pool_name_(pool_name) { } + pool_name_(pool_name), + trigger_mu_(trigger_mu), + trigger_cv_(trigger_cv), + should_start_(_should_start) {} static void Run(void* v) { CB* cb = reinterpret_cast(v); @@ -228,8 +237,12 @@ TEST_P(EnvPosixTestWithParam, TwoPools) { ASSERT_LE(num_running_, pool_size_.load()); } - // sleep for 1 sec - Env::Default()->SleepForMicroseconds(1000000); + { + MutexLock l(trigger_mu_); + while (!(*should_start_)) { + trigger_cv_->Wait(); + } + } { MutexLock l(&mu_); @@ -254,14 +267,17 @@ TEST_P(EnvPosixTestWithParam, TwoPools) { int num_finished_; std::atomic pool_size_; std::string pool_name_; + port::Mutex* trigger_mu_; + port::CondVar* trigger_cv_; + bool* should_start_; }; const int kLowPoolSize = 2; const int kHighPoolSize = 4; const int kJobs = 8; - CB low_pool_job("low", kLowPoolSize); - CB high_pool_job("high", kHighPoolSize); + CB low_pool_job("low", kLowPoolSize, &mutex, &cv, &should_start); + CB high_pool_job("high", kHighPoolSize, &mutex, &cv, &should_start); env_->SetBackgroundThreads(kLowPoolSize); env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH); @@ -275,7 +291,17 @@ TEST_P(EnvPosixTestWithParam, TwoPools) { env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH); } // Wait a short while for the jobs to be dispatched. - Env::Default()->SleepForMicroseconds(kDelayMicros); + int sleep_count = 0; + while ((unsigned int)(kJobs - kLowPoolSize) != + env_->GetThreadPoolQueueLen(Env::Priority::LOW) || + (unsigned int)(kJobs - kHighPoolSize) != + env_->GetThreadPoolQueueLen(Env::Priority::HIGH)) { + env_->SleepForMicroseconds(kDelayMicros); + if (++sleep_count > 100) { + break; + } + } + ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), env_->GetThreadPoolQueueLen()); ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), @@ -283,6 +309,13 @@ TEST_P(EnvPosixTestWithParam, TwoPools) { ASSERT_EQ((unsigned int)(kJobs - kHighPoolSize), env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + // Trigger jobs to run. + { + MutexLock l(&mutex); + should_start = true; + cv.SignalAll(); + } + // wait for all jobs to finish while (low_pool_job.NumFinished() < kJobs || high_pool_job.NumFinished() < kJobs) { @@ -292,6 +325,9 @@ TEST_P(EnvPosixTestWithParam, TwoPools) { ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + // Hold jobs to schedule; + should_start = false; + // call IncBackgroundThreadsIfNeeded to two pools. One increasing and // the other decreasing env_->IncBackgroundThreadsIfNeeded(kLowPoolSize - 1, Env::Priority::LOW); @@ -305,7 +341,16 @@ TEST_P(EnvPosixTestWithParam, TwoPools) { env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH); } // Wait a short while for the jobs to be dispatched. - Env::Default()->SleepForMicroseconds(kDelayMicros); + sleep_count = 0; + while ((unsigned int)(kJobs - kLowPoolSize) != + env_->GetThreadPoolQueueLen(Env::Priority::LOW) || + (unsigned int)(kJobs - (kHighPoolSize + 1)) != + env_->GetThreadPoolQueueLen(Env::Priority::HIGH)) { + env_->SleepForMicroseconds(kDelayMicros); + if (++sleep_count > 100) { + break; + } + } ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), env_->GetThreadPoolQueueLen()); ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), @@ -313,6 +358,13 @@ TEST_P(EnvPosixTestWithParam, TwoPools) { ASSERT_EQ((unsigned int)(kJobs - (kHighPoolSize + 1)), env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + // Trigger jobs to run. + { + MutexLock l(&mutex); + should_start = true; + cv.SignalAll(); + } + // wait for all jobs to finish while (low_pool_job.NumFinished() < kJobs || high_pool_job.NumFinished() < kJobs) {