From b6cfda12837006c535c85f5256ae4a6e2878b75a Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 28 Jun 2022 19:51:05 -0700 Subject: [PATCH] Support `iter_start_ts` for backward iteration (#10200) Summary: Resolves https://github.com/facebook/rocksdb/issues/9761 With this PR, applications can create an iterator with the following ```cpp ReadOptions read_opts; read_opts.timestamp = &ts_ub; read_opts.iter_start_ts = &ts_lb; auto* it = db->NewIterator(read_opts); it->SeekToLast(); // or it->SeekForPrev("foo"); it->Prev(); ... ``` The application can access different versions of the same user key via `key()`, `value()`, and `timestamp()`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10200 Test Plan: make check Reviewed By: ltamasi Differential Revision: D37258074 Pulled By: riversand963 fbshipit-source-id: 3f0b866ade50dcff7ef60d506397a9dd6ec91565 --- HISTORY.md | 1 + db/db_iter.cc | 105 ++++++++++++++--- db/db_iter.h | 5 + db/db_with_timestamp_basic_test.cc | 177 +++++++++++++++++++++++++++++ 4 files changed, 274 insertions(+), 14 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 7fb65369b..f49b38985 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### New Features * Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`. +* Support backward iteration when `ReadOptions::iter_start_ts` is set. ### Public API changes * Add metadata related structs and functions in C API, including diff --git a/db/db_iter.cc b/db/db_iter.cc index b68a3bcb2..1ffc82319 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -812,6 +812,10 @@ bool DBIter::FindValueForCurrentKey() { ValueType last_not_merge_type = kTypeDeletion; ValueType last_key_entry_type = kTypeDeletion; + // If false, it indicates that we have not seen any valid entry, even though + // last_key_entry_type is initialized to kTypeDeletion. 
+ bool valid_entry_seen = false; + // Temporarily pin blocks that hold (merge operands / the value) ReleaseTempPinnedData(); TempPinData(); @@ -822,20 +826,33 @@ bool DBIter::FindValueForCurrentKey() { return false; } + if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key, + saved_key_.GetUserKey())) { + // Found a smaller user key, thus we are done with current user key. + break; + } + assert(ikey.user_key.size() >= timestamp_size_); Slice ts; if (timestamp_size_ > 0) { ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_, timestamp_size_); } - if (!IsVisible(ikey.sequence, ts) || - !user_comparator_.EqualWithoutTimestamp(ikey.user_key, - saved_key_.GetUserKey())) { + + bool visible = IsVisible(ikey.sequence, ts); + if (!visible && + (timestamp_lb_ == nullptr || + user_comparator_.CompareTimestamp(ts, *timestamp_ub_) > 0)) { + // Found an invisible version of the current user key, and it must have + // a higher sequence number or timestamp. Therefore, we are done with the + // current user key. break; } + if (!ts.empty()) { saved_timestamp_.assign(ts.data(), ts.size()); } + if (TooManyInternalKeysSkipped()) { return false; } @@ -852,6 +869,15 @@ bool DBIter::FindValueForCurrentKey() { return false; } + if (timestamp_lb_ != nullptr) { + // Only needed when timestamp_lb_ is not null + [[maybe_unused]] const bool ret = ParseKey(&ikey_); + saved_ikey_.assign(iter_.key().data(), iter_.key().size()); + // Since the preceding ParseKey(&ikey) succeeds, so must this. + assert(ret); + } + + valid_entry_seen = true; last_key_entry_type = ikey.type; switch (last_key_entry_type) { case kTypeValue: @@ -908,6 +934,14 @@ bool DBIter::FindValueForCurrentKey() { PERF_COUNTER_ADD(internal_key_skipped_count, 1); iter_.Prev(); ++num_skipped; + + if (visible && timestamp_lb_ != nullptr) { + // If timestamp_lb_ is not nullptr, we do not have to look further for + // another internal key. We can return this current internal key. 
Yet we + // still keep the invariant that iter_ is positioned before the returned + // key. + break; + } } if (!iter_.status().ok()) { @@ -915,6 +949,20 @@ bool DBIter::FindValueForCurrentKey() { return false; } + if (!valid_entry_seen) { + // Since we haven't seen any valid entry, last_key_entry_type remains + // unchanged and the same as its initial value. + assert(last_key_entry_type == kTypeDeletion); + assert(last_not_merge_type == kTypeDeletion); + valid_ = false; + return true; + } + + if (timestamp_lb_ != nullptr) { + assert(last_key_entry_type == ikey_.type || + last_key_entry_type == kTypeRangeDeletion); + } + Status s; s.PermitUncheckedError(); is_blob_ = false; @@ -923,7 +971,12 @@ bool DBIter::FindValueForCurrentKey() { case kTypeDeletionWithTimestamp: case kTypeSingleDeletion: case kTypeRangeDeletion: - valid_ = false; + if (timestamp_lb_ == nullptr) { + valid_ = false; + } else { + saved_key_.SetInternalKey(saved_ikey_); + valid_ = true; + } return true; case kTypeMerge: current_entry_is_merged_ = true; @@ -970,6 +1023,9 @@ bool DBIter::FindValueForCurrentKey() { break; case kTypeValue: // do nothing - we've already has value in pinned_value_ + if (timestamp_lb_ != nullptr) { + saved_key_.SetInternalKey(saved_ikey_); + } break; case kTypeBlobIndex: if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) { @@ -1015,7 +1071,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { &last_key, ParsedInternalKey(saved_key_.GetUserKey(), sequence_, kValueTypeForSeek), - *timestamp_ub_); + timestamp_lb_ == nullptr ? 
*timestamp_ub_ : *timestamp_lb_); } iter_.Seek(last_key); RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); @@ -1060,7 +1116,12 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { range_del_agg_.ShouldDelete( ikey, RangeDelPositioningMode::kBackwardTraversal) || kTypeDeletionWithTimestamp == ikey.type) { - valid_ = false; + if (timestamp_lb_ == nullptr) { + valid_ = false; + } else { + valid_ = true; + saved_key_.SetInternalKey(ikey); + } return true; } if (!iter_.PrepareValue()) { @@ -1085,6 +1146,10 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { } } + if (timestamp_lb_ != nullptr) { + saved_key_.SetInternalKey(ikey); + } + valid_ = true; return true; } @@ -1214,8 +1279,7 @@ bool DBIter::FindUserKeyBeforeSavedKey() { return false; } - if (user_comparator_.CompareWithoutTimestamp(ikey.user_key, - saved_key_.GetUserKey()) < 0) { + if (CompareKeyForSkip(ikey.user_key, saved_key_.GetUserKey()) < 0) { return true; } @@ -1328,7 +1392,9 @@ void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) { if (timestamp_size_ > 0) { const std::string kTsMin(timestamp_size_, '\0'); Slice ts = kTsMin; - saved_key_.UpdateInternalKey(/*seq=*/0, kValueTypeForSeekForPrev, &ts); + saved_key_.UpdateInternalKey( + /*seq=*/0, kValueTypeForSeekForPrev, + timestamp_lb_ == nullptr ? &ts : timestamp_lb_); } if (iterate_upper_bound_ != nullptr && @@ -1341,8 +1407,9 @@ void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) { if (timestamp_size_ > 0) { const std::string kTsMax(timestamp_size_, '\xff'); Slice ts = kTsMax; - saved_key_.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeekForPrev, - &ts); + saved_key_.UpdateInternalKey( + kMaxSequenceNumber, kValueTypeForSeekForPrev, + timestamp_lb_ != nullptr ? timestamp_lb_ : &ts); } } } @@ -1543,11 +1610,21 @@ void DBIter::SeekToLast() { if (iterate_upper_bound_ != nullptr) { // Seek to last key strictly less than ReadOptions.iterate_upper_bound. 
SeekForPrev(*iterate_upper_bound_); - if (Valid() && 0 == user_comparator_.CompareWithoutTimestamp( - *iterate_upper_bound_, /*a_has_ts=*/false, key(), - /*b_has_ts=*/false)) { + const bool is_ikey = (timestamp_size_ > 0 && timestamp_lb_ != nullptr); + Slice k = Valid() ? key() : Slice(); + if (is_ikey && Valid()) { + k.remove_suffix(kNumInternalBytes + timestamp_size_); + } + while (Valid() && 0 == user_comparator_.CompareWithoutTimestamp( + *iterate_upper_bound_, /*a_has_ts=*/false, k, + /*b_has_ts=*/false)) { ReleaseTempPinnedData(); PrevInternal(nullptr); + + k = Valid() ? key() : Slice(); + if (is_ikey && Valid()) { + k.remove_suffix(kNumInternalBytes + timestamp_size_); + } } return; } diff --git a/db/db_iter.h b/db/db_iter.h index 975f0e824..8a4ee3792 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -224,9 +224,11 @@ class DBIter final : public Iterator { bool ReverseToBackward(); // Set saved_key_ to the seek key to target, with proper sequence number set. // It might get adjusted if the seek key is smaller than iterator lower bound. + // target does not have timestamp. void SetSavedKeyToSeekTarget(const Slice& target); // Set saved_key_ to the seek key to target, with proper sequence number set. // It might get adjusted if the seek key is larger than iterator upper bound. + // target does not have timestamp. void SetSavedKeyToSeekForPrevTarget(const Slice& target); bool FindValueForCurrentKey(); bool FindValueForCurrentKeyUsingSeek(); @@ -377,6 +379,9 @@ class DBIter final : public Iterator { const Slice* const timestamp_lb_; const size_t timestamp_size_; std::string saved_timestamp_; + + // Used only if timestamp_lb_ is not nullptr. 
+ std::string saved_ikey_; }; // Return a new iterator that converts internal keys (yielded by diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 84fae1f10..e08133cb7 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -1065,6 +1065,183 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) { Close(); } +TEST_F(DBBasicTestWithTimestamp, BackwardIterateLowerTsBound) { + constexpr int kNumKeysPerFile = 128; + constexpr uint64_t kMaxKey = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + const std::vector<std::string> write_timestamps = {Timestamp(1, 0), + Timestamp(3, 0)}; + const std::vector<std::string> read_timestamps = {Timestamp(2, 0), + Timestamp(4, 0)}; + const std::vector<std::string> read_timestamps_lb = {Timestamp(1, 0), + Timestamp(1, 0)}; + for (size_t i = 0; i < write_timestamps.size(); ++i) { + WriteOptions write_opts; + for (uint64_t key = 0; key <= kMaxKey; ++key) { + Status s = db_->Put(write_opts, Key1(key), write_timestamps[i], + "value" + std::to_string(i)); + ASSERT_OK(s); + } + } + for (size_t i = 0; i < read_timestamps.size(); ++i) { + ReadOptions read_opts; + Slice read_ts = read_timestamps[i]; + Slice read_ts_lb = read_timestamps_lb[i]; + read_opts.timestamp = &read_ts; + read_opts.iter_start_ts = &read_ts_lb; + std::unique_ptr<Iterator> it(db_->NewIterator(read_opts)); + int count = 0; + uint64_t key = 0; + for (it->SeekForPrev(Key1(kMaxKey)), key = kMaxKey; it->Valid(); + it->Prev(), ++count, --key) { + CheckIterEntry(it.get(), Key1(key), kTypeValue, "value0", + write_timestamps[0]); + if (i > 0) { + it->Prev(); + CheckIterEntry(it.get(), Key1(key), kTypeValue, "value1", + write_timestamps[1]); + } 
+ } + size_t expected_count = kMaxKey + 1; + ASSERT_EQ(expected_count, count); + } + // Delete all keys@ts=5 and check iteration result with start ts set + { + std::string write_timestamp = Timestamp(5, 0); + WriteOptions write_opts; + for (uint64_t key = 0; key < kMaxKey + 1; ++key) { + Status s = db_->Delete(write_opts, Key1(key), write_timestamp); + ASSERT_OK(s); + } + + std::string read_timestamp = Timestamp(6, 0); + ReadOptions read_opts; + Slice read_ts = read_timestamp; + read_opts.timestamp = &read_ts; + std::string read_timestamp_lb = Timestamp(2, 0); + Slice read_ts_lb = read_timestamp_lb; + read_opts.iter_start_ts = &read_ts_lb; + std::unique_ptr<Iterator> it(db_->NewIterator(read_opts)); + int count = 0; + uint64_t key = kMaxKey; + for (it->SeekForPrev(Key1(key)), key = kMaxKey; it->Valid(); + it->Prev(), ++count, --key) { + CheckIterEntry(it.get(), Key1(key), kTypeValue, "value1", + Timestamp(3, 0)); + it->Prev(); + CheckIterEntry(it.get(), Key1(key), kTypeDeletionWithTimestamp, Slice(), + write_timestamp); + } + ASSERT_EQ(kMaxKey + 1, count); + } + Close(); +} + +TEST_F(DBBasicTestWithTimestamp, SimpleBackwardIterateLowerTsBound) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + + std::string ts_ub_buf = Timestamp(5, 0); + Slice ts_ub = ts_ub_buf; + std::string ts_lb_buf = Timestamp(1, 0); + Slice ts_lb = ts_lb_buf; + + { + ReadOptions read_opts; + read_opts.timestamp = &ts_ub; + read_opts.iter_start_ts = &ts_lb; + std::unique_ptr<Iterator> it(db_->NewIterator(read_opts)); + it->SeekToLast(); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + + it->SeekForPrev("foo"); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + } + + // Test iterate_upper_bound + ASSERT_OK(db_->Put(WriteOptions(), "a", Timestamp(0, 0), "v0")); + 
ASSERT_OK(db_->SingleDelete(WriteOptions(), "a", Timestamp(1, 0))); + + for (int i = 0; i < 5; ++i) { + ASSERT_OK(db_->Put(WriteOptions(), "b", Timestamp(i, 0), + "v" + std::to_string(i))); + } + + { + ReadOptions read_opts; + read_opts.timestamp = &ts_ub; + read_opts.iter_start_ts = &ts_lb; + std::string key_ub_str = "b"; // exclusive + Slice key_ub = key_ub_str; + read_opts.iterate_upper_bound = &key_ub; + std::unique_ptr<Iterator> it(db_->NewIterator(read_opts)); + it->SeekToLast(); + CheckIterEntry(it.get(), "a", kTypeSingleDeletion, Slice(), + Timestamp(1, 0)); + } + + Close(); +} + +TEST_F(DBBasicTestWithTimestamp, BackwardIterateLowerTsBound_Reseek) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + options.max_sequential_skip_in_iterations = 2; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + + for (int i = 0; i < 10; ++i) { + ASSERT_OK(db_->Put(WriteOptions(), "a", Timestamp(i, 0), + "v" + std::to_string(i))); + } + + for (int i = 0; i < 10; ++i) { + ASSERT_OK(db_->Put(WriteOptions(), "b", Timestamp(i, 0), + "v" + std::to_string(i))); + } + + { + std::string ts_ub_buf = Timestamp(6, 0); + Slice ts_ub = ts_ub_buf; + std::string ts_lb_buf = Timestamp(4, 0); + Slice ts_lb = ts_lb_buf; + + ReadOptions read_opts; + read_opts.timestamp = &ts_ub; + read_opts.iter_start_ts = &ts_lb; + std::unique_ptr<Iterator> it(db_->NewIterator(read_opts)); + it->SeekToLast(); + for (int i = 0; i < 3 && it->Valid(); it->Prev(), ++i) { + CheckIterEntry(it.get(), "b", kTypeValue, "v" + std::to_string(4 + i), + Timestamp(4 + i, 0)); + } + for (int i = 0; i < 3 && it->Valid(); it->Prev(), ++i) { + CheckIterEntry(it.get(), "a", kTypeValue, "v" + std::to_string(4 + i), + Timestamp(4 + i, 0)); + } + } + + Close(); +} + TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) { Options options = CurrentOptions(); options.env = env_;