Fix FIFO causing overlapping seqnos in L0 files due to overlapped seqnos between ingested files and memtable's (#10777)

Summary:
**Context:**
Same as https://github.com/facebook/rocksdb/pull/5958#issue-511150930 but apply the fix to FIFO Compaction case
Repro:
```
COERCE_CONTEXT_SWICH=1 make -j56 db_stress

./db_stress --acquire_snapshot_one_in=0 --adaptive_readahead=0 --allow_data_in_errors=True --async_io=1 --avoid_flush_during_recovery=0 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=0 --batch_protection_bytes_per_key=0 --block_size=16384 --bloom_bits=18 --bottommost_compression_type=disable --bytes_per_sync=262144 --cache_index_and_filter_blocks=0 --cache_size=8388608 --cache_type=lru_cache --charge_compression_dictionary_building_buffer=0 --charge_file_metadata=1 --charge_filter_construction=1 --charge_table_reader=1 --checkpoint_one_in=0 --checksum_type=kCRC32c --clear_column_family_one_in=0 --column_families=1 --compact_files_one_in=0 --compact_range_one_in=1000 --compaction_pri=3 --open_files=-1 --compaction_style=2 --fifo_allow_compaction=1 --compaction_ttl=0 --compression_max_dict_buffer_bytes=8388607 --compression_max_dict_bytes=16384 --compression_parallel_threads=1 --compression_type=zlib --compression_use_zstd_dict_trainer=1 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --data_block_index_type=0 --db=/dev/shm/rocksdb_test0/rocksdb_crashtest_whitebox --db_write_buffer_size=8388608 --delpercent=4 --delrangepercent=1 --destroy_db_initially=1 --detect_filter_construct_corruption=0 --disable_wal=0 --enable_compaction_filter=0 --enable_pipelined_write=1 --fail_if_options_file_error=1 --file_checksum_impl=none --flush_one_in=1000 --format_version=5 --get_current_wal_file_one_in=0 --get_live_files_one_in=0 --get_property_one_in=0 --get_sorted_wal_files_one_in=0 --index_block_restart_interval=15 --index_type=3 --ingest_external_file_one_in=100 --initial_auto_readahead_size=0 --iterpercent=10 --key_len_percent_dist=1,30,69 --level_compaction_dynamic_level_bytes=True --log2_keys_per_lock=10 --long_running_snapshots=0 --mark_for_compaction_one_file_in=10 --max_auto_readahead_size=16384 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=100000 --max_key_len=3 --max_manifest_file_size=1073741824 --max_write_batch_group_size_bytes=1048576 --max_write_buffer_number=3 --max_write_buffer_size_to_maintain=4194304 --memtable_prefix_bloom_size_ratio=0.5 --memtable_protection_bytes_per_key=1 --memtable_whole_key_filtering=1 --memtablerep=skip_list --mmap_read=1 --mock_direct_io=False --nooverwritepercent=1 --num_file_reads_for_auto_readahead=0 --num_levels=1 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=32 --open_write_fault_one_in=0 --ops_per_thread=200000 --optimize_filters_for_memory=0 --paranoid_file_checks=1 --partition_filters=0 --partition_pinning=1 --pause_background_one_in=0 --periodic_compaction_seconds=0 --prefix_size=8 --prefixpercent=5 --prepopulate_block_cache=0 --progress_reports=0 --read_fault_one_in=0 --readahead_size=16384 --readpercent=45 --recycle_log_file_num=1 --reopen=20 --ribbon_starting_level=999 --snapshot_hold_ops=1000 --sst_file_manager_bytes_per_sec=0 --sst_file_manager_bytes_per_truncate=0 --subcompactions=2 --sync=0 --sync_fault_injection=0 --target_file_size_base=524288 --target_file_size_multiplier=2 --test_batches_snapshots=0 --top_level_index_pinning=3 --unpartitioned_pinning=0 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=1 --use_merge=0 --use_multiget=1 --user_timestamp_size=0 --value_size_mult=32 --verify_checksum=1 --verify_checksum_one_in=0 --verify_db_one_in=1000 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=zstd --write_buffer_size=524288 --write_dbid_to_manifest=0 --writepercent=35

put or merge error: Corruption: force_consistency_checks(DEBUG): VersionBuilder: L0 file https://github.com/facebook/rocksdb/issues/479 with seqno 23711 29070 vs. file https://github.com/facebook/rocksdb/issues/482 with seqno 27138 29049
```

