# Patch summary: add an inclusive lower bound for iterators.
#
#  * include/rocksdb/options.h, options/options.cc: new field
#    `ReadOptions::iterate_lower_bound`, initialised to nullptr by both
#    ReadOptions constructors.
#  * db/db_iter.cc: DBIter keeps the pointer in `iterate_lower_bound_`.
#    PrevInternal() sets valid_ = false once the saved user key compares below
#    the bound; SeekToFirst() forwards to Seek(*iterate_lower_bound_) when a
#    bound is set.
#  * db/db_iter_test.cc: SeekToFirstLowerBound and PrevLowerBound tests.
#
# NOTE(review): Seek()/SeekForPrev() are not touched by these hunks, so whether
# a Seek() to a target below the bound is clamped cannot be told from this
# patch -- confirm before relying on it.
# NOTE(review): line breaks and indentation were reconstructed from a
# whitespace-collapsed copy (hunk line counts were used as the check); the
# template arguments `<uint64_t>` and `<Iterator>` were restored because the
# copy had lost them. `<Iterator>` is assumed from
# `class DBIter final: public Iterator` -- confirm against NewDBIterator().
diff --git a/HISTORY.md b/HISTORY.md
index e7a4fb361..582cba0ff 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -10,6 +10,7 @@
 * Introduce `EventListener::OnStallConditionsChanged()` callback. Users can implement it to be notified when user writes are stalled, stopped, or resumed.
 * Add a new db property "rocksdb.estimate-oldest-key-time" to return oldest data timestamp. The property is available only for FIFO compaction with compaction_options_fifo.allow_compaction = false.
 * Upon snapshot release, recompact bottommost files containing deleted/overwritten keys that previously could not be dropped due to the snapshot. This alleviates space-amp caused by long-held snapshots.
+* Support lower bound on iterators specified via `ReadOptions::iterate_lower_bound`.
 
 ### Bug Fixes
 * Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery.
