Allow a configurable number of background threads.

Summary:
The background threads are necessary for compaction.
For slower storage, it might be necessary to have more than
one compaction thread per DB. This patch allows creating
a configurable number of worker threads.
The default reamins at 1 (to maintain backward compatibility).

Test Plan:
run all unit tests. changes to db-bench coming in
a separate patch.

Reviewers: heyongqiang

Reviewed By: heyongqiang

CC: MarkCallaghan

Differential Revision: https://reviews.facebook.net/D5559
main
Dhruba Borthakur 12 years ago
parent fb4b381a0c
commit 9e84834eb4
  1. 5
      hdfs/env_hdfs.h
  2. 7
      include/leveldb/env.h
  3. 23
      util/env_posix.cc

@ -135,6 +135,9 @@ class HdfsEnv : public Env {
return posixEnv->GetAbsolutePath(db_path, output_path);
}
virtual void SetBackgroundThreads(int number) {
posixEnv->SetBackgroundThreads(number);
}
static uint64_t gettid() {
assert(sizeof(pthread_t) <= sizeof(uint64_t));
@ -268,6 +271,8 @@ class HdfsEnv : public Env {
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* outputpath) {return notsup;}
virtual void SetBackgroundThreads(int number) {}
};
}

@ -155,6 +155,10 @@ class Env {
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) = 0;
// The number of background worker threads for this environment.
// default: 1
virtual void SetBackgroundThreads(int number) = 0;
private:
// No copying allowed
Env(const Env&);
@ -344,6 +348,9 @@ class EnvWrapper : public Env {
std::string* output_path) {
return target_->GetAbsolutePath(db_path, output_path);
}
void SetBackgroundThreads(int num) {
return target_->SetBackgroundThreads(num);
}
private:
Env* target_;

@ -601,6 +601,14 @@ class PosixEnv : public Env {
return Status::OK();
}
// Allow increasing the number of worker threads.
virtual void SetBackgroundThreads(int num) {
if (num > num_threads_) {
num_threads_ = num;
bgthread_.resize(num_threads_);
}
}
private:
void PthreadCall(const char* label, int result) {
if (result != 0) {
@ -619,8 +627,9 @@ class PosixEnv : public Env {
size_t page_size_;
pthread_mutex_t mu_;
pthread_cond_t bgsignal_;
pthread_t bgthread_;
bool started_bgthread_;
std::vector<pthread_t> bgthread_;
int started_bgthread_;
int num_threads_;
// Entry per Schedule() call
struct BGItem { void* arg; void (*function)(void*); };
@ -629,20 +638,22 @@ class PosixEnv : public Env {
};
PosixEnv::PosixEnv() : page_size_(getpagesize()),
started_bgthread_(false) {
started_bgthread_(0),
num_threads_(1) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
bgthread_.resize(num_threads_);
}
void PosixEnv::Schedule(void (*function)(void*), void* arg) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
// Start background thread if necessary
if (!started_bgthread_) {
started_bgthread_ = true;
for (; started_bgthread_ < num_threads_; started_bgthread_++) {
PthreadCall(
"create thread",
pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
pthread_create(&bgthread_[started_bgthread_], NULL, &PosixEnv::BGThreadWrapper, this));
fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]);
}
// If the queue is currently empty, the background thread may currently be

Loading…
Cancel
Save