diff --git a/db/db_impl.cc b/db/db_impl.cc index a399fee39..3abb36092 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3970,7 +3970,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, SuperVersion* sv = GetAndRefSuperVersion(cfd); // Prepare to store a list of merge operations if merge occurs. MergeContext merge_context; - RangeDelAggregator range_del_agg(cfd->internal_comparator(), {snapshot}); + RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); Status s; // First look in the memtable, then in the immutable memtable (if any). @@ -4079,7 +4079,7 @@ std::vector<Status> DBImpl::MultiGet( LookupKey lkey(keys[i], snapshot); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]); RangeDelAggregator range_del_agg(cfh->cfd()->internal_comparator(), - {snapshot}); + snapshot); auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID()); assert(mgd_iter != multiget_cf_data.end()); auto mgd = mgd_iter->second; @@ -6359,7 +6359,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, bool* found_record_for_key) { Status s; MergeContext merge_context; - RangeDelAggregator range_del_agg(sv->mem->GetInternalKeyComparator(), {}); + RangeDelAggregator range_del_agg(sv->mem->GetInternalKeyComparator(), + kMaxSequenceNumber); ReadOptions read_options; SequenceNumber current_seq = versions_->LastSequence(); diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 4a01849c2..f185209c1 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -38,7 +38,7 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options, auto cfd = cfh->cfd(); SuperVersion* super_version = cfd->GetSuperVersion(); MergeContext merge_context; - RangeDelAggregator range_del_agg(cfd->internal_comparator(), {snapshot}); + RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); LookupKey lkey(key, snapshot); if (super_version->mem->Get(lkey, value, &s, &merge_context, &range_del_agg, read_options)) { diff --git a/db/db_iter.cc 
b/db/db_iter.cc index 80dcd4106..1f49bb3d4 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -123,7 +123,7 @@ class DBIter: public Iterator { prefix_same_as_start_(prefix_same_as_start), pin_thru_lifetime_(pin_data), total_order_seek_(total_order_seek), - range_del_agg_(InternalKeyComparator(cmp), {s}) { + range_del_agg_(InternalKeyComparator(cmp), s) { RecordTick(statistics_, NO_ITERATORS); prefix_extractor_ = ioptions.prefix_extractor; max_skip_ = max_sequential_skip_in_iterations; diff --git a/db/range_del_aggregator.cc b/db/range_del_aggregator.cc index 64b2dcdf4..9ae63ffdb 100644 --- a/db/range_del_aggregator.cc +++ b/db/range_del_aggregator.cc @@ -12,17 +12,31 @@ namespace rocksdb { RangeDelAggregator::RangeDelAggregator( const InternalKeyComparator& icmp, const std::vector<SequenceNumber>& snapshots) - : icmp_(icmp) { - pinned_iters_mgr_.StartPinning(); + : upper_bound_(kMaxSequenceNumber), icmp_(icmp) { + InitRep(snapshots); +} + +RangeDelAggregator::RangeDelAggregator(const InternalKeyComparator& icmp, + SequenceNumber snapshot) + : upper_bound_(snapshot), icmp_(icmp) {} + +void RangeDelAggregator::InitRep(const std::vector<SequenceNumber>& snapshots) { + assert(rep_ == nullptr); + rep_.reset(new Rep()); for (auto snapshot : snapshots) { - stripe_map_.emplace(snapshot, - TombstoneMap(stl_wrappers::LessOfComparator(&icmp_))); + rep_->stripe_map_.emplace( + snapshot, TombstoneMap(stl_wrappers::LessOfComparator(&icmp_))); } // Data newer than any snapshot falls in this catch-all stripe - stripe_map_.emplace(kMaxSequenceNumber, TombstoneMap()); + rep_->stripe_map_.emplace( + kMaxSequenceNumber, TombstoneMap(stl_wrappers::LessOfComparator(&icmp_))); + rep_->pinned_iters_mgr_.StartPinning(); } bool RangeDelAggregator::ShouldDelete(const Slice& internal_key) { + if (rep_ == nullptr) { + return false; + } ParsedInternalKey parsed; if (!ParseInternalKey(internal_key, &parsed)) { assert(false); @@ -32,7 +46,9 @@ bool RangeDelAggregator::ShouldDelete(const Slice& internal_key) { bool 
RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed) { assert(IsValueType(parsed.type)); - + if (rep_ == nullptr) { + return false; + } const auto& tombstone_map = GetTombstoneMap(parsed.sequence); for (const auto& start_key_and_tombstone : tombstone_map) { const auto& tombstone = start_key_and_tombstone.second; @@ -51,14 +67,17 @@ bool RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed) { bool RangeDelAggregator::ShouldAddTombstones( bool bottommost_level /* = false */) { - auto stripe_map_iter = stripe_map_.begin(); - assert(stripe_map_iter != stripe_map_.end()); + if (rep_ == nullptr) { + return false; + } + auto stripe_map_iter = rep_->stripe_map_.begin(); + assert(stripe_map_iter != rep_->stripe_map_.end()); if (bottommost_level) { // For the bottommost level, keys covered by tombstones in the first // (oldest) stripe have been compacted away, so the tombstones are obsolete. ++stripe_map_iter; } - while (stripe_map_iter != stripe_map_.end()) { + while (stripe_map_iter != rep_->stripe_map_.end()) { if (!stripe_map_iter->second.empty()) { return true; } @@ -77,9 +96,15 @@ Status RangeDelAggregator::AddTombstones( } Status RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) { - pinned_iters_mgr_.PinIterator(input, arena); input->SeekToFirst(); + bool first_iter = true; while (input->Valid()) { + if (first_iter) { + if (rep_ == nullptr) { + InitRep({upper_bound_}); + } + first_iter = false; + } ParsedInternalKey parsed_key; if (!ParseInternalKey(input->key(), &parsed_key)) { return Status::Corruption("Unable to parse range tombstone InternalKey"); @@ -89,22 +114,30 @@ Status RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) { tombstone_map.emplace(input->key(), std::move(tombstone)); input->Next(); } + if (!first_iter) { + rep_->pinned_iters_mgr_.PinIterator(input, arena); + } else if (arena) { + input->~InternalIterator(); + } else { + delete input; + } return Status::OK(); } 
RangeDelAggregator::TombstoneMap& RangeDelAggregator::GetTombstoneMap( SequenceNumber seq) { + assert(rep_ != nullptr); // The stripe includes seqnum for the snapshot above and excludes seqnum for // the snapshot below. StripeMap::iterator iter; if (seq > 0) { // upper_bound() checks strict inequality so need to subtract one - iter = stripe_map_.upper_bound(seq - 1); + iter = rep_->stripe_map_.upper_bound(seq - 1); } else { - iter = stripe_map_.begin(); + iter = rep_->stripe_map_.begin(); } // catch-all stripe justifies this assertion in either of above cases - assert(iter != stripe_map_.end()); + assert(iter != rep_->stripe_map_.end()); return iter->second; } @@ -117,8 +150,11 @@ void RangeDelAggregator::AddToBuilder( TableBuilder* builder, const Slice* lower_bound, const Slice* upper_bound, FileMetaData* meta, bool bottommost_level /* = false */) { - auto stripe_map_iter = stripe_map_.begin(); - assert(stripe_map_iter != stripe_map_.end()); + if (rep_ == nullptr) { + return; + } + auto stripe_map_iter = rep_->stripe_map_.begin(); + assert(stripe_map_iter != rep_->stripe_map_.end()); if (bottommost_level) { // For the bottommost level, keys covered by tombstones in the first // (oldest) stripe have been compacted away, so the tombstones are obsolete. @@ -128,7 +164,7 @@ void RangeDelAggregator::AddToBuilder( // Note the order in which tombstones are stored is insignificant since we // insert them into a std::map on the read path. 
bool first_added = false; - while (stripe_map_iter != stripe_map_.end()) { + while (stripe_map_iter != rep_->stripe_map_.end()) { for (const auto& start_key_and_tombstone : stripe_map_iter->second) { const auto& tombstone = start_key_and_tombstone.second; if (upper_bound != nullptr && @@ -204,8 +240,11 @@ void RangeDelAggregator::AddToBuilder( } bool RangeDelAggregator::IsEmpty() { - for (auto stripe_map_iter = stripe_map_.begin(); - stripe_map_iter != stripe_map_.end(); ++stripe_map_iter) { + if (rep_ == nullptr) { + return true; + } + for (auto stripe_map_iter = rep_->stripe_map_.begin(); + stripe_map_iter != rep_->stripe_map_.end(); ++stripe_map_iter) { if (!stripe_map_iter->second.empty()) { return false; } diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h index 9935cd33e..8b9ddc4eb 100644 --- a/db/range_del_aggregator.h +++ b/db/range_del_aggregator.h @@ -37,9 +37,17 @@ class RangeDelAggregator { // (get/iterator), only the user snapshot is provided such that the seqnum // space is divided into two stripes, where only tombstones in the older // stripe are considered by ShouldDelete(). + // Note this overload does not lazily initialize Rep. RangeDelAggregator(const InternalKeyComparator& icmp, const std::vector<SequenceNumber>& snapshots); + // @param upper_bound Similar to snapshots above, except with a single + // snapshot, which allows us to store the snapshot on the stack and defer + // initialization of heap-allocating members (in Rep) until the first range + // deletion is encountered. + RangeDelAggregator(const InternalKeyComparator& icmp, + SequenceNumber upper_bound); + // Returns whether the key should be deleted, which is the case when it is // covered by a range tombstone residing in the same snapshot stripe. bool ShouldDelete(const ParsedInternalKey& parsed); @@ -86,13 +94,22 @@ class RangeDelAggregator { // their seqnums are greater than the next smaller snapshot's seqnum. 
typedef std::map<SequenceNumber, TombstoneMap> StripeMap; + struct Rep { + StripeMap stripe_map_; + PinnedIteratorsManager pinned_iters_mgr_; + }; + // Initializes rep_ lazily. This aggregator object is constructed for every + // read, so expensive members should only be created when necessary, i.e., + // once the first range deletion is encountered. + void InitRep(const std::vector<SequenceNumber>& snapshots); + Status AddTombstones(InternalIterator* input, bool arena); TombstoneMap& GetTombstoneMap(SequenceNumber seq); - StripeMap stripe_map_; - const InternalKeyComparator icmp_; + SequenceNumber upper_bound_; Arena arena_; // must be destroyed after pinned_iters_mgr_ which references // memory in this arena - PinnedIteratorsManager pinned_iters_mgr_; + std::unique_ptr<Rep> rep_; + const InternalKeyComparator icmp_; }; } // namespace rocksdb diff --git a/db/table_cache.cc b/db/table_cache.cc index a49a95787..bee2565b1 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -245,7 +245,7 @@ InternalIterator* TableCache::NewIterator( if (handle != nullptr) { ReleaseHandle(handle); } - return NewErrorInternalIterator(s); + return NewErrorInternalIterator(s, arena); } InternalIterator* TableCache::NewRangeDeletionIterator(