Support for Merge in Integrated BlobDB with base values (#8292)

Summary:
This PR adds support for the Merge operation in Integrated BlobDB with base values (i.e. DB::Put). Merged values can be retrieved through DB::Get, DB::MultiGet, DB::GetMergeOperands and Iterator operations.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8292

Test Plan: Add new unit tests

Reviewed By: ltamasi

Differential Revision: D28415896

Pulled By: akankshamahajan15

fbshipit-source-id: e9b3478bef51d2f214fb88c31ed3c8d2f4a531ff
main
Akanksha Mahajan 4 years ago committed by Facebook GitHub Bot
parent d61a449364
commit 3897ce3125
  1. 1
      CMakeLists.txt
  2. 2
      TARGETS
  3. 22
      db/blob/blob_fetcher.cc
  4. 26
      db/blob/blob_fetcher.h
  5. 57
      db/blob/db_blob_basic_test.cc
  6. 110
      db/blob/db_blob_index_test.cc
  7. 114
      db/db_iter.cc
  8. 2
      db/db_iter.h
  9. 106
      db/db_merge_operand_test.cc
  10. 7
      db/version_set.cc
  11. 1
      src.mk
  12. 121
      table/get_context.cc
  13. 10
      table/get_context.h

@ -628,6 +628,7 @@ set(SOURCES
cache/lru_cache.cc
cache/sharded_cache.cc
db/arena_wrapped_db_iter.cc
db/blob/blob_fetcher.cc
db/blob/blob_file_addition.cc
db/blob/blob_file_builder.cc
db/blob/blob_file_cache.cc

@ -137,6 +137,7 @@ cpp_library(
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_fetcher.cc",
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",
"db/blob/blob_file_cache.cc",
@ -448,6 +449,7 @@ cpp_library(
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_fetcher.cc",
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",
"db/blob/blob_file_cache.cc",

@ -0,0 +1,22 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_fetcher.h"
#include "db/version_set.h"
namespace ROCKSDB_NAMESPACE {
// Reads the blob referenced by blob_index (for user_key) from the blob
// files of the associated Version into *blob_value.
Status BlobFetcher::FetchBlob(const Slice& user_key, const Slice& blob_index,
                              PinnableSlice* blob_value) {
  assert(version_);

  // We do not track the number of bytes read here, so pass a null counter.
  constexpr uint64_t* bytes_read = nullptr;

  return version_->GetBlob(read_options_, user_key, blob_index, blob_value,
                           bytes_read);
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,26 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/options.h"
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
class Version;
// Thin helper for resolving blob indexes into blob values by reading the
// blob files of a specific Version. Used e.g. by GetContext when a merge
// chain ends in a blob-indexed base value.
class BlobFetcher {
 public:
  // Does not take ownership of version; the Version (and the ReadOptions'
  // referents) must outlive this object.
  BlobFetcher(Version* version, const ReadOptions& read_options)
      : version_(version), read_options_(read_options) {}

  // Reads the blob referenced by blob_index (belonging to user_key) into
  // *blob_value. Returns the status of the underlying Version::GetBlob call.
  Status FetchBlob(const Slice& user_key, const Slice& blob_index,
                   PinnableSlice* blob_value);

 private:
  Version* version_;       // not owned
  ReadOptions read_options_;  // copied at construction
};
} // namespace ROCKSDB_NAMESPACE

@ -310,6 +310,63 @@ TEST_F(DBBlobBasicTest, BestEffortsRecovery_MissingNewestBlobFile) {
ASSERT_EQ("value" + std::to_string(kNumTableFiles - 2), value);
}
// Verifies that a merge chain whose base value lives in a blob file is
// resolved correctly by DB::Get: Put("v1") followed by two Merges must
// yield the appended result "v1,v2,v3".
TEST_F(DBBlobBasicTest, GetMergeBlobWithPut) {
  Options options = GetDefaultOptions();
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  // Force every value into a blob file so the base value is blob-indexed.
  options.enable_blob_files = true;
  options.min_blob_size = 0;

  Reopen(options);

  // Each write is flushed separately so the base value and each merge
  // operand end up in distinct SST files.
  ASSERT_OK(Put("Key1", "v1"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("Key1", "v2"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("Key1", "v3"));
  ASSERT_OK(Flush());

  std::string value;
  ASSERT_OK(db_->Get(ReadOptions(), "Key1", &value));
  // Bug fix: assert on the value actually returned by the Get above instead
  // of issuing a second, unchecked read via the Get() helper.
  ASSERT_EQ(value, "v1,v2,v3");
}
// Verifies that MultiGet resolves merge chains whose base values are stored
// in blob files: each key has a blob-resident Put base plus zero or more
// merge operands spread across several SST files.
TEST_F(DBBlobBasicTest, MultiGetMergeBlobWithPut) {
  constexpr size_t num_keys = 3;

  Options options = GetDefaultOptions();
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  // Force all values into blob files.
  options.enable_blob_files = true;
  options.min_blob_size = 0;

  Reopen(options);

  // Base values for all three keys go into the first SST/blob file pair.
  ASSERT_OK(Put("Key0", "v0_0"));
  ASSERT_OK(Put("Key1", "v1_0"));
  ASSERT_OK(Put("Key2", "v2_0"));
  ASSERT_OK(Flush());

  // First round of merge operands for Key0 and Key1 in a second file.
  ASSERT_OK(Merge("Key0", "v0_1"));
  ASSERT_OK(Merge("Key1", "v1_1"));
  ASSERT_OK(Flush());

  // One more operand for Key0 in a third file.
  ASSERT_OK(Merge("Key0", "v0_2"));
  ASSERT_OK(Flush());

  std::array<Slice, num_keys> keys{{"Key0", "Key1", "Key2"}};
  std::array<PinnableSlice, num_keys> values;
  std::array<Status, num_keys> statuses;

  db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
                &values[0], &statuses[0]);

  // Expected merged results, one entry per key.
  const std::array<std::string, num_keys> expected{
      {"v0_0,v0_1,v0_2", "v1_0,v1_1", "v2_0"}};

  for (size_t i = 0; i < num_keys; ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_EQ(values[i], expected[i]);
  }
}
class DBBlobBasicIOErrorTest : public DBBlobBasicTest,
public testing::WithParamInterface<std::string> {
protected:

@ -399,7 +399,7 @@ TEST_F(DBBlobIndexTest, Iterate) {
create_normal_iterator);
verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
create_normal_iterator);
verify(11, Status::kNotSupported, "", "", create_normal_iterator);
verify(11, Status::kCorruption, "", "", create_normal_iterator);
verify(13, Status::kOk,
get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
@ -418,7 +418,11 @@ TEST_F(DBBlobIndexTest, Iterate) {
create_blob_iterator, check_is_blob(false));
verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
create_blob_iterator, check_is_blob(false));
if (tier <= kImmutableMemtables) {
verify(11, Status::kNotSupported, "", "", create_blob_iterator);
} else {
verify(11, Status::kCorruption, "", "", create_blob_iterator);
}
verify(13, Status::kOk,
get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
@ -440,7 +444,11 @@ TEST_F(DBBlobIndexTest, Iterate) {
create_blob_iterator, check_is_blob(false));
verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
create_blob_iterator, check_is_blob(false));
if (tier <= kImmutableMemtables) {
verify(11, Status::kNotSupported, "", "", create_blob_iterator);
} else {
verify(11, Status::kCorruption, "", "", create_blob_iterator);
}
verify(13, Status::kOk,
get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
@ -455,6 +463,106 @@ TEST_F(DBBlobIndexTest, Iterate) {
}
}
// Verifies that DB iterators (Seek/Next/SeekForPrev/Prev) correctly resolve
// merge chains whose base value lives in a blob file when blob files are
// enabled on the regular (integrated BlobDB) write path.
TEST_F(DBBlobIndexTest, IntegratedBlobIterate) {
  // Per-key write sequence: key1 gets a Put base value plus three Merges,
  // key0/key2 are plain Puts that bracket it for the Next/Prev cases.
  const std::vector<std::vector<std::string>> data = {
      /*00*/ {"Put"},
      /*01*/ {"Put", "Merge", "Merge", "Merge"},
      /*02*/ {"Put"}};

  auto get_key = [](size_t index) { return ("key" + std::to_string(index)); };

  auto get_value = [&](size_t index, size_t version) {
    return get_key(index) + "_value" + ToString(version);
  };

  // Checks that the iterator landed in the expected state: valid with the
  // expected value on success, invalid otherwise.
  auto check_iterator = [&](Iterator* iterator, Status expected_status,
                            const Slice& expected_value) {
    ASSERT_EQ(expected_status, iterator->status());
    if (expected_status.ok()) {
      ASSERT_TRUE(iterator->Valid());
      ASSERT_EQ(expected_value, iterator->value());
    } else {
      ASSERT_FALSE(iterator->Valid());
    }
  };

  // Exercises all four ways of positioning an iterator on get_key(index).
  auto verify = [&](size_t index, Status expected_status,
                    const Slice& expected_value) {
    // Seek
    {
      Iterator* iterator = db_->NewIterator(ReadOptions());
      std::unique_ptr<Iterator> iterator_guard(iterator);
      ASSERT_OK(iterator->status());
      ASSERT_OK(iterator->Refresh());
      iterator->Seek(get_key(index));
      check_iterator(iterator, expected_status, expected_value);
    }
    // Next: position on the preceding key and step forward.
    {
      Iterator* iterator = db_->NewIterator(ReadOptions());
      std::unique_ptr<Iterator> iterator_guard(iterator);
      ASSERT_OK(iterator->Refresh());
      iterator->Seek(get_key(index - 1));
      ASSERT_TRUE(iterator->Valid());
      ASSERT_OK(iterator->status());
      iterator->Next();
      check_iterator(iterator, expected_status, expected_value);
    }
    // SeekForPrev
    {
      Iterator* iterator = db_->NewIterator(ReadOptions());
      std::unique_ptr<Iterator> iterator_guard(iterator);
      ASSERT_OK(iterator->status());
      ASSERT_OK(iterator->Refresh());
      iterator->SeekForPrev(get_key(index));
      check_iterator(iterator, expected_status, expected_value);
    }
    // Prev: position on the following key and step backward (this drives
    // DBIter's backward merge-resolution path).
    {
      Iterator* iterator = db_->NewIterator(ReadOptions());
      std::unique_ptr<Iterator> iterator_guard(iterator);
      iterator->Seek(get_key(index + 1));
      ASSERT_TRUE(iterator->Valid());
      ASSERT_OK(iterator->status());
      iterator->Prev();
      check_iterator(iterator, expected_status, expected_value);
    }
  };

  Options options = GetTestOptions();
  // Force all values into blob files so the merge base is blob-indexed.
  options.enable_blob_files = true;
  options.min_blob_size = 0;
  DestroyAndReopen(options);

  // fill data; flush after every write so each operand lands in its own file.
  for (size_t i = 0; i < data.size(); i++) {
    for (size_t j = 0; j < data[i].size(); j++) {
      std::string key = get_key(i);
      std::string value = get_value(i, j);
      if (data[i][j] == "Put") {
        ASSERT_OK(Put(key, value));
        ASSERT_OK(Flush());
      } else if (data[i][j] == "Merge") {
        ASSERT_OK(Merge(key, value));
        ASSERT_OK(Flush());
      }
    }
  }

  // key1 should merge to base,op1,op2,op3 via the string-append operator.
  std::string expected_value = get_value(1, 0) + "," + get_value(1, 1) + "," +
                               get_value(1, 2) + "," + get_value(1, 3);
  Status expected_status;
  verify(1, expected_status, expected_value);

#ifndef ROCKSDB_LITE
  // Test DBIter::FindValueForCurrentKeyUsingSeek flow.
  // Disallowing sequential skips forces the seek-based backward path.
  ASSERT_OK(dbfull()->SetOptions(cfh(),
                                 {{"max_sequential_skip_in_iterations", "0"}}));
  verify(1, expected_status, expected_value);
#endif  // !ROCKSDB_LITE
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -576,12 +576,8 @@ bool DBIter::MergeValuesNewToOld() {
// hit a put, merge the put value with operands and store the
// final result in saved_value_. We are done!
const Slice val = iter_.value();
Status s = MergeHelper::TimedFullMerge(
merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, clock_, &pinned_value_, true);
Status s = Merge(&val, ikey.user_key);
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
// iter_ is positioned after put
@ -598,9 +594,31 @@ bool DBIter::MergeValuesNewToOld() {
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (kTypeBlobIndex == ikey.type) {
status_ = Status::NotSupported("BlobDB does not support merge operator.");
if (expose_blob_index_) {
status_ =
Status::NotSupported("BlobDB does not support merge operator.");
valid_ = false;
return false;
}
// hit a put, merge the put value with operands and store the
// final result in saved_value_. We are done!
if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) {
return false;
}
valid_ = true;
const Slice blob_value = value();
Status s = Merge(&blob_value, ikey.user_key);
if (!s.ok()) {
return false;
}
is_blob_ = false;
// iter_ is positioned after put
iter_.Next();
if (!iter_.status().ok()) {
valid_ = false;
return false;
}
return true;
} else {
valid_ = false;
status_ = Status::Corruption(
@ -619,16 +637,10 @@ bool DBIter::MergeValuesNewToOld() {
// a deletion marker.
// feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly.
Status s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), nullptr,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_, clock_,
&pinned_value_, true);
Status s = Merge(nullptr, saved_key_.GetUserKey());
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
assert(status_.ok());
return true;
}
@ -931,21 +943,36 @@ bool DBIter::FindValueForCurrentKey() {
if (last_not_merge_type == kTypeDeletion ||
last_not_merge_type == kTypeSingleDeletion ||
last_not_merge_type == kTypeRangeDeletion) {
s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), nullptr,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
clock_, &pinned_value_, true);
s = Merge(nullptr, saved_key_.GetUserKey());
if (!s.ok()) {
return false;
}
return true;
} else if (last_not_merge_type == kTypeBlobIndex) {
if (expose_blob_index_) {
status_ =
Status::NotSupported("BlobDB does not support merge operator.");
valid_ = false;
return false;
}
if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
return false;
}
valid_ = true;
const Slice blob_value = value();
s = Merge(&blob_value, saved_key_.GetUserKey());
if (!s.ok()) {
return false;
}
is_blob_ = false;
return true;
} else {
assert(last_not_merge_type == kTypeValue);
s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
clock_, &pinned_value_, true);
s = Merge(&pinned_value_, saved_key_.GetUserKey());
if (!s.ok()) {
return false;
}
return true;
}
break;
case kTypeValue:
@ -955,7 +982,6 @@ bool DBIter::FindValueForCurrentKey() {
if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
return false;
}
break;
default:
valid_ = false;
@ -1095,25 +1121,33 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (ikey.type == kTypeValue) {
const Slice val = iter_.value();
Status s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), &val,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
clock_, &pinned_value_, true);
Status s = Merge(&val, saved_key_.GetUserKey());
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
valid_ = true;
return true;
} else if (ikey.type == kTypeMerge) {
merge_context_.PushOperand(
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (ikey.type == kTypeBlobIndex) {
status_ = Status::NotSupported("BlobDB does not support merge operator.");
if (expose_blob_index_) {
status_ =
Status::NotSupported("BlobDB does not support merge operator.");
valid_ = false;
return false;
}
if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) {
return false;
}
valid_ = true;
const Slice blob_value = value();
Status s = Merge(&blob_value, saved_key_.GetUserKey());
if (!s.ok()) {
return false;
}
is_blob_ = false;
return true;
} else {
valid_ = false;
status_ = Status::Corruption(
@ -1123,13 +1157,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
}
}
Status s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), nullptr,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_, clock_,
&pinned_value_, true);
Status s = Merge(nullptr, saved_key_.GetUserKey());
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
@ -1152,6 +1181,19 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
return true;
}
// Runs the full merge of the accumulated operands with the optional base
// value `val` (nullptr means "no existing value", e.g. after a deletion)
// and stores the result in saved_value_. Updates valid_/status_ to reflect
// the outcome and returns the merge status.
Status DBIter::Merge(const Slice* val, const Slice& user_key) {
  const Status s = MergeHelper::TimedFullMerge(
      merge_operator_, user_key, val, merge_context_.GetOperands(),
      &saved_value_, logger_, statistics_, clock_, &pinned_value_, true);
  valid_ = s.ok();
  if (!valid_) {
    status_ = s;
  }
  return s;
}
// Move backwards until the key smaller than saved_key_.
// Changes valid_ only if return value is false.
bool DBIter::FindUserKeyBeforeSavedKey() {

@ -299,6 +299,8 @@ class DBIter final : public Iterator {
// index when using the integrated BlobDB implementation.
bool SetBlobValueIfNeeded(const Slice& user_key, const Slice& blob_index);
Status Merge(const Slice* val, const Slice& user_key);
const SliceTransform* prefix_extractor_;
Env* const env_;
SystemClock* clock_;

@ -19,13 +19,7 @@
namespace ROCKSDB_NAMESPACE {
class DBMergeOperandTest : public DBTestBase {
public:
DBMergeOperandTest()
: DBTestBase("/db_merge_operand_test", /*env_do_fsync=*/true) {}
};
TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
namespace {
class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
public:
LimitedStringAppendMergeOp(int limit, char delim)
@ -45,7 +39,15 @@ TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
private:
size_t limit_ = 0;
};
} // namespace
class DBMergeOperandTest : public DBTestBase {
public:
DBMergeOperandTest()
: DBTestBase("/db_merge_operand_test", /*env_do_fsync=*/true) {}
};
TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
Options options;
options.create_if_missing = true;
// Use only the latest two merge operands.
@ -214,7 +216,8 @@ TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
ASSERT_EQ(values[2], "dc");
ASSERT_EQ(values[3], "ed");
// First 3 k5 values are in SST and next 4 k5 values are in Immutable Memtable
// First 3 k5 values are in SST and next 4 k5 values are in Immutable
// Memtable
ASSERT_OK(Merge("k5", "who"));
ASSERT_OK(Merge("k5", "am"));
ASSERT_OK(Merge("k5", "i"));
@ -232,6 +235,93 @@ TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
ASSERT_EQ(values[2], "am");
}
// Verifies DB::GetMergeOperands with integrated BlobDB enabled: operands and
// base values stored in blob files must be returned individually (not merged)
// regardless of whether they sit in the memtable, a single SST, several SSTs,
// or different LSM levels.
TEST_F(DBMergeOperandTest, BlobDBGetMergeOperandsBasic) {
  Options options;
  options.create_if_missing = true;
  // Force every value into a blob file.
  options.enable_blob_files = true;
  options.min_blob_size = 0;
  // Use only the latest two merge operands.
  options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
  options.env = env_;
  Reopen(options);
  int num_records = 4;
  int number_of_operands = 0;
  std::vector<PinnableSlice> values(num_records);
  GetMergeOperandsOptions merge_operands_info;
  merge_operands_info.expected_max_number_of_operands = num_records;

  // All k1 values are in memtable.
  ASSERT_OK(Put("k1", "x"));
  ASSERT_OK(Merge("k1", "b"));
  ASSERT_OK(Merge("k1", "c"));
  ASSERT_OK(Merge("k1", "d"));
  ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
                                  "k1", values.data(), &merge_operands_info,
                                  &number_of_operands));
  // Operands come back unmerged, oldest (the Put base) first.
  ASSERT_EQ(values[0], "x");
  ASSERT_EQ(values[1], "b");
  ASSERT_EQ(values[2], "c");
  ASSERT_EQ(values[3], "d");

  // expected_max_number_of_operands is less than number of merge operands so
  // status should be Incomplete.
  merge_operands_info.expected_max_number_of_operands = num_records - 1;
  Status status = db_->GetMergeOperands(
      ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(),
      &merge_operands_info, &number_of_operands);
  ASSERT_EQ(status.IsIncomplete(), true);
  merge_operands_info.expected_max_number_of_operands = num_records;

  // All k2 values are flushed to L0 into a single file.
  ASSERT_OK(Put("k2", "q"));
  ASSERT_OK(Merge("k2", "w"));
  ASSERT_OK(Merge("k2", "e"));
  ASSERT_OK(Merge("k2", "r"));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
                                  "k2", values.data(), &merge_operands_info,
                                  &number_of_operands));
  // Flush compacted the chain into one value via LimitedStringAppendMergeOp.
  ASSERT_EQ(values[0], "q,w,e,r");

  // Do some compaction that will make the following tests more predictable
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // All k3 values are flushed and are in different files.
  ASSERT_OK(Put("k3", "ab"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("k3", "bc"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("k3", "cd"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("k3", "de"));
  ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
                                  "k3", values.data(), &merge_operands_info,
                                  &number_of_operands));
  ASSERT_EQ(values[0], "ab");
  ASSERT_EQ(values[1], "bc");
  ASSERT_EQ(values[2], "cd");
  ASSERT_EQ(values[3], "de");

  // All K4 values are in different levels
  ASSERT_OK(Put("k4", "ba"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(4);
  ASSERT_OK(Merge("k4", "cb"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(3);
  ASSERT_OK(Merge("k4", "dc"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(1);
  ASSERT_OK(Merge("k4", "ed"));
  ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
                                  "k4", values.data(), &merge_operands_info,
                                  &number_of_operands));
  ASSERT_EQ(values[0], "ba");
  ASSERT_EQ(values[1], "cb");
  ASSERT_EQ(values[2], "dc");
  ASSERT_EQ(values[3], "ed");
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -21,6 +21,7 @@
#include <vector>
#include "compaction/compaction.h"
#include "db/blob/blob_fetcher.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_index.h"
@ -1875,6 +1876,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
// need to provide it here.
bool is_blob_index = false;
bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index;
BlobFetcher blob_fetcher(this, read_options);
GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_,
@ -1882,7 +1884,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
merge_context, do_merge, max_covering_tombstone_seq, clock_, seq,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob_to_use,
tracing_get_id);
tracing_get_id, &blob_fetcher);
// Pin blocks that we read to hold merge operands
if (merge_operator_) {
@ -2031,6 +2033,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
// use autovector in order to avoid unnecessary construction of GetContext
// objects, which is expensive
autovector<GetContext, 16> get_ctx;
BlobFetcher blob_fetcher(this, read_options);
for (auto iter = range->begin(); iter != range->end(); ++iter) {
assert(iter->s->ok() || iter->s->IsMergeInProgress());
get_ctx.emplace_back(
@ -2039,7 +2042,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
&(iter->merge_context), true, &iter->max_covering_tombstone_seq, clock_,
nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr, callback,
&iter->is_blob_index, tracing_mget_id);
&iter->is_blob_index, tracing_mget_id, &blob_fetcher);
// MergeInProgress status, if set, has been transferred to the get_context
// state, so we set status to ok here. From now on, the iter status will
// be used for IO errors, and get_context state will be used for any

@ -6,6 +6,7 @@ LIB_SOURCES = \
cache/lru_cache.cc \
cache/sharded_cache.cc \
db/arena_wrapped_db_iter.cc \
db/blob/blob_fetcher.cc \
db/blob/blob_file_addition.cc \
db/blob/blob_file_builder.cc \
db/blob/blob_file_cache.cc \

@ -39,14 +39,17 @@ void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
} // namespace
GetContext::GetContext(
const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
Statistics* statistics, GetState init_state, const Slice& user_key,
PinnableSlice* pinnable_val, std::string* timestamp, bool* value_found,
GetContext::GetContext(const Comparator* ucmp,
const MergeOperator* merge_operator, Logger* logger,
Statistics* statistics, GetState init_state,
const Slice& user_key, PinnableSlice* pinnable_val,
std::string* timestamp, bool* value_found,
MergeContext* merge_context, bool do_merge,
SequenceNumber* _max_covering_tombstone_seq, SystemClock* clock,
SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr,
ReadCallback* callback, bool* is_blob_index, uint64_t tracing_get_id)
SequenceNumber* _max_covering_tombstone_seq,
SystemClock* clock, SequenceNumber* seq,
PinnedIteratorsManager* _pinned_iters_mgr,
ReadCallback* callback, bool* is_blob_index,
uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
: ucmp_(ucmp),
merge_operator_(merge_operator),
logger_(logger),
@ -65,7 +68,8 @@ GetContext::GetContext(
callback_(callback),
do_merge_(do_merge),
is_blob_index_(is_blob_index),
tracing_get_id_(tracing_get_id) {
tracing_get_id_(tracing_get_id),
blob_fetcher_(blob_fetcher) {
if (seq_) {
*seq_ = kMaxSequenceNumber;
}
@ -79,11 +83,11 @@ GetContext::GetContext(
bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
SystemClock* clock, SequenceNumber* seq,
PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
bool* is_blob_index, uint64_t tracing_get_id)
bool* is_blob_index, uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
: GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,
pinnable_val, nullptr, value_found, merge_context, do_merge,
_max_covering_tombstone_seq, clock, seq, _pinned_iters_mgr,
callback, is_blob_index, tracing_get_id) {}
callback, is_blob_index, tracing_get_id, blob_fetcher) {}
// Called from TableCache::Get and Table::Get when file/block in which
// key may exist are not there in TableCache/BlockCache respectively. In this
@ -250,6 +254,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
state_ = kUnexpectedBlobIndex;
return false;
}
if (is_blob_index_ != nullptr) {
*is_blob_index_ = (type == kTypeBlobIndex);
}
if (kNotFound == state_) {
state_ = kFound;
if (do_merge_) {
@ -260,7 +267,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
} else {
TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
this);
// Otherwise copy the value
pinnable_val_->PinSelf(value);
}
@ -269,22 +275,38 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
// It means this function is called as part of DB GetMergeOperands
// API and the current value should be part of
// merge_context_->operand_list
if (is_blob_index_ != nullptr && *is_blob_index_) {
PinnableSlice pin_val;
if (GetBlobValue(value, &pin_val) == false) {
return false;
}
Slice blob_value(pin_val);
push_operand(blob_value, nullptr);
} else {
push_operand(value, value_pinner);
}
}
} else if (kMerge == state_) {
assert(merge_operator_ != nullptr);
if (is_blob_index_ != nullptr && *is_blob_index_) {
PinnableSlice pin_val;
if (GetBlobValue(value, &pin_val) == false) {
return false;
}
Slice blob_value(pin_val);
state_ = kFound;
if (do_merge_) {
if (LIKELY(pinnable_val_ != nullptr)) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, &value,
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
logger_, statistics_, clock_);
pinnable_val_->PinSelf();
if (!merge_status.ok()) {
state_ = kCorrupt;
}
Merge(&blob_value);
} else {
// It means this function is called as part of DB GetMergeOperands
// API and the current value should be part of
// merge_context_->operand_list
push_operand(blob_value, nullptr);
}
} else {
state_ = kFound;
if (do_merge_) {
Merge(&value);
} else {
// It means this function is called as part of DB GetMergeOperands
// API and the current value should be part of
@ -292,6 +314,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
push_operand(value, value_pinner);
}
}
}
if (state_ == kFound) {
size_t ts_sz = ucmp_->timestamp_size();
if (ts_sz > 0 && timestamp_ != nullptr) {
@ -299,9 +322,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
timestamp_->assign(ts.data(), ts.size());
}
}
if (is_blob_index_ != nullptr) {
*is_blob_index_ = (type == kTypeBlobIndex);
}
return false;
case kTypeDeletion:
@ -315,21 +335,10 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
state_ = kDeleted;
} else if (kMerge == state_) {
state_ = kFound;
if (LIKELY(pinnable_val_ != nullptr)) {
if (do_merge_) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, nullptr,
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
logger_, statistics_, clock_);
pinnable_val_->PinSelf();
if (!merge_status.ok()) {
state_ = kCorrupt;
}
}
Merge(nullptr);
// If do_merge_ = false then the current value shouldn't be part of
// merge_context_->operand_list
}
}
return false;
case kTypeMerge:
@ -341,33 +350,49 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
merge_operator_->ShouldMerge(
merge_context_->GetOperandsDirectionBackward())) {
state_ = kFound;
Merge(nullptr);
return false;
}
return true;
default:
assert(false);
break;
}
}
// state_ could be Corrupt, merge or notfound
return false;
}
void GetContext::Merge(const Slice* value) {
if (LIKELY(pinnable_val_ != nullptr)) {
// do_merge_ = true this is the case where this function is called
// as part of DB Get API hence merge operators should be merged.
if (do_merge_) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, nullptr,
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
logger_, statistics_, clock_);
merge_operator_, user_key_, value, merge_context_->GetOperands(),
pinnable_val_->GetSelf(), logger_, statistics_, clock_);
pinnable_val_->PinSelf();
if (!merge_status.ok()) {
state_ = kCorrupt;
}
}
}
return false;
}
return true;
default:
assert(false);
break;
}
// Resolves a blob index into the actual blob value via blob_fetcher_.
// Returns true on success. On failure, returns false after updating the
// get state: Incomplete (e.g. no-I/O read couldn't reach the blob file)
// marks the key as "may exist"; any other error is treated as corruption.
bool GetContext::GetBlobValue(const Slice& blob_index,
                              PinnableSlice* blob_value) {
  Status status = blob_fetcher_->FetchBlob(user_key_, blob_index, blob_value);
  if (!status.ok()) {
    if (status.IsIncomplete()) {
      MarkKeyMayExist();
      return false;
    }
    // Failure to resolve a blob reference is surfaced as corruption.
    state_ = kCorrupt;
    return false;
  }
  // The value is now resolved, so it is no longer a blob index.
  // NOTE(review): dereferences is_blob_index_ without a null check — callers
  // appear to invoke this only when *is_blob_index_ was set; confirm before
  // adding new call sites.
  *is_blob_index_ = false;
  return true;
}
void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&

