Refactor PessimisticTransaction

Summary:
This patch splits Commit and Prepare into lock-related logic and db-write-related logic. It moves lock-related logic to PessimisticTransaction to be reused by all children classes and movies the existing impl of db-write-related to PrepareInternal, CommitSingleInternal, and CommitInternal in WriteCommittedTxnImpl.
Closes https://github.com/facebook/rocksdb/pull/2691

Differential Revision: D5569464

Pulled By: maysamyabandeh

fbshipit-source-id: d1b8698e69801a4126c7bc211745d05c636f5325
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent a9a4e89c38
commit bdc056f8aa
  1. 6
      CMakeLists.txt
  2. 6
      TARGETS
  3. 6
      db/db_impl.h
  4. 6
      src.mk
  5. 2
      utilities/blob_db/blob_db_impl.cc
  6. 26
      utilities/transactions/optimistic_transaction.cc
  7. 18
      utilities/transactions/optimistic_transaction.h
  8. 8
      utilities/transactions/optimistic_transaction_db_impl.cc
  9. 120
      utilities/transactions/pessimistic_transaction.cc
  10. 47
      utilities/transactions/pessimistic_transaction.h
  11. 20
      utilities/transactions/pessimistic_transaction_db.cc
  12. 14
      utilities/transactions/pessimistic_transaction_db.h
  13. 16
      utilities/transactions/transaction_lock_mgr.cc
  14. 18
      utilities/transactions/transaction_lock_mgr.h
  15. 32
      utilities/transactions/write_prepared_txn.cc
  16. 24
      utilities/transactions/write_prepared_txn.h

