Changed 'num_subcompactions' to the more accurate 'max_subcompactions'

Summary:
Up until this point we had DbOptions.num_subcompactions, but
it is semantically more correct to call this max_subcompactions since
we will schedule *up to* DbOptions.max_subcompactions smaller compactions
at a time during a compaction job.

I also added a --subcompactions option to db_bench

Test Plan: make all   make check

Reviewers: sdong, igor, anthony, yhchiang

Reviewed By: yhchiang

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D45069
main
Ari Ekmekji 10 years ago
parent c852968465
commit b6def58f73
  1. 1
      HISTORY.md
  2. 6
      db/compaction.h
  3. 6
      db/compaction_job.cc
  4. 18
      db/compaction_job_stats_test.cc
  5. 18
      db/db_bench.cc
  6. 44
      db/db_compaction_test.cc
  7. 22
      db/db_test.cc
  8. 2
      include/rocksdb/options.h
  9. 2
      tools/db_stress.cc
  10. 4
      util/db_test_util.cc
  11. 6
      util/mutable_cf_options.h
  12. 4
      util/options.cc
  13. 4
      util/options_helper.h
  14. 4
      util/options_test.cc

@ -4,6 +4,7 @@
### Public API Changes ### Public API Changes
* Removed class Env::RandomRWFile and Env::NewRandomRWFile(). * Removed class Env::RandomRWFile and Env::NewRandomRWFile().
* Renamed DBOptions.num_subcompactions to DBOptions.max_subcompactions to make the name better match the actual funcionality of the option.
## 3.13.0 (8/6/2015) ## 3.13.0 (8/6/2015)
### New Features ### New Features

@ -200,13 +200,13 @@ class Compaction {
// Should this compaction be broken up into smaller ones run in parallel? // Should this compaction be broken up into smaller ones run in parallel?
bool IsSubCompaction() const { bool IsSubCompaction() const {
return start_level_ == 0 && output_level_ == 1 return start_level_ == 0 && output_level_ == 1 &&
&& mutable_cf_options_.num_subcompactions > 1; mutable_cf_options_.max_subcompactions > 1;
} }
// If is_sub_compaction == true, how many smaller compactions should execute // If is_sub_compaction == true, how many smaller compactions should execute
int NumSubCompactions() const { int NumSubCompactions() const {
return mutable_cf_options_.num_subcompactions; return mutable_cf_options_.max_subcompactions;
} }
private: private:

@ -380,13 +380,13 @@ void CompactionJob::InitializeSubCompactions(const SequenceNumber& earliest,
} }
// Divide the potential L1 file boundaries (those that passed the // Divide the potential L1 file boundaries (those that passed the
// checks above) into 'num_subcompactions' groups such that each have // checks above) into 'max_subcompactions' groups such that each have
// as close to an equal number of files in it as possible // as close to an equal number of files in it as possible
// TODO(aekmekji): refine this later to depend on file size // TODO(aekmekji): refine this later to depend on file size
size_t files_left = candidates.size(); size_t files_left = candidates.size();
size_t subcompactions_left = size_t subcompactions_left =
static_cast<size_t>(db_options_.num_subcompactions) < files_left static_cast<size_t>(db_options_.max_subcompactions) < files_left
? db_options_.num_subcompactions ? db_options_.max_subcompactions
: files_left; : files_left;
size_t num_to_include; size_t num_to_include;

@ -92,7 +92,7 @@ class CompactionJobStatsTest : public testing::Test,
Env* env_; Env* env_;
DB* db_; DB* db_;
std::vector<ColumnFamilyHandle*> handles_; std::vector<ColumnFamilyHandle*> handles_;
uint32_t num_subcompactions_; uint32_t max_subcompactions_;
Options last_options_; Options last_options_;
@ -103,8 +103,8 @@ class CompactionJobStatsTest : public testing::Test,
alternative_wal_dir_ = dbname_ + "/wal"; alternative_wal_dir_ = dbname_ + "/wal";
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
num_subcompactions_ = GetParam(); max_subcompactions_ = GetParam();
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
auto delete_options = options; auto delete_options = options;
delete_options.wal_dir = alternative_wal_dir_; delete_options.wal_dir = alternative_wal_dir_;
EXPECT_OK(DestroyDB(dbname_, delete_options)); EXPECT_OK(DestroyDB(dbname_, delete_options));
@ -656,7 +656,7 @@ TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
options.level0_file_num_compaction_trigger = kTestScale + 1; options.level0_file_num_compaction_trigger = kTestScale + 1;
options.num_levels = 3; options.num_levels = 3;
options.compression = kNoCompression; options.compression = kNoCompression;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
options.bytes_per_sync = 512 * 1024; options.bytes_per_sync = 512 * 1024;
options.compaction_measure_io_stats = true; options.compaction_measure_io_stats = true;
@ -740,7 +740,7 @@ TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
// by 1 because multiple threads are consuming the input and generating // by 1 because multiple threads are consuming the input and generating
// output files without coordinating to see if the output could fit into // output files without coordinating to see if the output could fit into
// a smaller number of files like it does when it runs sequentially // a smaller number of files like it does when it runs sequentially
int num_output_files = options.num_subcompactions > 1 ? 2 : 1; int num_output_files = options.max_subcompactions > 1 ? 2 : 1;
for (uint64_t start_key = key_base; for (uint64_t start_key = key_base;
num_L0_files > 1; num_L0_files > 1;
start_key += key_base * sparseness) { start_key += key_base * sparseness) {
@ -760,7 +760,7 @@ TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
Compact(1, smallest_key, largest_key); Compact(1, smallest_key, largest_key);
// TODO(aekmekji): account for whether parallel L0-L1 compaction is // TODO(aekmekji): account for whether parallel L0-L1 compaction is
// enabled or not. If so then num_L1_files will increase by 1 // enabled or not. If so then num_L1_files will increase by 1
if (options.num_subcompactions == 1) { if (options.max_subcompactions == 1) {
--num_L1_files; --num_L1_files;
} }
snprintf(buf, kBufSize, "%d,%d", --num_L0_files, num_L1_files); snprintf(buf, kBufSize, "%d,%d", --num_L0_files, num_L1_files);
@ -783,7 +783,7 @@ TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U); ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
Compact(1, smallest_key, largest_key); Compact(1, smallest_key, largest_key);
num_L1_files = options.num_subcompactions > 1 ? 7 : 4; num_L1_files = options.max_subcompactions > 1 ? 7 : 4;
char L1_buf[4]; char L1_buf[4];
snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files); snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files);
std::string L1_files(L1_buf); std::string L1_files(L1_buf);
@ -880,7 +880,7 @@ TEST_P(CompactionJobStatsTest, DeletionStatsTest) {
options.num_levels = 3; options.num_levels = 3;
options.compression = kNoCompression; options.compression = kNoCompression;
options.max_bytes_for_level_multiplier = 2; options.max_bytes_for_level_multiplier = 2;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
DestroyAndReopen(options); DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
@ -972,7 +972,7 @@ TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
options.compaction_style = kCompactionStyleUniversal; options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.size_ratio = 1; options.compaction_options_universal.size_ratio = 1;
options.compaction_options_universal.max_size_amplification_percent = 1000; options.compaction_options_universal.max_size_amplification_percent = 1000;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
DestroyAndReopen(options); DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);

