diff --git a/db/builder.cc b/db/builder.cc index a6d87358f..5488daee0 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -102,7 +102,7 @@ Status BuildTable( #endif // !ROCKSDB_LITE TableProperties tp; - if (iter->Valid() || range_del_agg->ShouldAddTombstones()) { + if (iter->Valid() || !range_del_agg->IsEmpty()) { TableBuilder* builder; unique_ptr file_writer; { @@ -156,9 +156,14 @@ Status BuildTable( ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); } } - // nullptr for table_{min,max} so all range tombstones will be flushed - range_del_agg->AddToBuilder(builder, nullptr /* lower_bound */, - nullptr /* upper_bound */, meta); + + for (auto it = range_del_agg->NewIterator(); it->Valid(); it->Next()) { + auto tombstone = it->Tombstone(); + auto kv = tombstone.Serialize(); + builder->Add(kv.first.Encode(), kv.second); + meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(), + tombstone.seq_, internal_comparator); + } // Finish and check for builder errors tp = builder->GetTableProperties(); diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index bf7595495..7b096ca04 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -554,7 +554,7 @@ void CompactionIterator::NextFromInput() { // 1. new user key -OR- // 2. different snapshot stripe bool should_delete = range_del_agg_->ShouldDelete( - key_, RangeDelAggregator::RangePositioningMode::kForwardTraversal); + key_, RangeDelPositioningMode::kForwardTraversal); if (should_delete) { ++iter_stats_.num_record_drop_hidden; ++iter_stats_.num_record_drop_range_del; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index a9f87d7c2..15ba15c51 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1075,8 +1075,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } if (status.ok() && sub_compact->builder == nullptr && - sub_compact->outputs.size() == 0 && - range_del_agg->ShouldAddTombstones(bottommost_level_)) { + sub_compact->outputs.size() == 0) { // handle subcompaction containing only range deletions status = OpenCompactionOutputFile(sub_compact); } @@ -1164,6 +1163,9 @@ Status CompactionJob::FinishCompactionOutputFile( uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber(); assert(output_number != 0); + ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); + const Comparator* ucmp = cfd->user_comparator(); + // Check for iterator errors Status s = input_status; auto meta = &sub_compact->current_output()->meta; @@ -1196,9 +1198,71 @@ Status CompactionJob::FinishCompactionOutputFile( // subcompaction ends. upper_bound = sub_compact->end; } - range_del_agg->AddToBuilder(sub_compact->builder.get(), lower_bound, - upper_bound, meta, range_del_out_stats, - bottommost_level_); + auto earliest_snapshot = kMaxSequenceNumber; + if (existing_snapshots_.size() > 0) { + earliest_snapshot = existing_snapshots_[0]; + } + auto it = range_del_agg->NewIterator(); + if (lower_bound != nullptr) { + it->Seek(*lower_bound); + } + for (; it->Valid(); it->Next()) { + auto tombstone = it->Tombstone(); + if (upper_bound != nullptr && + ucmp->Compare(*upper_bound, tombstone.start_key_) <= 0) { + // Tombstones starting at upper_bound or later only need to be included + // in the next table. Break because subsequent tombstones will start + // even later. 
+ break; + } + + if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) { + // TODO(andrewkr): tombstones that span multiple output files are + // counted for each compaction output file, so lots of double counting. + range_del_out_stats->num_range_del_drop_obsolete++; + range_del_out_stats->num_record_drop_obsolete++; + continue; + } + + auto kv = tombstone.Serialize(); + sub_compact->builder->Add(kv.first.Encode(), kv.second); + InternalKey smallest_candidate = std::move(kv.first); + if (lower_bound != nullptr && + ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) { + // Pretend the smallest key has the same user key as lower_bound + // (the max key in the previous table or subcompaction) in order for + // files to appear key-space partitioned. + // + // Choose lowest seqnum so this file's smallest internal key comes + // after the previous file's/subcompaction's largest. The fake seqnum + // is OK because the read path's file-picking code only considers user + // key. + smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion); + } + InternalKey largest_candidate = tombstone.SerializeEndKey(); + if (upper_bound != nullptr && + ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) { + // Pretend the largest key has the same user key as upper_bound (the + // min key in the following table or subcompaction) in order for files + // to appear key-space partitioned. + // + // Choose highest seqnum so this file's largest internal key comes + // before the next file's/subcompaction's smallest. The fake seqnum is + // OK because the read path's file-picking code only considers the user + // key portion. + // + // Note Seek() also creates InternalKey with (user_key, + // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of + // kTypeRangeDeletion (0xF), so the range tombstone comes before the + // Seek() key in InternalKey's ordering. So Seek() will look in the + // next file for the user key. + largest_candidate = + InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion); + } + meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate, + tombstone.seq_, + cfd->internal_comparator()); + } meta->marked_for_compaction = sub_compact->builder->NeedCompact(); } const uint64_t current_entries = sub_compact->builder->NumEntries(); @@ -1247,7 +1311,6 @@ Status CompactionJob::FinishCompactionOutputFile( return s; } - ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) { // Output to event logger and fire events. sub_compact->current_output()->table_properties = diff --git a/db/db_iter.cc b/db/db_iter.cc index 90ad2cfd6..2c6c96580 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -518,8 +518,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { ikey_.user_key, !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); if (range_del_agg_.ShouldDelete( - ikey_, RangeDelAggregator::RangePositioningMode:: - kForwardTraversal)) { + ikey_, RangeDelPositioningMode::kForwardTraversal)) { // Arrange to skip all upcoming entries for this key since // they are hidden by this deletion. 
skipping = true; @@ -549,8 +548,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { ikey_.user_key, !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); if (range_del_agg_.ShouldDelete( - ikey_, RangeDelAggregator::RangePositioningMode:: - kForwardTraversal)) { + ikey_, RangeDelPositioningMode::kForwardTraversal)) { // Arrange to skip all upcoming entries for this key since // they are hidden by this deletion. skipping = true; @@ -657,8 +655,7 @@ bool DBIter::MergeValuesNewToOld() { break; } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type || range_del_agg_.ShouldDelete( - ikey, RangeDelAggregator::RangePositioningMode:: - kForwardTraversal)) { + ikey, RangeDelPositioningMode::kForwardTraversal)) { // hit a delete with the same user key, stop right here // iter_ is positioned after delete iter_->Next(); @@ -916,8 +913,7 @@ bool DBIter::FindValueForCurrentKey() { case kTypeValue: case kTypeBlobIndex: if (range_del_agg_.ShouldDelete( - ikey, - RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) { + ikey, RangeDelPositioningMode::kBackwardTraversal)) { last_key_entry_type = kTypeRangeDeletion; PERF_COUNTER_ADD(internal_delete_skipped_count, 1); } else { @@ -935,8 +931,7 @@ bool DBIter::FindValueForCurrentKey() { break; case kTypeMerge: if (range_del_agg_.ShouldDelete( - ikey, - RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) { + ikey, RangeDelPositioningMode::kBackwardTraversal)) { merge_context_.Clear(); last_key_entry_type = kTypeRangeDeletion; last_not_merge_type = last_key_entry_type; @@ -1069,7 +1064,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || range_del_agg_.ShouldDelete( - ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) { + ikey, RangeDelPositioningMode::kBackwardTraversal)) { valid_ = false; return true; } @@ -1114,8 +1109,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || range_del_agg_.ShouldDelete( - ikey, - RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) { + ikey, RangeDelPositioningMode::kBackwardTraversal)) { break; } else if (ikey.type == kTypeValue) { const Slice val = iter_->value(); @@ -1283,7 +1277,7 @@ void DBIter::Seek(const Slice& target) { { PERF_TIMER_GUARD(seek_internal_seek_time); iter_->Seek(saved_key_.GetInternalKey()); - range_del_agg_.InvalidateTombstoneMapPositions(); + range_del_agg_.InvalidateRangeDelMapPositions(); } RecordTick(statistics_, NUMBER_DB_SEEK); if (iter_->Valid()) { @@ -1334,7 +1328,7 @@ void DBIter::SeekForPrev(const Slice& target) { { PERF_TIMER_GUARD(seek_internal_seek_time); iter_->SeekForPrev(saved_key_.GetInternalKey()); - range_del_agg_.InvalidateTombstoneMapPositions(); + range_del_agg_.InvalidateRangeDelMapPositions(); } RecordTick(statistics_, NUMBER_DB_SEEK); @@ -1383,7 +1377,7 @@ void DBIter::SeekToFirst() { { PERF_TIMER_GUARD(seek_internal_seek_time); iter_->SeekToFirst(); - range_del_agg_.InvalidateTombstoneMapPositions(); + range_del_agg_.InvalidateRangeDelMapPositions(); } RecordTick(statistics_, NUMBER_DB_SEEK); @@ -1434,7 +1428,7 @@ void DBIter::SeekToLast() { { PERF_TIMER_GUARD(seek_internal_seek_time); iter_->SeekToLast(); - range_del_agg_.InvalidateTombstoneMapPositions(); + range_del_agg_.InvalidateRangeDelMapPositions(); } PrevInternal(); if (statistics_ != nullptr) { diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc index 4d37a37fe..cf009746b 
100644 --- a/db/db_range_del_test.cc +++ b/db/db_range_del_test.cc @@ -508,12 +508,12 @@ TEST_F(DBRangeDelTest, ObsoleteTombstoneCleanup) { Reopen(opts); db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1", - "dr1"); // obsolete after compaction + "dr10"); // obsolete after compaction db_->Put(WriteOptions(), "key", "val"); db_->Flush(FlushOptions()); const Snapshot* snapshot = db_->GetSnapshot(); db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr2", - "dr2"); // protected by snapshot + "dr20"); // protected by snapshot db_->Put(WriteOptions(), "key", "val"); db_->Flush(FlushOptions()); diff --git a/db/merge_helper.cc b/db/merge_helper.cc index f2d4fa450..dc6baa963 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -237,8 +237,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil && range_del_agg != nullptr && range_del_agg->ShouldDelete( - iter->key(), - RangeDelAggregator::RangePositioningMode::kForwardTraversal)) { + iter->key(), RangeDelPositioningMode::kForwardTraversal)) { filter = CompactionFilter::Decision::kRemove; } if (filter == CompactionFilter::Decision::kKeep || diff --git a/db/range_del_aggregator.cc b/db/range_del_aggregator.cc index 18eeff152..2fb5752ee 100644 --- a/db/range_del_aggregator.cc +++ b/db/range_del_aggregator.cc @@ -4,11 +4,352 @@ // (found in the LICENSE.Apache file in the root directory). #include "db/range_del_aggregator.h" +#include "util/heap.h" #include namespace rocksdb { +struct TombstoneStartKeyComparator { + TombstoneStartKeyComparator(const Comparator* c) : cmp(c) {} + + bool operator()(const RangeTombstone& a, const RangeTombstone& b) const { + return cmp->Compare(a.start_key_, b.start_key_) < 0; + } + + const Comparator* cmp; +}; + +// An UncollapsedRangeDelMap is quick to create but slow to answer ShouldDelete +// queries. 
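// Illustrative aside (not part of this change): each ShouldDelete query below
// walks the tombstones in start-key order and stops at the first tombstone
// whose start key lies past the lookup key, so a query costs O(n) in the
// number of stored tombstones, while AddTombstone is a single multiset
// insert. The CollapsedRangeDelMap further down inverts this trade-off.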
+class UncollapsedRangeDelMap : public RangeDelMap { + typedef std::multiset Rep; + + class Iterator : public RangeDelIterator { + const Rep& rep_; + Rep::const_iterator iter_; + + public: + Iterator(const Rep& rep) : rep_(rep), iter_(rep.begin()) {} + bool Valid() const override { return iter_ != rep_.end(); } + void Next() override { iter_++; } + + void Seek(const Slice&) override { + fprintf(stderr, "UncollapsedRangeDelMap::Iterator::Seek unimplemented\n"); + abort(); + } + + RangeTombstone Tombstone() const override { return *iter_; } + }; + + Rep rep_; + const Comparator* ucmp_; + + public: + UncollapsedRangeDelMap(const Comparator* ucmp) + : rep_(TombstoneStartKeyComparator(ucmp)), ucmp_(ucmp) {} + + bool ShouldDelete(const ParsedInternalKey& parsed, + RangeDelPositioningMode mode) { + (void)mode; + assert(mode == RangeDelPositioningMode::kFullScan); + for (const auto& tombstone : rep_) { + if (ucmp_->Compare(parsed.user_key, tombstone.start_key_) < 0) { + break; + } + if (parsed.sequence < tombstone.seq_ && + ucmp_->Compare(parsed.user_key, tombstone.end_key_) < 0) { + return true; + } + } + return false; + } + + bool IsRangeOverlapped(const Slice& start, const Slice& end) { + for (const auto& tombstone : rep_) { + if (ucmp_->Compare(start, tombstone.end_key_) < 0 && + ucmp_->Compare(tombstone.start_key_, end) <= 0 && + ucmp_->Compare(tombstone.start_key_, tombstone.end_key_) < 0) { + return true; + } + } + return false; + } + + void AddTombstone(RangeTombstone tombstone) { rep_.emplace(tombstone); } + + size_t Size() const { return rep_.size(); } + + void InvalidatePosition() {} // no-op + + std::unique_ptr NewIterator() { + return std::unique_ptr(new Iterator(this->rep_)); + } +}; + +// A CollapsedRangeDelMap is slow to create but quick to answer ShouldDelete +// queries. +// +// An explanation of the design follows. Suppose we have tombstones [b, n) @ 1, +// [e, h) @ 2, [q, t) @ 2, and [g, k) @ 3. Visually, the tombstones look like +// this: +// +// 3: g---k +// 2: e---h q--t +// 1: b------------n +// +// The CollapsedRangeDelMap representation is based on the observation that +// wherever tombstones overlap, we need only store the tombstone with the +// largest seqno. From the perspective of a read at seqno 4 or greater, this set +// of tombstones is exactly equivalent: +// +// 3: g---k +// 2: e--g q--t +// 1: b--e k--n +// +// Because these tombstones do not overlap, they can be efficiently represented +// in an ordered map from keys to sequence numbers. Each entry should be thought +// of as a transition from one tombstone to the next. In this example, the +// CollapsedRangeDelMap would store the following entries, in order: +// +// b → 1, e → 2, g → 3, k → 1, n → 0, q → 2, t → 0 +// +// If a tombstone ends before the next tombstone begins, a sentinel seqno of 0 +// is installed to indicate that no tombstone exists. This occurs at keys n and +// t in the example above. +// +// To check whether a key K is covered by a tombstone, the map is binary +// searched for the last key less than K. K is covered iff the map entry has a +// larger seqno than K. As an example, consider the key h @ 4. It would be +// compared against the map entry g → 3 and determined to be uncovered. By +// contrast, the key h @ 2 would be determined to be covered. 
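// To make the lookup rule concrete, below is a small standalone sketch (an
// illustration only, not part of this change) of the same idea, using
// std::string keys and plain integers in place of Slice and SequenceNumber.
// The map literal is the collapsed representation from the example above.

#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Returns true iff `key` written at `seq` is covered by the collapsed map.
bool Covered(const std::map<std::string, uint64_t>& rep,
             const std::string& key, uint64_t seq) {
  auto it = rep.upper_bound(key);       // first entry with key > `key`
  if (it == rep.begin()) return false;  // `key` precedes all tombstones
  --it;                                 // last entry with key <= `key`
  return seq < it->second;              // covered iff under that entry's seqno
}

int main() {
  const std::map<std::string, uint64_t> rep = {
      {"b", 1}, {"e", 2}, {"g", 3}, {"k", 1}, {"n", 0}, {"q", 2}, {"t", 0}};
  assert(!Covered(rep, "h", 4));  // h @ 4: entry g -> 3, 4 >= 3, not covered
  assert(Covered(rep, "h", 2));   // h @ 2: entry g -> 3, 2 < 3, covered
  assert(!Covered(rep, "p", 1));  // p @ 1: falls in the n -> 0 gap
  return 0;
}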
+class CollapsedRangeDelMap : public RangeDelMap { + typedef std::map Rep; + + class Iterator : public RangeDelIterator { + void MaybeSeekPastSentinel() { + if (Valid() && iter_->second == 0) { + iter_++; + } + } + + const Rep& rep_; + Rep::const_iterator iter_; + + public: + Iterator(const Rep& rep) : rep_(rep), iter_(rep.begin()) {} + + bool Valid() const override { return iter_ != rep_.end(); } + + void Next() override { + iter_++; + MaybeSeekPastSentinel(); + } + + void Seek(const Slice& target) override { + iter_ = rep_.upper_bound(target); + if (iter_ != rep_.begin()) { + iter_--; + } + MaybeSeekPastSentinel(); + } + + RangeTombstone Tombstone() const override { + RangeTombstone tombstone; + tombstone.start_key_ = iter_->first; + tombstone.end_key_ = std::next(iter_)->first; + tombstone.seq_ = iter_->second; + return tombstone; + } + }; + + Rep rep_; + Rep::iterator iter_; + const Comparator* ucmp_; + + public: + CollapsedRangeDelMap(const Comparator* ucmp) : ucmp_(ucmp) { + InvalidatePosition(); + } + + bool ShouldDelete(const ParsedInternalKey& parsed, + RangeDelPositioningMode mode) { + if (iter_ == rep_.end() && + (mode == RangeDelPositioningMode::kForwardTraversal || + mode == RangeDelPositioningMode::kBackwardTraversal)) { + // invalid (e.g., if AddTombstones() changed the deletions), so need to + // reseek + mode = RangeDelPositioningMode::kBinarySearch; + } + switch (mode) { + case RangeDelPositioningMode::kFullScan: + assert(false); + case RangeDelPositioningMode::kForwardTraversal: + assert(iter_ != rep_.end()); + if (iter_ == rep_.begin() && + ucmp_->Compare(parsed.user_key, iter_->first) < 0) { + // before start of deletion intervals + return false; + } + while (std::next(iter_) != rep_.end() && + ucmp_->Compare(std::next(iter_)->first, parsed.user_key) <= 0) { + ++iter_; + } + break; + case RangeDelPositioningMode::kBackwardTraversal: + assert(iter_ != rep_.end()); + while (iter_ != rep_.begin() && + ucmp_->Compare(parsed.user_key, iter_->first) < 0) { + --iter_; + } + if (iter_ == rep_.begin() && + ucmp_->Compare(parsed.user_key, iter_->first) < 0) { + // before start of deletion intervals + return false; + } + break; + case RangeDelPositioningMode::kBinarySearch: + iter_ = rep_.upper_bound(parsed.user_key); + if (iter_ == rep_.begin()) { + // before start of deletion intervals + return false; + } + --iter_; + break; + } + assert(iter_ != rep_.end() && + ucmp_->Compare(iter_->first, parsed.user_key) <= 0); + assert(std::next(iter_) == rep_.end() || + ucmp_->Compare(parsed.user_key, std::next(iter_)->first) < 0); + return parsed.sequence < iter_->second; + } + + bool IsRangeOverlapped(const Slice&, const Slice&) { + // Unimplemented because the only client of this method, file ingestion, + // uses uncollapsed maps. + fprintf(stderr, "CollapsedRangeDelMap::IsRangeOverlapped unimplemented"); + abort(); + } + + void AddTombstone(RangeTombstone t) { + if (ucmp_->Compare(t.start_key_, t.end_key_) >= 0) { + // The tombstone covers no keys. Nothing to do. + return; + } + + auto it = rep_.upper_bound(t.start_key_); + auto prev_seq = [&]() { + return it == rep_.begin() ? 0 : std::prev(it)->second; + }; + + // end_seq stores the seqno of the last transition that the new tombstone + // covered. This is the seqno that we'll install if we need to insert a + // transition for the new tombstone's end key. + SequenceNumber end_seq = 0; + + // In the diagrams below, the new tombstone is always [c, k) @ 2. The + // existing tombstones are varied to depict different scenarios. 
Uppercase + // letters are used to indicate points that exist in the map, while + // lowercase letters are used to indicate points that do not exist in the + // map. The location of the iterator is marked with a caret; it may point + // off the end of the diagram to indicate that it is positioned at a + // entry with a larger key whose specific key is irrelevant. + + if (t.seq_ > prev_seq()) { + // The new tombstone's start point covers the existing tombstone: + // + // 3: 3: A--C 3: 3: + // 2: c--- OR 2: c--- OR 2: c--- OR 2: c------ + // 1: A--C 1: 1: A------ 1: C------ + // ^ ^ ^ ^ + // Insert a new transition at the new tombstone's start point, or raise + // the existing transition at that point to the new tombstone's seqno. + end_seq = prev_seq(); + rep_[t.start_key_] = t.seq_; // operator[] will overwrite existing entry + } else { + // The new tombstone's start point is covered by an existing tombstone: + // + // 3: A----- OR 3: C------ + // 2: c--- 2: c------ + // ^ ^ + // Do nothing. + } + + // Look at all the existing transitions that overlap the new tombstone. + while (it != rep_.end() && ucmp_->Compare(it->first, t.end_key_) < 0) { + if (t.seq_ > it->second) { + // The transition is to an existing tombstone that the new tombstone + // covers. Save the covered tombstone's seqno. We'll need to return to + // it if the new tombstone ends before the existing tombstone. + end_seq = it->second; + + if (t.seq_ == prev_seq()) { + // The previous transition is to the seqno of the new tombstone: + // + // 3: 3: 3: --F + // 2: C------ OR 2: C------ OR 2: F---- + // 1: F--- 1: ---F 1: H-- + // ^ ^ ^ + // + // Erase this transition. It's been superseded. + it = rep_.erase(it); + continue; // skip increment; erase positions iterator correctly + } else { + // The previous transition is to a tombstone that covers the new + // tombstone, but this transition is to a tombstone that is covered by + // the new tombstone. That is, this is the end of a run of existing + // tombstones that cover the new tombstone: + // + // 3: A---E OR 3: E-G + // 2: c---- 2: ------ + // ^ ^ + // Preserve this transition point, but raise it to the new tombstone's + // seqno. + it->second = t.seq_; + } + } else { + // The transition is to an existing tombstone that covers the new + // tombstone: + // + // 4: 4: --F + // 3: F-- OR 3: F-- + // 2: ----- 2: ----- + // ^ ^ + // Do nothing. + } + ++it; + } + + if (t.seq_ == prev_seq()) { + // The new tombstone is unterminated in the map: + // + // 3: OR 3: --G OR 3: --G K-- + // 2: C-------k 2: G---k 2: G---k + // ^ ^ ^ + // End it now, returning to the last seqno we covered. Because end keys + // are exclusive, if there's an existing transition at t.end_key_, it + // takes precedence over the transition that we install here. + rep_.emplace(t.end_key_, end_seq); // emplace is a noop if existing entry + } else { + // The new tombstone is implicitly ended because its end point is covered + // by an existing tombstone with a higher seqno. + // + // 3: I---M OR 3: A-----------M + // 2: ----k 2: c-------k + // ^ ^ + // Do nothing. 
+ } + } + + size_t Size() const { return rep_.size() - 1; } + + void InvalidatePosition() { iter_ = rep_.end(); } + + std::unique_ptr NewIterator() { + return std::unique_ptr(new Iterator(this->rep_)); + } +}; + RangeDelAggregator::RangeDelAggregator( const InternalKeyComparator& icmp, const std::vector& snapshots, @@ -30,21 +371,25 @@ void RangeDelAggregator::InitRep(const std::vector& snapshots) { assert(rep_ == nullptr); rep_.reset(new Rep()); for (auto snapshot : snapshots) { - rep_->stripe_map_.emplace( - snapshot, - PositionalTombstoneMap(TombstoneMap( - stl_wrappers::LessOfComparator(icmp_.user_comparator())))); + rep_->stripe_map_.emplace(snapshot, NewRangeDelMap()); } // Data newer than any snapshot falls in this catch-all stripe - rep_->stripe_map_.emplace( - kMaxSequenceNumber, - PositionalTombstoneMap(TombstoneMap( - stl_wrappers::LessOfComparator(icmp_.user_comparator())))); + rep_->stripe_map_.emplace(kMaxSequenceNumber, NewRangeDelMap()); rep_->pinned_iters_mgr_.StartPinning(); } -bool RangeDelAggregator::ShouldDeleteImpl( - const Slice& internal_key, RangeDelAggregator::RangePositioningMode mode) { +std::unique_ptr RangeDelAggregator::NewRangeDelMap() { + RangeDelMap* tombstone_map; + if (collapse_deletions_) { + tombstone_map = new CollapsedRangeDelMap(icmp_.user_comparator()); + } else { + tombstone_map = new UncollapsedRangeDelMap(icmp_.user_comparator()); + } + return std::unique_ptr(tombstone_map); +} + +bool RangeDelAggregator::ShouldDeleteImpl(const Slice& internal_key, + RangeDelPositioningMode mode) { assert(rep_ != nullptr); ParsedInternalKey parsed; if (!ParseInternalKey(internal_key, &parsed)) { @@ -53,136 +398,29 @@ bool RangeDelAggregator::ShouldDeleteImpl( return ShouldDelete(parsed, mode); } -bool RangeDelAggregator::ShouldDeleteImpl( - const ParsedInternalKey& parsed, - RangeDelAggregator::RangePositioningMode mode) { +bool RangeDelAggregator::ShouldDeleteImpl(const ParsedInternalKey& parsed, + RangeDelPositioningMode mode) { assert(IsValueType(parsed.type)); assert(rep_ != nullptr); - auto& positional_tombstone_map = GetPositionalTombstoneMap(parsed.sequence); - const auto& tombstone_map = positional_tombstone_map.raw_map; - if (tombstone_map.empty()) { + auto& tombstone_map = GetRangeDelMap(parsed.sequence); + if (tombstone_map.IsEmpty()) { return false; } - auto& tombstone_map_iter = positional_tombstone_map.iter; - if (tombstone_map_iter == tombstone_map.end() && - (mode == kForwardTraversal || mode == kBackwardTraversal)) { - // invalid (e.g., if AddTombstones() changed the deletions), so need to - // reseek - mode = kBinarySearch; - } - switch (mode) { - case kFullScan: - assert(!collapse_deletions_); - // The maintained state (PositionalTombstoneMap::iter) isn't useful when - // we linear scan from the beginning each time, but we maintain it anyways - // for consistency. 
- tombstone_map_iter = tombstone_map.begin(); - while (tombstone_map_iter != tombstone_map.end()) { - const auto& tombstone = tombstone_map_iter->second; - if (icmp_.user_comparator()->Compare(parsed.user_key, - tombstone.start_key_) < 0) { - break; - } - if (parsed.sequence < tombstone.seq_ && - icmp_.user_comparator()->Compare(parsed.user_key, - tombstone.end_key_) < 0) { - return true; - } - ++tombstone_map_iter; - } - return false; - case kForwardTraversal: - assert(collapse_deletions_ && tombstone_map_iter != tombstone_map.end()); - if (tombstone_map_iter == tombstone_map.begin() && - icmp_.user_comparator()->Compare(parsed.user_key, - tombstone_map_iter->first) < 0) { - // before start of deletion intervals - return false; - } - while (std::next(tombstone_map_iter) != tombstone_map.end() && - icmp_.user_comparator()->Compare( - std::next(tombstone_map_iter)->first, parsed.user_key) <= 0) { - ++tombstone_map_iter; - } - break; - case kBackwardTraversal: - assert(collapse_deletions_ && tombstone_map_iter != tombstone_map.end()); - while (tombstone_map_iter != tombstone_map.begin() && - icmp_.user_comparator()->Compare(parsed.user_key, - tombstone_map_iter->first) < 0) { - --tombstone_map_iter; - } - if (tombstone_map_iter == tombstone_map.begin() && - icmp_.user_comparator()->Compare(parsed.user_key, - tombstone_map_iter->first) < 0) { - // before start of deletion intervals - return false; - } - break; - case kBinarySearch: - assert(collapse_deletions_); - tombstone_map_iter = - tombstone_map.upper_bound(parsed.user_key); - if (tombstone_map_iter == tombstone_map.begin()) { - // before start of deletion intervals - return false; - } - --tombstone_map_iter; - break; - } - assert(mode != kFullScan); - assert(tombstone_map_iter != tombstone_map.end() && - icmp_.user_comparator()->Compare(tombstone_map_iter->first, - parsed.user_key) <= 0); - assert(std::next(tombstone_map_iter) == tombstone_map.end() || - icmp_.user_comparator()->Compare( - parsed.user_key, std::next(tombstone_map_iter)->first) < 0); - return parsed.sequence < tombstone_map_iter->second.seq_; + return tombstone_map.ShouldDelete(parsed, mode); } bool RangeDelAggregator::IsRangeOverlapped(const Slice& start, const Slice& end) { - // so far only implemented for non-collapsed mode since file ingestion (only - // client) doesn't use collapsing + // Unimplemented because the only client of this method, file ingestion, + // uses uncollapsed maps. assert(!collapse_deletions_); if (rep_ == nullptr) { return false; } - for (const auto& seqnum_and_tombstone_map : rep_->stripe_map_) { - for (const auto& start_key_and_tombstone : - seqnum_and_tombstone_map.second.raw_map) { - const auto& tombstone = start_key_and_tombstone.second; - if (icmp_.user_comparator()->Compare(start, tombstone.end_key_) < 0 && - icmp_.user_comparator()->Compare(tombstone.start_key_, end) <= 0 && - icmp_.user_comparator()->Compare(tombstone.start_key_, - tombstone.end_key_) < 0) { - return true; - } - } - } - return false; -} - -bool RangeDelAggregator::ShouldAddTombstones( - bool bottommost_level /* = false */) { - // TODO(andrewkr): can we just open a file and throw it away if it ends up - // empty after AddToBuilder()? This function doesn't take into subcompaction - // boundaries so isn't completely accurate. 
- if (rep_ == nullptr) { - return false; - } - auto stripe_map_iter = rep_->stripe_map_.begin(); - assert(stripe_map_iter != rep_->stripe_map_.end()); - if (bottommost_level) { - // For the bottommost level, keys covered by tombstones in the first - // (oldest) stripe have been compacted away, so the tombstones are obsolete. - ++stripe_map_iter; - } - while (stripe_map_iter != rep_->stripe_map_.end()) { - if (!stripe_map_iter->second.raw_map.empty()) { + for (const auto& stripe : rep_->stripe_map_) { + if (stripe.second->IsRangeOverlapped(start, end)) { return true; } - ++stripe_map_iter; } return false; } @@ -203,7 +441,7 @@ Status RangeDelAggregator::AddTombstones( if (rep_ == nullptr) { InitRep({upper_bound_}); } else { - InvalidateTombstoneMapPositions(); + InvalidateRangeDelMapPositions(); } first_iter = false; } @@ -212,7 +450,7 @@ Status RangeDelAggregator::AddTombstones( return Status::Corruption("Unable to parse range tombstone InternalKey"); } RangeTombstone tombstone(parsed_key, input->value()); - AddTombstone(std::move(tombstone)); + GetRangeDelMap(tombstone.seq_).AddTombstone(std::move(tombstone)); input->Next(); } if (!first_iter) { @@ -221,173 +459,16 @@ Status RangeDelAggregator::AddTombstones( return Status::OK(); } -void RangeDelAggregator::InvalidateTombstoneMapPositions() { +void RangeDelAggregator::InvalidateRangeDelMapPositions() { if (rep_ == nullptr) { return; } - for (auto stripe_map_iter = rep_->stripe_map_.begin(); - stripe_map_iter != rep_->stripe_map_.end(); ++stripe_map_iter) { - stripe_map_iter->second.iter = stripe_map_iter->second.raw_map.end(); - } -} - -Status RangeDelAggregator::AddTombstone(RangeTombstone tombstone) { - auto& positional_tombstone_map = GetPositionalTombstoneMap(tombstone.seq_); - auto& tombstone_map = positional_tombstone_map.raw_map; - if (collapse_deletions_) { - // In collapsed mode, we only fill the seq_ field in the TombstoneMap's - // values. The end_key is unneeded because we assume the tombstone extends - // until the next tombstone starts. For gaps between real tombstones and - // for the last real tombstone, we denote end keys by inserting fake - // tombstones with sequence number zero. 
- std::vector new_range_dels{ - tombstone, RangeTombstone(tombstone.end_key_, Slice(), 0)}; - auto new_range_dels_iter = new_range_dels.begin(); - // Position at the first overlapping existing tombstone; if none exists, - // insert until we find an existing one overlapping a new point - const Slice* tombstone_map_begin = nullptr; - if (!tombstone_map.empty()) { - tombstone_map_begin = &tombstone_map.begin()->first; - } - auto last_range_dels_iter = new_range_dels_iter; - while (new_range_dels_iter != new_range_dels.end() && - (tombstone_map_begin == nullptr || - icmp_.user_comparator()->Compare(new_range_dels_iter->start_key_, - *tombstone_map_begin) < 0)) { - tombstone_map.emplace( - new_range_dels_iter->start_key_, - RangeTombstone(Slice(), Slice(), new_range_dels_iter->seq_)); - last_range_dels_iter = new_range_dels_iter; - ++new_range_dels_iter; - } - if (new_range_dels_iter == new_range_dels.end()) { - return Status::OK(); - } - // above loop advances one too far - new_range_dels_iter = last_range_dels_iter; - auto tombstone_map_iter = - tombstone_map.upper_bound(new_range_dels_iter->start_key_); - // if nothing overlapped we would've already inserted all the new points - // and returned early - assert(tombstone_map_iter != tombstone_map.begin()); - tombstone_map_iter--; - - // untermed_seq is non-kMaxSequenceNumber when we covered an existing point - // but haven't seen its corresponding endpoint. It's used for (1) deciding - // whether to forcibly insert the new interval's endpoint; and (2) possibly - // raising the seqnum for the to-be-inserted element (we insert the max - // seqnum between the next new interval and the unterminated interval). - SequenceNumber untermed_seq = kMaxSequenceNumber; - while (tombstone_map_iter != tombstone_map.end() && - new_range_dels_iter != new_range_dels.end()) { - const Slice *tombstone_map_iter_end = nullptr, - *new_range_dels_iter_end = nullptr; - if (tombstone_map_iter != tombstone_map.end()) { - auto next_tombstone_map_iter = std::next(tombstone_map_iter); - if (next_tombstone_map_iter != tombstone_map.end()) { - tombstone_map_iter_end = &next_tombstone_map_iter->first; - } - } - if (new_range_dels_iter != new_range_dels.end()) { - auto next_new_range_dels_iter = std::next(new_range_dels_iter); - if (next_new_range_dels_iter != new_range_dels.end()) { - new_range_dels_iter_end = &next_new_range_dels_iter->start_key_; - } - } - - // our positions in existing/new tombstone collections should always - // overlap. The non-overlapping cases are handled above and below this - // loop. - assert(new_range_dels_iter_end == nullptr || - icmp_.user_comparator()->Compare(tombstone_map_iter->first, - *new_range_dels_iter_end) < 0); - assert(tombstone_map_iter_end == nullptr || - icmp_.user_comparator()->Compare(new_range_dels_iter->start_key_, - *tombstone_map_iter_end) < 0); - - int new_to_old_start_cmp = icmp_.user_comparator()->Compare( - new_range_dels_iter->start_key_, tombstone_map_iter->first); - // nullptr end means extends infinitely rightwards, set new_to_old_end_cmp - // accordingly so we can use common code paths later. 
- int new_to_old_end_cmp; - if (new_range_dels_iter_end == nullptr && - tombstone_map_iter_end == nullptr) { - new_to_old_end_cmp = 0; - } else if (new_range_dels_iter_end == nullptr) { - new_to_old_end_cmp = 1; - } else if (tombstone_map_iter_end == nullptr) { - new_to_old_end_cmp = -1; - } else { - new_to_old_end_cmp = icmp_.user_comparator()->Compare( - *new_range_dels_iter_end, *tombstone_map_iter_end); - } - - if (new_to_old_start_cmp < 0) { - // the existing one's left endpoint comes after, so raise/delete it if - // it's covered. - if (tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) { - untermed_seq = tombstone_map_iter->second.seq_; - if (tombstone_map_iter != tombstone_map.begin() && - std::prev(tombstone_map_iter)->second.seq_ == - new_range_dels_iter->seq_) { - tombstone_map_iter = tombstone_map.erase(tombstone_map_iter); - --tombstone_map_iter; - } else { - tombstone_map_iter->second.seq_ = new_range_dels_iter->seq_; - } - } - } else if (new_to_old_start_cmp > 0) { - if (untermed_seq != kMaxSequenceNumber || - tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) { - auto seq = tombstone_map_iter->second.seq_; - // need to adjust this element if not intended to span beyond the new - // element (i.e., was_tombstone_map_iter_raised == true), or if it - // can be raised - tombstone_map_iter = tombstone_map.emplace( - new_range_dels_iter->start_key_, - RangeTombstone( - Slice(), Slice(), - std::max( - untermed_seq == kMaxSequenceNumber ? 0 : untermed_seq, - new_range_dels_iter->seq_))); - untermed_seq = seq; - } - } else { - // their left endpoints coincide, so raise the existing one if needed - if (tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) { - untermed_seq = tombstone_map_iter->second.seq_; - tombstone_map_iter->second.seq_ = new_range_dels_iter->seq_; - } - } - - // advance whichever one ends earlier, or both if their right endpoints - // coincide - if (new_to_old_end_cmp < 0) { - ++new_range_dels_iter; - } else if (new_to_old_end_cmp > 0) { - ++tombstone_map_iter; - untermed_seq = kMaxSequenceNumber; - } else { - ++new_range_dels_iter; - ++tombstone_map_iter; - untermed_seq = kMaxSequenceNumber; - } - } - while (new_range_dels_iter != new_range_dels.end()) { - tombstone_map.emplace( - new_range_dels_iter->start_key_, - RangeTombstone(Slice(), Slice(), new_range_dels_iter->seq_)); - ++new_range_dels_iter; - } - } else { - auto start_key = tombstone.start_key_; - tombstone_map.emplace(start_key, std::move(tombstone)); + for (auto& stripe : rep_->stripe_map_) { + stripe.second->InvalidatePosition(); } - return Status::OK(); } -RangeDelAggregator::PositionalTombstoneMap& -RangeDelAggregator::GetPositionalTombstoneMap(SequenceNumber seq) { +RangeDelMap& RangeDelAggregator::GetRangeDelMap(SequenceNumber seq) { assert(rep_ != nullptr); // The stripe includes seqnum for the snapshot above and excludes seqnum for // the snapshot below. @@ -400,140 +481,15 @@ RangeDelAggregator::GetPositionalTombstoneMap(SequenceNumber seq) { } // catch-all stripe justifies this assertion in either of above cases assert(iter != rep_->stripe_map_.end()); - return iter->second; -} - -// TODO(andrewkr): We should implement an iterator over range tombstones in our -// map. It'd enable compaction to open tables on-demand, i.e., only once range -// tombstones are known to be available, without the code duplication we have -// in ShouldAddTombstones(). It'll also allow us to move the table-modifying -// code into more coherent places: CompactionJob and BuildTable(). 
-void RangeDelAggregator::AddToBuilder( - TableBuilder* builder, const Slice* lower_bound, const Slice* upper_bound, - FileMetaData* meta, - CompactionIterationStats* range_del_out_stats /* = nullptr */, - bool bottommost_level /* = false */) { - if (rep_ == nullptr) { - return; - } - auto stripe_map_iter = rep_->stripe_map_.begin(); - assert(stripe_map_iter != rep_->stripe_map_.end()); - if (bottommost_level) { - // TODO(andrewkr): these are counted for each compaction output file, so - // lots of double-counting. - if (!stripe_map_iter->second.raw_map.empty()) { - range_del_out_stats->num_range_del_drop_obsolete += - static_cast(stripe_map_iter->second.raw_map.size()) - - (collapse_deletions_ ? 1 : 0); - range_del_out_stats->num_record_drop_obsolete += - static_cast(stripe_map_iter->second.raw_map.size()) - - (collapse_deletions_ ? 1 : 0); - } - // For the bottommost level, keys covered by tombstones in the first - // (oldest) stripe have been compacted away, so the tombstones are obsolete. - ++stripe_map_iter; - } - - // Note the order in which tombstones are stored is insignificant since we - // insert them into a std::map on the read path. - while (stripe_map_iter != rep_->stripe_map_.end()) { - bool first_added = false; - for (auto tombstone_map_iter = stripe_map_iter->second.raw_map.begin(); - tombstone_map_iter != stripe_map_iter->second.raw_map.end(); - ++tombstone_map_iter) { - RangeTombstone tombstone; - if (collapse_deletions_) { - auto next_tombstone_map_iter = std::next(tombstone_map_iter); - if (next_tombstone_map_iter == stripe_map_iter->second.raw_map.end() || - tombstone_map_iter->second.seq_ == 0) { - // it's a sentinel tombstone - continue; - } - tombstone.start_key_ = tombstone_map_iter->first; - tombstone.end_key_ = next_tombstone_map_iter->first; - tombstone.seq_ = tombstone_map_iter->second.seq_; - } else { - tombstone = tombstone_map_iter->second; - } - if (upper_bound != nullptr && - icmp_.user_comparator()->Compare(*upper_bound, - tombstone.start_key_) <= 0) { - // Tombstones starting at upper_bound or later only need to be included - // in the next table. Break because subsequent tombstones will start - // even later. - break; - } - if (lower_bound != nullptr && - icmp_.user_comparator()->Compare(tombstone.end_key_, - *lower_bound) <= 0) { - // Tombstones ending before or at lower_bound only need to be included - // in the prev table. Continue because subsequent tombstones may still - // overlap [lower_bound, upper_bound). - continue; - } - - auto ikey_and_end_key = tombstone.Serialize(); - builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second); - if (!first_added) { - first_added = true; - InternalKey smallest_candidate = std::move(ikey_and_end_key.first); - if (lower_bound != nullptr && - icmp_.user_comparator()->Compare(smallest_candidate.user_key(), - *lower_bound) <= 0) { - // Pretend the smallest key has the same user key as lower_bound - // (the max key in the previous table or subcompaction) in order for - // files to appear key-space partitioned. - // - // Choose lowest seqnum so this file's smallest internal key comes - // after the previous file's/subcompaction's largest. The fake seqnum - // is OK because the read path's file-picking code only considers user - // key. 
- smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion); - } - if (meta->smallest.size() == 0 || - icmp_.Compare(smallest_candidate, meta->smallest) < 0) { - meta->smallest = std::move(smallest_candidate); - } - } - InternalKey largest_candidate = tombstone.SerializeEndKey(); - if (upper_bound != nullptr && - icmp_.user_comparator()->Compare(*upper_bound, - largest_candidate.user_key()) <= 0) { - // Pretend the largest key has the same user key as upper_bound (the - // min key in the following table or subcompaction) in order for files - // to appear key-space partitioned. - // - // Choose highest seqnum so this file's largest internal key comes - // before the next file's/subcompaction's smallest. The fake seqnum is - // OK because the read path's file-picking code only considers the user - // key portion. - // - // Note Seek() also creates InternalKey with (user_key, - // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of - // kTypeRangeDeletion (0xF), so the range tombstone comes before the - // Seek() key in InternalKey's ordering. So Seek() will look in the - // next file for the user key. - largest_candidate = InternalKey(*upper_bound, kMaxSequenceNumber, - kTypeRangeDeletion); - } - if (meta->largest.size() == 0 || - icmp_.Compare(meta->largest, largest_candidate) < 0) { - meta->largest = std::move(largest_candidate); - } - meta->smallest_seqno = std::min(meta->smallest_seqno, tombstone.seq_); - meta->largest_seqno = std::max(meta->largest_seqno, tombstone.seq_); - } - ++stripe_map_iter; - } + return *iter->second; } bool RangeDelAggregator::IsEmpty() { if (rep_ == nullptr) { return true; } - for (auto stripe_map_iter = rep_->stripe_map_.begin(); - stripe_map_iter != rep_->stripe_map_.end(); ++stripe_map_iter) { - if (!stripe_map_iter->second.raw_map.empty()) { + for (const auto& stripe : rep_->stripe_map_) { + if (!stripe.second->IsEmpty()) { return false; } } @@ -547,4 +503,75 @@ bool RangeDelAggregator::AddFile(uint64_t file_number) { return rep_->added_files_.emplace(file_number).second; } +class MergingRangeDelIter : public RangeDelIterator { + public: + MergingRangeDelIter(const Comparator* c) + : heap_(IterMinHeap(IterComparator(c))), current_(nullptr) {} + + void AddIterator(std::unique_ptr iter) { + if (iter->Valid()) { + heap_.push(iter.get()); + iters_.push_back(std::move(iter)); + current_ = heap_.top(); + } + } + + bool Valid() const override { return current_ != nullptr; } + + void Next() override { + current_->Next(); + if (current_->Valid()) { + heap_.replace_top(current_); + } else { + heap_.pop(); + } + current_ = heap_.empty() ? nullptr : heap_.top(); + } + + void Seek(const Slice& target) override { + heap_.clear(); + for (auto& iter : iters_) { + iter->Seek(target); + if (iter->Valid()) { + heap_.push(iter.get()); + } + } + current_ = heap_.empty() ? nullptr : heap_.top(); + } + + RangeTombstone Tombstone() const override { return current_->Tombstone(); } + + private: + struct IterComparator { + IterComparator(const Comparator* c) : cmp(c) {} + + bool operator()(const RangeDelIterator* a, + const RangeDelIterator* b) const { + // Note: counterintuitively, returning the tombstone with the larger start + // key puts the tombstone with the smallest key at the top of the heap. 
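// Illustrative aside (not part of this change): this works because BinaryHeap
// in util/heap.h is a max-heap with respect to the supplied comparator, so
// inverting the comparison makes top() yield the tombstone with the smallest
// start key, i.e. the min-heap behavior that IterMinHeap needs.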
+ return cmp->Compare(a->Tombstone().start_key_, + b->Tombstone().start_key_) > 0; + } + + const Comparator* cmp; + }; + + typedef BinaryHeap IterMinHeap; + + std::vector> iters_; + IterMinHeap heap_; + RangeDelIterator* current_; +}; + +std::unique_ptr RangeDelAggregator::NewIterator() { + std::unique_ptr iter( + new MergingRangeDelIter(icmp_.user_comparator())); + if (rep_ != nullptr) { + for (const auto& stripe : rep_->stripe_map_) { + iter->AddIterator(stripe.second->NewIterator()); + } + } + return std::move(iter); +} + } // namespace rocksdb diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h index 99ac62f65..5ba858143 100644 --- a/db/range_del_aggregator.h +++ b/db/range_del_aggregator.h @@ -23,6 +23,54 @@ namespace rocksdb { +// RangeDelMaps maintain position across calls to ShouldDelete. The caller may +// wish to specify a mode to optimize positioning the iterator during the next +// call to ShouldDelete. The non-kFullScan modes are only available when +// deletion collapsing is enabled. +// +// For example, if we invoke Next() on an iterator, kForwardTraversal should be +// specified to advance one-by-one through deletions until one is found with its +// interval containing the key. This will typically be faster than doing a full +// binary search (kBinarySearch). +enum class RangeDelPositioningMode { + kFullScan, // used iff collapse_deletions_ == false + kForwardTraversal, + kBackwardTraversal, + kBinarySearch, +}; + +// A RangeDelIterator iterates over range deletion tombstones. +class RangeDelIterator { + public: + virtual ~RangeDelIterator() = default; + + virtual bool Valid() const = 0; + virtual void Next() = 0; + virtual void Seek(const Slice& target) = 0; + virtual RangeTombstone Tombstone() const = 0; +}; + +// A RangeDelMap keeps track of range deletion tombstones within a snapshot +// stripe. +// +// RangeDelMaps are used internally by RangeDelAggregator. They are not intended +// to be used directly. +class RangeDelMap { + public: + virtual ~RangeDelMap() = default; + + virtual bool ShouldDelete(const ParsedInternalKey& parsed, + RangeDelPositioningMode mode) = 0; + virtual bool IsRangeOverlapped(const Slice& start, const Slice& end) = 0; + virtual void InvalidatePosition() = 0; + + virtual size_t Size() const = 0; + bool IsEmpty() const { return Size() == 0; } + + virtual void AddTombstone(RangeTombstone tombstone) = 0; + virtual std::unique_ptr NewIterator() = 0; +}; + // A RangeDelAggregator aggregates range deletion tombstones as they are // encountered in memtables/SST files. It provides methods that check whether a // key is covered by range tombstones or write the relevant tombstones to a new @@ -53,45 +101,31 @@ class RangeDelAggregator { SequenceNumber upper_bound, bool collapse_deletions = false); - // We maintain position in the tombstone map across calls to ShouldDelete. The - // caller may wish to specify a mode to optimize positioning the iterator - // during the next call to ShouldDelete. The non-kFullScan modes are only - // available when deletion collapsing is enabled. - // - // For example, if we invoke Next() on an iterator, kForwardTraversal should - // be specified to advance one-by-one through deletions until one is found - // with its interval containing the key. This will typically be faster than - // doing a full binary search (kBinarySearch). 
- enum RangePositioningMode { - kFullScan, // used iff collapse_deletions_ == false - kForwardTraversal, - kBackwardTraversal, - kBinarySearch, - }; - // Returns whether the key should be deleted, which is the case when it is // covered by a range tombstone residing in the same snapshot stripe. // @param mode If collapse_deletions_ is true, this dictates how we will find // the deletion whose interval contains this key. Otherwise, its // value must be kFullScan indicating linear scan from beginning.. - bool ShouldDelete(const ParsedInternalKey& parsed, - RangePositioningMode mode = kFullScan) { + bool ShouldDelete( + const ParsedInternalKey& parsed, + RangeDelPositioningMode mode = RangeDelPositioningMode::kFullScan) { if (rep_ == nullptr) { return false; } return ShouldDeleteImpl(parsed, mode); } - bool ShouldDelete(const Slice& internal_key, - RangePositioningMode mode = kFullScan) { + bool ShouldDelete( + const Slice& internal_key, + RangeDelPositioningMode mode = RangeDelPositioningMode::kFullScan) { if (rep_ == nullptr) { return false; } return ShouldDeleteImpl(internal_key, mode); } bool ShouldDeleteImpl(const ParsedInternalKey& parsed, - RangePositioningMode mode = kFullScan); + RangeDelPositioningMode mode); bool ShouldDeleteImpl(const Slice& internal_key, - RangePositioningMode mode = kFullScan); + RangeDelPositioningMode mode); // Checks whether range deletions cover any keys between `start` and `end`, // inclusive. @@ -102,8 +136,6 @@ class RangeDelAggregator { // `end` causes this to return true. bool IsRangeOverlapped(const Slice& start, const Slice& end); - bool ShouldAddTombstones(bool bottommost_level = false); - // Adds tombstones to the tombstone aggregation structure maintained by this // object. // @return non-OK status if any of the tombstone keys are corrupted. @@ -114,56 +146,23 @@ class RangeDelAggregator { // if it's an iterator that just seeked to an arbitrary position. The effect // of invalidation is that the following call to ShouldDelete() will binary // search for its tombstone. - void InvalidateTombstoneMapPositions(); - - // Writes tombstones covering a range to a table builder. - // @param extend_before_min_key If true, the range of tombstones to be added - // to the TableBuilder starts from the beginning of the key-range; - // otherwise, it starts from meta->smallest. - // @param lower_bound/upper_bound Any range deletion with [start_key, end_key) - // that overlaps the target range [*lower_bound, *upper_bound) is added to - // the builder. If lower_bound is nullptr, the target range extends - // infinitely to the left. If upper_bound is nullptr, the target range - // extends infinitely to the right. If both are nullptr, the target range - // extends infinitely in both directions, i.e., all range deletions are - // added to the builder. - // @param meta The file's metadata. We modify the begin and end keys according - // to the range tombstones added to this file such that the read path does - // not miss range tombstones that cover gaps before/after/between files in - // a level. lower_bound/upper_bound above constrain how far file boundaries - // can be extended. - // @param bottommost_level If true, we will filter out any tombstones - // belonging to the oldest snapshot stripe, because all keys potentially - // covered by this tombstone are guaranteed to have been deleted by - // compaction. 
- void AddToBuilder(TableBuilder* builder, const Slice* lower_bound, - const Slice* upper_bound, FileMetaData* meta, - CompactionIterationStats* range_del_out_stats = nullptr, - bool bottommost_level = false); + void InvalidateRangeDelMapPositions(); + bool IsEmpty(); bool AddFile(uint64_t file_number); - private: - // Maps tombstone user start key -> tombstone object - typedef std::multimap - TombstoneMap; - // Also maintains position in TombstoneMap last seen by ShouldDelete(). The - // end iterator indicates invalidation (e.g., if AddTombstones() changes the - // underlying map). End iterator cannot be invalidated. - struct PositionalTombstoneMap { - explicit PositionalTombstoneMap(TombstoneMap _raw_map) - : raw_map(std::move(_raw_map)), iter(raw_map.end()) {} - PositionalTombstoneMap(const PositionalTombstoneMap&) = delete; - PositionalTombstoneMap(PositionalTombstoneMap&& other) - : raw_map(std::move(other.raw_map)), iter(raw_map.end()) {} - - TombstoneMap raw_map; - TombstoneMap::const_iterator iter; - }; + // Create a new iterator over the range deletion tombstones in all of the + // snapshot stripes in this aggregator. Tombstones are presented in start key + // order. Tombstones with the same start key are presented in arbitrary order. + // + // The iterator is invalidated after any call to AddTombstones. It is the + // caller's responsibility to avoid using invalid iterators. + std::unique_ptr NewIterator(); + private: // Maps snapshot seqnum -> map of tombstones that fall in that stripe, i.e., // their seqnums are greater than the next smaller snapshot's seqnum. - typedef std::map StripeMap; + typedef std::map> StripeMap; struct Rep { StripeMap stripe_map_; @@ -175,8 +174,8 @@ class RangeDelAggregator { // once the first range deletion is encountered. void InitRep(const std::vector& snapshots); - PositionalTombstoneMap& GetPositionalTombstoneMap(SequenceNumber seq); - Status AddTombstone(RangeTombstone tombstone); + std::unique_ptr NewRangeDelMap(); + RangeDelMap& GetRangeDelMap(SequenceNumber seq); SequenceNumber upper_bound_; std::unique_ptr rep_; diff --git a/db/range_del_aggregator_test.cc b/db/range_del_aggregator_test.cc index 5896a5638..85db17b16 100644 --- a/db/range_del_aggregator_test.cc +++ b/db/range_del_aggregator_test.cc @@ -26,45 +26,10 @@ enum Direction { kReverse, }; -void VerifyRangeDels(const std::vector& range_dels, - const std::vector& expected_points) { - auto icmp = InternalKeyComparator(BytewiseComparator()); - // Test same result regardless of which order the range deletions are added. 
- for (Direction dir : {kForward, kReverse}) { - RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, true); - std::vector keys, values; - for (const auto& range_del : range_dels) { - auto key_and_value = range_del.Serialize(); - keys.push_back(key_and_value.first.Encode().ToString()); - values.push_back(key_and_value.second.ToString()); - } - if (dir == kReverse) { - std::reverse(keys.begin(), keys.end()); - std::reverse(values.begin(), values.end()); - } - std::unique_ptr range_del_iter( - new test::VectorIterator(keys, values)); - range_del_agg.AddTombstones(std::move(range_del_iter)); - - for (const auto expected_point : expected_points) { - ParsedInternalKey parsed_key; - parsed_key.user_key = expected_point.begin; - parsed_key.sequence = expected_point.seq; - parsed_key.type = kTypeValue; - ASSERT_FALSE(range_del_agg.ShouldDelete( - parsed_key, - RangeDelAggregator::RangePositioningMode::kForwardTraversal)); - if (parsed_key.sequence > 0) { - --parsed_key.sequence; - ASSERT_TRUE(range_del_agg.ShouldDelete( - parsed_key, - RangeDelAggregator::RangePositioningMode::kForwardTraversal)); - } - } - } +static auto icmp = InternalKeyComparator(BytewiseComparator()); - RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, - false /* collapse_deletions */); +void AddTombstones(RangeDelAggregator* range_del_agg, + const std::vector& range_dels) { std::vector keys, values; for (const auto& range_del : range_dels) { auto key_and_value = range_del.Serialize(); @@ -73,7 +38,78 @@ void VerifyRangeDels(const std::vector& range_dels, } std::unique_ptr range_del_iter( new test::VectorIterator(keys, values)); - range_del_agg.AddTombstones(std::move(range_del_iter)); + range_del_agg->AddTombstones(std::move(range_del_iter)); +} + +void VerifyTombstonesEq(const RangeTombstone& a, const RangeTombstone& b) { + ASSERT_EQ(a.seq_, b.seq_); + ASSERT_EQ(a.start_key_, b.start_key_); + ASSERT_EQ(a.end_key_, b.end_key_); +} + +void VerifyRangeDelIter( + RangeDelIterator* range_del_iter, + const std::vector& expected_range_dels) { + size_t i = 0; + for (; range_del_iter->Valid() && i < expected_range_dels.size(); + range_del_iter->Next(), i++) { + VerifyTombstonesEq(expected_range_dels[i], range_del_iter->Tombstone()); + } + ASSERT_EQ(expected_range_dels.size(), i); + ASSERT_FALSE(range_del_iter->Valid()); +} + +void VerifyRangeDels( + const std::vector& range_dels_in, + const std::vector& expected_points, + const std::vector& expected_collapsed_range_dels) { + // Test same result regardless of which order the range deletions are added + // and regardless of collapsed mode. 
+ for (bool collapsed : {false, true}) { + for (Direction dir : {kForward, kReverse}) { + RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, collapsed); + + std::vector range_dels = range_dels_in; + if (dir == kReverse) { + std::reverse(range_dels.begin(), range_dels.end()); + } + AddTombstones(&range_del_agg, range_dels); + + auto mode = RangeDelPositioningMode::kFullScan; + if (collapsed) { + mode = RangeDelPositioningMode::kForwardTraversal; + } + + for (const auto expected_point : expected_points) { + ParsedInternalKey parsed_key; + parsed_key.user_key = expected_point.begin; + parsed_key.sequence = expected_point.seq; + parsed_key.type = kTypeValue; + ASSERT_FALSE(range_del_agg.ShouldDelete(parsed_key, mode)); + if (parsed_key.sequence > 0) { + --parsed_key.sequence; + ASSERT_TRUE(range_del_agg.ShouldDelete(parsed_key, mode)); + } + } + + if (collapsed) { + range_dels = expected_collapsed_range_dels; + } else { + // Tombstones in an uncollapsed map are presented in start key order. + // Tombstones with the same start key are presented in insertion order. + std::stable_sort(range_dels.begin(), range_dels.end(), + [&](const RangeTombstone& a, const RangeTombstone& b) { + return icmp.user_comparator()->Compare( + a.start_key_, b.start_key_) < 0; + }); + } + VerifyRangeDelIter(range_del_agg.NewIterator().get(), range_dels); + } + } + + RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, + false /* collapse_deletions */); + AddTombstones(&range_del_agg, range_dels_in); for (size_t i = 1; i < expected_points.size(); ++i) { bool overlapped = range_del_agg.IsRangeOverlapped( expected_points[i - 1].begin, expected_points[i].begin); @@ -87,60 +123,69 @@ void VerifyRangeDels(const std::vector& range_dels, } // anonymous namespace -TEST_F(RangeDelAggregatorTest, Empty) { VerifyRangeDels({}, {{"a", 0}}); } +TEST_F(RangeDelAggregatorTest, Empty) { VerifyRangeDels({}, {{"a", 0}}, {}); } TEST_F(RangeDelAggregatorTest, SameStartAndEnd) { - VerifyRangeDels({{"a", "a", 5}}, {{" ", 0}, {"a", 0}, {"b", 0}}); + VerifyRangeDels({{"a", "a", 5}}, {{" ", 0}, {"a", 0}, {"b", 0}}, {}); } TEST_F(RangeDelAggregatorTest, Single) { - VerifyRangeDels({{"a", "b", 10}}, {{" ", 0}, {"a", 10}, {"b", 0}}); + VerifyRangeDels({{"a", "b", 10}}, {{" ", 0}, {"a", 10}, {"b", 0}}, + {{"a", "b", 10}}); } TEST_F(RangeDelAggregatorTest, OverlapAboveLeft) { VerifyRangeDels({{"a", "c", 10}, {"b", "d", 5}}, - {{" ", 0}, {"a", 10}, {"c", 5}, {"d", 0}}); + {{" ", 0}, {"a", 10}, {"c", 5}, {"d", 0}}, + {{"a", "c", 10}, {"c", "d", 5}}); } TEST_F(RangeDelAggregatorTest, OverlapAboveRight) { VerifyRangeDels({{"a", "c", 5}, {"b", "d", 10}}, - {{" ", 0}, {"a", 5}, {"b", 10}, {"d", 0}}); + {{" ", 0}, {"a", 5}, {"b", 10}, {"d", 0}}, + {{"a", "b", 5}, {"b", "d", 10}}); } TEST_F(RangeDelAggregatorTest, OverlapAboveMiddle) { VerifyRangeDels({{"a", "d", 5}, {"b", "c", 10}}, - {{" ", 0}, {"a", 5}, {"b", 10}, {"c", 5}, {"d", 0}}); + {{" ", 0}, {"a", 5}, {"b", 10}, {"c", 5}, {"d", 0}}, + {{"a", "b", 5}, {"b", "c", 10}, {"c", "d", 5}}); } TEST_F(RangeDelAggregatorTest, OverlapFully) { VerifyRangeDels({{"a", "d", 10}, {"b", "c", 5}}, - {{" ", 0}, {"a", 10}, {"d", 0}}); + {{" ", 0}, {"a", 10}, {"d", 0}}, {{"a", "d", 10}}); } TEST_F(RangeDelAggregatorTest, OverlapPoint) { VerifyRangeDels({{"a", "b", 5}, {"b", "c", 10}}, - {{" ", 0}, {"a", 5}, {"b", 10}, {"c", 0}}); + {{" ", 0}, {"a", 5}, {"b", 10}, {"c", 0}}, + {{"a", "b", 5}, {"b", "c", 10}}); } TEST_F(RangeDelAggregatorTest, SameStartKey) { VerifyRangeDels({{"a", "c", 5}, {"a", "b", 10}}, 
- {{" ", 0}, {"a", 10}, {"b", 5}, {"c", 0}}); + {{" ", 0}, {"a", 10}, {"b", 5}, {"c", 0}}, + {{"a", "b", 10}, {"b", "c", 5}}); } TEST_F(RangeDelAggregatorTest, SameEndKey) { VerifyRangeDels({{"a", "d", 5}, {"b", "d", 10}}, - {{" ", 0}, {"a", 5}, {"b", 10}, {"d", 0}}); + {{" ", 0}, {"a", 5}, {"b", 10}, {"d", 0}}, + {{"a", "b", 5}, {"b", "d", 10}}); } TEST_F(RangeDelAggregatorTest, GapsBetweenRanges) { - VerifyRangeDels({{"a", "b", 5}, {"c", "d", 10}, {"e", "f", 15}}, {{" ", 0}, - {"a", 5}, - {"b", 0}, - {"c", 10}, - {"d", 0}, - {"da", 0}, - {"e", 15}, - {"f", 0}}); + VerifyRangeDels({{"a", "b", 5}, {"c", "d", 10}, {"e", "f", 15}}, + {{" ", 0}, + {"a", 5}, + {"b", 0}, + {"c", 10}, + {"d", 0}, + {"da", 0}, + {"e", 15}, + {"f", 0}}, + {{"a", "b", 5}, {"c", "d", 10}, {"e", "f", 15}}); } // Note the Cover* tests also test cases where tombstones are inserted under a @@ -148,31 +193,86 @@ TEST_F(RangeDelAggregatorTest, GapsBetweenRanges) { TEST_F(RangeDelAggregatorTest, CoverMultipleFromLeft) { VerifyRangeDels( {{"b", "d", 5}, {"c", "f", 10}, {"e", "g", 15}, {"a", "f", 20}}, - {{" ", 0}, {"a", 20}, {"f", 15}, {"g", 0}}); + {{" ", 0}, {"a", 20}, {"f", 15}, {"g", 0}}, + {{"a", "f", 20}, {"f", "g", 15}}); } TEST_F(RangeDelAggregatorTest, CoverMultipleFromRight) { VerifyRangeDels( {{"b", "d", 5}, {"c", "f", 10}, {"e", "g", 15}, {"c", "h", 20}}, - {{" ", 0}, {"b", 5}, {"c", 20}, {"h", 0}}); + {{" ", 0}, {"b", 5}, {"c", 20}, {"h", 0}}, + {{"b", "c", 5}, {"c", "h", 20}}); } TEST_F(RangeDelAggregatorTest, CoverMultipleFully) { VerifyRangeDels( {{"b", "d", 5}, {"c", "f", 10}, {"e", "g", 15}, {"a", "h", 20}}, - {{" ", 0}, {"a", 20}, {"h", 0}}); + {{" ", 0}, {"a", 20}, {"h", 0}}, {{"a", "h", 20}}); } TEST_F(RangeDelAggregatorTest, AlternateMultipleAboveBelow) { VerifyRangeDels( {{"b", "d", 15}, {"c", "f", 10}, {"e", "g", 20}, {"a", "h", 5}}, - {{" ", 0}, - {"a", 5}, - {"b", 15}, - {"d", 10}, - {"e", 20}, - {"g", 5}, - {"h", 0}}); + {{" ", 0}, {"a", 5}, {"b", 15}, {"d", 10}, {"e", 20}, {"g", 5}, {"h", 0}}, + {{"a", "b", 5}, + {"b", "d", 15}, + {"d", "e", 10}, + {"e", "g", 20}, + {"g", "h", 5}}); +} + +TEST_F(RangeDelAggregatorTest, MergingIteratorAllEmptyStripes) { + for (bool collapsed : {true, false}) { + RangeDelAggregator range_del_agg(icmp, {1, 2}, collapsed); + VerifyRangeDelIter(range_del_agg.NewIterator().get(), {}); + } +} + +TEST_F(RangeDelAggregatorTest, MergingIteratorOverlappingStripes) { + for (bool collapsed : {true, false}) { + RangeDelAggregator range_del_agg(icmp, {5, 15, 25, 35}, collapsed); + AddTombstones( + &range_del_agg, + {{"d", "e", 10}, {"aa", "b", 20}, {"c", "d", 30}, {"a", "b", 10}}); + VerifyRangeDelIter( + range_del_agg.NewIterator().get(), + {{"a", "b", 10}, {"aa", "b", 20}, {"c", "d", 30}, {"d", "e", 10}}); + } +} + +TEST_F(RangeDelAggregatorTest, MergingIteratorSeek) { + RangeDelAggregator range_del_agg(icmp, {5, 15}, true /* collapsed */); + AddTombstones(&range_del_agg, {{"a", "c", 10}, + {"b", "c", 11}, + {"f", "g", 10}, + {"c", "d", 20}, + {"e", "f", 20}}); + auto it = range_del_agg.NewIterator(); + + // Verify seek positioning. 
+ it->Seek(""); + VerifyTombstonesEq(it->Tombstone(), {"a", "b", 10}); + it->Seek("a"); + VerifyTombstonesEq(it->Tombstone(), {"a", "b", 10}); + it->Seek("aa"); + VerifyTombstonesEq(it->Tombstone(), {"a", "b", 10}); + it->Seek("b"); + VerifyTombstonesEq(it->Tombstone(), {"b", "c", 11}); + it->Seek("c"); + VerifyTombstonesEq(it->Tombstone(), {"c", "d", 20}); + it->Seek("dd"); + VerifyTombstonesEq(it->Tombstone(), {"e", "f", 20}); + it->Seek("f"); + VerifyTombstonesEq(it->Tombstone(), {"f", "g", 10}); + it->Seek("g"); + ASSERT_EQ(it->Valid(), false); + it->Seek("h"); + ASSERT_EQ(it->Valid(), false); + + // Verify iteration after seek. + it->Seek("c"); + VerifyRangeDelIter(it.get(), + {{"c", "d", 20}, {"e", "f", 20}, {"f", "g", 10}}); } } // namespace rocksdb diff --git a/db/version_edit.h b/db/version_edit.h index 5b858391e..5728827a2 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -131,6 +131,21 @@ struct FileMetaData { smallest_seqno = std::min(smallest_seqno, seqno); largest_seqno = std::max(largest_seqno, seqno); } + + // Unlike UpdateBoundaries, ranges do not need to be presented in any + // particular order. + void UpdateBoundariesForRange(const InternalKey& start, + const InternalKey& end, SequenceNumber seqno, + const InternalKeyComparator& icmp) { + if (smallest.size() == 0 || icmp.Compare(start, smallest) < 0) { + smallest = start; + } + if (largest.size() == 0 || icmp.Compare(largest, end) < 0) { + largest = end; + } + smallest_seqno = std::min(smallest_seqno, seqno); + largest_seqno = std::max(largest_seqno, seqno); + } }; // A compressed copy of file meta data that just contain minimum data needed