diff --git a/CMakeLists.txt b/CMakeLists.txt index 870c9c3a8..dd2d689ef 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -258,6 +258,7 @@ set(TESTS db/db_test.cc db/db_compaction_filter_test.cc db/db_dynamic_level_test.cc + db/db_tailing_iter_test.cc db/dbformat_test.cc db/deletefile_test.cc db/fault_injection_test.cc diff --git a/Makefile b/Makefile index 998a43e85..dde7975bb 100644 --- a/Makefile +++ b/Makefile @@ -222,6 +222,7 @@ TESTS = \ db_iter_test \ db_log_iter_test \ db_dynamic_level_test \ + db_tailing_iter_test \ block_hash_index_test \ autovector_test \ column_family_test \ @@ -686,6 +687,9 @@ db_compaction_filter_test: db/db_compaction_filter_test.o util/db_test_util.o $( db_dynamic_level_test: db/db_dynamic_level_test.o util/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_tailing_iter_test: db/db_tailing_iter_test.o util/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_iter_test: db/db_iter_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/db_tailing_iter_test.cc b/db/db_tailing_iter_test.cc new file mode 100644 index 000000000..d36f2bb0a --- /dev/null +++ b/db/db_tailing_iter_test.cc @@ -0,0 +1,481 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// Introduction of SyncPoint effectively disabled building and running this test +// in Release build. +// which is a pity, it is a good test +#if !(defined NDEBUG) || !defined(OS_WIN) + +#include "port/stack_trace.h" +#include "util/db_test_util.h" + +namespace rocksdb { + +class DBTestTailingIterator : public DBTestBase { + public: + DBTestTailingIterator() : DBTestBase("/db_tailing_iterator_test") {} +}; + +TEST_F(DBTestTailingIterator, TailingIteratorSingle) { + ReadOptions read_options; + read_options.tailing = true; + + std::unique_ptr iter(db_->NewIterator(read_options)); + iter->SeekToFirst(); + ASSERT_TRUE(!iter->Valid()); + + // add a record and check that iter can see it + ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "mirko"); + + iter->Next(); + ASSERT_TRUE(!iter->Valid()); +} + +TEST_F(DBTestTailingIterator, TailingIteratorKeepAdding) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + std::string value(1024, 'a'); + + const int num_records = 10000; + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "%016d", i); + + Slice key(buf, 16); + ASSERT_OK(Put(1, key, value)); + + iter->Seek(key); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } +} + +TEST_F(DBTestTailingIterator, TailingIteratorSeekToNext) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + std::string value(1024, 'a'); + + const int num_records = 1000; + for (int i = 1; i < num_records; ++i) { + char buf1[32]; + char buf2[32]; + snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); + + Slice key(buf1, 20); + ASSERT_OK(Put(1, key, value)); + + if (i % 100 == 99) { + ASSERT_OK(Flush(1)); + } + + snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); + Slice target(buf2, 20); + iter->Seek(target); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } + for (int i = 2 * num_records; i > 0; --i) { + char buf1[32]; + char buf2[32]; + snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); + + Slice key(buf1, 20); + ASSERT_OK(Put(1, key, value)); + + if (i % 100 == 99) { + ASSERT_OK(Flush(1)); + } + + snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); + Slice target(buf2, 20); + iter->Seek(target); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } +} + +TEST_F(DBTestTailingIterator, TailingIteratorDeletes) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + + // write a single record, read it using the iterator, then delete it + ASSERT_OK(Put(1, "0test", "test")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0test"); + ASSERT_OK(Delete(1, "0test")); + + // write many more records + const int num_records = 10000; + std::string value(1024, 'A'); + + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "1%015d", i); + + Slice key(buf, 16); + ASSERT_OK(Put(1, key, value)); + } + + // force a flush to make sure that no records are read from memtable + ASSERT_OK(Flush(1)); + + // skip "0test" + iter->Next(); + + // make sure we can read all new records using the existing iterator + int count = 0; + for (; iter->Valid(); iter->Next(), ++count) ; + + ASSERT_EQ(count, num_records); +} + +TEST_F(DBTestTailingIterator, TailingIteratorPrefixSeek) { + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, + kSkipNoPrefix); + ReadOptions read_options; + read_options.tailing = true; + + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.prefix_extractor.reset(NewFixedPrefixTransform(2)); + options.memtable_factory.reset(NewHashSkipListRepFactory(16)); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + ASSERT_OK(Put(1, "0101", "test")); + + ASSERT_OK(Flush(1)); + + ASSERT_OK(Put(1, "0202", "test")); + + // Seek(0102) shouldn't find any records since 0202 has a different prefix + iter->Seek("0102"); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("0202"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0202"); + + iter->Next(); + ASSERT_TRUE(!iter->Valid()); + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0); +} + +TEST_F(DBTestTailingIterator, TailingIteratorIncomplete) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.read_tier = kBlockCacheTier; + + std::string key("key"); + std::string value("value"); + + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + + std::unique_ptr iter(db_->NewIterator(read_options)); + iter->SeekToFirst(); + // we either see the entry or it's not in cache + ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + iter->SeekToFirst(); + // should still be true after compaction + ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); +} + +TEST_F(DBTestTailingIterator, TailingIteratorSeekToSame) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.write_buffer_size = 1000; + CreateAndReopenWithCF({"pikachu"}, options); + + ReadOptions read_options; + read_options.tailing = true; + + const int NROWS = 10000; + // Write rows with keys 00000, 00002, 00004 etc. + for (int i = 0; i < NROWS; ++i) { + char buf[100]; + snprintf(buf, sizeof(buf), "%05d", 2*i); + std::string key(buf); + std::string value("value"); + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + } + + std::unique_ptr iter(db_->NewIterator(read_options)); + // Seek to 00001. We expect to find 00002. + std::string start_key = "00001"; + iter->Seek(start_key); + ASSERT_TRUE(iter->Valid()); + + std::string found = iter->key().ToString(); + ASSERT_EQ("00002", found); + + // Now seek to the same key. The iterator should remain in the same + // position. + iter->Seek(found); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(found, iter->key().ToString()); +} + +TEST_F(DBTestTailingIterator, ManagedTailingIteratorSingle) { + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + std::unique_ptr iter(db_->NewIterator(read_options)); + iter->SeekToFirst(); + ASSERT_TRUE(!iter->Valid()); + + // add a record and check that iter can see it + ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "mirko"); + + iter->Next(); + ASSERT_TRUE(!iter->Valid()); +} + +TEST_F(DBTestTailingIterator, ManagedTailingIteratorKeepAdding) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + std::string value(1024, 'a'); + + const int num_records = 10000; + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "%016d", i); + + Slice key(buf, 16); + ASSERT_OK(Put(1, key, value)); + + iter->Seek(key); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } +} + +TEST_F(DBTestTailingIterator, ManagedTailingIteratorSeekToNext) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + std::string value(1024, 'a'); + + const int num_records = 1000; + for (int i = 1; i < num_records; ++i) { + char buf1[32]; + char buf2[32]; + snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); + + Slice key(buf1, 20); + ASSERT_OK(Put(1, key, value)); + + if (i % 100 == 99) { + ASSERT_OK(Flush(1)); + } + + snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); + Slice target(buf2, 20); + iter->Seek(target); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } + for (int i = 2 * num_records; i > 0; --i) { + char buf1[32]; + char buf2[32]; + snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); + + Slice key(buf1, 20); + ASSERT_OK(Put(1, key, value)); + + if (i % 100 == 99) { + ASSERT_OK(Flush(1)); + } + + snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); + Slice target(buf2, 20); + iter->Seek(target); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } +} + +TEST_F(DBTestTailingIterator, ManagedTailingIteratorDeletes) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + + // write a single record, read it using the iterator, then delete it + ASSERT_OK(Put(1, "0test", "test")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0test"); + ASSERT_OK(Delete(1, "0test")); + + // write many more records + const int num_records = 10000; + std::string value(1024, 'A'); + + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "1%015d", i); + + Slice key(buf, 16); + ASSERT_OK(Put(1, key, value)); + } + + // force a flush to make sure that no records are read from memtable + ASSERT_OK(Flush(1)); + + // skip "0test" + iter->Next(); + + // make sure we can read all new records using the existing iterator + int count = 0; + for (; iter->Valid(); iter->Next(), ++count) { + } + + ASSERT_EQ(count, num_records); +} + +TEST_F(DBTestTailingIterator, ManagedTailingIteratorPrefixSeek) { + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, + kSkipNoPrefix); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.prefix_extractor.reset(NewFixedPrefixTransform(2)); + options.memtable_factory.reset(NewHashSkipListRepFactory(16)); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + ASSERT_OK(Put(1, "0101", "test")); + + ASSERT_OK(Flush(1)); + + ASSERT_OK(Put(1, "0202", "test")); + + // Seek(0102) shouldn't find any records since 0202 has a different prefix + iter->Seek("0102"); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("0202"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0202"); + + iter->Next(); + ASSERT_TRUE(!iter->Valid()); + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0); +} + +TEST_F(DBTestTailingIterator, ManagedTailingIteratorIncomplete) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + read_options.read_tier = kBlockCacheTier; + + std::string key = "key"; + std::string value = "value"; + + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + + std::unique_ptr iter(db_->NewIterator(read_options)); + iter->SeekToFirst(); + // we either see the entry or it's not in cache + ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + iter->SeekToFirst(); + // should still be true after compaction + ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); +} + +TEST_F(DBTestTailingIterator, ManagedTailingIteratorSeekToSame) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.write_buffer_size = 1000; + CreateAndReopenWithCF({"pikachu"}, options); + + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + const int NROWS = 10000; + // Write rows with keys 00000, 00002, 00004 etc. + for (int i = 0; i < NROWS; ++i) { + char buf[100]; + snprintf(buf, sizeof(buf), "%05d", 2 * i); + std::string key(buf); + std::string value("value"); + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + } + + std::unique_ptr iter(db_->NewIterator(read_options)); + // Seek to 00001. We expect to find 00002. + std::string start_key = "00001"; + iter->Seek(start_key); + ASSERT_TRUE(iter->Valid()); + + std::string found = iter->key().ToString(); + ASSERT_EQ("00002", found); + + // Now seek to the same key. The iterator should remain in the same + // position. + iter->Seek(found); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(found, iter->key().ToString()); +} + +} // namespace rocksdb + +#endif // !(defined NDEBUG) || !defined(OS_WIN) + +int main(int argc, char** argv) { +#if !(defined NDEBUG) || !defined(OS_WIN) + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +#else + return 0; +#endif +} diff --git a/db/db_test.cc b/db/db_test.cc index d642e18be..e14104534 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -7610,450 +7610,6 @@ TEST_F(DBTest, PrefixScan) { XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0); } -TEST_F(DBTest, TailingIteratorSingle) { - ReadOptions read_options; - read_options.tailing = true; - - std::unique_ptr iter(db_->NewIterator(read_options)); - iter->SeekToFirst(); - ASSERT_TRUE(!iter->Valid()); - - // add a record and check that iter can see it - ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor")); - iter->SeekToFirst(); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().ToString(), "mirko"); - - iter->Next(); - ASSERT_TRUE(!iter->Valid()); -} - -TEST_F(DBTest, TailingIteratorKeepAdding) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); - ReadOptions read_options; - read_options.tailing = true; - - std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); - std::string value(1024, 'a'); - - const int num_records = 10000; - for (int i = 0; i < num_records; ++i) { - char buf[32]; - snprintf(buf, sizeof(buf), "%016d", i); - - Slice key(buf, 16); - ASSERT_OK(Put(1, key, value)); - - iter->Seek(key); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().compare(key), 0); - } -} - -TEST_F(DBTest, TailingIteratorSeekToNext) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); - ReadOptions read_options; - read_options.tailing = true; - - std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); - std::string value(1024, 'a'); - - const int num_records = 1000; - for (int i = 1; i < num_records; ++i) { - char buf1[32]; - char buf2[32]; - snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); - - Slice key(buf1, 20); - ASSERT_OK(Put(1, key, value)); - - if (i % 100 == 99) { - ASSERT_OK(Flush(1)); - } - - snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); - Slice target(buf2, 20); - iter->Seek(target); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().compare(key), 0); - } - for (int i = 2 * num_records; i > 0; --i) { - char buf1[32]; - char buf2[32]; - snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); - - Slice key(buf1, 20); - ASSERT_OK(Put(1, key, value)); - - if (i % 100 == 99) { - ASSERT_OK(Flush(1)); - } - - snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); - Slice target(buf2, 20); - iter->Seek(target); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().compare(key), 0); - } -} - -TEST_F(DBTest, TailingIteratorDeletes) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); - ReadOptions read_options; - read_options.tailing = true; - - std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); - - // write a single record, read it using the iterator, then delete it - ASSERT_OK(Put(1, "0test", "test")); - iter->SeekToFirst(); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().ToString(), "0test"); - ASSERT_OK(Delete(1, "0test")); - - // write many more records - const int num_records = 10000; - std::string value(1024, 'A'); - - for (int i = 0; i < num_records; ++i) { - char buf[32]; - snprintf(buf, sizeof(buf), "1%015d", i); - - Slice key(buf, 16); - ASSERT_OK(Put(1, key, value)); - } - - // force a flush to make sure that no records are read from memtable - ASSERT_OK(Flush(1)); - - // skip "0test" - iter->Next(); - - // make sure we can read all new records using the existing iterator - int count = 0; - for (; iter->Valid(); iter->Next(), ++count) ; - - ASSERT_EQ(count, num_records); -} - -TEST_F(DBTest, TailingIteratorPrefixSeek) { - XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, - kSkipNoPrefix); - ReadOptions read_options; - read_options.tailing = true; - - Options options = CurrentOptions(); - options.env = env_; - options.create_if_missing = true; - options.disable_auto_compactions = true; - options.prefix_extractor.reset(NewFixedPrefixTransform(2)); - options.memtable_factory.reset(NewHashSkipListRepFactory(16)); - DestroyAndReopen(options); - CreateAndReopenWithCF({"pikachu"}, options); - - std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); - ASSERT_OK(Put(1, "0101", "test")); - - ASSERT_OK(Flush(1)); - - ASSERT_OK(Put(1, "0202", "test")); - - // Seek(0102) shouldn't find any records since 0202 has a different prefix - iter->Seek("0102"); - ASSERT_TRUE(!iter->Valid()); - - iter->Seek("0202"); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().ToString(), "0202"); - - iter->Next(); - ASSERT_TRUE(!iter->Valid()); - XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0); -} - -TEST_F(DBTest, TailingIteratorIncomplete) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); - ReadOptions read_options; - read_options.tailing = true; - read_options.read_tier = kBlockCacheTier; - - std::string key("key"); - std::string value("value"); - - ASSERT_OK(db_->Put(WriteOptions(), key, value)); - - std::unique_ptr iter(db_->NewIterator(read_options)); - iter->SeekToFirst(); - // we either see the entry or it's not in cache - ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); - - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - iter->SeekToFirst(); - // should still be true after compaction - ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); -} - -TEST_F(DBTest, TailingIteratorSeekToSame) { - Options options = CurrentOptions(); - options.compaction_style = kCompactionStyleUniversal; - options.write_buffer_size = 1000; - CreateAndReopenWithCF({"pikachu"}, options); - - ReadOptions read_options; - read_options.tailing = true; - - const int NROWS = 10000; - // Write rows with keys 00000, 00002, 00004 etc. - for (int i = 0; i < NROWS; ++i) { - char buf[100]; - snprintf(buf, sizeof(buf), "%05d", 2*i); - std::string key(buf); - std::string value("value"); - ASSERT_OK(db_->Put(WriteOptions(), key, value)); - } - - std::unique_ptr iter(db_->NewIterator(read_options)); - // Seek to 00001. We expect to find 00002. - std::string start_key = "00001"; - iter->Seek(start_key); - ASSERT_TRUE(iter->Valid()); - - std::string found = iter->key().ToString(); - ASSERT_EQ("00002", found); - - // Now seek to the same key. The iterator should remain in the same - // position. - iter->Seek(found); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(found, iter->key().ToString()); -} - -TEST_F(DBTest, ManagedTailingIteratorSingle) { - ReadOptions read_options; - read_options.tailing = true; - read_options.managed = true; - - std::unique_ptr iter(db_->NewIterator(read_options)); - iter->SeekToFirst(); - ASSERT_TRUE(!iter->Valid()); - - // add a record and check that iter can see it - ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor")); - iter->SeekToFirst(); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().ToString(), "mirko"); - - iter->Next(); - ASSERT_TRUE(!iter->Valid()); -} - -TEST_F(DBTest, ManagedTailingIteratorKeepAdding) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); - ReadOptions read_options; - read_options.tailing = true; - read_options.managed = true; - - std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); - std::string value(1024, 'a'); - - const int num_records = 10000; - for (int i = 0; i < num_records; ++i) { - char buf[32]; - snprintf(buf, sizeof(buf), "%016d", i); - - Slice key(buf, 16); - ASSERT_OK(Put(1, key, value)); - - iter->Seek(key); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().compare(key), 0); - } -} - -TEST_F(DBTest, ManagedTailingIteratorSeekToNext) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); - ReadOptions read_options; - read_options.tailing = true; - read_options.managed = true; - - std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); - std::string value(1024, 'a'); - - const int num_records = 1000; - for (int i = 1; i < num_records; ++i) { - char buf1[32]; - char buf2[32]; - snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); - - Slice key(buf1, 20); - ASSERT_OK(Put(1, key, value)); - - if (i % 100 == 99) { - ASSERT_OK(Flush(1)); - } - - snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); - Slice target(buf2, 20); - iter->Seek(target); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().compare(key), 0); - } - for (int i = 2 * num_records; i > 0; --i) { - char buf1[32]; - char buf2[32]; - snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); - - Slice key(buf1, 20); - ASSERT_OK(Put(1, key, value)); - - if (i % 100 == 99) { - ASSERT_OK(Flush(1)); - } - - snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); - Slice target(buf2, 20); - iter->Seek(target); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().compare(key), 0); - } -} - -TEST_F(DBTest, ManagedTailingIteratorDeletes) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); - ReadOptions read_options; - read_options.tailing = true; - read_options.managed = true; - - std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); - - // write a single record, read it using the iterator, then delete it - ASSERT_OK(Put(1, "0test", "test")); - iter->SeekToFirst(); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().ToString(), "0test"); - ASSERT_OK(Delete(1, "0test")); - - // write many more records - const int num_records = 10000; - std::string value(1024, 'A'); - - for (int i = 0; i < num_records; ++i) { - char buf[32]; - snprintf(buf, sizeof(buf), "1%015d", i); - - Slice key(buf, 16); - ASSERT_OK(Put(1, key, value)); - } - - // force a flush to make sure that no records are read from memtable - ASSERT_OK(Flush(1)); - - // skip "0test" - iter->Next(); - - // make sure we can read all new records using the existing iterator - int count = 0; - for (; iter->Valid(); iter->Next(), ++count) { - } - - ASSERT_EQ(count, num_records); -} - -TEST_F(DBTest, ManagedTailingIteratorPrefixSeek) { - XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, - kSkipNoPrefix); - ReadOptions read_options; - read_options.tailing = true; - read_options.managed = true; - - Options options = CurrentOptions(); - options.env = env_; - options.create_if_missing = true; - options.disable_auto_compactions = true; - options.prefix_extractor.reset(NewFixedPrefixTransform(2)); - options.memtable_factory.reset(NewHashSkipListRepFactory(16)); - DestroyAndReopen(options); - CreateAndReopenWithCF({"pikachu"}, options); - - std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); - ASSERT_OK(Put(1, "0101", "test")); - - ASSERT_OK(Flush(1)); - - ASSERT_OK(Put(1, "0202", "test")); - - // Seek(0102) shouldn't find any records since 0202 has a different prefix - iter->Seek("0102"); - ASSERT_TRUE(!iter->Valid()); - - iter->Seek("0202"); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().ToString(), "0202"); - - iter->Next(); - ASSERT_TRUE(!iter->Valid()); - XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0); -} - -TEST_F(DBTest, ManagedTailingIteratorIncomplete) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); - ReadOptions read_options; - read_options.tailing = true; - read_options.managed = true; - read_options.read_tier = kBlockCacheTier; - - std::string key = "key"; - std::string value = "value"; - - ASSERT_OK(db_->Put(WriteOptions(), key, value)); - - std::unique_ptr iter(db_->NewIterator(read_options)); - iter->SeekToFirst(); - // we either see the entry or it's not in cache - ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); - - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - iter->SeekToFirst(); - // should still be true after compaction - ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); -} - -TEST_F(DBTest, ManagedTailingIteratorSeekToSame) { - Options options = CurrentOptions(); - options.compaction_style = kCompactionStyleUniversal; - options.write_buffer_size = 1000; - CreateAndReopenWithCF({"pikachu"}, options); - - ReadOptions read_options; - read_options.tailing = true; - read_options.managed = true; - - const int NROWS = 10000; - // Write rows with keys 00000, 00002, 00004 etc. - for (int i = 0; i < NROWS; ++i) { - char buf[100]; - snprintf(buf, sizeof(buf), "%05d", 2 * i); - std::string key(buf); - std::string value("value"); - ASSERT_OK(db_->Put(WriteOptions(), key, value)); - } - - std::unique_ptr iter(db_->NewIterator(read_options)); - // Seek to 00001. We expect to find 00002. - std::string start_key = "00001"; - iter->Seek(start_key); - ASSERT_TRUE(iter->Valid()); - - std::string found = iter->key().ToString(); - ASSERT_EQ("00002", found); - - // Now seek to the same key. The iterator should remain in the same - // position. - iter->Seek(found); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(found, iter->key().ToString()); -} - TEST_F(DBTest, BlockBasedTablePrefixIndexTest) { // create a DB with block prefix index BlockBasedTableOptions table_options; diff --git a/src.mk b/src.mk index 59522e124..d38f645f2 100644 --- a/src.mk +++ b/src.mk @@ -166,6 +166,7 @@ TEST_BENCH_SOURCES = \ db/db_compaction_filter_test.cc \ db/db_dynamic_level_test.cc \ db/db_log_iter_test.cc \ + db/db_tailing_iter_test.cc \ db/deletefile_test.cc \ db/fault_injection_test.cc \ db/file_indexer_test.cc \