diff --git a/db/db_impl.cc b/db/db_impl.cc index fc39e15c6..cbed7482d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1126,21 +1126,6 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, db_options_.wal_recovery_mode, !continue_replay_log); // Determine if we should tolerate incomplete records at the tail end of the - // log - bool report_eof_inconsistency; - if (db_options_.wal_recovery_mode == - WALRecoveryMode::kAbsoluteConsistency) { - // in clean shutdown we don't expect any error in the log files - report_eof_inconsistency = true; - } else { - // for other modes ignore only incomplete records in the last log file - // which is presumably due to write in progress during restart - report_eof_inconsistency = false; - - // TODO krad: Evaluate if we need to move to a more strict mode where we - // restrict the inconsistency to only the last log - } - // Read all the records and add to a memtable std::string scratch; Slice record; @@ -1155,9 +1140,10 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, } } - while (continue_replay_log && - reader.ReadRecord(&record, &scratch, report_eof_inconsistency) && - status.ok()) { + while ( + continue_replay_log && + reader.ReadRecord(&record, &scratch, db_options_.wal_recovery_mode) && + status.ok()) { if (record.size() < 12) { reporter.Corruption(record.size(), Status::Corruption("log record too small")); diff --git a/db/log_reader.cc b/db/log_reader.cc index 44396eb7a..cf5c26152 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -66,8 +66,15 @@ bool Reader::SkipToInitialBlock() { return true; } +// For kAbsoluteConsistency, on clean shutdown we don't expect any error +// in the log files. For other modes, we can ignore only incomplete records +// in the last log file, which are presumably due to a write in progress +// during restart (or from log recycling). +// +// TODO krad: Evaluate if we need to move to a more strict mode where we +// restrict the inconsistency to only the last log bool Reader::ReadRecord(Slice* record, std::string* scratch, - const bool report_eof_inconsistency) { + WALRecoveryMode wal_recovery_mode) { if (last_record_offset_ < initial_offset_) { if (!SkipToInitialBlock()) { return false; @@ -85,7 +92,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, while (true) { uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); const unsigned int record_type = - ReadPhysicalRecord(&fragment, report_eof_inconsistency); + ReadPhysicalRecord(&fragment, wal_recovery_mode); switch (record_type) { case kFullType: if (in_fragmented_record && !scratch->empty()) { @@ -137,7 +144,8 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, case kEof: if (in_fragmented_record) { - if (report_eof_inconsistency) { + if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) { + // in clean shutdown we don't expect any error in the log files ReportCorruption(scratch->size(), "error reading trailing data"); } // This can be caused by the writer dying immediately after @@ -249,7 +257,7 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) { } unsigned int Reader::ReadPhysicalRecord(Slice* result, - const bool report_eof_inconsistency) { + WALRecoveryMode wal_recovery_mode) { while (true) { if (buffer_.size() < (size_t)kHeaderSize) { if (!eof_ && !read_error_) { @@ -272,7 +280,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, // end of the file, which can be caused by the writer crashing in the // middle of writing the header. Unless explicitly requested we don't // considering this an error, just report EOF. - if (buffer_.size() && report_eof_inconsistency) { + if (buffer_.size() && + wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) { + // in clean shutdown we don't expect any error in the log files ReportCorruption(buffer_.size(), "truncated header"); } buffer_.clear(); @@ -296,7 +306,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, // If the end of the file has been reached without reading |length| bytes // of payload, assume the writer died in the middle of writing the record. // Don't report a corruption unless requested. - if (drop_size && report_eof_inconsistency) { + if (drop_size && + wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) { + // in clean shutdown we don't expect any error in the log files ReportCorruption(drop_size, "truncated header"); } return kEof; diff --git a/db/log_reader.h b/db/log_reader.h index 1b0a592c9..bb86dfda7 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -14,6 +14,7 @@ #include "db/log_format.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" +#include "rocksdb/options.h" namespace rocksdb { @@ -65,7 +66,8 @@ class Reader { // will only be valid until the next mutating operation on this // reader or the next mutation to *scratch. bool ReadRecord(Slice* record, std::string* scratch, - bool report_eof_inconsistency = false); + WALRecoveryMode wal_recovery_mode = + WALRecoveryMode::kTolerateCorruptedTailRecords); // Returns the physical offset of the last record returned by ReadRecord. // @@ -128,8 +130,9 @@ class Reader { bool SkipToInitialBlock(); // Return type, or one of the preceding special values - unsigned int ReadPhysicalRecord(Slice* result, - bool report_eof_inconsistency = false); + unsigned int ReadPhysicalRecord( + Slice* result, WALRecoveryMode wal_recovery_mode = + WALRecoveryMode::kTolerateCorruptedTailRecords); // Reports dropped bytes to the reporter. // buffer_ must be updated to remove the dropped bytes prior to invocation. diff --git a/db/log_test.cc b/db/log_test.cc index 55f8a9682..22921090a 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -174,10 +174,11 @@ class LogTest : public ::testing::TestWithParam { return dest_contents().size(); } - std::string Read(const bool report_eof_inconsistency = false) { + std::string Read(const WALRecoveryMode wal_recovery_mode = + WALRecoveryMode::kTolerateCorruptedTailRecords) { std::string scratch; Slice record; - if (reader_.ReadRecord(&record, &scratch, report_eof_inconsistency)) { + if (reader_.ReadRecord(&record, &scratch, wal_recovery_mode)) { return record.ToString(); } else { return "EOF"; @@ -424,7 +425,7 @@ TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) { TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) { Write("foo"); ShrinkSize(4); // Drop all payload as well as a header byte - ASSERT_EQ("EOF", Read(/*report_eof_inconsistency*/ true)); + ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); // Truncated last record is ignored, not treated as an error ASSERT_GT(DroppedBytes(), 0U); ASSERT_EQ("OK", MatchError("Corruption: truncated header")); @@ -452,7 +453,7 @@ TEST_P(LogTest, BadLengthAtEndIsIgnored) { TEST_P(LogTest, BadLengthAtEndIsNotIgnored) { Write("foo"); ShrinkSize(1); - ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true)); + ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); ASSERT_GT(DroppedBytes(), 0U); ASSERT_EQ("OK", MatchError("Corruption: truncated header")); } @@ -518,7 +519,7 @@ TEST_P(LogTest, MissingLastIsNotIgnored) { Write(BigString("bar", kBlockSize)); // Remove the LAST block, including header. ShrinkSize(14); - ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true)); + ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); ASSERT_GT(DroppedBytes(), 0U); ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data")); } @@ -536,7 +537,7 @@ TEST_P(LogTest, PartialLastIsNotIgnored) { Write(BigString("bar", kBlockSize)); // Cause a bad record length in the LAST block. ShrinkSize(1); - ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true)); + ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); ASSERT_GT(DroppedBytes(), 0U); ASSERT_EQ("OK", MatchError( "Corruption: truncated headerCorruption: "