Compaction picker to expand output level files for keys cross files' boundary too.

Summary: We may wrongly drop delete operation if we pick a file with the entry to be delete, the put entry of the same user key is in the next file in the level, and the next file is not picked. We expand compaction inputs for output level too.

Test Plan: Add unit tests that reproduct the bug of dropping delete entry. Change compaction_picker_test to assert the new behavior.

Reviewers: IslamAbdelRahman, igor

Reviewed By: igor

Subscribers: leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D61173
main
sdong 9 years ago
parent ac0d93b08f
commit 0ce258f9b3
  1. 3
      db/compaction_picker.cc
  2. 35
      db/compaction_picker_test.cc
  3. 66
      db/db_compaction_test.cc

@ -390,6 +390,9 @@ bool CompactionPicker::SetupOtherInputs(
vstorage->GetOverlappingInputs(output_level, &smallest, &largest, vstorage->GetOverlappingInputs(output_level, &smallest, &largest,
&output_level_inputs->files, *parent_index, &output_level_inputs->files, *parent_index,
parent_index); parent_index);
if (!output_level_inputs->empty()) {
ExpandWhileOverlapping(cf_name, vstorage, output_level_inputs);
}
if (FilesInCompaction(output_level_inputs->files)) { if (FilesInCompaction(output_level_inputs->files)) {
return false; return false;

@ -621,9 +621,9 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys2) {
// Overlapping user keys on same level and output level // Overlapping user keys on same level and output level
Add(1, 1U, "200", "400", 1000000000U); Add(1, 1U, "200", "400", 1000000000U);
Add(1, 2U, "400", "500", 1U, 0, 0); Add(1, 2U, "400", "500", 1U, 0, 0);
Add(2, 3U, "400", "600", 1U); Add(2, 3U, "000", "100", 1U);
// The following file is not in the compaction despite overlapping user keys Add(2, 4U, "100", "600", 1U, 0, 0);
Add(2, 4U, "600", "700", 1U, 0, 0); Add(2, 5U, "600", "700", 1U, 0, 0);
UpdateVersionStorageInfo(); UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction( std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
@ -631,10 +631,12 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys2) {
ASSERT_TRUE(compaction.get() != nullptr); ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->num_input_files(1)); ASSERT_EQ(3U, compaction->num_input_files(1));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber()); ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(3U, compaction->input(1, 0)->fd.GetNumber()); ASSERT_EQ(3U, compaction->input(1, 0)->fd.GetNumber());
ASSERT_EQ(4U, compaction->input(1, 1)->fd.GetNumber());
ASSERT_EQ(5U, compaction->input(1, 2)->fd.GetNumber());
} }
TEST_F(CompactionPickerTest, OverlappingUserKeys3) { TEST_F(CompactionPickerTest, OverlappingUserKeys3) {
@ -666,6 +668,31 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys3) {
ASSERT_EQ(7U, compaction->input(1, 1)->fd.GetNumber()); ASSERT_EQ(7U, compaction->input(1, 1)->fd.GetNumber());
} }
TEST_F(CompactionPickerTest, OverlappingUserKeys4) {
NewVersionStorage(6, kCompactionStyleLevel);
mutable_cf_options_.max_bytes_for_level_base = 1000000;
Add(1, 1U, "100", "150", 1U);
Add(1, 2U, "150", "199", 1U, 0, 0);
Add(1, 3U, "200", "250", 1100000U, 0, 0);
Add(1, 4U, "251", "300", 1U, 0, 0);
Add(1, 5U, "300", "350", 1U, 0, 0);
Add(2, 6U, "100", "115", 1U);
Add(2, 7U, "125", "325", 1U);
Add(2, 8U, "350", "400", 1U);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->num_input_files(1));
ASSERT_EQ(3U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(7U, compaction->input(1, 0)->fd.GetNumber());
}
TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri1) { TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri1) {
NewVersionStorage(6, kCompactionStyleLevel); NewVersionStorage(6, kCompactionStyleLevel);
mutable_cf_options_.level0_file_num_compaction_trigger = 2; mutable_cf_options_.level0_file_num_compaction_trigger = 2;

@ -649,6 +649,72 @@ TEST_F(DBCompactionTest, MinorCompactionsHappen) {
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
TEST_F(DBCompactionTest, UserKeyCrossFile1) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel;
options.level0_file_num_compaction_trigger = 3;
DestroyAndReopen(options);
// create first file and flush to l0
Put("4", "A");
Put("3", "A");
Flush();
dbfull()->TEST_WaitForFlushMemTable();
Put("2", "A");
Delete("3");
Flush();
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ("NOT_FOUND", Get("3"));
// move both files down to l1
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ("NOT_FOUND", Get("3"));
for (int i = 0; i < 3; i++) {
Put("2", "B");
Flush();
dbfull()->TEST_WaitForFlushMemTable();
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("NOT_FOUND", Get("3"));
}
TEST_F(DBCompactionTest, UserKeyCrossFile2) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel;
options.level0_file_num_compaction_trigger = 3;
DestroyAndReopen(options);
// create first file and flush to l0
Put("4", "A");
Put("3", "A");
Flush();
dbfull()->TEST_WaitForFlushMemTable();
Put("2", "A");
SingleDelete("3");
Flush();
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ("NOT_FOUND", Get("3"));
// move both files down to l1
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ("NOT_FOUND", Get("3"));
for (int i = 0; i < 3; i++) {
Put("2", "B");
Flush();
dbfull()->TEST_WaitForFlushMemTable();
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("NOT_FOUND", Get("3"));
}
TEST_F(DBCompactionTest, ZeroSeqIdCompaction) { TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel; options.compaction_style = kCompactionStyleLevel;

Loading…
Cancel
Save