Do not allow Transaction Log Iterator to fall ahead when writer is writing the same file

Summary:
Store the last flushed, seq no. in db_impl. Check against it in
transaction Log iterator. Do not attempt to read ahead if we do not know
if the data is flushed completely.
Does not work if flush is disabled. Any ideas on fixing that?
* Minor change, iter->Next is called the first time automatically for
* the first time.

Test Plan:
existing test pass.
More ideas on testing this?
Planning to run some stress test.

Reviewers: dhruba, heyongqiang

CC: leveldb

Differential Revision: https://reviews.facebook.net/D9087
main
Abhishek Kona 12 years ago
parent afed60938f
commit d68880a1b9
  1. 19
      db/db_impl.cc
  2. 4
      db/db_impl.h
  3. 22
      db/db_test.cc
  4. 61
      db/transaction_log_iterator_impl.cc
  5. 13
      db/transaction_log_iterator_impl.h
  6. 13
      include/leveldb/transaction_log_iterator.h
  7. 17
      tools/db_repl_stress.cc

@ -161,7 +161,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
stall_level0_num_files_(0), stall_level0_num_files_(0),
started_at_(options.env->NowMicros()), started_at_(options.env->NowMicros()),
flush_on_destroy_(false), flush_on_destroy_(false),
delayed_writes_(0) { delayed_writes_(0),
last_flushed_sequence_(0) {
mem_->Ref(); mem_->Ref();
env_->GetAbsolutePath(dbname, &db_absolute_path_); env_->GetAbsolutePath(dbname, &db_absolute_path_);
@ -547,12 +549,14 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
std::sort(logs.begin(), logs.end()); std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) { for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], edit, &max_sequence, external_table); s = RecoverLogFile(logs[i], edit, &max_sequence, external_table);
// The previous incarnation may not have written any MANIFEST // The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually // records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet. // update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]); versions_->MarkFileNumberUsed(logs[i]);
} }
// This could be the last_flushed_sequence as the next sequences will be
// greater than this.
last_flushed_sequence_ = max_sequence;
if (s.ok()) { if (s.ok()) {
if (versions_->LastSequence() < max_sequence) { if (versions_->LastSequence() < max_sequence) {
@ -899,7 +903,12 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
return s; return s;
} }
iter->reset( iter->reset(
new TransactionLogIteratorImpl(dbname_, &options_, seq, probableWALFiles)); new TransactionLogIteratorImpl(dbname_,
&options_,
seq,
probableWALFiles,
&last_flushed_sequence_));
iter->get()->Next();
return Status::OK(); return Status::OK();
} }
@ -1939,7 +1948,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
Writer* last_writer = &w; Writer* last_writer = &w;
if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions
WriteBatch* updates = BuildBatchGroup(&last_writer); WriteBatch* updates = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(updates, last_sequence + 1); const SequenceNumber current_sequence = last_sequence + 1;
WriteBatchInternal::SetSequence(updates, current_sequence);
int my_batch_count = WriteBatchInternal::Count(updates); int my_batch_count = WriteBatchInternal::Count(updates);
last_sequence += my_batch_count; last_sequence += my_batch_count;
// Record statistics // Record statistics
@ -1972,6 +1982,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
} }
mutex_.Lock(); mutex_.Lock();
} }
last_flushed_sequence_ = current_sequence;
if (updates == tmp_batch_) tmp_batch_->Clear(); if (updates == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence); versions_->SetLastSequence(last_sequence);

