diff --git a/db/db_bench.cc b/db/db_bench.cc index 41e707f74..8c1d28835 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -507,6 +507,8 @@ DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and " DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated " "per prefix, 0 means no special handling of the prefix, " "i.e. use the prefix comes with the generated random number."); +DEFINE_bool(enable_io_prio, false, "Lower the background flush/compaction " + "threads' IO priority"); enum RepFactory { kSkipList, @@ -1639,6 +1641,10 @@ class Benchmark { options.bloom_locality = FLAGS_bloom_locality; options.max_open_files = FLAGS_open_files; options.statistics = dbstats; + if (FLAGS_enable_io_prio) { + FLAGS_env->LowerThreadPoolIOPriority(Env::LOW); + FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH); + } options.env = FLAGS_env; options.disableDataSync = FLAGS_disable_data_sync; options.use_fsync = FLAGS_use_fsync; diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index fc4665d90..70244bb31 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -201,7 +201,7 @@ class Env { // Priority for scheduling job in thread pool enum Priority { LOW, HIGH, TOTAL }; - // Priority for scheduling job in thread pool + // Priority for requesting bytes in rate limiter scheduler enum IOPriority { IO_LOW = 0, IO_HIGH = 1, @@ -272,6 +272,9 @@ class Env { // default number: 1 virtual void SetBackgroundThreads(int number, Priority pri = LOW) = 0; + // Lower IO priority for threads from the specified pool. + virtual void LowerThreadPoolIOPriority(Priority pool = LOW) {} + // Converts seconds-since-Jan-01-1970 to a printable string virtual std::string TimeToString(uint64_t time) = 0; @@ -779,6 +782,9 @@ class EnvWrapper : public Env { void SetBackgroundThreads(int num, Priority pri) { return target_->SetBackgroundThreads(num, pri); } + void LowerThreadPoolIOPriority(Priority pool = LOW) override { + target_->LowerThreadPoolIOPriority(pool); + } std::string TimeToString(uint64_t time) { return target_->TimeToString(time); } diff --git a/util/env_posix.cc b/util/env_posix.cc index dc8696e55..94d1f61c9 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -21,6 +21,7 @@ #include #ifdef OS_LINUX #include +#include #endif #include #include @@ -28,10 +29,6 @@ #include #if defined(OS_LINUX) #include -#include -#endif -#if defined(LEVELDB_PLATFORM_ANDROID) -#include #endif #include #include @@ -1398,6 +1395,13 @@ class PosixEnv : public Env { thread_pools_[pri].SetBackgroundThreads(num); } + virtual void LowerThreadPoolIOPriority(Priority pool = LOW) override { + assert(pool >= Priority::LOW && pool <= Priority::HIGH); +#ifdef OS_LINUX + thread_pools_[pool].LowerIOPriority(); +#endif + } + virtual std::string TimeToString(uint64_t secondsSince1970) { const time_t seconds = (time_t)secondsSince1970; struct tm t; @@ -1480,7 +1484,8 @@ class PosixEnv : public Env { bgthreads_(0), queue_(), queue_len_(0), - exit_all_threads_(false) { + exit_all_threads_(false), + low_io_priority_(false) { PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); } @@ -1496,6 +1501,14 @@ class PosixEnv : public Env { } } + void LowerIOPriority() { +#ifdef OS_LINUX + PthreadCall("lock", pthread_mutex_lock(&mu_)); + low_io_priority_ = true; + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); +#endif + } + // Return true if there is at least one thread needs to terminate. bool HasExcessiveThread() { return static_cast(bgthreads_.size()) > total_threads_limit_; @@ -1514,6 +1527,7 @@ class PosixEnv : public Env { } void BGThread(size_t thread_id) { + bool low_io_priority = false; while (true) { // Wait until there is an item that is ready to run PthreadCall("lock", pthread_mutex_lock(&mu_)); @@ -1549,7 +1563,31 @@ class PosixEnv : public Env { queue_.pop_front(); queue_len_.store(queue_.size(), std::memory_order_relaxed); + bool decrease_io_priority = (low_io_priority != low_io_priority_); PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + +#ifdef OS_LINUX + if (decrease_io_priority) { + #define IOPRIO_CLASS_SHIFT (13) + #define IOPRIO_PRIO_VALUE(class, data) \ + (((class) << IOPRIO_CLASS_SHIFT) | data) + // Put schedule into IOPRIO_CLASS_IDLE class (lowest) + // These system calls only have an effect when used in conjunction + // with an I/O scheduler that supports I/O priorities. As at + // kernel 2.6.17 the only such scheduler is the Completely + // Fair Queuing (CFQ) I/O scheduler. + // To change scheduler: + // echo cfq > /sys/block//queue/schedule + // Tunables to consider: + // /sys/block//queue/slice_idle + // /sys/block//queue/slice_sync + syscall(SYS_ioprio_set, + 1, // IOPRIO_WHO_PROCESS + 0, // current thread + IOPRIO_PRIO_VALUE(3, 0)); + low_io_priority = true; + } +#endif (*function)(arg); } } @@ -1657,6 +1695,7 @@ class PosixEnv : public Env { BGQueue queue_; std::atomic_uint queue_len_; // Queue length. Used for stats reporting bool exit_all_threads_; + bool low_io_priority_; }; std::vector thread_pools_;