From cc01985db09b3f8ebb2ba971aa505abd77fa6345 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Thu, 3 Aug 2017 15:36:28 -0700 Subject: [PATCH] Introduce bottom-pri thread pool for large universal compactions Summary: When we had a single thread pool for compactions, a thread could be busy for a long time (minutes) executing a compaction involving the bottom level. In multi-instance setups, the entire thread pool could be consumed by such bottom-level compactions. Then, top-level compactions (e.g., a few L0 files) would be blocked for a long time ("head-of-line blocking"). Such top-level compactions are critical to prevent compaction stalls as they can quickly reduce the number of L0 files / sorted runs. This diff introduces a bottom-priority thread pool for universal compactions that include the bottom level. This alleviates the head-of-line blocking situation for fast, top-level compactions. - Added the `Env::Priority::BOTTOM` thread pool. This feature is only enabled if the user explicitly configures it to have a positive number of threads. - Changed `ThreadPoolImpl`'s default thread limit from one to zero. This change is invisible to users as we call `IncBackgroundThreadsIfNeeded` on the low-pri/high-pri pools during `DB::Open` with values of at least one. It is necessary, though, for bottom-pri to start with zero threads so the feature is disabled by default. - Split `ManualCompaction` into two parts: `ManualCompactionState`, and `PrepickedCompaction`, which is used for any compaction that's picked outside of its execution thread, either manual or automatic. - Forward universal compactions involving the last level to the bottom pool (the worker thread's entry point is `BGWorkBottomCompaction`). - Track `bg_bottom_compaction_scheduled_` so we can wait for bottom-level compactions to finish. We don't count them against the background-jobs limits, so users of this feature get an extra compaction for free. Closes https://github.com/facebook/rocksdb/pull/2580 Differential Revision: D5422916 Pulled By: ajkr fbshipit-source-id: a74bd11f1ea4933df3739b16808bb21fcd512333
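Usage sketch (illustrative; not part of the original patch): the snippet below shows one way an application could enable the new pool. The DB path, thread count, and level count are assumptions for the example; `Env::SetBackgroundThreads`, `Env::Priority::BOTTOM`, `kCompactionStyleUniversal`, and `DB::Open` are the public APIs this patch and its tests exercise.

#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"

int main() {
  // The BOTTOM pool starts with zero threads, so the feature stays disabled
  // until the user explicitly sizes the pool.
  rocksdb::Env::Default()->SetBackgroundThreads(1, rocksdb::Env::Priority::BOTTOM);

  rocksdb::Options options;
  options.create_if_missing = true;
  // Forwarding only applies to universal compactions whose output is the last
  // level, so single-level universal setups never use the bottom pool.
  options.compaction_style = rocksdb::kCompactionStyleUniversal;
  options.num_levels = 3;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/bottom_pri_example", &db);
  assert(s.ok());
  // ... normal writes; full compactions now run on the bottom-pri thread
  // instead of occupying the low-pri pool ...
  delete db;
  return 0;
}

In db_bench, the same is exposed through the `--num_bottom_pri_threads` flag added by this patch (combined with `--compaction_style=1` for universal compaction).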
--- HISTORY.md | 1 + db/db_impl.cc | 11 ++- db/db_impl.h | 36 ++++++-- db/db_impl_compaction_flush.cc | 143 ++++++++++++++++++++--------- db/db_impl_debug.cc | 4 +- db/db_universal_compaction_test.cc | 97 +++++++++++++++++++ db/version_set.cc | 8 ++ db/version_set.h | 1 + env/env_posix.cc | 12 +-- env/env_test.cc | 14 +-- include/rocksdb/env.h | 2 +- memtable/inlineskiplist_test.cc | 1 + memtable/skiplist_test.cc | 1 + tools/db_bench_tool.cc | 6 ++ util/threadpool_imp.cc | 6 +- 15 files changed, 273 insertions(+), 70 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 53ff4b9cf..7c71fdd16 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ### New Features * Add Iterator::Refresh(), which allows users to update the iterator state so that they can avoid some initialization costs of recreating iterators. * Replace dynamic_cast<> (except unit test) so people can choose to build with RTTI off. With make, release mode is by default built with -fno-rtti and debug mode is built without it. Users can override it by setting USE_RTTI=0 or 1. +* Universal compactions including the bottom level can be executed in a dedicated thread pool. This alleviates head-of-line blocking in the compaction queue, which can cause write stalls, particularly in multi-instance use cases. Users can enable this feature via `Env::SetBackgroundThreads(N, Env::Priority::BOTTOM)`, where `N > 0`. ### Bug Fixes * Fix wrong latencies in `rocksdb.db.get.micros`, `rocksdb.db.write.micros`, and `rocksdb.sst.read.micros`. diff --git a/db/db_impl.cc b/db/db_impl.cc index bfe38302f..86bb4a433 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -168,6 +168,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) last_batch_group_size_(0), unscheduled_flushes_(0), unscheduled_compactions_(0), + bg_bottom_compaction_scheduled_(0), bg_compaction_scheduled_(0), num_running_compactions_(0), bg_flush_scheduled_(0), @@ -242,7 +243,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { return; } // Wait for background work to finish - while (bg_compaction_scheduled_ || bg_flush_scheduled_) { + while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || + bg_flush_scheduled_) { bg_cv_.Wait(); } } @@ -252,15 +254,18 @@ DBImpl::~DBImpl() { // marker. After this we do a variant of the waiting and unschedule work // (to consider: moving all the waiting into CancelAllBackgroundWork(true)) CancelAllBackgroundWork(false); + int bottom_compactions_unscheduled = + env_->UnSchedule(this, Env::Priority::BOTTOM); int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW); int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH); mutex_.Lock(); + bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled; bg_compaction_scheduled_ -= compactions_unscheduled; bg_flush_scheduled_ -= flushes_unscheduled; // Wait for background work to finish - while (bg_compaction_scheduled_ || bg_flush_scheduled_ || - bg_purge_scheduled_) { + while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || + bg_flush_scheduled_ || bg_purge_scheduled_) { TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob"); bg_cv_.Wait(); } diff --git a/db/db_impl.h b/db/db_impl.h index 3284048a6..d89ea50ca 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -658,6 +658,7 @@ class DBImpl : public DB { } }; + struct PrepickedCompaction; struct PurgeFileInfo; // Recover the descriptor from persistent storage. May do a significant @@ -799,14 +800,19 @@ class DBImpl : public DB { void SchedulePendingPurge(std::string fname, FileType type, uint64_t number, uint32_t path_id, int job_id); static void BGWorkCompaction(void* arg); + // Runs a pre-chosen universal compaction involving the bottom level in a + // separate, bottom-pri thread pool.
+ static void BGWorkBottomCompaction(void* arg); static void BGWorkFlush(void* db); static void BGWorkPurge(void* arg); static void UnscheduleCallback(void* arg); - void BackgroundCallCompaction(void* arg); + void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, + Env::Priority bg_thread_pri); void BackgroundCallFlush(); void BackgroundCallPurge(); Status BackgroundCompaction(bool* madeProgress, JobContext* job_context, - LogBuffer* log_buffer, void* m = 0); + LogBuffer* log_buffer, + PrepickedCompaction* prepicked_compaction); Status BackgroundFlush(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer); @@ -1059,6 +1065,10 @@ class DBImpl : public DB { int unscheduled_flushes_; int unscheduled_compactions_; + // count how many background compactions are running or have been scheduled in + // the BOTTOM pool + int bg_bottom_compaction_scheduled_; + // count how many background compactions are running or have been scheduled int bg_compaction_scheduled_; @@ -1075,7 +1085,7 @@ class DBImpl : public DB { int bg_purge_scheduled_; // Information for a manual compaction - struct ManualCompaction { + struct ManualCompactionState { ColumnFamilyData* cfd; int input_level; int output_level; @@ -1091,13 +1101,21 @@ class DBImpl : public DB { InternalKey* manual_end; // how far we are compacting InternalKey tmp_storage; // Used to keep track of compaction progress InternalKey tmp_storage1; // Used to keep track of compaction progress + }; + struct PrepickedCompaction { + // background compaction takes ownership of `compaction`. Compaction* compaction; + // caller retains ownership of `manual_compaction_state` as it is reused + // across background compactions. + ManualCompactionState* manual_compaction_state; // nullptr if non-manual }; - std::deque<ManualCompaction*> manual_compaction_dequeue_; + std::deque<ManualCompactionState*> manual_compaction_dequeue_; struct CompactionArg { + // caller retains ownership of `db`. DBImpl* db; - ManualCompaction* m; + // background compaction takes ownership of `prepicked_compaction`. + PrepickedCompaction* prepicked_compaction; }; // Have we encountered a background error in paranoid mode?
@@ -1231,11 +1249,11 @@ class DBImpl : public DB { bool HasPendingManualCompaction(); bool HasExclusiveManualCompaction(); - void AddManualCompaction(ManualCompaction* m); - void RemoveManualCompaction(ManualCompaction* m); - bool ShouldntRunManualCompaction(ManualCompaction* m); + void AddManualCompaction(ManualCompactionState* m); + void RemoveManualCompaction(ManualCompactionState* m); + bool ShouldntRunManualCompaction(ManualCompactionState* m); bool HaveManualCompaction(ColumnFamilyData* cfd); - bool MCOverlap(ManualCompaction* m, ManualCompaction* m1); + bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1); size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 68d283123..3e686fe70 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -612,7 +612,8 @@ Status DBImpl::CompactFilesImpl( Status DBImpl::PauseBackgroundWork() { InstrumentedMutexLock guard_lock(&mutex_); bg_compaction_paused_++; - while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_ > 0) { + while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 || + bg_flush_scheduled_ > 0) { bg_cv_.Wait(); } bg_work_paused_++; @@ -808,7 +809,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, bool scheduled = false; bool manual_conflict = false; - ManualCompaction manual; + ManualCompactionState manual; manual.cfd = cfd; manual.input_level = input_level; manual.output_level = output_level; @@ -858,7 +859,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, AddManualCompaction(&manual); TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_); if (exclusive) { - while (bg_compaction_scheduled_ > 0) { + while (bg_bottom_compaction_scheduled_ > 0 || + bg_compaction_scheduled_ > 0) { TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled"); ROCKS_LOG_INFO( immutable_db_options_.info_log, @@ -878,14 +880,14 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, while (!manual.done) { assert(HasPendingManualCompaction()); manual_conflict = false; + Compaction* compaction; if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) || scheduled || - ((manual.manual_end = &manual.tmp_storage1)&&( - (manual.compaction = manual.cfd->CompactRange( - *manual.cfd->GetLatestMutableCFOptions(), manual.input_level, - manual.output_level, manual.output_path_id, manual.begin, - manual.end, &manual.manual_end, &manual_conflict)) == - nullptr) && + ((manual.manual_end = &manual.tmp_storage1) && + ((compaction = manual.cfd->CompactRange( + *manual.cfd->GetLatestMutableCFOptions(), manual.input_level, + manual.output_level, manual.output_path_id, manual.begin, + manual.end, &manual.manual_end, &manual_conflict)) == nullptr) && manual_conflict)) { // exclusive manual compactions should not see a conflict during // CompactRange @@ -898,14 +900,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, manual.incomplete = false; } } else if (!scheduled) { - if (manual.compaction == nullptr) { + if (compaction == nullptr) { manual.done = true; bg_cv_.SignalAll(); continue; } ca = new CompactionArg; ca->db = this; - ca->m = &manual; + ca->prepicked_compaction = new PrepickedCompaction; + ca->prepicked_compaction->manual_compaction_state = &manual; + ca->prepicked_compaction->compaction = compaction; manual.incomplete = false; bg_compaction_scheduled_++; 
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, @@ -1047,7 +1051,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { unscheduled_compactions_ > 0) { CompactionArg* ca = new CompactionArg; ca->db = this; - ca->m = nullptr; + ca->prepicked_compaction = nullptr; bg_compaction_scheduled_++; unscheduled_compactions_--; env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, @@ -1152,7 +1156,23 @@ void DBImpl::BGWorkCompaction(void* arg) { CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg)); delete reinterpret_cast<CompactionArg*>(arg); IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW); TEST_SYNC_POINT("DBImpl::BGWorkCompaction"); - reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(ca.m); + auto prepicked_compaction = + static_cast<PrepickedCompaction*>(ca.prepicked_compaction); + reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction( + prepicked_compaction, Env::Priority::LOW); + delete prepicked_compaction; +} + +void DBImpl::BGWorkBottomCompaction(void* arg) { + CompactionArg ca = *(static_cast<CompactionArg*>(arg)); + delete static_cast<CompactionArg*>(arg); + IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM); + TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction"); + auto* prepicked_compaction = ca.prepicked_compaction; + assert(prepicked_compaction && prepicked_compaction->compaction && + !prepicked_compaction->manual_compaction_state); + ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM); + delete prepicked_compaction; } void DBImpl::BGWorkPurge(void* db) { @@ -1165,8 +1185,11 @@ void DBImpl::BGWorkPurge(void* db) { void DBImpl::UnscheduleCallback(void* arg) { CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg)); delete reinterpret_cast<CompactionArg*>(arg); - if ((ca.m != nullptr) && (ca.m->compaction != nullptr)) { - delete ca.m->compaction; + if (ca.prepicked_compaction != nullptr) { + if (ca.prepicked_compaction->compaction != nullptr) { + delete ca.prepicked_compaction->compaction; + } + delete ca.prepicked_compaction; } TEST_SYNC_POINT("DBImpl::UnscheduleCallback"); } @@ -1293,9 +1316,9 @@ void DBImpl::BackgroundCallFlush() { } } -void DBImpl::BackgroundCallCompaction(void* arg) { +void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, + Env::Priority bg_thread_pri) { bool made_progress = false; - ManualCompaction* m = reinterpret_cast<ManualCompaction*>(arg); JobContext job_context(next_job_id_.fetch_add(1), true); TEST_SYNC_POINT("BackgroundCallCompaction:0"); MaybeDumpStats(); @@ -1313,9 +1336,11 @@ auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); - assert(bg_compaction_scheduled_); - Status s = - BackgroundCompaction(&made_progress, &job_context, &log_buffer, m); + assert((bg_thread_pri == Env::Priority::BOTTOM && + bg_bottom_compaction_scheduled_) || + (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_)); + Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer, + prepicked_compaction); TEST_SYNC_POINT("BackgroundCallCompaction:1"); if (!s.ok() && !s.IsShutdownInProgress()) { // Wait a little bit before retrying background compaction in @@ -1361,17 +1386,24 @@ assert(num_running_compactions_ > 0); num_running_compactions_--; - bg_compaction_scheduled_--; + if (bg_thread_pri == Env::Priority::LOW) { + bg_compaction_scheduled_--; + } else { + assert(bg_thread_pri == Env::Priority::BOTTOM); + bg_bottom_compaction_scheduled_--; + } versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); // See if there's more work to be done MaybeScheduleFlushOrCompaction(); - if (made_progress ||
bg_compaction_scheduled_ == 0 || + if (made_progress || + (bg_compaction_scheduled_ == 0 && + bg_bottom_compaction_scheduled_ == 0) || HasPendingManualCompaction()) { // signal if // * made_progress -- need to wakeup DelayWrite - // * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl + // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl // * HasPendingManualCompaction -- need to wakeup RunManualCompaction // If none of this is true, there is no need to signal since nobody is // waiting for it @@ -1386,14 +1418,23 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, JobContext* job_context, - LogBuffer* log_buffer, void* arg) { - ManualCompaction* manual_compaction = - reinterpret_cast<ManualCompaction*>(arg); + LogBuffer* log_buffer, + PrepickedCompaction* prepicked_compaction) { + ManualCompactionState* manual_compaction = + prepicked_compaction == nullptr + ? nullptr + : prepicked_compaction->manual_compaction_state; *made_progress = false; mutex_.AssertHeld(); TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start"); bool is_manual = (manual_compaction != nullptr); + unique_ptr<Compaction> c; + if (prepicked_compaction != nullptr && + prepicked_compaction->compaction != nullptr) { + c.reset(prepicked_compaction->compaction); + } + bool is_prepicked = is_manual || c; // (manual_compaction->in_progress == false); bool trivial_move_disallowed = @@ -1410,7 +1451,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, manual_compaction->status = status; manual_compaction->done = true; manual_compaction->in_progress = false; - delete manual_compaction->compaction; manual_compaction = nullptr; } return status; @@ -1421,13 +1461,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, manual_compaction->in_progress = true; } - unique_ptr<Compaction> c; // InternalKey manual_end_storage; // InternalKey* manual_end = &manual_end_storage; if (is_manual) { - ManualCompaction* m = manual_compaction; + ManualCompactionState* m = manual_compaction; assert(m->in_progress); - c.reset(std::move(m->compaction)); if (!c) { m->done = true; m->manual_end = nullptr; @@ -1449,7 +1487,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ? "(end)" : m->manual_end->DebugString().c_str())); } - } else if (!compaction_queue_.empty()) { + } else if (!is_prepicked && !compaction_queue_.empty()) { if (HaveManualCompaction(compaction_queue_.front())) { // Can't compact right now, but try again later TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict"); @@ -1601,6 +1639,28 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // Clear Instrument ThreadStatusUtil::ResetThreadStatus(); + } else if (c->column_family_data()->ioptions()->compaction_style == + kCompactionStyleUniversal && + !is_prepicked && c->output_level() > 0 && + c->output_level() == + c->column_family_data() + ->current() + ->storage_info() + ->MaxOutputLevel( + immutable_db_options_.allow_ingest_behind) && + env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) { + // Forward universal compactions involving the last level to the bottom + // pool if it exists, such that long-running compactions can't block + // short-lived ones, like L0->L0s.
+ TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool"); + CompactionArg* ca = new CompactionArg; + ca->db = this; + ca->prepicked_compaction = new PrepickedCompaction; + ca->prepicked_compaction->compaction = c.release(); + ca->prepicked_compaction->manual_compaction_state = nullptr; + ++bg_bottom_compaction_scheduled_; + env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM, + this, &DBImpl::UnscheduleCallback); } else { int output_level __attribute__((unused)) = c->output_level(); TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial", @@ -1664,7 +1724,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } if (is_manual) { - ManualCompaction* m = manual_compaction; + ManualCompactionState* m = manual_compaction; if (!status.ok()) { m->status = status; m->done = true; @@ -1707,13 +1767,13 @@ bool DBImpl::HasPendingManualCompaction() { return (!manual_compaction_dequeue_.empty()); } -void DBImpl::AddManualCompaction(DBImpl::ManualCompaction* m) { +void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) { manual_compaction_dequeue_.push_back(m); } -void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) { +void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) { // Remove from queue - std::deque::iterator it = + std::deque::iterator it = manual_compaction_dequeue_.begin(); while (it != manual_compaction_dequeue_.end()) { if (m == (*it)) { @@ -1726,16 +1786,17 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) { return; } -bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) { +bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) { if (num_running_ingest_file_ > 0) { // We need to wait for other IngestExternalFile() calls to finish // before running a manual compaction. return true; } if (m->exclusive) { - return (bg_compaction_scheduled_ > 0); + return (bg_bottom_compaction_scheduled_ > 0 || + bg_compaction_scheduled_ > 0); } - std::deque::iterator it = + std::deque::iterator it = manual_compaction_dequeue_.begin(); bool seen = false; while (it != manual_compaction_dequeue_.end()) { @@ -1756,7 +1817,7 @@ bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) { bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) { // Remove from priority queue - std::deque::iterator it = + std::deque::iterator it = manual_compaction_dequeue_.begin(); while (it != manual_compaction_dequeue_.end()) { if ((*it)->exclusive) { @@ -1774,7 +1835,7 @@ bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) { bool DBImpl::HasExclusiveManualCompaction() { // Remove from priority queue - std::deque::iterator it = + std::deque::iterator it = manual_compaction_dequeue_.begin(); while (it != manual_compaction_dequeue_.end()) { if ((*it)->exclusive) { @@ -1785,7 +1846,7 @@ bool DBImpl::HasExclusiveManualCompaction() { return false; } -bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) { +bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) { if ((m->exclusive) || (m1->exclusive)) { return true; } diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 9f4fccabc..de5b66f2a 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -112,7 +112,9 @@ Status DBImpl::TEST_WaitForCompact() { // OR flush to finish. 
InstrumentedMutexLock l(&mutex_); - while ((bg_compaction_scheduled_ || bg_flush_scheduled_) && bg_error_.ok()) { + while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || + bg_flush_scheduled_) && + bg_error_.ok()) { bg_cv_.Wait(); } return bg_error_; diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc index c6334f8e0..ca7ebac8e 100644 --- a/db/db_universal_compaction_test.cc +++ b/db/db_universal_compaction_test.cc @@ -1370,6 +1370,103 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) { Destroy(options); } +TEST_P(DBTestUniversalCompaction, FullCompactionInBottomPriThreadPool) { + const int kNumFilesTrigger = 3; + Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM); + for (bool allow_ingest_behind : {false, true}) { + Options options = CurrentOptions(); + options.allow_ingest_behind = allow_ingest_behind; + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = num_levels_; + options.write_buffer_size = 100 << 10; // 100KB + options.target_file_size_base = 32 << 10; // 32KB + options.level0_file_num_compaction_trigger = kNumFilesTrigger; + // Trigger compaction if size amplification exceeds 110% + options.compaction_options_universal.max_size_amplification_percent = 110; + DestroyAndReopen(options); + + int num_bottom_pri_compactions = 0; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BGWorkBottomCompaction", + [&](void* arg) { ++num_bottom_pri_compactions; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Random rnd(301); + for (int num = 0; num < kNumFilesTrigger; num++) { + ASSERT_EQ(NumSortedRuns(), num); + int key_idx = 0; + GenerateNewFile(&rnd, &key_idx); + } + dbfull()->TEST_WaitForCompact(); + + if (allow_ingest_behind || num_levels_ > 1) { + // allow_ingest_behind increases number of levels while sanitizing. + ASSERT_EQ(1, num_bottom_pri_compactions); + } else { + // for single-level universal, everything's bottom level so nothing should + // be executed in bottom-pri thread pool. + ASSERT_EQ(0, num_bottom_pri_compactions); + } + // Verify that size amplification did occur + ASSERT_EQ(NumSortedRuns(), 1); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + } +} + +TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) { + if (num_levels_ == 1) { + // for single-level universal, everything's bottom level so nothing should + // be executed in bottom-pri thread pool. + return; + } + const int kNumFilesTrigger = 3; + Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM); + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = num_levels_; + options.write_buffer_size = 100 << 10; // 100KB + options.target_file_size_base = 32 << 10; // 32KB + options.level0_file_num_compaction_trigger = kNumFilesTrigger; + // Trigger compaction if size amplification exceeds 110% + options.compaction_options_universal.max_size_amplification_percent = 110; + DestroyAndReopen(options); + + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {// wait for the full compaction to be picked before adding files intended + // for the second one. + {"DBImpl::BackgroundCompaction:ForwardToBottomPriPool", + "DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0"}, + // the full (bottom-pri) compaction waits until a partial (low-pri) + // compaction has started to verify they can run in parallel. 
+ {"DBImpl::BackgroundCompaction:NonTrivial", + "DBImpl::BGWorkBottomCompaction"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + Random rnd(301); + for (int i = 0; i < 2; ++i) { + for (int num = 0; num < kNumFilesTrigger; num++) { + int key_idx = 0; + GenerateNewFile(&rnd, &key_idx, true /* no_wait */); + // use no_wait above because that one waits for flush and compaction. We + // don't want to wait for compaction because the full compaction is + // intentionally blocked while more files are flushed. + dbfull()->TEST_WaitForFlushMemTable(); + } + if (i == 0) { + TEST_SYNC_POINT( + "DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0"); + } + } + dbfull()->TEST_WaitForCompact(); + + // First compaction should output to bottom level. Second should output to L0 + // since older L0 files pending compaction prevent it from being placed lower. + ASSERT_EQ(NumSortedRuns(), 2); + ASSERT_GT(NumTableFilesAtLevel(0), 0); + ASSERT_GT(NumTableFilesAtLevel(num_levels_ - 1), 0); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction, ::testing::Combine(::testing::Values(1, 3, 5), ::testing::Bool())); diff --git a/db/version_set.cc b/db/version_set.cc index f8465027b..6b9611aa9 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1227,6 +1227,14 @@ int VersionStorageInfo::MaxInputLevel() const { return 0; } +int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const { + if (allow_ingest_behind) { + assert(num_levels() > 1); + return num_levels() - 2; + } + return num_levels() - 1; +} + void VersionStorageInfo::EstimateCompactionBytesNeeded( const MutableCFOptions& mutable_cf_options) { // Only implemented for level-based compaction diff --git a/db/version_set.h b/db/version_set.h index 5a1f8d07d..9fb000c05 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -147,6 +147,7 @@ class VersionStorageInfo { } int MaxInputLevel() const; + int MaxOutputLevel(bool allow_ingest_behind) const; // Return level number that has idx'th highest score int CompactionScoreLevel(int idx) const { return compaction_level_[idx]; } diff --git a/env/env_posix.cc b/env/env_posix.cc index 7f2bc3b85..5a671d72f 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -761,23 +761,23 @@ class PosixEnv : public Env { // Allow increasing the number of worker threads. virtual void SetBackgroundThreads(int num, Priority pri) override { - assert(pri >= Priority::LOW && pri <= Priority::HIGH); + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); thread_pools_[pri].SetBackgroundThreads(num); } virtual int GetBackgroundThreads(Priority pri) override { - assert(pri >= Priority::LOW && pri <= Priority::HIGH); + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); return thread_pools_[pri].GetBackgroundThreads(); } // Allow increasing the number of worker threads. 
virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { - assert(pri >= Priority::LOW && pri <= Priority::HIGH); + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); } virtual void LowerThreadPoolIOPriority(Priority pool = LOW) override { - assert(pool >= Priority::LOW && pool <= Priority::HIGH); + assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); #ifdef OS_LINUX thread_pools_[pool].LowerIOPriority(); #endif @@ -883,7 +883,7 @@ PosixEnv::PosixEnv() void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri, void* tag, void (*unschedFunction)(void* arg)) { - assert(pri >= Priority::LOW && pri <= Priority::HIGH); + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); } @@ -892,7 +892,7 @@ int PosixEnv::UnSchedule(void* arg, Priority pri) { } unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const { - assert(pri >= Priority::LOW && pri <= Priority::HIGH); + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); return thread_pools_[pri].GetQueueLen(); } diff --git a/env/env_test.cc b/env/env_test.cc index 7fd71a3c4..9ec2f142e 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -125,12 +125,14 @@ static void SetBool(void* ptr) { reinterpret_cast<std::atomic<bool>*>(ptr)->store(true); } -TEST_P(EnvPosixTestWithParam, RunImmediately) { - std::atomic<bool> called(false); - env_->Schedule(&SetBool, &called); - Env::Default()->SleepForMicroseconds(kDelayMicros); - ASSERT_TRUE(called.load()); - WaitThreadPoolsEmpty(); +TEST_F(EnvPosixTest, RunImmediately) { + for (int pri = Env::BOTTOM; pri < Env::TOTAL; ++pri) { + std::atomic<bool> called(false); + env_->SetBackgroundThreads(1, static_cast<Env::Priority>(pri)); + env_->Schedule(&SetBool, &called, static_cast<Env::Priority>(pri)); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(called.load()); + } } TEST_P(EnvPosixTestWithParam, UnSchedule) { diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 869073899..e2efbdc15 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -283,7 +283,7 @@ class Env { virtual Status UnlockFile(FileLock* lock) = 0; // Priority for scheduling job in thread pool - enum Priority { LOW, HIGH, TOTAL }; + enum Priority { BOTTOM, LOW, HIGH, TOTAL }; // Priority for requesting bytes in rate limiter scheduler enum IOPriority { diff --git a/memtable/inlineskiplist_test.cc b/memtable/inlineskiplist_test.cc index 46d6c0fa9..5803e5b0f 100644 --- a/memtable/inlineskiplist_test.cc +++ b/memtable/inlineskiplist_test.cc @@ -571,6 +571,7 @@ static void RunConcurrentRead(int run) { fprintf(stderr, "Run %d of %d\n", i, N); } TestState state(seed + 1); + Env::Default()->SetBackgroundThreads(1); Env::Default()->Schedule(ConcurrentReader, &state); state.Wait(TestState::RUNNING); for (int k = 0; k < kSize; ++k) { diff --git a/memtable/skiplist_test.cc b/memtable/skiplist_test.cc index 2f4af1788..50c3588bb 100644 --- a/memtable/skiplist_test.cc +++ b/memtable/skiplist_test.cc @@ -363,6 +363,7 @@ static void RunConcurrent(int run) { fprintf(stderr, "Run %d of %d\n", i, N); } TestState state(seed + 1); + Env::Default()->SetBackgroundThreads(1); Env::Default()->Schedule(ConcurrentReader, &state); state.Wait(TestState::RUNNING); for (int k = 0; k < kSize; k++) { diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index f02216259..6dfff771f 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -318,6 +318,10 @@ DEFINE_int32(max_background_jobs, "The
maximum number of concurrent background jobs that can occur " "in parallel."); +DEFINE_int32(num_bottom_pri_threads, 0, + "The number of threads in the bottom-priority thread pool (used " + "by universal compaction only)."); + DEFINE_int32(max_background_compactions, rocksdb::Options().max_background_compactions, "The maximum number of concurrent background compactions" @@ -5242,6 +5246,8 @@ int db_bench_tool(int argc, char** argv) { FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions); FLAGS_env->SetBackgroundThreads(FLAGS_max_background_flushes, rocksdb::Env::Priority::HIGH); + FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads, + rocksdb::Env::Priority::BOTTOM); // Choose a location for the test database if none given with --db= if (FLAGS_db.empty()) { diff --git a/util/threadpool_imp.cc b/util/threadpool_imp.cc index aa40ab9cd..f38e6422b 100644 --- a/util/threadpool_imp.cc +++ b/util/threadpool_imp.cc @@ -123,11 +123,11 @@ private: inline ThreadPoolImpl::Impl::Impl() - : + : low_io_priority_(false), priority_(Env::LOW), env_(nullptr), - total_threads_limit_(1), + total_threads_limit_(0), queue_len_(), exit_all_threads_(false), wait_for_jobs_to_complete_(false), @@ -372,7 +372,7 @@ int ThreadPoolImpl::Impl::UnSchedule(void* arg) { return count; } -ThreadPoolImpl::ThreadPoolImpl() : +ThreadPoolImpl::ThreadPoolImpl() : impl_(new Impl()) { }