@ -5,6 +5,8 @@
#pragma once
#include <string>
#include "db/blob/blob_fetcher.h"
#include "db/dbformat.h"
#include "db/merge_context.h"
#include "db/read_callback.h"
@ -103,7 +105,7 @@ class GetContext {
SequenceNumber* seq = nullptr,
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
uint64_t tracing_get_id = 0);
uint64_t tracing_get_id = 0, BlobFetcher* blob_fetcher = nullptr);
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
Logger* logger, Statistics* statistics, GetState init_state,
const Slice& user_key, PinnableSlice* value,
@ -113,7 +115,7 @@ class GetContext {
SequenceNumber* seq = nullptr,
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
uint64_t tracing_get_id = 0);
uint64_t tracing_get_id = 0, BlobFetcher* blob_fetcher = nullptr);
GetContext() = delete;
@ -170,6 +172,9 @@ class GetContext {
void push_operand(const Slice& value, Cleanable* value_pinner);
private:
void Merge(const Slice* value);
bool GetBlobValue(const Slice& blob_index, PinnableSlice* blob_value);
const Comparator* ucmp_;
const MergeOperator* merge_operator_;
// the merge operations encountered;
@ -200,6 +205,7 @@ class GetContext {
// Used for block cache tracing only. A tracing get id uniquely identifies a
// Get or a MultiGet.
const uint64_t tracing_get_id_;
BlobFetcher* blob_fetcher_;
};
// Call this to replay a log and bring the get_context up to date. The replay

Loading…
Cancel
Save