diff --git a/db/db_iter.cc b/db/db_iter.cc index 256b65447..4afe610f7 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -15,6 +15,7 @@ #include "db/dbformat.h" #include "db/filename.h" +#include "db/merge_context.h" #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" @@ -251,7 +252,7 @@ class DBIter: public Iterator { bool prefix_same_as_start_; bool iter_pinned_; // List of operands for merge operator. - std::deque merge_operands_; + MergeContext merge_context_; LocalStatistics local_stats_; // No copying allowed @@ -411,9 +412,9 @@ void DBIter::MergeValuesNewToOld() { return; } + merge_context_.Clear(); // Start the merge process by pushing the first operand - std::deque operands; - operands.push_front(iter_->value().ToString()); + merge_context_.PushOperand(iter_->value()); ParsedInternalKey ikey; for (iter_->Next(); iter_->Valid(); iter_->Next()) { @@ -438,7 +439,8 @@ void DBIter::MergeValuesNewToOld() { { StopWatchNano timer(env_, statistics_ != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); - user_merge_operator_->FullMerge(ikey.user_key, &val, operands, + user_merge_operator_->FullMerge(ikey.user_key, &val, + merge_context_.GetOperands(), &saved_value_, logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); @@ -450,7 +452,7 @@ void DBIter::MergeValuesNewToOld() { // hit a merge, add the value as an operand and run associative merge. // when complete, add result to operands and continue. const Slice& val = iter_->value(); - operands.push_front(val.ToString()); + merge_context_.PushOperand(val); } else { assert(false); } @@ -463,8 +465,9 @@ void DBIter::MergeValuesNewToOld() { // a deletion marker. // feed null as the existing value to the merge operator, such that // client can differentiate this scenario and do things accordingly. - user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands, - &saved_value_, logger_); + user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, + merge_context_.GetOperands(), &saved_value_, + logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); } } @@ -556,7 +559,7 @@ void DBIter::PrevInternal() { // saved_value_ bool DBIter::FindValueForCurrentKey() { assert(iter_->Valid()); - merge_operands_.clear(); + merge_context_.Clear(); // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or // kTypeValue) ValueType last_not_merge_type = kTypeDeletion; @@ -576,19 +579,19 @@ bool DBIter::FindValueForCurrentKey() { last_key_entry_type = ikey.type; switch (last_key_entry_type) { case kTypeValue: - merge_operands_.clear(); + merge_context_.Clear(); saved_value_ = iter_->value().ToString(); last_not_merge_type = kTypeValue; break; case kTypeDeletion: case kTypeSingleDeletion: - merge_operands_.clear(); + merge_context_.Clear(); last_not_merge_type = last_key_entry_type; PERF_COUNTER_ADD(internal_delete_skipped_count, 1); break; case kTypeMerge: assert(user_merge_operator_ != nullptr); - merge_operands_.push_back(iter_->value().ToString()); + merge_context_.PushOperandBack(iter_->value()); break; default: assert(false); @@ -611,8 +614,8 @@ bool DBIter::FindValueForCurrentKey() { StopWatchNano timer(env_, statistics_ != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, - merge_operands_, &saved_value_, - logger_); + merge_context_.GetOperands(), + &saved_value_, logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); } else { @@ -623,8 +626,8 @@ bool DBIter::FindValueForCurrentKey() { StopWatchNano timer(env_, statistics_ != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice, - merge_operands_, &saved_value_, - logger_); + merge_context_.GetOperands(), + &saved_value_, logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); } @@ -667,11 +670,11 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { // kTypeMerge. We need to collect all kTypeMerge values and save them // in operands - std::deque operands; + merge_context_.Clear(); while (iter_->Valid() && user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) && ikey.type == kTypeMerge) { - operands.push_front(iter_->value().ToString()); + merge_context_.PushOperand(iter_->value()); iter_->Next(); FindParseableKey(&ikey, kForward); } @@ -682,7 +685,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { { StopWatchNano timer(env_, statistics_ != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); - user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands, + user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, + merge_context_.GetOperands(), &saved_value_, logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); } @@ -700,8 +704,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { { StopWatchNano timer(env_, statistics_ != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); - user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, operands, - &saved_value_, logger_); + user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, + merge_context_.GetOperands(), &saved_value_, + logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); } valid_ = true; diff --git a/db/merge_context.h b/db/merge_context.h index 74264c4c9..229f11d62 100644 --- a/db/merge_context.h +++ b/db/merge_context.h @@ -37,6 +37,11 @@ public: Initialize(); operand_list->push_front(operand_slice.ToString()); } + // Push back a merge operand + void PushOperandBack(const Slice& operand_slice) { + Initialize(); + operand_list->push_back(operand_slice.ToString()); + } // return total number of operands in the list size_t GetNumOperands() const { if (!operand_list) {