Skip swaths of range tombstone covered keys in merging iterator (2022 edition) (#10449)

Summary:
Delete range logic is moved from `DBIter` to `MergingIterator`, and `MergingIterator` will seek to the end of a range deletion if possible instead of scanning through each key and check with `RangeDelAggregator`.

With the invariant that a key in level L (consider memtable as the first level, each immutable and L0 as a separate level) has a larger sequence number than all keys in any level >L, a range tombstone `[start, end)` from level L covers all keys in its range in any level >L. This property motivates optimizations in iterator:
- in `Seek(target)`, if level L has a range tombstone `[start, end)` that covers `target.UserKey`, then for all levels > L, we can do Seek() on `end` instead of `target` to skip some range tombstone covered keys.
- in `Next()/Prev()`, if the current key is covered by a range tombstone `[start, end)` from level L, we can do `Seek` to `end` for all levels > L.

This PR implements the above optimizations in `MergingIterator`. As all range tombstone covered keys are now skipped in `MergingIterator`, the range tombstone logic is removed from `DBIter`. The idea in this PR is similar to https://github.com/facebook/rocksdb/issues/7317, but this PR leaves `InternalIterator` interface mostly unchanged. **Credit**: the cascading seek optimization and the sentinel key (discussed below) are inspired by [Pebble](https://github.com/cockroachdb/pebble/blob/master/merging_iter.go) and suggested by ajkr in https://github.com/facebook/rocksdb/issues/7317. The two optimizations are mostly implemented in `SeekImpl()/SeekForPrevImpl()` and `IsNextDeleted()/IsPrevDeleted()` in `merging_iterator.cc`. See comments for each method for more detail.

One notable change is that the minHeap/maxHeap used by `MergingIterator` now contains range tombstone end keys besides point key iterators. This helps to reduce the number of key comparisons. For example, for a range tombstone `[start, end)`, a `start` and an `end` `HeapItem` are inserted into the heap. When a `HeapItem` for range tombstone start key is popped from the minHeap, we know this range tombstone becomes "active" in the sense that, before the range tombstone's end key is popped from the minHeap, all the keys popped from this heap is covered by the range tombstone's internal key range `[start, end)`.

Another major change, *delete range sentinel key*, is made to `LevelIterator`. Before this PR, when all point keys in an SST file are iterated through in `MergingIterator`, a level iterator would advance to the next SST file in its level. In the case when an SST file has a range tombstone that covers keys beyond the SST file's last point key, advancing to the next SST file would lose this range tombstone. Consequently, `MergingIterator` could return keys that should have been deleted by some range tombstone. We prevent this by pretending that file boundaries in each SST file are sentinel keys. A `LevelIterator` now only advance the file iterator once the sentinel key is processed.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10449

Test Plan:
- Added many unit tests in db_range_del_test
- Stress test: `./db_stress --readpercent=5 --prefixpercent=19 --writepercent=20 -delpercent=10 --iterpercent=44 --delrangepercent=2`
- Additional iterator stress test is added to verify against iterators against expected state: https://github.com/facebook/rocksdb/issues/10538. This is based on ajkr's previous attempt https://github.com/facebook/rocksdb/pull/5506#issuecomment-506021913.

```
python3 ./tools/db_crashtest.py blackbox --simple --write_buffer_size=524288 --target_file_size_base=524288 --max_bytes_for_level_base=2097152 --compression_type=none --max_background_compactions=8 --value_size_mult=33 --max_key=5000000 --interval=10 --duration=7200 --delrangepercent=3 --delpercent=9 --iterpercent=25 --writepercent=60 --readpercent=3 --prefixpercent=0 --num_iterations=1000 --range_deletion_width=100 --verify_iterator_with_expected_state_one_in=1
```

- Performance benchmark: I used a similar setup as in the blog [post](http://rocksdb.org/blog/2018/11/21/delete-range.html) that introduced DeleteRange, "a database with 5 million data keys, and 10000 range tombstones (ignoring those dropped during compaction) that were written in regular intervals after 4.5 million data keys were written".  As expected, the performance with this PR depends on the range tombstone width.
```
# Setup:
TEST_TMPDIR=/dev/shm ./db_bench_main --benchmarks=fillrandom --writes=4500000 --num=5000000
TEST_TMPDIR=/dev/shm ./db_bench_main --benchmarks=overwrite --writes=500000 --num=5000000 --use_existing_db=true --writes_per_range_tombstone=50

# Scan entire DB
TEST_TMPDIR=/dev/shm ./db_bench_main --benchmarks=readseq[-X5] --use_existing_db=true --num=5000000 --disable_auto_compactions=true

# Short range scan (10 Next())
TEST_TMPDIR=/dev/shm/width-100/ ./db_bench_main --benchmarks=seekrandom[-X5] --use_existing_db=true --num=500000 --reads=100000 --seek_nexts=10 --disable_auto_compactions=true

# Long range scan(1000 Next())
TEST_TMPDIR=/dev/shm/width-100/ ./db_bench_main --benchmarks=seekrandom[-X5] --use_existing_db=true --num=500000 --reads=2500 --seek_nexts=1000 --disable_auto_compactions=true
```
Avg over of 10 runs (some slower tests had fews runs):

For the first column (tombstone), 0 means no range tombstone, 100-10000 means width of the 10k range tombstones, and 1 means there is a single range tombstone in the entire DB (width is 1000). The 1 tombstone case is to test regression when there's very few range tombstones in the DB, as no range tombstone is likely to take a different code path than with range tombstones.

- Scan entire DB

| tombstone width | Pre-PR ops/sec | Post-PR ops/sec | ±% |
| ------------- | ------------- | ------------- |  ------------- |
| 0 range tombstone    |2525600 (± 43564)    |2486917 (± 33698)    |-1.53%               |
| 100   |1853835 (± 24736)    |2073884 (± 32176)    |+11.87%              |
| 1000  |422415 (± 7466)      |1115801 (± 22781)    |+164.15%             |
| 10000 |22384 (± 227)        |227919 (± 6647)      |+918.22%             |
| 1 range tombstone      |2176540 (± 39050)    |2434954 (± 24563)    |+11.87%              |
- Short range scan

| tombstone width | Pre-PR ops/sec | Post-PR ops/sec | ±% |
| ------------- | ------------- | ------------- |  ------------- |
| 0  range tombstone   |35398 (± 533)        |35338 (± 569)        |-0.17%               |
| 100   |28276 (± 664)        |31684 (± 331)        |+12.05%              |
| 1000  |7637 (± 77)          |25422 (± 277)        |+232.88%             |
| 10000 |1367                 |28667                |+1997.07%            |
| 1 range tombstone      |32618 (± 581)        |32748 (± 506)        |+0.4%                |

- Long range scan

| tombstone width | Pre-PR ops/sec | Post-PR ops/sec | ±% |
| ------------- | ------------- | ------------- |  ------------- |
| 0 range tombstone     |2262 (± 33)          |2353 (± 20)          |+4.02%               |
| 100   |1696 (± 26)          |1926 (± 18)          |+13.56%              |
| 1000  |410 (± 6)            |1255 (± 29)          |+206.1%              |
| 10000 |25                   |414                  |+1556.0%             |
| 1 range tombstone   |1957 (± 30)          |2185 (± 44)          |+11.65%              |

- Microbench does not show significant regression: https://gist.github.com/cbi42/59f280f85a59b678e7e5d8561e693b61

Reviewed By: ajkr

Differential Revision: D38450331

Pulled By: cbi42

fbshipit-source-id: b5ef12e8d8c289ed2e163ccdf277f5039b511fca
main
Changyu Bi 2 years ago committed by Facebook GitHub Bot
parent 3770d6b74b
commit 30bc495c03
  1. 3
      HISTORY.md
  2. 27
      db/arena_wrapped_db_iter.cc
  3. 9
      db/arena_wrapped_db_iter.h
  4. 2
      db/c.cc
  5. 14
      db/db_compaction_filter_test.cc
  6. 45
      db/db_impl/db_impl.cc
  7. 29
      db/db_impl/db_impl.h
  8. 8
      db/db_impl/db_impl_readonly.cc
  9. 3
      db/db_impl/db_impl_secondary.cc
  10. 101
      db/db_iter.cc
  11. 2
      db/db_iter.h
  12. 932
      db/db_range_del_test.cc
  13. 20
      db/db_test_util.cc
  14. 3
      db/dbformat.h
  15. 24
      db/memtable_list.cc
  16. 3
      db/memtable_list.h
  17. 5
      db/range_del_aggregator.cc
  18. 1
      db/range_del_aggregator.h
  19. 20
      db/range_del_aggregator_test.cc
  20. 50
      db/table_cache.cc
  21. 8
      db/table_cache.h
  22. 352
      db/version_set.cc
  23. 8
      db/version_set.h
  24. 3
      include/rocksdb/c.h
  25. 4
      include/rocksdb/perf_context.h
  26. 5
      monitoring/perf_context.cc
  27. 7
      table/internal_iterator.h
  28. 4
      table/iterator_wrapper.h
  29. 1037
      table/merging_iterator.cc
  30. 23
      table/merging_iterator.h
  31. 6
      utilities/debug.cc

@ -22,6 +22,9 @@
### New Features
* RocksDB does internal auto prefetching if it notices 2 sequential reads if readahead_size is not specified. New option `num_file_reads_for_auto_readahead` is added in BlockBasedTableOptions which indicates after how many sequential reads internal auto prefetching should be start (default is 2).
### Performance Improvements
* Iterator performance is improved for `DeleteRange()` users. Internally, iterator will skip to the end of a range tombstone when possible, instead of looping through each key and check individually if a key is range deleted.
## 7.6.0 (08/19/2022)
### New Features
* Added `prepopulate_blob_cache` to ColumnFamilyOptions. If enabled, prepopulate warm/hot blobs which are already in memory into blob cache at the time of flush. On a flush, the blob that is in memory (in memtables) get flushed to the device. If using Direct IO, additional IO is incurred to read this blob back into memory again, which is avoided by enabling this option. This further helps if the workload exhibits high temporal locality, where most of the reads go to recently written data. This also helps in case of the remote file system since it involves network traffic and higher latencies.

@ -77,22 +77,29 @@ Status ArenaWrappedDBIter::Refresh() {
allow_refresh_);
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(),
latest_seq, /* allow_unprepared_value */ true);
read_options_, cfd_, sv, &arena_, latest_seq,
/* allow_unprepared_value */ true, /* db_iter */ this);
SetIterUnderDBIter(internal_iter);
break;
} else {
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
// Refresh range-tombstones in MemTable
if (!read_options_.ignore_range_deletions) {
SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_);
ReadRangeDelAggregator* range_del_agg =
db_iter_->GetRangeDelAggregator();
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
range_del_iter.reset(sv->mem->NewRangeTombstoneIterator(
read_options_, latest_seq, false /* immutable_memtable */));
range_del_agg->AddTombstones(std::move(range_del_iter));
cfd_->ReturnThreadLocalSuperVersion(sv);
assert(memtable_range_tombstone_iter_ != nullptr);
if (memtable_range_tombstone_iter_ != nullptr) {
SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_);
auto t = sv->mem->NewRangeTombstoneIterator(
read_options_, latest_seq, false /* immutable_memtable */);
delete *memtable_range_tombstone_iter_;
if (t == nullptr || t->empty()) {
*memtable_range_tombstone_iter_ = nullptr;
} else {
*memtable_range_tombstone_iter_ = new TruncatedRangeDelIterator(
std::unique_ptr<FragmentedRangeTombstoneIterator>(t),
&cfd_->internal_comparator(), nullptr, nullptr);
}
cfd_->ReturnThreadLocalSuperVersion(sv);
}
}
// Refresh latest sequence number
db_iter_->set_sequence(latest_seq);

