WritePrepared Txn: Support merge operator

Summary:
CompactionIterator invoke MergeHelper::MergeUntil() to do partial merge between snapshot boundaries. Previously it only depend on sequence number to tell snapshot boundary, but we also need to make use of snapshot_checker to verify visibility of the merge operands to the snapshots. For example, say there is a snapshot with seq = 2 but only can see data with seq <= 1. There are three merges, each with seq = 1, 2, 3. A correct compaction output would be (1),(2+3). Without taking snapshot_checker into account when generating merge result, compaction will generate output (1+2),(3).

By filtering uncommitted keys with read callback, the read path already take care of merges well and don't need additional updates.
Closes https://github.com/facebook/rocksdb/pull/3475

Differential Revision: D6926087

Pulled By: yiwu-arbug

fbshipit-source-id: 8f539d6f897cfe29b6dc27a8992f68c2a629d40a
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 9fc72d6f16
commit fe228da0a9
  1. 3
      db/builder.cc
  2. 4
      db/compaction_iterator.cc
  3. 45
      db/compaction_iterator_test.cc
  4. 4
      db/compaction_job.cc
  5. 120
      db/db_merge_operator_test.cc
  6. 11
      db/merge_helper.cc
  7. 5
      db/merge_helper.h

@ -131,7 +131,8 @@ Status BuildTable(
MergeHelper merge(env, internal_comparator.user_comparator(), MergeHelper merge(env, internal_comparator.user_comparator(),
ioptions.merge_operator, nullptr, ioptions.info_log, ioptions.merge_operator, nullptr, ioptions.info_log,
true /* internal key corruption is not ok */, true /* internal key corruption is not ok */,
snapshots.empty() ? 0 : snapshots.back()); snapshots.empty() ? 0 : snapshots.back(),
snapshot_checker);
CompactionIterator c_iter( CompactionIterator c_iter(
iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber, iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber,

@ -559,10 +559,6 @@ void CompactionIterator::NextFromInput() {
// have hit (A) // have hit (A)
// We encapsulate the merge related state machine in a different // We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow. // object to minimize change to the existing flow.
// In case snapshot_checker is present, we can probably merge further
// beyond prev_snapshot, since there could be more keys with sequence
// smaller than prev_snapshot, but reported by snapshot_checker as not
// visible by prev_snapshot. But it will make the logic more complicated.
Status s = merge_helper_->MergeUntil(input_, range_del_agg_, Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
prev_snapshot, bottommost_level_); prev_snapshot, bottommost_level_);
merge_out_iter_.SeekToFirst(); merge_out_iter_.SeekToFirst();

@ -229,14 +229,15 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
compaction_proxy_->is_bottommost_level = bottommost_level; compaction_proxy_->is_bottommost_level = bottommost_level;
compaction.reset(compaction_proxy_); compaction.reset(compaction_proxy_);
} }
merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, merge_op, filter,
nullptr, false, 0, 0, nullptr,
&shutting_down_));
bool use_snapshot_checker = UseSnapshotChecker() || GetParam(); bool use_snapshot_checker = UseSnapshotChecker() || GetParam();
if (use_snapshot_checker || last_committed_sequence < kMaxSequenceNumber) { if (use_snapshot_checker || last_committed_sequence < kMaxSequenceNumber) {
snapshot_checker_.reset( snapshot_checker_.reset(
new TestSnapshotChecker(last_committed_sequence, snapshot_map_)); new TestSnapshotChecker(last_committed_sequence, snapshot_map_));
} }
merge_helper_.reset(
new MergeHelper(Env::Default(), cmp_, merge_op, filter, nullptr, false,
0 /*latest_snapshot*/, snapshot_checker_.get(),
0 /*level*/, nullptr /*statistics*/, &shutting_down_));
iter_.reset(new LoggingForwardVectorIterator(ks, vs)); iter_.reset(new LoggingForwardVectorIterator(ks, vs));
iter_->SeekToFirst(); iter_->SeekToFirst();
@ -775,17 +776,18 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Deletion) {
{"v4", "", "v1"}, 3 /*last_committed_seq*/); {"v4", "", "v1"}, 3 /*last_committed_seq*/);
} }
TEST_F(CompactionIteratorWithSnapshotCheckerTest, TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Merge) {
DISABLED_DedupSameSnapshot_Merge) {
AddSnapshot(2, 1); AddSnapshot(2, 1);
AddSnapshot(4, 3);
auto merge_op = MergeOperators::CreateStringAppendOperator(); auto merge_op = MergeOperators::CreateStringAppendOperator();
RunTest( RunTest(
{test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeMerge), {test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge),
test::KeyStr("foo", 2, kTypeMerge), test::KeyStr("foo", 1, kTypeValue)}, test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
{"v4", "v3", "v2", "v1"},
{test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeMerge),
test::KeyStr("foo", 1, kTypeValue)}, test::KeyStr("foo", 1, kTypeValue)},
{"v4", "v2,v3", "v1"}, 3 /*last_committed_seq*/, merge_op.get()); {"v5", "v4", "v3", "v2", "v1"},
{test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge),
test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 1, kTypeValue)},
{"v5", "v4", "v2,v3", "v1"}, 4 /*last_committed_seq*/, merge_op.get());
} }
TEST_F(CompactionIteratorWithSnapshotCheckerTest, TEST_F(CompactionIteratorWithSnapshotCheckerTest,
@ -886,8 +888,9 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest,
2 /*earliest_write_conflict_snapshot*/); 2 /*earliest_write_conflict_snapshot*/);
} }
// Compaction filter should keep uncommitted key as-is, and trigger on the // Compaction filter should keep uncommitted key as-is, and
// first committed version of a key. // * Convert the latest velue to deletion, and/or
// * if latest value is a merge, apply filter to all suequent merges.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) { TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) {
std::unique_ptr<CompactionFilter> compaction_filter( std::unique_ptr<CompactionFilter> compaction_filter(
@ -915,22 +918,18 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Deletion) {
} }
TEST_F(CompactionIteratorWithSnapshotCheckerTest, TEST_F(CompactionIteratorWithSnapshotCheckerTest,
DISABLED_CompactionFilter_PartialMerge) { CompactionFilter_PartialMerge) {
std::shared_ptr<MergeOperator> merge_op = std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendOperator(); MergeOperators::CreateStringAppendOperator();
std::unique_ptr<CompactionFilter> compaction_filter( std::unique_ptr<CompactionFilter> compaction_filter(
new FilterAllKeysCompactionFilter()); new FilterAllKeysCompactionFilter());
RunTest( RunTest({test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
{test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge), test::KeyStr("a", 1, kTypeMerge)},
test::KeyStr("a", 1, kTypeMerge)}, {"v3", "v2", "v1"}, {test::KeyStr("a", 3, kTypeMerge)}, {"v3"},
{"v3", "v2", "v1"}, 2 /*last_committed_seq*/, merge_op.get(), compaction_filter.get());
{test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeDeletion)},
{"v3", ""}, 2 /*last_committed_seq*/, merge_op.get(),
compaction_filter.get());
} }
TEST_F(CompactionIteratorWithSnapshotCheckerTest, TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_FullMerge) {
DISABLED_CompactionFilter_FullMerge) {
std::shared_ptr<MergeOperator> merge_op = std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendOperator(); MergeOperators::CreateStringAppendOperator();
std::unique_ptr<CompactionFilter> compaction_filter( std::unique_ptr<CompactionFilter> compaction_filter(
@ -939,7 +938,7 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest,
{test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge), {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
test::KeyStr("a", 1, kTypeValue)}, test::KeyStr("a", 1, kTypeValue)},
{"v3", "v2", "v1"}, {"v3", "v2", "v1"},
{test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeDeletion)}, {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 1, kTypeDeletion)},
{"v3", ""}, 2 /*last_committed_seq*/, merge_op.get(), {"v3", ""}, 2 /*last_committed_seq*/, merge_op.get(),
compaction_filter.get()); compaction_filter.get());
} }

