From 58ca641d537e371aa24c1af8c9fd767b2bf8553d Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 28 Feb 2014 13:19:47 -0800 Subject: [PATCH] Make Log::Reader more robust Summary: This diff does two things: (1) Log::Reader does not report a corruption when the last record in a log or manifest file is truncated (meaning that log writer died in the middle of the write). Inherited the code from LevelDB: https://code.google.com/p/leveldb/source/detail?r=269fc6ca9416129248db5ca57050cd5d39d177c8# (2) Turn off mmap writes for all writes to log and manifest files (2) is necessary because if we use mmap writes, the last record is not truncated, but is actually filled with zeros, making checksum fail. It is hard to recover from checksum failing. Test Plan: Added unit tests from LevelDB Actually recovered a "corrupted" MANIFEST file. Reviewers: dhruba, haobo Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16119 --- HISTORY.md | 5 +++++ db/db_impl.cc | 16 ++++++---------- db/log_reader.cc | 25 +++++++++++++++++-------- db/log_test.cc | 40 +++++++++++++++++++++++++++++++++++----- db/repair.cc | 4 ++-- db/version_set.cc | 2 +- include/rocksdb/env.h | 2 ++ util/env.cc | 6 ++++++ 8 files changed, 74 insertions(+), 26 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 933c43e4a..4133dd2ad 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,11 @@ * Added is_manual_compaction to CompactionFilter::Context * Added "virtual void WaitForJoin() = 0" in class Env +### New Features +* If we find one truncated record at the end of the MANIFEST or WAL files, + we will ignore it. We assume that writers of these records were interrupted + and that we can safely ignore it. + ## 2.7.0 (01/28/2014) ### Public API changes diff --git a/db/db_impl.cc b/db/db_impl.cc index d13380e53..2fdd7bf87 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -441,7 +441,8 @@ Status DBImpl::NewDB() { const std::string manifest = DescriptorFileName(dbname_, 1); unique_ptr file; - Status s = env_->NewWritableFile(manifest, &file, storage_options_); + Status s = env_->NewWritableFile(manifest, &file, + storage_options_.AdaptForLogWrite()); if (!s.ok()) { return s; } @@ -3524,11 +3525,9 @@ Status DBImpl::MakeRoomForWrite(bool force, SuperVersion* new_superversion = nullptr; mutex_.Unlock(); { - EnvOptions soptions(storage_options_); - soptions.use_mmap_writes = false; DelayLoggingAndReset(); s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number), - &lfile, soptions); + &lfile, storage_options_.AdaptForLogWrite()); if (s.ok()) { // Our final size should be less than write_buffer_size // (compression, etc) but err on the side of caution. @@ -3784,7 +3783,6 @@ DB::~DB() { } Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { *dbptr = nullptr; - EnvOptions soptions(options); if (options.block_cache != nullptr && options.no_block_cache) { return Status::InvalidArgument( @@ -3808,12 +3806,10 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); unique_ptr lfile; - soptions.use_mmap_writes = false; + EnvOptions soptions(options); s = impl->options_.env->NewWritableFile( - LogFileName(impl->options_.wal_dir, new_log_number), - &lfile, - soptions - ); + LogFileName(impl->options_.wal_dir, new_log_number), &lfile, + soptions.AdaptForLogWrite()); if (s.ok()) { lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size); VersionEdit edit; diff --git a/db/log_reader.cc b/db/log_reader.cc index 1dc567413..be1fb8ceb 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -140,7 +140,9 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) { case kEof: if (in_fragmented_record) { - ReportCorruption(scratch->size(), "partial record without end(3)"); + // This can be caused by the writer dying immediately after + // writing a physical record but before completing the next; don't + // treat it as a corruption, just ignore the entire logical record. scratch->clear(); } return false; @@ -264,13 +266,12 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { eof_offset_ = buffer_.size(); } continue; - } else if (buffer_.size() == 0) { - // End of file - return kEof; } else { - size_t drop_size = buffer_.size(); + // Note that if buffer_ is non-empty, we have a truncated header at the + // end of the file, which can be caused by the writer crashing in the + // middle of writing the header. Instead of considering this an error, + // just report EOF. buffer_.clear(); - ReportCorruption(drop_size, "truncated record at end of file"); return kEof; } } @@ -284,14 +285,22 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { if (kHeaderSize + length > buffer_.size()) { size_t drop_size = buffer_.size(); buffer_.clear(); - ReportCorruption(drop_size, "bad record length"); - return kBadRecord; + if (!eof_) { + ReportCorruption(drop_size, "bad record length"); + return kBadRecord; + } + // If the end of the file has been reached without reading |length| bytes + // of payload, assume the writer died in the middle of writing the record. + // Don't report a corruption. + return kEof; } if (type == kZeroType && length == 0) { // Skip zero length record without reporting any drops since // such records are produced by the mmap based writing code in // env_posix.cc that preallocates file regions. + // NOTE: this should never happen in DB written by new RocksDB versions, + // since we turn off mmap writes to manifest and log files buffer_.clear(); return kBadRecord; } diff --git a/db/log_test.cc b/db/log_test.cc index 636518835..b28343e63 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -446,20 +446,32 @@ TEST(LogTest, BadRecordType) { ASSERT_EQ("OK", MatchError("unknown record type")); } -TEST(LogTest, TruncatedTrailingRecord) { +TEST(LogTest, TruncatedTrailingRecordIsIgnored) { Write("foo"); ShrinkSize(4); // Drop all payload as well as a header byte ASSERT_EQ("EOF", Read()); - ASSERT_EQ((unsigned int)(kHeaderSize - 1), DroppedBytes()); - ASSERT_EQ("OK", MatchError("truncated record at end of file")); + // Truncated last record is ignored, not treated as an error + ASSERT_EQ(0, DroppedBytes()); + ASSERT_EQ("", ReportMessage()); } TEST(LogTest, BadLength) { + const int kPayloadSize = kBlockSize - kHeaderSize; + Write(BigString("bar", kPayloadSize)); + Write("foo"); + // Least significant size byte is stored in header[4]. + IncrementByte(4, 1); + ASSERT_EQ("foo", Read()); + ASSERT_EQ(kBlockSize, DroppedBytes()); + ASSERT_EQ("OK", MatchError("bad record length")); +} + +TEST(LogTest, BadLengthAtEndIsIgnored) { Write("foo"); ShrinkSize(1); ASSERT_EQ("EOF", Read()); - ASSERT_EQ((unsigned int)(kHeaderSize + 2), DroppedBytes()); - ASSERT_EQ("OK", MatchError("bad record length")); + ASSERT_EQ(0, DroppedBytes()); + ASSERT_EQ("", ReportMessage()); } TEST(LogTest, ChecksumMismatch) { @@ -510,6 +522,24 @@ TEST(LogTest, UnexpectedFirstType) { ASSERT_EQ("OK", MatchError("partial record without end")); } +TEST(LogTest, MissingLastIsIgnored) { + Write(BigString("bar", kBlockSize)); + // Remove the LAST block, including header. + ShrinkSize(14); + ASSERT_EQ("EOF", Read()); + ASSERT_EQ("", ReportMessage()); + ASSERT_EQ(0, DroppedBytes()); +} + +TEST(LogTest, PartialLastIsIgnored) { + Write(BigString("bar", kBlockSize)); + // Cause a bad record length in the LAST block. + ShrinkSize(1); + ASSERT_EQ("EOF", Read()); + ASSERT_EQ("", ReportMessage()); + ASSERT_EQ(0, DroppedBytes()); +} + TEST(LogTest, ErrorJoinsRecords) { // Consider two fragmented records: // first(R1) last(R1) first(R2) last(R2) diff --git a/db/repair.cc b/db/repair.cc index 1d5468f25..235bb8967 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -241,7 +241,6 @@ class Repairer { } void ExtractMetaData() { - std::vector kept; for (size_t i = 0; i < table_numbers_.size(); i++) { TableInfo t; t.meta.number = table_numbers_[i]; @@ -307,7 +306,8 @@ class Repairer { Status WriteDescriptor() { std::string tmp = TempFileName(dbname_, 1); unique_ptr file; - Status status = env_->NewWritableFile(tmp, &file, storage_options_); + Status status = + env_->NewWritableFile(tmp, &file, storage_options_.AdaptForLogWrite()); if (!status.ok()) { return status; } diff --git a/db/version_set.cc b/db/version_set.cc index b7e416a2a..b4c122970 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1555,7 +1555,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, unique_ptr descriptor_file; s = env_->NewWritableFile(new_manifest_filename, &descriptor_file, - storage_options_); + storage_options_.AdaptForLogWrite()); if (s.ok()) { descriptor_log_.reset(new log::Writer(std::move(descriptor_file))); s = WriteSnapshot(descriptor_log_.get()); diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index a1f9349cb..90461b822 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -49,6 +49,8 @@ struct EnvOptions { // construct from Options explicit EnvOptions(const Options& options); + EnvOptions AdaptForLogWrite() const; + // If true, then allow caching of data in environment buffers bool use_os_buffer = true; diff --git a/util/env.cc b/util/env.cc index 0dd29ea56..f02dd4470 100644 --- a/util/env.cc +++ b/util/env.cc @@ -237,6 +237,12 @@ void AssignEnvOptions(EnvOptions* env_options, const Options& options) { } +EnvOptions EnvOptions::AdaptForLogWrite() const { + EnvOptions adapted = *this; + adapted.use_mmap_writes = false; + return adapted; +} + EnvOptions::EnvOptions(const Options& options) { AssignEnvOptions(this, options); }