You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
rocksdb/db/db_range_del_test.cc

612 lines
23 KiB

// Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"
namespace rocksdb {
// Fixture shared by the range-deletion (DeleteRange) tests in this file.
// Hands "/db_range_del_test" to DBTestBase (presumably the per-test DB path;
// confirm against DBTestBase).
class DBRangeDelTest : public DBTestBase {
 public:
  DBRangeDelTest() : DBTestBase("/db_range_del_test") {}

  // Returns a string whose bytes are the in-memory representation of `key`
  // after widening to uint64_t (host byte order, sizeof(uint64_t) bytes).
  // Tests in this file use it together with test::Uint64Comparator().
  std::string GetNumericStr(int key) {
    const uint64_t widened = static_cast<uint64_t>(key);
    std::string encoded(sizeof(widened), '\0');
    memcpy(&encoded[0], &widened, sizeof(widened));
    return encoded;
  }
};
// PlainTableFactory and NumTableFilesAtLevel() are not supported in
// ROCKSDB_LITE
#ifndef ROCKSDB_LITE
// Checks that DeleteRange() returns a NotSupported status when the DB has been
// reopened with PlainTableFactory as its table factory.
TEST_F(DBRangeDelTest, NonBlockBasedTableNotSupported) {
  Options plain_opts = CurrentOptions();
  plain_opts.table_factory.reset(new PlainTableFactory());
  plain_opts.prefix_extractor.reset(NewNoopTransform());
  plain_opts.allow_mmap_reads = true;
  plain_opts.max_sequential_skip_in_iterations = 999999;
  Reopen(plain_opts);

  const Status del_status = db_->DeleteRange(
      WriteOptions(), db_->DefaultColumnFamily(), "dr1", "dr1");
  ASSERT_TRUE(del_status.IsNotSupported());
}
// Checks that flushing after writing nothing but a range tombstone still
// leaves exactly one file at level 0.
TEST_F(DBRangeDelTest, FlushOutputHasOnlyRangeTombstones) {
  auto* default_cf = db_->DefaultColumnFamily();
  ASSERT_OK(db_->DeleteRange(WriteOptions(), default_cf, "dr1", "dr1"));
  ASSERT_OK(db_->Flush(FlushOptions()));

  const int l0_files = NumTableFilesAtLevel(0);
  ASSERT_EQ(1, l0_files);
}
// Checks that compacting an L0 file that holds only a range tombstone yields
// one L1 file while a snapshot is held, and that the tombstone is not counted
// as dropped-obsolete.
TEST_F(DBRangeDelTest, CompactionOutputHasOnlyRangeTombstone) {
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  opts.statistics = CreateDBStatistics();
  Reopen(opts);

  // snapshot protects range tombstone from dropping due to becoming obsolete.
  const Snapshot* snapshot = db_->GetSnapshot();
  // Setup statuses are asserted so a failed write/flush is reported at its
  // source instead of surfacing later as a confusing file-count mismatch.
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
  ASSERT_OK(db_->Flush(FlushOptions()));
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
  ASSERT_EQ(0, NumTableFilesAtLevel(1));

  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
                                        true /* disallow_trivial_move */));
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_EQ(1, NumTableFilesAtLevel(1));
  ASSERT_EQ(0, TestGetTickerCount(opts, COMPACTION_RANGE_DEL_DROP_OBSOLETE));
  db_->ReleaseSnapshot(snapshot);
}
TEST_F(DBRangeDelTest, CompactionOutputFilesExactlyFilled) {
  // regression test for exactly filled compaction output files. Previously
  // another file would be generated containing all range deletions, which
  // could invalidate the non-overlapping file boundary invariant.
  const int kNumPerFile = 4, kNumFiles = 2, kFileBytes = 9 << 10;
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.level0_file_num_compaction_trigger = kNumFiles;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile));
  options.num_levels = 2;
  options.target_file_size_base = kFileBytes;
  BlockBasedTableOptions table_options;
  table_options.block_size_deviation = 50;  // each block holds two keys
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  Reopen(options);

  // snapshot protects range tombstone from dropping due to becoming obsolete.
  const Snapshot* snapshot = db_->GetSnapshot();
  // Every status-returning setup call below is asserted so that a failed
  // write, flush wait or compaction is reported where it happens.
  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                             Key(0), Key(1)));

  Random rnd(301);
  for (int i = 0; i < kNumFiles; ++i) {
    std::vector<std::string> values;
    // Write 12K (4 values, each 3K)
    for (int j = 0; j < kNumPerFile; j++) {
      values.push_back(RandomString(&rnd, 3 << 10));
      ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j]));
      if (j == 0 && i > 0) {
        ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
      }
    }
  }
  // put extra key to trigger final flush
  ASSERT_OK(Put("", ""));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(kNumFiles, NumTableFilesAtLevel(0));
  ASSERT_EQ(0, NumTableFilesAtLevel(1));

  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
                                        true /* disallow_trivial_move */));
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_EQ(2, NumTableFilesAtLevel(1));
  db_->ReleaseSnapshot(snapshot);
}
// Writes two range tombstones that share the start key "a" (["a","c") then
// ["a","b")) interleaved with puts of "b1" and "b2", and checks that "b1" is
// hidden while "b2" (written after the wider tombstone) remains readable.
TEST_F(DBRangeDelTest, FlushRangeDelsSameStartKey) {
  // Put statuses are asserted: a silently failed write would make the
  // NotFound check on "b1" pass for the wrong reason.
  ASSERT_OK(db_->Put(WriteOptions(), "b1", "val"));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "c"));
  ASSERT_OK(db_->Put(WriteOptions(), "b2", "val"));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "b"));

  // first iteration verifies query correctness in memtable, second verifies
  // query correctness for a single SST file
  for (int i = 0; i < 2; ++i) {
    if (i > 0) {
      ASSERT_OK(db_->Flush(FlushOptions()));
      ASSERT_EQ(1, NumTableFilesAtLevel(0));
    }
    std::string value;
    ASSERT_TRUE(db_->Get(ReadOptions(), "b1", &value).IsNotFound());
    ASSERT_OK(db_->Get(ReadOptions(), "b2", &value));
  }
}
// Spreads a put of "b1" and two range tombstones sharing start key "a" across
// three L0 files, then checks "b1" is not found both before and after the
// files are compacted into a single L1 file.
TEST_F(DBRangeDelTest, CompactRangeDelsSameStartKey) {
  // Put/compaction statuses are asserted: a silently failed Put of "b1" would
  // make the NotFound checks below pass vacuously.
  ASSERT_OK(db_->Put(WriteOptions(), "unused",
                     "val"));  // prevents empty after compaction
  ASSERT_OK(db_->Put(WriteOptions(), "b1", "val"));
  ASSERT_OK(db_->Flush(FlushOptions()));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "c"));
  ASSERT_OK(db_->Flush(FlushOptions()));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "b"));
  ASSERT_OK(db_->Flush(FlushOptions()));
  ASSERT_EQ(3, NumTableFilesAtLevel(0));

  // First pass reads from the three L0 files, second pass reads after
  // compaction into L1.
  for (int i = 0; i < 2; ++i) {
    if (i > 0) {
      ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
                                            true /* disallow_trivial_move */));
      ASSERT_EQ(0, NumTableFilesAtLevel(0));
      ASSERT_EQ(1, NumTableFilesAtLevel(1));
    }
    std::string value;
    ASSERT_TRUE(db_->Get(ReadOptions(), "b1", &value).IsNotFound());
  }
}
#endif // ROCKSDB_LITE
// Checks that flush physically drops keys covered by a range tombstone (reads
// use ignore_range_deletions so only the flushed data is consulted), while
// keys protected by a snapshot or written after the tombstone survive.
TEST_F(DBRangeDelTest, FlushRemovesCoveredKeys) {
  const int kNum = 300, kRangeBegin = 50, kRangeEnd = 250;
  Options opts = CurrentOptions();
  opts.comparator = test::Uint64Comparator();
  Reopen(opts);

  // Write a third before snapshot, a third between snapshot and tombstone, and
  // a third after the tombstone. Keys older than snapshot or newer than the
  // tombstone should be preserved.
  const Snapshot* snapshot = nullptr;
  for (int i = 0; i < kNum; ++i) {
    if (i == kNum / 3) {
      snapshot = db_->GetSnapshot();
    } else if (i == 2 * kNum / 3) {
      ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                                 GetNumericStr(kRangeBegin),
                                 GetNumericStr(kRangeEnd)));
    }
    // Asserted so a failed write cannot masquerade as a key removed by flush.
    ASSERT_OK(db_->Put(WriteOptions(), GetNumericStr(i), "val"));
  }
  ASSERT_OK(db_->Flush(FlushOptions()));

  for (int i = 0; i < kNum; ++i) {
    ReadOptions read_opts;
    read_opts.ignore_range_deletions = true;
    std::string value;
    // The deleted range is [kRangeBegin, kRangeEnd): the end key is exclusive,
    // hence ">=" rather than ">".
    const bool outside_range = i < kRangeBegin || i >= kRangeEnd;
    const bool older_than_snapshot = i < kNum / 3;
    const bool newer_than_tombstone = i >= 2 * kNum / 3;
    if (outside_range || older_than_snapshot || newer_than_tombstone) {
      ASSERT_OK(db_->Get(read_opts, GetNumericStr(i), &value));
    } else {
      ASSERT_TRUE(db_->Get(read_opts, GetNumericStr(i), &value).IsNotFound());
    }
  }
  db_->ReleaseSnapshot(snapshot);
}
// NumTableFilesAtLevel() is not supported in ROCKSDB_LITE
#ifndef ROCKSDB_LITE
// Builds kNumFiles L0 files where each file after the first carries a range
// tombstone over the first half of the previous file's keys, compacts
// everything, and checks both the COMPACTION_KEY_DROP_RANGE_DEL ticker and
// that the covered keys are physically gone.
TEST_F(DBRangeDelTest, CompactionRemovesCoveredKeys) {
  const int kNumPerFile = 100, kNumFiles = 4;
  Options opts = CurrentOptions();
  opts.comparator = test::Uint64Comparator();
  opts.disable_auto_compactions = true;
  opts.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile));
  opts.num_levels = 2;
  opts.statistics = CreateDBStatistics();
  Reopen(opts);

  // All write/flush/compaction statuses are asserted so that a setup failure
  // is reported directly rather than as a wrong ticker or file count.
  for (int i = 0; i < kNumFiles; ++i) {
    if (i > 0) {
      // range tombstone covers first half of the previous file
      ASSERT_OK(db_->DeleteRange(
          WriteOptions(), db_->DefaultColumnFamily(),
          GetNumericStr((i - 1) * kNumPerFile),
          GetNumericStr((i - 1) * kNumPerFile + kNumPerFile / 2)));
    }
    // Make sure a given key appears in each file so compaction won't be able to
    // use trivial move, which would happen if the ranges were non-overlapping.
    // Also, we need an extra element since flush is only triggered when the
    // number of keys is one greater than SpecialSkipListFactory's limit.
    // We choose a key outside the key-range used by the test to avoid conflict.
    ASSERT_OK(db_->Put(WriteOptions(), GetNumericStr(kNumPerFile * kNumFiles),
                       "val"));
    for (int j = 0; j < kNumPerFile; ++j) {
      ASSERT_OK(
          db_->Put(WriteOptions(), GetNumericStr(i * kNumPerFile + j), "val"));
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
  }

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_GT(NumTableFilesAtLevel(1), 0);
  ASSERT_EQ((kNumFiles - 1) * kNumPerFile / 2,
            TestGetTickerCount(opts, COMPACTION_KEY_DROP_RANGE_DEL));

  for (int i = 0; i < kNumFiles; ++i) {
    for (int j = 0; j < kNumPerFile; ++j) {
      ReadOptions read_opts;
      read_opts.ignore_range_deletions = true;
      std::string value;
      // Only the first half of every file except the last was covered.
      if (i == kNumFiles - 1 || j >= kNumPerFile / 2) {
        ASSERT_OK(
            db_->Get(read_opts, GetNumericStr(i * kNumPerFile + j), &value));
      } else {
        ASSERT_TRUE(
            db_->Get(read_opts, GetNumericStr(i * kNumPerFile + j), &value)
                .IsNotFound());
      }
    }
  }
}
// Exercises leveled compaction with max_subcompactions = 4 while range
// tombstones span file boundaries; the test passes if compaction completes
// and populates L1 and L2 without tripping internal assertions.
TEST_F(DBRangeDelTest, ValidLevelSubcompactionBoundaries) {
  const int kNumPerFile = 100, kNumFiles = 4, kFileBytes = 100 << 10;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumFiles;
  options.max_bytes_for_level_base = 2 * kFileBytes;
  options.max_subcompactions = 4;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile));
  options.num_levels = 3;
  options.target_file_size_base = kFileBytes;
  options.target_file_size_multiplier = 1;
  Reopen(options);

  Random rnd(301);
  for (int i = 0; i < 2; ++i) {
    for (int j = 0; j < kNumFiles; ++j) {
      if (i > 0) {
        // delete [95,105) in two files, [295,305) in next two
        int mid = (j + (1 - j % 2)) * kNumPerFile;
        // Asserted so a rejected tombstone does not silently turn this into a
        // test without range deletions.
        ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                                   Key(mid - 5), Key(mid + 5)));
      }
      std::vector<std::string> values;
      // Write 100KB (100 values, each 1K)
      for (int k = 0; k < kNumPerFile; k++) {
        values.push_back(RandomString(&rnd, 990));
        ASSERT_OK(Put(Key(j * kNumPerFile + k), values[k]));
      }
      // put extra key to trigger flush
      ASSERT_OK(Put("", ""));
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
      if (j < kNumFiles - 1) {
        // background compaction may happen early for kNumFiles'th file
        ASSERT_EQ(NumTableFilesAtLevel(0), j + 1);
      }
      if (j == options.level0_file_num_compaction_trigger - 1) {
        // When i == 1, compaction will output some files to L1, at which point
        // L1 is not bottommost so range deletions cannot be compacted away. The
        // new L1 files must be generated with non-overlapping key ranges even
        // though multiple subcompactions see the same ranges deleted, else an
        // assertion will fail.
        ASSERT_OK(dbfull()->TEST_WaitForCompact());
        ASSERT_EQ(NumTableFilesAtLevel(0), 0);
        ASSERT_GT(NumTableFilesAtLevel(1), 0);
        ASSERT_GT(NumTableFilesAtLevel(2), 0);
      }
    }
  }
}
// Exercises universal compaction with max_subcompactions = 4 and range
// tombstones spanning file boundaries, finishing with a manual L1->L2
// compaction that must succeed.
TEST_F(DBRangeDelTest, ValidUniversalSubcompactionBoundaries) {
  const int kNumPerFile = 100, kFilesPerLevel = 4, kNumLevels = 4;
  Options options = CurrentOptions();
  options.compaction_options_universal.min_merge_width = kFilesPerLevel;
  options.compaction_options_universal.max_merge_width = kFilesPerLevel;
  options.compaction_options_universal.size_ratio = 10;
  options.compaction_style = kCompactionStyleUniversal;
  options.level0_file_num_compaction_trigger = kFilesPerLevel;
  options.max_subcompactions = 4;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile));
  options.num_levels = kNumLevels;
  options.target_file_size_base = kNumPerFile << 10;
  options.target_file_size_multiplier = 1;
  Reopen(options);

  Random rnd(301);
  for (int i = 0; i < kNumLevels - 1; ++i) {
    for (int j = 0; j < kFilesPerLevel; ++j) {
      if (i == kNumLevels - 2) {
        // insert range deletions [95,105) in two files, [295,305) in next two
        // to prepare L1 for later manual compaction.
        int mid = (j + (1 - j % 2)) * kNumPerFile;
        // Asserted so a rejected tombstone does not silently turn this into a
        // test without range deletions.
        ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                                   Key(mid - 5), Key(mid + 5)));
      }
      std::vector<std::string> values;
      // Write 100KB (100 values, each 1K)
      for (int k = 0; k < kNumPerFile; k++) {
        values.push_back(RandomString(&rnd, 990));
        ASSERT_OK(Put(Key(j * kNumPerFile + k), values[k]));
      }
      // put extra key to trigger flush
      ASSERT_OK(Put("", ""));
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
      if (j < kFilesPerLevel - 1) {
        // background compaction may happen early for kFilesPerLevel'th file
        ASSERT_EQ(NumTableFilesAtLevel(0), j + 1);
      }
    }
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    ASSERT_EQ(NumTableFilesAtLevel(0), 0);
    ASSERT_GT(NumTableFilesAtLevel(kNumLevels - 1 - i), kFilesPerLevel - 1);
  }
  // Now L1-L3 are full, when we compact L1->L2 we should see (1) subcompactions
  // happen since input level > 0; (2) range deletions are not dropped since
  // output level is not bottommost. If no file boundary assertion fails, that
  // probably means universal compaction + subcompaction + range deletion are
  // compatible.
  ASSERT_OK(dbfull()->RunManualCompaction(
      reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
          ->cfd(),
      1 /* input_level */, 2 /* output_level */, 0 /* output_path_id */,
      nullptr /* begin */, nullptr /* end */, true /* exclusive */,
      true /* disallow_trivial_move */));
}
#endif // ROCKSDB_LITE
// Checks that compaction drops merge operands covered by a range tombstone:
// with ignore_range_deletions set, the merged value of "key" is the sum of all
// operands before compaction and only the post-tombstone operands afterwards.
TEST_F(DBRangeDelTest, CompactionRemovesCoveredMergeOperands) {
  const int kNumPerFile = 3, kNumFiles = 3;
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  opts.memtable_factory.reset(new SpecialSkipListFactory(2 * kNumPerFile));
  opts.merge_operator = MergeOperators::CreateUInt64AddOperator();
  opts.num_levels = 2;
  Reopen(opts);

  // Iterates kNumFiles * kNumPerFile + 1 times since flushing the last file
  // requires an extra entry.
  for (int i = 0; i <= kNumFiles * kNumPerFile; ++i) {
    if (i % kNumPerFile == 0 && i / kNumPerFile == kNumFiles - 1) {
      // Delete merge operands from all but the last file
      ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                                 "key", "key_"));
    }
    std::string val;
    PutFixed64(&val, i);
    // Asserted: a dropped operand would change the expected sums below.
    ASSERT_OK(db_->Merge(WriteOptions(), "key", val));
    // we need to prevent trivial move using Puts so compaction will actually
    // process the merge operands.
    ASSERT_OK(db_->Put(WriteOptions(), "prevent_trivial_move", ""));
    if (i > 0 && i % kNumPerFile == 0) {
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    }
  }

  ReadOptions read_opts;
  read_opts.ignore_range_deletions = true;
  std::string expected, actual;
  ASSERT_OK(db_->Get(read_opts, "key", &actual));
  PutFixed64(&expected, 45);  // 0+1+...+9
  ASSERT_EQ(expected, actual);

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  expected.clear();
  ASSERT_OK(db_->Get(read_opts, "key", &actual));
  PutFixed64(&expected, 30);  // 6+7+8+9 (earlier operands covered by tombstone)
  ASSERT_EQ(expected, actual);
}
// NumTableFilesAtLevel() is not supported in ROCKSDB_LITE
#ifndef ROCKSDB_LITE
TEST_F(DBRangeDelTest, ObsoleteTombstoneCleanup) {
  // During compaction to bottommost level, verify range tombstones older than
  // the oldest snapshot are removed, while others are preserved.
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  opts.num_levels = 2;
  opts.statistics = CreateDBStatistics();
  Reopen(opts);

  // Write/flush/compaction statuses are asserted so the ticker check at the
  // end cannot be skewed by a silently failed setup step.
  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1",
                             "dr1"));  // obsolete after compaction
  ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
  ASSERT_OK(db_->Flush(FlushOptions()));

  const Snapshot* snapshot = db_->GetSnapshot();
  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr2",
                             "dr2"));  // protected by snapshot
  ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
  ASSERT_OK(db_->Flush(FlushOptions()));
  ASSERT_EQ(2, NumTableFilesAtLevel(0));
  ASSERT_EQ(0, NumTableFilesAtLevel(1));

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_EQ(1, NumTableFilesAtLevel(1));
  // Exactly one tombstone (the pre-snapshot one) is expected to be dropped.
  ASSERT_EQ(1, TestGetTickerCount(opts, COMPACTION_RANGE_DEL_DROP_OBSOLETE));
  db_->ReleaseSnapshot(snapshot);
}
#endif // ROCKSDB_LITE
// Checks that Get() reports NotFound for a key that is covered by a later
// range tombstone when neither has been flushed.
TEST_F(DBRangeDelTest, GetCoveredKeyFromMutableMemtable) {
  // Asserted: if the Put failed, the NotFound check below would pass without
  // exercising the tombstone at all.
  ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));

  ReadOptions read_opts;
  std::string value;
  ASSERT_TRUE(db_->Get(read_opts, "key", &value).IsNotFound());
}
// Checks that Get() reports NotFound for a covered key when the key and its
// tombstone have been pushed out of the active memtable by a later write.
TEST_F(DBRangeDelTest, GetCoveredKeyFromImmutableMemtable) {
  Options opts = CurrentOptions();
  opts.max_write_buffer_number = 3;
  opts.min_write_buffer_number_to_merge = 2;
  // SpecialSkipListFactory lets us specify maximum number of elements the
  // memtable can hold. It switches the active memtable to immutable (flush is
  // prevented by the above options) upon inserting an element that would
  // overflow the memtable.
  opts.memtable_factory.reset(new SpecialSkipListFactory(1));
  Reopen(opts);

  // Put statuses are asserted: a failed write of "key" would make the
  // NotFound check pass vacuously, and a failed write of "blah" would skip
  // the memtable switch this test relies on.
  ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
  ASSERT_OK(db_->Put(WriteOptions(), "blah", "val"));

  ReadOptions read_opts;
  std::string value;
  ASSERT_TRUE(db_->Get(read_opts, "key", &value).IsNotFound());
}
// Checks that Get() reports NotFound for a covered key after the key and its
// range tombstone have been flushed.
TEST_F(DBRangeDelTest, GetCoveredKeyFromSst) {
  // Asserted: if the Put failed, the NotFound check below would pass without
  // exercising the tombstone at all.
  ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
  // snapshot prevents key from being deleted during flush
  const Snapshot* snapshot = db_->GetSnapshot();
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
  ASSERT_OK(db_->Flush(FlushOptions()));

  ReadOptions read_opts;
  std::string value;
  ASSERT_TRUE(db_->Get(read_opts, "key", &value).IsNotFound());
  db_->ReleaseSnapshot(snapshot);
}
// Checks that Get() with ReadOptions::ignore_range_deletions returns keys even
// though each is covered by a range tombstone. The key names suggest one key
// each is meant to live in an SST file, an immutable memtable and the active
// memtable (NOTE(review): placement depends on SpecialSkipListFactory(1)
// behavior — confirm).
TEST_F(DBRangeDelTest, GetIgnoresRangeDeletions) {
  Options opts = CurrentOptions();
  opts.max_write_buffer_number = 4;
  opts.min_write_buffer_number_to_merge = 3;
  opts.memtable_factory.reset(new SpecialSkipListFactory(1));
  Reopen(opts);

  // Put statuses are asserted so a failed write is reported here rather than
  // as an unexplained Get failure further down.
  ASSERT_OK(db_->Put(WriteOptions(), "sst_key", "val"));
  // snapshot prevents key from being deleted during flush
  const Snapshot* snapshot = db_->GetSnapshot();
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
  ASSERT_OK(db_->Flush(FlushOptions()));
  ASSERT_OK(db_->Put(WriteOptions(), "imm_key", "val"));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
  ASSERT_OK(db_->Put(WriteOptions(), "mem_key", "val"));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));

  ReadOptions read_opts;
  read_opts.ignore_range_deletions = true;
  // Iterate over the literals directly; no per-key std::string copy is needed.
  for (const char* key : {"sst_key", "imm_key", "mem_key"}) {
    std::string value;
    ASSERT_OK(db_->Get(read_opts, key, &value));
  }
  db_->ReleaseSnapshot(snapshot);
}
// Checks that a forward iterator skips exactly the keys covered by a range
// tombstone: keys in [kRangeBegin, kRangeEnd) written before the tombstone.
TEST_F(DBRangeDelTest, IteratorRemovesCoveredKeys) {
  const int kNum = 200, kRangeBegin = 50, kRangeEnd = 150, kNumPerFile = 25;
  Options opts = CurrentOptions();
  opts.comparator = test::Uint64Comparator();
  opts.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile));
  Reopen(opts);

  // Write half of the keys before the tombstone and half after the tombstone.
  // Only covered keys (i.e., within the range and older than the tombstone)
  // should be deleted.
  for (int i = 0; i < kNum; ++i) {
    if (i == kNum / 2) {
      ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                                 GetNumericStr(kRangeBegin),
                                 GetNumericStr(kRangeEnd)));
    }
    // Asserted so a failed write is not mistaken for a key hidden by the
    // tombstone.
    ASSERT_OK(db_->Put(WriteOptions(), GetNumericStr(i), "val"));
  }

  ReadOptions read_opts;
  auto* iter = db_->NewIterator(read_opts);
  int expected = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ASSERT_EQ(GetNumericStr(expected), iter->key());
    if (expected == kRangeBegin - 1) {
      // Keys [kRangeBegin, kNum / 2) are covered; the next visible key is the
      // first one written after the tombstone.
      expected = kNum / 2;
    } else {
      ++expected;
    }
  }
  // Surface an iterator error directly instead of only as a count mismatch.
  ASSERT_OK(iter->status());
  ASSERT_EQ(kNum, expected);
  delete iter;
}
// Checks that an iterator reading at a snapshot taken just before a range
// tombstone sees every key written before that snapshot, unaffected by the
// later tombstone.
TEST_F(DBRangeDelTest, IteratorOverUserSnapshot) {
  const int kNum = 200, kRangeBegin = 50, kRangeEnd = 150, kNumPerFile = 25;
  Options opts = CurrentOptions();
  opts.comparator = test::Uint64Comparator();
  opts.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile));
  Reopen(opts);

  const Snapshot* snapshot = nullptr;
  // Put a snapshot before the range tombstone, verify an iterator using that
  // snapshot sees all inserted keys.
  for (int i = 0; i < kNum; ++i) {
    if (i == kNum / 2) {
      snapshot = db_->GetSnapshot();
      ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                                 GetNumericStr(kRangeBegin),
                                 GetNumericStr(kRangeEnd)));
    }
    // Asserted so a failed write is reported here rather than as a key
    // mismatch during iteration.
    ASSERT_OK(db_->Put(WriteOptions(), GetNumericStr(i), "val"));
  }

  ReadOptions read_opts;
  read_opts.snapshot = snapshot;
  auto* iter = db_->NewIterator(read_opts);
  int expected = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ASSERT_EQ(GetNumericStr(expected), iter->key());
    ++expected;
  }
  // Surface an iterator error directly instead of only as a count mismatch.
  ASSERT_OK(iter->status());
  // Only the first half of the keys existed when the snapshot was taken.
  ASSERT_EQ(kNum / 2, expected);
  delete iter;
  db_->ReleaseSnapshot(snapshot);
}
// Checks that an iterator created with ReadOptions::ignore_range_deletions
// yields all three keys, in sorted order, even though each is covered by a
// range tombstone.
TEST_F(DBRangeDelTest, IteratorIgnoresRangeDeletions) {
  Options opts = CurrentOptions();
  opts.max_write_buffer_number = 4;
  opts.min_write_buffer_number_to_merge = 3;
  opts.memtable_factory.reset(new SpecialSkipListFactory(1));
  Reopen(opts);

  // Put statuses are asserted so a failed write is reported here rather than
  // as a missing key during iteration.
  ASSERT_OK(db_->Put(WriteOptions(), "sst_key", "val"));
  // snapshot prevents key from being deleted during flush
  const Snapshot* snapshot = db_->GetSnapshot();
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
  ASSERT_OK(db_->Flush(FlushOptions()));
  ASSERT_OK(db_->Put(WriteOptions(), "imm_key", "val"));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
  ASSERT_OK(db_->Put(WriteOptions(), "mem_key", "val"));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));

  ReadOptions read_opts;
  read_opts.ignore_range_deletions = true;
  auto* iter = db_->NewIterator(read_opts);
  const int kNumExpected = 3;
  const std::string expected[kNumExpected] = {"imm_key", "mem_key", "sst_key"};
  int i = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++i) {
    // Guard the array access: an unexpected extra key must fail the test
    // rather than read past the end of `expected`.
    ASSERT_LT(i, kNumExpected);
    ASSERT_EQ(expected[i], iter->key());
  }
  // Surface an iterator error directly instead of only as a count mismatch.
  ASSERT_OK(iter->status());
  ASSERT_EQ(kNumExpected, i);
  delete iter;
  db_->ReleaseSnapshot(snapshot);
}
} // namespace rocksdb
// Test binary entry point: installs the stack trace handler, initializes
// GoogleTest with the command-line arguments, and returns the result of
// running all registered tests.
int main(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}