New API to get all merge operands for a Key (#5604)

Summary:
This is a new API added to db.h to allow for fetching all merge operands associated with a Key. The main motivation for this API is to support use cases where doing a full online merge is not necessary as it is performance sensitive. Example use-cases:
1. Update subset of columns and read subset of columns -
Imagine a SQL Table, a row is encoded as a K/V pair (as it is done in MyRocks). If there are many columns and users only updated one of them, we can use merge operator to reduce write amplification. While users only read one or two columns in the read query, this feature can avoid a full merging of the whole row, and save some CPU.
2. Updating very few attributes in a value which is a JSON-like document -
Updating one attribute can be done efficiently using merge operator, while reading back one attribute can be done more efficiently if we don't need to do a full merge.
----------------------------------------------------------------------------------------------------
API :
Status GetMergeOperands(
      const ReadOptions& options, ColumnFamilyHandle* column_family,
      const Slice& key, PinnableSlice* merge_operands,
      GetMergeOperandsOptions* get_merge_operands_options,
      int* number_of_operands)

Example usage :
int size = 100;
int number_of_operands = 0;
std::vector<PinnableSlice> values(size);
GetMergeOperandsOptions merge_operands_info;
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(), merge_operands_info, &number_of_operands);

Description :
Returns all the merge operands corresponding to the key. If the number of merge operands in DB is greater than merge_operands_options.expected_max_number_of_operands no merge operands are returned and status is Incomplete. Merge operands returned are in the order of insertion.
merge_operands-> Points to an array of at-least merge_operands_options.expected_max_number_of_operands and the caller is responsible for allocating it. If the status returned is Incomplete then number_of_operands will contain the total number of merge operands found in DB for key.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5604

Test Plan:
Added unit test and perf test in db_bench that can be run using the command:
./db_bench -benchmarks=getmergeoperands --merge_operator=sortlist

Differential Revision: D16657366

Pulled By: vjnadimpalli

fbshipit-source-id: 0faadd752351745224ee12d4ae9ef3cb529951bf
main
Vijay Nadimpalli 5 years ago committed by Facebook Github Bot
parent 4f98b43ba3
commit d150e01474
  1. 2
      CMakeLists.txt
  2. 4
      Makefile
  3. 8
      TARGETS
  4. 2
      appveyor.yml
  5. 4
      db/compacted_db_impl.cc
  6. 8
      db/db_blob_index_test.cc
  7. 102
      db/db_impl/db_impl.cc
  8. 43
      db/db_impl/db_impl.h
  9. 3
      db/db_impl/db_impl_files.cc
  10. 240
      db/db_merge_operand_test.cc
  11. 8
      db/db_merge_operator_test.cc
  12. 9
      db/db_test.cc
  13. 8
      db/db_test2.cc
  14. 22
      db/memtable.cc
  15. 13
      db/memtable.h
  16. 14
      db/memtable_list.cc
  17. 7
      db/memtable_list.h
  18. 17
      db/version_set.cc
  19. 12
      db/version_set.h
  20. 3
      file/filename.cc
  21. 20
      include/rocksdb/db.h
  22. 1
      include/rocksdb/status.h
  23. 11
      include/rocksdb/utilities/stackable_db.h
  24. 2
      src.mk
  25. 8
      table/block_based/data_block_hash_index_test.cc
  26. 12
      table/cuckoo/cuckoo_table_reader_test.cc
  27. 55
      table/get_context.cc
  28. 15
      table/get_context.h
  29. 2
      table/table_reader_bench.cc
  30. 16
      table/table_test.cc
  31. 105
      tools/db_bench_tool.cc
  32. 17
      utilities/blob_db/blob_db_impl.cc
  33. 3
      utilities/merge_operators.h
  34. 100
      utilities/merge_operators/sortlist.cc
  35. 38
      utilities/merge_operators/sortlist.h
  36. 8
      utilities/transactions/write_prepared_txn.cc
  37. 8
      utilities/transactions/write_prepared_txn_db.cc
  38. 16
      utilities/transactions/write_unprepared_txn.cc
  39. 8
      utilities/transactions/write_unprepared_txn_db.cc
  40. 7
      utilities/write_batch_with_index/write_batch_with_index.cc

@ -661,6 +661,7 @@ set(SOURCES
utilities/merge_operators/bytesxor.cc
utilities/merge_operators/max.cc
utilities/merge_operators/put.cc
utilities/merge_operators/sortlist.cc
utilities/merge_operators/string_append/stringappend.cc
utilities/merge_operators/string_append/stringappend2.cc
utilities/merge_operators/uint64add.cc
@ -887,6 +888,7 @@ if(WITH_TESTS)
db/db_log_iter_test.cc
db/db_memtable_test.cc
db/db_merge_operator_test.cc
db/db_merge_operand_test.cc
db/db_options_test.cc
db/db_properties_test.cc
db/db_range_del_test.cc

@ -454,6 +454,7 @@ TESTS = \
db_iterator_test \
db_memtable_test \
db_merge_operator_test \
db_merge_operand_test \
db_options_test \
db_range_del_test \
db_secondary_test \
@ -1254,6 +1255,9 @@ db_memtable_test: db/db_memtable_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHA
db_merge_operator_test: db/db_merge_operator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_merge_operand_test: db/db_merge_operand_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

@ -301,6 +301,7 @@ cpp_library(
"utilities/merge_operators/bytesxor.cc",
"utilities/merge_operators/max.cc",
"utilities/merge_operators/put.cc",
"utilities/merge_operators/sortlist.cc",
"utilities/merge_operators/string_append/stringappend.cc",
"utilities/merge_operators/string_append/stringappend2.cc",
"utilities/merge_operators/uint64add.cc",
@ -755,6 +756,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"db_merge_operand_test",
"db/db_merge_operand_test.cc",
"parallel",
[],
[],
],
[
"db_options_test",
"db/db_options_test.cc",

@ -60,7 +60,7 @@ build:
test:
test_script:
- ps: build_tools\run_ci_db_test.ps1 -SuiteRun db_basic_test,db_test2,db_test,env_basic_test,env_test -Concurrency 8
- ps: build_tools\run_ci_db_test.ps1 -SuiteRun db_basic_test,db_test2,db_test,env_basic_test,env_test,db_merge_operand_test -Concurrency 8
on_failure:
- cmd: 7z a build-failed.zip %APPVEYOR_BUILD_FOLDER%\build\ && appveyor PushArtifact build-failed.zip

@ -37,7 +37,7 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
const Slice& key, PinnableSlice* value) {
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, key, value, nullptr, nullptr,
nullptr, nullptr);
true, nullptr, nullptr);
LookupKey lkey(key, kMaxSequenceNumber);
files_.files[FindFile(key)].fd.table_reader->Get(options, lkey.internal_key(),
&get_context, nullptr);
@ -70,7 +70,7 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
std::string& value = (*values)[idx];
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, keys[idx], &pinnable_val,
nullptr, nullptr, nullptr, nullptr);
nullptr, nullptr, true, nullptr, nullptr);
LookupKey lkey(keys[idx], kMaxSequenceNumber);
r->Get(options, lkey.internal_key(), &get_context, nullptr);
value.assign(pinnable_val.data(), pinnable_val.size());

