From d150e01474a0cb281792f51b81260b629b18457f Mon Sep 17 00:00:00 2001 From: Vijay Nadimpalli Date: Tue, 6 Aug 2019 14:22:34 -0700 Subject: [PATCH] New API to get all merge operands for a Key (#5604) Summary: This is a new API added to db.h to allow for fetching all merge operands associated with a Key. The main motivation for this API is to support use cases where doing a full online merge is not necessary as it is performance sensitive. Example use-cases: 1. Update subset of columns and read subset of columns - Imagine a SQL Table, a row is encoded as a K/V pair (as it is done in MyRocks). If there are many columns and users only updated one of them, we can use merge operator to reduce write amplification. While users only read one or two columns in the read query, this feature can avoid a full merging of the whole row, and save some CPU. 2. Updating very few attributes in a value which is a JSON-like document - Updating one attribute can be done efficiently using merge operator, while reading back one attribute can be done more efficiently if we don't need to do a full merge. ---------------------------------------------------------------------------------------------------- API : Status GetMergeOperands( const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* merge_operands, GetMergeOperandsOptions* get_merge_operands_options, int* number_of_operands) Example usage : int size = 100; int number_of_operands = 0; std::vector<PinnableSlice> values(size); GetMergeOperandsOptions merge_operands_info; db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(), &merge_operands_info, &number_of_operands); Description : Returns all the merge operands corresponding to the key. If the number of merge operands in DB is greater than get_merge_operands_options->expected_max_number_of_operands, no merge operands are returned and the status is Incomplete. Merge operands returned are in the order of insertion. 
merge_operands -> Points to an array of at least get_merge_operands_options->expected_max_number_of_operands PinnableSlice objects; the caller is responsible for allocating the array. If the status returned is Incomplete then number_of_operands will contain the total number of merge operands found in DB for key. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5604 Test Plan: Added unit test and perf test in db_bench that can be run using the command: ./db_bench -benchmarks=getmergeoperands --merge_operator=sortlist Differential Revision: D16657366 Pulled By: vjnadimpalli fbshipit-source-id: 0faadd752351745224ee12d4ae9ef3cb529951bf --- CMakeLists.txt | 2 + Makefile | 4 + TARGETS | 8 + appveyor.yml | 2 +- db/compacted_db_impl.cc | 4 +- db/db_blob_index_test.cc | 8 +- db/db_impl/db_impl.cc | 112 +++++--- db/db_impl/db_impl.h | 43 +++- db/db_impl/db_impl_files.cc | 3 +- db/db_merge_operand_test.cc | 240 ++++++++++++++++++ db/db_merge_operator_test.cc | 8 +- db/db_test.cc | 9 + db/db_test2.cc | 8 +- db/memtable.cc | 32 ++- db/memtable.h | 13 +- db/memtable_list.cc | 14 + db/memtable_list.h | 7 + db/version_set.cc | 17 +- db/version_set.h | 30 ++- file/filename.cc | 3 +- include/rocksdb/db.h | 20 ++ include/rocksdb/status.h | 1 + include/rocksdb/utilities/stackable_db.h | 11 + src.mk | 2 + .../block_based/data_block_hash_index_test.cc | 8 +- table/cuckoo/cuckoo_table_reader_test.cc | 12 +- table/get_context.cc | 113 ++++++--- table/get_context.h | 15 +- table/table_reader_bench.cc | 2 +- table/table_test.cc | 16 +- tools/db_bench_tool.cc | 105 +++++++- utilities/blob_db/blob_db_impl.cc | 17 +- utilities/merge_operators.h | 3 + utilities/merge_operators/sortlist.cc | 100 ++++++++ utilities/merge_operators/sortlist.h | 38 +++ utilities/transactions/write_prepared_txn.cc | 8 +- .../transactions/write_prepared_txn_db.cc | 8 +- .../transactions/write_unprepared_txn.cc | 16 +- .../transactions/write_unprepared_txn_db.cc | 8 +- .../write_batch_with_index.cc | 7 +- 40 files changed, 914 
insertions(+), 163 deletions(-) create mode 100644 db/db_merge_operand_test.cc create mode 100644 utilities/merge_operators/sortlist.cc create mode 100644 utilities/merge_operators/sortlist.h diff --git a/CMakeLists.txt b/CMakeLists.txt index bb99d1b7e..8622242aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -661,6 +661,7 @@ set(SOURCES utilities/merge_operators/bytesxor.cc utilities/merge_operators/max.cc utilities/merge_operators/put.cc + utilities/merge_operators/sortlist.cc utilities/merge_operators/string_append/stringappend.cc utilities/merge_operators/string_append/stringappend2.cc utilities/merge_operators/uint64add.cc @@ -887,6 +888,7 @@ if(WITH_TESTS) db/db_log_iter_test.cc db/db_memtable_test.cc db/db_merge_operator_test.cc + db/db_merge_operand_test.cc db/db_options_test.cc db/db_properties_test.cc db/db_range_del_test.cc diff --git a/Makefile b/Makefile index 4502be8e4..1718309cb 100644 --- a/Makefile +++ b/Makefile @@ -454,6 +454,7 @@ TESTS = \ db_iterator_test \ db_memtable_test \ db_merge_operator_test \ + db_merge_operand_test \ db_options_test \ db_range_del_test \ db_secondary_test \ @@ -1254,6 +1255,9 @@ db_memtable_test: db/db_memtable_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHA db_merge_operator_test: db/db_merge_operator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_merge_operand_test: db/db_merge_operand_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 25d7ff667..bac5c4311 100644 --- a/TARGETS +++ b/TARGETS @@ -301,6 +301,7 @@ cpp_library( "utilities/merge_operators/bytesxor.cc", "utilities/merge_operators/max.cc", "utilities/merge_operators/put.cc", + "utilities/merge_operators/sortlist.cc", "utilities/merge_operators/string_append/stringappend.cc", "utilities/merge_operators/string_append/stringappend2.cc", "utilities/merge_operators/uint64add.cc", @@ 
-755,6 +756,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "db_merge_operand_test", + "db/db_merge_operand_test.cc", + "parallel", + [], + [], + ], [ "db_options_test", "db/db_options_test.cc", diff --git a/appveyor.yml b/appveyor.yml index 6bdb164e8..77901c407 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -60,7 +60,7 @@ build: test: test_script: - - ps: build_tools\run_ci_db_test.ps1 -SuiteRun db_basic_test,db_test2,db_test,env_basic_test,env_test -Concurrency 8 + - ps: build_tools\run_ci_db_test.ps1 -SuiteRun db_basic_test,db_test2,db_test,env_basic_test,env_test,db_merge_operand_test -Concurrency 8 on_failure: - cmd: 7z a build-failed.zip %APPVEYOR_BUILD_FOLDER%\build\ && appveyor PushArtifact build-failed.zip diff --git a/db/compacted_db_impl.cc b/db/compacted_db_impl.cc index 88928391a..13cccbd77 100644 --- a/db/compacted_db_impl.cc +++ b/db/compacted_db_impl.cc @@ -37,7 +37,7 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*, const Slice& key, PinnableSlice* value) { GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound, key, value, nullptr, nullptr, - nullptr, nullptr); + true, nullptr, nullptr); LookupKey lkey(key, kMaxSequenceNumber); files_.files[FindFile(key)].fd.table_reader->Get(options, lkey.internal_key(), &get_context, nullptr); @@ -70,7 +70,7 @@ std::vector CompactedDBImpl::MultiGet(const ReadOptions& options, std::string& value = (*values)[idx]; GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound, keys[idx], &pinnable_val, - nullptr, nullptr, nullptr, nullptr); + nullptr, nullptr, true, nullptr, nullptr); LookupKey lkey(keys[idx], kMaxSequenceNumber); r->Get(options, lkey.internal_key(), &get_context, nullptr); value.assign(pinnable_val.data(), pinnable_val.size()); diff --git a/db/db_blob_index_test.cc b/db/db_blob_index_test.cc index 005a23d63..e9618885a 100644 --- a/db/db_blob_index_test.cc +++ b/db/db_blob_index_test.cc @@ -63,9 +63,11 @@ class 
DBBlobIndexTest : public DBTestBase { ReadOptions read_options; read_options.snapshot = snapshot; PinnableSlice value; - auto s = dbfull()->GetImpl(read_options, cfh(), key, &value, - nullptr /*value_found*/, nullptr /*callback*/, - is_blob_index); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = cfh(); + get_impl_options.value = &value; + get_impl_options.is_blob_index = is_blob_index; + auto s = dbfull()->GetImpl(read_options, key, get_impl_options); if (s.IsNotFound()) { return "NOT_FOUND"; } diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 81c44388b..9236d911e 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1441,19 +1441,22 @@ ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const { Status DBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { - return GetImpl(read_options, column_family, key, value); + GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.value = value; + return GetImpl(read_options, key, get_impl_options); } -Status DBImpl::GetImpl(const ReadOptions& read_options, - ColumnFamilyHandle* column_family, const Slice& key, - PinnableSlice* pinnable_val, bool* value_found, - ReadCallback* callback, bool* is_blob_index) { - assert(pinnable_val != nullptr); +Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, + GetImplOptions get_impl_options) { + assert(get_impl_options.value != nullptr || + get_impl_options.merge_operands != nullptr); PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); StopWatch sw(env_, stats_, DB_GET); PERF_TIMER_GUARD(get_snapshot_time); - auto cfh = reinterpret_cast(column_family); + auto cfh = + reinterpret_cast(get_impl_options.column_family); auto cfd = cfh->cfd(); if (tracer_) { @@ -1461,7 +1464,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, // tracing is enabled. 
InstrumentedMutexLock lock(&trace_mutex_); if (tracer_) { - tracer_->Get(column_family, key); + tracer_->Get(get_impl_options.column_family, key); } } @@ -1473,9 +1476,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, SequenceNumber snapshot; if (read_options.snapshot != nullptr) { - if (callback) { + if (get_impl_options.callback) { // Already calculated based on read_options.snapshot - snapshot = callback->max_visible_seq(); + snapshot = get_impl_options.callback->max_visible_seq(); } else { snapshot = reinterpret_cast(read_options.snapshot)->number_; @@ -1489,12 +1492,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, snapshot = last_seq_same_as_publish_seq_ ? versions_->LastSequence() : versions_->LastPublishedSequence(); - if (callback) { + if (get_impl_options.callback) { // The unprep_seqs are not published for write unprepared, so it could be // that max_visible_seq is larger. Seek to the std::max of the two. // However, we still want our callback to contain the actual snapshot so // that it can do the correct visibility filtering. - callback->Refresh(snapshot); + get_impl_options.callback->Refresh(snapshot); // Internally, WriteUnpreparedTxnReadCallback::Refresh would set // max_visible_seq = max(max_visible_seq, snapshot) @@ -1505,7 +1508,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, // be needed. 
// // assert(callback->max_visible_seq() >= snapshot); - snapshot = callback->max_visible_seq(); + snapshot = get_impl_options.callback->max_visible_seq(); } } TEST_SYNC_POINT("DBImpl::GetImpl:3"); @@ -1526,19 +1529,39 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, has_unpersisted_data_.load(std::memory_order_relaxed)); bool done = false; if (!skip_memtable) { - if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &max_covering_tombstone_seq, read_options, callback, - is_blob_index)) { - done = true; - pinnable_val->PinSelf(); - RecordTick(stats_, MEMTABLE_HIT); - } else if ((s.ok() || s.IsMergeInProgress()) && - sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &max_covering_tombstone_seq, read_options, callback, - is_blob_index)) { - done = true; - pinnable_val->PinSelf(); - RecordTick(stats_, MEMTABLE_HIT); + // Get value associated with key + if (get_impl_options.get_value) { + if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s, + &merge_context, &max_covering_tombstone_seq, + read_options, get_impl_options.callback, + get_impl_options.is_blob_index)) { + done = true; + get_impl_options.value->PinSelf(); + RecordTick(stats_, MEMTABLE_HIT); + } else if ((s.ok() || s.IsMergeInProgress()) && + sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s, + &merge_context, &max_covering_tombstone_seq, + read_options, get_impl_options.callback, + get_impl_options.is_blob_index)) { + done = true; + get_impl_options.value->PinSelf(); + RecordTick(stats_, MEMTABLE_HIT); + } + } else { + // Get Merge Operands associated with key, Merge Operands should not be + // merged and raw values should be returned to the user. 
+ if (sv->mem->Get(lkey, nullptr, &s, &merge_context, + &max_covering_tombstone_seq, read_options, nullptr, + nullptr, false)) { + done = true; + RecordTick(stats_, MEMTABLE_HIT); + } else if ((s.ok() || s.IsMergeInProgress()) && + sv->imm->GetMergeOperands(lkey, &s, &merge_context, + &max_covering_tombstone_seq, + read_options)) { + done = true; + RecordTick(stats_, MEMTABLE_HIT); + } } if (!done && !s.ok() && !s.IsMergeInProgress()) { ReturnAndCleanupSuperVersion(cfd, sv); @@ -1547,9 +1570,14 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, } if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); - sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context, - &max_covering_tombstone_seq, value_found, nullptr, nullptr, - callback, is_blob_index); + sv->current->Get( + read_options, lkey, get_impl_options.value, &s, &merge_context, + &max_covering_tombstone_seq, + get_impl_options.get_value ? get_impl_options.value_found : nullptr, + nullptr, nullptr, + get_impl_options.get_value ? get_impl_options.callback : nullptr, + get_impl_options.get_value ? 
get_impl_options.is_blob_index : nullptr, + get_impl_options.get_value); RecordTick(stats_, MEMTABLE_MISS); } @@ -1561,7 +1589,25 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, RecordTick(stats_, NUMBER_KEYS_READ); size_t size = 0; if (s.ok()) { - size = pinnable_val->size(); + if (get_impl_options.get_value) { + size = get_impl_options.value->size(); + } else { + // Return all merge operands for get_impl_options.key + *get_impl_options.number_of_operands = + static_cast(merge_context.GetNumOperands()); + if (*get_impl_options.number_of_operands > + get_impl_options.get_merge_operands_options + ->expected_max_number_of_operands) { + s = Status::Incomplete( + Status::SubCode::KMergeOperandsInsufficientCapacity); + } else { + for (const Slice& sl : merge_context.GetOperands()) { + size += sl.size(); + get_impl_options.merge_operands->PinSelf(sl); + get_impl_options.merge_operands++; + } + } + } RecordTick(stats_, BYTES_READ, size); PERF_COUNTER_ADD(get_read_bytes, size); } @@ -2222,7 +2268,11 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options, ReadOptions roptions = read_options; roptions.read_tier = kBlockCacheTier; // read from block cache only PinnableSlice pinnable_val; - auto s = GetImpl(roptions, column_family, key, &pinnable_val, value_found); + GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.value = &pinnable_val; + get_impl_options.value_found = value_found; + auto s = GetImpl(roptions, key, get_impl_options); value->assign(pinnable_val.data(), pinnable_val.size()); // If block_cache is enabled and the index block of the table didn't diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index fe3a2f6f2..f1dbc5d02 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -159,6 +159,21 @@ class DBImpl : public DB { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; + using DB::GetMergeOperands; + Status GetMergeOperands(const ReadOptions& 
options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* merge_operands, + GetMergeOperandsOptions* get_merge_operands_options, + int* number_of_operands) override { + GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.merge_operands = merge_operands; + get_impl_options.get_merge_operands_options = get_merge_operands_options; + get_impl_options.number_of_operands = number_of_operands; + get_impl_options.get_value = false; + return GetImpl(options, key, get_impl_options); + } + using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, @@ -395,12 +410,32 @@ class DBImpl : public DB { // ---- End of implementations of the DB interface ---- + struct GetImplOptions { + ColumnFamilyHandle* column_family = nullptr; + PinnableSlice* value = nullptr; + bool* value_found = nullptr; + ReadCallback* callback = nullptr; + bool* is_blob_index = nullptr; + // If true return value associated with key via value pointer else return + // all merge operands for key via merge_operands pointer + bool get_value = true; + // Pointer to an array of size + // get_merge_operands_options.expected_max_number_of_operands allocated by + // user + PinnableSlice* merge_operands = nullptr; + GetMergeOperandsOptions* get_merge_operands_options = nullptr; + int* number_of_operands = nullptr; + }; + // Function that Get and KeyMayExist call with no_io true or false // Note: 'value_found' from KeyMayExist propagates here - Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, PinnableSlice* value, - bool* value_found = nullptr, ReadCallback* callback = nullptr, - bool* is_blob_index = nullptr); + // This function is also called by GetMergeOperands + // If get_impl_options.get_value = true get value associated with + // get_impl_options.key via get_impl_options.value + // If get_impl_options.get_value = false get merge operands associated with + // 
get_impl_options.key via get_impl_options.merge_operands + Status GetImpl(const ReadOptions& options, const Slice& key, + GetImplOptions get_impl_options); ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options, ColumnFamilyData* cfd, diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index e3b2f5765..3c5fd4fcd 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -318,8 +318,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // We may ignore the dbname when generating the file names. for (auto& file : state.sst_delete_files) { candidate_files.emplace_back( - MakeTableFileName(file.metadata->fd.GetNumber()), - file.path); + MakeTableFileName(file.metadata->fd.GetNumber()), file.path); if (file.metadata->table_reader_handle) { table_cache_->Release(file.metadata->table_reader_handle); } diff --git a/db/db_merge_operand_test.cc b/db/db_merge_operand_test.cc new file mode 100644 index 000000000..e6280ad8c --- /dev/null +++ b/db/db_merge_operand_test.cc @@ -0,0 +1,240 @@ +// Copyright (c) 2018-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "rocksdb/perf_context.h" +#include "rocksdb/utilities/debug.h" +#include "table/block_based/block_builder.h" +#include "test_util/fault_injection_test_env.h" +#if !defined(ROCKSDB_LITE) +#include "test_util/sync_point.h" +#endif +#include "rocksdb/merge_operator.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/sortlist.h" +#include "utilities/merge_operators/string_append/stringappend2.h" + +namespace rocksdb { + +class DBMergeOperandTest : public DBTestBase { + public: + DBMergeOperandTest() : DBTestBase("/db_merge_operand_test") {} +}; + +TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) { + class LimitedStringAppendMergeOp : public StringAppendTESTOperator { + public: + LimitedStringAppendMergeOp(int limit, char delim) + : StringAppendTESTOperator(delim), limit_(limit) {} + + const char* Name() const override { + return "DBMergeOperatorTest::LimitedStringAppendMergeOp"; + } + + bool ShouldMerge(const std::vector& operands) const override { + if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) { + return true; + } + return false; + } + + private: + size_t limit_ = 0; + }; + + Options options; + options.create_if_missing = true; + // Use only the latest two merge operands. 
+ options.merge_operator = std::make_shared(2, ','); + options.env = env_; + Reopen(options); + int num_records = 4; + int number_of_operands = 0; + std::vector values(num_records); + GetMergeOperandsOptions merge_operands_info; + merge_operands_info.expected_max_number_of_operands = num_records; + + // k0 value in memtable + Put("k0", "PutARock"); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k0", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "PutARock"); + + // k0.1 value in SST + Put("k0.1", "RockInSST"); + ASSERT_OK(Flush()); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k0.1", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "RockInSST"); + + // All k1 values are in memtable. + ASSERT_OK(Merge("k1", "a")); + Put("k1", "x"); + ASSERT_OK(Merge("k1", "b")); + ASSERT_OK(Merge("k1", "c")); + ASSERT_OK(Merge("k1", "d")); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "x"); + ASSERT_EQ(values[1], "b"); + ASSERT_EQ(values[2], "c"); + ASSERT_EQ(values[3], "d"); + + // expected_max_number_of_operands is less than number of merge operands so + // status should be Incomplete. + merge_operands_info.expected_max_number_of_operands = num_records - 1; + Status status = db_->GetMergeOperands( + ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(), + &merge_operands_info, &number_of_operands); + ASSERT_EQ(status.IsIncomplete(), true); + merge_operands_info.expected_max_number_of_operands = num_records; + + // All k1.1 values are in memtable. 
+ ASSERT_OK(Merge("k1.1", "r")); + Delete("k1.1"); + ASSERT_OK(Merge("k1.1", "c")); + ASSERT_OK(Merge("k1.1", "k")); + ASSERT_OK(Merge("k1.1", "s")); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1.1", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "c"); + ASSERT_EQ(values[1], "k"); + ASSERT_EQ(values[2], "s"); + + // All k2 values are flushed to L0 into a single file. + ASSERT_OK(Merge("k2", "q")); + ASSERT_OK(Merge("k2", "w")); + ASSERT_OK(Merge("k2", "e")); + ASSERT_OK(Merge("k2", "r")); + ASSERT_OK(Flush()); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "q"); + ASSERT_EQ(values[1], "w"); + ASSERT_EQ(values[2], "e"); + ASSERT_EQ(values[3], "r"); + + // All k2.1 values are flushed to L0 into a single file. + ASSERT_OK(Merge("k2.1", "m")); + Put("k2.1", "l"); + ASSERT_OK(Merge("k2.1", "n")); + ASSERT_OK(Merge("k2.1", "o")); + ASSERT_OK(Flush()); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2.1", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "l,n,o"); + + // All k2.2 values are flushed to L0 into a single file. + ASSERT_OK(Merge("k2.2", "g")); + Delete("k2.2"); + ASSERT_OK(Merge("k2.2", "o")); + ASSERT_OK(Merge("k2.2", "t")); + ASSERT_OK(Flush()); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2.2", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "o,t"); + + // Do some compaction that will make the following tests more predictable + // Slice start("PutARock"); + // Slice end("t"); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + // All k3 values are flushed and are in different files. 
+ ASSERT_OK(Merge("k3", "ab")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3", "bc")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3", "cd")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3", "de")); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "ab"); + ASSERT_EQ(values[1], "bc"); + ASSERT_EQ(values[2], "cd"); + ASSERT_EQ(values[3], "de"); + + // All k3.1 values are flushed and are in different files. + ASSERT_OK(Merge("k3.1", "ab")); + ASSERT_OK(Flush()); + Put("k3.1", "bc"); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3.1", "cd")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3.1", "de")); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3.1", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "bc"); + ASSERT_EQ(values[1], "cd"); + ASSERT_EQ(values[2], "de"); + + // All k3.2 values are flushed and are in different files. + ASSERT_OK(Merge("k3.2", "ab")); + ASSERT_OK(Flush()); + Delete("k3.2"); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3.2", "cd")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3.2", "de")); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3.2", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "cd"); + ASSERT_EQ(values[1], "de"); + + // All K4 values are in different levels + ASSERT_OK(Merge("k4", "ba")); + ASSERT_OK(Flush()); + MoveFilesToLevel(4); + ASSERT_OK(Merge("k4", "cb")); + ASSERT_OK(Flush()); + MoveFilesToLevel(3); + ASSERT_OK(Merge("k4", "dc")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + ASSERT_OK(Merge("k4", "ed")); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "ba"); + ASSERT_EQ(values[1], "cb"); + ASSERT_EQ(values[2], "dc"); + ASSERT_EQ(values[3], "ed"); + + // First 3 k5 values are in SST and next 4 k5 values 
are in Immutable Memtable + ASSERT_OK(Merge("k5", "who")); + ASSERT_OK(Merge("k5", "am")); + ASSERT_OK(Merge("k5", "i")); + ASSERT_OK(Flush()); + Put("k5", "remember"); + ASSERT_OK(Merge("k5", "i")); + ASSERT_OK(Merge("k5", "am")); + ASSERT_OK(Merge("k5", "rocks")); + dbfull()->TEST_SwitchMemtable(); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k5", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "remember"); + ASSERT_EQ(values[1], "i"); + ASSERT_EQ(values[2], "am"); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index 31bd2e491..8358ddb56 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -46,9 +46,11 @@ class DBMergeOperatorTest : public DBTestBase { ReadOptions read_opt; read_opt.snapshot = snapshot; PinnableSlice value; - Status s = - dbfull()->GetImpl(read_opt, db_->DefaultColumnFamily(), key, &value, - nullptr /*value_found*/, &read_callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = db_->DefaultColumnFamily(); + get_impl_options.value = &value; + get_impl_options.callback = &read_callback; + Status s = dbfull()->GetImpl(read_opt, key, get_impl_options); if (!s.ok()) { return s.ToString(); } diff --git a/db/db_test.cc b/db/db_test.cc index f53afa17d..5c96bec36 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2540,6 +2540,15 @@ class ModelDB : public DB { return Status::NotSupported(key); } + using DB::GetMergeOperands; + virtual Status GetMergeOperands( + const ReadOptions& /*options*/, ColumnFamilyHandle* /*column_family*/, + const Slice& key, PinnableSlice* /*slice*/, + GetMergeOperandsOptions* /*merge_operands_options*/, + int* /*number_of_operands*/) override { + return Status::NotSupported(key); + } + using DB::MultiGet; 
std::vector MultiGet( const ReadOptions& /*options*/, diff --git a/db/db_test2.cc b/db/db_test2.cc index 3664b3a24..26604c53a 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2797,8 +2797,12 @@ TEST_F(DBTest2, ReadCallbackTest) { ReadOptions roptions; TestReadCallback callback(seq); bool dont_care = true; - Status s = dbfull()->GetImpl(roptions, dbfull()->DefaultColumnFamily(), key, - &pinnable_val, &dont_care, &callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = dbfull()->DefaultColumnFamily(); + get_impl_options.value = &pinnable_val; + get_impl_options.value_found = &dont_care; + get_impl_options.callback = &callback; + Status s = dbfull()->GetImpl(roptions, key, get_impl_options); ASSERT_TRUE(s.ok()); // Assuming that after each Put the DB increased seq by one, the value and // seq number must be equal since we also inc value by 1 after each Put. diff --git a/db/memtable.cc b/db/memtable.cc index fdd1a577a..62c7339b5 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -601,6 +601,7 @@ struct Saver { Logger* logger; Statistics* statistics; bool inplace_update_support; + bool do_merge; Env* env_; ReadCallback* callback_; bool* is_blob_index; @@ -627,7 +628,7 @@ static bool SaveValue(void* arg, const char* entry) { // klength varint32 // userkey char[klength-8] // tag uint64 - // vlength varint32 + // vlength varint32f // value char[vlength] // Check that it belongs to same user key. 
We do not check the // sequence number since the Seek() call above should have skipped @@ -677,12 +678,24 @@ static bool SaveValue(void* arg, const char* entry) { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); *(s->status) = Status::OK(); if (*(s->merge_in_progress)) { - if (s->value != nullptr) { - *(s->status) = MergeHelper::TimedFullMerge( - merge_operator, s->key->user_key(), &v, - merge_context->GetOperands(), s->value, s->logger, - s->statistics, s->env_, nullptr /* result_operand */, true); + if (s->do_merge) { + if (s->value != nullptr) { + *(s->status) = MergeHelper::TimedFullMerge( + merge_operator, s->key->user_key(), &v, + merge_context->GetOperands(), s->value, s->logger, + s->statistics, s->env_, nullptr /* result_operand */, true); + } + } else { + // Preserve the value with the goal of returning it as part of + // raw merge operands to the user + merge_context->PushOperand( + v, s->inplace_update_support == false /* operand_pinned */); } + } else if (!s->do_merge) { + // Preserve the value with the goal of returning it as part of + // raw merge operands to the user + merge_context->PushOperand( + v, s->inplace_update_support == false /* operand_pinned */); } else if (s->value != nullptr) { s->value->assign(v.data(), v.size()); } @@ -726,7 +739,8 @@ static bool SaveValue(void* arg, const char* entry) { *(s->merge_in_progress) = true; merge_context->PushOperand( v, s->inplace_update_support == false /* operand_pinned */); - if (merge_operator->ShouldMerge(merge_context->GetOperandsDirectionBackward())) { + if (s->do_merge && merge_operator->ShouldMerge( + merge_context->GetOperandsDirectionBackward())) { *(s->status) = MergeHelper::TimedFullMerge( merge_operator, s->key->user_key(), nullptr, merge_context->GetOperands(), s->value, s->logger, s->statistics, @@ -750,7 +764,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, 
const ReadOptions& read_opts, - ReadCallback* callback, bool* is_blob_index) { + ReadCallback* callback, bool* is_blob_index, bool do_merge) { // The sequence number is updated synchronously in version_set.h if (IsEmpty()) { // Avoiding recording stats for speed. @@ -810,8 +824,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, saver.env_ = env_; saver.callback_ = callback; saver.is_blob_index = is_blob_index; + saver.do_merge = do_merge; table_->Get(key, &saver, SaveValue); - *seq = saver.seq; } diff --git a/db/memtable.h b/db/memtable.h index 6b8c4141f..36ba0df79 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -175,6 +175,10 @@ class MemTable { const Slice& value, bool allow_concurrent = false, MemTablePostProcessInfo* post_process_info = nullptr); + // Used to Get value associated with key or Get Merge Operands associated + // with key. + // If do_merge = true the default behavior which is Get value for key is + // executed. Expected behavior is described right below. // If memtable contains a value for key, store it in *value and return true. // If memtable contains a deletion for key, store a NotFound() error // in *status and return true. @@ -188,20 +192,23 @@ class MemTable { // returned). Otherwise, *seq will be set to kMaxSequenceNumber. // On success, *s may be set to OK, NotFound, or MergeInProgress. Any other // status returned indicates a corruption or other unexpected error. + // If do_merge = false then any Merge Operands encountered for key are simply + // stored in merge_context.operands_list and never actually merged to get a + // final value. The raw Merge Operands are eventually returned to the user. 
bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback = nullptr, - bool* is_blob_index = nullptr); + bool* is_blob_index = nullptr, bool do_merge = true); bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts, ReadCallback* callback = nullptr, - bool* is_blob_index = nullptr) { + bool* is_blob_index = nullptr, bool do_merge = true) { SequenceNumber seq; return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq, - read_opts, callback, is_blob_index); + read_opts, callback, is_blob_index, do_merge); } // Attempts to update the new_value inplace, else does normal Add diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 0f796eb9a..d06a82df8 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -109,6 +109,20 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value, is_blob_index); } +bool MemTableListVersion::GetMergeOperands( + const LookupKey& key, Status* s, MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) { + for (MemTable* memtable : memlist_) { + bool done = memtable->Get(key, nullptr, s, merge_context, + max_covering_tombstone_seq, read_opts, nullptr, + nullptr, false); + if (done) { + return true; + } + } + return false; +} + bool MemTableListVersion::GetFromHistory( const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, diff --git a/db/memtable_list.h b/db/memtable_list.h index a72077ff3..2bd225b83 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -71,6 +71,13 @@ class MemTableListVersion { read_opts, callback, is_blob_index); } + // Returns all the merge operands corresponding to the key by searching all + // 
memtables starting from the most recent one. + bool GetMergeOperands(const LookupKey& key, Status* s, + MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, + const ReadOptions& read_opts); + // Similar to Get(), but searches the Memtable history of memtables that // have already been flushed. Should only be used from in-memory only // queries (such as Transaction validation) as the history may contain diff --git a/db/version_set.cc b/db/version_set.cc index 3a1f47790..af0168f76 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1651,7 +1651,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, bool* value_found, bool* key_exists, SequenceNumber* seq, ReadCallback* callback, - bool* is_blob) { + bool* is_blob, bool do_merge) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); @@ -1671,8 +1671,9 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, GetContext get_context( user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, - value, value_found, merge_context, max_covering_tombstone_seq, this->env_, - seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, + do_merge ? value : nullptr, value_found, merge_context, do_merge, + max_covering_tombstone_seq, this->env_, seq, + merge_operator_ ? 
&pinned_iters_mgr : nullptr, callback, is_blob, tracing_get_id); // Pin blocks that we read to hold merge operands @@ -1737,7 +1738,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, } else if (fp.GetHitFileLevel() >= 2) { RecordTick(db_statistics_, GET_HIT_L2_AND_UP); } - PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, fp.GetHitFileLevel()); + PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, + fp.GetHitFileLevel()); return; case GetContext::kDeleted: // Use empty error message for speed @@ -1755,11 +1757,14 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, } f = fp.GetNextFile(); } - if (db_statistics_ != nullptr) { get_context.ReportCounters(); } if (GetContext::kMerge == get_context.State()) { + if (!do_merge) { + *status = Status::OK(); + return; + } if (!merge_operator_) { *status = Status::InvalidArgument( "merge_operator is not properly initialized."); @@ -1806,7 +1811,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, get_ctx.emplace_back( user_comparator(), merge_operator_, info_log_, db_statistics_, iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey, - iter->value, nullptr, &(iter->merge_context), + iter->value, nullptr, &(iter->merge_context), true, &iter->max_covering_tombstone_seq, this->env_, &iter->seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, tracing_mget_id); diff --git a/db/version_set.h b/db/version_set.h index 391bb902c..25598630e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -63,7 +63,6 @@ class VersionSet; class WriteBufferManager; class MergeContext; class ColumnFamilySet; -class TableCache; class MergeIteratorBuilder; // Return the smallest index i such that file_level.files[i]->largest >= key. @@ -561,28 +560,33 @@ class Version { const Slice& largest_user_key, int level, bool* overlap); - // Lookup the value for key. If found, store it in *val and - // return OK. Else return a non-OK status. 
- // Uses *operands to store merge_operator operations to apply later. + // Lookup the value for key or get all merge operands for key. + // If do_merge = true (default) then lookup value for key. + // Behavior if do_merge = true: + // If found, store it in *value and + // return OK. Else return a non-OK status. + // Uses *operands to store merge_operator operations to apply later. // - // If the ReadOptions.read_tier is set to do a read-only fetch, then - // *value_found will be set to false if it cannot be determined whether - // this value exists without doing IO. + // If the ReadOptions.read_tier is set to do a read-only fetch, then + // *value_found will be set to false if it cannot be determined whether + // this value exists without doing IO. // - // If the key is Deleted, *status will be set to NotFound and + // If the key is Deleted, *status will be set to NotFound and // *key_exists will be set to true. - // If no key was found, *status will be set to NotFound and + // If no key was found, *status will be set to NotFound and // *key_exists will be set to false. - // If seq is non-null, *seq will be set to the sequence number found - // for the key if a key was found. - // + // If seq is non-null, *seq will be set to the sequence number found + // for the key if a key was found. 
+ // Behavior if do_merge = false + // If the key has any merge operands then store them in + // merge_context.operands_list and don't merge the operands // REQUIRES: lock is not held void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* value, Status* status, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, bool* value_found = nullptr, bool* key_exists = nullptr, SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr, - bool* is_blob = nullptr); + bool* is_blob = nullptr, bool do_merge = true); void MultiGet(const ReadOptions&, MultiGetRange* range, ReadCallback* callback = nullptr, bool* is_blob = nullptr); diff --git a/file/filename.cc b/file/filename.cc index 65ec33149..ba5d84c29 100644 --- a/file/filename.cc +++ b/file/filename.cc @@ -60,8 +60,7 @@ static size_t GetInfoLogPrefix(const std::string& path, char* dest, int len) { static std::string MakeFileName(uint64_t number, const char* suffix) { char buf[100]; snprintf(buf, sizeof(buf), "%06llu.%s", - static_cast(number), - suffix); + static_cast(number), suffix); return buf; } diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 1d90dc50b..36d6fea92 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -116,6 +116,10 @@ struct IngestExternalFileArg { IngestExternalFileOptions options; }; +struct GetMergeOperandsOptions { + int expected_max_number_of_operands = 0; +}; + // A collections of table properties objects, where // key: is the table's file name. // value: the table properties object of the given table. @@ -403,6 +407,22 @@ class DB { return Get(options, DefaultColumnFamily(), key, value); } + // Returns all the merge operands corresponding to the key. If the + // number of merge operands in DB is greater than + // merge_operands_options.expected_max_number_of_operands + // no merge operands are returned and status is Incomplete. Merge operands + // returned are in the order of insertion. 
+ // merge_operands- Points to an array of at-least + // merge_operands_options.expected_max_number_of_operands and the + // caller is responsible for allocating it. If the status + // returned is Incomplete then number_of_operands will contain + // the total number of merge operands found in DB for key. + virtual Status GetMergeOperands( + const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* merge_operands, + GetMergeOperandsOptions* get_merge_operands_options, + int* number_of_operands) = 0; + // If keys[i] does not exist in the database, then the i'th returned // status will be one for which Status::IsNotFound() is true, and // (*values)[i] will be set to some arbitrary value (often ""). Otherwise, diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index ac97ce442..e4360126d 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -76,6 +76,7 @@ class Status { kMemoryLimit = 7, kSpaceLimit = 8, kPathNotFound = 9, + KMergeOperandsInsufficientCapacity = 10, kMaxSubCode }; diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 67bf4e2fa..35fddc804 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -88,6 +88,17 @@ class StackableDB : public DB { return db_->Get(options, column_family, key, value); } + using DB::GetMergeOperands; + virtual Status GetMergeOperands( + const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* slice, + GetMergeOperandsOptions* get_merge_operands_options, + int* number_of_operands) override { + return db_->GetMergeOperands(options, column_family, key, slice, + get_merge_operands_options, + number_of_operands); + } + using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, diff --git a/src.mk b/src.mk index 0c6142e41..6d1d655c7 100644 --- a/src.mk +++ b/src.mk @@ -191,6 +191,7 @@ LIB_SOURCES = \ 
utilities/memory/memory_util.cc \ utilities/merge_operators/max.cc \ utilities/merge_operators/put.cc \ + utilities/merge_operators/sortlist.cc \ utilities/merge_operators/string_append/stringappend.cc \ utilities/merge_operators/string_append/stringappend2.cc \ utilities/merge_operators/uint64add.cc \ @@ -291,6 +292,7 @@ MAIN_SOURCES = \ db/db_log_iter_test.cc \ db/db_memtable_test.cc \ db/db_merge_operator_test.cc \ + db/db_merge_operand_test.cc \ db/db_options_test.cc \ db/db_properties_test.cc \ db/db_range_del_test.cc \ diff --git a/table/block_based/data_block_hash_index_test.cc b/table/block_based/data_block_hash_index_test.cc index 484617d7e..ae23f6ef2 100644 --- a/table/block_based/data_block_hash_index_test.cc +++ b/table/block_based/data_block_hash_index_test.cc @@ -631,7 +631,7 @@ TEST(DataBlockHashIndex, BlockBoundary) { InternalKey seek_ikey(seek_ukey, 60, kTypeValue); GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, seek_ukey, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); TestBoundary(ik1, v1, ik2, v2, seek_ikey, get_context, options); ASSERT_EQ(get_context.State(), GetContext::kFound); @@ -656,7 +656,7 @@ TEST(DataBlockHashIndex, BlockBoundary) { InternalKey seek_ikey(seek_ukey, 60, kTypeValue); GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, seek_ukey, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); TestBoundary(ik1, v1, ik2, v2, seek_ikey, get_context, options); ASSERT_EQ(get_context.State(), GetContext::kFound); @@ -681,7 +681,7 @@ TEST(DataBlockHashIndex, BlockBoundary) { InternalKey seek_ikey(seek_ukey, 120, kTypeValue); GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, seek_ukey, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); TestBoundary(ik1, v1, ik2, v2, seek_ikey, get_context, options); 
ASSERT_EQ(get_context.State(), GetContext::kFound); @@ -706,7 +706,7 @@ TEST(DataBlockHashIndex, BlockBoundary) { InternalKey seek_ikey(seek_ukey, 5, kTypeValue); GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, seek_ukey, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); TestBoundary(ik1, v1, ik2, v2, seek_ikey, get_context, options); ASSERT_EQ(get_context.State(), GetContext::kNotFound); diff --git a/table/cuckoo/cuckoo_table_reader_test.cc b/table/cuckoo/cuckoo_table_reader_test.cc index dd1557db1..8043d36ab 100644 --- a/table/cuckoo/cuckoo_table_reader_test.cc +++ b/table/cuckoo/cuckoo_table_reader_test.cc @@ -122,7 +122,7 @@ class CuckooReaderTest : public testing::Test { PinnableSlice value; GetContext get_context(ucomp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(user_keys[i]), &value, - nullptr, nullptr, nullptr, nullptr); + nullptr, nullptr, true, nullptr, nullptr); ASSERT_OK( reader.Get(ReadOptions(), Slice(keys[i]), &get_context, nullptr)); ASSERT_STREQ(values[i].c_str(), value.data()); @@ -336,7 +336,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { AppendInternalKey(¬_found_key, ikey); PinnableSlice value; GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, - Slice(not_found_key), &value, nullptr, nullptr, + Slice(not_found_key), &value, nullptr, nullptr, true, nullptr, nullptr); ASSERT_OK( reader.Get(ReadOptions(), Slice(not_found_key), &get_context, nullptr)); @@ -351,7 +351,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { value.Reset(); GetContext get_context2(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(not_found_key2), &value, - nullptr, nullptr, nullptr, nullptr); + nullptr, nullptr, true, nullptr, nullptr); ASSERT_OK( reader.Get(ReadOptions(), Slice(not_found_key2), &get_context2, nullptr)); ASSERT_TRUE(value.empty()); @@ -367,7 +367,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { value.Reset(); GetContext 
get_context3(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(unused_key), &value, - nullptr, nullptr, nullptr, nullptr); + nullptr, nullptr, true, nullptr, nullptr); ASSERT_OK( reader.Get(ReadOptions(), Slice(unused_key), &get_context3, nullptr)); ASSERT_TRUE(value.empty()); @@ -443,7 +443,7 @@ void WriteFile(const std::vector& keys, // Assume only the fast path is triggered GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); for (uint64_t i = 0; i < num; ++i) { value.Reset(); value.clear(); @@ -491,7 +491,7 @@ void ReadKeys(uint64_t num, uint32_t batch_size) { // Assume only the fast path is triggered GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); uint64_t start_time = env->NowMicros(); if (batch_size > 0) { for (uint64_t i = 0; i < num; i += batch_size) { diff --git a/table/get_context.cc b/table/get_context.cc index f0c7928bf..cdb5798f7 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -42,9 +42,9 @@ GetContext::GetContext( const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, const Slice& user_key, PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context, - SequenceNumber* _max_covering_tombstone_seq, Env* env, SequenceNumber* seq, - PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback, - bool* is_blob_index, uint64_t tracing_get_id) + bool do_merge, SequenceNumber* _max_covering_tombstone_seq, Env* env, + SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr, + ReadCallback* callback, bool* is_blob_index, uint64_t tracing_get_id) : ucmp_(ucmp), merge_operator_(merge_operator), logger_(logger), @@ -60,6 +60,7 @@ GetContext::GetContext( replay_log_(nullptr), 
pinned_iters_mgr_(_pinned_iters_mgr), callback_(callback), + do_merge_(do_merge), is_blob_index_(is_blob_index), tracing_get_id_(tracing_get_id) { if (seq_) { @@ -215,29 +216,44 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } if (kNotFound == state_) { state_ = kFound; - if (LIKELY(pinnable_val_ != nullptr)) { - if (LIKELY(value_pinner != nullptr)) { - // If the backing resources for the value are provided, pin them - pinnable_val_->PinSlice(value, value_pinner); - } else { - TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf", this); + if (do_merge_) { + if (LIKELY(pinnable_val_ != nullptr)) { + if (LIKELY(value_pinner != nullptr)) { + // If the backing resources for the value are provided, pin them + pinnable_val_->PinSlice(value, value_pinner); + } else { + TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf", + this); - // Otherwise copy the value - pinnable_val_->PinSelf(value); + // Otherwise copy the value + pinnable_val_->PinSelf(value); + } } + } else { + // It means this function is called as part of DB GetMergeOperands + // API and the current value should be part of + // merge_context_->operand_list + push_operand(value, value_pinner); } } else if (kMerge == state_) { assert(merge_operator_ != nullptr); state_ = kFound; - if (LIKELY(pinnable_val_ != nullptr)) { - Status merge_status = MergeHelper::TimedFullMerge( - merge_operator_, user_key_, &value, - merge_context_->GetOperands(), pinnable_val_->GetSelf(), - logger_, statistics_, env_); - pinnable_val_->PinSelf(); - if (!merge_status.ok()) { - state_ = kCorrupt; + if (do_merge_) { + if (LIKELY(pinnable_val_ != nullptr)) { + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, &value, + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); + if (!merge_status.ok()) { + state_ = kCorrupt; + } } + } else { + // It means this function is called as part of DB GetMergeOperands + // API 
and the current value should be part of + // merge_context_->operand_list + push_operand(value, value_pinner); } } if (is_blob_index_ != nullptr) { @@ -256,14 +272,18 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } else if (kMerge == state_) { state_ = kFound; if (LIKELY(pinnable_val_ != nullptr)) { - Status merge_status = MergeHelper::TimedFullMerge( - merge_operator_, user_key_, nullptr, - merge_context_->GetOperands(), pinnable_val_->GetSelf(), - logger_, statistics_, env_); - pinnable_val_->PinSelf(); - if (!merge_status.ok()) { - state_ = kCorrupt; + if (do_merge_) { + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, nullptr, + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); + if (!merge_status.ok()) { + state_ = kCorrupt; + } } + // If do_merge_ = false then the current value shouldn't be part of + // merge_context_->operand_list } } return false; @@ -272,24 +292,23 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(state_ == kNotFound || state_ == kMerge); state_ = kMerge; // value_pinner is not set from plain_table_reader.cc for example. 
- if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() && - value_pinner != nullptr) { - value_pinner->DelegateCleanupsTo(pinned_iters_mgr()); - merge_context_->PushOperand(value, true /*value_pinned*/); - } else { - merge_context_->PushOperand(value, false); - } - if (merge_operator_ != nullptr && - merge_operator_->ShouldMerge(merge_context_->GetOperandsDirectionBackward())) { + push_operand(value, value_pinner); + if (do_merge_ && merge_operator_ != nullptr && + merge_operator_->ShouldMerge( + merge_context_->GetOperandsDirectionBackward())) { state_ = kFound; if (LIKELY(pinnable_val_ != nullptr)) { - Status merge_status = MergeHelper::TimedFullMerge( - merge_operator_, user_key_, nullptr, - merge_context_->GetOperands(), pinnable_val_->GetSelf(), - logger_, statistics_, env_); - pinnable_val_->PinSelf(); - if (!merge_status.ok()) { - state_ = kCorrupt; + // do_merge_ = true this is the case where this function is called + // as part of DB Get API hence merge operators should be merged. 
+ if (do_merge_) { + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, nullptr, + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); + if (!merge_status.ok()) { + state_ = kCorrupt; + } } } return false; @@ -306,6 +325,16 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, return false; } +void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) { + if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() && + value_pinner != nullptr) { + value_pinner->DelegateCleanupsTo(pinned_iters_mgr()); + merge_context_->PushOperand(value, true /*value_pinned*/); + } else { + merge_context_->PushOperand(value, false); + } +} + void replayGetContextLog(const Slice& replay_log, const Slice& user_key, GetContext* get_context, Cleanable* value_pinner) { #ifndef ROCKSDB_LITE diff --git a/table/get_context.h b/table/get_context.h index 7110ceae8..97d73ec0b 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -66,6 +66,9 @@ class GetContext { GetContextStats get_context_stats_; // Constructor + // @param value Holds the value corresponding to user_key. If it's nullptr + // then return all merge operands corresponding to user_key + // via merge_context // @param value_found If non-nullptr, set to false if key may be present // but we can't be certain because we cannot do IO // @param max_covering_tombstone_seq Pointer to highest sequence number of @@ -78,10 +81,14 @@ class GetContext { // for visibility of a key // @param is_blob_index If non-nullptr, will be used to indicate if a found // key is of type blob index + // @param do_merge True if value associated with user_key has to be returned + // and false if all the merge operands associated with user_key has to be + // returned. If do_merge=false then all the merge operands are stored in + // merge_context and they are never merged. The value pointer is untouched. 
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, const Slice& user_key, PinnableSlice* value, bool* value_found, - MergeContext* merge_context, + MergeContext* merge_context, bool do_merge, SequenceNumber* max_covering_tombstone_seq, Env* env, SequenceNumber* seq = nullptr, PinnedIteratorsManager* _pinned_iters_mgr = nullptr, @@ -140,6 +147,8 @@ class GetContext { uint64_t get_tracing_get_id() const { return tracing_get_id_; } + void push_operand(const Slice& value, Cleanable* value_pinner); + private: const Comparator* ucmp_; const MergeOperator* merge_operator_; @@ -162,6 +171,10 @@ class GetContext { PinnedIteratorsManager* pinned_iters_mgr_; ReadCallback* callback_; bool sample_; + // Value is true if it's called as part of DB Get API and false if it's + // called as part of DB GetMergeOperands API. When it's false merge operators + // are never merged. + bool do_merge_; bool* is_blob_index_; // Used for block cache tracing only. A tracing get id uniquely identifies a // Get or a MultiGet. 
diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index cec62df59..45d760f0e 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -175,7 +175,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, ioptions.merge_operator, ioptions.info_log, ioptions.statistics, GetContext::kNotFound, Slice(key), &value, nullptr, &merge_context, - &max_covering_tombstone_seq, env); + true, &max_covering_tombstone_seq, env); s = table_reader->Get(read_options, key, &get_context, nullptr); } else { s = db->Get(read_options, key, &result); diff --git a/table/table_test.cc b/table/table_test.cc index 6cd26bc73..749048b78 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -2323,8 +2323,8 @@ TEST_P(BlockBasedTableTest, TracingGetTest) { PinnableSlice value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, - nullptr, /*get_id=*/i); + nullptr, true, nullptr, nullptr, nullptr, nullptr, + nullptr, nullptr, /*tracing_get_id=*/i); get_perf_context()->Reset(); ASSERT_OK(c.GetTableReader()->Get(ReadOptions(), encoded_key, &get_context, moptions.prefix_extractor.get())); @@ -2579,7 +2579,7 @@ TEST_P(BlockBasedTableTest, BlockCacheDisabledTest) { { GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), nullptr, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); // a hack that just to trigger BlockBasedTable::GetFilter. 
reader->Get(ReadOptions(), "non-exist-key", &get_context, moptions.prefix_extractor.get()); @@ -2750,7 +2750,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) { PinnableSlice value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); ASSERT_OK(reader->Get(ReadOptions(), internal_key.Encode(), &get_context, moptions4.prefix_extractor.get())); ASSERT_STREQ(value.data(), "hello"); @@ -2836,7 +2836,7 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) { { GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); get_perf_context()->Reset(); ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context, moptions.prefix_extractor.get())); @@ -2862,7 +2862,7 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) { { GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); get_perf_context()->Reset(); ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context, moptions.prefix_extractor.get())); @@ -4230,7 +4230,7 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) { std::string user_key = ExtractUserKey(kv.first).ToString(); GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); ASSERT_OK(reader->Get(ro, kv.first, &get_context, moptions.prefix_extractor.get())); ASSERT_EQ(get_context.State(), GetContext::kFound); @@ -4256,7 +4256,7 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) { PinnableSlice value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr, nullptr); + 
nullptr, true, nullptr, nullptr); ASSERT_OK(reader->Get(ro, encoded_key, &get_context, moptions.prefix_extractor.get())); ASSERT_EQ(get_context.State(), GetContext::kNotFound); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index f6a9d9458..001dd4d2f 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -71,6 +71,7 @@ #include "utilities/blob_db/blob_db.h" #include "utilities/merge_operators.h" #include "utilities/merge_operators/bytesxor.h" +#include "utilities/merge_operators/sortlist.h" #include "utilities/persistent_cache/block_cache_tier.h" #ifdef OS_WIN @@ -120,7 +121,8 @@ DEFINE_string( "fillseekseq," "randomtransaction," "randomreplacekeys," - "timeseries", + "timeseries," + "getmergeoperands", "Comma-separated list of operations to run in the specified" " order. Available benchmarks:\n" @@ -190,7 +192,13 @@ DEFINE_string( "\tlevelstats -- Print the number of files and bytes per level\n" "\tsstables -- Print sstable info\n" "\theapprofile -- Dump a heap profile (if supported by this port)\n" - "\treplay -- replay the trace file specified with trace_file\n"); + "\treplay -- replay the trace file specified with trace_file\n" + "\tgetmergeoperands -- Insert lots of merge records which are a list of " + "sorted ints for a key and then compare performance of lookup for another " + "key " + "by doing a Get followed by binary searching in the large sorted list vs " + "doing a GetMergeOperands and binary searching in the operands which are" + "sorted sub-lists. 
The MergeOperator used is sortlist.h\n"); DEFINE_int64(num, 1000000, "Number of key/values to place in database"); @@ -2880,6 +2888,8 @@ class Benchmark { exit(1); } method = &Benchmark::Replay; + } else if (name == "getmergeoperands") { + method = &Benchmark::GetMergeOperands; } else if (!name.empty()) { // No error message for empty name fprintf(stderr, "unknown benchmark '%s'\n", name.c_str()); exit(1); @@ -5921,6 +5931,97 @@ class Benchmark { } } + bool binary_search(std::vector<int>& data, int start, int end, int key) { + if (data.empty()) return false; + if (start > end) return false; + int mid = start + (end - start) / 2; + if (mid > static_cast<int>(data.size()) - 1) return false; + if (data[mid] == key) { + return true; + } else if (data[mid] > key) { + return binary_search(data, start, mid - 1, key); + } else { + return binary_search(data, mid + 1, end, key); + } + } + + // Does a bunch of merge operations for a key(key1) where the merge operand + // is a sorted list. Next performance comparison is done between doing a Get + // for key1 followed by searching for another key(key2) in the large sorted + // list vs calling GetMergeOperands for key1 and then searching for the key2 + // in all the sorted sub-lists. The latter case is expected to be a lot faster. 
+ void GetMergeOperands(ThreadState* thread) { + DB* db = SelectDB(thread); + const int kTotalValues = 100000; + const int kListSize = 100; + std::string key = "my_key"; + std::string value; + + for (int i = 1; i < kTotalValues; i++) { + if (i % kListSize == 0) { + // Remove trailing ',' + value.pop_back(); + db->Merge(WriteOptions(), key, value); + value.clear(); + } else { + value.append(std::to_string(i)).append(","); + } + } + + SortList s; + std::vector<int> data; + // This value can be experimented with and it will demonstrate the + // perf difference between doing a Get and searching for lookup_key in the + // resultant large sorted list vs doing GetMergeOperands and searching + // for lookup_key within these resultant sorted sub-lists. + int lookup_key = 1; + + // Get API call + std::cout << "--- Get API call --- \n"; + PinnableSlice p_slice; + uint64_t st = FLAGS_env->NowNanos(); + db->Get(ReadOptions(), db->DefaultColumnFamily(), key, &p_slice); + s.MakeVector(data, p_slice); + bool found = + binary_search(data, 0, static_cast<int>(data.size() - 1), lookup_key); + std::cout << "Found key? 
" << std::to_string(found) << "\n"; + uint64_t sp = FLAGS_env->NowNanos(); + std::cout << "Get: " << (sp - st) / 1000000000.0 << " seconds\n"; + std::string* dat_ = p_slice.GetSelf(); + std::cout << "Sample data from Get API call: " << dat_->substr(0, 10) + << "\n"; + data.clear(); + + // GetMergeOperands API call + std::cout << "--- GetMergeOperands API --- \n"; + std::vector<PinnableSlice> a_slice((kTotalValues / kListSize) + 1); + st = FLAGS_env->NowNanos(); + int number_of_operands = 0; + GetMergeOperandsOptions get_merge_operands_options; + get_merge_operands_options.expected_max_number_of_operands = + (kTotalValues / 100) + 1; + db->GetMergeOperands(ReadOptions(), db->DefaultColumnFamily(), key, + a_slice.data(), &get_merge_operands_options, + &number_of_operands); + for (PinnableSlice& psl : a_slice) { + s.MakeVector(data, psl); + found = + binary_search(data, 0, static_cast<int>(data.size() - 1), lookup_key); + data.clear(); + if (found) break; + } + std::cout << "Found key? " << std::to_string(found) << "\n"; + sp = FLAGS_env->NowNanos(); + std::cout << "Get Merge operands: " << (sp - st) / 1000000000.0 + << " seconds \n"; + int to_print = 0; + std::cout << "Sample data from GetMergeOperands API call: "; + for (PinnableSlice& psl : a_slice) { + std::cout << "List: " << to_print << " : " << *psl.GetSelf() << "\n"; + if (to_print++ > 2) break; + } + } + #ifndef ROCKSDB_LITE // This benchmark stress tests Transactions. 
For a given --duration (or // total number of --writes, a Transaction will perform a read-modify-write diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index caa9b0988..86501280d 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -1146,9 +1146,11 @@ Status BlobDBImpl::GetImpl(const ReadOptions& read_options, PinnableSlice index_entry; Status s; bool is_blob_index = false; - s = db_impl_->GetImpl(ro, column_family, key, &index_entry, - nullptr /*value_found*/, nullptr /*read_callback*/, - &is_blob_index); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.value = &index_entry; + get_impl_options.is_blob_index = &is_blob_index; + s = db_impl_->GetImpl(ro, key, get_impl_options); TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1"); TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2"); if (expiration != nullptr) { @@ -1535,9 +1537,12 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, SequenceNumber latest_seq = GetLatestSequenceNumber(); bool is_blob_index = false; PinnableSlice index_entry; - Status get_status = db_impl_->GetImpl( - ReadOptions(), cfh, record.key, &index_entry, nullptr /*value_found*/, - nullptr /*read_callback*/, &is_blob_index); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = cfh; + get_impl_options.value = &index_entry; + get_impl_options.is_blob_index = &is_blob_index; + Status get_status = + db_impl_->GetImpl(ReadOptions(), record.key, get_impl_options); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB"); if (!get_status.ok() && !get_status.IsNotFound()) { // error diff --git a/utilities/merge_operators.h b/utilities/merge_operators.h index 4c720b822..6e3464bdf 100644 --- a/utilities/merge_operators.h +++ b/utilities/merge_operators.h @@ -23,6 +23,7 @@ class MergeOperators { static std::shared_ptr CreateStringAppendTESTOperator(); static 
std::shared_ptr CreateMaxOperator(); static std::shared_ptr CreateBytesXOROperator(); + static std::shared_ptr CreateSortOperator(); // Will return a different merge operator depending on the string. // TODO: Hook the "name" up to the actual Name() of the MergeOperators? @@ -42,6 +43,8 @@ class MergeOperators { return CreateMaxOperator(); } else if (name == "bytesxor") { return CreateBytesXOROperator(); + } else if (name == "sortlist") { + return CreateSortOperator(); } else { // Empty or unknown, just return nullptr return nullptr; diff --git a/utilities/merge_operators/sortlist.cc b/utilities/merge_operators/sortlist.cc new file mode 100644 index 000000000..5dbf05115 --- /dev/null +++ b/utilities/merge_operators/sortlist.cc @@ -0,0 +1,100 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/sortlist.h" + +using rocksdb::Logger; +using rocksdb::MergeOperator; +using rocksdb::Slice; + +namespace rocksdb { + +bool SortList::FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { + std::vector left; + for (Slice slice : merge_in.operand_list) { + std::vector right; + MakeVector(right, slice); + left = Merge(left, right); + } + for (int i = 0; i < static_cast(left.size()) - 1; i++) { + merge_out->new_value.append(std::to_string(left[i])).append(","); + } + merge_out->new_value.append(std::to_string(left.back())); + return true; +} + +bool SortList::PartialMerge(const Slice& /*key*/, const Slice& left_operand, + const Slice& right_operand, std::string* new_value, + Logger* /*logger*/) const { + std::vector left; + std::vector right; + MakeVector(left, left_operand); + MakeVector(right, right_operand); + left = Merge(left, right); + for (int i = 0; i < static_cast(left.size()) - 1; i++) { + new_value->append(std::to_string(left[i])).append(","); + } + new_value->append(std::to_string(left.back())); + return true; +} + +bool SortList::PartialMergeMulti(const Slice& /*key*/, + const std::deque& operand_list, + std::string* new_value, + Logger* /*logger*/) const { + (void)operand_list; + (void)new_value; + return true; +} + +const char* SortList::Name() const { return "MergeSortOperator"; } + +void SortList::MakeVector(std::vector& operand, Slice slice) const { + do { + const char* begin = slice.data_; + while (*slice.data_ != ',' && *slice.data_) slice.data_++; + operand.push_back(std::stoi(std::string(begin, slice.data_))); + } while (0 != *slice.data_++); +} + +std::vector SortList::Merge(std::vector& left, + std::vector& right) const { + // Fill the resultant vector with sorted results from both vectors + std::vector result; + unsigned left_it = 0, right_it = 0; + + while (left_it < 
left.size() && right_it < right.size()) { + // If the left value is smaller than the right it goes next + // into the resultant vector + if (left[left_it] < right[right_it]) { + result.push_back(left[left_it]); + left_it++; + } else { + result.push_back(right[right_it]); + right_it++; + } + } + + // Push the remaining data from both vectors onto the resultant + while (left_it < left.size()) { + result.push_back(left[left_it]); + left_it++; + } + + while (right_it < right.size()) { + result.push_back(right[right_it]); + right_it++; + } + + return result; +} + +std::shared_ptr MergeOperators::CreateSortOperator() { + return std::make_shared(); +} +} // namespace rocksdb diff --git a/utilities/merge_operators/sortlist.h b/utilities/merge_operators/sortlist.h new file mode 100644 index 000000000..02c93edf5 --- /dev/null +++ b/utilities/merge_operators/sortlist.h @@ -0,0 +1,38 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +// A MergeOperator for RocksDB that implements Merge Sort. +// It is built using the MergeOperator interface. The operator works by taking +// an input which contains one or more merge operands where each operand is a +// list of sorted ints and merges them to form a large sorted list. 
+#pragma once + +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" + +namespace rocksdb { + +class SortList : public MergeOperator { + public: + bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override; + + bool PartialMerge(const Slice& /*key*/, const Slice& left_operand, + const Slice& right_operand, std::string* new_value, + Logger* /*logger*/) const override; + + bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const override; + + const char* Name() const override; + + void MakeVector(std::vector& operand, Slice slice) const; + + private: + std::vector Merge(std::vector& left, std::vector& right) const; +}; + +} // namespace rocksdb diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 188f61120..8dfc0d1d4 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -290,8 +290,12 @@ Status WritePreparedTxn::RollbackInternal() { PinnableSlice pinnable_val; bool not_used; auto cf_handle = handles_[cf]; - s = db_->GetImpl(roptions_, cf_handle, key, &pinnable_val, ¬_used, - &callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = cf_handle; + get_impl_options.value = &pinnable_val; + get_impl_options.value_found = ¬_used; + get_impl_options.callback = &callback; + s = db_->GetImpl(roptions_, key, get_impl_options); assert(s.ok() || s.IsNotFound()); if (s.ok()) { s = rollback_batch_->Put(cf_handle, key, pinnable_val); diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index e6d710206..7441cb3c0 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -231,8 +231,12 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options, WritePreparedTxnReadCallback callback(this, snap_seq, 
min_uncommitted, backed_by_snapshot); bool* dont_care = nullptr; - auto res = db_impl_->GetImpl(options, column_family, key, value, dont_care, - &callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.value = value; + get_impl_options.value_found = dont_care; + get_impl_options.callback = &callback; + auto res = db_impl_->GetImpl(options, key, get_impl_options); if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(), backed_by_snapshot))) { return res; diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index a1862d32d..321110ea1 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -567,8 +567,12 @@ Status WriteUnpreparedTxn::RollbackInternal() { const auto& cf_handle = cf_map.at(cfid); PinnableSlice pinnable_val; bool not_used; - s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, ¬_used, - &callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = cf_handle; + get_impl_options.value = &pinnable_val; + get_impl_options.value_found = ¬_used; + get_impl_options.callback = &callback; + s = db_impl_->GetImpl(roptions, key, get_impl_options); if (s.ok()) { s = rollback_batch.Put(cf_handle, key, pinnable_val); @@ -721,8 +725,12 @@ Status WriteUnpreparedTxn::RollbackToSavePointInternal() { const auto& cf_handle = cf_map.at(cfid); PinnableSlice pinnable_val; bool not_used; - s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, ¬_used, - &callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = cf_handle; + get_impl_options.value = &pinnable_val; + get_impl_options.value_found = ¬_used; + get_impl_options.callback = &callback; + s = db_impl_->GetImpl(roptions, key, get_impl_options); if (s.ok()) { s = write_batch_.Put(cf_handle, key, pinnable_val); diff --git 
a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index defaf9fce..3a8eff5ec 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -86,8 +86,12 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( PinnableSlice pinnable_val; bool not_used; auto cf_handle = handles_[cf]; - s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, ¬_used, - &callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = cf_handle; + get_impl_options.value = &pinnable_val; + get_impl_options.value_found = ¬_used; + get_impl_options.callback = &callback; + s = db_->GetImpl(roptions, key, get_impl_options); assert(s.ok() || s.IsNotFound()); if (s.ok()) { s = rollback_batch_->Put(cf_handle, key, pinnable_val); diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 3ffa2e0c6..272a2ab48 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -891,9 +891,12 @@ Status WriteBatchWithIndex::GetFromBatchAndDB( if (!callback) { s = db->Get(read_options, column_family, key, pinnable_val); } else { + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.value = pinnable_val; + get_impl_options.callback = callback; s = static_cast_with_check(db->GetRootDB()) - ->GetImpl(read_options, column_family, key, pinnable_val, nullptr, - callback); + ->GetImpl(read_options, key, get_impl_options); } if (s.ok() || s.IsNotFound()) { // DB Get Succeeded