Introduce max_background_jobs mutable option

Summary:
- `max_background_flushes` and `max_background_compactions` are still supported for backwards compatibility
- `base_background_compactions` is completely deprecated. Now we just throttle to one background compaction when there's no pressure.
- `max_background_jobs` is added to automatically partition the concurrent background jobs into flushes vs compactions. Currently it's very simple as we just allocate one-fourth of the jobs to flushes, and the remaining can be used for compactions.
- The test cases that set `base_background_compactions > 1` needed to be updated. I just grab the pressure token such that the desired number of compactions can be scheduled.
Closes https://github.com/facebook/rocksdb/pull/2205

Differential Revision: D4937461

Pulled By: ajkr

fbshipit-source-id: df52cbbd497e13bbc9a60560a5ac2a2526b3f1f9
main
Andrew Kryczka 8 years ago committed by Facebook Github Bot
parent 5a25304627
commit bb01c1880c
  1. 1
      HISTORY.md
  2. 51
      db/column_family_test.cc
  3. 7
      db/db_compaction_test.cc
  4. 17
      db/db_impl.h
  5. 49
      db/db_impl_compaction_flush.cc
  6. 7
      db/db_impl_debug.cc
  7. 16
      db/db_impl_open.cc
  8. 34
      db/db_options_test.cc
  9. 4
      db/db_test2.cc
  10. 1
      db/db_test_util.cc
  11. 52
      include/rocksdb/options.h
  12. 12
      options/db_options.cc
  13. 1
      options/db_options.h
  14. 3
      options/options.cc
  15. 1
      options/options_helper.cc
  16. 4
      options/options_helper.h
  17. 1
      options/options_settable_test.cc
  18. 1
      options/options_test.cc
  19. 6
      tools/db_bench_tool.cc
  20. 4
      tools/db_bench_tool_test.cc

@ -2,6 +2,7 @@
## Unreleased
### Public API Change
* Scheduling flushes and compactions in the same thread pool is no longer supported by setting `max_background_flushes=0`. Instead, users can achieve this by configuring their high-pri thread pool to have zero threads.
* Replace `Options::max_background_flushes`, `Options::max_background_compactions`, and `Options::base_background_compactions` all with `Options::max_background_jobs`, which automatically decides how many threads to allocate towards flush/compaction.
* options.delayed_write_rate by default take the value of options.rate_limiter rate.
### New Features

