Support Merge for wide-column entities in the compaction logic (#10946)

Summary:
The patch extends the compaction logic to handle `Merge`s in conjunction with wide-column entities. As usual, the merge operation is applied to the anonymous default column, and any other columns are unaffected.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10946

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D41233722

Pulled By: ltamasi

fbshipit-source-id: dfd9b1362222f01bafcecb139eb48480eb279fed
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent d1aca4a5ae
commit 5e8947057b
  1. 11
      db/merge_helper.cc
  2. 125
      db/wide/db_wide_basic_test.cc

@ -343,9 +343,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
/* result_operand */ nullptr, /* result_operand */ nullptr,
/* update_num_ops_stats */ false); /* update_num_ops_stats */ false);
} else if (ikey.type == kTypeWideColumnEntity) { } else if (ikey.type == kTypeWideColumnEntity) {
// TODO: support wide-column entities s = TimedFullMergeWithEntity(
return Status::NotSupported( user_merge_operator_, ikey.user_key, iter->value(),
"Merge currently not supported for wide-column entities"); merge_context_.GetOperands(), &merge_result, logger_, stats_,
clock_, /* update_num_ops_stats */ false);
} else { } else {
s = TimedFullMerge(user_merge_operator_, ikey.user_key, nullptr, s = TimedFullMerge(user_merge_operator_, ikey.user_key, nullptr,
merge_context_.GetOperands(), &merge_result, logger_, merge_context_.GetOperands(), &merge_result, logger_,
@ -359,7 +360,9 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
if (s.ok()) { if (s.ok()) {
// The original key encountered // The original key encountered
original_key = std::move(keys_.back()); original_key = std::move(keys_.back());
orig_ikey.type = kTypeValue; orig_ikey.type = ikey.type == kTypeWideColumnEntity
? kTypeWideColumnEntity
: kTypeValue;
UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
keys_.clear(); keys_.clear();
merge_context_.Clear(); merge_context_.Clear();

@ -396,15 +396,16 @@ TEST_F(DBWideBasicTest, MergeEntity) {
second_merge_operand)); second_merge_operand));
}; };
auto verify = [&]() { const std::string first_expected_default(first_columns[0].value().ToString() +
const std::string first_expected_default( delim + first_merge_operand);
first_columns[0].value().ToString() + delim + first_merge_operand); const std::string second_expected_default(delim + second_merge_operand);
auto verify_basic = [&]() {
WideColumns first_expected_columns{ WideColumns first_expected_columns{
{kDefaultWideColumnName, first_expected_default}, {kDefaultWideColumnName, first_expected_default},
first_columns[1], first_columns[1],
first_columns[2]}; first_columns[2]};
const std::string second_expected_default(delim + second_merge_operand);
WideColumns second_expected_columns{ WideColumns second_expected_columns{
{kDefaultWideColumnName, second_expected_default}, {kDefaultWideColumnName, second_expected_default},
second_columns[0], second_columns[0],
@ -424,24 +425,6 @@ TEST_F(DBWideBasicTest, MergeEntity) {
ASSERT_EQ(result.columns(), first_expected_columns); ASSERT_EQ(result.columns(), first_expected_columns);
} }
{
constexpr size_t num_merge_operands = 2;
std::array<PinnableSlice, num_merge_operands> merge_operands;
GetMergeOperandsOptions get_merge_opts;
get_merge_opts.expected_max_number_of_operands = num_merge_operands;
int number_of_operands = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
first_key, &merge_operands[0],
&get_merge_opts, &number_of_operands));
ASSERT_EQ(number_of_operands, num_merge_operands);
ASSERT_EQ(merge_operands[0], first_columns[0].value());
ASSERT_EQ(merge_operands[1], first_merge_operand);
}
{ {
PinnableSlice result; PinnableSlice result;
ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key, ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key,
@ -456,24 +439,6 @@ TEST_F(DBWideBasicTest, MergeEntity) {
ASSERT_EQ(result.columns(), second_expected_columns); ASSERT_EQ(result.columns(), second_expected_columns);
} }
{
constexpr size_t num_merge_operands = 2;
std::array<PinnableSlice, num_merge_operands> merge_operands;
GetMergeOperandsOptions get_merge_opts;
get_merge_opts.expected_max_number_of_operands = num_merge_operands;
int number_of_operands = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &merge_operands[0],
&get_merge_opts, &number_of_operands));
ASSERT_EQ(number_of_operands, num_merge_operands);
ASSERT_TRUE(merge_operands[0].empty());
ASSERT_EQ(merge_operands[1], second_merge_operand);
}
{ {
constexpr size_t num_keys = 2; constexpr size_t num_keys = 2;
@ -532,6 +497,70 @@ TEST_F(DBWideBasicTest, MergeEntity) {
} }
}; };
auto verify_merge_ops_pre_compaction = [&]() {
constexpr size_t num_merge_operands = 2;
GetMergeOperandsOptions get_merge_opts;
get_merge_opts.expected_max_number_of_operands = num_merge_operands;
{
std::array<PinnableSlice, num_merge_operands> merge_operands;
int number_of_operands = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
first_key, &merge_operands[0],
&get_merge_opts, &number_of_operands));
ASSERT_EQ(number_of_operands, num_merge_operands);
ASSERT_EQ(merge_operands[0], first_columns[0].value());
ASSERT_EQ(merge_operands[1], first_merge_operand);
}
{
std::array<PinnableSlice, num_merge_operands> merge_operands;
int number_of_operands = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &merge_operands[0],
&get_merge_opts, &number_of_operands));
ASSERT_EQ(number_of_operands, num_merge_operands);
ASSERT_TRUE(merge_operands[0].empty());
ASSERT_EQ(merge_operands[1], second_merge_operand);
}
};
auto verify_merge_ops_post_compaction = [&]() {
constexpr size_t num_merge_operands = 1;
GetMergeOperandsOptions get_merge_opts;
get_merge_opts.expected_max_number_of_operands = num_merge_operands;
{
std::array<PinnableSlice, num_merge_operands> merge_operands;
int number_of_operands = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
first_key, &merge_operands[0],
&get_merge_opts, &number_of_operands));
ASSERT_EQ(number_of_operands, num_merge_operands);
ASSERT_EQ(merge_operands[0], first_expected_default);
}
{
std::array<PinnableSlice, num_merge_operands> merge_operands;
int number_of_operands = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &merge_operands[0],
&get_merge_opts, &number_of_operands));
ASSERT_EQ(number_of_operands, num_merge_operands);
ASSERT_EQ(merge_operands[0], second_expected_default);
}
};
{ {
// Base KVs and Merge operands both in memtable (note: we take a snapshot in // Base KVs and Merge operands both in memtable (note: we take a snapshot in
// between to make sure they do not get reconciled during the subsequent // between to make sure they do not get reconciled during the subsequent
@ -539,11 +568,13 @@ TEST_F(DBWideBasicTest, MergeEntity) {
write_base(); write_base();
ManagedSnapshot snapshot(db_); ManagedSnapshot snapshot(db_);
write_merge(); write_merge();
verify(); verify_basic();
verify_merge_ops_pre_compaction();
// Base KVs and Merge operands both in storage // Base KVs and Merge operands both in storage
ASSERT_OK(Flush()); ASSERT_OK(Flush());
verify(); verify_basic();
verify_merge_ops_pre_compaction();
} }
// Base KVs in storage, Merge operands in memtable // Base KVs in storage, Merge operands in memtable
@ -551,7 +582,15 @@ TEST_F(DBWideBasicTest, MergeEntity) {
write_base(); write_base();
ASSERT_OK(Flush()); ASSERT_OK(Flush());
write_merge(); write_merge();
verify(); verify_basic();
verify_merge_ops_pre_compaction();
// Flush and compact
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /* begin */ nullptr,
/* end */ nullptr));
verify_basic();
verify_merge_ops_post_compaction();
} }
TEST_F(DBWideBasicTest, PutEntityTimestampError) { TEST_F(DBWideBasicTest, PutEntityTimestampError) {

Loading…
Cancel
Save