From 6cc9aef1623a46f0631c91c704dd9f406f083854 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Tue, 23 May 2017 11:04:25 -0700 Subject: [PATCH] New API for background work in single thread pool Summary: Previously users could set `max_background_flushes=0` to force rocksdb to use a single thread pool for both background flushes and compactions. That'll no longer be possible since I'm going to deprecate `max_background_flushes` and `max_background_compactions` in favor of a single option. This diff introduces a new way to force a single thread pool: when high-pri pool has zero threads, all background jobs will be submitted to low-pri pool. Note the majority of the code change is adding `Env::GetBackgroundThreads()`, which is necessary to check whether the user has provided a zero-sized thread pool. Closes https://github.com/facebook/rocksdb/pull/2204 Differential Revision: D4936256 Pulled By: ajkr fbshipit-source-id: 929a07a0c0705f7766f5339cd013ff74e90d6e01 --- HISTORY.md | 3 +++ db/compaction_job_stats_test.cc | 2 -- db/db_compaction_test.cc | 1 - db/db_flush_test.cc | 37 +++++++++++++++++++++++++++++++++ db/db_impl_compaction_flush.cc | 11 +++++----- db/db_properties_test.cc | 3 --- db/db_test.cc | 6 +----- env/env_posix.cc | 5 +++++ hdfs/env_hdfs.h | 5 +++++ include/rocksdb/env.h | 4 ++++ include/rocksdb/threadpool.h | 1 + port/win/env_win.cc | 9 ++++++++ port/win/env_win.h | 2 ++ util/threadpool_imp.cc | 12 ++++++++++- util/threadpool_imp.h | 2 ++ 15 files changed, 86 insertions(+), 17 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 906c5e347..bec79ed71 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,8 @@ # Rocksdb Change Log ## Unreleased +### Public API Change +*Scheduling flushes and compactions in the same thread pool is no longer supported by setting `max_background_flushes=0`. Instead, users can achieve this by configuring their high-pri thread pool to have zero threads. + ### New Features * Change ticker/histogram statistics implementations to use core-local storage. This improves aggregation speed compared to our previous thread-local approach, particularly for applications with many threads. diff --git a/db/compaction_job_stats_test.cc b/db/compaction_job_stats_test.cc index 2d3149c93..dc2fc0fef 100644 --- a/db/compaction_job_stats_test.cc +++ b/db/compaction_job_stats_test.cc @@ -656,7 +656,6 @@ TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) { Options options; options.listeners.emplace_back(stats_checker); options.create_if_missing = true; - options.max_background_flushes = 0; // just enough setting to hold off auto-compaction. options.level0_file_num_compaction_trigger = kTestScale + 1; options.num_levels = 3; @@ -878,7 +877,6 @@ TEST_P(CompactionJobStatsTest, DeletionStatsTest) { Options options; options.listeners.emplace_back(stats_checker); options.create_if_missing = true; - options.max_background_flushes = 0; options.level0_file_num_compaction_trigger = kTestScale+1; options.num_levels = 3; options.compression = kNoCompression; diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 4ec901bf7..4afd8f167 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -1977,7 +1977,6 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) { if (iter == 0) { options = CurrentOptions(); - options.max_background_flushes = 0; options.num_levels = 3; options.create_if_missing = true; options.statistics = rocksdb::CreateDBStatistics(); diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 31b5a3944..1cd26b59a 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -91,6 +91,43 @@ TEST_F(DBFlushTest, SyncFail) { Destroy(options); } +TEST_F(DBFlushTest, FlushInLowPriThreadPool) { + // Verify setting an empty high-pri (flush) thread pool causes flushes to be + // scheduled in the low-pri (compaction) thread pool. + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 4; + options.memtable_factory.reset(new SpecialSkipListFactory(1)); + Reopen(options); + env_->SetBackgroundThreads(0, Env::HIGH); + + std::thread::id tid; + int num_flushes = 0, num_compactions = 0; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BGWorkFlush", [&](void* arg) { + if (tid == std::thread::id()) { + tid = std::this_thread::get_id(); + } else { + ASSERT_EQ(tid, std::this_thread::get_id()); + } + ++num_flushes; + }); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BGWorkCompaction", [&](void* arg) { + ASSERT_EQ(tid, std::this_thread::get_id()); + ++num_compactions; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("key", "val")); + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Put("key", "val")); + dbfull()->TEST_WaitForFlushMemTable(); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(4, num_flushes); + ASSERT_EQ(1, num_compactions); +} + TEST_P(DBFlushDirectIOTest, DirectIO) { Options options; options.create_if_missing = true; diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 08b7e9fcb..b57292bcb 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -983,8 +983,9 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { // DB is being deleted; no more background compactions return; } - - while (unscheduled_flushes_ > 0 && + bool is_flush_pool_empty = + env_->GetBackgroundThreads(Env::Priority::HIGH) == 0; + while (!is_flush_pool_empty && unscheduled_flushes_ > 0 && bg_flush_scheduled_ < immutable_db_options_.max_background_flushes) { unscheduled_flushes_--; bg_flush_scheduled_++; @@ -993,9 +994,9 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { auto bg_compactions_allowed = BGCompactionsAllowed(); - // special case -- if max_background_flushes == 0, then schedule flush on a - // compaction thread - if (immutable_db_options_.max_background_flushes == 0) { + // special case -- if high-pri (flush) thread pool is empty, then schedule + // flushes in low-pri (compaction) thread pool. + if (is_flush_pool_empty) { while (unscheduled_flushes_ > 0 && bg_flush_scheduled_ + bg_compaction_scheduled_ < bg_compactions_allowed) { diff --git a/db/db_properties_test.cc b/db/db_properties_test.cc index 41a4db82f..210ae1508 100644 --- a/db/db_properties_test.cc +++ b/db/db_properties_test.cc @@ -940,7 +940,6 @@ TEST_F(DBPropertiesTest, EstimateCompressionRatio) { Options options = CurrentOptions(); options.compression_per_level = {kNoCompression, kSnappyCompression}; options.disable_auto_compactions = true; - options.max_background_flushes = 0; options.num_levels = 2; Reopen(options); @@ -1068,7 +1067,6 @@ class CountingDeleteTabPropCollectorFactory TEST_F(DBPropertiesTest, GetUserDefinedTableProperties) { Options options = CurrentOptions(); options.level0_file_num_compaction_trigger = (1 << 30); - options.max_background_flushes = 0; options.table_properties_collector_factories.resize(1); std::shared_ptr collector_factory = std::make_shared(0); @@ -1109,7 +1107,6 @@ TEST_F(DBPropertiesTest, GetUserDefinedTableProperties) { TEST_F(DBPropertiesTest, UserDefinedTablePropertiesContext) { Options options = CurrentOptions(); options.level0_file_num_compaction_trigger = 3; - options.max_background_flushes = 0; options.table_properties_collector_factories.resize(1); std::shared_ptr collector_factory = std::make_shared(1); diff --git a/db/db_test.cc b/db/db_test.cc index 723ba01ab..756c01c2a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1441,7 +1441,6 @@ TEST_F(DBTest, UnremovableSingleDelete) { #ifndef ROCKSDB_LITE TEST_F(DBTest, DeletionMarkers1) { Options options = CurrentOptions(); - options.max_background_flushes = 0; CreateAndReopenWithCF({"pikachu"}, options); Put(1, "foo", "v1"); ASSERT_OK(Flush(1)); @@ -3075,9 +3074,9 @@ TEST_F(DBTest, DynamicMemtableOptions) { Env::Priority::LOW); // Start from scratch and disable compaction/flush. Flush can only happen // during compaction but trigger is pretty high - options.max_background_flushes = 0; options.disable_auto_compactions = true; DestroyAndReopen(options); + env_->SetBackgroundThreads(0, Env::HIGH); // Put until writes are stopped, bounded by 256 puts. We should see stop at // ~128KB @@ -3354,7 +3353,6 @@ TEST_P(DBTestWithParam, ThreadStatusSingleCompaction) { TEST_P(DBTestWithParam, PreShutdownManualCompaction) { Options options = CurrentOptions(); - options.max_background_flushes = 0; options.max_subcompactions = max_subcompactions_; CreateAndReopenWithCF({"pikachu"}, options); @@ -3393,7 +3391,6 @@ TEST_P(DBTestWithParam, PreShutdownManualCompaction) { if (iter == 0) { options = CurrentOptions(); - options.max_background_flushes = 0; options.num_levels = 3; options.create_if_missing = true; DestroyAndReopen(options); @@ -3404,7 +3401,6 @@ TEST_P(DBTestWithParam, PreShutdownManualCompaction) { TEST_F(DBTest, PreShutdownFlush) { Options options = CurrentOptions(); - options.max_background_flushes = 0; CreateAndReopenWithCF({"pikachu"}, options); ASSERT_OK(Put(1, "key", "value")); CancelAllBackgroundWork(db_); diff --git a/env/env_posix.cc b/env/env_posix.cc index 7d726176a..bdd1d9018 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -763,6 +763,11 @@ class PosixEnv : public Env { thread_pools_[pri].SetBackgroundThreads(num); } + virtual int GetBackgroundThreads(Priority pri) override { + assert(pri >= Priority::LOW && pri <= Priority::HIGH); + return thread_pools_[pri].GetBackgroundThreads(); + } + // Allow increasing the number of worker threads. virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { assert(pri >= Priority::LOW && pri <= Priority::HIGH); diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index be5fab7d6..41fa9881f 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -151,6 +151,10 @@ class HdfsEnv : public Env { posixEnv->SetBackgroundThreads(number, pri); } + virtual int GetBackgroundThreads(Priority pri = LOW) { + return posixEnv->GetBackgroundThreads(pri); + } + virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override { posixEnv->IncBackgroundThreadsIfNeeded(number, pri); } @@ -358,6 +362,7 @@ class HdfsEnv : public Env { } virtual void SetBackgroundThreads(int number, Priority pri = LOW) override {} + virtual int GetBackgroundThreads(Priority pri = LOW) override { return 0; } virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override { } virtual std::string TimeToString(uint64_t number) override { return ""; } diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 670a7e546..e9f5bca4b 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -364,6 +364,7 @@ class Env { // for this environment. 'LOW' is the default pool. // default number: 1 virtual void SetBackgroundThreads(int number, Priority pri = LOW) = 0; + virtual int GetBackgroundThreads(Priority pri = LOW) = 0; // Enlarge number of background worker threads of a specific thread pool // for this environment if it is smaller than specified. 'LOW' is the default @@ -1027,6 +1028,9 @@ class EnvWrapper : public Env { void SetBackgroundThreads(int num, Priority pri) override { return target_->SetBackgroundThreads(num, pri); } + int GetBackgroundThreads(Priority pri) override { + return target_->GetBackgroundThreads(pri); + } void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { return target_->IncBackgroundThreadsIfNeeded(num, pri); diff --git a/include/rocksdb/threadpool.h b/include/rocksdb/threadpool.h index b8d3bac06..3711276d8 100644 --- a/include/rocksdb/threadpool.h +++ b/include/rocksdb/threadpool.h @@ -31,6 +31,7 @@ class ThreadPool { // Set the number of background threads that will be executing the // scheduled jobs. virtual void SetBackgroundThreads(int num) = 0; + virtual int GetBackgroundThreads() = 0; // Get the number of jobs scheduled in the ThreadPool queue. virtual unsigned int GetQueueLen() const = 0; diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 3b56545b7..1b97f756a 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -879,6 +879,11 @@ void WinEnvThreads::SetBackgroundThreads(int num, Env::Priority pri) { thread_pools_[pri].SetBackgroundThreads(num); } +int WinEnvThreads::GetBackgroundThreads(Env::Priority pri) { + assert(pri >= Env::Priority::LOW && pri <= Env::Priority::HIGH); + return thread_pools_[pri].GetBackgroundThreads(); +} + void WinEnvThreads::IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) { assert(pri >= Env::Priority::LOW && pri <= Env::Priority::HIGH); thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); @@ -1056,6 +1061,10 @@ void WinEnv::SetBackgroundThreads(int num, Env::Priority pri) { return winenv_threads_.SetBackgroundThreads(num, pri); } +int WinEnv::GetBackgroundThreads(Env::Priority pri) { + return winenv_threads_.GetBackgroundThreads(pri); +} + void WinEnv::IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) { return winenv_threads_.IncBackgroundThreadsIfNeeded(num, pri); } diff --git a/port/win/env_win.h b/port/win/env_win.h index 75164c231..e2a93d5ec 100644 --- a/port/win/env_win.h +++ b/port/win/env_win.h @@ -66,6 +66,7 @@ public: // Allow increasing the number of worker threads. void SetBackgroundThreads(int num, Env::Priority pri); + int GetBackgroundThreads(Env::Priority pri); void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri); @@ -276,6 +277,7 @@ public: // Allow increasing the number of worker threads. void SetBackgroundThreads(int num, Env::Priority pri) override; + int GetBackgroundThreads(Env::Priority pri) override; void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) override; diff --git a/util/threadpool_imp.cc b/util/threadpool_imp.cc index 4c6f81126..c7603ef1d 100644 --- a/util/threadpool_imp.cc +++ b/util/threadpool_imp.cc @@ -47,6 +47,7 @@ struct ThreadPoolImpl::Impl { void JoinThreads(bool wait_for_jobs_to_complete); void SetBackgroundThreadsInternal(int num, bool allow_reduce); + int GetBackgroundThreads(); unsigned int GetQueueLen() const { return queue_len_.load(std::memory_order_relaxed); @@ -275,12 +276,17 @@ void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num, } if (num > total_threads_limit_ || (num < total_threads_limit_ && allow_reduce)) { - total_threads_limit_ = std::max(1, num); + total_threads_limit_ = std::max(0, num); WakeUpAllThreads(); StartBGThreads(); } } +int ThreadPoolImpl::Impl::GetBackgroundThreads() { + std::unique_lock lock(mu_); + return total_threads_limit_; +} + void ThreadPoolImpl::Impl::StartBGThreads() { // Start background thread if necessary while ((int)bgthreads_.size() < total_threads_limit_) { @@ -384,6 +390,10 @@ void ThreadPoolImpl::SetBackgroundThreads(int num) { impl_->SetBackgroundThreadsInternal(num, true); } +int ThreadPoolImpl::GetBackgroundThreads() { + return impl_->GetBackgroundThreads(); +} + unsigned int ThreadPoolImpl::GetQueueLen() const { return impl_->GetQueueLen(); } diff --git a/util/threadpool_imp.h b/util/threadpool_imp.h index 3433e5fab..65199e786 100644 --- a/util/threadpool_imp.h +++ b/util/threadpool_imp.h @@ -38,6 +38,8 @@ class ThreadPoolImpl : public ThreadPool { // Set the number of background threads that will be executing the // scheduled jobs. void SetBackgroundThreads(int num) override; + int GetBackgroundThreads() override; + // Get the number of jobs scheduled in the ThreadPool queue. unsigned int GetQueueLen() const override;