Delete code for WAL reader to start at nonzero offset (#4362)

Summary:
The code is dead in RocksDB as `log::Reader::initial_offset_` is always zero. We should delete it so we don't have to maintain it like in #4359.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4362

Differential Revision: D9817829

Pulled By: ajkr

fbshipit-source-id: 474a2c679e5bd273b40608f3a5332931d9eefe6d
main
Andrew Kryczka 6 years ago committed by Facebook Github Bot
parent 902261519e
commit c94523ee56
  1. 3
      db/db_impl_open.cc
  2. 42
      db/log_reader.cc
  3. 17
      db/log_reader.h
  4. 83
      db/log_test.cc
  5. 2
      db/repair.cc
  6. 6
      db/transaction_log_impl.cc
  7. 8
      db/version_set.cc
  8. 2
      db/wal_manager.cc
  9. 2
      tools/ldb_cmd.cc

@ -598,8 +598,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// to be skipped instead of propagating bad information (like overly // to be skipped instead of propagating bad information (like overly
// large sequence numbers). // large sequence numbers).
log::Reader reader(immutable_db_options_.info_log, std::move(file_reader), log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
&reporter, true /*checksum*/, 0 /*initial_offset*/, &reporter, true /*checksum*/, log_number);
log_number);
// Determine if we should tolerate incomplete records at the tail end of the // Determine if we should tolerate incomplete records at the tail end of the
// Read all the records and add to a memtable // Read all the records and add to a memtable

@ -24,7 +24,7 @@ Reader::Reporter::~Reporter() {
Reader::Reader(std::shared_ptr<Logger> info_log, Reader::Reader(std::shared_ptr<Logger> info_log,
unique_ptr<SequentialFileReader>&& _file, Reporter* reporter, unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
bool checksum, uint64_t initial_offset, uint64_t log_num) bool checksum, uint64_t log_num)
: info_log_(info_log), : info_log_(info_log),
file_(std::move(_file)), file_(std::move(_file)),
reporter_(reporter), reporter_(reporter),
@ -36,7 +36,6 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
eof_offset_(0), eof_offset_(0),
last_record_offset_(0), last_record_offset_(0),
end_of_buffer_offset_(0), end_of_buffer_offset_(0),
initial_offset_(initial_offset),
log_number_(log_num), log_number_(log_num),
recycled_(false) {} recycled_(false) {}
@ -44,29 +43,6 @@ Reader::~Reader() {
delete[] backing_store_; delete[] backing_store_;
} }
bool Reader::SkipToInitialBlock() {
size_t initial_offset_in_block = initial_offset_ % kBlockSize;
uint64_t block_start_location = initial_offset_ - initial_offset_in_block;
// Don't search a block if we'd be in the trailer
if (initial_offset_in_block > kBlockSize - 6) {
block_start_location += kBlockSize;
}
end_of_buffer_offset_ = block_start_location;
// Skip to start of first block that can contain the initial record
if (block_start_location > 0) {
Status skip_status = file_->Skip(block_start_location);
if (!skip_status.ok()) {
ReportDrop(static_cast<size_t>(block_start_location), skip_status);
return false;
}
}
return true;
}
// For kAbsoluteConsistency, on clean shutdown we don't expect any error // For kAbsoluteConsistency, on clean shutdown we don't expect any error
// in the log files. For other modes, we can ignore only incomplete records // in the log files. For other modes, we can ignore only incomplete records
// in the last log file, which are presumably due to a write in progress // in the last log file, which are presumably due to a write in progress
@ -76,12 +52,6 @@ bool Reader::SkipToInitialBlock() {
// restrict the inconsistency to only the last log // restrict the inconsistency to only the last log
bool Reader::ReadRecord(Slice* record, std::string* scratch, bool Reader::ReadRecord(Slice* record, std::string* scratch,
WALRecoveryMode wal_recovery_mode) { WALRecoveryMode wal_recovery_mode) {
if (last_record_offset_ < initial_offset_) {
if (!SkipToInitialBlock()) {
return false;
}
}
scratch->clear(); scratch->clear();
record->clear(); record->clear();
bool in_fragmented_record = false; bool in_fragmented_record = false;
@ -299,8 +269,7 @@ void Reader::ReportCorruption(size_t bytes, const char* reason) {
} }
void Reader::ReportDrop(size_t bytes, const Status& reason) { void Reader::ReportDrop(size_t bytes, const Status& reason) {
if (reporter_ != nullptr && if (reporter_ != nullptr) {
end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
reporter_->Corruption(bytes, reason); reporter_->Corruption(bytes, reason);
} }
} }
@ -417,13 +386,6 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
buffer_.remove_prefix(header_size + length); buffer_.remove_prefix(header_size + length);
// Skip physical record that started before initial_offset_
if (end_of_buffer_offset_ - buffer_.size() - header_size - length <
initial_offset_) {
result->clear();
return kBadRecord;
}
*result = Slice(header + header_size, length); *result = Slice(header + header_size, length);
return type; return type;
} }

