@ -162,6 +162,14 @@ DEFINE_int32(max_background_compactions,
" The maximum number of concurrent background compactions "
" The maximum number of concurrent background compactions "
" that can occur in parallel. " ) ;
" that can occur in parallel. " ) ;
DEFINE_int32 ( compaction_thread_pool_adjust_interval , 0 ,
" The interval (in milliseconds) to adjust compaction thread pool "
" size. Don't change it periodically if the value is 0. " ) ;
DEFINE_int32 ( compaction_thread_pool_varations , 2 ,
" Range of bakground thread pool size variations when adjusted "
" periodically. " ) ;
DEFINE_int32 ( max_background_flushes , rocksdb : : Options ( ) . max_background_flushes ,
DEFINE_int32 ( max_background_flushes , rocksdb : : Options ( ) . max_background_flushes ,
" The maximum number of concurrent background flushes "
" The maximum number of concurrent background flushes "
" that can occur in parallel. " ) ;
" that can occur in parallel. " ) ;
@ -567,6 +575,8 @@ class SharedState {
num_done_ ( 0 ) ,
num_done_ ( 0 ) ,
start_ ( false ) ,
start_ ( false ) ,
start_verify_ ( false ) ,
start_verify_ ( false ) ,
should_stop_bg_thread_ ( false ) ,
bg_thread_finished_ ( false ) ,
stress_test_ ( stress_test ) ,
stress_test_ ( stress_test ) ,
verification_failure_ ( false ) {
verification_failure_ ( false ) {
if ( FLAGS_test_batches_snapshots ) {
if ( FLAGS_test_batches_snapshots ) {
@ -694,6 +704,14 @@ class SharedState {
uint32_t GetSeed ( ) const { return seed_ ; }
uint32_t GetSeed ( ) const { return seed_ ; }
void SetShouldStopBgThread ( ) { should_stop_bg_thread_ = true ; }
bool ShoudStopBgThread ( ) { return should_stop_bg_thread_ ; }
void SetBgThreadFinish ( ) { bg_thread_finished_ = true ; }
bool BgThreadFinished ( ) const { return bg_thread_finished_ ; }
private :
private :
port : : Mutex mu_ ;
port : : Mutex mu_ ;
port : : CondVar cv_ ;
port : : CondVar cv_ ;
@ -707,6 +725,8 @@ class SharedState {
long num_done_ ;
long num_done_ ;
bool start_ ;
bool start_ ;
bool start_verify_ ;
bool start_verify_ ;
bool should_stop_bg_thread_ ;
bool bg_thread_finished_ ;
StressTest * stress_test_ ;
StressTest * stress_test_ ;
std : : atomic < bool > verification_failure_ ;
std : : atomic < bool > verification_failure_ ;
@ -777,6 +797,11 @@ class StressTest {
threads [ i ] = new ThreadState ( i , & shared ) ;
threads [ i ] = new ThreadState ( i , & shared ) ;
FLAGS_env - > StartThread ( ThreadBody , threads [ i ] ) ;
FLAGS_env - > StartThread ( ThreadBody , threads [ i ] ) ;
}
}
ThreadState bg_thread ( 0 , & shared ) ;
if ( FLAGS_compaction_thread_pool_adjust_interval > 0 ) {
FLAGS_env - > StartThread ( PoolSizeChangeThread , & bg_thread ) ;
}
// Each thread goes through the following states:
// Each thread goes through the following states:
// initializing -> wait for others to init -> read/populate/depopulate
// initializing -> wait for others to init -> read/populate/depopulate
// wait for others to operate -> verify -> done
// wait for others to operate -> verify -> done
@ -829,6 +854,14 @@ class StressTest {
}
}
PrintStatistics ( ) ;
PrintStatistics ( ) ;
if ( FLAGS_compaction_thread_pool_adjust_interval > 0 ) {
MutexLock l ( shared . GetMutex ( ) ) ;
shared . SetShouldStopBgThread ( ) ;
while ( ! shared . BgThreadFinished ( ) ) {
shared . GetCondVar ( ) - > Wait ( ) ;
}
}
if ( shared . HasVerificationFailedYet ( ) ) {
if ( shared . HasVerificationFailedYet ( ) ) {
printf ( " Verification failed :( \n " ) ;
printf ( " Verification failed :( \n " ) ;
return false ;
return false ;
@ -879,6 +912,38 @@ class StressTest {
}
}
static void PoolSizeChangeThread ( void * v ) {
assert ( FLAGS_compaction_thread_pool_adjust_interval > 0 ) ;
ThreadState * thread = reinterpret_cast < ThreadState * > ( v ) ;
SharedState * shared = thread - > shared ;
while ( true ) {
{
MutexLock l ( shared - > GetMutex ( ) ) ;
if ( shared - > ShoudStopBgThread ( ) ) {
shared - > SetBgThreadFinish ( ) ;
shared - > GetCondVar ( ) - > SignalAll ( ) ;
return ;
}
}
auto thread_pool_size_base = FLAGS_max_background_compactions ;
auto thread_pool_size_var = FLAGS_compaction_thread_pool_varations ;
int new_thread_pool_size =
thread_pool_size_base - thread_pool_size_var +
thread - > rand . Next ( ) % ( thread_pool_size_var * 2 + 1 ) ;
if ( new_thread_pool_size < 1 ) {
new_thread_pool_size = 1 ;
}
FLAGS_env - > SetBackgroundThreads ( new_thread_pool_size ) ;
// Sleep up to 3 seconds
FLAGS_env - > SleepForMicroseconds (
thread - > rand . Next ( ) % FLAGS_compaction_thread_pool_adjust_interval *
1000 +
1 ) ;
}
}
// Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
// Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
// ("9"+K, "9"+V) in DB atomically i.e in a single batch.
// ("9"+K, "9"+V) in DB atomically i.e in a single batch.
// Also refer MultiGet.
// Also refer MultiGet.