@@ -36,6 +36,27 @@ INSTANTIATE_TEST_CASE_P(
     ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED),
                       std::make_tuple(false, true, WRITE_UNPREPARED)));
 
+enum StressAction { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT };
+class WriteUnpreparedStressTest : public WriteUnpreparedTransactionTestBase,
+                                  virtual public ::testing::WithParamInterface<
+                                      std::tuple<bool, StressAction>> {
+ public:
+  WriteUnpreparedStressTest()
+      : WriteUnpreparedTransactionTestBase(false, std::get<0>(GetParam()),
+                                           WRITE_UNPREPARED),
+        action_(std::get<1>(GetParam())) {}
+  StressAction action_;
+};
+
+INSTANTIATE_TEST_CASE_P(
+    WriteUnpreparedStressTest, WriteUnpreparedStressTest,
+    ::testing::Values(std::make_tuple(false, NO_SNAPSHOT),
+                      std::make_tuple(false, RO_SNAPSHOT),
+                      std::make_tuple(false, REFRESH_SNAPSHOT),
+                      std::make_tuple(true, NO_SNAPSHOT),
+                      std::make_tuple(true, RO_SNAPSHOT),
+                      std::make_tuple(true, REFRESH_SNAPSHOT)));
+
 TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
   // The following tests checks whether reading your own write for
   // a transaction works for write unprepared, when there are uncommitted
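For anyone reading the parameterization above: gtest creates one independent test instance per tuple passed to INSTANTIATE_TEST_CASE_P, so each snapshot mode now runs and reports on its own instead of looping inside a single test body. The bool in the tuple is presumably the two_write_queue flag, since it is forwarded as the second argument of WriteUnpreparedTransactionTestBase. A minimal self-contained sketch of the mechanism, with hypothetical names (link against gtest_main to run it):

#include <tuple>
#include "gtest/gtest.h"

enum SketchAction { SKETCH_NO_SNAPSHOT, SKETCH_RO_SNAPSHOT };

// Stand-in for WriteUnpreparedStressTest: the fixture receives one tuple via
// GetParam() and would normally forward the pieces to a base constructor.
class ParamSketch
    : public ::testing::TestWithParam<std::tuple<bool, SketchAction>> {};

TEST_P(ParamSketch, EachTupleIsOneTest) {
  const bool two_write_queue = std::get<0>(GetParam());  // assumed meaning
  const SketchAction action = std::get<1>(GetParam());
  (void)two_write_queue;
  (void)action;
}

// Two tuples here, so gtest registers and runs two separate tests.
INSTANTIATE_TEST_CASE_P(
    ParamSketch, ParamSketch,
    ::testing::Values(std::make_tuple(false, SKETCH_NO_SNAPSHOT),
                      std::make_tuple(true, SKETCH_RO_SNAPSHOT)));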
@@ -116,7 +137,7 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
 }
 
 #ifndef ROCKSDB_VALGRIND_RUN
-TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWriteStress) {
+TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
   // This is a stress test where different threads are writing random keys, and
   // then before committing or aborting the transaction, it validates to see
   // that it can read the keys it wrote, and the keys it did not write respect
@@ -129,170 +150,167 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWriteStress) {
   std::default_random_engine rand(static_cast<uint32_t>(
       std::hash<std::thread::id>()(std::this_thread::get_id())));
 
-  enum Action { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT };
   // Test with
   // 1. no snapshots set
   // 2. snapshot set on ReadOptions
   // 3. snapshot set, and refreshing after every write.
-  for (Action a : {NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT}) {
-    WriteOptions write_options;
-    txn_db_options.transaction_lock_timeout = -1;
-    options.disable_auto_compactions = true;
-    ReOpen();
+  StressAction a = action_;
+  WriteOptions write_options;
+  txn_db_options.transaction_lock_timeout = -1;
+  options.disable_auto_compactions = true;
+  ReOpen();
 
-    std::vector<std::string> keys;
-    for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) {
-      keys.push_back("k" + ToString(k));
-    }
-    std::shuffle(keys.begin(), keys.end(), rand);
+  std::vector<std::string> keys;
+  for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) {
+    keys.push_back("k" + ToString(k));
+  }
+  std::shuffle(keys.begin(), keys.end(), rand);
 
-    // This counter will act as a "sequence number" to help us validate
-    // visibility logic with snapshots. If we had direct access to the seqno of
-    // snapshots and key/values, then we should directly compare those instead.
-    std::atomic<int64_t> counter(0);
+  // This counter will act as a "sequence number" to help us validate
+  // visibility logic with snapshots. If we had direct access to the seqno of
+  // snapshots and key/values, then we should directly compare those instead.
+  std::atomic<int64_t> counter(0);
 
-    std::function<void(uint32_t)> stress_thread = [&](int id) {
-      size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
-      Random64 rnd(static_cast<uint32_t>(tid));
+  std::function<void(uint32_t)> stress_thread = [&](int id) {
+    size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
+    Random64 rnd(static_cast<uint32_t>(tid));
 
-      Transaction* txn;
-      TransactionOptions txn_options;
-      // batch_size of 1 causes writes to DB for every marker.
-      txn_options.write_batch_flush_threshold = 1;
-      ReadOptions read_options;
+    Transaction* txn;
+    TransactionOptions txn_options;
+    // batch_size of 1 causes writes to DB for every marker.
+    txn_options.write_batch_flush_threshold = 1;
+    ReadOptions read_options;
 
-      for (uint32_t i = 0; i < kNumIter; i++) {
-        std::set<std::string> owned_keys(&keys[id * kNumKeys],
-                                         &keys[(id + 1) * kNumKeys]);
-        // Add unowned keys to make the workload more interesting, but this
-        // increases row lock contention, so just do it sometimes.
-        if (rnd.OneIn(2)) {
-          owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]);
-        }
+    for (uint32_t i = 0; i < kNumIter; i++) {
+      std::set<std::string> owned_keys(&keys[id * kNumKeys],
+                                       &keys[(id + 1) * kNumKeys]);
+      // Add unowned keys to make the workload more interesting, but this
+      // increases row lock contention, so just do it sometimes.
+      if (rnd.OneIn(2)) {
+        owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]);
+      }
 
-        txn = db->BeginTransaction(write_options, txn_options);
-        txn->SetName(ToString(id));
-        txn->SetSnapshot();
-        if (a >= RO_SNAPSHOT) {
-          read_options.snapshot = txn->GetSnapshot();
-          ASSERT_TRUE(read_options.snapshot != nullptr);
-        }
+      txn = db->BeginTransaction(write_options, txn_options);
+      txn->SetName(ToString(id));
+      txn->SetSnapshot();
+      if (a >= RO_SNAPSHOT) {
+        read_options.snapshot = txn->GetSnapshot();
+        ASSERT_TRUE(read_options.snapshot != nullptr);
+      }
 
-        uint64_t buf[2];
-        buf[0] = id;
+      uint64_t buf[2];
+      buf[0] = id;
 
-        // When scanning through the database, make sure that all unprepared
-        // keys have value >= snapshot and all other keys have value < snapshot.
-        int64_t snapshot_num = counter.fetch_add(1);
+      // When scanning through the database, make sure that all unprepared
+      // keys have value >= snapshot and all other keys have value < snapshot.
+      int64_t snapshot_num = counter.fetch_add(1);
 
-        Status s;
-        for (const auto& key : owned_keys) {
-          buf[1] = counter.fetch_add(1);
-          s = txn->Put(key, Slice((const char*)buf, sizeof(buf)));
-          if (!s.ok()) {
-            break;
-          }
-          if (a == REFRESH_SNAPSHOT) {
-            txn->SetSnapshot();
-            read_options.snapshot = txn->GetSnapshot();
-            snapshot_num = counter.fetch_add(1);
-          }
-        }
+      Status s;
+      for (const auto& key : owned_keys) {
+        buf[1] = counter.fetch_add(1);
+        s = txn->Put(key, Slice((const char*)buf, sizeof(buf)));
+        if (!s.ok()) {
+          break;
+        }
+        if (a == REFRESH_SNAPSHOT) {
+          txn->SetSnapshot();
+          read_options.snapshot = txn->GetSnapshot();
+          snapshot_num = counter.fetch_add(1);
+        }
+      }
 
-        // Failure is possible due to snapshot validation. In this case,
-        // rollback and move onto next iteration.
-        if (!s.ok()) {
-          ASSERT_TRUE(s.IsBusy());
-          ASSERT_OK(txn->Rollback());
-          delete txn;
-          continue;
-        }
+      // Failure is possible due to snapshot validation. In this case,
+      // rollback and move onto next iteration.
+      if (!s.ok()) {
+        ASSERT_TRUE(s.IsBusy());
+        ASSERT_OK(txn->Rollback());
+        delete txn;
+        continue;
+      }
 
-        auto verify_key = [&owned_keys, &a, &id, &snapshot_num](
-                              const std::string& key,
-                              const std::string& value) {
-          if (owned_keys.count(key) > 0) {
-            ASSERT_EQ(value.size(), 16);
-            // Since this key is part of owned_keys, then this key must be
-            // unprepared by this transaction identified by 'id'
-            ASSERT_EQ(((int64_t*)value.c_str())[0], id);
-            if (a == REFRESH_SNAPSHOT) {
-              // If refresh snapshot is true, then the snapshot is refreshed
-              // after every Put(), meaning that the current snapshot in
-              // snapshot_num must be greater than the "seqno" of any keys
-              // written by the current transaction.
-              ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
-            } else {
-              // If refresh snapshot is not on, then the snapshot was taken at
-              // the beginning of the transaction, meaning all writes must come
-              // after snapshot_num
-              ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num);
-            }
-          } else if (a >= RO_SNAPSHOT) {
-            // If this is not an unprepared key, just assert that the key
-            // "seqno" is smaller than the snapshot seqno.
-            ASSERT_EQ(value.size(), 16);
-            ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
-          }
-        };
+      auto verify_key = [&owned_keys, &a, &id, &snapshot_num](
+                            const std::string& key, const std::string& value) {
+        if (owned_keys.count(key) > 0) {
+          ASSERT_EQ(value.size(), 16);
+          // Since this key is part of owned_keys, then this key must be
+          // unprepared by this transaction identified by 'id'
+          ASSERT_EQ(((int64_t*)value.c_str())[0], id);
+          if (a == REFRESH_SNAPSHOT) {
+            // If refresh snapshot is true, then the snapshot is refreshed
+            // after every Put(), meaning that the current snapshot in
+            // snapshot_num must be greater than the "seqno" of any keys
+            // written by the current transaction.
+            ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
+          } else {
+            // If refresh snapshot is not on, then the snapshot was taken at
+            // the beginning of the transaction, meaning all writes must come
+            // after snapshot_num
+            ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num);
+          }
+        } else if (a >= RO_SNAPSHOT) {
+          // If this is not an unprepared key, just assert that the key
+          // "seqno" is smaller than the snapshot seqno.
+          ASSERT_EQ(value.size(), 16);
+          ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
+        }
+      };
 
-        // Validate Get()/Next()/Prev(). Do only one of them to save time, and
-        // reduce lock contention.
-        switch (rnd.Uniform(3)) {
-          case 0:  // Validate Get()
-          {
-            for (const auto& key : keys) {
-              std::string value;
-              s = txn->Get(read_options, Slice(key), &value);
-              if (!s.ok()) {
-                ASSERT_TRUE(s.IsNotFound());
-                ASSERT_EQ(owned_keys.count(key), 0);
-              } else {
-                verify_key(key, value);
-              }
-            }
-            break;
-          }
-          case 1:  // Validate Next()
-          {
-            Iterator* iter = txn->GetIterator(read_options);
-            for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
-              verify_key(iter->key().ToString(), iter->value().ToString());
-            }
-            delete iter;
-            break;
-          }
-          case 2:  // Validate Prev()
-          {
-            Iterator* iter = txn->GetIterator(read_options);
-            for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
-              verify_key(iter->key().ToString(), iter->value().ToString());
-            }
-            delete iter;
-            break;
-          }
-          default:
-            ASSERT_TRUE(false);
-        }
+      // Validate Get()/Next()/Prev(). Do only one of them to save time, and
+      // reduce lock contention.
+      switch (rnd.Uniform(3)) {
+        case 0:  // Validate Get()
+        {
+          for (const auto& key : keys) {
+            std::string value;
+            s = txn->Get(read_options, Slice(key), &value);
+            if (!s.ok()) {
+              ASSERT_TRUE(s.IsNotFound());
+              ASSERT_EQ(owned_keys.count(key), 0);
+            } else {
+              verify_key(key, value);
+            }
+          }
+          break;
+        }
+        case 1:  // Validate Next()
+        {
+          Iterator* iter = txn->GetIterator(read_options);
+          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+            verify_key(iter->key().ToString(), iter->value().ToString());
+          }
+          delete iter;
+          break;
+        }
+        case 2:  // Validate Prev()
+        {
+          Iterator* iter = txn->GetIterator(read_options);
+          for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
+            verify_key(iter->key().ToString(), iter->value().ToString());
+          }
+          delete iter;
+          break;
+        }
+        default:
+          ASSERT_TRUE(false);
+      }
 
-        if (rnd.OneIn(2)) {
-          ASSERT_OK(txn->Commit());
-        } else {
-          ASSERT_OK(txn->Rollback());
-        }
-        delete txn;
-      }
-    };
+      if (rnd.OneIn(2)) {
+        ASSERT_OK(txn->Commit());
+      } else {
+        ASSERT_OK(txn->Rollback());
+      }
+      delete txn;
+    }
+  };
 
-    std::vector<port::Thread> threads;
-    for (uint32_t i = 0; i < kNumThreads; i++) {
-      threads.emplace_back(stress_thread, i);
-    }
+  std::vector<port::Thread> threads;
+  for (uint32_t i = 0; i < kNumThreads; i++) {
+    threads.emplace_back(stress_thread, i);
+  }
 
-    for (auto& t : threads) {
-      t.join();
-    }
-  }
+  for (auto& t : threads) {
+    t.join();
+  }
 }
 #endif  // ROCKSDB_VALGRIND_RUN
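One detail of the test that is easy to miss: every value written is exactly 16 bytes, two 64-bit integers holding the writer's id and a logical "seqno" taken from the shared counter, and verify_key() casts the raw bytes back to recover them. A sketch of that layout with hypothetical helper names (the test itself just reinterprets buf):

#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical helpers, not part of the patch; they assume a platform where
// this struct is exactly 16 bytes, matching ASSERT_EQ(value.size(), 16).
struct StressValue {
  int64_t writer_id;    // buf[0]: id of the writing thread
  int64_t logical_seq;  // buf[1]: counter.fetch_add(1) at Put() time
};

inline std::string EncodeStressValue(const StressValue& v) {
  std::string out(sizeof(v), '\0');
  std::memcpy(&out[0], &v, sizeof(v));  // serialize both fields as raw bytes
  return out;
}

inline StressValue DecodeStressValue(const std::string& value) {
  StressValue v;
  std::memcpy(&v, value.data(), sizeof(v));  // what verify_key()'s casts do
  return v;
}

The visibility rule verify_key() enforces follows from when the snapshot is taken: under REFRESH_SNAPSHOT the snapshot is re-acquired after every Put(), so snapshot_num ends up greater than every logical_seq the transaction wrote (hence ASSERT_LT), while in the other modes the snapshot predates all of the transaction's writes (hence ASSERT_GT).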