From d68880a1b99b92823b80e9bf922242088b6453ba Mon Sep 17 00:00:00 2001 From: Abhishek Kona Date: Mon, 4 Mar 2013 10:44:04 -0800 Subject: [PATCH] Do not allow Transaction Log Iterator to fall ahead when writer is writing the same file Summary: Store the last flushed, seq no. in db_impl. Check against it in transaction Log iterator. Do not attempt to read ahead if we do not know if the data is flushed completely. Does not work if flush is disabled. Any ideas on fixing that? * Minor change, iter->Next is called the first time automatically for * the first time. Test Plan: existing test pass. More ideas on testing this? Planning to run some stress test. Reviewers: dhruba, heyongqiang CC: leveldb Differential Revision: https://reviews.facebook.net/D9087 --- db/db_impl.cc | 19 +++++-- db/db_impl.h | 4 ++ db/db_test.cc | 24 ++++----- db/transaction_log_iterator_impl.cc | 61 +++++++++++++++------- db/transaction_log_iterator_impl.h | 13 +++-- include/leveldb/transaction_log_iterator.h | 13 +++-- tools/db_repl_stress.cc | 17 +++--- 7 files changed, 97 insertions(+), 54 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index e9b890c4b..caa54f911 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -161,7 +161,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) stall_level0_num_files_(0), started_at_(options.env->NowMicros()), flush_on_destroy_(false), - delayed_writes_(0) { + delayed_writes_(0), + last_flushed_sequence_(0) { + mem_->Ref(); env_->GetAbsolutePath(dbname, &db_absolute_path_); @@ -547,12 +549,14 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, std::sort(logs.begin(), logs.end()); for (size_t i = 0; i < logs.size(); i++) { s = RecoverLogFile(logs[i], edit, &max_sequence, external_table); - // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. versions_->MarkFileNumberUsed(logs[i]); } + // This could be the last_flushed_sequence as the next sequences will be + // greater than this. + last_flushed_sequence_ = max_sequence; if (s.ok()) { if (versions_->LastSequence() < max_sequence) { @@ -899,7 +903,12 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, return s; } iter->reset( - new TransactionLogIteratorImpl(dbname_, &options_, seq, probableWALFiles)); + new TransactionLogIteratorImpl(dbname_, + &options_, + seq, + probableWALFiles, + &last_flushed_sequence_)); + iter->get()->Next(); return Status::OK(); } @@ -1939,7 +1948,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { Writer* last_writer = &w; if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions WriteBatch* updates = BuildBatchGroup(&last_writer); - WriteBatchInternal::SetSequence(updates, last_sequence + 1); + const SequenceNumber current_sequence = last_sequence + 1; + WriteBatchInternal::SetSequence(updates, current_sequence); int my_batch_count = WriteBatchInternal::Count(updates); last_sequence += my_batch_count; // Record statistics @@ -1972,6 +1982,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } mutex_.Lock(); } + last_flushed_sequence_ = current_sequence; if (updates == tmp_batch_) tmp_batch_->Clear(); versions_->SetLastSequence(last_sequence); diff --git a/db/db_impl.h b/db/db_impl.h index 1e9c3ae99..e5a2936cb 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -323,6 +323,10 @@ class DBImpl : public DB { // count of the number of contiguous delaying writes int delayed_writes_; + // store the last flushed sequence. + // Used by transaction log iterator. + SequenceNumber last_flushed_sequence_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/db/db_test.cc b/db/db_test.cc index 97243d42c..fcdb4917c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1471,7 +1471,7 @@ TEST(DBTest, CompactionFilter) { delete iter; // The sequence number of the remaining record - // is not zeroed out even though it is at the + // is not zeroed out even though it is at the // level Lmax because this record is at the tip count = 0; iter = dbfull()->TEST_NewInternalIterator(); @@ -2600,17 +2600,14 @@ TEST(DBTest, TransactionLogIterator) { unique_ptr iter; Status status = dbfull()->GetUpdatesSince(0, &iter); ASSERT_TRUE(status.ok()); - ASSERT_TRUE(!iter->Valid()); - iter->Next(); + ASSERT_TRUE(iter->Valid()); int i = 0; SequenceNumber lastSequence = 0; while (iter->Valid()) { - WriteBatch batch; - SequenceNumber current; - iter->GetBatch(&batch, ¤t); - ASSERT_TRUE(current > lastSequence); + BatchResult res = iter->GetBatch(); + ASSERT_TRUE(res.sequence > lastSequence); ++i; - lastSequence = current; + lastSequence = res.sequence; ASSERT_TRUE(iter->status().ok()); iter->Next(); } @@ -2626,16 +2623,13 @@ TEST(DBTest, TransactionLogIterator) { unique_ptr iter; Status status = dbfull()->GetUpdatesSince(0, &iter); ASSERT_TRUE(status.ok()); - ASSERT_TRUE(!iter->Valid()); - iter->Next(); + ASSERT_TRUE(iter->Valid()); int i = 0; SequenceNumber lastSequence = 0; while (iter->Valid()) { - WriteBatch batch; - SequenceNumber current; - iter->GetBatch(&batch, ¤t); - ASSERT_TRUE(current > lastSequence); - lastSequence = current; + BatchResult res = iter->GetBatch(); + ASSERT_TRUE(res.sequence > lastSequence); + lastSequence = res.sequence; ASSERT_TRUE(iter->status().ok()); iter->Next(); ++i; diff --git a/db/transaction_log_iterator_impl.cc b/db/transaction_log_iterator_impl.cc index 1077835bc..70ec8c9db 100644 --- a/db/transaction_log_iterator_impl.cc +++ b/db/transaction_log_iterator_impl.cc @@ -4,18 +4,21 @@ namespace leveldb { TransactionLogIteratorImpl::TransactionLogIteratorImpl( - const std::string& dbname, - const Options* options, - SequenceNumber& seq, - std::vector* files) : + const std::string& dbname, + const Options* options, + SequenceNumber& seq, + std::vector* files, + SequenceNumber const * const lastFlushedSequence) : dbname_(dbname), options_(options), sequenceNumber_(seq), files_(files), started_(false), isValid_(true), - currentFileIndex_(0) { + currentFileIndex_(0), + lastFlushedSequence_(lastFlushedSequence) { assert(files_ != nullptr); + assert(lastFlushedSequence_); } LogReporter @@ -29,8 +32,7 @@ TransactionLogIteratorImpl::NewLogReporter(const uint64_t logNumber) { Status TransactionLogIteratorImpl::OpenLogFile( const LogFile& logFile, - unique_ptr* file) -{ + unique_ptr* file) { Env* env = options_->env; if (logFile.type == kArchivedLogFile) { std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber); @@ -44,7 +46,6 @@ Status TransactionLogIteratorImpl::OpenLogFile( fname = ArchivedLogFileName(dbname_, logFile.logNumber); status = env->NewSequentialFile(fname, file); if (!status.ok()) { - // TODO stringprintf return Status::IOError(" Requested file not present in the dir"); } } @@ -52,11 +53,12 @@ Status TransactionLogIteratorImpl::OpenLogFile( } } -void TransactionLogIteratorImpl::GetBatch(WriteBatch* batch, - SequenceNumber* seq) { +BatchResult TransactionLogIteratorImpl::GetBatch() { assert(isValid_); // cannot call in a non valid state. - WriteBatchInternal::SetContents(batch, currentRecord_); - *seq = WriteBatchInternal::Sequence(batch); + BatchResult result; + result.sequence = currentSequence_; + result.writeBatchPtr = std::move(currentBatch_); + return result; } Status TransactionLogIteratorImpl::status() { @@ -73,16 +75,21 @@ void TransactionLogIteratorImpl::Next() { LogReporter reporter = NewLogReporter(currentLogFile.logNumber); std::string scratch; Slice record; + if (!started_) { + isValid_ = false; + if (sequenceNumber_ > *lastFlushedSequence_) { + currentStatus_ = Status::IOError("Looking for a sequence, " + "which is not flushed yet."); + return; + } unique_ptr file; Status status = OpenLogFile(currentLogFile, &file); if (!status.ok()) { - isValid_ = false; currentStatus_ = status; return; } assert(file); - WriteBatch batch; unique_ptr reader( new log::Reader(std::move(file), &reporter, true, 0)); assert(reader); @@ -92,11 +99,10 @@ void TransactionLogIteratorImpl::Next() { record.size(), Status::Corruption("log record too small")); continue; } - WriteBatchInternal::SetContents(&batch, record); - SequenceNumber currentNum = WriteBatchInternal::Sequence(&batch); - if (currentNum >= sequenceNumber_) { + UpdateCurrentWriteBatch(record); + if (currentSequence_ >= sequenceNumber_) { + assert(currentSequence_ < *lastFlushedSequence_); isValid_ = true; - currentRecord_ = record; currentLogReader_ = std::move(reader); break; } @@ -112,13 +118,21 @@ void TransactionLogIteratorImpl::Next() { LOOK_NEXT_FILE: assert(currentLogReader_); bool openNextFile = true; - while (currentLogReader_->ReadRecord(&record, &scratch)) { + + if (currentSequence_ == *lastFlushedSequence_) { + // The last update has been read. and next is being called. + isValid_ = false; + currentStatus_ = Status::OK(); + } + + while (currentLogReader_->ReadRecord(&record, &scratch) && + currentSequence_ < *lastFlushedSequence_) { if (record.size() < 12) { reporter.Corruption( record.size(), Status::Corruption("log record too small")); continue; } else { - currentRecord_ = record; + UpdateCurrentWriteBatch(record); openNextFile = false; break; } @@ -148,4 +162,11 @@ LOOK_NEXT_FILE: } } +void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { + WriteBatch* batch = new WriteBatch(); + WriteBatchInternal::SetContents(batch, record); + currentSequence_ = WriteBatchInternal::Sequence(batch); + currentBatch_.reset(batch); +} + } // namespace leveldb diff --git a/db/transaction_log_iterator_impl.h b/db/transaction_log_iterator_impl.h index 7b0b7723f..c1e541e5d 100644 --- a/db/transaction_log_iterator_impl.h +++ b/db/transaction_log_iterator_impl.h @@ -28,7 +28,9 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { TransactionLogIteratorImpl(const std::string& dbname, const Options* options, SequenceNumber& seqNum, - std::vector* files); + std::vector* files, + SequenceNumber const * const lastFlushedSequence); + virtual ~TransactionLogIteratorImpl() { // TODO move to cc file. delete files_; @@ -40,7 +42,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { virtual Status status(); - virtual void GetBatch(WriteBatch* batch, SequenceNumber* seq); + virtual BatchResult GetBatch(); private: const std::string& dbname_; @@ -51,10 +53,15 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { bool isValid_; // not valid when it starts of. Status currentStatus_; size_t currentFileIndex_; - Slice currentRecord_; + std::unique_ptr currentBatch_; unique_ptr currentLogReader_; Status OpenLogFile(const LogFile& logFile, unique_ptr* file); LogReporter NewLogReporter(uint64_t logNumber); + SequenceNumber const * const lastFlushedSequence_; + // represents the sequence number being read currently. + SequenceNumber currentSequence_; + + void UpdateCurrentWriteBatch(const Slice& record); }; diff --git a/include/leveldb/transaction_log_iterator.h b/include/leveldb/transaction_log_iterator.h index a4bb99aa7..d755d829c 100644 --- a/include/leveldb/transaction_log_iterator.h +++ b/include/leveldb/transaction_log_iterator.h @@ -8,6 +8,11 @@ namespace leveldb { +struct BatchResult { + SequenceNumber sequence; + std::unique_ptr writeBatchPtr; +}; + // A TransactionLogIterator is used to iterate over the Transaction's in a db. class TransactionLogIterator { public: @@ -16,19 +21,21 @@ class TransactionLogIterator { // An iterator is either positioned at a WriteBatch or not valid. // This method returns true if the iterator is valid. + // Can read data from a valid iterator. virtual bool Valid() = 0; // Moves the iterator to the next WriteBatch. // REQUIRES: Valid() to be true. virtual void Next() = 0; - // Return's ok if the iterator is in a valid stated. - // Return the Error Status when the iterator is not Valid. + // Return's ok if the iterator is valid. + // Return the Error when something has gone wrong. virtual Status status() = 0; // If valid return's the current write_batch and the sequence number of the // latest transaction contained in the batch. - virtual void GetBatch(WriteBatch* batch, SequenceNumber* seq) = 0; + // ONLY use if Valid() is true and status() is OK. + virtual BatchResult GetBatch() = 0; }; } // namespace leveldb diff --git a/tools/db_repl_stress.cc b/tools/db_repl_stress.cc index 582e567ee..012a3df01 100644 --- a/tools/db_repl_stress.cc +++ b/tools/db_repl_stress.cc @@ -56,24 +56,23 @@ static void ReplicationThreadBody(void* arg) { DB* db = t->db; unique_ptr iter; SequenceNumber currentSeqNum = 0; - while (t->stop.Acquire_Load() != NULL) { + while (t->stop.Acquire_Load() != nullptr) { if (!iter) { db->GetUpdatesSince(currentSeqNum, &iter); fprintf(stdout, "Refreshing iterator\n"); iter->Next(); while(iter->Valid()) { - WriteBatch batch; - SequenceNumber seq; - iter->GetBatch(&batch, &seq); - if (seq != currentSeqNum +1 && seq != currentSeqNum) { + BatchResult res = iter->GetBatch(); + if (res.sequence != currentSeqNum +1 + && res.sequence != currentSeqNum) { fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n", currentSeqNum, - seq); + res.sequence); exit(1); } - currentSeqNum = seq; - t->latest = seq; + currentSeqNum = res.sequence; + t->latest = res.sequence; iter->Next(); t->no_read++; } @@ -132,7 +131,7 @@ int main(int argc, const char** argv) { while(dataPump.is_running) { continue; } - replThread.stop.Release_Store(NULL); + replThread.stop.Release_Store(nullptr); if ( replThread.no_read < dataPump.no_records ) { // no. read should be => than inserted. fprintf(stderr, "No. of Record's written and read not same\nRead : %ld"