ThreadPool to allow decrease number of threads and increase of number of threads is to be instantly scheduled

Summary:
Add a feature to decrease the number of threads in thread pool.
Also instantly schedule more threads if number of threads is increased.

Here is the way it is implemented: each background thread needs its thread ID. After decreasing number of threads, all threads are woken up. The thread with the largest thread ID will terminate. If there are more threads to terminate, the thread will wake up all threads again.

Another change is made so that when number of threads is increased, more threads are created and all previous excessive threads are woken up to do the work.

Test Plan: Add a unit test.

Reviewers: haobo, dhruba

Reviewed By: haobo

CC: yhchiang, igor, nkg-, leveldb

Differential Revision: https://reviews.facebook.net/D18675
main
sdong 11 years ago
parent 1e560459b9
commit 3df07d1703
  1. 96
      util/env_posix.cc
  2. 191
      util/env_test.cc

@ -1470,17 +1470,50 @@ class PosixEnv : public Env {
} }
} }
void BGThread() { // Return true if there is at least one thread needs to terminate.
bool HasExcessiveThread() {
return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
}
// Return true iff the current thread is the excessive thread to terminate.
// Always terminate the running thread that is added last, even if there are
// more than one thread to terminate.
bool IsLastExcessiveThread(size_t thread_id) {
return HasExcessiveThread() &&
thread_id == bgthreads_.size() - 1;
}
// Is one of the threads to terminate.
bool IsExcessiveThread(size_t thread_id) {
return static_cast<int>(thread_id) >= total_threads_limit_;
}
void BGThread(size_t thread_id) {
while (true) { while (true) {
// Wait until there is an item that is ready to run // Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_)); PthreadCall("lock", pthread_mutex_lock(&mu_));
while (queue_.empty() && !exit_all_threads_) { // Stop waiting if the thread needs to do work or needs to terminate.
while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
(queue_.empty() || IsExcessiveThread(thread_id))) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
} }
if (exit_all_threads_) { // mechanism to let BG threads exit safely if (exit_all_threads_) { // mechanism to let BG threads exit safely
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); PthreadCall("unlock", pthread_mutex_unlock(&mu_));
break; break;
} }
if (IsLastExcessiveThread(thread_id)) {
// Current thread is the last generated one and is excessive.
// We always terminate excessive thread in the reverse order of
// generation time.
pthread_detach(bgthreads_.back());
bgthreads_.pop_back();
if (HasExcessiveThread()) {
// There is still at least more excessive thread to terminate.
WakeUpAllThreads();
}
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
break;
}
void (*function)(void*) = queue_.front().function; void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg; void* arg = queue_.front().arg;
queue_.pop_front(); queue_.pop_front();
@ -1491,36 +1524,50 @@ class PosixEnv : public Env {
} }
} }
// Helper struct for passing arguments when creating threads.
struct BGThreadMetadata {
ThreadPool* thread_pool_;
size_t thread_id_; // Thread count in the thread.
explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
: thread_pool_(thread_pool), thread_id_(thread_id) {}
};
static void* BGThreadWrapper(void* arg) { static void* BGThreadWrapper(void* arg) {
reinterpret_cast<ThreadPool*>(arg)->BGThread(); BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
size_t thread_id = meta->thread_id_;
ThreadPool* tp = meta->thread_pool_;
delete meta;
tp->BGThread(thread_id);
return nullptr; return nullptr;
} }
void WakeUpAllThreads() {
PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
}
void SetBackgroundThreads(int num) { void SetBackgroundThreads(int num) {
PthreadCall("lock", pthread_mutex_lock(&mu_)); PthreadCall("lock", pthread_mutex_lock(&mu_));
if (num > total_threads_limit_) { if (exit_all_threads_) {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
return;
}
if (num != total_threads_limit_) {
total_threads_limit_ = num; total_threads_limit_ = num;
WakeUpAllThreads();
StartBGThreads();
} }
assert(total_threads_limit_ > 0); assert(total_threads_limit_ > 0);
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); PthreadCall("unlock", pthread_mutex_unlock(&mu_));
} }
void Schedule(void (*function)(void*), void* arg) { void StartBGThreads() {
PthreadCall("lock", pthread_mutex_lock(&mu_));
if (exit_all_threads_) {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
return;
}
// Start background thread if necessary // Start background thread if necessary
while ((int)bgthreads_.size() < total_threads_limit_) { while ((int)bgthreads_.size() < total_threads_limit_) {
pthread_t t; pthread_t t;
PthreadCall( PthreadCall(
"create thread", "create thread",
pthread_create(&t, pthread_create(&t, nullptr, &ThreadPool::BGThreadWrapper,
nullptr, new BGThreadMetadata(this, bgthreads_.size())));
&ThreadPool::BGThreadWrapper,
this));
// Set the thread name to aid debugging // Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
@ -1534,6 +1581,17 @@ class PosixEnv : public Env {
bgthreads_.push_back(t); bgthreads_.push_back(t);
} }
}
void Schedule(void (*function)(void*), void* arg) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
if (exit_all_threads_) {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
return;
}
StartBGThreads();
// Add to priority queue // Add to priority queue
queue_.push_back(BGItem()); queue_.push_back(BGItem());
@ -1541,8 +1599,14 @@ class PosixEnv : public Env {
queue_.back().arg = arg; queue_.back().arg = arg;
queue_len_.store(queue_.size(), std::memory_order_relaxed); queue_len_.store(queue_.size(), std::memory_order_relaxed);
// always wake up at least one waiting thread. if (!HasExcessiveThread()) {
// Wake up at least one waiting thread.
PthreadCall("signal", pthread_cond_signal(&bgsignal_)); PthreadCall("signal", pthread_cond_signal(&bgsignal_));
} else {
// Need to wake up all threads to make sure the one woken
// up is not the one to terminate.
WakeUpAllThreads();
}
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); PthreadCall("unlock", pthread_mutex_unlock(&mu_));
} }

