From 68a8e6b8fa67df745221e23ea5af12393f35ce77 Mon Sep 17 00:00:00 2001 From: Islam AbdelRahman Date: Wed, 20 Jul 2016 09:49:03 -0700 Subject: [PATCH] Introduce FullMergeV2 (eliminate memcpy from merge operators) Summary: This diff update the code to pin the merge operator operands while the merge operation is done, so that we can eliminate the memcpy cost, to do that we need a new public API for FullMerge that replace the std::deque with std::vector This diff is stacked on top of D56493 and D56511 In this diff we - Update FullMergeV2 arguments to be encapsulated in MergeOperationInput and MergeOperationOutput which will make it easier to add new arguments in the future - Replace std::deque with std::vector to pass operands - Replace MergeContext std::deque with std::vector (based on a simple benchmark I ran https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187) - Allow FullMergeV2 output to be an existing operand ``` [Everything in Memtable | 10K operands | 10 KB each | 1 operand per key] DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000 [FullMergeV2] readseq : 0.607 micros/op 1648235 ops/sec; 16121.2 MB/s readseq : 0.478 micros/op 2091546 ops/sec; 20457.2 MB/s readseq : 0.252 micros/op 3972081 ops/sec; 38850.5 MB/s readseq : 0.237 micros/op 4218328 ops/sec; 41259.0 MB/s readseq : 0.247 micros/op 4043927 ops/sec; 39553.2 MB/s [master] readseq : 3.935 micros/op 254140 ops/sec; 2485.7 MB/s readseq : 3.722 micros/op 268657 ops/sec; 2627.7 MB/s readseq : 3.149 micros/op 317605 ops/sec; 3106.5 MB/s readseq : 3.125 micros/op 320024 ops/sec; 3130.1 MB/s readseq : 4.075 micros/op 245374 ops/sec; 2400.0 MB/s ``` ``` [Everything in Memtable | 10K operands | 10 KB each | 10 operand per key] DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000 [FullMergeV2] readseq : 3.472 micros/op 288018 ops/sec; 2817.1 MB/s readseq : 2.304 micros/op 434027 ops/sec; 4245.2 MB/s readseq : 1.163 micros/op 859845 ops/sec; 8410.0 MB/s readseq : 1.192 micros/op 838926 ops/sec; 8205.4 MB/s readseq : 1.250 micros/op 800000 ops/sec; 7824.7 MB/s [master] readseq : 24.025 micros/op 41623 ops/sec; 407.1 MB/s readseq : 18.489 micros/op 54086 ops/sec; 529.0 MB/s readseq : 18.693 micros/op 53495 ops/sec; 523.2 MB/s readseq : 23.621 micros/op 42335 ops/sec; 414.1 MB/s readseq : 18.775 micros/op 53262 ops/sec; 521.0 MB/s ``` ``` [Everything in Block cache | 10K operands | 10 KB each | 1 operand per key] [FullMergeV2] $ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions readseq : 14.741 micros/op 67837 ops/sec; 663.5 MB/s readseq : 1.029 micros/op 971446 ops/sec; 9501.6 MB/s readseq : 0.974 micros/op 1026229 ops/sec; 10037.4 MB/s readseq : 0.965 micros/op 1036080 ops/sec; 10133.8 MB/s readseq : 0.943 micros/op 1060657 ops/sec; 10374.2 MB/s [master] readseq : 16.735 micros/op 59755 ops/sec; 584.5 MB/s readseq : 3.029 micros/op 330151 ops/sec; 3229.2 MB/s readseq : 3.136 micros/op 318883 ops/sec; 3119.0 MB/s readseq : 3.065 micros/op 326245 ops/sec; 3191.0 MB/s readseq : 3.014 micros/op 331813 ops/sec; 3245.4 MB/s ``` ``` [Everything in Block cache | 10K operands | 10 KB each | 10 operand per key] DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions [FullMergeV2] readseq : 24.325 micros/op 41109 ops/sec; 402.1 MB/s readseq : 1.470 micros/op 680272 ops/sec; 6653.7 MB/s readseq : 1.231 micros/op 812347 ops/sec; 7945.5 MB/s readseq : 1.091 micros/op 916590 ops/sec; 8965.1 MB/s readseq : 1.109 micros/op 901713 ops/sec; 8819.6 MB/s [master] readseq : 27.257 micros/op 36687 ops/sec; 358.8 MB/s readseq : 4.443 micros/op 225073 ops/sec; 2201.4 MB/s readseq : 5.830 micros/op 171526 ops/sec; 1677.7 MB/s readseq : 4.173 micros/op 239635 ops/sec; 2343.8 MB/s readseq : 4.150 micros/op 240963 ops/sec; 2356.8 MB/s ``` Test Plan: COMPILE_WITH_ASAN=1 make check -j64 Reviewers: yhchiang, andrewkr, sdong Reviewed By: sdong Subscribers: lovro, andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D57075 --- db/c.cc | 24 +- db/compaction_iterator.cc | 10 + db/compaction_iterator.h | 6 + db/db_iter.cc | 50 ++-- db/db_iter_test.cc | 3 + db/db_test.cc | 9 +- db/db_test2.cc | 261 ++++++++++++++++++ db/db_test_util.cc | 34 +++ db/db_test_util.h | 2 + db/memtable.cc | 17 +- db/merge_context.h | 101 +++++-- db/merge_helper.cc | 60 ++-- db/merge_helper.h | 23 +- db/merge_operator.cc | 33 ++- db/pinned_iterators_manager.h | 4 +- db/version_set.cc | 10 +- db/write_batch.cc | 4 +- examples/compaction_filter_example.cc | 23 +- include/rocksdb/merge_operator.h | 56 +++- table/block.h | 2 + table/block_based_table_reader.cc | 46 ++- table/get_context.cc | 13 +- table/get_context.h | 11 +- table/internal_iterator.h | 5 + table/iterator_wrapper.h | 15 +- table/merger.cc | 6 + table/two_level_iterator.cc | 4 + tools/db_crashtest.py | 2 + tools/db_stress.cc | 9 +- util/testutil.h | 6 +- utilities/merge_operators.h | 3 + utilities/merge_operators/max.cc | 18 +- utilities/merge_operators/put.cc | 23 +- .../string_append/stringappend2.cc | 39 +-- .../string_append/stringappend2.h | 7 +- utilities/options/options_util_test.cc | 10 +- utilities/ttl/db_ttl_impl.h | 49 ++-- utilities/util_merge_operators_test.cc | 53 ++-- 38 files changed, 814 insertions(+), 237 deletions(-) diff --git a/db/c.cc b/db/c.cc index c446ade74..d722bb1f3 100644 --- a/db/c.cc +++ b/db/c.cc @@ -269,33 +269,31 @@ struct rocksdb_mergeoperator_t : public MergeOperator { virtual const char* Name() const override { return (*name_)(state_); } - virtual bool FullMerge(const Slice& key, const Slice* existing_value, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const override { - size_t n = operand_list.size(); + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + size_t n = merge_in.operand_list.size(); std::vector operand_pointers(n); std::vector operand_sizes(n); for (size_t i = 0; i < n; i++) { - Slice operand(operand_list[i]); + Slice operand(merge_in.operand_list[i]); operand_pointers[i] = operand.data(); operand_sizes[i] = operand.size(); } const char* existing_value_data = nullptr; size_t existing_value_len = 0; - if (existing_value != nullptr) { - existing_value_data = existing_value->data(); - existing_value_len = existing_value->size(); + if (merge_in.existing_value != nullptr) { + existing_value_data = merge_in.existing_value->data(); + existing_value_len = merge_in.existing_value->size(); } unsigned char success; size_t new_value_len; char* tmp_new_value = (*full_merge_)( - state_, key.data(), key.size(), existing_value_data, existing_value_len, - &operand_pointers[0], &operand_sizes[0], static_cast(n), &success, - &new_value_len); - new_value->assign(tmp_new_value, new_value_len); + state_, merge_in.key.data(), merge_in.key.size(), existing_value_data, + existing_value_len, &operand_pointers[0], &operand_sizes[0], + static_cast(n), &success, &new_value_len); + merge_out->new_value.assign(tmp_new_value, new_value_len); if (delete_value_ != nullptr) { (*delete_value_)(state_, tmp_new_value, new_value_len); diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index bb4c87649..aa160bee7 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -49,6 +49,12 @@ CompactionIterator::CompactionIterator( } else { ignore_snapshots_ = false; } + input_->SetPinnedItersMgr(&pinned_iters_mgr_); +} + +CompactionIterator::~CompactionIterator() { + // input_ Iteartor lifetime is longer than pinned_iters_mgr_ lifetime + input_->SetPinnedItersMgr(nullptr); } void CompactionIterator::ResetRecordCounts() { @@ -83,6 +89,8 @@ void CompactionIterator::Next() { ikey_.user_key = current_key_.GetUserKey(); valid_ = true; } else { + // We consumed all pinned merge operands, release pinned iterators + pinned_iters_mgr_.ReleasePinnedIterators(); // MergeHelper moves the iterator to the first record after the merged // records, so even though we reached the end of the merge output, we do // not want to advance the iterator. @@ -368,6 +376,7 @@ void CompactionIterator::NextFromInput() { return; } + pinned_iters_mgr_.StartPinning(); // We know the merge type entry is not hidden, otherwise we would // have hit (A) // We encapsulate the merge related state machine in a different @@ -395,6 +404,7 @@ void CompactionIterator::NextFromInput() { // batch consumed by the merge operator should not shadow any keys // coming after the merges has_current_user_key_ = false; + pinned_iters_mgr_.ReleasePinnedIterators(); } } else { valid_ = true; diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index 6d791d418..01f677b14 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -13,6 +13,7 @@ #include "db/compaction.h" #include "db/merge_helper.h" +#include "db/pinned_iterators_manager.h" #include "rocksdb/compaction_filter.h" #include "util/log_buffer.h" @@ -46,6 +47,8 @@ class CompactionIterator { const CompactionFilter* compaction_filter = nullptr, LogBuffer* log_buffer = nullptr); + ~CompactionIterator(); + void ResetRecordCounts(); // Seek to the beginning of the compaction iterator output. @@ -136,6 +139,9 @@ class CompactionIterator { bool clear_and_output_next_key_ = false; MergeOutputIterator merge_out_iter_; + // PinnedIteratorsManager used to pin input_ Iterator blocks while reading + // merge operands and then releasing them after consuming them. + PinnedIteratorsManager pinned_iters_mgr_; std::string compaction_filter_value_; // "level_ptrs" holds indices that remember which file of an associated // level we were last checking during the last call to compaction-> diff --git a/db/db_iter.cc b/db/db_iter.cc index 1f9353b54..5de3b406b 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -155,7 +155,9 @@ class DBIter: public Iterator { virtual Slice value() const override { assert(valid_); if (current_entry_is_merged_) { - return saved_value_; + // If pinned_value_ is set then the result of merge operator is one of + // the merge operands and we should return it. + return pinned_value_.data() ? pinned_value_ : saved_value_; } else if (direction_ == kReverse) { return pinned_value_; } else { @@ -286,9 +288,9 @@ inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { void DBIter::Next() { assert(valid_); + // Release temporarily pinned blocks from last operation + ReleaseTempPinnedData(); if (direction_ == kReverse) { - // We only pin blocks when doing kReverse - ReleaseTempPinnedData(); FindNextUserKey(); direction_ = kForward; if (!iter_->Valid()) { @@ -433,9 +435,12 @@ void DBIter::MergeValuesNewToOld() { return; } + // Temporarily pin the blocks that hold merge operands + TempPinData(); merge_context_.Clear(); // Start the merge process by pushing the first operand - merge_context_.PushOperand(iter_->value()); + merge_context_.PushOperand(iter_->value(), + iter_->IsValuePinned() /* operand_pinned */); ParsedInternalKey ikey; for (iter_->Next(); iter_->Valid(); iter_->Next()) { @@ -459,15 +464,15 @@ void DBIter::MergeValuesNewToOld() { const Slice val = iter_->value(); MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(), &saved_value_, - logger_, statistics_, env_); + logger_, statistics_, env_, &pinned_value_); // iter_ is positioned after put iter_->Next(); return; } else if (kTypeMerge == ikey.type) { // hit a merge, add the value as an operand and run associative merge. // when complete, add result to operands and continue. - const Slice& val = iter_->value(); - merge_context_.PushOperand(val); + merge_context_.PushOperand(iter_->value(), + iter_->IsValuePinned() /* operand_pinned */); } else { assert(false); } @@ -479,15 +484,15 @@ void DBIter::MergeValuesNewToOld() { // client can differentiate this scenario and do things accordingly. MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr, merge_context_.GetOperands(), &saved_value_, - logger_, statistics_, env_); + logger_, statistics_, env_, &pinned_value_); } void DBIter::Prev() { assert(valid_); + ReleaseTempPinnedData(); if (direction_ == kForward) { ReverseToBackward(); } - ReleaseTempPinnedData(); PrevInternal(); if (statistics_ != nullptr) { local_stats_.prev_count_++; @@ -580,6 +585,9 @@ bool DBIter::FindValueForCurrentKey() { ParsedInternalKey ikey; FindParseableKey(&ikey, kReverse); + // Temporarily pin blocks that hold (merge operands / the value) + ReleaseTempPinnedData(); + TempPinData(); size_t num_skipped = 0; while (iter_->Valid() && ikey.sequence <= sequence_ && user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) { @@ -592,8 +600,7 @@ bool DBIter::FindValueForCurrentKey() { switch (last_key_entry_type) { case kTypeValue: merge_context_.Clear(); - ReleaseTempPinnedData(); - TempPinData(); + assert(iter_->IsValuePinned()); pinned_value_ = iter_->value(); last_not_merge_type = kTypeValue; break; @@ -605,7 +612,8 @@ bool DBIter::FindValueForCurrentKey() { break; case kTypeMerge: assert(merge_operator_ != nullptr); - merge_context_.PushOperandBack(iter_->value()); + merge_context_.PushOperandBack( + iter_->value(), iter_->IsValuePinned() /* operand_pinned */); break; default: assert(false); @@ -628,13 +636,14 @@ bool DBIter::FindValueForCurrentKey() { if (last_not_merge_type == kTypeDeletion) { MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr, merge_context_.GetOperands(), - &saved_value_, logger_, statistics_, env_); + &saved_value_, logger_, statistics_, env_, + &pinned_value_); } else { assert(last_not_merge_type == kTypeValue); MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &pinned_value_, merge_context_.GetOperands(), &saved_value_, - logger_, statistics_, env_); + logger_, statistics_, env_, &pinned_value_); } break; case kTypeValue: @@ -651,6 +660,9 @@ bool DBIter::FindValueForCurrentKey() { // This function is used in FindValueForCurrentKey. // We use Seek() function instead of Prev() to find necessary value bool DBIter::FindValueForCurrentKeyUsingSeek() { + // FindValueForCurrentKey will enable pinning before calling + // FindValueForCurrentKeyUsingSeek() + assert(pinned_iters_mgr_.PinningEnabled()); std::string last_key; AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), sequence_, kValueTypeForSeek)); @@ -664,8 +676,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { if (ikey.type == kTypeValue || ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) { if (ikey.type == kTypeValue) { - ReleaseTempPinnedData(); - TempPinData(); + assert(iter_->IsValuePinned()); pinned_value_ = iter_->value(); valid_ = true; return true; @@ -681,7 +692,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { while (iter_->Valid() && user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) && ikey.type == kTypeMerge) { - merge_context_.PushOperand(iter_->value()); + merge_context_.PushOperand(iter_->value(), + iter_->IsValuePinned() /* operand_pinned */); iter_->Next(); FindParseableKey(&ikey, kForward); } @@ -691,7 +703,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) { MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr, merge_context_.GetOperands(), &saved_value_, - logger_, statistics_, env_); + logger_, statistics_, env_, &pinned_value_); // Make iter_ valid and point to saved_key_ if (!iter_->Valid() || !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) { @@ -705,7 +717,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { const Slice& val = iter_->value(); MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val, merge_context_.GetOperands(), &saved_value_, - logger_, statistics_, env_); + logger_, statistics_, env_, &pinned_value_); valid_ = true; return true; } diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index 3b3030110..30956e35c 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -150,6 +150,9 @@ class TestIterator : public InternalIterator { return Status::OK(); } + virtual bool IsKeyPinned() const override { return true; } + virtual bool IsValuePinned() const override { return true; } + private: bool initialized_; bool valid_; diff --git a/db/db_test.cc b/db/db_test.cc index 719d84fcb..ea7238181 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4848,12 +4848,11 @@ class DelayedMergeOperator : public MergeOperator { public: explicit DelayedMergeOperator(DBTest* d) : db_test_(d) {} - virtual bool FullMerge(const Slice& key, const Slice* existing_value, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const override { + + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { db_test_->env_->addon_time_.fetch_add(1000); - *new_value = ""; + merge_out->new_value = ""; return true; } diff --git a/db/db_test2.cc b/db/db_test2.cc index 306099ef8..4fe886fbe 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -1487,6 +1487,267 @@ TEST_F(DBTest2, SyncPointMarker) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } #endif + +class MergeOperatorPinningTest : public DBTest2, + public testing::WithParamInterface { + public: + MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); } + + bool disable_block_cache_; +}; + +INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest, + ::testing::Bool()); + +#ifndef ROCKSDB_LITE +TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) { + Options options = CurrentOptions(); + BlockBasedTableOptions table_options; + table_options.block_size = 1; // every block will contain one entry + table_options.no_block_cache = disable_block_cache_; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + options.merge_operator = MergeOperators::CreateStringAppendTESTOperator(); + options.level0_slowdown_writes_trigger = (1 << 30); + options.level0_stop_writes_trigger = (1 << 30); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + const int kKeysPerFile = 10; + const int kOperandsPerKeyPerFile = 7; + const int kOperandSize = 100; + // Filse to write in L0 before compacting to lower level + const int kFilesPerLevel = 3; + + Random rnd(301); + std::map true_data; + int batch_num = 1; + int lvl_to_fill = 4; + int key_id = 0; + while (true) { + for (int j = 0; j < kKeysPerFile; j++) { + std::string key = Key(key_id % 35); + key_id++; + for (int k = 0; k < kOperandsPerKeyPerFile; k++) { + std::string val = RandomString(&rnd, kOperandSize); + ASSERT_OK(db_->Merge(WriteOptions(), key, val)); + if (true_data[key].size() == 0) { + true_data[key] = val; + } else { + true_data[key] += "," + val; + } + } + } + + if (lvl_to_fill == -1) { + // Keep last batch in memtable and stop + break; + } + + ASSERT_OK(Flush()); + if (batch_num % kFilesPerLevel == 0) { + if (lvl_to_fill != 0) { + MoveFilesToLevel(lvl_to_fill); + } + lvl_to_fill--; + } + batch_num++; + } + + // 3 L0 files + // 1 L1 file + // 3 L2 files + // 1 L3 file + // 3 L4 Files + ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3"); + + // Verify Get() + for (auto kv : true_data) { + ASSERT_EQ(Get(kv.first), kv.second); + } + + Iterator* iter = db_->NewIterator(ReadOptions()); + + // Verify Iterator::Next() + auto data_iter = true_data.begin(); + for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { + ASSERT_EQ(iter->key().ToString(), data_iter->first); + ASSERT_EQ(iter->value().ToString(), data_iter->second); + } + ASSERT_EQ(data_iter, true_data.end()); + + // Verify Iterator::Prev() + auto data_rev = true_data.rbegin(); + for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) { + ASSERT_EQ(iter->key().ToString(), data_rev->first); + ASSERT_EQ(iter->value().ToString(), data_rev->second); + } + ASSERT_EQ(data_rev, true_data.rend()); + + // Verify Iterator::Seek() + for (auto kv : true_data) { + iter->Seek(kv.first); + ASSERT_EQ(kv.first, iter->key().ToString()); + ASSERT_EQ(kv.second, iter->value().ToString()); + } + + delete iter; +} + +TEST_P(MergeOperatorPinningTest, Randomized) { + do { + Options options = CurrentOptions(); + options.merge_operator = MergeOperators::CreateMaxOperator(); + BlockBasedTableOptions table_options; + table_options.no_block_cache = disable_block_cache_; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + DestroyAndReopen(options); + + Random rnd(301); + std::map true_data; + + const int kTotalMerges = 10000; + // Every key gets ~10 operands + const int kKeyRange = kTotalMerges / 10; + const int kOperandSize = 20; + const int kNumPutBefore = kKeyRange / 10; // 10% value + const int kNumPutAfter = kKeyRange / 10; // 10% overwrite + const int kNumDelete = kKeyRange / 10; // 10% delete + + // kNumPutBefore keys will have base values + for (int i = 0; i < kNumPutBefore; i++) { + std::string key = Key(rnd.Next() % kKeyRange); + std::string value = RandomString(&rnd, kOperandSize); + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + + true_data[key] = value; + } + + // Do kTotalMerges merges + for (int i = 0; i < kTotalMerges; i++) { + std::string key = Key(rnd.Next() % kKeyRange); + std::string value = RandomString(&rnd, kOperandSize); + ASSERT_OK(db_->Merge(WriteOptions(), key, value)); + + if (true_data[key] < value) { + true_data[key] = value; + } + } + + // Overwrite random kNumPutAfter keys + for (int i = 0; i < kNumPutAfter; i++) { + std::string key = Key(rnd.Next() % kKeyRange); + std::string value = RandomString(&rnd, kOperandSize); + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + + true_data[key] = value; + } + + // Delete random kNumDelete keys + for (int i = 0; i < kNumDelete; i++) { + std::string key = Key(rnd.Next() % kKeyRange); + ASSERT_OK(db_->Delete(WriteOptions(), key)); + + true_data.erase(key); + } + + VerifyDBFromMap(true_data); + + // Skip HashCuckoo since it does not support merge operators + } while (ChangeOptions(kSkipMergePut | kSkipHashCuckoo)); +} + +class MergeOperatorHook : public MergeOperator { + public: + explicit MergeOperatorHook(std::shared_ptr _merge_op) + : merge_op_(_merge_op) {} + + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + before_merge_(); + bool res = merge_op_->FullMergeV2(merge_in, merge_out); + after_merge_(); + return res; + } + + virtual const char* Name() const override { return merge_op_->Name(); } + + std::shared_ptr merge_op_; + std::function before_merge_ = []() {}; + std::function after_merge_ = []() {}; +}; + +TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) { + Options options = CurrentOptions(); + + auto merge_hook = + std::make_shared(MergeOperators::CreateMaxOperator()); + options.merge_operator = merge_hook; + options.disable_auto_compactions = true; + options.level0_slowdown_writes_trigger = (1 << 30); + options.level0_stop_writes_trigger = (1 << 30); + options.max_open_files = 20; + BlockBasedTableOptions bbto; + bbto.no_block_cache = disable_block_cache_; + if (bbto.no_block_cache == false) { + bbto.block_cache = NewLRUCache(64 * 1024 * 1024); + } else { + bbto.block_cache = nullptr; + } + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + DestroyAndReopen(options); + + const int kNumOperands = 30; + const int kNumKeys = 1000; + const int kOperandSize = 100; + Random rnd(301); + + // 1000 keys every key have 30 operands, every operand is in a different file + std::map true_data; + for (int i = 0; i < kNumOperands; i++) { + for (int j = 0; j < kNumKeys; j++) { + std::string k = Key(j); + std::string v = RandomString(&rnd, kOperandSize); + ASSERT_OK(db_->Merge(WriteOptions(), k, v)); + + true_data[k] = std::max(true_data[k], v); + } + ASSERT_OK(Flush()); + } + + std::vector file_numbers = ListTableFiles(env_, dbname_); + ASSERT_EQ(file_numbers.size(), kNumOperands); + int merge_cnt = 0; + + // Code executed before merge operation + merge_hook->before_merge_ = [&]() { + // Evict all tables from cache before every merge operation + for (uint64_t num : file_numbers) { + TableCache::Evict(dbfull()->TEST_table_cache(), num); + } + // Decrease cache capacity to force all unrefed blocks to be evicted + if (bbto.block_cache) { + bbto.block_cache->SetCapacity(1); + } + merge_cnt++; + }; + + // Code executed after merge operation + merge_hook->after_merge_ = [&]() { + // Increase capacity again after doing the merge + if (bbto.block_cache) { + bbto.block_cache->SetCapacity(64 * 1024 * 1024); + } + }; + + VerifyDBFromMap(true_data); + ASSERT_EQ(merge_cnt, kNumKeys * 4 /* get + next + prev + seek */); + + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + VerifyDBFromMap(true_data); +} +#endif // ROCKSDB_LITE + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 1b5d406e7..0ce19eabe 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1074,6 +1074,40 @@ std::vector DBTestBase::ListTableFiles(Env* env, return file_numbers; } +void DBTestBase::VerifyDBFromMap(std::map true_data) { + for (auto& kv : true_data) { + ASSERT_EQ(Get(kv.first), kv.second); + } + + ReadOptions ro; + ro.total_order_seek = true; + Iterator* iter = db_->NewIterator(ro); + // Verify Iterator::Next() + auto data_iter = true_data.begin(); + for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { + ASSERT_EQ(iter->key().ToString(), data_iter->first); + ASSERT_EQ(iter->value().ToString(), data_iter->second); + } + ASSERT_EQ(data_iter, true_data.end()); + + // Verify Iterator::Prev() + auto data_rev = true_data.rbegin(); + for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) { + ASSERT_EQ(iter->key().ToString(), data_rev->first); + ASSERT_EQ(iter->value().ToString(), data_rev->second); + } + ASSERT_EQ(data_rev, true_data.rend()); + + // Verify Iterator::Seek() + for (auto kv : true_data) { + iter->Seek(kv.first); + ASSERT_EQ(kv.first, iter->key().ToString()); + ASSERT_EQ(kv.second, iter->value().ToString()); + } + + delete iter; +} + #ifndef ROCKSDB_LITE Status DBTestBase::GenerateAndAddExternalFile(const Options options, diff --git a/db/db_test_util.h b/db/db_test_util.h index 81b46ea5f..297ff1fbe 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -797,6 +797,8 @@ class DBTestBase : public testing::Test { std::vector ListTableFiles(Env* env, const std::string& path); + void VerifyDBFromMap(std::map true_data); + #ifndef ROCKSDB_LITE Status GenerateAndAddExternalFile(const Options options, std::vector keys, size_t file_id); diff --git a/db/memtable.cc b/db/memtable.cc index d4e2fcbff..f263601c7 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -218,12 +218,13 @@ const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator : public InternalIterator { public: - MemTableIterator( - const MemTable& mem, const ReadOptions& read_options, Arena* arena) + MemTableIterator(const MemTable& mem, const ReadOptions& read_options, + Arena* arena) : bloom_(nullptr), prefix_extractor_(mem.prefix_extractor_), valid_(false), - arena_mode_(arena != nullptr) { + arena_mode_(arena != nullptr), + value_pinned_(!mem.GetMemTableOptions()->inplace_update_support) { if (prefix_extractor_ != nullptr && !read_options.total_order_seek) { bloom_ = mem.prefix_bloom_.get(); iter_ = mem.table_->GetDynamicPrefixIterator(arena); @@ -306,12 +307,18 @@ class MemTableIterator : public InternalIterator { return true; } + virtual bool IsValuePinned() const override { + // memtable value is always pinned, except if we allow inplace update. + return value_pinned_; + } + private: DynamicBloom* bloom_; const SliceTransform* const prefix_extractor_; MemTableRep::Iterator* iter_; bool valid_; bool arena_mode_; + bool value_pinned_; // No copying allowed MemTableIterator(const MemTableIterator&); @@ -508,7 +515,6 @@ static bool SaveValue(void* arg, const char* entry) { case kTypeDeletion: case kTypeSingleDeletion: { if (*(s->merge_in_progress)) { - *(s->status) = Status::OK(); *(s->status) = MergeHelper::TimedFullMerge( merge_operator, s->key->user_key(), nullptr, merge_context->GetOperands(), s->value, s->logger, s->statistics, @@ -532,7 +538,8 @@ static bool SaveValue(void* arg, const char* entry) { } Slice v = GetLengthPrefixedSlice(key_ptr + key_length); *(s->merge_in_progress) = true; - merge_context->PushOperand(v); + merge_context->PushOperand( + v, s->inplace_update_support == false /* operand_pinned */); return true; } default: diff --git a/db/merge_context.h b/db/merge_context.h index 229f11d62..de1a86810 100644 --- a/db/merge_context.h +++ b/db/merge_context.h @@ -4,70 +4,113 @@ // of patent rights can be found in the PATENTS file in the same directory. // #pragma once +#include +#include #include "db/dbformat.h" #include "rocksdb/slice.h" -#include -#include namespace rocksdb { -const std::deque empty_operand_list; +const std::vector empty_operand_list; // The merge context for merging a user key. // When doing a Get(), DB will create such a class and pass it when // issuing Get() operation to memtables and version_set. The operands // will be fetched from the context when issuing partial of full merge. class MergeContext { -public: + public: // Clear all the operands void Clear() { - if (operand_list) { - operand_list->clear(); + if (operand_list_) { + operand_list_->clear(); + copied_operands_->clear(); } } - // Replace all operands with merge_result, which are expected to be the - // merge result of them. - void PushPartialMergeResult(std::string& merge_result) { - assert (operand_list); - operand_list->clear(); - operand_list->push_front(std::move(merge_result)); - } + // Push a merge operand - void PushOperand(const Slice& operand_slice) { + void PushOperand(const Slice& operand_slice, bool operand_pinned = false) { Initialize(); - operand_list->push_front(operand_slice.ToString()); + SetDirectionBackward(); + + if (operand_pinned) { + operand_list_->push_back(operand_slice); + } else { + // We need to have our own copy of the operand since it's not pinned + copied_operands_->emplace_back(operand_slice.data(), + operand_slice.size()); + operand_list_->push_back(copied_operands_->back()); + } } + // Push back a merge operand - void PushOperandBack(const Slice& operand_slice) { + void PushOperandBack(const Slice& operand_slice, + bool operand_pinned = false) { Initialize(); - operand_list->push_back(operand_slice.ToString()); + SetDirectionForward(); + + if (operand_pinned) { + operand_list_->push_back(operand_slice); + } else { + // We need to have our own copy of the operand since it's not pinned + copied_operands_->emplace_back(operand_slice.data(), + operand_slice.size()); + operand_list_->push_back(copied_operands_->back()); + } } + // return total number of operands in the list size_t GetNumOperands() const { - if (!operand_list) { + if (!operand_list_) { return 0; } - return operand_list->size(); + return operand_list_->size(); } + // Get the operand at the index. - Slice GetOperand(int index) const { - assert (operand_list); - return (*operand_list)[index]; + Slice GetOperand(int index) { + assert(operand_list_); + + SetDirectionForward(); + return (*operand_list_)[index]; } + // Return all the operands. - const std::deque& GetOperands() const { - if (!operand_list) { + const std::vector& GetOperands() { + if (!operand_list_) { return empty_operand_list; } - return *operand_list; + + SetDirectionForward(); + return *operand_list_; } -private: + + private: void Initialize() { - if (!operand_list) { - operand_list.reset(new std::deque()); + if (!operand_list_) { + operand_list_.reset(new std::vector()); + copied_operands_.reset(new std::vector()); } } - std::unique_ptr> operand_list; + + void SetDirectionForward() { + if (operands_reversed_ == true) { + std::reverse(operand_list_->begin(), operand_list_->end()); + operands_reversed_ = false; + } + } + + void SetDirectionBackward() { + if (operands_reversed_ == false) { + std::reverse(operand_list_->begin(), operand_list_->end()); + operands_reversed_ = true; + } + } + + // List of operands + std::unique_ptr> operand_list_; + // Copy of operands that are not pinned. + std::unique_ptr> copied_operands_; + bool operands_reversed_ = true; }; } // namespace rocksdb diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 2fb104dd6..a3d3823fc 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -20,9 +20,10 @@ namespace rocksdb { Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, const Slice& key, const Slice* value, - const std::deque& operands, + const std::vector& operands, std::string* result, Logger* logger, - Statistics* statistics, Env* env) { + Statistics* statistics, Env* env, + Slice* result_operand) { assert(merge_operator != nullptr); if (operands.size() == 0) { @@ -32,13 +33,28 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, } bool success; + Slice tmp_result_operand(nullptr, 0); + const MergeOperator::MergeOperationInput merge_in(key, value, operands, + logger); + MergeOperator::MergeOperationOutput merge_out(*result, tmp_result_operand); { // Setup to time the merge StopWatchNano timer(env, statistics != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); // Do the merge - success = merge_operator->FullMerge(key, value, operands, result, logger); + success = merge_operator->FullMergeV2(merge_in, &merge_out); + + if (tmp_result_operand.data()) { + // FullMergeV2 result is an existing operand + if (result_operand != nullptr) { + *result_operand = tmp_result_operand; + } else { + result->assign(tmp_result_operand.data(), tmp_result_operand.size()); + } + } else if (result_operand) { + *result_operand = Slice(nullptr, 0); + } RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME, statistics ? timer.ElapsedNanos() : 0); @@ -65,7 +81,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // Also maintain the list of merge operands seen. assert(HasOperator()); keys_.clear(); - operands_.clear(); + merge_context_.Clear(); assert(user_merge_operator_); bool first_key = true; @@ -87,7 +103,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, bool hit_the_next_user_key = false; for (; iter->Valid(); iter->Next(), original_key_is_iter = false) { ParsedInternalKey ikey; - assert(keys_.size() == operands_.size()); + assert(keys_.size() == merge_context_.GetNumOperands()); if (!ParseInternalKey(iter->key(), &ikey)) { // stop at corrupted key @@ -142,7 +158,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr; std::string merge_result; s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr, - operands_, &merge_result, logger_, stats_, env_); + merge_context_.GetOperands(), &merge_result, logger_, + stats_, env_); // We store the result in keys_.back() and operands_.back() // if nothing went wrong (i.e.: no operand corruption on disk) @@ -152,9 +169,9 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); keys_.clear(); - operands_.clear(); + merge_context_.Clear(); keys_.emplace_front(std::move(original_key)); - operands_.emplace_front(std::move(merge_result)); + merge_context_.PushOperand(merge_result); } // move iter to the next entry @@ -188,12 +205,13 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // original_key before ParseInternalKey(keys_.back(), &orig_ikey); } - operands_.push_front(value_slice.ToString()); + merge_context_.PushOperand(value_slice, + iter->IsValuePinned() /* operand_pinned */); } } } - if (operands_.size() == 0) { + if (merge_context_.GetNumOperands() == 0) { // we filtered out all the merge operands return Status::OK(); } @@ -218,11 +236,12 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // do a final merge with nullptr as the existing value and say // bye to the merge type (it's now converted to a Put) assert(kTypeMerge == orig_ikey.type); - assert(operands_.size() >= 1); - assert(operands_.size() == keys_.size()); + assert(merge_context_.GetNumOperands() >= 1); + assert(merge_context_.GetNumOperands() == keys_.size()); std::string merge_result; s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr, - operands_, &merge_result, logger_, stats_, env_); + merge_context_.GetOperands(), &merge_result, logger_, + stats_, env_); if (s.ok()) { // The original key encountered // We are certain that keys_ is not empty here (see assertions couple of @@ -231,9 +250,9 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); keys_.clear(); - operands_.clear(); + merge_context_.Clear(); keys_.emplace_front(std::move(original_key)); - operands_.emplace_front(std::move(merge_result)); + merge_context_.PushOperand(merge_result); } } else { // We haven't seen the beginning of the key nor a Put/Delete. @@ -244,8 +263,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // partial merge returns Status::OK(). Should we change the status code // after a successful partial merge? s = Status::MergeInProgress(); - if (operands_.size() >= 2 && - operands_.size() >= min_partial_merge_operands_) { + if (merge_context_.GetNumOperands() >= 2 && + merge_context_.GetNumOperands() >= min_partial_merge_operands_) { bool merge_success = false; std::string merge_result; { @@ -253,7 +272,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, PERF_TIMER_GUARD(merge_operator_time_nanos); merge_success = user_merge_operator_->PartialMergeMulti( orig_ikey.user_key, - std::deque(operands_.begin(), operands_.end()), + std::deque(merge_context_.GetOperands().begin(), + merge_context_.GetOperands().end()), &merge_result, logger_); RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME, stats_ ? timer.ElapsedNanosSafe() : 0); @@ -261,8 +281,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, if (merge_success) { // Merging of operands (associative merge) was successful. // Replace operands with the merge result - operands_.clear(); - operands_.emplace_front(std::move(merge_result)); + merge_context_.Clear(); + merge_context_.PushOperand(merge_result); keys_.erase(keys_.begin(), keys_.end() - 1); } } diff --git a/db/merge_helper.h b/db/merge_helper.h index 262a5969d..7cd992f34 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -8,8 +8,10 @@ #include #include +#include #include "db/dbformat.h" +#include "db/merge_context.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" #include "rocksdb/slice.h" @@ -42,14 +44,13 @@ class MergeHelper { latest_snapshot_(latest_snapshot), level_(level), keys_(), - operands_(), filter_timer_(env_), total_filter_time_(0U), stats_(stats) { assert(user_comparator_ != nullptr); } - // Wrapper around MergeOperator::FullMerge() that records perf statistics. + // Wrapper around MergeOperator::FullMergeV2() that records perf statistics. // Result of merge will be written to result if status returned is OK. // If operands is empty, the value will simply be copied to result. // Returns one of the following statuses: @@ -57,9 +58,10 @@ class MergeHelper { // - Corruption: Merge operator reported unsuccessful merge. static Status TimedFullMerge(const MergeOperator* merge_operator, const Slice& key, const Slice* value, - const std::deque& operands, + const std::vector& operands, std::string* result, Logger* logger, - Statistics* statistics, Env* env); + Statistics* statistics, Env* env, + Slice* result_operand = nullptr); // Merge entries until we hit // - a corrupted key @@ -116,7 +118,9 @@ class MergeHelper { // So keys().back() was the first key seen by iterator. // TODO: Re-style this comment to be like the first one const std::deque& keys() const { return keys_; } - const std::deque& values() const { return operands_; } + const std::vector& values() const { + return merge_context_.GetOperands(); + } uint64_t TotalFilterTime() const { return total_filter_time_; } bool HasOperator() const { return user_merge_operator_ != nullptr; } @@ -133,8 +137,11 @@ class MergeHelper { // the scratch area that holds the result of MergeUntil // valid up to the next MergeUntil call - std::deque keys_; // Keeps track of the sequence of keys seen - std::deque operands_; // Parallel with keys_; stores the values + + // Keeps track of the sequence of keys seen + std::deque keys_; + // Parallel with keys_; stores the operands + mutable MergeContext merge_context_; StopWatchNano filter_timer_; uint64_t total_filter_time_; @@ -159,7 +166,7 @@ class MergeOutputIterator { private: const MergeHelper* merge_helper_; std::deque::const_reverse_iterator it_keys_; - std::deque::const_reverse_iterator it_values_; + std::vector::const_reverse_iterator it_values_; }; } // namespace rocksdb diff --git a/db/merge_operator.cc b/db/merge_operator.cc index 5c5d04008..d4149f67e 100644 --- a/db/merge_operator.cc +++ b/db/merge_operator.cc @@ -11,6 +11,18 @@ namespace rocksdb { +bool MergeOperator::FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { + // If FullMergeV2 is not implemented, we convert the operand_list to + // std::deque and pass it to FullMerge + std::deque operand_list_str; + for (auto& op : merge_in.operand_list) { + operand_list_str.emplace_back(op.data(), op.size()); + } + return FullMerge(merge_in.key, merge_in.existing_value, operand_list_str, + &merge_out->new_value, merge_in.logger); +} + // The default implementation of PartialMergeMulti, which invokes // PartialMerge multiple times internally and merges two operands at // a time. @@ -39,23 +51,20 @@ bool MergeOperator::PartialMergeMulti(const Slice& key, // Given a "real" merge from the library, call the user's // associative merge function one-by-one on each of the operands. // NOTE: It is assumed that the client's merge-operator will handle any errors. -bool AssociativeMergeOperator::FullMerge( - const Slice& key, - const Slice* existing_value, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const { - +bool AssociativeMergeOperator::FullMergeV2( + const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { // Simply loop through the operands Slice temp_existing; - for (const auto& operand : operand_list) { - Slice value(operand); + const Slice* existing_value = merge_in.existing_value; + for (const auto& operand : merge_in.operand_list) { std::string temp_value; - if (!Merge(key, existing_value, value, &temp_value, logger)) { + if (!Merge(merge_in.key, existing_value, operand, &temp_value, + merge_in.logger)) { return false; } - swap(temp_value, *new_value); - temp_existing = Slice(*new_value); + swap(temp_value, merge_out->new_value); + temp_existing = Slice(merge_out->new_value); existing_value = &temp_existing; } diff --git a/db/pinned_iterators_manager.h b/db/pinned_iterators_manager.h index af58c79f5..38cb89ce0 100644 --- a/db/pinned_iterators_manager.h +++ b/db/pinned_iterators_manager.h @@ -18,7 +18,7 @@ namespace rocksdb { class PinnedIteratorsManager { public: PinnedIteratorsManager() : pinning_enabled(false), pinned_iters_(nullptr) {} - ~PinnedIteratorsManager() { assert(!pinning_enabled); } + ~PinnedIteratorsManager() { ReleasePinnedIterators(); } // Enable Iterators pinning void StartPinning() { @@ -43,7 +43,7 @@ class PinnedIteratorsManager { } // Release pinned Iterators - void ReleasePinnedIterators() { + inline void ReleasePinnedIterators() { if (pinning_enabled) { pinning_enabled = false; diff --git a/db/version_set.cc b/db/version_set.cc index 8a858ddaf..a048d6d9b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -31,6 +31,7 @@ #include "db/memtable.h" #include "db/merge_context.h" #include "db/merge_helper.h" +#include "db/pinned_iterators_manager.h" #include "db/table_cache.h" #include "db/version_builder.h" #include "rocksdb/env.h" @@ -917,10 +918,17 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, *key_exists = true; } + PinnedIteratorsManager pinned_iters_mgr; GetContext get_context( user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, - value, value_found, merge_context, this->env_, seq); + value, value_found, merge_context, this->env_, seq, + merge_operator_ ? &pinned_iters_mgr : nullptr); + + // Pin blocks that we read to hold merge operands + if (merge_operator_) { + pinned_iters_mgr.StartPinning(); + } FilePicker fp( storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_, diff --git a/db/write_batch.cc b/db/write_batch.cc index 4d77832aa..bd81f6522 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -922,12 +922,10 @@ class MemTableInserter : public WriteBatch::Handler { auto merge_operator = moptions->merge_operator; assert(merge_operator); - std::deque operands; - operands.push_front(value.ToString()); std::string new_value; Status merge_status = MergeHelper::TimedFullMerge( - merge_operator, key, &get_value_slice, operands, &new_value, + merge_operator, key, &get_value_slice, {value}, &new_value, moptions->info_log, moptions->statistics, Env::Default()); if (!merge_status.ok()) { diff --git a/examples/compaction_filter_example.cc b/examples/compaction_filter_example.cc index 6b0feb149..77dbd9af7 100644 --- a/examples/compaction_filter_example.cc +++ b/examples/compaction_filter_example.cc @@ -10,19 +10,18 @@ class MyMerge : public rocksdb::MergeOperator { public: - bool FullMerge(const rocksdb::Slice& key, - const rocksdb::Slice* existing_value, - const std::deque& operand_list, - std::string* new_value, - rocksdb::Logger* logger) const override { - new_value->clear(); - if (existing_value != nullptr) { - new_value->assign(existing_value->data(), existing_value->size()); + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + merge_out->new_value.clear(); + if (merge_in.existing_value != nullptr) { + merge_out->new_value.assign(merge_in.existing_value->data(), + merge_in.existing_value->size()); } - for (const std::string& m : operand_list) { - fprintf(stderr, "Merge(%s)\n", m.c_str()); - assert(m != "bad"); // the compaction filter filters out bad values - new_value->assign(m); + for (const rocksdb::Slice& m : merge_in.operand_list) { + fprintf(stderr, "Merge(%s)\n", m.ToString().c_str()); + // the compaction filter filters out bad values + assert(m.ToString() != "bad"); + merge_out->new_value.assign(m.data(), m.size()); } return true; } diff --git a/include/rocksdb/merge_operator.h b/include/rocksdb/merge_operator.h index 09b9d7dd6..b3ca013b7 100644 --- a/include/rocksdb/merge_operator.h +++ b/include/rocksdb/merge_operator.h @@ -9,6 +9,7 @@ #include #include #include +#include #include "rocksdb/slice.h" @@ -32,7 +33,7 @@ class Logger; // into rocksdb); numeric addition and string concatenation are examples; // // b) MergeOperator - the generic class for all the more abstract / complex -// operations; one method (FullMerge) to merge a Put/Delete value with a +// operations; one method (FullMergeV2) to merge a Put/Delete value with a // merge operand; and another method (PartialMerge) that merges multiple // operands together. this is especially useful if your key values have // complex structures but you would still like to support client-specific @@ -69,7 +70,49 @@ class MergeOperator { const Slice* existing_value, const std::deque& operand_list, std::string* new_value, - Logger* logger) const = 0; + Logger* logger) const { + // deprecated, please use FullMergeV2() + assert(false); + return false; + } + + struct MergeOperationInput { + explicit MergeOperationInput(const Slice& _key, + const Slice* _existing_value, + const std::vector& _operand_list, + Logger* _logger) + : key(_key), + existing_value(_existing_value), + operand_list(_operand_list), + logger(_logger) {} + + // The key associated with the merge operation. + const Slice& key; + // The existing value of the current key, nullptr means that the + // value dont exist. + const Slice* existing_value; + // A list of operands to apply. + const std::vector& operand_list; + // Logger could be used by client to log any errors that happen during + // the merge operation. + Logger* logger; + }; + + struct MergeOperationOutput { + explicit MergeOperationOutput(std::string& _new_value, + Slice& _existing_operand) + : new_value(_new_value), existing_operand(_existing_operand) {} + + // Client is responsible for filling the merge result here. + std::string& new_value; + // If the merge result is one of the existing operands (or existing_value), + // client can set this field to the operand (or existing_value) instead of + // using new_value. + Slice& existing_operand; + }; + + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const; // This function performs merge(left_op, right_op) // when both the operands are themselves merge operation types @@ -99,7 +142,7 @@ class MergeOperator { // TODO: Presently there is no way to differentiate between error/corruption // and simply "return false". For now, the client should simply return // false in any case it cannot perform partial-merge, regardless of reason. - // If there is corruption in the data, handle it in the FullMerge() function, + // If there is corruption in the data, handle it in the FullMergeV2() function // and return false there. The default implementation of PartialMerge will // always return false. virtual bool PartialMerge(const Slice& key, const Slice& left_operand, @@ -171,11 +214,8 @@ class AssociativeMergeOperator : public MergeOperator { private: // Default implementations of the MergeOperator functions - virtual bool FullMerge(const Slice& key, - const Slice* existing_value, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const override; + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override; virtual bool PartialMerge(const Slice& key, const Slice& left_operand, diff --git a/table/block.h b/table/block.h index 033b27ba8..81ca2aa41 100644 --- a/table/block.h +++ b/table/block.h @@ -162,6 +162,8 @@ class BlockIter : public InternalIterator { virtual bool IsKeyPinned() const override { return key_pinned_; } + virtual bool IsValuePinned() const override { return true; } + private: const Comparator* comparator_; const char* data_; // underlying block contents diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 1f95755ad..4de61817b 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -12,6 +12,7 @@ #include #include "db/dbformat.h" +#include "db/pinned_iterators_manager.h" #include "rocksdb/cache.h" #include "rocksdb/comparator.h" @@ -1381,6 +1382,10 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, BlockIter iiter; NewIndexIterator(read_options, &iiter); + PinnedIteratorsManager* pinned_iters_mgr = get_context->pinned_iters_mgr(); + bool pin_blocks = pinned_iters_mgr && pinned_iters_mgr->PinningEnabled(); + BlockIter* biter = nullptr; + bool done = false; for (iiter.Seek(key); iiter.Valid() && !done; iiter.Next()) { Slice handle_value = iiter.value(); @@ -1398,37 +1403,60 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL); break; } else { - BlockIter biter; - NewDataBlockIterator(rep_, read_options, iiter.value(), &biter); + BlockIter stack_biter; + if (pin_blocks) { + // We need to create the BlockIter on heap because we may need to + // pin it if we encounterd merge operands + biter = static_cast( + NewDataBlockIterator(rep_, read_options, iiter.value())); + } else { + biter = &stack_biter; + NewDataBlockIterator(rep_, read_options, iiter.value(), biter); + } if (read_options.read_tier == kBlockCacheTier && - biter.status().IsIncomplete()) { + biter->status().IsIncomplete()) { // couldn't get block from block_cache // Update Saver.state to Found because we are only looking for whether // we can guarantee the key is not there when "no_io" is set get_context->MarkKeyMayExist(); break; } - if (!biter.status().ok()) { - s = biter.status(); + if (!biter->status().ok()) { + s = biter->status(); break; } // Call the *saver function on each entry/block until it returns false - for (biter.Seek(key); biter.Valid(); biter.Next()) { + for (biter->Seek(key); biter->Valid(); biter->Next()) { ParsedInternalKey parsed_key; - if (!ParseInternalKey(biter.key(), &parsed_key)) { + if (!ParseInternalKey(biter->key(), &parsed_key)) { s = Status::Corruption(Slice()); } - if (!get_context->SaveValue(parsed_key, biter.value())) { + if (!get_context->SaveValue(parsed_key, biter->value(), pin_blocks)) { done = true; break; } } - s = biter.status(); + s = biter->status(); + + if (pin_blocks) { + if (get_context->State() == GetContext::kMerge) { + // Pin blocks as long as we are merging + pinned_iters_mgr->PinIteratorIfNeeded(biter); + } else { + delete biter; + } + biter = nullptr; + } else { + // biter is on stack, Nothing to clean + } } } + if (pin_blocks && biter != nullptr) { + delete biter; + } if (s.ok()) { s = iiter.status(); } diff --git a/table/get_context.cc b/table/get_context.cc index 491007099..4a7a9693b 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -5,6 +5,7 @@ #include "table/get_context.h" #include "db/merge_helper.h" +#include "db/pinned_iterators_manager.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/statistics.h" @@ -36,7 +37,8 @@ GetContext::GetContext(const Comparator* ucmp, Statistics* statistics, GetState init_state, const Slice& user_key, std::string* ret_value, bool* value_found, MergeContext* merge_context, Env* env, - SequenceNumber* seq) + SequenceNumber* seq, + PinnedIteratorsManager* _pinned_iters_mgr) : ucmp_(ucmp), merge_operator_(merge_operator), logger_(logger), @@ -48,7 +50,8 @@ GetContext::GetContext(const Comparator* ucmp, merge_context_(merge_context), env_(env), seq_(seq), - replay_log_(nullptr) { + replay_log_(nullptr), + pinned_iters_mgr_(_pinned_iters_mgr) { if (seq_) { *seq_ = kMaxSequenceNumber; } @@ -77,7 +80,7 @@ void GetContext::SaveValue(const Slice& value, SequenceNumber seq) { } bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, - const Slice& value) { + const Slice& value, bool value_pinned) { assert((state_ != kMerge && parsed_key.type != kTypeMerge) || merge_context_ != nullptr); if (ucmp_->Equal(parsed_key.user_key, user_key_)) { @@ -139,7 +142,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, case kTypeMerge: assert(state_ == kNotFound || state_ == kMerge); state_ = kMerge; - merge_context_->PushOperand(value); + merge_context_->PushOperand(value, value_pinned); return true; default: @@ -167,7 +170,7 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key, // Since SequenceNumber is not stored and unknown, we will use // kMaxSequenceNumber. get_context->SaveValue( - ParsedInternalKey(user_key, kMaxSequenceNumber, type), value); + ParsedInternalKey(user_key, kMaxSequenceNumber, type), value, true); } #else // ROCKSDB_LITE assert(false); diff --git a/table/get_context.h b/table/get_context.h index 283df90c8..4cee09a8d 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -11,6 +11,7 @@ namespace rocksdb { class MergeContext; +class PinnedIteratorsManager; class GetContext { public: @@ -26,7 +27,8 @@ class GetContext { Logger* logger, Statistics* statistics, GetState init_state, const Slice& user_key, std::string* ret_value, bool* value_found, MergeContext* merge_context, Env* env, - SequenceNumber* seq = nullptr); + SequenceNumber* seq = nullptr, + PinnedIteratorsManager* _pinned_iters_mgr = nullptr); void MarkKeyMayExist(); @@ -35,7 +37,8 @@ class GetContext { // // Returns True if more keys need to be read (due to merges) or // False if the complete value has been found. - bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value); + bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value, + bool value_pinned = false); // Simplified version of the previous function. Should only be used when we // know that the operation is a Put. @@ -43,6 +46,8 @@ class GetContext { GetState State() const { return state_; } + PinnedIteratorsManager* pinned_iters_mgr() { return pinned_iters_mgr_; } + // If a non-null string is passed, all the SaveValue calls will be // logged into the string. The operations can then be replayed on // another GetContext with replayGetContextLog. @@ -68,6 +73,8 @@ class GetContext { // write to the key or kMaxSequenceNumber if unknown SequenceNumber* seq_; std::string* replay_log_; + // Used to temporarily pin blocks when state_ == GetContext::kMerge + PinnedIteratorsManager* pinned_iters_mgr_; }; void replayGetContextLog(const Slice& replay_log, const Slice& user_key, diff --git a/table/internal_iterator.h b/table/internal_iterator.h index d7bf73cac..f1f1e0bff 100644 --- a/table/internal_iterator.h +++ b/table/internal_iterator.h @@ -80,6 +80,11 @@ class InternalIterator : public Cleanable { // set to false. virtual bool IsKeyPinned() const { return false; } + // If true, this means that the Slice returned by value() is valid as long as + // PinnedIteratorsManager::ReleasePinnedIterators is not called and the + // Iterator is not deleted. + virtual bool IsValuePinned() const { return false; } + virtual Status GetProperty(std::string prop_name, std::string* prop) { return Status::NotSupported(""); } diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h index bf5cfa6e8..e68bbf3f0 100644 --- a/table/iterator_wrapper.h +++ b/table/iterator_wrapper.h @@ -16,7 +16,7 @@ namespace rocksdb { // A internal wrapper class with an interface similar to Iterator that caches -// the valid(), key() and IsKeyPinned() results for an underlying iterator. +// the valid() and key() results for an underlying iterator. // This can help avoid virtual function calls and also gives better // cache locality. class IteratorWrapper { @@ -55,7 +55,6 @@ class IteratorWrapper { // Iterator interface methods bool Valid() const { return valid_; } Slice key() const { assert(Valid()); return key_; } - bool IsKeyPinned() const { assert(Valid()); return is_key_pinned_; } Slice value() const { assert(Valid()); return iter_->value(); } // Methods below require iter() != nullptr Status status() const { assert(iter_); return iter_->status(); } @@ -64,10 +63,18 @@ class IteratorWrapper { void Seek(const Slice& k) { assert(iter_); iter_->Seek(k); Update(); } void SeekToFirst() { assert(iter_); iter_->SeekToFirst(); Update(); } void SeekToLast() { assert(iter_); iter_->SeekToLast(); Update(); } + void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) { assert(iter_); iter_->SetPinnedItersMgr(pinned_iters_mgr); - Update(); + } + bool IsKeyPinned() const { + assert(Valid()); + return iter_->IsKeyPinned(); + } + bool IsValuePinned() const { + assert(Valid()); + return iter_->IsValuePinned(); } private: @@ -75,14 +82,12 @@ class IteratorWrapper { valid_ = iter_->Valid(); if (valid_) { key_ = iter_->key(); - is_key_pinned_ = iter_->IsKeyPinned(); } } InternalIterator* iter_; bool valid_; Slice key_; - bool is_key_pinned_; }; class Arena; diff --git a/table/merger.cc b/table/merger.cc index f28cbcd7b..637959d9a 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -257,6 +257,12 @@ class MergingIterator : public InternalIterator { current_->IsKeyPinned(); } + virtual bool IsValuePinned() const override { + assert(Valid()); + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + current_->IsValuePinned(); + } + private: // Clears heaps for both directions, used when changing direction or seeking void ClearHeaps(); diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index 0ed01321c..81dc8792b 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -78,6 +78,10 @@ class TwoLevelIterator : public InternalIterator { return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && second_level_iter_.iter() && second_level_iter_.IsKeyPinned(); } + virtual bool IsValuePinned() const override { + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + second_level_iter_.iter() && second_level_iter_.IsValuePinned(); + } private: void SaveError(const Status& s) { diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index a2b0caae8..87a52f397 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -45,6 +45,8 @@ default_params = { "write_buffer_size": 4 * 1024 * 1024, "writepercent": 35, "subcompactions": lambda: random.randint(1, 4), + "use_merge": lambda: random.randint(0, 1), + "use_full_merge_v1": lambda: random.randint(0, 1), } diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 8d358ff72..57be48ff2 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -453,6 +453,9 @@ static const bool FLAGS_prefix_size_dummy __attribute__((unused)) = DEFINE_bool(use_merge, false, "On true, replaces all writes with a Merge " "that behaves like a Put"); +DEFINE_bool(use_full_merge_v1, false, + "On true, use a merge operator that implement the deprecated " + "version of FullMerge"); namespace rocksdb { @@ -2106,7 +2109,11 @@ class StressTest { } if (FLAGS_use_merge) { - options_.merge_operator = MergeOperators::CreatePutOperator(); + if (FLAGS_use_full_merge_v1) { + options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator(); + } else { + options_.merge_operator = MergeOperators::CreatePutOperator(); + } } // set universal style compaction configurations, if applicable diff --git a/util/testutil.h b/util/testutil.h index d2c4b6449..69920b18b 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -614,10 +614,8 @@ class ChanglingMergeOperator : public MergeOperator { void SetName(const std::string& name) { name_ = name; } - virtual bool FullMerge(const Slice& key, const Slice* existing_value, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const override { + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { return false; } virtual bool PartialMergeMulti(const Slice& key, diff --git a/utilities/merge_operators.h b/utilities/merge_operators.h index 1ad5c290f..896843ab0 100644 --- a/utilities/merge_operators.h +++ b/utilities/merge_operators.h @@ -16,6 +16,7 @@ namespace rocksdb { class MergeOperators { public: static std::shared_ptr CreatePutOperator(); + static std::shared_ptr CreateDeprecatedPutOperator(); static std::shared_ptr CreateUInt64AddOperator(); static std::shared_ptr CreateStringAppendOperator(); static std::shared_ptr CreateStringAppendTESTOperator(); @@ -27,6 +28,8 @@ class MergeOperators { const std::string& name) { if (name == "put") { return CreatePutOperator(); + } else if (name == "put_v1") { + return CreateDeprecatedPutOperator(); } else if ( name == "uint64add") { return CreateUInt64AddOperator(); } else if (name == "stringappend") { diff --git a/utilities/merge_operators/max.cc b/utilities/merge_operators/max.cc index ee05a7cc2..8c46e1fab 100644 --- a/utilities/merge_operators/max.cc +++ b/utilities/merge_operators/max.cc @@ -19,22 +19,20 @@ namespace { // anonymous namespace // Slice::compare class MaxOperator : public MergeOperator { public: - virtual bool FullMerge(const Slice& key, const Slice* existing_value, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const override { - Slice max; - if (existing_value) { - max = Slice(existing_value->data(), existing_value->size()); + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + Slice& max = merge_out->existing_operand; + if (merge_in.existing_value) { + max = Slice(merge_in.existing_value->data(), + merge_in.existing_value->size()); } - for (const auto& op : operand_list) { + for (const auto& op : merge_in.operand_list) { if (max.compare(op) < 0) { - max = Slice(op.data(), op.size()); + max = op; } } - new_value->assign(max.data(), max.size()); return true; } diff --git a/utilities/merge_operators/put.cc b/utilities/merge_operators/put.cc index 04c1270b2..841717128 100644 --- a/utilities/merge_operators/put.cc +++ b/utilities/merge_operators/put.cc @@ -57,12 +57,33 @@ class PutOperator : public MergeOperator { } }; +class PutOperatorV2 : public PutOperator { + virtual bool FullMerge(const Slice& key, const Slice* existing_value, + const std::deque& operand_sequence, + std::string* new_value, + Logger* logger) const override { + assert(false); + return false; + } + + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + // Put basically only looks at the current/latest value + assert(!merge_in.operand_list.empty()); + merge_out->existing_operand = merge_in.operand_list.back(); + return true; + } +}; + } // end of anonymous namespace namespace rocksdb { -std::shared_ptr MergeOperators::CreatePutOperator() { +std::shared_ptr MergeOperators::CreateDeprecatedPutOperator() { return std::make_shared(); } +std::shared_ptr MergeOperators::CreatePutOperator() { + return std::make_shared(); +} } diff --git a/utilities/merge_operators/string_append/stringappend2.cc b/utilities/merge_operators/string_append/stringappend2.cc index cceb8d547..2d7b7423c 100644 --- a/utilities/merge_operators/string_append/stringappend2.cc +++ b/utilities/merge_operators/string_append/stringappend2.cc @@ -21,20 +21,22 @@ StringAppendTESTOperator::StringAppendTESTOperator(char delim_char) } // Implementation for the merge operation (concatenates two strings) -bool StringAppendTESTOperator::FullMerge( - const Slice& key, - const Slice* existing_value, - const std::deque& operands, - std::string* new_value, - Logger* logger) const { - +bool StringAppendTESTOperator::FullMergeV2( + const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { // Clear the *new_value for writing. - assert(new_value); - new_value->clear(); + merge_out->new_value.clear(); + + if (merge_in.existing_value == nullptr && merge_in.operand_list.size() == 1) { + // Only one operand + merge_out->existing_operand = merge_in.operand_list.back(); + return true; + } // Compute the space needed for the final result. size_t numBytes = 0; - for(auto it = operands.begin(); it != operands.end(); ++it) { + for (auto it = merge_in.operand_list.begin(); + it != merge_in.operand_list.end(); ++it) { numBytes += it->size() + 1; // Plus 1 for the delimiter } @@ -42,20 +44,23 @@ bool StringAppendTESTOperator::FullMerge( bool printDelim = false; // Prepend the *existing_value if one exists. - if (existing_value) { - new_value->reserve(numBytes + existing_value->size()); - new_value->append(existing_value->data(), existing_value->size()); + if (merge_in.existing_value) { + merge_out->new_value.reserve(numBytes + merge_in.existing_value->size()); + merge_out->new_value.append(merge_in.existing_value->data(), + merge_in.existing_value->size()); printDelim = true; } else if (numBytes) { - new_value->reserve(numBytes-1); // Minus 1 since we have one less delimiter + merge_out->new_value.reserve( + numBytes - 1); // Minus 1 since we have one less delimiter } // Concatenate the sequence of strings (and add a delimiter between each) - for(auto it = operands.begin(); it != operands.end(); ++it) { + for (auto it = merge_in.operand_list.begin(); + it != merge_in.operand_list.end(); ++it) { if (printDelim) { - new_value->append(1,delim_); + merge_out->new_value.append(1, delim_); } - new_value->append(*it); + merge_out->new_value.append(it->data(), it->size()); printDelim = true; } diff --git a/utilities/merge_operators/string_append/stringappend2.h b/utilities/merge_operators/string_append/stringappend2.h index 5e506ef8f..d979f1451 100644 --- a/utilities/merge_operators/string_append/stringappend2.h +++ b/utilities/merge_operators/string_append/stringappend2.h @@ -24,11 +24,8 @@ class StringAppendTESTOperator : public MergeOperator { // Constructor with delimiter explicit StringAppendTESTOperator(char delim_char); - virtual bool FullMerge(const Slice& key, - const Slice* existing_value, - const std::deque& operand_sequence, - std::string* new_value, - Logger* logger) const override; + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override; virtual bool PartialMergeMulti(const Slice& key, const std::deque& operand_list, diff --git a/utilities/options/options_util_test.cc b/utilities/options/options_util_test.cc index 94ddbc408..55e2d6cbe 100644 --- a/utilities/options/options_util_test.cc +++ b/utilities/options/options_util_test.cc @@ -128,19 +128,19 @@ class DummyMergeOperator : public MergeOperator { DummyMergeOperator() {} virtual ~DummyMergeOperator() {} - virtual bool FullMerge(const Slice& key, const Slice* existing_value, - const std::deque& operand_list, - std::string* new_value, Logger* logger) const { + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { return false; } virtual bool PartialMergeMulti(const Slice& key, const std::deque& operand_list, - std::string* new_value, Logger* logger) const { + std::string* new_value, + Logger* logger) const override { return false; } - virtual const char* Name() const { return "DummyMergeOperator"; } + virtual const char* Name() const override { return "DummyMergeOperator"; } }; class DummySliceTransform : public SliceTransform { diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index a96123d81..f16bcc51b 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -225,38 +225,43 @@ class TtlMergeOperator : public MergeOperator { assert(env); } - virtual bool FullMerge(const Slice& key, const Slice* existing_value, - const std::deque& operands, - std::string* new_value, Logger* logger) const - override { + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { const uint32_t ts_len = DBWithTTLImpl::kTSLength; - if (existing_value && existing_value->size() < ts_len) { - Log(InfoLogLevel::ERROR_LEVEL, logger, + if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) { + Log(InfoLogLevel::ERROR_LEVEL, merge_in.logger, "Error: Could not remove timestamp from existing value."); return false; } // Extract time-stamp from each operand to be passed to user_merge_op_ - std::deque operands_without_ts; - for (const auto& operand : operands) { + std::vector operands_without_ts; + for (const auto& operand : merge_in.operand_list) { if (operand.size() < ts_len) { - Log(InfoLogLevel::ERROR_LEVEL, logger, + Log(InfoLogLevel::ERROR_LEVEL, merge_in.logger, "Error: Could not remove timestamp from operand value."); return false; } - operands_without_ts.push_back(operand.substr(0, operand.size() - ts_len)); + operands_without_ts.push_back(operand); + operands_without_ts.back().remove_suffix(ts_len); } // Apply the user merge operator (store result in *new_value) bool good = true; - if (existing_value) { - Slice existing_value_without_ts(existing_value->data(), - existing_value->size() - ts_len); - good = user_merge_op_->FullMerge(key, &existing_value_without_ts, - operands_without_ts, new_value, logger); + MergeOperationOutput user_merge_out(merge_out->new_value, + merge_out->existing_operand); + if (merge_in.existing_value) { + Slice existing_value_without_ts(merge_in.existing_value->data(), + merge_in.existing_value->size() - ts_len); + good = user_merge_op_->FullMergeV2( + MergeOperationInput(merge_in.key, &existing_value_without_ts, + operands_without_ts, merge_in.logger), + &user_merge_out); } else { - good = user_merge_op_->FullMerge(key, nullptr, operands_without_ts, - new_value, logger); + good = user_merge_op_->FullMergeV2( + MergeOperationInput(merge_in.key, nullptr, operands_without_ts, + merge_in.logger), + &user_merge_out); } // Return false if the user merge operator returned false @@ -264,17 +269,23 @@ class TtlMergeOperator : public MergeOperator { return false; } + if (merge_out->existing_operand.data()) { + merge_out->new_value.assign(merge_out->existing_operand.data(), + merge_out->existing_operand.size()); + merge_out->existing_operand = Slice(nullptr, 0); + } + // Augment the *new_value with the ttl time-stamp int64_t curtime; if (!env_->GetCurrentTime(&curtime).ok()) { - Log(InfoLogLevel::ERROR_LEVEL, logger, + Log(InfoLogLevel::ERROR_LEVEL, merge_in.logger, "Error: Could not get current time to be attached internally " "to the new value."); return false; } else { char ts_string[ts_len]; EncodeFixed32(ts_string, (int32_t)curtime); - new_value->append(ts_string, ts_len); + merge_out->new_value.append(ts_string, ts_len); return true; } } diff --git a/utilities/util_merge_operators_test.cc b/utilities/util_merge_operators_test.cc index c80ce6ffb..ac2fbe7a6 100644 --- a/utilities/util_merge_operators_test.cc +++ b/utilities/util_merge_operators_test.cc @@ -13,22 +13,41 @@ class UtilMergeOperatorTest : public testing::Test { public: UtilMergeOperatorTest() {} - std::string FullMerge(std::string existing_value, - std::deque operands, - std::string key = "") { - Slice existing_value_slice(existing_value); + std::string FullMergeV2(std::string existing_value, + std::vector operands, + std::string key = "") { std::string result; + Slice result_operand(nullptr, 0); + + Slice existing_value_slice(existing_value); + std::vector operands_slice(operands.begin(), operands.end()); - merge_operator_->FullMerge(key, &existing_value_slice, operands, &result, - nullptr); + const MergeOperator::MergeOperationInput merge_in( + key, &existing_value_slice, operands_slice, nullptr); + MergeOperator::MergeOperationOutput merge_out(result, result_operand); + merge_operator_->FullMergeV2(merge_in, &merge_out); + + if (result_operand.data()) { + result.assign(result_operand.data(), result_operand.size()); + } return result; } - std::string FullMerge(std::deque operands, - std::string key = "") { + std::string FullMergeV2(std::vector operands, + std::string key = "") { std::string result; + Slice result_operand(nullptr, 0); + + std::vector operands_slice(operands.begin(), operands.end()); + + const MergeOperator::MergeOperationInput merge_in(key, nullptr, + operands_slice, nullptr); + MergeOperator::MergeOperationOutput merge_out(result, result_operand); + merge_operator_->FullMergeV2(merge_in, &merge_out); - merge_operator_->FullMerge(key, nullptr, operands, &result, nullptr); + if (result_operand.data()) { + result.assign(result_operand.data(), result_operand.size()); + } return result; } @@ -56,14 +75,14 @@ class UtilMergeOperatorTest : public testing::Test { TEST_F(UtilMergeOperatorTest, MaxMergeOperator) { merge_operator_ = MergeOperators::CreateMaxOperator(); - EXPECT_EQ("B", FullMerge("B", {"A"})); - EXPECT_EQ("B", FullMerge("A", {"B"})); - EXPECT_EQ("", FullMerge({"", "", ""})); - EXPECT_EQ("A", FullMerge({"A"})); - EXPECT_EQ("ABC", FullMerge({"ABC"})); - EXPECT_EQ("Z", FullMerge({"ABC", "Z", "C", "AXX"})); - EXPECT_EQ("ZZZ", FullMerge({"ABC", "CC", "Z", "ZZZ"})); - EXPECT_EQ("a", FullMerge("a", {"ABC", "CC", "Z", "ZZZ"})); + EXPECT_EQ("B", FullMergeV2("B", {"A"})); + EXPECT_EQ("B", FullMergeV2("A", {"B"})); + EXPECT_EQ("", FullMergeV2({"", "", ""})); + EXPECT_EQ("A", FullMergeV2({"A"})); + EXPECT_EQ("ABC", FullMergeV2({"ABC"})); + EXPECT_EQ("Z", FullMergeV2({"ABC", "Z", "C", "AXX"})); + EXPECT_EQ("ZZZ", FullMergeV2({"ABC", "CC", "Z", "ZZZ"})); + EXPECT_EQ("a", FullMergeV2("a", {"ABC", "CC", "Z", "ZZZ"})); EXPECT_EQ("z", PartialMergeMulti({"a", "z", "efqfqwgwew", "aaz", "hhhhh"}));