From a8029fdc752acfca9c509f28509919b25fb711f4 Mon Sep 17 00:00:00 2001 From: Siying Dong Date: Mon, 2 Dec 2013 18:34:05 -0800 Subject: [PATCH] Introduce MergeContext to Lazily Initialize merge operand list Summary: In get operations, merge_operands is only used in few cases. Lazily initialize it can reduce average latency in some cases Test Plan: make all check Reviewers: haobo, kailiu, dhruba Reviewed By: haobo CC: igor, nkg-, leveldb Differential Revision: https://reviews.facebook.net/D14415 Conflicts: db/db_impl.cc db/memtable.cc --- db/db_impl.cc | 21 +++++----- db/db_impl_readonly.cc | 8 ++-- db/memtable.cc | 32 +++++++-------- db/memtable.h | 3 +- db/memtablelist.cc | 5 +-- db/memtablelist.h | 2 +- db/merge_context.h | 69 ++++++++++++++++++++++++++++++++ db/version_set.cc | 34 +++++++++------- db/version_set.h | 7 ++-- include/rocksdb/merge_operator.h | 1 + 10 files changed, 129 insertions(+), 53 deletions(-) create mode 100644 db/merge_context.h diff --git a/db/db_impl.cc b/db/db_impl.cc index 53d2acdb0..49423d776 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -27,6 +27,7 @@ #include "db/log_writer.h" #include "db/memtable.h" #include "db/memtablelist.h" +#include "db/merge_context.h" #include "db/merge_helper.h" #include "db/prefix_filter_iterator.h" #include "db/table_cache.h" @@ -2608,20 +2609,20 @@ Status DBImpl::GetImpl(const ReadOptions& options, // Prepare to store a list of merge operations if merge occurs. - std::deque merge_operands; + MergeContext merge_context; // First look in the memtable, then in the immutable memtable (if any). // s is both in/out. When in, s could either be OK or MergeInProgress. // merge_operands will contain the sequence of merges in the latter case. LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, &merge_operands, options_)) { + if (mem->Get(lkey, value, &s, merge_context, options_)) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); - } else if (imm.Get(lkey, value, &s, &merge_operands, options_)) { + } else if (imm.Get(lkey, value, &s, merge_context, options_)) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); } else { - current->Get(options, lkey, value, &s, &merge_operands, &stats, + current->Get(options, lkey, value, &s, &merge_context, &stats, options_, value_found); have_stat_update = true; RecordTick(options_.statistics.get(), MEMTABLE_MISS); @@ -2676,8 +2677,8 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, bool have_stat_update = false; Version::GetStats stats; - // Prepare to store a list of merge operations if merge occurs. - std::deque merge_operands; + // Contain a list of merge operations if merge occurs. + MergeContext merge_context; // Note: this always resizes the values array int numKeys = keys.size(); @@ -2692,17 +2693,17 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, // s is both in/out. When in, s could either be OK or MergeInProgress. // merge_operands will contain the sequence of merges in the latter case. for (int i=0; iGet(lkey, value, &s, &merge_operands, options_)) { + if (mem->Get(lkey, value, &s, merge_context, options_)) { // Done - } else if (imm.Get(lkey, value, &s, &merge_operands, options_)) { + } else if (imm.Get(lkey, value, &s, merge_context, options_)) { // Done } else { - current->Get(options, lkey, value, &s, &merge_operands, &stats, options_); + current->Get(options, lkey, value, &s, &merge_context, &stats, options_); have_stat_update = true; } diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 27d5c31ed..dbb297e93 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -23,6 +23,7 @@ #include "db/log_reader.h" #include "db/log_writer.h" #include "db/memtable.h" +#include "db/merge_context.h" #include "db/table_cache.h" #include "db/version_set.h" #include "db/write_batch_internal.h" @@ -30,6 +31,7 @@ #include "rocksdb/env.h" #include "rocksdb/status.h" #include "rocksdb/table.h" +#include "rocksdb/merge_operator.h" #include "port/port.h" #include "table/block.h" #include "table/merger.h" @@ -57,12 +59,12 @@ Status DBImplReadOnly::Get(const ReadOptions& options, MemTable* mem = GetMemTable(); Version* current = versions_->current(); SequenceNumber snapshot = versions_->LastSequence(); - std::deque merge_operands; + MergeContext merge_context; LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, &merge_operands, options_)) { + if (mem->Get(lkey, value, &s, merge_context, options_)) { } else { Version::GetStats stats; - current->Get(options, lkey, value, &s, &merge_operands, &stats, options_); + current->Get(options, lkey, value, &s, &merge_context, &stats, options_); } return s; } diff --git a/db/memtable.cc b/db/memtable.cc index f86af4e33..d2a51a125 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -12,6 +12,7 @@ #include #include "db/dbformat.h" +#include "db/merge_context.h" #include "rocksdb/comparator.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" @@ -162,15 +163,12 @@ void MemTable::Add(SequenceNumber s, ValueType type, } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, - std::deque* operands, const Options& options) { + MergeContext& merge_context, const Options& options) { Slice memkey = key.memtable_key(); std::shared_ptr iter( table_->GetIterator(key.user_key())); iter->Seek(memkey.data()); - // It is the caller's responsibility to allocate/delete operands list - assert(operands != nullptr); - bool merge_in_progress = s->IsMergeInProgress(); auto merge_operator = options.merge_operator.get(); auto logger = options.info_log; @@ -202,8 +200,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, *s = Status::OK(); if (merge_in_progress) { assert(merge_operator); - if (!merge_operator->FullMerge(key.user_key(), &v, *operands, - value, logger.get())) { + if (!merge_operator->FullMerge(key.user_key(), &v, + merge_context.GetOperands(), value, + logger.get())) { RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES); *s = Status::Corruption("Error: Could not perform merge."); } @@ -219,8 +218,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, if (merge_in_progress) { assert(merge_operator); *s = Status::OK(); - if (!merge_operator->FullMerge(key.user_key(), nullptr, *operands, - value, logger.get())) { + if (!merge_operator->FullMerge(key.user_key(), nullptr, + merge_context.GetOperands(), value, + logger.get())) { RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES); *s = Status::Corruption("Error: Could not perform merge."); } @@ -232,16 +232,14 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, case kTypeMerge: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); merge_in_progress = true; - operands->push_front(v.ToString()); - while(operands->size() >= 2) { + merge_context.PushOperand(v); + while(merge_context.GetNumOperands() >= 2) { // Attempt to associative merge. (Returns true if successful) - if (merge_operator->PartialMerge(key.user_key(), - Slice((*operands)[0]), - Slice((*operands)[1]), - &merge_result, - logger.get())) { - operands->pop_front(); - swap(operands->front(), merge_result); + if (merge_operator->PartialMerge(key.user_key(), + merge_context.GetOperand(0), + merge_context.GetOperand(1), + &merge_result, logger.get())) { + merge_context.PushPartialMergeResult(merge_result); } else { // Stack them because user can't associative merge break; diff --git a/db/memtable.h b/db/memtable.h index 7a0d6b343..79d5ba2d0 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -22,6 +22,7 @@ namespace rocksdb { class Mutex; class MemTableIterator; +class MergeContext; class MemTable { public: @@ -94,7 +95,7 @@ class MemTable { // store MergeInProgress in s, and return false. // Else, return false. bool Get(const LookupKey& key, std::string* value, Status* s, - std::deque* operands, const Options& options); + MergeContext& merge_context, const Options& options); // Update the value and return status ok, // if key exists in current memtable diff --git a/db/memtablelist.cc b/db/memtablelist.cc index 3d4d35fd8..71e4e5a92 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -204,10 +204,9 @@ size_t MemTableList::ApproximateMemoryUsage() { // Return the most recent value found, if any. // Operands stores the list of merge operations to apply, so far. bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, - std::deque* operands, - const Options& options) { + MergeContext& merge_context, const Options& options) { for (auto &memtable : memlist_) { - if (memtable->Get(key, value, s, operands, options)) { + if (memtable->Get(key, value, s, merge_context, options)) { return true; } } diff --git a/db/memtablelist.h b/db/memtablelist.h index 17c6c3ae4..ed353c8b8 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -78,7 +78,7 @@ class MemTableList { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. bool Get(const LookupKey& key, std::string* value, Status* s, - std::deque* operands, const Options& options); + MergeContext& merge_context, const Options& options); // Returns the list of underlying memtables. void GetMemTables(std::vector* list); diff --git a/db/merge_context.h b/db/merge_context.h new file mode 100644 index 000000000..91d9f8a01 --- /dev/null +++ b/db/merge_context.h @@ -0,0 +1,69 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +#pragma once +#include "db/dbformat.h" +#include "rocksdb/slice.h" +#include +#include + +namespace rocksdb { + +const std::deque empty_operand_list; + +// The merge context for merging a user key. +// When doing a Get(), DB will create such a class and pass it when +// issuing Get() operation to memtables and version_set. The operands +// will be fetched from the context when issuing partial of full merge. +class MergeContext { +public: + // Clear all the operands + void Clear() { + if (operand_list) { + operand_list->clear(); + } + } + // Replace the first two operands of merge_result, which are expected be the + // merge results of them. + void PushPartialMergeResult(std::string& merge_result) { + assert (operand_list); + operand_list->pop_front(); + swap(operand_list->front(), merge_result); + } + // Push a merge operand + void PushOperand(const Slice& operand_slice) { + Initialize(); + operand_list->push_front(operand_slice.ToString()); + } + // return total number of operands in the list + size_t GetNumOperands() const { + if (!operand_list) { + return 0; + } + return operand_list->size(); + } + // Get the operand at the index. + Slice GetOperand(int index) const { + assert (operand_list); + return (*operand_list)[index]; + } + // Return all the operands. + const std::deque& GetOperands() const { + if (!operand_list) { + return empty_operand_list; + } + return *operand_list; + } +private: + void Initialize() { + if (!operand_list) { + operand_list.reset(new std::deque()); + } + } + std::unique_ptr> operand_list; +}; + +} // namespace rocksdb + diff --git a/db/version_set.cc b/db/version_set.cc index 2ebb64adf..741752936 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -16,6 +16,7 @@ #include "db/log_reader.h" #include "db/log_writer.h" #include "db/memtable.h" +#include "db/merge_context.h" #include "db/table_cache.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" @@ -287,7 +288,8 @@ struct Saver { bool* value_found; // Is value set correctly? Used by KeyMayExist std::string* value; const MergeOperator* merge_operator; - std::deque* merge_operands; // the merge operations encountered + // the merge operations encountered; + MergeContext* merge_context; Logger* logger; bool didIO; // did we do any disk io? Statistics* statistics; @@ -309,10 +311,10 @@ static void MarkKeyMayExist(void* arg) { static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ Saver* s = reinterpret_cast(arg); - std::deque* const ops = s->merge_operands; // shorter alias + MergeContext* merge_contex = s->merge_context; std::string merge_result; // temporary area for merge results later - assert(s != nullptr && ops != nullptr); + assert(s != nullptr && merge_contex != nullptr); ParsedInternalKey parsed_key; // TODO: didIO and Merge? @@ -331,7 +333,8 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ } else if (kMerge == s->state) { assert(s->merge_operator != nullptr); s->state = kFound; - if (!s->merge_operator->FullMerge(s->user_key, &v, *ops, + if (!s->merge_operator->FullMerge(s->user_key, &v, + merge_contex->GetOperands(), s->value, s->logger)) { RecordTick(s->statistics, NUMBER_MERGE_FAILURES); s->state = kCorrupt; @@ -346,8 +349,9 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ s->state = kDeleted; } else if (kMerge == s->state) { s->state = kFound; - if (!s->merge_operator->FullMerge(s->user_key, nullptr, *ops, - s->value, s->logger)) { + if (!s->merge_operator->FullMerge(s->user_key, nullptr, + merge_contex->GetOperands(), + s->value, s->logger)) { RecordTick(s->statistics, NUMBER_MERGE_FAILURES); s->state = kCorrupt; } @@ -359,16 +363,15 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ case kTypeMerge: assert(s->state == kNotFound || s->state == kMerge); s->state = kMerge; - ops->push_front(v.ToString()); - while (ops->size() >= 2) { + merge_contex->PushOperand(v); + while (merge_contex->GetNumOperands() >= 2) { // Attempt to merge operands together via user associateive merge if (s->merge_operator->PartialMerge(s->user_key, - Slice((*ops)[0]), - Slice((*ops)[1]), + merge_contex->GetOperand(0), + merge_contex->GetOperand(1), &merge_result, s->logger)) { - ops->pop_front(); - swap(ops->front(), merge_result); + merge_contex->PushPartialMergeResult(merge_result); } else { // Associative merge returns false ==> stack the operands break; @@ -417,7 +420,7 @@ void Version::Get(const ReadOptions& options, const LookupKey& k, std::string* value, Status* status, - std::deque* operands, + MergeContext* merge_context, GetStats* stats, const Options& db_options, bool* value_found) { @@ -436,7 +439,7 @@ void Version::Get(const ReadOptions& options, saver.value_found = value_found; saver.value = value; saver.merge_operator = merge_operator; - saver.merge_operands = operands; + saver.merge_context = merge_context; saver.logger = logger.get(); saver.didIO = false; saver.statistics = db_options.statistics.get(); @@ -557,7 +560,8 @@ void Version::Get(const ReadOptions& options, if (kMerge == saver.state) { // merge_operands are in saver and we hit the beginning of the key history // do a final merge of nullptr and operands; - if (merge_operator->FullMerge(user_key, nullptr, *saver.merge_operands, + if (merge_operator->FullMerge(user_key, nullptr, + saver.merge_context->GetOperands(), value, logger.get())) { *status = Status::OK(); } else { diff --git a/db/version_set.h b/db/version_set.h index 38415173c..bf466a932 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -38,6 +38,7 @@ class MemTable; class TableCache; class Version; class VersionSet; +class MergeContext; // Return the smallest index i such that files[i]->largest >= key. // Return files.size() if there is no such file. @@ -76,9 +77,9 @@ class Version { int seek_file_level; }; void Get(const ReadOptions&, const LookupKey& key, std::string* val, - Status* status, std::deque* operands, GetStats* stats, - const Options& db_option, - bool* value_found = nullptr); + Status* status, MergeContext* merge_context, + GetStats* stats, const Options& db_option, bool* value_found = + nullptr); // Adds "stats" into the current state. Returns true if a new // compaction may need to be triggered, false otherwise. diff --git a/include/rocksdb/merge_operator.h b/include/rocksdb/merge_operator.h index ddb3102e3..bd4c36c07 100644 --- a/include/rocksdb/merge_operator.h +++ b/include/rocksdb/merge_operator.h @@ -6,6 +6,7 @@ #ifndef STORAGE_ROCKSDB_INCLUDE_MERGE_OPERATOR_H_ #define STORAGE_ROCKSDB_INCLUDE_MERGE_OPERATOR_H_ +#include #include #include #include "rocksdb/slice.h"