Batched MultiGet API for multiple column families (#5816)

Summary:
Add a new API that allows a user to call MultiGet specifying multiple keys belonging to different column families. This is mainly useful for users who want to do a consistent read of keys across column families, with the added performance benefits of batching and returning values using PinnableSlice.

As part of this change, the code in the original multi-column family MultiGet for acquiring the super versions has been refactored into a separate function that can be used by both, the batching and the non-batching versions of MultiGet.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5816

Test Plan:
make check
make asan_check
asan_crash_test

Differential Revision: D18408676

Pulled By: anand1976

fbshipit-source-id: 933e7bec91dd70e7b633be4ff623a1116cc28c8d
main
anand76 5 years ago committed by Facebook Github Bot
parent a19de78da5
commit 6c7b1a0cc7
  1. 1
      HISTORY.md
  2. 88
      db/db_basic_test.cc
  3. 505
      db/db_impl/db_impl.cc
  4. 87
      db/db_impl/db_impl.h
  5. 33
      db/db_test_util.cc
  6. 3
      db/db_test_util.h
  7. 41
      include/rocksdb/db.h
  8. 17
      table/multiget_context.h
  9. 10
      utilities/write_batch_with_index/write_batch_with_index.cc

@ -9,6 +9,7 @@
### New Features
* Universal compaction to support options.periodic_compaction_seconds. A full compaction will be triggered if any file is over the threshold.
* `GetLiveFilesMetaData` and `GetColumnFamilyMetaData` now expose the file number of SST files as well as the oldest blob file referenced by each SST.
* A batched MultiGet API (DB::MultiGet()) that supports retrieving keys from multiple column families.
### Performance Improvements
* For 64-bit hashing, RocksDB is standardizing on a slightly modified preview version of XXH3. This function is now used for many non-persisted hashes, along with fastrange64() in place of the modulus operator, and some benchmarks show a slight improvement.

@ -1019,15 +1019,27 @@ TEST_F(DBBasicTest, DBCloseFlushError) {
Destroy(options);
}
TEST_F(DBBasicTest, MultiGetMultiCF) {
class DBMultiGetTestWithParam : public DBBasicTest,
public testing::WithParamInterface<bool> {};
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
"alyosha", "popovich"},
options);
for (int i = 0; i < 8; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val"));
// <CF, key, value> tuples
std::vector<std::tuple<int, std::string, std::string>> cf_kv_vec;
static const int num_keys = 24;
cf_kv_vec.reserve(num_keys);
for (int i = 0; i < num_keys; ++i) {
int cf = i / 3;
int cf_key = 1 % 3;
cf_kv_vec.emplace_back(std::make_tuple(
cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key),
"cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key)));
ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
std::get<2>(cf_kv_vec[i])));
}
int get_sv_count = 0;
@ -1037,10 +1049,14 @@ TEST_F(DBBasicTest, MultiGetMultiCF) {
if (++get_sv_count == 2) {
// After MultiGet refs a couple of CFs, flush all CFs so MultiGet
// is forced to repeat the process
for (int i = 0; i < 8; ++i) {
ASSERT_OK(Flush(i));
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val2"));
for (int i = 0; i < num_keys; ++i) {
int cf = i / 3;
int cf_key = i % 8;
if (cf_key == 0) {
ASSERT_OK(Flush(cf));
}
ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
std::get<2>(cf_kv_vec[i]) + "_2"));
}
}
if (get_sv_count == 11) {
@ -1058,26 +1074,53 @@ TEST_F(DBBasicTest, MultiGetMultiCF) {
std::vector<std::string> keys;
std::vector<std::string> values;
for (int i = 0; i < 8; ++i) {
cfs.push_back(i);
keys.push_back("cf" + std::to_string(i) + "_key");
for (int i = 0; i < num_keys; ++i) {
cfs.push_back(std::get<0>(cf_kv_vec[i]));
keys.push_back(std::get<1>(cf_kv_vec[i]));
}
values = MultiGet(cfs, keys, nullptr);
ASSERT_EQ(values.size(), 8);
values = MultiGet(cfs, keys, nullptr, GetParam());
ASSERT_EQ(values.size(), num_keys);
for (unsigned int j = 0; j < values.size(); ++j) {
ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val2");
ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2");
}
for (int i = 0; i < 8; ++i) {
keys.clear();
cfs.clear();
cfs.push_back(std::get<0>(cf_kv_vec[0]));
keys.push_back(std::get<1>(cf_kv_vec[0]));
cfs.push_back(std::get<0>(cf_kv_vec[3]));
keys.push_back(std::get<1>(cf_kv_vec[3]));
cfs.push_back(std::get<0>(cf_kv_vec[4]));
keys.push_back(std::get<1>(cf_kv_vec[4]));
values = MultiGet(cfs, keys, nullptr, GetParam());
ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2");
ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2");
ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2");
keys.clear();
cfs.clear();
cfs.push_back(std::get<0>(cf_kv_vec[7]));
keys.push_back(std::get<1>(cf_kv_vec[7]));
cfs.push_back(std::get<0>(cf_kv_vec[6]));
keys.push_back(std::get<1>(cf_kv_vec[6]));
cfs.push_back(std::get<0>(cf_kv_vec[1]));
keys.push_back(std::get<1>(cf_kv_vec[1]));
values = MultiGet(cfs, keys, nullptr, GetParam());
ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2");
ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2");
ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2");
for (int cf = 0; cf < 8; ++cf) {
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(cf))
->cfd();
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
}
}
TEST_F(DBBasicTest, MultiGetMultiCFMutex) {
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
"alyosha", "popovich"},
@ -1123,7 +1166,7 @@ TEST_F(DBBasicTest, MultiGetMultiCFMutex) {
keys.push_back("cf" + std::to_string(i) + "_key");
}
values = MultiGet(cfs, keys, nullptr);
values = MultiGet(cfs, keys, nullptr, GetParam());
ASSERT_TRUE(last_try);
ASSERT_EQ(values.size(), 8);
for (unsigned int j = 0; j < values.size(); ++j) {
@ -1138,7 +1181,7 @@ TEST_F(DBBasicTest, MultiGetMultiCFMutex) {
}
}
TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) {
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
"alyosha", "popovich"},
@ -1183,7 +1226,7 @@ TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) {
}
const Snapshot* snapshot = db_->GetSnapshot();
values = MultiGet(cfs, keys, snapshot);
values = MultiGet(cfs, keys, snapshot, GetParam());
db_->ReleaseSnapshot(snapshot);
ASSERT_EQ(values.size(), 8);
for (unsigned int j = 0; j < values.size(); ++j) {
@ -1197,6 +1240,9 @@ TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) {
}
}
INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
testing::Bool());
TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

