CompactRange() to use bottom pool when goes to bottommost level (#6593)

Summary:
In automatic compaction, if a compaction is bottommost, it goes to bottom thread pool. We should do the same for manual compaction too.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6593

Test Plan: Add a unit test. See all existing tests pass.

Reviewed By: ajkr

Differential Revision: D20637408

fbshipit-source-id: cb03031e8f895085f7acf6d2d65e69e84c9ddef3
main
sdong 5 years ago committed by Facebook GitHub Bot
parent ceeca7542d
commit 6fd0ed4993
  1. 66
      db/db_compaction_test.cc
  2. 7
      db/db_impl/db_impl_compaction_flush.cc
  3. 11
      util/threadpool_imp.cc

@ -593,6 +593,72 @@ TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) {
} }
} }
TEST_F(DBCompactionTest, CompactRangeBottomPri) {
ASSERT_OK(Put(Key(50), ""));
ASSERT_OK(Flush());
ASSERT_OK(Put(Key(100), ""));
ASSERT_OK(Flush());
ASSERT_OK(Put(Key(200), ""));
ASSERT_OK(Flush());
{
CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = 2;
dbfull()->CompactRange(cro, nullptr, nullptr);
}
ASSERT_EQ("0,0,3", FilesPerLevel(0));
ASSERT_OK(Put(Key(1), ""));
ASSERT_OK(Put(Key(199), ""));
ASSERT_OK(Flush());
ASSERT_OK(Put(Key(2), ""));
ASSERT_OK(Put(Key(199), ""));
ASSERT_OK(Flush());
ASSERT_EQ("2,0,3", FilesPerLevel(0));
// Now we have 2 L0 files, and 3 L2 files, and a manual compaction will
// be triggered.
// Two compaction jobs will run. One compacts 2 L0 files in Low Pri Pool
// and one compact to L2 in bottom pri pool.
int low_pri_count = 0;
int bottom_pri_count = 0;
SyncPoint::GetInstance()->SetCallBack(
"ThreadPoolImpl::Impl::BGThread:BeforeRun", [&](void* arg) {
Env::Priority* pri = reinterpret_cast<Env::Priority*>(arg);
// First time is low pri pool in the test case.
if (low_pri_count == 0 && bottom_pri_count == 0) {
ASSERT_EQ(Env::Priority::LOW, *pri);
}
if (*pri == Env::Priority::LOW) {
low_pri_count++;
} else {
bottom_pri_count++;
}
});
SyncPoint::GetInstance()->EnableProcessing();
env_->SetBackgroundThreads(1, Env::Priority::BOTTOM);
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(1, low_pri_count);
ASSERT_EQ(1, bottom_pri_count);
ASSERT_EQ("0,0,2", FilesPerLevel(0));
// Recompact bottom most level uses bottom pool
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
dbfull()->CompactRange(cro, nullptr, nullptr);
ASSERT_EQ(1, low_pri_count);
ASSERT_EQ(2, bottom_pri_count);
env_->SetBackgroundThreads(0, Env::Priority::BOTTOM);
dbfull()->CompactRange(cro, nullptr, nullptr);
// Low pri pool is used if bottom pool has size 0.
ASSERT_EQ(2, low_pri_count);
ASSERT_EQ(2, bottom_pri_count);
SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBCompactionTest, DisableStatsUpdateReopen) { TEST_F(DBCompactionTest, DisableStatsUpdateReopen) {
uint64_t db_size[3]; uint64_t db_size[3];
for (int test = 0; test < 2; ++test) { for (int test = 0; test < 2; ++test) {

@ -1510,7 +1510,12 @@ Status DBImpl::RunManualCompaction(
} }
manual.incomplete = false; manual.incomplete = false;
bg_compaction_scheduled_++; bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, Env::Priority thread_pool_pri = Env::Priority::LOW;
if (compaction->bottommost_level() &&
env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
thread_pool_pri = Env::Priority::BOTTOM;
}
env_->Schedule(&DBImpl::BGWorkCompaction, ca, thread_pool_pri, this,
&DBImpl::UnscheduleCompactionCallback); &DBImpl::UnscheduleCompactionCallback);
scheduled = true; scheduled = true;
} }

@ -9,9 +9,6 @@
#include "util/threadpool_imp.h" #include "util/threadpool_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#ifndef OS_WIN #ifndef OS_WIN
# include <unistd.h> # include <unistd.h>
#endif #endif
@ -31,6 +28,10 @@
#include <thread> #include <thread>
#include <vector> #include <vector>
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "test_util/sync_point.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
void ThreadPoolImpl::PthreadCall(const char* label, int result) { void ThreadPoolImpl::PthreadCall(const char* label, int result) {
@ -262,6 +263,10 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
(void)decrease_io_priority; // avoid 'unused variable' error (void)decrease_io_priority; // avoid 'unused variable' error
(void)decrease_cpu_priority; (void)decrease_cpu_priority;
#endif #endif
TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun",
&priority_);
func(); func();
} }
} }

Loading…
Cancel
Save