@ -63,9 +63,11 @@ class DBBlobIndexTest : public DBTestBase {
ReadOptions read_options;
read_options.snapshot = snapshot;
PinnableSlice value;
auto s = dbfull()->GetImpl(read_options, cfh(), key, &value,
nullptr /*value_found*/, nullptr /*callback*/,
is_blob_index);
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = cfh();
get_impl_options.value = &value;
get_impl_options.is_blob_index = is_blob_index;
auto s = dbfull()->GetImpl(read_options, key, get_impl_options);
if (s.IsNotFound()) {
return "NOT_FOUND";
}

@ -1441,19 +1441,22 @@ ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
Status DBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) {
return GetImpl(read_options, column_family, key, value);
GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.value = value;
return GetImpl(read_options, key, get_impl_options);
}
Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val, bool* value_found,
ReadCallback* callback, bool* is_blob_index) {
assert(pinnable_val != nullptr);
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.merge_operands != nullptr);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfh =
reinterpret_cast<ColumnFamilyHandleImpl*>(get_impl_options.column_family);
auto cfd = cfh->cfd();
if (tracer_) {
@ -1461,7 +1464,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
// tracing is enabled.
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
tracer_->Get(column_family, key);
tracer_->Get(get_impl_options.column_family, key);
}
}
@ -1473,9 +1476,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
if (callback) {
if (get_impl_options.callback) {
// Already calculated based on read_options.snapshot
snapshot = callback->max_visible_seq();
snapshot = get_impl_options.callback->max_visible_seq();
} else {
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
@ -1489,12 +1492,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
if (callback) {
if (get_impl_options.callback) {
// The unprep_seqs are not published for write unprepared, so it could be
// that max_visible_seq is larger. Seek to the std::max of the two.
// However, we still want our callback to contain the actual snapshot so
// that it can do the correct visibility filtering.
callback->Refresh(snapshot);
get_impl_options.callback->Refresh(snapshot);
// Internally, WriteUnpreparedTxnReadCallback::Refresh would set
// max_visible_seq = max(max_visible_seq, snapshot)
@ -1505,7 +1508,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
// be needed.
//
// assert(callback->max_visible_seq() >= snapshot);
snapshot = callback->max_visible_seq();
snapshot = get_impl_options.callback->max_visible_seq();
}
}
TEST_SYNC_POINT("DBImpl::GetImpl:3");
@ -1526,20 +1529,40 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
if (!skip_memtable) {
if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&max_covering_tombstone_seq, read_options, callback,
is_blob_index)) {
// Get value associated with key
if (get_impl_options.get_value) {
if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s,
&merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;
pinnable_val->PinSelf();
get_impl_options.value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&max_covering_tombstone_seq, read_options, callback,
is_blob_index)) {
sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s,
&merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;
pinnable_val->PinSelf();
get_impl_options.value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
}
} else {
// Get Merge Operands associated with key, Merge Operands should not be
// merged and raw values should be returned to the user.
if (sv->mem->Get(lkey, nullptr, &s, &merge_context,
&max_covering_tombstone_seq, read_options, nullptr,
nullptr, false)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->GetMergeOperands(lkey, &s, &merge_context,
&max_covering_tombstone_seq,
read_options)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
}
}
if (!done && !s.ok() && !s.IsMergeInProgress()) {
ReturnAndCleanupSuperVersion(cfd, sv);
return s;
@ -1547,9 +1570,14 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
}
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
&max_covering_tombstone_seq, value_found, nullptr, nullptr,
callback, is_blob_index);
sv->current->Get(
read_options, lkey, get_impl_options.value, &s, &merge_context,
&max_covering_tombstone_seq,
get_impl_options.get_value ? get_impl_options.value_found : nullptr,
nullptr, nullptr,
get_impl_options.get_value ? get_impl_options.callback : nullptr,
get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
get_impl_options.get_value);
RecordTick(stats_, MEMTABLE_MISS);
}
@ -1561,7 +1589,25 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = 0;
if (s.ok()) {
size = pinnable_val->size();
if (get_impl_options.get_value) {
size = get_impl_options.value->size();
} else {
// Return all merge operands for get_impl_options.key
*get_impl_options.number_of_operands =
static_cast<int>(merge_context.GetNumOperands());
if (*get_impl_options.number_of_operands >
get_impl_options.get_merge_operands_options
->expected_max_number_of_operands) {
s = Status::Incomplete(
Status::SubCode::KMergeOperandsInsufficientCapacity);
} else {
for (const Slice& sl : merge_context.GetOperands()) {
size += sl.size();
get_impl_options.merge_operands->PinSelf(sl);
get_impl_options.merge_operands++;
}
}
}
RecordTick(stats_, BYTES_READ, size);
PERF_COUNTER_ADD(get_read_bytes, size);
}
@ -2222,7 +2268,11 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options,
ReadOptions roptions = read_options;
roptions.read_tier = kBlockCacheTier; // read from block cache only
PinnableSlice pinnable_val;
auto s = GetImpl(roptions, column_family, key, &pinnable_val, value_found);
GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.value = &pinnable_val;
get_impl_options.value_found = value_found;
auto s = GetImpl(roptions, key, get_impl_options);
value->assign(pinnable_val.data(), pinnable_val.size());
// If block_cache is enabled and the index block of the table didn't

