diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc
index b2cdb2c2c..c595d395c 100644
--- a/db/db_impl_open.cc
+++ b/db/db_impl_open.cc
@@ -608,7 +608,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     // to be skipped instead of propagating bad information (like overly
     // large sequence numbers).
     log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
-                       &reporter, true /*checksum*/, log_number);
+                       &reporter, true /*checksum*/, log_number,
+                       false /* retry_after_eof */);
 
     // Determine if we should tolerate incomplete records at the tail end of the
     // Read all the records and add to a memtable
diff --git a/db/log_reader.cc b/db/log_reader.cc
index bd9d300bf..d58540be3 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -24,7 +24,7 @@ Reader::Reporter::~Reporter() {
 
 Reader::Reader(std::shared_ptr<Logger> info_log,
                unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
-               bool checksum, uint64_t log_num)
+               bool checksum, uint64_t log_num, bool retry_after_eof)
     : info_log_(info_log),
       file_(std::move(_file)),
       reporter_(reporter),
@@ -37,7 +37,8 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
       last_record_offset_(0),
       end_of_buffer_offset_(0),
       log_number_(log_num),
-      recycled_(false) {}
+      recycled_(false),
+      retry_after_eof_(retry_after_eof) {}
 
 Reader::~Reader() {
   delete[] backing_store_;
@@ -122,7 +123,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
           // in clean shutdown we don't expect any error in the log files
           ReportCorruption(drop_size, "truncated header");
         }
-      FALLTHROUGH_INTENDED;
+        FALLTHROUGH_INTENDED;
 
       case kEof:
         if (in_fragmented_record) {
@@ -152,7 +153,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
           }
           return false;
         }
