Expose ThreadPool under include/rocksdb/threadpool.h

Summary:
This diff split ThreadPool to
-ThreadPool (abstract interface exposed in include/rocksdb/threadpool.h)
-ThreadPoolImpl (actual implementation in util/threadpool_imp.h)

This allow us to expose ThreadPool to the user so we can use it as an option later

Test Plan: existing unit tests

Reviewers: andrewkr, yiwu, yhchiang, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D62085
main
Islam AbdelRahman 8 years ago
parent 23a057007c
commit e9b2af87f8
  1. 2
      CMakeLists.txt
  2. 37
      include/rocksdb/threadpool.h
  3. 5
      port/win/env_win.h
  4. 2
      src.mk
  5. 12
      util/env_posix.cc
  6. 48
      util/threadpool_imp.cc
  7. 13
      util/threadpool_imp.h

@ -240,7 +240,7 @@ set(SOURCES
util/testharness.cc util/testharness.cc
util/testutil.cc util/testutil.cc
util/thread_local.cc util/thread_local.cc
util/threadpool.cc util/threadpool_imp.cc
util/thread_status_impl.cc util/thread_status_impl.cc
util/thread_status_updater.cc util/thread_status_updater.cc
util/thread_status_util.cc util/thread_status_util.cc

@ -0,0 +1,37 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
namespace rocksdb {
/*
* ThreadPool is a component that will spawn N background threads that will
* be used to execute scheduled work, The number of background threads could
* be modified by calling SetBackgroundThreads().
* */
class ThreadPool {
public:
virtual ~ThreadPool() {}
// Wait for all threads to finish.
virtual void JoinAllThreads() = 0;
// Set the number of background threads that will be executing the
// scheduled jobs.
virtual void SetBackgroundThreads(int num) = 0;
// Get the number of jobs scheduled in the ThreadPool queue.
virtual unsigned int GetQueueLen() const = 0;
};
// NewThreadPool() is a function that could be used to create a ThreadPool
// with `num_threads` background threads.
extern ThreadPool* NewThreadPool(int num_threads);
} // namespace rocksdb

@ -17,7 +17,7 @@
#pragma once #pragma once
#include <rocksdb/env.h> #include <rocksdb/env.h>
#include "util/threadpool.h" #include "util/threadpool_imp.h"
#include <mutex> #include <mutex>
#include <vector> #include <vector>
@ -63,7 +63,7 @@ private:
Env* hosted_env_; Env* hosted_env_;
mutable std::mutex mu_; mutable std::mutex mu_;
std::vector<ThreadPool> thread_pools_; std::vector<ThreadPoolImpl> thread_pools_;
std::vector<std::thread> threads_to_join_; std::vector<std::thread> threads_to_join_;
}; };
@ -269,7 +269,6 @@ private:
WinEnvIO winenv_io_; WinEnvIO winenv_io_;
WinEnvThreads winenv_threads_; WinEnvThreads winenv_threads_;
}; };
} }

@ -110,7 +110,7 @@ LIB_SOURCES = \
util/iostats_context.cc \ util/iostats_context.cc \
util/io_posix.cc \ util/io_posix.cc \
util/lru_cache.cc \ util/lru_cache.cc \
util/threadpool.cc \ util/threadpool_imp.cc \
util/transaction_test_util.cc \ util/transaction_test_util.cc \
util/sharded_cache.cc \ util/sharded_cache.cc \
util/sst_file_manager_impl.cc \ util/sst_file_manager_impl.cc \

