WritePrepared Txn: update compaction_iterator_test and db_iterator_test

Summary:
Update compaction_iterator_test with write-prepared transaction DB related tests. Transaction related tests are group in CompactionIteratorWithSnapshotCheckerTest. The existing test are duplicated to make them also test with dummy SnapshotChecker that will say every key is visible to every snapshot (this is okay, we still compare sequence number to verify visibility). Merge related tests are disabled and will be revisit in another PR.

Existing db_iterator_tests are also duplicated to test with dummy read_callback that will say every key is committed.
Closes https://github.com/facebook/rocksdb/pull/3466

Differential Revision: D6909253

Pulled By: yiwu-arbug

fbshipit-source-id: 2ae4656b843a55e2e9ff8beecf21f2832f96cd25
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 2f29991701
commit 81736d8afe
  1. 10
      db/compaction_iterator.cc
  2. 457
      db/compaction_iterator_test.cc
  3. 229
      db/db_iterator_test.cc

@ -537,6 +537,11 @@ void CompactionIterator::NextFromInput() {
//
// Note: Dropping this Delete will not affect TransactionDB
// write-conflict checking since it is earlier than any snapshot.
//
// It seems that we can also drop deletion later than earliest snapshot
// given that:
// (1) The deletion is earlier than earliest_write_conflict_snapshot, and
// (2) No value exist earlier than the deletion.
++iter_stats_.num_record_drop_obsolete;
if (!bottommost_level_) {
++iter_stats_.num_optimized_del_drop_obsolete;
@ -621,9 +626,12 @@ void CompactionIterator::PrepareOutput() {
// and the earliest snapshot is larger than this seqno
// and the userkey differs from the last userkey in compaction
// then we can squash the seqno to zero.
//
// This is safe for TransactionDB write-conflict checking since transactions
// only care about sequence number larger than any active snapshots.
//
// Can we do the same for levels above bottom level as long as
// KeyNotExistsBeyondOutputLevel() return true?
if ((compaction_ != nullptr &&
!compaction_->allow_ingest_behind()) &&
ikeyNotNeededForIncrementalSnapshot() &&

@ -9,8 +9,10 @@
#include <vector>
#include "port/port.h"
#include "util/string_util.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"
namespace rocksdb {
@ -39,9 +41,9 @@ class NoMergingMergeOp : public MergeOperator {
// Always returns Decition::kRemove.
class StallingFilter : public CompactionFilter {
public:
virtual Decision FilterV2(int level, const Slice& key, ValueType t,
const Slice& existing_value, std::string* new_value,
std::string* skip_until) const override {
Decision FilterV2(int /*level*/, const Slice& key, ValueType /*type*/,
const Slice& /*existing_value*/, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
int k = std::atoi(key.ToString().c_str());
last_seen.store(k);
while (k >= stall_at.load()) {
@ -72,6 +74,18 @@ class StallingFilter : public CompactionFilter {
mutable std::atomic<int> last_seen{0};
};
// Compaction filter that filter out all keys.
class FilterAllKeysCompactionFilter : public CompactionFilter {
public:
Decision FilterV2(int /*level*/, const Slice& /*key*/, ValueType /*type*/,
const Slice& /*existing_value*/, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
return Decision::kRemove;
}
const char* Name() const override { return "AllKeysCompactionFilter"; }
};
class LoggingForwardVectorIterator : public InternalIterator {
public:
struct Action {
@ -144,76 +158,145 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
public:
FakeCompaction() = default;
virtual int level(size_t compaction_input_level) const { return 0; }
virtual int level(size_t compaction_input_level) const override { return 0; }
virtual bool KeyNotExistsBeyondOutputLevel(
const Slice& user_key, std::vector<size_t>* level_ptrs) const {
return key_not_exists_beyond_output_level;
const Slice& user_key, std::vector<size_t>* level_ptrs) const override {
return is_bottommost_level || key_not_exists_beyond_output_level;
}
virtual bool bottommost_level() const { return false; }
virtual int number_levels() const { return 1; }
virtual Slice GetLargestUserKey() const {
virtual bool bottommost_level() const override { return is_bottommost_level; }
virtual int number_levels() const override { return 1; }
virtual Slice GetLargestUserKey() const override {
return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
}
virtual bool allow_ingest_behind() const { return false; }
virtual bool allow_ingest_behind() const override { return false; }
virtual bool preserve_deletes() const {return false; }
virtual bool preserve_deletes() const override { return false; }
bool key_not_exists_beyond_output_level = false;
bool is_bottommost_level = false;
};
class CompactionIteratorTest : public testing::Test {
// A simplifed snapshot checker which assumes each snapshot has a global
// last visible sequence.
class TestSnapshotChecker : public SnapshotChecker {
public:
explicit TestSnapshotChecker(
SequenceNumber last_committed_sequence,
const std::unordered_map<SequenceNumber, SequenceNumber>& snapshots = {})
: last_committed_sequence_(last_committed_sequence),
snapshots_(snapshots) {}
bool IsInSnapshot(SequenceNumber seq,
SequenceNumber snapshot_seq) const override {
if (snapshot_seq == kMaxSequenceNumber) {
return seq <= last_committed_sequence_;
}
assert(snapshots_.count(snapshot_seq) > 0);
return seq <= snapshots_.at(snapshot_seq);
}
private:
SequenceNumber last_committed_sequence_;
// A map of valid snapshot to last visible sequence to the snapshot.
std::unordered_map<SequenceNumber, SequenceNumber> snapshots_;
};
// Test param:
// bool: whether to pass snapshot_checker to compaction iterator.
class CompactionIteratorTest : public testing::TestWithParam<bool> {
public:
CompactionIteratorTest()
: cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {}
void InitIterators(const std::vector<std::string>& ks,
const std::vector<std::string>& vs,
const std::vector<std::string>& range_del_ks,
const std::vector<std::string>& range_del_vs,
SequenceNumber last_sequence,
MergeOperator* merge_op = nullptr,
CompactionFilter* filter = nullptr) {
void InitIterators(
const std::vector<std::string>& ks, const std::vector<std::string>& vs,
const std::vector<std::string>& range_del_ks,
const std::vector<std::string>& range_del_vs,
SequenceNumber last_sequence,
SequenceNumber last_committed_sequence = kMaxSequenceNumber,
MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr,
bool bottommost_level = false,
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) {
std::unique_ptr<InternalIterator> range_del_iter(
new test::VectorIterator(range_del_ks, range_del_vs));
range_del_agg_.reset(new RangeDelAggregator(icmp_, snapshots_));
ASSERT_OK(range_del_agg_->AddTombstones(std::move(range_del_iter)));
std::unique_ptr<CompactionIterator::CompactionProxy> compaction;
if (filter) {
if (filter || bottommost_level) {
compaction_proxy_ = new FakeCompaction();
compaction_proxy_->is_bottommost_level = bottommost_level;
compaction.reset(compaction_proxy_);
}
// TODO(yiwu) add a mock snapshot checker and add test for it.
SnapshotChecker* snapshot_checker = nullptr;
merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, merge_op, filter,
nullptr, false, 0, 0, nullptr,
&shutting_down_));
bool use_snapshot_checker = UseSnapshotChecker() || GetParam();
if (use_snapshot_checker || last_committed_sequence < kMaxSequenceNumber) {
snapshot_checker_.reset(
new TestSnapshotChecker(last_committed_sequence, snapshot_map_));
}
iter_.reset(new LoggingForwardVectorIterator(ks, vs));
iter_->SeekToFirst();
c_iter_.reset(new CompactionIterator(
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
kMaxSequenceNumber, snapshot_checker, Env::Default(), false,
range_del_agg_.get(), std::move(compaction), filter, nullptr,
&shutting_down_));
earliest_write_conflict_snapshot, snapshot_checker_.get(),
Env::Default(), false, range_del_agg_.get(), std::move(compaction),
filter, nullptr, &shutting_down_));
}
void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); }
void AddSnapshot(SequenceNumber snapshot,
SequenceNumber last_visible_seq = kMaxSequenceNumber) {
snapshots_.push_back(snapshot);
snapshot_map_[snapshot] = last_visible_seq;
}
virtual bool UseSnapshotChecker() const { return false; }
void RunTest(
const std::vector<std::string>& input_keys,
const std::vector<std::string>& input_values,
const std::vector<std::string>& expected_keys,
const std::vector<std::string>& expected_values,
SequenceNumber last_committed_seq = kMaxSequenceNumber,
MergeOperator* merge_operator = nullptr,
CompactionFilter* compaction_filter = nullptr,
bool bottommost_level = false,
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) {
InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber,
last_committed_seq, merge_operator, compaction_filter,
bottommost_level, earliest_write_conflict_snapshot);
c_iter_->SeekToFirst();
for (size_t i = 0; i < expected_keys.size(); i++) {
std::string info = "i = " + ToString(i);
ASSERT_TRUE(c_iter_->Valid()) << info;
ASSERT_OK(c_iter_->status()) << info;
ASSERT_EQ(expected_keys[i], c_iter_->key().ToString()) << info;
ASSERT_EQ(expected_values[i], c_iter_->value().ToString()) << info;
c_iter_->Next();
}
ASSERT_FALSE(c_iter_->Valid());
}
const Comparator* cmp_;
const InternalKeyComparator icmp_;
std::vector<SequenceNumber> snapshots_;
// A map of valid snapshot to last visible sequence to the snapshot.
std::unordered_map<SequenceNumber, SequenceNumber> snapshot_map_;
std::unique_ptr<MergeHelper> merge_helper_;
std::unique_ptr<LoggingForwardVectorIterator> iter_;
std::unique_ptr<CompactionIterator> c_iter_;
std::unique_ptr<RangeDelAggregator> range_del_agg_;
std::unique_ptr<SnapshotChecker> snapshot_checker_;
std::atomic<bool> shutting_down_{false};
FakeCompaction* compaction_proxy_;
};
// It is possible that the output of the compaction iterator is empty even if
// the input is not.
TEST_F(CompactionIteratorTest, EmptyResult) {
TEST_P(CompactionIteratorTest, EmptyResult) {
InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
test::KeyStr("a", 3, kTypeValue)},
{"", "val"}, {}, {}, 5);
@ -223,7 +306,7 @@ TEST_F(CompactionIteratorTest, EmptyResult) {
// If there is a corruption after a single deletion, the corrupted key should
// be preserved.
TEST_F(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
TEST_P(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
test::KeyStr("a", 3, kTypeValue, true),
test::KeyStr("b", 10, kTypeValue)},
@ -242,7 +325,7 @@ TEST_F(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
ASSERT_FALSE(c_iter_->Valid());
}
TEST_F(CompactionIteratorTest, SimpleRangeDeletion) {
TEST_P(CompactionIteratorTest, SimpleRangeDeletion) {
InitIterators({test::KeyStr("morning", 5, kTypeValue),
test::KeyStr("morning", 2, kTypeValue),
test::KeyStr("night", 3, kTypeValue)},
@ -258,7 +341,7 @@ TEST_F(CompactionIteratorTest, SimpleRangeDeletion) {
ASSERT_FALSE(c_iter_->Valid());
}
TEST_F(CompactionIteratorTest, RangeDeletionWithSnapshots) {
TEST_P(CompactionIteratorTest, RangeDeletionWithSnapshots) {
AddSnapshot(10);
std::vector<std::string> ks1;
ks1.push_back(test::KeyStr("ma", 28, kTypeRangeDeletion));
@ -279,7 +362,7 @@ TEST_F(CompactionIteratorTest, RangeDeletionWithSnapshots) {
ASSERT_FALSE(c_iter_->Valid());
}
TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) {
TEST_P(CompactionIteratorTest, CompactionFilterSkipUntil) {
class Filter : public CompactionFilter {
virtual Decision FilterV2(int level, const Slice& key, ValueType t,
const Slice& existing_value,
@ -354,7 +437,7 @@ TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) {
test::KeyStr("j", 99, kTypeValue)},
{"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30",
"fv25", "gv90", "hv91", "im95", "jv99"},
{}, {}, kMaxSequenceNumber, &merge_op, &filter);
{}, {}, kMaxSequenceNumber, kMaxSequenceNumber, &merge_op, &filter);
// Compaction should output just "a", "e" and "h" keys.
c_iter_->SeekToFirst();
@ -389,13 +472,14 @@ TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) {
ASSERT_EQ(expected_actions, iter_->log);
}
TEST_F(CompactionIteratorTest, ShuttingDownInFilter) {
TEST_P(CompactionIteratorTest, ShuttingDownInFilter) {
NoMergingMergeOp merge_op;
StallingFilter filter;
InitIterators(
{test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeValue),
test::KeyStr("3", 3, kTypeValue), test::KeyStr("4", 4, kTypeValue)},
{"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, &merge_op, &filter);
{"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
&merge_op, &filter);
// Don't leave tombstones (kTypeDeletion) for filtered keys.
compaction_proxy_->key_not_exists_beyond_output_level = true;
@ -426,13 +510,14 @@ TEST_F(CompactionIteratorTest, ShuttingDownInFilter) {
// Same as ShuttingDownInFilter, but shutdown happens during filter call for
// a merge operand, not for a value.
TEST_F(CompactionIteratorTest, ShuttingDownInMerge) {
TEST_P(CompactionIteratorTest, ShuttingDownInMerge) {
NoMergingMergeOp merge_op;
StallingFilter filter;
InitIterators(
{test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeMerge),
test::KeyStr("3", 3, kTypeMerge), test::KeyStr("4", 4, kTypeValue)},
{"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, &merge_op, &filter);
{"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
&merge_op, &filter);
compaction_proxy_->key_not_exists_beyond_output_level = true;
std::atomic<bool> seek_done{false};
@ -460,7 +545,7 @@ TEST_F(CompactionIteratorTest, ShuttingDownInMerge) {
EXPECT_EQ(2, filter.last_seen.load());
}
TEST_F(CompactionIteratorTest, SingleMergeOperand) {
TEST_P(CompactionIteratorTest, SingleMergeOperand) {
class Filter : public CompactionFilter {
virtual Decision FilterV2(int level, const Slice& key, ValueType t,
const Slice& existing_value,
@ -552,7 +637,7 @@ TEST_F(CompactionIteratorTest, SingleMergeOperand) {
// c should invoke FullMerge due to kTypeValue at the beginning.
test::KeyStr("c", 90, kTypeMerge), test::KeyStr("c", 80, kTypeValue)},
{"av1", "bv2", "bv1", "cv2", "cv1"}, {}, {}, kMaxSequenceNumber,
&merge_op, &filter);
kMaxSequenceNumber, &merge_op, &filter);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
@ -565,6 +650,300 @@ TEST_F(CompactionIteratorTest, SingleMergeOperand) {
ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
}
// In bottommost level, values earlier than earliest snapshot can be output
// with sequence = 0.
TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
AddSnapshot(1);
RunTest({test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
{"v1", "v2"},
{test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
{"v1", "v2"}, kMaxSequenceNumber /*last_commited_seq*/,
nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
true /*bottommost_level*/);
}
// In bottommost level, deletions earlier than earliest snapshot can be removed
// permanently.
TEST_P(CompactionIteratorTest, RemoveDeletionAtBottomLevel) {
AddSnapshot(1);
RunTest({test::KeyStr("a", 1, kTypeDeletion),
test::KeyStr("b", 2, kTypeDeletion)},
{"", ""}, {test::KeyStr("b", 2, kTypeDeletion)}, {""},
kMaxSequenceNumber /*last_commited_seq*/, nullptr /*merge_operator*/,
nullptr /*compaction_filter*/, true /*bottommost_level*/);
}
// In bottommost level, single deletions earlier than earliest snapshot can be
// removed permanently.
TEST_P(CompactionIteratorTest, RemoveSingleDeletionAtBottomLevel) {
AddSnapshot(1);
RunTest({test::KeyStr("a", 1, kTypeSingleDeletion),
test::KeyStr("b", 2, kTypeSingleDeletion)},
{"", ""}, {test::KeyStr("b", 2, kTypeSingleDeletion)}, {""},
kMaxSequenceNumber /*last_commited_seq*/, nullptr /*merge_operator*/,
nullptr /*compaction_filter*/, true /*bottommost_level*/);
}
INSTANTIATE_TEST_CASE_P(CompactionIteratorTestInstance, CompactionIteratorTest,
testing::Values(true, false));
// Tests how CompactionIterator work together with SnapshotChecker.
class CompactionIteratorWithSnapshotCheckerTest
: public CompactionIteratorTest {
public:
bool UseSnapshotChecker() const override { return true; }
};
// Uncommitted keys (keys with seq > last_committed_seq) should be output as-is
// while committed version of these keys should get compacted as usual.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
PreserveUncommittedKeys_Value) {
RunTest(
{test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue),
test::KeyStr("foo", 1, kTypeValue)},
{"v3", "v2", "v1"},
{test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue)},
{"v3", "v2"}, 2 /*last_committed_seq*/);
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
PreserveUncommittedKeys_Deletion) {
RunTest({test::KeyStr("foo", 2, kTypeDeletion),
test::KeyStr("foo", 1, kTypeValue)},
{"", "v1"},
{test::KeyStr("foo", 2, kTypeDeletion),
test::KeyStr("foo", 1, kTypeValue)},
{"", "v1"}, 1 /*last_committed_seq*/);
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
PreserveUncommittedKeys_Merge) {
auto merge_op = MergeOperators::CreateStringAppendOperator();
RunTest(
{test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
test::KeyStr("foo", 1, kTypeValue)},
{"v3", "v2", "v1"},
{test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeValue)},
{"v3", "v1,v2"}, 2 /*last_committed_seq*/, merge_op.get());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
PreserveUncommittedKeys_SingleDelete) {
RunTest({test::KeyStr("foo", 2, kTypeSingleDeletion),
test::KeyStr("foo", 1, kTypeValue)},
{"", "v1"},
{test::KeyStr("foo", 2, kTypeSingleDeletion),
test::KeyStr("foo", 1, kTypeValue)},
{"", "v1"}, 1 /*last_committed_seq*/);
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
PreserveUncommittedKeys_BlobIndex) {
RunTest({test::KeyStr("foo", 3, kTypeBlobIndex),
test::KeyStr("foo", 2, kTypeBlobIndex),
test::KeyStr("foo", 1, kTypeBlobIndex)},
{"v3", "v2", "v1"},
{test::KeyStr("foo", 3, kTypeBlobIndex),
test::KeyStr("foo", 2, kTypeBlobIndex)},
{"v3", "v2"}, 2 /*last_committed_seq*/);
}
// Test compaction iterator dedup keys visible to the same snapshot.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Value) {
AddSnapshot(2, 1);
RunTest(
{test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
{"v4", "v3", "v2", "v1"},
{test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
test::KeyStr("foo", 1, kTypeValue)},
{"v4", "v3", "v1"}, 3 /*last_committed_seq*/);
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Deletion) {
AddSnapshot(2, 1);
RunTest(
{test::KeyStr("foo", 4, kTypeValue),
test::KeyStr("foo", 3, kTypeDeletion),
test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
{"v4", "", "v2", "v1"},
{test::KeyStr("foo", 4, kTypeValue),
test::KeyStr("foo", 3, kTypeDeletion),
test::KeyStr("foo", 1, kTypeValue)},
{"v4", "", "v1"}, 3 /*last_committed_seq*/);
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
DISABLED_DedupSameSnapshot_Merge) {
AddSnapshot(2, 1);
auto merge_op = MergeOperators::CreateStringAppendOperator();
RunTest(
{test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeMerge),
test::KeyStr("foo", 2, kTypeMerge), test::KeyStr("foo", 1, kTypeValue)},
{"v4", "v3", "v2", "v1"},
{test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeMerge),
test::KeyStr("foo", 1, kTypeValue)},
{"v4", "v2,v3", "v1"}, 3 /*last_committed_seq*/, merge_op.get());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
DedupSameSnapshot_SingleDeletion) {
AddSnapshot(2, 1);
RunTest(
{test::KeyStr("foo", 4, kTypeValue),
test::KeyStr("foo", 3, kTypeSingleDeletion),
test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
{"v4", "", "v2", "v1"},
{test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
{"v4", "v1"}, 3 /*last_committed_seq*/);
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_BlobIndex) {
AddSnapshot(2, 1);
RunTest({test::KeyStr("foo", 4, kTypeBlobIndex),
test::KeyStr("foo", 3, kTypeBlobIndex),
test::KeyStr("foo", 2, kTypeBlobIndex),
test::KeyStr("foo", 1, kTypeBlobIndex)},
{"v4", "v3", "v2", "v1"},
{test::KeyStr("foo", 4, kTypeBlobIndex),
test::KeyStr("foo", 3, kTypeBlobIndex),
test::KeyStr("foo", 1, kTypeBlobIndex)},
{"v4", "v3", "v1"}, 3 /*last_committed_seq*/);
}
// At bottom level, sequence numbers can be zero out, and deletions can be
// removed, but only when they are visible to earliest snapshot.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
NotZeroOutSequenceIfNotVisibleToEarliestSnapshot) {
AddSnapshot(2, 1);
RunTest({test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue),
test::KeyStr("c", 3, kTypeValue)},
{"v1", "v2", "v3"},
{test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue),
test::KeyStr("c", 3, kTypeValue)},
{"v1", "v2", "v3"}, kMaxSequenceNumber /*last_commited_seq*/,
nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
true /*bottommost_level*/);
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
NotRemoveDeletionIfNotVisibleToEarliestSnapshot) {
AddSnapshot(2, 1);
RunTest(
{test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 2, kTypeDeletion),
test::KeyStr("c", 3, kTypeDeletion)},
{"", "", ""},
{test::KeyStr("b", 2, kTypeDeletion),
test::KeyStr("c", 3, kTypeDeletion)},
{"", ""}, kMaxSequenceNumber /*last_commited_seq*/,
nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
true /*bottommost_level*/);
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
NotRemoveSingleDeletionIfNotVisibleToEarliestSnapshot) {
AddSnapshot(2, 1);
RunTest({test::KeyStr("a", 1, kTypeSingleDeletion),
test::KeyStr("b", 2, kTypeSingleDeletion),
test::KeyStr("c", 3, kTypeSingleDeletion)},
{"", "", ""},
{test::KeyStr("b", 2, kTypeSingleDeletion),
test::KeyStr("c", 3, kTypeSingleDeletion)},
{"", ""}, kMaxSequenceNumber /*last_commited_seq*/,
nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
true /*bottommost_level*/);
}
// Single delete should not cancel out values that not visible to the
// same set of snapshots
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
SingleDeleteAcrossSnapshotBoundary) {
AddSnapshot(2, 1);
RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", "v1"},
{test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", "v1"}, 2 /*last_committed_seq*/);
}
// Single delete should be kept in case it is not visible to the
// earliest write conflict snapshot. If a single delete is kept for this reason,
// corresponding value can be trimmed to save space.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
KeepSingleDeletionForWriteConflictChecking) {
AddSnapshot(2, 0);
RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", "v1"},
{test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", ""}, 2 /*last_committed_seq*/, nullptr /*merge_operator*/,
nullptr /*compaction_filter*/, false /*bottommost_level*/,
2 /*earliest_write_conflict_snapshot*/);
}
// Compaction filter should keep uncommitted key as-is, and trigger on the
// first committed version of a key.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) {
std::unique_ptr<CompactionFilter> compaction_filter(
new FilterAllKeysCompactionFilter());
RunTest(
{test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeValue),
test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeValue)},
{"v2", "v1", "v3", "v4"},
{test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeDeletion),
test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeDeletion)},
{"v2", "", "v3", ""}, 1 /*last_committed_seq*/,
nullptr /*merge_operator*/, compaction_filter.get());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Deletion) {
std::unique_ptr<CompactionFilter> compaction_filter(
new FilterAllKeysCompactionFilter());
RunTest(
{test::KeyStr("a", 2, kTypeDeletion), test::KeyStr("a", 1, kTypeValue)},
{"", "v1"},
{test::KeyStr("a", 2, kTypeDeletion),
test::KeyStr("a", 1, kTypeDeletion)},
{"", ""}, 1 /*last_committed_seq*/, nullptr /*merge_operator*/,
compaction_filter.get());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
DISABLED_CompactionFilter_PartialMerge) {
std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendOperator();
std::unique_ptr<CompactionFilter> compaction_filter(
new FilterAllKeysCompactionFilter());
RunTest(
{test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
test::KeyStr("a", 1, kTypeMerge)},
{"v3", "v2", "v1"},
{test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeDeletion)},
{"v3", ""}, 2 /*last_committed_seq*/, merge_op.get(),
compaction_filter.get());
}
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
DISABLED_CompactionFilter_FullMerge) {
std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendOperator();
std::unique_ptr<CompactionFilter> compaction_filter(
new FilterAllKeysCompactionFilter());
RunTest(
{test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
test::KeyStr("a", 1, kTypeValue)},
{"v3", "v2", "v1"},
{test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeDeletion)},
{"v3", ""}, 2 /*last_committed_seq*/, merge_op.get(),
compaction_filter.get());
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -18,9 +18,34 @@
namespace rocksdb {
class DBIteratorTest : public DBTestBase {
// A dumb ReadCallback which saying every key is committed.
class DummyReadCallback : public ReadCallback {
bool IsCommitted(SequenceNumber /*seq*/) { return true; }
};
// Test param:
// bool: whether to pass read_callback to NewIterator().
class DBIteratorTest : public DBTestBase,
public testing::WithParamInterface<bool> {
public:
DBIteratorTest() : DBTestBase("/db_iterator_test") {}
Iterator* NewIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family = nullptr) {
if (column_family == nullptr) {
column_family = db_->DefaultColumnFamily();
}
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
SequenceNumber seq = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: db_->GetLatestSequenceNumber();
bool use_read_callback = GetParam();
ReadCallback* read_callback = use_read_callback ? &read_callback_ : nullptr;
return dbfull()->NewIteratorImpl(read_options, cfd, seq, read_callback);
}
private:
DummyReadCallback read_callback_;
};
class FlushBlockEveryKeyPolicy : public FlushBlockPolicy {
@ -51,7 +76,7 @@ class FlushBlockEveryKeyPolicyFactory : public FlushBlockPolicyFactory {
}
};
TEST_F(DBIteratorTest, IteratorProperty) {
TEST_P(DBIteratorTest, IteratorProperty) {
// The test needs to be changed if kPersistedTier is supported in iterator.
Options options = CurrentOptions();
CreateAndReopenWithCF({"pikachu"}, options);
@ -59,7 +84,7 @@ TEST_F(DBIteratorTest, IteratorProperty) {
ReadOptions ropt;
ropt.pin_data = false;
{
unique_ptr<Iterator> iter(db_->NewIterator(ropt, handles_[1]));
unique_ptr<Iterator> iter(NewIterator(ropt, handles_[1]));
iter->SeekToFirst();
std::string prop_value;
ASSERT_NOK(iter->GetProperty("non_existing.value", &prop_value));
@ -72,7 +97,7 @@ TEST_F(DBIteratorTest, IteratorProperty) {
Close();
}
TEST_F(DBIteratorTest, PersistedTierOnIterator) {
TEST_P(DBIteratorTest, PersistedTierOnIterator) {
// The test needs to be changed if kPersistedTier is supported in iterator.
Options options = CurrentOptions();
CreateAndReopenWithCF({"pikachu"}, options);
@ -88,7 +113,7 @@ TEST_F(DBIteratorTest, PersistedTierOnIterator) {
Close();
}
TEST_F(DBIteratorTest, NonBlockingIteration) {
TEST_P(DBIteratorTest, NonBlockingIteration) {
do {
ReadOptions non_blocking_opts, regular_opts;
Options options = CurrentOptions();
@ -100,7 +125,7 @@ TEST_F(DBIteratorTest, NonBlockingIteration) {
// scan using non-blocking iterator. We should find it because
// it is in memtable.
Iterator* iter = db_->NewIterator(non_blocking_opts, handles_[1]);
Iterator* iter = NewIterator(non_blocking_opts, handles_[1]);
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
@ -117,7 +142,7 @@ TEST_F(DBIteratorTest, NonBlockingIteration) {
// kvs. Neither does it do any IOs to storage.
uint64_t numopen = TestGetTickerCount(options, NO_FILE_OPENS);
uint64_t cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD);
iter = db_->NewIterator(non_blocking_opts, handles_[1]);
iter = NewIterator(non_blocking_opts, handles_[1]);
count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
count++;
@ -134,7 +159,7 @@ TEST_F(DBIteratorTest, NonBlockingIteration) {
// verify that we can find it via a non-blocking scan
numopen = TestGetTickerCount(options, NO_FILE_OPENS);
cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD);
iter = db_->NewIterator(non_blocking_opts, handles_[1]);
iter = NewIterator(non_blocking_opts, handles_[1]);
count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
@ -153,7 +178,7 @@ TEST_F(DBIteratorTest, NonBlockingIteration) {
}
#ifndef ROCKSDB_LITE
TEST_F(DBIteratorTest, ManagedNonBlockingIteration) {
TEST_P(DBIteratorTest, ManagedNonBlockingIteration) {
do {
ReadOptions non_blocking_opts, regular_opts;
Options options = CurrentOptions();
@ -166,7 +191,7 @@ TEST_F(DBIteratorTest, ManagedNonBlockingIteration) {
// scan using non-blocking iterator. We should find it because
// it is in memtable.
Iterator* iter = db_->NewIterator(non_blocking_opts, handles_[1]);
Iterator* iter = NewIterator(non_blocking_opts, handles_[1]);
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
@ -183,7 +208,7 @@ TEST_F(DBIteratorTest, ManagedNonBlockingIteration) {
// kvs. Neither does it do any IOs to storage.
int64_t numopen = TestGetTickerCount(options, NO_FILE_OPENS);
int64_t cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD);
iter = db_->NewIterator(non_blocking_opts, handles_[1]);
iter = NewIterator(non_blocking_opts, handles_[1]);
count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
count++;
@ -200,7 +225,7 @@ TEST_F(DBIteratorTest, ManagedNonBlockingIteration) {
// verify that we can find it via a non-blocking scan
numopen = TestGetTickerCount(options, NO_FILE_OPENS);
cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD);
iter = db_->NewIterator(non_blocking_opts, handles_[1]);
iter = NewIterator(non_blocking_opts, handles_[1]);
count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
@ -219,7 +244,7 @@ TEST_F(DBIteratorTest, ManagedNonBlockingIteration) {
}
#endif // ROCKSDB_LITE
TEST_F(DBIteratorTest, IterSeekBeforePrev) {
TEST_P(DBIteratorTest, IterSeekBeforePrev) {
ASSERT_OK(Put("a", "b"));
ASSERT_OK(Put("c", "d"));
dbfull()->Flush(FlushOptions());
@ -227,7 +252,7 @@ TEST_F(DBIteratorTest, IterSeekBeforePrev) {
ASSERT_OK(Put("1", "h"));
dbfull()->Flush(FlushOptions());
ASSERT_OK(Put("2", "j"));
auto iter = db_->NewIterator(ReadOptions());
auto iter = NewIterator(ReadOptions());
iter->Seek(Slice("c"));
iter->Prev();
iter->Seek(Slice("a"));
@ -235,7 +260,7 @@ TEST_F(DBIteratorTest, IterSeekBeforePrev) {
delete iter;
}
TEST_F(DBIteratorTest, IterSeekForPrevBeforeNext) {
TEST_P(DBIteratorTest, IterSeekForPrevBeforeNext) {
ASSERT_OK(Put("a", "b"));
ASSERT_OK(Put("c", "d"));
dbfull()->Flush(FlushOptions());
@ -243,7 +268,7 @@ TEST_F(DBIteratorTest, IterSeekForPrevBeforeNext) {
ASSERT_OK(Put("1", "h"));
dbfull()->Flush(FlushOptions());
ASSERT_OK(Put("2", "j"));
auto iter = db_->NewIterator(ReadOptions());
auto iter = NewIterator(ReadOptions());
iter->SeekForPrev(Slice("0"));
iter->Next();
iter->SeekForPrev(Slice("1"));
@ -257,7 +282,7 @@ std::string MakeLongKey(size_t length, char c) {
}
} // namespace
TEST_F(DBIteratorTest, IterLongKeys) {
TEST_P(DBIteratorTest, IterLongKeys) {
ASSERT_OK(Put(MakeLongKey(20, 0), "0"));
ASSERT_OK(Put(MakeLongKey(32, 2), "2"));
ASSERT_OK(Put("a", "b"));
@ -265,7 +290,7 @@ TEST_F(DBIteratorTest, IterLongKeys) {
ASSERT_OK(Put(MakeLongKey(50, 1), "1"));
ASSERT_OK(Put(MakeLongKey(127, 3), "3"));
ASSERT_OK(Put(MakeLongKey(64, 4), "4"));
auto iter = db_->NewIterator(ReadOptions());
auto iter = NewIterator(ReadOptions());
// Create a key that needs to be skipped for Seq too new
iter->Seek(MakeLongKey(20, 0));
@ -287,7 +312,7 @@ TEST_F(DBIteratorTest, IterLongKeys) {
ASSERT_EQ(IterStatus(iter), MakeLongKey(50, 1) + "->1");
delete iter;
iter = db_->NewIterator(ReadOptions());
iter = NewIterator(ReadOptions());
iter->Seek(MakeLongKey(50, 1));
ASSERT_EQ(IterStatus(iter), MakeLongKey(50, 1) + "->1");
iter->Next();
@ -297,13 +322,13 @@ TEST_F(DBIteratorTest, IterLongKeys) {
delete iter;
}
TEST_F(DBIteratorTest, IterNextWithNewerSeq) {
TEST_P(DBIteratorTest, IterNextWithNewerSeq) {
ASSERT_OK(Put("0", "0"));
dbfull()->Flush(FlushOptions());
ASSERT_OK(Put("a", "b"));
ASSERT_OK(Put("c", "d"));
ASSERT_OK(Put("d", "e"));
auto iter = db_->NewIterator(ReadOptions());
auto iter = NewIterator(ReadOptions());
// Create a key that needs to be skipped for Seq too new
for (uint64_t i = 0; i < last_options_.max_sequential_skip_in_iterations + 1;
@ -323,13 +348,13 @@ TEST_F(DBIteratorTest, IterNextWithNewerSeq) {
delete iter;
}
TEST_F(DBIteratorTest, IterPrevWithNewerSeq) {
TEST_P(DBIteratorTest, IterPrevWithNewerSeq) {
ASSERT_OK(Put("0", "0"));
dbfull()->Flush(FlushOptions());
ASSERT_OK(Put("a", "b"));
ASSERT_OK(Put("c", "d"));
ASSERT_OK(Put("d", "e"));
auto iter = db_->NewIterator(ReadOptions());
auto iter = NewIterator(ReadOptions());
// Create a key that needs to be skipped for Seq too new
for (uint64_t i = 0; i < last_options_.max_sequential_skip_in_iterations + 1;
@ -354,14 +379,14 @@ TEST_F(DBIteratorTest, IterPrevWithNewerSeq) {
delete iter;
}
TEST_F(DBIteratorTest, IterPrevWithNewerSeq2) {
TEST_P(DBIteratorTest, IterPrevWithNewerSeq2) {
ASSERT_OK(Put("0", "0"));
dbfull()->Flush(FlushOptions());
ASSERT_OK(Put("a", "b"));
ASSERT_OK(Put("c", "d"));
ASSERT_OK(Put("e", "f"));
auto iter = db_->NewIterator(ReadOptions());
auto iter2 = db_->NewIterator(ReadOptions());
auto iter = NewIterator(ReadOptions());
auto iter2 = NewIterator(ReadOptions());
iter->Seek(Slice("c"));
iter2->SeekForPrev(Slice("d"));
ASSERT_EQ(IterStatus(iter), "c->d");
@ -383,10 +408,10 @@ TEST_F(DBIteratorTest, IterPrevWithNewerSeq2) {
delete iter2;
}
TEST_F(DBIteratorTest, IterEmpty) {
TEST_P(DBIteratorTest, IterEmpty) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]);
Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "(invalid)");
@ -404,11 +429,11 @@ TEST_F(DBIteratorTest, IterEmpty) {
} while (ChangeCompactOptions());
}
TEST_F(DBIteratorTest, IterSingle) {
TEST_P(DBIteratorTest, IterSingle) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ASSERT_OK(Put(1, "a", "va"));
Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]);
Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
@ -455,13 +480,13 @@ TEST_F(DBIteratorTest, IterSingle) {
} while (ChangeCompactOptions());
}
TEST_F(DBIteratorTest, IterMulti) {
TEST_P(DBIteratorTest, IterMulti) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ASSERT_OK(Put(1, "a", "va"));
ASSERT_OK(Put(1, "b", "vb"));
ASSERT_OK(Put(1, "c", "vc"));
Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]);
Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
@ -554,7 +579,7 @@ TEST_F(DBIteratorTest, IterMulti) {
// Check that we can skip over a run of user keys
// by using reseek rather than sequential scan
TEST_F(DBIteratorTest, IterReseek) {
TEST_P(DBIteratorTest, IterReseek) {
anon::OptionsOverride options_override;
options_override.skip_policy = kSkipNoSnapshot;
Options options = CurrentOptions(options_override);
@ -571,7 +596,7 @@ TEST_F(DBIteratorTest, IterReseek) {
ASSERT_OK(Put(1, "a", "one"));
ASSERT_OK(Put(1, "a", "two"));
ASSERT_OK(Put(1, "b", "bone"));
Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]);
Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
iter->SeekToFirst();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 0);
ASSERT_EQ(IterStatus(iter), "a->two");
@ -583,7 +608,7 @@ TEST_F(DBIteratorTest, IterReseek) {
// insert a total of three keys with same userkey and verify
// that reseek is still not invoked.
ASSERT_OK(Put(1, "a", "three"));
iter = db_->NewIterator(ReadOptions(), handles_[1]);
iter = NewIterator(ReadOptions(), handles_[1]);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->three");
iter->Next();
@ -594,7 +619,7 @@ TEST_F(DBIteratorTest, IterReseek) {
// insert a total of four keys with same userkey and verify
// that reseek is invoked.
ASSERT_OK(Put(1, "a", "four"));
iter = db_->NewIterator(ReadOptions(), handles_[1]);
iter = NewIterator(ReadOptions(), handles_[1]);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->four");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 0);
@ -611,7 +636,7 @@ TEST_F(DBIteratorTest, IterReseek) {
// Insert another version of b and assert that reseek is not invoked
ASSERT_OK(Put(1, "b", "btwo"));
iter = db_->NewIterator(ReadOptions(), handles_[1]);
iter = NewIterator(ReadOptions(), handles_[1]);
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "b->btwo");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION),
@ -626,7 +651,7 @@ TEST_F(DBIteratorTest, IterReseek) {
// of b and 4 versions of a.
ASSERT_OK(Put(1, "b", "bthree"));
ASSERT_OK(Put(1, "b", "bfour"));
iter = db_->NewIterator(ReadOptions(), handles_[1]);
iter = NewIterator(ReadOptions(), handles_[1]);
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "b->bfour");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION),
@ -640,7 +665,7 @@ TEST_F(DBIteratorTest, IterReseek) {
delete iter;
}
TEST_F(DBIteratorTest, IterSmallAndLargeMix) {
TEST_P(DBIteratorTest, IterSmallAndLargeMix) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ASSERT_OK(Put(1, "a", "va"));
@ -649,7 +674,7 @@ TEST_F(DBIteratorTest, IterSmallAndLargeMix) {
ASSERT_OK(Put(1, "d", std::string(100000, 'd')));
ASSERT_OK(Put(1, "e", std::string(100000, 'e')));
Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]);
Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
@ -681,7 +706,7 @@ TEST_F(DBIteratorTest, IterSmallAndLargeMix) {
} while (ChangeCompactOptions());
}
TEST_F(DBIteratorTest, IterMultiWithDelete) {
TEST_P(DBIteratorTest, IterMultiWithDelete) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ASSERT_OK(Put(1, "ka", "va"));
@ -690,7 +715,7 @@ TEST_F(DBIteratorTest, IterMultiWithDelete) {
ASSERT_OK(Delete(1, "kb"));
ASSERT_EQ("NOT_FOUND", Get(1, "kb"));
Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]);
Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
iter->Seek("kc");
ASSERT_EQ(IterStatus(iter), "kc->vc");
if (!CurrentOptions().merge_operator) {
@ -707,7 +732,7 @@ TEST_F(DBIteratorTest, IterMultiWithDelete) {
} while (ChangeOptions());
}
TEST_F(DBIteratorTest, IterPrevMaxSkip) {
TEST_P(DBIteratorTest, IterPrevMaxSkip) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
for (int i = 0; i < 2; i++) {
@ -737,7 +762,7 @@ TEST_F(DBIteratorTest, IterPrevMaxSkip) {
} while (ChangeOptions(kSkipMergePut | kSkipNoSeekToLast));
}
TEST_F(DBIteratorTest, IterWithSnapshot) {
TEST_P(DBIteratorTest, IterWithSnapshot) {
anon::OptionsOverride options_override;
options_override.skip_policy = kSkipNoSnapshot;
do {
@ -751,7 +776,7 @@ TEST_F(DBIteratorTest, IterWithSnapshot) {
const Snapshot* snapshot = db_->GetSnapshot();
ReadOptions options;
options.snapshot = snapshot;
Iterator* iter = db_->NewIterator(options, handles_[1]);
Iterator* iter = NewIterator(options, handles_[1]);
ASSERT_OK(Put(1, "key0", "val0"));
// Put more values after the snapshot
@ -804,13 +829,13 @@ TEST_F(DBIteratorTest, IterWithSnapshot) {
} while (ChangeOptions(kSkipHashCuckoo));
}
TEST_F(DBIteratorTest, IteratorPinsRef) {
TEST_P(DBIteratorTest, IteratorPinsRef) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
Put(1, "foo", "hello");
// Get iterator that will yield the current contents of the DB.
Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]);
Iterator* iter = NewIterator(ReadOptions(), handles_[1]);
// Write to force compactions
Put(1, "foo", "newvalue1");
@ -830,7 +855,7 @@ TEST_F(DBIteratorTest, IteratorPinsRef) {
} while (ChangeCompactOptions());
}
TEST_F(DBIteratorTest, DBIteratorBoundTest) {
TEST_P(DBIteratorTest, DBIteratorBoundTest) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
@ -847,7 +872,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) {
ReadOptions ro;
ro.iterate_upper_bound = nullptr;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
std::unique_ptr<Iterator> iter(NewIterator(ro));
iter->Seek("foo");
@ -884,7 +909,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) {
Slice prefix("foo2");
ro.iterate_upper_bound = &prefix;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
std::unique_ptr<Iterator> iter(NewIterator(ro));
iter->Seek("foo");
@ -906,7 +931,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) {
Slice prefix("foo");
ro.iterate_upper_bound = &prefix;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
std::unique_ptr<Iterator> iter(NewIterator(ro));
iter->SeekToLast();
ASSERT_TRUE(iter->Valid());
@ -930,7 +955,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) {
Slice upper_bound("g");
ro.iterate_upper_bound = &upper_bound;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
std::unique_ptr<Iterator> iter(NewIterator(ro));
iter->Seek("foo");
@ -963,7 +988,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) {
ReadOptions ro;
ro.iterate_upper_bound = nullptr;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
std::unique_ptr<Iterator> iter(NewIterator(ro));
iter->Seek("b");
ASSERT_TRUE(iter->Valid());
@ -983,7 +1008,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) {
Slice prefix("c");
ro.iterate_upper_bound = &prefix;
iter.reset(db_->NewIterator(ro));
iter.reset(NewIterator(ro));
get_perf_context()->Reset();
@ -1004,7 +1029,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) {
}
}
TEST_F(DBIteratorTest, DBIteratorBoundOptimizationTest) {
TEST_P(DBIteratorTest, DBIteratorBoundOptimizationTest) {
int upper_bound_hits = 0;
Options options = CurrentOptions();
rocksdb::SyncPoint::GetInstance()->SetCallBack(
@ -1032,7 +1057,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundOptimizationTest) {
ReadOptions ro;
ro.iterate_upper_bound = &ub;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
std::unique_ptr<Iterator> iter(NewIterator(ro));
iter->Seek("foo");
ASSERT_TRUE(iter->Valid());
@ -1050,7 +1075,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundOptimizationTest) {
}
// TODO(3.13): fix the issue of Seek() + Prev() which might not necessary
// return the biggest key which is smaller than the seek key.
TEST_F(DBIteratorTest, PrevAfterAndNextAfterMerge) {
TEST_P(DBIteratorTest, PrevAfterAndNextAfterMerge) {
Options options;
options.create_if_missing = true;
options.merge_operator = MergeOperators::CreatePutOperator();
@ -1063,7 +1088,7 @@ TEST_F(DBIteratorTest, PrevAfterAndNextAfterMerge) {
db_->Merge(wopts, "2", "data2");
db_->Merge(wopts, "3", "data3");
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
std::unique_ptr<Iterator> it(NewIterator(ReadOptions()));
it->Seek("2");
ASSERT_TRUE(it->Valid());
@ -1159,7 +1184,7 @@ class DBIteratorTestForPinnedData : public DBIteratorTest {
ReadOptions ro;
ro.pin_data = true;
auto iter = db_->NewIterator(ro);
auto iter = NewIterator(ro);
{
// Test Seek to random keys
@ -1251,25 +1276,25 @@ class DBIteratorTestForPinnedData : public DBIteratorTest {
}
};
TEST_F(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedNormal) {
TEST_P(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedNormal) {
PinnedDataIteratorRandomized(TestConfig::NORMAL);
}
TEST_F(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedCLoseAndOpen) {
TEST_P(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedCLoseAndOpen) {
PinnedDataIteratorRandomized(TestConfig::CLOSE_AND_OPEN);
}
TEST_F(DBIteratorTestForPinnedData,
TEST_P(DBIteratorTestForPinnedData,
PinnedDataIteratorRandomizedCompactBeforeRead) {
PinnedDataIteratorRandomized(TestConfig::COMPACT_BEFORE_READ);
}
TEST_F(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedFlush) {
TEST_P(DBIteratorTestForPinnedData, PinnedDataIteratorRandomizedFlush) {
PinnedDataIteratorRandomized(TestConfig::FLUSH_EVERY_1000);
}
#ifndef ROCKSDB_LITE
TEST_F(DBIteratorTest, PinnedDataIteratorMultipleFiles) {
TEST_P(DBIteratorTest, PinnedDataIteratorMultipleFiles) {
Options options = CurrentOptions();
BlockBasedTableOptions table_options;
table_options.use_delta_encoding = false;
@ -1318,7 +1343,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorMultipleFiles) {
ReadOptions ro;
ro.pin_data = true;
auto iter = db_->NewIterator(ro);
auto iter = NewIterator(ro);
std::vector<std::pair<Slice, std::string>> results;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@ -1340,7 +1365,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorMultipleFiles) {
}
#endif
TEST_F(DBIteratorTest, PinnedDataIteratorMergeOperator) {
TEST_P(DBIteratorTest, PinnedDataIteratorMergeOperator) {
Options options = CurrentOptions();
BlockBasedTableOptions table_options;
table_options.use_delta_encoding = false;
@ -1373,7 +1398,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorMergeOperator) {
ReadOptions ro;
ro.pin_data = true;
auto iter = db_->NewIterator(ro);
auto iter = NewIterator(ro);
std::vector<std::pair<Slice, std::string>> results;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@ -1400,7 +1425,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorMergeOperator) {
delete iter;
}
TEST_F(DBIteratorTest, PinnedDataIteratorReadAfterUpdate) {
TEST_P(DBIteratorTest, PinnedDataIteratorReadAfterUpdate) {
Options options = CurrentOptions();
BlockBasedTableOptions table_options;
table_options.use_delta_encoding = false;
@ -1420,7 +1445,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorReadAfterUpdate) {
ReadOptions ro;
ro.pin_data = true;
auto iter = db_->NewIterator(ro);
auto iter = NewIterator(ro);
// Delete 50% of the keys and update the other 50%
for (auto& kv : true_data) {
@ -1450,7 +1475,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorReadAfterUpdate) {
delete iter;
}
TEST_F(DBIteratorTest, IterSeekForPrevCrossingFiles) {
TEST_P(DBIteratorTest, IterSeekForPrevCrossingFiles) {
Options options = CurrentOptions();
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
options.disable_auto_compactions = true;
@ -1479,7 +1504,7 @@ TEST_F(DBIteratorTest, IterSeekForPrevCrossingFiles) {
MoveFilesToLevel(1);
{
ReadOptions ro;
Iterator* iter = db_->NewIterator(ro);
Iterator* iter = NewIterator(ro);
iter->SeekForPrev("a4");
ASSERT_EQ(iter->key().ToString(), "a3");
@ -1497,14 +1522,14 @@ TEST_F(DBIteratorTest, IterSeekForPrevCrossingFiles) {
{
ReadOptions ro;
ro.prefix_same_as_start = true;
Iterator* iter = db_->NewIterator(ro);
Iterator* iter = NewIterator(ro);
iter->SeekForPrev("c2");
ASSERT_TRUE(!iter->Valid());
delete iter;
}
}
TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocks) {
TEST_P(DBIteratorTest, IterPrevKeyCrossingBlocks) {
Options options = CurrentOptions();
BlockBasedTableOptions table_options;
table_options.block_size = 1; // every block will contain one entry
@ -1546,7 +1571,7 @@ TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocks) {
{
ReadOptions ro;
ro.fill_cache = false;
Iterator* iter = db_->NewIterator(ro);
Iterator* iter = NewIterator(ro);
iter->SeekToLast();
ASSERT_EQ(iter->key().ToString(), "key5");
@ -1572,7 +1597,7 @@ TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocks) {
}
}
TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocksRandomized) {
TEST_P(DBIteratorTest, IterPrevKeyCrossingBlocksRandomized) {
Options options = CurrentOptions();
options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
options.disable_auto_compactions = true;
@ -1648,7 +1673,7 @@ TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocksRandomized) {
{
ReadOptions ro;
ro.fill_cache = false;
Iterator* iter = db_->NewIterator(ro);
Iterator* iter = NewIterator(ro);
auto data_iter = true_data.rbegin();
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
@ -1664,7 +1689,7 @@ TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocksRandomized) {
{
ReadOptions ro;
ro.fill_cache = false;
Iterator* iter = db_->NewIterator(ro);
Iterator* iter = NewIterator(ro);
auto data_iter = true_data.rbegin();
int entries_right = 0;
@ -1719,7 +1744,7 @@ TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocksRandomized) {
}
}
TEST_F(DBIteratorTest, IteratorWithLocalStatistics) {
TEST_P(DBIteratorTest, IteratorWithLocalStatistics) {
Options options = CurrentOptions();
options.statistics = rocksdb::CreateDBStatistics();
DestroyAndReopen(options);
@ -1740,7 +1765,7 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) {
std::function<void()> reader_func_next = [&]() {
SetPerfLevel(kEnableCount);
get_perf_context()->Reset();
Iterator* iter = db_->NewIterator(ReadOptions());
Iterator* iter = NewIterator(ReadOptions());
iter->SeekToFirst();
// Seek will bump ITER_BYTES_READ
@ -1767,7 +1792,7 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) {
std::function<void()> reader_func_prev = [&]() {
SetPerfLevel(kEnableCount);
Iterator* iter = db_->NewIterator(ReadOptions());
Iterator* iter = NewIterator(ReadOptions());
iter->SeekToLast();
// Seek will bump ITER_BYTES_READ
@ -1813,7 +1838,7 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) {
}
TEST_F(DBIteratorTest, ReadAhead) {
TEST_P(DBIteratorTest, ReadAhead) {
Options options;
env_->count_random_reads_ = true;
options.env = env_;
@ -1850,7 +1875,7 @@ TEST_F(DBIteratorTest, ReadAhead) {
env_->random_read_bytes_counter_ = 0;
options.statistics->setTickerCount(NO_FILE_OPENS, 0);
ReadOptions read_options;
auto* iter = db_->NewIterator(read_options);
auto* iter = NewIterator(read_options);
iter->SeekToFirst();
int64_t num_file_opens = TestGetTickerCount(options, NO_FILE_OPENS);
size_t bytes_read = env_->random_read_bytes_counter_;
@ -1859,7 +1884,7 @@ TEST_F(DBIteratorTest, ReadAhead) {
env_->random_read_bytes_counter_ = 0;
options.statistics->setTickerCount(NO_FILE_OPENS, 0);
read_options.readahead_size = 1024 * 10;
iter = db_->NewIterator(read_options);
iter = NewIterator(read_options);
iter->SeekToFirst();
int64_t num_file_opens_readahead = TestGetTickerCount(options, NO_FILE_OPENS);
size_t bytes_read_readahead = env_->random_read_bytes_counter_;
@ -1869,7 +1894,7 @@ TEST_F(DBIteratorTest, ReadAhead) {
ASSERT_GT(bytes_read_readahead, read_options.readahead_size * 3);
// Verify correctness.
iter = db_->NewIterator(read_options);
iter = NewIterator(read_options);
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(value, iter->value());
@ -1886,7 +1911,7 @@ TEST_F(DBIteratorTest, ReadAhead) {
// Insert a key, create a snapshot iterator, overwrite key lots of times,
// seek to a smaller key. Expect DBIter to fall back to a seek instead of
// going through all the overwrites linearly.
TEST_F(DBIteratorTest, DBIteratorSkipRecentDuplicatesTest) {
TEST_P(DBIteratorTest, DBIteratorSkipRecentDuplicatesTest) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
@ -1901,7 +1926,7 @@ TEST_F(DBIteratorTest, DBIteratorSkipRecentDuplicatesTest) {
// Create iterator.
ReadOptions ro;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
std::unique_ptr<Iterator> iter(NewIterator(ro));
// Insert a lot.
for (int i = 0; i < 100; ++i) {
@ -1939,10 +1964,10 @@ TEST_F(DBIteratorTest, DBIteratorSkipRecentDuplicatesTest) {
NUMBER_OF_RESEEKS_IN_ITERATION));
}
TEST_F(DBIteratorTest, Refresh) {
TEST_P(DBIteratorTest, Refresh) {
ASSERT_OK(Put("x", "y"));
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
std::unique_ptr<Iterator> iter(NewIterator(ReadOptions()));
iter->Seek(Slice("a"));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Slice("x")), 0);
@ -1998,20 +2023,20 @@ TEST_F(DBIteratorTest, Refresh) {
iter.reset();
}
TEST_F(DBIteratorTest, CreationFailure) {
TEST_P(DBIteratorTest, CreationFailure) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::NewInternalIterator:StatusCallback", [](void* arg) {
*(reinterpret_cast<Status*>(arg)) = Status::Corruption("test status");
});
SyncPoint::GetInstance()->EnableProcessing();
Iterator* iter = db_->NewIterator(ReadOptions());
Iterator* iter = NewIterator(ReadOptions());
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
delete iter;
}
TEST_F(DBIteratorTest, TableFilter) {
TEST_P(DBIteratorTest, TableFilter) {
ASSERT_OK(Put("a", "1"));
dbfull()->Flush(FlushOptions());
ASSERT_OK(Put("b", "2"));
@ -2036,7 +2061,7 @@ TEST_F(DBIteratorTest, TableFilter) {
}
return true;
};
auto iter = db_->NewIterator(opts);
auto iter = NewIterator(opts);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->1");
iter->Next();
@ -2062,7 +2087,7 @@ TEST_F(DBIteratorTest, TableFilter) {
opts.table_filter = [](const TableProperties& props) {
return props.num_entries != 2;
};
auto iter = db_->NewIterator(opts);
auto iter = NewIterator(opts);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->1");
iter->Next();
@ -2077,7 +2102,7 @@ TEST_F(DBIteratorTest, TableFilter) {
}
}
TEST_F(DBIteratorTest, SkipStatistics) {
TEST_P(DBIteratorTest, SkipStatistics) {
Options options = CurrentOptions();
options.statistics = rocksdb::CreateDBStatistics();
DestroyAndReopen(options);
@ -2099,7 +2124,7 @@ TEST_F(DBIteratorTest, SkipStatistics) {
ASSERT_OK(Delete("e"));
ASSERT_OK(Delete("f"));
Iterator* iter = db_->NewIterator(ReadOptions());
Iterator* iter = NewIterator(ReadOptions());
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
@ -2110,7 +2135,7 @@ TEST_F(DBIteratorTest, SkipStatistics) {
skip_count += 8; // 3 deletes + 3 original keys + 2 lower in sequence
ASSERT_EQ(skip_count, TestGetTickerCount(options, NUMBER_ITER_SKIP));
iter = db_->NewIterator(ReadOptions());
iter = NewIterator(ReadOptions());
count = 0;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
ASSERT_OK(iter->status());
@ -2134,7 +2159,7 @@ TEST_F(DBIteratorTest, SkipStatistics) {
Slice prefix("b");
ro.iterate_upper_bound = &prefix;
iter = db_->NewIterator(ro);
iter = NewIterator(ro);
count = 0;
for(iter->Seek("aa"); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
@ -2145,7 +2170,7 @@ TEST_F(DBIteratorTest, SkipStatistics) {
skip_count += 6; // 3 deletes + 3 original keys
ASSERT_EQ(skip_count, TestGetTickerCount(options, NUMBER_ITER_SKIP));
iter = db_->NewIterator(ro);
iter = NewIterator(ro);
count = 0;
for(iter->SeekToLast(); iter->Valid(); iter->Prev()) {
ASSERT_OK(iter->status());
@ -2158,7 +2183,13 @@ TEST_F(DBIteratorTest, SkipStatistics) {
ASSERT_EQ(skip_count, TestGetTickerCount(options, NUMBER_ITER_SKIP));
}
TEST_F(DBIteratorTest, ReadCallback) {
INSTANTIATE_TEST_CASE_P(DBIteratorTestInstance, DBIteratorTest,
testing::Values(true, false));
// Tests how DBIter work with ReadCallback
class DBIteratorWithReadCallbackTest : public DBIteratorTest {};
TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) {
class TestReadCallback : public ReadCallback {
public:
explicit TestReadCallback(SequenceNumber last_visible_seq)

Loading…
Cancel
Save