@ -221,6 +221,17 @@ static bool ValidateKeySize(const char* flagname, int32_t value) {
return true; return true;
} }
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
if (value > std::numeric_limits<uint32_t>::max()) {
fprintf(stderr,
"Invalid value for --%s: %lu, overflow\n",
flagname,
(unsigned long)value);
return false;
}
return true;
}
DEFINE_int32(key_size, 16, "size of each key"); DEFINE_int32(key_size, 16, "size of each key");
DEFINE_int32(num_multi_db, 0, DEFINE_int32(num_multi_db, 0,
@ -286,6 +297,12 @@ DEFINE_int32(max_background_compactions,
"The maximum number of concurrent background compactions" "The maximum number of concurrent background compactions"
" that can occur in parallel."); " that can occur in parallel.");
DEFINE_uint64(subcompactions, 1,
"Maximum number of subcompactions to divide L0-L1 compactions "
"into.");
static const bool FLAGS_subcompactions_dummy __attribute__((unused)) =
RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);
DEFINE_int32(max_background_flushes, DEFINE_int32(max_background_flushes,
rocksdb::Options().max_background_flushes, rocksdb::Options().max_background_flushes,
"The maximum number of concurrent background flushes" "The maximum number of concurrent background flushes"
@ -2158,6 +2175,7 @@ class Benchmark {
options.max_write_buffer_number_to_maintain = options.max_write_buffer_number_to_maintain =
FLAGS_max_write_buffer_number_to_maintain; FLAGS_max_write_buffer_number_to_maintain;
options.max_background_compactions = FLAGS_max_background_compactions; options.max_background_compactions = FLAGS_max_background_compactions;
options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
options.max_background_flushes = FLAGS_max_background_flushes; options.max_background_flushes = FLAGS_max_background_flushes;
options.compaction_style = FLAGS_compaction_style_e; options.compaction_style = FLAGS_compaction_style_e;
if (FLAGS_prefix_size != 0) { if (FLAGS_prefix_size != 0) {

@ -26,14 +26,14 @@ class DBCompactionTestWithParam : public DBTestBase,
public testing::WithParamInterface<uint32_t> { public testing::WithParamInterface<uint32_t> {
public: public:
DBCompactionTestWithParam() : DBTestBase("/db_compaction_test") { DBCompactionTestWithParam() : DBTestBase("/db_compaction_test") {
num_subcompactions_ = GetParam(); max_subcompactions_ = GetParam();
} }
// Required if inheriting from testing::WithParamInterface<> // Required if inheriting from testing::WithParamInterface<>
static void SetUpTestCase() {} static void SetUpTestCase() {}
static void TearDownTestCase() {} static void TearDownTestCase() {}
uint32_t num_subcompactions_; uint32_t max_subcompactions_;
}; };
namespace { namespace {
@ -214,12 +214,12 @@ const SstFileMetaData* PickFileRandomly(
} // anonymous namespace } // anonymous namespace
// All the TEST_P tests run once with sub_compactions disabled (i.e. // All the TEST_P tests run once with sub_compactions disabled (i.e.
// options.num_subcompactions = 1) and once with it enabled // options.max_subcompactions = 1) and once with it enabled
TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) { TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) {
for (int tid = 0; tid < 3; ++tid) { for (int tid = 0; tid < 3; ++tid) {
uint64_t db_size[2]; uint64_t db_size[2];
Options options = CurrentOptions(DeletionTriggerOptions()); Options options = CurrentOptions(DeletionTriggerOptions());
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
if (tid == 1) { if (tid == 1) {
// the following only disable stats update in DB::Open() // the following only disable stats update in DB::Open()
@ -392,7 +392,7 @@ TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) {
for (int tid = 0; tid < 2; ++tid) { for (int tid = 0; tid < 2; ++tid) {
uint64_t db_size[3]; uint64_t db_size[3];
Options options = CurrentOptions(DeletionTriggerOptions()); Options options = CurrentOptions(DeletionTriggerOptions());
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
if (tid == 1) { if (tid == 1) {
// second pass with universal compaction // second pass with universal compaction
@ -508,7 +508,7 @@ TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
options.write_buffer_size = 100 << 10; // 100KB options.write_buffer_size = 100 << 10; // 100KB
options.num_levels = 3; options.num_levels = 3;
options.level0_file_num_compaction_trigger = 3; options.level0_file_num_compaction_trigger = 3;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
options = CurrentOptions(options); options = CurrentOptions(options);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
@ -541,7 +541,7 @@ TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) { TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) {
Options options; Options options;
options.write_buffer_size = 100000000; // Large write buffer options.write_buffer_size = 100000000; // Large write buffer
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
options = CurrentOptions(options); options = CurrentOptions(options);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
@ -628,7 +628,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) {
Options options; Options options;
options.write_buffer_size = 100000000; options.write_buffer_size = 100000000;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
options = CurrentOptions(options); options = CurrentOptions(options);
DestroyAndReopen(options); DestroyAndReopen(options);
@ -687,7 +687,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.write_buffer_size = 10 * 1024 * 1024; options.write_buffer_size = 10 * 1024 * 1024;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
DestroyAndReopen(options); DestroyAndReopen(options);
// non overlapping ranges // non overlapping ranges
@ -784,7 +784,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) {
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.write_buffer_size = 10 * 1024 * 1024; options.write_buffer_size = 10 * 1024 * 1024;
options.num_levels = 7; options.num_levels = 7;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
DestroyAndReopen(options); DestroyAndReopen(options);
int32_t value_size = 10 * 1024; // 10 KB int32_t value_size = 10 * 1024; // 10 KB
@ -840,7 +840,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) {
Options options; Options options;
options.write_buffer_size = 100000000; options.write_buffer_size = 100000000;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
options = CurrentOptions(options); options = CurrentOptions(options);
DestroyAndReopen(options); DestroyAndReopen(options);
@ -896,7 +896,7 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionThirdPath) {
options.level0_file_num_compaction_trigger = 2; options.level0_file_num_compaction_trigger = 2;
options.num_levels = 4; options.num_levels = 4;
options.max_bytes_for_level_base = 400 * 1024; options.max_bytes_for_level_base = 400 * 1024;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
// options = CurrentOptions(options); // options = CurrentOptions(options);
std::vector<std::string> filenames; std::vector<std::string> filenames;
@ -1010,7 +1010,7 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionPathUse) {
options.level0_file_num_compaction_trigger = 2; options.level0_file_num_compaction_trigger = 2;
options.num_levels = 4; options.num_levels = 4;
options.max_bytes_for_level_base = 400 * 1024; options.max_bytes_for_level_base = 400 * 1024;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
// options = CurrentOptions(options); // options = CurrentOptions(options);
std::vector<std::string> filenames; std::vector<std::string> filenames;
@ -1129,7 +1129,7 @@ TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
options.max_bytes_for_level_multiplier = 1; options.max_bytes_for_level_multiplier = 1;
options.target_file_size_base = 200 << 10; // 200KB options.target_file_size_base = 200 << 10; // 200KB
options.target_file_size_multiplier = 1; options.target_file_size_multiplier = 1;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
options = CurrentOptions(options); options = CurrentOptions(options);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
@ -1267,7 +1267,7 @@ TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_b) {
TEST_P(DBCompactionTestWithParam, ManualCompaction) { TEST_P(DBCompactionTestWithParam, ManualCompaction) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
// iter - 0 with 7 levels // iter - 0 with 7 levels
@ -1319,7 +1319,7 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760); options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760); options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760); options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
// iter - 0 with 7 levels // iter - 0 with 7 levels
@ -1423,7 +1423,7 @@ TEST_P(DBCompactionTestWithParam, DISABLED_CompactFilesOnLevelCompaction) {
options.level0_stop_writes_trigger = 2; options.level0_stop_writes_trigger = 2;
options.max_bytes_for_level_multiplier = 2; options.max_bytes_for_level_multiplier = 2;
options.compression = kNoCompression; options.compression = kNoCompression;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
options = CurrentOptions(options); options = CurrentOptions(options);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
@ -1483,7 +1483,7 @@ TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {
options.target_file_size_base; options.target_file_size_base;
options.max_bytes_for_level_multiplier = 2; options.max_bytes_for_level_multiplier = 2;
options.compression = kNoCompression; options.compression = kNoCompression;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::LOW);
@ -1564,7 +1564,7 @@ TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) {
2; // trigger compaction when we have 2 files 2; // trigger compaction when we have 2 files
OnFileDeletionListener* listener = new OnFileDeletionListener(); OnFileDeletionListener* listener = new OnFileDeletionListener();
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
DestroyAndReopen(options); DestroyAndReopen(options);
Random rnd(301); Random rnd(301);
@ -1638,7 +1638,7 @@ TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) {
options.level0_file_num_compaction_trigger = 2; options.level0_file_num_compaction_trigger = 2;
options.num_levels = 4; options.num_levels = 4;
options.max_bytes_for_level_base = 400 * 1024; options.max_bytes_for_level_base = 400 * 1024;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
// First two levels have no compression, so that a trivial move between // First two levels have no compression, so that a trivial move between
// them will be allowed. Level 2 has Zlib compression so that a trivial // them will be allowed. Level 2 has Zlib compression so that a trivial
// move to level 3 will not be allowed // move to level 3 will not be allowed
@ -1751,7 +1751,7 @@ TEST_P(DBCompactionTestWithParam, SuggestCompactRangeNoTwoLevel0Compactions) {
options.target_file_size_base = 98 << 10; options.target_file_size_base = 98 << 10;
options.max_write_buffer_number = 2; options.max_write_buffer_number = 2;
options.max_background_compactions = 2; options.max_background_compactions = 2;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
DestroyAndReopen(options); DestroyAndReopen(options);
@ -1808,7 +1808,7 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
Options options; Options options;
options.write_buffer_size = 100000000; options.write_buffer_size = 100000000;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
options = CurrentOptions(options); options = CurrentOptions(options);
DestroyAndReopen(options); DestroyAndReopen(options);

@ -120,15 +120,13 @@ class DBTest : public DBTestBase {
class DBTestWithParam : public DBTest, class DBTestWithParam : public DBTest,
public testing::WithParamInterface<uint32_t> { public testing::WithParamInterface<uint32_t> {
public: public:
DBTestWithParam() { DBTestWithParam() { max_subcompactions_ = GetParam(); }
num_subcompactions_ = GetParam();
}
// Required if inheriting from testing::WithParamInterface<> // Required if inheriting from testing::WithParamInterface<>
static void SetUpTestCase() {} static void SetUpTestCase() {}
static void TearDownTestCase() {} static void TearDownTestCase() {}
uint32_t num_subcompactions_; uint32_t max_subcompactions_;
}; };
TEST_F(DBTest, Empty) { TEST_F(DBTest, Empty) {
@ -5745,7 +5743,7 @@ TEST_P(DBTestWithParam, FIFOCompactionTest) {
options.compaction_options_fifo.max_table_files_size = 500 << 10; // 500KB options.compaction_options_fifo.max_table_files_size = 500 << 10; // 500KB
options.compression = kNoCompression; options.compression = kNoCompression;
options.create_if_missing = true; options.create_if_missing = true;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
if (iter == 1) { if (iter == 1) {
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
} }
@ -6389,7 +6387,7 @@ TEST_P(DBTestWithParam, ThreadStatusSingleCompaction) {
options.enable_thread_tracking = true; options.enable_thread_tracking = true;
const int kNumL0Files = 4; const int kNumL0Files = 4;
options.level0_file_num_compaction_trigger = kNumL0Files; options.level0_file_num_compaction_trigger = kNumL0Files;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
rocksdb::SyncPoint::GetInstance()->LoadDependency({ rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DBTest::ThreadStatusSingleCompaction:0", "DBImpl::BGWorkCompaction"}, {"DBTest::ThreadStatusSingleCompaction:0", "DBImpl::BGWorkCompaction"},
@ -6438,7 +6436,7 @@ TEST_P(DBTestWithParam, ThreadStatusSingleCompaction) {
TEST_P(DBTestWithParam, PreShutdownManualCompaction) { TEST_P(DBTestWithParam, PreShutdownManualCompaction) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_background_flushes = 0; options.max_background_flushes = 0;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
// iter - 0 with 7 levels // iter - 0 with 7 levels
@ -6524,7 +6522,7 @@ TEST_P(DBTestWithParam, PreShutdownMultipleCompaction) {
options.max_background_compactions = kLowPriCount; options.max_background_compactions = kLowPriCount;
options.level0_stop_writes_trigger = 1 << 10; options.level0_stop_writes_trigger = 1 << 10;
options.level0_slowdown_writes_trigger = 1 << 10; options.level0_slowdown_writes_trigger = 1 << 10;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
TryReopen(options); TryReopen(options);
Random rnd(301); Random rnd(301);
@ -6613,7 +6611,7 @@ TEST_P(DBTestWithParam, PreShutdownCompactionMiddle) {
options.max_background_compactions = kLowPriCount; options.max_background_compactions = kLowPriCount;
options.level0_stop_writes_trigger = 1 << 10; options.level0_stop_writes_trigger = 1 << 10;
options.level0_slowdown_writes_trigger = 1 << 10; options.level0_slowdown_writes_trigger = 1 << 10;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
TryReopen(options); TryReopen(options);
Random rnd(301); Random rnd(301);
@ -6938,7 +6936,7 @@ TEST_P(DBTestWithParam, DynamicCompactionOptions) {
options.target_file_size_multiplier = 1; options.target_file_size_multiplier = 1;
options.max_bytes_for_level_base = k128KB; options.max_bytes_for_level_base = k128KB;
options.max_bytes_for_level_multiplier = 4; options.max_bytes_for_level_multiplier = 4;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
// Block flush thread and disable compaction thread // Block flush thread and disable compaction thread
env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::LOW);
@ -7670,7 +7668,7 @@ TEST_P(DBTestWithParam, MergeCompactionTimeTest) {
options.statistics = rocksdb::CreateDBStatistics(); options.statistics = rocksdb::CreateDBStatistics();
options.merge_operator.reset(new DelayedMergeOperator(this)); options.merge_operator.reset(new DelayedMergeOperator(this));
options.compaction_style = kCompactionStyleUniversal; options.compaction_style = kCompactionStyleUniversal;
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
DestroyAndReopen(options); DestroyAndReopen(options);
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
@ -7690,7 +7688,7 @@ TEST_P(DBTestWithParam, FilterCompactionTimeTest) {
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.create_if_missing = true; options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics(); options.statistics = rocksdb::CreateDBStatistics();
options.num_subcompactions = num_subcompactions_; options.max_subcompactions = max_subcompactions_;
options = CurrentOptions(options); options = CurrentOptions(options);
DestroyAndReopen(options); DestroyAndReopen(options);

@ -908,7 +908,7 @@ struct DBOptions {
// into multiple, smaller ones that are run simultaneously. This is still // into multiple, smaller ones that are run simultaneously. This is still
// under development and is only available for level-based compaction. // under development and is only available for level-based compaction.
// Default: 1 // Default: 1
uint32_t num_subcompactions; uint32_t max_subcompactions;
// Maximum number of concurrent background memtable flush jobs, submitted to // Maximum number of concurrent background memtable flush jobs, submitted to
// the HIGH priority thread pool. // the HIGH priority thread pool.

@ -1883,7 +1883,7 @@ class StressTest {
options_.max_manifest_file_size = 10 * 1024; options_.max_manifest_file_size = 10 * 1024;
options_.filter_deletes = FLAGS_filter_deletes; options_.filter_deletes = FLAGS_filter_deletes;
options_.inplace_update_support = FLAGS_in_place_update; options_.inplace_update_support = FLAGS_in_place_update;
options_.num_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions); options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kHashSkipList)) { if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kHashSkipList)) {
fprintf(stderr, fprintf(stderr,
"prefix_size should be non-zero iff memtablerep == prefix_hash\n"); "prefix_size should be non-zero iff memtablerep == prefix_hash\n");

@ -152,7 +152,7 @@ bool DBTestBase::ChangeCompactOptions() {
option_config_ = kLevelSubcompactions; option_config_ = kLevelSubcompactions;
Destroy(last_options_); Destroy(last_options_);
auto options = CurrentOptions(); auto options = CurrentOptions();
options.num_subcompactions = 4; options.max_subcompactions = 4;
TryReopen(options); TryReopen(options);
return true; return true;
} else { } else {
@ -312,7 +312,7 @@ Options DBTestBase::CurrentOptions(
break; break;
} }
case kLevelSubcompactions: { case kLevelSubcompactions: {
options.num_subcompactions = 2; options.max_subcompactions = 2;
break; break;
} }

@ -40,7 +40,7 @@ struct MutableCFOptions {
max_bytes_for_level_multiplier_additional( max_bytes_for_level_multiplier_additional(
options.max_bytes_for_level_multiplier_additional), options.max_bytes_for_level_multiplier_additional),
verify_checksums_in_compaction(options.verify_checksums_in_compaction), verify_checksums_in_compaction(options.verify_checksums_in_compaction),
num_subcompactions(options.num_subcompactions), max_subcompactions(options.max_subcompactions),
max_sequential_skip_in_iterations( max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations), options.max_sequential_skip_in_iterations),
paranoid_file_checks(options.paranoid_file_checks), paranoid_file_checks(options.paranoid_file_checks),
@ -73,7 +73,7 @@ struct MutableCFOptions {
max_bytes_for_level_base(0), max_bytes_for_level_base(0),
max_bytes_for_level_multiplier(0), max_bytes_for_level_multiplier(0),
verify_checksums_in_compaction(false), verify_checksums_in_compaction(false),
num_subcompactions(1), max_subcompactions(1),
max_sequential_skip_in_iterations(0), max_sequential_skip_in_iterations(0),
paranoid_file_checks(false), paranoid_file_checks(false),
compaction_measure_io_stats(false) {} compaction_measure_io_stats(false) {}
@ -125,7 +125,7 @@ struct MutableCFOptions {
int max_bytes_for_level_multiplier; int max_bytes_for_level_multiplier;
std::vector<int> max_bytes_for_level_multiplier_additional; std::vector<int> max_bytes_for_level_multiplier_additional;
bool verify_checksums_in_compaction; bool verify_checksums_in_compaction;
int num_subcompactions; int max_subcompactions;
// Misc options // Misc options
uint64_t max_sequential_skip_in_iterations; uint64_t max_sequential_skip_in_iterations;

@ -221,7 +221,7 @@ DBOptions::DBOptions()
wal_dir(""), wal_dir(""),
delete_obsolete_files_period_micros(6 * 60 * 60 * 1000000UL), delete_obsolete_files_period_micros(6 * 60 * 60 * 1000000UL),
max_background_compactions(1), max_background_compactions(1),
num_subcompactions(1), max_subcompactions(1),
max_background_flushes(1), max_background_flushes(1),
max_log_file_size(0), max_log_file_size(0),
log_file_time_to_roll(0), log_file_time_to_roll(0),
@ -273,7 +273,7 @@ DBOptions::DBOptions(const Options& options)
delete_obsolete_files_period_micros( delete_obsolete_files_period_micros(
options.delete_obsolete_files_period_micros), options.delete_obsolete_files_period_micros),
max_background_compactions(options.max_background_compactions), max_background_compactions(options.max_background_compactions),
num_subcompactions(options.num_subcompactions), max_subcompactions(options.max_subcompactions),
max_background_flushes(options.max_background_flushes), max_background_flushes(options.max_background_flushes),
max_log_file_size(options.max_log_file_size), max_log_file_size(options.max_log_file_size),
log_file_time_to_roll(options.log_file_time_to_roll), log_file_time_to_roll(options.log_file_time_to_roll),

@ -114,8 +114,8 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"db_log_dir", {"db_log_dir",
{offsetof(struct DBOptions, db_log_dir), OptionType::kString}}, {offsetof(struct DBOptions, db_log_dir), OptionType::kString}},
{"wal_dir", {offsetof(struct DBOptions, wal_dir), OptionType::kString}}, {"wal_dir", {offsetof(struct DBOptions, wal_dir), OptionType::kString}},
{"num_subcompactions", {"max_subcompactions",
{offsetof(struct DBOptions, num_subcompactions), OptionType::kUInt32T}}, {offsetof(struct DBOptions, max_subcompactions), OptionType::kUInt32T}},
{"WAL_size_limit_MB", {"WAL_size_limit_MB",
{offsetof(struct DBOptions, WAL_size_limit_MB), OptionType::kUInt64T}}, {offsetof(struct DBOptions, WAL_size_limit_MB), OptionType::kUInt64T}},
{"WAL_ttl_seconds", {"WAL_ttl_seconds",

@ -552,7 +552,7 @@ void VerifyDBOptions(const DBOptions& base_opt, const DBOptions& new_opt) {
ASSERT_EQ(base_opt.wal_dir, new_opt.wal_dir); ASSERT_EQ(base_opt.wal_dir, new_opt.wal_dir);
// uint32_t options // uint32_t options
ASSERT_EQ(base_opt.num_subcompactions, new_opt.num_subcompactions); ASSERT_EQ(base_opt.max_subcompactions, new_opt.max_subcompactions);
// uint64_t options // uint64_t options
ASSERT_EQ(base_opt.WAL_size_limit_MB, new_opt.WAL_size_limit_MB); ASSERT_EQ(base_opt.WAL_size_limit_MB, new_opt.WAL_size_limit_MB);
@ -611,7 +611,7 @@ TEST_F(OptionsTest, DBOptionsSerialization) {
base_options.wal_dir = "path/to/wal_dir"; base_options.wal_dir = "path/to/wal_dir";
// uint32_t options // uint32_t options
base_options.num_subcompactions = rnd.Uniform(100000); base_options.max_subcompactions = rnd.Uniform(100000);
// uint64_t options // uint64_t options
static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX); static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX);

Loading…
Cancel
Save