From 6fd0ed499384273ea1d6a79e78f2b00b2f9bc981 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 24 Mar 2020 20:20:33 -0700 Subject: [PATCH] CompactRange() to use bottom pool when goes to bottommost level (#6593) Summary: In automatic compaction, if a compaction is bottommost, it goes to bottom thread pool. We should do the same for manual compaction too. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6593 Test Plan: Add a unit test. See all existing tests pass. Reviewed By: ajkr Differential Revision: D20637408 fbshipit-source-id: cb03031e8f895085f7acf6d2d65e69e84c9ddef3 --- db/db_compaction_test.cc | 66 ++++++++++++++++++++++++++ db/db_impl/db_impl_compaction_flush.cc | 7 ++- util/threadpool_imp.cc | 11 +++-- 3 files changed, 80 insertions(+), 4 deletions(-) diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 7807d4737..554a02cbc 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -593,6 +593,72 @@ TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) { } } +TEST_F(DBCompactionTest, CompactRangeBottomPri) { + ASSERT_OK(Put(Key(50), "")); + ASSERT_OK(Flush()); + ASSERT_OK(Put(Key(100), "")); + ASSERT_OK(Flush()); + ASSERT_OK(Put(Key(200), "")); + ASSERT_OK(Flush()); + + { + CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = 2; + dbfull()->CompactRange(cro, nullptr, nullptr); + } + ASSERT_EQ("0,0,3", FilesPerLevel(0)); + + ASSERT_OK(Put(Key(1), "")); + ASSERT_OK(Put(Key(199), "")); + ASSERT_OK(Flush()); + ASSERT_OK(Put(Key(2), "")); + ASSERT_OK(Put(Key(199), "")); + ASSERT_OK(Flush()); + ASSERT_EQ("2,0,3", FilesPerLevel(0)); + + // Now we have 2 L0 files, and 3 L2 files, and a manual compaction will + // be triggered. + // Two compaction jobs will run. One compacts 2 L0 files in Low Pri Pool + // and one compact to L2 in bottom pri pool. + int low_pri_count = 0; + int bottom_pri_count = 0; + SyncPoint::GetInstance()->SetCallBack( + "ThreadPoolImpl::Impl::BGThread:BeforeRun", [&](void* arg) { + Env::Priority* pri = reinterpret_cast(arg); + // First time is low pri pool in the test case. + if (low_pri_count == 0 && bottom_pri_count == 0) { + ASSERT_EQ(Env::Priority::LOW, *pri); + } + if (*pri == Env::Priority::LOW) { + low_pri_count++; + } else { + bottom_pri_count++; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + env_->SetBackgroundThreads(1, Env::Priority::BOTTOM); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); + ASSERT_EQ(1, low_pri_count); + ASSERT_EQ(1, bottom_pri_count); + ASSERT_EQ("0,0,2", FilesPerLevel(0)); + + // Recompact bottom most level uses bottom pool + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + dbfull()->CompactRange(cro, nullptr, nullptr); + ASSERT_EQ(1, low_pri_count); + ASSERT_EQ(2, bottom_pri_count); + + env_->SetBackgroundThreads(0, Env::Priority::BOTTOM); + dbfull()->CompactRange(cro, nullptr, nullptr); + // Low pri pool is used if bottom pool has size 0. + ASSERT_EQ(2, low_pri_count); + ASSERT_EQ(2, bottom_pri_count); + + SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DBCompactionTest, DisableStatsUpdateReopen) { uint64_t db_size[3]; for (int test = 0; test < 2; ++test) { diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 4fd00ccce..ac9a97a75 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1510,7 +1510,12 @@ Status DBImpl::RunManualCompaction( } manual.incomplete = false; bg_compaction_scheduled_++; - env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, + Env::Priority thread_pool_pri = Env::Priority::LOW; + if (compaction->bottommost_level() && + env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) { + thread_pool_pri = Env::Priority::BOTTOM; + } + env_->Schedule(&DBImpl::BGWorkCompaction, ca, thread_pool_pri, this, &DBImpl::UnscheduleCompactionCallback); scheduled = true; } diff --git a/util/threadpool_imp.cc b/util/threadpool_imp.cc index bf216c956..713af02e2 100644 --- a/util/threadpool_imp.cc +++ b/util/threadpool_imp.cc @@ -9,9 +9,6 @@ #include "util/threadpool_imp.h" -#include "monitoring/thread_status_util.h" -#include "port/port.h" - #ifndef OS_WIN # include #endif @@ -31,6 +28,10 @@ #include #include +#include "monitoring/thread_status_util.h" +#include "port/port.h" +#include "test_util/sync_point.h" + namespace ROCKSDB_NAMESPACE { void ThreadPoolImpl::PthreadCall(const char* label, int result) { @@ -262,6 +263,10 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { (void)decrease_io_priority; // avoid 'unused variable' error (void)decrease_cpu_priority; #endif + + TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun", + &priority_); + func(); } }