diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index 4ec1b1f5d..944946692 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -135,6 +135,9 @@ class HdfsEnv : public Env { return posixEnv->GetAbsolutePath(db_path, output_path); } + virtual void SetBackgroundThreads(int number) { + posixEnv->SetBackgroundThreads(number); + } static uint64_t gettid() { assert(sizeof(pthread_t) <= sizeof(uint64_t)); @@ -268,6 +271,8 @@ class HdfsEnv : public Env { virtual Status GetAbsolutePath(const std::string& db_path, std::string* outputpath) {return notsup;} + + virtual void SetBackgroundThreads(int number) {} }; } diff --git a/include/leveldb/env.h b/include/leveldb/env.h index a0e443af4..91c9e9d31 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -155,6 +155,10 @@ class Env { virtual Status GetAbsolutePath(const std::string& db_path, std::string* output_path) = 0; + // The number of background worker threads for this environment. + // default: 1 + virtual void SetBackgroundThreads(int number) = 0; + private: // No copying allowed Env(const Env&); @@ -344,6 +348,9 @@ class EnvWrapper : public Env { std::string* output_path) { return target_->GetAbsolutePath(db_path, output_path); } + void SetBackgroundThreads(int num) { + return target_->SetBackgroundThreads(num); + } private: Env* target_; diff --git a/util/env_posix.cc b/util/env_posix.cc index 75c953c74..9ee494def 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -601,6 +601,14 @@ class PosixEnv : public Env { return Status::OK(); } + // Allow increasing the number of worker threads. + virtual void SetBackgroundThreads(int num) { + if (num > num_threads_) { + num_threads_ = num; + bgthread_.resize(num_threads_); + } + } + private: void PthreadCall(const char* label, int result) { if (result != 0) { @@ -619,8 +627,9 @@ class PosixEnv : public Env { size_t page_size_; pthread_mutex_t mu_; pthread_cond_t bgsignal_; - pthread_t bgthread_; - bool started_bgthread_; + std::vector bgthread_; + int started_bgthread_; + int num_threads_; // Entry per Schedule() call struct BGItem { void* arg; void (*function)(void*); }; @@ -629,20 +638,22 @@ class PosixEnv : public Env { }; PosixEnv::PosixEnv() : page_size_(getpagesize()), - started_bgthread_(false) { + started_bgthread_(0), + num_threads_(1) { PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL)); + bgthread_.resize(num_threads_); } void PosixEnv::Schedule(void (*function)(void*), void* arg) { PthreadCall("lock", pthread_mutex_lock(&mu_)); // Start background thread if necessary - if (!started_bgthread_) { - started_bgthread_ = true; + for (; started_bgthread_ < num_threads_; started_bgthread_++) { PthreadCall( "create thread", - pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this)); + pthread_create(&bgthread_[started_bgthread_], NULL, &PosixEnv::BGThreadWrapper, this)); + fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]); } // If the queue is currently empty, the background thread may currently be