@ -50,14 +50,10 @@ class Reader {
// live while this Reader is in use. // live while this Reader is in use.
// //
// If "checksum" is true, verify checksums if available. // If "checksum" is true, verify checksums if available.
//
// The Reader will start reading at the first record located at physical
// position >= initial_offset within the file.
Reader(std::shared_ptr<Logger> info_log, Reader(std::shared_ptr<Logger> info_log,
// @lint-ignore TXT2 T25377293 Grandfathered in // @lint-ignore TXT2 T25377293 Grandfathered in
unique_ptr<SequentialFileReader>&& file, unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
Reporter* reporter, bool checksum, uint64_t initial_offset, bool checksum, uint64_t log_num);
uint64_t log_num);
~Reader(); ~Reader();
@ -108,9 +104,6 @@ class Reader {
// Offset of the first location past the end of buffer_. // Offset of the first location past the end of buffer_.
uint64_t end_of_buffer_offset_; uint64_t end_of_buffer_offset_;
// Offset at which to start looking for the first record to return
uint64_t const initial_offset_;
// which log number this is // which log number this is
uint64_t const log_number_; uint64_t const log_number_;
@ -124,7 +117,6 @@ class Reader {
// Currently there are three situations in which this happens: // Currently there are three situations in which this happens:
// * The record has an invalid CRC (ReadPhysicalRecord reports a drop) // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
// * The record is a 0-length record (No drop is reported) // * The record is a 0-length record (No drop is reported)
// * The record is below constructor's initial_offset (No drop is reported)
kBadRecord = kMaxRecordType + 2, kBadRecord = kMaxRecordType + 2,
// Returned when we fail to read a valid header. // Returned when we fail to read a valid header.
kBadHeader = kMaxRecordType + 3, kBadHeader = kMaxRecordType + 3,
@ -136,11 +128,6 @@ class Reader {
kBadRecordChecksum = kMaxRecordType + 6, kBadRecordChecksum = kMaxRecordType + 6,
}; };
// Skips all blocks that are completely before "initial_offset_".
//
// Returns true on success. Handles reporting.
bool SkipToInitialBlock();
// Return type, or one of the preceding special values // Return type, or one of the preceding special values
unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size); unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size);

