diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 3a3cbe366..cd4d456e8 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -16,6 +16,40 @@ namespace rocksdb { +// TODO(agiardullo): Clean up merge callsites to use this func +Status MergeHelper::TimedFullMerge(const Slice& key, const Slice* value, + const std::deque& operands, + const MergeOperator* merge_operator, + Statistics* statistics, Env* env, + Logger* logger, std::string* result) { + if (operands.size() == 0) { + result->assign(value->data(), value->size()); + return Status::OK(); + } + + if (merge_operator == nullptr) { + return Status::NotSupported("Provide a merge_operator when opening DB"); + } + + // Setup to time the merge + StopWatchNano timer(env, statistics != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + + // Do the merge + bool success = + merge_operator->FullMerge(key, value, operands, result, logger); + + RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME, + env != nullptr ? timer.ElapsedNanos() : 0); + + if (!success) { + RecordTick(statistics, NUMBER_MERGE_FAILURES); + return Status::Corruption("Error: Could not perform merge."); + } + + return Status::OK(); +} + // PRE: iter points to the first merge type entry // POST: iter points to the first entry beyond the merge process (or the end) // keys_, operands_ are updated to reflect the merge result. @@ -81,25 +115,20 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // => store result in operands_.back() (and update keys_.back()) // => change the entry type to kTypeValue for keys_.back() // We are done! Return a success if the merge passes. - { - StopWatchNano timer(env_, stats != nullptr); - PERF_TIMER_GUARD(merge_operator_time_nanos); - success_ = user_merge_operator_->FullMerge( - ikey.user_key, nullptr, operands_, &merge_result, logger_); - RecordTick(stats, MERGE_OPERATION_TOTAL_TIME, - env_ != nullptr ? timer.ElapsedNanos() : 0); - } + + Status s = TimedFullMerge(ikey.user_key, nullptr, operands_, + user_merge_operator_, stats, env_, logger_, + &merge_result); + // We store the result in keys_.back() and operands_.back() // if nothing went wrong (i.e.: no operand corruption on disk) - if (success_) { + if (s.ok()) { std::string& original_key = keys_.back(); // The original key encountered orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key[0], original_key.size(), orig_ikey.sequence, orig_ikey.type); swap(operands_.back(), merge_result); - } else { - RecordTick(stats, NUMBER_MERGE_FAILURES); } // move iter to the next entry (before doing anything else) @@ -117,25 +146,19 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // => change the entry type to kTypeValue for keys_.back() // We are done! Success! const Slice val = iter->value(); - { - StopWatchNano timer(env_, stats != nullptr); - PERF_TIMER_GUARD(merge_operator_time_nanos); - success_ = user_merge_operator_->FullMerge( - ikey.user_key, &val, operands_, &merge_result, logger_); - RecordTick(stats, MERGE_OPERATION_TOTAL_TIME, - env_ != nullptr ? timer.ElapsedNanos() : 0); - } + Status s = + TimedFullMerge(ikey.user_key, &val, operands_, user_merge_operator_, + stats, env_, logger_, &merge_result); + // We store the result in keys_.back() and operands_.back() // if nothing went wrong (i.e.: no operand corruption on disk) - if (success_) { + if (s.ok()) { std::string& original_key = keys_.back(); // The original key encountered orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key[0], original_key.size(), orig_ikey.sequence, orig_ikey.type); swap(operands_.back(), merge_result); - } else { - RecordTick(stats, NUMBER_MERGE_FAILURES); } // move iter to the next entry diff --git a/db/merge_helper.h b/db/merge_helper.h index 69f576ca1..7722446dd 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -35,6 +35,15 @@ class MergeHelper { operands_(), success_(false) {} + // Wrapper around MergeOperator::FullMerge() that records perf statistics. + // Result of merge will be written to result if status returned is OK. + // If operands is empty, the value will simply be copied to result. + static Status TimedFullMerge(const Slice& key, const Slice* value, + const std::deque& operands, + const MergeOperator* merge_operator, + Statistics* statistics, Env* env, Logger* logger, + std::string* result); + // Merge entries until we hit // - a corrupted key // - a Put/Delete,