|
|
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
|
|
|
// This source code is licensed under the BSD-style license found in the
|
|
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
//
|
|
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
|
|
|
|
#include "db/log_reader.h"
|
|
|
|
|
|
|
|
#include <stdio.h>
|
|
|
|
#include "rocksdb/env.h"
|
|
|
|
#include "util/coding.h"
|
|
|
|
#include "util/crc32c.h"
|
Move rate_limiter, write buffering, most perf context instrumentation and most random kill out of Env
Summary: We want to keep Env a thin layer for better portability. Less platform-dependent code should be moved out of Env. In this patch, I create a wrapper of file readers and writers, and put rate limiting, write buffering, as well as most perf context instrumentation and random kill out of Env. It will make it easier to maintain multiple Envs in the future.
Test Plan: Run all existing unit tests.
Reviewers: anthony, kradhakrishnan, IslamAbdelRahman, yhchiang, igor
Reviewed By: igor
Subscribers: leveldb, dhruba
Differential Revision: https://reviews.facebook.net/D42321
9 years ago
|
|
|
#include "util/file_reader_writer.h"
|
|
|
|
|
|
|
|
namespace rocksdb {
|
|
|
|
namespace log {
|
|
|
|
|
|
|
|
// Out-of-line definition of the Reporter destructor; it has nothing to
// release.
Reader::Reporter::~Reporter() {}
|
|
|
|
|
|
|
|
// Constructs a log reader over "_file".
//
//   info_log       - logger handle stored in info_log_.
//   _file          - sequential file to read from; ownership is transferred
//                    into file_.
//   reporter       - may be nullptr; when non-null it is notified of dropped
//                    bytes through ReportDrop().
//   checksum       - when true, ReadPhysicalRecord() verifies record CRCs.
//   initial_offset - physical records starting before this offset are
//                    skipped.
//   log_num        - stored in log_number_.
//
// A kBlockSize-byte scratch block (backing_store_) is allocated here and
// released by the destructor.
Reader::Reader(std::shared_ptr<Logger> info_log,
               unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
               bool checksum, uint64_t initial_offset, uint64_t log_num)
    // info_log is received by value, so move it into the member instead of
    // paying for a second atomic reference-count increment/decrement.
    : info_log_(std::move(info_log)),
      file_(std::move(_file)),
      reporter_(reporter),
      checksum_(checksum),
      backing_store_(new char[kBlockSize]),
      buffer_(),
      eof_(false),
      read_error_(false),
      eof_offset_(0),
      last_record_offset_(0),
      end_of_buffer_offset_(0),
      initial_offset_(initial_offset),
      log_number_(log_num) {}
|
|
|
|
|
|
|
|
// Frees the block-sized scratch array allocated by the constructor.
Reader::~Reader() { delete[] backing_store_; }
|
|
|
|
|
|
|
|
bool Reader::SkipToInitialBlock() {
|
|
|
|
size_t initial_offset_in_block = initial_offset_ % kBlockSize;
|
|
|
|
uint64_t block_start_location = initial_offset_ - initial_offset_in_block;
|
|
|
|
|
|
|
|
// Don't search a block if we'd be in the trailer
|
|
|
|
if (initial_offset_in_block > kBlockSize - 6) {
|
|
|
|
block_start_location += kBlockSize;
|
|
|
|
}
|
|
|
|
|
|
|
|
end_of_buffer_offset_ = block_start_location;
|
|
|
|
|
|
|
|
// Skip to start of first block that can contain the initial record
|
|
|
|
if (block_start_location > 0) {
|
|
|
|
Status skip_status = file_->Skip(block_start_location);
|
|
|
|
if (!skip_status.ok()) {
|
|
|
|
ReportDrop(static_cast<size_t>(block_start_location), skip_status);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reads the next logical record, reassembling it from one or more physical
// records returned by ReadPhysicalRecord().
//
// On success returns true and points *record at the payload; the payload is
// backed either by the reader's internal buffer (unfragmented record) or by
// *scratch (fragmented record). Returns false when the end of the input is
// reached or when the initial skip fails. Malformed sequences of fragments
// are reported through ReportCorruption() and reading continues.
//
// When report_eof_inconsistency is true, a fragmented record that is cut off
// by EOF is reported as a corruption; otherwise it is silently discarded.
bool Reader::ReadRecord(Slice* record, std::string* scratch,
                        const bool report_eof_inconsistency) {
  // No record at or beyond initial_offset_ has been returned yet: position
  // the file at the first candidate block.
  if (last_record_offset_ < initial_offset_ && !SkipToInitialBlock()) {
    return false;
  }

  scratch->clear();
  record->clear();

  // True while fragments of a multi-part record are being collected.
  bool assembling = false;
  // Offset of the first fragment of the logical record being assembled.
  // Initialised only to keep compilers quiet.
  uint64_t logical_start = 0;

  Slice piece;
  for (;;) {
    // Offset at which the physical record about to be read begins.
    const uint64_t piece_offset = end_of_buffer_offset_ - buffer_.size();
    const unsigned int kind =
        ReadPhysicalRecord(&piece, report_eof_inconsistency);

    switch (kind) {
      case kFullType:
        if (assembling && !scratch->empty()) {
          // Earlier versions of log::Writer could emit an empty kFirstType
          // record at the tail end of a block, followed by a kFullType or
          // kFirstType record at the beginning of the next block. Only a
          // non-empty pending fragment is therefore a corruption.
          ReportCorruption(scratch->size(), "partial record without end(1)");
        }
        scratch->clear();
        *record = piece;
        last_record_offset_ = piece_offset;
        return true;

      case kFirstType:
        if (assembling && !scratch->empty()) {
          // Same writer quirk as for kFullType above: tolerate an empty
          // pending kFirstType fragment, report a non-empty one.
          ReportCorruption(scratch->size(), "partial record without end(2)");
        }
        logical_start = piece_offset;
        scratch->assign(piece.data(), piece.size());
        assembling = true;
        break;

      case kMiddleType:
        if (assembling) {
          scratch->append(piece.data(), piece.size());
        } else {
          ReportCorruption(piece.size(),
                           "missing start of fragmented record(1)");
        }
        break;

      case kLastType:
        if (assembling) {
          scratch->append(piece.data(), piece.size());
          *record = Slice(*scratch);
          last_record_offset_ = logical_start;
          return true;
        }
        ReportCorruption(piece.size(),
                         "missing start of fragmented record(2)");
        break;

      case kEof:
        if (assembling) {
          if (report_eof_inconsistency) {
            ReportCorruption(scratch->size(), "error reading trailing data");
          }
          // The writer may have died right after emitting one physical
          // record and before finishing the next. That is not treated as a
          // corruption; the incomplete logical record is simply dropped.
          scratch->clear();
        }
        return false;

      case kBadRecord:
        if (assembling) {
          ReportCorruption(scratch->size(), "error in middle of record");
          assembling = false;
          scratch->clear();
        }
        break;

      default: {
        char msg[40];
        snprintf(msg, sizeof(msg), "unknown record type %u", kind);
        // Count the unrecognised fragment plus anything already collected.
        const size_t dropped =
            piece.size() + (assembling ? scratch->size() : 0);
        ReportCorruption(dropped, msg);
        assembling = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}
|
|
|
|
|
|
|
|
uint64_t Reader::LastRecordOffset() {
|
|
|
|
return last_record_offset_;
|
|
|
|
}
|
|
|
|
|
Fix UnmarkEOF for partial blocks
Summary:
Blocks in the transaction log are a fixed size, but the last block in the transaction log file is usually a partial block. When a new record is added after the reader hit the end of the file, a new physical record will be appended to the last block. ReadPhysicalRecord can only read full blocks and assumes that the file position indicator is aligned to the start of a block. If the reader is forced to read further by simply clearing the EOF flag, ReadPhysicalRecord will read a full block starting from somewhere in the middle of a real block, causing it to lose alignment and to have a partial physical record at the end of the read buffer. This will result in length mismatches and checksum failures. When the log file is tailed for replication this will cause the log iterator to become invalid, necessitating the creation of a new iterator which will have to read the log file from scratch.
This diff fixes this issue by reading the remaining portion of the last block we read from. This is done when the reader is forced to read further (UnmarkEOF is called).
Test Plan:
- Added unit tests
- Stress test (with replication). Check dbdir/LOG file for corruptions.
- Test on test tier
Reviewers: emayanke, haobo, dhruba
Reviewed By: haobo
CC: vamsi, sheki, dhruba, kailiu, igor
Differential Revision: https://reviews.facebook.net/D15249
11 years ago
|
|
|
void Reader::UnmarkEOF() {
|
|
|
|
if (read_error_) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
eof_ = false;
|
|
|
|
|
|
|
|
if (eof_offset_ == 0) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
// If the EOF was in the middle of a block (a partial block was read) we have
|
|
|
|
// to read the rest of the block as ReadPhysicalRecord can only read full
|
|
|
|
// blocks and expects the file position indicator to be aligned to the start
|
|
|
|
// of a block.
|
|
|
|
//
|
|
|
|
// consumed_bytes + buffer_size() + remaining == kBlockSize
|
|
|
|
|
|
|
|
size_t consumed_bytes = eof_offset_ - buffer_.size();
|
|
|
|
size_t remaining = kBlockSize - eof_offset_;
|
|
|
|
|
|
|
|
// backing_store_ is used to concatenate what is left in buffer_ and
|
|
|
|
// the remainder of the block. If buffer_ already uses backing_store_,
|
|
|
|
// we just append the new data.
|
|
|
|
if (buffer_.data() != backing_store_ + consumed_bytes) {
|
|
|
|
// Buffer_ does not use backing_store_ for storage.
|
|
|
|
// Copy what is left in buffer_ to backing_store.
|
|
|
|
memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
|
|
|
|
}
|
|
|
|
|
|
|
|
Slice read_buffer;
|
|
|
|
Status status = file_->Read(remaining, &read_buffer,
|
|
|
|
backing_store_ + eof_offset_);
|
|
|
|
|
|
|
|
size_t added = read_buffer.size();
|
|
|
|
end_of_buffer_offset_ += added;
|
|
|
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
if (added > 0) {
|
|
|
|
ReportDrop(added, status);
|
|
|
|
}
|
|
|
|
|
|
|
|
read_error_ = true;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (read_buffer.data() != backing_store_ + eof_offset_) {
|
|
|
|
// Read did not write to backing_store_
|
|
|
|
memmove(backing_store_ + eof_offset_, read_buffer.data(),
|
|
|
|
read_buffer.size());
|
|
|
|
}
|
|
|
|
|
|
|
|
buffer_ = Slice(backing_store_ + consumed_bytes,
|
|
|
|
eof_offset_ + added - consumed_bytes);
|
|
|
|
|
|
|
|
if (added < remaining) {
|
|
|
|
eof_ = true;
|
|
|
|
eof_offset_ += added;
|
|
|
|
} else {
|
|
|
|
eof_offset_ = 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void Reader::ReportCorruption(size_t bytes, const char* reason) {
|
|
|
|
ReportDrop(bytes, Status::Corruption(reason));
|
|
|
|
}
|
|
|
|
|
|
|
|
void Reader::ReportDrop(size_t bytes, const Status& reason) {
|
|
|
|
if (reporter_ != nullptr &&
|
|
|
|
end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
|
|
|
|
reporter_->Corruption(bytes, reason);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reads the next physical record from buffer_, refilling buffer_ with the
// next block from file_ when fewer than kHeaderSize bytes remain.
//
// Returns one of:
//   * the record's type byte, with *result set to the record payload;
//   * kEof when the end of the input is reached or a read error occurs;
//   * kBadRecord when the record is dropped (bad length, checksum mismatch,
//     zero-filled record, or a record that started before initial_offset_).
//
// When report_eof_inconsistency is true, a truncated header or payload at the
// end of the file is additionally reported through ReportCorruption().
unsigned int Reader::ReadPhysicalRecord(Slice* result,
                                        const bool report_eof_inconsistency) {
  while (true) {
    if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
      if (!eof_ && !read_error_) {
        // Last read was a full read, so this is a trailer to skip
        buffer_.clear();
        Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
        end_of_buffer_offset_ += buffer_.size();
        if (!status.ok()) {
          buffer_.clear();
          ReportDrop(kBlockSize, status);
          read_error_ = true;
          return kEof;
        } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
          // Short read: remember where in the block the file ended so that
          // UnmarkEOF() can later complete the block.
          eof_ = true;
          eof_offset_ = buffer_.size();
        }
        continue;
      } else {
        // Note that if buffer_ is non-empty, we have a truncated header at the
        // end of the file, which can be caused by the writer crashing in the
        // middle of writing the header. Unless explicitly requested we don't
        // consider this an error, just report EOF.
        if (buffer_.size() && report_eof_inconsistency) {
          ReportCorruption(buffer_.size(), "truncated header");
        }
        buffer_.clear();
        return kEof;
      }
    }

    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    // Go through unsigned char so that a type byte >= 0x80 is not
    // sign-extended on platforms where plain char is signed; this mirrors the
    // 0xff masking applied to the length bytes above.
    const unsigned int type = static_cast<unsigned char>(header[6]);
    const uint32_t length = a | (b << 8);
    if (kHeaderSize + length > buffer_.size()) {
      size_t drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        ReportCorruption(drop_size, "bad record length");
        return kBadRecord;
      }
      // If the end of the file has been reached without reading |length| bytes
      // of payload, assume the writer died in the middle of writing the record.
      // Don't report a corruption unless requested.
      if (drop_size && report_eof_inconsistency) {
        ReportCorruption(drop_size, "truncated header");
      }
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      // NOTE: this should never happen in DB written by new RocksDB versions,
      // since we turn off mmap writes to manifest and log files
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      // The checksum covers the type byte plus the payload.
      uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, "checksum mismatch");
        return kBadRecord;
      }
    }

    buffer_.remove_prefix(kHeaderSize + length);

    // Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
        initial_offset_) {
      result->clear();
      return kBadRecord;
    }

    *result = Slice(header + kHeaderSize, length);
    return type;
  }
}
|
|
|
|
|
|
|
|
} // namespace log
|
|
|
|
} // namespace rocksdb
|