diff --git a/db/builder.cc b/db/builder.cc index 08e76b539..ce85ae589 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -73,6 +73,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, MergeHelper merge(internal_comparator.user_comparator(), options.merge_operator.get(), options.info_log.get(), + options.min_partial_merge_operands, true /* internal key corruption is not ok */); if (purge) { diff --git a/db/c.cc b/db/c.cc index e4946f351..9084c4a9a 100644 --- a/db/c.cc +++ b/db/c.cc @@ -159,12 +159,10 @@ struct rocksdb_mergeoperator_t : public MergeOperator { const char* const* operands_list, const size_t* operands_list_length, int num_operands, unsigned char* success, size_t* new_value_length); - char* (*partial_merge_)( - void*, - const char* key, size_t key_length, - const char* left_operand, size_t left_operand_length, - const char* right_operand, size_t right_operand_length, - unsigned char* success, size_t* new_value_length); + char* (*partial_merge_)(void*, const char* key, size_t key_length, + const char* const* operands_list, + const size_t* operands_list_length, int num_operands, + unsigned char* success, size_t* new_value_length); void (*delete_value_)( void*, const char* value, size_t value_length); @@ -219,21 +217,23 @@ struct rocksdb_mergeoperator_t : public MergeOperator { return success; } - virtual bool PartialMerge( - const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const { + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const { + size_t operand_count = operand_list.size(); + std::vector operand_pointers(operand_count); + std::vector operand_sizes(operand_count); + for (size_t i = 0; i < operand_count; ++i) { + Slice operand(operand_list[i]); + operand_pointers[i] = operand.data(); + operand_sizes[i] = operand.size(); + } unsigned char success; size_t new_value_len; char* tmp_new_value = (*partial_merge_)( - state_, - key.data(), key.size(), - left_operand.data(), left_operand.size(), - right_operand.data(), right_operand.size(), - &success, &new_value_len); + state_, key.data(), key.size(), &operand_pointers[0], &operand_sizes[0], + operand_count, &success, &new_value_len); new_value->assign(tmp_new_value, new_value_len); if (delete_value_ != nullptr) { @@ -1094,24 +1094,18 @@ rocksdb_filterpolicy_t* rocksdb_filterpolicy_create_bloom(int bits_per_key) { } rocksdb_mergeoperator_t* rocksdb_mergeoperator_create( - void* state, - void (*destructor)(void*), - char* (*full_merge)( - void*, - const char* key, size_t key_length, - const char* existing_value, size_t existing_value_length, - const char* const* operands_list, const size_t* operands_list_length, - int num_operands, - unsigned char* success, size_t* new_value_length), - char* (*partial_merge)( - void*, - const char* key, size_t key_length, - const char* left_operand, size_t left_operand_length, - const char* right_operand, size_t right_operand_length, - unsigned char* success, size_t* new_value_length), - void (*delete_value)( - void*, - const char* value, size_t value_length), + void* state, void (*destructor)(void*), + char* (*full_merge)(void*, const char* key, size_t key_length, + const char* existing_value, + size_t existing_value_length, + const char* const* operands_list, + const size_t* operands_list_length, int num_operands, + unsigned char* success, size_t* new_value_length), + char* (*partial_merge)(void*, const char* key, size_t key_length, + const char* const* operands_list, + const size_t* operands_list_length, int num_operands, + unsigned char* success, size_t* new_value_length), + void (*delete_value)(void*, const char* value, size_t value_length), const char* (*name)(void*)) { rocksdb_mergeoperator_t* result = new rocksdb_mergeoperator_t; result->state_ = state; diff --git a/db/c_test.c b/db/c_test.c index a68abca48..f17e37128 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -175,8 +175,8 @@ static char* MergeOperatorFullMerge( static char* MergeOperatorPartialMerge( void* arg, const char* key, size_t key_length, - const char* left_operand, size_t left_operand_length, - const char* right_operand, size_t right_operand_length, + const char* const* operands_list, const size_t* operands_list_length, + int num_operands, unsigned char* success, size_t* new_value_length) { *new_value_length = 4; *success = 1; diff --git a/db/db_impl.cc b/db/db_impl.cc index 5b79807f5..b8dc9cb4b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2463,6 +2463,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, std::vector delete_key; // for compaction filter MergeHelper merge(user_comparator(), options_.merge_operator.get(), options_.info_log.get(), + options_.min_partial_merge_operands, false /* internal key corruption is expected */); auto compaction_filter = options_.compaction_filter; std::unique_ptr compaction_filter_from_factory = nullptr; diff --git a/db/db_iter.cc b/db/db_iter.cc index e7491f7e3..bbf8a8115 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -313,21 +313,6 @@ void DBIter::MergeValuesNewToOld() { // when complete, add result to operands and continue. const Slice& value = iter_->value(); operands.push_front(value.ToString()); - while(operands.size() >= 2) { - // Call user associative-merge until it returns false - if (user_merge_operator_->PartialMerge(ikey.user_key, - Slice(operands[0]), - Slice(operands[1]), - &merge_result, - logger_)) { - operands.pop_front(); - swap(operands.front(), merge_result); - } else { - // Associative merge returns false ==> stack the operands - break; - } - } - } } diff --git a/db/memtable.cc b/db/memtable.cc index b787ec24e..5fefab04b 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -36,7 +36,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options) kWriteBufferSize(options.write_buffer_size), arena_(options.arena_block_size), table_(options.memtable_factory->CreateMemTableRep( - comparator_, &arena_, options.prefix_extractor.get())), + comparator_, &arena_, options.prefix_extractor.get())), flush_in_progress_(false), flush_completed_(false), file_number_(0), @@ -353,17 +353,6 @@ static bool SaveValue(void* arg, const char* entry) { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); *(s->merge_in_progress) = true; merge_context->PushOperand(v); - while (merge_context->GetNumOperands() >= 2) { - // Attempt to associative merge. (Returns true if successful) - if (merge_operator->PartialMerge( - s->key->user_key(), merge_context->GetOperand(0), - merge_context->GetOperand(1), &merge_result, s->logger)) { - merge_context->PushPartialMergeResult(merge_result); - } else { - // Stack them because user can't associative merge - break; - } - } return true; } default: diff --git a/db/merge_context.h b/db/merge_context.h index 91d9f8a01..bf483a827 100644 --- a/db/merge_context.h +++ b/db/merge_context.h @@ -25,12 +25,12 @@ public: operand_list->clear(); } } - // Replace the first two operands of merge_result, which are expected be the - // merge results of them. + // Replace all operands with merge_result, which are expected to be the + // merge result of them. void PushPartialMergeResult(std::string& merge_result) { assert (operand_list); - operand_list->pop_front(); - swap(operand_list->front(), merge_result); + operand_list->clear(); + operand_list->push_front(std::move(merge_result)); } // Push a merge operand void PushOperand(const Slice& operand_slice) { diff --git a/db/merge_helper.cc b/db/merge_helper.cc index e3f3adb1f..cc1dac6c1 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -129,28 +129,10 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // => then continue because we haven't yet seen a Put/Delete. assert(!operands_.empty()); // Should have at least one element in it + // keep queuing keys and operands until we either meet a put / delete + // request or later did a partial merge. keys_.push_front(iter->key().ToString()); operands_.push_front(iter->value().ToString()); - while (operands_.size() >= 2) { - // Returns false when the merge_operator can no longer process it - if (user_merge_operator_->PartialMerge(ikey.user_key, - Slice(operands_[0]), - Slice(operands_[1]), - &merge_result, - logger_)) { - // Merging of operands (associative merge) was successful. - // Replace these frontmost two operands with the merge result - keys_.pop_front(); - operands_.pop_front(); - swap(operands_.front(), merge_result); - } else { - // Merging of operands (associative merge) returned false. - // The user merge_operator does not know how to merge these operands. - // So we just stack them up until we find a Put/Delete or end of key. - break; - } - } - continue; } } @@ -192,6 +174,23 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, RecordTick(stats, NUMBER_MERGE_FAILURES); // Do nothing if not success_. Leave keys() and operands() as they are. } + } else { + // We haven't seen the beginning of the key nor a Put/Delete. + // Attempt to use the user's associative merge function to + // merge the stacked merge operands into a single operand. + + if (operands_.size() >= 2 && + operands_.size() >= min_partial_merge_operands_ && + user_merge_operator_->PartialMergeMulti( + ikey.user_key, + std::deque(operands_.begin(), operands_.end()), + &merge_result, logger_)) { + // Merging of operands (associative merge) was successful. + // Replace operands with the merge result + operands_.clear(); + operands_.push_front(std::move(merge_result)); + keys_.erase(keys_.begin(), keys_.end() - 1); + } } } diff --git a/db/merge_helper.h b/db/merge_helper.h index 6fe9bfb23..5311555a0 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -22,12 +22,13 @@ class Statistics; class MergeHelper { public: MergeHelper(const Comparator* user_comparator, - const MergeOperator* user_merge_operator, - Logger* logger, + const MergeOperator* user_merge_operator, Logger* logger, + unsigned min_partial_merge_operands, bool assert_valid_internal_key) : user_comparator_(user_comparator), user_merge_operator_(user_merge_operator), logger_(logger), + min_partial_merge_operands_(min_partial_merge_operands), assert_valid_internal_key_(assert_valid_internal_key), keys_(), operands_(), @@ -88,6 +89,7 @@ class MergeHelper { const Comparator* user_comparator_; const MergeOperator* user_merge_operator_; Logger* logger_; + unsigned min_partial_merge_operands_; bool assert_valid_internal_key_; // enforce no internal key corruption? // the scratch area that holds the result of MergeUntil diff --git a/db/merge_operator.cc b/db/merge_operator.cc index 7d1ee4e5f..43a8df371 100644 --- a/db/merge_operator.cc +++ b/db/merge_operator.cc @@ -11,6 +11,28 @@ namespace rocksdb { +// The default implementation of PartialMergeMulti, which invokes +// PartialMerge multiple times internally and merges two operands at +// a time. +bool MergeOperator::PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const { + // Simply loop through the operands + std::string temp_value; + Slice temp_slice; + for (const auto& operand : operand_list) { + if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) { + return false; + } + swap(temp_value, *new_value); + temp_slice = Slice(*new_value); + } + + // The result will be in *new_value. All merges succeeded. + return true; +} + // Given a "real" merge from the library, call the user's // associative merge function one-by-one on each of the operands. // NOTE: It is assumed that the client's merge-operator will handle any errors. @@ -46,7 +68,6 @@ bool AssociativeMergeOperator::PartialMerge( const Slice& right_operand, std::string* new_value, Logger* logger) const { - return Merge(key, &left_operand, right_operand, new_value, logger); } diff --git a/db/merge_test.cc b/db/merge_test.cc index 887d8ad42..4b98f0581 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -24,10 +24,14 @@ using namespace rocksdb; namespace { int numMergeOperatorCalls; - void resetNumMergeOperatorCalls() { numMergeOperatorCalls = 0; } + + int num_partial_merge_calls; + void resetNumPartialMergeCalls() { + num_partial_merge_calls = 0; + } } class CountMergeOperator : public AssociativeMergeOperator { @@ -42,6 +46,11 @@ class CountMergeOperator : public AssociativeMergeOperator { std::string* new_value, Logger* logger) const override { ++numMergeOperatorCalls; + if (existing_value == nullptr) { + new_value->assign(value.data(), value.size()); + return true; + } + return mergeOperator_->PartialMerge( key, *existing_value, @@ -50,6 +59,14 @@ class CountMergeOperator : public AssociativeMergeOperator { logger); } + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const { + ++num_partial_merge_calls; + return mergeOperator_->PartialMergeMulti(key, operand_list, new_value, + logger); + } + virtual const char* Name() const override { return "UInt64AddOperator"; } @@ -58,16 +75,16 @@ class CountMergeOperator : public AssociativeMergeOperator { std::shared_ptr mergeOperator_; }; -std::shared_ptr OpenDb( - const string& dbname, - const bool ttl = false, - const unsigned max_successive_merges = 0) { +std::shared_ptr OpenDb(const string& dbname, const bool ttl = false, + const size_t max_successive_merges = 0, + const uint32_t min_partial_merge_operands = 2) { DB* db; StackableDB* sdb; Options options; options.create_if_missing = true; options.merge_operator = std::make_shared(); options.max_successive_merges = max_successive_merges; + options.min_partial_merge_operands = min_partial_merge_operands; Status s; DestroyDB(dbname, Options()); if (ttl) { @@ -306,6 +323,44 @@ void testSuccessiveMerge( } } +void testPartialMerge(Counters* counters, DB* db, int max_merge, int min_merge, + int count) { + FlushOptions o; + o.wait = true; + + // Test case 1: partial merge should be called when the number of merge + // operands exceeds the threshold. + uint64_t tmp_sum = 0; + resetNumPartialMergeCalls(); + for (int i = 1; i <= count; i++) { + counters->assert_add("b", i); + tmp_sum += i; + } + db->Flush(o); + db->CompactRange(nullptr, nullptr); + ASSERT_EQ(tmp_sum, counters->assert_get("b")); + if (count > max_merge) { + // in this case, FullMerge should be called instead. + ASSERT_EQ(num_partial_merge_calls, 0); + } else { + // if count >= min_merge, then partial merge should be called once. + ASSERT_EQ((count >= min_merge), (num_partial_merge_calls == 1)); + } + + // Test case 2: partial merge should not be called when a put is found. + resetNumPartialMergeCalls(); + tmp_sum = 0; + db->Put(rocksdb::WriteOptions(), "c", "10"); + for (int i = 1; i <= count; i++) { + counters->assert_add("c", i); + tmp_sum += i; + } + db->Flush(o); + db->CompactRange(nullptr, nullptr); + ASSERT_EQ(tmp_sum, counters->assert_get("c")); + ASSERT_EQ(num_partial_merge_calls, 0); +} + void testSingleBatchSuccessiveMerge( DB* db, int max_num_merges, @@ -370,20 +425,40 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) { { cout << "Test merge in memtable... \n"; - unsigned maxMerge = 5; - auto db = OpenDb(dbname, use_ttl, maxMerge); + size_t max_merge = 5; + auto db = OpenDb(dbname, use_ttl, max_merge); MergeBasedCounters counters(db, 0); testCounters(counters, db.get(), compact); - testSuccessiveMerge(counters, maxMerge, maxMerge * 2); + testSuccessiveMerge(counters, max_merge, max_merge * 2); testSingleBatchSuccessiveMerge(db.get(), 5, 7); DestroyDB(dbname, Options()); } + { + cout << "Test Partial-Merge\n"; + size_t max_merge = 100; + for (uint32_t min_merge = 5; min_merge < 25; min_merge += 5) { + for (uint32_t count = min_merge - 1; count <= min_merge + 1; count++) { + auto db = OpenDb(dbname, use_ttl, max_merge, min_merge); + MergeBasedCounters counters(db, 0); + testPartialMerge(&counters, db.get(), max_merge, min_merge, count); + DestroyDB(dbname, Options()); + } + { + auto db = OpenDb(dbname, use_ttl, max_merge, min_merge); + MergeBasedCounters counters(db, 0); + testPartialMerge(&counters, db.get(), max_merge, min_merge, + min_merge * 10); + DestroyDB(dbname, Options()); + } + } + } } int main(int argc, char *argv[]) { //TODO: Make this test like a general rocksdb unit-test runTest(argc, test::TmpDir() + "/merge_testdb"); runTest(argc, test::TmpDir() + "/merge_testdbttl", true); // Run test on TTL database + printf("Passed all tests!\n"); return 0; } diff --git a/db/version_set.cc b/db/version_set.cc index 70da5b9d5..3d9b0f128 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -419,18 +419,7 @@ static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key, assert(s->state == kNotFound || s->state == kMerge); s->state = kMerge; merge_contex->PushOperand(v); - while (merge_contex->GetNumOperands() >= 2) { - // Attempt to merge operands together via user associateive merge - if (s->merge_operator->PartialMerge( - s->user_key, merge_contex->GetOperand(0), - merge_contex->GetOperand(1), &merge_result, s->logger)) { - merge_contex->PushPartialMergeResult(merge_result); - } else { - // Associative merge returns false ==> stack the operands - break; - } - } - return true; + return true; default: assert(false); diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 62be94fe4..23c63f24f 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -425,8 +425,8 @@ extern rocksdb_mergeoperator_t* rocksdb_mergeoperator_create( char* (*partial_merge)( void*, const char* key, size_t key_length, - const char* left_operand, size_t left_operand_length, - const char* right_operand, size_t right_operand_length, + const char* const* operands_list, const size_t* operands_list_length, + int num_operands, unsigned char* success, size_t* new_value_length), void (*delete_value)( void*, diff --git a/include/rocksdb/merge_operator.h b/include/rocksdb/merge_operator.h index bd4c36c07..2ae64c1bc 100644 --- a/include/rocksdb/merge_operator.h +++ b/include/rocksdb/merge_operator.h @@ -32,9 +32,9 @@ class Logger; // // b) MergeOperator - the generic class for all the more abstract / complex // operations; one method (FullMerge) to merge a Put/Delete value with a -// merge operand; and another method (PartialMerge) that merges two -// operands together. this is especially useful if your key values have a -// complex structure but you would still like to support client-specific +// merge operand; and another method (PartialMerge) that merges multiple +// operands together. this is especially useful if your key values have +// complex structures but you would still like to support client-specific // incremental updates. // // AssociativeMergeOperator is simpler to implement. MergeOperator is simply @@ -80,6 +80,13 @@ class MergeOperator { // DB::Merge(key, *new_value) would yield the same result as a call // to DB::Merge(key, left_op) followed by DB::Merge(key, right_op). // + // The default implementation of PartialMergeMulti will use this function + // as a helper, for backward compatibility. Any successor class of + // MergeOperator should either implement PartialMerge or PartialMergeMulti, + // although implementing PartialMergeMulti is suggested as it is in general + // more effective to merge multiple operands at a time instead of two + // operands at a time. + // // If it is impossible or infeasible to combine the two operations, // leave new_value unchanged and return false. The library will // internally keep track of the operations, and apply them in the @@ -89,12 +96,38 @@ class MergeOperator { // and simply "return false". For now, the client should simply return // false in any case it cannot perform partial-merge, regardless of reason. // If there is corruption in the data, handle it in the FullMerge() function, - // and return false there. - virtual bool PartialMerge(const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const = 0; + // and return false there. The default implementation of PartialMerge will + // always return false. + virtual bool PartialMerge(const Slice& key, const Slice& left_operand, + const Slice& right_operand, std::string* new_value, + Logger* logger) const { + return false; + } + + // This function performs merge when all the operands are themselves merge + // operation types that you would have passed to a DB::Merge() call in the + // same order (front() first) + // (i.e. DB::Merge(key, operand_list[0]), followed by + // DB::Merge(key, operand_list[1]), ...) + // + // PartialMergeMulti should combine them into a single merge operation that is + // saved into *new_value, and then it should return true. *new_value should + // be constructed such that a call to DB::Merge(key, *new_value) would yield + // the same result as subquential individual calls to DB::Merge(key, operand) + // for each operand in operand_list from front() to back(). + // + // The PartialMergeMulti function will be called only when the list of + // operands are long enough. The minimum amount of operands that will be + // passed to the function are specified by the "min_partial_merge_operands" + // option. + // + // In the default implementation, PartialMergeMulti will invoke PartialMerge + // multiple times, where each time it only merges two operands. Developers + // should either implement PartialMergeMulti, or implement PartialMerge which + // is served as the helper function of the default PartialMergeMulti. + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const; // The name of the MergeOperator. Used to check for MergeOperator // mismatches (i.e., a DB created with one MergeOperator is diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index bb676f985..0e5ff577e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -723,6 +723,15 @@ struct Options { // Default: 0 (disabled) size_t max_successive_merges; + // The number of partial merge operands to accumulate before partial + // merge will be performed. Partial merge will not be called + // if the list of values to merge is less than min_partial_merge_operands. + // + // If min_partial_merge_operands < 2, then it will be treated as 2. + // + // Default: 2 + uint32_t min_partial_merge_operands; + // Allow RocksDB to use thread local storage to optimize performance. // Default: true bool allow_thread_local; diff --git a/util/options.cc b/util/options.cc index 17ef3da9f..007584b94 100644 --- a/util/options.cc +++ b/util/options.cc @@ -30,16 +30,15 @@ Options::Options() : comparator(BytewiseComparator()), merge_operator(nullptr), compaction_filter(nullptr), - compaction_filter_factory( - std::shared_ptr( - new DefaultCompactionFilterFactory())), + compaction_filter_factory(std::shared_ptr( + new DefaultCompactionFilterFactory())), create_if_missing(false), error_if_exists(false), paranoid_checks(false), env(Env::Default()), info_log(nullptr), info_log_level(INFO), - write_buffer_size(4<<20), + write_buffer_size(4 << 20), max_write_buffer_number(2), min_write_buffer_number_to_merge(1), max_open_files(1000), @@ -95,7 +94,7 @@ Options::Options() is_fd_close_on_exec(true), skip_log_error_on_recovery(false), stats_dump_period_sec(3600), - block_size_deviation (10), + block_size_deviation(10), advise_random_on_open(true), access_hint_on_compaction_start(NORMAL), use_adaptive_mutex(false), @@ -106,13 +105,14 @@ Options::Options() max_sequential_skip_in_iterations(8), memtable_factory(std::shared_ptr(new SkipListFactory)), table_factory( - std::shared_ptr(new BlockBasedTableFactory())), + std::shared_ptr(new BlockBasedTableFactory())), inplace_update_support(false), inplace_update_num_locks(10000), inplace_callback(nullptr), memtable_prefix_bloom_bits(0), memtable_prefix_bloom_probes(6), max_successive_merges(0), + min_partial_merge_operands(2), allow_thread_local(true) { assert(memtable_factory.get() != nullptr); } @@ -306,6 +306,8 @@ Options::Dump(Logger* log) const inplace_update_support); Log(log, " Options.inplace_update_num_locks: %zd", inplace_update_num_locks); + Log(log, " Options.min_partial_merge_operands: %u", + min_partial_merge_operands); // TODO: easier config for bloom (maybe based on avg key/value size) Log(log, " Options.memtable_prefix_bloom_bits: %d", memtable_prefix_bloom_bits); diff --git a/utilities/merge_operators/put.cc b/utilities/merge_operators/put.cc index e77449d32..333084313 100644 --- a/utilities/merge_operators/put.cc +++ b/utilities/merge_operators/put.cc @@ -1,3 +1,8 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + #include #include "rocksdb/slice.h" #include "rocksdb/merge_operator.h" @@ -38,6 +43,15 @@ class PutOperator : public MergeOperator { return true; } + using MergeOperator::PartialMergeMulti; + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const + override { + new_value->assign(operand_list.back().data(), operand_list.back().size()); + return true; + } + virtual const char* Name() const override { return "PutOperator"; } diff --git a/utilities/merge_operators/string_append/stringappend2.cc b/utilities/merge_operators/string_append/stringappend2.cc index e153a388e..b2e03588f 100644 --- a/utilities/merge_operators/string_append/stringappend2.cc +++ b/utilities/merge_operators/string_append/stringappend2.cc @@ -6,6 +6,7 @@ #include "stringappend2.h" #include +#include #include #include "rocksdb/slice.h" @@ -61,31 +62,39 @@ bool StringAppendTESTOperator::FullMerge( return true; } -bool StringAppendTESTOperator::PartialMerge(const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const { +bool StringAppendTESTOperator::PartialMergeMulti( + const Slice& key, const std::deque& operand_list, + std::string* new_value, Logger* logger) const { return false; } // A version of PartialMerge that actually performs "partial merging". // Use this to simulate the exact behaviour of the StringAppendOperator. -bool StringAppendTESTOperator::_AssocPartialMerge(const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const { - // Clear the *new_value for writing. +bool StringAppendTESTOperator::_AssocPartialMergeMulti( + const Slice& key, const std::deque& operand_list, + std::string* new_value, Logger* logger) const { + // Clear the *new_value for writing assert(new_value); new_value->clear(); + assert(operand_list.size() >= 2); // Generic append - // Reserve correct size for *new_value, and apply concatenation. - new_value->reserve(left_operand.size() + 1 + right_operand.size()); - new_value->assign(left_operand.data(), left_operand.size()); - new_value->append(1,delim_); - new_value->append(right_operand.data(), right_operand.size()); + // Determine and reserve correct size for *new_value. + size_t size = 0; + for (const auto& operand : operand_list) { + size += operand.size(); + } + size += operand_list.size() - 1; // Delimiters + new_value->reserve(size); + + // Apply concatenation + new_value->assign(operand_list.front().data(), operand_list.front().size()); + + for (std::deque::const_iterator it = operand_list.begin() + 1; + it != operand_list.end(); ++it) { + new_value->append(1, delim_); + new_value->append(it->data(), it->size()); + } return true; } diff --git a/utilities/merge_operators/string_append/stringappend2.h b/utilities/merge_operators/string_append/stringappend2.h index 01a4be4db..5e506ef8f 100644 --- a/utilities/merge_operators/string_append/stringappend2.h +++ b/utilities/merge_operators/string_append/stringappend2.h @@ -11,6 +11,9 @@ */ #pragma once +#include +#include + #include "rocksdb/merge_operator.h" #include "rocksdb/slice.h" @@ -18,8 +21,8 @@ namespace rocksdb { class StringAppendTESTOperator : public MergeOperator { public: - - StringAppendTESTOperator(char delim_char); /// Constructor with delimiter + // Constructor with delimiter + explicit StringAppendTESTOperator(char delim_char); virtual bool FullMerge(const Slice& key, const Slice* existing_value, @@ -27,22 +30,19 @@ class StringAppendTESTOperator : public MergeOperator { std::string* new_value, Logger* logger) const override; - virtual bool PartialMerge(const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const override; + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const + override; virtual const char* Name() const override; private: // A version of PartialMerge that actually performs "partial merging". // Use this to simulate the exact behaviour of the StringAppendOperator. - bool _AssocPartialMerge(const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const; + bool _AssocPartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const; char delim_; // The delimiter is inserted between elements diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 2fdc664e2..519ae32c7 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -3,6 +3,9 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include +#include + #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/compaction_filter.h" @@ -268,24 +271,27 @@ class TtlMergeOperator : public MergeOperator { } } - virtual bool PartialMerge(const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const override { + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const + override { const uint32_t ts_len = DBWithTTL::kTSLength; + std::deque operands_without_ts; - if (left_operand.size() < ts_len || right_operand.size() < ts_len) { - Log(logger, "Error: Could not remove timestamp from value."); - return false; + for (const auto& operand : operand_list) { + if (operand.size() < ts_len) { + Log(logger, "Error: Could not remove timestamp from value."); + return false; + } + + operands_without_ts.push_back( + Slice(operand.data(), operand.size() - ts_len)); } // Apply the user partial-merge operator (store result in *new_value) assert(new_value); - Slice left_without_ts(left_operand.data(), left_operand.size() - ts_len); - Slice right_without_ts(right_operand.data(), right_operand.size() - ts_len); - if (!user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts, - new_value, logger)) { + if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value, + logger)) { return false; }