diff --git a/CMakeLists.txt b/CMakeLists.txt index aaabfe1f7..d0112580d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -844,7 +844,6 @@ set(SOURCES table/get_context.cc table/iterator.cc table/merging_iterator.cc - table/compaction_merging_iterator.cc table/meta_blocks.cc table/persistent_cache_helper.cc table/plain/plain_table_bloom.cc diff --git a/HISTORY.md b/HISTORY.md index be8222cd3..e458fdfc7 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,7 +3,6 @@ ### Behavior changes * Make best-efforts recovery verify SST unique ID before Version construction (#10962) * Introduce `epoch_number` and sort L0 files by `epoch_number` instead of `largest_seqno`. `epoch_number` represents the order of a file being flushed or ingested/imported. Compaction output file will be assigned with the minimum `epoch_number` among input files'. For L0, larger `epoch_number` indicates newer L0 file. -* Compaction output file cutting logic now considers range tombstone start keys. For example, SST partitioner now may receive ParitionRequest for range tombstone start keys. ### Bug Fixes * Fixed a regression in iterator where range tombstones after `iterate_upper_bound` is processed. diff --git a/TARGETS b/TARGETS index e5f887d49..f77eba770 100644 --- a/TARGETS +++ b/TARGETS @@ -200,7 +200,6 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "table/block_based/reader_common.cc", "table/block_based/uncompression_dict_reader.cc", "table/block_fetcher.cc", - "table/compaction_merging_iterator.cc", "table/cuckoo/cuckoo_table_builder.cc", "table/cuckoo/cuckoo_table_factory.cc", "table/cuckoo/cuckoo_table_reader.cc", @@ -543,7 +542,6 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "table/block_based/reader_common.cc", "table/block_based/uncompression_dict_reader.cc", "table/block_fetcher.cc", - "table/compaction_merging_iterator.cc", "table/cuckoo/cuckoo_table_builder.cc", "table/cuckoo/cuckoo_table_factory.cc", "table/cuckoo/cuckoo_table_reader.cc", diff --git a/db/blob/blob_counting_iterator.h b/db/blob/blob_counting_iterator.h index ebc83192f..de549afa2 100644 --- a/db/blob/blob_counting_iterator.h +++ b/db/blob/blob_counting_iterator.h @@ -123,10 +123,6 @@ class BlobCountingIterator : public InternalIterator { return iter_->GetProperty(prop_name, prop); } - bool IsDeleteRangeSentinelKey() const override { - return iter_->IsDeleteRangeSentinelKey(); - } - private: void UpdateAndCountBlobIfNeeded() { assert(!iter_->Valid() || iter_->status().ok()); @@ -134,13 +130,6 @@ class BlobCountingIterator : public InternalIterator { if (!iter_->Valid()) { status_ = iter_->status(); return; - } else if (iter_->IsDeleteRangeSentinelKey()) { - // CompactionMergingIterator emits range tombstones, and range tombstone - // keys can be truncated at file boundaries. This means the range - // tombstone keys can have op_type kTypeBlobIndex. - // This could crash the ProcessInFlow() call below since - // value is empty for these keys. - return; } TEST_SYNC_POINT( diff --git a/db/compaction/clipping_iterator.h b/db/compaction/clipping_iterator.h index 3f50cdd9d..1ed465c2c 100644 --- a/db/compaction/clipping_iterator.h +++ b/db/compaction/clipping_iterator.h @@ -188,11 +188,6 @@ class ClippingIterator : public InternalIterator { return iter_->GetProperty(prop_name, prop); } - bool IsDeleteRangeSentinelKey() const override { - assert(valid_); - return iter_->IsDeleteRangeSentinelKey(); - } - private: void UpdateValid() { assert(!iter_->Valid() || iter_->status().ok()); diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 894754589..d7d57bbf5 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -20,6 +20,9 @@ namespace ROCKSDB_NAMESPACE { +const uint64_t kRangeTombstoneSentinel = + PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion); + int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a, const InternalKey& b) { auto c = user_cmp->CompareWithoutTimestamp(a.user_key(), b.user_key()); diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index 10a258cb9..ee8639601 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -18,8 +18,6 @@ namespace ROCKSDB_NAMESPACE { // The file contains class Compaction, as well as some helper functions // and data structures used by the class. -const uint64_t kRangeTombstoneSentinel = - PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion); // Utility for comparing sstable boundary keys. Returns -1 if either a or b is // null which provides the property that a==null indicates a key that is less // than any key and b==null indicates a key that is greater than any key. Note diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 4a7c9adda..9f54f7813 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -377,7 +377,6 @@ void CompactionIterator::NextFromInput() { value_ = input_.value(); blob_value_.Reset(); iter_stats_.num_input_records++; - is_range_del_ = input_.IsDeleteRangeSentinelKey(); Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_); if (!pik_status.ok()) { @@ -397,10 +396,7 @@ void CompactionIterator::NextFromInput() { break; } TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_); - if (is_range_del_) { - validity_info_.SetValid(kRangeDeletion); - break; - } + // Update input statistics if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion || ikey_.type == kTypeDeletionWithTimestamp) { @@ -622,14 +618,6 @@ void CompactionIterator::NextFromInput() { ParsedInternalKey next_ikey; AdvanceInputIter(); - while (input_.Valid() && input_.IsDeleteRangeSentinelKey() && - ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) - .ok() && - cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) { - // skip range tombstone start keys with the same user key - // since they are not "real" point keys. - AdvanceInputIter(); - } // Check whether the next key exists, is not corrupt, and is the same key // as the single delete. @@ -637,7 +625,6 @@ void CompactionIterator::NextFromInput() { ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) .ok() && cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) { - assert(!input_.IsDeleteRangeSentinelKey()); #ifndef NDEBUG const Compaction* c = compaction_ ? compaction_->real_compaction() : nullptr; @@ -862,14 +849,12 @@ void CompactionIterator::NextFromInput() { // Note that a deletion marker of type kTypeDeletionWithTimestamp will be // considered to have a different user key unless the timestamp is older // than *full_history_ts_low_. - // - // Range tombstone start keys are skipped as they are not "real" keys. while (!IsPausingManualCompaction() && !IsShuttingDown() && input_.Valid() && (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) .ok()) && cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) && - (prev_snapshot == 0 || input_.IsDeleteRangeSentinelKey() || + (prev_snapshot == 0 || DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) { AdvanceInputIter(); } @@ -1162,12 +1147,10 @@ void CompactionIterator::DecideOutputLevel() { void CompactionIterator::PrepareOutput() { if (Valid()) { - if (LIKELY(!is_range_del_)) { - if (ikey_.type == kTypeValue) { - ExtractLargeValueIfNeeded(); - } else if (ikey_.type == kTypeBlobIndex) { - GarbageCollectBlobIfNeeded(); - } + if (ikey_.type == kTypeValue) { + ExtractLargeValueIfNeeded(); + } else if (ikey_.type == kTypeBlobIndex) { + GarbageCollectBlobIfNeeded(); } if (compaction_ != nullptr && compaction_->SupportsPerKeyPlacement()) { @@ -1190,7 +1173,7 @@ void CompactionIterator::PrepareOutput() { DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && ikey_.type != kTypeMerge && current_key_committed_ && !output_to_penultimate_level_ && - ikey_.sequence < preserve_time_min_seqno_ && !is_range_del_) { + ikey_.sequence < preserve_time_min_seqno_) { if (ikey_.type == kTypeDeletion || (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) { ROCKS_LOG_FATAL( diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 1e4f373e2..c215d2bbb 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -63,10 +63,6 @@ class SequenceIterWrapper : public InternalIterator { void SeekToLast() override { assert(false); } uint64_t num_itered() const { return num_itered_; } - bool IsDeleteRangeSentinelKey() const override { - assert(Valid()); - return inner_iter_->IsDeleteRangeSentinelKey(); - } private: InternalKeyComparator icmp_; @@ -246,12 +242,7 @@ class CompactionIterator { const Status& status() const { return status_; } const ParsedInternalKey& ikey() const { return ikey_; } inline bool Valid() const { return validity_info_.IsValid(); } - const Slice& user_key() const { - if (UNLIKELY(is_range_del_)) { - return ikey_.user_key; - } - return current_user_key_; - } + const Slice& user_key() const { return current_user_key_; } const CompactionIterationStats& iter_stats() const { return iter_stats_; } uint64_t num_input_entry_scanned() const { return input_.num_itered(); } // If the current key should be placed on penultimate level, only valid if @@ -261,8 +252,6 @@ class CompactionIterator { } Status InputStatus() const { return input_.status(); } - bool IsDeleteRangeSentinelKey() const { return is_range_del_; } - private: // Processes the input stream to find the next output void NextFromInput(); @@ -396,7 +385,6 @@ class CompactionIterator { kKeepSD = 8, kKeepDel = 9, kNewUserKey = 10, - kRangeDeletion = 11, }; struct ValidityInfo { @@ -504,10 +492,6 @@ class CompactionIterator { // This is a best-effort facility, so memory_order_relaxed is sufficient. return manual_compaction_canceled_.load(std::memory_order_relaxed); } - - // Stores whether the current compaction iterator output - // is a range tombstone start key. - bool is_range_del_{false}; }; inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 0f1dde327..302023d65 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1288,6 +1288,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) { // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid() // returns true. + assert(!end.has_value() || cfd->user_comparator()->Compare( c_iter->user_key(), end.value()) < 0); diff --git a/db/compaction/compaction_outputs.cc b/db/compaction/compaction_outputs.cc index e1fa21b4f..7bcce37ed 100644 --- a/db/compaction/compaction_outputs.cc +++ b/db/compaction/compaction_outputs.cc @@ -333,14 +333,8 @@ Status CompactionOutputs::AddToOutput( const CompactionFileOpenFunc& open_file_func, const CompactionFileCloseFunc& close_file_func) { Status s; - bool is_range_del = c_iter.IsDeleteRangeSentinelKey(); - if (is_range_del && compaction_->bottommost_level()) { - // We don't consider range tombstone for bottommost level since: - // 1. there is no grandparent and hence no overlap to consider - // 2. range tombstone may be dropped at bottommost level. - return s; - } const Slice& key = c_iter.key(); + if (ShouldStopBefore(c_iter) && HasBuilder()) { s = close_file_func(*this, c_iter.InputStatus(), key); if (!s.ok()) { @@ -350,13 +344,6 @@ Status CompactionOutputs::AddToOutput( grandparent_boundary_switched_num_ = 0; grandparent_overlapped_bytes_ = GetCurrentKeyGrandparentOverlappedBytes(key); - if (UNLIKELY(is_range_del)) { - // lower bound for this new output file, this is needed as the lower bound - // does not come from the smallest point key in this case. - range_tombstone_lower_bound_.DecodeFrom(key); - } else { - range_tombstone_lower_bound_.Clear(); - } } // Open output file if necessary @@ -367,17 +354,6 @@ Status CompactionOutputs::AddToOutput( } } - // c_iter may emit range deletion keys, so update `last_key_for_partitioner_` - // here before returning below when `is_range_del` is true - if (partitioner_) { - last_key_for_partitioner_.assign(c_iter.user_key().data_, - c_iter.user_key().size_); - } - - if (UNLIKELY(is_range_del)) { - return s; - } - assert(builder_ != nullptr); const Slice& value = c_iter.value(); s = current_output().validator.Add(key, value); @@ -401,6 +377,11 @@ Status CompactionOutputs::AddToOutput( s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence, ikey.type); + if (partitioner_) { + last_key_for_partitioner_.assign(c_iter.user_key().data_, + c_iter.user_key().size_); + } + return s; } @@ -417,19 +398,13 @@ Status CompactionOutputs::AddRangeDels( std::string smallest_user_key; const Slice *lower_bound, *upper_bound; bool lower_bound_from_sub_compact = false; - bool lower_bound_from_range_tombstone = false; + size_t output_size = outputs_.size(); if (output_size == 1) { // For the first output table, include range tombstones before the min // key but after the subcompaction boundary. lower_bound = comp_start_user_key; lower_bound_from_sub_compact = true; - } else if (range_tombstone_lower_bound_.size() > 0) { - assert(meta.smallest.size() == 0 || - icmp.Compare(range_tombstone_lower_bound_, meta.smallest) <= 0); - lower_bound_guard = range_tombstone_lower_bound_.user_key(); - lower_bound = &lower_bound_guard; - lower_bound_from_range_tombstone = true; } else if (meta.smallest.size() > 0) { // For subsequent output tables, only include range tombstones from min // key onwards since the previous file was extended to contain range @@ -553,40 +528,6 @@ Status CompactionOutputs::AddRangeDels( smallest_candidate = InternalKey(*lower_bound, tombstone.seq_, kTypeRangeDeletion); } - } else if (lower_bound_from_range_tombstone) { - // When lower_bound is chosen from a range tombtone start key: - // Range tombstone keys can be truncated at file boundaries of the files - // that contain them. - // - // If this lower bound is from a range tombstone key that is not - // truncated, i.e., it was not truncated when reading from the input - // files, then its sequence number and `op_type` will be - // kMaxSequenceNumber and kTypeRangeDeletion (see - // TruncatedRangeDelIterator::start_key()). In this case, when this key - // was used as the upper bound to cut the previous compaction output - // file, the previous file's largest key could have the same value as - // this key (see the upperbound logic below). To guarantee - // non-overlapping ranges between output files, we use the range - // tombstone's actual sequence number (tombstone.seq_) for the lower - // bound of this file. If this range tombstone key is truncated, then - // the previous file's largest key will be smaller than this range - // tombstone key, so we can use it as the lower bound directly. - if (ExtractInternalKeyFooter(range_tombstone_lower_bound_.Encode()) == - kRangeTombstoneSentinel) { - if (ts_sz) { - smallest_candidate = - InternalKey(range_tombstone_lower_bound_.user_key(), - tombstone.seq_, kTypeRangeDeletion, tombstone.ts_); - } else { - smallest_candidate = - InternalKey(range_tombstone_lower_bound_.user_key(), - tombstone.seq_, kTypeRangeDeletion); - } - } else { - assert(GetInternalKeySeqno(range_tombstone_lower_bound_.Encode()) < - kMaxSequenceNumber); - smallest_candidate = range_tombstone_lower_bound_; - } } else { // If lower_bound was chosen by the smallest data key in the file, // choose lowest seqnum so this file's smallest internal key comes @@ -660,7 +601,7 @@ Status CompactionOutputs::AddRangeDels( // it cannot have a seqnum of 0 (unless the smallest data key in a file // has a seqnum of 0). Otherwise, the truncated tombstone may expose // deleted keys at lower levels. - assert(smallest_ikey_seqnum == 0 || lower_bound_from_range_tombstone || + assert(smallest_ikey_seqnum == 0 || ExtractInternalKeyFooter(meta.smallest.Encode()) != PackSequenceAndType(0, kTypeRangeDeletion)); } diff --git a/db/compaction/compaction_outputs.h b/db/compaction/compaction_outputs.h index af55ee524..f40aa8215 100644 --- a/db/compaction/compaction_outputs.h +++ b/db/compaction/compaction_outputs.h @@ -307,7 +307,6 @@ class CompactionOutputs { std::unique_ptr partitioner_; // A flag determines if this subcompaction has been split by the cursor - // for RoundRobin compaction bool is_split_ = false; // We also maintain the output split key for each subcompaction to avoid @@ -339,10 +338,6 @@ class CompactionOutputs { // for the current output file, how many file boundaries has it crossed, // basically number of files overlapped * 2 size_t grandparent_boundary_switched_num_ = 0; - - // The smallest key of the current output file, this is set when current - // output file's smallest key is a range tombstone start key. - InternalKey range_tombstone_lower_bound_; }; // helper struct to concatenate the last level and penultimate level outputs diff --git a/db/compaction/subcompaction_state.h b/db/compaction/subcompaction_state.h index 06c2b73fd..c748be31b 100644 --- a/db/compaction/subcompaction_state.h +++ b/db/compaction/subcompaction_state.h @@ -84,11 +84,6 @@ class SubcompactionState { // Assign range dels aggregator, for each range_del, it can only be assigned // to one output level, for per_key_placement, it's going to be the // penultimate level. - // TODO: This does not work for per_key_placement + user-defined timestamp + - // DeleteRange() combo. If user-defined timestamp is enabled, - // it is possible for a range tombstone to belong to bottommost level ( - // seqno < earliest snapshot) without being dropped (garbage collection - // for user-defined timestamp). void AssignRangeDelAggregator( std::unique_ptr&& range_del_agg) { if (compaction->SupportsPerKeyPlacement()) { diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc index e6f4e0e4d..19f7e95ba 100644 --- a/db/db_range_del_test.cc +++ b/db/db_range_del_test.cc @@ -1670,213 +1670,6 @@ TEST_F(DBRangeDelTest, RangeTombstoneWrittenToMinimalSsts) { ASSERT_EQ(1, num_range_deletions); } -// Test SST partitioner cut after every single key -class SingleKeySstPartitioner : public SstPartitioner { - public: - const char* Name() const override { return "SingleKeySstPartitioner"; } - - PartitionerResult ShouldPartition( - const PartitionerRequest& /*request*/) override { - return kRequired; - } - - bool CanDoTrivialMove(const Slice& /*smallest_user_key*/, - const Slice& /*largest_user_key*/) override { - return false; - } -}; - -class SingleKeySstPartitionerFactory : public SstPartitionerFactory { - public: - static const char* kClassName() { return "SingleKeySstPartitionerFactory"; } - const char* Name() const override { return kClassName(); } - - std::unique_ptr CreatePartitioner( - const SstPartitioner::Context& /* context */) const override { - return std::unique_ptr(new SingleKeySstPartitioner()); - } -}; - -TEST_F(DBRangeDelTest, LevelCompactOutputCutAtRangeTombstoneForTtlFiles) { - Options options = CurrentOptions(); - options.compression = kNoCompression; - options.compaction_pri = kMinOverlappingRatio; - options.disable_auto_compactions = true; - options.ttl = 24 * 60 * 60; // 24 hours - options.target_file_size_base = 8 << 10; - env_->SetMockSleep(); - options.env = env_; - DestroyAndReopen(options); - - Random rnd(301); - // Fill some data so that future compactions are not bottommost level - // compaction, and hence they would try cut around files for ttl - for (int i = 5; i < 10; ++i) { - ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10))); - } - ASSERT_OK(Flush()); - MoveFilesToLevel(3); - ASSERT_EQ("0,0,0,1", FilesPerLevel()); - - for (int i = 5; i < 10; ++i) { - ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10))); - } - ASSERT_OK(Flush()); - MoveFilesToLevel(1); - ASSERT_EQ("0,1,0,1", FilesPerLevel()); - - env_->MockSleepForSeconds(20 * 60 * 60); - ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), - Key(11), Key(12))); - ASSERT_OK(Put(Key(0), rnd.RandomString(1 << 10))); - ASSERT_OK(Flush()); - ASSERT_EQ("1,1,0,1", FilesPerLevel()); - // L0 file is new, L1 and L3 file are old and qualified for TTL - env_->MockSleepForSeconds(10 * 60 * 60); - MoveFilesToLevel(1); - // L1 output should be cut into 3 files: - // File 0: Key(0) - // File 1: (qualified for TTL): Key(5) - Key(10) - // File 1: DeleteRange [11, 12) - ASSERT_EQ("0,3,0,1", FilesPerLevel()); -} - -TEST_F(DBRangeDelTest, CompactionEmitRangeTombstoneToSSTPartitioner) { - Options options = CurrentOptions(); - auto factory = std::make_shared(); - options.sst_partitioner_factory = factory; - options.disable_auto_compactions = true; - DestroyAndReopen(options); - - Random rnd(301); - // range deletion keys are not processed when compacting to bottommost level, - // so creating a file at older level to make the next compaction not - // bottommost level - ASSERT_OK(db_->Put(WriteOptions(), Key(4), rnd.RandomString(10))); - ASSERT_OK(Flush()); - MoveFilesToLevel(5); - - ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(10))); - ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2), - Key(5))); - ASSERT_OK(Flush()); - ASSERT_EQ(1, NumTableFilesAtLevel(0)); - MoveFilesToLevel(1); - // SSTPartitioner decides to cut when range tombstone start key is passed to - // it Note that the range tombstone [2, 5) itself span multiple keys but we - // are not able to partition in between yet. - ASSERT_EQ(2, NumTableFilesAtLevel(1)); -} - -TEST_F(DBRangeDelTest, OversizeCompactionGapBetweenPointKeyAndTombstone) { - // L2 has two files - // L2_0: 0, 1, 2, 3, 4. L2_1: 5, 6, 7 - // L0 has 0, [5, 6), 8 - // max_compaction_bytes is less than the size of L2_0 and L2_1. - // When compacting L0 into L1, it should split into 3 files. - const int kNumPerFile = 4, kNumFiles = 2; - Options options = CurrentOptions(); - options.disable_auto_compactions = true; - options.target_file_size_base = 9 * 1024; - options.max_compaction_bytes = 9 * 1024; - DestroyAndReopen(options); - Random rnd(301); - for (int i = 0; i < kNumFiles; ++i) { - std::vector values; - for (int j = 0; j < kNumPerFile; j++) { - values.push_back(rnd.RandomString(3 << 10)); - ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j])); - } - } - ASSERT_OK(db_->Flush(FlushOptions())); - ASSERT_EQ(1, NumTableFilesAtLevel(0)); - MoveFilesToLevel(2); - ASSERT_EQ(2, NumTableFilesAtLevel(2)); - ASSERT_OK(Put(Key(0), rnd.RandomString(1 << 10))); - ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(5), - Key(6))); - ASSERT_OK(Put(Key(8), rnd.RandomString(1 << 10))); - ASSERT_OK(db_->Flush(FlushOptions())); - ASSERT_EQ(1, NumTableFilesAtLevel(0)); - - ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, - true /* disallow_trivial_move */)); - ASSERT_EQ(3, NumTableFilesAtLevel(1)); -} - -TEST_F(DBRangeDelTest, OversizeCompactionGapBetweenTombstone) { - // L2 has two files - // L2_0: 0, 1, 2, 3, 4. L2_1: 5, 6, 7 - // L0 has two range tombstones [0, 1), [7, 8). - // max_compaction_bytes is less than the size of L2_0. - // When compacting L0 into L1, the two range tombstones should be - // split into two files. - const int kNumPerFile = 4, kNumFiles = 2; - Options options = CurrentOptions(); - options.disable_auto_compactions = true; - options.target_file_size_base = 9 * 1024; - options.max_compaction_bytes = 9 * 1024; - DestroyAndReopen(options); - Random rnd(301); - for (int i = 0; i < kNumFiles; ++i) { - std::vector values; - // Write 12K (4 values, each 3K) - for (int j = 0; j < kNumPerFile; j++) { - values.push_back(rnd.RandomString(3 << 10)); - ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j])); - } - } - ASSERT_OK(db_->Flush(FlushOptions())); - ASSERT_EQ(1, NumTableFilesAtLevel(0)); - MoveFilesToLevel(2); - ASSERT_EQ(2, NumTableFilesAtLevel(2)); - ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0), - Key(1))); - ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(7), - Key(8))); - ASSERT_OK(db_->Flush(FlushOptions())); - ASSERT_EQ(1, NumTableFilesAtLevel(0)); - - ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, - true /* disallow_trivial_move */)); - // This is L0 -> L1 compaction - // The two range tombstones are broken up into two output files - // to limit compaction size. - ASSERT_EQ(2, NumTableFilesAtLevel(1)); -} - -TEST_F(DBRangeDelTest, OversizeCompactionPointKeyWithinRangetombstone) { - // L2 has two files - // L2_0: 0, 1, 2, 3, 4. L2_1: 6, 7, 8 - // L0 has [0, 9) and point key 5 - // max_compaction_bytes is less than the size of L2_0. - // When compacting L0 into L1, the compaction should cut at point key 5. - Options options = CurrentOptions(); - options.disable_auto_compactions = true; - options.target_file_size_base = 9 * 1024; - options.max_compaction_bytes = 9 * 1024; - DestroyAndReopen(options); - Random rnd(301); - for (int i = 0; i < 9; ++i) { - if (i == 5) { - ++i; - } - ASSERT_OK(Put(Key(i), rnd.RandomString(3 << 10))); - } - ASSERT_OK(db_->Flush(FlushOptions())); - ASSERT_EQ(1, NumTableFilesAtLevel(0)); - MoveFilesToLevel(2); - ASSERT_EQ(2, NumTableFilesAtLevel(2)); - ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0), - Key(9))); - ASSERT_OK(Put(Key(5), rnd.RandomString(1 << 10))); - ASSERT_OK(db_->Flush(FlushOptions())); - ASSERT_EQ(1, NumTableFilesAtLevel(0)); - ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, - true /* disallow_trivial_move */)); - ASSERT_EQ(2, NumTableFilesAtLevel(1)); -} - TEST_F(DBRangeDelTest, OverlappedTombstones) { const int kNumPerFile = 4, kNumFiles = 2; Options options = CurrentOptions(); @@ -2309,7 +2102,6 @@ TEST_F(DBRangeDelTest, NonOverlappingTombstonAtBoundary) { options.compression = kNoCompression; options.disable_auto_compactions = true; options.target_file_size_base = 2 * 1024; - options.level_compaction_dynamic_file_size = false; DestroyAndReopen(options); Random rnd(301); @@ -2725,7 +2517,7 @@ TEST_F(DBRangeDelTest, LeftSentinelKeyTest) { options.compression = kNoCompression; options.disable_auto_compactions = true; options.target_file_size_base = 3 * 1024; - options.max_compaction_bytes = 2048; + options.max_compaction_bytes = 1024; DestroyAndReopen(options); // L2 @@ -2771,7 +2563,7 @@ TEST_F(DBRangeDelTest, LeftSentinelKeyTestWithNewerKey) { options.compression = kNoCompression; options.disable_auto_compactions = true; options.target_file_size_base = 3 * 1024; - options.max_compaction_bytes = 3 * 1024; + options.max_compaction_bytes = 1024; DestroyAndReopen(options); // L2 diff --git a/db/history_trimming_iterator.h b/db/history_trimming_iterator.h index 4af5cde72..b445ced33 100644 --- a/db/history_trimming_iterator.h +++ b/db/history_trimming_iterator.h @@ -82,10 +82,6 @@ class HistoryTrimmingIterator : public InternalIterator { bool IsValuePinned() const override { return input_->IsValuePinned(); } - bool IsDeleteRangeSentinelKey() const override { - return input_->IsDeleteRangeSentinelKey(); - } - private: InternalIterator* input_; const std::string filter_ts_; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 671545e60..6df841012 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -224,10 +224,6 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, s = Status::ShutdownInProgress(); return s; } - // Skip range tombstones emitted by the compaction iterator. - if (iter->IsDeleteRangeSentinelKey()) { - continue; - } ParsedInternalKey ikey; assert(keys_.size() == merge_context_.GetNumOperands()); diff --git a/db/version_set.cc b/db/version_set.cc index 886d8dc3e..f29ffbead 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -39,8 +39,6 @@ #include "db/table_cache.h" #include "db/version_builder.h" #include "db/version_edit_handler.h" -#include "table/compaction_merging_iterator.h" - #if USE_COROUTINES #include "folly/experimental/coro/BlockingWait.h" #include "folly/experimental/coro/Collect.h" @@ -6592,14 +6590,6 @@ InternalIterator* VersionSet::MakeInputIterator( c->num_input_levels() - 1 : c->num_input_levels()); InternalIterator** list = new InternalIterator*[space]; - // First item in the pair is a pointer to range tombstones. - // Second item is a pointer to a member of a LevelIterator, - // that will be initialized to where CompactionMergingIterator stores - // pointer to its range tombstones. This is used by LevelIterator - // to update pointer to range tombstones as it traverse different SST files. - std::vector< - std::pair> - range_tombstones; size_t num = 0; for (size_t which = 0; which < c->num_input_levels(); which++) { if (c->input_levels(which)->num_files != 0) { @@ -6620,7 +6610,7 @@ InternalIterator* VersionSet::MakeInputIterator( end.value(), fmd.smallest.user_key()) < 0) { continue; } - TruncatedRangeDelIterator* range_tombstone_iter = nullptr; + list[num++] = cfd->table_cache()->NewIterator( read_options, file_options_compactions, cfd->internal_comparator(), fmd, range_del_agg, @@ -6633,13 +6623,10 @@ InternalIterator* VersionSet::MakeInputIterator( MaxFileSizeForL0MetaPin(*c->mutable_cf_options()), /*smallest_compaction_key=*/nullptr, /*largest_compaction_key=*/nullptr, - /*allow_unprepared_value=*/false, - /*range_del_iter=*/&range_tombstone_iter); - range_tombstones.emplace_back(range_tombstone_iter, nullptr); + /*allow_unprepared_value=*/false); } } else { // Create concatenating iterator for the files from this level - TruncatedRangeDelIterator*** tombstone_iter_ptr = nullptr; list[num++] = new LevelIterator( cfd->table_cache(), read_options, file_options_compactions, cfd->internal_comparator(), c->input_levels(which), @@ -6648,15 +6635,14 @@ InternalIterator* VersionSet::MakeInputIterator( /*no per level latency histogram=*/nullptr, TableReaderCaller::kCompaction, /*skip_filters=*/false, /*level=*/static_cast(c->level(which)), range_del_agg, - c->boundaries(which), false, &tombstone_iter_ptr); - range_tombstones.emplace_back(nullptr, tombstone_iter_ptr); + c->boundaries(which)); } } } assert(num <= space); - InternalIterator* result = NewCompactionMergingIterator( - &c->column_family_data()->internal_comparator(), list, - static_cast(num), range_tombstones); + InternalIterator* result = + NewMergingIterator(&c->column_family_data()->internal_comparator(), list, + static_cast(num)); delete[] list; return result; } diff --git a/src.mk b/src.mk index 79f57c738..b82570686 100644 --- a/src.mk +++ b/src.mk @@ -198,7 +198,6 @@ LIB_SOURCES = \ table/get_context.cc \ table/iterator.cc \ table/merging_iterator.cc \ - table/compaction_merging_iterator.cc \ table/meta_blocks.cc \ table/persistent_cache_helper.cc \ table/plain/plain_table_bloom.cc \ diff --git a/table/compaction_merging_iterator.cc b/table/compaction_merging_iterator.cc deleted file mode 100644 index e48712652..000000000 --- a/table/compaction_merging_iterator.cc +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright (c) Meta Platforms, Inc. and affiliates. -// -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). -#include "table/compaction_merging_iterator.h" - -namespace ROCKSDB_NAMESPACE { -void CompactionMergingIterator::SeekToFirst() { - minHeap_.clear(); - status_ = Status::OK(); - for (auto& child : children_) { - child.iter.SeekToFirst(); - AddToMinHeapOrCheckStatus(&child); - } - - for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { - if (range_tombstone_iters_[i]) { - range_tombstone_iters_[i]->SeekToFirst(); - InsertRangeTombstoneAtLevel(i); - } - } - - FindNextVisibleKey(); - current_ = CurrentForward(); -} - -void CompactionMergingIterator::Seek(const Slice& target) { - minHeap_.clear(); - status_ = Status::OK(); - for (auto& child : children_) { - child.iter.Seek(target); - AddToMinHeapOrCheckStatus(&child); - } - - ParsedInternalKey pik; - ParseInternalKey(target, &pik, false /* log_err_key */) - .PermitUncheckedError(); - for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { - if (range_tombstone_iters_[i]) { - range_tombstone_iters_[i]->Seek(pik.user_key); - // For compaction, output keys should all be after seek target. - while (range_tombstone_iters_[i]->Valid() && - comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) < - 0) { - range_tombstone_iters_[i]->Next(); - } - InsertRangeTombstoneAtLevel(i); - } - } - - FindNextVisibleKey(); - current_ = CurrentForward(); -} - -void CompactionMergingIterator::Next() { - assert(Valid()); - // For the heap modifications below to be correct, current_ must be the - // current top of the heap. - assert(current_ == CurrentForward()); - // as the current points to the current record. move the iterator forward. - if (current_->type == HeapItem::ITERATOR) { - current_->iter.Next(); - if (current_->iter.Valid()) { - // current is still valid after the Next() call above. Call - // replace_top() to restore the heap property. When the same child - // iterator yields a sequence of keys, this is cheap. - assert(current_->iter.status().ok()); - minHeap_.replace_top(current_); - } else { - // current stopped being valid, remove it from the heap. - considerStatus(current_->iter.status()); - minHeap_.pop(); - } - } else { - assert(current_->type == HeapItem::DELETE_RANGE_START); - size_t level = current_->level; - assert(range_tombstone_iters_[level]); - range_tombstone_iters_[level]->Next(); - if (range_tombstone_iters_[level]->Valid()) { - pinned_heap_item_[level].SetTombstoneForCompaction( - range_tombstone_iters_[level]->start_key()); - minHeap_.replace_top(&pinned_heap_item_[level]); - } else { - minHeap_.pop(); - } - } - FindNextVisibleKey(); - current_ = CurrentForward(); -} - -void CompactionMergingIterator::FindNextVisibleKey() { - // IsDeleteRangeSentinelKey() here means file boundary sentinel keys. - while (!minHeap_.empty() && minHeap_.top()->IsDeleteRangeSentinelKey()) { - HeapItem* current = minHeap_.top(); - // range tombstone start keys from the same SSTable should have been - // exhausted - assert(!range_tombstone_iters_[current->level] || - !range_tombstone_iters_[current->level]->Valid()); - // iter is a LevelIterator, and it enters a new SST file in the Next() - // call here. - current->iter.Next(); - if (current->iter.Valid()) { - assert(current->iter.status().ok()); - minHeap_.replace_top(current); - } else { - minHeap_.pop(); - } - if (range_tombstone_iters_[current->level]) { - InsertRangeTombstoneAtLevel(current->level); - } - } -} -void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) { - if (child->iter.Valid()) { - assert(child->iter.status().ok()); - minHeap_.push(child); - } else { - considerStatus(child->iter.status()); - } -} - -InternalIterator* NewCompactionMergingIterator( - const InternalKeyComparator* comparator, InternalIterator** children, int n, - std::vector>& range_tombstone_iters, - Arena* arena) { - assert(n >= 0); - if (n == 0) { - return NewEmptyInternalIterator(arena); - } else { - if (arena == nullptr) { - return new CompactionMergingIterator(comparator, children, n, false, - range_tombstone_iters); - } else { - auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator)); - return new (mem) CompactionMergingIterator(comparator, children, n, true, - range_tombstone_iters); - } - } -} -} // namespace ROCKSDB_NAMESPACE diff --git a/table/compaction_merging_iterator.h b/table/compaction_merging_iterator.h deleted file mode 100644 index c374683c9..000000000 --- a/table/compaction_merging_iterator.h +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright (c) Meta Platforms, Inc. and affiliates. -// -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). - -#pragma once - -#include "db/range_del_aggregator.h" -#include "rocksdb/slice.h" -#include "rocksdb/types.h" -#include "table/merging_iterator.h" - -namespace ROCKSDB_NAMESPACE { - -class CompactionHeapItemComparator { - public: - explicit CompactionHeapItemComparator(const InternalKeyComparator* comparator) - : comparator_(comparator) {} - bool operator()(HeapItem* a, HeapItem* b) const { - int r = comparator_->Compare(a->key(), b->key()); - if (r > 0) { - return true; - } else if (r < 0) { - return false; - } else { - // When range tombstone and point key have the same internal key, - // range tombstone comes first. So that when range tombstone and - // file's largest key are the same, the file boundary sentinel key - // comes after. - return a->type == HeapItem::ITERATOR && - b->type == HeapItem::DELETE_RANGE_START; - } - } - - private: - const InternalKeyComparator* comparator_; -}; - -using CompactionMinHeap = BinaryHeap; -/* - * This is a simplified version of MergingIterator and is specifically used for - * compaction. It merges the input `children` iterators into a sorted stream of - * keys. Range tombstone start keys are also emitted to prevent oversize - * compactions. For example, consider an L1 file with content [a, b), y, z, - * where [a, b) is a range tombstone and y and z are point keys. This could - * cause an oversize compaction as it can overlap with a wide range of key space - * in L2. - * - * CompactionMergingIterator emits range tombstone start keys from each LSM - * level's range tombstone iterator, and for each range tombstone - * [start,end)@seqno, the key will be start@kMaxSequenceNumber unless truncated - * at file boundary (see detail TruncatedRangeDelIterator::start_key()). - * - * Caller should use CompactionMergingIterator::IsDeleteRangeSentinelKey() to - * check if the current key is a range tombstone key. - * TODO(cbi): IsDeleteRangeSentinelKey() is used for two kinds of keys at - * different layers: file boundary and range tombstone keys. Separate them into - * two APIs for clarity. - */ -class CompactionMergingIterator : public InternalIterator { - public: - CompactionMergingIterator( - const InternalKeyComparator* comparator, InternalIterator** children, - int n, bool is_arena_mode, - std::vector< - std::pair> - range_tombstones) - : is_arena_mode_(is_arena_mode), - comparator_(comparator), - current_(nullptr), - minHeap_(CompactionHeapItemComparator(comparator_)), - pinned_iters_mgr_(nullptr) { - children_.resize(n); - for (int i = 0; i < n; i++) { - children_[i].level = i; - children_[i].iter.Set(children[i]); - assert(children_[i].type == HeapItem::ITERATOR); - } - assert(range_tombstones.size() == static_cast(n)); - for (auto& p : range_tombstones) { - range_tombstone_iters_.push_back(p.first); - } - - pinned_heap_item_.resize(n); - for (int i = 0; i < n; ++i) { - if (range_tombstones[i].second) { - // for LevelIterator - *range_tombstones[i].second = &range_tombstone_iters_[i]; - } - pinned_heap_item_[i].level = i; - pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START; - } - } - - void considerStatus(const Status& s) { - if (!s.ok() && status_.ok()) { - status_ = s; - } - } - - ~CompactionMergingIterator() override { - // TODO: use unique_ptr for range_tombstone_iters_ - for (auto child : range_tombstone_iters_) { - delete child; - } - - for (auto& child : children_) { - child.iter.DeleteIter(is_arena_mode_); - } - status_.PermitUncheckedError(); - } - - bool Valid() const override { return current_ != nullptr && status_.ok(); } - - Status status() const override { return status_; } - - void SeekToFirst() override; - - void Seek(const Slice& target) override; - - void Next() override; - - Slice key() const override { - assert(Valid()); - return current_->key(); - } - - Slice value() const override { - assert(Valid()); - if (LIKELY(current_->type == HeapItem::ITERATOR)) { - return current_->iter.value(); - } else { - return dummy_tombstone_val; - } - } - - // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result - // from current child iterator. Potentially as long as one of child iterator - // report out of bound is not possible, we know current key is within bound. - bool MayBeOutOfLowerBound() override { - assert(Valid()); - return current_->type == HeapItem::DELETE_RANGE_START || - current_->iter.MayBeOutOfLowerBound(); - } - - IterBoundCheck UpperBoundCheckResult() override { - assert(Valid()); - return current_->type == HeapItem::DELETE_RANGE_START - ? IterBoundCheck::kUnknown - : current_->iter.UpperBoundCheckResult(); - } - - void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { - pinned_iters_mgr_ = pinned_iters_mgr; - for (auto& child : children_) { - child.iter.SetPinnedItersMgr(pinned_iters_mgr); - } - } - - bool IsDeleteRangeSentinelKey() const override { - assert(Valid()); - return current_->type == HeapItem::DELETE_RANGE_START; - } - - // Compaction uses the above subset of InternalIterator interface. - void SeekToLast() override { assert(false); } - - void SeekForPrev(const Slice&) override { assert(false); } - - void Prev() override { assert(false); } - - bool NextAndGetResult(IterateResult*) override { - assert(false); - return false; - } - - bool IsKeyPinned() const override { - assert(false); - return false; - } - - bool IsValuePinned() const override { - assert(false); - return false; - } - - bool PrepareValue() override { - assert(false); - return false; - } - - private: - bool is_arena_mode_; - const InternalKeyComparator* comparator_; - // HeapItem for all child point iterators. - std::vector children_; - // HeapItem for range tombstones. pinned_heap_item_[i] corresponds to the - // current range tombstone from range_tombstone_iters_[i]. - std::vector pinned_heap_item_; - // range_tombstone_iters_[i] contains range tombstones in the sorted run that - // corresponds to children_[i]. range_tombstone_iters_[i] == - // nullptr means the sorted run of children_[i] does not have range - // tombstones (or the current SSTable does not have range tombstones in the - // case of LevelIterator). - std::vector range_tombstone_iters_; - // Used as value for range tombstone keys - std::string dummy_tombstone_val{}; - - // Skip file boundary sentinel keys. - void FindNextVisibleKey(); - - // top of minHeap_ - HeapItem* current_; - // If any of the children have non-ok status, this is one of them. - Status status_; - CompactionMinHeap minHeap_; - PinnedIteratorsManager* pinned_iters_mgr_; - // Process a child that is not in the min heap. - // If valid, add to the min heap. Otherwise, check status. - void AddToMinHeapOrCheckStatus(HeapItem*); - - HeapItem* CurrentForward() const { - return !minHeap_.empty() ? minHeap_.top() : nullptr; - } - - void InsertRangeTombstoneAtLevel(size_t level) { - if (range_tombstone_iters_[level]->Valid()) { - pinned_heap_item_[level].SetTombstoneForCompaction( - range_tombstone_iters_[level]->start_key()); - minHeap_.push(&pinned_heap_item_[level]); - } - } -}; - -InternalIterator* NewCompactionMergingIterator( - const InternalKeyComparator* comparator, InternalIterator** children, int n, - std::vector>& range_tombstone_iters, - Arena* arena = nullptr); -} // namespace ROCKSDB_NAMESPACE diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc index 320650960..309ae69c5 100644 --- a/table/merging_iterator.cc +++ b/table/merging_iterator.cc @@ -10,8 +10,92 @@ #include "table/merging_iterator.h" #include "db/arena_wrapped_db_iter.h" +#include "db/dbformat.h" +#include "db/pinned_iterators_manager.h" +#include "memory/arena.h" +#include "monitoring/perf_context_imp.h" +#include "rocksdb/comparator.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "table/internal_iterator.h" +#include "table/iter_heap.h" +#include "table/iterator_wrapper.h" +#include "test_util/sync_point.h" +#include "util/autovector.h" +#include "util/heap.h" +#include "util/stop_watch.h" namespace ROCKSDB_NAMESPACE { +// For merging iterator to process range tombstones, we treat the start and end +// keys of a range tombstone as point keys and put them into the minHeap/maxHeap +// used in merging iterator. Take minHeap for example, we are able to keep track +// of currently "active" range tombstones (the ones whose start keys are popped +// but end keys are still in the heap) in `active_`. This `active_` set of range +// tombstones is then used to quickly determine whether the point key at heap +// top is deleted (by heap property, the point key at heap top must be within +// internal key range of active range tombstones). +// +// The HeapItem struct represents 3 types of elements in the minHeap/maxHeap: +// point key and the start and end keys of a range tombstone. +struct HeapItem { + HeapItem() = default; + + enum Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END }; + IteratorWrapper iter; + size_t level = 0; + ParsedInternalKey parsed_ikey; + // Will be overwritten before use, initialize here so compiler does not + // complain. + Type type = ITERATOR; + + explicit HeapItem(size_t _level, InternalIteratorBase* _iter) + : level(_level), type(Type::ITERATOR) { + iter.Set(_iter); + } + + void SetTombstoneKey(ParsedInternalKey&& pik) { + // op_type is already initialized in MergingIterator::Finish(). + parsed_ikey.user_key = pik.user_key; + parsed_ikey.sequence = pik.sequence; + } + + Slice key() const { + assert(type == ITERATOR); + return iter.key(); + } + + bool IsDeleteRangeSentinelKey() const { + if (type == Type::ITERATOR) { + return iter.IsDeleteRangeSentinelKey(); + } + return false; + } +}; + +class MinHeapItemComparator { + public: + MinHeapItemComparator(const InternalKeyComparator* comparator) + : comparator_(comparator) {} + bool operator()(HeapItem* a, HeapItem* b) const { + if (LIKELY(a->type == HeapItem::ITERATOR)) { + if (LIKELY(b->type == HeapItem::ITERATOR)) { + return comparator_->Compare(a->key(), b->key()) > 0; + } else { + return comparator_->Compare(a->key(), b->parsed_ikey) > 0; + } + } else { + if (LIKELY(b->type == HeapItem::ITERATOR)) { + return comparator_->Compare(a->parsed_ikey, b->key()) > 0; + } else { + return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) > 0; + } + } + } + + private: + const InternalKeyComparator* comparator_; +}; + class MaxHeapItemComparator { public: MaxHeapItemComparator(const InternalKeyComparator* comparator) @@ -19,13 +103,13 @@ class MaxHeapItemComparator { bool operator()(HeapItem* a, HeapItem* b) const { if (LIKELY(a->type == HeapItem::ITERATOR)) { if (LIKELY(b->type == HeapItem::ITERATOR)) { - return comparator_->Compare(a->iter.key(), b->iter.key()) < 0; + return comparator_->Compare(a->key(), b->key()) < 0; } else { - return comparator_->Compare(a->iter.key(), b->parsed_ikey) < 0; + return comparator_->Compare(a->key(), b->parsed_ikey) < 0; } } else { if (LIKELY(b->type == HeapItem::ITERATOR)) { - return comparator_->Compare(a->parsed_ikey, b->iter.key()) < 0; + return comparator_->Compare(a->parsed_ikey, b->key()) < 0; } else { return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) < 0; } @@ -37,6 +121,7 @@ class MaxHeapItemComparator { }; // Without anonymous namespace here, we fail the warning -Wmissing-prototypes namespace { +using MergerMinIterHeap = BinaryHeap; using MergerMaxIterHeap = BinaryHeap; } // namespace @@ -51,7 +136,7 @@ class MergingIterator : public InternalIterator { direction_(kForward), comparator_(comparator), current_(nullptr), - minHeap_(MinHeapItemComparator(comparator_)), + minHeap_(comparator_), pinned_iters_mgr_(nullptr), iterate_upper_bound_(iterate_upper_bound) { children_.resize(n); @@ -1092,7 +1177,7 @@ void MergingIterator::SwitchToForward() { if (child.iter.status() == Status::TryAgain()) { continue; } - if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) { + if (child.iter.Valid() && comparator_->Equal(target, child.key())) { assert(child.iter.status().ok()); child.iter.Next(); } @@ -1103,7 +1188,7 @@ void MergingIterator::SwitchToForward() { for (auto& child : children_) { if (child.iter.status() == Status::TryAgain()) { child.iter.Seek(target); - if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) { + if (child.iter.Valid() && comparator_->Equal(target, child.key())) { assert(child.iter.status().ok()); child.iter.Next(); } @@ -1154,7 +1239,7 @@ void MergingIterator::SwitchToBackward() { if (&child.iter != current_) { child.iter.SeekForPrev(target); TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child); - if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) { + if (child.iter.Valid() && comparator_->Equal(target, child.key())) { assert(child.iter.status().ok()); child.iter.Prev(); } diff --git a/table/merging_iterator.h b/table/merging_iterator.h index 0f3592b99..16fc0877e 100644 --- a/table/merging_iterator.h +++ b/table/merging_iterator.h @@ -12,7 +12,6 @@ #include "db/range_del_aggregator.h" #include "rocksdb/slice.h" #include "rocksdb/types.h" -#include "table/iterator_wrapper.h" namespace ROCKSDB_NAMESPACE { @@ -90,83 +89,4 @@ class MergeIteratorBuilder { range_del_iter_ptrs_; }; -// For merging iterator to process range tombstones, we treat the start and end -// keys of a range tombstone as point keys and put them into the minHeap/maxHeap -// used in merging iterator. Take minHeap for example, we are able to keep track -// of currently "active" range tombstones (the ones whose start keys are popped -// but end keys are still in the heap) in `active_`. This `active_` set of range -// tombstones is then used to quickly determine whether the point key at heap -// top is deleted (by heap property, the point key at heap top must be within -// internal key range of active range tombstones). -// -// The HeapItem struct represents 3 types of elements in the minHeap/maxHeap: -// point key and the start and end keys of a range tombstone. -struct HeapItem { - HeapItem() = default; - - enum Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END }; - IteratorWrapper iter; - size_t level = 0; - ParsedInternalKey parsed_ikey; - std::string range_tombstone_key; - // Will be overwritten before use, initialize here so compiler does not - // complain. - Type type = ITERATOR; - - explicit HeapItem(size_t _level, InternalIteratorBase* _iter) - : level(_level), type(Type::ITERATOR) { - iter.Set(_iter); - } - - void SetTombstoneKey(ParsedInternalKey&& pik) { - // op_type is already initialized in MergingIterator::Finish(). - parsed_ikey.user_key = pik.user_key; - parsed_ikey.sequence = pik.sequence; - } - - void SetTombstoneForCompaction(const ParsedInternalKey&& pik) { - range_tombstone_key.clear(); - AppendInternalKey(&range_tombstone_key, pik); - } - - Slice key() const { - if (LIKELY(type == ITERATOR)) { - return iter.key(); - } - return range_tombstone_key; - } - - bool IsDeleteRangeSentinelKey() const { - if (LIKELY(type == ITERATOR)) { - return iter.IsDeleteRangeSentinelKey(); - } - return false; - } -}; - -class MinHeapItemComparator { - public: - explicit MinHeapItemComparator(const InternalKeyComparator* comparator) - : comparator_(comparator) {} - bool operator()(HeapItem* a, HeapItem* b) const { - if (LIKELY(a->type == HeapItem::ITERATOR)) { - if (LIKELY(b->type == HeapItem::ITERATOR)) { - return comparator_->Compare(a->iter.key(), b->iter.key()) > 0; - } else { - return comparator_->Compare(a->iter.key(), b->parsed_ikey) > 0; - } - } else { - if (LIKELY(b->type == HeapItem::ITERATOR)) { - return comparator_->Compare(a->parsed_ikey, b->iter.key()) > 0; - } else { - return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) > 0; - } - } - } - - private: - const InternalKeyComparator* comparator_; -}; - -using MergerMinIterHeap = BinaryHeap; } // namespace ROCKSDB_NAMESPACE