diff --git a/HISTORY.md b/HISTORY.md
index b1f357bb7..5aab25066 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -1,8 +1,12 @@
 # Rocksdb Change Log
 ## Unreleased
+### Behavior Changes
+* Disable delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode with num_levels = 1, in order to avoid a corruption bug.
+
 ### Bug Fixes
 * Fix consistency checking error swallowing in some cases when options.force_consistency_checks = true.
 * Fix possible false NotFound status from batched MultiGet using index type kHashSearch.
+* Fix corruption caused by enabling delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode together with parallel compactions. The bug could cause two parallel compactions to pick the same input files, resulting in the DB resurrecting older and deleted versions of some keys.
 
 ### Public API Change
 * Flush(..., column_family) may return Status::ColumnFamilyDropped() instead of Status::InvalidArgument() if column_family is dropped while processing the flush request.
diff --git a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc
index c15af5203..352a0be3d 100644
--- a/db/compaction/compaction_picker.cc
+++ b/db/compaction/compaction_picker.cc
@@ -1085,6 +1085,8 @@ void CompactionPicker::PickFilesMarkedForCompaction(
     Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage));
     size_t random_file_index = static_cast<size_t>(rnd.Uniform(
         static_cast<uint64_t>(vstorage->FilesMarkedForCompaction().size())));
+    TEST_SYNC_POINT_CALLBACK("CompactionPicker::PickFilesMarkedForCompaction",
+                             &random_file_index);
 
     if (continuation(vstorage->FilesMarkedForCompaction()[random_file_index])) {
       // found the compaction!
diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc
index 278bdb06a..280a4ca9f 100644
--- a/db/compaction/compaction_picker_test.cc
+++ b/db/compaction/compaction_picker_test.cc
@@ -78,8 +78,17 @@ class CompactionPickerTest : public testing::Test {
     vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
   }
 
+  // Create a new VersionStorageInfo object so we can add more files and then
+  // merge it with the existing VersionStorageInfo
+  void AddVersionStorage() {
+    temp_vstorage_.reset(new VersionStorageInfo(
+        &icmp_, ucmp_, options_.num_levels, ioptions_.compaction_style,
+        vstorage_.get(), false));
+  }
+
   void DeleteVersionStorage() {
     vstorage_.reset();
+    temp_vstorage_.reset();
     files_.clear();
     file_map_.clear();
     input_files_.clear();
@@ -88,18 +97,24 @@ class CompactionPickerTest : public testing::Test {
   void Add(int level, uint32_t file_number, const char* smallest,
            const char* largest, uint64_t file_size = 1, uint32_t path_id = 0,
            SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100,
-           size_t compensated_file_size = 0) {
-    assert(level < vstorage_->num_levels());
+           size_t compensated_file_size = 0, bool marked_for_compact = false) {
+    VersionStorageInfo* vstorage;
+    if (temp_vstorage_) {
+      vstorage = temp_vstorage_.get();
+    } else {
+      vstorage = vstorage_.get();
+    }
+    assert(level < vstorage->num_levels());
     FileMetaData* f = new FileMetaData(
         file_number, path_id, file_size,
         InternalKey(smallest, smallest_seq, kTypeValue),
         InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
-        largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
+        largest_seq, marked_for_compact, kInvalidBlobFileNumber,
         kUnknownOldestAncesterTime, kUnknownFileCreationTime,
         kUnknownFileChecksum, kUnknownFileChecksumFuncName);
     f->compensated_file_size =
         (compensated_file_size != 0) ? compensated_file_size : file_size;
-    vstorage_->AddFile(level, f);
+    vstorage->AddFile(level, f);
     files_.emplace_back(f);
     file_map_.insert({file_number, {f, level}});
   }
@@ -122,6 +137,12 @@ class CompactionPickerTest : public testing::Test {
   }
 
   void UpdateVersionStorageInfo() {
+    if (temp_vstorage_) {
+      VersionBuilder builder(FileOptions(), &ioptions_, nullptr,
+                             vstorage_.get(), nullptr);
+      builder.SaveTo(temp_vstorage_.get());
+      vstorage_ = std::move(temp_vstorage_);
+    }
     vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
     vstorage_->UpdateFilesByCompactionPri(ioptions_.compaction_pri);
     vstorage_->UpdateNumNonEmptyLevels();
@@ -132,6 +153,28 @@ class CompactionPickerTest : public testing::Test {
     vstorage_->ComputeFilesMarkedForCompaction();
     vstorage_->SetFinalized();
   }
+  void AddFileToVersionStorage(int level, uint32_t file_number,
+                               const char* smallest, const char* largest,
+                               uint64_t file_size = 1, uint32_t path_id = 0,
+                               SequenceNumber smallest_seq = 100,
+                               SequenceNumber largest_seq = 100,
+                               size_t compensated_file_size = 0,
+                               bool marked_for_compact = false) {
+    VersionStorageInfo* base_vstorage = vstorage_.release();
+    vstorage_.reset(new VersionStorageInfo(&icmp_, ucmp_, options_.num_levels,
+                                           kCompactionStyleUniversal,
+                                           base_vstorage, false));
+    Add(level, file_number, smallest, largest, file_size, path_id, smallest_seq,
+        largest_seq, compensated_file_size, marked_for_compact);
+
+    VersionBuilder builder(FileOptions(), &ioptions_, nullptr, base_vstorage,
+                           nullptr);
+    builder.SaveTo(vstorage_.get());
+    UpdateVersionStorageInfo();
+  }
+
+ private:
+  std::unique_ptr<VersionStorageInfo> temp_vstorage_;
 };
 
 TEST_F(CompactionPickerTest, Empty) {
@@ -1733,6 +1776,163 @@ TEST_F(CompactionPickerTest, IntraL0ForEarliestSeqno) {
   ASSERT_EQ(0, compaction->output_level());
 }
 
+TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap) {
+  const uint64_t kFileSize = 100000;
+
+  ioptions_.compaction_style = kCompactionStyleUniversal;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  // This test covers the case where a "regular" universal compaction is
+  // scheduled first, followed by a delete triggered compaction. The latter
+  // should fail.
+  NewVersionStorage(5, kCompactionStyleUniversal);
+
+  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
+  Add(0, 2U, "201", "250", 2 * kFileSize, 0, 401, 450);
+  Add(0, 4U, "260", "300", 4 * kFileSize, 0, 260, 300);
+  Add(3, 5U, "010", "080", 8 * kFileSize, 0, 200, 251);
+  Add(4, 3U, "301", "350", 8 * kFileSize, 0, 101, 150);
+  Add(4, 6U, "501", "750", 8 * kFileSize, 0, 101, 150);
+
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+
+  ASSERT_TRUE(compaction);
+  // Validate that it's a compaction to reduce sorted runs
+  ASSERT_EQ(CompactionReason::kUniversalSortedRunNum,
+            compaction->compaction_reason());
+  ASSERT_EQ(0, compaction->output_level());
+  ASSERT_EQ(0, compaction->start_level());
+  ASSERT_EQ(2U, compaction->num_input_files(0));
+
+  AddVersionStorage();
+  // Simulate a flush and mark the file for compaction
+  Add(0, 1U, "150", "200", kFileSize, 0, 551, 600, 0, true);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction2(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+  ASSERT_FALSE(compaction2);
+}
+
+TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap2) {
+  const uint64_t kFileSize = 100000;
+
+  ioptions_.compaction_style = kCompactionStyleUniversal;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  // This test covers the case where a delete triggered compaction is
+  // scheduled first, followed by a "regular" compaction. The latter
+  // should fail.
+  NewVersionStorage(5, kCompactionStyleUniversal);
+
+  // Mark file number 4 for compaction
+  Add(0, 4U, "260", "300", 4 * kFileSize, 0, 260, 300, 0, true);
+  Add(3, 5U, "240", "290", 8 * kFileSize, 0, 201, 250);
+  Add(4, 3U, "301", "350", 8 * kFileSize, 0, 101, 150);
+  Add(4, 6U, "501", "750", 8 * kFileSize, 0, 101, 150);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+
+  ASSERT_TRUE(compaction);
+  // Validate that it's a delete triggered compaction
+  ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
+            compaction->compaction_reason());
+  ASSERT_EQ(3, compaction->output_level());
+  ASSERT_EQ(0, compaction->start_level());
+  ASSERT_EQ(1U, compaction->num_input_files(0));
+  ASSERT_EQ(1U, compaction->num_input_files(1));
+
+  AddVersionStorage();
+  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
+  Add(0, 2U, "201", "250", 2 * kFileSize, 0, 401, 450);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction2(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+  ASSERT_FALSE(compaction2);
+}
+
+TEST_F(CompactionPickerTest, UniversalMarkedCompactionStartOutputOverlap) {
+  // The case where universal periodic compaction can be picked
+  // with some newer files being compacted.
+  const uint64_t kFileSize = 100000;
+
+  ioptions_.compaction_style = kCompactionStyleUniversal;
+
+  bool input_level_overlap = false;
+  bool output_level_overlap = false;
+  // Let's mark 2 files in 2 different levels for compaction. The
+  // compaction picker will randomly pick one, so use the sync point to
+  // ensure a deterministic order. Loop until both cases are covered.
+  size_t random_index = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionPicker::PickFilesMarkedForCompaction", [&](void* arg) {
+        size_t* index = static_cast<size_t*>(arg);
+        *index = random_index;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  while (!input_level_overlap || !output_level_overlap) {
+    // Ensure that the L0 file gets picked first
+    random_index = !input_level_overlap ? 0 : 1;
+    UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+    NewVersionStorage(5, kCompactionStyleUniversal);
+
+    Add(0, 1U, "260", "300", 4 * kFileSize, 0, 260, 300, 0, true);
+    Add(3, 2U, "010", "020", 2 * kFileSize, 0, 201, 248);
+    Add(3, 3U, "250", "270", 2 * kFileSize, 0, 202, 249);
+    Add(3, 4U, "290", "310", 2 * kFileSize, 0, 203, 250);
+    Add(3, 5U, "310", "320", 2 * kFileSize, 0, 204, 251, 0, true);
+    Add(4, 6U, "301", "350", 8 * kFileSize, 0, 101, 150);
+    Add(4, 7U, "501", "750", 8 * kFileSize, 0, 101, 150);
+    UpdateVersionStorageInfo();
+
+    std::unique_ptr<Compaction> compaction(
+        universal_compaction_picker.PickCompaction(
+            cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+
+    ASSERT_TRUE(compaction);
+    // Validate that it's a delete triggered compaction
+    ASSERT_EQ(CompactionReason::kFilesMarkedForCompaction,
+              compaction->compaction_reason());
+    ASSERT_TRUE(compaction->start_level() == 0 ||
+                compaction->start_level() == 3);
+    if (compaction->start_level() == 0) {
+      // The L0 file was picked. The next compaction will detect an
+      // overlap on its input level
+      input_level_overlap = true;
+      ASSERT_EQ(3, compaction->output_level());
+      ASSERT_EQ(1U, compaction->num_input_files(0));
+      ASSERT_EQ(3U, compaction->num_input_files(1));
+    } else {
+      // The level 3 file was picked. The next compaction will pick
+      // the L0 file and will detect overlap when adding output
+      // level inputs
+      output_level_overlap = true;
+      ASSERT_EQ(4, compaction->output_level());
+      ASSERT_EQ(2U, compaction->num_input_files(0));
+      ASSERT_EQ(1U, compaction->num_input_files(1));
+    }
+
+    vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
+    // After recomputing the compaction score, only one marked file will remain
+    random_index = 0;
+    std::unique_ptr<Compaction> compaction2(
+        universal_compaction_picker.PickCompaction(
+            cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
+    ASSERT_FALSE(compaction2);
+    DeleteVersionStorage();
+  }
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
diff --git a/db/compaction/compaction_picker_universal.cc b/db/compaction/compaction_picker_universal.cc
index fe2d8e49b..2cdd5962a 100644
--- a/db/compaction/compaction_picker_universal.cc
+++ b/db/compaction/compaction_picker_universal.cc
@@ -120,8 +120,7 @@ class UniversalCompactionBuilder {
   LogBuffer* log_buffer_;
 
   static std::vector<SortedRun> CalculateSortedRuns(
-      const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions,
-      const MutableCFOptions& mutable_cf_options);
+      const VersionStorageInfo& vstorage);
 
   // Pick a path ID to place a newly generated file, with its estimated file
   // size.
@@ -325,8 +324,7 @@ void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(
 
 std::vector<UniversalCompactionBuilder::SortedRun>
 UniversalCompactionBuilder::CalculateSortedRuns(
-    const VersionStorageInfo& vstorage, const ImmutableCFOptions& /*ioptions*/,
-    const MutableCFOptions& mutable_cf_options) {
+    const VersionStorageInfo& vstorage) {
   std::vector<SortedRun> ret;
   for (FileMetaData* f : vstorage.LevelFiles(0)) {
     ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
@@ -336,27 +334,16 @@ UniversalCompactionBuilder::CalculateSortedRuns(
     uint64_t total_compensated_size = 0U;
     uint64_t total_size = 0U;
     bool being_compacted = false;
-    bool is_first = true;
     for (FileMetaData* f : vstorage.LevelFiles(level)) {
       total_compensated_size += f->compensated_file_size;
       total_size += f->fd.GetFileSize();
-      if (mutable_cf_options.compaction_options_universal.allow_trivial_move ==
-          true) {
-        if (f->being_compacted) {
-          being_compacted = f->being_compacted;
-        }
-      } else {
-        // Compaction always includes all files for a non-zero level, so for a
-        // non-zero level, all the files should share the same being_compacted
-        // value.
-        // This assumption is only valid when
-        // mutable_cf_options.compaction_options_universal.allow_trivial_move
-        // is false
-        assert(is_first || f->being_compacted == being_compacted);
-      }
-      if (is_first) {
+      // Size amp, read amp and periodic compactions always include all files
+      // for a non-zero level. However, a delete triggered compaction and
+      // a trivial move might pick a subset of files in a sorted run. So
+      // always check all files in a sorted run and mark the entire run as
+      // being compacted if one or more files are being compacted
+      if (f->being_compacted) {
         being_compacted = f->being_compacted;
-        is_first = false;
       }
     }
     if (total_compensated_size > 0) {
@@ -372,8 +359,7 @@ UniversalCompactionBuilder::CalculateSortedRuns(
 Compaction* UniversalCompactionBuilder::PickCompaction() {
   const int kLevel0 = 0;
   score_ = vstorage_->CompactionScore(kLevel0);
-  sorted_runs_ =
-      CalculateSortedRuns(*vstorage_, ioptions_, mutable_cf_options_);
+  sorted_runs_ = CalculateSortedRuns(*vstorage_);
 
   if (sorted_runs_.size() == 0 ||
       (vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
@@ -855,6 +841,7 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
   std::vector<CompactionInputFiles> inputs;
 
   if (vstorage_->num_levels() == 1) {
+#if defined(ENABLE_SINGLE_LEVEL_DTC)
     // This is single level universal. Since we're basically trying to reclaim
     // space by processing files marked for compaction due to high tombstone
     // density, let's do the same thing as compaction to reduce size amp which
@@ -877,6 +864,11 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
       return nullptr;
     }
     inputs.push_back(start_level_inputs);
+#else
+    // Disable due to a known race condition.
+    // TODO: Reenable once the race condition is fixed
+    return nullptr;
+#endif  // ENABLE_SINGLE_LEVEL_DTC
   } else {
     int start_level;
 
diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc
index 61531ae16..889e380dd 100644
--- a/db/db_universal_compaction_test.cc
+++ b/db/db_universal_compaction_test.cc
@@ -1953,6 +1953,7 @@ TEST_F(DBTestUniversalCompaction2, BasicL0toL1) {
   ASSERT_GT(NumTableFilesAtLevel(6), 0);
 }
 
+#if defined(ENABLE_SINGLE_LEVEL_DTC)
 TEST_F(DBTestUniversalCompaction2, SingleLevel) {
   const int kNumKeys = 3000;
   const int kWindowSize = 100;
@@ -1991,6 +1992,7 @@ TEST_F(DBTestUniversalCompaction2, SingleLevel) {
   dbfull()->TEST_WaitForCompact();
   ASSERT_EQ(1, NumTableFilesAtLevel(0));
 }
+#endif  // ENABLE_SINGLE_LEVEL_DTC
 
 TEST_F(DBTestUniversalCompaction2, MultipleLevels) {
   const int kWindowSize = 100;
diff --git a/db/version_set.cc b/db/version_set.cc
index bf23d1e0e..19ecfb24f 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -2388,6 +2388,11 @@ void VersionStorageInfo::ComputeCompactionScore(
       // compaction score for the whole DB. Adding other levels as if
      // they are L0 files.
       for (int i = 1; i < num_levels(); i++) {
+        // It's possible that a subset of the files in a level may be in a
+        // compaction, due to delete triggered compaction or trivial move.
+        // In that case, the check below may not catch a level being
+        // compacted, as it only checks the first file. The worst that can
+        // happen is that a scheduled compaction thread finds nothing to do.
         if (!files_[i].empty() && !files_[i][0]->being_compacted) {
           num_sorted_runs++;
         }
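Note: for reviewers unfamiliar with the feature this patch touches, below is a minimal sketch (not part of this diff) of the user-facing setup it affects: delete triggered compaction under universal compaction. The API calls are RocksDB's public ones; the DB path, window size, and deletion trigger are illustrative values only, not recommendations from this change.

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/table_properties_collectors.h"

int main() {
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;
  options.compaction_style = ROCKSDB_NAMESPACE::kCompactionStyleUniversal;
  // Keep num_levels > 1: with this patch, the single level (num_levels = 1)
  // delete triggered compaction path is disabled pending the race fix.
  options.num_levels = 7;
  // Mark an SST file for compaction when any sliding window of 100
  // consecutive entries contains at least 90 deletes (illustrative values).
  options.table_properties_collector_factories.emplace_back(
      ROCKSDB_NAMESPACE::NewCompactOnDeletionCollectorFactory(
          /*sliding_window_size=*/100, /*deletion_trigger=*/90));

  ROCKSDB_NAMESPACE::DB* db = nullptr;
  ROCKSDB_NAMESPACE::Status s =
      ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/dtc_example_db", &db);
  assert(s.ok());
  // ... writes and deletes; files whose tombstone density crosses the
  // threshold get marked, and PickDeleteTriggeredCompaction() picks them up.
  delete db;
  return 0;
}
```

The window size of 100 mirrors the kWindowSize constant visible in the tests above; the trigger of 90 is an assumed value chosen for illustration.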