Add timestamp support to secondary instance (#10061)

Summary:
This PR adds timestamp support to the secondary DB instance.

With this, these timestamp related APIs are supported:

ReadOptions.timestamp : read should return the latest data visible to this specified timestamp
Iterator::timestamp() : returns the timestamp associated with the key, value
DB:Get(..., std::string* timestamp) : returns the timestamp associated with the key, value in timestamp

Test plan (on devserver):
```
$COMPILE_WITH_ASAN=1 make -j24 all
$./db_secondary_test --gtest_filter=DBSecondaryTestWithTimestamp*
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10061

Reviewed By: riversand963

Differential Revision: D36722915

Pulled By: jowlyzhang

fbshipit-source-id: 644ada39e4e51164a759593478c38285e0c1a666
main
Yu Zhang 2 years ago committed by Facebook GitHub Bot
parent f6e45382e9
commit 8c4ea7b851
  1. 88
      db/db_impl/db_impl_secondary.cc
  2. 7
      db/db_impl/db_impl_secondary.h
  3. 424
      db/db_secondary_test.cc

@ -329,23 +329,37 @@ Status DBImplSecondary::RecoverLogFiles(
Status DBImplSecondary::Get(const ReadOptions& read_options, Status DBImplSecondary::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) { PinnableSlice* value) {
return GetImpl(read_options, column_family, key, value); return GetImpl(read_options, column_family, key, value,
/*timestamp*/ nullptr);
}
Status DBImplSecondary::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) {
return GetImpl(read_options, column_family, key, value, timestamp);
} }
Status DBImplSecondary::GetImpl(const ReadOptions& read_options, Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val) { const Slice& key, PinnableSlice* pinnable_val,
std::string* timestamp) {
assert(pinnable_val != nullptr); assert(pinnable_val != nullptr);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock); PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
StopWatch sw(immutable_db_options_.clock, stats_, DB_GET); StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time); PERF_TIMER_GUARD(get_snapshot_time);
assert(column_family); assert(column_family);
const Comparator* ucmp = column_family->GetComparator(); if (read_options.timestamp) {
assert(ucmp); const Status s =
if (ucmp->timestamp_size() || read_options.timestamp) { FailIfTsSizesMismatch(column_family, *(read_options.timestamp));
// TODO: support timestamp if (!s.ok()) {
return Status::NotSupported(); return s;
}
} else {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return s;
}
} }
auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
@ -359,23 +373,27 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
// Acquire SuperVersion // Acquire SuperVersion
SuperVersion* super_version = GetAndRefSuperVersion(cfd); SuperVersion* super_version = GetAndRefSuperVersion(cfd);
SequenceNumber snapshot = versions_->LastSequence(); SequenceNumber snapshot = versions_->LastSequence();
GetWithTimestampReadCallback read_cb(snapshot);
MergeContext merge_context; MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq = 0; SequenceNumber max_covering_tombstone_seq = 0;
Status s; Status s;
LookupKey lkey(key, snapshot); LookupKey lkey(key, snapshot, read_options.timestamp);
PERF_TIMER_STOP(get_snapshot_time); PERF_TIMER_STOP(get_snapshot_time);
bool done = false; bool done = false;
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), const Comparator* ucmp = column_family->GetComparator();
/*timestamp=*/nullptr, &s, &merge_context, assert(ucmp);
&max_covering_tombstone_seq, read_options)) { std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr;
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), ts, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, &read_cb)) {
done = true; done = true;
pinnable_val->PinSelf(); pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) && } else if ((s.ok() || s.IsMergeInProgress()) &&
super_version->imm->Get( super_version->imm->Get(
lkey, pinnable_val->GetSelf(), /*timestamp=*/nullptr, &s, lkey, pinnable_val->GetSelf(), ts, &s, &merge_context,
&merge_context, &max_covering_tombstone_seq, read_options)) { &max_covering_tombstone_seq, read_options, &read_cb)) {
done = true; done = true;
pinnable_val->PinSelf(); pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
@ -387,9 +405,11 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
if (!done) { if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time); PERF_TIMER_GUARD(get_from_output_files_time);
PinnedIteratorsManager pinned_iters_mgr; PinnedIteratorsManager pinned_iters_mgr;
super_version->current->Get(read_options, lkey, pinnable_val, super_version->current->Get(
/*timestamp=*/nullptr, &s, &merge_context, read_options, lkey, pinnable_val, ts, &s, &merge_context,
&max_covering_tombstone_seq, &pinned_iters_mgr); &max_covering_tombstone_seq, &pinned_iters_mgr, /*value_found*/ nullptr,
/*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb, /*is_blob*/ nullptr,
/*do_merge*/ true);
RecordTick(stats_, MEMTABLE_MISS); RecordTick(stats_, MEMTABLE_MISS);
} }
{ {
@ -416,11 +436,17 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
} }
assert(column_family); assert(column_family);
const Comparator* ucmp = column_family->GetComparator(); if (read_options.timestamp) {
assert(ucmp); const Status s =
if (ucmp->timestamp_size() || read_options.timestamp) { FailIfTsSizesMismatch(column_family, *(read_options.timestamp));
// TODO: support timestamp if (!s.ok()) {
return NewErrorIterator(Status::NotSupported()); return NewErrorIterator(s);
}
} else {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return NewErrorIterator(s);
}
} }
Iterator* result = nullptr; Iterator* result = nullptr;
@ -479,17 +505,21 @@ Status DBImplSecondary::NewIterators(
if (iterators == nullptr) { if (iterators == nullptr) {
return Status::InvalidArgument("iterators not allowed to be nullptr"); return Status::InvalidArgument("iterators not allowed to be nullptr");
} }
if (read_options.timestamp) { if (read_options.timestamp) {
// TODO: support timestamp for (auto* cf : column_families) {
return Status::NotSupported(); assert(cf);
const Status s = FailIfTsSizesMismatch(cf, *(read_options.timestamp));
if (!s.ok()) {
return s;
}
}
} else { } else {
for (auto* cf : column_families) { for (auto* cf : column_families) {
assert(cf); assert(cf);
const Comparator* ucmp = cf->GetComparator(); const Status s = FailIfCfHasTs(cf);
assert(ucmp); if (!s.ok()) {
if (ucmp->timestamp_size()) { return s;
// TODO: support timestamp
return Status::NotSupported();
} }
} }
} }
@ -502,7 +532,7 @@ Status DBImplSecondary::NewIterators(
// TODO (yanqin) support snapshot. // TODO (yanqin) support snapshot.
return Status::NotSupported("snapshot not supported in secondary mode"); return Status::NotSupported("snapshot not supported in secondary mode");
} else { } else {
SequenceNumber read_seq = versions_->LastSequence(); SequenceNumber read_seq(kMaxSequenceNumber);
for (auto cfh : column_families) { for (auto cfh : column_families) {
ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd(); ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
iterators->push_back( iterators->push_back(

@ -89,8 +89,13 @@ class DBImplSecondary : public DBImpl {
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) override; const Slice& key, PinnableSlice* value) override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
std::string* timestamp) override;
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value); const Slice& key, PinnableSlice* value,
std::string* timestamp);
using DBImpl::NewIterator; using DBImpl::NewIterator;
Iterator* NewIterator(const ReadOptions&, Iterator* NewIterator(const ReadOptions&,

@ -9,6 +9,7 @@
#include "db/db_impl/db_impl_secondary.h" #include "db/db_impl/db_impl_secondary.h"
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "db/db_with_timestamp_test_util.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
@ -18,10 +19,10 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
class DBSecondaryTest : public DBTestBase { class DBSecondaryTestBase : public DBBasicTestWithTimestampBase {
public: public:
DBSecondaryTest() explicit DBSecondaryTestBase(const std::string& dbname)
: DBTestBase("db_secondary_test", /*env_do_fsync=*/true), : DBBasicTestWithTimestampBase(dbname),
secondary_path_(), secondary_path_(),
handles_secondary_(), handles_secondary_(),
db_secondary_(nullptr) { db_secondary_(nullptr) {
@ -29,7 +30,7 @@ class DBSecondaryTest : public DBTestBase {
test::PerThreadDBPath(env_, "/db_secondary_test_secondary"); test::PerThreadDBPath(env_, "/db_secondary_test_secondary");
} }
~DBSecondaryTest() override { ~DBSecondaryTestBase() override {
CloseSecondary(); CloseSecondary();
if (getenv("KEEP_DB") != nullptr) { if (getenv("KEEP_DB") != nullptr) {
fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str()); fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str());
@ -73,17 +74,17 @@ class DBSecondaryTest : public DBTestBase {
DB* db_secondary_; DB* db_secondary_;
}; };
void DBSecondaryTest::OpenSecondary(const Options& options) { void DBSecondaryTestBase::OpenSecondary(const Options& options) {
ASSERT_OK(TryOpenSecondary(options)); ASSERT_OK(TryOpenSecondary(options));
} }
Status DBSecondaryTest::TryOpenSecondary(const Options& options) { Status DBSecondaryTestBase::TryOpenSecondary(const Options& options) {
Status s = Status s =
DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_); DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
return s; return s;
} }
void DBSecondaryTest::OpenSecondaryWithColumnFamilies( void DBSecondaryTestBase::OpenSecondaryWithColumnFamilies(
const std::vector<std::string>& column_families, const Options& options) { const std::vector<std::string>& column_families, const Options& options) {
std::vector<ColumnFamilyDescriptor> cf_descs; std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options); cf_descs.emplace_back(kDefaultColumnFamilyName, options);
@ -95,9 +96,10 @@ void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
ASSERT_OK(s); ASSERT_OK(s);
} }
void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir, void DBSecondaryTestBase::CheckFileTypeCounts(const std::string& dir,
int expected_log, int expected_sst, int expected_log,
int expected_manifest) const { int expected_sst,
int expected_manifest) const {
std::vector<std::string> filenames; std::vector<std::string> filenames;
ASSERT_OK(env_->GetChildren(dir, &filenames)); ASSERT_OK(env_->GetChildren(dir, &filenames));
@ -116,6 +118,11 @@ void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir,
ASSERT_EQ(expected_manifest, manifest_cnt); ASSERT_EQ(expected_manifest, manifest_cnt);
} }
class DBSecondaryTest : public DBSecondaryTestBase {
public:
explicit DBSecondaryTest() : DBSecondaryTestBase("db_secondary_test") {}
};
TEST_F(DBSecondaryTest, NonExistingDb) { TEST_F(DBSecondaryTest, NonExistingDb) {
Destroy(last_options_); Destroy(last_options_);
@ -1254,6 +1261,403 @@ TEST_F(DBSecondaryTest, OpenWithTransactionDB) {
ASSERT_OK(TryOpenSecondary(options)); ASSERT_OK(TryOpenSecondary(options));
} }
class DBSecondaryTestWithTimestamp : public DBSecondaryTestBase {
public:
explicit DBSecondaryTestWithTimestamp()
: DBSecondaryTestBase("db_secondary_test_with_timestamp") {}
};
TEST_F(DBSecondaryTestWithTimestamp, IteratorAndGetReadTimestampSizeMismatch) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
const std::string write_timestamp = Timestamp(1, 0);
WriteOptions write_opts;
for (uint64_t key = 0; key <= kMaxKey; ++key) {
Status s = db_->Put(write_opts, Key1(key), write_timestamp,
"value" + std::to_string(key));
ASSERT_OK(s);
}
// Reopen the database as secondary instance to test its timestamp support.
Close();
options.max_open_files = -1;
ASSERT_OK(ReopenAsSecondary(options));
ReadOptions read_opts;
std::string different_size_read_timestamp;
PutFixed32(&different_size_read_timestamp, 2);
Slice different_size_read_ts = different_size_read_timestamp;
read_opts.timestamp = &different_size_read_ts;
{
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsInvalidArgument());
}
for (uint64_t key = 0; key <= kMaxKey; ++key) {
std::string value_from_get;
std::string timestamp;
ASSERT_TRUE(db_->Get(read_opts, Key1(key), &value_from_get, &timestamp)
.IsInvalidArgument());
}
Close();
}
TEST_F(DBSecondaryTestWithTimestamp,
IteratorAndGetReadTimestampSpecifiedWithoutWriteTimestamp) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
WriteOptions write_opts;
for (uint64_t key = 0; key <= kMaxKey; ++key) {
Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(key));
ASSERT_OK(s);
}
// Reopen the database as secondary instance to test its timestamp support.
Close();
options.max_open_files = -1;
ASSERT_OK(ReopenAsSecondary(options));
ReadOptions read_opts;
const std::string read_timestamp = Timestamp(2, 0);
Slice read_ts = read_timestamp;
read_opts.timestamp = &read_ts;
{
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsInvalidArgument());
}
for (uint64_t key = 0; key <= kMaxKey; ++key) {
std::string value_from_get;
std::string timestamp;
ASSERT_TRUE(db_->Get(read_opts, Key1(key), &value_from_get, &timestamp)
.IsInvalidArgument());
}
Close();
}
TEST_F(DBSecondaryTestWithTimestamp,
IteratorAndGetWriteWithTimestampReadWithoutTimestamp) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
const std::string write_timestamp = Timestamp(1, 0);
WriteOptions write_opts;
for (uint64_t key = 0; key <= kMaxKey; ++key) {
Status s = db_->Put(write_opts, Key1(key), write_timestamp,
"value" + std::to_string(key));
ASSERT_OK(s);
}
// Reopen the database as secondary instance to test its timestamp support.
Close();
options.max_open_files = -1;
ASSERT_OK(ReopenAsSecondary(options));
ReadOptions read_opts;
{
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsInvalidArgument());
}
for (uint64_t key = 0; key <= kMaxKey; ++key) {
std::string value_from_get;
ASSERT_TRUE(
db_->Get(read_opts, Key1(key), &value_from_get).IsInvalidArgument());
}
Close();
}
TEST_F(DBSecondaryTestWithTimestamp, IteratorAndGet) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
const std::vector<uint64_t> start_keys = {1, 0};
const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
Timestamp(3, 0)};
const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
Timestamp(4, 0)};
for (size_t i = 0; i < write_timestamps.size(); ++i) {
WriteOptions write_opts;
for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
Status s = db_->Put(write_opts, Key1(key), write_timestamps[i],
"value" + std::to_string(i));
ASSERT_OK(s);
}
}
// Reopen the database as secondary instance to test its timestamp support.
Close();
options.max_open_files = -1;
ASSERT_OK(ReopenAsSecondary(options));
auto get_value_and_check = [](DB* db, ReadOptions read_opts, Slice key,
Slice expected_value, std::string expected_ts) {
std::string value_from_get;
std::string timestamp;
ASSERT_OK(db->Get(read_opts, key.ToString(), &value_from_get, &timestamp));
ASSERT_EQ(expected_value, value_from_get);
ASSERT_EQ(expected_ts, timestamp);
};
for (size_t i = 0; i < read_timestamps.size(); ++i) {
ReadOptions read_opts;
Slice read_ts = read_timestamps[i];
read_opts.timestamp = &read_ts;
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
int count = 0;
uint64_t key = 0;
// Forward iterate.
for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid();
it->Next(), ++count, ++key) {
CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
"value" + std::to_string(i), write_timestamps[i]);
get_value_and_check(db_, read_opts, it->key(), it->value(),
write_timestamps[i]);
}
size_t expected_count = kMaxKey - start_keys[i] + 1;
ASSERT_EQ(expected_count, count);
// Backward iterate.
count = 0;
for (it->SeekForPrev(Key1(kMaxKey)), key = kMaxKey; it->Valid();
it->Prev(), ++count, --key) {
CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
"value" + std::to_string(i), write_timestamps[i]);
get_value_and_check(db_, read_opts, it->key(), it->value(),
write_timestamps[i]);
}
ASSERT_EQ(static_cast<size_t>(kMaxKey) - start_keys[i] + 1, count);
// SeekToFirst()/SeekToLast() with lower/upper bounds.
// Then iter with lower and upper bounds.
uint64_t l = 0;
uint64_t r = kMaxKey + 1;
while (l < r) {
std::string lb_str = Key1(l);
Slice lb = lb_str;
std::string ub_str = Key1(r);
Slice ub = ub_str;
read_opts.iterate_lower_bound = &lb;
read_opts.iterate_upper_bound = &ub;
it.reset(db_->NewIterator(read_opts));
for (it->SeekToFirst(), key = std::max(l, start_keys[i]), count = 0;
it->Valid(); it->Next(), ++key, ++count) {
CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
"value" + std::to_string(i), write_timestamps[i]);
get_value_and_check(db_, read_opts, it->key(), it->value(),
write_timestamps[i]);
}
ASSERT_EQ(r - std::max(l, start_keys[i]), count);
for (it->SeekToLast(), key = std::min(r, kMaxKey + 1), count = 0;
it->Valid(); it->Prev(), --key, ++count) {
CheckIterUserEntry(it.get(), Key1(key - 1), kTypeValue,
"value" + std::to_string(i), write_timestamps[i]);
get_value_and_check(db_, read_opts, it->key(), it->value(),
write_timestamps[i]);
}
l += (kMaxKey / 100);
r -= (kMaxKey / 100);
}
}
Close();
}
TEST_F(DBSecondaryTestWithTimestamp, IteratorsReadTimestampSizeMismatch) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
const std::string write_timestamp = Timestamp(1, 0);
WriteOptions write_opts;
for (uint64_t key = 0; key <= kMaxKey; ++key) {
Status s = db_->Put(write_opts, Key1(key), write_timestamp,
"value" + std::to_string(key));
ASSERT_OK(s);
}
// Reopen the database as secondary instance to test its timestamp support.
Close();
options.max_open_files = -1;
ASSERT_OK(ReopenAsSecondary(options));
ReadOptions read_opts;
std::string different_size_read_timestamp;
PutFixed32(&different_size_read_timestamp, 2);
Slice different_size_read_ts = different_size_read_timestamp;
read_opts.timestamp = &different_size_read_ts;
{
std::vector<Iterator*> iters;
ASSERT_TRUE(
db_->NewIterators(read_opts, {db_->DefaultColumnFamily()}, &iters)
.IsInvalidArgument());
}
Close();
}
TEST_F(DBSecondaryTestWithTimestamp,
IteratorsReadTimestampSpecifiedWithoutWriteTimestamp) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
WriteOptions write_opts;
for (uint64_t key = 0; key <= kMaxKey; ++key) {
Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(key));
ASSERT_OK(s);
}
// Reopen the database as secondary instance to test its timestamp support.
Close();
options.max_open_files = -1;
ASSERT_OK(ReopenAsSecondary(options));
ReadOptions read_opts;
const std::string read_timestamp = Timestamp(2, 0);
Slice read_ts = read_timestamp;
read_opts.timestamp = &read_ts;
{
std::vector<Iterator*> iters;
ASSERT_TRUE(
db_->NewIterators(read_opts, {db_->DefaultColumnFamily()}, &iters)
.IsInvalidArgument());
}
Close();
}
TEST_F(DBSecondaryTestWithTimestamp,
IteratorsWriteWithTimestampReadWithoutTimestamp) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
const std::string write_timestamp = Timestamp(1, 0);
WriteOptions write_opts;
for (uint64_t key = 0; key <= kMaxKey; ++key) {
Status s = db_->Put(write_opts, Key1(key), write_timestamp,
"value" + std::to_string(key));
ASSERT_OK(s);
}
// Reopen the database as secondary instance to test its timestamp support.
Close();
options.max_open_files = -1;
ASSERT_OK(ReopenAsSecondary(options));
ReadOptions read_opts;
{
std::vector<Iterator*> iters;
ASSERT_TRUE(
db_->NewIterators(read_opts, {db_->DefaultColumnFamily()}, &iters)
.IsInvalidArgument());
}
Close();
}
TEST_F(DBSecondaryTestWithTimestamp, Iterators) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
const std::string write_timestamp = Timestamp(1, 0);
const std::string read_timestamp = Timestamp(2, 0);
WriteOptions write_opts;
for (uint64_t key = 0; key <= kMaxKey; ++key) {
Status s = db_->Put(write_opts, Key1(key), write_timestamp,
"value" + std::to_string(key));
ASSERT_OK(s);
}
// Reopen the database as secondary instance to test its timestamp support.
Close();
options.max_open_files = -1;
ASSERT_OK(ReopenAsSecondary(options));
ReadOptions read_opts;
Slice read_ts = read_timestamp;
read_opts.timestamp = &read_ts;
std::vector<Iterator*> iters;
ASSERT_OK(db_->NewIterators(read_opts, {db_->DefaultColumnFamily()}, &iters));
ASSERT_EQ(static_cast<uint64_t>(1), iters.size());
int count = 0;
uint64_t key = 0;
// Forward iterate.
for (iters[0]->Seek(Key1(0)), key = 0; iters[0]->Valid();
iters[0]->Next(), ++count, ++key) {
CheckIterUserEntry(iters[0], Key1(key), kTypeValue,
"value" + std::to_string(key), write_timestamp);
}
size_t expected_count = kMaxKey - 0 + 1;
ASSERT_EQ(expected_count, count);
delete iters[0];
Close();
}
#endif //! ROCKSDB_LITE #endif //! ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save