Support Merge for wide-column entities during point lookups (#10916)

Summary:
The patch adds `Merge` support for wide-column entities to the point lookup
APIs, i.e. `Get`, `MultiGet`, `GetEntity`, and `GetMergeOperands`. (I plan to
update the iterator and compaction logic in separate PRs.) In terms of semantics,
the `Merge` operation is applied to the default (anonymous) column; any other
columns in the entity are unaffected.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10916

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D40962311

Pulled By: ltamasi

fbshipit-source-id: 244bc9d172be1af2f204796b2f89104e4d2fa373
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent cc8c8f6958
commit 941d834739
  1. 38
      db/memtable.cc
  2. 65
      db/merge_helper.cc
  3. 8
      db/merge_helper.h
  4. 4
      db/version_set.cc
  5. 5
      db/version_set_sync_and_async.h
  6. 205
      db/wide/db_wide_basic_test.cc
  7. 55
      table/get_context.cc
  8. 3
      table/get_context.h

@ -1090,20 +1090,6 @@ static bool SaveValue(void* arg, const char* entry) {
return false;
}
case kTypeWideColumnEntity: {
if (!s->do_merge) {
*(s->status) = Status::NotSupported(
"GetMergeOperands not supported for wide-column entities");
*(s->found_final_value) = true;
return false;
}
if (*(s->merge_in_progress)) {
*(s->status) = Status::NotSupported(
"Merge not supported for wide-column entities");
*(s->found_final_value) = true;
return false;
}
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
}
@ -1112,7 +1098,29 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->status) = Status::OK();
if (s->value) {
if (!s->do_merge) {
// Preserve the value with the goal of returning it as part of
// raw merge operands to the user
Slice value_of_default;
*(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
v, value_of_default);
if (s->status->ok()) {
merge_context->PushOperand(
value_of_default,
s->inplace_update_support == false /* operand_pinned */);
}
} else if (*(s->merge_in_progress)) {
assert(s->do_merge);
if (s->value || s->columns) {
*(s->status) = MergeHelper::TimedFullMergeWithEntity(
merge_operator, s->key->user_key(), v,
merge_context->GetOperands(), s->value, s->columns, s->logger,
s->statistics, s->clock, nullptr /* result_operand */, true);
}
} else if (s->value) {
Slice value_of_default;
*(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
v, value_of_default);

@ -12,6 +12,7 @@
#include "db/blob/prefetch_buffer_collection.h"
#include "db/compaction/compaction_iteration_stats.h"
#include "db/dbformat.h"
#include "db/wide/wide_column_serialization.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "port/likely.h"
@ -140,6 +141,70 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
return Status::OK();
}
Status MergeHelper::TimedFullMergeWithEntity(
const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
const std::vector<Slice>& operands, std::string* value,
PinnableWideColumns* columns, Logger* logger, Statistics* statistics,
SystemClock* clock, Slice* result_operand, bool update_num_ops_stats) {
assert(value || columns);
assert(!value || !columns);
WideColumns base_columns;
{
const Status s =
WideColumnSerialization::Deserialize(base_entity, base_columns);
if (!s.ok()) {
return s;
}
}
const bool has_default_column =
!base_columns.empty() && base_columns[0].name() == kDefaultWideColumnName;
Slice value_of_default;
if (has_default_column) {
value_of_default = base_columns[0].value();
}
std::string result;
{
const Status s = TimedFullMerge(
merge_operator, key, &value_of_default, operands, &result, logger,
statistics, clock, result_operand, update_num_ops_stats);
if (!s.ok()) {
return s;
}
}
if (value) {
*value = std::move(result);
return Status::OK();
}
assert(columns);
std::string output;
if (has_default_column) {
base_columns[0].value() = result;
const Status s = WideColumnSerialization::Serialize(base_columns, output);
if (!s.ok()) {
return s;
}
} else {
const Status s =
WideColumnSerialization::Serialize(result, base_columns, output);
if (!s.ok()) {
return s;
}
}
return columns->SetWideColumnValue(output);
}
// PRE: iter points to the first merge type entry
// POST: iter points to the first entry beyond the merge process (or the end)
// keys_, operands_ are updated to reflect the merge result.

@ -16,6 +16,7 @@
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "rocksdb/wide_columns.h"
#include "util/stop_watch.h"
namespace ROCKSDB_NAMESPACE {
@ -65,6 +66,13 @@ class MergeHelper {
Slice* result_operand = nullptr,
bool update_num_ops_stats = false);
static Status TimedFullMergeWithEntity(
const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
const std::vector<Slice>& operands, std::string* value,
PinnableWideColumns* columns, Logger* logger, Statistics* statistics,
SystemClock* clock, Slice* result_operand = nullptr,
bool update_num_ops_stats = false);
// During compaction, merge entries until we hit
// - a corrupted key
// - a Put/Delete,

@ -2366,10 +2366,6 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
"Encounter unexpected blob index. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
return;
case GetContext::kUnexpectedWideColumnEntity:
*status =
Status::NotSupported("Encountered unexpected wide-column entity");
return;
}
f = fp.GetNextFile();
}

