diff --git a/db/log_reader.cc b/db/log_reader.cc index d46149da2..f2fd97f62 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -22,9 +22,8 @@ Reader::Reporter::~Reporter() { } Reader::Reader(std::shared_ptr info_log, - unique_ptr&& _file, - Reporter* reporter, bool checksum, uint64_t initial_offset, - uint64_t log_num) + unique_ptr&& _file, Reporter* reporter, + bool checksum, uint64_t initial_offset, uint64_t log_num) : info_log_(info_log), file_(std::move(_file)), reporter_(reporter), @@ -37,7 +36,8 @@ Reader::Reader(std::shared_ptr info_log, last_record_offset_(0), end_of_buffer_offset_(0), initial_offset_(initial_offset), - log_number_(log_num) {} + log_number_(log_num), + recycled_(false) {} Reader::~Reader() { delete[] backing_store_; @@ -192,6 +192,12 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, break; case kBadRecordLen: + if (recycled_ && + wal_recovery_mode == + WALRecoveryMode::kTolerateCorruptedTailRecords) { + scratch->clear(); + return false; + } ReportCorruption(drop_size, "bad record length"); if (in_fragmented_record) { ReportCorruption(scratch->size(), "error in middle of record"); @@ -201,6 +207,12 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, break; case kBadRecordChecksum: + if (recycled_ && + wal_recovery_mode == + WALRecoveryMode::kTolerateCorruptedTailRecords) { + scratch->clear(); + return false; + } ReportCorruption(drop_size, "checksum mismatch"); if (in_fragmented_record) { ReportCorruption(scratch->size(), "error in middle of record"); @@ -355,6 +367,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) { const uint32_t length = a | (b << 8); int header_size = kHeaderSize; if (type >= kRecyclableFullType && type <= kRecyclableLastType) { + if (end_of_buffer_offset_ - buffer_.size() == 0) { + recycled_ = true; + } header_size = kRecyclableHeaderSize; // We need enough for the larger header if (buffer_.size() < (size_t)kRecyclableHeaderSize) { diff --git a/db/log_reader.h b/db/log_reader.h index 23bff39a7..445118546 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -113,6 +113,9 @@ class Reader { // which log number this is uint64_t const log_number_; + // Whether this is a recycled log file + bool recycled_; + // Extend record types with the following special values enum { kEof = kMaxRecordType + 1, diff --git a/db/log_test.cc b/db/log_test.cc index fe9b63084..c92f09137 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -447,9 +447,13 @@ TEST_P(LogTest, BadLength) { Write("foo"); // Least significant size byte is stored in header[4]. IncrementByte(4, 1); - ASSERT_EQ("foo", Read()); - ASSERT_EQ(kBlockSize, DroppedBytes()); - ASSERT_EQ("OK", MatchError("bad record length")); + if (!GetParam()) { + ASSERT_EQ("foo", Read()); + ASSERT_EQ(kBlockSize, DroppedBytes()); + ASSERT_EQ("OK", MatchError("bad record length")); + } else { + ASSERT_EQ("EOF", Read()); + } } TEST_P(LogTest, BadLengthAtEndIsIgnored) { @@ -472,8 +476,13 @@ TEST_P(LogTest, ChecksumMismatch) { Write("foooooo"); IncrementByte(0, 14); ASSERT_EQ("EOF", Read()); - ASSERT_EQ(14U + 4 * !!GetParam(), DroppedBytes()); - ASSERT_EQ("OK", MatchError("checksum mismatch")); + if (!GetParam()) { + ASSERT_EQ(14U, DroppedBytes()); + ASSERT_EQ("OK", MatchError("checksum mismatch")); + } else { + ASSERT_EQ(0U, DroppedBytes()); + ASSERT_EQ("", ReportMessage()); + } } TEST_P(LogTest, UnexpectedMiddleType) { @@ -570,11 +579,15 @@ TEST_P(LogTest, ErrorJoinsRecords) { SetByte(offset, 'x'); } - ASSERT_EQ("correct", Read()); - ASSERT_EQ("EOF", Read()); - size_t dropped = DroppedBytes(); - ASSERT_LE(dropped, 2 * kBlockSize + 100); - ASSERT_GE(dropped, 2 * kBlockSize); + if (!GetParam()) { + ASSERT_EQ("correct", Read()); + ASSERT_EQ("EOF", Read()); + size_t dropped = DroppedBytes(); + ASSERT_LE(dropped, 2 * kBlockSize + 100); + ASSERT_GE(dropped, 2 * kBlockSize); + } else { + ASSERT_EQ("EOF", Read()); + } } TEST_P(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }