|
|
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include <string>
|
|
|
|
#include <vector>
|
|
|
|
|
|
|
|
#include "db/db_test_util.h"
|
|
|
|
#include "db/forward_iterator.h"
|
|
|
|
#include "port/stack_trace.h"
|
|
|
|
#include "rocksdb/merge_operator.h"
|
|
|
|
#include "utilities/merge_operators.h"
|
|
|
|
#include "utilities/merge_operators/string_append/stringappend2.h"
|
|
|
|
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
|
|
|
|
class TestReadCallback : public ReadCallback {
|
|
|
|
public:
|
|
|
|
TestReadCallback(SnapshotChecker* snapshot_checker,
|
|
|
|
SequenceNumber snapshot_seq)
|
|
|
|
: ReadCallback(snapshot_seq),
|
|
|
|
snapshot_checker_(snapshot_checker),
|
|
|
|
snapshot_seq_(snapshot_seq) {}
|
|
|
|
|
|
|
|
bool IsVisibleFullCheck(SequenceNumber seq) override {
|
|
|
|
return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) ==
|
|
|
|
SnapshotCheckerResult::kInSnapshot;
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
SnapshotChecker* snapshot_checker_;
|
|
|
|
SequenceNumber snapshot_seq_;
|
|
|
|
};
|
|
|
|
|
|
|
|
// Test merge operator functionality.
|
|
|
|
class DBMergeOperatorTest : public DBTestBase {
|
|
|
|
public:
|
|
|
|
DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
|
|
|
|
|
|
|
|
std::string GetWithReadCallback(SnapshotChecker* snapshot_checker,
|
|
|
|
const Slice& key,
|
|
|
|
const Snapshot* snapshot = nullptr) {
|
|
|
|
SequenceNumber seq = snapshot == nullptr ? db_->GetLatestSequenceNumber()
|
|
|
|
: snapshot->GetSequenceNumber();
|
|
|
|
TestReadCallback read_callback(snapshot_checker, seq);
|
|
|
|
ReadOptions read_opt;
|
|
|
|
read_opt.snapshot = snapshot;
|
|
|
|
PinnableSlice value;
|
New API to get all merge operands for a Key (#5604)
Summary:
This is a new API added to db.h to allow for fetching all merge operands associated with a Key. The main motivation for this API is to support use cases where doing a full online merge is not necessary as it is performance sensitive. Example use-cases:
1. Update subset of columns and read subset of columns -
Imagine a SQL Table, a row is encoded as a K/V pair (as it is done in MyRocks). If there are many columns and users only updated one of them, we can use merge operator to reduce write amplification. While users only read one or two columns in the read query, this feature can avoid a full merging of the whole row, and save some CPU.
2. Updating very few attributes in a value which is a JSON-like document -
Updating one attribute can be done efficiently using merge operator, while reading back one attribute can be done more efficiently if we don't need to do a full merge.
----------------------------------------------------------------------------------------------------
API :
Status GetMergeOperands(
const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* merge_operands,
GetMergeOperandsOptions* get_merge_operands_options,
int* number_of_operands)
Example usage :
int size = 100;
int number_of_operands = 0;
std::vector<PinnableSlice> values(size);
GetMergeOperandsOptions merge_operands_info;
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(), merge_operands_info, &number_of_operands);
Description :
Returns all the merge operands corresponding to the key. If the number of merge operands in DB is greater than merge_operands_options.expected_max_number_of_operands no merge operands are returned and status is Incomplete. Merge operands returned are in the order of insertion.
merge_operands-> Points to an array of at-least merge_operands_options.expected_max_number_of_operands and the caller is responsible for allocating it. If the status returned is Incomplete then number_of_operands will contain the total number of merge operands found in DB for key.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5604
Test Plan:
Added unit test and perf test in db_bench that can be run using the command:
./db_bench -benchmarks=getmergeoperands --merge_operator=sortlist
Differential Revision: D16657366
Pulled By: vjnadimpalli
fbshipit-source-id: 0faadd752351745224ee12d4ae9ef3cb529951bf
5 years ago
|
|
|
DBImpl::GetImplOptions get_impl_options;
|
|
|
|
get_impl_options.column_family = db_->DefaultColumnFamily();
|
|
|
|
get_impl_options.value = &value;
|
|
|
|
get_impl_options.callback = &read_callback;
|
|
|
|
Status s = dbfull()->GetImpl(read_opt, key, get_impl_options);
|
|
|
|
if (!s.ok()) {
|
|
|
|
return s.ToString();
|
|
|
|
}
|
|
|
|
return value.ToString();
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
|
|
|
|
class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
|
|
|
|
public:
|
|
|
|
LimitedStringAppendMergeOp(int limit, char delim)
|
|
|
|
: StringAppendTESTOperator(delim), limit_(limit) {}
|
|
|
|
|
|
|
|
const char* Name() const override {
|
|
|
|
return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
|
|
|
|
}
|
|
|
|
|
|
|
|
bool ShouldMerge(const std::vector<Slice>& operands) const override {
|
|
|
|
if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
size_t limit_ = 0;
|
|
|
|
};
|
|
|
|
|
|
|
|
Options options;
|
|
|
|
options.create_if_missing = true;
|
|
|
|
// Use only the latest two merge operands.
|
|
|
|
options.merge_operator =
|
|
|
|
std::make_shared<LimitedStringAppendMergeOp>(2, ',');
|
|
|
|
options.env = env_;
|
|
|
|
Reopen(options);
|
|
|
|
// All K1 values are in memtable.
|
|
|
|
ASSERT_OK(Merge("k1", "a"));
|
|
|
|
ASSERT_OK(Merge("k1", "b"));
|
|
|
|
ASSERT_OK(Merge("k1", "c"));
|
|
|
|
ASSERT_OK(Merge("k1", "d"));
|
|
|
|
std::string value;
|
|
|
|
ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok());
|
|
|
|
// Make sure that only the latest two merge operands are used. If this was
|
|
|
|
// not the case the value would be "a,b,c,d".
|
|
|
|
ASSERT_EQ(value, "c,d");
|
|
|
|
|
|
|
|
// All K2 values are flushed to L0 into a single file.
|
|
|
|
ASSERT_OK(Merge("k2", "a"));
|
|
|
|
ASSERT_OK(Merge("k2", "b"));
|
|
|
|
ASSERT_OK(Merge("k2", "c"));
|
|
|
|
ASSERT_OK(Merge("k2", "d"));
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok());
|
|
|
|
ASSERT_EQ(value, "c,d");
|
|
|
|
|
|
|
|
// All K3 values are flushed and are in different files.
|
|
|
|
ASSERT_OK(Merge("k3", "ab"));
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
ASSERT_OK(Merge("k3", "bc"));
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
ASSERT_OK(Merge("k3", "cd"));
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
ASSERT_OK(Merge("k3", "de"));
|
|
|
|
ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok());
|
|
|
|
ASSERT_EQ(value, "cd,de");
|
|
|
|
|
|
|
|
// All K4 values are in different levels
|
|
|
|
ASSERT_OK(Merge("k4", "ab"));
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
MoveFilesToLevel(4);
|
|
|
|
ASSERT_OK(Merge("k4", "bc"));
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
MoveFilesToLevel(3);
|
|
|
|
ASSERT_OK(Merge("k4", "cd"));
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
MoveFilesToLevel(1);
|
|
|
|
ASSERT_OK(Merge("k4", "de"));
|
|
|
|
ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok());
|
|
|
|
ASSERT_EQ(value, "cd,de");
|
|
|
|
}
|
|
|
|
|
|
|
|
TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
|
|
|
|
Options options;
|
|
|
|
options.create_if_missing = true;
|
|
|
|
options.merge_operator.reset(new TestPutOperator());
|
|
|
|
options.env = env_;
|
|
|
|
Reopen(options);
|
|
|
|
ASSERT_OK(Merge("k1", "v1"));
|
|
|
|
ASSERT_OK(Merge("k1", "corrupted"));
|
|
|
|
std::string value;
|
|
|
|
ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
|
|
|
|
VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
|
|
|
|
}
|
|
|
|
|
|
|
|
TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
|
|
|
|
Options options;
|
|
|
|
options.create_if_missing = true;
|
|
|
|
options.merge_operator.reset(new TestPutOperator());
|
|
|
|
options.max_successive_merges = 3;
|
|
|
|
options.env = env_;
|
|
|
|
Reopen(options);
|
|
|
|
ASSERT_OK(Merge("k1", "v1"));
|
|
|
|
ASSERT_OK(Merge("k1", "v2"));
|
|
|
|
// Will trigger a merge when hitting max_successive_merges and the merge
|
|
|
|
// will fail. The delta will be inserted nevertheless.
|
|
|
|
ASSERT_OK(Merge("k1", "corrupted"));
|
|
|
|
// Data should stay unmerged after the error.
|
|
|
|
VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
|
|
|
|
}
|
|
|
|
|
|
|
|
TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
|
|
|
|
Options options;
|
|
|
|
options.create_if_missing = true;
|
|
|
|
options.merge_operator.reset(new TestPutOperator());
|
|
|
|
options.env = env_;
|
|
|
|
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
ASSERT_OK(Merge("k1", "v1"));
|
|
|
|
ASSERT_OK(Merge("k1", "corrupted"));
|
|
|
|
ASSERT_OK(Put("k2", "v2"));
|
|
|
|
auto* iter = db_->NewIterator(ReadOptions());
|
|
|
|
iter->Seek("k1");
|
|
|
|
ASSERT_FALSE(iter->Valid());
|
|
|
|
ASSERT_TRUE(iter->status().IsCorruption());
|
|
|
|
delete iter;
|
|
|
|
iter = db_->NewIterator(ReadOptions());
|
|
|
|
iter->Seek("k2");
|
|
|
|
ASSERT_TRUE(iter->Valid());
|
|
|
|
ASSERT_OK(iter->status());
|
|
|
|
iter->Prev();
|
|
|
|
ASSERT_FALSE(iter->Valid());
|
|
|
|
ASSERT_TRUE(iter->status().IsCorruption());
|
|
|
|
delete iter;
|
|
|
|
VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});
|
|
|
|
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
ASSERT_OK(Merge("k1", "v1"));
|
|
|
|
ASSERT_OK(Put("k2", "v2"));
|
|
|
|
ASSERT_OK(Merge("k2", "corrupted"));
|
|
|
|
iter = db_->NewIterator(ReadOptions());
|
|
|
|
iter->Seek("k1");
|
|
|
|
ASSERT_TRUE(iter->Valid());
|
|
|
|
ASSERT_OK(iter->status());
|
|
|
|
iter->Next();
|
|
|
|
ASSERT_FALSE(iter->Valid());
|
|
|
|
ASSERT_TRUE(iter->status().IsCorruption());
|
|
|
|
delete iter;
|
|
|
|
VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
class MergeOperatorPinningTest : public DBMergeOperatorTest,
|
|
|
|
public testing::WithParamInterface<bool> {
|
|
|
|
public:
|
|
|
|
MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }
|
|
|
|
|
|
|
|
bool disable_block_cache_;
|
|
|
|
};
|
|
|
|
|
|
|
|
INSTANTIATE_TEST_SUITE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
|
|
|
|
::testing::Bool());
|
|
|
|
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
|
|
|
|
Options options = CurrentOptions();
|
|
|
|
BlockBasedTableOptions table_options;
|
|
|
|
table_options.block_size = 1; // every block will contain one entry
|
|
|
|
table_options.no_block_cache = disable_block_cache_;
|
|
|
|
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
|
|
|
options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
|
|
|
|
options.level0_slowdown_writes_trigger = (1 << 30);
|
|
|
|
options.level0_stop_writes_trigger = (1 << 30);
|
|
|
|
options.disable_auto_compactions = true;
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
|
|
|
|
const int kKeysPerFile = 10;
|
|
|
|
const int kOperandsPerKeyPerFile = 7;
|
|
|
|
const int kOperandSize = 100;
|
|
|
|
// Filse to write in L0 before compacting to lower level
|
|
|
|
const int kFilesPerLevel = 3;
|
|
|
|
|
|
|
|
Random rnd(301);
|
|
|
|
std::map<std::string, std::string> true_data;
|
|
|
|
int batch_num = 1;
|
|
|
|
int lvl_to_fill = 4;
|
|
|
|
int key_id = 0;
|
|
|
|
while (true) {
|
|
|
|
for (int j = 0; j < kKeysPerFile; j++) {
|
|
|
|
std::string key = Key(key_id % 35);
|
|
|
|
key_id++;
|
|
|
|
for (int k = 0; k < kOperandsPerKeyPerFile; k++) {
|
|
|
|
std::string val = RandomString(&rnd, kOperandSize);
|
|
|
|
ASSERT_OK(db_->Merge(WriteOptions(), key, val));
|
|
|
|
if (true_data[key].size() == 0) {
|
|
|
|
true_data[key] = val;
|
|
|
|
} else {
|
|
|
|
true_data[key] += "," + val;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (lvl_to_fill == -1) {
|
|
|
|
// Keep last batch in memtable and stop
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
if (batch_num % kFilesPerLevel == 0) {
|
|
|
|
if (lvl_to_fill != 0) {
|
|
|
|
MoveFilesToLevel(lvl_to_fill);
|
|
|
|
}
|
|
|
|
lvl_to_fill--;
|
|
|
|
}
|
|
|
|
batch_num++;
|
|
|
|
}
|
|
|
|
|
|
|
|
// 3 L0 files
|
|
|
|
// 1 L1 file
|
|
|
|
// 3 L2 files
|
|
|
|
// 1 L3 file
|
|
|
|
// 3 L4 Files
|
|
|
|
ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");
|
|
|
|
|
|
|
|
VerifyDBFromMap(true_data);
|
|
|
|
}
|
|
|
|
|
|
|
|
class MergeOperatorHook : public MergeOperator {
|
|
|
|
public:
|
|
|
|
explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
|
|
|
|
: merge_op_(_merge_op) {}
|
|
|
|
|
|
|
|
bool FullMergeV2(const MergeOperationInput& merge_in,
|
|
|
|
MergeOperationOutput* merge_out) const override {
|
|
|
|
before_merge_();
|
|
|
|
bool res = merge_op_->FullMergeV2(merge_in, merge_out);
|
|
|
|
after_merge_();
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
const char* Name() const override { return merge_op_->Name(); }
|
|
|
|
|
|
|
|
std::shared_ptr<MergeOperator> merge_op_;
|
|
|
|
std::function<void()> before_merge_ = []() {};
|
|
|
|
std::function<void()> after_merge_ = []() {};
|
|
|
|
};
|
|
|
|
|
|
|
|
TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
|
|
|
|
Options options = CurrentOptions();
|
|
|
|
|
|
|
|
auto merge_hook =
|
|
|
|
std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
|
|
|
|
options.merge_operator = merge_hook;
|
|
|
|
options.disable_auto_compactions = true;
|
|
|
|
options.level0_slowdown_writes_trigger = (1 << 30);
|
|
|
|
options.level0_stop_writes_trigger = (1 << 30);
|
|
|
|
options.max_open_files = 20;
|
|
|
|
BlockBasedTableOptions bbto;
|
|
|
|
bbto.no_block_cache = disable_block_cache_;
|
|
|
|
if (bbto.no_block_cache == false) {
|
|
|
|
bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
|
|
|
|
} else {
|
|
|
|
bbto.block_cache = nullptr;
|
|
|
|
}
|
|
|
|
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
|
|
|
|
const int kNumOperands = 30;
|
|
|
|
const int kNumKeys = 1000;
|
|
|
|
const int kOperandSize = 100;
|
|
|
|
Random rnd(301);
|
|
|
|
|
|
|
|
// 1000 keys every key have 30 operands, every operand is in a different file
|
|
|
|
std::map<std::string, std::string> true_data;
|
|
|
|
for (int i = 0; i < kNumOperands; i++) {
|
|
|
|
for (int j = 0; j < kNumKeys; j++) {
|
|
|
|
std::string k = Key(j);
|
|
|
|
std::string v = RandomString(&rnd, kOperandSize);
|
|
|
|
ASSERT_OK(db_->Merge(WriteOptions(), k, v));
|
|
|
|
|
|
|
|
true_data[k] = std::max(true_data[k], v);
|
|
|
|
}
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
}
|
|
|
|
|
|
|
|
std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
|
|
|
|
ASSERT_EQ(file_numbers.size(), kNumOperands);
|
|
|
|
int merge_cnt = 0;
|
|
|
|
|
|
|
|
// Code executed before merge operation
|
|
|
|
merge_hook->before_merge_ = [&]() {
|
|
|
|
// Evict all tables from cache before every merge operation
|
|
|
|
for (uint64_t num : file_numbers) {
|
|
|
|
TableCache::Evict(dbfull()->TEST_table_cache(), num);
|
|
|
|
}
|
|
|
|
// Decrease cache capacity to force all unrefed blocks to be evicted
|
|
|
|
if (bbto.block_cache) {
|
|
|
|
bbto.block_cache->SetCapacity(1);
|
|
|
|
}
|
|
|
|
merge_cnt++;
|
|
|
|
};
|
|
|
|
|
|
|
|
// Code executed after merge operation
|
|
|
|
merge_hook->after_merge_ = [&]() {
|
|
|
|
// Increase capacity again after doing the merge
|
|
|
|
if (bbto.block_cache) {
|
|
|
|
bbto.block_cache->SetCapacity(64 * 1024 * 1024);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
size_t total_reads;
|
|
|
|
VerifyDBFromMap(true_data, &total_reads);
|
|
|
|
ASSERT_EQ(merge_cnt, total_reads);
|
|
|
|
|
|
|
|
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
|
|
|
|
|
|
|
|
VerifyDBFromMap(true_data, &total_reads);
|
|
|
|
}
|
|
|
|
|
|
|
|
TEST_P(MergeOperatorPinningTest, TailingIterator) {
|
|
|
|
Options options = CurrentOptions();
|
|
|
|
options.merge_operator = MergeOperators::CreateMaxOperator();
|
|
|
|
BlockBasedTableOptions bbto;
|
|
|
|
bbto.no_block_cache = disable_block_cache_;
|
|
|
|
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
|
|
|
|
const int kNumOperands = 100;
|
|
|
|
const int kNumWrites = 100000;
|
|
|
|
|
|
|
|
std::function<void()> writer_func = [&]() {
|
|
|
|
int k = 0;
|
|
|
|
for (int i = 0; i < kNumWrites; i++) {
|
|
|
|
db_->Merge(WriteOptions(), Key(k), Key(k));
|
|
|
|
|
|
|
|
if (i && i % kNumOperands == 0) {
|
|
|
|
k++;
|
|
|
|
}
|
|
|
|
if (i && i % 127 == 0) {
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
}
|
|
|
|
if (i && i % 317 == 0) {
|
|
|
|
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
std::function<void()> reader_func = [&]() {
|
|
|
|
ReadOptions ro;
|
|
|
|
ro.tailing = true;
|
|
|
|
Iterator* iter = db_->NewIterator(ro);
|
|
|
|
|
|
|
|
iter->SeekToFirst();
|
|
|
|
for (int i = 0; i < (kNumWrites / kNumOperands); i++) {
|
|
|
|
while (!iter->Valid()) {
|
|
|
|
// wait for the key to be written
|
|
|
|
env_->SleepForMicroseconds(100);
|
|
|
|
iter->Seek(Key(i));
|
|
|
|
}
|
|
|
|
ASSERT_EQ(iter->key(), Key(i));
|
|
|
|
ASSERT_EQ(iter->value(), Key(i));
|
|
|
|
|
|
|
|
iter->Next();
|
|
|
|
}
|
|
|
|
|
|
|
|
delete iter;
|
|
|
|
};
|
|
|
|
|
|
|
|
ROCKSDB_NAMESPACE::port::Thread writer_thread(writer_func);
|
|
|
|
ROCKSDB_NAMESPACE::port::Thread reader_thread(reader_func);
|
|
|
|
|
|
|
|
writer_thread.join();
|
|
|
|
reader_thread.join();
|
|
|
|
}
|
|
|
|
|
|
|
|
TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
|
|
|
|
Options options = CurrentOptions();
|
|
|
|
options.merge_operator = MergeOperators::CreateStringAppendOperator();
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
|
|
|
|
// Overview of the test:
|
|
|
|
// * There are two merge operands for the same key: one in an sst file,
|
|
|
|
// another in a memtable.
|
|
|
|
// * Seek a tailing iterator to this key.
|
|
|
|
// * As part of the seek, the iterator will:
|
|
|
|
// (a) first visit the operand in the memtable and tell ForwardIterator
|
|
|
|
// to pin this operand, then
|
|
|
|
// (b) move on to the operand in the sst file, then pass both operands
|
|
|
|
// to merge operator.
|
|
|
|
// * The memtable may get flushed and unreferenced by another thread between
|
|
|
|
// (a) and (b). The test simulates it by flushing the memtable inside a
|
|
|
|
// SyncPoint callback located between (a) and (b).
|
|
|
|
// * In this case it's ForwardIterator's responsibility to keep the memtable
|
|
|
|
// pinned until (b) is complete. There used to be a bug causing
|
|
|
|
// ForwardIterator to not pin it in some circumstances. This test
|
|
|
|
// reproduces it.
|
|
|
|
|
|
|
|
db_->Merge(WriteOptions(), "key", "sst");
|
|
|
|
db_->Flush(FlushOptions()); // Switch to SuperVersion A
|
|
|
|
db_->Merge(WriteOptions(), "key", "memtable");
|
|
|
|
|
|
|
|
// Pin SuperVersion A
|
|
|
|
std::unique_ptr<Iterator> someone_else(db_->NewIterator(ReadOptions()));
|
|
|
|
|
|
|
|
bool pushed_first_operand = false;
|
|
|
|
bool stepped_to_next_operand = false;
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
|
|
"DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
|
|
|
|
EXPECT_FALSE(pushed_first_operand);
|
|
|
|
pushed_first_operand = true;
|
|
|
|
db_->Flush(FlushOptions()); // Switch to SuperVersion B
|
|
|
|
});
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
|
|
"DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
|
|
|
|
EXPECT_FALSE(stepped_to_next_operand);
|
|
|
|
stepped_to_next_operand = true;
|
|
|
|
someone_else.reset(); // Unpin SuperVersion A
|
|
|
|
});
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
|
|
|
|
ReadOptions ro;
|
|
|
|
ro.tailing = true;
|
|
|
|
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
|
|
|
|
iter->Seek("key");
|
|
|
|
|
|
|
|
ASSERT_TRUE(iter->status().ok());
|
|
|
|
ASSERT_TRUE(iter->Valid());
|
|
|
|
EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString());
|
|
|
|
EXPECT_TRUE(pushed_first_operand);
|
|
|
|
EXPECT_TRUE(stepped_to_next_operand);
|
|
|
|
}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
|
|
|
|
TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
|
|
|
|
Options options = CurrentOptions();
|
|
|
|
options.merge_operator = MergeOperators::CreateStringAppendOperator();
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
|
|
|
|
class TestSnapshotChecker : public SnapshotChecker {
|
|
|
|
public:
|
|
|
|
SnapshotCheckerResult CheckInSnapshot(
|
|
|
|
SequenceNumber seq, SequenceNumber snapshot_seq) const override {
|
|
|
|
return IsInSnapshot(seq, snapshot_seq)
|
|
|
|
? SnapshotCheckerResult::kInSnapshot
|
|
|
|
: SnapshotCheckerResult::kNotInSnapshot;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool IsInSnapshot(SequenceNumber seq, SequenceNumber snapshot_seq) const {
|
|
|
|
switch (snapshot_seq) {
|
|
|
|
case 0:
|
|
|
|
return seq == 0;
|
|
|
|
case 1:
|
|
|
|
return seq <= 1;
|
|
|
|
case 2:
|
|
|
|
// seq = 2 not visible to snapshot with seq = 2
|
|
|
|
return seq <= 1;
|
|
|
|
case 3:
|
|
|
|
return seq <= 3;
|
|
|
|
case 4:
|
|
|
|
// seq = 4 not visible to snpahost with seq = 4
|
|
|
|
return seq <= 3;
|
|
|
|
default:
|
|
|
|
// seq >=4 is uncommitted
|
|
|
|
return seq <= 4;
|
|
|
|
};
|
|
|
|
}
|
|
|
|
};
|
|
|
|
TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker();
|
|
|
|
dbfull()->SetSnapshotChecker(snapshot_checker);
|
|
|
|
|
|
|
|
std::string value;
|
|
|
|
ASSERT_OK(Merge("foo", "v1"));
|
|
|
|
ASSERT_EQ(1, db_->GetLatestSequenceNumber());
|
|
|
|
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
|
|
|
|
ASSERT_OK(Merge("foo", "v2"));
|
|
|
|
ASSERT_EQ(2, db_->GetLatestSequenceNumber());
|
|
|
|
// v2 is not visible to latest snapshot, which has seq = 2.
|
|
|
|
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
|
|
|
|
// Take a snapshot with seq = 2.
|
|
|
|
const Snapshot* snapshot1 = db_->GetSnapshot();
|
|
|
|
ASSERT_EQ(2, snapshot1->GetSequenceNumber());
|
|
|
|
// v2 is not visible to snapshot1, which has seq = 2
|
|
|
|
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
|
|
|
|
|
|
|
|
// Verify flush doesn't alter the result.
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
|
|
|
|
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
|
|
|
|
|
|
|
|
ASSERT_OK(Merge("foo", "v3"));
|
|
|
|
ASSERT_EQ(3, db_->GetLatestSequenceNumber());
|
|
|
|
ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
|
|
|
|
ASSERT_OK(Merge("foo", "v4"));
|
|
|
|
ASSERT_EQ(4, db_->GetLatestSequenceNumber());
|
|
|
|
// v4 is not visible to latest snapshot, which has seq = 4.
|
|
|
|
ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
|
|
|
|
const Snapshot* snapshot2 = db_->GetSnapshot();
|
|
|
|
ASSERT_EQ(4, snapshot2->GetSequenceNumber());
|
|
|
|
// v4 is not visible to snapshot2, which has seq = 4.
|
|
|
|
ASSERT_EQ("v1,v2,v3",
|
|
|
|
GetWithReadCallback(snapshot_checker, "foo", snapshot2));
|
|
|
|
|
|
|
|
// Verify flush doesn't alter the result.
|
|
|
|
ASSERT_OK(Flush());
|
|
|
|
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
|
|
|
|
ASSERT_EQ("v1,v2,v3",
|
|
|
|
GetWithReadCallback(snapshot_checker, "foo", snapshot2));
|
|
|
|
ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
|
|
|
|
|
|
|
|
ASSERT_OK(Merge("foo", "v5"));
|
|
|
|
ASSERT_EQ(5, db_->GetLatestSequenceNumber());
|
|
|
|
// v5 is uncommitted
|
|
|
|
ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
|
|
|
|
|
|
|
|
// full manual compaction.
|
|
|
|
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
|
|
|
|
|
|
|
// Verify compaction doesn't alter the result.
|
|
|
|
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
|
|
|
|
ASSERT_EQ("v1,v2,v3",
|
|
|
|
GetWithReadCallback(snapshot_checker, "foo", snapshot2));
|
|
|
|
ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
|
|
|
|
|
|
|
|
db_->ReleaseSnapshot(snapshot1);
|
|
|
|
db_->ReleaseSnapshot(snapshot2);
|
|
|
|
}
|
|
|
|
|
|
|
|
class PerConfigMergeOperatorPinningTest
|
|
|
|
: public DBMergeOperatorTest,
|
|
|
|
public testing::WithParamInterface<std::tuple<bool, int>> {
|
|
|
|
public:
|
|
|
|
PerConfigMergeOperatorPinningTest() {
|
|
|
|
std::tie(disable_block_cache_, option_config_) = GetParam();
|
|
|
|
}
|
|
|
|
|
|
|
|
bool disable_block_cache_;
|
|
|
|
};
|
|
|
|
|
|
|
|
INSTANTIATE_TEST_SUITE_P(
|
|
|
|
MergeOperatorPinningTest, PerConfigMergeOperatorPinningTest,
|
|
|
|
::testing::Combine(::testing::Bool(),
|
|
|
|
::testing::Range(static_cast<int>(DBTestBase::kDefault),
|
|
|
|
static_cast<int>(DBTestBase::kEnd))));
|
|
|
|
|
|
|
|
TEST_P(PerConfigMergeOperatorPinningTest, Randomized) {
|
|
|
|
if (ShouldSkipOptions(option_config_, kSkipMergePut)) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
Options options = CurrentOptions();
|
|
|
|
options.merge_operator = MergeOperators::CreateMaxOperator();
|
|
|
|
BlockBasedTableOptions table_options;
|
|
|
|
table_options.no_block_cache = disable_block_cache_;
|
|
|
|
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
|
|
|
|
Random rnd(301);
|
|
|
|
std::map<std::string, std::string> true_data;
|
|
|
|
|
|
|
|
const int kTotalMerges = 5000;
|
|
|
|
// Every key gets ~10 operands
|
|
|
|
const int kKeyRange = kTotalMerges / 10;
|
|
|
|
const int kOperandSize = 20;
|
|
|
|
const int kNumPutBefore = kKeyRange / 10; // 10% value
|
|
|
|
const int kNumPutAfter = kKeyRange / 10; // 10% overwrite
|
|
|
|
const int kNumDelete = kKeyRange / 10; // 10% delete
|
|
|
|
|
|
|
|
// kNumPutBefore keys will have base values
|
|
|
|
for (int i = 0; i < kNumPutBefore; i++) {
|
|
|
|
std::string key = Key(rnd.Next() % kKeyRange);
|
|
|
|
std::string value = RandomString(&rnd, kOperandSize);
|
|
|
|
ASSERT_OK(db_->Put(WriteOptions(), key, value));
|
|
|
|
|
|
|
|
true_data[key] = value;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Do kTotalMerges merges
|
|
|
|
for (int i = 0; i < kTotalMerges; i++) {
|
|
|
|
std::string key = Key(rnd.Next() % kKeyRange);
|
|
|
|
std::string value = RandomString(&rnd, kOperandSize);
|
|
|
|
ASSERT_OK(db_->Merge(WriteOptions(), key, value));
|
|
|
|
|
|
|
|
if (true_data[key] < value) {
|
|
|
|
true_data[key] = value;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Overwrite random kNumPutAfter keys
|
|
|
|
for (int i = 0; i < kNumPutAfter; i++) {
|
|
|
|
std::string key = Key(rnd.Next() % kKeyRange);
|
|
|
|
std::string value = RandomString(&rnd, kOperandSize);
|
|
|
|
ASSERT_OK(db_->Put(WriteOptions(), key, value));
|
|
|
|
|
|
|
|
true_data[key] = value;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Delete random kNumDelete keys
|
|
|
|
for (int i = 0; i < kNumDelete; i++) {
|
|
|
|
std::string key = Key(rnd.Next() % kKeyRange);
|
|
|
|
ASSERT_OK(db_->Delete(WriteOptions(), key));
|
|
|
|
|
|
|
|
true_data.erase(key);
|
|
|
|
}
|
|
|
|
|
|
|
|
VerifyDBFromMap(true_data);
|
|
|
|
}
|
|
|
|
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
|
|
|
|
int main(int argc, char** argv) {
|
|
|
|
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
|
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
|
|
return RUN_ALL_TESTS();
|
|
|
|
}
|