diff --git a/db/builder.cc b/db/builder.cc index 9f499e650..953213610 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -9,7 +9,9 @@ #include "db/builder.h" +#include #include + #include "db/dbformat.h" #include "db/filename.h" #include "db/internal_stats.h" @@ -24,8 +26,8 @@ #include "table/block_based_table_builder.h" #include "util/file_reader_writer.h" #include "util/iostats_context_imp.h" -#include "util/thread_status_util.h" #include "util/stop_watch.h" +#include "util/thread_status_util.h" namespace rocksdb { @@ -150,36 +152,27 @@ Status BuildTable( // TODO: pass statistics to MergeUntil merge.MergeUntil(iter, 0 /* don't worry about snapshot */); iterator_at_next = true; - if (merge.IsSuccess()) { - // Merge completed correctly. - // Add the resulting merge key/value and continue to next - builder->Add(merge.key(), merge.value()); - prev_key.assign(merge.key().data(), merge.key().size()); - ok = ParseInternalKey(Slice(prev_key), &prev_ikey); - assert(ok); - } else { - // Merge did not find a Put/Delete. - // Can not compact these merges into a kValueType. - // Write them out one-by-one. (Proceed back() to front()) - const std::deque& keys = merge.keys(); - const std::deque& values = merge.values(); - assert(keys.size() == values.size() && keys.size() >= 1); - std::deque::const_reverse_iterator key_iter; - std::deque::const_reverse_iterator value_iter; - for (key_iter=keys.rbegin(), value_iter = values.rbegin(); - key_iter != keys.rend() && value_iter != values.rend(); - ++key_iter, ++value_iter) { - - builder->Add(Slice(*key_iter), Slice(*value_iter)); - } - - // Sanity check. Both iterators should end at the same time - assert(key_iter == keys.rend() && value_iter == values.rend()); - - prev_key.assign(keys.front()); - ok = ParseInternalKey(Slice(prev_key), &prev_ikey); - assert(ok); + + // Write them out one-by-one. (Proceed back() to front()) + // If the merge successfully merged the input into + // a kTypeValue, the list contains a single element. + const std::deque& keys = merge.keys(); + const std::deque& values = merge.values(); + assert(keys.size() == values.size() && keys.size() >= 1); + std::deque::const_reverse_iterator key_iter; + std::deque::const_reverse_iterator value_iter; + for (key_iter = keys.rbegin(), value_iter = values.rbegin(); + key_iter != keys.rend() && value_iter != values.rend(); + ++key_iter, ++value_iter) { + builder->Add(Slice(*key_iter), Slice(*value_iter)); } + + // Sanity check. Both iterators should end at the same time + assert(key_iter == keys.rend() && value_iter == values.rend()); + + prev_key.assign(keys.front()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); } else { // Handle Put/Delete-type keys by simply writing them builder->Add(key, value); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 376e82b51..dd68b2147 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -587,31 +587,24 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, merge.MergeUntil(input, prev_snapshot, bottommost_level_, db_options_.statistics.get(), env_); - if (merge.IsSuccess()) { - // Successfully found Put/Delete/(end-of-key-range) while merging - // Get the merge result - key = merge.key(); - ParseInternalKey(key, &ikey); - status = WriteKeyValue(key, merge.value(), ikey, input->status()); - } else { - // Did not find a Put/Delete/(end-of-key-range) while merging - // We now have some stack of merge operands to write out. - // NOTE: key,value, and ikey are now referring to old entries. - // These will be correctly set below. - const auto& keys = merge.keys(); - const auto& values = merge.values(); - assert(!keys.empty()); - assert(keys.size() == values.size()); - - // We have a list of keys to write, write all keys in the list. - for (auto key_iter = keys.rbegin(), value_iter = values.rbegin(); - !status.ok() || key_iter != keys.rend(); - key_iter++, value_iter++) { - key = Slice(*key_iter); - value = Slice(*value_iter); - ParseInternalKey(key, &ikey); - status = WriteKeyValue(key, value, ikey, input->status()); - } + // NOTE: key, value, and ikey refer to old entries. + // These will be correctly set below. + const auto& keys = merge.keys(); + const auto& values = merge.values(); + assert(!keys.empty()); + assert(keys.size() == values.size()); + + // We have a list of keys to write, write all keys in the list. + for (auto key_iter = keys.rbegin(), value_iter = values.rbegin(); + !status.ok() || key_iter != keys.rend(); key_iter++, value_iter++) { + key = Slice(*key_iter); + value = Slice(*value_iter); + bool valid_key __attribute__((__unused__)) = + ParseInternalKey(key, &ikey); + // MergeUntil stops when it encounters a corrupt key and does not + // include them in the result, so we expect the keys here to valid. + assert(valid_key); + status = WriteKeyValue(key, value, ikey, input->status()); } } else { status = WriteKeyValue(key, value, ikey, input->status()); diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 9c2b61e91..a7140b595 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -92,13 +92,6 @@ class CompactionJobTest : public testing::Test { return InternalKey(user_key, seq_num, t).Encode().ToString(); } - // Corrupts key by changing the type - void CorruptKeyType(InternalKey* ikey) { - std::string keystr = ikey->Encode().ToString(); - keystr[keystr.size() - 8] = kTypeLogData; - ikey->DecodeFrom(Slice(keystr.data(), keystr.size())); - } - void AddMockFile(const mock::MockFileContents& contents, int level = 0) { assert(contents.size() > 0); @@ -171,8 +164,8 @@ class CompactionJobTest : public testing::Test { // file InternalKey bottommost_internal_key(key, 0, kTypeValue); if (corrupt_id(k)) { - CorruptKeyType(&internal_key); - CorruptKeyType(&bottommost_internal_key); + test::CorruptKeyType(&internal_key); + test::CorruptKeyType(&bottommost_internal_key); } contents.insert({ internal_key.Encode().ToString(), value }); if (i == 1 || k < kMatchingKeys || corrupt_id(k - kMatchingKeys)) { diff --git a/db/memtable.cc b/db/memtable.cc index c447cbbb1..fded51e67 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -534,7 +534,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, // No change to value, since we have not yet found a Put/Delete if (!found_final_value && merge_in_progress) { - *s = Status::MergeInProgress(""); + *s = Status::MergeInProgress(); } PERF_COUNTER_ADD(get_from_memtable_count, 1); return found_final_value; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index c8a4b140c..3e8c3e5cf 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -4,16 +4,17 @@ // of patent rights can be found in the PATENTS file in the same directory. // +#include "db/merge_helper.h" + #include #include -#include "merge_helper.h" #include "db/dbformat.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/merge_operator.h" -#include "util/statistics.h" #include "util/perf_context_imp.h" +#include "util/statistics.h" #include "util/stop_watch.h" namespace rocksdb { @@ -58,9 +59,9 @@ Status MergeHelper::TimedFullMerge(const Slice& key, const Slice* value, // keys_ stores the list of keys encountered while merging. // operands_ stores the list of merge operands encountered while merging. // keys_[i] corresponds to operands_[i] for each i. -void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, - const bool at_bottom, Statistics* stats, - Env* env_) { +Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, + const bool at_bottom, Statistics* stats, + Env* env_) { // Get a copy of the internal key, before it's invalidated by iter->Next() // Also maintain the list of merge operands seen. assert(HasOperator()); @@ -70,8 +71,6 @@ void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, operands_.push_front(iter->value().ToString()); assert(user_merge_operator_); - success_ = false; // Will become true if we hit Put/Delete or bottom - // We need to parse the internal key again as the parsed key is // backed by the internal key! // Assume no internal key corruption as it has been successfully parsed @@ -80,6 +79,7 @@ void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, ParsedInternalKey orig_ikey; ParseInternalKey(keys_.back(), &orig_ikey); + Status s; bool hit_the_next_user_key = false; for (iter->Next(); iter->Valid(); iter->Next()) { ParsedInternalKey ikey; @@ -114,28 +114,33 @@ void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, // => store result in operands_.back() (and update keys_.back()) // => change the entry type to kTypeValue for keys_.back() // We are done! Success! + // + // TODO(noetzli) If the merge operator returns false, we are currently + // (almost) silently dropping the put/delete. That's probably not what we + // want. const Slice val = iter->value(); const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr; std::string merge_result; - Status s = - TimedFullMerge(ikey.user_key, val_ptr, operands_, + s = TimedFullMerge(ikey.user_key, val_ptr, operands_, user_merge_operator_, stats, env_, logger_, &merge_result); // We store the result in keys_.back() and operands_.back() // if nothing went wrong (i.e.: no operand corruption on disk) if (s.ok()) { - std::string& original_key = - keys_.back(); // The original key encountered + // The original key encountered + std::string original_key = std::move(keys_.back()); orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); - operands_.back() = std::move(merge_result); - success_ = true; + keys_.clear(); + operands_.clear(); + keys_.emplace_front(std::move(original_key)); + operands_.emplace_front(std::move(merge_result)); } // move iter to the next entry iter->Next(); - return; + return s; } else { // hit a merge // => merge the operand into the front of the operands_ list @@ -173,28 +178,28 @@ void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, assert(operands_.size() >= 1); assert(operands_.size() == keys_.size()); std::string merge_result; - { - StopWatchNano timer(env_, stats != nullptr); - PERF_TIMER_GUARD(merge_operator_time_nanos); - success_ = user_merge_operator_->FullMerge( - orig_ikey.user_key, nullptr, operands_, &merge_result, logger_); - RecordTick(stats, MERGE_OPERATION_TOTAL_TIME, - env_ != nullptr ? timer.ElapsedNanos() : 0); - } - if (success_) { - std::string& original_key = keys_.back(); // The original key encountered + s = TimedFullMerge(orig_ikey.user_key, nullptr, operands_, + user_merge_operator_, stats, env_, logger_, + &merge_result); + if (s.ok()) { + // The original key encountered + std::string original_key = std::move(keys_.back()); orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); - operands_.back() = std::move(merge_result); - } else { - RecordTick(stats, NUMBER_MERGE_FAILURES); - // Do nothing if not success_. Leave keys() and operands() as they are. + keys_.clear(); + operands_.clear(); + keys_.emplace_front(std::move(original_key)); + operands_.emplace_front(std::move(merge_result)); } } else { // We haven't seen the beginning of the key nor a Put/Delete. // Attempt to use the user's associative merge function to // merge the stacked merge operands into a single operand. - + // + // TODO(noetzli) The docblock of MergeUntil suggests that a successful + // partial merge returns Status::OK(). Should we change the status code + // after a successful partial merge? + s = Status::MergeInProgress(); if (operands_.size() >= 2 && operands_.size() >= min_partial_merge_operands_) { bool merge_success = false; @@ -218,6 +223,8 @@ void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, } } } + + return s; } } // namespace rocksdb diff --git a/db/merge_helper.h b/db/merge_helper.h index 8ad6acc07..d88f3e356 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -33,12 +33,17 @@ class MergeHelper { min_partial_merge_operands_(min_partial_merge_operands), assert_valid_internal_key_(assert_valid_internal_key), keys_(), - operands_(), - success_(false) {} + operands_() { + assert(user_comparator_ != nullptr); + } // Wrapper around MergeOperator::FullMerge() that records perf statistics. // Result of merge will be written to result if status returned is OK. // If operands is empty, the value will simply be copied to result. + // Returns one of the following statuses: + // - OK: Entries were successfully merged. + // - Corruption: Merge operator reported unsuccessful merge. + // - NotSupported: Merge operator is missing. static Status TimedFullMerge(const Slice& key, const Slice* value, const std::deque& operands, const MergeOperator* merge_operator, @@ -57,18 +62,23 @@ class MergeHelper { // 0 means no restriction // at_bottom: (IN) true if the iterator covers the bottem level, which means // we could reach the start of the history of this user key. - void MergeUntil(Iterator* iter, const SequenceNumber stop_before = 0, - const bool at_bottom = false, Statistics* stats = nullptr, - Env* env_ = nullptr); + // Returns one of the following statuses: + // - OK: Entries were successfully merged. + // - MergeInProgress: Put/Delete not encountered and unable to merge operands. + // - Corruption: Merge operator reported unsuccessful merge. + // + // REQUIRED: The first key in the input is not corrupted. + Status MergeUntil(Iterator* iter, const SequenceNumber stop_before = 0, + const bool at_bottom = false, Statistics* stats = nullptr, + Env* env_ = nullptr); // Query the merge result // These are valid until the next MergeUntil call // If the merging was successful: - // - IsSuccess() will be true - // - key() will have the latest sequence number of the merges. - // The type will be Put or Merge. See IMPORTANT 1 note, below. - // - value() will be the result of merging all the operands together - // - The user should ignore keys() and values(). + // - keys() contains a single element with the latest sequence number of + // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below. + // - values() contains a single element with the result of merging all the + // operands together // // IMPORTANT 1: the key type could change after the MergeUntil call. // Put/Delete + Merge + ... + Merge => Put @@ -76,7 +86,6 @@ class MergeHelper { // // If the merge operator is not associative, and if a Put/Delete is not found // then the merging will be unsuccessful. In this case: - // - IsSuccess() will be false // - keys() contains the list of internal keys seen in order of iteration. // - values() contains the list of values (merges) seen in the same order. // values() is parallel to keys() so that the first entry in @@ -84,20 +93,12 @@ class MergeHelper { // and so on. These lists will be the same length. // All of these pairs will be merges over the same user key. // See IMPORTANT 2 note below. - // - The user should ignore key() and value(). // // IMPORTANT 2: The entries were traversed in order from BACK to FRONT. // So keys().back() was the first key seen by iterator. // TODO: Re-style this comment to be like the first one - bool IsSuccess() const { return success_; } - Slice key() const { assert(success_); return Slice(keys_.back()); } - Slice value() const { assert(success_); return Slice(operands_.back()); } - const std::deque& keys() const { - assert(!success_); return keys_; - } - const std::deque& values() const { - assert(!success_); return operands_; - } + const std::deque& keys() const { return keys_; } + const std::deque& values() const { return operands_; } bool HasOperator() const { return user_merge_operator_ != nullptr; } private: @@ -111,7 +112,6 @@ class MergeHelper { // valid up to the next MergeUntil call std::deque keys_; // Keeps track of the sequence of keys seen std::deque operands_; // Parallel with keys_; stores the values - bool success_; }; } // namespace rocksdb diff --git a/db/merge_helper_test.cc b/db/merge_helper_test.cc index 1dd17fb5f..53b13a2cb 100644 --- a/db/merge_helper_test.cc +++ b/db/merge_helper_test.cc @@ -21,34 +21,38 @@ class MergeHelperTest : public testing::Test { MergeHelperTest() = default; ~MergeHelperTest() = default; - void RunUInt64MergeHelper(SequenceNumber stop_before, bool at_bottom) { + Status RunUInt64MergeHelper(SequenceNumber stop_before, bool at_bottom) { InitIterator(); merge_op_ = MergeOperators::CreateUInt64AddOperator(); merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(), - nullptr, 2U, true)); - merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr, - Env::Default()); + nullptr, 2U, false)); + return merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, + nullptr, Env::Default()); } - void RunStringAppendMergeHelper(SequenceNumber stop_before, bool at_bottom) { + Status RunStringAppendMergeHelper(SequenceNumber stop_before, + bool at_bottom) { InitIterator(); merge_op_ = MergeOperators::CreateStringAppendTESTOperator(); merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(), - nullptr, 2U, true)); - merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr, - Env::Default()); + nullptr, 2U, false)); + return merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, + nullptr, Env::Default()); } std::string Key(const std::string& user_key, const SequenceNumber& seq, const ValueType& t) { - std::string ikey; - AppendInternalKey(&ikey, ParsedInternalKey(Slice(user_key), seq, t)); - return ikey; + return InternalKey(user_key, seq, t).Encode().ToString(); } void AddKeyVal(const std::string& user_key, const SequenceNumber& seq, - const ValueType& t, const std::string& val) { - ks_.push_back(Key(user_key, seq, t)); + const ValueType& t, const std::string& val, + bool corrupt = false) { + InternalKey ikey = InternalKey(user_key, seq, t); + if (corrupt) { + test::CorruptKeyType(&ikey); + } + ks_.push_back(ikey.Encode().ToString()); vs_.push_back(val); } @@ -63,15 +67,6 @@ class MergeHelperTest : public testing::Test { return result; } - void CheckState(bool success, int iter_pos) { - ASSERT_EQ(success, merge_helper_->IsSuccess()); - if (iter_pos == -1) { - ASSERT_FALSE(iter_->Valid()); - } else { - ASSERT_EQ(ks_[iter_pos], iter_->key()); - } - } - std::unique_ptr iter_; std::shared_ptr merge_op_; std::unique_ptr merge_helper_; @@ -86,10 +81,12 @@ TEST_F(MergeHelperTest, MergeAtBottomSuccess) { AddKeyVal("a", 10, kTypeMerge, EncodeInt(3U)); AddKeyVal("b", 10, kTypeMerge, EncodeInt(4U)); // <- Iterator after merge - RunUInt64MergeHelper(0, true); - CheckState(true, 2); - ASSERT_EQ(Key("a", 20, kTypeValue), merge_helper_->key()); - ASSERT_EQ(EncodeInt(4U), merge_helper_->value()); + ASSERT_TRUE(RunUInt64MergeHelper(0, true).ok()); + ASSERT_EQ(ks_[2], iter_->key()); + ASSERT_EQ(Key("a", 20, kTypeValue), merge_helper_->keys()[0]); + ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); + ASSERT_EQ(1U, merge_helper_->keys().size()); + ASSERT_EQ(1U, merge_helper_->values().size()); } // Merging with a value results in a successful merge. @@ -99,10 +96,12 @@ TEST_F(MergeHelperTest, MergeValue) { AddKeyVal("a", 20, kTypeValue, EncodeInt(4U)); // <- Iterator after merge AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U)); - RunUInt64MergeHelper(0, false); - CheckState(true, 3); - ASSERT_EQ(Key("a", 40, kTypeValue), merge_helper_->key()); - ASSERT_EQ(EncodeInt(8U), merge_helper_->value()); + ASSERT_TRUE(RunUInt64MergeHelper(0, false).ok()); + ASSERT_EQ(ks_[3], iter_->key()); + ASSERT_EQ(Key("a", 40, kTypeValue), merge_helper_->keys()[0]); + ASSERT_EQ(EncodeInt(8U), merge_helper_->values()[0]); + ASSERT_EQ(1U, merge_helper_->keys().size()); + ASSERT_EQ(1U, merge_helper_->values().size()); } // Merging stops before a snapshot. @@ -113,10 +112,12 @@ TEST_F(MergeHelperTest, SnapshotBeforeValue) { AddKeyVal("a", 20, kTypeValue, EncodeInt(4U)); AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U)); - RunUInt64MergeHelper(31, true); - CheckState(false, 2); + ASSERT_TRUE(RunUInt64MergeHelper(31, true).IsMergeInProgress()); + ASSERT_EQ(ks_[2], iter_->key()); ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]); ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); + ASSERT_EQ(1U, merge_helper_->keys().size()); + ASSERT_EQ(1U, merge_helper_->values().size()); } // MergeHelper preserves the operand stack for merge operators that @@ -126,12 +127,26 @@ TEST_F(MergeHelperTest, NoPartialMerge) { AddKeyVal("a", 40, kTypeMerge, "v"); // <- Iterator after merge AddKeyVal("a", 30, kTypeMerge, "v"); - RunStringAppendMergeHelper(31, true); - CheckState(false, 2); + ASSERT_TRUE(RunStringAppendMergeHelper(31, true).IsMergeInProgress()); + ASSERT_EQ(ks_[2], iter_->key()); ASSERT_EQ(Key("a", 40, kTypeMerge), merge_helper_->keys()[0]); ASSERT_EQ("v", merge_helper_->values()[0]); ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[1]); ASSERT_EQ("v2", merge_helper_->values()[1]); + ASSERT_EQ(2U, merge_helper_->keys().size()); + ASSERT_EQ(2U, merge_helper_->values().size()); +} + +// A single operand can not be merged. +TEST_F(MergeHelperTest, SingleOperand) { + AddKeyVal("a", 50, kTypeMerge, EncodeInt(1U)); + + ASSERT_TRUE(RunUInt64MergeHelper(31, true).IsMergeInProgress()); + ASSERT_FALSE(iter_->Valid()); + ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]); + ASSERT_EQ(EncodeInt(1U), merge_helper_->values()[0]); + ASSERT_EQ(1U, merge_helper_->keys().size()); + ASSERT_EQ(1U, merge_helper_->values().size()); } // Merging with a deletion turns the deletion into a value @@ -139,10 +154,27 @@ TEST_F(MergeHelperTest, MergeDeletion) { AddKeyVal("a", 30, kTypeMerge, EncodeInt(3U)); AddKeyVal("a", 20, kTypeDeletion, ""); - RunUInt64MergeHelper(15, false); - CheckState(true, -1); - ASSERT_EQ(Key("a", 30, kTypeValue), merge_helper_->key()); - ASSERT_EQ(EncodeInt(3U), merge_helper_->value()); + ASSERT_TRUE(RunUInt64MergeHelper(15, false).ok()); + ASSERT_FALSE(iter_->Valid()); + ASSERT_EQ(Key("a", 30, kTypeValue), merge_helper_->keys()[0]); + ASSERT_EQ(EncodeInt(3U), merge_helper_->values()[0]); + ASSERT_EQ(1U, merge_helper_->keys().size()); + ASSERT_EQ(1U, merge_helper_->values().size()); +} + +// The merge helper stops upon encountering a corrupt key +TEST_F(MergeHelperTest, CorruptKey) { + AddKeyVal("a", 30, kTypeMerge, EncodeInt(3U)); + AddKeyVal("a", 25, kTypeMerge, EncodeInt(1U)); + // Corrupt key + AddKeyVal("a", 20, kTypeDeletion, "", true); // <- Iterator after merge + + ASSERT_TRUE(RunUInt64MergeHelper(15, false).IsMergeInProgress()); + ASSERT_EQ(ks_[2], iter_->key()); + ASSERT_EQ(Key("a", 30, kTypeMerge), merge_helper_->keys()[0]); + ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); + ASSERT_EQ(1U, merge_helper_->keys().size()); + ASSERT_EQ(1U, merge_helper_->values().size()); } } // namespace rocksdb diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 76e19069c..043ca139f 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -57,9 +57,7 @@ class Status { static Status IOError(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kIOError, msg, msg2); } - static Status MergeInProgress(const Slice& msg, const Slice& msg2 = Slice()) { - return Status(kMergeInProgress, msg, msg2); - } + static Status MergeInProgress() { return Status(kMergeInProgress); } static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kIncomplete, msg, msg2); } diff --git a/util/testutil.cc b/util/testutil.cc index ebe7a308b..72ecdeec4 100644 --- a/util/testutil.cc +++ b/util/testutil.cc @@ -123,5 +123,11 @@ SequentialFileReader* GetSequentialFileReader(SequentialFile* se) { return new SequentialFileReader(std::move(file)); } +void CorruptKeyType(InternalKey* ikey) { + std::string keystr = ikey->Encode().ToString(); + keystr[keystr.size() - 8] = kTypeLogData; + ikey->DecodeFrom(Slice(keystr.data(), keystr.size())); +} + } // namespace test } // namespace rocksdb diff --git a/util/testutil.h b/util/testutil.h index c02c4151b..67a2aafad 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -218,10 +218,11 @@ class StringSink: public WritableFile { class StringSource: public RandomAccessFile { public: - StringSource(const Slice& contents, uint64_t uniq_id = 0, bool mmap = false) - : contents_(contents.data(), contents.size()), uniq_id_(uniq_id), - mmap_(mmap) { - } + explicit StringSource(const Slice& contents, uint64_t uniq_id = 0, + bool mmap = false) + : contents_(contents.data(), contents.size()), + uniq_id_(uniq_id), + mmap_(mmap) {} virtual ~StringSource() { } @@ -268,5 +269,8 @@ class NullLogger : public Logger { virtual size_t GetLogFileSize() const override { return 0; } }; +// Corrupts key by changing the type +extern void CorruptKeyType(InternalKey* ikey); + } // namespace test } // namespace rocksdb diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 9308ba39b..94b227dc9 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -633,7 +633,7 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, s = Status::NotFound(); break; case WriteBatchWithIndexInternal::Result::kMergeInProgress: - s = Status::MergeInProgress(""); + s = Status::MergeInProgress(); break; default: assert(false);