@ -538,6 +538,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// don't delete files that might be currently written to from compaction
// don't delete files that might be currently written to from compaction
// threads
// threads
// Since job_context->min_pending_output is set, until file scan finishes,
// mutex_ cannot be released. Otherwise, we might see no min_pending_output
// here but later find newer generated unfinalized files while scannint.
if ( ! pending_outputs_ . empty ( ) ) {
if ( ! pending_outputs_ . empty ( ) ) {
job_context - > min_pending_output = * pending_outputs_ . begin ( ) ;
job_context - > min_pending_output = * pending_outputs_ . begin ( ) ;
} else {
} else {
@ -550,37 +553,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
versions_ - > GetObsoleteFiles ( & job_context - > sst_delete_files ,
versions_ - > GetObsoleteFiles ( & job_context - > sst_delete_files ,
job_context - > min_pending_output ) ;
job_context - > min_pending_output ) ;
if ( ! alive_log_files_ . empty ( ) ) {
uint64_t min_log_number = versions_ - > MinLogNumber ( ) ;
// find newly obsoleted log files
while ( alive_log_files_ . begin ( ) - > number < min_log_number ) {
auto & earliest = * alive_log_files_ . begin ( ) ;
job_context - > log_delete_files . push_back ( earliest . number ) ;
total_log_size_ - = earliest . size ;
alive_log_files_ . pop_front ( ) ;
// Current log should always stay alive since it can't have
// number < MinLogNumber().
assert ( alive_log_files_ . size ( ) ) ;
}
while ( ! logs_ . empty ( ) & & logs_ . front ( ) . number < min_log_number ) {
auto & log = logs_ . front ( ) ;
if ( log . getting_synced ) {
log_sync_cv_ . Wait ( ) ;
// logs_ could have changed while we were waiting.
continue ;
}
logs_to_free_ . push_back ( log . ReleaseWriter ( ) ) ;
logs_ . pop_front ( ) ;
}
// Current log cannot be obsolete.
assert ( ! logs_ . empty ( ) ) ;
}
// We're just cleaning up for DB::Write().
assert ( job_context - > logs_to_free . empty ( ) ) ;
job_context - > logs_to_free = logs_to_free_ ;
logs_to_free_ . clear ( ) ;
// store the current filenum, lognum, etc
// store the current filenum, lognum, etc
job_context - > manifest_file_number = versions_ - > manifest_file_number ( ) ;
job_context - > manifest_file_number = versions_ - > manifest_file_number ( ) ;
job_context - > pending_manifest_file_number =
job_context - > pending_manifest_file_number =
@ -622,6 +594,37 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
}
}
}
}
}
}
if ( ! alive_log_files_ . empty ( ) ) {
uint64_t min_log_number = versions_ - > MinLogNumber ( ) ;
// find newly obsoleted log files
while ( alive_log_files_ . begin ( ) - > number < min_log_number ) {
auto & earliest = * alive_log_files_ . begin ( ) ;
job_context - > log_delete_files . push_back ( earliest . number ) ;
total_log_size_ - = earliest . size ;
alive_log_files_ . pop_front ( ) ;
// Current log should always stay alive since it can't have
// number < MinLogNumber().
assert ( alive_log_files_ . size ( ) ) ;
}
while ( ! logs_ . empty ( ) & & logs_ . front ( ) . number < min_log_number ) {
auto & log = logs_ . front ( ) ;
if ( log . getting_synced ) {
log_sync_cv_ . Wait ( ) ;
// logs_ could have changed while we were waiting.
continue ;
}
logs_to_free_ . push_back ( log . ReleaseWriter ( ) ) ;
logs_ . pop_front ( ) ;
}
// Current log cannot be obsolete.
assert ( ! logs_ . empty ( ) ) ;
}
// We're just cleaning up for DB::Write().
assert ( job_context - > logs_to_free . empty ( ) ) ;
job_context - > logs_to_free = logs_to_free_ ;
logs_to_free_ . clear ( ) ;
}
}
namespace {
namespace {