diff --git a/CMakeLists.txt b/CMakeLists.txt index 1c716874a..e4f0fe09e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -831,6 +831,7 @@ set(SOURCES table/get_context.cc table/iterator.cc table/merging_iterator.cc + table/compaction_merging_iterator.cc table/meta_blocks.cc table/persistent_cache_helper.cc table/plain/plain_table_bloom.cc diff --git a/HISTORY.md b/HISTORY.md index f695158d7..fd59277f0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ### Behavior changes * Make best-efforts recovery verify SST unique ID before Version construction (#10962) * Introduce `epoch_number` and sort L0 files by `epoch_number` instead of `largest_seqno`. `epoch_number` represents the order of a file being flushed or ingested/imported. Compaction output file will be assigned with the minimum `epoch_number` among input files'. For L0, larger `epoch_number` indicates newer L0 file. +* Compaction output file cutting logic now considers range tombstone start keys. For example, SST partitioner now may receive ParitionRequest for range tombstone start keys. ### Bug Fixes * Fixed a regression in iterator where range tombstones after `iterate_upper_bound` is processed. diff --git a/TARGETS b/TARGETS index 762afa351..32161166f 100644 --- a/TARGETS +++ b/TARGETS @@ -198,6 +198,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "table/block_based/reader_common.cc", "table/block_based/uncompression_dict_reader.cc", "table/block_fetcher.cc", + "table/compaction_merging_iterator.cc", "table/cuckoo/cuckoo_table_builder.cc", "table/cuckoo/cuckoo_table_factory.cc", "table/cuckoo/cuckoo_table_reader.cc", @@ -538,6 +539,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "table/block_based/reader_common.cc", "table/block_based/uncompression_dict_reader.cc", "table/block_fetcher.cc", + "table/compaction_merging_iterator.cc", "table/cuckoo/cuckoo_table_builder.cc", "table/cuckoo/cuckoo_table_factory.cc", "table/cuckoo/cuckoo_table_reader.cc", diff --git a/db/blob/blob_counting_iterator.h b/db/blob/blob_counting_iterator.h index de549afa2..ebc83192f 100644 --- a/db/blob/blob_counting_iterator.h +++ b/db/blob/blob_counting_iterator.h @@ -123,6 +123,10 @@ class BlobCountingIterator : public InternalIterator { return iter_->GetProperty(prop_name, prop); } + bool IsDeleteRangeSentinelKey() const override { + return iter_->IsDeleteRangeSentinelKey(); + } + private: void UpdateAndCountBlobIfNeeded() { assert(!iter_->Valid() || iter_->status().ok()); @@ -130,6 +134,13 @@ class BlobCountingIterator : public InternalIterator { if (!iter_->Valid()) { status_ = iter_->status(); return; + } else if (iter_->IsDeleteRangeSentinelKey()) { + // CompactionMergingIterator emits range tombstones, and range tombstone + // keys can be truncated at file boundaries. This means the range + // tombstone keys can have op_type kTypeBlobIndex. + // This could crash the ProcessInFlow() call below since + // value is empty for these keys. + return; } TEST_SYNC_POINT( diff --git a/db/compaction/clipping_iterator.h b/db/compaction/clipping_iterator.h index 1ed465c2c..3f50cdd9d 100644 --- a/db/compaction/clipping_iterator.h +++ b/db/compaction/clipping_iterator.h @@ -188,6 +188,11 @@ class ClippingIterator : public InternalIterator { return iter_->GetProperty(prop_name, prop); } + bool IsDeleteRangeSentinelKey() const override { + assert(valid_); + return iter_->IsDeleteRangeSentinelKey(); + } + private: void UpdateValid() { assert(!iter_->Valid() || iter_->status().ok()); diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 47ca8d1a9..3d6d334db 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -20,9 +20,6 @@ namespace ROCKSDB_NAMESPACE { -const uint64_t kRangeTombstoneSentinel = - PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion); - int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a, const InternalKey& b) { auto c = user_cmp->CompareWithoutTimestamp(a.user_key(), b.user_key()); diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index ee8639601..10a258cb9 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -18,6 +18,8 @@ namespace ROCKSDB_NAMESPACE { // The file contains class Compaction, as well as some helper functions // and data structures used by the class. +const uint64_t kRangeTombstoneSentinel = + PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion); // Utility for comparing sstable boundary keys. Returns -1 if either a or b is // null which provides the property that a==null indicates a key that is less // than any key and b==null indicates a key that is greater than any key. Note diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 9f54f7813..4a7c9adda 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -377,6 +377,7 @@ void CompactionIterator::NextFromInput() { value_ = input_.value(); blob_value_.Reset(); iter_stats_.num_input_records++; + is_range_del_ = input_.IsDeleteRangeSentinelKey(); Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_); if (!pik_status.ok()) { @@ -396,7 +397,10 @@ void CompactionIterator::NextFromInput() { break; } TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_); - + if (is_range_del_) { + validity_info_.SetValid(kRangeDeletion); + break; + } // Update input statistics if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion || ikey_.type == kTypeDeletionWithTimestamp) { @@ -618,6 +622,14 @@ void CompactionIterator::NextFromInput() { ParsedInternalKey next_ikey; AdvanceInputIter(); + while (input_.Valid() && input_.IsDeleteRangeSentinelKey() && + ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) + .ok() && + cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) { + // skip range tombstone start keys with the same user key + // since they are not "real" point keys. + AdvanceInputIter(); + } // Check whether the next key exists, is not corrupt, and is the same key // as the single delete. @@ -625,6 +637,7 @@ void CompactionIterator::NextFromInput() { ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) .ok() && cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) { + assert(!input_.IsDeleteRangeSentinelKey()); #ifndef NDEBUG const Compaction* c = compaction_ ? compaction_->real_compaction() : nullptr; @@ -849,12 +862,14 @@ void CompactionIterator::NextFromInput() { // Note that a deletion marker of type kTypeDeletionWithTimestamp will be // considered to have a different user key unless the timestamp is older // than *full_history_ts_low_. + // + // Range tombstone start keys are skipped as they are not "real" keys. while (!IsPausingManualCompaction() && !IsShuttingDown() && input_.Valid() && (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) .ok()) && cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) && - (prev_snapshot == 0 || + (prev_snapshot == 0 || input_.IsDeleteRangeSentinelKey() || DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) { AdvanceInputIter(); } @@ -1147,10 +1162,12 @@ void CompactionIterator::DecideOutputLevel() { void CompactionIterator::PrepareOutput() { if (Valid()) { - if (ikey_.type == kTypeValue) { - ExtractLargeValueIfNeeded(); - } else if (ikey_.type == kTypeBlobIndex) { - GarbageCollectBlobIfNeeded(); + if (LIKELY(!is_range_del_)) { + if (ikey_.type == kTypeValue) { + ExtractLargeValueIfNeeded(); + } else if (ikey_.type == kTypeBlobIndex) { + GarbageCollectBlobIfNeeded(); + } } if (compaction_ != nullptr && compaction_->SupportsPerKeyPlacement()) { @@ -1173,7 +1190,7 @@ void CompactionIterator::PrepareOutput() { DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && ikey_.type != kTypeMerge && current_key_committed_ && !output_to_penultimate_level_ && - ikey_.sequence < preserve_time_min_seqno_) { + ikey_.sequence < preserve_time_min_seqno_ && !is_range_del_) { if (ikey_.type == kTypeDeletion || (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) { ROCKS_LOG_FATAL( diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index c215d2bbb..1e4f373e2 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -63,6 +63,10 @@ class SequenceIterWrapper : public InternalIterator { void SeekToLast() override { assert(false); } uint64_t num_itered() const { return num_itered_; } + bool IsDeleteRangeSentinelKey() const override { + assert(Valid()); + return inner_iter_->IsDeleteRangeSentinelKey(); + } private: InternalKeyComparator icmp_; @@ -242,7 +246,12 @@ class CompactionIterator { const Status& status() const { return status_; } const ParsedInternalKey& ikey() const { return ikey_; } inline bool Valid() const { return validity_info_.IsValid(); } - const Slice& user_key() const { return current_user_key_; } + const Slice& user_key() const { + if (UNLIKELY(is_range_del_)) { + return ikey_.user_key; + } + return current_user_key_; + } const CompactionIterationStats& iter_stats() const { return iter_stats_; } uint64_t num_input_entry_scanned() const { return input_.num_itered(); } // If the current key should be placed on penultimate level, only valid if @@ -252,6 +261,8 @@ class CompactionIterator { } Status InputStatus() const { return input_.status(); } + bool IsDeleteRangeSentinelKey() const { return is_range_del_; } + private: // Processes the input stream to find the next output void NextFromInput(); @@ -385,6 +396,7 @@ class CompactionIterator { kKeepSD = 8, kKeepDel = 9, kNewUserKey = 10, + kRangeDeletion = 11, }; struct ValidityInfo { @@ -492,6 +504,10 @@ class CompactionIterator { // This is a best-effort facility, so memory_order_relaxed is sufficient. return manual_compaction_canceled_.load(std::memory_order_relaxed); } + + // Stores whether the current compaction iterator output + // is a range tombstone start key. + bool is_range_del_{false}; }; inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 24b05a8de..a30b21195 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1286,7 +1286,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) { // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid() // returns true. - assert(!end.has_value() || cfd->user_comparator()->Compare( c_iter->user_key(), end.value()) < 0); diff --git a/db/compaction/compaction_outputs.cc b/db/compaction/compaction_outputs.cc index e74378e2a..bbb83961c 100644 --- a/db/compaction/compaction_outputs.cc +++ b/db/compaction/compaction_outputs.cc @@ -333,8 +333,14 @@ Status CompactionOutputs::AddToOutput( const CompactionFileOpenFunc& open_file_func, const CompactionFileCloseFunc& close_file_func) { Status s; + bool is_range_del = c_iter.IsDeleteRangeSentinelKey(); + if (is_range_del && compaction_->bottommost_level()) { + // We don't consider range tombstone for bottommost level since: + // 1. there is no grandparent and hence no overlap to consider + // 2. range tombstone may be dropped at bottommost level. + return s; + } const Slice& key = c_iter.key(); - if (ShouldStopBefore(c_iter) && HasBuilder()) { s = close_file_func(*this, c_iter.InputStatus(), key); if (!s.ok()) { @@ -344,6 +350,13 @@ Status CompactionOutputs::AddToOutput( grandparent_boundary_switched_num_ = 0; grandparent_overlapped_bytes_ = GetCurrentKeyGrandparentOverlappedBytes(key); + if (UNLIKELY(is_range_del)) { + // lower bound for this new output file, this is needed as the lower bound + // does not come from the smallest point key in this case. + range_tombstone_lower_bound_.DecodeFrom(key); + } else { + range_tombstone_lower_bound_.Clear(); + } } // Open output file if necessary @@ -354,6 +367,17 @@ Status CompactionOutputs::AddToOutput( } } + // c_iter may emit range deletion keys, so update `last_key_for_partitioner_` + // here before returning below when `is_range_del` is true + if (partitioner_) { + last_key_for_partitioner_.assign(c_iter.user_key().data_, + c_iter.user_key().size_); + } + + if (UNLIKELY(is_range_del)) { + return s; + } + assert(builder_ != nullptr); const Slice& value = c_iter.value(); s = current_output().validator.Add(key, value); @@ -377,11 +401,6 @@ Status CompactionOutputs::AddToOutput( s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence, ikey.type); - if (partitioner_) { - last_key_for_partitioner_.assign(c_iter.user_key().data_, - c_iter.user_key().size_); - } - return s; } @@ -398,13 +417,19 @@ Status CompactionOutputs::AddRangeDels( std::string smallest_user_key; const Slice *lower_bound, *upper_bound; bool lower_bound_from_sub_compact = false; - + bool lower_bound_from_range_tombstone = false; size_t output_size = outputs_.size(); if (output_size == 1) { // For the first output table, include range tombstones before the min // key but after the subcompaction boundary. lower_bound = comp_start_user_key; lower_bound_from_sub_compact = true; + } else if (range_tombstone_lower_bound_.size() > 0) { + assert(meta.smallest.size() == 0 || + icmp.Compare(range_tombstone_lower_bound_, meta.smallest) <= 0); + lower_bound_guard = range_tombstone_lower_bound_.user_key(); + lower_bound = &lower_bound_guard; + lower_bound_from_range_tombstone = true; } else if (meta.smallest.size() > 0) { // For subsequent output tables, only include range tombstones from min // key onwards since the previous file was extended to contain range @@ -532,6 +557,39 @@ Status CompactionOutputs::AddRangeDels( smallest_candidate = InternalKey(*lower_bound, tombstone.seq_, kTypeRangeDeletion); } + } else if (lower_bound_from_range_tombstone) { + // Range tombstone keys can be truncated at file boundaries of the files + // that contain them. + // + // If this lower bound is from a range tombstone key that is not + // truncated, i.e., it was not truncated when reading from the input + // files, then its sequence number and `op_type` will be + // kMaxSequenceNumber and kTypeRangeDeletion (see + // TruncatedRangeDelIterator::start_key()). In this case, when this key + // was used as the upper bound to cut the previous compaction output + // file, the previous file's largest key could have the same value as + // this key (see the upperbound logic below). To guarantee + // non-overlapping ranges between output files, we use the range + // tombstone's actual sequence number (tombstone.seq_) for the lower + // bound of this file. If this range tombstone key is truncated, then + // the previous file's largest key will be smaller than this range + // tombstone key, so we can use it as the lower bound directly. + if (ExtractInternalKeyFooter(range_tombstone_lower_bound_.Encode()) == + kRangeTombstoneSentinel) { + if (ts_sz) { + smallest_candidate = + InternalKey(range_tombstone_lower_bound_.user_key(), + tombstone.seq_, kTypeRangeDeletion, tombstone.ts_); + } else { + smallest_candidate = + InternalKey(range_tombstone_lower_bound_.user_key(), + tombstone.seq_, kTypeRangeDeletion); + } + } else { + assert(GetInternalKeySeqno(range_tombstone_lower_bound_.Encode()) < + kMaxSequenceNumber); + smallest_candidate = range_tombstone_lower_bound_; + } } else { smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion); } diff --git a/db/compaction/compaction_outputs.h b/db/compaction/compaction_outputs.h index f40aa8215..af55ee524 100644 --- a/db/compaction/compaction_outputs.h +++ b/db/compaction/compaction_outputs.h @@ -307,6 +307,7 @@ class CompactionOutputs { std::unique_ptr partitioner_; // A flag determines if this subcompaction has been split by the cursor + // for RoundRobin compaction bool is_split_ = false; // We also maintain the output split key for each subcompaction to avoid @@ -338,6 +339,10 @@ class CompactionOutputs { // for the current output file, how many file boundaries has it crossed, // basically number of files overlapped * 2 size_t grandparent_boundary_switched_num_ = 0; + + // The smallest key of the current output file, this is set when current + // output file's smallest key is a range tombstone start key. + InternalKey range_tombstone_lower_bound_; }; // helper struct to concatenate the last level and penultimate level outputs diff --git a/db/compaction/subcompaction_state.h b/db/compaction/subcompaction_state.h index c748be31b..06c2b73fd 100644 --- a/db/compaction/subcompaction_state.h +++ b/db/compaction/subcompaction_state.h @@ -84,6 +84,11 @@ class SubcompactionState { // Assign range dels aggregator, for each range_del, it can only be assigned // to one output level, for per_key_placement, it's going to be the // penultimate level. + // TODO: This does not work for per_key_placement + user-defined timestamp + + // DeleteRange() combo. If user-defined timestamp is enabled, + // it is possible for a range tombstone to belong to bottommost level ( + // seqno < earliest snapshot) without being dropped (garbage collection + // for user-defined timestamp). void AssignRangeDelAggregator( std::unique_ptr&& range_del_agg) { if (compaction->SupportsPerKeyPlacement()) { diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc index d576f2217..3d468a1c0 100644 --- a/db/db_range_del_test.cc +++ b/db/db_range_del_test.cc @@ -1661,6 +1661,213 @@ TEST_F(DBRangeDelTest, RangeTombstoneWrittenToMinimalSsts) { ASSERT_EQ(1, num_range_deletions); } +// Test SST partitioner cut after every single key +class SingleKeySstPartitioner : public SstPartitioner { + public: + const char* Name() const override { return "SingleKeySstPartitioner"; } + + PartitionerResult ShouldPartition( + const PartitionerRequest& /*request*/) override { + return kRequired; + } + + bool CanDoTrivialMove(const Slice& /*smallest_user_key*/, + const Slice& /*largest_user_key*/) override { + return false; + } +}; + +class SingleKeySstPartitionerFactory : public SstPartitionerFactory { + public: + static const char* kClassName() { return "SingleKeySstPartitionerFactory"; } + const char* Name() const override { return kClassName(); } + + std::unique_ptr CreatePartitioner( + const SstPartitioner::Context& /* context */) const override { + return std::unique_ptr(new SingleKeySstPartitioner()); + } +}; + +TEST_F(DBRangeDelTest, LevelCompactOutputCutAtRangeTombstoneForTtlFiles) { + Options options = CurrentOptions(); + options.compression = kNoCompression; + options.compaction_pri = kMinOverlappingRatio; + options.disable_auto_compactions = true; + options.ttl = 24 * 60 * 60; // 24 hours + options.target_file_size_base = 8 << 10; + env_->SetMockSleep(); + options.env = env_; + DestroyAndReopen(options); + + Random rnd(301); + // Fill some data so that future compactions are not bottommost level + // compaction, and hence they would try cut around files for ttl + for (int i = 5; i < 10; ++i) { + ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10))); + } + ASSERT_OK(Flush()); + MoveFilesToLevel(3); + ASSERT_EQ("0,0,0,1", FilesPerLevel()); + + for (int i = 5; i < 10; ++i) { + ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10))); + } + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + ASSERT_EQ("0,1,0,1", FilesPerLevel()); + + env_->MockSleepForSeconds(20 * 60 * 60); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), + Key(11), Key(12))); + ASSERT_OK(Put(Key(0), rnd.RandomString(1 << 10))); + ASSERT_OK(Flush()); + ASSERT_EQ("1,1,0,1", FilesPerLevel()); + // L0 file is new, L1 and L3 file are old and qualified for TTL + env_->MockSleepForSeconds(10 * 60 * 60); + MoveFilesToLevel(1); + // L1 output should be cut into 3 files: + // File 0: Key(0) + // File 1: (qualified for TTL): Key(5) - Key(10) + // File 1: DeleteRange [11, 12) + ASSERT_EQ("0,3,0,1", FilesPerLevel()); +} + +TEST_F(DBRangeDelTest, CompactionEmitRangeTombstoneToSSTPartitioner) { + Options options = CurrentOptions(); + auto factory = std::make_shared(); + options.sst_partitioner_factory = factory; + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + Random rnd(301); + // range deletion keys are not processed when compacting to bottommost level, + // so creating a file at older level to make the next compaction not + // bottommost level + ASSERT_OK(db_->Put(WriteOptions(), Key(4), rnd.RandomString(10))); + ASSERT_OK(Flush()); + MoveFilesToLevel(5); + + ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(10))); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2), + Key(5))); + ASSERT_OK(Flush()); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + MoveFilesToLevel(1); + // SSTPartitioner decides to cut when range tombstone start key is passed to + // it Note that the range tombstone [2, 5) itself span multiple keys but we + // are not able to partition in between yet. + ASSERT_EQ(2, NumTableFilesAtLevel(1)); +} + +TEST_F(DBRangeDelTest, OversizeCompactionGapBetweenPointKeyAndTombstone) { + // L2 has two files + // L2_0: 0, 1, 2, 3, 4. L2_1: 5, 6, 7 + // L0 has 0, [5, 6), 8 + // max_compaction_bytes is less than the size of L2_0 and L2_1. + // When compacting L0 into L1, it should split into 3 files. + const int kNumPerFile = 4, kNumFiles = 2; + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.target_file_size_base = 9 * 1024; + options.max_compaction_bytes = 9 * 1024; + DestroyAndReopen(options); + Random rnd(301); + for (int i = 0; i < kNumFiles; ++i) { + std::vector values; + for (int j = 0; j < kNumPerFile; j++) { + values.push_back(rnd.RandomString(3 << 10)); + ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j])); + } + } + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + MoveFilesToLevel(2); + ASSERT_EQ(2, NumTableFilesAtLevel(2)); + ASSERT_OK(Put(Key(0), rnd.RandomString(1 << 10))); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(5), + Key(6))); + ASSERT_OK(Put(Key(8), rnd.RandomString(1 << 10))); + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, + true /* disallow_trivial_move */)); + ASSERT_EQ(3, NumTableFilesAtLevel(1)); +} + +TEST_F(DBRangeDelTest, OversizeCompactionGapBetweenTombstone) { + // L2 has two files + // L2_0: 0, 1, 2, 3, 4. L2_1: 5, 6, 7 + // L0 has two range tombstones [0, 1), [7, 8). + // max_compaction_bytes is less than the size of L2_0. + // When compacting L0 into L1, the two range tombstones should be + // split into two files. + const int kNumPerFile = 4, kNumFiles = 2; + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.target_file_size_base = 9 * 1024; + options.max_compaction_bytes = 9 * 1024; + DestroyAndReopen(options); + Random rnd(301); + for (int i = 0; i < kNumFiles; ++i) { + std::vector values; + // Write 12K (4 values, each 3K) + for (int j = 0; j < kNumPerFile; j++) { + values.push_back(rnd.RandomString(3 << 10)); + ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j])); + } + } + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + MoveFilesToLevel(2); + ASSERT_EQ(2, NumTableFilesAtLevel(2)); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0), + Key(1))); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(7), + Key(8))); + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, + true /* disallow_trivial_move */)); + // This is L0 -> L1 compaction + // The two range tombstones are broken up into two output files + // to limit compaction size. + ASSERT_EQ(2, NumTableFilesAtLevel(1)); +} + +TEST_F(DBRangeDelTest, OversizeCompactionPointKeyWithinRangetombstone) { + // L2 has two files + // L2_0: 0, 1, 2, 3, 4. L2_1: 6, 7, 8 + // L0 has [0, 9) and point key 5 + // max_compaction_bytes is less than the size of L2_0. + // When compacting L0 into L1, the compaction should cut at point key 5. + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.target_file_size_base = 9 * 1024; + options.max_compaction_bytes = 9 * 1024; + DestroyAndReopen(options); + Random rnd(301); + for (int i = 0; i < 9; ++i) { + if (i == 5) { + ++i; + } + ASSERT_OK(Put(Key(i), rnd.RandomString(3 << 10))); + } + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + MoveFilesToLevel(2); + ASSERT_EQ(2, NumTableFilesAtLevel(2)); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0), + Key(9))); + ASSERT_OK(Put(Key(5), rnd.RandomString(1 << 10))); + ASSERT_OK(db_->Flush(FlushOptions())); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, + true /* disallow_trivial_move */)); + ASSERT_EQ(2, NumTableFilesAtLevel(1)); +} + TEST_F(DBRangeDelTest, OverlappedTombstones) { const int kNumPerFile = 4, kNumFiles = 2; Options options = CurrentOptions(); @@ -2093,6 +2300,7 @@ TEST_F(DBRangeDelTest, NonOverlappingTombstonAtBoundary) { options.compression = kNoCompression; options.disable_auto_compactions = true; options.target_file_size_base = 2 * 1024; + options.level_compaction_dynamic_file_size = false; DestroyAndReopen(options); Random rnd(301); @@ -2508,7 +2716,7 @@ TEST_F(DBRangeDelTest, LeftSentinelKeyTest) { options.compression = kNoCompression; options.disable_auto_compactions = true; options.target_file_size_base = 3 * 1024; - options.max_compaction_bytes = 1024; + options.max_compaction_bytes = 2048; DestroyAndReopen(options); // L2 @@ -2554,7 +2762,7 @@ TEST_F(DBRangeDelTest, LeftSentinelKeyTestWithNewerKey) { options.compression = kNoCompression; options.disable_auto_compactions = true; options.target_file_size_base = 3 * 1024; - options.max_compaction_bytes = 1024; + options.max_compaction_bytes = 3 * 1024; DestroyAndReopen(options); // L2 diff --git a/db/history_trimming_iterator.h b/db/history_trimming_iterator.h index b445ced33..4af5cde72 100644 --- a/db/history_trimming_iterator.h +++ b/db/history_trimming_iterator.h @@ -82,6 +82,10 @@ class HistoryTrimmingIterator : public InternalIterator { bool IsValuePinned() const override { return input_->IsValuePinned(); } + bool IsDeleteRangeSentinelKey() const override { + return input_->IsDeleteRangeSentinelKey(); + } + private: InternalIterator* input_; const std::string filter_ts_; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 6df841012..671545e60 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -224,6 +224,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, s = Status::ShutdownInProgress(); return s; } + // Skip range tombstones emitted by the compaction iterator. + if (iter->IsDeleteRangeSentinelKey()) { + continue; + } ParsedInternalKey ikey; assert(keys_.size() == merge_context_.GetNumOperands()); diff --git a/db/version_set.cc b/db/version_set.cc index 1030e5e28..8a8fa1d75 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -39,6 +39,8 @@ #include "db/table_cache.h" #include "db/version_builder.h" #include "db/version_edit_handler.h" +#include "table/compaction_merging_iterator.h" + #if USE_COROUTINES #include "folly/experimental/coro/BlockingWait.h" #include "folly/experimental/coro/Collect.h" @@ -6584,6 +6586,14 @@ InternalIterator* VersionSet::MakeInputIterator( c->num_input_levels() - 1 : c->num_input_levels()); InternalIterator** list = new InternalIterator*[space]; + // First item in the pair is a pointer to range tombstones. + // Second item is a pointer to a member of a LevelIterator, + // that will be initialized to where CompactionMergingIterator stores + // pointer to its range tombstones. This is used by LevelIterator + // to update pointer to range tombstones as it traverse different SST files. + std::vector< + std::pair> + range_tombstones; size_t num = 0; for (size_t which = 0; which < c->num_input_levels(); which++) { if (c->input_levels(which)->num_files != 0) { @@ -6604,7 +6614,7 @@ InternalIterator* VersionSet::MakeInputIterator( end.value(), fmd.smallest.user_key()) < 0) { continue; } - + TruncatedRangeDelIterator* range_tombstone_iter = nullptr; list[num++] = cfd->table_cache()->NewIterator( read_options, file_options_compactions, cfd->internal_comparator(), fmd, range_del_agg, @@ -6617,10 +6627,13 @@ InternalIterator* VersionSet::MakeInputIterator( MaxFileSizeForL0MetaPin(*c->mutable_cf_options()), /*smallest_compaction_key=*/nullptr, /*largest_compaction_key=*/nullptr, - /*allow_unprepared_value=*/false); + /*allow_unprepared_value=*/false, + /*range_del_iter=*/&range_tombstone_iter); + range_tombstones.emplace_back(range_tombstone_iter, nullptr); } } else { // Create concatenating iterator for the files from this level + TruncatedRangeDelIterator*** tombstone_iter_ptr = nullptr; list[num++] = new LevelIterator( cfd->table_cache(), read_options, file_options_compactions, cfd->internal_comparator(), c->input_levels(which), @@ -6629,14 +6642,15 @@ InternalIterator* VersionSet::MakeInputIterator( /*no per level latency histogram=*/nullptr, TableReaderCaller::kCompaction, /*skip_filters=*/false, /*level=*/static_cast(c->level(which)), range_del_agg, - c->boundaries(which)); + c->boundaries(which), false, &tombstone_iter_ptr); + range_tombstones.emplace_back(nullptr, tombstone_iter_ptr); } } } assert(num <= space); - InternalIterator* result = - NewMergingIterator(&c->column_family_data()->internal_comparator(), list, - static_cast(num)); + InternalIterator* result = NewCompactionMergingIterator( + &c->column_family_data()->internal_comparator(), list, + static_cast(num), range_tombstones); delete[] list; return result; } diff --git a/src.mk b/src.mk index 0269fe1d3..ebc95a659 100644 --- a/src.mk +++ b/src.mk @@ -196,6 +196,7 @@ LIB_SOURCES = \ table/get_context.cc \ table/iterator.cc \ table/merging_iterator.cc \ + table/compaction_merging_iterator.cc \ table/meta_blocks.cc \ table/persistent_cache_helper.cc \ table/plain/plain_table_bloom.cc \ diff --git a/table/compaction_merging_iterator.cc b/table/compaction_merging_iterator.cc new file mode 100644 index 000000000..e48712652 --- /dev/null +++ b/table/compaction_merging_iterator.cc @@ -0,0 +1,142 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#include "table/compaction_merging_iterator.h" + +namespace ROCKSDB_NAMESPACE { +void CompactionMergingIterator::SeekToFirst() { + minHeap_.clear(); + status_ = Status::OK(); + for (auto& child : children_) { + child.iter.SeekToFirst(); + AddToMinHeapOrCheckStatus(&child); + } + + for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { + if (range_tombstone_iters_[i]) { + range_tombstone_iters_[i]->SeekToFirst(); + InsertRangeTombstoneAtLevel(i); + } + } + + FindNextVisibleKey(); + current_ = CurrentForward(); +} + +void CompactionMergingIterator::Seek(const Slice& target) { + minHeap_.clear(); + status_ = Status::OK(); + for (auto& child : children_) { + child.iter.Seek(target); + AddToMinHeapOrCheckStatus(&child); + } + + ParsedInternalKey pik; + ParseInternalKey(target, &pik, false /* log_err_key */) + .PermitUncheckedError(); + for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { + if (range_tombstone_iters_[i]) { + range_tombstone_iters_[i]->Seek(pik.user_key); + // For compaction, output keys should all be after seek target. + while (range_tombstone_iters_[i]->Valid() && + comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) < + 0) { + range_tombstone_iters_[i]->Next(); + } + InsertRangeTombstoneAtLevel(i); + } + } + + FindNextVisibleKey(); + current_ = CurrentForward(); +} + +void CompactionMergingIterator::Next() { + assert(Valid()); + // For the heap modifications below to be correct, current_ must be the + // current top of the heap. + assert(current_ == CurrentForward()); + // as the current points to the current record. move the iterator forward. + if (current_->type == HeapItem::ITERATOR) { + current_->iter.Next(); + if (current_->iter.Valid()) { + // current is still valid after the Next() call above. Call + // replace_top() to restore the heap property. When the same child + // iterator yields a sequence of keys, this is cheap. + assert(current_->iter.status().ok()); + minHeap_.replace_top(current_); + } else { + // current stopped being valid, remove it from the heap. + considerStatus(current_->iter.status()); + minHeap_.pop(); + } + } else { + assert(current_->type == HeapItem::DELETE_RANGE_START); + size_t level = current_->level; + assert(range_tombstone_iters_[level]); + range_tombstone_iters_[level]->Next(); + if (range_tombstone_iters_[level]->Valid()) { + pinned_heap_item_[level].SetTombstoneForCompaction( + range_tombstone_iters_[level]->start_key()); + minHeap_.replace_top(&pinned_heap_item_[level]); + } else { + minHeap_.pop(); + } + } + FindNextVisibleKey(); + current_ = CurrentForward(); +} + +void CompactionMergingIterator::FindNextVisibleKey() { + // IsDeleteRangeSentinelKey() here means file boundary sentinel keys. + while (!minHeap_.empty() && minHeap_.top()->IsDeleteRangeSentinelKey()) { + HeapItem* current = minHeap_.top(); + // range tombstone start keys from the same SSTable should have been + // exhausted + assert(!range_tombstone_iters_[current->level] || + !range_tombstone_iters_[current->level]->Valid()); + // iter is a LevelIterator, and it enters a new SST file in the Next() + // call here. + current->iter.Next(); + if (current->iter.Valid()) { + assert(current->iter.status().ok()); + minHeap_.replace_top(current); + } else { + minHeap_.pop(); + } + if (range_tombstone_iters_[current->level]) { + InsertRangeTombstoneAtLevel(current->level); + } + } +} +void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) { + if (child->iter.Valid()) { + assert(child->iter.status().ok()); + minHeap_.push(child); + } else { + considerStatus(child->iter.status()); + } +} + +InternalIterator* NewCompactionMergingIterator( + const InternalKeyComparator* comparator, InternalIterator** children, int n, + std::vector>& range_tombstone_iters, + Arena* arena) { + assert(n >= 0); + if (n == 0) { + return NewEmptyInternalIterator(arena); + } else { + if (arena == nullptr) { + return new CompactionMergingIterator(comparator, children, n, false, + range_tombstone_iters); + } else { + auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator)); + return new (mem) CompactionMergingIterator(comparator, children, n, true, + range_tombstone_iters); + } + } +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/compaction_merging_iterator.h b/table/compaction_merging_iterator.h new file mode 100644 index 000000000..c374683c9 --- /dev/null +++ b/table/compaction_merging_iterator.h @@ -0,0 +1,241 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "db/range_del_aggregator.h" +#include "rocksdb/slice.h" +#include "rocksdb/types.h" +#include "table/merging_iterator.h" + +namespace ROCKSDB_NAMESPACE { + +class CompactionHeapItemComparator { + public: + explicit CompactionHeapItemComparator(const InternalKeyComparator* comparator) + : comparator_(comparator) {} + bool operator()(HeapItem* a, HeapItem* b) const { + int r = comparator_->Compare(a->key(), b->key()); + if (r > 0) { + return true; + } else if (r < 0) { + return false; + } else { + // When range tombstone and point key have the same internal key, + // range tombstone comes first. So that when range tombstone and + // file's largest key are the same, the file boundary sentinel key + // comes after. + return a->type == HeapItem::ITERATOR && + b->type == HeapItem::DELETE_RANGE_START; + } + } + + private: + const InternalKeyComparator* comparator_; +}; + +using CompactionMinHeap = BinaryHeap; +/* + * This is a simplified version of MergingIterator and is specifically used for + * compaction. It merges the input `children` iterators into a sorted stream of + * keys. Range tombstone start keys are also emitted to prevent oversize + * compactions. For example, consider an L1 file with content [a, b), y, z, + * where [a, b) is a range tombstone and y and z are point keys. This could + * cause an oversize compaction as it can overlap with a wide range of key space + * in L2. + * + * CompactionMergingIterator emits range tombstone start keys from each LSM + * level's range tombstone iterator, and for each range tombstone + * [start,end)@seqno, the key will be start@kMaxSequenceNumber unless truncated + * at file boundary (see detail TruncatedRangeDelIterator::start_key()). + * + * Caller should use CompactionMergingIterator::IsDeleteRangeSentinelKey() to + * check if the current key is a range tombstone key. + * TODO(cbi): IsDeleteRangeSentinelKey() is used for two kinds of keys at + * different layers: file boundary and range tombstone keys. Separate them into + * two APIs for clarity. + */ +class CompactionMergingIterator : public InternalIterator { + public: + CompactionMergingIterator( + const InternalKeyComparator* comparator, InternalIterator** children, + int n, bool is_arena_mode, + std::vector< + std::pair> + range_tombstones) + : is_arena_mode_(is_arena_mode), + comparator_(comparator), + current_(nullptr), + minHeap_(CompactionHeapItemComparator(comparator_)), + pinned_iters_mgr_(nullptr) { + children_.resize(n); + for (int i = 0; i < n; i++) { + children_[i].level = i; + children_[i].iter.Set(children[i]); + assert(children_[i].type == HeapItem::ITERATOR); + } + assert(range_tombstones.size() == static_cast(n)); + for (auto& p : range_tombstones) { + range_tombstone_iters_.push_back(p.first); + } + + pinned_heap_item_.resize(n); + for (int i = 0; i < n; ++i) { + if (range_tombstones[i].second) { + // for LevelIterator + *range_tombstones[i].second = &range_tombstone_iters_[i]; + } + pinned_heap_item_[i].level = i; + pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START; + } + } + + void considerStatus(const Status& s) { + if (!s.ok() && status_.ok()) { + status_ = s; + } + } + + ~CompactionMergingIterator() override { + // TODO: use unique_ptr for range_tombstone_iters_ + for (auto child : range_tombstone_iters_) { + delete child; + } + + for (auto& child : children_) { + child.iter.DeleteIter(is_arena_mode_); + } + status_.PermitUncheckedError(); + } + + bool Valid() const override { return current_ != nullptr && status_.ok(); } + + Status status() const override { return status_; } + + void SeekToFirst() override; + + void Seek(const Slice& target) override; + + void Next() override; + + Slice key() const override { + assert(Valid()); + return current_->key(); + } + + Slice value() const override { + assert(Valid()); + if (LIKELY(current_->type == HeapItem::ITERATOR)) { + return current_->iter.value(); + } else { + return dummy_tombstone_val; + } + } + + // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result + // from current child iterator. Potentially as long as one of child iterator + // report out of bound is not possible, we know current key is within bound. + bool MayBeOutOfLowerBound() override { + assert(Valid()); + return current_->type == HeapItem::DELETE_RANGE_START || + current_->iter.MayBeOutOfLowerBound(); + } + + IterBoundCheck UpperBoundCheckResult() override { + assert(Valid()); + return current_->type == HeapItem::DELETE_RANGE_START + ? IterBoundCheck::kUnknown + : current_->iter.UpperBoundCheckResult(); + } + + void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { + pinned_iters_mgr_ = pinned_iters_mgr; + for (auto& child : children_) { + child.iter.SetPinnedItersMgr(pinned_iters_mgr); + } + } + + bool IsDeleteRangeSentinelKey() const override { + assert(Valid()); + return current_->type == HeapItem::DELETE_RANGE_START; + } + + // Compaction uses the above subset of InternalIterator interface. + void SeekToLast() override { assert(false); } + + void SeekForPrev(const Slice&) override { assert(false); } + + void Prev() override { assert(false); } + + bool NextAndGetResult(IterateResult*) override { + assert(false); + return false; + } + + bool IsKeyPinned() const override { + assert(false); + return false; + } + + bool IsValuePinned() const override { + assert(false); + return false; + } + + bool PrepareValue() override { + assert(false); + return false; + } + + private: + bool is_arena_mode_; + const InternalKeyComparator* comparator_; + // HeapItem for all child point iterators. + std::vector children_; + // HeapItem for range tombstones. pinned_heap_item_[i] corresponds to the + // current range tombstone from range_tombstone_iters_[i]. + std::vector pinned_heap_item_; + // range_tombstone_iters_[i] contains range tombstones in the sorted run that + // corresponds to children_[i]. range_tombstone_iters_[i] == + // nullptr means the sorted run of children_[i] does not have range + // tombstones (or the current SSTable does not have range tombstones in the + // case of LevelIterator). + std::vector range_tombstone_iters_; + // Used as value for range tombstone keys + std::string dummy_tombstone_val{}; + + // Skip file boundary sentinel keys. + void FindNextVisibleKey(); + + // top of minHeap_ + HeapItem* current_; + // If any of the children have non-ok status, this is one of them. + Status status_; + CompactionMinHeap minHeap_; + PinnedIteratorsManager* pinned_iters_mgr_; + // Process a child that is not in the min heap. + // If valid, add to the min heap. Otherwise, check status. + void AddToMinHeapOrCheckStatus(HeapItem*); + + HeapItem* CurrentForward() const { + return !minHeap_.empty() ? minHeap_.top() : nullptr; + } + + void InsertRangeTombstoneAtLevel(size_t level) { + if (range_tombstone_iters_[level]->Valid()) { + pinned_heap_item_[level].SetTombstoneForCompaction( + range_tombstone_iters_[level]->start_key()); + minHeap_.push(&pinned_heap_item_[level]); + } + } +}; + +InternalIterator* NewCompactionMergingIterator( + const InternalKeyComparator* comparator, InternalIterator** children, int n, + std::vector>& range_tombstone_iters, + Arena* arena = nullptr); +} // namespace ROCKSDB_NAMESPACE diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc index 309ae69c5..320650960 100644 --- a/table/merging_iterator.cc +++ b/table/merging_iterator.cc @@ -10,92 +10,8 @@ #include "table/merging_iterator.h" #include "db/arena_wrapped_db_iter.h" -#include "db/dbformat.h" -#include "db/pinned_iterators_manager.h" -#include "memory/arena.h" -#include "monitoring/perf_context_imp.h" -#include "rocksdb/comparator.h" -#include "rocksdb/iterator.h" -#include "rocksdb/options.h" -#include "table/internal_iterator.h" -#include "table/iter_heap.h" -#include "table/iterator_wrapper.h" -#include "test_util/sync_point.h" -#include "util/autovector.h" -#include "util/heap.h" -#include "util/stop_watch.h" namespace ROCKSDB_NAMESPACE { -// For merging iterator to process range tombstones, we treat the start and end -// keys of a range tombstone as point keys and put them into the minHeap/maxHeap -// used in merging iterator. Take minHeap for example, we are able to keep track -// of currently "active" range tombstones (the ones whose start keys are popped -// but end keys are still in the heap) in `active_`. This `active_` set of range -// tombstones is then used to quickly determine whether the point key at heap -// top is deleted (by heap property, the point key at heap top must be within -// internal key range of active range tombstones). -// -// The HeapItem struct represents 3 types of elements in the minHeap/maxHeap: -// point key and the start and end keys of a range tombstone. -struct HeapItem { - HeapItem() = default; - - enum Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END }; - IteratorWrapper iter; - size_t level = 0; - ParsedInternalKey parsed_ikey; - // Will be overwritten before use, initialize here so compiler does not - // complain. - Type type = ITERATOR; - - explicit HeapItem(size_t _level, InternalIteratorBase* _iter) - : level(_level), type(Type::ITERATOR) { - iter.Set(_iter); - } - - void SetTombstoneKey(ParsedInternalKey&& pik) { - // op_type is already initialized in MergingIterator::Finish(). - parsed_ikey.user_key = pik.user_key; - parsed_ikey.sequence = pik.sequence; - } - - Slice key() const { - assert(type == ITERATOR); - return iter.key(); - } - - bool IsDeleteRangeSentinelKey() const { - if (type == Type::ITERATOR) { - return iter.IsDeleteRangeSentinelKey(); - } - return false; - } -}; - -class MinHeapItemComparator { - public: - MinHeapItemComparator(const InternalKeyComparator* comparator) - : comparator_(comparator) {} - bool operator()(HeapItem* a, HeapItem* b) const { - if (LIKELY(a->type == HeapItem::ITERATOR)) { - if (LIKELY(b->type == HeapItem::ITERATOR)) { - return comparator_->Compare(a->key(), b->key()) > 0; - } else { - return comparator_->Compare(a->key(), b->parsed_ikey) > 0; - } - } else { - if (LIKELY(b->type == HeapItem::ITERATOR)) { - return comparator_->Compare(a->parsed_ikey, b->key()) > 0; - } else { - return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) > 0; - } - } - } - - private: - const InternalKeyComparator* comparator_; -}; - class MaxHeapItemComparator { public: MaxHeapItemComparator(const InternalKeyComparator* comparator) @@ -103,13 +19,13 @@ class MaxHeapItemComparator { bool operator()(HeapItem* a, HeapItem* b) const { if (LIKELY(a->type == HeapItem::ITERATOR)) { if (LIKELY(b->type == HeapItem::ITERATOR)) { - return comparator_->Compare(a->key(), b->key()) < 0; + return comparator_->Compare(a->iter.key(), b->iter.key()) < 0; } else { - return comparator_->Compare(a->key(), b->parsed_ikey) < 0; + return comparator_->Compare(a->iter.key(), b->parsed_ikey) < 0; } } else { if (LIKELY(b->type == HeapItem::ITERATOR)) { - return comparator_->Compare(a->parsed_ikey, b->key()) < 0; + return comparator_->Compare(a->parsed_ikey, b->iter.key()) < 0; } else { return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) < 0; } @@ -121,7 +37,6 @@ class MaxHeapItemComparator { }; // Without anonymous namespace here, we fail the warning -Wmissing-prototypes namespace { -using MergerMinIterHeap = BinaryHeap; using MergerMaxIterHeap = BinaryHeap; } // namespace @@ -136,7 +51,7 @@ class MergingIterator : public InternalIterator { direction_(kForward), comparator_(comparator), current_(nullptr), - minHeap_(comparator_), + minHeap_(MinHeapItemComparator(comparator_)), pinned_iters_mgr_(nullptr), iterate_upper_bound_(iterate_upper_bound) { children_.resize(n); @@ -1177,7 +1092,7 @@ void MergingIterator::SwitchToForward() { if (child.iter.status() == Status::TryAgain()) { continue; } - if (child.iter.Valid() && comparator_->Equal(target, child.key())) { + if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) { assert(child.iter.status().ok()); child.iter.Next(); } @@ -1188,7 +1103,7 @@ void MergingIterator::SwitchToForward() { for (auto& child : children_) { if (child.iter.status() == Status::TryAgain()) { child.iter.Seek(target); - if (child.iter.Valid() && comparator_->Equal(target, child.key())) { + if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) { assert(child.iter.status().ok()); child.iter.Next(); } @@ -1239,7 +1154,7 @@ void MergingIterator::SwitchToBackward() { if (&child.iter != current_) { child.iter.SeekForPrev(target); TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child); - if (child.iter.Valid() && comparator_->Equal(target, child.key())) { + if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) { assert(child.iter.status().ok()); child.iter.Prev(); } diff --git a/table/merging_iterator.h b/table/merging_iterator.h index 16fc0877e..0f3592b99 100644 --- a/table/merging_iterator.h +++ b/table/merging_iterator.h @@ -12,6 +12,7 @@ #include "db/range_del_aggregator.h" #include "rocksdb/slice.h" #include "rocksdb/types.h" +#include "table/iterator_wrapper.h" namespace ROCKSDB_NAMESPACE { @@ -89,4 +90,83 @@ class MergeIteratorBuilder { range_del_iter_ptrs_; }; +// For merging iterator to process range tombstones, we treat the start and end +// keys of a range tombstone as point keys and put them into the minHeap/maxHeap +// used in merging iterator. Take minHeap for example, we are able to keep track +// of currently "active" range tombstones (the ones whose start keys are popped +// but end keys are still in the heap) in `active_`. This `active_` set of range +// tombstones is then used to quickly determine whether the point key at heap +// top is deleted (by heap property, the point key at heap top must be within +// internal key range of active range tombstones). +// +// The HeapItem struct represents 3 types of elements in the minHeap/maxHeap: +// point key and the start and end keys of a range tombstone. +struct HeapItem { + HeapItem() = default; + + enum Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END }; + IteratorWrapper iter; + size_t level = 0; + ParsedInternalKey parsed_ikey; + std::string range_tombstone_key; + // Will be overwritten before use, initialize here so compiler does not + // complain. + Type type = ITERATOR; + + explicit HeapItem(size_t _level, InternalIteratorBase* _iter) + : level(_level), type(Type::ITERATOR) { + iter.Set(_iter); + } + + void SetTombstoneKey(ParsedInternalKey&& pik) { + // op_type is already initialized in MergingIterator::Finish(). + parsed_ikey.user_key = pik.user_key; + parsed_ikey.sequence = pik.sequence; + } + + void SetTombstoneForCompaction(const ParsedInternalKey&& pik) { + range_tombstone_key.clear(); + AppendInternalKey(&range_tombstone_key, pik); + } + + Slice key() const { + if (LIKELY(type == ITERATOR)) { + return iter.key(); + } + return range_tombstone_key; + } + + bool IsDeleteRangeSentinelKey() const { + if (LIKELY(type == ITERATOR)) { + return iter.IsDeleteRangeSentinelKey(); + } + return false; + } +}; + +class MinHeapItemComparator { + public: + explicit MinHeapItemComparator(const InternalKeyComparator* comparator) + : comparator_(comparator) {} + bool operator()(HeapItem* a, HeapItem* b) const { + if (LIKELY(a->type == HeapItem::ITERATOR)) { + if (LIKELY(b->type == HeapItem::ITERATOR)) { + return comparator_->Compare(a->iter.key(), b->iter.key()) > 0; + } else { + return comparator_->Compare(a->iter.key(), b->parsed_ikey) > 0; + } + } else { + if (LIKELY(b->type == HeapItem::ITERATOR)) { + return comparator_->Compare(a->parsed_ikey, b->iter.key()) > 0; + } else { + return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) > 0; + } + } + } + + private: + const InternalKeyComparator* comparator_; +}; + +using MergerMinIterHeap = BinaryHeap; } // namespace ROCKSDB_NAMESPACE