Adhere to per-DB concurrency limit when bottom-pri compactions exist (#9179)

Summary:
- Fixed a bug where bottom-pri manual compactions counted toward `bg_compaction_scheduled_` instead of `bg_bottom_compaction_scheduled_`. The miscounting appears to have had no negative effect.
- Fixed a bug where automatic compaction scheduling did not consider `bg_bottom_compaction_scheduled_`. Automatic compactions can no longer be scheduled in excess of the per-DB compaction concurrency limit (`max_compactions`) when some of the existing compactions are bottommost.
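The second fix boils down to which counters are compared against the limit. Below is a minimal standalone sketch of that check (hypothetical names modeled on the members mentioned above, not RocksDB's actual internals): a new automatic compaction may be scheduled only while the *sum* of low-pri and bottom-pri compactions stays under the per-DB limit.

```cpp
#include <cassert>

// Hypothetical model of the per-DB compaction accounting. Field names mirror
// the counters discussed in this commit but this is an illustration only.
struct CompactionCounts {
  int bg_compaction_scheduled = 0;         // compactions in the LOW pool
  int bg_bottom_compaction_scheduled = 0;  // compactions in the BOTTOM pool
};

// After the fix, both counters count toward `max_compactions`. Before the
// fix, only `bg_compaction_scheduled` was compared against the limit, so
// bottom-pri work was invisible to the concurrency check.
bool CanScheduleAutomaticCompaction(const CompactionCounts& c,
                                    int max_compactions) {
  return c.bg_compaction_scheduled + c.bg_bottom_compaction_scheduled <
         max_compactions;
}
```

With the default limit of one, a running bottom-pri compaction now blocks scheduling of an additional automatic compaction.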

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9179

Test Plan: new unit test covering the manual/automatic case. Also verified that the existing automatic/automatic test ("ConcurrentBottomPriLowPriCompactions") hung until it was changed to explicitly enable compaction parallelism.
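The first fix changes which counter a manual bottommost compaction bumps, based on where it will run. A minimal standalone sketch of that routing rule (hypothetical function, not RocksDB's actual API): the BOTTOM pool is used only when the compaction is bottommost *and* that pool has threads; the counter must match the chosen pool.

```cpp
#include <cassert>

// Which thread pool a manual compaction is routed to, modeled after the
// logic in RunManualCompaction. Illustration only.
enum class Pool { kLow, kBottom };

Pool PickCompactionPool(bool bottommost_level, int bottom_pool_threads) {
  if (bottommost_level && bottom_pool_threads > 0) {
    // Counted in bg_bottom_compaction_scheduled_ after this commit;
    // previously it was (incorrectly) counted in bg_compaction_scheduled_.
    return Pool::kBottom;
  }
  // Counted in bg_compaction_scheduled_.
  return Pool::kLow;
}
```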

Reviewed By: riversand963

Differential Revision: D32488048

Pulled By: ajkr

fbshipit-source-id: 20c4c0693678e81e43f85ed3cc3402fcf26e3310
Branch: main
Author: Andrew Kryczka, committed 3 years ago by Facebook GitHub Bot
Parent: 4a7c1dc375
Commit: 8cf4294e25
Files changed:
1. HISTORY.md (1 line)
2. db/db_compaction_test.cc (59 lines)
3. db/db_impl/db_impl_compaction_flush.cc (22 lines)
4. db/db_universal_compaction_test.cc (5 lines)
5. include/rocksdb/options.h (4 lines)

HISTORY.md:
@@ -20,6 +20,7 @@
 * Fixed a bug of timestamp-based GC which can cause all versions of a key under full_history_ts_low to be dropped. This bug will be triggered when some of the ikeys' timestamps are lower than full_history_ts_low, while others are newer.
 * In some cases outside of the DB read and compaction paths, SST block checksums are now checked where they were not before.
 * Explicitly check for and disallow the `BlockBasedTableOptions` if insertion into one of {`block_cache`, `block_cache_compressed`, `persistent_cache`} can show up in another of these. (RocksDB expects to be able to use the same key for different physical data among tiers.)
+* Users who configured a dedicated thread pool for bottommost compactions by explicitly adding threads to the `Env::Priority::BOTTOM` pool will no longer see RocksDB schedule automatic compactions exceeding the DB's compaction concurrency limit. For details on per-DB compaction concurrency limit, see API docs of `max_background_compactions` and `max_background_jobs`.
 ### Behavior Changes
 * `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files.

db/db_compaction_test.cc:
@@ -7000,6 +7000,65 @@ TEST_F(DBCompactionTest, ChangeLevelConflictsWithManual) {
   refit_level_thread.join();
 }

+TEST_F(DBCompactionTest, BottomPriCompactionCountsTowardConcurrencyLimit) {
+  // Flushes several files to trigger compaction while lock is released during
+  // a bottom-pri compaction. Verifies it does not get scheduled to thread pool
+  // because per-DB limit for compaction parallelism is one (default).
+  const int kNumL0Files = 4;
+  const int kNumLevels = 3;
+
+  env_->SetBackgroundThreads(1, Env::Priority::BOTTOM);
+
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+  options.num_levels = kNumLevels;
+  DestroyAndReopen(options);
+
+  // Setup last level to be non-empty since it's a bit unclear whether
+  // compaction to an empty level would be considered "bottommost".
+  ASSERT_OK(Put(Key(0), "val"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(kNumLevels - 1);
+
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BGWorkBottomCompaction",
+        "DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:"
+        "PreTriggerCompaction"},
+       {"DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:"
+        "PostTriggerCompaction",
+        "BackgroundCallCompaction:0"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  port::Thread compact_range_thread([&] {
+    CompactRangeOptions cro;
+    cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+    cro.exclusive_manual_compaction = false;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  });
+
+  // Sleep in the low-pri thread so any newly scheduled compaction will be
+  // queued. Otherwise it might finish before we check its existence.
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
+
+  TEST_SYNC_POINT(
+      "DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:"
+      "PreTriggerCompaction");
+  for (int i = 0; i < kNumL0Files; ++i) {
+    ASSERT_OK(Put(Key(0), "val"));
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ(0u, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
+  TEST_SYNC_POINT(
+      "DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:"
+      "PostTriggerCompaction");
+
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+  compact_range_thread.join();
+}
+
 #endif  // !defined(ROCKSDB_LITE)

 }  // namespace ROCKSDB_NAMESPACE

db/db_impl/db_impl_compaction_flush.cc:
@@ -1843,7 +1843,6 @@ Status DBImpl::RunManualCompaction(
       }
       ca = new CompactionArg;
       ca->db = this;
-      ca->compaction_pri_ = Env::Priority::LOW;
       ca->prepicked_compaction = new PrepickedCompaction;
       ca->prepicked_compaction->manual_compaction_state = &manual;
       ca->prepicked_compaction->compaction = compaction;
@@ -1853,14 +1852,19 @@ Status DBImpl::RunManualCompaction(
         assert(false);
       }
       manual.incomplete = false;
-      bg_compaction_scheduled_++;
-      Env::Priority thread_pool_pri = Env::Priority::LOW;
       if (compaction->bottommost_level() &&
           env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
-        thread_pool_pri = Env::Priority::BOTTOM;
+        bg_bottom_compaction_scheduled_++;
+        ca->compaction_pri_ = Env::Priority::BOTTOM;
+        env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca,
+                       Env::Priority::BOTTOM, this,
+                       &DBImpl::UnscheduleCompactionCallback);
+      } else {
+        bg_compaction_scheduled_++;
+        ca->compaction_pri_ = Env::Priority::LOW;
+        env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
+                       &DBImpl::UnscheduleCompactionCallback);
       }
-      env_->Schedule(&DBImpl::BGWorkCompaction, ca, thread_pool_pri, this,
-                     &DBImpl::UnscheduleCompactionCallback);
       scheduled = true;
     }
   }
@@ -2375,7 +2379,8 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
     return;
   }
-  while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
+  while (bg_compaction_scheduled_ + bg_bottom_compaction_scheduled_ <
+             bg_job_limits.max_compactions &&
          unscheduled_compactions_ > 0) {
     CompactionArg* ca = new CompactionArg;
     ca->db = this;
@@ -2564,8 +2569,7 @@ void DBImpl::BGWorkBottomCompaction(void* arg) {
   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
   TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
   auto* prepicked_compaction = ca.prepicked_compaction;
-  assert(prepicked_compaction && prepicked_compaction->compaction &&
-         !prepicked_compaction->manual_compaction_state);
+  assert(prepicked_compaction && prepicked_compaction->compaction);
   ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
   delete prepicked_compaction;
 }

db/db_universal_compaction_test.cc:
@@ -1680,6 +1680,7 @@ TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) {
   Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
   Options options = CurrentOptions();
   options.compaction_style = kCompactionStyleUniversal;
+  options.max_background_compactions = 2;
   options.num_levels = num_levels_;
   options.write_buffer_size = 100 << 10;     // 100KB
   options.target_file_size_base = 32 << 10;  // 32KB
@@ -1688,6 +1689,10 @@ TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) {
   options.compaction_options_universal.max_size_amplification_percent = 110;
   DestroyAndReopen(options);
+
+  // Need to get a token to enable compaction parallelism up to
+  // `max_background_compactions` jobs.
+  auto pressure_token =
+      dbfull()->TEST_write_controler().GetCompactionPressureToken();
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
       {// wait for the full compaction to be picked before adding files intended
        // for the second one.

include/rocksdb/options.h:
@@ -663,7 +663,7 @@ struct DBOptions {
   // Dynamically changeable through SetDBOptions() API.
   int base_background_compactions = -1;

-  // NOT SUPPORTED ANYMORE: RocksDB automatically decides this based on the
+  // DEPRECATED: RocksDB automatically decides this based on the
   // value of max_background_jobs. For backwards compatibility we will set
   // `max_background_jobs = max_background_compactions + max_background_flushes`
   // in the case where user sets at least one of `max_background_compactions` or
@@ -689,7 +689,7 @@ struct DBOptions {
   // Dynamically changeable through SetDBOptions() API.
   uint32_t max_subcompactions = 1;

-  // NOT SUPPORTED ANYMORE: RocksDB automatically decides this based on the
+  // DEPRECATED: RocksDB automatically decides this based on the
   // value of max_background_jobs. For backwards compatibility we will set
   // `max_background_jobs = max_background_compactions + max_background_flushes`
   // in the case where user sets at least one of `max_background_compactions` or
