@ -20,6 +20,7 @@
# include "rocksdb/status.h"
# include "rocksdb/status.h"
# include "rocksdb/utilities/transaction_db.h"
# include "rocksdb/utilities/transaction_db.h"
# include "util/string_util.h"
# include "util/string_util.h"
# include "util/sync_point.h"
# include "utilities/transactions/transaction_db_impl.h"
# include "utilities/transactions/transaction_db_impl.h"
# include "utilities/transactions/transaction_util.h"
# include "utilities/transactions/transaction_util.h"
@ -42,7 +43,8 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
expiration_time_ ( txn_options . expiration > = 0
expiration_time_ ( txn_options . expiration > = 0
? start_time_ + txn_options . expiration * 1000
? start_time_ + txn_options . expiration * 1000
: 0 ) ,
: 0 ) ,
lock_timeout_ ( txn_options . lock_timeout * 1000 ) {
lock_timeout_ ( txn_options . lock_timeout * 1000 ) ,
exec_status_ ( STARTED ) {
txn_db_impl_ = dynamic_cast < TransactionDBImpl * > ( txn_db ) ;
txn_db_impl_ = dynamic_cast < TransactionDBImpl * > ( txn_db ) ;
assert ( txn_db_impl_ ) ;
assert ( txn_db_impl_ ) ;
@ -55,10 +57,16 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
if ( txn_options . set_snapshot ) {
if ( txn_options . set_snapshot ) {
SetSnapshot ( ) ;
SetSnapshot ( ) ;
}
}
if ( expiration_time_ > 0 ) {
txn_db_impl_ - > InsertExpirableTransaction ( txn_id_ , this ) ;
}
}
}
TransactionImpl : : ~ TransactionImpl ( ) {
TransactionImpl : : ~ TransactionImpl ( ) {
txn_db_impl_ - > UnLock ( this , & GetTrackedKeys ( ) ) ;
txn_db_impl_ - > UnLock ( this , & GetTrackedKeys ( ) ) ;
if ( expiration_time_ > 0 ) {
txn_db_impl_ - > RemoveExpirableTransaction ( txn_id_ ) ;
}
}
}
void TransactionImpl : : Clear ( ) {
void TransactionImpl : : Clear ( ) {
@ -103,18 +111,27 @@ Status TransactionImpl::DoCommit(WriteBatch* batch) {
Status s ;
Status s ;
if ( expiration_time_ > 0 ) {
if ( expiration_time_ > 0 ) {
// We cannot commit a transaction that is expired as its locks might have
if ( IsExpired ( ) ) {
// been released.
return Status : : Expired ( ) ;
// To avoid race conditions, we need to use a WriteCallback to check the
}
// expiration time once we're on the writer thread.
TransactionCallback callback ( this ) ;
// Do write directly on base db as TransctionDB::Write() would attempt to
// do conflict checking that we've already done.
assert ( dynamic_cast < DBImpl * > ( db_ ) ! = nullptr ) ;
auto db_impl = reinterpret_cast < DBImpl * > ( db_ ) ;
s = db_impl - > WriteWithCallback ( write_options_ , batch , & callback ) ;
// Transaction should only be committed if the thread succeeds
// changing its execution status to COMMITTING. This is because
// A different transaction may consider this one expired and attempt
// to steal its locks between the IsExpired() check and the beginning
// of a commit.
ExecutionStatus expected = STARTED ;
bool can_commit = std : : atomic_compare_exchange_strong (
& exec_status_ , & expected , COMMITTING ) ;
TEST_SYNC_POINT ( " TransactionTest::ExpirableTransactionDataRace:1 " ) ;
if ( can_commit ) {
s = db_ - > Write ( write_options_ , batch ) ;
} else {
assert ( exec_status_ = = LOCKS_STOLEN ) ;
return Status : : Expired ( ) ;
}
} else {
} else {
s = db_ - > Write ( write_options_ , batch ) ;
s = db_ - > Write ( write_options_ , batch ) ;
}
}
@ -316,6 +333,13 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family,
false /* cache_only */ ) ;
false /* cache_only */ ) ;
}
}
bool TransactionImpl : : TryStealingLocks ( ) {
assert ( IsExpired ( ) ) ;
ExecutionStatus expected = STARTED ;
return std : : atomic_compare_exchange_strong ( & exec_status_ , & expected ,
LOCKS_STOLEN ) ;
}
} // namespace rocksdb
} // namespace rocksdb
# endif // ROCKSDB_LITE
# endif // ROCKSDB_LITE