make iterator return versions between timestamp bounds (#6544)

Summary:
(Based on Yanqin's idea) Add a new field in readoptions as lower timestamp bound for iterator. When the parameter is not supplied (nullptr), the iterator returns the latest visible version of a record. When it is supplied, the existing timestamp field is the upper bound. Together the two serves as a bounded time window. The iterator returns all versions of a record falling in the window.

SeekRandom perf test (10 minutes) on the same development machine ram drive with the same DB data shows no regression (within marge of error). The test is adapted from https://github.com/facebook/rocksdb/wiki/RocksDB-In-Memory-Workload-Performance-Benchmarks.
base line (commit e860f8840):
seekrandom   : 7.836 micros/op 4082449 ops/sec; (0 of 73481999 found)
This PR:
seekrandom   : 7.764 micros/op 4120935 ops/sec; (0 of 71303999 found)

db_bench --db=r:\rocksdb.github --num_levels=6 --key_size=20 --prefix_size=20 --keys_per_prefix=0 --value_size=100 --cache_size=2147483648 --cache_numshardbits=6 --compression_type=none --compression_ratio=1 --min_level_to_compress=-1 --disable_seek_compaction=1 --hard_rate_limit=2 --write_buffer_size=134217728 --max_write_buffer_number=2 --level0_file_num_compaction_trigger=8 --target_file_size_base=134217728 --max_bytes_for_level_base=1073741824 --disable_wal=0 --wal_dir=r:\rocksdb.github\WAL_LOG --sync=0 --verify_checksum=1 --statistics=0 --stats_per_interval=0 --stats_interval=1048576 --histogram=0 --use_plain_table=1 --open_files=-1 --memtablerep=prefix_hash --bloom_bits=10 --bloom_locality=1 --duration=600 --benchmarks=seekrandom --use_existing_db=1 --num=25000000 --threads=32 --allow_concurrent_memtable_write=0
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6544

Reviewed By: ltamasi

Differential Revision: D20844069

Pulled By: riversand963

fbshipit-source-id: d97f2bf38a323c8c6a68db213b2d3c694b1c1f74
main
Huisheng Liu 5 years ago committed by Facebook GitHub Bot
parent 66a95f0fac
commit 9e89ffb776
  1. 48
      db/db_iter.cc
  2. 13
      db/db_iter.h
  3. 59
      db/db_with_timestamp_basic_test.cc
  4. 5
      include/rocksdb/options.h
  5. 6
      options/options.cc

@ -73,6 +73,7 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
cfd_(cfd), cfd_(cfd),
start_seqnum_(read_options.iter_start_seqnum), start_seqnum_(read_options.iter_start_seqnum),
timestamp_ub_(read_options.timestamp), timestamp_ub_(read_options.timestamp),
timestamp_lb_(read_options.iter_start_ts),
timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) { timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
RecordTick(statistics_, NO_ITERATOR_CREATED); RecordTick(statistics_, NO_ITERATOR_CREATED);
if (pin_thru_lifetime_) { if (pin_thru_lifetime_) {
@ -246,23 +247,22 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
assert(ikey_.user_key.size() >= timestamp_size_); assert(ikey_.user_key.size() >= timestamp_size_);
Slice ts; Slice ts;
bool more_recent = false;
if (timestamp_size_ > 0) { if (timestamp_size_ > 0) {
ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_); ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
} }
if (IsVisible(ikey_.sequence, ts)) { if (IsVisible(ikey_.sequence, ts, &more_recent)) {
// If the previous entry is of seqnum 0, the current entry will not // If the previous entry is of seqnum 0, the current entry will not
// possibly be skipped. This condition can potentially be relaxed to // possibly be skipped. This condition can potentially be relaxed to
// prev_key.seq <= ikey_.sequence. We are cautious because it will be more // prev_key.seq <= ikey_.sequence. We are cautious because it will be more
// prone to bugs causing the same user key with the same sequence number. // prone to bugs causing the same user key with the same sequence number.
if (!is_prev_key_seqnum_zero && skipping_saved_key && if (!is_prev_key_seqnum_zero && skipping_saved_key &&
user_comparator_.CompareWithoutTimestamp( CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
num_skipped++; // skip this entry num_skipped++; // skip this entry
PERF_COUNTER_ADD(internal_key_skipped_count, 1); PERF_COUNTER_ADD(internal_key_skipped_count, 1);
} else { } else {
assert(!skipping_saved_key || assert(!skipping_saved_key ||
user_comparator_.CompareWithoutTimestamp( CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) > 0);
ikey_.user_key, saved_key_.GetUserKey()) > 0);
num_skipped = 0; num_skipped = 0;
reseek_done = false; reseek_done = false;
switch (ikey_.type) { switch (ikey_.type) {
@ -363,11 +363,13 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
} }
} }
} else { } else {
PERF_COUNTER_ADD(internal_recent_skipped_count, 1); if (more_recent) {
PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
}
// This key was inserted after our snapshot was taken. // This key was inserted after our snapshot was taken or skipped by
// If this happens too many times in a row for the same user key, we want // timestamp range. If this happens too many times in a row for the same
// to seek to the target sequence number. // user key, we want to seek to the target sequence number.
int cmp = user_comparator_.CompareWithoutTimestamp( int cmp = user_comparator_.CompareWithoutTimestamp(
ikey_.user_key, saved_key_.GetUserKey()); ikey_.user_key, saved_key_.GetUserKey());
if (cmp == 0 || (skipping_saved_key && cmp < 0)) { if (cmp == 0 || (skipping_saved_key && cmp < 0)) {
@ -1101,20 +1103,24 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) {
return false; return false;
} }
bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts) { bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts,
bool* more_recent) {
// Remember that comparator orders preceding timestamp as larger. // Remember that comparator orders preceding timestamp as larger.
int cmp_ts = timestamp_ub_ != nullptr // TODO(yanqin): support timestamp in read_callback_.
? user_comparator_.CompareTimestamp(ts, *timestamp_ub_) bool visible_by_seq = (read_callback_ == nullptr)
: 0; ? sequence <= sequence_
if (cmp_ts > 0) { : read_callback_->IsVisible(sequence);
return false;
} bool visible_by_ts =
if (read_callback_ == nullptr) { (timestamp_ub_ == nullptr ||
return sequence <= sequence_; user_comparator_.CompareTimestamp(ts, *timestamp_ub_) <= 0) &&
} else { (timestamp_lb_ == nullptr ||
// TODO(yanqin): support timestamp in read_callback_. user_comparator_.CompareTimestamp(ts, *timestamp_lb_) >= 0);
return read_callback_->IsVisible(sequence);
if (more_recent) {
*more_recent = !visible_by_seq;
} }
return visible_by_seq && visible_by_ts;
} }
void DBIter::SetSavedKeyToSeekTarget(const Slice& target) { void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {

@ -231,7 +231,8 @@ class DBIter final : public Iterator {
// entry can be found within the prefix. // entry can be found within the prefix.
void PrevInternal(const Slice* prefix); void PrevInternal(const Slice* prefix);
bool TooManyInternalKeysSkipped(bool increment = true); bool TooManyInternalKeysSkipped(bool increment = true);
bool IsVisible(SequenceNumber sequence, const Slice& ts); bool IsVisible(SequenceNumber sequence, const Slice& ts,
bool* more_recent = nullptr);
// Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
// is called // is called
@ -270,6 +271,15 @@ class DBIter final : public Iterator {
return expect_total_order_inner_iter_; return expect_total_order_inner_iter_;
} }
// If lower bound of timestamp is given by ReadOptions.iter_start_ts, we need
// to return versions of the same key. We cannot just skip if the key value
// is the same but timestamps are different but fall in timestamp range.
inline int CompareKeyForSkip(const Slice& a, const Slice& b) {
return timestamp_lb_ != nullptr
? user_comparator_.Compare(a, b)
: user_comparator_.CompareWithoutTimestamp(a, b);
}
const SliceTransform* prefix_extractor_; const SliceTransform* prefix_extractor_;
Env* const env_; Env* const env_;
Logger* logger_; Logger* logger_;
@ -338,6 +348,7 @@ class DBIter final : public Iterator {
// if this value > 0 iterator will return internal keys // if this value > 0 iterator will return internal keys
SequenceNumber start_seqnum_; SequenceNumber start_seqnum_;
const Slice* const timestamp_ub_; const Slice* const timestamp_ub_;
const Slice* const timestamp_lb_;
const size_t timestamp_size_; const size_t timestamp_size_;
}; };

@ -235,6 +235,57 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
Close(); Close();
} }
TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
options.disable_auto_compactions = true;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
Timestamp(3, 0)};
const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
Timestamp(4, 0)};
const std::vector<std::string> read_timestamps_lb = {Timestamp(1, 0),
Timestamp(1, 0)};
for (size_t i = 0; i < write_timestamps.size(); ++i) {
WriteOptions write_opts;
Slice write_ts = write_timestamps[i];
write_opts.timestamp = &write_ts;
for (uint64_t key = 0; key <= kMaxKey; ++key) {
Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
ASSERT_OK(s);
}
}
for (size_t i = 0; i < read_timestamps.size(); ++i) {
ReadOptions read_opts;
Slice read_ts = read_timestamps[i];
Slice read_ts_lb = read_timestamps_lb[i];
read_opts.timestamp = &read_ts;
read_opts.iter_start_ts = &read_ts_lb;
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
int count = 0;
uint64_t key = 0;
for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) {
CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i),
write_timestamps[i]);
if (i > 0) {
it->Next();
CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i - 1),
write_timestamps[i - 1]);
}
}
size_t expected_count = kMaxKey + 1;
ASSERT_EQ(expected_count, count);
}
Close();
}
TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) { TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
const int kNumKeysPerFile = 128; const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 0xffffffffffffffff; const uint64_t kMaxKey = 0xffffffffffffffff;
@ -584,7 +635,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
std::vector<std::string> read_ts_list; std::vector<std::string> read_ts_list;
const auto& verify_records_func = [&](size_t i, size_t begin, size_t end, const auto& verify_records_func = [&](size_t i, size_t begin, size_t end,
ColumnFamilyHandle* cfh) { ColumnFamilyHandle* cfh) {
std::string value; std::string value;
std::string timestamp; std::string timestamp;
@ -622,9 +673,9 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
// higherlevel[0].largest.userkey // higherlevel[0].largest.userkey
ASSERT_OK(Flush(cf)); ASSERT_OK(Flush(cf));
// compact files (2 at each level) to a lower level such that all keys // compact files (2 at each level) to a lower level such that all
// with the same timestamp is at one level, with newer versions at // keys with the same timestamp is at one level, with newer versions
// higher levels. // at higher levels.
CompactionOptions compact_opt; CompactionOptions compact_opt;
compact_opt.compression = kNoCompression; compact_opt.compression = kNoCompression;
db_->CompactFiles(compact_opt, handles_[cf], db_->CompactFiles(compact_opt, handles_[cf],

@ -1337,9 +1337,14 @@ struct ReadOptions {
// specified timestamp. All timestamps of the same database must be of the // specified timestamp. All timestamps of the same database must be of the
// same length and format. The user is responsible for providing a customized // same length and format. The user is responsible for providing a customized
// compare function via Comparator to order <key, timestamp> tuples. // compare function via Comparator to order <key, timestamp> tuples.
// For iterator, iter_start_ts is the lower bound (older) and timestamp
// serves as the upper bound. Versions of the same record that fall in
// the timestamp range will be returned. If iter_start_ts is nullptr,
// only the most recent version visible to timestamp is returned.
// The user-specified timestamp feature is still under active development, // The user-specified timestamp feature is still under active development,
// and the API is subject to change. // and the API is subject to change.
const Slice* timestamp; const Slice* timestamp;
const Slice* iter_start_ts;
ReadOptions(); ReadOptions();
ReadOptions(bool cksum, bool cache); ReadOptions(bool cksum, bool cache);

@ -607,7 +607,8 @@ ReadOptions::ReadOptions()
background_purge_on_iterator_cleanup(false), background_purge_on_iterator_cleanup(false),
ignore_range_deletions(false), ignore_range_deletions(false),
iter_start_seqnum(0), iter_start_seqnum(0),
timestamp(nullptr) {} timestamp(nullptr),
iter_start_ts(nullptr) {}
ReadOptions::ReadOptions(bool cksum, bool cache) ReadOptions::ReadOptions(bool cksum, bool cache)
: snapshot(nullptr), : snapshot(nullptr),
@ -627,6 +628,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
background_purge_on_iterator_cleanup(false), background_purge_on_iterator_cleanup(false),
ignore_range_deletions(false), ignore_range_deletions(false),
iter_start_seqnum(0), iter_start_seqnum(0),
timestamp(nullptr) {} timestamp(nullptr),
iter_start_ts(nullptr) {}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save