diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index bc286fe8d..b9309b3a1 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -107,8 +107,9 @@ class HdfsEnv : public Env { virtual Status NewLogger(const std::string& fname, Logger** result); - virtual void Schedule( void (*function)(void* arg), void* arg) { - posixEnv->Schedule(function, arg); + virtual void Schedule(void (*function)(void* arg), void* arg, + Priority pri = LOW) { + posixEnv->Schedule(function, arg, pri); } virtual void StartThread(void (*function)(void* arg), void* arg) { @@ -140,8 +141,8 @@ class HdfsEnv : public Env { return posixEnv->GetAbsolutePath(db_path, output_path); } - virtual void SetBackgroundThreads(int number) { - posixEnv->SetBackgroundThreads(number); + virtual void SetBackgroundThreads(int number, Priority pri = LOW) { + posixEnv->SetBackgroundThreads(number, pri); } virtual std::string TimeToString(uint64_t number) { @@ -279,7 +280,8 @@ class HdfsEnv : public Env { virtual Status NewLogger(const std::string& fname, shared_ptr* result){return notsup;} - virtual void Schedule( void (*function)(void* arg), void* arg) {} + virtual void Schedule(void (*function)(void* arg), void* arg, + Priority pri = LOW) {} virtual void StartThread(void (*function)(void* arg), void* arg) {} @@ -296,7 +298,7 @@ class HdfsEnv : public Env { virtual Status GetAbsolutePath(const std::string& db_path, std::string* outputpath) {return notsup;} - virtual void SetBackgroundThreads(int number) {} + virtual void SetBackgroundThreads(int number, Priority pri = LOW) {} virtual std::string TimeToString(uint64_t number) { return "";} }; diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index f4803e2e5..50b29167b 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -162,15 +162,20 @@ class Env { // REQUIRES: lock has not already been unlocked. virtual Status UnlockFile(FileLock* lock) = 0; - // Arrange to run "(*function)(arg)" once in a background thread. - // + enum Priority { LOW, HIGH, TOTAL }; + + // Arrange to run "(*function)(arg)" once in a background thread, in + // the thread pool specified by pri. By default, jobs go to the 'LOW' + // priority thread pool. + // "function" may run in an unspecified thread. Multiple functions // added to the same Env may run concurrently in different threads. // I.e., the caller may not assume that background work items are // serialized. virtual void Schedule( void (*function)(void* arg), - void* arg) = 0; + void* arg, + Priority pri = LOW) = 0; // Start a new thread, invoking "function(arg)" within the new thread. // When "function(arg)" returns, the thread will be destroyed. @@ -210,9 +215,10 @@ class Env { virtual Status GetAbsolutePath(const std::string& db_path, std::string* output_path) = 0; - // The number of background worker threads for this environment. - // default: 1 - virtual void SetBackgroundThreads(int number) = 0; + // The number of background worker threads of a specific thread pool + // for this environment. 'LOW' is the default pool. + // default number: 1 + virtual void SetBackgroundThreads(int number, Priority pri = LOW) = 0; // Converts seconds-since-Jan-01-1970 to a printable string virtual std::string TimeToString(uint64_t time) = 0; @@ -496,8 +502,8 @@ class EnvWrapper : public Env { return target_->LockFile(f, l); } Status UnlockFile(FileLock* l) { return target_->UnlockFile(l); } - void Schedule(void (*f)(void*), void* a) { - return target_->Schedule(f, a); + void Schedule(void (*f)(void*), void* a, Priority pri) { + return target_->Schedule(f, a, pri); } void StartThread(void (*f)(void*), void* a) { return target_->StartThread(f, a); @@ -525,8 +531,8 @@ class EnvWrapper : public Env { std::string* output_path) { return target_->GetAbsolutePath(db_path, output_path); } - void SetBackgroundThreads(int num) { - return target_->SetBackgroundThreads(num); + void SetBackgroundThreads(int num, Priority pri) { + return target_->SetBackgroundThreads(num, pri); } std::string TimeToString(uint64_t time) { return target_->TimeToString(time); diff --git a/util/env_posix.cc b/util/env_posix.cc index 3771c406d..50f96e507 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -708,12 +708,24 @@ class PosixFileLock : public FileLock { std::string filename; }; + +namespace { +void PthreadCall(const char* label, int result) { + if (result != 0) { + fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); + exit(1); + } +} +} + class PosixEnv : public Env { public: PosixEnv(); virtual ~PosixEnv(){ - WaitForBGThreads(); + for (const auto tid : threads_to_join_) { + pthread_join(tid, nullptr); + } } void SetFD_CLOEXEC(int fd, const EnvOptions* options) { @@ -913,9 +925,7 @@ class PosixEnv : public Env { return result; } - virtual void Schedule(void (*function)(void*), void* arg); - - virtual void WaitForBGThreads(); + virtual void Schedule(void (*function)(void*), void* arg, Priority pri = LOW); virtual void StartThread(void (*function)(void* arg), void* arg); @@ -1008,13 +1018,9 @@ class PosixEnv : public Env { } // Allow increasing the number of worker threads. - virtual void SetBackgroundThreads(int num) { - PthreadCall("lock", pthread_mutex_lock(&mu_)); - if (num > num_threads_) { - num_threads_ = num; - bgthread_.resize(num_threads_); - } - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + virtual void SetBackgroundThreads(int num, Priority pri) { + assert(pri >= Priority::LOW && pri <= Priority::HIGH); + thread_pools_[pri].SetBackgroundThreads(num); } virtual std::string TimeToString(uint64_t secondsSince1970) { @@ -1041,12 +1047,6 @@ class PosixEnv : public Env { bool checkedDiskForMmap_; bool forceMmapOff; // do we override Env options? - void PthreadCall(const char* label, int result) { - if (result != 0) { - fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); - exit(1); - } - } // Returns true iff the named directory exists and is a directory. virtual bool DirExists(const std::string& dname) { @@ -1057,14 +1057,6 @@ class PosixEnv : public Env { return false; // stat() failed return false } - - // BGThread() is the body of the background thread - void BGThread(); - static void* BGThreadWrapper(void* arg) { - reinterpret_cast(arg)->BGThread(); - return nullptr; - } - bool SupportsFastAllocate(const std::string& path) { struct statfs s; if (statfs(path.c_str(), &s)){ @@ -1083,93 +1075,126 @@ class PosixEnv : public Env { } size_t page_size_; - pthread_mutex_t mu_; - pthread_cond_t bgsignal_; - std::vector bgthread_; - int started_bgthread_; - int num_threads_; - - // Entry per Schedule() call - struct BGItem { void* arg; void (*function)(void*); }; - typedef std::deque BGQueue; - int queue_size_; // number of items in BGQueue - bool exit_all_threads_; - BGQueue queue_; - std::vector threads_to_join_; -}; -PosixEnv::PosixEnv() : checkedDiskForMmap_(false), - forceMmapOff(false), - page_size_(getpagesize()), - started_bgthread_(0), - num_threads_(1), - queue_size_(0), - exit_all_threads_(false) { - PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); - PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); - bgthread_.resize(num_threads_); -} -// Signal and Join all background threads started by calls to Schedule -void PosixEnv::WaitForBGThreads() { - PthreadCall("lock", pthread_mutex_lock(&mu_)); - assert(! exit_all_threads_); - exit_all_threads_ = true; - PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); - for (unsigned int i = 0; i < threads_to_join_.size(); i++) { - pthread_join(threads_to_join_[i], nullptr); - } -} + class ThreadPool { + public: -void PosixEnv::Schedule(void (*function)(void*), void* arg) { - PthreadCall("lock", pthread_mutex_lock(&mu_)); + ThreadPool() : + total_threads_limit_(1), + bgthreads_(0), + queue_(), + exit_all_threads_(false) { + PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); + PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); + } - if (exit_all_threads_) { - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); - return; - } - // Start background thread if necessary - for (; started_bgthread_ < num_threads_; started_bgthread_++) { - PthreadCall( - "create thread", - pthread_create(&bgthread_[started_bgthread_], - nullptr, - &PosixEnv::BGThreadWrapper, - this)); - threads_to_join_.push_back(bgthread_[started_bgthread_]); - fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]); - } + ~ThreadPool() { + PthreadCall("lock", pthread_mutex_lock(&mu_)); + assert(!exit_all_threads_); + exit_all_threads_ = true; + PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + for (const auto tid : bgthreads_) { + pthread_join(tid, nullptr); + } + } - // Add to priority queue - queue_.push_back(BGItem()); - queue_.back().function = function; - queue_.back().arg = arg; + void BGThread() { + while (true) { + // Wait until there is an item that is ready to run + PthreadCall("lock", pthread_mutex_lock(&mu_)); + while (queue_.empty() && !exit_all_threads_) { + PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); + } + if (exit_all_threads_) { // mechanism to let BG threads exit safely + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + break; + } + void (*function)(void*) = queue_.front().function; + void* arg = queue_.front().arg; + queue_.pop_front(); - // always wake up at least one waiting thread. - PthreadCall("signal", pthread_cond_signal(&bgsignal_)); + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + (*function)(arg); + } + } - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); -} + static void* BGThreadWrapper(void* arg) { + reinterpret_cast(arg)->BGThread(); + return nullptr; + } -void PosixEnv::BGThread() { - while (true) { - // Wait until there is an item that is ready to run - PthreadCall("lock", pthread_mutex_lock(&mu_)); - while (queue_.empty() && !exit_all_threads_) { - PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); + void SetBackgroundThreads(int num) { + PthreadCall("lock", pthread_mutex_lock(&mu_)); + if (num > total_threads_limit_) { + total_threads_limit_ = num; + } + assert(total_threads_limit_ > 0); + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } - if (exit_all_threads_) { // mechanism to let BG threads exit safely + + void Schedule(void (*function)(void*), void* arg) { + PthreadCall("lock", pthread_mutex_lock(&mu_)); + + if (exit_all_threads_) { + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + return; + } + // Start background thread if necessary + while ((int)bgthreads_.size() < total_threads_limit_) { + pthread_t t; + PthreadCall( + "create thread", + pthread_create(&t, + nullptr, + &ThreadPool::BGThreadWrapper, + this)); + fprintf(stdout, "Created bg thread 0x%lx\n", t); + bgthreads_.push_back(t); + } + + // Add to priority queue + queue_.push_back(BGItem()); + queue_.back().function = function; + queue_.back().arg = arg; + + // always wake up at least one waiting thread. + PthreadCall("signal", pthread_cond_signal(&bgsignal_)); + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); - break; } - void (*function)(void*) = queue_.front().function; - void* arg = queue_.front().arg; - queue_.pop_front(); - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); - (*function)(arg); - } + private: + // Entry per Schedule() call + struct BGItem { void* arg; void (*function)(void*); }; + typedef std::deque BGQueue; + + pthread_mutex_t mu_; + pthread_cond_t bgsignal_; + int total_threads_limit_; + std::vector bgthreads_; + BGQueue queue_; + bool exit_all_threads_; + }; + + std::vector thread_pools_; + + pthread_mutex_t mu_; + std::vector threads_to_join_; + +}; + +PosixEnv::PosixEnv() : checkedDiskForMmap_(false), + forceMmapOff(false), + page_size_(getpagesize()), + thread_pools_(Priority::TOTAL) { + PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); +} + +void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) { + assert(pri >= Priority::LOW && pri <= Priority::HIGH); + thread_pools_[pri].Schedule(function, arg); } namespace { @@ -1192,7 +1217,9 @@ void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { state->arg = arg; PthreadCall("start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); + PthreadCall("lock", pthread_mutex_lock(&mu_)); threads_to_join_.push_back(t); + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } } // namespace diff --git a/util/env_test.cc b/util/env_test.cc index c7695501e..ebbd6a658 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -2,11 +2,14 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "rocksdb/env.h" +#include #include + +#include "rocksdb/env.h" #include "port/port.h" #include "util/coding.h" +#include "util/mutexlock.h" #include "util/testharness.h" namespace leveldb { @@ -27,6 +30,80 @@ static void SetBool(void* ptr) { reinterpret_cast(ptr)->NoBarrier_Store(ptr); } +TEST(EnvPosixTest, TwoPools) { + + class CB { + public: + CB(const std::string& pool_name, int pool_size) + : mu_(), + num_running_(0), + num_finished_(0), + pool_size_(pool_size), + pool_name_(pool_name) { } + + static void Run(void* v) { + CB* cb = reinterpret_cast(v); + cb->Run(); + } + + void Run() { + { + MutexLock l(&mu_); + num_running_++; + std::cout << "Pool " << pool_name_ << ": " + << num_running_ << " running threads.\n"; + // make sure we don't have more than pool_size_ jobs running. + ASSERT_LE(num_running_, pool_size_); + } + + // sleep for 1 sec + Env::Default()->SleepForMicroseconds(1000000); + + { + MutexLock l(&mu_); + num_running_--; + num_finished_++; + } + } + + int NumFinished() { + MutexLock l(&mu_); + return num_finished_; + } + + private: + port::Mutex mu_; + int num_running_; + int num_finished_; + int pool_size_; + std::string pool_name_; + }; + + const int kLowPoolSize = 2; + const int kHighPoolSize = 4; + const int kJobs = 8; + + CB low_pool_job("low", kLowPoolSize); + CB high_pool_job("high", kHighPoolSize); + + + env_->SetBackgroundThreads(kLowPoolSize); + env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH); + + // schedule same number of jobs in each pool + for (int i = 0; i < kJobs; i++) { + env_->Schedule(&CB::Run, &low_pool_job); + env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH); + } + + // wait for all jobs to finish + while (low_pool_job.NumFinished() < kJobs || + high_pool_job.NumFinished() < kJobs) { + env_->SleepForMicroseconds(kDelayMicros); + } +} + + TEST(EnvPosixTest, RunImmediately) { port::AtomicPointer called (nullptr); env_->Schedule(&SetBool, &called);