New API for background work in single thread pool

Summary:
Previously users could set `max_background_flushes=0` to force rocksdb to use a single thread pool for both background flushes and compactions. That'll no longer be possible since I'm going to deprecate `max_background_flushes` and `max_background_compactions` in favor of a single option. This diff introduces a new way to force a single thread pool: when high-pri pool has zero threads, all background jobs will be submitted to low-pri pool.

Note the majority of the code change is adding `Env::GetBackgroundThreads()`, which is necessary to check whether the user has provided a zero-sized thread pool.
Closes https://github.com/facebook/rocksdb/pull/2204

Differential Revision: D4936256

Pulled By: ajkr

fbshipit-source-id: 929a07a0c0705f7766f5339cd013ff74e90d6e01
main
Andrew Kryczka 8 years ago committed by Facebook Github Bot
parent 9d0a07ed52
commit 6cc9aef162
  1. 3
      HISTORY.md
  2. 2
      db/compaction_job_stats_test.cc
  3. 1
      db/db_compaction_test.cc
  4. 37
      db/db_flush_test.cc
  5. 11
      db/db_impl_compaction_flush.cc
  6. 3
      db/db_properties_test.cc
  7. 6
      db/db_test.cc
  8. 5
      env/env_posix.cc
  9. 5
      hdfs/env_hdfs.h
  10. 4
      include/rocksdb/env.h
  11. 1
      include/rocksdb/threadpool.h
  12. 9
      port/win/env_win.cc
  13. 2
      port/win/env_win.h
  14. 12
      util/threadpool_imp.cc
  15. 2
      util/threadpool_imp.h

@ -1,5 +1,8 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## Unreleased
### Public API Change
* Scheduling flushes and compactions in the same thread pool is no longer supported by setting `max_background_flushes=0`. Instead, users can achieve this by configuring their high-pri thread pool to have zero threads.
### New Features ### New Features
* Change ticker/histogram statistics implementations to use core-local storage. This improves aggregation speed compared to our previous thread-local approach, particularly for applications with many threads. * Change ticker/histogram statistics implementations to use core-local storage. This improves aggregation speed compared to our previous thread-local approach, particularly for applications with many threads.

