Refactor TransactionImpl

Summary:
This patch refactors TransactionImpl by separating the logic for pessimistic concurrency control from the implementation of how to write the data to rocksdb. The existing implementation is named WriteCommittedTxnImpl as it writes committed data to the db. A template named WritePreparedTxnImpl is also added which will be later completed to provide an alternative implementation.
Closes https://github.com/facebook/rocksdb/pull/2676

Differential Revision: D5549998

Pulled By: maysamyabandeh

fbshipit-source-id: 16298e86b43ca4849324c1f35c731913c6d17bec
main
Maysam Yabandeh 8 years ago committed by Facebook Github Bot
parent 060ccd4f84
commit c3d5c4d38a
  1. 1
      CMakeLists.txt
  2. 4
      db/db_impl.h
  3. 1
      src.mk
  4. 16
      utilities/transactions/transaction_db_impl.cc
  5. 11
      utilities/transactions/transaction_db_impl.h
  6. 57
      utilities/transactions/transaction_impl.cc
  7. 74
      utilities/transactions/transaction_impl.h
  8. 16
      utilities/transactions/transaction_lock_mgr.cc
  9. 16
      utilities/transactions/transaction_lock_mgr.h
  10. 65
      utilities/transactions/write_prepared_transaction_impl.cc
  11. 70
      utilities/transactions/write_prepared_transaction_impl.h

@ -525,6 +525,7 @@ set(SOURCES
utilities/transactions/transaction_impl.cc utilities/transactions/transaction_impl.cc
utilities/transactions/transaction_lock_mgr.cc utilities/transactions/transaction_lock_mgr.cc
utilities/transactions/transaction_util.cc utilities/transactions/transaction_util.cc
utilities/transactions/write_prepared_transaction_impl.cc
utilities/ttl/db_ttl_impl.cc utilities/ttl/db_ttl_impl.cc
utilities/write_batch_with_index/write_batch_with_index.cc utilities/write_batch_with_index/write_batch_with_index.cc
utilities/write_batch_with_index/write_batch_with_index_internal.cc utilities/write_batch_with_index/write_batch_with_index_internal.cc

@ -631,7 +631,9 @@ class DBImpl : public DB {
private: private:
friend class DB; friend class DB;
friend class InternalStats; friend class InternalStats;
friend class TransactionImpl; friend class PessimisticTxn;
friend class WriteCommittedTxnImpl;
friend class WritePreparedTxnImpl;
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
friend class ForwardIterator; friend class ForwardIterator;
#endif #endif

@ -200,6 +200,7 @@ LIB_SOURCES = \
utilities/transactions/transaction_impl.cc \ utilities/transactions/transaction_impl.cc \
utilities/transactions/transaction_lock_mgr.cc \ utilities/transactions/transaction_lock_mgr.cc \
utilities/transactions/transaction_util.cc \ utilities/transactions/transaction_util.cc \
utilities/transactions/write_prepared_transaction_impl.cc \
utilities/ttl/db_ttl_impl.cc \ utilities/ttl/db_ttl_impl.cc \
utilities/write_batch_with_index/write_batch_with_index.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \
utilities/write_batch_with_index/write_batch_with_index_internal.cc \ utilities/write_batch_with_index/write_batch_with_index_internal.cc \

@ -128,7 +128,7 @@ Transaction* TransactionDBImpl::BeginTransaction(
ReinitializeTransaction(old_txn, write_options, txn_options); ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn; return old_txn;
} else { } else {
return new TransactionImpl(this, write_options, txn_options); return new WriteCommittedTxnImpl(this, write_options, txn_options);
} }
} }
@ -266,17 +266,17 @@ Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
return s; return s;
} }
Status TransactionDBImpl::TryLock(TransactionImpl* txn, uint32_t cfh_id, Status TransactionDBImpl::TryLock(PessimisticTxn* txn, uint32_t cfh_id,
const std::string& key, bool exclusive) { const std::string& key, bool exclusive) {
return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
} }
void TransactionDBImpl::UnLock(TransactionImpl* txn, void TransactionDBImpl::UnLock(PessimisticTxn* txn,
const TransactionKeyMap* keys) { const TransactionKeyMap* keys) {
lock_mgr_.UnLock(txn, keys, GetEnv()); lock_mgr_.UnLock(txn, keys, GetEnv());
} }
void TransactionDBImpl::UnLock(TransactionImpl* txn, uint32_t cfh_id, void TransactionDBImpl::UnLock(PessimisticTxn* txn, uint32_t cfh_id,
const std::string& key) { const std::string& key) {
lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
} }
@ -372,7 +372,7 @@ Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
Transaction* txn = BeginInternalTransaction(opts); Transaction* txn = BeginInternalTransaction(opts);
txn->DisableIndexing(); txn->DisableIndexing();
auto txn_impl = static_cast_with_check<TransactionImpl, Transaction>(txn); auto txn_impl = static_cast_with_check<PessimisticTxn, Transaction>(txn);
// Since commitBatch sorts the keys before locking, concurrent Write() // Since commitBatch sorts the keys before locking, concurrent Write()
// operations will not cause a deadlock. // operations will not cause a deadlock.
@ -386,7 +386,7 @@ Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
} }
void TransactionDBImpl::InsertExpirableTransaction(TransactionID tx_id, void TransactionDBImpl::InsertExpirableTransaction(TransactionID tx_id,
TransactionImpl* tx) { PessimisticTxn* tx) {
assert(tx->GetExpirationTime() > 0); assert(tx->GetExpirationTime() > 0);
std::lock_guard<std::mutex> lock(map_mutex_); std::lock_guard<std::mutex> lock(map_mutex_);
expirable_transactions_map_.insert({tx_id, tx}); expirable_transactions_map_.insert({tx_id, tx});
@ -405,14 +405,14 @@ bool TransactionDBImpl::TryStealingExpiredTransactionLocks(
if (tx_it == expirable_transactions_map_.end()) { if (tx_it == expirable_transactions_map_.end()) {
return true; return true;
} }
TransactionImpl& tx = *(tx_it->second); PessimisticTxn& tx = *(tx_it->second);
return tx.TryStealingLocks(); return tx.TryStealingLocks();
} }
void TransactionDBImpl::ReinitializeTransaction( void TransactionDBImpl::ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options, Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options) { const TransactionOptions& txn_options) {
auto txn_impl = static_cast_with_check<TransactionImpl, Transaction>(txn); auto txn_impl = static_cast_with_check<PessimisticTxn, Transaction>(txn);
txn_impl->Reinitialize(this, write_options, txn_options); txn_impl->Reinitialize(this, write_options, txn_options);
} }

