Add read retry support to log reader (#4394)

Summary:
Current `log::Reader` does not perform retry after encountering `EOF`. In the future, we need the log reader to be able to retry tailing the log even after `EOF`.

Current implementation is simple. It does not provide more advanced retry policies. Will address this in the future.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4394

Differential Revision: D9926508

Pulled By: riversand963

fbshipit-source-id: d86d145792a41bd64a72f642a2a08c7b7b5201e1
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent 35cd754a6d
commit da4aa59b4c
  1. 3
      db/db_impl_open.cc
  2. 26
      db/log_reader.cc
  3. 7
      db/log_reader.h
  4. 174
      db/log_test.cc
  5. 3
      db/repair.cc
  6. 3
      db/transaction_log_impl.cc
  7. 9
      db/version_set.cc
  8. 2
      db/wal_manager.cc
  9. 3
      tools/ldb_cmd.cc
  10. 4
      util/sync_point.cc

@ -608,7 +608,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// to be skipped instead of propagating bad information (like overly // to be skipped instead of propagating bad information (like overly
// large sequence numbers). // large sequence numbers).
log::Reader reader(immutable_db_options_.info_log, std::move(file_reader), log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
&reporter, true /*checksum*/, log_number); &reporter, true /*checksum*/, log_number,
false /* retry_after_eof */);
// Determine if we should tolerate incomplete records at the tail end of the // Determine if we should tolerate incomplete records at the tail end of the
// Read all the records and add to a memtable // Read all the records and add to a memtable

@ -24,7 +24,7 @@ Reader::Reporter::~Reporter() {
Reader::Reader(std::shared_ptr<Logger> info_log, Reader::Reader(std::shared_ptr<Logger> info_log,
unique_ptr<SequentialFileReader>&& _file, Reporter* reporter, unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
bool checksum, uint64_t log_num) bool checksum, uint64_t log_num, bool retry_after_eof)
: info_log_(info_log), : info_log_(info_log),
file_(std::move(_file)), file_(std::move(_file)),
reporter_(reporter), reporter_(reporter),
@ -37,7 +37,8 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
last_record_offset_(0), last_record_offset_(0),
end_of_buffer_offset_(0), end_of_buffer_offset_(0),
log_number_(log_num), log_number_(log_num),
recycled_(false) {} recycled_(false),
retry_after_eof_(retry_after_eof) {}
Reader::~Reader() { Reader::~Reader() {
delete[] backing_store_; delete[] backing_store_;
@ -208,7 +209,8 @@ void Reader::UnmarkEOF() {
eof_ = false; eof_ = false;
if (eof_offset_ == 0) { // If retry_after_eof_ is true, we have to proceed to read anyway.
if (!retry_after_eof_ && eof_offset_ == 0) {
return; return;
} }
@ -289,8 +291,12 @@ bool Reader::ReadMore(size_t* drop_size, int *error) {
} else if (buffer_.size() < static_cast<size_t>(kBlockSize)) { } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
eof_ = true; eof_ = true;
eof_offset_ = buffer_.size(); eof_offset_ = buffer_.size();
TEST_SYNC_POINT("LogReader::ReadMore:FirstEOF");
} }
return true; return true;
} else if (retry_after_eof_ && !read_error_) {
UnmarkEOF();
return !read_error_;
} else { } else {
// Note that if buffer_ is non-empty, we have a truncated header at the // Note that if buffer_ is non-empty, we have a truncated header at the
// end of the file, which can be caused by the writer crashing in the // end of the file, which can be caused by the writer crashing in the
@ -345,17 +351,25 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
} }
} }
if (header_size + length > buffer_.size()) { if (header_size + length > buffer_.size()) {
if (!retry_after_eof_) {
*drop_size = buffer_.size(); *drop_size = buffer_.size();
buffer_.clear(); buffer_.clear();
if (!eof_) { if (!eof_) {
return kBadRecordLen; return kBadRecordLen;
} }
// If the end of the file has been reached without reading |length| bytes // If the end of the file has been reached without reading |length|
// of payload, assume the writer died in the middle of writing the record. // bytes of payload, assume the writer died in the middle of writing the
// Don't report a corruption unless requested. // record. Don't report a corruption unless requested.
if (*drop_size) { if (*drop_size) {
return kBadHeader; return kBadHeader;
} }
} else {
int r;
if (!ReadMore(drop_size, &r)) {
return r;
}
continue;
}
return kEof; return kEof;
} }

