Fix delete triggered compaction for single level universal (#7224)

Summary:
Delete triggered compaction (DTC) for universal compaction style with ```num_levels = 1``` has been disabled for sometime due to a data correctness bug. This PR re-enables it with a bug fix. A file marked for compaction can be picked, along with all L0 files after it as the compaction input. We stop adding files to the input once we encounter a file already being compacted (the original bug failed to check the compaction status of the files).

Tests:
Add unit tests to ```compaction_picker_test.cc```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7224

Reviewed By: ajkr

Differential Revision: D23031845

Pulled By: anand1976

fbshipit-source-id: 9de3cab5f9774cede666c2c48d309a7d9b88a505
main
anand76 4 years ago committed by Facebook GitHub Bot
parent 6e99de6d3c
commit f308da5273
  1. 121
      db/compaction/compaction_picker_test.cc
  2. 36
      db/compaction/compaction_picker_universal.cc

@ -2016,6 +2016,127 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionStartOutputOverlap) {
DeleteVersionStorage(); DeleteVersionStorage();
} }
} }
TEST_F(CompactionPickerTest, UniversalMarkedL0NoOverlap) {
const uint64_t kFileSize = 100000;
ioptions_.compaction_style = kCompactionStyleUniversal;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
// This test covers the case where a delete triggered compaction is
// scheduled and should result in a full compaction
NewVersionStorage(1, kCompactionStyleUniversal);
// Mark file number 4 for compaction
Add(0, 4U, "260", "300", 1 * kFileSize, 0, 260, 300, 0, true);
Add(0, 5U, "240", "290", 2 * kFileSize, 0, 201, 250);
Add(0, 3U, "301", "350", 4 * kFileSize, 0, 101, 150);
Add(0, 6U, "501", "750", 8 * kFileSize, 0, 50, 100);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction);
// Validate that its a delete triggered compaction
ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
compaction->compaction_reason());
ASSERT_EQ(0, compaction->output_level());
ASSERT_EQ(0, compaction->start_level());
ASSERT_EQ(4U, compaction->num_input_files(0));
ASSERT_TRUE(file_map_[4].first->being_compacted);
ASSERT_TRUE(file_map_[5].first->being_compacted);
ASSERT_TRUE(file_map_[3].first->being_compacted);
ASSERT_TRUE(file_map_[6].first->being_compacted);
}
TEST_F(CompactionPickerTest, UniversalMarkedL0WithOverlap) {
const uint64_t kFileSize = 100000;
ioptions_.compaction_style = kCompactionStyleUniversal;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
// This test covers the case where a file is being compacted, and a
// delete triggered compaction is then scheduled. The latter should stop
// at the first file being compacted
NewVersionStorage(1, kCompactionStyleUniversal);
// Mark file number 4 for compaction
Add(0, 4U, "260", "300", 1 * kFileSize, 0, 260, 300, 0, true);
Add(0, 5U, "240", "290", 2 * kFileSize, 0, 201, 250);
Add(0, 3U, "301", "350", 4 * kFileSize, 0, 101, 150);
Add(0, 6U, "501", "750", 8 * kFileSize, 0, 50, 100);
UpdateVersionStorageInfo();
file_map_[3].first->being_compacted = true;
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction);
// Validate that its a delete triggered compaction
ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
compaction->compaction_reason());
ASSERT_EQ(0, compaction->output_level());
ASSERT_EQ(0, compaction->start_level());
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_TRUE(file_map_[4].first->being_compacted);
ASSERT_TRUE(file_map_[5].first->being_compacted);
}
TEST_F(CompactionPickerTest, UniversalMarkedL0Overlap2) {
const uint64_t kFileSize = 100000;
ioptions_.compaction_style = kCompactionStyleUniversal;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
// This test covers the case where a delete triggered compaction is
// scheduled first, followed by a "regular" compaction. The latter
// should fail
NewVersionStorage(1, kCompactionStyleUniversal);
// Mark file number 4 for compaction
Add(0, 4U, "260", "300", 1 * kFileSize, 0, 260, 300);
Add(0, 5U, "240", "290", 2 * kFileSize, 0, 201, 250, 0, true);
Add(0, 3U, "301", "350", 4 * kFileSize, 0, 101, 150);
Add(0, 6U, "501", "750", 8 * kFileSize, 0, 50, 100);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction);
// Validate that its a delete triggered compaction
ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
compaction->compaction_reason());
ASSERT_EQ(0, compaction->output_level());
ASSERT_EQ(0, compaction->start_level());
ASSERT_EQ(3U, compaction->num_input_files(0));
ASSERT_TRUE(file_map_[5].first->being_compacted);
ASSERT_TRUE(file_map_[3].first->being_compacted);
ASSERT_TRUE(file_map_[6].first->being_compacted);
AddVersionStorage();
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(0, 2U, "201", "250", kFileSize, 0, 401, 450);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction2(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction2);
ASSERT_EQ(3U, compaction->num_input_files(0));
ASSERT_TRUE(file_map_[1].first->being_compacted);
ASSERT_TRUE(file_map_[2].first->being_compacted);
ASSERT_TRUE(file_map_[4].first->being_compacted);
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -841,34 +841,48 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
std::vector<CompactionInputFiles> inputs; std::vector<CompactionInputFiles> inputs;
if (vstorage_->num_levels() == 1) { if (vstorage_->num_levels() == 1) {
#if defined(ENABLE_SINGLE_LEVEL_DTC)
// This is single level universal. Since we're basically trying to reclaim // This is single level universal. Since we're basically trying to reclaim
// space by processing files marked for compaction due to high tombstone // space by processing files marked for compaction due to high tombstone
// density, let's do the same thing as compaction to reduce size amp which // density, let's do the same thing as compaction to reduce size amp which
// has the same goals. // has the same goals.
bool compact = false; int start_index = -1;
start_level_inputs.level = 0; start_level_inputs.level = 0;
start_level_inputs.files.clear(); start_level_inputs.files.clear();
output_level = 0; output_level = 0;
for (FileMetaData* f : vstorage_->LevelFiles(0)) { // Find the first file marked for compaction. Ignore the last file
if (f->marked_for_compaction) { for (size_t loop = 0; loop + 1 < sorted_runs_.size(); loop++) {
compact = true; SortedRun* sr = &sorted_runs_[loop];
if (sr->being_compacted) {
continue;
} }
if (compact) { FileMetaData* f = vstorage_->LevelFiles(0)[loop];
if (f->marked_for_compaction) {
start_level_inputs.files.push_back(f); start_level_inputs.files.push_back(f);
start_index =
static_cast<int>(loop); // Consider this as the first candidate.
break;
} }
} }
if (start_index < 0) {
// Either no file marked, or they're already being compacted
return nullptr;
}
for (size_t loop = start_index + 1; loop < sorted_runs_.size(); loop++) {
SortedRun* sr = &sorted_runs_[loop];
if (sr->being_compacted) {
break;
}
FileMetaData* f = vstorage_->LevelFiles(0)[loop];
start_level_inputs.files.push_back(f);
}
if (start_level_inputs.size() <= 1) { if (start_level_inputs.size() <= 1) {
// If only the last file in L0 is marked for compaction, ignore it // If only the last file in L0 is marked for compaction, ignore it
return nullptr; return nullptr;
} }
inputs.push_back(start_level_inputs); inputs.push_back(start_level_inputs);
#else
// Disable due to a known race condition.
// TODO: Reenable once the race condition is fixed
return nullptr;
#endif // ENABLE_SINGLE_LEVEL_DTC
} else { } else {
int start_level; int start_level;

Loading…
Cancel
Save