@ -42,7 +42,6 @@
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/io_posix.h" #include "util/io_posix.h"
#include "util/threadpool.h"
#include "util/iostats_context_imp.h" #include "util/iostats_context_imp.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/posix_logger.h" #include "util/posix_logger.h"
@ -51,6 +50,7 @@
#include "util/sync_point.h" #include "util/sync_point.h"
#include "util/thread_local.h" #include "util/thread_local.h"
#include "util/thread_status_updater.h" #include "util/thread_status_updater.h"
#include "util/threadpool_imp.h"
#if !defined(TMPFS_MAGIC) #if !defined(TMPFS_MAGIC)
#define TMPFS_MAGIC 0x01021994 #define TMPFS_MAGIC 0x01021994
@ -739,7 +739,7 @@ class PosixEnv : public Env {
size_t page_size_; size_t page_size_;
std::vector<ThreadPool> thread_pools_; std::vector<ThreadPoolImpl> thread_pools_;
pthread_mutex_t mu_; pthread_mutex_t mu_;
std::vector<pthread_t> threads_to_join_; std::vector<pthread_t> threads_to_join_;
}; };
@ -749,7 +749,7 @@ PosixEnv::PosixEnv()
forceMmapOff(false), forceMmapOff(false),
page_size_(getpagesize()), page_size_(getpagesize()),
thread_pools_(Priority::TOTAL) { thread_pools_(Priority::TOTAL) {
ThreadPool::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].SetThreadPriority( thread_pools_[pool_id].SetThreadPriority(
static_cast<Env::Priority>(pool_id)); static_cast<Env::Priority>(pool_id));
@ -791,11 +791,11 @@ void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
StartThreadState* state = new StartThreadState; StartThreadState* state = new StartThreadState;
state->user_function = function; state->user_function = function;
state->arg = arg; state->arg = arg;
ThreadPool::PthreadCall( ThreadPoolImpl::PthreadCall(
"start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
ThreadPool::PthreadCall("lock", pthread_mutex_lock(&mu_)); ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
threads_to_join_.push_back(t); threads_to_join_.push_back(t);
ThreadPool::PthreadCall("unlock", pthread_mutex_unlock(&mu_)); ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
} }
void PosixEnv::WaitForJoin() { void PosixEnv::WaitForJoin() {

@ -7,9 +7,9 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/threadpool.h" #include "util/threadpool_imp.h"
#include <atomic>
#include <algorithm> #include <algorithm>
#include <atomic>
#ifndef OS_WIN #ifndef OS_WIN
# include <unistd.h> # include <unistd.h>
@ -26,7 +26,7 @@
namespace rocksdb { namespace rocksdb {
void ThreadPool::PthreadCall(const char* label, int result) { void ThreadPoolImpl::PthreadCall(const char* label, int result) {
if (result != 0) { if (result != 0) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
abort(); abort();
@ -38,7 +38,7 @@ namespace {
struct Lock { struct Lock {
std::unique_lock<std::mutex> ul_; std::unique_lock<std::mutex> ul_;
Lock(std::mutex& m) : ul_(m, std::defer_lock) {} explicit Lock(const std::mutex& m) : ul_(m, std::defer_lock) {}
}; };
using Condition = std::condition_variable; using Condition = std::condition_variable;
@ -124,7 +124,7 @@ int ThreadDetach(pthread_t& thread) {
#endif #endif
} }
ThreadPool::ThreadPool() ThreadPoolImpl::ThreadPoolImpl()
: total_threads_limit_(1), : total_threads_limit_(1),
bgthreads_(0), bgthreads_(0),
queue_(), queue_(),
@ -138,10 +138,9 @@ ThreadPool::ThreadPool()
#endif #endif
} }
ThreadPool::~ThreadPool() { assert(bgthreads_.size() == 0U); } ThreadPoolImpl::~ThreadPoolImpl() { assert(bgthreads_.size() == 0U); }
void ThreadPool::JoinAllThreads() {
void ThreadPoolImpl::JoinAllThreads() {
Lock lock(mu_); Lock lock(mu_);
PthreadCall("lock", ThreadPoolMutexLock(lock)); PthreadCall("lock", ThreadPoolMutexLock(lock));
assert(!exit_all_threads_); assert(!exit_all_threads_);
@ -156,7 +155,7 @@ void ThreadPool::JoinAllThreads() {
bgthreads_.clear(); bgthreads_.clear();
} }
void ThreadPool::LowerIOPriority() { void ThreadPoolImpl::LowerIOPriority() {
#ifdef OS_LINUX #ifdef OS_LINUX
PthreadCall("lock", pthread_mutex_lock(&mu_)); PthreadCall("lock", pthread_mutex_lock(&mu_));
low_io_priority_ = true; low_io_priority_ = true;
@ -164,7 +163,7 @@ void ThreadPool::LowerIOPriority() {
#endif #endif
} }
void ThreadPool::BGThread(size_t thread_id) { void ThreadPoolImpl::BGThread(size_t thread_id) {
bool low_io_priority = false; bool low_io_priority = false;
while (true) { while (true) {
// Wait until there is an item that is ready to run // Wait until there is an item that is ready to run
@ -233,16 +232,16 @@ void ThreadPool::BGThread(size_t thread_id) {
// Helper struct for passing arguments when creating threads. // Helper struct for passing arguments when creating threads.
struct BGThreadMetadata { struct BGThreadMetadata {
ThreadPool* thread_pool_; ThreadPoolImpl* thread_pool_;
size_t thread_id_; // Thread count in the thread. size_t thread_id_; // Thread count in the thread.
BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) BGThreadMetadata(ThreadPoolImpl* thread_pool, size_t thread_id)
: thread_pool_(thread_pool), thread_id_(thread_id) {} : thread_pool_(thread_pool), thread_id_(thread_id) {}
}; };
static void* BGThreadWrapper(void* arg) { static void* BGThreadWrapper(void* arg) {
BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg); BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
size_t thread_id = meta->thread_id_; size_t thread_id = meta->thread_id_;
ThreadPool* tp = meta->thread_pool_; ThreadPoolImpl* tp = meta->thread_pool_;
#if ROCKSDB_USING_THREAD_STATUS #if ROCKSDB_USING_THREAD_STATUS
// for thread-status // for thread-status
ThreadStatusUtil::RegisterThread( ThreadStatusUtil::RegisterThread(
@ -258,11 +257,11 @@ static void* BGThreadWrapper(void* arg) {
return nullptr; return nullptr;
} }
void ThreadPool::WakeUpAllThreads() { void ThreadPoolImpl::WakeUpAllThreads() {
PthreadCall("signalall", ConditionSignalAll(bgsignal_)); PthreadCall("signalall", ConditionSignalAll(bgsignal_));
} }
void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) { void ThreadPoolImpl::SetBackgroundThreadsInternal(int num, bool allow_reduce) {
Lock lock(mu_); Lock lock(mu_);
PthreadCall("lock", ThreadPoolMutexLock(lock)); PthreadCall("lock", ThreadPoolMutexLock(lock));
if (exit_all_threads_) { if (exit_all_threads_) {
@ -278,15 +277,15 @@ void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) {
PthreadCall("unlock", MutexUnlock(lock)); PthreadCall("unlock", MutexUnlock(lock));
} }
void ThreadPool::IncBackgroundThreadsIfNeeded(int num) { void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
SetBackgroundThreadsInternal(num, false); SetBackgroundThreadsInternal(num, false);
} }
void ThreadPool::SetBackgroundThreads(int num) { void ThreadPoolImpl::SetBackgroundThreads(int num) {
SetBackgroundThreadsInternal(num, true); SetBackgroundThreadsInternal(num, true);
} }
void ThreadPool::StartBGThreads() { void ThreadPoolImpl::StartBGThreads() {
// Start background thread if necessary // Start background thread if necessary
while ((int)bgthreads_.size() < total_threads_limit_) { while ((int)bgthreads_.size() < total_threads_limit_) {
#ifdef ROCKSDB_STD_THREADPOOL #ifdef ROCKSDB_STD_THREADPOOL
@ -313,9 +312,8 @@ void ThreadPool::StartBGThreads() {
} }
} }
void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag, void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg,
void (*unschedFunction)(void* arg)) { void* tag, void (*unschedFunction)(void* arg)) {
Lock lock(mu_); Lock lock(mu_);
PthreadCall("lock", ThreadPoolMutexLock(lock)); PthreadCall("lock", ThreadPoolMutexLock(lock));
@ -347,7 +345,7 @@ void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag,
PthreadCall("unlock", MutexUnlock(lock)); PthreadCall("unlock", MutexUnlock(lock));
} }
int ThreadPool::UnSchedule(void* arg) { int ThreadPoolImpl::UnSchedule(void* arg) {
int count = 0; int count = 0;
Lock lock(mu_); Lock lock(mu_);
@ -374,4 +372,10 @@ int ThreadPool::UnSchedule(void* arg) {
return count; return count;
} }
ThreadPool* NewThreadPool(int num_threads) {
ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
thread_pool->SetBackgroundThreads(num_threads);
return thread_pool;
}
} // namespace rocksdb } // namespace rocksdb

@ -13,6 +13,7 @@
#endif #endif
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/threadpool.h"
#include "util/thread_status_util.h" #include "util/thread_status_util.h"
#ifdef ROCKSDB_STD_THREADPOOL #ifdef ROCKSDB_STD_THREADPOOL
@ -26,23 +27,23 @@
namespace rocksdb { namespace rocksdb {
class ThreadPool { class ThreadPoolImpl : public ThreadPool {
public: public:
ThreadPool(); ThreadPoolImpl();
~ThreadPool(); ~ThreadPoolImpl();
void JoinAllThreads(); void JoinAllThreads() override;
void LowerIOPriority(); void LowerIOPriority();
void BGThread(size_t thread_id); void BGThread(size_t thread_id);
void WakeUpAllThreads(); void WakeUpAllThreads();
void IncBackgroundThreadsIfNeeded(int num); void IncBackgroundThreadsIfNeeded(int num);
void SetBackgroundThreads(int num); void SetBackgroundThreads(int num) override;
void StartBGThreads(); void StartBGThreads();
void Schedule(void (*function)(void* arg1), void* arg, void* tag, void Schedule(void (*function)(void* arg1), void* arg, void* tag,
void (*unschedFunction)(void* arg)); void (*unschedFunction)(void* arg));
int UnSchedule(void* arg); int UnSchedule(void* arg);
unsigned int GetQueueLen() const { unsigned int GetQueueLen() const override {
return queue_len_.load(std::memory_order_relaxed); return queue_len_.load(std::memory_order_relaxed);
} }
Loading…
Cancel
Save