Iterator with timestamp (#6255)

Summary:
Preliminary support for iterator with user timestamp. Current implementation does not consider merge operator and reverse iterator. Auto compaction is also disabled in unit tests.

Create an iterator with timestamp.
```
...
read_opts.timestamp = &ts;
auto* iter = db->NewIterator(read_opts);
// target is key without timestamp.
for (iter->Seek(target); iter->Valid(); iter->Next()) {}
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
delete iter;
read_opts.timestamp = &ts1;
// lower_bound and upper_bound are without timestamp.
read_opts.iterate_lower_bound = &lower_bound;
read_opts.iterate_upper_bound = &upper_bound;
auto* iter1 = db->NewIterator(read_opts);
// Do Seek or SeekToFirst()
delete iter1;
```

Test plan (dev server)
```
$make check
```

Simple benchmarking (dev server)
1. The overhead introduced by this PR even when timestamp is disabled.
key size: 16 bytes
value size: 100 bytes
Entries: 1000000
Data reside in main memory, and try to stress iterator.
Repeated three times on master and this PR.
- Seek without next
```
./db_bench -db=/dev/shm/rocksdbtest-1000 -benchmarks=fillseq,seekrandom -enable_pipelined_write=false -disable_wal=true -format_version=3
```
master: 159047.0 ops/sec
this PR: 158922.3 ops/sec (2% drop in throughput)
- Seek and next 10 times
```
./db_bench -db=/dev/shm/rocksdbtest-1000 -benchmarks=fillseq,seekrandom -enable_pipelined_write=false -disable_wal=true -format_version=3 -seek_nexts=10
```
master: 109539.3 ops/sec
this PR: 107519.7 ops/sec (2% drop in throughput)
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6255

Differential Revision: D19438227

Pulled By: riversand963

fbshipit-source-id: b66b4979486f8474619f4aa6bdd88598870b0746
main
Yanqin Jin 5 years ago committed by Facebook Github Bot
parent 0a0151fb99
commit d93812c9ae
  1. 3
      HISTORY.md
  2. 3
      db/arena_wrapped_db_iter.h
  3. 738
      db/db_basic_test.cc
  4. 129
      db/db_iter.cc
  5. 16
      db/db_iter.h
  6. 9
      db/dbformat.cc
  7. 29
      db/dbformat.h
  8. 4
      db/version_set.cc
  9. 19
      include/rocksdb/comparator.h
  10. 9
      include/rocksdb/iterator.h
  11. 17
      table/block_based/block_based_table_reader.cc
  12. 1
      table/internal_iterator.h
  13. 4
      tools/ldb_cmd.cc
  14. 8
      util/comparator.cc
  15. 15
      util/user_comparator_wrapper.h
  16. 2
      utilities/ttl/db_ttl_impl.h

@ -10,6 +10,9 @@
### Performance Improvements
* In CompactRange, for levels starting from 0, if the level does not have any file with any key falling in the specified range, the level is skipped. So instead of always compacting from level 0, the compaction starts from the first level with keys in the specified range until the last such level.
### New Features
* Basic support for user timestamp in iterator. Seek/SeekToFirst/Next and lower/upper bounds are supported. Reverse iteration is not supported. Merge is not considered.
## 6.8.0 (02/24/2020)
### Java API Changes
* Major breaking changes to Java comparators, toward standardizing on ByteBuffer for performant, locale-neutral operations on keys (#6252).

@ -51,6 +51,8 @@ class ArenaWrappedDBIter : public Iterator {
bool Valid() const override { return db_iter_->Valid(); }
void SeekToFirst() override { db_iter_->SeekToFirst(); }
void SeekToLast() override { db_iter_->SeekToLast(); }
// 'target' does not contain timestamp, even if user timestamp feature is
// enabled.
void Seek(const Slice& target) override { db_iter_->Seek(target); }
void SeekForPrev(const Slice& target) override {
db_iter_->SeekForPrev(target);
@ -60,6 +62,7 @@ class ArenaWrappedDBIter : public Iterator {
Slice key() const override { return db_iter_->key(); }
Slice value() const override { return db_iter_->value(); }
Status status() const override { return db_iter_->status(); }
Slice timestamp() const override { return db_iter_->timestamp(); }
bool IsBlob() const { return db_iter_->IsBlob(); }
Status GetProperty(std::string prop_name, std::string* prop) override;

@ -392,7 +392,7 @@ TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
sleeping_task_low.WaitUntilDone();
}
TEST_F(DBBasicTest, FLUSH) {
TEST_F(DBBasicTest, Flush) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
WriteOptions writeOpt = WriteOptions();
@ -2196,9 +2196,36 @@ class DBBasicTestWithTimestampBase : public DBTestBase {
: DBTestBase(dbname) {}
protected:
class TestComparatorBase : public Comparator {
static std::string Key1(uint64_t k) {
uint32_t x = 1;
const bool is_little_endian = (*reinterpret_cast<char*>(&x) != 0);
std::string ret;
if (is_little_endian) {
ret.assign(reinterpret_cast<char*>(&k), sizeof(k));
} else {
ret.resize(sizeof(k));
ret[0] = k & 0xff;
ret[1] = (k >> 8) & 0xff;
ret[2] = (k >> 16) & 0xff;
ret[3] = (k >> 24) & 0xff;
ret[4] = (k >> 32) & 0xff;
ret[5] = (k >> 40) & 0xff;
ret[6] = (k >> 48) & 0xff;
ret[7] = (k >> 56) & 0xff;
}
std::reverse(ret.begin(), ret.end());
return ret;
}
class TestComparator : public Comparator {
private:
const Comparator* cmp_without_ts_;
public:
explicit TestComparatorBase(size_t ts_sz) : Comparator(ts_sz) {}
explicit TestComparator(size_t ts_sz)
: Comparator(ts_sz), cmp_without_ts_(nullptr) {
cmp_without_ts_ = BytewiseComparator();
}
const char* Name() const override { return "TestComparator"; }
@ -2211,20 +2238,23 @@ class DBBasicTestWithTimestampBase : public DBTestBase {
if (r != 0 || 0 == timestamp_size()) {
return r;
}
return CompareTimestamp(
return -CompareTimestamp(
Slice(a.data() + a.size() - timestamp_size(), timestamp_size()),
Slice(b.data() + b.size() - timestamp_size(), timestamp_size()));
}
virtual int CompareImpl(const Slice& a, const Slice& b) const = 0;
int CompareWithoutTimestamp(const Slice& a, const Slice& b) const override {
assert(a.size() >= timestamp_size());
assert(b.size() >= timestamp_size());
Slice k1 = StripTimestampFromUserKey(a, timestamp_size());
Slice k2 = StripTimestampFromUserKey(b, timestamp_size());
return CompareImpl(k1, k2);
using Comparator::CompareWithoutTimestamp;
int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
bool b_has_ts) const override {
if (a_has_ts) {
assert(a.size() >= timestamp_size());
}
if (b_has_ts) {
assert(b.size() >= timestamp_size());
}
Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, timestamp_size()) : a;
Slice rhs = b_has_ts ? StripTimestampFromUserKey(b, timestamp_size()) : b;
return cmp_without_ts_->Compare(lhs, rhs);
}
int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
@ -2240,60 +2270,409 @@ class DBBasicTestWithTimestampBase : public DBTestBase {
uint64_t low2 = 0;
uint64_t high1 = 0;
uint64_t high2 = 0;
auto* ptr1 = const_cast<Slice*>(&ts1);
auto* ptr2 = const_cast<Slice*>(&ts2);
const size_t kSize = ts1.size();
std::unique_ptr<char[]> ts1_buf(new char[kSize]);
memcpy(ts1_buf.get(), ts1.data(), ts1.size());
std::unique_ptr<char[]> ts2_buf(new char[kSize]);
memcpy(ts2_buf.get(), ts2.data(), ts2.size());
Slice ts1_copy = Slice(ts1_buf.get(), kSize);
Slice ts2_copy = Slice(ts2_buf.get(), kSize);
auto* ptr1 = const_cast<Slice*>(&ts1_copy);
auto* ptr2 = const_cast<Slice*>(&ts2_copy);
if (!GetFixed64(ptr1, &low1) || !GetFixed64(ptr1, &high1) ||
!GetFixed64(ptr2, &low2) || !GetFixed64(ptr2, &high2)) {
assert(false);
}
if (high1 < high2) {
return 1;
} else if (high1 > high2) {
return -1;
} else if (high1 > high2) {
return 1;
}
if (low1 < low2) {
return 1;
} else if (low1 > low2) {
return -1;
} else if (low1 > low2) {
return 1;
}
return 0;
}
};
Slice EncodeTimestamp(uint64_t low, uint64_t high, std::string* ts) {
assert(nullptr != ts);
ts->clear();
PutFixed64(ts, low);
PutFixed64(ts, high);
assert(ts->size() == sizeof(low) + sizeof(high));
return Slice(*ts);
std::string Timestamp(uint64_t low, uint64_t high) {
std::string ts;
PutFixed64(&ts, low);
PutFixed64(&ts, high);
return ts;
}
void CheckIterUserEntry(const Iterator* it, const Slice& expected_key,
const Slice& expected_value,
const Slice& expected_ts) const {
ASSERT_TRUE(it->Valid());
ASSERT_OK(it->status());
ASSERT_EQ(expected_key, it->key());
ASSERT_EQ(expected_value, it->value());
ASSERT_EQ(expected_ts, it->timestamp());
}
void CheckIterEntry(const Iterator* it, const Slice& expected_ukey,
SequenceNumber expected_seq, ValueType expected_val_type,
const Slice& expected_value, const Slice& expected_ts) {
ASSERT_TRUE(it->Valid());
ASSERT_OK(it->status());
std::string ukey_and_ts;
ukey_and_ts.assign(expected_ukey.data(), expected_ukey.size());
ukey_and_ts.append(expected_ts.data(), expected_ts.size());
ParsedInternalKey parsed_ikey(ukey_and_ts, expected_seq, expected_val_type);
std::string ikey;
AppendInternalKey(&ikey, parsed_ikey);
ASSERT_EQ(Slice(ikey), it->key());
if (expected_val_type == kTypeValue) {
ASSERT_EQ(expected_value, it->value());
}
ASSERT_EQ(expected_ts, it->timestamp());
}
};
class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
public:
DBBasicTestWithTimestamp()
: DBBasicTestWithTimestampBase("/db_basic_test_with_timestamp") {}
: DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {}
};
protected:
class TestComparator : public TestComparatorBase {
public:
const int kKeyPrefixLength =
3; // 3: length of "key" in generated keys ("key" + std::to_string(j))
explicit TestComparator(size_t ts_sz) : TestComparatorBase(ts_sz) {}
int CompareImpl(const Slice& a, const Slice& b) const override {
int n1 = atoi(
std::string(a.data() + kKeyPrefixLength, a.size() - kKeyPrefixLength)
.c_str());
int n2 = atoi(
std::string(b.data() + kKeyPrefixLength, b.size() - kKeyPrefixLength)
.c_str());
return (n1 < n2) ? -1 : (n1 > n2) ? 1 : 0;
TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
const int kNumKeysPerFile = 2048;
const uint64_t kMaxKey = 16384;
Options options = CurrentOptions();
options.env = env_;
// TODO(yanqin) re-enable auto compaction
options.disable_auto_compactions = true;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
const std::vector<uint64_t> start_keys = {1, 0};
const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
Timestamp(3, 0)};
const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
Timestamp(4, 0)};
for (size_t i = 0; i < write_timestamps.size(); ++i) {
WriteOptions write_opts;
Slice write_ts = write_timestamps[i];
write_opts.timestamp = &write_ts;
for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
ASSERT_OK(s);
}
};
}
for (size_t i = 0; i < read_timestamps.size(); ++i) {
ReadOptions read_opts;
Slice read_ts = read_timestamps[i];
read_opts.timestamp = &read_ts;
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
int count = 0;
uint64_t key = 0;
for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid();
it->Next(), ++count, ++key) {
CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i),
write_timestamps[i]);
}
size_t expected_count = kMaxKey - start_keys[i] + 1;
ASSERT_EQ(expected_count, count);
// SeekToFirst() with lower bound.
// Then iter with lower and upper bounds.
uint64_t l = 0;
uint64_t r = kMaxKey + 1;
while (l < r) {
std::string lb_str = Key1(l);
Slice lb = lb_str;
std::string ub_str = Key1(r);
Slice ub = ub_str;
read_opts.iterate_lower_bound = &lb;
read_opts.iterate_upper_bound = &ub;
it.reset(db_->NewIterator(read_opts));
for (it->SeekToFirst(), key = std::max(l, start_keys[i]), count = 0;
it->Valid(); it->Next(), ++key, ++count) {
CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i),
write_timestamps[i]);
}
ASSERT_EQ(r - std::max(l, start_keys[i]), count);
l += (kMaxKey / 100);
r -= (kMaxKey / 100);
}
}
Close();
}
TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
const int kNumKeysPerFile = 2048;
const uint64_t kMaxKey = 0xffffffffffffffff;
const uint64_t kMinKey = kMaxKey - 16383;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
// TODO(yanqin) re-enable auto compaction
options.disable_auto_compactions = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
std::vector<SequenceNumber> start_seqs;
const int kNumTimestamps = 4;
std::vector<std::string> write_ts_list;
for (int t = 0; t != kNumTimestamps; ++t) {
write_ts_list.push_back(Timestamp(2 * t, /*do not care*/ 17));
}
WriteOptions write_opts;
for (size_t i = 0; i != write_ts_list.size(); ++i) {
Slice write_ts = write_ts_list[i];
write_opts.timestamp = &write_ts;
uint64_t k = kMinKey;
do {
Status s = db_->Put(write_opts, Key1(k), "value" + std::to_string(i));
ASSERT_OK(s);
if (k == kMaxKey) {
break;
}
++k;
} while (k != 0);
start_seqs.push_back(db_->GetLatestSequenceNumber());
}
std::vector<std::string> read_ts_list;
for (int t = 0; t != kNumTimestamps - 1; ++t) {
read_ts_list.push_back(Timestamp(2 * t + 3, /*do not care*/ 17));
}
ReadOptions read_opts;
for (size_t i = 0; i != read_ts_list.size(); ++i) {
Slice read_ts = read_ts_list[i];
read_opts.timestamp = &read_ts;
read_opts.iter_start_seqnum = start_seqs[i];
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
SequenceNumber expected_seq = start_seqs[i] + 1;
uint64_t key = kMinKey;
for (iter->Seek(Key1(kMinKey)); iter->Valid(); iter->Next()) {
CheckIterEntry(iter.get(), Key1(key), expected_seq, kTypeValue,
"value" + std::to_string(i + 1), write_ts_list[i + 1]);
++key;
++expected_seq;
}
}
Close();
}
TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
constexpr size_t kNumKeys = 16;
options.max_sequential_skip_in_iterations = kNumKeys / 2;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
// TODO(yanqin) re-enable auto compaction
options.disable_auto_compactions = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
// Insert kNumKeys
WriteOptions write_opts;
Status s;
for (size_t i = 0; i != kNumKeys; ++i) {
std::string ts_str = Timestamp(static_cast<uint64_t>(i + 1), 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
s = db_->Put(write_opts, "foo", "value" + std::to_string(i));
ASSERT_OK(s);
}
{
ReadOptions read_opts;
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
read_opts.timestamp = &ts;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
iter->SeekToFirst();
CheckIterUserEntry(iter.get(), "foo", "value0", ts_str);
ASSERT_EQ(
1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
}
Close();
}
TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
constexpr size_t kNumKeys = 16;
options.max_sequential_skip_in_iterations = kNumKeys / 2;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
// TODO(yanqin) re-enable auto compaction
options.disable_auto_compactions = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
// Write kNumKeys + 1 keys
WriteOptions write_opts;
Status s;
for (size_t i = 0; i != kNumKeys; ++i) {
std::string ts_str = Timestamp(static_cast<uint64_t>(i + 1), 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
s = db_->Put(write_opts, "a", "value" + std::to_string(i));
ASSERT_OK(s);
}
{
std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
WriteBatch batch(0, 0, kTimestampSize);
batch.Put("a", "new_value");
batch.Put("b", "new_value");
s = batch.AssignTimestamp(ts_str);
ASSERT_OK(s);
s = db_->Write(write_opts, &batch);
ASSERT_OK(s);
}
{
ReadOptions read_opts;
std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
Slice ts = ts_str;
read_opts.timestamp = &ts;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
iter->Seek("a");
iter->Next();
CheckIterUserEntry(iter.get(), "b", "new_value", ts_str);
ASSERT_EQ(
1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
}
Close();
}
TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
constexpr size_t max_skippable_internal_keys = 2;
const size_t kNumKeys = max_skippable_internal_keys + 2;
WriteOptions write_opts;
Status s;
{
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
ASSERT_OK(db_->Put(write_opts, "a", "value"));
}
for (size_t i = 0; i < kNumKeys; ++i) {
std::string ts_str = Timestamp(static_cast<uint64_t>(i + 1), 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
s = db_->Put(write_opts, "b", "value" + std::to_string(i));
ASSERT_OK(s);
}
{
ReadOptions read_opts;
read_opts.max_skippable_internal_keys = max_skippable_internal_keys;
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
read_opts.timestamp = &ts;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
iter->SeekToFirst();
iter->Next();
ASSERT_TRUE(iter->status().IsIncomplete());
}
Close();
}
class DBBasicTestWithTimestampCompressionSettings
: public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<std::tuple<
std::shared_ptr<const FilterPolicy>, CompressionType, uint32_t>> {
public:
DBBasicTestWithTimestampCompressionSettings()
: DBBasicTestWithTimestampBase(
"db_basic_test_with_timestamp_compression") {}
};
TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) {
const int kNumKeysPerFile = 8192;
const size_t kNumTimestamps = 6;
Options options = CurrentOptions();
options.create_if_missing = true;
options.env = env_;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
size_t ts_sz = Timestamp(0, 0).size();
TestComparator test_cmp(ts_sz);
options.comparator = &test_cmp;
BlockBasedTableOptions bbto;
bbto.filter_policy = std::get<0>(GetParam());
bbto.whole_key_filtering = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const CompressionType comp_type = std::get<1>(GetParam());
#if LZ4_VERSION_NUMBER < 10400 // r124+
if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
return;
}
#endif // LZ4_VERSION_NUMBER >= 10400
if (!ZSTD_Supported() && comp_type == kZSTD) {
return;
}
if (!Zlib_Supported() && comp_type == kZlibCompression) {
return;
}
options.compression = comp_type;
options.compression_opts.max_dict_bytes = std::get<2>(GetParam());
if (comp_type == kZSTD) {
options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
}
options.target_file_size_base = 1 << 26; // 64MB
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
size_t num_cfs = handles_.size();
ASSERT_EQ(2, num_cfs);
std::vector<std::string> write_ts_list;
std::vector<std::string> read_ts_list;
for (size_t i = 0; i != kNumTimestamps; ++i) {
write_ts_list.push_back(Timestamp(i * 2, 0));
read_ts_list.push_back(Timestamp(1 + i * 2, 0));
const Slice write_ts = write_ts_list.back();
WriteOptions wopts;
wopts.timestamp = &write_ts;
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
ASSERT_OK(Put(cf, Key1(j),
"value_" + std::to_string(j) + "_" + std::to_string(i),
wopts));
}
}
}
const auto& verify_db_func = [&]() {
for (size_t i = 0; i != kNumTimestamps; ++i) {
ReadOptions ropts;
const Slice read_ts = read_ts_list[i];
ropts.timestamp = &read_ts;
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
ColumnFamilyHandle* cfh = handles_[cf];
for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
std::string value;
ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value));
ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
value);
}
}
}
};
verify_db_func();
Close();
}
#ifndef ROCKSDB_LITE
// A class which remembers the name of each flushed file.
class FlushedFileCollector : public EventListener {
@ -2325,7 +2704,7 @@ class FlushedFileCollector : public EventListener {
InstrumentedMutex mutex_;
};
TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
const int kNumKeysPerFile = 8192;
const size_t kNumTimestamps = 2;
const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
@ -2338,23 +2717,39 @@ TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
FlushedFileCollector* collector = new FlushedFileCollector();
options.listeners.emplace_back(collector);
std::string tmp;
size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size();
size_t ts_sz = Timestamp(0, 0).size();
TestComparator test_cmp(ts_sz);
options.comparator = &test_cmp;
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(
10 /*bits_per_key*/, false /*use_block_based_builder*/));
bbto.filter_policy = std::get<0>(GetParam());
bbto.whole_key_filtering = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const CompressionType comp_type = std::get<1>(GetParam());
#if LZ4_VERSION_NUMBER < 10400 // r124+
if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
return;
}
#endif // LZ4_VERSION_NUMBER >= 10400
if (!ZSTD_Supported() && comp_type == kZSTD) {
return;
}
if (!Zlib_Supported() && comp_type == kZlibCompression) {
return;
}
options.compression = comp_type;
options.compression_opts.max_dict_bytes = std::get<2>(GetParam());
if (comp_type == kZSTD) {
options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
}
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
size_t num_cfs = handles_.size();
ASSERT_EQ(2, num_cfs);
std::vector<std::string> write_ts_strs(kNumTimestamps);
std::vector<std::string> read_ts_strs(kNumTimestamps);
std::vector<Slice> write_ts_list;
std::vector<Slice> read_ts_list;
std::vector<std::string> write_ts_list;
std::vector<std::string> read_ts_list;
const auto& verify_record_func = [&](size_t i, size_t k,
ColumnFamilyHandle* cfh) {
@ -2362,26 +2757,26 @@ TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
std::string timestamp;
ReadOptions ropts;
ropts.timestamp = &read_ts_list[i];
const Slice read_ts = read_ts_list[i];
ropts.timestamp = &read_ts;
std::string expected_timestamp =
std::string(write_ts_list[i].data(), write_ts_list[i].size());
ASSERT_OK(
db_->Get(ropts, cfh, "key" + std::to_string(k), &value, &timestamp));
ASSERT_OK(db_->Get(ropts, cfh, Key1(k), &value, &timestamp));
ASSERT_EQ("value_" + std::to_string(k) + "_" + std::to_string(i), value);
ASSERT_EQ(expected_timestamp, timestamp);
};
for (size_t i = 0; i != kNumTimestamps; ++i) {
write_ts_list.emplace_back(EncodeTimestamp(i * 2, 0, &write_ts_strs[i]));
read_ts_list.emplace_back(EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i]));
const Slice& write_ts = write_ts_list.back();
write_ts_list.push_back(Timestamp(i * 2, 0));
read_ts_list.push_back(Timestamp(1 + i * 2, 0));
const Slice write_ts = write_ts_list.back();
WriteOptions wopts;
wopts.timestamp = &write_ts;
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
size_t memtable_get_start = 0;
for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
ASSERT_OK(Put(cf, "key" + std::to_string(j),
ASSERT_OK(Put(cf, Key1(j),
"value_" + std::to_string(j) + "_" + std::to_string(i),
wopts));
if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
@ -2411,7 +2806,8 @@ TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
const auto& verify_db_func = [&]() {
for (size_t i = 0; i != kNumTimestamps; ++i) {
ReadOptions ropts;
ropts.timestamp = &read_ts_list[i];
const Slice read_ts = read_ts_list[i];
ropts.timestamp = &read_ts;
std::string expected_timestamp(write_ts_list[i].data(),
write_ts_list[i].size());
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
@ -2423,130 +2819,148 @@ TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) {
}
};
verify_db_func();
Close();
}
#endif // !ROCKSDB_LITE
class DBBasicTestWithTimestampWithParam
INSTANTIATE_TEST_CASE_P(
Timestamp, DBBasicTestWithTimestampCompressionSettings,
::testing::Combine(
::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr),
std::shared_ptr<const FilterPolicy>(
NewBloomFilterPolicy(10, false))),
::testing::Values(kNoCompression, kZlibCompression, kLZ4Compression,
kLZ4HCCompression, kZSTD),
::testing::Values(0, 1 << 14)));
class DBBasicTestWithTimestampPrefixSeek
: public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<bool> {
public testing::WithParamInterface<
std::tuple<std::shared_ptr<const SliceTransform>,
std::shared_ptr<const FilterPolicy>, bool>> {
public:
DBBasicTestWithTimestampWithParam()
DBBasicTestWithTimestampPrefixSeek()
: DBBasicTestWithTimestampBase(
"/db_basic_test_with_timestamp_with_param") {}
protected:
class TestComparator : public TestComparatorBase {
private:
const Comparator* cmp_without_ts_;
public:
explicit TestComparator(size_t ts_sz)
: TestComparatorBase(ts_sz), cmp_without_ts_(nullptr) {
cmp_without_ts_ = BytewiseComparator();
}
int CompareImpl(const Slice& a, const Slice& b) const override {
return cmp_without_ts_->Compare(a, b);
}
};
"/db_basic_test_with_timestamp_prefix_seek") {}
};
TEST_P(DBBasicTestWithTimestampWithParam, PutAndGet) {
const int kNumKeysPerFile = 8192;
const size_t kNumTimestamps = 6;
bool memtable_only = GetParam();
TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
const size_t kNumKeysPerFile = 4096;
Options options = CurrentOptions();
options.create_if_missing = true;
options.env = env_;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
std::string tmp;
size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size();
TestComparator test_cmp(ts_sz);
options.create_if_missing = true;
// TODO(yanqin): re-enable auto compactions
options.disable_auto_compactions = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.prefix_extractor = std::get<0>(GetParam());
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(
10 /*bits_per_key*/, false /*use_block_based_builder*/));
bbto.whole_key_filtering = true;
bbto.filter_policy = std::get<1>(GetParam());
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(options);
std::vector<CompressionType> compression_types;
compression_types.push_back(kNoCompression);
if (Zlib_Supported()) {
compression_types.push_back(kZlibCompression);
}
#if LZ4_VERSION_NUMBER >= 10400 // r124+
compression_types.push_back(kLZ4Compression);
compression_types.push_back(kLZ4HCCompression);
#endif // LZ4_VERSION_NUMBER >= 10400
if (ZSTD_Supported()) {
compression_types.push_back(kZSTD);
const uint64_t kMaxKey = 0xffffffffffffffff;
const uint64_t kMinKey = 0xffffffffffff8000;
const std::vector<std::string> write_ts_list = {Timestamp(3, 0xffffffff),
Timestamp(6, 0xffffffff)};
WriteOptions write_opts;
{
for (size_t i = 0; i != write_ts_list.size(); ++i) {
Slice write_ts = write_ts_list[i];
write_opts.timestamp = &write_ts;
uint64_t key = kMinKey;
do {
Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
ASSERT_OK(s);
if (key == kMaxKey) {
break;
}
++key;
} while (true);
}
}
// Switch compression dictionary on/off to check key extraction
// correctness in kBuffered state
std::vector<uint32_t> max_dict_bytes_list = {0, 1 << 14}; // 0 or 16KB
for (auto compression_type : compression_types) {
for (uint32_t max_dict_bytes : max_dict_bytes_list) {
options.compression = compression_type;
options.compression_opts.max_dict_bytes = max_dict_bytes;
if (compression_type == kZSTD) {
options.compression_opts.zstd_max_train_bytes = max_dict_bytes;
}
options.target_file_size_base = 1 << 26; // 64MB
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
size_t num_cfs = handles_.size();
ASSERT_EQ(2, num_cfs);
std::vector<std::string> write_ts_strs(kNumTimestamps);
std::vector<std::string> read_ts_strs(kNumTimestamps);
std::vector<Slice> write_ts_list;
std::vector<Slice> read_ts_list;
for (size_t i = 0; i != kNumTimestamps; ++i) {
write_ts_list.emplace_back(
EncodeTimestamp(i * 2, 0, &write_ts_strs[i]));
read_ts_list.emplace_back(
EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i]));
const Slice& write_ts = write_ts_list.back();
WriteOptions wopts;
wopts.timestamp = &write_ts;
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
ASSERT_OK(Put(
cf, "key" + std::to_string(j),
"value_" + std::to_string(j) + "_" + std::to_string(i), wopts));
}
if (!memtable_only) {
ASSERT_OK(Flush(cf));
const std::vector<std::string> read_ts_list = {Timestamp(5, 0xffffffff),
Timestamp(9, 0xffffffff)};
{
ReadOptions read_opts;
read_opts.total_order_seek = false;
read_opts.prefix_same_as_start = std::get<2>(GetParam());
fprintf(stdout, "%s %s %d\n", options.prefix_extractor->Name(),
bbto.filter_policy ? bbto.filter_policy->Name() : "null",
static_cast<int>(read_opts.prefix_same_as_start));
for (size_t i = 0; i != read_ts_list.size(); ++i) {
Slice read_ts = read_ts_list[i];
read_opts.timestamp = &read_ts;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
// Seek to kMaxKey
iter->Seek(Key1(kMaxKey));
CheckIterUserEntry(iter.get(), Key1(kMaxKey), "value" + std::to_string(i),
write_ts_list[i]);
iter->Next();
ASSERT_FALSE(iter->Valid());
}
const std::vector<uint64_t> targets = {kMinKey, kMinKey + 0x10,
kMinKey + 0x100, kMaxKey};
const SliceTransform* const pe = options.prefix_extractor.get();
ASSERT_NE(nullptr, pe);
const size_t kPrefixShift =
8 * (Key1(0).size() - pe->Transform(Key1(0)).size());
const uint64_t kPrefixMask =
~((static_cast<uint64_t>(1) << kPrefixShift) - 1);
const uint64_t kNumKeysWithinPrefix =
(static_cast<uint64_t>(1) << kPrefixShift);
for (size_t i = 0; i != read_ts_list.size(); ++i) {
Slice read_ts = read_ts_list[i];
read_opts.timestamp = &read_ts;
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
for (size_t j = 0; j != targets.size(); ++j) {
std::string start_key = Key1(targets[j]);
uint64_t expected_ub =
(targets[j] & kPrefixMask) - 1 + kNumKeysWithinPrefix;
uint64_t expected_key = targets[j];
size_t count = 0;
it->Seek(Key1(targets[j]));
while (it->Valid()) {
std::string saved_prev_key;
saved_prev_key.assign(it->key().data(), it->key().size());
// Out of prefix
if (!read_opts.prefix_same_as_start &&
pe->Transform(saved_prev_key) != pe->Transform(start_key)) {
break;
}
CheckIterUserEntry(it.get(), Key1(expected_key),
"value" + std::to_string(i), write_ts_list[i]);
++count;
++expected_key;
it->Next();
}
ASSERT_EQ(expected_ub - targets[j] + 1, count);
}
const auto& verify_db_func = [&]() {
for (size_t i = 0; i != kNumTimestamps; ++i) {
ReadOptions ropts;
ropts.timestamp = &read_ts_list[i];
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
ColumnFamilyHandle* cfh = handles_[cf];
for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps;
++j) {
std::string value;
ASSERT_OK(
db_->Get(ropts, cfh, "key" + std::to_string(j), &value));
ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
value);
}
}
}
};
verify_db_func();
}
}
Close();
}
INSTANTIATE_TEST_CASE_P(Timestamp, DBBasicTestWithTimestampWithParam,
::testing::Bool());
// TODO(yanqin): consider handling non-fixed-length prefix extractors, e.g.
// NoopTransform.
INSTANTIATE_TEST_CASE_P(
Timestamp, DBBasicTestWithTimestampPrefixSeek,
::testing::Combine(
::testing::Values(
std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(4)),
std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(7)),
std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(8))),
::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr),
std::shared_ptr<const FilterPolicy>(
NewBloomFilterPolicy(10 /*bits_per_key*/, false)),
std::shared_ptr<const FilterPolicy>(
NewBloomFilterPolicy(20 /*bits_per_key*/,
false))),
::testing::Bool()));
} // namespace ROCKSDB_NAMESPACE

