From 8a1a603fdb3ca9f26ea2586def26d8af2dbc02a8 Mon Sep 17 00:00:00 2001 From: Islam AbdelRahman Date: Fri, 1 Apr 2016 15:48:55 -0700 Subject: [PATCH] Eliminate std::deque initialization while iterating over merge operands Summary: This patch is similar to D52563, When we iterate over a DB with merge operands we keep creating std::queue to store the operands, optimize this by reusing merge_operands_ data member Before the patch ``` ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq" --db="/dev/shm/bench_merge_memcpy_on_the_fly/" --merge_operator="put" --merge_keys=10000 --num=10000 DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] mergerandom : 3.757 micros/op 266141 ops/sec; 29.4 MB/s ( updates:10000) DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.413 micros/op 2423538 ops/sec; 268.1 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.451 micros/op 2219071 ops/sec; 245.5 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.420 micros/op 2382039 ops/sec; 263.5 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.408 micros/op 2452017 ops/sec; 271.3 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] mergerandom : 3.947 micros/op 253376 ops/sec; 28.0 MB/s ( updates:10000) DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.441 micros/op 2266473 ops/sec; 250.7 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.471 micros/op 2122033 ops/sec; 234.8 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.440 micros/op 2271407 ops/sec; 251.3 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.429 micros/op 2331471 ops/sec; 257.9 MB/s ``` with the patch ``` ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq" --db="/dev/shm/bench_merge_memcpy_on_the_fly/" --merge_operator="put" --merge_keys=10000 --num=10000 DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] mergerandom : 4.080 micros/op 245092 ops/sec; 27.1 MB/s ( updates:10000) DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.308 micros/op 3241843 ops/sec; 358.6 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.312 micros/op 3200408 ops/sec; 354.0 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.332 micros/op 3013962 ops/sec; 333.4 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.300 micros/op 3328017 ops/sec; 368.2 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] mergerandom : 3.973 micros/op 251705 ops/sec; 27.8 MB/s ( updates:10000) DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.320 micros/op 3123752 ops/sec; 345.6 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.335 micros/op 2986641 ops/sec; 330.4 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.339 micros/op 2950047 ops/sec; 326.4 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.319 micros/op 3131565 ops/sec; 346.4 MB/s ``` Test Plan: make check -j64 Reviewers: yhchiang, andrewkr, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D56031 --- db/db_iter.cc | 45 +++++++++++++++++++++++++-------------------- db/merge_context.h | 5 +++++ 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/db/db_iter.cc b/db/db_iter.cc index 256b65447..4afe610f7 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -15,6 +15,7 @@ #include "db/dbformat.h" #include "db/filename.h" +#include "db/merge_context.h" #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" @@ -251,7 +252,7 @@ class DBIter: public Iterator { bool prefix_same_as_start_; bool iter_pinned_; // List of operands for merge operator. - std::deque merge_operands_; + MergeContext merge_context_; LocalStatistics local_stats_; // No copying allowed @@ -411,9 +412,9 @@ void DBIter::MergeValuesNewToOld() { return; } + merge_context_.Clear(); // Start the merge process by pushing the first operand - std::deque operands; - operands.push_front(iter_->value().ToString()); + merge_context_.PushOperand(iter_->value()); ParsedInternalKey ikey; for (iter_->Next(); iter_->Valid(); iter_->Next()) { @@ -438,7 +439,8 @@ void DBIter::MergeValuesNewToOld() { { StopWatchNano timer(env_, statistics_ != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); - user_merge_operator_->FullMerge(ikey.user_key, &val, operands, + user_merge_operator_->FullMerge(ikey.user_key, &val, + merge_context_.GetOperands(), &saved_value_, logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); @@ -450,7 +452,7 @@ void DBIter::MergeValuesNewToOld() { // hit a merge, add the value as an operand and run associative merge. // when complete, add result to operands and continue. const Slice& val = iter_->value(); - operands.push_front(val.ToString()); + merge_context_.PushOperand(val); } else { assert(false); } @@ -463,8 +465,9 @@ void DBIter::MergeValuesNewToOld() { // a deletion marker. // feed null as the existing value to the merge operator, such that // client can differentiate this scenario and do things accordingly. - user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands, - &saved_value_, logger_); + user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, + merge_context_.GetOperands(), &saved_value_, + logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); } } @@ -556,7 +559,7 @@ void DBIter::PrevInternal() { // saved_value_ bool DBIter::FindValueForCurrentKey() { assert(iter_->Valid()); - merge_operands_.clear(); + merge_context_.Clear(); // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or // kTypeValue) ValueType last_not_merge_type = kTypeDeletion; @@ -576,19 +579,19 @@ bool DBIter::FindValueForCurrentKey() { last_key_entry_type = ikey.type; switch (last_key_entry_type) { case kTypeValue: - merge_operands_.clear(); + merge_context_.Clear(); saved_value_ = iter_->value().ToString(); last_not_merge_type = kTypeValue; break; case kTypeDeletion: case kTypeSingleDeletion: - merge_operands_.clear(); + merge_context_.Clear(); last_not_merge_type = last_key_entry_type; PERF_COUNTER_ADD(internal_delete_skipped_count, 1); break; case kTypeMerge: assert(user_merge_operator_ != nullptr); - merge_operands_.push_back(iter_->value().ToString()); + merge_context_.PushOperandBack(iter_->value()); break; default: assert(false); @@ -611,8 +614,8 @@ bool DBIter::FindValueForCurrentKey() { StopWatchNano timer(env_, statistics_ != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, - merge_operands_, &saved_value_, - logger_); + merge_context_.GetOperands(), + &saved_value_, logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); } else { @@ -623,8 +626,8 @@ bool DBIter::FindValueForCurrentKey() { StopWatchNano timer(env_, statistics_ != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice, - merge_operands_, &saved_value_, - logger_); + merge_context_.GetOperands(), + &saved_value_, logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); } @@ -667,11 +670,11 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { // kTypeMerge. We need to collect all kTypeMerge values and save them // in operands - std::deque operands; + merge_context_.Clear(); while (iter_->Valid() && user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) && ikey.type == kTypeMerge) { - operands.push_front(iter_->value().ToString()); + merge_context_.PushOperand(iter_->value()); iter_->Next(); FindParseableKey(&ikey, kForward); } @@ -682,7 +685,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { { StopWatchNano timer(env_, statistics_ != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); - user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands, + user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, + merge_context_.GetOperands(), &saved_value_, logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); } @@ -700,8 +704,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { { StopWatchNano timer(env_, statistics_ != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); - user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, operands, - &saved_value_, logger_); + user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, + merge_context_.GetOperands(), &saved_value_, + logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos()); } valid_ = true; diff --git a/db/merge_context.h b/db/merge_context.h index 74264c4c9..229f11d62 100644 --- a/db/merge_context.h +++ b/db/merge_context.h @@ -37,6 +37,11 @@ public: Initialize(); operand_list->push_front(operand_slice.ToString()); } + // Push back a merge operand + void PushOperandBack(const Slice& operand_slice) { + Initialize(); + operand_list->push_back(operand_slice.ToString()); + } // return total number of operands in the list size_t GetNumOperands() const { if (!operand_list) {