Features in Transaction log iterator

Summary:
* Logstore requests a valid change of reutrning an empty iterator and not an error in case of no log files.
* Changed the code to return the writebatch containing the sequence number requested from GetupdatesSince even if it lies in the middle. Earlier we used to return the next writebatch,. This also allows me oto guarantee that no files played upon by the iterator are redundant. I mean the starting log file has at least a sequence number >= the sequence number requested form GetupdatesSince.
* Cleaned up redundant logic in Iterator::Next and made a new function SeekToStartSequence for greater readability and maintainibilty.
* Modified a test in db_test accordingly
Please check the logic carefully and suggest improvements. I have a separate patch out for more improvements like restricting reader to read till written sequences.

Test Plan:
* transaction log iterator tests in db_test,
* db_repl_stress.
* rocks_log_iterator_test in fbcode/wormhole/rocksdb/test - 2 tests thriving on hacks till now can get simplified
* testing on the shadow setup for sigma with replication

Reviewers: dhruba, haobo, kailiu, sdong

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13437
main
Mayank Agarwal 12 years ago
parent 86ef6c3f74
commit fe3713961e
  1. 8
      db/db_impl.cc
  2. 5
      db/db_test.cc
  3. 95
      db/transaction_log_impl.cc
  4. 6
      db/transaction_log_impl.h
  5. 4
      include/rocksdb/transaction_log.h

@ -1117,11 +1117,6 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
return s; return s;
} }
if (wal_files->empty()) {
return Status::IOError(" NO WAL Files present in the db");
}
// std::shared_ptr would have been useful here.
s = RetainProbableWalFiles(*wal_files, seq); s = RetainProbableWalFiles(*wal_files, seq);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -1133,8 +1128,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
seq, seq,
std::move(wal_files), std::move(wal_files),
&last_flushed_sequence_)); &last_flushed_sequence_));
iter->get()->Next(); return (*iter)->status();
return iter->get()->status();
} }
Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs, Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,

