Introduce RangeDelAggregatorV2 (#4649)

Summary:
The old RangeDelAggregator did expensive pre-processing work
to create a collapsed, binary-searchable representation of range
tombstones. With FragmentedRangeTombstoneIterator, much of this work is
now unnecessary. RangeDelAggregatorV2 takes advantage of this by seeking
in each iterator to find a covering tombstone in ShouldDelete, while
doing minimal work in AddTombstones. The old RangeDelAggregator is still
used during flush/compaction for now, though RangeDelAggregatorV2 will
support those uses in a future PR.
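
To make the lookup concrete, the following is a minimal standalone sketch (simplified types and names, not code from this PR) of the idea: each source contributes a sorted, non-overlapping list of tombstone fragments, and a point lookup binary-searches each list for a fragment that spans the key with a newer sequence number. Truncation at file boundaries and the visibility upper bound are omitted here.

#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for a fragmented range tombstone.
struct Fragment {
  std::string start;  // inclusive user key
  std::string end;    // exclusive user key
  uint64_t seq;       // tombstone sequence number
};

// One sorted, non-overlapping fragment list per source (memtable or SST file).
// A key is considered deleted if any source holds a fragment that spans it
// with a sequence number newer than the key's own.
bool ShouldDeleteSketch(const std::vector<std::vector<Fragment>>& sources,
                        const std::string& user_key, uint64_t key_seq) {
  for (const auto& frags : sources) {
    // Fragments are sorted and non-overlapping, so the first fragment whose
    // end is past user_key is the only candidate that could cover it.
    auto it = std::upper_bound(
        frags.begin(), frags.end(), user_key,
        [](const std::string& k, const Fragment& f) { return k < f.end; });
    if (it != frags.end() && it->start <= user_key && it->seq > key_seq) {
      return true;
    }
  }
  return false;
}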
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4649

Differential Revision: D13146964

Pulled By: abhimadan

fbshipit-source-id: be29a4c020fc440500c137216fcc1cf529571eb3
Branch: main
Author: Abhishek Madan (committed by Facebook Github Bot)
Parent: ed5aec5ba3
Commit: 457f77b9ff
27 files changed (changed line counts in parentheses):
  1. CMakeLists.txt (2)
  2. Makefile (4)
  3. TARGETS (6)
  4. db/column_family.cc (9)
  5. db/compaction_job.cc (26)
  6. db/db_compaction_filter_test.cc (11)
  7. db/db_impl.cc (6)
  8. db/db_impl.h (5)
  9. db/db_iter.cc (9)
  10. db/db_iter.h (4)
  11. db/db_test_util.cc (9)
  12. db/forward_iterator.cc (17)
  13. db/memtable_list.cc (7)
  14. db/memtable_list.h (4)
  15. db/range_del_aggregator_bench.cc (55)
  16. db/range_del_aggregator_v2.cc (195)
  17. db/range_del_aggregator_v2.h (134)
  18. db/range_del_aggregator_v2_test.cc (469)
  19. db/range_tombstone_fragmenter.cc (102)
  20. db/range_tombstone_fragmenter.h (35)
  21. db/range_tombstone_fragmenter_test.cc (36)
  22. db/table_cache.cc (11)
  23. db/table_cache.h (4)
  24. db/version_set.cc (28)
  25. db/version_set.h (8)
  26. src.mk (2)
  27. utilities/debug.cc (3)

@ -503,6 +503,7 @@ set(SOURCES
db/merge_helper.cc db/merge_helper.cc
db/merge_operator.cc db/merge_operator.cc
db/range_del_aggregator.cc db/range_del_aggregator.cc
db/range_del_aggregator_v2.cc
db/range_tombstone_fragmenter.cc db/range_tombstone_fragmenter.cc
db/repair.cc db/repair.cc
db/snapshot_impl.cc db/snapshot_impl.cc
@ -904,6 +905,7 @@ if(WITH_TESTS)
db/plain_table_db_test.cc db/plain_table_db_test.cc
db/prefix_test.cc db/prefix_test.cc
db/range_del_aggregator_test.cc db/range_del_aggregator_test.cc
db/range_del_aggregator_v2_test.cc
db/range_tombstone_fragmenter_test.cc db/range_tombstone_fragmenter_test.cc
db/repair_test.cc db/repair_test.cc
db/table_properties_collector_test.cc db/table_properties_collector_test.cc

@ -554,6 +554,7 @@ TESTS = \
trace_analyzer_test \ trace_analyzer_test \
repeatable_thread_test \ repeatable_thread_test \
range_tombstone_fragmenter_test \ range_tombstone_fragmenter_test \
range_del_aggregator_v2_test \
PARALLEL_TEST = \ PARALLEL_TEST = \
backupable_db_test \ backupable_db_test \
@ -1586,6 +1587,9 @@ repeatable_thread_test: util/repeatable_thread_test.o $(LIBOBJECTS) $(TESTHARNES
range_tombstone_fragmenter_test: db/range_tombstone_fragmenter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) range_tombstone_fragmenter_test: db/range_tombstone_fragmenter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)
range_del_aggregator_v2_test: db/range_del_aggregator_v2_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
#------------------------------------------------- #-------------------------------------------------
# make install related stuff # make install related stuff
INSTALL_PATH ?= /usr/local INSTALL_PATH ?= /usr/local

@ -123,6 +123,7 @@ cpp_library(
"db/merge_helper.cc", "db/merge_helper.cc",
"db/merge_operator.cc", "db/merge_operator.cc",
"db/range_del_aggregator.cc", "db/range_del_aggregator.cc",
"db/range_del_aggregator_v2.cc",
"db/range_tombstone_fragmenter.cc", "db/range_tombstone_fragmenter.cc",
"db/repair.cc", "db/repair.cc",
"db/snapshot_impl.cc", "db/snapshot_impl.cc",
@ -932,6 +933,11 @@ ROCKS_TESTS = [
"db/range_del_aggregator_test.cc", "db/range_del_aggregator_test.cc",
"serial", "serial",
], ],
[
"range_del_aggregator_v2_test",
"db/range_del_aggregator_v2_test.cc",
"serial",
],
[ [
"range_tombstone_fragmenter_test", "range_tombstone_fragmenter_test",
"db/range_tombstone_fragmenter_test.cc", "db/range_tombstone_fragmenter_test.cc",

@ -24,6 +24,7 @@
#include "db/db_impl.h" #include "db/db_impl.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/job_context.h" #include "db/job_context.h"
#include "db/range_del_aggregator_v2.h"
#include "db/table_properties_collector.h" #include "db/table_properties_collector.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "db/write_controller.h" #include "db/write_controller.h"
@ -950,9 +951,8 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
} }
super_version->imm->AddRangeTombstoneIterators(read_opts, super_version->imm->AddRangeTombstoneIterators(read_opts,
&memtable_range_del_iters); &memtable_range_del_iters);
RangeDelAggregator range_del_agg(internal_comparator_, {} /* snapshots */, RangeDelAggregatorV2 range_del_agg(&internal_comparator_,
false /* collapse_deletions */); kMaxSequenceNumber /* upper_bound */);
Status status;
{ {
std::unique_ptr<InternalIterator> memtable_range_del_iter( std::unique_ptr<InternalIterator> memtable_range_del_iter(
NewMergingIterator(&internal_comparator_, NewMergingIterator(&internal_comparator_,
@ -960,8 +960,9 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
? nullptr ? nullptr
: &memtable_range_del_iters[0], : &memtable_range_del_iters[0],
static_cast<int>(memtable_range_del_iters.size()))); static_cast<int>(memtable_range_del_iters.size())));
status = range_del_agg.AddTombstones(std::move(memtable_range_del_iter)); range_del_agg.AddUnfragmentedTombstones(std::move(memtable_range_del_iter));
} }
Status status;
for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) { for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
auto* vstorage = super_version->current->storage_info(); auto* vstorage = super_version->current->storage_info();
auto* ucmp = vstorage->InternalComparator()->user_comparator(); auto* ucmp = vstorage->InternalComparator()->user_comparator();

@ -36,6 +36,7 @@
#include "db/memtable_list.h" #include "db/memtable_list.h"
#include "db/merge_context.h" #include "db/merge_context.h"
#include "db/merge_helper.h" #include "db/merge_helper.h"
#include "db/range_del_aggregator_v2.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "monitoring/iostats_context_imp.h" #include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
@ -804,10 +805,15 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact != nullptr); assert(sub_compact != nullptr);
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
std::unique_ptr<RangeDelAggregator> range_del_agg( RangeDelAggregatorV2 range_del_agg_v2(&cfd->internal_comparator(),
new RangeDelAggregator(cfd->internal_comparator(), existing_snapshots_)); kMaxSequenceNumber /* upper_bound */);
auto* range_del_agg =
range_del_agg_v2.DelegateToRangeDelAggregator(existing_snapshots_);
// Although the v2 aggregator is what the level iterator(s) know about,
// the AddTombstones calls will be propagated down to the v1 aggregator.
std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator( std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
sub_compact->compaction, range_del_agg.get(), env_optiosn_for_read_)); sub_compact->compaction, &range_del_agg_v2, env_optiosn_for_read_));
AutoThreadOperationStageUpdater stage_updater( AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV); ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
@ -896,8 +902,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_, &existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false, snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
range_del_agg.get(), sub_compact->compaction, compaction_filter, range_del_agg, sub_compact->compaction, compaction_filter, shutting_down_,
shutting_down_, preserve_deletes_seqnum_)); preserve_deletes_seqnum_));
auto c_iter = sub_compact->c_iter.get(); auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst(); c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
@ -1034,9 +1040,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
next_key = &c_iter->key(); next_key = &c_iter->key();
} }
CompactionIterationStats range_del_out_stats; CompactionIterationStats range_del_out_stats;
status = FinishCompactionOutputFile(input_status, sub_compact, status =
range_del_agg.get(), FinishCompactionOutputFile(input_status, sub_compact, range_del_agg,
&range_del_out_stats, next_key); &range_del_out_stats, next_key);
RecordDroppedKeys(range_del_out_stats, RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats); &sub_compact->compaction_job_stats);
if (sub_compact->outputs.size() == 1) { if (sub_compact->outputs.size() == 1) {
@ -1096,8 +1102,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
// close the output file. // close the output file.
if (sub_compact->builder != nullptr) { if (sub_compact->builder != nullptr) {
CompactionIterationStats range_del_out_stats; CompactionIterationStats range_del_out_stats;
Status s = FinishCompactionOutputFile( Status s = FinishCompactionOutputFile(status, sub_compact, range_del_agg,
status, sub_compact, range_del_agg.get(), &range_del_out_stats); &range_del_out_stats);
if (status.ok()) { if (status.ok()) {
status = s; status = s;
} }

@ -340,7 +340,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
Arena arena; Arena arena;
{ {
InternalKeyComparator icmp(options.comparator); InternalKeyComparator icmp(options.comparator);
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); RangeDelAggregatorV2 range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
ScopedArenaIterator iter( ScopedArenaIterator iter(
dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1])); dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1]));
iter->SeekToFirst(); iter->SeekToFirst();
@ -429,7 +430,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
count = 0; count = 0;
{ {
InternalKeyComparator icmp(options.comparator); InternalKeyComparator icmp(options.comparator);
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); RangeDelAggregatorV2 range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
ScopedArenaIterator iter( ScopedArenaIterator iter(
dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1])); dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1]));
iter->SeekToFirst(); iter->SeekToFirst();
@ -646,7 +648,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
int total = 0; int total = 0;
Arena arena; Arena arena;
InternalKeyComparator icmp(options.comparator); InternalKeyComparator icmp(options.comparator);
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); RangeDelAggregatorV2 range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
ScopedArenaIterator iter( ScopedArenaIterator iter(
dbfull()->NewInternalIterator(&arena, &range_del_agg)); dbfull()->NewInternalIterator(&arena, &range_del_agg));
iter->SeekToFirst(); iter->SeekToFirst();
@ -848,7 +851,7 @@ TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) {
DestroyAndReopen(options); DestroyAndReopen(options);
Put("0000000010", "v10"); Put("0000000010", "v10");
Put("0000000020", "v20"); // skipped Put("0000000020", "v20"); // skipped
Put("0000000050", "v50"); Put("0000000050", "v50");
Flush(); Flush();

@ -1032,7 +1032,7 @@ bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
} }
InternalIterator* DBImpl::NewInternalIterator( InternalIterator* DBImpl::NewInternalIterator(
Arena* arena, RangeDelAggregator* range_del_agg, Arena* arena, RangeDelAggregatorV2* range_del_agg,
ColumnFamilyHandle* column_family) { ColumnFamilyHandle* column_family) {
ColumnFamilyData* cfd; ColumnFamilyData* cfd;
if (column_family == nullptr) { if (column_family == nullptr) {
@ -1152,7 +1152,7 @@ static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
InternalIterator* DBImpl::NewInternalIterator( InternalIterator* DBImpl::NewInternalIterator(
const ReadOptions& read_options, ColumnFamilyData* cfd, const ReadOptions& read_options, ColumnFamilyData* cfd,
SuperVersion* super_version, Arena* arena, SuperVersion* super_version, Arena* arena,
RangeDelAggregator* range_del_agg) { RangeDelAggregatorV2* range_del_agg) {
InternalIterator* internal_iter; InternalIterator* internal_iter;
assert(arena != nullptr); assert(arena != nullptr);
assert(range_del_agg != nullptr); assert(range_del_agg != nullptr);
@ -1169,7 +1169,7 @@ InternalIterator* DBImpl::NewInternalIterator(
if (!read_options.ignore_range_deletions) { if (!read_options.ignore_range_deletions) {
range_del_iter.reset( range_del_iter.reset(
super_version->mem->NewRangeTombstoneIterator(read_options)); super_version->mem->NewRangeTombstoneIterator(read_options));
s = range_del_agg->AddTombstones(std::move(range_del_iter)); range_del_agg->AddUnfragmentedTombstones(std::move(range_del_iter));
} }
// Collect all needed child iterators for immutable memtables // Collect all needed child iterators for immutable memtables
if (s.ok()) { if (s.ok()) {

@ -31,6 +31,7 @@
#include "db/log_writer.h" #include "db/log_writer.h"
#include "db/logs_with_prep_tracker.h" #include "db/logs_with_prep_tracker.h"
#include "db/pre_release_callback.h" #include "db/pre_release_callback.h"
#include "db/range_del_aggregator_v2.h"
#include "db/read_callback.h" #include "db/read_callback.h"
#include "db/snapshot_checker.h" #include "db/snapshot_checker.h"
#include "db/snapshot_impl.h" #include "db/snapshot_impl.h"
@ -373,7 +374,7 @@ class DBImpl : public DB {
// The keys of this iterator are internal keys (see format.h). // The keys of this iterator are internal keys (see format.h).
// The returned iterator should be deleted when no longer needed. // The returned iterator should be deleted when no longer needed.
InternalIterator* NewInternalIterator( InternalIterator* NewInternalIterator(
Arena* arena, RangeDelAggregator* range_del_agg, Arena* arena, RangeDelAggregatorV2* range_del_agg,
ColumnFamilyHandle* column_family = nullptr); ColumnFamilyHandle* column_family = nullptr);
LogsWithPrepTracker* logs_with_prep_tracker() { LogsWithPrepTracker* logs_with_prep_tracker() {
@ -581,7 +582,7 @@ class DBImpl : public DB {
ColumnFamilyData* cfd, ColumnFamilyData* cfd,
SuperVersion* super_version, SuperVersion* super_version,
Arena* arena, Arena* arena,
RangeDelAggregator* range_del_agg); RangeDelAggregatorV2* range_del_agg);
// hollow transactions shell used for recovery. // hollow transactions shell used for recovery.
// these will then be passed to TransactionDB so that // these will then be passed to TransactionDB so that

@ -134,8 +134,7 @@ class DBIter final: public Iterator {
prefix_same_as_start_(read_options.prefix_same_as_start), prefix_same_as_start_(read_options.prefix_same_as_start),
pin_thru_lifetime_(read_options.pin_data), pin_thru_lifetime_(read_options.pin_data),
total_order_seek_(read_options.total_order_seek), total_order_seek_(read_options.total_order_seek),
range_del_agg_(cf_options.internal_comparator, s, range_del_agg_(&cf_options.internal_comparator, s),
true /* collapse_deletions */),
read_callback_(read_callback), read_callback_(read_callback),
db_impl_(db_impl), db_impl_(db_impl),
cfd_(cfd), cfd_(cfd),
@ -172,7 +171,7 @@ class DBIter final: public Iterator {
iter_ = iter; iter_ = iter;
iter_->SetPinnedItersMgr(&pinned_iters_mgr_); iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
} }
virtual RangeDelAggregator* GetRangeDelAggregator() { virtual RangeDelAggregatorV2* GetRangeDelAggregator() {
return &range_del_agg_; return &range_del_agg_;
} }
@ -342,7 +341,7 @@ class DBIter final: public Iterator {
const bool total_order_seek_; const bool total_order_seek_;
// List of operands for merge operator. // List of operands for merge operator.
MergeContext merge_context_; MergeContext merge_context_;
RangeDelAggregator range_del_agg_; RangeDelAggregatorV2 range_del_agg_;
LocalStatistics local_stats_; LocalStatistics local_stats_;
PinnedIteratorsManager pinned_iters_mgr_; PinnedIteratorsManager pinned_iters_mgr_;
ReadCallback* read_callback_; ReadCallback* read_callback_;
@ -1480,7 +1479,7 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); } ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
RangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() { RangeDelAggregatorV2* ArenaWrappedDBIter::GetRangeDelAggregator() {
return db_iter_->GetRangeDelAggregator(); return db_iter_->GetRangeDelAggregator();
} }

@ -12,7 +12,7 @@
#include <string> #include <string>
#include "db/db_impl.h" #include "db/db_impl.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/range_del_aggregator.h" #include "db/range_del_aggregator_v2.h"
#include "options/cf_options.h" #include "options/cf_options.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
@ -48,7 +48,7 @@ class ArenaWrappedDBIter : public Iterator {
// Get the arena to be used to allocate memory for DBIter to be wrapped, // Get the arena to be used to allocate memory for DBIter to be wrapped,
// as well as child iterators in it. // as well as child iterators in it.
virtual Arena* GetArena() { return &arena_; } virtual Arena* GetArena() { return &arena_; }
virtual RangeDelAggregator* GetRangeDelAggregator(); virtual RangeDelAggregatorV2* GetRangeDelAggregator();
// Set the internal iterator wrapped inside the DB Iterator. Usually it is // Set the internal iterator wrapped inside the DB Iterator. Usually it is
// a merging iterator. // a merging iterator.

@ -814,7 +814,8 @@ std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
Arena arena; Arena arena;
auto options = CurrentOptions(); auto options = CurrentOptions();
InternalKeyComparator icmp(options.comparator); InternalKeyComparator icmp(options.comparator);
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); RangeDelAggregatorV2 range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
ScopedArenaIterator iter; ScopedArenaIterator iter;
if (cf == 0) { if (cf == 0) {
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg)); iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg));
@ -1225,7 +1226,8 @@ void DBTestBase::validateNumberOfEntries(int numValues, int cf) {
Arena arena; Arena arena;
auto options = CurrentOptions(); auto options = CurrentOptions();
InternalKeyComparator icmp(options.comparator); InternalKeyComparator icmp(options.comparator);
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); RangeDelAggregatorV2 range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
// This should be defined after range_del_agg so that it destructs the // This should be defined after range_del_agg so that it destructs the
// assigned iterator before it range_del_agg is already destructed. // assigned iterator before it range_del_agg is already destructed.
ScopedArenaIterator iter; ScopedArenaIterator iter;
@ -1433,7 +1435,8 @@ void DBTestBase::VerifyDBInternal(
std::vector<std::pair<std::string, std::string>> true_data) { std::vector<std::pair<std::string, std::string>> true_data) {
Arena arena; Arena arena;
InternalKeyComparator icmp(last_options_.comparator); InternalKeyComparator icmp(last_options_.comparator);
RangeDelAggregator range_del_agg(icmp, {}); RangeDelAggregatorV2 range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
auto iter = dbfull()->NewInternalIterator(&arena, &range_del_agg); auto iter = dbfull()->NewInternalIterator(&arena, &range_del_agg);
iter->SeekToFirst(); iter->SeekToFirst();
for (auto p : true_data) { for (auto p : true_data) {

@ -15,6 +15,7 @@
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/job_context.h" #include "db/job_context.h"
#include "db/range_del_aggregator_v2.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
@ -71,8 +72,8 @@ class ForwardLevelIterator : public InternalIterator {
delete file_iter_; delete file_iter_;
} }
RangeDelAggregator range_del_agg( RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(),
cfd_->internal_comparator(), {} /* snapshots */); kMaxSequenceNumber /* upper_bound */);
file_iter_ = cfd_->table_cache()->NewIterator( file_iter_ = cfd_->table_cache()->NewIterator(
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(), read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
*files_[file_index_], *files_[file_index_],
@ -608,14 +609,14 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
// New // New
sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
} }
RangeDelAggregator range_del_agg( RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(),
cfd_->internal_comparator(), {} /* snapshots */); kMaxSequenceNumber /* upper_bound */);
mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_); mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_); sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
if (!read_options_.ignore_range_deletions) { if (!read_options_.ignore_range_deletions) {
std::unique_ptr<InternalIterator> range_del_iter( std::unique_ptr<InternalIterator> range_del_iter(
sv_->mem->NewRangeTombstoneIterator(read_options_)); sv_->mem->NewRangeTombstoneIterator(read_options_));
range_del_agg.AddTombstones(std::move(range_del_iter)); range_del_agg.AddUnfragmentedTombstones(std::move(range_del_iter));
sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_, sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
&range_del_agg); &range_del_agg);
} }
@ -666,12 +667,12 @@ void ForwardIterator::RenewIterators() {
mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_); mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_); svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
RangeDelAggregator range_del_agg( RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(),
cfd_->internal_comparator(), {} /* snapshots */); kMaxSequenceNumber /* upper_bound */);
if (!read_options_.ignore_range_deletions) { if (!read_options_.ignore_range_deletions) {
std::unique_ptr<InternalIterator> range_del_iter( std::unique_ptr<InternalIterator> range_del_iter(
svnew->mem->NewRangeTombstoneIterator(read_options_)); svnew->mem->NewRangeTombstoneIterator(read_options_));
range_del_agg.AddTombstones(std::move(range_del_iter)); range_del_agg.AddUnfragmentedTombstones(std::move(range_del_iter));
svnew->imm->AddRangeTombstoneIterators(read_options_, &arena_, svnew->imm->AddRangeTombstoneIterators(read_options_, &arena_,
&range_del_agg); &range_del_agg);
} }

@ -158,15 +158,12 @@ bool MemTableListVersion::GetFromList(
Status MemTableListVersion::AddRangeTombstoneIterators( Status MemTableListVersion::AddRangeTombstoneIterators(
const ReadOptions& read_opts, Arena* /*arena*/, const ReadOptions& read_opts, Arena* /*arena*/,
RangeDelAggregator* range_del_agg) { RangeDelAggregatorV2* range_del_agg) {
assert(range_del_agg != nullptr); assert(range_del_agg != nullptr);
for (auto& m : memlist_) { for (auto& m : memlist_) {
std::unique_ptr<InternalIterator> range_del_iter( std::unique_ptr<InternalIterator> range_del_iter(
m->NewRangeTombstoneIterator(read_opts)); m->NewRangeTombstoneIterator(read_opts));
Status s = range_del_agg->AddTombstones(std::move(range_del_iter)); range_del_agg->AddUnfragmentedTombstones(std::move(range_del_iter));
if (!s.ok()) {
return s;
}
} }
return Status::OK(); return Status::OK();
} }

@ -15,7 +15,7 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/logs_with_prep_tracker.h" #include "db/logs_with_prep_tracker.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/range_del_aggregator.h" #include "db/range_del_aggregator_v2.h"
#include "monitoring/instrumented_mutex.h" #include "monitoring/instrumented_mutex.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
@ -91,7 +91,7 @@ class MemTableListVersion {
} }
Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena, Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
RangeDelAggregator* range_del_agg); RangeDelAggregatorV2* range_del_agg);
Status AddRangeTombstoneIterators( Status AddRangeTombstoneIterators(
const ReadOptions& read_opts, const ReadOptions& read_opts,
std::vector<InternalIterator*>* range_del_iters); std::vector<InternalIterator*>* range_del_iters);

@ -20,6 +20,8 @@ int main() {
#include <vector> #include <vector>
#include "db/range_del_aggregator.h" #include "db/range_del_aggregator.h"
#include "db/range_del_aggregator_v2.h"
#include "db/range_tombstone_fragmenter.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "util/coding.h" #include "util/coding.h"
@ -33,7 +35,7 @@ using GFLAGS_NAMESPACE::ParseCommandLineFlags;
DEFINE_int32(num_range_tombstones, 1000, "number of range tombstones created"); DEFINE_int32(num_range_tombstones, 1000, "number of range tombstones created");
DEFINE_int32(num_runs, 10000, "number of test runs"); DEFINE_int32(num_runs, 1000, "number of test runs");
DEFINE_int32(tombstone_start_upper_bound, 1000, DEFINE_int32(tombstone_start_upper_bound, 1000,
"exclusive upper bound on range tombstone start keys"); "exclusive upper bound on range tombstone start keys");
@ -55,6 +57,8 @@ DEFINE_int32(should_deletes_per_run, 1, "number of ShouldDelete calls per run");
DEFINE_int32(add_tombstones_per_run, 1, DEFINE_int32(add_tombstones_per_run, 1,
"number of AddTombstones calls per run"); "number of AddTombstones calls per run");
DEFINE_bool(use_v2_aggregator, false, "benchmark RangeDelAggregatorV2");
namespace { namespace {
struct Stats { struct Stats {
@ -85,6 +89,8 @@ std::ostream& operator<<(std::ostream& os, const Stats& s) {
return os; return os;
} }
auto icmp = rocksdb::InternalKeyComparator(rocksdb::BytewiseComparator());
} // anonymous namespace } // anonymous namespace
namespace rocksdb { namespace rocksdb {
@ -186,9 +192,13 @@ int main(int argc, char** argv) {
: rocksdb::RangeDelPositioningMode::kFullScan; : rocksdb::RangeDelPositioningMode::kFullScan;
for (int i = 0; i < FLAGS_num_runs; i++) { for (int i = 0; i < FLAGS_num_runs; i++) {
auto icmp = rocksdb::InternalKeyComparator(rocksdb::BytewiseComparator());
rocksdb::RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, rocksdb::RangeDelAggregator range_del_agg(icmp, {} /* snapshots */,
FLAGS_use_collapsed); FLAGS_use_collapsed);
rocksdb::RangeDelAggregatorV2 range_del_agg_v2(
&icmp, rocksdb::kMaxSequenceNumber /* upper_bound */);
std::vector<std::unique_ptr<rocksdb::FragmentedRangeTombstoneList> >
fragmented_range_tombstone_lists(FLAGS_add_tombstones_per_run);
for (auto& persistent_range_tombstones : all_persistent_range_tombstones) { for (auto& persistent_range_tombstones : all_persistent_range_tombstones) {
// TODO(abhimadan): consider whether creating the range tombstones right // TODO(abhimadan): consider whether creating the range tombstones right
@ -203,10 +213,27 @@ int main(int argc, char** argv) {
auto range_del_iter = auto range_del_iter =
rocksdb::MakeRangeDelIterator(persistent_range_tombstones); rocksdb::MakeRangeDelIterator(persistent_range_tombstones);
rocksdb::StopWatchNano stop_watch_add_tombstones(rocksdb::Env::Default(), fragmented_range_tombstone_lists.emplace_back(
true /* auto_start */); new rocksdb::FragmentedRangeTombstoneList(
range_del_agg.AddTombstones(std::move(range_del_iter)); rocksdb::MakeRangeDelIterator(persistent_range_tombstones), icmp,
stats.time_add_tombstones += stop_watch_add_tombstones.ElapsedNanos(); false /* one_time_use */));
std::unique_ptr<rocksdb::FragmentedRangeTombstoneIterator>
fragmented_range_del_iter(
new rocksdb::FragmentedRangeTombstoneIterator(
fragmented_range_tombstone_lists.back().get(),
rocksdb::kMaxSequenceNumber, icmp));
if (FLAGS_use_v2_aggregator) {
rocksdb::StopWatchNano stop_watch_add_tombstones(
rocksdb::Env::Default(), true /* auto_start */);
range_del_agg_v2.AddTombstones(std::move(fragmented_range_del_iter));
stats.time_add_tombstones += stop_watch_add_tombstones.ElapsedNanos();
} else {
rocksdb::StopWatchNano stop_watch_add_tombstones(
rocksdb::Env::Default(), true /* auto_start */);
range_del_agg.AddTombstones(std::move(range_del_iter));
stats.time_add_tombstones += stop_watch_add_tombstones.ElapsedNanos();
}
} }
rocksdb::ParsedInternalKey parsed_key; rocksdb::ParsedInternalKey parsed_key;
@ -220,10 +247,18 @@ int main(int argc, char** argv) {
std::string key_string = rocksdb::Key(first_key + j); std::string key_string = rocksdb::Key(first_key + j);
parsed_key.user_key = key_string; parsed_key.user_key = key_string;
rocksdb::StopWatchNano stop_watch_should_delete(rocksdb::Env::Default(), uint64_t call_time;
true /* auto_start */); if (FLAGS_use_v2_aggregator) {
range_del_agg.ShouldDelete(parsed_key, mode); rocksdb::StopWatchNano stop_watch_should_delete(rocksdb::Env::Default(),
uint64_t call_time = stop_watch_should_delete.ElapsedNanos(); true /* auto_start */);
range_del_agg_v2.ShouldDelete(parsed_key, mode);
call_time = stop_watch_should_delete.ElapsedNanos();
} else {
rocksdb::StopWatchNano stop_watch_should_delete(rocksdb::Env::Default(),
true /* auto_start */);
range_del_agg.ShouldDelete(parsed_key, mode);
call_time = stop_watch_should_delete.ElapsedNanos();
}
if (j == 0) { if (j == 0) {
stats.time_first_should_delete += call_time; stats.time_first_should_delete += call_time;

@ -0,0 +1,195 @@
// Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/range_del_aggregator_v2.h"
#include "db/compaction_iteration_stats.h"
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_edit.h"
#include "include/rocksdb/comparator.h"
#include "include/rocksdb/types.h"
#include "table/internal_iterator.h"
#include "table/scoped_arena_iterator.h"
#include "table/table_builder.h"
#include "util/heap.h"
#include "util/kv_map.h"
#include "util/vector_iterator.h"
namespace rocksdb {
TruncatedRangeDelIterator::TruncatedRangeDelIterator(
std::unique_ptr<FragmentedRangeTombstoneIterator> iter,
const InternalKeyComparator* icmp, const InternalKey* smallest,
const InternalKey* largest)
: iter_(std::move(iter)), icmp_(icmp) {
if (smallest != nullptr) {
pinned_bounds_.emplace_back();
auto& parsed_smallest = pinned_bounds_.back();
if (!ParseInternalKey(smallest->Encode(), &parsed_smallest)) {
assert(false);
}
smallest_ = &parsed_smallest;
}
if (largest != nullptr) {
pinned_bounds_.emplace_back();
auto& parsed_largest = pinned_bounds_.back();
if (!ParseInternalKey(largest->Encode(), &parsed_largest)) {
assert(false);
}
if (parsed_largest.type == kTypeRangeDeletion &&
parsed_largest.sequence == kMaxSequenceNumber) {
// The file boundary has been artificially extended by a range tombstone.
// We do not need to adjust largest to properly truncate range
// tombstones that extend past the boundary.
} else if (parsed_largest.sequence == 0) {
// The largest key in the sstable has a sequence number of 0. Since we
// guarantee that no internal keys with the same user key and sequence
// number can exist in a DB, we know that the largest key in this sstable
// cannot exist as the smallest key in the next sstable. This further
// implies that no range tombstone in this sstable covers largest;
// otherwise, the file boundary would have been artificially extended.
//
// Therefore, we will never truncate a range tombstone at largest, so we
// can leave it unchanged.
} else {
// The same user key may straddle two sstable boundaries. To ensure that
// the truncated end key can cover the largest key in this sstable, reduce
// its sequence number by 1.
parsed_largest.sequence -= 1;
}
largest_ = &parsed_largest;
}
}
bool TruncatedRangeDelIterator::Valid() const {
return iter_->Valid() &&
(smallest_ == nullptr ||
icmp_->Compare(*smallest_, iter_->parsed_end_key()) < 0) &&
(largest_ == nullptr ||
icmp_->Compare(iter_->parsed_start_key(), *largest_) < 0);
}
void TruncatedRangeDelIterator::Next() { iter_->TopNext(); }
void TruncatedRangeDelIterator::Prev() { iter_->TopPrev(); }
// NOTE: target is a user key
void TruncatedRangeDelIterator::Seek(const Slice& target) {
if (largest_ != nullptr &&
icmp_->Compare(*largest_, ParsedInternalKey(target, kMaxSequenceNumber,
kTypeRangeDeletion)) <= 0) {
iter_->Invalidate();
return;
}
iter_->Seek(target);
}
// NOTE: target is a user key
void TruncatedRangeDelIterator::SeekForPrev(const Slice& target) {
if (smallest_ != nullptr &&
icmp_->Compare(ParsedInternalKey(target, 0, kTypeRangeDeletion),
*smallest_) < 0) {
iter_->Invalidate();
return;
}
iter_->SeekForPrev(target);
}
void TruncatedRangeDelIterator::SeekToFirst() { iter_->SeekToTopFirst(); }
void TruncatedRangeDelIterator::SeekToLast() { iter_->SeekToTopLast(); }
RangeDelAggregatorV2::RangeDelAggregatorV2(const InternalKeyComparator* icmp,
SequenceNumber upper_bound)
: icmp_(icmp), upper_bound_(upper_bound) {}
void RangeDelAggregatorV2::AddTombstones(
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
const InternalKey* smallest, const InternalKey* largest) {
if (input_iter == nullptr || input_iter->empty()) {
return;
}
if (wrapped_range_del_agg != nullptr) {
wrapped_range_del_agg->AddTombstones(std::move(input_iter), smallest,
largest);
// TODO: this eats the status of the wrapped call; may want to propagate it
return;
}
iters_.emplace_back(new TruncatedRangeDelIterator(std::move(input_iter),
icmp_, smallest, largest));
}
void RangeDelAggregatorV2::AddUnfragmentedTombstones(
std::unique_ptr<InternalIterator> input_iter) {
assert(wrapped_range_del_agg == nullptr);
if (input_iter == nullptr) {
return;
}
pinned_fragments_.emplace_back(new FragmentedRangeTombstoneList(
std::move(input_iter), *icmp_, false /* one_time_use */));
auto fragmented_iter = new FragmentedRangeTombstoneIterator(
pinned_fragments_.back().get(), upper_bound_, *icmp_);
AddTombstones(
std::unique_ptr<FragmentedRangeTombstoneIterator>(fragmented_iter));
}
bool RangeDelAggregatorV2::ShouldDelete(const ParsedInternalKey& parsed,
RangeDelPositioningMode mode) {
if (wrapped_range_del_agg != nullptr) {
return wrapped_range_del_agg->ShouldDelete(parsed, mode);
}
// TODO: avoid re-seeking every call
for (auto& iter : iters_) {
iter->Seek(parsed.user_key);
if (iter->Valid() && icmp_->Compare(iter->start_key(), parsed) <= 0 &&
iter->seq() > parsed.sequence) {
return true;
}
}
return false;
}
bool RangeDelAggregatorV2::IsRangeOverlapped(const Slice& start,
const Slice& end) {
assert(wrapped_range_del_agg == nullptr);
// Set the internal start/end keys so that:
// - if start_ikey has the same user key and sequence number as the current
// end key, start_ikey will be considered greater; and
// - if end_ikey has the same user key and sequence number as the current
// start key, end_ikey will be considered greater.
ParsedInternalKey start_ikey(start, kMaxSequenceNumber,
static_cast<ValueType>(0));
ParsedInternalKey end_ikey(end, 0, static_cast<ValueType>(0));
for (auto& iter : iters_) {
bool checked_candidate_tombstones = false;
for (iter->SeekForPrev(start);
iter->Valid() && icmp_->Compare(iter->start_key(), end_ikey) <= 0;
iter->Next()) {
checked_candidate_tombstones = true;
if (icmp_->Compare(start_ikey, iter->end_key()) < 0 &&
icmp_->Compare(iter->start_key(), end_ikey) <= 0) {
return true;
}
}
if (!checked_candidate_tombstones) {
// Do an additional check for when the end of the range is the begin key
// of a tombstone, which we missed earlier since SeekForPrev'ing to the
// start was invalid.
iter->SeekForPrev(end);
if (iter->Valid() && icmp_->Compare(start_ikey, iter->end_key()) < 0 &&
icmp_->Compare(iter->start_key(), end_ikey) <= 0) {
return true;
}
}
}
return false;
}
} // namespace rocksdb

@ -0,0 +1,134 @@
// Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <list>
#include <map>
#include <set>
#include <string>
#include <vector>
#include "db/compaction_iteration_stats.h"
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_edit.h"
#include "include/rocksdb/comparator.h"
#include "include/rocksdb/types.h"
#include "table/internal_iterator.h"
#include "table/scoped_arena_iterator.h"
#include "table/table_builder.h"
#include "util/heap.h"
#include "util/kv_map.h"
namespace rocksdb {
class RangeDelAggregatorV2;
class TruncatedRangeDelIterator {
public:
TruncatedRangeDelIterator(
std::unique_ptr<FragmentedRangeTombstoneIterator> iter,
const InternalKeyComparator* icmp, const InternalKey* smallest,
const InternalKey* largest);
bool Valid() const;
void Next();
void Prev();
// Seeks to the tombstone with the highest visible sequence number that covers
// target (a user key). If no such tombstone exists, the position will be at
// the earliest tombstone that ends after target.
void Seek(const Slice& target);
// Seeks to the tombstone with the highest visible sequence number that covers
// target (a user key). If no such tombstone exists, the position will be at
// the latest tombstone that starts before target.
void SeekForPrev(const Slice& target);
void SeekToFirst();
void SeekToLast();
ParsedInternalKey start_key() const {
return (smallest_ == nullptr ||
icmp_->Compare(*smallest_, iter_->parsed_start_key()) <= 0)
? iter_->parsed_start_key()
: *smallest_;
}
ParsedInternalKey end_key() const {
return (largest_ == nullptr ||
icmp_->Compare(iter_->parsed_end_key(), *largest_) <= 0)
? iter_->parsed_end_key()
: *largest_;
}
SequenceNumber seq() const { return iter_->seq(); }
private:
std::unique_ptr<FragmentedRangeTombstoneIterator> iter_;
const InternalKeyComparator* icmp_;
const ParsedInternalKey* smallest_ = nullptr;
const ParsedInternalKey* largest_ = nullptr;
std::list<ParsedInternalKey> pinned_bounds_;
};
class RangeDelAggregatorV2 {
public:
RangeDelAggregatorV2(const InternalKeyComparator* icmp,
SequenceNumber upper_bound);
void AddTombstones(
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
const InternalKey* smallest = nullptr,
const InternalKey* largest = nullptr);
void AddUnfragmentedTombstones(std::unique_ptr<InternalIterator> input_iter);
bool ShouldDelete(const ParsedInternalKey& parsed,
RangeDelPositioningMode mode);
bool IsRangeOverlapped(const Slice& start, const Slice& end);
// TODO: no-op for now, but won't be once ShouldDelete leverages positioning
// mode and doesn't re-seek every ShouldDelete
void InvalidateRangeDelMapPositions() {}
bool IsEmpty() const { return iters_.empty(); }
bool AddFile(uint64_t file_number) {
return files_seen_.insert(file_number).second;
}
// Adaptor method to pass calls through to an old-style RangeDelAggregator.
// Will be removed once this new version supports an iterator that can be used
// during flush/compaction.
RangeDelAggregator* DelegateToRangeDelAggregator(
const std::vector<SequenceNumber>& snapshots) {
wrapped_range_del_agg.reset(new RangeDelAggregator(
*icmp_, snapshots, true /* collapse_deletions */));
return wrapped_range_del_agg.get();
}
std::unique_ptr<RangeDelIterator> NewIterator() {
assert(wrapped_range_del_agg != nullptr);
return wrapped_range_del_agg->NewIterator();
}
private:
const InternalKeyComparator* icmp_;
SequenceNumber upper_bound_;
std::vector<std::unique_ptr<TruncatedRangeDelIterator>> iters_;
std::list<std::unique_ptr<FragmentedRangeTombstoneList>> pinned_fragments_;
std::set<uint64_t> files_seen_;
// TODO: remove once V2 supports exposing tombstone iterators
std::unique_ptr<RangeDelAggregator> wrapped_range_del_agg;
};
} // namespace rocksdb
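
For orientation, here is a hedged usage sketch of the interface declared above. It is not part of this PR's diff; it relies only on the declarations shown here and the in-tree headers, and it mirrors what the read path and the new unit test do:

// Usage sketch only: drives RangeDelAggregatorV2 as declared above.
// The hypothetical helper name KeyIsRangeDeleted is not from this PR.
#include "db/range_del_aggregator_v2.h"
#include "db/range_tombstone_fragmenter.h"

namespace rocksdb {

// Returns true if lookup_key is covered by a newer range tombstone from
// `tombstones`. Leaving smallest/largest as nullptr means no truncation
// to sstable boundaries.
bool KeyIsRangeDeleted(
    const InternalKeyComparator& icmp,
    std::unique_ptr<FragmentedRangeTombstoneIterator> tombstones,
    const ParsedInternalKey& lookup_key) {
  RangeDelAggregatorV2 agg(&icmp, kMaxSequenceNumber /* upper_bound */);
  agg.AddTombstones(std::move(tombstones));
  return agg.ShouldDelete(lookup_key,
                          RangeDelPositioningMode::kForwardTraversal);
}

}  // namespace rocksdb

On the write path, by contrast, CompactionJob does not drive this interface directly: it calls DelegateToRangeDelAggregator(existing_snapshots_) so that AddTombstones calls are forwarded to the old collapsed aggregator until V2 can expose tombstone iterators for flush/compaction.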

@ -0,0 +1,469 @@
// Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/range_del_aggregator_v2.h"
#include <memory>
#include <string>
#include <vector>
#include "db/db_test_util.h"
#include "db/dbformat.h"
#include "db/range_tombstone_fragmenter.h"
#include "util/testutil.h"
namespace rocksdb {
class RangeDelAggregatorV2Test : public testing::Test {};
namespace {
static auto bytewise_icmp = InternalKeyComparator(BytewiseComparator());
std::unique_ptr<InternalIterator> MakeRangeDelIter(
const std::vector<RangeTombstone>& range_dels) {
std::vector<std::string> keys, values;
for (const auto& range_del : range_dels) {
auto key_and_value = range_del.Serialize();
keys.push_back(key_and_value.first.Encode().ToString());
values.push_back(key_and_value.second.ToString());
}
return std::unique_ptr<test::VectorIterator>(
new test::VectorIterator(keys, values));
}
std::vector<std::unique_ptr<FragmentedRangeTombstoneList>>
MakeFragmentedTombstoneLists(
const std::vector<std::vector<RangeTombstone>>& range_dels_list) {
std::vector<std::unique_ptr<FragmentedRangeTombstoneList>> fragment_lists;
for (const auto& range_dels : range_dels_list) {
auto range_del_iter = MakeRangeDelIter(range_dels);
fragment_lists.emplace_back(new FragmentedRangeTombstoneList(
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */));
}
return fragment_lists;
}
struct TruncatedIterScanTestCase {
ParsedInternalKey start;
ParsedInternalKey end;
SequenceNumber seq;
};
struct TruncatedIterSeekTestCase {
Slice target;
ParsedInternalKey start;
ParsedInternalKey end;
SequenceNumber seq;
bool invalid;
};
struct ShouldDeleteTestCase {
ParsedInternalKey lookup_key;
bool result;
};
struct IsRangeOverlappedTestCase {
Slice start;
Slice end;
bool result;
};
ParsedInternalKey UncutEndpoint(const Slice& s) {
return ParsedInternalKey(s, kMaxSequenceNumber, kTypeRangeDeletion);
}
ParsedInternalKey InternalValue(const Slice& key, SequenceNumber seq) {
return ParsedInternalKey(key, seq, kTypeValue);
}
void VerifyIterator(
TruncatedRangeDelIterator* iter, const InternalKeyComparator& icmp,
const std::vector<TruncatedIterScanTestCase>& expected_range_dels) {
// Test forward iteration.
iter->SeekToFirst();
for (size_t i = 0; i < expected_range_dels.size(); i++, iter->Next()) {
ASSERT_TRUE(iter->Valid());
EXPECT_EQ(0, icmp.Compare(iter->start_key(), expected_range_dels[i].start));
EXPECT_EQ(0, icmp.Compare(iter->end_key(), expected_range_dels[i].end));
EXPECT_EQ(expected_range_dels[i].seq, iter->seq());
}
EXPECT_FALSE(iter->Valid());
// Test reverse iteration.
iter->SeekToLast();
std::vector<TruncatedIterScanTestCase> reverse_expected_range_dels(
expected_range_dels.rbegin(), expected_range_dels.rend());
for (size_t i = 0; i < reverse_expected_range_dels.size();
i++, iter->Prev()) {
ASSERT_TRUE(iter->Valid());
EXPECT_EQ(0, icmp.Compare(iter->start_key(),
reverse_expected_range_dels[i].start));
EXPECT_EQ(
0, icmp.Compare(iter->end_key(), reverse_expected_range_dels[i].end));
EXPECT_EQ(reverse_expected_range_dels[i].seq, iter->seq());
}
EXPECT_FALSE(iter->Valid());
}
void VerifySeek(TruncatedRangeDelIterator* iter,
const InternalKeyComparator& icmp,
const std::vector<TruncatedIterSeekTestCase>& test_cases) {
for (const auto& test_case : test_cases) {
iter->Seek(test_case.target);
if (test_case.invalid) {
ASSERT_FALSE(iter->Valid());
} else {
ASSERT_TRUE(iter->Valid());
EXPECT_EQ(0, icmp.Compare(iter->start_key(), test_case.start));
EXPECT_EQ(0, icmp.Compare(iter->end_key(), test_case.end));
EXPECT_EQ(test_case.seq, iter->seq());
}
}
}
void VerifySeekForPrev(
TruncatedRangeDelIterator* iter, const InternalKeyComparator& icmp,
const std::vector<TruncatedIterSeekTestCase>& test_cases) {
for (const auto& test_case : test_cases) {
iter->SeekForPrev(test_case.target);
if (test_case.invalid) {
ASSERT_FALSE(iter->Valid());
} else {
ASSERT_TRUE(iter->Valid());
EXPECT_EQ(0, icmp.Compare(iter->start_key(), test_case.start));
EXPECT_EQ(0, icmp.Compare(iter->end_key(), test_case.end));
EXPECT_EQ(test_case.seq, iter->seq());
}
}
}
void VerifyShouldDelete(RangeDelAggregatorV2* range_del_agg,
const std::vector<ShouldDeleteTestCase>& test_cases) {
for (const auto& test_case : test_cases) {
EXPECT_EQ(
test_case.result,
range_del_agg->ShouldDelete(
test_case.lookup_key, RangeDelPositioningMode::kForwardTraversal));
}
for (auto it = test_cases.rbegin(); it != test_cases.rend(); ++it) {
const auto& test_case = *it;
EXPECT_EQ(
test_case.result,
range_del_agg->ShouldDelete(
test_case.lookup_key, RangeDelPositioningMode::kBackwardTraversal));
}
}
void VerifyIsRangeOverlapped(
RangeDelAggregatorV2* range_del_agg,
const std::vector<IsRangeOverlappedTestCase>& test_cases) {
for (const auto& test_case : test_cases) {
EXPECT_EQ(test_case.result,
range_del_agg->IsRangeOverlapped(test_case.start, test_case.end));
}
}
} // namespace
TEST_F(RangeDelAggregatorV2Test, EmptyTruncatedIter) {
auto range_del_iter = MakeRangeDelIter({});
FragmentedRangeTombstoneList fragment_list(
std::move(range_del_iter), bytewise_icmp, true /* one_time_use */);
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber,
bytewise_icmp));
TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr,
nullptr);
iter.SeekToFirst();
ASSERT_FALSE(iter.Valid());
iter.SeekToLast();
ASSERT_FALSE(iter.Valid());
}
TEST_F(RangeDelAggregatorV2Test, UntruncatedIter) {
auto range_del_iter =
MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}});
FragmentedRangeTombstoneList fragment_list(
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber,
bytewise_icmp));
TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr,
nullptr);
VerifyIterator(&iter, bytewise_icmp,
{{UncutEndpoint("a"), UncutEndpoint("e"), 10},
{UncutEndpoint("e"), UncutEndpoint("g"), 8},
{UncutEndpoint("j"), UncutEndpoint("n"), 4}});
VerifySeek(
&iter, bytewise_icmp,
{{"d", UncutEndpoint("a"), UncutEndpoint("e"), 10},
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"ia", UncutEndpoint("j"), UncutEndpoint("n"), 4},
{"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */},
{"", UncutEndpoint("a"), UncutEndpoint("e"), 10}});
VerifySeekForPrev(
&iter, bytewise_icmp,
{{"d", UncutEndpoint("a"), UncutEndpoint("e"), 10},
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"n", UncutEndpoint("j"), UncutEndpoint("n"), 4},
{"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}});
}
TEST_F(RangeDelAggregatorV2Test, UntruncatedIterWithSnapshot) {
auto range_del_iter =
MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}});
FragmentedRangeTombstoneList fragment_list(
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(&fragment_list, 9 /* snapshot */,
bytewise_icmp));
TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr,
nullptr);
VerifyIterator(&iter, bytewise_icmp,
{{UncutEndpoint("e"), UncutEndpoint("g"), 8},
{UncutEndpoint("j"), UncutEndpoint("n"), 4}});
VerifySeek(
&iter, bytewise_icmp,
{{"d", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"ia", UncutEndpoint("j"), UncutEndpoint("n"), 4},
{"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */},
{"", UncutEndpoint("e"), UncutEndpoint("g"), 8}});
VerifySeekForPrev(
&iter, bytewise_icmp,
{{"d", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */},
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"n", UncutEndpoint("j"), UncutEndpoint("n"), 4},
{"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}});
}
TEST_F(RangeDelAggregatorV2Test, TruncatedIter) {
auto range_del_iter =
MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}});
FragmentedRangeTombstoneList fragment_list(
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber,
bytewise_icmp));
InternalKey smallest("d", 7, kTypeValue);
InternalKey largest("m", 9, kTypeValue);
TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp,
&smallest, &largest);
VerifyIterator(&iter, bytewise_icmp,
{{InternalValue("d", 7), UncutEndpoint("e"), 10},
{UncutEndpoint("e"), UncutEndpoint("g"), 8},
{UncutEndpoint("j"), InternalValue("m", 8), 4}});
VerifySeek(
&iter, bytewise_icmp,
{{"d", InternalValue("d", 7), UncutEndpoint("e"), 10},
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"ia", UncutEndpoint("j"), InternalValue("m", 8), 4},
{"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */},
{"", InternalValue("d", 7), UncutEndpoint("e"), 10}});
VerifySeekForPrev(
&iter, bytewise_icmp,
{{"d", InternalValue("d", 7), UncutEndpoint("e"), 10},
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"n", UncutEndpoint("j"), InternalValue("m", 8), 4},
{"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}});
}
TEST_F(RangeDelAggregatorV2Test, SingleIterInAggregator) {
auto range_del_iter = MakeRangeDelIter({{"a", "e", 10}, {"c", "g", 8}});
FragmentedRangeTombstoneList fragment_list(
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber,
bytewise_icmp));
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber);
range_del_agg.AddTombstones(std::move(input_iter));
VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 19), false},
{InternalValue("b", 9), true},
{InternalValue("d", 9), true},
{InternalValue("e", 7), true},
{InternalValue("g", 7), false}});
VerifyIsRangeOverlapped(&range_del_agg, {{"", "_", false},
{"_", "a", true},
{"a", "c", true},
{"d", "f", true},
{"g", "l", false}});
}
TEST_F(RangeDelAggregatorV2Test, MultipleItersInAggregator) {
auto fragment_lists = MakeFragmentedTombstoneLists(
{{{"a", "e", 10}, {"c", "g", 8}},
{{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}});
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber);
for (const auto& fragment_list : fragment_lists) {
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(
fragment_list.get(), kMaxSequenceNumber, bytewise_icmp));
range_del_agg.AddTombstones(std::move(input_iter));
}
VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 19), true},
{InternalValue("b", 19), false},
{InternalValue("b", 9), true},
{InternalValue("d", 9), true},
{InternalValue("e", 7), true},
{InternalValue("g", 7), false},
{InternalValue("h", 24), true},
{InternalValue("i", 24), false},
{InternalValue("ii", 14), true},
{InternalValue("j", 14), false}});
VerifyIsRangeOverlapped(&range_del_agg, {{"", "_", false},
{"_", "a", true},
{"a", "c", true},
{"d", "f", true},
{"g", "l", true},
{"x", "y", false}});
}
TEST_F(RangeDelAggregatorV2Test, MultipleItersInAggregatorWithUpperBound) {
auto fragment_lists = MakeFragmentedTombstoneLists(
{{{"a", "e", 10}, {"c", "g", 8}},
{{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}});
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19);
for (const auto& fragment_list : fragment_lists) {
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(fragment_list.get(),
19 /* snapshot */, bytewise_icmp));
range_del_agg.AddTombstones(std::move(input_iter));
}
VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 19), false},
{InternalValue("a", 9), true},
{InternalValue("b", 9), true},
{InternalValue("d", 9), true},
{InternalValue("e", 7), true},
{InternalValue("g", 7), false},
{InternalValue("h", 24), false},
{InternalValue("i", 24), false},
{InternalValue("ii", 14), true},
{InternalValue("j", 14), false}});
VerifyIsRangeOverlapped(&range_del_agg, {{"", "_", false},
{"_", "a", true},
{"a", "c", true},
{"d", "f", true},
{"g", "l", true},
{"x", "y", false}});
}
TEST_F(RangeDelAggregatorV2Test, MultipleTruncatedItersInAggregator) {
auto fragment_lists = MakeFragmentedTombstoneLists(
{{{"a", "z", 10}}, {{"a", "z", 10}}, {{"a", "z", 10}}});
std::vector<std::pair<InternalKey, InternalKey>> iter_bounds = {
{InternalKey("a", 4, kTypeValue),
InternalKey("m", kMaxSequenceNumber, kTypeRangeDeletion)},
{InternalKey("m", 20, kTypeValue),
InternalKey("x", kMaxSequenceNumber, kTypeRangeDeletion)},
{InternalKey("x", 5, kTypeValue), InternalKey("zz", 30, kTypeValue)}};
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19);
for (size_t i = 0; i < fragment_lists.size(); i++) {
const auto& fragment_list = fragment_lists[i];
const auto& bounds = iter_bounds[i];
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(fragment_list.get(),
19 /* snapshot */, bytewise_icmp));
range_del_agg.AddTombstones(std::move(input_iter), &bounds.first,
&bounds.second);
}
VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 10), false},
{InternalValue("a", 9), false},
{InternalValue("a", 4), true},
{InternalValue("m", 10), false},
{InternalValue("m", 9), true},
{InternalValue("x", 10), false},
{InternalValue("x", 9), false},
{InternalValue("x", 5), true},
{InternalValue("z", 9), false}});
VerifyIsRangeOverlapped(&range_del_agg, {{"", "_", false},
{"_", "a", true},
{"a", "n", true},
{"l", "x", true},
{"w", "z", true},
{"zzz", "zz", false},
{"zz", "zzz", false}});
}
TEST_F(RangeDelAggregatorV2Test, MultipleTruncatedItersInAggregatorSameLevel) {
auto fragment_lists = MakeFragmentedTombstoneLists(
{{{"a", "z", 10}}, {{"a", "z", 10}}, {{"a", "z", 10}}});
std::vector<std::pair<InternalKey, InternalKey>> iter_bounds = {
{InternalKey("a", 4, kTypeValue),
InternalKey("m", kMaxSequenceNumber, kTypeRangeDeletion)},
{InternalKey("m", 20, kTypeValue),
InternalKey("x", kMaxSequenceNumber, kTypeRangeDeletion)},
{InternalKey("x", 5, kTypeValue), InternalKey("zz", 30, kTypeValue)}};
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19);
auto add_iter_to_agg = [&](size_t i) {
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(fragment_lists[i].get(),
19 /* snapshot */, bytewise_icmp));
range_del_agg.AddTombstones(std::move(input_iter), &iter_bounds[i].first,
&iter_bounds[i].second);
};
add_iter_to_agg(0);
VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 10), false},
{InternalValue("a", 9), false},
{InternalValue("a", 4), true}});
add_iter_to_agg(1);
VerifyShouldDelete(&range_del_agg, {{InternalValue("m", 10), false},
{InternalValue("m", 9), true}});
add_iter_to_agg(2);
VerifyShouldDelete(&range_del_agg, {{InternalValue("x", 10), false},
{InternalValue("x", 9), false},
{InternalValue("x", 5), true},
{InternalValue("z", 9), false}});
VerifyIsRangeOverlapped(&range_del_agg, {{"", "_", false},
{"_", "a", true},
{"a", "n", true},
{"l", "x", true},
{"w", "z", true},
{"zzz", "zz", false},
{"zz", "zzz", false}});
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
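
The tests above all follow the same pattern: build a FragmentedRangeTombstoneList per simulated file, wrap it in a snapshot-bounded FragmentedRangeTombstoneIterator, add it to the aggregator (optionally truncated to the file's smallest/largest keys), then probe point keys and ranges. A minimal sketch of that pattern follows; the helper name is made up, and the ShouldDelete/IsRangeOverlapped calls are left as comments because their exact signatures are only implied by the test helpers here:

// Sketch only (not part of this PR): the add-then-query pattern the tests
// above exercise.
#include <memory>
#include <utility>

#include "db/dbformat.h"
#include "db/range_del_aggregator_v2.h"
#include "db/range_tombstone_fragmenter.h"

namespace rocksdb {

void AddTruncatedFileTombstones(
    const InternalKeyComparator& icmp,
    const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones,
    const InternalKey& file_smallest, const InternalKey& file_largest) {
  // Tombstones with a sequence number above 19 are invisible to this
  // aggregator (mirrors the snapshot used in the tests above).
  RangeDelAggregatorV2 range_del_agg(&icmp, 19 /* upper_bound */);

  std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
      new FragmentedRangeTombstoneIterator(tombstones, 19 /* snapshot */,
                                           icmp));
  // Truncating to [file_smallest, file_largest] keeps this file's tombstones
  // from covering keys outside its boundaries.
  range_del_agg.AddTombstones(std::move(input_iter), &file_smallest,
                              &file_largest);

  // Point and range probes (signatures assumed; see the note above):
  // range_del_agg.ShouldDelete(parsed_internal_key, ...);
  // range_del_agg.IsRangeOverlapped("a", "z");
}

}  // namespace rocksdb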

@ -231,10 +231,33 @@ void FragmentedRangeTombstoneIterator::SeekToFirst() {
seq_pos_ = tombstones_->seq_begin(); seq_pos_ = tombstones_->seq_begin();
} }
void FragmentedRangeTombstoneIterator::SeekToTopFirst() {
if (tombstones_->empty()) {
Invalidate();
return;
}
pos_ = tombstones_->begin();
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
ScanForwardToVisibleTombstone();
}
void FragmentedRangeTombstoneIterator::SeekToLast() { void FragmentedRangeTombstoneIterator::SeekToLast() {
pos_ = tombstones_->end(); pos_ = std::prev(tombstones_->end());
seq_pos_ = tombstones_->seq_end(); seq_pos_ = std::prev(tombstones_->seq_end());
Prev(); }
void FragmentedRangeTombstoneIterator::SeekToTopLast() {
if (tombstones_->empty()) {
Invalidate();
return;
}
pos_ = std::prev(tombstones_->end());
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
ScanBackwardToVisibleTombstone();
} }
void FragmentedRangeTombstoneIterator::Seek(const Slice& target) { void FragmentedRangeTombstoneIterator::Seek(const Slice& target) {
@ -243,16 +266,7 @@ void FragmentedRangeTombstoneIterator::Seek(const Slice& target) {
return; return;
} }
SeekToCoveringTombstone(target); SeekToCoveringTombstone(target);
while (pos_ != tombstones_->end() && ScanForwardToVisibleTombstone();
seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) {
++pos_;
if (pos_ == tombstones_->end()) {
return;
}
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
}
} }
void FragmentedRangeTombstoneIterator::SeekForPrev(const Slice& target) { void FragmentedRangeTombstoneIterator::SeekForPrev(const Slice& target) {
@ -261,17 +275,7 @@ void FragmentedRangeTombstoneIterator::SeekForPrev(const Slice& target) {
return; return;
} }
SeekForPrevToCoveringTombstone(target); SeekForPrevToCoveringTombstone(target);
while (pos_ != tombstones_->end() && ScanBackwardToVisibleTombstone();
seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) {
if (pos_ == tombstones_->begin()) {
Invalidate();
return;
}
--pos_;
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
}
} }
void FragmentedRangeTombstoneIterator::SeekToCoveringTombstone( void FragmentedRangeTombstoneIterator::SeekToCoveringTombstone(
@ -307,6 +311,33 @@ void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone(
snapshot_, std::greater<SequenceNumber>()); snapshot_, std::greater<SequenceNumber>());
} }
void FragmentedRangeTombstoneIterator::ScanForwardToVisibleTombstone() {
while (pos_ != tombstones_->end() &&
seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) {
++pos_;
if (pos_ == tombstones_->end()) {
return;
}
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
}
}
void FragmentedRangeTombstoneIterator::ScanBackwardToVisibleTombstone() {
while (pos_ != tombstones_->end() &&
seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) {
if (pos_ == tombstones_->begin()) {
Invalidate();
return;
}
--pos_;
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
}
}
void FragmentedRangeTombstoneIterator::Next() { void FragmentedRangeTombstoneIterator::Next() {
++seq_pos_; ++seq_pos_;
if (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) { if (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) {
@ -314,6 +345,17 @@ void FragmentedRangeTombstoneIterator::Next() {
} }
} }
void FragmentedRangeTombstoneIterator::TopNext() {
++pos_;
if (pos_ == tombstones_->end()) {
return;
}
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
ScanForwardToVisibleTombstone();
}
void FragmentedRangeTombstoneIterator::Prev() { void FragmentedRangeTombstoneIterator::Prev() {
if (seq_pos_ == tombstones_->seq_begin()) { if (seq_pos_ == tombstones_->seq_begin()) {
pos_ = tombstones_->end(); pos_ = tombstones_->end();
@ -327,6 +369,18 @@ void FragmentedRangeTombstoneIterator::Prev() {
} }
} }
void FragmentedRangeTombstoneIterator::TopPrev() {
if (pos_ == tombstones_->begin()) {
Invalidate();
return;
}
--pos_;
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
ScanBackwardToVisibleTombstone();
}
bool FragmentedRangeTombstoneIterator::Valid() const { bool FragmentedRangeTombstoneIterator::Valid() const {
return tombstones_ != nullptr && pos_ != tombstones_->end(); return tombstones_ != nullptr && pos_ != tombstones_->end();
} }
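
The top-level traversal added above (SeekToTopFirst/SeekToTopLast, TopNext/TopPrev) visits one visible tombstone per fragment and leans on a single idiom: each fragment's sequence numbers are stored in descending order, and std::lower_bound with std::greater<SequenceNumber> picks the newest sequence number at or below the iterator's snapshot; when that search lands on the fragment's seq_end, nothing in the fragment is visible and ScanForwardToVisibleTombstone/ScanBackwardToVisibleTombstone move on to the next fragment. A self-contained sketch of the idiom (standalone illustration, not RocksDB code):

#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

using SequenceNumber = uint64_t;

int main() {
  // Sequence numbers for one fragment, sorted in descending order,
  // mirroring the seq_start_idx..seq_end_idx slice in the fragmenter.
  std::vector<SequenceNumber> seqs = {30, 25, 12, 7};
  SequenceNumber snapshot = 19;

  // With std::greater<>, lower_bound returns the first element that is
  // NOT greater than the snapshot, i.e. the newest visible seqnum (12).
  auto it = std::lower_bound(seqs.begin(), seqs.end(), snapshot,
                             std::greater<SequenceNumber>());
  if (it == seqs.end()) {
    // Analogous to the Scan*ToVisibleTombstone helpers skipping a fragment.
    std::cout << "no tombstone in this fragment is visible at " << snapshot
              << "\n";
  } else {
    std::cout << "newest visible seqnum: " << *it << "\n";  // prints 12
  }
  return 0;
}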

@ -95,9 +95,13 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
FragmentedRangeTombstoneIterator( FragmentedRangeTombstoneIterator(
const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones, const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones,
SequenceNumber snapshot, const InternalKeyComparator& icmp); SequenceNumber snapshot, const InternalKeyComparator& icmp);
void SeekToFirst() override; void SeekToFirst() override;
void SeekToLast() override; void SeekToLast() override;
void SeekToTopFirst();
void SeekToTopLast();
// NOTE: Seek and SeekForPrev do not behave in the way InternalIterator // NOTE: Seek and SeekForPrev do not behave in the way InternalIterator
// seeking should behave. This is OK because they are not currently used, but // seeking should behave. This is OK because they are not currently used, but
// eventually FragmentedRangeTombstoneIterator should no longer implement // eventually FragmentedRangeTombstoneIterator should no longer implement
@ -114,6 +118,10 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
void Next() override; void Next() override;
void Prev() override; void Prev() override;
void TopNext();
void TopPrev();
bool Valid() const override; bool Valid() const override;
Slice key() const override { Slice key() const override {
MaybePinKey(); MaybePinKey();
@ -124,9 +132,30 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
bool IsValuePinned() const override { return true; } bool IsValuePinned() const override { return true; }
Status status() const override { return Status::OK(); } Status status() const override { return Status::OK(); }
bool empty() const { return tombstones_->empty(); }
void Invalidate() {
pos_ = tombstones_->end();
seq_pos_ = tombstones_->seq_end();
}
// TODO: implement properly
RangeTombstone tombstone() const {
return RangeTombstone(start_key(), end_key(), seq());
}
Slice start_key() const { return pos_->start_key; } Slice start_key() const { return pos_->start_key; }
Slice end_key() const { return pos_->end_key; } Slice end_key() const { return pos_->end_key; }
SequenceNumber seq() const { return *seq_pos_; } SequenceNumber seq() const { return *seq_pos_; }
ParsedInternalKey parsed_start_key() const {
return ParsedInternalKey(pos_->start_key, kMaxSequenceNumber,
kTypeRangeDeletion);
}
ParsedInternalKey parsed_end_key() const {
return ParsedInternalKey(pos_->end_key, kMaxSequenceNumber,
kTypeRangeDeletion);
}
ParsedInternalKey internal_key() const {
return ParsedInternalKey(pos_->start_key, *seq_pos_, kTypeRangeDeletion);
}
SequenceNumber MaxCoveringTombstoneSeqnum(const Slice& user_key); SequenceNumber MaxCoveringTombstoneSeqnum(const Slice& user_key);
@ -182,10 +211,8 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
void SeekToCoveringTombstone(const Slice& key); void SeekToCoveringTombstone(const Slice& key);
void SeekForPrevToCoveringTombstone(const Slice& key); void SeekForPrevToCoveringTombstone(const Slice& key);
void Invalidate() { void ScanForwardToVisibleTombstone();
pos_ = tombstones_->end(); void ScanBackwardToVisibleTombstone();
seq_pos_ = tombstones_->seq_end();
}
bool ValidPos() const { bool ValidPos() const {
return Valid() && seq_pos_ != tombstones_->seq_iter(pos_->seq_end_idx); return Valid() && seq_pos_ != tombstones_->seq_iter(pos_->seq_end_idx);
} }
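
The new parsed_start_key()/parsed_end_key() accessors deliberately tag the user-key bounds with kMaxSequenceNumber and kTypeRangeDeletion: internal keys sort by user key ascending and then by sequence number descending, so a bound built this way sorts ahead of every real point key with the same user key and never excludes a key the tombstone should cover. A simplified stand-in for that ordering rule (illustration only, not RocksDB's InternalKeyComparator):

#include <cstdint>
#include <iostream>
#include <string>

// Simplified stand-in for internal-key ordering: user key ascending,
// then sequence number descending (higher seqnum sorts first).
struct SimpleInternalKey {
  std::string user_key;
  uint64_t seq;
};

bool Less(const SimpleInternalKey& a, const SimpleInternalKey& b) {
  if (a.user_key != b.user_key) return a.user_key < b.user_key;
  return a.seq > b.seq;  // descending by sequence number
}

int main() {
  const uint64_t kMaxSeq = UINT64_MAX;
  SimpleInternalKey tombstone_start{"c", kMaxSeq};  // like parsed_start_key()
  SimpleInternalKey point_key{"c", 42};
  // The tombstone's start bound sorts before a point key with the same user
  // key, so bounds built this way never exclude covered point keys.
  std::cout << std::boolalpha << Less(tombstone_start, point_key)
            << "\n";  // prints true
  return 0;
}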

@ -42,6 +42,19 @@ void VerifyFragmentedRangeDels(
EXPECT_FALSE(iter->Valid()); EXPECT_FALSE(iter->Valid());
} }
void VerifyVisibleTombstones(
FragmentedRangeTombstoneIterator* iter,
const std::vector<RangeTombstone>& expected_tombstones) {
iter->SeekToTopFirst();
for (size_t i = 0; i < expected_tombstones.size() && iter->Valid();
i++, iter->TopNext()) {
EXPECT_EQ(iter->start_key(), expected_tombstones[i].start_key_);
EXPECT_EQ(iter->value(), expected_tombstones[i].end_key_);
EXPECT_EQ(iter->seq(), expected_tombstones[i].seq_);
}
EXPECT_FALSE(iter->Valid());
}
struct SeekTestCase { struct SeekTestCase {
Slice seek_target; Slice seek_target;
RangeTombstone expected_position; RangeTombstone expected_position;
@ -262,14 +275,37 @@ TEST_F(RangeTombstoneFragmenterTest, OverlapAndRepeatedStartKeyMultiUse) {
{"j", "l", 2}, {"j", "l", 2},
{"l", "n", 4}}); {"l", "n", 4}});
} }
VerifyVisibleTombstones(&iter1, {{"a", "c", 10},
{"c", "e", 10},
{"e", "g", 8},
{"g", "i", 6},
{"j", "l", 4},
{"l", "n", 4}});
VerifyMaxCoveringTombstoneSeqnum( VerifyMaxCoveringTombstoneSeqnum(
&iter1, {{"a", 10}, {"c", 10}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}}); &iter1, {{"a", 10}, {"c", 10}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}});
VerifyVisibleTombstones(&iter2, {{"c", "e", 8},
{"e", "g", 8},
{"g", "i", 6},
{"j", "l", 4},
{"l", "n", 4}});
VerifyMaxCoveringTombstoneSeqnum( VerifyMaxCoveringTombstoneSeqnum(
&iter2, {{"a", 0}, {"c", 8}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}}); &iter2, {{"a", 0}, {"c", 8}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}});
VerifyVisibleTombstones(&iter3, {{"c", "e", 6},
{"e", "g", 6},
{"g", "i", 6},
{"j", "l", 4},
{"l", "n", 4}});
VerifyMaxCoveringTombstoneSeqnum( VerifyMaxCoveringTombstoneSeqnum(
&iter3, {{"a", 0}, {"c", 6}, {"e", 6}, {"i", 0}, {"j", 4}, {"m", 4}}); &iter3, {{"a", 0}, {"c", 6}, {"e", 6}, {"i", 0}, {"j", 4}, {"m", 4}});
VerifyVisibleTombstones(&iter4, {{"j", "l", 4}, {"l", "n", 4}});
VerifyMaxCoveringTombstoneSeqnum( VerifyMaxCoveringTombstoneSeqnum(
&iter4, {{"a", 0}, {"c", 0}, {"e", 0}, {"i", 0}, {"j", 4}, {"m", 4}}); &iter4, {{"a", 0}, {"c", 0}, {"e", 0}, {"i", 0}, {"j", 4}, {"m", 4}});
VerifyVisibleTombstones(&iter5, {{"j", "l", 2}});
VerifyMaxCoveringTombstoneSeqnum( VerifyMaxCoveringTombstoneSeqnum(
&iter5, {{"a", 0}, {"c", 0}, {"e", 0}, {"i", 0}, {"j", 2}, {"m", 0}}); &iter5, {{"a", 0}, {"c", 0}, {"e", 0}, {"i", 0}, {"j", 2}, {"m", 0}});
} }

@ -183,7 +183,7 @@ Status TableCache::FindTable(const EnvOptions& env_options,
InternalIterator* TableCache::NewIterator( InternalIterator* TableCache::NewIterator(
const ReadOptions& options, const EnvOptions& env_options, const ReadOptions& options, const EnvOptions& env_options,
const InternalKeyComparator& icomparator, const FileMetaData& file_meta, const InternalKeyComparator& icomparator, const FileMetaData& file_meta,
RangeDelAggregator* range_del_agg, const SliceTransform* prefix_extractor, RangeDelAggregatorV2* range_del_agg, const SliceTransform* prefix_extractor,
TableReader** table_reader_ptr, HistogramImpl* file_read_hist, TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
bool for_compaction, Arena* arena, bool skip_filters, int level, bool for_compaction, Arena* arena, bool skip_filters, int level,
const InternalKey* smallest_compaction_key, const InternalKey* smallest_compaction_key,
@ -264,8 +264,9 @@ InternalIterator* TableCache::NewIterator(
} }
if (s.ok() && range_del_agg != nullptr && !options.ignore_range_deletions) { if (s.ok() && range_del_agg != nullptr && !options.ignore_range_deletions) {
if (range_del_agg->AddFile(fd.GetNumber())) { if (range_del_agg->AddFile(fd.GetNumber())) {
std::unique_ptr<InternalIterator> range_del_iter( std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
table_reader->NewRangeTombstoneIterator(options)); static_cast<FragmentedRangeTombstoneIterator*>(
table_reader->NewRangeTombstoneIterator(options)));
if (range_del_iter != nullptr) { if (range_del_iter != nullptr) {
s = range_del_iter->status(); s = range_del_iter->status();
} }
@ -278,8 +279,8 @@ InternalIterator* TableCache::NewIterator(
if (largest_compaction_key != nullptr) { if (largest_compaction_key != nullptr) {
largest = largest_compaction_key; largest = largest_compaction_key;
} }
s = range_del_agg->AddTombstones(std::move(range_del_iter), smallest, range_del_agg->AddTombstones(std::move(range_del_iter), smallest,
largest); largest);
} }
} }
} }
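
The call site above now downcasts the table reader's range-tombstone iterator to FragmentedRangeTombstoneIterator and hands it to the V2 aggregator together with truncation bounds; AddTombstones returns void, so no Status is threaded back. A hypothetical helper sketching the bound selection around that call (the default-to-file-boundary lines sit outside the shown hunk, so treat them as assumed; the helper name is made up for illustration):

#include <memory>
#include <utility>

#include "db/dbformat.h"
#include "db/range_del_aggregator_v2.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_edit.h"

namespace rocksdb {

void AddFileTombstones(
    RangeDelAggregatorV2* range_del_agg,
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter,
    const FileMetaData& file_meta, const InternalKey* smallest_compaction_key,
    const InternalKey* largest_compaction_key) {
  // Default to the file's own boundaries (assumed; not shown in the hunk).
  const InternalKey* smallest = &file_meta.smallest;
  const InternalKey* largest = &file_meta.largest;
  if (smallest_compaction_key != nullptr) {
    smallest = smallest_compaction_key;  // clamp to the compaction unit
  }
  if (largest_compaction_key != nullptr) {
    largest = largest_compaction_key;
  }
  // Truncation to [smallest, largest] keeps this file's tombstones from
  // covering keys outside its boundaries; V2 AddTombstones returns void.
  range_del_agg->AddTombstones(std::move(range_del_iter), smallest, largest);
}

}  // namespace rocksdb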

@ -15,7 +15,7 @@
#include <stdint.h> #include <stdint.h>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/range_del_aggregator.h" #include "db/range_del_aggregator_v2.h"
#include "options/cf_options.h" #include "options/cf_options.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
@ -52,7 +52,7 @@ class TableCache {
InternalIterator* NewIterator( InternalIterator* NewIterator(
const ReadOptions& options, const EnvOptions& toptions, const ReadOptions& options, const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator, const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, RangeDelAggregator* range_del_agg, const FileMetaData& file_meta, RangeDelAggregatorV2* range_del_agg,
const SliceTransform* prefix_extractor = nullptr, const SliceTransform* prefix_extractor = nullptr,
TableReader** table_reader_ptr = nullptr, TableReader** table_reader_ptr = nullptr,
HistogramImpl* file_read_hist = nullptr, bool for_compaction = false, HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,

@ -454,15 +454,14 @@ namespace {
class LevelIterator final : public InternalIterator { class LevelIterator final : public InternalIterator {
public: public:
LevelIterator(TableCache* table_cache, const ReadOptions& read_options, LevelIterator(
const EnvOptions& env_options, TableCache* table_cache, const ReadOptions& read_options,
const InternalKeyComparator& icomparator, const EnvOptions& env_options, const InternalKeyComparator& icomparator,
const LevelFilesBrief* flevel, const LevelFilesBrief* flevel, const SliceTransform* prefix_extractor,
const SliceTransform* prefix_extractor, bool should_sample, bool should_sample, HistogramImpl* file_read_hist, bool for_compaction,
HistogramImpl* file_read_hist, bool for_compaction, bool skip_filters, int level, RangeDelAggregatorV2* range_del_agg,
bool skip_filters, int level, RangeDelAggregator* range_del_agg, const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries =
const std::vector<AtomicCompactionUnitBoundary>* nullptr)
compaction_boundaries = nullptr)
: table_cache_(table_cache), : table_cache_(table_cache),
read_options_(read_options), read_options_(read_options),
env_options_(env_options), env_options_(env_options),
@ -572,7 +571,7 @@ class LevelIterator final : public InternalIterator {
bool skip_filters_; bool skip_filters_;
size_t file_index_; size_t file_index_;
int level_; int level_;
RangeDelAggregator* range_del_agg_; RangeDelAggregatorV2* range_del_agg_;
IteratorWrapper file_iter_; // May be nullptr IteratorWrapper file_iter_; // May be nullptr
PinnedIteratorsManager* pinned_iters_mgr_; PinnedIteratorsManager* pinned_iters_mgr_;
@ -986,7 +985,7 @@ double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
void Version::AddIterators(const ReadOptions& read_options, void Version::AddIterators(const ReadOptions& read_options,
const EnvOptions& soptions, const EnvOptions& soptions,
MergeIteratorBuilder* merge_iter_builder, MergeIteratorBuilder* merge_iter_builder,
RangeDelAggregator* range_del_agg) { RangeDelAggregatorV2* range_del_agg) {
assert(storage_info_.finalized_); assert(storage_info_.finalized_);
for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) { for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
@ -999,7 +998,7 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
const EnvOptions& soptions, const EnvOptions& soptions,
MergeIteratorBuilder* merge_iter_builder, MergeIteratorBuilder* merge_iter_builder,
int level, int level,
RangeDelAggregator* range_del_agg) { RangeDelAggregatorV2* range_del_agg) {
assert(storage_info_.finalized_); assert(storage_info_.finalized_);
if (level >= storage_info_.num_non_empty_levels()) { if (level >= storage_info_.num_non_empty_levels()) {
// This is an empty level // This is an empty level
@ -1058,7 +1057,8 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
Arena arena; Arena arena;
Status status; Status status;
RangeDelAggregator range_del_agg(icmp, {}, false); RangeDelAggregatorV2 range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
*overlap = false; *overlap = false;
@ -4254,7 +4254,7 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
} }
InternalIterator* VersionSet::MakeInputIterator( InternalIterator* VersionSet::MakeInputIterator(
const Compaction* c, RangeDelAggregator* range_del_agg, const Compaction* c, RangeDelAggregatorV2* range_del_agg,
const EnvOptions& env_options_compactions) { const EnvOptions& env_options_compactions) {
auto cfd = c->column_family_data(); auto cfd = c->column_family_data();
ReadOptions read_options; ReadOptions read_options;

@ -34,7 +34,7 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/file_indexer.h" #include "db/file_indexer.h"
#include "db/log_reader.h" #include "db/log_reader.h"
#include "db/range_del_aggregator.h" #include "db/range_del_aggregator_v2.h"
#include "db/read_callback.h" #include "db/read_callback.h"
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_builder.h" #include "db/version_builder.h"
@ -538,11 +538,11 @@ class Version {
// REQUIRES: This version has been saved (see VersionSet::SaveTo) // REQUIRES: This version has been saved (see VersionSet::SaveTo)
void AddIterators(const ReadOptions&, const EnvOptions& soptions, void AddIterators(const ReadOptions&, const EnvOptions& soptions,
MergeIteratorBuilder* merger_iter_builder, MergeIteratorBuilder* merger_iter_builder,
RangeDelAggregator* range_del_agg); RangeDelAggregatorV2* range_del_agg);
void AddIteratorsForLevel(const ReadOptions&, const EnvOptions& soptions, void AddIteratorsForLevel(const ReadOptions&, const EnvOptions& soptions,
MergeIteratorBuilder* merger_iter_builder, MergeIteratorBuilder* merger_iter_builder,
int level, RangeDelAggregator* range_del_agg); int level, RangeDelAggregatorV2* range_del_agg);
Status OverlapWithLevelIterator(const ReadOptions&, const EnvOptions&, Status OverlapWithLevelIterator(const ReadOptions&, const EnvOptions&,
const Slice& smallest_user_key, const Slice& smallest_user_key,
@ -935,7 +935,7 @@ class VersionSet {
// Create an iterator that reads over the compaction inputs for "*c". // Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed. // The caller should delete the iterator when no longer needed.
InternalIterator* MakeInputIterator( InternalIterator* MakeInputIterator(
const Compaction* c, RangeDelAggregator* range_del_agg, const Compaction* c, RangeDelAggregatorV2* range_del_agg,
const EnvOptions& env_options_compactions); const EnvOptions& env_options_compactions);
// Add all files listed in any live version to *live. // Add all files listed in any live version to *live.

@ -43,6 +43,7 @@ LIB_SOURCES = \
db/merge_helper.cc \ db/merge_helper.cc \
db/merge_operator.cc \ db/merge_operator.cc \
db/range_del_aggregator.cc \ db/range_del_aggregator.cc \
db/range_del_aggregator_v2.cc \
db/range_tombstone_fragmenter.cc \ db/range_tombstone_fragmenter.cc \
db/repair.cc \ db/repair.cc \
db/snapshot_impl.cc \ db/snapshot_impl.cc \
@ -331,6 +332,7 @@ MAIN_SOURCES = \
db/repair_test.cc \ db/repair_test.cc \
db/range_del_aggregator_test.cc \ db/range_del_aggregator_test.cc \
db/range_del_aggregator_bench.cc \ db/range_del_aggregator_bench.cc \
db/range_del_aggregator_v2_test.cc \
db/range_tombstone_fragmenter_test.cc \ db/range_tombstone_fragmenter_test.cc \
db/table_properties_collector_test.cc \ db/table_properties_collector_test.cc \
db/util_merge_operators_test.cc \ db/util_merge_operators_test.cc \

@ -19,7 +19,8 @@ Status GetAllKeyVersions(DB* db, Slice begin_key, Slice end_key,
DBImpl* idb = static_cast<DBImpl*>(db->GetRootDB()); DBImpl* idb = static_cast<DBImpl*>(db->GetRootDB());
auto icmp = InternalKeyComparator(idb->GetOptions().comparator); auto icmp = InternalKeyComparator(idb->GetOptions().comparator);
RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); RangeDelAggregatorV2 range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
Arena arena; Arena arena;
ScopedArenaIterator iter(idb->NewInternalIterator(&arena, &range_del_agg)); ScopedArenaIterator iter(idb->NewInternalIterator(&arena, &range_del_agg));