**Summary:**
FIFO only does intra-L0 compaction in the following four cases. For other cases, FIFO drops data instead of compacting on data, which is irrelevant to the overlapping seqno issue we are solving.
-  [FIFOCompactionPicker::PickSizeCompaction](https://github.com/facebook/rocksdb/blob/7.6.fb/db/compaction/compaction_picker_fifo.cc#L155) when `total size < compaction_options_fifo.max_table_files_size` and `compaction_options_fifo.allow_compaction == true`
   - For this path, we simply reuse the fix in `FindIntraL0Compaction` https://github.com/facebook/rocksdb/pull/5958/files#diff-c261f77d6dd2134333c4a955c311cf4a196a08d3c2bb6ce24fd6801407877c89R56
   - This path was not stress-tested at all. Therefore we covered `fifo.allow_compaction` in stress test to surface the overlapping seqno issue we are fixing here.
- [FIFOCompactionPicker::PickCompactionToWarm](https://github.com/facebook/rocksdb/blob/7.6.fb/db/compaction/compaction_picker_fifo.cc#L313) when `compaction_options_fifo.age_for_warm > 0`
  - For this path, we simply replicate the idea in https://github.com/facebook/rocksdb/pull/5958#issue-511150930 and skip files of largest seqno greater than `earliest_mem_seqno`
  - This path was not stress-tested at all. However covering `age_for_warm` option worths a separate PR to deal with db stress compatibility. Therefore we manually tested this path for this PR
- [FIFOCompactionPicker::CompactRange](https://github.com/facebook/rocksdb/blob/7.6.fb/db/compaction/compaction_picker_fifo.cc#L365) that ends up picking one of the above two compactions
- [CompactionPicker::CompactFiles](https://github.com/facebook/rocksdb/blob/7.6.fb/db/compaction/compaction_picker.cc#L378)
    - Since `SanitizeCompactionInputFiles()` will be called [before](https://github.com/facebook/rocksdb/blob/7.6.fb/db/compaction/compaction_picker.h#L111-L113) `CompactionPicker::CompactFiles` , we simply replicate the idea in https://github.com/facebook/rocksdb/pull/5958#issue-511150930  in `SanitizeCompactionInputFiles()`. To simplify implementation, we return `Stats::Abort()` on encountering seqno-overlapped file when doing compaction to L0 instead of skipping the file and proceed with the compaction.

Some additional clean-up included in this PR:
- Renamed `earliest_memtable_seqno` to `earliest_mem_seqno` for consistent naming
- Added comment about `earliest_memtable_seqno` in related APIs
- Made parameter `earliest_memtable_seqno` constant and required

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10777

Test Plan:
- make check
- New unit test `TEST_P(DBCompactionTestFIFOCheckConsistencyWithParam, FlushAfterIntraL0CompactionWithIngestedFile)`corresponding to the above 4 cases, which will fail accordingly without the fix
- Regular CI stress run on this PR + stress test with aggressive value https://github.com/facebook/rocksdb/pull/10761  and on FIFO compaction only

Reviewed By: ajkr

Differential Revision: D40090485

Pulled By: hx235

fbshipit-source-id: 52624186952ee7109117788741aeeac86b624a4f
main
Hui Xiao 2 years ago committed by Facebook GitHub Bot
parent 2a551976f4
commit fc74abb436
  1. 1
      HISTORY.md
  2. 5
      db/column_family.cc
  3. 23
      db/compaction/compaction_picker.cc
  4. 70
      db/compaction/compaction_picker.h
  5. 27
      db/compaction/compaction_picker_fifo.cc
  6. 16
      db/compaction/compaction_picker_fifo.h
  7. 6
      db/compaction/compaction_picker_level.cc
  8. 3
      db/compaction/compaction_picker_level.h
  9. 180
      db/compaction/compaction_picker_test.cc
  10. 2
      db/compaction/compaction_picker_universal.cc
  11. 3
      db/compaction/compaction_picker_universal.h
  12. 164
      db/db_compaction_test.cc
  13. 12
      db/db_impl/db_impl_compaction_flush.cc
  14. 1
      db_stress_tool/db_stress_common.h
  15. 4
      db_stress_tool/db_stress_gflags.cc
  16. 5
      db_stress_tool/db_stress_test_base.cc
  17. 1
      tools/db_crashtest.py

@ -25,6 +25,7 @@
* Fixed a bug where RocksDB could be doing compaction endlessly when allow_ingest_behind is true and the bottommost level is not filled (#10767).
* Fixed a memory safety bug in experimental HyperClockCache (#10768)
* Fixed some cases where `ldb update_manifest` and `ldb unsafe_remove_sst_file` are not usable because they were requiring the DB files to match the existing manifest state (before updating the manifest to match a desired state).
* Fix FIFO compaction causing corruption of overlapping seqnos in L0 files due to ingesting files of overlapping seqnos with memtable's under `CompactionOptionsFIFO::allow_compaction=true` or `CompactionOptionsFIFO::age_for_warm>0` or `CompactRange()/CompactFiles()` is used. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected.
### Performance Improvements
* Try to align the compaction output file boundaries to the next level ones, which can reduce more than 10% compaction load for the default level compaction. The feature is enabled by default, to disable, set `AdvancedColumnFamilyOptions.level_compaction_dynamic_file_size` to false. As a side effect, it can create SSTs larger than the target_file_size (capped at 2x target_file_size) or smaller files.

@ -1211,11 +1211,14 @@ Compaction* ColumnFamilyData::CompactRange(
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* conflict,
uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
SequenceNumber earliest_mem_seqno =
std::min(mem_->GetEarliestSequenceNumber(),
imm_.current()->GetEarliestSequenceNumber(false));
auto* result = compaction_picker_->CompactRange(
GetName(), mutable_cf_options, mutable_db_options,
current_->storage_info(), input_level, output_level,
compact_range_options, begin, end, compaction_end, conflict,
max_file_num_to_ignore, trim_ts);
max_file_num_to_ignore, trim_ts, earliest_mem_seqno);
if (result != nullptr) {
result->SetInputVersion(current_);
}

@ -32,7 +32,7 @@ bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
uint64_t max_compact_bytes_per_del_file,
uint64_t max_compaction_bytes,
CompactionInputFiles* comp_inputs,
SequenceNumber earliest_mem_seqno) {
const SequenceNumber earliest_mem_seqno) {
// Do not pick ingested file when there is at least one memtable not flushed
// which of seqno is overlap with the sst.
TEST_SYNC_POINT("FindIntraL0Compaction");
@ -613,7 +613,8 @@ Compaction* CompactionPicker::CompactRange(
int input_level, int output_level,
const CompactRangeOptions& compact_range_options, const InternalKey* begin,
const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
uint64_t max_file_num_to_ignore, const std::string& trim_ts,
const SequenceNumber /*earliest_mem_seqno*/) {
// CompactionPickerFIFO has its own implementation of compact range
assert(ioptions_.compaction_style != kCompactionStyleFIFO);
@ -918,7 +919,8 @@ bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a,
Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta, const int output_level) const {
const ColumnFamilyMetaData& cf_meta, const int output_level,
const SequenceNumber earliest_mem_seqno) const {
auto& levels = cf_meta.levels;
auto comparator = icmp_->user_comparator();
@ -995,6 +997,13 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
current_files[f].name +
" is currently being compacted.");
}
if (output_level == 0 &&
current_files[f].largest_seqno > earliest_mem_seqno) {
return Status::Aborted(
"Necessary compaction input file " + current_files[f].name +
" has overlapping seqnos with earliest memtable seqnos.");
}
input_files->insert(TableFileNameToNumber(current_files[f].name));
}
@ -1051,12 +1060,14 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
"A running compaction is writing to the same output level in an "
"overlapping key range");
}
return Status::OK();
}
Status CompactionPicker::SanitizeCompactionInputFiles(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta, const int output_level) const {
const ColumnFamilyMetaData& cf_meta, const int output_level,
const SequenceNumber earliest_mem_seqno) const {
assert(static_cast<int>(cf_meta.levels.size()) - 1 ==
cf_meta.levels[cf_meta.levels.size() - 1].level);
if (output_level >= static_cast<int>(cf_meta.levels.size())) {
@ -1082,8 +1093,8 @@ Status CompactionPicker::SanitizeCompactionInputFiles(
"A compaction must contain at least one file.");
}
Status s = SanitizeCompactionInputFilesForAllLevels(input_files, cf_meta,
output_level);
Status s = SanitizeCompactionInputFilesForAllLevels(
input_files, cf_meta, output_level, earliest_mem_seqno);
if (!s.ok()) {
return s;

@ -51,15 +51,24 @@ class CompactionPicker {
virtual ~CompactionPicker();
// Pick level and inputs for a new compaction.
//
// `earliest_mem_seqno` is the earliest seqno of unflushed memtables.
// It is needed to compare with compaction input SST files' largest seqnos
// in order to exclude those of seqnos potentially overlap with memtables'
// seqnos when doing compaction to L0. This will avoid creating a SST files in
// L0 newer than a unflushed memtable. Such SST file can exist in the first
// place when it's ingested or resulted from compaction involving files
// ingested.
//
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) = 0;
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) = 0;
// `earliest_mem_seqno`: see PickCompaction() API
// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
// level that overlaps the specified range. Caller should delete
@ -78,7 +87,8 @@ class CompactionPicker {
const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore, const std::string& trim_ts);
uint64_t max_file_num_to_ignore, const std::string& trim_ts,
const SequenceNumber earliest_mem_seqno);
// The maximum allowed output level. Default value is NumberLevels() - 1.
virtual int MaxOutputLevel() const { return NumberLevels() - 1; }
@ -91,10 +101,18 @@ class CompactionPicker {
// files. If it's not possible to conver an invalid input_files
// into a valid one by adding more files, the function will return a
// non-ok status with specific reason.
//
// Cases of returning non-ok status include but not limited to:
// - When output_level == 0 and input_files contains sst files
// of largest seqno greater than `earliest_mem_seqno`. This will
// avoid creating a SST files in L0 newer than a unflushed memtable.
// Such SST file can exist in the first place when it's ingested or
// resulted from compaction involving files ingested.
#ifndef ROCKSDB_LITE
Status SanitizeCompactionInputFiles(std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta,
const int output_level) const;
Status SanitizeCompactionInputFiles(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta, const int output_level,
const SequenceNumber earliest_mem_seqno) const;
#endif // ROCKSDB_LITE
// Free up the files that participated in a compaction
@ -230,7 +248,8 @@ class CompactionPicker {
#ifndef ROCKSDB_LITE
virtual Status SanitizeCompactionInputFilesForAllLevels(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta, const int output_level) const;
const ColumnFamilyMetaData& cf_meta, const int output_level,
const SequenceNumber earliest_mem_seqno) const;
#endif // ROCKSDB_LITE
// Keeps track of all compactions that are running on Level0.
@ -260,23 +279,22 @@ class NullCompactionPicker : public CompactionPicker {
const MutableCFOptions& /*mutable_cf_options*/,
const MutableDBOptions& /*mutable_db_options*/,
VersionStorageInfo* /*vstorage*/, LogBuffer* /* log_buffer */,
SequenceNumber /* earliest_memtable_seqno */) override {
const SequenceNumber /* earliest_mem_seqno */) override {
return nullptr;
}
// Always return "nullptr"
Compaction* CompactRange(const std::string& /*cf_name*/,
const MutableCFOptions& /*mutable_cf_options*/,
const MutableDBOptions& /*mutable_db_options*/,
VersionStorageInfo* /*vstorage*/,
int /*input_level*/, int /*output_level*/,
const CompactRangeOptions& /*compact_range_options*/,
const InternalKey* /*begin*/,
const InternalKey* /*end*/,
InternalKey** /*compaction_end*/,
bool* /*manual_conflict*/,
uint64_t /*max_file_num_to_ignore*/,
const std::string& /*trim_ts*/) override {
Compaction* CompactRange(
const std::string& /*cf_name*/,
const MutableCFOptions& /*mutable_cf_options*/,
const MutableDBOptions& /*mutable_db_options*/,
VersionStorageInfo* /*vstorage*/, int /*input_level*/,
int /*output_level*/,
const CompactRangeOptions& /*compact_range_options*/,
const InternalKey* /*begin*/, const InternalKey* /*end*/,
InternalKey** /*compaction_end*/, bool* /*manual_conflict*/,
uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/,
const SequenceNumber /* earliest_mem_seqno */) override {
return nullptr;
}
@ -303,12 +321,14 @@ class NullCompactionPicker : public CompactionPicker {
// initialized with corresponding input
// files. Cannot be nullptr.
//
// @param earliest_mem_seqno See PickCompaction() API
// @return true iff compaction was found.
bool FindIntraL0Compaction(
const std::vector<FileMetaData*>& level_files, size_t min_files_to_compact,
uint64_t max_compact_bytes_per_del_file, uint64_t max_compaction_bytes,
CompactionInputFiles* comp_inputs,
SequenceNumber earliest_mem_seqno = kMaxSequenceNumber);
bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
size_t min_files_to_compact,
uint64_t max_compact_bytes_per_del_file,
uint64_t max_compaction_bytes,
CompactionInputFiles* comp_inputs,
const SequenceNumber earliest_mem_seqno);
CompressionType GetCompressionType(const VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options,

@ -139,7 +139,7 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
Compaction* FIFOCompactionPicker::PickSizeCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer) {
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) {
// compute the total size and identify the last non-empty level
int last_level = 0;
uint64_t total_size = 0;
@ -176,7 +176,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
.level0_file_num_compaction_trigger /* min_files_to_compact */
,
max_compact_bytes_per_del_file,
mutable_cf_options.max_compaction_bytes, &comp_inputs)) {
mutable_cf_options.max_compaction_bytes, &comp_inputs,
earliest_mem_seqno)) {
Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
{comp_inputs}, 0, 16 * 1024 * 1024 /* output file size limit */,
@ -275,7 +276,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
Compaction* FIFOCompactionPicker::PickCompactionToWarm(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer) {
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) {
TEST_SYNC_POINT("PickCompactionToWarm");
if (mutable_cf_options.compaction_options_fifo.age_for_warm == 0) {
return nullptr;
}
@ -299,6 +301,8 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm(
cf_name.c_str(), status.ToString().c_str());
return nullptr;
}
TEST_SYNC_POINT_CALLBACK("PickCompactionToWarm::BeforeGetCurrentTime",
&_current_time);
const uint64_t current_time = static_cast<uint64_t>(_current_time);
if (!level0_compactions_in_progress_.empty()) {
@ -345,7 +349,8 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm(
// for warm tier.
break;
}
if (prev_file != nullptr) {
if (prev_file != nullptr &&
prev_file->fd.largest_seqno <= earliest_mem_seqno) {
compaction_size += prev_file->fd.GetFileSize();
if (compaction_size > mutable_cf_options.max_compaction_bytes) {
break;
@ -389,7 +394,7 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm(
Compaction* FIFOCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer, SequenceNumber /*earliest_memtable_seqno*/) {
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) {
Compaction* c = nullptr;
if (mutable_cf_options.ttl > 0) {
c = PickTTLCompaction(cf_name, mutable_cf_options, mutable_db_options,
@ -397,11 +402,11 @@ Compaction* FIFOCompactionPicker::PickCompaction(
}
if (c == nullptr) {
c = PickSizeCompaction(cf_name, mutable_cf_options, mutable_db_options,
vstorage, log_buffer);
vstorage, log_buffer, earliest_mem_seqno);
}
if (c == nullptr) {
c = PickCompactionToWarm(cf_name, mutable_cf_options, mutable_db_options,
vstorage, log_buffer);
vstorage, log_buffer, earliest_mem_seqno);
}
RegisterCompaction(c);
return c;
@ -414,7 +419,8 @@ Compaction* FIFOCompactionPicker::CompactRange(
const CompactRangeOptions& /*compact_range_options*/,
const InternalKey* /*begin*/, const InternalKey* /*end*/,
InternalKey** compaction_end, bool* /*manual_conflict*/,
uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/) {
uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/,
const SequenceNumber earliest_mem_seqno) {
#ifdef NDEBUG
(void)input_level;
(void)output_level;
@ -423,8 +429,9 @@ Compaction* FIFOCompactionPicker::CompactRange(
assert(output_level == 0);
*compaction_end = nullptr;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.logger);
Compaction* c = PickCompaction(cf_name, mutable_cf_options,
mutable_db_options, vstorage, &log_buffer);
Compaction* c =
PickCompaction(cf_name, mutable_cf_options, mutable_db_options, vstorage,
&log_buffer, earliest_mem_seqno);
log_buffer.FlushBufferToLog();
return c;
}

@ -22,9 +22,12 @@ class FIFOCompactionPicker : public CompactionPicker {
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* version,
LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override;
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) override;
// `earliest_mem_seqno`: see PickCompaction() API for more. In FIFO's
// implementation of CompactRange(), different from others, we will not return
// `nullptr` right away when intput files of compaction to L0 has seqnos
// potentially overlapping with memtable's but exlucde those files.
virtual Compaction* CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
@ -32,7 +35,8 @@ class FIFOCompactionPicker : public CompactionPicker {
const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore, const std::string& trim_ts) override;
uint64_t max_file_num_to_ignore, const std::string& trim_ts,
const SequenceNumber earliest_mem_seqno) override;
// The maximum allowed output level. Always returns 0.
virtual int MaxOutputLevel() const override { return 0; }
@ -51,13 +55,15 @@ class FIFOCompactionPicker : public CompactionPicker {
const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
VersionStorageInfo* version,
LogBuffer* log_buffer);
LogBuffer* log_buffer,
SequenceNumber earliest_mem_seqno);
Compaction* PickCompactionToWarm(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
VersionStorageInfo* version,
LogBuffer* log_buffer);
LogBuffer* log_buffer,
const SequenceNumber earliest_mem_seqno);
};
} // namespace ROCKSDB_NAMESPACE
#endif // !ROCKSDB_LITE

@ -50,7 +50,7 @@ class LevelCompactionBuilder {
public:
LevelCompactionBuilder(const std::string& cf_name,
VersionStorageInfo* vstorage,
SequenceNumber earliest_mem_seqno,
const SequenceNumber earliest_mem_seqno,
CompactionPicker* compaction_picker,
LogBuffer* log_buffer,
const MutableCFOptions& mutable_cf_options,
@ -122,7 +122,7 @@ class LevelCompactionBuilder {
const std::string& cf_name_;
VersionStorageInfo* vstorage_;
SequenceNumber earliest_mem_seqno_;
const SequenceNumber earliest_mem_seqno_;
CompactionPicker* compaction_picker_;
LogBuffer* log_buffer_;
int start_level_ = -1;
@ -832,7 +832,7 @@ bool LevelCompactionBuilder::PickIntraL0Compaction() {
Compaction* LevelCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer, SequenceNumber earliest_mem_seqno) {
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) {
LevelCompactionBuilder builder(cf_name, vstorage, earliest_mem_seqno, this,
log_buffer, mutable_cf_options, ioptions_,
mutable_db_options);

@ -23,8 +23,7 @@ class LevelCompactionPicker : public CompactionPicker {
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override;
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) override;
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;

@ -218,7 +218,7 @@ TEST_F(CompactionPickerTest, Empty) {
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() == nullptr);
}
@ -230,7 +230,7 @@ TEST_F(CompactionPickerTest, Single) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() == nullptr);
}
@ -244,7 +244,7 @@ TEST_F(CompactionPickerTest, Level0Trigger) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
@ -258,7 +258,7 @@ TEST_F(CompactionPickerTest, Level1Trigger) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(66U, compaction->input(0, 0)->fd.GetNumber());
@ -277,7 +277,7 @@ TEST_F(CompactionPickerTest, Level1Trigger2) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->num_input_files(1));
@ -309,7 +309,7 @@ TEST_F(CompactionPickerTest, LevelMaxScore) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(7U, compaction->input(0, 0)->fd.GetNumber());
@ -357,7 +357,7 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
@ -382,7 +382,7 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic2) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
@ -408,7 +408,7 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic3) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
@ -438,7 +438,7 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic4) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
@ -471,7 +471,7 @@ TEST_F(CompactionPickerTest, LevelTriggerDynamic4) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(5U, compaction->input(0, 0)->fd.GetNumber());
@ -528,7 +528,7 @@ TEST_F(CompactionPickerTest, CompactionUniversalIngestBehindReservedLevel) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
// output level should be the one above the bottom-most
ASSERT_EQ(1, compaction->output_level());
@ -563,7 +563,7 @@ TEST_F(CompactionPickerTest, CannotTrivialMoveUniversal) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(!compaction->is_trivial_move());
}
@ -590,7 +590,7 @@ TEST_F(CompactionPickerTest, AllowsTrivialMoveUniversal) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction->is_trivial_move());
}
@ -619,7 +619,7 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction1) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->output_level());
@ -650,7 +650,7 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction2) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_FALSE(compaction);
}
@ -677,7 +677,7 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction3) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_FALSE(compaction);
}
@ -708,7 +708,7 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction4) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(!compaction ||
compaction->start_level() != compaction->output_level());
}
@ -729,7 +729,7 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction5) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
ASSERT_EQ(0, compaction->start_level());
ASSERT_EQ(1U, compaction->num_input_files(0));
@ -754,7 +754,7 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction6) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->start_level());
ASSERT_EQ(2U, compaction->num_input_files(0));
@ -792,7 +792,7 @@ TEST_F(CompactionPickerTest, UniversalIncrementalSpace1) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(3, compaction->start_level());
@ -834,7 +834,7 @@ TEST_F(CompactionPickerTest, UniversalIncrementalSpace2) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(2, compaction->start_level());
@ -876,7 +876,7 @@ TEST_F(CompactionPickerTest, UniversalIncrementalSpace3) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(2, compaction->start_level());
@ -924,7 +924,7 @@ TEST_F(CompactionPickerTest, UniversalIncrementalSpace4) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(3, compaction->start_level());
@ -968,7 +968,7 @@ TEST_F(CompactionPickerTest, UniversalIncrementalSpace5) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(3, compaction->start_level());
@ -1035,7 +1035,7 @@ TEST_F(CompactionPickerTest, FIFOToWarm1) {
ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(3U, compaction->input(0, 0)->fd.GetNumber());
@ -1073,7 +1073,7 @@ TEST_F(CompactionPickerTest, FIFOToWarm2) {
ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
@ -1114,7 +1114,7 @@ TEST_F(CompactionPickerTest, FIFOToWarmMaxSize) {
ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
@ -1155,7 +1155,7 @@ TEST_F(CompactionPickerTest, FIFOToWarmWithExistingWarm) {
ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
@ -1197,7 +1197,7 @@ TEST_F(CompactionPickerTest, FIFOToWarmWithOngoing) {
ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
// Stop if a file is being compacted
ASSERT_TRUE(compaction.get() == nullptr);
}
@ -1236,7 +1236,7 @@ TEST_F(CompactionPickerTest, FIFOToWarmWithHotBetweenWarms) {
ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true);
std::unique_ptr<Compaction> compaction(fifo_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
// Stop if a file is being compacted
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
@ -1267,7 +1267,7 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping1) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
// Pick file 8 because it overlaps with 0 files on level 3.
@ -1300,7 +1300,7 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping2) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
// Picking file 7 because overlapping ratio is the biggest.
@ -1328,7 +1328,7 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping3) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
// Picking file 8 because overlapping ratio is the biggest.
@ -1359,7 +1359,7 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping4) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
// Picking file 8 because overlapping ratio is the biggest.
@ -1395,7 +1395,7 @@ TEST_F(CompactionPickerTest, CompactionPriRoundRobin) {
std::unique_ptr<Compaction> compaction(
local_level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
// Since the max bytes for level 2 is 120M, picking one file to compact
// makes the post-compaction level size less than 120M, there is exactly one
@ -1435,7 +1435,7 @@ TEST_F(CompactionPickerTest, CompactionPriMultipleFilesRoundRobin1) {
std::unique_ptr<Compaction> compaction(
local_level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
// The maximum compaction bytes is very large in this case so we can igore its
@ -1478,7 +1478,7 @@ TEST_F(CompactionPickerTest, CompactionPriMultipleFilesRoundRobin2) {
std::unique_ptr<Compaction> compaction(
local_level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
// The maximum compaction bytes is only 2500 bytes now. Even though we are
@ -1522,7 +1522,7 @@ TEST_F(CompactionPickerTest, CompactionPriMultipleFilesRoundRobin3) {
std::unique_ptr<Compaction> compaction(
local_level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
// Cannot pick more files since we reach the last file in level 2
@ -1581,7 +1581,7 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlappingManyFiles) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
// Picking file 8 because overlapping ratio is the biggest.
@ -1609,7 +1609,7 @@ TEST_F(CompactionPickerTest, ParentIndexResetBug) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
}
// This test checks ExpandWhileOverlapping() by having overlapping user keys
@ -1627,7 +1627,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_levels());
ASSERT_EQ(2U, compaction->num_input_files(0));
@ -1647,7 +1647,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys2) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(2U, compaction->num_input_files(0));
@ -1675,7 +1675,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys3) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(5U, compaction->num_input_files(0));
@ -1706,7 +1706,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys4) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
@ -1730,7 +1730,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys5) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() == nullptr);
}
@ -1752,7 +1752,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys6) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
@ -1773,7 +1773,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys7) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_GE(1U, compaction->num_input_files(0));
@ -1802,7 +1802,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys8) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(3U, compaction->num_input_files(0));
@ -1835,7 +1835,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys9) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(5U, compaction->num_input_files(0));
@ -1876,7 +1876,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys10) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
@ -1915,7 +1915,7 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys11) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
@ -2013,7 +2013,7 @@ TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri1) {
ASSERT_EQ(1, vstorage_->CompactionScoreLevel(1));
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() == nullptr);
}
@ -2044,7 +2044,7 @@ TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri2) {
ASSERT_EQ(1, vstorage_->CompactionScoreLevel(1));
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
}
@ -2078,7 +2078,7 @@ TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri3) {
ASSERT_EQ(0, vstorage_->CompactionScoreLevel(1));
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
}
@ -2374,7 +2374,7 @@ TEST_F(CompactionPickerTest, MaxCompactionBytesHit) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
@ -2400,7 +2400,7 @@ TEST_F(CompactionPickerTest, MaxCompactionBytesNotHit) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(3U, compaction->num_input_files(0));
@ -2430,7 +2430,7 @@ TEST_F(CompactionPickerTest, IsTrivialMoveOn) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_TRUE(compaction->IsTrivialMove());
}
@ -2455,7 +2455,7 @@ TEST_F(CompactionPickerTest, L0TrivialMove1) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1, compaction->num_input_levels());
ASSERT_EQ(2, compaction->num_input_files(0));
@ -2484,7 +2484,7 @@ TEST_F(CompactionPickerTest, L0TrivialMoveOneFile) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1, compaction->num_input_levels());
ASSERT_EQ(1, compaction->num_input_files(0));
@ -2510,7 +2510,7 @@ TEST_F(CompactionPickerTest, L0TrivialMoveWholeL0) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1, compaction->num_input_levels());
ASSERT_EQ(4, compaction->num_input_files(0));
@ -2541,7 +2541,7 @@ TEST_F(CompactionPickerTest, IsTrivialMoveOffSstPartitioned) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
// No trivial move, because partitioning is applied
ASSERT_TRUE(!compaction->IsTrivialMove());
@ -2564,7 +2564,7 @@ TEST_F(CompactionPickerTest, IsTrivialMoveOff) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_FALSE(compaction->IsTrivialMove());
}
@ -2593,7 +2593,7 @@ TEST_F(CompactionPickerTest, TrivialMoveMultipleFiles1) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_TRUE(compaction->IsTrivialMove());
ASSERT_EQ(1, compaction->num_input_levels());
@ -2627,7 +2627,7 @@ TEST_F(CompactionPickerTest, TrivialMoveMultipleFiles2) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_TRUE(compaction->IsTrivialMove());
ASSERT_EQ(1, compaction->num_input_levels());
@ -2660,7 +2660,7 @@ TEST_F(CompactionPickerTest, TrivialMoveMultipleFiles3) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_TRUE(compaction->IsTrivialMove());
ASSERT_EQ(1, compaction->num_input_levels());
@ -2686,7 +2686,7 @@ TEST_F(CompactionPickerTest, TrivialMoveMultipleFiles4) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_TRUE(compaction->IsTrivialMove());
ASSERT_EQ(1, compaction->num_input_levels());
@ -2716,7 +2716,7 @@ TEST_F(CompactionPickerTest, TrivialMoveMultipleFiles5) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_TRUE(compaction->IsTrivialMove());
ASSERT_EQ(1, compaction->num_input_levels());
@ -2750,7 +2750,7 @@ TEST_F(CompactionPickerTest, TrivialMoveMultipleFiles6) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_TRUE(compaction->IsTrivialMove());
ASSERT_EQ(1, compaction->num_input_levels());
@ -2785,7 +2785,7 @@ TEST_F(CompactionPickerTest, CacheNextCompactionIndex) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
@ -2795,7 +2795,7 @@ TEST_F(CompactionPickerTest, CacheNextCompactionIndex) {
compaction.reset(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
@ -2805,7 +2805,7 @@ TEST_F(CompactionPickerTest, CacheNextCompactionIndex) {
compaction.reset(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() == nullptr);
ASSERT_EQ(4, vstorage_->NextCompactionIndex(1 /* level */));
}
@ -2831,7 +2831,7 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesNotHit) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_levels());
ASSERT_EQ(5U, compaction->num_input_files(0));
@ -2862,7 +2862,7 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesHit) {
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_levels());
ASSERT_EQ(4U, compaction->num_input_files(0));
@ -2928,7 +2928,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
// Validate that its a compaction to reduce sorted runs
@ -2946,7 +2946,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap) {
std::unique_ptr<Compaction> compaction2(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_FALSE(compaction2);
}
@ -2971,7 +2971,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap2) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
// Validate that its a delete triggered compaction
@ -2990,7 +2990,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionFullOverlap2) {
std::unique_ptr<Compaction> compaction2(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_FALSE(compaction2);
}
@ -3031,7 +3031,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionStartOutputOverlap) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
// Validate that its a delete triggered compaction
@ -3062,7 +3062,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedCompactionStartOutputOverlap) {
std::unique_ptr<Compaction> compaction2(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_FALSE(compaction2);
DeleteVersionStorage();
}
@ -3088,7 +3088,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedL0NoOverlap) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
// Validate that its a delete triggered compaction
@ -3125,7 +3125,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedL0WithOverlap) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
// Validate that its a delete triggered compaction
@ -3159,7 +3159,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedL0Overlap2) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction);
// Validate that its a delete triggered compaction
@ -3180,7 +3180,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedL0Overlap2) {
std::unique_ptr<Compaction> compaction2(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
ASSERT_TRUE(compaction2);
ASSERT_EQ(3U, compaction->num_input_files(0));
ASSERT_TRUE(file_map_[1].first->being_compacted);
@ -3215,7 +3215,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedManualCompaction) {
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
ColumnFamilyData::kCompactAllLevels, 6, CompactRangeOptions(),
nullptr, nullptr, &manual_end, &manual_conflict,
std::numeric_limits<uint64_t>::max(), ""));
std::numeric_limits<uint64_t>::max(), "", kMaxSequenceNumber));
ASSERT_TRUE(compaction);
@ -3256,7 +3256,7 @@ TEST_F(CompactionPickerTest, UniversalSizeAmpTierCompactionNonLastLevel) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
// Make sure it's a size amp compaction and includes all files
ASSERT_EQ(compaction->compaction_reason(),
@ -3292,7 +3292,7 @@ TEST_F(CompactionPickerTest, UniversalSizeRatioTierCompactionLastLevel) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
// Internally, size amp compaction is evaluated before size ratio compaction.
// Here to make sure it's size ratio compaction instead of size amp
@ -3329,7 +3329,7 @@ TEST_F(CompactionPickerTest, UniversalSizeAmpTierCompactionNotSuport) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
// size amp compaction is still triggered even preclude_last_level is set
ASSERT_EQ(compaction->compaction_reason(),
@ -3363,7 +3363,7 @@ TEST_F(CompactionPickerTest, UniversalSizeAmpTierCompactionLastLevel) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
// It's a Size Amp compaction, but doesn't include the last level file and
// output to the penultimate level.
@ -3471,7 +3471,7 @@ TEST_F(CompactionPickerU64TsTest, CannotTrivialMoveUniversal) {
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
&log_buffer_, kMaxSequenceNumber));
assert(compaction);
ASSERT_TRUE(!compaction->is_trivial_move());
}

