From 9e7cf3469bc626b092ec48366d12873ecab22b4e Mon Sep 17 00:00:00 2001
From: Andrew Kryczka
Date: Fri, 4 Nov 2016 11:53:38 -0700
Subject: [PATCH] DeleteRange user iterator support

Summary:
Note: reviewed in https://reviews.facebook.net/D65115

- DBIter maintains a range tombstone accumulator. We don't clean up obsolete
  tombstones yet, so if the user seeks back and forth, the same tombstones will
  be added to the accumulator multiple times.
- DBImpl::NewInternalIterator() (used to build DBIter's underlying iterator)
  adds memtable/L0 range tombstones; L1+ range tombstones are added on demand
  during NewSecondaryIterator() (see D62205).
- DBIter uses ShouldDelete() when advancing to check whether keys are covered
  by range tombstones (a brief usage sketch is appended after the diff).

Closes https://github.com/facebook/rocksdb/pull/1464

Differential Revision: D4131753

Pulled By: ajkr

fbshipit-source-id: be86559
---
 db/compaction_iterator.cc                    |  3 +-
 db/db_compaction_filter_test.cc              | 13 ++-
 db/db_impl.cc                                | 57 ++++++++----
 db/db_impl.h                                 |  6 +-
 db/db_impl_readonly.cc                       | 10 +-
 db/db_iter.cc                                | 97 ++++++++++++++------
 db/db_iter.h                                 |  2 +
 db/db_test_util.cc                           | 16 +++-
 db/external_sst_file_ingestion_job.cc        |  8 +-
 db/memtable_list.cc                          | 18 ++++
 db/memtable_list.h                           |  3 +
 db/range_del_aggregator.cc                   | 52 +++++------
 db/range_del_aggregator.h                    |  8 +-
 db/table_cache.cc                            |  2 +-
 db/version_set.cc                            | 40 ++++----
 db/version_set.h                             |  5 +-
 tools/ldb_cmd.cc                             |  4 +-
 utilities/date_tiered/date_tiered_db_impl.cc |  3 +-
 18 files changed, 233 insertions(+), 114 deletions(-)

diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc
index f6725cbe3..58503dd15 100644
--- a/db/compaction_iterator.cc
+++ b/db/compaction_iterator.cc
@@ -422,8 +422,7 @@ void CompactionIterator::NextFromInput() {
     } else {
       // 1. new user key -OR-
       // 2. different snapshot stripe
-      bool should_delete =
-          range_del_agg_->ShouldDelete(key_, true /* for_compaction */);
+      bool should_delete = range_del_agg_->ShouldDelete(key_);
       if (should_delete) {
         input_->Next();
       } else {
diff --git a/db/db_compaction_filter_test.cc b/db/db_compaction_filter_test.cc
index ff6945cf8..cba0b3467 100644
--- a/db/db_compaction_filter_test.cc
+++ b/db/db_compaction_filter_test.cc
@@ -261,8 +261,10 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
   int total = 0;
   Arena arena;
   {
+    RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
+                                     {} /* snapshots */);
     ScopedArenaIterator iter(
-        dbfull()->NewInternalIterator(&arena, handles_[1]));
+        dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1]));
     iter->SeekToFirst();
     ASSERT_OK(iter->status());
     while (iter->Valid()) {
@@ -349,8 +351,10 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
   // level Lmax because this record is at the tip
   count = 0;
   {
+    RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
+                                     {} /* snapshots */);
     ScopedArenaIterator iter(
-        dbfull()->NewInternalIterator(&arena, handles_[1]));
+        dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1]));
     iter->SeekToFirst();
     ASSERT_OK(iter->status());
     while (iter->Valid()) {
@@ -566,7 +570,10 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
   int count = 0;
   int total = 0;
   Arena arena;
-  ScopedArenaIterator iter(dbfull()->NewInternalIterator(&arena));
+  RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
+                                   {} /* snapshots */);
+  ScopedArenaIterator iter(
+      dbfull()->NewInternalIterator(&arena, &range_del_agg));
   iter->SeekToFirst();
   ASSERT_OK(iter->status());
   while (iter->Valid()) {
diff --git a/db/db_impl.cc
b/db/db_impl.cc index 0948039a0..e02cd7f84 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2885,7 +2885,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, } InternalIterator* DBImpl::NewInternalIterator( - Arena* arena, ColumnFamilyHandle* column_family) { + Arena* arena, RangeDelAggregator* range_del_agg, + ColumnFamilyHandle* column_family) { ColumnFamilyData* cfd; if (column_family == nullptr) { cfd = default_cf_handle_->cfd(); @@ -2898,7 +2899,8 @@ InternalIterator* DBImpl::NewInternalIterator( SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); mutex_.Unlock(); ReadOptions roptions; - return NewInternalIterator(roptions, cfd, super_version, arena); + return NewInternalIterator(roptions, cfd, super_version, arena, + range_del_agg); } Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, @@ -3852,12 +3854,13 @@ static void CleanupIteratorState(void* arg1, void* arg2) { } } // namespace -InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options, - ColumnFamilyData* cfd, - SuperVersion* super_version, - Arena* arena) { +InternalIterator* DBImpl::NewInternalIterator( + const ReadOptions& read_options, ColumnFamilyData* cfd, + SuperVersion* super_version, Arena* arena, + RangeDelAggregator* range_del_agg) { InternalIterator* internal_iter; assert(arena != nullptr); + assert(range_del_agg != nullptr); // Need to create internal iterator from the arena. MergeIteratorBuilder merge_iter_builder( &cfd->internal_comparator(), arena, @@ -3866,18 +3869,34 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options, // Collect iterator for mutable mem merge_iter_builder.AddIterator( super_version->mem->NewIterator(read_options, arena)); + ScopedArenaIterator range_del_iter; + Status s; + if (!read_options.ignore_range_deletions) { + range_del_iter.set( + super_version->mem->NewRangeTombstoneIterator(read_options, arena)); + s = range_del_agg->AddTombstones(std::move(range_del_iter)); + } // Collect all needed child iterators for immutable memtables - super_version->imm->AddIterators(read_options, &merge_iter_builder); - // Collect iterators for files in L0 - Ln - super_version->current->AddIterators(read_options, env_options_, - &merge_iter_builder); - internal_iter = merge_iter_builder.Finish(); - IterState* cleanup = - new IterState(this, &mutex_, super_version, - read_options.background_purge_on_iterator_cleanup); - internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); + if (s.ok()) { + super_version->imm->AddIterators(read_options, &merge_iter_builder); + if (!read_options.ignore_range_deletions) { + s = super_version->imm->AddRangeTombstoneIterators(read_options, arena, + range_del_agg); + } + } + if (s.ok()) { + // Collect iterators for files in L0 - Ln + super_version->current->AddIterators(read_options, env_options_, + &merge_iter_builder, range_del_agg); + internal_iter = merge_iter_builder.Finish(); + IterState* cleanup = + new IterState(this, &mutex_, super_version, + read_options.background_purge_on_iterator_cleanup); + internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); - return internal_iter; + return internal_iter; + } + return NewErrorInternalIterator(s); } ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const { @@ -4419,7 +4438,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, read_options.total_order_seek); InternalIterator* internal_iter = - NewInternalIterator(read_options, cfd, sv, db_iter->GetArena()); + NewInternalIterator(read_options, cfd, 
sv, db_iter->GetArena(), + db_iter->GetRangeDelAggregator()); db_iter->SetIterUnderDBIter(internal_iter); return db_iter; @@ -4492,7 +4512,8 @@ Status DBImpl::NewIterators( sv->mutable_cf_options.max_sequential_skip_in_iterations, sv->version_number, nullptr, false, read_options.pin_data); InternalIterator* internal_iter = - NewInternalIterator(read_options, cfd, sv, db_iter->GetArena()); + NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), + db_iter->GetRangeDelAggregator()); db_iter->SetIterUnderDBIter(internal_iter); iterators->push_back(db_iter); } diff --git a/db/db_impl.h b/db/db_impl.h index 87e9edfbb..cbf9a292d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -290,7 +290,8 @@ class DBImpl : public DB { // The keys of this iterator are internal keys (see format.h). // The returned iterator should be deleted when no longer needed. InternalIterator* NewInternalIterator( - Arena* arena, ColumnFamilyHandle* column_family = nullptr); + Arena* arena, RangeDelAggregator* range_del_agg, + ColumnFamilyHandle* column_family = nullptr); #ifndef NDEBUG // Extra methods (for testing) that are not in the public DB interface @@ -525,7 +526,8 @@ class DBImpl : public DB { InternalIterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd, SuperVersion* super_version, - Arena* arena); + Arena* arena, + RangeDelAggregator* range_del_agg); // Except in DB::Open(), WriteOptionsFile can only be called when: // 1. WriteThread::Writer::EnterUnbatched() is used. diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 3e185288d..4a01849c2 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -64,8 +64,9 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options, : latest_snapshot), super_version->mutable_cf_options.max_sequential_skip_in_iterations, super_version->version_number); - auto internal_iter = NewInternalIterator( - read_options, cfd, super_version, db_iter->GetArena()); + auto internal_iter = + NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(), + db_iter->GetRangeDelAggregator()); db_iter->SetIterUnderDBIter(internal_iter); return db_iter; } @@ -92,8 +93,9 @@ Status DBImplReadOnly::NewIterators( : latest_snapshot), sv->mutable_cf_options.max_sequential_skip_in_iterations, sv->version_number); - auto* internal_iter = NewInternalIterator( - read_options, cfd, sv, db_iter->GetArena()); + auto* internal_iter = + NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), + db_iter->GetRangeDelAggregator()); db_iter->SetIterUnderDBIter(internal_iter); iterators->push_back(db_iter); } diff --git a/db/db_iter.cc b/db/db_iter.cc index eb89d167f..80dcd4106 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -122,7 +122,8 @@ class DBIter: public Iterator { iterate_upper_bound_(iterate_upper_bound), prefix_same_as_start_(prefix_same_as_start), pin_thru_lifetime_(pin_data), - total_order_seek_(total_order_seek) { + total_order_seek_(total_order_seek), + range_del_agg_(InternalKeyComparator(cmp), {s}) { RecordTick(statistics_, NO_ITERATORS); prefix_extractor_ = ioptions.prefix_extractor; max_skip_ = max_sequential_skip_in_iterations; @@ -151,6 +152,10 @@ class DBIter: public Iterator { iter_ = iter; iter_->SetPinnedItersMgr(&pinned_iters_mgr_); } + virtual RangeDelAggregator* GetRangeDelAggregator() { + return &range_del_agg_; + } + virtual bool Valid() const override { return valid_; } virtual Slice key() const override { assert(valid_); @@ -273,6 +278,7 @@ class DBIter: public Iterator { const bool total_order_seek_; 
// List of operands for merge operator. MergeContext merge_context_; + RangeDelAggregator range_del_agg_; LocalStatistics local_stats_; PinnedIteratorsManager pinned_iters_mgr_; @@ -384,20 +390,39 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { PERF_COUNTER_ADD(internal_delete_skipped_count, 1); break; case kTypeValue: - valid_ = true; saved_key_.SetKey( ikey.user_key, !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); - return; + if (range_del_agg_.ShouldDelete(ikey)) { + // Arrange to skip all upcoming entries for this key since + // they are hidden by this deletion. + skipping = true; + num_skipped = 0; + PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + } else { + valid_ = true; + return; + } + break; case kTypeMerge: - // By now, we are sure the current ikey is going to yield a value saved_key_.SetKey( ikey.user_key, !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); - current_entry_is_merged_ = true; - valid_ = true; - MergeValuesNewToOld(); // Go to a different state machine - return; + if (range_del_agg_.ShouldDelete(ikey)) { + // Arrange to skip all upcoming entries for this key since + // they are hidden by this deletion. + skipping = true; + num_skipped = 0; + PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + } else { + // By now, we are sure the current ikey is going to yield a + // value + current_entry_is_merged_ = true; + valid_ = true; + MergeValuesNewToOld(); // Go to a different state machine + return; + } + break; default: assert(false); break; @@ -456,7 +481,8 @@ void DBIter::MergeValuesNewToOld() { if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) { // hit the next user key, stop right here break; - } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) { + } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type || + range_del_agg_.ShouldDelete(ikey)) { // hit a delete with the same user key, stop right here // iter_ is positioned after delete iter_->Next(); @@ -624,10 +650,15 @@ bool DBIter::FindValueForCurrentKey() { last_key_entry_type = ikey.type; switch (last_key_entry_type) { case kTypeValue: + if (range_del_agg_.ShouldDelete(ikey)) { + last_key_entry_type = kTypeRangeDeletion; + PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + } else { + assert(iter_->IsValuePinned()); + pinned_value_ = iter_->value(); + } merge_context_.Clear(); - assert(iter_->IsValuePinned()); - pinned_value_ = iter_->value(); - last_not_merge_type = kTypeValue; + last_not_merge_type = last_key_entry_type; break; case kTypeDeletion: case kTypeSingleDeletion: @@ -636,9 +667,16 @@ bool DBIter::FindValueForCurrentKey() { PERF_COUNTER_ADD(internal_delete_skipped_count, 1); break; case kTypeMerge: - assert(merge_operator_ != nullptr); - merge_context_.PushOperandBack( - iter_->value(), iter_->IsValuePinned() /* operand_pinned */); + if (range_del_agg_.ShouldDelete(ikey)) { + merge_context_.Clear(); + last_key_entry_type = kTypeRangeDeletion; + last_not_merge_type = last_key_entry_type; + PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + } else { + assert(merge_operator_ != nullptr); + merge_context_.PushOperandBack( + iter_->value(), iter_->IsValuePinned() /* operand_pinned */); + } break; default: assert(false); @@ -654,12 +692,14 @@ bool DBIter::FindValueForCurrentKey() { switch (last_key_entry_type) { case kTypeDeletion: case kTypeSingleDeletion: + case kTypeRangeDeletion: valid_ = false; return false; case kTypeMerge: current_entry_is_merged_ = true; if (last_not_merge_type == 
kTypeDeletion || - last_not_merge_type == kTypeSingleDeletion) { + last_not_merge_type == kTypeSingleDeletion || + last_not_merge_type == kTypeRangeDeletion) { MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr, merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_, @@ -699,17 +739,17 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { ParsedInternalKey ikey; FindParseableKey(&ikey, kForward); - if (ikey.type == kTypeValue || ikey.type == kTypeDeletion || - ikey.type == kTypeSingleDeletion) { - if (ikey.type == kTypeValue) { - assert(iter_->IsValuePinned()); - pinned_value_ = iter_->value(); - valid_ = true; - return true; - } + if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || + range_del_agg_.ShouldDelete(ikey)) { valid_ = false; return false; } + if (ikey.type == kTypeValue) { + assert(iter_->IsValuePinned()); + pinned_value_ = iter_->value(); + valid_ = true; + return true; + } // kTypeMerge. We need to collect all kTypeMerge values and save them // in operands @@ -717,7 +757,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { merge_context_.Clear(); while (iter_->Valid() && user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) && - ikey.type == kTypeMerge) { + ikey.type == kTypeMerge && !range_del_agg_.ShouldDelete(ikey)) { merge_context_.PushOperand(iter_->value(), iter_->IsValuePinned() /* operand_pinned */); iter_->Next(); @@ -726,7 +766,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { if (!iter_->Valid() || !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) || - ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) { + ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || + range_del_agg_.ShouldDelete(ikey)) { MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr, merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_, &pinned_value_); @@ -972,6 +1013,10 @@ ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); } void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; } +RangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() { + return db_iter_->GetRangeDelAggregator(); +} + void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) { static_cast(db_iter_)->SetIter(iter); } diff --git a/db/db_iter.h b/db/db_iter.h index 989188232..ee7755221 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -11,6 +11,7 @@ #include #include #include "db/dbformat.h" +#include "db/range_del_aggregator.h" #include "rocksdb/db.h" #include "rocksdb/iterator.h" #include "util/arena.h" @@ -46,6 +47,7 @@ class ArenaWrappedDBIter : public Iterator { // Get the arena to be used to allocate memory for DBIter to be wrapped, // as well as child iterators in it. 
virtual Arena* GetArena() { return &arena_; } + virtual RangeDelAggregator* GetRangeDelAggregator(); // Set the DB Iterator to be wrapped diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 16147b603..dc5ad992e 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -585,11 +585,15 @@ std::string DBTestBase::Contents(int cf) { std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) { Arena arena; + auto options = CurrentOptions(); + RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator), + {} /* snapshots */); ScopedArenaIterator iter; if (cf == 0) { - iter.set(dbfull()->NewInternalIterator(&arena)); + iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg)); } else { - iter.set(dbfull()->NewInternalIterator(&arena, handles_[cf])); + iter.set( + dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[cf])); } InternalKey target(user_key, kMaxSequenceNumber, kTypeValue); iter->Seek(target.Encode()); @@ -990,10 +994,14 @@ UpdateStatus DBTestBase::updateInPlaceNoAction(char* prevValue, void DBTestBase::validateNumberOfEntries(int numValues, int cf) { ScopedArenaIterator iter; Arena arena; + auto options = CurrentOptions(); + RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator), + {} /* snapshots */); if (cf != 0) { - iter.set(dbfull()->NewInternalIterator(&arena, handles_[cf])); + iter.set( + dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[cf])); } else { - iter.set(dbfull()->NewInternalIterator(&arena)); + iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg)); } iter->SeekToFirst(); ASSERT_EQ(iter->status().ok(), true); diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index a2dbad026..6fc35f3fc 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -372,8 +372,14 @@ Status ExternalSstFileIngestionJob::AssignLevelForIngestedFile( bool overlap_with_level = false; MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(), &arena); + RangeDelAggregator range_del_agg(cfd_->internal_comparator(), + {} /* snapshots */); sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder, - lvl); + lvl, &range_del_agg); + if (!range_del_agg.IsEmpty()) { + return Status::NotSupported( + "file ingestion with range tombstones is currently unsupported"); + } ScopedArenaIterator level_iter(merge_iter_builder.Finish()); status = IngestedFileOverlapWithIteratorRange( diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 1c460324b..4442cf55a 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -144,10 +144,28 @@ bool MemTableListVersion::GetFromList(std::list* list, assert(*seq != kMaxSequenceNumber); return true; } + if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) { + return false; + } } return false; } +Status MemTableListVersion::AddRangeTombstoneIterators( + const ReadOptions& read_opts, Arena* arena, + RangeDelAggregator* range_del_agg) { + assert(range_del_agg != nullptr); + for (auto& m : memlist_) { + ScopedArenaIterator range_del_iter( + m->NewRangeTombstoneIterator(read_opts, arena)); + Status s = range_del_agg->AddTombstones(std::move(range_del_iter)); + if (!s.ok()) { + return s; + } + } + return Status::OK(); +} + void MemTableListVersion::AddIterators( const ReadOptions& options, std::vector* iterator_list, Arena* arena) { diff --git a/db/memtable_list.h b/db/memtable_list.h index a8f2b38c7..67ef95bd3 100644 --- a/db/memtable_list.h +++ 
b/db/memtable_list.h @@ -81,6 +81,9 @@ class MemTableListVersion { read_opts); } + Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena, + RangeDelAggregator* range_del_agg); + void AddIterators(const ReadOptions& options, std::vector* iterator_list, Arena* arena); diff --git a/db/range_del_aggregator.cc b/db/range_del_aggregator.cc index ba874c02b..ccf12ee69 100644 --- a/db/range_del_aggregator.cc +++ b/db/range_del_aggregator.cc @@ -23,39 +23,29 @@ RangeDelAggregator::RangeDelAggregator( stripe_map_.emplace(kMaxSequenceNumber, TombstoneMap()); } -bool RangeDelAggregator::ShouldDelete(const Slice& internal_key, - bool for_compaction /* = false */) { +bool RangeDelAggregator::ShouldDelete(const Slice& internal_key) { ParsedInternalKey parsed; if (!ParseInternalKey(internal_key, &parsed)) { assert(false); } - return ShouldDelete(parsed, for_compaction); + return ShouldDelete(parsed); } -bool RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed, - bool for_compaction /* = false */) { +bool RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed) { assert(IsValueType(parsed.type)); - // Starting point is the snapshot stripe in which the key lives, then need to - // search all earlier stripes too, unless it's for compaction. - for (auto stripe_map_iter = GetStripeMapIter(parsed.sequence); - stripe_map_iter != stripe_map_.end(); ++stripe_map_iter) { - const auto& tombstone_map = stripe_map_iter->second; - for (const auto& start_key_and_tombstone : tombstone_map) { - const auto& tombstone = start_key_and_tombstone.second; - if (icmp_.user_comparator()->Compare(parsed.user_key, - tombstone.start_key_) < 0) { - break; - } - if (parsed.sequence < tombstone.seq_ && - icmp_.user_comparator()->Compare(parsed.user_key, - tombstone.end_key_) <= 0) { - return true; - } - } - if (for_compaction) { + const auto& tombstone_map = GetTombstoneMap(parsed.sequence); + for (const auto& start_key_and_tombstone : tombstone_map) { + const auto& tombstone = start_key_and_tombstone.second; + if (icmp_.user_comparator()->Compare(parsed.user_key, + tombstone.start_key_) < 0) { break; } + if (parsed.sequence < tombstone.seq_ && + icmp_.user_comparator()->Compare(parsed.user_key, tombstone.end_key_) <= + 0) { + return true; + } } return false; } @@ -96,7 +86,7 @@ Status RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) { return Status::Corruption("Unable to parse range tombstone InternalKey"); } RangeTombstone tombstone(parsed_key, input->value()); - auto& tombstone_map = GetStripeMapIter(tombstone.seq_)->second; + auto& tombstone_map = GetTombstoneMap(tombstone.seq_); tombstone_map.emplace(tombstone.start_key_.ToString(), std::move(tombstone)); input->Next(); @@ -104,7 +94,7 @@ Status RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) { return Status::OK(); } -RangeDelAggregator::StripeMap::iterator RangeDelAggregator::GetStripeMapIter( +RangeDelAggregator::TombstoneMap& RangeDelAggregator::GetTombstoneMap( SequenceNumber seq) { // The stripe includes seqnum for the snapshot above and excludes seqnum for // the snapshot below. 
@@ -117,7 +107,7 @@ RangeDelAggregator::StripeMap::iterator RangeDelAggregator::GetStripeMapIter( } // catch-all stripe justifies this assertion in either of above cases assert(iter != stripe_map_.end()); - return iter; + return iter->second; } // TODO(andrewkr): We should implement an iterator over range tombstones in our @@ -202,4 +192,14 @@ void RangeDelAggregator::AddToBuilder(TableBuilder* builder, } } +bool RangeDelAggregator::IsEmpty() { + for (auto stripe_map_iter = stripe_map_.begin(); + stripe_map_iter != stripe_map_.end(); ++stripe_map_iter) { + if (!stripe_map_iter->second.empty()) { + return false; + } + } + return true; +} + } // namespace rocksdb diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h index 64d1becab..b406c9552 100644 --- a/db/range_del_aggregator.h +++ b/db/range_del_aggregator.h @@ -42,9 +42,8 @@ class RangeDelAggregator { // Returns whether the key should be deleted, which is the case when it is // covered by a range tombstone residing in the same snapshot stripe. - bool ShouldDelete(const ParsedInternalKey& parsed, - bool for_compaction = false); - bool ShouldDelete(const Slice& internal_key, bool for_compaction = false); + bool ShouldDelete(const ParsedInternalKey& parsed); + bool ShouldDelete(const Slice& internal_key); bool ShouldAddTombstones(bool bottommost_level = false); // Adds tombstones to the tombstone aggregation structure maintained by this @@ -72,6 +71,7 @@ class RangeDelAggregator { const Slice* next_table_min_key, FileMetaData* meta, bool bottommost_level = false); Arena* GetArena() { return &arena_; } + bool IsEmpty(); private: // Maps tombstone start key -> tombstone object @@ -82,7 +82,7 @@ class RangeDelAggregator { typedef std::map StripeMap; Status AddTombstones(InternalIterator* input, bool arena); - StripeMap::iterator GetStripeMapIter(SequenceNumber seq); + TombstoneMap& GetTombstoneMap(SequenceNumber seq); PinnedIteratorsManager pinned_iters_mgr_; StripeMap stripe_map_; diff --git a/db/table_cache.cc b/db/table_cache.cc index 01606271a..728e20bc0 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -223,7 +223,7 @@ InternalIterator* TableCache::NewIterator( } } - if (range_del_agg != nullptr) { + if (range_del_agg != nullptr && !options.ignore_range_deletions) { std::unique_ptr iter( table_reader->NewRangeTombstoneIterator(options)); Status s = range_del_agg->AddTombstones(std::move(iter)); diff --git a/db/version_set.cc b/db/version_set.cc index a78db579e..dbd65dca8 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -514,15 +514,14 @@ class LevelFileIteratorState : public TwoLevelIteratorState { if (meta_handle.size() != sizeof(FileDescriptor)) { return NewErrorInternalIterator( Status::Corruption("FileReader invoked with unexpected value")); - } else { - const FileDescriptor* fd = - reinterpret_cast(meta_handle.data()); - return table_cache_->NewIterator( - read_options_, env_options_, icomparator_, *fd, - nullptr /* don't need reference to table*/, file_read_hist_, - for_compaction_, nullptr /* arena */, skip_filters_, level_, - range_del_agg_, false /* is_range_del_only */); } + const FileDescriptor* fd = + reinterpret_cast(meta_handle.data()); + return table_cache_->NewIterator( + read_options_, env_options_, icomparator_, *fd, + nullptr /* don't need reference to table */, file_read_hist_, + for_compaction_, nullptr /* arena */, skip_filters_, level_, + range_del_agg_, false /* is_range_del_only */); } bool PrefixMayMatch(const Slice& internal_key) override { @@ -805,18 +804,21 @@ double 
VersionStorageInfo::GetEstimatedCompressionRatioAtLevel( void Version::AddIterators(const ReadOptions& read_options, const EnvOptions& soptions, - MergeIteratorBuilder* merge_iter_builder) { + MergeIteratorBuilder* merge_iter_builder, + RangeDelAggregator* range_del_agg) { assert(storage_info_.finalized_); for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) { - AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level); + AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level, + range_del_agg); } } void Version::AddIteratorsForLevel(const ReadOptions& read_options, const EnvOptions& soptions, MergeIteratorBuilder* merge_iter_builder, - int level) { + int level, + RangeDelAggregator* range_del_agg) { assert(storage_info_.finalized_); if (level >= storage_info_.num_non_empty_levels()) { // This is an empty level @@ -834,20 +836,20 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr, cfd_->internal_stats()->GetFileReadHist(0), false, arena, - false /* skip_filters */, 0 /* level */)); + false /* skip_filters */, 0 /* level */, range_del_agg)); } } else { // For levels > 0, we can use a concatenating iterator that sequentially // walks through the non-overlapping files in the level, opening them // lazily. auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState)); - auto* state = new (mem) LevelFileIteratorState( - cfd_->table_cache(), read_options, soptions, - cfd_->internal_comparator(), - cfd_->internal_stats()->GetFileReadHist(level), - false /* for_compaction */, - cfd_->ioptions()->prefix_extractor != nullptr, IsFilterSkipped(level), - level, nullptr /* range_del_agg */); + auto* state = new (mem) + LevelFileIteratorState(cfd_->table_cache(), read_options, soptions, + cfd_->internal_comparator(), + cfd_->internal_stats()->GetFileReadHist(level), + false /* for_compaction */, + cfd_->ioptions()->prefix_extractor != nullptr, + IsFilterSkipped(level), level, range_del_agg); mem = arena->AllocateAligned(sizeof(LevelFileNumIterator)); auto* first_level_iter = new (mem) LevelFileNumIterator( cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level)); diff --git a/db/version_set.h b/db/version_set.h index 2842ff5dc..0d7b85e8c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -434,11 +434,12 @@ class Version { // yield the contents of this Version when merged together. // REQUIRES: This version has been saved (see VersionSet::SaveTo) void AddIterators(const ReadOptions&, const EnvOptions& soptions, - MergeIteratorBuilder* merger_iter_builder); + MergeIteratorBuilder* merger_iter_builder, + RangeDelAggregator* range_del_agg); void AddIteratorsForLevel(const ReadOptions&, const EnvOptions& soptions, MergeIteratorBuilder* merger_iter_builder, - int level); + int level, RangeDelAggregator* range_del_agg); // Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. 
diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc
index 28978b415..fd1348d59 100644
--- a/tools/ldb_cmd.cc
+++ b/tools/ldb_cmd.cc
@@ -1183,7 +1183,9 @@ void InternalDumpCommand::DoCommand() {
   uint64_t s1=0,s2=0;
   // Setup internal key iterator
   Arena arena;
-  ScopedArenaIterator iter(idb->NewInternalIterator(&arena));
+  RangeDelAggregator range_del_agg(InternalKeyComparator(options_.comparator),
+                                   {} /* snapshots */);
+  ScopedArenaIterator iter(idb->NewInternalIterator(&arena, &range_del_agg));
   Status st = iter->status();
   if (!st.ok()) {
     exec_state_ =
diff --git a/utilities/date_tiered/date_tiered_db_impl.cc b/utilities/date_tiered/date_tiered_db_impl.cc
index 2352aa2e9..ea5e93343 100644
--- a/utilities/date_tiered/date_tiered_db_impl.cc
+++ b/utilities/date_tiered/date_tiered_db_impl.cc
@@ -385,7 +385,8 @@ Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) {
   MergeIteratorBuilder builder(cf_options_.comparator, arena);
   for (auto& item : handle_map_) {
     auto handle = item.second;
-    builder.AddIterator(db_impl->NewInternalIterator(arena, handle));
+    builder.AddIterator(db_impl->NewInternalIterator(
+        arena, db_iter->GetRangeDelAggregator(), handle));
   }
   auto internal_iter = builder.Finish();
   db_iter->SetIterUnderDBIter(internal_iter);
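
Usage sketch (not part of the patch): a minimal illustration of the behavior
this change enables, namely that a user iterator skips keys covered by a range
tombstone. It assumes the public DB::DeleteRange() API from the earlier patches
in this series; the database path, keys, and error handling are illustrative
only.

  #include <cstdio>
  #include <memory>
  #include "rocksdb/db.h"

  int main() {
    rocksdb::DB* db;
    rocksdb::Options options;
    options.create_if_missing = true;
    // Illustrative path; any writable location works.
    rocksdb::Status s =
        rocksdb::DB::Open(options, "/tmp/delete_range_iter_example", &db);
    if (!s.ok()) return 1;

    db->Put(rocksdb::WriteOptions(), "a", "1");
    db->Put(rocksdb::WriteOptions(), "b", "2");
    db->Put(rocksdb::WriteOptions(), "c", "3");
    // Covers ["b", "d"): "b" and "c" are now hidden by a range tombstone.
    db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(), "b",
                    "d");

    // NewInternalIterator() populates DBIter's RangeDelAggregator with
    // memtable/L0 tombstones, and DBIter consults ShouldDelete() while
    // advancing, so only "a" is returned here.
    std::unique_ptr<rocksdb::Iterator> it(
        db->NewIterator(rocksdb::ReadOptions()));
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      std::printf("%s\n", it->key().ToString().c_str());  // prints: a
    }

    delete db;
    return 0;
  }

As noted in the summary, the accumulator is not pruned yet, so seeking back and
forth over the same range re-adds the same tombstones to it.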