diff --git a/HISTORY.md b/HISTORY.md index 6b287d581..4e1db503a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -23,6 +23,7 @@ * Disable delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode and num_levels = 1 in order to avoid a corruption bug. * `pin_l0_filter_and_index_blocks_in_cache` no longer applies to L0 files larger than `1.5 * write_buffer_size` to give more predictable memory usage. Such L0 files may exist due to intra-L0 compaction, external file ingestion, or user dynamically changing `write_buffer_size` (note, however, that files that are already pinned will continue being pinned, even after such a dynamic change). * In point-in-time wal recovery mode, fail database recovery in case of IOError while reading the WAL to avoid data loss. +* A new method `Env::LowerThreadPoolCPUPriority(Priority, CpuPriority)` is added to `Env` to be able to lower to a specific priority such as `CpuPriority::kIdle`. ### New Features * sst_dump to add a new --readahead_size argument. Users can specify read size when scanning the data. Sst_dump also tries to prefetch tail part of the SST files so usually some number of I/Os are saved there too. diff --git a/env/composite_env_wrapper.h b/env/composite_env_wrapper.h index 6e4608b73..8a8e2f266 100644 --- a/env/composite_env_wrapper.h +++ b/env/composite_env_wrapper.h @@ -555,14 +555,18 @@ class CompositeEnvWrapper : public Env { return env_target_->IncBackgroundThreadsIfNeeded(num, pri); } - void LowerThreadPoolIOPriority(Priority pool = LOW) override { + void LowerThreadPoolIOPriority(Priority pool) override { env_target_->LowerThreadPoolIOPriority(pool); } - void LowerThreadPoolCPUPriority(Priority pool = LOW) override { + void LowerThreadPoolCPUPriority(Priority pool) override { env_target_->LowerThreadPoolCPUPriority(pool); } + Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override { + return env_target_->LowerThreadPoolCPUPriority(pool, pri); + } + std::string TimeToString(uint64_t time) override { return env_target_->TimeToString(time); } diff --git a/env/env_posix.cc b/env/env_posix.cc index 38dc8bdee..624dcefbe 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -329,7 +329,7 @@ class PosixEnv : public CompositeEnvWrapper { thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); } - void LowerThreadPoolIOPriority(Priority pool = LOW) override { + void LowerThreadPoolIOPriority(Priority pool) override { assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); #ifdef OS_LINUX thread_pools_[pool].LowerIOPriority(); @@ -338,13 +338,15 @@ class PosixEnv : public CompositeEnvWrapper { #endif } - void LowerThreadPoolCPUPriority(Priority pool = LOW) override { + void LowerThreadPoolCPUPriority(Priority pool) override { assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); -#ifdef OS_LINUX - thread_pools_[pool].LowerCPUPriority(); -#else - (void)pool; -#endif + thread_pools_[pool].LowerCPUPriority(CpuPriority::kLow); + } + + Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override { + assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); + thread_pools_[pool].LowerCPUPriority(pri); + return Status::OK(); } std::string TimeToString(uint64_t secondsSince1970) override { diff --git a/env/env_test.cc b/env/env_test.cc index b2060ca95..d62f69192 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -202,6 +202,79 @@ TEST_F(EnvPosixTest, DISABLED_FilePermission) { } } } + +TEST_F(EnvPosixTest, LowerThreadPoolCpuPriority) { + std::atomic from_priority(CpuPriority::kNormal); + std::atomic to_priority(CpuPriority::kNormal); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "ThreadPoolImpl::BGThread::BeforeSetCpuPriority", [&](void* pri) { + from_priority.store(*reinterpret_cast(pri)); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "ThreadPoolImpl::BGThread::AfterSetCpuPriority", [&](void* pri) { + to_priority.store(*reinterpret_cast(pri)); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + env_->SetBackgroundThreads(1, Env::BOTTOM); + env_->SetBackgroundThreads(1, Env::HIGH); + + auto RunTask = [&](Env::Priority pool) { + std::atomic called(false); + env_->Schedule(&SetBool, &called, pool); + for (int i = 0; i < kDelayMicros; i++) { + if (called.load()) { + break; + } + Env::Default()->SleepForMicroseconds(1); + } + ASSERT_TRUE(called.load()); + }; + + { + // Same priority, no-op. + env_->LowerThreadPoolCPUPriority(Env::Priority::BOTTOM, + CpuPriority::kNormal); + RunTask(Env::Priority::BOTTOM); + ASSERT_EQ(from_priority, CpuPriority::kNormal); + ASSERT_EQ(to_priority, CpuPriority::kNormal); + } + + { + // Higher priority, no-op. + env_->LowerThreadPoolCPUPriority(Env::Priority::BOTTOM, CpuPriority::kHigh); + RunTask(Env::Priority::BOTTOM); + ASSERT_EQ(from_priority, CpuPriority::kNormal); + ASSERT_EQ(to_priority, CpuPriority::kNormal); + } + + { + // Lower priority from kNormal -> kLow. + env_->LowerThreadPoolCPUPriority(Env::Priority::BOTTOM, CpuPriority::kLow); + RunTask(Env::Priority::BOTTOM); + ASSERT_EQ(from_priority, CpuPriority::kNormal); + ASSERT_EQ(to_priority, CpuPriority::kLow); + } + + { + // Lower priority from kLow -> kIdle. + env_->LowerThreadPoolCPUPriority(Env::Priority::BOTTOM, CpuPriority::kIdle); + RunTask(Env::Priority::BOTTOM); + ASSERT_EQ(from_priority, CpuPriority::kLow); + ASSERT_EQ(to_priority, CpuPriority::kIdle); + } + + { + // Lower priority from kNormal -> kIdle for another pool. + env_->LowerThreadPoolCPUPriority(Env::Priority::HIGH, CpuPriority::kIdle); + RunTask(Env::Priority::HIGH); + ASSERT_EQ(from_priority, CpuPriority::kNormal); + ASSERT_EQ(to_priority, CpuPriority::kIdle); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} #endif TEST_F(EnvPosixTest, MemoryMappedFileBuffer) { diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index e1c54d751..4af2171f0 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -61,6 +61,13 @@ class FileSystem; const size_t kDefaultPageSize = 4 * 1024; +enum class CpuPriority { + kIdle = 0, + kLow = 1, + kNormal = 2, + kHigh = 3, +}; + // Options while opening a file to read/write struct EnvOptions { // Construct with default Options @@ -474,6 +481,13 @@ class Env { // Lower IO priority for threads from the specified pool. virtual void LowerThreadPoolIOPriority(Priority /*pool*/ = LOW) {} + // Lower CPU priority for threads from the specified pool. + virtual Status LowerThreadPoolCPUPriority(Priority /*pool*/, + CpuPriority /*pri*/) { + return Status::NotSupported( + "Env::LowerThreadPoolCPUPriority(Priority, CpuPriority) not supported"); + } + // Lower CPU priority for threads from the specified pool. virtual void LowerThreadPoolCPUPriority(Priority /*pool*/ = LOW) {} @@ -1355,14 +1369,18 @@ class EnvWrapper : public Env { return target_->IncBackgroundThreadsIfNeeded(num, pri); } - void LowerThreadPoolIOPriority(Priority pool = LOW) override { + void LowerThreadPoolIOPriority(Priority pool) override { target_->LowerThreadPoolIOPriority(pool); } - void LowerThreadPoolCPUPriority(Priority pool = LOW) override { + void LowerThreadPoolCPUPriority(Priority pool) override { target_->LowerThreadPoolCPUPriority(pool); } + Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override { + return target_->LowerThreadPoolCPUPriority(pool, pri); + } + std::string TimeToString(uint64_t time) override { return target_->TimeToString(time); } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 040892e1e..f07a9a702 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -51,13 +51,6 @@ class InternalKeyComparator; class WalFilter; class FileSystem; -enum class CpuPriority { - kIdle = 0, - kLow = 1, - kNormal = 2, - kHigh = 3, -}; - // DB contents are stored in a set of blocks, each of which holds a // sequence of key,value pairs. Each block may be compressed before // being stored in a file. The following enum describes which diff --git a/util/threadpool_imp.cc b/util/threadpool_imp.cc index 4c5d8594f..dcaf288aa 100644 --- a/util/threadpool_imp.cc +++ b/util/threadpool_imp.cc @@ -57,7 +57,7 @@ struct ThreadPoolImpl::Impl { void LowerIOPriority(); - void LowerCPUPriority(); + void LowerCPUPriority(CpuPriority pri); void WakeUpAllThreads() { bgsignal_.notify_all(); @@ -102,7 +102,7 @@ private: static void BGThreadWrapper(void* arg); bool low_io_priority_; - bool low_cpu_priority_; + CpuPriority cpu_priority_; Env::Priority priority_; Env* env_; @@ -126,12 +126,9 @@ private: std::vector bgthreads_; }; - -inline -ThreadPoolImpl::Impl::Impl() - : - low_io_priority_(false), - low_cpu_priority_(false), +inline ThreadPoolImpl::Impl::Impl() + : low_io_priority_(false), + cpu_priority_(CpuPriority::kNormal), priority_(Env::LOW), env_(nullptr), total_threads_limit_(0), @@ -141,8 +138,7 @@ ThreadPoolImpl::Impl::Impl() queue_(), mu_(), bgsignal_(), - bgthreads_() { -} + bgthreads_() {} inline ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); } @@ -178,15 +174,14 @@ void ThreadPoolImpl::Impl::LowerIOPriority() { low_io_priority_ = true; } -inline -void ThreadPoolImpl::Impl::LowerCPUPriority() { +inline void ThreadPoolImpl::Impl::LowerCPUPriority(CpuPriority pri) { std::lock_guard lock(mu_); - low_cpu_priority_ = true; + cpu_priority_ = pri; } void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { bool low_io_priority = false; - bool low_cpu_priority = false; + CpuPriority current_cpu_priority = CpuPriority::kNormal; while (true) { // Wait until there is an item that is ready to run @@ -227,16 +222,20 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { std::memory_order_relaxed); bool decrease_io_priority = (low_io_priority != low_io_priority_); - bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_); + CpuPriority cpu_priority = cpu_priority_; lock.unlock(); -#ifdef OS_LINUX - if (decrease_cpu_priority) { + if (cpu_priority < current_cpu_priority) { + TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::BeforeSetCpuPriority", + ¤t_cpu_priority); // 0 means current thread. - port::SetCpuPriority(0, CpuPriority::kLow); - low_cpu_priority = true; + port::SetCpuPriority(0, cpu_priority); + current_cpu_priority = cpu_priority; + TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::AfterSetCpuPriority", + ¤t_cpu_priority); } +#ifdef OS_LINUX if (decrease_io_priority) { #define IOPRIO_CLASS_SHIFT (13) #define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data) @@ -257,7 +256,6 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { } #else (void)decrease_io_priority; // avoid 'unused variable' error - (void)decrease_cpu_priority; #endif TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun", @@ -452,8 +450,8 @@ void ThreadPoolImpl::LowerIOPriority() { impl_->LowerIOPriority(); } -void ThreadPoolImpl::LowerCPUPriority() { - impl_->LowerCPUPriority(); +void ThreadPoolImpl::LowerCPUPriority(CpuPriority pri) { + impl_->LowerCPUPriority(pri); } void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) { diff --git a/util/threadpool_imp.h b/util/threadpool_imp.h index e3c12577b..0bf0824e4 100644 --- a/util/threadpool_imp.h +++ b/util/threadpool_imp.h @@ -51,7 +51,7 @@ class ThreadPoolImpl : public ThreadPool { // Make threads to run at a lower kernel CPU priority // Currently only has effect on Linux - void LowerCPUPriority(); + void LowerCPUPriority(CpuPriority pri); // Ensure there is at aleast num threads in the pool // but do not kill threads if there are more