@@ -25,13 +25,13 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
       options_(options),
       read_options_(read_options),
       soptions_(soptions),
-      startingSequenceNumber_(seq),
+      starting_sequence_number_(seq),
       files_(std::move(files)),
       started_(false),
-      isValid_(false),
-      currentFileIndex_(0),
-      currentBatchSeq_(0),
-      currentLastSeq_(0),
+      is_valid_(false),
+      current_file_index_(0),
+      current_batch_seq_(0),
+      current_last_seq_(0),
       versions_(versions),
       seq_per_batch_(seq_per_batch) {
   assert(files_ != nullptr);
@@ -43,23 +43,23 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
 }
 
 Status TransactionLogIteratorImpl::OpenLogFile(
-    const LogFile* logFile,
+    const LogFile* log_file,
     std::unique_ptr<SequentialFileReader>* file_reader) {
   Env* env = options_->env;
   std::unique_ptr<SequentialFile> file;
   std::string fname;
   Status s;
   EnvOptions optimized_env_options = env->OptimizeForLogRead(soptions_);
-  if (logFile->Type() == kArchivedLogFile) {
-    fname = ArchivedLogFileName(dir_, logFile->LogNumber());
+  if (log_file->Type() == kArchivedLogFile) {
+    fname = ArchivedLogFileName(dir_, log_file->LogNumber());
     s = env->NewSequentialFile(fname, &file, optimized_env_options);
   } else {
-    fname = LogFileName(dir_, logFile->LogNumber());
+    fname = LogFileName(dir_, log_file->LogNumber());
     s = env->NewSequentialFile(fname, &file, optimized_env_options);
     if (!s.ok()) {
       // If cannot open file in DB directory.
       // Try the archive dir, as it could have moved in the meanwhile.
-      fname = ArchivedLogFileName(dir_, logFile->LogNumber());
+      fname = ArchivedLogFileName(dir_, log_file->LogNumber());
       s = env->NewSequentialFile(fname, &file, optimized_env_options);
     }
   }
@@ -70,45 +70,41 @@ Status TransactionLogIteratorImpl::OpenLogFile(
 }
 
 BatchResult TransactionLogIteratorImpl::GetBatch() {
-  assert(isValid_);  // cannot call in a non valid state.
+  assert(is_valid_);  // cannot call in a non valid state.
   BatchResult result;
-  result.sequence = currentBatchSeq_;
-  result.writeBatchPtr = std::move(currentBatch_);
+  result.sequence = current_batch_seq_;
+  result.writeBatchPtr = std::move(current_batch_);
   return result;
 }
 
-Status TransactionLogIteratorImpl::status() {
-  return currentStatus_;
-}
+Status TransactionLogIteratorImpl::status() { return current_status_; }
 
-bool TransactionLogIteratorImpl::Valid() {
-  return started_ && isValid_;
-}
+bool TransactionLogIteratorImpl::Valid() { return started_ && is_valid_; }
 
 bool TransactionLogIteratorImpl::RestrictedRead(
     Slice* record,
     std::string* scratch) {
   // Don't read if no more complete entries to read from logs
-  if (currentLastSeq_ >= versions_->LastSequence()) {
+  if (current_last_seq_ >= versions_->LastSequence()) {
     return false;
   }
-  return currentLogReader_->ReadRecord(record, scratch);
+  return current_log_reader_->ReadRecord(record, scratch);
 }
 
-void TransactionLogIteratorImpl::SeekToStartSequence(
-    uint64_t startFileIndex,
-    bool strict) {
+void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
+                                                     bool strict) {
   std::string scratch;
   Slice record;
   started_ = false;
-  isValid_ = false;
-  if (files_->size() <= startFileIndex) {
+  is_valid_ = false;
+  if (files_->size() <= start_file_index) {
     return;
   }
-  Status s = OpenLogReader(files_->at(static_cast<size_t>(startFileIndex)).get());
+  Status s =
+      OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
   if (!s.ok()) {
-    currentStatus_ = s;
-    reporter_.Info(currentStatus_.ToString().c_str());
+    current_status_ = s;
+    reporter_.Info(current_status_.ToString().c_str());
     return;
   }
   while (RestrictedRead(&record, &scratch)) {
@@ -118,21 +114,22 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
       continue;
     }
     UpdateCurrentWriteBatch(record);
-    if (currentLastSeq_ >= startingSequenceNumber_) {
-      if (strict && currentBatchSeq_ != startingSequenceNumber_) {
-        currentStatus_ = Status::Corruption("Gap in sequence number. Could not "
-                                            "seek to required sequence number");
-        reporter_.Info(currentStatus_.ToString().c_str());
+    if (current_last_seq_ >= starting_sequence_number_) {
+      if (strict && current_batch_seq_ != starting_sequence_number_) {
+        current_status_ = Status::Corruption(
+            "Gap in sequence number. Could not "
+            "seek to required sequence number");
+        reporter_.Info(current_status_.ToString().c_str());
         return;
       } else if (strict) {
         reporter_.Info("Could seek required sequence number. Iterator will "
                        "continue.");
       }
-      isValid_ = true;
+      is_valid_ = true;
       started_ = true; // set started_ as we could seek till starting sequence
       return;
     } else {
-      isValid_ = false;
+      is_valid_ = false;
     }
   }
 
@@ -141,13 +138,15 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
   // If strict is set, we want to seek exactly till the start sequence and it
   // should have been present in the file we scanned above
   if (strict) {
-    currentStatus_ = Status::Corruption("Gap in sequence number. Could not "
-                                        "seek to required sequence number");
-    reporter_.Info(currentStatus_.ToString().c_str());
+    current_status_ = Status::Corruption(
+        "Gap in sequence number. Could not "
+        "seek to required sequence number");
+    reporter_.Info(current_status_.ToString().c_str());
   } else if (files_->size() != 1) {
-    currentStatus_ = Status::Corruption("Start sequence was not found, "
-                                        "skipping to the next available");
-    reporter_.Info(currentStatus_.ToString().c_str());
+    current_status_ = Status::Corruption(
+        "Start sequence was not found, "
+        "skipping to the next available");
+    reporter_.Info(current_status_.ToString().c_str());
     // Let NextImpl find the next available entry. started_ remains false
     // because we don't want to check for gaps while moving to start sequence
     NextImpl(true);
@@ -161,15 +160,15 @@ void TransactionLogIteratorImpl::Next() {
 void TransactionLogIteratorImpl::NextImpl(bool internal) {
   std::string scratch;
   Slice record;
-  isValid_ = false;
+  is_valid_ = false;
   if (!internal && !started_) {
     // Runs every time until we can seek to the start sequence
     return SeekToStartSequence();
   }
   while(true) {
-    assert(currentLogReader_);
-    if (currentLogReader_->IsEOF()) {
-      currentLogReader_->UnmarkEOF();
+    assert(current_log_reader_);
+    if (current_log_reader_->IsEOF()) {
+      current_log_reader_->UnmarkEOF();
     }
     while (RestrictedRead(&record, &scratch)) {
       if (record.size() < WriteBatchInternal::kHeader) {
@@ -190,20 +189,20 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
     }
 
     // Open the next file
-    if (currentFileIndex_ < files_->size() - 1) {
-      ++currentFileIndex_;
-      Status s = OpenLogReader(files_->at(currentFileIndex_).get());
+    if (current_file_index_ < files_->size() - 1) {
+      ++current_file_index_;
+      Status s = OpenLogReader(files_->at(current_file_index_).get());
       if (!s.ok()) {
-        isValid_ = false;
-        currentStatus_ = s;
+        is_valid_ = false;
+        current_status_ = s;
         return;
       }
     } else {
-      isValid_ = false;
-      if (currentLastSeq_ == versions_->LastSequence()) {
-        currentStatus_ = Status::OK();
+      is_valid_ = false;
+      if (current_last_seq_ == versions_->LastSequence()) {
+        current_status_ = Status::OK();
       } else {
-        currentStatus_ = Status::Corruption("NO MORE DATA LEFT");
+        current_status_ = Status::Corruption("NO MORE DATA LEFT");
       }
       return;
     }
@@ -211,17 +210,16 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
 }
 
 bool TransactionLogIteratorImpl::IsBatchExpected(
-    const WriteBatch* batch,
-    const SequenceNumber expectedSeq) {
+    const WriteBatch* batch, const SequenceNumber expected_seq) {
   assert(batch);
   SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
-  if (batchSeq != expectedSeq) {
+  if (batchSeq != expected_seq) {
     char buf[200];
     snprintf(buf, sizeof(buf),
              "Discontinuity in log records. Got seq=%" PRIu64
              ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64
              ".Log iterator will reseek the correct batch.",
-             batchSeq, expectedSeq, versions_->LastSequence());
+             batchSeq, expected_seq, versions_->LastSequence());
     reporter_.Info(buf);
     return false;
   }
@@ -232,25 +230,25 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
   std::unique_ptr<WriteBatch> batch(new WriteBatch());
   WriteBatchInternal::SetContents(batch.get(), record);
 
-  SequenceNumber expectedSeq = currentLastSeq_ + 1;
+  SequenceNumber expected_seq = current_last_seq_ + 1;
   // If the iterator has started, then confirm that we get continuous batches
-  if (started_ && !IsBatchExpected(batch.get(), expectedSeq)) {
+  if (started_ && !IsBatchExpected(batch.get(), expected_seq)) {
     // Seek to the batch having expected sequence number
-    if (expectedSeq < files_->at(currentFileIndex_)->StartSequence()) {
+    if (expected_seq < files_->at(current_file_index_)->StartSequence()) {
       // Expected batch must lie in the previous log file
       // Avoid underflow.
-      if (currentFileIndex_ != 0) {
-        currentFileIndex_--;
+      if (current_file_index_ != 0) {
+        current_file_index_--;
       }
     }
-    startingSequenceNumber_ = expectedSeq;
+    starting_sequence_number_ = expected_seq;
     // currentStatus_ will be set to Ok if reseek succeeds
     // Note: this is still ok in seq_pre_batch_ && two_write_queuesp_ mode
    // that allows gaps in the WAL since it will still skip over the gap.
-    currentStatus_ = Status::NotFound("Gap in sequence numbers");
+    current_status_ = Status::NotFound("Gap in sequence numbers");
     // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
     // should be disabled
-    return SeekToStartSequence(currentFileIndex_, !seq_per_batch_);
+    return SeekToStartSequence(current_file_index_, !seq_per_batch_);
   }
 
   struct BatchCounter : public WriteBatch::Handler {
@@ -289,33 +287,33 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
     Status MarkRollback(const Slice&) override { return Status::OK(); }
   };
 
-  currentBatchSeq_ = WriteBatchInternal::Sequence(batch.get());
+  current_batch_seq_ = WriteBatchInternal::Sequence(batch.get());
   if (seq_per_batch_) {
-    BatchCounter counter(currentBatchSeq_);
+    BatchCounter counter(current_batch_seq_);
     batch->Iterate(&counter);
-    currentLastSeq_ = counter.sequence_;
+    current_last_seq_ = counter.sequence_;
   } else {
-    currentLastSeq_ =
-        currentBatchSeq_ + WriteBatchInternal::Count(batch.get()) - 1;
+    current_last_seq_ =
+        current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
   }
   // currentBatchSeq_ can only change here
-  assert(currentLastSeq_ <= versions_->LastSequence());
+  assert(current_last_seq_ <= versions_->LastSequence());
 
-  currentBatch_ = std::move(batch);
-  isValid_ = true;
-  currentStatus_ = Status::OK();
+  current_batch_ = std::move(batch);
+  is_valid_ = true;
+  current_status_ = Status::OK();
 }
 
-Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
+Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
   std::unique_ptr<SequentialFileReader> file;
-  Status s = OpenLogFile(logFile, &file);
+  Status s = OpenLogFile(log_file, &file);
   if (!s.ok()) {
     return s;
   }
   assert(file);
-  currentLogReader_.reset(
+  current_log_reader_.reset(
       new log::Reader(options_->info_log, std::move(file), &reporter_,
-                      read_options_.verify_checksums_, logFile->LogNumber()));
+                      read_options_.verify_checksums_, log_file->LogNumber()));
   return Status::OK();
 }
 } // namespace rocksdb