Expose Transaction State Publicly

Summary:
This exposes a transactions state through a public api rather than through a public member variable. I also do some name refactoring.
ExecutionStatus => TransactionState
exec_status_ => trx_state_

Test Plan: It compiles and transaction_test passes.

Reviewers: IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: andrewkr, mung, dhruba, sdong

Differential Revision: https://reviews.facebook.net/D64689
main
Reid Horuff 8 years ago
parent 2c1f95291d
commit 37737c3a6b
  1. 9
      include/rocksdb/utilities/transaction.h
  2. 6
      utilities/transactions/transaction_db_impl.cc
  3. 79
      utilities/transactions/transaction_impl.cc

@ -405,7 +405,7 @@ class Transaction {
return 0; return 0;
} }
enum ExecutionStatus { enum TransactionState {
STARTED = 0, STARTED = 0,
AWAITING_PREPARE = 1, AWAITING_PREPARE = 1,
PREPARED = 2, PREPARED = 2,
@ -416,8 +416,8 @@ class Transaction {
LOCKS_STOLEN = 7, LOCKS_STOLEN = 7,
}; };
// Execution status of the transaction. TransactionState GetState() { return txn_state_; }
std::atomic<ExecutionStatus> exec_status_; void SetState(TransactionState state) { txn_state_ = state; }
protected: protected:
explicit Transaction(const TransactionDB* db) {} explicit Transaction(const TransactionDB* db) {}
@ -428,6 +428,9 @@ class Transaction {
uint64_t log_number_; uint64_t log_number_;
TransactionName name_; TransactionName name_;
// Execution status of the transaction.
std::atomic<TransactionState> txn_state_;
private: private:
// No copying allowed // No copying allowed
Transaction(const Transaction&); Transaction(const Transaction&);

@ -109,7 +109,7 @@ Status TransactionDBImpl::Initialize(
} }
s = real_trx->RebuildFromWriteBatch(recovered_trx->batch_); s = real_trx->RebuildFromWriteBatch(recovered_trx->batch_);
real_trx->exec_status_ = Transaction::PREPARED; real_trx->SetState(Transaction::PREPARED);
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
@ -435,7 +435,7 @@ void TransactionDBImpl::GetAllPreparedTransactions(
transv->clear(); transv->clear();
std::lock_guard<std::mutex> lock(name_map_mutex_); std::lock_guard<std::mutex> lock(name_map_mutex_);
for (auto it = transactions_.begin(); it != transactions_.end(); it++) { for (auto it = transactions_.begin(); it != transactions_.end(); it++) {
if (it->second->exec_status_ == Transaction::PREPARED) { if (it->second->GetState() == Transaction::PREPARED) {
transv->push_back(it->second); transv->push_back(it->second);
} }
} }
@ -449,7 +449,7 @@ void TransactionDBImpl::RegisterTransaction(Transaction* txn) {
assert(txn); assert(txn);
assert(txn->GetName().length() > 0); assert(txn->GetName().length() > 0);
assert(GetTransactionByName(txn->GetName()) == nullptr); assert(GetTransactionByName(txn->GetName()) == nullptr);
assert(txn->exec_status_ == Transaction::STARTED); assert(txn->GetState() == Transaction::STARTED);
std::lock_guard<std::mutex> lock(name_map_mutex_); std::lock_guard<std::mutex> lock(name_map_mutex_);
transactions_[txn->GetName()] = txn; transactions_[txn->GetName()] = txn;
} }

@ -55,7 +55,7 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
void TransactionImpl::Initialize(const TransactionOptions& txn_options) { void TransactionImpl::Initialize(const TransactionOptions& txn_options) {
txn_id_ = GenTxnID(); txn_id_ = GenTxnID();
exec_status_ = STARTED; txn_state_ = STARTED;
lock_timeout_ = txn_options.lock_timeout * 1000; lock_timeout_ = txn_options.lock_timeout * 1000;
if (lock_timeout_ < 0) { if (lock_timeout_ < 0) {
@ -84,7 +84,7 @@ TransactionImpl::~TransactionImpl() {
if (expiration_time_ > 0) { if (expiration_time_ > 0) {
txn_db_impl_->RemoveExpirableTransaction(txn_id_); txn_db_impl_->RemoveExpirableTransaction(txn_id_);
} }
if (!name_.empty() && exec_status_ != COMMITED) { if (!name_.empty() && txn_state_ != COMMITED) {
txn_db_impl_->UnregisterTransaction(this); txn_db_impl_->UnregisterTransaction(this);
} }
} }
@ -97,7 +97,7 @@ void TransactionImpl::Clear() {
void TransactionImpl::Reinitialize(TransactionDB* txn_db, void TransactionImpl::Reinitialize(TransactionDB* txn_db,
const WriteOptions& write_options, const WriteOptions& write_options,
const TransactionOptions& txn_options) { const TransactionOptions& txn_options) {
if (!name_.empty() && exec_status_ != COMMITED) { if (!name_.empty() && txn_state_ != COMMITED) {
txn_db_impl_->UnregisterTransaction(this); txn_db_impl_->UnregisterTransaction(this);
} }
TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options); TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options);
@ -128,21 +128,21 @@ Status TransactionImpl::CommitBatch(WriteBatch* batch) {
if (IsExpired()) { if (IsExpired()) {
s = Status::Expired(); s = Status::Expired();
} else if (expiration_time_ > 0) { } else if (expiration_time_ > 0) {
ExecutionStatus expected = STARTED; TransactionState expected = STARTED;
can_commit = std::atomic_compare_exchange_strong(&exec_status_, &expected, can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected,
AWAITING_COMMIT); AWAITING_COMMIT);
} else if (exec_status_ == STARTED) { } else if (txn_state_ == STARTED) {
// lock stealing is not a concern // lock stealing is not a concern
can_commit = true; can_commit = true;
} }
if (can_commit) { if (can_commit) {
exec_status_.store(AWAITING_COMMIT); txn_state_.store(AWAITING_COMMIT);
s = db_->Write(write_options_, batch); s = db_->Write(write_options_, batch);
if (s.ok()) { if (s.ok()) {
exec_status_.store(COMMITED); txn_state_.store(COMMITED);
} }
} else if (exec_status_ == LOCKS_STOLEN) { } else if (txn_state_ == LOCKS_STOLEN) {
s = Status::Expired(); s = Status::Expired();
} else { } else {
s = Status::InvalidArgument("Transaction is not in state for commit."); s = Status::InvalidArgument("Transaction is not in state for commit.");
@ -170,16 +170,16 @@ Status TransactionImpl::Prepare() {
if (expiration_time_ > 0) { if (expiration_time_ > 0) {
// must concern ourselves with expiraton and/or lock stealing // must concern ourselves with expiraton and/or lock stealing
// need to compare/exchange bc locks could be stolen under us here // need to compare/exchange bc locks could be stolen under us here
ExecutionStatus expected = STARTED; TransactionState expected = STARTED;
can_prepare = std::atomic_compare_exchange_strong(&exec_status_, &expected, can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected,
AWAITING_PREPARE); AWAITING_PREPARE);
} else if (exec_status_ == STARTED) { } else if (txn_state_ == STARTED) {
// expiration and lock stealing is not possible // expiration and lock stealing is not possible
can_prepare = true; can_prepare = true;
} }
if (can_prepare) { if (can_prepare) {
exec_status_.store(AWAITING_PREPARE); txn_state_.store(AWAITING_PREPARE);
// transaction can't expire after preparation // transaction can't expire after preparation
expiration_time_ = 0; expiration_time_ = 0;
WriteOptions write_options = write_options_; WriteOptions write_options = write_options_;
@ -191,15 +191,15 @@ Status TransactionImpl::Prepare() {
if (s.ok()) { if (s.ok()) {
assert(log_number_ != 0); assert(log_number_ != 0);
dbimpl_->MarkLogAsContainingPrepSection(log_number_); dbimpl_->MarkLogAsContainingPrepSection(log_number_);
exec_status_.store(PREPARED); txn_state_.store(PREPARED);
} }
} else if (exec_status_ == LOCKS_STOLEN) { } else if (txn_state_ == LOCKS_STOLEN) {
s = Status::Expired(); s = Status::Expired();
} else if (exec_status_ == PREPARED) { } else if (txn_state_ == PREPARED) {
s = Status::InvalidArgument("Transaction has already been prepared."); s = Status::InvalidArgument("Transaction has already been prepared.");
} else if (exec_status_ == COMMITED) { } else if (txn_state_ == COMMITED) {
s = Status::InvalidArgument("Transaction has already been committed."); s = Status::InvalidArgument("Transaction has already been committed.");
} else if (exec_status_ == ROLLEDBACK) { } else if (txn_state_ == ROLLEDBACK) {
s = Status::InvalidArgument("Transaction has already been rolledback."); s = Status::InvalidArgument("Transaction has already been rolledback.");
} else { } else {
s = Status::InvalidArgument("Transaction is not in state for commit."); s = Status::InvalidArgument("Transaction is not in state for commit.");
@ -223,14 +223,14 @@ Status TransactionImpl::Commit() {
// to change our state out from under us in the even that we expire and have // to change our state out from under us in the even that we expire and have
// our locks stolen. In this case the only valid state is STARTED because // our locks stolen. In this case the only valid state is STARTED because
// a state of PREPARED would have a cleared expiration_time_. // a state of PREPARED would have a cleared expiration_time_.
ExecutionStatus expected = STARTED; TransactionState expected = STARTED;
commit_single = std::atomic_compare_exchange_strong( commit_single = std::atomic_compare_exchange_strong(&txn_state_, &expected,
&exec_status_, &expected, AWAITING_COMMIT); AWAITING_COMMIT);
TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1"); TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
} else if (exec_status_ == PREPARED) { } else if (txn_state_ == PREPARED) {
// expiration and lock stealing is not a concern // expiration and lock stealing is not a concern
commit_prepared = true; commit_prepared = true;
} else if (exec_status_ == STARTED) { } else if (txn_state_ == STARTED) {
// expiration and lock stealing is not a concern // expiration and lock stealing is not a concern
commit_single = true; commit_single = true;
} }
@ -241,16 +241,15 @@ Status TransactionImpl::Commit() {
s = Status::InvalidArgument( s = Status::InvalidArgument(
"Commit-time batch contains values that will not be committed."); "Commit-time batch contains values that will not be committed.");
} else { } else {
exec_status_.store(AWAITING_COMMIT); txn_state_.store(AWAITING_COMMIT);
s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch()); s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch());
Clear(); Clear();
if (s.ok()) { if (s.ok()) {
exec_status_.store(COMMITED); txn_state_.store(COMMITED);
} }
} }
} else if (commit_prepared) { } else if (commit_prepared) {
exec_status_.store(AWAITING_COMMIT); txn_state_.store(AWAITING_COMMIT);
// We take the commit-time batch and append the Commit marker. // We take the commit-time batch and append the Commit marker.
// The Memtable will ignore the Commit marker in non-recovery mode // The Memtable will ignore the Commit marker in non-recovery mode
@ -279,12 +278,12 @@ Status TransactionImpl::Commit() {
txn_db_impl_->UnregisterTransaction(this); txn_db_impl_->UnregisterTransaction(this);
Clear(); Clear();
exec_status_.store(COMMITED); txn_state_.store(COMMITED);
} else if (exec_status_ == LOCKS_STOLEN) { } else if (txn_state_ == LOCKS_STOLEN) {
s = Status::Expired(); s = Status::Expired();
} else if (exec_status_ == COMMITED) { } else if (txn_state_ == COMMITED) {
s = Status::InvalidArgument("Transaction has already been committed."); s = Status::InvalidArgument("Transaction has already been committed.");
} else if (exec_status_ == ROLLEDBACK) { } else if (txn_state_ == ROLLEDBACK) {
s = Status::InvalidArgument("Transaction has already been rolledback."); s = Status::InvalidArgument("Transaction has already been rolledback.");
} else { } else {
s = Status::InvalidArgument("Transaction is not in state for commit."); s = Status::InvalidArgument("Transaction is not in state for commit.");
@ -295,22 +294,22 @@ Status TransactionImpl::Commit() {
Status TransactionImpl::Rollback() { Status TransactionImpl::Rollback() {
Status s; Status s;
if (exec_status_ == PREPARED) { if (txn_state_ == PREPARED) {
WriteBatch rollback_marker; WriteBatch rollback_marker;
WriteBatchInternal::MarkRollback(&rollback_marker, name_); WriteBatchInternal::MarkRollback(&rollback_marker, name_);
exec_status_.store(AWAITING_ROLLBACK); txn_state_.store(AWAITING_ROLLBACK);
s = db_impl_->WriteImpl(write_options_, &rollback_marker); s = db_impl_->WriteImpl(write_options_, &rollback_marker);
if (s.ok()) { if (s.ok()) {
// we do not need to keep our prepared section around // we do not need to keep our prepared section around
assert(log_number_ > 0); assert(log_number_ > 0);
dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_);
Clear(); Clear();
exec_status_.store(ROLLEDBACK); txn_state_.store(ROLLEDBACK);
} }
} else if (exec_status_ == STARTED) { } else if (txn_state_ == STARTED) {
// prepare couldn't have taken place // prepare couldn't have taken place
Clear(); Clear();
} else if (exec_status_ == COMMITED) { } else if (txn_state_ == COMMITED) {
s = Status::InvalidArgument("This transaction has already been committed."); s = Status::InvalidArgument("This transaction has already been committed.");
} else { } else {
s = Status::InvalidArgument( s = Status::InvalidArgument(
@ -321,7 +320,7 @@ Status TransactionImpl::Rollback() {
} }
Status TransactionImpl::RollbackToSavePoint() { Status TransactionImpl::RollbackToSavePoint() {
if (exec_status_ != STARTED) { if (txn_state_ != STARTED) {
return Status::InvalidArgument("Transaction is beyond state for rollback."); return Status::InvalidArgument("Transaction is beyond state for rollback.");
} }
@ -522,8 +521,8 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family,
bool TransactionImpl::TryStealingLocks() { bool TransactionImpl::TryStealingLocks() {
assert(IsExpired()); assert(IsExpired());
ExecutionStatus expected = STARTED; TransactionState expected = STARTED;
return std::atomic_compare_exchange_strong(&exec_status_, &expected, return std::atomic_compare_exchange_strong(&txn_state_, &expected,
LOCKS_STOLEN); LOCKS_STOLEN);
} }
@ -534,7 +533,7 @@ void TransactionImpl::UnlockGetForUpdate(ColumnFamilyHandle* column_family,
Status TransactionImpl::SetName(const TransactionName& name) { Status TransactionImpl::SetName(const TransactionName& name) {
Status s; Status s;
if (exec_status_ == STARTED) { if (txn_state_ == STARTED) {
if (name_.length()) { if (name_.length()) {
s = Status::InvalidArgument("Transaction has already been named."); s = Status::InvalidArgument("Transaction has already been named.");
} else if (txn_db_impl_->GetTransactionByName(name) != nullptr) { } else if (txn_db_impl_->GetTransactionByName(name) != nullptr) {

Loading…
Cancel
Save