Reuse TimedFullMerge instead of FullMerge + instrumentation

Summary:
We have alot of code duplication whenever we call FullMerge we keep duplicating the instrumentation and statistics code
This is a simple diff to refactor the code to use TimedFullMerge instead of FullMerge

Test Plan: COMPILE_WITH_ASAN=1 make check -j64

Reviewers: andrewkr, yhchiang, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D59577
main
Islam AbdelRahman 9 years ago
parent 9a33a723ba
commit 7c919deccc
  1. 87
      db/db_iter.cc
  2. 41
      db/memtable.cc
  3. 41
      db/merge_helper.cc
  4. 9
      db/merge_helper.h
  5. 20
      db/version_set.cc
  6. 18
      db/write_batch.cc
  7. 36
      table/get_context.cc
  8. 10
      utilities/write_batch_with_index/write_batch_with_index.cc
  9. 10
      utilities/write_batch_with_index/write_batch_with_index_internal.cc

@ -16,6 +16,7 @@
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "port/port.h"
#include "rocksdb/env.h"
@ -109,7 +110,7 @@ class DBIter: public Iterator {
env_(env),
logger_(ioptions.info_log),
user_comparator_(cmp),
user_merge_operator_(ioptions.merge_operator),
merge_operator_(ioptions.merge_operator),
iter_(iter),
sequence_(s),
direction_(kForward),
@ -239,7 +240,7 @@ class DBIter: public Iterator {
Env* const env_;
Logger* logger_;
const Comparator* const user_comparator_;
const MergeOperator* const user_merge_operator_;
const MergeOperator* const merge_operator_;
InternalIterator* iter_;
SequenceNumber const sequence_;
@ -424,10 +425,10 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
// POST: saved_value_ has the merged value for the user key
// iter_ points to the next entry (or invalid)
void DBIter::MergeValuesNewToOld() {
if (!user_merge_operator_) {
if (!merge_operator_) {
Log(InfoLogLevel::ERROR_LEVEL,
logger_, "Options::merge_operator is null.");
status_ = Status::InvalidArgument("user_merge_operator_ must be set.");
status_ = Status::InvalidArgument("merge_operator_ must be set.");
valid_ = false;
return;
}
@ -456,15 +457,9 @@ void DBIter::MergeValuesNewToOld() {
// final result in saved_value_. We are done!
// ignore corruption if there is any.
const Slice val = iter_->value();
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
user_merge_operator_->FullMerge(ikey.user_key, &val,
merge_context_.GetOperands(),
&saved_value_, logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanos());
}
MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, &val,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_);
// iter_ is positioned after put
iter_->Next();
return;
@ -478,18 +473,13 @@ void DBIter::MergeValuesNewToOld() {
}
}
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
// we either exhausted all internal keys under this user key, or hit
// a deletion marker.
// feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly.
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr,
merge_context_.GetOperands(), &saved_value_,
logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
}
// we either exhausted all internal keys under this user key, or hit
// a deletion marker.
// feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly.
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_);
}
void DBIter::Prev() {
@ -614,7 +604,7 @@ bool DBIter::FindValueForCurrentKey() {
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeMerge:
assert(user_merge_operator_ != nullptr);
assert(merge_operator_ != nullptr);
merge_context_.PushOperandBack(iter_->value());
break;
default:
@ -636,24 +626,15 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeMerge:
current_entry_is_merged_ = true;
if (last_not_merge_type == kTypeDeletion) {
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr,
merge_context_.GetOperands(),
&saved_value_, logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanos());
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
nullptr, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_);
} else {
assert(last_not_merge_type == kTypeValue);
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
user_merge_operator_->FullMerge(saved_key_.GetKey(), &pinned_value_,
merge_context_.GetOperands(),
&saved_value_, logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanos());
}
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
&pinned_value_,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_);
}
break;
case kTypeValue:
@ -708,14 +689,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (!iter_->Valid() ||
!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr,
merge_context_.GetOperands(),
&saved_value_, logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
}
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_);
// Make iter_ valid and point to saved_key_
if (!iter_->Valid() ||
!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
@ -727,14 +703,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
}
const Slice& val = iter_->value();
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
user_merge_operator_->FullMerge(saved_key_.GetKey(), &val,
merge_context_.GetOperands(), &saved_value_,
logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
}
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_);
valid_ = true;
return true;
}

