@@ -113,159 +113,162 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
   for (auto& allow_parallel : {true, false}) {
     for (auto& allow_batching : {true, false}) {
-      for (auto& write_group : write_scenarios) {
-        Options options;
-        options.create_if_missing = true;
-        options.allow_concurrent_memtable_write = allow_parallel;
-
-        WriteOptions write_options;
-        ReadOptions read_options;
-        DB* db;
-        DBImpl* db_impl;
-
-        ASSERT_OK(DB::Open(options, dbname, &db));
-
-        db_impl = dynamic_cast<DBImpl*>(db);
-        ASSERT_TRUE(db_impl);
-
-        std::atomic<uint64_t> threads_waiting(0);
-        std::atomic<uint64_t> seq(db_impl->GetLatestSequenceNumber());
-        ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0);
-
-        rocksdb::SyncPoint::GetInstance()->SetCallBack(
-            "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
-              uint64_t cur_threads_waiting = 0;
-              bool is_leader = false;
-              bool is_last = false;
-
-              // who am i
-              do {
-                cur_threads_waiting = threads_waiting.load();
-                is_leader = (cur_threads_waiting == 0);
-                is_last = (cur_threads_waiting == write_group.size() - 1);
-              } while (!threads_waiting.compare_exchange_strong(
-                  cur_threads_waiting, cur_threads_waiting + 1));
-
-              // check my state
-              auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
-
-              if (is_leader) {
-                ASSERT_TRUE(writer->state ==
-                            WriteThread::State::STATE_GROUP_LEADER);
-              } else {
-                ASSERT_TRUE(writer->state == WriteThread::State::STATE_INIT);
-              }
-
-              // (meta test) the first WriteOP should indeed be the first
-              // and the last should be the last (all others can be out of
-              // order)
-              if (is_leader) {
-                ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
-                            !write_group.front().callback_.should_fail_);
-              } else if (is_last) {
-                ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
-                            !write_group.back().callback_.should_fail_);
-              }
-
-              // wait for friends
-              while (threads_waiting.load() < write_group.size()) {
-              }
-            });
-
-        rocksdb::SyncPoint::GetInstance()->SetCallBack(
-            "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) {
-              // check my state
-              auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
-
-              if (!allow_batching) {
-                // no batching so everyone should be a leader
-                ASSERT_TRUE(writer->state ==
-                            WriteThread::State::STATE_GROUP_LEADER);
-              } else if (!allow_parallel) {
-                ASSERT_TRUE(writer->state ==
-                            WriteThread::State::STATE_COMPLETED);
-              }
-            });
-
-        std::atomic<uint32_t> thread_num(0);
-        std::atomic<char> dummy_key(0);
-        std::function<void()> write_with_callback_func = [&]() {
-          uint32_t i = thread_num.fetch_add(1);
-          Random rnd(i);
-
-          // leaders gotta lead
-          while (i > 0 && threads_waiting.load() < 1) {
-          }
-
-          // loser has to lose
-          while (i == write_group.size() - 1 &&
-                 threads_waiting.load() < write_group.size() - 1) {
-          }
-
-          auto& write_op = write_group.at(i);
-          write_op.Clear();
-          write_op.callback_.allow_batching_ = allow_batching;
-
-          // insert some keys
-          for (uint32_t j = 0; j < rnd.Next() % 50; j++) {
-            // grab unique key
-            char my_key = 0;
-            do {
-              my_key = dummy_key.load();
-            } while (!dummy_key.compare_exchange_strong(my_key, my_key + 1));
-
-            string skey(5, my_key);
-            string sval(10, my_key);
-            write_op.Put(skey, sval);
-
-            if (!write_op.callback_.should_fail_) {
-              seq.fetch_add(1);
-            }
-          }
-
-          WriteOptions woptions;
-          Status s = db_impl->WriteWithCallback(
-              woptions, &write_op.write_batch_, &write_op.callback_);
-
-          if (write_op.callback_.should_fail_) {
-            ASSERT_TRUE(s.IsBusy());
-          } else {
-            ASSERT_OK(s);
-          }
-        };
-
-        rocksdb::SyncPoint::GetInstance()->EnableProcessing();
-
-        // do all the writes
-        std::vector<std::thread> threads;
-        for (uint32_t i = 0; i < write_group.size(); i++) {
-          threads.emplace_back(write_with_callback_func);
-        }
-        for (auto& t : threads) {
-          t.join();
-        }
-
-        rocksdb::SyncPoint::GetInstance()->DisableProcessing();
-
-        // check for keys
-        string value;
-        for (auto& w : write_group) {
-          ASSERT_TRUE(w.callback_.was_called_);
-          for (auto& kvp : w.kvs_) {
-            if (w.callback_.should_fail_) {
-              ASSERT_TRUE(
-                  db->Get(read_options, kvp.first, &value).IsNotFound());
-            } else {
-              ASSERT_OK(db->Get(read_options, kvp.first, &value));
-              ASSERT_EQ(value, kvp.second);
-            }
-          }
-        }
-
-        ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber());
-
-        delete db;
-        DestroyDB(dbname, options);
-      }
+      for (auto& enable_WAL : {true, false}) {
+        for (auto& write_group : write_scenarios) {
+          Options options;
+          options.create_if_missing = true;
+          options.allow_concurrent_memtable_write = allow_parallel;
+
+          ReadOptions read_options;
+          DB* db;
+          DBImpl* db_impl;
+
+          ASSERT_OK(DB::Open(options, dbname, &db));
+
+          db_impl = dynamic_cast<DBImpl*>(db);
+          ASSERT_TRUE(db_impl);
+
+          std::atomic<uint64_t> threads_waiting(0);
+          std::atomic<uint64_t> seq(db_impl->GetLatestSequenceNumber());
+          ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0);
+
+          rocksdb::SyncPoint::GetInstance()->SetCallBack(
+              "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
+                uint64_t cur_threads_waiting = 0;
+                bool is_leader = false;
+                bool is_last = false;
+
+                // who am i
+                do {
+                  cur_threads_waiting = threads_waiting.load();
+                  is_leader = (cur_threads_waiting == 0);
+                  is_last = (cur_threads_waiting == write_group.size() - 1);
+                } while (!threads_waiting.compare_exchange_strong(
+                    cur_threads_waiting, cur_threads_waiting + 1));
+
+                // check my state
+                auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
+
+                if (is_leader) {
+                  ASSERT_TRUE(writer->state ==
+                              WriteThread::State::STATE_GROUP_LEADER);
+                } else {
+                  ASSERT_TRUE(writer->state ==
+                              WriteThread::State::STATE_INIT);
+                }
+
+                // (meta test) the first WriteOP should indeed be the first
+                // and the last should be the last (all others can be out of
+                // order)
+                if (is_leader) {
+                  ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
+                              !write_group.front().callback_.should_fail_);
+                } else if (is_last) {
+                  ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
+                              !write_group.back().callback_.should_fail_);
+                }
+
+                // wait for friends
+                while (threads_waiting.load() < write_group.size()) {
+                }
+              });
+
+          rocksdb::SyncPoint::GetInstance()->SetCallBack(
+              "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) {
+                // check my state
+                auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
+
+                if (!allow_batching) {
+                  // no batching so everyone should be a leader
+                  ASSERT_TRUE(writer->state ==
+                              WriteThread::State::STATE_GROUP_LEADER);
+                } else if (!allow_parallel) {
+                  ASSERT_TRUE(writer->state ==
+                              WriteThread::State::STATE_COMPLETED);
+                }
+              });
+
+          std::atomic<uint32_t> thread_num(0);
+          std::atomic<char> dummy_key(0);
+          std::function<void()> write_with_callback_func = [&]() {
+            uint32_t i = thread_num.fetch_add(1);
+            Random rnd(i);
+
+            // leaders gotta lead
+            while (i > 0 && threads_waiting.load() < 1) {
+            }
+
+            // loser has to lose
+            while (i == write_group.size() - 1 &&
+                   threads_waiting.load() < write_group.size() - 1) {
+            }
+
+            auto& write_op = write_group.at(i);
+            write_op.Clear();
+            write_op.callback_.allow_batching_ = allow_batching;
+
+            // insert some keys
+            for (uint32_t j = 0; j < rnd.Next() % 50; j++) {
+              // grab unique key
+              char my_key = 0;
+              do {
+                my_key = dummy_key.load();
+              } while (
+                  !dummy_key.compare_exchange_strong(my_key, my_key + 1));
+
+              string skey(5, my_key);
+              string sval(10, my_key);
+              write_op.Put(skey, sval);
+
+              if (!write_op.callback_.should_fail_) {
+                seq.fetch_add(1);
+              }
+            }
+
+            WriteOptions woptions;
+            woptions.disableWAL = !enable_WAL;
+            woptions.sync = enable_WAL;
+            Status s = db_impl->WriteWithCallback(
+                woptions, &write_op.write_batch_, &write_op.callback_);
+
+            if (write_op.callback_.should_fail_) {
+              ASSERT_TRUE(s.IsBusy());
+            } else {
+              ASSERT_OK(s);
+            }
+          };
+
+          rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+          // do all the writes
+          std::vector<std::thread> threads;
+          for (uint32_t i = 0; i < write_group.size(); i++) {
+            threads.emplace_back(write_with_callback_func);
+          }
+          for (auto& t : threads) {
+            t.join();
+          }
+
+          rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+          // check for keys
+          string value;
+          for (auto& w : write_group) {
+            ASSERT_TRUE(w.callback_.was_called_);
+            for (auto& kvp : w.kvs_) {
+              if (w.callback_.should_fail_) {
+                ASSERT_TRUE(
+                    db->Get(read_options, kvp.first, &value).IsNotFound());
+              } else {
+                ASSERT_OK(db->Get(read_options, kvp.first, &value));
+                ASSERT_EQ(value, kvp.second);
+              }
+            }
+          }
+
+          ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber());
+
+          delete db;
+          DestroyDB(dbname, options);
+        }
+      }
     }
   }
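
For reference: the should_fail_, allow_batching_, and was_called_ flags toggled in the hunk above live on the mock callback that each WriteOP hands to WriteWithCallback(). The sketch below is an assumption about what that callback looks like, not code from this hunk; it relies only on RocksDB's WriteCallback interface (Callback(DB*) and AllowWriteBatching()).

// Minimal sketch of a mock write callback, assuming RocksDB's
// WriteCallback interface. Field and class names are illustrative.
class MockWriteCallback : public WriteCallback {
 public:
  bool should_fail_ = false;     // make Callback() veto the write
  bool allow_batching_ = false;  // let this writer join a batch group
  std::atomic<bool> was_called_{false};

  // Invoked by the write path before the batch is applied; returning a
  // non-OK status aborts the write, which the test observes via
  // Status::IsBusy() and the IsNotFound() checks on every key.
  Status Callback(DB* /*db*/) override {
    was_called_.store(true);
    return should_fail_ ? Status::Busy() : Status::OK();
  }

  bool AllowWriteBatching() override { return allow_batching_; }
};

Under that assumption, the new enable_WAL dimension runs the same veto logic with the WAL and sync enabled, asserting that a rejected callback leaves no trace of the batch on either the memtable or the WAL path.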