Introduce conditional merge-operator invocation in point lookups

Summary:
For every merge operand encountered for a key in the read path we now have the ability to decide whether to look further (to retrieve more merge operands for the key) or stop and invoke the merge operator to return the value. The user needs to override `ShouldMerge()` method with a condition to terminate search when true to avail this facility.

This has a couple of advantages:
1. It helps in limiting the number of merge operands that are looked at to compute a value as part of a user Get operation.
2. It allows to peek at a merge key-value to see if further merge operands need to look at.

Example: Limiting the number of merge operands that are looked at: Lets say you have 10 merge operands for a key spread over various levels. If you only want RocksDB to look at the latest two merge operands instead of all 10 to compute the value, it is now possible with this PR. You can set the condition in `ShouldMerge()` to return true when the size of the operand list is 2. Look at the example implementation in the unit test. Without this PR, a Get might look at all the 10 merge operands in different levels before invoking the merge-operator.

Added a new unit test.
Made sure that there is no perf regression by running benchmarks.

Command line to Load data:
```
TEST_TMPDIR=/dev/shm ./db_bench --benchmarks="mergerandom" --merge_operator="uint64add" --num=10000000
...
mergerandom  :      12.861 micros/op 77757 ops/sec;    8.6 MB/s ( updates:10000000)
```

**ReadRandomMergeRandom bechmark results:**
Command line:
```
TEST_TMPDIR=/dev/shm ./db_bench --benchmarks="readrandommergerandom" --merge_operator="uint64add" --num=10000000
```

Base -- Without this code change (on commit fc7476b):
```
readrandommergerandom :      38.586 micros/op 25916 ops/sec; (reads:3001599 merges:6998401 total:10000000 hits:842235 maxlength:8)
```

With this code change:
```
readrandommergerandom :      38.653 micros/op 25870 ops/sec; (reads:3001599 merges:6998401 total:10000000 hits:842235 maxlength:8)
```
Closes https://github.com/facebook/rocksdb/pull/2923

Differential Revision: D5898239

Pulled By: sagar0

fbshipit-source-id: daefa325019f77968639a75c851d46352c2303ef
main
Sagar Vemuri 7 years ago committed by Facebook Github Bot
parent a48a398e7c
commit 93c2b91740
  1. 76
      db/db_merge_operator_test.cc
  2. 8
      db/memtable.cc
  3. 9
      include/rocksdb/merge_operator.h
  4. 15
      table/get_context.cc

@ -8,7 +8,9 @@
#include "db/db_test_util.h"
#include "db/forward_iterator.h"
#include "port/stack_trace.h"
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend2.h"
namespace rocksdb {
@ -18,6 +20,80 @@ class DBMergeOperatorTest : public DBTestBase {
DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
};
TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
public:
LimitedStringAppendMergeOp(int limit, char delim)
: StringAppendTESTOperator(delim), limit_(limit) {}
const char* Name() const override {
return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
}
bool ShouldMerge(const std::vector<Slice>& operands) const override {
if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
return true;
}
return false;
}
private:
size_t limit_ = 0;
};
Options options;
options.create_if_missing = true;
// Use only the latest two merge operands.
options.merge_operator =
std::make_shared<LimitedStringAppendMergeOp>(2, ',');
options.env = env_;
Reopen(options);
// All K1 values are in memtable.
ASSERT_OK(Merge("k1", "a"));
ASSERT_OK(Merge("k1", "b"));
ASSERT_OK(Merge("k1", "c"));
ASSERT_OK(Merge("k1", "d"));
std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok());
// Make sure that only the latest two merge operands are used. If this was
// not the case the value would be "a,b,c,d".
ASSERT_EQ(value, "c,d");
// All K2 values are flushed to L0 into a single file.
ASSERT_OK(Merge("k2", "a"));
ASSERT_OK(Merge("k2", "b"));
ASSERT_OK(Merge("k2", "c"));
ASSERT_OK(Merge("k2", "d"));
ASSERT_OK(Flush());
ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok());
ASSERT_EQ(value, "c,d");
// All K3 values are flushed and are in different files.
ASSERT_OK(Merge("k3", "ab"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3", "bc"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3", "cd"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3", "de"));
ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok());
ASSERT_EQ(value, "cd,de");
// All K4 values are in different levels
ASSERT_OK(Merge("k4", "ab"));
ASSERT_OK(Flush());
MoveFilesToLevel(4);
ASSERT_OK(Merge("k4", "bc"));
ASSERT_OK(Flush());
MoveFilesToLevel(3);
ASSERT_OK(Merge("k4", "cd"));
ASSERT_OK(Flush());
MoveFilesToLevel(1);
ASSERT_OK(Merge("k4", "de"));
ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok());
ASSERT_EQ(value, "cd,de");
}
TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
Options options;
options.create_if_missing = true;

@ -639,6 +639,14 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->merge_in_progress) = true;
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
if (merge_operator->ShouldMerge(merge_context->GetOperands())) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->logger, s->statistics,
s->env_, nullptr /* result_operand */, true);
*(s->found_final_value) = true;
return false;
}
return true;
}
default:

@ -190,6 +190,15 @@ class MergeOperator {
// PartialMerge/PartialMergeMulti should be implemented accordingly to handle
// a single operand.
virtual bool AllowSingleOperand() const { return false; }
// Allows to control when to invoke a full merge during Get.
// This could be used to limit the number of merge operands that are looked at
// during a point lookup, thereby helping in limiting the number of levels to
// read from.
// Doesn't help with iterators.
virtual bool ShouldMerge(const std::vector<Slice>& operands) const {
return false;
}
};
// The simpler, associative merge operator.

@ -172,6 +172,21 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
} else {
merge_context_->PushOperand(value, false);
}
if (merge_operator_ != nullptr &&
merge_operator_->ShouldMerge(merge_context_->GetOperands())) {
state_ = kFound;
if (LIKELY(pinnable_val_ != nullptr)) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, nullptr,
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
logger_, statistics_, env_);
pinnable_val_->PinSelf();
if (!merge_status.ok()) {
state_ = kCorrupt;
}
}
return false;
}
return true;
default:

Loading…
Cancel
Save