Make Log::Reader more robust

Summary:
This diff does two things:
(1) Log::Reader does not report a corruption when the last record in a log or manifest file is truncated (meaning that log writer died in the middle of the write). Inherited the code from LevelDB: https://code.google.com/p/leveldb/source/detail?r=269fc6ca9416129248db5ca57050cd5d39d177c8#
(2) Turn off mmap writes for all writes to log and manifest files

(2) is necessary because if we use mmap writes, the last record is not truncated, but is actually filled with zeros, making checksum fail. It is hard to recover from checksum failing.

Test Plan:
Added unit tests from LevelDB
Actually recovered a "corrupted" MANIFEST file.

Reviewers: dhruba, haobo

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16119
main
Igor Canadi 11 years ago
parent a77527f2af
commit 58ca641d53
  1. 5
      HISTORY.md
  2. 16
      db/db_impl.cc
  3. 21
      db/log_reader.cc
  4. 40
      db/log_test.cc
  5. 4
      db/repair.cc
  6. 2
      db/version_set.cc
  7. 2
      include/rocksdb/env.h
  8. 6
      util/env.cc

@ -9,6 +9,11 @@
* Added is_manual_compaction to CompactionFilter::Context * Added is_manual_compaction to CompactionFilter::Context
* Added "virtual void WaitForJoin() = 0" in class Env * Added "virtual void WaitForJoin() = 0" in class Env
### New Features
* If we find one truncated record at the end of the MANIFEST or WAL files,
we will ignore it. We assume that writers of these records were interrupted
and that we can safely ignore it.
## 2.7.0 (01/28/2014) ## 2.7.0 (01/28/2014)
### Public API changes ### Public API changes