@ -159,6 +159,21 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
using DB::GetMergeOperands;
Status GetMergeOperands(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* merge_operands,
GetMergeOperandsOptions* get_merge_operands_options,
int* number_of_operands) override {
GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.merge_operands = merge_operands;
get_impl_options.get_merge_operands_options = get_merge_operands_options;
get_impl_options.number_of_operands = number_of_operands;
get_impl_options.get_value = false;
return GetImpl(options, key, get_impl_options);
}
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
@ -395,12 +410,32 @@ class DBImpl : public DB {
// ---- End of implementations of the DB interface ----
struct GetImplOptions {
ColumnFamilyHandle* column_family = nullptr;
PinnableSlice* value = nullptr;
bool* value_found = nullptr;
ReadCallback* callback = nullptr;
bool* is_blob_index = nullptr;
// If true return value associated with key via value pointer else return
// all merge operands for key via merge_operands pointer
bool get_value = true;
// Pointer to an array of size
// get_merge_operands_options.expected_max_number_of_operands allocated by
// user
PinnableSlice* merge_operands = nullptr;
GetMergeOperandsOptions* get_merge_operands_options = nullptr;
int* number_of_operands = nullptr;
};
// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
bool* value_found = nullptr, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
// This function is also called by GetMergeOperands
// If get_impl_options.get_value = true get value associated with
// get_impl_options.key via get_impl_options.value
// If get_impl_options.get_value = false get merge operands associated with
// get_impl_options.key via get_impl_options.merge_operands
Status GetImpl(const ReadOptions& options, const Slice& key,
GetImplOptions get_impl_options);
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd,

@ -318,8 +318,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// We may ignore the dbname when generating the file names.
for (auto& file : state.sst_delete_files) {
candidate_files.emplace_back(
MakeTableFileName(file.metadata->fd.GetNumber()),
file.path);
MakeTableFileName(file.metadata->fd.GetNumber()), file.path);
if (file.metadata->table_reader_handle) {
table_cache_->Release(file.metadata->table_reader_handle);
}

@ -0,0 +1,240 @@
// Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/utilities/debug.h"
#include "table/block_based/block_builder.h"
#include "test_util/fault_injection_test_env.h"
#if !defined(ROCKSDB_LITE)
#include "test_util/sync_point.h"
#endif
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/sortlist.h"
#include "utilities/merge_operators/string_append/stringappend2.h"
namespace rocksdb {
class DBMergeOperandTest : public DBTestBase {
public:
DBMergeOperandTest() : DBTestBase("/db_merge_operand_test") {}
};
TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
public:
LimitedStringAppendMergeOp(int limit, char delim)
: StringAppendTESTOperator(delim), limit_(limit) {}
const char* Name() const override {
return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
}
bool ShouldMerge(const std::vector<Slice>& operands) const override {
if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
return true;
}
return false;
}
private:
size_t limit_ = 0;
};
Options options;
options.create_if_missing = true;
// Use only the latest two merge operands.
options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
options.env = env_;
Reopen(options);
int num_records = 4;
int number_of_operands = 0;
std::vector<PinnableSlice> values(num_records);
GetMergeOperandsOptions merge_operands_info;
merge_operands_info.expected_max_number_of_operands = num_records;
// k0 value in memtable
Put("k0", "PutARock");
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k0",
values.data(), &merge_operands_info,
&number_of_operands);
ASSERT_EQ(values[0], "PutARock");
// k0.1 value in SST
Put("k0.1", "RockInSST");
ASSERT_OK(Flush());
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k0.1",
values.data(), &merge_operands_info,
&number_of_operands);
ASSERT_EQ(values[0], "RockInSST");
// All k1 values are in memtable.
ASSERT_OK(Merge("k1", "a"));
Put("k1", "x");
ASSERT_OK(Merge("k1", "b"));
ASSERT_OK(Merge("k1", "c"));
ASSERT_OK(Merge("k1", "d"));
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1",
values.data(), &merge_operands_info,
&number_of_operands);
ASSERT_EQ(values[0], "x");
ASSERT_EQ(values[1], "b");
ASSERT_EQ(values[2], "c");
ASSERT_EQ(values[3], "d");
// expected_max_number_of_operands is less than number of merge operands so
// status should be Incomplete.
merge_operands_info.expected_max_number_of_operands = num_records - 1;
Status status = db_->GetMergeOperands(
ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(),
&merge_operands_info, &number_of_operands);
ASSERT_EQ(status.IsIncomplete(), true);
merge_operands_info.expected_max_number_of_operands = num_records;
// All k1.1 values are in memtable.
ASSERT_OK(Merge("k1.1", "r"));
Delete("k1.1");
ASSERT_OK(Merge("k1.1", "c"));
ASSERT_OK(Merge("k1.1", "k"));
ASSERT_OK(Merge("k1.1", "s"));
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1.1",
values.data(), &merge_operands_info,
&number_of_operands);
ASSERT_EQ(values[0], "c");
ASSERT_EQ(values[1], "k");
ASSERT_EQ(values[2], "s");
// All k2 values are flushed to L0 into a single file.
ASSERT_OK(Merge("k2", "q"));
ASSERT_OK(Merge("k2", "w"));
ASSERT_OK(Merge("k2", "e"));
ASSERT_OK(Merge("k2", "r"));
ASSERT_OK(Flush());
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2",
values.data(), &merge_operands_info,
&number_of_operands);
ASSERT_EQ(values[0], "q");
ASSERT_EQ(values[1], "w");
ASSERT_EQ(values[2], "e");
ASSERT_EQ(values[3], "r");
// All k2.1 values are flushed to L0 into a single file.
ASSERT_OK(Merge("k2.1", "m"));
Put("k2.1", "l");
ASSERT_OK(Merge("k2.1", "n"));
ASSERT_OK(Merge("k2.1", "o"));
ASSERT_OK(Flush());
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2.1",
values.data(), &merge_operands_info,
&number_of_operands);
ASSERT_EQ(values[0], "l,n,o");
// All k2.2 values are flushed to L0 into a single file.
ASSERT_OK(Merge("k2.2", "g"));
Delete("k2.2");
ASSERT_OK(Merge("k2.2", "o"));
ASSERT_OK(Merge("k2.2", "t"));
ASSERT_OK(Flush());
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2.2",
values.data(), &merge_operands_info,
&number_of_operands);
ASSERT_EQ(values[0], "o,t");
// Do some compaction that will make the following tests more predictable
// Slice start("PutARock");
// Slice end("t");
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// All k3 values are flushed and are in different files.
ASSERT_OK(Merge("k3", "ab"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3", "bc"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3", "cd"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3", "de"));
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3",
values.data(), &merge_operands_info,
&number_of_operands);
ASSERT_EQ(values[0], "ab");
ASSERT_EQ(values[1], "bc");
ASSERT_EQ(values[2], "cd");
ASSERT_EQ(values[3], "de");
// All k3.1 values are flushed and are in different files.
ASSERT_OK(Merge("k3.1", "ab"));
ASSERT_OK(Flush());
Put("k3.1", "bc");
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3.1", "cd"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3.1", "de"));
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3.1",
values.data(), &merge_operands_info,
&number_of_operands);
ASSERT_EQ(values[0], "bc");
ASSERT_EQ(values[1], "cd");
ASSERT_EQ(values[2], "de");
// All k3.2 values are flushed and are in different files.
ASSERT_OK(Merge("k3.2", "ab"));
ASSERT_OK(Flush());
Delete("k3.2");
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3.2", "cd"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("k3.2", "de"));
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3.2",
values.data(), &merge_operands_info,
&number_of_operands);
ASSERT_EQ(values[0], "cd");
ASSERT_EQ(values[1], "de");
// All K4 values are in different levels
ASSERT_OK(Merge("k4", "ba"));
ASSERT_OK(Flush());
MoveFilesToLevel(4);
ASSERT_OK(Merge("k4", "cb"));
ASSERT_OK(Flush());
MoveFilesToLevel(3);
ASSERT_OK(Merge("k4", "dc"));
ASSERT_OK(Flush());
MoveFilesToLevel(1);
ASSERT_OK(Merge("k4", "ed"));
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4",
values.data(), &merge_operands_info,
&number_of_operands);
ASSERT_EQ(values[0], "ba");
ASSERT_EQ(values[1], "cb");
ASSERT_EQ(values[2], "dc");
ASSERT_EQ(values[3], "ed");
// First 3 k5 values are in SST and next 4 k5 values are in Immutable Memtable
ASSERT_OK(Merge("k5", "who"));
ASSERT_OK(Merge("k5", "am"));
ASSERT_OK(Merge("k5", "i"));
ASSERT_OK(Flush());
Put("k5", "remember");
ASSERT_OK(Merge("k5", "i"));
ASSERT_OK(Merge("k5", "am"));
ASSERT_OK(Merge("k5", "rocks"));
dbfull()->TEST_SwitchMemtable();
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k5",
values.data(), &merge_operands_info,
&number_of_operands);
ASSERT_EQ(values[0], "remember");
ASSERT_EQ(values[1], "i");
ASSERT_EQ(values[2], "am");
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -46,9 +46,11 @@ class DBMergeOperatorTest : public DBTestBase {
ReadOptions read_opt;
read_opt.snapshot = snapshot;
PinnableSlice value;
Status s =
dbfull()->GetImpl(read_opt, db_->DefaultColumnFamily(), key, &value,
nullptr /*value_found*/, &read_callback);
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = db_->DefaultColumnFamily();
get_impl_options.value = &value;
get_impl_options.callback = &read_callback;
Status s = dbfull()->GetImpl(read_opt, key, get_impl_options);
if (!s.ok()) {
return s.ToString();
}

@ -2540,6 +2540,15 @@ class ModelDB : public DB {
return Status::NotSupported(key);
}
using DB::GetMergeOperands;
virtual Status GetMergeOperands(
const ReadOptions& /*options*/, ColumnFamilyHandle* /*column_family*/,
const Slice& key, PinnableSlice* /*slice*/,
GetMergeOperandsOptions* /*merge_operands_options*/,
int* /*number_of_operands*/) override {
return Status::NotSupported(key);
}
using DB::MultiGet;
std::vector<Status> MultiGet(
const ReadOptions& /*options*/,

@ -2797,8 +2797,12 @@ TEST_F(DBTest2, ReadCallbackTest) {
ReadOptions roptions;
TestReadCallback callback(seq);
bool dont_care = true;
Status s = dbfull()->GetImpl(roptions, dbfull()->DefaultColumnFamily(), key,
&pinnable_val, &dont_care, &callback);
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = dbfull()->DefaultColumnFamily();
get_impl_options.value = &pinnable_val;
get_impl_options.value_found = &dont_care;
get_impl_options.callback = &callback;
Status s = dbfull()->GetImpl(roptions, key, get_impl_options);
ASSERT_TRUE(s.ok());
// Assuming that after each Put the DB increased seq by one, the value and
// seq number must be equal since we also inc value by 1 after each Put.

@ -601,6 +601,7 @@ struct Saver {
Logger* logger;
Statistics* statistics;
bool inplace_update_support;
bool do_merge;
Env* env_;
ReadCallback* callback_;
bool* is_blob_index;
@ -627,7 +628,7 @@ static bool SaveValue(void* arg, const char* entry) {
// klength varint32
// userkey char[klength-8]
// tag uint64
// vlength varint32
// vlength varint32f
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
@ -677,12 +678,24 @@ static bool SaveValue(void* arg, const char* entry) {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->status) = Status::OK();
if (*(s->merge_in_progress)) {
if (s->do_merge) {
if (s->value != nullptr) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), s->value, s->logger,
s->statistics, s->env_, nullptr /* result_operand */, true);
}
} else {
// Preserve the value with the goal of returning it as part of
// raw merge operands to the user
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
}
} else if (!s->do_merge) {
// Preserve the value with the goal of returning it as part of
// raw merge operands to the user
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
} else if (s->value != nullptr) {
s->value->assign(v.data(), v.size());
}
@ -726,7 +739,8 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->merge_in_progress) = true;
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
if (merge_operator->ShouldMerge(merge_context->GetOperandsDirectionBackward())) {
if (s->do_merge && merge_operator->ShouldMerge(
merge_context->GetOperandsDirectionBackward())) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->logger, s->statistics,
@ -750,7 +764,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
SequenceNumber* seq, const ReadOptions& read_opts,
ReadCallback* callback, bool* is_blob_index) {
ReadCallback* callback, bool* is_blob_index, bool do_merge) {
// The sequence number is updated synchronously in version_set.h
if (IsEmpty()) {
// Avoiding recording stats for speed.
@ -810,8 +824,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
saver.env_ = env_;
saver.callback_ = callback;
saver.is_blob_index = is_blob_index;
saver.do_merge = do_merge;
table_->Get(key, &saver, SaveValue);
*seq = saver.seq;
}

@ -175,6 +175,10 @@ class MemTable {
const Slice& value, bool allow_concurrent = false,
MemTablePostProcessInfo* post_process_info = nullptr);
// Used to Get value associated with key or Get Merge Operands associated
// with key.
// If do_merge = true the default behavior which is Get value for key is
// executed. Expected behavior is described right below.
// If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error
// in *status and return true.
@ -188,20 +192,23 @@ class MemTable {
// returned). Otherwise, *seq will be set to kMaxSequenceNumber.
// On success, *s may be set to OK, NotFound, or MergeInProgress. Any other
// status returned indicates a corruption or other unexpected error.
// If do_merge = false then any Merge Operands encountered for key are simply
// stored in merge_context.operands_list and never actually merged to get a
// final value. The raw Merge Operands are eventually returned to the user.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
bool* is_blob_index = nullptr, bool do_merge = true);
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr) {
bool* is_blob_index = nullptr, bool do_merge = true) {
SequenceNumber seq;
return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq,
read_opts, callback, is_blob_index);
read_opts, callback, is_blob_index, do_merge);
}
// Attempts to update the new_value inplace, else does normal Add