@ -141,11 +141,6 @@ DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
file_range.MarkKeyDone(iter);
continue;
case GetContext::kUnexpectedWideColumnEntity:
*status =
Status::NotSupported("Encountered unexpected wide-column entity");
file_range.MarkKeyDone(iter);
continue;
}
}

@ -334,14 +334,19 @@ TEST_F(DBWideBasicTest, MergePlainKeyValue) {
}
};
// Base KVs (if any) and Merge operands both in memtable
write_base();
write_merge();
verify();
// Base KVs (if any) and Merge operands both in storage
ASSERT_OK(Flush());
verify();
{
// Base KVs (if any) and Merge operands both in memtable (note: we take a
// snapshot in between to make sure they do not get reconciled during the
// subsequent flush)
write_base();
ManagedSnapshot snapshot(db_);
write_merge();
verify();
// Base KVs (if any) and Merge operands both in storage
ASSERT_OK(Flush());
verify();
}
// Base KVs (if any) in storage, Merge operands in memtable
DestroyAndReopen(options);
@ -351,28 +356,126 @@ TEST_F(DBWideBasicTest, MergePlainKeyValue) {
verify();
}
TEST_F(DBWideBasicTest, PutEntityMergeNotSupported) {
TEST_F(DBWideBasicTest, MergeEntity) {
Options options = GetDefaultOptions();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
options.create_if_missing = true;
const std::string delim("|");
options.merge_operator = MergeOperators::CreateStringAppendOperator(delim);
Reopen(options);
// Test Merge with two entities: one that has the default column and one that
// doesn't
constexpr char first_key[] = "first";
WideColumns first_columns{{kDefaultWideColumnName, "a"},
{"attr_name1", "foo"},
{"attr_name2", "bar"}};
constexpr char first_merge_operand[] = "bla1";
constexpr char second_key[] = "second";
WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};
constexpr char second_merge_operand[] = "bla2";
auto write_base = [&]() {
// Use the DB::PutEntity API
ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
first_key, first_columns));
// Use WriteBatch
WriteBatch batch;
ASSERT_OK(batch.PutEntity(db_->DefaultColumnFamily(), second_key,
second_columns));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
};
auto write_merge = [&]() {
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key,
first_merge_operand));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key,
second_merge_operand));
};
// Note: Merge is currently not supported for wide-column entities
auto verify = [&]() {
const std::string first_expected_default(
first_columns[0].value().ToString() + delim + first_merge_operand);
{
PinnableSlice result;
ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key,
&result)
.IsNotSupported());
ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key,
&result));
ASSERT_EQ(result, first_expected_default);
}
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
first_key, &result));
WideColumns expected_columns{
{kDefaultWideColumnName, first_expected_default},
first_columns[1],
first_columns[2]};
ASSERT_EQ(result.columns(), expected_columns);
}
{
constexpr size_t num_merge_operands = 2;
std::array<PinnableSlice, num_merge_operands> merge_operands;
GetMergeOperandsOptions get_merge_opts;
get_merge_opts.expected_max_number_of_operands = num_merge_operands;
int number_of_operands = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
first_key, &merge_operands[0],
&get_merge_opts, &number_of_operands));
ASSERT_EQ(number_of_operands, num_merge_operands);
ASSERT_EQ(merge_operands[0], first_columns[0].value());
ASSERT_EQ(merge_operands[1], first_merge_operand);
}
const std::string second_expected_default(delim + second_merge_operand);
{
PinnableSlice result;
ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &result)
.IsNotSupported());
ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key,
&result));
ASSERT_EQ(result, second_expected_default);
}
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &result));
WideColumns expected_columns{
{kDefaultWideColumnName, second_expected_default},
second_columns[0],
second_columns[1]};
ASSERT_EQ(result.columns(), expected_columns);
}
{
constexpr size_t num_merge_operands = 2;
std::array<PinnableSlice, num_merge_operands> merge_operands;
GetMergeOperandsOptions get_merge_opts;
get_merge_opts.expected_max_number_of_operands = num_merge_operands;
int number_of_operands = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &merge_operands[0],
&get_merge_opts, &number_of_operands));
ASSERT_EQ(number_of_operands, num_merge_operands);
ASSERT_TRUE(merge_operands[0].empty());
ASSERT_EQ(merge_operands[1], second_merge_operand);
}
{
@ -385,13 +488,15 @@ TEST_F(DBWideBasicTest, PutEntityMergeNotSupported) {
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
&keys[0], &values[0], &statuses[0]);
ASSERT_TRUE(values[0].empty());
ASSERT_TRUE(statuses[0].IsNotSupported());
ASSERT_EQ(values[0], first_expected_default);
ASSERT_OK(statuses[0]);
ASSERT_TRUE(values[1].empty());
ASSERT_TRUE(statuses[1].IsNotSupported());
ASSERT_EQ(values[1], second_expected_default);
ASSERT_OK(statuses[1]);
}
// Note: Merge is currently not supported for wide-column entities in
// iterator
{
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
@ -405,47 +510,25 @@ TEST_F(DBWideBasicTest, PutEntityMergeNotSupported) {
}
};
// Use the DB::PutEntity API
WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}};
ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
first_key, first_columns));
// Use WriteBatch
WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};
WriteBatch batch;
ASSERT_OK(
batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
// Add a couple of merge operands
constexpr char merge_operand[] = "bla";
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key,
merge_operand));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key,
merge_operand));
// Try reading when PutEntity is in storage, Merge is in memtable
verify();
// Try reading when PutEntity and Merge are both in storage
{
// Base KVs and Merge operands both in memtable (note: we take a snapshot in
// between to make sure they do not get reconciled during the subsequent
// flush)
write_base();
ManagedSnapshot snapshot(db_);
write_merge();
verify();
// Base KVs and Merge operands both in storage
ASSERT_OK(Flush());
verify();
}
// Base KVs in storage, Merge operands in memtable
DestroyAndReopen(options);
write_base();
ASSERT_OK(Flush());
verify();
// Try reading when PutEntity and Merge are both in memtable
ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
first_key, first_columns));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key,
merge_operand));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key,
merge_operand));
write_merge();
verify();
}