@ -53,7 +53,7 @@ class Reader {
Reader(std::shared_ptr<Logger> info_log, Reader(std::shared_ptr<Logger> info_log,
// @lint-ignore TXT2 T25377293 Grandfathered in // @lint-ignore TXT2 T25377293 Grandfathered in
unique_ptr<SequentialFileReader>&& file, Reporter* reporter, unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
bool checksum, uint64_t log_num); bool checksum, uint64_t log_num, bool retry_after_eof);
~Reader(); ~Reader();
@ -110,6 +110,11 @@ class Reader {
// Whether this is a recycled log file // Whether this is a recycled log file
bool recycled_; bool recycled_;
// Whether retry after encountering EOF
// TODO (yanqin) add support for retry policy, e.g. sleep, max retry limit,
// etc.
const bool retry_after_eof_;
// Extend record types with the following special values // Extend record types with the following special values
enum { enum {
kEof = kMaxRecordType + 1, kEof = kMaxRecordType + 1,

@ -164,7 +164,8 @@ class LogTest : public ::testing::TestWithParam<int> {
new StringSource(reader_contents_), "" /* file name */)), new StringSource(reader_contents_), "" /* file name */)),
writer_(std::move(dest_holder_), 123, GetParam()), writer_(std::move(dest_holder_), 123, GetParam()),
reader_(nullptr, std::move(source_holder_), &report_, reader_(nullptr, std::move(source_holder_), &report_,
true /* checksum */, 123 /* log_number */) { true /* checksum */, 123 /* log_number */,
false /* retry_after_eof */) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
initial_offset_last_record_offsets_[0] = 0; initial_offset_last_record_offsets_[0] = 0;
initial_offset_last_record_offsets_[1] = header_size + 10000; initial_offset_last_record_offsets_[1] = header_size + 10000;
@ -652,6 +653,177 @@ TEST_P(LogTest, Recycle) {
INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2)); INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2));
class RetriableLogTest : public ::testing::TestWithParam<int> {
private:
class ReportCollector : public Reader::Reporter {
public:
size_t dropped_bytes_;
std::string message_;
ReportCollector() : dropped_bytes_(0) {}
virtual void Corruption(size_t bytes, const Status& status) override {
dropped_bytes_ += bytes;
message_.append(status.ToString());
}
};
Slice contents_;
unique_ptr<WritableFileWriter> dest_holder_;
unique_ptr<Writer> log_writer_;
Env* env_;
EnvOptions env_options_;
const std::string test_dir_;
const std::string log_file_;
unique_ptr<WritableFileWriter> writer_;
unique_ptr<SequentialFileReader> reader_;
ReportCollector report_;
unique_ptr<Reader> log_reader_;
public:
RetriableLogTest()
: contents_(),
dest_holder_(nullptr),
log_writer_(nullptr),
env_(Env::Default()),
test_dir_(test::PerThreadDBPath("retriable_log_test")),
log_file_(test_dir_ + "/log"),
writer_(nullptr),
reader_(nullptr),
log_reader_(nullptr) {}
Status SetupTestEnv() {
dest_holder_.reset(test::GetWritableFileWriter(
new test::StringSink(&contents_), "" /* file name */));
assert(dest_holder_ != nullptr);
log_writer_.reset(new Writer(std::move(dest_holder_), 123, GetParam()));
assert(log_writer_ != nullptr);
Status s;
s = env_->CreateDirIfMissing(test_dir_);
unique_ptr<WritableFile> writable_file;
if (s.ok()) {
s = env_->NewWritableFile(log_file_, &writable_file, env_options_);
}
if (s.ok()) {
writer_.reset(new WritableFileWriter(std::move(writable_file), log_file_,
env_options_));
assert(writer_ != nullptr);
}
unique_ptr<SequentialFile> seq_file;
if (s.ok()) {
s = env_->NewSequentialFile(log_file_, &seq_file, env_options_);
}
if (s.ok()) {
reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_));
assert(reader_ != nullptr);
log_reader_.reset(new Reader(nullptr, std::move(reader_), &report_,
true /* checksum */, 123 /* log_number */,
true /* retry_after_eof */));
assert(log_reader_ != nullptr);
}
return s;
}
std::string contents() {
auto file =
dynamic_cast<test::StringSink*>(log_writer_->file()->writable_file());
assert(file != nullptr);
return file->contents_;
}
void Encode(const std::string& msg) { log_writer_->AddRecord(Slice(msg)); }
void Write(const Slice& data) {
writer_->Append(data);
writer_->Sync(true);
}
std::string Read() {
auto wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
std::string scratch;
Slice record;
if (log_reader_->ReadRecord(&record, &scratch, wal_recovery_mode)) {
return record.ToString();
} else {
return "Read error";
}
}
};
TEST_P(RetriableLogTest, TailLog_PartialHeader) {
ASSERT_OK(SetupTestEnv());
std::vector<int> remaining_bytes_in_last_record;
size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"RetriableLogTest::TailLog:AfterPart1",
"RetriableLogTest::TailLog:BeforeReadRecord"},
{"LogReader::ReadMore:FirstEOF",
"RetriableLogTest::TailLog:BeforePart2"}});
SyncPoint::GetInstance()->EnableProcessing();
size_t delta = header_size - 1;
port::Thread log_writer_thread([&]() {
size_t old_sz = contents().size();
Encode("foo");
size_t new_sz = contents().size();
std::string part1 = contents().substr(old_sz, delta);
std::string part2 =
contents().substr(old_sz + delta, new_sz - old_sz - delta);
Write(Slice(part1));
TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
Write(Slice(part2));
});
std::string record;
port::Thread log_reader_thread([&]() {
TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
record = Read();
});
log_reader_thread.join();
log_writer_thread.join();
ASSERT_EQ("foo", record);
}
TEST_P(RetriableLogTest, TailLog_FullHeader) {
ASSERT_OK(SetupTestEnv());
std::vector<int> remaining_bytes_in_last_record;
size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"RetriableLogTest::TailLog:AfterPart1",
"RetriableLogTest::TailLog:BeforeReadRecord"},
{"LogReader::ReadMore:FirstEOF",
"RetriableLogTest::TailLog:BeforePart2"}});
SyncPoint::GetInstance()->EnableProcessing();
size_t delta = header_size + 1;
port::Thread log_writer_thread([&]() {
size_t old_sz = contents().size();
Encode("foo");
size_t new_sz = contents().size();
std::string part1 = contents().substr(old_sz, delta);
std::string part2 =
contents().substr(old_sz + delta, new_sz - old_sz - delta);
Write(Slice(part1));
TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
Write(Slice(part2));
});
std::string record;
port::Thread log_reader_thread([&]() {
TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
record = Read();
});
log_reader_thread.join();
log_writer_thread.join();
ASSERT_EQ("foo", record);
}
INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
} // namespace log } // namespace log
} // namespace rocksdb } // namespace rocksdb