@ -49,6 +49,8 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
read_callback_(read_callback),
sequence_(s),
statistics_(cf_options.statistics),
max_skip_(max_sequential_skip_in_iterations),
max_skippable_internal_keys_(read_options.max_skippable_internal_keys),
num_internal_keys_skipped_(0),
iterate_lower_bound_(read_options.iterate_lower_bound),
iterate_upper_bound_(read_options.iterate_upper_bound),
@ -69,16 +71,17 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
range_del_agg_(&cf_options.internal_comparator, s),
db_impl_(db_impl),
cfd_(cfd),
start_seqnum_(read_options.iter_start_seqnum) {
start_seqnum_(read_options.iter_start_seqnum),
timestamp_ub_(read_options.timestamp),
timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
RecordTick(statistics_, NO_ITERATOR_CREATED);
max_skip_ = max_sequential_skip_in_iterations;
max_skippable_internal_keys_ = read_options.max_skippable_internal_keys;
if (pin_thru_lifetime_) {
pinned_iters_mgr_.StartPinning();
}
if (iter_.iter()) {
iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
}
assert(timestamp_size_ == user_comparator_.timestamp_size());
}
Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
@ -220,9 +223,13 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
is_key_seqnum_zero_ = (ikey_.sequence == 0);
assert(iterate_upper_bound_ == nullptr || iter_.MayBeOutOfUpperBound() ||
user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) < 0);
user_comparator_.CompareWithoutTimestamp(
ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_,
/*b_has_ts=*/false) < 0);
if (iterate_upper_bound_ != nullptr && iter_.MayBeOutOfUpperBound() &&
user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) {
user_comparator_.CompareWithoutTimestamp(
ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_,
/*b_has_ts=*/false) >= 0) {
break;
}
@ -237,20 +244,25 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
return false;
}
if (IsVisible(ikey_.sequence)) {
assert(ikey_.user_key.size() >= timestamp_size_);
Slice ts;
if (timestamp_size_ > 0) {
ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
}
if (IsVisible(ikey_.sequence, ts)) {
// If the previous entry is of seqnum 0, the current entry will not
// possibly be skipped. This condition can potentially be relaxed to
// prev_key.seq <= ikey_.sequence. We are cautious because it will be more
// prone to bugs causing the same user key with the same sequence number.
if (!is_prev_key_seqnum_zero && skipping_saved_key &&
user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey()) <=
0) {
user_comparator_.CompareWithoutTimestamp(
ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
num_skipped++; // skip this entry
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
} else {
assert(!skipping_saved_key ||
user_comparator_.Compare(ikey_.user_key,
saved_key_.GetUserKey()) > 0);
user_comparator_.CompareWithoutTimestamp(
ikey_.user_key, saved_key_.GetUserKey()) > 0);
num_skipped = 0;
reseek_done = false;
switch (ikey_.type) {
@ -356,8 +368,8 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
// This key was inserted after our snapshot was taken.
// If this happens too many times in a row for the same user key, we want
// to seek to the target sequence number.
int cmp =
user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey());
int cmp = user_comparator_.CompareWithoutTimestamp(
ikey_.user_key, saved_key_.GetUserKey());
if (cmp == 0 || (skipping_saved_key && cmp < 0)) {
num_skipped++;
} else {
@ -388,8 +400,17 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
// We're looking for the next user-key but all we see are the same
// user-key with decreasing sequence numbers. Fast forward to
// sequence number 0 and type deletion (the smallest type).
AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
0, kTypeDeletion));
if (timestamp_size_ == 0) {
AppendInternalKey(
&last_key,
ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
} else {
std::string min_ts(timestamp_size_, static_cast<char>(0));
AppendInternalKeyWithDifferentTimestamp(
&last_key,
ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
min_ts);
}
// Don't set skipping_saved_key = false because we may still see more
// user-keys equal to saved_key_.
} else {
@ -398,9 +419,17 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
// Note that this only covers a case when a higher key was overwritten
// many times since our snapshot was taken, not the case when a lot of
// different keys were inserted after our snapshot was taken.
AppendInternalKey(&last_key,
ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
kValueTypeForSeek));
if (timestamp_size_ == 0) {
AppendInternalKey(
&last_key, ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
kValueTypeForSeek));
} else {
AppendInternalKeyWithDifferentTimestamp(
&last_key,
ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
kValueTypeForSeek),
*timestamp_ub_);
}
}
iter_.Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
@ -519,6 +548,13 @@ bool DBIter::MergeValuesNewToOld() {
}
void DBIter::Prev() {
if (timestamp_size_ > 0) {
valid_ = false;
status_ = Status::NotSupported(
"SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
return;
}
assert(valid_);
assert(status_.ok());
@ -697,7 +733,13 @@ bool DBIter::FindValueForCurrentKey() {
return false;
}
if (!IsVisible(ikey.sequence) ||
assert(ikey.user_key.size() >= timestamp_size_);
Slice ts;
if (timestamp_size_ > 0) {
ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
timestamp_size_);
}
if (!IsVisible(ikey.sequence, ts) ||
!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
break;
}
@ -853,6 +895,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (!ParseKey(&ikey)) {
return false;
}
assert(ikey.user_key.size() >= timestamp_size_);
Slice ts;
if (timestamp_size_ > 0) {
ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
timestamp_size_);
}
if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
// No visible values for this key, even though FindValueForCurrentKey()
// has seen some. This is possible if we're using a tailing iterator, and
@ -861,7 +910,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
return true;
}
if (IsVisible(ikey.sequence)) {
if (IsVisible(ikey.sequence, ts)) {
break;
}
@ -1001,7 +1050,13 @@ bool DBIter::FindUserKeyBeforeSavedKey() {
}
assert(ikey.sequence != kMaxSequenceNumber);
if (!IsVisible(ikey.sequence)) {
assert(ikey.user_key.size() >= timestamp_size_);
Slice ts;
if (timestamp_size_ > 0) {
ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
timestamp_size_);
}
if (!IsVisible(ikey.sequence, ts)) {
PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
} else {
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
@ -1046,10 +1101,18 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) {
return false;
}
bool DBIter::IsVisible(SequenceNumber sequence) {
bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts) {
// Remember that comparator orders preceding timestamp as larger.
int cmp_ts = timestamp_ub_ != nullptr
? user_comparator_.CompareTimestamp(ts, *timestamp_ub_)
: 0;
if (cmp_ts > 0) {
return false;
}
if (read_callback_ == nullptr) {
return sequence <= sequence_;
} else {
// TODO(yanqin): support timestamp in read_callback_.
return read_callback_->IsVisible(sequence);
}
}
@ -1058,14 +1121,16 @@ void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
is_key_seqnum_zero_ = false;
SequenceNumber seq = sequence_;
saved_key_.Clear();
saved_key_.SetInternalKey(target, seq);
saved_key_.SetInternalKey(target, seq, kValueTypeForSeek, timestamp_ub_);
if (iterate_lower_bound_ != nullptr &&
user_comparator_.Compare(saved_key_.GetUserKey(), *iterate_lower_bound_) <
0) {
user_comparator_.CompareWithoutTimestamp(
saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
/*b_has_ts=*/false) < 0) {
// Seek key is smaller than the lower bound.
saved_key_.Clear();
saved_key_.SetInternalKey(*iterate_lower_bound_, seq);
saved_key_.SetInternalKey(*iterate_lower_bound_, seq, kValueTypeForSeek,
timestamp_ub_);
}
}
@ -1155,6 +1220,13 @@ void DBIter::SeekForPrev(const Slice& target) {
}
#endif // ROCKSDB_LITE
if (timestamp_size_ > 0) {
valid_ = false;
status_ = Status::NotSupported(
"SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
return;
}
status_ = Status::OK();
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
@ -1248,6 +1320,13 @@ void DBIter::SeekToFirst() {
}
void DBIter::SeekToLast() {
if (timestamp_size_ > 0) {
valid_ = false;
status_ = Status::NotSupported(
"SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
return;
}
if (iterate_upper_bound_ != nullptr) {
// Seek to last key strictly less than ReadOptions.iterate_upper_bound.
SeekForPrev(*iterate_upper_bound_);

@ -146,7 +146,8 @@ class DBIter final : public Iterator {
if (start_seqnum_ > 0) {
return saved_key_.GetInternalKey();
} else {
return saved_key_.GetUserKey();
const Slice ukey_and_ts = saved_key_.GetUserKey();
return Slice(ukey_and_ts.data(), ukey_and_ts.size() - timestamp_size_);
}
}
Slice value() const override {
@ -169,6 +170,13 @@ class DBIter final : public Iterator {
return status_;
}
}
Slice timestamp() const override {
assert(valid_);
assert(timestamp_size_ > 0);
const Slice ukey_and_ts = saved_key_.GetUserKey();
assert(timestamp_size_ < ukey_and_ts.size());
return ExtractTimestampFromUserKey(ukey_and_ts, timestamp_size_);
}
bool IsBlob() const {
assert(valid_ && (allow_blob_ || !is_blob_));
return is_blob_;
@ -178,6 +186,8 @@ class DBIter final : public Iterator {
void Next() final override;
void Prev() final override;
// 'target' does not contain timestamp, even if user timestamp feature is
// enabled.
void Seek(const Slice& target) final override;
void SeekForPrev(const Slice& target) final override;
void SeekToFirst() final override;
@ -221,7 +231,7 @@ class DBIter final : public Iterator {
// entry can be found within the prefix.
void PrevInternal(const Slice* prefix);
bool TooManyInternalKeysSkipped(bool increment = true);
bool IsVisible(SequenceNumber sequence);
bool IsVisible(SequenceNumber sequence, const Slice& ts);
// Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
// is called
@ -327,6 +337,8 @@ class DBIter final : public Iterator {
// for diff snapshots we want the lower bound on the seqnum;
// if this value > 0 iterator will return internal keys
SequenceNumber start_seqnum_;
const Slice* const timestamp_ub_;
const size_t timestamp_size_;
};
// Return a new iterator that converts internal keys (yielded by

@ -75,6 +75,15 @@ void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}
void AppendInternalKeyWithDifferentTimestamp(std::string* result,
const ParsedInternalKey& key,
const Slice& ts) {
assert(key.user_key.size() >= ts.size());
result->append(key.user_key.data(), key.user_key.size() - ts.size());
result->append(ts.data(), ts.size());
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}
void AppendInternalKeyFooter(std::string* result, SequenceNumber s,
ValueType t) {
PutFixed64(result, PackSequenceAndType(s, t));

@ -104,6 +104,7 @@ struct ParsedInternalKey {
ParsedInternalKey()
: sequence(kMaxSequenceNumber) // Make code analyzer happy
{} // Intentionally left uninitialized (for speed)
// u contains timestamp if user timestamp feature is enabled.
ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
: user_key(u), sequence(seq), type(t) {}
std::string DebugString(bool hex = false) const;
@ -132,6 +133,12 @@ EntryType GetEntryType(ValueType value_type);
// Append the serialization of "key" to *result.
extern void AppendInternalKey(std::string* result,
const ParsedInternalKey& key);
// Append the serialization of "key" to *result, replacing the original
// timestamp with argument ts.
extern void AppendInternalKeyWithDifferentTimestamp(
std::string* result, const ParsedInternalKey& key, const Slice& ts);
// Serialized internal key consists of user key followed by footer.
// This function appends the footer to *result, assuming that *result already
// contains the user key at the end.
@ -192,7 +199,8 @@ class InternalKeyComparator
public:
explicit InternalKeyComparator(const Comparator* c)
: user_comparator_(c),
: Comparator(c->timestamp_size()),
user_comparator_(c),
name_("rocksdb.InternalKeyComparator:" +
std::string(user_comparator_.Name())) {}
virtual ~InternalKeyComparator() {}
@ -439,24 +447,31 @@ class IterKey {
void SetInternalKey(const Slice& key_prefix, const Slice& user_key,
SequenceNumber s,
ValueType value_type = kValueTypeForSeek) {
ValueType value_type = kValueTypeForSeek,
const Slice* ts = nullptr) {
size_t psize = key_prefix.size();
size_t usize = user_key.size();
EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t));
size_t ts_sz = (ts != nullptr ? ts->size() : 0);
EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t) + ts_sz);
if (psize > 0) {
memcpy(buf_, key_prefix.data(), psize);
}
memcpy(buf_ + psize, user_key.data(), usize);
EncodeFixed64(buf_ + usize + psize, PackSequenceAndType(s, value_type));
if (ts) {
memcpy(buf_ + psize + usize, ts->data(), ts_sz);
}
EncodeFixed64(buf_ + usize + psize + ts_sz,
PackSequenceAndType(s, value_type));
key_ = buf_;
key_size_ = psize + usize + sizeof(uint64_t);
key_size_ = psize + usize + sizeof(uint64_t) + ts_sz;
is_user_key_ = false;
}
void SetInternalKey(const Slice& user_key, SequenceNumber s,
ValueType value_type = kValueTypeForSeek) {
SetInternalKey(Slice(), user_key, s, value_type);
ValueType value_type = kValueTypeForSeek,
const Slice* ts = nullptr) {
SetInternalKey(Slice(), user_key, s, value_type, ts);
}
void Reserve(size_t size) {

@ -955,8 +955,8 @@ class LevelIterator final : public InternalIterator {
bool KeyReachedUpperBound(const Slice& internal_key) {
return read_options_.iterate_upper_bound != nullptr &&
user_comparator_.CompareWithoutTimestamp(
ExtractUserKey(internal_key),
*read_options_.iterate_upper_bound) >= 0;
ExtractUserKey(internal_key), /*a_has_ts=*/true,
*read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0;
}
InternalIterator* NewFileIterator() {

@ -42,6 +42,9 @@ class Comparator {
// < 0 iff "a" < "b",
// == 0 iff "a" == "b",
// > 0 iff "a" > "b"
// Note that Compare(a, b) also compares timestamp if timestamp size is
// non-zero. For the same user key with different timestamps, larger (newer)
// timestamp comes first.
virtual int Compare(const Slice& a, const Slice& b) const = 0;
// Compares two slices for equality. The following invariant should always
@ -97,15 +100,27 @@ class Comparator {
inline size_t timestamp_size() const { return timestamp_size_; }
virtual int CompareWithoutTimestamp(const Slice& a, const Slice& b) const {
return Compare(a, b);
int CompareWithoutTimestamp(const Slice& a, const Slice& b) const {
return CompareWithoutTimestamp(a, /*a_has_ts=*/true, b, /*b_has_ts=*/true);
}
// For two events e1 and e2 whose timestamps are t1 and t2 respectively,
// Returns value:
// < 0 iff t1 < t2
// == 0 iff t1 == t2
// > 0 iff t1 > t2
// Note that an all-zero byte array will be the smallest (oldest) timestamp
// of the same length.
virtual int CompareTimestamp(const Slice& /*ts1*/,
const Slice& /*ts2*/) const {
return 0;
}
virtual int CompareWithoutTimestamp(const Slice& a, bool /*a_has_ts*/,
const Slice& b, bool /*b_has_ts*/) const {
return Compare(a, b);
}
private:
size_t timestamp_size_;
};

@ -45,6 +45,7 @@ class Iterator : public Cleanable {
// Position at the last key in the source. The iterator is
// Valid() after this call iff the source is not empty.
// Currently incompatible with user timestamp.
virtual void SeekToLast() = 0;
// Position at the first key in the source that at or past target.
@ -53,11 +54,13 @@ class Iterator : public Cleanable {
// All Seek*() methods clear any error status() that the iterator had prior to
// the call; after the seek, status() indicates only the error (if any) that
// happened during the seek, not any past errors.
// Target does not contain timestamp.
virtual void Seek(const Slice& target) = 0;
// Position at the last key in the source that at or before target.
// The iterator is Valid() after this call iff the source contains
// an entry that comes at or before target.
// Currently incompatible with user timestamp.
virtual void SeekForPrev(const Slice& target) = 0;
// Moves to the next entry in the source. After this call, Valid() is
@ -67,6 +70,7 @@ class Iterator : public Cleanable {
// Moves to the previous entry in the source. After this call, Valid() is
// true iff the iterator was not positioned at the first entry in source.
// Currently incompatible with user timestamp.
// REQUIRES: Valid()
virtual void Prev() = 0;
@ -108,6 +112,11 @@ class Iterator : public Cleanable {
// Get the user-key portion of the internal key at which the iteration
// stopped.
virtual Status GetProperty(std::string prop_name, std::string* prop);
virtual Slice timestamp() const {
assert(false);
return Slice();
}
};
// Return an empty iterator (yields nothing).

@ -3124,8 +3124,9 @@ void BlockBasedTableIterator<TBlockIter, TValue>::FindBlockForward() {
read_options_.iterate_upper_bound != nullptr &&
block_iter_points_to_real_block_ && !data_block_within_upper_bound_;
assert(!next_block_is_out_of_bound ||
user_comparator_.Compare(*read_options_.iterate_upper_bound,
index_iter_->user_key()) <= 0);
user_comparator_.CompareWithoutTimestamp(
*read_options_.iterate_upper_bound, /*a_has_ts=*/false,
index_iter_->user_key(), /*b_has_ts=*/true) <= 0);
ResetDataIter();
index_iter_->Next();
if (next_block_is_out_of_bound) {
@ -3184,8 +3185,10 @@ void BlockBasedTableIterator<TBlockIter, TValue>::FindKeyBackward() {
template <class TBlockIter, typename TValue>
void BlockBasedTableIterator<TBlockIter, TValue>::CheckOutOfBound() {
if (read_options_.iterate_upper_bound != nullptr && Valid()) {
is_out_of_bound_ = user_comparator_.Compare(
*read_options_.iterate_upper_bound, user_key()) <= 0;
is_out_of_bound_ =
user_comparator_.CompareWithoutTimestamp(
*read_options_.iterate_upper_bound, /*a_has_ts=*/false, user_key(),
/*b_has_ts=*/true) <= 0;
}
}
@ -3195,8 +3198,10 @@ void BlockBasedTableIterator<TBlockIter,
if (read_options_.iterate_upper_bound != nullptr &&
block_iter_points_to_real_block_) {
data_block_within_upper_bound_ =
(user_comparator_.Compare(*read_options_.iterate_upper_bound,
index_iter_->user_key()) > 0);
(user_comparator_.CompareWithoutTimestamp(
*read_options_.iterate_upper_bound, /*a_has_ts=*/false,
index_iter_->user_key(),
/*b_has_ts=*/true) > 0);
}
}

@ -52,6 +52,7 @@ class InternalIteratorBase : public Cleanable {
// All Seek*() methods clear any error status() that the iterator had prior to
// the call; after the seek, status() indicates only the error (if any) that
// happened during the seek, not any past errors.
// 'target' contains user timestamp if timestamp is enabled.
virtual void Seek(const Slice& target) = 0;
// Position at the first key in the source that at or before target

@ -1749,7 +1749,7 @@ void DBDumperCommand::DoDumpCommand() {
break;
if (is_db_ttl_) {
TtlIterator* it_ttl = static_cast_with_check<TtlIterator, Iterator>(iter);
rawtime = it_ttl->timestamp();
rawtime = it_ttl->ttl_timestamp();
if (rawtime < ttl_start || rawtime >= ttl_end) {
continue;
}
@ -2578,7 +2578,7 @@ void ScanCommand::DoCommand() {
it->Next()) {
if (is_db_ttl_) {
TtlIterator* it_ttl = static_cast_with_check<TtlIterator, Iterator>(it);
int rawtime = it_ttl->timestamp();
int rawtime = it_ttl->ttl_timestamp();
if (rawtime < ttl_start || rawtime >= ttl_end) {
continue;
}

@ -125,7 +125,9 @@ class BytewiseComparatorImpl : public Comparator {
return false;
}
int CompareWithoutTimestamp(const Slice& a, const Slice& b) const override {
using Comparator::CompareWithoutTimestamp;
int CompareWithoutTimestamp(const Slice& a, bool /*a_has_ts*/, const Slice& b,
bool /*b_has_ts*/) const override {
return a.compare(b);
}
};
@ -197,7 +199,9 @@ class ReverseBytewiseComparatorImpl : public BytewiseComparatorImpl {
return false;
}
int CompareWithoutTimestamp(const Slice& a, const Slice& b) const override {
using Comparator::CompareWithoutTimestamp;
int CompareWithoutTimestamp(const Slice& a, bool /*a_has_ts*/, const Slice& b,
bool /*b_has_ts*/) const override {
return -a.compare(b);
}
};

@ -18,8 +18,8 @@ namespace ROCKSDB_NAMESPACE {
class UserComparatorWrapper final : public Comparator {
public:
explicit UserComparatorWrapper(const Comparator* const user_cmp)
: user_comparator_(user_cmp) {}
: Comparator(user_cmp->timestamp_size()), user_comparator_(user_cmp) {}
~UserComparatorWrapper() = default;
const Comparator* user_comparator() const { return user_comparator_; }
@ -58,6 +58,17 @@ class UserComparatorWrapper final : public Comparator {
return user_comparator_->CanKeysWithDifferentByteContentsBeEqual();
}
int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
return user_comparator_->CompareTimestamp(ts1, ts2);
}
using Comparator::CompareWithoutTimestamp;
int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
bool b_has_ts) const override {
PERF_COUNTER_ADD(user_key_comparison_count, 1);
return user_comparator_->CompareWithoutTimestamp(a, a_has_ts, b, b_has_ts);
}
private:
const Comparator* user_comparator_;
};

@ -129,7 +129,7 @@ class TtlIterator : public Iterator {
Slice key() const override { return iter_->key(); }
int32_t timestamp() const {
int32_t ttl_timestamp() const {
return DecodeFixed32(iter_->value().data() + iter_->value().size() -
DBWithTTLImpl::kTSLength);
}

Loading…
Cancel
Save