@ -15,6 +15,7 @@
#include "db/dbformat.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/writebuffer.h"
#include "rocksdb/comparator.h"
@ -489,22 +490,10 @@ static bool SaveValue(void* arg, const char* entry) {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->status) = Status::OK();
if (*(s->merge_in_progress)) {
assert(merge_operator);
bool merge_success = false;
{
StopWatchNano timer(s->env_, s->statistics != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
merge_success = merge_operator->FullMerge(
s->key->user_key(), &v, merge_context->GetOperands(), s->value,
s->logger);
RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanos());
}
if (!merge_success) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
*(s->status) =
Status::Corruption("Error: Could not perform merge.");
}
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), s->value, s->logger, s->statistics,
s->env_);
} else if (s->value != nullptr) {
s->value->assign(v.data(), v.size());
}
@ -517,23 +506,11 @@ static bool SaveValue(void* arg, const char* entry) {
case kTypeDeletion:
case kTypeSingleDeletion: {
if (*(s->merge_in_progress)) {
assert(merge_operator != nullptr);
*(s->status) = Status::OK();
bool merge_success = false;
{
StopWatchNano timer(s->env_, s->statistics != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
merge_success = merge_operator->FullMerge(
s->key->user_key(), nullptr, merge_context->GetOperands(),
s->value, s->logger);
RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanos());
}
if (!merge_success) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
*(s->status) =
Status::Corruption("Error: Could not perform merge.");
}
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->logger, s->statistics,
s->env_);
} else {
*(s->status) = Status::NotFound();
}