@ -363,7 +363,8 @@ class Repairer {
// propagating bad information (like overly large sequence // propagating bad information (like overly large sequence
// numbers). // numbers).
log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter, log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
true /*enable checksum*/, log); true /*enable checksum*/, log,
false /* retry_after_eof */);
// Initialize per-column family memtables // Initialize per-column family memtables
for (auto* cfd : *vset_.GetColumnFamilySet()) { for (auto* cfd : *vset_.GetColumnFamilySet()) {

@ -314,7 +314,8 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
assert(file); assert(file);
currentLogReader_.reset( currentLogReader_.reset(
new log::Reader(options_->info_log, std::move(file), &reporter_, new log::Reader(options_->info_log, std::move(file), &reporter_,
read_options_.verify_checksums_, logFile->LogNumber())); read_options_.verify_checksums_, logFile->LogNumber(),
false /* retry_after_eof */));
return Status::OK(); return Status::OK();
} }
} // namespace rocksdb } // namespace rocksdb

@ -3419,7 +3419,8 @@ Status VersionSet::Recover(
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /* checksum */, 0 /* log_number */); true /* checksum */, 0 /* log_number */,
false /* retry_after_eof */);
Slice record; Slice record;
std::string scratch; std::string scratch;
std::vector<VersionEdit> replay_buffer; std::vector<VersionEdit> replay_buffer;
@ -3625,7 +3626,8 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter, log::Reader reader(nullptr, std::move(file_reader), &reporter,
true /* checksum */, 0 /* log_number */); true /* checksum */, 0 /* log_number */,
false /* retry_after_eof */);
Slice record; Slice record;
std::string scratch; std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@ -3785,7 +3787,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter, log::Reader reader(nullptr, std::move(file_reader), &reporter,
true /* checksum */, 0 /* log_number */); true /* checksum */, 0 /* log_number */,
false /* retry_after_eof */);
Slice record; Slice record;
std::string scratch; std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {

@ -457,7 +457,7 @@ Status WalManager::ReadFirstLine(const std::string& fname,
reporter.status = &status; reporter.status = &status;
reporter.ignore_error = !db_options_.paranoid_checks; reporter.ignore_error = !db_options_.paranoid_checks;
log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter, log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
true /*checksum*/, number); true /*checksum*/, number, false /* retry_after_eof */);
std::string scratch; std::string scratch;
Slice record; Slice record;

@ -1999,7 +1999,8 @@ void DumpWalFile(std::string wal_file, bool print_header, bool print_values,
} }
DBOptions db_options; DBOptions db_options;
log::Reader reader(db_options.info_log, std::move(wal_file_reader), log::Reader reader(db_options.info_log, std::move(wal_file_reader),
&reporter, true /* checksum */, log_number); &reporter, true /* checksum */, log_number,
false /* retry_after_eof */);
std::string scratch; std::string scratch;
WriteBatch batch; WriteBatch batch;
Slice record; Slice record;

@ -17,9 +17,7 @@ SyncPoint* SyncPoint::GetInstance() {
return &sync_point; return &sync_point;
} }
SyncPoint::SyncPoint() : SyncPoint::SyncPoint() : impl_(new Data) {}
impl_(new Data) {
}
SyncPoint:: ~SyncPoint() { SyncPoint:: ~SyncPoint() {
delete impl_; delete impl_;

Loading…
Cancel
Save