diff --git a/CMakeLists.txt b/CMakeLists.txt index 6384de141..870c9c3a8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -254,6 +254,7 @@ set(TESTS db/corruption_test.cc db/cuckoo_table_db_test.cc db/db_iter_test.cc + db/db_log_iter_test.cc db/db_test.cc db/db_compaction_filter_test.cc db/db_dynamic_level_test.cc diff --git a/Makefile b/Makefile index 653a968fb..998a43e85 100644 --- a/Makefile +++ b/Makefile @@ -220,6 +220,7 @@ VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full TESTS = \ db_test \ db_iter_test \ + db_log_iter_test \ db_dynamic_level_test \ block_hash_index_test \ autovector_test \ @@ -676,6 +677,9 @@ slice_transform_test: util/slice_transform_test.o $(LIBOBJECTS) $(TESTHARNESS) db_test: db/db_test.o util/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_log_iter_test: db/db_log_iter_test.o util/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_compaction_filter_test: db/db_compaction_filter_test.o util/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/db_log_iter_test.cc b/db/db_log_iter_test.cc new file mode 100644 index 000000000..a1e8d2012 --- /dev/null +++ b/db/db_log_iter_test.cc @@ -0,0 +1,290 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// Introduction of SyncPoint effectively disabled building and running this test +// in Release build. +// which is a pity, it is a good test +#if !(defined NDEBUG) || !defined(OS_WIN) + +#include "port/stack_trace.h" +#include "util/db_test_util.h" + +namespace rocksdb { + +class DBTestXactLogIterator : public DBTestBase { + public: + DBTestXactLogIterator() : DBTestBase("/db_log_iter_test") {} + + std::unique_ptr OpenTransactionLogIter( + const SequenceNumber seq) { + unique_ptr iter; + Status status = dbfull()->GetUpdatesSince(seq, &iter); + EXPECT_OK(status); + EXPECT_TRUE(iter->Valid()); + return std::move(iter); + } +}; + +namespace { +SequenceNumber ReadRecords( + std::unique_ptr& iter, + int& count) { + count = 0; + SequenceNumber lastSequence = 0; + BatchResult res; + while (iter->Valid()) { + res = iter->GetBatch(); + EXPECT_TRUE(res.sequence > lastSequence); + ++count; + lastSequence = res.sequence; + EXPECT_OK(iter->status()); + iter->Next(); + } + return res.sequence; +} + +void ExpectRecords( + const int expected_no_records, + std::unique_ptr& iter) { + int num_records; + ReadRecords(iter, num_records); + ASSERT_EQ(num_records, expected_no_records); +} +} // namespace + +TEST_F(DBTestXactLogIterator, TransactionLogIterator) { + do { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + Put(0, "key1", DummyString(1024)); + Put(1, "key2", DummyString(1024)); + Put(1, "key2", DummyString(1024)); + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U); + { + auto iter = OpenTransactionLogIter(0); + ExpectRecords(3, iter); + } + ReopenWithColumnFamilies({"default", "pikachu"}, options); + env_->SleepForMicroseconds(2 * 1000 * 1000); + { + Put(0, "key4", DummyString(1024)); + Put(1, "key5", DummyString(1024)); + Put(0, "key6", DummyString(1024)); + } + { + auto iter = OpenTransactionLogIter(0); + ExpectRecords(6, iter); + } + } while (ChangeCompactOptions()); +} + +#ifndef NDEBUG // sync point is not included with DNDEBUG build +TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) { + static const int LOG_ITERATOR_RACE_TEST_COUNT = 2; + static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = { + {"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1", + "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"}, + {"WalManager::GetSortedWalsOfType:1", + "WalManager::PurgeObsoleteFiles:1", + "WalManager::PurgeObsoleteFiles:2", + "WalManager::GetSortedWalsOfType:2"}}; + for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) { + // Setup sync point dependency to reproduce the race condition of + // a log file moved to archived dir, in the middle of GetSortedWalFiles + rocksdb::SyncPoint::GetInstance()->LoadDependency( + { { sync_points[test][0], sync_points[test][1] }, + { sync_points[test][2], sync_points[test][3] }, + }); + + do { + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + Put("key1", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key2", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key3", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key4", DummyString(1024)); + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U); + + { + auto iter = OpenTransactionLogIter(0); + ExpectRecords(4, iter); + } + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + // trigger async flush, and log move. Well, log move will + // wait until the GetSortedWalFiles:1 to reproduce the race + // condition + FlushOptions flush_options; + flush_options.wait = false; + dbfull()->Flush(flush_options); + + // "key5" would be written in a new memtable and log + Put("key5", DummyString(1024)); + { + // this iter would miss "key4" if not fixed + auto iter = OpenTransactionLogIter(0); + ExpectRecords(5, iter); + } + } while (ChangeCompactOptions()); + } +} +#endif + +TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) { + do { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + Put("key1", DummyString(1024)); + auto iter = OpenTransactionLogIter(0); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(!iter->Valid()); + ASSERT_OK(iter->status()); + Put("key2", DummyString(1024)); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + } while (ChangeCompactOptions()); +} + +TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) { + do { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + Put("key1", DummyString(1024)); + Put("key2", DummyString(1023)); + dbfull()->Flush(FlushOptions()); + Reopen(options); + auto iter = OpenTransactionLogIter(0); + ExpectRecords(2, iter); + } while (ChangeCompactOptions()); +} + +TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) { + do { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + for (int i = 0; i < 1024; i++) { + Put("key"+ToString(i), DummyString(10)); + } + dbfull()->Flush(FlushOptions()); + // Corrupt this log to create a gap + rocksdb::VectorLogPtr wal_files; + ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files)); + const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName(); + if (mem_env_) { + mem_env_->Truncate(logfile_path, wal_files.front()->SizeFileBytes() / 2); + } else { + ASSERT_EQ(0, truncate(logfile_path.c_str(), + wal_files.front()->SizeFileBytes() / 2)); + } + + // Insert a new entry to a new log file + Put("key1025", DummyString(10)); + // Try to read from the beginning. Should stop before the gap and read less + // than 1025 entries + auto iter = OpenTransactionLogIter(0); + int count; + SequenceNumber last_sequence_read = ReadRecords(iter, count); + ASSERT_LT(last_sequence_read, 1025U); + // Try to read past the gap, should be able to seek to key1025 + auto iter2 = OpenTransactionLogIter(last_sequence_read + 1); + ExpectRecords(1, iter2); + } while (ChangeCompactOptions()); +} + +TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) { + do { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + WriteBatch batch; + batch.Put(handles_[1], "key1", DummyString(1024)); + batch.Put(handles_[0], "key2", DummyString(1024)); + batch.Put(handles_[1], "key3", DummyString(1024)); + batch.Delete(handles_[0], "key2"); + dbfull()->Write(WriteOptions(), &batch); + Flush(1); + Flush(0); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + Put(1, "key4", DummyString(1024)); + auto iter = OpenTransactionLogIter(3); + ExpectRecords(2, iter); + } while (ChangeCompactOptions()); +} + +TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + { + WriteBatch batch; + batch.Put(handles_[1], "key1", DummyString(1024)); + batch.Put(handles_[0], "key2", DummyString(1024)); + batch.PutLogData(Slice("blob1")); + batch.Put(handles_[1], "key3", DummyString(1024)); + batch.PutLogData(Slice("blob2")); + batch.Delete(handles_[0], "key2"); + dbfull()->Write(WriteOptions(), &batch); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + } + + auto res = OpenTransactionLogIter(0)->GetBatch(); + struct Handler : public WriteBatch::Handler { + std::string seen; + virtual Status PutCF(uint32_t cf, const Slice& key, + const Slice& value) override { + seen += "Put(" + ToString(cf) + ", " + key.ToString() + ", " + + ToString(value.size()) + ")"; + return Status::OK(); + } + virtual Status MergeCF(uint32_t cf, const Slice& key, + const Slice& value) override { + seen += "Merge(" + ToString(cf) + ", " + key.ToString() + ", " + + ToString(value.size()) + ")"; + return Status::OK(); + } + virtual void LogData(const Slice& blob) override { + seen += "LogData(" + blob.ToString() + ")"; + } + virtual Status DeleteCF(uint32_t cf, const Slice& key) override { + seen += "Delete(" + ToString(cf) + ", " + key.ToString() + ")"; + return Status::OK(); + } + } handler; + res.writeBatchPtr->Iterate(&handler); + ASSERT_EQ( + "Put(1, key1, 1024)" + "Put(0, key2, 1024)" + "LogData(blob1)" + "Put(1, key3, 1024)" + "LogData(blob2)" + "Delete(0, key2)", + handler.seen); +} +} // namespace rocksdb + +#endif // !(defined NDEBUG) || !defined(OS_WIN) + +int main(int argc, char** argv) { +#if !(defined NDEBUG) || !defined(OS_WIN) + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +#else + return 0; +#endif +} diff --git a/db/db_test.cc b/db/db_test.cc index d5e551fdc..d642e18be 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -6496,180 +6496,6 @@ TEST_F(DBTest, PurgeInfoLogs) { } } -namespace { -SequenceNumber ReadRecords( - std::unique_ptr& iter, - int& count) { - count = 0; - SequenceNumber lastSequence = 0; - BatchResult res; - while (iter->Valid()) { - res = iter->GetBatch(); - EXPECT_TRUE(res.sequence > lastSequence); - ++count; - lastSequence = res.sequence; - EXPECT_OK(iter->status()); - iter->Next(); - } - return res.sequence; -} - -void ExpectRecords( - const int expected_no_records, - std::unique_ptr& iter) { - int num_records; - ReadRecords(iter, num_records); - ASSERT_EQ(num_records, expected_no_records); -} -} // namespace - -TEST_F(DBTest, TransactionLogIterator) { - do { - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - CreateAndReopenWithCF({"pikachu"}, options); - Put(0, "key1", DummyString(1024)); - Put(1, "key2", DummyString(1024)); - Put(1, "key2", DummyString(1024)); - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U); - { - auto iter = OpenTransactionLogIter(0); - ExpectRecords(3, iter); - } - ReopenWithColumnFamilies({"default", "pikachu"}, options); - env_->SleepForMicroseconds(2 * 1000 * 1000); - { - Put(0, "key4", DummyString(1024)); - Put(1, "key5", DummyString(1024)); - Put(0, "key6", DummyString(1024)); - } - { - auto iter = OpenTransactionLogIter(0); - ExpectRecords(6, iter); - } - } while (ChangeCompactOptions()); -} - -#ifndef NDEBUG // sync point is not included with DNDEBUG build -TEST_F(DBTest, TransactionLogIteratorRace) { - static const int LOG_ITERATOR_RACE_TEST_COUNT = 2; - static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = { - {"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1", - "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"}, - {"WalManager::GetSortedWalsOfType:1", - "WalManager::PurgeObsoleteFiles:1", - "WalManager::PurgeObsoleteFiles:2", - "WalManager::GetSortedWalsOfType:2"}}; - for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) { - // Setup sync point dependency to reproduce the race condition of - // a log file moved to archived dir, in the middle of GetSortedWalFiles - rocksdb::SyncPoint::GetInstance()->LoadDependency( - { { sync_points[test][0], sync_points[test][1] }, - { sync_points[test][2], sync_points[test][3] }, - }); - - do { - rocksdb::SyncPoint::GetInstance()->ClearTrace(); - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - Put("key1", DummyString(1024)); - dbfull()->Flush(FlushOptions()); - Put("key2", DummyString(1024)); - dbfull()->Flush(FlushOptions()); - Put("key3", DummyString(1024)); - dbfull()->Flush(FlushOptions()); - Put("key4", DummyString(1024)); - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U); - - { - auto iter = OpenTransactionLogIter(0); - ExpectRecords(4, iter); - } - - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - // trigger async flush, and log move. Well, log move will - // wait until the GetSortedWalFiles:1 to reproduce the race - // condition - FlushOptions flush_options; - flush_options.wait = false; - dbfull()->Flush(flush_options); - - // "key5" would be written in a new memtable and log - Put("key5", DummyString(1024)); - { - // this iter would miss "key4" if not fixed - auto iter = OpenTransactionLogIter(0); - ExpectRecords(5, iter); - } - } while (ChangeCompactOptions()); - } -} -#endif - -TEST_F(DBTest, TransactionLogIteratorStallAtLastRecord) { - do { - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - Put("key1", DummyString(1024)); - auto iter = OpenTransactionLogIter(0); - ASSERT_OK(iter->status()); - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(!iter->Valid()); - ASSERT_OK(iter->status()); - Put("key2", DummyString(1024)); - iter->Next(); - ASSERT_OK(iter->status()); - ASSERT_TRUE(iter->Valid()); - } while (ChangeCompactOptions()); -} - -TEST_F(DBTest, TransactionLogIteratorCheckAfterRestart) { - do { - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - Put("key1", DummyString(1024)); - Put("key2", DummyString(1023)); - dbfull()->Flush(FlushOptions()); - Reopen(options); - auto iter = OpenTransactionLogIter(0); - ExpectRecords(2, iter); - } while (ChangeCompactOptions()); -} - -TEST_F(DBTest, TransactionLogIteratorCorruptedLog) { - do { - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - for (int i = 0; i < 1024; i++) { - Put("key"+ToString(i), DummyString(10)); - } - dbfull()->Flush(FlushOptions()); - // Corrupt this log to create a gap - rocksdb::VectorLogPtr wal_files; - ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files)); - const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName(); - if (mem_env_) { - mem_env_->Truncate(logfile_path, wal_files.front()->SizeFileBytes() / 2); - } else { - ASSERT_EQ(0, truncate(logfile_path.c_str(), - wal_files.front()->SizeFileBytes() / 2)); - } - - // Insert a new entry to a new log file - Put("key1025", DummyString(10)); - // Try to read from the beginning. Should stop before the gap and read less - // than 1025 entries - auto iter = OpenTransactionLogIter(0); - int count; - SequenceNumber last_sequence_read = ReadRecords(iter, count); - ASSERT_LT(last_sequence_read, 1025U); - // Try to read past the gap, should be able to seek to key1025 - auto iter2 = OpenTransactionLogIter(last_sequence_read + 1); - ExpectRecords(1, iter2); - } while (ChangeCompactOptions()); -} // // Test WAL recovery for the various modes available @@ -6951,75 +6777,6 @@ TEST_F(DBTest, kSkipAnyCorruptedRecords) { } } -TEST_F(DBTest, TransactionLogIteratorBatchOperations) { - do { - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - CreateAndReopenWithCF({"pikachu"}, options); - WriteBatch batch; - batch.Put(handles_[1], "key1", DummyString(1024)); - batch.Put(handles_[0], "key2", DummyString(1024)); - batch.Put(handles_[1], "key3", DummyString(1024)); - batch.Delete(handles_[0], "key2"); - dbfull()->Write(WriteOptions(), &batch); - Flush(1); - Flush(0); - ReopenWithColumnFamilies({"default", "pikachu"}, options); - Put(1, "key4", DummyString(1024)); - auto iter = OpenTransactionLogIter(3); - ExpectRecords(2, iter); - } while (ChangeCompactOptions()); -} - -TEST_F(DBTest, TransactionLogIteratorBlobs) { - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - CreateAndReopenWithCF({"pikachu"}, options); - { - WriteBatch batch; - batch.Put(handles_[1], "key1", DummyString(1024)); - batch.Put(handles_[0], "key2", DummyString(1024)); - batch.PutLogData(Slice("blob1")); - batch.Put(handles_[1], "key3", DummyString(1024)); - batch.PutLogData(Slice("blob2")); - batch.Delete(handles_[0], "key2"); - dbfull()->Write(WriteOptions(), &batch); - ReopenWithColumnFamilies({"default", "pikachu"}, options); - } - - auto res = OpenTransactionLogIter(0)->GetBatch(); - struct Handler : public WriteBatch::Handler { - std::string seen; - virtual Status PutCF(uint32_t cf, const Slice& key, - const Slice& value) override { - seen += "Put(" + ToString(cf) + ", " + key.ToString() + ", " + - ToString(value.size()) + ")"; - return Status::OK(); - } - virtual Status MergeCF(uint32_t cf, const Slice& key, - const Slice& value) override { - seen += "Merge(" + ToString(cf) + ", " + key.ToString() + ", " + - ToString(value.size()) + ")"; - return Status::OK(); - } - virtual void LogData(const Slice& blob) override { - seen += "LogData(" + blob.ToString() + ")"; - } - virtual Status DeleteCF(uint32_t cf, const Slice& key) override { - seen += "Delete(" + ToString(cf) + ", " + key.ToString() + ")"; - return Status::OK(); - } - } handler; - res.writeBatchPtr->Iterate(&handler); - ASSERT_EQ( - "Put(1, key1, 1024)" - "Put(0, key2, 1024)" - "LogData(blob1)" - "Put(1, key3, 1024)" - "LogData(blob2)" - "Delete(0, key2)", - handler.seen); -} // Multi-threaded test: namespace { diff --git a/src.mk b/src.mk index 043062b50..59522e124 100644 --- a/src.mk +++ b/src.mk @@ -165,6 +165,7 @@ TEST_BENCH_SOURCES = \ db/db_test.cc \ db/db_compaction_filter_test.cc \ db/db_dynamic_level_test.cc \ + db/db_log_iter_test.cc \ db/deletefile_test.cc \ db/fault_injection_test.cc \ db/file_indexer_test.cc \ diff --git a/util/db_test_util.cc b/util/db_test_util.cc index bfc743729..68959fc65 100644 --- a/util/db_test_util.cc +++ b/util/db_test_util.cc @@ -790,15 +790,6 @@ Options DBTestBase::OptionsForLogIterTest() { return options; } -std::unique_ptr DBTestBase::OpenTransactionLogIter( - const SequenceNumber seq) { - unique_ptr iter; - Status status = dbfull()->GetUpdatesSince(seq, &iter); - EXPECT_OK(status); - EXPECT_TRUE(iter->Valid()); - return std::move(iter); -} - std::string DBTestBase::DummyString(size_t len, char c) { return std::string(len, c); } diff --git a/util/db_test_util.h b/util/db_test_util.h index 0f3b3c6d5..c96d5394b 100644 --- a/util/db_test_util.h +++ b/util/db_test_util.h @@ -594,9 +594,6 @@ class DBTestBase : public testing::Test { Options OptionsForLogIterTest(); - std::unique_ptr OpenTransactionLogIter( - const SequenceNumber seq); - std::string DummyString(size_t len, char c = 'a'); void VerifyIterLast(std::string expected_key, int cf = 0);