@ -399,7 +399,7 @@ IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
Status DBImpl : : Recover (
const std : : vector < ColumnFamilyDescriptor > & column_families , bool read_only ,
bool error_if_wal_file_exists , bool error_if_data_exists_in_wals ,
uint64_t * recovered_seq ) {
uint64_t * recovered_seq , RecoveryContext * recovery_ctx ) {
mutex_ . AssertHeld ( ) ;
bool is_new_db = false ;
@ -518,9 +518,10 @@ Status DBImpl::Recover(
if ( ! s . ok ( ) ) {
return s ;
}
s = SetDBId ( read_only ) ;
s = SetDBId ( read_only , recovery_ctx ) ;
if ( s . ok ( ) & & ! read_only ) {
s = DeleteUnreferencedSstFiles ( ) ;
s = DeleteUnreferencedSstFiles ( recovery_ctx ) ;
}
if ( immutable_db_options_ . paranoid_checks & & s . ok ( ) ) {
@ -535,10 +536,6 @@ Status DBImpl::Recover(
}
}
}
// DB mutex is already held
if ( s . ok ( ) & & immutable_db_options_ . persist_stats_to_disk ) {
s = InitPersistStatsColumnFamily ( ) ;
}
std : : vector < std : : string > files_in_wal_dir ;
if ( s . ok ( ) ) {
@ -608,7 +605,10 @@ Status DBImpl::Recover(
WalNumber max_wal_number =
versions_ - > GetWalSet ( ) . GetWals ( ) . rbegin ( ) - > first ;
edit . DeleteWalsBefore ( max_wal_number + 1 ) ;
s = versions_ - > LogAndApplyToDefaultColumnFamily ( & edit , & mutex_ ) ;
assert ( recovery_ctx ! = nullptr ) ;
assert ( versions_ - > GetColumnFamilySet ( ) ! = nullptr ) ;
recovery_ctx - > UpdateVersionEdits (
versions_ - > GetColumnFamilySet ( ) - > GetDefault ( ) , edit ) ;
}
if ( ! s . ok ( ) ) {
return s ;
@ -644,8 +644,8 @@ Status DBImpl::Recover(
std : : sort ( wals . begin ( ) , wals . end ( ) ) ;
bool corrupted_wal_found = false ;
s = RecoverLogFiles ( wals , & next_sequence , read_only ,
& corrupted_wal_found ) ;
s = RecoverLogFiles ( wals , & next_sequence , read_only , & corrupted_wal_found ,
recovery_ctx ) ;
if ( corrupted_wal_found & & recovered_seq ! = nullptr ) {
* recovered_seq = next_sequence ;
}
@ -805,10 +805,30 @@ Status DBImpl::InitPersistStatsColumnFamily() {
return s ;
}
Status DBImpl : : LogAndApplyForRecovery ( const RecoveryContext & recovery_ctx ) {
mutex_ . AssertHeld ( ) ;
assert ( versions_ - > descriptor_log_ = = nullptr ) ;
Status s = versions_ - > LogAndApply (
recovery_ctx . cfds_ , recovery_ctx . mutable_cf_opts_ ,
recovery_ctx . edit_lists_ , & mutex_ , directories_ . GetDbDir ( ) ) ;
if ( s . ok ( ) & & ! ( recovery_ctx . files_to_delete_ . empty ( ) ) ) {
mutex_ . Unlock ( ) ;
for ( const auto & fname : recovery_ctx . files_to_delete_ ) {
s = env_ - > DeleteFile ( fname ) ;
if ( ! s . ok ( ) ) {
break ;
}
}
mutex_ . Lock ( ) ;
}
return s ;
}
// REQUIRES: wal_numbers are sorted in ascending order
Status DBImpl : : RecoverLogFiles ( const std : : vector < uint64_t > & wal_numbers ,
Status DBImpl : : RecoverLogFiles ( std : : vector < uint64_t > & wal_numbers ,
SequenceNumber * next_sequence , bool read_only ,
bool * corrupted_wal_found ) {
bool * corrupted_wal_found ,
RecoveryContext * recovery_ctx ) {
struct LogReporter : public log : : Reader : : Reporter {
Env * env ;
Logger * info_log ;
@ -833,6 +853,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
edit . SetColumnFamily ( cfd - > GetID ( ) ) ;
version_edits . insert ( { cfd - > GetID ( ) , edit } ) ;
}
int job_id = next_job_id_ . fetch_add ( 1 ) ;
{
auto stream = event_logger_ . Log ( ) ;
@ -1256,6 +1277,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
edit - > SetLogNumber ( max_wal_number + 1 ) ;
}
}
if ( status . ok ( ) ) {
// we must mark the next log number as used, even though it's
// not actually used. that is because VersionSet assumes
@ -1263,42 +1285,40 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// log number
versions_ - > MarkFileNumberUsed ( max_wal_number + 1 ) ;
autovector < ColumnFamilyData * > cfds ;
autovector < const MutableCFOptions * > cf_opts ;
autovector < autovector < VersionEdit * > > edit_lists ;
if ( corrupted_wal_found ! = nullptr & & * corrupted_wal_found = = true & &
immutable_db_options_ . wal_recovery_mode = =
WALRecoveryMode : : kPointInTimeRecovery ) {
MoveCorruptedWalFiles ( wal_numbers , corrupted_wal_number ) ;
}
assert ( recovery_ctx ! = nullptr ) ;
for ( auto * cfd : * versions_ - > GetColumnFamilySet ( ) ) {
cfds . push_back ( cfd ) ;
cf_opts . push_back ( cfd - > GetLatestMutableCFOptions ( ) ) ;
auto iter = version_edits . find ( cfd - > GetID ( ) ) ;
assert ( iter ! = version_edits . end ( ) ) ;
edit_lists . push_back ( { & iter - > second } ) ;
recovery_ctx - > UpdateVersionEdits ( cfd , iter - > second ) ;
}
std : : unique_ptr < VersionEdit > wal_deletion ;
if ( flushed ) {
wal_deletion = std : : make_unique < VersionEdit > ( ) ;
VersionEdit wal_deletion ;
if ( immutable_db_options_ . track_and_verify_wals_in_manifest ) {
wal_deletion - > DeleteWalsBefore ( max_wal_number + 1 ) ;
wal_deletion . DeleteWalsBefore ( max_wal_number + 1 ) ;
}
if ( ! allow_2pc ( ) ) {
// In non-2pc mode, flushing the memtables of the column families
// means we can advance min_log_number_to_keep.
wal_deletion - > SetMinLogNumberToKeep ( max_wal_number + 1 ) ;
wal_deletion . SetMinLogNumberToKeep ( max_wal_number + 1 ) ;
}
edit_lists . back ( ) . push_back ( wal_deletion . get ( ) ) ;
assert ( versions_ - > GetColumnFamilySet ( ) ! = nullptr ) ;
recovery_ctx - > UpdateVersionEdits (
versions_ - > GetColumnFamilySet ( ) - > GetDefault ( ) , wal_deletion ) ;
}
// write MANIFEST with update
status = versions_ - > LogAndApply ( cfds , cf_opts , edit_lists , & mutex_ ,
directories_ . GetDbDir ( ) ,
/*new_descriptor_log=*/ true ) ;
}
}
if ( status . ok ( ) ) {
if ( data_seen & & ! flushed ) {
status = RestoreAliveLogFiles ( wal_numbers ) ;
} else {
} else if ( ! wal_numbers . empty ( ) ) {
// If there's no data in the WAL, or we flushed all the data, still
// truncate the log file. If the process goes into a crash loop before
// the file is deleted, the preallocated space will never get freed.
@ -1314,6 +1334,48 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
return status ;
}
void DBImpl : : MoveCorruptedWalFiles ( std : : vector < uint64_t > & wal_numbers ,
uint64_t corrupted_wal_number ) {
size_t num_wals = wal_numbers . size ( ) ;
// Find the first corrupted wal.
auto iter = std : : lower_bound ( wal_numbers . begin ( ) , wal_numbers . end ( ) ,
corrupted_wal_number ) ;
auto corrupt_start_iter = iter ;
// Increment iter to move WAL files from first corrupted_wal_number + 1.
iter + + ;
std : : string archival_path =
ArchivalDirectory ( immutable_db_options_ . GetWalDir ( ) ) ;
Status create_status = env_ - > CreateDirIfMissing ( archival_path ) ;
// create_status is only checked when it needs to move the corrupted WAL files
// to archive folder.
create_status . PermitUncheckedError ( ) ;
// Truncate the last WAL to reclaim the pre allocated space before
// moving it.
GetLogSizeAndMaybeTruncate ( wal_numbers . back ( ) , /*truncate=*/ true , nullptr )
. PermitUncheckedError ( ) ;
// Move all the WAL files from corrupted_wal_number + 1 to last WAL
// (max_wal_number) to avoid column family inconsistency error to archival
// directory. If its unable to create archive dir, it will delete the
// corrupted WAL files.
// We are moving all but first corrupted WAL file to a different folder.
while ( iter ! = wal_numbers . end ( ) ) {
LogFileNumberSize log ( * iter ) ;
std : : string fname = LogFileName ( immutable_db_options_ . GetWalDir ( ) , * iter ) ;
# ifndef ROCKSDB_LITE
if ( create_status . ok ( ) ) {
wal_manager_ . ArchiveWALFile ( fname , * iter ) ;
}
# endif
iter + + ;
}
wal_numbers . erase ( corrupt_start_iter + 1 , wal_numbers . begin ( ) + num_wals ) ;
}
Status DBImpl : : GetLogSizeAndMaybeTruncate ( uint64_t wal_number , bool truncate ,
LogFileNumberSize * log_ptr ) {
LogFileNumberSize log ( wal_number ) ;
@ -1376,7 +1438,8 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
// log has such preallocated space, so we only truncate for the last log.
LogFileNumberSize log ;
s = GetLogSizeAndMaybeTruncate (
wal_number , /*truncate=*/ ( wal_number = = wal_numbers . back ( ) ) , & log ) ;
wal_number ,
/*truncate=*/ ( wal_number = = wal_numbers . back ( ) ) , & log ) ;
if ( ! s . ok ( ) ) {
break ;
}
@ -1737,9 +1800,13 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
impl - > wal_in_db_path_ = impl - > immutable_db_options_ . IsWalDirSameAsDBPath ( ) ;
impl - > mutex_ . Lock ( ) ;
RecoveryContext recovery_ctx ;
// Handles create_if_missing, error_if_exists
uint64_t recovered_seq ( kMaxSequenceNumber ) ;
s = impl - > Recover ( column_families , false , false , false , & recovered_seq ) ;
s = impl - > Recover ( column_families , false , false , false , & recovered_seq ,
& recovery_ctx ) ;
if ( s . ok ( ) ) {
uint64_t new_log_number = impl - > versions_ - > NewFileNumber ( ) ;
log : : Writer * new_log = nullptr ;
@ -1756,40 +1823,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
if ( s . ok ( ) ) {
// set column family handles
for ( auto cf : column_families ) {
auto cfd =
impl - > versions_ - > GetColumnFamilySet ( ) - > GetColumnFamily ( cf . name ) ;
if ( cfd ! = nullptr ) {
handles - > push_back (
new ColumnFamilyHandleImpl ( cfd , impl , & impl - > mutex_ ) ) ;
impl - > NewThreadStatusCfInfo ( cfd ) ;
} else {
if ( db_options . create_missing_column_families ) {
// missing column family, create it
ColumnFamilyHandle * handle ;
impl - > mutex_ . Unlock ( ) ;
s = impl - > CreateColumnFamily ( cf . options , cf . name , & handle ) ;
impl - > mutex_ . Lock ( ) ;
if ( s . ok ( ) ) {
handles - > push_back ( handle ) ;
} else {
break ;
}
} else {
s = Status : : InvalidArgument ( " Column family not found " , cf . name ) ;
break ;
}
}
}
}
if ( s . ok ( ) ) {
SuperVersionContext sv_context ( /* create_superversion */ true ) ;
for ( auto cfd : * impl - > versions_ - > GetColumnFamilySet ( ) ) {
impl - > InstallSuperVersionAndScheduleWork (
cfd , & sv_context , * cfd - > GetLatestMutableCFOptions ( ) ) ;
}
sv_context . Clean ( ) ;
if ( impl - > two_write_queues_ ) {
impl - > log_write_mutex_ . Lock ( ) ;
}
@ -1802,14 +1835,15 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
if ( s . ok ( ) ) {
// In WritePrepared there could be gap in sequence numbers. This breaks
// the trick we use in kPointInTimeRecovery which assumes the first seq in
// the log right after the corrupted log is one larger than the last seq
// we read from the wals. To let this trick keep working, we add a dummy
// entry with the expected sequence to the first log right after recovery.
// In non-WritePrepared case also the new log after recovery could be
// empty, and thus missing the consecutive seq hint to distinguish
// middle-log corruption to corrupted-log-remained-after-recovery. This
// case also will be addressed by a dummy write.
// the trick we use in kPointInTimeRecovery which assumes the first seq
// in the log right after the corrupted log is one larger than the last
// seq we read from the wals. To let this trick keep working, we add a
// dummy entry with the expected sequence to the first log right after
// recovery. In non-WritePrepared case also the new log after recovery
// could be empty, and thus missing the consecutive seq hint to
// distinguish middle-log corruption to
// corrupted-log-remained-after-recovery. This case also will be
// addressed by a dummy write.
if ( recovered_seq ! = kMaxSequenceNumber ) {
WriteBatch empty_batch ;
WriteBatchInternal : : SetSequence ( & empty_batch , recovered_seq ) ;
@ -1828,6 +1862,52 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
}
}
if ( s . ok ( ) ) {
s = impl - > LogAndApplyForRecovery ( recovery_ctx ) ;
}
if ( s . ok ( ) & & impl - > immutable_db_options_ . persist_stats_to_disk ) {
impl - > mutex_ . AssertHeld ( ) ;
s = impl - > InitPersistStatsColumnFamily ( ) ;
}
if ( s . ok ( ) ) {
// set column family handles
for ( auto cf : column_families ) {
auto cfd =
impl - > versions_ - > GetColumnFamilySet ( ) - > GetColumnFamily ( cf . name ) ;
if ( cfd ! = nullptr ) {
handles - > push_back (
new ColumnFamilyHandleImpl ( cfd , impl , & impl - > mutex_ ) ) ;
impl - > NewThreadStatusCfInfo ( cfd ) ;
} else {
if ( db_options . create_missing_column_families ) {
// missing column family, create it
ColumnFamilyHandle * handle ;
impl - > mutex_ . Unlock ( ) ;
s = impl - > CreateColumnFamily ( cf . options , cf . name , & handle ) ;
impl - > mutex_ . Lock ( ) ;
if ( s . ok ( ) ) {
handles - > push_back ( handle ) ;
} else {
break ;
}
} else {
s = Status : : InvalidArgument ( " Column family not found " , cf . name ) ;
break ;
}
}
}
}
if ( s . ok ( ) ) {
SuperVersionContext sv_context ( /* create_superversion */ true ) ;
for ( auto cfd : * impl - > versions_ - > GetColumnFamilySet ( ) ) {
impl - > InstallSuperVersionAndScheduleWork (
cfd , & sv_context , * cfd - > GetLatestMutableCFOptions ( ) ) ;
}
sv_context . Clean ( ) ;
}
if ( s . ok ( ) & & impl - > immutable_db_options_ . persist_stats_to_disk ) {
// try to read format version
s = impl - > PersistentStatsProcessFormatVersion ( ) ;
@ -1853,7 +1933,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
if ( cfd - > ioptions ( ) - > merge_operator ! = nullptr & &
! cfd - > mem ( ) - > IsMergeOperatorSupported ( ) ) {
s = Status : : InvalidArgument (
" The memtable of column family %s does not support merge operator "
" The memtable of column family %s does not support merge "
" operator "
" its options.merge_operator is non-null " ,
cfd - > GetName ( ) . c_str ( ) ) ;
}