From 1d20fa9d0f5c2f999a0f5805f1bf6fe27ad856a8 Mon Sep 17 00:00:00 2001 From: Andres Notzli Date: Fri, 17 Jul 2015 09:27:24 -0700 Subject: [PATCH] Fixed and simplified merge_helper Summary: MergeUntil was not reporting a success when merging an operand with a Value/Deletion despite the comments in MergeHelper and CompactionJob indicating otherwise. This lead to operands being written to the compaction output unnecessarily: M1 M2 M3 P M4 M5 --> (P+M1+M2+M3) M2 M3 M4 M5 (before the diff) M1 M2 M3 P M4 M5 --> (P+M1+M2+M3) M4 M5 (after the diff) In addition, the code handling Values/Deletion was basically identical. This patch unifies the code. Finally, this patch also adds testing for merge_helper. Test Plan: make && make check Reviewers: sdong, rven, yhchiang, tnovak, igor Reviewed By: igor Subscribers: tnovak, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D42351 --- Makefile | 4 + db/merge_helper.cc | 48 ++------ db/merge_helper_test.cc | 155 +++++++++++++++++++++++++ table/merger_test.cc | 36 +----- util/testutil.h | 43 +++++++ utilities/merge_operators/uint64add.cc | 1 + 6 files changed, 215 insertions(+), 72 deletions(-) create mode 100644 db/merge_helper_test.cc diff --git a/Makefile b/Makefile index d533bd2f2..5c08da141 100644 --- a/Makefile +++ b/Makefile @@ -252,6 +252,7 @@ TESTS = \ memenv_test \ mock_env_test \ memtable_list_test \ + merge_helper_test \ merge_test \ merger_test \ redis_test \ @@ -805,6 +806,9 @@ write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) write_controller_test: db/write_controller_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +merge_helper_test: db/merge_helper_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/merge_helper.cc b/db/merge_helper.cc index bcccf9d3a..cf58c6812 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -110,47 +110,20 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // At this point we are guaranteed that we need to process this key. - if (kTypeDeletion == ikey.type) { - // hit a delete - // => merge nullptr with operands_ - // => store result in operands_.back() (and update keys_.back()) - // => change the entry type to kTypeValue for keys_.back() - // We are done! Return a success if the merge passes. - - std::string merge_result; - Status s = TimedFullMerge(ikey.user_key, nullptr, operands_, - user_merge_operator_, stats, env_, logger_, - &merge_result); - - // We store the result in keys_.back() and operands_.back() - // if nothing went wrong (i.e.: no operand corruption on disk) - if (s.ok()) { - std::string& original_key = - keys_.back(); // The original key encountered - orig_ikey.type = kTypeValue; - UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); - operands_.back() = std::move(merge_result); - } - - // move iter to the next entry (before doing anything else) - iter->Next(); - if (steps) { - ++(*steps); - } - return; - } - - if (kTypeValue == ikey.type) { - // hit a put - // => merge the put value with operands_ + assert(ikey.type <= kValueTypeForSeek); + if (ikey.type != kTypeMerge) { + // hit a put/delete + // => merge the put value or a nullptr with operands_ // => store result in operands_.back() (and update keys_.back()) // => change the entry type to kTypeValue for keys_.back() // We are done! Success! const Slice val = iter->value(); + const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr; std::string merge_result; Status s = - TimedFullMerge(ikey.user_key, &val, operands_, user_merge_operator_, - stats, env_, logger_, &merge_result); + TimedFullMerge(ikey.user_key, val_ptr, operands_, + user_merge_operator_, stats, env_, logger_, + &merge_result); // We store the result in keys_.back() and operands_.back() // if nothing went wrong (i.e.: no operand corruption on disk) @@ -160,6 +133,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); operands_.back() = std::move(merge_result); + success_ = true; } // move iter to the next entry @@ -168,9 +142,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, ++(*steps); } return; - } - - if (kTypeMerge == ikey.type) { + } else { // hit a merge // => merge the operand into the front of the operands_ list // => use the user's associative merge function to determine how. diff --git a/db/merge_helper_test.cc b/db/merge_helper_test.cc new file mode 100644 index 000000000..a1ac91014 --- /dev/null +++ b/db/merge_helper_test.cc @@ -0,0 +1,155 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include +#include + +#include "db/merge_helper.h" +#include "rocksdb/comparator.h" +#include "util/coding.h" +#include "util/testharness.h" +#include "util/testutil.h" +#include "utilities/merge_operators.h" + +namespace rocksdb { + +class MergeHelperTest : public testing::Test { + public: + MergeHelperTest() : steps_(0) {} + ~MergeHelperTest() = default; + + void RunUInt64MergeHelper(SequenceNumber stop_before, bool at_bottom) { + InitIterator(); + merge_op_ = MergeOperators::CreateUInt64AddOperator(); + merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(), + nullptr, 2U, true)); + merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr, + &steps_, Env::Default()); + } + + void RunStringAppendMergeHelper(SequenceNumber stop_before, bool at_bottom) { + InitIterator(); + merge_op_ = MergeOperators::CreateStringAppendTESTOperator(); + merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(), + nullptr, 2U, true)); + merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr, + &steps_, Env::Default()); + } + + std::string Key(const std::string& user_key, const SequenceNumber& seq, + const ValueType& t) { + std::string ikey; + AppendInternalKey(&ikey, ParsedInternalKey(Slice(user_key), seq, t)); + return ikey; + } + + void AddKeyVal(const std::string& user_key, const SequenceNumber& seq, + const ValueType& t, const std::string& val) { + ks_.push_back(Key(user_key, seq, t)); + vs_.push_back(val); + } + + void InitIterator() { + iter_.reset(new test::VectorIterator(ks_, vs_)); + iter_->SeekToFirst(); + } + + std::string EncodeInt(uint64_t x) { + std::string result; + PutFixed64(&result, x); + return result; + } + + void CheckState(bool success, int steps, int iter_pos) { + ASSERT_EQ(success, merge_helper_->IsSuccess()); + ASSERT_EQ(steps, steps_); + if (iter_pos == -1) { + ASSERT_FALSE(iter_->Valid()); + } else { + ASSERT_EQ(ks_[iter_pos], iter_->key()); + } + } + + std::unique_ptr iter_; + std::shared_ptr merge_op_; + std::unique_ptr merge_helper_; + std::vector ks_; + std::vector vs_; + int steps_; +}; + +// If MergeHelper encounters a new key on the last level, we know that +// the key has no more history and it can merge keys. +TEST_F(MergeHelperTest, MergeAtBottomSuccess) { + AddKeyVal("a", 20, kTypeMerge, EncodeInt(1U)); + AddKeyVal("a", 10, kTypeMerge, EncodeInt(3U)); + AddKeyVal("b", 10, kTypeMerge, EncodeInt(4U)); // <- Iterator after merge + + RunUInt64MergeHelper(0, true); + CheckState(true, 2, 2); + ASSERT_EQ(Key("a", 20, kTypeValue), merge_helper_->key()); + ASSERT_EQ(EncodeInt(4U), merge_helper_->value()); +} + +// Merging with a value results in a successful merge. +TEST_F(MergeHelperTest, MergeValue) { + AddKeyVal("a", 40, kTypeMerge, EncodeInt(1U)); + AddKeyVal("a", 30, kTypeMerge, EncodeInt(3U)); + AddKeyVal("a", 20, kTypeValue, EncodeInt(4U)); // <- Iterator after merge + AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U)); + + RunUInt64MergeHelper(0, false); + CheckState(true, 3, 3); + ASSERT_EQ(Key("a", 40, kTypeValue), merge_helper_->key()); + ASSERT_EQ(EncodeInt(8U), merge_helper_->value()); +} + +// Merging stops before a snapshot. +TEST_F(MergeHelperTest, SnapshotBeforeValue) { + AddKeyVal("a", 50, kTypeMerge, EncodeInt(1U)); + AddKeyVal("a", 40, kTypeMerge, EncodeInt(3U)); // <- Iterator after merge + AddKeyVal("a", 30, kTypeMerge, EncodeInt(1U)); + AddKeyVal("a", 20, kTypeValue, EncodeInt(4U)); + AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U)); + + RunUInt64MergeHelper(31, true); + CheckState(false, 2, 2); + ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]); + ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); +} + +// MergeHelper preserves the operand stack for merge operators that +// cannot do a partial merge. +TEST_F(MergeHelperTest, NoPartialMerge) { + AddKeyVal("a", 50, kTypeMerge, "v2"); + AddKeyVal("a", 40, kTypeMerge, "v"); // <- Iterator after merge + AddKeyVal("a", 30, kTypeMerge, "v"); + + RunStringAppendMergeHelper(31, true); + CheckState(false, 2, 2); + ASSERT_EQ(Key("a", 40, kTypeMerge), merge_helper_->keys()[0]); + ASSERT_EQ("v", merge_helper_->values()[0]); + ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[1]); + ASSERT_EQ("v2", merge_helper_->values()[1]); +} + +// Merging with a deletion turns the deletion into a value +TEST_F(MergeHelperTest, MergeDeletion) { + AddKeyVal("a", 30, kTypeMerge, EncodeInt(3U)); + AddKeyVal("a", 20, kTypeDeletion, ""); + + RunUInt64MergeHelper(15, false); + CheckState(true, 2, -1); + ASSERT_EQ(Key("a", 30, kTypeValue), merge_helper_->key()); + ASSERT_EQ(EncodeInt(3U), merge_helper_->value()); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/table/merger_test.cc b/table/merger_test.cc index 1085ce452..562c0ae85 100644 --- a/table/merger_test.cc +++ b/table/merger_test.cc @@ -5,45 +5,13 @@ #include #include -#include -#include "rocksdb/iterator.h" #include "table/merger.h" #include "util/testharness.h" #include "util/testutil.h" namespace rocksdb { -class VectorIterator : public Iterator { - public: - explicit VectorIterator(const std::vector& keys) - : keys_(keys), current_(keys.size()) { - std::sort(keys_.begin(), keys_.end()); - } - - virtual bool Valid() const override { return current_ < keys_.size(); } - - virtual void SeekToFirst() override { current_ = 0; } - virtual void SeekToLast() override { current_ = keys_.size() - 1; } - - virtual void Seek(const Slice& target) override { - current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) - - keys_.begin(); - } - - virtual void Next() override { current_++; } - virtual void Prev() override { current_--; } - - virtual Slice key() const override { return Slice(keys_[current_]); } - virtual Slice value() const override { return Slice(); } - - virtual Status status() const override { return Status::OK(); } - - private: - std::vector keys_; - size_t current_; -}; - class MergerTest : public testing::Test { public: MergerTest() @@ -123,14 +91,14 @@ class MergerTest : public testing::Test { std::vector small_iterators; for (size_t i = 0; i < num_iterators; ++i) { auto strings = GenerateStrings(strings_per_iterator, letters_per_string); - small_iterators.push_back(new VectorIterator(strings)); + small_iterators.push_back(new test::VectorIterator(strings)); all_keys_.insert(all_keys_.end(), strings.begin(), strings.end()); } merging_iterator_.reset( NewMergingIterator(BytewiseComparator(), &small_iterators[0], static_cast(small_iterators.size()))); - single_iterator_.reset(new VectorIterator(all_keys_)); + single_iterator_.reset(new test::VectorIterator(all_keys_)); } Random rnd_; diff --git a/util/testutil.h b/util/testutil.h index 958483806..240a468ae 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -8,9 +8,13 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include #include +#include + #include "db/dbformat.h" #include "rocksdb/env.h" +#include "rocksdb/iterator.h" #include "rocksdb/slice.h" #include "util/random.h" @@ -117,5 +121,44 @@ class SimpleSuffixReverseComparator : public Comparator { // endian machines. extern const Comparator* Uint64Comparator(); +// Iterator over a vector of keys/values +class VectorIterator : public Iterator { + public: + explicit VectorIterator(const std::vector& keys) + : keys_(keys), current_(keys.size()) { + std::sort(keys_.begin(), keys_.end()); + values_.resize(keys.size()); + } + + VectorIterator(const std::vector& keys, + const std::vector& values) + : keys_(keys), values_(values), current_(keys.size()) { + assert(keys_.size() == values_.size()); + } + + virtual bool Valid() const override { return current_ < keys_.size(); } + + virtual void SeekToFirst() override { current_ = 0; } + virtual void SeekToLast() override { current_ = keys_.size() - 1; } + + virtual void Seek(const Slice& target) override { + current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) - + keys_.begin(); + } + + virtual void Next() override { current_++; } + virtual void Prev() override { current_--; } + + virtual Slice key() const override { return Slice(keys_[current_]); } + virtual Slice value() const override { return Slice(values_[current_]); } + + virtual Status status() const override { return Status::OK(); } + + private: + std::vector keys_; + std::vector values_; + size_t current_; +}; + } // namespace test } // namespace rocksdb diff --git a/utilities/merge_operators/uint64add.cc b/utilities/merge_operators/uint64add.cc index 453b8c8ea..6024beb95 100644 --- a/utilities/merge_operators/uint64add.cc +++ b/utilities/merge_operators/uint64add.cc @@ -4,6 +4,7 @@ // of patent rights can be found in the PATENTS file in the same directory. #include + #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/slice.h"