diff --git a/db/memtable.cc b/db/memtable.cc index 2dac88dd3..9099d5e39 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -1064,20 +1064,10 @@ static bool SaveValue(void* arg, const char* entry) { assert(s->do_merge); if (s->value || s->columns) { - std::string result; *(s->status) = MergeHelper::TimedFullMerge( merge_operator, s->key->user_key(), &v, - merge_context->GetOperands(), &result, s->logger, s->statistics, - s->clock, nullptr /* result_operand */, true); - - if (s->status->ok()) { - if (s->value) { - *(s->value) = std::move(result); - } else { - assert(s->columns); - s->columns->SetPlainValue(result); - } - } + merge_context->GetOperands(), s->value, s->columns, s->logger, + s->statistics, s->clock, nullptr /* result_operand */, true); } } else if (s->value) { s->value->assign(v.data(), v.size()); @@ -1148,10 +1138,10 @@ static bool SaveValue(void* arg, const char* entry) { case kTypeSingleDeletion: case kTypeRangeDeletion: { if (*(s->merge_in_progress)) { - if (s->value != nullptr) { + if (s->value || s->columns) { *(s->status) = MergeHelper::TimedFullMerge( merge_operator, s->key->user_key(), nullptr, - merge_context->GetOperands(), s->value, s->logger, + merge_context->GetOperands(), s->value, s->columns, s->logger, s->statistics, s->clock, nullptr /* result_operand */, true); } } else { @@ -1177,10 +1167,13 @@ static bool SaveValue(void* arg, const char* entry) { v, s->inplace_update_support == false /* operand_pinned */); if (s->do_merge && merge_operator->ShouldMerge( merge_context->GetOperandsDirectionBackward())) { - *(s->status) = MergeHelper::TimedFullMerge( - merge_operator, s->key->user_key(), nullptr, - merge_context->GetOperands(), s->value, s->logger, s->statistics, - s->clock, nullptr /* result_operand */, true); + if (s->value || s->columns) { + *(s->status) = MergeHelper::TimedFullMerge( + merge_operator, s->key->user_key(), nullptr, + merge_context->GetOperands(), s->value, s->columns, s->logger, + s->statistics, s->clock, nullptr /* result_operand */, true); + } + *(s->found_final_value) = true; return false; } diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 5430509eb..680365b98 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -110,6 +110,36 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, return Status::OK(); } +Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, + const Slice& key, const Slice* base_value, + const std::vector& operands, + std::string* value, + PinnableWideColumns* columns, Logger* logger, + Statistics* statistics, SystemClock* clock, + Slice* result_operand, + bool update_num_ops_stats) { + assert(value || columns); + assert(!value || !columns); + + std::string result; + const Status s = + TimedFullMerge(merge_operator, key, base_value, operands, &result, logger, + statistics, clock, result_operand, update_num_ops_stats); + if (!s.ok()) { + return s; + } + + if (value) { + *value = std::move(result); + return Status::OK(); + } + + assert(columns); + columns->SetPlainValue(result); + + return Status::OK(); +} + // PRE: iter points to the first merge type entry // POST: iter points to the first entry beyond the merge process (or the end) // keys_, operands_ are updated to reflect the merge result. diff --git a/db/merge_helper.h b/db/merge_helper.h index ae4262806..a1af1c002 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -56,6 +56,15 @@ class MergeHelper { Slice* result_operand = nullptr, bool update_num_ops_stats = false); + static Status TimedFullMerge(const MergeOperator* merge_operator, + const Slice& key, const Slice* base_value, + const std::vector& operands, + std::string* value, PinnableWideColumns* columns, + Logger* logger, Statistics* statistics, + SystemClock* clock, + Slice* result_operand = nullptr, + bool update_num_ops_stats = false); + // Merge entries until we hit // - a corrupted key // - a Put/Delete, diff --git a/db/version_set.cc b/db/version_set.cc index 7a4ca3431..0c452e19f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2403,12 +2403,16 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, // merge_operands are in saver and we hit the beginning of the key history // do a final merge of nullptr and operands; std::string* str_value = value != nullptr ? value->GetSelf() : nullptr; - *status = MergeHelper::TimedFullMerge( - merge_operator_, user_key, nullptr, merge_context->GetOperands(), - str_value, info_log_, db_statistics_, clock_, - nullptr /* result_operand */, true); - if (LIKELY(value != nullptr)) { - value->PinSelf(); + if (str_value || columns) { + *status = MergeHelper::TimedFullMerge( + merge_operator_, user_key, nullptr, merge_context->GetOperands(), + str_value, columns, info_log_, db_statistics_, clock_, + nullptr /* result_operand */, true); + if (status->ok()) { + if (LIKELY(value != nullptr)) { + value->PinSelf(); + } + } } } else { if (key_exists != nullptr) { diff --git a/db/wide/db_wide_basic_test.cc b/db/wide/db_wide_basic_test.cc index bddc5717f..c18b1bef9 100644 --- a/db/wide/db_wide_basic_test.cc +++ b/db/wide/db_wide_basic_test.cc @@ -209,6 +209,148 @@ TEST_F(DBWideBasicTest, PutEntityColumnFamily) { ASSERT_OK(db_->Write(WriteOptions(), &batch)); } +TEST_F(DBWideBasicTest, MergePlainKeyValue) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + Reopen(options); + + // Put + Merge + constexpr char first_key[] = "first"; + constexpr char first_base_value[] = "hello"; + constexpr char first_merge_op[] = "world"; + + // Delete + Merge + constexpr char second_key[] = "second"; + constexpr char second_merge_op[] = "foo"; + + // Merge without any preceding KV + constexpr char third_key[] = "third"; + constexpr char third_merge_op[] = "bar"; + + auto write_base = [&]() { + // Write "base" KVs: a Put for the 1st key and a Delete for the 2nd one; + // note there is no "base" KV for the 3rd + ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), first_key, + first_base_value)); + ASSERT_OK( + db_->Delete(WriteOptions(), db_->DefaultColumnFamily(), second_key)); + }; + + auto write_merge = [&]() { + // Write Merge operands + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key, + first_merge_op)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key, + second_merge_op)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), third_key, + third_merge_op)); + }; + + const std::string expected_first_column(std::string(first_base_value) + "," + + first_merge_op); + const WideColumns expected_first_columns{ + {kDefaultWideColumnName, expected_first_column}}; + const WideColumns expected_second_columns{ + {kDefaultWideColumnName, second_merge_op}}; + const WideColumns expected_third_columns{ + {kDefaultWideColumnName, third_merge_op}}; + + auto verify = [&]() { + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &result)); + ASSERT_EQ(result.columns(), expected_first_columns); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &result)); + ASSERT_EQ(result.columns(), expected_second_columns); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + third_key, &result)); + + ASSERT_EQ(result.columns(), expected_third_columns); + } + + { + std::unique_ptr iter(db_->NewIterator(ReadOptions())); + + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), first_key); + ASSERT_EQ(iter->value(), expected_first_columns[0].value()); + ASSERT_EQ(iter->columns(), expected_first_columns); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), second_key); + ASSERT_EQ(iter->value(), expected_second_columns[0].value()); + ASSERT_EQ(iter->columns(), expected_second_columns); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), third_key); + ASSERT_EQ(iter->value(), expected_third_columns[0].value()); + ASSERT_EQ(iter->columns(), expected_third_columns); + + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + iter->SeekToLast(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), third_key); + ASSERT_EQ(iter->value(), expected_third_columns[0].value()); + ASSERT_EQ(iter->columns(), expected_third_columns); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), second_key); + ASSERT_EQ(iter->value(), expected_second_columns[0].value()); + ASSERT_EQ(iter->columns(), expected_second_columns); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), first_key); + ASSERT_EQ(iter->value(), expected_first_columns[0].value()); + ASSERT_EQ(iter->columns(), expected_first_columns); + + iter->Prev(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + }; + + // Base KVs (if any) and Merge operands both in memtable + write_base(); + write_merge(); + verify(); + + // Base KVs (if any) and Merge operands both in storage + ASSERT_OK(Flush()); + verify(); + + // Base KVs (if any) in storage, Merge operands in memtable + DestroyAndReopen(options); + write_base(); + ASSERT_OK(Flush()); + write_merge(); + verify(); +} + TEST_F(DBWideBasicTest, PutEntityMergeNotSupported) { Options options = GetDefaultOptions(); options.merge_operator = MergeOperators::CreateStringAppendOperator(); diff --git a/table/get_context.cc b/table/get_context.cc index fca809cc3..435e3e14f 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -407,7 +407,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, state_ = kDeleted; } else if (kMerge == state_) { state_ = kFound; - Merge(nullptr); + if (do_merge_) { + Merge(nullptr); + } // If do_merge_ = false then the current value shouldn't be part of // merge_context_->operand_list } @@ -438,16 +440,20 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } void GetContext::Merge(const Slice* value) { + assert(do_merge_); + assert(!pinnable_val_ || !columns_); + + const Status s = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, value, merge_context_->GetOperands(), + pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_, + statistics_, clock_); + if (!s.ok()) { + state_ = kCorrupt; + return; + } + if (LIKELY(pinnable_val_ != nullptr)) { - if (do_merge_) { - Status merge_status = MergeHelper::TimedFullMerge( - merge_operator_, user_key_, value, merge_context_->GetOperands(), - pinnable_val_->GetSelf(), logger_, statistics_, clock_); - pinnable_val_->PinSelf(); - if (!merge_status.ok()) { - state_ = kCorrupt; - } - } + pinnable_val_->PinSelf(); } }