@ -880,10 +880,12 @@ class PosixEnv : public Env {
// Allow increasing the number of worker threads.
// Allow increasing the number of worker threads.
virtual void SetBackgroundThreads ( int num ) {
virtual void SetBackgroundThreads ( int num ) {
PthreadCall ( " lock " , pthread_mutex_lock ( & mu_ ) ) ;
if ( num > num_threads_ ) {
if ( num > num_threads_ ) {
num_threads_ = num ;
num_threads_ = num ;
bgthread_ . resize ( num_threads_ ) ;
bgthread_ . resize ( num_threads_ ) ;
}
}
PthreadCall ( " unlock " , pthread_mutex_unlock ( & mu_ ) ) ;
}
}
virtual std : : string TimeToString ( uint64_t secondsSince1970 ) {
virtual std : : string TimeToString ( uint64_t secondsSince1970 ) {
@ -961,7 +963,6 @@ class PosixEnv : public Env {
// Entry per Schedule() call
// Entry per Schedule() call
struct BGItem { void * arg ; void ( * function ) ( void * ) ; } ;
struct BGItem { void * arg ; void ( * function ) ( void * ) ; } ;
typedef std : : deque < BGItem > BGQueue ;
typedef std : : deque < BGItem > BGQueue ;
int queue_size_ ; // number of items in BGQueue
BGQueue queue_ ;
BGQueue queue_ ;
} ;
} ;
@ -969,8 +970,7 @@ PosixEnv::PosixEnv() : checkedDiskForMmap_(false),
forceMmapOff ( false ) ,
forceMmapOff ( false ) ,
page_size_ ( getpagesize ( ) ) ,
page_size_ ( getpagesize ( ) ) ,
started_bgthread_ ( 0 ) ,
started_bgthread_ ( 0 ) ,
num_threads_ ( 1 ) ,
num_threads_ ( 1 ) {
queue_size_ ( 0 ) {
PthreadCall ( " mutex_init " , pthread_mutex_init ( & mu_ , nullptr ) ) ;
PthreadCall ( " mutex_init " , pthread_mutex_init ( & mu_ , nullptr ) ) ;
PthreadCall ( " cvar_init " , pthread_cond_init ( & bgsignal_ , nullptr ) ) ;
PthreadCall ( " cvar_init " , pthread_cond_init ( & bgsignal_ , nullptr ) ) ;
bgthread_ . resize ( num_threads_ ) ;
bgthread_ . resize ( num_threads_ ) ;
@ -990,14 +990,13 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg) {
fprintf ( stdout , " Created bg thread 0x%lx \n " , bgthread_ [ started_bgthread_ ] ) ;
fprintf ( stdout , " Created bg thread 0x%lx \n " , bgthread_ [ started_bgthread_ ] ) ;
}
}
// always wake up at least one waiting thread.
PthreadCall ( " signal " , pthread_cond_signal ( & bgsignal_ ) ) ;
// Add to priority queue
// Add to priority queue
queue_ . push_back ( BGItem ( ) ) ;
queue_ . push_back ( BGItem ( ) ) ;
queue_ . back ( ) . function = function ;
queue_ . back ( ) . function = function ;
queue_ . back ( ) . arg = arg ;
queue_ . back ( ) . arg = arg ;
queue_size_ + + ;
// always wake up at least one waiting thread.
PthreadCall ( " signal " , pthread_cond_signal ( & bgsignal_ ) ) ;
PthreadCall ( " unlock " , pthread_mutex_unlock ( & mu_ ) ) ;
PthreadCall ( " unlock " , pthread_mutex_unlock ( & mu_ ) ) ;
}
}
@ -1013,7 +1012,6 @@ void PosixEnv::BGThread() {
void ( * function ) ( void * ) = queue_ . front ( ) . function ;
void ( * function ) ( void * ) = queue_ . front ( ) . function ;
void * arg = queue_ . front ( ) . arg ;
void * arg = queue_ . front ( ) . arg ;
queue_ . pop_front ( ) ;
queue_ . pop_front ( ) ;
queue_size_ - - ;
PthreadCall ( " unlock " , pthread_mutex_unlock ( & mu_ ) ) ;
PthreadCall ( " unlock " , pthread_mutex_unlock ( & mu_ ) ) ;
( * function ) ( arg ) ;
( * function ) ( arg ) ;