@@ -35,26 +35,32 @@ INSTANTIATE_TEST_CASE_P(
     ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED),
                       std::make_tuple(false, true, WRITE_UNPREPARED)));
 
-enum StressAction { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT };
-class WriteUnpreparedStressTest : public WriteUnpreparedTransactionTestBase,
+enum SnapshotAction { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT };
+enum VerificationOperation { VERIFY_GET, VERIFY_NEXT, VERIFY_PREV };
+class WriteUnpreparedSnapshotTest
+    : public WriteUnpreparedTransactionTestBase,
       virtual public ::testing::WithParamInterface<
-          std::tuple<bool, StressAction>> {
+          std::tuple<bool, SnapshotAction, VerificationOperation>> {
  public:
-  WriteUnpreparedStressTest()
+  WriteUnpreparedSnapshotTest()
       : WriteUnpreparedTransactionTestBase(false, std::get<0>(GetParam()),
                                            WRITE_UNPREPARED),
-        action_(std::get<1>(GetParam())) {}
-  StressAction action_;
+        action_(std::get<1>(GetParam())),
+        verify_op_(std::get<2>(GetParam())) {}
+  SnapshotAction action_;
+  VerificationOperation verify_op_;
 };
 
+// Test parameters:
+// Param 0): use stackable db (the parameterization is hard coded and
+//           overwritten to false). Param 1): snapshot action to take.
+// Param 2): verification operation to perform.
 INSTANTIATE_TEST_CASE_P(
-    WriteUnpreparedStressTest, WriteUnpreparedStressTest,
-    ::testing::Values(std::make_tuple(false, NO_SNAPSHOT),
-                      std::make_tuple(false, RO_SNAPSHOT),
-                      std::make_tuple(false, REFRESH_SNAPSHOT),
-                      std::make_tuple(true, NO_SNAPSHOT),
-                      std::make_tuple(true, RO_SNAPSHOT),
-                      std::make_tuple(true, REFRESH_SNAPSHOT)));
+    WriteUnpreparedSnapshotTest, WriteUnpreparedSnapshotTest,
+    ::testing::Combine(
+        ::testing::Bool(),
+        ::testing::Values(NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT),
+        ::testing::Values(VERIFY_GET, VERIFY_NEXT, VERIFY_PREV)));
 
 TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
   // The following test checks whether reading your own write for
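The instantiation above also switches from hand-enumerated tuples to ::testing::Combine, which takes one generator per tuple element and produces their full cross product: 2 bools x 3 snapshot actions x 3 verification operations = 18 test instances, compared to the 6 tuples spelled out before. A minimal sketch of the pattern, assuming only gtest's value-parameterized API (ComboTest and its parameters are hypothetical, not part of this patch):

#include <tuple>
#include "gtest/gtest.h"

// Hypothetical fixture, parameterized over a bool flag and an int mode.
class ComboTest : public ::testing::TestWithParam<std::tuple<bool, int>> {};

TEST_P(ComboTest, Runs) {
  const bool flag = std::get<0>(GetParam());
  const int mode = std::get<1>(GetParam());
  // A real test would branch on flag/mode the way the snapshot test
  // branches on action_ and verify_op_.
  ASSERT_TRUE(mode >= 0 && mode <= 2);
  (void)flag;
}

// Combine() yields the cross product of its generators:
// 2 bools x 3 modes = 6 test instances.
INSTANTIATE_TEST_CASE_P(ComboTest, ComboTest,
                        ::testing::Combine(::testing::Bool(),
                                           ::testing::Values(0, 1, 2)));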
@@ -135,42 +141,33 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
   }
 }
 
 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
-TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
-  // This is a stress test where different threads write random keys, and
-  // then, before committing or aborting the transaction, validate that they
-  // can read the keys they wrote and that the keys they did not write
-  // respect the snapshot. To avoid row lock contention (and simply stressing
-  // the locking system), each thread mostly writes only to its own set of
-  // keys.
+TEST_P(WriteUnpreparedSnapshotTest, ReadYourOwnWrite) {
+  // This test validates that a transaction can read its own writes, and that
+  // those reads are correct with respect to a mocked snapshot.
-  const uint32_t kNumIter = 1000;
-  const uint32_t kNumThreads = 10;
   const uint32_t kNumKeys = 5;
 
-  // Test with
-  // 1. no snapshots set
-  // 2. snapshot set on ReadOptions
-  // 3. snapshot set, and refreshing after every write.
-  StressAction a = action_;
+  SnapshotAction snapshot_action = action_;
   WriteOptions write_options;
   txn_db_options.transaction_lock_timeout = -1;
   options.disable_auto_compactions = true;
   ASSERT_OK(ReOpen());
 
   std::vector<std::string> keys;
-  for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) {
+  for (uint32_t k = 0; k < kNumKeys; k++) {
     keys.push_back("k" + std::to_string(k));
   }
   RandomShuffle(keys.begin(), keys.end());
 
   // This counter acts as a "sequence number" to help us validate visibility
   // logic with snapshots. If we had direct access to the seqno of snapshots
   // and key/values, we could compare those directly instead.
   std::atomic<int64_t> counter(0);
 
-  std::function<void(uint32_t)> stress_thread = [&](int id) {
-    size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
-    Random64 rnd(static_cast<uint32_t>(tid));
-
+  std::function<void()> check_correctness_wrt_snapshot = [&]() {
     Transaction* txn;
     TransactionOptions txn_options;
     // batch_size of 1 causes writes to DB for every marker.
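Both the removed stress harness and the new deterministic check lean on the atomic counter declared above: taking a snapshot consumes one tick of this mocked sequence number, and each Put() stamps its value with a later tick, so "written before/after the snapshot" reduces to an integer comparison. A self-contained sketch of that invariant (illustrative only; the names are not from the patch):

#include <atomic>
#include <cassert>
#include <cstdint>

int main() {
  std::atomic<int64_t> counter(0);

  // Taking a snapshot consumes one tick of the mocked sequence number.
  int64_t snapshot_num = counter.fetch_add(1);

  // A write made after the snapshot stamps its value with a later tick.
  int64_t write_num = counter.fetch_add(1);
  assert(write_num > snapshot_num);  // the non-refresh branch of verify_key

  // Refreshing the snapshot after the write inverts the relation; this is
  // the REFRESH_SNAPSHOT branch of verify_key.
  snapshot_num = counter.fetch_add(1);
  assert(write_num < snapshot_num);
  return 0;
}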
@@ -178,114 +175,82 @@ TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
     ReadOptions read_options;
 
-    for (uint32_t i = 0; i < kNumIter; i++) {
-      std::set<std::string> owned_keys(keys.begin() + id * kNumKeys,
-                                       keys.begin() + (id + 1) * kNumKeys);
-      // Add unowned keys to make the workload more interesting, but this
-      // increases row lock contention, so just do it sometimes.
-      if (rnd.OneIn(2)) {
-        owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]);
-      }
-
     txn = db->BeginTransaction(write_options, txn_options);
-    ASSERT_OK(txn->SetName(std::to_string(id)));
     txn->SetSnapshot();
-    if (a >= RO_SNAPSHOT) {
+    if (snapshot_action >= RO_SNAPSHOT) {
       read_options.snapshot = txn->GetSnapshot();
       ASSERT_TRUE(read_options.snapshot != nullptr);
     }
 
-    uint64_t buf[2];
-    buf[0] = id;
+    uint64_t buf[1];
 
     // When scanning through the database, make sure that all unprepared
-    // keys have value >= snapshot, and all other keys have value < snapshot.
+    // keys have value >= snapshot.
    int64_t snapshot_num = counter.fetch_add(1);
 
    Status s;
-    for (const auto& key : owned_keys) {
-      buf[1] = counter.fetch_add(1);
+    for (const auto& key : keys) {
+      buf[0] = counter.fetch_add(1);
       s = txn->Put(key, Slice((const char*)buf, sizeof(buf)));
       if (!s.ok()) {
         break;
       }
-      if (a == REFRESH_SNAPSHOT) {
+      if (snapshot_action == REFRESH_SNAPSHOT) {
        txn->SetSnapshot();
        read_options.snapshot = txn->GetSnapshot();
        snapshot_num = counter.fetch_add(1);
      }
    }
 
-    // Failure is possible due to snapshot validation. In this case,
-    // rollback and move onto next iteration.
-    if (!s.ok()) {
-      ASSERT_TRUE(s.IsBusy());
-      ASSERT_OK(txn->Rollback());
-      delete txn;
-      continue;
-    }
+    ASSERT_OK(s);
 
-    auto verify_key = [&owned_keys, &a, &id, &snapshot_num](
-                          const std::string& key, const std::string& value) {
-      if (owned_keys.count(key) > 0) {
-        ASSERT_EQ(value.size(), 16);
-        // Since this key is part of owned_keys, this key must be unprepared
-        // by this transaction, identified by 'id'.
-        ASSERT_EQ(((int64_t*)value.c_str())[0], id);
-        if (a == REFRESH_SNAPSHOT) {
+    auto verify_key = [&snapshot_action,
+                       &snapshot_num](const std::string& value) {
+      ASSERT_EQ(value.size(), 8);
+      if (snapshot_action == REFRESH_SNAPSHOT) {
         // If refresh snapshot is true, then the snapshot is refreshed
         // after every Put(), meaning that the current snapshot in
         // snapshot_num must be greater than the "seqno" of any keys
         // written by the current transaction.
-          ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
+        ASSERT_LT(((int64_t*)value.c_str())[0], snapshot_num);
       } else {
         // If refresh snapshot is not on, then the snapshot was taken at
         // the beginning of the transaction, meaning all writes must come
         // after snapshot_num.
-          ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num);
-        }
-      } else if (a >= RO_SNAPSHOT) {
-        // If this is not an unprepared key, just assert that the key
-        // "seqno" is smaller than the snapshot seqno.
-        ASSERT_EQ(value.size(), 16);
-        ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
+        ASSERT_GT(((int64_t*)value.c_str())[0], snapshot_num);
       }
     };
-    // Validate Get()/Next()/Prev(). Do only one of them to save time, and
-    // reduce lock contention.
-    switch (rnd.Uniform(3)) {
-      case 0:  // Validate Get()
+    // Validate one of Get()/Next()/Prev(), depending on the verification
+    // operation to use.
+    switch (verify_op_) {
+      case VERIFY_GET:  // Validate Get()
       {
         for (const auto& key : keys) {
           std::string value;
-          s = txn->Get(read_options, Slice(key), &value);
-          if (!s.ok()) {
-            ASSERT_TRUE(s.IsNotFound());
-            ASSERT_EQ(owned_keys.count(key), 0);
-          } else {
-            verify_key(key, value);
-          }
+          ASSERT_OK(txn->Get(read_options, Slice(key), &value));
+          verify_key(value);
         }
         break;
       }
-      case 1:  // Validate Next()
+      case VERIFY_NEXT:  // Validate Next()
       {
         Iterator* iter = txn->GetIterator(read_options);
         ASSERT_OK(iter->status());
         for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
-          verify_key(iter->key().ToString(), iter->value().ToString());
+          verify_key(iter->value().ToString());
         }
         ASSERT_OK(iter->status());
         delete iter;
         break;
       }
-      case 2:  // Validate Prev()
+      case VERIFY_PREV:  // Validate Prev()
       {
         Iterator* iter = txn->GetIterator(read_options);
         ASSERT_OK(iter->status());
         for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
-          verify_key(iter->key().ToString(), iter->value().ToString());
+          verify_key(iter->value().ToString());
         }
         ASSERT_OK(iter->status());
         delete iter;
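The verify_key simplification above follows from the new value layout: the multi-threaded test stored 16 bytes per value, a {writer id, counter tick} pair, while the single-transaction test stores only the 8-byte tick, so the id assertion disappears and the tick moves from index [1] to index [0]. The test reads the tick back with a reinterpreting cast of value.c_str(); a memcpy-based decode, sketched below with hypothetical helper names, expresses the same layout without the alignment and aliasing hazards of that cast:

#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical helpers mirroring the test's encoding of one counter tick
// into an 8-byte value.
std::string EncodeTick(int64_t tick) {
  char buf[sizeof(tick)];
  std::memcpy(buf, &tick, sizeof(tick));
  return std::string(buf, sizeof(buf));
}

int64_t DecodeTick(const std::string& value) {
  int64_t tick = 0;
  // memcpy sidesteps the unaligned, type-punning cast
  // ((int64_t*)value.c_str())[0] used in the test.
  std::memcpy(&tick, value.data(), sizeof(tick));
  return tick;
}

Note also that the check snapshot_action >= RO_SNAPSHOT relies on the declaration order of SnapshotAction (NO_SNAPSHOT < RO_SNAPSHOT < REFRESH_SNAPSHOT): both snapshot-taking modes install read_options.snapshot, while NO_SNAPSHOT leaves it null.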
@@ -295,25 +260,13 @@ TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
         FAIL();
     }
 
-    if (rnd.OneIn(2)) {
-      ASSERT_OK(txn->Commit());
-    } else {
-      ASSERT_OK(txn->Rollback());
-    }
+    ASSERT_OK(txn->Commit());
     delete txn;
-  }
   };
 
-  std::vector<port::Thread> threads;
-  for (uint32_t i = 0; i < kNumThreads; i++) {
-    threads.emplace_back(stress_thread, i);
-  }
-
-  for (auto& t : threads) {
-    t.join();
-  }
+  check_correctness_wrt_snapshot();
 }
 #endif  // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
 
 // This tests how write unprepared behaves during recovery when the DB crashes
 // after a transaction has either been unprepared or prepared, and tests if