@ -18,30 +18,29 @@
namespace rocksdb {
// TODO(agiardullo): Clean up merge callsites to use this func
Status MergeHelper::TimedFullMerge(const Slice& key, const Slice* value,
Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, const Slice* value,
const std::deque<std::string>& operands,
const MergeOperator* merge_operator,
Statistics* statistics, Env* env,
Logger* logger, std::string* result) {
std::string* result, Logger* logger,
Statistics* statistics, Env* env) {
assert(merge_operator != nullptr);
if (operands.size() == 0) {
result->assign(value->data(), value->size());
return Status::OK();
}
if (merge_operator == nullptr) {
return Status::NotSupported("Provide a merge_operator when opening DB");
}
// Setup to time the merge
StopWatchNano timer(env, statistics != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
bool success;
{
// Setup to time the merge
StopWatchNano timer(env, statistics != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
// Do the merge
bool success =
merge_operator->FullMerge(key, value, operands, result, logger);
// Do the merge
success = merge_operator->FullMerge(key, value, operands, result, logger);
RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanosSafe());
RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
}
if (!success) {
RecordTick(statistics, NUMBER_MERGE_FAILURES);
@ -140,9 +139,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
const Slice val = iter->value();
const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr;
std::string merge_result;
s = TimedFullMerge(ikey.user_key, val_ptr, operands_,
user_merge_operator_, stats_, env_, logger_,
&merge_result);
s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr,
operands_, &merge_result, logger_, stats_, env_);
// We store the result in keys_.back() and operands_.back()
// if nothing went wrong (i.e.: no operand corruption on disk)
@ -221,9 +219,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
assert(operands_.size() >= 1);
assert(operands_.size() == keys_.size());
std::string merge_result;
s = TimedFullMerge(orig_ikey.user_key, nullptr, operands_,
user_merge_operator_, stats_, env_, logger_,
&merge_result);
s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr,
operands_, &merge_result, logger_, stats_, env_);
if (s.ok()) {
// The original key encountered
// We are certain that keys_ is not empty here (see assertions couple of

@ -55,12 +55,11 @@ class MergeHelper {
// Returns one of the following statuses:
// - OK: Entries were successfully merged.
// - Corruption: Merge operator reported unsuccessful merge.
// - NotSupported: Merge operator is missing.
static Status TimedFullMerge(const Slice& key, const Slice* value,
static Status TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, const Slice* value,
const std::deque<std::string>& operands,
const MergeOperator* merge_operator,
Statistics* statistics, Env* env, Logger* logger,
std::string* result);
std::string* result, Logger* logger,
Statistics* statistics, Env* env);
// Merge entries until we hit
// - a corrupted key

@ -30,6 +30,7 @@
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "db/writebuffer.h"
@ -973,22 +974,9 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
}
// merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands;
bool merge_success = false;
{
StopWatchNano timer(env_, db_statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
merge_success = merge_operator_->FullMerge(
user_key, nullptr, merge_context->GetOperands(), value, info_log_);
RecordTick(db_statistics_, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanos());
}
if (merge_success) {
*status = Status::OK();
} else {
RecordTick(db_statistics_, NUMBER_MERGE_FAILURES);
*status = Status::Corruption("could not perform end-of-key merge for ",
user_key);
}
*status = MergeHelper::TimedFullMerge(merge_operator_, user_key, nullptr,
merge_context->GetOperands(), value,
info_log_, db_statistics_, env_);
} else {
if (key_exists != nullptr) {
*key_exists = false;

@ -40,6 +40,7 @@
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/snapshot_impl.h"
#include "db/write_batch_internal.h"
#include "rocksdb/merge_operator.h"
@ -936,20 +937,13 @@ class MemTableInserter : public WriteBatch::Handler {
std::deque<std::string> operands;
operands.push_front(value.ToString());
std::string new_value;
bool merge_success = false;
{
StopWatchNano timer(Env::Default(), moptions->statistics != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
merge_success = merge_operator->FullMerge(
key, &get_value_slice, operands, &new_value, moptions->info_log);
RecordTick(moptions->statistics, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanos());
}
if (!merge_success) {
// Failed to merge!
RecordTick(moptions->statistics, NUMBER_MERGE_FAILURES);
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, &get_value_slice, operands, &new_value,
moptions->info_log, moptions->statistics, Env::Default());
if (!merge_status.ok()) {
// Failed to merge!
// Store the delta in memtable
perform_merge = false;
} else {

@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.
#include "table/get_context.h"
#include "db/merge_helper.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
@ -102,18 +103,11 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
assert(merge_operator_ != nullptr);
state_ = kFound;
if (value_ != nullptr) {
bool merge_success = false;
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
merge_success = merge_operator_->FullMerge(
user_key_, &value, merge_context_->GetOperands(), value_,
logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanosSafe());
}
if (!merge_success) {
RecordTick(statistics_, NUMBER_MERGE_FAILURES);
Status merge_status =
MergeHelper::TimedFullMerge(merge_operator_, user_key_, &value,
merge_context_->GetOperands(),
value_, logger_, statistics_, env_);
if (!merge_status.ok()) {
state_ = kCorrupt;
}
}
@ -130,18 +124,12 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
} else if (kMerge == state_) {
state_ = kFound;
if (value_ != nullptr) {
bool merge_success = false;
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
merge_success = merge_operator_->FullMerge(
user_key_, nullptr, merge_context_->GetOperands(), value_,
logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanosSafe());
}
if (!merge_success) {
RecordTick(statistics_, NUMBER_MERGE_FAILURES);
Status merge_status =
MergeHelper::TimedFullMerge(merge_operator_, user_key_, nullptr,
merge_context_->GetOperands(),
value_, logger_, statistics_, env_);
if (!merge_status.ok()) {
state_ = kCorrupt;
}
}

@ -739,9 +739,13 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
merge_data = nullptr;
}
s = MergeHelper::TimedFullMerge(
key, merge_data, merge_context.GetOperands(), merge_operator,
statistics, env, logger, value);
if (merge_operator) {
s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data,
merge_context.GetOperands(), value,
logger, statistics, env);
} else {
s = Status::InvalidArgument("Options::merge_operator must be set");
}
}
}

@ -237,9 +237,13 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
Env* env = options.env;
Logger* logger = options.info_log.get();
*s = MergeHelper::TimedFullMerge(
key, entry_value, merge_context->GetOperands(), merge_operator,
statistics, env, logger, value);
if (merge_operator) {
*s = MergeHelper::TimedFullMerge(merge_operator, key, entry_value,
merge_context->GetOperands(), value,
logger, statistics, env);
} else {
*s = Status::InvalidArgument("Options::merge_operator must be set");
}
if ((*s).ok()) {
result = WriteBatchWithIndexInternal::Result::kFound;
} else {

Loading…
Cancel
Save