@ -30,80 +30,6 @@ static void SetBool(void* ptr) {
reinterpret_cast < port : : AtomicPointer * > ( ptr ) - > NoBarrier_Store ( ptr ) ;
reinterpret_cast < port : : AtomicPointer * > ( ptr ) - > NoBarrier_Store ( ptr ) ;
}
}
TEST ( EnvPosixTest , TwoPools ) {
class CB {
public :
CB ( const std : : string & pool_name , int pool_size )
: mu_ ( ) ,
num_running_ ( 0 ) ,
num_finished_ ( 0 ) ,
pool_size_ ( pool_size ) ,
pool_name_ ( pool_name ) { }
static void Run ( void * v ) {
CB * cb = reinterpret_cast < CB * > ( v ) ;
cb - > Run ( ) ;
}
void Run ( ) {
{
MutexLock l ( & mu_ ) ;
num_running_ + + ;
std : : cout < < " Pool " < < pool_name_ < < " : "
< < num_running_ < < " running threads. \n " ;
// make sure we don't have more than pool_size_ jobs running.
ASSERT_LE ( num_running_ , pool_size_ ) ;
}
// sleep for 1 sec
Env : : Default ( ) - > SleepForMicroseconds ( 1000000 ) ;
{
MutexLock l ( & mu_ ) ;
num_running_ - - ;
num_finished_ + + ;
}
}
int NumFinished ( ) {
MutexLock l ( & mu_ ) ;
return num_finished_ ;
}
private :
port : : Mutex mu_ ;
int num_running_ ;
int num_finished_ ;
int pool_size_ ;
std : : string pool_name_ ;
} ;
const int kLowPoolSize = 2 ;
const int kHighPoolSize = 4 ;
const int kJobs = 8 ;
CB low_pool_job ( " low " , kLowPoolSize ) ;
CB high_pool_job ( " high " , kHighPoolSize ) ;
env_ - > SetBackgroundThreads ( kLowPoolSize ) ;
env_ - > SetBackgroundThreads ( kHighPoolSize , Env : : Priority : : HIGH ) ;
// schedule same number of jobs in each pool
for ( int i = 0 ; i < kJobs ; i + + ) {
env_ - > Schedule ( & CB : : Run , & low_pool_job ) ;
env_ - > Schedule ( & CB : : Run , & high_pool_job , Env : : Priority : : HIGH ) ;
}
// wait for all jobs to finish
while ( low_pool_job . NumFinished ( ) < kJobs | |
high_pool_job . NumFinished ( ) < kJobs ) {
env_ - > SleepForMicroseconds ( kDelayMicros ) ;
}
}
TEST ( EnvPosixTest , RunImmediately ) {
TEST ( EnvPosixTest , RunImmediately ) {
port : : AtomicPointer called ( nullptr ) ;
port : : AtomicPointer called ( nullptr ) ;
env_ - > Schedule ( & SetBool , & called ) ;
env_ - > Schedule ( & SetBool , & called ) ;
@ -176,6 +102,78 @@ TEST(EnvPosixTest, StartThread) {
ASSERT_EQ ( state . val , 3 ) ;
ASSERT_EQ ( state . val , 3 ) ;
}
}
TEST ( EnvPosixTest , TwoPools ) {
class CB {
public :
CB ( const std : : string & pool_name , int pool_size )
: mu_ ( ) ,
num_running_ ( 0 ) ,
num_finished_ ( 0 ) ,
pool_size_ ( pool_size ) ,
pool_name_ ( pool_name ) { }
static void Run ( void * v ) {
CB * cb = reinterpret_cast < CB * > ( v ) ;
cb - > Run ( ) ;
}
void Run ( ) {
{
MutexLock l ( & mu_ ) ;
num_running_ + + ;
std : : cout < < " Pool " < < pool_name_ < < " : "
< < num_running_ < < " running threads. \n " ;
// make sure we don't have more than pool_size_ jobs running.
ASSERT_LE ( num_running_ , pool_size_ ) ;
}
// sleep for 1 sec
Env : : Default ( ) - > SleepForMicroseconds ( 1000000 ) ;
{
MutexLock l ( & mu_ ) ;
num_running_ - - ;
num_finished_ + + ;
}
}
int NumFinished ( ) {
MutexLock l ( & mu_ ) ;
return num_finished_ ;
}
private :
port : : Mutex mu_ ;
int num_running_ ;
int num_finished_ ;
int pool_size_ ;
std : : string pool_name_ ;
} ;
const int kLowPoolSize = 2 ;
const int kHighPoolSize = 4 ;
const int kJobs = 8 ;
CB low_pool_job ( " low " , kLowPoolSize ) ;
CB high_pool_job ( " high " , kHighPoolSize ) ;
env_ - > SetBackgroundThreads ( kLowPoolSize ) ;
env_ - > SetBackgroundThreads ( kHighPoolSize , Env : : Priority : : HIGH ) ;
// schedule same number of jobs in each pool
for ( int i = 0 ; i < kJobs ; i + + ) {
env_ - > Schedule ( & CB : : Run , & low_pool_job ) ;
env_ - > Schedule ( & CB : : Run , & high_pool_job , Env : : Priority : : HIGH ) ;
}
// wait for all jobs to finish
while ( low_pool_job . NumFinished ( ) < kJobs | |
high_pool_job . NumFinished ( ) < kJobs ) {
env_ - > SleepForMicroseconds ( kDelayMicros ) ;
}
}
bool IsSingleVarint ( const std : : string & s ) {
bool IsSingleVarint ( const std : : string & s ) {
Slice slice ( s ) ;
Slice slice ( s ) ;