Fix flaky test DecreaseNumBgThreads (#6393)

Summary:
The DecreaseNumBgThreads test keeps failing on Windows in AppVeyor.
It fails because it depends on a timed wait for the tasks to be dequeued from the threadpool's internal queue, but within the specified time, the task might have not been scheduled onto the newly created threads.
https://github.com/facebook/rocksdb/pull/6232 tries to fix this by waiting for longer time to let the threads scheduled.
This PR tries to fix this by replacing the timed wait with a synchronization on the task's internal conditional variable.
When the number of threads increases, instead of guessing the time needed for the task to be scheduled, it directly blocks on the conditional variable until the task starts running.
But when thread number is reduced, it still does a timed wait, but this does not lead to the flakiness now, will try to remove these timed waits in a future PR.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6393

Test Plan: Wait to see whether AppVeyor tests pass.

Differential Revision: D19890928

Pulled By: cheng-chang

fbshipit-source-id: 4e56e4addf625c98c0876e62d9d57a6f0a156f76
main
Cheng Chang 5 years ago committed by Facebook Github Bot
parent acfee40af5
commit 46516778dd
  1. 41
      env/env_test.cc
  2. 26
      test_util/testutil.h

41
env/env_test.cc vendored

@ -567,18 +567,19 @@ TEST_P(EnvPosixTestWithParam, TwoPools) {
} }
TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) { TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) {
constexpr int kWaitMicros = 60000000; // 1min
std::vector<test::SleepingBackgroundTask> tasks(10); std::vector<test::SleepingBackgroundTask> tasks(10);
// Set number of thread to 1 first. // Set number of thread to 1 first.
env_->SetBackgroundThreads(1, Env::Priority::HIGH); env_->SetBackgroundThreads(1, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
// Schedule 3 tasks. 0 running; Task 1, 2 waiting. // Schedule 3 tasks. 0 running; Task 1, 2 waiting.
for (size_t i = 0; i < 3; i++) { for (size_t i = 0; i < 3; i++) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i], env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i],
Env::Priority::HIGH); Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
} }
ASSERT_FALSE(tasks[0].TimedWaitUntilSleeping(kWaitMicros));
ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping()); ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(!tasks[1].IsSleeping()); ASSERT_TRUE(!tasks[1].IsSleeping());
@ -586,7 +587,7 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) {
// Increase to 2 threads. Task 0, 1 running; 2 waiting // Increase to 2 threads. Task 0, 1 running; 2 waiting
env_->SetBackgroundThreads(2, Env::Priority::HIGH); env_->SetBackgroundThreads(2, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros); ASSERT_FALSE(tasks[1].TimedWaitUntilSleeping(kWaitMicros));
ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping()); ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(tasks[1].IsSleeping()); ASSERT_TRUE(tasks[1].IsSleeping());
@ -602,7 +603,7 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) {
// The last task finishes. Task 0 running, 2 waiting. // The last task finishes. Task 0 running, 2 waiting.
tasks[1].WakeUp(); tasks[1].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros); ASSERT_FALSE(tasks[1].TimedWaitUntilDone(kWaitMicros));
ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping()); ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(!tasks[1].IsSleeping()); ASSERT_TRUE(!tasks[1].IsSleeping());
@ -610,16 +611,17 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) {
// Increase to 5 threads. Task 0 and 2 running. // Increase to 5 threads. Task 0 and 2 running.
env_->SetBackgroundThreads(5, Env::Priority::HIGH); env_->SetBackgroundThreads(5, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros); ASSERT_FALSE(tasks[2].TimedWaitUntilSleeping(kWaitMicros));
ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping()); ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(!tasks[1].IsSleeping());
ASSERT_TRUE(tasks[2].IsSleeping()); ASSERT_TRUE(tasks[2].IsSleeping());
// Change number of threads a couple of times while there is no sufficient // Change number of threads a couple of times while there is no sufficient
// tasks. // tasks.
env_->SetBackgroundThreads(7, Env::Priority::HIGH); env_->SetBackgroundThreads(7, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
tasks[2].WakeUp(); tasks[2].WakeUp();
ASSERT_FALSE(tasks[2].TimedWaitUntilDone(kWaitMicros));
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
env_->SetBackgroundThreads(3, Env::Priority::HIGH); env_->SetBackgroundThreads(3, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros); Env::Default()->SleepForMicroseconds(kDelayMicros);
@ -642,8 +644,13 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i], env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i],
Env::Priority::HIGH); Env::Priority::HIGH);
} }
Env::Default()->SleepForMicroseconds(kDelayMicros); for (size_t i = 3; i <= 5; i++) {
ASSERT_FALSE(tasks[i].TimedWaitUntilSleeping(kWaitMicros));
}
ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(!tasks[1].IsSleeping());
ASSERT_TRUE(!tasks[2].IsSleeping());
ASSERT_TRUE(tasks[3].IsSleeping()); ASSERT_TRUE(tasks[3].IsSleeping());
ASSERT_TRUE(tasks[4].IsSleeping()); ASSERT_TRUE(tasks[4].IsSleeping());
ASSERT_TRUE(tasks[5].IsSleeping()); ASSERT_TRUE(tasks[5].IsSleeping());
@ -655,8 +662,10 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) {
tasks[3].WakeUp(); tasks[3].WakeUp();
tasks[4].WakeUp(); tasks[4].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros); for (size_t i = 5; i < 8; i++) {
ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_FALSE(tasks[i].TimedWaitUntilSleeping(kWaitMicros));
}
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
for (size_t i = 5; i < 8; i++) { for (size_t i = 5; i < 8; i++) {
ASSERT_TRUE(tasks[i].IsSleeping()); ASSERT_TRUE(tasks[i].IsSleeping());
} }
@ -670,14 +679,14 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) {
// Wake up task 6. Task 5, 7 running // Wake up task 6. Task 5, 7 running
tasks[6].WakeUp(); tasks[6].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros); ASSERT_FALSE(tasks[6].TimedWaitUntilDone(kWaitMicros));
ASSERT_TRUE(tasks[5].IsSleeping()); ASSERT_TRUE(tasks[5].IsSleeping());
ASSERT_TRUE(!tasks[6].IsSleeping()); ASSERT_TRUE(!tasks[6].IsSleeping());
ASSERT_TRUE(tasks[7].IsSleeping()); ASSERT_TRUE(tasks[7].IsSleeping());
// Wake up threads 7. Task 5 running // Wake up threads 7. Task 5 running
tasks[7].WakeUp(); tasks[7].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros); ASSERT_FALSE(tasks[7].TimedWaitUntilDone(kWaitMicros));
ASSERT_TRUE(!tasks[7].IsSleeping()); ASSERT_TRUE(!tasks[7].IsSleeping());
// Enqueue thread 8 and 9. Task 5 running; one of 8, 9 might be running. // Enqueue thread 8 and 9. Task 5 running; one of 8, 9 might be running.
@ -701,20 +710,18 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) {
// Wake up thread 9. // Wake up thread 9.
tasks[9].WakeUp(); tasks[9].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros); ASSERT_FALSE(tasks[9].TimedWaitUntilDone(kWaitMicros));
ASSERT_TRUE(!tasks[9].IsSleeping()); ASSERT_TRUE(!tasks[9].IsSleeping());
ASSERT_TRUE(tasks[8].IsSleeping()); ASSERT_TRUE(tasks[8].IsSleeping());
// Wake up thread 8 // Wake up thread 8
tasks[8].WakeUp(); tasks[8].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros); ASSERT_FALSE(tasks[8].TimedWaitUntilDone(kWaitMicros));
ASSERT_TRUE(!tasks[8].IsSleeping()); ASSERT_TRUE(!tasks[8].IsSleeping());
// Wake up the last thread // Wake up the last thread
tasks[5].WakeUp(); tasks[5].WakeUp();
ASSERT_FALSE(tasks[5].TimedWaitUntilDone(kWaitMicros));
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(!tasks[5].IsSleeping());
WaitThreadPoolsEmpty(); WaitThreadPoolsEmpty();
} }

