Introduce WAL recovery consistency levels

Summary:
The "one size fits all" approach with WAL recovery will only introduce inconvenience for our varied clients as we go forward. The current recovery is a bit heuristic. We introduce the following levels of consistency while replaying the WAL.

1. RecoverAfterRestart (kTolerateCorruptedTailRecords)

This mocks the current recovery mode.

2. RecoverAfterCleanShutdown (kAbsoluteConsistency)

This is ideal for unit test and cases where the store is shutdown cleanly. We tolerate no corruption or incomplete writes.

3. RecoverPointInTime (kPointInTimeRecovery)

This is ideal when using devices with controller cache or file systems which can loose data on restart. We recover upto the point were is no corruption or incomplete write.

4. RecoverAfterDisaster (kSkipAnyCorruptRecord)

This is ideal mode to recover data. We tolerate corruption and incomplete writes, and we hop over those sections that we cannot make sense of salvaging as many records as possible.

Test Plan:
(1) Run added unit test to cover all levels.
(2) Run make check.

Reviewers: leveldb, sdong, igor

Subscribers: yoshinorim, dhruba

Differential Revision: https://reviews.facebook.net/D38487
main
krad 10 years ago
parent 530534fceb
commit de85e4cadf
  1. 67
      db/db_impl.cc
  2. 183
      db/db_test.cc
  3. 24
      db/log_reader.cc
  4. 6
      db/log_reader.h
  5. 41
      db/log_test.cc
  6. 27
      include/rocksdb/options.h
  7. 6
      util/options.cc

