fork of https://github.com/rust-rocksdb/rust-rocksdb for nextgraph
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2666 lines
95 KiB
2666 lines
95 KiB
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "db/db_test_util.h"
|
|
#include "db/db_with_timestamp_test_util.h"
|
|
#include "options/options_helper.h"
|
|
#include "port/port.h"
|
|
#include "port/stack_trace.h"
|
|
#include "rocksdb/file_system.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "utilities/fault_injection_env.h"
|
|
#include "utilities/fault_injection_fs.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
class DBWALTestBase : public DBTestBase {
|
|
protected:
|
|
explicit DBWALTestBase(const std::string& dir_name)
|
|
: DBTestBase(dir_name, /*env_do_fsync=*/true) {}
|
|
|
|
#if defined(ROCKSDB_PLATFORM_POSIX)
|
|
public:
|
|
#if defined(ROCKSDB_FALLOCATE_PRESENT)
|
|
bool IsFallocateSupported() {
|
|
// Test fallocate support of running file system.
|
|
// Skip this test if fallocate is not supported.
|
|
std::string fname_test_fallocate = dbname_ + "/preallocate_testfile";
|
|
int fd = -1;
|
|
do {
|
|
fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
|
|
} while (fd < 0 && errno == EINTR);
|
|
assert(fd > 0);
|
|
int alloc_status = fallocate(fd, 0, 0, 1);
|
|
int err_number = errno;
|
|
close(fd);
|
|
assert(env_->DeleteFile(fname_test_fallocate) == Status::OK());
|
|
if (err_number == ENOSYS || err_number == EOPNOTSUPP) {
|
|
fprintf(stderr, "Skipped preallocated space check: %s\n",
|
|
errnoStr(err_number).c_str());
|
|
return false;
|
|
}
|
|
assert(alloc_status == 0);
|
|
return true;
|
|
}
|
|
#endif // ROCKSDB_FALLOCATE_PRESENT
|
|
|
|
uint64_t GetAllocatedFileSize(std::string file_name) {
|
|
struct stat sbuf;
|
|
int err = stat(file_name.c_str(), &sbuf);
|
|
assert(err == 0);
|
|
return sbuf.st_blocks * 512;
|
|
}
|
|
#endif // ROCKSDB_PLATFORM_POSIX
|
|
};
|
|
|
|
class DBWALTest : public DBWALTestBase {
|
|
public:
|
|
DBWALTest() : DBWALTestBase("/db_wal_test") {}
|
|
};
|
|
|
|
// A SpecialEnv enriched to give more insight about deleted files
|
|
class EnrichedSpecialEnv : public SpecialEnv {
|
|
public:
|
|
explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
|
|
Status NewSequentialFile(const std::string& f,
|
|
std::unique_ptr<SequentialFile>* r,
|
|
const EnvOptions& soptions) override {
|
|
InstrumentedMutexLock l(&env_mutex_);
|
|
if (f == skipped_wal) {
|
|
deleted_wal_reopened = true;
|
|
if (IsWAL(f) && largest_deleted_wal.size() != 0 &&
|
|
f.compare(largest_deleted_wal) <= 0) {
|
|
gap_in_wals = true;
|
|
}
|
|
}
|
|
return SpecialEnv::NewSequentialFile(f, r, soptions);
|
|
}
|
|
Status DeleteFile(const std::string& fname) override {
|
|
if (IsWAL(fname)) {
|
|
deleted_wal_cnt++;
|
|
InstrumentedMutexLock l(&env_mutex_);
|
|
// If this is the first WAL, remember its name and skip deleting it. We
|
|
// remember its name partly because the application might attempt to
|
|
// delete the file again.
|
|
if (skipped_wal.size() != 0 && skipped_wal != fname) {
|
|
if (largest_deleted_wal.size() == 0 ||
|
|
largest_deleted_wal.compare(fname) < 0) {
|
|
largest_deleted_wal = fname;
|
|
}
|
|
} else {
|
|
skipped_wal = fname;
|
|
return Status::OK();
|
|
}
|
|
}
|
|
return SpecialEnv::DeleteFile(fname);
|
|
}
|
|
bool IsWAL(const std::string& fname) {
|
|
// printf("iswal %s\n", fname.c_str());
|
|
return fname.compare(fname.size() - 3, 3, "log") == 0;
|
|
}
|
|
|
|
InstrumentedMutex env_mutex_;
|
|
// the wal whose actual delete was skipped by the env
|
|
std::string skipped_wal = "";
|
|
// the largest WAL that was requested to be deleted
|
|
std::string largest_deleted_wal = "";
|
|
// number of WALs that were successfully deleted
|
|
std::atomic<size_t> deleted_wal_cnt = {0};
|
|
// the WAL whose delete from fs was skipped is reopened during recovery
|
|
std::atomic<bool> deleted_wal_reopened = {false};
|
|
// whether a gap in the WALs was detected during recovery
|
|
std::atomic<bool> gap_in_wals = {false};
|
|
};
|
|
|
|
class DBWALTestWithEnrichedEnv : public DBTestBase {
|
|
public:
|
|
DBWALTestWithEnrichedEnv()
|
|
: DBTestBase("db_wal_test", /*env_do_fsync=*/true) {
|
|
enriched_env_ = new EnrichedSpecialEnv(env_->target());
|
|
auto options = CurrentOptions();
|
|
options.env = enriched_env_;
|
|
options.allow_2pc = true;
|
|
Reopen(options);
|
|
delete env_;
|
|
// to be deleted by the parent class
|
|
env_ = enriched_env_;
|
|
}
|
|
|
|
protected:
|
|
EnrichedSpecialEnv* enriched_env_;
|
|
};
|
|
|
|
// Test that the recovery would successfully avoid the gaps between the logs.
|
|
// One known scenario that could cause this is that the application issue the
|
|
// WAL deletion out of order. For the sake of simplicity in the test, here we
|
|
// create the gap by manipulating the env to skip deletion of the first WAL but
|
|
// not the ones after it.
|
|
TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
|
|
auto options = last_options_;
|
|
// To cause frequent WAL deletion
|
|
options.write_buffer_size = 128;
|
|
Reopen(options);
|
|
|
|
SyncPoint::GetInstance()->LoadDependency(
|
|
{{"DBImpl::PurgeObsoleteFiles:End",
|
|
"DBWALTestWithEnrichedEnv.SkipDeletedWALs:AfterFlush"}});
|
|
SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
WriteOptions writeOpt = WriteOptions();
|
|
for (int i = 0; i < 128 * 5; i++) {
|
|
ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
|
|
}
|
|
FlushOptions fo;
|
|
fo.wait = true;
|
|
ASSERT_OK(db_->Flush(fo));
|
|
|
|
TEST_SYNC_POINT("DBWALTestWithEnrichedEnv.SkipDeletedWALs:AfterFlush");
|
|
|
|
// some wals are deleted
|
|
ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
|
|
// but not the first one
|
|
ASSERT_NE(0, enriched_env_->skipped_wal.size());
|
|
|
|
// Test that the WAL that was not deleted will be skipped during recovery
|
|
options = last_options_;
|
|
Reopen(options);
|
|
ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
|
|
ASSERT_FALSE(enriched_env_->gap_in_wals);
|
|
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
}
|
|
|
|
TEST_F(DBWALTest, WAL) {
|
|
do {
|
|
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
|
WriteOptions writeOpt = WriteOptions();
|
|
writeOpt.disableWAL = true;
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
|
|
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
ASSERT_EQ("v1", Get(1, "foo"));
|
|
ASSERT_EQ("v1", Get(1, "bar"));
|
|
|
|
writeOpt.disableWAL = false;
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
|
|
writeOpt.disableWAL = true;
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
|
|
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
// Both value's should be present.
|
|
ASSERT_EQ("v2", Get(1, "bar"));
|
|
ASSERT_EQ("v2", Get(1, "foo"));
|
|
|
|
writeOpt.disableWAL = true;
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
|
|
writeOpt.disableWAL = false;
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
|
|
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
// again both values should be present.
|
|
ASSERT_EQ("v3", Get(1, "foo"));
|
|
ASSERT_EQ("v3", Get(1, "bar"));
|
|
} while (ChangeWalOptions());
|
|
}
|
|
|
|
TEST_F(DBWALTest, RollLog) {
|
|
do {
|
|
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
|
ASSERT_OK(Put(1, "foo", "v1"));
|
|
ASSERT_OK(Put(1, "baz", "v5"));
|
|
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
for (int i = 0; i < 10; i++) {
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
}
|
|
ASSERT_OK(Put(1, "foo", "v4"));
|
|
for (int i = 0; i < 10; i++) {
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
}
|
|
} while (ChangeWalOptions());
|
|
}
|
|
|
|
TEST_F(DBWALTest, SyncWALNotBlockWrite) {
|
|
Options options = CurrentOptions();
|
|
options.max_write_buffer_number = 4;
|
|
DestroyAndReopen(options);
|
|
|
|
ASSERT_OK(Put("foo1", "bar1"));
|
|
ASSERT_OK(Put("foo5", "bar5"));
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
|
|
{"WritableFileWriter::SyncWithoutFlush:1",
|
|
"DBWALTest::SyncWALNotBlockWrite:1"},
|
|
{"DBWALTest::SyncWALNotBlockWrite:2",
|
|
"WritableFileWriter::SyncWithoutFlush:2"},
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
ROCKSDB_NAMESPACE::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); });
|
|
|
|
TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1");
|
|
ASSERT_OK(Put("foo2", "bar2"));
|
|
ASSERT_OK(Put("foo3", "bar3"));
|
|
FlushOptions fo;
|
|
fo.wait = false;
|
|
ASSERT_OK(db_->Flush(fo));
|
|
ASSERT_OK(Put("foo4", "bar4"));
|
|
|
|
TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:2");
|
|
|
|
thread.join();
|
|
|
|
ASSERT_EQ(Get("foo1"), "bar1");
|
|
ASSERT_EQ(Get("foo2"), "bar2");
|
|
ASSERT_EQ(Get("foo3"), "bar3");
|
|
ASSERT_EQ(Get("foo4"), "bar4");
|
|
ASSERT_EQ(Get("foo5"), "bar5");
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
}
|
|
|
|
TEST_F(DBWALTest, SyncWALNotWaitWrite) {
|
|
ASSERT_OK(Put("foo1", "bar1"));
|
|
ASSERT_OK(Put("foo3", "bar3"));
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
|
|
{"SpecialEnv::WalFile::Append:1", "DBWALTest::SyncWALNotWaitWrite:1"},
|
|
{"DBWALTest::SyncWALNotWaitWrite:2", "SpecialEnv::WalFile::Append:2"},
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
ROCKSDB_NAMESPACE::port::Thread thread(
|
|
[&]() { ASSERT_OK(Put("foo2", "bar2")); });
|
|
// Moving this to SyncWAL before the actual fsync
|
|
// TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
|
|
ASSERT_OK(db_->SyncWAL());
|
|
// Moving this to SyncWAL after actual fsync
|
|
// TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
|
|
|
|
thread.join();
|
|
|
|
ASSERT_EQ(Get("foo1"), "bar1");
|
|
ASSERT_EQ(Get("foo2"), "bar2");
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
}
|
|
|
|
TEST_F(DBWALTest, Recover) {
|
|
do {
|
|
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
|
ASSERT_OK(Put(1, "foo", "v1"));
|
|
ASSERT_OK(Put(1, "baz", "v5"));
|
|
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
ASSERT_EQ("v1", Get(1, "foo"));
|
|
ASSERT_EQ("v1", Get(1, "foo"));
|
|
ASSERT_EQ("v5", Get(1, "baz"));
|
|
ASSERT_OK(Put(1, "bar", "v2"));
|
|
ASSERT_OK(Put(1, "foo", "v3"));
|
|
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
ASSERT_EQ("v3", Get(1, "foo"));
|
|
ASSERT_OK(Put(1, "foo", "v4"));
|
|
ASSERT_EQ("v4", Get(1, "foo"));
|
|
ASSERT_EQ("v2", Get(1, "bar"));
|
|
ASSERT_EQ("v5", Get(1, "baz"));
|
|
} while (ChangeWalOptions());
|
|
}
|
|
|
|
class DBWALTestWithTimestamp
|
|
: public DBBasicTestWithTimestampBase,
|
|
public testing::WithParamInterface<test::UserDefinedTimestampTestMode> {
|
|
public:
|
|
DBWALTestWithTimestamp()
|
|
: DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {}
|
|
|
|
Status CreateAndReopenWithTs(const std::vector<std::string>& cfs,
|
|
const Options& ts_options, bool persist_udt,
|
|
bool avoid_flush_during_recovery = false) {
|
|
Options default_options = CurrentOptions();
|
|
default_options.allow_concurrent_memtable_write =
|
|
persist_udt ? true : false;
|
|
DestroyAndReopen(default_options);
|
|
CreateColumnFamilies(cfs, ts_options);
|
|
return ReopenColumnFamiliesWithTs(cfs, ts_options, persist_udt,
|
|
avoid_flush_during_recovery);
|
|
}
|
|
|
|
Status ReopenColumnFamiliesWithTs(const std::vector<std::string>& cfs,
|
|
Options ts_options, bool persist_udt,
|
|
bool avoid_flush_during_recovery = false) {
|
|
Options default_options = CurrentOptions();
|
|
default_options.create_if_missing = false;
|
|
default_options.allow_concurrent_memtable_write =
|
|
persist_udt ? true : false;
|
|
default_options.avoid_flush_during_recovery = avoid_flush_during_recovery;
|
|
ts_options.create_if_missing = false;
|
|
|
|
std::vector<Options> cf_options(cfs.size(), ts_options);
|
|
std::vector<std::string> cfs_plus_default = cfs;
|
|
cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
|
|
cf_options.insert(cf_options.begin(), default_options);
|
|
Close();
|
|
return TryReopenWithColumnFamilies(cfs_plus_default, cf_options);
|
|
}
|
|
|
|
Status Put(uint32_t cf, const Slice& key, const Slice& ts,
|
|
const Slice& value) {
|
|
WriteOptions write_opts;
|
|
return db_->Put(write_opts, handles_[cf], key, ts, value);
|
|
}
|
|
|
|
void CheckGet(const ReadOptions& read_opts, uint32_t cf, const Slice& key,
|
|
const std::string& expected_value,
|
|
const std::string& expected_ts) {
|
|
std::string actual_value;
|
|
std::string actual_ts;
|
|
ASSERT_OK(
|
|
db_->Get(read_opts, handles_[cf], key, &actual_value, &actual_ts));
|
|
ASSERT_EQ(expected_value, actual_value);
|
|
ASSERT_EQ(expected_ts, actual_ts);
|
|
}
|
|
};
|
|
|
|
TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
|
|
// Set up the option that enables user defined timestmp size.
|
|
std::string ts1;
|
|
PutFixed64(&ts1, 1);
|
|
Options ts_options;
|
|
ts_options.create_if_missing = true;
|
|
ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
|
// Test that user-defined timestamps are recovered from WAL regardless of
|
|
// the value of this flag because UDTs are saved in WAL nonetheless.
|
|
// We however need to explicitly disable flush during recovery by setting
|
|
// `avoid_flush_during_recovery=true` so that we can avoid timestamps getting
|
|
// stripped when the `persist_user_defined_timestamps` flag is false, so that
|
|
// all written timestamps are available for testing user-defined time travel
|
|
// read.
|
|
bool persist_udt = test::ShouldPersistUDT(GetParam());
|
|
ts_options.persist_user_defined_timestamps = persist_udt;
|
|
bool avoid_flush_during_recovery = true;
|
|
|
|
ReadOptions read_opts;
|
|
do {
|
|
Slice ts_slice = ts1;
|
|
read_opts.timestamp = &ts_slice;
|
|
ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt,
|
|
avoid_flush_during_recovery));
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
|
|
ASSERT_OK(Put(1, "foo", ts1, "v1"));
|
|
ASSERT_OK(Put(1, "baz", ts1, "v5"));
|
|
|
|
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt,
|
|
avoid_flush_during_recovery));
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
|
|
// Do a timestamped read with ts1 after second reopen.
|
|
CheckGet(read_opts, 1, "foo", "v1", ts1);
|
|
CheckGet(read_opts, 1, "baz", "v5", ts1);
|
|
|
|
// Write more value versions for key "foo" and "bar" before and after second
|
|
// reopen.
|
|
std::string ts2;
|
|
PutFixed64(&ts2, 2);
|
|
ASSERT_OK(Put(1, "bar", ts2, "v2"));
|
|
ASSERT_OK(Put(1, "foo", ts2, "v3"));
|
|
|
|
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt,
|
|
avoid_flush_during_recovery));
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
|
|
std::string ts3;
|
|
PutFixed64(&ts3, 3);
|
|
ASSERT_OK(Put(1, "foo", ts3, "v4"));
|
|
|
|
// All the key value pairs available for read:
|
|
// "foo" -> [(ts1, "v1"), (ts2, "v3"), (ts3, "v4")]
|
|
// "bar" -> [(ts2, "v2")]
|
|
// "baz" -> [(ts1, "v5")]
|
|
// Do a timestamped read with ts1 after third reopen.
|
|
// read_opts.timestamp is set to ts1 for below reads
|
|
CheckGet(read_opts, 1, "foo", "v1", ts1);
|
|
std::string value;
|
|
ASSERT_TRUE(db_->Get(read_opts, handles_[1], "bar", &value).IsNotFound());
|
|
CheckGet(read_opts, 1, "baz", "v5", ts1);
|
|
|
|
// Do a timestamped read with ts2 after third reopen.
|
|
ts_slice = ts2;
|
|
// read_opts.timestamp is set to ts2 for below reads.
|
|
CheckGet(read_opts, 1, "foo", "v3", ts2);
|
|
CheckGet(read_opts, 1, "bar", "v2", ts2);
|
|
CheckGet(read_opts, 1, "baz", "v5", ts1);
|
|
|
|
// Do a timestamped read with ts3 after third reopen.
|
|
ts_slice = ts3;
|
|
// read_opts.timestamp is set to ts3 for below reads.
|
|
CheckGet(read_opts, 1, "foo", "v4", ts3);
|
|
CheckGet(read_opts, 1, "bar", "v2", ts2);
|
|
CheckGet(read_opts, 1, "baz", "v5", ts1);
|
|
} while (ChangeWalOptions());
|
|
}
|
|
|
|
TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
|
|
// Set up the option that enables user defined timestamp size.
|
|
std::string min_ts;
|
|
std::string write_ts;
|
|
PutFixed64(&min_ts, 0);
|
|
PutFixed64(&write_ts, 1);
|
|
Options ts_options;
|
|
ts_options.create_if_missing = true;
|
|
ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
|
bool persist_udt = test::ShouldPersistUDT(GetParam());
|
|
ts_options.persist_user_defined_timestamps = persist_udt;
|
|
|
|
std::string smallest_ukey_without_ts = "baz";
|
|
std::string largest_ukey_without_ts = "foo";
|
|
|
|
ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt));
|
|
// No flush, no sst files, because of no data.
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
|
|
ASSERT_OK(Put(1, largest_ukey_without_ts, write_ts, "v1"));
|
|
ASSERT_OK(Put(1, smallest_ukey_without_ts, write_ts, "v5"));
|
|
|
|
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt));
|
|
// Memtable recovered from WAL flushed because `avoid_flush_during_recovery`
|
|
// defaults to false, created one L0 file.
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 1U);
|
|
|
|
std::vector<std::vector<FileMetaData>> level_to_files;
|
|
dbfull()->TEST_GetFilesMetaData(handles_[1], &level_to_files);
|
|
ASSERT_GT(level_to_files.size(), 1);
|
|
// L0 only has one SST file.
|
|
ASSERT_EQ(level_to_files[0].size(), 1);
|
|
auto meta = level_to_files[0][0];
|
|
if (persist_udt) {
|
|
ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key());
|
|
ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key());
|
|
} else {
|
|
ASSERT_EQ(smallest_ukey_without_ts + min_ts, meta.smallest.user_key());
|
|
ASSERT_EQ(largest_ukey_without_ts + min_ts, meta.largest.user_key());
|
|
}
|
|
}
|
|
|
|
// Param 0: test mode for the user-defined timestamp feature
|
|
INSTANTIATE_TEST_CASE_P(
|
|
P, DBWALTestWithTimestamp,
|
|
::testing::Values(
|
|
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
|
|
test::UserDefinedTimestampTestMode::kNormal));
|
|
|
|
TEST_F(DBWALTestWithTimestamp, EnableDisableUDT) {
|
|
Options options;
|
|
options.create_if_missing = true;
|
|
options.comparator = BytewiseComparator();
|
|
bool avoid_flush_during_recovery = true;
|
|
ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, options, true /* persist_udt */,
|
|
avoid_flush_during_recovery));
|
|
|
|
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "v1"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", "v5"));
|
|
|
|
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
|
options.persist_user_defined_timestamps = false;
|
|
// Test handle timestamp size inconsistency in WAL when enabling user-defined
|
|
// timestamps.
|
|
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options,
|
|
false /* persist_udt */,
|
|
avoid_flush_during_recovery));
|
|
|
|
std::string ts;
|
|
PutFixed64(&ts, 0);
|
|
Slice ts_slice = ts;
|
|
ReadOptions read_opts;
|
|
read_opts.timestamp = &ts_slice;
|
|
// Pre-existing entries are treated as if they have the min timestamp.
|
|
CheckGet(read_opts, 1, "foo", "v1", ts);
|
|
CheckGet(read_opts, 1, "baz", "v5", ts);
|
|
ts.clear();
|
|
PutFixed64(&ts, 1);
|
|
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", ts, "v2"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", ts, "v6"));
|
|
CheckGet(read_opts, 1, "foo", "v2", ts);
|
|
CheckGet(read_opts, 1, "baz", "v6", ts);
|
|
|
|
options.comparator = BytewiseComparator();
|
|
// Open the column family again with the UDT feature disabled. Test handle
|
|
// timestamp size inconsistency in WAL when disabling user-defined timestamps
|
|
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options,
|
|
true /* persist_udt */,
|
|
avoid_flush_during_recovery));
|
|
ASSERT_EQ("v2", Get(1, "foo"));
|
|
ASSERT_EQ("v6", Get(1, "baz"));
|
|
}
|
|
|
|
TEST_F(DBWALTest, RecoverWithTableHandle) {
|
|
do {
|
|
Options options = CurrentOptions();
|
|
options.create_if_missing = true;
|
|
options.disable_auto_compactions = true;
|
|
options.avoid_flush_during_recovery = false;
|
|
DestroyAndReopen(options);
|
|
CreateAndReopenWithCF({"pikachu"}, options);
|
|
|
|
ASSERT_OK(Put(1, "foo", "v1"));
|
|
ASSERT_OK(Put(1, "bar", "v2"));
|
|
ASSERT_OK(Flush(1));
|
|
ASSERT_OK(Put(1, "foo", "v3"));
|
|
ASSERT_OK(Put(1, "bar", "v4"));
|
|
ASSERT_OK(Flush(1));
|
|
ASSERT_OK(Put(1, "big", std::string(100, 'a')));
|
|
|
|
options = CurrentOptions();
|
|
const int kSmallMaxOpenFiles = 13;
|
|
if (option_config_ == kDBLogDir) {
|
|
// Use this option to check not preloading files
|
|
// Set the max open files to be small enough so no preload will
|
|
// happen.
|
|
options.max_open_files = kSmallMaxOpenFiles;
|
|
// RocksDB sanitize max open files to at least 20. Modify it back.
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
|
|
int* max_open_files = static_cast<int*>(arg);
|
|
*max_open_files = kSmallMaxOpenFiles;
|
|
});
|
|
|
|
} else if (option_config_ == kWalDirAndMmapReads) {
|
|
// Use this option to check always loading all files.
|
|
options.max_open_files = 100;
|
|
} else {
|
|
options.max_open_files = -1;
|
|
}
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
|
|
std::vector<std::vector<FileMetaData>> files;
|
|
dbfull()->TEST_GetFilesMetaData(handles_[1], &files);
|
|
size_t total_files = 0;
|
|
for (const auto& level : files) {
|
|
total_files += level.size();
|
|
}
|
|
ASSERT_EQ(total_files, 3);
|
|
for (const auto& level : files) {
|
|
for (const auto& file : level) {
|
|
if (options.max_open_files == kSmallMaxOpenFiles) {
|
|
ASSERT_TRUE(file.table_reader_handle == nullptr);
|
|
} else {
|
|
ASSERT_TRUE(file.table_reader_handle != nullptr);
|
|
}
|
|
}
|
|
}
|
|
} while (ChangeWalOptions());
|
|
}
|
|
|
|
TEST_F(DBWALTest, RecoverWithBlob) {
|
|
// Write a value that's below the prospective size limit for blobs and another
|
|
// one that's above. Note that blob files are not actually enabled at this
|
|
// point.
|
|
constexpr uint64_t min_blob_size = 10;
|
|
|
|
constexpr char short_value[] = "short";
|
|
static_assert(sizeof(short_value) - 1 < min_blob_size,
|
|
"short_value too long");
|
|
|
|
constexpr char long_value[] = "long_value";
|
|
static_assert(sizeof(long_value) - 1 >= min_blob_size,
|
|
"long_value too short");
|
|
|
|
ASSERT_OK(Put("key1", short_value));
|
|
ASSERT_OK(Put("key2", long_value));
|
|
|
|
// There should be no files just yet since we haven't flushed.
|
|
{
|
|
VersionSet* const versions = dbfull()->GetVersionSet();
|
|
ASSERT_NE(versions, nullptr);
|
|
|
|
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
|
|
ASSERT_NE(cfd, nullptr);
|
|
|
|
Version* const current = cfd->current();
|
|
ASSERT_NE(current, nullptr);
|
|
|
|
const VersionStorageInfo* const storage_info = current->storage_info();
|
|
ASSERT_NE(storage_info, nullptr);
|
|
|
|
ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
|
|
ASSERT_TRUE(storage_info->GetBlobFiles().empty());
|
|
}
|
|
|
|
// Reopen the database with blob files enabled. A new table file/blob file
|
|
// pair should be written during recovery.
|
|
Options options;
|
|
options.enable_blob_files = true;
|
|
options.min_blob_size = min_blob_size;
|
|
options.avoid_flush_during_recovery = false;
|
|
options.disable_auto_compactions = true;
|
|
options.env = env_;
|
|
|
|
Reopen(options);
|
|
|
|
ASSERT_EQ(Get("key1"), short_value);
|
|
ASSERT_EQ(Get("key2"), long_value);
|
|
|
|
VersionSet* const versions = dbfull()->GetVersionSet();
|
|
ASSERT_NE(versions, nullptr);
|
|
|
|
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
|
|
ASSERT_NE(cfd, nullptr);
|
|
|
|
Version* const current = cfd->current();
|
|
ASSERT_NE(current, nullptr);
|
|
|
|
const VersionStorageInfo* const storage_info = current->storage_info();
|
|
ASSERT_NE(storage_info, nullptr);
|
|
|
|
const auto& l0_files = storage_info->LevelFiles(0);
|
|
ASSERT_EQ(l0_files.size(), 1);
|
|
|
|
const FileMetaData* const table_file = l0_files[0];
|
|
ASSERT_NE(table_file, nullptr);
|
|
|
|
const auto& blob_files = storage_info->GetBlobFiles();
|
|
ASSERT_EQ(blob_files.size(), 1);
|
|
|
|
const auto& blob_file = blob_files.front();
|
|
ASSERT_NE(blob_file, nullptr);
|
|
|
|
ASSERT_EQ(table_file->smallest.user_key(), "key1");
|
|
ASSERT_EQ(table_file->largest.user_key(), "key2");
|
|
ASSERT_EQ(table_file->fd.smallest_seqno, 1);
|
|
ASSERT_EQ(table_file->fd.largest_seqno, 2);
|
|
ASSERT_EQ(table_file->oldest_blob_file_number,
|
|
blob_file->GetBlobFileNumber());
|
|
|
|
ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);
|
|
|
|
const InternalStats* const internal_stats = cfd->internal_stats();
|
|
ASSERT_NE(internal_stats, nullptr);
|
|
|
|
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
|
|
ASSERT_FALSE(compaction_stats.empty());
|
|
ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize());
|
|
ASSERT_EQ(compaction_stats[0].bytes_written_blob,
|
|
blob_file->GetTotalBlobBytes());
|
|
ASSERT_EQ(compaction_stats[0].num_output_files, 1);
|
|
ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1);
|
|
|
|
const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
|
|
ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
|
|
compaction_stats[0].bytes_written +
|
|
compaction_stats[0].bytes_written_blob);
|
|
}
|
|
|
|
TEST_F(DBWALTest, RecoverWithBlobMultiSST) {
|
|
// Write several large (4 KB) values without flushing. Note that blob files
|
|
// are not actually enabled at this point.
|
|
std::string large_value(1 << 12, 'a');
|
|
|
|
constexpr int num_keys = 64;
|
|
|
|
for (int i = 0; i < num_keys; ++i) {
|
|
ASSERT_OK(Put(Key(i), large_value));
|
|
}
|
|
|
|
// There should be no files just yet since we haven't flushed.
|
|
{
|
|
VersionSet* const versions = dbfull()->GetVersionSet();
|
|
ASSERT_NE(versions, nullptr);
|
|
|
|
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
|
|
ASSERT_NE(cfd, nullptr);
|
|
|
|
Version* const current = cfd->current();
|
|
ASSERT_NE(current, nullptr);
|
|
|
|
const VersionStorageInfo* const storage_info = current->storage_info();
|
|
ASSERT_NE(storage_info, nullptr);
|
|
|
|
ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
|
|
ASSERT_TRUE(storage_info->GetBlobFiles().empty());
|
|
}
|
|
|
|
// Reopen the database with blob files enabled and write buffer size set to a
|
|
// smaller value. Multiple table files+blob files should be written and added
|
|
// to the Version during recovery.
|
|
Options options;
|
|
options.write_buffer_size = 1 << 16; // 64 KB
|
|
options.enable_blob_files = true;
|
|
options.avoid_flush_during_recovery = false;
|
|
options.disable_auto_compactions = true;
|
|
options.env = env_;
|
|
|
|
Reopen(options);
|
|
|
|
for (int i = 0; i < num_keys; ++i) {
|
|
ASSERT_EQ(Get(Key(i)), large_value);
|
|
}
|
|
|
|
VersionSet* const versions = dbfull()->GetVersionSet();
|
|
ASSERT_NE(versions, nullptr);
|
|
|
|
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
|
|
ASSERT_NE(cfd, nullptr);
|
|
|
|
Version* const current = cfd->current();
|
|
ASSERT_NE(current, nullptr);
|
|
|
|
const VersionStorageInfo* const storage_info = current->storage_info();
|
|
ASSERT_NE(storage_info, nullptr);
|
|
|
|
const auto& l0_files = storage_info->LevelFiles(0);
|
|
ASSERT_GT(l0_files.size(), 1);
|
|
|
|
const auto& blob_files = storage_info->GetBlobFiles();
|
|
ASSERT_GT(blob_files.size(), 1);
|
|
|
|
ASSERT_EQ(l0_files.size(), blob_files.size());
|
|
}
|
|
|
|
TEST_F(DBWALTest, WALWithChecksumHandoff) {
|
|
#ifndef ROCKSDB_ASSERT_STATUS_CHECKED
|
|
if (mem_env_ || encrypted_env_) {
|
|
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
|
|
return;
|
|
}
|
|
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
|
new FaultInjectionTestFS(FileSystem::Default()));
|
|
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
|
do {
|
|
Options options = CurrentOptions();
|
|
|
|
options.checksum_handoff_file_types.Add(FileType::kWalFile);
|
|
options.env = fault_fs_env.get();
|
|
fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
|
|
|
|
CreateAndReopenWithCF({"pikachu"}, options);
|
|
WriteOptions writeOpt = WriteOptions();
|
|
writeOpt.disableWAL = true;
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
|
|
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
|
ASSERT_EQ("v1", Get(1, "foo"));
|
|
ASSERT_EQ("v1", Get(1, "bar"));
|
|
|
|
writeOpt.disableWAL = false;
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
|
|
writeOpt.disableWAL = true;
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
|
|
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
|
// Both value's should be present.
|
|
ASSERT_EQ("v2", Get(1, "bar"));
|
|
ASSERT_EQ("v2", Get(1, "foo"));
|
|
|
|
writeOpt.disableWAL = true;
|
|
// This put, data is persisted by Flush
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
|
writeOpt.disableWAL = false;
|
|
// Data is persisted in the WAL
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "zoo", "v3"));
|
|
// The hash does not match, write fails
|
|
fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
|
|
writeOpt.disableWAL = false;
|
|
ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
|
|
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
|
// Due to the write failure, Get should not find
|
|
ASSERT_NE("v3", Get(1, "foo"));
|
|
ASSERT_EQ("v3", Get(1, "zoo"));
|
|
ASSERT_EQ("v3", Get(1, "bar"));
|
|
|
|
fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
|
|
// Each write will be similated as corrupted.
|
|
fault_fs->IngestDataCorruptionBeforeWrite();
|
|
writeOpt.disableWAL = true;
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v4"));
|
|
writeOpt.disableWAL = false;
|
|
ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v4"));
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
|
ASSERT_NE("v4", Get(1, "foo"));
|
|
ASSERT_NE("v4", Get(1, "bar"));
|
|
fault_fs->NoDataCorruptionBeforeWrite();
|
|
|
|
fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
|
|
// The file system does not provide checksum method and verification.
|
|
writeOpt.disableWAL = true;
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v5"));
|
|
writeOpt.disableWAL = false;
|
|
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v5"));
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
|
ASSERT_EQ("v5", Get(1, "foo"));
|
|
ASSERT_EQ("v5", Get(1, "bar"));
|
|
|
|
Destroy(options);
|
|
} while (ChangeWalOptions());
|
|
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
|
|
}
|
|
|
|
TEST_F(DBWALTest, LockWal) {
|
|
do {
|
|
Options options = CurrentOptions();
|
|
options.create_if_missing = true;
|
|
DestroyAndReopen(options);
|
|
|
|
ASSERT_OK(Put("foo", "v"));
|
|
ASSERT_OK(Put("bar", "v"));
|
|
|
|
ASSERT_OK(db_->LockWAL());
|
|
// Verify writes are stopped
|
|
WriteOptions wopts;
|
|
wopts.no_slowdown = true;
|
|
Status s = db_->Put(wopts, "foo", "dontcare");
|
|
ASSERT_TRUE(s.IsIncomplete());
|
|
{
|
|
VectorLogPtr wals;
|
|
ASSERT_OK(db_->GetSortedWalFiles(wals));
|
|
ASSERT_FALSE(wals.empty());
|
|
}
|
|
port::Thread worker([&]() {
|
|
Status tmp_s = db_->Flush(FlushOptions());
|
|
ASSERT_OK(tmp_s);
|
|
});
|
|
FlushOptions flush_opts;
|
|
flush_opts.wait = false;
|
|
s = db_->Flush(flush_opts);
|
|
ASSERT_TRUE(s.IsTryAgain());
|
|
ASSERT_OK(db_->UnlockWAL());
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo", "dontcare"));
|
|
|
|
worker.join();
|
|
} while (ChangeWalOptions());
|
|
}
|
|
|
|
class DBRecoveryTestBlobError
|
|
: public DBWALTest,
|
|
public testing::WithParamInterface<std::string> {
|
|
public:
|
|
DBRecoveryTestBlobError() : sync_point_(GetParam()) {}
|
|
|
|
std::string sync_point_;
|
|
};
|
|
|
|
INSTANTIATE_TEST_CASE_P(DBRecoveryTestBlobError, DBRecoveryTestBlobError,
|
|
::testing::ValuesIn(std::vector<std::string>{
|
|
"BlobFileBuilder::WriteBlobToFile:AddRecord",
|
|
"BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
|
|
|
|
TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) {
|
|
// Write a value. Note that blob files are not actually enabled at this point.
|
|
ASSERT_OK(Put("key", "blob"));
|
|
|
|
// Reopen with blob files enabled but make blob file writing fail during
|
|
// recovery.
|
|
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
|
|
Status* const s = static_cast<Status*>(arg);
|
|
assert(s);
|
|
|
|
(*s) = Status::IOError(sync_point_);
|
|
});
|
|
SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
Options options;
|
|
options.enable_blob_files = true;
|
|
options.avoid_flush_during_recovery = false;
|
|
options.disable_auto_compactions = true;
|
|
options.env = env_;
|
|
|
|
ASSERT_NOK(TryReopen(options));
|
|
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
|
|
// Make sure the files generated by the failed recovery have been deleted.
|
|
std::vector<std::string> files;
|
|
ASSERT_OK(env_->GetChildren(dbname_, &files));
|
|
for (const auto& file : files) {
|
|
uint64_t number = 0;
|
|
FileType type = kTableFile;
|
|
|
|
if (!ParseFileName(file, &number, &type)) {
|
|
continue;
|
|
}
|
|
|
|
ASSERT_NE(type, kTableFile);
|
|
ASSERT_NE(type, kBlobFile);
|
|
}
|
|
}
|
|
|
|
TEST_F(DBWALTest, IgnoreRecoveredLog) {
|
|
std::string backup_logs = dbname_ + "/backup_logs";
|
|
|
|
do {
|
|
// delete old files in backup_logs directory
|
|
ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
|
|
std::vector<std::string> old_files;
|
|
ASSERT_OK(env_->GetChildren(backup_logs, &old_files));
|
|
for (auto& file : old_files) {
|
|
ASSERT_OK(env_->DeleteFile(backup_logs + "/" + file));
|
|
}
|
|
Options options = CurrentOptions();
|
|
options.create_if_missing = true;
|
|
options.merge_operator = MergeOperators::CreateUInt64AddOperator();
|
|
options.wal_dir = dbname_ + "/logs";
|
|
DestroyAndReopen(options);
|
|
|
|
// fill up the DB
|
|
std::string one, two;
|
|
PutFixed64(&one, 1);
|
|
PutFixed64(&two, 2);
|
|
ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
|
|
ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
|
|
ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one)));
|
|
|
|
// copy the logs to backup
|
|
std::vector<std::string> logs;
|
|
ASSERT_OK(env_->GetChildren(options.wal_dir, &logs));
|
|
for (auto& log : logs) {
|
|
CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log);
|
|
}
|
|
|
|
// recover the DB
|
|
Reopen(options);
|
|
ASSERT_EQ(two, Get("foo"));
|
|
ASSERT_EQ(one, Get("bar"));
|
|
Close();
|
|
|
|
// copy the logs from backup back to wal dir
|
|
for (auto& log : logs) {
|
|
CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
|
|
}
|
|
// this should ignore the log files, recovery should not happen again
|
|
// if the recovery happens, the same merge operator would be called twice,
|
|
// leading to incorrect results
|
|
Reopen(options);
|
|
ASSERT_EQ(two, Get("foo"));
|
|
ASSERT_EQ(one, Get("bar"));
|
|
Close();
|
|
Destroy(options);
|
|
Reopen(options);
|
|
Close();
|
|
|
|
// copy the logs from backup back to wal dir
|
|
ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir));
|
|
for (auto& log : logs) {
|
|
CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
|
|
}
|
|
// assert that we successfully recovered only from logs, even though we
|
|
// destroyed the DB
|
|
Reopen(options);
|
|
ASSERT_EQ(two, Get("foo"));
|
|
ASSERT_EQ(one, Get("bar"));
|
|
|
|
// Recovery will fail if DB directory doesn't exist.
|
|
Destroy(options);
|
|
// copy the logs from backup back to wal dir
|
|
ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir));
|
|
for (auto& log : logs) {
|
|
CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
|
|
// we won't be needing this file no more
|
|
ASSERT_OK(env_->DeleteFile(backup_logs + "/" + log));
|
|
}
|
|
Status s = TryReopen(options);
|
|
ASSERT_NOK(s);
|
|
Destroy(options);
|
|
} while (ChangeWalOptions());
|
|
}
|
|
|
|
TEST_F(DBWALTest, RecoveryWithEmptyLog) {
|
|
do {
|
|
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
|
ASSERT_OK(Put(1, "foo", "v1"));
|
|
ASSERT_OK(Put(1, "foo", "v2"));
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
ASSERT_OK(Put(1, "foo", "v3"));
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
ASSERT_EQ("v3", Get(1, "foo"));
|
|
} while (ChangeWalOptions());
|
|
}
|
|
|
|
#if !(defined NDEBUG) || !defined(OS_WIN)
|
|
TEST_F(DBWALTest, PreallocateBlock) {
|
|
Options options = CurrentOptions();
|
|
options.write_buffer_size = 10 * 1000 * 1000;
|
|
options.max_total_wal_size = 0;
|
|
|
|
size_t expected_preallocation_size = static_cast<size_t>(
|
|
options.write_buffer_size + options.write_buffer_size / 10);
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
std::atomic<int> called(0);
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
|
|
ASSERT_TRUE(arg != nullptr);
|
|
size_t preallocation_size = *(static_cast<size_t*>(arg));
|
|
ASSERT_EQ(expected_preallocation_size, preallocation_size);
|
|
called.fetch_add(1);
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
ASSERT_OK(Put("", ""));
|
|
ASSERT_OK(Flush());
|
|
ASSERT_OK(Put("", ""));
|
|
Close();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
ASSERT_EQ(2, called.load());
|
|
|
|
options.max_total_wal_size = 1000 * 1000;
|
|
expected_preallocation_size = static_cast<size_t>(options.max_total_wal_size);
|
|
Reopen(options);
|
|
called.store(0);
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
|
|
ASSERT_TRUE(arg != nullptr);
|
|
size_t preallocation_size = *(static_cast<size_t*>(arg));
|
|
ASSERT_EQ(expected_preallocation_size, preallocation_size);
|
|
called.fetch_add(1);
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
ASSERT_OK(Put("", ""));
|
|
ASSERT_OK(Flush());
|
|
ASSERT_OK(Put("", ""));
|
|
Close();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
ASSERT_EQ(2, called.load());
|
|
|
|
options.db_write_buffer_size = 800 * 1000;
|
|
expected_preallocation_size =
|
|
static_cast<size_t>(options.db_write_buffer_size);
|
|
Reopen(options);
|
|
called.store(0);
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
|
|
ASSERT_TRUE(arg != nullptr);
|
|
size_t preallocation_size = *(static_cast<size_t*>(arg));
|
|
ASSERT_EQ(expected_preallocation_size, preallocation_size);
|
|
called.fetch_add(1);
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
ASSERT_OK(Put("", ""));
|
|
ASSERT_OK(Flush());
|
|
ASSERT_OK(Put("", ""));
|
|
Close();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
ASSERT_EQ(2, called.load());
|
|
|
|
expected_preallocation_size = 700 * 1000;
|
|
std::shared_ptr<WriteBufferManager> write_buffer_manager =
|
|
std::make_shared<WriteBufferManager>(static_cast<uint64_t>(700 * 1000));
|
|
options.write_buffer_manager = write_buffer_manager;
|
|
Reopen(options);
|
|
called.store(0);
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
|
|
ASSERT_TRUE(arg != nullptr);
|
|
size_t preallocation_size = *(static_cast<size_t*>(arg));
|
|
ASSERT_EQ(expected_preallocation_size, preallocation_size);
|
|
called.fetch_add(1);
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
ASSERT_OK(Put("", ""));
|
|
ASSERT_OK(Flush());
|
|
ASSERT_OK(Put("", ""));
|
|
Close();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
ASSERT_EQ(2, called.load());
|
|
}
|
|
#endif // !(defined NDEBUG) || !defined(OS_WIN)
|
|
|
|
TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) {
|
|
// TODO(ajkr): Disabled until WAL recycling is fixed for
|
|
// `kPointInTimeRecovery`.
|
|
|
|
// For github issue #1303
|
|
for (int i = 0; i < 2; ++i) {
|
|
Options options = CurrentOptions();
|
|
options.create_if_missing = true;
|
|
options.recycle_log_file_num = 2;
|
|
if (i != 0) {
|
|
options.wal_dir = alternative_wal_dir_;
|
|
}
|
|
|
|
DestroyAndReopen(options);
|
|
ASSERT_OK(Put("foo", "v1"));
|
|
VectorLogPtr log_files;
|
|
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
|
|
ASSERT_GT(log_files.size(), 0);
|
|
ASSERT_OK(Flush());
|
|
|
|
// Now the original WAL is in log_files[0] and should be marked for
|
|
// recycling.
|
|
// Verify full purge cannot remove this file.
|
|
JobContext job_context(0);
|
|
dbfull()->TEST_LockMutex();
|
|
dbfull()->FindObsoleteFiles(&job_context, true /* force */);
|
|
dbfull()->TEST_UnlockMutex();
|
|
dbfull()->PurgeObsoleteFiles(job_context);
|
|
|
|
if (i == 0) {
|
|
ASSERT_OK(
|
|
env_->FileExists(LogFileName(dbname_, log_files[0]->LogNumber())));
|
|
} else {
|
|
ASSERT_OK(env_->FileExists(
|
|
LogFileName(alternative_wal_dir_, log_files[0]->LogNumber())));
|
|
}
|
|
}
|
|
}
|
|
|
|
TEST_F(DBWALTest, DISABLED_FullPurgePreservesLogPendingReuse) {
|
|
// TODO(ajkr): Disabled until WAL recycling is fixed for
|
|
// `kPointInTimeRecovery`.
|
|
|
|
// Ensures full purge cannot delete a WAL while it's in the process of being
|
|
// recycled. In particular, we force the full purge after a file has been
|
|
// chosen for reuse, but before it has been renamed.
|
|
for (int i = 0; i < 2; ++i) {
|
|
Options options = CurrentOptions();
|
|
options.recycle_log_file_num = 1;
|
|
if (i != 0) {
|
|
options.wal_dir = alternative_wal_dir_;
|
|
}
|
|
DestroyAndReopen(options);
|
|
|
|
// The first flush creates a second log so writes can continue before the
|
|
// flush finishes.
|
|
ASSERT_OK(Put("foo", "bar"));
|
|
ASSERT_OK(Flush());
|
|
|
|
// The second flush can recycle the first log. Sync points enforce the
|
|
// full purge happens after choosing the log to recycle and before it is
|
|
// renamed.
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
|
|
{"DBImpl::CreateWAL:BeforeReuseWritableFile1",
|
|
"DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"},
|
|
{"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge",
|
|
"DBImpl::CreateWAL:BeforeReuseWritableFile2"},
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
ROCKSDB_NAMESPACE::port::Thread thread([&]() {
|
|
TEST_SYNC_POINT(
|
|
"DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge");
|
|
ASSERT_OK(db_->EnableFileDeletions(true));
|
|
TEST_SYNC_POINT(
|
|
"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge");
|
|
});
|
|
ASSERT_OK(Put("foo", "bar"));
|
|
ASSERT_OK(Flush());
|
|
thread.join();
|
|
}
|
|
}
|
|
|
|
TEST_F(DBWALTest, GetSortedWalFiles) {
|
|
do {
|
|
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
|
VectorLogPtr log_files;
|
|
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
|
|
ASSERT_EQ(0, log_files.size());
|
|
|
|
ASSERT_OK(Put(1, "foo", "v1"));
|
|
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
|
|
ASSERT_EQ(1, log_files.size());
|
|
} while (ChangeWalOptions());
|
|
}
|
|
|
|
TEST_F(DBWALTest, GetCurrentWalFile) {
|
|
do {
|
|
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
|
|
|
std::unique_ptr<LogFile>* bad_log_file = nullptr;
|
|
ASSERT_NOK(dbfull()->GetCurrentWalFile(bad_log_file));
|
|
|
|
std::unique_ptr<LogFile> log_file;
|
|
ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
|
|
|
|
// nothing has been written to the log yet
|
|
ASSERT_EQ(log_file->StartSequence(), 0);
|
|
ASSERT_EQ(log_file->SizeFileBytes(), 0);
|
|
ASSERT_EQ(log_file->Type(), kAliveLogFile);
|
|
ASSERT_GT(log_file->LogNumber(), 0);
|
|
|
|
// add some data and verify that the file size actually moves foward
|
|
ASSERT_OK(Put(0, "foo", "v1"));
|
|
ASSERT_OK(Put(0, "foo2", "v2"));
|
|
ASSERT_OK(Put(0, "foo3", "v3"));
|
|
|
|
ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
|
|
|
|
ASSERT_EQ(log_file->StartSequence(), 0);
|
|
ASSERT_GT(log_file->SizeFileBytes(), 0);
|
|
ASSERT_EQ(log_file->Type(), kAliveLogFile);
|
|
ASSERT_GT(log_file->LogNumber(), 0);
|
|
|
|
// force log files to cycle and add some more data, then check if
|
|
// log number moves forward
|
|
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
for (int i = 0; i < 10; i++) {
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
}
|
|
|
|
ASSERT_OK(Put(0, "foo4", "v4"));
|
|
ASSERT_OK(Put(0, "foo5", "v5"));
|
|
ASSERT_OK(Put(0, "foo6", "v6"));
|
|
|
|
ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
|
|
|
|
ASSERT_EQ(log_file->StartSequence(), 0);
|
|
ASSERT_GT(log_file->SizeFileBytes(), 0);
|
|
ASSERT_EQ(log_file->Type(), kAliveLogFile);
|
|
ASSERT_GT(log_file->LogNumber(), 0);
|
|
|
|
} while (ChangeWalOptions());
|
|
}
|
|
|
|
TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
|
|
// Test for regression of WAL cleanup missing files that don't contain data
|
|
// for every column family.
|
|
do {
|
|
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
|
ASSERT_OK(Put(1, "foo", "v1"));
|
|
ASSERT_OK(Put(1, "foo", "v2"));
|
|
uint64_t earliest_log_nums[2];
|
|
for (int i = 0; i < 2; ++i) {
|
|
if (i > 0) {
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
|
|
}
|
|
VectorLogPtr log_files;
|
|
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
|
|
if (log_files.size() > 0) {
|
|
earliest_log_nums[i] = log_files[0]->LogNumber();
|
|
} else {
|
|
earliest_log_nums[i] = std::numeric_limits<uint64_t>::max();
|
|
}
|
|
}
|
|
// Check at least the first WAL was cleaned up during the recovery.
|
|
ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]);
|
|
} while (ChangeWalOptions());
|
|
}
|
|
|
|
TEST_F(DBWALTest, RecoverWithLargeLog) {
|
|
do {
|
|
{
|
|
Options options = CurrentOptions();
|
|
CreateAndReopenWithCF({"pikachu"}, options);
|
|
ASSERT_OK(Put(1, "big1", std::string(200000, '1')));
|
|
ASSERT_OK(Put(1, "big2", std::string(200000, '2')));
|
|
ASSERT_OK(Put(1, "small3", std::string(10, '3')));
|
|
ASSERT_OK(Put(1, "small4", std::string(10, '4')));
|
|
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
|
|
}
|
|
|
|
// Make sure that if we re-open with a small write buffer size that
|
|
// we flush table files in the middle of a large log file.
|
|
Options options;
|
|
options.write_buffer_size = 100000;
|
|
options = CurrentOptions(options);
|
|
ReopenWithColumnFamilies({"default", "pikachu"}, options);
|
|
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3);
|
|
ASSERT_EQ(std::string(200000, '1'), Get(1, "big1"));
|
|
ASSERT_EQ(std::string(200000, '2'), Get(1, "big2"));
|
|
ASSERT_EQ(std::string(10, '3'), Get(1, "small3"));
|
|
ASSERT_EQ(std::string(10, '4'), Get(1, "small4"));
|
|
ASSERT_GT(NumTableFilesAtLevel(0, 1), 1);
|
|
} while (ChangeWalOptions());
|
|
}
|
|
|
|
// In https://reviews.facebook.net/D20661 we change
|
|
// recovery behavior: previously for each log file each column family
|
|
// memtable was flushed, even it was empty. Now it's changed:
|
|
// we try to create the smallest number of table files by merging
|
|
// updates from multiple logs
|
|
TEST_F(DBWALTest, RecoverCheckFileAmountWithSmallWriteBuffer) {
|
|
Options options = CurrentOptions();
|
|
options.write_buffer_size = 5000000;
|
|
CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
|
|
|
|
// Since we will reopen DB with smaller write_buffer_size,
|
|
// each key will go to new SST file
|
|
ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
|
|
ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
|
|
ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
|
|
ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
|
|
|
|
ASSERT_OK(Put(3, Key(10), DummyString(1)));
|
|
// Make 'dobrynia' to be flushed and new WAL file to be created
|
|
ASSERT_OK(Put(2, Key(10), DummyString(7500000)));
|
|
ASSERT_OK(Put(2, Key(1), DummyString(1)));
|
|
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
|
|
{
|
|
auto tables = ListTableFiles(env_, dbname_);
|
|
ASSERT_EQ(tables.size(), static_cast<size_t>(1));
|
|
// Make sure 'dobrynia' was flushed: check sst files amount
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
|
|
static_cast<uint64_t>(1));
|
|
}
|
|
// New WAL file
|
|
ASSERT_OK(Put(1, Key(1), DummyString(1)));
|
|
ASSERT_OK(Put(1, Key(1), DummyString(1)));
|
|
ASSERT_OK(Put(3, Key(10), DummyString(1)));
|
|
ASSERT_OK(Put(3, Key(10), DummyString(1)));
|
|
ASSERT_OK(Put(3, Key(10), DummyString(1)));
|
|
|
|
options.write_buffer_size = 4096;
|
|
options.arena_block_size = 4096;
|
|
ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
|
|
options);
|
|
{
|
|
// No inserts => default is empty
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
|
|
static_cast<uint64_t>(0));
|
|
// First 4 keys goes to separate SSTs + 1 more SST for 2 smaller keys
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
|
|
static_cast<uint64_t>(5));
|
|
// 1 SST for big key + 1 SST for small one
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
|
|
static_cast<uint64_t>(2));
|
|
// 1 SST for all keys
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
|
|
static_cast<uint64_t>(1));
|
|
}
|
|
}
|
|
|
|
// In https://reviews.facebook.net/D20661 we change
|
|
// recovery behavior: previously for each log file each column family
|
|
// memtable was flushed, even it wasn't empty. Now it's changed:
|
|
// we try to create the smallest number of table files by merging
|
|
// updates from multiple logs
|
|
TEST_F(DBWALTest, RecoverCheckFileAmount) {
|
|
Options options = CurrentOptions();
|
|
options.write_buffer_size = 100000;
|
|
options.arena_block_size = 4 * 1024;
|
|
options.avoid_flush_during_recovery = false;
|
|
CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
|
|
|
|
ASSERT_OK(Put(0, Key(1), DummyString(1)));
|
|
ASSERT_OK(Put(1, Key(1), DummyString(1)));
|
|
ASSERT_OK(Put(2, Key(1), DummyString(1)));
|
|
|
|
// Make 'nikitich' memtable to be flushed
|
|
ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
|
|
ASSERT_OK(Put(3, Key(1), DummyString(1)));
|
|
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));
|
|
// 4 memtable are not flushed, 1 sst file
|
|
{
|
|
auto tables = ListTableFiles(env_, dbname_);
|
|
ASSERT_EQ(tables.size(), static_cast<size_t>(1));
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
|
|
static_cast<uint64_t>(1));
|
|
}
|
|
// Memtable for 'nikitich' has flushed, new WAL file has opened
|
|
// 4 memtable still not flushed
|
|
|
|
// Write to new WAL file
|
|
ASSERT_OK(Put(0, Key(1), DummyString(1)));
|
|
ASSERT_OK(Put(1, Key(1), DummyString(1)));
|
|
ASSERT_OK(Put(2, Key(1), DummyString(1)));
|
|
|
|
// Fill up 'nikitich' one more time
|
|
ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
|
|
// make it flush
|
|
ASSERT_OK(Put(3, Key(1), DummyString(1)));
|
|
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));
|
|
// There are still 4 memtable not flushed, and 2 sst tables
|
|
ASSERT_OK(Put(0, Key(1), DummyString(1)));
|
|
ASSERT_OK(Put(1, Key(1), DummyString(1)));
|
|
ASSERT_OK(Put(2, Key(1), DummyString(1)));
|
|
|
|
{
|
|
auto tables = ListTableFiles(env_, dbname_);
|
|
ASSERT_EQ(tables.size(), static_cast<size_t>(2));
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
|
|
static_cast<uint64_t>(2));
|
|
}
|
|
|
|
ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
|
|
options);
|
|
{
|
|
std::vector<uint64_t> table_files = ListTableFiles(env_, dbname_);
|
|
// Check, that records for 'default', 'dobrynia' and 'pikachu' from
|
|
// first, second and third WALs went to the same SST.
|
|
// So, there is 6 SSTs: three for 'nikitich', one for 'default', one for
|
|
// 'dobrynia', one for 'pikachu'
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
|
|
static_cast<uint64_t>(1));
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
|
|
static_cast<uint64_t>(3));
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
|
|
static_cast<uint64_t>(1));
|
|
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
|
|
static_cast<uint64_t>(1));
|
|
}
|
|
}
|
|
|
|
TEST_F(DBWALTest, SyncMultipleLogs) {
|
|
const uint64_t kNumBatches = 2;
|
|
const int kBatchSize = 1000;
|
|
|
|
Options options = CurrentOptions();
|
|
options.create_if_missing = true;
|
|
options.write_buffer_size = 4096;
|
|
Reopen(options);
|
|
|
|
WriteBatch batch;
|
|
WriteOptions wo;
|
|
wo.sync = true;
|
|
|
|
for (uint64_t b = 0; b < kNumBatches; b++) {
|
|
batch.Clear();
|
|
for (int i = 0; i < kBatchSize; i++) {
|
|
ASSERT_OK(batch.Put(Key(i), DummyString(128)));
|
|
}
|
|
|
|
ASSERT_OK(dbfull()->Write(wo, &batch));
|
|
}
|
|
|
|
ASSERT_OK(dbfull()->SyncWAL());
|
|
}
|
|
|
|
// Github issue 1339. Prior the fix we read sequence id from the first log to
|
|
// a local variable, then keep increase the variable as we replay logs,
|
|
// ignoring actual sequence id of the records. This is incorrect if some writes
|
|
// come with WAL disabled.
|
|
TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
|
|
std::unique_ptr<FaultInjectionTestEnv> fault_env(
|
|
new FaultInjectionTestEnv(env_));
|
|
Options options = CurrentOptions();
|
|
options.env = fault_env.get();
|
|
options.disable_auto_compactions = true;
|
|
WriteOptions wal_on, wal_off;
|
|
wal_on.sync = true;
|
|
wal_on.disableWAL = false;
|
|
wal_off.disableWAL = true;
|
|
CreateAndReopenWithCF({"dummy"}, options);
|
|
ASSERT_OK(Put(1, "dummy", "d1", wal_on)); // seq id 1
|
|
ASSERT_OK(Put(1, "dummy", "d2", wal_off));
|
|
ASSERT_OK(Put(1, "dummy", "d3", wal_off));
|
|
ASSERT_OK(Put(0, "key", "v4", wal_on)); // seq id 4
|
|
ASSERT_OK(Flush(0));
|
|
ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5
|
|
ASSERT_EQ("v5", Get(0, "key"));
|
|
ASSERT_OK(dbfull()->FlushWAL(false));
|
|
// Simulate a crash.
|
|
fault_env->SetFilesystemActive(false);
|
|
Close();
|
|
fault_env->ResetState();
|
|
ReopenWithColumnFamilies({"default", "dummy"}, options);
|
|
// Prior to the fix, we may incorrectly recover "v5" with sequence id = 3.
|
|
ASSERT_EQ("v5", Get(0, "key"));
|
|
// Destroy DB before destruct fault_env.
|
|
Destroy(options);
|
|
}
|
|
|
|
//
|
|
// Test WAL recovery for the various modes available
|
|
//
|
|
class RecoveryTestHelper {
|
|
public:
|
|
// Number of WAL files to generate
|
|
static constexpr int kWALFilesCount = 10;
|
|
// Starting number for the WAL file name like 00010.log
|
|
static constexpr int kWALFileOffset = 10;
|
|
// Keys to be written per WAL file
|
|
static constexpr int kKeysPerWALFile = 133;
|
|
// Size of the value
|
|
static constexpr int kValueSize = 96;
|
|
|
|
// Create WAL files with values filled in
|
|
static void FillData(DBWALTestBase* test, const Options& options,
|
|
const size_t wal_count, size_t* count) {
|
|
// Calling internal functions requires sanitized options.
|
|
Options sanitized_options = SanitizeOptions(test->dbname_, options);
|
|
const ImmutableDBOptions db_options(sanitized_options);
|
|
|
|
*count = 0;
|
|
|
|
std::shared_ptr<Cache> table_cache = NewLRUCache(50, 0);
|
|
FileOptions file_options;
|
|
WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
|
|
|
|
std::unique_ptr<VersionSet> versions;
|
|
std::unique_ptr<WalManager> wal_manager;
|
|
WriteController write_controller;
|
|
|
|
versions.reset(new VersionSet(
|
|
test->dbname_, &db_options, file_options, table_cache.get(),
|
|
&write_buffer_manager, &write_controller,
|
|
/*block_cache_tracer=*/nullptr,
|
|
/*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ ""));
|
|
|
|
wal_manager.reset(
|
|
new WalManager(db_options, file_options, /*io_tracer=*/nullptr));
|
|
|
|
std::unique_ptr<log::Writer> current_log_writer;
|
|
|
|
for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
|
|
uint64_t current_log_number = j;
|
|
std::string fname = LogFileName(test->dbname_, current_log_number);
|
|
std::unique_ptr<WritableFileWriter> file_writer;
|
|
ASSERT_OK(WritableFileWriter::Create(db_options.env->GetFileSystem(),
|
|
fname, file_options, &file_writer,
|
|
nullptr));
|
|
log::Writer* log_writer =
|
|
new log::Writer(std::move(file_writer), current_log_number,
|
|
db_options.recycle_log_file_num > 0, false,
|
|
db_options.wal_compression);
|
|
ASSERT_OK(log_writer->AddCompressionTypeRecord());
|
|
current_log_writer.reset(log_writer);
|
|
|
|
WriteBatch batch;
|
|
for (int i = 0; i < kKeysPerWALFile; i++) {
|
|
std::string key = "key" + std::to_string((*count)++);
|
|
std::string value = test->DummyString(kValueSize);
|
|
ASSERT_NE(current_log_writer.get(), nullptr);
|
|
uint64_t seq = versions->LastSequence() + 1;
|
|
batch.Clear();
|
|
ASSERT_OK(batch.Put(key, value));
|
|
WriteBatchInternal::SetSequence(&batch, seq);
|
|
ASSERT_OK(current_log_writer->AddRecord(
|
|
WriteBatchInternal::Contents(&batch)));
|
|
versions->SetLastAllocatedSequence(seq);
|
|
versions->SetLastPublishedSequence(seq);
|
|
versions->SetLastSequence(seq);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Recreate and fill the store with some data
|
|
static size_t FillData(DBWALTestBase* test, Options* options) {
|
|
options->create_if_missing = true;
|
|
test->DestroyAndReopen(*options);
|
|
test->Close();
|
|
|
|
size_t count = 0;
|
|
FillData(test, *options, kWALFilesCount, &count);
|
|
return count;
|
|
}
|
|
|
|
// Read back all the keys we wrote and return the number of keys found
|
|
static size_t GetData(DBWALTestBase* test) {
|
|
size_t count = 0;
|
|
for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) {
|
|
if (test->Get("key" + std::to_string(i)) != "NOT_FOUND") {
|
|
++count;
|
|
}
|
|
}
|
|
return count;
|
|
}
|
|
|
|
// Manuall corrupt the specified WAL
|
|
static void CorruptWAL(DBWALTestBase* test, const Options& options,
|
|
const double off, const double len,
|
|
const int wal_file_id, const bool trunc = false) {
|
|
Env* env = options.env;
|
|
std::string fname = LogFileName(test->dbname_, wal_file_id);
|
|
uint64_t size;
|
|
ASSERT_OK(env->GetFileSize(fname, &size));
|
|
ASSERT_GT(size, 0);
|
|
#ifdef OS_WIN
|
|
// Windows disk cache behaves differently. When we truncate
|
|
// the original content is still in the cache due to the original
|
|
// handle is still open. Generally, in Windows, one prohibits
|
|
// shared access to files and it is not needed for WAL but we allow
|
|
// it to induce corruption at various tests.
|
|
test->Close();
|
|
#endif
|
|
if (trunc) {
|
|
ASSERT_OK(
|
|
test::TruncateFile(env, fname, static_cast<uint64_t>(size * off)));
|
|
} else {
|
|
ASSERT_OK(test::CorruptFile(env, fname, static_cast<int>(size * off + 8),
|
|
static_cast<int>(size * len), false));
|
|
}
|
|
}
|
|
};
|
|
|
|
class DBWALTestWithParams : public DBWALTestBase,
|
|
public ::testing::WithParamInterface<
|
|
std::tuple<bool, int, int, CompressionType>> {
|
|
public:
|
|
DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {}
|
|
};
|
|
|
|
INSTANTIATE_TEST_CASE_P(
|
|
Wal, DBWALTestWithParams,
|
|
::testing::Combine(::testing::Bool(), ::testing::Range(0, 4, 1),
|
|
::testing::Range(RecoveryTestHelper::kWALFileOffset,
|
|
RecoveryTestHelper::kWALFileOffset +
|
|
RecoveryTestHelper::kWALFilesCount,
|
|
1),
|
|
::testing::Values(CompressionType::kNoCompression,
|
|
CompressionType::kZSTD)));
|
|
|
|
class DBWALTestWithParamsVaryingRecoveryMode
|
|
: public DBWALTestBase,
|
|
public ::testing::WithParamInterface<
|
|
std::tuple<bool, int, int, WALRecoveryMode, CompressionType>> {
|
|
public:
|
|
DBWALTestWithParamsVaryingRecoveryMode()
|
|
: DBWALTestBase("/db_wal_test_with_params_mode") {}
|
|
};
|
|
|
|
INSTANTIATE_TEST_CASE_P(
|
|
Wal, DBWALTestWithParamsVaryingRecoveryMode,
|
|
::testing::Combine(
|
|
::testing::Bool(), ::testing::Range(0, 4, 1),
|
|
::testing::Range(RecoveryTestHelper::kWALFileOffset,
|
|
RecoveryTestHelper::kWALFileOffset +
|
|
RecoveryTestHelper::kWALFilesCount,
|
|
1),
|
|
::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords,
|
|
WALRecoveryMode::kAbsoluteConsistency,
|
|
WALRecoveryMode::kPointInTimeRecovery,
|
|
WALRecoveryMode::kSkipAnyCorruptedRecords),
|
|
::testing::Values(CompressionType::kNoCompression,
|
|
CompressionType::kZSTD)));
|
|
|
|
// Test scope:
|
|
// - We expect to open the data store when there is incomplete trailing writes
|
|
// at the end of any of the logs
|
|
// - We do not expect to open the data store for corruption
|
|
TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) {
|
|
bool trunc = std::get<0>(GetParam()); // Corruption style
|
|
// Corruption offset position
|
|
int corrupt_offset = std::get<1>(GetParam());
|
|
int wal_file_id = std::get<2>(GetParam()); // WAL file
|
|
|
|
// Fill data for testing
|
|
Options options = CurrentOptions();
|
|
const size_t row_count = RecoveryTestHelper::FillData(this, &options);
|
|
// test checksum failure or parsing
|
|
RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
|
|
/*len%=*/.1, wal_file_id, trunc);
|
|
|
|
options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
|
|
if (trunc) {
|
|
options.create_if_missing = false;
|
|
ASSERT_OK(TryReopen(options));
|
|
const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
|
|
ASSERT_TRUE(corrupt_offset == 0 || recovered_row_count > 0);
|
|
ASSERT_LT(recovered_row_count, row_count);
|
|
} else {
|
|
ASSERT_NOK(TryReopen(options));
|
|
}
|
|
}
|
|
|
|
// Test scope:
|
|
// We don't expect the data store to be opened if there is any corruption
|
|
// (leading, middle or trailing -- incomplete writes or corruption)
|
|
TEST_P(DBWALTestWithParams, kAbsoluteConsistency) {
|
|
// Verify clean slate behavior
|
|
Options options = CurrentOptions();
|
|
const size_t row_count = RecoveryTestHelper::FillData(this, &options);
|
|
options.create_if_missing = false;
|
|
ASSERT_OK(TryReopen(options));
|
|
ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count);
|
|
|
|
bool trunc = std::get<0>(GetParam()); // Corruption style
|
|
// Corruption offset position
|
|
int corrupt_offset = std::get<1>(GetParam());
|
|
int wal_file_id = std::get<2>(GetParam()); // WAL file
|
|
// WAL compression type
|
|
CompressionType compression_type = std::get<3>(GetParam());
|
|
options.wal_compression = compression_type;
|
|
|
|
if (trunc && corrupt_offset == 0) {
|
|
return;
|
|
}
|
|
|
|
// fill with new date
|
|
RecoveryTestHelper::FillData(this, &options);
|
|
// corrupt the wal
|
|
RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .33,
|
|
/*len%=*/.1, wal_file_id, trunc);
|
|
// verify
|
|
options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
|
|
options.create_if_missing = false;
|
|
ASSERT_NOK(TryReopen(options));
|
|
}
|
|
|
|
// Test scope:
|
|
// We don't expect the data store to be opened if there is any inconsistency
|
|
// between WAL and SST files
|
|
TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
|
|
Options options = CurrentOptions();
|
|
options.avoid_flush_during_recovery = true;
|
|
|
|
// Create DB with multiple column families.
|
|
CreateAndReopenWithCF({"one", "two"}, options);
|
|
ASSERT_OK(Put(1, "key1", "val1"));
|
|
ASSERT_OK(Put(2, "key2", "val2"));
|
|
|
|
// Record the offset at this point
|
|
Env* env = options.env;
|
|
uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
|
|
std::string fname = LogFileName(dbname_, wal_file_id);
|
|
uint64_t offset_to_corrupt;
|
|
ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));
|
|
ASSERT_GT(offset_to_corrupt, 0);
|
|
|
|
ASSERT_OK(Put(1, "key3", "val3"));
|
|
// Corrupt WAL at location of key3
|
|
ASSERT_OK(test::CorruptFile(env, fname, static_cast<int>(offset_to_corrupt),
|
|
4, false));
|
|
ASSERT_OK(Put(2, "key4", "val4"));
|
|
ASSERT_OK(Put(1, "key5", "val5"));
|
|
ASSERT_OK(Flush(2));
|
|
|
|
// PIT recovery & verify
|
|
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
|
|
ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options));
|
|
}
|
|
|
|
TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) {
|
|
Options options = CurrentOptions();
|
|
options.env = env_;
|
|
options.track_and_verify_wals_in_manifest = true;
|
|
// The following make sure there are two bg flush threads.
|
|
options.max_background_jobs = 8;
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
const std::string cf1_name("cf1");
|
|
CreateAndReopenWithCF({cf1_name}, options);
|
|
assert(handles_.size() == 2);
|
|
|
|
{
|
|
dbfull()->TEST_LockMutex();
|
|
ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes);
|
|
dbfull()->TEST_UnlockMutex();
|
|
}
|
|
|
|
ASSERT_OK(dbfull()->PauseBackgroundWork());
|
|
|
|
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
|
|
|
|
ASSERT_OK(dbfull()->TEST_FlushMemTable(
|
|
/*wait=*/false, /*allow_write_stall=*/true, handles_[1]));
|
|
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
|
|
|
|
ASSERT_OK(dbfull()->TEST_FlushMemTable(
|
|
/*wait=*/false, /*allow_write_stall=*/true, handles_[0]));
|
|
|
|
bool called = false;
|
|
std::atomic<int> bg_flush_threads{0};
|
|
std::atomic<bool> wal_synced{false};
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) {
|
|
int cur = bg_flush_threads.load();
|
|
int desired = cur + 1;
|
|
if (cur > 0 ||
|
|
!bg_flush_threads.compare_exchange_strong(cur, desired)) {
|
|
while (!wal_synced.load()) {
|
|
// Wait until the other bg flush thread finishes committing WAL sync
|
|
// operation to the MANIFEST.
|
|
}
|
|
}
|
|
});
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"DBImpl::FlushMemTableToOutputFile:CommitWal:1",
|
|
[&](void* /*arg*/) { wal_synced.store(true); });
|
|
// This callback will be called when the first bg flush thread reaches the
|
|
// point before entering the MANIFEST write queue after flushing the SST
|
|
// file.
|
|
// The purpose of the sync points here is to ensure both bg flush threads
|
|
// finish computing `min_wal_number_to_keep` before any of them updates the
|
|
// `log_number` for the column family that's being flushed.
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep",
|
|
[&](void* /*arg*/) {
|
|
dbfull()->mutex()->AssertHeld();
|
|
if (!called) {
|
|
// We are the first bg flush thread in the MANIFEST write queue.
|
|
// We set up the dependency between sync points for two threads that
|
|
// will be executing the same code.
|
|
// For the interleaving of events, see
|
|
// https://github.com/facebook/rocksdb/pull/9715.
|
|
// bg flush thread1 will release the db mutex while in the MANIFEST
|
|
// write queue. In the meantime, bg flush thread2 locks db mutex and
|
|
// computes the min_wal_number_to_keep (before thread1 writes to
|
|
// MANIFEST thus before cf1->log_number is updated). Bg thread2 joins
|
|
// the MANIFEST write queue afterwards and bg flush thread1 proceeds
|
|
// with writing to MANIFEST.
|
|
called = true;
|
|
SyncPoint::GetInstance()->LoadDependency({
|
|
{"VersionSet::LogAndApply:WriteManifestStart",
|
|
"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"},
|
|
{"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2",
|
|
"VersionSet::LogAndApply:WriteManifest"},
|
|
});
|
|
} else {
|
|
// The other bg flush thread has already been in the MANIFEST write
|
|
// queue, and we are after.
|
|
TEST_SYNC_POINT(
|
|
"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2");
|
|
}
|
|
});
|
|
SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
ASSERT_OK(dbfull()->ContinueBackgroundWork());
|
|
|
|
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
|
|
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
|
|
|
|
ASSERT_TRUE(called);
|
|
|
|
Close();
|
|
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
|
|
DB* db1 = nullptr;
|
|
Status s = DB::OpenForReadOnly(options, dbname_, &db1);
|
|
ASSERT_OK(s);
|
|
assert(db1);
|
|
delete db1;
|
|
}
|
|
|
|
TEST_F(DBWALTest, FixSyncWalOnObseletedWalWithNewManifestCausingMissingWAL) {
|
|
Options options = CurrentOptions();
|
|
// Small size to force manifest creation
|
|
options.max_manifest_file_size = 1;
|
|
options.track_and_verify_wals_in_manifest = true;
|
|
DestroyAndReopen(options);
|
|
|
|
// Accumulate memtable m1 and create the 1st wal (i.e, 4.log)
|
|
ASSERT_OK(Put(Key(1), ""));
|
|
ASSERT_OK(Put(Key(2), ""));
|
|
ASSERT_OK(Put(Key(3), ""));
|
|
|
|
const std::string wal_file_path = db_->GetName() + "/000004.log";
|
|
|
|
// Coerce the following sequence of events:
|
|
// (1) Flush() marks 4.log to be obsoleted, 8.log to be the latest (i.e,
|
|
// active) log and release the lock
|
|
// (2) SyncWAL() proceeds with the lock. It
|
|
// creates a new manifest and syncs all the inactive wals before the latest
|
|
// (i.e, active log), which is 4.log. Note that SyncWAL() is not aware of the
|
|
// fact that 4.log has marked as to be obseleted. Such wal
|
|
// sync will then add a WAL addition record of 4.log to the new manifest
|
|
// without any special treatment. Prior to the fix, there is no WAL deletion
|
|
// record to offset it. (3) BackgroundFlush() will eventually purge 4.log.
|
|
|
|
bool wal_synced = false;
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"FindObsoleteFiles::PostMutexUnlock", [&](void*) {
|
|
ASSERT_OK(env_->FileExists(wal_file_path));
|
|
uint64_t pre_sync_wal_manifest_no =
|
|
dbfull()->TEST_Current_Manifest_FileNo();
|
|
ASSERT_OK(db_->SyncWAL());
|
|
uint64_t post_sync_wal_manifest_no =
|
|
dbfull()->TEST_Current_Manifest_FileNo();
|
|
bool new_manifest_created =
|
|
post_sync_wal_manifest_no == pre_sync_wal_manifest_no + 1;
|
|
ASSERT_TRUE(new_manifest_created);
|
|
wal_synced = true;
|
|
});
|
|
|
|
|
|
SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
ASSERT_OK(Flush());
|
|
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
|
|
|
|
ASSERT_TRUE(wal_synced);
|
|
// BackgroundFlush() purged 4.log
|
|
// because the memtable associated with the WAL was flushed and new WAL was
|
|
// created (i.e, 8.log)
|
|
ASSERT_TRUE(env_->FileExists(wal_file_path).IsNotFound());
|
|
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
|
|
// To verify the corruption of "Missing WAL with log number: 4" under
|
|
// `options.track_and_verify_wals_in_manifest = true` is fixed.
|
|
//
|
|
// Before the fix, `db_->SyncWAL()` will sync and record WAL addtion of the
|
|
// obseleted WAL 4.log in a new manifest without any special treament.
|
|
// This will result in missing-wal corruption in DB::Reopen().
|
|
Status s = TryReopen(options);
|
|
EXPECT_OK(s);
|
|
}
|
|
|
|
// Test scope:
|
|
// - We expect to open data store under all circumstances
|
|
// - We expect only data upto the point where the first error was encountered
|
|
TEST_P(DBWALTestWithParams, kPointInTimeRecovery) {
|
|
const int maxkeys =
|
|
RecoveryTestHelper::kWALFilesCount * RecoveryTestHelper::kKeysPerWALFile;
|
|
|
|
bool trunc = std::get<0>(GetParam()); // Corruption style
|
|
// Corruption offset position
|
|
int corrupt_offset = std::get<1>(GetParam());
|
|
int wal_file_id = std::get<2>(GetParam()); // WAL file
|
|
// WAL compression type
|
|
CompressionType compression_type = std::get<3>(GetParam());
|
|
|
|
// Fill data for testing
|
|
Options options = CurrentOptions();
|
|
options.wal_compression = compression_type;
|
|
const size_t row_count = RecoveryTestHelper::FillData(this, &options);
|
|
|
|
// Corrupt the wal
|
|
// The offset here was 0.3 which cuts off right at the end of a
|
|
// valid fragment after wal zstd compression checksum is enabled,
|
|
// so changed the value to 0.33.
|
|
RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .33,
|
|
/*len%=*/.1, wal_file_id, trunc);
|
|
|
|
// Verify
|
|
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
|
|
options.create_if_missing = false;
|
|
ASSERT_OK(TryReopen(options));
|
|
|
|
// Probe data for invariants
|
|
size_t recovered_row_count = RecoveryTestHelper::GetData(this);
|
|
ASSERT_LT(recovered_row_count, row_count);
|
|
|
|
// Verify a prefix of keys were recovered. But not in the case of full WAL
|
|
// truncation, because we have no way to know there was a corruption when
|
|
// truncation happened on record boundaries (preventing recovery holes in
|
|
// that case requires using `track_and_verify_wals_in_manifest`).
|
|
if (!trunc || corrupt_offset != 0) {
|
|
bool expect_data = true;
|
|
for (size_t k = 0; k < maxkeys; ++k) {
|
|
bool found = Get("key" + std::to_string(k)) != "NOT_FOUND";
|
|
if (expect_data && !found) {
|
|
expect_data = false;
|
|
}
|
|
ASSERT_EQ(found, expect_data);
|
|
}
|
|
}
|
|
|
|
const size_t min = RecoveryTestHelper::kKeysPerWALFile *
|
|
(wal_file_id - RecoveryTestHelper::kWALFileOffset);
|
|
ASSERT_GE(recovered_row_count, min);
|
|
if (!trunc && corrupt_offset != 0) {
|
|
const size_t max = RecoveryTestHelper::kKeysPerWALFile *
|
|
(wal_file_id - RecoveryTestHelper::kWALFileOffset + 1);
|
|
ASSERT_LE(recovered_row_count, max);
|
|
}
|
|
}
|
|
|
|
// Test scope:
|
|
// - We expect to open the data store under all scenarios
|
|
// - We expect to have recovered records past the corruption zone
|
|
TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) {
|
|
bool trunc = std::get<0>(GetParam()); // Corruption style
|
|
// Corruption offset position
|
|
int corrupt_offset = std::get<1>(GetParam());
|
|
int wal_file_id = std::get<2>(GetParam()); // WAL file
|
|
// WAL compression type
|
|
CompressionType compression_type = std::get<3>(GetParam());
|
|
|
|
// Fill data for testing
|
|
Options options = CurrentOptions();
|
|
options.wal_compression = compression_type;
|
|
const size_t row_count = RecoveryTestHelper::FillData(this, &options);
|
|
|
|
// Corrupt the WAL
|
|
RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
|
|
/*len%=*/.1, wal_file_id, trunc);
|
|
|
|
// Verify behavior
|
|
options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
|
|
options.create_if_missing = false;
|
|
ASSERT_OK(TryReopen(options));
|
|
|
|
// Probe data for invariants
|
|
size_t recovered_row_count = RecoveryTestHelper::GetData(this);
|
|
ASSERT_LT(recovered_row_count, row_count);
|
|
|
|
if (!trunc) {
|
|
ASSERT_TRUE(corrupt_offset != 0 || recovered_row_count > 0);
|
|
}
|
|
}
|
|
|
|
TEST_F(DBWALTest, AvoidFlushDuringRecovery) {
|
|
Options options = CurrentOptions();
|
|
options.disable_auto_compactions = true;
|
|
options.avoid_flush_during_recovery = false;
|
|
|
|
// Test with flush after recovery.
|
|
Reopen(options);
|
|
ASSERT_OK(Put("foo", "v1"));
|
|
ASSERT_OK(Put("bar", "v2"));
|
|
ASSERT_OK(Flush());
|
|
ASSERT_OK(Put("foo", "v3"));
|
|
ASSERT_OK(Put("bar", "v4"));
|
|
ASSERT_EQ(1, TotalTableFiles());
|
|
// Reopen DB. Check if WAL logs flushed.
|
|
Reopen(options);
|
|
ASSERT_EQ("v3", Get("foo"));
|
|
ASSERT_EQ("v4", Get("bar"));
|
|
ASSERT_EQ(2, TotalTableFiles());
|
|
|
|
// Test without flush after recovery.
|
|
options.avoid_flush_during_recovery = true;
|
|
DestroyAndReopen(options);
|
|
ASSERT_OK(Put("foo", "v5"));
|
|
ASSERT_OK(Put("bar", "v6"));
|
|
ASSERT_OK(Flush());
|
|
ASSERT_OK(Put("foo", "v7"));
|
|
ASSERT_OK(Put("bar", "v8"));
|
|
ASSERT_EQ(1, TotalTableFiles());
|
|
// Reopen DB. WAL logs should not be flushed this time.
|
|
Reopen(options);
|
|
ASSERT_EQ("v7", Get("foo"));
|
|
ASSERT_EQ("v8", Get("bar"));
|
|
ASSERT_EQ(1, TotalTableFiles());
|
|
|
|
// Force flush with allow_2pc.
|
|
options.avoid_flush_during_recovery = true;
|
|
options.allow_2pc = true;
|
|
ASSERT_OK(Put("foo", "v9"));
|
|
ASSERT_OK(Put("bar", "v10"));
|
|
ASSERT_OK(Flush());
|
|
ASSERT_OK(Put("foo", "v11"));
|
|
ASSERT_OK(Put("bar", "v12"));
|
|
Reopen(options);
|
|
ASSERT_EQ("v11", Get("foo"));
|
|
ASSERT_EQ("v12", Get("bar"));
|
|
ASSERT_EQ(3, TotalTableFiles());
|
|
}
|
|
|
|
TEST_F(DBWALTest, WalCleanupAfterAvoidFlushDuringRecovery) {
|
|
// Verifies WAL files that were present during recovery, but not flushed due
|
|
// to avoid_flush_during_recovery, will be considered for deletion at a later
|
|
// stage. We check at least one such file is deleted during Flush().
|
|
Options options = CurrentOptions();
|
|
options.disable_auto_compactions = true;
|
|
options.avoid_flush_during_recovery = true;
|
|
Reopen(options);
|
|
|
|
ASSERT_OK(Put("foo", "v1"));
|
|
Reopen(options);
|
|
for (int i = 0; i < 2; ++i) {
|
|
if (i > 0) {
|
|
// Flush() triggers deletion of obsolete tracked files
|
|
ASSERT_OK(Flush());
|
|
}
|
|
VectorLogPtr log_files;
|
|
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
|
|
if (i == 0) {
|
|
ASSERT_GT(log_files.size(), 0);
|
|
} else {
|
|
ASSERT_EQ(0, log_files.size());
|
|
}
|
|
}
|
|
}
|
|
|
|
TEST_F(DBWALTest, RecoverWithoutFlush) {
|
|
Options options = CurrentOptions();
|
|
options.avoid_flush_during_recovery = true;
|
|
options.create_if_missing = false;
|
|
options.disable_auto_compactions = true;
|
|
options.write_buffer_size = 64 * 1024 * 1024;
|
|
|
|
size_t count = RecoveryTestHelper::FillData(this, &options);
|
|
auto validateData = [this, count]() {
|
|
for (size_t i = 0; i < count; i++) {
|
|
ASSERT_NE(Get("key" + std::to_string(i)), "NOT_FOUND");
|
|
}
|
|
};
|
|
Reopen(options);
|
|
validateData();
|
|
// Insert some data without flush
|
|
ASSERT_OK(Put("foo", "foo_v1"));
|
|
ASSERT_OK(Put("bar", "bar_v1"));
|
|
Reopen(options);
|
|
validateData();
|
|
ASSERT_EQ(Get("foo"), "foo_v1");
|
|
ASSERT_EQ(Get("bar"), "bar_v1");
|
|
// Insert again and reopen
|
|
ASSERT_OK(Put("foo", "foo_v2"));
|
|
ASSERT_OK(Put("bar", "bar_v2"));
|
|
Reopen(options);
|
|
validateData();
|
|
ASSERT_EQ(Get("foo"), "foo_v2");
|
|
ASSERT_EQ(Get("bar"), "bar_v2");
|
|
// manual flush and insert again
|
|
ASSERT_OK(Flush());
|
|
ASSERT_EQ(Get("foo"), "foo_v2");
|
|
ASSERT_EQ(Get("bar"), "bar_v2");
|
|
ASSERT_OK(Put("foo", "foo_v3"));
|
|
ASSERT_OK(Put("bar", "bar_v3"));
|
|
Reopen(options);
|
|
validateData();
|
|
ASSERT_EQ(Get("foo"), "foo_v3");
|
|
ASSERT_EQ(Get("bar"), "bar_v3");
|
|
}
|
|
|
|
TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
|
|
const std::string kSmallValue = "v";
|
|
const std::string kLargeValue = DummyString(1024);
|
|
Options options = CurrentOptions();
|
|
options.avoid_flush_during_recovery = true;
|
|
options.create_if_missing = false;
|
|
options.disable_auto_compactions = true;
|
|
|
|
auto countWalFiles = [this]() {
|
|
VectorLogPtr log_files;
|
|
if (!dbfull()->GetSortedWalFiles(log_files).ok()) {
|
|
return size_t{0};
|
|
}
|
|
return log_files.size();
|
|
};
|
|
|
|
// Create DB with multiple column families and multiple log files.
|
|
CreateAndReopenWithCF({"one", "two"}, options);
|
|
ASSERT_OK(Put(0, "key1", kSmallValue));
|
|
ASSERT_OK(Put(1, "key2", kLargeValue));
|
|
ASSERT_OK(Flush(1));
|
|
ASSERT_EQ(1, countWalFiles());
|
|
ASSERT_OK(Put(0, "key3", kSmallValue));
|
|
ASSERT_OK(Put(2, "key4", kLargeValue));
|
|
ASSERT_OK(Flush(2));
|
|
ASSERT_EQ(2, countWalFiles());
|
|
|
|
// Reopen, insert and flush.
|
|
options.db_write_buffer_size = 64 * 1024 * 1024;
|
|
ReopenWithColumnFamilies({"default", "one", "two"}, options);
|
|
ASSERT_EQ(Get(0, "key1"), kSmallValue);
|
|
ASSERT_EQ(Get(1, "key2"), kLargeValue);
|
|
ASSERT_EQ(Get(0, "key3"), kSmallValue);
|
|
ASSERT_EQ(Get(2, "key4"), kLargeValue);
|
|
// Insert more data.
|
|
ASSERT_OK(Put(0, "key5", kLargeValue));
|
|
ASSERT_OK(Put(1, "key6", kLargeValue));
|
|
ASSERT_EQ(3, countWalFiles());
|
|
ASSERT_OK(Flush(1));
|
|
ASSERT_OK(Put(2, "key7", kLargeValue));
|
|
ASSERT_OK(dbfull()->FlushWAL(false));
|
|
ASSERT_EQ(4, countWalFiles());
|
|
|
|
// Reopen twice and validate.
|
|
for (int i = 0; i < 2; i++) {
|
|
ReopenWithColumnFamilies({"default", "one", "two"}, options);
|
|
ASSERT_EQ(Get(0, "key1"), kSmallValue);
|
|
ASSERT_EQ(Get(1, "key2"), kLargeValue);
|
|
ASSERT_EQ(Get(0, "key3"), kSmallValue);
|
|
ASSERT_EQ(Get(2, "key4"), kLargeValue);
|
|
ASSERT_EQ(Get(0, "key5"), kLargeValue);
|
|
ASSERT_EQ(Get(1, "key6"), kLargeValue);
|
|
ASSERT_EQ(Get(2, "key7"), kLargeValue);
|
|
ASSERT_EQ(4, countWalFiles());
|
|
}
|
|
}
|
|
|
|
// In this test we are trying to do the following:
|
|
// 1. Create a DB with corrupted WAL log;
|
|
// 2. Open with avoid_flush_during_recovery = true;
|
|
// 3. Append more data without flushing, which creates new WAL log.
|
|
// 4. Open again. See if it can correctly handle previous corruption.
|
|
TEST_P(DBWALTestWithParamsVaryingRecoveryMode,
|
|
RecoverFromCorruptedWALWithoutFlush) {
|
|
const int kAppendKeys = 100;
|
|
Options options = CurrentOptions();
|
|
options.avoid_flush_during_recovery = true;
|
|
options.create_if_missing = false;
|
|
options.disable_auto_compactions = true;
|
|
options.write_buffer_size = 64 * 1024 * 1024;
|
|
|
|
auto getAll = [this]() {
|
|
std::vector<std::pair<std::string, std::string>> data;
|
|
ReadOptions ropt;
|
|
Iterator* iter = dbfull()->NewIterator(ropt);
|
|
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
|
data.push_back(
|
|
std::make_pair(iter->key().ToString(), iter->value().ToString()));
|
|
}
|
|
delete iter;
|
|
return data;
|
|
};
|
|
|
|
bool trunc = std::get<0>(GetParam()); // Corruption style
|
|
// Corruption offset position
|
|
int corrupt_offset = std::get<1>(GetParam());
|
|
int wal_file_id = std::get<2>(GetParam()); // WAL file
|
|
WALRecoveryMode recovery_mode = std::get<3>(GetParam());
|
|
// WAL compression type
|
|
CompressionType compression_type = std::get<4>(GetParam());
|
|
|
|
options.wal_recovery_mode = recovery_mode;
|
|
options.wal_compression = compression_type;
|
|
// Create corrupted WAL
|
|
RecoveryTestHelper::FillData(this, &options);
|
|
RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
|
|
/*len%=*/.1, wal_file_id, trunc);
|
|
// Skip the test if DB won't open.
|
|
if (!TryReopen(options).ok()) {
|
|
ASSERT_TRUE(options.wal_recovery_mode ==
|
|
WALRecoveryMode::kAbsoluteConsistency ||
|
|
(!trunc && options.wal_recovery_mode ==
|
|
WALRecoveryMode::kTolerateCorruptedTailRecords));
|
|
return;
|
|
}
|
|
ASSERT_OK(TryReopen(options));
|
|
// Append some more data.
|
|
for (int k = 0; k < kAppendKeys; k++) {
|
|
std::string key = "extra_key" + std::to_string(k);
|
|
std::string value = DummyString(RecoveryTestHelper::kValueSize);
|
|
ASSERT_OK(Put(key, value));
|
|
}
|
|
// Save data for comparison.
|
|
auto data = getAll();
|
|
// Reopen. Verify data.
|
|
ASSERT_OK(TryReopen(options));
|
|
auto actual_data = getAll();
|
|
ASSERT_EQ(data, actual_data);
|
|
}
|
|
|
|
// Tests that total log size is recovered if we set
|
|
// avoid_flush_during_recovery=true.
|
|
// Flush should trigger if max_total_wal_size is reached.
|
|
TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
|
|
auto test_listener = std::make_shared<FlushCounterListener>();
|
|
test_listener->expected_flush_reason = FlushReason::kWalFull;
|
|
|
|
constexpr size_t kKB = 1024;
|
|
constexpr size_t kMB = 1024 * 1024;
|
|
Options options = CurrentOptions();
|
|
options.avoid_flush_during_recovery = true;
|
|
options.max_total_wal_size = 1 * kMB;
|
|
options.listeners.push_back(test_listener);
|
|
// Have to open DB in multi-CF mode to trigger flush when
|
|
// max_total_wal_size is reached.
|
|
CreateAndReopenWithCF({"one"}, options);
|
|
// Write some keys and we will end up with one log file which is slightly
|
|
// smaller than 1MB.
|
|
std::string value_100k(100 * kKB, 'v');
|
|
std::string value_300k(300 * kKB, 'v');
|
|
ASSERT_OK(Put(0, "foo", "v1"));
|
|
for (int i = 0; i < 9; i++) {
|
|
ASSERT_OK(Put(1, "key" + std::to_string(i), value_100k));
|
|
}
|
|
// Get log files before reopen.
|
|
VectorLogPtr log_files_before;
|
|
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
|
|
ASSERT_EQ(1, log_files_before.size());
|
|
uint64_t log_size_before = log_files_before[0]->SizeFileBytes();
|
|
ASSERT_GT(log_size_before, 900 * kKB);
|
|
ASSERT_LT(log_size_before, 1 * kMB);
|
|
ReopenWithColumnFamilies({"default", "one"}, options);
|
|
// Write one more value to make log larger than 1MB.
|
|
ASSERT_OK(Put(1, "bar", value_300k));
|
|
// Get log files again. A new log file will be opened.
|
|
VectorLogPtr log_files_after_reopen;
|
|
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after_reopen));
|
|
ASSERT_EQ(2, log_files_after_reopen.size());
|
|
ASSERT_EQ(log_files_before[0]->LogNumber(),
|
|
log_files_after_reopen[0]->LogNumber());
|
|
ASSERT_GT(log_files_after_reopen[0]->SizeFileBytes() +
|
|
log_files_after_reopen[1]->SizeFileBytes(),
|
|
1 * kMB);
|
|
// Write one more key to trigger flush.
|
|
ASSERT_OK(Put(0, "foo", "v2"));
|
|
for (auto* h : handles_) {
|
|
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(h));
|
|
}
|
|
// Flushed two column families.
|
|
ASSERT_EQ(2, test_listener->count.load());
|
|
}
|
|
|
|
#if defined(ROCKSDB_PLATFORM_POSIX)
|
|
#if defined(ROCKSDB_FALLOCATE_PRESENT)
|
|
// Tests that we will truncate the preallocated space of the last log from
|
|
// previous.
|
|
TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
|
|
constexpr size_t kKB = 1024;
|
|
Options options = CurrentOptions();
|
|
options.env = env_;
|
|
options.avoid_flush_during_recovery = true;
|
|
if (mem_env_) {
|
|
ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
|
|
return;
|
|
}
|
|
if (!IsFallocateSupported()) {
|
|
return;
|
|
}
|
|
|
|
DestroyAndReopen(options);
|
|
size_t preallocated_size =
|
|
dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
|
|
ASSERT_OK(Put("foo", "v1"));
|
|
VectorLogPtr log_files_before;
|
|
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
|
|
ASSERT_EQ(1, log_files_before.size());
|
|
auto& file_before = log_files_before[0];
|
|
ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
|
|
// The log file has preallocated space.
|
|
ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
|
|
preallocated_size);
|
|
Reopen(options);
|
|
VectorLogPtr log_files_after;
|
|
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
|
|
ASSERT_EQ(1, log_files_after.size());
|
|
ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
|
|
// The preallocated space should be truncated.
|
|
ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
|
|
preallocated_size);
|
|
}
|
|
// Tests that we will truncate the preallocated space of the last log from
|
|
// previous.
|
|
TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithFlush) {
|
|
constexpr size_t kKB = 1024;
|
|
Options options = CurrentOptions();
|
|
options.env = env_;
|
|
options.avoid_flush_during_recovery = false;
|
|
options.avoid_flush_during_shutdown = true;
|
|
if (mem_env_) {
|
|
ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
|
|
return;
|
|
}
|
|
if (!IsFallocateSupported()) {
|
|
return;
|
|
}
|
|
|
|
DestroyAndReopen(options);
|
|
size_t preallocated_size =
|
|
dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
|
|
ASSERT_OK(Put("foo", "v1"));
|
|
VectorLogPtr log_files_before;
|
|
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
|
|
ASSERT_EQ(1, log_files_before.size());
|
|
auto& file_before = log_files_before[0];
|
|
ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
|
|
ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
|
|
preallocated_size);
|
|
// The log file has preallocated space.
|
|
Close();
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
|
{{"DBImpl::PurgeObsoleteFiles:Begin",
|
|
"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"},
|
|
{"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate",
|
|
"DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
port::Thread reopen_thread([&]() { Reopen(options); });
|
|
|
|
TEST_SYNC_POINT(
|
|
"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover");
|
|
// After the flush during Open, the log file should get deleted. However,
|
|
// if the process is in a crash loop, the log file may not get
|
|
// deleted and thte preallocated space will keep accumulating. So we need
|
|
// to ensure it gets trtuncated.
|
|
EXPECT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
|
|
preallocated_size);
|
|
TEST_SYNC_POINT(
|
|
"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate");
|
|
reopen_thread.join();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
}
|
|
|
|
TEST_F(DBWALTest, TruncateLastLogAfterRecoverWALEmpty) {
|
|
Options options = CurrentOptions();
|
|
options.env = env_;
|
|
options.avoid_flush_during_recovery = false;
|
|
if (mem_env_ || encrypted_env_) {
|
|
ROCKSDB_GTEST_SKIP("Test requires non-mem/non-encrypted environment");
|
|
return;
|
|
}
|
|
if (!IsFallocateSupported()) {
|
|
return;
|
|
}
|
|
|
|
DestroyAndReopen(options);
|
|
size_t preallocated_size =
|
|
dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
|
|
Close();
|
|
std::vector<std::string> filenames;
|
|
std::string last_log;
|
|
uint64_t last_log_num = 0;
|
|
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
|
|
for (auto fname : filenames) {
|
|
uint64_t number;
|
|
FileType type;
|
|
if (ParseFileName(fname, &number, &type, nullptr)) {
|
|
if (type == kWalFile && number > last_log_num) {
|
|
last_log = fname;
|
|
}
|
|
}
|
|
}
|
|
ASSERT_NE(last_log, "");
|
|
last_log = dbname_ + '/' + last_log;
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
|
{{"DBImpl::PurgeObsoleteFiles:Begin",
|
|
"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"},
|
|
{"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate",
|
|
"DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"PosixWritableFile::Close",
|
|
[](void* arg) { *(reinterpret_cast<size_t*>(arg)) = 0; });
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
// Preallocate space for the empty log file. This could happen if WAL data
|
|
// was buffered in memory and the process crashed.
|
|
std::unique_ptr<WritableFile> log_file;
|
|
ASSERT_OK(env_->ReopenWritableFile(last_log, &log_file, EnvOptions()));
|
|
log_file->SetPreallocationBlockSize(preallocated_size);
|
|
log_file->PrepareWrite(0, 4096);
|
|
log_file.reset();
|
|
|
|
ASSERT_GE(GetAllocatedFileSize(last_log), preallocated_size);
|
|
|
|
port::Thread reopen_thread([&]() { Reopen(options); });
|
|
|
|
TEST_SYNC_POINT(
|
|
"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover");
|
|
// The preallocated space should be truncated.
|
|
EXPECT_LT(GetAllocatedFileSize(last_log), preallocated_size);
|
|
TEST_SYNC_POINT(
|
|
"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate");
|
|
reopen_thread.join();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
}
|
|
|
|
TEST_F(DBWALTest, ReadOnlyRecoveryNoTruncate) {
|
|
constexpr size_t kKB = 1024;
|
|
Options options = CurrentOptions();
|
|
options.env = env_;
|
|
options.avoid_flush_during_recovery = true;
|
|
if (mem_env_) {
|
|
ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
|
|
return;
|
|
}
|
|
if (!IsFallocateSupported()) {
|
|
return;
|
|
}
|
|
|
|
// create DB and close with file truncate disabled
|
|
std::atomic_bool enable_truncate{false};
|
|
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"PosixWritableFile::Close", [&](void* arg) {
|
|
if (!enable_truncate) {
|
|
*(reinterpret_cast<size_t*>(arg)) = 0;
|
|
}
|
|
});
|
|
SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
DestroyAndReopen(options);
|
|
size_t preallocated_size =
|
|
dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
|
|
ASSERT_OK(Put("foo", "v1"));
|
|
VectorLogPtr log_files_before;
|
|
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
|
|
ASSERT_EQ(1, log_files_before.size());
|
|
auto& file_before = log_files_before[0];
|
|
ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
|
|
// The log file has preallocated space.
|
|
auto db_size = GetAllocatedFileSize(dbname_ + file_before->PathName());
|
|
ASSERT_GE(db_size, preallocated_size);
|
|
Close();
|
|
|
|
// enable truncate and open DB as readonly, the file should not be truncated
|
|
// and DB size is not changed.
|
|
enable_truncate = true;
|
|
ASSERT_OK(ReadOnlyReopen(options));
|
|
VectorLogPtr log_files_after;
|
|
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
|
|
ASSERT_EQ(1, log_files_after.size());
|
|
ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
|
|
ASSERT_EQ(log_files_after[0]->PathName(), file_before->PathName());
|
|
// The preallocated space should NOT be truncated.
|
|
// the DB size is almost the same.
|
|
ASSERT_NEAR(GetAllocatedFileSize(dbname_ + file_before->PathName()), db_size,
|
|
db_size / 100);
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
}
|
|
#endif // ROCKSDB_FALLOCATE_PRESENT
|
|
#endif // ROCKSDB_PLATFORM_POSIX
|
|
|
|
TEST_F(DBWALTest, WalInManifestButNotInSortedWals) {
|
|
Options options = CurrentOptions();
|
|
options.track_and_verify_wals_in_manifest = true;
|
|
options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
|
|
|
|
// Build a way to make wal files selectively go missing
|
|
bool wals_go_missing = false;
|
|
struct MissingWalFs : public FileSystemWrapper {
|
|
MissingWalFs(const std::shared_ptr<FileSystem>& t,
|
|
bool* _wals_go_missing_flag)
|
|
: FileSystemWrapper(t), wals_go_missing_flag(_wals_go_missing_flag) {}
|
|
bool* wals_go_missing_flag;
|
|
IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts,
|
|
std::vector<std::string>* r,
|
|
IODebugContext* dbg) override {
|
|
IOStatus s = target_->GetChildren(dir, io_opts, r, dbg);
|
|
if (s.ok() && *wals_go_missing_flag) {
|
|
for (size_t i = 0; i < r->size();) {
|
|
if (EndsWith(r->at(i), ".log")) {
|
|
r->erase(r->begin() + i);
|
|
} else {
|
|
++i;
|
|
}
|
|
}
|
|
}
|
|
return s;
|
|
}
|
|
const char* Name() const override { return "MissingWalFs"; }
|
|
};
|
|
auto my_fs =
|
|
std::make_shared<MissingWalFs>(env_->GetFileSystem(), &wals_go_missing);
|
|
std::unique_ptr<Env> my_env(NewCompositeEnv(my_fs));
|
|
options.env = my_env.get();
|
|
|
|
CreateAndReopenWithCF({"blah"}, options);
|
|
|
|
// Currently necessary to get a WAL tracked in manifest; see
|
|
// https://github.com/facebook/rocksdb/issues/10080
|
|
ASSERT_OK(Put(0, "x", "y"));
|
|
ASSERT_OK(db_->SyncWAL());
|
|
ASSERT_OK(Put(1, "x", "y"));
|
|
ASSERT_OK(db_->SyncWAL());
|
|
ASSERT_OK(Flush(1));
|
|
|
|
ASSERT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());
|
|
std::vector<std::unique_ptr<LogFile>> wals;
|
|
ASSERT_OK(db_->GetSortedWalFiles(wals));
|
|
wals_go_missing = true;
|
|
ASSERT_NOK(db_->GetSortedWalFiles(wals));
|
|
wals_go_missing = false;
|
|
Close();
|
|
}
|
|
|
|
|
|
TEST_F(DBWALTest, WalTermTest) {
|
|
Options options = CurrentOptions();
|
|
options.env = env_;
|
|
CreateAndReopenWithCF({"pikachu"}, options);
|
|
|
|
ASSERT_OK(Put(1, "foo", "bar"));
|
|
|
|
WriteOptions wo;
|
|
wo.sync = true;
|
|
wo.disableWAL = false;
|
|
|
|
WriteBatch batch;
|
|
ASSERT_OK(batch.Put("foo", "bar"));
|
|
batch.MarkWalTerminationPoint();
|
|
ASSERT_OK(batch.Put("foo2", "bar2"));
|
|
|
|
ASSERT_OK(dbfull()->Write(wo, &batch));
|
|
|
|
// make sure we can re-open it.
|
|
ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
|
|
ASSERT_EQ("bar", Get(1, "foo"));
|
|
ASSERT_EQ("NOT_FOUND", Get(1, "foo2"));
|
|
}
|
|
|
|
TEST_F(DBWALTest, GetCompressedWalsAfterSync) {
|
|
if (db_->GetOptions().wal_compression == kNoCompression) {
|
|
ROCKSDB_GTEST_BYPASS("stream compression not present");
|
|
return;
|
|
}
|
|
Options options = GetDefaultOptions();
|
|
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
|
|
options.create_if_missing = true;
|
|
options.env = env_;
|
|
options.avoid_flush_during_recovery = true;
|
|
options.track_and_verify_wals_in_manifest = true;
|
|
// Enable WAL compression so that the newly-created WAL will be non-empty
|
|
// after DB open, even if point-in-time WAL recovery encounters no
|
|
// corruption.
|
|
options.wal_compression = kZSTD;
|
|
DestroyAndReopen(options);
|
|
|
|
// Write something to memtable and WAL so that log_empty_ will be false after
|
|
// next DB::Open().
|
|
ASSERT_OK(Put("a", "v"));
|
|
|
|
Reopen(options);
|
|
|
|
// New WAL is created, thanks to !log_empty_.
|
|
ASSERT_OK(dbfull()->TEST_SwitchWAL());
|
|
|
|
ASSERT_OK(Put("b", "v"));
|
|
|
|
ASSERT_OK(db_->SyncWAL());
|
|
|
|
VectorLogPtr wals;
|
|
Status s = dbfull()->GetSortedWalFiles(wals);
|
|
ASSERT_OK(s);
|
|
}
|
|
|
|
TEST_F(DBWALTest, EmptyWalReopenTest) {
|
|
Options options = CurrentOptions();
|
|
options.env = env_;
|
|
CreateAndReopenWithCF({"pikachu"}, options);
|
|
|
|
// make sure we can re-open it.
|
|
ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
|
|
|
|
{
|
|
std::vector<std::string> files;
|
|
int num_wal_files = 0;
|
|
ASSERT_OK(env_->GetChildren(dbname_, &files));
|
|
for (const auto& file : files) {
|
|
uint64_t number = 0;
|
|
FileType type = kWalFile;
|
|
if (ParseFileName(file, &number, &type) && type == kWalFile) {
|
|
num_wal_files++;
|
|
}
|
|
}
|
|
|
|
ASSERT_EQ(num_wal_files, 1);
|
|
}
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
int main(int argc, char** argv) {
|
|
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|
|
|