diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc
index de2861910..49a2cf6af 100644
--- a/db/db_merge_operator_test.cc
+++ b/db/db_merge_operator_test.cc
@@ -8,7 +8,9 @@
 #include "db/db_test_util.h"
 #include "db/forward_iterator.h"
 #include "port/stack_trace.h"
+#include "rocksdb/merge_operator.h"
 #include "utilities/merge_operators.h"
+#include "utilities/merge_operators/string_append/stringappend2.h"
 
 namespace rocksdb {
 
@@ -18,6 +20,84 @@ class DBMergeOperatorTest : public DBTestBase {
   DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
 };
 
+// Verifies that MergeOperator::ShouldMerge() can cap the number of merge
+// operands used by Get(), for operands held in the memtable, flushed into a
+// single file, flushed into separate files, and moved to different levels.
+TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
+  // String-append operator that requests a full merge once `limit` operands
+  // have been collected. A limit of 0 never requests an early merge.
+  class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
+   public:
+    LimitedStringAppendMergeOp(int limit, char delim)
+        : StringAppendTESTOperator(delim), limit_(limit) {}
+
+    const char* Name() const override {
+      return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
+    }
+
+    bool ShouldMerge(const std::vector<Slice>& operands) const override {
+      return limit_ > 0 && operands.size() >= limit_;
+    }
+
+   private:
+    size_t limit_ = 0;
+  };
+
+  Options options;
+  options.create_if_missing = true;
+  // Use only the latest two merge operands.
+  options.merge_operator =
+      std::make_shared<LimitedStringAppendMergeOp>(2, ',');
+  options.env = env_;
+  Reopen(options);
+  // All K1 values are in memtable.
+  ASSERT_OK(Merge("k1", "a"));
+  ASSERT_OK(Merge("k1", "b"));
+  ASSERT_OK(Merge("k1", "c"));
+  ASSERT_OK(Merge("k1", "d"));
+  std::string value;
+  ASSERT_OK(db_->Get(ReadOptions(), "k1", &value));
+  // Make sure that only the latest two merge operands are used. If this were
+  // not the case, the value would be "a,b,c,d".
+  ASSERT_EQ(value, "c,d");
+
+  // All K2 values are flushed to L0 into a single file.
+  ASSERT_OK(Merge("k2", "a"));
+  ASSERT_OK(Merge("k2", "b"));
+  ASSERT_OK(Merge("k2", "c"));
+  ASSERT_OK(Merge("k2", "d"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(db_->Get(ReadOptions(), "k2", &value));
+  ASSERT_EQ(value, "c,d");
+
+  // The first three K3 operands are each followed by a flush; the newest
+  // operand is left unflushed.
+  ASSERT_OK(Merge("k3", "ab"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("k3", "bc"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("k3", "cd"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("k3", "de"));
+  ASSERT_OK(db_->Get(ReadOptions(), "k3", &value));
+  ASSERT_EQ(value, "cd,de");
+
+  // The first three K4 operands are each flushed and moved to a different
+  // level; the newest operand is left unflushed.
+  ASSERT_OK(Merge("k4", "ab"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(4);
+  ASSERT_OK(Merge("k4", "bc"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(3);
+  ASSERT_OK(Merge("k4", "cd"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+  ASSERT_OK(Merge("k4", "de"));
+  ASSERT_OK(db_->Get(ReadOptions(), "k4", &value));
+  ASSERT_EQ(value, "cd,de");
+}
+
 TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
   Options options;
   options.create_if_missing = true;
diff --git a/db/memtable.cc b/db/memtable.cc
index 33c941b4f..4d5247e96 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -639,6 +639,16 @@ static bool SaveValue(void* arg, const char* entry) {
         *(s->merge_in_progress) = true;
         merge_context->PushOperand(
             v, s->inplace_update_support == false /* operand_pinned */);
+        // Let the operator cut the lookup short: merge the operands collected
+        // so far and report the result as the final value.
+        if (merge_operator->ShouldMerge(merge_context->GetOperands())) {
+          *(s->status) = MergeHelper::TimedFullMerge(
+              merge_operator, s->key->user_key(), nullptr,
+              merge_context->GetOperands(), s->value, s->logger, s->statistics,
+              s->env_, nullptr /* result_operand */, true);
+          *(s->found_final_value) = true;
+          return false;
+        }
         return true;
       }
       default:
diff --git a/include/rocksdb/merge_operator.h b/include/rocksdb/merge_operator.h
index f29471005..d263ae88b 100644
--- a/include/rocksdb/merge_operator.h
+++ b/include/rocksdb/merge_operator.h
@@ -190,6 +190,17 @@ class MergeOperator {
   // PartialMerge/PartialMergeMulti should be implemented accordingly to handle
   // a single operand.
   virtual bool AllowSingleOperand() const { return false; }
+
+  // Controls when a full merge is invoked during Get().
+  // Returning true makes Get() run the full merge on the operands collected
+  // so far instead of continuing to gather more. This can be used to limit the
+  // number of merge operands that are looked at during a point lookup,
+  // thereby helping to limit the number of levels to read from.
+  // The default implementation always returns false.
+  // Doesn't help with iterators.
+  virtual bool ShouldMerge(const std::vector<Slice>& /*operands*/) const {
+    return false;
+  }
 };
 
 // The simpler, associative merge operator.
diff --git a/table/get_context.cc b/table/get_context.cc
index c68aa3984..b17b7dfbd 100644
--- a/table/get_context.cc
+++ b/table/get_context.cc
@@ -172,6 +172,24 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
         } else {
           merge_context_->PushOperand(value, false);
         }
+        // The operator may ask for an early full merge over the operands
+        // gathered so far; the lookup then ends with kFound, or with kCorrupt
+        // if the merge fails.
+        if (merge_operator_ != nullptr &&
+            merge_operator_->ShouldMerge(merge_context_->GetOperands())) {
+          state_ = kFound;
+          if (LIKELY(pinnable_val_ != nullptr)) {
+            Status merge_status = MergeHelper::TimedFullMerge(
+                merge_operator_, user_key_, nullptr,
+                merge_context_->GetOperands(), pinnable_val_->GetSelf(),
+                logger_, statistics_, env_);
+            pinnable_val_->PinSelf();
+            if (!merge_status.ok()) {
+              state_ = kCorrupt;
+            }
+          }
+          return false;
+        }
         return true;
 
       default: