@ -8023,46 +8023,64 @@ static void MTThreadBody(void* arg) {
} // namespace
} // namespace
TEST_F ( DBTest , MultiThreaded ) {
class MultiThreadedDBTest : public DBTest ,
anon : : OptionsOverride options_override ;
public : : testing : : WithParamInterface < int > {
options_override . skip_policy = kSkipNoSnapshot ;
public :
do {
virtual void SetUp ( ) override { option_config_ = GetParam ( ) ; }
std : : vector < std : : string > cfs ;
for ( int i = 1 ; i < kColumnFamilies ; + + i ) {
static std : : vector < int > GenerateOptionConfigs ( ) {
cfs . push_back ( ToString ( i ) ) ;
std : : vector < int > optionConfigs ;
}
for ( int optionConfig = kDefault ; optionConfig < kEnd ; + + optionConfig ) {
CreateAndReopenWithCF ( cfs , CurrentOptions ( options_override ) ) ;
// skip as HashCuckooRep does not support snapshot
// Initialize state
if ( optionConfig ! = kHashCuckoo ) {
MTState mt ;
optionConfigs . push_back ( optionConfig ) ;
mt . test = this ;
}
mt . stop . store ( false , std : : memory_order_release ) ;
for ( int id = 0 ; id < kNumThreads ; id + + ) {
mt . counter [ id ] . store ( 0 , std : : memory_order_release ) ;
mt . thread_done [ id ] . store ( false , std : : memory_order_release ) ;
}
// Start threads
MTThread thread [ kNumThreads ] ;
for ( int id = 0 ; id < kNumThreads ; id + + ) {
thread [ id ] . state = & mt ;
thread [ id ] . id = id ;
env_ - > StartThread ( MTThreadBody , & thread [ id ] ) ;
}
}
return optionConfigs ;
}
} ;
// Let them run for a while
TEST_P ( MultiThreadedDBTest , MultiThreaded ) {
env_ - > SleepForMicroseconds ( kTestSeconds * 1000000 ) ;
anon : : OptionsOverride options_override ;
options_override . skip_policy = kSkipNoSnapshot ;
// Stop the threads and wait for them to finish
std : : vector < std : : string > cfs ;
mt . stop . store ( true , std : : memory_order_release ) ;
for ( int i = 1 ; i < kColumnFamilies ; + + i ) {
for ( int id = 0 ; id < kNumThreads ; id + + ) {
cfs . push_back ( ToString ( i ) ) ;
while ( mt . thread_done [ id ] . load ( std : : memory_order_acquire ) = = false ) {
}
env_ - > SleepForMicroseconds ( 100000 ) ;
CreateAndReopenWithCF ( cfs , CurrentOptions ( options_override ) ) ;
}
// Initialize state
MTState mt ;
mt . test = this ;
mt . stop . store ( false , std : : memory_order_release ) ;
for ( int id = 0 ; id < kNumThreads ; id + + ) {
mt . counter [ id ] . store ( 0 , std : : memory_order_release ) ;
mt . thread_done [ id ] . store ( false , std : : memory_order_release ) ;
}
// Start threads
MTThread thread [ kNumThreads ] ;
for ( int id = 0 ; id < kNumThreads ; id + + ) {
thread [ id ] . state = & mt ;
thread [ id ] . id = id ;
env_ - > StartThread ( MTThreadBody , & thread [ id ] ) ;
}
// Let them run for a while
env_ - > SleepForMicroseconds ( kTestSeconds * 1000000 ) ;
// Stop the threads and wait for them to finish
mt . stop . store ( true , std : : memory_order_release ) ;
for ( int id = 0 ; id < kNumThreads ; id + + ) {
while ( mt . thread_done [ id ] . load ( std : : memory_order_acquire ) = = false ) {
env_ - > SleepForMicroseconds ( 100000 ) ;
}
}
// skip as HashCuckooRep does not support snapshot
}
} while ( ChangeOptions ( kSkipHashCuckoo ) ) ;
}
}
INSTANTIATE_TEST_CASE_P (
MultiThreaded , MultiThreadedDBTest ,
: : testing : : ValuesIn ( MultiThreadedDBTest : : GenerateOptionConfigs ( ) ) ) ;
// Group commit test:
// Group commit test:
namespace {
namespace {