diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 9669c9f49..f7542411b 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -405,7 +405,7 @@ class Transaction { return 0; } - enum ExecutionStatus { + enum TransactionState { STARTED = 0, AWAITING_PREPARE = 1, PREPARED = 2, @@ -416,8 +416,8 @@ class Transaction { LOCKS_STOLEN = 7, }; - // Execution status of the transaction. - std::atomic exec_status_; + TransactionState GetState() { return txn_state_; } + void SetState(TransactionState state) { txn_state_ = state; } protected: explicit Transaction(const TransactionDB* db) {} @@ -428,6 +428,9 @@ class Transaction { uint64_t log_number_; TransactionName name_; + // Execution status of the transaction. + std::atomic txn_state_; + private: // No copying allowed Transaction(const Transaction&); diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index 8d3874d48..bd10e6fb7 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -109,7 +109,7 @@ Status TransactionDBImpl::Initialize( } s = real_trx->RebuildFromWriteBatch(recovered_trx->batch_); - real_trx->exec_status_ = Transaction::PREPARED; + real_trx->SetState(Transaction::PREPARED); if (!s.ok()) { break; } @@ -435,7 +435,7 @@ void TransactionDBImpl::GetAllPreparedTransactions( transv->clear(); std::lock_guard lock(name_map_mutex_); for (auto it = transactions_.begin(); it != transactions_.end(); it++) { - if (it->second->exec_status_ == Transaction::PREPARED) { + if (it->second->GetState() == Transaction::PREPARED) { transv->push_back(it->second); } } @@ -449,7 +449,7 @@ void TransactionDBImpl::RegisterTransaction(Transaction* txn) { assert(txn); assert(txn->GetName().length() > 0); assert(GetTransactionByName(txn->GetName()) == nullptr); - assert(txn->exec_status_ == Transaction::STARTED); + assert(txn->GetState() == Transaction::STARTED); std::lock_guard lock(name_map_mutex_); transactions_[txn->GetName()] = txn; } diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index 8676582e9..1425b6361 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -55,7 +55,7 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db, void TransactionImpl::Initialize(const TransactionOptions& txn_options) { txn_id_ = GenTxnID(); - exec_status_ = STARTED; + txn_state_ = STARTED; lock_timeout_ = txn_options.lock_timeout * 1000; if (lock_timeout_ < 0) { @@ -84,7 +84,7 @@ TransactionImpl::~TransactionImpl() { if (expiration_time_ > 0) { txn_db_impl_->RemoveExpirableTransaction(txn_id_); } - if (!name_.empty() && exec_status_ != COMMITED) { + if (!name_.empty() && txn_state_ != COMMITED) { txn_db_impl_->UnregisterTransaction(this); } } @@ -97,7 +97,7 @@ void TransactionImpl::Clear() { void TransactionImpl::Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) { - if (!name_.empty() && exec_status_ != COMMITED) { + if (!name_.empty() && txn_state_ != COMMITED) { txn_db_impl_->UnregisterTransaction(this); } TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options); @@ -128,21 +128,21 @@ Status TransactionImpl::CommitBatch(WriteBatch* batch) { if (IsExpired()) { s = Status::Expired(); } else if (expiration_time_ > 0) { - ExecutionStatus expected = STARTED; - can_commit = std::atomic_compare_exchange_strong(&exec_status_, &expected, + TransactionState expected = STARTED; + can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected, AWAITING_COMMIT); - } else if (exec_status_ == STARTED) { + } else if (txn_state_ == STARTED) { // lock stealing is not a concern can_commit = true; } if (can_commit) { - exec_status_.store(AWAITING_COMMIT); + txn_state_.store(AWAITING_COMMIT); s = db_->Write(write_options_, batch); if (s.ok()) { - exec_status_.store(COMMITED); + txn_state_.store(COMMITED); } - } else if (exec_status_ == LOCKS_STOLEN) { + } else if (txn_state_ == LOCKS_STOLEN) { s = Status::Expired(); } else { s = Status::InvalidArgument("Transaction is not in state for commit."); @@ -170,16 +170,16 @@ Status TransactionImpl::Prepare() { if (expiration_time_ > 0) { // must concern ourselves with expiraton and/or lock stealing // need to compare/exchange bc locks could be stolen under us here - ExecutionStatus expected = STARTED; - can_prepare = std::atomic_compare_exchange_strong(&exec_status_, &expected, + TransactionState expected = STARTED; + can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected, AWAITING_PREPARE); - } else if (exec_status_ == STARTED) { + } else if (txn_state_ == STARTED) { // expiration and lock stealing is not possible can_prepare = true; } if (can_prepare) { - exec_status_.store(AWAITING_PREPARE); + txn_state_.store(AWAITING_PREPARE); // transaction can't expire after preparation expiration_time_ = 0; WriteOptions write_options = write_options_; @@ -191,15 +191,15 @@ Status TransactionImpl::Prepare() { if (s.ok()) { assert(log_number_ != 0); dbimpl_->MarkLogAsContainingPrepSection(log_number_); - exec_status_.store(PREPARED); + txn_state_.store(PREPARED); } - } else if (exec_status_ == LOCKS_STOLEN) { + } else if (txn_state_ == LOCKS_STOLEN) { s = Status::Expired(); - } else if (exec_status_ == PREPARED) { + } else if (txn_state_ == PREPARED) { s = Status::InvalidArgument("Transaction has already been prepared."); - } else if (exec_status_ == COMMITED) { + } else if (txn_state_ == COMMITED) { s = Status::InvalidArgument("Transaction has already been committed."); - } else if (exec_status_ == ROLLEDBACK) { + } else if (txn_state_ == ROLLEDBACK) { s = Status::InvalidArgument("Transaction has already been rolledback."); } else { s = Status::InvalidArgument("Transaction is not in state for commit."); @@ -223,14 +223,14 @@ Status TransactionImpl::Commit() { // to change our state out from under us in the even that we expire and have // our locks stolen. In this case the only valid state is STARTED because // a state of PREPARED would have a cleared expiration_time_. - ExecutionStatus expected = STARTED; - commit_single = std::atomic_compare_exchange_strong( - &exec_status_, &expected, AWAITING_COMMIT); + TransactionState expected = STARTED; + commit_single = std::atomic_compare_exchange_strong(&txn_state_, &expected, + AWAITING_COMMIT); TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1"); - } else if (exec_status_ == PREPARED) { + } else if (txn_state_ == PREPARED) { // expiration and lock stealing is not a concern commit_prepared = true; - } else if (exec_status_ == STARTED) { + } else if (txn_state_ == STARTED) { // expiration and lock stealing is not a concern commit_single = true; } @@ -241,16 +241,15 @@ Status TransactionImpl::Commit() { s = Status::InvalidArgument( "Commit-time batch contains values that will not be committed."); } else { - exec_status_.store(AWAITING_COMMIT); + txn_state_.store(AWAITING_COMMIT); s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch()); Clear(); if (s.ok()) { - exec_status_.store(COMMITED); + txn_state_.store(COMMITED); } } } else if (commit_prepared) { - exec_status_.store(AWAITING_COMMIT); - + txn_state_.store(AWAITING_COMMIT); // We take the commit-time batch and append the Commit marker. // The Memtable will ignore the Commit marker in non-recovery mode @@ -279,12 +278,12 @@ Status TransactionImpl::Commit() { txn_db_impl_->UnregisterTransaction(this); Clear(); - exec_status_.store(COMMITED); - } else if (exec_status_ == LOCKS_STOLEN) { + txn_state_.store(COMMITED); + } else if (txn_state_ == LOCKS_STOLEN) { s = Status::Expired(); - } else if (exec_status_ == COMMITED) { + } else if (txn_state_ == COMMITED) { s = Status::InvalidArgument("Transaction has already been committed."); - } else if (exec_status_ == ROLLEDBACK) { + } else if (txn_state_ == ROLLEDBACK) { s = Status::InvalidArgument("Transaction has already been rolledback."); } else { s = Status::InvalidArgument("Transaction is not in state for commit."); @@ -295,22 +294,22 @@ Status TransactionImpl::Commit() { Status TransactionImpl::Rollback() { Status s; - if (exec_status_ == PREPARED) { + if (txn_state_ == PREPARED) { WriteBatch rollback_marker; WriteBatchInternal::MarkRollback(&rollback_marker, name_); - exec_status_.store(AWAITING_ROLLBACK); + txn_state_.store(AWAITING_ROLLBACK); s = db_impl_->WriteImpl(write_options_, &rollback_marker); if (s.ok()) { // we do not need to keep our prepared section around assert(log_number_ > 0); dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); Clear(); - exec_status_.store(ROLLEDBACK); + txn_state_.store(ROLLEDBACK); } - } else if (exec_status_ == STARTED) { + } else if (txn_state_ == STARTED) { // prepare couldn't have taken place Clear(); - } else if (exec_status_ == COMMITED) { + } else if (txn_state_ == COMMITED) { s = Status::InvalidArgument("This transaction has already been committed."); } else { s = Status::InvalidArgument( @@ -321,7 +320,7 @@ Status TransactionImpl::Rollback() { } Status TransactionImpl::RollbackToSavePoint() { - if (exec_status_ != STARTED) { + if (txn_state_ != STARTED) { return Status::InvalidArgument("Transaction is beyond state for rollback."); } @@ -522,8 +521,8 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family, bool TransactionImpl::TryStealingLocks() { assert(IsExpired()); - ExecutionStatus expected = STARTED; - return std::atomic_compare_exchange_strong(&exec_status_, &expected, + TransactionState expected = STARTED; + return std::atomic_compare_exchange_strong(&txn_state_, &expected, LOCKS_STOLEN); } @@ -534,7 +533,7 @@ void TransactionImpl::UnlockGetForUpdate(ColumnFamilyHandle* column_family, Status TransactionImpl::SetName(const TransactionName& name) { Status s; - if (exec_status_ == STARTED) { + if (txn_state_ == STARTED) { if (name_.length()) { s = Status::InvalidArgument("Transaction has already been named."); } else if (txn_db_impl_->GetTransactionByName(name) != nullptr) {