@ -351,9 +351,17 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
Slice blob_value(pin_val);
push_operand(blob_value, nullptr);
} else if (type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
state_ = kUnexpectedWideColumnEntity;
return false;
Slice value_copy = value;
Slice value_of_default;
if (!WideColumnSerialization::GetValueOfDefaultColumn(
value_copy, value_of_default)
.ok()) {
state_ = kCorrupt;
return false;
}
push_operand(value_of_default, value_pinner);
} else {
assert(type == kTypeValue);
push_operand(value, value_pinner);
@ -377,9 +385,26 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
push_operand(blob_value, nullptr);
}
} else if (type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
state_ = kUnexpectedWideColumnEntity;
return false;
state_ = kFound;
if (do_merge_) {
MergeWithEntity(value);
} else {
// It means this function is called as part of DB GetMergeOperands
// API and the current value should be part of
// merge_context_->operand_list
Slice value_copy = value;
Slice value_of_default;
if (!WideColumnSerialization::GetValueOfDefaultColumn(
value_copy, value_of_default)
.ok()) {
state_ = kCorrupt;
return false;
}
push_operand(value_of_default, value_pinner);
}
} else {
assert(type == kTypeValue);
@ -457,6 +482,24 @@ void GetContext::Merge(const Slice* value) {
}
}
void GetContext::MergeWithEntity(Slice entity) {
assert(do_merge_);
assert(!pinnable_val_ || !columns_);
const Status s = MergeHelper::TimedFullMergeWithEntity(
merge_operator_, user_key_, entity, merge_context_->GetOperands(),
pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_,
statistics_, clock_);
if (!s.ok()) {
state_ = kCorrupt;
return;
}
if (LIKELY(pinnable_val_ != nullptr)) {
pinnable_val_->PinSelf();
}
}
bool GetContext::GetBlobValue(const Slice& blob_index,
PinnableSlice* blob_value) {
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;

@ -75,8 +75,6 @@ class GetContext {
kCorrupt,
kMerge, // saver contains the current merge result (the operands)
kUnexpectedBlobIndex,
// TODO: remove once wide-column entities are supported by Get/MultiGet
kUnexpectedWideColumnEntity,
};
GetContextStats get_context_stats_;
@ -185,6 +183,7 @@ class GetContext {
private:
void Merge(const Slice* value);
void MergeWithEntity(Slice entity);
bool GetBlobValue(const Slice& blob_index, PinnableSlice* blob_value);
const Comparator* ucmp_;

Loading…
Cancel
Save