@ -656,7 +656,6 @@ TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
Options options; Options options;
options.listeners.emplace_back(stats_checker); options.listeners.emplace_back(stats_checker);
options.create_if_missing = true; options.create_if_missing = true;
options.max_background_flushes = 0;
// just enough setting to hold off auto-compaction. // just enough setting to hold off auto-compaction.
options.level0_file_num_compaction_trigger = kTestScale + 1; options.level0_file_num_compaction_trigger = kTestScale + 1;
options.num_levels = 3; options.num_levels = 3;
@ -878,7 +877,6 @@ TEST_P(CompactionJobStatsTest, DeletionStatsTest) {
Options options; Options options;
options.listeners.emplace_back(stats_checker); options.listeners.emplace_back(stats_checker);
options.create_if_missing = true; options.create_if_missing = true;
options.max_background_flushes = 0;
options.level0_file_num_compaction_trigger = kTestScale+1; options.level0_file_num_compaction_trigger = kTestScale+1;
options.num_levels = 3; options.num_levels = 3;
options.compression = kNoCompression; options.compression = kNoCompression;

@ -1977,7 +1977,6 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) {
if (iter == 0) { if (iter == 0) {
options = CurrentOptions(); options = CurrentOptions();
options.max_background_flushes = 0;
options.num_levels = 3; options.num_levels = 3;
options.create_if_missing = true; options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics(); options.statistics = rocksdb::CreateDBStatistics();

@ -91,6 +91,43 @@ TEST_F(DBFlushTest, SyncFail) {
Destroy(options); Destroy(options);
} }
TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
  // Verify setting an empty high-pri (flush) thread pool causes flushes to be
  // scheduled in the low-pri (compaction) thread pool.
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  options.memtable_factory.reset(new SpecialSkipListFactory(1));
  Reopen(options);
  env_->SetBackgroundThreads(0, Env::HIGH);

  // `tid` remembers the thread that executed the first flush. Every later
  // flush, and every compaction, is required to run on that same thread.
  std::thread::id tid;
  int num_flushes = 0, num_compactions = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkFlush", [&](void* /*arg*/) {
        if (tid == std::thread::id()) {
          tid = std::this_thread::get_id();
        } else {
          ASSERT_EQ(tid, std::this_thread::get_id());
        }
        ++num_flushes;
      });
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkCompaction", [&](void* /*arg*/) {
        ASSERT_EQ(tid, std::this_thread::get_id());
        ++num_compactions;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("key", "val"));
  for (int i = 0; i < 4; ++i) {
    ASSERT_OK(Put("key", "val"));
    dbfull()->TEST_WaitForFlushMemTable();
  }
  dbfull()->TEST_WaitForCompact();

  // The callbacks above capture this function's locals by reference. Turn
  // sync point processing off before running the final checks so that the
  // callbacks cannot fire with dangling references once this body returns,
  // including when one of the assertions below fails and returns early.
  SyncPoint::GetInstance()->DisableProcessing();

  ASSERT_EQ(4, num_flushes);
  ASSERT_EQ(1, num_compactions);
}
TEST_P(DBFlushDirectIOTest, DirectIO) { TEST_P(DBFlushDirectIOTest, DirectIO) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;

@ -983,8 +983,9 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
// DB is being deleted; no more background compactions // DB is being deleted; no more background compactions
return; return;
} }
bool is_flush_pool_empty =
while (unscheduled_flushes_ > 0 && env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ < immutable_db_options_.max_background_flushes) { bg_flush_scheduled_ < immutable_db_options_.max_background_flushes) {
unscheduled_flushes_--; unscheduled_flushes_--;
bg_flush_scheduled_++; bg_flush_scheduled_++;
@ -993,9 +994,9 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
auto bg_compactions_allowed = BGCompactionsAllowed(); auto bg_compactions_allowed = BGCompactionsAllowed();
// special case -- if max_background_flushes == 0, then schedule flush on a // special case -- if high-pri (flush) thread pool is empty, then schedule
// compaction thread // flushes in low-pri (compaction) thread pool.
if (immutable_db_options_.max_background_flushes == 0) { if (is_flush_pool_empty) {
while (unscheduled_flushes_ > 0 && while (unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ + bg_compaction_scheduled_ < bg_flush_scheduled_ + bg_compaction_scheduled_ <
bg_compactions_allowed) { bg_compactions_allowed) {

@ -940,7 +940,6 @@ TEST_F(DBPropertiesTest, EstimateCompressionRatio) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.compression_per_level = {kNoCompression, kSnappyCompression}; options.compression_per_level = {kNoCompression, kSnappyCompression};
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.max_background_flushes = 0;
options.num_levels = 2; options.num_levels = 2;
Reopen(options); Reopen(options);
@ -1068,7 +1067,6 @@ class CountingDeleteTabPropCollectorFactory
TEST_F(DBPropertiesTest, GetUserDefinedTableProperties) { TEST_F(DBPropertiesTest, GetUserDefinedTableProperties) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = (1 << 30); options.level0_file_num_compaction_trigger = (1 << 30);
options.max_background_flushes = 0;
options.table_properties_collector_factories.resize(1); options.table_properties_collector_factories.resize(1);
std::shared_ptr<CountingUserTblPropCollectorFactory> collector_factory = std::shared_ptr<CountingUserTblPropCollectorFactory> collector_factory =
std::make_shared<CountingUserTblPropCollectorFactory>(0); std::make_shared<CountingUserTblPropCollectorFactory>(0);
@ -1109,7 +1107,6 @@ TEST_F(DBPropertiesTest, GetUserDefinedTableProperties) {
TEST_F(DBPropertiesTest, UserDefinedTablePropertiesContext) { TEST_F(DBPropertiesTest, UserDefinedTablePropertiesContext) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 3; options.level0_file_num_compaction_trigger = 3;
options.max_background_flushes = 0;
options.table_properties_collector_factories.resize(1); options.table_properties_collector_factories.resize(1);
std::shared_ptr<CountingUserTblPropCollectorFactory> collector_factory = std::shared_ptr<CountingUserTblPropCollectorFactory> collector_factory =
std::make_shared<CountingUserTblPropCollectorFactory>(1); std::make_shared<CountingUserTblPropCollectorFactory>(1);

@ -1441,7 +1441,6 @@ TEST_F(DBTest, UnremovableSingleDelete) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
TEST_F(DBTest, DeletionMarkers1) { TEST_F(DBTest, DeletionMarkers1) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_background_flushes = 0;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
Put(1, "foo", "v1"); Put(1, "foo", "v1");
ASSERT_OK(Flush(1)); ASSERT_OK(Flush(1));
@ -3075,9 +3074,9 @@ TEST_F(DBTest, DynamicMemtableOptions) {
Env::Priority::LOW); Env::Priority::LOW);
// Start from scratch and disable compaction/flush. Flush can only happen // Start from scratch and disable compaction/flush. Flush can only happen
// during compaction but trigger is pretty high // during compaction but trigger is pretty high
options.max_background_flushes = 0;
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
DestroyAndReopen(options); DestroyAndReopen(options);
env_->SetBackgroundThreads(0, Env::HIGH);
// Put until writes are stopped, bounded by 256 puts. We should see stop at // Put until writes are stopped, bounded by 256 puts. We should see stop at
// ~128KB // ~128KB
@ -3354,7 +3353,6 @@ TEST_P(DBTestWithParam, ThreadStatusSingleCompaction) {
TEST_P(DBTestWithParam, PreShutdownManualCompaction) { TEST_P(DBTestWithParam, PreShutdownManualCompaction) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_background_flushes = 0;
options.max_subcompactions = max_subcompactions_; options.max_subcompactions = max_subcompactions_;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
@ -3393,7 +3391,6 @@ TEST_P(DBTestWithParam, PreShutdownManualCompaction) {
if (iter == 0) { if (iter == 0) {
options = CurrentOptions(); options = CurrentOptions();
options.max_background_flushes = 0;
options.num_levels = 3; options.num_levels = 3;
options.create_if_missing = true; options.create_if_missing = true;
DestroyAndReopen(options); DestroyAndReopen(options);
@ -3404,7 +3401,6 @@ TEST_P(DBTestWithParam, PreShutdownManualCompaction) {
TEST_F(DBTest, PreShutdownFlush) { TEST_F(DBTest, PreShutdownFlush) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_background_flushes = 0;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put(1, "key", "value")); ASSERT_OK(Put(1, "key", "value"));
CancelAllBackgroundWork(db_); CancelAllBackgroundWork(db_);

5
env/env_posix.cc vendored

@ -763,6 +763,11 @@ class PosixEnv : public Env {
thread_pools_[pri].SetBackgroundThreads(num); thread_pools_[pri].SetBackgroundThreads(num);
} }
// Report the thread count of the pool that serves priority `pri`.
virtual int GetBackgroundThreads(Priority pri) override {
  assert(Priority::LOW <= pri && pri <= Priority::HIGH);
  auto& pool = thread_pools_[pri];
  return pool.GetBackgroundThreads();
}
// Allow increasing the number of worker threads. // Allow increasing the number of worker threads.
virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
assert(pri >= Priority::LOW && pri <= Priority::HIGH); assert(pri >= Priority::LOW && pri <= Priority::HIGH);

@ -151,6 +151,10 @@ class HdfsEnv : public Env {
posixEnv->SetBackgroundThreads(number, pri); posixEnv->SetBackgroundThreads(number, pri);
} }
// Forward the query to the wrapped posix environment. Marked `override` so
// the compiler verifies the signature against Env::GetBackgroundThreads.
virtual int GetBackgroundThreads(Priority pri = LOW) override {
  return posixEnv->GetBackgroundThreads(pri);
}
virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override { virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
posixEnv->IncBackgroundThreadsIfNeeded(number, pri); posixEnv->IncBackgroundThreadsIfNeeded(number, pri);
} }
@ -358,6 +362,7 @@ class HdfsEnv : public Env {
} }
virtual void SetBackgroundThreads(int number, Priority pri = LOW) override {} virtual void SetBackgroundThreads(int number, Priority pri = LOW) override {}
// Stub implementation: always reports zero background threads.
virtual int GetBackgroundThreads(Priority pri = LOW) override {
  return 0;
}
virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override { virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
} }
virtual std::string TimeToString(uint64_t number) override { return ""; } virtual std::string TimeToString(uint64_t number) override { return ""; }

@ -364,6 +364,7 @@ class Env {
// for this environment. 'LOW' is the default pool. // for this environment. 'LOW' is the default pool.
// default number: 1 // default number: 1
virtual void SetBackgroundThreads(int number, Priority pri = LOW) = 0; virtual void SetBackgroundThreads(int number, Priority pri = LOW) = 0;
// Get the number of background worker threads of a specific thread pool
// for this environment. 'LOW' is the default pool.
virtual int GetBackgroundThreads(Priority pri = LOW) = 0;
// Enlarge number of background worker threads of a specific thread pool // Enlarge number of background worker threads of a specific thread pool
// for this environment if it is smaller than specified. 'LOW' is the default // for this environment if it is smaller than specified. 'LOW' is the default
@ -1027,6 +1028,9 @@ class EnvWrapper : public Env {
void SetBackgroundThreads(int num, Priority pri) override { void SetBackgroundThreads(int num, Priority pri) override {
return target_->SetBackgroundThreads(num, pri); return target_->SetBackgroundThreads(num, pri);
} }
// Delegate the thread-count query to the wrapped target environment.
int GetBackgroundThreads(Priority pri) override {
  const int num_threads = target_->GetBackgroundThreads(pri);
  return num_threads;
}
void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
return target_->IncBackgroundThreadsIfNeeded(num, pri); return target_->IncBackgroundThreadsIfNeeded(num, pri);

@ -31,6 +31,7 @@ class ThreadPool {
// Set the number of background threads that will be executing the // Set the number of background threads that will be executing the
// scheduled jobs. // scheduled jobs.
virtual void SetBackgroundThreads(int num) = 0; virtual void SetBackgroundThreads(int num) = 0;
// Get the number of background threads that will be executing the
// scheduled jobs.
virtual int GetBackgroundThreads() = 0;
// Get the number of jobs scheduled in the ThreadPool queue. // Get the number of jobs scheduled in the ThreadPool queue.
virtual unsigned int GetQueueLen() const = 0; virtual unsigned int GetQueueLen() const = 0;

@ -879,6 +879,11 @@ void WinEnvThreads::SetBackgroundThreads(int num, Env::Priority pri) {
thread_pools_[pri].SetBackgroundThreads(num); thread_pools_[pri].SetBackgroundThreads(num);
} }
// Report the thread count of the pool that serves priority `pri`.
int WinEnvThreads::GetBackgroundThreads(Env::Priority pri) {
  assert(Env::Priority::LOW <= pri && pri <= Env::Priority::HIGH);
  auto& pool = thread_pools_[pri];
  return pool.GetBackgroundThreads();
}
void WinEnvThreads::IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) { void WinEnvThreads::IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) {
assert(pri >= Env::Priority::LOW && pri <= Env::Priority::HIGH); assert(pri >= Env::Priority::LOW && pri <= Env::Priority::HIGH);
thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
@ -1056,6 +1061,10 @@ void WinEnv::SetBackgroundThreads(int num, Env::Priority pri) {
return winenv_threads_.SetBackgroundThreads(num, pri); return winenv_threads_.SetBackgroundThreads(num, pri);
} }
// Delegate the thread-count query to the WinEnvThreads member.
int WinEnv::GetBackgroundThreads(Env::Priority pri) {
  const int num_threads = winenv_threads_.GetBackgroundThreads(pri);
  return num_threads;
}
void WinEnv::IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) { void WinEnv::IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) {
return winenv_threads_.IncBackgroundThreadsIfNeeded(num, pri); return winenv_threads_.IncBackgroundThreadsIfNeeded(num, pri);
} }

@ -66,6 +66,7 @@ public:
// Allow increasing the number of worker threads. // Allow increasing the number of worker threads.
void SetBackgroundThreads(int num, Env::Priority pri); void SetBackgroundThreads(int num, Env::Priority pri);
int GetBackgroundThreads(Env::Priority pri);
void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri); void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri);
@ -276,6 +277,7 @@ public:
// Allow increasing the number of worker threads. // Allow increasing the number of worker threads.
void SetBackgroundThreads(int num, Env::Priority pri) override; void SetBackgroundThreads(int num, Env::Priority pri) override;
int GetBackgroundThreads(Env::Priority pri) override;
void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) override; void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) override;

@ -47,6 +47,7 @@ struct ThreadPoolImpl::Impl {
void JoinThreads(bool wait_for_jobs_to_complete); void JoinThreads(bool wait_for_jobs_to_complete);
void SetBackgroundThreadsInternal(int num, bool allow_reduce); void SetBackgroundThreadsInternal(int num, bool allow_reduce);
int GetBackgroundThreads();
unsigned int GetQueueLen() const { unsigned int GetQueueLen() const {
return queue_len_.load(std::memory_order_relaxed); return queue_len_.load(std::memory_order_relaxed);
@ -275,12 +276,17 @@ void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
} }
if (num > total_threads_limit_ || if (num > total_threads_limit_ ||
(num < total_threads_limit_ && allow_reduce)) { (num < total_threads_limit_ && allow_reduce)) {
total_threads_limit_ = std::max(1, num); total_threads_limit_ = std::max(0, num);
WakeUpAllThreads(); WakeUpAllThreads();
StartBGThreads(); StartBGThreads();
} }
} }
// Read the current thread limit while holding the pool mutex.
int ThreadPoolImpl::Impl::GetBackgroundThreads() {
  std::lock_guard<std::mutex> guard(mu_);
  const int limit = total_threads_limit_;
  return limit;
}
void ThreadPoolImpl::Impl::StartBGThreads() { void ThreadPoolImpl::Impl::StartBGThreads() {
// Start background thread if necessary // Start background thread if necessary
while ((int)bgthreads_.size() < total_threads_limit_) { while ((int)bgthreads_.size() < total_threads_limit_) {
@ -384,6 +390,10 @@ void ThreadPoolImpl::SetBackgroundThreads(int num) {
impl_->SetBackgroundThreadsInternal(num, true); impl_->SetBackgroundThreadsInternal(num, true);
} }
// Public entry point: hand the query over to the private implementation.
int ThreadPoolImpl::GetBackgroundThreads() {
  const int num_threads = impl_->GetBackgroundThreads();
  return num_threads;
}
unsigned int ThreadPoolImpl::GetQueueLen() const { unsigned int ThreadPoolImpl::GetQueueLen() const {
return impl_->GetQueueLen(); return impl_->GetQueueLen();
} }

@ -38,6 +38,8 @@ class ThreadPoolImpl : public ThreadPool {
// Set the number of background threads that will be executing the // Set the number of background threads that will be executing the
// scheduled jobs. // scheduled jobs.
void SetBackgroundThreads(int num) override; void SetBackgroundThreads(int num) override;
int GetBackgroundThreads() override;
// Get the number of jobs scheduled in the ThreadPool queue. // Get the number of jobs scheduled in the ThreadPool queue.
unsigned int GetQueueLen() const override; unsigned int GetQueueLen() const override;

Loading…
Cancel
Save