DeleteRange user iterator support

Summary:
Note: reviewed in https://reviews.facebook.net/D65115

- DBIter maintains a range tombstone accumulator. We don't clean up obsolete tombstones yet, so if the user seeks back and forth, the same tombstones would be added to the accumulator multiple times.
- DBImpl::NewInternalIterator() (used to make DBIter's underlying iterator) adds memtable/L0 range tombstones; L1+ range tombstones are added on demand during NewSecondaryIterator() (see D62205).
- DBIter uses ShouldDelete() when advancing to check whether keys are covered by range tombstones (a usage sketch follows this list).
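
A minimal usage sketch of the behavior this enables (a sketch only, assuming the public DeleteRange() API from this patch series; the DB path and keys are illustrative):

    #include <cassert>
    #include "rocksdb/db.h"

    int main() {
      rocksdb::DB* db;
      rocksdb::Options options;
      options.create_if_missing = true;
      rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/delrange_demo", &db);
      assert(s.ok());

      db->Put(rocksdb::WriteOptions(), "a", "1");
      db->Put(rocksdb::WriteOptions(), "b", "2");
      db->Put(rocksdb::WriteOptions(), "c", "3");
      // Writes a range tombstone covering ["a", "c"); "c" itself is not covered.
      db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(), "a", "c");

      int visible = 0;
      rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
      for (it->SeekToFirst(); it->Valid(); it->Next()) {
        visible++;  // DBIter consults ShouldDelete() while advancing
      }
      assert(visible == 1);  // only "c" remains visible
      delete it;
      delete db;
      return 0;
    }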
Closes https://github.com/facebook/rocksdb/pull/1464

Differential Revision: D4131753

Pulled By: ajkr

fbshipit-source-id: be86559
Author: Andrew Kryczka (committed by Facebook GitHub Bot)
parent 5c5d01ae74
commit 9e7cf3469b
Files changed (18), with changed-line counts:
  db/compaction_iterator.cc (3)
  db/db_compaction_filter_test.cc (13)
  db/db_impl.cc (57)
  db/db_impl.h (6)
  db/db_impl_readonly.cc (10)
  db/db_iter.cc (97)
  db/db_iter.h (2)
  db/db_test_util.cc (16)
  db/external_sst_file_ingestion_job.cc (8)
  db/memtable_list.cc (18)
  db/memtable_list.h (3)
  db/range_del_aggregator.cc (52)
  db/range_del_aggregator.h (8)
  db/table_cache.cc (2)
  db/version_set.cc (40)
  db/version_set.h (5)
  tools/ldb_cmd.cc (4)
  utilities/date_tiered/date_tiered_db_impl.cc (3)

db/compaction_iterator.cc
@@ -422,8 +422,7 @@ void CompactionIterator::NextFromInput() {
     } else {
       // 1. new user key -OR-
       // 2. different snapshot stripe
-      bool should_delete =
-          range_del_agg_->ShouldDelete(key_, true /* for_compaction */);
+      bool should_delete = range_del_agg_->ShouldDelete(key_);
       if (should_delete) {
         input_->Next();
       } else {

db/db_compaction_filter_test.cc
@@ -261,8 +261,10 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
   int total = 0;
   Arena arena;
   {
+    RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
+                                     {} /* snapshots */);
     ScopedArenaIterator iter(
-        dbfull()->NewInternalIterator(&arena, handles_[1]));
+        dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1]));
     iter->SeekToFirst();
     ASSERT_OK(iter->status());
     while (iter->Valid()) {
@@ -349,8 +351,10 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
   // level Lmax because this record is at the tip
   count = 0;
   {
+    RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
+                                     {} /* snapshots */);
     ScopedArenaIterator iter(
-        dbfull()->NewInternalIterator(&arena, handles_[1]));
+        dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1]));
     iter->SeekToFirst();
     ASSERT_OK(iter->status());
     while (iter->Valid()) {
@@ -566,7 +570,10 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
   int count = 0;
   int total = 0;
   Arena arena;
-  ScopedArenaIterator iter(dbfull()->NewInternalIterator(&arena));
+  RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
+                                   {} /* snapshots */);
+  ScopedArenaIterator iter(
+      dbfull()->NewInternalIterator(&arena, &range_del_agg));
   iter->SeekToFirst();
   ASSERT_OK(iter->status());
   while (iter->Valid()) {

db/db_impl.cc
@@ -2885,7 +2885,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
 }

 InternalIterator* DBImpl::NewInternalIterator(
-    Arena* arena, ColumnFamilyHandle* column_family) {
+    Arena* arena, RangeDelAggregator* range_del_agg,
+    ColumnFamilyHandle* column_family) {
   ColumnFamilyData* cfd;
   if (column_family == nullptr) {
     cfd = default_cf_handle_->cfd();
@@ -2898,7 +2899,8 @@ InternalIterator* DBImpl::NewInternalIterator(
   SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
   mutex_.Unlock();
   ReadOptions roptions;
-  return NewInternalIterator(roptions, cfd, super_version, arena);
+  return NewInternalIterator(roptions, cfd, super_version, arena,
+                             range_del_agg);
 }

 Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
@@ -3852,12 +3854,13 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
 }
 }  // namespace

-InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
-                                              ColumnFamilyData* cfd,
-                                              SuperVersion* super_version,
-                                              Arena* arena) {
+InternalIterator* DBImpl::NewInternalIterator(
+    const ReadOptions& read_options, ColumnFamilyData* cfd,
+    SuperVersion* super_version, Arena* arena,
+    RangeDelAggregator* range_del_agg) {
   InternalIterator* internal_iter;
   assert(arena != nullptr);
+  assert(range_del_agg != nullptr);
   // Need to create internal iterator from the arena.
   MergeIteratorBuilder merge_iter_builder(
       &cfd->internal_comparator(), arena,
@@ -3866,18 +3869,34 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
   // Collect iterator for mutable mem
   merge_iter_builder.AddIterator(
       super_version->mem->NewIterator(read_options, arena));
+  ScopedArenaIterator range_del_iter;
+  Status s;
+  if (!read_options.ignore_range_deletions) {
+    range_del_iter.set(
+        super_version->mem->NewRangeTombstoneIterator(read_options, arena));
+    s = range_del_agg->AddTombstones(std::move(range_del_iter));
+  }
   // Collect all needed child iterators for immutable memtables
-  super_version->imm->AddIterators(read_options, &merge_iter_builder);
-  // Collect iterators for files in L0 - Ln
-  super_version->current->AddIterators(read_options, env_options_,
-                                       &merge_iter_builder);
-  internal_iter = merge_iter_builder.Finish();
-  IterState* cleanup =
-      new IterState(this, &mutex_, super_version,
-                    read_options.background_purge_on_iterator_cleanup);
-  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
-  return internal_iter;
+  if (s.ok()) {
+    super_version->imm->AddIterators(read_options, &merge_iter_builder);
+    if (!read_options.ignore_range_deletions) {
+      s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
+                                                         range_del_agg);
+    }
+  }
+  if (s.ok()) {
+    // Collect iterators for files in L0 - Ln
+    super_version->current->AddIterators(read_options, env_options_,
+                                         &merge_iter_builder, range_del_agg);
+    internal_iter = merge_iter_builder.Finish();
+    IterState* cleanup =
+        new IterState(this, &mutex_, super_version,
+                      read_options.background_purge_on_iterator_cleanup);
+    internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
+    return internal_iter;
+  }
+  return NewErrorInternalIterator(s);
 }

 ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
@@ -4419,7 +4438,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
         read_options.total_order_seek);
     InternalIterator* internal_iter =
-        NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
+        NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
+                            db_iter->GetRangeDelAggregator());
     db_iter->SetIterUnderDBIter(internal_iter);

     return db_iter;
@@ -4492,7 +4512,8 @@ Status DBImpl::NewIterators(
           sv->mutable_cf_options.max_sequential_skip_in_iterations,
           sv->version_number, nullptr, false, read_options.pin_data);
       InternalIterator* internal_iter =
-          NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
+          NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
+                              db_iter->GetRangeDelAggregator());
       db_iter->SetIterUnderDBIter(internal_iter);
       iterators->push_back(db_iter);
     }

db/db_impl.h
@@ -290,7 +290,8 @@ class DBImpl : public DB {
   // The keys of this iterator are internal keys (see format.h).
   // The returned iterator should be deleted when no longer needed.
   InternalIterator* NewInternalIterator(
-      Arena* arena, ColumnFamilyHandle* column_family = nullptr);
+      Arena* arena, RangeDelAggregator* range_del_agg,
+      ColumnFamilyHandle* column_family = nullptr);

 #ifndef NDEBUG
   // Extra methods (for testing) that are not in the public DB interface
@@ -525,7 +526,8 @@ class DBImpl : public DB {
   InternalIterator* NewInternalIterator(const ReadOptions&,
                                         ColumnFamilyData* cfd,
                                         SuperVersion* super_version,
-                                        Arena* arena);
+                                        Arena* arena,
+                                        RangeDelAggregator* range_del_agg);

   // Except in DB::Open(), WriteOptionsFile can only be called when:
   // 1. WriteThread::Writer::EnterUnbatched() is used.

db/db_impl_readonly.cc
@@ -64,8 +64,9 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
                        : latest_snapshot),
       super_version->mutable_cf_options.max_sequential_skip_in_iterations,
       super_version->version_number);
-  auto internal_iter = NewInternalIterator(
-      read_options, cfd, super_version, db_iter->GetArena());
+  auto internal_iter =
+      NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
+                          db_iter->GetRangeDelAggregator());
   db_iter->SetIterUnderDBIter(internal_iter);
   return db_iter;
 }
@@ -92,8 +93,9 @@ Status DBImplReadOnly::NewIterators(
                          : latest_snapshot),
         sv->mutable_cf_options.max_sequential_skip_in_iterations,
         sv->version_number);
-    auto* internal_iter = NewInternalIterator(
-        read_options, cfd, sv, db_iter->GetArena());
+    auto* internal_iter =
+        NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
+                            db_iter->GetRangeDelAggregator());
     db_iter->SetIterUnderDBIter(internal_iter);
     iterators->push_back(db_iter);
   }

db/db_iter.cc
@@ -122,7 +122,8 @@ class DBIter: public Iterator {
         iterate_upper_bound_(iterate_upper_bound),
         prefix_same_as_start_(prefix_same_as_start),
         pin_thru_lifetime_(pin_data),
-        total_order_seek_(total_order_seek) {
+        total_order_seek_(total_order_seek),
+        range_del_agg_(InternalKeyComparator(cmp), {s}) {
     RecordTick(statistics_, NO_ITERATORS);
     prefix_extractor_ = ioptions.prefix_extractor;
     max_skip_ = max_sequential_skip_in_iterations;
@@ -151,6 +152,10 @@ class DBIter: public Iterator {
     iter_ = iter;
     iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
   }
+
+  virtual RangeDelAggregator* GetRangeDelAggregator() {
+    return &range_del_agg_;
+  }

   virtual bool Valid() const override { return valid_; }
   virtual Slice key() const override {
     assert(valid_);
@@ -273,6 +278,7 @@ class DBIter: public Iterator {
   const bool total_order_seek_;
   // List of operands for merge operator.
   MergeContext merge_context_;
+  RangeDelAggregator range_del_agg_;
   LocalStatistics local_stats_;
   PinnedIteratorsManager pinned_iters_mgr_;
@@ -384,20 +390,39 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
             PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
             break;
           case kTypeValue:
-            valid_ = true;
             saved_key_.SetKey(
                 ikey.user_key,
                 !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
-            return;
+            if (range_del_agg_.ShouldDelete(ikey)) {
+              // Arrange to skip all upcoming entries for this key since
+              // they are hidden by this deletion.
+              skipping = true;
+              num_skipped = 0;
+              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
+            } else {
+              valid_ = true;
+              return;
+            }
+            break;
           case kTypeMerge:
-            // By now, we are sure the current ikey is going to yield a value
             saved_key_.SetKey(
                 ikey.user_key,
                 !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
-            current_entry_is_merged_ = true;
-            valid_ = true;
-            MergeValuesNewToOld();  // Go to a different state machine
-            return;
+            if (range_del_agg_.ShouldDelete(ikey)) {
+              // Arrange to skip all upcoming entries for this key since
+              // they are hidden by this deletion.
+              skipping = true;
+              num_skipped = 0;
+              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
+            } else {
+              // By now, we are sure the current ikey is going to yield a
+              // value
+              current_entry_is_merged_ = true;
+              valid_ = true;
+              MergeValuesNewToOld();  // Go to a different state machine
+              return;
+            }
+            break;
           default:
             assert(false);
             break;
@@ -456,7 +481,8 @@ void DBIter::MergeValuesNewToOld() {
     if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
       // hit the next user key, stop right here
       break;
-    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) {
+    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
+               range_del_agg_.ShouldDelete(ikey)) {
       // hit a delete with the same user key, stop right here
       // iter_ is positioned after delete
       iter_->Next();
@@ -624,10 +650,15 @@ bool DBIter::FindValueForCurrentKey() {
     last_key_entry_type = ikey.type;
     switch (last_key_entry_type) {
       case kTypeValue:
+        if (range_del_agg_.ShouldDelete(ikey)) {
+          last_key_entry_type = kTypeRangeDeletion;
+          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
+        } else {
+          assert(iter_->IsValuePinned());
+          pinned_value_ = iter_->value();
+        }
         merge_context_.Clear();
-        assert(iter_->IsValuePinned());
-        pinned_value_ = iter_->value();
-        last_not_merge_type = kTypeValue;
+        last_not_merge_type = last_key_entry_type;
         break;
       case kTypeDeletion:
       case kTypeSingleDeletion:
@@ -636,9 +667,16 @@ bool DBIter::FindValueForCurrentKey() {
         PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
         break;
       case kTypeMerge:
-        assert(merge_operator_ != nullptr);
-        merge_context_.PushOperandBack(
-            iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
+        if (range_del_agg_.ShouldDelete(ikey)) {
+          merge_context_.Clear();
+          last_key_entry_type = kTypeRangeDeletion;
+          last_not_merge_type = last_key_entry_type;
+          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
+        } else {
+          assert(merge_operator_ != nullptr);
+          merge_context_.PushOperandBack(
+              iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
+        }
         break;
       default:
         assert(false);
@@ -654,12 +692,14 @@ bool DBIter::FindValueForCurrentKey() {
   switch (last_key_entry_type) {
     case kTypeDeletion:
     case kTypeSingleDeletion:
+    case kTypeRangeDeletion:
       valid_ = false;
       return false;
     case kTypeMerge:
       current_entry_is_merged_ = true;
       if (last_not_merge_type == kTypeDeletion ||
-          last_not_merge_type == kTypeSingleDeletion) {
+          last_not_merge_type == kTypeSingleDeletion ||
+          last_not_merge_type == kTypeRangeDeletion) {
         MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                     nullptr, merge_context_.GetOperands(),
                                     &saved_value_, logger_, statistics_, env_,
@@ -699,17 +739,17 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   ParsedInternalKey ikey;
   FindParseableKey(&ikey, kForward);
-  if (ikey.type == kTypeValue || ikey.type == kTypeDeletion ||
-      ikey.type == kTypeSingleDeletion) {
-    if (ikey.type == kTypeValue) {
-      assert(iter_->IsValuePinned());
-      pinned_value_ = iter_->value();
-      valid_ = true;
-      return true;
-    }
+  if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
+      range_del_agg_.ShouldDelete(ikey)) {
     valid_ = false;
     return false;
   }
+  if (ikey.type == kTypeValue) {
+    assert(iter_->IsValuePinned());
+    pinned_value_ = iter_->value();
+    valid_ = true;
+    return true;
+  }

   // kTypeMerge. We need to collect all kTypeMerge values and save them
   // in operands
@@ -717,7 +757,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   merge_context_.Clear();
   while (iter_->Valid() &&
          user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) &&
-         ikey.type == kTypeMerge) {
+         ikey.type == kTypeMerge && !range_del_agg_.ShouldDelete(ikey)) {
     merge_context_.PushOperand(iter_->value(),
                                iter_->IsValuePinned() /* operand_pinned */);
     iter_->Next();
@@ -726,7 +766,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   if (!iter_->Valid() ||
       !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
-      ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
+      ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
+      range_del_agg_.ShouldDelete(ikey)) {
     MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
                                 merge_context_.GetOperands(), &saved_value_,
                                 logger_, statistics_, env_, &pinned_value_);
@@ -972,6 +1013,10 @@ ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }

 void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; }

+RangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() {
+  return db_iter_->GetRangeDelAggregator();
+}
+
 void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) {
   static_cast<DBIter*>(db_iter_)->SetIter(iter);
 }

db/db_iter.h
@@ -11,6 +11,7 @@
 #include <stdint.h>
 #include <string>
 #include "db/dbformat.h"
+#include "db/range_del_aggregator.h"
 #include "rocksdb/db.h"
 #include "rocksdb/iterator.h"
 #include "util/arena.h"
@@ -46,6 +47,7 @@ class ArenaWrappedDBIter : public Iterator {
   // Get the arena to be used to allocate memory for DBIter to be wrapped,
   // as well as child iterators in it.
   virtual Arena* GetArena() { return &arena_; }
+  virtual RangeDelAggregator* GetRangeDelAggregator();

   // Set the DB Iterator to be wrapped

db/db_test_util.cc
@@ -585,11 +585,15 @@ std::string DBTestBase::Contents(int cf) {
 std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
   Arena arena;
   auto options = CurrentOptions();
+  RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
+                                   {} /* snapshots */);
   ScopedArenaIterator iter;
   if (cf == 0) {
-    iter.set(dbfull()->NewInternalIterator(&arena));
+    iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg));
   } else {
-    iter.set(dbfull()->NewInternalIterator(&arena, handles_[cf]));
+    iter.set(
+        dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[cf]));
   }
   InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
   iter->Seek(target.Encode());
@@ -990,10 +994,14 @@ UpdateStatus DBTestBase::updateInPlaceNoAction(char* prevValue,
 void DBTestBase::validateNumberOfEntries(int numValues, int cf) {
   ScopedArenaIterator iter;
   Arena arena;
+  auto options = CurrentOptions();
+  RangeDelAggregator range_del_agg(InternalKeyComparator(options.comparator),
+                                   {} /* snapshots */);
   if (cf != 0) {
-    iter.set(dbfull()->NewInternalIterator(&arena, handles_[cf]));
+    iter.set(
+        dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[cf]));
   } else {
-    iter.set(dbfull()->NewInternalIterator(&arena));
+    iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg));
   }
   iter->SeekToFirst();
   ASSERT_EQ(iter->status().ok(), true);

db/external_sst_file_ingestion_job.cc
@@ -372,8 +372,14 @@ Status ExternalSstFileIngestionJob::AssignLevelForIngestedFile(
     bool overlap_with_level = false;
     MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(),
                                             &arena);
+    RangeDelAggregator range_del_agg(cfd_->internal_comparator(),
+                                     {} /* snapshots */);
     sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder,
-                                      lvl);
+                                      lvl, &range_del_agg);
+    if (!range_del_agg.IsEmpty()) {
+      return Status::NotSupported(
+          "file ingestion with range tombstones is currently unsupported");
+    }
     ScopedArenaIterator level_iter(merge_iter_builder.Finish());
     status = IngestedFileOverlapWithIteratorRange(

db/memtable_list.cc
@@ -144,10 +144,28 @@ bool MemTableListVersion::GetFromList(std::list<MemTable*>* list,
       assert(*seq != kMaxSequenceNumber);
       return true;
     }
+    if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {
+      return false;
+    }
   }
   return false;
 }

+Status MemTableListVersion::AddRangeTombstoneIterators(
+    const ReadOptions& read_opts, Arena* arena,
+    RangeDelAggregator* range_del_agg) {
+  assert(range_del_agg != nullptr);
+  for (auto& m : memlist_) {
+    ScopedArenaIterator range_del_iter(
+        m->NewRangeTombstoneIterator(read_opts, arena));
+    Status s = range_del_agg->AddTombstones(std::move(range_del_iter));
+    if (!s.ok()) {
+      return s;
+    }
+  }
+  return Status::OK();
+}
+
 void MemTableListVersion::AddIterators(
     const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
     Arena* arena) {

db/memtable_list.h
@@ -81,6 +81,9 @@ class MemTableListVersion {
                      read_opts);
   }

+  Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
+                                    RangeDelAggregator* range_del_agg);
+
   void AddIterators(const ReadOptions& options,
                     std::vector<InternalIterator*>* iterator_list,
                     Arena* arena);

db/range_del_aggregator.cc
@@ -23,39 +23,29 @@ RangeDelAggregator::RangeDelAggregator(
   stripe_map_.emplace(kMaxSequenceNumber, TombstoneMap());
 }

-bool RangeDelAggregator::ShouldDelete(const Slice& internal_key,
-                                      bool for_compaction /* = false */) {
+bool RangeDelAggregator::ShouldDelete(const Slice& internal_key) {
   ParsedInternalKey parsed;
   if (!ParseInternalKey(internal_key, &parsed)) {
     assert(false);
   }
-  return ShouldDelete(parsed, for_compaction);
+  return ShouldDelete(parsed);
 }

-bool RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed,
-                                      bool for_compaction /* = false */) {
+bool RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed) {
   assert(IsValueType(parsed.type));
-  // Starting point is the snapshot stripe in which the key lives, then need to
-  // search all earlier stripes too, unless it's for compaction.
-  for (auto stripe_map_iter = GetStripeMapIter(parsed.sequence);
-       stripe_map_iter != stripe_map_.end(); ++stripe_map_iter) {
-    const auto& tombstone_map = stripe_map_iter->second;
-    for (const auto& start_key_and_tombstone : tombstone_map) {
-      const auto& tombstone = start_key_and_tombstone.second;
-      if (icmp_.user_comparator()->Compare(parsed.user_key,
-                                           tombstone.start_key_) < 0) {
-        break;
-      }
-      if (parsed.sequence < tombstone.seq_ &&
-          icmp_.user_comparator()->Compare(parsed.user_key,
-                                           tombstone.end_key_) <= 0) {
-        return true;
-      }
-    }
-    if (for_compaction) {
-      break;
-    }
-  }
+  const auto& tombstone_map = GetTombstoneMap(parsed.sequence);
+  for (const auto& start_key_and_tombstone : tombstone_map) {
+    const auto& tombstone = start_key_and_tombstone.second;
+    if (icmp_.user_comparator()->Compare(parsed.user_key,
+                                         tombstone.start_key_) < 0) {
+      break;
+    }
+    if (parsed.sequence < tombstone.seq_ &&
+        icmp_.user_comparator()->Compare(parsed.user_key, tombstone.end_key_) <=
+            0) {
+      return true;
+    }
+  }
   return false;
 }
@@ -96,7 +86,7 @@ Status RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) {
       return Status::Corruption("Unable to parse range tombstone InternalKey");
     }
     RangeTombstone tombstone(parsed_key, input->value());
-    auto& tombstone_map = GetStripeMapIter(tombstone.seq_)->second;
+    auto& tombstone_map = GetTombstoneMap(tombstone.seq_);
     tombstone_map.emplace(tombstone.start_key_.ToString(),
                           std::move(tombstone));
     input->Next();
@@ -104,7 +94,7 @@ Status RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) {
   return Status::OK();
 }

-RangeDelAggregator::StripeMap::iterator RangeDelAggregator::GetStripeMapIter(
+RangeDelAggregator::TombstoneMap& RangeDelAggregator::GetTombstoneMap(
     SequenceNumber seq) {
   // The stripe includes seqnum for the snapshot above and excludes seqnum for
   // the snapshot below.
@@ -117,7 +107,7 @@ RangeDelAggregator::StripeMap::iterator RangeDelAggregator::GetStripeMapIter(
   }
   // catch-all stripe justifies this assertion in either of above cases
   assert(iter != stripe_map_.end());
-  return iter;
+  return iter->second;
 }

 // TODO(andrewkr): We should implement an iterator over range tombstones in our
@@ -202,4 +192,14 @@ void RangeDelAggregator::AddToBuilder(TableBuilder* builder,
   }
 }

+bool RangeDelAggregator::IsEmpty() {
+  for (auto stripe_map_iter = stripe_map_.begin();
+       stripe_map_iter != stripe_map_.end(); ++stripe_map_iter) {
+    if (!stripe_map_iter->second.empty()) {
+      return false;
+    }
+  }
+  return true;
+}
+
 }  // namespace rocksdb
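
For intuition, a small standalone sketch (not RocksDB code; all names here are illustrative) of the stripe lookup that the rewritten ShouldDelete() performs: tombstones are bucketed into snapshot stripes keyed by each stripe's upper sequence-number bound, and a key consults only the single stripe that its own sequence number falls into.

    #include <cstdint>
    #include <map>
    #include <string>

    struct Tombstone {
      std::string start_key, end_key;  // covered user-key range
      uint64_t seq;                    // tombstone's sequence number
    };
    // Within one stripe: tombstone start key -> tombstone.
    using TombstoneMap = std::map<std::string, Tombstone>;
    // Stripe upper bound (snapshot seqnum; a max-seqnum catch-all stripe is
    // assumed to always be present) -> that stripe's tombstones.
    using StripeMap = std::map<uint64_t, TombstoneMap>;

    bool ShouldDeleteSketch(const StripeMap& stripes, const std::string& user_key,
                            uint64_t key_seq) {
      // lower_bound picks the first stripe whose upper bound >= key_seq,
      // i.e., the stripe the key lives in (mirrors GetTombstoneMap()).
      auto stripe = stripes.lower_bound(key_seq);
      if (stripe == stripes.end()) return false;
      for (const auto& kv : stripe->second) {
        const Tombstone& t = kv.second;
        if (user_key < t.start_key) break;  // map is sorted by start key
        // A newer tombstone covering the key hides it (end key inclusive,
        // matching the comparison in the hunk above).
        if (key_seq < t.seq && user_key <= t.end_key) return true;
      }
      return false;
    }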

db/range_del_aggregator.h
@@ -42,9 +42,8 @@ class RangeDelAggregator {
   // Returns whether the key should be deleted, which is the case when it is
   // covered by a range tombstone residing in the same snapshot stripe.
-  bool ShouldDelete(const ParsedInternalKey& parsed,
-                    bool for_compaction = false);
-  bool ShouldDelete(const Slice& internal_key, bool for_compaction = false);
+  bool ShouldDelete(const ParsedInternalKey& parsed);
+  bool ShouldDelete(const Slice& internal_key);
   bool ShouldAddTombstones(bool bottommost_level = false);

   // Adds tombstones to the tombstone aggregation structure maintained by this
@@ -72,6 +71,7 @@ class RangeDelAggregator {
                     const Slice* next_table_min_key, FileMetaData* meta,
                     bool bottommost_level = false);
   Arena* GetArena() { return &arena_; }
+  bool IsEmpty();

  private:
   // Maps tombstone start key -> tombstone object
@@ -82,7 +82,7 @@ class RangeDelAggregator {
   typedef std::map<SequenceNumber, TombstoneMap> StripeMap;

   Status AddTombstones(InternalIterator* input, bool arena);
-  StripeMap::iterator GetStripeMapIter(SequenceNumber seq);
+  TombstoneMap& GetTombstoneMap(SequenceNumber seq);

   PinnedIteratorsManager pinned_iters_mgr_;
   StripeMap stripe_map_;

db/table_cache.cc
@@ -223,7 +223,7 @@ InternalIterator* TableCache::NewIterator(
     }
   }

-  if (range_del_agg != nullptr) {
+  if (range_del_agg != nullptr && !options.ignore_range_deletions) {
    std::unique_ptr<InternalIterator> iter(
        table_reader->NewRangeTombstoneIterator(options));
    Status s = range_del_agg->AddTombstones(std::move(iter));

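An aside on the ignore_range_deletions check above: it is an existing ReadOptions field, and with this patch it short-circuits tombstone collection on the read path. A hedged usage fragment (assuming db is an open rocksdb::DB* and that <memory> and "rocksdb/db.h" are included):

    // Opting out of range-tombstone processing: keys covered by DeleteRange may
    // resurface, in exchange for skipping the AddTombstones() work shown above.
    rocksdb::ReadOptions ro;
    ro.ignore_range_deletions = true;
    std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
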
db/version_set.cc
@@ -514,15 +514,14 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
     if (meta_handle.size() != sizeof(FileDescriptor)) {
       return NewErrorInternalIterator(
           Status::Corruption("FileReader invoked with unexpected value"));
-    } else {
-      const FileDescriptor* fd =
-          reinterpret_cast<const FileDescriptor*>(meta_handle.data());
-      return table_cache_->NewIterator(
-          read_options_, env_options_, icomparator_, *fd,
-          nullptr /* don't need reference to table*/, file_read_hist_,
-          for_compaction_, nullptr /* arena */, skip_filters_, level_,
-          range_del_agg_, false /* is_range_del_only */);
-    }
+    }
+    const FileDescriptor* fd =
+        reinterpret_cast<const FileDescriptor*>(meta_handle.data());
+    return table_cache_->NewIterator(
+        read_options_, env_options_, icomparator_, *fd,
+        nullptr /* don't need reference to table */, file_read_hist_,
+        for_compaction_, nullptr /* arena */, skip_filters_, level_,
+        range_del_agg_, false /* is_range_del_only */);
   }

   bool PrefixMayMatch(const Slice& internal_key) override {
@@ -805,18 +804,21 @@ double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
 void Version::AddIterators(const ReadOptions& read_options,
                            const EnvOptions& soptions,
-                           MergeIteratorBuilder* merge_iter_builder) {
+                           MergeIteratorBuilder* merge_iter_builder,
+                           RangeDelAggregator* range_del_agg) {
   assert(storage_info_.finalized_);

   for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
-    AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level);
+    AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
+                         range_del_agg);
   }
 }

 void Version::AddIteratorsForLevel(const ReadOptions& read_options,
                                    const EnvOptions& soptions,
                                    MergeIteratorBuilder* merge_iter_builder,
-                                   int level) {
+                                   int level,
+                                   RangeDelAggregator* range_del_agg) {
   assert(storage_info_.finalized_);

   if (level >= storage_info_.num_non_empty_levels()) {
     // This is an empty level
@@ -834,20 +836,20 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
       merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
           read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr,
           cfd_->internal_stats()->GetFileReadHist(0), false, arena,
-          false /* skip_filters */, 0 /* level */));
+          false /* skip_filters */, 0 /* level */, range_del_agg));
     }
   } else {
     // For levels > 0, we can use a concatenating iterator that sequentially
     // walks through the non-overlapping files in the level, opening them
     // lazily.
     auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState));
-    auto* state = new (mem) LevelFileIteratorState(
-        cfd_->table_cache(), read_options, soptions,
-        cfd_->internal_comparator(),
-        cfd_->internal_stats()->GetFileReadHist(level),
-        false /* for_compaction */,
-        cfd_->ioptions()->prefix_extractor != nullptr, IsFilterSkipped(level),
-        level, nullptr /* range_del_agg */);
+    auto* state = new (mem)
+        LevelFileIteratorState(cfd_->table_cache(), read_options, soptions,
+                               cfd_->internal_comparator(),
+                               cfd_->internal_stats()->GetFileReadHist(level),
+                               false /* for_compaction */,
+                               cfd_->ioptions()->prefix_extractor != nullptr,
+                               IsFilterSkipped(level), level, range_del_agg);
     mem = arena->AllocateAligned(sizeof(LevelFileNumIterator));
     auto* first_level_iter = new (mem) LevelFileNumIterator(
         cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level));

db/version_set.h
@@ -434,11 +434,12 @@ class Version {
   // yield the contents of this Version when merged together.
   // REQUIRES: This version has been saved (see VersionSet::SaveTo)
   void AddIterators(const ReadOptions&, const EnvOptions& soptions,
-                    MergeIteratorBuilder* merger_iter_builder);
+                    MergeIteratorBuilder* merger_iter_builder,
+                    RangeDelAggregator* range_del_agg);

   void AddIteratorsForLevel(const ReadOptions&, const EnvOptions& soptions,
                             MergeIteratorBuilder* merger_iter_builder,
-                            int level);
+                            int level, RangeDelAggregator* range_del_agg);

   // Lookup the value for key. If found, store it in *val and
   // return OK. Else return a non-OK status.

tools/ldb_cmd.cc
@@ -1183,7 +1183,9 @@ void InternalDumpCommand::DoCommand() {
   uint64_t s1=0,s2=0;

   // Setup internal key iterator
   Arena arena;
-  ScopedArenaIterator iter(idb->NewInternalIterator(&arena));
+  RangeDelAggregator range_del_agg(InternalKeyComparator(options_.comparator),
+                                   {} /* snapshots */);
+  ScopedArenaIterator iter(idb->NewInternalIterator(&arena, &range_del_agg));
   Status st = iter->status();
   if (!st.ok()) {
     exec_state_ =

utilities/date_tiered/date_tiered_db_impl.cc
@@ -385,7 +385,8 @@ Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) {
   MergeIteratorBuilder builder(cf_options_.comparator, arena);
   for (auto& item : handle_map_) {
     auto handle = item.second;
-    builder.AddIterator(db_impl->NewInternalIterator(arena, handle));
+    builder.AddIterator(db_impl->NewInternalIterator(
+        arena, db_iter->GetRangeDelAggregator(), handle));
   }
   auto internal_iter = builder.Finish();
   db_iter->SetIterUnderDBIter(internal_iter);