@ -163,8 +163,8 @@ class LogTest : public ::testing::TestWithParam<int> {
source_holder_(test::GetSequentialFileReader( source_holder_(test::GetSequentialFileReader(
new StringSource(reader_contents_), "" /* file name */)), new StringSource(reader_contents_), "" /* file name */)),
writer_(std::move(dest_holder_), 123, GetParam()), writer_(std::move(dest_holder_), 123, GetParam()),
reader_(nullptr, std::move(source_holder_), &report_, true /*checksum*/, reader_(nullptr, std::move(source_holder_), &report_,
0 /*initial_offset*/, 123) { true /* checksum */, 123 /* log_number */) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
initial_offset_last_record_offsets_[0] = 0; initial_offset_last_record_offsets_[0] = 0;
initial_offset_last_record_offsets_[1] = header_size + 10000; initial_offset_last_record_offsets_[1] = header_size + 10000;
@ -266,36 +266,6 @@ class LogTest : public ::testing::TestWithParam<int> {
} }
} }
void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
WriteInitialOffsetLog();
unique_ptr<SequentialFileReader> file_reader(test::GetSequentialFileReader(
new StringSource(reader_contents_), "" /* fname */));
unique_ptr<Reader> offset_reader(
new Reader(nullptr, std::move(file_reader), &report_,
true /*checksum*/, WrittenBytes() + offset_past_end, 123));
Slice record;
std::string scratch;
ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
}
void CheckInitialOffsetRecord(uint64_t initial_offset,
int expected_record_offset) {
WriteInitialOffsetLog();
unique_ptr<SequentialFileReader> file_reader(test::GetSequentialFileReader(
new StringSource(reader_contents_), "" /* fname */));
unique_ptr<Reader> offset_reader(
new Reader(nullptr, std::move(file_reader), &report_,
true /*checksum*/, initial_offset, 123));
Slice record;
std::string scratch;
ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
record.size());
ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
offset_reader->LastRecordOffset());
ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
}
}; };
size_t LogTest::initial_offset_record_sizes_[] = size_t LogTest::initial_offset_record_sizes_[] =
@ -590,55 +560,6 @@ TEST_P(LogTest, ErrorJoinsRecords) {
} }
} }
TEST_P(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }
TEST_P(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }
TEST_P(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }
TEST_P(LogTest, ReadSecondStart) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
CheckInitialOffsetRecord(10000 + header_size, 1);
}
TEST_P(LogTest, ReadThirdOneOff) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
CheckInitialOffsetRecord(10000 + header_size + 1, 2);
}
TEST_P(LogTest, ReadThirdStart) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
CheckInitialOffsetRecord(20000 + 2 * header_size, 2);
}
TEST_P(LogTest, ReadFourthOneOff) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
CheckInitialOffsetRecord(20000 + 2 * header_size + 1, 3);
}
TEST_P(LogTest, ReadFourthFirstBlockTrailer) {
CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
}
TEST_P(LogTest, ReadFourthMiddleBlock) {
CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
}
TEST_P(LogTest, ReadFourthLastBlock) {
CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
}
TEST_P(LogTest, ReadFourthStart) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
CheckInitialOffsetRecord(
2 * (header_size + 1000) + (2 * log::kBlockSize - 1000) + 3 * header_size,
3);
}
TEST_P(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }
TEST_P(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }
TEST_P(LogTest, ClearEofSingleBlock) { TEST_P(LogTest, ClearEofSingleBlock) {
Write("foo"); Write("foo");
Write("bar"); Write("bar");

@ -353,7 +353,7 @@ class Repairer {
// propagating bad information (like overly large sequence // propagating bad information (like overly large sequence
// numbers). // numbers).
log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter, log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
true /*enable checksum*/, 0 /*initial_offset*/, log); true /*enable checksum*/, log);
// Initialize per-column family memtables // Initialize per-column family memtables
for (auto* cfd : *vset_.GetColumnFamilySet()) { for (auto* cfd : *vset_.GetColumnFamilySet()) {

@ -312,9 +312,9 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
return s; return s;
} }
assert(file); assert(file);
currentLogReader_.reset(new log::Reader( currentLogReader_.reset(
options_->info_log, std::move(file), &reporter_, new log::Reader(options_->info_log, std::move(file), &reporter_,
read_options_.verify_checksums_, 0, logFile->LogNumber())); read_options_.verify_checksums_, logFile->LogNumber()));
return Status::OK(); return Status::OK();
} }
} // namespace rocksdb } // namespace rocksdb

@ -3424,7 +3424,7 @@ Status VersionSet::Recover(
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /*checksum*/, 0 /*initial_offset*/, 0); true /* checksum */, 0 /* log_number */);
Slice record; Slice record;
std::string scratch; std::string scratch;
std::vector<VersionEdit> replay_buffer; std::vector<VersionEdit> replay_buffer;
@ -3629,8 +3629,8 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
column_family_names.insert({0, kDefaultColumnFamilyName}); column_family_names.insert({0, kDefaultColumnFamilyName});
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter, true /*checksum*/, log::Reader reader(nullptr, std::move(file_reader), &reporter,
0 /*initial_offset*/, 0); true /* checksum */, 0 /* log_number */);
Slice record; Slice record;
std::string scratch; std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@ -3790,7 +3790,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter, log::Reader reader(nullptr, std::move(file_reader), &reporter,
true /*checksum*/, 0 /*initial_offset*/, 0); true /* checksum */, 0 /* log_number */);
Slice record; Slice record;
std::string scratch; std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {

@ -457,7 +457,7 @@ Status WalManager::ReadFirstLine(const std::string& fname,
reporter.status = &status; reporter.status = &status;
reporter.ignore_error = !db_options_.paranoid_checks; reporter.ignore_error = !db_options_.paranoid_checks;
log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter, log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
true /*checksum*/, 0 /*initial_offset*/, number); true /*checksum*/, number);
std::string scratch; std::string scratch;
Slice record; Slice record;

@ -1999,7 +1999,7 @@ void DumpWalFile(std::string wal_file, bool print_header, bool print_values,
} }
DBOptions db_options; DBOptions db_options;
log::Reader reader(db_options.info_log, std::move(wal_file_reader), log::Reader reader(db_options.info_log, std::move(wal_file_reader),
&reporter, true, 0, log_number); &reporter, true /* checksum */, log_number);
std::string scratch; std::string scratch;
WriteBatch batch; WriteBatch batch;
Slice record; Slice record;

Loading…
Cancel
Save