From 3d7dc75b3646a0f7484d5ecb7437856e0d8be3c7 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Mon, 14 May 2018 14:44:04 -0700 Subject: [PATCH] Bottommost level-based compactions in bottom-pri pool Summary: This feature was introduced for universal compaction in cc01985d. At that point we thought it'd be used only to prevent long-running universal full compactions from blocking short-lived upper-level compactions. Now we have a level compaction user who could benefit from it since they use more expensive compression algorithm in the bottom level. So enable it for level. Closes https://github.com/facebook/rocksdb/pull/3835 Differential Revision: D7957179 Pulled By: ajkr fbshipit-source-id: 177285d2cef3b650b6a4d81dc5db84bc441c9fe4 --- HISTORY.md | 1 + db/db_compaction_test.cc | 42 ++++++++++++++++++++++++++++ db/db_impl_compaction_flush.cc | 10 +++---- db/db_universal_compaction_test.cc | 44 ------------------------------ 4 files changed, 47 insertions(+), 50 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index bbe2d0203..7e32e73f7 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -14,6 +14,7 @@ * Add DB properties "rocksdb.block-cache-capacity", "rocksdb.block-cache-usage", "rocksdb.block-cache-pinned-usage" to show block cache usage. * Add `Env::LowerThreadPoolCPUPriority(Priority)` method, which lowers the CPU priority of background (esp. compaction) threads to minimize interference with foreground tasks. * Fsync parent directory after deleting a file in delete scheduler. +* In level-based compaction, if bottom-pri thread pool was setup via `Env::SetBackgroundThreads()`, compactions to the bottom level will be delegated to that thread pool. ### Bug Fixes * Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob. diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 3cc02ecbe..7092afa37 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -3128,6 +3128,48 @@ TEST_P(DBCompactionTestWithParam, IntraL0CompactionDoesNotObsoleteDeletions) { ASSERT_TRUE(db_->Get(roptions, Key(0), &result).IsNotFound()); } +TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) { + const int kNumFilesTrigger = 3; + Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM); + for (bool use_universal_compaction : {false, true}) { + Options options = CurrentOptions(); + if (use_universal_compaction) { + options.compaction_style = kCompactionStyleUniversal; + } else { + options.compaction_style = kCompactionStyleLevel; + options.level_compaction_dynamic_level_bytes = true; + } + options.num_levels = 4; + options.write_buffer_size = 100 << 10; // 100KB + options.target_file_size_base = 32 << 10; // 32KB + options.level0_file_num_compaction_trigger = kNumFilesTrigger; + // Trigger compaction if size amplification exceeds 110% + options.compaction_options_universal.max_size_amplification_percent = 110; + DestroyAndReopen(options); + + int num_bottom_pri_compactions = 0; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BGWorkBottomCompaction", + [&](void* /*arg*/) { ++num_bottom_pri_compactions; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Random rnd(301); + for (int num = 0; num < kNumFilesTrigger; num++) { + ASSERT_EQ(NumSortedRuns(), num); + int key_idx = 0; + GenerateNewFile(&rnd, &key_idx); + } + dbfull()->TEST_WaitForCompact(); + + ASSERT_EQ(1, num_bottom_pri_compactions); + + // Verify that size amplification did occur + ASSERT_EQ(NumSortedRuns(), 1); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + } + Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM); +} + TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) { // Deletions can be dropped when compacted to non-last level if they fall // outside the lower-level files' key-ranges. diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 9c43b6ab9..34d870e8d 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -1866,9 +1866,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // Clear Instrument ThreadStatusUtil::ResetThreadStatus(); - } else if (c->column_family_data()->ioptions()->compaction_style == - kCompactionStyleUniversal && - !is_prepicked && c->output_level() > 0 && + } else if (!is_prepicked && c->output_level() > 0 && c->output_level() == c->column_family_data() ->current() @@ -1876,9 +1874,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ->MaxOutputLevel( immutable_db_options_.allow_ingest_behind) && env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) { - // Forward universal compactions involving last level to the bottom pool - // if it exists, such that long-running compactions can't block short- - // lived ones, like L0->L0s. + // Forward compactions involving last level to the bottom pool if it exists, + // such that compactions unlikely to contribute to write stalls can be + // delayed or deprioritized. TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool"); CompactionArg* ca = new CompactionArg; ca->db = this; diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc index 09727c690..d9dcf03e7 100644 --- a/db/db_universal_compaction_test.cc +++ b/db/db_universal_compaction_test.cc @@ -1680,50 +1680,6 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) { Destroy(options); } -TEST_P(DBTestUniversalCompaction, FullCompactionInBottomPriThreadPool) { - const int kNumFilesTrigger = 3; - Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM); - for (bool allow_ingest_behind : {false, true}) { - Options options = CurrentOptions(); - options.allow_ingest_behind = allow_ingest_behind; - options.compaction_style = kCompactionStyleUniversal; - options.num_levels = num_levels_; - options.write_buffer_size = 100 << 10; // 100KB - options.target_file_size_base = 32 << 10; // 32KB - options.level0_file_num_compaction_trigger = kNumFilesTrigger; - // Trigger compaction if size amplification exceeds 110% - options.compaction_options_universal.max_size_amplification_percent = 110; - DestroyAndReopen(options); - - int num_bottom_pri_compactions = 0; - SyncPoint::GetInstance()->SetCallBack( - "DBImpl::BGWorkBottomCompaction", - [&](void* /*arg*/) { ++num_bottom_pri_compactions; }); - SyncPoint::GetInstance()->EnableProcessing(); - - Random rnd(301); - for (int num = 0; num < kNumFilesTrigger; num++) { - ASSERT_EQ(NumSortedRuns(), num); - int key_idx = 0; - GenerateNewFile(&rnd, &key_idx); - } - dbfull()->TEST_WaitForCompact(); - - if (allow_ingest_behind || num_levels_ > 1) { - // allow_ingest_behind increases number of levels while sanitizing. - ASSERT_EQ(1, num_bottom_pri_compactions); - } else { - // for single-level universal, everything's bottom level so nothing should - // be executed in bottom-pri thread pool. - ASSERT_EQ(0, num_bottom_pri_compactions); - } - // Verify that size amplification did occur - ASSERT_EQ(NumSortedRuns(), 1); - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - } - Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM); -} - TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) { if (num_levels_ == 1) { // for single-level universal, everything's bottom level so nothing should