@ -429,6 +429,20 @@ class SleepingBackgroundTask {
bg_cv_.Wait(); bg_cv_.Wait();
} }
} }
// Waits for the status to change to sleeping,
// otherwise times out.
// wait_time is in microseconds.
// Returns true when times out, false otherwise.
bool TimedWaitUntilSleeping(uint64_t wait_time) {
auto abs_time = Env::Default()->NowMicros() + wait_time;
MutexLock l(&mutex_);
while (!sleeping_ || !should_sleep_) {
if (bg_cv_.TimedWait(abs_time)) {
return true;
}
}
return false;
}
void WakeUp() { void WakeUp() {
MutexLock l(&mutex_); MutexLock l(&mutex_);
should_sleep_ = false; should_sleep_ = false;
@ -440,6 +454,18 @@ class SleepingBackgroundTask {
bg_cv_.Wait(); bg_cv_.Wait();
} }
} }
// Similar to TimedWaitUntilSleeping.
// Waits until the task is done.
bool TimedWaitUntilDone(uint64_t wait_time) {
auto abs_time = Env::Default()->NowMicros() + wait_time;
MutexLock l(&mutex_);
while (!done_with_sleep_) {
if (bg_cv_.TimedWait(abs_time)) {
return true;
}
}
return false;
}
bool WokenUp() { bool WokenUp() {
MutexLock l(&mutex_); MutexLock l(&mutex_);
return should_sleep_ == false; return should_sleep_ == false;

Loading…
Cancel
Save