@ -293,7 +293,7 @@ bool UniversalCompactionPicker::NeedsCompaction(
Compaction* UniversalCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer, SequenceNumber /* earliest_memtable_seqno */) {
LogBuffer* log_buffer, const SequenceNumber /* earliest_mem_seqno */) {
UniversalCompactionBuilder builder(ioptions_, icmp_, cf_name,
mutable_cf_options, mutable_db_options,
vstorage, this, log_buffer);

@ -21,8 +21,7 @@ class UniversalCompactionPicker : public CompactionPicker {
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override;
LogBuffer* log_buffer, const SequenceNumber earliest_mem_seqno) override;
virtual int MaxOutputLevel() const override { return NumberLevels() - 1; }
virtual bool NeedsCompaction(

@ -186,6 +186,13 @@ class RoundRobinSubcompactionsAgainstResources
int max_compaction_limits_;
};
class DBCompactionTestFIFOCheckConsistencyWithParam
: public DBCompactionTest,
public testing::WithParamInterface<std::string> {
public:
DBCompactionTestFIFOCheckConsistencyWithParam() : DBCompactionTest() {}
};
namespace {
class FlushedFileCollector : public EventListener {
public:
@ -6457,6 +6464,163 @@ TEST_P(DBCompactionTestWithParam,
}
}
INSTANTIATE_TEST_CASE_P(DBCompactionTestFIFOCheckConsistencyWithParam,
DBCompactionTestFIFOCheckConsistencyWithParam,
::testing::Values("FindIntraL0Compaction",
"PickCompactionToWarm",
"CompactRange", "CompactFile"));
TEST_P(DBCompactionTestFIFOCheckConsistencyWithParam,
FlushAfterIntraL0CompactionWithIngestedFile) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.compression = kNoCompression;
options.force_consistency_checks = true;
options.compaction_style = kCompactionStyleFIFO;
options.max_open_files = -1;
options.num_levels = 1;
options.level0_file_num_compaction_trigger = 3;
CompactionOptionsFIFO fifo_options;
const std::string compaction_path_to_test = GetParam();
if (compaction_path_to_test == "FindIntraL0Compaction") {
fifo_options.allow_compaction = true;
fifo_options.age_for_warm = 0;
} else if (compaction_path_to_test == "PickCompactionToWarm") {
fifo_options.allow_compaction = false;
fifo_options.age_for_warm = 2;
} else if (compaction_path_to_test == "CompactRange") {
// FIFOCompactionPicker::CompactRange() implementes
// on top of regular compaction paths. Here we choose
// to trigger FIFOCompactionPicker::PickCompactionToWarm()
// for simplicity
fifo_options.allow_compaction = false;
fifo_options.age_for_warm = 2;
options.disable_auto_compactions = true;
} else if (compaction_path_to_test == "CompactFile") {
fifo_options.allow_compaction = false;
fifo_options.age_for_warm = 0;
options.disable_auto_compactions = true;
} else {
assert(false);
}
options.compaction_options_fifo = fifo_options;
DestroyAndReopen(options);
// To force assigning the global seqno to ingested file
// for our test purpose
const Snapshot* snapshot = db_->GetSnapshot();
std::atomic<bool> compaction_path_sync_point_called(false);
if (compaction_path_to_test == "FindIntraL0Compaction") {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"FindIntraL0Compaction",
[&](void* /*arg*/) { compaction_path_sync_point_called.store(true); });
} else if (compaction_path_to_test == "PickCompactionToWarm" ||
compaction_path_to_test == "CompactRange") {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"PickCompactionToWarm",
[&](void* /*arg*/) { compaction_path_sync_point_called.store(true); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"PickCompactionToWarm::BeforeGetCurrentTime",
[&fifo_options](void* current_time_arg) -> void {
// The unit test goes so quickly that there is almost no time
// elapsed after we ingest a file and before we check whether ingested
// files can compact to warm.
// Therefore we need this trick to simulate elapsed
// time in reality.
int64_t* current_time = (int64_t*)current_time_arg;
*current_time = *current_time + fifo_options.age_for_warm + 1;
});
} else if (compaction_path_to_test == "CompactFile") {
// Sync point is not needed in this case
compaction_path_sync_point_called.store(true);
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Create an existing SST file s0 of key range [key1,key4] and seqno range
// [1,2]
ASSERT_OK(Put("key1", "seq1"));
ASSERT_OK(Put("key4", "seq2"));
ASSERT_OK(Flush());
ASSERT_EQ(1, NumTableFilesAtLevel(0));
// Accumulate entries in a memtable m1 of key range [key1,key2] and seqno
// range [3,4] Noted that it contains a overlaped key with s0
ASSERT_OK(Put("key1", "seq3")); // overlapped key
ASSERT_OK(Put("key2", "seq4"));
ASSERT_TRUE(compaction_path_to_test == "CompactFile" ||
!compaction_path_sync_point_called.load());
// Stop background compaction job to obtain accurate
// `NumTableFilesAtLevel(0)` after file ingestion
test::SleepingBackgroundTask sleeping_tasks;
if (!options.disable_auto_compactions) {
env_->SetBackgroundThreads(1, Env::LOW);
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks,
Env::Priority::LOW);
sleeping_tasks.WaitUntilSleeping();
}
// Ingested two SST files, s1 of key range [key5,key5] and seqno range [5,5]
// and s2 of key range [key6,key6] and seqno range [6,6]
IngestOneKeyValue(dbfull(), "key5", "seq5", options);
IngestOneKeyValue(dbfull(), "key6", "seq6", options);
// Up to now, L0 contains s0, s1, s2
ASSERT_EQ(3, NumTableFilesAtLevel(0));
// Resume background compaction job so that Intra level0 compaction can be
// triggered
if (!options.disable_auto_compactions) {
sleeping_tasks.WakeUp();
sleeping_tasks.WaitUntilDone();
}
if (compaction_path_to_test == "CompactRange") {
// `start` and `end` is carefully chosen so that compact range:
// (1) doesn't overlap with memtable therefore the memtable won't be flushed
// (2) should target at compacting s0 with s1 and s2
Slice start("key4"), end("key6");
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
} else if (compaction_path_to_test == "CompactFile") {
ColumnFamilyMetaData cf_meta_data;
db_->GetColumnFamilyMetaData(&cf_meta_data);
assert(cf_meta_data.levels[0].files.size() == 3);
std::vector<std::string> input_files;
for (const auto& file : cf_meta_data.levels[0].files) {
input_files.push_back(file.name);
}
Status s = db_->CompactFiles(CompactionOptions(), input_files, 0);
EXPECT_TRUE(s.IsAborted());
EXPECT_TRUE(s.ToString().find(
"has overlapping seqnos with earliest memtable seqnos") !=
std::string::npos);
} else {
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_TRUE(compaction_path_to_test == "CompactFile" ||
compaction_path_sync_point_called.load());
// To verify compaction of s0, s1 and s2 (leading to new SST s4) didn't
// happen.
//
// Otherwise, when m1 flushes in the next step and become s3,
// we will have s3 of seqnos [3, 4], s4 of seqnos [1, 6], which is a
// corruption because s3 is older than s4 based on largest seqno while s2
// contains a value of Key(1) newer than the value of Key(1) contained in s4.
// And in this case, Flush() will return Status::Corruption() caught by
// `force_consistency_checks=1`
EXPECT_EQ(3, NumTableFilesAtLevel(0));
EXPECT_OK(Flush());
db_->ReleaseSnapshot(snapshot);
}
TEST_P(DBCompactionTestWithBottommostParam, SequenceKeysManualCompaction) {
constexpr int kSstNum = 10;
Options options = CurrentOptions();

@ -1344,8 +1344,18 @@ Status DBImpl::CompactFilesImpl(
}
}
SequenceNumber earliest_mem_seqno = kMaxSequenceNumber;
if (cfd->mem() != nullptr) {
earliest_mem_seqno =
std::min(cfd->mem()->GetEarliestSequenceNumber(), earliest_mem_seqno);
}
if (cfd->imm() != nullptr && cfd->imm()->current() != nullptr) {
earliest_mem_seqno =
std::min(cfd->imm()->current()->GetEarliestSequenceNumber(false),
earliest_mem_seqno);
}
Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
&input_set, cf_meta, output_level);
&input_set, cf_meta, output_level, earliest_mem_seqno);
if (!s.ok()) {
return s;
}

