diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc
index 613e40a8e..8eaefb3f8 100644
--- a/db/compaction_iterator.cc
+++ b/db/compaction_iterator.cc
@@ -537,6 +537,11 @@ void CompactionIterator::NextFromInput() {
         //
         // Note: Dropping this Delete will not affect TransactionDB
         // write-conflict checking since it is earlier than any snapshot.
+        //
+        // It seems that we can also drop deletions later than the earliest
+        // snapshot, given that:
+        // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
+        // (2) No value exists earlier than the deletion.
         ++iter_stats_.num_record_drop_obsolete;
         if (!bottommost_level_) {
           ++iter_stats_.num_optimized_del_drop_obsolete;
@@ -621,9 +626,12 @@ void CompactionIterator::PrepareOutput() {
   // and the earliest snapshot is larger than this seqno
   // and the userkey differs from the last userkey in compaction
   // then we can squash the seqno to zero.
-
+  //
   // This is safe for TransactionDB write-conflict checking since transactions
   // only care about sequence number larger than any active snapshots.
+  //
+  // Can we do the same for levels above the bottom level as long as
+  // KeyNotExistsBeyondOutputLevel() returns true?
   if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
       ikeyNotNeededForIncrementalSnapshot() &&
diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc
index 7c8919023..86ff51fdc 100644
--- a/db/compaction_iterator_test.cc
+++ b/db/compaction_iterator_test.cc
@@ -9,8 +9,10 @@
 #include <vector>
 
 #include "port/port.h"
+#include "util/string_util.h"
 #include "util/testharness.h"
 #include "util/testutil.h"
+#include "utilities/merge_operators.h"
 
 namespace rocksdb {
 
@@ -39,9 +41,9 @@ class NoMergingMergeOp : public MergeOperator {
 // Always returns Decition::kRemove.
 class StallingFilter : public CompactionFilter {
  public:
-  virtual Decision FilterV2(int level, const Slice& key, ValueType t,
-                            const Slice& existing_value, std::string* new_value,
-                            std::string* skip_until) const override {
+  Decision FilterV2(int /*level*/, const Slice& key, ValueType /*type*/,
+                    const Slice& /*existing_value*/, std::string* /*new_value*/,
+                    std::string* /*skip_until*/) const override {
     int k = std::atoi(key.ToString().c_str());
     last_seen.store(k);
     while (k >= stall_at.load()) {
@@ -72,6 +74,18 @@ class StallingFilter : public CompactionFilter {
   mutable std::atomic<int> last_seen{0};
 };
 
+// Compaction filter that filters out all keys.
+class FilterAllKeysCompactionFilter : public CompactionFilter {
+ public:
+  Decision FilterV2(int /*level*/, const Slice& /*key*/, ValueType /*type*/,
+                    const Slice& /*existing_value*/, std::string* /*new_value*/,
+                    std::string* /*skip_until*/) const override {
+    return Decision::kRemove;
+  }
+
+  const char* Name() const override { return "AllKeysCompactionFilter"; }
+};
+
 class LoggingForwardVectorIterator : public InternalIterator {
  public:
   struct Action {
@@ -144,76 +158,145 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
  public:
   FakeCompaction() = default;
 
-  virtual int level(size_t compaction_input_level) const { return 0; }
+  virtual int level(size_t compaction_input_level) const override { return 0; }
   virtual bool KeyNotExistsBeyondOutputLevel(
-      const Slice& user_key, std::vector<size_t>* level_ptrs) const {
-    return key_not_exists_beyond_output_level;
+      const Slice& user_key, std::vector<size_t>* level_ptrs) const override {
+    return is_bottommost_level || key_not_exists_beyond_output_level;
   }
-  virtual bool bottommost_level() const { return false; }
-  virtual int number_levels() const { return 1; }
-  virtual Slice GetLargestUserKey() const {
+  virtual bool bottommost_level() const override { return is_bottommost_level; }
+  virtual int number_levels() const override { return 1; }
+  virtual Slice GetLargestUserKey() const override {
     return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
   }
 
-  virtual bool allow_ingest_behind() const { return false; }
+  virtual bool allow_ingest_behind() const override { return false; }
 
-  virtual bool preserve_deletes() const {return false; }
+  virtual bool preserve_deletes() const override { return false; }
 
   bool key_not_exists_beyond_output_level = false;
+
+  bool is_bottommost_level = false;
 };
 
-class CompactionIteratorTest : public testing::Test {
+// A simplified snapshot checker which assumes each snapshot has a global
+// last visible sequence.
+class TestSnapshotChecker : public SnapshotChecker {
+ public:
+  explicit TestSnapshotChecker(
+      SequenceNumber last_committed_sequence,
+      const std::unordered_map<SequenceNumber, SequenceNumber>& snapshots = {})
+      : last_committed_sequence_(last_committed_sequence),
+        snapshots_(snapshots) {}
+
+  bool IsInSnapshot(SequenceNumber seq,
+                    SequenceNumber snapshot_seq) const override {
+    if (snapshot_seq == kMaxSequenceNumber) {
+      return seq <= last_committed_sequence_;
+    }
+    assert(snapshots_.count(snapshot_seq) > 0);
+    return seq <= snapshots_.at(snapshot_seq);
+  }
+
+ private:
+  SequenceNumber last_committed_sequence_;
+  // Map from a snapshot to the last sequence visible to that snapshot.
+  std::unordered_map<SequenceNumber, SequenceNumber> snapshots_;
+};
+
+// Test param:
+// bool: whether to pass snapshot_checker to compaction iterator.
+class CompactionIteratorTest : public testing::TestWithParam { public: CompactionIteratorTest() : cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {} - void InitIterators(const std::vector& ks, - const std::vector& vs, - const std::vector& range_del_ks, - const std::vector& range_del_vs, - SequenceNumber last_sequence, - MergeOperator* merge_op = nullptr, - CompactionFilter* filter = nullptr) { + void InitIterators( + const std::vector& ks, const std::vector& vs, + const std::vector& range_del_ks, + const std::vector& range_del_vs, + SequenceNumber last_sequence, + SequenceNumber last_committed_sequence = kMaxSequenceNumber, + MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr, + bool bottommost_level = false, + SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) { std::unique_ptr range_del_iter( new test::VectorIterator(range_del_ks, range_del_vs)); range_del_agg_.reset(new RangeDelAggregator(icmp_, snapshots_)); ASSERT_OK(range_del_agg_->AddTombstones(std::move(range_del_iter))); std::unique_ptr compaction; - if (filter) { + if (filter || bottommost_level) { compaction_proxy_ = new FakeCompaction(); + compaction_proxy_->is_bottommost_level = bottommost_level; compaction.reset(compaction_proxy_); } - // TODO(yiwu) add a mock snapshot checker and add test for it. - SnapshotChecker* snapshot_checker = nullptr; - merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, merge_op, filter, nullptr, false, 0, 0, nullptr, &shutting_down_)); + bool use_snapshot_checker = UseSnapshotChecker() || GetParam(); + if (use_snapshot_checker || last_committed_sequence < kMaxSequenceNumber) { + snapshot_checker_.reset( + new TestSnapshotChecker(last_committed_sequence, snapshot_map_)); + } + iter_.reset(new LoggingForwardVectorIterator(ks, vs)); iter_->SeekToFirst(); c_iter_.reset(new CompactionIterator( iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_, - kMaxSequenceNumber, snapshot_checker, Env::Default(), false, - range_del_agg_.get(), std::move(compaction), filter, nullptr, - &shutting_down_)); + earliest_write_conflict_snapshot, snapshot_checker_.get(), + Env::Default(), false, range_del_agg_.get(), std::move(compaction), + filter, nullptr, &shutting_down_)); } - void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); } + void AddSnapshot(SequenceNumber snapshot, + SequenceNumber last_visible_seq = kMaxSequenceNumber) { + snapshots_.push_back(snapshot); + snapshot_map_[snapshot] = last_visible_seq; + } + + virtual bool UseSnapshotChecker() const { return false; } + + void RunTest( + const std::vector& input_keys, + const std::vector& input_values, + const std::vector& expected_keys, + const std::vector& expected_values, + SequenceNumber last_committed_seq = kMaxSequenceNumber, + MergeOperator* merge_operator = nullptr, + CompactionFilter* compaction_filter = nullptr, + bool bottommost_level = false, + SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) { + InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber, + last_committed_seq, merge_operator, compaction_filter, + bottommost_level, earliest_write_conflict_snapshot); + c_iter_->SeekToFirst(); + for (size_t i = 0; i < expected_keys.size(); i++) { + std::string info = "i = " + ToString(i); + ASSERT_TRUE(c_iter_->Valid()) << info; + ASSERT_OK(c_iter_->status()) << info; + ASSERT_EQ(expected_keys[i], c_iter_->key().ToString()) << info; + ASSERT_EQ(expected_values[i], c_iter_->value().ToString()) << info; + c_iter_->Next(); + } + 
ASSERT_FALSE(c_iter_->Valid()); + } const Comparator* cmp_; const InternalKeyComparator icmp_; std::vector snapshots_; + // A map of valid snapshot to last visible sequence to the snapshot. + std::unordered_map snapshot_map_; std::unique_ptr merge_helper_; std::unique_ptr iter_; std::unique_ptr c_iter_; std::unique_ptr range_del_agg_; + std::unique_ptr snapshot_checker_; std::atomic shutting_down_{false}; FakeCompaction* compaction_proxy_; }; // It is possible that the output of the compaction iterator is empty even if // the input is not. -TEST_F(CompactionIteratorTest, EmptyResult) { +TEST_P(CompactionIteratorTest, EmptyResult) { InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion), test::KeyStr("a", 3, kTypeValue)}, {"", "val"}, {}, {}, 5); @@ -223,7 +306,7 @@ TEST_F(CompactionIteratorTest, EmptyResult) { // If there is a corruption after a single deletion, the corrupted key should // be preserved. -TEST_F(CompactionIteratorTest, CorruptionAfterSingleDeletion) { +TEST_P(CompactionIteratorTest, CorruptionAfterSingleDeletion) { InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion), test::KeyStr("a", 3, kTypeValue, true), test::KeyStr("b", 10, kTypeValue)}, @@ -242,7 +325,7 @@ TEST_F(CompactionIteratorTest, CorruptionAfterSingleDeletion) { ASSERT_FALSE(c_iter_->Valid()); } -TEST_F(CompactionIteratorTest, SimpleRangeDeletion) { +TEST_P(CompactionIteratorTest, SimpleRangeDeletion) { InitIterators({test::KeyStr("morning", 5, kTypeValue), test::KeyStr("morning", 2, kTypeValue), test::KeyStr("night", 3, kTypeValue)}, @@ -258,7 +341,7 @@ TEST_F(CompactionIteratorTest, SimpleRangeDeletion) { ASSERT_FALSE(c_iter_->Valid()); } -TEST_F(CompactionIteratorTest, RangeDeletionWithSnapshots) { +TEST_P(CompactionIteratorTest, RangeDeletionWithSnapshots) { AddSnapshot(10); std::vector ks1; ks1.push_back(test::KeyStr("ma", 28, kTypeRangeDeletion)); @@ -279,7 +362,7 @@ TEST_F(CompactionIteratorTest, RangeDeletionWithSnapshots) { ASSERT_FALSE(c_iter_->Valid()); } -TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) { +TEST_P(CompactionIteratorTest, CompactionFilterSkipUntil) { class Filter : public CompactionFilter { virtual Decision FilterV2(int level, const Slice& key, ValueType t, const Slice& existing_value, @@ -354,7 +437,7 @@ TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) { test::KeyStr("j", 99, kTypeValue)}, {"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30", "fv25", "gv90", "hv91", "im95", "jv99"}, - {}, {}, kMaxSequenceNumber, &merge_op, &filter); + {}, {}, kMaxSequenceNumber, kMaxSequenceNumber, &merge_op, &filter); // Compaction should output just "a", "e" and "h" keys. c_iter_->SeekToFirst(); @@ -389,13 +472,14 @@ TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) { ASSERT_EQ(expected_actions, iter_->log); } -TEST_F(CompactionIteratorTest, ShuttingDownInFilter) { +TEST_P(CompactionIteratorTest, ShuttingDownInFilter) { NoMergingMergeOp merge_op; StallingFilter filter; InitIterators( {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeValue), test::KeyStr("3", 3, kTypeValue), test::KeyStr("4", 4, kTypeValue)}, - {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, &merge_op, &filter); + {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber, + &merge_op, &filter); // Don't leave tombstones (kTypeDeletion) for filtered keys. 
   compaction_proxy_->key_not_exists_beyond_output_level = true;
 
@@ -426,13 +510,14 @@
 
 // Same as ShuttingDownInFilter, but shutdown happens during filter call for
 // a merge operand, not for a value.
-TEST_F(CompactionIteratorTest, ShuttingDownInMerge) {
+TEST_P(CompactionIteratorTest, ShuttingDownInMerge) {
   NoMergingMergeOp merge_op;
   StallingFilter filter;
   InitIterators(
       {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeMerge),
        test::KeyStr("3", 3, kTypeMerge), test::KeyStr("4", 4, kTypeValue)},
-      {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, &merge_op, &filter);
+      {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
+      &merge_op, &filter);
   compaction_proxy_->key_not_exists_beyond_output_level = true;
   std::atomic<bool> seek_done{false};
@@ -460,7 +545,7 @@
   EXPECT_EQ(2, filter.last_seen.load());
 }
 
-TEST_F(CompactionIteratorTest, SingleMergeOperand) {
+TEST_P(CompactionIteratorTest, SingleMergeOperand) {
   class Filter : public CompactionFilter {
     virtual Decision FilterV2(int level, const Slice& key, ValueType t,
                               const Slice& existing_value,
@@ -552,7 +637,7 @@
       // c should invoke FullMerge due to kTypeValue at the beginning.
       test::KeyStr("c", 90, kTypeMerge), test::KeyStr("c", 80, kTypeValue)},
      {"av1", "bv2", "bv1", "cv2", "cv1"}, {}, {}, kMaxSequenceNumber,
-      &merge_op, &filter);
+      kMaxSequenceNumber, &merge_op, &filter);
 
   c_iter_->SeekToFirst();
   ASSERT_TRUE(c_iter_->Valid());
@@ -565,6 +650,300 @@
   ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
 }
 
+// In bottommost level, values earlier than earliest snapshot can be output
+// with sequence = 0.
+TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
+  AddSnapshot(1);
+  RunTest({test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
+          {"v1", "v2"},
+          {test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
+          {"v1", "v2"}, kMaxSequenceNumber /*last_committed_seq*/,
+          nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
+          true /*bottommost_level*/);
+}
+
+// In bottommost level, deletions earlier than earliest snapshot can be removed
+// permanently.
+TEST_P(CompactionIteratorTest, RemoveDeletionAtBottomLevel) {
+  AddSnapshot(1);
+  RunTest({test::KeyStr("a", 1, kTypeDeletion),
+           test::KeyStr("b", 2, kTypeDeletion)},
+          {"", ""}, {test::KeyStr("b", 2, kTypeDeletion)}, {""},
+          kMaxSequenceNumber /*last_committed_seq*/, nullptr /*merge_operator*/,
+          nullptr /*compaction_filter*/, true /*bottommost_level*/);
+}
+
+// In bottommost level, single deletions earlier than earliest snapshot can be
+// removed permanently.
+TEST_P(CompactionIteratorTest, RemoveSingleDeletionAtBottomLevel) {
+  AddSnapshot(1);
+  RunTest({test::KeyStr("a", 1, kTypeSingleDeletion),
+           test::KeyStr("b", 2, kTypeSingleDeletion)},
+          {"", ""}, {test::KeyStr("b", 2, kTypeSingleDeletion)}, {""},
+          kMaxSequenceNumber /*last_committed_seq*/, nullptr /*merge_operator*/,
+          nullptr /*compaction_filter*/, true /*bottommost_level*/);
+}
+
+INSTANTIATE_TEST_CASE_P(CompactionIteratorTestInstance, CompactionIteratorTest,
+                        testing::Values(true, false));
+
+// Tests how CompactionIterator works together with SnapshotChecker.
+class CompactionIteratorWithSnapshotCheckerTest
+    : public CompactionIteratorTest {
+ public:
+  bool UseSnapshotChecker() const override { return true; }
+};
+
+// Uncommitted keys (keys with seq > last_committed_seq) should be output as-is
+// while committed versions of these keys should get compacted as usual.
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       PreserveUncommittedKeys_Value) {
+  RunTest(
+      {test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue),
+       test::KeyStr("foo", 1, kTypeValue)},
+      {"v3", "v2", "v1"},
+      {test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue)},
+      {"v3", "v2"}, 2 /*last_committed_seq*/);
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       PreserveUncommittedKeys_Deletion) {
+  RunTest({test::KeyStr("foo", 2, kTypeDeletion),
+           test::KeyStr("foo", 1, kTypeValue)},
+          {"", "v1"},
+          {test::KeyStr("foo", 2, kTypeDeletion),
+           test::KeyStr("foo", 1, kTypeValue)},
+          {"", "v1"}, 1 /*last_committed_seq*/);
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       PreserveUncommittedKeys_Merge) {
+  auto merge_op = MergeOperators::CreateStringAppendOperator();
+  RunTest(
+      {test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
+       test::KeyStr("foo", 1, kTypeValue)},
+      {"v3", "v2", "v1"},
+      {test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeValue)},
+      {"v3", "v1,v2"}, 2 /*last_committed_seq*/, merge_op.get());
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       PreserveUncommittedKeys_SingleDelete) {
+  RunTest({test::KeyStr("foo", 2, kTypeSingleDeletion),
+           test::KeyStr("foo", 1, kTypeValue)},
+          {"", "v1"},
+          {test::KeyStr("foo", 2, kTypeSingleDeletion),
+           test::KeyStr("foo", 1, kTypeValue)},
+          {"", "v1"}, 1 /*last_committed_seq*/);
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       PreserveUncommittedKeys_BlobIndex) {
+  RunTest({test::KeyStr("foo", 3, kTypeBlobIndex),
+           test::KeyStr("foo", 2, kTypeBlobIndex),
+           test::KeyStr("foo", 1, kTypeBlobIndex)},
+          {"v3", "v2", "v1"},
+          {test::KeyStr("foo", 3, kTypeBlobIndex),
+           test::KeyStr("foo", 2, kTypeBlobIndex)},
+          {"v3", "v2"}, 2 /*last_committed_seq*/);
+}
+
+// Test compaction iterator dedups keys visible to the same snapshot.
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Value) {
+  AddSnapshot(2, 1);
+  RunTest(
+      {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
+       test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
+      {"v4", "v3", "v2", "v1"},
+      {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
+       test::KeyStr("foo", 1, kTypeValue)},
+      {"v4", "v3", "v1"}, 3 /*last_committed_seq*/);
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Deletion) {
+  AddSnapshot(2, 1);
+  RunTest(
+      {test::KeyStr("foo", 4, kTypeValue),
+       test::KeyStr("foo", 3, kTypeDeletion),
+       test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
+      {"v4", "", "v2", "v1"},
+      {test::KeyStr("foo", 4, kTypeValue),
+       test::KeyStr("foo", 3, kTypeDeletion),
+       test::KeyStr("foo", 1, kTypeValue)},
+      {"v4", "", "v1"}, 3 /*last_committed_seq*/);
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       DISABLED_DedupSameSnapshot_Merge) {
+  AddSnapshot(2, 1);
+  auto merge_op = MergeOperators::CreateStringAppendOperator();
+  RunTest(
+      {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeMerge),
+       test::KeyStr("foo", 2, kTypeMerge), test::KeyStr("foo", 1, kTypeValue)},
+      {"v4", "v3", "v2", "v1"},
+      {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeMerge),
+       test::KeyStr("foo", 1, kTypeValue)},
+      {"v4", "v2,v3", "v1"}, 3 /*last_committed_seq*/, merge_op.get());
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       DedupSameSnapshot_SingleDeletion) {
+  AddSnapshot(2, 1);
+  RunTest(
+      {test::KeyStr("foo", 4, kTypeValue),
+       test::KeyStr("foo", 3, kTypeSingleDeletion),
+       test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
+      {"v4", "", "v2", "v1"},
+      {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
+      {"v4", "v1"}, 3 /*last_committed_seq*/);
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_BlobIndex) {
+  AddSnapshot(2, 1);
+  RunTest({test::KeyStr("foo", 4, kTypeBlobIndex),
+           test::KeyStr("foo", 3, kTypeBlobIndex),
+           test::KeyStr("foo", 2, kTypeBlobIndex),
+           test::KeyStr("foo", 1, kTypeBlobIndex)},
+          {"v4", "v3", "v2", "v1"},
+          {test::KeyStr("foo", 4, kTypeBlobIndex),
+           test::KeyStr("foo", 3, kTypeBlobIndex),
+           test::KeyStr("foo", 1, kTypeBlobIndex)},
+          {"v4", "v3", "v1"}, 3 /*last_committed_seq*/);
+}
+
+// At the bottom level, sequence numbers can be zeroed out, and deletions can
+// be removed, but only when they are visible to the earliest snapshot.
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       NotZeroOutSequenceIfNotVisibleToEarliestSnapshot) {
+  AddSnapshot(2, 1);
+  RunTest({test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue),
+           test::KeyStr("c", 3, kTypeValue)},
+          {"v1", "v2", "v3"},
+          {test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue),
+           test::KeyStr("c", 3, kTypeValue)},
+          {"v1", "v2", "v3"}, kMaxSequenceNumber /*last_committed_seq*/,
+          nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
+          true /*bottommost_level*/);
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       NotRemoveDeletionIfNotVisibleToEarliestSnapshot) {
+  AddSnapshot(2, 1);
+  RunTest(
+      {test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 2, kTypeDeletion),
+       test::KeyStr("c", 3, kTypeDeletion)},
+      {"", "", ""},
+      {test::KeyStr("b", 2, kTypeDeletion),
+       test::KeyStr("c", 3, kTypeDeletion)},
+      {"", ""}, kMaxSequenceNumber /*last_committed_seq*/,
+      nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
+      true /*bottommost_level*/);
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       NotRemoveSingleDeletionIfNotVisibleToEarliestSnapshot) {
+  AddSnapshot(2, 1);
+  RunTest({test::KeyStr("a", 1, kTypeSingleDeletion),
+           test::KeyStr("b", 2, kTypeSingleDeletion),
+           test::KeyStr("c", 3, kTypeSingleDeletion)},
+          {"", "", ""},
+          {test::KeyStr("b", 2, kTypeSingleDeletion),
+           test::KeyStr("c", 3, kTypeSingleDeletion)},
+          {"", ""}, kMaxSequenceNumber /*last_committed_seq*/,
+          nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
+          true /*bottommost_level*/);
+}
+
+// Single delete should not cancel out values that are not visible to the
+// same set of snapshots.
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       SingleDeleteAcrossSnapshotBoundary) {
+  AddSnapshot(2, 1);
+  RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
+           test::KeyStr("a", 1, kTypeValue)},
+          {"", "v1"},
+          {test::KeyStr("a", 2, kTypeSingleDeletion),
+           test::KeyStr("a", 1, kTypeValue)},
+          {"", "v1"}, 2 /*last_committed_seq*/);
+}
+
+// Single delete should be kept in case it is not visible to the
+// earliest write conflict snapshot. If a single delete is kept for this
+// reason, the corresponding value can be trimmed to save space.
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       KeepSingleDeletionForWriteConflictChecking) {
+  AddSnapshot(2, 0);
+  RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
+           test::KeyStr("a", 1, kTypeValue)},
+          {"", "v1"},
+          {test::KeyStr("a", 2, kTypeSingleDeletion),
+           test::KeyStr("a", 1, kTypeValue)},
+          {"", ""}, 2 /*last_committed_seq*/, nullptr /*merge_operator*/,
+          nullptr /*compaction_filter*/, false /*bottommost_level*/,
+          2 /*earliest_write_conflict_snapshot*/);
+}
+
+// Compaction filter should keep uncommitted keys as-is, and trigger on the
+// first committed version of a key.
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) {
+  std::unique_ptr<CompactionFilter> compaction_filter(
+      new FilterAllKeysCompactionFilter());
+  RunTest(
+      {test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeValue),
+       test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeValue)},
+      {"v2", "v1", "v3", "v4"},
+      {test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeDeletion),
+       test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeDeletion)},
+      {"v2", "", "v3", ""}, 1 /*last_committed_seq*/,
+      nullptr /*merge_operator*/, compaction_filter.get());
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Deletion) {
+  std::unique_ptr<CompactionFilter> compaction_filter(
+      new FilterAllKeysCompactionFilter());
+  RunTest(
+      {test::KeyStr("a", 2, kTypeDeletion), test::KeyStr("a", 1, kTypeValue)},
+      {"", "v1"},
+      {test::KeyStr("a", 2, kTypeDeletion),
+       test::KeyStr("a", 1, kTypeDeletion)},
+      {"", ""}, 1 /*last_committed_seq*/, nullptr /*merge_operator*/,
+      compaction_filter.get());
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       DISABLED_CompactionFilter_PartialMerge) {
+  std::shared_ptr<MergeOperator> merge_op =
+      MergeOperators::CreateStringAppendOperator();
+  std::unique_ptr<CompactionFilter> compaction_filter(
+      new FilterAllKeysCompactionFilter());
+  RunTest(
+      {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
+       test::KeyStr("a", 1, kTypeMerge)},
+      {"v3", "v2", "v1"},
+      {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeDeletion)},
+      {"v3", ""}, 2 /*last_committed_seq*/, merge_op.get(),
+      compaction_filter.get());
+}
+
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       DISABLED_CompactionFilter_FullMerge) {
+  std::shared_ptr<MergeOperator> merge_op =
+      MergeOperators::CreateStringAppendOperator();
+  std::unique_ptr<CompactionFilter> compaction_filter(
+      new FilterAllKeysCompactionFilter());
+  RunTest(
+      {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
+       test::KeyStr("a", 1, kTypeValue)},
+      {"v3", "v2", "v1"},
+      {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeDeletion)},
+      {"v3", ""}, 2 /*last_committed_seq*/, merge_op.get(),
+      compaction_filter.get());
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc
index 7cf97dab5..e59fe3b7e 100644
--- a/db/db_iterator_test.cc
+++ b/db/db_iterator_test.cc
@@ -18,9 +18,34 @@
 
 namespace rocksdb {
 
-class DBIteratorTest : public DBTestBase {
+// A dummy ReadCallback that says every key is committed.
+class DummyReadCallback : public ReadCallback {
+  bool IsCommitted(SequenceNumber /*seq*/) { return true; }
+};
+
+// Test param:
+// bool: whether to pass read_callback to NewIterator().
+class DBIteratorTest : public DBTestBase,
+                       public testing::WithParamInterface<bool> {
  public:
   DBIteratorTest() : DBTestBase("/db_iterator_test") {}
+
+  Iterator* NewIterator(const ReadOptions& read_options,
+                        ColumnFamilyHandle* column_family = nullptr) {
+    if (column_family == nullptr) {
+      column_family = db_->DefaultColumnFamily();
+    }
+    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+    SequenceNumber seq = read_options.snapshot != nullptr
+                             ? read_options.snapshot->GetSequenceNumber()
+                             : db_->GetLatestSequenceNumber();
+    bool use_read_callback = GetParam();
+    ReadCallback* read_callback = use_read_callback ?
&read_callback_ : nullptr; + return dbfull()->NewIteratorImpl(read_options, cfd, seq, read_callback); + } + + private: + DummyReadCallback read_callback_; }; class FlushBlockEveryKeyPolicy : public FlushBlockPolicy { @@ -51,7 +76,7 @@ class FlushBlockEveryKeyPolicyFactory : public FlushBlockPolicyFactory { } }; -TEST_F(DBIteratorTest, IteratorProperty) { +TEST_P(DBIteratorTest, IteratorProperty) { // The test needs to be changed if kPersistedTier is supported in iterator. Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu"}, options); @@ -59,7 +84,7 @@ TEST_F(DBIteratorTest, IteratorProperty) { ReadOptions ropt; ropt.pin_data = false; { - unique_ptr iter(db_->NewIterator(ropt, handles_[1])); + unique_ptr iter(NewIterator(ropt, handles_[1])); iter->SeekToFirst(); std::string prop_value; ASSERT_NOK(iter->GetProperty("non_existing.value", &prop_value)); @@ -72,7 +97,7 @@ TEST_F(DBIteratorTest, IteratorProperty) { Close(); } -TEST_F(DBIteratorTest, PersistedTierOnIterator) { +TEST_P(DBIteratorTest, PersistedTierOnIterator) { // The test needs to be changed if kPersistedTier is supported in iterator. Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu"}, options); @@ -88,7 +113,7 @@ TEST_F(DBIteratorTest, PersistedTierOnIterator) { Close(); } -TEST_F(DBIteratorTest, NonBlockingIteration) { +TEST_P(DBIteratorTest, NonBlockingIteration) { do { ReadOptions non_blocking_opts, regular_opts; Options options = CurrentOptions(); @@ -100,7 +125,7 @@ TEST_F(DBIteratorTest, NonBlockingIteration) { // scan using non-blocking iterator. We should find it because // it is in memtable. - Iterator* iter = db_->NewIterator(non_blocking_opts, handles_[1]); + Iterator* iter = NewIterator(non_blocking_opts, handles_[1]); int count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_OK(iter->status()); @@ -117,7 +142,7 @@ TEST_F(DBIteratorTest, NonBlockingIteration) { // kvs. Neither does it do any IOs to storage. uint64_t numopen = TestGetTickerCount(options, NO_FILE_OPENS); uint64_t cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD); - iter = db_->NewIterator(non_blocking_opts, handles_[1]); + iter = NewIterator(non_blocking_opts, handles_[1]); count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { count++; @@ -134,7 +159,7 @@ TEST_F(DBIteratorTest, NonBlockingIteration) { // verify that we can find it via a non-blocking scan numopen = TestGetTickerCount(options, NO_FILE_OPENS); cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD); - iter = db_->NewIterator(non_blocking_opts, handles_[1]); + iter = NewIterator(non_blocking_opts, handles_[1]); count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_OK(iter->status()); @@ -153,7 +178,7 @@ TEST_F(DBIteratorTest, NonBlockingIteration) { } #ifndef ROCKSDB_LITE -TEST_F(DBIteratorTest, ManagedNonBlockingIteration) { +TEST_P(DBIteratorTest, ManagedNonBlockingIteration) { do { ReadOptions non_blocking_opts, regular_opts; Options options = CurrentOptions(); @@ -166,7 +191,7 @@ TEST_F(DBIteratorTest, ManagedNonBlockingIteration) { // scan using non-blocking iterator. We should find it because // it is in memtable. - Iterator* iter = db_->NewIterator(non_blocking_opts, handles_[1]); + Iterator* iter = NewIterator(non_blocking_opts, handles_[1]); int count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_OK(iter->status()); @@ -183,7 +208,7 @@ TEST_F(DBIteratorTest, ManagedNonBlockingIteration) { // kvs. Neither does it do any IOs to storage. 
int64_t numopen = TestGetTickerCount(options, NO_FILE_OPENS); int64_t cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD); - iter = db_->NewIterator(non_blocking_opts, handles_[1]); + iter = NewIterator(non_blocking_opts, handles_[1]); count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { count++; @@ -200,7 +225,7 @@ TEST_F(DBIteratorTest, ManagedNonBlockingIteration) { // verify that we can find it via a non-blocking scan numopen = TestGetTickerCount(options, NO_FILE_OPENS); cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD); - iter = db_->NewIterator(non_blocking_opts, handles_[1]); + iter = NewIterator(non_blocking_opts, handles_[1]); count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_OK(iter->status()); @@ -219,7 +244,7 @@ TEST_F(DBIteratorTest, ManagedNonBlockingIteration) { } #endif // ROCKSDB_LITE -TEST_F(DBIteratorTest, IterSeekBeforePrev) { +TEST_P(DBIteratorTest, IterSeekBeforePrev) { ASSERT_OK(Put("a", "b")); ASSERT_OK(Put("c", "d")); dbfull()->Flush(FlushOptions()); @@ -227,7 +252,7 @@ TEST_F(DBIteratorTest, IterSeekBeforePrev) { ASSERT_OK(Put("1", "h")); dbfull()->Flush(FlushOptions()); ASSERT_OK(Put("2", "j")); - auto iter = db_->NewIterator(ReadOptions()); + auto iter = NewIterator(ReadOptions()); iter->Seek(Slice("c")); iter->Prev(); iter->Seek(Slice("a")); @@ -235,7 +260,7 @@ TEST_F(DBIteratorTest, IterSeekBeforePrev) { delete iter; } -TEST_F(DBIteratorTest, IterSeekForPrevBeforeNext) { +TEST_P(DBIteratorTest, IterSeekForPrevBeforeNext) { ASSERT_OK(Put("a", "b")); ASSERT_OK(Put("c", "d")); dbfull()->Flush(FlushOptions()); @@ -243,7 +268,7 @@ TEST_F(DBIteratorTest, IterSeekForPrevBeforeNext) { ASSERT_OK(Put("1", "h")); dbfull()->Flush(FlushOptions()); ASSERT_OK(Put("2", "j")); - auto iter = db_->NewIterator(ReadOptions()); + auto iter = NewIterator(ReadOptions()); iter->SeekForPrev(Slice("0")); iter->Next(); iter->SeekForPrev(Slice("1")); @@ -257,7 +282,7 @@ std::string MakeLongKey(size_t length, char c) { } } // namespace -TEST_F(DBIteratorTest, IterLongKeys) { +TEST_P(DBIteratorTest, IterLongKeys) { ASSERT_OK(Put(MakeLongKey(20, 0), "0")); ASSERT_OK(Put(MakeLongKey(32, 2), "2")); ASSERT_OK(Put("a", "b")); @@ -265,7 +290,7 @@ TEST_F(DBIteratorTest, IterLongKeys) { ASSERT_OK(Put(MakeLongKey(50, 1), "1")); ASSERT_OK(Put(MakeLongKey(127, 3), "3")); ASSERT_OK(Put(MakeLongKey(64, 4), "4")); - auto iter = db_->NewIterator(ReadOptions()); + auto iter = NewIterator(ReadOptions()); // Create a key that needs to be skipped for Seq too new iter->Seek(MakeLongKey(20, 0)); @@ -287,7 +312,7 @@ TEST_F(DBIteratorTest, IterLongKeys) { ASSERT_EQ(IterStatus(iter), MakeLongKey(50, 1) + "->1"); delete iter; - iter = db_->NewIterator(ReadOptions()); + iter = NewIterator(ReadOptions()); iter->Seek(MakeLongKey(50, 1)); ASSERT_EQ(IterStatus(iter), MakeLongKey(50, 1) + "->1"); iter->Next(); @@ -297,13 +322,13 @@ TEST_F(DBIteratorTest, IterLongKeys) { delete iter; } -TEST_F(DBIteratorTest, IterNextWithNewerSeq) { +TEST_P(DBIteratorTest, IterNextWithNewerSeq) { ASSERT_OK(Put("0", "0")); dbfull()->Flush(FlushOptions()); ASSERT_OK(Put("a", "b")); ASSERT_OK(Put("c", "d")); ASSERT_OK(Put("d", "e")); - auto iter = db_->NewIterator(ReadOptions()); + auto iter = NewIterator(ReadOptions()); // Create a key that needs to be skipped for Seq too new for (uint64_t i = 0; i < last_options_.max_sequential_skip_in_iterations + 1; @@ -323,13 +348,13 @@ TEST_F(DBIteratorTest, IterNextWithNewerSeq) { delete iter; } -TEST_F(DBIteratorTest, IterPrevWithNewerSeq) { 
+TEST_P(DBIteratorTest, IterPrevWithNewerSeq) { ASSERT_OK(Put("0", "0")); dbfull()->Flush(FlushOptions()); ASSERT_OK(Put("a", "b")); ASSERT_OK(Put("c", "d")); ASSERT_OK(Put("d", "e")); - auto iter = db_->NewIterator(ReadOptions()); + auto iter = NewIterator(ReadOptions()); // Create a key that needs to be skipped for Seq too new for (uint64_t i = 0; i < last_options_.max_sequential_skip_in_iterations + 1; @@ -354,14 +379,14 @@ TEST_F(DBIteratorTest, IterPrevWithNewerSeq) { delete iter; } -TEST_F(DBIteratorTest, IterPrevWithNewerSeq2) { +TEST_P(DBIteratorTest, IterPrevWithNewerSeq2) { ASSERT_OK(Put("0", "0")); dbfull()->Flush(FlushOptions()); ASSERT_OK(Put("a", "b")); ASSERT_OK(Put("c", "d")); ASSERT_OK(Put("e", "f")); - auto iter = db_->NewIterator(ReadOptions()); - auto iter2 = db_->NewIterator(ReadOptions()); + auto iter = NewIterator(ReadOptions()); + auto iter2 = NewIterator(ReadOptions()); iter->Seek(Slice("c")); iter2->SeekForPrev(Slice("d")); ASSERT_EQ(IterStatus(iter), "c->d"); @@ -383,10 +408,10 @@ TEST_F(DBIteratorTest, IterPrevWithNewerSeq2) { delete iter2; } -TEST_F(DBIteratorTest, IterEmpty) { +TEST_P(DBIteratorTest, IterEmpty) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); - Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]); + Iterator* iter = NewIterator(ReadOptions(), handles_[1]); iter->SeekToFirst(); ASSERT_EQ(IterStatus(iter), "(invalid)"); @@ -404,11 +429,11 @@ TEST_F(DBIteratorTest, IterEmpty) { } while (ChangeCompactOptions()); } -TEST_F(DBIteratorTest, IterSingle) { +TEST_P(DBIteratorTest, IterSingle) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "a", "va")); - Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]); + Iterator* iter = NewIterator(ReadOptions(), handles_[1]); iter->SeekToFirst(); ASSERT_EQ(IterStatus(iter), "a->va"); @@ -455,13 +480,13 @@ TEST_F(DBIteratorTest, IterSingle) { } while (ChangeCompactOptions()); } -TEST_F(DBIteratorTest, IterMulti) { +TEST_P(DBIteratorTest, IterMulti) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "a", "va")); ASSERT_OK(Put(1, "b", "vb")); ASSERT_OK(Put(1, "c", "vc")); - Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]); + Iterator* iter = NewIterator(ReadOptions(), handles_[1]); iter->SeekToFirst(); ASSERT_EQ(IterStatus(iter), "a->va"); @@ -554,7 +579,7 @@ TEST_F(DBIteratorTest, IterMulti) { // Check that we can skip over a run of user keys // by using reseek rather than sequential scan -TEST_F(DBIteratorTest, IterReseek) { +TEST_P(DBIteratorTest, IterReseek) { anon::OptionsOverride options_override; options_override.skip_policy = kSkipNoSnapshot; Options options = CurrentOptions(options_override); @@ -571,7 +596,7 @@ TEST_F(DBIteratorTest, IterReseek) { ASSERT_OK(Put(1, "a", "one")); ASSERT_OK(Put(1, "a", "two")); ASSERT_OK(Put(1, "b", "bone")); - Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]); + Iterator* iter = NewIterator(ReadOptions(), handles_[1]); iter->SeekToFirst(); ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 0); ASSERT_EQ(IterStatus(iter), "a->two"); @@ -583,7 +608,7 @@ TEST_F(DBIteratorTest, IterReseek) { // insert a total of three keys with same userkey and verify // that reseek is still not invoked. 
ASSERT_OK(Put(1, "a", "three")); - iter = db_->NewIterator(ReadOptions(), handles_[1]); + iter = NewIterator(ReadOptions(), handles_[1]); iter->SeekToFirst(); ASSERT_EQ(IterStatus(iter), "a->three"); iter->Next(); @@ -594,7 +619,7 @@ TEST_F(DBIteratorTest, IterReseek) { // insert a total of four keys with same userkey and verify // that reseek is invoked. ASSERT_OK(Put(1, "a", "four")); - iter = db_->NewIterator(ReadOptions(), handles_[1]); + iter = NewIterator(ReadOptions(), handles_[1]); iter->SeekToFirst(); ASSERT_EQ(IterStatus(iter), "a->four"); ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 0); @@ -611,7 +636,7 @@ TEST_F(DBIteratorTest, IterReseek) { // Insert another version of b and assert that reseek is not invoked ASSERT_OK(Put(1, "b", "btwo")); - iter = db_->NewIterator(ReadOptions(), handles_[1]); + iter = NewIterator(ReadOptions(), handles_[1]); iter->SeekToLast(); ASSERT_EQ(IterStatus(iter), "b->btwo"); ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), @@ -626,7 +651,7 @@ TEST_F(DBIteratorTest, IterReseek) { // of b and 4 versions of a. ASSERT_OK(Put(1, "b", "bthree")); ASSERT_OK(Put(1, "b", "bfour")); - iter = db_->NewIterator(ReadOptions(), handles_[1]); + iter = NewIterator(ReadOptions(), handles_[1]); iter->SeekToLast(); ASSERT_EQ(IterStatus(iter), "b->bfour"); ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), @@ -640,7 +665,7 @@ TEST_F(DBIteratorTest, IterReseek) { delete iter; } -TEST_F(DBIteratorTest, IterSmallAndLargeMix) { +TEST_P(DBIteratorTest, IterSmallAndLargeMix) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "a", "va")); @@ -649,7 +674,7 @@ TEST_F(DBIteratorTest, IterSmallAndLargeMix) { ASSERT_OK(Put(1, "d", std::string(100000, 'd'))); ASSERT_OK(Put(1, "e", std::string(100000, 'e'))); - Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]); + Iterator* iter = NewIterator(ReadOptions(), handles_[1]); iter->SeekToFirst(); ASSERT_EQ(IterStatus(iter), "a->va"); @@ -681,7 +706,7 @@ TEST_F(DBIteratorTest, IterSmallAndLargeMix) { } while (ChangeCompactOptions()); } -TEST_F(DBIteratorTest, IterMultiWithDelete) { +TEST_P(DBIteratorTest, IterMultiWithDelete) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "ka", "va")); @@ -690,7 +715,7 @@ TEST_F(DBIteratorTest, IterMultiWithDelete) { ASSERT_OK(Delete(1, "kb")); ASSERT_EQ("NOT_FOUND", Get(1, "kb")); - Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]); + Iterator* iter = NewIterator(ReadOptions(), handles_[1]); iter->Seek("kc"); ASSERT_EQ(IterStatus(iter), "kc->vc"); if (!CurrentOptions().merge_operator) { @@ -707,7 +732,7 @@ TEST_F(DBIteratorTest, IterMultiWithDelete) { } while (ChangeOptions()); } -TEST_F(DBIteratorTest, IterPrevMaxSkip) { +TEST_P(DBIteratorTest, IterPrevMaxSkip) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); for (int i = 0; i < 2; i++) { @@ -737,7 +762,7 @@ TEST_F(DBIteratorTest, IterPrevMaxSkip) { } while (ChangeOptions(kSkipMergePut | kSkipNoSeekToLast)); } -TEST_F(DBIteratorTest, IterWithSnapshot) { +TEST_P(DBIteratorTest, IterWithSnapshot) { anon::OptionsOverride options_override; options_override.skip_policy = kSkipNoSnapshot; do { @@ -751,7 +776,7 @@ TEST_F(DBIteratorTest, IterWithSnapshot) { const Snapshot* snapshot = db_->GetSnapshot(); ReadOptions options; options.snapshot = snapshot; - Iterator* iter = db_->NewIterator(options, handles_[1]); + Iterator* iter = NewIterator(options, handles_[1]); ASSERT_OK(Put(1, "key0", "val0")); // Put 
more values after the snapshot @@ -804,13 +829,13 @@ TEST_F(DBIteratorTest, IterWithSnapshot) { } while (ChangeOptions(kSkipHashCuckoo)); } -TEST_F(DBIteratorTest, IteratorPinsRef) { +TEST_P(DBIteratorTest, IteratorPinsRef) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); Put(1, "foo", "hello"); // Get iterator that will yield the current contents of the DB. - Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]); + Iterator* iter = NewIterator(ReadOptions(), handles_[1]); // Write to force compactions Put(1, "foo", "newvalue1"); @@ -830,7 +855,7 @@ TEST_F(DBIteratorTest, IteratorPinsRef) { } while (ChangeCompactOptions()); } -TEST_F(DBIteratorTest, DBIteratorBoundTest) { +TEST_P(DBIteratorTest, DBIteratorBoundTest) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; @@ -847,7 +872,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) { ReadOptions ro; ro.iterate_upper_bound = nullptr; - std::unique_ptr iter(db_->NewIterator(ro)); + std::unique_ptr iter(NewIterator(ro)); iter->Seek("foo"); @@ -884,7 +909,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) { Slice prefix("foo2"); ro.iterate_upper_bound = &prefix; - std::unique_ptr iter(db_->NewIterator(ro)); + std::unique_ptr iter(NewIterator(ro)); iter->Seek("foo"); @@ -906,7 +931,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) { Slice prefix("foo"); ro.iterate_upper_bound = &prefix; - std::unique_ptr iter(db_->NewIterator(ro)); + std::unique_ptr iter(NewIterator(ro)); iter->SeekToLast(); ASSERT_TRUE(iter->Valid()); @@ -930,7 +955,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) { Slice upper_bound("g"); ro.iterate_upper_bound = &upper_bound; - std::unique_ptr iter(db_->NewIterator(ro)); + std::unique_ptr iter(NewIterator(ro)); iter->Seek("foo"); @@ -963,7 +988,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) { ReadOptions ro; ro.iterate_upper_bound = nullptr; - std::unique_ptr iter(db_->NewIterator(ro)); + std::unique_ptr iter(NewIterator(ro)); iter->Seek("b"); ASSERT_TRUE(iter->Valid()); @@ -983,7 +1008,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) { Slice prefix("c"); ro.iterate_upper_bound = &prefix; - iter.reset(db_->NewIterator(ro)); + iter.reset(NewIterator(ro)); get_perf_context()->Reset(); @@ -1004,7 +1029,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) { } } -TEST_F(DBIteratorTest, DBIteratorBoundOptimizationTest) { +TEST_P(DBIteratorTest, DBIteratorBoundOptimizationTest) { int upper_bound_hits = 0; Options options = CurrentOptions(); rocksdb::SyncPoint::GetInstance()->SetCallBack( @@ -1032,7 +1057,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundOptimizationTest) { ReadOptions ro; ro.iterate_upper_bound = &ub; - std::unique_ptr iter(db_->NewIterator(ro)); + std::unique_ptr iter(NewIterator(ro)); iter->Seek("foo"); ASSERT_TRUE(iter->Valid()); @@ -1050,7 +1075,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundOptimizationTest) { } // TODO(3.13): fix the issue of Seek() + Prev() which might not necessary // return the biggest key which is smaller than the seek key. 
-TEST_F(DBIteratorTest, PrevAfterAndNextAfterMerge) { +TEST_P(DBIteratorTest, PrevAfterAndNextAfterMerge) { Options options; options.create_if_missing = true; options.merge_operator = MergeOperators::CreatePutOperator(); @@ -1063,7 +1088,7 @@ TEST_F(DBIteratorTest, PrevAfterAndNextAfterMerge) { db_->Merge(wopts, "2", "data2"); db_->Merge(wopts, "3", "data3"); - std::unique_ptr it(db_->NewIterator(ReadOptions())); + std::unique_ptr it(NewIterator(ReadOptions())); it->Seek("2"); ASSERT_TRUE(it->Valid()); @@ -1159,7 +1184,7 @@ class DBIteratorTestForPinnedData : public DBIteratorTest { ReadOptions ro; ro.pin_data = true; - auto iter = db_->NewIterator(ro); + auto iter = NewIterator(ro); { // Test Seek to random keys @@ -1251,25 +1276,25 @@ class DBIteratorTestForPinnedData : public DBIteratorTest { } }; -TEST_F(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedNormal) { +TEST_P(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedNormal) { PinnedDataIteratorRandomized(TestConfig::NORMAL); } -TEST_F(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedCLoseAndOpen) { +TEST_P(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedCLoseAndOpen) { PinnedDataIteratorRandomized(TestConfig::CLOSE_AND_OPEN); } -TEST_F(DBIteratorTestForPinnedData, +TEST_P(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedCompactBeforeRead) { PinnedDataIteratorRandomized(TestConfig::COMPACT_BEFORE_READ); } -TEST_F(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedFlush) { +TEST_P(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedFlush) { PinnedDataIteratorRandomized(TestConfig::FLUSH_EVERY_1000); } #ifndef ROCKSDB_LITE -TEST_F(DBIteratorTest, PinnedDataIteratorMultipleFiles) { +TEST_P(DBIteratorTest, PinnedDataIteratorMultipleFiles) { Options options = CurrentOptions(); BlockBasedTableOptions table_options; table_options.use_delta_encoding = false; @@ -1318,7 +1343,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorMultipleFiles) { ReadOptions ro; ro.pin_data = true; - auto iter = db_->NewIterator(ro); + auto iter = NewIterator(ro); std::vector> results; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -1340,7 +1365,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorMultipleFiles) { } #endif -TEST_F(DBIteratorTest, PinnedDataIteratorMergeOperator) { +TEST_P(DBIteratorTest, PinnedDataIteratorMergeOperator) { Options options = CurrentOptions(); BlockBasedTableOptions table_options; table_options.use_delta_encoding = false; @@ -1373,7 +1398,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorMergeOperator) { ReadOptions ro; ro.pin_data = true; - auto iter = db_->NewIterator(ro); + auto iter = NewIterator(ro); std::vector> results; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -1400,7 +1425,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorMergeOperator) { delete iter; } -TEST_F(DBIteratorTest, PinnedDataIteratorReadAfterUpdate) { +TEST_P(DBIteratorTest, PinnedDataIteratorReadAfterUpdate) { Options options = CurrentOptions(); BlockBasedTableOptions table_options; table_options.use_delta_encoding = false; @@ -1420,7 +1445,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorReadAfterUpdate) { ReadOptions ro; ro.pin_data = true; - auto iter = db_->NewIterator(ro); + auto iter = NewIterator(ro); // Delete 50% of the keys and update the other 50% for (auto& kv : true_data) { @@ -1450,7 +1475,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorReadAfterUpdate) { delete iter; } -TEST_F(DBIteratorTest, IterSeekForPrevCrossingFiles) { +TEST_P(DBIteratorTest, IterSeekForPrevCrossingFiles) { Options 
options = CurrentOptions(); options.prefix_extractor.reset(NewFixedPrefixTransform(1)); options.disable_auto_compactions = true; @@ -1479,7 +1504,7 @@ TEST_F(DBIteratorTest, IterSeekForPrevCrossingFiles) { MoveFilesToLevel(1); { ReadOptions ro; - Iterator* iter = db_->NewIterator(ro); + Iterator* iter = NewIterator(ro); iter->SeekForPrev("a4"); ASSERT_EQ(iter->key().ToString(), "a3"); @@ -1497,14 +1522,14 @@ TEST_F(DBIteratorTest, IterSeekForPrevCrossingFiles) { { ReadOptions ro; ro.prefix_same_as_start = true; - Iterator* iter = db_->NewIterator(ro); + Iterator* iter = NewIterator(ro); iter->SeekForPrev("c2"); ASSERT_TRUE(!iter->Valid()); delete iter; } } -TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocks) { +TEST_P(DBIteratorTest, IterPrevKeyCrossingBlocks) { Options options = CurrentOptions(); BlockBasedTableOptions table_options; table_options.block_size = 1; // every block will contain one entry @@ -1546,7 +1571,7 @@ TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocks) { { ReadOptions ro; ro.fill_cache = false; - Iterator* iter = db_->NewIterator(ro); + Iterator* iter = NewIterator(ro); iter->SeekToLast(); ASSERT_EQ(iter->key().ToString(), "key5"); @@ -1572,7 +1597,7 @@ TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocks) { } } -TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocksRandomized) { +TEST_P(DBIteratorTest, IterPrevKeyCrossingBlocksRandomized) { Options options = CurrentOptions(); options.merge_operator = MergeOperators::CreateStringAppendTESTOperator(); options.disable_auto_compactions = true; @@ -1648,7 +1673,7 @@ TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocksRandomized) { { ReadOptions ro; ro.fill_cache = false; - Iterator* iter = db_->NewIterator(ro); + Iterator* iter = NewIterator(ro); auto data_iter = true_data.rbegin(); for (iter->SeekToLast(); iter->Valid(); iter->Prev()) { @@ -1664,7 +1689,7 @@ TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocksRandomized) { { ReadOptions ro; ro.fill_cache = false; - Iterator* iter = db_->NewIterator(ro); + Iterator* iter = NewIterator(ro); auto data_iter = true_data.rbegin(); int entries_right = 0; @@ -1719,7 +1744,7 @@ TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocksRandomized) { } } -TEST_F(DBIteratorTest, IteratorWithLocalStatistics) { +TEST_P(DBIteratorTest, IteratorWithLocalStatistics) { Options options = CurrentOptions(); options.statistics = rocksdb::CreateDBStatistics(); DestroyAndReopen(options); @@ -1740,7 +1765,7 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) { std::function reader_func_next = [&]() { SetPerfLevel(kEnableCount); get_perf_context()->Reset(); - Iterator* iter = db_->NewIterator(ReadOptions()); + Iterator* iter = NewIterator(ReadOptions()); iter->SeekToFirst(); // Seek will bump ITER_BYTES_READ @@ -1767,7 +1792,7 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) { std::function reader_func_prev = [&]() { SetPerfLevel(kEnableCount); - Iterator* iter = db_->NewIterator(ReadOptions()); + Iterator* iter = NewIterator(ReadOptions()); iter->SeekToLast(); // Seek will bump ITER_BYTES_READ @@ -1813,7 +1838,7 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) { } -TEST_F(DBIteratorTest, ReadAhead) { +TEST_P(DBIteratorTest, ReadAhead) { Options options; env_->count_random_reads_ = true; options.env = env_; @@ -1850,7 +1875,7 @@ TEST_F(DBIteratorTest, ReadAhead) { env_->random_read_bytes_counter_ = 0; options.statistics->setTickerCount(NO_FILE_OPENS, 0); ReadOptions read_options; - auto* iter = db_->NewIterator(read_options); + auto* iter = NewIterator(read_options); iter->SeekToFirst(); int64_t num_file_opens = 
TestGetTickerCount(options, NO_FILE_OPENS); size_t bytes_read = env_->random_read_bytes_counter_; @@ -1859,7 +1884,7 @@ TEST_F(DBIteratorTest, ReadAhead) { env_->random_read_bytes_counter_ = 0; options.statistics->setTickerCount(NO_FILE_OPENS, 0); read_options.readahead_size = 1024 * 10; - iter = db_->NewIterator(read_options); + iter = NewIterator(read_options); iter->SeekToFirst(); int64_t num_file_opens_readahead = TestGetTickerCount(options, NO_FILE_OPENS); size_t bytes_read_readahead = env_->random_read_bytes_counter_; @@ -1869,7 +1894,7 @@ TEST_F(DBIteratorTest, ReadAhead) { ASSERT_GT(bytes_read_readahead, read_options.readahead_size * 3); // Verify correctness. - iter = db_->NewIterator(read_options); + iter = NewIterator(read_options); int count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_EQ(value, iter->value()); @@ -1886,7 +1911,7 @@ TEST_F(DBIteratorTest, ReadAhead) { // Insert a key, create a snapshot iterator, overwrite key lots of times, // seek to a smaller key. Expect DBIter to fall back to a seek instead of // going through all the overwrites linearly. -TEST_F(DBIteratorTest, DBIteratorSkipRecentDuplicatesTest) { +TEST_P(DBIteratorTest, DBIteratorSkipRecentDuplicatesTest) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; @@ -1901,7 +1926,7 @@ TEST_F(DBIteratorTest, DBIteratorSkipRecentDuplicatesTest) { // Create iterator. ReadOptions ro; - std::unique_ptr iter(db_->NewIterator(ro)); + std::unique_ptr iter(NewIterator(ro)); // Insert a lot. for (int i = 0; i < 100; ++i) { @@ -1939,10 +1964,10 @@ TEST_F(DBIteratorTest, DBIteratorSkipRecentDuplicatesTest) { NUMBER_OF_RESEEKS_IN_ITERATION)); } -TEST_F(DBIteratorTest, Refresh) { +TEST_P(DBIteratorTest, Refresh) { ASSERT_OK(Put("x", "y")); - std::unique_ptr iter(db_->NewIterator(ReadOptions())); + std::unique_ptr iter(NewIterator(ReadOptions())); iter->Seek(Slice("a")); ASSERT_TRUE(iter->Valid()); ASSERT_EQ(iter->key().compare(Slice("x")), 0); @@ -1998,20 +2023,20 @@ TEST_F(DBIteratorTest, Refresh) { iter.reset(); } -TEST_F(DBIteratorTest, CreationFailure) { +TEST_P(DBIteratorTest, CreationFailure) { SyncPoint::GetInstance()->SetCallBack( "DBImpl::NewInternalIterator:StatusCallback", [](void* arg) { *(reinterpret_cast(arg)) = Status::Corruption("test status"); }); SyncPoint::GetInstance()->EnableProcessing(); - Iterator* iter = db_->NewIterator(ReadOptions()); + Iterator* iter = NewIterator(ReadOptions()); ASSERT_FALSE(iter->Valid()); ASSERT_TRUE(iter->status().IsCorruption()); delete iter; } -TEST_F(DBIteratorTest, TableFilter) { +TEST_P(DBIteratorTest, TableFilter) { ASSERT_OK(Put("a", "1")); dbfull()->Flush(FlushOptions()); ASSERT_OK(Put("b", "2")); @@ -2036,7 +2061,7 @@ TEST_F(DBIteratorTest, TableFilter) { } return true; }; - auto iter = db_->NewIterator(opts); + auto iter = NewIterator(opts); iter->SeekToFirst(); ASSERT_EQ(IterStatus(iter), "a->1"); iter->Next(); @@ -2062,7 +2087,7 @@ TEST_F(DBIteratorTest, TableFilter) { opts.table_filter = [](const TableProperties& props) { return props.num_entries != 2; }; - auto iter = db_->NewIterator(opts); + auto iter = NewIterator(opts); iter->SeekToFirst(); ASSERT_EQ(IterStatus(iter), "a->1"); iter->Next(); @@ -2077,7 +2102,7 @@ TEST_F(DBIteratorTest, TableFilter) { } } -TEST_F(DBIteratorTest, SkipStatistics) { +TEST_P(DBIteratorTest, SkipStatistics) { Options options = CurrentOptions(); options.statistics = rocksdb::CreateDBStatistics(); DestroyAndReopen(options); @@ -2099,7 +2124,7 @@ TEST_F(DBIteratorTest, 
SkipStatistics) {
   ASSERT_OK(Delete("e"));
   ASSERT_OK(Delete("f"));
 
-  Iterator* iter = db_->NewIterator(ReadOptions());
+  Iterator* iter = NewIterator(ReadOptions());
   int count = 0;
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     ASSERT_OK(iter->status());
@@ -2110,7 +2135,7 @@
   skip_count += 8; // 3 deletes + 3 original keys + 2 lower in sequence
   ASSERT_EQ(skip_count, TestGetTickerCount(options, NUMBER_ITER_SKIP));
 
-  iter = db_->NewIterator(ReadOptions());
+  iter = NewIterator(ReadOptions());
   count = 0;
   for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
     ASSERT_OK(iter->status());
@@ -2134,7 +2159,7 @@
   Slice prefix("b");
   ro.iterate_upper_bound = &prefix;
 
-  iter = db_->NewIterator(ro);
+  iter = NewIterator(ro);
   count = 0;
   for(iter->Seek("aa"); iter->Valid(); iter->Next()) {
     ASSERT_OK(iter->status());
@@ -2145,7 +2170,7 @@
   skip_count += 6; // 3 deletes + 3 original keys
   ASSERT_EQ(skip_count, TestGetTickerCount(options, NUMBER_ITER_SKIP));
 
-  iter = db_->NewIterator(ro);
+  iter = NewIterator(ro);
   count = 0;
   for(iter->SeekToLast(); iter->Valid(); iter->Prev()) {
     ASSERT_OK(iter->status());
@@ -2158,7 +2183,13 @@
   ASSERT_EQ(skip_count, TestGetTickerCount(options, NUMBER_ITER_SKIP));
 }
 
-TEST_F(DBIteratorTest, ReadCallback) {
+INSTANTIATE_TEST_CASE_P(DBIteratorTestInstance, DBIteratorTest,
+                        testing::Values(true, false));
+
+// Tests how DBIter works with ReadCallback.
+class DBIteratorWithReadCallbackTest : public DBIteratorTest {};
+
+TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) {
   class TestReadCallback : public ReadCallback {
    public:
     explicit TestReadCallback(SequenceNumber last_visible_seq)