From 8a66c85e904d76fb3420561fdf7c65bed6079bf5 Mon Sep 17 00:00:00 2001 From: Reid Horuff Date: Mon, 18 Apr 2016 11:15:50 -0700 Subject: [PATCH] [rocksdb] Two Phase Transaction Summary: Two Phase Commit addition to RocksDB. See wiki: https://github.com/facebook/rocksdb/wiki/Two-Phase-Commit-Implementation Quip: https://fb.quip.com/pxZrAyrx53r3 Depends on: WriteBatch modification: https://reviews.facebook.net/D54093 Memtable Log Referencing and Prepared Batch Recovery: https://reviews.facebook.net/D56919 Test Plan: - SimpleTwoPhaseTransactionTest - PersistentTwoPhaseTransactionTest. - TwoPhaseRollbackTest - TwoPhaseMultiThreadTest - TwoPhaseLogRollingTest - TwoPhaseEmptyWriteTest - TwoPhaseExpirationTest Reviewers: IslamAbdelRahman, sdong Reviewed By: sdong Subscribers: leveldb, hermanlee4, andrewkr, vasilep, dhruba, santoshb Differential Revision: https://reviews.facebook.net/D56925 --- db/db_impl.h | 6 + include/rocksdb/utilities/transaction.h | 38 +- include/rocksdb/utilities/transaction_db.h | 3 + .../optimistic_transaction_impl.cc | 14 +- .../optimistic_transaction_impl.h | 6 +- utilities/transactions/transaction_base.cc | 72 +- utilities/transactions/transaction_base.h | 10 + utilities/transactions/transaction_db_impl.cc | 80 ++- utilities/transactions/transaction_db_impl.h | 15 + utilities/transactions/transaction_impl.cc | 251 ++++++- utilities/transactions/transaction_impl.h | 12 +- utilities/transactions/transaction_test.cc | 651 ++++++++++++++++++ 12 files changed, 1113 insertions(+), 45 deletions(-) diff --git a/db/db_impl.h b/db/db_impl.h index 57f7fd8a8..99fbf6963 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -443,6 +443,11 @@ class DBImpl : public DB { bool allow_2pc() const { return db_options_.allow_2pc; } + std::unordered_map + recovered_transactions() { + return recovered_transactions_; + } + RecoveredTransaction* GetRecoveredTransaction(const std::string& name) { auto it = recovered_transactions_.find(name); if (it == recovered_transactions_.end()) { @@ -521,6 +526,7 @@ class DBImpl : public DB { private: friend class DB; friend class InternalStats; + friend class TransactionImpl; #ifndef ROCKSDB_LITE friend class ForwardIterator; #endif diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 80f6a898a..864896f9b 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -20,6 +20,8 @@ class Iterator; class TransactionDB; class WriteBatchWithIndex; +typedef std::string TransactionName; + // Provides notification to the caller of SetSnapshotOnNextOperation when // the actual snapshot gets created class TransactionNotifier { @@ -114,6 +116,9 @@ class Transaction { // longer be valid and should be discarded after a call to ClearSnapshot(). virtual void ClearSnapshot() = 0; + // Prepare the current transation for 2PC + virtual Status Prepare() = 0; + // Write all batched keys to the db atomically. // // Returns OK on success. @@ -132,7 +137,7 @@ class Transaction { virtual Status Commit() = 0; // Discard all batched writes in this transaction. - virtual void Rollback() = 0; + virtual Status Rollback() = 0; // Records the state of the transaction for future calls to // RollbackToSavePoint(). May be called multiple times to set multiple save @@ -378,10 +383,41 @@ class Transaction { const Slice& key) = 0; virtual void UndoGetForUpdate(const Slice& key) = 0; + virtual Status RebuildFromWriteBatch(WriteBatch* src_batch) = 0; + + virtual WriteBatch* GetCommitTimeWriteBatch() = 0; + + virtual void SetLogNumber(uint64_t log) { log_number_ = log; } + + virtual uint64_t GetLogNumber() { return log_number_; } + + virtual Status SetName(const TransactionName& name) = 0; + + virtual TransactionName GetName() { return name_; } + + enum ExecutionStatus { + STARTED = 0, + AWAITING_PREPARE = 1, + PREPARED = 2, + AWAITING_COMMIT = 3, + COMMITED = 4, + AWAITING_ROLLBACK = 5, + ROLLEDBACK = 6, + LOCKS_STOLEN = 7, + }; + + // Execution status of the transaction. + std::atomic exec_status_; + protected: explicit Transaction(const TransactionDB* db) {} Transaction() {} + // the log in which the prepared section for this txn resides + // (for two phase commit) + uint64_t log_number_; + TransactionName name_; + private: // No copying allowed Transaction(const Transaction&); diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 39f0f8a3f..4b6475424 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -124,6 +124,9 @@ class TransactionDB : public StackableDB { const TransactionOptions& txn_options = TransactionOptions(), Transaction* old_txn = nullptr) = 0; + virtual Transaction* GetTransactionByName(const TransactionName& name) = 0; + virtual void GetAllPreparedTransactions(std::vector* trans) = 0; + protected: // To Create an TransactionDB, call Open() explicit TransactionDB(DB* db) : StackableDB(db) {} diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction_impl.cc index 2647b3dd7..1a9b6436b 100644 --- a/utilities/transactions/optimistic_transaction_impl.cc +++ b/utilities/transactions/optimistic_transaction_impl.cc @@ -52,6 +52,11 @@ void OptimisticTransactionImpl::Clear() { TransactionBaseImpl::Clear(); } +Status OptimisticTransactionImpl::Prepare() { + return Status::InvalidArgument( + "Two phase commit not supported for optimistic transactions."); +} + Status OptimisticTransactionImpl::Commit() { // Set up callback which will call CheckTransactionForConflicts() to // check whether this transaction is safe to be committed. @@ -75,7 +80,10 @@ Status OptimisticTransactionImpl::Commit() { return s; } -void OptimisticTransactionImpl::Rollback() { Clear(); } +Status OptimisticTransactionImpl::Rollback() { + Clear(); + return Status::OK(); +} // Record this key so that we can check it for conflicts at commit time. Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, @@ -123,6 +131,10 @@ Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) { true /* cache_only */); } +Status OptimisticTransactionImpl::SetName(const TransactionName& name) { + return Status::InvalidArgument("Optimistic transactions cannot be named."); +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/optimistic_transaction_impl.h b/utilities/transactions/optimistic_transaction_impl.h index 4876a100d..b09fe6706 100644 --- a/utilities/transactions/optimistic_transaction_impl.h +++ b/utilities/transactions/optimistic_transaction_impl.h @@ -38,9 +38,13 @@ class OptimisticTransactionImpl : public TransactionBaseImpl { const WriteOptions& write_options, const OptimisticTransactionOptions& txn_options); + Status Prepare() override; + Status Commit() override; - void Rollback() override; + Status Rollback() override; + + Status SetName(const TransactionName& name) override; protected: Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 01bab827a..a65e6331c 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -19,11 +19,18 @@ namespace rocksdb { TransactionBaseImpl::TransactionBaseImpl(DB* db, const WriteOptions& write_options) : db_(db), + dbimpl_(reinterpret_cast(db)), write_options_(write_options), cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), start_time_(db_->GetEnv()->NowMicros()), write_batch_(cmp_, 0, true), - indexing_enabled_(true) {} + indexing_enabled_(true) { + assert(dynamic_cast(db_) != nullptr); + log_number_ = 0; + if (dbimpl_->allow_2pc()) { + WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch()); + } +} TransactionBaseImpl::~TransactionBaseImpl() { // Release snapshot if snapshot is set @@ -33,10 +40,15 @@ TransactionBaseImpl::~TransactionBaseImpl() { void TransactionBaseImpl::Clear() { save_points_.reset(nullptr); write_batch_.Clear(); + commit_time_batch_.Clear(); tracked_keys_.clear(); num_puts_ = 0; num_deletes_ = 0; num_merges_ = 0; + + if (dbimpl_->allow_2pc()) { + WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch()); + } } void TransactionBaseImpl::Reinitialize(DB* db, @@ -44,6 +56,8 @@ void TransactionBaseImpl::Reinitialize(DB* db, Clear(); ClearSnapshot(); db_ = db; + name_.clear(); + log_number_ = 0; write_options_ = write_options; start_time_ = db_->GetEnv()->NowMicros(); indexing_enabled_ = true; @@ -51,11 +65,7 @@ void TransactionBaseImpl::Reinitialize(DB* db, } void TransactionBaseImpl::SetSnapshot() { - assert(dynamic_cast(db_) != nullptr); - auto db_impl = reinterpret_cast(db_); - - const Snapshot* snapshot = db_impl->GetSnapshotForWriteConflictBoundary(); - + const Snapshot* snapshot = dbimpl_->GetSnapshotForWriteConflictBoundary(); SetSnapshotInternal(snapshot); } @@ -571,6 +581,56 @@ void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family, } } +Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) { + struct IndexedWriteBatchBuilder : public WriteBatch::Handler { + Transaction* txn_; + DBImpl* db_; + IndexedWriteBatchBuilder(Transaction* txn, DBImpl* db) + : txn_(txn), db_(db) { + assert(dynamic_cast(txn_) != nullptr); + } + + Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override { + return txn_->Put(db_->GetColumnFamilyHandle(cf), key, val); + } + + Status DeleteCF(uint32_t cf, const Slice& key) override { + return txn_->Delete(db_->GetColumnFamilyHandle(cf), key); + } + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override { + return txn_->SingleDelete(db_->GetColumnFamilyHandle(cf), key); + } + + Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override { + return txn_->Merge(db_->GetColumnFamilyHandle(cf), key, val); + } + + // this is used for reconstructing prepared transactions upon + // recovery. there should not be any meta markers in the batches + // we are processing. + Status MarkBeginPrepare() override { return Status::InvalidArgument(); } + + Status MarkEndPrepare(const Slice&) override { + return Status::InvalidArgument(); + } + + Status MarkCommit(const Slice&) override { + return Status::InvalidArgument(); + } + + Status MarkRollback(const Slice&) override { + return Status::InvalidArgument(); + } + }; + + IndexedWriteBatchBuilder copycat(this, dbimpl_); + return src_batch->Iterate(©cat); +} + +WriteBatch* TransactionBaseImpl::GetCommitTimeWriteBatch() { + return &commit_time_batch_; +} } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 256a53aad..a605d434d 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -213,6 +213,12 @@ class TransactionBaseImpl : public Transaction { // Used for memory management for snapshot_ void ReleaseSnapshot(const Snapshot* snapshot, DB* db); + // iterates over the given batch and makes the appropriate inserts. + // used for rebuilding prepared transactions after recovery. + Status RebuildFromWriteBatch(WriteBatch* src_batch) override; + + WriteBatch* GetCommitTimeWriteBatch() override; + protected: // Add a key to the list of tracked keys. // @@ -236,6 +242,7 @@ class TransactionBaseImpl : public Transaction { void SetSnapshotIfNeeded(); DB* db_; + DBImpl* dbimpl_; WriteOptions write_options_; @@ -279,6 +286,9 @@ class TransactionBaseImpl : public Transaction { // Records writes pending in this transaction WriteBatchWithIndex write_batch_; + // batch to be written at commit time + WriteBatch commit_time_batch_; + // Stack of the Snapshot saved at each save point. Saved snapshots may be // nullptr if there was no snapshot at the time SetSavePoint() was called. std::unique_ptr> save_points_; diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index ef03f3454..f5be8526b 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -23,12 +23,15 @@ namespace rocksdb { TransactionDBImpl::TransactionDBImpl(DB* db, const TransactionDBOptions& txn_db_options) : TransactionDB(db), + db_impl_(dynamic_cast(db)), txn_db_options_(txn_db_options), lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks, txn_db_options_.custom_mutex_factory ? txn_db_options_.custom_mutex_factory : std::shared_ptr( - new TransactionDBMutexFactoryImpl())) {} + new TransactionDBMutexFactoryImpl())) { + assert(db_impl_ != nullptr); +} Transaction* TransactionDBImpl::BeginTransaction( const WriteOptions& write_options, const TransactionOptions& txn_options, @@ -100,7 +103,9 @@ Status TransactionDB::Open( } } - s = DB::Open(db_options, dbname, column_families_copy, handles, &db); + DBOptions db_options_2pc = db_options; + db_options_2pc.allow_2pc = true; + s = DB::Open(db_options_2pc, dbname, column_families_copy, handles, &db); if (s.ok()) { TransactionDBImpl* txn_db = new TransactionDBImpl( @@ -121,6 +126,37 @@ Status TransactionDB::Open( } s = txn_db->EnableAutoCompaction(compaction_enabled_cf_handles); + + // create 'real' transactions from recovered shell transactions + assert(dynamic_cast(db) != nullptr); + auto dbimpl = reinterpret_cast(db); + auto rtrxs = dbimpl->recovered_transactions(); + + for (auto it = rtrxs.begin(); it != rtrxs.end(); it++) { + auto recovered_trx = it->second; + assert(recovered_trx); + assert(recovered_trx->log_number_); + assert(recovered_trx->name_.length()); + + WriteOptions w_options; + TransactionOptions t_options; + + Transaction* real_trx = + txn_db->BeginTransaction(w_options, t_options, nullptr); + assert(real_trx); + real_trx->SetLogNumber(recovered_trx->log_number_); + + s = real_trx->SetName(recovered_trx->name_); + if (!s.ok()) { + break; + } + + s = real_trx->RebuildFromWriteBatch(recovered_trx->batch_); + real_trx->exec_status_ = Transaction::PREPARED; + if (!s.ok()) { + break; + } + } } return s; @@ -315,5 +351,45 @@ void TransactionDBImpl::ReinitializeTransaction( txn_impl->Reinitialize(this, write_options, txn_options); } +Transaction* TransactionDBImpl::GetTransactionByName( + const TransactionName& name) { + std::lock_guard lock(name_map_mutex_); + auto it = transactions_.find(name); + if (it == transactions_.end()) { + return nullptr; + } else { + return it->second; + } +} + +void TransactionDBImpl::GetAllPreparedTransactions( + std::vector* transv) { + assert(transv); + transv->clear(); + std::lock_guard lock(name_map_mutex_); + for (auto it = transactions_.begin(); it != transactions_.end(); it++) { + if (it->second->exec_status_ == Transaction::PREPARED) { + transv->push_back(it->second); + } + } +} + +void TransactionDBImpl::RegisterTransaction(Transaction* txn) { + assert(txn); + assert(txn->GetName().length() > 0); + assert(GetTransactionByName(txn->GetName()) == nullptr); + assert(txn->exec_status_ == Transaction::STARTED); + std::lock_guard lock(name_map_mutex_); + transactions_[txn->GetName()] = txn; +} + +void TransactionDBImpl::UnregisterTransaction(Transaction* txn) { + assert(txn); + std::lock_guard lock(name_map_mutex_); + auto it = transactions_.find(txn->GetName()); + assert(it != transactions_.end()); + transactions_.erase(it); +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_db_impl.h b/utilities/transactions/transaction_db_impl.h index 7b7d646a1..61ea86673 100644 --- a/utilities/transactions/transaction_db_impl.h +++ b/utilities/transactions/transaction_db_impl.h @@ -7,8 +7,10 @@ #ifndef ROCKSDB_LITE #include +#include #include #include +#include #include "rocksdb/db.h" #include "rocksdb/options.h" @@ -78,11 +80,20 @@ class TransactionDBImpl : public TransactionDB { // is expirable (GetExpirationTime() > 0) and that it is expired. bool TryStealingExpiredTransactionLocks(TransactionID tx_id); + Transaction* GetTransactionByName(const TransactionName& name) override; + + void RegisterTransaction(Transaction* txn); + void UnregisterTransaction(Transaction* txn); + + // not thread safe. current use case is during recovery (single thread) + void GetAllPreparedTransactions(std::vector* trans) override; + private: void ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options = TransactionOptions()); + DBImpl* db_impl_; const TransactionDBOptions txn_db_options_; TransactionLockMgr lock_mgr_; @@ -97,6 +108,10 @@ class TransactionDBImpl : public TransactionDB { std::mutex map_mutex_; std::unordered_map expirable_transactions_map_; + + // map from name to two phase transaction instance + std::mutex name_map_mutex_; + std::unordered_map transactions_; }; } // namespace rocksdb diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index 8f80433a8..82bf8ccac 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -41,11 +41,13 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db, txn_db_impl_(nullptr), txn_id_(0), expiration_time_(0), - lock_timeout_(0), - exec_status_(STARTED) { + lock_timeout_(0) { txn_db_impl_ = dynamic_cast(txn_db); assert(txn_db_impl_); + db_impl_ = dynamic_cast(txn_db->GetBaseDB()); + assert(db_impl_); + Initialize(txn_options); } @@ -81,6 +83,15 @@ TransactionImpl::~TransactionImpl() { if (expiration_time_ > 0) { txn_db_impl_->RemoveExpirableTransaction(txn_id_); } + if (!name_.empty() && exec_status_ != COMMITED) { + txn_db_impl_->UnregisterTransaction(this); + } + // if we have a prep section that was never committed + // and we are releasing the transaction then we + // can release that prep section + if (log_number_ != 0 && exec_status_ != COMMITED) { + dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); + } } void TransactionImpl::Clear() { @@ -91,6 +102,15 @@ void TransactionImpl::Clear() { void TransactionImpl::Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) { + if (!name_.empty() && exec_status_ != COMMITED) { + txn_db_impl_->UnregisterTransaction(this); + } + // if we have a prep section that was never committed + // and we are releasing the transaction then we + // can release that prep section + if (log_number_ != 0 && exec_status_ != COMMITED) { + dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); + } TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options); Initialize(txn_options); } @@ -108,61 +128,216 @@ bool TransactionImpl::IsExpired() const { Status TransactionImpl::CommitBatch(WriteBatch* batch) { TransactionKeyMap keys_to_unlock; - Status s = LockBatch(batch, &keys_to_unlock); - if (s.ok()) { - s = DoCommit(batch); + if (!s.ok()) { + return s; + } - txn_db_impl_->UnLock(this, &keys_to_unlock); + bool can_commit = false; + + if (IsExpired()) { + s = Status::Expired(); + } else if (expiration_time_ > 0) { + ExecutionStatus expected = STARTED; + can_commit = std::atomic_compare_exchange_strong(&exec_status_, &expected, + AWAITING_COMMIT); + } else if (exec_status_ == STARTED) { + // lock stealing is not a concern + can_commit = true; + } + + if (can_commit) { + exec_status_.store(AWAITING_COMMIT); + s = db_->Write(write_options_, batch); + if (s.ok()) { + exec_status_.store(COMMITED); + } + } else if (exec_status_ == LOCKS_STOLEN) { + s = Status::Expired(); + } else { + s = Status::InvalidArgument("Transaction is not in state for commit."); } + txn_db_impl_->UnLock(this, &keys_to_unlock); + return s; } -Status TransactionImpl::Commit() { - Status s = DoCommit(GetWriteBatch()->GetWriteBatch()); +Status TransactionImpl::Prepare() { + Status s; + + if (name_.empty()) { + return Status::InvalidArgument( + "Cannot prepare a transaction that has not been named."); + } - Clear(); + if (IsExpired()) { + return Status::Expired(); + } + + bool can_prepare = false; + + if (expiration_time_ > 0) { + // must concern ourselves with expiraton and/or lock stealing + // need to compare/exchange bc locks could be stolen under us here + ExecutionStatus expected = STARTED; + can_prepare = std::atomic_compare_exchange_strong(&exec_status_, &expected, + AWAITING_PREPARE); + } else if (exec_status_ == STARTED) { + // expiration and lock stealing is not possible + can_prepare = true; + } + + if (can_prepare) { + exec_status_.store(AWAITING_PREPARE); + // transaction can't expire after preparation + expiration_time_ = 0; + WriteOptions write_options = write_options_; + write_options.disableWAL = false; + WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_); + s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), + /*callback*/ nullptr, &log_number_, /*log ref*/ 0, + /* disable_memtable*/ true); + if (s.ok()) { + assert(log_number_ != 0); + dbimpl_->MarkLogAsContainingPrepSection(log_number_); + exec_status_.store(PREPARED); + } + } else if (exec_status_ == LOCKS_STOLEN) { + s = Status::Expired(); + } else if (exec_status_ == PREPARED) { + s = Status::InvalidArgument("Transaction has already been prepared."); + } else if (exec_status_ == COMMITED) { + s = Status::InvalidArgument("Transaction has already been committed."); + } else if (exec_status_ == ROLLEDBACK) { + s = Status::InvalidArgument("Transaction has already been rolledback."); + } else { + s = Status::InvalidArgument("Transaction is not in state for commit."); + } return s; } -Status TransactionImpl::DoCommit(WriteBatch* batch) { +Status TransactionImpl::Commit() { Status s; + bool commit_single = false; + bool commit_prepared = false; - if (expiration_time_ > 0) { - if (IsExpired()) { - return Status::Expired(); - } + if (IsExpired()) { + return Status::Expired(); + } - // Transaction should only be committed if the thread succeeds - // changing its execution status to COMMITTING. This is because - // A different transaction may consider this one expired and attempt - // to steal its locks between the IsExpired() check and the beginning - // of a commit. + if (expiration_time_ > 0) { + // we must atomicaly compare and exchange the state here because at + // this state in the transaction it is possible for another thread + // to change our state out from under us in the even that we expire and have + // our locks stolen. In this case the only valid state is STARTED because + // a state of PREPARED would have a cleared expiration_time_. ExecutionStatus expected = STARTED; - bool can_commit = std::atomic_compare_exchange_strong( - &exec_status_, &expected, COMMITTING); - + commit_single = std::atomic_compare_exchange_strong( + &exec_status_, &expected, AWAITING_COMMIT); TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1"); + } else if (exec_status_ == PREPARED) { + // expiration and lock stealing is not a concern + commit_prepared = true; + } else if (exec_status_ == STARTED) { + // expiration and lock stealing is not a concern + commit_single = true; + } - if (can_commit) { - s = db_->Write(write_options_, batch); + if (commit_single) { + assert(!commit_prepared); + if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) { + s = Status::InvalidArgument( + "Commit-time batch contains values that will not be committed."); } else { - assert(exec_status_ == LOCKS_STOLEN); - return Status::Expired(); + exec_status_.store(AWAITING_COMMIT); + s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch()); + Clear(); + if (s.ok()) { + exec_status_.store(COMMITED); + } } + } else if (commit_prepared) { + exec_status_.store(AWAITING_COMMIT); + WriteOptions write_options = write_options_; + + // insert prepared batch into Memtable only. + // Memtable will ignore BeginPrepare/EndPrepare markers + // in non recovery mode and simply insert the values + write_options.disableWAL = true; + assert(log_number_ > 0); + s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), + nullptr, nullptr, log_number_); + if (!s.ok()) { + return s; + } + + // We take the commit-time batch and append the Commit marker. + // We then write this batch to both WAL and Memtable. + // The Memtable will ignore the Commit marker in non-recovery mode + write_options.disableWAL = false; + WriteBatchInternal::MarkCommit(GetCommitTimeWriteBatch(), name_); + s = db_impl_->WriteImpl(write_options, GetCommitTimeWriteBatch()); + if (!s.ok()) { + return s; + } + + // FindObsoleteFiles must now look to the memtables + // to determine what prep logs must be kept around, + // not the prep section heap. + assert(log_number_ > 0); + dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); + txn_db_impl_->UnregisterTransaction(this); + + Clear(); + exec_status_.store(COMMITED); + } else if (exec_status_ == LOCKS_STOLEN) { + s = Status::Expired(); + } else if (exec_status_ == COMMITED) { + s = Status::InvalidArgument("Transaction has already been committed."); + } else if (exec_status_ == ROLLEDBACK) { + s = Status::InvalidArgument("Transaction has already been rolledback."); } else { - s = db_->Write(write_options_, batch); + s = Status::InvalidArgument("Transaction is not in state for commit."); } return s; } -void TransactionImpl::Rollback() { Clear(); } +Status TransactionImpl::Rollback() { + Status s; + if (exec_status_ == PREPARED) { + WriteBatch rollback_marker; + WriteBatchInternal::MarkRollback(&rollback_marker, name_); + exec_status_.store(AWAITING_ROLLBACK); + s = db_impl_->WriteImpl(write_options_, &rollback_marker); + if (s.ok()) { + // we do not need to keep our prepared section around + assert(log_number_ > 0); + dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); + Clear(); + exec_status_.store(ROLLEDBACK); + } + } else if (exec_status_ == STARTED) { + // prepare couldn't have taken place + Clear(); + } else if (exec_status_ == COMMITED) { + s = Status::InvalidArgument("This transaction has already been committed."); + } else { + s = Status::InvalidArgument( + "Two phase transaction is not in state for rollback."); + } + + return s; +} Status TransactionImpl::RollbackToSavePoint() { + if (exec_status_ != STARTED) { + return Status::InvalidArgument("Transaction is beyond state for rollback."); + } + // Unlock any keys locked since last transaction const std::unique_ptr& keys = GetTrackedKeysSinceSavePoint(); @@ -370,6 +545,26 @@ void TransactionImpl::UnlockGetForUpdate(ColumnFamilyHandle* column_family, txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString()); } +Status TransactionImpl::SetName(const TransactionName& name) { + Status s; + if (exec_status_ == STARTED) { + if (name_.length()) { + s = Status::InvalidArgument("Transaction has already been named."); + } else if (txn_db_impl_->GetTransactionByName(name) != nullptr) { + s = Status::InvalidArgument("Transaction name must be unique."); + } else if (name.length() < 1 || name.length() > 512) { + s = Status::InvalidArgument( + "Transaction name length must be between 1 and 512 chars."); + } else { + name_ = name; + txn_db_impl_->RegisterTransaction(this); + } + } else { + s = Status::InvalidArgument("Transaction is beyond state for naming."); + } + return s; +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index cb02e2834..f1bd10bc1 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -41,14 +41,18 @@ class TransactionImpl : public TransactionBaseImpl { void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options); + Status Prepare() override; + Status Commit() override; Status CommitBatch(WriteBatch* batch); - void Rollback() override; + Status Rollback() override; Status RollbackToSavePoint() override; + Status SetName(const TransactionName& name) override; + // Generate a new unique transaction identifier static TransactionID GenTxnID(); @@ -77,9 +81,8 @@ class TransactionImpl : public TransactionBaseImpl { bool read_only, bool untracked = false) override; private: - enum ExecutionStatus { STARTED, COMMITTING, LOCKS_STOLEN }; - TransactionDBImpl* txn_db_impl_; + DBImpl* db_impl_; // Used to create unique ids for transactions. static std::atomic txn_id_counter_; @@ -94,9 +97,6 @@ class TransactionImpl : public TransactionBaseImpl { // Timeout in microseconds when locking a key or -1 if there is no timeout. int64_t lock_timeout_; - // Execution status of the transaction. - std::atomic exec_status_; - void Clear() override; void Initialize(const TransactionOptions& txn_options); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index f7a1f2ed8..baa9f9263 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -55,6 +55,13 @@ class TransactionTest : public testing::Test { DestroyDB(dbname, options); } + Status ReOpenNoDelete() { + delete db; + db = nullptr; + Status s = TransactionDB::Open(options, txn_db_options, dbname, &db); + return s; + } + Status ReOpen() { delete db; DestroyDB(dbname, options); @@ -113,6 +120,635 @@ TEST_F(TransactionTest, SuccessTest) { delete txn; } +TEST_F(TransactionTest, CommitTimeBatchFailTest) { + WriteOptions write_options; + TransactionOptions txn_options; + + string value; + Status s; + + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + + txn1->GetCommitTimeWriteBatch()->Put("cat", "dog"); + + s = txn1->Put("foo", "bar"); + ASSERT_OK(s); + + // fails due to non-empty commit-time batch + s = txn1->Commit(); + ASSERT_EQ(s, Status::InvalidArgument()); + + delete txn1; +} + +TEST_F(TransactionTest, SimpleTwoPhaseTransactionTest) { + WriteOptions write_options; + ReadOptions read_options; + + TransactionOptions txn_options; + + string value; + Status s; + + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + + ASSERT_EQ(db->GetTransactionByName("xid"), txn); + + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); + + // regular db put + s = db->Put(write_options, Slice("foo2"), Slice("bar2")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); + + // regular db read + db->Get(read_options, "foo2", &value); + ASSERT_EQ(value, "bar2"); + + // commit time put + txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")); + txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")); + + // nothing has been prepped yet + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + s = txn->Prepare(); + ASSERT_OK(s); + + // data not im mem yet + s = db->Get(read_options, Slice("foo"), &value); + ASSERT_TRUE(s.IsNotFound()); + s = db->Get(read_options, Slice("gtid"), &value); + ASSERT_TRUE(s.IsNotFound()); + + // find trans in list of prepared transactions + std::vector prepared_trans; + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 1); + ASSERT_EQ(prepared_trans.front()->GetName(), "xid"); + + auto log_containing_prep = + db_impl->TEST_FindMinLogContainingOutstandingPrep(); + ASSERT_GT(log_containing_prep, 0); + + // make commit + s = txn->Commit(); + ASSERT_OK(s); + + // value is now available + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + + s = db->Get(read_options, "gtid", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "dogs"); + + s = db->Get(read_options, "gtid2", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "cats"); + + // we already committed + s = txn->Commit(); + ASSERT_EQ(s, Status::InvalidArgument()); + + // no longer is prpared results + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 0); + ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); + + // heap should not care about prepared section anymore + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + // but now our memtable should be referencing the prep section + ASSERT_EQ(log_containing_prep, + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + + db_impl->TEST_FlushMemTable(true); + + // after memtable flush we can now relese the log + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + + delete txn; +} + +TEST_F(TransactionTest, TwoPhaseNameTest) { + Status s; + + WriteOptions write_options; + TransactionOptions txn_options; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + Transaction* txn3 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn3); + delete txn3; + + // cant prepare txn without name + s = txn1->Prepare(); + ASSERT_EQ(s, Status::InvalidArgument()); + + // name too short + s = txn1->SetName(""); + ASSERT_EQ(s, Status::InvalidArgument()); + + // name too long + s = txn1->SetName(std::string(513, 'x')); + ASSERT_EQ(s, Status::InvalidArgument()); + + // valid set name + s = txn1->SetName("name1"); + ASSERT_OK(s); + + // cant have duplicate name + s = txn2->SetName("name1"); + ASSERT_EQ(s, Status::InvalidArgument()); + + // shouldn't be able to prepare + s = txn2->Prepare(); + ASSERT_EQ(s, Status::InvalidArgument()); + + // valid name set + s = txn2->SetName("name2"); + ASSERT_OK(s); + + // cant reset name + s = txn2->SetName("name3"); + ASSERT_EQ(s, Status::InvalidArgument()); + + ASSERT_EQ(txn1->GetName(), "name1"); + ASSERT_EQ(txn2->GetName(), "name2"); + + s = txn1->Prepare(); + ASSERT_OK(s); + + // can't rename after prepare + s = txn1->SetName("name4"); + ASSERT_EQ(s, Status::InvalidArgument()); + + delete txn1; + delete txn2; +} + +TEST_F(TransactionTest, TwoPhaseEmptyWriteTest) { + Status s; + std::string value; + + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); + + s = txn1->SetName("joe"); + ASSERT_OK(s); + + s = txn2->SetName("bob"); + ASSERT_OK(s); + + s = txn1->Prepare(); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + delete txn1; + + txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")); + + s = txn2->Prepare(); + ASSERT_OK(s); + + s = txn2->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + + delete txn2; +} + +TEST_F(TransactionTest, TwoPhaseExpirationTest) { + Status s; + + WriteOptions write_options; + TransactionOptions txn_options; + txn_options.expiration = 500; // 500ms + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + ASSERT_TRUE(txn1); + + s = txn1->SetName("joe"); + ASSERT_OK(s); + s = txn2->SetName("bob"); + ASSERT_OK(s); + + s = txn1->Prepare(); + ASSERT_OK(s); + + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = txn2->Prepare(); + ASSERT_EQ(s, Status::Expired()); + + delete txn1; + delete txn2; +} + +TEST_F(TransactionTest, TwoPhaseRollbackTest) { + WriteOptions write_options; + ReadOptions read_options; + + TransactionOptions txn_options; + + string value; + Status s; + + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + + // transaction put + s = txn->Put(Slice("tfoo"), Slice("tbar")); + ASSERT_OK(s); + + // value is readable form txn + s = txn->Get(read_options, Slice("tfoo"), &value); + ASSERT_OK(s); + ASSERT_EQ(value, "tbar"); + + // issue rollback + s = txn->Rollback(); + ASSERT_OK(s); + + // value is nolonger readable + s = txn->Get(read_options, Slice("tfoo"), &value); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ(txn->GetNumPuts(), 0); + + // put new txn values + s = txn->Put(Slice("tfoo2"), Slice("tbar2")); + ASSERT_OK(s); + + // new value is readable from txn + s = txn->Get(read_options, Slice("tfoo2"), &value); + ASSERT_OK(s); + ASSERT_EQ(value, "tbar2"); + + s = txn->Prepare(); + ASSERT_OK(s); + + // flush to next wal + s = db->Put(write_options, Slice("foo"), Slice("bar")); + ASSERT_OK(s); + db_impl->TEST_FlushMemTable(true); + + // issue rollback (marker written to WAL) + s = txn->Rollback(); + ASSERT_OK(s); + + // value is nolonger readable + s = txn->Get(read_options, Slice("tfoo2"), &value); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ(txn->GetNumPuts(), 0); + + // make commit + s = txn->Commit(); + ASSERT_EQ(s, Status::InvalidArgument()); + + // try rollback again + s = txn->Rollback(); + ASSERT_EQ(s, Status::InvalidArgument()); +} + +TEST_F(TransactionTest, PersistentTwoPhaseTransactionTest) { + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = false; + ReadOptions read_options; + + TransactionOptions txn_options; + + string value; + Status s; + + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + + ASSERT_EQ(db->GetTransactionByName("xid"), txn); + + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); + + // txn read + s = txn->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + + // regular db put + s = db->Put(write_options, Slice("foo2"), Slice("bar2")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); + + db_impl->TEST_FlushMemTable(true); + + // regular db read + db->Get(read_options, "foo2", &value); + ASSERT_EQ(value, "bar2"); + + // nothing has been prepped yet + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + // prepare + s = txn->Prepare(); + ASSERT_OK(s); + + // still not available to db + s = db->Get(read_options, Slice("foo"), &value); + ASSERT_TRUE(s.IsNotFound()); + + // kill and reopen + s = ReOpenNoDelete(); + ASSERT_OK(s); + db_impl = reinterpret_cast(db->GetRootDB()); + + // find trans in list of prepared transactions + std::vector prepared_trans; + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 1); + + txn = prepared_trans.front(); + ASSERT_TRUE(txn); + ASSERT_EQ(txn->GetName(), "xid"); + ASSERT_EQ(db->GetTransactionByName("xid"), txn); + + // log has been marked + auto log_containing_prep = + db_impl->TEST_FindMinLogContainingOutstandingPrep(); + ASSERT_GT(log_containing_prep, 0); + + // value is readable from txn + s = txn->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + + // make commit + s = txn->Commit(); + ASSERT_OK(s); + + // value is now available + db->Get(read_options, "foo", &value); + ASSERT_EQ(value, "bar"); + + // we already committed + s = txn->Commit(); + ASSERT_EQ(s, Status::InvalidArgument()); + + // no longer is prpared results + prepared_trans.clear(); + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 0); + + // transaction should no longer be visible + ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); + + // heap should not care about prepared section anymore + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + // but now our memtable should be referencing the prep section + ASSERT_EQ(log_containing_prep, + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + + db_impl->TEST_FlushMemTable(true); + + // after memtable flush we can now relese the log + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + + delete txn; + + // deleting transaction should unregister transaction + ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); +} + +TEST_F(TransactionTest, TwoPhaseMultiThreadTest) { + // mix transaction writes and regular writes + const int NUM_TXN_THREADS = 50; + std::atomic txn_thread_num(0); + + std::function txn_write_thread = [&]() { + uint32_t id = txn_thread_num.fetch_add(1); + + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = false; + TransactionOptions txn_options; + txn_options.lock_timeout = 1000000; + if (id % 2 == 0) { + txn_options.expiration = 1000000; + } + TransactionName name("xid_" + std::string(1, 'A' + id)); + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn->SetName(name)); + for (int i = 0; i < 10; i++) { + std::string key(name + "_" + std::string(1, 'A' + i)); + ASSERT_OK(txn->Put(key, "val")); + } + ASSERT_OK(txn->Prepare()); + ASSERT_OK(txn->Commit()); + delete txn; + }; + + // assure that all thread are in the same write group + std::atomic t_wait_on_prepare(0); + std::atomic t_wait_on_commit(0); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { + auto* writer = reinterpret_cast(arg); + + if (writer->ShouldWriteToWAL()) { + t_wait_on_prepare.fetch_add(1); + // wait for friends + while (t_wait_on_prepare.load() < NUM_TXN_THREADS) { + } + } else if (writer->ShouldWriteToMemtable()) { + t_wait_on_commit.fetch_add(1); + // wait for friends + while (t_wait_on_commit.load() < NUM_TXN_THREADS) { + } + } else { + ASSERT_TRUE(false); + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // do all the writes + std::vector threads; + for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) { + threads.emplace_back(txn_write_thread); + } + for (auto& t : threads) { + t.join(); + } + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + + ReadOptions read_options; + std::string value; + Status s; + for (int t = 0; t < NUM_TXN_THREADS; t++) { + TransactionName name("xid_" + std::string(1, 'A' + t)); + for (int i = 0; i < 10; i++) { + std::string key(name + "_" + std::string(1, 'A' + i)); + s = db->Get(read_options, key, &value); + ASSERT_OK(s); + ASSERT_EQ(value, "val"); + } + } +} + +TEST_F(TransactionTest, TwoPhaseLogRollingTest) { + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + + Status s; + string v; + ColumnFamilyHandle *cfa, *cfb; + + // Create 2 new column families + ColumnFamilyOptions cf_options; + s = db->CreateColumnFamily(cf_options, "CFA", &cfa); + ASSERT_OK(s); + s = db->CreateColumnFamily(cf_options, "CFB", &cfb); + ASSERT_OK(s); + + WriteOptions wopts; + wopts.disableWAL = false; + wopts.sync = true; + + TransactionOptions topts1; + Transaction* txn1 = db->BeginTransaction(wopts, topts1); + s = txn1->SetName("xid1"); + ASSERT_OK(s); + + TransactionOptions topts2; + Transaction* txn2 = db->BeginTransaction(wopts, topts2); + s = txn2->SetName("xid2"); + ASSERT_OK(s); + + // transaction put in two column families + s = txn1->Put(cfa, "ka1", "va1"); + ASSERT_OK(s); + + // transaction put in two column families + s = txn2->Put(cfa, "ka2", "va2"); + ASSERT_OK(s); + s = txn2->Put(cfb, "kb2", "vb2"); + ASSERT_OK(s); + + // write prep section to wal + s = txn1->Prepare(); + ASSERT_OK(s); + + // our log should be in the heap + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + txn1->GetLogNumber()); + ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); + + // flush default cf to crate new log + s = db->Put(wopts, "foo", "bar"); + ASSERT_OK(s); + s = db_impl->TEST_FlushMemTable(true); + ASSERT_OK(s); + + // make sure we are on a new log + ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); + + // put txn2 prep section in this log + s = txn2->Prepare(); + ASSERT_OK(s); + ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber()); + + // heap should still see first log + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + txn1->GetLogNumber()); + + // commit txn1 + s = txn1->Commit(); + ASSERT_OK(s); + + // heap should now show txn2s log + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + txn2->GetLogNumber()); + + // we should see txn1s log refernced by the memtables + ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), + txn1->GetLogNumber()); + + // flush default cf to crate new log + s = db->Put(wopts, "foo", "bar2"); + ASSERT_OK(s); + s = db_impl->TEST_FlushMemTable(true); + ASSERT_OK(s); + + // make sure we are on a new log + ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber()); + + // commit txn2 + s = txn2->Commit(); + ASSERT_OK(s); + + // heap should not show any logs + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + // should show the first txn log + ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), + txn1->GetLogNumber()); + + // flush only cfa memtable + s = db_impl->TEST_FlushMemTable(true, cfa); + ASSERT_OK(s); + + // should show the first txn log + ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), + txn2->GetLogNumber()); + + // flush only cfb memtable + s = db_impl->TEST_FlushMemTable(true, cfb); + ASSERT_OK(s); + + // should show not dependency on logs + ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0); + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + delete txn1; + delete txn2; + delete cfa; + delete cfb; +} + TEST_F(TransactionTest, FirstWriteTest) { WriteOptions write_options; @@ -1300,6 +1936,21 @@ TEST_F(TransactionTest, ReinitializeTest) { s = db->Get(read_options, "Y", &value); ASSERT_TRUE(s.IsNotFound()); + txn1 = db->BeginTransaction(write_options, txn_options, txn1); + + s = txn1->SetName("name"); + ASSERT_OK(s); + + s = txn1->Prepare(); + ASSERT_OK(s); + s = txn1->Commit(); + ASSERT_OK(s); + + txn1 = db->BeginTransaction(write_options, txn_options, txn1); + + s = txn1->SetName("name"); + ASSERT_OK(s); + delete txn1; }