Support `iter_start_ts` for backward iteration (#10200)

Summary:
Resolves https://github.com/facebook/rocksdb/issues/9761

With this PR, applications can create an iterator with the following
```cpp
ReadOptions read_opts;
read_opts.timestamp = &ts_ub;
read_opts.iter_start_ts = &ts_lb;
auto* it = db->NewIterator(read_opts);
it->SeekToLast();
// or it->SeekForPrev("foo");
it->Prev();
...
```
The application can access different versions of the same user key via `key()`, `value()`, and `timestamp()`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10200

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D37258074

Pulled By: riversand963

fbshipit-source-id: 3f0b866ade50dcff7ef60d506397a9dd6ec91565
main
Yanqin Jin 2 years ago committed by Facebook GitHub Bot
parent d96febeeaa
commit b6cfda1283
  1. 1
      HISTORY.md
  2. 105
      db/db_iter.cc
  3. 5
      db/db_iter.h
  4. 177
      db/db_with_timestamp_basic_test.cc

@ -2,6 +2,7 @@
## Unreleased
### New Features
* Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`.
* Support backward iteration when `ReadOptions::iter_start_ts` is set.
### Public API changes
* Add metadata related structs and functions in C API, including

@ -812,6 +812,10 @@ bool DBIter::FindValueForCurrentKey() {
ValueType last_not_merge_type = kTypeDeletion;
ValueType last_key_entry_type = kTypeDeletion;
// If false, it indicates that we have not seen any valid entry, even though
// last_key_entry_type is initialized to kTypeDeletion.
bool valid_entry_seen = false;
// Temporarily pin blocks that hold (merge operands / the value)
ReleaseTempPinnedData();
TempPinData();
@ -822,20 +826,33 @@ bool DBIter::FindValueForCurrentKey() {
return false;
}
if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
saved_key_.GetUserKey())) {
// Found a smaller user key, thus we are done with current user key.
break;
}
assert(ikey.user_key.size() >= timestamp_size_);
Slice ts;
if (timestamp_size_ > 0) {
ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
timestamp_size_);
}
if (!IsVisible(ikey.sequence, ts) ||
!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
saved_key_.GetUserKey())) {
bool visible = IsVisible(ikey.sequence, ts);
if (!visible &&
(timestamp_lb_ == nullptr ||
user_comparator_.CompareTimestamp(ts, *timestamp_ub_) > 0)) {
// Found an invisible version of the current user key, and it must have
// a higher sequence number or timestamp. Therefore, we are done with the
// current user key.
break;
}
if (!ts.empty()) {
saved_timestamp_.assign(ts.data(), ts.size());
}
if (TooManyInternalKeysSkipped()) {
return false;
}
@ -852,6 +869,15 @@ bool DBIter::FindValueForCurrentKey() {
return false;
}
if (timestamp_lb_ != nullptr) {
// Only needed when timestamp_lb_ is not null
[[maybe_unused]] const bool ret = ParseKey(&ikey_);
saved_ikey_.assign(iter_.key().data(), iter_.key().size());
// Since the preceding ParseKey(&ikey) succeeds, so must this.
assert(ret);
}
valid_entry_seen = true;
last_key_entry_type = ikey.type;
switch (last_key_entry_type) {
case kTypeValue:
@ -908,6 +934,14 @@ bool DBIter::FindValueForCurrentKey() {
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
iter_.Prev();
++num_skipped;
if (visible && timestamp_lb_ != nullptr) {
// If timestamp_lb_ is not nullptr, we do not have to look further for
// another internal key. We can return this current internal key. Yet we
// still keep the invariant that iter_ is positioned before the returned
// key.
break;
}
}
if (!iter_.status().ok()) {
@ -915,6 +949,20 @@ bool DBIter::FindValueForCurrentKey() {
return false;
}
if (!valid_entry_seen) {
// Since we haven't seen any valid entry, last_key_entry_type remains
// unchanged and the same as its initial value.
assert(last_key_entry_type == kTypeDeletion);
assert(last_not_merge_type == kTypeDeletion);
valid_ = false;
return true;
}
if (timestamp_lb_ != nullptr) {
assert(last_key_entry_type == ikey_.type ||
last_key_entry_type == kTypeRangeDeletion);
}
Status s;
s.PermitUncheckedError();
is_blob_ = false;
@ -923,7 +971,12 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeDeletionWithTimestamp:
case kTypeSingleDeletion:
case kTypeRangeDeletion:
valid_ = false;
if (timestamp_lb_ == nullptr) {
valid_ = false;
} else {
saved_key_.SetInternalKey(saved_ikey_);
valid_ = true;
}
return true;
case kTypeMerge:
current_entry_is_merged_ = true;
@ -970,6 +1023,9 @@ bool DBIter::FindValueForCurrentKey() {
break;
case kTypeValue:
// do nothing - we've already has value in pinned_value_
if (timestamp_lb_ != nullptr) {
saved_key_.SetInternalKey(saved_ikey_);
}
break;
case kTypeBlobIndex:
if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
@ -1015,7 +1071,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
&last_key,
ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
kValueTypeForSeek),
*timestamp_ub_);
timestamp_lb_ == nullptr ? *timestamp_ub_ : *timestamp_lb_);
}
iter_.Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
@ -1060,7 +1116,12 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
range_del_agg_.ShouldDelete(
ikey, RangeDelPositioningMode::kBackwardTraversal) ||
kTypeDeletionWithTimestamp == ikey.type) {
valid_ = false;
if (timestamp_lb_ == nullptr) {
valid_ = false;
} else {
valid_ = true;
saved_key_.SetInternalKey(ikey);
}
return true;
}
if (!iter_.PrepareValue()) {
@ -1085,6 +1146,10 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
}
}
if (timestamp_lb_ != nullptr) {
saved_key_.SetInternalKey(ikey);
}
valid_ = true;
return true;
}
@ -1214,8 +1279,7 @@ bool DBIter::FindUserKeyBeforeSavedKey() {
return false;
}
if (user_comparator_.CompareWithoutTimestamp(ikey.user_key,
saved_key_.GetUserKey()) < 0) {
if (CompareKeyForSkip(ikey.user_key, saved_key_.GetUserKey()) < 0) {
return true;
}
@ -1328,7 +1392,9 @@ void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
if (timestamp_size_ > 0) {
const std::string kTsMin(timestamp_size_, '\0');
Slice ts = kTsMin;
saved_key_.UpdateInternalKey(/*seq=*/0, kValueTypeForSeekForPrev, &ts);
saved_key_.UpdateInternalKey(
/*seq=*/0, kValueTypeForSeekForPrev,
timestamp_lb_ == nullptr ? &ts : timestamp_lb_);
}
if (iterate_upper_bound_ != nullptr &&
@ -1341,8 +1407,9 @@ void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
if (timestamp_size_ > 0) {
const std::string kTsMax(timestamp_size_, '\xff');
Slice ts = kTsMax;
saved_key_.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeekForPrev,
&ts);
saved_key_.UpdateInternalKey(
kMaxSequenceNumber, kValueTypeForSeekForPrev,
timestamp_lb_ != nullptr ? timestamp_lb_ : &ts);
}
}
}
@ -1543,11 +1610,21 @@ void DBIter::SeekToLast() {
if (iterate_upper_bound_ != nullptr) {
// Seek to last key strictly less than ReadOptions.iterate_upper_bound.
SeekForPrev(*iterate_upper_bound_);
if (Valid() && 0 == user_comparator_.CompareWithoutTimestamp(
*iterate_upper_bound_, /*a_has_ts=*/false, key(),
/*b_has_ts=*/false)) {
const bool is_ikey = (timestamp_size_ > 0 && timestamp_lb_ != nullptr);
Slice k = Valid() ? key() : Slice();
if (is_ikey) {
k.remove_suffix(kNumInternalBytes + timestamp_size_);
}
while (Valid() && 0 == user_comparator_.CompareWithoutTimestamp(
*iterate_upper_bound_, /*a_has_ts=*/false, k,
/*b_has_ts=*/false)) {
ReleaseTempPinnedData();
PrevInternal(nullptr);
k = key();
if (is_ikey) {
k.remove_suffix(kNumInternalBytes + timestamp_size_);
}
}
return;
}

@ -224,9 +224,11 @@ class DBIter final : public Iterator {
bool ReverseToBackward();
// Set saved_key_ to the seek key to target, with proper sequence number set.
// It might get adjusted if the seek key is smaller than iterator lower bound.
// target does not have timestamp.
void SetSavedKeyToSeekTarget(const Slice& target);
// Set saved_key_ to the seek key to target, with proper sequence number set.
// It might get adjusted if the seek key is larger than iterator upper bound.
// target does not have timestamp.
void SetSavedKeyToSeekForPrevTarget(const Slice& target);
bool FindValueForCurrentKey();
bool FindValueForCurrentKeyUsingSeek();
@ -377,6 +379,9 @@ class DBIter final : public Iterator {
const Slice* const timestamp_lb_;
const size_t timestamp_size_;
std::string saved_timestamp_;
// Used only if timestamp_lb_ is not nullptr.
std::string saved_ikey_;
};
// Return a new iterator that converts internal keys (yielded by

@ -1065,6 +1065,183 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
Close();
}
TEST_F(DBBasicTestWithTimestamp, BackwardIterateLowerTsBound) {
constexpr int kNumKeysPerFile = 128;
constexpr uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
Timestamp(3, 0)};
const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
Timestamp(4, 0)};
const std::vector<std::string> read_timestamps_lb = {Timestamp(1, 0),
Timestamp(1, 0)};
for (size_t i = 0; i < write_timestamps.size(); ++i) {
WriteOptions write_opts;
for (uint64_t key = 0; key <= kMaxKey; ++key) {
Status s = db_->Put(write_opts, Key1(key), write_timestamps[i],
"value" + std::to_string(i));
ASSERT_OK(s);
}
}
for (size_t i = 0; i < read_timestamps.size(); ++i) {
ReadOptions read_opts;
Slice read_ts = read_timestamps[i];
Slice read_ts_lb = read_timestamps_lb[i];
read_opts.timestamp = &read_ts;
read_opts.iter_start_ts = &read_ts_lb;
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
int count = 0;
uint64_t key = 0;
for (it->SeekForPrev(Key1(kMaxKey)), key = kMaxKey; it->Valid();
it->Prev(), ++count, --key) {
CheckIterEntry(it.get(), Key1(key), kTypeValue, "value0",
write_timestamps[0]);
if (i > 0) {
it->Prev();
CheckIterEntry(it.get(), Key1(key), kTypeValue, "value1",
write_timestamps[1]);
}
}
size_t expected_count = kMaxKey + 1;
ASSERT_EQ(expected_count, count);
}
// Delete all keys@ts=5 and check iteration result with start ts set
{
std::string write_timestamp = Timestamp(5, 0);
WriteOptions write_opts;
for (uint64_t key = 0; key < kMaxKey + 1; ++key) {
Status s = db_->Delete(write_opts, Key1(key), write_timestamp);
ASSERT_OK(s);
}
std::string read_timestamp = Timestamp(6, 0);
ReadOptions read_opts;
Slice read_ts = read_timestamp;
read_opts.timestamp = &read_ts;
std::string read_timestamp_lb = Timestamp(2, 0);
Slice read_ts_lb = read_timestamp_lb;
read_opts.iter_start_ts = &read_ts_lb;
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
int count = 0;
uint64_t key = kMaxKey;
for (it->SeekForPrev(Key1(key)), key = kMaxKey; it->Valid();
it->Prev(), ++count, --key) {
CheckIterEntry(it.get(), Key1(key), kTypeValue, "value1",
Timestamp(3, 0));
it->Prev();
CheckIterEntry(it.get(), Key1(key), kTypeDeletionWithTimestamp, Slice(),
write_timestamp);
}
ASSERT_EQ(kMaxKey + 1, count);
}
Close();
}
TEST_F(DBBasicTestWithTimestamp, SimpleBackwardIterateLowerTsBound) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
std::string ts_ub_buf = Timestamp(5, 0);
Slice ts_ub = ts_ub_buf;
std::string ts_lb_buf = Timestamp(1, 0);
Slice ts_lb = ts_lb_buf;
{
ReadOptions read_opts;
read_opts.timestamp = &ts_ub;
read_opts.iter_start_ts = &ts_lb;
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
it->SeekToLast();
ASSERT_FALSE(it->Valid());
ASSERT_OK(it->status());
it->SeekForPrev("foo");
ASSERT_FALSE(it->Valid());
ASSERT_OK(it->status());
}
// Test iterate_upper_bound
ASSERT_OK(db_->Put(WriteOptions(), "a", Timestamp(0, 0), "v0"));
ASSERT_OK(db_->SingleDelete(WriteOptions(), "a", Timestamp(1, 0)));
for (int i = 0; i < 5; ++i) {
ASSERT_OK(db_->Put(WriteOptions(), "b", Timestamp(i, 0),
"v" + std::to_string(i)));
}
{
ReadOptions read_opts;
read_opts.timestamp = &ts_ub;
read_opts.iter_start_ts = &ts_lb;
std::string key_ub_str = "b"; // exclusive
Slice key_ub = key_ub_str;
read_opts.iterate_upper_bound = &key_ub;
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
it->SeekToLast();
CheckIterEntry(it.get(), "a", kTypeSingleDeletion, Slice(),
Timestamp(1, 0));
}
Close();
}
TEST_F(DBBasicTestWithTimestamp, BackwardIterateLowerTsBound_Reseek) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.max_sequential_skip_in_iterations = 2;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
for (int i = 0; i < 10; ++i) {
ASSERT_OK(db_->Put(WriteOptions(), "a", Timestamp(i, 0),
"v" + std::to_string(i)));
}
for (int i = 0; i < 10; ++i) {
ASSERT_OK(db_->Put(WriteOptions(), "b", Timestamp(i, 0),
"v" + std::to_string(i)));
}
{
std::string ts_ub_buf = Timestamp(6, 0);
Slice ts_ub = ts_ub_buf;
std::string ts_lb_buf = Timestamp(4, 0);
Slice ts_lb = ts_lb_buf;
ReadOptions read_opts;
read_opts.timestamp = &ts_ub;
read_opts.iter_start_ts = &ts_lb;
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
it->SeekToLast();
for (int i = 0; i < 3 && it->Valid(); it->Prev(), ++i) {
CheckIterEntry(it.get(), "b", kTypeValue, "v" + std::to_string(4 + i),
Timestamp(4 + i, 0));
}
for (int i = 0; i < 3 && it->Valid(); it->Prev(), ++i) {
CheckIterEntry(it.get(), "a", kTypeValue, "v" + std::to_string(4 + i),
Timestamp(4 + i, 0));
}
}
Close();
}
TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) {
Options options = CurrentOptions();
options.env = env_;

Loading…
Cancel
Save