@ -1235,11 +1235,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
DeletionState & deletion_state ) {
DeletionState & deletion_state ) {
mutex_ . AssertHeld ( ) ;
mutex_ . AssertHeld ( ) ;
assert ( imm_ . size ( ) ! = 0 ) ;
assert ( imm_ . size ( ) ! = 0 ) ;
assert ( imm_ . IsFlushPending ( ) ) ;
if ( ! imm_ . IsFlushPending ( ) ) {
Log ( options_ . info_log , " FlushMemTableToOutputFile already in progress " ) ;
return Status : : IOError ( " FlushMemTableToOutputFile already in progress " ) ;
}
// Save the contents of the earliest memtable as a new Table
// Save the contents of the earliest memtable as a new Table
uint64_t file_number ;
uint64_t file_number ;
@ -1247,7 +1243,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
imm_ . PickMemtablesToFlush ( & mems ) ;
imm_ . PickMemtablesToFlush ( & mems ) ;
if ( mems . empty ( ) ) {
if ( mems . empty ( ) ) {
Log ( options_ . info_log , " Nothing in memstore to flush " ) ;
Log ( options_ . info_log , " Nothing in memstore to flush " ) ;
return Status : : IOError ( " Nothing in memstore to flush " ) ;
return Status : : OK ( ) ;
}
}
// record the logfile_number_ before we release the mutex
// record the logfile_number_ before we release the mutex
@ -1272,14 +1268,19 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
Status s = WriteLevel0Table ( mems , edit , & file_number ) ;
Status s = WriteLevel0Table ( mems , edit , & file_number ) ;
if ( s . ok ( ) & & shutting_down_ . Acquire_Load ( ) ) {
if ( s . ok ( ) & & shutting_down_ . Acquire_Load ( ) ) {
s = Status : : IOError (
s = Status : : ShutdownInProgress (
" Database shutdown started during memtable compaction "
" Database shutdown started during memtable compaction "
) ;
) ;
}
}
if ( ! s . ok ( ) ) {
imm_ . RollbackMemtableFlush ( mems , file_number , & pending_outputs_ ) ;
return s ;
}
// Replace immutable memtable with the generated Table
// Replace immutable memtable with the generated Table
s = imm_ . InstallMemtableFlushResults (
s = imm_ . InstallMemtableFlushResults (
mems , versions_ . get ( ) , s , & mutex_ , options_ . info_log . get ( ) , file_number ,
mems , versions_ . get ( ) , & mutex_ , options_ . info_log . get ( ) , file_number ,
pending_outputs_ , & deletion_state . memtables_to_free , db_directory_ . get ( ) ) ;
pending_outputs_ , & deletion_state . memtables_to_free , db_directory_ . get ( ) ) ;
if ( s . ok ( ) ) {
if ( s . ok ( ) ) {
@ -1458,7 +1459,8 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
RecordTick ( options_ . statistics . get ( ) , GET_UPDATES_SINCE_CALLS ) ;
RecordTick ( options_ . statistics . get ( ) , GET_UPDATES_SINCE_CALLS ) ;
if ( seq > versions_ - > LastSequence ( ) ) {
if ( seq > versions_ - > LastSequence ( ) ) {
return Status : : IOError ( " Requested sequence not yet written in the db " ) ;
return Status : : NotFound (
" Requested sequence not yet written in the db " ) ;
}
}
// Get all sorted Wal Files.
// Get all sorted Wal Files.
// Do binary search and open files and find the seq number.
// Do binary search and open files and find the seq number.
@ -1522,16 +1524,19 @@ Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
if ( type = = kAliveLogFile ) {
if ( type = = kAliveLogFile ) {
std : : string fname = LogFileName ( options_ . wal_dir , number ) ;
std : : string fname = LogFileName ( options_ . wal_dir , number ) ;
Status status = ReadFirstLine ( fname , result ) ;
Status status = ReadFirstLine ( fname , result ) ;
if ( ! status . ok ( ) ) {
if ( status . ok ( ) | | env_ - > FileExists ( fname ) ) {
// return OK or any error that is not caused non-existing file
return status ;
}
// check if the file got moved to archive.
// check if the file got moved to archive.
std : : string archived_file =
std : : string archived_file =
ArchivedLogFileName ( options_ . wal_dir , number ) ;
ArchivedLogFileName ( options_ . wal_dir , number ) ;
Status s = ReadFirstLine ( archived_file , result ) ;
Status s = ReadFirstLine ( archived_file , result ) ;
if ( ! s . ok ( ) ) {
if ( s . ok ( ) | | env_ - > FileExists ( archived_file ) ) {
return Status : : IOError ( " Log File has been deleted: " + archived_file ) ;
return s ;
}
}
}
return Status : : OK ( ) ;
return Status : : NotFound ( " Log File has been deleted: " + archived_file ) ;
} else if ( type = = kArchivedLogFile ) {
} else if ( type = = kArchivedLogFile ) {
std : : string fname = ArchivedLogFileName ( options_ . wal_dir , number ) ;
std : : string fname = ArchivedLogFileName ( options_ . wal_dir , number ) ;
Status status = ReadFirstLine ( fname , result ) ;
Status status = ReadFirstLine ( fname , result ) ;
@ -1546,12 +1551,17 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
Env * env ;
Env * env ;
Logger * info_log ;
Logger * info_log ;
const char * fname ;
const char * fname ;
Status * status ; // nullptr if options_.paranoid_checks==false
Status * status ;
bool ignore_error ; // true if options_.paranoid_checks==false
virtual void Corruption ( size_t bytes , const Status & s ) {
virtual void Corruption ( size_t bytes , const Status & s ) {
Log ( info_log , " %s%s: dropping %d bytes; %s " ,
Log ( info_log , " %s%s: dropping %d bytes; %s " ,
( this - > status = = nullpt r ? " (ignoring error) " : " " ) ,
( this - > ignore_erro r ? " (ignoring error) " : " " ) ,
fname , static_cast < int > ( bytes ) , s . ToString ( ) . c_str ( ) ) ;
fname , static_cast < int > ( bytes ) , s . ToString ( ) . c_str ( ) ) ;
if ( this - > status ! = nullptr & & this - > status - > ok ( ) ) * this - > status = s ;
if ( this - > status - > ok ( ) ) {
// only keep the first error
* this - > status = s ;
}
}
}
} ;
} ;
@ -1567,23 +1577,30 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
reporter . env = env_ ;
reporter . env = env_ ;
reporter . info_log = options_ . info_log . get ( ) ;
reporter . info_log = options_ . info_log . get ( ) ;
reporter . fname = fname . c_str ( ) ;
reporter . fname = fname . c_str ( ) ;
reporter . status = ( options_ . paranoid_checks ? & status : nullptr ) ;
reporter . status = & status ;
reporter . ignore_error = ! options_ . paranoid_checks ;
log : : Reader reader ( std : : move ( file ) , & reporter , true /*checksum*/ ,
log : : Reader reader ( std : : move ( file ) , & reporter , true /*checksum*/ ,
0 /*initial_offset*/ ) ;
0 /*initial_offset*/ ) ;
std : : string scratch ;
std : : string scratch ;
Slice record ;
Slice record ;
if ( reader . ReadRecord ( & record , & scratch ) & & status . ok ( ) ) {
if ( reader . ReadRecord ( & record , & scratch ) & &
( status . ok ( ) | | ! options_ . paranoid_checks ) ) {
if ( record . size ( ) < 12 ) {
if ( record . size ( ) < 12 ) {
reporter . Corruption (
reporter . Corruption (
record . size ( ) , Status : : Corruption ( " log record too small " ) ) ;
record . size ( ) , Status : : Corruption ( " log record too small " ) ) ;
return Status : : IOError ( " Corruption noted " ) ;
// TODO read record's till the first no corrupt entry?
// TODO read record's till the first no corrupt entry?
}
} else {
WriteBatchInternal : : SetContents ( batch , record ) ;
WriteBatchInternal : : SetContents ( batch , record ) ;
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
return Status : : IOError ( " Error reading from file " + fname ) ;
}
// ReadRecord returns false on EOF, which is deemed as OK() by Reader
if ( status . ok ( ) ) {
status = Status : : Corruption ( " eof reached " ) ;
}
return status ;
}
}
struct CompareLogByPointer {
struct CompareLogByPointer {
@ -2219,7 +2236,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
compact - > compaction - > level ( ) ,
compact - > compaction - > level ( ) ,
compact - > compaction - > num_input_files ( 1 ) ,
compact - > compaction - > num_input_files ( 1 ) ,
compact - > compaction - > level ( ) + 1 ) ;
compact - > compaction - > level ( ) + 1 ) ;
return Status : : IOError ( " Compaction input files inconsistent " ) ;
return Status : : Corruption ( " Compaction input files inconsistent " ) ;
}
}
Log ( options_ . info_log , " Compacted %d@%d + %d@%d files => %lld bytes " ,
Log ( options_ . info_log , " Compacted %d@%d + %d@%d files => %lld bytes " ,
@ -2600,7 +2617,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
}
}
if ( status . ok ( ) & & shutting_down_ . Acquire_Load ( ) ) {
if ( status . ok ( ) & & shutting_down_ . Acquire_Load ( ) ) {
status = Status : : IOError ( " Database shutdown started during compaction " ) ;
status = Status : : ShutdownInProgress (
" Database shutdown started during compaction " ) ;
}
}
if ( status . ok ( ) & & compact - > builder ! = nullptr ) {
if ( status . ok ( ) & & compact - > builder ! = nullptr ) {
status = FinishCompactionOutputFile ( compact , input . get ( ) ) ;
status = FinishCompactionOutputFile ( compact , input . get ( ) ) ;