@ -56,12 +56,11 @@ void ThreadBody(void* v) {
}
}
}
}
bool RunStressTest ( StressTest * stress ) {
bool RunStressTest ( SharedState * shared ) {
SystemClock * clock = db_stress_env - > GetSystemClock ( ) . get ( ) ;
SystemClock * clock = db_stress_env - > GetSystemClock ( ) . get ( ) ;
StressTest * stress = shared - > GetStressTest ( ) ;
SharedState shared ( db_stress_env , stress ) ;
if ( shared - > ShouldVerifyAtBeginning ( ) & & FLAGS_preserve_unverified_changes ) {
if ( shared . ShouldVerifyAtBeginning ( ) & & FLAGS_preserve_unverified_changes ) {
Status s = InitUnverifiedSubdir ( FLAGS_db ) ;
Status s = InitUnverifiedSubdir ( FLAGS_db ) ;
if ( s . ok ( ) & & ! FLAGS_expected_values_dir . empty ( ) ) {
if ( s . ok ( ) & & ! FLAGS_expected_values_dir . empty ( ) ) {
s = InitUnverifiedSubdir ( FLAGS_expected_values_dir ) ;
s = InitUnverifiedSubdir ( FLAGS_expected_values_dir ) ;
@ -73,8 +72,8 @@ bool RunStressTest(StressTest* stress) {
}
}
}
}
stress - > InitDb ( & shared ) ;
stress - > InitDb ( shared ) ;
stress - > FinishInitDb ( & shared ) ;
stress - > FinishInitDb ( shared ) ;
if ( FLAGS_sync_fault_injection ) {
if ( FLAGS_sync_fault_injection ) {
fault_fs_guard - > SetFilesystemDirectWritable ( false ) ;
fault_fs_guard - > SetFilesystemDirectWritable ( false ) ;
@ -88,28 +87,28 @@ bool RunStressTest(StressTest* stress) {
fprintf ( stdout , " %s Initializing worker threads \n " ,
fprintf ( stdout , " %s Initializing worker threads \n " ,
clock - > TimeToString ( now / 1000000 ) . c_str ( ) ) ;
clock - > TimeToString ( now / 1000000 ) . c_str ( ) ) ;
shared . SetThreads ( n ) ;
shared - > SetThreads ( n ) ;
if ( FLAGS_compaction_thread_pool_adjust_interval > 0 ) {
if ( FLAGS_compaction_thread_pool_adjust_interval > 0 ) {
shared . IncBgThreads ( ) ;
shared - > IncBgThreads ( ) ;
}
}
if ( FLAGS_continuous_verification_interval > 0 ) {
if ( FLAGS_continuous_verification_interval > 0 ) {
shared . IncBgThreads ( ) ;
shared - > IncBgThreads ( ) ;
}
}
std : : vector < ThreadState * > threads ( n ) ;
std : : vector < ThreadState * > threads ( n ) ;
for ( uint32_t i = 0 ; i < n ; i + + ) {
for ( uint32_t i = 0 ; i < n ; i + + ) {
threads [ i ] = new ThreadState ( i , & shared ) ;
threads [ i ] = new ThreadState ( i , shared ) ;
db_stress_env - > StartThread ( ThreadBody , threads [ i ] ) ;
db_stress_env - > StartThread ( ThreadBody , threads [ i ] ) ;
}
}
ThreadState bg_thread ( 0 , & shared ) ;
ThreadState bg_thread ( 0 , shared ) ;
if ( FLAGS_compaction_thread_pool_adjust_interval > 0 ) {
if ( FLAGS_compaction_thread_pool_adjust_interval > 0 ) {
db_stress_env - > StartThread ( PoolSizeChangeThread , & bg_thread ) ;
db_stress_env - > StartThread ( PoolSizeChangeThread , & bg_thread ) ;
}
}
ThreadState continuous_verification_thread ( 0 , & shared ) ;
ThreadState continuous_verification_thread ( 0 , shared ) ;
if ( FLAGS_continuous_verification_interval > 0 ) {
if ( FLAGS_continuous_verification_interval > 0 ) {
db_stress_env - > StartThread ( DbVerificationThread ,
db_stress_env - > StartThread ( DbVerificationThread ,
& continuous_verification_thread ) ;
& continuous_verification_thread ) ;
@ -120,12 +119,12 @@ bool RunStressTest(StressTest* stress) {
// wait for others to operate -> verify -> done
// wait for others to operate -> verify -> done
{
{
MutexLock l ( shared . GetMutex ( ) ) ;
MutexLock l ( shared - > GetMutex ( ) ) ;
while ( ! shared . AllInitialized ( ) ) {
while ( ! shared - > AllInitialized ( ) ) {
shared . GetCondVar ( ) - > Wait ( ) ;
shared - > GetCondVar ( ) - > Wait ( ) ;
}
}
if ( shared . ShouldVerifyAtBeginning ( ) ) {
if ( shared - > ShouldVerifyAtBeginning ( ) ) {
if ( shared . HasVerificationFailedYet ( ) ) {
if ( shared - > HasVerificationFailedYet ( ) ) {
fprintf ( stderr , " Crash-recovery verification failed :( \n " ) ;
fprintf ( stderr , " Crash-recovery verification failed :( \n " ) ;
} else {
} else {
fprintf ( stdout , " Crash-recovery verification passed :) \n " ) ;
fprintf ( stdout , " Crash-recovery verification passed :) \n " ) ;
@ -144,17 +143,17 @@ bool RunStressTest(StressTest* stress) {
// This is after the verification step to avoid making all those `Get()`s
// This is after the verification step to avoid making all those `Get()`s
// and `MultiGet()`s contend on the DB-wide trace mutex.
// and `MultiGet()`s contend on the DB-wide trace mutex.
if ( ! FLAGS_expected_values_dir . empty ( ) ) {
if ( ! FLAGS_expected_values_dir . empty ( ) ) {
stress - > TrackExpectedState ( & shared ) ;
stress - > TrackExpectedState ( shared ) ;
}
}
now = clock - > NowMicros ( ) ;
now = clock - > NowMicros ( ) ;
fprintf ( stdout , " %s Starting database operations \n " ,
fprintf ( stdout , " %s Starting database operations \n " ,
clock - > TimeToString ( now / 1000000 ) . c_str ( ) ) ;
clock - > TimeToString ( now / 1000000 ) . c_str ( ) ) ;
shared . SetStart ( ) ;
shared - > SetStart ( ) ;
shared . GetCondVar ( ) - > SignalAll ( ) ;
shared - > GetCondVar ( ) - > SignalAll ( ) ;
while ( ! shared . AllOperated ( ) ) {
while ( ! shared - > AllOperated ( ) ) {
shared . GetCondVar ( ) - > Wait ( ) ;
shared - > GetCondVar ( ) - > Wait ( ) ;
}
}
now = clock - > NowMicros ( ) ;
now = clock - > NowMicros ( ) ;
@ -169,10 +168,10 @@ bool RunStressTest(StressTest* stress) {
clock - > TimeToString ( ( uint64_t ) now / 1000000 ) . c_str ( ) ) ;
clock - > TimeToString ( ( uint64_t ) now / 1000000 ) . c_str ( ) ) ;
}
}
shared . SetStartVerify ( ) ;
shared - > SetStartVerify ( ) ;
shared . GetCondVar ( ) - > SignalAll ( ) ;
shared - > GetCondVar ( ) - > SignalAll ( ) ;
while ( ! shared . AllDone ( ) ) {
while ( ! shared - > AllDone ( ) ) {
shared . GetCondVar ( ) - > Wait ( ) ;
shared - > GetCondVar ( ) - > Wait ( ) ;
}
}
}
}
@ -187,7 +186,7 @@ bool RunStressTest(StressTest* stress) {
}
}
now = clock - > NowMicros ( ) ;
now = clock - > NowMicros ( ) ;
if ( ! FLAGS_skip_verifydb & & ! FLAGS_test_batches_snapshots & &
if ( ! FLAGS_skip_verifydb & & ! FLAGS_test_batches_snapshots & &
! shared . HasVerificationFailedYet ( ) ) {
! shared - > HasVerificationFailedYet ( ) ) {
fprintf ( stdout , " %s Verification successful \n " ,
fprintf ( stdout , " %s Verification successful \n " ,
clock - > TimeToString ( now / 1000000 ) . c_str ( ) ) ;
clock - > TimeToString ( now / 1000000 ) . c_str ( ) ) ;
}
}
@ -195,14 +194,14 @@ bool RunStressTest(StressTest* stress) {
if ( FLAGS_compaction_thread_pool_adjust_interval > 0 | |
if ( FLAGS_compaction_thread_pool_adjust_interval > 0 | |
FLAGS_continuous_verification_interval > 0 ) {
FLAGS_continuous_verification_interval > 0 ) {
MutexLock l ( shared . GetMutex ( ) ) ;
MutexLock l ( shared - > GetMutex ( ) ) ;
shared . SetShouldStopBgThread ( ) ;
shared - > SetShouldStopBgThread ( ) ;
while ( ! shared . BgThreadsFinished ( ) ) {
while ( ! shared - > BgThreadsFinished ( ) ) {
shared . GetCondVar ( ) - > Wait ( ) ;
shared - > GetCondVar ( ) - > Wait ( ) ;
}
}
}
}
if ( shared . HasVerificationFailedYet ( ) ) {
if ( shared - > HasVerificationFailedYet ( ) ) {
fprintf ( stderr , " Verification failed :( \n " ) ;
fprintf ( stderr , " Verification failed :( \n " ) ;
return false ;
return false ;
}
}