@ -109,6 +109,20 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
is_blob_index);
}
bool MemTableListVersion::GetMergeOperands(
const LookupKey& key, Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {
for (MemTable* memtable : memlist_) {
bool done = memtable->Get(key, nullptr, s, merge_context,
max_covering_tombstone_seq, read_opts, nullptr,
nullptr, false);
if (done) {
return true;
}
}
return false;
}
bool MemTableListVersion::GetFromHistory(
const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,

@ -71,6 +71,13 @@ class MemTableListVersion {
read_opts, callback, is_blob_index);
}
// Returns all the merge operands corresponding to the key by searching all
// memtables starting from the most recent one.
bool GetMergeOperands(const LookupKey& key, Status* s,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts);
// Similar to Get(), but searches the Memtable history of memtables that
// have already been flushed. Should only be used from in-memory only
// queries (such as Transaction validation) as the history may contain

@ -1651,7 +1651,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, bool* value_found,
bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
bool* is_blob) {
bool* is_blob, bool do_merge) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
@ -1671,8 +1671,9 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
value, value_found, merge_context, max_covering_tombstone_seq, this->env_,
seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
do_merge ? value : nullptr, value_found, merge_context, do_merge,
max_covering_tombstone_seq, this->env_, seq,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
tracing_get_id);
// Pin blocks that we read to hold merge operands
@ -1737,7 +1738,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
} else if (fp.GetHitFileLevel() >= 2) {
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
}
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, fp.GetHitFileLevel());
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
fp.GetHitFileLevel());
return;
case GetContext::kDeleted:
// Use empty error message for speed
@ -1755,11 +1757,14 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
}
f = fp.GetNextFile();
}
if (db_statistics_ != nullptr) {
get_context.ReportCounters();
}
if (GetContext::kMerge == get_context.State()) {
if (!do_merge) {
*status = Status::OK();
return;
}
if (!merge_operator_) {
*status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
@ -1806,7 +1811,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
get_ctx.emplace_back(
user_comparator(), merge_operator_, info_log_, db_statistics_,
iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
iter->value, nullptr, &(iter->merge_context),
iter->value, nullptr, &(iter->merge_context), true,
&iter->max_covering_tombstone_seq, this->env_, &iter->seq,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
tracing_mget_id);

@ -63,7 +63,6 @@ class VersionSet;
class WriteBufferManager;
class MergeContext;
class ColumnFamilySet;
class TableCache;
class MergeIteratorBuilder;
// Return the smallest index i such that file_level.files[i]->largest >= key.
@ -561,7 +560,10 @@ class Version {
const Slice& largest_user_key,
int level, bool* overlap);
// Lookup the value for key. If found, store it in *val and
// Lookup the value for key or get all merge operands for key.
// If do_merge = true (default) then lookup value for key.
// Behavior if do_merge = true:
// If found, store it in *value and
// return OK. Else return a non-OK status.
// Uses *operands to store merge_operator operations to apply later.
//
@ -575,14 +577,16 @@ class Version {
// *key_exists will be set to false.
// If seq is non-null, *seq will be set to the sequence number found
// for the key if a key was found.
//
// Behavior if do_merge = false
// If the key has any merge operands then store them in
// merge_context.operands_list and don't merge the operands
// REQUIRES: lock is not held
void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* value,
Status* status, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
bool* value_found = nullptr, bool* key_exists = nullptr,
SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr,
bool* is_blob = nullptr);
bool* is_blob = nullptr, bool do_merge = true);
void MultiGet(const ReadOptions&, MultiGetRange* range,
ReadCallback* callback = nullptr, bool* is_blob = nullptr);

@ -60,8 +60,7 @@ static size_t GetInfoLogPrefix(const std::string& path, char* dest, int len) {
static std::string MakeFileName(uint64_t number, const char* suffix) {
char buf[100];
snprintf(buf, sizeof(buf), "%06llu.%s",
static_cast<unsigned long long>(number),
suffix);
static_cast<unsigned long long>(number), suffix);
return buf;
}

@ -116,6 +116,10 @@ struct IngestExternalFileArg {
IngestExternalFileOptions options;
};
struct GetMergeOperandsOptions {
int expected_max_number_of_operands = 0;
};
// A collections of table properties objects, where
// key: is the table's file name.
// value: the table properties object of the given table.
@ -403,6 +407,22 @@ class DB {
return Get(options, DefaultColumnFamily(), key, value);
}
// Returns all the merge operands corresponding to the key. If the
// number of merge operands in DB is greater than
// merge_operands_options.expected_max_number_of_operands
// no merge operands are returned and status is Incomplete. Merge operands
// returned are in the order of insertion.
// merge_operands- Points to an array of at-least
// merge_operands_options.expected_max_number_of_operands and the
// caller is responsible for allocating it. If the status
// returned is Incomplete then number_of_operands will contain
// the total number of merge operands found in DB for key.
virtual Status GetMergeOperands(
const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* merge_operands,
GetMergeOperandsOptions* get_merge_operands_options,
int* number_of_operands) = 0;
// If keys[i] does not exist in the database, then the i'th returned
// status will be one for which Status::IsNotFound() is true, and
// (*values)[i] will be set to some arbitrary value (often ""). Otherwise,

@ -76,6 +76,7 @@ class Status {
kMemoryLimit = 7,
kSpaceLimit = 8,
kPathNotFound = 9,
KMergeOperandsInsufficientCapacity = 10,
kMaxSubCode
};

@ -88,6 +88,17 @@ class StackableDB : public DB {
return db_->Get(options, column_family, key, value);
}
using DB::GetMergeOperands;
virtual Status GetMergeOperands(
const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* slice,
GetMergeOperandsOptions* get_merge_operands_options,
int* number_of_operands) override {
return db_->GetMergeOperands(options, column_family, key, slice,
get_merge_operands_options,
number_of_operands);
}
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,

@ -191,6 +191,7 @@ LIB_SOURCES = \
utilities/memory/memory_util.cc \
utilities/merge_operators/max.cc \
utilities/merge_operators/put.cc \
utilities/merge_operators/sortlist.cc \
utilities/merge_operators/string_append/stringappend.cc \
utilities/merge_operators/string_append/stringappend2.cc \
utilities/merge_operators/uint64add.cc \
@ -291,6 +292,7 @@ MAIN_SOURCES = \
db/db_log_iter_test.cc \
db/db_memtable_test.cc \
db/db_merge_operator_test.cc \
db/db_merge_operand_test.cc \
db/db_options_test.cc \
db/db_properties_test.cc \
db/db_range_del_test.cc \

@ -631,7 +631,7 @@ TEST(DataBlockHashIndex, BlockBoundary) {
InternalKey seek_ikey(seek_ukey, 60, kTypeValue);
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, seek_ukey, &value, nullptr,
nullptr, nullptr, nullptr);
nullptr, true, nullptr, nullptr);
TestBoundary(ik1, v1, ik2, v2, seek_ikey, get_context, options);
ASSERT_EQ(get_context.State(), GetContext::kFound);
@ -656,7 +656,7 @@ TEST(DataBlockHashIndex, BlockBoundary) {
InternalKey seek_ikey(seek_ukey, 60, kTypeValue);
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, seek_ukey, &value, nullptr,
nullptr, nullptr, nullptr);
nullptr, true, nullptr, nullptr);
TestBoundary(ik1, v1, ik2, v2, seek_ikey, get_context, options);
ASSERT_EQ(get_context.State(), GetContext::kFound);
@ -681,7 +681,7 @@ TEST(DataBlockHashIndex, BlockBoundary) {
InternalKey seek_ikey(seek_ukey, 120, kTypeValue);
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, seek_ukey, &value, nullptr,
nullptr, nullptr, nullptr);
nullptr, true, nullptr, nullptr);
TestBoundary(ik1, v1, ik2, v2, seek_ikey, get_context, options);
ASSERT_EQ(get_context.State(), GetContext::kFound);
@ -706,7 +706,7 @@ TEST(DataBlockHashIndex, BlockBoundary) {
InternalKey seek_ikey(seek_ukey, 5, kTypeValue);
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, seek_ukey, &value, nullptr,
nullptr, nullptr, nullptr);
nullptr, true, nullptr, nullptr);
TestBoundary(ik1, v1, ik2, v2, seek_ikey, get_context, options);
ASSERT_EQ(get_context.State(), GetContext::kNotFound);