-      FALLTHROUGH_INTENDED;
+        FALLTHROUGH_INTENDED;
 
       case kBadRecord:
         if (in_fragmented_record) {
@@ -208,7 +209,8 @@ void Reader::UnmarkEOF() {
 
   eof_ = false;
 
-  if (eof_offset_ == 0) {
+  // If retry_after_eof_ is true, we have to proceed to read anyway.
+  if (!retry_after_eof_ && eof_offset_ == 0) {
     return;
   }
 
@@ -289,8 +291,12 @@ bool Reader::ReadMore(size_t* drop_size, int *error) {
     } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
       eof_ = true;
       eof_offset_ = buffer_.size();
+      TEST_SYNC_POINT("LogReader::ReadMore:FirstEOF");
     }
     return true;
+  } else if (retry_after_eof_ && !read_error_) {
+    UnmarkEOF();
+    return !read_error_;
   } else {
     // Note that if buffer_ is non-empty, we have a truncated header at the
     // end of the file, which can be caused by the writer crashing in the
@@ -345,16 +351,24 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
       }
     }
     if (header_size + length > buffer_.size()) {
-      *drop_size = buffer_.size();
-      buffer_.clear();
-      if (!eof_) {
-        return kBadRecordLen;
-      }
-      // If the end of the file has been reached without reading |length| bytes
-      // of payload, assume the writer died in the middle of writing the record.
-      // Don't report a corruption unless requested.
-      if (*drop_size) {
-        return kBadHeader;
+      if (!retry_after_eof_) {
+        *drop_size = buffer_.size();
+        buffer_.clear();
+        if (!eof_) {
+          return kBadRecordLen;
+        }
+        // If the end of the file has been reached without reading |length|
+        // bytes of payload, assume the writer died in the middle of writing the
+        // record. Don't report a corruption unless requested.
+        if (*drop_size) {
+          return kBadHeader;
+        }
+      } else {
+        int r;
+        if (!ReadMore(drop_size, &r)) {
+          return r;
+        }
+        continue;
       }
       return kEof;
     }
diff --git a/db/log_reader.h b/db/log_reader.h
index 4727be24c..9a08c62f2 100644
--- a/db/log_reader.h
+++ b/db/log_reader.h
@@ -53,7 +53,7 @@ class Reader {
   Reader(std::shared_ptr<Logger> info_log,
          // @lint-ignore TXT2 T25377293 Grandfathered in
          unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
-         bool checksum, uint64_t log_num);
+         bool checksum, uint64_t log_num, bool retry_after_eof);
 
   ~Reader();
 
@@ -110,6 +110,11 @@ class Reader {
   // Whether this is a recycled log file
   bool recycled_;
 
+  // Whether retry after encountering EOF
+  // TODO (yanqin) add support for retry policy, e.g. sleep, max retry limit,
+  // etc.
+  const bool retry_after_eof_;
+
   // Extend record types with the following special values
   enum {
     kEof = kMaxRecordType + 1,
diff --git a/db/log_test.cc b/db/log_test.cc
index e14aa202b..a6f2f4f4a 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -164,7 +164,8 @@ class LogTest : public ::testing::TestWithParam<int> {
             new StringSource(reader_contents_), "" /* file name */)),
         writer_(std::move(dest_holder_), 123, GetParam()),
         reader_(nullptr, std::move(source_holder_), &report_,
-                true /* checksum */, 123 /* log_number */) {
+                true /* checksum */, 123 /* log_number */,
+                false /* retry_after_eof */) {
     int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
     initial_offset_last_record_offsets_[0] = 0;
     initial_offset_last_record_offsets_[1] = header_size + 10000;
@@ -652,6 +653,177 @@ TEST_P(LogTest, Recycle) {
 
 INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2));
 
+class RetriableLogTest : public ::testing::TestWithParam<int> {
+ private:
+  class ReportCollector : public Reader::Reporter {
+   public:
+    size_t dropped_bytes_;
+    std::string message_;
+
+    ReportCollector() : dropped_bytes_(0) {}
+    virtual void Corruption(size_t bytes, const Status& status) override {
+      dropped_bytes_ += bytes;
+      message_.append(status.ToString());
+    }
+  };
+
+  Slice contents_;
+  unique_ptr<WritableFileWriter> dest_holder_;
+  unique_ptr<Writer> log_writer_;
+  Env* env_;
+  EnvOptions env_options_;
+  const std::string test_dir_;
+  const std::string log_file_;
+  unique_ptr<WritableFileWriter> writer_;
+  unique_ptr<SequentialFileReader> reader_;
+  ReportCollector report_;
+  unique_ptr<Reader> log_reader_;
+
+ public:
+  RetriableLogTest()
+      : contents_(),
+        dest_holder_(nullptr),
+        log_writer_(nullptr),
+        env_(Env::Default()),
+        test_dir_(test::PerThreadDBPath("retriable_log_test")),
+        log_file_(test_dir_ + "/log"),
+        writer_(nullptr),
+        reader_(nullptr),
+        log_reader_(nullptr) {}
+
+  Status SetupTestEnv() {
+    dest_holder_.reset(test::GetWritableFileWriter(
+        new test::StringSink(&contents_), "" /* file name */));
+    assert(dest_holder_ != nullptr);
+    log_writer_.reset(new Writer(std::move(dest_holder_), 123, GetParam()));
+    assert(log_writer_ != nullptr);
+
+    Status s;
+    s = env_->CreateDirIfMissing(test_dir_);
+    unique_ptr<WritableFile> writable_file;
+    if (s.ok()) {
+      s = env_->NewWritableFile(log_file_, &writable_file, env_options_);
+    }
+    if (s.ok()) {
+      writer_.reset(new WritableFileWriter(std::move(writable_file), log_file_,
+                                           env_options_));
+      assert(writer_ != nullptr);
+    }
+    unique_ptr<SequentialFile> seq_file;
+    if (s.ok()) {
+      s = env_->NewSequentialFile(log_file_, &seq_file, env_options_);
+    }
+    if (s.ok()) {
+      reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_));
+      assert(reader_ != nullptr);
+      log_reader_.reset(new Reader(nullptr, std::move(reader_), &report_,
+                                   true /* checksum */, 123 /* log_number */,
+                                   true /* retry_after_eof */));
+      assert(log_reader_ != nullptr);
+    }
+    return s;
+  }
+
+  std::string contents() {
+    auto file =
+        dynamic_cast<test::StringSink*>(log_writer_->file()->writable_file());
+    assert(file != nullptr);
+    return file->contents_;
+  }
+
+  void Encode(const std::string& msg) { log_writer_->AddRecord(Slice(msg)); }
+
+  void Write(const Slice& data) {
+    writer_->Append(data);
+    writer_->Sync(true);
+  }
+
+  std::string Read() {
+    auto wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
+    std::string scratch;
+    Slice record;
+    if (log_reader_->ReadRecord(&record, &scratch, wal_recovery_mode)) {
+      return record.ToString();
+    } else {
+      return "Read error";
+    }
+  }
+};
+
+TEST_P(RetriableLogTest, TailLog_PartialHeader) {
+  ASSERT_OK(SetupTestEnv());
+  std::vector<int> remaining_bytes_in_last_record;
+  size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"RetriableLogTest::TailLog:AfterPart1",
+        "RetriableLogTest::TailLog:BeforeReadRecord"},
+       {"LogReader::ReadMore:FirstEOF",
+        "RetriableLogTest::TailLog:BeforePart2"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  size_t delta = header_size - 1;
+  port::Thread log_writer_thread([&]() {
+    size_t old_sz = contents().size();
+    Encode("foo");
+    size_t new_sz = contents().size();
+    std::string part1 = contents().substr(old_sz, delta);
+    std::string part2 =
+        contents().substr(old_sz + delta, new_sz - old_sz - delta);
+    Write(Slice(part1));
+    TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
+    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
+    Write(Slice(part2));
+  });
+
+  std::string record;
+  port::Thread log_reader_thread([&]() {
+    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
+    record = Read();
+  });
+  log_reader_thread.join();
+  log_writer_thread.join();
+  ASSERT_EQ("foo", record);
+}
+
+TEST_P(RetriableLogTest, TailLog_FullHeader) {
+  ASSERT_OK(SetupTestEnv());
+  std::vector<int> remaining_bytes_in_last_record;
+  size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"RetriableLogTest::TailLog:AfterPart1",
+        "RetriableLogTest::TailLog:BeforeReadRecord"},
+       {"LogReader::ReadMore:FirstEOF",
+        "RetriableLogTest::TailLog:BeforePart2"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  size_t delta = header_size + 1;
+  port::Thread log_writer_thread([&]() {
+    size_t old_sz = contents().size();
+    Encode("foo");
+    size_t new_sz = contents().size();
+    std::string part1 = contents().substr(old_sz, delta);
+    std::string part2 =
+        contents().substr(old_sz + delta, new_sz - old_sz - delta);
+    Write(Slice(part1));
+    TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
+    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
+    Write(Slice(part2));
+  });
+
+  std::string record;
+  port::Thread log_reader_thread([&]() {
+    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
+    record = Read();
+  });
+  log_reader_thread.join();
+  log_writer_thread.join();
+  ASSERT_EQ("foo", record);
+}
+
+INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
+
 }  // namespace log
 }  // namespace rocksdb
