diff --git a/HISTORY.md b/HISTORY.md index b5e959c1b..8a690db9d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -25,6 +25,7 @@ * Fixed a bug where RocksDB could be doing compaction endlessly when allow_ingest_behind is true and the bottommost level is not filled (#10767). * Fixed a memory safety bug in experimental HyperClockCache (#10768) * Fixed some cases where `ldb update_manifest` and `ldb unsafe_remove_sst_file` are not usable because they were requiring the DB files to match the existing manifest state (before updating the manifest to match a desired state). +* Fix FIFO compaction causing corruption of overlapping seqnos in L0 files due to ingesting files of overlapping seqnos with memtable's under `CompactionOptionsFIFO::allow_compaction=true` or `CompactionOptionsFIFO::age_for_warm>0` or `CompactRange()/CompactFiles()` is used. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected. ### Performance Improvements * Try to align the compaction output file boundaries to the next level ones, which can reduce more than 10% compaction load for the default level compaction. The feature is enabled by default, to disable, set `AdvancedColumnFamilyOptions.level_compaction_dynamic_file_size` to false. As a side effect, it can create SSTs larger than the target_file_size (capped at 2x target_file_size) or smaller files. diff --git a/db/column_family.cc b/db/column_family.cc index 4cde8dc08..0ce72ee2f 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1211,11 +1211,14 @@ Compaction* ColumnFamilyData::CompactRange( const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end, bool* conflict, uint64_t max_file_num_to_ignore, const std::string& trim_ts) { + SequenceNumber earliest_mem_seqno = + std::min(mem_->GetEarliestSequenceNumber(), + imm_.current()->GetEarliestSequenceNumber(false)); auto* result = compaction_picker_->CompactRange( GetName(), mutable_cf_options, mutable_db_options, current_->storage_info(), input_level, output_level, compact_range_options, begin, end, compaction_end, conflict, - max_file_num_to_ignore, trim_ts); + max_file_num_to_ignore, trim_ts, earliest_mem_seqno); if (result != nullptr) { result->SetInputVersion(current_); } diff --git a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc index abdecca9f..579d9cbdf 100644 --- a/db/compaction/compaction_picker.cc +++ b/db/compaction/compaction_picker.cc @@ -32,7 +32,7 @@ bool FindIntraL0Compaction(const std::vector& level_files, uint64_t max_compact_bytes_per_del_file, uint64_t max_compaction_bytes, CompactionInputFiles* comp_inputs, - SequenceNumber earliest_mem_seqno) { + const SequenceNumber earliest_mem_seqno) { // Do not pick ingested file when there is at least one memtable not flushed // which of seqno is overlap with the sst. TEST_SYNC_POINT("FindIntraL0Compaction"); @@ -613,7 +613,8 @@ Compaction* CompactionPicker::CompactRange( int input_level, int output_level, const CompactRangeOptions& compact_range_options, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict, - uint64_t max_file_num_to_ignore, const std::string& trim_ts) { + uint64_t max_file_num_to_ignore, const std::string& trim_ts, + const SequenceNumber /*earliest_mem_seqno*/) { // CompactionPickerFIFO has its own implementation of compact range assert(ioptions_.compaction_style != kCompactionStyleFIFO); @@ -918,7 +919,8 @@ bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a, Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels( std::unordered_set* input_files, - const ColumnFamilyMetaData& cf_meta, const int output_level) const { + const ColumnFamilyMetaData& cf_meta, const int output_level, + const SequenceNumber earliest_mem_seqno) const { auto& levels = cf_meta.levels; auto comparator = icmp_->user_comparator(); @@ -995,6 +997,13 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels( current_files[f].name + " is currently being compacted."); } + if (output_level == 0 && + current_files[f].largest_seqno > earliest_mem_seqno) { + return Status::Aborted( + "Necessary compaction input file " + current_files[f].name + + " has overlapping seqnos with earliest memtable seqnos."); + } + input_files->insert(TableFileNameToNumber(current_files[f].name)); } @@ -1051,12 +1060,14 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels( "A running compaction is writing to the same output level in an " "overlapping key range"); } + return Status::OK(); } Status CompactionPicker::SanitizeCompactionInputFiles( std::unordered_set* input_files, - const ColumnFamilyMetaData& cf_meta, const int output_level) const { + const ColumnFamilyMetaData& cf_meta, const int output_level, + const SequenceNumber earliest_mem_seqno) const { assert(static_cast(cf_meta.levels.size()) - 1 == cf_meta.levels[cf_meta.levels.size() - 1].level); if (output_level >= static_cast(cf_meta.levels.size())) { @@ -1082,8 +1093,8 @@ Status CompactionPicker::SanitizeCompactionInputFiles( "A compaction must contain at least one file."); } - Status s = SanitizeCompactionInputFilesForAllLevels(input_files, cf_meta, - output_level); + Status s = SanitizeCompactionInputFilesForAllLevels( + input_files, cf_meta, output_level, earliest_mem_seqno); if (!s.ok()) { return s; diff --git a/db/compaction/compaction_picker.h b/db/compaction/compaction_picker.h index 7739dd96b..2e46d73b3 100644 --- a/db/compaction/compaction_picker.h +++ b/db/compaction/compaction_picker.h @@ -51,15 +51,24 @@ class CompactionPicker { virtual ~CompactionPicker(); // Pick level and inputs for a new compaction. + // + // `earliest_mem_seqno` is the earliest seqno of unflushed memtables. + // It is needed to compare with compaction input SST files' largest seqnos + // in order to exclude those of seqnos potentially overlap with memtables' + // seqnos when doing compaction to L0. This will avoid creating a SST files in + // L0 newer than a unflushed memtable. Such SST file can exist in the first + // place when it's ingested or resulted from compaction involving files + // ingested. + // // Returns nullptr if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that // describes the compaction. Caller should delete the result. virtual Compaction* PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, - LogBuffer* log_buffer, - SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) = 0; + LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) = 0; + // `earliest_mem_seqno`: see PickCompaction() API // Return a compaction object for compacting the range [begin,end] in // the specified level. Returns nullptr if there is nothing in that // level that overlaps the specified range. Caller should delete @@ -78,7 +87,8 @@ class CompactionPicker { const CompactRangeOptions& compact_range_options, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict, - uint64_t max_file_num_to_ignore, const std::string& trim_ts); + uint64_t max_file_num_to_ignore, const std::string& trim_ts, + const SequenceNumber earliest_mem_seqno); // The maximum allowed output level. Default value is NumberLevels() - 1. virtual int MaxOutputLevel() const { return NumberLevels() - 1; } @@ -91,10 +101,18 @@ class CompactionPicker { // files. If it's not possible to conver an invalid input_files // into a valid one by adding more files, the function will return a // non-ok status with specific reason. +// +// Cases of returning non-ok status include but not limited to: +// - When output_level == 0 and input_files contains sst files +// of largest seqno greater than `earliest_mem_seqno`. This will +// avoid creating a SST files in L0 newer than a unflushed memtable. +// Such SST file can exist in the first place when it's ingested or +// resulted from compaction involving files ingested. #ifndef ROCKSDB_LITE - Status SanitizeCompactionInputFiles(std::unordered_set* input_files, - const ColumnFamilyMetaData& cf_meta, - const int output_level) const; + Status SanitizeCompactionInputFiles( + std::unordered_set* input_files, + const ColumnFamilyMetaData& cf_meta, const int output_level, + const SequenceNumber earliest_mem_seqno) const; #endif // ROCKSDB_LITE // Free up the files that participated in a compaction @@ -230,7 +248,8 @@ class CompactionPicker { #ifndef ROCKSDB_LITE virtual Status SanitizeCompactionInputFilesForAllLevels( std::unordered_set* input_files, - const ColumnFamilyMetaData& cf_meta, const int output_level) const; + const ColumnFamilyMetaData& cf_meta, const int output_level, + const SequenceNumber earliest_mem_seqno) const; #endif // ROCKSDB_LITE // Keeps track of all compactions that are running on Level0. @@ -260,23 +279,22 @@ class NullCompactionPicker : public CompactionPicker { const MutableCFOptions& /*mutable_cf_options*/, const MutableDBOptions& /*mutable_db_options*/, VersionStorageInfo* /*vstorage*/, LogBuffer* /* log_buffer */, - SequenceNumber /* earliest_memtable_seqno */) override { + const SequenceNumber /* earliest_mem_seqno */) override { return nullptr; } // Always return "nullptr" - Compaction* CompactRange(const std::string& /*cf_name*/, - const MutableCFOptions& /*mutable_cf_options*/, - const MutableDBOptions& /*mutable_db_options*/, - VersionStorageInfo* /*vstorage*/, - int /*input_level*/, int /*output_level*/, - const CompactRangeOptions& /*compact_range_options*/, - const InternalKey* /*begin*/, - const InternalKey* /*end*/, - InternalKey** /*compaction_end*/, - bool* /*manual_conflict*/, - uint64_t /*max_file_num_to_ignore*/, - const std::string& /*trim_ts*/) override { + Compaction* CompactRange( + const std::string& /*cf_name*/, + const MutableCFOptions& /*mutable_cf_options*/, + const MutableDBOptions& /*mutable_db_options*/, + VersionStorageInfo* /*vstorage*/, int /*input_level*/, + int /*output_level*/, + const CompactRangeOptions& /*compact_range_options*/, + const InternalKey* /*begin*/, const InternalKey* /*end*/, + InternalKey** /*compaction_end*/, bool* /*manual_conflict*/, + uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/, + const SequenceNumber /* earliest_mem_seqno */) override { return nullptr; } @@ -303,12 +321,14 @@ class NullCompactionPicker : public CompactionPicker { // initialized with corresponding input // files. Cannot be nullptr. // +// @param earliest_mem_seqno See PickCompaction() API // @return true iff compaction was found. -bool FindIntraL0Compaction( - const std::vector& level_files, size_t min_files_to_compact, - uint64_t max_compact_bytes_per_del_file, uint64_t max_compaction_bytes, - CompactionInputFiles* comp_inputs, - SequenceNumber earliest_mem_seqno = kMaxSequenceNumber); +bool FindIntraL0Compaction(const std::vector& level_files, + size_t min_files_to_compact, + uint64_t max_compact_bytes_per_del_file, + uint64_t max_compaction_bytes, + CompactionInputFiles* comp_inputs, + const SequenceNumber earliest_mem_seqno); CompressionType GetCompressionType(const VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options, diff --git a/db/compaction/compaction_picker_fifo.cc b/db/compaction/compaction_picker_fifo.cc index 1f875e3e1..adf16a36d 100644 --- a/db/compaction/compaction_picker_fifo.cc +++ b/db/compaction/compaction_picker_fifo.cc @@ -139,7 +139,7 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( Compaction* FIFOCompactionPicker::PickSizeCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, - LogBuffer* log_buffer) { + LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) { // compute the total size and identify the last non-empty level int last_level = 0; uint64_t total_size = 0; @@ -176,7 +176,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction( .level0_file_num_compaction_trigger /* min_files_to_compact */ , max_compact_bytes_per_del_file, - mutable_cf_options.max_compaction_bytes, &comp_inputs)) { + mutable_cf_options.max_compaction_bytes, &comp_inputs, + earliest_mem_seqno)) { Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, mutable_db_options, {comp_inputs}, 0, 16 * 1024 * 1024 /* output file size limit */, @@ -275,7 +276,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction( Compaction* FIFOCompactionPicker::PickCompactionToWarm( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, - LogBuffer* log_buffer) { + LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) { + TEST_SYNC_POINT("PickCompactionToWarm"); if (mutable_cf_options.compaction_options_fifo.age_for_warm == 0) { return nullptr; } @@ -299,6 +301,8 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm( cf_name.c_str(), status.ToString().c_str()); return nullptr; } + TEST_SYNC_POINT_CALLBACK("PickCompactionToWarm::BeforeGetCurrentTime", + &_current_time); const uint64_t current_time = static_cast(_current_time); if (!level0_compactions_in_progress_.empty()) { @@ -345,7 +349,8 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm( // for warm tier. break; } - if (prev_file != nullptr) { + if (prev_file != nullptr && + prev_file->fd.largest_seqno <= earliest_mem_seqno) { compaction_size += prev_file->fd.GetFileSize(); if (compaction_size > mutable_cf_options.max_compaction_bytes) { break; @@ -389,7 +394,7 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm( Compaction* FIFOCompactionPicker::PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, - LogBuffer* log_buffer, SequenceNumber /*earliest_memtable_seqno*/) { + LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) { Compaction* c = nullptr; if (mutable_cf_options.ttl > 0) { c = PickTTLCompaction(cf_name, mutable_cf_options, mutable_db_options, @@ -397,11 +402,11 @@ Compaction* FIFOCompactionPicker::PickCompaction( } if (c == nullptr) { c = PickSizeCompaction(cf_name, mutable_cf_options, mutable_db_options, - vstorage, log_buffer); + vstorage, log_buffer, earliest_mem_seqno); } if (c == nullptr) { c = PickCompactionToWarm(cf_name, mutable_cf_options, mutable_db_options, - vstorage, log_buffer); + vstorage, log_buffer, earliest_mem_seqno); } RegisterCompaction(c); return c; @@ -414,7 +419,8 @@ Compaction* FIFOCompactionPicker::CompactRange( const CompactRangeOptions& /*compact_range_options*/, const InternalKey* /*begin*/, const InternalKey* /*end*/, InternalKey** compaction_end, bool* /*manual_conflict*/, - uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/) { + uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/, + const SequenceNumber earliest_mem_seqno) { #ifdef NDEBUG (void)input_level; (void)output_level; @@ -423,8 +429,9 @@ Compaction* FIFOCompactionPicker::CompactRange( assert(output_level == 0); *compaction_end = nullptr; LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.logger); - Compaction* c = PickCompaction(cf_name, mutable_cf_options, - mutable_db_options, vstorage, &log_buffer); + Compaction* c = + PickCompaction(cf_name, mutable_cf_options, mutable_db_options, vstorage, + &log_buffer, earliest_mem_seqno); log_buffer.FlushBufferToLog(); return c; } diff --git a/db/compaction/compaction_picker_fifo.h b/db/compaction/compaction_picker_fifo.h index 544259f38..f3e7d08fd 100644 --- a/db/compaction/compaction_picker_fifo.h +++ b/db/compaction/compaction_picker_fifo.h @@ -22,9 +22,12 @@ class FIFOCompactionPicker : public CompactionPicker { virtual Compaction* PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* version, - LogBuffer* log_buffer, - SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override; + LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) override; + // `earliest_mem_seqno`: see PickCompaction() API for more. In FIFO's + // implementation of CompactRange(), different from others, we will not return + // `nullptr` right away when intput files of compaction to L0 has seqnos + // potentially overlapping with memtable's but exlucde those files. virtual Compaction* CompactRange( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, @@ -32,7 +35,8 @@ class FIFOCompactionPicker : public CompactionPicker { const CompactRangeOptions& compact_range_options, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict, - uint64_t max_file_num_to_ignore, const std::string& trim_ts) override; + uint64_t max_file_num_to_ignore, const std::string& trim_ts, + const SequenceNumber earliest_mem_seqno) override; // The maximum allowed output level. Always returns 0. virtual int MaxOutputLevel() const override { return 0; } @@ -51,13 +55,15 @@ class FIFOCompactionPicker : public CompactionPicker { const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* version, - LogBuffer* log_buffer); + LogBuffer* log_buffer, + SequenceNumber earliest_mem_seqno); Compaction* PickCompactionToWarm(const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* version, - LogBuffer* log_buffer); + LogBuffer* log_buffer, + const SequenceNumber earliest_mem_seqno); }; } // namespace ROCKSDB_NAMESPACE #endif // !ROCKSDB_LITE diff --git a/db/compaction/compaction_picker_level.cc b/db/compaction/compaction_picker_level.cc index b689b6add..a53ea19b7 100644 --- a/db/compaction/compaction_picker_level.cc +++ b/db/compaction/compaction_picker_level.cc @@ -50,7 +50,7 @@ class LevelCompactionBuilder { public: LevelCompactionBuilder(const std::string& cf_name, VersionStorageInfo* vstorage, - SequenceNumber earliest_mem_seqno, + const SequenceNumber earliest_mem_seqno, CompactionPicker* compaction_picker, LogBuffer* log_buffer, const MutableCFOptions& mutable_cf_options, @@ -122,7 +122,7 @@ class LevelCompactionBuilder { const std::string& cf_name_; VersionStorageInfo* vstorage_; - SequenceNumber earliest_mem_seqno_; + const SequenceNumber earliest_mem_seqno_; CompactionPicker* compaction_picker_; LogBuffer* log_buffer_; int start_level_ = -1; @@ -832,7 +832,7 @@ bool LevelCompactionBuilder::PickIntraL0Compaction() { Compaction* LevelCompactionPicker::PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, - LogBuffer* log_buffer, SequenceNumber earliest_mem_seqno) { + LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) { LevelCompactionBuilder builder(cf_name, vstorage, earliest_mem_seqno, this, log_buffer, mutable_cf_options, ioptions_, mutable_db_options); diff --git a/db/compaction/compaction_picker_level.h b/db/compaction/compaction_picker_level.h index 42a9b60a6..f7304d259 100644 --- a/db/compaction/compaction_picker_level.h +++ b/db/compaction/compaction_picker_level.h @@ -23,8 +23,7 @@ class LevelCompactionPicker : public CompactionPicker { virtual Compaction* PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, - LogBuffer* log_buffer, - SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override; + LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) override; virtual bool NeedsCompaction( const VersionStorageInfo* vstorage) const override; diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index 2e2e566c0..0a7b35251 100644 --- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -218,7 +218,7 @@ TEST_F(CompactionPickerTest, Empty) { UpdateVersionStorageInfo(); std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() == nullptr); } @@ -230,7 +230,7 @@ TEST_F(CompactionPickerTest, Single) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() == nullptr); } @@ -244,7 +244,7 @@ TEST_F(CompactionPickerTest, Level0Trigger) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); @@ -258,7 +258,7 @@ TEST_F(CompactionPickerTest, Level1Trigger) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_files(0)); ASSERT_EQ(66U, compaction->input(0, 0)->fd.GetNumber()); @@ -277,7 +277,7 @@ TEST_F(CompactionPickerTest, Level1Trigger2) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_files(0)); ASSERT_EQ(2U, compaction->num_input_files(1)); @@ -309,7 +309,7 @@ TEST_F(CompactionPickerTest, LevelMaxScore) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_files(0)); ASSERT_EQ(7U, compaction->input(0, 0)->fd.GetNumber()); @@ -357,7 +357,7 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); @@ -382,7 +382,7 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic2) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); @@ -408,7 +408,7 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic3) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); @@ -438,7 +438,7 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic4) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); @@ -471,7 +471,7 @@ TEST_F(CompactionPickerTest, LevelTriggerDynamic4) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_files(0)); ASSERT_EQ(5U, compaction->input(0, 0)->fd.GetNumber()); @@ -528,7 +528,7 @@ TEST_F(CompactionPickerTest, CompactionUniversalIngestBehindReservedLevel) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); // output level should be the one above the bottom-most ASSERT_EQ(1, compaction->output_level()); @@ -563,7 +563,7 @@ TEST_F(CompactionPickerTest, CannotTrivialMoveUniversal) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(!compaction->is_trivial_move()); } @@ -590,7 +590,7 @@ TEST_F(CompactionPickerTest, AllowsTrivialMoveUniversal) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction->is_trivial_move()); } @@ -619,7 +619,7 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction1) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); ASSERT_EQ(4, compaction->output_level()); @@ -650,7 +650,7 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction2) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_FALSE(compaction); } @@ -677,7 +677,7 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction3) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_FALSE(compaction); } @@ -708,7 +708,7 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction4) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(!compaction || compaction->start_level() != compaction->output_level()); } @@ -729,7 +729,7 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction5) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); ASSERT_EQ(0, compaction->start_level()); ASSERT_EQ(1U, compaction->num_input_files(0)); @@ -754,7 +754,7 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction6) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); ASSERT_EQ(4, compaction->start_level()); ASSERT_EQ(2U, compaction->num_input_files(0)); @@ -792,7 +792,7 @@ TEST_F(CompactionPickerTest, UniversalIncrementalSpace1) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); ASSERT_EQ(4, compaction->output_level()); ASSERT_EQ(3, compaction->start_level()); @@ -834,7 +834,7 @@ TEST_F(CompactionPickerTest, UniversalIncrementalSpace2) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); ASSERT_EQ(4, compaction->output_level()); ASSERT_EQ(2, compaction->start_level()); @@ -876,7 +876,7 @@ TEST_F(CompactionPickerTest, UniversalIncrementalSpace3) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); ASSERT_EQ(4, compaction->output_level()); ASSERT_EQ(2, compaction->start_level()); @@ -924,7 +924,7 @@ TEST_F(CompactionPickerTest, UniversalIncrementalSpace4) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); ASSERT_EQ(4, compaction->output_level()); ASSERT_EQ(3, compaction->start_level()); @@ -968,7 +968,7 @@ TEST_F(CompactionPickerTest, UniversalIncrementalSpace5) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); ASSERT_EQ(4, compaction->output_level()); ASSERT_EQ(3, compaction->start_level()); @@ -1035,7 +1035,7 @@ TEST_F(CompactionPickerTest, FIFOToWarm1) { ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true); std::unique_ptr compaction(fifo_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_files(0)); ASSERT_EQ(3U, compaction->input(0, 0)->fd.GetNumber()); @@ -1073,7 +1073,7 @@ TEST_F(CompactionPickerTest, FIFOToWarm2) { ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true); std::unique_ptr compaction(fifo_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber()); @@ -1114,7 +1114,7 @@ TEST_F(CompactionPickerTest, FIFOToWarmMaxSize) { ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true); std::unique_ptr compaction(fifo_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); @@ -1155,7 +1155,7 @@ TEST_F(CompactionPickerTest, FIFOToWarmWithExistingWarm) { ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true); std::unique_ptr compaction(fifo_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber()); @@ -1197,7 +1197,7 @@ TEST_F(CompactionPickerTest, FIFOToWarmWithOngoing) { ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true); std::unique_ptr compaction(fifo_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); // Stop if a file is being compacted ASSERT_TRUE(compaction.get() == nullptr); } @@ -1236,7 +1236,7 @@ TEST_F(CompactionPickerTest, FIFOToWarmWithHotBetweenWarms) { ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true); std::unique_ptr compaction(fifo_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); // Stop if a file is being compacted ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_files(0)); @@ -1267,7 +1267,7 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping1) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_files(0)); // Pick file 8 because it overlaps with 0 files on level 3. @@ -1300,7 +1300,7 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping2) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_files(0)); // Picking file 7 because overlapping ratio is the biggest. @@ -1328,7 +1328,7 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping3) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_files(0)); // Picking file 8 because overlapping ratio is the biggest. @@ -1359,7 +1359,7 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping4) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_files(0)); // Picking file 8 because overlapping ratio is the biggest. @@ -1395,7 +1395,7 @@ TEST_F(CompactionPickerTest, CompactionPriRoundRobin) { std::unique_ptr compaction( local_level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); // Since the max bytes for level 2 is 120M, picking one file to compact // makes the post-compaction level size less than 120M, there is exactly one @@ -1435,7 +1435,7 @@ TEST_F(CompactionPickerTest, CompactionPriMultipleFilesRoundRobin1) { std::unique_ptr compaction( local_level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); // The maximum compaction bytes is very large in this case so we can igore its @@ -1478,7 +1478,7 @@ TEST_F(CompactionPickerTest, CompactionPriMultipleFilesRoundRobin2) { std::unique_ptr compaction( local_level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); // The maximum compaction bytes is only 2500 bytes now. Even though we are @@ -1522,7 +1522,7 @@ TEST_F(CompactionPickerTest, CompactionPriMultipleFilesRoundRobin3) { std::unique_ptr compaction( local_level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); // Cannot pick more files since we reach the last file in level 2 @@ -1581,7 +1581,7 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlappingManyFiles) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_files(0)); // Picking file 8 because overlapping ratio is the biggest. @@ -1609,7 +1609,7 @@ TEST_F(CompactionPickerTest, ParentIndexResetBug) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); } // This test checks ExpandWhileOverlapping() by having overlapping user keys @@ -1627,7 +1627,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_levels()); ASSERT_EQ(2U, compaction->num_input_files(0)); @@ -1647,7 +1647,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys2) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(2U, compaction->num_input_files(0)); @@ -1675,7 +1675,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys3) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(5U, compaction->num_input_files(0)); @@ -1706,7 +1706,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys4) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(1U, compaction->num_input_files(0)); @@ -1730,7 +1730,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys5) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() == nullptr); } @@ -1752,7 +1752,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys6) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(1U, compaction->num_input_files(0)); @@ -1773,7 +1773,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys7) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_GE(1U, compaction->num_input_files(0)); @@ -1802,7 +1802,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys8) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(3U, compaction->num_input_files(0)); @@ -1835,7 +1835,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys9) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(5U, compaction->num_input_files(0)); @@ -1876,7 +1876,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys10) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(1U, compaction->num_input_files(0)); @@ -1915,7 +1915,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys11) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(1U, compaction->num_input_files(0)); @@ -2013,7 +2013,7 @@ TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri1) { ASSERT_EQ(1, vstorage_->CompactionScoreLevel(1)); std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() == nullptr); } @@ -2044,7 +2044,7 @@ TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri2) { ASSERT_EQ(1, vstorage_->CompactionScoreLevel(1)); std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); } @@ -2078,7 +2078,7 @@ TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri3) { ASSERT_EQ(0, vstorage_->CompactionScoreLevel(1)); std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); } @@ -2374,7 +2374,7 @@ TEST_F(CompactionPickerTest, MaxCompactionBytesHit) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(1U, compaction->num_input_files(0)); @@ -2400,7 +2400,7 @@ TEST_F(CompactionPickerTest, MaxCompactionBytesNotHit) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(3U, compaction->num_input_files(0)); @@ -2430,7 +2430,7 @@ TEST_F(CompactionPickerTest, IsTrivialMoveOn) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_TRUE(compaction->IsTrivialMove()); } @@ -2455,7 +2455,7 @@ TEST_F(CompactionPickerTest, L0TrivialMove1) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1, compaction->num_input_levels()); ASSERT_EQ(2, compaction->num_input_files(0)); @@ -2484,7 +2484,7 @@ TEST_F(CompactionPickerTest, L0TrivialMoveOneFile) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1, compaction->num_input_levels()); ASSERT_EQ(1, compaction->num_input_files(0)); @@ -2510,7 +2510,7 @@ TEST_F(CompactionPickerTest, L0TrivialMoveWholeL0) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1, compaction->num_input_levels()); ASSERT_EQ(4, compaction->num_input_files(0)); @@ -2541,7 +2541,7 @@ TEST_F(CompactionPickerTest, IsTrivialMoveOffSstPartitioned) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); // No trivial move, because partitioning is applied ASSERT_TRUE(!compaction->IsTrivialMove()); @@ -2564,7 +2564,7 @@ TEST_F(CompactionPickerTest, IsTrivialMoveOff) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_FALSE(compaction->IsTrivialMove()); } @@ -2593,7 +2593,7 @@ TEST_F(CompactionPickerTest, TrivialMoveMultipleFiles1) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_TRUE(compaction->IsTrivialMove()); ASSERT_EQ(1, compaction->num_input_levels()); @@ -2627,7 +2627,7 @@ TEST_F(CompactionPickerTest, TrivialMoveMultipleFiles2) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_TRUE(compaction->IsTrivialMove()); ASSERT_EQ(1, compaction->num_input_levels()); @@ -2660,7 +2660,7 @@ TEST_F(CompactionPickerTest, TrivialMoveMultipleFiles3) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_TRUE(compaction->IsTrivialMove()); ASSERT_EQ(1, compaction->num_input_levels()); @@ -2686,7 +2686,7 @@ TEST_F(CompactionPickerTest, TrivialMoveMultipleFiles4) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_TRUE(compaction->IsTrivialMove()); ASSERT_EQ(1, compaction->num_input_levels()); @@ -2716,7 +2716,7 @@ TEST_F(CompactionPickerTest, TrivialMoveMultipleFiles5) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_TRUE(compaction->IsTrivialMove()); ASSERT_EQ(1, compaction->num_input_levels()); @@ -2750,7 +2750,7 @@ TEST_F(CompactionPickerTest, TrivialMoveMultipleFiles6) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_TRUE(compaction->IsTrivialMove()); ASSERT_EQ(1, compaction->num_input_levels()); @@ -2785,7 +2785,7 @@ TEST_F(CompactionPickerTest, CacheNextCompactionIndex) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(1U, compaction->num_input_files(0)); @@ -2795,7 +2795,7 @@ TEST_F(CompactionPickerTest, CacheNextCompactionIndex) { compaction.reset(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(2U, compaction->num_input_levels()); ASSERT_EQ(1U, compaction->num_input_files(0)); @@ -2805,7 +2805,7 @@ TEST_F(CompactionPickerTest, CacheNextCompactionIndex) { compaction.reset(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() == nullptr); ASSERT_EQ(4, vstorage_->NextCompactionIndex(1 /* level */)); } @@ -2831,7 +2831,7 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesNotHit) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_levels()); ASSERT_EQ(5U, compaction->num_input_files(0)); @@ -2862,7 +2862,7 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesHit) { std::unique_ptr compaction(level_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction.get() != nullptr); ASSERT_EQ(1U, compaction->num_input_levels()); ASSERT_EQ(4U, compaction->num_input_files(0)); @@ -2928,7 +2928,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); // Validate that its a compaction to reduce sorted runs @@ -2946,7 +2946,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap) { std::unique_ptr compaction2( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_FALSE(compaction2); } @@ -2971,7 +2971,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap2) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); // Validate that its a delete triggered compaction @@ -2990,7 +2990,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap2) { std::unique_ptr compaction2( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_FALSE(compaction2); } @@ -3031,7 +3031,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionStartOutputOverlap) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); // Validate that its a delete triggered compaction @@ -3062,7 +3062,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionStartOutputOverlap) { std::unique_ptr compaction2( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_FALSE(compaction2); DeleteVersionStorage(); } @@ -3088,7 +3088,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedL0NoOverlap) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); // Validate that its a delete triggered compaction @@ -3125,7 +3125,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedL0WithOverlap) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); // Validate that its a delete triggered compaction @@ -3159,7 +3159,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedL0Overlap2) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction); // Validate that its a delete triggered compaction @@ -3180,7 +3180,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedL0Overlap2) { std::unique_ptr compaction2( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); ASSERT_TRUE(compaction2); ASSERT_EQ(3U, compaction->num_input_files(0)); ASSERT_TRUE(file_map_[1].first->being_compacted); @@ -3215,7 +3215,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedManualCompaction) { cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), ColumnFamilyData::kCompactAllLevels, 6, CompactRangeOptions(), nullptr, nullptr, &manual_end, &manual_conflict, - std::numeric_limits::max(), "")); + std::numeric_limits::max(), "", kMaxSequenceNumber)); ASSERT_TRUE(compaction); @@ -3256,7 +3256,7 @@ TEST_F(CompactionPickerTest, UniversalSizeAmpTierCompactionNonLastLevel) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); // Make sure it's a size amp compaction and includes all files ASSERT_EQ(compaction->compaction_reason(), @@ -3292,7 +3292,7 @@ TEST_F(CompactionPickerTest, UniversalSizeRatioTierCompactionLastLevel) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); // Internally, size amp compaction is evaluated before size ratio compaction. // Here to make sure it's size ratio compaction instead of size amp @@ -3329,7 +3329,7 @@ TEST_F(CompactionPickerTest, UniversalSizeAmpTierCompactionNotSuport) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); // size amp compaction is still triggered even preclude_last_level is set ASSERT_EQ(compaction->compaction_reason(), @@ -3363,7 +3363,7 @@ TEST_F(CompactionPickerTest, UniversalSizeAmpTierCompactionLastLevel) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); // It's a Size Amp compaction, but doesn't include the last level file and // output to the penultimate level. @@ -3471,7 +3471,7 @@ TEST_F(CompactionPickerU64TsTest, CannotTrivialMoveUniversal) { std::unique_ptr compaction( universal_compaction_picker.PickCompaction( cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), - &log_buffer_)); + &log_buffer_, kMaxSequenceNumber)); assert(compaction); ASSERT_TRUE(!compaction->is_trivial_move()); } diff --git a/db/compaction/compaction_picker_universal.cc b/db/compaction/compaction_picker_universal.cc index dbdd4934b..e166f9a67 100644 --- a/db/compaction/compaction_picker_universal.cc +++ b/db/compaction/compaction_picker_universal.cc @@ -293,7 +293,7 @@ bool UniversalCompactionPicker::NeedsCompaction( Compaction* UniversalCompactionPicker::PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, - LogBuffer* log_buffer, SequenceNumber /* earliest_memtable_seqno */) { + LogBuffer* log_buffer, const SequenceNumber /* earliest_mem_seqno */) { UniversalCompactionBuilder builder(ioptions_, icmp_, cf_name, mutable_cf_options, mutable_db_options, vstorage, this, log_buffer); diff --git a/db/compaction/compaction_picker_universal.h b/db/compaction/compaction_picker_universal.h index 5f897cc9b..552a19ddd 100644 --- a/db/compaction/compaction_picker_universal.h +++ b/db/compaction/compaction_picker_universal.h @@ -21,8 +21,7 @@ class UniversalCompactionPicker : public CompactionPicker { virtual Compaction* PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, - LogBuffer* log_buffer, - SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override; + LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) override; virtual int MaxOutputLevel() const override { return NumberLevels() - 1; } virtual bool NeedsCompaction( diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 0594825d6..e67194660 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -186,6 +186,13 @@ class RoundRobinSubcompactionsAgainstResources int max_compaction_limits_; }; +class DBCompactionTestFIFOCheckConsistencyWithParam + : public DBCompactionTest, + public testing::WithParamInterface { + public: + DBCompactionTestFIFOCheckConsistencyWithParam() : DBCompactionTest() {} +}; + namespace { class FlushedFileCollector : public EventListener { public: @@ -6457,6 +6464,163 @@ TEST_P(DBCompactionTestWithParam, } } +INSTANTIATE_TEST_CASE_P(DBCompactionTestFIFOCheckConsistencyWithParam, + DBCompactionTestFIFOCheckConsistencyWithParam, + ::testing::Values("FindIntraL0Compaction", + "PickCompactionToWarm", + "CompactRange", "CompactFile")); + +TEST_P(DBCompactionTestFIFOCheckConsistencyWithParam, + FlushAfterIntraL0CompactionWithIngestedFile) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.compression = kNoCompression; + + options.force_consistency_checks = true; + options.compaction_style = kCompactionStyleFIFO; + options.max_open_files = -1; + options.num_levels = 1; + options.level0_file_num_compaction_trigger = 3; + + CompactionOptionsFIFO fifo_options; + const std::string compaction_path_to_test = GetParam(); + if (compaction_path_to_test == "FindIntraL0Compaction") { + fifo_options.allow_compaction = true; + fifo_options.age_for_warm = 0; + } else if (compaction_path_to_test == "PickCompactionToWarm") { + fifo_options.allow_compaction = false; + fifo_options.age_for_warm = 2; + } else if (compaction_path_to_test == "CompactRange") { + // FIFOCompactionPicker::CompactRange() implementes + // on top of regular compaction paths. Here we choose + // to trigger FIFOCompactionPicker::PickCompactionToWarm() + // for simplicity + fifo_options.allow_compaction = false; + fifo_options.age_for_warm = 2; + options.disable_auto_compactions = true; + } else if (compaction_path_to_test == "CompactFile") { + fifo_options.allow_compaction = false; + fifo_options.age_for_warm = 0; + options.disable_auto_compactions = true; + } else { + assert(false); + } + options.compaction_options_fifo = fifo_options; + + DestroyAndReopen(options); + + // To force assigning the global seqno to ingested file + // for our test purpose + const Snapshot* snapshot = db_->GetSnapshot(); + + std::atomic compaction_path_sync_point_called(false); + if (compaction_path_to_test == "FindIntraL0Compaction") { + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "FindIntraL0Compaction", + [&](void* /*arg*/) { compaction_path_sync_point_called.store(true); }); + } else if (compaction_path_to_test == "PickCompactionToWarm" || + compaction_path_to_test == "CompactRange") { + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "PickCompactionToWarm", + [&](void* /*arg*/) { compaction_path_sync_point_called.store(true); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "PickCompactionToWarm::BeforeGetCurrentTime", + [&fifo_options](void* current_time_arg) -> void { + // The unit test goes so quickly that there is almost no time + // elapsed after we ingest a file and before we check whether ingested + // files can compact to warm. + // Therefore we need this trick to simulate elapsed + // time in reality. + int64_t* current_time = (int64_t*)current_time_arg; + *current_time = *current_time + fifo_options.age_for_warm + 1; + }); + } else if (compaction_path_to_test == "CompactFile") { + // Sync point is not needed in this case + compaction_path_sync_point_called.store(true); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Create an existing SST file s0 of key range [key1,key4] and seqno range + // [1,2] + ASSERT_OK(Put("key1", "seq1")); + ASSERT_OK(Put("key4", "seq2")); + ASSERT_OK(Flush()); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + + // Accumulate entries in a memtable m1 of key range [key1,key2] and seqno + // range [3,4] Noted that it contains a overlaped key with s0 + ASSERT_OK(Put("key1", "seq3")); // overlapped key + ASSERT_OK(Put("key2", "seq4")); + + ASSERT_TRUE(compaction_path_to_test == "CompactFile" || + !compaction_path_sync_point_called.load()); + + // Stop background compaction job to obtain accurate + // `NumTableFilesAtLevel(0)` after file ingestion + test::SleepingBackgroundTask sleeping_tasks; + if (!options.disable_auto_compactions) { + env_->SetBackgroundThreads(1, Env::LOW); + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks, + Env::Priority::LOW); + sleeping_tasks.WaitUntilSleeping(); + } + + // Ingested two SST files, s1 of key range [key5,key5] and seqno range [5,5] + // and s2 of key range [key6,key6] and seqno range [6,6] + IngestOneKeyValue(dbfull(), "key5", "seq5", options); + IngestOneKeyValue(dbfull(), "key6", "seq6", options); + // Up to now, L0 contains s0, s1, s2 + ASSERT_EQ(3, NumTableFilesAtLevel(0)); + + // Resume background compaction job so that Intra level0 compaction can be + // triggered + if (!options.disable_auto_compactions) { + sleeping_tasks.WakeUp(); + sleeping_tasks.WaitUntilDone(); + } + + if (compaction_path_to_test == "CompactRange") { + // `start` and `end` is carefully chosen so that compact range: + // (1) doesn't overlap with memtable therefore the memtable won't be flushed + // (2) should target at compacting s0 with s1 and s2 + Slice start("key4"), end("key6"); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); + } else if (compaction_path_to_test == "CompactFile") { + ColumnFamilyMetaData cf_meta_data; + db_->GetColumnFamilyMetaData(&cf_meta_data); + assert(cf_meta_data.levels[0].files.size() == 3); + std::vector input_files; + for (const auto& file : cf_meta_data.levels[0].files) { + input_files.push_back(file.name); + } + Status s = db_->CompactFiles(CompactionOptions(), input_files, 0); + EXPECT_TRUE(s.IsAborted()); + EXPECT_TRUE(s.ToString().find( + "has overlapping seqnos with earliest memtable seqnos") != + std::string::npos); + } else { + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + ASSERT_TRUE(compaction_path_to_test == "CompactFile" || + compaction_path_sync_point_called.load()); + + // To verify compaction of s0, s1 and s2 (leading to new SST s4) didn't + // happen. + // + // Otherwise, when m1 flushes in the next step and become s3, + // we will have s3 of seqnos [3, 4], s4 of seqnos [1, 6], which is a + // corruption because s3 is older than s4 based on largest seqno while s2 + // contains a value of Key(1) newer than the value of Key(1) contained in s4. + // And in this case, Flush() will return Status::Corruption() caught by + // `force_consistency_checks=1` + EXPECT_EQ(3, NumTableFilesAtLevel(0)); + EXPECT_OK(Flush()); + db_->ReleaseSnapshot(snapshot); +} + TEST_P(DBCompactionTestWithBottommostParam, SequenceKeysManualCompaction) { constexpr int kSstNum = 10; Options options = CurrentOptions(); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index a605fac87..6412eb548 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1344,8 +1344,18 @@ Status DBImpl::CompactFilesImpl( } } + SequenceNumber earliest_mem_seqno = kMaxSequenceNumber; + if (cfd->mem() != nullptr) { + earliest_mem_seqno = + std::min(cfd->mem()->GetEarliestSequenceNumber(), earliest_mem_seqno); + } + if (cfd->imm() != nullptr && cfd->imm()->current() != nullptr) { + earliest_mem_seqno = + std::min(cfd->imm()->current()->GetEarliestSequenceNumber(false), + earliest_mem_seqno); + } Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles( - &input_set, cf_meta, output_level); + &input_set, cf_meta, output_level, earliest_mem_seqno); if (!s.ok()) { return s; } diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 6fdc3eb3c..bc9e6a17b 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -150,6 +150,7 @@ DECLARE_string(cache_type); DECLARE_uint64(subcompactions); DECLARE_uint64(periodic_compaction_seconds); DECLARE_uint64(compaction_ttl); +DECLARE_bool(fifo_allow_compaction); DECLARE_bool(allow_concurrent_memtable_write); DECLARE_double(experimental_mempurge_threshold); DECLARE_bool(enable_write_thread_adaptive_yield); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 7adc66509..fc3ddf296 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -379,6 +379,10 @@ DEFINE_uint64(compaction_ttl, 1000, DEFINE_bool(allow_concurrent_memtable_write, false, "Allow multi-writers to update mem tables in parallel."); +DEFINE_bool(fifo_allow_compaction, false, + "If true, set `Options::compaction_options_fifo.allow_compaction = " + "true`. It only take effect when FIFO compaction is used."); + DEFINE_double(experimental_mempurge_threshold, 0.0, "Maximum estimated useful payload that triggers a " "mempurge process to collect memtable garbage bytes."); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index ad24855e1..ea045cd7a 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -3123,6 +3123,11 @@ void InitializeOptionsFromFlags( options.max_background_flushes = FLAGS_max_background_flushes; options.compaction_style = static_cast(FLAGS_compaction_style); + if (options.compaction_style == + ROCKSDB_NAMESPACE::CompactionStyle::kCompactionStyleFIFO) { + options.compaction_options_fifo.allow_compaction = + FLAGS_fifo_allow_compaction; + } options.compaction_pri = static_cast(FLAGS_compaction_pri); options.num_levels = FLAGS_num_levels; diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index e52afbfd5..4c5b10b79 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -140,6 +140,7 @@ default_params = { # 0 = never (used by some), 10 = often (for threading bugs), 600 = default "stats_dump_period_sec": lambda: random.choice([0, 10, 600]), "compaction_ttl": lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]), + "fifo_allow_compaction": lambda: random.randint(0, 1), # Test small max_manifest_file_size in a smaller chance, as most of the # time we wnat manifest history to be preserved to help debug "max_manifest_file_size": lambda: random.choice(