[RocksDB] Introduce an option to skip log error on recovery

Summary:
Currently, with paranoid_check on, DB::Open will fail on any log read error on recovery.
If client is ok with losing most recent updates, we could simply skip those errors.
However, it's important to introduce an additional flag, so that paranoid_check can
still guard against more serious problems.

Test Plan: make check; db_stress

Reviewers: dhruba, emayanke

Reviewed By: emayanke

CC: leveldb, emayanke

Differential Revision: https://reviews.facebook.net/D10869
main
Haobo Xu 12 years ago
parent d1aaaf718c
commit 87d0af15d8
  1. 9
      db/db_impl.cc
  2. 6
      include/leveldb/options.h
  3. 5
      util/options.cc

@ -590,7 +590,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
Env* env; Env* env;
Logger* info_log; Logger* info_log;
const char* fname; const char* fname;
Status* status; // nullptr if options_.paranoid_checks==false Status* status; // nullptr if options_.paranoid_checks==false or
// options_.skip_log_error_on_recovery==true
virtual void Corruption(size_t bytes, const Status& s) { virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%s%s: dropping %d bytes; %s", Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == nullptr ? "(ignoring error) " : ""), (this->status == nullptr ? "(ignoring error) " : ""),
@ -615,7 +616,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
reporter.env = env_; reporter.env = env_;
reporter.info_log = options_.info_log.get(); reporter.info_log = options_.info_log.get();
reporter.fname = fname.c_str(); reporter.fname = fname.c_str();
reporter.status = (options_.paranoid_checks ? &status : nullptr); reporter.status = (options_.paranoid_checks &&
!options_.skip_log_error_on_recovery ? &status : nullptr);
// We intentially make log::Reader do checksumming even if // We intentially make log::Reader do checksumming even if
// paranoid_checks==false so that corruptions cause entire commits // paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly // to be skipped instead of propagating bad information (like overly
@ -633,8 +635,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
if (external_table) { if (external_table) {
mem = external_table; mem = external_table;
} }
while (reader.ReadRecord(&record, &scratch) && while (reader.ReadRecord(&record, &scratch) && status.ok()) {
status.ok()) {
if (record.size() < 12) { if (record.size() < 12) {
reporter.Corruption( reporter.Corruption(
record.size(), Status::Corruption("log record too small")); record.size(), Status::Corruption("log record too small"));

@ -424,6 +424,12 @@ struct Options {
// Disable child process inherit open files. Default: true // Disable child process inherit open files. Default: true
bool is_fd_close_on_exec; bool is_fd_close_on_exec;
// Skip log corruption error on recovery (If client is ok with
// losing most recent changes)
// Default: false
bool skip_log_error_on_recovery;
}; };
// Options that control read operations // Options that control read operations

@ -68,7 +68,8 @@ Options::Options()
allow_readahead_compactions(true), allow_readahead_compactions(true),
allow_mmap_reads(false), allow_mmap_reads(false),
allow_mmap_writes(true), allow_mmap_writes(true),
is_fd_close_on_exec(true) { is_fd_close_on_exec(true),
skip_log_error_on_recovery(false) {
} }
void void
@ -189,6 +190,8 @@ Options::Dump(Logger* log) const
allow_mmap_writes); allow_mmap_writes);
Log(log," Options.is_fd_close_on_exec: %d", Log(log," Options.is_fd_close_on_exec: %d",
is_fd_close_on_exec); is_fd_close_on_exec);
Log(log," Options.skip_log_error_on_recovery: %d",
skip_log_error_on_recovery);
} // Options::Dump } // Options::Dump
// //

Loading…
Cancel
Save