diff --git a/db/db_test.cc b/db/db_test.cc index e092384af..143a1fc5c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -517,6 +517,26 @@ class DBTest { } return result; } + + Options OptionsForLogIterTest() { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.WAL_ttl_seconds = 1000; + return options; + } + + std::unique_ptr OpenTransactionLogIter( + const SequenceNumber seq) { + unique_ptr iter; + Status status = dbfull()->GetUpdatesSince(seq, &iter); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(iter->Valid()); + return std::move(iter); + } + + std::string DummyString(size_t len, char c = 'a') { + return std::string(len, c); + } }; TEST(DBTest, Empty) { @@ -2591,88 +2611,74 @@ TEST(DBTest, WALArchival) { } +void ExpectRecords( + const int expected_no_records, + std::unique_ptr& iter) { + int i = 0; + SequenceNumber lastSequence = 0; + while (iter->Valid()) { + BatchResult res = iter->GetBatch(); + ASSERT_TRUE(res.sequence > lastSequence); + ++i; + lastSequence = res.sequence; + ASSERT_TRUE(iter->status().ok()); + iter->Next(); + } + ASSERT_EQ(i, expected_no_records); +} + TEST(DBTest, TransactionLogIterator) { - std::string value(1024, '1'); - Options options = CurrentOptions(); - options.create_if_missing = true; - options.WAL_ttl_seconds = 1000; + Options options = OptionsForLogIterTest(); DestroyAndReopen(&options); - Put("key1", value); - Put("key2", value); - Put("key2", value); + Put("key1", DummyString(1024)); + Put("key2", DummyString(1024)); + Put("key2", DummyString(1024)); ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U); { - unique_ptr iter; - Status status = dbfull()->GetUpdatesSince(0, &iter); - ASSERT_TRUE(status.ok()); - ASSERT_TRUE(iter->Valid()); - int i = 0; - SequenceNumber lastSequence = 0; - while (iter->Valid()) { - BatchResult res = iter->GetBatch(); - ASSERT_TRUE(res.sequence > lastSequence); - ++i; - lastSequence = res.sequence; - ASSERT_TRUE(iter->status().ok()); - iter->Next(); - } - ASSERT_EQ(i, 3); + auto iter = OpenTransactionLogIter(0); + ExpectRecords(3, iter); } Reopen(&options); { - Put("key4", value); - Put("key5", value); - Put("key6", value); + Put("key4", DummyString(1024)); + Put("key5", DummyString(1024)); + Put("key6", DummyString(1024)); } { - unique_ptr iter; - Status status = dbfull()->GetUpdatesSince(0, &iter); - ASSERT_TRUE(status.ok()); - ASSERT_TRUE(iter->Valid()); - int i = 0; - SequenceNumber lastSequence = 0; - while (iter->Valid()) { - BatchResult res = iter->GetBatch(); - ASSERT_TRUE(res.sequence > lastSequence); - lastSequence = res.sequence; - ASSERT_TRUE(iter->status().ok()); - iter->Next(); - ++i; - } - ASSERT_EQ(i, 6); + auto iter = OpenTransactionLogIter(0); + ExpectRecords(6, iter); } } TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) { - std::string value(1024, '1'); - Options options = CurrentOptions(); - options.create_if_missing = true; - options.WAL_ttl_seconds = 1000; + Options options = OptionsForLogIterTest(); DestroyAndReopen(&options); // Do a plain Reopen. - Put("key1", value); + Put("key1", DummyString(1024)); // Two reopens should create a zero record WAL file. Reopen(&options); Reopen(&options); - Put("key2", value); - unique_ptr iter; - Status status = dbfull()->GetUpdatesSince(0, &iter); - ASSERT_TRUE(status.ok()); + Put("key2", DummyString(1024)); + + auto iter = OpenTransactionLogIter(0); + ExpectRecords(2, iter); +} + +TEST(DBTest, TransactionLogIteratorStallAtLastRecord) { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(&options); + Put("key1", DummyString(1024)); + auto iter = OpenTransactionLogIter(0); + ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); - ASSERT_TRUE(status.ok()); + iter->Next(); + ASSERT_TRUE(!iter->Valid()); + ASSERT_OK(iter->status()); + Put("key2", DummyString(1024)); + iter->Next(); + ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); - int i = 0; - SequenceNumber lastSequence = 0; - while (iter->Valid()) { - BatchResult res = iter->GetBatch(); - ASSERT_TRUE(res.sequence > lastSequence); - lastSequence = res.sequence; - ASSERT_TRUE(iter->status().ok()); - iter->Next(); - ++i; - } - ASSERT_EQ(i, 2); } TEST(DBTest, ReadCompaction) { diff --git a/db/log_reader.h b/db/log_reader.h index 77ed5796d..271286877 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -59,6 +59,17 @@ class Reader { // Undefined before the first call to ReadRecord. uint64_t LastRecordOffset(); + // returns true if the reader has encountered an eof condition. + bool IsEOF() { + return eof_; + } + + // when we know more data has been written to the file. we can use this + // function to force the reader to look again in the file. + void UnmarkEOF() { + eof_ = false; + } + SequentialFile* file() { return file_.get(); } private: diff --git a/db/transaction_log_iterator_impl.cc b/db/transaction_log_iterator_impl.cc index 59f3c2461..676e0c907 100644 --- a/db/transaction_log_iterator_impl.cc +++ b/db/transaction_log_iterator_impl.cc @@ -1,6 +1,7 @@ #include "db/transaction_log_iterator_impl.h" #include "db/write_batch_internal.h" #include "db/filename.h" + namespace leveldb { TransactionLogIteratorImpl::TransactionLogIteratorImpl( @@ -103,7 +104,7 @@ void TransactionLogIteratorImpl::Next() { } UpdateCurrentWriteBatch(record); if (currentSequence_ >= sequenceNumber_) { - assert(currentSequence_ < *lastFlushedSequence_); + assert(currentSequence_ <= *lastFlushedSequence_); isValid_ = true; currentLogReader_ = std::move(reader); break; @@ -120,23 +121,20 @@ void TransactionLogIteratorImpl::Next() { LOOK_NEXT_FILE: assert(currentLogReader_); bool openNextFile = true; - - if (currentSequence_ == *lastFlushedSequence_) { - // The last update has been read. and next is being called. - isValid_ = false; - currentStatus_ = Status::OK(); - } - - while (currentLogReader_->ReadRecord(&record, &scratch) && - currentSequence_ < *lastFlushedSequence_) { - if (record.size() < 12) { - reporter.Corruption( - record.size(), Status::Corruption("log record too small")); - continue; - } else { - UpdateCurrentWriteBatch(record); - openNextFile = false; - break; + if (currentSequence_ < *lastFlushedSequence_) { + if (currentLogReader_->IsEOF()) { + currentLogReader_->UnmarkEOF(); + } + while (currentLogReader_->ReadRecord(&record, &scratch)) { + if (record.size() < 12) { + reporter.Corruption( + record.size(), Status::Corruption("log record too small")); + continue; + } else { + UpdateCurrentWriteBatch(record); + openNextFile = false; + break; + } } } @@ -154,6 +152,10 @@ LOOK_NEXT_FILE: currentLogReader_.reset( new log::Reader(std::move(file), &reporter, true, 0)); goto LOOK_NEXT_FILE; + } else if (currentSequence_ == *lastFlushedSequence_) { + // The last update has been read. and next is being called. + isValid_ = false; + currentStatus_ = Status::OK(); } else { // LOOKED AT FILES. WE ARE DONE HERE. isValid_ = false; @@ -169,6 +171,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { WriteBatchInternal::SetContents(batch, record); currentSequence_ = WriteBatchInternal::Sequence(batch); currentBatch_.reset(batch); + isValid_ = true; } } // namespace leveldb