From ae8e0770b47cfc11b6d8ea901b2d5aa8347249cb Mon Sep 17 00:00:00 2001 From: Mayank Agarwal Date: Sun, 20 Oct 2013 19:06:19 -0700 Subject: [PATCH] Disallow transaction log iterator to skip sequences Summary: This is expected to solve the "gaps in transaction log iterator" problem. * After many observations of the gaps on the sigmafio machines, I found that they are always due to a race between the log reader and the log writer. * So when we drop the wormhole subscription and refresh the iterator, the gaps are not there. * It is NOT due to some boundary or corner case left unattended in the iterator logic, because I checked many instances of the gaps against their log files with ldb. The log files are also NOT corrupted. * The solution is to not allow the iterator to read incompletely written sequences: the iterator detects gaps itself and invalidates itself, which causes the application to refresh the iterator normally and seek to the required sequence properly. * Thus, the iterator can at least guarantee that it will not return any gaps. 
Test Plan: * db_test based log iterator tests * db_repl_stress * testing on sigmafio setup to see gaps go away Reviewers: dhruba, haobo Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D13593 --- db/transaction_log_impl.cc | 147 +++++++++++++++++++++++++------------ db/transaction_log_impl.h | 17 ++++- 2 files changed, 113 insertions(+), 51 deletions(-) diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index c6dfbd304..29e309d5f 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -23,6 +23,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( started_(false), isValid_(false), currentFileIndex_(0), + currentBatchSeq_(0), + currentBatchCount_(0), lastFlushedSequence_(lastFlushedSequence) { assert(startingSequenceNumber_ <= *lastFlushedSequence_); assert(files_ != nullptr); @@ -33,8 +35,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( } Status TransactionLogIteratorImpl::OpenLogFile( - const LogFile* logFile, - unique_ptr* file) { + const LogFile* logFile, + unique_ptr* file) { Env* env = options_->env; if (logFile->Type() == kArchivedLogFile) { std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber()); @@ -48,7 +50,7 @@ Status TransactionLogIteratorImpl::OpenLogFile( fname = ArchivedLogFileName(dir_, logFile->LogNumber()); status = env->NewSequentialFile(fname, file, soptions_); if (!status.ok()) { - return Status::IOError(" Requested file not present in the dir"); + return Status::IOError("Requested file not present in the dir"); } } return status; @@ -71,54 +73,73 @@ bool TransactionLogIteratorImpl::Valid() { return started_ && isValid_; } -void TransactionLogIteratorImpl::SeekToStartSequence() { - std::string scratch; - Slice record; - isValid_ = false; - if (startingSequenceNumber_ > *lastFlushedSequence_) { - currentStatus_ = Status::IOError("Looking for a sequence, " - "which is not flushed yet."); - return; - } - if (files_->size() == 0) { - return; - } - 
Status s = OpenLogReader(files_->at(0).get()); - if (!s.ok()) { - currentStatus_ = s; - return; +bool TransactionLogIteratorImpl::RestrictedRead( + Slice* record, + std::string* scratch) { + // Don't read if no more complete entries to read from logs + if (currentBatchSeq_ >= *lastFlushedSequence_) { + return false; + } + return currentLogReader_->ReadRecord(record, scratch); +} + +void TransactionLogIteratorImpl::SeekToStartSequence( + uint64_t startFileIndex, + bool strict) { + std::string scratch; + Slice record; + started_ = false; + isValid_ = false; + if (startingSequenceNumber_ > *lastFlushedSequence_) { + currentStatus_ = Status::IOError("Looking for a sequence, " + "which is not flushed yet."); + return; + } + if (files_->size() <= startFileIndex) { + return; + } + Status s = OpenLogReader(files_->at(startFileIndex).get()); + if (!s.ok()) { + currentStatus_ = s; + return; + } + while (RestrictedRead(&record, &scratch)) { + if (record.size() < 12) { + reporter_.Corruption( + record.size(), Status::Corruption("very small log record")); + continue; } - while (currentLogReader_->ReadRecord(&record, &scratch)) { - if (record.size() < 12) { - reporter_.Corruption( - record.size(), Status::Corruption("log record too small")); - continue; - } - UpdateCurrentWriteBatch(record); - if (currentBatchSeq_ + currentBatchCount_ - 1 >= - startingSequenceNumber_) { - assert(currentBatchSeq_ <= *lastFlushedSequence_); - isValid_ = true; - started_ = true; // set started_ as we could seek till starting sequence + UpdateCurrentWriteBatch(record); + if (currentBatchSeq_ + currentBatchCount_ - 1 >= + startingSequenceNumber_) { + if (strict && currentBatchSeq_ != startingSequenceNumber_) { + currentStatus_ = Status::Corruption("Gap in sequence number. Could not " + "seek to required sequence number"); + reporter_.Info(currentStatus_.ToString().c_str()); return; - } else { - isValid_ = false; + } else if (strict) { + reporter_.Info("Could seek required sequence number. 
Iterator will " + "continue."); } + isValid_ = true; + started_ = true; // set started_ as we could seek till starting sequence + return; + } else { + isValid_ = false; } - // Could not find start sequence in first file. Normally this must be the - // only file. Otherwise log the error and let the iterator return next entry - if (files_->size() != 1) { - currentStatus_ = Status::Corruption("Start sequence was not found, " - "skipping to the next available"); - reporter_.Corruption(0, currentStatus_); - started_ = true; // Let Next find next available entry - Next(); - } + } + // Could not find start sequence in first file. Normally this must be the + // only file. Otherwise log the error and let the iterator return next entry + if (files_->size() != 1) { + currentStatus_ = Status::Corruption("Start sequence was not found, " + "skipping to the next available"); + reporter_.Corruption(0, currentStatus_); + started_ = true; // Let Next find next available entry + Next(); + } } void TransactionLogIteratorImpl::Next() { - // TODO:Next() says that it requires Valid to be true but this is not true - // assert(Valid()); std::string scratch; Slice record; isValid_ = false; @@ -134,11 +155,10 @@ void TransactionLogIteratorImpl::Next() { while (currentLogReader_->ReadRecord(&record, &scratch)) { if (record.size() < 12) { reporter_.Corruption( - record.size(), Status::Corruption("log record too small")); + record.size(), Status::Corruption("very small log record")); continue; } else { - UpdateCurrentWriteBatch(record); - return; + return UpdateCurrentWriteBatch(record); } } } @@ -164,11 +184,44 @@ void TransactionLogIteratorImpl::Next() { } } +bool TransactionLogIteratorImpl::IsBatchContinuous( + const WriteBatch* batch, + const SequenceNumber expectedSeq) { + assert(batch); + SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch); + if (started_ && batchSeq != expectedSeq) { + char buf[200]; + snprintf(buf, sizeof(buf), + "Discontinuity in log records. 
Got seq=%lu, Expected seq=%lu, " + "Last flushed seq=%lu. Log iterator will seek the correct batch.", + batchSeq, expectedSeq, *lastFlushedSequence_); + reporter_.Info(buf); + return false; + } + return true; +} + void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { WriteBatch* batch = new WriteBatch(); WriteBatchInternal::SetContents(batch, record); + + SequenceNumber expectedSeq = currentBatchSeq_ + currentBatchCount_; + if (!IsBatchContinuous(batch, expectedSeq)) { + // Seek to the batch having expected sequence number + if (expectedSeq < files_->at(currentFileIndex_)->StartSequence()) { + // Expected batch must lie in the previous log file + currentFileIndex_--; + currentFileIndex_ = (currentFileIndex_ >= 0) ? currentFileIndex_ : 0; + } + startingSequenceNumber_ = expectedSeq; + return SeekToStartSequence(currentFileIndex_, true); + } + currentBatchSeq_ = WriteBatchInternal::Sequence(batch); currentBatchCount_ = WriteBatchInternal::Count(batch); + // currentBatchSeq_ can only change here + assert(currentBatchSeq_ <= *lastFlushedSequence_); + currentBatch_.reset(batch); isValid_ = true; currentStatus_ = Status::OK(); diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index b17a80a2e..d08d192ac 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -21,6 +21,9 @@ struct LogReporter : public log::Reader::Reporter { virtual void Corruption(size_t bytes, const Status& s) { Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str()); } + virtual void Info(const char* s) { + Log(info_log, "%s", s); + } }; class LogFileImpl : public LogFile { @@ -81,7 +84,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { const std::string& dir_; const Options* options_; const EnvOptions& soptions_; - const SequenceNumber startingSequenceNumber_; + SequenceNumber startingSequenceNumber_; std::unique_ptr files_; bool started_; bool isValid_; // not valid when it starts of. 
@@ -91,12 +94,18 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { unique_ptr currentLogReader_; Status OpenLogFile(const LogFile* logFile, unique_ptr* file); LogReporter reporter_; - SequenceNumber const * const lastFlushedSequence_; - SequenceNumber currentBatchSeq_; // sequence number at start of current batch uint64_t currentBatchCount_; // count in current batch + SequenceNumber const * const lastFlushedSequence_; - void SeekToStartSequence(); + // Reads from transaction log only if the writebatch record has been written + bool RestrictedRead(Slice* record, std::string* scratch); + // Seeks to startingSequenceNumber reading from startFileIndex in files_. + // If strict is set,then must get a batch starting with startingSequenceNumber + void SeekToStartSequence(uint64_t startFileIndex = 0, bool strict = false); + // Check if batch is continuous starting from expectedSeq, else return false + bool IsBatchContinuous(const WriteBatch* batch, SequenceNumber expectedSeq); + // Update current batch if a continuous batch is found, else return false void UpdateCurrentWriteBatch(const Slice& record); Status OpenLogReader(const LogFile* file); };