diff --git a/HISTORY.md b/HISTORY.md index 26f3c5919..73a309d7e 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -20,6 +20,7 @@ * Fixed a bug of timestamp-based GC which can cause all versions of a key under full_history_ts_low to be dropped. This bug will be triggered when some of the ikeys' timestamps are lower than full_history_ts_low, while others are newer. * In some cases outside of the DB read and compaction paths, SST block checksums are now checked where they were not before. * Explicitly check for and disallow the `BlockBasedTableOptions` if insertion into one of {`block_cache`, `block_cache_compressed`, `persistent_cache`} can show up in another of these. (RocksDB expects to be able to use the same key for different physical data among tiers.) +* Users who configured a dedicated thread pool for bottommost compactions by explicitly adding threads to the `Env::Priority::BOTTOM` pool will no longer see RocksDB schedule automatic compactions exceeding the DB's compaction concurrency limit. For details on per-DB compaction concurrency limit, see API docs of `max_background_compactions` and `max_background_jobs`. ### Behavior Changes * `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files. diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 370cf9318..a2fe771d9 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -7000,6 +7000,65 @@ TEST_F(DBCompactionTest, ChangeLevelConflictsWithManual) { refit_level_thread.join(); } +TEST_F(DBCompactionTest, BottomPriCompactionCountsTowardConcurrencyLimit) { + // Flushes several files to trigger compaction while lock is released during + // a bottom-pri compaction. Verifies it does not get scheduled to thread pool + // because per-DB limit for compaction parallelism is one (default). + const int kNumL0Files = 4; + const int kNumLevels = 3; + + env_->SetBackgroundThreads(1, Env::Priority::BOTTOM); + + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = kNumL0Files; + options.num_levels = kNumLevels; + DestroyAndReopen(options); + + // Setup last level to be non-empty since it's a bit unclear whether + // compaction to an empty level would be considered "bottommost". + ASSERT_OK(Put(Key(0), "val")); + ASSERT_OK(Flush()); + MoveFilesToLevel(kNumLevels - 1); + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkBottomCompaction", + "DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:" + "PreTriggerCompaction"}, + {"DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:" + "PostTriggerCompaction", + "BackgroundCallCompaction:0"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + port::Thread compact_range_thread([&] { + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + cro.exclusive_manual_compaction = false; + ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + }); + + // Sleep in the low-pri thread so any newly scheduled compaction will be + // queued. Otherwise it might finish before we check its existence. + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + Env::Priority::LOW); + + TEST_SYNC_POINT( + "DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:" + "PreTriggerCompaction"); + for (int i = 0; i < kNumL0Files; ++i) { + ASSERT_OK(Put(Key(0), "val")); + ASSERT_OK(Flush()); + } + ASSERT_EQ(0u, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + TEST_SYNC_POINT( + "DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:" + "PostTriggerCompaction"); + + sleeping_task_low.WakeUp(); + sleeping_task_low.WaitUntilDone(); + compact_range_thread.join(); +} + #endif // !defined(ROCKSDB_LITE) } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 17bdb027f..4a5f4a646 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1843,7 +1843,6 @@ Status DBImpl::RunManualCompaction( } ca = new CompactionArg; ca->db = this; - ca->compaction_pri_ = Env::Priority::LOW; ca->prepicked_compaction = new PrepickedCompaction; ca->prepicked_compaction->manual_compaction_state = &manual; ca->prepicked_compaction->compaction = compaction; @@ -1853,14 +1852,19 @@ Status DBImpl::RunManualCompaction( assert(false); } manual.incomplete = false; - bg_compaction_scheduled_++; - Env::Priority thread_pool_pri = Env::Priority::LOW; if (compaction->bottommost_level() && env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) { - thread_pool_pri = Env::Priority::BOTTOM; + bg_bottom_compaction_scheduled_++; + ca->compaction_pri_ = Env::Priority::BOTTOM; + env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, + Env::Priority::BOTTOM, this, + &DBImpl::UnscheduleCompactionCallback); + } else { + bg_compaction_scheduled_++; + ca->compaction_pri_ = Env::Priority::LOW; + env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, + &DBImpl::UnscheduleCompactionCallback); } - env_->Schedule(&DBImpl::BGWorkCompaction, ca, thread_pool_pri, this, - &DBImpl::UnscheduleCompactionCallback); scheduled = true; } } @@ -2375,7 +2379,8 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { return; } - while (bg_compaction_scheduled_ < bg_job_limits.max_compactions && + while (bg_compaction_scheduled_ + bg_bottom_compaction_scheduled_ < + bg_job_limits.max_compactions && unscheduled_compactions_ > 0) { CompactionArg* ca = new CompactionArg; ca->db = this; @@ -2564,8 +2569,7 @@ void DBImpl::BGWorkBottomCompaction(void* arg) { IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM); TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction"); auto* prepicked_compaction = ca.prepicked_compaction; - assert(prepicked_compaction && prepicked_compaction->compaction && - !prepicked_compaction->manual_compaction_state); + assert(prepicked_compaction && prepicked_compaction->compaction); ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM); delete prepicked_compaction; } diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc index 610341bed..95e3e6609 100644 --- a/db/db_universal_compaction_test.cc +++ b/db/db_universal_compaction_test.cc @@ -1680,6 +1680,7 @@ TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) { Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM); Options options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal; + options.max_background_compactions = 2; options.num_levels = num_levels_; options.write_buffer_size = 100 << 10; // 100KB options.target_file_size_base = 32 << 10; // 32KB @@ -1688,6 +1689,10 @@ TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) { options.compaction_options_universal.max_size_amplification_percent = 110; DestroyAndReopen(options); + // Need to get a token to enable compaction parallelism up to + // `max_background_compactions` jobs. + auto pressure_token = + dbfull()->TEST_write_controler().GetCompactionPressureToken(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( {// wait for the full compaction to be picked before adding files intended // for the second one. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 8f49950c7..84e9071c0 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -663,7 +663,7 @@ struct DBOptions { // Dynamically changeable through SetDBOptions() API. int base_background_compactions = -1; - // NOT SUPPORTED ANYMORE: RocksDB automatically decides this based on the + // DEPRECATED: RocksDB automatically decides this based on the // value of max_background_jobs. For backwards compatibility we will set // `max_background_jobs = max_background_compactions + max_background_flushes` // in the case where user sets at least one of `max_background_compactions` or @@ -689,7 +689,7 @@ struct DBOptions { // Dynamically changeable through SetDBOptions() API. uint32_t max_subcompactions = 1; - // NOT SUPPORTED ANYMORE: RocksDB automatically decides this based on the + // DEPRECATED: RocksDB automatically decides this based on the // value of max_background_jobs. For backwards compatibility we will set // `max_background_jobs = max_background_compactions + max_background_flushes` // in the case where user sets at least one of `max_background_compactions` or