multiget support for timestamps (#6483)

Summary:
Add timestamp support for MultiGet().
timestamp from readoptions is honored, and timestamps can be returned along with values.

MultiReadRandom perf test (10 minutes) on the same development machine ram drive with the same DB data shows no regression (within marge of error). The test is adapted from https://github.com/facebook/rocksdb/wiki/RocksDB-In-Memory-Workload-Performance-Benchmarks.
base line (commit 17bef7d3a):
  multireadrandom :     104.173 micros/op 307167 ops/sec; (5462999 of 5462999 found)
This PR:
  multireadrandom :     104.199 micros/op 307095 ops/sec; (5307999 of 5307999 found)

.\db_bench --db=r:\rocksdb.github --num_levels=6 --key_size=20 --prefix_size=20 --keys_per_prefix=0 --value_size=100 --cache_size=2147483648 --cache_numshardbits=6 --compression_type=none --compression_ratio=1 --min_level_to_compress=-1 --disable_seek_compaction=1 --hard_rate_limit=2 --write_buffer_size=134217728 --max_write_buffer_number=2 --level0_file_num_compaction_trigger=8 --target_file_size_base=134217728 --max_bytes_for_level_base=1073741824 --disable_wal=0 --wal_dir=r:\rocksdb.github\WAL_LOG --sync=0 --verify_checksum=1 --statistics=0 --stats_per_interval=0 --stats_interval=1048576 --histogram=0 --use_plain_table=1 --open_files=-1 --memtablerep=prefix_hash --bloom_bits=10 --bloom_locality=1 --duration=600 --benchmarks=multireadrandom --use_existing_db=1 --num=25000000 --threads=32 --allow_concurrent_memtable_write=0
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6483

Reviewed By: anand1976

Differential Revision: D20498373

Pulled By: riversand963

fbshipit-source-id: 8505f22bc40fd791bc7dd05e48d7e67c91edb627
main
Huisheng Liu 5 years ago committed by Facebook GitHub Bot
parent 921cdd37e2
commit a6ce5c823b
  1. 131
      db/db_basic_test.cc
  2. 50
      db/db_impl/db_impl.cc
  3. 16
      db/db_impl/db_impl.h
  4. 109
      db/db_with_timestamp_basic_test.cc
  5. 6
      db/memtable.cc
  6. 4
      db/version_set.cc
  7. 65
      include/rocksdb/db.h
  8. 4
      table/multiget_context.h
  9. 3
      utilities/write_batch_with_index/write_batch_with_index.cc

@ -925,44 +925,44 @@ TEST_F(DBBasicTest, MmapAndBufferOptions) {
#endif #endif
class TestEnv : public EnvWrapper { class TestEnv : public EnvWrapper {
public: public:
explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {} explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
class TestLogger : public Logger { class TestLogger : public Logger {
public: public:
using Logger::Logv; using Logger::Logv;
explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; } explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
~TestLogger() override { ~TestLogger() override {
if (!closed_) { if (!closed_) {
CloseHelper(); CloseHelper();
} }
}
void Logv(const char* /*format*/, va_list /*ap*/) override {}
protected:
Status CloseImpl() override { return CloseHelper(); }
private:
Status CloseHelper() {
env->CloseCountInc();
;
return Status::IOError();
}
TestEnv* env;
};
void CloseCountInc() { close_count++; }
int GetCloseCount() { return close_count; }
Status NewLogger(const std::string& /*fname*/,
std::shared_ptr<Logger>* result) override {
result->reset(new TestLogger(this));
return Status::OK();
} }
void Logv(const char* /*format*/, va_list /*ap*/) override {}
protected:
Status CloseImpl() override { return CloseHelper(); }
private: private:
int close_count; Status CloseHelper() {
env->CloseCountInc();
;
return Status::IOError();
}
TestEnv* env;
};
void CloseCountInc() { close_count++; }
int GetCloseCount() { return close_count; }
Status NewLogger(const std::string& /*fname*/,
std::shared_ptr<Logger>* result) override {
result->reset(new TestLogger(this));
return Status::OK();
}
private:
int close_count;
}; };
TEST_F(DBBasicTest, DBClose) { TEST_F(DBBasicTest, DBClose) {
@ -1014,7 +1014,7 @@ TEST_F(DBBasicTest, DBCloseFlushError) {
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.create_if_missing = true; options.create_if_missing = true;
options.manual_wal_flush = true; options.manual_wal_flush = true;
options.write_buffer_size=100; options.write_buffer_size = 100;
options.env = fault_injection_env.get(); options.env = fault_injection_env.get();
Reopen(options); Reopen(options);
@ -1464,7 +1464,8 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
ASSERT_EQ(0, num_keys); ASSERT_EQ(0, num_keys);
for (int i = 0; i < 128; i += 9) { for (int i = 0; i < 128; i += 9) {
ASSERT_OK(Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i))); ASSERT_OK(
Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
} }
std::vector<std::string> keys; std::vector<std::string> keys;
@ -1702,8 +1703,8 @@ TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;
table_options.pin_l0_filter_and_index_blocks_in_cache = true; table_options.pin_l0_filter_and_index_blocks_in_cache = true;
table_options.block_size = 16 * 1024; table_options.block_size = 16 * 1024;
assert(table_options.block_size > ASSERT_TRUE(table_options.block_size >
BlockBasedTable::kMultiGetReadStackBufSize); BlockBasedTable::kMultiGetReadStackBufSize);
options.table_factory.reset(new BlockBasedTableFactory(table_options)); options.table_factory.reset(new BlockBasedTableFactory(table_options));
Reopen(options); Reopen(options);
@ -1938,7 +1939,7 @@ class DBBasicTestWithParallelIO
if (!Snappy_Supported()) { if (!Snappy_Supported()) {
compression_enabled_ = false; compression_enabled_ = false;
} }
#endif //ROCKSDB_LITE #endif // ROCKSDB_LITE
table_options.block_cache = uncompressed_cache_; table_options.block_cache = uncompressed_cache_;
if (table_options.block_cache == nullptr) { if (table_options.block_cache == nullptr) {
@ -2275,13 +2276,13 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
ro.fill_cache = fill_cache(); ro.fill_cache = fill_cache();
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"RetrieveMultipleBlocks:VerifyChecksum", [&](void *status) { "RetrieveMultipleBlocks:VerifyChecksum", [&](void* status) {
Status* s = static_cast<Status*>(status); Status* s = static_cast<Status*>(status);
read_count++; read_count++;
if (read_count == 2) { if (read_count == 2) {
*s = Status::Corruption(); *s = Status::Corruption();
} }
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
// Warm up the cache first // Warm up the cache first
@ -2294,7 +2295,7 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true); keys.data(), values.data(), statuses.data(), true);
ASSERT_TRUE(CheckValue(0, values[0].ToString())); ASSERT_TRUE(CheckValue(0, values[0].ToString()));
//ASSERT_TRUE(CheckValue(50, values[1].ToString())); // ASSERT_TRUE(CheckValue(50, values[1].ToString()));
ASSERT_EQ(statuses[0], Status::OK()); ASSERT_EQ(statuses[0], Status::OK());
ASSERT_EQ(statuses[1], Status::Corruption()); ASSERT_EQ(statuses[1], Status::Corruption());
@ -2312,10 +2313,10 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
ro.fill_cache = fill_cache(); ro.fill_cache = fill_cache();
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"TableCache::MultiGet:FindTable", [&](void *status) { "TableCache::MultiGet:FindTable", [&](void* status) {
Status* s = static_cast<Status*>(status); Status* s = static_cast<Status*>(status);
*s = Status::IOError(); *s = Status::IOError();
}); });
// DB open will create table readers unless we reduce the table cache // DB open will create table readers unless we reduce the table cache
// capacity. // capacity.
// SanitizeOptions will set max_open_files to minimum of 20. Table cache // SanitizeOptions will set max_open_files to minimum of 20. Table cache
@ -2324,10 +2325,10 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
// prevent file open during DB open and force the file to be opened // prevent file open during DB open and force the file to be opened
// during MultiGet // during MultiGet
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void *arg) { "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
int* max_open_files = (int*)arg; int* max_open_files = (int*)arg;
*max_open_files = 11; *max_open_files = 11;
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
Reopen(CurrentOptions()); Reopen(CurrentOptions());
@ -2347,15 +2348,15 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
} }
INSTANTIATE_TEST_CASE_P( INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO,
ParallelIO, DBBasicTestWithParallelIO, // Params are as follows -
// Params are as follows - // Param 0 - Compressed cache enabled
// Param 0 - Compressed cache enabled // Param 1 - Uncompressed cache enabled
// Param 1 - Uncompressed cache enabled // Param 2 - Data compression enabled
// Param 2 - Data compression enabled // Param 3 - ReadOptions::fill_cache
// Param 3 - ReadOptions::fill_cache ::testing::Combine(::testing::Bool(), ::testing::Bool(),
::testing::Combine(::testing::Bool(), ::testing::Bool(), ::testing::Bool(),
::testing::Bool(), ::testing::Bool())); ::testing::Bool()));
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -1687,12 +1687,20 @@ std::vector<Status> DBImpl::MultiGet(
const ReadOptions& read_options, const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family, const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) { const std::vector<Slice>& keys, std::vector<std::string>* values) {
return MultiGet(read_options, column_family, keys, values,
/*timestamps*/ nullptr);
}
std::vector<Status> DBImpl::MultiGet(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values,
std::vector<std::string>* timestamps) {
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_MULTIGET); StopWatch sw(env_, stats_, DB_MULTIGET);
PERF_TIMER_GUARD(get_snapshot_time); PERF_TIMER_GUARD(get_snapshot_time);
SequenceNumber consistent_seqnum; SequenceNumber consistent_seqnum;
;
std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data( std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
column_family.size()); column_family.size());
@ -1723,6 +1731,9 @@ std::vector<Status> DBImpl::MultiGet(
size_t num_keys = keys.size(); size_t num_keys = keys.size();
std::vector<Status> stat_list(num_keys); std::vector<Status> stat_list(num_keys);
values->resize(num_keys); values->resize(num_keys);
if (timestamps) {
timestamps->resize(num_keys);
}
// Keep track of bytes that we read for statistics-recording later // Keep track of bytes that we read for statistics-recording later
uint64_t bytes_read = 0; uint64_t bytes_read = 0;
@ -1737,8 +1748,9 @@ std::vector<Status> DBImpl::MultiGet(
merge_context.Clear(); merge_context.Clear();
Status& s = stat_list[i]; Status& s = stat_list[i];
std::string* value = &(*values)[i]; std::string* value = &(*values)[i];
std::string* timestamp = timestamps ? &(*timestamps)[i] : nullptr;
LookupKey lkey(keys[i], consistent_seqnum); LookupKey lkey(keys[i], consistent_seqnum, read_options.timestamp);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
SequenceNumber max_covering_tombstone_seq = 0; SequenceNumber max_covering_tombstone_seq = 0;
auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID()); auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
@ -1750,13 +1762,12 @@ std::vector<Status> DBImpl::MultiGet(
has_unpersisted_data_.load(std::memory_order_relaxed)); has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false; bool done = false;
if (!skip_memtable) { if (!skip_memtable) {
if (super_version->mem->Get(lkey, value, /*timestamp=*/nullptr, &s, if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context,
&merge_context, &max_covering_tombstone_seq, &max_covering_tombstone_seq, read_options)) {
read_options)) {
done = true; done = true;
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} else if (super_version->imm->Get( } else if (super_version->imm->Get(
lkey, value, nullptr, &s, &merge_context, lkey, value, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, read_options)) { &max_covering_tombstone_seq, read_options)) {
done = true; done = true;
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
@ -1765,8 +1776,8 @@ std::vector<Status> DBImpl::MultiGet(
if (!done) { if (!done) {
PinnableSlice pinnable_val; PinnableSlice pinnable_val;
PERF_TIMER_GUARD(get_from_output_files_time); PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->Get(read_options, lkey, &pinnable_val, super_version->current->Get(read_options, lkey, &pinnable_val, timestamp,
/*timestamp=*/nullptr, &s, &merge_context, &s, &merge_context,
&max_covering_tombstone_seq); &max_covering_tombstone_seq);
value->assign(pinnable_val.data(), pinnable_val.size()); value->assign(pinnable_val.data(), pinnable_val.size());
RecordTick(stats_, MEMTABLE_MISS); RecordTick(stats_, MEMTABLE_MISS);
@ -1929,6 +1940,14 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys, ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, Status* statuses, PinnableSlice* values, Status* statuses,
const bool sorted_input) { const bool sorted_input) {
return MultiGet(read_options, num_keys, column_families, keys, values,
/*timestamps*/ nullptr, statuses, sorted_input);
}
void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses, const bool sorted_input) {
if (num_keys == 0) { if (num_keys == 0) {
return; return;
} }
@ -1937,7 +1956,7 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
sorted_keys.resize(num_keys); sorted_keys.resize(num_keys);
for (size_t i = 0; i < num_keys; ++i) { for (size_t i = 0; i < num_keys; ++i) {
key_context.emplace_back(column_families[i], keys[i], &values[i], key_context.emplace_back(column_families[i], keys[i], &values[i],
&statuses[i]); &timestamps[i], &statuses[i]);
} }
for (size_t i = 0; i < num_keys; ++i) { for (size_t i = 0; i < num_keys; ++i) {
sorted_keys[i] = &key_context[i]; sorted_keys[i] = &key_context[i];
@ -2057,11 +2076,22 @@ void DBImpl::MultiGet(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const size_t num_keys, ColumnFamilyHandle* column_family, const size_t num_keys,
const Slice* keys, PinnableSlice* values, const Slice* keys, PinnableSlice* values,
Status* statuses, const bool sorted_input) { Status* statuses, const bool sorted_input) {
return MultiGet(read_options, column_family, num_keys, keys, values,
/*timestamp=*/nullptr, statuses, sorted_input);
}
void DBImpl::MultiGet(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const size_t num_keys,
const Slice* keys, PinnableSlice* values,
std::string* timestamps, Status* statuses,
const bool sorted_input) {
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context; autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys; autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys); sorted_keys.resize(num_keys);
for (size_t i = 0; i < num_keys; ++i) { for (size_t i = 0; i < num_keys; ++i) {
key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]); key_context.emplace_back(column_family, keys[i], &values[i],
timestamps ? &timestamps[i] : nullptr,
&statuses[i]);
} }
for (size_t i = 0; i < num_keys; ++i) { for (size_t i = 0; i < num_keys; ++i) {
sorted_keys[i] = &key_context[i]; sorted_keys[i] = &key_context[i];

@ -188,6 +188,11 @@ class DBImpl : public DB {
const std::vector<ColumnFamilyHandle*>& column_family, const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, const std::vector<Slice>& keys,
std::vector<std::string>* values) override; std::vector<std::string>* values) override;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values,
std::vector<std::string>* timestamps) override;
// This MultiGet is a batched version, which may be faster than calling Get // This MultiGet is a batched version, which may be faster than calling Get
// multiple times, especially if the keys have some spatial locality that // multiple times, especially if the keys have some spatial locality that
@ -201,11 +206,22 @@ class DBImpl : public DB {
const size_t num_keys, const Slice* keys, const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses, PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override; const bool sorted_input = false) override;
virtual void MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses,
const bool sorted_input = false) override;
virtual void MultiGet(const ReadOptions& options, const size_t num_keys, virtual void MultiGet(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys, ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, Status* statuses, PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override; const bool sorted_input = false) override;
virtual void MultiGet(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses,
const bool sorted_input = false) override;
virtual void MultiGetWithCallback( virtual void MultiGetWithCallback(
const ReadOptions& options, ColumnFamilyHandle* column_family, const ReadOptions& options, ColumnFamilyHandle* column_family,

@ -580,7 +580,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
std::vector<std::string> write_ts_list; std::vector<std::string> write_ts_list;
std::vector<std::string> read_ts_list; std::vector<std::string> read_ts_list;
const auto& verify_record_func = [&](size_t i, size_t k, const auto& verify_records_func = [&](size_t i, size_t begin, size_t end,
ColumnFamilyHandle* cfh) { ColumnFamilyHandle* cfh) {
std::string value; std::string value;
std::string timestamp; std::string timestamp;
@ -591,9 +591,11 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
std::string expected_timestamp = std::string expected_timestamp =
std::string(write_ts_list[i].data(), write_ts_list[i].size()); std::string(write_ts_list[i].data(), write_ts_list[i].size());
ASSERT_OK(db_->Get(ropts, cfh, Key1(k), &value, &timestamp)); for (size_t j = begin; j <= end; ++j) {
ASSERT_EQ("value_" + std::to_string(k) + "_" + std::to_string(i), value); ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value, &timestamp));
ASSERT_EQ(expected_timestamp, timestamp); ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), value);
ASSERT_EQ(expected_timestamp, timestamp);
}
}; };
for (size_t i = 0; i != kNumTimestamps; ++i) { for (size_t i = 0; i != kNumTimestamps; ++i) {
@ -609,9 +611,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
"value_" + std::to_string(j) + "_" + std::to_string(i), "value_" + std::to_string(j) + "_" + std::to_string(i),
wopts)); wopts));
if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) { if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
for (size_t k = memtable_get_start; k <= j; ++k) { verify_records_func(i, memtable_get_start, j, handles_[cf]);
verify_record_func(i, k, handles_[cf]);
}
memtable_get_start = j + 1; memtable_get_start = j + 1;
// flush all keys with the same timestamp to two sst files, split at // flush all keys with the same timestamp to two sst files, split at
@ -641,15 +641,104 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
write_ts_list[i].size()); write_ts_list[i].size());
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) { for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
ColumnFamilyHandle* cfh = handles_[cf]; ColumnFamilyHandle* cfh = handles_[cf];
for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { verify_records_func(i, 0, kNumKeysPerTimestamp - 1, cfh);
verify_record_func(i, j, cfh);
}
} }
} }
}; };
verify_db_func(); verify_db_func();
Close(); Close();
} }
TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) {
const int kNumKeysPerFile = 8192;
const size_t kNumTimestamps = 2;
const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
Options options = CurrentOptions();
options.create_if_missing = true;
options.env = env_;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
size_t ts_sz = Timestamp(0, 0).size();
TestComparator test_cmp(ts_sz);
options.comparator = &test_cmp;
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(
10 /*bits_per_key*/, false /*use_block_based_builder*/));
bbto.whole_key_filtering = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
size_t num_cfs = handles_.size();
ASSERT_EQ(2, num_cfs);
std::vector<std::string> write_ts_list;
std::vector<std::string> read_ts_list;
const auto& verify_records_func = [&](size_t i, ColumnFamilyHandle* cfh) {
std::vector<Slice> keys;
std::vector<std::string> key_vals;
std::vector<std::string> values;
std::vector<std::string> timestamps;
for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
key_vals.push_back(Key1(j));
}
for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
keys.push_back(key_vals[j]);
}
ReadOptions ropts;
const Slice read_ts = read_ts_list[i];
ropts.timestamp = &read_ts;
std::string expected_timestamp(write_ts_list[i].data(),
write_ts_list[i].size());
std::vector<ColumnFamilyHandle*> cfhs(keys.size(), cfh);
std::vector<Status> statuses =
db_->MultiGet(ropts, cfhs, keys, &values, &timestamps);
for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
ASSERT_OK(statuses[j]);
ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
values[j]);
ASSERT_EQ(expected_timestamp, timestamps[j]);
}
};
for (size_t i = 0; i != kNumTimestamps; ++i) {
write_ts_list.push_back(Timestamp(i * 2, 0));
read_ts_list.push_back(Timestamp(1 + i * 2, 0));
const Slice& write_ts = write_ts_list.back();
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
WriteOptions wopts;
WriteBatch batch(0, 0, ts_sz);
for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
ASSERT_OK(
batch.Put(handles_[cf], Key1(j),
"value_" + std::to_string(j) + "_" + std::to_string(i)));
}
batch.AssignTimestamp(write_ts);
ASSERT_OK(db_->Write(wopts, &batch));
verify_records_func(i, handles_[cf]);
ASSERT_OK(Flush(cf));
}
}
const auto& verify_db_func = [&]() {
for (size_t i = 0; i != kNumTimestamps; ++i) {
ReadOptions ropts;
const Slice read_ts = read_ts_list[i];
ropts.timestamp = &read_ts;
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
ColumnFamilyHandle* cfh = handles_[cf];
verify_records_func(i, cfh);
}
}
};
verify_db_func();
Close();
}
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
INSTANTIATE_TEST_CASE_P( INSTANTIATE_TEST_CASE_P(

@ -919,9 +919,9 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key())); range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
} }
GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true, GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
callback, is_blob, iter->value->GetSelf(), callback, is_blob, iter->value->GetSelf(), iter->timestamp,
/*timestamp=*/nullptr, iter->s, &(iter->merge_context), &seq, iter->s, &(iter->merge_context), &seq, &found_final_value,
&found_final_value, &merge_in_progress); &merge_in_progress);
if (!found_final_value && merge_in_progress) { if (!found_final_value && merge_in_progress) {
*(iter->s) = Status::MergeInProgress(); *(iter->s) = Status::MergeInProgress();

@ -1897,8 +1897,8 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
get_ctx.emplace_back( get_ctx.emplace_back(
user_comparator(), merge_operator_, info_log_, db_statistics_, user_comparator(), merge_operator_, info_log_, db_statistics_,
iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey, iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
iter->value, /*timestamp*/ nullptr, nullptr, &(iter->merge_context), iter->value, iter->timestamp, nullptr, &(iter->merge_context), true,
true, &iter->max_covering_tombstone_seq, this->env_, nullptr, &iter->max_covering_tombstone_seq, this->env_, nullptr,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
tracing_mget_id); tracing_mget_id);
// MergeInProgress status, if set, has been transferred to the get_context // MergeInProgress status, if set, has been transferred to the get_context

@ -480,6 +480,25 @@ class DB {
keys, values); keys, values);
} }
virtual std::vector<Status> MultiGet(
const ReadOptions& /*options*/,
const std::vector<ColumnFamilyHandle*>& /*column_family*/,
const std::vector<Slice>& keys, std::vector<std::string>* /*values*/,
std::vector<std::string>* /*timestamps*/) {
return std::vector<Status>(
keys.size(), Status::NotSupported(
"MultiGet() returning timestamps not implemented."));
}
virtual std::vector<Status> MultiGet(const ReadOptions& options,
const std::vector<Slice>& keys,
std::vector<std::string>* values,
std::vector<std::string>* timestamps) {
return MultiGet(
options,
std::vector<ColumnFamilyHandle*>(keys.size(), DefaultColumnFamily()),
keys, values, timestamps);
}
// Overloaded MultiGet API that improves performance by batching operations // Overloaded MultiGet API that improves performance by batching operations
// in the read path for greater efficiency. Currently, only the block based // in the read path for greater efficiency. Currently, only the block based
// table format with full filters are supported. Other table formats such // table format with full filters are supported. Other table formats such
@ -521,6 +540,30 @@ class DB {
} }
} }
virtual void MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses, const bool /*sorted_input*/ = false) {
std::vector<ColumnFamilyHandle*> cf;
std::vector<Slice> user_keys;
std::vector<Status> status;
std::vector<std::string> vals;
std::vector<std::string> tss;
for (size_t i = 0; i < num_keys; ++i) {
cf.emplace_back(column_family);
user_keys.emplace_back(keys[i]);
}
status = MultiGet(options, cf, user_keys, &vals, &tss);
std::copy(status.begin(), status.end(), statuses);
std::copy(tss.begin(), tss.end(), timestamps);
for (auto& value : vals) {
values->PinSelf(value);
values++;
}
}
// Overloaded MultiGet API that improves performance by batching operations // Overloaded MultiGet API that improves performance by batching operations
// in the read path for greater efficiency. Currently, only the block based // in the read path for greater efficiency. Currently, only the block based
// table format with full filters are supported. Other table formats such // table format with full filters are supported. Other table formats such
@ -560,6 +603,28 @@ class DB {
values++; values++;
} }
} }
virtual void MultiGet(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses, const bool /*sorted_input*/ = false) {
std::vector<ColumnFamilyHandle*> cf;
std::vector<Slice> user_keys;
std::vector<Status> status;
std::vector<std::string> vals;
std::vector<std::string> tss;
for (size_t i = 0; i < num_keys; ++i) {
cf.emplace_back(column_families[i]);
user_keys.emplace_back(keys[i]);
}
status = MultiGet(options, cf, user_keys, &vals, &tss);
std::copy(status.begin(), status.end(), statuses);
std::copy(tss.begin(), tss.end(), timestamps);
for (auto& value : vals) {
values->PinSelf(value);
values++;
}
}
// If the key definitely does not exist in the database, then this method // If the key definitely does not exist in the database, then this method
// returns false, else true. If the caller wants to obtain value when the key // returns false, else true. If the caller wants to obtain value when the key

