[RocksDB] Enhance Env to support two thread pools LOW and HIGH

Summary:
this is the ground work for separating memtable flush jobs to their own thread pool.
Both SetBackgroundThreads and Schedule take a third parameter Priority to indicate which thread pool they are working on. The names LOW and HIGH are just identifiers for two different thread pools, and does not indicate real difference in 'priority'. We can set number of threads in the pools independently.
The thread pool implementation is refactored.

Test Plan: make check

Reviewers: dhruba, emayanke

CC: leveldb

Differential Revision: https://reviews.facebook.net/D12885
main
Haobo Xu 11 years ago
parent 0e422308aa
commit 1565dab809
  1. 14
      hdfs/env_hdfs.h
  2. 26
      include/rocksdb/env.h
  3. 169
      util/env_posix.cc
  4. 79
      util/env_test.cc

@ -107,8 +107,9 @@ class HdfsEnv : public Env {
virtual Status NewLogger(const std::string& fname, Logger** result);
virtual void Schedule( void (*function)(void* arg), void* arg) {
posixEnv->Schedule(function, arg);
virtual void Schedule(void (*function)(void* arg), void* arg,
Priority pri = LOW) {
posixEnv->Schedule(function, arg, pri);
}
virtual void StartThread(void (*function)(void* arg), void* arg) {
@ -140,8 +141,8 @@ class HdfsEnv : public Env {
return posixEnv->GetAbsolutePath(db_path, output_path);
}
virtual void SetBackgroundThreads(int number) {
posixEnv->SetBackgroundThreads(number);
virtual void SetBackgroundThreads(int number, Priority pri = LOW) {
posixEnv->SetBackgroundThreads(number, pri);
}
virtual std::string TimeToString(uint64_t number) {
@ -279,7 +280,8 @@ class HdfsEnv : public Env {
virtual Status NewLogger(const std::string& fname,
shared_ptr<Logger>* result){return notsup;}
virtual void Schedule( void (*function)(void* arg), void* arg) {}
virtual void Schedule(void (*function)(void* arg), void* arg,
Priority pri = LOW) {}
virtual void StartThread(void (*function)(void* arg), void* arg) {}
@ -296,7 +298,7 @@ class HdfsEnv : public Env {
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* outputpath) {return notsup;}
virtual void SetBackgroundThreads(int number) {}
virtual void SetBackgroundThreads(int number, Priority pri = LOW) {}
virtual std::string TimeToString(uint64_t number) { return "";}
};

@ -162,15 +162,20 @@ class Env {
// REQUIRES: lock has not already been unlocked.
virtual Status UnlockFile(FileLock* lock) = 0;
// Arrange to run "(*function)(arg)" once in a background thread.
//
enum Priority { LOW, HIGH, TOTAL };
// Arrange to run "(*function)(arg)" once in a background thread, in
// the thread pool specified by pri. By default, jobs go to the 'LOW'
// priority thread pool.
// "function" may run in an unspecified thread. Multiple functions
// added to the same Env may run concurrently in different threads.
// I.e., the caller may not assume that background work items are
// serialized.
virtual void Schedule(
void (*function)(void* arg),
void* arg) = 0;
void* arg,
Priority pri = LOW) = 0;
// Start a new thread, invoking "function(arg)" within the new thread.
// When "function(arg)" returns, the thread will be destroyed.
@ -210,9 +215,10 @@ class Env {
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) = 0;
// The number of background worker threads for this environment.
// default: 1
virtual void SetBackgroundThreads(int number) = 0;
// The number of background worker threads of a specific thread pool
// for this environment. 'LOW' is the default pool.
// default number: 1
virtual void SetBackgroundThreads(int number, Priority pri = LOW) = 0;
// Converts seconds-since-Jan-01-1970 to a printable string
virtual std::string TimeToString(uint64_t time) = 0;
@ -496,8 +502,8 @@ class EnvWrapper : public Env {
return target_->LockFile(f, l);
}
Status UnlockFile(FileLock* l) { return target_->UnlockFile(l); }
void Schedule(void (*f)(void*), void* a) {
return target_->Schedule(f, a);
void Schedule(void (*f)(void*), void* a, Priority pri) {
return target_->Schedule(f, a, pri);
}
void StartThread(void (*f)(void*), void* a) {
return target_->StartThread(f, a);
@ -525,8 +531,8 @@ class EnvWrapper : public Env {
std::string* output_path) {
return target_->GetAbsolutePath(db_path, output_path);
}
void SetBackgroundThreads(int num) {
return target_->SetBackgroundThreads(num);
void SetBackgroundThreads(int num, Priority pri) {
return target_->SetBackgroundThreads(num, pri);
}
std::string TimeToString(uint64_t time) {
return target_->TimeToString(time);

@ -708,12 +708,24 @@ class PosixFileLock : public FileLock {
std::string filename;
};
namespace {
void PthreadCall(const char* label, int result) {
if (result != 0) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
exit(1);
}
}
}
class PosixEnv : public Env {
public:
PosixEnv();
virtual ~PosixEnv(){
WaitForBGThreads();
for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
}
}
void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
@ -913,9 +925,7 @@ class PosixEnv : public Env {
return result;
}
virtual void Schedule(void (*function)(void*), void* arg);
virtual void WaitForBGThreads();
virtual void Schedule(void (*function)(void*), void* arg, Priority pri = LOW);
virtual void StartThread(void (*function)(void* arg), void* arg);
@ -1008,13 +1018,9 @@ class PosixEnv : public Env {
}
// Allow increasing the number of worker threads.
virtual void SetBackgroundThreads(int num) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
if (num > num_threads_) {
num_threads_ = num;
bgthread_.resize(num_threads_);
}
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
virtual void SetBackgroundThreads(int num, Priority pri) {
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
thread_pools_[pri].SetBackgroundThreads(num);
}
virtual std::string TimeToString(uint64_t secondsSince1970) {
@ -1041,12 +1047,6 @@ class PosixEnv : public Env {
bool checkedDiskForMmap_;
bool forceMmapOff; // do we override Env options?
void PthreadCall(const char* label, int result) {
if (result != 0) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
exit(1);
}
}
// Returns true iff the named directory exists and is a directory.
virtual bool DirExists(const std::string& dname) {
@ -1057,14 +1057,6 @@ class PosixEnv : public Env {
return false; // stat() failed return false
}
// BGThread() is the body of the background thread
void BGThread();
static void* BGThreadWrapper(void* arg) {
reinterpret_cast<PosixEnv*>(arg)->BGThread();
return nullptr;
}
bool SupportsFastAllocate(const std::string& path) {
struct statfs s;
if (statfs(path.c_str(), &s)){
@ -1083,46 +1075,66 @@ class PosixEnv : public Env {
}
size_t page_size_;
pthread_mutex_t mu_;
pthread_cond_t bgsignal_;
std::vector<pthread_t> bgthread_;
int started_bgthread_;
int num_threads_;
// Entry per Schedule() call
struct BGItem { void* arg; void (*function)(void*); };
typedef std::deque<BGItem> BGQueue;
int queue_size_; // number of items in BGQueue
bool exit_all_threads_;
BGQueue queue_;
std::vector<pthread_t> threads_to_join_;
};
PosixEnv::PosixEnv() : checkedDiskForMmap_(false),
forceMmapOff(false),
page_size_(getpagesize()),
started_bgthread_(0),
num_threads_(1),
queue_size_(0),
class ThreadPool {
public:
ThreadPool() :
total_threads_limit_(1),
bgthreads_(0),
queue_(),
exit_all_threads_(false) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
bgthread_.resize(num_threads_);
}
// Signal and Join all background threads started by calls to Schedule
void PosixEnv::WaitForBGThreads() {
~ThreadPool() {
PthreadCall("lock", pthread_mutex_lock(&mu_));
assert(!exit_all_threads_);
exit_all_threads_ = true;
PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
for (unsigned int i = 0; i < threads_to_join_.size(); i++) {
pthread_join(threads_to_join_[i], nullptr);
for (const auto tid : bgthreads_) {
pthread_join(tid, nullptr);
}
}
void PosixEnv::Schedule(void (*function)(void*), void* arg) {
void BGThread() {
while (true) {
// Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_));
while (queue_.empty() && !exit_all_threads_) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
}
if (exit_all_threads_) { // mechanism to let BG threads exit safely
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
break;
}
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
(*function)(arg);
}
}
static void* BGThreadWrapper(void* arg) {
reinterpret_cast<ThreadPool*>(arg)->BGThread();
return nullptr;
}
void SetBackgroundThreads(int num) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
if (num > total_threads_limit_) {
total_threads_limit_ = num;
}
assert(total_threads_limit_ > 0);
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
void Schedule(void (*function)(void*), void* arg) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
if (exit_all_threads_) {
@ -1130,15 +1142,16 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg) {
return;
}
// Start background thread if necessary
for (; started_bgthread_ < num_threads_; started_bgthread_++) {
while ((int)bgthreads_.size() < total_threads_limit_) {
pthread_t t;
PthreadCall(
"create thread",
pthread_create(&bgthread_[started_bgthread_],
pthread_create(&t,
nullptr,
&PosixEnv::BGThreadWrapper,
&ThreadPool::BGThreadWrapper,
this));
threads_to_join_.push_back(bgthread_[started_bgthread_]);
fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]);
fprintf(stdout, "Created bg thread 0x%lx\n", t);
bgthreads_.push_back(t);
}
// Add to priority queue
@ -1152,24 +1165,36 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg) {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
void PosixEnv::BGThread() {
while (true) {
// Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_));
while (queue_.empty() && !exit_all_threads_) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
}
if (exit_all_threads_) { // mechanism to let BG threads exit safely
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
break;
}
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
private:
// Entry per Schedule() call
struct BGItem { void* arg; void (*function)(void*); };
typedef std::deque<BGItem> BGQueue;
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
(*function)(arg);
pthread_mutex_t mu_;
pthread_cond_t bgsignal_;
int total_threads_limit_;
std::vector<pthread_t> bgthreads_;
BGQueue queue_;
bool exit_all_threads_;
};
std::vector<ThreadPool> thread_pools_;
pthread_mutex_t mu_;
std::vector<pthread_t> threads_to_join_;
};
PosixEnv::PosixEnv() : checkedDiskForMmap_(false),
forceMmapOff(false),
page_size_(getpagesize()),
thread_pools_(Priority::TOTAL) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
}
void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) {
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
thread_pools_[pri].Schedule(function, arg);
}
namespace {
@ -1192,7 +1217,9 @@ void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
state->arg = arg;
PthreadCall("start thread",
pthread_create(&t, nullptr, &StartThreadWrapper, state));
PthreadCall("lock", pthread_mutex_lock(&mu_));
threads_to_join_.push_back(t);
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
} // namespace

@ -2,11 +2,14 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/env.h"
#include <iostream>
#include <unordered_set>
#include "rocksdb/env.h"
#include "port/port.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/testharness.h"
namespace leveldb {
@ -27,6 +30,80 @@ static void SetBool(void* ptr) {
reinterpret_cast<port::AtomicPointer*>(ptr)->NoBarrier_Store(ptr);
}
TEST(EnvPosixTest, TwoPools) {
class CB {
public:
CB(const std::string& pool_name, int pool_size)
: mu_(),
num_running_(0),
num_finished_(0),
pool_size_(pool_size),
pool_name_(pool_name) { }
static void Run(void* v) {
CB* cb = reinterpret_cast<CB*>(v);
cb->Run();
}
void Run() {
{
MutexLock l(&mu_);
num_running_++;
std::cout << "Pool " << pool_name_ << ": "
<< num_running_ << " running threads.\n";
// make sure we don't have more than pool_size_ jobs running.
ASSERT_LE(num_running_, pool_size_);
}
// sleep for 1 sec
Env::Default()->SleepForMicroseconds(1000000);
{
MutexLock l(&mu_);
num_running_--;
num_finished_++;
}
}
int NumFinished() {
MutexLock l(&mu_);
return num_finished_;
}
private:
port::Mutex mu_;
int num_running_;
int num_finished_;
int pool_size_;
std::string pool_name_;
};
const int kLowPoolSize = 2;
const int kHighPoolSize = 4;
const int kJobs = 8;
CB low_pool_job("low", kLowPoolSize);
CB high_pool_job("high", kHighPoolSize);
env_->SetBackgroundThreads(kLowPoolSize);
env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH);
// schedule same number of jobs in each pool
for (int i = 0; i < kJobs; i++) {
env_->Schedule(&CB::Run, &low_pool_job);
env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH);
}
// wait for all jobs to finish
while (low_pool_job.NumFinished() < kJobs ||
high_pool_job.NumFinished() < kJobs) {
env_->SleepForMicroseconds(kDelayMicros);
}
}
TEST(EnvPosixTest, RunImmediately) {
port::AtomicPointer called (nullptr);
env_->Schedule(&SetBool, &called);

Loading…
Cancel
Save