Allow env_posix to lower background thread IO priority

Summary: This is a linux-specific system call.

Test Plan: ran db_bench

Reviewers: igor, yhchiang, sdong

Reviewed By: sdong

Subscribers: haobo, leveldb

Differential Revision: https://reviews.facebook.net/D21183
main
Lei Jin 10 years ago
parent 6a2be31f14
commit 58c49466d2
  1. 6
      db/db_bench.cc
  2. 8
      include/rocksdb/env.h
  3. 49
      util/env_posix.cc

@ -507,6 +507,8 @@ DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and "
DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated " DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated "
"per prefix, 0 means no special handling of the prefix, " "per prefix, 0 means no special handling of the prefix, "
"i.e. use the prefix comes with the generated random number."); "i.e. use the prefix comes with the generated random number.");
DEFINE_bool(enable_io_prio, false, "Lower the background flush/compaction "
"threads' IO priority");
enum RepFactory { enum RepFactory {
kSkipList, kSkipList,
@ -1639,6 +1641,10 @@ class Benchmark {
options.bloom_locality = FLAGS_bloom_locality; options.bloom_locality = FLAGS_bloom_locality;
options.max_open_files = FLAGS_open_files; options.max_open_files = FLAGS_open_files;
options.statistics = dbstats; options.statistics = dbstats;
if (FLAGS_enable_io_prio) {
FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH);
}
options.env = FLAGS_env; options.env = FLAGS_env;
options.disableDataSync = FLAGS_disable_data_sync; options.disableDataSync = FLAGS_disable_data_sync;
options.use_fsync = FLAGS_use_fsync; options.use_fsync = FLAGS_use_fsync;

@ -201,7 +201,7 @@ class Env {
// Priority for scheduling job in thread pool // Priority for scheduling job in thread pool
enum Priority { LOW, HIGH, TOTAL }; enum Priority { LOW, HIGH, TOTAL };
// Priority for scheduling job in thread pool // Priority for requesting bytes in rate limiter scheduler
enum IOPriority { enum IOPriority {
IO_LOW = 0, IO_LOW = 0,
IO_HIGH = 1, IO_HIGH = 1,
@ -272,6 +272,9 @@ class Env {
// default number: 1 // default number: 1
virtual void SetBackgroundThreads(int number, Priority pri = LOW) = 0; virtual void SetBackgroundThreads(int number, Priority pri = LOW) = 0;
// Lower IO priority for threads from the specified pool.
// The default implementation is empty, so an Env that does not override
// this method silently ignores the request.
virtual void LowerThreadPoolIOPriority(Priority pool = LOW) {}
// Converts seconds-since-Jan-01-1970 to a printable string // Converts seconds-since-Jan-01-1970 to a printable string
virtual std::string TimeToString(uint64_t time) = 0; virtual std::string TimeToString(uint64_t time) = 0;
@ -779,6 +782,9 @@ class EnvWrapper : public Env {
void SetBackgroundThreads(int num, Priority pri) { void SetBackgroundThreads(int num, Priority pri) {
return target_->SetBackgroundThreads(num, pri); return target_->SetBackgroundThreads(num, pri);
} }
// Forwards the request, with the same pool argument, to the wrapped Env
// held in target_.
void LowerThreadPoolIOPriority(Priority pool = LOW) override {
target_->LowerThreadPoolIOPriority(pool);
}
std::string TimeToString(uint64_t time) { std::string TimeToString(uint64_t time) {
return target_->TimeToString(time); return target_->TimeToString(time);
} }

@ -21,6 +21,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#ifdef OS_LINUX #ifdef OS_LINUX
#include <sys/statfs.h> #include <sys/statfs.h>
#include <sys/syscall.h>
#endif #endif
#include <sys/time.h> #include <sys/time.h>
#include <sys/types.h> #include <sys/types.h>
@ -28,10 +29,6 @@
#include <unistd.h> #include <unistd.h>
#if defined(OS_LINUX) #if defined(OS_LINUX)
#include <linux/fs.h> #include <linux/fs.h>
#include <fcntl.h>
#endif
#if defined(LEVELDB_PLATFORM_ANDROID)
#include <sys/stat.h>
#endif #endif
#include <signal.h> #include <signal.h>
#include <algorithm> #include <algorithm>
@ -1398,6 +1395,13 @@ class PosixEnv : public Env {
thread_pools_[pri].SetBackgroundThreads(num); thread_pools_[pri].SetBackgroundThreads(num);
} }
// Asks the thread pool selected by `pool` to lower its IO priority.
// The request is only passed on when built with OS_LINUX; on other builds
// nothing happens beyond the range assertion.
virtual void LowerThreadPoolIOPriority(Priority pool = LOW) override {
// `pool` indexes thread_pools_, so it must be LOW or HIGH.
assert(pool >= Priority::LOW && pool <= Priority::HIGH);
#ifdef OS_LINUX
thread_pools_[pool].LowerIOPriority();
#endif
}
virtual std::string TimeToString(uint64_t secondsSince1970) { virtual std::string TimeToString(uint64_t secondsSince1970) {
const time_t seconds = (time_t)secondsSince1970; const time_t seconds = (time_t)secondsSince1970;
struct tm t; struct tm t;
@ -1480,7 +1484,8 @@ class PosixEnv : public Env {
bgthreads_(0), bgthreads_(0),
queue_(), queue_(),
queue_len_(0), queue_len_(0),
exit_all_threads_(false) { exit_all_threads_(false),
low_io_priority_(false) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
} }
@ -1496,6 +1501,14 @@ class PosixEnv : public Env {
} }
} }
// Marks this pool as wanting low IO priority by setting low_io_priority_
// while holding mu_. No system call is made here: BGThread compares the
// flag against its own per-thread state after dequeuing a job and issues
// ioprio_set itself. On builds without OS_LINUX this function does nothing.
// NOTE(review): nothing visible here ever clears the flag again, so the
// change looks one-way; confirm against the rest of the class.
void LowerIOPriority() {
#ifdef OS_LINUX
PthreadCall("lock", pthread_mutex_lock(&mu_));
low_io_priority_ = true;
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
#endif
}
// Return true if there is at least one thread needs to terminate. // Return true if there is at least one thread needs to terminate.
bool HasExcessiveThread() { bool HasExcessiveThread() {
return static_cast<int>(bgthreads_.size()) > total_threads_limit_; return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
@ -1514,6 +1527,7 @@ class PosixEnv : public Env {
} }
void BGThread(size_t thread_id) { void BGThread(size_t thread_id) {
bool low_io_priority = false;
while (true) { while (true) {
// Wait until there is an item that is ready to run // Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_)); PthreadCall("lock", pthread_mutex_lock(&mu_));
@ -1549,7 +1563,31 @@ class PosixEnv : public Env {
queue_.pop_front(); queue_.pop_front();
queue_len_.store(queue_.size(), std::memory_order_relaxed); queue_len_.store(queue_.size(), std::memory_order_relaxed);
bool decrease_io_priority = (low_io_priority != low_io_priority_);
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); PthreadCall("unlock", pthread_mutex_unlock(&mu_));
#ifdef OS_LINUX
if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) \
(((class) << IOPRIO_CLASS_SHIFT) | data)
// Put this thread into the IOPRIO_CLASS_IDLE scheduling class (lowest)
// These system calls only have an effect when used in conjunction
// with an I/O scheduler that supports I/O priorities. As at
// kernel 2.6.17 the only such scheduler is the Completely
// Fair Queuing (CFQ) I/O scheduler.
// To change scheduler:
// echo cfq > /sys/block/<device_name>/queue/scheduler
// Tunables to consider:
// /sys/block/<device_name>/queue/slice_idle
// /sys/block/<device_name>/queue/slice_sync
syscall(SYS_ioprio_set,
1, // IOPRIO_WHO_PROCESS
0, // current thread
IOPRIO_PRIO_VALUE(3, 0));
low_io_priority = true;
}
#endif
(*function)(arg); (*function)(arg);
} }
} }
@ -1657,6 +1695,7 @@ class PosixEnv : public Env {
BGQueue queue_; BGQueue queue_;
std::atomic_uint queue_len_; // Queue length. Used for stats reporting std::atomic_uint queue_len_; // Queue length. Used for stats reporting
bool exit_all_threads_; bool exit_all_threads_;
bool low_io_priority_;
}; };
std::vector<ThreadPool> thread_pools_; std::vector<ThreadPool> thread_pools_;

Loading…
Cancel
Save