@ -3563,7 +3563,8 @@ TEST(DBTest, TransactionLogIteratorJustEmptyFile) {
DestroyAndReopen(&options); DestroyAndReopen(&options);
unique_ptr<TransactionLogIterator> iter; unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(0, &iter); Status status = dbfull()->GetUpdatesSince(0, &iter);
ASSERT_TRUE(!status.ok()); // Check that an empty iterator is returned
ASSERT_TRUE(!iter->Valid());
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
@ -3594,7 +3595,7 @@ TEST(DBTest, TransactionLogIteratorBatchOperations) {
Reopen(&options); Reopen(&options);
Put("key4", DummyString(1024)); Put("key4", DummyString(1024));
auto iter = OpenTransactionLogIter(3); auto iter = OpenTransactionLogIter(3);
ExpectRecords(1, iter); ExpectRecords(2, iter);
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }

@ -20,10 +20,11 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
currentFileIndex_(0), currentFileIndex_(0),
lastFlushedSequence_(lastFlushedSequence) { lastFlushedSequence_(lastFlushedSequence) {
assert(startingSequenceNumber_ <= *lastFlushedSequence_); assert(startingSequenceNumber_ <= *lastFlushedSequence_);
assert(files_.get() != nullptr); assert(files_ != nullptr);
reporter_.env = options_->env; reporter_.env = options_->env;
reporter_.info_log = options_->info_log.get(); reporter_.info_log = options_->info_log.get();
SeekToStartSequence(); // Seek till starting sequence
} }
Status TransactionLogIteratorImpl::OpenLogFile( Status TransactionLogIteratorImpl::OpenLogFile(
@ -52,7 +53,7 @@ Status TransactionLogIteratorImpl::OpenLogFile(
BatchResult TransactionLogIteratorImpl::GetBatch() { BatchResult TransactionLogIteratorImpl::GetBatch() {
assert(isValid_); // cannot call in a non valid state. assert(isValid_); // cannot call in a non valid state.
BatchResult result; BatchResult result;
result.sequence = currentSequence_; result.sequence = currentBatchSeq_;
result.writeBatchPtr = std::move(currentBatch_); result.writeBatchPtr = std::move(currentBatch_);
return result; return result;
} }
@ -65,24 +66,21 @@ bool TransactionLogIteratorImpl::Valid() {
return started_ && isValid_; return started_ && isValid_;
} }
void TransactionLogIteratorImpl::Next() { void TransactionLogIteratorImpl::SeekToStartSequence() {
LogFile* currentLogFile = files_.get()->at(currentFileIndex_).get(); std::string scratch;
Slice record;
// First seek to the given seqNo. in the current file.
std::string scratch;
Slice record;
if (!started_) {
started_ = true; // this piece only runs onced.
isValid_ = false; isValid_ = false;
if (startingSequenceNumber_ > *lastFlushedSequence_) { if (startingSequenceNumber_ > *lastFlushedSequence_) {
currentStatus_ = Status::IOError("Looking for a sequence, " currentStatus_ = Status::IOError("Looking for a sequence, "
"which is not flushed yet."); "which is not flushed yet.");
return;
}
if (files_->size() == 0) {
return; return;
} }
Status s = OpenLogReader(currentLogFile); Status s = OpenLogReader(files_->at(0).get());
if (!s.ok()) { if (!s.ok()) {
currentStatus_ = s; currentStatus_ = s;
isValid_ = false;
return; return;
} }
while (currentLogReader_->ReadRecord(&record, &scratch)) { while (currentLogReader_->ReadRecord(&record, &scratch)) {
@ -92,23 +90,39 @@ void TransactionLogIteratorImpl::Next() {
continue; continue;
} }
UpdateCurrentWriteBatch(record); UpdateCurrentWriteBatch(record);
if (currentSequence_ >= startingSequenceNumber_) { if (currentBatchSeq_ + currentBatchCount_ - 1 >=
assert(currentSequence_ <= *lastFlushedSequence_); startingSequenceNumber_) {
assert(currentBatchSeq_ <= *lastFlushedSequence_);
isValid_ = true; isValid_ = true;
break; started_ = true; // set started_ as we could seek till starting sequence
return;
} else { } else {
isValid_ = false; isValid_ = false;
} }
} }
if (isValid_) { // Could not find start sequence in first file. Normally this must be the
// Done for this iteration // only file. Otherwise log the error and let the iterator return next entry
return; if (files_->size() != 1) {
currentStatus_ = Status::Corruption("Start sequence was not found, "
"skipping to the next available");
reporter_.Corruption(0, currentStatus_);
started_ = true; // Let Next find next available entry
Next();
} }
}
void TransactionLogIteratorImpl::Next() {
// TODO:Next() says that it requires Valid to be true but this is not true
// assert(Valid());
std::string scratch;
Slice record;
isValid_ = false;
if (!started_) { // Runs every time until we can seek to the start sequence
return SeekToStartSequence();
} }
bool openNextFile = true; while(true) {
while(openNextFile) {
assert(currentLogReader_); assert(currentLogReader_);
if (currentSequence_ < *lastFlushedSequence_) { if (currentBatchSeq_ < *lastFlushedSequence_) {
if (currentLogReader_->IsEOF()) { if (currentLogReader_->IsEOF()) {
currentLogReader_->UnmarkEOF(); currentLogReader_->UnmarkEOF();
} }
@ -119,30 +133,28 @@ void TransactionLogIteratorImpl::Next() {
continue; continue;
} else { } else {
UpdateCurrentWriteBatch(record); UpdateCurrentWriteBatch(record);
openNextFile = false; return;
break;
} }
} }
} }
if (openNextFile) { // Open the next file
if (currentFileIndex_ < files_.get()->size() - 1) { if (currentFileIndex_ < files_->size() - 1) {
++currentFileIndex_; ++currentFileIndex_;
Status status =OpenLogReader(files_.get()->at(currentFileIndex_).get()); Status status =OpenLogReader(files_->at(currentFileIndex_).get());
if (!status.ok()) { if (!status.ok()) {
isValid_ = false;
currentStatus_ = status;
return;
}
} else {
isValid_ = false; isValid_ = false;
openNextFile = false; currentStatus_ = status;
if (currentSequence_ == *lastFlushedSequence_) { return;
currentStatus_ = Status::OK(); }
} else { } else {
currentStatus_ = Status::IOError(" NO MORE DATA LEFT"); isValid_ = false;
} if (currentBatchSeq_ == *lastFlushedSequence_) {
currentStatus_ = Status::OK();
} else {
currentStatus_ = Status::IOError(" NO MORE DATA LEFT");
} }
return;
} }
} }
} }
@ -150,7 +162,8 @@ void TransactionLogIteratorImpl::Next() {
void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
WriteBatch* batch = new WriteBatch(); WriteBatch* batch = new WriteBatch();
WriteBatchInternal::SetContents(batch, record); WriteBatchInternal::SetContents(batch, record);
currentSequence_ = WriteBatchInternal::Sequence(batch); currentBatchSeq_ = WriteBatchInternal::Sequence(batch);
currentBatchCount_ = WriteBatchInternal::Count(batch);
currentBatch_.reset(batch); currentBatch_.reset(batch);
isValid_ = true; isValid_ = true;
currentStatus_ = Status::OK(); currentStatus_ = Status::OK();

@ -88,9 +88,11 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
Status OpenLogFile(const LogFile* logFile, unique_ptr<SequentialFile>* file); Status OpenLogFile(const LogFile* logFile, unique_ptr<SequentialFile>* file);
LogReporter reporter_; LogReporter reporter_;
SequenceNumber const * const lastFlushedSequence_; SequenceNumber const * const lastFlushedSequence_;
// represents the sequence number being read currently.
SequenceNumber currentSequence_;
SequenceNumber currentBatchSeq_; // sequence number at start of current batch
uint64_t currentBatchCount_; // count in current batch
void SeekToStartSequence();
void UpdateCurrentWriteBatch(const Slice& record); void UpdateCurrentWriteBatch(const Slice& record);
Status OpenLogReader(const LogFile* file); Status OpenLogReader(const LogFile* file);
}; };

@ -67,8 +67,8 @@ class TransactionLogIterator {
// REQUIRES: Valid() to be true. // REQUIRES: Valid() to be true.
virtual void Next() = 0; virtual void Next() = 0;
// Return's ok if the iterator is valid. // Returns ok if the iterator is valid.
// Return the Error when something has gone wrong. // Returns the Error when something has gone wrong.
virtual Status status() = 0; virtual Status status() = 0;
// If valid return's the current write_batch and the sequence number of the // If valid return's the current write_batch and the sequence number of the

Loading…
Cancel
Save