From fe228da0a9a3a2ea4b55c933ad2421352cc46c91 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Fri, 9 Feb 2018 14:43:56 -0800 Subject: [PATCH] WritePrepared Txn: Support merge operator Summary: CompactionIterator invokes MergeHelper::MergeUntil() to do partial merge between snapshot boundaries. Previously it only depended on sequence number to tell snapshot boundary, but we also need to make use of snapshot_checker to verify visibility of the merge operands to the snapshots. For example, say there is a snapshot with seq = 2 that can only see data with seq <= 1. There are three merges, each with seq = 1, 2, 3. A correct compaction output would be (1),(2+3). Without taking snapshot_checker into account when generating merge result, compaction will generate output (1+2),(3). By filtering uncommitted keys with read callback, the read path already takes care of merges well and doesn't need additional updates. Closes https://github.com/facebook/rocksdb/pull/3475 Differential Revision: D6926087 Pulled By: yiwu-arbug fbshipit-source-id: 8f539d6f897cfe29b6dc27a8992f68c2a629d40a --- db/builder.cc | 3 +- db/compaction_iterator.cc | 4 -- db/compaction_iterator_test.cc | 45 ++++++------- db/compaction_job.cc | 4 +- db/db_merge_operator_test.cc | 120 +++++++++++++++++++++++++++++++++ db/merge_helper.cc | 11 ++- db/merge_helper.h | 5 +- 7 files changed, 158 insertions(+), 34 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index b91cbe6c7..afb8e4403 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -131,7 +131,8 @@ Status BuildTable( MergeHelper merge(env, internal_comparator.user_comparator(), ioptions.merge_operator, nullptr, ioptions.info_log, true /* internal key corruption is not ok */, - snapshots.empty() ? 0 : snapshots.back()); + snapshots.empty() ? 
0 : snapshots.back(), + snapshot_checker); CompactionIterator c_iter( iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber, diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 8eaefb3f8..8a40cd405 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -559,10 +559,6 @@ void CompactionIterator::NextFromInput() { // have hit (A) // We encapsulate the merge related state machine in a different // object to minimize change to the existing flow. - // In case snapshot_checker is present, we can probably merge further - // beyond prev_snapshot, since there could be more keys with sequence - // smaller than prev_snapshot, but reported by snapshot_checker as not - // visible by prev_snapshot. But it will make the logic more complicated. Status s = merge_helper_->MergeUntil(input_, range_del_agg_, prev_snapshot, bottommost_level_); merge_out_iter_.SeekToFirst(); diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index 86ff51fdc..223798064 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -229,14 +229,15 @@ class CompactionIteratorTest : public testing::TestWithParam { compaction_proxy_->is_bottommost_level = bottommost_level; compaction.reset(compaction_proxy_); } - merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, merge_op, filter, - nullptr, false, 0, 0, nullptr, - &shutting_down_)); bool use_snapshot_checker = UseSnapshotChecker() || GetParam(); if (use_snapshot_checker || last_committed_sequence < kMaxSequenceNumber) { snapshot_checker_.reset( new TestSnapshotChecker(last_committed_sequence, snapshot_map_)); } + merge_helper_.reset( + new MergeHelper(Env::Default(), cmp_, merge_op, filter, nullptr, false, + 0 /*latest_snapshot*/, snapshot_checker_.get(), + 0 /*level*/, nullptr /*statistics*/, &shutting_down_)); iter_.reset(new LoggingForwardVectorIterator(ks, vs)); iter_->SeekToFirst(); @@ -775,17 +776,18 @@ 
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Deletion) { {"v4", "", "v1"}, 3 /*last_committed_seq*/); } -TEST_F(CompactionIteratorWithSnapshotCheckerTest, - DISABLED_DedupSameSnapshot_Merge) { +TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Merge) { AddSnapshot(2, 1); + AddSnapshot(4, 3); auto merge_op = MergeOperators::CreateStringAppendOperator(); RunTest( - {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeMerge), - test::KeyStr("foo", 2, kTypeMerge), test::KeyStr("foo", 1, kTypeValue)}, - {"v4", "v3", "v2", "v1"}, - {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeMerge), + {test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge), + test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge), test::KeyStr("foo", 1, kTypeValue)}, - {"v4", "v2,v3", "v1"}, 3 /*last_committed_seq*/, merge_op.get()); + {"v5", "v4", "v3", "v2", "v1"}, + {test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge), + test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 1, kTypeValue)}, + {"v5", "v4", "v2,v3", "v1"}, 4 /*last_committed_seq*/, merge_op.get()); } TEST_F(CompactionIteratorWithSnapshotCheckerTest, @@ -886,8 +888,9 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest, 2 /*earliest_write_conflict_snapshot*/); } -// Compaction filter should keep uncommitted key as-is, and trigger on the -// first committed version of a key. +// Compaction filter should keep uncommitted key as-is, and +// * Convert the latest value to deletion, and/or +// * if latest value is a merge, apply filter to all subsequent merges. 
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) { std::unique_ptr compaction_filter( @@ -915,22 +918,18 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Deletion) { } TEST_F(CompactionIteratorWithSnapshotCheckerTest, - DISABLED_CompactionFilter_PartialMerge) { + CompactionFilter_PartialMerge) { std::shared_ptr merge_op = MergeOperators::CreateStringAppendOperator(); std::unique_ptr compaction_filter( new FilterAllKeysCompactionFilter()); - RunTest( - {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge), - test::KeyStr("a", 1, kTypeMerge)}, - {"v3", "v2", "v1"}, - {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeDeletion)}, - {"v3", ""}, 2 /*last_committed_seq*/, merge_op.get(), - compaction_filter.get()); + RunTest({test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge), + test::KeyStr("a", 1, kTypeMerge)}, + {"v3", "v2", "v1"}, {test::KeyStr("a", 3, kTypeMerge)}, {"v3"}, + 2 /*last_committed_seq*/, merge_op.get(), compaction_filter.get()); } -TEST_F(CompactionIteratorWithSnapshotCheckerTest, - DISABLED_CompactionFilter_FullMerge) { +TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_FullMerge) { std::shared_ptr merge_op = MergeOperators::CreateStringAppendOperator(); std::unique_ptr compaction_filter( @@ -939,7 +938,7 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest, {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge), test::KeyStr("a", 1, kTypeValue)}, {"v3", "v2", "v1"}, - {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeDeletion)}, + {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 1, kTypeDeletion)}, {"v3", ""}, 2 /*last_committed_seq*/, merge_op.get(), compaction_filter.get()); } diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 4caf43c74..440d64879 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -747,8 +747,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { 
compaction_filter, db_options_.info_log.get(), false /* internal key corruption is expected */, existing_snapshots_.empty() ? 0 : existing_snapshots_.back(), - compact_->compaction->level(), db_options_.statistics.get(), - shutting_down_); + snapshot_checker_, compact_->compaction->level(), + db_options_.statistics.get(), shutting_down_); TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index a1b3ec43c..e109422fa 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -14,10 +14,43 @@ namespace rocksdb { +class TestReadCallback : public ReadCallback { + public: + TestReadCallback(SnapshotChecker* snapshot_checker, + SequenceNumber snapshot_seq) + : snapshot_checker_(snapshot_checker), snapshot_seq_(snapshot_seq) {} + + bool IsCommitted(SequenceNumber seq) override { + return snapshot_checker_->IsInSnapshot(seq, snapshot_seq_); + } + + private: + SnapshotChecker* snapshot_checker_; + SequenceNumber snapshot_seq_; +}; + // Test merge operator functionality. class DBMergeOperatorTest : public DBTestBase { public: DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {} + + std::string GetWithReadCallback(SnapshotChecker* snapshot_checker, + const Slice& key, + const Snapshot* snapshot = nullptr) { + SequenceNumber seq = snapshot == nullptr ? 
db_->GetLatestSequenceNumber() + : snapshot->GetSequenceNumber(); + TestReadCallback read_callback(snapshot_checker, seq); + ReadOptions read_opt; + read_opt.snapshot = snapshot; + PinnableSlice value; + Status s = + dbfull()->GetImpl(read_opt, db_->DefaultColumnFamily(), key, &value, + nullptr /*value_found*/, &read_callback); + if (!s.ok()) { + return s.ToString(); + } + return value.ToString(); + } }; TEST_F(DBMergeOperatorTest, LimitMergeOperands) { @@ -508,6 +541,93 @@ TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) { } #endif // ROCKSDB_LITE +TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) { + Options options = CurrentOptions(); + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + DestroyAndReopen(options); + + class TestSnapshotChecker : public SnapshotChecker { + bool IsInSnapshot(SequenceNumber seq, + SequenceNumber snapshot_seq) const override { + switch (snapshot_seq) { + case 0: + return seq == 0; + case 1: + return seq <= 1; + case 2: + // seq = 2 not visible to snapshot with seq = 2 + return seq <= 1; + case 3: + return seq <= 3; + case 4: + // seq = 4 not visible to snapshot with seq = 4 + return seq <= 3; + default: + // seq > 4 is uncommitted + return seq <= 4; + }; + } + }; + TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker(); + dbfull()->SetSnapshotChecker(snapshot_checker); + + std::string value; + ASSERT_OK(Merge("foo", "v1")); + ASSERT_EQ(1, db_->GetLatestSequenceNumber()); + ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo")); + ASSERT_OK(Merge("foo", "v2")); + ASSERT_EQ(2, db_->GetLatestSequenceNumber()); + // v2 is not visible to latest snapshot, which has seq = 2. + ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo")); + // Take a snapshot with seq = 2. 
+ const Snapshot* snapshot1 = db_->GetSnapshot(); + ASSERT_EQ(2, snapshot1->GetSequenceNumber()); + // v2 is not visible to snapshot1, which has seq = 2 + ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1)); + + // Verify flush doesn't alter the result. + ASSERT_OK(Flush()); + ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1)); + ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo")); + + ASSERT_OK(Merge("foo", "v3")); + ASSERT_EQ(3, db_->GetLatestSequenceNumber()); + ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo")); + ASSERT_OK(Merge("foo", "v4")); + ASSERT_EQ(4, db_->GetLatestSequenceNumber()); + // v4 is not visible to latest snapshot, which has seq = 4. + ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo")); + const Snapshot* snapshot2 = db_->GetSnapshot(); + ASSERT_EQ(4, snapshot2->GetSequenceNumber()); + // v4 is not visible to snapshot2, which has seq = 4. + ASSERT_EQ("v1,v2,v3", + GetWithReadCallback(snapshot_checker, "foo", snapshot2)); + + // Verify flush doesn't alter the result. + ASSERT_OK(Flush()); + ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1)); + ASSERT_EQ("v1,v2,v3", + GetWithReadCallback(snapshot_checker, "foo", snapshot2)); + ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo")); + + ASSERT_OK(Merge("foo", "v5")); + ASSERT_EQ(5, db_->GetLatestSequenceNumber()); + // v5 is uncommitted + ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo")); + + // full manual compaction. + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + // Verify compaction doesn't alter the result. 
+ ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1)); + ASSERT_EQ("v1,v2,v3", + GetWithReadCallback(snapshot_checker, "foo", snapshot2)); + ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo")); + + db_->ReleaseSnapshot(snapshot1); + db_->ReleaseSnapshot(snapshot2); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 55f8254cf..a1489dedc 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -5,12 +5,12 @@ #include "db/merge_helper.h" -#include #include #include "db/dbformat.h" #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" +#include "port/likely.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/merge_operator.h" @@ -22,7 +22,8 @@ MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator, const MergeOperator* user_merge_operator, const CompactionFilter* compaction_filter, Logger* logger, bool assert_valid_internal_key, - SequenceNumber latest_snapshot, int level, + SequenceNumber latest_snapshot, + const SnapshotChecker* snapshot_checker, int level, Statistics* stats, const std::atomic* shutting_down) : env_(env), @@ -34,6 +35,7 @@ MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator, assert_valid_internal_key_(assert_valid_internal_key), allow_single_operand_(false), latest_snapshot_(latest_snapshot), + snapshot_checker_(snapshot_checker), level_(level), keys_(), filter_timer_(env_), @@ -158,7 +160,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // hit a different user key, stop right here hit_the_next_user_key = true; break; - } else if (stop_before && ikey.sequence <= stop_before) { + } else if (stop_before > 0 && ikey.sequence <= stop_before && + LIKELY(snapshot_checker_ == nullptr || + snapshot_checker_->IsInSnapshot(ikey.sequence, + stop_before))) { // hit an entry that's visible by the previous snapshot, can't touch that break; } diff --git 
a/db/merge_helper.h b/db/merge_helper.h index b9ef12a4c..db941808e 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -13,6 +13,7 @@ #include "db/dbformat.h" #include "db/merge_context.h" #include "db/range_del_aggregator.h" +#include "db/snapshot_checker.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" #include "rocksdb/slice.h" @@ -33,7 +34,8 @@ class MergeHelper { const MergeOperator* user_merge_operator, const CompactionFilter* compaction_filter, Logger* logger, bool assert_valid_internal_key, SequenceNumber latest_snapshot, - int level = 0, Statistics* stats = nullptr, + const SnapshotChecker* snapshot_checker = nullptr, int level = 0, + Statistics* stats = nullptr, const std::atomic* shutting_down = nullptr); // Wrapper around MergeOperator::FullMergeV2() that records perf statistics. @@ -145,6 +147,7 @@ class MergeHelper { bool assert_valid_internal_key_; // enforce no internal key corruption? bool allow_single_operand_; SequenceNumber latest_snapshot_; + const SnapshotChecker* const snapshot_checker_; int level_; // the scratch area that holds the result of MergeUntil