diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc index d4b95f7f0..690a19775 100644 --- a/db/blob/db_blob_basic_test.cc +++ b/db/blob/db_blob_basic_test.cc @@ -1782,16 +1782,49 @@ TEST_F(DBBlobBasicTest, GetEntityBlob) { constexpr char key[] = "key"; constexpr char blob_value[] = "blob_value"; + constexpr char other_key[] = "other_key"; + constexpr char other_blob_value[] = "other_blob_value"; + ASSERT_OK(Put(key, blob_value)); + ASSERT_OK(Put(other_key, other_blob_value)); ASSERT_OK(Flush()); - PinnableWideColumns result; - ASSERT_OK( - db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), key, &result)); - WideColumns expected_columns{{kDefaultWideColumnName, blob_value}}; - ASSERT_EQ(result.columns(), expected_columns); + WideColumns other_expected_columns{ + {kDefaultWideColumnName, other_blob_value}}; + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), key, + &result)); + ASSERT_EQ(result.columns(), expected_columns); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + other_key, &result)); + + ASSERT_EQ(result.columns(), other_expected_columns); + } + + { + constexpr size_t num_keys = 2; + + std::array keys{{key, other_key}}; + std::array results; + std::array statuses; + + db_->MultiGetEntity(ReadOptions(), db_->DefaultColumnFamily(), num_keys, + &keys[0], &results[0], &statuses[0]); + + ASSERT_OK(statuses[0]); + ASSERT_EQ(results[0].columns(), expected_columns); + + ASSERT_OK(statuses[1]); + ASSERT_EQ(results[1].columns(), other_expected_columns); + } } class DBBlobWithTimestampTest : public DBBasicTestWithTimestampBase { diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 562f27e0b..36bf1f9cd 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2605,14 +2605,25 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, ColumnFamilyHandle** column_families, const Slice* keys, PinnableSlice* values, Status* statuses, const bool sorted_input) { - return MultiGet(read_options, num_keys, column_families, keys, values, - /*timestamps=*/nullptr, statuses, sorted_input); + MultiGet(read_options, num_keys, column_families, keys, values, + /* timestamps */ nullptr, statuses, sorted_input); } void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, ColumnFamilyHandle** column_families, const Slice* keys, PinnableSlice* values, std::string* timestamps, Status* statuses, const bool sorted_input) { + MultiGetCommon(read_options, num_keys, column_families, keys, values, + /* columns */ nullptr, timestamps, statuses, sorted_input); +} + +void DBImpl::MultiGetCommon(const ReadOptions& read_options, + const size_t num_keys, + ColumnFamilyHandle** column_families, + const Slice* keys, PinnableSlice* values, + PinnableWideColumns* columns, + std::string* timestamps, Status* statuses, + const bool sorted_input) { if (num_keys == 0) { return; } @@ -2658,8 +2669,20 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, autovector sorted_keys; sorted_keys.resize(num_keys); for (size_t i = 0; i < num_keys; ++i) { - values[i].Reset(); - key_context.emplace_back(column_families[i], keys[i], &values[i], + PinnableSlice* val = nullptr; + PinnableWideColumns* col = nullptr; + + if (values) { + val = &values[i]; + val->Reset(); + } else { + assert(columns); + + col = &columns[i]; + col->Reset(); + } + + key_context.emplace_back(column_families[i], keys[i], val, col, timestamps ? ×tamps[i] : nullptr, &statuses[i]); } @@ -2783,8 +2806,8 @@ void DBImpl::MultiGet(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const size_t num_keys, const Slice* keys, PinnableSlice* values, Status* statuses, const bool sorted_input) { - return MultiGet(read_options, column_family, num_keys, keys, values, - /*timestamp=*/nullptr, statuses, sorted_input); + MultiGet(read_options, column_family, num_keys, keys, values, + /* timestamps */ nullptr, statuses, sorted_input); } void DBImpl::MultiGet(const ReadOptions& read_options, @@ -2792,6 +2815,16 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const Slice* keys, PinnableSlice* values, std::string* timestamps, Status* statuses, const bool sorted_input) { + MultiGetCommon(read_options, column_family, num_keys, keys, values, + /* columns */ nullptr, timestamps, statuses, sorted_input); +} + +void DBImpl::MultiGetCommon(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, + PinnableSlice* values, PinnableWideColumns* columns, + std::string* timestamps, Status* statuses, + bool sorted_input) { if (tracer_) { // TODO: This mutex should be removed later, to improve performance when // tracing is enabled. @@ -2805,8 +2838,20 @@ void DBImpl::MultiGet(const ReadOptions& read_options, autovector sorted_keys; sorted_keys.resize(num_keys); for (size_t i = 0; i < num_keys; ++i) { - values[i].Reset(); - key_context.emplace_back(column_family, keys[i], &values[i], + PinnableSlice* val = nullptr; + PinnableWideColumns* col = nullptr; + + if (values) { + val = &values[i]; + val->Reset(); + } else { + assert(columns); + + col = &columns[i]; + col->Reset(); + } + + key_context.emplace_back(column_family, keys[i], val, col, timestamps ? ×tamps[i] : nullptr, &statuses[i]); } @@ -2968,8 +3013,17 @@ Status DBImpl::MultiGetImpl( uint64_t bytes_read = 0; for (size_t i = start_key; i < start_key + num_keys - keys_left; ++i) { KeyContext* key = (*sorted_keys)[i]; + assert(key); + assert(key->s); + if (key->s->ok()) { - bytes_read += key->value->size(); + if (key->value) { + bytes_read += key->value->size(); + } else { + assert(key->columns); + bytes_read += key->columns->serialized_size(); + } + num_found++; } } @@ -2993,6 +3047,22 @@ Status DBImpl::MultiGetImpl( return s; } +void DBImpl::MultiGetEntity(const ReadOptions& options, size_t num_keys, + ColumnFamilyHandle** column_families, + const Slice* keys, PinnableWideColumns* results, + Status* statuses, bool sorted_input) { + MultiGetCommon(options, num_keys, column_families, keys, /* values */ nullptr, + results, /* timestamps */ nullptr, statuses, sorted_input); +} + +void DBImpl::MultiGetEntity(const ReadOptions& options, + ColumnFamilyHandle* column_family, size_t num_keys, + const Slice* keys, PinnableWideColumns* results, + Status* statuses, bool sorted_input) { + MultiGetCommon(options, column_family, num_keys, keys, /* values */ nullptr, + results, /* timestamps */ nullptr, statuses, sorted_input); +} + Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, const std::string& column_family, ColumnFamilyHandle** handle) { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 75e048e7f..309021d74 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -277,33 +277,40 @@ class DBImpl : public DB { // The values and statuses parameters are arrays with number of elements // equal to keys.size(). This allows the storage for those to be alloacted // by the caller on the stack for small batches - virtual void MultiGet(const ReadOptions& options, - ColumnFamilyHandle* column_family, - const size_t num_keys, const Slice* keys, - PinnableSlice* values, Status* statuses, - const bool sorted_input = false) override; - virtual void MultiGet(const ReadOptions& options, - ColumnFamilyHandle* column_family, - const size_t num_keys, const Slice* keys, - PinnableSlice* values, std::string* timestamps, - Status* statuses, - const bool sorted_input = false) override; - - virtual void MultiGet(const ReadOptions& options, const size_t num_keys, - ColumnFamilyHandle** column_families, const Slice* keys, - PinnableSlice* values, Status* statuses, - const bool sorted_input = false) override; - virtual void MultiGet(const ReadOptions& options, const size_t num_keys, - ColumnFamilyHandle** column_families, const Slice* keys, - PinnableSlice* values, std::string* timestamps, - Status* statuses, - const bool sorted_input = false) override; - - virtual void MultiGetWithCallback( + void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, PinnableSlice* values, + Status* statuses, const bool sorted_input = false) override; + void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, PinnableSlice* values, + std::string* timestamps, Status* statuses, + const bool sorted_input = false) override; + + void MultiGet(const ReadOptions& options, const size_t num_keys, + ColumnFamilyHandle** column_families, const Slice* keys, + PinnableSlice* values, Status* statuses, + const bool sorted_input = false) override; + void MultiGet(const ReadOptions& options, const size_t num_keys, + ColumnFamilyHandle** column_families, const Slice* keys, + PinnableSlice* values, std::string* timestamps, + Status* statuses, const bool sorted_input = false) override; + + void MultiGetWithCallback( const ReadOptions& options, ColumnFamilyHandle* column_family, ReadCallback* callback, autovector* sorted_keys); + using DB::MultiGetEntity; + + void MultiGetEntity(const ReadOptions& options, + ColumnFamilyHandle* column_family, size_t num_keys, + const Slice* keys, PinnableWideColumns* results, + Status* statuses, bool sorted_input) override; + + void MultiGetEntity(const ReadOptions& options, size_t num_keys, + ColumnFamilyHandle** column_families, const Slice* keys, + PinnableWideColumns* results, Status* statuses, + bool sorted_input) override; + virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options, const std::string& column_family, ColumnFamilyHandle** handle) override; @@ -2191,6 +2198,18 @@ class DBImpl : public DB { const size_t num_keys, bool sorted, autovector* key_ptrs); + void MultiGetCommon(const ReadOptions& options, + ColumnFamilyHandle* column_family, const size_t num_keys, + const Slice* keys, PinnableSlice* values, + PinnableWideColumns* columns, std::string* timestamps, + Status* statuses, bool sorted_input); + + void MultiGetCommon(const ReadOptions& options, const size_t num_keys, + ColumnFamilyHandle** column_families, const Slice* keys, + PinnableSlice* values, PinnableWideColumns* columns, + std::string* timestamps, Status* statuses, + bool sorted_input); + // A structure to hold the information required to process MultiGet of keys // belonging to one column family. For a multi column family MultiGet, there // will be a container of these objects. diff --git a/db/memtable.cc b/db/memtable.cc index fae4fc972..5cb84cfed 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -1452,18 +1452,24 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, } SequenceNumber dummy_seq; GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true, - callback, &iter->is_blob_index, iter->value->GetSelf(), - /*columns=*/nullptr, iter->timestamp, iter->s, - &(iter->merge_context), &dummy_seq, &found_final_value, - &merge_in_progress); + callback, &iter->is_blob_index, + iter->value ? iter->value->GetSelf() : nullptr, iter->columns, + iter->timestamp, iter->s, &(iter->merge_context), &dummy_seq, + &found_final_value, &merge_in_progress); if (!found_final_value && merge_in_progress) { *(iter->s) = Status::MergeInProgress(); } if (found_final_value) { - iter->value->PinSelf(); - range->AddValueSize(iter->value->size()); + if (iter->value) { + iter->value->PinSelf(); + range->AddValueSize(iter->value->size()); + } else { + assert(iter->columns); + range->AddValueSize(iter->columns->serialized_size()); + } + range->MarkKeyDone(iter); RecordTick(moptions_.statistics, MEMTABLE_HIT); if (range->GetValueSize() > read_options.value_size_soft_limit) { diff --git a/db/version_set.cc b/db/version_set.cc index fd0c8e06b..d0ade94d5 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -23,7 +23,6 @@ #include "db/blob/blob_fetcher.h" #include "db/blob/blob_file_cache.h" #include "db/blob/blob_file_reader.h" -#include "db/blob/blob_index.h" #include "db/blob/blob_log_format.h" #include "db/blob/blob_source.h" #include "db/compaction/compaction.h" @@ -2177,26 +2176,35 @@ void Version::MultiGetBlob( autovector blob_reqs_in_file; BlobReadContexts& blobs_in_file = ctx.second; - for (const auto& blob : blobs_in_file) { - const BlobIndex& blob_index = blob.first; - const KeyContext& key_context = blob.second; + for (auto& blob : blobs_in_file) { + const BlobIndex& blob_index = blob.blob_index; + const KeyContext* const key_context = blob.key_context; + assert(key_context); + assert(key_context->get_context); + assert(key_context->s); + + if (key_context->value) { + key_context->value->Reset(); + } else { + assert(key_context->columns); + key_context->columns->Reset(); + } if (!blob_file_meta) { - *key_context.s = Status::Corruption("Invalid blob file number"); + *key_context->s = Status::Corruption("Invalid blob file number"); continue; } if (blob_index.HasTTL() || blob_index.IsInlined()) { - *key_context.s = + *key_context->s = Status::Corruption("Unexpected TTL/inlined blob index"); continue; } - key_context.value->Reset(); blob_reqs_in_file.emplace_back( - key_context.get_context->ukey_to_get_blob_value(), + key_context->get_context->ukey_to_get_blob_value(), blob_index.offset(), blob_index.size(), blob_index.compression(), - key_context.value, key_context.s); + &blob.result, key_context->s); } if (blob_reqs_in_file.size() > 0) { const auto file_size = blob_file_meta->GetBlobFileSize(); @@ -2211,18 +2219,29 @@ void Version::MultiGetBlob( for (auto& ctx : blob_ctxs) { BlobReadContexts& blobs_in_file = ctx.second; - for (const auto& blob : blobs_in_file) { - const KeyContext& key_context = blob.second; - if (key_context.s->ok()) { - range.AddValueSize(key_context.value->size()); + for (auto& blob : blobs_in_file) { + const KeyContext* const key_context = blob.key_context; + assert(key_context); + assert(key_context->get_context); + assert(key_context->s); + + if (key_context->s->ok()) { + if (key_context->value) { + *key_context->value = std::move(blob.result); + range.AddValueSize(key_context->value->size()); + } else { + assert(key_context->columns); + key_context->columns->SetPlainValue(blob.result); + range.AddValueSize(key_context->columns->serialized_size()); + } + if (range.GetValueSize() > read_options.value_size_soft_limit) { - *key_context.s = Status::Aborted(); + *key_context->s = Status::Aborted(); } - } else if (key_context.s->IsIncomplete()) { + } else if (key_context->s->IsIncomplete()) { // read_options.read_tier == kBlockCacheTier // Cannot read blob(s): no disk I/O allowed - assert(key_context.get_context); - auto& get_context = *(key_context.get_context); + auto& get_context = *(key_context->get_context); get_context.MarkKeyMayExist(); } } @@ -2457,7 +2476,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, get_ctx.emplace_back( user_comparator(), merge_operator_, info_log_, db_statistics_, iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, - iter->ukey_with_ts, iter->value, /*columns=*/nullptr, iter->timestamp, + iter->ukey_with_ts, iter->value, iter->columns, iter->timestamp, nullptr, &(iter->merge_context), true, &iter->max_covering_tombstone_seq, clock_, nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, @@ -2657,23 +2676,29 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, } // merge_operands are in saver and we hit the beginning of the key history // do a final merge of nullptr and operands; - std::string* str_value = - iter->value != nullptr ? iter->value->GetSelf() : nullptr; + std::string result; + // `op_failure_scope` (an output parameter) is not provided (set to // nullptr) since a failure must be propagated regardless of its value. *status = MergeHelper::TimedFullMerge( merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(), - str_value, info_log_, db_statistics_, clock_, + &result, info_log_, db_statistics_, clock_, /* result_operand */ nullptr, /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr); if (LIKELY(iter->value != nullptr)) { + *iter->value->GetSelf() = std::move(result); iter->value->PinSelf(); range->AddValueSize(iter->value->size()); - range->MarkKeyDone(iter); - if (range->GetValueSize() > read_options.value_size_soft_limit) { - s = Status::Aborted(); - break; - } + } else { + assert(iter->columns); + iter->columns->SetPlainValue(result); + range->AddValueSize(iter->columns->serialized_size()); + } + + range->MarkKeyDone(iter); + if (range->GetValueSize() > read_options.value_size_soft_limit) { + s = Status::Aborted(); + break; } } else { range->MarkKeyDone(iter); diff --git a/db/version_set.h b/db/version_set.h index 94b91bbce..9c428526c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -33,6 +33,7 @@ #include "cache/cache_helpers.h" #include "db/blob/blob_file_meta.h" +#include "db/blob/blob_index.h" #include "db/column_family.h" #include "db/compaction/compaction.h" #include "db/compaction/compaction_picker.h" @@ -889,8 +890,15 @@ class Version { FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, uint64_t* bytes_read) const; - using BlobReadContext = - std::pair>; + struct BlobReadContext { + BlobReadContext(const BlobIndex& blob_idx, const KeyContext* key_ctx) + : blob_index(blob_idx), key_context(key_ctx) {} + + BlobIndex blob_index; + const KeyContext* key_context; + PinnableSlice result; + }; + using BlobReadContexts = std::vector; void MultiGetBlob(const ReadOptions& read_options, MultiGetRange& range, std::unordered_map& blob_ctxs); diff --git a/db/version_set_sync_and_async.h b/db/version_set_sync_and_async.h index 51f58cdad..8041c91fd 100644 --- a/db/version_set_sync_and_async.h +++ b/db/version_set_sync_and_async.h @@ -101,23 +101,39 @@ DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST) file_range.MarkKeyDone(iter); if (iter->is_blob_index) { + BlobIndex blob_index; + Status tmp_s; + if (iter->value) { TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex", &(*iter)); - const Slice& blob_index_slice = *(iter->value); - BlobIndex blob_index; - Status tmp_s = blob_index.DecodeFrom(blob_index_slice); - if (tmp_s.ok()) { - const uint64_t blob_file_num = blob_index.file_number(); - blob_ctxs[blob_file_num].emplace_back( - std::make_pair(blob_index, std::cref(*iter))); - } else { - *(iter->s) = tmp_s; - } + tmp_s = blob_index.DecodeFrom(*(iter->value)); + + } else { + assert(iter->columns); + assert(!iter->columns->columns().empty()); + assert(iter->columns->columns().front().name() == + kDefaultWideColumnName); + + tmp_s = + blob_index.DecodeFrom(iter->columns->columns().front().value()); + } + + if (tmp_s.ok()) { + const uint64_t blob_file_num = blob_index.file_number(); + blob_ctxs[blob_file_num].emplace_back(blob_index, &*iter); + } else { + *(iter->s) = tmp_s; } } else { - file_range.AddValueSize(iter->value->size()); + if (iter->value) { + file_range.AddValueSize(iter->value->size()); + } else { + assert(iter->columns); + file_range.AddValueSize(iter->columns->serialized_size()); + } + if (file_range.GetValueSize() > read_options.value_size_soft_limit) { s = Status::Aborted(); break; diff --git a/db/wide/db_wide_basic_test.cc b/db/wide/db_wide_basic_test.cc index 2c345d464..536a543e6 100644 --- a/db/wide/db_wide_basic_test.cc +++ b/db/wide/db_wide_basic_test.cc @@ -105,6 +105,26 @@ TEST_F(DBWideBasicTest, PutEntity) { ASSERT_EQ(values[2], third_value); } + { + constexpr size_t num_keys = 3; + + std::array keys{{first_key, second_key, third_key}}; + std::array results; + std::array statuses; + + db_->MultiGetEntity(ReadOptions(), db_->DefaultColumnFamily(), num_keys, + &keys[0], &results[0], &statuses[0]); + + ASSERT_OK(statuses[0]); + ASSERT_EQ(results[0].columns(), first_columns); + + ASSERT_OK(statuses[1]); + ASSERT_EQ(results[1].columns(), second_columns); + + ASSERT_OK(statuses[2]); + ASSERT_EQ(results[2].columns(), expected_third_columns); + } + { std::unique_ptr iter(db_->NewIterator(ReadOptions())); @@ -210,6 +230,40 @@ TEST_F(DBWideBasicTest, PutEntityColumnFamily) { ASSERT_OK(db_->Write(WriteOptions(), &batch)); } +TEST_F(DBWideBasicTest, MultiCFMultiGetEntity) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"corinthian"}, options); + + constexpr char first_key[] = "first"; + WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}}; + + ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), + first_key, first_columns)); + + constexpr char second_key[] = "second"; + WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; + + ASSERT_OK( + db_->PutEntity(WriteOptions(), handles_[1], second_key, second_columns)); + + constexpr size_t num_keys = 2; + + std::array column_families{ + {db_->DefaultColumnFamily(), handles_[1]}}; + std::array keys{{first_key, second_key}}; + std::array results; + std::array statuses; + + db_->MultiGetEntity(ReadOptions(), num_keys, &column_families[0], &keys[0], + &results[0], &statuses[0]); + + ASSERT_OK(statuses[0]); + ASSERT_EQ(results[0].columns(), first_columns); + + ASSERT_OK(statuses[1]); + ASSERT_EQ(results[1].columns(), second_columns); +} + TEST_F(DBWideBasicTest, MergePlainKeyValue) { Options options = GetDefaultOptions(); options.create_if_missing = true; @@ -280,6 +334,26 @@ TEST_F(DBWideBasicTest, MergePlainKeyValue) { ASSERT_EQ(result.columns(), expected_third_columns); } + { + constexpr size_t num_keys = 3; + + std::array keys{{first_key, second_key, third_key}}; + std::array results; + std::array statuses; + + db_->MultiGetEntity(ReadOptions(), db_->DefaultColumnFamily(), num_keys, + &keys[0], &results[0], &statuses[0]); + + ASSERT_OK(statuses[0]); + ASSERT_EQ(results[0].columns(), expected_first_columns); + + ASSERT_OK(statuses[1]); + ASSERT_EQ(results[1].columns(), expected_second_columns); + + ASSERT_OK(statuses[2]); + ASSERT_EQ(results[2].columns(), expected_third_columns); + } + { std::unique_ptr iter(db_->NewIterator(ReadOptions())); @@ -457,6 +531,23 @@ TEST_F(DBWideBasicTest, MergeEntity) { ASSERT_OK(statuses[1]); } + { + constexpr size_t num_keys = 2; + + std::array keys{{first_key, second_key}}; + std::array results; + std::array statuses; + + db_->MultiGetEntity(ReadOptions(), db_->DefaultColumnFamily(), num_keys, + &keys[0], &results[0], &statuses[0]); + + ASSERT_OK(statuses[0]); + ASSERT_EQ(results[0].columns(), first_expected_columns); + + ASSERT_OK(statuses[1]); + ASSERT_EQ(results[1].columns(), second_expected_columns); + } + { std::unique_ptr iter(db_->NewIterator(ReadOptions())); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index fc027a9d3..09307efb3 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -769,6 +769,68 @@ class DB { } } + // Batched MultiGet-like API that returns wide-column entities from a single + // column family. For any given "key[i]" in "keys" (where 0 <= "i" < + // "num_keys"), if the column family specified by "column_family" contains an + // entry, it is returned it as a wide-column entity in "results[i]". If the + // entry is a wide-column entity, it is returned as-is; if it is a plain + // key-value, it is returned as an entity with a single anonymous column (see + // kDefaultWideColumnName) which contains the value. + // + // "statuses[i]" is set to OK if "keys[i]" is successfully retrieved. It is + // set to NotFound and an empty wide-column entity is returned in "results[i]" + // if there is no entry for "keys[i]". Finally, "statuses[i]" is set to some + // other non-OK status on error. + // + // If "keys" are sorted according to the column family's comparator, the + // "sorted_input" flag can be set for a small performance improvement. + // + // Note that it is the caller's responsibility to ensure that "keys", + // "results", and "statuses" point to "num_keys" number of contiguous objects + // (Slices, PinnableWideColumns, and Statuses respectively). + virtual void MultiGetEntity(const ReadOptions& /* options */, + ColumnFamilyHandle* /* column_family */, + size_t num_keys, const Slice* /* keys */, + PinnableWideColumns* /* results */, + Status* statuses, + bool /* sorted_input */ = false) { + for (size_t i = 0; i < num_keys; ++i) { + statuses[i] = Status::NotSupported("MultiGetEntity not supported"); + } + } + + // Batched MultiGet-like API that returns wide-column entities potentially + // from multiple column families. For any given "key[i]" in "keys" (where 0 <= + // "i" < "num_keys"), if the column family specified by "column_families[i]" + // contains an entry, it is returned it as a wide-column entity in + // "results[i]". If the entry is a wide-column entity, it is returned as-is; + // if it is a plain key-value, it is returned as an entity with a single + // anonymous column (see kDefaultWideColumnName) which contains the value. + // + // "statuses[i]" is set to OK if "keys[i]" is successfully retrieved. It is + // set to NotFound and an empty wide-column entity is returned in "results[i]" + // if there is no entry for "keys[i]". Finally, "statuses[i]" is set to some + // other non-OK status on error. + // + // If "keys" are sorted by column family id and within each column family, + // according to the column family's comparator, the "sorted_input" flag can be + // set for a small performance improvement. + // + // Note that it is the caller's responsibility to ensure that + // "column_families", "keys", "results", and "statuses" point to "num_keys" + // number of contiguous objects (ColumnFamilyHandle pointers, Slices, + // PinnableWideColumns, and Statuses respectively). + virtual void MultiGetEntity(const ReadOptions& /* options */, size_t num_keys, + ColumnFamilyHandle** /* column_families */, + const Slice* /* keys */, + PinnableWideColumns* /* results */, + Status* statuses, + bool /* sorted_input */ = false) { + for (size_t i = 0; i < num_keys; ++i) { + statuses[i] = Status::NotSupported("MultiGetEntity not supported"); + } + } + // If the key definitely does not exist in the database, then this method // returns false, else true. If the caller wants to obtain value when the key // is found in memory, a bool for 'value_found' must be passed. 'value_found' diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 356ea08d9..abb365ad3 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -136,6 +136,24 @@ class StackableDB : public DB { statuses, sorted_input); } + using DB::MultiGetEntity; + + void MultiGetEntity(const ReadOptions& options, + ColumnFamilyHandle* column_family, size_t num_keys, + const Slice* keys, PinnableWideColumns* results, + Status* statuses, bool sorted_input) override { + db_->MultiGetEntity(options, column_family, num_keys, keys, results, + statuses, sorted_input); + } + + void MultiGetEntity(const ReadOptions& options, size_t num_keys, + ColumnFamilyHandle** column_families, const Slice* keys, + PinnableWideColumns* results, Status* statuses, + bool sorted_input) override { + db_->MultiGetEntity(options, num_keys, column_families, keys, results, + statuses, sorted_input); + } + using DB::IngestExternalFile; virtual Status IngestExternalFile( ColumnFamilyHandle* column_family, diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index 0981cfb61..eb1175a7d 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -253,7 +253,7 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) { nullptr, nullptr, nullptr, nullptr, true /* do_merge */, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr); - key_context.emplace_back(nullptr, keys[i], &values[i], nullptr, + key_context.emplace_back(nullptr, keys[i], &values[i], nullptr, nullptr, &statuses.back()); key_context.back().get_context = &get_context.back(); } diff --git a/table/multiget_context.h b/table/multiget_context.h index ca369dcb2..54af2262c 100644 --- a/table/multiget_context.h +++ b/table/multiget_context.h @@ -22,6 +22,7 @@ namespace ROCKSDB_NAMESPACE { class GetContext; +class PinnableWideColumns; struct KeyContext { const Slice* key; @@ -37,11 +38,13 @@ struct KeyContext { bool is_blob_index; void* cb_arg; PinnableSlice* value; + PinnableWideColumns* columns; std::string* timestamp; GetContext* get_context; KeyContext(ColumnFamilyHandle* col_family, const Slice& user_key, - PinnableSlice* val, std::string* ts, Status* stat) + PinnableSlice* val, PinnableWideColumns* cols, std::string* ts, + Status* stat) : key(&user_key), lkey(nullptr), column_family(col_family), @@ -51,10 +54,9 @@ struct KeyContext { is_blob_index(false), cb_arg(nullptr), value(val), + columns(cols), timestamp(ts), get_context(nullptr) {} - - KeyContext() = default; }; // The MultiGetContext class is a container for the sorted list of keys that diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 8192525bd..f20b7e5e3 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -617,7 +617,8 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( assert(result == WBWIIteratorImpl::kMergeInProgress || result == WBWIIteratorImpl::kNotFound); key_context.emplace_back(column_family, keys[i], &values[i], - /*timestamp*/ nullptr, &statuses[i]); + /* columns */ nullptr, /* timestamp */ nullptr, + &statuses[i]); merges.emplace_back(result, std::move(merge_context)); }