@ -612,7 +612,8 @@ Status DBImpl::CompactFilesImpl(
Status DBImpl : : PauseBackgroundWork ( ) {
InstrumentedMutexLock guard_lock ( & mutex_ ) ;
bg_compaction_paused_ + + ;
while ( bg_compaction_scheduled_ > 0 | | bg_flush_scheduled_ > 0 ) {
while ( bg_bottom_compaction_scheduled_ > 0 | | bg_compaction_scheduled_ > 0 | |
bg_flush_scheduled_ > 0 ) {
bg_cv_ . Wait ( ) ;
}
bg_work_paused_ + + ;
@ -808,7 +809,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
bool scheduled = false ;
bool manual_conflict = false ;
ManualCompaction manual ;
ManualCompactionState manual ;
manual . cfd = cfd ;
manual . input_level = input_level ;
manual . output_level = output_level ;
@ -858,7 +859,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
AddManualCompaction ( & manual ) ;
TEST_SYNC_POINT_CALLBACK ( " DBImpl::RunManualCompaction:NotScheduled " , & mutex_ ) ;
if ( exclusive ) {
while ( bg_compaction_scheduled_ > 0 ) {
while ( bg_bottom_compaction_scheduled_ > 0 | |
bg_compaction_scheduled_ > 0 ) {
TEST_SYNC_POINT ( " DBImpl::RunManualCompaction:WaitScheduled " ) ;
ROCKS_LOG_INFO (
immutable_db_options_ . info_log ,
@ -878,14 +880,14 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
while ( ! manual . done ) {
assert ( HasPendingManualCompaction ( ) ) ;
manual_conflict = false ;
Compaction * compaction ;
if ( ShouldntRunManualCompaction ( & manual ) | | ( manual . in_progress = = true ) | |
scheduled | |
( ( manual . manual_end = & manual . tmp_storage1 ) & & (
( manual . compaction = manual . cfd - > CompactRange (
* manual . cfd - > GetLatestMutableCFOptions ( ) , manual . input_level ,
manual . output_level , manual . output_path_id , manual . begin ,
manual . end , & manual . manual_end , & manual_conflict ) ) = =
nullptr ) & &
( ( manual . manual_end = & manual . tmp_storage1 ) & &
( ( compaction = manual . cfd - > CompactRange (
* manual . cfd - > GetLatestMutableCFOptions ( ) , manual . input_level ,
manual . output_level , manual . output_path_id , manual . begin ,
manual . end , & manual . manual_end , & manual_conflict ) ) = = nullptr ) & &
manual_conflict ) ) {
// exclusive manual compactions should not see a conflict during
// CompactRange
@ -898,14 +900,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
manual . incomplete = false ;
}
} else if ( ! scheduled ) {
if ( manual . compaction = = nullptr ) {
if ( compaction = = nullptr ) {
manual . done = true ;
bg_cv_ . SignalAll ( ) ;
continue ;
}
ca = new CompactionArg ;
ca - > db = this ;
ca - > m = & manual ;
ca - > prepicked_compaction = new PrepickedCompaction ;
ca - > prepicked_compaction - > manual_compaction_state = & manual ;
ca - > prepicked_compaction - > compaction = compaction ;
manual . incomplete = false ;
bg_compaction_scheduled_ + + ;
env_ - > Schedule ( & DBImpl : : BGWorkCompaction , ca , Env : : Priority : : LOW , this ,
@ -1047,7 +1051,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
unscheduled_compactions_ > 0 ) {
CompactionArg * ca = new CompactionArg ;
ca - > db = this ;
ca - > m = nullptr ;
ca - > prepicked_co mpaction = nullptr ;
bg_compaction_scheduled_ + + ;
unscheduled_compactions_ - - ;
env_ - > Schedule ( & DBImpl : : BGWorkCompaction , ca , Env : : Priority : : LOW , this ,
@ -1152,7 +1156,23 @@ void DBImpl::BGWorkCompaction(void* arg) {
delete reinterpret_cast < CompactionArg * > ( arg ) ;
IOSTATS_SET_THREAD_POOL_ID ( Env : : Priority : : LOW ) ;
TEST_SYNC_POINT ( " DBImpl::BGWorkCompaction " ) ;
reinterpret_cast < DBImpl * > ( ca . db ) - > BackgroundCallCompaction ( ca . m ) ;
auto prepicked_compaction =
static_cast < PrepickedCompaction * > ( ca . prepicked_compaction ) ;
reinterpret_cast < DBImpl * > ( ca . db ) - > BackgroundCallCompaction (
prepicked_compaction , Env : : Priority : : LOW ) ;
delete prepicked_compaction ;
}
void DBImpl : : BGWorkBottomCompaction ( void * arg ) {
CompactionArg ca = * ( static_cast < CompactionArg * > ( arg ) ) ;
delete static_cast < CompactionArg * > ( arg ) ;
IOSTATS_SET_THREAD_POOL_ID ( Env : : Priority : : BOTTOM ) ;
TEST_SYNC_POINT ( " DBImpl::BGWorkBottomCompaction " ) ;
auto * prepicked_compaction = ca . prepicked_compaction ;
assert ( prepicked_compaction & & prepicked_compaction - > compaction & &
! prepicked_compaction - > manual_compaction_state ) ;
ca . db - > BackgroundCallCompaction ( prepicked_compaction , Env : : Priority : : BOTTOM ) ;
delete prepicked_compaction ;
}
void DBImpl : : BGWorkPurge ( void * db ) {
@ -1165,8 +1185,11 @@ void DBImpl::BGWorkPurge(void* db) {
void DBImpl : : UnscheduleCallback ( void * arg ) {
CompactionArg ca = * ( reinterpret_cast < CompactionArg * > ( arg ) ) ;
delete reinterpret_cast < CompactionArg * > ( arg ) ;
if ( ( ca . m ! = nullptr ) & & ( ca . m - > compaction ! = nullptr ) ) {
delete ca . m - > compaction ;
if ( ca . prepicked_compaction ! = nullptr ) {
if ( ca . prepicked_compaction - > compaction ! = nullptr ) {
delete ca . prepicked_compaction - > compaction ;
}
delete ca . prepicked_compaction ;
}
TEST_SYNC_POINT ( " DBImpl::UnscheduleCallback " ) ;
}
@ -1293,9 +1316,9 @@ void DBImpl::BackgroundCallFlush() {
}
}
void DBImpl : : BackgroundCallCompaction ( void * arg ) {
void DBImpl : : BackgroundCallCompaction ( PrepickedCompaction * prepicked_compaction ,
Env : : Priority bg_thread_pri ) {
bool made_progress = false ;
ManualCompaction * m = reinterpret_cast < ManualCompaction * > ( arg ) ;
JobContext job_context ( next_job_id_ . fetch_add ( 1 ) , true ) ;
TEST_SYNC_POINT ( " BackgroundCallCompaction:0 " ) ;
MaybeDumpStats ( ) ;
@ -1313,9 +1336,11 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs ( ) ;
assert ( bg_compaction_scheduled_ ) ;
Status s =
BackgroundCompaction ( & made_progress , & job_context , & log_buffer , m ) ;
assert ( ( bg_thread_pri = = Env : : Priority : : BOTTOM & &
bg_bottom_compaction_scheduled_ ) | |
( bg_thread_pri = = Env : : Priority : : LOW & & bg_compaction_scheduled_ ) ) ;
Status s = BackgroundCompaction ( & made_progress , & job_context , & log_buffer ,
prepicked_compaction ) ;
TEST_SYNC_POINT ( " BackgroundCallCompaction:1 " ) ;
if ( ! s . ok ( ) & & ! s . IsShutdownInProgress ( ) ) {
// Wait a little bit before retrying background compaction in
@ -1361,17 +1386,24 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
assert ( num_running_compactions_ > 0 ) ;
num_running_compactions_ - - ;
bg_compaction_scheduled_ - - ;
if ( bg_thread_pri = = Env : : Priority : : LOW ) {
bg_compaction_scheduled_ - - ;
} else {
assert ( bg_thread_pri = = Env : : Priority : : BOTTOM ) ;
bg_bottom_compaction_scheduled_ - - ;
}
versions_ - > GetColumnFamilySet ( ) - > FreeDeadColumnFamilies ( ) ;
// See if there's more work to be done
MaybeScheduleFlushOrCompaction ( ) ;
if ( made_progress | | bg_compaction_scheduled_ = = 0 | |
if ( made_progress | |
( bg_compaction_scheduled_ = = 0 & &
bg_bottom_compaction_scheduled_ = = 0 ) | |
HasPendingManualCompaction ( ) ) {
// signal if
// * made_progress -- need to wakeup DelayWrite
// * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
// * bg_{bottom,}_ compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
// * HasPendingManualCompaction -- need to wakeup RunManualCompaction
// If none of this is true, there is no need to signal since nobody is
// waiting for it
@ -1386,14 +1418,23 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
Status DBImpl : : BackgroundCompaction ( bool * made_progress ,
JobContext * job_context ,
LogBuffer * log_buffer , void * arg ) {
ManualCompaction * manual_compaction =
reinterpret_cast < ManualCompaction * > ( arg ) ;
LogBuffer * log_buffer ,
PrepickedCompaction * prepicked_compaction ) {
ManualCompactionState * manual_compaction =
prepicked_compaction = = nullptr
? nullptr
: prepicked_compaction - > manual_compaction_state ;
* made_progress = false ;
mutex_ . AssertHeld ( ) ;
TEST_SYNC_POINT ( " DBImpl::BackgroundCompaction:Start " ) ;
bool is_manual = ( manual_compaction ! = nullptr ) ;
unique_ptr < Compaction > c ;
if ( prepicked_compaction ! = nullptr & &
prepicked_compaction - > compaction ! = nullptr ) {
c . reset ( prepicked_compaction - > compaction ) ;
}
bool is_prepicked = is_manual | | c ;
// (manual_compaction->in_progress == false);
bool trivial_move_disallowed =
@ -1410,7 +1451,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
manual_compaction - > status = status ;
manual_compaction - > done = true ;
manual_compaction - > in_progress = false ;
delete manual_compaction - > compaction ;
manual_compaction = nullptr ;
}
return status ;
@ -1421,13 +1461,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
manual_compaction - > in_progress = true ;
}
unique_ptr < Compaction > c ;
// InternalKey manual_end_storage;
// InternalKey* manual_end = &manual_end_storage;
if ( is_manual ) {
ManualCompaction * m = manual_compaction ;
ManualCompactionState * m = manual_compaction ;
assert ( m - > in_progress ) ;
c . reset ( std : : move ( m - > compaction ) ) ;
if ( ! c ) {
m - > done = true ;
m - > manual_end = nullptr ;
@ -1449,7 +1487,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
? " (end) "
: m - > manual_end - > DebugString ( ) . c_str ( ) ) ) ;
}
} else if ( ! compaction_queue_ . empty ( ) ) {
} else if ( ! is_prepicked & & ! compaction_queue_ . empty ( ) ) {
if ( HaveManualCompaction ( compaction_queue_ . front ( ) ) ) {
// Can't compact right now, but try again later
TEST_SYNC_POINT ( " DBImpl::BackgroundCompaction()::Conflict " ) ;
@ -1601,6 +1639,28 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// Clear Instrument
ThreadStatusUtil : : ResetThreadStatus ( ) ;
} else if ( c - > column_family_data ( ) - > ioptions ( ) - > compaction_style = =
kCompactionStyleUniversal & &
! is_prepicked & & c - > output_level ( ) > 0 & &
c - > output_level ( ) = =
c - > column_family_data ( )
- > current ( )
- > storage_info ( )
- > MaxOutputLevel (
immutable_db_options_ . allow_ingest_behind ) & &
env_ - > GetBackgroundThreads ( Env : : Priority : : BOTTOM ) > 0 ) {
// Forward universal compactions involving last level to the bottom pool
// if it exists, such that long-running compactions can't block short-
// lived ones, like L0->L0s.
TEST_SYNC_POINT ( " DBImpl::BackgroundCompaction:ForwardToBottomPriPool " ) ;
CompactionArg * ca = new CompactionArg ;
ca - > db = this ;
ca - > prepicked_compaction = new PrepickedCompaction ;
ca - > prepicked_compaction - > compaction = c . release ( ) ;
ca - > prepicked_compaction - > manual_compaction_state = nullptr ;
+ + bg_bottom_compaction_scheduled_ ;
env_ - > Schedule ( & DBImpl : : BGWorkBottomCompaction , ca , Env : : Priority : : BOTTOM ,
this , & DBImpl : : UnscheduleCallback ) ;
} else {
int output_level __attribute__ ( ( unused ) ) = c - > output_level ( ) ;
TEST_SYNC_POINT_CALLBACK ( " DBImpl::BackgroundCompaction:NonTrivial " ,
@ -1664,7 +1724,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
}
if ( is_manual ) {
ManualCompaction * m = manual_compaction ;
ManualCompactionState * m = manual_compaction ;
if ( ! status . ok ( ) ) {
m - > status = status ;
m - > done = true ;
@ -1707,13 +1767,13 @@ bool DBImpl::HasPendingManualCompaction() {
return ( ! manual_compaction_dequeue_ . empty ( ) ) ;
}
void DBImpl : : AddManualCompaction ( DBImpl : : ManualCompaction * m ) {
void DBImpl : : AddManualCompaction ( DBImpl : : ManualCompactionState * m ) {
manual_compaction_dequeue_ . push_back ( m ) ;
}
void DBImpl : : RemoveManualCompaction ( DBImpl : : ManualCompaction * m ) {
void DBImpl : : RemoveManualCompaction ( DBImpl : : ManualCompactionState * m ) {
// Remove from queue
std : : deque < ManualCompaction * > : : iterator it =
std : : deque < ManualCompactionState * > : : iterator it =
manual_compaction_dequeue_ . begin ( ) ;
while ( it ! = manual_compaction_dequeue_ . end ( ) ) {
if ( m = = ( * it ) ) {
@ -1726,16 +1786,17 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
return ;
}
bool DBImpl : : ShouldntRunManualCompaction ( ManualCompaction * m ) {
bool DBImpl : : ShouldntRunManualCompaction ( ManualCompactionState * m ) {
if ( num_running_ingest_file_ > 0 ) {
// We need to wait for other IngestExternalFile() calls to finish
// before running a manual compaction.
return true ;
}
if ( m - > exclusive ) {
return ( bg_compaction_scheduled_ > 0 ) ;
return ( bg_bottom_compaction_scheduled_ > 0 | |
bg_compaction_scheduled_ > 0 ) ;
}
std : : deque < ManualCompaction * > : : iterator it =
std : : deque < ManualCompactionState * > : : iterator it =
manual_compaction_dequeue_ . begin ( ) ;
bool seen = false ;
while ( it ! = manual_compaction_dequeue_ . end ( ) ) {
@ -1756,7 +1817,7 @@ bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
bool DBImpl : : HaveManualCompaction ( ColumnFamilyData * cfd ) {
// Remove from priority queue
std : : deque < ManualCompaction * > : : iterator it =
std : : deque < ManualCompactionState * > : : iterator it =
manual_compaction_dequeue_ . begin ( ) ;
while ( it ! = manual_compaction_dequeue_ . end ( ) ) {
if ( ( * it ) - > exclusive ) {
@ -1774,7 +1835,7 @@ bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
bool DBImpl : : HasExclusiveManualCompaction ( ) {
// Remove from priority queue
std : : deque < ManualCompaction * > : : iterator it =
std : : deque < ManualCompactionState * > : : iterator it =
manual_compaction_dequeue_ . begin ( ) ;
while ( it ! = manual_compaction_dequeue_ . end ( ) ) {
if ( ( * it ) - > exclusive ) {
@ -1785,7 +1846,7 @@ bool DBImpl::HasExclusiveManualCompaction() {
return false ;
}
bool DBImpl : : MCOverlap ( ManualCompaction * m , ManualCompaction * m1 ) {
bool DBImpl : : MCOverlap ( ManualCompactionState * m , ManualCompactionState * m1 ) {
if ( ( m - > exclusive ) | | ( m1 - > exclusive ) ) {
return true ;
}