@ -747,8 +747,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
compaction_filter, db_options_.info_log.get(), compaction_filter, db_options_.info_log.get(),
false /* internal key corruption is expected */, false /* internal key corruption is expected */,
existing_snapshots_.empty() ? 0 : existing_snapshots_.back(), existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
compact_->compaction->level(), db_options_.statistics.get(), snapshot_checker_, compact_->compaction->level(),
shutting_down_); db_options_.statistics.get(), shutting_down_);
TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); TEST_SYNC_POINT("CompactionJob::Run():Inprogress");

@ -14,10 +14,43 @@
namespace rocksdb { namespace rocksdb {
class TestReadCallback : public ReadCallback {
public:
TestReadCallback(SnapshotChecker* snapshot_checker,
SequenceNumber snapshot_seq)
: snapshot_checker_(snapshot_checker), snapshot_seq_(snapshot_seq) {}
bool IsCommitted(SequenceNumber seq) override {
return snapshot_checker_->IsInSnapshot(seq, snapshot_seq_);
}
private:
SnapshotChecker* snapshot_checker_;
SequenceNumber snapshot_seq_;
};
// Test merge operator functionality. // Test merge operator functionality.
class DBMergeOperatorTest : public DBTestBase { class DBMergeOperatorTest : public DBTestBase {
public: public:
DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {} DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
std::string GetWithReadCallback(SnapshotChecker* snapshot_checker,
const Slice& key,
const Snapshot* snapshot = nullptr) {
SequenceNumber seq = snapshot == nullptr ? db_->GetLatestSequenceNumber()
: snapshot->GetSequenceNumber();
TestReadCallback read_callback(snapshot_checker, seq);
ReadOptions read_opt;
read_opt.snapshot = snapshot;
PinnableSlice value;
Status s =
dbfull()->GetImpl(read_opt, db_->DefaultColumnFamily(), key, &value,
nullptr /*value_found*/, &read_callback);
if (!s.ok()) {
return s.ToString();
}
return value.ToString();
}
}; };
TEST_F(DBMergeOperatorTest, LimitMergeOperands) { TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
@ -508,6 +541,93 @@ TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
Options options = CurrentOptions();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
DestroyAndReopen(options);
class TestSnapshotChecker : public SnapshotChecker {
bool IsInSnapshot(SequenceNumber seq,
SequenceNumber snapshot_seq) const override {
switch (snapshot_seq) {
case 0:
return seq == 0;
case 1:
return seq <= 1;
case 2:
// seq = 2 not visible to snapshot with seq = 2
return seq <= 1;
case 3:
return seq <= 3;
case 4:
// seq = 4 not visible to snpahost with seq = 4
return seq <= 3;
default:
// seq >=4 is uncommitted
return seq <= 4;
};
}
};
TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker();
dbfull()->SetSnapshotChecker(snapshot_checker);
std::string value;
ASSERT_OK(Merge("foo", "v1"));
ASSERT_EQ(1, db_->GetLatestSequenceNumber());
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
ASSERT_OK(Merge("foo", "v2"));
ASSERT_EQ(2, db_->GetLatestSequenceNumber());
// v2 is not visible to latest snapshot, which has seq = 2.
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
// Take a snapshot with seq = 2.
const Snapshot* snapshot1 = db_->GetSnapshot();
ASSERT_EQ(2, snapshot1->GetSequenceNumber());
// v2 is not visible to snapshot1, which has seq = 2
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
// Verify flush doesn't alter the result.
ASSERT_OK(Flush());
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
ASSERT_OK(Merge("foo", "v3"));
ASSERT_EQ(3, db_->GetLatestSequenceNumber());
ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
ASSERT_OK(Merge("foo", "v4"));
ASSERT_EQ(4, db_->GetLatestSequenceNumber());
// v4 is not visible to latest snapshot, which has seq = 4.
ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
const Snapshot* snapshot2 = db_->GetSnapshot();
ASSERT_EQ(4, snapshot2->GetSequenceNumber());
// v4 is not visible to snapshot2, which has seq = 4.
ASSERT_EQ("v1,v2,v3",
GetWithReadCallback(snapshot_checker, "foo", snapshot2));
// Verify flush doesn't alter the result.
ASSERT_OK(Flush());
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
ASSERT_EQ("v1,v2,v3",
GetWithReadCallback(snapshot_checker, "foo", snapshot2));
ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
ASSERT_OK(Merge("foo", "v5"));
ASSERT_EQ(5, db_->GetLatestSequenceNumber());
// v5 is uncommitted
ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
// full manual compaction.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Verify compaction doesn't alter the result.
ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
ASSERT_EQ("v1,v2,v3",
GetWithReadCallback(snapshot_checker, "foo", snapshot2));
ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
db_->ReleaseSnapshot(snapshot1);
db_->ReleaseSnapshot(snapshot2);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -5,12 +5,12 @@
#include "db/merge_helper.h" #include "db/merge_helper.h"
#include <stdio.h>
#include <string> #include <string>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
#include "port/likely.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/merge_operator.h" #include "rocksdb/merge_operator.h"
@ -22,7 +22,8 @@ MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
const MergeOperator* user_merge_operator, const MergeOperator* user_merge_operator,
const CompactionFilter* compaction_filter, const CompactionFilter* compaction_filter,
Logger* logger, bool assert_valid_internal_key, Logger* logger, bool assert_valid_internal_key,
SequenceNumber latest_snapshot, int level, SequenceNumber latest_snapshot,
const SnapshotChecker* snapshot_checker, int level,
Statistics* stats, Statistics* stats,
const std::atomic<bool>* shutting_down) const std::atomic<bool>* shutting_down)
: env_(env), : env_(env),
@ -34,6 +35,7 @@ MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
assert_valid_internal_key_(assert_valid_internal_key), assert_valid_internal_key_(assert_valid_internal_key),
allow_single_operand_(false), allow_single_operand_(false),
latest_snapshot_(latest_snapshot), latest_snapshot_(latest_snapshot),
snapshot_checker_(snapshot_checker),
level_(level), level_(level),
keys_(), keys_(),
filter_timer_(env_), filter_timer_(env_),
@ -158,7 +160,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
// hit a different user key, stop right here // hit a different user key, stop right here
hit_the_next_user_key = true; hit_the_next_user_key = true;
break; break;
} else if (stop_before && ikey.sequence <= stop_before) { } else if (stop_before > 0 && ikey.sequence <= stop_before &&
LIKELY(snapshot_checker_ == nullptr ||
snapshot_checker_->IsInSnapshot(ikey.sequence,
stop_before))) {
// hit an entry that's visible by the previous snapshot, can't touch that // hit an entry that's visible by the previous snapshot, can't touch that
break; break;
} }

