Add options.base_background_compactions as a number of compaction threads for low compaction debt

Summary:
If options.base_background_compactions is given, we try to schedule number of compactions not existing this number, only when L0 files increase to certain number, or pending compaction bytes more than certain threshold, we schedule compactions based on options.max_background_compactions.

The watermarks are calculated based on slowdown thresholds.

Test Plan:
Add new test cases in column_family_test.
Adding more unit tests.

Reviewers: IslamAbdelRahman, yhchiang, kradhakrishnan, rven, anthony

Reviewed By: anthony

Subscribers: leveldb, dhruba, yoshinorim

Differential Revision: https://reviews.facebook.net/D53409
main
Venkatesh Radhakrishnan 9 years ago committed by sdong
parent 6ee38bb15c
commit 3b2a1ddd2e
  1. 49
      db/column_family.cc
  2. 142
      db/column_family_test.cc
  3. 117
      db/db_compaction_test.cc
  4. 28
      db/db_impl.cc
  5. 4
      db/db_impl.h
  6. 12
      db/write_controller.cc
  7. 18
      db/write_controller.h
  8. 11
      include/rocksdb/options.h
  9. 5
      util/options.cc
  10. 3
      util/options_helper.h
  11. 45
      util/options_test.cc

@ -239,6 +239,17 @@ ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
result.level0_slowdown_writes_trigger, result.level0_slowdown_writes_trigger,
result.level0_file_num_compaction_trigger); result.level0_file_num_compaction_trigger);
} }
if (result.soft_pending_compaction_bytes_limit == 0) {
result.soft_pending_compaction_bytes_limit =
result.hard_pending_compaction_bytes_limit;
} else if (result.hard_pending_compaction_bytes_limit > 0 &&
result.soft_pending_compaction_bytes_limit >
result.hard_pending_compaction_bytes_limit) {
result.soft_pending_compaction_bytes_limit =
result.hard_pending_compaction_bytes_limit;
}
if (result.level_compaction_dynamic_level_bytes) { if (result.level_compaction_dynamic_level_bytes) {
if (result.compaction_style != kCompactionStyleLevel || if (result.compaction_style != kCompactionStyleLevel ||
db_options.db_paths.size() > 1U) { db_options.db_paths.size() > 1U) {
@ -513,6 +524,21 @@ std::unique_ptr<WriteControllerToken> SetupDelay(
} }
return write_controller->GetDelayToken(write_rate); return write_controller->GetDelayToken(write_rate);
} }
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
int level0_slowdown_writes_trigger) {
// SanitizeOptions() ensures it.
assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);
// 1/4 of the way between L0 compaction trigger threshold and slowdown
// condition.
// Or twice as compaction trigger, if it is smaller.
return std::min(level0_file_num_compaction_trigger * 2,
level0_file_num_compaction_trigger +
(level0_slowdown_writes_trigger -
level0_file_num_compaction_trigger) /
4);
}
} // namespace } // namespace
void ColumnFamilyData::RecalculateWriteStallConditions( void ColumnFamilyData::RecalculateWriteStallConditions(
@ -598,6 +624,29 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
"bytes %" PRIu64 " rate %" PRIu64, "bytes %" PRIu64 " rate %" PRIu64,
name_.c_str(), vstorage->estimated_compaction_needed_bytes(), name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
write_controller->delayed_write_rate()); write_controller->delayed_write_rate());
} else if (vstorage->l0_delay_trigger_count() >=
GetL0ThresholdSpeedupCompaction(
mutable_cf_options.level0_file_num_compaction_trigger,
mutable_cf_options.level0_slowdown_writes_trigger)) {
write_controller_token_ = write_controller->GetCompactionPressureToken();
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Increasing compaction threads because we have %d level-0 "
"files ",
name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (vstorage->estimated_compaction_needed_bytes() >=
mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
// Increase compaction threads if bytes needed for compaction exceeds
// 1/4 of threshold for slowing down.
// If soft pending compaction byte limit is not set, always speed up
// compaction.
write_controller_token_ = write_controller->GetCompactionPressureToken();
if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Increasing compaction threads because of estimated pending "
"compaction "
"bytes %" PRIu64,
name_.c_str(), vstorage->estimated_compaction_needed_bytes());
}
} else { } else {
write_controller_token_.reset(); write_controller_token_.reset();
} }

