@ -23,6 +23,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
started_ ( false ) ,
isValid_ ( false ) ,
currentFileIndex_ ( 0 ) ,
currentBatchSeq_ ( 0 ) ,
currentBatchCount_ ( 0 ) ,
lastFlushedSequence_ ( lastFlushedSequence ) {
assert ( startingSequenceNumber_ < = * lastFlushedSequence_ ) ;
assert ( files_ ! = nullptr ) ;
@ -48,7 +50,7 @@ Status TransactionLogIteratorImpl::OpenLogFile(
fname = ArchivedLogFileName ( dir_ , logFile - > LogNumber ( ) ) ;
status = env - > NewSequentialFile ( fname , file , soptions_ ) ;
if ( ! status . ok ( ) ) {
return Status : : IOError ( " Requested file not present in the dir" ) ;
return Status : : IOError ( " Requested file not present in the dir " ) ;
}
}
return status ;
@ -71,33 +73,54 @@ bool TransactionLogIteratorImpl::Valid() {
return started_ & & isValid_ ;
}
void TransactionLogIteratorImpl : : SeekToStartSequence ( ) {
bool TransactionLogIteratorImpl : : RestrictedRead (
Slice * record ,
std : : string * scratch ) {
// Don't read if no more complete entries to read from logs
if ( currentBatchSeq_ > = * lastFlushedSequence_ ) {
return false ;
}
return currentLogReader_ - > ReadRecord ( record , scratch ) ;
}
void TransactionLogIteratorImpl : : SeekToStartSequence (
uint64_t startFileIndex ,
bool strict ) {
std : : string scratch ;
Slice record ;
started_ = false ;
isValid_ = false ;
if ( startingSequenceNumber_ > * lastFlushedSequence_ ) {
currentStatus_ = Status : : IOError ( " Looking for a sequence, "
" which is not flushed yet. " ) ;
return ;
}
if ( files_ - > size ( ) = = 0 ) {
if ( files_ - > size ( ) < = startFileIndex ) {
return ;
}
Status s = OpenLogReader ( files_ - > at ( 0 ) . get ( ) ) ;
Status s = OpenLogReader ( files_ - > at ( startFileIndex ) . get ( ) ) ;
if ( ! s . ok ( ) ) {
currentStatus_ = s ;
return ;
}
while ( currentLogReader_ - > ReadRecor d( & record , & scratch ) ) {
while ( RestrictedRea d( & record , & scratch ) ) {
if ( record . size ( ) < 12 ) {
reporter_ . Corruption (
record . size ( ) , Status : : Corruption ( " log record too small " ) ) ;
record . size ( ) , Status : : Corruption ( " very small log record" ) ) ;
continue ;
}
UpdateCurrentWriteBatch ( record ) ;
if ( currentBatchSeq_ + currentBatchCount_ - 1 > =
startingSequenceNumber_ ) {
assert ( currentBatchSeq_ < = * lastFlushedSequence_ ) ;
if ( strict & & currentBatchSeq_ ! = startingSequenceNumber_ ) {
currentStatus_ = Status : : Corruption ( " Gap in sequence number. Could not "
" seek to required sequence number " ) ;
reporter_ . Info ( currentStatus_ . ToString ( ) . c_str ( ) ) ;
return ;
} else if ( strict ) {
reporter_ . Info ( " Could seek required sequence number. Iterator will "
" continue. " ) ;
}
isValid_ = true ;
started_ = true ; // set started_ as we could seek till starting sequence
return ;
@ -117,8 +140,6 @@ void TransactionLogIteratorImpl::SeekToStartSequence() {
}
void TransactionLogIteratorImpl : : Next ( ) {
// TODO:Next() says that it requires Valid to be true but this is not true
// assert(Valid());
std : : string scratch ;
Slice record ;
isValid_ = false ;
@ -134,11 +155,10 @@ void TransactionLogIteratorImpl::Next() {
while ( currentLogReader_ - > ReadRecord ( & record , & scratch ) ) {
if ( record . size ( ) < 12 ) {
reporter_ . Corruption (
record . size ( ) , Status : : Corruption ( " log record too small " ) ) ;
record . size ( ) , Status : : Corruption ( " very small log record" ) ) ;
continue ;
} else {
UpdateCurrentWriteBatch ( record ) ;
return ;
return UpdateCurrentWriteBatch ( record ) ;
}
}
}
@ -164,11 +184,44 @@ void TransactionLogIteratorImpl::Next() {
}
}
bool TransactionLogIteratorImpl : : IsBatchContinuous (
const WriteBatch * batch ,
const SequenceNumber expectedSeq ) {
assert ( batch ) ;
SequenceNumber batchSeq = WriteBatchInternal : : Sequence ( batch ) ;
if ( started_ & & batchSeq ! = expectedSeq ) {
char buf [ 200 ] ;
snprintf ( buf , sizeof ( buf ) ,
" Discontinuity in log records. Got seq=%lu, Expected seq=%lu, "
" Last flushed seq=%lu. Log iterator will seek the correct batch. " ,
batchSeq , expectedSeq , * lastFlushedSequence_ ) ;
reporter_ . Info ( buf ) ;
return false ;
}
return true ;
}
void TransactionLogIteratorImpl : : UpdateCurrentWriteBatch ( const Slice & record ) {
WriteBatch * batch = new WriteBatch ( ) ;
WriteBatchInternal : : SetContents ( batch , record ) ;
SequenceNumber expectedSeq = currentBatchSeq_ + currentBatchCount_ ;
if ( ! IsBatchContinuous ( batch , expectedSeq ) ) {
// Seek to the batch having expected sequence number
if ( expectedSeq < files_ - > at ( currentFileIndex_ ) - > StartSequence ( ) ) {
// Expected batch must lie in the previous log file
currentFileIndex_ - - ;
currentFileIndex_ = ( currentFileIndex_ > = 0 ) ? currentFileIndex_ : 0 ;
}
startingSequenceNumber_ = expectedSeq ;
return SeekToStartSequence ( currentFileIndex_ , true ) ;
}
currentBatchSeq_ = WriteBatchInternal : : Sequence ( batch ) ;
currentBatchCount_ = WriteBatchInternal : : Count ( batch ) ;
// currentBatchSeq_ can only change here
assert ( currentBatchSeq_ < = * lastFlushedSequence_ ) ;
currentBatch_ . reset ( batch ) ;
isValid_ = true ;
currentStatus_ = Status : : OK ( ) ;