From fe3713961ec573a80e742a334747571549ad4369 Mon Sep 17 00:00:00 2001 From: Mayank Agarwal Date: Sun, 13 Oct 2013 15:28:24 -0700 Subject: [PATCH] Features in Transaction log iterator Summary: * Logstore requests a valid change of reutrning an empty iterator and not an error in case of no log files. * Changed the code to return the writebatch containing the sequence number requested from GetupdatesSince even if it lies in the middle. Earlier we used to return the next writebatch,. This also allows me oto guarantee that no files played upon by the iterator are redundant. I mean the starting log file has at least a sequence number >= the sequence number requested form GetupdatesSince. * Cleaned up redundant logic in Iterator::Next and made a new function SeekToStartSequence for greater readability and maintainibilty. * Modified a test in db_test accordingly Please check the logic carefully and suggest improvements. I have a separate patch out for more improvements like restricting reader to read till written sequences. Test Plan: * transaction log iterator tests in db_test, * db_repl_stress. * rocks_log_iterator_test in fbcode/wormhole/rocksdb/test - 2 tests thriving on hacks till now can get simplified * testing on the shadow setup for sigma with replication Reviewers: dhruba, haobo, kailiu, sdong Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D13437 --- db/db_impl.cc | 8 +-- db/db_test.cc | 5 +- db/transaction_log_impl.cc | 95 ++++++++++++++++++------------- db/transaction_log_impl.h | 6 +- include/rocksdb/transaction_log.h | 4 +- 5 files changed, 64 insertions(+), 54 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 23c243471..f3e3b4d32 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1117,11 +1117,6 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, return s; } - if (wal_files->empty()) { - return Status::IOError(" NO WAL Files present in the db"); - } - // std::shared_ptr would have been useful here. - s = RetainProbableWalFiles(*wal_files, seq); if (!s.ok()) { return s; @@ -1133,8 +1128,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, seq, std::move(wal_files), &last_flushed_sequence_)); - iter->get()->Next(); - return iter->get()->status(); + return (*iter)->status(); } Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs, diff --git a/db/db_test.cc b/db/db_test.cc index b59de240b..340590cdd 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3563,7 +3563,8 @@ TEST(DBTest, TransactionLogIteratorJustEmptyFile) { DestroyAndReopen(&options); unique_ptr iter; Status status = dbfull()->GetUpdatesSince(0, &iter); - ASSERT_TRUE(!status.ok()); + // Check that an empty iterator is returned + ASSERT_TRUE(!iter->Valid()); } while (ChangeCompactOptions()); } @@ -3594,7 +3595,7 @@ TEST(DBTest, TransactionLogIteratorBatchOperations) { Reopen(&options); Put("key4", DummyString(1024)); auto iter = OpenTransactionLogIter(3); - ExpectRecords(1, iter); + ExpectRecords(2, iter); } while (ChangeCompactOptions()); } diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 5ce89b3d4..39ca38507 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -20,10 +20,11 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( currentFileIndex_(0), lastFlushedSequence_(lastFlushedSequence) { assert(startingSequenceNumber_ <= *lastFlushedSequence_); - assert(files_.get() != nullptr); + assert(files_ != nullptr); reporter_.env = options_->env; reporter_.info_log = options_->info_log.get(); + SeekToStartSequence(); // Seek till starting sequence } Status TransactionLogIteratorImpl::OpenLogFile( @@ -52,7 +53,7 @@ Status TransactionLogIteratorImpl::OpenLogFile( BatchResult TransactionLogIteratorImpl::GetBatch() { assert(isValid_); // cannot call in a non valid state. BatchResult result; - result.sequence = currentSequence_; + result.sequence = currentBatchSeq_; result.writeBatchPtr = std::move(currentBatch_); return result; } @@ -65,24 +66,21 @@ bool TransactionLogIteratorImpl::Valid() { return started_ && isValid_; } -void TransactionLogIteratorImpl::Next() { - LogFile* currentLogFile = files_.get()->at(currentFileIndex_).get(); - -// First seek to the given seqNo. in the current file. - std::string scratch; - Slice record; - if (!started_) { - started_ = true; // this piece only runs onced. +void TransactionLogIteratorImpl::SeekToStartSequence() { + std::string scratch; + Slice record; isValid_ = false; if (startingSequenceNumber_ > *lastFlushedSequence_) { currentStatus_ = Status::IOError("Looking for a sequence, " - "which is not flushed yet."); + "which is not flushed yet."); + return; + } + if (files_->size() == 0) { return; } - Status s = OpenLogReader(currentLogFile); + Status s = OpenLogReader(files_->at(0).get()); if (!s.ok()) { currentStatus_ = s; - isValid_ = false; return; } while (currentLogReader_->ReadRecord(&record, &scratch)) { @@ -92,23 +90,39 @@ void TransactionLogIteratorImpl::Next() { continue; } UpdateCurrentWriteBatch(record); - if (currentSequence_ >= startingSequenceNumber_) { - assert(currentSequence_ <= *lastFlushedSequence_); + if (currentBatchSeq_ + currentBatchCount_ - 1 >= + startingSequenceNumber_) { + assert(currentBatchSeq_ <= *lastFlushedSequence_); isValid_ = true; - break; + started_ = true; // set started_ as we could seek till starting sequence + return; } else { isValid_ = false; } } - if (isValid_) { - // Done for this iteration - return; + // Could not find start sequence in first file. Normally this must be the + // only file. Otherwise log the error and let the iterator return next entry + if (files_->size() != 1) { + currentStatus_ = Status::Corruption("Start sequence was not found, " + "skipping to the next available"); + reporter_.Corruption(0, currentStatus_); + started_ = true; // Let Next find next available entry + Next(); } +} + +void TransactionLogIteratorImpl::Next() { + // TODO:Next() says that it requires Valid to be true but this is not true + // assert(Valid()); + std::string scratch; + Slice record; + isValid_ = false; + if (!started_) { // Runs every time until we can seek to the start sequence + return SeekToStartSequence(); } - bool openNextFile = true; - while(openNextFile) { + while(true) { assert(currentLogReader_); - if (currentSequence_ < *lastFlushedSequence_) { + if (currentBatchSeq_ < *lastFlushedSequence_) { if (currentLogReader_->IsEOF()) { currentLogReader_->UnmarkEOF(); } @@ -119,30 +133,28 @@ void TransactionLogIteratorImpl::Next() { continue; } else { UpdateCurrentWriteBatch(record); - openNextFile = false; - break; + return; } } } - if (openNextFile) { - if (currentFileIndex_ < files_.get()->size() - 1) { - ++currentFileIndex_; - Status status =OpenLogReader(files_.get()->at(currentFileIndex_).get()); - if (!status.ok()) { - isValid_ = false; - currentStatus_ = status; - return; - } - } else { + // Open the next file + if (currentFileIndex_ < files_->size() - 1) { + ++currentFileIndex_; + Status status =OpenLogReader(files_->at(currentFileIndex_).get()); + if (!status.ok()) { isValid_ = false; - openNextFile = false; - if (currentSequence_ == *lastFlushedSequence_) { - currentStatus_ = Status::OK(); - } else { - currentStatus_ = Status::IOError(" NO MORE DATA LEFT"); - } + currentStatus_ = status; + return; + } + } else { + isValid_ = false; + if (currentBatchSeq_ == *lastFlushedSequence_) { + currentStatus_ = Status::OK(); + } else { + currentStatus_ = Status::IOError(" NO MORE DATA LEFT"); } + return; } } } @@ -150,7 +162,8 @@ void TransactionLogIteratorImpl::Next() { void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { WriteBatch* batch = new WriteBatch(); WriteBatchInternal::SetContents(batch, record); - currentSequence_ = WriteBatchInternal::Sequence(batch); + currentBatchSeq_ = WriteBatchInternal::Sequence(batch); + currentBatchCount_ = WriteBatchInternal::Count(batch); currentBatch_.reset(batch); isValid_ = true; currentStatus_ = Status::OK(); diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index 9b4d7c9b5..6269dc30d 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -88,9 +88,11 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { Status OpenLogFile(const LogFile* logFile, unique_ptr* file); LogReporter reporter_; SequenceNumber const * const lastFlushedSequence_; - // represents the sequence number being read currently. - SequenceNumber currentSequence_; + SequenceNumber currentBatchSeq_; // sequence number at start of current batch + uint64_t currentBatchCount_; // count in current batch + + void SeekToStartSequence(); void UpdateCurrentWriteBatch(const Slice& record); Status OpenLogReader(const LogFile* file); }; diff --git a/include/rocksdb/transaction_log.h b/include/rocksdb/transaction_log.h index 0a2be79c3..a7553cea8 100644 --- a/include/rocksdb/transaction_log.h +++ b/include/rocksdb/transaction_log.h @@ -67,8 +67,8 @@ class TransactionLogIterator { // REQUIRES: Valid() to be true. virtual void Next() = 0; - // Return's ok if the iterator is valid. - // Return the Error when something has gone wrong. + // Returns ok if the iterator is valid. + // Returns the Error when something has gone wrong. virtual Status status() = 0; // If valid return's the current write_batch and the sequence number of the