@ -44,9 +44,7 @@ class ArenaWrappedDBIter : public Iterator {
// Get the arena to be used to allocate memory for DBIter to be wrapped,
// as well as child iterators in it.
virtual Arena* GetArena() { return &arena_; }
virtual ReadRangeDelAggregator* GetRangeDelAggregator() {
return db_iter_->GetRangeDelAggregator();
}
const ReadOptions& GetReadOptions() { return read_options_; }
// Set the internal iterator wrapped inside the DB Iterator. Usually it is
@ -55,6 +53,10 @@ class ArenaWrappedDBIter : public Iterator {
db_iter_->SetIter(iter);
}
void SetMemtableRangetombstoneIter(TruncatedRangeDelIterator** iter) {
memtable_range_tombstone_iter_ = iter;
}
bool Valid() const override { return db_iter_->Valid(); }
void SeekToFirst() override { db_iter_->SeekToFirst(); }
void SeekToLast() override { db_iter_->SeekToLast(); }
@ -104,6 +106,7 @@ class ArenaWrappedDBIter : public Iterator {
ReadCallback* read_callback_;
bool expose_blob_index_ = false;
bool allow_refresh_ = true;
TruncatedRangeDelIterator** memtable_range_tombstone_iter_ = nullptr;
};
// Generate the arena wrapped iterator class.

@ -4116,6 +4116,8 @@ uint64_t rocksdb_perfcontext_metric(rocksdb_perfcontext_t* context,
return rep->blob_checksum_time;
case rocksdb_blob_decompress_time:
return rep->blob_decompress_time;
case rocksdb_internal_range_del_reseek_count:
return rep->internal_range_del_reseek_count;
default:
break;
}

@ -328,11 +328,9 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
Arena arena;
{
InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
ReadOptions read_options;
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
read_options, &arena, kMaxSequenceNumber, handles_[1]));
iter->SeekToFirst();
ASSERT_OK(iter->status());
while (iter->Valid()) {
@ -422,11 +420,9 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
count = 0;
{
InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
ReadOptions read_options;
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
read_options, &arena, kMaxSequenceNumber, handles_[1]));
iter->SeekToFirst();
ASSERT_OK(iter->status());
while (iter->Valid()) {
@ -701,11 +697,9 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
int total = 0;
Arena arena;
InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* snapshots */);
ReadOptions read_options;
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
read_options, &arena, &range_del_agg, kMaxSequenceNumber));
ScopedArenaIterator iter(dbfull()->NewInternalIterator(read_options, &arena,
kMaxSequenceNumber));
iter->SeekToFirst();
ASSERT_OK(iter->status());
while (iter->Valid()) {

@ -1641,7 +1641,6 @@ Status DBImpl::GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
Arena* arena,
RangeDelAggregator* range_del_agg,
SequenceNumber sequence,
ColumnFamilyHandle* column_family,
bool allow_unprepared_value) {
@ -1656,8 +1655,8 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
mutex_.Lock();
SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
mutex_.Unlock();
return NewInternalIterator(read_options, cfd, super_version, arena,
range_del_agg, sequence, allow_unprepared_value);
return NewInternalIterator(read_options, cfd, super_version, arena, sequence,
allow_unprepared_value);
}
void DBImpl::SchedulePurge() {
@ -1788,16 +1787,12 @@ static void CleanupGetMergeOperandsState(void* arg1, void* /*arg2*/) {
} // namespace
InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SuperVersion* super_version,
Arena* arena,
RangeDelAggregator* range_del_agg,
SequenceNumber sequence,
bool allow_unprepared_value) {
InternalIterator* DBImpl::NewInternalIterator(
const ReadOptions& read_options, ColumnFamilyData* cfd,
SuperVersion* super_version, Arena* arena, SequenceNumber sequence,
bool allow_unprepared_value, ArenaWrappedDBIter* db_iter) {
InternalIterator* internal_iter;
assert(arena != nullptr);
assert(range_del_agg != nullptr);
// Need to create internal iterator from the arena.
MergeIteratorBuilder merge_iter_builder(
&cfd->internal_comparator(), arena,
@ -1806,19 +1801,27 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
// Collect iterator for mutable mem
merge_iter_builder.AddIterator(
super_version->mem->NewIterator(read_options, arena));
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
Status s;
if (!read_options.ignore_range_deletions) {
range_del_iter.reset(super_version->mem->NewRangeTombstoneIterator(
read_options, sequence, false /* immutable_memtable */));
range_del_agg->AddTombstones(std::move(range_del_iter));
auto range_del_iter = super_version->mem->NewRangeTombstoneIterator(
read_options, sequence, false /* immutable_memtable */);
if (range_del_iter == nullptr || range_del_iter->empty()) {
delete range_del_iter;
merge_iter_builder.AddRangeTombstoneIterator(nullptr);
} else {
merge_iter_builder.AddRangeTombstoneIterator(
new TruncatedRangeDelIterator(
std::unique_ptr<FragmentedRangeTombstoneIterator>(range_del_iter),
&cfd->ioptions()->internal_comparator, nullptr /* smallest */,
nullptr /* largest */));
}
}
// Collect all needed child iterators for immutable memtables
if (s.ok()) {
super_version->imm->AddIterators(read_options, &merge_iter_builder);
if (!read_options.ignore_range_deletions) {
s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
range_del_agg);
merge_iter_builder);
}
}
TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
@ -1826,10 +1829,11 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
// Collect iterators for files in L0 - Ln
if (read_options.read_tier != kMemtableTier) {
super_version->current->AddIterators(read_options, file_options_,
&merge_iter_builder, range_del_agg,
&merge_iter_builder,
allow_unprepared_value);
}
internal_iter = merge_iter_builder.Finish();
internal_iter = merge_iter_builder.Finish(
read_options.ignore_range_deletions ? nullptr : db_iter);
SuperVersionHandle* cleanup = new SuperVersionHandle(
this, &mutex_, super_version,
read_options.background_purge_on_iterator_cleanup ||
@ -3354,9 +3358,8 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
read_options.snapshot != nullptr ? false : allow_refresh);
InternalIterator* internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), snapshot,
/* allow_unprepared_value */ true);
db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), snapshot,
/* allow_unprepared_value */ true, db_iter);
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;

@ -739,13 +739,29 @@ class DBImpl : public DB {
// the value and so will require PrepareValue() to be called before value();
// allow_unprepared_value = false is convenient when this optimization is not
// useful, e.g. when reading the whole column family.
//
// read_options.ignore_range_deletions determines whether range tombstones are
// processed in the returned interator internally, i.e., whether range
// tombstone covered keys are in this iterator's output.
// @param read_options Must outlive the returned iterator.
InternalIterator* NewInternalIterator(
const ReadOptions& read_options, Arena* arena,
RangeDelAggregator* range_del_agg, SequenceNumber sequence,
const ReadOptions& read_options, Arena* arena, SequenceNumber sequence,
ColumnFamilyHandle* column_family = nullptr,
bool allow_unprepared_value = false);
// Note: to support DB iterator refresh, memtable range tombstones in the
// underlying merging iterator needs to be refreshed. If db_iter is not
// nullptr, db_iter->SetMemtableRangetombstoneIter() is called with the
// memtable range tombstone iterator used by the underlying merging iterator.
// This range tombstone iterator can be refreshed later by db_iter.
// @param read_options Must outlive the returned iterator.
InternalIterator* NewInternalIterator(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SuperVersion* super_version,
Arena* arena, SequenceNumber sequence,
bool allow_unprepared_value,
ArenaWrappedDBIter* db_iter = nullptr);
LogsWithPrepTracker* logs_with_prep_tracker() {
return &logs_with_prep_tracker_;
}
@ -868,15 +884,6 @@ class DBImpl : public DB {
const WriteController& write_controller() { return write_controller_; }
// @param read_options Must outlive the returned iterator.
InternalIterator* NewInternalIterator(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SuperVersion* super_version,
Arena* arena,
RangeDelAggregator* range_del_agg,
SequenceNumber sequence,
bool allow_unprepared_value);
// hollow transactions shell used for recovery.
// these will then be passed to TransactionDB so that
// locks can be reacquired before writing can resume.

@ -143,8 +143,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
super_version->version_number, read_callback);
auto internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), read_seq,
/* allow_unprepared_value */ true);
read_seq, /* allow_unprepared_value */ true, db_iter);
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
}
@ -194,9 +193,8 @@ Status DBImplReadOnly::NewIterators(
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback);
auto* internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), read_seq,
/* allow_unprepared_value */ true);
db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), read_seq,
/* allow_unprepared_value */ true, db_iter);
db_iter->SetIterUnderDBIter(internal_iter);
iterators->push_back(db_iter);
}

@ -493,8 +493,7 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
expose_blob_index, read_options.snapshot ? false : allow_refresh);
auto internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), snapshot,
/* allow_unprepared_value */ true);
snapshot, /* allow_unprepared_value */ true, db_iter);
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
}

