You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
rocksdb/db/db_log_iter_test.cc

296 lines
9.7 KiB

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// Introduction of SyncPoint effectively disabled building and running this test
// in Release build.
// which is a pity, it is a good test
#if !defined(ROCKSDB_LITE)
#include "db/db_test_util.h"
#include "port/stack_trace.h"
namespace rocksdb {
class DBTestXactLogIterator : public DBTestBase {
public:
DBTestXactLogIterator() : DBTestBase("/db_log_iter_test") {}
std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
const SequenceNumber seq) {
unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(seq, &iter);
EXPECT_OK(status);
EXPECT_TRUE(iter->Valid());
return iter;
}
};
namespace {
SequenceNumber ReadRecords(
std::unique_ptr<TransactionLogIterator>& iter,
int& count) {
count = 0;
SequenceNumber lastSequence = 0;
BatchResult res;
while (iter->Valid()) {
res = iter->GetBatch();
EXPECT_TRUE(res.sequence > lastSequence);
++count;
lastSequence = res.sequence;
EXPECT_OK(iter->status());
iter->Next();
}
return res.sequence;
}
void ExpectRecords(
const int expected_no_records,
std::unique_ptr<TransactionLogIterator>& iter) {
int num_records;
ReadRecords(iter, num_records);
ASSERT_EQ(num_records, expected_no_records);
}
} // namespace
TEST_F(DBTestXactLogIterator, TransactionLogIterator) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
Put(0, "key1", DummyString(1024));
Put(1, "key2", DummyString(1024));
Put(1, "key2", DummyString(1024));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
{
auto iter = OpenTransactionLogIter(0);
ExpectRecords(3, iter);
}
ReopenWithColumnFamilies({"default", "pikachu"}, options);
env_->SleepForMicroseconds(2 * 1000 * 1000);
{
Put(0, "key4", DummyString(1024));
Put(1, "key5", DummyString(1024));
Put(0, "key6", DummyString(1024));
}
{
auto iter = OpenTransactionLogIter(0);
ExpectRecords(6, iter);
}
} while (ChangeCompactOptions());
}
#ifndef NDEBUG // sync point is not included with DNDEBUG build
TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {
static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
{"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1",
"WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"},
{"WalManager::GetSortedWalsOfType:1",
"WalManager::PurgeObsoleteFiles:1",
"WalManager::PurgeObsoleteFiles:2",
"WalManager::GetSortedWalsOfType:2"}};
for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) {
// Setup sync point dependency to reproduce the race condition of
// a log file moved to archived dir, in the middle of GetSortedWalFiles
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{ { sync_points[test][0], sync_points[test][1] },
{ sync_points[test][2], sync_points[test][3] },
});
do {
rocksdb::SyncPoint::GetInstance()->ClearTrace();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
Put("key1", DummyString(1024));
dbfull()->Flush(FlushOptions());
Put("key2", DummyString(1024));
dbfull()->Flush(FlushOptions());
Put("key3", DummyString(1024));
dbfull()->Flush(FlushOptions());
Put("key4", DummyString(1024));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);
dbfull()->FlushWAL(false);
{
auto iter = OpenTransactionLogIter(0);
ExpectRecords(4, iter);
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// trigger async flush, and log move. Well, log move will
// wait until the GetSortedWalFiles:1 to reproduce the race
// condition
FlushOptions flush_options;
flush_options.wait = false;
dbfull()->Flush(flush_options);
// "key5" would be written in a new memtable and log
Put("key5", DummyString(1024));
dbfull()->FlushWAL(false);
{
// this iter would miss "key4" if not fixed
auto iter = OpenTransactionLogIter(0);
ExpectRecords(5, iter);
}
} while (ChangeCompactOptions());
}
}
#endif
TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
Put("key1", DummyString(1024));
auto iter = OpenTransactionLogIter(0);
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(!iter->Valid());
ASSERT_OK(iter->status());
Put("key2", DummyString(1024));
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
} while (ChangeCompactOptions());
}
TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
Put("key1", DummyString(1024));
Put("key2", DummyString(1023));
dbfull()->Flush(FlushOptions());
Reopen(options);
auto iter = OpenTransactionLogIter(0);
ExpectRecords(2, iter);
} while (ChangeCompactOptions());
}
TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
for (int i = 0; i < 1024; i++) {
Put("key"+ToString(i), DummyString(10));
}
dbfull()->Flush(FlushOptions());
dbfull()->FlushWAL(false);
// Corrupt this log to create a gap
rocksdb::VectorLogPtr wal_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName();
if (mem_env_) {
mem_env_->Truncate(logfile_path, wal_files.front()->SizeFileBytes() / 2);
} else {
ASSERT_EQ(0, truncate(logfile_path.c_str(),
wal_files.front()->SizeFileBytes() / 2));
}
// Insert a new entry to a new log file
Put("key1025", DummyString(10));
dbfull()->FlushWAL(false);
// Try to read from the beginning. Should stop before the gap and read less
// than 1025 entries
auto iter = OpenTransactionLogIter(0);
int count;
SequenceNumber last_sequence_read = ReadRecords(iter, count);
ASSERT_LT(last_sequence_read, 1025U);
// Try to read past the gap, should be able to seek to key1025
auto iter2 = OpenTransactionLogIter(last_sequence_read + 1);
ExpectRecords(1, iter2);
} while (ChangeCompactOptions());
}
TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
WriteBatch batch;
batch.Put(handles_[1], "key1", DummyString(1024));
batch.Put(handles_[0], "key2", DummyString(1024));
batch.Put(handles_[1], "key3", DummyString(1024));
batch.Delete(handles_[0], "key2");
dbfull()->Write(WriteOptions(), &batch);
Flush(1);
Flush(0);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
Put(1, "key4", DummyString(1024));
auto iter = OpenTransactionLogIter(3);
ExpectRecords(2, iter);
} while (ChangeCompactOptions());
}
TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) {
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
{
WriteBatch batch;
batch.Put(handles_[1], "key1", DummyString(1024));
batch.Put(handles_[0], "key2", DummyString(1024));
batch.PutLogData(Slice("blob1"));
batch.Put(handles_[1], "key3", DummyString(1024));
batch.PutLogData(Slice("blob2"));
batch.Delete(handles_[0], "key2");
dbfull()->Write(WriteOptions(), &batch);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
}
auto res = OpenTransactionLogIter(0)->GetBatch();
struct Handler : public WriteBatch::Handler {
std::string seen;
virtual Status PutCF(uint32_t cf, const Slice& key,
const Slice& value) override {
seen += "Put(" + ToString(cf) + ", " + key.ToString() + ", " +
ToString(value.size()) + ")";
return Status::OK();
}
virtual Status MergeCF(uint32_t cf, const Slice& key,
const Slice& value) override {
seen += "Merge(" + ToString(cf) + ", " + key.ToString() + ", " +
ToString(value.size()) + ")";
return Status::OK();
}
virtual void LogData(const Slice& blob) override {
seen += "LogData(" + blob.ToString() + ")";
}
virtual Status DeleteCF(uint32_t cf, const Slice& key) override {
seen += "Delete(" + ToString(cf) + ", " + key.ToString() + ")";
return Status::OK();
}
} handler;
res.writeBatchPtr->Iterate(&handler);
ASSERT_EQ(
"Put(1, key1, 1024)"
"Put(0, key2, 1024)"
"LogData(blob1)"
"Put(1, key3, 1024)"
"LogData(blob2)"
"Delete(0, key2)",
handler.seen);
}
} // namespace rocksdb
#endif // !defined(ROCKSDB_LITE)
int main(int argc, char** argv) {
#if !defined(ROCKSDB_LITE)
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
#else
(void) argc;
(void) argv;
return 0;
#endif
}