diff --git a/HISTORY.md b/HISTORY.md index 933c43e4a..4133dd2ad 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,11 @@ * Added is_manual_compaction to CompactionFilter::Context * Added "virtual void WaitForJoin() = 0" in class Env +### New Features +* If we find one truncated record at the end of the MANIFEST or WAL files, + we will ignore it. We assume that writers of these records were interrupted + and that we can safely ignore it. + ## 2.7.0 (01/28/2014) ### Public API changes diff --git a/db/db_impl.cc b/db/db_impl.cc index d13380e53..2fdd7bf87 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -441,7 +441,8 @@ Status DBImpl::NewDB() { const std::string manifest = DescriptorFileName(dbname_, 1); unique_ptr file; - Status s = env_->NewWritableFile(manifest, &file, storage_options_); + Status s = env_->NewWritableFile(manifest, &file, + storage_options_.AdaptForLogWrite()); if (!s.ok()) { return s; } @@ -3524,11 +3525,9 @@ Status DBImpl::MakeRoomForWrite(bool force, SuperVersion* new_superversion = nullptr; mutex_.Unlock(); { - EnvOptions soptions(storage_options_); - soptions.use_mmap_writes = false; DelayLoggingAndReset(); s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number), - &lfile, soptions); + &lfile, storage_options_.AdaptForLogWrite()); if (s.ok()) { // Our final size should be less than write_buffer_size // (compression, etc) but err on the side of caution. @@ -3784,7 +3783,6 @@ DB::~DB() { } Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { *dbptr = nullptr; - EnvOptions soptions(options); if (options.block_cache != nullptr && options.no_block_cache) { return Status::InvalidArgument( @@ -3808,12 +3806,10 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); unique_ptr lfile; - soptions.use_mmap_writes = false; + EnvOptions soptions(options); s = impl->options_.env->NewWritableFile( - LogFileName(impl->options_.wal_dir, new_log_number), - &lfile, - soptions - ); + LogFileName(impl->options_.wal_dir, new_log_number), &lfile, + soptions.AdaptForLogWrite()); if (s.ok()) { lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size); VersionEdit edit; diff --git a/db/log_reader.cc b/db/log_reader.cc index 1dc567413..be1fb8ceb 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -140,7 +140,9 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) { case kEof: if (in_fragmented_record) { - ReportCorruption(scratch->size(), "partial record without end(3)"); + // This can be caused by the writer dying immediately after + // writing a physical record but before completing the next; don't + // treat it as a corruption, just ignore the entire logical record. scratch->clear(); } return false; @@ -264,13 +266,12 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { eof_offset_ = buffer_.size(); } continue; - } else if (buffer_.size() == 0) { - // End of file - return kEof; } else { - size_t drop_size = buffer_.size(); + // Note that if buffer_ is non-empty, we have a truncated header at the + // end of the file, which can be caused by the writer crashing in the + // middle of writing the header. Instead of considering this an error, + // just report EOF. buffer_.clear(); - ReportCorruption(drop_size, "truncated record at end of file"); return kEof; } } @@ -284,14 +285,22 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { if (kHeaderSize + length > buffer_.size()) { size_t drop_size = buffer_.size(); buffer_.clear(); - ReportCorruption(drop_size, "bad record length"); - return kBadRecord; + if (!eof_) { + ReportCorruption(drop_size, "bad record length"); + return kBadRecord; + } + // If the end of the file has been reached without reading |length| bytes + // of payload, assume the writer died in the middle of writing the record. + // Don't report a corruption. + return kEof; } if (type == kZeroType && length == 0) { // Skip zero length record without reporting any drops since // such records are produced by the mmap based writing code in // env_posix.cc that preallocates file regions. + // NOTE: this should never happen in DB written by new RocksDB versions, + // since we turn off mmap writes to manifest and log files buffer_.clear(); return kBadRecord; } diff --git a/db/log_test.cc b/db/log_test.cc index 636518835..b28343e63 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -446,20 +446,32 @@ TEST(LogTest, BadRecordType) { ASSERT_EQ("OK", MatchError("unknown record type")); } -TEST(LogTest, TruncatedTrailingRecord) { +TEST(LogTest, TruncatedTrailingRecordIsIgnored) { Write("foo"); ShrinkSize(4); // Drop all payload as well as a header byte ASSERT_EQ("EOF", Read()); - ASSERT_EQ((unsigned int)(kHeaderSize - 1), DroppedBytes()); - ASSERT_EQ("OK", MatchError("truncated record at end of file")); + // Truncated last record is ignored, not treated as an error + ASSERT_EQ(0, DroppedBytes()); + ASSERT_EQ("", ReportMessage()); } TEST(LogTest, BadLength) { + const int kPayloadSize = kBlockSize - kHeaderSize; + Write(BigString("bar", kPayloadSize)); + Write("foo"); + // Least significant size byte is stored in header[4]. + IncrementByte(4, 1); + ASSERT_EQ("foo", Read()); + ASSERT_EQ(kBlockSize, DroppedBytes()); + ASSERT_EQ("OK", MatchError("bad record length")); +} + +TEST(LogTest, BadLengthAtEndIsIgnored) { Write("foo"); ShrinkSize(1); ASSERT_EQ("EOF", Read()); - ASSERT_EQ((unsigned int)(kHeaderSize + 2), DroppedBytes()); - ASSERT_EQ("OK", MatchError("bad record length")); + ASSERT_EQ(0, DroppedBytes()); + ASSERT_EQ("", ReportMessage()); } TEST(LogTest, ChecksumMismatch) { @@ -510,6 +522,24 @@ TEST(LogTest, UnexpectedFirstType) { ASSERT_EQ("OK", MatchError("partial record without end")); } +TEST(LogTest, MissingLastIsIgnored) { + Write(BigString("bar", kBlockSize)); + // Remove the LAST block, including header. + ShrinkSize(14); + ASSERT_EQ("EOF", Read()); + ASSERT_EQ("", ReportMessage()); + ASSERT_EQ(0, DroppedBytes()); +} + +TEST(LogTest, PartialLastIsIgnored) { + Write(BigString("bar", kBlockSize)); + // Cause a bad record length in the LAST block. + ShrinkSize(1); + ASSERT_EQ("EOF", Read()); + ASSERT_EQ("", ReportMessage()); + ASSERT_EQ(0, DroppedBytes()); +} + TEST(LogTest, ErrorJoinsRecords) { // Consider two fragmented records: // first(R1) last(R1) first(R2) last(R2) diff --git a/db/repair.cc b/db/repair.cc index 1d5468f25..235bb8967 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -241,7 +241,6 @@ class Repairer { } void ExtractMetaData() { - std::vector kept; for (size_t i = 0; i < table_numbers_.size(); i++) { TableInfo t; t.meta.number = table_numbers_[i]; @@ -307,7 +306,8 @@ class Repairer { Status WriteDescriptor() { std::string tmp = TempFileName(dbname_, 1); unique_ptr file; - Status status = env_->NewWritableFile(tmp, &file, storage_options_); + Status status = + env_->NewWritableFile(tmp, &file, storage_options_.AdaptForLogWrite()); if (!status.ok()) { return status; } diff --git a/db/version_set.cc b/db/version_set.cc index b7e416a2a..b4c122970 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1555,7 +1555,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, unique_ptr descriptor_file; s = env_->NewWritableFile(new_manifest_filename, &descriptor_file, - storage_options_); + storage_options_.AdaptForLogWrite()); if (s.ok()) { descriptor_log_.reset(new log::Writer(std::move(descriptor_file))); s = WriteSnapshot(descriptor_log_.get()); diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index a1f9349cb..90461b822 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -49,6 +49,8 @@ struct EnvOptions { // construct from Options explicit EnvOptions(const Options& options); + EnvOptions AdaptForLogWrite() const; + // If true, then allow caching of data in environment buffers bool use_os_buffer = true; diff --git a/util/env.cc b/util/env.cc index 0dd29ea56..f02dd4470 100644 --- a/util/env.cc +++ b/util/env.cc @@ -237,6 +237,12 @@ void AssignEnvOptions(EnvOptions* env_options, const Options& options) { } +EnvOptions EnvOptions::AdaptForLogWrite() const { + EnvOptions adapted = *this; + adapted.use_mmap_writes = false; + return adapted; +} + EnvOptions::EnvOptions(const Options& options) { AssignEnvOptions(this, options); }