From 3d33da75efa22332dde32e5f2c583b1b8b899a6a Mon Sep 17 00:00:00 2001
From: Schalk-Willem Kruger
Date: Mon, 27 Jan 2014 14:49:10 -0800
Subject: [PATCH] Fix UnmarkEOF for partial blocks

Summary:
Blocks in the transaction log are a fixed size, but the last block in the
transaction log file is usually a partial block. When a new record is added
after the reader has hit the end of the file, a new physical record will be
appended to the last block. ReadPhysicalRecord can only read full blocks and
assumes that the file position indicator is aligned to the start of a block.
If the reader is forced to read further by simply clearing the EOF flag,
ReadPhysicalRecord will read a full block starting from somewhere in the
middle of a real block, causing it to lose alignment and to have a partial
physical record at the end of the read buffer. This will result in length
mismatches and checksum failures. When the log file is tailed for
replication, this will cause the log iterator to become invalid,
necessitating the creation of a new iterator which will have to read the log
file from scratch.

This diff fixes the issue by reading the remaining portion of the last block
we read from. This is done when the reader is forced to read further
(UnmarkEOF is called).

Test Plan:
- Added unit tests
- Stress test (with replication). Check dbdir/LOG file for corruptions.
- Test on test tier

Reviewers: emayanke, haobo, dhruba

Reviewed By: haobo

CC: vamsi, sheki, dhruba, kailiu, igor

Differential Revision: https://reviews.facebook.net/D15249
---
 db/log_reader.cc |  70 +++++++++++++++++-
 db/log_reader.h  |  12 +++-
 db/log_test.cc   | 179 ++++++++++++++++++++++++++++++++++++++++-------
 3 files changed, 232 insertions(+), 29 deletions(-)

diff --git a/db/log_reader.cc b/db/log_reader.cc
index 6596cd84f..1dc567413 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -28,6 +28,8 @@ Reader::Reader(unique_ptr<SequentialFile>&& file, Reporter* reporter,
       backing_store_(new char[kBlockSize]),
       buffer_(),
       eof_(false),
+      read_error_(false),
+      eof_offset_(0),
       last_record_offset_(0),
       end_of_buffer_offset_(0),
       initial_offset_(initial_offset) {
@@ -170,6 +172,69 @@ uint64_t Reader::LastRecordOffset() {
   return last_record_offset_;
 }
 
+void Reader::UnmarkEOF() {
+  if (read_error_) {
+    return;
+  }
+
+  eof_ = false;
+
+  if (eof_offset_ == 0) {
+    return;
+  }
+
+  // If the EOF was in the middle of a block (a partial block was read) we have
+  // to read the rest of the block as ReadPhysicalRecord can only read full
+  // blocks and expects the file position indicator to be aligned to the start
+  // of a block.
+  //
+  //      consumed_bytes + buffer_size() + remaining == kBlockSize
+
+  size_t consumed_bytes = eof_offset_ - buffer_.size();
+  size_t remaining = kBlockSize - eof_offset_;
+
+  // backing_store_ is used to concatenate what is left in buffer_ and
+  // the remainder of the block. If buffer_ already uses backing_store_,
+  // we just append the new data.
+  if (buffer_.data() != backing_store_ + consumed_bytes) {
+    // Buffer_ does not use backing_store_ for storage.
+    // Copy what is left in buffer_ to backing_store.
+    memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
+  }
+
+  Slice read_buffer;
+  Status status = file_->Read(remaining, &read_buffer,
+                              backing_store_ + eof_offset_);
+
+  size_t added = read_buffer.size();
+  end_of_buffer_offset_ += added;
+
+  if (!status.ok()) {
+    if (added > 0) {
+      ReportDrop(added, status);
+    }
+
+    read_error_ = true;
+    return;
+  }
+
+  if (read_buffer.data() != backing_store_ + eof_offset_) {
+    // Read did not write to backing_store_
+    memmove(backing_store_ + eof_offset_, read_buffer.data(),
+            read_buffer.size());
+  }
+
+  buffer_ = Slice(backing_store_ + consumed_bytes,
+                  eof_offset_ + added - consumed_bytes);
+
+  if (added < remaining) {
+    eof_ = true;
+    eof_offset_ += added;
+  } else {
+    eof_offset_ = 0;
+  }
+}
+
 void Reader::ReportCorruption(size_t bytes, const char* reason) {
   ReportDrop(bytes, Status::Corruption(reason));
 }
@@ -184,7 +249,7 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
 unsigned int Reader::ReadPhysicalRecord(Slice* result) {
   while (true) {
     if (buffer_.size() < (size_t)kHeaderSize) {
-      if (!eof_) {
+      if (!eof_ && !read_error_) {
         // Last read was a full read, so this is a trailer to skip
         buffer_.clear();
         Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
@@ -192,10 +257,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
         if (!status.ok()) {
           buffer_.clear();
           ReportDrop(kBlockSize, status);
-          eof_ = true;
+          read_error_ = true;
           return kEof;
         } else if (buffer_.size() < (size_t)kBlockSize) {
           eof_ = true;
+          eof_offset_ = buffer_.size();
         }
         continue;
       } else if (buffer_.size() == 0) {
diff --git a/db/log_reader.h b/db/log_reader.h
index 8e277c821..81d334da2 100644
--- a/db/log_reader.h
+++ b/db/log_reader.h
@@ -69,9 +69,10 @@ class Reader {
 
   // when we know more data has been written to the file. we can use this
   // function to force the reader to look again in the file.
-  void UnmarkEOF() {
-    eof_ = false;
-  }
+  // Also aligns the file position indicator to the start of the next block
+  // by reading the rest of the data from the EOF position to the end of the
+  // block that was partially read.
+  void UnmarkEOF();
 
   SequentialFile* file() { return file_.get(); }
 
@@ -82,6 +83,11 @@ class Reader {
   char* const backing_store_;
   Slice buffer_;
   bool eof_;   // Last Read() indicated EOF by returning < kBlockSize
+  bool read_error_;   // Error occurred while reading from file
+
+  // Offset of the file position indicator within the last block when an
+  // EOF was detected.
+  size_t eof_offset_;
 
   // Offset of the last record returned by ReadRecord.
   uint64_t last_record_offset_;
diff --git a/db/log_test.cc b/db/log_test.cc
index dedbff0aa..636518835 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -47,36 +47,93 @@ class LogTest {
    public:
     std::string contents_;
 
+    explicit StringDest(Slice& reader_contents) :
+      WritableFile(),
+      contents_(""),
+      reader_contents_(reader_contents),
+      last_flush_(0) {
+      reader_contents_ = Slice(contents_.data(), 0);
+    };
+
     virtual Status Close() { return Status::OK(); }
-    virtual Status Flush() { return Status::OK(); }
+    virtual Status Flush() {
+      ASSERT_TRUE(reader_contents_.size() <= last_flush_);
+      size_t offset = last_flush_ - reader_contents_.size();
+      reader_contents_ = Slice(
+          contents_.data() + offset,
+          contents_.size() - offset);
+      last_flush_ = contents_.size();
+
+      return Status::OK();
+    }
     virtual Status Sync() { return Status::OK(); }
     virtual Status Append(const Slice& slice) {
       contents_.append(slice.data(), slice.size());
       return Status::OK();
     }
+    void Drop(size_t bytes) {
+      contents_.resize(contents_.size() - bytes);
+      reader_contents_ = Slice(
+          reader_contents_.data(), reader_contents_.size() - bytes);
+      last_flush_ = contents_.size();
+    }
+
+   private:
+    Slice& reader_contents_;
+    size_t last_flush_;
   };
 
   class StringSource : public SequentialFile {
    public:
-    Slice contents_;
+    Slice& contents_;
     bool force_error_;
+    size_t force_error_position_;
+    bool force_eof_;
+    size_t force_eof_position_;
     bool returned_partial_;
-    StringSource() : force_error_(false), returned_partial_(false) { }
+    explicit StringSource(Slice& contents) :
+      contents_(contents),
+      force_error_(false),
+      force_error_position_(0),
+      force_eof_(false),
+      force_eof_position_(0),
+      returned_partial_(false) { }
 
     virtual Status Read(size_t n, Slice* result, char* scratch) {
       ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
 
       if (force_error_) {
-        force_error_ = false;
-        returned_partial_ = true;
-        return Status::Corruption("read error");
+        if (force_error_position_ >= n) {
+          force_error_position_ -= n;
+        } else {
+          *result = Slice(contents_.data(), force_error_position_);
+          contents_.remove_prefix(force_error_position_);
+          force_error_ = false;
+          returned_partial_ = true;
+          return Status::Corruption("read error");
+        }
       }
 
       if (contents_.size() < n) {
         n = contents_.size();
         returned_partial_ = true;
       }
-      *result = Slice(contents_.data(), n);
+
+      if (force_eof_) {
+        if (force_eof_position_ >= n) {
+          force_eof_position_ -= n;
+        } else {
+          force_eof_ = false;
+          n = force_eof_position_;
+          returned_partial_ = true;
+        }
+      }
+
+      // By using scratch we ensure that caller has control over the
+      // lifetime of result.data()
+      memcpy(scratch, contents_.data(), n);
+      *result = Slice(scratch, n);
+      contents_.remove_prefix(n);
       return Status::OK();
     }
 
@@ -123,10 +180,10 @@ class LogTest {
     src->contents_ = dest_contents();
   }
 
+  Slice reader_contents_;
   unique_ptr<StringDest> dest_holder_;
   unique_ptr<StringSource> source_holder_;
   ReportCollector report_;
-  bool reading_;
   Writer writer_;
   Reader reader_;
 
@@ -135,16 +192,15 @@ class LogTest {
   static uint64_t initial_offset_last_record_offsets_[];
 
  public:
-  LogTest() : dest_holder_(new StringDest),
-              source_holder_(new StringSource),
-              reading_(false),
+  LogTest() : reader_contents_(),
+              dest_holder_(new StringDest(reader_contents_)),
+              source_holder_(new StringSource(reader_contents_)),
               writer_(std::move(dest_holder_)),
               reader_(std::move(source_holder_), &report_, true/*checksum*/,
                       0/*initial_offset*/) {
   }
 
   void Write(const std::string& msg) {
-    ASSERT_TRUE(!reading_) << "Write() after starting to read";
     writer_.AddRecord(Slice(msg));
   }
 
@@ -153,10 +209,6 @@ class LogTest {
   }
 
   std::string Read() {
-    if (!reading_) {
-      reading_ = true;
-      reset_source_contents();
-    }
     std::string scratch;
     Slice record;
     if (reader_.ReadRecord(&record, &scratch)) {
@@ -175,7 +227,9 @@ class LogTest {
   }
 
   void ShrinkSize(int bytes) {
-    dest_contents().resize(dest_contents().size() - bytes);
+    auto dest = dynamic_cast<StringDest*>(writer_.file());
+    assert(dest);
+    dest->Drop(bytes);
   }
 
   void FixChecksum(int header_offset, int len) {
@@ -185,9 +239,10 @@ class LogTest {
     EncodeFixed32(&dest_contents()[header_offset], crc);
   }
 
-  void ForceError() {
+  void ForceError(size_t position = 0) {
     auto src = dynamic_cast<StringSource*>(reader_.file());
     src->force_error_ = true;
+    src->force_error_position_ = position;
   }
 
   size_t DroppedBytes() const {
@@ -198,6 +253,22 @@ class LogTest {
     return report_.message_;
   }
 
+  void ForceEOF(size_t position = 0) {
+    auto src = dynamic_cast<StringSource*>(reader_.file());
+    src->force_eof_ = true;
+    src->force_eof_position_ = position;
+  }
+
+  void UnmarkEOF() {
+    auto src = dynamic_cast<StringSource*>(reader_.file());
+    src->returned_partial_ = false;
+    reader_.UnmarkEOF();
+  }
+
+  bool IsEOF() {
+    return reader_.IsEOF();
+  }
+
   // Returns OK iff recorded error message contains "msg"
   std::string MatchError(const std::string& msg) const {
     if (report_.message_.find(msg) == std::string::npos) {
@@ -217,9 +288,7 @@ class LogTest {
 
   void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
     WriteInitialOffsetLog();
-    reading_ = true;
-    unique_ptr<StringSource> source(new StringSource);
-    source->contents_ = dest_contents();
+    unique_ptr<StringSource> source(new StringSource(reader_contents_));
     unique_ptr<Reader> offset_reader(
         new Reader(std::move(source), &report_, true/*checksum*/,
                    WrittenBytes() + offset_past_end));
@@ -231,9 +300,7 @@ class LogTest {
   void CheckInitialOffsetRecord(uint64_t initial_offset,
                                 int expected_record_offset) {
     WriteInitialOffsetLog();
-    reading_ = true;
-    unique_ptr<StringSource> source(new StringSource);
-    source->contents_ = dest_contents();
+    unique_ptr<StringSource> source(new StringSource(reader_contents_));
     unique_ptr<Reader> offset_reader(
         new Reader(std::move(source), &report_, true/*checksum*/,
                    initial_offset));
@@ -520,6 +587,70 @@ TEST(LogTest, ReadPastEnd) {
   CheckOffsetPastEndReturnsNoRecords(5);
 }
 
+TEST(LogTest, ClearEofSingleBlock) {
+  Write("foo");
+  Write("bar");
+  ForceEOF(3 + kHeaderSize + 2);
+  ASSERT_EQ("foo", Read());
+  UnmarkEOF();
+  ASSERT_EQ("bar", Read());
+  ASSERT_TRUE(IsEOF());
+  ASSERT_EQ("EOF", Read());
+  Write("xxx");
+  UnmarkEOF();
+  ASSERT_EQ("xxx", Read());
+  ASSERT_TRUE(IsEOF());
+}
+
+TEST(LogTest, ClearEofMultiBlock) {
+  size_t num_full_blocks = 5;
+  size_t n = (kBlockSize - kHeaderSize) * num_full_blocks + 25;
+  Write(BigString("foo", n));
+  Write(BigString("bar", n));
+  ForceEOF(n + num_full_blocks * kHeaderSize + 10);
+  ASSERT_EQ(BigString("foo", n), Read());
+  ASSERT_TRUE(IsEOF());
+  UnmarkEOF();
+  ASSERT_EQ(BigString("bar", n), Read());
+  ASSERT_TRUE(IsEOF());
+  Write(BigString("xxx", n));
+  UnmarkEOF();
+  ASSERT_EQ(BigString("xxx", n), Read());
+  ASSERT_TRUE(IsEOF());
+}
+
+TEST(LogTest, ClearEofError) {
+  // If an error occurs during Read() in UnmarkEOF(), the records contained
+  // in the buffer should be returned on subsequent calls of ReadRecord()
+  // until no more full records are left, whereafter ReadRecord() should return
+  // false to indicate that it cannot read any further.
+
+  Write("foo");
+  Write("bar");
+  UnmarkEOF();
+  ASSERT_EQ("foo", Read());
+  ASSERT_TRUE(IsEOF());
+  Write("xxx");
+  ForceError(0);
+  UnmarkEOF();
+  ASSERT_EQ("bar", Read());
+  ASSERT_EQ("EOF", Read());
+}
+
+TEST(LogTest, ClearEofError2) {
+  Write("foo");
+  Write("bar");
+  UnmarkEOF();
+  ASSERT_EQ("foo", Read());
+  Write("xxx");
+  ForceError(3);
+  UnmarkEOF();
+  ASSERT_EQ("bar", Read());
+  ASSERT_EQ("EOF", Read());
+  ASSERT_EQ(3U, DroppedBytes());
+  ASSERT_EQ("OK", MatchError("read error"));
+}
+
 }  // namespace log
 }  // namespace rocksdb
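
Note (illustrative, not part of the commit): a worked example of the
UnmarkEOF() arithmetic above. With the 32KB log block size (kBlockSize ==
32768), suppose the last Read() returned a partial block of 1000 bytes, so
eof_offset_ == 1000, and 120 of those bytes are still unconsumed in buffer_.
Then consumed_bytes == 880 and remaining == 31768: UnmarkEOF() moves the 120
leftover bytes to backing_store_ + 880 if they are not already there, reads
up to 31768 further bytes into backing_store_ + 1000, and rebuilds buffer_ as
Slice(backing_store_ + 880, 120 + added), so the next ReadPhysicalRecord()
once again consumes whole blocks from a block-aligned file position.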
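
Note (illustrative, not part of the commit): a minimal sketch of the tailing
loop this change is meant to support. The helper name TailLog and the
more_data_expected callback are assumptions made for illustration;
ReadRecord() and UnmarkEOF() are the Reader methods exercised by the new
tests.

  #include <string>
  #include "db/log_reader.h"

  // Illustrative sketch only: drain a live transaction log with a
  // rocksdb::log::Reader while a writer keeps appending to it.
  void TailLog(rocksdb::log::Reader* reader, bool (*more_data_expected)()) {
    std::string scratch;
    rocksdb::Slice record;
    while (true) {
      // Deliver every complete record that is currently readable.
      while (reader->ReadRecord(&record, &scratch)) {
        // ... hand the record to the replication stream ...
      }
      if (!more_data_expected()) {
        break;
      }
      // The writer may have appended into the same partial block the reader
      // stopped in; UnmarkEOF() reads the remainder of that block so the
      // next ReadRecord() stays block-aligned instead of failing checksums.
      reader->UnmarkEOF();
    }
  }

Before this fix, the UnmarkEOF() call above only cleared eof_, so the next
block-sized Read() could start in the middle of a block and surface length
mismatches and checksum failures to the log iterator.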