UniversalCompaction should ignore sorted runs being compacted (when compacting for file num)

Summary:
If we have total number of sorted runs greater than level0_file_num_compaction_trigger, Universal compaction will always issue a compaction
even if the number of sorted runs that are not being compacted is less than level0_file_num_compaction_trigger.

This diff changes this behaviour to relay on the `number of sorted runs not being compacted` instead of `total number of sorted runs`

Test Plan: New unit test

Reviewers: sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D61533
main
Islam AbdelRahman 9 years ago
parent 1b0069ce2d
commit ccecf3f4fb
  1. 5
      db/compaction_job_stats_test.cc
  2. 37
      db/compaction_picker.cc
  3. 2
      db/db_test_util.cc
  4. 160
      db/db_universal_compaction_test.cc

@ -957,7 +957,7 @@ TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
uint64_t key_base = 100000000l;
// Note: key_base must be multiple of num_keys_per_L0_file
int num_keys_per_table = 100;
const uint32_t kTestScale = 8;
const uint32_t kTestScale = 6;
const int kKeySize = 10;
const int kValueSize = 900;
double compression_ratio = 1.0;
@ -1008,8 +1008,9 @@ TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
num_input_units,
num_keys_per_table * num_input_units,
1.0, 0, false));
dbfull()->TEST_WaitForCompact();
}
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 4U);
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 3U);
for (uint64_t start_key = key_base;
start_key <= key_base * kTestScale;

@ -1313,6 +1313,8 @@ Compaction* UniversalCompactionPicker::PickCompaction(
sorted_runs.size() <
(unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) {
LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n", cf_name.c_str());
TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
nullptr);
return nullptr;
}
VersionStorageInfo::LevelSummaryStorage tmp;
@ -1347,19 +1349,34 @@ Compaction* UniversalCompactionPicker::PickCompaction(
assert(sorted_runs.size() >=
static_cast<size_t>(
mutable_cf_options.level0_file_num_compaction_trigger));
unsigned int num_files =
static_cast<unsigned int>(sorted_runs.size()) -
mutable_cf_options.level0_file_num_compaction_trigger;
if ((c = PickCompactionUniversalReadAmp(
cf_name, mutable_cf_options, vstorage, score, UINT_MAX,
num_files, sorted_runs, log_buffer)) != nullptr) {
LogToBuffer(log_buffer,
"[%s] Universal: compacting for file num -- %u\n",
cf_name.c_str(), num_files);
// Get the total number of sorted runs that are not being compacted
int num_sr_not_compacted = 0;
for (size_t i = 0; i < sorted_runs.size(); i++) {
if (sorted_runs[i].being_compacted == false) {
num_sr_not_compacted++;
}
}
// The number of sorted runs that are not being compacted is greater than
// the maximum allowed number of sorted runs
if (num_sr_not_compacted >
mutable_cf_options.level0_file_num_compaction_trigger) {
unsigned int num_files =
num_sr_not_compacted -
mutable_cf_options.level0_file_num_compaction_trigger + 1;
if ((c = PickCompactionUniversalReadAmp(
cf_name, mutable_cf_options, vstorage, score, UINT_MAX,
num_files, sorted_runs, log_buffer)) != nullptr) {
LogToBuffer(log_buffer,
"[%s] Universal: compacting for file num -- %u\n",
cf_name.c_str(), num_files);
}
}
}
}
if (c == nullptr) {
TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
nullptr);
return nullptr;
}
@ -1412,6 +1429,8 @@ Compaction* UniversalCompactionPicker::PickCompaction(
level0_compactions_in_progress_.insert(c);
TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
c);
return c;
}

@ -730,7 +730,7 @@ double DBTestBase::CompressionRatioAtLevel(int level, int cf) {
int DBTestBase::TotalTableFiles(int cf, int levels) {
if (levels == -1) {
levels = CurrentOptions().num_levels;
levels = (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
}
int result = 0;
for (int level = 0; level < levels; level++) {

@ -637,10 +637,132 @@ TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
}
}
TEST_P(DBTestUniversalCompactionParallel, PickByFileNumberBug) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.write_buffer_size = 1 * 1024; // 1KB
options.level0_file_num_compaction_trigger = 7;
options.max_background_compactions = 2;
options.target_file_size_base = 1024 * 1024; // 1MB
// Disable size amplifiction compaction
options.compaction_options_universal.max_size_amplification_percent =
UINT_MAX;
DestroyAndReopen(options);
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBTestUniversalCompactionParallel::PickByFileNumberBug:0",
"BackgroundCallCompaction:0"},
{"UniversalCompactionPicker::PickCompaction:Return",
"DBTestUniversalCompactionParallel::PickByFileNumberBug:1"},
{"DBTestUniversalCompactionParallel::PickByFileNumberBug:2",
"CompactionJob::Run():Start"}});
int total_picked_compactions = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"UniversalCompactionPicker::PickCompaction:Return", [&](void* arg) {
if (arg) {
total_picked_compactions++;
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Write 7 files to trigger compaction
int key_idx = 1;
for (int i = 1; i <= 70; i++) {
std::string k = Key(key_idx++);
ASSERT_OK(Put(k, k));
if (i % 10 == 0) {
ASSERT_OK(Flush());
}
}
// Wait for the 1st background compaction process to start
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
rocksdb::SyncPoint::GetInstance()->ClearTrace();
// Write 3 files while 1st compaction is held
// These 3 files have different sizes to avoid compacting based on size_ratio
int num_keys = 1000;
for (int i = 0; i < 3; i++) {
for (int j = 1; j <= num_keys; j++) {
std::string k = Key(key_idx++);
ASSERT_OK(Put(k, k));
}
ASSERT_OK(Flush());
num_keys -= 100;
}
// Wait for the 2nd background compaction process to start
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
// Hold the 1st and 2nd compaction from finishing
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
dbfull()->TEST_WaitForCompact();
// Although 2 compaction threads started, the second one did not compact
// anything because the number of files not being compacted is less than
// level0_file_num_compaction_trigger
EXPECT_EQ(total_picked_compactions, 1);
EXPECT_EQ(TotalTableFiles(), 4);
// Stop SyncPoint and destroy the DB and reopen it again
rocksdb::SyncPoint::GetInstance()->ClearTrace();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
key_idx = 1;
total_picked_compactions = 0;
DestroyAndReopen(options);
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Write 7 files to trigger compaction
for (int i = 1; i <= 70; i++) {
std::string k = Key(key_idx++);
ASSERT_OK(Put(k, k));
if (i % 10 == 0) {
ASSERT_OK(Flush());
}
}
// Wait for the 1st background compaction process to start
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
rocksdb::SyncPoint::GetInstance()->ClearTrace();
// Write 8 files while 1st compaction is held
// These 8 files have different sizes to avoid compacting based on size_ratio
num_keys = 1000;
for (int i = 0; i < 8; i++) {
for (int j = 1; j <= num_keys; j++) {
std::string k = Key(key_idx++);
ASSERT_OK(Put(k, k));
}
ASSERT_OK(Flush());
num_keys -= 100;
}
// Wait for the 2nd background compaction process to start
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
// Hold the 1st and 2nd compaction from finishing
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
dbfull()->TEST_WaitForCompact();
// This time we will trigger a compaction because of size ratio and
// another compaction because of number of files that are not compacted
// greater than 7
EXPECT_GE(total_picked_compactions, 2);
}
INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionParallel,
DBTestUniversalCompactionParallel,
::testing::Combine(::testing::Values(1, 10),
::testing::Bool()));
::testing::Values(false)));
TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) {
Options options = CurrentOptions();
@ -996,13 +1118,13 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionFourPaths) {
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// (1, 2, 4)
// (1, 2, 4) -> (3, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// (1, 1, 2, 4) -> (8)
// (1, 3, 4) -> (8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
@ -1016,22 +1138,22 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionFourPaths) {
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
// (1, 2, 8)
// (1, 2, 8) -> (3, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// (1, 1, 2, 8) -> (4, 8)
// (1, 3, 8) -> (4, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
// (1, 4, 8)
// (1, 4, 8) -> (5, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
ASSERT_EQ(0, GetSstFileCount(dbname_));
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
@ -1195,12 +1317,12 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 2, 4)
// (1, 2, 4) -> (3, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(2, GetSstFileCount(dbname_));
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// (1, 1, 2, 4) -> (8)
// (1, 3, 4) -> (8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
@ -1215,20 +1337,20 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 2, 8)
// (1, 2, 8) -> (3, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(2, GetSstFileCount(dbname_));
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// (1, 1, 2, 8) -> (4, 8)
// (1, 3, 8) -> (4, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// (1, 4, 8)
// (1, 4, 8) -> (5, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
ASSERT_EQ(0, GetSstFileCount(dbname_));
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));

Loading…
Cancel
Save