@ -122,7 +122,7 @@ class CuckooReaderTest : public testing::Test {
PinnableSlice value;
GetContext get_context(ucomp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(user_keys[i]), &value,
nullptr, nullptr, nullptr, nullptr);
nullptr, nullptr, true, nullptr, nullptr);
ASSERT_OK(
reader.Get(ReadOptions(), Slice(keys[i]), &get_context, nullptr));
ASSERT_STREQ(values[i].c_str(), value.data());
@ -336,7 +336,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
AppendInternalKey(&not_found_key, ikey);
PinnableSlice value;
GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound,
Slice(not_found_key), &value, nullptr, nullptr,
Slice(not_found_key), &value, nullptr, nullptr, true,
nullptr, nullptr);
ASSERT_OK(
reader.Get(ReadOptions(), Slice(not_found_key), &get_context, nullptr));
@ -351,7 +351,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
value.Reset();
GetContext get_context2(ucmp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(not_found_key2), &value,
nullptr, nullptr, nullptr, nullptr);
nullptr, nullptr, true, nullptr, nullptr);
ASSERT_OK(
reader.Get(ReadOptions(), Slice(not_found_key2), &get_context2, nullptr));
ASSERT_TRUE(value.empty());
@ -367,7 +367,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
value.Reset();
GetContext get_context3(ucmp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(unused_key), &value,
nullptr, nullptr, nullptr, nullptr);
nullptr, nullptr, true, nullptr, nullptr);
ASSERT_OK(
reader.Get(ReadOptions(), Slice(unused_key), &get_context3, nullptr));
ASSERT_TRUE(value.empty());
@ -443,7 +443,7 @@ void WriteFile(const std::vector<std::string>& keys,
// Assume only the fast path is triggered
GetContext get_context(nullptr, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), &value, nullptr,
nullptr, nullptr, nullptr);
nullptr, true, nullptr, nullptr);
for (uint64_t i = 0; i < num; ++i) {
value.Reset();
value.clear();
@ -491,7 +491,7 @@ void ReadKeys(uint64_t num, uint32_t batch_size) {
// Assume only the fast path is triggered
GetContext get_context(nullptr, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), &value, nullptr,
nullptr, nullptr, nullptr);
nullptr, true, nullptr, nullptr);
uint64_t start_time = env->NowMicros();
if (batch_size > 0) {
for (uint64_t i = 0; i < num; i += batch_size) {

@ -42,9 +42,9 @@ GetContext::GetContext(
const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
Statistics* statistics, GetState init_state, const Slice& user_key,
PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context,
SequenceNumber* _max_covering_tombstone_seq, Env* env, SequenceNumber* seq,
PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
bool* is_blob_index, uint64_t tracing_get_id)
bool do_merge, SequenceNumber* _max_covering_tombstone_seq, Env* env,
SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr,
ReadCallback* callback, bool* is_blob_index, uint64_t tracing_get_id)
: ucmp_(ucmp),
merge_operator_(merge_operator),
logger_(logger),
@ -60,6 +60,7 @@ GetContext::GetContext(
replay_log_(nullptr),
pinned_iters_mgr_(_pinned_iters_mgr),
callback_(callback),
do_merge_(do_merge),
is_blob_index_(is_blob_index),
tracing_get_id_(tracing_get_id) {
if (seq_) {
@ -215,20 +216,29 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
}
if (kNotFound == state_) {
state_ = kFound;
if (do_merge_) {
if (LIKELY(pinnable_val_ != nullptr)) {
if (LIKELY(value_pinner != nullptr)) {
// If the backing resources for the value are provided, pin them
pinnable_val_->PinSlice(value, value_pinner);
} else {
TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf", this);
TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
this);
// Otherwise copy the value
pinnable_val_->PinSelf(value);
}
}
} else {
// It means this function is called as part of DB GetMergeOperands
// API and the current value should be part of
// merge_context_->operand_list
push_operand(value, value_pinner);
}
} else if (kMerge == state_) {
assert(merge_operator_ != nullptr);
state_ = kFound;
if (do_merge_) {
if (LIKELY(pinnable_val_ != nullptr)) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, &value,
@ -239,6 +249,12 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
state_ = kCorrupt;
}
}
} else {
// It means this function is called as part of DB GetMergeOperands
// API and the current value should be part of
// merge_context_->operand_list
push_operand(value, value_pinner);
}
}
if (is_blob_index_ != nullptr) {
*is_blob_index_ = (type == kTypeBlobIndex);
@ -256,6 +272,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
} else if (kMerge == state_) {
state_ = kFound;
if (LIKELY(pinnable_val_ != nullptr)) {
if (do_merge_) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, nullptr,
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
@ -265,6 +282,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
state_ = kCorrupt;
}
}
// If do_merge_ = false then the current value shouldn't be part of
// merge_context_->operand_list
}
}
return false;
@ -272,17 +292,15 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
assert(state_ == kNotFound || state_ == kMerge);
state_ = kMerge;
// value_pinner is not set from plain_table_reader.cc for example.
if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
value_pinner != nullptr) {
value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
merge_context_->PushOperand(value, true /*value_pinned*/);
} else {
merge_context_->PushOperand(value, false);
}
if (merge_operator_ != nullptr &&
merge_operator_->ShouldMerge(merge_context_->GetOperandsDirectionBackward())) {
push_operand(value, value_pinner);
if (do_merge_ && merge_operator_ != nullptr &&
merge_operator_->ShouldMerge(
merge_context_->GetOperandsDirectionBackward())) {
state_ = kFound;
if (LIKELY(pinnable_val_ != nullptr)) {
// do_merge_ = true this is the case where this function is called
// as part of DB Get API hence merge operators should be merged.
if (do_merge_) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, nullptr,
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
@ -292,6 +310,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
state_ = kCorrupt;
}
}
}
return false;
}
return true;
@ -306,6 +325,16 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
return false;
}
void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
value_pinner != nullptr) {
value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
merge_context_->PushOperand(value, true /*value_pinned*/);
} else {
merge_context_->PushOperand(value, false);
}
}
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context, Cleanable* value_pinner) {
#ifndef ROCKSDB_LITE

@ -66,6 +66,9 @@ class GetContext {
GetContextStats get_context_stats_;
// Constructor
// @param value Holds the value corresponding to user_key. If its nullptr
// then return all merge operands corresponding to user_key
// via merge_context
// @param value_found If non-nullptr, set to false if key may be present
// but we can't be certain because we cannot do IO
// @param max_covering_tombstone_seq Pointer to highest sequence number of
@ -78,10 +81,14 @@ class GetContext {
// for visibility of a key
// @param is_blob_index If non-nullptr, will be used to indicate if a found
// key is of type blob index
// @param do_merge True if value associated with user_key has to be returned
// and false if all the merge operands associated with user_key has to be
// returned. Id do_merge=false then all the merge operands are stored in
// merge_context and they are never merged. The value pointer is untouched.
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
Logger* logger, Statistics* statistics, GetState init_state,
const Slice& user_key, PinnableSlice* value, bool* value_found,
MergeContext* merge_context,
MergeContext* merge_context, bool do_merge,
SequenceNumber* max_covering_tombstone_seq, Env* env,
SequenceNumber* seq = nullptr,
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
@ -140,6 +147,8 @@ class GetContext {
uint64_t get_tracing_get_id() const { return tracing_get_id_; }
void push_operand(const Slice& value, Cleanable* value_pinner);
private:
const Comparator* ucmp_;
const MergeOperator* merge_operator_;
@ -162,6 +171,10 @@ class GetContext {
PinnedIteratorsManager* pinned_iters_mgr_;
ReadCallback* callback_;
bool sample_;
// Value is true if it's called as part of DB Get API and false if it's
// called as part of DB GetMergeOperands API. When it's false merge operators
// are never merged.
bool do_merge_;
bool* is_blob_index_;
// Used for block cache tracing only. A tracing get id uniquely identifies a
// Get or a MultiGet.

@ -175,7 +175,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
ioptions.merge_operator, ioptions.info_log,
ioptions.statistics, GetContext::kNotFound,
Slice(key), &value, nullptr, &merge_context,
&max_covering_tombstone_seq, env);
true, &max_covering_tombstone_seq, env);
s = table_reader->Get(read_options, key, &get_context, nullptr);
} else {
s = db->Get(read_options, key, &result);

@ -2323,8 +2323,8 @@ TEST_P(BlockBasedTableTest, TracingGetTest) {
PinnableSlice value;
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value, nullptr,
nullptr, nullptr, nullptr, nullptr, nullptr, nullptr,
nullptr, /*get_id=*/i);
nullptr, true, nullptr, nullptr, nullptr, nullptr,
nullptr, nullptr, /*tracing_get_id=*/i);
get_perf_context()->Reset();
ASSERT_OK(c.GetTableReader()->Get(ReadOptions(), encoded_key, &get_context,
moptions.prefix_extractor.get()));
@ -2579,7 +2579,7 @@ TEST_P(BlockBasedTableTest, BlockCacheDisabledTest) {
{
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), nullptr, nullptr,
nullptr, nullptr, nullptr);
nullptr, true, nullptr, nullptr);
// a hack that just to trigger BlockBasedTable::GetFilter.
reader->Get(ReadOptions(), "non-exist-key", &get_context,
moptions.prefix_extractor.get());
@ -2750,7 +2750,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
PinnableSlice value;
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value, nullptr,
nullptr, nullptr, nullptr);
nullptr, true, nullptr, nullptr);
ASSERT_OK(reader->Get(ReadOptions(), internal_key.Encode(), &get_context,
moptions4.prefix_extractor.get()));
ASSERT_STREQ(value.data(), "hello");
@ -2836,7 +2836,7 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) {
{
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value, nullptr,
nullptr, nullptr, nullptr);
nullptr, true, nullptr, nullptr);
get_perf_context()->Reset();
ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context,
moptions.prefix_extractor.get()));
@ -2862,7 +2862,7 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) {
{
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value, nullptr,
nullptr, nullptr, nullptr);
nullptr, true, nullptr, nullptr);
get_perf_context()->Reset();
ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context,
moptions.prefix_extractor.get()));
@ -4230,7 +4230,7 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) {
std::string user_key = ExtractUserKey(kv.first).ToString();
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value, nullptr,
nullptr, nullptr, nullptr);
nullptr, true, nullptr, nullptr);
ASSERT_OK(reader->Get(ro, kv.first, &get_context,
moptions.prefix_extractor.get()));
ASSERT_EQ(get_context.State(), GetContext::kFound);
@ -4256,7 +4256,7 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) {
PinnableSlice value;
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value, nullptr,
nullptr, nullptr, nullptr);
nullptr, true, nullptr, nullptr);
ASSERT_OK(reader->Get(ro, encoded_key, &get_context,
moptions.prefix_extractor.get()));
ASSERT_EQ(get_context.State(), GetContext::kNotFound);