diff --git a/db/db_iter.cc b/db/db_iter.cc
index 59d47bb08..1288e19f0 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -115,6 +115,7 @@ class DBIter final: public Iterator {
         valid_(false),
         current_entry_is_merged_(false),
         statistics_(cf_options.statistics),
+        iterate_lower_bound_(read_options.iterate_lower_bound),
         iterate_upper_bound_(read_options.iterate_upper_bound),
         prefix_same_as_start_(read_options.prefix_same_as_start),
         pin_thru_lifetime_(read_options.pin_data),
@@ -285,6 +286,7 @@ class DBIter final: public Iterator {
   uint64_t max_skip_;
   uint64_t max_skippable_internal_keys_;
   uint64_t num_internal_keys_skipped_;
+  const Slice* iterate_lower_bound_;
   const Slice* iterate_upper_bound_;
   IterKey prefix_start_buf_;
   Slice prefix_start_key_;
@@ -723,6 +725,14 @@ void DBIter::PrevInternal() {
       return;
     }
 
+    if (iterate_lower_bound_ != nullptr &&
+        user_comparator_->Compare(saved_key_.GetUserKey(),
+                                  *iterate_lower_bound_) < 0) {
+      // We've iterated earlier than the user-specified lower bound.
+      valid_ = false;
+      return;
+    }
+
     if (FindValueForCurrentKey()) {
       if (!iter_->Valid()) {
         return;
@@ -1154,6 +1164,10 @@ void DBIter::SeekToFirst() {
   if (prefix_extractor_ != nullptr) {
     max_skip_ = std::numeric_limits<uint64_t>::max();
   }
+  if (iterate_lower_bound_ != nullptr) {
+    Seek(*iterate_lower_bound_);
+    return;
+  }
   direction_ = kForward;
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc
index ebe3b9653..eb19a8f0d 100644
--- a/db/db_iter_test.cc
+++ b/db/db_iter_test.cc
@@ -2849,6 +2849,74 @@ TEST_F(DBIteratorTest, SeekPrefixTombstones) {
   ASSERT_EQ(skipped_keys, 0);
 }
 
+TEST_F(DBIteratorTest, SeekToFirstLowerBound) {
+  const int kNumKeys = 3;
+  for (int i = 0; i < kNumKeys + 2; ++i) {
+    // + 2 for two special cases: lower bound before and lower bound after the
+    // internal iterator's keys
+    TestIterator* internal_iter = new TestIterator(BytewiseComparator());
+    for (int j = 1; j <= kNumKeys; ++j) {
+      internal_iter->AddPut(std::to_string(j), "val");
+    }
+    internal_iter->Finish();
+
+    ReadOptions ro;
+    auto lower_bound_str = std::to_string(i);
+    Slice lower_bound(lower_bound_str);
+    ro.iterate_lower_bound = &lower_bound;
+    Options options;
+    std::unique_ptr<Iterator> db_iter(NewDBIterator(
+        env_, ro, ImmutableCFOptions(options), BytewiseComparator(),
+        internal_iter, 10 /* sequence */,
+        options.max_sequential_skip_in_iterations, nullptr /* read_callback */));
+
+    db_iter->SeekToFirst();
+    if (i == kNumKeys + 1) {
+      // lower bound was beyond the last key
+      ASSERT_FALSE(db_iter->Valid());
+    } else {
+      ASSERT_TRUE(db_iter->Valid());
+      int expected;
+      if (i == 0) {
+        // lower bound was before the first key
+        expected = 1;
+      } else {
+        // lower bound was at the ith key
+        expected = i;
+      }
+      ASSERT_EQ(std::to_string(expected), db_iter->key().ToString());
+    }
+  }
+}
+
+TEST_F(DBIteratorTest, PrevLowerBound) {
+  const int kNumKeys = 3;
+  const int kLowerBound = 2;
+  TestIterator* internal_iter = new TestIterator(BytewiseComparator());
+  for (int j = 1; j <= kNumKeys; ++j) {
+    internal_iter->AddPut(std::to_string(j), "val");
+  }
+  internal_iter->Finish();
+
+  ReadOptions ro;
+  auto lower_bound_str = std::to_string(kLowerBound);
+  Slice lower_bound(lower_bound_str);
+  ro.iterate_lower_bound = &lower_bound;
+  Options options;
+  std::unique_ptr<Iterator> db_iter(NewDBIterator(
+      env_, ro, ImmutableCFOptions(options), BytewiseComparator(),
+      internal_iter, 10 /* sequence */,
+      options.max_sequential_skip_in_iterations, nullptr /* read_callback */));
+
+  db_iter->SeekToLast();
+  for (int i = kNumKeys; i >= kLowerBound; --i) {
+    ASSERT_TRUE(db_iter->Valid());
+    ASSERT_EQ(std::to_string(i), db_iter->key().ToString());
+    db_iter->Prev();
+  }
+  ASSERT_FALSE(db_iter->Valid());
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 3ec83a65c..dfc960bd3 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -971,14 +971,24 @@ struct ReadOptions {
   // Default: nullptr
   const Snapshot* snapshot;
 
+  // `iterate_lower_bound` defines the smallest key at which the backward
+  // iterator can return an entry. Once the bound is passed, Valid() will be
+  // false. `iterate_lower_bound` is inclusive ie the bound value is a valid
+  // entry.
+  //
+  // If prefix_extractor is not null, the Seek target and `iterate_lower_bound`
+  // need to have the same prefix. This is because ordering is not guaranteed
+  // outside of prefix domain.
+  //
+  // Default: nullptr
+  const Slice* iterate_lower_bound;
+
   // "iterate_upper_bound" defines the extent upto which the forward iterator
   // can returns entries. Once the bound is reached, Valid() will be false.
   // "iterate_upper_bound" is exclusive ie the bound value is
   // not a valid entry.  If iterator_extractor is not null, the Seek target
   // and iterator_upper_bound need to have the same prefix.
   // This is because ordering is not guaranteed outside of prefix domain.
-  // There is no lower bound on the iterator. If needed, that can be easily
-  // implemented.
   //
   // Default: nullptr
   const Slice* iterate_upper_bound;
diff --git a/options/options.cc b/options/options.cc
index f3db8ccb8..6761a7b9f 100644
--- a/options/options.cc
+++ b/options/options.cc
@@ -514,6 +514,7 @@ DBOptions* DBOptions::IncreaseParallelism(int total_threads) {
 
 ReadOptions::ReadOptions()
     : snapshot(nullptr),
+      iterate_lower_bound(nullptr),
       iterate_upper_bound(nullptr),
       readahead_size(0),
       max_skippable_internal_keys(0),
@@ -530,6 +531,7 @@ ReadOptions::ReadOptions()
 
 ReadOptions::ReadOptions(bool cksum, bool cache)
     : snapshot(nullptr),
+      iterate_lower_bound(nullptr),
       iterate_upper_bound(nullptr),
       readahead_size(0),
       max_skippable_internal_keys(0),