@@ -84,7 +84,28 @@ class MockWriteCallback : public WriteCallback {
  bool AllowWriteBatching() override { return allow_batching_; }
};

TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
class WriteCallbackPTest
    : public WriteCallbackTest,
      public ::testing::WithParamInterface<
          std::tuple<bool, bool, bool, bool, bool, bool, bool>> {
 public:
  WriteCallbackPTest() {
    std::tie(unordered_write_, seq_per_batch_, two_queues_, allow_parallel_,
             allow_batching_, enable_WAL_, enable_pipelined_write_) =
        GetParam();
  }

 protected:
  bool unordered_write_;
  bool seq_per_batch_;
  bool two_queues_;
  bool allow_parallel_;
  bool allow_batching_;
  bool enable_WAL_;
  bool enable_pipelined_write_;
};
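// Note: the member order in std::tie() above must match the order of the
// ::testing::Bool() arguments passed to ::testing::Combine() in
// INSTANTIATE_TEST_CASE_P below; GetParam() is unpacked positionally, so a
// mismatch would silently swap flags.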

TEST_P(WriteCallbackPTest, WriteWithCallbackTest) {
  struct WriteOP {
    WriteOP(bool should_fail = false) { callback_.should_fail_ = should_fail; }
@@ -124,254 +145,238 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
      {false, false, true, false, true},
  };
  for (auto& unordered_write : {true, false}) {
    for (auto& seq_per_batch : {true, false}) {
      for (auto& two_queues : {true, false}) {
        for (auto& allow_parallel : {true, false}) {
          for (auto& allow_batching : {true, false}) {
            for (auto& enable_WAL : {true, false}) {
              for (auto& enable_pipelined_write : {true, false}) {
                for (auto& write_group : write_scenarios) {
                  Options options;
                  options.create_if_missing = true;
                  options.unordered_write = unordered_write;
                  options.allow_concurrent_memtable_write = allow_parallel;
                  options.enable_pipelined_write = enable_pipelined_write;
                  options.two_write_queues = two_queues;

                  // Skip unsupported combinations
                  if (options.enable_pipelined_write && seq_per_batch) {
                    continue;
                  }
                  if (options.enable_pipelined_write &&
                      options.two_write_queues) {
                    continue;
                  }
                  if (options.unordered_write &&
                      !options.allow_concurrent_memtable_write) {
                    continue;
                  }
                  if (options.unordered_write &&
                      options.enable_pipelined_write) {
                    continue;
                  }
                  ReadOptions read_options;
                  DB* db;
                  DBImpl* db_impl;

                  DestroyDB(dbname, options);

                  DBOptions db_options(options);
                  ColumnFamilyOptions cf_options(options);
                  std::vector<ColumnFamilyDescriptor> column_families;
                  column_families.push_back(ColumnFamilyDescriptor(
                      kDefaultColumnFamilyName, cf_options));
                  std::vector<ColumnFamilyHandle*> handles;
                  auto open_s =
                      DBImpl::Open(db_options, dbname, column_families,
                                   &handles, &db, seq_per_batch,
                                   true /* batch_per_txn */);
                  ASSERT_OK(open_s);
                  assert(handles.size() == 1);
                  delete handles[0];

                  db_impl = dynamic_cast<DBImpl*>(db);
                  ASSERT_TRUE(db_impl);

                  // Writers that have called JoinBatchGroup.
                  std::atomic<uint64_t> threads_joining(0);
                  // Writers that have linked to the queue
                  std::atomic<uint64_t> threads_linked(0);
                  // Writers that pass WriteThread::JoinBatchGroup:Wait
                  // sync-point.
                  std::atomic<uint64_t> threads_verified(0);

                  std::atomic<uint64_t> seq(
                      db_impl->GetLatestSequenceNumber());
                  ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0);
                  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
                      "WriteThread::JoinBatchGroup:Start", [&](void*) {
                        uint64_t cur_threads_joining =
                            threads_joining.fetch_add(1);
                        // Wait for the last joined writer to link to the queue.
                        // In this way the writers link to the queue one by one.
                        // This allows us to confidently detect the first writer
                        // who increases threads_linked as the leader.
                        while (threads_linked.load() < cur_threads_joining) {
                        }
                      });

                  // Verification once writers call JoinBatchGroup.
                  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
                      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
                        uint64_t cur_threads_linked =
                            threads_linked.fetch_add(1);
                        bool is_leader = false;
                        bool is_last = false;

                        // who am i
                        is_leader = (cur_threads_linked == 0);
                        is_last =
                            (cur_threads_linked == write_group.size() - 1);

                        // check my state
                        auto* writer =
                            reinterpret_cast<WriteThread::Writer*>(arg);

                        if (is_leader) {
                          ASSERT_TRUE(writer->state ==
                                      WriteThread::State::STATE_GROUP_LEADER);
                        } else {
                          ASSERT_TRUE(writer->state ==
                                      WriteThread::State::STATE_INIT);
                        }

                        // (meta test) the first WriteOP should indeed be the
                        // first and the last should be the last (all others
                        // can be out of order)
                        if (is_leader) {
                          ASSERT_TRUE(
                              writer->callback->Callback(nullptr).ok() ==
                              !write_group.front().callback_.should_fail_);
                        } else if (is_last) {
                          ASSERT_TRUE(
                              writer->callback->Callback(nullptr).ok() ==
                              !write_group.back().callback_.should_fail_);
                        }

                        threads_verified.fetch_add(1);
                        // Wait here until all verification in this sync-point
                        // callback finish for all writers.
                        while (threads_verified.load() < write_group.size()) {
                        }
                      });

                  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
                      "WriteThread::JoinBatchGroup:DoneWaiting",
                      [&](void* arg) {
                        // check my state
                        auto* writer =
                            reinterpret_cast<WriteThread::Writer*>(arg);

                        if (!allow_batching) {
                          // no batching so everyone should be a leader
                          ASSERT_TRUE(writer->state ==
                                      WriteThread::State::STATE_GROUP_LEADER);
                        } else if (!allow_parallel) {
                          ASSERT_TRUE(
                              writer->state ==
                                  WriteThread::State::STATE_COMPLETED ||
                              (enable_pipelined_write &&
                               writer->state ==
                                   WriteThread::State::
                                       STATE_MEMTABLE_WRITER_LEADER));
                        }
                      });
                  std::atomic<uint32_t> thread_num(0);
                  std::atomic<char> dummy_key(0);

                  // Each write thread creates a random write batch and writes
                  // it to the DB with a write callback.
                  std::function<void()> write_with_callback_func = [&]() {
                    uint32_t i = thread_num.fetch_add(1);
                    Random rnd(i);

                    // leaders gotta lead
                    while (i > 0 && threads_verified.load() < 1) {
                    }

                    // loser has to lose
                    while (i == write_group.size() - 1 &&
                           threads_verified.load() < write_group.size() - 1) {
                    }

                    auto& write_op = write_group.at(i);
                    write_op.Clear();
                    write_op.callback_.allow_batching_ = allow_batching;

                    // insert some keys
                    for (uint32_t j = 0; j < rnd.Next() % 50; j++) {
                      // grab unique key
                      char my_key = dummy_key.fetch_add(1);

                      string skey(5, my_key);
                      string sval(10, my_key);
                      write_op.Put(skey, sval);

                      if (!write_op.callback_.should_fail_ && !seq_per_batch) {
                        seq.fetch_add(1);
                      }
                    }
                    if (!write_op.callback_.should_fail_ && seq_per_batch) {
                      seq.fetch_add(1);
                    }

                    WriteOptions woptions;
                    woptions.disableWAL = !enable_WAL;
                    woptions.sync = enable_WAL;
                    Status s;
                    if (seq_per_batch) {
                      class PublishSeqCallback : public PreReleaseCallback {
                       public:
                        PublishSeqCallback(DBImpl* db_impl_in)
                            : db_impl_(db_impl_in) {}
                        Status Callback(SequenceNumber last_seq,
                                        bool /*not used*/, uint64_t,
                                        size_t /*index*/,
                                        size_t /*total*/) override {
                          db_impl_->SetLastPublishedSequence(last_seq);
                          return Status::OK();
                        }
                        DBImpl* db_impl_;
                      } publish_seq_callback(db_impl);
                      // seq_per_batch requires a natural batch separator or
                      // Noop
                      WriteBatchInternal::InsertNoop(&write_op.write_batch_);
                      const size_t ONE_BATCH = 1;
                      s = db_impl->WriteImpl(
                          woptions, &write_op.write_batch_, &write_op.callback_,
                          nullptr, 0, false, nullptr, ONE_BATCH,
                          two_queues ? &publish_seq_callback : nullptr);
                    } else {
                      s = db_impl->WriteWithCallback(
                          woptions, &write_op.write_batch_,
                          &write_op.callback_);
                    }

                    if (write_op.callback_.should_fail_) {
                      ASSERT_TRUE(s.IsBusy());
                    } else {
                      ASSERT_OK(s);
                    }
                  };
                  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()
                      ->EnableProcessing();

                  // do all the writes
                  std::vector<port::Thread> threads;
                  for (uint32_t i = 0; i < write_group.size(); i++) {
                    threads.emplace_back(write_with_callback_func);
                  }
                  for (auto& t : threads) {
                    t.join();
                  }

                  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()
                      ->DisableProcessing();

                  // check for keys
                  string value;
                  for (auto& w : write_group) {
                    ASSERT_TRUE(w.callback_.was_called_.load());
                    for (auto& kvp : w.kvs_) {
                      if (w.callback_.should_fail_) {
                        ASSERT_TRUE(db->Get(read_options, kvp.first, &value)
                                        .IsNotFound());
                      } else {
                        ASSERT_OK(db->Get(read_options, kvp.first, &value));
                        ASSERT_EQ(value, kvp.second);
                      }
                    }
                  }

                  ASSERT_EQ(seq.load(),
                            db_impl->TEST_GetLastVisibleSequence());

                  delete db;
                  DestroyDB(dbname, options);
                }
  for (auto& write_group : write_scenarios) {
    Options options;
    options.create_if_missing = true;
    options.unordered_write = unordered_write_;
    options.allow_concurrent_memtable_write = allow_parallel_;
    options.enable_pipelined_write = enable_pipelined_write_;
    options.two_write_queues = two_queues_;

    // Skip unsupported combinations
    if (options.enable_pipelined_write && seq_per_batch_) {
      continue;
    }
    if (options.enable_pipelined_write && options.two_write_queues) {
      continue;
    }
    if (options.unordered_write && !options.allow_concurrent_memtable_write) {
      continue;
    }
    if (options.unordered_write && options.enable_pipelined_write) {
      continue;
    }
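    // Pipelined write cannot be combined with seq_per_batch or with two write
    // queues, and unordered_write requires concurrent memtable writes and is
    // incompatible with pipelined write, so those parameter combinations are
    // skipped rather than treated as failures.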
    ReadOptions read_options;
    DB* db;
    DBImpl* db_impl;

    DestroyDB(dbname, options);

    DBOptions db_options(options);
    ColumnFamilyOptions cf_options(options);
    std::vector<ColumnFamilyDescriptor> column_families;
    column_families.push_back(
        ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
    std::vector<ColumnFamilyHandle*> handles;
    auto open_s = DBImpl::Open(db_options, dbname, column_families, &handles,
                               &db, seq_per_batch_, true /* batch_per_txn */);
    ASSERT_OK(open_s);
    assert(handles.size() == 1);
    delete handles[0];

    db_impl = dynamic_cast<DBImpl*>(db);
    ASSERT_TRUE(db_impl);

    // Writers that have called JoinBatchGroup.
    std::atomic<uint64_t> threads_joining(0);
    // Writers that have linked to the queue
    std::atomic<uint64_t> threads_linked(0);
    // Writers that pass WriteThread::JoinBatchGroup:Wait sync-point.
    std::atomic<uint64_t> threads_verified(0);

    std::atomic<uint64_t> seq(db_impl->GetLatestSequenceNumber());
    ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0);
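    // `seq` mirrors the sequence number the DB is expected to end up at: it
    // starts from the freshly opened DB's latest sequence and is bumped below
    // as keys (or, with seq_per_batch_, whole batches) are successfully
    // written, then compared against TEST_GetLastVisibleSequence() at the end
    // of the scenario.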
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WriteThread::JoinBatchGroup:Start", [&](void*) {
          uint64_t cur_threads_joining = threads_joining.fetch_add(1);
          // Wait for the last joined writer to link to the queue.
          // In this way the writers link to the queue one by one.
          // This allows us to confidently detect the first writer
          // who increases threads_linked as the leader.
          while (threads_linked.load() < cur_threads_joining) {
          }
        });
    // Verification once writers call JoinBatchGroup.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
          uint64_t cur_threads_linked = threads_linked.fetch_add(1);
          bool is_leader = false;
          bool is_last = false;

          // who am i
          is_leader = (cur_threads_linked == 0);
          is_last = (cur_threads_linked == write_group.size() - 1);

          // check my state
          auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);

          if (is_leader) {
            ASSERT_TRUE(writer->state ==
                        WriteThread::State::STATE_GROUP_LEADER);
          } else {
            ASSERT_TRUE(writer->state == WriteThread::State::STATE_INIT);
          }

          // (meta test) the first WriteOP should indeed be the first
          // and the last should be the last (all others can be out of
          // order)
          if (is_leader) {
            ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
                        !write_group.front().callback_.should_fail_);
          } else if (is_last) {
            ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
                        !write_group.back().callback_.should_fail_);
          }

          threads_verified.fetch_add(1);
          // Wait here until all verification in this sync-point
          // callback finish for all writers.
          while (threads_verified.load() < write_group.size()) {
          }
        });
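    // Because every writer spins at the Wait sync point until the whole group
    // has linked and been verified, the leader cannot start forming a batch
    // group until all writers of the scenario are queued behind it; when
    // batching is allowed they should therefore end up in a single group.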
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) {
          // check my state
          auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);

          if (!allow_batching_) {
            // no batching so everyone should be a leader
            ASSERT_TRUE(writer->state ==
                        WriteThread::State::STATE_GROUP_LEADER);
          } else if (!allow_parallel_) {
            ASSERT_TRUE(writer->state == WriteThread::State::STATE_COMPLETED ||
                        (enable_pipelined_write_ &&
                         writer->state ==
                             WriteThread::State::STATE_MEMTABLE_WRITER_LEADER));
          }
        });
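    // With batching but no parallel memtable writes, a follower is expected to
    // wake up only after the leader has written on its behalf
    // (STATE_COMPLETED); with pipelined writes it may instead have been picked
    // to lead the memtable-writer stage.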
    std::atomic<uint32_t> thread_num(0);
    std::atomic<char> dummy_key(0);

    // Each write thread creates a random write batch and writes it to the DB
    // with a write callback.
    std::function<void()> write_with_callback_func = [&]() {
      uint32_t i = thread_num.fetch_add(1);
      Random rnd(i);

      // leaders gotta lead
      while (i > 0 && threads_verified.load() < 1) {
      }

      // loser has to lose
      while (i == write_group.size() - 1 &&
             threads_verified.load() < write_group.size() - 1) {
      }

      auto& write_op = write_group.at(i);
      write_op.Clear();
      write_op.callback_.allow_batching_ = allow_batching_;

      // insert some keys
      for (uint32_t j = 0; j < rnd.Next() % 50; j++) {
        // grab unique key
        char my_key = dummy_key.fetch_add(1);

        string skey(5, my_key);
        string sval(10, my_key);
        write_op.Put(skey, sval);

        if (!write_op.callback_.should_fail_ && !seq_per_batch_) {
          seq.fetch_add(1);
        }
      }
      if (!write_op.callback_.should_fail_ && seq_per_batch_) {
        seq.fetch_add(1);
      }
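      // Sequence accounting: without seq_per_batch_ every successful key
      // consumes one sequence number, while with seq_per_batch_ the whole
      // batch consumes a single one, so `seq` tracks the expected last visible
      // sequence either way.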
      WriteOptions woptions;
      woptions.disableWAL = !enable_WAL_;
      woptions.sync = enable_WAL_;
      Status s;
      if (seq_per_batch_) {
        class PublishSeqCallback : public PreReleaseCallback {
         public:
          PublishSeqCallback(DBImpl* db_impl_in) : db_impl_(db_impl_in) {}
          Status Callback(SequenceNumber last_seq, bool /*not used*/, uint64_t,
                          size_t /*index*/, size_t /*total*/) override {
            db_impl_->SetLastPublishedSequence(last_seq);
            return Status::OK();
          }
          DBImpl* db_impl_;
        } publish_seq_callback(db_impl);
        // seq_per_batch_ requires a natural batch separator or Noop
        WriteBatchInternal::InsertNoop(&write_op.write_batch_);
        const size_t ONE_BATCH = 1;
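        // In seq_per_batch_ mode the test calls WriteImpl() directly so that
        // it can pass an explicit batch count and, when two write queues are
        // in use, a PreReleaseCallback that publishes the last sequence
        // number, which is presumably what keeps the final Get() checks and
        // the sequence comparison in sync.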
        s = db_impl->WriteImpl(woptions, &write_op.write_batch_,
                               &write_op.callback_, nullptr, 0, false, nullptr,
                               ONE_BATCH,
                               two_queues_ ? &publish_seq_callback : nullptr);
      } else {
        s = db_impl->WriteWithCallback(woptions, &write_op.write_batch_,
                                       &write_op.callback_);
      }

      if (write_op.callback_.should_fail_) {
        ASSERT_TRUE(s.IsBusy());
      } else {
        ASSERT_OK(s);
      }
    };
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // do all the writes
    std::vector<port::Thread> threads;
    for (uint32_t i = 0; i < write_group.size(); i++) {
      threads.emplace_back(write_with_callback_func);
    }
    for (auto& t : threads) {
      t.join();
    }

    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

    // check for keys
    string value;
    for (auto& w : write_group) {
      ASSERT_TRUE(w.callback_.was_called_.load());
      for (auto& kvp : w.kvs_) {
        if (w.callback_.should_fail_) {
          ASSERT_TRUE(db->Get(read_options, kvp.first, &value).IsNotFound());
        } else {
          ASSERT_OK(db->Get(read_options, kvp.first, &value));
          ASSERT_EQ(value, kvp.second);
        }
      }
    }
}
}
    ASSERT_EQ(seq.load(), db_impl->TEST_GetLastVisibleSequence());

    delete db;
    DestroyDB(dbname, options);
  }
}
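
// The seven ::testing::Bool() parameters below instantiate all 2^7 = 128
// combinations; unsupported ones are filtered out by the `continue` checks
// inside the test body.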
INSTANTIATE_TEST_CASE_P(WriteCallbackPTest, WriteCallbackPTest,
                        ::testing::Combine(::testing::Bool(), ::testing::Bool(),
                                           ::testing::Bool(), ::testing::Bool(),
                                           ::testing::Bool(), ::testing::Bool(),
                                           ::testing::Bool()));

TEST_F(WriteCallbackTest, WriteCallBackTest) {
  Options options;
  WriteOptions write_options;