@ -2137,6 +2137,9 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) {
TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
const uint64_t kBaseRate = 810000u; const uint64_t kBaseRate = 810000u;
db_options_.delayed_write_rate = kBaseRate; db_options_.delayed_write_rate = kBaseRate;
db_options_.base_background_compactions = 2;
db_options_.max_background_compactions = 6;
Open({"default"}); Open({"default"});
ColumnFamilyData* cfd = ColumnFamilyData* cfd =
static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd(); static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
@ -2162,6 +2165,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(400); vstorage->TEST_set_estimated_compaction_needed_bytes(400);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2169,6 +2173,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2, ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(500); vstorage->TEST_set_estimated_compaction_needed_bytes(500);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2224,6 +2229,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(3001); vstorage->TEST_set_estimated_compaction_needed_bytes(3001);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2248,6 +2254,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2, ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(101); vstorage->set_l0_delay_trigger_count(101);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2320,6 +2327,73 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
} }
TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) {
db_options_.base_background_compactions = 2;
db_options_.max_background_compactions = 6;
Open({"default"});
ColumnFamilyData* cfd =
static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
VersionStorageInfo* vstorage = cfd->current()->storage_info();
MutableCFOptions mutable_cf_options(
Options(db_options_, column_family_options_),
ImmutableCFOptions(Options(db_options_, column_family_options_)));
// Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8
mutable_cf_options.level0_file_num_compaction_trigger = 4;
mutable_cf_options.level0_slowdown_writes_trigger = 36;
mutable_cf_options.level0_stop_writes_trigger = 50;
// Speedup threshold = 200 / 4 = 50
mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
vstorage->TEST_set_estimated_compaction_needed_bytes(40);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(50);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(300);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(45);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(7);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(9);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(6);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
// Speed up threshold = min(4 * 2, 4 + (12 - 4)/4) = 6
mutable_cf_options.level0_file_num_compaction_trigger = 4;
mutable_cf_options.level0_slowdown_writes_trigger = 16;
mutable_cf_options.level0_stop_writes_trigger = 30;
vstorage->set_l0_delay_trigger_count(5);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(7);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(3);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
}
TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) { TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) {
const uint64_t kBaseRate = 810000u; const uint64_t kBaseRate = 810000u;
db_options_.delayed_write_rate = kBaseRate; db_options_.delayed_write_rate = kBaseRate;
@ -2401,6 +2475,74 @@ TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) {
ASSERT_EQ(kBaseRate / 1.2, ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
} }
TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
db_options_.base_background_compactions = 2;
db_options_.max_background_compactions = 6;
column_family_options_.soft_pending_compaction_bytes_limit = 200;
column_family_options_.hard_pending_compaction_bytes_limit = 2000;
Open();
CreateColumnFamilies({"one"});
ColumnFamilyData* cfd =
static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
VersionStorageInfo* vstorage = cfd->current()->storage_info();
ColumnFamilyData* cfd1 =
static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
VersionStorageInfo* vstorage1 = cfd1->current()->storage_info();
MutableCFOptions mutable_cf_options(
Options(db_options_, column_family_options_),
ImmutableCFOptions(Options(db_options_, column_family_options_)));
// Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8
mutable_cf_options.level0_file_num_compaction_trigger = 4;
mutable_cf_options.level0_slowdown_writes_trigger = 36;
mutable_cf_options.level0_stop_writes_trigger = 30;
// Speedup threshold = 200 / 4 = 50
mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
MutableCFOptions mutable_cf_options1 = mutable_cf_options;
mutable_cf_options1.level0_slowdown_writes_trigger = 16;
vstorage->TEST_set_estimated_compaction_needed_bytes(40);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(60);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage1->TEST_set_estimated_compaction_needed_bytes(30);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage1->TEST_set_estimated_compaction_needed_bytes(70);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(20);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage1->TEST_set_estimated_compaction_needed_bytes(3);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(9);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage1->set_l0_delay_trigger_count(2);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(0);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -533,6 +533,104 @@ TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1); ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
} }
TEST_F(DBCompactionTest, BGCompactionsAllowed) {
// Create several column families. Make compaction triggers in all of them
// and see number of compactions scheduled to be less than allowed.
const int kNumKeysPerFile = 100;
Options options;
options.write_buffer_size = 110 << 10; // 110KB
options.arena_block_size = 4 << 10;
options.num_levels = 3;
// Should speed up compaction when there are 4 files.
options.level0_file_num_compaction_trigger = 2;
options.level0_slowdown_writes_trigger = 20;
options.soft_pending_compaction_bytes_limit = 1 << 30; // Infinitely large
options.base_background_compactions = 1;
options.max_background_compactions = 3;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
options = CurrentOptions(options);
// Block all threads in thread pool.
const size_t kTotalTasks = 4;
env_->SetBackgroundThreads(4, Env::LOW);
test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];
for (size_t i = 0; i < kTotalTasks; i++) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_tasks[i], Env::Priority::LOW);
sleeping_tasks[i].WaitUntilSleeping();
}
CreateAndReopenWithCF({"one", "two", "three"}, options);
Random rnd(301);
for (int cf = 0; cf < 4; cf++) {
for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
for (int i = 0; i < kNumKeysPerFile; i++) {
ASSERT_OK(Put(cf, Key(i), ""));
}
// put extra key to trigger flush
ASSERT_OK(Put(cf, "", ""));
dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
}
}
// Now all column families qualify compaction but only one should be
// scheduled, because no column family hits speed up condition.
ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
// Create two more files for one column family, which triggers speed up
// condition, three compactions will be scheduled.
for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
for (int i = 0; i < kNumKeysPerFile; i++) {
ASSERT_OK(Put(2, Key(i), ""));
}
// put extra key to trigger flush
ASSERT_OK(Put(2, "", ""));
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
NumTableFilesAtLevel(0, 2));
}
ASSERT_EQ(3, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
// Unblock all threads to unblock all compactions.
for (size_t i = 0; i < kTotalTasks; i++) {
sleeping_tasks[i].WakeUp();
sleeping_tasks[i].WaitUntilDone();
}
dbfull()->TEST_WaitForCompact();
// Verify number of compactions allowed will come back to 1.
for (size_t i = 0; i < kTotalTasks; i++) {
sleeping_tasks[i].Reset();
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_tasks[i], Env::Priority::LOW);
sleeping_tasks[i].WaitUntilSleeping();
}
for (int cf = 0; cf < 4; cf++) {
for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
for (int i = 0; i < kNumKeysPerFile; i++) {
ASSERT_OK(Put(cf, Key(i), ""));
}
// put extra key to trigger flush
ASSERT_OK(Put(cf, "", ""));
dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
}
}
// Now all column families qualify compaction but only one should be
// scheduled, because no column family hits speed up condition.
ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
for (size_t i = 0; i < kTotalTasks; i++) {
sleeping_tasks[i].WakeUp();
sleeping_tasks[i].WaitUntilDone();
}
}
TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) { TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) {
Options options; Options options;
options.write_buffer_size = 100000000; // Large write buffer options.write_buffer_size = 100000000; // Large write buffer
@ -2198,6 +2296,25 @@ TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) {
Destroy(options); Destroy(options);
} }
TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) {
Options options = CurrentOptions();
options.max_background_compactions = 5;
options.soft_pending_compaction_bytes_limit = 0;
options.hard_pending_compaction_bytes_limit = 100;
options.create_if_missing = true;
DestroyAndReopen(options);
ASSERT_EQ(5, db_->GetOptions().base_background_compactions);
ASSERT_EQ(100, db_->GetOptions().soft_pending_compaction_bytes_limit);
options.base_background_compactions = 4;
options.max_background_compactions = 3;
options.soft_pending_compaction_bytes_limit = 200;
options.hard_pending_compaction_bytes_limit = 150;
DestroyAndReopen(options);
ASSERT_EQ(3, db_->GetOptions().base_background_compactions);
ASSERT_EQ(150, db_->GetOptions().soft_pending_compaction_bytes_limit);
}
// This tests for a bug that could cause two level0 compactions running // This tests for a bug that could cause two level0 compactions running
// concurrently // concurrently
// TODO(aekmekji): Make sure that the reason this fails when run with // TODO(aekmekji): Make sure that the reason this fails when run with

