diff --git a/db/db_impl.cc b/db/db_impl.cc index e9b890c4b..caa54f911 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -161,7 +161,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) stall_level0_num_files_(0), started_at_(options.env->NowMicros()), flush_on_destroy_(false), - delayed_writes_(0) { + delayed_writes_(0), + last_flushed_sequence_(0) { + mem_->Ref(); env_->GetAbsolutePath(dbname, &db_absolute_path_); @@ -547,12 +549,14 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, std::sort(logs.begin(), logs.end()); for (size_t i = 0; i < logs.size(); i++) { s = RecoverLogFile(logs[i], edit, &max_sequence, external_table); - // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. versions_->MarkFileNumberUsed(logs[i]); } + // This could be the last_flushed_sequence as the next sequences will be + // greater than this. + last_flushed_sequence_ = max_sequence; if (s.ok()) { if (versions_->LastSequence() < max_sequence) { @@ -899,7 +903,12 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, return s; } iter->reset( - new TransactionLogIteratorImpl(dbname_, &options_, seq, probableWALFiles)); + new TransactionLogIteratorImpl(dbname_, + &options_, + seq, + probableWALFiles, + &last_flushed_sequence_)); + iter->get()->Next(); return Status::OK(); } @@ -1939,7 +1948,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { Writer* last_writer = &w; if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions WriteBatch* updates = BuildBatchGroup(&last_writer); - WriteBatchInternal::SetSequence(updates, last_sequence + 1); + const SequenceNumber current_sequence = last_sequence + 1; + WriteBatchInternal::SetSequence(updates, current_sequence); int my_batch_count = WriteBatchInternal::Count(updates); last_sequence += my_batch_count; // Record statistics @@ -1972,6 +1982,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } mutex_.Lock(); } + last_flushed_sequence_ = current_sequence; if (updates == tmp_batch_) tmp_batch_->Clear(); versions_->SetLastSequence(last_sequence); diff --git a/db/db_impl.h b/db/db_impl.h index 1e9c3ae99..e5a2936cb 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -323,6 +323,10 @@ class DBImpl : public DB { // count of the number of contiguous delaying writes int delayed_writes_; + // store the last flushed sequence. + // Used by transaction log iterator. + SequenceNumber last_flushed_sequence_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/db/db_test.cc b/db/db_test.cc index 97243d42c..fcdb4917c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1471,7 +1471,7 @@ TEST(DBTest, CompactionFilter) { delete iter; // The sequence number of the remaining record - // is not zeroed out even though it is at the + // is not zeroed out even though it is at the // level Lmax because this record is at the tip count = 0; iter = dbfull()->TEST_NewInternalIterator(); @@ -2600,17 +2600,14 @@ TEST(DBTest, TransactionLogIterator) { unique_ptr iter; Status status = dbfull()->GetUpdatesSince(0, &iter); ASSERT_TRUE(status.ok()); - ASSERT_TRUE(!iter->Valid()); - iter->Next(); + ASSERT_TRUE(iter->Valid()); int i = 0; SequenceNumber lastSequence = 0; while (iter->Valid()) { - WriteBatch batch; - SequenceNumber current; - iter->GetBatch(&batch, ¤t); - ASSERT_TRUE(current > lastSequence); + BatchResult res = iter->GetBatch(); + ASSERT_TRUE(res.sequence > lastSequence); ++i; - lastSequence = current; + lastSequence = res.sequence; ASSERT_TRUE(iter->status().ok()); iter->Next(); } @@ -2626,16 +2623,13 @@ TEST(DBTest, TransactionLogIterator) { unique_ptr iter; Status status = dbfull()->GetUpdatesSince(0, &iter); ASSERT_TRUE(status.ok()); - ASSERT_TRUE(!iter->Valid()); - iter->Next(); + ASSERT_TRUE(iter->Valid()); int i = 0; SequenceNumber lastSequence = 0; while (iter->Valid()) { - WriteBatch batch; - SequenceNumber current; - iter->GetBatch(&batch, ¤t); - ASSERT_TRUE(current > lastSequence); - lastSequence = current; + BatchResult res = iter->GetBatch(); + ASSERT_TRUE(res.sequence > lastSequence); + lastSequence = res.sequence; ASSERT_TRUE(iter->status().ok()); iter->Next(); ++i; diff --git a/db/transaction_log_iterator_impl.cc b/db/transaction_log_iterator_impl.cc index 1077835bc..70ec8c9db 100644 --- a/db/transaction_log_iterator_impl.cc +++ b/db/transaction_log_iterator_impl.cc @@ -4,18 +4,21 @@ namespace leveldb { TransactionLogIteratorImpl::TransactionLogIteratorImpl( - const std::string& dbname, - const Options* options, - SequenceNumber& seq, - std::vector* files) : + const std::string& dbname, + const Options* options, + SequenceNumber& seq, + std::vector* files, + SequenceNumber const * const lastFlushedSequence) : dbname_(dbname), options_(options), sequenceNumber_(seq), files_(files), started_(false), isValid_(true), - currentFileIndex_(0) { + currentFileIndex_(0), + lastFlushedSequence_(lastFlushedSequence) { assert(files_ != nullptr); + assert(lastFlushedSequence_); } LogReporter @@ -29,8 +32,7 @@ TransactionLogIteratorImpl::NewLogReporter(const uint64_t logNumber) { Status TransactionLogIteratorImpl::OpenLogFile( const LogFile& logFile, - unique_ptr* file) -{ + unique_ptr* file) { Env* env = options_->env; if (logFile.type == kArchivedLogFile) { std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber); @@ -44,7 +46,6 @@ Status TransactionLogIteratorImpl::OpenLogFile( fname = ArchivedLogFileName(dbname_, logFile.logNumber); status = env->NewSequentialFile(fname, file); if (!status.ok()) { - // TODO stringprintf return Status::IOError(" Requested file not present in the dir"); } } @@ -52,11 +53,12 @@ Status TransactionLogIteratorImpl::OpenLogFile( } } -void TransactionLogIteratorImpl::GetBatch(WriteBatch* batch, - SequenceNumber* seq) { +BatchResult TransactionLogIteratorImpl::GetBatch() { assert(isValid_); // cannot call in a non valid state. - WriteBatchInternal::SetContents(batch, currentRecord_); - *seq = WriteBatchInternal::Sequence(batch); + BatchResult result; + result.sequence = currentSequence_; + result.writeBatchPtr = std::move(currentBatch_); + return result; } Status TransactionLogIteratorImpl::status() { @@ -73,16 +75,21 @@ void TransactionLogIteratorImpl::Next() { LogReporter reporter = NewLogReporter(currentLogFile.logNumber); std::string scratch; Slice record; + if (!started_) { + isValid_ = false; + if (sequenceNumber_ > *lastFlushedSequence_) { + currentStatus_ = Status::IOError("Looking for a sequence, " + "which is not flushed yet."); + return; + } unique_ptr file; Status status = OpenLogFile(currentLogFile, &file); if (!status.ok()) { - isValid_ = false; currentStatus_ = status; return; } assert(file); - WriteBatch batch; unique_ptr reader( new log::Reader(std::move(file), &reporter, true, 0)); assert(reader); @@ -92,11 +99,10 @@ void TransactionLogIteratorImpl::Next() { record.size(), Status::Corruption("log record too small")); continue; } - WriteBatchInternal::SetContents(&batch, record); - SequenceNumber currentNum = WriteBatchInternal::Sequence(&batch); - if (currentNum >= sequenceNumber_) { + UpdateCurrentWriteBatch(record); + if (currentSequence_ >= sequenceNumber_) { + assert(currentSequence_ < *lastFlushedSequence_); isValid_ = true; - currentRecord_ = record; currentLogReader_ = std::move(reader); break; } @@ -112,13 +118,21 @@ void TransactionLogIteratorImpl::Next() { LOOK_NEXT_FILE: assert(currentLogReader_); bool openNextFile = true; - while (currentLogReader_->ReadRecord(&record, &scratch)) { + + if (currentSequence_ == *lastFlushedSequence_) { + // The last update has been read. and next is being called. + isValid_ = false; + currentStatus_ = Status::OK(); + } + + while (currentLogReader_->ReadRecord(&record, &scratch) && + currentSequence_ < *lastFlushedSequence_) { if (record.size() < 12) { reporter.Corruption( record.size(), Status::Corruption("log record too small")); continue; } else { - currentRecord_ = record; + UpdateCurrentWriteBatch(record); openNextFile = false; break; } @@ -148,4 +162,11 @@ LOOK_NEXT_FILE: } } +void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { + WriteBatch* batch = new WriteBatch(); + WriteBatchInternal::SetContents(batch, record); + currentSequence_ = WriteBatchInternal::Sequence(batch); + currentBatch_.reset(batch); +} + } // namespace leveldb diff --git a/db/transaction_log_iterator_impl.h b/db/transaction_log_iterator_impl.h index 7b0b7723f..c1e541e5d 100644 --- a/db/transaction_log_iterator_impl.h +++ b/db/transaction_log_iterator_impl.h @@ -28,7 +28,9 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { TransactionLogIteratorImpl(const std::string& dbname, const Options* options, SequenceNumber& seqNum, - std::vector* files); + std::vector* files, + SequenceNumber const * const lastFlushedSequence); + virtual ~TransactionLogIteratorImpl() { // TODO move to cc file. delete files_; @@ -40,7 +42,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { virtual Status status(); - virtual void GetBatch(WriteBatch* batch, SequenceNumber* seq); + virtual BatchResult GetBatch(); private: const std::string& dbname_; @@ -51,10 +53,15 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { bool isValid_; // not valid when it starts of. Status currentStatus_; size_t currentFileIndex_; - Slice currentRecord_; + std::unique_ptr currentBatch_; unique_ptr currentLogReader_; Status OpenLogFile(const LogFile& logFile, unique_ptr* file); LogReporter NewLogReporter(uint64_t logNumber); + SequenceNumber const * const lastFlushedSequence_; + // represents the sequence number being read currently. + SequenceNumber currentSequence_; + + void UpdateCurrentWriteBatch(const Slice& record); }; diff --git a/include/leveldb/transaction_log_iterator.h b/include/leveldb/transaction_log_iterator.h index a4bb99aa7..d755d829c 100644 --- a/include/leveldb/transaction_log_iterator.h +++ b/include/leveldb/transaction_log_iterator.h @@ -8,6 +8,11 @@ namespace leveldb { +struct BatchResult { + SequenceNumber sequence; + std::unique_ptr writeBatchPtr; +}; + // A TransactionLogIterator is used to iterate over the Transaction's in a db. class TransactionLogIterator { public: @@ -16,19 +21,21 @@ class TransactionLogIterator { // An iterator is either positioned at a WriteBatch or not valid. // This method returns true if the iterator is valid. + // Can read data from a valid iterator. virtual bool Valid() = 0; // Moves the iterator to the next WriteBatch. // REQUIRES: Valid() to be true. virtual void Next() = 0; - // Return's ok if the iterator is in a valid stated. - // Return the Error Status when the iterator is not Valid. + // Return's ok if the iterator is valid. + // Return the Error when something has gone wrong. virtual Status status() = 0; // If valid return's the current write_batch and the sequence number of the // latest transaction contained in the batch. - virtual void GetBatch(WriteBatch* batch, SequenceNumber* seq) = 0; + // ONLY use if Valid() is true and status() is OK. + virtual BatchResult GetBatch() = 0; }; } // namespace leveldb diff --git a/tools/db_repl_stress.cc b/tools/db_repl_stress.cc index 582e567ee..012a3df01 100644 --- a/tools/db_repl_stress.cc +++ b/tools/db_repl_stress.cc @@ -56,24 +56,23 @@ static void ReplicationThreadBody(void* arg) { DB* db = t->db; unique_ptr iter; SequenceNumber currentSeqNum = 0; - while (t->stop.Acquire_Load() != NULL) { + while (t->stop.Acquire_Load() != nullptr) { if (!iter) { db->GetUpdatesSince(currentSeqNum, &iter); fprintf(stdout, "Refreshing iterator\n"); iter->Next(); while(iter->Valid()) { - WriteBatch batch; - SequenceNumber seq; - iter->GetBatch(&batch, &seq); - if (seq != currentSeqNum +1 && seq != currentSeqNum) { + BatchResult res = iter->GetBatch(); + if (res.sequence != currentSeqNum +1 + && res.sequence != currentSeqNum) { fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n", currentSeqNum, - seq); + res.sequence); exit(1); } - currentSeqNum = seq; - t->latest = seq; + currentSeqNum = res.sequence; + t->latest = res.sequence; iter->Next(); t->no_read++; } @@ -132,7 +131,7 @@ int main(int argc, const char** argv) { while(dataPump.is_running) { continue; } - replThread.stop.Release_Store(NULL); + replThread.stop.Release_Store(nullptr); if ( replThread.no_read < dataPump.no_records ) { // no. read should be => than inserted. fprintf(stderr, "No. of Record's written and read not same\nRead : %ld"