[rocksdb] Two Phase Transaction

Summary:
Two Phase Commit addition to RocksDB.

See wiki: https://github.com/facebook/rocksdb/wiki/Two-Phase-Commit-Implementation
Quip: https://fb.quip.com/pxZrAyrx53r3

Depends on:
WriteBatch modification: https://reviews.facebook.net/D54093
Memtable Log Referencing and Prepared Batch Recovery: https://reviews.facebook.net/D56919

Test Plan:
- SimpleTwoPhaseTransactionTest
- PersistentTwoPhaseTransactionTest.
- TwoPhaseRollbackTest
- TwoPhaseMultiThreadTest
- TwoPhaseLogRollingTest
- TwoPhaseEmptyWriteTest
- TwoPhaseExpirationTest

Reviewers: IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: leveldb, hermanlee4, andrewkr, vasilep, dhruba, santoshb

Differential Revision: https://reviews.facebook.net/D56925
main
Reid Horuff 9 years ago
parent 1b8a2e8fdd
commit 8a66c85e90
  1. 6
      db/db_impl.h
  2. 38
      include/rocksdb/utilities/transaction.h
  3. 3
      include/rocksdb/utilities/transaction_db.h
  4. 14
      utilities/transactions/optimistic_transaction_impl.cc
  5. 6
      utilities/transactions/optimistic_transaction_impl.h
  6. 72
      utilities/transactions/transaction_base.cc
  7. 10
      utilities/transactions/transaction_base.h
  8. 80
      utilities/transactions/transaction_db_impl.cc
  9. 15
      utilities/transactions/transaction_db_impl.h
  10. 251
      utilities/transactions/transaction_impl.cc
  11. 12
      utilities/transactions/transaction_impl.h
  12. 651
      utilities/transactions/transaction_test.cc