@ -146,6 +146,12 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.info_log = nullptr; result.info_log = nullptr;
} }
} }
if (result.base_background_compactions == -1) {
result.base_background_compactions = result.max_background_compactions;
}
if (result.base_background_compactions > result.max_background_compactions) {
result.base_background_compactions = result.max_background_compactions;
}
result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions, result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions,
Env::Priority::LOW); Env::Priority::LOW);
result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes, result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes,
@ -2448,12 +2454,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this); env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
} }
auto bg_compactions_allowed = BGCompactionsAllowed();
// special case -- if max_background_flushes == 0, then schedule flush on a // special case -- if max_background_flushes == 0, then schedule flush on a
// compaction thread // compaction thread
if (db_options_.max_background_flushes == 0) { if (db_options_.max_background_flushes == 0) {
while (unscheduled_flushes_ > 0 && while (unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ + bg_compaction_scheduled_ < bg_flush_scheduled_ + bg_compaction_scheduled_ <
db_options_.max_background_compactions) { bg_compactions_allowed) {
unscheduled_flushes_--; unscheduled_flushes_--;
bg_flush_scheduled_++; bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this); env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
@ -2466,7 +2474,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
return; return;
} }
while (bg_compaction_scheduled_ < db_options_.max_background_compactions && while (bg_compaction_scheduled_ < bg_compactions_allowed &&
unscheduled_compactions_ > 0) { unscheduled_compactions_ > 0) {
CompactionArg* ca = new CompactionArg; CompactionArg* ca = new CompactionArg;
ca->db = this; ca->db = this;
@ -2478,6 +2486,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
} }
} }
int DBImpl::BGCompactionsAllowed() const {
if (write_controller_.NeedSpeedupCompaction()) {
return db_options_.max_background_compactions;
} else {
return db_options_.base_background_compactions;
}
}
void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) { void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
assert(!cfd->pending_compaction()); assert(!cfd->pending_compaction());
cfd->Ref(); cfd->Ref();
@ -2590,10 +2606,10 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
LogToBuffer( LogToBuffer(
log_buffer, log_buffer,
"Calling FlushMemTableToOutputFile with column " "Calling FlushMemTableToOutputFile with column "
"family [%s], flush slots available %d, compaction slots available %d", "family [%s], flush slots available %d, compaction slots allowed %d, "
cfd->GetName().c_str(), "compaction slots scheduled %d",
db_options_.max_background_flushes - bg_flush_scheduled_, cfd->GetName().c_str(), db_options_.max_background_flushes,
db_options_.max_background_compactions - bg_compaction_scheduled_); bg_flush_scheduled_, BGCompactionsAllowed() - bg_compaction_scheduled_);
status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress, status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
job_context, log_buffer); job_context, log_buffer);
if (cfd->Unref()) { if (cfd->Unref()) {

@ -347,6 +347,10 @@ class DBImpl : public DB {
#endif // NDEBUG #endif // NDEBUG
// Return maximum background compaction alowed to be scheduled based on
// compaction status.
int BGCompactionsAllowed() const;
// Returns the list of live files in 'live' and the list // Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'candidate_files'. // of all files in the filesystem in 'candidate_files'.
// If force == false and the last call was less than // If force == false and the last call was less than

@ -26,6 +26,13 @@ std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this)); return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
} }
std::unique_ptr<WriteControllerToken>
WriteController::GetCompactionPressureToken() {
++total_compaction_pressure_;
return std::unique_ptr<WriteControllerToken>(
new CompactionPressureToken(this));
}
bool WriteController::IsStopped() const { return total_stopped_ > 0; } bool WriteController::IsStopped() const { return total_stopped_ > 0; }
// This is inside DB mutex, so we can't sleep and need to minimize // This is inside DB mutex, so we can't sleep and need to minimize
// frequency to get time. // frequency to get time.
@ -106,4 +113,9 @@ DelayWriteToken::~DelayWriteToken() {
assert(controller_->total_delayed_ >= 0); assert(controller_->total_delayed_ >= 0);
} }
CompactionPressureToken::~CompactionPressureToken() {
controller_->total_compaction_pressure_--;
assert(controller_->total_compaction_pressure_ >= 0);
}
} // namespace rocksdb } // namespace rocksdb

