Introduce MergeContext to Lazily Initialize merge operand list

Summary: In get operations, merge_operands is only used in few cases. Lazily initialize it can reduce average latency in some cases

Test Plan: make all check

Reviewers: haobo, kailiu, dhruba

Reviewed By: haobo

CC: igor, nkg-, leveldb

Differential Revision: https://reviews.facebook.net/D14415

Conflicts:
	db/db_impl.cc
	db/memtable.cc
main
Siying Dong 11 years ago
parent bc5dd19b14
commit a8029fdc75
  1. 21
      db/db_impl.cc
  2. 8
      db/db_impl_readonly.cc
  3. 32
      db/memtable.cc
  4. 3
      db/memtable.h
  5. 5
      db/memtablelist.cc
  6. 2
      db/memtablelist.h
  7. 69
      db/merge_context.h
  8. 34
      db/version_set.cc
  9. 7
      db/version_set.h
  10. 1
      include/rocksdb/merge_operator.h

@ -27,6 +27,7 @@
#include "db/log_writer.h" #include "db/log_writer.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/memtablelist.h" #include "db/memtablelist.h"
#include "db/merge_context.h"
#include "db/merge_helper.h" #include "db/merge_helper.h"
#include "db/prefix_filter_iterator.h" #include "db/prefix_filter_iterator.h"
#include "db/table_cache.h" #include "db/table_cache.h"
@ -2608,20 +2609,20 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// Prepare to store a list of merge operations if merge occurs. // Prepare to store a list of merge operations if merge occurs.
std::deque<std::string> merge_operands; MergeContext merge_context;
// First look in the memtable, then in the immutable memtable (if any). // First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress. // s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case. // merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot); LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s, &merge_operands, options_)) { if (mem->Get(lkey, value, &s, merge_context, options_)) {
// Done // Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT); RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else if (imm.Get(lkey, value, &s, &merge_operands, options_)) { } else if (imm.Get(lkey, value, &s, merge_context, options_)) {
// Done // Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT); RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else { } else {
current->Get(options, lkey, value, &s, &merge_operands, &stats, current->Get(options, lkey, value, &s, &merge_context, &stats,
options_, value_found); options_, value_found);
have_stat_update = true; have_stat_update = true;
RecordTick(options_.statistics.get(), MEMTABLE_MISS); RecordTick(options_.statistics.get(), MEMTABLE_MISS);
@ -2676,8 +2677,8 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
bool have_stat_update = false; bool have_stat_update = false;
Version::GetStats stats; Version::GetStats stats;
// Prepare to store a list of merge operations if merge occurs. // Contain a list of merge operations if merge occurs.
std::deque<std::string> merge_operands; MergeContext merge_context;
// Note: this always resizes the values array // Note: this always resizes the values array
int numKeys = keys.size(); int numKeys = keys.size();
@ -2692,17 +2693,17 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
// s is both in/out. When in, s could either be OK or MergeInProgress. // s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case. // merge_operands will contain the sequence of merges in the latter case.
for (int i=0; i<numKeys; ++i) { for (int i=0; i<numKeys; ++i) {
merge_operands.clear(); merge_context.Clear();
Status& s = statList[i]; Status& s = statList[i];
std::string* value = &(*values)[i]; std::string* value = &(*values)[i];
LookupKey lkey(keys[i], snapshot); LookupKey lkey(keys[i], snapshot);
if (mem->Get(lkey, value, &s, &merge_operands, options_)) { if (mem->Get(lkey, value, &s, merge_context, options_)) {
// Done // Done
} else if (imm.Get(lkey, value, &s, &merge_operands, options_)) { } else if (imm.Get(lkey, value, &s, merge_context, options_)) {
// Done // Done
} else { } else {
current->Get(options, lkey, value, &s, &merge_operands, &stats, options_); current->Get(options, lkey, value, &s, &merge_context, &stats, options_);
have_stat_update = true; have_stat_update = true;
} }

@ -23,6 +23,7 @@
#include "db/log_reader.h" #include "db/log_reader.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/merge_context.h"
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
@ -30,6 +31,7 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "rocksdb/merge_operator.h"
#include "port/port.h" #include "port/port.h"
#include "table/block.h" #include "table/block.h"
#include "table/merger.h" #include "table/merger.h"
@ -57,12 +59,12 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
MemTable* mem = GetMemTable(); MemTable* mem = GetMemTable();
Version* current = versions_->current(); Version* current = versions_->current();
SequenceNumber snapshot = versions_->LastSequence(); SequenceNumber snapshot = versions_->LastSequence();
std::deque<std::string> merge_operands; MergeContext merge_context;
LookupKey lkey(key, snapshot); LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s, &merge_operands, options_)) { if (mem->Get(lkey, value, &s, merge_context, options_)) {
} else { } else {
Version::GetStats stats; Version::GetStats stats;
current->Get(options, lkey, value, &s, &merge_operands, &stats, options_); current->Get(options, lkey, value, &s, &merge_context, &stats, options_);
} }
return s; return s;
} }

@ -12,6 +12,7 @@
#include <memory> #include <memory>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/merge_context.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
@ -162,15 +163,12 @@ void MemTable::Add(SequenceNumber s, ValueType type,
} }
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
std::deque<std::string>* operands, const Options& options) { MergeContext& merge_context, const Options& options) {
Slice memkey = key.memtable_key(); Slice memkey = key.memtable_key();
std::shared_ptr<MemTableRep::Iterator> iter( std::shared_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(key.user_key())); table_->GetIterator(key.user_key()));
iter->Seek(memkey.data()); iter->Seek(memkey.data());
// It is the caller's responsibility to allocate/delete operands list
assert(operands != nullptr);
bool merge_in_progress = s->IsMergeInProgress(); bool merge_in_progress = s->IsMergeInProgress();
auto merge_operator = options.merge_operator.get(); auto merge_operator = options.merge_operator.get();
auto logger = options.info_log; auto logger = options.info_log;
@ -202,8 +200,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
*s = Status::OK(); *s = Status::OK();
if (merge_in_progress) { if (merge_in_progress) {
assert(merge_operator); assert(merge_operator);
if (!merge_operator->FullMerge(key.user_key(), &v, *operands, if (!merge_operator->FullMerge(key.user_key(), &v,
value, logger.get())) { merge_context.GetOperands(), value,
logger.get())) {
RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES); RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge."); *s = Status::Corruption("Error: Could not perform merge.");
} }
@ -219,8 +218,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
if (merge_in_progress) { if (merge_in_progress) {
assert(merge_operator); assert(merge_operator);
*s = Status::OK(); *s = Status::OK();
if (!merge_operator->FullMerge(key.user_key(), nullptr, *operands, if (!merge_operator->FullMerge(key.user_key(), nullptr,
value, logger.get())) { merge_context.GetOperands(), value,
logger.get())) {
RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES); RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge."); *s = Status::Corruption("Error: Could not perform merge.");
} }
@ -232,16 +232,14 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
case kTypeMerge: { case kTypeMerge: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length); Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
merge_in_progress = true; merge_in_progress = true;
operands->push_front(v.ToString()); merge_context.PushOperand(v);
while(operands->size() >= 2) { while(merge_context.GetNumOperands() >= 2) {
// Attempt to associative merge. (Returns true if successful) // Attempt to associative merge. (Returns true if successful)
if (merge_operator->PartialMerge(key.user_key(), if (merge_operator->PartialMerge(key.user_key(),
Slice((*operands)[0]), merge_context.GetOperand(0),
Slice((*operands)[1]), merge_context.GetOperand(1),
&merge_result, &merge_result, logger.get())) {
logger.get())) { merge_context.PushPartialMergeResult(merge_result);
operands->pop_front();
swap(operands->front(), merge_result);
} else { } else {
// Stack them because user can't associative merge // Stack them because user can't associative merge
break; break;

@ -22,6 +22,7 @@ namespace rocksdb {
class Mutex; class Mutex;
class MemTableIterator; class MemTableIterator;
class MergeContext;
class MemTable { class MemTable {
public: public:
@ -94,7 +95,7 @@ class MemTable {
// store MergeInProgress in s, and return false. // store MergeInProgress in s, and return false.
// Else, return false. // Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s, bool Get(const LookupKey& key, std::string* value, Status* s,
std::deque<std::string>* operands, const Options& options); MergeContext& merge_context, const Options& options);
// Update the value and return status ok, // Update the value and return status ok,
// if key exists in current memtable // if key exists in current memtable

@ -204,10 +204,9 @@ size_t MemTableList::ApproximateMemoryUsage() {
// Return the most recent value found, if any. // Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far. // Operands stores the list of merge operations to apply, so far.
bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s,
std::deque<std::string>* operands, MergeContext& merge_context, const Options& options) {
const Options& options) {
for (auto &memtable : memlist_) { for (auto &memtable : memlist_) {
if (memtable->Get(key, value, s, operands, options)) { if (memtable->Get(key, value, s, merge_context, options)) {
return true; return true;
} }
} }

@ -78,7 +78,7 @@ class MemTableList {
// Search all the memtables starting from the most recent one. // Search all the memtables starting from the most recent one.
// Return the most recent value found, if any. // Return the most recent value found, if any.
bool Get(const LookupKey& key, std::string* value, Status* s, bool Get(const LookupKey& key, std::string* value, Status* s,
std::deque<std::string>* operands, const Options& options); MergeContext& merge_context, const Options& options);
// Returns the list of underlying memtables. // Returns the list of underlying memtables.
void GetMemTables(std::vector<MemTable*>* list); void GetMemTables(std::vector<MemTable*>* list);

@ -0,0 +1,69 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#pragma once
#include "db/dbformat.h"
#include "rocksdb/slice.h"
#include <string>
#include <deque>
namespace rocksdb {
const std::deque<std::string> empty_operand_list;
// The merge context for merging a user key.
// When doing a Get(), DB will create such a class and pass it when
// issuing Get() operation to memtables and version_set. The operands
// will be fetched from the context when issuing partial of full merge.
class MergeContext {
public:
// Clear all the operands
void Clear() {
if (operand_list) {
operand_list->clear();
}
}
// Replace the first two operands of merge_result, which are expected be the
// merge results of them.
void PushPartialMergeResult(std::string& merge_result) {
assert (operand_list);
operand_list->pop_front();
swap(operand_list->front(), merge_result);
}
// Push a merge operand
void PushOperand(const Slice& operand_slice) {
Initialize();
operand_list->push_front(operand_slice.ToString());
}
// return total number of operands in the list
size_t GetNumOperands() const {
if (!operand_list) {
return 0;
}
return operand_list->size();
}
// Get the operand at the index.
Slice GetOperand(int index) const {
assert (operand_list);
return (*operand_list)[index];
}
// Return all the operands.
const std::deque<std::string>& GetOperands() const {
if (!operand_list) {
return empty_operand_list;
}
return *operand_list;
}
private:
void Initialize() {
if (!operand_list) {
operand_list.reset(new std::deque<std::string>());
}
}
std::unique_ptr<std::deque<std::string>> operand_list;
};
} // namespace rocksdb

@ -16,6 +16,7 @@
#include "db/log_reader.h" #include "db/log_reader.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/merge_context.h"
#include "db/table_cache.h" #include "db/table_cache.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/merge_operator.h" #include "rocksdb/merge_operator.h"
@ -287,7 +288,8 @@ struct Saver {
bool* value_found; // Is value set correctly? Used by KeyMayExist bool* value_found; // Is value set correctly? Used by KeyMayExist
std::string* value; std::string* value;
const MergeOperator* merge_operator; const MergeOperator* merge_operator;
std::deque<std::string>* merge_operands; // the merge operations encountered // the merge operations encountered;
MergeContext* merge_context;
Logger* logger; Logger* logger;
bool didIO; // did we do any disk io? bool didIO; // did we do any disk io?
Statistics* statistics; Statistics* statistics;
@ -309,10 +311,10 @@ static void MarkKeyMayExist(void* arg) {
static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
Saver* s = reinterpret_cast<Saver*>(arg); Saver* s = reinterpret_cast<Saver*>(arg);
std::deque<std::string>* const ops = s->merge_operands; // shorter alias MergeContext* merge_contex = s->merge_context;
std::string merge_result; // temporary area for merge results later std::string merge_result; // temporary area for merge results later
assert(s != nullptr && ops != nullptr); assert(s != nullptr && merge_contex != nullptr);
ParsedInternalKey parsed_key; ParsedInternalKey parsed_key;
// TODO: didIO and Merge? // TODO: didIO and Merge?
@ -331,7 +333,8 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
} else if (kMerge == s->state) { } else if (kMerge == s->state) {
assert(s->merge_operator != nullptr); assert(s->merge_operator != nullptr);
s->state = kFound; s->state = kFound;
if (!s->merge_operator->FullMerge(s->user_key, &v, *ops, if (!s->merge_operator->FullMerge(s->user_key, &v,
merge_contex->GetOperands(),
s->value, s->logger)) { s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES); RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt; s->state = kCorrupt;
@ -346,8 +349,9 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
s->state = kDeleted; s->state = kDeleted;
} else if (kMerge == s->state) { } else if (kMerge == s->state) {
s->state = kFound; s->state = kFound;
if (!s->merge_operator->FullMerge(s->user_key, nullptr, *ops, if (!s->merge_operator->FullMerge(s->user_key, nullptr,
s->value, s->logger)) { merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES); RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt; s->state = kCorrupt;
} }
@ -359,16 +363,15 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
case kTypeMerge: case kTypeMerge:
assert(s->state == kNotFound || s->state == kMerge); assert(s->state == kNotFound || s->state == kMerge);
s->state = kMerge; s->state = kMerge;
ops->push_front(v.ToString()); merge_contex->PushOperand(v);
while (ops->size() >= 2) { while (merge_contex->GetNumOperands() >= 2) {
// Attempt to merge operands together via user associateive merge // Attempt to merge operands together via user associateive merge
if (s->merge_operator->PartialMerge(s->user_key, if (s->merge_operator->PartialMerge(s->user_key,
Slice((*ops)[0]), merge_contex->GetOperand(0),
Slice((*ops)[1]), merge_contex->GetOperand(1),
&merge_result, &merge_result,
s->logger)) { s->logger)) {
ops->pop_front(); merge_contex->PushPartialMergeResult(merge_result);
swap(ops->front(), merge_result);
} else { } else {
// Associative merge returns false ==> stack the operands // Associative merge returns false ==> stack the operands
break; break;
@ -417,7 +420,7 @@ void Version::Get(const ReadOptions& options,
const LookupKey& k, const LookupKey& k,
std::string* value, std::string* value,
Status* status, Status* status,
std::deque<std::string>* operands, MergeContext* merge_context,
GetStats* stats, GetStats* stats,
const Options& db_options, const Options& db_options,
bool* value_found) { bool* value_found) {
@ -436,7 +439,7 @@ void Version::Get(const ReadOptions& options,
saver.value_found = value_found; saver.value_found = value_found;
saver.value = value; saver.value = value;
saver.merge_operator = merge_operator; saver.merge_operator = merge_operator;
saver.merge_operands = operands; saver.merge_context = merge_context;
saver.logger = logger.get(); saver.logger = logger.get();
saver.didIO = false; saver.didIO = false;
saver.statistics = db_options.statistics.get(); saver.statistics = db_options.statistics.get();
@ -557,7 +560,8 @@ void Version::Get(const ReadOptions& options,
if (kMerge == saver.state) { if (kMerge == saver.state) {
// merge_operands are in saver and we hit the beginning of the key history // merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands; // do a final merge of nullptr and operands;
if (merge_operator->FullMerge(user_key, nullptr, *saver.merge_operands, if (merge_operator->FullMerge(user_key, nullptr,
saver.merge_context->GetOperands(),
value, logger.get())) { value, logger.get())) {
*status = Status::OK(); *status = Status::OK();
} else { } else {

@ -38,6 +38,7 @@ class MemTable;
class TableCache; class TableCache;
class Version; class Version;
class VersionSet; class VersionSet;
class MergeContext;
// Return the smallest index i such that files[i]->largest >= key. // Return the smallest index i such that files[i]->largest >= key.
// Return files.size() if there is no such file. // Return files.size() if there is no such file.
@ -76,9 +77,9 @@ class Version {
int seek_file_level; int seek_file_level;
}; };
void Get(const ReadOptions&, const LookupKey& key, std::string* val, void Get(const ReadOptions&, const LookupKey& key, std::string* val,
Status* status, std::deque<std::string>* operands, GetStats* stats, Status* status, MergeContext* merge_context,
const Options& db_option, GetStats* stats, const Options& db_option, bool* value_found =
bool* value_found = nullptr); nullptr);
// Adds "stats" into the current state. Returns true if a new // Adds "stats" into the current state. Returns true if a new
// compaction may need to be triggered, false otherwise. // compaction may need to be triggered, false otherwise.

@ -6,6 +6,7 @@
#ifndef STORAGE_ROCKSDB_INCLUDE_MERGE_OPERATOR_H_ #ifndef STORAGE_ROCKSDB_INCLUDE_MERGE_OPERATOR_H_
#define STORAGE_ROCKSDB_INCLUDE_MERGE_OPERATOR_H_ #define STORAGE_ROCKSDB_INCLUDE_MERGE_OPERATOR_H_
#include <memory>
#include <string> #include <string>
#include <deque> #include <deque>
#include "rocksdb/slice.h" #include "rocksdb/slice.h"

Loading…
Cancel
Save