@ -71,6 +71,7 @@
#include "utilities/blob_db/blob_db.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/bytesxor.h"
#include "utilities/merge_operators/sortlist.h"
#include "utilities/persistent_cache/block_cache_tier.h"
#ifdef OS_WIN
@ -120,7 +121,8 @@ DEFINE_string(
"fillseekseq,"
"randomtransaction,"
"randomreplacekeys,"
"timeseries",
"timeseries,"
"getmergeoperands",
"Comma-separated list of operations to run in the specified"
" order. Available benchmarks:\n"
@ -190,7 +192,13 @@ DEFINE_string(
"\tlevelstats -- Print the number of files and bytes per level\n"
"\tsstables -- Print sstable info\n"
"\theapprofile -- Dump a heap profile (if supported by this port)\n"
"\treplay -- replay the trace file specified with trace_file\n");
"\treplay -- replay the trace file specified with trace_file\n"
"\tgetmergeoperands -- Insert lots of merge records which are a list of "
"sorted ints for a key and then compare performance of lookup for another "
"key "
"by doing a Get followed by binary searching in the large sorted list vs "
"doing a GetMergeOperands and binary searching in the operands which are"
"sorted sub-lists. The MergeOperator used is sortlist.h\n");
DEFINE_int64(num, 1000000, "Number of key/values to place in database");
@ -2880,6 +2888,8 @@ class Benchmark {
exit(1);
}
method = &Benchmark::Replay;
} else if (name == "getmergeoperands") {
method = &Benchmark::GetMergeOperands;
} else if (!name.empty()) { // No error message for empty name
fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
exit(1);
@ -5921,6 +5931,97 @@ class Benchmark {
}
}
bool binary_search(std::vector<int>& data, int start, int end, int key) {
if (data.empty()) return false;
if (start > end) return false;
int mid = start + (end - start) / 2;
if (mid > static_cast<int>(data.size()) - 1) return false;
if (data[mid] == key) {
return true;
} else if (data[mid] > key) {
return binary_search(data, start, mid - 1, key);
} else {
return binary_search(data, mid + 1, end, key);
}
}
// Does a bunch of merge operations for a key(key1) where the merge operand
// is a sorted list. Next performance comparison is done between doing a Get
// for key1 followed by searching for another key(key2) in the large sorted
// list vs calling GetMergeOperands for key1 and then searching for the key2
// in all the sorted sub-lists. Later case is expected to be a lot faster.
void GetMergeOperands(ThreadState* thread) {
DB* db = SelectDB(thread);
const int kTotalValues = 100000;
const int kListSize = 100;
std::string key = "my_key";
std::string value;
for (int i = 1; i < kTotalValues; i++) {
if (i % kListSize == 0) {
// Remove trailing ','
value.pop_back();
db->Merge(WriteOptions(), key, value);
value.clear();
} else {
value.append(std::to_string(i)).append(",");
}
}
SortList s;
std::vector<int> data;
// This value can be experimented with and it will demonstrate the
// perf difference between doing a Get and searching for lookup_key in the
// resultant large sorted list vs doing GetMergeOperands and searching
// for lookup_key within this resultant sorted sub-lists.
int lookup_key = 1;
// Get API call
std::cout << "--- Get API call --- \n";
PinnableSlice p_slice;
uint64_t st = FLAGS_env->NowNanos();
db->Get(ReadOptions(), db->DefaultColumnFamily(), key, &p_slice);
s.MakeVector(data, p_slice);
bool found =
binary_search(data, 0, static_cast<int>(data.size() - 1), lookup_key);
std::cout << "Found key? " << std::to_string(found) << "\n";
uint64_t sp = FLAGS_env->NowNanos();
std::cout << "Get: " << (sp - st) / 1000000000.0 << " seconds\n";
std::string* dat_ = p_slice.GetSelf();
std::cout << "Sample data from Get API call: " << dat_->substr(0, 10)
<< "\n";
data.clear();
// GetMergeOperands API call
std::cout << "--- GetMergeOperands API --- \n";
std::vector<PinnableSlice> a_slice((kTotalValues / kListSize) + 1);
st = FLAGS_env->NowNanos();
int number_of_operands = 0;
GetMergeOperandsOptions get_merge_operands_options;
get_merge_operands_options.expected_max_number_of_operands =
(kTotalValues / 100) + 1;
db->GetMergeOperands(ReadOptions(), db->DefaultColumnFamily(), key,
a_slice.data(), &get_merge_operands_options,
&number_of_operands);
for (PinnableSlice& psl : a_slice) {
s.MakeVector(data, psl);
found =
binary_search(data, 0, static_cast<int>(data.size() - 1), lookup_key);
data.clear();
if (found) break;
}
std::cout << "Found key? " << std::to_string(found) << "\n";
sp = FLAGS_env->NowNanos();
std::cout << "Get Merge operands: " << (sp - st) / 1000000000.0
<< " seconds \n";
int to_print = 0;
std::cout << "Sample data from GetMergeOperands API call: ";
for (PinnableSlice& psl : a_slice) {
std::cout << "List: " << to_print << " : " << *psl.GetSelf() << "\n";
if (to_print++ > 2) break;
}
}
#ifndef ROCKSDB_LITE
// This benchmark stress tests Transactions. For a given --duration (or
// total number of --writes, a Transaction will perform a read-modify-write