@ -78,7 +78,6 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
is_blob_(false),
is_wide_(false),
arena_mode_(arena_mode),
range_del_agg_(&ioptions.internal_comparator, s),
db_impl_(db_impl),
cfd_(cfd),
timestamp_ub_(read_options.timestamp),
@ -394,49 +393,27 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
saved_key_.SetUserKey(
ikey_.user_key, !pin_thru_lifetime_ ||
!iter_.iter()->IsKeyPinned() /* copy */);
if (range_del_agg_.ShouldDelete(
ikey_, RangeDelPositioningMode::kForwardTraversal)) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
skipping_saved_key = true;
num_skipped = 0;
reseek_done = false;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else {
if (ikey_.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false;
}
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetWideColumnValueIfNeeded(iter_.value())) {
return false;
}
if (ikey_.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false;
}
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetWideColumnValueIfNeeded(iter_.value())) {
return false;
}
valid_ = true;
return true;
}
valid_ = true;
return true;
}
break;
case kTypeMerge:
saved_key_.SetUserKey(
ikey_.user_key,
!pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
if (range_del_agg_.ShouldDelete(
ikey_, RangeDelPositioningMode::kForwardTraversal)) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
skipping_saved_key = true;
num_skipped = 0;
reseek_done = false;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else {
// By now, we are sure the current ikey is going to yield a
// value
current_entry_is_merged_ = true;
valid_ = true;
return MergeValuesNewToOld(); // Go to a different state machine
}
// By now, we are sure the current ikey is going to yield a value
current_entry_is_merged_ = true;
valid_ = true;
return MergeValuesNewToOld(); // Go to a different state machine
break;
default:
valid_ = false;
@ -562,9 +539,7 @@ bool DBIter::MergeValuesNewToOld() {
// hit the next user key, stop right here
break;
}
if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
range_del_agg_.ShouldDelete(
ikey, RangeDelPositioningMode::kForwardTraversal)) {
if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) {
// hit a delete with the same user key, stop right here
// iter_ is positioned after delete
iter_.Next();
@ -913,11 +888,7 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeValue:
case kTypeBlobIndex:
case kTypeWideColumnEntity:
if (range_del_agg_.ShouldDelete(
ikey, RangeDelPositioningMode::kBackwardTraversal)) {
last_key_entry_type = kTypeRangeDeletion;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else if (iter_.iter()->IsValuePinned()) {
if (iter_.iter()->IsValuePinned()) {
pinned_value_ = iter_.value();
} else {
valid_ = false;
@ -938,21 +909,12 @@ bool DBIter::FindValueForCurrentKey() {
last_not_merge_type = last_key_entry_type;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeMerge:
if (range_del_agg_.ShouldDelete(
ikey, RangeDelPositioningMode::kBackwardTraversal)) {
merge_context_.Clear();
last_key_entry_type = kTypeRangeDeletion;
last_not_merge_type = last_key_entry_type;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else {
assert(merge_operator_ != nullptr);
merge_context_.PushOperandBack(
iter_.value(),
iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
}
break;
case kTypeMerge: {
assert(merge_operator_ != nullptr);
merge_context_.PushOperandBack(
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} break;
default:
valid_ = false;
status_ = Status::Corruption(
@ -989,8 +951,7 @@ bool DBIter::FindValueForCurrentKey() {
}
if (timestamp_lb_ != nullptr) {
assert(last_key_entry_type == ikey_.type ||
last_key_entry_type == kTypeRangeDeletion);
assert(last_key_entry_type == ikey_.type);
}
Status s;
@ -1005,7 +966,6 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeDeletion:
case kTypeDeletionWithTimestamp:
case kTypeSingleDeletion:
case kTypeRangeDeletion:
if (timestamp_lb_ == nullptr) {
valid_ = false;
} else {
@ -1016,8 +976,7 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeMerge:
current_entry_is_merged_ = true;
if (last_not_merge_type == kTypeDeletion ||
last_not_merge_type == kTypeSingleDeletion ||
last_not_merge_type == kTypeRangeDeletion) {
last_not_merge_type == kTypeSingleDeletion) {
s = Merge(nullptr, saved_key_.GetUserKey());
if (!s.ok()) {
return false;
@ -1157,8 +1116,6 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
}
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
ikey, RangeDelPositioningMode::kBackwardTraversal) ||
kTypeDeletionWithTimestamp == ikey.type) {
if (timestamp_lb_ == nullptr) {
valid_ = false;
@ -1221,9 +1178,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
break;
}
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
ikey, RangeDelPositioningMode::kForwardTraversal)) {
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
break;
}
if (!iter_.PrepareValue()) {
@ -1498,7 +1453,6 @@ void DBIter::Seek(const Slice& target) {
SetSavedKeyToSeekTarget(target);
iter_.Seek(saved_key_.GetInternalKey());
range_del_agg_.InvalidateRangeDelMapPositions();
RecordTick(statistics_, NUMBER_DB_SEEK);
}
if (!iter_.Valid()) {
@ -1574,7 +1528,6 @@ void DBIter::SeekForPrev(const Slice& target) {
PERF_TIMER_GUARD(seek_internal_seek_time);
SetSavedKeyToSeekForPrevTarget(target);
iter_.SeekForPrev(saved_key_.GetInternalKey());
range_del_agg_.InvalidateRangeDelMapPositions();
RecordTick(statistics_, NUMBER_DB_SEEK);
}
if (!iter_.Valid()) {
@ -1622,6 +1575,8 @@ void DBIter::SeekToFirst() {
max_skip_ = std::numeric_limits<uint64_t>::max();
}
status_ = Status::OK();
// if iterator is empty, this status_ could be unchecked.
status_.PermitUncheckedError();
direction_ = kForward;
ReleaseTempPinnedData();
ResetBlobValue();
@ -1633,7 +1588,6 @@ void DBIter::SeekToFirst() {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_.SeekToFirst();
range_del_agg_.InvalidateRangeDelMapPositions();
}
RecordTick(statistics_, NUMBER_DB_SEEK);
@ -1692,6 +1646,8 @@ void DBIter::SeekToLast() {
max_skip_ = std::numeric_limits<uint64_t>::max();
}
status_ = Status::OK();
// if iterator is empty, this status_ could be unchecked.
status_.PermitUncheckedError();
direction_ = kReverse;
ReleaseTempPinnedData();
ResetBlobValue();
@ -1703,7 +1659,6 @@ void DBIter::SeekToLast() {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_.SeekToLast();
range_del_agg_.InvalidateRangeDelMapPositions();
}
PrevInternal(nullptr);
if (statistics_ != nullptr) {

@ -139,7 +139,6 @@ class DBIter final : public Iterator {
iter_.Set(iter);
iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
}
ReadRangeDelAggregator* GetRangeDelAggregator() { return &range_del_agg_; }
bool Valid() const override {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
@ -380,7 +379,6 @@ class DBIter final : public Iterator {
bool arena_mode_;
// List of operands for merge operator.
MergeContext merge_context_;
ReadRangeDelAggregator range_del_agg_;
LocalStatistics local_stats_;
PinnedIteratorsManager pinned_iters_mgr_;
#ifdef ROCKSDB_LITE

@ -4,6 +4,7 @@
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_test_util.h"
#include "db/version_set.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "test_util/testutil.h"
@ -1756,6 +1757,937 @@ TEST_F(DBRangeDelTest, IteratorRefresh) {
}
}
void VerifyIteratorReachesEnd(InternalIterator* iter) {
ASSERT_TRUE(!iter->Valid() && iter->status().ok());
}
void VerifyIteratorReachesEnd(Iterator* iter) {
ASSERT_TRUE(!iter->Valid() && iter->status().ok());
}
TEST_F(DBRangeDelTest, IteratorReseek) {
// Range tombstone triggers reseek (seeking to a range tombstone end key) in
// merging iterator. Test set up:
// one memtable: range tombstone [0, 1)
// one immutable memtable: range tombstone [1, 2)
// one L0 file with range tombstone [2, 3)
// one L1 file with range tombstone [3, 4)
// Seek(0) should trigger cascading reseeks at all levels below memtable.
// Seek(1) should trigger cascading reseeks at all levels below immutable
// memtable. SeekToFirst and SeekToLast trigger no reseek.
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
DestroyAndReopen(options);
// L1
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(3),
Key(4)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
// L0
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2),
Key(3)));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
// Immutable memtable
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(1),
Key(2)));
ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable());
std::string value;
ASSERT_TRUE(dbfull()->GetProperty(db_->DefaultColumnFamily(),
"rocksdb.num-immutable-mem-table", &value));
ASSERT_EQ(1, std::stoi(value));
// live memtable
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
Key(1)));
// this memtable is still active
ASSERT_TRUE(dbfull()->GetProperty(db_->DefaultColumnFamily(),
"rocksdb.num-immutable-mem-table", &value));
ASSERT_EQ(1, std::stoi(value));
auto iter = db_->NewIterator(ReadOptions());
get_perf_context()->Reset();
iter->Seek(Key(0));
// Reseeked immutable memtable, L0 and L1
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 3);
VerifyIteratorReachesEnd(iter);
get_perf_context()->Reset();
iter->SeekForPrev(Key(1));
// Reseeked L0 and L1
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 2);
VerifyIteratorReachesEnd(iter);
get_perf_context()->Reset();
iter->SeekToFirst();
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 0);
VerifyIteratorReachesEnd(iter);
iter->SeekToLast();
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 0);
VerifyIteratorReachesEnd(iter);
delete iter;
}
TEST_F(DBRangeDelTest, ReseekDuringNextAndPrev) {
// Range tombstone triggers reseek during Next()/Prev() in merging iterator.
// Test set up:
// memtable has: [0, 1) [2, 3)
// L0 has: 2
// L1 has: 1, 2, 3
// Seek(0) will reseek to 1 for L0 and L1. Seek(1) will not trigger any
// reseek. Then Next() determines 2 is covered by [2, 3), it will try to
// reseek to 3 for L0 and L1. Similar story for Prev() and SeekForPrev() is
// tested.
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
DestroyAndReopen(options);
// L1
ASSERT_OK(db_->Put(WriteOptions(), Key(1), "foo"));
ASSERT_OK(db_->Put(WriteOptions(), Key(2), "foo"));
ASSERT_OK(db_->Put(WriteOptions(), Key(3), "foo"));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
// L0
ASSERT_OK(db_->Put(WriteOptions(), Key(2), "foo"));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
// Memtable
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
Key(1)));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2),
Key(3)));
auto iter = db_->NewIterator(ReadOptions());
auto iter_test_forward = [&] {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), Key(1));
get_perf_context()->Reset();
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), Key(3));
// Reseeked L0 and L1
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 2);
// Next to Prev
get_perf_context()->Reset();
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), Key(1));
// Reseeked L0 and L1
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 2);
// Prev to Next
get_perf_context()->Reset();
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), Key(3));
// Reseeked L0 and L1
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 2);
iter->Next();
VerifyIteratorReachesEnd(iter);
};
get_perf_context()->Reset();
iter->Seek(Key(0));
// Reseeked L0 and L1
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 2);
iter_test_forward();
get_perf_context()->Reset();
iter->Seek(Key(1));
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 0);
iter_test_forward();
get_perf_context()->Reset();
iter->SeekForPrev(Key(2));
// Reseeked L0 and L1
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 2);
iter_test_forward();
get_perf_context()->Reset();
iter->SeekForPrev(Key(1));
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 0);
iter_test_forward();
get_perf_context()->Reset();
iter->SeekToFirst();
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 0);
iter_test_forward();
iter->SeekToLast();
iter->Prev();
iter_test_forward();
delete iter;
}
TEST_F(DBRangeDelTest, TombstoneFromCurrentLevel) {
// Range tombstone triggers reseek when covering key from the same level.
// in merging iterator. Test set up:
// memtable has: [0, 1)
// L0 has: [2, 3), 2
// L1 has: 1, 2, 3
// Seek(0) will reseek to 1 for L0 and L1.
// Then Next() will reseek to 3 for L1 since 2 in L0 is covered by [2, 3) in
// L0.
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
DestroyAndReopen(options);
// L1
ASSERT_OK(db_->Put(WriteOptions(), Key(1), "foo"));
ASSERT_OK(db_->Put(WriteOptions(), Key(2), "foo"));
ASSERT_OK(db_->Put(WriteOptions(), Key(3), "foo"));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
// L0
ASSERT_OK(db_->Put(WriteOptions(), Key(2), "foo"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2),
Key(3)));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
// Memtable
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
Key(1)));
auto iter = db_->NewIterator(ReadOptions());
get_perf_context()->Reset();
iter->Seek(Key(0));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), Key(1));
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 2);
get_perf_context()->Reset();
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), Key(3));
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 1);
delete iter;
}
TEST_F(DBRangeDelTest, TombstoneAcrossFileBoundary) {
// Verify that a range tombstone across file boundary covers keys from older
// levels. Test set up:
// L1_0: 1, 3, [2, 6) L1_1: 5, 7, [2, 6) ([2, 6) is from compaction with
// L1_0) L2 has: 5
// Seek(1) and then Next() should move the L1 level iterator to
// L1_1. Check if 5 is returned after Next().
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 2 * 1024;
options.max_compaction_bytes = 2 * 1024;
DestroyAndReopen(options);
Random rnd(301);
// L2
ASSERT_OK(db_->Put(WriteOptions(), Key(5), rnd.RandomString(1 << 10)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(2);
ASSERT_EQ(1, NumTableFilesAtLevel(2));
// L1_1
ASSERT_OK(db_->Put(WriteOptions(), Key(5), rnd.RandomString(1 << 10)));
ASSERT_OK(db_->Put(WriteOptions(), Key(7), rnd.RandomString(1 << 10)));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
// L1_0
ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(1 << 10)));
ASSERT_OK(db_->Put(WriteOptions(), Key(3), rnd.RandomString(1 << 10)));
// Prevent keys being compacted away
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2),
Key(6)));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(2, NumTableFilesAtLevel(0));
MoveFilesToLevel(1);
ASSERT_EQ(2, NumTableFilesAtLevel(1));
auto iter = db_->NewIterator(ReadOptions());
get_perf_context()->Reset();
iter->Seek(Key(1));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), Key(1));
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), Key(7));
// 1 reseek into L2 when key 5 in L2 is covered by [2, 6) from L1
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 1);
delete iter;
db_->ReleaseSnapshot(snapshot);
}
TEST_F(DBRangeDelTest, NonOverlappingTombstonAtBoundary) {
// Verify that a range tombstone across file boundary covers keys from older
// levels.
// Test set up:
// L1_0: 1, 3, [4, 7) L1_1: 6, 8, [4, 7)
// L2: 5
// Note that [4, 7) is at end of L1_0 and not overlapping with any point key
// in L1_0. [4, 7) from L1_0 should cover 5 is sentinel works
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 2 * 1024;
DestroyAndReopen(options);
Random rnd(301);
// L2
ASSERT_OK(db_->Put(WriteOptions(), Key(5), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(2);
ASSERT_EQ(1, NumTableFilesAtLevel(2));
// L1_1
ASSERT_OK(db_->Put(WriteOptions(), Key(6), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Put(WriteOptions(), Key(8), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
// L1_0
ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Put(WriteOptions(), Key(3), rnd.RandomString(4 << 10)));
// Prevent keys being compacted away
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(4),
Key(7)));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(2, NumTableFilesAtLevel(0));
MoveFilesToLevel(1);
ASSERT_EQ(2, NumTableFilesAtLevel(1));
auto iter = db_->NewIterator(ReadOptions());
iter->Seek(Key(3));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(3));
get_perf_context()->Reset();
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), Key(8));
// 1 reseek into L1 since 5 from L2 is covered by [4, 7) from L1
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 1);
for (auto& k : {4, 5, 6}) {
get_perf_context()->Reset();
iter->Seek(Key(k));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), Key(8));
// 1 reseek into L1
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count, 1);
}
delete iter;
}
TEST_F(DBRangeDelTest, OlderLevelHasNewerData) {
// L1_0: 1, 3, [2, 7) L1_1: 5, 6 at a newer sequence number than [2, 7)
// Compact L1_1 to L2. Seek(3) should not skip 5 or 6.
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
DestroyAndReopen(options);
Random rnd(301);
// L1_0
ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Put(WriteOptions(), Key(3), rnd.RandomString(4 << 10)));
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2),
Key(7)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
// L1_1
ASSERT_OK(db_->Put(WriteOptions(), Key(5), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Put(WriteOptions(), Key(6), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
MoveFilesToLevel(1);
ASSERT_EQ(2, NumTableFilesAtLevel(1));
auto key = Key(6);
Slice begin(key);
EXPECT_OK(dbfull()->TEST_CompactRange(1, &begin, nullptr));
ASSERT_EQ(1, NumTableFilesAtLevel(1));
ASSERT_EQ(1, NumTableFilesAtLevel(2));
auto iter = db_->NewIterator(ReadOptions());
iter->Seek(Key(3));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), Key(5));
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), Key(6));
delete iter;
db_->ReleaseSnapshot(snapshot);
}
TEST_F(DBRangeDelTest, LevelBoundaryDefinedByTombstone) {
// L1 has: 1, 2, [4, 5)
// L2 has: 4
// Seek(3), which is over all points keys in L1, check whether
// sentinel key from L1 works in this case.
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
DestroyAndReopen(options);
Random rnd(301);
// L2
ASSERT_OK(db_->Put(WriteOptions(), Key(4), "foo"));
ASSERT_OK(db_->Flush(FlushOptions()));
const Snapshot* snapshot = db_->GetSnapshot();
MoveFilesToLevel(2);
ASSERT_EQ(1, NumTableFilesAtLevel(2));
// L1_0
ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Put(WriteOptions(), Key(2), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(4),
Key(5)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
ASSERT_EQ(1, NumTableFilesAtLevel(2));
auto iter = db_->NewIterator(ReadOptions());
iter->Seek(Key(3));
ASSERT_TRUE(!iter->Valid());
ASSERT_OK(iter->status());
get_perf_context()->Reset();
iter->SeekForPrev(Key(5));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(2));
db_->ReleaseSnapshot(snapshot);
delete iter;
}
TEST_F(DBRangeDelTest, TombstoneOnlyFile) {
// L1_0: 1, 2, L1_1: [3, 5)
// L2: 3
// Seek(2) then Next() should advance L1 iterator into L1_1.
// If sentinel works with tombstone only file, it should cover the key in L2.
// Similar story for SeekForPrev(4).
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
DestroyAndReopen(options);
Random rnd(301);
// L2
ASSERT_OK(db_->Put(WriteOptions(), Key(3), "foo"));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(2);
ASSERT_EQ(1, NumTableFilesAtLevel(2));
// L1_0
ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Put(WriteOptions(), Key(2), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
ASSERT_EQ(1, NumTableFilesAtLevel(2));
// L1_1
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(3),
Key(5)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(2, NumTableFilesAtLevel(1));
ASSERT_EQ(1, NumTableFilesAtLevel(2));
auto iter = db_->NewIterator(ReadOptions());
iter->Seek(Key(2));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(2));
iter->Next();
VerifyIteratorReachesEnd(iter);
iter->SeekForPrev(Key(4));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(2));
iter->Next();
VerifyIteratorReachesEnd(iter);
delete iter;
}
void VerifyIteratorKey(InternalIterator* iter,
const std::vector<std::string>& expected_keys,
bool forward = true) {
for (auto& key : expected_keys) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->user_key(), key);
if (forward) {
iter->Next();
} else {
iter->Prev();
}
}
}
TEST_F(DBRangeDelTest, TombstoneOnlyLevel) {
// L1 [3, 5)
// L2 has: 3, 4
// Any kind of iterator seek should skip 3 and 4 in L2.
// L1 level iterator should produce sentinel key.
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
DestroyAndReopen(options);
// L2
ASSERT_OK(db_->Put(WriteOptions(), Key(3), "foo"));
ASSERT_OK(db_->Put(WriteOptions(), Key(4), "bar"));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(2);
ASSERT_EQ(1, NumTableFilesAtLevel(2));
// L1
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(3),
Key(5)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
auto iter = db_->NewIterator(ReadOptions());
get_perf_context()->Reset();
uint64_t expected_reseek = 0;
for (auto i = 0; i < 7; ++i) {
iter->Seek(Key(i));
VerifyIteratorReachesEnd(iter);
if (i < 5) {
++expected_reseek;
}
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count,
expected_reseek);
iter->SeekForPrev(Key(i));
VerifyIteratorReachesEnd(iter);
if (i > 2) {
++expected_reseek;
}
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count,
expected_reseek);
iter->SeekToFirst();
VerifyIteratorReachesEnd(iter);
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count,
++expected_reseek);
iter->SeekToLast();
VerifyIteratorReachesEnd(iter);
ASSERT_EQ(get_perf_context()->internal_range_del_reseek_count,
++expected_reseek);
}
delete iter;
// Check L1 LevelIterator behavior
ColumnFamilyData* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
->cfd();
SuperVersion* sv = cfd->GetSuperVersion();
Arena arena;
ReadOptions read_options;
MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), &arena,
false /* prefix seek */);
InternalIterator* level_iter = sv->current->TEST_GetLevelIterator(
read_options, &merge_iter_builder, 1 /* level */, true);
// This is needed to make LevelIterator range tombstone aware
merge_iter_builder.AddIterator(level_iter);
auto miter = merge_iter_builder.Finish();
auto k = Key(3);
IterKey target;
target.SetInternalKey(k, kMaxSequenceNumber, kValueTypeForSeek);
level_iter->Seek(target.GetInternalKey());
// sentinel key (file boundary as a fake key)
VerifyIteratorKey(level_iter, {Key(5)});
VerifyIteratorReachesEnd(level_iter);
k = Key(5);
target.SetInternalKey(k, 0, kValueTypeForSeekForPrev);
level_iter->SeekForPrev(target.GetInternalKey());
VerifyIteratorKey(level_iter, {Key(3)}, false);
VerifyIteratorReachesEnd(level_iter);
level_iter->SeekToFirst();
VerifyIteratorKey(level_iter, {Key(5)});
VerifyIteratorReachesEnd(level_iter);
level_iter->SeekToLast();
VerifyIteratorKey(level_iter, {Key(3)}, false);
VerifyIteratorReachesEnd(level_iter);
miter->~InternalIterator();
}
TEST_F(DBRangeDelTest, TombstoneOnlyWithOlderVisibleKey) {
// L1: [3, 5)
// L2: 2, 4, 5
// 2 and 5 should be visible
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
DestroyAndReopen(options);
// L2
ASSERT_OK(db_->Put(WriteOptions(), Key(2), "foo"));
ASSERT_OK(db_->Put(WriteOptions(), Key(4), "bar"));
ASSERT_OK(db_->Put(WriteOptions(), Key(5), "foobar"));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(2);
ASSERT_EQ(1, NumTableFilesAtLevel(2));
// l1
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(3),
Key(5)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
auto iter = db_->NewIterator(ReadOptions());
auto iter_test_backward = [&] {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(5));
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(2));
iter->Prev();
VerifyIteratorReachesEnd(iter);
};
auto iter_test_forward = [&] {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(2));
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(5));
iter->Next();
VerifyIteratorReachesEnd(iter);
};
iter->Seek(Key(4));
iter_test_backward();
iter->SeekForPrev(Key(4));
iter->Next();
iter_test_backward();
iter->Seek(Key(4));
iter->Prev();
iter_test_forward();
iter->SeekForPrev(Key(4));
iter_test_forward();
iter->SeekToFirst();
iter_test_forward();
iter->SeekToLast();
iter_test_backward();
delete iter;
}
TEST_F(DBRangeDelTest, TombstoneSentinelDirectionChange) {
// L1: 7
// L2: [4, 6)
// L3: 4
// Seek(5) will have sentinel key 6 at the top of minHeap in merging iterator.
// then do a prev, how would sentinel work?
// Redo the test after Put(5) into L1 so that there is a visible key in range
// [4, 6).
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
DestroyAndReopen(options);
// L3
ASSERT_OK(db_->Put(WriteOptions(), Key(4), "bar"));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(3);
ASSERT_EQ(1, NumTableFilesAtLevel(3));
// L2
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(4),
Key(6)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(2);
ASSERT_EQ(1, NumTableFilesAtLevel(2));
// L1
ASSERT_OK(db_->Put(WriteOptions(), Key(7), "foobar"));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
auto iter = db_->NewIterator(ReadOptions());
iter->Seek(Key(5));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(7));
iter->Prev();
ASSERT_TRUE(!iter->Valid() && iter->status().ok());
delete iter;
ASSERT_OK(db_->Put(WriteOptions(), Key(5), "foobar"));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(2, NumTableFilesAtLevel(1));
iter = db_->NewIterator(ReadOptions());
iter->Seek(Key(5));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(5));
iter->Prev();
ASSERT_TRUE(!iter->Valid() && iter->status().ok());
delete iter;
}
// Right sentinel tested in many test cases above
TEST_F(DBRangeDelTest, LeftSentinelKeyTest) {
// L1_0: 0, 1 L1_1: [2, 3), 5
// L2: 2
// SeekForPrev(4) should give 1 due to sentinel key keeping [2, 3) alive.
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
options.max_compaction_bytes = 1024;
DestroyAndReopen(options);
// L2
ASSERT_OK(db_->Put(WriteOptions(), Key(2), "foo"));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(2);
ASSERT_EQ(1, NumTableFilesAtLevel(2));
// L1_0
Random rnd(301);
ASSERT_OK(db_->Put(WriteOptions(), Key(0), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
// L1_1
ASSERT_OK(db_->Put(WriteOptions(), Key(5), "bar"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2),
Key(3)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(2, NumTableFilesAtLevel(1));
auto iter = db_->NewIterator(ReadOptions());
iter->SeekForPrev(Key(4));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(1));
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(0));
iter->Prev();
ASSERT_TRUE(!iter->Valid());
ASSERT_OK(iter->status());
delete iter;
}
TEST_F(DBRangeDelTest, LeftSentinelKeyTestWithNewerKey) {
// L1_0: 1, 2 newer than L1_1, L1_1: [2, 4), 5
// L2: 3
// SeekForPrev(4) then Prev() should give 2 and then 1.
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
options.max_compaction_bytes = 1024;
DestroyAndReopen(options);
// L2
ASSERT_OK(db_->Put(WriteOptions(), Key(3), "foo"));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(2);
ASSERT_EQ(1, NumTableFilesAtLevel(2));
// L1_1
ASSERT_OK(db_->Put(WriteOptions(), Key(5), "bar"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2),
Key(4)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
// L1_0
Random rnd(301);
ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Put(WriteOptions(), Key(2), rnd.RandomString(4 << 10)));
// Used to verify sequence number of iterator key later.
auto seq = dbfull()->TEST_GetLastVisibleSequence();
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(2, NumTableFilesAtLevel(1));
Arena arena;
InternalKeyComparator icmp(options.comparator);
ReadOptions read_options;
ScopedArenaIterator iter;
iter.set(
dbfull()->NewInternalIterator(read_options, &arena, kMaxSequenceNumber));
auto k = Key(4);
IterKey target;
target.SetInternalKey(k, 0 /* sequence_number */, kValueTypeForSeekForPrev);
iter->SeekForPrev(target.GetInternalKey());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->user_key(), Key(2));
SequenceNumber actual_seq;
ValueType type;
UnPackSequenceAndType(ExtractInternalKeyFooter(iter->key()), &actual_seq,
&type);
ASSERT_EQ(seq, actual_seq);
// might as well check type
ASSERT_EQ(type, kTypeValue);
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->user_key(), Key(1));
iter->Prev();
ASSERT_TRUE(!iter->Valid());
ASSERT_OK(iter->status());
}
TEST_F(DBRangeDelTest, SentinelKeyCommonCaseTest) {
// L1 has 3 files
// L1_0: 1, 2 L1_1: [3, 4) 5, 6, [7, 8) L1_2: 9
// Check iterator operations on LevelIterator.
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024;
DestroyAndReopen(options);
Random rnd(301);
// L1_0
ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Put(WriteOptions(), Key(2), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
// L1_1
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(3),
Key(4)));
ASSERT_OK(db_->Put(WriteOptions(), Key(5), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Put(WriteOptions(), Key(6), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(7),
Key(8)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(2, NumTableFilesAtLevel(1));
// L1_2
ASSERT_OK(db_->Put(WriteOptions(), Key(9), rnd.RandomString(4 << 10)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(3, NumTableFilesAtLevel(1));
ColumnFamilyData* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
->cfd();
SuperVersion* sv = cfd->GetSuperVersion();
Arena arena;
ReadOptions read_options;
MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), &arena,
false /* prefix seek */);
InternalIterator* level_iter = sv->current->TEST_GetLevelIterator(
read_options, &merge_iter_builder, 1 /* level */, true);
// This is needed to make LevelIterator range tombstone aware
auto miter = merge_iter_builder.Finish();
auto k = Key(7);
IterKey target;
target.SetInternalKey(k, kMaxSequenceNumber, kValueTypeForSeek);
level_iter->Seek(target.GetInternalKey());
// The last Key(9) is a sentinel key.
VerifyIteratorKey(level_iter, {Key(8), Key(9), Key(9)});
ASSERT_TRUE(!level_iter->Valid() && level_iter->status().ok());
k = Key(6);
target.SetInternalKey(k, kMaxSequenceNumber, kValueTypeForSeek);
level_iter->Seek(target.GetInternalKey());
VerifyIteratorKey(level_iter, {Key(6), Key(8), Key(9), Key(9)});
ASSERT_TRUE(!level_iter->Valid() && level_iter->status().ok());
k = Key(4);
target.SetInternalKey(k, 0, kValueTypeForSeekForPrev);
level_iter->SeekForPrev(target.GetInternalKey());
VerifyIteratorKey(level_iter, {Key(3), Key(2), Key(1), Key(1)}, false);
ASSERT_TRUE(!level_iter->Valid() && level_iter->status().ok());
k = Key(5);
target.SetInternalKey(k, 0, kValueTypeForSeekForPrev);
level_iter->SeekForPrev(target.GetInternalKey());
VerifyIteratorKey(level_iter, {Key(5), Key(3), Key(2), Key(1), Key(1)},
false);
level_iter->SeekToFirst();
VerifyIteratorKey(level_iter, {Key(1), Key(2), Key(2), Key(5), Key(6), Key(8),
Key(9), Key(9)});
ASSERT_TRUE(!level_iter->Valid() && level_iter->status().ok());
level_iter->SeekToLast();
VerifyIteratorKey(
level_iter,
{Key(9), Key(9), Key(6), Key(5), Key(3), Key(2), Key(1), Key(1)}, false);
ASSERT_TRUE(!level_iter->Valid() && level_iter->status().ok());
miter->~InternalIterator();
}
TEST_F(DBRangeDelTest, PrefixSentinelKey) {
// L1: ['aaaa', 'aaad'), 'bbbb'
// L2: 'aaac', 'aaae'
// Prefix extracts first 3 chars
// Seek('aaab') should give 'aaae' as first key.
// This is to test a previous bug where prefix seek sees there is no prefix in
// the SST file, and will just set file iter to null in LevelIterator and may
// just skip to the next SST file. But in this case, we should keep the file's
// tombstone alive.
Options options = CurrentOptions();
options.compression = kNoCompression;
options.disable_auto_compactions = true;
options.prefix_extractor.reset(NewFixedPrefixTransform(3));
BlockBasedTableOptions table_options;
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
table_options.whole_key_filtering = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DestroyAndReopen(options);
Random rnd(301);
// L2:
ASSERT_OK(db_->Put(WriteOptions(), "aaac", rnd.RandomString(10)));
ASSERT_OK(db_->Put(WriteOptions(), "aaae", rnd.RandomString(10)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(2);
ASSERT_EQ(1, NumTableFilesAtLevel(2));
// L1
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "aaaa",
"aaad"));
ASSERT_OK(db_->Put(WriteOptions(), "bbbb", rnd.RandomString(10)));
ASSERT_OK(db_->Flush(FlushOptions()));
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
auto iter = db_->NewIterator(ReadOptions());
iter->Seek("aaab");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), "aaae");
delete iter;
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

@ -967,15 +967,13 @@ std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
Arena arena;
auto options = CurrentOptions();
InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
ReadOptions read_options;
ScopedArenaIterator iter;
if (cf == 0) {
iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
iter.set(dbfull()->NewInternalIterator(read_options, &arena,
kMaxSequenceNumber));
} else {
iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
iter.set(dbfull()->NewInternalIterator(read_options, &arena,
kMaxSequenceNumber, handles_[cf]));
}
InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
@ -1431,17 +1429,13 @@ void DBTestBase::validateNumberOfEntries(int numValues, int cf) {
Arena arena;
auto options = CurrentOptions();
InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
// This should be defined after range_del_agg so that it destructs the
// assigned iterator before it range_del_agg is already destructed.
ReadOptions read_options;
ScopedArenaIterator iter;
if (cf != 0) {
iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
iter.set(dbfull()->NewInternalIterator(read_options, &arena,
kMaxSequenceNumber, handles_[cf]));
} else {
iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
iter.set(dbfull()->NewInternalIterator(read_options, &arena,
kMaxSequenceNumber));
}
iter->SeekToFirst();
@ -1646,11 +1640,9 @@ void DBTestBase::VerifyDBInternal(
std::vector<std::pair<std::string, std::string>> true_data) {
Arena arena;
InternalKeyComparator icmp(last_options_.comparator);
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
ReadOptions read_options;
auto iter = dbfull()->NewInternalIterator(read_options, &arena,
&range_del_agg, kMaxSequenceNumber);
auto iter =
dbfull()->NewInternalIterator(read_options, &arena, kMaxSequenceNumber);
iter->SeekToFirst();
for (auto p : true_data) {
ASSERT_TRUE(iter->Valid());

@ -139,7 +139,8 @@ inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) {
// Pack a sequence number and a ValueType into a uint64_t
inline uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
assert(seq <= kMaxSequenceNumber);
assert(IsExtendedValueType(t));
// kTypeMaxValid is used in TruncatedRangeDelIterator, see its constructor.
assert(IsExtendedValueType(t) || t == kTypeMaxValid);
return (seq << 8) | t;
}

@ -210,6 +210,30 @@ Status MemTableListVersion::AddRangeTombstoneIterators(
return Status::OK();
}
Status MemTableListVersion::AddRangeTombstoneIterators(
const ReadOptions& read_opts, Arena* /*arena*/,
MergeIteratorBuilder& builder) {
// Except for snapshot read, using kMaxSequenceNumber is OK because these
// are immutable memtables.
SequenceNumber read_seq = read_opts.snapshot != nullptr
? read_opts.snapshot->GetSequenceNumber()
: kMaxSequenceNumber;
for (auto& m : memlist_) {
auto range_del_iter = m->NewRangeTombstoneIterator(
read_opts, read_seq, true /* immutale_memtable */);
if (range_del_iter == nullptr || range_del_iter->empty()) {
delete range_del_iter;
builder.AddRangeTombstoneIterator(nullptr);
} else {
builder.AddRangeTombstoneIterator(new TruncatedRangeDelIterator(
std::unique_ptr<FragmentedRangeTombstoneIterator>(range_del_iter),
&m->GetInternalKeyComparator(), nullptr /* smallest */,
nullptr /* largest */));
}
}
return Status::OK();
}
void MemTableListVersion::AddIterators(
const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
Arena* arena) {

@ -111,6 +111,9 @@ class MemTableListVersion {
Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
RangeDelAggregator* range_del_agg);
Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
MergeIteratorBuilder& builder);
void AddIterators(const ReadOptions& options,
std::vector<InternalIterator*>* iterator_list,
Arena* arena);

@ -37,7 +37,6 @@ TruncatedRangeDelIterator::TruncatedRangeDelIterator(
false /* log_err_key */); // TODO
pik_status.PermitUncheckedError();
assert(pik_status.ok());
smallest_ = &parsed_smallest;
}
if (largest != nullptr) {
@ -69,12 +68,16 @@ TruncatedRangeDelIterator::TruncatedRangeDelIterator(
// the truncated end key can cover the largest key in this sstable, reduce
// its sequence number by 1.
parsed_largest.sequence -= 1;
// This line is not needed for correctness, but it ensures that the
// truncated end key is not covering keys from the next SST file.
parsed_largest.type = kValueTypeForSeek;
}
largest_ = &parsed_largest;
}
}
bool TruncatedRangeDelIterator::Valid() const {
assert(iter_ != nullptr);
return iter_->Valid() &&
(smallest_ == nullptr ||
icmp_->Compare(*smallest_, iter_->parsed_end_key()) < 0) &&

@ -46,6 +46,7 @@ class TruncatedRangeDelIterator {
// Seeks to the tombstone with the highest visible sequence number that covers
// target (a user key). If no such tombstone exists, the position will be at
// the earliest tombstone that ends after target.
// REQUIRES: target is a user key.
void Seek(const Slice& target);
// Seeks to the tombstone with the highest visible sequence number that covers

@ -76,8 +76,9 @@ ParsedInternalKey UncutEndpoint(const Slice& s) {
return ParsedInternalKey(s, kMaxSequenceNumber, kTypeRangeDeletion);
}
ParsedInternalKey InternalValue(const Slice& key, SequenceNumber seq) {
return ParsedInternalKey(key, seq, kTypeValue);
ParsedInternalKey InternalValue(const Slice& key, SequenceNumber seq,
ValueType type = kTypeValue) {
return ParsedInternalKey(key, seq, type);
}
void VerifyIterator(
@ -292,16 +293,18 @@ TEST_F(RangeDelAggregatorTest, TruncatedIterPartiallyCutTombstones) {
TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp,
&smallest, &largest);
VerifyIterator(&iter, bytewise_icmp,
{{InternalValue("d", 7), UncutEndpoint("e"), 10},
{UncutEndpoint("e"), UncutEndpoint("g"), 8},
{UncutEndpoint("j"), InternalValue("m", 8), 4}});
VerifyIterator(
&iter, bytewise_icmp,
{{InternalValue("d", 7), UncutEndpoint("e"), 10},
{UncutEndpoint("e"), UncutEndpoint("g"), 8},
{UncutEndpoint("j"), InternalValue("m", 8, kValueTypeForSeek), 4}});
VerifySeek(
&iter, bytewise_icmp,
{{"d", InternalValue("d", 7), UncutEndpoint("e"), 10},
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"ia", UncutEndpoint("j"), InternalValue("m", 8), 4},
{"ia", UncutEndpoint("j"), InternalValue("m", 8, kValueTypeForSeek), 4,
false /* invalid */},
{"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */},
{"", InternalValue("d", 7), UncutEndpoint("e"), 10}});
@ -310,7 +313,8 @@ TEST_F(RangeDelAggregatorTest, TruncatedIterPartiallyCutTombstones) {
{{"d", InternalValue("d", 7), UncutEndpoint("e"), 10},
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8},
{"n", UncutEndpoint("j"), InternalValue("m", 8), 4},
{"n", UncutEndpoint("j"), InternalValue("m", 8, kValueTypeForSeek), 4,
false /* invalid */},
{"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}});
}

@ -236,7 +236,8 @@ InternalIterator* TableCache::NewIterator(
TableReaderCaller caller, Arena* arena, bool skip_filters, int level,
size_t max_file_size_for_l0_meta_pin,
const InternalKey* smallest_compaction_key,
const InternalKey* largest_compaction_key, bool allow_unprepared_value) {
const InternalKey* largest_compaction_key, bool allow_unprepared_value,
TruncatedRangeDelIterator** range_del_iter) {
PERF_TIMER_GUARD(new_table_iterator_nanos);
Status s;
@ -281,25 +282,40 @@ InternalIterator* TableCache::NewIterator(
*table_reader_ptr = table_reader;
}
}
if (s.ok() && range_del_agg != nullptr && !options.ignore_range_deletions) {
if (range_del_agg->AddFile(fd.GetNumber())) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
static_cast<FragmentedRangeTombstoneIterator*>(
table_reader->NewRangeTombstoneIterator(options)));
if (range_del_iter != nullptr) {
s = range_del_iter->status();
if (s.ok() && !options.ignore_range_deletions) {
if (range_del_iter != nullptr) {
auto new_range_del_iter =
table_reader->NewRangeTombstoneIterator(options);
if (new_range_del_iter == nullptr || new_range_del_iter->empty()) {
delete new_range_del_iter;
*range_del_iter = nullptr;
} else {
*range_del_iter = new TruncatedRangeDelIterator(
std::unique_ptr<FragmentedRangeTombstoneIterator>(
new_range_del_iter),
&icomparator, &file_meta.smallest, &file_meta.largest);
}
if (s.ok()) {
const InternalKey* smallest = &file_meta.smallest;
const InternalKey* largest = &file_meta.largest;
if (smallest_compaction_key != nullptr) {
smallest = smallest_compaction_key;
}
if (range_del_agg != nullptr) {
if (range_del_agg->AddFile(fd.GetNumber())) {
std::unique_ptr<FragmentedRangeTombstoneIterator> new_range_del_iter(
static_cast<FragmentedRangeTombstoneIterator*>(
table_reader->NewRangeTombstoneIterator(options)));
if (new_range_del_iter != nullptr) {
s = new_range_del_iter->status();
}
if (largest_compaction_key != nullptr) {
largest = largest_compaction_key;
if (s.ok()) {
const InternalKey* smallest = &file_meta.smallest;
const InternalKey* largest = &file_meta.largest;
if (smallest_compaction_key != nullptr) {
smallest = smallest_compaction_key;
}
if (largest_compaction_key != nullptr) {
largest = largest_compaction_key;
}
range_del_agg->AddTombstones(std::move(new_range_del_iter), smallest,
largest);
}
range_del_agg->AddTombstones(std::move(range_del_iter), smallest,
largest);
}
}
}

@ -63,6 +63,11 @@ class TableCache {
// the returned iterator. The returned "*table_reader_ptr" object is owned
// by the cache and should not be deleted, and is valid for as long as the
// returned iterator is live.
// If !options.ignore_range_deletions, and range_del_iter is non-nullptr,
// then range_del_iter is set to a TruncatedRangeDelIterator for range
// tombstones in the SST file corresponding to the specified file number. The
// upper/lower bounds for the TruncatedRangeDelIterator are set to the SST
// file's boundary.
// @param options Must outlive the returned iterator.
// @param range_del_agg If non-nullptr, adds range deletions to the
// aggregator. If an error occurs, returns it in a NewErrorInternalIterator
@ -79,7 +84,8 @@ class TableCache {
TableReaderCaller caller, Arena* arena, bool skip_filters, int level,
size_t max_file_size_for_l0_meta_pin,
const InternalKey* smallest_compaction_key,
const InternalKey* largest_compaction_key, bool allow_unprepared_value);
const InternalKey* largest_compaction_key, bool allow_unprepared_value,
TruncatedRangeDelIterator** range_del_iter = nullptr);
// If a seek to internal key "k" in specified file finds an entry,
// call get_context->SaveValue() repeatedly until

@ -950,7 +950,8 @@ class LevelIterator final : public InternalIterator {
RangeDelAggregator* range_del_agg,
const std::vector<AtomicCompactionUnitBoundary>*
compaction_boundaries = nullptr,
bool allow_unprepared_value = false)
bool allow_unprepared_value = false,
MergeIteratorBuilder* merge_iter_builder = nullptr)
: table_cache_(table_cache),
read_options_(read_options),
file_options_(file_options),
@ -968,13 +969,25 @@ class LevelIterator final : public InternalIterator {
range_del_agg_(range_del_agg),
pinned_iters_mgr_(nullptr),
compaction_boundaries_(compaction_boundaries),
is_next_read_sequential_(false) {
is_next_read_sequential_(false),
range_tombstone_iter_(nullptr),
to_return_sentinel_(false) {
// Empty level is not supported.
assert(flevel_ != nullptr && flevel_->num_files > 0);
if (merge_iter_builder && !read_options.ignore_range_deletions) {
// lazily initialize range_tombstone_iter_ together with file_iter_
merge_iter_builder->AddRangeTombstoneIterator(nullptr,
&range_tombstone_iter_);
}
}
~LevelIterator() override { delete file_iter_.Set(nullptr); }
// Seek to the first file with a key >= target.
// If range_tombstone_iter_ is not nullptr, then we pretend that file
// boundaries are fake keys (sentinel keys). These keys are used to keep range
// tombstones alive even when all point keys in an SST file are exhausted.
// These sentinel keys will be skipped in merging iterator.
void Seek(const Slice& target) override;
void SeekForPrev(const Slice& target) override;
void SeekToFirst() override;
@ -983,14 +996,29 @@ class LevelIterator final : public InternalIterator {
bool NextAndGetResult(IterateResult* result) override;
void Prev() override;
bool Valid() const override { return file_iter_.Valid(); }
// In addition to valid and invalid state (!file_iter.Valid() and
// status.ok()), a third state of the iterator is when !file_iter_.Valid() and
// to_return_sentinel_. This means we are at the end of a file, and a sentinel
// key (the file boundary that we pretend as a key) is to be returned next.
// file_iter_.Valid() and to_return_sentinel_ should not both be true.
bool Valid() const override {
assert(!(file_iter_.Valid() && to_return_sentinel_));
return file_iter_.Valid() || to_return_sentinel_;
}
Slice key() const override {
assert(Valid());
if (to_return_sentinel_) {
// Sentinel should be returned after file_iter_ reaches the end of the
// file
assert(!file_iter_.Valid());
return sentinel_;
}
return file_iter_.key();
}
Slice value() const override {
assert(Valid());
assert(!to_return_sentinel_);
return file_iter_.value();
}
@ -1032,6 +1060,8 @@ class LevelIterator final : public InternalIterator {
file_iter_.iter() && file_iter_.IsValuePinned();
}
bool IsDeleteRangeSentinelKey() const override { return to_return_sentinel_; }
private:
// Return true if at least one invalid file is seen and skipped.
bool SkipEmptyFileForward();
@ -1044,6 +1074,11 @@ class LevelIterator final : public InternalIterator {
return flevel_->files[file_index].smallest_key;
}
const Slice& file_largest_key(size_t file_index) {
assert(file_index < flevel_->num_files);
return flevel_->files[file_index].largest_key;
}
bool KeyReachedUpperBound(const Slice& internal_key) {
return read_options_.iterate_upper_bound != nullptr &&
user_comparator_.CompareWithoutTimestamp(
@ -1051,6 +1086,16 @@ class LevelIterator final : public InternalIterator {
*read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0;
}
void ClearRangeTombstoneIter() {
if (range_tombstone_iter_ && *range_tombstone_iter_) {
delete *range_tombstone_iter_;
*range_tombstone_iter_ = nullptr;
}
}
// Move file_iter_ to the file at file_index_.
// range_tombstone_iter_ is updated with a range tombstone iterator
// into the new file. Old range tombstone iterator is cleared.
InternalIterator* NewFileIterator() {
assert(file_index_ < flevel_->num_files);
auto file_meta = flevel_->files[file_index_];
@ -1065,13 +1110,14 @@ class LevelIterator final : public InternalIterator {
largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
}
CheckMayBeOutOfLowerBound();
ClearRangeTombstoneIter();
return table_cache_->NewIterator(
read_options_, file_options_, icomparator_, *file_meta.file_metadata,
range_del_agg_, prefix_extractor_,
nullptr /* don't need reference to table */, file_read_hist_, caller_,
/*arena=*/nullptr, skip_filters_, level_,
/*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key,
largest_compaction_key, allow_unprepared_value_);
largest_compaction_key, allow_unprepared_value_, range_tombstone_iter_);
}
// Check if current file being fully within iterate_lower_bound.
@ -1117,9 +1163,51 @@ class LevelIterator final : public InternalIterator {
const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
bool is_next_read_sequential_;
// This is set when this level iterator is used under a merging iterator
// that processes range tombstones. range_tombstone_iter_ points to where the
// merging iterator stores the range tombstones iterator for this level. When
// this level iterator moves to a new SST file, it updates the range
// tombstones accordingly through this pointer. So the merging iterator always
// has access to the current SST file's range tombstones.
//
// The level iterator treats file boundary as fake keys (sentinel keys) to
// keep range tombstones alive if needed and make upper level, i.e. merging
// iterator, aware of file changes (when level iterator moves to a new SST
// file, there is some bookkeeping work that needs to be done at merging
// iterator end).
//
// *range_tombstone_iter_ points to range tombstones of the current SST file
TruncatedRangeDelIterator** range_tombstone_iter_;
// Whether next/prev key is a sentinel key.
bool to_return_sentinel_ = false;
// The sentinel key to be returned
Slice sentinel_;
// Sets flags for if we should return the sentinel key next.
// The condition for returning sentinel is reaching the end of current
// file_iter_: !Valid() && status.().ok().
void TrySetDeleteRangeSentinel(const Slice& boundary_key);
void ClearSentinel() { to_return_sentinel_ = false; }
// Set in Seek() when a prefix seek reaches end of the current file,
// and the next file has a different prefix. SkipEmptyFileForward()
// will not move to next file when this flag is set.
bool prefix_exhausted_ = false;
};
void LevelIterator::TrySetDeleteRangeSentinel(const Slice& boundary_key) {
assert(range_tombstone_iter_);
if (file_iter_.iter() != nullptr && !file_iter_.Valid() &&
file_iter_.status().ok()) {
to_return_sentinel_ = true;
sentinel_ = boundary_key;
}
}
void LevelIterator::Seek(const Slice& target) {
prefix_exhausted_ = false;
ClearSentinel();
// Check whether the seek key fall under the same file
bool need_to_reseek = true;
if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
@ -1148,44 +1236,82 @@ void LevelIterator::Seek(const Slice& target) {
if (file_iter_.status() == Status::TryAgain()) {
return;
}
}
if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
!read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
file_iter_.iter() != nullptr && file_iter_.Valid()) {
// We've skipped the file we initially positioned to. In the prefix
// seek case, it is likely that the file is skipped because of
// prefix bloom or hash, where more keys are skipped. We then check
// the current key and invalidate the iterator if the prefix is
// already passed.
// When doing prefix iterator seek, when keys for one prefix have
// been exhausted, it can jump to any key that is larger. Here we are
// enforcing a stricter contract than that, in order to make it easier for
// higher layers (merging and DB iterator) to reason the correctness:
// 1. Within the prefix, the result should be accurate.
// 2. If keys for the prefix is exhausted, it is either positioned to the
// next key after the prefix, or make the iterator invalid.
// A side benefit will be that it invalidates the iterator earlier so that
// the upper level merging iterator can merge fewer child iterators.
size_t ts_sz = user_comparator_.timestamp_size();
Slice target_user_key_without_ts =
ExtractUserKeyAndStripTimestamp(target, ts_sz);
Slice file_user_key_without_ts =
ExtractUserKeyAndStripTimestamp(file_iter_.key(), ts_sz);
if (prefix_extractor_->InDomain(target_user_key_without_ts) &&
(!prefix_extractor_->InDomain(file_user_key_without_ts) ||
user_comparator_.CompareWithoutTimestamp(
prefix_extractor_->Transform(target_user_key_without_ts), false,
prefix_extractor_->Transform(file_user_key_without_ts),
false) != 0)) {
SetFileIterator(nullptr);
if (!file_iter_.Valid() && file_iter_.status().ok() &&
prefix_extractor_ != nullptr && !read_options_.total_order_seek &&
!read_options_.auto_prefix_mode &&
file_index_ < flevel_->num_files - 1) {
size_t ts_sz = user_comparator_.timestamp_size();
Slice target_user_key_without_ts =
ExtractUserKeyAndStripTimestamp(target, ts_sz);
Slice next_file_first_user_key_without_ts =
ExtractUserKeyAndStripTimestamp(file_smallest_key(file_index_ + 1),
ts_sz);
if (prefix_extractor_->InDomain(target_user_key_without_ts) &&
(!prefix_extractor_->InDomain(next_file_first_user_key_without_ts) ||
user_comparator_.CompareWithoutTimestamp(
prefix_extractor_->Transform(target_user_key_without_ts), false,
prefix_extractor_->Transform(
next_file_first_user_key_without_ts),
false) != 0)) {
// SkipEmptyFileForward() will not advance to next file when this flag
// is set for reason detailed below.
//
// The file we initially positioned to has no keys under the target
// prefix, and the next file's smallest key has a different prefix than
// target. When doing prefix iterator seek, when keys for one prefix
// have been exhausted, it can jump to any key that is larger. Here we
// are enforcing a stricter contract than that, in order to make it
// easier for higher layers (merging and DB iterator) to reason the
// correctness:
// 1. Within the prefix, the result should be accurate.
// 2. If keys for the prefix is exhausted, it is either positioned to
// the next key after the prefix, or make the iterator invalid.
// A side benefit will be that it invalidates the iterator earlier so
// that the upper level merging iterator can merge fewer child
// iterators.
//
// The flag is cleared in Seek*() calls. There is no need to clear the
// flag in Prev() since Prev() will not be called when the flag is set
// for reasons explained below. If range_tombstone_iter_ is nullptr,
// then there is no file boundary sentinel key. Since
// !file_iter_.Valid() from the if condition above, this level iterator
// is !Valid(), so Prev() will not be called. If range_tombstone_iter_
// is not nullptr, there are two cases depending on if this level
// iterator reaches top of the heap in merging iterator (the upper
// layer).
// If so, merging iterator will see the sentinel key, call
// NextAndGetResult() and the call to NextAndGetResult() will skip the
// sentinel key and makes this level iterator invalid. If not, then it
// could be because the upper layer is done before any method of this
// level iterator is called or another Seek*() call is invoked. Either
// way, Prev() is never called before Seek*().
// The flag should not be cleared at the beginning of
// Next/NextAndGetResult() since it is used in SkipEmptyFileForward()
// called in Next/NextAndGetResult().
prefix_exhausted_ = true;
}
}
if (range_tombstone_iter_) {
TrySetDeleteRangeSentinel(file_largest_key(file_index_));
}
}
SkipEmptyFileForward();
CheckMayBeOutOfLowerBound();
}
void LevelIterator::SeekForPrev(const Slice& target) {
prefix_exhausted_ = false;
ClearSentinel();
size_t new_file_index = FindFile(icomparator_, *flevel_, target);
// Seek beyond this level's smallest key
if (new_file_index == 0 &&
icomparator_.Compare(target, file_smallest_key(0)) < 0) {
SetFileIterator(nullptr);
ClearRangeTombstoneIter();
CheckMayBeOutOfLowerBound();
return;
}
if (new_file_index >= flevel_->num_files) {
new_file_index = flevel_->num_files - 1;
}
@ -1193,24 +1319,47 @@ void LevelIterator::SeekForPrev(const Slice& target) {
InitFileIterator(new_file_index);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekForPrev(target);
if (range_tombstone_iter_ &&
icomparator_.Compare(target, file_smallest_key(file_index_)) >= 0) {
// In SeekForPrev() case, it is possible that the target is less than
// file's lower boundary since largest key is used to determine file index
// (FindFile()). When target is less than file's lower boundary, sentinel
// key should not be set so that SeekForPrev() does not result in a key
// larger than target. This is correct in that there is no need to keep
// the range tombstones in this file alive as they only cover keys
// starting from the file's lower boundary, which is after `target`.
TrySetDeleteRangeSentinel(file_smallest_key(file_index_));
}
SkipEmptyFileBackward();
}
CheckMayBeOutOfLowerBound();
}
void LevelIterator::SeekToFirst() {
prefix_exhausted_ = false;
ClearSentinel();
InitFileIterator(0);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekToFirst();
if (range_tombstone_iter_) {
// We do this in SeekToFirst() and SeekToLast() since
// we could have an empty file with only range tombstones.
TrySetDeleteRangeSentinel(file_largest_key(file_index_));
}
}
SkipEmptyFileForward();
CheckMayBeOutOfLowerBound();
}
void LevelIterator::SeekToLast() {
prefix_exhausted_ = false;
ClearSentinel();
InitFileIterator(flevel_->num_files - 1);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekToLast();
if (range_tombstone_iter_) {
TrySetDeleteRangeSentinel(file_smallest_key(file_index_));
}
}
SkipEmptyFileBackward();
CheckMayBeOutOfLowerBound();
@ -1218,25 +1367,47 @@ void LevelIterator::SeekToLast() {
void LevelIterator::Next() {
assert(Valid());
file_iter_.Next();
if (to_return_sentinel_) {
// file_iter_ is at EOF already when to_return_sentinel_
ClearSentinel();
} else {
file_iter_.Next();
if (range_tombstone_iter_) {
TrySetDeleteRangeSentinel(file_largest_key(file_index_));
}
}
SkipEmptyFileForward();
}
bool LevelIterator::NextAndGetResult(IterateResult* result) {
assert(Valid());
bool is_valid = file_iter_.NextAndGetResult(result);
// file_iter_ is at EOF already when to_return_sentinel_
bool is_valid = !to_return_sentinel_ && file_iter_.NextAndGetResult(result);
if (!is_valid) {
if (to_return_sentinel_) {
ClearSentinel();
} else if (range_tombstone_iter_) {
TrySetDeleteRangeSentinel(file_largest_key(file_index_));
}
is_next_read_sequential_ = true;
SkipEmptyFileForward();
is_next_read_sequential_ = false;
is_valid = Valid();
if (is_valid) {
result->key = key();
result->bound_check_result = file_iter_.UpperBoundCheckResult();
// Ideally, we should return the real file_iter_.value_prepared but the
// information is not here. It would casue an extra PrepareValue()
// for the first key of a file.
result->value_prepared = !allow_unprepared_value_;
// This could be set in TrySetDeleteRangeSentinel() or
// SkipEmptyFileForward() above.
if (to_return_sentinel_) {
result->key = sentinel_;
result->bound_check_result = IterBoundCheck::kUnknown;
result->value_prepared = true;
} else {
result->key = key();
result->bound_check_result = file_iter_.UpperBoundCheckResult();
// Ideally, we should return the real file_iter_.value_prepared but the
// information is not here. It would casue an extra PrepareValue()
// for the first key of a file.
result->value_prepared = !allow_unprepared_value_;
}
}
}
return is_valid;
@ -1244,47 +1415,81 @@ bool LevelIterator::NextAndGetResult(IterateResult* result) {
void LevelIterator::Prev() {
assert(Valid());
file_iter_.Prev();
if (to_return_sentinel_) {
ClearSentinel();
} else {
file_iter_.Prev();
if (range_tombstone_iter_) {
TrySetDeleteRangeSentinel(file_smallest_key(file_index_));
}
}
SkipEmptyFileBackward();
}
bool LevelIterator::SkipEmptyFileForward() {
bool seen_empty_file = false;
while (file_iter_.iter() == nullptr ||
(!file_iter_.Valid() && file_iter_.status().ok() &&
file_iter_.iter()->UpperBoundCheckResult() !=
IterBoundCheck::kOutOfBound)) {
// Pause at sentinel key
while (!to_return_sentinel_ &&
(file_iter_.iter() == nullptr ||
(!file_iter_.Valid() && file_iter_.status().ok() &&
file_iter_.iter()->UpperBoundCheckResult() !=
IterBoundCheck::kOutOfBound))) {
seen_empty_file = true;
// Move to next file
if (file_index_ >= flevel_->num_files - 1) {
// Already at the last file
SetFileIterator(nullptr);
break;
}
if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
if (file_index_ >= flevel_->num_files - 1 ||
KeyReachedUpperBound(file_smallest_key(file_index_ + 1)) ||
prefix_exhausted_) {
SetFileIterator(nullptr);
ClearRangeTombstoneIter();
break;
}
// may init a new *range_tombstone_iter
InitFileIterator(file_index_ + 1);
// We moved to a new SST file
// Seek range_tombstone_iter_ to reset its !Valid() default state.
// We do not need to call range_tombstone_iter_.Seek* in
// LevelIterator::Seek* since when the merging iterator calls
// LevelIterator::Seek*, it should also call Seek* into the corresponding
// range tombstone iterator.
if (file_iter_.iter() != nullptr) {
file_iter_.SeekToFirst();
if (range_tombstone_iter_) {
if (*range_tombstone_iter_) {
(*range_tombstone_iter_)->SeekToFirst();
}
TrySetDeleteRangeSentinel(file_largest_key(file_index_));
}
}
}
return seen_empty_file;
}
void LevelIterator::SkipEmptyFileBackward() {
while (file_iter_.iter() == nullptr ||
(!file_iter_.Valid() && file_iter_.status().ok())) {
// Pause at sentinel key
while (!to_return_sentinel_ &&
(file_iter_.iter() == nullptr ||
(!file_iter_.Valid() && file_iter_.status().ok()))) {
// Move to previous file
if (file_index_ == 0) {
// Already the first file
SetFileIterator(nullptr);
ClearRangeTombstoneIter();
return;
}
InitFileIterator(file_index_ - 1);
// We moved to a new SST file
// Seek range_tombstone_iter_ to reset its !Valid() default state.
if (file_iter_.iter() != nullptr) {
file_iter_.SeekToLast();
if (range_tombstone_iter_) {
if (*range_tombstone_iter_) {
(*range_tombstone_iter_)->SeekToLast();
}
TrySetDeleteRangeSentinel(file_smallest_key(file_index_));
if (to_return_sentinel_) {
break;
}
}
}
}
}
@ -1312,6 +1517,7 @@ void LevelIterator::InitFileIterator(size_t new_file_index) {
if (new_file_index >= flevel_->num_files) {
file_index_ = new_file_index;
SetFileIterator(nullptr);
ClearRangeTombstoneIter();
return;
} else {
// If the file iterator shows incomplete, we try it again if users seek
@ -1661,6 +1867,21 @@ Status Version::VerifySstUniqueIds() const {
return Status::OK();
}
InternalIterator* Version::TEST_GetLevelIterator(
const ReadOptions& read_options, MergeIteratorBuilder* merge_iter_builder,
int level, bool allow_unprepared_value) {
auto* arena = merge_iter_builder->GetArena();
auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
return new (mem) LevelIterator(
cfd_->table_cache(), read_options, file_options_,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_.prefix_extractor, should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
nullptr /* range_del_agg */, nullptr /* compaction_boundaries */,
allow_unprepared_value, merge_iter_builder);
}
uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
// Estimation will be inaccurate when:
// (1) there exist merge keys
@ -1711,22 +1932,19 @@ double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
void Version::AddIterators(const ReadOptions& read_options,
const FileOptions& soptions,
MergeIteratorBuilder* merge_iter_builder,
RangeDelAggregator* range_del_agg,
bool allow_unprepared_value) {
assert(storage_info_.finalized_);
for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
range_del_agg, allow_unprepared_value);
allow_unprepared_value);
}
}
void Version::AddIteratorsForLevel(const ReadOptions& read_options,
const FileOptions& soptions,
MergeIteratorBuilder* merge_iter_builder,
int level,
RangeDelAggregator* range_del_agg,
bool allow_unprepared_value) {
int level, bool allow_unprepared_value) {
assert(storage_info_.finalized_);
if (level >= storage_info_.num_non_empty_levels()) {
// This is an empty level
@ -1741,17 +1959,21 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
auto* arena = merge_iter_builder->GetArena();
if (level == 0) {
// Merge all level zero files together since they may overlap
TruncatedRangeDelIterator* iter = nullptr;
for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
const auto& file = storage_info_.LevelFilesBrief(0).files[i];
merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(),
*file.file_metadata, range_del_agg,
*file.file_metadata, /*range_del_agg=*/nullptr,
mutable_cf_options_.prefix_extractor, nullptr,
cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, arena,
/*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value));
/*largest_compaction_key=*/nullptr, allow_unprepared_value, &iter));
if (!read_options.ignore_range_deletions) {
merge_iter_builder->AddRangeTombstoneIterator(iter);
}
}
if (should_sample) {
// Count ones for every L0 files. This is done per iterator creation
@ -1773,8 +1995,8 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
mutable_cf_options_.prefix_extractor, should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
range_del_agg,
/*compaction_boundaries=*/nullptr, allow_unprepared_value));
/*range_del_agg=*/nullptr, /*compaction_boundaries=*/nullptr,
allow_unprepared_value, merge_iter_builder));
}
}

@ -803,7 +803,6 @@ class Version {
void AddIterators(const ReadOptions& read_options,
const FileOptions& soptions,
MergeIteratorBuilder* merger_iter_builder,
RangeDelAggregator* range_del_agg,
bool allow_unprepared_value);
// @param read_options Must outlive any iterator built by
@ -811,8 +810,7 @@ class Version {
void AddIteratorsForLevel(const ReadOptions& read_options,
const FileOptions& soptions,
MergeIteratorBuilder* merger_iter_builder,
int level, RangeDelAggregator* range_del_agg,
bool allow_unprepared_value);
int level, bool allow_unprepared_value);
Status OverlapWithLevelIterator(const ReadOptions&, const FileOptions&,
const Slice& smallest_user_key,
@ -963,6 +961,10 @@ class Version {
Status VerifySstUniqueIds() const;
InternalIterator* TEST_GetLevelIterator(
const ReadOptions& read_options, MergeIteratorBuilder* merge_iter_builder,
int level, bool allow_unprepared_value);
private:
Env* env_;
SystemClock* clock_;

@ -1715,7 +1715,8 @@ enum {
rocksdb_blob_read_time,
rocksdb_blob_checksum_time,
rocksdb_blob_decompress_time,
rocksdb_total_metric_count = 77
rocksdb_internal_range_del_reseek_count,
rocksdb_total_metric_count = 78
};
extern ROCKSDB_LIBRARY_API void rocksdb_set_perf_level(int);

@ -124,6 +124,10 @@ struct PerfContext {
// How many values were fed into merge operator by iterators.
//
uint64_t internal_merge_count;
// Number of times we reseeked inside a merging iterator, specifically to skip
// after or before a range of keys covered by a range deletion in a newer LSM
// component.
uint64_t internal_range_del_reseek_count;
uint64_t get_snapshot_time; // total nanos spent on getting snapshot
uint64_t get_from_memtable_time; // total nanos spent on querying memtables

@ -59,6 +59,7 @@ PerfContext::PerfContext(const PerfContext& other) {
internal_delete_skipped_count = other.internal_delete_skipped_count;
internal_recent_skipped_count = other.internal_recent_skipped_count;
internal_merge_count = other.internal_merge_count;
internal_range_del_reseek_count = other.internal_range_del_reseek_count;
write_wal_time = other.write_wal_time;
get_snapshot_time = other.get_snapshot_time;
get_from_memtable_time = other.get_from_memtable_time;
@ -166,6 +167,7 @@ PerfContext::PerfContext(PerfContext&& other) noexcept {
internal_delete_skipped_count = other.internal_delete_skipped_count;
internal_recent_skipped_count = other.internal_recent_skipped_count;
internal_merge_count = other.internal_merge_count;
internal_range_del_reseek_count = other.internal_range_del_reseek_count;
write_wal_time = other.write_wal_time;
get_snapshot_time = other.get_snapshot_time;
get_from_memtable_time = other.get_from_memtable_time;
@ -275,6 +277,7 @@ PerfContext& PerfContext::operator=(const PerfContext& other) {
internal_delete_skipped_count = other.internal_delete_skipped_count;
internal_recent_skipped_count = other.internal_recent_skipped_count;
internal_merge_count = other.internal_merge_count;
internal_range_del_reseek_count = other.internal_range_del_reseek_count;
write_wal_time = other.write_wal_time;
get_snapshot_time = other.get_snapshot_time;
get_from_memtable_time = other.get_from_memtable_time;
@ -381,6 +384,7 @@ void PerfContext::Reset() {
internal_delete_skipped_count = 0;
internal_recent_skipped_count = 0;
internal_merge_count = 0;
internal_range_del_reseek_count = 0;
write_wal_time = 0;
get_snapshot_time = 0;
@ -509,6 +513,7 @@ std::string PerfContext::ToString(bool exclude_zero_counters) const {
PERF_CONTEXT_OUTPUT(internal_delete_skipped_count);
PERF_CONTEXT_OUTPUT(internal_recent_skipped_count);
PERF_CONTEXT_OUTPUT(internal_merge_count);
PERF_CONTEXT_OUTPUT(internal_range_del_reseek_count);
PERF_CONTEXT_OUTPUT(write_wal_time);
PERF_CONTEXT_OUTPUT(get_snapshot_time);
PERF_CONTEXT_OUTPUT(get_from_memtable_time);

@ -186,6 +186,13 @@ class InternalIteratorBase : public Cleanable {
// Default implementation is no-op and its implemented by iterators.
virtual void SetReadaheadState(ReadaheadFileInfo* /*readahead_file_info*/) {}
// When used under merging iterator, LevelIterator treats file boundaries
// as sentinel keys to prevent it from moving to next SST file before range
// tombstones in the current SST file are no longer needed. This method makes
// it cheap to check if the current key is a sentinel key. This should only be
// used by MergingIterator and LevelIterator for now.
virtual bool IsDeleteRangeSentinelKey() const { return false; }
protected:
void SeekForPrevImpl(const Slice& target, const CompareInterface* cmp) {
Seek(target);

@ -162,6 +162,10 @@ class IteratorWrapperBase {
}
}
bool IsDeleteRangeSentinelKey() const {
return iter_->IsDeleteRangeSentinelKey();
}
private:
void Update() {
valid_ = iter_->Valid();

File diff suppressed because it is too large Load Diff

@ -9,12 +9,14 @@
#pragma once
#include "db/range_del_aggregator.h"
#include "rocksdb/slice.h"
#include "rocksdb/types.h"
namespace ROCKSDB_NAMESPACE {
class Arena;
class ArenaWrappedDBIter;
class InternalKeyComparator;
template <class TValue>
@ -47,18 +49,37 @@ class MergeIteratorBuilder {
// Add iter to the merging iterator.
void AddIterator(InternalIterator* iter);
// Add a range tombstone iterator to underlying merge iterator.
// See MergingIterator::AddRangeTombstoneIterator() for more detail.
//
// If `iter_ptr` is not nullptr, *iter_ptr will be set to where the merging
// iterator stores `iter` when MergeIteratorBuilder::Finish() is called. This
// is used by level iterator to update range tombstone iters when switching to
// a different SST file.
void AddRangeTombstoneIterator(
TruncatedRangeDelIterator* iter,
TruncatedRangeDelIterator*** iter_ptr = nullptr);
// Get arena used to build the merging iterator. It is called one a child
// iterator needs to be allocated.
Arena* GetArena() { return arena; }
// Return the result merging iterator.
InternalIterator* Finish();
// If db_iter is not nullptr, then db_iter->SetMemtableRangetombstoneIter()
// will be called with pointer to where the merging iterator
// stores the memtable range tombstone iterator.
// This is used for DB iterator to refresh memtable range tombstones.
InternalIterator* Finish(ArenaWrappedDBIter* db_iter = nullptr);
private:
MergingIterator* merge_iter;
InternalIterator* first_iter;
bool use_merging_iter;
Arena* arena;
// Used to set LevelIterator.range_tombstone_iter_.
// See AddRangeTombstoneIterator() implementation for more detail.
std::vector<std::pair<size_t, TruncatedRangeDelIterator***>>
range_del_iter_ptrs_;
};
} // namespace ROCKSDB_NAMESPACE

@ -78,11 +78,9 @@ Status GetAllKeyVersions(DB* db, ColumnFamilyHandle* cfh, Slice begin_key,
DBImpl* idb = static_cast<DBImpl*>(db->GetRootDB());
auto icmp = InternalKeyComparator(idb->GetOptions(cfh).comparator);
ReadOptions read_options;
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
Arena arena;
ScopedArenaIterator iter(idb->NewInternalIterator(
read_options, &arena, &range_del_agg, kMaxSequenceNumber, cfh));
ScopedArenaIterator iter(
idb->NewInternalIterator(read_options, &arena, kMaxSequenceNumber, cfh));
if (!begin_key.empty()) {
InternalKey ikey;

Loading…
Cancel
Save