From f32a5720994dca9b5d95cec20ec89fce5abb7694 Mon Sep 17 00:00:00 2001 From: Andres Notzli Date: Mon, 17 Aug 2015 17:34:38 -0700 Subject: [PATCH] Simplify querying of merge results Summary: While working on supporting mixing merge operators with single deletes ( https://reviews.facebook.net/D43179 ), I realized that returning and dealing with merge results can be made simpler. Submitting this as a separate diff because it is not directly related to single deletes. Before, callers of merge helper had to retrieve the merge result in one of two ways depending on whether the merge was successful or not (success = result of merge was single kTypeValue). For successful merges, the caller could query the resulting key/value pair and for unsuccessful merges, the result could be retrieved in the form of two deques of keys and values. However, with single deletes, a successful merge does not return a single key/value pair (if merge operands are merged with a single delete, we have to generate a value and keep the original single delete around to make sure that we are not accidentially producing a key overwrite). In addition, the two existing call sites of the merge helper were taking the same actions independently from whether the merge was successful or not, so this patch simplifies that. Test Plan: make clean all check Reviewers: rven, sdong, yhchiang, anthony, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D43353 --- db/builder.cc | 53 ++++----- db/compaction_job.cc | 43 +++---- db/compaction_job_test.cc | 11 +- db/memtable.cc | 2 +- db/merge_helper.cc | 65 ++++++----- db/merge_helper.h | 44 +++---- db/merge_helper_test.cc | 108 ++++++++++++------ include/rocksdb/status.h | 4 +- util/testutil.cc | 6 + util/testutil.h | 12 +- .../write_batch_with_index.cc | 2 +- 11 files changed, 188 insertions(+), 162 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 9f499e650..953213610 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -9,7 +9,9 @@ #include "db/builder.h" +#include #include + #include "db/dbformat.h" #include "db/filename.h" #include "db/internal_stats.h" @@ -24,8 +26,8 @@ #include "table/block_based_table_builder.h" #include "util/file_reader_writer.h" #include "util/iostats_context_imp.h" -#include "util/thread_status_util.h" #include "util/stop_watch.h" +#include "util/thread_status_util.h" namespace rocksdb { @@ -150,36 +152,27 @@ Status BuildTable( // TODO: pass statistics to MergeUntil merge.MergeUntil(iter, 0 /* don't worry about snapshot */); iterator_at_next = true; - if (merge.IsSuccess()) { - // Merge completed correctly. - // Add the resulting merge key/value and continue to next - builder->Add(merge.key(), merge.value()); - prev_key.assign(merge.key().data(), merge.key().size()); - ok = ParseInternalKey(Slice(prev_key), &prev_ikey); - assert(ok); - } else { - // Merge did not find a Put/Delete. - // Can not compact these merges into a kValueType. - // Write them out one-by-one. (Proceed back() to front()) - const std::deque& keys = merge.keys(); - const std::deque& values = merge.values(); - assert(keys.size() == values.size() && keys.size() >= 1); - std::deque::const_reverse_iterator key_iter; - std::deque::const_reverse_iterator value_iter; - for (key_iter=keys.rbegin(), value_iter = values.rbegin(); - key_iter != keys.rend() && value_iter != values.rend(); - ++key_iter, ++value_iter) { - - builder->Add(Slice(*key_iter), Slice(*value_iter)); - } - - // Sanity check. Both iterators should end at the same time - assert(key_iter == keys.rend() && value_iter == values.rend()); - - prev_key.assign(keys.front()); - ok = ParseInternalKey(Slice(prev_key), &prev_ikey); - assert(ok); + + // Write them out one-by-one. (Proceed back() to front()) + // If the merge successfully merged the input into + // a kTypeValue, the list contains a single element. + const std::deque& keys = merge.keys(); + const std::deque& values = merge.values(); + assert(keys.size() == values.size() && keys.size() >= 1); + std::deque::const_reverse_iterator key_iter; + std::deque::const_reverse_iterator value_iter; + for (key_iter = keys.rbegin(), value_iter = values.rbegin(); + key_iter != keys.rend() && value_iter != values.rend(); + ++key_iter, ++value_iter) { + builder->Add(Slice(*key_iter), Slice(*value_iter)); } + + // Sanity check. Both iterators should end at the same time + assert(key_iter == keys.rend() && value_iter == values.rend()); + + prev_key.assign(keys.front()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); } else { // Handle Put/Delete-type keys by simply writing them builder->Add(key, value); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 376e82b51..dd68b2147 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -587,31 +587,24 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, merge.MergeUntil(input, prev_snapshot, bottommost_level_, db_options_.statistics.get(), env_); - if (merge.IsSuccess()) { - // Successfully found Put/Delete/(end-of-key-range) while merging - // Get the merge result - key = merge.key(); - ParseInternalKey(key, &ikey); - status = WriteKeyValue(key, merge.value(), ikey, input->status()); - } else { - // Did not find a Put/Delete/(end-of-key-range) while merging - // We now have some stack of merge operands to write out. - // NOTE: key,value, and ikey are now referring to old entries. - // These will be correctly set below. - const auto& keys = merge.keys(); - const auto& values = merge.values(); - assert(!keys.empty()); - assert(keys.size() == values.size()); - - // We have a list of keys to write, write all keys in the list. - for (auto key_iter = keys.rbegin(), value_iter = values.rbegin(); - !status.ok() || key_iter != keys.rend(); - key_iter++, value_iter++) { - key = Slice(*key_iter); - value = Slice(*value_iter); - ParseInternalKey(key, &ikey); - status = WriteKeyValue(key, value, ikey, input->status()); - } + // NOTE: key, value, and ikey refer to old entries. + // These will be correctly set below. + const auto& keys = merge.keys(); + const auto& values = merge.values(); + assert(!keys.empty()); + assert(keys.size() == values.size()); + + // We have a list of keys to write, write all keys in the list. + for (auto key_iter = keys.rbegin(), value_iter = values.rbegin(); + !status.ok() || key_iter != keys.rend(); key_iter++, value_iter++) { + key = Slice(*key_iter); + value = Slice(*value_iter); + bool valid_key __attribute__((__unused__)) = + ParseInternalKey(key, &ikey); + // MergeUntil stops when it encounters a corrupt key and does not + // include them in the result, so we expect the keys here to valid. + assert(valid_key); + status = WriteKeyValue(key, value, ikey, input->status()); } } else { status = WriteKeyValue(key, value, ikey, input->status()); diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 9c2b61e91..a7140b595 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -92,13 +92,6 @@ class CompactionJobTest : public testing::Test { return InternalKey(user_key, seq_num, t).Encode().ToString(); } - // Corrupts key by changing the type - void CorruptKeyType(InternalKey* ikey) { - std::string keystr = ikey->Encode().ToString(); - keystr[keystr.size() - 8] = kTypeLogData; - ikey->DecodeFrom(Slice(keystr.data(), keystr.size())); - } - void AddMockFile(const mock::MockFileContents& contents, int level = 0) { assert(contents.size() > 0); @@ -171,8 +164,8 @@ class CompactionJobTest : public testing::Test { // file InternalKey bottommost_internal_key(key, 0, kTypeValue); if (corrupt_id(k)) { - CorruptKeyType(&internal_key); - CorruptKeyType(&bottommost_internal_key); + test::CorruptKeyType(&internal_key); + test::CorruptKeyType(&bottommost_internal_key); } contents.insert({ internal_key.Encode().ToString(), value }); if (i == 1 || k < kMatchingKeys || corrupt_id(k - kMatchingKeys)) { diff --git a/db/memtable.cc b/db/memtable.cc index c447cbbb1..fded51e67 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -534,7 +534,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, // No change to value, since we have not yet found a Put/Delete if (!found_final_value && merge_in_progress) { - *s = Status::MergeInProgress(""); + *s = Status::MergeInProgress(); } PERF_COUNTER_ADD(get_from_memtable_count, 1); return found_final_value; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index c8a4b140c..3e8c3e5cf 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -4,16 +4,17 @@ // of patent rights can be found in the PATENTS file in the same directory. // +#include "db/merge_helper.h" + #include #include -#include "merge_helper.h" #include "db/dbformat.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/merge_operator.h" -#include "util/statistics.h" #include "util/perf_context_imp.h" +#include "util/statistics.h" #include "util/stop_watch.h" namespace rocksdb { @@ -58,9 +59,9 @@ Status MergeHelper::TimedFullMerge(const Slice& key, const Slice* value, // keys_ stores the list of keys encountered while merging. // operands_ stores the list of merge operands encountered while merging. // keys_[i] corresponds to operands_[i] for each i. -void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, - const bool at_bottom, Statistics* stats, - Env* env_) { +Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, + const bool at_bottom, Statistics* stats, + Env* env_) { // Get a copy of the internal key, before it's invalidated by iter->Next() // Also maintain the list of merge operands seen. assert(HasOperator()); @@ -70,8 +71,6 @@ void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, operands_.push_front(iter->value().ToString()); assert(user_merge_operator_); - success_ = false; // Will become true if we hit Put/Delete or bottom - // We need to parse the internal key again as the parsed key is // backed by the internal key! // Assume no internal key corruption as it has been successfully parsed @@ -80,6 +79,7 @@ void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, ParsedInternalKey orig_ikey; ParseInternalKey(keys_.back(), &orig_ikey); + Status s; bool hit_the_next_user_key = false; for (iter->Next(); iter->Valid(); iter->Next()) { ParsedInternalKey ikey; @@ -114,28 +114,33 @@ void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, // => store result in operands_.back() (and update keys_.back()) // => change the entry type to kTypeValue for keys_.back() // We are done! Success! + // + // TODO(noetzli) If the merge operator returns false, we are currently + // (almost) silently dropping the put/delete. That's probably not what we + // want. const Slice val = iter->value(); const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr; std::string merge_result; - Status s = - TimedFullMerge(ikey.user_key, val_ptr, operands_, + s = TimedFullMerge(ikey.user_key, val_ptr, operands_, user_merge_operator_, stats, env_, logger_, &merge_result); // We store the result in keys_.back() and operands_.back() // if nothing went wrong (i.e.: no operand corruption on disk) if (s.ok()) { - std::string& original_key = - keys_.back(); // The original key encountered + // The original key encountered + std::string original_key = std::move(keys_.back()); orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); - operands_.back() = std::move(merge_result); - success_ = true; + keys_.clear(); + operands_.clear(); + keys_.emplace_front(std::move(original_key)); + operands_.emplace_front(std::move(merge_result)); } // move iter to the next entry iter->Next(); - return; + return s; } else { // hit a merge // => merge the operand into the front of the operands_ list @@ -173,28 +178,28 @@ void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, assert(operands_.size() >= 1); assert(operands_.size() == keys_.size()); std::string merge_result; - { - StopWatchNano timer(env_, stats != nullptr); - PERF_TIMER_GUARD(merge_operator_time_nanos); - success_ = user_merge_operator_->FullMerge( - orig_ikey.user_key, nullptr, operands_, &merge_result, logger_); - RecordTick(stats, MERGE_OPERATION_TOTAL_TIME, - env_ != nullptr ? timer.ElapsedNanos() : 0); - } - if (success_) { - std::string& original_key = keys_.back(); // The original key encountered + s = TimedFullMerge(orig_ikey.user_key, nullptr, operands_, + user_merge_operator_, stats, env_, logger_, + &merge_result); + if (s.ok()) { + // The original key encountered + std::string original_key = std::move(keys_.back()); orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); - operands_.back() = std::move(merge_result); - } else { - RecordTick(stats, NUMBER_MERGE_FAILURES); - // Do nothing if not success_. Leave keys() and operands() as they are. + keys_.clear(); + operands_.clear(); + keys_.emplace_front(std::move(original_key)); + operands_.emplace_front(std::move(merge_result)); } } else { // We haven't seen the beginning of the key nor a Put/Delete. // Attempt to use the user's associative merge function to // merge the stacked merge operands into a single operand. - + // + // TODO(noetzli) The docblock of MergeUntil suggests that a successful + // partial merge returns Status::OK(). Should we change the status code + // after a successful partial merge? + s = Status::MergeInProgress(); if (operands_.size() >= 2 && operands_.size() >= min_partial_merge_operands_) { bool merge_success = false; @@ -218,6 +223,8 @@ void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, } } } + + return s; } } // namespace rocksdb diff --git a/db/merge_helper.h b/db/merge_helper.h index 8ad6acc07..d88f3e356 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -33,12 +33,17 @@ class MergeHelper { min_partial_merge_operands_(min_partial_merge_operands), assert_valid_internal_key_(assert_valid_internal_key), keys_(), - operands_(), - success_(false) {} + operands_() { + assert(user_comparator_ != nullptr); + } // Wrapper around MergeOperator::FullMerge() that records perf statistics. // Result of merge will be written to result if status returned is OK. // If operands is empty, the value will simply be copied to result. + // Returns one of the following statuses: + // - OK: Entries were successfully merged. + // - Corruption: Merge operator reported unsuccessful merge. + // - NotSupported: Merge operator is missing. static Status TimedFullMerge(const Slice& key, const Slice* value, const std::deque& operands, const MergeOperator* merge_operator, @@ -57,18 +62,23 @@ class MergeHelper { // 0 means no restriction // at_bottom: (IN) true if the iterator covers the bottem level, which means // we could reach the start of the history of this user key. - void MergeUntil(Iterator* iter, const SequenceNumber stop_before = 0, - const bool at_bottom = false, Statistics* stats = nullptr, - Env* env_ = nullptr); + // Returns one of the following statuses: + // - OK: Entries were successfully merged. + // - MergeInProgress: Put/Delete not encountered and unable to merge operands. + // - Corruption: Merge operator reported unsuccessful merge. + // + // REQUIRED: The first key in the input is not corrupted. + Status MergeUntil(Iterator* iter, const SequenceNumber stop_before = 0, + const bool at_bottom = false, Statistics* stats = nullptr, + Env* env_ = nullptr); // Query the merge result // These are valid until the next MergeUntil call // If the merging was successful: - // - IsSuccess() will be true - // - key() will have the latest sequence number of the merges. - // The type will be Put or Merge. See IMPORTANT 1 note, below. - // - value() will be the result of merging all the operands together - // - The user should ignore keys() and values(). + // - keys() contains a single element with the latest sequence number of + // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below. + // - values() contains a single element with the result of merging all the + // operands together // // IMPORTANT 1: the key type could change after the MergeUntil call. // Put/Delete + Merge + ... + Merge => Put @@ -76,7 +86,6 @@ class MergeHelper { // // If the merge operator is not associative, and if a Put/Delete is not found // then the merging will be unsuccessful. In this case: - // - IsSuccess() will be false // - keys() contains the list of internal keys seen in order of iteration. // - values() contains the list of values (merges) seen in the same order. // values() is parallel to keys() so that the first entry in @@ -84,20 +93,12 @@ class MergeHelper { // and so on. These lists will be the same length. // All of these pairs will be merges over the same user key. // See IMPORTANT 2 note below. - // - The user should ignore key() and value(). // // IMPORTANT 2: The entries were traversed in order from BACK to FRONT. // So keys().back() was the first key seen by iterator. // TODO: Re-style this comment to be like the first one - bool IsSuccess() const { return success_; } - Slice key() const { assert(success_); return Slice(keys_.back()); } - Slice value() const { assert(success_); return Slice(operands_.back()); } - const std::deque& keys() const { - assert(!success_); return keys_; - } - const std::deque& values() const { - assert(!success_); return operands_; - } + const std::deque& keys() const { return keys_; } + const std::deque& values() const { return operands_; } bool HasOperator() const { return user_merge_operator_ != nullptr; } private: @@ -111,7 +112,6 @@ class MergeHelper { // valid up to the next MergeUntil call std::deque keys_; // Keeps track of the sequence of keys seen std::deque operands_; // Parallel with keys_; stores the values - bool success_; }; } // namespace rocksdb diff --git a/db/merge_helper_test.cc b/db/merge_helper_test.cc index 1dd17fb5f..53b13a2cb 100644 --- a/db/merge_helper_test.cc +++ b/db/merge_helper_test.cc @@ -21,34 +21,38 @@ class MergeHelperTest : public testing::Test { MergeHelperTest() = default; ~MergeHelperTest() = default; - void RunUInt64MergeHelper(SequenceNumber stop_before, bool at_bottom) { + Status RunUInt64MergeHelper(SequenceNumber stop_before, bool at_bottom) { InitIterator(); merge_op_ = MergeOperators::CreateUInt64AddOperator(); merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(), - nullptr, 2U, true)); - merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr, - Env::Default()); + nullptr, 2U, false)); + return merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, + nullptr, Env::Default()); } - void RunStringAppendMergeHelper(SequenceNumber stop_before, bool at_bottom) { + Status RunStringAppendMergeHelper(SequenceNumber stop_before, + bool at_bottom) { InitIterator(); merge_op_ = MergeOperators::CreateStringAppendTESTOperator(); merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(), - nullptr, 2U, true)); - merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr, - Env::Default()); + nullptr, 2U, false)); + return merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, + nullptr, Env::Default()); } std::string Key(const std::string& user_key, const SequenceNumber& seq, const ValueType& t) { - std::string ikey; - AppendInternalKey(&ikey, ParsedInternalKey(Slice(user_key), seq, t)); - return ikey; + return InternalKey(user_key, seq, t).Encode().ToString(); } void AddKeyVal(const std::string& user_key, const SequenceNumber& seq, - const ValueType& t, const std::string& val) { - ks_.push_back(Key(user_key, seq, t)); + const ValueType& t, const std::string& val, + bool corrupt = false) { + InternalKey ikey = InternalKey(user_key, seq, t); + if (corrupt) { + test::CorruptKeyType(&ikey); + } + ks_.push_back(ikey.Encode().ToString()); vs_.push_back(val); } @@ -63,15 +67,6 @@ class MergeHelperTest : public testing::Test { return result; } - void CheckState(bool success, int iter_pos) { - ASSERT_EQ(success, merge_helper_->IsSuccess()); - if (iter_pos == -1) { - ASSERT_FALSE(iter_->Valid()); - } else { - ASSERT_EQ(ks_[iter_pos], iter_->key()); - } - } - std::unique_ptr iter_; std::shared_ptr merge_op_; std::unique_ptr merge_helper_; @@ -86,10 +81,12 @@ TEST_F(MergeHelperTest, MergeAtBottomSuccess) { AddKeyVal("a", 10, kTypeMerge, EncodeInt(3U)); AddKeyVal("b", 10, kTypeMerge, EncodeInt(4U)); // <- Iterator after merge - RunUInt64MergeHelper(0, true); - CheckState(true, 2); - ASSERT_EQ(Key("a", 20, kTypeValue), merge_helper_->key()); - ASSERT_EQ(EncodeInt(4U), merge_helper_->value()); + ASSERT_TRUE(RunUInt64MergeHelper(0, true).ok()); + ASSERT_EQ(ks_[2], iter_->key()); + ASSERT_EQ(Key("a", 20, kTypeValue), merge_helper_->keys()[0]); + ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); + ASSERT_EQ(1U, merge_helper_->keys().size()); + ASSERT_EQ(1U, merge_helper_->values().size()); } // Merging with a value results in a successful merge. @@ -99,10 +96,12 @@ TEST_F(MergeHelperTest, MergeValue) { AddKeyVal("a", 20, kTypeValue, EncodeInt(4U)); // <- Iterator after merge AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U)); - RunUInt64MergeHelper(0, false); - CheckState(true, 3); - ASSERT_EQ(Key("a", 40, kTypeValue), merge_helper_->key()); - ASSERT_EQ(EncodeInt(8U), merge_helper_->value()); + ASSERT_TRUE(RunUInt64MergeHelper(0, false).ok()); + ASSERT_EQ(ks_[3], iter_->key()); + ASSERT_EQ(Key("a", 40, kTypeValue), merge_helper_->keys()[0]); + ASSERT_EQ(EncodeInt(8U), merge_helper_->values()[0]); + ASSERT_EQ(1U, merge_helper_->keys().size()); + ASSERT_EQ(1U, merge_helper_->values().size()); } // Merging stops before a snapshot. @@ -113,10 +112,12 @@ TEST_F(MergeHelperTest, SnapshotBeforeValue) { AddKeyVal("a", 20, kTypeValue, EncodeInt(4U)); AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U)); - RunUInt64MergeHelper(31, true); - CheckState(false, 2); + ASSERT_TRUE(RunUInt64MergeHelper(31, true).IsMergeInProgress()); + ASSERT_EQ(ks_[2], iter_->key()); ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]); ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); + ASSERT_EQ(1U, merge_helper_->keys().size()); + ASSERT_EQ(1U, merge_helper_->values().size()); } // MergeHelper preserves the operand stack for merge operators that @@ -126,12 +127,26 @@ TEST_F(MergeHelperTest, NoPartialMerge) { AddKeyVal("a", 40, kTypeMerge, "v"); // <- Iterator after merge AddKeyVal("a", 30, kTypeMerge, "v"); - RunStringAppendMergeHelper(31, true); - CheckState(false, 2); + ASSERT_TRUE(RunStringAppendMergeHelper(31, true).IsMergeInProgress()); + ASSERT_EQ(ks_[2], iter_->key()); ASSERT_EQ(Key("a", 40, kTypeMerge), merge_helper_->keys()[0]); ASSERT_EQ("v", merge_helper_->values()[0]); ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[1]); ASSERT_EQ("v2", merge_helper_->values()[1]); + ASSERT_EQ(2U, merge_helper_->keys().size()); + ASSERT_EQ(2U, merge_helper_->values().size()); +} + +// A single operand can not be merged. +TEST_F(MergeHelperTest, SingleOperand) { + AddKeyVal("a", 50, kTypeMerge, EncodeInt(1U)); + + ASSERT_TRUE(RunUInt64MergeHelper(31, true).IsMergeInProgress()); + ASSERT_FALSE(iter_->Valid()); + ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]); + ASSERT_EQ(EncodeInt(1U), merge_helper_->values()[0]); + ASSERT_EQ(1U, merge_helper_->keys().size()); + ASSERT_EQ(1U, merge_helper_->values().size()); } // Merging with a deletion turns the deletion into a value @@ -139,10 +154,27 @@ TEST_F(MergeHelperTest, MergeDeletion) { AddKeyVal("a", 30, kTypeMerge, EncodeInt(3U)); AddKeyVal("a", 20, kTypeDeletion, ""); - RunUInt64MergeHelper(15, false); - CheckState(true, -1); - ASSERT_EQ(Key("a", 30, kTypeValue), merge_helper_->key()); - ASSERT_EQ(EncodeInt(3U), merge_helper_->value()); + ASSERT_TRUE(RunUInt64MergeHelper(15, false).ok()); + ASSERT_FALSE(iter_->Valid()); + ASSERT_EQ(Key("a", 30, kTypeValue), merge_helper_->keys()[0]); + ASSERT_EQ(EncodeInt(3U), merge_helper_->values()[0]); + ASSERT_EQ(1U, merge_helper_->keys().size()); + ASSERT_EQ(1U, merge_helper_->values().size()); +} + +// The merge helper stops upon encountering a corrupt key +TEST_F(MergeHelperTest, CorruptKey) { + AddKeyVal("a", 30, kTypeMerge, EncodeInt(3U)); + AddKeyVal("a", 25, kTypeMerge, EncodeInt(1U)); + // Corrupt key + AddKeyVal("a", 20, kTypeDeletion, "", true); // <- Iterator after merge + + ASSERT_TRUE(RunUInt64MergeHelper(15, false).IsMergeInProgress()); + ASSERT_EQ(ks_[2], iter_->key()); + ASSERT_EQ(Key("a", 30, kTypeMerge), merge_helper_->keys()[0]); + ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); + ASSERT_EQ(1U, merge_helper_->keys().size()); + ASSERT_EQ(1U, merge_helper_->values().size()); } } // namespace rocksdb diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 76e19069c..043ca139f 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -57,9 +57,7 @@ class Status { static Status IOError(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kIOError, msg, msg2); } - static Status MergeInProgress(const Slice& msg, const Slice& msg2 = Slice()) { - return Status(kMergeInProgress, msg, msg2); - } + static Status MergeInProgress() { return Status(kMergeInProgress); } static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kIncomplete, msg, msg2); } diff --git a/util/testutil.cc b/util/testutil.cc index ebe7a308b..72ecdeec4 100644 --- a/util/testutil.cc +++ b/util/testutil.cc @@ -123,5 +123,11 @@ SequentialFileReader* GetSequentialFileReader(SequentialFile* se) { return new SequentialFileReader(std::move(file)); } +void CorruptKeyType(InternalKey* ikey) { + std::string keystr = ikey->Encode().ToString(); + keystr[keystr.size() - 8] = kTypeLogData; + ikey->DecodeFrom(Slice(keystr.data(), keystr.size())); +} + } // namespace test } // namespace rocksdb diff --git a/util/testutil.h b/util/testutil.h index c02c4151b..67a2aafad 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -218,10 +218,11 @@ class StringSink: public WritableFile { class StringSource: public RandomAccessFile { public: - StringSource(const Slice& contents, uint64_t uniq_id = 0, bool mmap = false) - : contents_(contents.data(), contents.size()), uniq_id_(uniq_id), - mmap_(mmap) { - } + explicit StringSource(const Slice& contents, uint64_t uniq_id = 0, + bool mmap = false) + : contents_(contents.data(), contents.size()), + uniq_id_(uniq_id), + mmap_(mmap) {} virtual ~StringSource() { } @@ -268,5 +269,8 @@ class NullLogger : public Logger { virtual size_t GetLogFileSize() const override { return 0; } }; +// Corrupts key by changing the type +extern void CorruptKeyType(InternalKey* ikey); + } // namespace test } // namespace rocksdb diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 9308ba39b..94b227dc9 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -633,7 +633,7 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, s = Status::NotFound(); break; case WriteBatchWithIndexInternal::Result::kMergeInProgress: - s = Status::MergeInProgress(""); + s = Status::MergeInProgress(); break; default: assert(false);