Call merge operators with empty values

Summary: Calling the user's merge operator API with leftover (garbage) data in new_value is undesirable. This diff makes sure that the string new_value points to is empty before the merge operator is called.

Test Plan: Added an assert(new_value->empty()) check to the merge operators in merge_test.

Reviewers: sdong, yhchiang

Reviewed By: yhchiang

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D40773
main
Igor Canadi 10 years ago
parent 619167ee66
commit 4cbc4e6f88
  1. 13
      db/merge_helper.cc
  2. 4
      db/merge_operator.cc
  3. 4
      db/merge_test.cc
  4. 10
      include/rocksdb/merge_operator.h

@ -81,7 +81,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
ParseInternalKey(keys_.back(), &orig_ikey); ParseInternalKey(keys_.back(), &orig_ikey);
bool hit_the_next_user_key = false; bool hit_the_next_user_key = false;
std::string merge_result; // Temporary value for merge results
if (steps) { if (steps) {
++(*steps); ++(*steps);
} }
@ -118,6 +117,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// => change the entry type to kTypeValue for keys_.back() // => change the entry type to kTypeValue for keys_.back()
// We are done! Return a success if the merge passes. // We are done! Return a success if the merge passes.
std::string merge_result;
Status s = TimedFullMerge(ikey.user_key, nullptr, operands_, Status s = TimedFullMerge(ikey.user_key, nullptr, operands_,
user_merge_operator_, stats, env_, logger_, user_merge_operator_, stats, env_, logger_,
&merge_result); &merge_result);
@ -130,7 +130,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
orig_ikey.type = kTypeValue; orig_ikey.type = kTypeValue;
UpdateInternalKey(&original_key[0], original_key.size(), UpdateInternalKey(&original_key[0], original_key.size(),
orig_ikey.sequence, orig_ikey.type); orig_ikey.sequence, orig_ikey.type);
swap(operands_.back(), merge_result); operands_.back() = std::move(merge_result);
} }
// move iter to the next entry (before doing anything else) // move iter to the next entry (before doing anything else)
@ -148,6 +148,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// => change the entry type to kTypeValue for keys_.back() // => change the entry type to kTypeValue for keys_.back()
// We are done! Success! // We are done! Success!
const Slice val = iter->value(); const Slice val = iter->value();
std::string merge_result;
Status s = Status s =
TimedFullMerge(ikey.user_key, &val, operands_, user_merge_operator_, TimedFullMerge(ikey.user_key, &val, operands_, user_merge_operator_,
stats, env_, logger_, &merge_result); stats, env_, logger_, &merge_result);
@ -160,7 +161,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
orig_ikey.type = kTypeValue; orig_ikey.type = kTypeValue;
UpdateInternalKey(&original_key[0], original_key.size(), UpdateInternalKey(&original_key[0], original_key.size(),
orig_ikey.sequence, orig_ikey.type); orig_ikey.sequence, orig_ikey.type);
swap(operands_.back(), merge_result); operands_.back() = std::move(merge_result);
} }
// move iter to the next entry // move iter to the next entry
@ -210,6 +211,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
assert(kTypeMerge == orig_ikey.type); assert(kTypeMerge == orig_ikey.type);
assert(operands_.size() >= 1); assert(operands_.size() >= 1);
assert(operands_.size() == keys_.size()); assert(operands_.size() == keys_.size());
std::string merge_result;
{ {
StopWatchNano timer(env_, stats != nullptr); StopWatchNano timer(env_, stats != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos); PERF_TIMER_GUARD(merge_operator_time_nanos);
@ -224,7 +226,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
UpdateInternalKey(&original_key[0], original_key.size(), UpdateInternalKey(&original_key[0], original_key.size(),
orig_ikey.sequence, orig_ikey.type); orig_ikey.sequence, orig_ikey.type);
swap(operands_.back(),merge_result); operands_.back() = std::move(merge_result);
} else { } else {
RecordTick(stats, NUMBER_MERGE_FAILURES); RecordTick(stats, NUMBER_MERGE_FAILURES);
// Do nothing if not success_. Leave keys() and operands() as they are. // Do nothing if not success_. Leave keys() and operands() as they are.
@ -237,6 +239,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
if (operands_.size() >= 2 && if (operands_.size() >= 2 &&
operands_.size() >= min_partial_merge_operands_) { operands_.size() >= min_partial_merge_operands_) {
bool merge_success = false; bool merge_success = false;
std::string merge_result;
{ {
StopWatchNano timer(env_, stats != nullptr); StopWatchNano timer(env_, stats != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos); PERF_TIMER_GUARD(merge_operator_time_nanos);
@ -251,7 +254,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// Merging of operands (associative merge) was successful. // Merging of operands (associative merge) was successful.
// Replace operands with the merge result // Replace operands with the merge result
operands_.clear(); operands_.clear();
operands_.push_front(std::move(merge_result)); operands_.emplace_front(std::move(merge_result));
keys_.erase(keys_.begin(), keys_.end() - 1); keys_.erase(keys_.begin(), keys_.end() - 1);
} }
} }

@ -20,11 +20,11 @@ bool MergeOperator::PartialMergeMulti(const Slice& key,
Logger* logger) const { Logger* logger) const {
assert(operand_list.size() >= 2); assert(operand_list.size() >= 2);
// Simply loop through the operands // Simply loop through the operands
std::string temp_value;
Slice temp_slice(operand_list[0]); Slice temp_slice(operand_list[0]);
for (size_t i = 1; i < operand_list.size(); ++i) { for (size_t i = 1; i < operand_list.size(); ++i) {
auto& operand = operand_list[i]; auto& operand = operand_list[i];
std::string temp_value;
if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) { if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) {
return false; return false;
} }
@ -48,9 +48,9 @@ bool AssociativeMergeOperator::FullMerge(
// Simply loop through the operands // Simply loop through the operands
Slice temp_existing; Slice temp_existing;
std::string temp_value;
for (const auto& operand : operand_list) { for (const auto& operand : operand_list) {
Slice value(operand); Slice value(operand);
std::string temp_value;
if (!Merge(key, existing_value, value, &temp_value, logger)) { if (!Merge(key, existing_value, value, &temp_value, logger)) {
return false; return false;
} }

@ -7,6 +7,7 @@
#include <memory> #include <memory>
#include <iostream> #include <iostream>
#include "port/stack_trace.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -41,6 +42,7 @@ class CountMergeOperator : public AssociativeMergeOperator {
const Slice& value, const Slice& value,
std::string* new_value, std::string* new_value,
Logger* logger) const override { Logger* logger) const override {
assert(new_value->empty());
++num_merge_operator_calls; ++num_merge_operator_calls;
if (existing_value == nullptr) { if (existing_value == nullptr) {
new_value->assign(value.data(), value.size()); new_value->assign(value.data(), value.size());
@ -59,6 +61,7 @@ class CountMergeOperator : public AssociativeMergeOperator {
const std::deque<Slice>& operand_list, const std::deque<Slice>& operand_list,
std::string* new_value, std::string* new_value,
Logger* logger) const override { Logger* logger) const override {
assert(new_value->empty());
++num_partial_merge_calls; ++num_partial_merge_calls;
return mergeOperator_->PartialMergeMulti(key, operand_list, new_value, return mergeOperator_->PartialMergeMulti(key, operand_list, new_value,
logger); logger);
@ -498,6 +501,7 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
//TODO: Make this test like a general rocksdb unit-test //TODO: Make this test like a general rocksdb unit-test
rocksdb::port::InstallStackTraceHandler();
runTest(argc, test::TmpDir() + "/merge_testdb"); runTest(argc, test::TmpDir() + "/merge_testdb");
runTest(argc, test::TmpDir() + "/merge_testdbttl", true); // Run test on TTL database runTest(argc, test::TmpDir() + "/merge_testdbttl", true); // Run test on TTL database
printf("Passed all tests!\n"); printf("Passed all tests!\n");

@ -54,7 +54,8 @@ class MergeOperator {
// merge operation semantics // merge operation semantics
// existing: (IN) null indicates that the key does not exist before this op // existing: (IN) null indicates that the key does not exist before this op
// operand_list:(IN) the sequence of merge operations to apply, front() first. // operand_list:(IN) the sequence of merge operations to apply, front() first.
// new_value:(OUT) Client is responsible for filling the merge result here // new_value:(OUT) Client is responsible for filling the merge result here.
// The string that new_value is pointing to will be empty.
// logger: (IN) Client could use this to log errors during merge. // logger: (IN) Client could use this to log errors during merge.
// //
// Return true on success. // Return true on success.
@ -80,6 +81,8 @@ class MergeOperator {
// DB::Merge(key, *new_value) would yield the same result as a call // DB::Merge(key, *new_value) would yield the same result as a call
// to DB::Merge(key, left_op) followed by DB::Merge(key, right_op). // to DB::Merge(key, left_op) followed by DB::Merge(key, right_op).
// //
// The string that new_value is pointing to will be empty.
//
// The default implementation of PartialMergeMulti will use this function // The default implementation of PartialMergeMulti will use this function
// as a helper, for backward compatibility. Any successor class of // as a helper, for backward compatibility. Any successor class of
// MergeOperator should either implement PartialMerge or PartialMergeMulti, // MergeOperator should either implement PartialMerge or PartialMergeMulti,
@ -116,6 +119,8 @@ class MergeOperator {
// the same result as subsequent individual calls to DB::Merge(key, operand) // the same result as subsequent individual calls to DB::Merge(key, operand)
// for each operand in operand_list from front() to back(). // for each operand in operand_list from front() to back().
// //
// The string that new_value is pointing to will be empty.
//
// The PartialMergeMulti function will be called only when the list of // The PartialMergeMulti function will be called only when the list of
// operands are long enough. The minimum amount of operands that will be // operands are long enough. The minimum amount of operands that will be
// passed to the function are specified by the "min_partial_merge_operands" // passed to the function are specified by the "min_partial_merge_operands"
@ -147,7 +152,8 @@ class AssociativeMergeOperator : public MergeOperator {
// key: (IN) The key that's associated with this merge operation. // key: (IN) The key that's associated with this merge operation.
// existing_value:(IN) null indicates the key does not exist before this op // existing_value:(IN) null indicates the key does not exist before this op
// value: (IN) the value to update/merge the existing_value with // value: (IN) the value to update/merge the existing_value with
// new_value: (OUT) Client is responsible for filling the merge result here // new_value: (OUT) Client is responsible for filling the merge result
// here. The string that new_value is pointing to will be empty.
// logger: (IN) Client could use this to log errors during merge. // logger: (IN) Client could use this to log errors during merge.
// //
// Return true on success. // Return true on success.

Loading…
Cancel
Save