Making the transaction log iterator more robust

Summary:
strict essentially means that we MUST find the startsequence. Thus we should return if starteSequence is not found in the first file in case strict is set. This will take care of ending the iterator in case of permanent gaps due to corruptions in the log files
Also created NextImpl function that will have internal variable to distinguish whether Next is being called from StartSequence or by application.
Set NotFoudn::gaps status to give an indication of gaps happeneing.
Polished the inline documentation at various places

Test Plan:
* db_repl_stress test
* db_test relating to transaction log iterator
* fbcode/wormhole/rocksdb/rocks_log_iterator
* sigma production machine sigmafio032.prn1

Reviewers: dhruba

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13689
main
Mayank Agarwal 11 years ago
parent b4ad5e89ae
commit f837f5b1c9
  1. 11
      db/db_impl.cc
  2. 6
      db/db_impl.h
  3. 56
      db/db_test.cc
  4. 92
      db/transaction_log_impl.cc
  5. 16
      db/transaction_log_impl.h
  6. 22
      include/rocksdb/db.h
  7. 6
      include/rocksdb/transaction_log.h
  8. 2
      include/utilities/stackable_db.h
  9. 2
      utilities/ttl/db_ttl.cc
  10. 2
      utilities/ttl/db_ttl.h

@ -274,7 +274,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
flush_on_destroy_(false),
stats_(options.num_levels),
delayed_writes_(0),
last_flushed_sequence_(0),
storage_options_(options),
bg_work_gate_closed_(false),
refitting_level_(false) {
@ -743,9 +742,6 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
if (s.ok()) {
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
last_flushed_sequence_ = max_sequence;
} else {
last_flushed_sequence_ = versions_->LastSequence();
}
SetTickerCount(options_.statistics, SEQUENCE_NUMBER,
versions_->LastSequence());
@ -1180,7 +1176,7 @@ Status DBImpl::Flush(const FlushOptions& options) {
return status;
}
SequenceNumber DBImpl::GetLatestSequenceNumber() {
SequenceNumber DBImpl::GetLatestSequenceNumber() const {
return versions_->LastSequence();
}
@ -1188,7 +1184,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
unique_ptr<TransactionLogIterator>* iter) {
RecordTick(options_.statistics, GET_UPDATES_SINCE_CALLS);
if (seq > last_flushed_sequence_) {
if (seq > versions_->LastSequence()) {
return Status::IOError("Requested sequence not yet written in the db");
}
// Get all sorted Wal Files.
@ -1210,7 +1206,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
storage_options_,
seq,
std::move(wal_files),
&last_flushed_sequence_));
this));
return (*iter)->status();
}
@ -2682,7 +2678,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
mutex_.Lock();
if (status.ok()) {
versions_->SetLastSequence(last_sequence);
last_flushed_sequence_ = current_sequence;
}
}
if (updates == &tmp_batch_) tmp_batch_.Clear();

