Fix race due to delete triggered compaction in Universal compaction mode (#6799)

Summary:
Delete triggered compaction in universal compaction mode was causing a corruption when scheduled in parallel with other compactions.
1. When num_levels = 1, a file marked for compaction may be picked along with all older files in L0, without checking if any of them are already being compaction. This can cause unpredictable results like resurrection of older versions of keys or deleted keys.
2. When num_levels > 1, a delete triggered compaction would not get scheduled if it overlaps with a running regular compaction. However, the reverse is not true. This is due to the fact that in ```UniversalCompactionBuilder::CalculateSortedRuns```, it assumes that entire sorted runs are picked for compaction and only checks the first file in a sorted run to determine conflicts. This is violated by a delete triggered compaction as it works on a subset of a sorted run.

Fix the bug for num_levels > 1, and disable the feature for now when num_levels = 1. After disabling this feature, files would still get marked for compaction, but no compaction would get scheduled.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6799

Reviewed By: pdillinger

Differential Revision: D21431286

Pulled By: anand1976

fbshipit-source-id: ae9f0bdb1d6ae2f10284847db731c23f43af164a
main
anand76 4 years ago committed by Facebook GitHub Bot
parent 3730b05dc9
commit 94265234de
  1. 4
      HISTORY.md
  2. 2
      db/compaction/compaction_picker.cc
  3. 208
      db/compaction/compaction_picker_test.cc
  4. 38
      db/compaction/compaction_picker_universal.cc
  5. 2
      db/db_universal_compaction_test.cc
  6. 5
      db/version_set.cc

@ -1,8 +1,12 @@
# Rocksdb Change Log
## Unreleased
### Behavior Changes
* Disable delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode and num_levels = 1 in order to avoid a corruption bug.
### Bug Fixes
* Fix consistency checking error swallowing in some cases when options.force_consistency_checks = true.
* Fix possible false NotFound status from batched MultiGet using index type kHashSearch.
* Fix corruption caused by enabling delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode, along with parallel compactions. The bug can result in two parallel compactions picking the same input files, resulting in the DB resurrecting older and deleted versions of some keys.
### Public API Change
* Flush(..., column_family) may return Status::ColumnFamilyDropped() instead of Status::InvalidArgument() if column_family is dropped while processing the flush request.

@ -1085,6 +1085,8 @@ void CompactionPicker::PickFilesMarkedForCompaction(
Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage));
size_t random_file_index = static_cast<size_t>(rnd.Uniform(
static_cast<uint64_t>(vstorage->FilesMarkedForCompaction().size())));
TEST_SYNC_POINT_CALLBACK("CompactionPicker::PickFilesMarkedForCompaction",
&random_file_index);
if (continuation(vstorage->FilesMarkedForCompaction()[random_file_index])) {
// found the compaction!

@ -78,8 +78,17 @@ class CompactionPickerTest : public testing::Test {
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
}
// Create a new VersionStorageInfo object so we can add mode files and then
// merge it with the existing VersionStorageInfo
void AddVersionStorage() {
temp_vstorage_.reset(new VersionStorageInfo(
&icmp_, ucmp_, options_.num_levels, ioptions_.compaction_style,
vstorage_.get(), false));
}
void DeleteVersionStorage() {
vstorage_.reset();
temp_vstorage_.reset();
files_.clear();
file_map_.clear();
input_files_.clear();
@ -88,18 +97,24 @@ class CompactionPickerTest : public testing::Test {
void Add(int level, uint32_t file_number, const char* smallest,
const char* largest, uint64_t file_size = 1, uint32_t path_id = 0,
SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100,
size_t compensated_file_size = 0) {
assert(level < vstorage_->num_levels());
size_t compensated_file_size = 0, bool marked_for_compact = false) {
VersionStorageInfo* vstorage;
if (temp_vstorage_) {
vstorage = temp_vstorage_.get();
} else {
vstorage = vstorage_.get();
}
assert(level < vstorage->num_levels());
FileMetaData* f = new FileMetaData(
file_number, path_id, file_size,
InternalKey(smallest, smallest_seq, kTypeValue),
InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
largest_seq, marked_for_compact, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
f->compensated_file_size =
(compensated_file_size != 0) ? compensated_file_size : file_size;
vstorage_->AddFile(level, f);
vstorage->AddFile(level, f);
files_.emplace_back(f);
file_map_.insert({file_number, {f, level}});
}
@ -122,6 +137,12 @@ class CompactionPickerTest : public testing::Test {
}
void UpdateVersionStorageInfo() {
if (temp_vstorage_) {
VersionBuilder builder(FileOptions(), &ioptions_, nullptr,
vstorage_.get(), nullptr);
builder.SaveTo(temp_vstorage_.get());
vstorage_ = std::move(temp_vstorage_);
}
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
vstorage_->UpdateFilesByCompactionPri(ioptions_.compaction_pri);
vstorage_->UpdateNumNonEmptyLevels();
@ -132,6 +153,28 @@ class CompactionPickerTest : public testing::Test {
vstorage_->ComputeFilesMarkedForCompaction();
vstorage_->SetFinalized();
}
void AddFileToVersionStorage(int level, uint32_t file_number,
const char* smallest, const char* largest,
uint64_t file_size = 1, uint32_t path_id = 0,
SequenceNumber smallest_seq = 100,
SequenceNumber largest_seq = 100,
size_t compensated_file_size = 0,
bool marked_for_compact = false) {
VersionStorageInfo* base_vstorage = vstorage_.release();
vstorage_.reset(new VersionStorageInfo(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleUniversal,
base_vstorage, false));
Add(level, file_number, smallest, largest, file_size, path_id, smallest_seq,
largest_seq, compensated_file_size, marked_for_compact);
VersionBuilder builder(FileOptions(), &ioptions_, nullptr, base_vstorage,
nullptr);
builder.SaveTo(vstorage_.get());
UpdateVersionStorageInfo();
}
private:
std::unique_ptr<VersionStorageInfo> temp_vstorage_;
};
TEST_F(CompactionPickerTest, Empty) {
@ -1733,6 +1776,163 @@ TEST_F(CompactionPickerTest, IntraL0ForEarliestSeqno) {
ASSERT_EQ(0, compaction->output_level());
}
TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap) {
const uint64_t kFileSize = 100000;
ioptions_.compaction_style = kCompactionStyleUniversal;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
// This test covers the case where a "regular" universal compaction is
// scheduled first, followed by a delete triggered compaction. The latter
// should fail
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(0, 2U, "201", "250", 2 * kFileSize, 0, 401, 450);
Add(0, 4U, "260", "300", 4 * kFileSize, 0, 260, 300);
Add(3, 5U, "010", "080", 8 * kFileSize, 0, 200, 251);
Add(4, 3U, "301", "350", 8 * kFileSize, 0, 101, 150);
Add(4, 6U, "501", "750", 8 * kFileSize, 0, 101, 150);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction);
// Validate that its a compaction to reduce sorted runs
ASSERT_EQ(CompactionReason::kUniversalSortedRunNum,
compaction->compaction_reason());
ASSERT_EQ(0, compaction->output_level());
ASSERT_EQ(0, compaction->start_level());
ASSERT_EQ(2U, compaction->num_input_files(0));
AddVersionStorage();
// Simulate a flush and mark the file for compaction
Add(0, 1U, "150", "200", kFileSize, 0, 551, 600, 0, true);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction2(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_FALSE(compaction2);
}
TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap2) {
const uint64_t kFileSize = 100000;
ioptions_.compaction_style = kCompactionStyleUniversal;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
// This test covers the case where a delete triggered compaction is
// scheduled first, followed by a "regular" compaction. The latter
// should fail
NewVersionStorage(5, kCompactionStyleUniversal);
// Mark file number 4 for compaction
Add(0, 4U, "260", "300", 4 * kFileSize, 0, 260, 300, 0, true);
Add(3, 5U, "240", "290", 8 * kFileSize, 0, 201, 250);
Add(4, 3U, "301", "350", 8 * kFileSize, 0, 101, 150);
Add(4, 6U, "501", "750", 8 * kFileSize, 0, 101, 150);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction);
// Validate that its a delete triggered compaction
ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
compaction->compaction_reason());
ASSERT_EQ(3, compaction->output_level());
ASSERT_EQ(0, compaction->start_level());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->num_input_files(1));
AddVersionStorage();
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(0, 2U, "201", "250", 2 * kFileSize, 0, 401, 450);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction2(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_FALSE(compaction2);
}
TEST_F(CompactionPickerTest, UniversalMarkedCompactionStartOutputOverlap) {
// The case where universal periodic compaction can be picked
// with some newer files being compacted.
const uint64_t kFileSize = 100000;
ioptions_.compaction_style = kCompactionStyleUniversal;
bool input_level_overlap = false;
bool output_level_overlap = false;
// Let's mark 2 files in 2 different levels for compaction. The
// compaction picker will randomly pick one, so use the sync point to
// ensure a deterministic order. Loop until both cases are covered
size_t random_index = 0;
SyncPoint::GetInstance()->SetCallBack(
"CompactionPicker::PickFilesMarkedForCompaction", [&](void* arg) {
size_t* index = static_cast<size_t*>(arg);
*index = random_index;
});
SyncPoint::GetInstance()->EnableProcessing();
while (!input_level_overlap || !output_level_overlap) {
// Ensure that the L0 file gets picked first
random_index = !input_level_overlap ? 0 : 1;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "260", "300", 4 * kFileSize, 0, 260, 300, 0, true);
Add(3, 2U, "010", "020", 2 * kFileSize, 0, 201, 248);
Add(3, 3U, "250", "270", 2 * kFileSize, 0, 202, 249);
Add(3, 4U, "290", "310", 2 * kFileSize, 0, 203, 250);
Add(3, 5U, "310", "320", 2 * kFileSize, 0, 204, 251, 0, true);
Add(4, 6U, "301", "350", 8 * kFileSize, 0, 101, 150);
Add(4, 7U, "501", "750", 8 * kFileSize, 0, 101, 150);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction);
// Validate that its a delete triggered compaction
ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
compaction->compaction_reason());
ASSERT_TRUE(compaction->start_level() == 0 ||
compaction->start_level() == 3);
if (compaction->start_level() == 0) {
// The L0 file was picked. The next compaction will detect an
// overlap on its input level
input_level_overlap = true;
ASSERT_EQ(3, compaction->output_level());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(3U, compaction->num_input_files(1));
} else {
// The level 3 file was picked. The next compaction will pick
// the L0 file and will detect overlap when adding output
// level inputs
output_level_overlap = true;
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->num_input_files(1));
}
vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
// After recomputing the compaction score, only one marked file will remain
random_index = 0;
std::unique_ptr<Compaction> compaction2(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_FALSE(compaction2);
DeleteVersionStorage();
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -120,8 +120,7 @@ class UniversalCompactionBuilder {
LogBuffer* log_buffer_;
static std::vector<SortedRun> CalculateSortedRuns(
const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options);
const VersionStorageInfo& vstorage);
// Pick a path ID to place a newly generated file, with its estimated file
// size.
@ -325,8 +324,7 @@ void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(
std::vector<UniversalCompactionBuilder::SortedRun>
UniversalCompactionBuilder::CalculateSortedRuns(
const VersionStorageInfo& vstorage, const ImmutableCFOptions& /*ioptions*/,
const MutableCFOptions& mutable_cf_options) {
const VersionStorageInfo& vstorage) {
std::vector<UniversalCompactionBuilder::SortedRun> ret;
for (FileMetaData* f : vstorage.LevelFiles(0)) {
ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
@ -336,27 +334,16 @@ UniversalCompactionBuilder::CalculateSortedRuns(
uint64_t total_compensated_size = 0U;
uint64_t total_size = 0U;
bool being_compacted = false;
bool is_first = true;
for (FileMetaData* f : vstorage.LevelFiles(level)) {
total_compensated_size += f->compensated_file_size;
total_size += f->fd.GetFileSize();
if (mutable_cf_options.compaction_options_universal.allow_trivial_move ==
true) {
if (f->being_compacted) {
being_compacted = f->being_compacted;
}
} else {
// Compaction always includes all files for a non-zero level, so for a
// non-zero level, all the files should share the same being_compacted
// value.
// This assumption is only valid when
// mutable_cf_options.compaction_options_universal.allow_trivial_move
// is false
assert(is_first || f->being_compacted == being_compacted);
}
if (is_first) {
// Size amp, read amp and periodic compactions always include all files
// for a non-zero level. However, a delete triggered compaction and
// a trivial move might pick a subset of files in a sorted run. So
// always check all files in a sorted run and mark the entire run as
// being compacted if one or more files are being compacted
if (f->being_compacted) {
being_compacted = f->being_compacted;
is_first = false;
}
}
if (total_compensated_size > 0) {
@ -372,8 +359,7 @@ UniversalCompactionBuilder::CalculateSortedRuns(
Compaction* UniversalCompactionBuilder::PickCompaction() {
const int kLevel0 = 0;
score_ = vstorage_->CompactionScore(kLevel0);
sorted_runs_ =
CalculateSortedRuns(*vstorage_, ioptions_, mutable_cf_options_);
sorted_runs_ = CalculateSortedRuns(*vstorage_);
if (sorted_runs_.size() == 0 ||
(vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
@ -855,6 +841,7 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
std::vector<CompactionInputFiles> inputs;
if (vstorage_->num_levels() == 1) {
#if defined(ENABLE_SINGLE_LEVEL_DTC)
// This is single level universal. Since we're basically trying to reclaim
// space by processing files marked for compaction due to high tombstone
// density, let's do the same thing as compaction to reduce size amp which
@ -877,6 +864,11 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
return nullptr;
}
inputs.push_back(start_level_inputs);
#else
// Disable due to a known race condition.
// TODO: Reenable once the race condition is fixed
return nullptr;
#endif // ENABLE_SINGLE_LEVEL_DTC
} else {
int start_level;

@ -1953,6 +1953,7 @@ TEST_F(DBTestUniversalCompaction2, BasicL0toL1) {
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
#if defined(ENABLE_SINGLE_LEVEL_DTC)
TEST_F(DBTestUniversalCompaction2, SingleLevel) {
const int kNumKeys = 3000;
const int kWindowSize = 100;
@ -1991,6 +1992,7 @@ TEST_F(DBTestUniversalCompaction2, SingleLevel) {
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
#endif // ENABLE_SINGLE_LEVEL_DTC
TEST_F(DBTestUniversalCompaction2, MultipleLevels) {
const int kWindowSize = 100;

@ -2388,6 +2388,11 @@ void VersionStorageInfo::ComputeCompactionScore(
// compaction score for the whole DB. Adding other levels as if
// they are L0 files.
for (int i = 1; i < num_levels(); i++) {
// Its possible that a subset of the files in a level may be in a
// compaction, due to delete triggered compaction or trivial move.
// In that case, the below check may not catch a level being
// compacted as it only checks the first file. The worst that can
// happen is a scheduled compaction thread will find nothing to do.
if (!files_[i].empty() && !files_[i][0]->being_compacted) {
num_sorted_runs++;
}

Loading…
Cancel
Save