TransactionLogIterator sequence gap fix

Summary: DBTestXactLogIterator.TransactionLogIterator was failing due the sequence gaps. This was caused by an off-by-one error when calculating the new sequence number after recovering from logs.

Test Plan: db_log_iter_test

Reviewers: andrewkr

Subscribers: andrewkr, hermanlee4, dhruba, IslamAbdelRahman

Differential Revision: https://reviews.facebook.net/D58053
main
Reid Horuff 9 years ago
parent fa3536d202
commit a400336398
  1. 27
      db/db_impl.cc

@ -1181,7 +1181,7 @@ Status DBImpl::Recover(
s = CheckConsistency(); s = CheckConsistency();
} }
if (s.ok()) { if (s.ok()) {
SequenceNumber max_sequence(kMaxSequenceNumber); SequenceNumber next_sequence(kMaxSequenceNumber);
default_cf_handle_ = new ColumnFamilyHandleImpl( default_cf_handle_ = new ColumnFamilyHandleImpl(
versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
@ -1241,7 +1241,7 @@ Status DBImpl::Recover(
if (!logs.empty()) { if (!logs.empty()) {
// Recover in the order in which the logs were generated // Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end()); std::sort(logs.begin(), logs.end());
s = RecoverLogFiles(logs, &max_sequence, read_only); s = RecoverLogFiles(logs, &next_sequence, read_only);
if (!s.ok()) { if (!s.ok()) {
// Clear memtables if recovery failed // Clear memtables if recovery failed
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
@ -1266,7 +1266,7 @@ Status DBImpl::Recover(
// REQUIRES: log_numbers are sorted in ascending order // REQUIRES: log_numbers are sorted in ascending order
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers, Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* max_sequence, bool read_only) { SequenceNumber* next_sequence, bool read_only) {
struct LogReporter : public log::Reader::Reporter { struct LogReporter : public log::Reader::Reporter {
Env* env; Env* env;
Logger* info_log; Logger* info_log;
@ -1469,10 +1469,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
if (*max_sequence == kMaxSequenceNumber) { if (*next_sequence == kMaxSequenceNumber) {
*max_sequence = WriteBatchInternal::Sequence(&batch); *next_sequence = WriteBatchInternal::Sequence(&batch);
} }
WriteBatchInternal::SetSequence(&batch, *max_sequence); WriteBatchInternal::SetSequence(&batch, *next_sequence);
// If column family was not found, it might mean that the WAL write // If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the // batch references to the column family that was dropped after the
@ -1481,7 +1481,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// That's why we set ignore missing column families to true // That's why we set ignore missing column families to true
status = WriteBatchInternal::InsertInto( status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_, true, &batch, column_family_memtables_.get(), &flush_scheduler_, true,
log_number, this, true, false, max_sequence); log_number, this, true, false, next_sequence);
MaybeIgnoreError(&status); MaybeIgnoreError(&status);
if (!status.ok()) { if (!status.ok()) {
// We are treating this as a failure while reading since we read valid // We are treating this as a failure while reading since we read valid
@ -1511,7 +1511,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
} }
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
*max_sequence); *next_sequence);
} }
} }
} }
@ -1529,7 +1529,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Point in time recovered to log #%" PRIu64 " seq #%" PRIu64, "Point in time recovered to log #%" PRIu64 " seq #%" PRIu64,
log_number, *max_sequence); log_number, *next_sequence);
} else { } else {
assert(db_options_.wal_recovery_mode == assert(db_options_.wal_recovery_mode ==
WALRecoveryMode::kTolerateCorruptedTailRecords WALRecoveryMode::kTolerateCorruptedTailRecords
@ -1540,9 +1540,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
} }
flush_scheduler_.Clear(); flush_scheduler_.Clear();
if ((*max_sequence != kMaxSequenceNumber) && auto last_sequence = *next_sequence - 1;
(versions_->LastSequence() < *max_sequence)) { if ((*next_sequence != kMaxSequenceNumber) &&
versions_->SetLastSequence(*max_sequence); (versions_->LastSequence() <= last_sequence)) {
versions_->SetLastSequence(last_sequence);
} }
} }
@ -1574,7 +1575,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
} }
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
*max_sequence); *next_sequence);
} }
// write MANIFEST with update // write MANIFEST with update

Loading…
Cancel
Save