diff --git a/CMakeLists.txt b/CMakeLists.txt index bc8eb5d61..bd7a8fbe4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -525,6 +525,7 @@ set(SOURCES utilities/transactions/transaction_impl.cc utilities/transactions/transaction_lock_mgr.cc utilities/transactions/transaction_util.cc + utilities/transactions/write_prepared_transaction_impl.cc utilities/ttl/db_ttl_impl.cc utilities/write_batch_with_index/write_batch_with_index.cc utilities/write_batch_with_index/write_batch_with_index_internal.cc diff --git a/db/db_impl.h b/db/db_impl.h index 7fec69cd7..3284048a6 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -631,7 +631,9 @@ class DBImpl : public DB { private: friend class DB; friend class InternalStats; - friend class TransactionImpl; + friend class PessimisticTxn; + friend class WriteCommittedTxnImpl; + friend class WritePreparedTxnImpl; #ifndef ROCKSDB_LITE friend class ForwardIterator; #endif diff --git a/src.mk b/src.mk index 81d78eb36..0b0d4e6ab 100644 --- a/src.mk +++ b/src.mk @@ -200,6 +200,7 @@ LIB_SOURCES = \ utilities/transactions/transaction_impl.cc \ utilities/transactions/transaction_lock_mgr.cc \ utilities/transactions/transaction_util.cc \ + utilities/transactions/write_prepared_transaction_impl.cc \ utilities/ttl/db_ttl_impl.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \ utilities/write_batch_with_index/write_batch_with_index_internal.cc \ diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index 69b5bc1ea..bd43b585a 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -128,7 +128,7 @@ Transaction* TransactionDBImpl::BeginTransaction( ReinitializeTransaction(old_txn, write_options, txn_options); return old_txn; } else { - return new TransactionImpl(this, write_options, txn_options); + return new WriteCommittedTxnImpl(this, write_options, txn_options); } } @@ -266,17 +266,17 @@ Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { return s; } -Status TransactionDBImpl::TryLock(TransactionImpl* txn, uint32_t cfh_id, +Status TransactionDBImpl::TryLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key, bool exclusive) { return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); } -void TransactionDBImpl::UnLock(TransactionImpl* txn, +void TransactionDBImpl::UnLock(PessimisticTxn* txn, const TransactionKeyMap* keys) { lock_mgr_.UnLock(txn, keys, GetEnv()); } -void TransactionDBImpl::UnLock(TransactionImpl* txn, uint32_t cfh_id, +void TransactionDBImpl::UnLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key) { lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); } @@ -372,7 +372,7 @@ Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { Transaction* txn = BeginInternalTransaction(opts); txn->DisableIndexing(); - auto txn_impl = static_cast_with_check(txn); + auto txn_impl = static_cast_with_check(txn); // Since commitBatch sorts the keys before locking, concurrent Write() // operations will not cause a deadlock. @@ -386,7 +386,7 @@ Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { } void TransactionDBImpl::InsertExpirableTransaction(TransactionID tx_id, - TransactionImpl* tx) { + PessimisticTxn* tx) { assert(tx->GetExpirationTime() > 0); std::lock_guard lock(map_mutex_); expirable_transactions_map_.insert({tx_id, tx}); @@ -405,14 +405,14 @@ bool TransactionDBImpl::TryStealingExpiredTransactionLocks( if (tx_it == expirable_transactions_map_.end()) { return true; } - TransactionImpl& tx = *(tx_it->second); + PessimisticTxn& tx = *(tx_it->second); return tx.TryStealingLocks(); } void TransactionDBImpl::ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options) { - auto txn_impl = static_cast_with_check(txn); + auto txn_impl = static_cast_with_check(txn); txn_impl->Reinitialize(this, write_options, txn_options); } diff --git a/utilities/transactions/transaction_db_impl.h b/utilities/transactions/transaction_db_impl.h index 428512e82..dfc13fbd7 100644 --- a/utilities/transactions/transaction_db_impl.h +++ b/utilities/transactions/transaction_db_impl.h @@ -63,11 +63,11 @@ class TransactionDBImpl : public TransactionDB { using StackableDB::DropColumnFamily; virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; - Status TryLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key, + Status TryLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key, bool exclusive); - void UnLock(TransactionImpl* txn, const TransactionKeyMap* keys); - void UnLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key); + void UnLock(PessimisticTxn* txn, const TransactionKeyMap* keys); + void UnLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key); void AddColumnFamily(const ColumnFamilyHandle* handle); @@ -78,7 +78,7 @@ class TransactionDBImpl : public TransactionDB { return txn_db_options_; } - void InsertExpirableTransaction(TransactionID tx_id, TransactionImpl* tx); + void InsertExpirableTransaction(TransactionID tx_id, PessimisticTxn* tx); void RemoveExpirableTransaction(TransactionID tx_id); // If transaction is no longer available, locks can be stolen @@ -109,13 +109,12 @@ class TransactionDBImpl : public TransactionDB { // Must be held when adding/dropping column families. InstrumentedMutex column_family_mutex_; Transaction* BeginInternalTransaction(const WriteOptions& options); - Status WriteHelper(WriteBatch* updates, TransactionImpl* txn_impl); // Used to ensure that no locks are stolen from an expirable transaction // that has started a commit. Only transactions with an expiration time // should be in this map. std::mutex map_mutex_; - std::unordered_map + std::unordered_map expirable_transactions_map_; // map from name to two phase transaction instance diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index dd0c69be4..ececec6d5 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -29,31 +29,31 @@ namespace rocksdb { struct WriteOptions; -std::atomic TransactionImpl::txn_id_counter_(1); +std::atomic PessimisticTxn::txn_id_counter_(1); -TransactionID TransactionImpl::GenTxnID() { +TransactionID PessimisticTxn::GenTxnID() { return txn_id_counter_.fetch_add(1); } -TransactionImpl::TransactionImpl(TransactionDB* txn_db, - const WriteOptions& write_options, - const TransactionOptions& txn_options) +PessimisticTxn::PessimisticTxn(TransactionDB* txn_db, + const WriteOptions& write_options, + const TransactionOptions& txn_options) : TransactionBaseImpl(txn_db->GetRootDB(), write_options), txn_db_impl_(nullptr), + expiration_time_(0), txn_id_(0), waiting_cf_id_(0), waiting_key_(nullptr), - expiration_time_(0), lock_timeout_(0), deadlock_detect_(false), deadlock_detect_depth_(0) { txn_db_impl_ = static_cast_with_check(txn_db); - db_impl_ = static_cast_with_check(txn_db->GetRootDB()); + db_impl_ = static_cast_with_check(db_); Initialize(txn_options); } -void TransactionImpl::Initialize(const TransactionOptions& txn_options) { +void PessimisticTxn::Initialize(const TransactionOptions& txn_options) { txn_id_ = GenTxnID(); txn_state_ = STARTED; @@ -84,7 +84,7 @@ void TransactionImpl::Initialize(const TransactionOptions& txn_options) { } } -TransactionImpl::~TransactionImpl() { +PessimisticTxn::~PessimisticTxn() { txn_db_impl_->UnLock(this, &GetTrackedKeys()); if (expiration_time_ > 0) { txn_db_impl_->RemoveExpirableTransaction(txn_id_); @@ -94,14 +94,14 @@ TransactionImpl::~TransactionImpl() { } } -void TransactionImpl::Clear() { +void PessimisticTxn::Clear() { txn_db_impl_->UnLock(this, &GetTrackedKeys()); TransactionBaseImpl::Clear(); } -void TransactionImpl::Reinitialize(TransactionDB* txn_db, - const WriteOptions& write_options, - const TransactionOptions& txn_options) { +void PessimisticTxn::Reinitialize(TransactionDB* txn_db, + const WriteOptions& write_options, + const TransactionOptions& txn_options) { if (!name_.empty() && txn_state_ != COMMITED) { txn_db_impl_->UnregisterTransaction(this); } @@ -109,7 +109,7 @@ void TransactionImpl::Reinitialize(TransactionDB* txn_db, Initialize(txn_options); } -bool TransactionImpl::IsExpired() const { +bool PessimisticTxn::IsExpired() const { if (expiration_time_ > 0) { if (db_->GetEnv()->NowMicros() >= expiration_time_) { // Transaction is expired. @@ -120,7 +120,12 @@ bool TransactionImpl::IsExpired() const { return false; } -Status TransactionImpl::CommitBatch(WriteBatch* batch) { +WriteCommittedTxnImpl::WriteCommittedTxnImpl( + TransactionDB* txn_db, const WriteOptions& write_options, + const TransactionOptions& txn_options) + : PessimisticTxn(txn_db, write_options, txn_options){}; + +Status WriteCommittedTxnImpl::CommitBatch(WriteBatch* batch) { TransactionKeyMap keys_to_unlock; Status s = LockBatch(batch, &keys_to_unlock); @@ -158,7 +163,7 @@ Status TransactionImpl::CommitBatch(WriteBatch* batch) { return s; } -Status TransactionImpl::Prepare() { +Status WriteCommittedTxnImpl::Prepare() { Status s; if (name_.empty()) { @@ -213,7 +218,7 @@ Status TransactionImpl::Prepare() { return s; } -Status TransactionImpl::Commit() { +Status WriteCommittedTxnImpl::Commit() { Status s; bool commit_single = false; bool commit_prepared = false; @@ -299,7 +304,7 @@ Status TransactionImpl::Commit() { return s; } -Status TransactionImpl::Rollback() { +Status WriteCommittedTxnImpl::Rollback() { Status s; if (txn_state_ == PREPARED) { WriteBatch rollback_marker; @@ -326,7 +331,7 @@ Status TransactionImpl::Rollback() { return s; } -Status TransactionImpl::RollbackToSavePoint() { +Status PessimisticTxn::RollbackToSavePoint() { if (txn_state_ != STARTED) { return Status::InvalidArgument("Transaction is beyond state for rollback."); } @@ -344,8 +349,8 @@ Status TransactionImpl::RollbackToSavePoint() { // Lock all keys in this batch. // On success, caller should unlock keys_to_unlock -Status TransactionImpl::LockBatch(WriteBatch* batch, - TransactionKeyMap* keys_to_unlock) { +Status PessimisticTxn::LockBatch(WriteBatch* batch, + TransactionKeyMap* keys_to_unlock) { class Handler : public WriteBatch::Handler { public: // Sorted map of column_family_id to sorted set of keys. @@ -422,9 +427,9 @@ Status TransactionImpl::LockBatch(WriteBatch* batch, // If check_shapshot is true and this transaction has a snapshot set, // this key will only be locked if there have been no writes to this key since // the snapshot time. -Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, - const Slice& key, bool read_only, - bool exclusive, bool untracked) { +Status PessimisticTxn::TryLock(ColumnFamilyHandle* column_family, + const Slice& key, bool read_only, bool exclusive, + bool untracked) { uint32_t cfh_id = GetColumnFamilyID(column_family); std::string key_str = key.ToString(); bool previously_locked; @@ -510,10 +515,10 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, // Return OK() if this key has not been modified more recently than the // transaction snapshot_. -Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family, - const Slice& key, - SequenceNumber prev_seqno, - SequenceNumber* new_seqno) { +Status PessimisticTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, + const Slice& key, + SequenceNumber prev_seqno, + SequenceNumber* new_seqno) { assert(snapshot_); SequenceNumber seq = snapshot_->GetSequenceNumber(); @@ -526,29 +531,27 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family, *new_seqno = seq; - auto db_impl = static_cast_with_check(db_); - ColumnFamilyHandle* cfh = - column_family ? column_family : db_impl->DefaultColumnFamily(); + column_family ? column_family : db_impl_->DefaultColumnFamily(); - return TransactionUtil::CheckKeyForConflicts(db_impl, cfh, key.ToString(), + return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(), snapshot_->GetSequenceNumber(), false /* cache_only */); } -bool TransactionImpl::TryStealingLocks() { +bool PessimisticTxn::TryStealingLocks() { assert(IsExpired()); TransactionState expected = STARTED; return std::atomic_compare_exchange_strong(&txn_state_, &expected, LOCKS_STOLEN); } -void TransactionImpl::UnlockGetForUpdate(ColumnFamilyHandle* column_family, - const Slice& key) { +void PessimisticTxn::UnlockGetForUpdate(ColumnFamilyHandle* column_family, + const Slice& key) { txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString()); } -Status TransactionImpl::SetName(const TransactionName& name) { +Status PessimisticTxn::SetName(const TransactionName& name) { Status s; if (txn_state_ == STARTED) { if (name_.length()) { diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index 01f8f4b2a..8445b0a50 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -31,24 +31,28 @@ namespace rocksdb { class TransactionDBImpl; +class PessimisticTxn; -class TransactionImpl : public TransactionBaseImpl { +// A transaction under pessimistic concurrency control. This class implements +// the locking API and interfaces with the lock manager as well as the +// pessimistic transactional db. +class PessimisticTxn : public TransactionBaseImpl { public: - TransactionImpl(TransactionDB* db, const WriteOptions& write_options, - const TransactionOptions& txn_options); + PessimisticTxn(TransactionDB* db, const WriteOptions& write_options, + const TransactionOptions& txn_options); - virtual ~TransactionImpl(); + virtual ~PessimisticTxn(); void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options); - Status Prepare() override; + Status Prepare() override = 0; - Status Commit() override; + Status Commit() override = 0; - Status CommitBatch(WriteBatch* batch); + virtual Status CommitBatch(WriteBatch* batch) = 0; - Status Rollback() override; + Status Rollback() override = 0; Status RollbackToSavePoint() override; @@ -107,14 +111,24 @@ class TransactionImpl : public TransactionBaseImpl { int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; } protected: + void Initialize(const TransactionOptions& txn_options); + + Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); + Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, bool exclusive, bool untracked = false) override; - private: + void Clear() override; + TransactionDBImpl* txn_db_impl_; DBImpl* db_impl_; + // If non-zero, this transaction should not be committed after this time (in + // microseconds according to Env->NowMicros()) + uint64_t expiration_time_; + + private: // Used to create unique ids for transactions. static std::atomic txn_id_counter_; @@ -140,10 +154,6 @@ class TransactionImpl : public TransactionBaseImpl { // Mutex protecting waiting_txn_ids_, waiting_cf_id_ and waiting_key_. mutable std::mutex wait_mutex_; - // If non-zero, this transaction should not be committed after this time (in - // microseconds according to Env->NowMicros()) - uint64_t expiration_time_; - // Timeout in microseconds when locking a key or -1 if there is no timeout. int64_t lock_timeout_; @@ -153,32 +163,46 @@ class TransactionImpl : public TransactionBaseImpl { // Whether to perform deadlock detection or not. int64_t deadlock_detect_depth_; - void Clear() override; - - void Initialize(const TransactionOptions& txn_options); - Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, SequenceNumber prev_seqno, SequenceNumber* new_seqno); - Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); + void UnlockGetForUpdate(ColumnFamilyHandle* column_family, + const Slice& key) override; - Status DoCommit(WriteBatch* batch); + // No copying allowed + PessimisticTxn(const PessimisticTxn&); + void operator=(const PessimisticTxn&); +}; - void RollbackLastN(size_t num); +class WriteCommittedTxnImpl : public PessimisticTxn { + public: + WriteCommittedTxnImpl(TransactionDB* db, const WriteOptions& write_options, + const TransactionOptions& txn_options); - void UnlockGetForUpdate(ColumnFamilyHandle* column_family, - const Slice& key) override; + virtual ~WriteCommittedTxnImpl() {} + + Status Prepare() override; + + Status Commit() override; + + Status CommitBatch(WriteBatch* batch) override; + + Status Rollback() override; + + private: + Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, + SequenceNumber prev_seqno, SequenceNumber* new_seqno); // No copying allowed - TransactionImpl(const TransactionImpl&); - void operator=(const TransactionImpl&); + WriteCommittedTxnImpl(const WriteCommittedTxnImpl&); + void operator=(const WriteCommittedTxnImpl&); }; // Used at commit time to check whether transaction is committing before its // expiration time. class TransactionCallback : public WriteCallback { public: - explicit TransactionCallback(TransactionImpl* txn) : txn_(txn) {} + explicit TransactionCallback(PessimisticTxn* txn) : txn_(txn) {} Status Callback(DB* db) override { if (txn_->IsExpired()) { @@ -191,7 +215,7 @@ class TransactionCallback : public WriteCallback { bool AllowWriteBatching() override { return true; } private: - TransactionImpl* txn_; + PessimisticTxn* txn_; }; } // namespace rocksdb diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 1184f667d..99e71eeb0 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -227,7 +227,7 @@ bool TransactionLockMgr::IsLockExpired(TransactionID txn_id, return expired; } -Status TransactionLockMgr::TryLock(TransactionImpl* txn, +Status TransactionLockMgr::TryLock(PessimisticTxn* txn, uint32_t column_family_id, const std::string& key, Env* env, bool exclusive) { @@ -256,7 +256,7 @@ Status TransactionLockMgr::TryLock(TransactionImpl* txn, // Helper function for TryLock(). Status TransactionLockMgr::AcquireWithTimeout( - TransactionImpl* txn, LockMap* lock_map, LockMapStripe* stripe, + PessimisticTxn* txn, LockMap* lock_map, LockMapStripe* stripe, uint32_t column_family_id, const std::string& key, Env* env, int64_t timeout, const LockInfo& lock_info) { Status result; @@ -357,13 +357,13 @@ Status TransactionLockMgr::AcquireWithTimeout( } void TransactionLockMgr::DecrementWaiters( - const TransactionImpl* txn, const autovector& wait_ids) { + const PessimisticTxn* txn, const autovector& wait_ids) { std::lock_guard lock(wait_txn_map_mutex_); DecrementWaitersImpl(txn, wait_ids); } void TransactionLockMgr::DecrementWaitersImpl( - const TransactionImpl* txn, const autovector& wait_ids) { + const PessimisticTxn* txn, const autovector& wait_ids) { auto id = txn->GetID(); assert(wait_txn_map_.Contains(id)); wait_txn_map_.Delete(id); @@ -377,7 +377,7 @@ void TransactionLockMgr::DecrementWaitersImpl( } bool TransactionLockMgr::IncrementWaiters( - const TransactionImpl* txn, const autovector& wait_ids) { + const PessimisticTxn* txn, const autovector& wait_ids) { auto id = txn->GetID(); std::vector queue(txn->GetDeadlockDetectDepth()); std::lock_guard lock(wait_txn_map_mutex_); @@ -501,7 +501,7 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, return result; } -void TransactionLockMgr::UnLockKey(const TransactionImpl* txn, +void TransactionLockMgr::UnLockKey(const PessimisticTxn* txn, const std::string& key, LockMapStripe* stripe, LockMap* lock_map, Env* env) { @@ -537,7 +537,7 @@ void TransactionLockMgr::UnLockKey(const TransactionImpl* txn, } } -void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, +void TransactionLockMgr::UnLock(PessimisticTxn* txn, uint32_t column_family_id, const std::string& key, Env* env) { std::shared_ptr lock_map_ptr = GetLockMap(column_family_id); LockMap* lock_map = lock_map_ptr.get(); @@ -559,7 +559,7 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, stripe->stripe_cv->NotifyAll(); } -void TransactionLockMgr::UnLock(const TransactionImpl* txn, +void TransactionLockMgr::UnLock(const PessimisticTxn* txn, const TransactionKeyMap* key_map, Env* env) { for (auto& key_map_iter : *key_map) { uint32_t column_family_id = key_map_iter.first; diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index 6389f8d7d..6c0d1e99d 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -47,14 +47,14 @@ class TransactionLockMgr { // Attempt to lock key. If OK status is returned, the caller is responsible // for calling UnLock() on this key. - Status TryLock(TransactionImpl* txn, uint32_t column_family_id, + Status TryLock(PessimisticTxn* txn, uint32_t column_family_id, const std::string& key, Env* env, bool exclusive); // Unlock a key locked by TryLock(). txn must be the same Transaction that // locked this key. - void UnLock(const TransactionImpl* txn, const TransactionKeyMap* keys, + void UnLock(const PessimisticTxn* txn, const TransactionKeyMap* keys, Env* env); - void UnLock(TransactionImpl* txn, uint32_t column_family_id, + void UnLock(PessimisticTxn* txn, uint32_t column_family_id, const std::string& key, Env* env); using LockStatusData = std::unordered_multimap; @@ -102,7 +102,7 @@ class TransactionLockMgr { std::shared_ptr GetLockMap(uint32_t column_family_id); - Status AcquireWithTimeout(TransactionImpl* txn, LockMap* lock_map, + Status AcquireWithTimeout(PessimisticTxn* txn, LockMap* lock_map, LockMapStripe* stripe, uint32_t column_family_id, const std::string& key, Env* env, int64_t timeout, const LockInfo& lock_info); @@ -112,14 +112,14 @@ class TransactionLockMgr { const LockInfo& lock_info, uint64_t* wait_time, autovector* txn_ids); - void UnLockKey(const TransactionImpl* txn, const std::string& key, + void UnLockKey(const PessimisticTxn* txn, const std::string& key, LockMapStripe* stripe, LockMap* lock_map, Env* env); - bool IncrementWaiters(const TransactionImpl* txn, + bool IncrementWaiters(const PessimisticTxn* txn, const autovector& wait_ids); - void DecrementWaiters(const TransactionImpl* txn, + void DecrementWaiters(const PessimisticTxn* txn, const autovector& wait_ids); - void DecrementWaitersImpl(const TransactionImpl* txn, + void DecrementWaitersImpl(const PessimisticTxn* txn, const autovector& wait_ids); // No copying allowed diff --git a/utilities/transactions/write_prepared_transaction_impl.cc b/utilities/transactions/write_prepared_transaction_impl.cc new file mode 100644 index 000000000..ded6bcb2b --- /dev/null +++ b/utilities/transactions/write_prepared_transaction_impl.cc @@ -0,0 +1,65 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/transactions/write_prepared_transaction_impl.h" + +#include +#include +#include +#include + +#include "db/column_family.h" +#include "db/db_impl.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/snapshot.h" +#include "rocksdb/status.h" +#include "rocksdb/utilities/transaction_db.h" +#include "util/string_util.h" +#include "util/sync_point.h" +#include "utilities/transactions/transaction_db_impl.h" +#include "utilities/transactions/transaction_impl.h" +#include "utilities/transactions/transaction_util.h" + +namespace rocksdb { + +struct WriteOptions; + +WritePreparedTxnImpl::WritePreparedTxnImpl( + TransactionDB* txn_db, const WriteOptions& write_options, + const TransactionOptions& txn_options) + : PessimisticTxn(txn_db, write_options, txn_options) { + PessimisticTxn::Initialize(txn_options); +} + +Status WritePreparedTxnImpl::CommitBatch(WriteBatch* batch) { + // TODO(myabandeh) Implement this + throw std::runtime_error("CommitBatch not Implemented"); + return Status::OK(); +} + +Status WritePreparedTxnImpl::Prepare() { + // TODO(myabandeh) Implement this + throw std::runtime_error("Prepare not Implemented"); + return Status::OK(); +} + +Status WritePreparedTxnImpl::Commit() { + // TODO(myabandeh) Implement this + throw std::runtime_error("Commit not Implemented"); + return Status::OK(); +} + +Status WritePreparedTxnImpl::Rollback() { + // TODO(myabandeh) Implement this + throw std::runtime_error("Rollback not Implemented"); + return Status::OK(); +} + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/write_prepared_transaction_impl.h b/utilities/transactions/write_prepared_transaction_impl.h new file mode 100644 index 000000000..eab2b8669 --- /dev/null +++ b/utilities/transactions/write_prepared_transaction_impl.h @@ -0,0 +1,70 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include +#include +#include +#include + +#include "db/write_callback.h" +#include "rocksdb/db.h" +#include "rocksdb/slice.h" +#include "rocksdb/snapshot.h" +#include "rocksdb/status.h" +#include "rocksdb/types.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" +#include "rocksdb/utilities/write_batch_with_index.h" +#include "util/autovector.h" +#include "utilities/transactions/transaction_base.h" +#include "utilities/transactions/transaction_impl.h" +#include "utilities/transactions/transaction_util.h" + +namespace rocksdb { + +class TransactionDBImpl; + +// This impl could write to DB also uncomitted data and then later tell apart +// committed data from uncomitted data. Uncommitted data could be after the +// Prepare phase in 2PC (WritePreparedTxnImpl) or before that +// (WriteUnpreparedTxnImpl). +class WritePreparedTxnImpl : public PessimisticTxn { + public: + WritePreparedTxnImpl(TransactionDB* db, const WriteOptions& write_options, + const TransactionOptions& txn_options); + + virtual ~WritePreparedTxnImpl() {} + + Status Prepare() override; + + Status Commit() override; + + Status CommitBatch(WriteBatch* batch) override; + + Status Rollback() override; + + private: + // TODO(myabandeh): verify that the current impl work with values being + // written with prepare sequence number too. + // Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& + // key, + // SequenceNumber prev_seqno, SequenceNumber* + // new_seqno); + + // No copying allowed + WritePreparedTxnImpl(const WritePreparedTxnImpl&); + void operator=(const WritePreparedTxnImpl&); +}; + +} // namespace rocksdb + +#endif // ROCKSDB_LITE