@ -1645,14 +1645,9 @@ std::vector<Status> DBImpl::MultiGet(
StopWatch sw(env_, stats_, DB_MULTIGET);
PERF_TIMER_GUARD(get_snapshot_time);
SequenceNumber snapshot;
SequenceNumber consistent_seqnum;
;
struct MultiGetColumnFamilyData {
ColumnFamilyData* cfd;
SuperVersion* super_version;
MultiGetColumnFamilyData(ColumnFamilyData* cf, SuperVersion* sv)
: cfd(cf), super_version(sv) {}
};
std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
column_family.size());
for (auto cf : column_family) {
@ -1660,86 +1655,20 @@ std::vector<Status> DBImpl::MultiGet(
auto cfd = cfh->cfd();
if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
multiget_cf_data.emplace(cfd->GetID(),
MultiGetColumnFamilyData(cfd, nullptr));
MultiGetColumnFamilyData(cfh, nullptr));
}
}
bool last_try = false;
{
// If we end up with the same issue of memtable geting sealed during 2
// consecutive retries, it means the write rate is very high. In that case
// its probably ok to take the mutex on the 3rd try so we can succeed for
// sure
static const int num_retries = 3;
for (auto i = 0; i < num_retries; ++i) {
last_try = (i == num_retries - 1);
bool retry = false;
if (i > 0) {
for (auto mgd_iter = multiget_cf_data.begin();
mgd_iter != multiget_cf_data.end(); ++mgd_iter) {
auto super_version = mgd_iter->second.super_version;
auto cfd = mgd_iter->second.cfd;
if (super_version != nullptr) {
ReturnAndCleanupSuperVersion(cfd, super_version);
}
mgd_iter->second.super_version = nullptr;
}
}
if (read_options.snapshot == nullptr) {
if (last_try) {
TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
// We're close to max number of retries. For the last retry,
// acquire the lock so we're sure to succeed
mutex_.Lock();
}
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
} else {
snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
->number_;
}
std::function<MultiGetColumnFamilyData*(
std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&)>
iter_deref_lambda =
[](std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&
cf_iter) { return &cf_iter->second; };
for (auto mgd_iter = multiget_cf_data.begin();
mgd_iter != multiget_cf_data.end(); ++mgd_iter) {
if (!last_try) {
mgd_iter->second.super_version =
GetAndRefSuperVersion(mgd_iter->second.cfd);
} else {
mgd_iter->second.super_version =
mgd_iter->second.cfd->GetSuperVersion()->Ref();
}
TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
if (read_options.snapshot != nullptr || last_try) {
// If user passed a snapshot, then we don't care if a memtable is
// sealed or compaction happens because the snapshot would ensure
// that older key versions are kept around. If this is the last
// retry, then we have the lock so nothing bad can happen
continue;
}
// We could get the earliest sequence number for the whole list of
// memtables, which will include immutable memtables as well, but that
// might be tricky to maintain in case we decide, in future, to do
// memtable compaction.
if (!last_try) {
auto seq =
mgd_iter->second.super_version->mem->GetEarliestSequenceNumber();
if (seq > snapshot) {
retry = true;
break;
}
}
}
if (!retry) {
if (last_try) {
mutex_.Unlock();
}
break;
}
}
}
bool unref_only =
MultiCFSnapshot<std::unordered_map<uint32_t, MultiGetColumnFamilyData>>(
read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
&consistent_seqnum);
// Contain a list of merge operations if merge occurs.
MergeContext merge_context;
@ -1763,7 +1692,7 @@ std::vector<Status> DBImpl::MultiGet(
Status& s = stat_list[i];
std::string* value = &(*values)[i];
LookupKey lkey(keys[i], snapshot);
LookupKey lkey(keys[i], consistent_seqnum);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
SequenceNumber max_covering_tombstone_seq = 0;
auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
@ -1807,7 +1736,7 @@ std::vector<Status> DBImpl::MultiGet(
for (auto mgd_iter : multiget_cf_data) {
auto mgd = mgd_iter.second;
if (!last_try) {
if (!unref_only) {
ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version);
} else {
mgd.cfd->GetSuperVersion()->Unref();
@ -1824,125 +1753,330 @@ std::vector<Status> DBImpl::MultiGet(
return stat_list;
}
template <class T>
bool DBImpl::MultiCFSnapshot(
const ReadOptions& read_options, ReadCallback* callback,
std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
iter_deref_func,
T* cf_list, SequenceNumber* snapshot) {
PERF_TIMER_GUARD(get_snapshot_time);
bool last_try = false;
if (cf_list->size() == 1) {
// Fast path for a single column family. We can simply get the thread loca
// super version
auto cf_iter = cf_list->begin();
auto node = iter_deref_func(cf_iter);
node->super_version = GetAndRefSuperVersion(node->cfd);
if (read_options.snapshot != nullptr) {
// Note: In WritePrepared txns this is not necessary but not harmful
// either. Because prep_seq > snapshot => commit_seq > snapshot so if
// a snapshot is specified we should be fine with skipping seq numbers
// that are greater than that.
//
// In WriteUnprepared, we cannot set snapshot in the lookup key because we
// may skip uncommitted data that should be visible to the transaction for
// reading own writes.
*snapshot =
static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
if (callback) {
*snapshot = std::max(*snapshot, callback->max_visible_seq());
}
} else {
// Since we get and reference the super version before getting
// the snapshot number, without a mutex protection, it is possible
// that a memtable switch happened in the middle and not all the
// data for this snapshot is available. But it will contain all
// the data available in the super version we have, which is also
// a valid snapshot to read from.
// We shouldn't get snapshot before finding and referencing the super
// version because a flush happening in between may compact away data for
// the snapshot, but the snapshot is earlier than the data overwriting it,
// so users may see wrong results.
*snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
}
} else {
// If we end up with the same issue of memtable geting sealed during 2
// consecutive retries, it means the write rate is very high. In that case
// its probably ok to take the mutex on the 3rd try so we can succeed for
// sure
static const int num_retries = 3;
for (int i = 0; i < num_retries; ++i) {
last_try = (i == num_retries - 1);
bool retry = false;
if (i > 0) {
for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
++cf_iter) {
auto node = iter_deref_func(cf_iter);
SuperVersion* super_version = node->super_version;
ColumnFamilyData* cfd = node->cfd;
if (super_version != nullptr) {
ReturnAndCleanupSuperVersion(cfd, super_version);
}
node->super_version = nullptr;
}
}
if (read_options.snapshot == nullptr) {
if (last_try) {
TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
// We're close to max number of retries. For the last retry,
// acquire the lock so we're sure to succeed
mutex_.Lock();
}
*snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
} else {
*snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
->number_;
}
for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
++cf_iter) {
auto node = iter_deref_func(cf_iter);
if (!last_try) {
node->super_version = GetAndRefSuperVersion(node->cfd);
} else {
node->super_version = node->cfd->GetSuperVersion()->Ref();
}
TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
if (read_options.snapshot != nullptr || last_try) {
// If user passed a snapshot, then we don't care if a memtable is
// sealed or compaction happens because the snapshot would ensure
// that older key versions are kept around. If this is the last
// retry, then we have the lock so nothing bad can happen
continue;
}
// We could get the earliest sequence number for the whole list of
// memtables, which will include immutable memtables as well, but that
// might be tricky to maintain in case we decide, in future, to do
// memtable compaction.
if (!last_try) {
SequenceNumber seq =
node->super_version->mem->GetEarliestSequenceNumber();
if (seq > *snapshot) {
retry = true;
break;
}
}
}
if (!retry) {
if (last_try) {
mutex_.Unlock();
}
break;
}
}
}
// Keep track of bytes that we read for statistics-recording later
PERF_TIMER_STOP(get_snapshot_time);
return last_try;
}
void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input) {
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
key_context.emplace_back(column_families[i], keys[i], &values[i],
&statuses[i]);
}
for (size_t i = 0; i < num_keys; ++i) {
sorted_keys[i] = &key_context[i];
}
PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>
multiget_cf_data;
size_t cf_start = 0;
ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
for (size_t i = 0; i < num_keys; ++i) {
KeyContext* key_ctx = sorted_keys[i];
if (key_ctx->column_family != cf) {
multiget_cf_data.emplace_back(
MultiGetColumnFamilyData(cf, cf_start, i - cf_start, nullptr));
cf_start = i;
cf = key_ctx->column_family;
}
}
{
// multiget_cf_data.emplace_back(
// MultiGetColumnFamilyData(cf, cf_start, num_keys - cf_start, nullptr));
multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
}
std::function<MultiGetColumnFamilyData*(
autovector<MultiGetColumnFamilyData,
MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
iter_deref_lambda =
[](autovector<MultiGetColumnFamilyData,
MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
return &(*cf_iter);
};
SequenceNumber consistent_seqnum;
bool unref_only = MultiCFSnapshot<
autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
&consistent_seqnum);
for (auto cf_iter = multiget_cf_data.begin();
cf_iter != multiget_cf_data.end(); ++cf_iter) {
MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, &sorted_keys,
cf_iter->super_version, consistent_seqnum, nullptr, nullptr);
if (!unref_only) {
ReturnAndCleanupSuperVersion(cf_iter->cfd, cf_iter->super_version);
} else {
cf_iter->cfd->GetSuperVersion()->Unref();
}
}
}
namespace {
// Order keys by CF ID, followed by key contents
struct CompareKeyContext {
inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
const Comparator* comparator = cfd->user_comparator();
ColumnFamilyHandleImpl* cfh =
static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
uint32_t cfd_id1 = cfh->cfd()->GetID();
const Comparator* comparator = cfh->cfd()->user_comparator();
cfh = static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
uint32_t cfd_id2 = cfh->cfd()->GetID();
if (cfd_id1 < cfd_id2) {
return true;
} else if (cfd_id1 > cfd_id2) {
return false;
}
// Both keys are from the same column family
int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
if (cmp < 0) {
return true;
}
return false;
}
const ColumnFamilyData* cfd;
};
} // anonymous namespace
void DBImpl::PrepareMultiGetKeys(
size_t num_keys, bool sorted_input,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
#ifndef NDEBUG
if (sorted_input) {
for (size_t index = 0; index < sorted_keys->size(); ++index) {
if (index > 0) {
KeyContext* lhs = (*sorted_keys)[index - 1];
KeyContext* rhs = (*sorted_keys)[index];
ColumnFamilyHandleImpl* cfh =
reinterpret_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
uint32_t cfd_id1 = cfh->cfd()->GetID();
const Comparator* comparator = cfh->cfd()->user_comparator();
cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
uint32_t cfd_id2 = cfh->cfd()->GetID();
assert(cfd_id1 <= cfd_id2);
if (cfd_id1 < cfd_id2) {
continue;
}
// Both keys are from the same column family
int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
assert(cmp <= 0);
}
index++;
}
}
#endif
if (!sorted_input) {
CompareKeyContext sort_comparator;
std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
sort_comparator);
}
}
void DBImpl::MultiGet(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const size_t num_keys,
const Slice* keys, PinnableSlice* values,
Status* statuses, const bool sorted_input) {
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
key_context.emplace_back(keys[i], &values[i], &statuses[i]);
key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]);
}
MultiGetImpl(read_options, column_family, key_context, sorted_input, nullptr,
nullptr);
for (size_t i = 0; i < num_keys; ++i) {
sorted_keys[i] = &key_context[i];
}
PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
MultiGetWithCallback(read_options, column_family, nullptr, &sorted_keys);
}
void DBImpl::MultiGetImpl(
void DBImpl::MultiGetWithCallback(
const ReadOptions& read_options, ColumnFamilyHandle* column_family,
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE>& key_context,
bool sorted_input, ReadCallback* callback, bool* is_blob_index) {
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_MULTIGET);
size_t num_keys = key_context.size();
PERF_TIMER_GUARD(get_snapshot_time);
ColumnFamilyHandleImpl* cfh =
reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
ColumnFamilyData* cfd = cfh->cfd();
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys);
{
size_t index = 0;
for (KeyContext& key : key_context) {
ReadCallback* callback,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
std::function<MultiGetColumnFamilyData*(
std::array<MultiGetColumnFamilyData, 1>::iterator&)>
iter_deref_lambda =
[](std::array<MultiGetColumnFamilyData, 1>::iterator& cf_iter) {
return &(*cf_iter);
};
size_t num_keys = sorted_keys->size();
SequenceNumber consistent_seqnum;
bool unref_only = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
read_options, callback, iter_deref_lambda, &multiget_cf_data,
&consistent_seqnum);
#ifndef NDEBUG
if (index > 0 && sorted_input) {
KeyContext* lhs = &key_context[index-1];
KeyContext* rhs = &key_context[index];
const Comparator* comparator = cfd->user_comparator();
int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
assert(cmp <= 0);
}
#endif
sorted_keys[index] = &key;
index++;
}
if (!sorted_input) {
CompareKeyContext sort_comparator;
sort_comparator.cfd = cfd;
std::sort(sorted_keys.begin(), sorted_keys.begin() + index,
sort_comparator);
}
assert(!unref_only);
#else
// Silence unused variable warning
(void)unref_only;
#endif // NDEBUG
if (callback && read_options.snapshot == nullptr) {
// The unprep_seqs are not published for write unprepared, so it could be
// that max_visible_seq is larger. Seek to the std::max of the two.
// However, we still want our callback to contain the actual snapshot so
// that it can do the correct visibility filtering.
callback->Refresh(consistent_seqnum);
// Internally, WriteUnpreparedTxnReadCallback::Refresh would set
// max_visible_seq = max(max_visible_seq, snapshot)
//
// Currently, the commented out assert is broken by
// InvalidSnapshotReadCallback, but if write unprepared recovery followed
// the regular transaction flow, then this special read callback would not
// be needed.
//
// assert(callback->max_visible_seq() >= snapshot);
consistent_seqnum = callback->max_visible_seq();
}
// Keep track of bytes that we read for statistics-recording later
PERF_TIMER_STOP(get_snapshot_time);
// Acquire SuperVersion
SuperVersion* super_version = GetAndRefSuperVersion(cfd);
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
// Note: In WritePrepared txns this is not necessary but not harmful
// either. Because prep_seq > snapshot => commit_seq > snapshot so if
// a snapshot is specified we should be fine with skipping seq numbers
// that are greater than that.
//
// In WriteUnprepared, we cannot set snapshot in the lookup key because we
// may skip uncommitted data that should be visible to the transaction for
// reading own writes.
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
if (callback) {
snapshot = std::max(snapshot, callback->max_visible_seq());
}
} else {
// Since we get and reference the super version before getting
// the snapshot number, without a mutex protection, it is possible
// that a memtable switch happened in the middle and not all the
// data for this snapshot is available. But it will contain all
// the data available in the super version we have, which is also
// a valid snapshot to read from.
// We shouldn't get snapshot before finding and referencing the super
// version because a flush happening in between may compact away data for
// the snapshot, but the snapshot is earlier than the data overwriting it,
// so users may see wrong results.
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
if (callback) {
// The unprep_seqs are not published for write unprepared, so it could be
// that max_visible_seq is larger. Seek to the std::max of the two.
// However, we still want our callback to contain the actual snapshot so
// that it can do the correct visibility filtering.
callback->Refresh(snapshot);
MultiGetImpl(read_options, 0, num_keys, sorted_keys,
multiget_cf_data[0].super_version, consistent_seqnum, nullptr,
nullptr);
ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
multiget_cf_data[0].super_version);
}
// Internally, WriteUnpreparedTxnReadCallback::Refresh would set
// max_visible_seq = max(max_visible_seq, snapshot)
//
// Currently, the commented out assert is broken by
// InvalidSnapshotReadCallback, but if write unprepared recovery followed
// the regular transaction flow, then this special read callback would not
// be needed.
//
// assert(callback->max_visible_seq() >= snapshot);
snapshot = callback->max_visible_seq();
}
}
void DBImpl::MultiGetImpl(
const ReadOptions& read_options, size_t start_key, size_t num_keys,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
SuperVersion* super_version, SequenceNumber snapshot,
ReadCallback* callback, bool* is_blob_index) {
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_MULTIGET);
// For each of the given keys, apply the entire "get" process as follows:
// First look in the memtable, then in the immutable memtable (if any).
@ -1953,8 +2087,8 @@ void DBImpl::MultiGetImpl(
size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
? MultiGetContext::MAX_BATCH_SIZE
: keys_left;
MultiGetContext ctx(&sorted_keys[num_keys - keys_left], batch_size,
snapshot);
MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
batch_size, snapshot);
MultiGetRange range = ctx.GetMultiGetRange();
bool lookup_current = false;
@ -1992,15 +2126,14 @@ void DBImpl::MultiGetImpl(
PERF_TIMER_GUARD(get_post_process_time);
size_t num_found = 0;
uint64_t bytes_read = 0;
for (KeyContext& key : key_context) {
if (key.s->ok()) {
bytes_read += key.value->size();
for (size_t i = start_key; i < start_key + num_keys; ++i) {
KeyContext* key = (*sorted_keys)[i];
if (key->s->ok()) {
bytes_read += key->value->size();
num_found++;
}
}
ReturnAndCleanupSuperVersion(cfd, super_version);
RecordTick(stats_, NUMBER_MULTIGET_CALLS);
RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);

@ -199,11 +199,15 @@ class DBImpl : public DB {
PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override;
void MultiGetImpl(
virtual void MultiGet(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override;
virtual void MultiGetWithCallback(
const ReadOptions& options, ColumnFamilyHandle* column_family,
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE>& key_context,
bool sorted_input, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
ReadCallback* callback,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys);
virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family,
@ -1640,6 +1644,81 @@ class DBImpl : public DB {
const DBOptions& db_options,
const std::vector<ColumnFamilyDescriptor>& column_families);
// Utility function to do some debug validation and sort the given vector
// of MultiGet keys
void PrepareMultiGetKeys(
const size_t num_keys, bool sorted,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* key_ptrs);
// A structure to hold the information required to process MultiGet of keys
// belonging to one column family. For a multi column family MultiGet, there
// will be a container of these objects.
struct MultiGetColumnFamilyData {
ColumnFamilyHandle* cf;
ColumnFamilyData* cfd;
// For the batched MultiGet which relies on sorted keys, start specifies
// the index of first key belonging to this column family in the sorted
// list.
size_t start;
// For the batched MultiGet case, num_keys specifies the number of keys
// belonging to this column family in the sorted list
size_t num_keys;
// SuperVersion for the column family obtained in a manner that ensures a
// consistent view across all column families in the DB
SuperVersion* super_version;
MultiGetColumnFamilyData(ColumnFamilyHandle* column_family,
SuperVersion* sv)
: cf(column_family),
cfd(static_cast<ColumnFamilyHandleImpl*>(cf)->cfd()),
start(0),
num_keys(0),
super_version(sv) {}
MultiGetColumnFamilyData(ColumnFamilyHandle* column_family, size_t first,
size_t count, SuperVersion* sv)
: cf(column_family),
cfd(static_cast<ColumnFamilyHandleImpl*>(cf)->cfd()),
start(first),
num_keys(count),
super_version(sv) {}
MultiGetColumnFamilyData() = default;
};
// A common function to obtain a consistent snapshot, which can be implicit
// if the user doesn't specify a snapshot in read_options, across
// multiple column families for MultiGet. It will attempt to get an implicit
// snapshot without acquiring the db_mutes, but will give up after a few
// tries and acquire the mutex if a memtable flush happens. The template
// allows both the batched and non-batched MultiGet to call this with
// either an std::unordered_map or autovector of column families.
//
// If callback is non-null, the callback is refreshed with the snapshot
// sequence number
//
// A return value of true indicates that the SuperVersions were obtained
// from the ColumnFamilyData, whereas false indicates they are thread
// local
template <class T>
bool MultiCFSnapshot(
const ReadOptions& read_options, ReadCallback* callback,
std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
iter_deref_func,
T* cf_list, SequenceNumber* snapshot);
// The actual implementation of the batching MultiGet. The caller is expected
// to have acquired the SuperVersion and pass in a snapshot sequence number
// in order to construct the LookupKeys. The start_key and num_keys specify
// the range of keys in the sorted_keys vector for a single column family.
void MultiGetImpl(
const ReadOptions& read_options, size_t start_key, size_t num_keys,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback,
bool* is_blob_index);
// table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_;

@ -777,7 +777,8 @@ std::string DBTestBase::Get(int cf, const std::string& k,
std::vector<std::string> DBTestBase::MultiGet(std::vector<int> cfs,
const std::vector<std::string>& k,
const Snapshot* snapshot) {
const Snapshot* snapshot,
const bool batched) {
ReadOptions options;
options.verify_checksums = true;
options.snapshot = snapshot;
@ -789,12 +790,30 @@ std::vector<std::string> DBTestBase::MultiGet(std::vector<int> cfs,
handles.push_back(handles_[cfs[i]]);
keys.push_back(k[i]);
}
std::vector<Status> s = db_->MultiGet(options, handles, keys, &result);
for (unsigned int i = 0; i < s.size(); ++i) {
if (s[i].IsNotFound()) {
result[i] = "NOT_FOUND";
} else if (!s[i].ok()) {
result[i] = s[i].ToString();
std::vector<Status> s;
if (!batched) {
s = db_->MultiGet(options, handles, keys, &result);
for (unsigned int i = 0; i < s.size(); ++i) {
if (s[i].IsNotFound()) {
result[i] = "NOT_FOUND";
} else if (!s[i].ok()) {
result[i] = s[i].ToString();
}
}
} else {
std::vector<PinnableSlice> pin_values(cfs.size());
result.resize(cfs.size());
s.resize(cfs.size());
db_->MultiGet(options, cfs.size(), handles.data(), keys.data(),
pin_values.data(), s.data());
for (unsigned int i = 0; i < s.size(); ++i) {
if (s[i].IsNotFound()) {
result[i] = "NOT_FOUND";
} else if (!s[i].ok()) {
result[i] = s[i].ToString();
} else {
result[i].assign(pin_values[i].data(), pin_values[i].size());
}
}
}
return result;

@ -850,7 +850,8 @@ class DBTestBase : public testing::Test {
std::vector<std::string> MultiGet(std::vector<int> cfs,
const std::vector<std::string>& k,
const Snapshot* snapshot = nullptr);
const Snapshot* snapshot,
const bool batched);
std::vector<std::string> MultiGet(const std::vector<std::string>& k,
const Snapshot* snapshot = nullptr);

@ -490,6 +490,47 @@ class DB {
values++;
}
}
// Overloaded MultiGet API that improves performance by batching operations
// in the read path for greater efficiency. Currently, only the block based
// table format with full filters are supported. Other table formats such
// as plain table, block based table with block based filters and
// partitioned indexes will still work, but will not get any performance
// benefits.
// Parameters -
// options - ReadOptions
// column_family - ColumnFamilyHandle* that the keys belong to. All the keys
// passed to the API are restricted to a single column family
// num_keys - Number of keys to lookup
// keys - Pointer to C style array of key Slices with num_keys elements
// values - Pointer to C style array of PinnableSlices with num_keys elements
// statuses - Pointer to C style array of Status with num_keys elements
// sorted_input - If true, it means the input keys are already sorted by key
// order, so the MultiGet() API doesn't have to sort them
// again. If false, the keys will be copied and sorted
// internally by the API - the input array will not be
// modified
virtual void MultiGet(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool /*sorted_input*/ = false) {
std::vector<ColumnFamilyHandle*> cf;
std::vector<Slice> user_keys;
std::vector<Status> status;
std::vector<std::string> vals;
for (size_t i = 0; i < num_keys; ++i) {
cf.emplace_back(column_families[i]);
user_keys.emplace_back(keys[i]);
}
status = MultiGet(options, cf, user_keys, &vals);
std::copy(status.begin(), status.end(), statuses);
for (auto& value : vals) {
values->PinSelf(value);
values++;
}
}
// If the key definitely does not exist in the database, then this method
// returns false, else true. If the caller wants to obtain value when the key
// is found in memory, a bool for 'value_found' must be passed. 'value_found'

@ -5,6 +5,7 @@
#pragma once
#include <algorithm>
#include <array>
#include <string>
#include "db/lookup_key.h"
#include "db/merge_context.h"
@ -21,6 +22,7 @@ struct KeyContext {
LookupKey* lkey;
Slice ukey;
Slice ikey;
ColumnFamilyHandle* column_family;
Status* s;
MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq;
@ -29,9 +31,11 @@ struct KeyContext {
PinnableSlice* value;
GetContext* get_context;
KeyContext(const Slice& user_key, PinnableSlice* val, Status* stat)
KeyContext(ColumnFamilyHandle* col_family, const Slice& user_key,
PinnableSlice* val, Status* stat)
: key(&user_key),
lkey(nullptr),
column_family(col_family),
s(stat),
max_covering_tombstone_seq(0),
key_exists(false),
@ -85,10 +89,9 @@ class MultiGetContext {
// htat need to be performed
static const int MAX_BATCH_SIZE = 32;
MultiGetContext(KeyContext** sorted_keys, size_t num_keys,
SequenceNumber snapshot)
: sorted_keys_(sorted_keys),
num_keys_(num_keys),
MultiGetContext(autovector<KeyContext*, MAX_BATCH_SIZE>* sorted_keys,
size_t begin, size_t num_keys, SequenceNumber snapshot)
: num_keys_(num_keys),
value_mask_(0),
lookup_key_ptr_(reinterpret_cast<LookupKey*>(lookup_key_stack_buf)) {
int index = 0;
@ -100,6 +103,8 @@ class MultiGetContext {
}
for (size_t iter = 0; iter != num_keys_; ++iter) {
// autovector may not be contiguous storage, so make a copy
sorted_keys_[iter] = (*sorted_keys)[begin + iter];
sorted_keys_[iter]->lkey = new (&lookup_key_ptr_[index])
LookupKey(*sorted_keys_[iter]->key, snapshot);
sorted_keys_[iter]->ukey = sorted_keys_[iter]->lkey->user_key();
@ -118,7 +123,7 @@ class MultiGetContext {
static const int MAX_LOOKUP_KEYS_ON_STACK = 16;
alignas(alignof(LookupKey))
char lookup_key_stack_buf[sizeof(LookupKey) * MAX_LOOKUP_KEYS_ON_STACK];
KeyContext** sorted_keys_;
std::array<KeyContext*, MAX_BATCH_SIZE> sorted_keys_;
size_t num_keys_;
uint64_t value_mask_;
std::unique_ptr<char[]> lookup_key_heap_buf;

@ -963,6 +963,7 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
->immutable_db_options();
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
// To hold merges from the write batch
autovector<std::pair<WriteBatchWithIndexInternal::Result, MergeContext>,
MultiGetContext::MAX_BATCH_SIZE>
@ -1002,14 +1003,17 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
result == WriteBatchWithIndexInternal::Result::kNotFound);
key_context.emplace_back(keys[i], &values[i], &statuses[i]);
key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]);
sorted_keys.emplace_back(&key_context.back());
merges.emplace_back(result, std::move(merge_context));
}
// Did not find key in batch OR could not resolve Merges. Try DB.
static_cast_with_check<DBImpl, DB>(db->GetRootDB())
->MultiGetImpl(read_options, column_family, key_context, sorted_input,
callback);
->PrepareMultiGetKeys(key_context.size(), sorted_input, &sorted_keys);
static_cast_with_check<DBImpl, DB>(db->GetRootDB())
->MultiGetWithCallback(read_options, column_family, callback,
&sorted_keys);
ColumnFamilyHandleImpl* cfh =
reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);

Loading…
Cancel
Save