diff --git a/db/memtable.cc b/db/memtable.cc index 4d8640711..7328f9846 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -1090,20 +1090,6 @@ static bool SaveValue(void* arg, const char* entry) { return false; } case kTypeWideColumnEntity: { - if (!s->do_merge) { - *(s->status) = Status::NotSupported( - "GetMergeOperands not supported for wide-column entities"); - *(s->found_final_value) = true; - return false; - } - - if (*(s->merge_in_progress)) { - *(s->status) = Status::NotSupported( - "Merge not supported for wide-column entities"); - *(s->found_final_value) = true; - return false; - } - if (s->inplace_update_support) { s->mem->GetLock(s->key->user_key())->ReadLock(); } @@ -1112,7 +1098,29 @@ static bool SaveValue(void* arg, const char* entry) { *(s->status) = Status::OK(); - if (s->value) { + if (!s->do_merge) { + // Preserve the value with the goal of returning it as part of + // raw merge operands to the user + + Slice value_of_default; + *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn( + v, value_of_default); + + if (s->status->ok()) { + merge_context->PushOperand( + value_of_default, + s->inplace_update_support == false /* operand_pinned */); + } + } else if (*(s->merge_in_progress)) { + assert(s->do_merge); + + if (s->value || s->columns) { + *(s->status) = MergeHelper::TimedFullMergeWithEntity( + merge_operator, s->key->user_key(), v, + merge_context->GetOperands(), s->value, s->columns, s->logger, + s->statistics, s->clock, nullptr /* result_operand */, true); + } + } else if (s->value) { Slice value_of_default; *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn( v, value_of_default); diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 86a4d11f6..ab758876f 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -12,6 +12,7 @@ #include "db/blob/prefetch_buffer_collection.h" #include "db/compaction/compaction_iteration_stats.h" #include "db/dbformat.h" +#include "db/wide/wide_column_serialization.h" #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" #include "port/likely.h" @@ -140,6 +141,70 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, return Status::OK(); } +Status MergeHelper::TimedFullMergeWithEntity( + const MergeOperator* merge_operator, const Slice& key, Slice base_entity, + const std::vector& operands, std::string* value, + PinnableWideColumns* columns, Logger* logger, Statistics* statistics, + SystemClock* clock, Slice* result_operand, bool update_num_ops_stats) { + assert(value || columns); + assert(!value || !columns); + + WideColumns base_columns; + + { + const Status s = + WideColumnSerialization::Deserialize(base_entity, base_columns); + if (!s.ok()) { + return s; + } + } + + const bool has_default_column = + !base_columns.empty() && base_columns[0].name() == kDefaultWideColumnName; + + Slice value_of_default; + if (has_default_column) { + value_of_default = base_columns[0].value(); + } + + std::string result; + + { + const Status s = TimedFullMerge( + merge_operator, key, &value_of_default, operands, &result, logger, + statistics, clock, result_operand, update_num_ops_stats); + if (!s.ok()) { + return s; + } + } + + if (value) { + *value = std::move(result); + return Status::OK(); + } + + assert(columns); + + std::string output; + + if (has_default_column) { + base_columns[0].value() = result; + + const Status s = WideColumnSerialization::Serialize(base_columns, output); + if (!s.ok()) { + return s; + } + } else { + const Status s = + WideColumnSerialization::Serialize(result, base_columns, output); + if (!s.ok()) { + return s; + } + } + + return columns->SetWideColumnValue(output); +} + // PRE: iter points to the first merge type entry // POST: iter points to the first entry beyond the merge process (or the end) // keys_, operands_ are updated to reflect the merge result. diff --git a/db/merge_helper.h b/db/merge_helper.h index 956e3ff78..4b6328d24 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -16,6 +16,7 @@ #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/slice.h" +#include "rocksdb/wide_columns.h" #include "util/stop_watch.h" namespace ROCKSDB_NAMESPACE { @@ -65,6 +66,13 @@ class MergeHelper { Slice* result_operand = nullptr, bool update_num_ops_stats = false); + static Status TimedFullMergeWithEntity( + const MergeOperator* merge_operator, const Slice& key, Slice base_entity, + const std::vector& operands, std::string* value, + PinnableWideColumns* columns, Logger* logger, Statistics* statistics, + SystemClock* clock, Slice* result_operand = nullptr, + bool update_num_ops_stats = false); + // During compaction, merge entries until we hit // - a corrupted key // - a Put/Delete, diff --git a/db/version_set.cc b/db/version_set.cc index c27933deb..4a2043263 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2366,10 +2366,6 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, "Encounter unexpected blob index. Please open DB with " "ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); return; - case GetContext::kUnexpectedWideColumnEntity: - *status = - Status::NotSupported("Encountered unexpected wide-column entity"); - return; } f = fp.GetNextFile(); } diff --git a/db/version_set_sync_and_async.h b/db/version_set_sync_and_async.h index 86f0c2077..755585990 100644 --- a/db/version_set_sync_and_async.h +++ b/db/version_set_sync_and_async.h @@ -141,11 +141,6 @@ DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST) "ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); file_range.MarkKeyDone(iter); continue; - case GetContext::kUnexpectedWideColumnEntity: - *status = - Status::NotSupported("Encountered unexpected wide-column entity"); - file_range.MarkKeyDone(iter); - continue; } } diff --git a/db/wide/db_wide_basic_test.cc b/db/wide/db_wide_basic_test.cc index c18b1bef9..cb3ec1af0 100644 --- a/db/wide/db_wide_basic_test.cc +++ b/db/wide/db_wide_basic_test.cc @@ -334,14 +334,19 @@ TEST_F(DBWideBasicTest, MergePlainKeyValue) { } }; - // Base KVs (if any) and Merge operands both in memtable - write_base(); - write_merge(); - verify(); - - // Base KVs (if any) and Merge operands both in storage - ASSERT_OK(Flush()); - verify(); + { + // Base KVs (if any) and Merge operands both in memtable (note: we take a + // snapshot in between to make sure they do not get reconciled during the + // subsequent flush) + write_base(); + ManagedSnapshot snapshot(db_); + write_merge(); + verify(); + + // Base KVs (if any) and Merge operands both in storage + ASSERT_OK(Flush()); + verify(); + } // Base KVs (if any) in storage, Merge operands in memtable DestroyAndReopen(options); @@ -351,28 +356,126 @@ TEST_F(DBWideBasicTest, MergePlainKeyValue) { verify(); } -TEST_F(DBWideBasicTest, PutEntityMergeNotSupported) { +TEST_F(DBWideBasicTest, MergeEntity) { Options options = GetDefaultOptions(); - options.merge_operator = MergeOperators::CreateStringAppendOperator(); + options.create_if_missing = true; + + const std::string delim("|"); + options.merge_operator = MergeOperators::CreateStringAppendOperator(delim); + Reopen(options); + // Test Merge with two entities: one that has the default column and one that + // doesn't constexpr char first_key[] = "first"; + WideColumns first_columns{{kDefaultWideColumnName, "a"}, + {"attr_name1", "foo"}, + {"attr_name2", "bar"}}; + constexpr char first_merge_operand[] = "bla1"; + constexpr char second_key[] = "second"; + WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; + constexpr char second_merge_operand[] = "bla2"; + + auto write_base = [&]() { + // Use the DB::PutEntity API + ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), + first_key, first_columns)); + + // Use WriteBatch + WriteBatch batch; + ASSERT_OK(batch.PutEntity(db_->DefaultColumnFamily(), second_key, + second_columns)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + }; + + auto write_merge = [&]() { + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key, + first_merge_operand)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key, + second_merge_operand)); + }; - // Note: Merge is currently not supported for wide-column entities auto verify = [&]() { + const std::string first_expected_default( + first_columns[0].value().ToString() + delim + first_merge_operand); + { PinnableSlice result; - ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key, - &result) - .IsNotSupported()); + ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key, + &result)); + ASSERT_EQ(result, first_expected_default); } + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &result)); + + WideColumns expected_columns{ + {kDefaultWideColumnName, first_expected_default}, + first_columns[1], + first_columns[2]}; + + ASSERT_EQ(result.columns(), expected_columns); + } + + { + constexpr size_t num_merge_operands = 2; + + std::array merge_operands; + + GetMergeOperandsOptions get_merge_opts; + get_merge_opts.expected_max_number_of_operands = num_merge_operands; + + int number_of_operands = 0; + + ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &merge_operands[0], + &get_merge_opts, &number_of_operands)); + ASSERT_EQ(number_of_operands, num_merge_operands); + ASSERT_EQ(merge_operands[0], first_columns[0].value()); + ASSERT_EQ(merge_operands[1], first_merge_operand); + } + + const std::string second_expected_default(delim + second_merge_operand); + { PinnableSlice result; - ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), - second_key, &result) - .IsNotSupported()); + ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key, + &result)); + ASSERT_EQ(result, second_expected_default); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &result)); + + WideColumns expected_columns{ + {kDefaultWideColumnName, second_expected_default}, + second_columns[0], + second_columns[1]}; + + ASSERT_EQ(result.columns(), expected_columns); + } + + { + constexpr size_t num_merge_operands = 2; + + std::array merge_operands; + + GetMergeOperandsOptions get_merge_opts; + get_merge_opts.expected_max_number_of_operands = num_merge_operands; + + int number_of_operands = 0; + + ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &merge_operands[0], + &get_merge_opts, &number_of_operands)); + ASSERT_EQ(number_of_operands, num_merge_operands); + ASSERT_TRUE(merge_operands[0].empty()); + ASSERT_EQ(merge_operands[1], second_merge_operand); } { @@ -385,13 +488,15 @@ TEST_F(DBWideBasicTest, PutEntityMergeNotSupported) { db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0], &values[0], &statuses[0]); - ASSERT_TRUE(values[0].empty()); - ASSERT_TRUE(statuses[0].IsNotSupported()); + ASSERT_EQ(values[0], first_expected_default); + ASSERT_OK(statuses[0]); - ASSERT_TRUE(values[1].empty()); - ASSERT_TRUE(statuses[1].IsNotSupported()); + ASSERT_EQ(values[1], second_expected_default); + ASSERT_OK(statuses[1]); } + // Note: Merge is currently not supported for wide-column entities in + // iterator { std::unique_ptr iter(db_->NewIterator(ReadOptions())); @@ -405,47 +510,25 @@ TEST_F(DBWideBasicTest, PutEntityMergeNotSupported) { } }; - // Use the DB::PutEntity API - WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}}; - - ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), - first_key, first_columns)); - - // Use WriteBatch - WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; - - WriteBatch batch; - ASSERT_OK( - batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns)); - ASSERT_OK(db_->Write(WriteOptions(), &batch)); - - ASSERT_OK(Flush()); - - // Add a couple of merge operands - constexpr char merge_operand[] = "bla"; - - ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key, - merge_operand)); - ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key, - merge_operand)); - - // Try reading when PutEntity is in storage, Merge is in memtable - verify(); - - // Try reading when PutEntity and Merge are both in storage + { + // Base KVs and Merge operands both in memtable (note: we take a snapshot in + // between to make sure they do not get reconciled during the subsequent + // flush) + write_base(); + ManagedSnapshot snapshot(db_); + write_merge(); + verify(); + + // Base KVs and Merge operands both in storage + ASSERT_OK(Flush()); + verify(); + } + + // Base KVs in storage, Merge operands in memtable + DestroyAndReopen(options); + write_base(); ASSERT_OK(Flush()); - - verify(); - - // Try reading when PutEntity and Merge are both in memtable - ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), - first_key, first_columns)); - ASSERT_OK(db_->Write(WriteOptions(), &batch)); - ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key, - merge_operand)); - ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key, - merge_operand)); - + write_merge(); verify(); } diff --git a/table/get_context.cc b/table/get_context.cc index dc72a79fb..41825d89e 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -351,9 +351,17 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, Slice blob_value(pin_val); push_operand(blob_value, nullptr); } else if (type == kTypeWideColumnEntity) { - // TODO: support wide-column entities - state_ = kUnexpectedWideColumnEntity; - return false; + Slice value_copy = value; + Slice value_of_default; + + if (!WideColumnSerialization::GetValueOfDefaultColumn( + value_copy, value_of_default) + .ok()) { + state_ = kCorrupt; + return false; + } + + push_operand(value_of_default, value_pinner); } else { assert(type == kTypeValue); push_operand(value, value_pinner); @@ -377,9 +385,26 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, push_operand(blob_value, nullptr); } } else if (type == kTypeWideColumnEntity) { - // TODO: support wide-column entities - state_ = kUnexpectedWideColumnEntity; - return false; + state_ = kFound; + + if (do_merge_) { + MergeWithEntity(value); + } else { + // It means this function is called as part of DB GetMergeOperands + // API and the current value should be part of + // merge_context_->operand_list + Slice value_copy = value; + Slice value_of_default; + + if (!WideColumnSerialization::GetValueOfDefaultColumn( + value_copy, value_of_default) + .ok()) { + state_ = kCorrupt; + return false; + } + + push_operand(value_of_default, value_pinner); + } } else { assert(type == kTypeValue); @@ -457,6 +482,24 @@ void GetContext::Merge(const Slice* value) { } } +void GetContext::MergeWithEntity(Slice entity) { + assert(do_merge_); + assert(!pinnable_val_ || !columns_); + + const Status s = MergeHelper::TimedFullMergeWithEntity( + merge_operator_, user_key_, entity, merge_context_->GetOperands(), + pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_, + statistics_, clock_); + if (!s.ok()) { + state_ = kCorrupt; + return; + } + + if (LIKELY(pinnable_val_ != nullptr)) { + pinnable_val_->PinSelf(); + } +} + bool GetContext::GetBlobValue(const Slice& blob_index, PinnableSlice* blob_value) { constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; diff --git a/table/get_context.h b/table/get_context.h index 57f8b7eea..dcc7ab8d6 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -75,8 +75,6 @@ class GetContext { kCorrupt, kMerge, // saver contains the current merge result (the operands) kUnexpectedBlobIndex, - // TODO: remove once wide-column entities are supported by Get/MultiGet - kUnexpectedWideColumnEntity, }; GetContextStats get_context_stats_; @@ -185,6 +183,7 @@ class GetContext { private: void Merge(const Slice* value); + void MergeWithEntity(Slice entity); bool GetBlobValue(const Slice& blob_index, PinnableSlice* blob_value); const Comparator* ucmp_;