Add initial blob support to batched MultiGet (#7766)

Summary:
The patch adds initial support for reading blobs to the batched `MultiGet` API.
The current implementation simply retrieves the blob values as the blob indexes
are encountered; that is, reads from blob files are currently not batched. (This
will be optimized in a separate phase.) In addition, the patch removes some dead
code related to BlobDB from the batched `MultiGet` implementation, namely the
`is_blob` / `is_blob_index` flags that are passed around in `DBImpl` and `MemTable` /
`MemTableListVersion`. These were never hooked up to anything and wouldn't
work anyways, since a single flag is not sufficient to communicate the "blobness"
of multiple key-values.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7766

Test Plan: `make check`

Reviewed By: jay-zhuang

Differential Revision: D25479290

Pulled By: ltamasi

fbshipit-source-id: 7aba2d290e31876ee592bcf1adfd1018713a8000
main
Levi Tamasi 4 years ago committed by Facebook GitHub Bot
parent 003e72b201
commit 1afbd1948c
  1. 123
      db/blob/db_blob_basic_test.cc
  2. 15
      db/db_impl/db_impl.cc
  3. 3
      db/db_impl/db_impl.h
  4. 8
      db/memtable.cc
  5. 2
      db/memtable.h
  6. 6
      db/memtable_list.cc
  7. 2
      db/memtable_list.h
  8. 43
      db/version_set.cc
  9. 2
      db/version_set.h
  10. 2
      table/multiget_context.h

@ -3,6 +3,8 @@
// COPYING file in the root directory) and Apache 2.0 License // COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
#include <array>
#include "db/blob/blob_index.h" #include "db/blob/blob_index.h"
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
@ -44,6 +46,85 @@ TEST_F(DBBlobBasicTest, GetBlob) {
.IsIncomplete()); .IsIncomplete());
} }
TEST_F(DBBlobBasicTest, MultiGetBlobs) {
constexpr size_t min_blob_size = 6;
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = min_blob_size;
Reopen(options);
// Put then retrieve three key-values. The first value is below the size limit
// and is thus stored inline; the other two are stored separately as blobs.
constexpr size_t num_keys = 3;
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "short";
static_assert(sizeof(first_value) - 1 < min_blob_size,
"first_value too long to be inlined");
ASSERT_OK(Put(first_key, first_value));
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "long_value";
static_assert(sizeof(second_value) - 1 >= min_blob_size,
"second_value too short to be stored as blob");
ASSERT_OK(Put(second_key, second_value));
constexpr char third_key[] = "third_key";
constexpr char third_value[] = "other_long_value";
static_assert(sizeof(third_value) - 1 >= min_blob_size,
"third_value too short to be stored as blob");
ASSERT_OK(Put(third_key, third_value));
ASSERT_OK(Flush());
ReadOptions read_options;
std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};
{
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], first_value);
ASSERT_OK(statuses[1]);
ASSERT_EQ(values[1], second_value);
ASSERT_OK(statuses[2]);
ASSERT_EQ(values[2], third_value);
}
// Try again with no I/O allowed. The table and the necessary blocks should
// already be in their respective caches. The first (inlined) value should be
// successfully read; however, the two blob values could only be read from the
// blob file, so for those the read should return Incomplete.
read_options.read_tier = kBlockCacheTier;
{
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], first_value);
ASSERT_TRUE(statuses[1].IsIncomplete());
ASSERT_TRUE(statuses[2].IsIncomplete());
}
}
TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) { TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.enable_blob_files = true; options.enable_blob_files = true;
@ -175,6 +256,48 @@ TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) {
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
} }
TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) {
Options options = GetDefaultOptions();
options.env = fault_injection_env_.get();
options.enable_blob_files = true;
options.min_blob_size = 0;
Reopen(options);
constexpr size_t num_keys = 2;
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "first_value";
ASSERT_OK(Put(first_key, first_value));
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "second_value";
ASSERT_OK(Put(second_key, second_value));
ASSERT_OK(Flush());
std::array<Slice, num_keys> keys{{first_key, second_key}};
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_->SetFilesystemActive(false,
Status::IOError(sync_point_));
});
SyncPoint::GetInstance()->EnableProcessing();
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_TRUE(statuses[0].IsIOError());
ASSERT_TRUE(statuses[1].IsIOError());
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -2205,7 +2205,7 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
for (; cf_iter != multiget_cf_data.end(); ++cf_iter) { for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
&sorted_keys, cf_iter->super_version, consistent_seqnum, &sorted_keys, cf_iter->super_version, consistent_seqnum,
read_callback, nullptr); read_callback);
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
@ -2377,7 +2377,7 @@ void DBImpl::MultiGetWithCallback(
Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys, Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
multiget_cf_data[0].super_version, consistent_seqnum, multiget_cf_data[0].super_version, consistent_seqnum,
read_callback, nullptr); read_callback);
assert(s.ok() || s.IsTimedOut() || s.IsAborted()); assert(s.ok() || s.IsTimedOut() || s.IsAborted());
ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd, ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
multiget_cf_data[0].super_version); multiget_cf_data[0].super_version);
@ -2396,7 +2396,7 @@ Status DBImpl::MultiGetImpl(
const ReadOptions& read_options, size_t start_key, size_t num_keys, const ReadOptions& read_options, size_t start_key, size_t num_keys,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys, autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
SuperVersion* super_version, SequenceNumber snapshot, SuperVersion* super_version, SequenceNumber snapshot,
ReadCallback* callback, bool* is_blob_index) { ReadCallback* callback) {
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_MULTIGET); StopWatch sw(env_, stats_, DB_MULTIGET);
@ -2435,11 +2435,9 @@ Status DBImpl::MultiGetImpl(
(read_options.read_tier == kPersistedTier && (read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed)); has_unpersisted_data_.load(std::memory_order_relaxed));
if (!skip_memtable) { if (!skip_memtable) {
super_version->mem->MultiGet(read_options, &range, callback, super_version->mem->MultiGet(read_options, &range, callback);
is_blob_index);
if (!range.empty()) { if (!range.empty()) {
super_version->imm->MultiGet(read_options, &range, callback, super_version->imm->MultiGet(read_options, &range, callback);
is_blob_index);
} }
if (!range.empty()) { if (!range.empty()) {
lookup_current = true; lookup_current = true;
@ -2449,8 +2447,7 @@ Status DBImpl::MultiGetImpl(
} }
if (lookup_current) { if (lookup_current) {
PERF_TIMER_GUARD(get_from_output_files_time); PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->MultiGet(read_options, &range, callback, super_version->current->MultiGet(read_options, &range, callback);
is_blob_index);
} }
curr_value_size = range.GetValueSize(); curr_value_size = range.GetValueSize();
if (curr_value_size > read_options.value_size_soft_limit) { if (curr_value_size > read_options.value_size_soft_limit) {

@ -1866,8 +1866,7 @@ class DBImpl : public DB {
Status MultiGetImpl( Status MultiGetImpl(
const ReadOptions& read_options, size_t start_key, size_t num_keys, const ReadOptions& read_options, size_t start_key, size_t num_keys,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys, autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback, SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback);
bool* is_blob_index);
Status DisableFileDeletionsWithLock(); Status DisableFileDeletionsWithLock();

@ -897,7 +897,7 @@ void MemTable::GetFromTable(const LookupKey& key,
} }
void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool* is_blob) { ReadCallback* callback) {
// The sequence number is updated synchronously in version_set.h // The sequence number is updated synchronously in version_set.h
if (IsEmpty()) { if (IsEmpty()) {
// Avoiding recording stats for speed. // Avoiding recording stats for speed.
@ -950,9 +950,9 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key())); range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
} }
GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true, GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
callback, is_blob, iter->value->GetSelf(), iter->timestamp, callback, &iter->is_blob_index, iter->value->GetSelf(),
iter->s, &(iter->merge_context), &seq, &found_final_value, iter->timestamp, iter->s, &(iter->merge_context), &seq,
&merge_in_progress); &found_final_value, &merge_in_progress);
if (!found_final_value && merge_in_progress) { if (!found_final_value && merge_in_progress) {
*(iter->s) = Status::MergeInProgress(); *(iter->s) = Status::MergeInProgress();

@ -238,7 +238,7 @@ class MemTable {
} }
void MultiGet(const ReadOptions& read_options, MultiGetRange* range, void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool* is_blob); ReadCallback* callback);
// If `key` exists in current memtable with type `kTypeValue` and the existing // If `key` exists in current memtable with type `kTypeValue` and the existing
// value is at least as large as the new value, updates it in-place. Otherwise // value is at least as large as the new value, updates it in-place. Otherwise

@ -113,10 +113,10 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
} }
void MemTableListVersion::MultiGet(const ReadOptions& read_options, void MemTableListVersion::MultiGet(const ReadOptions& read_options,
MultiGetRange* range, ReadCallback* callback, MultiGetRange* range,
bool* is_blob) { ReadCallback* callback) {
for (auto memtable : memlist_) { for (auto memtable : memlist_) {
memtable->MultiGet(read_options, range, callback, is_blob); memtable->MultiGet(read_options, range, callback);
if (range->empty()) { if (range->empty()) {
return; return;
} }

@ -76,7 +76,7 @@ class MemTableListVersion {
} }
void MultiGet(const ReadOptions& read_options, MultiGetRange* range, void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool* is_blob); ReadCallback* callback);
// Returns all the merge operands corresponding to the key by searching all // Returns all the merge operands corresponding to the key by searching all
// memtables starting from the most recent one. // memtables starting from the most recent one.

@ -1938,6 +1938,17 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
// TODO: update per-level perfcontext user_key_return_count for kMerge // TODO: update per-level perfcontext user_key_return_count for kMerge
break; break;
case GetContext::kFound: case GetContext::kFound:
if (fp.GetHitFileLevel() == 0) {
RecordTick(db_statistics_, GET_HIT_L0);
} else if (fp.GetHitFileLevel() == 1) {
RecordTick(db_statistics_, GET_HIT_L1);
} else if (fp.GetHitFileLevel() >= 2) {
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
}
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
fp.GetHitFileLevel());
if (is_blob_index) { if (is_blob_index) {
if (do_merge && value) { if (do_merge && value) {
*status = GetBlob(read_options, user_key, *value, value); *status = GetBlob(read_options, user_key, *value, value);
@ -1950,15 +1961,6 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
} }
} }
if (fp.GetHitFileLevel() == 0) {
RecordTick(db_statistics_, GET_HIT_L0);
} else if (fp.GetHitFileLevel() == 1) {
RecordTick(db_statistics_, GET_HIT_L1);
} else if (fp.GetHitFileLevel() >= 2) {
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
}
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
fp.GetHitFileLevel());
return; return;
case GetContext::kDeleted: case GetContext::kDeleted:
// Use empty error message for speed // Use empty error message for speed
@ -2008,7 +2010,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
} }
void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool* is_blob) { ReadCallback* callback) {
PinnedIteratorsManager pinned_iters_mgr; PinnedIteratorsManager pinned_iters_mgr;
// Pin blocks that we read to hold merge operands // Pin blocks that we read to hold merge operands
@ -2033,7 +2035,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
iter->ukey_with_ts, iter->value, iter->timestamp, nullptr, iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
&(iter->merge_context), true, &iter->max_covering_tombstone_seq, &(iter->merge_context), true, &iter->max_covering_tombstone_seq,
this->env_, nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr, this->env_, nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr,
callback, is_blob, tracing_mget_id); callback, &iter->is_blob_index, tracing_mget_id);
// MergeInProgress status, if set, has been transferred to the get_context // MergeInProgress status, if set, has been transferred to the get_context
// state, so we set status to ok here. From now on, the iter status will // state, so we set status to ok here. From now on, the iter status will
// be used for IO errors, and get_context state will be used for any // be used for IO errors, and get_context state will be used for any
@ -2135,10 +2137,27 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
} else if (fp.GetHitFileLevel() >= 2) { } else if (fp.GetHitFileLevel() >= 2) {
RecordTick(db_statistics_, GET_HIT_L2_AND_UP); RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
} }
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
fp.GetHitFileLevel()); fp.GetHitFileLevel());
file_range.AddValueSize(iter->value->size());
file_range.MarkKeyDone(iter); file_range.MarkKeyDone(iter);
if (iter->is_blob_index) {
if (iter->value) {
*status = GetBlob(read_options, iter->ukey_with_ts, *iter->value,
iter->value);
if (!status->ok()) {
if (status->IsIncomplete()) {
get_context.MarkKeyMayExist();
}
continue;
}
}
}
file_range.AddValueSize(iter->value->size());
if (file_range.GetValueSize() > read_options.value_size_soft_limit) { if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
s = Status::Aborted(); s = Status::Aborted();
break; break;

@ -682,7 +682,7 @@ class Version {
bool* is_blob = nullptr, bool do_merge = true); bool* is_blob = nullptr, bool do_merge = true);
void MultiGet(const ReadOptions&, MultiGetRange* range, void MultiGet(const ReadOptions&, MultiGetRange* range,
ReadCallback* callback = nullptr, bool* is_blob = nullptr); ReadCallback* callback = nullptr);
// Interprets blob_index_slice as a blob reference, and (assuming the // Interprets blob_index_slice as a blob reference, and (assuming the
// corresponding blob file is part of this Version) retrieves the blob and // corresponding blob file is part of this Version) retrieves the blob and

@ -31,6 +31,7 @@ struct KeyContext {
MergeContext merge_context; MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq; SequenceNumber max_covering_tombstone_seq;
bool key_exists; bool key_exists;
bool is_blob_index;
void* cb_arg; void* cb_arg;
PinnableSlice* value; PinnableSlice* value;
std::string* timestamp; std::string* timestamp;
@ -44,6 +45,7 @@ struct KeyContext {
s(stat), s(stat),
max_covering_tombstone_seq(0), max_covering_tombstone_seq(0),
key_exists(false), key_exists(false),
is_blob_index(false),
cb_arg(nullptr), cb_arg(nullptr),
value(val), value(val),
timestamp(ts), timestamp(ts),

Loading…
Cancel
Save