diff --git a/CMakeLists.txt b/CMakeLists.txt index bb99d1b7e..8622242aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -661,6 +661,7 @@ set(SOURCES utilities/merge_operators/bytesxor.cc utilities/merge_operators/max.cc utilities/merge_operators/put.cc + utilities/merge_operators/sortlist.cc utilities/merge_operators/string_append/stringappend.cc utilities/merge_operators/string_append/stringappend2.cc utilities/merge_operators/uint64add.cc @@ -887,6 +888,7 @@ if(WITH_TESTS) db/db_log_iter_test.cc db/db_memtable_test.cc db/db_merge_operator_test.cc + db/db_merge_operand_test.cc db/db_options_test.cc db/db_properties_test.cc db/db_range_del_test.cc diff --git a/Makefile b/Makefile index 4502be8e4..1718309cb 100644 --- a/Makefile +++ b/Makefile @@ -454,6 +454,7 @@ TESTS = \ db_iterator_test \ db_memtable_test \ db_merge_operator_test \ + db_merge_operand_test \ db_options_test \ db_range_del_test \ db_secondary_test \ @@ -1254,6 +1255,9 @@ db_memtable_test: db/db_memtable_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHA db_merge_operator_test: db/db_merge_operator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_merge_operand_test: db/db_merge_operand_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 25d7ff667..bac5c4311 100644 --- a/TARGETS +++ b/TARGETS @@ -301,6 +301,7 @@ cpp_library( "utilities/merge_operators/bytesxor.cc", "utilities/merge_operators/max.cc", "utilities/merge_operators/put.cc", + "utilities/merge_operators/sortlist.cc", "utilities/merge_operators/string_append/stringappend.cc", "utilities/merge_operators/string_append/stringappend2.cc", "utilities/merge_operators/uint64add.cc", @@ -755,6 +756,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "db_merge_operand_test", + "db/db_merge_operand_test.cc", + "parallel", + [], + [], + ], [ "db_options_test", "db/db_options_test.cc", diff --git a/appveyor.yml b/appveyor.yml index 6bdb164e8..77901c407 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -60,7 +60,7 @@ build: test: test_script: - - ps: build_tools\run_ci_db_test.ps1 -SuiteRun db_basic_test,db_test2,db_test,env_basic_test,env_test -Concurrency 8 + - ps: build_tools\run_ci_db_test.ps1 -SuiteRun db_basic_test,db_test2,db_test,env_basic_test,env_test,db_merge_operand_test -Concurrency 8 on_failure: - cmd: 7z a build-failed.zip %APPVEYOR_BUILD_FOLDER%\build\ && appveyor PushArtifact build-failed.zip diff --git a/db/compacted_db_impl.cc b/db/compacted_db_impl.cc index 88928391a..13cccbd77 100644 --- a/db/compacted_db_impl.cc +++ b/db/compacted_db_impl.cc @@ -37,7 +37,7 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*, const Slice& key, PinnableSlice* value) { GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound, key, value, nullptr, nullptr, - nullptr, nullptr); + true, nullptr, nullptr); LookupKey lkey(key, kMaxSequenceNumber); files_.files[FindFile(key)].fd.table_reader->Get(options, lkey.internal_key(), &get_context, nullptr); @@ -70,7 +70,7 @@ std::vector CompactedDBImpl::MultiGet(const ReadOptions& options, std::string& value = (*values)[idx]; GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound, keys[idx], &pinnable_val, - nullptr, nullptr, nullptr, nullptr); + nullptr, nullptr, true, nullptr, nullptr); LookupKey lkey(keys[idx], kMaxSequenceNumber); r->Get(options, lkey.internal_key(), &get_context, nullptr); value.assign(pinnable_val.data(), pinnable_val.size()); diff --git a/db/db_blob_index_test.cc b/db/db_blob_index_test.cc index 005a23d63..e9618885a 100644 --- a/db/db_blob_index_test.cc +++ b/db/db_blob_index_test.cc @@ -63,9 +63,11 @@ class DBBlobIndexTest : public DBTestBase { ReadOptions read_options; read_options.snapshot = snapshot; PinnableSlice value; - auto s = dbfull()->GetImpl(read_options, cfh(), key, &value, - nullptr /*value_found*/, nullptr /*callback*/, - is_blob_index); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = cfh(); + get_impl_options.value = &value; + get_impl_options.is_blob_index = is_blob_index; + auto s = dbfull()->GetImpl(read_options, key, get_impl_options); if (s.IsNotFound()) { return "NOT_FOUND"; } diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 81c44388b..9236d911e 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1441,19 +1441,22 @@ ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const { Status DBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { - return GetImpl(read_options, column_family, key, value); + GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.value = value; + return GetImpl(read_options, key, get_impl_options); } -Status DBImpl::GetImpl(const ReadOptions& read_options, - ColumnFamilyHandle* column_family, const Slice& key, - PinnableSlice* pinnable_val, bool* value_found, - ReadCallback* callback, bool* is_blob_index) { - assert(pinnable_val != nullptr); +Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, + GetImplOptions get_impl_options) { + assert(get_impl_options.value != nullptr || + get_impl_options.merge_operands != nullptr); PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); StopWatch sw(env_, stats_, DB_GET); PERF_TIMER_GUARD(get_snapshot_time); - auto cfh = reinterpret_cast(column_family); + auto cfh = + reinterpret_cast(get_impl_options.column_family); auto cfd = cfh->cfd(); if (tracer_) { @@ -1461,7 +1464,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, // tracing is enabled. InstrumentedMutexLock lock(&trace_mutex_); if (tracer_) { - tracer_->Get(column_family, key); + tracer_->Get(get_impl_options.column_family, key); } } @@ -1473,9 +1476,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, SequenceNumber snapshot; if (read_options.snapshot != nullptr) { - if (callback) { + if (get_impl_options.callback) { // Already calculated based on read_options.snapshot - snapshot = callback->max_visible_seq(); + snapshot = get_impl_options.callback->max_visible_seq(); } else { snapshot = reinterpret_cast(read_options.snapshot)->number_; @@ -1489,12 +1492,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, snapshot = last_seq_same_as_publish_seq_ ? versions_->LastSequence() : versions_->LastPublishedSequence(); - if (callback) { + if (get_impl_options.callback) { // The unprep_seqs are not published for write unprepared, so it could be // that max_visible_seq is larger. Seek to the std::max of the two. // However, we still want our callback to contain the actual snapshot so // that it can do the correct visibility filtering. - callback->Refresh(snapshot); + get_impl_options.callback->Refresh(snapshot); // Internally, WriteUnpreparedTxnReadCallback::Refresh would set // max_visible_seq = max(max_visible_seq, snapshot) @@ -1505,7 +1508,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, // be needed. // // assert(callback->max_visible_seq() >= snapshot); - snapshot = callback->max_visible_seq(); + snapshot = get_impl_options.callback->max_visible_seq(); } } TEST_SYNC_POINT("DBImpl::GetImpl:3"); @@ -1526,19 +1529,39 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, has_unpersisted_data_.load(std::memory_order_relaxed)); bool done = false; if (!skip_memtable) { - if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &max_covering_tombstone_seq, read_options, callback, - is_blob_index)) { - done = true; - pinnable_val->PinSelf(); - RecordTick(stats_, MEMTABLE_HIT); - } else if ((s.ok() || s.IsMergeInProgress()) && - sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &max_covering_tombstone_seq, read_options, callback, - is_blob_index)) { - done = true; - pinnable_val->PinSelf(); - RecordTick(stats_, MEMTABLE_HIT); + // Get value associated with key + if (get_impl_options.get_value) { + if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s, + &merge_context, &max_covering_tombstone_seq, + read_options, get_impl_options.callback, + get_impl_options.is_blob_index)) { + done = true; + get_impl_options.value->PinSelf(); + RecordTick(stats_, MEMTABLE_HIT); + } else if ((s.ok() || s.IsMergeInProgress()) && + sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s, + &merge_context, &max_covering_tombstone_seq, + read_options, get_impl_options.callback, + get_impl_options.is_blob_index)) { + done = true; + get_impl_options.value->PinSelf(); + RecordTick(stats_, MEMTABLE_HIT); + } + } else { + // Get Merge Operands associated with key, Merge Operands should not be + // merged and raw values should be returned to the user. + if (sv->mem->Get(lkey, nullptr, &s, &merge_context, + &max_covering_tombstone_seq, read_options, nullptr, + nullptr, false)) { + done = true; + RecordTick(stats_, MEMTABLE_HIT); + } else if ((s.ok() || s.IsMergeInProgress()) && + sv->imm->GetMergeOperands(lkey, &s, &merge_context, + &max_covering_tombstone_seq, + read_options)) { + done = true; + RecordTick(stats_, MEMTABLE_HIT); + } } if (!done && !s.ok() && !s.IsMergeInProgress()) { ReturnAndCleanupSuperVersion(cfd, sv); @@ -1547,9 +1570,14 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, } if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); - sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context, - &max_covering_tombstone_seq, value_found, nullptr, nullptr, - callback, is_blob_index); + sv->current->Get( + read_options, lkey, get_impl_options.value, &s, &merge_context, + &max_covering_tombstone_seq, + get_impl_options.get_value ? get_impl_options.value_found : nullptr, + nullptr, nullptr, + get_impl_options.get_value ? get_impl_options.callback : nullptr, + get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr, + get_impl_options.get_value); RecordTick(stats_, MEMTABLE_MISS); } @@ -1561,7 +1589,25 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, RecordTick(stats_, NUMBER_KEYS_READ); size_t size = 0; if (s.ok()) { - size = pinnable_val->size(); + if (get_impl_options.get_value) { + size = get_impl_options.value->size(); + } else { + // Return all merge operands for get_impl_options.key + *get_impl_options.number_of_operands = + static_cast(merge_context.GetNumOperands()); + if (*get_impl_options.number_of_operands > + get_impl_options.get_merge_operands_options + ->expected_max_number_of_operands) { + s = Status::Incomplete( + Status::SubCode::KMergeOperandsInsufficientCapacity); + } else { + for (const Slice& sl : merge_context.GetOperands()) { + size += sl.size(); + get_impl_options.merge_operands->PinSelf(sl); + get_impl_options.merge_operands++; + } + } + } RecordTick(stats_, BYTES_READ, size); PERF_COUNTER_ADD(get_read_bytes, size); } @@ -2222,7 +2268,11 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options, ReadOptions roptions = read_options; roptions.read_tier = kBlockCacheTier; // read from block cache only PinnableSlice pinnable_val; - auto s = GetImpl(roptions, column_family, key, &pinnable_val, value_found); + GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.value = &pinnable_val; + get_impl_options.value_found = value_found; + auto s = GetImpl(roptions, key, get_impl_options); value->assign(pinnable_val.data(), pinnable_val.size()); // If block_cache is enabled and the index block of the table didn't diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index fe3a2f6f2..f1dbc5d02 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -159,6 +159,21 @@ class DBImpl : public DB { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; + using DB::GetMergeOperands; + Status GetMergeOperands(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* merge_operands, + GetMergeOperandsOptions* get_merge_operands_options, + int* number_of_operands) override { + GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.merge_operands = merge_operands; + get_impl_options.get_merge_operands_options = get_merge_operands_options; + get_impl_options.number_of_operands = number_of_operands; + get_impl_options.get_value = false; + return GetImpl(options, key, get_impl_options); + } + using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, @@ -395,12 +410,32 @@ class DBImpl : public DB { // ---- End of implementations of the DB interface ---- + struct GetImplOptions { + ColumnFamilyHandle* column_family = nullptr; + PinnableSlice* value = nullptr; + bool* value_found = nullptr; + ReadCallback* callback = nullptr; + bool* is_blob_index = nullptr; + // If true return value associated with key via value pointer else return + // all merge operands for key via merge_operands pointer + bool get_value = true; + // Pointer to an array of size + // get_merge_operands_options.expected_max_number_of_operands allocated by + // user + PinnableSlice* merge_operands = nullptr; + GetMergeOperandsOptions* get_merge_operands_options = nullptr; + int* number_of_operands = nullptr; + }; + // Function that Get and KeyMayExist call with no_io true or false // Note: 'value_found' from KeyMayExist propagates here - Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, PinnableSlice* value, - bool* value_found = nullptr, ReadCallback* callback = nullptr, - bool* is_blob_index = nullptr); + // This function is also called by GetMergeOperands + // If get_impl_options.get_value = true get value associated with + // get_impl_options.key via get_impl_options.value + // If get_impl_options.get_value = false get merge operands associated with + // get_impl_options.key via get_impl_options.merge_operands + Status GetImpl(const ReadOptions& options, const Slice& key, + GetImplOptions get_impl_options); ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options, ColumnFamilyData* cfd, diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index e3b2f5765..3c5fd4fcd 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -318,8 +318,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // We may ignore the dbname when generating the file names. for (auto& file : state.sst_delete_files) { candidate_files.emplace_back( - MakeTableFileName(file.metadata->fd.GetNumber()), - file.path); + MakeTableFileName(file.metadata->fd.GetNumber()), file.path); if (file.metadata->table_reader_handle) { table_cache_->Release(file.metadata->table_reader_handle); } diff --git a/db/db_merge_operand_test.cc b/db/db_merge_operand_test.cc new file mode 100644 index 000000000..e6280ad8c --- /dev/null +++ b/db/db_merge_operand_test.cc @@ -0,0 +1,240 @@ +// Copyright (c) 2018-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "rocksdb/perf_context.h" +#include "rocksdb/utilities/debug.h" +#include "table/block_based/block_builder.h" +#include "test_util/fault_injection_test_env.h" +#if !defined(ROCKSDB_LITE) +#include "test_util/sync_point.h" +#endif +#include "rocksdb/merge_operator.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/sortlist.h" +#include "utilities/merge_operators/string_append/stringappend2.h" + +namespace rocksdb { + +class DBMergeOperandTest : public DBTestBase { + public: + DBMergeOperandTest() : DBTestBase("/db_merge_operand_test") {} +}; + +TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) { + class LimitedStringAppendMergeOp : public StringAppendTESTOperator { + public: + LimitedStringAppendMergeOp(int limit, char delim) + : StringAppendTESTOperator(delim), limit_(limit) {} + + const char* Name() const override { + return "DBMergeOperatorTest::LimitedStringAppendMergeOp"; + } + + bool ShouldMerge(const std::vector& operands) const override { + if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) { + return true; + } + return false; + } + + private: + size_t limit_ = 0; + }; + + Options options; + options.create_if_missing = true; + // Use only the latest two merge operands. + options.merge_operator = std::make_shared(2, ','); + options.env = env_; + Reopen(options); + int num_records = 4; + int number_of_operands = 0; + std::vector values(num_records); + GetMergeOperandsOptions merge_operands_info; + merge_operands_info.expected_max_number_of_operands = num_records; + + // k0 value in memtable + Put("k0", "PutARock"); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k0", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "PutARock"); + + // k0.1 value in SST + Put("k0.1", "RockInSST"); + ASSERT_OK(Flush()); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k0.1", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "RockInSST"); + + // All k1 values are in memtable. + ASSERT_OK(Merge("k1", "a")); + Put("k1", "x"); + ASSERT_OK(Merge("k1", "b")); + ASSERT_OK(Merge("k1", "c")); + ASSERT_OK(Merge("k1", "d")); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "x"); + ASSERT_EQ(values[1], "b"); + ASSERT_EQ(values[2], "c"); + ASSERT_EQ(values[3], "d"); + + // expected_max_number_of_operands is less than number of merge operands so + // status should be Incomplete. + merge_operands_info.expected_max_number_of_operands = num_records - 1; + Status status = db_->GetMergeOperands( + ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(), + &merge_operands_info, &number_of_operands); + ASSERT_EQ(status.IsIncomplete(), true); + merge_operands_info.expected_max_number_of_operands = num_records; + + // All k1.1 values are in memtable. + ASSERT_OK(Merge("k1.1", "r")); + Delete("k1.1"); + ASSERT_OK(Merge("k1.1", "c")); + ASSERT_OK(Merge("k1.1", "k")); + ASSERT_OK(Merge("k1.1", "s")); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1.1", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "c"); + ASSERT_EQ(values[1], "k"); + ASSERT_EQ(values[2], "s"); + + // All k2 values are flushed to L0 into a single file. + ASSERT_OK(Merge("k2", "q")); + ASSERT_OK(Merge("k2", "w")); + ASSERT_OK(Merge("k2", "e")); + ASSERT_OK(Merge("k2", "r")); + ASSERT_OK(Flush()); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "q"); + ASSERT_EQ(values[1], "w"); + ASSERT_EQ(values[2], "e"); + ASSERT_EQ(values[3], "r"); + + // All k2.1 values are flushed to L0 into a single file. + ASSERT_OK(Merge("k2.1", "m")); + Put("k2.1", "l"); + ASSERT_OK(Merge("k2.1", "n")); + ASSERT_OK(Merge("k2.1", "o")); + ASSERT_OK(Flush()); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2.1", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "l,n,o"); + + // All k2.2 values are flushed to L0 into a single file. + ASSERT_OK(Merge("k2.2", "g")); + Delete("k2.2"); + ASSERT_OK(Merge("k2.2", "o")); + ASSERT_OK(Merge("k2.2", "t")); + ASSERT_OK(Flush()); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2.2", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "o,t"); + + // Do some compaction that will make the following tests more predictable + // Slice start("PutARock"); + // Slice end("t"); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + // All k3 values are flushed and are in different files. + ASSERT_OK(Merge("k3", "ab")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3", "bc")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3", "cd")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3", "de")); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "ab"); + ASSERT_EQ(values[1], "bc"); + ASSERT_EQ(values[2], "cd"); + ASSERT_EQ(values[3], "de"); + + // All k3.1 values are flushed and are in different files. + ASSERT_OK(Merge("k3.1", "ab")); + ASSERT_OK(Flush()); + Put("k3.1", "bc"); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3.1", "cd")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3.1", "de")); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3.1", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "bc"); + ASSERT_EQ(values[1], "cd"); + ASSERT_EQ(values[2], "de"); + + // All k3.2 values are flushed and are in different files. + ASSERT_OK(Merge("k3.2", "ab")); + ASSERT_OK(Flush()); + Delete("k3.2"); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3.2", "cd")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3.2", "de")); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3.2", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "cd"); + ASSERT_EQ(values[1], "de"); + + // All K4 values are in different levels + ASSERT_OK(Merge("k4", "ba")); + ASSERT_OK(Flush()); + MoveFilesToLevel(4); + ASSERT_OK(Merge("k4", "cb")); + ASSERT_OK(Flush()); + MoveFilesToLevel(3); + ASSERT_OK(Merge("k4", "dc")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + ASSERT_OK(Merge("k4", "ed")); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "ba"); + ASSERT_EQ(values[1], "cb"); + ASSERT_EQ(values[2], "dc"); + ASSERT_EQ(values[3], "ed"); + + // First 3 k5 values are in SST and next 4 k5 values are in Immutable Memtable + ASSERT_OK(Merge("k5", "who")); + ASSERT_OK(Merge("k5", "am")); + ASSERT_OK(Merge("k5", "i")); + ASSERT_OK(Flush()); + Put("k5", "remember"); + ASSERT_OK(Merge("k5", "i")); + ASSERT_OK(Merge("k5", "am")); + ASSERT_OK(Merge("k5", "rocks")); + dbfull()->TEST_SwitchMemtable(); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k5", + values.data(), &merge_operands_info, + &number_of_operands); + ASSERT_EQ(values[0], "remember"); + ASSERT_EQ(values[1], "i"); + ASSERT_EQ(values[2], "am"); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index 31bd2e491..8358ddb56 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -46,9 +46,11 @@ class DBMergeOperatorTest : public DBTestBase { ReadOptions read_opt; read_opt.snapshot = snapshot; PinnableSlice value; - Status s = - dbfull()->GetImpl(read_opt, db_->DefaultColumnFamily(), key, &value, - nullptr /*value_found*/, &read_callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = db_->DefaultColumnFamily(); + get_impl_options.value = &value; + get_impl_options.callback = &read_callback; + Status s = dbfull()->GetImpl(read_opt, key, get_impl_options); if (!s.ok()) { return s.ToString(); } diff --git a/db/db_test.cc b/db/db_test.cc index f53afa17d..5c96bec36 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2540,6 +2540,15 @@ class ModelDB : public DB { return Status::NotSupported(key); } + using DB::GetMergeOperands; + virtual Status GetMergeOperands( + const ReadOptions& /*options*/, ColumnFamilyHandle* /*column_family*/, + const Slice& key, PinnableSlice* /*slice*/, + GetMergeOperandsOptions* /*merge_operands_options*/, + int* /*number_of_operands*/) override { + return Status::NotSupported(key); + } + using DB::MultiGet; std::vector MultiGet( const ReadOptions& /*options*/, diff --git a/db/db_test2.cc b/db/db_test2.cc index 3664b3a24..26604c53a 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2797,8 +2797,12 @@ TEST_F(DBTest2, ReadCallbackTest) { ReadOptions roptions; TestReadCallback callback(seq); bool dont_care = true; - Status s = dbfull()->GetImpl(roptions, dbfull()->DefaultColumnFamily(), key, - &pinnable_val, &dont_care, &callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = dbfull()->DefaultColumnFamily(); + get_impl_options.value = &pinnable_val; + get_impl_options.value_found = &dont_care; + get_impl_options.callback = &callback; + Status s = dbfull()->GetImpl(roptions, key, get_impl_options); ASSERT_TRUE(s.ok()); // Assuming that after each Put the DB increased seq by one, the value and // seq number must be equal since we also inc value by 1 after each Put. diff --git a/db/memtable.cc b/db/memtable.cc index fdd1a577a..62c7339b5 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -601,6 +601,7 @@ struct Saver { Logger* logger; Statistics* statistics; bool inplace_update_support; + bool do_merge; Env* env_; ReadCallback* callback_; bool* is_blob_index; @@ -627,7 +628,7 @@ static bool SaveValue(void* arg, const char* entry) { // klength varint32 // userkey char[klength-8] // tag uint64 - // vlength varint32 + // vlength varint32f // value char[vlength] // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped @@ -677,12 +678,24 @@ static bool SaveValue(void* arg, const char* entry) { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); *(s->status) = Status::OK(); if (*(s->merge_in_progress)) { - if (s->value != nullptr) { - *(s->status) = MergeHelper::TimedFullMerge( - merge_operator, s->key->user_key(), &v, - merge_context->GetOperands(), s->value, s->logger, - s->statistics, s->env_, nullptr /* result_operand */, true); + if (s->do_merge) { + if (s->value != nullptr) { + *(s->status) = MergeHelper::TimedFullMerge( + merge_operator, s->key->user_key(), &v, + merge_context->GetOperands(), s->value, s->logger, + s->statistics, s->env_, nullptr /* result_operand */, true); + } + } else { + // Preserve the value with the goal of returning it as part of + // raw merge operands to the user + merge_context->PushOperand( + v, s->inplace_update_support == false /* operand_pinned */); } + } else if (!s->do_merge) { + // Preserve the value with the goal of returning it as part of + // raw merge operands to the user + merge_context->PushOperand( + v, s->inplace_update_support == false /* operand_pinned */); } else if (s->value != nullptr) { s->value->assign(v.data(), v.size()); } @@ -726,7 +739,8 @@ static bool SaveValue(void* arg, const char* entry) { *(s->merge_in_progress) = true; merge_context->PushOperand( v, s->inplace_update_support == false /* operand_pinned */); - if (merge_operator->ShouldMerge(merge_context->GetOperandsDirectionBackward())) { + if (s->do_merge && merge_operator->ShouldMerge( + merge_context->GetOperandsDirectionBackward())) { *(s->status) = MergeHelper::TimedFullMerge( merge_operator, s->key->user_key(), nullptr, merge_context->GetOperands(), s->value, s->logger, s->statistics, @@ -750,7 +764,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, const ReadOptions& read_opts, - ReadCallback* callback, bool* is_blob_index) { + ReadCallback* callback, bool* is_blob_index, bool do_merge) { // The sequence number is updated synchronously in version_set.h if (IsEmpty()) { // Avoiding recording stats for speed. @@ -810,8 +824,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, saver.env_ = env_; saver.callback_ = callback; saver.is_blob_index = is_blob_index; + saver.do_merge = do_merge; table_->Get(key, &saver, SaveValue); - *seq = saver.seq; } diff --git a/db/memtable.h b/db/memtable.h index 6b8c4141f..36ba0df79 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -175,6 +175,10 @@ class MemTable { const Slice& value, bool allow_concurrent = false, MemTablePostProcessInfo* post_process_info = nullptr); + // Used to Get value associated with key or Get Merge Operands associated + // with key. + // If do_merge = true the default behavior which is Get value for key is + // executed. Expected behavior is described right below. // If memtable contains a value for key, store it in *value and return true. // If memtable contains a deletion for key, store a NotFound() error // in *status and return true. @@ -188,20 +192,23 @@ class MemTable { // returned). Otherwise, *seq will be set to kMaxSequenceNumber. // On success, *s may be set to OK, NotFound, or MergeInProgress. Any other // status returned indicates a corruption or other unexpected error. + // If do_merge = false then any Merge Operands encountered for key are simply + // stored in merge_context.operands_list and never actually merged to get a + // final value. The raw Merge Operands are eventually returned to the user. bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback = nullptr, - bool* is_blob_index = nullptr); + bool* is_blob_index = nullptr, bool do_merge = true); bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts, ReadCallback* callback = nullptr, - bool* is_blob_index = nullptr) { + bool* is_blob_index = nullptr, bool do_merge = true) { SequenceNumber seq; return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq, - read_opts, callback, is_blob_index); + read_opts, callback, is_blob_index, do_merge); } // Attempts to update the new_value inplace, else does normal Add diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 0f796eb9a..d06a82df8 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -109,6 +109,20 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value, is_blob_index); } +bool MemTableListVersion::GetMergeOperands( + const LookupKey& key, Status* s, MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) { + for (MemTable* memtable : memlist_) { + bool done = memtable->Get(key, nullptr, s, merge_context, + max_covering_tombstone_seq, read_opts, nullptr, + nullptr, false); + if (done) { + return true; + } + } + return false; +} + bool MemTableListVersion::GetFromHistory( const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, diff --git a/db/memtable_list.h b/db/memtable_list.h index a72077ff3..2bd225b83 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -71,6 +71,13 @@ class MemTableListVersion { read_opts, callback, is_blob_index); } + // Returns all the merge operands corresponding to the key by searching all + // memtables starting from the most recent one. + bool GetMergeOperands(const LookupKey& key, Status* s, + MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, + const ReadOptions& read_opts); + // Similar to Get(), but searches the Memtable history of memtables that // have already been flushed. Should only be used from in-memory only // queries (such as Transaction validation) as the history may contain diff --git a/db/version_set.cc b/db/version_set.cc index 3a1f47790..af0168f76 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1651,7 +1651,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, bool* value_found, bool* key_exists, SequenceNumber* seq, ReadCallback* callback, - bool* is_blob) { + bool* is_blob, bool do_merge) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); @@ -1671,8 +1671,9 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, GetContext get_context( user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, - value, value_found, merge_context, max_covering_tombstone_seq, this->env_, - seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, + do_merge ? value : nullptr, value_found, merge_context, do_merge, + max_covering_tombstone_seq, this->env_, seq, + merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, tracing_get_id); // Pin blocks that we read to hold merge operands @@ -1737,7 +1738,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, } else if (fp.GetHitFileLevel() >= 2) { RecordTick(db_statistics_, GET_HIT_L2_AND_UP); } - PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, fp.GetHitFileLevel()); + PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, + fp.GetHitFileLevel()); return; case GetContext::kDeleted: // Use empty error message for speed @@ -1755,11 +1757,14 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, } f = fp.GetNextFile(); } - if (db_statistics_ != nullptr) { get_context.ReportCounters(); } if (GetContext::kMerge == get_context.State()) { + if (!do_merge) { + *status = Status::OK(); + return; + } if (!merge_operator_) { *status = Status::InvalidArgument( "merge_operator is not properly initialized."); @@ -1806,7 +1811,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, get_ctx.emplace_back( user_comparator(), merge_operator_, info_log_, db_statistics_, iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey, - iter->value, nullptr, &(iter->merge_context), + iter->value, nullptr, &(iter->merge_context), true, &iter->max_covering_tombstone_seq, this->env_, &iter->seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, tracing_mget_id); diff --git a/db/version_set.h b/db/version_set.h index 391bb902c..25598630e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -63,7 +63,6 @@ class VersionSet; class WriteBufferManager; class MergeContext; class ColumnFamilySet; -class TableCache; class MergeIteratorBuilder; // Return the smallest index i such that file_level.files[i]->largest >= key. @@ -561,28 +560,33 @@ class Version { const Slice& largest_user_key, int level, bool* overlap); - // Lookup the value for key. If found, store it in *val and - // return OK. Else return a non-OK status. - // Uses *operands to store merge_operator operations to apply later. + // Lookup the value for key or get all merge operands for key. + // If do_merge = true (default) then lookup value for key. + // Behavior if do_merge = true: + // If found, store it in *value and + // return OK. Else return a non-OK status. + // Uses *operands to store merge_operator operations to apply later. // - // If the ReadOptions.read_tier is set to do a read-only fetch, then - // *value_found will be set to false if it cannot be determined whether - // this value exists without doing IO. + // If the ReadOptions.read_tier is set to do a read-only fetch, then + // *value_found will be set to false if it cannot be determined whether + // this value exists without doing IO. // - // If the key is Deleted, *status will be set to NotFound and + // If the key is Deleted, *status will be set to NotFound and // *key_exists will be set to true. - // If no key was found, *status will be set to NotFound and + // If no key was found, *status will be set to NotFound and // *key_exists will be set to false. - // If seq is non-null, *seq will be set to the sequence number found - // for the key if a key was found. - // + // If seq is non-null, *seq will be set to the sequence number found + // for the key if a key was found. + // Behavior if do_merge = false + // If the key has any merge operands then store them in + // merge_context.operands_list and don't merge the operands // REQUIRES: lock is not held void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* value, Status* status, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, bool* value_found = nullptr, bool* key_exists = nullptr, SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr, - bool* is_blob = nullptr); + bool* is_blob = nullptr, bool do_merge = true); void MultiGet(const ReadOptions&, MultiGetRange* range, ReadCallback* callback = nullptr, bool* is_blob = nullptr); diff --git a/file/filename.cc b/file/filename.cc index 65ec33149..ba5d84c29 100644 --- a/file/filename.cc +++ b/file/filename.cc @@ -60,8 +60,7 @@ static size_t GetInfoLogPrefix(const std::string& path, char* dest, int len) { static std::string MakeFileName(uint64_t number, const char* suffix) { char buf[100]; snprintf(buf, sizeof(buf), "%06llu.%s", - static_cast(number), - suffix); + static_cast(number), suffix); return buf; } diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 1d90dc50b..36d6fea92 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -116,6 +116,10 @@ struct IngestExternalFileArg { IngestExternalFileOptions options; }; +struct GetMergeOperandsOptions { + int expected_max_number_of_operands = 0; +}; + // A collections of table properties objects, where // key: is the table's file name. // value: the table properties object of the given table. @@ -403,6 +407,22 @@ class DB { return Get(options, DefaultColumnFamily(), key, value); } + // Returns all the merge operands corresponding to the key. If the + // number of merge operands in DB is greater than + // merge_operands_options.expected_max_number_of_operands + // no merge operands are returned and status is Incomplete. Merge operands + // returned are in the order of insertion. + // merge_operands- Points to an array of at-least + // merge_operands_options.expected_max_number_of_operands and the + // caller is responsible for allocating it. If the status + // returned is Incomplete then number_of_operands will contain + // the total number of merge operands found in DB for key. + virtual Status GetMergeOperands( + const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* merge_operands, + GetMergeOperandsOptions* get_merge_operands_options, + int* number_of_operands) = 0; + // If keys[i] does not exist in the database, then the i'th returned // status will be one for which Status::IsNotFound() is true, and // (*values)[i] will be set to some arbitrary value (often ""). Otherwise, diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index ac97ce442..e4360126d 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -76,6 +76,7 @@ class Status { kMemoryLimit = 7, kSpaceLimit = 8, kPathNotFound = 9, + KMergeOperandsInsufficientCapacity = 10, kMaxSubCode }; diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 67bf4e2fa..35fddc804 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -88,6 +88,17 @@ class StackableDB : public DB { return db_->Get(options, column_family, key, value); } + using DB::GetMergeOperands; + virtual Status GetMergeOperands( + const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* slice, + GetMergeOperandsOptions* get_merge_operands_options, + int* number_of_operands) override { + return db_->GetMergeOperands(options, column_family, key, slice, + get_merge_operands_options, + number_of_operands); + } + using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, diff --git a/src.mk b/src.mk index 0c6142e41..6d1d655c7 100644 --- a/src.mk +++ b/src.mk @@ -191,6 +191,7 @@ LIB_SOURCES = \ utilities/memory/memory_util.cc \ utilities/merge_operators/max.cc \ utilities/merge_operators/put.cc \ + utilities/merge_operators/sortlist.cc \ utilities/merge_operators/string_append/stringappend.cc \ utilities/merge_operators/string_append/stringappend2.cc \ utilities/merge_operators/uint64add.cc \ @@ -291,6 +292,7 @@ MAIN_SOURCES = \ db/db_log_iter_test.cc \ db/db_memtable_test.cc \ db/db_merge_operator_test.cc \ + db/db_merge_operand_test.cc \ db/db_options_test.cc \ db/db_properties_test.cc \ db/db_range_del_test.cc \ diff --git a/table/block_based/data_block_hash_index_test.cc b/table/block_based/data_block_hash_index_test.cc index 484617d7e..ae23f6ef2 100644 --- a/table/block_based/data_block_hash_index_test.cc +++ b/table/block_based/data_block_hash_index_test.cc @@ -631,7 +631,7 @@ TEST(DataBlockHashIndex, BlockBoundary) { InternalKey seek_ikey(seek_ukey, 60, kTypeValue); GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, seek_ukey, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); TestBoundary(ik1, v1, ik2, v2, seek_ikey, get_context, options); ASSERT_EQ(get_context.State(), GetContext::kFound); @@ -656,7 +656,7 @@ TEST(DataBlockHashIndex, BlockBoundary) { InternalKey seek_ikey(seek_ukey, 60, kTypeValue); GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, seek_ukey, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); TestBoundary(ik1, v1, ik2, v2, seek_ikey, get_context, options); ASSERT_EQ(get_context.State(), GetContext::kFound); @@ -681,7 +681,7 @@ TEST(DataBlockHashIndex, BlockBoundary) { InternalKey seek_ikey(seek_ukey, 120, kTypeValue); GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, seek_ukey, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); TestBoundary(ik1, v1, ik2, v2, seek_ikey, get_context, options); ASSERT_EQ(get_context.State(), GetContext::kFound); @@ -706,7 +706,7 @@ TEST(DataBlockHashIndex, BlockBoundary) { InternalKey seek_ikey(seek_ukey, 5, kTypeValue); GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, seek_ukey, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); TestBoundary(ik1, v1, ik2, v2, seek_ikey, get_context, options); ASSERT_EQ(get_context.State(), GetContext::kNotFound); diff --git a/table/cuckoo/cuckoo_table_reader_test.cc b/table/cuckoo/cuckoo_table_reader_test.cc index dd1557db1..8043d36ab 100644 --- a/table/cuckoo/cuckoo_table_reader_test.cc +++ b/table/cuckoo/cuckoo_table_reader_test.cc @@ -122,7 +122,7 @@ class CuckooReaderTest : public testing::Test { PinnableSlice value; GetContext get_context(ucomp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(user_keys[i]), &value, - nullptr, nullptr, nullptr, nullptr); + nullptr, nullptr, true, nullptr, nullptr); ASSERT_OK( reader.Get(ReadOptions(), Slice(keys[i]), &get_context, nullptr)); ASSERT_STREQ(values[i].c_str(), value.data()); @@ -336,7 +336,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { AppendInternalKey(¬_found_key, ikey); PinnableSlice value; GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, - Slice(not_found_key), &value, nullptr, nullptr, + Slice(not_found_key), &value, nullptr, nullptr, true, nullptr, nullptr); ASSERT_OK( reader.Get(ReadOptions(), Slice(not_found_key), &get_context, nullptr)); @@ -351,7 +351,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { value.Reset(); GetContext get_context2(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(not_found_key2), &value, - nullptr, nullptr, nullptr, nullptr); + nullptr, nullptr, true, nullptr, nullptr); ASSERT_OK( reader.Get(ReadOptions(), Slice(not_found_key2), &get_context2, nullptr)); ASSERT_TRUE(value.empty()); @@ -367,7 +367,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { value.Reset(); GetContext get_context3(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(unused_key), &value, - nullptr, nullptr, nullptr, nullptr); + nullptr, nullptr, true, nullptr, nullptr); ASSERT_OK( reader.Get(ReadOptions(), Slice(unused_key), &get_context3, nullptr)); ASSERT_TRUE(value.empty()); @@ -443,7 +443,7 @@ void WriteFile(const std::vector& keys, // Assume only the fast path is triggered GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); for (uint64_t i = 0; i < num; ++i) { value.Reset(); value.clear(); @@ -491,7 +491,7 @@ void ReadKeys(uint64_t num, uint32_t batch_size) { // Assume only the fast path is triggered GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); uint64_t start_time = env->NowMicros(); if (batch_size > 0) { for (uint64_t i = 0; i < num; i += batch_size) { diff --git a/table/get_context.cc b/table/get_context.cc index f0c7928bf..cdb5798f7 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -42,9 +42,9 @@ GetContext::GetContext( const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, const Slice& user_key, PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context, - SequenceNumber* _max_covering_tombstone_seq, Env* env, SequenceNumber* seq, - PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback, - bool* is_blob_index, uint64_t tracing_get_id) + bool do_merge, SequenceNumber* _max_covering_tombstone_seq, Env* env, + SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr, + ReadCallback* callback, bool* is_blob_index, uint64_t tracing_get_id) : ucmp_(ucmp), merge_operator_(merge_operator), logger_(logger), @@ -60,6 +60,7 @@ GetContext::GetContext( replay_log_(nullptr), pinned_iters_mgr_(_pinned_iters_mgr), callback_(callback), + do_merge_(do_merge), is_blob_index_(is_blob_index), tracing_get_id_(tracing_get_id) { if (seq_) { @@ -215,29 +216,44 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } if (kNotFound == state_) { state_ = kFound; - if (LIKELY(pinnable_val_ != nullptr)) { - if (LIKELY(value_pinner != nullptr)) { - // If the backing resources for the value are provided, pin them - pinnable_val_->PinSlice(value, value_pinner); - } else { - TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf", this); + if (do_merge_) { + if (LIKELY(pinnable_val_ != nullptr)) { + if (LIKELY(value_pinner != nullptr)) { + // If the backing resources for the value are provided, pin them + pinnable_val_->PinSlice(value, value_pinner); + } else { + TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf", + this); - // Otherwise copy the value - pinnable_val_->PinSelf(value); + // Otherwise copy the value + pinnable_val_->PinSelf(value); + } } + } else { + // It means this function is called as part of DB GetMergeOperands + // API and the current value should be part of + // merge_context_->operand_list + push_operand(value, value_pinner); } } else if (kMerge == state_) { assert(merge_operator_ != nullptr); state_ = kFound; - if (LIKELY(pinnable_val_ != nullptr)) { - Status merge_status = MergeHelper::TimedFullMerge( - merge_operator_, user_key_, &value, - merge_context_->GetOperands(), pinnable_val_->GetSelf(), - logger_, statistics_, env_); - pinnable_val_->PinSelf(); - if (!merge_status.ok()) { - state_ = kCorrupt; + if (do_merge_) { + if (LIKELY(pinnable_val_ != nullptr)) { + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, &value, + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); + if (!merge_status.ok()) { + state_ = kCorrupt; + } } + } else { + // It means this function is called as part of DB GetMergeOperands + // API and the current value should be part of + // merge_context_->operand_list + push_operand(value, value_pinner); } } if (is_blob_index_ != nullptr) { @@ -256,14 +272,18 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } else if (kMerge == state_) { state_ = kFound; if (LIKELY(pinnable_val_ != nullptr)) { - Status merge_status = MergeHelper::TimedFullMerge( - merge_operator_, user_key_, nullptr, - merge_context_->GetOperands(), pinnable_val_->GetSelf(), - logger_, statistics_, env_); - pinnable_val_->PinSelf(); - if (!merge_status.ok()) { - state_ = kCorrupt; + if (do_merge_) { + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, nullptr, + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); + if (!merge_status.ok()) { + state_ = kCorrupt; + } } + // If do_merge_ = false then the current value shouldn't be part of + // merge_context_->operand_list } } return false; @@ -272,24 +292,23 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(state_ == kNotFound || state_ == kMerge); state_ = kMerge; // value_pinner is not set from plain_table_reader.cc for example. - if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() && - value_pinner != nullptr) { - value_pinner->DelegateCleanupsTo(pinned_iters_mgr()); - merge_context_->PushOperand(value, true /*value_pinned*/); - } else { - merge_context_->PushOperand(value, false); - } - if (merge_operator_ != nullptr && - merge_operator_->ShouldMerge(merge_context_->GetOperandsDirectionBackward())) { + push_operand(value, value_pinner); + if (do_merge_ && merge_operator_ != nullptr && + merge_operator_->ShouldMerge( + merge_context_->GetOperandsDirectionBackward())) { state_ = kFound; if (LIKELY(pinnable_val_ != nullptr)) { - Status merge_status = MergeHelper::TimedFullMerge( - merge_operator_, user_key_, nullptr, - merge_context_->GetOperands(), pinnable_val_->GetSelf(), - logger_, statistics_, env_); - pinnable_val_->PinSelf(); - if (!merge_status.ok()) { - state_ = kCorrupt; + // do_merge_ = true this is the case where this function is called + // as part of DB Get API hence merge operators should be merged. + if (do_merge_) { + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, nullptr, + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); + if (!merge_status.ok()) { + state_ = kCorrupt; + } } } return false; @@ -306,6 +325,16 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, return false; } +void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) { + if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() && + value_pinner != nullptr) { + value_pinner->DelegateCleanupsTo(pinned_iters_mgr()); + merge_context_->PushOperand(value, true /*value_pinned*/); + } else { + merge_context_->PushOperand(value, false); + } +} + void replayGetContextLog(const Slice& replay_log, const Slice& user_key, GetContext* get_context, Cleanable* value_pinner) { #ifndef ROCKSDB_LITE diff --git a/table/get_context.h b/table/get_context.h index 7110ceae8..97d73ec0b 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -66,6 +66,9 @@ class GetContext { GetContextStats get_context_stats_; // Constructor + // @param value Holds the value corresponding to user_key. If its nullptr + // then return all merge operands corresponding to user_key + // via merge_context // @param value_found If non-nullptr, set to false if key may be present // but we can't be certain because we cannot do IO // @param max_covering_tombstone_seq Pointer to highest sequence number of @@ -78,10 +81,14 @@ class GetContext { // for visibility of a key // @param is_blob_index If non-nullptr, will be used to indicate if a found // key is of type blob index + // @param do_merge True if value associated with user_key has to be returned + // and false if all the merge operands associated with user_key has to be + // returned. Id do_merge=false then all the merge operands are stored in + // merge_context and they are never merged. The value pointer is untouched. GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, const Slice& user_key, PinnableSlice* value, bool* value_found, - MergeContext* merge_context, + MergeContext* merge_context, bool do_merge, SequenceNumber* max_covering_tombstone_seq, Env* env, SequenceNumber* seq = nullptr, PinnedIteratorsManager* _pinned_iters_mgr = nullptr, @@ -140,6 +147,8 @@ class GetContext { uint64_t get_tracing_get_id() const { return tracing_get_id_; } + void push_operand(const Slice& value, Cleanable* value_pinner); + private: const Comparator* ucmp_; const MergeOperator* merge_operator_; @@ -162,6 +171,10 @@ class GetContext { PinnedIteratorsManager* pinned_iters_mgr_; ReadCallback* callback_; bool sample_; + // Value is true if it's called as part of DB Get API and false if it's + // called as part of DB GetMergeOperands API. When it's false merge operators + // are never merged. + bool do_merge_; bool* is_blob_index_; // Used for block cache tracing only. A tracing get id uniquely identifies a // Get or a MultiGet. diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index cec62df59..45d760f0e 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -175,7 +175,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, ioptions.merge_operator, ioptions.info_log, ioptions.statistics, GetContext::kNotFound, Slice(key), &value, nullptr, &merge_context, - &max_covering_tombstone_seq, env); + true, &max_covering_tombstone_seq, env); s = table_reader->Get(read_options, key, &get_context, nullptr); } else { s = db->Get(read_options, key, &result); diff --git a/table/table_test.cc b/table/table_test.cc index 6cd26bc73..749048b78 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -2323,8 +2323,8 @@ TEST_P(BlockBasedTableTest, TracingGetTest) { PinnableSlice value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, - nullptr, /*get_id=*/i); + nullptr, true, nullptr, nullptr, nullptr, nullptr, + nullptr, nullptr, /*tracing_get_id=*/i); get_perf_context()->Reset(); ASSERT_OK(c.GetTableReader()->Get(ReadOptions(), encoded_key, &get_context, moptions.prefix_extractor.get())); @@ -2579,7 +2579,7 @@ TEST_P(BlockBasedTableTest, BlockCacheDisabledTest) { { GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), nullptr, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); // a hack that just to trigger BlockBasedTable::GetFilter. reader->Get(ReadOptions(), "non-exist-key", &get_context, moptions.prefix_extractor.get()); @@ -2750,7 +2750,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) { PinnableSlice value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); ASSERT_OK(reader->Get(ReadOptions(), internal_key.Encode(), &get_context, moptions4.prefix_extractor.get())); ASSERT_STREQ(value.data(), "hello"); @@ -2836,7 +2836,7 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) { { GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); get_perf_context()->Reset(); ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context, moptions.prefix_extractor.get())); @@ -2862,7 +2862,7 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) { { GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); get_perf_context()->Reset(); ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context, moptions.prefix_extractor.get())); @@ -4230,7 +4230,7 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) { std::string user_key = ExtractUserKey(kv.first).ToString(); GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); ASSERT_OK(reader->Get(ro, kv.first, &get_context, moptions.prefix_extractor.get())); ASSERT_EQ(get_context.State(), GetContext::kFound); @@ -4256,7 +4256,7 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) { PinnableSlice value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr, nullptr); + nullptr, true, nullptr, nullptr); ASSERT_OK(reader->Get(ro, encoded_key, &get_context, moptions.prefix_extractor.get())); ASSERT_EQ(get_context.State(), GetContext::kNotFound); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index f6a9d9458..001dd4d2f 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -71,6 +71,7 @@ #include "utilities/blob_db/blob_db.h" #include "utilities/merge_operators.h" #include "utilities/merge_operators/bytesxor.h" +#include "utilities/merge_operators/sortlist.h" #include "utilities/persistent_cache/block_cache_tier.h" #ifdef OS_WIN @@ -120,7 +121,8 @@ DEFINE_string( "fillseekseq," "randomtransaction," "randomreplacekeys," - "timeseries", + "timeseries," + "getmergeoperands", "Comma-separated list of operations to run in the specified" " order. Available benchmarks:\n" @@ -190,7 +192,13 @@ DEFINE_string( "\tlevelstats -- Print the number of files and bytes per level\n" "\tsstables -- Print sstable info\n" "\theapprofile -- Dump a heap profile (if supported by this port)\n" - "\treplay -- replay the trace file specified with trace_file\n"); + "\treplay -- replay the trace file specified with trace_file\n" + "\tgetmergeoperands -- Insert lots of merge records which are a list of " + "sorted ints for a key and then compare performance of lookup for another " + "key " + "by doing a Get followed by binary searching in the large sorted list vs " + "doing a GetMergeOperands and binary searching in the operands which are" + "sorted sub-lists. The MergeOperator used is sortlist.h\n"); DEFINE_int64(num, 1000000, "Number of key/values to place in database"); @@ -2880,6 +2888,8 @@ class Benchmark { exit(1); } method = &Benchmark::Replay; + } else if (name == "getmergeoperands") { + method = &Benchmark::GetMergeOperands; } else if (!name.empty()) { // No error message for empty name fprintf(stderr, "unknown benchmark '%s'\n", name.c_str()); exit(1); @@ -5921,6 +5931,97 @@ class Benchmark { } } + bool binary_search(std::vector& data, int start, int end, int key) { + if (data.empty()) return false; + if (start > end) return false; + int mid = start + (end - start) / 2; + if (mid > static_cast(data.size()) - 1) return false; + if (data[mid] == key) { + return true; + } else if (data[mid] > key) { + return binary_search(data, start, mid - 1, key); + } else { + return binary_search(data, mid + 1, end, key); + } + } + + // Does a bunch of merge operations for a key(key1) where the merge operand + // is a sorted list. Next performance comparison is done between doing a Get + // for key1 followed by searching for another key(key2) in the large sorted + // list vs calling GetMergeOperands for key1 and then searching for the key2 + // in all the sorted sub-lists. Later case is expected to be a lot faster. + void GetMergeOperands(ThreadState* thread) { + DB* db = SelectDB(thread); + const int kTotalValues = 100000; + const int kListSize = 100; + std::string key = "my_key"; + std::string value; + + for (int i = 1; i < kTotalValues; i++) { + if (i % kListSize == 0) { + // Remove trailing ',' + value.pop_back(); + db->Merge(WriteOptions(), key, value); + value.clear(); + } else { + value.append(std::to_string(i)).append(","); + } + } + + SortList s; + std::vector data; + // This value can be experimented with and it will demonstrate the + // perf difference between doing a Get and searching for lookup_key in the + // resultant large sorted list vs doing GetMergeOperands and searching + // for lookup_key within this resultant sorted sub-lists. + int lookup_key = 1; + + // Get API call + std::cout << "--- Get API call --- \n"; + PinnableSlice p_slice; + uint64_t st = FLAGS_env->NowNanos(); + db->Get(ReadOptions(), db->DefaultColumnFamily(), key, &p_slice); + s.MakeVector(data, p_slice); + bool found = + binary_search(data, 0, static_cast(data.size() - 1), lookup_key); + std::cout << "Found key? " << std::to_string(found) << "\n"; + uint64_t sp = FLAGS_env->NowNanos(); + std::cout << "Get: " << (sp - st) / 1000000000.0 << " seconds\n"; + std::string* dat_ = p_slice.GetSelf(); + std::cout << "Sample data from Get API call: " << dat_->substr(0, 10) + << "\n"; + data.clear(); + + // GetMergeOperands API call + std::cout << "--- GetMergeOperands API --- \n"; + std::vector a_slice((kTotalValues / kListSize) + 1); + st = FLAGS_env->NowNanos(); + int number_of_operands = 0; + GetMergeOperandsOptions get_merge_operands_options; + get_merge_operands_options.expected_max_number_of_operands = + (kTotalValues / 100) + 1; + db->GetMergeOperands(ReadOptions(), db->DefaultColumnFamily(), key, + a_slice.data(), &get_merge_operands_options, + &number_of_operands); + for (PinnableSlice& psl : a_slice) { + s.MakeVector(data, psl); + found = + binary_search(data, 0, static_cast(data.size() - 1), lookup_key); + data.clear(); + if (found) break; + } + std::cout << "Found key? " << std::to_string(found) << "\n"; + sp = FLAGS_env->NowNanos(); + std::cout << "Get Merge operands: " << (sp - st) / 1000000000.0 + << " seconds \n"; + int to_print = 0; + std::cout << "Sample data from GetMergeOperands API call: "; + for (PinnableSlice& psl : a_slice) { + std::cout << "List: " << to_print << " : " << *psl.GetSelf() << "\n"; + if (to_print++ > 2) break; + } + } + #ifndef ROCKSDB_LITE // This benchmark stress tests Transactions. For a given --duration (or // total number of --writes, a Transaction will perform a read-modify-write diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index caa9b0988..86501280d 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -1146,9 +1146,11 @@ Status BlobDBImpl::GetImpl(const ReadOptions& read_options, PinnableSlice index_entry; Status s; bool is_blob_index = false; - s = db_impl_->GetImpl(ro, column_family, key, &index_entry, - nullptr /*value_found*/, nullptr /*read_callback*/, - &is_blob_index); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.value = &index_entry; + get_impl_options.is_blob_index = &is_blob_index; + s = db_impl_->GetImpl(ro, key, get_impl_options); TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1"); TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2"); if (expiration != nullptr) { @@ -1535,9 +1537,12 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, SequenceNumber latest_seq = GetLatestSequenceNumber(); bool is_blob_index = false; PinnableSlice index_entry; - Status get_status = db_impl_->GetImpl( - ReadOptions(), cfh, record.key, &index_entry, nullptr /*value_found*/, - nullptr /*read_callback*/, &is_blob_index); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = cfh; + get_impl_options.value = &index_entry; + get_impl_options.is_blob_index = &is_blob_index; + Status get_status = + db_impl_->GetImpl(ReadOptions(), record.key, get_impl_options); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB"); if (!get_status.ok() && !get_status.IsNotFound()) { // error diff --git a/utilities/merge_operators.h b/utilities/merge_operators.h index 4c720b822..6e3464bdf 100644 --- a/utilities/merge_operators.h +++ b/utilities/merge_operators.h @@ -23,6 +23,7 @@ class MergeOperators { static std::shared_ptr CreateStringAppendTESTOperator(); static std::shared_ptr CreateMaxOperator(); static std::shared_ptr CreateBytesXOROperator(); + static std::shared_ptr CreateSortOperator(); // Will return a different merge operator depending on the string. // TODO: Hook the "name" up to the actual Name() of the MergeOperators? @@ -42,6 +43,8 @@ class MergeOperators { return CreateMaxOperator(); } else if (name == "bytesxor") { return CreateBytesXOROperator(); + } else if (name == "sortlist") { + return CreateSortOperator(); } else { // Empty or unknown, just return nullptr return nullptr; diff --git a/utilities/merge_operators/sortlist.cc b/utilities/merge_operators/sortlist.cc new file mode 100644 index 000000000..5dbf05115 --- /dev/null +++ b/utilities/merge_operators/sortlist.cc @@ -0,0 +1,100 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/sortlist.h" + +using rocksdb::Logger; +using rocksdb::MergeOperator; +using rocksdb::Slice; + +namespace rocksdb { + +bool SortList::FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { + std::vector left; + for (Slice slice : merge_in.operand_list) { + std::vector right; + MakeVector(right, slice); + left = Merge(left, right); + } + for (int i = 0; i < static_cast(left.size()) - 1; i++) { + merge_out->new_value.append(std::to_string(left[i])).append(","); + } + merge_out->new_value.append(std::to_string(left.back())); + return true; +} + +bool SortList::PartialMerge(const Slice& /*key*/, const Slice& left_operand, + const Slice& right_operand, std::string* new_value, + Logger* /*logger*/) const { + std::vector left; + std::vector right; + MakeVector(left, left_operand); + MakeVector(right, right_operand); + left = Merge(left, right); + for (int i = 0; i < static_cast(left.size()) - 1; i++) { + new_value->append(std::to_string(left[i])).append(","); + } + new_value->append(std::to_string(left.back())); + return true; +} + +bool SortList::PartialMergeMulti(const Slice& /*key*/, + const std::deque& operand_list, + std::string* new_value, + Logger* /*logger*/) const { + (void)operand_list; + (void)new_value; + return true; +} + +const char* SortList::Name() const { return "MergeSortOperator"; } + +void SortList::MakeVector(std::vector& operand, Slice slice) const { + do { + const char* begin = slice.data_; + while (*slice.data_ != ',' && *slice.data_) slice.data_++; + operand.push_back(std::stoi(std::string(begin, slice.data_))); + } while (0 != *slice.data_++); +} + +std::vector SortList::Merge(std::vector& left, + std::vector& right) const { + // Fill the resultant vector with sorted results from both vectors + std::vector result; + unsigned left_it = 0, right_it = 0; + + while (left_it < left.size() && right_it < right.size()) { + // If the left value is smaller than the right it goes next + // into the resultant vector + if (left[left_it] < right[right_it]) { + result.push_back(left[left_it]); + left_it++; + } else { + result.push_back(right[right_it]); + right_it++; + } + } + + // Push the remaining data from both vectors onto the resultant + while (left_it < left.size()) { + result.push_back(left[left_it]); + left_it++; + } + + while (right_it < right.size()) { + result.push_back(right[right_it]); + right_it++; + } + + return result; +} + +std::shared_ptr MergeOperators::CreateSortOperator() { + return std::make_shared(); +} +} // namespace rocksdb diff --git a/utilities/merge_operators/sortlist.h b/utilities/merge_operators/sortlist.h new file mode 100644 index 000000000..02c93edf5 --- /dev/null +++ b/utilities/merge_operators/sortlist.h @@ -0,0 +1,38 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +// A MergeOperator for RocksDB that implements Merge Sort. +// It is built using the MergeOperator interface. The operator works by taking +// an input which contains one or more merge operands where each operand is a +// list of sorted ints and merges them to form a large sorted list. +#pragma once + +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" + +namespace rocksdb { + +class SortList : public MergeOperator { + public: + bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override; + + bool PartialMerge(const Slice& /*key*/, const Slice& left_operand, + const Slice& right_operand, std::string* new_value, + Logger* /*logger*/) const override; + + bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const override; + + const char* Name() const override; + + void MakeVector(std::vector& operand, Slice slice) const; + + private: + std::vector Merge(std::vector& left, std::vector& right) const; +}; + +} // namespace rocksdb diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 188f61120..8dfc0d1d4 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -290,8 +290,12 @@ Status WritePreparedTxn::RollbackInternal() { PinnableSlice pinnable_val; bool not_used; auto cf_handle = handles_[cf]; - s = db_->GetImpl(roptions_, cf_handle, key, &pinnable_val, ¬_used, - &callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = cf_handle; + get_impl_options.value = &pinnable_val; + get_impl_options.value_found = ¬_used; + get_impl_options.callback = &callback; + s = db_->GetImpl(roptions_, key, get_impl_options); assert(s.ok() || s.IsNotFound()); if (s.ok()) { s = rollback_batch_->Put(cf_handle, key, pinnable_val); diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index e6d710206..7441cb3c0 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -231,8 +231,12 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options, WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted, backed_by_snapshot); bool* dont_care = nullptr; - auto res = db_impl_->GetImpl(options, column_family, key, value, dont_care, - &callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.value = value; + get_impl_options.value_found = dont_care; + get_impl_options.callback = &callback; + auto res = db_impl_->GetImpl(options, key, get_impl_options); if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(), backed_by_snapshot))) { return res; diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index a1862d32d..321110ea1 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -567,8 +567,12 @@ Status WriteUnpreparedTxn::RollbackInternal() { const auto& cf_handle = cf_map.at(cfid); PinnableSlice pinnable_val; bool not_used; - s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, ¬_used, - &callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = cf_handle; + get_impl_options.value = &pinnable_val; + get_impl_options.value_found = ¬_used; + get_impl_options.callback = &callback; + s = db_impl_->GetImpl(roptions, key, get_impl_options); if (s.ok()) { s = rollback_batch.Put(cf_handle, key, pinnable_val); @@ -721,8 +725,12 @@ Status WriteUnpreparedTxn::RollbackToSavePointInternal() { const auto& cf_handle = cf_map.at(cfid); PinnableSlice pinnable_val; bool not_used; - s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, ¬_used, - &callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = cf_handle; + get_impl_options.value = &pinnable_val; + get_impl_options.value_found = ¬_used; + get_impl_options.callback = &callback; + s = db_impl_->GetImpl(roptions, key, get_impl_options); if (s.ok()) { s = write_batch_.Put(cf_handle, key, pinnable_val); diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index defaf9fce..3a8eff5ec 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -86,8 +86,12 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( PinnableSlice pinnable_val; bool not_used; auto cf_handle = handles_[cf]; - s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, ¬_used, - &callback); + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = cf_handle; + get_impl_options.value = &pinnable_val; + get_impl_options.value_found = ¬_used; + get_impl_options.callback = &callback; + s = db_->GetImpl(roptions, key, get_impl_options); assert(s.ok() || s.IsNotFound()); if (s.ok()) { s = rollback_batch_->Put(cf_handle, key, pinnable_val); diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 3ffa2e0c6..272a2ab48 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -891,9 +891,12 @@ Status WriteBatchWithIndex::GetFromBatchAndDB( if (!callback) { s = db->Get(read_options, column_family, key, pinnable_val); } else { + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.value = pinnable_val; + get_impl_options.callback = callback; s = static_cast_with_check(db->GetRootDB()) - ->GetImpl(read_options, column_family, key, pinnable_val, nullptr, - callback); + ->GetImpl(read_options, key, get_impl_options); } if (s.ok() || s.IsNotFound()) { // DB Get Succeeded