@ -1763,6 +1763,7 @@ Status DBImpl::RunManualCompaction(
CompactionArg * ca = nullptr ;
bool scheduled = false ;
Env : : Priority thread_pool_priority = Env : : Priority : : TOTAL ;
bool manual_conflict = false ;
ManualCompactionState manual ;
manual . cfd = cfd ;
@ -1884,9 +1885,9 @@ Status DBImpl::RunManualCompaction(
manual . done = true ;
manual . status =
Status : : Incomplete ( Status : : SubCode : : kManualCompactionPaused ) ;
if ( ca & & ca - > prepicked_compaction ) {
ca - > prepicked_compaction - > is_canceled = true ;
}
assert ( thread_pool_priority ! = Env : : Priority : : TOTAL ) ;
env_ - > UnSchedule ( GetTaskTag ( TaskType : : kManualCompaction ) ,
thread_pool_priority ) ;
break ;
}
if ( scheduled & & manual . incomplete = = true ) {
@ -1916,13 +1917,17 @@ Status DBImpl::RunManualCompaction(
bg_bottom_compaction_scheduled_ + + ;
ca - > compaction_pri_ = Env : : Priority : : BOTTOM ;
env_ - > Schedule ( & DBImpl : : BGWorkBottomCompaction , ca ,
Env : : Priority : : BOTTOM , this ,
Env : : Priority : : BOTTOM ,
GetTaskTag ( TaskType : : kManualCompaction ) ,
& DBImpl : : UnscheduleCompactionCallback ) ;
thread_pool_priority = Env : : Priority : : BOTTOM ;
} else {
bg_compaction_scheduled_ + + ;
ca - > compaction_pri_ = Env : : Priority : : LOW ;
env_ - > Schedule ( & DBImpl : : BGWorkCompaction , ca , Env : : Priority : : LOW , this ,
env_ - > Schedule ( & DBImpl : : BGWorkCompaction , ca , Env : : Priority : : LOW ,
GetTaskTag ( TaskType : : kManualCompaction ) ,
& DBImpl : : UnscheduleCompactionCallback ) ;
thread_pool_priority = Env : : Priority : : LOW ;
}
scheduled = true ;
TEST_SYNC_POINT ( " DBImpl::RunManualCompaction:Scheduled " ) ;
@ -1933,6 +1938,13 @@ Status DBImpl::RunManualCompaction(
assert ( ! manual . in_progress ) ;
assert ( HasPendingManualCompaction ( ) ) ;
RemoveManualCompaction ( & manual ) ;
// if the manual job is unscheduled, try schedule other jobs in case there's
// any unscheduled compaction job which was blocked by exclusive manual
// compaction.
if ( manual . status . IsIncomplete ( ) & &
manual . status . subcode ( ) = = Status : : SubCode : : kManualCompactionPaused ) {
MaybeScheduleFlushOrCompaction ( ) ;
}
bg_cv_ . SignalAll ( ) ;
return manual . status ;
}
@ -2661,6 +2673,8 @@ void DBImpl::UnscheduleCompactionCallback(void* arg) {
delete reinterpret_cast < CompactionArg * > ( arg ) ;
if ( ca . prepicked_compaction ! = nullptr ) {
if ( ca . prepicked_compaction - > compaction ! = nullptr ) {
ca . prepicked_compaction - > compaction - > ReleaseCompactionFiles (
Status : : Incomplete ( Status : : SubCode : : kManualCompactionPaused ) ) ;
delete ca . prepicked_compaction - > compaction ;
}
delete ca . prepicked_compaction ;
@ -2851,106 +2865,93 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
void DBImpl : : BackgroundCallCompaction ( PrepickedCompaction * prepicked_compaction ,
Env : : Priority bg_thread_pri ) {
bool made_progress = false ;
JobContext job_context ( next_job_id_ . fetch_add ( 1 ) , true ) ;
TEST_SYNC_POINT ( " BackgroundCallCompaction:0 " ) ;
LogBuffer log_buffer ( InfoLogLevel : : INFO_LEVEL ,
immutable_db_options_ . info_log . get ( ) ) ;
{
InstrumentedMutexLock l ( & mutex_ ) ;
if ( prepicked_compaction & & prepicked_compaction - > is_canceled ) {
assert ( prepicked_compaction - > compaction ) ;
ROCKS_LOG_BUFFER ( & log_buffer , " [%s] Skip canceled manual compaction job " ,
prepicked_compaction - > compaction - > column_family_data ( )
- > GetName ( )
. c_str ( ) ) ;
prepicked_compaction - > compaction - > ReleaseCompactionFiles (
Status : : Incomplete ( Status : : SubCode : : kManualCompactionPaused ) ) ;
delete prepicked_compaction - > compaction ;
} else {
JobContext job_context ( next_job_id_ . fetch_add ( 1 ) , true ) ;
// This call will unlock/lock the mutex to wait for current running
// IngestExternalFile() calls to finish.
WaitForIngestFile ( ) ;
num_running_compactions_ + + ;
std : : unique_ptr < std : : list < uint64_t > : : iterator >
pending_outputs_inserted_elem ( new std : : list < uint64_t > : : iterator (
CaptureCurrentFileNumberInPendingOutputs ( ) ) ) ;
assert ( ( bg_thread_pri = = Env : : Priority : : BOTTOM & &
bg_bottom_compaction_scheduled_ ) | |
( bg_thread_pri = = Env : : Priority : : LOW & & bg_compaction_scheduled_ ) ) ;
Status s = BackgroundCompaction ( & made_progress , & job_context , & log_buffer ,
prepicked_compaction , bg_thread_pri ) ;
TEST_SYNC_POINT ( " BackgroundCallCompaction:1 " ) ;
if ( s . IsBusy ( ) ) {
bg_cv_ . SignalAll ( ) ; // In case a waiter can proceed despite the error
mutex_ . Unlock ( ) ;
immutable_db_options_ . clock - > SleepForMicroseconds (
10000 ) ; // prevent hot loop
mutex_ . Lock ( ) ;
} else if ( ! s . ok ( ) & & ! s . IsShutdownInProgress ( ) & &
! s . IsManualCompactionPaused ( ) & & ! s . IsColumnFamilyDropped ( ) ) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
uint64_t error_cnt =
default_cf_internal_stats_ - > BumpAndGetBackgroundErrorCount ( ) ;
bg_cv_ . SignalAll ( ) ; // In case a waiter can proceed despite the error
mutex_ . Unlock ( ) ;
log_buffer . FlushBufferToLog ( ) ;
ROCKS_LOG_ERROR ( immutable_db_options_ . info_log ,
" Waiting after background compaction error: %s, "
" Accumulated background error counts: % " PRIu64 ,
s . ToString ( ) . c_str ( ) , error_cnt ) ;
LogFlush ( immutable_db_options_ . info_log ) ;
immutable_db_options_ . clock - > SleepForMicroseconds ( 1000000 ) ;
mutex_ . Lock ( ) ;
} else if ( s . IsManualCompactionPaused ( ) ) {
assert ( prepicked_compaction ) ;
ManualCompactionState * m =
prepicked_compaction - > manual_compaction_state ;
assert ( m ) ;
ROCKS_LOG_BUFFER ( & log_buffer , " [%s] [JOB %d] Manual compaction paused " ,
m - > cfd - > GetName ( ) . c_str ( ) , job_context . job_id ) ;
}
ReleaseFileNumberFromPendingOutputs ( pending_outputs_inserted_elem ) ;
// If compaction failed, we want to delete all temporary files that we
// might have created (they might not be all recorded in job_context in
// case of a failure). Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles ( & job_context , ! s . ok ( ) & & ! s . IsShutdownInProgress ( ) & &
! s . IsManualCompactionPaused ( ) & &
! s . IsColumnFamilyDropped ( ) & &
! s . IsBusy ( ) ) ;
TEST_SYNC_POINT ( " DBImpl::BackgroundCallCompaction:FoundObsoleteFiles " ) ;
// delete unnecessary files if any, this is done outside the mutex
if ( job_context . HaveSomethingToClean ( ) | |
job_context . HaveSomethingToDelete ( ) | | ! log_buffer . IsEmpty ( ) ) {
mutex_ . Unlock ( ) ;
// Have to flush the info logs before bg_compaction_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer . FlushBufferToLog ( ) ;
if ( job_context . HaveSomethingToDelete ( ) ) {
PurgeObsoleteFiles ( job_context ) ;
TEST_SYNC_POINT (
" DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles " ) ;
}
job_context . Clean ( ) ;
mutex_ . Lock ( ) ;
}
// This call will unlock/lock the mutex to wait for current running
// IngestExternalFile() calls to finish.
WaitForIngestFile ( ) ;
num_running_compactions_ + + ;
std : : unique_ptr < std : : list < uint64_t > : : iterator >
pending_outputs_inserted_elem ( new std : : list < uint64_t > : : iterator (
CaptureCurrentFileNumberInPendingOutputs ( ) ) ) ;
assert ( ( bg_thread_pri = = Env : : Priority : : BOTTOM & &
bg_bottom_compaction_scheduled_ ) | |
( bg_thread_pri = = Env : : Priority : : LOW & & bg_compaction_scheduled_ ) ) ;
Status s = BackgroundCompaction ( & made_progress , & job_context , & log_buffer ,
prepicked_compaction , bg_thread_pri ) ;
TEST_SYNC_POINT ( " BackgroundCallCompaction:1 " ) ;
if ( s . IsBusy ( ) ) {
bg_cv_ . SignalAll ( ) ; // In case a waiter can proceed despite the error
mutex_ . Unlock ( ) ;
immutable_db_options_ . clock - > SleepForMicroseconds (
10000 ) ; // prevent hot loop
mutex_ . Lock ( ) ;
} else if ( ! s . ok ( ) & & ! s . IsShutdownInProgress ( ) & &
! s . IsManualCompactionPaused ( ) & & ! s . IsColumnFamilyDropped ( ) ) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
uint64_t error_cnt =
default_cf_internal_stats_ - > BumpAndGetBackgroundErrorCount ( ) ;
bg_cv_ . SignalAll ( ) ; // In case a waiter can proceed despite the error
mutex_ . Unlock ( ) ;
log_buffer . FlushBufferToLog ( ) ;
ROCKS_LOG_ERROR ( immutable_db_options_ . info_log ,
" Waiting after background compaction error: %s, "
" Accumulated background error counts: % " PRIu64 ,
s . ToString ( ) . c_str ( ) , error_cnt ) ;
LogFlush ( immutable_db_options_ . info_log ) ;
immutable_db_options_ . clock - > SleepForMicroseconds ( 1000000 ) ;
mutex_ . Lock ( ) ;
} else if ( s . IsManualCompactionPaused ( ) ) {
assert ( prepicked_compaction ) ;
ManualCompactionState * m = prepicked_compaction - > manual_compaction_state ;
assert ( m ) ;
ROCKS_LOG_BUFFER ( & log_buffer , " [%s] [JOB %d] Manual compaction paused " ,
m - > cfd - > GetName ( ) . c_str ( ) , job_context . job_id ) ;
}
ReleaseFileNumberFromPendingOutputs ( pending_outputs_inserted_elem ) ;
assert ( num_running_compactions_ > 0 ) ;
num_running_compactions_ - - ;
// If compaction failed, we want to delete all temporary files that we
// might have created (they might not be all recorded in job_context in
// case of a failure). Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles ( & job_context , ! s . ok ( ) & & ! s . IsShutdownInProgress ( ) & &
! s . IsManualCompactionPaused ( ) & &
! s . IsColumnFamilyDropped ( ) & &
! s . IsBusy ( ) ) ;
TEST_SYNC_POINT ( " DBImpl::BackgroundCallCompaction:FoundObsoleteFiles " ) ;
// delete unnecessary files if any, this is done outside the mutex
if ( job_context . HaveSomethingToClean ( ) | |
job_context . HaveSomethingToDelete ( ) | | ! log_buffer . IsEmpty ( ) ) {
mutex_ . Unlock ( ) ;
// Have to flush the info logs before bg_compaction_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer . FlushBufferToLog ( ) ;
if ( job_context . HaveSomethingToDelete ( ) ) {
PurgeObsoleteFiles ( job_context ) ;
TEST_SYNC_POINT ( " DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles " ) ;
}
job_context . Clean ( ) ;
mutex_ . Lock ( ) ;
}
assert ( num_running_compactions_ > 0 ) ;
num_running_compactions_ - - ;
if ( bg_thread_pri = = Env : : Priority : : LOW ) {
bg_compaction_scheduled_ - - ;
} else {