diff --git a/HISTORY.md b/HISTORY.md index 5163b0222..c0809b919 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -10,6 +10,7 @@ * Introduce TTL for level compaction so that all files older than ttl go through the compaction process to get rid of old data. * TransactionDBOptions::write_policy can be configured to enable WritePrepared 2PC transactions. Read more about them in the wiki. * Add DB properties "rocksdb.block-cache-capacity", "rocksdb.block-cache-usage", "rocksdb.block-cache-pinned-usage" to show block cache usage. +* Add `Env::LowerThreadPoolCPUPriority(Priority)` method, which lowers the CPU priority of background (esp. compaction) threads to minimize interference with foreground tasks. ### Bug Fixes * Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob. diff --git a/env/env_posix.cc b/env/env_posix.cc index 2d1985f68..707625f3f 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -815,6 +815,15 @@ class PosixEnv : public Env { #endif } + virtual void LowerThreadPoolCPUPriority(Priority pool = LOW) override { + assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); +#ifdef OS_LINUX + thread_pools_[pool].LowerCPUPriority(); +#else + (void)pool; +#endif + } + virtual std::string TimeToString(uint64_t secondsSince1970) override { const time_t seconds = (time_t)secondsSince1970; struct tm t; diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 33b366544..55356a904 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -397,6 +397,9 @@ class Env { // Lower IO priority for threads from the specified pool. virtual void LowerThreadPoolIOPriority(Priority /*pool*/ = LOW) {} + // Lower CPU priority for threads from the specified pool. + virtual void LowerThreadPoolCPUPriority(Priority /*pool*/ = LOW) {} + // Converts seconds-since-Jan-01-1970 to a printable string virtual std::string TimeToString(uint64_t time) = 0; @@ -1092,6 +1095,10 @@ class EnvWrapper : public Env { target_->LowerThreadPoolIOPriority(pool); } + void LowerThreadPoolCPUPriority(Priority pool = LOW) override { + target_->LowerThreadPoolCPUPriority(pool); + } + std::string TimeToString(uint64_t time) override { return target_->TimeToString(time); } diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 908912036..3b2dbd7ce 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -992,6 +992,8 @@ DEFINE_int32(memtable_insert_with_hint_prefix_size, 0, "memtable insert with hint with the given prefix size."); DEFINE_bool(enable_io_prio, false, "Lower the background flush/compaction " "threads' IO priority"); +DEFINE_bool(enable_cpu_prio, false, "Lower the background flush/compaction " + "threads' CPU priority"); DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo " "table becomes an identity function. This is only valid when key " "is 8 bytes"); @@ -3321,6 +3323,10 @@ void VerifyDBFromDB(std::string& truth_db_name) { FLAGS_env->LowerThreadPoolIOPriority(Env::LOW); FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH); } + if (FLAGS_enable_cpu_prio) { + FLAGS_env->LowerThreadPoolCPUPriority(Env::LOW); + FLAGS_env->LowerThreadPoolCPUPriority(Env::HIGH); + } options.env = FLAGS_env; if (FLAGS_rate_limiter_bytes_per_sec > 0) { diff --git a/util/threadpool_imp.cc b/util/threadpool_imp.cc index 25ad1a693..3b1f762b8 100644 --- a/util/threadpool_imp.cc +++ b/util/threadpool_imp.cc @@ -18,6 +18,7 @@ #ifdef OS_LINUX # include +# include #endif #include @@ -54,6 +55,8 @@ struct ThreadPoolImpl::Impl { void LowerIOPriority(); + void LowerCPUPriority(); + void WakeUpAllThreads() { bgsignal_.notify_all(); } @@ -98,6 +101,7 @@ private: static void* BGThreadWrapper(void* arg); bool low_io_priority_; + bool low_cpu_priority_; Env::Priority priority_; Env* env_; @@ -126,6 +130,7 @@ inline ThreadPoolImpl::Impl::Impl() : low_io_priority_(false), + low_cpu_priority_(false), priority_(Env::LOW), env_(nullptr), total_threads_limit_(0), @@ -172,9 +177,16 @@ void ThreadPoolImpl::Impl::LowerIOPriority() { low_io_priority_ = true; } +inline +void ThreadPoolImpl::Impl::LowerCPUPriority() { + std::lock_guard lock(mu_); + low_cpu_priority_ = true; +} void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { bool low_io_priority = false; + bool low_cpu_priority = false; + while (true) { // Wait until there is an item that is ready to run std::unique_lock lock(mu_); @@ -214,9 +226,20 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { std::memory_order_relaxed); bool decrease_io_priority = (low_io_priority != low_io_priority_); + bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_); lock.unlock(); #ifdef OS_LINUX + if (decrease_cpu_priority) { + setpriority( + PRIO_PROCESS, + // Current thread. + 0, + // Lowest priority possible. + 19); + low_cpu_priority = true; + } + if (decrease_io_priority) { #define IOPRIO_CLASS_SHIFT (13) #define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data) @@ -237,6 +260,7 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { } #else (void)decrease_io_priority; // avoid 'unused variable' error + (void)decrease_cpu_priority; #endif func(); } @@ -425,6 +449,10 @@ void ThreadPoolImpl::LowerIOPriority() { impl_->LowerIOPriority(); } +void ThreadPoolImpl::LowerCPUPriority() { + impl_->LowerCPUPriority(); +} + void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) { impl_->SetBackgroundThreadsInternal(num, false); } diff --git a/util/threadpool_imp.h b/util/threadpool_imp.h index cced19bdd..3cdafb839 100644 --- a/util/threadpool_imp.h +++ b/util/threadpool_imp.h @@ -46,10 +46,14 @@ class ThreadPoolImpl : public ThreadPool { // start yet void WaitForJobsAndJoinAllThreads() override; - // Make threads to run at a lower kernel priority + // Make threads to run at a lower kernel IO priority // Currently only has effect on Linux void LowerIOPriority(); + // Make threads to run at a lower kernel CPU priority + // Currently only has effect on Linux + void LowerCPUPriority(); + // Ensure there is at aleast num threads in the pool // but do not kill threads if there are more void IncBackgroundThreadsIfNeeded(int num);