diff --git a/db/db_test.cc b/db/db_test.cc index 2a8cee4e7..c50822d73 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4543,16 +4543,43 @@ TEST(DBTest, TransactionLogIterator) { { auto iter = OpenTransactionLogIter(0); ExpectRecords(3, iter); - } - Reopen(&options); - env_->SleepForMicroseconds(2 * 1000 * 1000);{ + assert(!iter->IsObsolete()); + iter->Next(); + assert(!iter->Valid()); + assert(!iter->IsObsolete()); + assert(iter->status().ok()); + + Reopen(&options); + env_->SleepForMicroseconds(2 * 1000 * 1000); Put("key4", DummyString(1024)); Put("key5", DummyString(1024)); Put("key6", DummyString(1024)); + + iter->Next(); + assert(!iter->Valid()); + assert(iter->IsObsolete()); + assert(iter->status().ok()); } { auto iter = OpenTransactionLogIter(0); ExpectRecords(6, iter); + assert(!iter->IsObsolete()); + iter->Next(); + assert(!iter->Valid()); + assert(!iter->IsObsolete()); + assert(iter->status().ok()); + + Put("key7", DummyString(1024)); + iter->Next(); + assert(iter->Valid()); + assert(iter->status().ok()); + + dbfull()->Flush(FlushOptions()); + Put("key8", DummyString(1024)); + iter->Next(); + assert(!iter->Valid()); + assert(iter->IsObsolete()); + assert(iter->status().ok()); } } while (ChangeCompactOptions()); } diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 36b8932a5..f039c3426 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -21,6 +21,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( files_(std::move(files)), started_(false), isValid_(false), + is_obsolete_(false), currentFileIndex_(0), currentBatchSeq_(0), currentLastSeq_(0), @@ -69,14 +70,15 @@ bool TransactionLogIteratorImpl::Valid() { return started_ && isValid_; } -bool TransactionLogIteratorImpl::RestrictedRead( - Slice* record, - std::string* scratch) { - // Don't read if no more complete entries to read from logs - if (currentLastSeq_ >= dbimpl_->GetLatestSequenceNumber()) { - return false; +bool TransactionLogIteratorImpl::RestrictedRead(Slice* record, + std::string* scratch) { + bool ret = currentLogReader_->ReadRecord(record, scratch); + + if (!reporter_.last_status.ok()) { + currentStatus_ = reporter_.last_status; } - return currentLogReader_->ReadRecord(record, scratch); + + return ret; } void TransactionLogIteratorImpl::SeekToStartSequence( @@ -86,6 +88,7 @@ void TransactionLogIteratorImpl::SeekToStartSequence( Slice record; started_ = false; isValid_ = false; + is_obsolete_ = false; if (files_->size() <= startFileIndex) { return; } @@ -94,6 +97,18 @@ void TransactionLogIteratorImpl::SeekToStartSequence( currentStatus_ = s; return; } + auto latest_seq_num = dbimpl_->GetLatestSequenceNumber(); + if (startingSequenceNumber_ > latest_seq_num) { + if (strict) { + currentStatus_ = Status::Corruption("Gap in sequence number. Could not " + "seek to required sequence number"); + reporter_.Info(currentStatus_.ToString().c_str()); + } else { + // isValid_ is false; + return; + } + } + while (RestrictedRead(&record, &scratch)) { if (record.size() < 12) { reporter_.Corruption( @@ -123,11 +138,11 @@ void TransactionLogIteratorImpl::SeekToStartSequence( // only file. Otherwise log the error and let the iterator return next entry // If strict is set, we want to seek exactly till the start sequence and it // should have been present in the file we scanned above - if (strict) { + if (strict || files_->size() == 1) { currentStatus_ = Status::Corruption("Gap in sequence number. Could not " "seek to required sequence number"); reporter_.Info(currentStatus_.ToString().c_str()); - } else if (files_->size() != 1) { + } else { currentStatus_ = Status::Corruption("Start sequence was not found, " "skipping to the next available"); reporter_.Info(currentStatus_.ToString().c_str()); @@ -149,11 +164,30 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { // Runs every time until we can seek to the start sequence return SeekToStartSequence(); } - while(true) { + + is_obsolete_ = false; + auto latest_seq_num = dbimpl_->GetLatestSequenceNumber(); + if (currentLastSeq_ >= latest_seq_num) { + isValid_ = false; + return; + } + + bool first = true; + while (currentFileIndex_ < files_->size()) { + if (!first) { + Status status =OpenLogReader(files_->at(currentFileIndex_).get()); + if (!status.ok()) { + isValid_ = false; + currentStatus_ = status; + return; + } + } + first = false; assert(currentLogReader_); if (currentLogReader_->IsEOF()) { currentLogReader_->UnmarkEOF(); } + while (RestrictedRead(&record, &scratch)) { if (record.size() < 12) { reporter_.Corruption( @@ -171,26 +205,14 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { return; } } - // Open the next file - if (currentFileIndex_ < files_->size() - 1) { - ++currentFileIndex_; - Status status =OpenLogReader(files_->at(currentFileIndex_).get()); - if (!status.ok()) { - isValid_ = false; - currentStatus_ = status; - return; - } - } else { - isValid_ = false; - if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) { - currentStatus_ = Status::OK(); - } else { - currentStatus_ = Status::Corruption("NO MORE DATA LEFT"); - } - return; - } + ++currentFileIndex_; } + + // Read all the files but cannot find next record expected. + // TODO(sdong): support to auto fetch new log files from DB and continue. + isValid_ = false; + is_obsolete_ = true; } bool TransactionLogIteratorImpl::IsBatchExpected( diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index 6454d89e7..4c85379b6 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -19,7 +19,9 @@ namespace rocksdb { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; + Status last_status; virtual void Corruption(size_t bytes, const Status& s) { + last_status = s; Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str()); } virtual void Info(const char* s) { @@ -74,6 +76,8 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { virtual bool Valid(); + virtual bool IsObsolete() override { return is_obsolete_; } + virtual void Next(); virtual Status status(); @@ -89,6 +93,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { std::unique_ptr files_; bool started_; bool isValid_; // not valid when it starts of. + bool is_obsolete_; Status currentStatus_; size_t currentFileIndex_; std::unique_ptr currentBatch_; diff --git a/include/rocksdb/transaction_log.h b/include/rocksdb/transaction_log.h index 30443bba5..1bb259b1f 100644 --- a/include/rocksdb/transaction_log.h +++ b/include/rocksdb/transaction_log.h @@ -73,6 +73,12 @@ class TransactionLogIterator { // Can read data from a valid iterator. virtual bool Valid() = 0; + // IsObsolete() returns true if new log files were created. This usually + // means that the user needs to close the current iterator and create a new + // one to get the newest updates. It should happen only when mem tables are + // flushed. + virtual bool IsObsolete() = 0; + // Moves the iterator to the next WriteBatch. // REQUIRES: Valid() to be true. virtual void Next() = 0; diff --git a/tools/db_repl_stress.cc b/tools/db_repl_stress.cc index 27cb6d5ab..e336908ce 100644 --- a/tools/db_repl_stress.cc +++ b/tools/db_repl_stress.cc @@ -67,8 +67,21 @@ static void ReplicationThreadBody(void* arg) { } } fprintf(stderr, "Refreshing iterator\n"); - for(;iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) { + for (; !iter->IsObsolete(); iter->Next()) { + if (!iter->Valid()) { + if (t->stop.Acquire_Load() == nullptr) { + return; + } + // need to wait for new rows. + continue; + } + BatchResult res = iter->GetBatch(); + if (!iter->status().ok()) { + fprintf(stderr, "Corruption reported when reading seq no. b/w %ld", + static_cast(currentSeqNum)); + exit(1); + } if (res.sequence != currentSeqNum) { fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n", @@ -76,6 +89,8 @@ static void ReplicationThreadBody(void* arg) { (long)res.sequence); exit(1); } + t->no_read++; + currentSeqNum++; } } }