@ -13,6 +13,7 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/merge_context.h" #include "db/merge_context.h"
#include "db/range_del_aggregator.h" #include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "rocksdb/compaction_filter.h" #include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
@ -33,7 +34,8 @@ class MergeHelper {
const MergeOperator* user_merge_operator, const MergeOperator* user_merge_operator,
const CompactionFilter* compaction_filter, Logger* logger, const CompactionFilter* compaction_filter, Logger* logger,
bool assert_valid_internal_key, SequenceNumber latest_snapshot, bool assert_valid_internal_key, SequenceNumber latest_snapshot,
int level = 0, Statistics* stats = nullptr, const SnapshotChecker* snapshot_checker = nullptr, int level = 0,
Statistics* stats = nullptr,
const std::atomic<bool>* shutting_down = nullptr); const std::atomic<bool>* shutting_down = nullptr);
// Wrapper around MergeOperator::FullMergeV2() that records perf statistics. // Wrapper around MergeOperator::FullMergeV2() that records perf statistics.
@ -145,6 +147,7 @@ class MergeHelper {
bool assert_valid_internal_key_; // enforce no internal key corruption? bool assert_valid_internal_key_; // enforce no internal key corruption?
bool allow_single_operand_; bool allow_single_operand_;
SequenceNumber latest_snapshot_; SequenceNumber latest_snapshot_;
const SnapshotChecker* const snapshot_checker_;
int level_; int level_;
// the scratch area that holds the result of MergeUntil // the scratch area that holds the result of MergeUntil

Loading…
Cancel
Save