@ -441,7 +441,8 @@ Status DBImpl::NewDB() {
const std::string manifest = DescriptorFileName(dbname_, 1); const std::string manifest = DescriptorFileName(dbname_, 1);
unique_ptr<WritableFile> file; unique_ptr<WritableFile> file;
Status s = env_->NewWritableFile(manifest, &file, storage_options_); Status s = env_->NewWritableFile(manifest, &file,
storage_options_.AdaptForLogWrite());
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -3524,11 +3525,9 @@ Status DBImpl::MakeRoomForWrite(bool force,
SuperVersion* new_superversion = nullptr; SuperVersion* new_superversion = nullptr;
mutex_.Unlock(); mutex_.Unlock();
{ {
EnvOptions soptions(storage_options_);
soptions.use_mmap_writes = false;
DelayLoggingAndReset(); DelayLoggingAndReset();
s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number), s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number),
&lfile, soptions); &lfile, storage_options_.AdaptForLogWrite());
if (s.ok()) { if (s.ok()) {
// Our final size should be less than write_buffer_size // Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution. // (compression, etc) but err on the side of caution.
@ -3784,7 +3783,6 @@ DB::~DB() { }
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
*dbptr = nullptr; *dbptr = nullptr;
EnvOptions soptions(options);
if (options.block_cache != nullptr && options.no_block_cache) { if (options.block_cache != nullptr && options.no_block_cache) {
return Status::InvalidArgument( return Status::InvalidArgument(
@ -3808,12 +3806,10 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
if (s.ok()) { if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber(); uint64_t new_log_number = impl->versions_->NewFileNumber();
unique_ptr<WritableFile> lfile; unique_ptr<WritableFile> lfile;
soptions.use_mmap_writes = false; EnvOptions soptions(options);
s = impl->options_.env->NewWritableFile( s = impl->options_.env->NewWritableFile(
LogFileName(impl->options_.wal_dir, new_log_number), LogFileName(impl->options_.wal_dir, new_log_number), &lfile,
&lfile, soptions.AdaptForLogWrite());
soptions
);
if (s.ok()) { if (s.ok()) {
lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size); lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size);
VersionEdit edit; VersionEdit edit;

@ -140,7 +140,9 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
case kEof: case kEof:
if (in_fragmented_record) { if (in_fragmented_record) {
ReportCorruption(scratch->size(), "partial record without end(3)"); // This can be caused by the writer dying immediately after
// writing a physical record but before completing the next; don't
// treat it as a corruption, just ignore the entire logical record.
scratch->clear(); scratch->clear();
} }
return false; return false;
@ -264,13 +266,12 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
eof_offset_ = buffer_.size(); eof_offset_ = buffer_.size();
} }
continue; continue;
} else if (buffer_.size() == 0) {
// End of file
return kEof;
} else { } else {
size_t drop_size = buffer_.size(); // Note that if buffer_ is non-empty, we have a truncated header at the
// end of the file, which can be caused by the writer crashing in the
// middle of writing the header. Instead of considering this an error,
// just report EOF.
buffer_.clear(); buffer_.clear();
ReportCorruption(drop_size, "truncated record at end of file");
return kEof; return kEof;
} }
} }
@ -284,14 +285,22 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
if (kHeaderSize + length > buffer_.size()) { if (kHeaderSize + length > buffer_.size()) {
size_t drop_size = buffer_.size(); size_t drop_size = buffer_.size();
buffer_.clear(); buffer_.clear();
if (!eof_) {
ReportCorruption(drop_size, "bad record length"); ReportCorruption(drop_size, "bad record length");
return kBadRecord; return kBadRecord;
} }
// If the end of the file has been reached without reading |length| bytes
// of payload, assume the writer died in the middle of writing the record.
// Don't report a corruption.
return kEof;
}
if (type == kZeroType && length == 0) { if (type == kZeroType && length == 0) {
// Skip zero length record without reporting any drops since // Skip zero length record without reporting any drops since
// such records are produced by the mmap based writing code in // such records are produced by the mmap based writing code in
// env_posix.cc that preallocates file regions. // env_posix.cc that preallocates file regions.
// NOTE: this should never happen in DB written by new RocksDB versions,
// since we turn off mmap writes to manifest and log files
buffer_.clear(); buffer_.clear();
return kBadRecord; return kBadRecord;
} }

@ -446,20 +446,32 @@ TEST(LogTest, BadRecordType) {
ASSERT_EQ("OK", MatchError("unknown record type")); ASSERT_EQ("OK", MatchError("unknown record type"));
} }
TEST(LogTest, TruncatedTrailingRecord) { TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
Write("foo"); Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte ShrinkSize(4); // Drop all payload as well as a header byte
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
ASSERT_EQ((unsigned int)(kHeaderSize - 1), DroppedBytes()); // Truncated last record is ignored, not treated as an error
ASSERT_EQ("OK", MatchError("truncated record at end of file")); ASSERT_EQ(0, DroppedBytes());
ASSERT_EQ("", ReportMessage());
} }
TEST(LogTest, BadLength) { TEST(LogTest, BadLength) {
const int kPayloadSize = kBlockSize - kHeaderSize;
Write(BigString("bar", kPayloadSize));
Write("foo");
// Least significant size byte is stored in header[4].
IncrementByte(4, 1);
ASSERT_EQ("foo", Read());
ASSERT_EQ(kBlockSize, DroppedBytes());
ASSERT_EQ("OK", MatchError("bad record length"));
}
TEST(LogTest, BadLengthAtEndIsIgnored) {
Write("foo"); Write("foo");
ShrinkSize(1); ShrinkSize(1);
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
ASSERT_EQ((unsigned int)(kHeaderSize + 2), DroppedBytes()); ASSERT_EQ(0, DroppedBytes());
ASSERT_EQ("OK", MatchError("bad record length")); ASSERT_EQ("", ReportMessage());
} }
TEST(LogTest, ChecksumMismatch) { TEST(LogTest, ChecksumMismatch) {
@ -510,6 +522,24 @@ TEST(LogTest, UnexpectedFirstType) {
ASSERT_EQ("OK", MatchError("partial record without end")); ASSERT_EQ("OK", MatchError("partial record without end"));
} }
TEST(LogTest, MissingLastIsIgnored) {
Write(BigString("bar", kBlockSize));
// Remove the LAST block, including header.
ShrinkSize(14);
ASSERT_EQ("EOF", Read());
ASSERT_EQ("", ReportMessage());
ASSERT_EQ(0, DroppedBytes());
}
TEST(LogTest, PartialLastIsIgnored) {
Write(BigString("bar", kBlockSize));
// Cause a bad record length in the LAST block.
ShrinkSize(1);
ASSERT_EQ("EOF", Read());
ASSERT_EQ("", ReportMessage());
ASSERT_EQ(0, DroppedBytes());
}
TEST(LogTest, ErrorJoinsRecords) { TEST(LogTest, ErrorJoinsRecords) {
// Consider two fragmented records: // Consider two fragmented records:
// first(R1) last(R1) first(R2) last(R2) // first(R1) last(R1) first(R2) last(R2)

@ -241,7 +241,6 @@ class Repairer {
} }
void ExtractMetaData() { void ExtractMetaData() {
std::vector<TableInfo> kept;
for (size_t i = 0; i < table_numbers_.size(); i++) { for (size_t i = 0; i < table_numbers_.size(); i++) {
TableInfo t; TableInfo t;
t.meta.number = table_numbers_[i]; t.meta.number = table_numbers_[i];
@ -307,7 +306,8 @@ class Repairer {
Status WriteDescriptor() { Status WriteDescriptor() {
std::string tmp = TempFileName(dbname_, 1); std::string tmp = TempFileName(dbname_, 1);
unique_ptr<WritableFile> file; unique_ptr<WritableFile> file;
Status status = env_->NewWritableFile(tmp, &file, storage_options_); Status status =
env_->NewWritableFile(tmp, &file, storage_options_.AdaptForLogWrite());
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }

@ -1555,7 +1555,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
unique_ptr<WritableFile> descriptor_file; unique_ptr<WritableFile> descriptor_file;
s = env_->NewWritableFile(new_manifest_filename, s = env_->NewWritableFile(new_manifest_filename,
&descriptor_file, &descriptor_file,
storage_options_); storage_options_.AdaptForLogWrite());
if (s.ok()) { if (s.ok()) {
descriptor_log_.reset(new log::Writer(std::move(descriptor_file))); descriptor_log_.reset(new log::Writer(std::move(descriptor_file)));
s = WriteSnapshot(descriptor_log_.get()); s = WriteSnapshot(descriptor_log_.get());

@ -49,6 +49,8 @@ struct EnvOptions {
// construct from Options // construct from Options
explicit EnvOptions(const Options& options); explicit EnvOptions(const Options& options);
EnvOptions AdaptForLogWrite() const;
// If true, then allow caching of data in environment buffers // If true, then allow caching of data in environment buffers
bool use_os_buffer = true; bool use_os_buffer = true;

@ -237,6 +237,12 @@ void AssignEnvOptions(EnvOptions* env_options, const Options& options) {
} }
EnvOptions EnvOptions::AdaptForLogWrite() const {
EnvOptions adapted = *this;
adapted.use_mmap_writes = false;
return adapted;
}
EnvOptions::EnvOptions(const Options& options) { EnvOptions::EnvOptions(const Options& options) {
AssignEnvOptions(this, options); AssignEnvOptions(this, options);
} }

Loading…
Cancel
Save