@ -200,6 +200,197 @@ TEST(EnvPosixTest, TwoPools) {
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
} }
TEST(EnvPosixTest, DecreaseNumBgThreads) {
class SleepingBackgroundTask {
public:
explicit SleepingBackgroundTask()
: bg_cv_(&mutex_), should_sleep_(true), sleeping_(false) {}
void DoSleep() {
MutexLock l(&mutex_);
sleeping_ = true;
while (should_sleep_) {
bg_cv_.Wait();
}
sleeping_ = false;
bg_cv_.SignalAll();
}
void WakeUp() {
MutexLock l(&mutex_);
should_sleep_ = false;
bg_cv_.SignalAll();
while (sleeping_) {
bg_cv_.Wait();
}
}
bool IsSleeping() {
MutexLock l(&mutex_);
return sleeping_;
}
static void DoSleepTask(void* arg) {
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
}
private:
port::Mutex mutex_;
port::CondVar bg_cv_; // Signalled when background work finishes
bool should_sleep_;
bool sleeping_;
};
std::vector<SleepingBackgroundTask> tasks(10);
// Set number of thread to 1 first.
env_->SetBackgroundThreads(1, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
// Schedule 3 tasks. 0 running; Task 1, 2 waiting.
for (size_t i = 0; i < 3; i++) {
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i],
Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
}
ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(!tasks[1].IsSleeping());
ASSERT_TRUE(!tasks[2].IsSleeping());
// Increase to 2 threads. Task 0, 1 running; 2 waiting
env_->SetBackgroundThreads(2, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(tasks[1].IsSleeping());
ASSERT_TRUE(!tasks[2].IsSleeping());
// Shrink back to 1 thread. Still task 0, 1 running, 2 waiting
env_->SetBackgroundThreads(1, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(tasks[1].IsSleeping());
ASSERT_TRUE(!tasks[2].IsSleeping());
// The last task finishes. Task 0 running, 2 waiting.
tasks[1].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(!tasks[1].IsSleeping());
ASSERT_TRUE(!tasks[2].IsSleeping());
// Increase to 5 threads. Task 0 and 2 running.
env_->SetBackgroundThreads(5, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(tasks[2].IsSleeping());
// Change number of threads a couple of times while there is no sufficient
// tasks.
env_->SetBackgroundThreads(7, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
tasks[2].WakeUp();
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
env_->SetBackgroundThreads(3, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
env_->SetBackgroundThreads(4, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
env_->SetBackgroundThreads(5, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
env_->SetBackgroundThreads(4, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
Env::Default()->SleepForMicroseconds(kDelayMicros * 50);
// Enqueue 5 more tasks. Thread pool size now is 4.
// Task 0, 3, 4, 5 running;6, 7 waiting.
for (size_t i = 3; i < 8; i++) {
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i],
Env::Priority::HIGH);
}
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[3].IsSleeping());
ASSERT_TRUE(tasks[4].IsSleeping());
ASSERT_TRUE(tasks[5].IsSleeping());
ASSERT_TRUE(!tasks[6].IsSleeping());
ASSERT_TRUE(!tasks[7].IsSleeping());
// Wake up task 0, 3 and 4. Task 5, 6, 7 running.
tasks[0].WakeUp();
tasks[3].WakeUp();
tasks[4].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
for (size_t i = 5; i < 8; i++) {
ASSERT_TRUE(tasks[i].IsSleeping());
}
// Shrink back to 1 thread. Still task 5, 6, 7 running
env_->SetBackgroundThreads(1, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(tasks[5].IsSleeping());
ASSERT_TRUE(tasks[6].IsSleeping());
ASSERT_TRUE(tasks[7].IsSleeping());
// Wake up task 6. Task 5, 7 running
tasks[6].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(tasks[5].IsSleeping());
ASSERT_TRUE(!tasks[6].IsSleeping());
ASSERT_TRUE(tasks[7].IsSleeping());
// Wake up threads 7. Task 5 running
tasks[7].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(!tasks[7].IsSleeping());
// Enqueue thread 8 and 9. Task 5 running; one of 8, 9 might be running.
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[8],
Env::Priority::HIGH);
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[9],
Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), 0);
ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping());
// Increase to 4 threads. Task 5, 8, 9 running.
env_->SetBackgroundThreads(4, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[8].IsSleeping());
ASSERT_TRUE(tasks[9].IsSleeping());
// Shrink to 1 thread
env_->SetBackgroundThreads(1, Env::Priority::HIGH);
// Wake up thread 9.
tasks[9].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(!tasks[9].IsSleeping());
ASSERT_TRUE(tasks[8].IsSleeping());
// Wake up thread 8
tasks[8].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(!tasks[8].IsSleeping());
// Wake up the last thread
tasks[5].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(!tasks[5].IsSleeping());
}
#ifdef OS_LINUX #ifdef OS_LINUX
// To make sure the Env::GetUniqueId() related tests work correctly, The files // To make sure the Env::GetUniqueId() related tests work correctly, The files
// should be stored in regular storage like "hard disk" or "flash device". // should be stored in regular storage like "hard disk" or "flash device".

Loading…
Cancel
Save