@ -150,6 +150,7 @@ DECLARE_string(cache_type);
DECLARE_uint64(subcompactions);
DECLARE_uint64(periodic_compaction_seconds);
DECLARE_uint64(compaction_ttl);
DECLARE_bool(fifo_allow_compaction);
DECLARE_bool(allow_concurrent_memtable_write);
DECLARE_double(experimental_mempurge_threshold);
DECLARE_bool(enable_write_thread_adaptive_yield);

@ -379,6 +379,10 @@ DEFINE_uint64(compaction_ttl, 1000,
DEFINE_bool(allow_concurrent_memtable_write, false,
"Allow multi-writers to update mem tables in parallel.");
DEFINE_bool(fifo_allow_compaction, false,
"If true, set `Options::compaction_options_fifo.allow_compaction = "
"true`. It only take effect when FIFO compaction is used.");
DEFINE_double(experimental_mempurge_threshold, 0.0,
"Maximum estimated useful payload that triggers a "
"mempurge process to collect memtable garbage bytes.");

@ -3123,6 +3123,11 @@ void InitializeOptionsFromFlags(
options.max_background_flushes = FLAGS_max_background_flushes;
options.compaction_style =
static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
if (options.compaction_style ==
ROCKSDB_NAMESPACE::CompactionStyle::kCompactionStyleFIFO) {
options.compaction_options_fifo.allow_compaction =
FLAGS_fifo_allow_compaction;
}
options.compaction_pri =
static_cast<ROCKSDB_NAMESPACE::CompactionPri>(FLAGS_compaction_pri);
options.num_levels = FLAGS_num_levels;

@ -140,6 +140,7 @@ default_params = {
# 0 = never (used by some), 10 = often (for threading bugs), 600 = default
"stats_dump_period_sec": lambda: random.choice([0, 10, 600]),
"compaction_ttl": lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]),
"fifo_allow_compaction": lambda: random.randint(0, 1),
# Test small max_manifest_file_size in a smaller chance, as most of the
# time we wnat manifest history to be preserved to help debug
"max_manifest_file_size": lambda: random.choice(

Loading…
Cancel
Save