Support reservation in thread pool (#10278)

Summary:
Add `ReserveThreads` and `ReleaseThreads` functions to the thread pool to support thread reservation for a specific thread pool. With this feature, a thread is blocked if the number of waiting threads (denoted by `num_waiting_threads_`) is no greater than the number of reserved threads (denoted by `reserved_threads_`). Normally, `reserved_threads_` is upper bounded by `num_waiting_threads_`; in rare cases (e.g., when `SetBackgroundThreadsInternal` is called while some threads are already reserved), `num_waiting_threads_` can be less than `reserved_threads_`.
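For illustration, a minimal usage sketch of the new `Env`-level API (not part of this PR; the function name, pool size, and priority used here are arbitrary):

```cpp
#include "rocksdb/env.h"

// Minimal sketch (illustrative only): reserve workers in the HIGH-priority
// pool so queued jobs cannot occupy them, then hand them back later.
void ReservationSketch() {
  ROCKSDB_NAMESPACE::Env* env = ROCKSDB_NAMESPACE::Env::Default();
  env->SetBackgroundThreads(4, ROCKSDB_NAMESPACE::Env::Priority::HIGH);

  // ReserveThreads() may reserve fewer threads than requested; it returns
  // the number actually reserved (capped by the number of waiting threads).
  int reserved = env->ReserveThreads(2, ROCKSDB_NAMESPACE::Env::Priority::HIGH);

  // ... schedule work; at most (4 - reserved) threads will pick up jobs ...

  // ReleaseThreads() returns how many reserved threads were given back.
  env->ReleaseThreads(reserved, ROCKSDB_NAMESPACE::Env::Priority::HIGH);
}
```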

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10278

Test Plan: Add a `ReserveThreads` unit test in `env_test`. Update the `SimpleColumnFamilyInfoTest` unit test in `thread_list_test` by adding `ReserveThreads`-related assertions.

Reviewed By: hx235

Differential Revision: D37640946

Pulled By: littlepig2013

fbshipit-source-id: 4d691f6b9a433569f96ab52d52c3defe5b065367
Branch: main
Author: zczhu, committed by Facebook GitHub Bot
Parent: 28586be8ec
Commit: 96206531bc
Files changed:
  1. HISTORY.md (1)
  2. env/composite_env_wrapper.h (8)
  3. env/env_posix.cc (14)
  4. env/env_test.cc (131)
  5. include/rocksdb/env.h (20)
  6. include/rocksdb/threadpool.h (9)
  7. port/win/env_win.cc (17)
  8. port/win/env_win.h (8)
  9. util/thread_list_test.cc (19)
  10. util/threadpool_imp.cc (71)
  11. util/threadpool_imp.h (8)

@@ -17,6 +17,7 @@
* `rocksdb_file_metadata_t` and its and get functions & destroy functions.
* Add suggest_compact_range() and suggest_compact_range_cf() to C API.
* When using block cache strict capacity limit (`LRUCache` with `strict_capacity_limit=true`), DB operations now fail with Status code `kAborted` subcode `kMemoryLimit` (`IsMemoryLimit()`) instead of `kIncomplete` (`IsIncomplete()`) when the capacity limit is reached, because Incomplete can mean other specific things for some operations. In more detail, `Cache::Insert()` now returns the updated Status code and this usually propagates through RocksDB to the user on failure.
* Add two functions `int ReserveThreads(int threads_to_be_reserved)` and `int ReleaseThreads(int threads_to_be_released)` into `Env` class. In the default implementation, both return 0. A newly added `xxxEnv` class that inherits `Env` should implement these two functions for the thread reservation/release feature.
### Bug Fixes
* Fix a bug in which backup/checkpoint can include a WAL deleted by RocksDB.

@@ -322,6 +322,14 @@ class CompositeEnvWrapper : public CompositeEnv {
return target_.env->GetThreadPoolQueueLen(pri);
}
int ReserveThreads(int threads_to_be_reserved, Priority pri) override {
return target_.env->ReserveThreads(threads_to_be_reserved, pri);
}
int ReleaseThreads(int threads_to_be_released, Priority pri) override {
return target_.env->ReleaseThreads(threads_to_be_released, pri);
}
Status GetHostName(char* name, uint64_t len) override {
return target_.env->GetHostName(name, len);
}

env/env_posix.cc

@@ -302,6 +302,10 @@ class PosixEnv : public CompositeEnv {
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
int ReserveThreads(int threads_to_be_reserved, Priority pri) override;
int ReleaseThreads(int threads_to_be_released, Priority pri) override;
Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
assert(thread_status_updater_);
return thread_status_updater_->GetThreadList(thread_list);
@@ -437,6 +441,16 @@ unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
return thread_pools_[pri].GetQueueLen();
}
int PosixEnv::ReserveThreads(int threads_to_reserved, Priority pri) {
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
return thread_pools_[pri].ReserveThreads(threads_to_reserved);
}
int PosixEnv::ReleaseThreads(int threads_to_released, Priority pri) {
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
return thread_pools_[pri].ReleaseThreads(threads_to_released);
}
struct StartThreadState {
void (*user_function)(void*);
void* arg;

env/env_test.cc

@@ -839,6 +839,135 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) {
WaitThreadPoolsEmpty();
}
TEST_P(EnvPosixTestWithParam, ReserveThreads) {
// Initialize the background thread to 1 in case other threads exist
// from the last unit test
env_->SetBackgroundThreads(1, Env::Priority::HIGH);
ASSERT_EQ(env_->GetBackgroundThreads(Env::HIGH), 1);
constexpr int kWaitMicros = 10000000; // 10 seconds
std::vector<test::SleepingBackgroundTask> tasks(4);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Set the sync point to ensure thread 0 can terminate
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"ThreadPoolImpl::BGThread::Termination:th0",
"EnvTest::ReserveThreads:0"}});
// Empty the thread pool to ensure all the threads can start later
env_->SetBackgroundThreads(0, Env::Priority::HIGH);
TEST_SYNC_POINT("EnvTest::ReserveThreads:0");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
// Set the sync point to ensure threads start and pass the sync point
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"ThreadPoolImpl::BGThread::Start:th0", "EnvTest::ReserveThreads:1"},
{"ThreadPoolImpl::BGThread::Start:th1", "EnvTest::ReserveThreads:2"},
{"ThreadPoolImpl::BGThread::Start:th2", "EnvTest::ReserveThreads:3"},
{"ThreadPoolImpl::BGThread::Start:th3", "EnvTest::ReserveThreads:4"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Set number of thread to 3 first.
env_->SetBackgroundThreads(3, Env::Priority::HIGH);
ASSERT_EQ(env_->GetBackgroundThreads(Env::HIGH), 3);
// Add sync points to ensure all 3 threads start
TEST_SYNC_POINT("EnvTest::ReserveThreads:1");
TEST_SYNC_POINT("EnvTest::ReserveThreads:2");
TEST_SYNC_POINT("EnvTest::ReserveThreads:3");
// Reserve 2 threads
ASSERT_EQ(2, env_->ReserveThreads(2, Env::Priority::HIGH));
// Schedule 3 tasks. Task 0 running (in this context, doing
// SleepingBackgroundTask); Task 1, 2 waiting; 3 reserved threads.
for (size_t i = 0; i < 3; i++) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i],
Env::Priority::HIGH);
}
ASSERT_FALSE(tasks[0].TimedWaitUntilSleeping(kWaitMicros));
ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(!tasks[1].IsSleeping());
ASSERT_TRUE(!tasks[2].IsSleeping());
// Release 2 threads. Task 0, 1, 2 running; 0 reserved thread.
ASSERT_EQ(2, env_->ReleaseThreads(2, Env::Priority::HIGH));
ASSERT_FALSE(tasks[1].TimedWaitUntilSleeping(kWaitMicros));
ASSERT_FALSE(tasks[2].TimedWaitUntilSleeping(kWaitMicros));
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[1].IsSleeping());
ASSERT_TRUE(tasks[2].IsSleeping());
// No more threads can be reserved
ASSERT_EQ(0, env_->ReserveThreads(3, Env::Priority::HIGH));
// Expand the number of background threads so that the last thread
// is waiting
env_->SetBackgroundThreads(4, Env::Priority::HIGH);
// Add sync point to ensure the 4th thread starts
TEST_SYNC_POINT("EnvTest::ReserveThreads:4");
// As the thread pool is expanded, we can reserve one more thread
ASSERT_EQ(1, env_->ReserveThreads(3, Env::Priority::HIGH));
// No more threads can be reserved
ASSERT_EQ(0, env_->ReserveThreads(3, Env::Priority::HIGH));
// Reset the sync points for the next iteration in BGThread or the
// next time Submit() is called
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"ThreadPoolImpl::BGThread::WaitingThreadsInc",
"EnvTest::ReserveThreads:5"},
{"ThreadPoolImpl::BGThread::Termination", "EnvTest::ReserveThreads:6"},
{"ThreadPoolImpl::Submit::Enqueue", "EnvTest::ReserveThreads:7"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
tasks[0].WakeUp();
ASSERT_FALSE(tasks[0].TimedWaitUntilDone(kWaitMicros));
// Add sync point to ensure the number of waiting threads increases
TEST_SYNC_POINT("EnvTest::ReserveThreads:5");
// 1 more thread can be reserved
ASSERT_EQ(1, env_->ReserveThreads(3, Env::Priority::HIGH));
// 2 reserved threads now
// Currently, two threads are blocked since the number of waiting
// threads is equal to the number of reserved threads (i.e., 2).
// If we reduce the number of background thread to 1, at least one thread
// will be the last excessive thread (here we have no control over the
// number of excessive threads because thread order does not
necessarily follow the schedule order, but we ensure that the last thread
// shall not run any task by expanding the thread pool after we schedule
// the tasks), and thus they(it) become(s) unblocked, the number of waiting
// threads decreases to 0 or 1, but the number of reserved threads is still 2
env_->SetBackgroundThreads(1, Env::Priority::HIGH);
// Task 1,2 running; 2 reserved threads, however, in fact, we only have
// 0 or 1 waiting thread in the thread pool, proved by the
// following test, we CANNOT reserve 2 threads even though we just
// release 2
TEST_SYNC_POINT("EnvTest::ReserveThreads:6");
ASSERT_EQ(2, env_->ReleaseThreads(2, Env::Priority::HIGH));
ASSERT_GT(2, env_->ReserveThreads(2, Env::Priority::HIGH));
// Every new task will be put into the queue at this point
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[3],
Env::Priority::HIGH);
TEST_SYNC_POINT("EnvTest::ReserveThreads:7");
ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(!tasks[3].IsSleeping());
// Set the number of threads to 3 so that Task 3 can dequeue
env_->SetBackgroundThreads(3, Env::Priority::HIGH);
// Wake up Task 1
tasks[1].WakeUp();
ASSERT_FALSE(tasks[1].TimedWaitUntilDone(kWaitMicros));
// Task 2, 3 running (Task 3 dequeue); 0 or 1 reserved thread
ASSERT_FALSE(tasks[3].TimedWaitUntilSleeping(kWaitMicros));
ASSERT_TRUE(tasks[3].IsSleeping());
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
// At most 1 thread can be released
ASSERT_GT(2, env_->ReleaseThreads(3, Env::Priority::HIGH));
tasks[2].WakeUp();
ASSERT_FALSE(tasks[2].TimedWaitUntilDone(kWaitMicros));
tasks[3].WakeUp();
ASSERT_FALSE(tasks[3].TimedWaitUntilDone(kWaitMicros));
WaitThreadPoolsEmpty();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
#if (defined OS_LINUX || defined OS_WIN)
// Travis doesn't support fallocate or getting unique ID from files for whatever
// reason.
@@ -1271,8 +1400,8 @@ TEST_P(EnvPosixTestWithParam, MultiRead) {
}
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
std::unique_ptr<RandomAccessFile> file;
std::vector<ReadRequest> reqs(3);
std::vector<std::unique_ptr<char, Deleter>> data;

@@ -493,6 +493,17 @@ class Env : public Customizable {
// Wait for all threads started by StartThread to terminate.
virtual void WaitForJoin() {}
// Reserve available background threads in the specified thread pool.
virtual int ReserveThreads(int /*threads_to_be_reserved*/, Priority /*pri*/) {
return 0;
}
// Release a specific number of reserved threads from the specified thread
// pool
virtual int ReleaseThreads(int /*threads_to_be_released*/, Priority /*pri*/) {
return 0;
}
// Get thread pool queue length for specific thread pool.
virtual unsigned int GetThreadPoolQueueLen(Priority /*pri*/ = LOW) const {
return 0;
@@ -1533,6 +1544,15 @@ class EnvWrapper : public Env {
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
return target_.env->GetThreadPoolQueueLen(pri);
}
int ReserveThreads(int threads_to_be_reserved, Priority pri) override {
return target_.env->ReserveThreads(threads_to_be_reserved, pri);
}
int ReleaseThreads(int threads_to_be_released, Priority pri) override {
return target_.env->ReleaseThreads(threads_to_be_released, pri);
}
Status GetTestDirectory(std::string* path) override {
return target_.env->GetTestDirectory(path);
}

@@ -49,6 +49,15 @@ class ThreadPool {
virtual void SubmitJob(const std::function<void()>&) = 0;
// This moves the function in for efficiency
virtual void SubmitJob(std::function<void()>&&) = 0;
// Reserve available background threads. This function does not ensure
// so many threads can be reserved, instead it will return the number of
// threads that can be reserved against the desired one. In other words,
// the number of available threads could be less than the input.
virtual int ReserveThreads(int /*threads_to_be_reserved*/) { return 0; }
// Release a specific number of reserved threads
virtual int ReleaseThreads(int /*threads_to_be_released*/) { return 0; }
};
// NewThreadPool() is a function that could be used to create a ThreadPool

@@ -1322,6 +1322,16 @@ unsigned int WinEnvThreads::GetThreadPoolQueueLen(Env::Priority pri) const {
return thread_pools_[pri].GetQueueLen();
}
int WinEnvThreads::ReserveThreads(int threads_to_reserved, Env::Priority pri) {
assert(pri >= Env::Priority::BOTTOM && pri <= Env::Priority::HIGH);
return thread_pools_[pri].ReserveThreads(threads_to_reserved);
}
int WinEnvThreads::ReleaseThreads(int threads_to_released, Env::Priority pri) {
assert(pri >= Env::Priority::BOTTOM && pri <= Env::Priority::HIGH);
return thread_pools_[pri].ReleaseThreads(threads_to_released);
}
uint64_t WinEnvThreads::gettid() {
uint64_t thread_id = GetCurrentThreadId();
return thread_id;
@@ -1388,6 +1398,13 @@ void WinEnv::WaitForJoin() { return winenv_threads_.WaitForJoin(); }
unsigned int WinEnv::GetThreadPoolQueueLen(Env::Priority pri) const {
return winenv_threads_.GetThreadPoolQueueLen(pri);
}
int WinEnv::ReserveThreads(int threads_to_reserved, Env::Priority pri) {
return winenv_threads_.ReserveThreads(threads_to_reserved, pri);
}
int WinEnv::ReleaseThreads(int threads_to_released, Env::Priority pri) {
return winenv_threads_.ReleaseThreads(threads_to_released, pri);
}
uint64_t WinEnv::GetThreadID() const { return winenv_threads_.GetThreadID(); }

@@ -57,6 +57,10 @@ class WinEnvThreads {
unsigned int GetThreadPoolQueueLen(Env::Priority pri) const;
int ReserveThreads(int threads_to_be_reserved, Env::Priority pri);
int ReleaseThreads(int threads_to_be_released, Env::Priority pri);
static uint64_t gettid();
uint64_t GetThreadID() const;
@@ -279,6 +283,10 @@ class WinEnv : public CompositeEnv {
unsigned int GetThreadPoolQueueLen(Env::Priority pri) const override;
int ReserveThreads(int threads_to_be_reserved, Env::Priority pri) override;
int ReleaseThreads(int threads_to_be_released, Env::Priority pri) override;
uint64_t GetThreadID() const override;
// Allow increasing the number of worker threads.

@@ -126,9 +126,11 @@ TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) {
const int kLowPriorityThreads = 5;
const int kSimulatedHighPriThreads = kHighPriorityThreads - 1;
const int kSimulatedLowPriThreads = kLowPriorityThreads / 3;
const int kDelayMicros = 1000000;
env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH);
env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW);
// Wait 1 second so that threads start
Env::Default()->SleepForMicroseconds(kDelayMicros);
SimulatedBackgroundTask running_task(
reinterpret_cast<void*>(1234), "running",
reinterpret_cast<void*>(5678), "pikachu");
@@ -137,13 +139,20 @@ TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) {
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
&running_task, Env::Priority::HIGH);
}
for (int test = 0; test < kSimulatedLowPriThreads; ++test) {
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
&running_task, Env::Priority::LOW);
}
running_task.WaitUntilScheduled(kSimulatedHighPriThreads +
kSimulatedLowPriThreads);
// We can only reserve limited number of waiting threads
ASSERT_EQ(kHighPriorityThreads - kSimulatedHighPriThreads,
env->ReserveThreads(kHighPriorityThreads, Env::Priority::HIGH));
ASSERT_EQ(kLowPriorityThreads - kSimulatedLowPriThreads,
env->ReserveThreads(kLowPriorityThreads, Env::Priority::LOW));
// Reservation shall not affect the existing thread list
std::vector<ThreadStatus> thread_list;
// Verify the number of running threads in each pool.
@@ -155,6 +164,10 @@ TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) {
running_count[thread_status.thread_type]++;
}
}
// Cannot reserve more threads
ASSERT_EQ(0, env->ReserveThreads(kHighPriorityThreads, Env::Priority::HIGH));
ASSERT_EQ(0, env->ReserveThreads(kLowPriorityThreads, Env::Priority::LOW));
ASSERT_EQ(
running_count[ThreadStatus::HIGH_PRIORITY],
kSimulatedHighPriThreads);
@@ -167,6 +180,10 @@ TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) {
running_task.FinishAllTasks();
running_task.WaitUntilDone();
ASSERT_EQ(kHighPriorityThreads - kSimulatedHighPriThreads,
env->ReleaseThreads(kHighPriorityThreads, Env::Priority::HIGH));
ASSERT_EQ(kLowPriorityThreads - kSimulatedLowPriThreads,
env->ReleaseThreads(kLowPriorityThreads, Env::Priority::LOW));
// Verify none of the threads are running
ASSERT_OK(env->GetThreadList(&thread_list));

@@ -100,6 +100,30 @@ struct ThreadPoolImpl::Impl {
// Set the thread priority.
void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
int ReserveThreads(int threads_to_be_reserved) {
std::unique_lock<std::mutex> lock(mu_);
// We can reserve at most num_waiting_threads_ in total so the number of
// threads that can be reserved might be fewer than the desired one. In
// rare cases, num_waiting_threads_ could be less than reserved_threads
// due to SetBackgroundThreadInternal or last excessive threads. If that
// happens, we cannot reserve any other threads.
int reserved_threads_in_success =
std::min(std::max(num_waiting_threads_ - reserved_threads_, 0),
threads_to_be_reserved);
reserved_threads_ += reserved_threads_in_success;
return reserved_threads_in_success;
}
int ReleaseThreads(int threads_to_be_released) {
std::unique_lock<std::mutex> lock(mu_);
// We cannot release more than reserved_threads_
int released_threads_in_success =
std::min(reserved_threads_, threads_to_be_released);
reserved_threads_ -= released_threads_in_success;
WakeUpAllThreads();
return released_threads_in_success;
}
private:
static void BGThreadWrapper(void* arg);
@@ -110,6 +134,16 @@ private:
int total_threads_limit_;
std::atomic_uint queue_len_; // Queue length. Used for stats reporting
// Number of reserved threads, managed by ReserveThreads(..) and
// ReleaseThreads(..), if num_waiting_threads_ is no larger than
// reserved_threads_, its thread will be blocked to ensure the reservation
// mechanism
int reserved_threads_;
// Number of waiting threads (Maximum number of threads that can be
// reserved), in rare cases, num_waiting_threads_ could be less than
// reserved_threads due to SetBackgroundThreadInternal or last
// excessive threads.
int num_waiting_threads_;
bool exit_all_threads_;
bool wait_for_jobs_to_complete_;
@@ -135,6 +169,8 @@ inline ThreadPoolImpl::Impl::Impl()
env_(nullptr),
total_threads_limit_(0),
queue_len_(),
reserved_threads_(0),
num_waiting_threads_(0),
exit_all_threads_(false),
wait_for_jobs_to_complete_(false),
queue_(),
@@ -155,6 +191,8 @@ void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {
// prevent threads from being recreated right after they're joined, in case
// the user is concurrently submitting jobs.
total_threads_limit_ = 0;
reserved_threads_ = 0;
num_waiting_threads_ = 0;
lock.unlock();
@@ -189,10 +227,23 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
// Wait until there is an item that is ready to run
std::unique_lock<std::mutex> lock(mu_);
// Stop waiting if the thread needs to do work or needs to terminate.
// Increase num_waiting_threads_ once this task has started waiting
num_waiting_threads_++;
TEST_SYNC_POINT("ThreadPoolImpl::BGThread::WaitingThreadsInc");
TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Start:th", thread_id);
// When not exit_all_threads_ and the current thread id is not the last
// excessive thread, it may be blocked due to 3 reasons: 1) queue is empty,
// 2) it is the excessive thread (not the last one),
// 3) the number of waiting threads is not greater than reserved threads
// (i.e., no available threads due to full reservation)
while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
(queue_.empty() || IsExcessiveThread(thread_id) ||
num_waiting_threads_ <= reserved_threads_)) {
bgsignal_.wait(lock);
}
// Decrease num_waiting_threads_ once the thread is not waiting
num_waiting_threads_--;
if (exit_all_threads_) { // mechanism to let BG threads exit safely
@@ -209,11 +260,13 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
auto& terminating_thread = bgthreads_.back();
terminating_thread.detach();
bgthreads_.pop_back();
if (HasExcessiveThread()) {
// There is still at least more excessive thread to terminate.
WakeUpAllThreads();
}
TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Termination:th",
thread_id);
TEST_SYNC_POINT("ThreadPoolImpl::BGThread::Termination");
break;
}
@@ -333,7 +386,6 @@ int ThreadPoolImpl::Impl::GetBackgroundThreads() {
void ThreadPoolImpl::Impl::StartBGThreads() {
// Start background thread if necessary
while ((int)bgthreads_.size() < total_threads_limit_) {
port::Thread p_t(&BGThreadWrapper,
new BGThreadMetadata(this, bgthreads_.size()));
@@ -367,7 +419,7 @@ void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
// Add to priority queue
queue_.push_back(BGItem());
TEST_SYNC_POINT("ThreadPoolImpl::Submit::Enqueue");
auto& item = queue_.back();
item.tag = tag;
item.function = std::move(schedule);
@@ -498,6 +550,17 @@ void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
impl_->SetThreadPriority(priority);
}
// Reserve a specific number of threads, prevent them from running other
// functions. The number of reserved threads could be fewer than the desired one
int ThreadPoolImpl::ReserveThreads(int threads_to_be_reserved) {
return impl_->ReserveThreads(threads_to_be_reserved);
}
// Release a specific number of threads
int ThreadPoolImpl::ReleaseThreads(int threads_to_be_released) {
return impl_->ReleaseThreads(threads_to_be_released);
}
ThreadPool* NewThreadPool(int num_threads) {
ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
thread_pool->SetBackgroundThreads(num_threads);

@@ -88,6 +88,14 @@ class ThreadPoolImpl : public ThreadPool {
// Set the thread priority.
void SetThreadPriority(Env::Priority priority);
// Reserve a specific number of threads, prevent them from running other
// functions. The number of reserved threads could be fewer than the desired
// one
int ReserveThreads(int threads_to_be_reserved) override;
// Release a specific number of threads
int ReleaseThreads(int threads_to_be_released) override;
static void PthreadCall(const char* label, int result);
struct Impl;
