[RocksDB][Bug] Look at all the files, not just the first file in TransactionLogIter as BatchWrites can leave it in Limbo

Summary:
Transaction Log Iterator did not move to the next file in the series if there was a write batch at the end of the currentFile.
The solution is if the last seq no. of the current file is < RequestedSeqNo. Assume the first seqNo. of the next file has to satisfy the request.

Also major refactoring around the code. Moved opening the logreader to a seperate function, got rid of goto.

Test Plan: added a unit test for it.

Reviewers: dhruba, heyongqiang

Reviewed By: heyongqiang

CC: leveldb, emayanke

Differential Revision: https://reviews.facebook.net/D10029
main
Abhishek Kona 12 years ago
parent 9b3134f5ca
commit 574b76f710
  1. 2
      db/db_impl.cc
  2. 16
      db/db_test.cc
  3. 78
      db/transaction_log_iterator_impl.cc
  4. 3
      db/transaction_log_iterator_impl.h

@ -907,7 +907,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
probableWALFiles, probableWALFiles,
&last_flushed_sequence_)); &last_flushed_sequence_));
iter->get()->Next(); iter->get()->Next();
return Status::OK(); return iter->get()->status();
} }
Status DBImpl::FindProbableWALFiles(std::vector<LogFile>* const allLogs, Status DBImpl::FindProbableWALFiles(std::vector<LogFile>* const allLogs,

@ -2700,6 +2700,22 @@ TEST(DBTest, TransactionLogIteratorCheckAfterRestart) {
ExpectRecords(2, iter); ExpectRecords(2, iter);
} }
TEST(DBTest, TransactionLogIteratorBatchOperations) {
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
WriteBatch batch;
batch.Put("key1", DummyString(1024));
batch.Put("key2", DummyString(1024));
batch.Put("key3", DummyString(1024));
batch.Delete("key2");
dbfull()->Write(WriteOptions(), &batch);
dbfull()->Flush(FlushOptions());
Reopen(&options);
Put("key4", DummyString(1024));
auto iter = OpenTransactionLogIter(3);
ExpectRecords(1, iter);
}
TEST(DBTest, ReadCompaction) { TEST(DBTest, ReadCompaction) {
std::string value(4096, '4'); // a string of size 4K std::string value(4096, '4'); // a string of size 4K
{ {

@ -14,10 +14,10 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
dbname_(dbname), dbname_(dbname),
options_(options), options_(options),
soptions_(soptions), soptions_(soptions),
sequenceNumber_(seq), startingSequenceNumber_(seq),
files_(files), files_(files),
started_(false), started_(false),
isValid_(true), isValid_(false),
currentFileIndex_(0), currentFileIndex_(0),
lastFlushedSequence_(lastFlushedSequence) { lastFlushedSequence_(lastFlushedSequence) {
assert(files_ != nullptr); assert(files_ != nullptr);
@ -73,54 +73,49 @@ bool TransactionLogIteratorImpl::Valid() {
} }
void TransactionLogIteratorImpl::Next() { void TransactionLogIteratorImpl::Next() {
// First seek to the given seqNo. in the current file.
LogFile currentLogFile = files_->at(currentFileIndex_); LogFile currentLogFile = files_->at(currentFileIndex_);
LogReporter reporter = NewLogReporter(currentLogFile.logNumber); LogReporter reporter = NewLogReporter(currentLogFile.logNumber);
// First seek to the given seqNo. in the current file.
std::string scratch; std::string scratch;
Slice record; Slice record;
if (!started_) { if (!started_) {
started_ = true; // this piece only runs onced.
isValid_ = false; isValid_ = false;
if (sequenceNumber_ > *lastFlushedSequence_) { if (startingSequenceNumber_ > *lastFlushedSequence_) {
currentStatus_ = Status::IOError("Looking for a sequence, " currentStatus_ = Status::IOError("Looking for a sequence, "
"which is not flushed yet."); "which is not flushed yet.");
return; return;
} }
unique_ptr<SequentialFile> file; Status s = OpenLogReader(currentLogFile);
Status status = OpenLogFile(currentLogFile, &file); if (!s.ok()) {
if (!status.ok()) { currentStatus_ = s;
currentStatus_ = status; isValid_ = false;
return; return;
} }
assert(file); while (currentLogReader_->ReadRecord(&record, &scratch)) {
unique_ptr<log::Reader> reader(
new log::Reader(std::move(file), &reporter, true, 0));
assert(reader);
while (reader->ReadRecord(&record, &scratch)) {
if (record.size() < 12) { if (record.size() < 12) {
reporter.Corruption( reporter.Corruption(
record.size(), Status::Corruption("log record too small")); record.size(), Status::Corruption("log record too small"));
continue; continue;
} }
UpdateCurrentWriteBatch(record); UpdateCurrentWriteBatch(record);
if (currentSequence_ >= sequenceNumber_) { if (currentSequence_ >= startingSequenceNumber_) {
assert(currentSequence_ <= *lastFlushedSequence_); assert(currentSequence_ <= *lastFlushedSequence_);
isValid_ = true; isValid_ = true;
currentLogReader_ = std::move(reader);
break; break;
} else {
isValid_ = false;
} }
} }
if (!isValid_) { if (isValid_) {
// TODO read the entire first file. and did not find the seq number. // Done for this iteration
// Error out. return;
currentStatus_ =
Status::NotFound("Did not find the Seq no. in first file");
} }
started_ = true; }
} else { bool openNextFile = true;
LOOK_NEXT_FILE: while(openNextFile) {
assert(currentLogReader_); assert(currentLogReader_);
bool openNextFile = true;
if (currentSequence_ < *lastFlushedSequence_) { if (currentSequence_ < *lastFlushedSequence_) {
if (currentLogReader_->IsEOF()) { if (currentLogReader_->IsEOF()) {
currentLogReader_->UnmarkEOF(); currentLogReader_->UnmarkEOF();
@ -141,28 +136,22 @@ LOOK_NEXT_FILE:
if (openNextFile) { if (openNextFile) {
if (currentFileIndex_ < files_->size() - 1) { if (currentFileIndex_ < files_->size() - 1) {
++currentFileIndex_; ++currentFileIndex_;
currentLogReader_.reset(); Status status = OpenLogReader(files_->at(currentFileIndex_));
unique_ptr<SequentialFile> file;
Status status = OpenLogFile(files_->at(currentFileIndex_), &file);
if (!status.ok()) { if (!status.ok()) {
isValid_ = false; isValid_ = false;
currentStatus_ = status; currentStatus_ = status;
return; return;
} }
currentLogReader_.reset(
new log::Reader(std::move(file), &reporter, true, 0));
goto LOOK_NEXT_FILE;
} else if (currentSequence_ == *lastFlushedSequence_) {
// The last update has been read. and next is being called.
isValid_ = false;
currentStatus_ = Status::OK();
} else { } else {
// LOOKED AT FILES. WE ARE DONE HERE.
isValid_ = false; isValid_ = false;
currentStatus_ = Status::IOError(" NO MORE DATA LEFT"); openNextFile = false;
if (currentSequence_ == *lastFlushedSequence_) {
currentStatus_ = Status::OK();
} else {
currentStatus_ = Status::IOError(" NO MORE DATA LEFT");
}
} }
} }
} }
} }
@ -175,4 +164,17 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
currentStatus_ = Status::OK(); currentStatus_ = Status::OK();
} }
Status TransactionLogIteratorImpl::OpenLogReader(const LogFile& logFile) {
LogReporter reporter = NewLogReporter(logFile.logNumber);
unique_ptr<SequentialFile> file;
Status status = OpenLogFile(logFile, &file);
if (!status.ok()) {
return status;
}
assert(file);
currentLogReader_.reset(
new log::Reader(std::move(file), &reporter, true, 0)
);
return Status::OK();
}
} // namespace leveldb } // namespace leveldb

@ -50,7 +50,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
const std::string& dbname_; const std::string& dbname_;
const Options* options_; const Options* options_;
const StorageOptions& soptions_; const StorageOptions& soptions_;
const uint64_t sequenceNumber_; const uint64_t startingSequenceNumber_;
const std::vector<LogFile>* files_; const std::vector<LogFile>* files_;
bool started_; bool started_;
bool isValid_; // not valid when it starts of. bool isValid_; // not valid when it starts of.
@ -65,6 +65,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
SequenceNumber currentSequence_; SequenceNumber currentSequence_;
void UpdateCurrentWriteBatch(const Slice& record); void UpdateCurrentWriteBatch(const Slice& record);
Status OpenLogReader(const LogFile& file);
}; };

Loading…
Cancel
Save