@ -23,6 +23,7 @@ class WriteController {
explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u) explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u)
: total_stopped_(0), : total_stopped_(0),
total_delayed_(0), total_delayed_(0),
total_compaction_pressure_(0),
bytes_left_(0), bytes_left_(0),
last_refill_time_(0) { last_refill_time_(0) {
set_delayed_write_rate(_delayed_write_rate); set_delayed_write_rate(_delayed_write_rate);
@ -38,10 +39,16 @@ class WriteController {
// which returns number of microseconds to sleep. // which returns number of microseconds to sleep.
std::unique_ptr<WriteControllerToken> GetDelayToken( std::unique_ptr<WriteControllerToken> GetDelayToken(
uint64_t delayed_write_rate); uint64_t delayed_write_rate);
// When an actor (column family) requests a moderate token, compaction
// threads will be increased
std::unique_ptr<WriteControllerToken> GetCompactionPressureToken();
// these two metods are querying the state of the WriteController // these three metods are querying the state of the WriteController
bool IsStopped() const; bool IsStopped() const;
bool NeedsDelay() const { return total_delayed_ > 0; } bool NeedsDelay() const { return total_delayed_ > 0; }
bool NeedSpeedupCompaction() const {
return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0;
}
// return how many microseconds the caller needs to sleep after the call // return how many microseconds the caller needs to sleep after the call
// num_bytes: how many number of bytes to put into the DB. // num_bytes: how many number of bytes to put into the DB.
// Prerequisite: DB mutex held. // Prerequisite: DB mutex held.
@ -59,9 +66,11 @@ class WriteController {
friend class WriteControllerToken; friend class WriteControllerToken;
friend class StopWriteToken; friend class StopWriteToken;
friend class DelayWriteToken; friend class DelayWriteToken;
friend class CompactionPressureToken;
int total_stopped_; int total_stopped_;
int total_delayed_; int total_delayed_;
int total_compaction_pressure_;
uint64_t bytes_left_; uint64_t bytes_left_;
uint64_t last_refill_time_; uint64_t last_refill_time_;
uint64_t delayed_write_rate_; uint64_t delayed_write_rate_;
@ -96,4 +105,11 @@ class DelayWriteToken : public WriteControllerToken {
virtual ~DelayWriteToken(); virtual ~DelayWriteToken();
}; };
class CompactionPressureToken : public WriteControllerToken {
public:
explicit CompactionPressureToken(WriteController* controller)
: WriteControllerToken(controller) {}
virtual ~CompactionPressureToken();
};
} // namespace rocksdb } // namespace rocksdb

@ -933,8 +933,19 @@ struct DBOptions {
// regardless of this setting // regardless of this setting
uint64_t delete_obsolete_files_period_micros; uint64_t delete_obsolete_files_period_micros;
// Suggested number of concurrent background compaction jobs, submitted to
// the default LOW priority thread pool.
//
// Default: max_background_compactions
int base_background_compactions;
// Maximum number of concurrent background compaction jobs, submitted to // Maximum number of concurrent background compaction jobs, submitted to
// the default LOW priority thread pool. // the default LOW priority thread pool.
// We first try to schedule compactions based on
// `base_background_compactions`. If the compaction cannot catch up , we
// will increase number of compaction threads up to
// `max_background_compactions`.
//
// If you're increasing this, also consider increasing number of threads in // If you're increasing this, also consider increasing number of threads in
// LOW priority thread pool. For more information, see // LOW priority thread pool. For more information, see
// Env::SetBackgroundThreads // Env::SetBackgroundThreads

@ -229,6 +229,7 @@ DBOptions::DBOptions()
db_log_dir(""), db_log_dir(""),
wal_dir(""), wal_dir(""),
delete_obsolete_files_period_micros(6ULL * 60 * 60 * 1000000), delete_obsolete_files_period_micros(6ULL * 60 * 60 * 1000000),
base_background_compactions(-1),
max_background_compactions(1), max_background_compactions(1),
max_subcompactions(1), max_subcompactions(1),
max_background_flushes(1), max_background_flushes(1),
@ -295,6 +296,7 @@ DBOptions::DBOptions(const Options& options)
wal_dir(options.wal_dir), wal_dir(options.wal_dir),
delete_obsolete_files_period_micros( delete_obsolete_files_period_micros(
options.delete_obsolete_files_period_micros), options.delete_obsolete_files_period_micros),
base_background_compactions(options.base_background_compactions),
max_background_compactions(options.max_background_compactions), max_background_compactions(options.max_background_compactions),
max_subcompactions(options.max_subcompactions), max_subcompactions(options.max_subcompactions),
max_background_flushes(options.max_background_flushes), max_background_flushes(options.max_background_flushes),
@ -383,6 +385,8 @@ void DBOptions::Dump(Logger* log) const {
table_cache_numshardbits); table_cache_numshardbits);
Header(log, " Options.delete_obsolete_files_period_micros: %" PRIu64, Header(log, " Options.delete_obsolete_files_period_micros: %" PRIu64,
delete_obsolete_files_period_micros); delete_obsolete_files_period_micros);
Header(log, " Options.base_background_compactions: %d",
base_background_compactions);
Header(log, " Options.max_background_compactions: %d", Header(log, " Options.max_background_compactions: %d",
max_background_compactions); max_background_compactions);
Header(log, " Options.max_subcompactions: %" PRIu32, Header(log, " Options.max_subcompactions: %" PRIu32,
@ -652,6 +656,7 @@ Options::PrepareForBulkLoad()
// to L1. This is helpful so that all files that are // to L1. This is helpful so that all files that are
// input to the manual compaction are all at L0. // input to the manual compaction are all at L0.
max_background_compactions = 2; max_background_compactions = 2;
base_background_compactions = 2;
// The compaction would create large files in L1. // The compaction would create large files in L1.
target_file_size_base = 256 * 1024 * 1024; target_file_size_base = 256 * 1024 * 1024;

@ -219,6 +219,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"max_background_compactions", {"max_background_compactions",
{offsetof(struct DBOptions, max_background_compactions), OptionType::kInt, {offsetof(struct DBOptions, max_background_compactions), OptionType::kInt,
OptionVerificationType::kNormal}}, OptionVerificationType::kNormal}},
{"base_background_compactions",
{offsetof(struct DBOptions, base_background_compactions), OptionType::kInt,
OptionVerificationType::kNormal}},
{"max_background_flushes", {"max_background_flushes",
{offsetof(struct DBOptions, max_background_flushes), OptionType::kInt, {offsetof(struct DBOptions, max_background_flushes), OptionType::kInt,
OptionVerificationType::kNormal}}, OptionVerificationType::kNormal}},

@ -1669,50 +1669,7 @@ TEST_F(OptionsParserTest, DBOptionsAllFieldsSettable) {
"table_cache_numshardbits=28;" "table_cache_numshardbits=28;"
"max_open_files=72;" "max_open_files=72;"
"max_file_opening_threads=35;" "max_file_opening_threads=35;"
"max_background_compactions=33;" "base_background_compactions=3;"
"use_fsync=true;"
"use_adaptive_mutex=false;"
"max_total_wal_size=4295005604;"
"compaction_readahead_size=0;"
"new_table_reader_for_compaction_inputs=false;"
"keep_log_file_num=4890;"
"skip_stats_update_on_db_open=false;"
"max_manifest_file_size=4295009941;"
"db_log_dir=path/to/db_log_dir;"
"skip_log_error_on_recovery=true;"
"writable_file_max_buffer_size=1048576;"
"paranoid_checks=true;"
"is_fd_close_on_exec=false;"
"bytes_per_sync=4295013613;"
"enable_thread_tracking=false;"
"disable_data_sync=false;"
"recycle_log_file_num=0;"
"disableDataSync=false;"
"create_missing_column_families=true;"
"log_file_time_to_roll=3097;"
"max_background_flushes=35;"
"create_if_missing=false;"
"error_if_exists=true;"
"allow_os_buffer=false;"
"delayed_write_rate=4294976214;"
"manifest_preallocation_size=1222;"
"allow_mmap_writes=false;"
"stats_dump_period_sec=70127;"
"allow_fallocate=true;"
"allow_mmap_reads=false;"
"max_log_file_size=4607;"
"random_access_max_buffer_size=1048576;"
"advise_random_on_open=true;"
"wal_bytes_per_sync=4295048118;"
"delete_obsolete_files_period_micros=4294967758;"
"WAL_ttl_seconds=4295008036;"
"WAL_size_limit_MB=4295036161;"
"wal_dir=path/to/wal_dir;"
"db_write_buffer_size=2587;"
"max_subcompactions=64330;"
"table_cache_numshardbits=28;"
"max_open_files=72;"
"max_file_opening_threads=35;"
"max_background_compactions=33;" "max_background_compactions=33;"
"use_fsync=true;" "use_fsync=true;"
"use_adaptive_mutex=false;" "use_adaptive_mutex=false;"

Loading…
Cancel
Save