From 5e8947057b251e65940ab7db8d23ab2f764fd59c Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Fri, 11 Nov 2022 16:32:32 -0800 Subject: [PATCH] Support Merge for wide-column entities in the compaction logic (#10946) Summary: The patch extends the compaction logic to handle `Merge`s in conjunction with wide-column entities. As usual, the merge operation is applied to the anonymous default column, and any other columns are unaffected. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10946 Test Plan: `make check` Reviewed By: riversand963 Differential Revision: D41233722 Pulled By: ltamasi fbshipit-source-id: dfd9b1362222f01bafcecb139eb48480eb279fed --- db/merge_helper.cc | 11 +-- db/wide/db_wide_basic_test.cc | 125 ++++++++++++++++++++++------------ 2 files changed, 89 insertions(+), 47 deletions(-) diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 1cf74fb03..6df841012 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -343,9 +343,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, /* result_operand */ nullptr, /* update_num_ops_stats */ false); } else if (ikey.type == kTypeWideColumnEntity) { - // TODO: support wide-column entities - return Status::NotSupported( - "Merge currently not supported for wide-column entities"); + s = TimedFullMergeWithEntity( + user_merge_operator_, ikey.user_key, iter->value(), + merge_context_.GetOperands(), &merge_result, logger_, stats_, + clock_, /* update_num_ops_stats */ false); } else { s = TimedFullMerge(user_merge_operator_, ikey.user_key, nullptr, merge_context_.GetOperands(), &merge_result, logger_, @@ -359,7 +360,9 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, if (s.ok()) { // The original key encountered original_key = std::move(keys_.back()); - orig_ikey.type = kTypeValue; + orig_ikey.type = ikey.type == kTypeWideColumnEntity + ? kTypeWideColumnEntity + : kTypeValue; UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); keys_.clear(); merge_context_.Clear(); diff --git a/db/wide/db_wide_basic_test.cc b/db/wide/db_wide_basic_test.cc index c6670f7f7..1ffe314fe 100644 --- a/db/wide/db_wide_basic_test.cc +++ b/db/wide/db_wide_basic_test.cc @@ -396,15 +396,16 @@ TEST_F(DBWideBasicTest, MergeEntity) { second_merge_operand)); }; - auto verify = [&]() { - const std::string first_expected_default( - first_columns[0].value().ToString() + delim + first_merge_operand); + const std::string first_expected_default(first_columns[0].value().ToString() + + delim + first_merge_operand); + const std::string second_expected_default(delim + second_merge_operand); + + auto verify_basic = [&]() { WideColumns first_expected_columns{ {kDefaultWideColumnName, first_expected_default}, first_columns[1], first_columns[2]}; - const std::string second_expected_default(delim + second_merge_operand); WideColumns second_expected_columns{ {kDefaultWideColumnName, second_expected_default}, second_columns[0], @@ -424,24 +425,6 @@ TEST_F(DBWideBasicTest, MergeEntity) { ASSERT_EQ(result.columns(), first_expected_columns); } - { - constexpr size_t num_merge_operands = 2; - - std::array merge_operands; - - GetMergeOperandsOptions get_merge_opts; - get_merge_opts.expected_max_number_of_operands = num_merge_operands; - - int number_of_operands = 0; - - ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), - first_key, &merge_operands[0], - &get_merge_opts, &number_of_operands)); - ASSERT_EQ(number_of_operands, num_merge_operands); - ASSERT_EQ(merge_operands[0], first_columns[0].value()); - ASSERT_EQ(merge_operands[1], first_merge_operand); - } - { PinnableSlice result; ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key, @@ -456,24 +439,6 @@ TEST_F(DBWideBasicTest, MergeEntity) { ASSERT_EQ(result.columns(), second_expected_columns); } - { - constexpr size_t num_merge_operands = 2; - - std::array merge_operands; - - GetMergeOperandsOptions get_merge_opts; - get_merge_opts.expected_max_number_of_operands = num_merge_operands; - - int number_of_operands = 0; - - ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), - second_key, &merge_operands[0], - &get_merge_opts, &number_of_operands)); - ASSERT_EQ(number_of_operands, num_merge_operands); - ASSERT_TRUE(merge_operands[0].empty()); - ASSERT_EQ(merge_operands[1], second_merge_operand); - } - { constexpr size_t num_keys = 2; @@ -532,6 +497,70 @@ TEST_F(DBWideBasicTest, MergeEntity) { } }; + auto verify_merge_ops_pre_compaction = [&]() { + constexpr size_t num_merge_operands = 2; + + GetMergeOperandsOptions get_merge_opts; + get_merge_opts.expected_max_number_of_operands = num_merge_operands; + + { + std::array merge_operands; + int number_of_operands = 0; + + ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &merge_operands[0], + &get_merge_opts, &number_of_operands)); + + ASSERT_EQ(number_of_operands, num_merge_operands); + ASSERT_EQ(merge_operands[0], first_columns[0].value()); + ASSERT_EQ(merge_operands[1], first_merge_operand); + } + + { + std::array merge_operands; + int number_of_operands = 0; + + ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &merge_operands[0], + &get_merge_opts, &number_of_operands)); + + ASSERT_EQ(number_of_operands, num_merge_operands); + ASSERT_TRUE(merge_operands[0].empty()); + ASSERT_EQ(merge_operands[1], second_merge_operand); + } + }; + + auto verify_merge_ops_post_compaction = [&]() { + constexpr size_t num_merge_operands = 1; + + GetMergeOperandsOptions get_merge_opts; + get_merge_opts.expected_max_number_of_operands = num_merge_operands; + + { + std::array merge_operands; + int number_of_operands = 0; + + ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &merge_operands[0], + &get_merge_opts, &number_of_operands)); + + ASSERT_EQ(number_of_operands, num_merge_operands); + ASSERT_EQ(merge_operands[0], first_expected_default); + } + + { + std::array merge_operands; + int number_of_operands = 0; + + ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &merge_operands[0], + &get_merge_opts, &number_of_operands)); + + ASSERT_EQ(number_of_operands, num_merge_operands); + ASSERT_EQ(merge_operands[0], second_expected_default); + } + }; + { // Base KVs and Merge operands both in memtable (note: we take a snapshot in // between to make sure they do not get reconciled during the subsequent @@ -539,11 +568,13 @@ TEST_F(DBWideBasicTest, MergeEntity) { write_base(); ManagedSnapshot snapshot(db_); write_merge(); - verify(); + verify_basic(); + verify_merge_ops_pre_compaction(); // Base KVs and Merge operands both in storage ASSERT_OK(Flush()); - verify(); + verify_basic(); + verify_merge_ops_pre_compaction(); } // Base KVs in storage, Merge operands in memtable @@ -551,7 +582,15 @@ TEST_F(DBWideBasicTest, MergeEntity) { write_base(); ASSERT_OK(Flush()); write_merge(); - verify(); + verify_basic(); + verify_merge_ops_pre_compaction(); + + // Flush and compact + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /* begin */ nullptr, + /* end */ nullptr)); + verify_basic(); + verify_merge_ops_post_compaction(); } TEST_F(DBWideBasicTest, PutEntityTimestampError) {