Make it possible to lower CPU priority to a specific level in the thread pool (#6969)

Summary:
`Env::LowerThreadPoolCPUPriority` takes a new `CpuPriority` parameter so that a thread pool can be lowered to a specific priority such as `CpuPriority::kIdle`. Previously, the priority was always lowered to `CpuPriority::kLow`.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6969

Test Plan: unit test `EnvPosixTest::LowerThreadPoolCpuPriority` added to `env_test.cc`.

Reviewed By: siying

Differential Revision: D22011169

Pulled By: cheng-chang

fbshipit-source-id: 568878c24a924912e35cef00c552d4a63431cdf4
main
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent 15d9f28da5
commit f7613e2a9e
  1. 1
      HISTORY.md
  2. 8
      env/composite_env_wrapper.h
  3. 16
      env/env_posix.cc
  4. 73
      env/env_test.cc
  5. 22
      include/rocksdb/env.h
  6. 7
      include/rocksdb/options.h
  7. 42
      util/threadpool_imp.cc
  8. 2
      util/threadpool_imp.h

@ -23,6 +23,7 @@
* Disable delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode and num_levels = 1 in order to avoid a corruption bug.
* `pin_l0_filter_and_index_blocks_in_cache` no longer applies to L0 files larger than `1.5 * write_buffer_size` to give more predictable memory usage. Such L0 files may exist due to intra-L0 compaction, external file ingestion, or user dynamically changing `write_buffer_size` (note, however, that files that are already pinned will continue being pinned, even after such a dynamic change).
* In point-in-time wal recovery mode, fail database recovery in case of IOError while reading the WAL to avoid data loss.
* A new method `Env::LowerThreadPoolCPUPriority(Priority, CpuPriority)` is added to `Env` so that a thread pool's CPU priority can be lowered to a specific level such as `CpuPriority::kIdle`.
### New Features
* sst_dump to add a new --readahead_size argument. Users can specify read size when scanning the data. Sst_dump also tries to prefetch tail part of the SST files so usually some number of I/Os are saved there too.

@ -555,14 +555,18 @@ class CompositeEnvWrapper : public Env {
return env_target_->IncBackgroundThreadsIfNeeded(num, pri);
}
void LowerThreadPoolIOPriority(Priority pool = LOW) override {
void LowerThreadPoolIOPriority(Priority pool) override {
env_target_->LowerThreadPoolIOPriority(pool);
}
void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
void LowerThreadPoolCPUPriority(Priority pool) override {
env_target_->LowerThreadPoolCPUPriority(pool);
}
// Forwards the request to lower the CPU priority of `pool` to `pri` to the
// wrapped Env and returns that Env's Status unchanged.
Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override {
return env_target_->LowerThreadPoolCPUPriority(pool, pri);
}
std::string TimeToString(uint64_t time) override {
return env_target_->TimeToString(time);
}

16
env/env_posix.cc vendored

@ -329,7 +329,7 @@ class PosixEnv : public CompositeEnvWrapper {
thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
}
void LowerThreadPoolIOPriority(Priority pool = LOW) override {
void LowerThreadPoolIOPriority(Priority pool) override {
assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
#ifdef OS_LINUX
thread_pools_[pool].LowerIOPriority();
@ -338,13 +338,15 @@ class PosixEnv : public CompositeEnvWrapper {
#endif
}
void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
void LowerThreadPoolCPUPriority(Priority pool) override {
assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
#ifdef OS_LINUX
thread_pools_[pool].LowerCPUPriority();
#else
(void)pool;
#endif
thread_pools_[pool].LowerCPUPriority(CpuPriority::kLow);
}
// Asks the thread pool for `pool` to lower its CPU priority to `pri`.
// `pool` must lie in [Priority::BOTTOM, Priority::HIGH]; this is only
// enforced by the assert. Always returns Status::OK().
Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override {
assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
thread_pools_[pool].LowerCPUPriority(pri);
return Status::OK();
}
std::string TimeToString(uint64_t secondsSince1970) override {

73
env/env_test.cc vendored

@ -202,6 +202,79 @@ TEST_F(EnvPosixTest, DISABLED_FilePermission) {
}
}
}
// Checks Env::LowerThreadPoolCPUPriority(pool, pri): requests for the same or
// a higher priority must not trigger a priority change, requests for a lower
// priority must, and a change in one pool must not affect another pool.
TEST_F(EnvPosixTest, LowerThreadPoolCpuPriority) {
// Priorities reported by the sync-point callbacks just before and just after
// a background thread changes its CPU priority. Both start at kNormal, so
// they still read kNormal if the callbacks never fire.
std::atomic<CpuPriority> from_priority(CpuPriority::kNormal);
std::atomic<CpuPriority> to_priority(CpuPriority::kNormal);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"ThreadPoolImpl::BGThread::BeforeSetCpuPriority", [&](void* pri) {
from_priority.store(*reinterpret_cast<CpuPriority*>(pri));
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"ThreadPoolImpl::BGThread::AfterSetCpuPriority", [&](void* pri) {
to_priority.store(*reinterpret_cast<CpuPriority*>(pri));
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// One thread in each of the two pools exercised below.
env_->SetBackgroundThreads(1, Env::BOTTOM);
env_->SetBackgroundThreads(1, Env::HIGH);
// Schedules SetBool on `pool` and polls (up to kDelayMicros iterations,
// sleeping 1 microsecond each) until the flag is set. SetBool and
// kDelayMicros are defined outside this view; presumably SetBool sets the
// flag to true. The priority change is applied from the pool thread's own
// loop, so running a task gives that thread a chance to apply it.
auto RunTask = [&](Env::Priority pool) {
std::atomic<bool> called(false);
env_->Schedule(&SetBool, &called, pool);
for (int i = 0; i < kDelayMicros; i++) {
if (called.load()) {
break;
}
Env::Default()->SleepForMicroseconds(1);
}
ASSERT_TRUE(called.load());
};
{
// Same priority, no-op.
env_->LowerThreadPoolCPUPriority(Env::Priority::BOTTOM,
CpuPriority::kNormal);
RunTask(Env::Priority::BOTTOM);
ASSERT_EQ(from_priority, CpuPriority::kNormal);
ASSERT_EQ(to_priority, CpuPriority::kNormal);
}
{
// Higher priority, no-op.
env_->LowerThreadPoolCPUPriority(Env::Priority::BOTTOM, CpuPriority::kHigh);
RunTask(Env::Priority::BOTTOM);
ASSERT_EQ(from_priority, CpuPriority::kNormal);
ASSERT_EQ(to_priority, CpuPriority::kNormal);
}
{
// Lower priority from kNormal -> kLow.
env_->LowerThreadPoolCPUPriority(Env::Priority::BOTTOM, CpuPriority::kLow);
RunTask(Env::Priority::BOTTOM);
ASSERT_EQ(from_priority, CpuPriority::kNormal);
ASSERT_EQ(to_priority, CpuPriority::kLow);
}
{
// Lower priority from kLow -> kIdle.
env_->LowerThreadPoolCPUPriority(Env::Priority::BOTTOM, CpuPriority::kIdle);
RunTask(Env::Priority::BOTTOM);
ASSERT_EQ(from_priority, CpuPriority::kLow);
ASSERT_EQ(to_priority, CpuPriority::kIdle);
}
{
// Lower priority from kNormal -> kIdle for another pool.
// The HIGH pool's thread is expected to still be at kNormal even though
// the BOTTOM pool has already been lowered to kIdle.
env_->LowerThreadPoolCPUPriority(Env::Priority::HIGH, CpuPriority::kIdle);
RunTask(Env::Priority::HIGH);
ASSERT_EQ(from_priority, CpuPriority::kNormal);
ASSERT_EQ(to_priority, CpuPriority::kIdle);
}
// Remove the callbacks so they do not leak into other tests.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
#endif
TEST_F(EnvPosixTest, MemoryMappedFileBuffer) {

@ -61,6 +61,13 @@ class FileSystem;
const size_t kDefaultPageSize = 4 * 1024;
// CPU scheduling priority levels for threads. The numeric values increase
// from the lowest priority (kIdle) to the highest (kHigh), so two levels can
// be ordered with the usual comparison operators.
enum class CpuPriority {
kIdle = 0,
kLow = 1,
kNormal = 2,
kHigh = 3,
};
// Options while opening a file to read/write
struct EnvOptions {
// Construct with default Options
@ -474,6 +481,13 @@ class Env {
// Lower IO priority for threads from the specified pool.
virtual void LowerThreadPoolIOPriority(Priority /*pool*/ = LOW) {}
// Lower CPU priority for threads from the specified pool to the given
// CpuPriority level.
// The default implementation does nothing and returns Status::NotSupported;
// Env implementations that support this override it.
virtual Status LowerThreadPoolCPUPriority(Priority /*pool*/,
CpuPriority /*pri*/) {
return Status::NotSupported(
"Env::LowerThreadPoolCPUPriority(Priority, CpuPriority) not supported");
}
// Lower CPU priority for threads from the specified pool.
virtual void LowerThreadPoolCPUPriority(Priority /*pool*/ = LOW) {}
@ -1355,14 +1369,18 @@ class EnvWrapper : public Env {
return target_->IncBackgroundThreadsIfNeeded(num, pri);
}
void LowerThreadPoolIOPriority(Priority pool = LOW) override {
void LowerThreadPoolIOPriority(Priority pool) override {
target_->LowerThreadPoolIOPriority(pool);
}
void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
void LowerThreadPoolCPUPriority(Priority pool) override {
target_->LowerThreadPoolCPUPriority(pool);
}
// Forwards the request to lower the CPU priority of `pool` to `pri` to the
// wrapped target Env and returns its Status unchanged.
Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override {
return target_->LowerThreadPoolCPUPriority(pool, pri);
}
std::string TimeToString(uint64_t time) override {
return target_->TimeToString(time);
}

@ -51,13 +51,6 @@ class InternalKeyComparator;
class WalFilter;
class FileSystem;
enum class CpuPriority {
kIdle = 0,
kLow = 1,
kNormal = 2,
kHigh = 3,
};
// DB contents are stored in a set of blocks, each of which holds a
// sequence of key,value pairs. Each block may be compressed before
// being stored in a file. The following enum describes which

@ -57,7 +57,7 @@ struct ThreadPoolImpl::Impl {
void LowerIOPriority();
void LowerCPUPriority();
void LowerCPUPriority(CpuPriority pri);
void WakeUpAllThreads() {
bgsignal_.notify_all();
@ -102,7 +102,7 @@ private:
static void BGThreadWrapper(void* arg);
bool low_io_priority_;
bool low_cpu_priority_;
CpuPriority cpu_priority_;
Env::Priority priority_;
Env* env_;
@ -126,12 +126,9 @@ private:
std::vector<port::Thread> bgthreads_;
};
inline
ThreadPoolImpl::Impl::Impl()
:
low_io_priority_(false),
low_cpu_priority_(false),
inline ThreadPoolImpl::Impl::Impl()
: low_io_priority_(false),
cpu_priority_(CpuPriority::kNormal),
priority_(Env::LOW),
env_(nullptr),
total_threads_limit_(0),
@ -141,8 +138,7 @@ ThreadPoolImpl::Impl::Impl()
queue_(),
mu_(),
bgsignal_(),
bgthreads_() {
}
bgthreads_() {}
inline
ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }
@ -178,15 +174,14 @@ void ThreadPoolImpl::Impl::LowerIOPriority() {
low_io_priority_ = true;
}
inline
void ThreadPoolImpl::Impl::LowerCPUPriority() {
inline void ThreadPoolImpl::Impl::LowerCPUPriority(CpuPriority pri) {
std::lock_guard<std::mutex> lock(mu_);
low_cpu_priority_ = true;
cpu_priority_ = pri;
}
void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
bool low_io_priority = false;
bool low_cpu_priority = false;
CpuPriority current_cpu_priority = CpuPriority::kNormal;
while (true) {
// Wait until there is an item that is ready to run
@ -227,16 +222,20 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
std::memory_order_relaxed);
bool decrease_io_priority = (low_io_priority != low_io_priority_);
bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
CpuPriority cpu_priority = cpu_priority_;
lock.unlock();
#ifdef OS_LINUX
if (decrease_cpu_priority) {
if (cpu_priority < current_cpu_priority) {
TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::BeforeSetCpuPriority",
&current_cpu_priority);
// 0 means current thread.
port::SetCpuPriority(0, CpuPriority::kLow);
low_cpu_priority = true;
port::SetCpuPriority(0, cpu_priority);
current_cpu_priority = cpu_priority;
TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::AfterSetCpuPriority",
&current_cpu_priority);
}
#ifdef OS_LINUX
if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
@ -257,7 +256,6 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
}
#else
(void)decrease_io_priority; // avoid 'unused variable' error
(void)decrease_cpu_priority;
#endif
TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun",
@ -452,8 +450,8 @@ void ThreadPoolImpl::LowerIOPriority() {
impl_->LowerIOPriority();
}
void ThreadPoolImpl::LowerCPUPriority() {
impl_->LowerCPUPriority();
void ThreadPoolImpl::LowerCPUPriority(CpuPriority pri) {
impl_->LowerCPUPriority(pri);
}
void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {

@ -51,7 +51,7 @@ class ThreadPoolImpl : public ThreadPool {
// Make threads to run at a lower kernel CPU priority
// Currently only has effect on Linux
void LowerCPUPriority();
void LowerCPUPriority(CpuPriority pri);
// Ensure there are at least num threads in the pool,
// but do not kill threads if there are more

Loading…
Cancel
Save