@ -1380,7 +1380,6 @@ TEST_F(ColumnFamilyTest, AutomaticAndManualCompactions) {
ColumnFamilyOptions default_cf, one, two;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.max_background_compactions = 3;
db_options_.base_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
@ -1404,6 +1403,9 @@ TEST_F(ColumnFamilyTest, AutomaticAndManualCompactions) {
two.write_buffer_size = 100000;
Reopen({default_cf, one, two});
// make sure all background compaction jobs can be scheduled
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
bool cf_1_1 = true;
rocksdb::SyncPoint::GetInstance()->LoadDependency(
@ -1472,7 +1474,6 @@ TEST_F(ColumnFamilyTest, ManualAndAutomaticCompactions) {
ColumnFamilyOptions default_cf, one, two;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.max_background_compactions = 3;
db_options_.base_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
@ -1496,6 +1497,9 @@ TEST_F(ColumnFamilyTest, ManualAndAutomaticCompactions) {
two.write_buffer_size = 100000;
Reopen({default_cf, one, two});
// make sure all background compaction jobs can be scheduled
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
// SETUP column family "one" -- universal style
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
@ -1567,7 +1571,6 @@ TEST_F(ColumnFamilyTest, SameCFManualManualCompactions) {
ColumnFamilyOptions default_cf, one;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.max_background_compactions = 3;
db_options_.base_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
@ -1586,6 +1589,9 @@ TEST_F(ColumnFamilyTest, SameCFManualManualCompactions) {
one.write_buffer_size = 120000;
Reopen({default_cf, one});
// make sure all background compaction jobs can be scheduled
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
// SETUP column family "one" -- universal style
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
@ -1665,7 +1671,6 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactions) {
ColumnFamilyOptions default_cf, one;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.max_background_compactions = 3;
db_options_.base_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
@ -1684,6 +1689,9 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactions) {
one.write_buffer_size = 120000;
Reopen({default_cf, one});
// make sure all background compaction jobs can be scheduled
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
// SETUP column family "one" -- universal style
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
@ -1754,7 +1762,6 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) {
ColumnFamilyOptions default_cf, one;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.max_background_compactions = 3;
db_options_.base_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
@ -1773,6 +1780,9 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) {
one.write_buffer_size = 120000;
Reopen({default_cf, one});
// make sure all background compaction jobs can be scheduled
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
// SETUP column family "one" -- level style
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
@ -1850,7 +1860,6 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticConflict) {
ColumnFamilyOptions default_cf, one;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.max_background_compactions = 3;
db_options_.base_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
@ -1869,6 +1878,9 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticConflict) {
one.write_buffer_size = 120000;
Reopen({default_cf, one});
// make sure all background compaction jobs can be scheduled
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
// SETUP column family "one" -- universal style
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
@ -1969,7 +1981,6 @@ TEST_F(ColumnFamilyTest, SameCFAutomaticManualCompactions) {
ColumnFamilyOptions default_cf, one;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.max_background_compactions = 3;
db_options_.base_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
@ -1988,6 +1999,9 @@ TEST_F(ColumnFamilyTest, SameCFAutomaticManualCompactions) {
one.write_buffer_size = 120000;
Reopen({default_cf, one});
// make sure all background compaction jobs can be scheduled
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
bool cf_1_1 = true;
bool cf_1_2 = true;
@ -2496,7 +2510,6 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) {
TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
const uint64_t kBaseRate = 800000u;
db_options_.delayed_write_rate = kBaseRate;
db_options_.base_background_compactions = 2;
db_options_.max_background_compactions = 6;
Open({"default"});
@ -2677,7 +2690,6 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
}
TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) {
db_options_.base_background_compactions = 2;
db_options_.max_background_compactions = 6;
Open({"default"});
ColumnFamilyData* cfd =
@ -2697,7 +2709,7 @@ TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) {
vstorage->TEST_set_estimated_compaction_needed_bytes(40);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(50);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2709,11 +2721,11 @@ TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) {
vstorage->TEST_set_estimated_compaction_needed_bytes(45);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(7);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(9);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2721,7 +2733,7 @@ TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) {
vstorage->set_l0_delay_trigger_count(6);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
// Speed up threshold = min(4 * 2, 4 + (12 - 4)/4) = 6
mutable_cf_options.level0_file_num_compaction_trigger = 4;
@ -2730,7 +2742,7 @@ TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) {
vstorage->set_l0_delay_trigger_count(5);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(7);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2738,7 +2750,7 @@ TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) {
vstorage->set_l0_delay_trigger_count(3);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
}
TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) {
@ -2817,7 +2829,6 @@ TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) {
}
TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
db_options_.base_background_compactions = 2;
db_options_.max_background_compactions = 6;
column_family_options_.soft_pending_compaction_bytes_limit = 200;
column_family_options_.hard_pending_compaction_bytes_limit = 2000;
@ -2845,11 +2856,11 @@ TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
vstorage->TEST_set_estimated_compaction_needed_bytes(40);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(60);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
@ -2867,7 +2878,7 @@ TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
vstorage1->TEST_set_estimated_compaction_needed_bytes(3);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(9);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2879,7 +2890,7 @@ TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
vstorage->set_l0_delay_trigger_count(0);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
}
#ifndef ROCKSDB_LITE

@ -521,7 +521,6 @@ TEST_F(DBCompactionTest, BGCompactionsAllowed) {
options.level0_file_num_compaction_trigger = 2;
options.level0_slowdown_writes_trigger = 20;
options.soft_pending_compaction_bytes_limit = 1 << 30; // Infinitely large
options.base_background_compactions = 1;
options.max_background_compactions = 3;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
@ -1230,6 +1229,9 @@ TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) {
options.max_background_compactions = 3;
DestroyAndReopen(options);
// make sure all background compaction jobs can be scheduled
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
int32_t value_size = 10 * 1024; // 10 KB
// Add 2 non-overlapping files
@ -2422,15 +2424,12 @@ TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) {
options.hard_pending_compaction_bytes_limit = 100;
options.create_if_missing = true;
DestroyAndReopen(options);
ASSERT_EQ(5, db_->GetOptions().base_background_compactions);
ASSERT_EQ(100, db_->GetOptions().soft_pending_compaction_bytes_limit);
options.base_background_compactions = 4;
options.max_background_compactions = 3;
options.soft_pending_compaction_bytes_limit = 200;
options.hard_pending_compaction_bytes_limit = 150;
DestroyAndReopen(options);
ASSERT_EQ(3, db_->GetOptions().base_background_compactions);
ASSERT_EQ(150, db_->GetOptions().soft_pending_compaction_bytes_limit);
}

@ -401,12 +401,21 @@ class DBImpl : public DB {
uint64_t TEST_FindMinPrepLogReferencedByMemTable();
int TEST_BGCompactionsAllowed() const;
int TEST_BGFlushesAllowed() const;
#endif // NDEBUG
// Return maximum background compaction allowed to be scheduled based on
// compaction status.
int BGCompactionsAllowed() const;
struct BGJobLimits {
int max_flushes;
int max_compactions;
};
// Returns maximum background flushes and compactions allowed to be scheduled
BGJobLimits GetBGJobLimits() const;
// Need a static version that can be called during SanitizeOptions().
static BGJobLimits GetBGJobLimits(int max_background_flushes,
int max_background_compactions,
int max_background_jobs,
bool parallelize_compactions);
// move logs pending closing from job_context to the DB queue and
// schedule a purge
@ -1186,7 +1195,7 @@ extern DBOptions SanitizeOptions(const std::string& db, const DBOptions& src);
extern CompressionType GetCompressionFlush(
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options);
// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {

@ -983,23 +983,22 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
// DB is being deleted; no more background compactions
return;
}
auto bg_job_limits = GetBGJobLimits();
bool is_flush_pool_empty =
env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ < immutable_db_options_.max_background_flushes) {
bg_flush_scheduled_ < bg_job_limits.max_flushes) {
unscheduled_flushes_--;
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
}
auto bg_compactions_allowed = BGCompactionsAllowed();
// special case -- if high-pri (flush) thread pool is empty, then schedule
// flushes in low-pri (compaction) thread pool.
if (is_flush_pool_empty) {
while (unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ + bg_compaction_scheduled_ <
bg_compactions_allowed) {
bg_job_limits.max_flushes) {
unscheduled_flushes_--;
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
@ -1017,7 +1016,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
return;
}
while (bg_compaction_scheduled_ < bg_compactions_allowed &&
while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
unscheduled_compactions_ > 0) {
CompactionArg* ca = new CompactionArg;
ca->db = this;
@ -1029,13 +1028,35 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
}
}
int DBImpl::BGCompactionsAllowed() const {
DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
mutex_.AssertHeld();
if (write_controller_.NeedSpeedupCompaction()) {
return mutable_db_options_.max_background_compactions;
return GetBGJobLimits(immutable_db_options_.max_background_flushes,
mutable_db_options_.max_background_compactions,
mutable_db_options_.max_background_jobs,
write_controller_.NeedSpeedupCompaction());
}
DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
int max_background_compactions,
int max_background_jobs,
bool parallelize_compactions) {
BGJobLimits res;
if (max_background_flushes == -1 && max_background_compactions == -1) {
// for our first stab implementing max_background_jobs, simply allocate a
// quarter of the threads to flushes.
res.max_flushes = std::max(1, max_background_jobs / 4);
res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
} else {
return mutable_db_options_.base_background_compactions;
// compatibility code in case users haven't migrated to max_background_jobs,
// which automatically computes flush/compaction limits
res.max_flushes = std::max(1, max_background_flushes);
res.max_compactions = std::max(1, max_background_compactions);
}
if (!parallelize_compactions) {
// throttle background compactions until we deem necessary
res.max_compactions = 1;
}
return res;
}
void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
@ -1157,13 +1178,15 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
if (cfd != nullptr) {
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
auto bg_job_limits = GetBGJobLimits();
ROCKS_LOG_BUFFER(
log_buffer,
"Calling FlushMemTableToOutputFile with column "
"family [%s], flush slots available %d, compaction slots allowed %d, "
"compaction slots scheduled %d",
cfd->GetName().c_str(), immutable_db_options_.max_background_flushes -
bg_flush_scheduled_, BGCompactionsAllowed(), bg_compaction_scheduled_);
"family [%s], flush slots available %d, compaction slots available %d, "
"flush slots scheduled %d, compaction slots scheduled %d",
cfd->GetName().c_str(), bg_job_limits.max_flushes,
bg_job_limits.max_compactions, bg_flush_scheduled_,
bg_compaction_scheduled_);
status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
job_context, log_buffer);
if (cfd->Unref()) {

@ -188,7 +188,12 @@ Status DBImpl::TEST_GetLatestMutableCFOptions(
int DBImpl::TEST_BGCompactionsAllowed() const {
InstrumentedMutexLock l(&mutex_);
return BGCompactionsAllowed();
return GetBGJobLimits().max_compactions;
}
int DBImpl::TEST_BGFlushesAllowed() const {
InstrumentedMutexLock l(&mutex_);
return GetBGJobLimits().max_flushes;
}
} // namespace rocksdb

@ -56,15 +56,13 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.write_buffer_manager.reset(
new WriteBufferManager(result.db_write_buffer_size));
}
if (result.base_background_compactions == -1) {
result.base_background_compactions = result.max_background_compactions;
}
if (result.base_background_compactions > result.max_background_compactions) {
result.base_background_compactions = result.max_background_compactions;
}
result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions,
auto bg_job_limits = DBImpl::GetBGJobLimits(result.max_background_flushes,
result.max_background_compactions,
result.max_background_jobs,
true /* parallelize_compactions */);
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
Env::Priority::LOW);
result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes,
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
Env::Priority::HIGH);
if (result.rate_limiter.get() != nullptr) {
@ -902,7 +900,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
}
return s;
}
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {

@ -255,18 +255,44 @@ TEST_F(DBOptionsTest, SetOptionsMayTriggerCompaction) {
TEST_F(DBOptionsTest, SetBackgroundCompactionThreads) {
Options options;
options.create_if_missing = true;
options.base_background_compactions = 1; // default value
options.max_background_compactions = 1; // default value
options.env = env_;
Reopen(options);
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_OK(dbfull()->SetDBOptions({{"base_background_compactions", "2"},
{"max_background_compactions", "3"}}));
ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_OK(dbfull()->SetDBOptions({{"max_background_compactions", "3"}}));
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
auto stop_token = dbfull()->TEST_write_controler().GetStopToken();
ASSERT_EQ(3, dbfull()->TEST_BGCompactionsAllowed());
}
TEST_F(DBOptionsTest, SetBackgroundJobs) {
Options options;
options.create_if_missing = true;
options.max_background_jobs = 8;
options.env = env_;
Reopen(options);
for (int i = 0; i < 2; ++i) {
if (i > 0) {
options.max_background_jobs = 12;
ASSERT_OK(dbfull()->SetDBOptions(
{{"max_background_jobs",
std::to_string(options.max_background_jobs)}}));
}
ASSERT_EQ(options.max_background_jobs / 4,
dbfull()->TEST_BGFlushesAllowed());
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
auto stop_token = dbfull()->TEST_write_controler().GetStopToken();
ASSERT_EQ(options.max_background_jobs / 4,
dbfull()->TEST_BGFlushesAllowed());
ASSERT_EQ(3 * options.max_background_jobs / 4,
dbfull()->TEST_BGCompactionsAllowed());
}
}
TEST_F(DBOptionsTest, AvoidFlushDuringShutdown) {
Options options;
options.create_if_missing = true;

@ -1082,7 +1082,6 @@ TEST_F(DBTest2, CompressionOptions) {
options.max_bytes_for_level_multiplier = 2;
options.num_levels = 7;
options.max_background_compactions = 1;
options.base_background_compactions = 1;
CompactionCompressionListener* listener =
new CompactionCompressionListener(&options);
@ -1159,6 +1158,9 @@ TEST_F(DBTest2, CompactionStall) {
CompactionStallTestListener* listener = new CompactionStallTestListener();
options.listeners.emplace_back(listener);
DestroyAndReopen(options);
// make sure all background compaction jobs can be scheduled
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
Random rnd(301);

@ -251,7 +251,6 @@ Options DBTestBase::CurrentOptions(
options.target_file_size_base = 2 * 1024 * 1024;
options.max_bytes_for_level_base = 10 * 1024 * 1024;
options.max_open_files = 5000;
options.base_background_compactions = -1;
options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
options.compaction_pri = CompactionPri::kByCompensatedSize;

@ -463,24 +463,27 @@ struct DBOptions {
// regardless of this setting
uint64_t delete_obsolete_files_period_micros = 6ULL * 60 * 60 * 1000000;
// Suggested number of concurrent background compaction jobs, submitted to
// the default LOW priority thread pool.
//
// Default: 1
int base_background_compactions = 1;
// Maximum number of concurrent background jobs (compactions and flushes).
int max_background_jobs = 2;
// NOT SUPPORTED ANYMORE: RocksDB automatically decides this based on the
// value of max_background_jobs. This option is ignored.
int base_background_compactions = -1;
// NOT SUPPORTED ANYMORE: RocksDB automatically decides this based on the
// value of max_background_jobs. For backwards compatibility we will set
// `max_background_jobs = max_background_compactions + max_background_flushes`
// in the case where user sets at least one of `max_background_compactions` or
// `max_background_flushes` (we replace -1 by 1 in case one option is unset).
//
// Maximum number of concurrent background compaction jobs, submitted to
// the default LOW priority thread pool.
// We first try to schedule compactions based on
// `base_background_compactions`. If the compaction cannot catch up , we
// will increase number of compaction threads up to
// `max_background_compactions`.
//
// If you're increasing this, also consider increasing number of threads in
// LOW priority thread pool. For more information, see
// Env::SetBackgroundThreads
// Default: 1
int max_background_compactions = 1;
// Default: -1
int max_background_compactions = -1;
// This value represents the maximum number of threads that will
// concurrently perform a compaction job by breaking it into multiple,
@ -488,22 +491,27 @@ struct DBOptions {
// Default: 1 (i.e. no subcompactions)
uint32_t max_subcompactions = 1;
// Maximum number of concurrent background memtable flush jobs, submitted to
// the HIGH priority thread pool.
// NOT SUPPORTED ANYMORE: RocksDB automatically decides this based on the
// value of max_background_jobs. For backwards compatibility we will set
// `max_background_jobs = max_background_compactions + max_background_flushes`
// in the case where user sets at least one of `max_background_compactions` or
// `max_background_flushes`.
//
// Maximum number of concurrent background memtable flush jobs, submitted by
// default to the HIGH priority thread pool. If the HIGH priority thread pool
// is configured to have zero threads, flush jobs will share the LOW priority
// thread pool with compaction jobs.
//
// By default, all background jobs (major compaction and memtable flush) go
// to the LOW priority pool. If this option is set to a positive number,
// memtable flush jobs will be submitted to the HIGH priority pool.
// It is important when the same Env is shared by multiple db instances.
// Without a separate pool, long running major compaction jobs could
// potentially block memtable flush jobs of other db instances, leading to
// unnecessary Put stalls.
// It is important to use both thread pools when the same Env is shared by
// multiple db instances. Without a separate pool, long running compaction
// jobs could potentially block memtable flush jobs of other db instances,
// leading to unnecessary Put stalls.
//
// If you're increasing this, also consider increasing number of threads in
// HIGH priority thread pool. For more information, see
// Env::SetBackgroundThreads
// Default: 1
int max_background_flushes = 1;
// Default: -1
int max_background_flushes = -1;
// Specify the maximal size of the info log file. If the log file
// is larger than `max_log_file_size`, a new info log file will

@ -222,8 +222,9 @@ void ImmutableDBOptions::Dump(Logger* log) const {
}
MutableDBOptions::MutableDBOptions()
: base_background_compactions(1),
max_background_compactions(1),
: max_background_jobs(2),
base_background_compactions(-1),
max_background_compactions(-1),
avoid_flush_during_shutdown(false),
delayed_write_rate(2 * 1024U * 1024U),
max_total_wal_size(0),
@ -232,7 +233,8 @@ MutableDBOptions::MutableDBOptions()
max_open_files(-1) {}
MutableDBOptions::MutableDBOptions(const DBOptions& options)
: base_background_compactions(options.base_background_compactions),
: max_background_jobs(options.max_background_jobs),
base_background_compactions(options.base_background_compactions),
max_background_compactions(options.max_background_compactions),
avoid_flush_during_shutdown(options.avoid_flush_during_shutdown),
delayed_write_rate(options.delayed_write_rate),
@ -243,8 +245,8 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options)
max_open_files(options.max_open_files) {}
void MutableDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log, " Options.base_background_compactions: %d",
base_background_compactions);
ROCKS_LOG_HEADER(log, " Options.max_background_jobs: %d",
max_background_jobs);
ROCKS_LOG_HEADER(log, " Options.max_background_compactions: %d",
max_background_compactions);
ROCKS_LOG_HEADER(log, " Options.avoid_flush_during_shutdown: %d",

@ -88,6 +88,7 @@ struct MutableDBOptions {
void Dump(Logger* log) const;
int max_background_jobs;
int base_background_compactions;
int max_background_compactions;
bool avoid_flush_during_shutdown;

@ -139,6 +139,7 @@ DBOptions::DBOptions(const Options& options)
wal_dir(options.wal_dir),
delete_obsolete_files_period_micros(
options.delete_obsolete_files_period_micros),
max_background_jobs(options.max_background_jobs),
base_background_compactions(options.base_background_compactions),
max_background_compactions(options.max_background_compactions),
max_subcompactions(options.max_subcompactions),
@ -455,7 +456,6 @@ Options::PrepareForBulkLoad()
// to L1. This is helpful so that all files that are
// input to the manual compaction are all at L0.
max_background_compactions = 2;
base_background_compactions = 2;
// The compaction would create large files in L1.
target_file_size_base = 256 * 1024 * 1024;
@ -491,7 +491,6 @@ DBOptions* DBOptions::OldDefaults(int rocksdb_major_version,
delayed_write_rate = 16 * 1024U * 1024U;
}
max_open_files = 5000;
base_background_compactions = -1;
wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
return this;
}

@ -52,6 +52,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
options.wal_dir = immutable_db_options.wal_dir;
options.delete_obsolete_files_period_micros =
mutable_db_options.delete_obsolete_files_period_micros;
options.max_background_jobs = mutable_db_options.max_background_jobs;
options.base_background_compactions =
mutable_db_options.base_background_compactions;
options.max_background_compactions =

@ -223,6 +223,10 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"use_fsync",
{offsetof(struct DBOptions, use_fsync), OptionType::kBoolean,
OptionVerificationType::kNormal, false, 0}},
{"max_background_jobs",
{offsetof(struct DBOptions, max_background_jobs), OptionType::kInt,
OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, max_background_jobs)}},
{"max_background_compactions",
{offsetof(struct DBOptions, max_background_compactions), OptionType::kInt,
OptionVerificationType::kNormal, true,

@ -246,6 +246,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"table_cache_numshardbits=28;"
"max_open_files=72;"
"max_file_opening_threads=35;"
"max_background_jobs=8;"
"base_background_compactions=3;"
"max_background_compactions=33;"
"use_fsync=true;"

@ -1287,7 +1287,6 @@ TEST_F(OptionsParserTest, DifferentDefault) {
old_default_opts.OldDefaults();
ASSERT_EQ(10 * 1048576, old_default_opts.max_bytes_for_level_base);
ASSERT_EQ(5000, old_default_opts.max_open_files);
ASSERT_EQ(-1, old_default_opts.base_background_compactions);
ASSERT_EQ(2 * 1024U * 1024U, old_default_opts.delayed_write_rate);
ASSERT_EQ(WALRecoveryMode::kTolerateCorruptedTailRecords,
old_default_opts.wal_recovery_mode);

@ -315,10 +315,7 @@ DEFINE_int32(max_background_compactions,
"The maximum number of concurrent background compactions"
" that can occur in parallel.");
DEFINE_int32(base_background_compactions,
rocksdb::Options().base_background_compactions,
"The base number of concurrent background compactions"
" to occur in parallel.");
DEFINE_int32(base_background_compactions, -1, "DEPRECATED");
DEFINE_uint64(subcompactions, 1,
"Maximum number of subcompactions to divide L0-L1 compactions "
@ -2823,7 +2820,6 @@ void VerifyDBFromDB(std::string& truth_db_name) {
FLAGS_min_write_buffer_number_to_merge;
options.max_write_buffer_number_to_maintain =
FLAGS_max_write_buffer_number_to_maintain;
options.base_background_compactions = FLAGS_base_background_compactions;
options.max_background_compactions = FLAGS_max_background_compactions;
options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
options.max_background_flushes = FLAGS_max_background_flushes;

@ -106,7 +106,6 @@ TEST_F(DBBenchTest, OptionsFile) {
Options opt;
opt.create_if_missing = true;
opt.max_open_files = 256;
opt.base_background_compactions = 5;
opt.max_background_compactions = 10;
opt.arena_block_size = 8388608;
ASSERT_OK(PersistRocksDBOptions(DBOptions(opt), {"default"},
@ -130,7 +129,6 @@ TEST_F(DBBenchTest, OptionsFileUniversal) {
opt.num_levels = 1;
opt.create_if_missing = true;
opt.max_open_files = 256;
opt.base_background_compactions = 5;
opt.max_background_compactions = 10;
opt.arena_block_size = 8388608;
ASSERT_OK(PersistRocksDBOptions(DBOptions(opt), {"default"},
@ -154,7 +152,6 @@ TEST_F(DBBenchTest, OptionsFileMultiLevelUniversal) {
opt.num_levels = 12;
opt.create_if_missing = true;
opt.max_open_files = 256;
opt.base_background_compactions = 5;
opt.max_background_compactions = 10;
opt.arena_block_size = 8388608;
ASSERT_OK(PersistRocksDBOptions(DBOptions(opt), {"default"},
@ -185,7 +182,6 @@ const std::string options_file_content = R"OPTIONS_FILE(
table_cache_numshardbits=4
max_open_files=-1
max_file_opening_threads=10
base_background_compactions=3
max_background_compactions=5
use_fsync=false
use_adaptive_mutex=false

Loading…
Cancel
Save