@ -323,6 +323,10 @@ class DBImpl : public DB {
// count of the number of contiguous delaying writes // count of the number of contiguous delaying writes
int delayed_writes_; int delayed_writes_;
// store the last flushed sequence.
// Used by transaction log iterator.
SequenceNumber last_flushed_sequence_;
// No copying allowed // No copying allowed
DBImpl(const DBImpl&); DBImpl(const DBImpl&);
void operator=(const DBImpl&); void operator=(const DBImpl&);

@ -2600,17 +2600,14 @@ TEST(DBTest, TransactionLogIterator) {
unique_ptr<TransactionLogIterator> iter; unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(0, &iter); Status status = dbfull()->GetUpdatesSince(0, &iter);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
ASSERT_TRUE(!iter->Valid()); ASSERT_TRUE(iter->Valid());
iter->Next();
int i = 0; int i = 0;
SequenceNumber lastSequence = 0; SequenceNumber lastSequence = 0;
while (iter->Valid()) { while (iter->Valid()) {
WriteBatch batch; BatchResult res = iter->GetBatch();
SequenceNumber current; ASSERT_TRUE(res.sequence > lastSequence);
iter->GetBatch(&batch, &current);
ASSERT_TRUE(current > lastSequence);
++i; ++i;
lastSequence = current; lastSequence = res.sequence;
ASSERT_TRUE(iter->status().ok()); ASSERT_TRUE(iter->status().ok());
iter->Next(); iter->Next();
} }
@ -2626,16 +2623,13 @@ TEST(DBTest, TransactionLogIterator) {
unique_ptr<TransactionLogIterator> iter; unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(0, &iter); Status status = dbfull()->GetUpdatesSince(0, &iter);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
ASSERT_TRUE(!iter->Valid()); ASSERT_TRUE(iter->Valid());
iter->Next();
int i = 0; int i = 0;
SequenceNumber lastSequence = 0; SequenceNumber lastSequence = 0;
while (iter->Valid()) { while (iter->Valid()) {
WriteBatch batch; BatchResult res = iter->GetBatch();
SequenceNumber current; ASSERT_TRUE(res.sequence > lastSequence);
iter->GetBatch(&batch, &current); lastSequence = res.sequence;
ASSERT_TRUE(current > lastSequence);
lastSequence = current;
ASSERT_TRUE(iter->status().ok()); ASSERT_TRUE(iter->status().ok());
iter->Next(); iter->Next();
++i; ++i;

@ -4,18 +4,21 @@
namespace leveldb { namespace leveldb {
TransactionLogIteratorImpl::TransactionLogIteratorImpl( TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dbname, const std::string& dbname,
const Options* options, const Options* options,
SequenceNumber& seq, SequenceNumber& seq,
std::vector<LogFile>* files) : std::vector<LogFile>* files,
SequenceNumber const * const lastFlushedSequence) :
dbname_(dbname), dbname_(dbname),
options_(options), options_(options),
sequenceNumber_(seq), sequenceNumber_(seq),
files_(files), files_(files),
started_(false), started_(false),
isValid_(true), isValid_(true),
currentFileIndex_(0) { currentFileIndex_(0),
lastFlushedSequence_(lastFlushedSequence) {
assert(files_ != nullptr); assert(files_ != nullptr);
assert(lastFlushedSequence_);
} }
LogReporter LogReporter
@ -29,8 +32,7 @@ TransactionLogIteratorImpl::NewLogReporter(const uint64_t logNumber) {
Status TransactionLogIteratorImpl::OpenLogFile( Status TransactionLogIteratorImpl::OpenLogFile(
const LogFile& logFile, const LogFile& logFile,
unique_ptr<SequentialFile>* file) unique_ptr<SequentialFile>* file) {
{
Env* env = options_->env; Env* env = options_->env;
if (logFile.type == kArchivedLogFile) { if (logFile.type == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber); std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber);
@ -44,7 +46,6 @@ Status TransactionLogIteratorImpl::OpenLogFile(
fname = ArchivedLogFileName(dbname_, logFile.logNumber); fname = ArchivedLogFileName(dbname_, logFile.logNumber);
status = env->NewSequentialFile(fname, file); status = env->NewSequentialFile(fname, file);
if (!status.ok()) { if (!status.ok()) {
// TODO stringprintf
return Status::IOError(" Requested file not present in the dir"); return Status::IOError(" Requested file not present in the dir");
} }
} }
@ -52,11 +53,12 @@ Status TransactionLogIteratorImpl::OpenLogFile(
} }
} }
void TransactionLogIteratorImpl::GetBatch(WriteBatch* batch, BatchResult TransactionLogIteratorImpl::GetBatch() {
SequenceNumber* seq) {
assert(isValid_); // cannot call in a non valid state. assert(isValid_); // cannot call in a non valid state.
WriteBatchInternal::SetContents(batch, currentRecord_); BatchResult result;
*seq = WriteBatchInternal::Sequence(batch); result.sequence = currentSequence_;
result.writeBatchPtr = std::move(currentBatch_);
return result;
} }
Status TransactionLogIteratorImpl::status() { Status TransactionLogIteratorImpl::status() {
@ -73,16 +75,21 @@ void TransactionLogIteratorImpl::Next() {
LogReporter reporter = NewLogReporter(currentLogFile.logNumber); LogReporter reporter = NewLogReporter(currentLogFile.logNumber);
std::string scratch; std::string scratch;
Slice record; Slice record;
if (!started_) { if (!started_) {
isValid_ = false;
if (sequenceNumber_ > *lastFlushedSequence_) {
currentStatus_ = Status::IOError("Looking for a sequence, "
"which is not flushed yet.");
return;
}
unique_ptr<SequentialFile> file; unique_ptr<SequentialFile> file;
Status status = OpenLogFile(currentLogFile, &file); Status status = OpenLogFile(currentLogFile, &file);
if (!status.ok()) { if (!status.ok()) {
isValid_ = false;
currentStatus_ = status; currentStatus_ = status;
return; return;
} }
assert(file); assert(file);
WriteBatch batch;
unique_ptr<log::Reader> reader( unique_ptr<log::Reader> reader(
new log::Reader(std::move(file), &reporter, true, 0)); new log::Reader(std::move(file), &reporter, true, 0));
assert(reader); assert(reader);
@ -92,11 +99,10 @@ void TransactionLogIteratorImpl::Next() {
record.size(), Status::Corruption("log record too small")); record.size(), Status::Corruption("log record too small"));
continue; continue;
} }
WriteBatchInternal::SetContents(&batch, record); UpdateCurrentWriteBatch(record);
SequenceNumber currentNum = WriteBatchInternal::Sequence(&batch); if (currentSequence_ >= sequenceNumber_) {
if (currentNum >= sequenceNumber_) { assert(currentSequence_ < *lastFlushedSequence_);
isValid_ = true; isValid_ = true;
currentRecord_ = record;
currentLogReader_ = std::move(reader); currentLogReader_ = std::move(reader);
break; break;
} }
@ -112,13 +118,21 @@ void TransactionLogIteratorImpl::Next() {
LOOK_NEXT_FILE: LOOK_NEXT_FILE:
assert(currentLogReader_); assert(currentLogReader_);
bool openNextFile = true; bool openNextFile = true;
while (currentLogReader_->ReadRecord(&record, &scratch)) {
if (currentSequence_ == *lastFlushedSequence_) {
// The last update has been read. and next is being called.
isValid_ = false;
currentStatus_ = Status::OK();
}
while (currentLogReader_->ReadRecord(&record, &scratch) &&
currentSequence_ < *lastFlushedSequence_) {
if (record.size() < 12) { if (record.size() < 12) {
reporter.Corruption( reporter.Corruption(
record.size(), Status::Corruption("log record too small")); record.size(), Status::Corruption("log record too small"));
continue; continue;
} else { } else {
currentRecord_ = record; UpdateCurrentWriteBatch(record);
openNextFile = false; openNextFile = false;
break; break;
} }
@ -148,4 +162,11 @@ LOOK_NEXT_FILE:
} }
} }
void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
WriteBatch* batch = new WriteBatch();
WriteBatchInternal::SetContents(batch, record);
currentSequence_ = WriteBatchInternal::Sequence(batch);
currentBatch_.reset(batch);
}
} // namespace leveldb } // namespace leveldb

@ -28,7 +28,9 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
TransactionLogIteratorImpl(const std::string& dbname, TransactionLogIteratorImpl(const std::string& dbname,
const Options* options, const Options* options,
SequenceNumber& seqNum, SequenceNumber& seqNum,
std::vector<LogFile>* files); std::vector<LogFile>* files,
SequenceNumber const * const lastFlushedSequence);
virtual ~TransactionLogIteratorImpl() { virtual ~TransactionLogIteratorImpl() {
// TODO move to cc file. // TODO move to cc file.
delete files_; delete files_;
@ -40,7 +42,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
virtual Status status(); virtual Status status();
virtual void GetBatch(WriteBatch* batch, SequenceNumber* seq); virtual BatchResult GetBatch();
private: private:
const std::string& dbname_; const std::string& dbname_;
@ -51,10 +53,15 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
bool isValid_; // not valid when it starts of. bool isValid_; // not valid when it starts of.
Status currentStatus_; Status currentStatus_;
size_t currentFileIndex_; size_t currentFileIndex_;
Slice currentRecord_; std::unique_ptr<WriteBatch> currentBatch_;
unique_ptr<log::Reader> currentLogReader_; unique_ptr<log::Reader> currentLogReader_;
Status OpenLogFile(const LogFile& logFile, unique_ptr<SequentialFile>* file); Status OpenLogFile(const LogFile& logFile, unique_ptr<SequentialFile>* file);
LogReporter NewLogReporter(uint64_t logNumber); LogReporter NewLogReporter(uint64_t logNumber);
SequenceNumber const * const lastFlushedSequence_;
// represents the sequence number being read currently.
SequenceNumber currentSequence_;
void UpdateCurrentWriteBatch(const Slice& record);
}; };

@ -8,6 +8,11 @@
namespace leveldb { namespace leveldb {
struct BatchResult {
SequenceNumber sequence;
std::unique_ptr<WriteBatch> writeBatchPtr;
};
// A TransactionLogIterator is used to iterate over the Transaction's in a db. // A TransactionLogIterator is used to iterate over the Transaction's in a db.
class TransactionLogIterator { class TransactionLogIterator {
public: public:
@ -16,19 +21,21 @@ class TransactionLogIterator {
// An iterator is either positioned at a WriteBatch or not valid. // An iterator is either positioned at a WriteBatch or not valid.
// This method returns true if the iterator is valid. // This method returns true if the iterator is valid.
// Can read data from a valid iterator.
virtual bool Valid() = 0; virtual bool Valid() = 0;
// Moves the iterator to the next WriteBatch. // Moves the iterator to the next WriteBatch.
// REQUIRES: Valid() to be true. // REQUIRES: Valid() to be true.
virtual void Next() = 0; virtual void Next() = 0;
// Return's ok if the iterator is in a valid stated. // Return's ok if the iterator is valid.
// Return the Error Status when the iterator is not Valid. // Return the Error when something has gone wrong.
virtual Status status() = 0; virtual Status status() = 0;
// If valid return's the current write_batch and the sequence number of the // If valid return's the current write_batch and the sequence number of the
// latest transaction contained in the batch. // latest transaction contained in the batch.
virtual void GetBatch(WriteBatch* batch, SequenceNumber* seq) = 0; // ONLY use if Valid() is true and status() is OK.
virtual BatchResult GetBatch() = 0;
}; };
} // namespace leveldb } // namespace leveldb

@ -56,24 +56,23 @@ static void ReplicationThreadBody(void* arg) {
DB* db = t->db; DB* db = t->db;
unique_ptr<TransactionLogIterator> iter; unique_ptr<TransactionLogIterator> iter;
SequenceNumber currentSeqNum = 0; SequenceNumber currentSeqNum = 0;
while (t->stop.Acquire_Load() != NULL) { while (t->stop.Acquire_Load() != nullptr) {
if (!iter) { if (!iter) {
db->GetUpdatesSince(currentSeqNum, &iter); db->GetUpdatesSince(currentSeqNum, &iter);
fprintf(stdout, "Refreshing iterator\n"); fprintf(stdout, "Refreshing iterator\n");
iter->Next(); iter->Next();
while(iter->Valid()) { while(iter->Valid()) {
WriteBatch batch; BatchResult res = iter->GetBatch();
SequenceNumber seq; if (res.sequence != currentSeqNum +1
iter->GetBatch(&batch, &seq); && res.sequence != currentSeqNum) {
if (seq != currentSeqNum +1 && seq != currentSeqNum) {
fprintf(stderr, fprintf(stderr,
"Missed a seq no. b/w %ld and %ld\n", "Missed a seq no. b/w %ld and %ld\n",
currentSeqNum, currentSeqNum,
seq); res.sequence);
exit(1); exit(1);
} }
currentSeqNum = seq; currentSeqNum = res.sequence;
t->latest = seq; t->latest = res.sequence;
iter->Next(); iter->Next();
t->no_read++; t->no_read++;
} }
@ -132,7 +131,7 @@ int main(int argc, const char** argv) {
while(dataPump.is_running) { while(dataPump.is_running) {
continue; continue;
} }
replThread.stop.Release_Store(NULL); replThread.stop.Release_Store(nullptr);
if ( replThread.no_read < dataPump.no_records ) { if ( replThread.no_read < dataPump.no_records ) {
// no. read should be => than inserted. // no. read should be => than inserted.
fprintf(stderr, "No. of Record's written and read not same\nRead : %ld" fprintf(stderr, "No. of Record's written and read not same\nRead : %ld"

Loading…
Cancel
Save