diff --git a/db/db_iter.cc b/db/db_iter.cc
index 27762232e..1f9353b54 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -16,6 +16,7 @@
 #include "db/dbformat.h"
 #include "db/filename.h"
 #include "db/merge_context.h"
+#include "db/merge_helper.h"
 #include "db/pinned_iterators_manager.h"
 #include "port/port.h"
 #include "rocksdb/env.h"
@@ -109,7 +110,7 @@ class DBIter: public Iterator {
         env_(env),
         logger_(ioptions.info_log),
         user_comparator_(cmp),
-        user_merge_operator_(ioptions.merge_operator),
+        merge_operator_(ioptions.merge_operator),
         iter_(iter),
         sequence_(s),
         direction_(kForward),
@@ -239,7 +240,7 @@ class DBIter: public Iterator {
   Env* const env_;
   Logger* logger_;
   const Comparator* const user_comparator_;
-  const MergeOperator* const user_merge_operator_;
+  const MergeOperator* const merge_operator_;
   InternalIterator* iter_;
   SequenceNumber const sequence_;

@@ -424,10 +425,10 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
 // POST: saved_value_ has the merged value for the user key
 //       iter_ points to the next entry (or invalid)
 void DBIter::MergeValuesNewToOld() {
-  if (!user_merge_operator_) {
+  if (!merge_operator_) {
     Log(InfoLogLevel::ERROR_LEVEL,
         logger_, "Options::merge_operator is null.");
-    status_ = Status::InvalidArgument("user_merge_operator_ must be set.");
+    status_ = Status::InvalidArgument("merge_operator_ must be set.");
     valid_ = false;
     return;
   }
@@ -456,15 +457,9 @@ void DBIter::MergeValuesNewToOld() {
       // final result in saved_value_. We are done!
       // ignore corruption if there is any.
       const Slice val = iter_->value();
-      {
-        StopWatchNano timer(env_, statistics_ != nullptr);
-        PERF_TIMER_GUARD(merge_operator_time_nanos);
-        user_merge_operator_->FullMerge(ikey.user_key, &val,
-                                        merge_context_.GetOperands(),
-                                        &saved_value_, logger_);
-        RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
-                   timer.ElapsedNanos());
-      }
+      MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, &val,
+                                  merge_context_.GetOperands(), &saved_value_,
+                                  logger_, statistics_, env_);
       // iter_ is positioned after put
       iter_->Next();
       return;
@@ -478,18 +473,13 @@ void DBIter::MergeValuesNewToOld() {
     }
   }

-  {
-    StopWatchNano timer(env_, statistics_ != nullptr);
-    PERF_TIMER_GUARD(merge_operator_time_nanos);
-    // we either exhausted all internal keys under this user key, or hit
-    // a deletion marker.
-    // feed null as the existing value to the merge operator, such that
-    // client can differentiate this scenario and do things accordingly.
-    user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr,
-                                    merge_context_.GetOperands(), &saved_value_,
-                                    logger_);
-    RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
-  }
+  // we either exhausted all internal keys under this user key, or hit
+  // a deletion marker.
+  // feed null as the existing value to the merge operator, such that
+  // client can differentiate this scenario and do things accordingly.
+  MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
+                              merge_context_.GetOperands(), &saved_value_,
+                              logger_, statistics_, env_);
 }

 void DBIter::Prev() {
@@ -614,7 +604,7 @@ bool DBIter::FindValueForCurrentKey() {
         PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
         break;
       case kTypeMerge:
-        assert(user_merge_operator_ != nullptr);
+        assert(merge_operator_ != nullptr);
         merge_context_.PushOperandBack(iter_->value());
         break;
       default:
@@ -636,24 +626,15 @@ bool DBIter::FindValueForCurrentKey() {
     case kTypeMerge:
       current_entry_is_merged_ = true;
       if (last_not_merge_type == kTypeDeletion) {
-        StopWatchNano timer(env_, statistics_ != nullptr);
-        PERF_TIMER_GUARD(merge_operator_time_nanos);
-        user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr,
-                                        merge_context_.GetOperands(),
-                                        &saved_value_, logger_);
-        RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
-                   timer.ElapsedNanos());
+        MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
+                                    nullptr, merge_context_.GetOperands(),
+                                    &saved_value_, logger_, statistics_, env_);
       } else {
         assert(last_not_merge_type == kTypeValue);
-        {
-          StopWatchNano timer(env_, statistics_ != nullptr);
-          PERF_TIMER_GUARD(merge_operator_time_nanos);
-          user_merge_operator_->FullMerge(saved_key_.GetKey(), &pinned_value_,
-                                          merge_context_.GetOperands(),
-                                          &saved_value_, logger_);
-          RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
-                     timer.ElapsedNanos());
-        }
+        MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
+                                    &pinned_value_,
+                                    merge_context_.GetOperands(), &saved_value_,
+                                    logger_, statistics_, env_);
       }
       break;
     case kTypeValue:
@@ -708,14 +689,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   if (!iter_->Valid() ||
       !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
       ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
-    {
-      StopWatchNano timer(env_, statistics_ != nullptr);
-      PERF_TIMER_GUARD(merge_operator_time_nanos);
-      user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr,
-                                      merge_context_.GetOperands(),
-                                      &saved_value_, logger_);
-      RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
-    }
+    MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
+                                merge_context_.GetOperands(), &saved_value_,
+                                logger_, statistics_, env_);
     // Make iter_ valid and point to saved_key_
     if (!iter_->Valid() ||
         !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
@@ -727,14 +703,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   }

   const Slice& val = iter_->value();
-  {
-    StopWatchNano timer(env_, statistics_ != nullptr);
-    PERF_TIMER_GUARD(merge_operator_time_nanos);
-    user_merge_operator_->FullMerge(saved_key_.GetKey(), &val,
-                                    merge_context_.GetOperands(), &saved_value_,
-                                    logger_);
-    RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
-  }
+  MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
+                              merge_context_.GetOperands(), &saved_value_,
+                              logger_, statistics_, env_);
   valid_ = true;
   return true;
 }
diff --git a/db/memtable.cc b/db/memtable.cc
index ef605de11..dff6c847a 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -15,6 +15,7 @@

 #include "db/dbformat.h"
 #include "db/merge_context.h"
+#include "db/merge_helper.h"
 #include "db/pinned_iterators_manager.h"
 #include "db/writebuffer.h"
 #include "rocksdb/comparator.h"
@@ -489,22 +490,10 @@ static bool SaveValue(void* arg, const char* entry) {
         Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
         *(s->status) = Status::OK();
         if (*(s->merge_in_progress)) {
-          assert(merge_operator);
-          bool merge_success = false;
-          {
-            StopWatchNano timer(s->env_, s->statistics != nullptr);
-            PERF_TIMER_GUARD(merge_operator_time_nanos);
-            merge_success = merge_operator->FullMerge(
-                s->key->user_key(), &v, merge_context->GetOperands(), s->value,
-                s->logger);
-            RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME,
-                       timer.ElapsedNanos());
-          }
-          if (!merge_success) {
-            RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
-            *(s->status) =
-                Status::Corruption("Error: Could not perform merge.");
-          }
+          *(s->status) = MergeHelper::TimedFullMerge(
+              merge_operator, s->key->user_key(), &v,
+              merge_context->GetOperands(), s->value, s->logger, s->statistics,
+              s->env_);
         } else if (s->value != nullptr) {
           s->value->assign(v.data(), v.size());
         }
@@ -517,23 +506,11 @@ static bool SaveValue(void* arg, const char* entry) {
       case kTypeDeletion:
       case kTypeSingleDeletion: {
         if (*(s->merge_in_progress)) {
-          assert(merge_operator != nullptr);
           *(s->status) = Status::OK();
-          bool merge_success = false;
-          {
-            StopWatchNano timer(s->env_, s->statistics != nullptr);
-            PERF_TIMER_GUARD(merge_operator_time_nanos);
-            merge_success = merge_operator->FullMerge(
-                s->key->user_key(), nullptr, merge_context->GetOperands(),
-                s->value, s->logger);
-            RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME,
-                       timer.ElapsedNanos());
-          }
-          if (!merge_success) {
-            RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
-            *(s->status) =
-                Status::Corruption("Error: Could not perform merge.");
-          }
+          *(s->status) = MergeHelper::TimedFullMerge(
+              merge_operator, s->key->user_key(), nullptr,
+              merge_context->GetOperands(), s->value, s->logger, s->statistics,
+              s->env_);
         } else {
           *(s->status) = Status::NotFound();
         }
diff --git a/db/merge_helper.cc b/db/merge_helper.cc
index 145a72b0d..f87b5ee37 100644
--- a/db/merge_helper.cc
+++ b/db/merge_helper.cc
@@ -18,30 +18,29 @@
 namespace rocksdb {

-// TODO(agiardullo): Clean up merge callsites to use this func
-Status MergeHelper::TimedFullMerge(const Slice& key, const Slice* value,
+Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
+                                   const Slice& key, const Slice* value,
                                    const std::deque<std::string>& operands,
-                                   const MergeOperator* merge_operator,
-                                   Statistics* statistics, Env* env,
-                                   Logger* logger, std::string* result) {
+                                   std::string* result, Logger* logger,
+                                   Statistics* statistics, Env* env) {
+  assert(merge_operator != nullptr);
+
   if (operands.size() == 0) {
     result->assign(value->data(), value->size());
     return Status::OK();
   }

-  if (merge_operator == nullptr) {
-    return Status::NotSupported("Provide a merge_operator when opening DB");
-  }
-
-  // Setup to time the merge
-  StopWatchNano timer(env, statistics != nullptr);
-  PERF_TIMER_GUARD(merge_operator_time_nanos);
+  bool success;
+  {
+    // Setup to time the merge
+    StopWatchNano timer(env, statistics != nullptr);
+    PERF_TIMER_GUARD(merge_operator_time_nanos);

-  // Do the merge
-  bool success =
-      merge_operator->FullMerge(key, value, operands, result, logger);
+    // Do the merge
+    success = merge_operator->FullMerge(key, value, operands, result, logger);

-  RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanosSafe());
+    RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
+  }

   if (!success) {
     RecordTick(statistics, NUMBER_MERGE_FAILURES);
@@ -140,9 +139,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
       const Slice val = iter->value();
       const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr;
      std::string merge_result;
-      s = TimedFullMerge(ikey.user_key, val_ptr, operands_,
-                         user_merge_operator_, stats_, env_, logger_,
-                         &merge_result);
+      s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr,
+                         operands_, &merge_result, logger_, stats_, env_);

       // We store the result in keys_.back() and operands_.back()
       // if nothing went wrong (i.e.: no operand corruption on disk)
@@ -221,9 +219,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
     assert(operands_.size() >= 1);
     assert(operands_.size() == keys_.size());
     std::string merge_result;
-    s = TimedFullMerge(orig_ikey.user_key, nullptr, operands_,
-                       user_merge_operator_, stats_, env_, logger_,
-                       &merge_result);
+    s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr,
+                       operands_, &merge_result, logger_, stats_, env_);
     if (s.ok()) {
       // The original key encountered
       // We are certain that keys_ is not empty here (see assertions couple of
diff --git a/db/merge_helper.h b/db/merge_helper.h
index 7128b1a09..262a5969d 100644
--- a/db/merge_helper.h
+++ b/db/merge_helper.h
@@ -55,12 +55,11 @@ class MergeHelper {
   // Returns one of the following statuses:
   // - OK: Entries were successfully merged.
   // - Corruption: Merge operator reported unsuccessful merge.
-  // - NotSupported: Merge operator is missing.
-  static Status TimedFullMerge(const Slice& key, const Slice* value,
+  static Status TimedFullMerge(const MergeOperator* merge_operator,
+                               const Slice& key, const Slice* value,
                                const std::deque<std::string>& operands,
-                               const MergeOperator* merge_operator,
-                               Statistics* statistics, Env* env, Logger* logger,
-                               std::string* result);
+                               std::string* result, Logger* logger,
+                               Statistics* statistics, Env* env);

   // Merge entries until we hit
   // - a corrupted key
diff --git a/db/version_set.cc b/db/version_set.cc
index 28a053ea6..6863f0df8 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -30,6 +30,7 @@
 #include "db/log_writer.h"
 #include "db/memtable.h"
 #include "db/merge_context.h"
+#include "db/merge_helper.h"
 #include "db/table_cache.h"
 #include "db/version_builder.h"
 #include "db/writebuffer.h"
@@ -973,22 +974,9 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
     }
     // merge_operands are in saver and we hit the beginning of the key history
    // do a final merge of nullptr and operands;
-    bool merge_success = false;
-    {
-      StopWatchNano timer(env_, db_statistics_ != nullptr);
-      PERF_TIMER_GUARD(merge_operator_time_nanos);
-      merge_success = merge_operator_->FullMerge(
-          user_key, nullptr, merge_context->GetOperands(), value, info_log_);
-      RecordTick(db_statistics_, MERGE_OPERATION_TOTAL_TIME,
-                 timer.ElapsedNanos());
-    }
-    if (merge_success) {
-      *status = Status::OK();
-    } else {
-      RecordTick(db_statistics_, NUMBER_MERGE_FAILURES);
-      *status = Status::Corruption("could not perform end-of-key merge for ",
-                                   user_key);
-    }
+    *status = MergeHelper::TimedFullMerge(merge_operator_, user_key, nullptr,
+                                          merge_context->GetOperands(), value,
+                                          info_log_, db_statistics_, env_);
   } else {
     if (key_exists != nullptr) {
       *key_exists = false;
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 5ae031497..a4e68d3da 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -40,6 +40,7 @@
 #include "db/dbformat.h"
 #include "db/flush_scheduler.h"
 #include "db/memtable.h"
+#include "db/merge_context.h"
 #include "db/snapshot_impl.h"
 #include "db/write_batch_internal.h"
 #include "rocksdb/merge_operator.h"
@@ -936,20 +937,13 @@ class MemTableInserter : public WriteBatch::Handler {
         std::deque<std::string> operands;
         operands.push_front(value.ToString());
         std::string new_value;
-        bool merge_success = false;
-        {
-          StopWatchNano timer(Env::Default(), moptions->statistics != nullptr);
-          PERF_TIMER_GUARD(merge_operator_time_nanos);
-          merge_success = merge_operator->FullMerge(
-              key, &get_value_slice, operands, &new_value, moptions->info_log);
-          RecordTick(moptions->statistics, MERGE_OPERATION_TOTAL_TIME,
-                     timer.ElapsedNanos());
-        }
-        if (!merge_success) {
-          // Failed to merge!
-          RecordTick(moptions->statistics, NUMBER_MERGE_FAILURES);
+        Status merge_status = MergeHelper::TimedFullMerge(
+            merge_operator, key, &get_value_slice, operands, &new_value,
+            moptions->info_log, moptions->statistics, Env::Default());
+        if (!merge_status.ok()) {
+          // Failed to merge!
           // Store the delta in memtable
           perform_merge = false;
         } else {
diff --git a/table/get_context.cc b/table/get_context.cc
index 39b841e25..491007099 100644
--- a/table/get_context.cc
+++ b/table/get_context.cc
@@ -4,6 +4,7 @@
 // of patent rights can be found in the PATENTS file in the same directory.

 #include "table/get_context.h"
+#include "db/merge_helper.h"
 #include "rocksdb/env.h"
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/statistics.h"
@@ -102,18 +103,11 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
         assert(merge_operator_ != nullptr);
         state_ = kFound;
         if (value_ != nullptr) {
-          bool merge_success = false;
-          {
-            StopWatchNano timer(env_, statistics_ != nullptr);
-            PERF_TIMER_GUARD(merge_operator_time_nanos);
-            merge_success = merge_operator_->FullMerge(
-                user_key_, &value, merge_context_->GetOperands(), value_,
-                logger_);
-            RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
-                       timer.ElapsedNanosSafe());
-          }
-          if (!merge_success) {
-            RecordTick(statistics_, NUMBER_MERGE_FAILURES);
+          Status merge_status =
+              MergeHelper::TimedFullMerge(merge_operator_, user_key_, &value,
+                                          merge_context_->GetOperands(),
+                                          value_, logger_, statistics_, env_);
+          if (!merge_status.ok()) {
             state_ = kCorrupt;
           }
         }
@@ -130,18 +124,12 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
       } else if (kMerge == state_) {
         state_ = kFound;
         if (value_ != nullptr) {
-          bool merge_success = false;
-          {
-            StopWatchNano timer(env_, statistics_ != nullptr);
-            PERF_TIMER_GUARD(merge_operator_time_nanos);
-            merge_success = merge_operator_->FullMerge(
-                user_key_, nullptr, merge_context_->GetOperands(), value_,
-                logger_);
-            RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
-                       timer.ElapsedNanosSafe());
-          }
-          if (!merge_success) {
-            RecordTick(statistics_, NUMBER_MERGE_FAILURES);
+          Status merge_status =
+              MergeHelper::TimedFullMerge(merge_operator_, user_key_, nullptr,
+                                          merge_context_->GetOperands(),
+                                          value_, logger_, statistics_, env_);
+
+          if (!merge_status.ok()) {
             state_ = kCorrupt;
           }
         }
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc
index e16175449..045cf552f 100644
--- a/utilities/write_batch_with_index/write_batch_with_index.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index.cc
@@ -739,9 +739,13 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
           merge_data = nullptr;
         }

-        s = MergeHelper::TimedFullMerge(
-            key, merge_data, merge_context.GetOperands(), merge_operator,
-            statistics, env, logger, value);
+        if (merge_operator) {
+          s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data,
+                                          merge_context.GetOperands(), value,
+                                          logger, statistics, env);
+        } else {
+          s = Status::InvalidArgument("Options::merge_operator must be set");
+        }
       }
     }
diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc
index e4ea104e3..e2d379e12 100644
--- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc
@@ -237,9 +237,13 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
       Env* env = options.env;
       Logger* logger = options.info_log.get();

-      *s = MergeHelper::TimedFullMerge(
-          key, entry_value, merge_context->GetOperands(), merge_operator,
-          statistics, env, logger, value);
+      if (merge_operator) {
+        *s = MergeHelper::TimedFullMerge(merge_operator, key, entry_value,
+                                         merge_context->GetOperands(), value,
+                                         logger, statistics, env);
+      } else {
+        *s = Status::InvalidArgument("Options::merge_operator must be set");
+      }
       if ((*s).ok()) {
         result = WriteBatchWithIndexInternal::Result::kFound;
       } else {
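
Reviewer note, not part of the patch: every call site above now funnels through the same helper. Below is a minimal sketch of the consolidated calling pattern, assuming only the post-patch signature declared in db/merge_helper.h; the function ResolveMergeSketch and its parameter names are illustrative and do not appear in this change.

// Sketch of the unified call pattern (hypothetical caller, not in the patch).
// TimedFullMerge now owns the StopWatchNano / PERF_TIMER_GUARD timing, the
// MERGE_OPERATION_TOTAL_TIME tick, and the NUMBER_MERGE_FAILURES accounting
// that each call site previously open-coded.
#include <deque>
#include <string>

#include "db/merge_helper.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"

namespace rocksdb {

Status ResolveMergeSketch(const MergeOperator* merge_operator, const Slice& key,
                          const Slice* existing_value,
                          const std::deque<std::string>& operands,
                          Logger* logger, Statistics* stats, Env* env,
                          std::string* result) {
  // The helper now asserts a non-null operator instead of returning
  // NotSupported, so callers that cannot guarantee one (the
  // write_batch_with_index paths above) check before calling it.
  if (merge_operator == nullptr) {
    return Status::InvalidArgument("Options::merge_operator must be set");
  }
  // Pass nullptr as existing_value when the key history ends in a deletion,
  // as the DBIter and memtable call sites above do.
  return MergeHelper::TimedFullMerge(merge_operator, key, existing_value,
                                     operands, result, logger, stats, env);
}

}  // namespace rocksdb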