From 229297d1b83c1885e7db2573b9b44736a7be23a5 Mon Sep 17 00:00:00 2001 From: Changyu Bi Date: Wed, 22 Feb 2023 12:28:18 -0800 Subject: [PATCH] Refactor AddRangeDels() + consider range tombstone during compaction file cutting (#11113) Summary: A second attempt after https://github.com/facebook/rocksdb/issues/10802, with bug fixes and refactoring. This PR updates compaction logic to take range tombstones into account when determining whether to cut the current compaction output file (https://github.com/facebook/rocksdb/issues/4811). Before this change, only point keys were considered, and range tombstones could cause large compactions. For example, if the current compaction outputs is a range tombstone [a, b) and 2 point keys y, z, they would be added to the same file, and may overlap with too many files in the next level and cause a large compaction in the future. This PR also includes ajkr's effort to simplify the logic to add range tombstones to compaction output files in `AddRangeDels()` ([https://github.com/facebook/rocksdb/issues/11078](https://github.com/facebook/rocksdb/pull/11078#issuecomment-1386078861)). The main change is for `CompactionIterator` to emit range tombstone start keys to be processed by `CompactionOutputs`. A new class `CompactionMergingIterator` is introduced to replace `MergingIterator` under `CompactionIterator` to enable emitting of range tombstone start keys. Further improvement after this PR include cutting compaction output at some grandparent boundary key (instead of the next output key) when cutting within a range tombstone to reduce overlap with grandparents. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11113 Test Plan: * added unit test in db_range_del_test * crash test with a small key range: `python3 tools/db_crashtest.py blackbox --simple --max_key=100 --interval=600 --write_buffer_size=262144 --target_file_size_base=256 --max_bytes_for_level_base=262144 --block_size=128 --value_size_mult=33 --subcompactions=10 --use_multiget=1 --delpercent=3 --delrangepercent=2 --verify_iterator_with_expected_state_one_in=2 --num_iterations=10` Reviewed By: ajkr Differential Revision: D42655709 Pulled By: cbi42 fbshipit-source-id: 8367e36ef5640e8f21c14a3855d4a8d6e360a34c --- CMakeLists.txt | 1 + HISTORY.md | 2 + TARGETS | 2 + db/blob/blob_counting_iterator.h | 4 + db/compaction/clipping_iterator.h | 5 + db/compaction/compaction_iterator.cc | 31 +- db/compaction/compaction_iterator.h | 18 +- db/compaction/compaction_job.cc | 30 +- db/compaction/compaction_job.h | 4 +- db/compaction/compaction_outputs.cc | 547 ++++++++++++--------------- db/compaction/compaction_outputs.h | 15 +- db/compaction/subcompaction_state.h | 5 + db/db_range_del_test.cc | 393 ++++++++++++++++++- db/dbformat.h | 4 +- db/history_trimming_iterator.h | 4 + db/merge_helper.cc | 4 + db/range_del_aggregator.cc | 41 +- db/range_del_aggregator.h | 15 +- db/range_del_aggregator_test.cc | 184 +++++---- db/range_tombstone_fragmenter.h | 3 +- db/version_set.cc | 26 +- src.mk | 1 + table/compaction_merging_iterator.cc | 370 ++++++++++++++++++ table/compaction_merging_iterator.h | 44 +++ table/merging_iterator.cc | 234 ++++++------ table/merging_iterator.h | 1 + 26 files changed, 1408 insertions(+), 580 deletions(-) create mode 100644 table/compaction_merging_iterator.cc create mode 100644 table/compaction_merging_iterator.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 65cc32004..e9f56c308 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -838,6 +838,7 @@ set(SOURCES table/get_context.cc table/iterator.cc table/merging_iterator.cc + table/compaction_merging_iterator.cc table/meta_blocks.cc table/persistent_cache_helper.cc table/plain/plain_table_bloom.cc diff --git a/HISTORY.md b/HISTORY.md index 1fc3431e7..551537e54 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,7 @@ # Rocksdb Change Log ## Unreleased +### Behavior changes +* Compaction output file cutting logic now considers range tombstone start keys. For example, SST partitioner now may receive ParitionRequest for range tombstone start keys. ## 8.0.0 (02/19/2023) ### Behavior changes diff --git a/TARGETS b/TARGETS index 5a0c956dd..848a5918f 100644 --- a/TARGETS +++ b/TARGETS @@ -200,6 +200,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "table/block_based/reader_common.cc", "table/block_based/uncompression_dict_reader.cc", "table/block_fetcher.cc", + "table/compaction_merging_iterator.cc", "table/cuckoo/cuckoo_table_builder.cc", "table/cuckoo/cuckoo_table_factory.cc", "table/cuckoo/cuckoo_table_reader.cc", @@ -543,6 +544,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "table/block_based/reader_common.cc", "table/block_based/uncompression_dict_reader.cc", "table/block_fetcher.cc", + "table/compaction_merging_iterator.cc", "table/cuckoo/cuckoo_table_builder.cc", "table/cuckoo/cuckoo_table_factory.cc", "table/cuckoo/cuckoo_table_reader.cc", diff --git a/db/blob/blob_counting_iterator.h b/db/blob/blob_counting_iterator.h index de549afa2..b21651f66 100644 --- a/db/blob/blob_counting_iterator.h +++ b/db/blob/blob_counting_iterator.h @@ -123,6 +123,10 @@ class BlobCountingIterator : public InternalIterator { return iter_->GetProperty(prop_name, prop); } + bool IsDeleteRangeSentinelKey() const override { + return iter_->IsDeleteRangeSentinelKey(); + } + private: void UpdateAndCountBlobIfNeeded() { assert(!iter_->Valid() || iter_->status().ok()); diff --git a/db/compaction/clipping_iterator.h b/db/compaction/clipping_iterator.h index 1ed465c2c..3f50cdd9d 100644 --- a/db/compaction/clipping_iterator.h +++ b/db/compaction/clipping_iterator.h @@ -188,6 +188,11 @@ class ClippingIterator : public InternalIterator { return iter_->GetProperty(prop_name, prop); } + bool IsDeleteRangeSentinelKey() const override { + assert(valid_); + return iter_->IsDeleteRangeSentinelKey(); + } + private: void UpdateValid() { assert(!iter_->Valid() || iter_->status().ok()); diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 1f104c4af..fcd40e116 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -464,6 +464,7 @@ void CompactionIterator::NextFromInput() { value_ = input_.value(); blob_value_.Reset(); iter_stats_.num_input_records++; + is_range_del_ = input_.IsDeleteRangeSentinelKey(); Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_); if (!pik_status.ok()) { @@ -483,7 +484,10 @@ void CompactionIterator::NextFromInput() { break; } TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_); - + if (is_range_del_) { + validity_info_.SetValid(kRangeDeletion); + break; + } // Update input statistics if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion || ikey_.type == kTypeDeletionWithTimestamp) { @@ -705,6 +709,14 @@ void CompactionIterator::NextFromInput() { ParsedInternalKey next_ikey; AdvanceInputIter(); + while (input_.Valid() && input_.IsDeleteRangeSentinelKey() && + ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) + .ok() && + cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) { + // skip range tombstone start keys with the same user key + // since they are not "real" point keys. + AdvanceInputIter(); + } // Check whether the next key exists, is not corrupt, and is the same key // as the single delete. @@ -712,6 +724,7 @@ void CompactionIterator::NextFromInput() { ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) .ok() && cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) { + assert(!input_.IsDeleteRangeSentinelKey()); #ifndef NDEBUG const Compaction* c = compaction_ ? compaction_->real_compaction() : nullptr; @@ -936,12 +949,14 @@ void CompactionIterator::NextFromInput() { // Note that a deletion marker of type kTypeDeletionWithTimestamp will be // considered to have a different user key unless the timestamp is older // than *full_history_ts_low_. + // + // Range tombstone start keys are skipped as they are not "real" keys. while (!IsPausingManualCompaction() && !IsShuttingDown() && input_.Valid() && (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) .ok()) && cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) && - (prev_snapshot == 0 || + (prev_snapshot == 0 || input_.IsDeleteRangeSentinelKey() || DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) { AdvanceInputIter(); } @@ -1235,10 +1250,12 @@ void CompactionIterator::DecideOutputLevel() { void CompactionIterator::PrepareOutput() { if (Valid()) { - if (ikey_.type == kTypeValue) { - ExtractLargeValueIfNeeded(); - } else if (ikey_.type == kTypeBlobIndex) { - GarbageCollectBlobIfNeeded(); + if (LIKELY(!is_range_del_)) { + if (ikey_.type == kTypeValue) { + ExtractLargeValueIfNeeded(); + } else if (ikey_.type == kTypeBlobIndex) { + GarbageCollectBlobIfNeeded(); + } } if (compaction_ != nullptr && compaction_->SupportsPerKeyPlacement()) { @@ -1261,7 +1278,7 @@ void CompactionIterator::PrepareOutput() { DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && ikey_.type != kTypeMerge && current_key_committed_ && !output_to_penultimate_level_ && - ikey_.sequence < preserve_time_min_seqno_) { + ikey_.sequence < preserve_time_min_seqno_ && !is_range_del_) { if (ikey_.type == kTypeDeletion || (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) { ROCKS_LOG_FATAL( diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index a224a8e0e..ea2dc062e 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -63,6 +63,10 @@ class SequenceIterWrapper : public InternalIterator { void SeekToLast() override { assert(false); } uint64_t num_itered() const { return num_itered_; } + bool IsDeleteRangeSentinelKey() const override { + assert(Valid()); + return inner_iter_->IsDeleteRangeSentinelKey(); + } private: InternalKeyComparator icmp_; @@ -242,7 +246,12 @@ class CompactionIterator { const Status& status() const { return status_; } const ParsedInternalKey& ikey() const { return ikey_; } inline bool Valid() const { return validity_info_.IsValid(); } - const Slice& user_key() const { return current_user_key_; } + const Slice& user_key() const { + if (UNLIKELY(is_range_del_)) { + return ikey_.user_key; + } + return current_user_key_; + } const CompactionIterationStats& iter_stats() const { return iter_stats_; } uint64_t num_input_entry_scanned() const { return input_.num_itered(); } // If the current key should be placed on penultimate level, only valid if @@ -252,6 +261,8 @@ class CompactionIterator { } Status InputStatus() const { return input_.status(); } + bool IsDeleteRangeSentinelKey() const { return is_range_del_; } + private: // Processes the input stream to find the next output void NextFromInput(); @@ -385,6 +396,7 @@ class CompactionIterator { kKeepSD = 8, kKeepDel = 9, kNewUserKey = 10, + kRangeDeletion = 11, }; struct ValidityInfo { @@ -493,6 +505,10 @@ class CompactionIterator { // This is a best-effort facility, so memory_order_relaxed is sufficient. return manual_compaction_canceled_.load(std::memory_order_relaxed); } + + // Stores whether the current compaction iterator output + // is a range tombstone start key. + bool is_range_del_{false}; }; inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index bd575966e..331be915e 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1118,6 +1118,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { IterKey end_ikey; Slice start_slice; Slice end_slice; + Slice start_user_key{}; + Slice end_user_key{}; static constexpr char kMaxTs[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"; @@ -1140,6 +1142,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { &ts_slice); } start_slice = start_ikey.GetInternalKey(); + start_user_key = start_ikey.GetUserKey(); } if (end.has_value()) { end_ikey.SetInternalKey(end.value(), kMaxSequenceNumber, kValueTypeForSeek); @@ -1148,6 +1151,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { &ts_slice); } end_slice = end_ikey.GetInternalKey(); + end_user_key = end_ikey.GetUserKey(); } std::unique_ptr clip; @@ -1263,11 +1267,15 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { [this, sub_compact](CompactionOutputs& outputs) { return this->OpenCompactionOutputFile(sub_compact, outputs); }; + const CompactionFileCloseFunc close_file_func = - [this, sub_compact](CompactionOutputs& outputs, const Status& status, - const Slice& next_table_min_key) { - return this->FinishCompactionOutputFile(status, sub_compact, outputs, - next_table_min_key); + [this, sub_compact, start_user_key, end_user_key]( + CompactionOutputs& outputs, const Status& status, + const Slice& next_table_min_key) { + return this->FinishCompactionOutputFile( + status, sub_compact, outputs, next_table_min_key, + sub_compact->start.has_value() ? &start_user_key : nullptr, + sub_compact->end.has_value() ? &end_user_key : nullptr); }; Status status; @@ -1278,7 +1286,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) { // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid() // returns true. - assert(!end.has_value() || cfd->user_comparator()->Compare( c_iter->user_key(), end.value()) < 0); @@ -1458,7 +1465,8 @@ void CompactionJob::RecordDroppedKeys( Status CompactionJob::FinishCompactionOutputFile( const Status& input_status, SubcompactionState* sub_compact, - CompactionOutputs& outputs, const Slice& next_table_min_key) { + CompactionOutputs& outputs, const Slice& next_table_min_key, + const Slice* comp_start_user_key, const Slice* comp_end_user_key) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_SYNC_FILE); assert(sub_compact != nullptr); @@ -1488,12 +1496,10 @@ Status CompactionJob::FinishCompactionOutputFile( // output_to_penultimate_level compaction here, as it's only used to decide // if range dels could be dropped. if (outputs.HasRangeDel()) { - s = outputs.AddRangeDels( - sub_compact->start.has_value() ? &(sub_compact->start.value()) - : nullptr, - sub_compact->end.has_value() ? &(sub_compact->end.value()) : nullptr, - range_del_out_stats, bottommost_level_, cfd->internal_comparator(), - earliest_snapshot, next_table_min_key, full_history_ts_low_); + s = outputs.AddRangeDels(comp_start_user_key, comp_end_user_key, + range_del_out_stats, bottommost_level_, + cfd->internal_comparator(), earliest_snapshot, + next_table_min_key, full_history_ts_low_); } RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1"); diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 2f8cb08da..a930c15f1 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -256,7 +256,9 @@ class CompactionJob { Status FinishCompactionOutputFile(const Status& input_status, SubcompactionState* sub_compact, CompactionOutputs& outputs, - const Slice& next_table_min_key); + const Slice& next_table_min_key, + const Slice* comp_start_user_key, + const Slice* comp_end_user_key); Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options); Status OpenCompactionOutputFile(SubcompactionState* sub_compact, CompactionOutputs& outputs); diff --git a/db/compaction/compaction_outputs.cc b/db/compaction/compaction_outputs.cc index 598bffb24..3aedc3fe1 100644 --- a/db/compaction/compaction_outputs.cc +++ b/db/compaction/compaction_outputs.cc @@ -226,6 +226,15 @@ uint64_t CompactionOutputs::GetCurrentKeyGrandparentOverlappedBytes( bool CompactionOutputs::ShouldStopBefore(const CompactionIterator& c_iter) { assert(c_iter.Valid()); const Slice& internal_key = c_iter.key(); +#ifndef NDEBUG + bool should_stop = false; + std::pair p{&should_stop, internal_key}; + TEST_SYNC_POINT_CALLBACK( + "CompactionOutputs::ShouldStopBefore::manual_decision", (void*)&p); + if (should_stop) { + return true; + } +#endif // NDEBUG const uint64_t previous_overlapped_bytes = grandparent_overlapped_bytes_; const InternalKeyComparator* icmp = &compaction_->column_family_data()->internal_comparator(); @@ -347,8 +356,14 @@ Status CompactionOutputs::AddToOutput( const CompactionFileOpenFunc& open_file_func, const CompactionFileCloseFunc& close_file_func) { Status s; + bool is_range_del = c_iter.IsDeleteRangeSentinelKey(); + if (is_range_del && compaction_->bottommost_level()) { + // We don't consider range tombstone for bottommost level since: + // 1. there is no grandparent and hence no overlap to consider + // 2. range tombstone may be dropped at bottommost level. + return s; + } const Slice& key = c_iter.key(); - if (ShouldStopBefore(c_iter) && HasBuilder()) { s = close_file_func(*this, c_iter.InputStatus(), key); if (!s.ok()) { @@ -358,6 +373,13 @@ Status CompactionOutputs::AddToOutput( grandparent_boundary_switched_num_ = 0; grandparent_overlapped_bytes_ = GetCurrentKeyGrandparentOverlappedBytes(key); + if (UNLIKELY(is_range_del)) { + // lower bound for this new output file, this is needed as the lower bound + // does not come from the smallest point key in this case. + range_tombstone_lower_bound_.DecodeFrom(key); + } else { + range_tombstone_lower_bound_.Clear(); + } } // Open output file if necessary @@ -368,6 +390,17 @@ Status CompactionOutputs::AddToOutput( } } + // c_iter may emit range deletion keys, so update `last_key_for_partitioner_` + // here before returning below when `is_range_del` is true + if (partitioner_) { + last_key_for_partitioner_.assign(c_iter.user_key().data_, + c_iter.user_key().size_); + } + + if (UNLIKELY(is_range_del)) { + return s; + } + assert(builder_ != nullptr); const Slice& value = c_iter.value(); s = current_output().validator.Add(key, value); @@ -391,28 +424,33 @@ Status CompactionOutputs::AddToOutput( s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence, ikey.type); - if (partitioner_) { - last_key_for_partitioner_.assign(c_iter.user_key().data_, - c_iter.user_key().size_); - } - return s; } +namespace { +void SetMaxSeqAndTs(InternalKey& internal_key, const Slice& user_key, + const size_t ts_sz) { + if (ts_sz) { + static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff"; + if (ts_sz <= strlen(kTsMax)) { + internal_key = InternalKey(user_key, kMaxSequenceNumber, + kTypeRangeDeletion, Slice(kTsMax, ts_sz)); + } else { + internal_key = + InternalKey(user_key, kMaxSequenceNumber, kTypeRangeDeletion, + std::string(ts_sz, '\xff')); + } + } else { + internal_key.Set(user_key, kMaxSequenceNumber, kTypeRangeDeletion); + } +} +} // namespace + Status CompactionOutputs::AddRangeDels( const Slice* comp_start_user_key, const Slice* comp_end_user_key, CompactionIterationStats& range_del_out_stats, bool bottommost_level, const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot, const Slice& next_table_min_key, const std::string& full_history_ts_low) { - assert(HasRangeDel()); - FileMetaData& meta = current_output().meta; - const Comparator* ucmp = icmp.user_comparator(); - - Slice lower_bound_guard, upper_bound_guard; - std::string smallest_user_key; - const Slice *lower_bound, *upper_bound; - bool lower_bound_from_sub_compact = false; - // The following example does not happen since // CompactionOutput::ShouldStopBefore() always return false for the first // point key. But we should consider removing this dependency. Suppose for the @@ -424,98 +462,134 @@ Status CompactionOutputs::AddRangeDels( // Then meta.smallest will be set to comp_start_user_key@seqno // and meta.largest will be set to comp_start_user_key@kMaxSequenceNumber // which violates the assumption that meta.smallest should be <= meta.largest. + assert(HasRangeDel()); + FileMetaData& meta = current_output().meta; + const Comparator* ucmp = icmp.user_comparator(); + InternalKey lower_bound_buf, upper_bound_buf; + Slice lower_bound_guard, upper_bound_guard; + std::string smallest_user_key; + const Slice *lower_bound, *upper_bound; + + // We first determine the internal key lower_bound and upper_bound for + // this output file. All and only range tombstones that overlap with + // [lower_bound, upper_bound] should be added to this file. File + // boundaries (meta.smallest/largest) should be updated accordingly when + // extended by range tombstones. size_t output_size = outputs_.size(); if (output_size == 1) { - // For the first output table, include range tombstones before the min - // key but after the subcompaction boundary. - lower_bound = comp_start_user_key; - lower_bound_from_sub_compact = true; - } else if (meta.smallest.size() > 0) { + // This is the first file in the subcompaction. + // + // When outputting a range tombstone that spans a subcompaction boundary, + // the files on either side of that boundary need to include that + // boundary's user key. Otherwise, the spanning range tombstone would lose + // coverage. + // + // To achieve this while preventing files from overlapping in internal key + // (an LSM invariant violation), we allow the earlier file to include the + // boundary user key up to `kMaxSequenceNumber,kTypeRangeDeletion`. The + // later file can begin at the boundary user key at the newest key version + // it contains. At this point that version number is unknown since we have + // not processed the range tombstones yet, so permit any version. Same story + // applies to timestamp, and a non-nullptr `comp_start_user_key` should have + // `kMaxTs` here, which similarly permits any timestamp. + if (comp_start_user_key) { + lower_bound_buf.Set(*comp_start_user_key, kMaxSequenceNumber, + kTypeRangeDeletion); + lower_bound_guard = lower_bound_buf.Encode(); + lower_bound = &lower_bound_guard; + } else { + lower_bound = nullptr; + } + } else { // For subsequent output tables, only include range tombstones from min // key onwards since the previous file was extended to contain range // tombstones falling before min key. - smallest_user_key = meta.smallest.user_key().ToString(false /*hex*/); - lower_bound_guard = Slice(smallest_user_key); - lower_bound = &lower_bound_guard; - } else { - lower_bound = nullptr; - } - if (!next_table_min_key.empty()) { - // This may be the last file in the subcompaction in some cases, so we - // need to compare the end key of subcompaction with the next file start - // key. When the end key is chosen by the subcompaction, we know that - // it must be the biggest key in output file. Therefore, it is safe to - // use the smaller key as the upper bound of the output file, to ensure - // that there is no overlapping between different output files. - upper_bound_guard = ExtractUserKey(next_table_min_key); - if (comp_end_user_key != nullptr && - ucmp->CompareWithoutTimestamp(upper_bound_guard, *comp_end_user_key) >= - 0) { - upper_bound = comp_end_user_key; + if (range_tombstone_lower_bound_.size() > 0) { + assert(meta.smallest.size() == 0 || + icmp.Compare(range_tombstone_lower_bound_, meta.smallest) < 0); + lower_bound_guard = range_tombstone_lower_bound_.Encode(); } else { + assert(meta.smallest.size() > 0); + lower_bound_guard = meta.smallest.Encode(); + } + lower_bound = &lower_bound_guard; + } + + const size_t ts_sz = ucmp->timestamp_size(); + if (next_table_min_key.empty()) { + // Last file of the subcompaction. + if (comp_end_user_key) { + upper_bound_buf.Set(*comp_end_user_key, kMaxSequenceNumber, + kTypeRangeDeletion); + upper_bound_guard = upper_bound_buf.Encode(); upper_bound = &upper_bound_guard; + } else { + upper_bound = nullptr; } } else { - // This is the last file in the subcompaction, so extend until the - // subcompaction ends. - upper_bound = comp_end_user_key; - } - bool has_overlapping_endpoints; - if (upper_bound != nullptr && meta.largest.size() > 0) { - has_overlapping_endpoints = ucmp->CompareWithoutTimestamp( - meta.largest.user_key(), *upper_bound) == 0; - } else { - has_overlapping_endpoints = false; + // There is another file coming whose coverage will begin at + // `next_table_min_key`. The current file needs to extend range tombstone + // coverage through its own keys (through `meta.largest`) and through user + // keys preceding `next_table_min_key`'s user key. + ParsedInternalKey next_table_min_key_parsed; + ParseInternalKey(next_table_min_key, &next_table_min_key_parsed, + false /* log_err_key */) + .PermitUncheckedError(); + assert(next_table_min_key_parsed.sequence < kMaxSequenceNumber); + assert(meta.largest.size() == 0 || + icmp.Compare(meta.largest.Encode(), next_table_min_key) < 0); + assert(!lower_bound || icmp.Compare(*lower_bound, next_table_min_key) <= 0); + if (meta.largest.size() > 0 && + ucmp->EqualWithoutTimestamp(meta.largest.user_key(), + next_table_min_key_parsed.user_key)) { + // Caution: this assumes meta.largest.Encode() lives longer than + // upper_bound, which is only true if meta.largest is never updated. + // This just happens to be the case here since meta.largest serves + // as the upper_bound. + upper_bound_guard = meta.largest.Encode(); + } else { + SetMaxSeqAndTs(upper_bound_buf, next_table_min_key_parsed.user_key, + ts_sz); + upper_bound_guard = upper_bound_buf.Encode(); + } + upper_bound = &upper_bound_guard; + } + if (lower_bound && upper_bound && + icmp.Compare(*lower_bound, *upper_bound) > 0) { + assert(meta.smallest.size() == 0 && + ucmp->EqualWithoutTimestamp(ExtractUserKey(*lower_bound), + ExtractUserKey(*upper_bound))); + // This can only happen when lower_bound have the same user key as + // next_table_min_key and that there is no point key in the current + // compaction output file. + return Status::OK(); } - // The end key of the subcompaction must be bigger or equal to the upper // bound. If the end of subcompaction is null or the upper bound is null, // it means that this file is the last file in the compaction. So there // will be no overlapping between this file and others. assert(comp_end_user_key == nullptr || upper_bound == nullptr || - ucmp->CompareWithoutTimestamp(*upper_bound, *comp_end_user_key) <= 0); - auto it = range_del_agg_->NewIterator(lower_bound, upper_bound, - has_overlapping_endpoints); - // Position the range tombstone output iterator. There may be tombstone - // fragments that are entirely out of range, so make sure that we do not - // include those. - if (lower_bound != nullptr) { - it->Seek(*lower_bound); - } else { - it->SeekToFirst(); - } + ucmp->CompareWithoutTimestamp(ExtractUserKey(*upper_bound), + *comp_end_user_key) <= 0); + auto it = range_del_agg_->NewIterator(lower_bound, upper_bound); Slice last_tombstone_start_user_key{}; - for (; it->Valid(); it->Next()) { + bool reached_lower_bound = false; + for (it->SeekToFirst(); it->Valid(); it->Next()) { auto tombstone = it->Tombstone(); - if (upper_bound != nullptr) { - int cmp = - ucmp->CompareWithoutTimestamp(*upper_bound, tombstone.start_key_); - // Tombstones starting after upper_bound only need to be included in - // the next table. - // If the current SST ends before upper_bound, i.e., - // `has_overlapping_endpoints == false`, we can also skip over range - // tombstones that start exactly at upper_bound. Such range - // tombstones will be included in the next file and are not relevant - // to the point keys or endpoints of the current file. - // If the current SST ends at the same user key at upper_bound, - // i.e., `has_overlapping_endpoints == true`, AND the tombstone has - // the same start key as upper_bound, i.e., cmp == 0, then - // the tombstone is relevant only if the tombstone's sequence number - // is no larger than this file's largest key's sequence number. This - // is because the upper bound to truncate this file's range tombstone - // will be meta.largest in this case, and any tombstone that starts after - // it will not be relevant. - if (cmp < 0) { - break; - } else if (cmp == 0) { - if (!has_overlapping_endpoints || - tombstone.seq_ < GetInternalKeySeqno(meta.largest.Encode())) { - break; - } - } + auto kv = tombstone.Serialize(); + InternalKey tombstone_end = tombstone.SerializeEndKey(); + // TODO: the underlying iterator should support clamping the bounds. + // tombstone_end.Encode is of form user_key@kMaxSeqno + // if it is equal to lower_bound, there is no need to include + // such range tombstone. + if (!reached_lower_bound && lower_bound && + icmp.Compare(tombstone_end.Encode(), *lower_bound) <= 0) { + continue; } + assert(!lower_bound || + icmp.Compare(*lower_bound, tombstone_end.Encode()) <= 0); + reached_lower_bound = true; - const size_t ts_sz = ucmp->timestamp_size(); // Garbage collection for range tombstones. // If user-defined timestamp is enabled, range tombstones are dropped if // they are at bottommost_level, below full_history_ts_low and not visible @@ -534,83 +608,93 @@ Status CompactionOutputs::AddRangeDels( continue; } - auto kv = tombstone.Serialize(); assert(lower_bound == nullptr || - ucmp->CompareWithoutTimestamp(*lower_bound, kv.second) < 0); + ucmp->CompareWithoutTimestamp(ExtractUserKey(*lower_bound), + kv.second) < 0); + InternalKey tombstone_start = kv.first; + if (lower_bound && + ucmp->CompareWithoutTimestamp(tombstone_start.user_key(), + ExtractUserKey(*lower_bound)) < 0) { + // This just updates the non-timestamp portion of `tombstone_start`'s user + // key. Ideally there would be a simpler API usage + ParsedInternalKey tombstone_start_parsed; + ParseInternalKey(tombstone_start.Encode(), &tombstone_start_parsed, + false /* log_err_key */) + .PermitUncheckedError(); + // timestamp should be from where sequence number is from, which is from + // tombstone in this case + std::string ts = + tombstone_start_parsed.GetTimestamp(ucmp->timestamp_size()) + .ToString(); + tombstone_start_parsed.user_key = ExtractUserKey(*lower_bound); + tombstone_start.SetFrom(tombstone_start_parsed, ts); + } + if (upper_bound != nullptr && + icmp.Compare(*upper_bound, tombstone_start.Encode()) < 0) { + break; + } + // Here we show that *only* range tombstones that overlap with + // [lower_bound, upper_bound] are added to the current file, and + // sanity checking invariants that should hold: + // - [tombstone_start, tombstone_end] overlaps with [lower_bound, + // upper_bound] + // - meta.smallest <= meta.largest + // Corresponding assertions are made, the proof is broken is any of them + // fails. + // TODO: show that *all* range tombstones that overlap with + // [lower_bound, upper_bound] are added. + // TODO: some invariant about boundaries are correctly updated. + // + // Note that `tombstone_start` is updated in the if condition above, we use + // tombstone_start to refer to its initial value, i.e., + // it->Tombstone().first, and use tombstone_start* to refer to its value + // after the update. + // + // To show [lower_bound, upper_bound] overlaps with [tombstone_start, + // tombstone_end]: + // lower_bound <= upper_bound from the if condition right after all + // bounds are initialized. We assume each tombstone fragment has + // start_key.user_key < end_key.user_key, so + // tombstone_start < tombstone_end by + // FragmentedTombstoneIterator::Tombstone(). So these two ranges are both + // non-emtpy. The flag `reached_lower_bound` and the if logic before it + // ensures lower_bound <= tombstone_end. tombstone_start is only updated + // if it has a smaller user_key than lower_bound user_key, so + // tombstone_start <= tombstone_start*. The above if condition implies + // tombstone_start* <= upper_bound. So we have + // tombstone_start <= upper_bound and lower_bound <= tombstone_end + // and the two ranges overlap. + // + // To show meta.smallest <= meta.largest: + // From the implementation of UpdateBoundariesForRange(), it suffices to + // prove that when it is first called in this function, its parameters + // satisfy `start <= end`, where start = max(tombstone_start*, lower_bound) + // and end = min(tombstone_end, upper_bound). From the above proof we have + // lower_bound <= tombstone_end and lower_bound <= upper_bound. We only need + // to show that tombstone_start* <= min(tombstone_end, upper_bound). + // Note that tombstone_start*.user_key = max(tombstone_start.user_key, + // lower_bound.user_key). Assuming tombstone_end always has + // kMaxSequenceNumber and lower_bound.seqno < kMaxSequenceNumber. + // Since lower_bound <= tombstone_end and lower_bound.seqno < + // tombstone_end.seqno (in absolute number order, not internal key order), + // lower_bound.user_key < tombstone_end.user_key. + // Since lower_bound.user_key < tombstone_end.user_key and + // tombstone_start.user_key < tombstone_end.user_key, tombstone_start* < + // tombstone_end. Since tombstone_start* <= upper_bound from the above proof + // and tombstone_start* < tombstone_end, tombstone_start* <= + // min(tombstone_end, upper_bound), so the two ranges overlap. + // Range tombstone is not supported by output validator yet. builder_->Add(kv.first.Encode(), kv.second); - InternalKey tombstone_start = std::move(kv.first); - InternalKey smallest_candidate{tombstone_start}; - if (lower_bound != nullptr && - ucmp->CompareWithoutTimestamp(smallest_candidate.user_key(), - *lower_bound) <= 0) { - // Pretend the smallest key has the same user key as lower_bound - // (the max key in the previous table or subcompaction) in order for - // files to appear key-space partitioned. - if (lower_bound_from_sub_compact) { - // When lower_bound is chosen by a subcompaction - // (lower_bound_from_sub_compact), we know that subcompactions over - // smaller keys cannot contain any keys at lower_bound. We also know - // that smaller subcompactions exist, because otherwise the - // subcompaction woud be unbounded on the left. As a result, we know - // that no other files on the output level will contain actual keys at - // lower_bound (an output file may have a largest key of - // lower_bound@kMaxSequenceNumber, but this only indicates a large range - // tombstone was truncated). Therefore, it is safe to use the - // tombstone's sequence number, to ensure that keys at lower_bound at - // lower levels are covered by truncated tombstones. - if (ts_sz) { - assert(tombstone.ts_.size() == ts_sz); - smallest_candidate = InternalKey(*lower_bound, tombstone.seq_, - kTypeRangeDeletion, tombstone.ts_); - } else { - smallest_candidate = - InternalKey(*lower_bound, tombstone.seq_, kTypeRangeDeletion); - } - } else { - // If lower_bound was chosen by the smallest data key in the file, - // choose lowest seqnum so this file's smallest internal key comes - // after the previous file's largest. The fake seqnum is OK because - // the read path's file-picking code only considers user key. - smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion); - } + if (lower_bound && + icmp.Compare(tombstone_start.Encode(), *lower_bound) < 0) { + tombstone_start.DecodeFrom(*lower_bound); } - InternalKey tombstone_end = tombstone.SerializeEndKey(); - InternalKey largest_candidate{tombstone_end}; - if (upper_bound != nullptr && - ucmp->CompareWithoutTimestamp(*upper_bound, - largest_candidate.user_key()) <= 0) { - // Pretend the largest key has the same user key as upper_bound (the - // min key in the following table or subcompaction) in order for files - // to appear key-space partitioned. - // - // Choose highest seqnum so this file's largest internal key comes - // before the next file's/subcompaction's smallest. The fake seqnum is - // OK because the read path's file-picking code only considers the - // user key portion. - // - // Note Seek() also creates InternalKey with (user_key, - // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of - // kTypeRangeDeletion (0xF), so the range tombstone comes before the - // Seek() key in InternalKey's ordering. So Seek() will look in the - // next file for the user key - if (ts_sz) { - static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff"; - if (ts_sz <= strlen(kTsMax)) { - largest_candidate = - InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion, - Slice(kTsMax, ts_sz)); - } else { - largest_candidate = - InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion, - std::string(ts_sz, '\xff')); - } - } else { - largest_candidate = - InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion); - } + if (upper_bound && icmp.Compare(*upper_bound, tombstone_end.Encode()) < 0) { + tombstone_end.DecodeFrom(*upper_bound); } - meta.UpdateBoundariesForRange(smallest_candidate, largest_candidate, + assert(icmp.Compare(tombstone_start, tombstone_end) <= 0); + meta.UpdateBoundariesForRange(tombstone_start, tombstone_end, tombstone.seq_, icmp); if (!bottommost_level) { bool start_user_key_changed = @@ -618,17 +702,8 @@ Status CompactionOutputs::AddRangeDels( ucmp->CompareWithoutTimestamp(last_tombstone_start_user_key, it->start_key()) < 0; last_tombstone_start_user_key = it->start_key(); - // Range tombstones are truncated at file boundaries - if (icmp.Compare(tombstone_start, meta.smallest) < 0) { - tombstone_start = meta.smallest; - } - if (icmp.Compare(tombstone_end, meta.largest) > 0) { - tombstone_end = meta.largest; - } - // this assertion validates invariant (2) in the comment below. - assert(icmp.Compare(tombstone_start, tombstone_end) <= 0); if (start_user_key_changed) { - // if tombstone_start >= tombstone_end, then either no key range is + // If tombstone_start >= tombstone_end, then either no key range is // covered, or that they have the same user key. If they have the same // user key, then the internal key range should only be within this // level, and no keys from older levels is covered. @@ -646,138 +721,6 @@ Status CompactionOutputs::AddRangeDels( } } } - // TODO: show invariants that ensure all necessary range tombstones are - // added - // and that file boundaries ensure no coverage is lost. - // Each range tombstone with internal key range [tombstone_start, - // tombstone_end] is being added to the current compaction output file here. - // The range tombstone is going to be truncated at range [meta.smallest, - // meta.largest] during reading/scanning. We should maintain invariants - // (1) meta.smallest <= meta.largest and, - // (2) [tombstone_start, tombstone_end] and [meta.smallest, meta.largest] - // overlaps, as there is no point adding range tombstone with a range - // outside the file's range. - // Since `tombstone_end` is always some user_key@kMaxSeqno, it is okay to - // use either open or closed range. Using closed range here to make - // reasoning easier, and it is more consistent with an ongoing work that - // tries to simplify this method. - // - // There are two cases: - // Case 1. Output file has no point key: - // First we show this case only happens when the entire compaction output - // is range tombstone only. This is true if CompactionIterator does not - // emit any point key. Suppose CompactionIterator emits some point key. - // Based on the assumption that CompactionOutputs::ShouldStopBefore() - // always return false for the first point key, the first compaction - // output file always contains a point key. Each new compaction output - // file is created if there is a point key for which ShouldStopBefore() - // returns true, and the point key would be added to the new compaction - // output file. So each new compaction file always contains a point key. - // So Case 1 only happens when CompactionIterator does not emit any - // point key. - // - // To show (1) meta.smallest <= meta.largest: - // Since the compaction output is range tombstone only, `lower_bound` and - // `upper_bound` are either null or comp_start/end_user_key respectively. - // According to how UpdateBoundariesForRange() is implemented, it blindly - // updates meta.smallest and meta.largest to smallest_candidate and - // largest_candidate the first time it is called. Subsequently, it - // compares input parameter with meta.smallest and meta.largest and only - // updates them when input is smaller/larger. So we only need to show - // smallest_candidate <= largest_candidate the first time - // UpdateBoundariesForRange() is called. Here we show something stronger - // that smallest_candidate.user_key < largest_candidate.user_key always - // hold for Case 1. - // We assume comp_start_user_key < comp_end_user_key, if provided. We - // assume that tombstone_start < tombstone_end. This assumption is based - // on that each fragment in FragmentedTombstoneList has - // start_key < end_key (user_key) and that - // FragmentedTombstoneIterator::Tombstone() returns the pair - // (start_key@tombstone_seqno with op_type kTypeRangeDeletion, end_key). - // The logic in this loop sets smallest_candidate to - // max(tombstone_start.user_key, comp_start_user_key)@tombstone.seq_ with - // op_type kTypeRangeDeletion, largest_candidate to - // min(tombstone_end.user_key, comp_end_user_key)@kMaxSequenceNumber with - // op_type kTypeRangeDeletion. When a bound is null, there is no - // truncation on that end. To show that smallest_candidate.user_key < - // largest_candidate.user_key, it suffices to show - // tombstone_start.user_key < comp_end_user_key (if not null) AND - // comp_start_user_key (if not null) < tombstone_end.user_key. - // Since the file has no point key, `has_overlapping_endpoints` is false. - // In the first sanity check of this for-loop, we compare - // tombstone_start.user_key against upper_bound = comp_end_user_key, - // and only proceed if tombstone_start.user_key < comp_end_user_key. - // We assume FragmentedTombstoneIterator::Seek(k) lands - // on a tombstone with end_key > k. So the call it->Seek(*lower_bound) - // above implies compact_start_user_key < tombstone_end.user_key. - // - // To show (2) [tombstone_start, tombstone_end] and [meta.smallest, - // meta.largest] overlaps (after the call to UpdateBoundariesForRange()): - // In the proof for (1) we have shown that - // smallest_candidate <= largest_candidate. Since tombstone_start <= - // smallest_candidate <= largest_candidate <= tombstone_end, for (2) to - // hold, it suffices to show that [smallest_candidate, largest_candidate] - // overlaps with [meta.smallest, meta.largest]. too. - // Given meta.smallest <= meta.largest shown above, we need to show - // that it is impossible to have largest_candidate < meta.smallest or - // meta.largest < smallest_candidate. If the above - // meta.UpdateBoundariesForRange(smallest_candidate, largest_candidate) - // updates meta.largest or meta.smallest, then the two ranges overlap. - // So we assume meta.UpdateBoundariesForRange(smallest_candidate, - // largest_candidate) did not update meta.smallest nor meta.largest, which - // means meta.smallest < smallest_candidate and largest_candidate < - // meta.largest. - // - // Case 2. Output file has >= 1 point key. This means meta.smallest and - // meta.largest are not empty when AddRangeDels() is called. - // To show (1) meta.smallest <= meta.largest: - // Assume meta.smallest <= meta.largest when AddRangeDels() is called, - // this follow from how UpdateBoundariesForRange() is implemented where it - // takes min or max to update meta.smallest or meta.largest. - // - // To show (2) [tombstone_start, tombstone_end] and [meta.smallest, - // meta.largest] overlaps (after the call to UpdateBoundariesForRange()): - // When smallest_candidate <= largest_candidate, the proof in Case 1 - // applies, so we only need to show (2) holds when smallest_candidate > - // largest_candidate. When both bounds are either null or from - // subcompaction boundary, the proof in Case 1 applies, so we only need to - // show (2) holds when at least one bound is from a point key (either - // meta.smallest for lower bound or next_table_min_key for upper bound). - // - // Suppose lower bound is meta.smallest.user_key. The call - // it->Seek(*lower_bound) implies tombstone_end.user_key > - // meta.smallest.user_key. We have smallest_candidate.user_key = - // max(tombstone_start.user_key, meta.smallest.user_key). For - // smallest_candidate to be > largest_candidate, we need - // largest_candidate.user_key = upper_bound = smallest_candidate.user_key, - // where tombstone_end is truncated to largest_candidate. - // Subcase 1: - // Suppose largest_candidate.user_key = comp_end_user_key (there is no - // next point key). Subcompaction ensures any point key from this - // subcompaction has a user_key < comp_end_user_key, so 1) - // meta.smallest.user_key < comp_end_user_key, 2) - // `has_overlapping_endpoints` is false, and the first if condition in - // this for-loop ensures tombstone_start.user_key < comp_end_user_key. So - // smallest_candidate.user_key < largest_candidate.user_key. This case - // cannot happen when smallest > largest_candidate. - // Subcase 2: - // Suppose largest_candidate.user_key = next_table_min_key.user_key. - // The first if condition in this for-loop together with - // smallest_candidate.user_key = next_table_min_key.user_key = - // upper_bound implies `has_overlapping_endpoints` is true (so meta - // largest.user_key = upper_bound) and - // tombstone.seq_ < meta.largest.seqno. So - // tombstone_start < meta.largest < tombstone_end. - // - // Suppose lower bound is comp_start_user_key and upper_bound is - // next_table_min_key. The call it->Seek(*lower_bound) implies we have - // tombstone_end_key.user_key > comp_start_user_key. So - // tombstone_end_key.user_key > smallest_candidate.user_key. For - // smallest_candidate to be > largest_candidate, we need - // tombstone_start.user_key = largest_candidate.user_key = upper_bound = - // next_table_min_key.user_key. This means `has_overlapping_endpoints` is - // true (so meta.largest.user_key = upper_bound) and tombstone.seq_ < - // meta.largest.seqno. So tombstone_start < meta.largest < tombstone_end. } return Status::OK(); } diff --git a/db/compaction/compaction_outputs.h b/db/compaction/compaction_outputs.h index 52233917f..1b65ee662 100644 --- a/db/compaction/compaction_outputs.h +++ b/db/compaction/compaction_outputs.h @@ -167,9 +167,15 @@ class CompactionOutputs { current_output_file_size_ = 0; } - // Add range-dels from the aggregator to the current output file + // Add range deletions from the range_del_agg_ to the current output file. + // Input parameters, `range_tombstone_lower_bound_` and current output's + // metadata determine the bounds on range deletions to add. Updates output + // file metadata boundary if extended by range tombstones. + // // @param comp_start_user_key and comp_end_user_key include timestamp if - // user-defined timestamp is enabled. + // user-defined timestamp is enabled. Their timestamp should be max timestamp. + // @param next_table_min_key internal key lower bound for the next compaction + // output. // @param full_history_ts_low used for range tombstone garbage collection. Status AddRangeDels(const Slice* comp_start_user_key, const Slice* comp_end_user_key, @@ -314,6 +320,7 @@ class CompactionOutputs { std::unique_ptr partitioner_; // A flag determines if this subcompaction has been split by the cursor + // for RoundRobin compaction bool is_split_ = false; // We also maintain the output split key for each subcompaction to avoid @@ -345,6 +352,10 @@ class CompactionOutputs { // for the current output file, how many file boundaries has it crossed, // basically number of files overlapped * 2 size_t grandparent_boundary_switched_num_ = 0; + + // The smallest key of the current output file, this is set when current + // output file's smallest key is a range tombstone start key. + InternalKey range_tombstone_lower_bound_; }; // helper struct to concatenate the last level and penultimate level outputs diff --git a/db/compaction/subcompaction_state.h b/db/compaction/subcompaction_state.h index 83353defc..b933a62a5 100644 --- a/db/compaction/subcompaction_state.h +++ b/db/compaction/subcompaction_state.h @@ -84,6 +84,11 @@ class SubcompactionState { // Assign range dels aggregator, for each range_del, it can only be assigned // to one output level, for per_key_placement, it's going to be the // penultimate level. + // TODO: This does not work for per_key_placement + user-defined timestamp + + // DeleteRange() combo. If user-defined timestamp is enabled, + // it is possible for a range tombstone to belong to bottommost level ( + // seqno < earliest snapshot) without being dropped (garbage collection + // for user-defined timestamp). void AssignRangeDelAggregator( std::unique_ptr&& range_del_agg) { if (compaction->SupportsPerKeyPlacement()) { diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc index 44f7da17a..08bd3af04 100644 --- a/db/db_range_del_test.cc +++ b/db/db_range_del_test.cc @@ -1661,6 +1661,217 @@ TEST_F(DBRangeDelTest, RangeTombstoneWrittenToMinimalSsts) { ASSERT_EQ(1, num_range_deletions); } +TEST_F(DBRangeDelTest, LevelCompactOutputCutAtRangeTombstoneForTtlFiles) { + Options options = CurrentOptions(); + options.compression = kNoCompression; + options.compaction_pri = kMinOverlappingRatio; + options.disable_auto_compactions = true; + options.ttl = 24 * 60 * 60; // 24 hours + options.target_file_size_base = 8 << 10; + env_->SetMockSleep(); + options.env = env_; + DestroyAndReopen(options); + + Random rnd(301); + // Fill some data so that future compactions are not bottommost level + // compaction, and hence they would try cut around files for ttl + for (int i = 5; i < 10; ++i) { + ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10))); + } + ASSERT_OK(Flush()); + MoveFilesToLevel(3); + ASSERT_EQ("0,0,0,1", FilesPerLevel()); + + for (int i = 5; i < 10; ++i) { + ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10))); + } + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + ASSERT_EQ("0,1,0,1", FilesPerLevel()); + + env_->MockSleepForSeconds(20 * 60 * 60); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), + Key(11), Key(12))); + ASSERT_OK(Put(Key(0), rnd.RandomString(1 << 10))); + ASSERT_OK(Flush()); + ASSERT_EQ("1,1,0,1", FilesPerLevel()); + // L0 file is new, L1 and L3 file are old and qualified for TTL + env_->MockSleepForSeconds(10 * 60 * 60); + MoveFilesToLevel(1); + // L1 output should be cut into 3 files: + // File 0: Key(0) + // File 1: (qualified for TTL): Key(5) - Key(10) + // File 1: DeleteRange [11, 12) + ASSERT_EQ("0,3,0,1", FilesPerLevel()); +} + +// Test SST partitioner cut after every single key +class SingleKeySstPartitioner : public SstPartitioner { + public: + const char* Name() const override { return "SingleKeySstPartitioner"; } + + PartitionerResult ShouldPartition( + const PartitionerRequest& /*request*/) override { + return kRequired; + } + + bool CanDoTrivialMove(const Slice& /*smallest_user_key*/, + const Slice& /*largest_user_key*/) override { + return false; + } +}; + +class SingleKeySstPartitionerFactory : public SstPartitionerFactory { + public: + static const char* kClassName() { return "SingleKeySstPartitionerFactory"; } + const char* Name() const override { return kClassName(); } + + std::unique_ptr CreatePartitioner( + const SstPartitioner::Context& /* context */) const override { + return std::unique_ptr(new SingleKeySstPartitioner()); + } +}; + +TEST_F(DBRangeDelTest, CompactionEmitRangeTombstoneToSSTPartitioner) { + Options options = CurrentOptions(); + auto factory = std::make_shared(); + options.sst_partitioner_factory = factory; + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + Random rnd(301); + // range deletion keys are not processed when compacting to bottommost level, + // so creating a file at older level to make the next compaction not + // bottommost level + ASSERT_OK(db_->Put(WriteOptions(), Key(4), rnd.RandomString(10))); + ASSERT_OK(Flush()); + MoveFilesToLevel(5); + + ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(10))); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2), + Key(5))); + ASSERT_OK(Flush()); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + MoveFilesToLevel(1); + // SSTPartitioner decides to cut when range tombstone start key is passed to + // it. Note that the range tombstone [2, 5) itself span multiple keys, but we + // are not able to partition within its range yet. + ASSERT_EQ(2, NumTableFilesAtLevel(1)); +} + +TEST_F(DBRangeDelTest, OversizeCompactionGapBetweenPointKeyAndTombstone) { + // L2 has 2 files + // L2_0: 0, 1, 2, 3, 4 + // L2_1: 5, 6, 7 + // L0 has 1 file + // L0: 0, [5, 6), 8 + // max_compaction_bytes is less than the size of L2_0 and L2_1. + // When compacting L0 into L1, it should split into 3 files: + // compaction output should cut before key 5 and key 8 to + // limit future compaction size. + const int kNumPerFile = 4, kNumFiles = 2; + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.target_file_size_base = 9 * 1024; + options.max_compaction_bytes = 9 * 1024; + DestroyAndReopen(options); + Random rnd(301); + for (int i = 0; i < kNumFiles; ++i) { + std::vector values; + for (int j = 0; j < kNumPerFile; j++) { + values.push_back(rnd.RandomString(3 << 10)); + ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j])); + } + } + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + MoveFilesToLevel(2); + ASSERT_EQ(2, NumTableFilesAtLevel(2)); + ASSERT_OK(Put(Key(0), rnd.RandomString(1 << 10))); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(5), + Key(6))); + ASSERT_OK(Put(Key(8), rnd.RandomString(1 << 10))); + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, + true /* disallow_trivial_move */)); + ASSERT_EQ(3, NumTableFilesAtLevel(1)); +} + +TEST_F(DBRangeDelTest, OversizeCompactionGapBetweenTombstone) { + // L2 has two files + // L2_0: 0, 1, 2, 3, 4. L2_1: 5, 6, 7 + // L0 has two range tombstones [0, 1), [7, 8). + // max_compaction_bytes is less than the size of L2_0. + // When compacting L0 into L1, the two range tombstones should be + // split into two files. + const int kNumPerFile = 4, kNumFiles = 2; + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.target_file_size_base = 9 * 1024; + options.max_compaction_bytes = 9 * 1024; + DestroyAndReopen(options); + Random rnd(301); + for (int i = 0; i < kNumFiles; ++i) { + std::vector values; + // Write 12K (4 values, each 3K) + for (int j = 0; j < kNumPerFile; j++) { + values.push_back(rnd.RandomString(3 << 10)); + ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j])); + } + } + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + MoveFilesToLevel(2); + ASSERT_EQ(2, NumTableFilesAtLevel(2)); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0), + Key(1))); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(7), + Key(8))); + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, + true /* disallow_trivial_move */)); + // This is L0 -> L1 compaction + // The two range tombstones are broken up into two output files + // to limit compaction size. + ASSERT_EQ(2, NumTableFilesAtLevel(1)); +} + +TEST_F(DBRangeDelTest, OversizeCompactionPointKeyWithinRangetombstone) { + // L2 has two files + // L2_0: 0, 1, 2, 3, 4. L2_1: 6, 7, 8 + // L0 has [0, 9) and point key 5 + // max_compaction_bytes is less than the size of L2_0. + // When compacting L0 into L1, the compaction should cut at point key 5. + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.target_file_size_base = 9 * 1024; + options.max_compaction_bytes = 9 * 1024; + DestroyAndReopen(options); + Random rnd(301); + for (int i = 0; i < 9; ++i) { + if (i == 5) { + ++i; + } + ASSERT_OK(Put(Key(i), rnd.RandomString(3 << 10))); + } + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + MoveFilesToLevel(2); + ASSERT_EQ(2, NumTableFilesAtLevel(2)); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0), + Key(9))); + ASSERT_OK(Put(Key(5), rnd.RandomString(1 << 10))); + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, + true /* disallow_trivial_move */)); + ASSERT_EQ(2, NumTableFilesAtLevel(1)); +} + TEST_F(DBRangeDelTest, OverlappedTombstones) { const int kNumPerFile = 4, kNumFiles = 2; Options options = CurrentOptions(); @@ -2093,6 +2304,7 @@ TEST_F(DBRangeDelTest, NonOverlappingTombstonAtBoundary) { options.compression = kNoCompression; options.disable_auto_compactions = true; options.target_file_size_base = 2 * 1024; + options.level_compaction_dynamic_file_size = false; DestroyAndReopen(options); Random rnd(301); @@ -2508,7 +2720,7 @@ TEST_F(DBRangeDelTest, LeftSentinelKeyTest) { options.compression = kNoCompression; options.disable_auto_compactions = true; options.target_file_size_base = 3 * 1024; - options.max_compaction_bytes = 1024; + options.max_compaction_bytes = 2048; DestroyAndReopen(options); // L2 @@ -2554,7 +2766,7 @@ TEST_F(DBRangeDelTest, LeftSentinelKeyTestWithNewerKey) { options.compression = kNoCompression; options.disable_auto_compactions = true; options.target_file_size_base = 3 * 1024; - options.max_compaction_bytes = 1024; + options.max_compaction_bytes = 3 * 1024; DestroyAndReopen(options); // L2 @@ -3015,6 +3227,183 @@ TEST_F(DBRangeDelTest, DoubleCountRangeTombstoneCompensatedSize) { db_->ReleaseSnapshot(snapshot); } +TEST_F(DBRangeDelTest, AddRangeDelsSameLowerAndUpperBound) { + // Test for an edge case where CompactionOutputs::AddRangeDels() + // is called with an empty range: `range_tombstone_lower_bound_` is not empty + // and have the same user_key and sequence number as `next_table_min_key. + // This used to cause file's smallest and largest key to be incorrectly set + // such that smallest > largest, and fail some assertions in iterator and/or + // assertion in VersionSet::ApproximateSize(). + Options opts = CurrentOptions(); + opts.disable_auto_compactions = true; + opts.target_file_size_base = 1 << 10; + opts.level_compaction_dynamic_file_size = false; + DestroyAndReopen(opts); + + Random rnd(301); + // Create file at bottommost level so the manual compaction below is + // non-bottommost level and goes through code path like compensate range + // tombstone size. + ASSERT_OK(Put(Key(1), "v1")); + ASSERT_OK(Put(Key(4), "v2")); + ASSERT_OK(Flush()); + MoveFilesToLevel(6); + + ASSERT_OK(Put(Key(1), rnd.RandomString(4 << 10))); + ASSERT_OK(Put(Key(3), rnd.RandomString(4 << 10))); + // So Key(3) does not get dropped. + const Snapshot* snapshot = db_->GetSnapshot(); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2), + Key(4))); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(3), rnd.RandomString(4 << 10))); + ASSERT_OK(Put(Key(4), rnd.RandomString(4 << 10))); + ASSERT_OK(Flush()); + + MoveFilesToLevel(1); + // Each file will have two keys, with Key(3) straddle between two files. + // File 1: Key(1)@1, Key(3)@6, DeleteRange ends at Key(3)@6 + // File 2: Key(3)@4, Key(4)@7, DeleteRange start from Key(3)@4 + ASSERT_EQ(NumTableFilesAtLevel(1), 2); + + // Manually update compaction output file cutting decisions + // to cut before range tombstone sentinel Key(3)@4 + // and the point key Key(3)@4 itself + SyncPoint::GetInstance()->SetCallBack( + "CompactionOutputs::ShouldStopBefore::manual_decision", [opts](void* p) { + auto* pair = (std::pair*)p; + if ((opts.comparator->Compare(ExtractUserKey(pair->second), Key(3)) == + 0) && + (GetInternalKeySeqno(pair->second) <= 4)) { + *(pair->first) = true; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + std::string begin_key = Key(0); + std::string end_key = Key(5); + Slice begin_slice{begin_key}; + Slice end_slice{end_key}; + ASSERT_OK(dbfull()->RunManualCompaction( + static_cast_with_check(db_->DefaultColumnFamily()) + ->cfd(), + 1, 2, CompactRangeOptions(), &begin_slice, &end_slice, true, + true /* disallow_trivial_move */, + std::numeric_limits::max() /*max_file_num_to_ignore*/, + "" /*trim_ts*/)); + // iterate through to check if any assertion breaks + std::unique_ptr iter{db_->NewIterator(ReadOptions())}; + iter->SeekToFirst(); + std::vector expected{1, 3, 4}; + for (auto i : expected) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key(), Key(i)); + iter->Next(); + } + ASSERT_TRUE(iter->status().ok() && !iter->Valid()); + db_->ReleaseSnapshot(snapshot); +} + +TEST_F(DBRangeDelTest, AddRangeDelsSingleUserKeyTombstoneOnlyFile) { + // Test for an edge case where CompactionOutputs::AddRangeDels() + // is called with an SST file that has no point keys, and that + // the lower bound and upper bound have the same user key. + // This could cause a file's smallest and largest key to be incorrectly set + // such that smallest > largest, and fail some assertions in iterator and/or + // assertion in VersionSet::ApproximateSize(). + Options opts = CurrentOptions(); + opts.disable_auto_compactions = true; + opts.target_file_size_base = 1 << 10; + opts.level_compaction_dynamic_file_size = false; + DestroyAndReopen(opts); + + Random rnd(301); + // Create file at bottommost level so the manual compaction below is + // non-bottommost level and goes through code path like compensate range + // tombstone size. + ASSERT_OK(Put(Key(1), "v1")); + ASSERT_OK(Put(Key(4), "v2")); + ASSERT_OK(Flush()); + MoveFilesToLevel(6); + + ASSERT_OK(Put(Key(1), rnd.RandomString(10))); + // Key(3)@4 + ASSERT_OK(Put(Key(3), rnd.RandomString(10))); + const Snapshot* snapshot1 = db_->GetSnapshot(); + // Key(3)@5 + ASSERT_OK(Put(Key(3), rnd.RandomString(10))); + const Snapshot* snapshot2 = db_->GetSnapshot(); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2), + Key(4))); + // Key(3)@7 + ASSERT_OK(Put(Key(3), rnd.RandomString(10))); + ASSERT_OK(Flush()); + + // L0 -> L1 compaction: cut output into two files: + // File 1: Key(1), Key(3)@7, Range tombstone ends at Key(3)@7 + // File 2: Key(3)@5, Key(3)@4, Range tombstone starts from Key(3)@5 + SyncPoint::GetInstance()->SetCallBack( + "CompactionOutputs::ShouldStopBefore::manual_decision", [opts](void* p) { + auto* pair = (std::pair*)p; + if ((opts.comparator->Compare(ExtractUserKey(pair->second), Key(3)) == + 0) && + (GetInternalKeySeqno(pair->second) <= 6)) { + *(pair->first) = true; + SyncPoint::GetInstance()->DisableProcessing(); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + std::string begin_key = Key(0); + std::string end_key = Key(5); + Slice begin_slice{begin_key}; + Slice end_slice{end_key}; + ASSERT_OK(dbfull()->RunManualCompaction( + static_cast_with_check(db_->DefaultColumnFamily()) + ->cfd(), + 0, 1, CompactRangeOptions(), &begin_slice, &end_slice, true, + true /* disallow_trivial_move */, + std::numeric_limits::max() /*max_file_num_to_ignore*/, + "" /*trim_ts*/)); + ASSERT_EQ(NumTableFilesAtLevel(1), 2); + + // L1 -> L2 compaction, drop the snapshot protecting Key(3)@5. + // Let ShouldStopBefore() return true for Key(3)@5 (delete range sentinel) + // and Key(3)@4. + // Output should have two files: + // File 1: Key(1), Key(3)@7, range tombstone ends at Key(3)@7 + // File dropped: range tombstone only file (from Key(3)@5 to Key(3)@4) + // File 2: Range tombstone starting from Key(3)@4, Key(3)@4 + db_->ReleaseSnapshot(snapshot2); + SyncPoint::GetInstance()->SetCallBack( + "CompactionOutputs::ShouldStopBefore::manual_decision", [opts](void* p) { + auto* pair = (std::pair*)p; + if ((opts.comparator->Compare(ExtractUserKey(pair->second), Key(3)) == + 0) && + (GetInternalKeySeqno(pair->second) <= 6)) { + *(pair->first) = true; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(dbfull()->RunManualCompaction( + static_cast_with_check(db_->DefaultColumnFamily()) + ->cfd(), + 1, 2, CompactRangeOptions(), &begin_slice, &end_slice, true, + true /* disallow_trivial_move */, + std::numeric_limits::max() /*max_file_num_to_ignore*/, + "" /*trim_ts*/)); + ASSERT_EQ(NumTableFilesAtLevel(2), 2); + // iterate through to check if any assertion breaks + std::unique_ptr iter{db_->NewIterator(ReadOptions())}; + iter->SeekToFirst(); + std::vector expected{1, 3, 4}; + for (auto i : expected) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key(), Key(i)); + iter->Next(); + } + ASSERT_TRUE(iter->status().ok() && !iter->Valid()); + db_->ReleaseSnapshot(snapshot1); +} } // namespace ROCKSDB_NAMESPACE diff --git a/db/dbformat.h b/db/dbformat.h index d9fadea1c..3a6edc1bf 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -86,8 +86,10 @@ inline bool IsValueType(ValueType t) { // Checks whether a type is from user operation // kTypeRangeDeletion is in meta block so this API is separated from above +// kTypeMaxValid can be from keys generated by +// TruncatedRangeDelIterator::start_key() inline bool IsExtendedValueType(ValueType t) { - return IsValueType(t) || t == kTypeRangeDeletion; + return IsValueType(t) || t == kTypeRangeDeletion || t == kTypeMaxValid; } // We leave eight bits empty at the bottom so a type and sequence# diff --git a/db/history_trimming_iterator.h b/db/history_trimming_iterator.h index b445ced33..4af5cde72 100644 --- a/db/history_trimming_iterator.h +++ b/db/history_trimming_iterator.h @@ -82,6 +82,10 @@ class HistoryTrimmingIterator : public InternalIterator { bool IsValuePinned() const override { return input_->IsValuePinned(); } + bool IsDeleteRangeSentinelKey() const override { + return input_->IsDeleteRangeSentinelKey(); + } + private: InternalIterator* input_; const std::string filter_ts_; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index eceb9bcb8..976af55bc 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -231,6 +231,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, s = Status::ShutdownInProgress(); return s; } + // Skip range tombstones emitted by the compaction iterator. + if (iter->IsDeleteRangeSentinelKey()) { + continue; + } ParsedInternalKey ikey; assert(keys_.size() == merge_context_.GetNumOperands()); diff --git a/db/range_del_aggregator.cc b/db/range_del_aggregator.cc index c03efa11f..e66ce7cf7 100644 --- a/db/range_del_aggregator.cc +++ b/db/range_del_aggregator.cc @@ -36,6 +36,7 @@ TruncatedRangeDelIterator::TruncatedRangeDelIterator( Status pik_status = ParseInternalKey(smallest->Encode(), &parsed_smallest, false /* log_err_key */); // TODO pik_status.PermitUncheckedError(); + parsed_smallest.type = kTypeMaxValid; assert(pik_status.ok()); smallest_ = &parsed_smallest; } @@ -70,7 +71,7 @@ TruncatedRangeDelIterator::TruncatedRangeDelIterator( parsed_largest.sequence -= 1; // This line is not needed for correctness, but it ensures that the // truncated end key is not covering keys from the next SST file. - parsed_largest.type = kValueTypeForSeek; + parsed_largest.type = kTypeMaxValid; } largest_ = &parsed_largest; } @@ -393,21 +394,20 @@ bool CompactionRangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed, namespace { // Produce a sorted (by start internal key) stream of range tombstones from -// `children`. lower_bound and upper_bound on user key can be +// `children`. lower_bound and upper_bound on internal key can be // optionally specified. Range tombstones that ends before lower_bound or starts // after upper_bound are excluded. // If user-defined timestamp is enabled, lower_bound and upper_bound should -// contain timestamp, but comparison is done ignoring timestamps. +// contain timestamp. class TruncatedRangeDelMergingIter : public InternalIterator { public: TruncatedRangeDelMergingIter( const InternalKeyComparator* icmp, const Slice* lower_bound, - const Slice* upper_bound, bool upper_bound_inclusive, + const Slice* upper_bound, const std::vector>& children) : icmp_(icmp), lower_bound_(lower_bound), upper_bound_(upper_bound), - upper_bound_inclusive_(upper_bound_inclusive), heap_(StartKeyMinComparator(icmp)), ts_sz_(icmp_->user_comparator()->timestamp_size()) { for (auto& child : children) { @@ -420,7 +420,7 @@ class TruncatedRangeDelMergingIter : public InternalIterator { } bool Valid() const override { - return !heap_.empty() && BeforeEndKey(heap_.top()); + return !heap_.empty() && !AfterEndKey(heap_.top()); } Status status() const override { return Status::OK(); } @@ -428,7 +428,13 @@ class TruncatedRangeDelMergingIter : public InternalIterator { heap_.clear(); for (auto& child : children_) { if (lower_bound_ != nullptr) { - child->Seek(*lower_bound_); + child->Seek(ExtractUserKey(*lower_bound_)); + // Since the above `Seek()` operates on a user key while `lower_bound_` + // is an internal key, we may need to advance `child` farther for it to + // be in bounds. + while (child->Valid() && BeforeStartKey(child)) { + child->InternalNext(); + } } else { child->SeekToFirst(); } @@ -481,19 +487,23 @@ class TruncatedRangeDelMergingIter : public InternalIterator { void SeekToLast() override { assert(false); } private: - bool BeforeEndKey(const TruncatedRangeDelIterator* iter) const { + bool BeforeStartKey(const TruncatedRangeDelIterator* iter) const { + if (lower_bound_ == nullptr) { + return false; + } + return icmp_->Compare(iter->end_key(), *lower_bound_) <= 0; + } + + bool AfterEndKey(const TruncatedRangeDelIterator* iter) const { if (upper_bound_ == nullptr) { - return true; + return false; } - int cmp = icmp_->user_comparator()->CompareWithoutTimestamp( - iter->start_key().user_key, *upper_bound_); - return upper_bound_inclusive_ ? cmp <= 0 : cmp < 0; + return icmp_->Compare(iter->start_key(), *upper_bound_) > 0; } const InternalKeyComparator* icmp_; const Slice* lower_bound_; const Slice* upper_bound_; - bool upper_bound_inclusive_; BinaryHeap heap_; std::vector children_; @@ -506,11 +516,10 @@ class TruncatedRangeDelMergingIter : public InternalIterator { std::unique_ptr CompactionRangeDelAggregator::NewIterator(const Slice* lower_bound, - const Slice* upper_bound, - bool upper_bound_inclusive) { + const Slice* upper_bound) { InvalidateRangeDelMapPositions(); auto merging_iter = std::make_unique( - icmp_, lower_bound, upper_bound, upper_bound_inclusive, parent_iters_); + icmp_, lower_bound, upper_bound, parent_iters_); auto fragmented_tombstone_list = std::make_shared( diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h index 9bd40967d..611be2a31 100644 --- a/db/range_del_aggregator.h +++ b/db/range_del_aggregator.h @@ -452,16 +452,15 @@ class CompactionRangeDelAggregator : public RangeDelAggregator { } // Creates an iterator over all the range tombstones in the aggregator, for - // use in compaction. Nullptr arguments indicate that the iterator range is - // unbounded. - // NOTE: the boundaries are used for optimization purposes to reduce the - // number of tombstones that are passed to the fragmenter; they do not - // guarantee that the resulting iterator only contains range tombstones that - // cover keys in the provided range. If required, these bounds must be + // use in compaction. + // + // NOTE: the internal key boundaries are used for optimization purposes to + // reduce the number of tombstones that are passed to the fragmenter; they do + // not guarantee that the resulting iterator only contains range tombstones + // that cover keys in the provided range. If required, these bounds must be // enforced during iteration. std::unique_ptr NewIterator( - const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr, - bool upper_bound_inclusive = false); + const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr); private: std::vector> parent_iters_; diff --git a/db/range_del_aggregator_test.cc b/db/range_del_aggregator_test.cc index 7fe35276a..89391c924 100644 --- a/db/range_del_aggregator_test.cc +++ b/db/range_del_aggregator_test.cc @@ -224,26 +224,32 @@ TEST_F(RangeDelAggregatorTest, UntruncatedIter) { TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr, nullptr); - VerifyIterator(&iter, bytewise_icmp, - {{UncutEndpoint("a"), UncutEndpoint("e"), 10}, - {UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {UncutEndpoint("j"), UncutEndpoint("n"), 4}}); + VerifyIterator( + &iter, bytewise_icmp, + {{InternalValue("a", 10, kTypeRangeDeletion), UncutEndpoint("e"), 10}, + {InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {InternalValue("j", 4, kTypeRangeDeletion), UncutEndpoint("n"), 4}}); VerifySeek( &iter, bytewise_icmp, - {{"d", UncutEndpoint("a"), UncutEndpoint("e"), 10}, - {"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {"ia", UncutEndpoint("j"), UncutEndpoint("n"), 4}, - {"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, - {"", UncutEndpoint("a"), UncutEndpoint("e"), 10}}); + {{"d", InternalValue("a", 10, kTypeRangeDeletion), UncutEndpoint("e"), + 10}, + {"e", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {"ia", InternalValue("j", 4, kTypeRangeDeletion), UncutEndpoint("n"), 4}, + {"n", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0, + true /* invalid */}, + {"", InternalValue("a", 10, kTypeRangeDeletion), UncutEndpoint("e"), + 10}}); VerifySeekForPrev( &iter, bytewise_icmp, - {{"d", UncutEndpoint("a"), UncutEndpoint("e"), 10}, - {"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {"n", UncutEndpoint("j"), UncutEndpoint("n"), 4}, - {"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); + {{"d", InternalValue("a", 10, kTypeRangeDeletion), UncutEndpoint("e"), + 10}, + {"e", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {"ia", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {"n", InternalValue("j", 4, kTypeRangeDeletion), UncutEndpoint("n"), 4}, + {"", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0, + true /* invalid */}}); } TEST_F(RangeDelAggregatorTest, UntruncatedIterWithSnapshot) { @@ -258,25 +264,29 @@ TEST_F(RangeDelAggregatorTest, UntruncatedIterWithSnapshot) { TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr, nullptr); - VerifyIterator(&iter, bytewise_icmp, - {{UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {UncutEndpoint("j"), UncutEndpoint("n"), 4}}); + VerifyIterator( + &iter, bytewise_icmp, + {{InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {InternalValue("j", 4, kTypeRangeDeletion), UncutEndpoint("n"), 4}}); VerifySeek( &iter, bytewise_icmp, - {{"d", UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {"ia", UncutEndpoint("j"), UncutEndpoint("n"), 4}, - {"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, - {"", UncutEndpoint("e"), UncutEndpoint("g"), 8}}); + {{"d", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {"e", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {"ia", InternalValue("j", 4, kTypeRangeDeletion), UncutEndpoint("n"), 4}, + {"n", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0, + true /* invalid */}, + {"", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}}); VerifySeekForPrev( &iter, bytewise_icmp, - {{"d", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, - {"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {"n", UncutEndpoint("j"), UncutEndpoint("n"), 4}, - {"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); + {{"d", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0, + true /* invalid */}, + {"e", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {"ia", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {"n", InternalValue("j", 4, kTypeRangeDeletion), UncutEndpoint("n"), 4}, + {"", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0, + true /* invalid */}}); } TEST_F(RangeDelAggregatorTest, TruncatedIterPartiallyCutTombstones) { @@ -295,27 +305,30 @@ TEST_F(RangeDelAggregatorTest, TruncatedIterPartiallyCutTombstones) { VerifyIterator( &iter, bytewise_icmp, - {{InternalValue("d", 7), UncutEndpoint("e"), 10}, - {UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {UncutEndpoint("j"), InternalValue("m", 8, kValueTypeForSeek), 4}}); + {{InternalValue("d", 7, kTypeMaxValid), UncutEndpoint("e"), 10}, + {InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {InternalValue("j", 4, kTypeRangeDeletion), + InternalValue("m", 8, kTypeMaxValid), 4}}); VerifySeek( &iter, bytewise_icmp, - {{"d", InternalValue("d", 7), UncutEndpoint("e"), 10}, - {"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {"ia", UncutEndpoint("j"), InternalValue("m", 8, kValueTypeForSeek), 4, - false /* invalid */}, - {"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, - {"", InternalValue("d", 7), UncutEndpoint("e"), 10}}); + {{"d", InternalValue("d", 7, kTypeMaxValid), UncutEndpoint("e"), 10}, + {"e", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {"ia", InternalValue("j", 4, kTypeRangeDeletion), + InternalValue("m", 8, kTypeMaxValid), 4, false /* invalid */}, + {"n", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0, + true /* invalid */}, + {"", InternalValue("d", 7, kTypeMaxValid), UncutEndpoint("e"), 10}}); VerifySeekForPrev( &iter, bytewise_icmp, - {{"d", InternalValue("d", 7), UncutEndpoint("e"), 10}, - {"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8}, - {"n", UncutEndpoint("j"), InternalValue("m", 8, kValueTypeForSeek), 4, - false /* invalid */}, - {"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); + {{"d", InternalValue("d", 7, kTypeMaxValid), UncutEndpoint("e"), 10}, + {"e", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {"ia", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}, + {"n", InternalValue("j", 4, kTypeRangeDeletion), + InternalValue("m", 8, kTypeMaxValid), 4, false /* invalid */}, + {"", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0, + true /* invalid */}}); } TEST_F(RangeDelAggregatorTest, TruncatedIterFullyCutTombstones) { @@ -332,20 +345,23 @@ TEST_F(RangeDelAggregatorTest, TruncatedIterFullyCutTombstones) { TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, &smallest, &largest); - VerifyIterator(&iter, bytewise_icmp, - {{InternalValue("f", 7), UncutEndpoint("g"), 8}}); + VerifyIterator( + &iter, bytewise_icmp, + {{InternalValue("f", 7, kTypeMaxValid), UncutEndpoint("g"), 8}}); VerifySeek( &iter, bytewise_icmp, - {{"d", InternalValue("f", 7), UncutEndpoint("g"), 8}, - {"f", InternalValue("f", 7), UncutEndpoint("g"), 8}, - {"j", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); + {{"d", InternalValue("f", 7, kTypeMaxValid), UncutEndpoint("g"), 8}, + {"f", InternalValue("f", 7, kTypeMaxValid), UncutEndpoint("g"), 8}, + {"j", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0, + true /* invalid */}}); VerifySeekForPrev( &iter, bytewise_icmp, - {{"d", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, - {"f", InternalValue("f", 7), UncutEndpoint("g"), 8}, - {"j", InternalValue("f", 7), UncutEndpoint("g"), 8}}); + {{"d", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0, + true /* invalid */}, + {"f", InternalValue("f", 7, kTypeMaxValid), UncutEndpoint("g"), 8}, + {"j", InternalValue("f", 7, kTypeMaxValid), UncutEndpoint("g"), 8}}); } TEST_F(RangeDelAggregatorTest, SingleIterInAggregator) { @@ -627,15 +643,12 @@ TEST_F(RangeDelAggregatorTest, CompactionAggregatorEmptyIteratorRight) { range_del_agg.AddTombstones(std::move(input_iter)); } - Slice start("p"); - Slice end("q"); - auto range_del_compaction_iter1 = - range_del_agg.NewIterator(&start, &end, false /* end_key_inclusive */); - VerifyFragmentedRangeDels(range_del_compaction_iter1.get(), {}); - - auto range_del_compaction_iter2 = - range_del_agg.NewIterator(&start, &end, true /* end_key_inclusive */); - VerifyFragmentedRangeDels(range_del_compaction_iter2.get(), {}); + InternalKey start_buf("p", 0, kTypeRangeDeletion); + InternalKey end_buf("q", 0, kTypeRangeDeletion); + Slice start = start_buf.Encode(); + Slice end = end_buf.Encode(); + auto range_del_compaction_iter = range_del_agg.NewIterator(&start, &end); + VerifyFragmentedRangeDels(range_del_compaction_iter.get(), {}); } TEST_F(RangeDelAggregatorTest, CompactionAggregatorBoundedIterator) { @@ -652,18 +665,13 @@ TEST_F(RangeDelAggregatorTest, CompactionAggregatorBoundedIterator) { range_del_agg.AddTombstones(std::move(input_iter)); } - Slice start("bb"); - Slice end("e"); - auto range_del_compaction_iter1 = - range_del_agg.NewIterator(&start, &end, false /* end_key_inclusive */); - VerifyFragmentedRangeDels(range_del_compaction_iter1.get(), + InternalKey start_buf("bb", 0, kTypeRangeDeletion); + InternalKey end_buf("e", 9, kTypeRangeDeletion); + Slice start = start_buf.Encode(); + Slice end = end_buf.Encode(); + auto range_del_compaction_iter = range_del_agg.NewIterator(&start, &end); + VerifyFragmentedRangeDels(range_del_compaction_iter.get(), {{"a", "c", 10}, {"c", "e", 10}, {"c", "e", 8}}); - - auto range_del_compaction_iter2 = - range_del_agg.NewIterator(&start, &end, true /* end_key_inclusive */); - VerifyFragmentedRangeDels( - range_del_compaction_iter2.get(), - {{"a", "c", 10}, {"c", "e", 10}, {"c", "e", 8}, {"e", "g", 8}}); } TEST_F(RangeDelAggregatorTest, @@ -681,29 +689,19 @@ TEST_F(RangeDelAggregatorTest, range_del_agg.AddTombstones(std::move(input_iter)); } - Slice start("bb"); - Slice end("e"); - auto range_del_compaction_iter1 = - range_del_agg.NewIterator(&start, &end, false /* end_key_inclusive */); - VerifyFragmentedRangeDels(range_del_compaction_iter1.get(), {{"a", "b", 10}, - {"b", "c", 20}, - {"b", "c", 10}, - {"c", "d", 10}, - {"c", "d", 8}, - {"d", "f", 30}, - {"d", "f", 8}, - {"f", "g", 8}}); - - auto range_del_compaction_iter2 = - range_del_agg.NewIterator(&start, &end, true /* end_key_inclusive */); - VerifyFragmentedRangeDels(range_del_compaction_iter2.get(), {{"a", "b", 10}, - {"b", "c", 20}, - {"b", "c", 10}, - {"c", "d", 10}, - {"c", "d", 8}, - {"d", "f", 30}, - {"d", "f", 8}, - {"f", "g", 8}}); + InternalKey start_buf("bb", 0, kTypeRangeDeletion); + InternalKey end_buf("e", 0, kTypeRangeDeletion); + Slice start = start_buf.Encode(); + Slice end = end_buf.Encode(); + auto range_del_compaction_iter = range_del_agg.NewIterator(&start, &end); + VerifyFragmentedRangeDels(range_del_compaction_iter.get(), {{"a", "b", 10}, + {"b", "c", 20}, + {"b", "c", 10}, + {"c", "d", 10}, + {"c", "d", 8}, + {"d", "f", 30}, + {"d", "f", 8}, + {"f", "g", 8}}); } } // namespace ROCKSDB_NAMESPACE diff --git a/db/range_tombstone_fragmenter.h b/db/range_tombstone_fragmenter.h index df07fa894..8c7d98297 100644 --- a/db/range_tombstone_fragmenter.h +++ b/db/range_tombstone_fragmenter.h @@ -218,8 +218,7 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { } ParsedInternalKey parsed_start_key() const { - return ParsedInternalKey(pos_->start_key, kMaxSequenceNumber, - kTypeRangeDeletion); + return ParsedInternalKey(pos_->start_key, seq(), kTypeRangeDeletion); } ParsedInternalKey parsed_end_key() const { return ParsedInternalKey(pos_->end_key, kMaxSequenceNumber, diff --git a/db/version_set.cc b/db/version_set.cc index a61fc3dbd..31da52289 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -38,6 +38,8 @@ #include "db/table_cache.h" #include "db/version_builder.h" #include "db/version_edit_handler.h" +#include "table/compaction_merging_iterator.h" + #if USE_COROUTINES #include "folly/experimental/coro/BlockingWait.h" #include "folly/experimental/coro/Collect.h" @@ -6635,6 +6637,14 @@ InternalIterator* VersionSet::MakeInputIterator( c->num_input_levels() - 1 : c->num_input_levels()); InternalIterator** list = new InternalIterator*[space]; + // First item in the pair is a pointer to range tombstones. + // Second item is a pointer to a member of a LevelIterator, + // that will be initialized to where CompactionMergingIterator stores + // pointer to its range tombstones. This is used by LevelIterator + // to update pointer to range tombstones as it traverse different SST files. + std::vector< + std::pair> + range_tombstones; size_t num = 0; for (size_t which = 0; which < c->num_input_levels(); which++) { if (c->input_levels(which)->num_files != 0) { @@ -6655,7 +6665,7 @@ InternalIterator* VersionSet::MakeInputIterator( end.value(), fmd.smallest.user_key()) < 0) { continue; } - + TruncatedRangeDelIterator* range_tombstone_iter = nullptr; list[num++] = cfd->table_cache()->NewIterator( read_options, file_options_compactions, cfd->internal_comparator(), fmd, range_del_agg, @@ -6668,10 +6678,13 @@ InternalIterator* VersionSet::MakeInputIterator( MaxFileSizeForL0MetaPin(*c->mutable_cf_options()), /*smallest_compaction_key=*/nullptr, /*largest_compaction_key=*/nullptr, - /*allow_unprepared_value=*/false); + /*allow_unprepared_value=*/false, + /*range_del_iter=*/&range_tombstone_iter); + range_tombstones.emplace_back(range_tombstone_iter, nullptr); } } else { // Create concatenating iterator for the files from this level + TruncatedRangeDelIterator*** tombstone_iter_ptr = nullptr; list[num++] = new LevelIterator( cfd->table_cache(), read_options, file_options_compactions, cfd->internal_comparator(), c->input_levels(which), @@ -6680,14 +6693,15 @@ InternalIterator* VersionSet::MakeInputIterator( /*no per level latency histogram=*/nullptr, TableReaderCaller::kCompaction, /*skip_filters=*/false, /*level=*/static_cast(c->level(which)), range_del_agg, - c->boundaries(which)); + c->boundaries(which), false, &tombstone_iter_ptr); + range_tombstones.emplace_back(nullptr, tombstone_iter_ptr); } } } assert(num <= space); - InternalIterator* result = - NewMergingIterator(&c->column_family_data()->internal_comparator(), list, - static_cast(num)); + InternalIterator* result = NewCompactionMergingIterator( + &c->column_family_data()->internal_comparator(), list, + static_cast(num), range_tombstones); delete[] list; return result; } diff --git a/src.mk b/src.mk index ae1523e71..9c1dea073 100644 --- a/src.mk +++ b/src.mk @@ -198,6 +198,7 @@ LIB_SOURCES = \ table/get_context.cc \ table/iterator.cc \ table/merging_iterator.cc \ + table/compaction_merging_iterator.cc \ table/meta_blocks.cc \ table/persistent_cache_helper.cc \ table/plain/plain_table_bloom.cc \ diff --git a/table/compaction_merging_iterator.cc b/table/compaction_merging_iterator.cc new file mode 100644 index 000000000..8a5c45240 --- /dev/null +++ b/table/compaction_merging_iterator.cc @@ -0,0 +1,370 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#include "table/compaction_merging_iterator.h" + +namespace ROCKSDB_NAMESPACE { +class CompactionMergingIterator : public InternalIterator { + public: + CompactionMergingIterator( + const InternalKeyComparator* comparator, InternalIterator** children, + int n, bool is_arena_mode, + std::vector< + std::pair> + range_tombstones) + : is_arena_mode_(is_arena_mode), + comparator_(comparator), + current_(nullptr), + minHeap_(CompactionHeapItemComparator(comparator_)), + pinned_iters_mgr_(nullptr) { + children_.resize(n); + for (int i = 0; i < n; i++) { + children_[i].level = i; + children_[i].iter.Set(children[i]); + assert(children_[i].type == HeapItem::ITERATOR); + } + assert(range_tombstones.size() == static_cast(n)); + for (auto& p : range_tombstones) { + range_tombstone_iters_.push_back(p.first); + } + pinned_heap_item_.resize(n); + for (int i = 0; i < n; ++i) { + if (range_tombstones[i].second) { + // for LevelIterator + *range_tombstones[i].second = &range_tombstone_iters_[i]; + } + pinned_heap_item_[i].level = i; + pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START; + } + } + + void considerStatus(const Status& s) { + if (!s.ok() && status_.ok()) { + status_ = s; + } + } + + ~CompactionMergingIterator() override { + // TODO: use unique_ptr for range_tombstone_iters_ + for (auto child : range_tombstone_iters_) { + delete child; + } + + for (auto& child : children_) { + child.iter.DeleteIter(is_arena_mode_); + } + status_.PermitUncheckedError(); + } + + bool Valid() const override { return current_ != nullptr && status_.ok(); } + + Status status() const override { return status_; } + + void SeekToFirst() override; + + void Seek(const Slice& target) override; + + void Next() override; + + Slice key() const override { + assert(Valid()); + return current_->key(); + } + + Slice value() const override { + assert(Valid()); + if (LIKELY(current_->type == HeapItem::ITERATOR)) { + return current_->iter.value(); + } else { + return dummy_tombstone_val; + } + } + + // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result + // from current child iterator. Potentially as long as one of child iterator + // report out of bound is not possible, we know current key is within bound. + bool MayBeOutOfLowerBound() override { + assert(Valid()); + return current_->type == HeapItem::DELETE_RANGE_START || + current_->iter.MayBeOutOfLowerBound(); + } + + IterBoundCheck UpperBoundCheckResult() override { + assert(Valid()); + return current_->type == HeapItem::DELETE_RANGE_START + ? IterBoundCheck::kUnknown + : current_->iter.UpperBoundCheckResult(); + } + + void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { + pinned_iters_mgr_ = pinned_iters_mgr; + for (auto& child : children_) { + child.iter.SetPinnedItersMgr(pinned_iters_mgr); + } + } + + bool IsDeleteRangeSentinelKey() const override { + assert(Valid()); + return current_->type == HeapItem::DELETE_RANGE_START; + } + + // Compaction uses the above subset of InternalIterator interface. + void SeekToLast() override { assert(false); } + + void SeekForPrev(const Slice&) override { assert(false); } + + void Prev() override { assert(false); } + + bool NextAndGetResult(IterateResult*) override { + assert(false); + return false; + } + + bool IsKeyPinned() const override { + assert(false); + return false; + } + + bool IsValuePinned() const override { + assert(false); + return false; + } + + bool PrepareValue() override { + assert(false); + return false; + } + + private: + struct HeapItem { + HeapItem() = default; + + IteratorWrapper iter; + size_t level = 0; + std::string tombstone_str; + enum Type { ITERATOR, DELETE_RANGE_START }; + Type type = ITERATOR; + + explicit HeapItem(size_t _level, InternalIteratorBase* _iter) + : level(_level), type(Type::ITERATOR) { + iter.Set(_iter); + } + + void SetTombstoneForCompaction(const ParsedInternalKey&& pik) { + tombstone_str.clear(); + AppendInternalKey(&tombstone_str, pik); + } + + [[nodiscard]] Slice key() const { + return type == ITERATOR ? iter.key() : tombstone_str; + } + }; + + class CompactionHeapItemComparator { + public: + explicit CompactionHeapItemComparator( + const InternalKeyComparator* comparator) + : comparator_(comparator) {} + + bool operator()(HeapItem* a, HeapItem* b) const { + int r = comparator_->Compare(a->key(), b->key()); + // For each file, we assume all range tombstone start keys come before + // its file boundary sentinel key (file's meta.largest key). + // In the case when meta.smallest = meta.largest and range tombstone start + // key is truncated at meta.smallest, the start key will have op_type = + // kMaxValid to make it smaller (see TruncatedRangeDelIterator + // constructor). The following assertion validates this assumption. + assert(a->type == b->type || r != 0); + return r > 0; + } + + private: + const InternalKeyComparator* comparator_; + }; + + using CompactionMinHeap = BinaryHeap; + bool is_arena_mode_; + const InternalKeyComparator* comparator_; + // HeapItem for all child point iterators. + std::vector children_; + // HeapItem for range tombstones. pinned_heap_item_[i] corresponds to the + // current range tombstone from range_tombstone_iters_[i]. + std::vector pinned_heap_item_; + // range_tombstone_iters_[i] contains range tombstones in the sorted run that + // corresponds to children_[i]. range_tombstone_iters_[i] == + // nullptr means the sorted run of children_[i] does not have range + // tombstones (or the current SSTable does not have range tombstones in the + // case of LevelIterator). + std::vector range_tombstone_iters_; + // Used as value for range tombstone keys + std::string dummy_tombstone_val{}; + + // Skip file boundary sentinel keys. + void FindNextVisibleKey(); + + // top of minHeap_ + HeapItem* current_; + // If any of the children have non-ok status, this is one of them. + Status status_; + CompactionMinHeap minHeap_; + PinnedIteratorsManager* pinned_iters_mgr_; + // Process a child that is not in the min heap. + // If valid, add to the min heap. Otherwise, check status. + void AddToMinHeapOrCheckStatus(HeapItem*); + + HeapItem* CurrentForward() const { + return !minHeap_.empty() ? minHeap_.top() : nullptr; + } + + void InsertRangeTombstoneAtLevel(size_t level) { + if (range_tombstone_iters_[level]->Valid()) { + pinned_heap_item_[level].SetTombstoneForCompaction( + range_tombstone_iters_[level]->start_key()); + minHeap_.push(&pinned_heap_item_[level]); + } + } +}; + +void CompactionMergingIterator::SeekToFirst() { + minHeap_.clear(); + status_ = Status::OK(); + for (auto& child : children_) { + child.iter.SeekToFirst(); + AddToMinHeapOrCheckStatus(&child); + } + + for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { + if (range_tombstone_iters_[i]) { + range_tombstone_iters_[i]->SeekToFirst(); + InsertRangeTombstoneAtLevel(i); + } + } + + FindNextVisibleKey(); + current_ = CurrentForward(); +} + +void CompactionMergingIterator::Seek(const Slice& target) { + minHeap_.clear(); + status_ = Status::OK(); + for (auto& child : children_) { + child.iter.Seek(target); + AddToMinHeapOrCheckStatus(&child); + } + + ParsedInternalKey pik; + ParseInternalKey(target, &pik, false /* log_err_key */) + .PermitUncheckedError(); + for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { + if (range_tombstone_iters_[i]) { + range_tombstone_iters_[i]->Seek(pik.user_key); + // For compaction, output keys should all be after seek target. + while (range_tombstone_iters_[i]->Valid() && + comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) < + 0) { + range_tombstone_iters_[i]->Next(); + } + InsertRangeTombstoneAtLevel(i); + } + } + + FindNextVisibleKey(); + current_ = CurrentForward(); +} + +void CompactionMergingIterator::Next() { + assert(Valid()); + // For the heap modifications below to be correct, current_ must be the + // current top of the heap. + assert(current_ == CurrentForward()); + // as the current points to the current record. move the iterator forward. + if (current_->type == HeapItem::ITERATOR) { + current_->iter.Next(); + if (current_->iter.Valid()) { + // current is still valid after the Next() call above. Call + // replace_top() to restore the heap property. When the same child + // iterator yields a sequence of keys, this is cheap. + assert(current_->iter.status().ok()); + minHeap_.replace_top(current_); + } else { + // current stopped being valid, remove it from the heap. + considerStatus(current_->iter.status()); + minHeap_.pop(); + } + } else { + assert(current_->type == HeapItem::DELETE_RANGE_START); + size_t level = current_->level; + assert(range_tombstone_iters_[level]); + range_tombstone_iters_[level]->Next(); + if (range_tombstone_iters_[level]->Valid()) { + pinned_heap_item_[level].SetTombstoneForCompaction( + range_tombstone_iters_[level]->start_key()); + minHeap_.replace_top(&pinned_heap_item_[level]); + } else { + minHeap_.pop(); + } + } + FindNextVisibleKey(); + current_ = CurrentForward(); +} + +void CompactionMergingIterator::FindNextVisibleKey() { + while (!minHeap_.empty()) { + HeapItem* current = minHeap_.top(); + // IsDeleteRangeSentinelKey() here means file boundary sentinel keys. + if (current->type != HeapItem::ITERATOR || + !current->iter.IsDeleteRangeSentinelKey()) { + return; + } + // range tombstone start keys from the same SSTable should have been + // exhausted + assert(!range_tombstone_iters_[current->level] || + !range_tombstone_iters_[current->level]->Valid()); + // current->iter is a LevelIterator, and it enters a new SST file in the + // Next() call here. + current->iter.Next(); + if (current->iter.Valid()) { + assert(current->iter.status().ok()); + minHeap_.replace_top(current); + } else { + minHeap_.pop(); + } + if (range_tombstone_iters_[current->level]) { + InsertRangeTombstoneAtLevel(current->level); + } + } +} + +void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) { + if (child->iter.Valid()) { + assert(child->iter.status().ok()); + minHeap_.push(child); + } else { + considerStatus(child->iter.status()); + } +} + +InternalIterator* NewCompactionMergingIterator( + const InternalKeyComparator* comparator, InternalIterator** children, int n, + std::vector>& range_tombstone_iters, + Arena* arena) { + assert(n >= 0); + if (n == 0) { + return NewEmptyInternalIterator(arena); + } else { + if (arena == nullptr) { + return new CompactionMergingIterator(comparator, children, n, + false /* is_arena_mode */, + range_tombstone_iters); + } else { + auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator)); + return new (mem) CompactionMergingIterator(comparator, children, n, + true /* is_arena_mode */, + range_tombstone_iters); + } + } +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/compaction_merging_iterator.h b/table/compaction_merging_iterator.h new file mode 100644 index 000000000..e3fd7797f --- /dev/null +++ b/table/compaction_merging_iterator.h @@ -0,0 +1,44 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "db/range_del_aggregator.h" +#include "rocksdb/slice.h" +#include "rocksdb/types.h" +#include "table/merging_iterator.h" + +namespace ROCKSDB_NAMESPACE { + +/* + * This is a simplified version of MergingIterator and is specifically used for + * compaction. It merges the input `children` iterators into a sorted stream of + * keys. Range tombstone start keys are also emitted to prevent oversize + * compactions. For example, consider an L1 file with content [a, b), y, z, + * where [a, b) is a range tombstone and y and z are point keys. This could + * cause an oversize compaction as it can overlap with a wide range of key space + * in L2. + * + * CompactionMergingIterator emits range tombstone start keys from each LSM + * level's range tombstone iterator, and for each range tombstone + * [start,end)@seqno, the key will be start@seqno with op_type + * kTypeRangeDeletion unless truncated at file boundary (see detail in + * TruncatedRangeDelIterator::start_key()). + * + * Caller should use CompactionMergingIterator::IsDeleteRangeSentinelKey() to + * check if the current key is a range tombstone key. + * TODO(cbi): IsDeleteRangeSentinelKey() is used for two kinds of keys at + * different layers: file boundary and range tombstone keys. Separate them into + * two APIs for clarity. + */ +class CompactionMergingIterator; + +InternalIterator* NewCompactionMergingIterator( + const InternalKeyComparator* comparator, InternalIterator** children, int n, + std::vector>& range_tombstone_iters, + Arena* arena = nullptr); +} // namespace ROCKSDB_NAMESPACE diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc index 309ae69c5..0865a49e7 100644 --- a/table/merging_iterator.cc +++ b/table/merging_iterator.cc @@ -10,121 +10,8 @@ #include "table/merging_iterator.h" #include "db/arena_wrapped_db_iter.h" -#include "db/dbformat.h" -#include "db/pinned_iterators_manager.h" -#include "memory/arena.h" -#include "monitoring/perf_context_imp.h" -#include "rocksdb/comparator.h" -#include "rocksdb/iterator.h" -#include "rocksdb/options.h" -#include "table/internal_iterator.h" -#include "table/iter_heap.h" -#include "table/iterator_wrapper.h" -#include "test_util/sync_point.h" -#include "util/autovector.h" -#include "util/heap.h" -#include "util/stop_watch.h" namespace ROCKSDB_NAMESPACE { -// For merging iterator to process range tombstones, we treat the start and end -// keys of a range tombstone as point keys and put them into the minHeap/maxHeap -// used in merging iterator. Take minHeap for example, we are able to keep track -// of currently "active" range tombstones (the ones whose start keys are popped -// but end keys are still in the heap) in `active_`. This `active_` set of range -// tombstones is then used to quickly determine whether the point key at heap -// top is deleted (by heap property, the point key at heap top must be within -// internal key range of active range tombstones). -// -// The HeapItem struct represents 3 types of elements in the minHeap/maxHeap: -// point key and the start and end keys of a range tombstone. -struct HeapItem { - HeapItem() = default; - - enum Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END }; - IteratorWrapper iter; - size_t level = 0; - ParsedInternalKey parsed_ikey; - // Will be overwritten before use, initialize here so compiler does not - // complain. - Type type = ITERATOR; - - explicit HeapItem(size_t _level, InternalIteratorBase* _iter) - : level(_level), type(Type::ITERATOR) { - iter.Set(_iter); - } - - void SetTombstoneKey(ParsedInternalKey&& pik) { - // op_type is already initialized in MergingIterator::Finish(). - parsed_ikey.user_key = pik.user_key; - parsed_ikey.sequence = pik.sequence; - } - - Slice key() const { - assert(type == ITERATOR); - return iter.key(); - } - - bool IsDeleteRangeSentinelKey() const { - if (type == Type::ITERATOR) { - return iter.IsDeleteRangeSentinelKey(); - } - return false; - } -}; - -class MinHeapItemComparator { - public: - MinHeapItemComparator(const InternalKeyComparator* comparator) - : comparator_(comparator) {} - bool operator()(HeapItem* a, HeapItem* b) const { - if (LIKELY(a->type == HeapItem::ITERATOR)) { - if (LIKELY(b->type == HeapItem::ITERATOR)) { - return comparator_->Compare(a->key(), b->key()) > 0; - } else { - return comparator_->Compare(a->key(), b->parsed_ikey) > 0; - } - } else { - if (LIKELY(b->type == HeapItem::ITERATOR)) { - return comparator_->Compare(a->parsed_ikey, b->key()) > 0; - } else { - return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) > 0; - } - } - } - - private: - const InternalKeyComparator* comparator_; -}; - -class MaxHeapItemComparator { - public: - MaxHeapItemComparator(const InternalKeyComparator* comparator) - : comparator_(comparator) {} - bool operator()(HeapItem* a, HeapItem* b) const { - if (LIKELY(a->type == HeapItem::ITERATOR)) { - if (LIKELY(b->type == HeapItem::ITERATOR)) { - return comparator_->Compare(a->key(), b->key()) < 0; - } else { - return comparator_->Compare(a->key(), b->parsed_ikey) < 0; - } - } else { - if (LIKELY(b->type == HeapItem::ITERATOR)) { - return comparator_->Compare(a->parsed_ikey, b->key()) < 0; - } else { - return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) < 0; - } - } - } - - private: - const InternalKeyComparator* comparator_; -}; -// Without anonymous namespace here, we fail the warning -Wmissing-prototypes -namespace { -using MergerMinIterHeap = BinaryHeap; -using MergerMaxIterHeap = BinaryHeap; -} // namespace - class MergingIterator : public InternalIterator { public: MergingIterator(const InternalKeyComparator* comparator, @@ -136,7 +23,7 @@ class MergingIterator : public InternalIterator { direction_(kForward), comparator_(comparator), current_(nullptr), - minHeap_(comparator_), + minHeap_(MinHeapItemComparator(comparator_)), pinned_iters_mgr_(nullptr), iterate_upper_bound_(iterate_upper_bound) { children_.resize(n); @@ -199,7 +86,7 @@ class MergingIterator : public InternalIterator { // TruncatedRangeDelIterator since untruncated tombstone end points // always have kMaxSequenceNumber and kTypeRangeDeletion (see // TruncatedRangeDelIterator::start_key()/end_key()). - pinned_heap_item_[i].parsed_ikey.type = kTypeMaxValid; + pinned_heap_item_[i].tombstone_pik.type = kTypeMaxValid; } } } @@ -549,6 +436,92 @@ class MergingIterator : public InternalIterator { } private: + // For merging iterator to process range tombstones, we treat the start and + // end + // keys of a range tombstone as point keys and put them into the + // minHeap/maxHeap used in merging iterator. Take minHeap for example, we are + // able to keep track of currently "active" range tombstones (the ones whose + // start keys are popped but end keys are still in the heap) in `active_`. + // This `active_` set of range tombstones is then used to quickly determine + // whether the point key at heap top is deleted (by heap property, the point + // key at heap top must be within internal key range of active range + // tombstones). + // + // The HeapItem struct represents 3 types of elements in the minHeap/maxHeap: + // point key and the start and end keys of a range tombstone. + struct HeapItem { + HeapItem() = default; + + IteratorWrapper iter; + size_t level = 0; + ParsedInternalKey tombstone_pik; + // Will be overwritten before use, initialize here so compiler does not + // complain. + enum Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END }; + Type type = ITERATOR; + + explicit HeapItem(size_t _level, InternalIteratorBase* _iter) + : level(_level), type(Type::ITERATOR) { + iter.Set(_iter); + } + + void SetTombstoneKey(ParsedInternalKey&& pik) { + // op_type is already initialized in MergingIterator::Finish(). + tombstone_pik.user_key = pik.user_key; + tombstone_pik.sequence = pik.sequence; + } + }; + + class MinHeapItemComparator { + public: + explicit MinHeapItemComparator(const InternalKeyComparator* comparator) + : comparator_(comparator) {} + bool operator()(HeapItem* a, HeapItem* b) const { + if (LIKELY(a->type == HeapItem::ITERATOR)) { + if (LIKELY(b->type == HeapItem::ITERATOR)) { + return comparator_->Compare(a->iter.key(), b->iter.key()) > 0; + } else { + return comparator_->Compare(a->iter.key(), b->tombstone_pik) > 0; + } + } else { + if (LIKELY(b->type == HeapItem::ITERATOR)) { + return comparator_->Compare(a->tombstone_pik, b->iter.key()) > 0; + } else { + return comparator_->Compare(a->tombstone_pik, b->tombstone_pik) > 0; + } + } + } + + private: + const InternalKeyComparator* comparator_; + }; + + class MaxHeapItemComparator { + public: + explicit MaxHeapItemComparator(const InternalKeyComparator* comparator) + : comparator_(comparator) {} + bool operator()(HeapItem* a, HeapItem* b) const { + if (LIKELY(a->type == HeapItem::ITERATOR)) { + if (LIKELY(b->type == HeapItem::ITERATOR)) { + return comparator_->Compare(a->iter.key(), b->iter.key()) < 0; + } else { + return comparator_->Compare(a->iter.key(), b->tombstone_pik) < 0; + } + } else { + if (LIKELY(b->type == HeapItem::ITERATOR)) { + return comparator_->Compare(a->tombstone_pik, b->iter.key()) < 0; + } else { + return comparator_->Compare(a->tombstone_pik, b->tombstone_pik) < 0; + } + } + } + + private: + const InternalKeyComparator* comparator_; + }; + + using MergerMinIterHeap = BinaryHeap; + using MergerMaxIterHeap = BinaryHeap; friend class MergeIteratorBuilder; // Clears heaps for both directions, used when changing direction or seeking void ClearHeaps(bool clear_active = true); @@ -1177,7 +1150,7 @@ void MergingIterator::SwitchToForward() { if (child.iter.status() == Status::TryAgain()) { continue; } - if (child.iter.Valid() && comparator_->Equal(target, child.key())) { + if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) { assert(child.iter.status().ok()); child.iter.Next(); } @@ -1188,7 +1161,7 @@ void MergingIterator::SwitchToForward() { for (auto& child : children_) { if (child.iter.status() == Status::TryAgain()) { child.iter.Seek(target); - if (child.iter.Valid() && comparator_->Equal(target, child.key())) { + if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) { assert(child.iter.status().ok()); child.iter.Next(); } @@ -1239,7 +1212,7 @@ void MergingIterator::SwitchToBackward() { if (&child.iter != current_) { child.iter.SeekForPrev(target); TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child); - if (child.iter.Valid() && comparator_->Equal(target, child.key())) { + if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) { assert(child.iter.status().ok()); child.iter.Prev(); } @@ -1297,7 +1270,8 @@ void MergingIterator::ClearHeaps(bool clear_active) { void MergingIterator::InitMaxHeap() { if (!maxHeap_) { - maxHeap_ = std::make_unique(comparator_); + maxHeap_ = + std::make_unique(MaxHeapItemComparator(comparator_)); } } @@ -1308,21 +1282,27 @@ void MergingIterator::InitMaxHeap() { // key's level, then the current child iterator is simply advanced to its next // key without reseeking. inline void MergingIterator::FindNextVisibleKey() { - // When active_ is empty, we know heap top cannot be a range tombstone end - // key. It cannot be a range tombstone start key per PopDeleteRangeStart(). PopDeleteRangeStart(); - while (!minHeap_.empty() && - (!active_.empty() || minHeap_.top()->IsDeleteRangeSentinelKey()) && - SkipNextDeleted()) { + // PopDeleteRangeStart() implies heap top is not DELETE_RANGE_START + // active_ being empty implies no DELETE_RANGE_END in heap. + // So minHeap_->top() must be of type ITERATOR. + while ( + !minHeap_.empty() && + (!active_.empty() || minHeap_.top()->iter.IsDeleteRangeSentinelKey()) && + SkipNextDeleted()) { PopDeleteRangeStart(); } } inline void MergingIterator::FindPrevVisibleKey() { PopDeleteRangeEnd(); - while (!maxHeap_->empty() && - (!active_.empty() || maxHeap_->top()->IsDeleteRangeSentinelKey()) && - SkipPrevDeleted()) { + // PopDeleteRangeEnd() implies heap top is not DELETE_RANGE_END + // active_ being empty implies no DELETE_RANGE_START in heap. + // So maxHeap_->top() must be of type ITERATOR. + while ( + !maxHeap_->empty() && + (!active_.empty() || maxHeap_->top()->iter.IsDeleteRangeSentinelKey()) && + SkipPrevDeleted()) { PopDeleteRangeEnd(); } } diff --git a/table/merging_iterator.h b/table/merging_iterator.h index 16fc0877e..bbf1320f9 100644 --- a/table/merging_iterator.h +++ b/table/merging_iterator.h @@ -12,6 +12,7 @@ #include "db/range_del_aggregator.h" #include "rocksdb/slice.h" #include "rocksdb/types.h" +#include "table/iterator_wrapper.h" namespace ROCKSDB_NAMESPACE {