@ -29,10 +29,11 @@ struct KeyContext {
bool key_exists; bool key_exists;
void* cb_arg; void* cb_arg;
PinnableSlice* value; PinnableSlice* value;
std::string* timestamp;
GetContext* get_context; GetContext* get_context;
KeyContext(ColumnFamilyHandle* col_family, const Slice& user_key, KeyContext(ColumnFamilyHandle* col_family, const Slice& user_key,
PinnableSlice* val, Status* stat) PinnableSlice* val, std::string* ts, Status* stat)
: key(&user_key), : key(&user_key),
lkey(nullptr), lkey(nullptr),
column_family(col_family), column_family(col_family),
@ -41,6 +42,7 @@ struct KeyContext {
key_exists(false), key_exists(false),
cb_arg(nullptr), cb_arg(nullptr),
value(val), value(val),
timestamp(ts),
get_context(nullptr) {} get_context(nullptr) {}
KeyContext() = default; KeyContext() = default;

@ -982,7 +982,8 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress || assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
result == WriteBatchWithIndexInternal::Result::kNotFound); result == WriteBatchWithIndexInternal::Result::kNotFound);
key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]); key_context.emplace_back(column_family, keys[i], &values[i],
/*timestamp*/ nullptr, &statuses[i]);
merges.emplace_back(result, std::move(merge_context)); merges.emplace_back(result, std::move(merge_context));
} }

Loading…
Cancel
Save