@ -63,11 +63,11 @@ class TransactionDBImpl : public TransactionDB {
using StackableDB::DropColumnFamily; using StackableDB::DropColumnFamily;
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
Status TryLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key, Status TryLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key,
bool exclusive); bool exclusive);
void UnLock(TransactionImpl* txn, const TransactionKeyMap* keys); void UnLock(PessimisticTxn* txn, const TransactionKeyMap* keys);
void UnLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key); void UnLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key);
void AddColumnFamily(const ColumnFamilyHandle* handle); void AddColumnFamily(const ColumnFamilyHandle* handle);
@ -78,7 +78,7 @@ class TransactionDBImpl : public TransactionDB {
return txn_db_options_; return txn_db_options_;
} }
void InsertExpirableTransaction(TransactionID tx_id, TransactionImpl* tx); void InsertExpirableTransaction(TransactionID tx_id, PessimisticTxn* tx);
void RemoveExpirableTransaction(TransactionID tx_id); void RemoveExpirableTransaction(TransactionID tx_id);
// If transaction is no longer available, locks can be stolen // If transaction is no longer available, locks can be stolen
@ -109,13 +109,12 @@ class TransactionDBImpl : public TransactionDB {
// Must be held when adding/dropping column families. // Must be held when adding/dropping column families.
InstrumentedMutex column_family_mutex_; InstrumentedMutex column_family_mutex_;
Transaction* BeginInternalTransaction(const WriteOptions& options); Transaction* BeginInternalTransaction(const WriteOptions& options);
Status WriteHelper(WriteBatch* updates, TransactionImpl* txn_impl);
// Used to ensure that no locks are stolen from an expirable transaction // Used to ensure that no locks are stolen from an expirable transaction
// that has started a commit. Only transactions with an expiration time // that has started a commit. Only transactions with an expiration time
// should be in this map. // should be in this map.
std::mutex map_mutex_; std::mutex map_mutex_;
std::unordered_map<TransactionID, TransactionImpl*> std::unordered_map<TransactionID, PessimisticTxn*>
expirable_transactions_map_; expirable_transactions_map_;
// map from name to two phase transaction instance // map from name to two phase transaction instance

@ -29,31 +29,31 @@ namespace rocksdb {
struct WriteOptions; struct WriteOptions;
std::atomic<TransactionID> TransactionImpl::txn_id_counter_(1); std::atomic<TransactionID> PessimisticTxn::txn_id_counter_(1);
TransactionID TransactionImpl::GenTxnID() { TransactionID PessimisticTxn::GenTxnID() {
return txn_id_counter_.fetch_add(1); return txn_id_counter_.fetch_add(1);
} }
TransactionImpl::TransactionImpl(TransactionDB* txn_db, PessimisticTxn::PessimisticTxn(TransactionDB* txn_db,
const WriteOptions& write_options, const WriteOptions& write_options,
const TransactionOptions& txn_options) const TransactionOptions& txn_options)
: TransactionBaseImpl(txn_db->GetRootDB(), write_options), : TransactionBaseImpl(txn_db->GetRootDB(), write_options),
txn_db_impl_(nullptr), txn_db_impl_(nullptr),
expiration_time_(0),
txn_id_(0), txn_id_(0),
waiting_cf_id_(0), waiting_cf_id_(0),
waiting_key_(nullptr), waiting_key_(nullptr),
expiration_time_(0),
lock_timeout_(0), lock_timeout_(0),
deadlock_detect_(false), deadlock_detect_(false),
deadlock_detect_depth_(0) { deadlock_detect_depth_(0) {
txn_db_impl_ = txn_db_impl_ =
static_cast_with_check<TransactionDBImpl, TransactionDB>(txn_db); static_cast_with_check<TransactionDBImpl, TransactionDB>(txn_db);
db_impl_ = static_cast_with_check<DBImpl, DB>(txn_db->GetRootDB()); db_impl_ = static_cast_with_check<DBImpl, DB>(db_);
Initialize(txn_options); Initialize(txn_options);
} }
void TransactionImpl::Initialize(const TransactionOptions& txn_options) { void PessimisticTxn::Initialize(const TransactionOptions& txn_options) {
txn_id_ = GenTxnID(); txn_id_ = GenTxnID();
txn_state_ = STARTED; txn_state_ = STARTED;
@ -84,7 +84,7 @@ void TransactionImpl::Initialize(const TransactionOptions& txn_options) {
} }
} }
TransactionImpl::~TransactionImpl() { PessimisticTxn::~PessimisticTxn() {
txn_db_impl_->UnLock(this, &GetTrackedKeys()); txn_db_impl_->UnLock(this, &GetTrackedKeys());
if (expiration_time_ > 0) { if (expiration_time_ > 0) {
txn_db_impl_->RemoveExpirableTransaction(txn_id_); txn_db_impl_->RemoveExpirableTransaction(txn_id_);
@ -94,12 +94,12 @@ TransactionImpl::~TransactionImpl() {
} }
} }
void TransactionImpl::Clear() { void PessimisticTxn::Clear() {
txn_db_impl_->UnLock(this, &GetTrackedKeys()); txn_db_impl_->UnLock(this, &GetTrackedKeys());
TransactionBaseImpl::Clear(); TransactionBaseImpl::Clear();
} }
void TransactionImpl::Reinitialize(TransactionDB* txn_db, void PessimisticTxn::Reinitialize(TransactionDB* txn_db,
const WriteOptions& write_options, const WriteOptions& write_options,
const TransactionOptions& txn_options) { const TransactionOptions& txn_options) {
if (!name_.empty() && txn_state_ != COMMITED) { if (!name_.empty() && txn_state_ != COMMITED) {
@ -109,7 +109,7 @@ void TransactionImpl::Reinitialize(TransactionDB* txn_db,
Initialize(txn_options); Initialize(txn_options);
} }
bool TransactionImpl::IsExpired() const { bool PessimisticTxn::IsExpired() const {
if (expiration_time_ > 0) { if (expiration_time_ > 0) {
if (db_->GetEnv()->NowMicros() >= expiration_time_) { if (db_->GetEnv()->NowMicros() >= expiration_time_) {
// Transaction is expired. // Transaction is expired.
@ -120,7 +120,12 @@ bool TransactionImpl::IsExpired() const {
return false; return false;
} }
Status TransactionImpl::CommitBatch(WriteBatch* batch) { WriteCommittedTxnImpl::WriteCommittedTxnImpl(
TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options)
: PessimisticTxn(txn_db, write_options, txn_options){};
Status WriteCommittedTxnImpl::CommitBatch(WriteBatch* batch) {
TransactionKeyMap keys_to_unlock; TransactionKeyMap keys_to_unlock;
Status s = LockBatch(batch, &keys_to_unlock); Status s = LockBatch(batch, &keys_to_unlock);
@ -158,7 +163,7 @@ Status TransactionImpl::CommitBatch(WriteBatch* batch) {
return s; return s;
} }
Status TransactionImpl::Prepare() { Status WriteCommittedTxnImpl::Prepare() {
Status s; Status s;
if (name_.empty()) { if (name_.empty()) {
@ -213,7 +218,7 @@ Status TransactionImpl::Prepare() {
return s; return s;
} }
Status TransactionImpl::Commit() { Status WriteCommittedTxnImpl::Commit() {
Status s; Status s;
bool commit_single = false; bool commit_single = false;
bool commit_prepared = false; bool commit_prepared = false;
@ -299,7 +304,7 @@ Status TransactionImpl::Commit() {
return s; return s;
} }
Status TransactionImpl::Rollback() { Status WriteCommittedTxnImpl::Rollback() {
Status s; Status s;
if (txn_state_ == PREPARED) { if (txn_state_ == PREPARED) {
WriteBatch rollback_marker; WriteBatch rollback_marker;
@ -326,7 +331,7 @@ Status TransactionImpl::Rollback() {
return s; return s;
} }
Status TransactionImpl::RollbackToSavePoint() { Status PessimisticTxn::RollbackToSavePoint() {
if (txn_state_ != STARTED) { if (txn_state_ != STARTED) {
return Status::InvalidArgument("Transaction is beyond state for rollback."); return Status::InvalidArgument("Transaction is beyond state for rollback.");
} }
@ -344,7 +349,7 @@ Status TransactionImpl::RollbackToSavePoint() {
// Lock all keys in this batch. // Lock all keys in this batch.
// On success, caller should unlock keys_to_unlock // On success, caller should unlock keys_to_unlock
Status TransactionImpl::LockBatch(WriteBatch* batch, Status PessimisticTxn::LockBatch(WriteBatch* batch,
TransactionKeyMap* keys_to_unlock) { TransactionKeyMap* keys_to_unlock) {
class Handler : public WriteBatch::Handler { class Handler : public WriteBatch::Handler {
public: public:
@ -422,9 +427,9 @@ Status TransactionImpl::LockBatch(WriteBatch* batch,
// If check_shapshot is true and this transaction has a snapshot set, // If check_shapshot is true and this transaction has a snapshot set,
// this key will only be locked if there have been no writes to this key since // this key will only be locked if there have been no writes to this key since
// the snapshot time. // the snapshot time.
Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, Status PessimisticTxn::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only, const Slice& key, bool read_only, bool exclusive,
bool exclusive, bool untracked) { bool untracked) {
uint32_t cfh_id = GetColumnFamilyID(column_family); uint32_t cfh_id = GetColumnFamilyID(column_family);
std::string key_str = key.ToString(); std::string key_str = key.ToString();
bool previously_locked; bool previously_locked;
@ -510,7 +515,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
// Return OK() if this key has not been modified more recently than the // Return OK() if this key has not been modified more recently than the
// transaction snapshot_. // transaction snapshot_.
Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family, Status PessimisticTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& key,
SequenceNumber prev_seqno, SequenceNumber prev_seqno,
SequenceNumber* new_seqno) { SequenceNumber* new_seqno) {
@ -526,29 +531,27 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family,
*new_seqno = seq; *new_seqno = seq;
auto db_impl = static_cast_with_check<DBImpl, DB>(db_);
ColumnFamilyHandle* cfh = ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl->DefaultColumnFamily(); column_family ? column_family : db_impl_->DefaultColumnFamily();
return TransactionUtil::CheckKeyForConflicts(db_impl, cfh, key.ToString(), return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
snapshot_->GetSequenceNumber(), snapshot_->GetSequenceNumber(),
false /* cache_only */); false /* cache_only */);
} }
bool TransactionImpl::TryStealingLocks() { bool PessimisticTxn::TryStealingLocks() {
assert(IsExpired()); assert(IsExpired());
TransactionState expected = STARTED; TransactionState expected = STARTED;
return std::atomic_compare_exchange_strong(&txn_state_, &expected, return std::atomic_compare_exchange_strong(&txn_state_, &expected,
LOCKS_STOLEN); LOCKS_STOLEN);
} }
void TransactionImpl::UnlockGetForUpdate(ColumnFamilyHandle* column_family, void PessimisticTxn::UnlockGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) { const Slice& key) {
txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString()); txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());
} }
Status TransactionImpl::SetName(const TransactionName& name) { Status PessimisticTxn::SetName(const TransactionName& name) {
Status s; Status s;
if (txn_state_ == STARTED) { if (txn_state_ == STARTED) {
if (name_.length()) { if (name_.length()) {

@ -31,24 +31,28 @@
namespace rocksdb { namespace rocksdb {
class TransactionDBImpl; class TransactionDBImpl;
class PessimisticTxn;
class TransactionImpl : public TransactionBaseImpl { // A transaction under pessimistic concurrency control. This class implements
// the locking API and interfaces with the lock manager as well as the
// pessimistic transactional db.
class PessimisticTxn : public TransactionBaseImpl {
public: public:
TransactionImpl(TransactionDB* db, const WriteOptions& write_options, PessimisticTxn(TransactionDB* db, const WriteOptions& write_options,
const TransactionOptions& txn_options); const TransactionOptions& txn_options);
virtual ~TransactionImpl(); virtual ~PessimisticTxn();
void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options, void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options); const TransactionOptions& txn_options);
Status Prepare() override; Status Prepare() override = 0;
Status Commit() override; Status Commit() override = 0;
Status CommitBatch(WriteBatch* batch); virtual Status CommitBatch(WriteBatch* batch) = 0;
Status Rollback() override; Status Rollback() override = 0;
Status RollbackToSavePoint() override; Status RollbackToSavePoint() override;
@ -107,14 +111,24 @@ class TransactionImpl : public TransactionBaseImpl {
int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; } int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; }
protected: protected:
void Initialize(const TransactionOptions& txn_options);
Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock);
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool read_only, bool exclusive, bool read_only, bool exclusive,
bool untracked = false) override; bool untracked = false) override;
private: void Clear() override;
TransactionDBImpl* txn_db_impl_; TransactionDBImpl* txn_db_impl_;
DBImpl* db_impl_; DBImpl* db_impl_;
// If non-zero, this transaction should not be committed after this time (in
// microseconds according to Env->NowMicros())
uint64_t expiration_time_;
private:
// Used to create unique ids for transactions. // Used to create unique ids for transactions.
static std::atomic<TransactionID> txn_id_counter_; static std::atomic<TransactionID> txn_id_counter_;
@ -140,10 +154,6 @@ class TransactionImpl : public TransactionBaseImpl {
// Mutex protecting waiting_txn_ids_, waiting_cf_id_ and waiting_key_. // Mutex protecting waiting_txn_ids_, waiting_cf_id_ and waiting_key_.
mutable std::mutex wait_mutex_; mutable std::mutex wait_mutex_;
// If non-zero, this transaction should not be committed after this time (in
// microseconds according to Env->NowMicros())
uint64_t expiration_time_;
// Timeout in microseconds when locking a key or -1 if there is no timeout. // Timeout in microseconds when locking a key or -1 if there is no timeout.
int64_t lock_timeout_; int64_t lock_timeout_;
@ -153,32 +163,46 @@ class TransactionImpl : public TransactionBaseImpl {
// Whether to perform deadlock detection or not. // Whether to perform deadlock detection or not.
int64_t deadlock_detect_depth_; int64_t deadlock_detect_depth_;
void Clear() override;
void Initialize(const TransactionOptions& txn_options);
Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key,
SequenceNumber prev_seqno, SequenceNumber* new_seqno); SequenceNumber prev_seqno, SequenceNumber* new_seqno);
Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) override;
Status DoCommit(WriteBatch* batch); // No copying allowed
PessimisticTxn(const PessimisticTxn&);
void operator=(const PessimisticTxn&);
};
void RollbackLastN(size_t num); class WriteCommittedTxnImpl : public PessimisticTxn {
public:
WriteCommittedTxnImpl(TransactionDB* db, const WriteOptions& write_options,
const TransactionOptions& txn_options);
void UnlockGetForUpdate(ColumnFamilyHandle* column_family, virtual ~WriteCommittedTxnImpl() {}
const Slice& key) override;
Status Prepare() override;
Status Commit() override;
Status CommitBatch(WriteBatch* batch) override;
Status Rollback() override;
private:
Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key,
SequenceNumber prev_seqno, SequenceNumber* new_seqno);
// No copying allowed // No copying allowed
TransactionImpl(const TransactionImpl&); WriteCommittedTxnImpl(const WriteCommittedTxnImpl&);
void operator=(const TransactionImpl&); void operator=(const WriteCommittedTxnImpl&);
}; };
// Used at commit time to check whether transaction is committing before its // Used at commit time to check whether transaction is committing before its
// expiration time. // expiration time.
class TransactionCallback : public WriteCallback { class TransactionCallback : public WriteCallback {
public: public:
explicit TransactionCallback(TransactionImpl* txn) : txn_(txn) {} explicit TransactionCallback(PessimisticTxn* txn) : txn_(txn) {}
Status Callback(DB* db) override { Status Callback(DB* db) override {
if (txn_->IsExpired()) { if (txn_->IsExpired()) {
@ -191,7 +215,7 @@ class TransactionCallback : public WriteCallback {
bool AllowWriteBatching() override { return true; } bool AllowWriteBatching() override { return true; }
private: private:
TransactionImpl* txn_; PessimisticTxn* txn_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -227,7 +227,7 @@ bool TransactionLockMgr::IsLockExpired(TransactionID txn_id,
return expired; return expired;
} }
Status TransactionLockMgr::TryLock(TransactionImpl* txn, Status TransactionLockMgr::TryLock(PessimisticTxn* txn,
uint32_t column_family_id, uint32_t column_family_id,
const std::string& key, Env* env, const std::string& key, Env* env,
bool exclusive) { bool exclusive) {
@ -256,7 +256,7 @@ Status TransactionLockMgr::TryLock(TransactionImpl* txn,
// Helper function for TryLock(). // Helper function for TryLock().
Status TransactionLockMgr::AcquireWithTimeout( Status TransactionLockMgr::AcquireWithTimeout(
TransactionImpl* txn, LockMap* lock_map, LockMapStripe* stripe, PessimisticTxn* txn, LockMap* lock_map, LockMapStripe* stripe,
uint32_t column_family_id, const std::string& key, Env* env, uint32_t column_family_id, const std::string& key, Env* env,
int64_t timeout, const LockInfo& lock_info) { int64_t timeout, const LockInfo& lock_info) {
Status result; Status result;
@ -357,13 +357,13 @@ Status TransactionLockMgr::AcquireWithTimeout(
} }
void TransactionLockMgr::DecrementWaiters( void TransactionLockMgr::DecrementWaiters(
const TransactionImpl* txn, const autovector<TransactionID>& wait_ids) { const PessimisticTxn* txn, const autovector<TransactionID>& wait_ids) {
std::lock_guard<std::mutex> lock(wait_txn_map_mutex_); std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
DecrementWaitersImpl(txn, wait_ids); DecrementWaitersImpl(txn, wait_ids);
} }
void TransactionLockMgr::DecrementWaitersImpl( void TransactionLockMgr::DecrementWaitersImpl(
const TransactionImpl* txn, const autovector<TransactionID>& wait_ids) { const PessimisticTxn* txn, const autovector<TransactionID>& wait_ids) {
auto id = txn->GetID(); auto id = txn->GetID();
assert(wait_txn_map_.Contains(id)); assert(wait_txn_map_.Contains(id));
wait_txn_map_.Delete(id); wait_txn_map_.Delete(id);
@ -377,7 +377,7 @@ void TransactionLockMgr::DecrementWaitersImpl(
} }
bool TransactionLockMgr::IncrementWaiters( bool TransactionLockMgr::IncrementWaiters(
const TransactionImpl* txn, const autovector<TransactionID>& wait_ids) { const PessimisticTxn* txn, const autovector<TransactionID>& wait_ids) {
auto id = txn->GetID(); auto id = txn->GetID();
std::vector<TransactionID> queue(txn->GetDeadlockDetectDepth()); std::vector<TransactionID> queue(txn->GetDeadlockDetectDepth());
std::lock_guard<std::mutex> lock(wait_txn_map_mutex_); std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
@ -501,7 +501,7 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
return result; return result;
} }
void TransactionLockMgr::UnLockKey(const TransactionImpl* txn, void TransactionLockMgr::UnLockKey(const PessimisticTxn* txn,
const std::string& key, const std::string& key,
LockMapStripe* stripe, LockMap* lock_map, LockMapStripe* stripe, LockMap* lock_map,
Env* env) { Env* env) {
@ -537,7 +537,7 @@ void TransactionLockMgr::UnLockKey(const TransactionImpl* txn,
} }
} }
void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, void TransactionLockMgr::UnLock(PessimisticTxn* txn, uint32_t column_family_id,
const std::string& key, Env* env) { const std::string& key, Env* env) {
std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
LockMap* lock_map = lock_map_ptr.get(); LockMap* lock_map = lock_map_ptr.get();
@ -559,7 +559,7 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id,
stripe->stripe_cv->NotifyAll(); stripe->stripe_cv->NotifyAll();
} }
void TransactionLockMgr::UnLock(const TransactionImpl* txn, void TransactionLockMgr::UnLock(const PessimisticTxn* txn,
const TransactionKeyMap* key_map, Env* env) { const TransactionKeyMap* key_map, Env* env) {
for (auto& key_map_iter : *key_map) { for (auto& key_map_iter : *key_map) {
uint32_t column_family_id = key_map_iter.first; uint32_t column_family_id = key_map_iter.first;

@ -47,14 +47,14 @@ class TransactionLockMgr {
// Attempt to lock key. If OK status is returned, the caller is responsible // Attempt to lock key. If OK status is returned, the caller is responsible
// for calling UnLock() on this key. // for calling UnLock() on this key.
Status TryLock(TransactionImpl* txn, uint32_t column_family_id, Status TryLock(PessimisticTxn* txn, uint32_t column_family_id,
const std::string& key, Env* env, bool exclusive); const std::string& key, Env* env, bool exclusive);
// Unlock a key locked by TryLock(). txn must be the same Transaction that // Unlock a key locked by TryLock(). txn must be the same Transaction that
// locked this key. // locked this key.
void UnLock(const TransactionImpl* txn, const TransactionKeyMap* keys, void UnLock(const PessimisticTxn* txn, const TransactionKeyMap* keys,
Env* env); Env* env);
void UnLock(TransactionImpl* txn, uint32_t column_family_id, void UnLock(PessimisticTxn* txn, uint32_t column_family_id,
const std::string& key, Env* env); const std::string& key, Env* env);
using LockStatusData = std::unordered_multimap<uint32_t, KeyLockInfo>; using LockStatusData = std::unordered_multimap<uint32_t, KeyLockInfo>;
@ -102,7 +102,7 @@ class TransactionLockMgr {
std::shared_ptr<LockMap> GetLockMap(uint32_t column_family_id); std::shared_ptr<LockMap> GetLockMap(uint32_t column_family_id);
Status AcquireWithTimeout(TransactionImpl* txn, LockMap* lock_map, Status AcquireWithTimeout(PessimisticTxn* txn, LockMap* lock_map,
LockMapStripe* stripe, uint32_t column_family_id, LockMapStripe* stripe, uint32_t column_family_id,
const std::string& key, Env* env, int64_t timeout, const std::string& key, Env* env, int64_t timeout,
const LockInfo& lock_info); const LockInfo& lock_info);
@ -112,14 +112,14 @@ class TransactionLockMgr {
const LockInfo& lock_info, uint64_t* wait_time, const LockInfo& lock_info, uint64_t* wait_time,
autovector<TransactionID>* txn_ids); autovector<TransactionID>* txn_ids);
void UnLockKey(const TransactionImpl* txn, const std::string& key, void UnLockKey(const PessimisticTxn* txn, const std::string& key,
LockMapStripe* stripe, LockMap* lock_map, Env* env); LockMapStripe* stripe, LockMap* lock_map, Env* env);
bool IncrementWaiters(const TransactionImpl* txn, bool IncrementWaiters(const PessimisticTxn* txn,
const autovector<TransactionID>& wait_ids); const autovector<TransactionID>& wait_ids);
void DecrementWaiters(const TransactionImpl* txn, void DecrementWaiters(const PessimisticTxn* txn,
const autovector<TransactionID>& wait_ids); const autovector<TransactionID>& wait_ids);
void DecrementWaitersImpl(const TransactionImpl* txn, void DecrementWaitersImpl(const PessimisticTxn* txn,
const autovector<TransactionID>& wait_ids); const autovector<TransactionID>& wait_ids);
// No copying allowed // No copying allowed

@ -0,0 +1,65 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/transactions/write_prepared_transaction_impl.h"
#include <map>
#include <set>
#include <stdexcept>
#include <string>
#include <vector>
#include "db/column_family.h"
#include "db/db_impl.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "utilities/transactions/transaction_db_impl.h"
#include "utilities/transactions/transaction_impl.h"
#include "utilities/transactions/transaction_util.h"
namespace rocksdb {
struct WriteOptions;
// Constructs a write-prepared transaction on top of the pessimistic
// transaction base class.
//
// The PessimisticTxn constructor already runs Initialize(txn_options), which
// (among other setup) assigns the transaction its id from the global counter.
// Initialize() is therefore deliberately NOT called again here: a second call
// would consume another transaction id and repeat the rest of the
// per-transaction setup.
WritePreparedTxnImpl::WritePreparedTxnImpl(
    TransactionDB* txn_db, const WriteOptions& write_options,
    const TransactionOptions& txn_options)
    : PessimisticTxn(txn_db, write_options, txn_options) {}
// Placeholder for the write-prepared implementation of CommitBatch().
// Not implemented yet: always throws std::runtime_error and never returns a
// Status, so no (misleading) success value follows the throw.
Status WritePreparedTxnImpl::CommitBatch(WriteBatch* batch) {
  // TODO(myabandeh) Implement this
  throw std::runtime_error("CommitBatch not Implemented");
}
// Placeholder for the write-prepared implementation of Prepare().
// Not implemented yet: always throws std::runtime_error and never returns a
// Status, so no (misleading) success value follows the throw.
Status WritePreparedTxnImpl::Prepare() {
  // TODO(myabandeh) Implement this
  throw std::runtime_error("Prepare not Implemented");
}
// Placeholder for the write-prepared implementation of Commit().
// Not implemented yet: always throws std::runtime_error and never returns a
// Status, so no (misleading) success value follows the throw.
Status WritePreparedTxnImpl::Commit() {
  // TODO(myabandeh) Implement this
  throw std::runtime_error("Commit not Implemented");
}
// Placeholder for the write-prepared implementation of Rollback().
// Not implemented yet: always throws std::runtime_error and never returns a
// Status, so no (misleading) success value follows the throw.
Status WritePreparedTxnImpl::Rollback() {
  // TODO(myabandeh) Implement this
  throw std::runtime_error("Rollback not Implemented");
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,70 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <algorithm>
#include <atomic>
#include <mutex>
#include <stack>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/write_callback.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/autovector.h"
#include "utilities/transactions/transaction_base.h"
#include "utilities/transactions/transaction_impl.h"
#include "utilities/transactions/transaction_util.h"
namespace rocksdb {
class TransactionDBImpl;
// This impl could also write uncommitted data to the DB and then later tell
// apart committed data from uncommitted data. Uncommitted data could be
// written after the Prepare phase in 2PC (WritePreparedTxnImpl) or before that
// (WriteUnpreparedTxnImpl).
//
// The locking behavior is inherited from PessimisticTxn; this class only
// supplies the Prepare/Commit/CommitBatch/Rollback overrides, which are
// defined in write_prepared_transaction_impl.cc.
class WritePreparedTxnImpl : public PessimisticTxn {
 public:
  WritePreparedTxnImpl(TransactionDB* db, const WriteOptions& write_options,
                       const TransactionOptions& txn_options);

  virtual ~WritePreparedTxnImpl() {}

  Status Prepare() override;

  Status Commit() override;

  Status CommitBatch(WriteBatch* batch) override;

  Status Rollback() override;

 private:
  // TODO(myabandeh): verify that the current impl works with values being
  // written with prepare sequence number too.
  // Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice&
  // key,
  // SequenceNumber prev_seqno, SequenceNumber*
  // new_seqno);

  // No copying allowed. Explicitly deleted so that an accidental copy (even
  // from a member or friend) is rejected at compile time instead of failing
  // at link time on an undefined symbol.
  WritePreparedTxnImpl(const WritePreparedTxnImpl&) = delete;
  void operator=(const WritePreparedTxnImpl&) = delete;
};
} // namespace rocksdb
#endif // ROCKSDB_LITE
Loading…
Cancel
Save