@ -1146,9 +1146,11 @@ Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
PinnableSlice index_entry;
Status s;
bool is_blob_index = false;
s = db_impl_->GetImpl(ro, column_family, key, &index_entry,
nullptr /*value_found*/, nullptr /*read_callback*/,
&is_blob_index);
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.value = &index_entry;
get_impl_options.is_blob_index = &is_blob_index;
s = db_impl_->GetImpl(ro, key, get_impl_options);
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
if (expiration != nullptr) {
@ -1535,9 +1537,12 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
SequenceNumber latest_seq = GetLatestSequenceNumber();
bool is_blob_index = false;
PinnableSlice index_entry;
Status get_status = db_impl_->GetImpl(
ReadOptions(), cfh, record.key, &index_entry, nullptr /*value_found*/,
nullptr /*read_callback*/, &is_blob_index);
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = cfh;
get_impl_options.value = &index_entry;
get_impl_options.is_blob_index = &is_blob_index;
Status get_status =
db_impl_->GetImpl(ReadOptions(), record.key, get_impl_options);
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB");
if (!get_status.ok() && !get_status.IsNotFound()) {
// error

@ -23,6 +23,7 @@ class MergeOperators {
static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator();
static std::shared_ptr<MergeOperator> CreateMaxOperator();
static std::shared_ptr<MergeOperator> CreateBytesXOROperator();
static std::shared_ptr<MergeOperator> CreateSortOperator();
// Will return a different merge operator depending on the string.
// TODO: Hook the "name" up to the actual Name() of the MergeOperators?
@ -42,6 +43,8 @@ class MergeOperators {
return CreateMaxOperator();
} else if (name == "bytesxor") {
return CreateBytesXOROperator();
} else if (name == "sortlist") {
return CreateSortOperator();
} else {
// Empty or unknown, just return nullptr
return nullptr;

@ -0,0 +1,100 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/sortlist.h"
using rocksdb::Logger;
using rocksdb::MergeOperator;
using rocksdb::Slice;
namespace rocksdb {
bool SortList::FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const {
std::vector<int> left;
for (Slice slice : merge_in.operand_list) {
std::vector<int> right;
MakeVector(right, slice);
left = Merge(left, right);
}
for (int i = 0; i < static_cast<int>(left.size()) - 1; i++) {
merge_out->new_value.append(std::to_string(left[i])).append(",");
}
merge_out->new_value.append(std::to_string(left.back()));
return true;
}
bool SortList::PartialMerge(const Slice& /*key*/, const Slice& left_operand,
const Slice& right_operand, std::string* new_value,
Logger* /*logger*/) const {
std::vector<int> left;
std::vector<int> right;
MakeVector(left, left_operand);
MakeVector(right, right_operand);
left = Merge(left, right);
for (int i = 0; i < static_cast<int>(left.size()) - 1; i++) {
new_value->append(std::to_string(left[i])).append(",");
}
new_value->append(std::to_string(left.back()));
return true;
}
bool SortList::PartialMergeMulti(const Slice& /*key*/,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* /*logger*/) const {
(void)operand_list;
(void)new_value;
return true;
}
const char* SortList::Name() const { return "MergeSortOperator"; }
void SortList::MakeVector(std::vector<int>& operand, Slice slice) const {
do {
const char* begin = slice.data_;
while (*slice.data_ != ',' && *slice.data_) slice.data_++;
operand.push_back(std::stoi(std::string(begin, slice.data_)));
} while (0 != *slice.data_++);
}
std::vector<int> SortList::Merge(std::vector<int>& left,
std::vector<int>& right) const {
// Fill the resultant vector with sorted results from both vectors
std::vector<int> result;
unsigned left_it = 0, right_it = 0;
while (left_it < left.size() && right_it < right.size()) {
// If the left value is smaller than the right it goes next
// into the resultant vector
if (left[left_it] < right[right_it]) {
result.push_back(left[left_it]);
left_it++;
} else {
result.push_back(right[right_it]);
right_it++;
}
}
// Push the remaining data from both vectors onto the resultant
while (left_it < left.size()) {
result.push_back(left[left_it]);
left_it++;
}
while (right_it < right.size()) {
result.push_back(right[right_it]);
right_it++;
}
return result;
}
std::shared_ptr<MergeOperator> MergeOperators::CreateSortOperator() {
return std::make_shared<SortList>();
}
} // namespace rocksdb

@ -0,0 +1,38 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// A MergeOperator for RocksDB that implements Merge Sort.
// It is built using the MergeOperator interface. The operator works by taking
// an input which contains one or more merge operands where each operand is a
// list of sorted ints and merges them to form a large sorted list.
#pragma once
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
namespace rocksdb {
class SortList : public MergeOperator {
public:
bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override;
bool PartialMerge(const Slice& /*key*/, const Slice& left_operand,
const Slice& right_operand, std::string* new_value,
Logger* /*logger*/) const override;
bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const override;
const char* Name() const override;
void MakeVector(std::vector<int>& operand, Slice slice) const;
private:
std::vector<int> Merge(std::vector<int>& left, std::vector<int>& right) const;
};
} // namespace rocksdb

@ -290,8 +290,12 @@ Status WritePreparedTxn::RollbackInternal() {
PinnableSlice pinnable_val;
bool not_used;
auto cf_handle = handles_[cf];
s = db_->GetImpl(roptions_, cf_handle, key, &pinnable_val, &not_used,
&callback);
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = cf_handle;
get_impl_options.value = &pinnable_val;
get_impl_options.value_found = &not_used;
get_impl_options.callback = &callback;
s = db_->GetImpl(roptions_, key, get_impl_options);
assert(s.ok() || s.IsNotFound());
if (s.ok()) {
s = rollback_batch_->Put(cf_handle, key, pinnable_val);

@ -231,8 +231,12 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options,
WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted,
backed_by_snapshot);
bool* dont_care = nullptr;
auto res = db_impl_->GetImpl(options, column_family, key, value, dont_care,
&callback);
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.value = value;
get_impl_options.value_found = dont_care;
get_impl_options.callback = &callback;
auto res = db_impl_->GetImpl(options, key, get_impl_options);
if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(),
backed_by_snapshot))) {
return res;

@ -567,8 +567,12 @@ Status WriteUnpreparedTxn::RollbackInternal() {
const auto& cf_handle = cf_map.at(cfid);
PinnableSlice pinnable_val;
bool not_used;
s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
&callback);
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = cf_handle;
get_impl_options.value = &pinnable_val;
get_impl_options.value_found = &not_used;
get_impl_options.callback = &callback;
s = db_impl_->GetImpl(roptions, key, get_impl_options);
if (s.ok()) {
s = rollback_batch.Put(cf_handle, key, pinnable_val);
@ -721,8 +725,12 @@ Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
const auto& cf_handle = cf_map.at(cfid);
PinnableSlice pinnable_val;
bool not_used;
s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
&callback);
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = cf_handle;
get_impl_options.value = &pinnable_val;
get_impl_options.value_found = &not_used;
get_impl_options.callback = &callback;
s = db_impl_->GetImpl(roptions, key, get_impl_options);
if (s.ok()) {
s = write_batch_.Put(cf_handle, key, pinnable_val);

@ -86,8 +86,12 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
PinnableSlice pinnable_val;
bool not_used;
auto cf_handle = handles_[cf];
s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
&callback);
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = cf_handle;
get_impl_options.value = &pinnable_val;
get_impl_options.value_found = &not_used;
get_impl_options.callback = &callback;
s = db_->GetImpl(roptions, key, get_impl_options);
assert(s.ok() || s.IsNotFound());
if (s.ok()) {
s = rollback_batch_->Put(cf_handle, key, pinnable_val);

@ -891,9 +891,12 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
if (!callback) {
s = db->Get(read_options, column_family, key, pinnable_val);
} else {
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.value = pinnable_val;
get_impl_options.callback = callback;
s = static_cast_with_check<DBImpl, DB>(db->GetRootDB())
->GetImpl(read_options, column_family, key, pinnable_val, nullptr,
callback);
->GetImpl(read_options, key, get_impl_options);
}
if (s.ok() || s.IsNotFound()) { // DB Get Succeeded

Loading…
Cancel
Save