Revisit the interface of MergeHelper::TimedFullMerge(WithEntity) (#10932)

Summary:
The patch refines/reworks `MergeHelper::TimedFullMerge(WithEntity)`
a bit in two ways. First, it eliminates the recently introduced `TimedFullMerge`
overload, which makes the responsibilities clearer by making sure the query
result (`value` for `Get`, `columns` for `GetEntity`) is set uniformly in
`SaveValue` and `GetContext`. Second, it changes the interface of
`TimedFullMergeWithEntity` so it exposes its result in a serialized form; this
is a more decoupled design which will come in handy when adding support
for `Merge` with wide-column entities to `DBIter`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10932

Test Plan: `make check`

Reviewed By: akankshamahajan15

Differential Revision: D41129399

Pulled By: ltamasi

fbshipit-source-id: 69d8da358c77d4fc7e8c40f4dafc2c129a710677
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent c62f322169
commit 2ea109521f
  1. 64
      db/memtable.cc
  2. 59
      db/merge_helper.cc
  3. 13
      db/merge_helper.h
  4. 10
      db/version_set.cc
  5. 55
      table/get_context.cc

@ -1066,11 +1066,21 @@ static bool SaveValue(void* arg, const char* entry) {
assert(s->do_merge); assert(s->do_merge);
if (s->value || s->columns) { if (s->value || s->columns) {
std::string result;
*(s->status) = MergeHelper::TimedFullMerge( *(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v, merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), s->value, s->columns, s->logger, merge_context->GetOperands(), &result, s->logger, s->statistics,
s->statistics, s->clock, /* result_operand */ nullptr, s->clock, /* result_operand */ nullptr,
/* update_num_ops_stats */ true); /* update_num_ops_stats */ true);
if (s->status->ok()) {
if (s->value) {
*(s->value) = std::move(result);
} else {
assert(s->columns);
s->columns->SetPlainValue(result);
}
}
} }
} else if (s->value) { } else if (s->value) {
s->value->assign(v.data(), v.size()); s->value->assign(v.data(), v.size());
@ -1115,11 +1125,27 @@ static bool SaveValue(void* arg, const char* entry) {
} else if (*(s->merge_in_progress)) { } else if (*(s->merge_in_progress)) {
assert(s->do_merge); assert(s->do_merge);
if (s->value || s->columns) { if (s->value) {
Slice value_of_default;
*(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
v, value_of_default);
if (s->status->ok()) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &value_of_default,
merge_context->GetOperands(), s->value, s->logger,
s->statistics, s->clock, /* result_operand */ nullptr,
/* update_num_ops_stats */ true);
}
} else if (s->columns) {
std::string result;
*(s->status) = MergeHelper::TimedFullMergeWithEntity( *(s->status) = MergeHelper::TimedFullMergeWithEntity(
merge_operator, s->key->user_key(), v, merge_operator, s->key->user_key(), v,
merge_context->GetOperands(), s->value, s->columns, s->logger, merge_context->GetOperands(), &result, s->logger, s->statistics,
s->statistics, s->clock, /* update_num_ops_stats */ true); s->clock, /* update_num_ops_stats */ true);
if (s->status->ok()) {
*(s->status) = s->columns->SetWideColumnValue(result);
}
} }
} else if (s->value) { } else if (s->value) {
Slice value_of_default; Slice value_of_default;
@ -1150,11 +1176,21 @@ static bool SaveValue(void* arg, const char* entry) {
case kTypeRangeDeletion: { case kTypeRangeDeletion: {
if (*(s->merge_in_progress)) { if (*(s->merge_in_progress)) {
if (s->value || s->columns) { if (s->value || s->columns) {
std::string result;
*(s->status) = MergeHelper::TimedFullMerge( *(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr, merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->columns, s->logger, merge_context->GetOperands(), &result, s->logger, s->statistics,
s->statistics, s->clock, /* result_operand */ nullptr, s->clock, /* result_operand */ nullptr,
/* update_num_ops_stats */ true); /* update_num_ops_stats */ true);
if (s->status->ok()) {
if (s->value) {
*(s->value) = std::move(result);
} else {
assert(s->columns);
s->columns->SetPlainValue(result);
}
}
} }
} else { } else {
*(s->status) = Status::NotFound(); *(s->status) = Status::NotFound();
@ -1180,11 +1216,21 @@ static bool SaveValue(void* arg, const char* entry) {
if (s->do_merge && merge_operator->ShouldMerge( if (s->do_merge && merge_operator->ShouldMerge(
merge_context->GetOperandsDirectionBackward())) { merge_context->GetOperandsDirectionBackward())) {
if (s->value || s->columns) { if (s->value || s->columns) {
std::string result;
*(s->status) = MergeHelper::TimedFullMerge( *(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr, merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->columns, s->logger, merge_context->GetOperands(), &result, s->logger, s->statistics,
s->statistics, s->clock, /* result_operand */ nullptr, s->clock, /* result_operand */ nullptr,
/* update_num_ops_stats */ true); /* update_num_ops_stats */ true);
if (s->status->ok()) {
if (s->value) {
*(s->value) = std::move(result);
} else {
assert(s->columns);
s->columns->SetPlainValue(result);
}
}
} }
*(s->found_final_value) = true; *(s->found_final_value) = true;

@ -112,44 +112,10 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
return Status::OK(); return Status::OK();
} }
Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, const Slice* base_value,
const std::vector<Slice>& operands,
std::string* value,
PinnableWideColumns* columns, Logger* logger,
Statistics* statistics, SystemClock* clock,
Slice* result_operand,
bool update_num_ops_stats) {
assert(value || columns);
assert(!value || !columns);
std::string result;
const Status s =
TimedFullMerge(merge_operator, key, base_value, operands, &result, logger,
statistics, clock, result_operand, update_num_ops_stats);
if (!s.ok()) {
return s;
}
if (value) {
*value = std::move(result);
return Status::OK();
}
assert(columns);
columns->SetPlainValue(result);
return Status::OK();
}
Status MergeHelper::TimedFullMergeWithEntity( Status MergeHelper::TimedFullMergeWithEntity(
const MergeOperator* merge_operator, const Slice& key, Slice base_entity, const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
const std::vector<Slice>& operands, std::string* value, const std::vector<Slice>& operands, std::string* result, Logger* logger,
PinnableWideColumns* columns, Logger* logger, Statistics* statistics, Statistics* statistics, SystemClock* clock, bool update_num_ops_stats) {
SystemClock* clock, bool update_num_ops_stats) {
assert(value || columns);
assert(!value || !columns);
WideColumns base_columns; WideColumns base_columns;
{ {
@ -168,44 +134,35 @@ Status MergeHelper::TimedFullMergeWithEntity(
value_of_default = base_columns[0].value(); value_of_default = base_columns[0].value();
} }
std::string result; std::string merge_result;
{ {
constexpr Slice* result_operand = nullptr; constexpr Slice* result_operand = nullptr;
const Status s = TimedFullMerge( const Status s = TimedFullMerge(
merge_operator, key, &value_of_default, operands, &result, logger, merge_operator, key, &value_of_default, operands, &merge_result, logger,
statistics, clock, result_operand, update_num_ops_stats); statistics, clock, result_operand, update_num_ops_stats);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
} }
if (value) {
*value = std::move(result);
return Status::OK();
}
assert(columns);
std::string output;
if (has_default_column) { if (has_default_column) {
base_columns[0].value() = result; base_columns[0].value() = merge_result;
const Status s = WideColumnSerialization::Serialize(base_columns, output); const Status s = WideColumnSerialization::Serialize(base_columns, *result);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
} else { } else {
const Status s = const Status s =
WideColumnSerialization::Serialize(result, base_columns, output); WideColumnSerialization::Serialize(merge_result, base_columns, *result);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
} }
return columns->SetWideColumnValue(output); return Status::OK();
} }
// PRE: iter points to the first merge type entry // PRE: iter points to the first merge type entry

@ -57,19 +57,10 @@ class MergeHelper {
Slice* result_operand, Slice* result_operand,
bool update_num_ops_stats); bool update_num_ops_stats);
static Status TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, const Slice* base_value,
const std::vector<Slice>& operands,
std::string* value, PinnableWideColumns* columns,
Logger* logger, Statistics* statistics,
SystemClock* clock, Slice* result_operand,
bool update_num_ops_stats);
static Status TimedFullMergeWithEntity( static Status TimedFullMergeWithEntity(
const MergeOperator* merge_operator, const Slice& key, Slice base_entity, const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
const std::vector<Slice>& operands, std::string* value, const std::vector<Slice>& operands, std::string* result, Logger* logger,
PinnableWideColumns* columns, Logger* logger, Statistics* statistics, Statistics* statistics, SystemClock* clock, bool update_num_ops_stats);
SystemClock* clock, bool update_num_ops_stats);
// During compaction, merge entries until we hit // During compaction, merge entries until we hit
// - a corrupted key // - a corrupted key

@ -2384,15 +2384,19 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
} }
// merge_operands are in saver and we hit the beginning of the key history // merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands; // do a final merge of nullptr and operands;
std::string* str_value = value != nullptr ? value->GetSelf() : nullptr; if (value || columns) {
if (str_value || columns) { std::string result;
*status = MergeHelper::TimedFullMerge( *status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, merge_context->GetOperands(), merge_operator_, user_key, nullptr, merge_context->GetOperands(),
str_value, columns, info_log_, db_statistics_, clock_, &result, info_log_, db_statistics_, clock_,
/* result_operand */ nullptr, /* update_num_ops_stats */ true); /* result_operand */ nullptr, /* update_num_ops_stats */ true);
if (status->ok()) { if (status->ok()) {
if (LIKELY(value != nullptr)) { if (LIKELY(value != nullptr)) {
*(value->GetSelf()) = std::move(result);
value->PinSelf(); value->PinSelf();
} else {
assert(columns != nullptr);
columns->SetPlainValue(result);
} }
} }
} }

@ -468,10 +468,10 @@ void GetContext::Merge(const Slice* value) {
assert(do_merge_); assert(do_merge_);
assert(!pinnable_val_ || !columns_); assert(!pinnable_val_ || !columns_);
std::string result;
const Status s = MergeHelper::TimedFullMerge( const Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, value, merge_context_->GetOperands(), merge_operator_, user_key_, value, merge_context_->GetOperands(), &result,
pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_, logger_, statistics_, clock_, /* result_operand */ nullptr,
statistics_, clock_, /* result_operand */ nullptr,
/* update_num_ops_stats */ true); /* update_num_ops_stats */ true);
if (!s.ok()) { if (!s.ok()) {
state_ = kCorrupt; state_ = kCorrupt;
@ -479,25 +479,66 @@ void GetContext::Merge(const Slice* value) {
} }
if (LIKELY(pinnable_val_ != nullptr)) { if (LIKELY(pinnable_val_ != nullptr)) {
*(pinnable_val_->GetSelf()) = std::move(result);
pinnable_val_->PinSelf(); pinnable_val_->PinSelf();
return;
} }
assert(columns_);
columns_->SetPlainValue(result);
} }
void GetContext::MergeWithEntity(Slice entity) { void GetContext::MergeWithEntity(Slice entity) {
assert(do_merge_); assert(do_merge_);
assert(!pinnable_val_ || !columns_); assert(!pinnable_val_ || !columns_);
if (LIKELY(pinnable_val_ != nullptr)) {
Slice value_of_default;
{
const Status s = WideColumnSerialization::GetValueOfDefaultColumn(
entity, value_of_default);
if (!s.ok()) {
state_ = kCorrupt;
return;
}
}
{
const Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, &value_of_default,
merge_context_->GetOperands(), pinnable_val_->GetSelf(), logger_,
statistics_, clock_, /* result_operand */ nullptr,
/* update_num_ops_stats */ true);
if (!s.ok()) {
state_ = kCorrupt;
return;
}
}
pinnable_val_->PinSelf();
return;
}
std::string result;
{
const Status s = MergeHelper::TimedFullMergeWithEntity( const Status s = MergeHelper::TimedFullMergeWithEntity(
merge_operator_, user_key_, entity, merge_context_->GetOperands(), merge_operator_, user_key_, entity, merge_context_->GetOperands(),
pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_, &result, logger_, statistics_, clock_, /* update_num_ops_stats */ true);
statistics_, clock_, /* update_num_ops_stats */ true);
if (!s.ok()) { if (!s.ok()) {
state_ = kCorrupt; state_ = kCorrupt;
return; return;
} }
}
if (LIKELY(pinnable_val_ != nullptr)) { {
pinnable_val_->PinSelf(); assert(columns_);
const Status s = columns_->SetWideColumnValue(result);
if (!s.ok()) {
state_ = kCorrupt;
return;
}
} }
} }

Loading…
Cancel
Save