Add timestamp support to CompactedDBImpl (#10030)

Summary:
This PR is the second and last part for adding user defined timestamp support to read only DB. Specifically, the change in this PR includes:

- `options.timestamp` respected by `CompactedDBImpl::Get` and `CompactedDBImpl::MultiGet` to return results visible up till that timestamp.
- `CompactedDBImpl::Get(..., std::string* timestamp)` and `CompactedDBImpl::MultiGet(..., std::vector<std::string>* timestamps)` return the timestamp(s) associated with the key(s).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10030

Test Plan:
```
$COMPILE_WITH_ASAN=1 make -j24 all
$./db_readonly_with_timestamp_test --gtest_filter="DBReadOnlyTestWithTimestamp.CompactedDB*"
$./db_basic_test --gtest_filter="DBBasicTest.CompactedDB*"
$make all check
```

Reviewed By: riversand963

Differential Revision: D36613926

Pulled By: jowlyzhang

fbshipit-source-id: 5b7ed7fef822708c12e2caf7a8d2deb6a696f0f0
main
Yu Zhang 3 years ago committed by Facebook GitHub Bot
parent 8515bd50c9
commit d4081bf0be
  1. 1
      HISTORY.md
  2. 102
      db/db_impl/compacted_db_impl.cc
  3. 19
      db/db_impl/compacted_db_impl.h
  4. 629
      db/db_readonly_with_timestamp_test.cc

@ -33,6 +33,7 @@
* Add an option, `CompressionOptions::use_zstd_dict_trainer`, to indicate whether zstd dictionary trainer should be used for generating zstd compression dictionaries. The default value of this option is true for backward compatibility. When this option is set to false, zstd API `ZDICT_finalizeDictionary` is used to generate compression dictionaries. * Add an option, `CompressionOptions::use_zstd_dict_trainer`, to indicate whether zstd dictionary trainer should be used for generating zstd compression dictionaries. The default value of this option is true for backward compatibility. When this option is set to false, zstd API `ZDICT_finalizeDictionary` is used to generate compression dictionaries.
* Seek API which positions itself every LevelIterator on the correct data block in the correct SST file which can be parallelized if ReadOptions.async_io option is enabled. * Seek API which positions itself every LevelIterator on the correct data block in the correct SST file which can be parallelized if ReadOptions.async_io option is enabled.
* Add new stat number_async_seek in PerfContext that indicates number of async calls made by seek to prefetch data. * Add new stat number_async_seek in PerfContext that indicates number of async calls made by seek to prefetch data.
* Add support for user-defined timestamps to read only DB.
### Bug Fixes ### Bug Fixes
* RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requests completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue. * RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requests completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue.

@ -40,17 +40,45 @@ size_t CompactedDBImpl::FindFile(const Slice& key) {
Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*, Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
const Slice& key, PinnableSlice* value) { const Slice& key, PinnableSlice* value) {
return Get(options, /*column_family*/ nullptr, key, value,
/*timestamp*/ nullptr);
}
Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
const Slice& key, PinnableSlice* value,
std::string* timestamp) {
assert(user_comparator_); assert(user_comparator_);
if (options.timestamp || user_comparator_->timestamp_size()) { if (options.timestamp) {
// TODO: support timestamp const Status s =
return Status::NotSupported(); FailIfTsSizesMismatch(DefaultColumnFamily(), *(options.timestamp));
if (!s.ok()) {
return s;
}
} else {
const Status s = FailIfCfHasTs(DefaultColumnFamily());
if (!s.ok()) {
return s;
} }
}
GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
std::string* ts =
user_comparator_->timestamp_size() > 0 ? timestamp : nullptr;
LookupKey lkey(key, kMaxSequenceNumber, options.timestamp);
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, key, value, nullptr, nullptr, GetContext::kNotFound, lkey.user_key(), value, ts,
nullptr, true, nullptr, nullptr); nullptr, nullptr, true, nullptr, nullptr, nullptr,
LookupKey lkey(key, kMaxSequenceNumber); nullptr, &read_cb);
Status s = files_.files[FindFile(key)].fd.table_reader->Get(
options, lkey.internal_key(), &get_context, nullptr); const FdWithKeyRange& f = files_.files[FindFile(lkey.user_key())];
if (user_comparator_->CompareWithoutTimestamp(
key, /*a_has_ts=*/false,
ExtractUserKeyAndStripTimestamp(f.smallest_key,
user_comparator_->timestamp_size()),
/*b_has_ts=*/false) < 0) {
return Status::NotFound();
}
Status s = f.fd.table_reader->Get(options, lkey.internal_key(), &get_context,
nullptr);
if (!s.ok() && !s.IsNotFound()) { if (!s.ok() && !s.IsNotFound()) {
return s; return s;
} }
@ -60,37 +88,65 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
return Status::NotFound(); return Status::NotFound();
} }
std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options, std::vector<Status> CompactedDBImpl::MultiGet(
const std::vector<ColumnFamilyHandle*>&, const ReadOptions& options, const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys, std::vector<std::string>* values) { const std::vector<Slice>& keys, std::vector<std::string>* values) {
return MultiGet(options, keys, values, /*timestamps*/ nullptr);
}
std::vector<Status> CompactedDBImpl::MultiGet(
const ReadOptions& options, const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys, std::vector<std::string>* values,
std::vector<std::string>* timestamps) {
assert(user_comparator_); assert(user_comparator_);
if (user_comparator_->timestamp_size() || options.timestamp) { size_t num_keys = keys.size();
// TODO: support timestamp
return std::vector<Status>(keys.size(), Status::NotSupported()); if (options.timestamp) {
Status s =
FailIfTsSizesMismatch(DefaultColumnFamily(), *(options.timestamp));
if (!s.ok()) {
return std::vector<Status>(num_keys, s);
}
} else {
Status s = FailIfCfHasTs(DefaultColumnFamily());
if (!s.ok()) {
return std::vector<Status>(num_keys, s);
}
} }
GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
autovector<TableReader*, 16> reader_list; autovector<TableReader*, 16> reader_list;
for (const auto& key : keys) { for (const auto& key : keys) {
const FdWithKeyRange& f = files_.files[FindFile(key)]; LookupKey lkey(key, kMaxSequenceNumber, options.timestamp);
if (user_comparator_->Compare(key, ExtractUserKey(f.smallest_key)) < 0) { const FdWithKeyRange& f = files_.files[FindFile(lkey.user_key())];
if (user_comparator_->CompareWithoutTimestamp(
key, /*a_has_ts=*/false,
ExtractUserKeyAndStripTimestamp(f.smallest_key,
user_comparator_->timestamp_size()),
/*b_has_ts=*/false) < 0) {
reader_list.push_back(nullptr); reader_list.push_back(nullptr);
} else { } else {
LookupKey lkey(key, kMaxSequenceNumber);
f.fd.table_reader->Prepare(lkey.internal_key()); f.fd.table_reader->Prepare(lkey.internal_key());
reader_list.push_back(f.fd.table_reader); reader_list.push_back(f.fd.table_reader);
} }
} }
std::vector<Status> statuses(num_keys, Status::NotFound());
std::vector<Status> statuses(keys.size(), Status::NotFound()); values->resize(num_keys);
values->resize(keys.size()); if (timestamps) {
timestamps->resize(num_keys);
}
int idx = 0; int idx = 0;
for (auto* r : reader_list) { for (auto* r : reader_list) {
if (r != nullptr) { if (r != nullptr) {
PinnableSlice pinnable_val; PinnableSlice pinnable_val;
std::string& value = (*values)[idx]; std::string& value = (*values)[idx];
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, LookupKey lkey(keys[idx], kMaxSequenceNumber, options.timestamp);
GetContext::kNotFound, keys[idx], &pinnable_val, std::string* timestamp = timestamps ? &(*timestamps)[idx] : nullptr;
nullptr, nullptr, nullptr, true, nullptr, nullptr); GetContext get_context(
LookupKey lkey(keys[idx], kMaxSequenceNumber); user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound,
lkey.user_key(), &pinnable_val,
user_comparator_->timestamp_size() > 0 ? timestamp : nullptr, nullptr,
nullptr, true, nullptr, nullptr, nullptr, nullptr, &read_cb);
Status s = r->Get(options, lkey.internal_key(), &get_context, nullptr); Status s = r->Get(options, lkey.internal_key(), &get_context, nullptr);
assert(static_cast<size_t>(idx) < statuses.size()); assert(static_cast<size_t>(idx) < statuses.size());
if (!s.ok() && !s.IsNotFound()) { if (!s.ok() && !s.IsNotFound()) {

@ -28,12 +28,25 @@ class CompactedDBImpl : public DBImpl {
virtual Status Get(const ReadOptions& options, virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override; PinnableSlice* value) override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
std::string* timestamp) override;
using DB::MultiGet; using DB::MultiGet;
// Note that CompactedDBImpl::MultiGet is not the optimized version of
// MultiGet to use.
// TODO: optimize CompactedDBImpl::MultiGet, see DBImpl::MultiGet for details.
virtual std::vector<Status> MultiGet( virtual std::vector<Status> MultiGet(
const ReadOptions& options, const ReadOptions& options, const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
std::vector<Status> MultiGet(const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>&, const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys, std::vector<std::string>* values) const std::vector<Slice>& keys,
override; std::vector<std::string>* values,
std::vector<std::string>* timestamps) override;
using DBImpl::Put; using DBImpl::Put;
virtual Status Put(const WriteOptions& /*options*/, virtual Status Put(const WriteOptions& /*options*/,

@ -15,6 +15,55 @@ class DBReadOnlyTestWithTimestamp : public DBBasicTestWithTimestampBase {
public: public:
DBReadOnlyTestWithTimestamp() DBReadOnlyTestWithTimestamp()
: DBBasicTestWithTimestampBase("db_readonly_test_with_timestamp") {} : DBBasicTestWithTimestampBase("db_readonly_test_with_timestamp") {}
protected:
#ifndef ROCKSDB_LITE
void CheckDBOpenedAsCompactedDBWithOneLevel0File() {
VersionSet* const versions = dbfull()->GetVersionSet();
ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
ASSERT_NE(cfd, nullptr);
Version* const current = cfd->current();
ASSERT_NE(current, nullptr);
const VersionStorageInfo* const storage_info = current->storage_info();
ASSERT_NE(storage_info, nullptr);
// Only 1 L0 file.
ASSERT_EQ(1, NumTableFilesAtLevel(0));
// L0 is the max level.
ASSERT_EQ(storage_info->num_non_empty_levels(), 1);
}
void CheckDBOpenedAsCompactedDBWithOnlyHighestNonEmptyLevelFiles() {
VersionSet* const versions = dbfull()->GetVersionSet();
ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
ASSERT_NE(cfd, nullptr);
Version* const current = cfd->current();
ASSERT_NE(current, nullptr);
const VersionStorageInfo* const storage_info = current->storage_info();
ASSERT_NE(storage_info, nullptr);
// L0 has no files.
ASSERT_EQ(0, NumTableFilesAtLevel(0));
// All other levels have no files except the highest level with files.
for (int i = 1; i < storage_info->num_non_empty_levels() - 1; ++i) {
ASSERT_FALSE(storage_info->LevelFilesBrief(i).num_files > 0);
}
// The highest level with files have some files.
int highest_non_empty_level = storage_info->num_non_empty_levels() - 1;
ASSERT_TRUE(
storage_info->LevelFilesBrief(highest_non_empty_level).num_files > 0);
}
#endif // !ROCKSDB_LITE
}; };
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -101,6 +150,46 @@ TEST_F(DBReadOnlyTestWithTimestamp,
Close(); Close();
} }
// Keys are written with a user-defined timestamp, then the DB is reopened
// read-only. Reads that do not set ReadOptions::timestamp must be rejected
// with InvalidArgument, both through NewIterator and through Get.
TEST_F(DBReadOnlyTestWithTimestamp,
       IteratorAndGetWriteWithTimestampReadWithoutTimestamp) {
  const int kNumKeysPerFile = 128;
  const uint64_t kMaxKey = 1024;

  // Set up a DB whose comparator carries a timestamp.
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  // Populate keys [0, kMaxKey], all at the same write timestamp.
  const std::string ts_at_write = Timestamp(1, 0);
  WriteOptions put_opts;
  for (uint64_t k = 0; k <= kMaxKey; ++k) {
    ASSERT_OK(db_->Put(put_opts, Key1(k), ts_at_write,
                       "value" + std::to_string(k)));
  }

  // Reopen the database in read only mode to test its timestamp support.
  Close();
  ASSERT_OK(ReadOnlyReopen(options));

  // No read timestamp is set on purpose.
  ReadOptions opts_without_ts;
  {
    std::unique_ptr<Iterator> it(db_->NewIterator(opts_without_ts));
    ASSERT_FALSE(it->Valid());
    ASSERT_TRUE(it->status().IsInvalidArgument());
  }

  for (uint64_t k = 0; k <= kMaxKey; ++k) {
    std::string fetched;
    const Status get_status = db_->Get(opts_without_ts, Key1(k), &fetched);
    ASSERT_TRUE(get_status.IsInvalidArgument());
  }

  Close();
}
TEST_F(DBReadOnlyTestWithTimestamp, IteratorAndGet) { TEST_F(DBReadOnlyTestWithTimestamp, IteratorAndGet) {
const int kNumKeysPerFile = 128; const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 1024; const uint64_t kMaxKey = 1024;
@ -320,6 +409,546 @@ TEST_F(DBReadOnlyTestWithTimestamp,
Close(); Close();
} }
// Keys are written with a user-defined timestamp, then the DB is reopened
// read-only. NewIterators called without ReadOptions::timestamp must fail
// with InvalidArgument.
TEST_F(DBReadOnlyTestWithTimestamp,
       IteratorsWriteWithTimestampReadWithoutTimestamp) {
  const int kNumKeysPerFile = 128;
  const uint64_t kMaxKey = 1024;

  // Set up a DB whose comparator carries a timestamp.
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  // Populate keys [0, kMaxKey], all at the same write timestamp.
  const std::string ts_at_write = Timestamp(1, 0);
  WriteOptions put_opts;
  for (uint64_t k = 0; k <= kMaxKey; ++k) {
    ASSERT_OK(db_->Put(put_opts, Key1(k), ts_at_write,
                       "value" + std::to_string(k)));
  }

  // Reopen the database in read only mode to test its timestamp support.
  Close();
  ASSERT_OK(ReadOnlyReopen(options));

  // No read timestamp is set on purpose.
  ReadOptions opts_without_ts;
  {
    std::vector<Iterator*> created_iters;
    const Status create_status = db_->NewIterators(
        opts_without_ts, {db_->DefaultColumnFamily()}, &created_iters);
    ASSERT_TRUE(create_status.IsInvalidArgument());
  }

  Close();
}
// Keys are written with a timestamp and flushed into a single L0 file, then
// the DB is reopened read-only with max_open_files = -1. A Get whose read
// timestamp was built with PutFixed32 (a different size than the write
// timestamp) must be rejected with InvalidArgument.
TEST_F(DBReadOnlyTestWithTimestamp, CompactedDBGetReadTimestampSizeMismatch) {
  const int kNumKeysPerFile = 1026;
  const uint64_t kMaxKey = 1024;

  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  // Populate keys [0, kMaxKey] with one shared value and write timestamp.
  std::string ts_at_write = Timestamp(1, 0);
  WriteOptions put_opts;
  for (uint64_t k = 0; k <= kMaxKey; ++k) {
    ASSERT_OK(db_->Put(put_opts, Key1(k), ts_at_write,
                       "value" + std::to_string(0)));
  }
  ASSERT_OK(db_->Flush(FlushOptions()));
  Close();

  // Reopen the database in read only mode as a Compacted DB to test its
  // timestamp support.
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  CheckDBOpenedAsCompactedDBWithOneLevel0File();

  // Build a read timestamp of the wrong size.
  std::string short_ts_buf;
  PutFixed32(&short_ts_buf, 2);
  Slice short_ts = short_ts_buf;
  ReadOptions mismatched_opts;
  mismatched_opts.timestamp = &short_ts;

  for (uint64_t k = 0; k <= kMaxKey; ++k) {
    std::string fetched_value;
    std::string fetched_ts;
    const Status get_status =
        db_->Get(mismatched_opts, Key1(k), &fetched_value, &fetched_ts);
    ASSERT_TRUE(get_status.IsInvalidArgument());
  }

  Close();
}
// Keys are written without any timestamp (no timestamp-aware comparator is
// installed) and flushed into a single L0 file, then the DB is reopened
// read-only with max_open_files = -1. A Get that sets ReadOptions::timestamp
// must be rejected with InvalidArgument.
TEST_F(DBReadOnlyTestWithTimestamp,
       CompactedDBGetReadTimestampSpecifiedWithoutWriteTimestamp) {
  const int kNumKeysPerFile = 1026;
  const uint64_t kMaxKey = 1024;

  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  // Populate keys [0, kMaxKey] with plain (timestamp-less) puts.
  WriteOptions put_opts;
  for (uint64_t k = 0; k <= kMaxKey; ++k) {
    ASSERT_OK(db_->Put(put_opts, Key1(k), "value" + std::to_string(0)));
  }
  ASSERT_OK(db_->Flush(FlushOptions()));
  Close();

  // Reopen the database in read only mode as a Compacted DB to test its
  // timestamp support.
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  CheckDBOpenedAsCompactedDBWithOneLevel0File();

  // Ask for a read timestamp although the column family has none.
  const std::string ts_buf = Timestamp(2, 0);
  Slice requested_ts = ts_buf;
  ReadOptions opts_with_ts;
  opts_with_ts.timestamp = &requested_ts;

  for (uint64_t k = 0; k <= kMaxKey; ++k) {
    std::string fetched_value;
    std::string fetched_ts;
    const Status get_status =
        db_->Get(opts_with_ts, Key1(k), &fetched_value, &fetched_ts);
    ASSERT_TRUE(get_status.IsInvalidArgument());
  }

  Close();
}
// Keys are written with a timestamp and flushed into a single L0 file, then
// the DB is reopened read-only with max_open_files = -1. A Get that leaves
// ReadOptions::timestamp unset must be rejected with InvalidArgument.
TEST_F(DBReadOnlyTestWithTimestamp,
       CompactedDBGetWriteWithTimestampReadWithoutTimestamp) {
  const int kNumKeysPerFile = 1026;
  const uint64_t kMaxKey = 1024;

  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  // Populate keys [0, kMaxKey] with one shared value and write timestamp.
  std::string ts_at_write = Timestamp(1, 0);
  WriteOptions put_opts;
  for (uint64_t k = 0; k <= kMaxKey; ++k) {
    ASSERT_OK(db_->Put(put_opts, Key1(k), ts_at_write,
                       "value" + std::to_string(0)));
  }
  ASSERT_OK(db_->Flush(FlushOptions()));
  Close();

  // Reopen the database in read only mode as a Compacted DB to test its
  // timestamp support.
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  CheckDBOpenedAsCompactedDBWithOneLevel0File();

  // No read timestamp is set on purpose.
  ReadOptions opts_without_ts;
  for (uint64_t k = 0; k <= kMaxKey; ++k) {
    std::string fetched_value;
    const Status get_status =
        db_->Get(opts_without_ts, Key1(k), &fetched_value);
    ASSERT_TRUE(get_status.IsInvalidArgument());
  }

  Close();
}
// Verifies timestamp-aware Get on a read-only DB made of a single L0 file.
//
// Two write passes are issued before the flush:
//   pass 0: keys [1, kMaxKey] at Timestamp(1, 0) with "value0"
//   pass 1: keys [0, kMaxKey] at Timestamp(3, 0) with "value1"
// Reading pass i's key range at read_timestamps[i] (2, then 4) must return
// the value and the write timestamp of pass i.
TEST_F(DBReadOnlyTestWithTimestamp, CompactedDBGetWithOnlyOneL0File) {
  // Presumably sized so that both write passes stay in one memtable and the
  // flush yields a single L0 file; that layout is asserted after the reopen.
  const int kNumKeysPerFile = 1026 * 2;
  const uint64_t kMaxKey = 1024;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  const std::vector<uint64_t> start_keys = {1, 0};
  const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
                                                     Timestamp(3, 0)};
  const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
                                                    Timestamp(4, 0)};
  for (size_t i = 0; i < write_timestamps.size(); ++i) {
    WriteOptions write_opts;
    for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
      ASSERT_OK(db_->Put(write_opts, Key1(key), write_timestamps[i],
                         "value" + std::to_string(i)));
    }
  }
  ASSERT_OK(db_->Flush(FlushOptions()));
  Close();

  // Reopen the database in read only mode as a Compacted DB to test its
  // timestamp support.
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  CheckDBOpenedAsCompactedDBWithOneLevel0File();

  for (size_t i = 0; i < read_timestamps.size(); ++i) {
    ReadOptions read_opts;
    Slice read_ts = read_timestamps[i];
    read_opts.timestamp = &read_ts;
    // Unsigned counter so that the comparison with expected_count below does
    // not mix signed and unsigned operands.
    size_t count = 0;
    for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key, ++count) {
      std::string value_from_get;
      std::string timestamp;
      ASSERT_OK(db_->Get(read_opts, Key1(key), &value_from_get, &timestamp));
      ASSERT_EQ("value" + std::to_string(i), value_from_get);
      ASSERT_EQ(write_timestamps[i], timestamp);
    }
    const size_t expected_count =
        static_cast<size_t>(kMaxKey - start_keys[i] + 1);
    ASSERT_EQ(expected_count, count);
  }
  Close();
}
// Verifies timestamp-aware Get on a read-only DB whose files all live in the
// highest non-empty level (the DB is fully compacted before it is reopened).
//
// Two write passes are issued before the flush and compaction:
//   pass 0: keys [1, kMaxKey] at Timestamp(1, 0) with "value0"
//   pass 1: keys [0, kMaxKey] at Timestamp(3, 0) with "value1"
// Reading pass i's key range at read_timestamps[i] (2, then 4) must return
// the value and the write timestamp of pass i.
TEST_F(DBReadOnlyTestWithTimestamp,
       CompactedDBGetWithOnlyHighestNonEmptyLevelFiles) {
  const int kNumKeysPerFile = 128;
  const uint64_t kMaxKey = 1024;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  const std::vector<uint64_t> start_keys = {1, 0};
  const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
                                                     Timestamp(3, 0)};
  const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
                                                    Timestamp(4, 0)};
  for (size_t i = 0; i < write_timestamps.size(); ++i) {
    WriteOptions write_opts;
    for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
      ASSERT_OK(db_->Put(write_opts, Key1(key), write_timestamps[i],
                         "value" + std::to_string(i)));
    }
  }
  ASSERT_OK(db_->Flush(FlushOptions()));
  // Full-range manual compaction; the resulting file layout is asserted after
  // the reopen below.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  Close();

  // Reopen the database in read only mode as a Compacted DB to test its
  // timestamp support.
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  CheckDBOpenedAsCompactedDBWithOnlyHighestNonEmptyLevelFiles();

  for (size_t i = 0; i < read_timestamps.size(); ++i) {
    ReadOptions read_opts;
    Slice read_ts = read_timestamps[i];
    read_opts.timestamp = &read_ts;
    // Unsigned counter so that the comparison with expected_count below does
    // not mix signed and unsigned operands.
    size_t count = 0;
    for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key, ++count) {
      std::string value_from_get;
      std::string timestamp;
      ASSERT_OK(db_->Get(read_opts, Key1(key), &value_from_get, &timestamp));
      ASSERT_EQ("value" + std::to_string(i), value_from_get);
      ASSERT_EQ(write_timestamps[i], timestamp);
    }
    const size_t expected_count =
        static_cast<size_t>(kMaxKey - start_keys[i] + 1);
    ASSERT_EQ(expected_count, count);
  }
  Close();
}
// Keys are written with a timestamp and flushed into a single L0 file, then
// the DB is reopened read-only with max_open_files = -1. A MultiGet whose
// read timestamp was built with PutFixed32 (a different size than the write
// timestamp) must report InvalidArgument for every requested key.
TEST_F(DBReadOnlyTestWithTimestamp,
       CompactedDBMultiGetReadTimestampSizeMismatch) {
  const int kNumKeysPerFile = 1026;
  const uint64_t kMaxKey = 1024;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  std::string write_timestamp = Timestamp(1, 0);
  WriteOptions write_opts;
  for (uint64_t key = 0; key <= kMaxKey; ++key) {
    ASSERT_OK(db_->Put(write_opts, Key1(key), write_timestamp,
                       "value" + std::to_string(0)));
  }
  ASSERT_OK(db_->Flush(FlushOptions()));
  Close();

  // Reopen the database in read only mode as a Compacted DB to test its
  // timestamp support.
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  CheckDBOpenedAsCompactedDBWithOneLevel0File();

  ReadOptions read_opts;
  std::string different_size_read_timestamp;
  PutFixed32(&different_size_read_timestamp, 2);
  Slice different_size_read_ts = different_size_read_timestamp;
  read_opts.timestamp = &different_size_read_ts;

  // The key strings are materialized completely before any Slice is taken:
  // a Slice only points into its string, so key_strs must not grow afterwards.
  std::vector<std::string> key_strs;
  key_strs.reserve(static_cast<size_t>(kMaxKey + 1));
  for (uint64_t key = 0; key <= kMaxKey; ++key) {
    key_strs.push_back(Key1(key));
  }
  std::vector<Slice> keys;
  keys.reserve(key_strs.size());
  for (const auto& key_str : key_strs) {
    keys.emplace_back(key_str);
  }

  std::vector<std::string> values;
  std::vector<std::string> timestamps;
  std::vector<Status> status_list =
      db_->MultiGet(read_opts, keys, &values, &timestamps);
  // One status per key is expected; without this check an empty result would
  // let the loop below pass without verifying anything.
  ASSERT_EQ(keys.size(), status_list.size());
  for (const auto& status : status_list) {
    ASSERT_TRUE(status.IsInvalidArgument());
  }
  Close();
}
// Keys are written without any timestamp (no timestamp-aware comparator is
// installed) and flushed into a single L0 file, then the DB is reopened
// read-only with max_open_files = -1. A MultiGet that sets
// ReadOptions::timestamp must report InvalidArgument for every requested key.
TEST_F(DBReadOnlyTestWithTimestamp,
       CompactedDBMultiGetReadTimestampSpecifiedWithoutWriteTimestamp) {
  const int kNumKeysPerFile = 1026;
  const uint64_t kMaxKey = 1024;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  WriteOptions write_opts;
  for (uint64_t key = 0; key <= kMaxKey; ++key) {
    ASSERT_OK(db_->Put(write_opts, Key1(key), "value" + std::to_string(0)));
  }
  ASSERT_OK(db_->Flush(FlushOptions()));
  Close();

  // Reopen the database in read only mode as a Compacted DB to test its
  // timestamp support.
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  CheckDBOpenedAsCompactedDBWithOneLevel0File();

  ReadOptions read_opts;
  std::string read_timestamp = Timestamp(2, 0);
  Slice read_ts = read_timestamp;
  read_opts.timestamp = &read_ts;

  // The key strings are materialized completely before any Slice is taken:
  // a Slice only points into its string, so key_strs must not grow afterwards.
  std::vector<std::string> key_strs;
  key_strs.reserve(static_cast<size_t>(kMaxKey + 1));
  for (uint64_t key = 0; key <= kMaxKey; ++key) {
    key_strs.push_back(Key1(key));
  }
  std::vector<Slice> keys;
  keys.reserve(key_strs.size());
  for (const auto& key_str : key_strs) {
    keys.emplace_back(key_str);
  }

  std::vector<std::string> values;
  std::vector<std::string> timestamps;
  std::vector<Status> status_list =
      db_->MultiGet(read_opts, keys, &values, &timestamps);
  // One status per key is expected; without this check an empty result would
  // let the loop below pass without verifying anything.
  ASSERT_EQ(keys.size(), status_list.size());
  for (const auto& status : status_list) {
    ASSERT_TRUE(status.IsInvalidArgument());
  }
  Close();
}
// Keys are written with a timestamp and flushed into a single L0 file, then
// the DB is reopened read-only with max_open_files = -1. A MultiGet that
// leaves ReadOptions::timestamp unset must report InvalidArgument for every
// requested key.
TEST_F(DBReadOnlyTestWithTimestamp,
       CompactedDBMultiGetWriteWithTimestampReadWithoutTimestamp) {
  const int kNumKeysPerFile = 1026;
  const uint64_t kMaxKey = 1024;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  std::string write_timestamp = Timestamp(1, 0);
  WriteOptions write_opts;
  for (uint64_t key = 0; key <= kMaxKey; ++key) {
    ASSERT_OK(db_->Put(write_opts, Key1(key), write_timestamp,
                       "value" + std::to_string(0)));
  }
  ASSERT_OK(db_->Flush(FlushOptions()));
  Close();

  // Reopen the database in read only mode as a Compacted DB to test its
  // timestamp support.
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  CheckDBOpenedAsCompactedDBWithOneLevel0File();

  // No read timestamp is set on purpose.
  ReadOptions read_opts;

  // The key strings are materialized completely before any Slice is taken:
  // a Slice only points into its string, so key_strs must not grow afterwards.
  std::vector<std::string> key_strs;
  key_strs.reserve(static_cast<size_t>(kMaxKey + 1));
  for (uint64_t key = 0; key <= kMaxKey; ++key) {
    key_strs.push_back(Key1(key));
  }
  std::vector<Slice> keys;
  keys.reserve(key_strs.size());
  for (const auto& key_str : key_strs) {
    keys.emplace_back(key_str);
  }

  std::vector<std::string> values;
  std::vector<Status> status_list = db_->MultiGet(read_opts, keys, &values);
  // One status per key is expected; without this check an empty result would
  // let the loop below pass without verifying anything.
  ASSERT_EQ(keys.size(), status_list.size());
  for (const auto& status : status_list) {
    ASSERT_TRUE(status.IsInvalidArgument());
  }
  Close();
}
// Verifies timestamp-aware MultiGet on a read-only DB made of a single L0
// file.
//
// Two write passes are issued before the flush:
//   pass 0: keys [1, kMaxKey] at Timestamp(1, 0) with "value0"
//   pass 1: keys [0, kMaxKey] at Timestamp(3, 0) with "value1"
// A MultiGet over pass i's key range at read_timestamps[i] (2, then 4) must
// succeed for every key and return the value and write timestamp of pass i.
TEST_F(DBReadOnlyTestWithTimestamp, CompactedDBMultiGetWithOnlyOneL0File) {
  // Presumably sized so that both write passes stay in one memtable and the
  // flush yields a single L0 file; that layout is asserted after the reopen.
  const int kNumKeysPerFile = 1026 * 2;
  const uint64_t kMaxKey = 1024;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  const std::vector<uint64_t> start_keys = {1, 0};
  const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
                                                     Timestamp(3, 0)};
  const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
                                                    Timestamp(4, 0)};
  for (size_t i = 0; i < write_timestamps.size(); ++i) {
    WriteOptions write_opts;
    for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
      ASSERT_OK(db_->Put(write_opts, Key1(key), write_timestamps[i],
                         "value" + std::to_string(i)));
    }
  }
  ASSERT_OK(db_->Flush(FlushOptions()));
  Close();

  // Reopen the database in read only mode as a Compacted DB to test its
  // timestamp support.
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  CheckDBOpenedAsCompactedDBWithOneLevel0File();

  // Bounded by read_timestamps, the container this loop indexes.
  for (size_t i = 0; i < read_timestamps.size(); ++i) {
    ReadOptions read_opts;
    Slice read_ts = read_timestamps[i];
    read_opts.timestamp = &read_ts;

    const size_t batch_size =
        static_cast<size_t>(kMaxKey - start_keys[i] + 1);

    // The key strings are materialized completely before any Slice is taken:
    // a Slice only points into its string, so key_strs must not grow
    // afterwards.
    std::vector<std::string> key_strs;
    key_strs.reserve(batch_size);
    for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
      key_strs.push_back(Key1(key));
    }
    std::vector<Slice> keys;
    keys.reserve(key_strs.size());
    for (const auto& key_str : key_strs) {
      keys.emplace_back(key_str);
    }

    std::vector<std::string> values;
    std::vector<std::string> timestamps;
    std::vector<Status> status_list =
        db_->MultiGet(read_opts, keys, &values, &timestamps);
    // All three outputs are indexed in lockstep below, so verify each size.
    ASSERT_EQ(batch_size, status_list.size());
    ASSERT_EQ(batch_size, values.size());
    ASSERT_EQ(batch_size, timestamps.size());
    for (size_t idx = 0; idx < values.size(); ++idx) {
      // Check the status first: a failed lookup is then reported as such
      // rather than as a value mismatch.
      ASSERT_OK(status_list[idx]);
      ASSERT_EQ("value" + std::to_string(i), values[idx]);
      ASSERT_EQ(write_timestamps[i], timestamps[idx]);
    }
  }
  Close();
}
// Verifies timestamp-aware MultiGet on a read-only DB whose files all live in
// the highest non-empty level (the DB is fully compacted before it is
// reopened).
//
// Two write passes are issued before the flush and compaction:
//   pass 0: keys [1, kMaxKey] at Timestamp(1, 0) with "value0"
//   pass 1: keys [0, kMaxKey] at Timestamp(3, 0) with "value1"
// A MultiGet over pass i's key range at read_timestamps[i] (2, then 4) must
// succeed for every key and return the value and write timestamp of pass i.
TEST_F(DBReadOnlyTestWithTimestamp,
       CompactedDBMultiGetWithOnlyHighestNonEmptyLevelFiles) {
  const int kNumKeysPerFile = 128;
  const uint64_t kMaxKey = 1024;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  const std::vector<uint64_t> start_keys = {1, 0};
  const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
                                                     Timestamp(3, 0)};
  const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
                                                    Timestamp(4, 0)};
  for (size_t i = 0; i < write_timestamps.size(); ++i) {
    WriteOptions write_opts;
    for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
      ASSERT_OK(db_->Put(write_opts, Key1(key), write_timestamps[i],
                         "value" + std::to_string(i)));
    }
  }
  ASSERT_OK(db_->Flush(FlushOptions()));
  // Full-range manual compaction; the resulting file layout is asserted after
  // the reopen below.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  Close();

  // Reopen the database in read only mode as a Compacted DB to test its
  // timestamp support.
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  CheckDBOpenedAsCompactedDBWithOnlyHighestNonEmptyLevelFiles();

  // Bounded by read_timestamps, the container this loop indexes.
  for (size_t i = 0; i < read_timestamps.size(); ++i) {
    ReadOptions read_opts;
    Slice read_ts = read_timestamps[i];
    read_opts.timestamp = &read_ts;

    const size_t batch_size =
        static_cast<size_t>(kMaxKey - start_keys[i] + 1);

    // The key strings are materialized completely before any Slice is taken:
    // a Slice only points into its string, so key_strs must not grow
    // afterwards.
    std::vector<std::string> key_strs;
    key_strs.reserve(batch_size);
    for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
      key_strs.push_back(Key1(key));
    }
    std::vector<Slice> keys;
    keys.reserve(key_strs.size());
    for (const auto& key_str : key_strs) {
      keys.emplace_back(key_str);
    }

    std::vector<std::string> values;
    std::vector<std::string> timestamps;
    std::vector<Status> status_list =
        db_->MultiGet(read_opts, keys, &values, &timestamps);
    // All three outputs are indexed in lockstep below, so verify each size.
    ASSERT_EQ(batch_size, status_list.size());
    ASSERT_EQ(batch_size, values.size());
    ASSERT_EQ(batch_size, timestamps.size());
    for (size_t idx = 0; idx < values.size(); ++idx) {
      // Check the status first: a failed lookup is then reported as such
      // rather than as a value mismatch.
      ASSERT_OK(status_list[idx]);
      ASSERT_EQ("value" + std::to_string(i), values[idx]);
      ASSERT_EQ(write_timestamps[i], timestamps[idx]);
    }
  }
  Close();
}
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save