log_reader: pass log_number and optional info_log to ctor

We will need the log number to validate the recycle-style CRCs.  The log
is helpful for debugging, but optional, as not all callers have it.

Signed-off-by: Sage Weil <sage@redhat.com>
main
Sage Weil 9 years ago
parent 5830c699f2
commit 3ac13c99d1
  1. 4
      db/db_impl.cc
  2. 12
      db/log_reader.cc
  3. 11
      db/log_reader.h
  4. 13
      db/log_test.cc
  5. 4
      db/repair.cc
  6. 6
      db/transaction_log_impl.cc
  7. 15
      db/version_set.cc
  8. 4
      db/wal_manager.cc
  9. 16
      tools/ldb_cmd.cc

@ -1119,8 +1119,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// paranoid_checks==false so that corruptions cause entire commits // paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly // to be skipped instead of propagating bad information (like overly
// large sequence numbers). // large sequence numbers).
log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/, log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
0 /*initial_offset*/); true /*checksum*/, 0 /*initial_offset*/, log_number);
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number, "Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number,
db_options_.wal_recovery_mode, !continue_replay_log); db_options_.wal_recovery_mode, !continue_replay_log);

@ -21,9 +21,12 @@ namespace log {
Reader::Reporter::~Reporter() { Reader::Reporter::~Reporter() {
} }
Reader::Reader(unique_ptr<SequentialFileReader>&& _file, Reporter* reporter, Reader::Reader(std::shared_ptr<Logger> info_log,
bool checksum, uint64_t initial_offset) unique_ptr<SequentialFileReader>&& _file,
: file_(std::move(_file)), Reporter* reporter, bool checksum, uint64_t initial_offset,
uint64_t log_num)
: info_log_(info_log),
file_(std::move(_file)),
reporter_(reporter), reporter_(reporter),
checksum_(checksum), checksum_(checksum),
backing_store_(new char[kBlockSize]), backing_store_(new char[kBlockSize]),
@ -33,7 +36,8 @@ Reader::Reader(unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
eof_offset_(0), eof_offset_(0),
last_record_offset_(0), last_record_offset_(0),
end_of_buffer_offset_(0), end_of_buffer_offset_(0),
initial_offset_(initial_offset) {} initial_offset_(initial_offset),
log_number_(log_num) {}
Reader::~Reader() { Reader::~Reader() {
delete[] backing_store_; delete[] backing_store_;

@ -18,6 +18,7 @@
namespace rocksdb { namespace rocksdb {
class SequentialFileReader; class SequentialFileReader;
class Logger;
using std::unique_ptr; using std::unique_ptr;
namespace log { namespace log {
@ -51,8 +52,10 @@ class Reader {
// //
// The Reader will start reading at the first record located at physical // The Reader will start reading at the first record located at physical
// position >= initial_offset within the file. // position >= initial_offset within the file.
Reader(unique_ptr<SequentialFileReader>&& file, Reporter* reporter, Reader(std::shared_ptr<Logger> info_log,
bool checksum, uint64_t initial_offset); unique_ptr<SequentialFileReader>&& file,
Reporter* reporter, bool checksum, uint64_t initial_offset,
uint64_t log_num);
~Reader(); ~Reader();
@ -84,6 +87,7 @@ class Reader {
SequentialFileReader* file() { return file_.get(); } SequentialFileReader* file() { return file_.get(); }
private: private:
std::shared_ptr<Logger> info_log_;
const unique_ptr<SequentialFileReader> file_; const unique_ptr<SequentialFileReader> file_;
Reporter* const reporter_; Reporter* const reporter_;
bool const checksum_; bool const checksum_;
@ -104,6 +108,9 @@ class Reader {
// Offset at which to start looking for the first record to return // Offset at which to start looking for the first record to return
uint64_t const initial_offset_; uint64_t const initial_offset_;
// which log number this is
uint64_t const log_number_;
// Extend record types with the following special values // Extend record types with the following special values
enum { enum {
kEof = kMaxRecordType + 1, kEof = kMaxRecordType + 1,

@ -163,8 +163,8 @@ class LogTest : public ::testing::TestWithParam<int> {
source_holder_( source_holder_(
test::GetSequentialFileReader(new StringSource(reader_contents_))), test::GetSequentialFileReader(new StringSource(reader_contents_))),
writer_(std::move(dest_holder_), 123, GetParam()), writer_(std::move(dest_holder_), 123, GetParam()),
reader_(std::move(source_holder_), &report_, true /*checksum*/, reader_(NULL, std::move(source_holder_), &report_,
0 /*initial_offset*/) {} true /*checksum*/, 0 /*initial_offset*/, 123) {}
void Write(const std::string& msg) { void Write(const std::string& msg) {
writer_.AddRecord(Slice(msg)); writer_.AddRecord(Slice(msg));
@ -258,8 +258,8 @@ class LogTest : public ::testing::TestWithParam<int> {
unique_ptr<SequentialFileReader> file_reader( unique_ptr<SequentialFileReader> file_reader(
test::GetSequentialFileReader(new StringSource(reader_contents_))); test::GetSequentialFileReader(new StringSource(reader_contents_)));
unique_ptr<Reader> offset_reader( unique_ptr<Reader> offset_reader(
new Reader(std::move(file_reader), &report_, true /*checksum*/, new Reader(NULL, std::move(file_reader), &report_,
WrittenBytes() + offset_past_end)); true /*checksum*/, WrittenBytes() + offset_past_end, 123));
Slice record; Slice record;
std::string scratch; std::string scratch;
ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch)); ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
@ -270,8 +270,9 @@ class LogTest : public ::testing::TestWithParam<int> {
WriteInitialOffsetLog(); WriteInitialOffsetLog();
unique_ptr<SequentialFileReader> file_reader( unique_ptr<SequentialFileReader> file_reader(
test::GetSequentialFileReader(new StringSource(reader_contents_))); test::GetSequentialFileReader(new StringSource(reader_contents_)));
unique_ptr<Reader> offset_reader(new Reader( unique_ptr<Reader> offset_reader(
std::move(file_reader), &report_, true /*checksum*/, initial_offset)); new Reader(NULL, std::move(file_reader), &report_,
true /*checksum*/, initial_offset, 123));
Slice record; Slice record;
std::string scratch; std::string scratch;
ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch)); ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));

@ -249,8 +249,8 @@ class Repairer {
// corruptions cause entire commits to be skipped instead of // corruptions cause entire commits to be skipped instead of
// propagating bad information (like overly large sequence // propagating bad information (like overly large sequence
// numbers). // numbers).
log::Reader reader(std::move(lfile_reader), &reporter, log::Reader reader(options_.info_log, std::move(lfile_reader), &reporter,
true /*enable checksum*/, 0 /*initial_offset*/); true /*enable checksum*/, 0 /*initial_offset*/, log);
// Read all the records and add to a memtable // Read all the records and add to a memtable
std::string scratch; std::string scratch;

@ -262,8 +262,10 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
return s; return s;
} }
assert(file); assert(file);
currentLogReader_.reset(new log::Reader(std::move(file), &reporter_, currentLogReader_.reset(new log::Reader(options_->info_log,
read_options_.verify_checksums_, 0)); std::move(file), &reporter_,
read_options_.verify_checksums_, 0,
logFile->LogNumber()));
return Status::OK(); return Status::OK();
} }
} // namespace rocksdb } // namespace rocksdb

@ -2376,8 +2376,8 @@ Status VersionSet::Recover(
{ {
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(std::move(manifest_file_reader), &reporter, log::Reader reader(NULL, std::move(manifest_file_reader), &reporter,
true /*checksum*/, 0 /*initial_offset*/); true /*checksum*/, 0 /*initial_offset*/, 0);
Slice record; Slice record;
std::string scratch; std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@ -2629,8 +2629,8 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
column_family_names.insert({0, kDefaultColumnFamilyName}); column_family_names.insert({0, kDefaultColumnFamilyName});
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/, log::Reader reader(NULL, std::move(file_reader), &reporter, true /*checksum*/,
0 /*initial_offset*/); 0 /*initial_offset*/, 0);
Slice record; Slice record;
std::string scratch; std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@ -2787,8 +2787,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
{ {
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/, log::Reader reader(NULL, std::move(file_reader), &reporter,
0 /*initial_offset*/); true /*checksum*/, 0 /*initial_offset*/, 0);
Slice record; Slice record;
std::string scratch; std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@ -3040,7 +3040,8 @@ bool VersionSet::ManifestContains(uint64_t manifest_file_num,
} }
file_reader.reset(new SequentialFileReader(std::move(file))); file_reader.reset(new SequentialFileReader(std::move(file)));
} }
log::Reader reader(std::move(file_reader), nullptr, true /*checksum*/, 0); log::Reader reader(NULL, std::move(file_reader), nullptr,
true /*checksum*/, 0, 0);
Slice r; Slice r;
std::string scratch; std::string scratch;
bool result = false; bool result = false;

@ -448,8 +448,8 @@ Status WalManager::ReadFirstLine(const std::string& fname,
reporter.fname = fname.c_str(); reporter.fname = fname.c_str();
reporter.status = &status; reporter.status = &status;
reporter.ignore_error = !db_options_.paranoid_checks; reporter.ignore_error = !db_options_.paranoid_checks;
log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/, log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
0 /*initial_offset*/); true /*checksum*/, 0 /*initial_offset*/, *sequence);
std::string scratch; std::string scratch;
Slice record; Slice record;

@ -1438,7 +1438,21 @@ void DumpWalFile(std::string wal_file, bool print_header, bool print_values,
} }
} else { } else {
StdErrReporter reporter; StdErrReporter reporter;
log::Reader reader(move(wal_file_reader), &reporter, true, 0); uint64_t log_number;
FileType type;
// we need the log number, but ParseFilename expects dbname/NNN.log.
string sanitized = wal_file;
size_t lastslash = sanitized.rfind('/');
if (lastslash != std::string::npos)
sanitized = sanitized.substr(lastslash + 1);
if (!ParseFileName(sanitized, &log_number, &type)) {
// bogus input, carry on as best we can
log_number = 0;
}
DBOptions db_options;
log::Reader reader(db_options.info_log, move(wal_file_reader), &reporter,
true, 0, log_number);
string scratch; string scratch;
WriteBatch batch; WriteBatch batch;
Slice record; Slice record;

Loading…
Cancel
Save