@ -957,7 +957,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
info_log, "%s%s: dropping %d bytes; %s", info_log, "%s%s: dropping %d bytes; %s",
(this->status == nullptr ? "(ignoring error) " : ""), (this->status == nullptr ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str()); fname, static_cast<int>(bytes), s.ToString().c_str());
if (this->status != nullptr && this->status->ok()) *this->status = s; if (this->status != nullptr && this->status->ok()) {
*this->status = s;
}
} }
}; };
@ -983,6 +985,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
stream.EndArray(); stream.EndArray();
} }
bool continue_replay_log = true;
for (auto log_number : log_numbers) { for (auto log_number : log_numbers) {
// The previous incarnation may not have written any MANIFEST // The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually // records after allocating this log number. So we manually
@ -1008,21 +1011,56 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
reporter.env = env_; reporter.env = env_;
reporter.info_log = db_options_.info_log.get(); reporter.info_log = db_options_.info_log.get();
reporter.fname = fname.c_str(); reporter.fname = fname.c_str();
reporter.status = (db_options_.paranoid_checks) ? &status : nullptr; if (!db_options_.paranoid_checks ||
db_options_.wal_recovery_mode ==
WALRecoveryMode::kSkipAnyCorruptedRecords) {
reporter.status = nullptr;
} else {
reporter.status = &status;
}
// We intentially make log::Reader do checksumming even if // We intentially make log::Reader do checksumming even if
// paranoid_checks==false so that corruptions cause entire commits // paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly // to be skipped instead of propagating bad information (like overly
// large sequence numbers). // large sequence numbers).
log::Reader reader(std::move(file), &reporter, true /*checksum*/, log::Reader reader(std::move(file), &reporter, true /*checksum*/,
0 /*initial_offset*/); 0 /*initial_offset*/);
Log(InfoLogLevel::INFO_LEVEL, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
db_options_.info_log, "Recovering log #%" PRIu64 "", log_number); "Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number,
db_options_.wal_recovery_mode, !continue_replay_log);
// Determine if we should tolerate incomplete records at the tail end of the
// log
bool report_eof_inconsistency;
if (db_options_.wal_recovery_mode ==
WALRecoveryMode::kAbsoluteConsistency) {
// in clean shutdown we don't expect any error in the log files
report_eof_inconsistency = true;
} else {
// for other modes ignore only incomplete records in the last log file
// which is presumably due to write in progress during restart
report_eof_inconsistency = false;
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
}
// Read all the records and add to a memtable // Read all the records and add to a memtable
std::string scratch; std::string scratch;
Slice record; Slice record;
WriteBatch batch; WriteBatch batch;
while (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (!continue_replay_log) {
uint64_t bytes;
if (env_->GetFileSize(fname, &bytes).ok()) {
auto info_log = db_options_.info_log.get();
Log(InfoLogLevel::WARN_LEVEL, info_log, "%s: dropping %d bytes",
fname.c_str(), static_cast<int>(bytes));
}
}
while (continue_replay_log &&
reader.ReadRecord(&record, &scratch, report_eof_inconsistency) &&
status.ok()) {
if (record.size() < 12) { if (record.size() < 12) {
reporter.Corruption(record.size(), reporter.Corruption(record.size(),
Status::Corruption("log record too small")); Status::Corruption("log record too small"));
@ -1075,7 +1113,24 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
} }
if (!status.ok()) { if (!status.ok()) {
return status; // The hook function is designed to ignore all IO errors from reader
// during recovery for kSkipAnyCorruptedRecords. Status variable is
// unmodified by the reader.
assert(db_options_.wal_recovery_mode !=
WALRecoveryMode::kSkipAnyCorruptedRecords);
if (db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery) {
// We should ignore the error but not continue replaying
status = Status::OK();
continue_replay_log = false;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Point in time recovered to log #%" PRIu64 " seq #%" PRIu64,
log_number, *max_sequence);
} else if (db_options_.wal_recovery_mode !=
WALRecoveryMode::kSkipAnyCorruptedRecords) {
return status;
}
} }
flush_scheduler_.Clear(); flush_scheduler_.Clear();

@ -14,6 +14,7 @@
#include <thread> #include <thread>
#include <unordered_set> #include <unordered_set>
#include <utility> #include <utility>
#include <fcntl.h>
#include "db/filename.h" #include "db/filename.h"
#include "db/dbformat.h" #include "db/dbformat.h"
@ -8628,6 +8629,188 @@ TEST_F(DBTest, TransactionLogIteratorCorruptedLog) {
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
//
// Test WAL recovery for the various modes available
// TODO krad:
// 1. Add tests when there are more than one log file
//
class RecoveryTestHelper {
public:
// Recreate and fill the store with some data
static size_t FillData(DBTest* test, const Options& options) {
size_t count = 0;
test->DestroyAndReopen(options);
for (int i = 0; i < 1024; i++) {
test->Put("key" + ToString(i), test->DummyString(10));
++count;
}
return count;
}
// Read back all the keys we wrote and return the number of keys found
static size_t GetData(DBTest* test) {
size_t count = 0;
for (size_t i = 0; i < 1024; i++) {
if (test->Get("key" + ToString(i)) != "NOT_FOUND") {
++count;
}
}
return count;
}
// Overwrite data with 'a' from offset for length len
static void InduceCorruption(const std::string& filename, uint32_t offset,
uint32_t len) {
ASSERT_GT(len, 0);
int fd = open(filename.c_str(), O_RDWR);
ASSERT_GT(fd, 0);
ASSERT_EQ(offset, lseek(fd, offset, SEEK_SET));
char buf[len];
memset(buf, 'a', len);
ASSERT_EQ(len, write(fd, buf, len));
close(fd);
}
// Corrupt the last WAL file from (filesize * off) for length (filesize * len)
static void CorruptWAL(DBTest* test, const double off, const double len,
const bool trunc = false) {
rocksdb::VectorLogPtr wal_files;
ASSERT_OK(test->dbfull()->GetSortedWalFiles(wal_files));
ASSERT_EQ(wal_files.size(), 1);
const auto logfile_path =
test->dbname_ + "/" + wal_files.front()->PathName();
auto size = wal_files.front()->SizeFileBytes();
if (trunc) {
ASSERT_EQ(0, truncate(logfile_path.c_str(), size * off));
} else {
InduceCorruption(logfile_path, size * off, size * len);
}
}
};
// Test scope:
// - We expect to open the data store when there is incomplete trailing writes
// at the end of any of the logs
// - We do not expect to open the data store for corruption
TEST_F(DBTest, kTolerateCorruptedTailRecords) {
for (auto trunc : {true, false}) {
for (int i = 0; i < 4; i++) {
// Fill data for testing
Options options = CurrentOptions();
const size_t row_count = RecoveryTestHelper::FillData(this, options);
// test checksum failure or parsing
RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
if (trunc) {
options.wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords;
ASSERT_OK(TryReopen(options));
const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
ASSERT_TRUE(i == 0 || recovered_row_count > 0);
ASSERT_LT(recovered_row_count, row_count);
} else {
options.wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords;
ASSERT_NOK(TryReopen(options));
}
}
}
}
// Test scope:
// We don't expect the data store to be opened if there is any corruption
// (leading, middle or trailing -- incomplete writes or corruption)
TEST_F(DBTest, kAbsoluteConsistency) {
Options options = CurrentOptions();
const size_t row_count = RecoveryTestHelper::FillData(this, options);
options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
ASSERT_OK(TryReopen(options));
ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count);
for (auto trunc : {true, false}) {
for (int i = 0; i < 4; i++) {
if (trunc && i == 0) {
continue;
}
options = CurrentOptions();
RecoveryTestHelper::FillData(this, options);
RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
ASSERT_NOK(TryReopen(options));
}
}
}
// Test scope:
// - We expect to open data store under all circumstances
// - We expect only data upto the point where the first error was encountered
TEST_F(DBTest, kPointInTimeRecovery) {
for (auto trunc : {true, false}) {
for (int i = 0; i < 4; i++) {
// Fill data for testing
Options options = CurrentOptions();
const size_t row_count = RecoveryTestHelper::FillData(this, options);
// test checksum failure or parsing
RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
ASSERT_OK(TryReopen(options));
size_t recovered_row_count = RecoveryTestHelper::GetData(this);
ASSERT_LT(recovered_row_count, row_count);
// verify that the keys are sequential and there is no break
bool expect_data = true;
for (size_t j = 0; j < 1024; ++j) {
bool found = Get("key" + ToString(i)) != "NOT_FOUND";
if (expect_data && !found) {
expect_data = false;
}
ASSERT_EQ(found, expect_data);
}
ASSERT_TRUE(i != 0 || recovered_row_count == 0);
ASSERT_TRUE(i != 1 || recovered_row_count < row_count / 2);
}
}
}
// Test scope:
// - We expect to open the data store under all scenarios
// - We expect to have recovered records past the corruption zone
TEST_F(DBTest, kSkipAnyCorruptedRecords) {
for (auto trunc : {true, false}) {
for (int i = 0; i < 4; i++) {
// Fill data for testing
Options options = CurrentOptions();
const size_t row_count = RecoveryTestHelper::FillData(this, options);
// induce leading corruption
RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
ASSERT_OK(TryReopen(options));
size_t recovered_row_count = RecoveryTestHelper::GetData(this);
ASSERT_LT(recovered_row_count, row_count);
if (!trunc) {
ASSERT_TRUE(i != 0 || recovered_row_count > 0);
}
}
}
}
TEST_F(DBTest, TransactionLogIteratorBatchOperations) { TEST_F(DBTest, TransactionLogIteratorBatchOperations) {
do { do {
Options options = OptionsForLogIterTest(); Options options = OptionsForLogIterTest();

@ -61,7 +61,8 @@ bool Reader::SkipToInitialBlock() {
return true; return true;
} }
bool Reader::ReadRecord(Slice* record, std::string* scratch) { bool Reader::ReadRecord(Slice* record, std::string* scratch,
const bool report_eof_inconsistency) {
if (last_record_offset_ < initial_offset_) { if (last_record_offset_ < initial_offset_) {
if (!SkipToInitialBlock()) { if (!SkipToInitialBlock()) {
return false; return false;
@ -78,7 +79,8 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
Slice fragment; Slice fragment;
while (true) { while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
const unsigned int record_type = ReadPhysicalRecord(&fragment); const unsigned int record_type =
ReadPhysicalRecord(&fragment, report_eof_inconsistency);
switch (record_type) { switch (record_type) {
case kFullType: case kFullType:
if (in_fragmented_record && !scratch->empty()) { if (in_fragmented_record && !scratch->empty()) {
@ -130,6 +132,9 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
case kEof: case kEof:
if (in_fragmented_record) { if (in_fragmented_record) {
if (report_eof_inconsistency) {
ReportCorruption(scratch->size(), "error reading trailing data");
}
// This can be caused by the writer dying immediately after // This can be caused by the writer dying immediately after
// writing a physical record but before completing the next; don't // writing a physical record but before completing the next; don't
// treat it as a corruption, just ignore the entire logical record. // treat it as a corruption, just ignore the entire logical record.
@ -238,7 +243,8 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
} }
} }
unsigned int Reader::ReadPhysicalRecord(Slice* result) { unsigned int Reader::ReadPhysicalRecord(Slice* result,
const bool report_eof_inconsistency) {
while (true) { while (true) {
if (buffer_.size() < (size_t)kHeaderSize) { if (buffer_.size() < (size_t)kHeaderSize) {
if (!eof_ && !read_error_) { if (!eof_ && !read_error_) {
@ -259,8 +265,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
} else { } else {
// Note that if buffer_ is non-empty, we have a truncated header at the // Note that if buffer_ is non-empty, we have a truncated header at the
// end of the file, which can be caused by the writer crashing in the // end of the file, which can be caused by the writer crashing in the
// middle of writing the header. Instead of considering this an error, // middle of writing the header. Unless explicitly requested we don't
// just report EOF. // considering this an error, just report EOF.
if (buffer_.size() && report_eof_inconsistency) {
ReportCorruption(buffer_.size(), "truncated header");
}
buffer_.clear(); buffer_.clear();
return kEof; return kEof;
} }
@ -281,7 +290,10 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
} }
// If the end of the file has been reached without reading |length| bytes // If the end of the file has been reached without reading |length| bytes
// of payload, assume the writer died in the middle of writing the record. // of payload, assume the writer died in the middle of writing the record.
// Don't report a corruption. // Don't report a corruption unless requested.
if (drop_size && report_eof_inconsistency) {
ReportCorruption(drop_size, "truncated header");
}
return kEof; return kEof;
} }

@ -61,7 +61,8 @@ class Reader {
// "*scratch" as temporary storage. The contents filled in *record // "*scratch" as temporary storage. The contents filled in *record
// will only be valid until the next mutating operation on this // will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch. // reader or the next mutation to *scratch.
bool ReadRecord(Slice* record, std::string* scratch); bool ReadRecord(Slice* record, std::string* scratch,
bool report_eof_inconsistency = false);
// Returns the physical offset of the last record returned by ReadRecord. // Returns the physical offset of the last record returned by ReadRecord.
// //
@ -120,7 +121,8 @@ class Reader {
bool SkipToInitialBlock(); bool SkipToInitialBlock();
// Return type, or one of the preceding special values // Return type, or one of the preceding special values
unsigned int ReadPhysicalRecord(Slice* result); unsigned int ReadPhysicalRecord(Slice* result,
bool report_eof_inconsistency = false);
// Reports dropped bytes to the reporter. // Reports dropped bytes to the reporter.
// buffer_ must be updated to remove the dropped bytes prior to invocation. // buffer_ must be updated to remove the dropped bytes prior to invocation.

@ -208,10 +208,10 @@ class LogTest : public testing::Test {
return dest_contents().size(); return dest_contents().size();
} }
std::string Read() { std::string Read(const bool report_eof_inconsistency = false) {
std::string scratch; std::string scratch;
Slice record; Slice record;
if (reader_.ReadRecord(&record, &scratch)) { if (reader_.ReadRecord(&record, &scratch, report_eof_inconsistency)) {
return record.ToString(); return record.ToString();
} else { } else {
return "EOF"; return "EOF";
@ -452,6 +452,15 @@ TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) {
ASSERT_EQ("", ReportMessage()); ASSERT_EQ("", ReportMessage());
} }
TEST_F(LogTest, TruncatedTrailingRecordIsNotIgnored) {
Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte
ASSERT_EQ("EOF", Read(/*report_eof_inconsistency*/ true));
// Truncated last record is ignored, not treated as an error
ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
}
TEST_F(LogTest, BadLength) { TEST_F(LogTest, BadLength) {
const int kPayloadSize = kBlockSize - kHeaderSize; const int kPayloadSize = kBlockSize - kHeaderSize;
Write(BigString("bar", kPayloadSize)); Write(BigString("bar", kPayloadSize));
@ -471,6 +480,14 @@ TEST_F(LogTest, BadLengthAtEndIsIgnored) {
ASSERT_EQ("", ReportMessage()); ASSERT_EQ("", ReportMessage());
} }
TEST_F(LogTest, BadLengthAtEndIsNotIgnored) {
Write("foo");
ShrinkSize(1);
ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true));
ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
}
TEST_F(LogTest, ChecksumMismatch) { TEST_F(LogTest, ChecksumMismatch) {
Write("foo"); Write("foo");
IncrementByte(0, 10); IncrementByte(0, 10);
@ -528,6 +545,15 @@ TEST_F(LogTest, MissingLastIsIgnored) {
ASSERT_EQ(0U, DroppedBytes()); ASSERT_EQ(0U, DroppedBytes());
} }
TEST_F(LogTest, MissingLastIsNotIgnored) {
Write(BigString("bar", kBlockSize));
// Remove the LAST block, including header.
ShrinkSize(14);
ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true));
ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
}
TEST_F(LogTest, PartialLastIsIgnored) { TEST_F(LogTest, PartialLastIsIgnored) {
Write(BigString("bar", kBlockSize)); Write(BigString("bar", kBlockSize));
// Cause a bad record length in the LAST block. // Cause a bad record length in the LAST block.
@ -537,6 +563,17 @@ TEST_F(LogTest, PartialLastIsIgnored) {
ASSERT_EQ(0U, DroppedBytes()); ASSERT_EQ(0U, DroppedBytes());
} }
TEST_F(LogTest, PartialLastIsNotIgnored) {
Write(BigString("bar", kBlockSize));
// Cause a bad record length in the LAST block.
ShrinkSize(1);
ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true));
ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError(
"Corruption: truncated headerCorruption: "
"error reading trailing data"));
}
TEST_F(LogTest, ErrorJoinsRecords) { TEST_F(LogTest, ErrorJoinsRecords) {
// Consider two fragmented records: // Consider two fragmented records:
// first(R1) last(R1) first(R2) last(R2) // first(R1) last(R1) first(R2) last(R2)

@ -70,6 +70,29 @@ enum CompactionStyle : char {
kCompactionStyleNone = 0x3, kCompactionStyleNone = 0x3,
}; };
enum class WALRecoveryMode : char {
// Original levelDB recovery
// We tolerate incomplete record in trailing data on all logs
// Use case : This is legacy behavior (default)
kTolerateCorruptedTailRecords = 0x00,
// Recover from clean shutdown
// We don't expect to find any corruption in the WAL
// Use case : This is ideal for unit tests and rare applications that
// can require high consistency gaurantee
kAbsoluteConsistency = 0x01,
// Recover to point-in-time consistency
// We stop the WAL playback on discovering WAL inconsistency
// Use case : Ideal for systems that have disk controller cache like
// hard disk, SSD without super capacitor that store related data
kPointInTimeRecovery = 0x02,
// Recovery after a disaster
// We ignore any corruption in the WAL and try to salvage as much data as
// possible
// Use case : Ideal for last ditch effort to recover data or systems that
// operate with low grade unrelated data
kSkipAnyCorruptedRecords = 0x03,
};
struct CompactionOptionsFIFO { struct CompactionOptionsFIFO {
// once the total sum of table files reaches this, we will delete the oldest // once the total sum of table files reaches this, we will delete the oldest
// table file // table file
@ -1028,6 +1051,10 @@ struct DBOptions {
// //
// Default: 1MB/s // Default: 1MB/s
uint64_t delayed_write_rate; uint64_t delayed_write_rate;
// Recovery mode to control the consistency while replaying WAL
// Default: kTolerateCorruptedTailRecords
WALRecoveryMode wal_recovery_mode;
}; };
// Options to control the behavior of a database (passed to DB::Open) // Options to control the behavior of a database (passed to DB::Open)

@ -242,7 +242,8 @@ DBOptions::DBOptions()
wal_bytes_per_sync(0), wal_bytes_per_sync(0),
listeners(), listeners(),
enable_thread_tracking(false), enable_thread_tracking(false),
delayed_write_rate(1024U * 1024U) { delayed_write_rate(1024U * 1024U),
wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords) {
} }
DBOptions::DBOptions(const Options& options) DBOptions::DBOptions(const Options& options)
@ -288,7 +289,8 @@ DBOptions::DBOptions(const Options& options)
wal_bytes_per_sync(options.wal_bytes_per_sync), wal_bytes_per_sync(options.wal_bytes_per_sync),
listeners(options.listeners), listeners(options.listeners),
enable_thread_tracking(options.enable_thread_tracking), enable_thread_tracking(options.enable_thread_tracking),
delayed_write_rate(options.delayed_write_rate) {} delayed_write_rate(options.delayed_write_rate),
wal_recovery_mode(options.wal_recovery_mode) {}
static const char* const access_hints[] = { static const char* const access_hints[] = {
"NONE", "NORMAL", "SEQUENTIAL", "WILLNEED" "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED"

Loading…
Cancel
Save