diff --git a/db/repair.cc b/db/repair.cc
index a856de0eb..a4029bd51 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -363,7 +363,8 @@ class Repairer {
       // propagating bad information (like overly large sequence
       // numbers).
       log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
-                         true /*enable checksum*/, log);
+                         true /*enable checksum*/, log,
+                         false /* retry_after_eof */);
 
       // Initialize per-column family memtables
       for (auto* cfd : *vset_.GetColumnFamilySet()) {
diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc
index 9bdae05f8..fba91a979 100644
--- a/db/transaction_log_impl.cc
+++ b/db/transaction_log_impl.cc
@@ -314,7 +314,8 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
   assert(file);
   currentLogReader_.reset(
      new log::Reader(options_->info_log, std::move(file), &reporter_,
-                     read_options_.verify_checksums_, logFile->LogNumber()));
+                     read_options_.verify_checksums_, logFile->LogNumber(),
+                     false /* retry_after_eof */));
   return Status::OK();
 }
 }  // namespace rocksdb
diff --git a/db/version_set.cc b/db/version_set.cc
index 4c36a075b..25e551f48 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -3419,7 +3419,8 @@ Status VersionSet::Recover(
     VersionSet::LogReporter reporter;
     reporter.status = &s;
     log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
-                       true /* checksum */, 0 /* log_number */);
+                       true /* checksum */, 0 /* log_number */,
+                       false /* retry_after_eof */);
     Slice record;
     std::string scratch;
     std::vector<VersionEdit> replay_buffer;
@@ -3625,7 +3626,8 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
   VersionSet::LogReporter reporter;
   reporter.status = &s;
   log::Reader reader(nullptr, std::move(file_reader), &reporter,
-                     true /* checksum */, 0 /* log_number */);
+                     true /* checksum */, 0 /* log_number */,
+                     false /* retry_after_eof */);
   Slice record;
   std::string scratch;
   while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@@ -3785,7 +3787,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
   VersionSet::LogReporter reporter;
   reporter.status = &s;
   log::Reader reader(nullptr, std::move(file_reader), &reporter,
-                     true /* checksum */, 0 /* log_number */);
+                     true /* checksum */, 0 /* log_number */,
+                     false /* retry_after_eof */);
   Slice record;
   std::string scratch;
   while (reader.ReadRecord(&record, &scratch) && s.ok()) {
diff --git a/db/wal_manager.cc b/db/wal_manager.cc
index 4612f7f31..cc5ed3105 100644
--- a/db/wal_manager.cc
+++ b/db/wal_manager.cc
@@ -457,7 +457,7 @@ Status WalManager::ReadFirstLine(const std::string& fname,
   reporter.status = &status;
   reporter.ignore_error = !db_options_.paranoid_checks;
   log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
-                     true /*checksum*/, number);
+                     true /*checksum*/, number, false /* retry_after_eof */);
 
   std::string scratch;
   Slice record;
diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc
index 4b6f6f4d8..d0840feb2 100644
--- a/tools/ldb_cmd.cc
+++ b/tools/ldb_cmd.cc
@@ -1999,7 +1999,8 @@ void DumpWalFile(std::string wal_file, bool print_header, bool print_values,
   }
   DBOptions db_options;
   log::Reader reader(db_options.info_log, std::move(wal_file_reader),
-                     &reporter, true /* checksum */, log_number);
+                     &reporter, true /* checksum */, log_number,
+                     false /* retry_after_eof */);
   std::string scratch;
   WriteBatch batch;
   Slice record;
diff --git a/util/sync_point.cc b/util/sync_point.cc
index ce0fa0a97..4599c256d 100644
--- a/util/sync_point.cc
+++ b/util/sync_point.cc
@@ -17,9 +17,7 @@ SyncPoint* SyncPoint::GetInstance() {
   return &sync_point;
 }
 
-SyncPoint::SyncPoint() :
-    impl_(new Data) {
-}
+SyncPoint::SyncPoint() : impl_(new Data) {}
 
 SyncPoint:: ~SyncPoint() {
   delete impl_;