@ -73,7 +73,7 @@ class DBImpl : public DB {
uint64_t* manifest_file_size,
bool flush_memtable = true);
virtual Status GetSortedWalFiles(VectorLogPtr& files);
virtual SequenceNumber GetLatestSequenceNumber();
virtual SequenceNumber GetLatestSequenceNumber() const;
virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter);
virtual Status DeleteFile(std::string name);
@ -402,10 +402,6 @@ class DBImpl : public DB {
// count of the number of contiguous delaying writes
int delayed_writes_;
// store the last flushed sequence.
// Used by transaction log iterator.
SequenceNumber last_flushed_sequence_;
// The options to access storage files
const EnvOptions storage_options_;

@ -660,7 +660,7 @@ class DBTest {
}
std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
const SequenceNumber seq) {
const SequenceNumber seq) {
unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(seq, &iter);
ASSERT_OK(status);
@ -3876,20 +3876,29 @@ TEST(DBTest, WALClear) {
} while (ChangeCompactOptions());
}
void ExpectRecords(
const int expected_no_records,
std::unique_ptr<TransactionLogIterator>& iter) {
int i = 0;
SequenceNumber ReadRecords(
std::unique_ptr<TransactionLogIterator>& iter,
int& count) {
count = 0;
SequenceNumber lastSequence = 0;
BatchResult res;
while (iter->Valid()) {
BatchResult res = iter->GetBatch();
res = iter->GetBatch();
ASSERT_TRUE(res.sequence > lastSequence);
++i;
++count;
lastSequence = res.sequence;
ASSERT_OK(iter->status());
iter->Next();
}
ASSERT_EQ(i, expected_no_records);
return res.sequence;
}
void ExpectRecords(
const int expected_no_records,
std::unique_ptr<TransactionLogIterator>& iter) {
int num_records;
ReadRecords(iter, num_records);
ASSERT_EQ(num_records, expected_no_records);
}
TEST(DBTest, TransactionLogIterator) {
@ -3976,6 +3985,35 @@ TEST(DBTest, TransactionLogIteratorCheckAfterRestart) {
} while (ChangeCompactOptions());
}
TEST(DBTest, TransactionLogIteratorCorruptedLog) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
for (int i = 0; i < 1024; i++) {
Put("key"+std::to_string(i), DummyString(10));
}
dbfull()->Flush(FlushOptions());
// Corrupt this log to create a gap
rocksdb::VectorLogPtr wal_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
const auto logfilePath = dbname_ + "/" + wal_files.front()->PathName();
ASSERT_EQ(
0,
truncate(logfilePath.c_str(), wal_files.front()->SizeFileBytes() / 2));
// Insert a new entry to a new log file
Put("key1025", DummyString(10));
// Try to read from the beginning. Should stop before the gap and read less
// than 1025 entries
auto iter = OpenTransactionLogIter(0);
int count;
int last_sequence_read = ReadRecords(iter, count);
ASSERT_LT(last_sequence_read, 1025);
// Try to read past the gap, should be able to seek to key1025
auto iter2 = OpenTransactionLogIter(last_sequence_read + 1);
ExpectRecords(1, iter2);
} while (ChangeCompactOptions());
}
TEST(DBTest, TransactionLogIteratorBatchOperations) {
do {
Options options = OptionsForLogIterTest();
@ -4329,7 +4367,7 @@ class ModelDB: public DB {
return Status::OK();
}
virtual SequenceNumber GetLatestSequenceNumber() {
virtual SequenceNumber GetLatestSequenceNumber() const {
return 0;
}
virtual Status GetUpdatesSince(rocksdb::SequenceNumber,

@ -14,7 +14,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const EnvOptions& soptions,
const SequenceNumber seq,
std::unique_ptr<VectorLogPtr> files,
SequenceNumber const * const lastFlushedSequence) :
DBImpl const * const dbimpl) :
dir_(dir),
options_(options),
soptions_(soptions),
@ -24,10 +24,10 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
isValid_(false),
currentFileIndex_(0),
currentBatchSeq_(0),
currentBatchCount_(0),
lastFlushedSequence_(lastFlushedSequence) {
assert(startingSequenceNumber_ <= *lastFlushedSequence_);
currentLastSeq_(0),
dbimpl_(dbimpl) {
assert(files_ != nullptr);
assert(dbimpl_ != nullptr);
reporter_.env = options_->env;
reporter_.info_log = options_->info_log.get();
@ -77,7 +77,7 @@ bool TransactionLogIteratorImpl::RestrictedRead(
Slice* record,
std::string* scratch) {
// Don't read if no more complete entries to read from logs
if (currentBatchSeq_ >= *lastFlushedSequence_) {
if (currentLastSeq_ >= dbimpl_->GetLatestSequenceNumber()) {
return false;
}
return currentLogReader_->ReadRecord(record, scratch);
@ -90,11 +90,6 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
Slice record;
started_ = false;
isValid_ = false;
if (startingSequenceNumber_ > *lastFlushedSequence_) {
currentStatus_ = Status::IOError("Looking for a sequence, "
"which is not flushed yet.");
return;
}
if (files_->size() <= startFileIndex) {
return;
}
@ -110,8 +105,7 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
continue;
}
UpdateCurrentWriteBatch(record);
if (currentBatchSeq_ + currentBatchCount_ - 1 >=
startingSequenceNumber_) {
if (currentLastSeq_ >= startingSequenceNumber_) {
if (strict && currentBatchSeq_ != startingSequenceNumber_) {
currentStatus_ = Status::Corruption("Gap in sequence number. Could not "
"seek to required sequence number");
@ -128,38 +122,57 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
isValid_ = false;
}
}
// Could not find start sequence in first file. Normally this must be the
// only file. Otherwise log the error and let the iterator return next entry
if (files_->size() != 1) {
// If strict is set, we want to seek exactly till the start sequence and it
// should have been present in the file we scanned above
if (strict) {
currentStatus_ = Status::Corruption("Gap in sequence number. Could not "
"seek to required sequence number");
reporter_.Info(currentStatus_.ToString().c_str());
} else if (files_->size() != 1) {
currentStatus_ = Status::Corruption("Start sequence was not found, "
"skipping to the next available");
reporter_.Corruption(0, currentStatus_);
started_ = true; // Let Next find next available entry
Next();
reporter_.Info(currentStatus_.ToString().c_str());
// Let NextImpl find the next available entry. started_ remains false
// because we don't want to check for gaps while moving to start sequence
NextImpl(true);
}
}
void TransactionLogIteratorImpl::Next() {
return NextImpl(false);
}
void TransactionLogIteratorImpl::NextImpl(bool internal) {
std::string scratch;
Slice record;
isValid_ = false;
if (!started_) { // Runs every time until we can seek to the start sequence
if (!internal && !started_) {
// Runs every time until we can seek to the start sequence
return SeekToStartSequence();
}
while(true) {
assert(currentLogReader_);
if (currentBatchSeq_ < *lastFlushedSequence_) {
if (currentLogReader_->IsEOF()) {
currentLogReader_->UnmarkEOF();
}
while (currentLogReader_->ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
reporter_.Corruption(
record.size(), Status::Corruption("very small log record"));
continue;
} else {
return UpdateCurrentWriteBatch(record);
if (currentLogReader_->IsEOF()) {
currentLogReader_->UnmarkEOF();
}
while (RestrictedRead(&record, &scratch)) {
if (record.size() < 12) {
reporter_.Corruption(
record.size(), Status::Corruption("very small log record"));
continue;
} else {
// started_ should be true if called by application
assert(internal || started_);
// started_ should be false if called internally
assert(!internal || !started_);
UpdateCurrentWriteBatch(record);
if (internal && !started_) {
started_ = true;
}
return;
}
}
@ -174,27 +187,27 @@ void TransactionLogIteratorImpl::Next() {
}
} else {
isValid_ = false;
if (currentBatchSeq_ == *lastFlushedSequence_) {
if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) {
currentStatus_ = Status::OK();
} else {
currentStatus_ = Status::IOError(" NO MORE DATA LEFT");
currentStatus_ = Status::IOError("NO MORE DATA LEFT");
}
return;
}
}
}
bool TransactionLogIteratorImpl::IsBatchContinuous(
bool TransactionLogIteratorImpl::IsBatchExpected(
const WriteBatch* batch,
const SequenceNumber expectedSeq) {
assert(batch);
SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
if (started_ && batchSeq != expectedSeq) {
if (batchSeq != expectedSeq) {
char buf[200];
snprintf(buf, sizeof(buf),
"Discontinuity in log records. Got seq=%lu, Expected seq=%lu, "
"Last flushed seq=%lu. Log iterator will seek the correct batch.",
batchSeq, expectedSeq, *lastFlushedSequence_);
"Last flushed seq=%lu.Log iterator will reseek the correct batch.",
batchSeq, expectedSeq, dbimpl_->GetLatestSequenceNumber());
reporter_.Info(buf);
return false;
}
@ -205,8 +218,9 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
WriteBatch* batch = new WriteBatch();
WriteBatchInternal::SetContents(batch, record);
SequenceNumber expectedSeq = currentBatchSeq_ + currentBatchCount_;
if (!IsBatchContinuous(batch, expectedSeq)) {
SequenceNumber expectedSeq = currentLastSeq_ + 1;
// If the iterator has started, then confirm that we get continuous batches
if (started_ && !IsBatchExpected(batch, expectedSeq)) {
// Seek to the batch having expected sequence number
if (expectedSeq < files_->at(currentFileIndex_)->StartSequence()) {
// Expected batch must lie in the previous log file
@ -214,13 +228,15 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
currentFileIndex_ = (currentFileIndex_ >= 0) ? currentFileIndex_ : 0;
}
startingSequenceNumber_ = expectedSeq;
// currentStatus_ will be set to Ok if reseek succeeds
currentStatus_ = Status::NotFound("Gap in sequence numbers");
return SeekToStartSequence(currentFileIndex_, true);
}
currentBatchSeq_ = WriteBatchInternal::Sequence(batch);
currentBatchCount_ = WriteBatchInternal::Count(batch);
currentLastSeq_ = currentBatchSeq_ + WriteBatchInternal::Count(batch) - 1;
// currentBatchSeq_ can only change here
assert(currentBatchSeq_ <= *lastFlushedSequence_);
assert(currentLastSeq_ <= dbimpl_->GetLatestSequenceNumber());
currentBatch_.reset(batch);
isValid_ = true;

@ -10,6 +10,7 @@
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/transaction_log.h"
#include "db/db_impl.h"
#include "db/log_reader.h"
#include "db/filename.h"
@ -70,7 +71,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
const EnvOptions& soptions,
const SequenceNumber seqNum,
std::unique_ptr<VectorLogPtr> files,
SequenceNumber const * const lastFlushedSequence);
DBImpl const * const dbimpl);
virtual bool Valid();
@ -95,16 +96,21 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
Status OpenLogFile(const LogFile* logFile, unique_ptr<SequentialFile>* file);
LogReporter reporter_;
SequenceNumber currentBatchSeq_; // sequence number at start of current batch
uint64_t currentBatchCount_; // count in current batch
SequenceNumber const * const lastFlushedSequence_;
SequenceNumber currentLastSeq_; // last sequence in the current batch
DBImpl const * const dbimpl_; // The db on whose log files this iterates
// Reads from transaction log only if the writebatch record has been written
bool RestrictedRead(Slice* record, std::string* scratch);
// Seeks to startingSequenceNumber reading from startFileIndex in files_.
// If strict is set,then must get a batch starting with startingSequenceNumber
void SeekToStartSequence(uint64_t startFileIndex = 0, bool strict = false);
// Check if batch is continuous starting from expectedSeq, else return false
bool IsBatchContinuous(const WriteBatch* batch, SequenceNumber expectedSeq);
// Implementation of Next. SeekToStartSequence calls it internally with
// internal=true to let it find next entry even if it has to jump gaps because
// the iterator may start off from the first available entry but promises to
// be continuous after that
void NextImpl(bool internal = false);
// Check if batch is expected, else return false
bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expectedSeq);
// Update current batch if a continuous batch is found, else return false
void UpdateCurrentWriteBatch(const Slice& record);
Status OpenLogReader(const LogFile* file);

@ -258,19 +258,15 @@ class DB {
virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0;
// The sequence number of the most recent transaction.
virtual SequenceNumber GetLatestSequenceNumber() = 0;
// Return's an iterator for all writes since the sequence number
// Status::ok if iterator is valid.
// The iterator internally holds references to the available log files.
// It automatically takes care of closing a file with no-updates left, and
// opening the next one.
// If the sequence number is non existent. it returns an iterator at a seq_no
// just greater than the requested seq_no.
// Must set WAL_ttl_seconds to a large value to use this api.
// else the WAL files will get
// cleared aggressively and the iterator might keep getting invalid before
// an update is read.
virtual SequenceNumber GetLatestSequenceNumber() const = 0;
// Sets iter to an iterator that is positioned at a write-batch containing
// seq_number. If the sequence number is non existent, it returns an iterator
// at the first available seq_no after the requested seq_no
// Returns Status::Ok if iterator is valid
// Must set WAL_ttl_seconds to a large value to use this api, else the WAL
// files will get cleared aggressively and the iterator might keep getting
// invalid before an update is read.
virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter) = 0;

@ -54,7 +54,9 @@ struct BatchResult {
std::unique_ptr<WriteBatch> writeBatchPtr;
};
// A TransactionLogIterator is used to iterate over the Transaction's in a db.
// A TransactionLogIterator is used to iterate over the transactions in a db.
// One run of the iterator is continuous, i.e. the iterator will stop at the
// beginning of any gap in sequences
class TransactionLogIterator {
public:
TransactionLogIterator() {}
@ -74,7 +76,7 @@ class TransactionLogIterator {
virtual Status status() = 0;
// If valid return's the current write_batch and the sequence number of the
// latest transaction contained in the batch.
// earliest transaction contained in the batch.
// ONLY use if Valid() is true and status() is OK.
virtual BatchResult GetBatch() = 0;
};

@ -136,7 +136,7 @@ class StackableDB : public DB {
return sdb_->GetLiveFiles(vec, mfs, flush_memtable);
}
virtual SequenceNumber GetLatestSequenceNumber() override {
virtual SequenceNumber GetLatestSequenceNumber() const override {
return sdb_->GetLatestSequenceNumber();
}

@ -272,7 +272,7 @@ Status DBWithTTL::GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs,
return db_->GetLiveFiles(vec, mfs, flush_memtable);
}
SequenceNumber DBWithTTL::GetLatestSequenceNumber() {
SequenceNumber DBWithTTL::GetLatestSequenceNumber() const {
return db_->GetLatestSequenceNumber();
}

@ -80,7 +80,7 @@ class DBWithTTL : public StackableDB {
virtual Status DeleteFile(std::string name);
virtual SequenceNumber GetLatestSequenceNumber();
virtual SequenceNumber GetLatestSequenceNumber() const;
virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter);

Loading…
Cancel
Save