implement lower bound for iterators

Summary:
- for `SeekToFirst()`, just convert it to a regular `Seek()` if a lower bound is specified
- for operations that iterate backwards over user keys (`SeekForPrev`, `SeekToLast`, `Prev`), change `PrevInternal` to check whether the user key has gone below the lower bound every time the user key changes -- the same approach we use to ensure we stay within a prefix when `prefix_same_as_start=true`. (A usage sketch follows below.)
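To make the new behavior concrete, here is a minimal usage sketch against the public API. It is not part of this diff; the DB path `/tmp/lower_bound_demo` and the keys "a"/"b"/"c" are made up for illustration. It mirrors the `PrevLowerBound` test below: a backward scan returns keys down to and including the bound, then `Valid()` turns false.

```cpp
#include <iostream>
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"

int main() {
  rocksdb::DB* raw_db;
  rocksdb::Options options;
  options.create_if_missing = true;
  // Scratch path, chosen arbitrarily for this sketch.
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/lower_bound_demo", &raw_db);
  if (!s.ok()) return 1;
  std::unique_ptr<rocksdb::DB> db(raw_db);

  db->Put(rocksdb::WriteOptions(), "a", "1");
  db->Put(rocksdb::WriteOptions(), "b", "2");
  db->Put(rocksdb::WriteOptions(), "c", "3");

  // The bound Slice must outlive the iterator that uses it.
  std::string lower = "b";
  rocksdb::Slice lower_bound(lower);
  rocksdb::ReadOptions ro;
  ro.iterate_lower_bound = &lower_bound;

  // Backward scan: returns "c" then "b" (the bound is inclusive), then
  // Valid() becomes false instead of stepping back to "a".
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
  for (it->SeekToLast(); it->Valid(); it->Prev()) {
    std::cout << it->key().ToString() << "\n";
  }
  return 0;
}
```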
Closes https://github.com/facebook/rocksdb/pull/3074

Differential Revision: D6158654

Pulled By: ajkr

fbshipit-source-id: cb0e3a922e2650d2cd4d1c6e1c0f1e8b729ff518
Branch: main
Author: Andrew Kryczka (committed by Facebook Github Bot)
Parent: 5a2a6483dc
Commit: 95667383db
5 changed files:
- HISTORY.md (1)
- db/db_iter.cc (14)
- db/db_iter_test.cc (68)
- include/rocksdb/options.h (14)
- options/options.cc (2)

HISTORY.md
@@ -10,6 +10,7 @@
* Introduce `EventListener::OnStallConditionsChanged()` callback. Users can implement it to be notified when user writes are stalled, stopped, or resumed.
* Add a new db property "rocksdb.estimate-oldest-key-time" to return oldest data timestamp. The property is available only for FIFO compaction with compaction_options_fifo.allow_compaction = false.
* Upon snapshot release, recompact bottommost files containing deleted/overwritten keys that previously could not be dropped due to the snapshot. This alleviates space-amp caused by long-held snapshots.
* Support lower bound on iterators specified via `ReadOptions::iterate_lower_bound`.
### Bug Fixes
* Fix a potential data inconsistency issue during point-in-time recovery. `DB::Open()` will abort if column family inconsistency is found during PIT recovery.

db/db_iter.cc
@@ -115,6 +115,7 @@ class DBIter final: public Iterator {
        valid_(false),
        current_entry_is_merged_(false),
        statistics_(cf_options.statistics),
        iterate_lower_bound_(read_options.iterate_lower_bound),
        iterate_upper_bound_(read_options.iterate_upper_bound),
        prefix_same_as_start_(read_options.prefix_same_as_start),
        pin_thru_lifetime_(read_options.pin_data),
@@ -285,6 +286,7 @@ class DBIter final: public Iterator {
  uint64_t max_skip_;
  uint64_t max_skippable_internal_keys_;
  uint64_t num_internal_keys_skipped_;
  const Slice* iterate_lower_bound_;
  const Slice* iterate_upper_bound_;
  IterKey prefix_start_buf_;
  Slice prefix_start_key_;
@@ -723,6 +725,14 @@ void DBIter::PrevInternal() {
      return;
    }

    if (iterate_lower_bound_ != nullptr &&
        user_comparator_->Compare(saved_key_.GetUserKey(),
                                  *iterate_lower_bound_) < 0) {
      // We've iterated earlier than the user-specified lower bound.
      valid_ = false;
      return;
    }

    if (FindValueForCurrentKey()) {
      if (!iter_->Valid()) {
        return;
@@ -1154,6 +1164,10 @@ void DBIter::SeekToFirst() {
  if (prefix_extractor_ != nullptr) {
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  if (iterate_lower_bound_ != nullptr) {
    Seek(*iterate_lower_bound_);
    return;
  }
  direction_ = kForward;
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
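As a quick illustration of the `SeekToFirst()` forwarding above: once a lower bound is set, `SeekToFirst()` lands on the same entry as an explicit `Seek()` to the bound. The sketch below is mine, not part of the patch; the helper name and the assumption that the DB holds keys "1" through "3" are made up for the example.

```cpp
#include <cassert>
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"

// Hypothetical helper: assumes `db` contains keys "1", "2", "3".
void CheckSeekToFirstForwarding(rocksdb::DB* db) {
  std::string lower = "2";
  rocksdb::Slice lower_bound(lower);
  rocksdb::ReadOptions ro;
  ro.iterate_lower_bound = &lower_bound;

  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));

  it->SeekToFirst();  // forwarded internally to Seek("2") by this patch
  assert(it->Valid());
  const std::string via_seek_to_first = it->key().ToString();

  it->Seek(lower_bound);  // explicit seek to the bound
  assert(it->Valid());
  assert(it->key().ToString() == via_seek_to_first);  // both land on "2"
}
```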

db/db_iter_test.cc
@@ -2849,6 +2849,74 @@ TEST_F(DBIteratorTest, SeekPrefixTombstones) {
  ASSERT_EQ(skipped_keys, 0);
}

TEST_F(DBIteratorTest, SeekToFirstLowerBound) {
  const int kNumKeys = 3;
  for (int i = 0; i < kNumKeys + 2; ++i) {
    // + 2 for two special cases: lower bound before and lower bound after the
    // internal iterator's keys
    TestIterator* internal_iter = new TestIterator(BytewiseComparator());
    for (int j = 1; j <= kNumKeys; ++j) {
      internal_iter->AddPut(std::to_string(j), "val");
    }
    internal_iter->Finish();

    ReadOptions ro;
    auto lower_bound_str = std::to_string(i);
    Slice lower_bound(lower_bound_str);
    ro.iterate_lower_bound = &lower_bound;
    Options options;
    std::unique_ptr<Iterator> db_iter(NewDBIterator(
        env_, ro, ImmutableCFOptions(options), BytewiseComparator(),
        internal_iter, 10 /* sequence */,
        options.max_sequential_skip_in_iterations, nullptr /* read_callback */));

    db_iter->SeekToFirst();
    if (i == kNumKeys + 1) {
      // lower bound was beyond the last key
      ASSERT_FALSE(db_iter->Valid());
    } else {
      ASSERT_TRUE(db_iter->Valid());
      int expected;
      if (i == 0) {
        // lower bound was before the first key
        expected = 1;
      } else {
        // lower bound was at the ith key
        expected = i;
      }
      ASSERT_EQ(std::to_string(expected), db_iter->key().ToString());
    }
  }
}
TEST_F(DBIteratorTest, PrevLowerBound) {
  const int kNumKeys = 3;
  const int kLowerBound = 2;
  TestIterator* internal_iter = new TestIterator(BytewiseComparator());
  for (int j = 1; j <= kNumKeys; ++j) {
    internal_iter->AddPut(std::to_string(j), "val");
  }
  internal_iter->Finish();

  ReadOptions ro;
  auto lower_bound_str = std::to_string(kLowerBound);
  Slice lower_bound(lower_bound_str);
  ro.iterate_lower_bound = &lower_bound;
  Options options;
  std::unique_ptr<Iterator> db_iter(NewDBIterator(
      env_, ro, ImmutableCFOptions(options), BytewiseComparator(),
      internal_iter, 10 /* sequence */,
      options.max_sequential_skip_in_iterations, nullptr /* read_callback */));

  db_iter->SeekToLast();
  for (int i = kNumKeys; i >= kLowerBound; --i) {
    ASSERT_TRUE(db_iter->Valid());
    ASSERT_EQ(std::to_string(i), db_iter->key().ToString());
    db_iter->Prev();
  }
  ASSERT_FALSE(db_iter->Valid());
}

}  // namespace rocksdb

int main(int argc, char** argv) {

include/rocksdb/options.h
@@ -971,14 +971,24 @@ struct ReadOptions {
  // Default: nullptr
  const Snapshot* snapshot;

  // `iterate_lower_bound` defines the smallest key at which the backward
  // iterator can return an entry. Once the bound is passed, Valid() will be
  // false. `iterate_lower_bound` is inclusive, i.e. the bound value is a valid
  // entry.
  //
  // If prefix_extractor is not null, the Seek target and `iterate_lower_bound`
  // need to have the same prefix. This is because ordering is not guaranteed
  // outside of prefix domain.
  //
  // Default: nullptr
  const Slice* iterate_lower_bound;
// "iterate_upper_bound" defines the extent upto which the forward iterator
// can returns entries. Once the bound is reached, Valid() will be false.
// "iterate_upper_bound" is exclusive ie the bound value is
// not a valid entry. If iterator_extractor is not null, the Seek target
// and iterator_upper_bound need to have the same prefix.
// This is because ordering is not guaranteed outside of prefix domain.
// There is no lower bound on the iterator. If needed, that can be easily
// implemented.
//
// Default: nullptr
const Slice* iterate_upper_bound;
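Putting the two comments together, a bounded forward scan covers the half-open range [lower, upper): the lower bound is inclusive, the upper bound exclusive. A small sketch of my own (the helper name and the assumed keys "a" through "e" are illustrative, not from this diff):

```cpp
#include <iostream>
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"

// Hypothetical helper: assumes `db` contains keys "a" through "e".
void ScanHalfOpenRange(rocksdb::DB* db) {
  std::string lower = "b", upper = "d";
  rocksdb::Slice lower_bound(lower), upper_bound(upper);

  rocksdb::ReadOptions ro;
  ro.iterate_lower_bound = &lower_bound;  // inclusive: "b" is returned
  ro.iterate_upper_bound = &upper_bound;  // exclusive: stops before "d"

  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    std::cout << it->key().ToString() << "\n";  // prints "b", then "c"
  }
}
```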

options/options.cc
@@ -514,6 +514,7 @@ DBOptions* DBOptions::IncreaseParallelism(int total_threads) {
ReadOptions::ReadOptions()
    : snapshot(nullptr),
      iterate_lower_bound(nullptr),
      iterate_upper_bound(nullptr),
      readahead_size(0),
      max_skippable_internal_keys(0),
@@ -530,6 +531,7 @@ ReadOptions::ReadOptions()
ReadOptions::ReadOptions(bool cksum, bool cache)
    : snapshot(nullptr),
      iterate_lower_bound(nullptr),
      iterate_upper_bound(nullptr),
      readahead_size(0),
      max_skippable_internal_keys(0),
