Add a new MultiGetEntity API (#11222)

Summary:
The new `MultiGetEntity` API can be used to get a consistent view of
a batch of keys, with the results presented as wide-column entities.
Similarly to `GetEntity` and the iterator's `columns` API, if the entry
corresponding to the key is a wide-column entity to start with, it is
returned as-is, and if it is a plain key-value, it is wrapped into an entity
with a single default column.

Implementation-wise, the new API shares the logic of the batched `MultiGet`
API (via the `MultiGetCommon` methods). Both single-CF and multi-CF
`MultiGetEntity` APIs are provided, and blobs are also supported.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11222

Test Plan: `make check`

Reviewed By: akankshamahajan15

Differential Revision: D43256950

Pulled By: ltamasi

fbshipit-source-id: 47fb2cb7e2d0470e3580f43fdb2fe9e51f0e7005
oxigraph-8.1.1
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent 6d5e8604f1
commit 9794acb597
  1. 43
      db/blob/db_blob_basic_test.cc
  2. 88
      db/db_impl/db_impl.cc
  3. 65
      db/db_impl/db_impl.h
  4. 18
      db/memtable.cc
  5. 77
      db/version_set.cc
  6. 12
      db/version_set.h
  7. 38
      db/version_set_sync_and_async.h
  8. 91
      db/wide/db_wide_basic_test.cc
  9. 62
      include/rocksdb/db.h
  10. 18
      include/rocksdb/utilities/stackable_db.h
  11. 2
      table/block_based/block_based_table_reader_test.cc
  12. 8
      table/multiget_context.h
  13. 3
      utilities/write_batch_with_index/write_batch_with_index.cc

@ -1782,16 +1782,49 @@ TEST_F(DBBlobBasicTest, GetEntityBlob) {
constexpr char key[] = "key"; constexpr char key[] = "key";
constexpr char blob_value[] = "blob_value"; constexpr char blob_value[] = "blob_value";
constexpr char other_key[] = "other_key";
constexpr char other_blob_value[] = "other_blob_value";
ASSERT_OK(Put(key, blob_value)); ASSERT_OK(Put(key, blob_value));
ASSERT_OK(Put(other_key, other_blob_value));
ASSERT_OK(Flush()); ASSERT_OK(Flush());
PinnableWideColumns result;
ASSERT_OK(
db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), key, &result));
WideColumns expected_columns{{kDefaultWideColumnName, blob_value}}; WideColumns expected_columns{{kDefaultWideColumnName, blob_value}};
ASSERT_EQ(result.columns(), expected_columns); WideColumns other_expected_columns{
{kDefaultWideColumnName, other_blob_value}};
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), key,
&result));
ASSERT_EQ(result.columns(), expected_columns);
}
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
other_key, &result));
ASSERT_EQ(result.columns(), other_expected_columns);
}
{
constexpr size_t num_keys = 2;
std::array<Slice, num_keys> keys{{key, other_key}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;
db_->MultiGetEntity(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
&keys[0], &results[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(results[0].columns(), expected_columns);
ASSERT_OK(statuses[1]);
ASSERT_EQ(results[1].columns(), other_expected_columns);
}
} }
class DBBlobWithTimestampTest : public DBBasicTestWithTimestampBase { class DBBlobWithTimestampTest : public DBBasicTestWithTimestampBase {

@ -2605,14 +2605,25 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys, ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, Status* statuses, PinnableSlice* values, Status* statuses,
const bool sorted_input) { const bool sorted_input) {
return MultiGet(read_options, num_keys, column_families, keys, values, MultiGet(read_options, num_keys, column_families, keys, values,
/*timestamps=*/nullptr, statuses, sorted_input); /* timestamps */ nullptr, statuses, sorted_input);
} }
void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys, ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, std::string* timestamps, PinnableSlice* values, std::string* timestamps,
Status* statuses, const bool sorted_input) { Status* statuses, const bool sorted_input) {
MultiGetCommon(read_options, num_keys, column_families, keys, values,
/* columns */ nullptr, timestamps, statuses, sorted_input);
}
void DBImpl::MultiGetCommon(const ReadOptions& read_options,
const size_t num_keys,
ColumnFamilyHandle** column_families,
const Slice* keys, PinnableSlice* values,
PinnableWideColumns* columns,
std::string* timestamps, Status* statuses,
const bool sorted_input) {
if (num_keys == 0) { if (num_keys == 0) {
return; return;
} }
@ -2658,8 +2669,20 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys; autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys); sorted_keys.resize(num_keys);
for (size_t i = 0; i < num_keys; ++i) { for (size_t i = 0; i < num_keys; ++i) {
values[i].Reset(); PinnableSlice* val = nullptr;
key_context.emplace_back(column_families[i], keys[i], &values[i], PinnableWideColumns* col = nullptr;
if (values) {
val = &values[i];
val->Reset();
} else {
assert(columns);
col = &columns[i];
col->Reset();
}
key_context.emplace_back(column_families[i], keys[i], val, col,
timestamps ? &timestamps[i] : nullptr, timestamps ? &timestamps[i] : nullptr,
&statuses[i]); &statuses[i]);
} }
@ -2783,8 +2806,8 @@ void DBImpl::MultiGet(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const size_t num_keys, ColumnFamilyHandle* column_family, const size_t num_keys,
const Slice* keys, PinnableSlice* values, const Slice* keys, PinnableSlice* values,
Status* statuses, const bool sorted_input) { Status* statuses, const bool sorted_input) {
return MultiGet(read_options, column_family, num_keys, keys, values, MultiGet(read_options, column_family, num_keys, keys, values,
/*timestamp=*/nullptr, statuses, sorted_input); /* timestamps */ nullptr, statuses, sorted_input);
} }
void DBImpl::MultiGet(const ReadOptions& read_options, void DBImpl::MultiGet(const ReadOptions& read_options,
@ -2792,6 +2815,16 @@ void DBImpl::MultiGet(const ReadOptions& read_options,
const Slice* keys, PinnableSlice* values, const Slice* keys, PinnableSlice* values,
std::string* timestamps, Status* statuses, std::string* timestamps, Status* statuses,
const bool sorted_input) { const bool sorted_input) {
MultiGetCommon(read_options, column_family, num_keys, keys, values,
/* columns */ nullptr, timestamps, statuses, sorted_input);
}
void DBImpl::MultiGetCommon(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, PinnableWideColumns* columns,
std::string* timestamps, Status* statuses,
bool sorted_input) {
if (tracer_) { if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when // TODO: This mutex should be removed later, to improve performance when
// tracing is enabled. // tracing is enabled.
@ -2805,8 +2838,20 @@ void DBImpl::MultiGet(const ReadOptions& read_options,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys; autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys); sorted_keys.resize(num_keys);
for (size_t i = 0; i < num_keys; ++i) { for (size_t i = 0; i < num_keys; ++i) {
values[i].Reset(); PinnableSlice* val = nullptr;
key_context.emplace_back(column_family, keys[i], &values[i], PinnableWideColumns* col = nullptr;
if (values) {
val = &values[i];
val->Reset();
} else {
assert(columns);
col = &columns[i];
col->Reset();
}
key_context.emplace_back(column_family, keys[i], val, col,
timestamps ? &timestamps[i] : nullptr, timestamps ? &timestamps[i] : nullptr,
&statuses[i]); &statuses[i]);
} }
@ -2968,8 +3013,17 @@ Status DBImpl::MultiGetImpl(
uint64_t bytes_read = 0; uint64_t bytes_read = 0;
for (size_t i = start_key; i < start_key + num_keys - keys_left; ++i) { for (size_t i = start_key; i < start_key + num_keys - keys_left; ++i) {
KeyContext* key = (*sorted_keys)[i]; KeyContext* key = (*sorted_keys)[i];
assert(key);
assert(key->s);
if (key->s->ok()) { if (key->s->ok()) {
bytes_read += key->value->size(); if (key->value) {
bytes_read += key->value->size();
} else {
assert(key->columns);
bytes_read += key->columns->serialized_size();
}
num_found++; num_found++;
} }
} }
@ -2993,6 +3047,22 @@ Status DBImpl::MultiGetImpl(
return s; return s;
} }
void DBImpl::MultiGetEntity(const ReadOptions& options, size_t num_keys,
ColumnFamilyHandle** column_families,
const Slice* keys, PinnableWideColumns* results,
Status* statuses, bool sorted_input) {
MultiGetCommon(options, num_keys, column_families, keys, /* values */ nullptr,
results, /* timestamps */ nullptr, statuses, sorted_input);
}
void DBImpl::MultiGetEntity(const ReadOptions& options,
ColumnFamilyHandle* column_family, size_t num_keys,
const Slice* keys, PinnableWideColumns* results,
Status* statuses, bool sorted_input) {
MultiGetCommon(options, column_family, num_keys, keys, /* values */ nullptr,
results, /* timestamps */ nullptr, statuses, sorted_input);
}
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family, const std::string& column_family,
ColumnFamilyHandle** handle) { ColumnFamilyHandle** handle) {

@ -277,33 +277,40 @@ class DBImpl : public DB {
// The values and statuses parameters are arrays with number of elements // The values and statuses parameters are arrays with number of elements
// equal to keys.size(). This allows the storage for those to be alloacted // equal to keys.size(). This allows the storage for those to be alloacted
// by the caller on the stack for small batches // by the caller on the stack for small batches
virtual void MultiGet(const ReadOptions& options, void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family,
ColumnFamilyHandle* column_family, const size_t num_keys, const Slice* keys, PinnableSlice* values,
const size_t num_keys, const Slice* keys, Status* statuses, const bool sorted_input = false) override;
PinnableSlice* values, Status* statuses, void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family,
const bool sorted_input = false) override; const size_t num_keys, const Slice* keys, PinnableSlice* values,
virtual void MultiGet(const ReadOptions& options, std::string* timestamps, Status* statuses,
ColumnFamilyHandle* column_family, const bool sorted_input = false) override;
const size_t num_keys, const Slice* keys,
PinnableSlice* values, std::string* timestamps, void MultiGet(const ReadOptions& options, const size_t num_keys,
Status* statuses, ColumnFamilyHandle** column_families, const Slice* keys,
const bool sorted_input = false) override; PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override;
virtual void MultiGet(const ReadOptions& options, const size_t num_keys, void MultiGet(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys, ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, Status* statuses, PinnableSlice* values, std::string* timestamps,
const bool sorted_input = false) override; Status* statuses, const bool sorted_input = false) override;
virtual void MultiGet(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys, void MultiGetWithCallback(
PinnableSlice* values, std::string* timestamps,
Status* statuses,
const bool sorted_input = false) override;
virtual void MultiGetWithCallback(
const ReadOptions& options, ColumnFamilyHandle* column_family, const ReadOptions& options, ColumnFamilyHandle* column_family,
ReadCallback* callback, ReadCallback* callback,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys); autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys);
using DB::MultiGetEntity;
void MultiGetEntity(const ReadOptions& options,
ColumnFamilyHandle* column_family, size_t num_keys,
const Slice* keys, PinnableWideColumns* results,
Status* statuses, bool sorted_input) override;
void MultiGetEntity(const ReadOptions& options, size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableWideColumns* results, Status* statuses,
bool sorted_input) override;
virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options, virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family, const std::string& column_family,
ColumnFamilyHandle** handle) override; ColumnFamilyHandle** handle) override;
@ -2191,6 +2198,18 @@ class DBImpl : public DB {
const size_t num_keys, bool sorted, const size_t num_keys, bool sorted,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* key_ptrs); autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* key_ptrs);
void MultiGetCommon(const ReadOptions& options,
ColumnFamilyHandle* column_family, const size_t num_keys,
const Slice* keys, PinnableSlice* values,
PinnableWideColumns* columns, std::string* timestamps,
Status* statuses, bool sorted_input);
void MultiGetCommon(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, PinnableWideColumns* columns,
std::string* timestamps, Status* statuses,
bool sorted_input);
// A structure to hold the information required to process MultiGet of keys // A structure to hold the information required to process MultiGet of keys
// belonging to one column family. For a multi column family MultiGet, there // belonging to one column family. For a multi column family MultiGet, there
// will be a container of these objects. // will be a container of these objects.

@ -1452,18 +1452,24 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
} }
SequenceNumber dummy_seq; SequenceNumber dummy_seq;
GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true, GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
callback, &iter->is_blob_index, iter->value->GetSelf(), callback, &iter->is_blob_index,
/*columns=*/nullptr, iter->timestamp, iter->s, iter->value ? iter->value->GetSelf() : nullptr, iter->columns,
&(iter->merge_context), &dummy_seq, &found_final_value, iter->timestamp, iter->s, &(iter->merge_context), &dummy_seq,
&merge_in_progress); &found_final_value, &merge_in_progress);
if (!found_final_value && merge_in_progress) { if (!found_final_value && merge_in_progress) {
*(iter->s) = Status::MergeInProgress(); *(iter->s) = Status::MergeInProgress();
} }
if (found_final_value) { if (found_final_value) {
iter->value->PinSelf(); if (iter->value) {
range->AddValueSize(iter->value->size()); iter->value->PinSelf();
range->AddValueSize(iter->value->size());
} else {
assert(iter->columns);
range->AddValueSize(iter->columns->serialized_size());
}
range->MarkKeyDone(iter); range->MarkKeyDone(iter);
RecordTick(moptions_.statistics, MEMTABLE_HIT); RecordTick(moptions_.statistics, MEMTABLE_HIT);
if (range->GetValueSize() > read_options.value_size_soft_limit) { if (range->GetValueSize() > read_options.value_size_soft_limit) {

@ -23,7 +23,6 @@
#include "db/blob/blob_fetcher.h" #include "db/blob/blob_fetcher.h"
#include "db/blob/blob_file_cache.h" #include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h" #include "db/blob/blob_file_reader.h"
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h" #include "db/blob/blob_log_format.h"
#include "db/blob/blob_source.h" #include "db/blob/blob_source.h"
#include "db/compaction/compaction.h" #include "db/compaction/compaction.h"
@ -2177,26 +2176,35 @@ void Version::MultiGetBlob(
autovector<BlobReadRequest> blob_reqs_in_file; autovector<BlobReadRequest> blob_reqs_in_file;
BlobReadContexts& blobs_in_file = ctx.second; BlobReadContexts& blobs_in_file = ctx.second;
for (const auto& blob : blobs_in_file) { for (auto& blob : blobs_in_file) {
const BlobIndex& blob_index = blob.first; const BlobIndex& blob_index = blob.blob_index;
const KeyContext& key_context = blob.second; const KeyContext* const key_context = blob.key_context;
assert(key_context);
assert(key_context->get_context);
assert(key_context->s);
if (key_context->value) {
key_context->value->Reset();
} else {
assert(key_context->columns);
key_context->columns->Reset();
}
if (!blob_file_meta) { if (!blob_file_meta) {
*key_context.s = Status::Corruption("Invalid blob file number"); *key_context->s = Status::Corruption("Invalid blob file number");
continue; continue;
} }
if (blob_index.HasTTL() || blob_index.IsInlined()) { if (blob_index.HasTTL() || blob_index.IsInlined()) {
*key_context.s = *key_context->s =
Status::Corruption("Unexpected TTL/inlined blob index"); Status::Corruption("Unexpected TTL/inlined blob index");
continue; continue;
} }
key_context.value->Reset();
blob_reqs_in_file.emplace_back( blob_reqs_in_file.emplace_back(
key_context.get_context->ukey_to_get_blob_value(), key_context->get_context->ukey_to_get_blob_value(),
blob_index.offset(), blob_index.size(), blob_index.compression(), blob_index.offset(), blob_index.size(), blob_index.compression(),
key_context.value, key_context.s); &blob.result, key_context->s);
} }
if (blob_reqs_in_file.size() > 0) { if (blob_reqs_in_file.size() > 0) {
const auto file_size = blob_file_meta->GetBlobFileSize(); const auto file_size = blob_file_meta->GetBlobFileSize();
@ -2211,18 +2219,29 @@ void Version::MultiGetBlob(
for (auto& ctx : blob_ctxs) { for (auto& ctx : blob_ctxs) {
BlobReadContexts& blobs_in_file = ctx.second; BlobReadContexts& blobs_in_file = ctx.second;
for (const auto& blob : blobs_in_file) { for (auto& blob : blobs_in_file) {
const KeyContext& key_context = blob.second; const KeyContext* const key_context = blob.key_context;
if (key_context.s->ok()) { assert(key_context);
range.AddValueSize(key_context.value->size()); assert(key_context->get_context);
assert(key_context->s);
if (key_context->s->ok()) {
if (key_context->value) {
*key_context->value = std::move(blob.result);
range.AddValueSize(key_context->value->size());
} else {
assert(key_context->columns);
key_context->columns->SetPlainValue(blob.result);
range.AddValueSize(key_context->columns->serialized_size());
}
if (range.GetValueSize() > read_options.value_size_soft_limit) { if (range.GetValueSize() > read_options.value_size_soft_limit) {
*key_context.s = Status::Aborted(); *key_context->s = Status::Aborted();
} }
} else if (key_context.s->IsIncomplete()) { } else if (key_context->s->IsIncomplete()) {
// read_options.read_tier == kBlockCacheTier // read_options.read_tier == kBlockCacheTier
// Cannot read blob(s): no disk I/O allowed // Cannot read blob(s): no disk I/O allowed
assert(key_context.get_context); auto& get_context = *(key_context->get_context);
auto& get_context = *(key_context.get_context);
get_context.MarkKeyMayExist(); get_context.MarkKeyMayExist();
} }
} }
@ -2457,7 +2476,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
get_ctx.emplace_back( get_ctx.emplace_back(
user_comparator(), merge_operator_, info_log_, db_statistics_, user_comparator(), merge_operator_, info_log_, db_statistics_,
iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge,
iter->ukey_with_ts, iter->value, /*columns=*/nullptr, iter->timestamp, iter->ukey_with_ts, iter->value, iter->columns, iter->timestamp,
nullptr, &(iter->merge_context), true, nullptr, &(iter->merge_context), true,
&iter->max_covering_tombstone_seq, clock_, nullptr, &iter->max_covering_tombstone_seq, clock_, nullptr,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, merge_operator_ ? &pinned_iters_mgr : nullptr, callback,
@ -2657,23 +2676,29 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
} }
// merge_operands are in saver and we hit the beginning of the key history // merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands; // do a final merge of nullptr and operands;
std::string* str_value = std::string result;
iter->value != nullptr ? iter->value->GetSelf() : nullptr;
// `op_failure_scope` (an output parameter) is not provided (set to // `op_failure_scope` (an output parameter) is not provided (set to
// nullptr) since a failure must be propagated regardless of its value. // nullptr) since a failure must be propagated regardless of its value.
*status = MergeHelper::TimedFullMerge( *status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(), merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
str_value, info_log_, db_statistics_, clock_, &result, info_log_, db_statistics_, clock_,
/* result_operand */ nullptr, /* update_num_ops_stats */ true, /* result_operand */ nullptr, /* update_num_ops_stats */ true,
/* op_failure_scope */ nullptr); /* op_failure_scope */ nullptr);
if (LIKELY(iter->value != nullptr)) { if (LIKELY(iter->value != nullptr)) {
*iter->value->GetSelf() = std::move(result);
iter->value->PinSelf(); iter->value->PinSelf();
range->AddValueSize(iter->value->size()); range->AddValueSize(iter->value->size());
range->MarkKeyDone(iter); } else {
if (range->GetValueSize() > read_options.value_size_soft_limit) { assert(iter->columns);
s = Status::Aborted(); iter->columns->SetPlainValue(result);
break; range->AddValueSize(iter->columns->serialized_size());
} }
range->MarkKeyDone(iter);
if (range->GetValueSize() > read_options.value_size_soft_limit) {
s = Status::Aborted();
break;
} }
} else { } else {
range->MarkKeyDone(iter); range->MarkKeyDone(iter);

@ -33,6 +33,7 @@
#include "cache/cache_helpers.h" #include "cache/cache_helpers.h"
#include "db/blob/blob_file_meta.h" #include "db/blob/blob_file_meta.h"
#include "db/blob/blob_index.h"
#include "db/column_family.h" #include "db/column_family.h"
#include "db/compaction/compaction.h" #include "db/compaction/compaction.h"
#include "db/compaction/compaction_picker.h" #include "db/compaction/compaction_picker.h"
@ -889,8 +890,15 @@ class Version {
FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
uint64_t* bytes_read) const; uint64_t* bytes_read) const;
using BlobReadContext = struct BlobReadContext {
std::pair<BlobIndex, std::reference_wrapper<const KeyContext>>; BlobReadContext(const BlobIndex& blob_idx, const KeyContext* key_ctx)
: blob_index(blob_idx), key_context(key_ctx) {}
BlobIndex blob_index;
const KeyContext* key_context;
PinnableSlice result;
};
using BlobReadContexts = std::vector<BlobReadContext>; using BlobReadContexts = std::vector<BlobReadContext>;
void MultiGetBlob(const ReadOptions& read_options, MultiGetRange& range, void MultiGetBlob(const ReadOptions& read_options, MultiGetRange& range,
std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs); std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs);

@ -101,23 +101,39 @@ DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
file_range.MarkKeyDone(iter); file_range.MarkKeyDone(iter);
if (iter->is_blob_index) { if (iter->is_blob_index) {
BlobIndex blob_index;
Status tmp_s;
if (iter->value) { if (iter->value) {
TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex", TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
&(*iter)); &(*iter));
const Slice& blob_index_slice = *(iter->value); tmp_s = blob_index.DecodeFrom(*(iter->value));
BlobIndex blob_index;
Status tmp_s = blob_index.DecodeFrom(blob_index_slice); } else {
if (tmp_s.ok()) { assert(iter->columns);
const uint64_t blob_file_num = blob_index.file_number(); assert(!iter->columns->columns().empty());
blob_ctxs[blob_file_num].emplace_back( assert(iter->columns->columns().front().name() ==
std::make_pair(blob_index, std::cref(*iter))); kDefaultWideColumnName);
} else {
*(iter->s) = tmp_s; tmp_s =
} blob_index.DecodeFrom(iter->columns->columns().front().value());
}
if (tmp_s.ok()) {
const uint64_t blob_file_num = blob_index.file_number();
blob_ctxs[blob_file_num].emplace_back(blob_index, &*iter);
} else {
*(iter->s) = tmp_s;
} }
} else { } else {
file_range.AddValueSize(iter->value->size()); if (iter->value) {
file_range.AddValueSize(iter->value->size());
} else {
assert(iter->columns);
file_range.AddValueSize(iter->columns->serialized_size());
}
if (file_range.GetValueSize() > read_options.value_size_soft_limit) { if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
s = Status::Aborted(); s = Status::Aborted();
break; break;

@ -105,6 +105,26 @@ TEST_F(DBWideBasicTest, PutEntity) {
ASSERT_EQ(values[2], third_value); ASSERT_EQ(values[2], third_value);
} }
{
constexpr size_t num_keys = 3;
std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;
db_->MultiGetEntity(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
&keys[0], &results[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(results[0].columns(), first_columns);
ASSERT_OK(statuses[1]);
ASSERT_EQ(results[1].columns(), second_columns);
ASSERT_OK(statuses[2]);
ASSERT_EQ(results[2].columns(), expected_third_columns);
}
{ {
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
@ -210,6 +230,40 @@ TEST_F(DBWideBasicTest, PutEntityColumnFamily) {
ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(db_->Write(WriteOptions(), &batch));
} }
TEST_F(DBWideBasicTest, MultiCFMultiGetEntity) {
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"corinthian"}, options);
constexpr char first_key[] = "first";
WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}};
ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
first_key, first_columns));
constexpr char second_key[] = "second";
WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};
ASSERT_OK(
db_->PutEntity(WriteOptions(), handles_[1], second_key, second_columns));
constexpr size_t num_keys = 2;
std::array<ColumnFamilyHandle*, num_keys> column_families{
{db_->DefaultColumnFamily(), handles_[1]}};
std::array<Slice, num_keys> keys{{first_key, second_key}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;
db_->MultiGetEntity(ReadOptions(), num_keys, &column_families[0], &keys[0],
&results[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(results[0].columns(), first_columns);
ASSERT_OK(statuses[1]);
ASSERT_EQ(results[1].columns(), second_columns);
}
TEST_F(DBWideBasicTest, MergePlainKeyValue) { TEST_F(DBWideBasicTest, MergePlainKeyValue) {
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.create_if_missing = true; options.create_if_missing = true;
@ -280,6 +334,26 @@ TEST_F(DBWideBasicTest, MergePlainKeyValue) {
ASSERT_EQ(result.columns(), expected_third_columns); ASSERT_EQ(result.columns(), expected_third_columns);
} }
{
constexpr size_t num_keys = 3;
std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;
db_->MultiGetEntity(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
&keys[0], &results[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(results[0].columns(), expected_first_columns);
ASSERT_OK(statuses[1]);
ASSERT_EQ(results[1].columns(), expected_second_columns);
ASSERT_OK(statuses[2]);
ASSERT_EQ(results[2].columns(), expected_third_columns);
}
{ {
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
@ -457,6 +531,23 @@ TEST_F(DBWideBasicTest, MergeEntity) {
ASSERT_OK(statuses[1]); ASSERT_OK(statuses[1]);
} }
{
constexpr size_t num_keys = 2;
std::array<Slice, num_keys> keys{{first_key, second_key}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;
db_->MultiGetEntity(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
&keys[0], &results[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(results[0].columns(), first_expected_columns);
ASSERT_OK(statuses[1]);
ASSERT_EQ(results[1].columns(), second_expected_columns);
}
{ {
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));

@ -769,6 +769,68 @@ class DB {
} }
} }
// Batched MultiGet-like API that returns wide-column entities from a single
// column family. For any given "key[i]" in "keys" (where 0 <= "i" <
// "num_keys"), if the column family specified by "column_family" contains an
// entry, it is returned it as a wide-column entity in "results[i]". If the
// entry is a wide-column entity, it is returned as-is; if it is a plain
// key-value, it is returned as an entity with a single anonymous column (see
// kDefaultWideColumnName) which contains the value.
//
// "statuses[i]" is set to OK if "keys[i]" is successfully retrieved. It is
// set to NotFound and an empty wide-column entity is returned in "results[i]"
// if there is no entry for "keys[i]". Finally, "statuses[i]" is set to some
// other non-OK status on error.
//
// If "keys" are sorted according to the column family's comparator, the
// "sorted_input" flag can be set for a small performance improvement.
//
// Note that it is the caller's responsibility to ensure that "keys",
// "results", and "statuses" point to "num_keys" number of contiguous objects
// (Slices, PinnableWideColumns, and Statuses respectively).
virtual void MultiGetEntity(const ReadOptions& /* options */,
ColumnFamilyHandle* /* column_family */,
size_t num_keys, const Slice* /* keys */,
PinnableWideColumns* /* results */,
Status* statuses,
bool /* sorted_input */ = false) {
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = Status::NotSupported("MultiGetEntity not supported");
}
}
// Batched MultiGet-like API that returns wide-column entities potentially
// from multiple column families. For any given "key[i]" in "keys" (where 0 <=
// "i" < "num_keys"), if the column family specified by "column_families[i]"
// contains an entry, it is returned it as a wide-column entity in
// "results[i]". If the entry is a wide-column entity, it is returned as-is;
// if it is a plain key-value, it is returned as an entity with a single
// anonymous column (see kDefaultWideColumnName) which contains the value.
//
// "statuses[i]" is set to OK if "keys[i]" is successfully retrieved. It is
// set to NotFound and an empty wide-column entity is returned in "results[i]"
// if there is no entry for "keys[i]". Finally, "statuses[i]" is set to some
// other non-OK status on error.
//
// If "keys" are sorted by column family id and within each column family,
// according to the column family's comparator, the "sorted_input" flag can be
// set for a small performance improvement.
//
// Note that it is the caller's responsibility to ensure that
// "column_families", "keys", "results", and "statuses" point to "num_keys"
// number of contiguous objects (ColumnFamilyHandle pointers, Slices,
// PinnableWideColumns, and Statuses respectively).
virtual void MultiGetEntity(const ReadOptions& /* options */, size_t num_keys,
ColumnFamilyHandle** /* column_families */,
const Slice* /* keys */,
PinnableWideColumns* /* results */,
Status* statuses,
bool /* sorted_input */ = false) {
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = Status::NotSupported("MultiGetEntity not supported");
}
}
// If the key definitely does not exist in the database, then this method // If the key definitely does not exist in the database, then this method
// returns false, else true. If the caller wants to obtain value when the key // returns false, else true. If the caller wants to obtain value when the key
// is found in memory, a bool for 'value_found' must be passed. 'value_found' // is found in memory, a bool for 'value_found' must be passed. 'value_found'

@ -136,6 +136,24 @@ class StackableDB : public DB {
statuses, sorted_input); statuses, sorted_input);
} }
using DB::MultiGetEntity;
void MultiGetEntity(const ReadOptions& options,
ColumnFamilyHandle* column_family, size_t num_keys,
const Slice* keys, PinnableWideColumns* results,
Status* statuses, bool sorted_input) override {
db_->MultiGetEntity(options, column_family, num_keys, keys, results,
statuses, sorted_input);
}
void MultiGetEntity(const ReadOptions& options, size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableWideColumns* results, Status* statuses,
bool sorted_input) override {
db_->MultiGetEntity(options, num_keys, column_families, keys, results,
statuses, sorted_input);
}
using DB::IngestExternalFile; using DB::IngestExternalFile;
virtual Status IngestExternalFile( virtual Status IngestExternalFile(
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,

@ -253,7 +253,7 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) {
nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr,
true /* do_merge */, nullptr, nullptr, nullptr, true /* do_merge */, nullptr, nullptr, nullptr,
nullptr, nullptr, nullptr); nullptr, nullptr, nullptr);
key_context.emplace_back(nullptr, keys[i], &values[i], nullptr, key_context.emplace_back(nullptr, keys[i], &values[i], nullptr, nullptr,
&statuses.back()); &statuses.back());
key_context.back().get_context = &get_context.back(); key_context.back().get_context = &get_context.back();
} }

@ -22,6 +22,7 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class GetContext; class GetContext;
class PinnableWideColumns;
struct KeyContext { struct KeyContext {
const Slice* key; const Slice* key;
@ -37,11 +38,13 @@ struct KeyContext {
bool is_blob_index; bool is_blob_index;
void* cb_arg; void* cb_arg;
PinnableSlice* value; PinnableSlice* value;
PinnableWideColumns* columns;
std::string* timestamp; std::string* timestamp;
GetContext* get_context; GetContext* get_context;
KeyContext(ColumnFamilyHandle* col_family, const Slice& user_key, KeyContext(ColumnFamilyHandle* col_family, const Slice& user_key,
PinnableSlice* val, std::string* ts, Status* stat) PinnableSlice* val, PinnableWideColumns* cols, std::string* ts,
Status* stat)
: key(&user_key), : key(&user_key),
lkey(nullptr), lkey(nullptr),
column_family(col_family), column_family(col_family),
@ -51,10 +54,9 @@ struct KeyContext {
is_blob_index(false), is_blob_index(false),
cb_arg(nullptr), cb_arg(nullptr),
value(val), value(val),
columns(cols),
timestamp(ts), timestamp(ts),
get_context(nullptr) {} get_context(nullptr) {}
KeyContext() = default;
}; };
// The MultiGetContext class is a container for the sorted list of keys that // The MultiGetContext class is a container for the sorted list of keys that

@ -617,7 +617,8 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
assert(result == WBWIIteratorImpl::kMergeInProgress || assert(result == WBWIIteratorImpl::kMergeInProgress ||
result == WBWIIteratorImpl::kNotFound); result == WBWIIteratorImpl::kNotFound);
key_context.emplace_back(column_family, keys[i], &values[i], key_context.emplace_back(column_family, keys[i], &values[i],
/*timestamp*/ nullptr, &statuses[i]); /* columns */ nullptr, /* timestamp */ nullptr,
&statuses[i]);
merges.emplace_back(result, std::move(merge_context)); merges.emplace_back(result, std::move(merge_context));
} }

Loading…
Cancel
Save