@ -518,14 +518,14 @@ set(SOURCES
utilities/spatialdb/spatial_db.cc utilities/spatialdb/spatial_db.cc
utilities/table_properties_collectors/compact_on_deletion_collector.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc
utilities/transactions/optimistic_transaction_db_impl.cc utilities/transactions/optimistic_transaction_db_impl.cc
utilities/transactions/optimistic_transaction_impl.cc utilities/transactions/optimistic_transaction.cc
utilities/transactions/transaction_base.cc utilities/transactions/transaction_base.cc
utilities/transactions/pessimistic_transaction_db.cc utilities/transactions/pessimistic_transaction_db.cc
utilities/transactions/transaction_db_mutex_impl.cc utilities/transactions/transaction_db_mutex_impl.cc
utilities/transactions/transaction_impl.cc utilities/transactions/pessimistic_transaction.cc
utilities/transactions/transaction_lock_mgr.cc utilities/transactions/transaction_lock_mgr.cc
utilities/transactions/transaction_util.cc utilities/transactions/transaction_util.cc
utilities/transactions/write_prepared_transaction_impl.cc utilities/transactions/write_prepared_txn.cc
utilities/ttl/db_ttl_impl.cc utilities/ttl/db_ttl_impl.cc
utilities/write_batch_with_index/write_batch_with_index.cc utilities/write_batch_with_index/write_batch_with_index.cc
utilities/write_batch_with_index/write_batch_with_index_internal.cc utilities/write_batch_with_index/write_batch_with_index_internal.cc

@ -245,14 +245,14 @@ cpp_library(
"utilities/spatialdb/spatial_db.cc", "utilities/spatialdb/spatial_db.cc",
"utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/transactions/optimistic_transaction_db_impl.cc", "utilities/transactions/optimistic_transaction_db_impl.cc",
"utilities/transactions/optimistic_transaction_impl.cc", "utilities/transactions/optimistic_transaction.cc",
"utilities/transactions/transaction_base.cc", "utilities/transactions/transaction_base.cc",
"utilities/transactions/pessimistic_transaction_db.cc", "utilities/transactions/pessimistic_transaction_db.cc",
"utilities/transactions/transaction_db_mutex_impl.cc", "utilities/transactions/transaction_db_mutex_impl.cc",
"utilities/transactions/transaction_impl.cc", "utilities/transactions/pessimistic_transaction.cc",
"utilities/transactions/transaction_lock_mgr.cc", "utilities/transactions/transaction_lock_mgr.cc",
"utilities/transactions/transaction_util.cc", "utilities/transactions/transaction_util.cc",
"utilities/transactions/write_prepared_transaction_impl.cc", "utilities/transactions/write_prepared_txn.cc",
"utilities/ttl/db_ttl_impl.cc", "utilities/ttl/db_ttl_impl.cc",
"utilities/write_batch_with_index/write_batch_with_index.cc", "utilities/write_batch_with_index/write_batch_with_index.cc",
"utilities/write_batch_with_index/write_batch_with_index_internal.cc", "utilities/write_batch_with_index/write_batch_with_index_internal.cc",

@ -631,9 +631,9 @@ class DBImpl : public DB {
private: private:
friend class DB; friend class DB;
friend class InternalStats; friend class InternalStats;
friend class PessimisticTxn; friend class PessimisticTransaction;
friend class WriteCommittedTxnImpl; friend class WriteCommittedTxn;
friend class WritePreparedTxnImpl; friend class WritePreparedTxn;
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
friend class ForwardIterator; friend class ForwardIterator;
#endif #endif

@ -193,14 +193,14 @@ LIB_SOURCES = \
utilities/spatialdb/spatial_db.cc \ utilities/spatialdb/spatial_db.cc \
utilities/table_properties_collectors/compact_on_deletion_collector.cc \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \
utilities/transactions/optimistic_transaction_db_impl.cc \ utilities/transactions/optimistic_transaction_db_impl.cc \
utilities/transactions/optimistic_transaction_impl.cc \ utilities/transactions/optimistic_transaction.cc \
utilities/transactions/transaction_base.cc \ utilities/transactions/transaction_base.cc \
utilities/transactions/pessimistic_transaction_db.cc \ utilities/transactions/pessimistic_transaction_db.cc \
utilities/transactions/transaction_db_mutex_impl.cc \ utilities/transactions/transaction_db_mutex_impl.cc \
utilities/transactions/transaction_impl.cc \ utilities/transactions/pessimistic_transaction.cc \
utilities/transactions/transaction_lock_mgr.cc \ utilities/transactions/transaction_lock_mgr.cc \
utilities/transactions/transaction_util.cc \ utilities/transactions/transaction_util.cc \
utilities/transactions/write_prepared_transaction_impl.cc \ utilities/transactions/write_prepared_txn.cc \
utilities/ttl/db_ttl_impl.cc \ utilities/ttl/db_ttl_impl.cc \
utilities/write_batch_with_index/write_batch_with_index.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \
utilities/write_batch_with_index/write_batch_with_index_internal.cc \ utilities/write_batch_with_index/write_batch_with_index_internal.cc \

@ -32,7 +32,7 @@
#include "util/random.h" #include "util/random.h"
#include "util/timer_queue.h" #include "util/timer_queue.h"
#include "utilities/transactions/optimistic_transaction_db_impl.h" #include "utilities/transactions/optimistic_transaction_db_impl.h"
#include "utilities/transactions/optimistic_transaction_impl.h" #include "utilities/transactions/optimistic_transaction.h"
namespace { namespace {
int kBlockBasedTableVersionFormat = 2; int kBlockBasedTableVersionFormat = 2;

@ -5,11 +5,9 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "utilities/transactions/optimistic_transaction_impl.h" #include "utilities/transactions/optimistic_transaction.h"
#include <algorithm>
#include <string> #include <string>
#include <vector>
#include "db/column_family.h" #include "db/column_family.h"
#include "db/db_impl.h" #include "db/db_impl.h"
@ -25,40 +23,40 @@ namespace rocksdb {
struct WriteOptions; struct WriteOptions;
OptimisticTransactionImpl::OptimisticTransactionImpl( OptimisticTransaction::OptimisticTransaction(
OptimisticTransactionDB* txn_db, const WriteOptions& write_options, OptimisticTransactionDB* txn_db, const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options) const OptimisticTransactionOptions& txn_options)
: TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_(txn_db) { : TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_(txn_db) {
Initialize(txn_options); Initialize(txn_options);
} }
void OptimisticTransactionImpl::Initialize( void OptimisticTransaction::Initialize(
const OptimisticTransactionOptions& txn_options) { const OptimisticTransactionOptions& txn_options) {
if (txn_options.set_snapshot) { if (txn_options.set_snapshot) {
SetSnapshot(); SetSnapshot();
} }
} }
void OptimisticTransactionImpl::Reinitialize( void OptimisticTransaction::Reinitialize(
OptimisticTransactionDB* txn_db, const WriteOptions& write_options, OptimisticTransactionDB* txn_db, const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options) { const OptimisticTransactionOptions& txn_options) {
TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options); TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options);
Initialize(txn_options); Initialize(txn_options);
} }
OptimisticTransactionImpl::~OptimisticTransactionImpl() { OptimisticTransaction::~OptimisticTransaction() {
} }
void OptimisticTransactionImpl::Clear() { void OptimisticTransaction::Clear() {
TransactionBaseImpl::Clear(); TransactionBaseImpl::Clear();
} }
Status OptimisticTransactionImpl::Prepare() { Status OptimisticTransaction::Prepare() {
return Status::InvalidArgument( return Status::InvalidArgument(
"Two phase commit not supported for optimistic transactions."); "Two phase commit not supported for optimistic transactions.");
} }
Status OptimisticTransactionImpl::Commit() { Status OptimisticTransaction::Commit() {
// Set up callback which will call CheckTransactionForConflicts() to // Set up callback which will call CheckTransactionForConflicts() to
// check whether this transaction is safe to be committed. // check whether this transaction is safe to be committed.
OptimisticTransactionCallback callback(this); OptimisticTransactionCallback callback(this);
@ -75,7 +73,7 @@ Status OptimisticTransactionImpl::Commit() {
return s; return s;
} }
Status OptimisticTransactionImpl::Rollback() { Status OptimisticTransaction::Rollback() {
Clear(); Clear();
return Status::OK(); return Status::OK();
} }
@ -83,7 +81,7 @@ Status OptimisticTransactionImpl::Rollback() {
// Record this key so that we can check it for conflicts at commit time. // Record this key so that we can check it for conflicts at commit time.
// //
// 'exclusive' is unused for OptimisticTransaction. // 'exclusive' is unused for OptimisticTransaction.
Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only, const Slice& key, bool read_only,
bool exclusive, bool untracked) { bool exclusive, bool untracked) {
if (untracked) { if (untracked) {
@ -114,7 +112,7 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
// //
// Should only be called on writer thread in order to avoid any race conditions // Should only be called on writer thread in order to avoid any race conditions
// in detecting write conflicts. // in detecting write conflicts.
Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) { Status OptimisticTransaction::CheckTransactionForConflicts(DB* db) {
Status result; Status result;
auto db_impl = static_cast_with_check<DBImpl, DB>(db); auto db_impl = static_cast_with_check<DBImpl, DB>(db);
@ -127,7 +125,7 @@ Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) {
true /* cache_only */); true /* cache_only */);
} }
Status OptimisticTransactionImpl::SetName(const TransactionName& name) { Status OptimisticTransaction::SetName(const TransactionName& /* unused */) {
return Status::InvalidArgument("Optimistic transactions cannot be named."); return Status::InvalidArgument("Optimistic transactions cannot be named.");
} }

@ -26,13 +26,13 @@
namespace rocksdb { namespace rocksdb {
class OptimisticTransactionImpl : public TransactionBaseImpl { class OptimisticTransaction : public TransactionBaseImpl {
public: public:
OptimisticTransactionImpl(OptimisticTransactionDB* db, OptimisticTransaction(OptimisticTransactionDB* db,
const WriteOptions& write_options, const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options); const OptimisticTransactionOptions& txn_options);
virtual ~OptimisticTransactionImpl(); virtual ~OptimisticTransaction();
void Reinitialize(OptimisticTransactionDB* txn_db, void Reinitialize(OptimisticTransactionDB* txn_db,
const WriteOptions& write_options, const WriteOptions& write_options,
@ -67,20 +67,20 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
void Clear() override; void Clear() override;
void UnlockGetForUpdate(ColumnFamilyHandle* column_family, void UnlockGetForUpdate(ColumnFamilyHandle* /* unused */,
const Slice& key) override { const Slice& /* unused */) override {
// Nothing to unlock. // Nothing to unlock.
} }
// No copying allowed // No copying allowed
OptimisticTransactionImpl(const OptimisticTransactionImpl&); OptimisticTransaction(const OptimisticTransaction&);
void operator=(const OptimisticTransactionImpl&); void operator=(const OptimisticTransaction&);
}; };
// Used at commit time to trigger transaction validation // Used at commit time to trigger transaction validation
class OptimisticTransactionCallback : public WriteCallback { class OptimisticTransactionCallback : public WriteCallback {
public: public:
explicit OptimisticTransactionCallback(OptimisticTransactionImpl* txn) explicit OptimisticTransactionCallback(OptimisticTransaction* txn)
: txn_(txn) {} : txn_(txn) {}
Status Callback(DB* db) override { Status Callback(DB* db) override {
@ -90,7 +90,7 @@ class OptimisticTransactionCallback : public WriteCallback {
bool AllowWriteBatching() override { return false; } bool AllowWriteBatching() override { return false; }
private: private:
OptimisticTransactionImpl* txn_; OptimisticTransaction* txn_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -14,7 +14,7 @@
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/optimistic_transaction_db.h"
#include "utilities/transactions/optimistic_transaction_impl.h" #include "utilities/transactions/optimistic_transaction.h"
namespace rocksdb { namespace rocksdb {
@ -25,7 +25,7 @@ Transaction* OptimisticTransactionDBImpl::BeginTransaction(
ReinitializeTransaction(old_txn, write_options, txn_options); ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn; return old_txn;
} else { } else {
return new OptimisticTransactionImpl(this, write_options, txn_options); return new OptimisticTransaction(this, write_options, txn_options);
} }
} }
@ -81,8 +81,8 @@ Status OptimisticTransactionDB::Open(
void OptimisticTransactionDBImpl::ReinitializeTransaction( void OptimisticTransactionDBImpl::ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options, Transaction* txn, const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options) { const OptimisticTransactionOptions& txn_options) {
assert(dynamic_cast<OptimisticTransactionImpl*>(txn) != nullptr); assert(dynamic_cast<OptimisticTransaction*>(txn) != nullptr);
auto txn_impl = reinterpret_cast<OptimisticTransactionImpl*>(txn); auto txn_impl = reinterpret_cast<OptimisticTransaction*>(txn);
txn_impl->Reinitialize(this, write_options, txn_options); txn_impl->Reinitialize(this, write_options, txn_options);
} }

@ -5,7 +5,7 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "utilities/transactions/transaction_impl.h" #include "utilities/transactions/pessimistic_transaction.h"
#include <map> #include <map>
#include <set> #include <set>
@ -29,13 +29,13 @@ namespace rocksdb {
struct WriteOptions; struct WriteOptions;
std::atomic<TransactionID> PessimisticTxn::txn_id_counter_(1); std::atomic<TransactionID> PessimisticTransaction::txn_id_counter_(1);
TransactionID PessimisticTxn::GenTxnID() { TransactionID PessimisticTransaction::GenTxnID() {
return txn_id_counter_.fetch_add(1); return txn_id_counter_.fetch_add(1);
} }
PessimisticTxn::PessimisticTxn(TransactionDB* txn_db, PessimisticTransaction::PessimisticTransaction(TransactionDB* txn_db,
const WriteOptions& write_options, const WriteOptions& write_options,
const TransactionOptions& txn_options) const TransactionOptions& txn_options)
: TransactionBaseImpl(txn_db->GetRootDB(), write_options), : TransactionBaseImpl(txn_db->GetRootDB(), write_options),
@ -53,7 +53,7 @@ PessimisticTxn::PessimisticTxn(TransactionDB* txn_db,
Initialize(txn_options); Initialize(txn_options);
} }
void PessimisticTxn::Initialize(const TransactionOptions& txn_options) { void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
txn_id_ = GenTxnID(); txn_id_ = GenTxnID();
txn_state_ = STARTED; txn_state_ = STARTED;
@ -84,7 +84,7 @@ void PessimisticTxn::Initialize(const TransactionOptions& txn_options) {
} }
} }
PessimisticTxn::~PessimisticTxn() { PessimisticTransaction::~PessimisticTransaction() {
txn_db_impl_->UnLock(this, &GetTrackedKeys()); txn_db_impl_->UnLock(this, &GetTrackedKeys());
if (expiration_time_ > 0) { if (expiration_time_ > 0) {
txn_db_impl_->RemoveExpirableTransaction(txn_id_); txn_db_impl_->RemoveExpirableTransaction(txn_id_);
@ -94,12 +94,12 @@ PessimisticTxn::~PessimisticTxn() {
} }
} }
void PessimisticTxn::Clear() { void PessimisticTransaction::Clear() {
txn_db_impl_->UnLock(this, &GetTrackedKeys()); txn_db_impl_->UnLock(this, &GetTrackedKeys());
TransactionBaseImpl::Clear(); TransactionBaseImpl::Clear();
} }
void PessimisticTxn::Reinitialize(TransactionDB* txn_db, void PessimisticTransaction::Reinitialize(TransactionDB* txn_db,
const WriteOptions& write_options, const WriteOptions& write_options,
const TransactionOptions& txn_options) { const TransactionOptions& txn_options) {
if (!name_.empty() && txn_state_ != COMMITED) { if (!name_.empty() && txn_state_ != COMMITED) {
@ -109,7 +109,7 @@ void PessimisticTxn::Reinitialize(TransactionDB* txn_db,
Initialize(txn_options); Initialize(txn_options);
} }
bool PessimisticTxn::IsExpired() const { bool PessimisticTransaction::IsExpired() const {
if (expiration_time_ > 0) { if (expiration_time_ > 0) {
if (db_->GetEnv()->NowMicros() >= expiration_time_) { if (db_->GetEnv()->NowMicros() >= expiration_time_) {
// Transaction is expired. // Transaction is expired.
@ -120,12 +120,12 @@ bool PessimisticTxn::IsExpired() const {
return false; return false;
} }
WriteCommittedTxnImpl::WriteCommittedTxnImpl( WriteCommittedTxn::WriteCommittedTxn(
TransactionDB* txn_db, const WriteOptions& write_options, TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options) const TransactionOptions& txn_options)
: PessimisticTxn(txn_db, write_options, txn_options){}; : PessimisticTransaction(txn_db, write_options, txn_options){};
Status WriteCommittedTxnImpl::CommitBatch(WriteBatch* batch) { Status WriteCommittedTxn::CommitBatch(WriteBatch* batch) {
TransactionKeyMap keys_to_unlock; TransactionKeyMap keys_to_unlock;
Status s = LockBatch(batch, &keys_to_unlock); Status s = LockBatch(batch, &keys_to_unlock);
@ -163,7 +163,7 @@ Status WriteCommittedTxnImpl::CommitBatch(WriteBatch* batch) {
return s; return s;
} }
Status WriteCommittedTxnImpl::Prepare() { Status PessimisticTransaction::Prepare() {
Status s; Status s;
if (name_.empty()) { if (name_.empty()) {
@ -192,12 +192,7 @@ Status WriteCommittedTxnImpl::Prepare() {
txn_state_.store(AWAITING_PREPARE); txn_state_.store(AWAITING_PREPARE);
// transaction can't expire after preparation // transaction can't expire after preparation
expiration_time_ = 0; expiration_time_ = 0;
WriteOptions write_options = write_options_; s = PrepareInternal();
write_options.disableWAL = false;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0,
/* disable_memtable*/ true);
if (s.ok()) { if (s.ok()) {
assert(log_number_ != 0); assert(log_number_ != 0);
dbimpl_->MarkLogAsContainingPrepSection(log_number_); dbimpl_->MarkLogAsContainingPrepSection(log_number_);
@ -218,9 +213,20 @@ Status WriteCommittedTxnImpl::Prepare() {
return s; return s;
} }
Status WriteCommittedTxnImpl::Commit() { Status WriteCommittedTxn::PrepareInternal() {
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
Status s =
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0,
/* disable_memtable*/ true);
return s;
}
Status PessimisticTransaction::Commit() {
Status s; Status s;
bool commit_single = false; bool commit_without_prepare = false;
bool commit_prepared = false; bool commit_prepared = false;
if (IsExpired()) { if (IsExpired()) {
@ -234,25 +240,28 @@ Status WriteCommittedTxnImpl::Commit() {
// our locks stolen. In this case the only valid state is STARTED because // our locks stolen. In this case the only valid state is STARTED because
// a state of PREPARED would have a cleared expiration_time_. // a state of PREPARED would have a cleared expiration_time_.
TransactionState expected = STARTED; TransactionState expected = STARTED;
commit_single = std::atomic_compare_exchange_strong(&txn_state_, &expected, commit_without_prepare = std::atomic_compare_exchange_strong(
AWAITING_COMMIT); &txn_state_, &expected, AWAITING_COMMIT);
TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1"); TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
} else if (txn_state_ == PREPARED) { } else if (txn_state_ == PREPARED) {
// expiration and lock stealing is not a concern // expiration and lock stealing is not a concern
commit_prepared = true; commit_prepared = true;
} else if (txn_state_ == STARTED) { } else if (txn_state_ == STARTED) {
// expiration and lock stealing is not a concern // expiration and lock stealing is not a concern
commit_single = true; commit_without_prepare = true;
// TODO(myabandeh): what if the user mistakenly forgets prepare? We should
// add an option so that the user explictly express the intention of
// skipping the prepare phase.
} }
if (commit_single) { if (commit_without_prepare) {
assert(!commit_prepared); assert(!commit_prepared);
if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) { if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
s = Status::InvalidArgument( s = Status::InvalidArgument(
"Commit-time batch contains values that will not be committed."); "Commit-time batch contains values that will not be committed.");
} else { } else {
txn_state_.store(AWAITING_COMMIT); txn_state_.store(AWAITING_COMMIT);
s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch()); s = CommitWithoutPrepareInternal();
Clear(); Clear();
if (s.ok()) { if (s.ok()) {
txn_state_.store(COMMITED); txn_state_.store(COMMITED);
@ -261,21 +270,8 @@ Status WriteCommittedTxnImpl::Commit() {
} else if (commit_prepared) { } else if (commit_prepared) {
txn_state_.store(AWAITING_COMMIT); txn_state_.store(AWAITING_COMMIT);
// We take the commit-time batch and append the Commit marker. s = CommitInternal();
// The Memtable will ignore the Commit marker in non-recovery mode
WriteBatch* working_batch = GetCommitTimeWriteBatch();
WriteBatchInternal::MarkCommit(working_batch, name_);
// any operations appended to this working_batch will be ignored from WAL
working_batch->MarkWalTerminationPoint();
// insert prepared batch into Memtable only skipping WAL.
// Memtable will ignore BeginPrepare/EndPrepare markers
// in non recovery mode and simply insert the values
WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());
s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
log_number_);
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Commit write failed"); "Commit write failed");
@ -304,7 +300,31 @@ Status WriteCommittedTxnImpl::Commit() {
return s; return s;
} }
Status WriteCommittedTxnImpl::Rollback() { Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
Status s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch());
return s;
}
Status WriteCommittedTxn::CommitInternal() {
// We take the commit-time batch and append the Commit marker.
// The Memtable will ignore the Commit marker in non-recovery mode
WriteBatch* working_batch = GetCommitTimeWriteBatch();
WriteBatchInternal::MarkCommit(working_batch, name_);
// any operations appended to this working_batch will be ignored from WAL
working_batch->MarkWalTerminationPoint();
// insert prepared batch into Memtable only skipping WAL.
// Memtable will ignore BeginPrepare/EndPrepare markers
// in non recovery mode and simply insert the values
WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());
auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
log_number_);
return s;
}
Status WriteCommittedTxn::Rollback() {
Status s; Status s;
if (txn_state_ == PREPARED) { if (txn_state_ == PREPARED) {
WriteBatch rollback_marker; WriteBatch rollback_marker;
@ -331,7 +351,7 @@ Status WriteCommittedTxnImpl::Rollback() {
return s; return s;
} }
Status PessimisticTxn::RollbackToSavePoint() { Status PessimisticTransaction::RollbackToSavePoint() {
if (txn_state_ != STARTED) { if (txn_state_ != STARTED) {
return Status::InvalidArgument("Transaction is beyond state for rollback."); return Status::InvalidArgument("Transaction is beyond state for rollback.");
} }
@ -349,7 +369,7 @@ Status PessimisticTxn::RollbackToSavePoint() {
// Lock all keys in this batch. // Lock all keys in this batch.
// On success, caller should unlock keys_to_unlock // On success, caller should unlock keys_to_unlock
Status PessimisticTxn::LockBatch(WriteBatch* batch, Status PessimisticTransaction::LockBatch(WriteBatch* batch,
TransactionKeyMap* keys_to_unlock) { TransactionKeyMap* keys_to_unlock) {
class Handler : public WriteBatch::Handler { class Handler : public WriteBatch::Handler {
public: public:
@ -372,12 +392,12 @@ Status PessimisticTxn::LockBatch(WriteBatch* batch,
} }
virtual Status PutCF(uint32_t column_family_id, const Slice& key, virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override { const Slice& /* unused */) override {
RecordKey(column_family_id, key); RecordKey(column_family_id, key);
return Status::OK(); return Status::OK();
} }
virtual Status MergeCF(uint32_t column_family_id, const Slice& key, virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override { const Slice& /* unused */) override {
RecordKey(column_family_id, key); RecordKey(column_family_id, key);
return Status::OK(); return Status::OK();
} }
@ -427,7 +447,7 @@ Status PessimisticTxn::LockBatch(WriteBatch* batch,
// If check_shapshot is true and this transaction has a snapshot set, // If check_shapshot is true and this transaction has a snapshot set,
// this key will only be locked if there have been no writes to this key since // this key will only be locked if there have been no writes to this key since
// the snapshot time. // the snapshot time.
Status PessimisticTxn::TryLock(ColumnFamilyHandle* column_family, Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only, bool exclusive, const Slice& key, bool read_only, bool exclusive,
bool untracked) { bool untracked) {
uint32_t cfh_id = GetColumnFamilyID(column_family); uint32_t cfh_id = GetColumnFamilyID(column_family);
@ -515,7 +535,7 @@ Status PessimisticTxn::TryLock(ColumnFamilyHandle* column_family,
// Return OK() if this key has not been modified more recently than the // Return OK() if this key has not been modified more recently than the
// transaction snapshot_. // transaction snapshot_.
Status PessimisticTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, Status PessimisticTransaction::ValidateSnapshot(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& key,
SequenceNumber prev_seqno, SequenceNumber prev_seqno,
SequenceNumber* new_seqno) { SequenceNumber* new_seqno) {
@ -539,19 +559,19 @@ Status PessimisticTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
false /* cache_only */); false /* cache_only */);
} }
bool PessimisticTxn::TryStealingLocks() { bool PessimisticTransaction::TryStealingLocks() {
assert(IsExpired()); assert(IsExpired());
TransactionState expected = STARTED; TransactionState expected = STARTED;
return std::atomic_compare_exchange_strong(&txn_state_, &expected, return std::atomic_compare_exchange_strong(&txn_state_, &expected,
LOCKS_STOLEN); LOCKS_STOLEN);
} }
void PessimisticTxn::UnlockGetForUpdate(ColumnFamilyHandle* column_family, void PessimisticTransaction::UnlockGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) { const Slice& key) {
txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString()); txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());
} }
Status PessimisticTxn::SetName(const TransactionName& name) { Status PessimisticTransaction::SetName(const TransactionName& name) {
Status s; Status s;
if (txn_state_ == STARTED) { if (txn_state_ == STARTED) {
if (name_.length()) { if (name_.length()) {

@ -31,24 +31,23 @@
namespace rocksdb { namespace rocksdb {
class PessimisticTransactionDB; class PessimisticTransactionDB;
class PessimisticTxn;
// A transaction under pessimistic concurrency control. This class implements // A transaction under pessimistic concurrency control. This class implements
// the locking API and interfaces with the lock manager as well as the // the locking API and interfaces with the lock manager as well as the
// pessimistic transactional db. // pessimistic transactional db.
class PessimisticTxn : public TransactionBaseImpl { class PessimisticTransaction : public TransactionBaseImpl {
public: public:
PessimisticTxn(TransactionDB* db, const WriteOptions& write_options, PessimisticTransaction(TransactionDB* db, const WriteOptions& write_options,
const TransactionOptions& txn_options); const TransactionOptions& txn_options);
virtual ~PessimisticTxn(); virtual ~PessimisticTransaction();
void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options, void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options); const TransactionOptions& txn_options);
Status Prepare() override = 0; Status Prepare() override;
Status Commit() override = 0; Status Commit() override;
virtual Status CommitBatch(WriteBatch* batch) = 0; virtual Status CommitBatch(WriteBatch* batch) = 0;
@ -111,6 +110,12 @@ class PessimisticTxn : public TransactionBaseImpl {
int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; } int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; }
protected: protected:
virtual Status PrepareInternal() = 0;
virtual Status CommitWithoutPrepareInternal() = 0;
virtual Status CommitInternal() = 0;
void Initialize(const TransactionOptions& txn_options); void Initialize(const TransactionOptions& txn_options);
Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock);
@ -170,41 +175,43 @@ class PessimisticTxn : public TransactionBaseImpl {
const Slice& key) override; const Slice& key) override;
// No copying allowed // No copying allowed
PessimisticTxn(const PessimisticTxn&); PessimisticTransaction(const PessimisticTransaction&);
void operator=(const PessimisticTxn&); void operator=(const PessimisticTransaction&);
}; };
class WriteCommittedTxnImpl : public PessimisticTxn { class WriteCommittedTxn : public PessimisticTransaction {
public: public:
WriteCommittedTxnImpl(TransactionDB* db, const WriteOptions& write_options, WriteCommittedTxn(TransactionDB* db, const WriteOptions& write_options,
const TransactionOptions& txn_options); const TransactionOptions& txn_options);
virtual ~WriteCommittedTxnImpl() {} virtual ~WriteCommittedTxn() {}
Status Prepare() override;
Status Commit() override;
Status CommitBatch(WriteBatch* batch) override; Status CommitBatch(WriteBatch* batch) override;
Status Rollback() override; Status Rollback() override;
private: private:
Status PrepareInternal() override;
Status CommitWithoutPrepareInternal() override;
Status CommitInternal() override;
Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key,
SequenceNumber prev_seqno, SequenceNumber* new_seqno); SequenceNumber prev_seqno, SequenceNumber* new_seqno);
// No copying allowed // No copying allowed
WriteCommittedTxnImpl(const WriteCommittedTxnImpl&); WriteCommittedTxn(const WriteCommittedTxn&);
void operator=(const WriteCommittedTxnImpl&); void operator=(const WriteCommittedTxn&);
}; };
// Used at commit time to check whether transaction is committing before its // Used at commit time to check whether transaction is committing before its
// expiration time. // expiration time.
class TransactionCallback : public WriteCallback { class TransactionCallback : public WriteCallback {
public: public:
explicit TransactionCallback(PessimisticTxn* txn) : txn_(txn) {} explicit TransactionCallback(PessimisticTransaction* txn) : txn_(txn) {}
Status Callback(DB* db) override { Status Callback(DB* /* unused */) override {
if (txn_->IsExpired()) { if (txn_->IsExpired()) {
return Status::Expired(); return Status::Expired();
} else { } else {
@ -215,7 +222,7 @@ class TransactionCallback : public WriteCallback {
bool AllowWriteBatching() override { return true; } bool AllowWriteBatching() override { return true; }
private: private:
PessimisticTxn* txn_; PessimisticTransaction* txn_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -17,7 +17,7 @@
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h" #include "util/cast_util.h"
#include "utilities/transactions/transaction_db_mutex_impl.h" #include "utilities/transactions/transaction_db_mutex_impl.h"
#include "utilities/transactions/transaction_impl.h" #include "utilities/transactions/pessimistic_transaction.h"
namespace rocksdb { namespace rocksdb {
@ -128,7 +128,7 @@ Transaction* WriteCommittedTxnDB::BeginTransaction(
ReinitializeTransaction(old_txn, write_options, txn_options); ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn; return old_txn;
} else { } else {
return new WriteCommittedTxnImpl(this, write_options, txn_options); return new WriteCommittedTxn(this, write_options, txn_options);
} }
} }
@ -139,7 +139,7 @@ Transaction* WritePreparedTxnDB::BeginTransaction(
ReinitializeTransaction(old_txn, write_options, txn_options); ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn; return old_txn;
} else { } else {
return new WritePreparedTxnImpl(this, write_options, txn_options); return new WritePreparedTxn(this, write_options, txn_options);
} }
} }
@ -301,18 +301,18 @@ Status PessimisticTransactionDB::DropColumnFamily(
return s; return s;
} }
Status PessimisticTransactionDB::TryLock(PessimisticTxn* txn, uint32_t cfh_id, Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
const std::string& key, const std::string& key,
bool exclusive) { bool exclusive) {
return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
} }
void PessimisticTransactionDB::UnLock(PessimisticTxn* txn, void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
const TransactionKeyMap* keys) { const TransactionKeyMap* keys) {
lock_mgr_.UnLock(txn, keys, GetEnv()); lock_mgr_.UnLock(txn, keys, GetEnv());
} }
void PessimisticTransactionDB::UnLock(PessimisticTxn* txn, uint32_t cfh_id, void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
const std::string& key) { const std::string& key) {
lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
} }
@ -409,7 +409,7 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts,
Transaction* txn = BeginInternalTransaction(opts); Transaction* txn = BeginInternalTransaction(opts);
txn->DisableIndexing(); txn->DisableIndexing();
auto txn_impl = static_cast_with_check<PessimisticTxn, Transaction>(txn); auto txn_impl = static_cast_with_check<PessimisticTransaction, Transaction>(txn);
// Since commitBatch sorts the keys before locking, concurrent Write() // Since commitBatch sorts the keys before locking, concurrent Write()
// operations will not cause a deadlock. // operations will not cause a deadlock.
@ -423,7 +423,7 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts,
} }
void PessimisticTransactionDB::InsertExpirableTransaction(TransactionID tx_id, void PessimisticTransactionDB::InsertExpirableTransaction(TransactionID tx_id,
PessimisticTxn* tx) { PessimisticTransaction* tx) {
assert(tx->GetExpirationTime() > 0); assert(tx->GetExpirationTime() > 0);
std::lock_guard<std::mutex> lock(map_mutex_); std::lock_guard<std::mutex> lock(map_mutex_);
expirable_transactions_map_.insert({tx_id, tx}); expirable_transactions_map_.insert({tx_id, tx});
@ -442,14 +442,14 @@ bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
if (tx_it == expirable_transactions_map_.end()) { if (tx_it == expirable_transactions_map_.end()) {
return true; return true;
} }
PessimisticTxn& tx = *(tx_it->second); PessimisticTransaction& tx = *(tx_it->second);
return tx.TryStealingLocks(); return tx.TryStealingLocks();
} }
void PessimisticTransactionDB::ReinitializeTransaction( void PessimisticTransactionDB::ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options, Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options) { const TransactionOptions& txn_options) {
auto txn_impl = static_cast_with_check<PessimisticTxn, Transaction>(txn); auto txn_impl = static_cast_with_check<PessimisticTransaction, Transaction>(txn);
txn_impl->Reinitialize(this, write_options, txn_options); txn_impl->Reinitialize(this, write_options, txn_options);
} }

@ -15,9 +15,9 @@
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "utilities/transactions/transaction_impl.h" #include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_lock_mgr.h" #include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/write_prepared_transaction_impl.h" #include "utilities/transactions/write_prepared_txn.h"
namespace rocksdb { namespace rocksdb {
@ -64,11 +64,11 @@ class PessimisticTransactionDB : public TransactionDB {
using StackableDB::DropColumnFamily; using StackableDB::DropColumnFamily;
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
Status TryLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key, Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key,
bool exclusive); bool exclusive);
void UnLock(PessimisticTxn* txn, const TransactionKeyMap* keys); void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys);
void UnLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key); void UnLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key);
void AddColumnFamily(const ColumnFamilyHandle* handle); void AddColumnFamily(const ColumnFamilyHandle* handle);
@ -79,7 +79,7 @@ class PessimisticTransactionDB : public TransactionDB {
return txn_db_options_; return txn_db_options_;
} }
void InsertExpirableTransaction(TransactionID tx_id, PessimisticTxn* tx); void InsertExpirableTransaction(TransactionID tx_id, PessimisticTransaction* tx);
void RemoveExpirableTransaction(TransactionID tx_id); void RemoveExpirableTransaction(TransactionID tx_id);
// If transaction is no longer available, locks can be stolen // If transaction is no longer available, locks can be stolen
@ -116,7 +116,7 @@ class PessimisticTransactionDB : public TransactionDB {
// that has started a commit. Only transactions with an expiration time // that has started a commit. Only transactions with an expiration time
// should be in this map. // should be in this map.
std::mutex map_mutex_; std::mutex map_mutex_;
std::unordered_map<TransactionID, PessimisticTxn*> std::unordered_map<TransactionID, PessimisticTransaction*>
expirable_transactions_map_; expirable_transactions_map_;
// map from name to two phase transaction instance // map from name to two phase transaction instance

@ -227,7 +227,7 @@ bool TransactionLockMgr::IsLockExpired(TransactionID txn_id,
return expired; return expired;
} }
Status TransactionLockMgr::TryLock(PessimisticTxn* txn, Status TransactionLockMgr::TryLock(PessimisticTransaction* txn,
uint32_t column_family_id, uint32_t column_family_id,
const std::string& key, Env* env, const std::string& key, Env* env,
bool exclusive) { bool exclusive) {
@ -256,7 +256,7 @@ Status TransactionLockMgr::TryLock(PessimisticTxn* txn,
// Helper function for TryLock(). // Helper function for TryLock().
Status TransactionLockMgr::AcquireWithTimeout( Status TransactionLockMgr::AcquireWithTimeout(
PessimisticTxn* txn, LockMap* lock_map, LockMapStripe* stripe, PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
uint32_t column_family_id, const std::string& key, Env* env, uint32_t column_family_id, const std::string& key, Env* env,
int64_t timeout, const LockInfo& lock_info) { int64_t timeout, const LockInfo& lock_info) {
Status result; Status result;
@ -357,13 +357,13 @@ Status TransactionLockMgr::AcquireWithTimeout(
} }
void TransactionLockMgr::DecrementWaiters( void TransactionLockMgr::DecrementWaiters(
const PessimisticTxn* txn, const autovector<TransactionID>& wait_ids) { const PessimisticTransaction* txn, const autovector<TransactionID>& wait_ids) {
std::lock_guard<std::mutex> lock(wait_txn_map_mutex_); std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
DecrementWaitersImpl(txn, wait_ids); DecrementWaitersImpl(txn, wait_ids);
} }
void TransactionLockMgr::DecrementWaitersImpl( void TransactionLockMgr::DecrementWaitersImpl(
const PessimisticTxn* txn, const autovector<TransactionID>& wait_ids) { const PessimisticTransaction* txn, const autovector<TransactionID>& wait_ids) {
auto id = txn->GetID(); auto id = txn->GetID();
assert(wait_txn_map_.Contains(id)); assert(wait_txn_map_.Contains(id));
wait_txn_map_.Delete(id); wait_txn_map_.Delete(id);
@ -377,7 +377,7 @@ void TransactionLockMgr::DecrementWaitersImpl(
} }
bool TransactionLockMgr::IncrementWaiters( bool TransactionLockMgr::IncrementWaiters(
const PessimisticTxn* txn, const autovector<TransactionID>& wait_ids) { const PessimisticTransaction* txn, const autovector<TransactionID>& wait_ids) {
auto id = txn->GetID(); auto id = txn->GetID();
std::vector<TransactionID> queue(txn->GetDeadlockDetectDepth()); std::vector<TransactionID> queue(txn->GetDeadlockDetectDepth());
std::lock_guard<std::mutex> lock(wait_txn_map_mutex_); std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
@ -501,7 +501,7 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
return result; return result;
} }
void TransactionLockMgr::UnLockKey(const PessimisticTxn* txn, void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn,
const std::string& key, const std::string& key,
LockMapStripe* stripe, LockMap* lock_map, LockMapStripe* stripe, LockMap* lock_map,
Env* env) { Env* env) {
@ -537,7 +537,7 @@ void TransactionLockMgr::UnLockKey(const PessimisticTxn* txn,
} }
} }
void TransactionLockMgr::UnLock(PessimisticTxn* txn, uint32_t column_family_id, void TransactionLockMgr::UnLock(PessimisticTransaction* txn, uint32_t column_family_id,
const std::string& key, Env* env) { const std::string& key, Env* env) {
std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
LockMap* lock_map = lock_map_ptr.get(); LockMap* lock_map = lock_map_ptr.get();
@ -559,7 +559,7 @@ void TransactionLockMgr::UnLock(PessimisticTxn* txn, uint32_t column_family_id,
stripe->stripe_cv->NotifyAll(); stripe->stripe_cv->NotifyAll();
} }
void TransactionLockMgr::UnLock(const PessimisticTxn* txn, void TransactionLockMgr::UnLock(const PessimisticTransaction* txn,
const TransactionKeyMap* key_map, Env* env) { const TransactionKeyMap* key_map, Env* env) {
for (auto& key_map_iter : *key_map) { for (auto& key_map_iter : *key_map) {
uint32_t column_family_id = key_map_iter.first; uint32_t column_family_id = key_map_iter.first;

@ -17,7 +17,7 @@
#include "util/autovector.h" #include "util/autovector.h"
#include "util/hash_map.h" #include "util/hash_map.h"
#include "util/thread_local.h" #include "util/thread_local.h"
#include "utilities/transactions/transaction_impl.h" #include "utilities/transactions/pessimistic_transaction.h"
namespace rocksdb { namespace rocksdb {
@ -47,14 +47,14 @@ class TransactionLockMgr {
// Attempt to lock key. If OK status is returned, the caller is responsible // Attempt to lock key. If OK status is returned, the caller is responsible
// for calling UnLock() on this key. // for calling UnLock() on this key.
Status TryLock(PessimisticTxn* txn, uint32_t column_family_id, Status TryLock(PessimisticTransaction* txn, uint32_t column_family_id,
const std::string& key, Env* env, bool exclusive); const std::string& key, Env* env, bool exclusive);
// Unlock a key locked by TryLock(). txn must be the same Transaction that // Unlock a key locked by TryLock(). txn must be the same Transaction that
// locked this key. // locked this key.
void UnLock(const PessimisticTxn* txn, const TransactionKeyMap* keys, void UnLock(const PessimisticTransaction* txn, const TransactionKeyMap* keys,
Env* env); Env* env);
void UnLock(PessimisticTxn* txn, uint32_t column_family_id, void UnLock(PessimisticTransaction* txn, uint32_t column_family_id,
const std::string& key, Env* env); const std::string& key, Env* env);
using LockStatusData = std::unordered_multimap<uint32_t, KeyLockInfo>; using LockStatusData = std::unordered_multimap<uint32_t, KeyLockInfo>;
@ -102,7 +102,7 @@ class TransactionLockMgr {
std::shared_ptr<LockMap> GetLockMap(uint32_t column_family_id); std::shared_ptr<LockMap> GetLockMap(uint32_t column_family_id);
Status AcquireWithTimeout(PessimisticTxn* txn, LockMap* lock_map, Status AcquireWithTimeout(PessimisticTransaction* txn, LockMap* lock_map,
LockMapStripe* stripe, uint32_t column_family_id, LockMapStripe* stripe, uint32_t column_family_id,
const std::string& key, Env* env, int64_t timeout, const std::string& key, Env* env, int64_t timeout,
const LockInfo& lock_info); const LockInfo& lock_info);
@ -112,14 +112,14 @@ class TransactionLockMgr {
const LockInfo& lock_info, uint64_t* wait_time, const LockInfo& lock_info, uint64_t* wait_time,
autovector<TransactionID>* txn_ids); autovector<TransactionID>* txn_ids);
void UnLockKey(const PessimisticTxn* txn, const std::string& key, void UnLockKey(const PessimisticTransaction* txn, const std::string& key,
LockMapStripe* stripe, LockMap* lock_map, Env* env); LockMapStripe* stripe, LockMap* lock_map, Env* env);
bool IncrementWaiters(const PessimisticTxn* txn, bool IncrementWaiters(const PessimisticTransaction* txn,
const autovector<TransactionID>& wait_ids); const autovector<TransactionID>& wait_ids);
void DecrementWaiters(const PessimisticTxn* txn, void DecrementWaiters(const PessimisticTransaction* txn,
const autovector<TransactionID>& wait_ids); const autovector<TransactionID>& wait_ids);
void DecrementWaitersImpl(const PessimisticTxn* txn, void DecrementWaitersImpl(const PessimisticTransaction* txn,
const autovector<TransactionID>& wait_ids); const autovector<TransactionID>& wait_ids);
// No copying allowed // No copying allowed

@ -5,56 +5,54 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "utilities/transactions/write_prepared_transaction_impl.h" #include "utilities/transactions/write_prepared_txn.h"
#include <map> #include <map>
#include <set>
#include <string>
#include <vector>
#include "db/column_family.h" #include "db/column_family.h"
#include "db/db_impl.h" #include "db/db_impl.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_impl.h" #include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_util.h"
namespace rocksdb { namespace rocksdb {
struct WriteOptions; struct WriteOptions;
WritePreparedTxnImpl::WritePreparedTxnImpl( WritePreparedTxn::WritePreparedTxn(
TransactionDB* txn_db, const WriteOptions& write_options, TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options) const TransactionOptions& txn_options)
: PessimisticTxn(txn_db, write_options, txn_options) { : PessimisticTransaction(txn_db, write_options, txn_options) {
PessimisticTxn::Initialize(txn_options); PessimisticTransaction::Initialize(txn_options);
} }
Status WritePreparedTxnImpl::CommitBatch(WriteBatch* batch) { Status WritePreparedTxn::CommitBatch(WriteBatch* /* unused */) {
// TODO(myabandeh) Implement this // TODO(myabandeh) Implement this
throw std::runtime_error("CommitBatch not Implemented"); throw std::runtime_error("CommitBatch not Implemented");
return Status::OK(); return Status::OK();
} }
Status WritePreparedTxnImpl::Prepare() { Status WritePreparedTxn::PrepareInternal() {
// TODO(myabandeh) Implement this // TODO(myabandeh) Implement this
throw std::runtime_error("Prepare not Implemented"); throw std::runtime_error("Prepare not Implemented");
return Status::OK(); return Status::OK();
} }
Status WritePreparedTxnImpl::Commit() { Status WritePreparedTxn::CommitWithoutPrepareInternal() {
// TODO(myabandeh) Implement this // TODO(myabandeh) Implement this
throw std::runtime_error("Commit not Implemented"); throw std::runtime_error("Commit not Implemented");
return Status::OK(); return Status::OK();
} }
Status WritePreparedTxnImpl::Rollback() { Status WritePreparedTxn::CommitInternal() {
// TODO(myabandeh) Implement this
throw std::runtime_error("Commit not Implemented");
return Status::OK();
}
Status WritePreparedTxn::Rollback() {
// TODO(myabandeh) Implement this // TODO(myabandeh) Implement this
throw std::runtime_error("Rollback not Implemented"); throw std::runtime_error("Rollback not Implemented");
return Status::OK(); return Status::OK();

@ -26,7 +26,7 @@
#include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/write_batch_with_index.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "utilities/transactions/transaction_base.h" #include "utilities/transactions/transaction_base.h"
#include "utilities/transactions/transaction_impl.h" #include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_util.h" #include "utilities/transactions/transaction_util.h"
namespace rocksdb { namespace rocksdb {
@ -35,24 +35,26 @@ class TransactionDBImpl;
// This impl could write to DB also uncomitted data and then later tell apart // This impl could write to DB also uncomitted data and then later tell apart
// committed data from uncomitted data. Uncommitted data could be after the // committed data from uncomitted data. Uncommitted data could be after the
// Prepare phase in 2PC (WritePreparedTxnImpl) or before that // Prepare phase in 2PC (WritePreparedTxn) or before that
// (WriteUnpreparedTxnImpl). // (WriteUnpreparedTxnImpl).
class WritePreparedTxnImpl : public PessimisticTxn { class WritePreparedTxn : public PessimisticTransaction {
public: public:
WritePreparedTxnImpl(TransactionDB* db, const WriteOptions& write_options, WritePreparedTxn(TransactionDB* db, const WriteOptions& write_options,
const TransactionOptions& txn_options); const TransactionOptions& txn_options);
virtual ~WritePreparedTxnImpl() {} virtual ~WritePreparedTxn() {}
Status Prepare() override;
Status Commit() override;
Status CommitBatch(WriteBatch* batch) override; Status CommitBatch(WriteBatch* batch) override;
Status Rollback() override; Status Rollback() override;
private: private:
Status PrepareInternal() override;
Status CommitWithoutPrepareInternal() override;
Status CommitInternal() override;
// TODO(myabandeh): verify that the current impl work with values being // TODO(myabandeh): verify that the current impl work with values being
// written with prepare sequence number too. // written with prepare sequence number too.
// Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& // Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice&
@ -61,8 +63,8 @@ class WritePreparedTxnImpl : public PessimisticTxn {
// new_seqno); // new_seqno);
// No copying allowed // No copying allowed
WritePreparedTxnImpl(const WritePreparedTxnImpl&); WritePreparedTxn(const WritePreparedTxn&);
void operator=(const WritePreparedTxnImpl&); void operator=(const WritePreparedTxn&);
}; };
} // namespace rocksdb } // namespace rocksdb
Loading…
Cancel
Save