@ -443,6 +443,11 @@ class DBImpl : public DB {
bool allow_2pc() const { return db_options_.allow_2pc; } bool allow_2pc() const { return db_options_.allow_2pc; }
std::unordered_map<std::string, RecoveredTransaction*>
recovered_transactions() {
return recovered_transactions_;
}
RecoveredTransaction* GetRecoveredTransaction(const std::string& name) { RecoveredTransaction* GetRecoveredTransaction(const std::string& name) {
auto it = recovered_transactions_.find(name); auto it = recovered_transactions_.find(name);
if (it == recovered_transactions_.end()) { if (it == recovered_transactions_.end()) {
@ -521,6 +526,7 @@ class DBImpl : public DB {
private: private:
friend class DB; friend class DB;
friend class InternalStats; friend class InternalStats;
friend class TransactionImpl;
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
friend class ForwardIterator; friend class ForwardIterator;
#endif #endif

@ -20,6 +20,8 @@ class Iterator;
class TransactionDB; class TransactionDB;
class WriteBatchWithIndex; class WriteBatchWithIndex;
typedef std::string TransactionName;
// Provides notification to the caller of SetSnapshotOnNextOperation when // Provides notification to the caller of SetSnapshotOnNextOperation when
// the actual snapshot gets created // the actual snapshot gets created
class TransactionNotifier { class TransactionNotifier {
@ -114,6 +116,9 @@ class Transaction {
// longer be valid and should be discarded after a call to ClearSnapshot(). // longer be valid and should be discarded after a call to ClearSnapshot().
virtual void ClearSnapshot() = 0; virtual void ClearSnapshot() = 0;
// Prepare the current transation for 2PC
virtual Status Prepare() = 0;
// Write all batched keys to the db atomically. // Write all batched keys to the db atomically.
// //
// Returns OK on success. // Returns OK on success.
@ -132,7 +137,7 @@ class Transaction {
virtual Status Commit() = 0; virtual Status Commit() = 0;
// Discard all batched writes in this transaction. // Discard all batched writes in this transaction.
virtual void Rollback() = 0; virtual Status Rollback() = 0;
// Records the state of the transaction for future calls to // Records the state of the transaction for future calls to
// RollbackToSavePoint(). May be called multiple times to set multiple save // RollbackToSavePoint(). May be called multiple times to set multiple save
@ -378,10 +383,41 @@ class Transaction {
const Slice& key) = 0; const Slice& key) = 0;
virtual void UndoGetForUpdate(const Slice& key) = 0; virtual void UndoGetForUpdate(const Slice& key) = 0;
virtual Status RebuildFromWriteBatch(WriteBatch* src_batch) = 0;
virtual WriteBatch* GetCommitTimeWriteBatch() = 0;
virtual void SetLogNumber(uint64_t log) { log_number_ = log; }
virtual uint64_t GetLogNumber() { return log_number_; }
virtual Status SetName(const TransactionName& name) = 0;
virtual TransactionName GetName() { return name_; }
enum ExecutionStatus {
STARTED = 0,
AWAITING_PREPARE = 1,
PREPARED = 2,
AWAITING_COMMIT = 3,
COMMITED = 4,
AWAITING_ROLLBACK = 5,
ROLLEDBACK = 6,
LOCKS_STOLEN = 7,
};
// Execution status of the transaction.
std::atomic<ExecutionStatus> exec_status_;
protected: protected:
explicit Transaction(const TransactionDB* db) {} explicit Transaction(const TransactionDB* db) {}
Transaction() {} Transaction() {}
// the log in which the prepared section for this txn resides
// (for two phase commit)
uint64_t log_number_;
TransactionName name_;
private: private:
// No copying allowed // No copying allowed
Transaction(const Transaction&); Transaction(const Transaction&);

@ -124,6 +124,9 @@ class TransactionDB : public StackableDB {
const TransactionOptions& txn_options = TransactionOptions(), const TransactionOptions& txn_options = TransactionOptions(),
Transaction* old_txn = nullptr) = 0; Transaction* old_txn = nullptr) = 0;
virtual Transaction* GetTransactionByName(const TransactionName& name) = 0;
virtual void GetAllPreparedTransactions(std::vector<Transaction*>* trans) = 0;
protected: protected:
// To Create an TransactionDB, call Open() // To Create an TransactionDB, call Open()
explicit TransactionDB(DB* db) : StackableDB(db) {} explicit TransactionDB(DB* db) : StackableDB(db) {}

@ -52,6 +52,11 @@ void OptimisticTransactionImpl::Clear() {
TransactionBaseImpl::Clear(); TransactionBaseImpl::Clear();
} }
Status OptimisticTransactionImpl::Prepare() {
return Status::InvalidArgument(
"Two phase commit not supported for optimistic transactions.");
}
Status OptimisticTransactionImpl::Commit() { Status OptimisticTransactionImpl::Commit() {
// Set up callback which will call CheckTransactionForConflicts() to // Set up callback which will call CheckTransactionForConflicts() to
// check whether this transaction is safe to be committed. // check whether this transaction is safe to be committed.
@ -75,7 +80,10 @@ Status OptimisticTransactionImpl::Commit() {
return s; return s;
} }
void OptimisticTransactionImpl::Rollback() { Clear(); } Status OptimisticTransactionImpl::Rollback() {
Clear();
return Status::OK();
}
// Record this key so that we can check it for conflicts at commit time. // Record this key so that we can check it for conflicts at commit time.
Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
@ -123,6 +131,10 @@ Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) {
true /* cache_only */); true /* cache_only */);
} }
Status OptimisticTransactionImpl::SetName(const TransactionName& name) {
return Status::InvalidArgument("Optimistic transactions cannot be named.");
}
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -38,9 +38,13 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
const WriteOptions& write_options, const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options); const OptimisticTransactionOptions& txn_options);
Status Prepare() override;
Status Commit() override; Status Commit() override;
void Rollback() override; Status Rollback() override;
Status SetName(const TransactionName& name) override;
protected: protected:
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,

@ -19,11 +19,18 @@ namespace rocksdb {
TransactionBaseImpl::TransactionBaseImpl(DB* db, TransactionBaseImpl::TransactionBaseImpl(DB* db,
const WriteOptions& write_options) const WriteOptions& write_options)
: db_(db), : db_(db),
dbimpl_(reinterpret_cast<DBImpl*>(db)),
write_options_(write_options), write_options_(write_options),
cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
start_time_(db_->GetEnv()->NowMicros()), start_time_(db_->GetEnv()->NowMicros()),
write_batch_(cmp_, 0, true), write_batch_(cmp_, 0, true),
indexing_enabled_(true) {} indexing_enabled_(true) {
assert(dynamic_cast<DBImpl*>(db_) != nullptr);
log_number_ = 0;
if (dbimpl_->allow_2pc()) {
WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
}
}
TransactionBaseImpl::~TransactionBaseImpl() { TransactionBaseImpl::~TransactionBaseImpl() {
// Release snapshot if snapshot is set // Release snapshot if snapshot is set
@ -33,10 +40,15 @@ TransactionBaseImpl::~TransactionBaseImpl() {
void TransactionBaseImpl::Clear() { void TransactionBaseImpl::Clear() {
save_points_.reset(nullptr); save_points_.reset(nullptr);
write_batch_.Clear(); write_batch_.Clear();
commit_time_batch_.Clear();
tracked_keys_.clear(); tracked_keys_.clear();
num_puts_ = 0; num_puts_ = 0;
num_deletes_ = 0; num_deletes_ = 0;
num_merges_ = 0; num_merges_ = 0;
if (dbimpl_->allow_2pc()) {
WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
}
} }
void TransactionBaseImpl::Reinitialize(DB* db, void TransactionBaseImpl::Reinitialize(DB* db,
@ -44,6 +56,8 @@ void TransactionBaseImpl::Reinitialize(DB* db,
Clear(); Clear();
ClearSnapshot(); ClearSnapshot();
db_ = db; db_ = db;
name_.clear();
log_number_ = 0;
write_options_ = write_options; write_options_ = write_options;
start_time_ = db_->GetEnv()->NowMicros(); start_time_ = db_->GetEnv()->NowMicros();
indexing_enabled_ = true; indexing_enabled_ = true;
@ -51,11 +65,7 @@ void TransactionBaseImpl::Reinitialize(DB* db,
} }
void TransactionBaseImpl::SetSnapshot() { void TransactionBaseImpl::SetSnapshot() {
assert(dynamic_cast<DBImpl*>(db_) != nullptr); const Snapshot* snapshot = dbimpl_->GetSnapshotForWriteConflictBoundary();
auto db_impl = reinterpret_cast<DBImpl*>(db_);
const Snapshot* snapshot = db_impl->GetSnapshotForWriteConflictBoundary();
SetSnapshotInternal(snapshot); SetSnapshotInternal(snapshot);
} }
@ -571,6 +581,56 @@ void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family,
} }
} }
Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) {
struct IndexedWriteBatchBuilder : public WriteBatch::Handler {
Transaction* txn_;
DBImpl* db_;
IndexedWriteBatchBuilder(Transaction* txn, DBImpl* db)
: txn_(txn), db_(db) {
assert(dynamic_cast<TransactionBaseImpl*>(txn_) != nullptr);
}
Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
return txn_->Put(db_->GetColumnFamilyHandle(cf), key, val);
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
return txn_->Delete(db_->GetColumnFamilyHandle(cf), key);
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
return txn_->SingleDelete(db_->GetColumnFamilyHandle(cf), key);
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
return txn_->Merge(db_->GetColumnFamilyHandle(cf), key, val);
}
// this is used for reconstructing prepared transactions upon
// recovery. there should not be any meta markers in the batches
// we are processing.
Status MarkBeginPrepare() override { return Status::InvalidArgument(); }
Status MarkEndPrepare(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkCommit(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkRollback(const Slice&) override {
return Status::InvalidArgument();
}
};
IndexedWriteBatchBuilder copycat(this, dbimpl_);
return src_batch->Iterate(&copycat);
}
WriteBatch* TransactionBaseImpl::GetCommitTimeWriteBatch() {
return &commit_time_batch_;
}
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -213,6 +213,12 @@ class TransactionBaseImpl : public Transaction {
// Used for memory management for snapshot_ // Used for memory management for snapshot_
void ReleaseSnapshot(const Snapshot* snapshot, DB* db); void ReleaseSnapshot(const Snapshot* snapshot, DB* db);
// iterates over the given batch and makes the appropriate inserts.
// used for rebuilding prepared transactions after recovery.
Status RebuildFromWriteBatch(WriteBatch* src_batch) override;
WriteBatch* GetCommitTimeWriteBatch() override;
protected: protected:
// Add a key to the list of tracked keys. // Add a key to the list of tracked keys.
// //
@ -236,6 +242,7 @@ class TransactionBaseImpl : public Transaction {
void SetSnapshotIfNeeded(); void SetSnapshotIfNeeded();
DB* db_; DB* db_;
DBImpl* dbimpl_;
WriteOptions write_options_; WriteOptions write_options_;
@ -279,6 +286,9 @@ class TransactionBaseImpl : public Transaction {
// Records writes pending in this transaction // Records writes pending in this transaction
WriteBatchWithIndex write_batch_; WriteBatchWithIndex write_batch_;
// batch to be written at commit time
WriteBatch commit_time_batch_;
// Stack of the Snapshot saved at each save point. Saved snapshots may be // Stack of the Snapshot saved at each save point. Saved snapshots may be
// nullptr if there was no snapshot at the time SetSavePoint() was called. // nullptr if there was no snapshot at the time SetSavePoint() was called.
std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint>> save_points_; std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint>> save_points_;

@ -23,12 +23,15 @@ namespace rocksdb {
TransactionDBImpl::TransactionDBImpl(DB* db, TransactionDBImpl::TransactionDBImpl(DB* db,
const TransactionDBOptions& txn_db_options) const TransactionDBOptions& txn_db_options)
: TransactionDB(db), : TransactionDB(db),
db_impl_(dynamic_cast<DBImpl*>(db)),
txn_db_options_(txn_db_options), txn_db_options_(txn_db_options),
lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks, lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
txn_db_options_.custom_mutex_factory txn_db_options_.custom_mutex_factory
? txn_db_options_.custom_mutex_factory ? txn_db_options_.custom_mutex_factory
: std::shared_ptr<TransactionDBMutexFactory>( : std::shared_ptr<TransactionDBMutexFactory>(
new TransactionDBMutexFactoryImpl())) {} new TransactionDBMutexFactoryImpl())) {
assert(db_impl_ != nullptr);
}
Transaction* TransactionDBImpl::BeginTransaction( Transaction* TransactionDBImpl::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options, const WriteOptions& write_options, const TransactionOptions& txn_options,
@ -100,7 +103,9 @@ Status TransactionDB::Open(
} }
} }
s = DB::Open(db_options, dbname, column_families_copy, handles, &db); DBOptions db_options_2pc = db_options;
db_options_2pc.allow_2pc = true;
s = DB::Open(db_options_2pc, dbname, column_families_copy, handles, &db);
if (s.ok()) { if (s.ok()) {
TransactionDBImpl* txn_db = new TransactionDBImpl( TransactionDBImpl* txn_db = new TransactionDBImpl(
@ -121,6 +126,37 @@ Status TransactionDB::Open(
} }
s = txn_db->EnableAutoCompaction(compaction_enabled_cf_handles); s = txn_db->EnableAutoCompaction(compaction_enabled_cf_handles);
// create 'real' transactions from recovered shell transactions
assert(dynamic_cast<DBImpl*>(db) != nullptr);
auto dbimpl = reinterpret_cast<DBImpl*>(db);
auto rtrxs = dbimpl->recovered_transactions();
for (auto it = rtrxs.begin(); it != rtrxs.end(); it++) {
auto recovered_trx = it->second;
assert(recovered_trx);
assert(recovered_trx->log_number_);
assert(recovered_trx->name_.length());
WriteOptions w_options;
TransactionOptions t_options;
Transaction* real_trx =
txn_db->BeginTransaction(w_options, t_options, nullptr);
assert(real_trx);
real_trx->SetLogNumber(recovered_trx->log_number_);
s = real_trx->SetName(recovered_trx->name_);
if (!s.ok()) {
break;
}
s = real_trx->RebuildFromWriteBatch(recovered_trx->batch_);
real_trx->exec_status_ = Transaction::PREPARED;
if (!s.ok()) {
break;
}
}
} }
return s; return s;
@ -315,5 +351,45 @@ void TransactionDBImpl::ReinitializeTransaction(
txn_impl->Reinitialize(this, write_options, txn_options); txn_impl->Reinitialize(this, write_options, txn_options);
} }
Transaction* TransactionDBImpl::GetTransactionByName(
const TransactionName& name) {
std::lock_guard<std::mutex> lock(name_map_mutex_);
auto it = transactions_.find(name);
if (it == transactions_.end()) {
return nullptr;
} else {
return it->second;
}
}
void TransactionDBImpl::GetAllPreparedTransactions(
std::vector<Transaction*>* transv) {
assert(transv);
transv->clear();
std::lock_guard<std::mutex> lock(name_map_mutex_);
for (auto it = transactions_.begin(); it != transactions_.end(); it++) {
if (it->second->exec_status_ == Transaction::PREPARED) {
transv->push_back(it->second);
}
}
}
void TransactionDBImpl::RegisterTransaction(Transaction* txn) {
assert(txn);
assert(txn->GetName().length() > 0);
assert(GetTransactionByName(txn->GetName()) == nullptr);
assert(txn->exec_status_ == Transaction::STARTED);
std::lock_guard<std::mutex> lock(name_map_mutex_);
transactions_[txn->GetName()] = txn;
}
void TransactionDBImpl::UnregisterTransaction(Transaction* txn) {
assert(txn);
std::lock_guard<std::mutex> lock(name_map_mutex_);
auto it = transactions_.find(txn->GetName());
assert(it != transactions_.end());
transactions_.erase(it);
}
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -7,8 +7,10 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include <mutex> #include <mutex>
#include <queue>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <vector>
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
@ -78,11 +80,20 @@ class TransactionDBImpl : public TransactionDB {
// is expirable (GetExpirationTime() > 0) and that it is expired. // is expirable (GetExpirationTime() > 0) and that it is expired.
bool TryStealingExpiredTransactionLocks(TransactionID tx_id); bool TryStealingExpiredTransactionLocks(TransactionID tx_id);
Transaction* GetTransactionByName(const TransactionName& name) override;
void RegisterTransaction(Transaction* txn);
void UnregisterTransaction(Transaction* txn);
// not thread safe. current use case is during recovery (single thread)
void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;
private: private:
void ReinitializeTransaction( void ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options, Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options = TransactionOptions()); const TransactionOptions& txn_options = TransactionOptions());
DBImpl* db_impl_;
const TransactionDBOptions txn_db_options_; const TransactionDBOptions txn_db_options_;
TransactionLockMgr lock_mgr_; TransactionLockMgr lock_mgr_;
@ -97,6 +108,10 @@ class TransactionDBImpl : public TransactionDB {
std::mutex map_mutex_; std::mutex map_mutex_;
std::unordered_map<TransactionID, TransactionImpl*> std::unordered_map<TransactionID, TransactionImpl*>
expirable_transactions_map_; expirable_transactions_map_;
// map from name to two phase transaction instance
std::mutex name_map_mutex_;
std::unordered_map<TransactionName, Transaction*> transactions_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -41,11 +41,13 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
txn_db_impl_(nullptr), txn_db_impl_(nullptr),
txn_id_(0), txn_id_(0),
expiration_time_(0), expiration_time_(0),
lock_timeout_(0), lock_timeout_(0) {
exec_status_(STARTED) {
txn_db_impl_ = dynamic_cast<TransactionDBImpl*>(txn_db); txn_db_impl_ = dynamic_cast<TransactionDBImpl*>(txn_db);
assert(txn_db_impl_); assert(txn_db_impl_);
db_impl_ = dynamic_cast<DBImpl*>(txn_db->GetBaseDB());
assert(db_impl_);
Initialize(txn_options); Initialize(txn_options);
} }
@ -81,6 +83,15 @@ TransactionImpl::~TransactionImpl() {
if (expiration_time_ > 0) { if (expiration_time_ > 0) {
txn_db_impl_->RemoveExpirableTransaction(txn_id_); txn_db_impl_->RemoveExpirableTransaction(txn_id_);
} }
if (!name_.empty() && exec_status_ != COMMITED) {
txn_db_impl_->UnregisterTransaction(this);
}
// if we have a prep section that was never committed
// and we are releasing the transaction then we
// can release that prep section
if (log_number_ != 0 && exec_status_ != COMMITED) {
dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_);
}
} }
void TransactionImpl::Clear() { void TransactionImpl::Clear() {
@ -91,6 +102,15 @@ void TransactionImpl::Clear() {
void TransactionImpl::Reinitialize(TransactionDB* txn_db, void TransactionImpl::Reinitialize(TransactionDB* txn_db,
const WriteOptions& write_options, const WriteOptions& write_options,
const TransactionOptions& txn_options) { const TransactionOptions& txn_options) {
if (!name_.empty() && exec_status_ != COMMITED) {
txn_db_impl_->UnregisterTransaction(this);
}
// if we have a prep section that was never committed
// and we are releasing the transaction then we
// can release that prep section
if (log_number_ != 0 && exec_status_ != COMMITED) {
dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_);
}
TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options); TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options);
Initialize(txn_options); Initialize(txn_options);
} }
@ -108,61 +128,216 @@ bool TransactionImpl::IsExpired() const {
Status TransactionImpl::CommitBatch(WriteBatch* batch) { Status TransactionImpl::CommitBatch(WriteBatch* batch) {
TransactionKeyMap keys_to_unlock; TransactionKeyMap keys_to_unlock;
Status s = LockBatch(batch, &keys_to_unlock); Status s = LockBatch(batch, &keys_to_unlock);
if (s.ok()) { if (!s.ok()) {
s = DoCommit(batch); return s;
}
txn_db_impl_->UnLock(this, &keys_to_unlock); bool can_commit = false;
if (IsExpired()) {
s = Status::Expired();
} else if (expiration_time_ > 0) {
ExecutionStatus expected = STARTED;
can_commit = std::atomic_compare_exchange_strong(&exec_status_, &expected,
AWAITING_COMMIT);
} else if (exec_status_ == STARTED) {
// lock stealing is not a concern
can_commit = true;
}
if (can_commit) {
exec_status_.store(AWAITING_COMMIT);
s = db_->Write(write_options_, batch);
if (s.ok()) {
exec_status_.store(COMMITED);
}
} else if (exec_status_ == LOCKS_STOLEN) {
s = Status::Expired();
} else {
s = Status::InvalidArgument("Transaction is not in state for commit.");
} }
txn_db_impl_->UnLock(this, &keys_to_unlock);
return s; return s;
} }
Status TransactionImpl::Commit() { Status TransactionImpl::Prepare() {
Status s = DoCommit(GetWriteBatch()->GetWriteBatch()); Status s;
if (name_.empty()) {
return Status::InvalidArgument(
"Cannot prepare a transaction that has not been named.");
}
Clear(); if (IsExpired()) {
return Status::Expired();
}
bool can_prepare = false;
if (expiration_time_ > 0) {
// must concern ourselves with expiraton and/or lock stealing
// need to compare/exchange bc locks could be stolen under us here
ExecutionStatus expected = STARTED;
can_prepare = std::atomic_compare_exchange_strong(&exec_status_, &expected,
AWAITING_PREPARE);
} else if (exec_status_ == STARTED) {
// expiration and lock stealing is not possible
can_prepare = true;
}
if (can_prepare) {
exec_status_.store(AWAITING_PREPARE);
// transaction can't expire after preparation
expiration_time_ = 0;
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0,
/* disable_memtable*/ true);
if (s.ok()) {
assert(log_number_ != 0);
dbimpl_->MarkLogAsContainingPrepSection(log_number_);
exec_status_.store(PREPARED);
}
} else if (exec_status_ == LOCKS_STOLEN) {
s = Status::Expired();
} else if (exec_status_ == PREPARED) {
s = Status::InvalidArgument("Transaction has already been prepared.");
} else if (exec_status_ == COMMITED) {
s = Status::InvalidArgument("Transaction has already been committed.");
} else if (exec_status_ == ROLLEDBACK) {
s = Status::InvalidArgument("Transaction has already been rolledback.");
} else {
s = Status::InvalidArgument("Transaction is not in state for commit.");
}
return s; return s;
} }
Status TransactionImpl::DoCommit(WriteBatch* batch) { Status TransactionImpl::Commit() {
Status s; Status s;
bool commit_single = false;
bool commit_prepared = false;
if (expiration_time_ > 0) { if (IsExpired()) {
if (IsExpired()) { return Status::Expired();
return Status::Expired(); }
}
// Transaction should only be committed if the thread succeeds if (expiration_time_ > 0) {
// changing its execution status to COMMITTING. This is because // we must atomicaly compare and exchange the state here because at
// A different transaction may consider this one expired and attempt // this state in the transaction it is possible for another thread
// to steal its locks between the IsExpired() check and the beginning // to change our state out from under us in the even that we expire and have
// of a commit. // our locks stolen. In this case the only valid state is STARTED because
// a state of PREPARED would have a cleared expiration_time_.
ExecutionStatus expected = STARTED; ExecutionStatus expected = STARTED;
bool can_commit = std::atomic_compare_exchange_strong( commit_single = std::atomic_compare_exchange_strong(
&exec_status_, &expected, COMMITTING); &exec_status_, &expected, AWAITING_COMMIT);
TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1"); TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
} else if (exec_status_ == PREPARED) {
// expiration and lock stealing is not a concern
commit_prepared = true;
} else if (exec_status_ == STARTED) {
// expiration and lock stealing is not a concern
commit_single = true;
}
if (can_commit) { if (commit_single) {
s = db_->Write(write_options_, batch); assert(!commit_prepared);
if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
s = Status::InvalidArgument(
"Commit-time batch contains values that will not be committed.");
} else { } else {
assert(exec_status_ == LOCKS_STOLEN); exec_status_.store(AWAITING_COMMIT);
return Status::Expired(); s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch());
Clear();
if (s.ok()) {
exec_status_.store(COMMITED);
}
} }
} else if (commit_prepared) {
exec_status_.store(AWAITING_COMMIT);
WriteOptions write_options = write_options_;
// insert prepared batch into Memtable only.
// Memtable will ignore BeginPrepare/EndPrepare markers
// in non recovery mode and simply insert the values
write_options.disableWAL = true;
assert(log_number_ > 0);
s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
nullptr, nullptr, log_number_);
if (!s.ok()) {
return s;
}
// We take the commit-time batch and append the Commit marker.
// We then write this batch to both WAL and Memtable.
// The Memtable will ignore the Commit marker in non-recovery mode
write_options.disableWAL = false;
WriteBatchInternal::MarkCommit(GetCommitTimeWriteBatch(), name_);
s = db_impl_->WriteImpl(write_options, GetCommitTimeWriteBatch());
if (!s.ok()) {
return s;
}
// FindObsoleteFiles must now look to the memtables
// to determine what prep logs must be kept around,
// not the prep section heap.
assert(log_number_ > 0);
dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_);
txn_db_impl_->UnregisterTransaction(this);
Clear();
exec_status_.store(COMMITED);
} else if (exec_status_ == LOCKS_STOLEN) {
s = Status::Expired();
} else if (exec_status_ == COMMITED) {
s = Status::InvalidArgument("Transaction has already been committed.");
} else if (exec_status_ == ROLLEDBACK) {
s = Status::InvalidArgument("Transaction has already been rolledback.");
} else { } else {
s = db_->Write(write_options_, batch); s = Status::InvalidArgument("Transaction is not in state for commit.");
} }
return s; return s;
} }
void TransactionImpl::Rollback() { Clear(); } Status TransactionImpl::Rollback() {
Status s;
if (exec_status_ == PREPARED) {
WriteBatch rollback_marker;
WriteBatchInternal::MarkRollback(&rollback_marker, name_);
exec_status_.store(AWAITING_ROLLBACK);
s = db_impl_->WriteImpl(write_options_, &rollback_marker);
if (s.ok()) {
// we do not need to keep our prepared section around
assert(log_number_ > 0);
dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_);
Clear();
exec_status_.store(ROLLEDBACK);
}
} else if (exec_status_ == STARTED) {
// prepare couldn't have taken place
Clear();
} else if (exec_status_ == COMMITED) {
s = Status::InvalidArgument("This transaction has already been committed.");
} else {
s = Status::InvalidArgument(
"Two phase transaction is not in state for rollback.");
}
return s;
}
Status TransactionImpl::RollbackToSavePoint() { Status TransactionImpl::RollbackToSavePoint() {
if (exec_status_ != STARTED) {
return Status::InvalidArgument("Transaction is beyond state for rollback.");
}
// Unlock any keys locked since last transaction // Unlock any keys locked since last transaction
const std::unique_ptr<TransactionKeyMap>& keys = const std::unique_ptr<TransactionKeyMap>& keys =
GetTrackedKeysSinceSavePoint(); GetTrackedKeysSinceSavePoint();
@ -370,6 +545,26 @@ void TransactionImpl::UnlockGetForUpdate(ColumnFamilyHandle* column_family,
txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString()); txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());
} }
Status TransactionImpl::SetName(const TransactionName& name) {
Status s;
if (exec_status_ == STARTED) {
if (name_.length()) {
s = Status::InvalidArgument("Transaction has already been named.");
} else if (txn_db_impl_->GetTransactionByName(name) != nullptr) {
s = Status::InvalidArgument("Transaction name must be unique.");
} else if (name.length() < 1 || name.length() > 512) {
s = Status::InvalidArgument(
"Transaction name length must be between 1 and 512 chars.");
} else {
name_ = name;
txn_db_impl_->RegisterTransaction(this);
}
} else {
s = Status::InvalidArgument("Transaction is beyond state for naming.");
}
return s;
}
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -41,14 +41,18 @@ class TransactionImpl : public TransactionBaseImpl {
void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options, void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options); const TransactionOptions& txn_options);
Status Prepare() override;
Status Commit() override; Status Commit() override;
Status CommitBatch(WriteBatch* batch); Status CommitBatch(WriteBatch* batch);
void Rollback() override; Status Rollback() override;
Status RollbackToSavePoint() override; Status RollbackToSavePoint() override;
Status SetName(const TransactionName& name) override;
// Generate a new unique transaction identifier // Generate a new unique transaction identifier
static TransactionID GenTxnID(); static TransactionID GenTxnID();
@ -77,9 +81,8 @@ class TransactionImpl : public TransactionBaseImpl {
bool read_only, bool untracked = false) override; bool read_only, bool untracked = false) override;
private: private:
enum ExecutionStatus { STARTED, COMMITTING, LOCKS_STOLEN };
TransactionDBImpl* txn_db_impl_; TransactionDBImpl* txn_db_impl_;
DBImpl* db_impl_;
// Used to create unique ids for transactions. // Used to create unique ids for transactions.
static std::atomic<TransactionID> txn_id_counter_; static std::atomic<TransactionID> txn_id_counter_;
@ -94,9 +97,6 @@ class TransactionImpl : public TransactionBaseImpl {
// Timeout in microseconds when locking a key or -1 if there is no timeout. // Timeout in microseconds when locking a key or -1 if there is no timeout.
int64_t lock_timeout_; int64_t lock_timeout_;
// Execution status of the transaction.
std::atomic<ExecutionStatus> exec_status_;
void Clear() override; void Clear() override;
void Initialize(const TransactionOptions& txn_options); void Initialize(const TransactionOptions& txn_options);

@ -55,6 +55,13 @@ class TransactionTest : public testing::Test {
DestroyDB(dbname, options); DestroyDB(dbname, options);
} }
Status ReOpenNoDelete() {
delete db;
db = nullptr;
Status s = TransactionDB::Open(options, txn_db_options, dbname, &db);
return s;
}
Status ReOpen() { Status ReOpen() {
delete db; delete db;
DestroyDB(dbname, options); DestroyDB(dbname, options);
@ -113,6 +120,635 @@ TEST_F(TransactionTest, SuccessTest) {
delete txn; delete txn;
} }
TEST_F(TransactionTest, CommitTimeBatchFailTest) {
WriteOptions write_options;
TransactionOptions txn_options;
string value;
Status s;
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn1);
txn1->GetCommitTimeWriteBatch()->Put("cat", "dog");
s = txn1->Put("foo", "bar");
ASSERT_OK(s);
// fails due to non-empty commit-time batch
s = txn1->Commit();
ASSERT_EQ(s, Status::InvalidArgument());
delete txn1;
}
TEST_F(TransactionTest, SimpleTwoPhaseTransactionTest) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
Status s;
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
Transaction* txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid");
ASSERT_OK(s);
ASSERT_EQ(db->GetTransactionByName("xid"), txn);
// transaction put
s = txn->Put(Slice("foo"), Slice("bar"));
ASSERT_OK(s);
ASSERT_EQ(1, txn->GetNumPuts());
// regular db put
s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
ASSERT_OK(s);
ASSERT_EQ(1, txn->GetNumPuts());
// regular db read
db->Get(read_options, "foo2", &value);
ASSERT_EQ(value, "bar2");
// commit time put
txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs"));
txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats"));
// nothing has been prepped yet
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
s = txn->Prepare();
ASSERT_OK(s);
// data not im mem yet
s = db->Get(read_options, Slice("foo"), &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(read_options, Slice("gtid"), &value);
ASSERT_TRUE(s.IsNotFound());
// find trans in list of prepared transactions
std::vector<Transaction*> prepared_trans;
db->GetAllPreparedTransactions(&prepared_trans);
ASSERT_EQ(prepared_trans.size(), 1);
ASSERT_EQ(prepared_trans.front()->GetName(), "xid");
auto log_containing_prep =
db_impl->TEST_FindMinLogContainingOutstandingPrep();
ASSERT_GT(log_containing_prep, 0);
// make commit
s = txn->Commit();
ASSERT_OK(s);
// value is now available
s = db->Get(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
s = db->Get(read_options, "gtid", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "dogs");
s = db->Get(read_options, "gtid2", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "cats");
// we already committed
s = txn->Commit();
ASSERT_EQ(s, Status::InvalidArgument());
// no longer is prpared results
db->GetAllPreparedTransactions(&prepared_trans);
ASSERT_EQ(prepared_trans.size(), 0);
ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
// heap should not care about prepared section anymore
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
// but now our memtable should be referencing the prep section
ASSERT_EQ(log_containing_prep,
db_impl->TEST_FindMinPrepLogReferencedByMemTable());
db_impl->TEST_FlushMemTable(true);
// after memtable flush we can now relese the log
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
delete txn;
}
TEST_F(TransactionTest, TwoPhaseNameTest) {
Status s;
WriteOptions write_options;
TransactionOptions txn_options;
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn3);
delete txn3;
// cant prepare txn without name
s = txn1->Prepare();
ASSERT_EQ(s, Status::InvalidArgument());
// name too short
s = txn1->SetName("");
ASSERT_EQ(s, Status::InvalidArgument());
// name too long
s = txn1->SetName(std::string(513, 'x'));
ASSERT_EQ(s, Status::InvalidArgument());
// valid set name
s = txn1->SetName("name1");
ASSERT_OK(s);
// cant have duplicate name
s = txn2->SetName("name1");
ASSERT_EQ(s, Status::InvalidArgument());
// shouldn't be able to prepare
s = txn2->Prepare();
ASSERT_EQ(s, Status::InvalidArgument());
// valid name set
s = txn2->SetName("name2");
ASSERT_OK(s);
// cant reset name
s = txn2->SetName("name3");
ASSERT_EQ(s, Status::InvalidArgument());
ASSERT_EQ(txn1->GetName(), "name1");
ASSERT_EQ(txn2->GetName(), "name2");
s = txn1->Prepare();
ASSERT_OK(s);
// can't rename after prepare
s = txn1->SetName("name4");
ASSERT_EQ(s, Status::InvalidArgument());
delete txn1;
delete txn2;
}
TEST_F(TransactionTest, TwoPhaseEmptyWriteTest) {
Status s;
std::string value;
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn1);
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn2);
s = txn1->SetName("joe");
ASSERT_OK(s);
s = txn2->SetName("bob");
ASSERT_OK(s);
s = txn1->Prepare();
ASSERT_OK(s);
s = txn1->Commit();
ASSERT_OK(s);
delete txn1;
txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar"));
s = txn2->Prepare();
ASSERT_OK(s);
s = txn2->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
delete txn2;
}
TEST_F(TransactionTest, TwoPhaseExpirationTest) {
Status s;
WriteOptions write_options;
TransactionOptions txn_options;
txn_options.expiration = 500; // 500ms
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn1);
ASSERT_TRUE(txn1);
s = txn1->SetName("joe");
ASSERT_OK(s);
s = txn2->SetName("bob");
ASSERT_OK(s);
s = txn1->Prepare();
ASSERT_OK(s);
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
s = txn1->Commit();
ASSERT_OK(s);
s = txn2->Prepare();
ASSERT_EQ(s, Status::Expired());
delete txn1;
delete txn2;
}
TEST_F(TransactionTest, TwoPhaseRollbackTest) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
Status s;
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
Transaction* txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid");
ASSERT_OK(s);
// transaction put
s = txn->Put(Slice("tfoo"), Slice("tbar"));
ASSERT_OK(s);
// value is readable form txn
s = txn->Get(read_options, Slice("tfoo"), &value);
ASSERT_OK(s);
ASSERT_EQ(value, "tbar");
// issue rollback
s = txn->Rollback();
ASSERT_OK(s);
// value is nolonger readable
s = txn->Get(read_options, Slice("tfoo"), &value);
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ(txn->GetNumPuts(), 0);
// put new txn values
s = txn->Put(Slice("tfoo2"), Slice("tbar2"));
ASSERT_OK(s);
// new value is readable from txn
s = txn->Get(read_options, Slice("tfoo2"), &value);
ASSERT_OK(s);
ASSERT_EQ(value, "tbar2");
s = txn->Prepare();
ASSERT_OK(s);
// flush to next wal
s = db->Put(write_options, Slice("foo"), Slice("bar"));
ASSERT_OK(s);
db_impl->TEST_FlushMemTable(true);
// issue rollback (marker written to WAL)
s = txn->Rollback();
ASSERT_OK(s);
// value is nolonger readable
s = txn->Get(read_options, Slice("tfoo2"), &value);
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ(txn->GetNumPuts(), 0);
// make commit
s = txn->Commit();
ASSERT_EQ(s, Status::InvalidArgument());
// try rollback again
s = txn->Rollback();
ASSERT_EQ(s, Status::InvalidArgument());
}
TEST_F(TransactionTest, PersistentTwoPhaseTransactionTest) {
WriteOptions write_options;
write_options.sync = true;
write_options.disableWAL = false;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
Status s;
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
Transaction* txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid");
ASSERT_OK(s);
ASSERT_EQ(db->GetTransactionByName("xid"), txn);
// transaction put
s = txn->Put(Slice("foo"), Slice("bar"));
ASSERT_OK(s);
ASSERT_EQ(1, txn->GetNumPuts());
// txn read
s = txn->Get(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
// regular db put
s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
ASSERT_OK(s);
ASSERT_EQ(1, txn->GetNumPuts());
db_impl->TEST_FlushMemTable(true);
// regular db read
db->Get(read_options, "foo2", &value);
ASSERT_EQ(value, "bar2");
// nothing has been prepped yet
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
// prepare
s = txn->Prepare();
ASSERT_OK(s);
// still not available to db
s = db->Get(read_options, Slice("foo"), &value);
ASSERT_TRUE(s.IsNotFound());
// kill and reopen
s = ReOpenNoDelete();
ASSERT_OK(s);
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
// find trans in list of prepared transactions
std::vector<Transaction*> prepared_trans;
db->GetAllPreparedTransactions(&prepared_trans);
ASSERT_EQ(prepared_trans.size(), 1);
txn = prepared_trans.front();
ASSERT_TRUE(txn);
ASSERT_EQ(txn->GetName(), "xid");
ASSERT_EQ(db->GetTransactionByName("xid"), txn);
// log has been marked
auto log_containing_prep =
db_impl->TEST_FindMinLogContainingOutstandingPrep();
ASSERT_GT(log_containing_prep, 0);
// value is readable from txn
s = txn->Get(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
// make commit
s = txn->Commit();
ASSERT_OK(s);
// value is now available
db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "bar");
// we already committed
s = txn->Commit();
ASSERT_EQ(s, Status::InvalidArgument());
// no longer is prpared results
prepared_trans.clear();
db->GetAllPreparedTransactions(&prepared_trans);
ASSERT_EQ(prepared_trans.size(), 0);
// transaction should no longer be visible
ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
// heap should not care about prepared section anymore
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
// but now our memtable should be referencing the prep section
ASSERT_EQ(log_containing_prep,
db_impl->TEST_FindMinPrepLogReferencedByMemTable());
db_impl->TEST_FlushMemTable(true);
// after memtable flush we can now relese the log
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
delete txn;
// deleting transaction should unregister transaction
ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
}
TEST_F(TransactionTest, TwoPhaseMultiThreadTest) {
// mix transaction writes and regular writes
const int NUM_TXN_THREADS = 50;
std::atomic<uint32_t> txn_thread_num(0);
std::function<void()> txn_write_thread = [&]() {
uint32_t id = txn_thread_num.fetch_add(1);
WriteOptions write_options;
write_options.sync = true;
write_options.disableWAL = false;
TransactionOptions txn_options;
txn_options.lock_timeout = 1000000;
if (id % 2 == 0) {
txn_options.expiration = 1000000;
}
TransactionName name("xid_" + std::string(1, 'A' + id));
Transaction* txn = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn->SetName(name));
for (int i = 0; i < 10; i++) {
std::string key(name + "_" + std::string(1, 'A' + i));
ASSERT_OK(txn->Put(key, "val"));
}
ASSERT_OK(txn->Prepare());
ASSERT_OK(txn->Commit());
delete txn;
};
// assure that all thread are in the same write group
std::atomic<uint32_t> t_wait_on_prepare(0);
std::atomic<uint32_t> t_wait_on_commit(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
if (writer->ShouldWriteToWAL()) {
t_wait_on_prepare.fetch_add(1);
// wait for friends
while (t_wait_on_prepare.load() < NUM_TXN_THREADS) {
}
} else if (writer->ShouldWriteToMemtable()) {
t_wait_on_commit.fetch_add(1);
// wait for friends
while (t_wait_on_commit.load() < NUM_TXN_THREADS) {
}
} else {
ASSERT_TRUE(false);
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// do all the writes
std::vector<std::thread> threads;
for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
threads.emplace_back(txn_write_thread);
}
for (auto& t : threads) {
t.join();
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
ReadOptions read_options;
std::string value;
Status s;
for (int t = 0; t < NUM_TXN_THREADS; t++) {
TransactionName name("xid_" + std::string(1, 'A' + t));
for (int i = 0; i < 10; i++) {
std::string key(name + "_" + std::string(1, 'A' + i));
s = db->Get(read_options, key, &value);
ASSERT_OK(s);
ASSERT_EQ(value, "val");
}
}
}
TEST_F(TransactionTest, TwoPhaseLogRollingTest) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
Status s;
string v;
ColumnFamilyHandle *cfa, *cfb;
// Create 2 new column families
ColumnFamilyOptions cf_options;
s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
ASSERT_OK(s);
s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
ASSERT_OK(s);
WriteOptions wopts;
wopts.disableWAL = false;
wopts.sync = true;
TransactionOptions topts1;
Transaction* txn1 = db->BeginTransaction(wopts, topts1);
s = txn1->SetName("xid1");
ASSERT_OK(s);
TransactionOptions topts2;
Transaction* txn2 = db->BeginTransaction(wopts, topts2);
s = txn2->SetName("xid2");
ASSERT_OK(s);
// transaction put in two column families
s = txn1->Put(cfa, "ka1", "va1");
ASSERT_OK(s);
// transaction put in two column families
s = txn2->Put(cfa, "ka2", "va2");
ASSERT_OK(s);
s = txn2->Put(cfb, "kb2", "vb2");
ASSERT_OK(s);
// write prep section to wal
s = txn1->Prepare();
ASSERT_OK(s);
// our log should be in the heap
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
txn1->GetLogNumber());
ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
// flush default cf to crate new log
s = db->Put(wopts, "foo", "bar");
ASSERT_OK(s);
s = db_impl->TEST_FlushMemTable(true);
ASSERT_OK(s);
// make sure we are on a new log
ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
// put txn2 prep section in this log
s = txn2->Prepare();
ASSERT_OK(s);
ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber());
// heap should still see first log
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
txn1->GetLogNumber());
// commit txn1
s = txn1->Commit();
ASSERT_OK(s);
// heap should now show txn2s log
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
txn2->GetLogNumber());
// we should see txn1s log refernced by the memtables
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
txn1->GetLogNumber());
// flush default cf to crate new log
s = db->Put(wopts, "foo", "bar2");
ASSERT_OK(s);
s = db_impl->TEST_FlushMemTable(true);
ASSERT_OK(s);
// make sure we are on a new log
ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber());
// commit txn2
s = txn2->Commit();
ASSERT_OK(s);
// heap should not show any logs
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
// should show the first txn log
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
txn1->GetLogNumber());
// flush only cfa memtable
s = db_impl->TEST_FlushMemTable(true, cfa);
ASSERT_OK(s);
// should show the first txn log
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
txn2->GetLogNumber());
// flush only cfb memtable
s = db_impl->TEST_FlushMemTable(true, cfb);
ASSERT_OK(s);
// should show not dependency on logs
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
delete txn1;
delete txn2;
delete cfa;
delete cfb;
}
TEST_F(TransactionTest, FirstWriteTest) { TEST_F(TransactionTest, FirstWriteTest) {
WriteOptions write_options; WriteOptions write_options;
@ -1300,6 +1936,21 @@ TEST_F(TransactionTest, ReinitializeTest) {
s = db->Get(read_options, "Y", &value); s = db->Get(read_options, "Y", &value);
ASSERT_TRUE(s.IsNotFound()); ASSERT_TRUE(s.IsNotFound());
txn1 = db->BeginTransaction(write_options, txn_options, txn1);
s = txn1->SetName("name");
ASSERT_OK(s);
s = txn1->Prepare();
ASSERT_OK(s);
s = txn1->Commit();
ASSERT_OK(s);
txn1 = db->BeginTransaction(write_options, txn_options, txn1);
s = txn1->SetName("name");
ASSERT_OK(s);
delete txn1; delete txn1;
} }

Loading…
Cancel
Save