From 5ea9aa3c146f77e45563f30f346aa3e8f66082c3 Mon Sep 17 00:00:00 2001 From: agiardullo Date: Tue, 2 Feb 2016 19:19:17 -0800 Subject: [PATCH] TransactionDB:ReinitializeTransaction Summary: Add function to reinitialize a transaction object so that it can be reused. This is an optimization so users can potentially avoid reallocating transaction objects. Test Plan: added tests Reviewers: yhchiang, kradhakrishnan, IslamAbdelRahman, sdong Reviewed By: sdong Subscribers: jkedgar, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D53835 --- include/rocksdb/utilities/transaction_db.h | 14 ++- utilities/transactions/transaction_base.cc | 19 +++- utilities/transactions/transaction_base.h | 6 +- utilities/transactions/transaction_db_impl.cc | 23 ++++- utilities/transactions/transaction_db_impl.h | 7 +- utilities/transactions/transaction_impl.cc | 29 +++++- utilities/transactions/transaction_impl.h | 9 +- utilities/transactions/transaction_test.cc | 91 ++++++++++++++++++- 8 files changed, 176 insertions(+), 22 deletions(-) diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 35b06d899..ff29bc57a 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -111,14 +111,18 @@ class TransactionDB : public StackableDB { virtual ~TransactionDB() {} - // Starts a new Transaction. Passing set_snapshot=true has the same effect - // as calling Transaction::SetSnapshot(). + // Starts a new Transaction. // - // Caller should delete the returned transaction after calling - // Transaction::Commit() or Transaction::Rollback(). + // Caller is responsible for deleting the returned transaction when no + // longer needed. + // + // If old_txn is not null, BeginTransaction will reuse this Transaction + // handle instead of allocating a new one. This is an optimization to avoid + // extra allocations when repeatedly creating transactions. virtual Transaction* BeginTransaction( const WriteOptions& write_options, - const TransactionOptions& txn_options = TransactionOptions()) = 0; + const TransactionOptions& txn_options = TransactionOptions(), + Transaction* old_txn = nullptr) = 0; protected: // To Create an TransactionDB, call Open() diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 2754d38cb..72d12c607 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -24,7 +24,10 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db, start_time_(db_->GetEnv()->NowMicros()), write_batch_(cmp_, 0, true) {} -TransactionBaseImpl::~TransactionBaseImpl() {} +TransactionBaseImpl::~TransactionBaseImpl() { + // Release snapshot if snapshot is set + SetSnapshotInternal(nullptr); +} void TransactionBaseImpl::Clear() { save_points_.reset(nullptr); @@ -35,12 +38,22 @@ void TransactionBaseImpl::Clear() { num_merges_ = 0; } +void TransactionBaseImpl::Reinitialize(const WriteOptions& write_options) { + Clear(); + write_options_ = write_options; + start_time_ = db_->GetEnv()->NowMicros(); +} + void TransactionBaseImpl::SetSnapshot() { assert(dynamic_cast(db_) != nullptr); auto db_impl = reinterpret_cast(db_); const Snapshot* snapshot = db_impl->GetSnapshotForWriteConflictBoundary(); + SetSnapshotInternal(snapshot); +} + +void TransactionBaseImpl::SetSnapshotInternal(const Snapshot* snapshot) { // Set a custom deleter for the snapshot_ SharedPtr as the snapshot needs to // be released, not deleted when it is no longer referenced. snapshot_.reset(snapshot, std::bind(&TransactionBaseImpl::ReleaseSnapshot, @@ -493,7 +506,9 @@ WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() { } void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) { - db->ReleaseSnapshot(snapshot); + if (snapshot != nullptr) { + db->ReleaseSnapshot(snapshot); + } } void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family, diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index cb8ca2483..86903ea1f 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -32,6 +32,8 @@ class TransactionBaseImpl : public Transaction { // Remove pending operations queued in this transaction. virtual void Clear(); + void Reinitialize(const WriteOptions& write_options); + // Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock // returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed. // untracked will be true if called from PutUntracked, DeleteUntracked, or @@ -240,7 +242,7 @@ class TransactionBaseImpl : public Transaction { const Comparator* cmp_; // Stores that time the txn was constructed, in microseconds. - const uint64_t start_time_; + uint64_t start_time_; // Stores the current snapshot that was was set by SetSnapshot or null if // no snapshot is currently set. @@ -306,6 +308,8 @@ class TransactionBaseImpl : public Transaction { bool read_only, bool untracked = false); WriteBatchBase* GetBatchForWrite(); + + void SetSnapshotInternal(const Snapshot* snapshot); }; } // namespace rocksdb diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index bc5b9e596..b02d7bd25 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -31,10 +31,14 @@ TransactionDBImpl::TransactionDBImpl(DB* db, new TransactionDBMutexFactoryImpl())) {} Transaction* TransactionDBImpl::BeginTransaction( - const WriteOptions& write_options, const TransactionOptions& txn_options) { - Transaction* txn = new TransactionImpl(this, write_options, txn_options); - - return txn; + const WriteOptions& write_options, const TransactionOptions& txn_options, + Transaction* old_txn) { + if (old_txn != nullptr) { + ReinitializeTransaction(old_txn, write_options, txn_options); + return old_txn; + } else { + return new TransactionImpl(this, write_options, txn_options); + } } TransactionDBOptions TransactionDBImpl::ValidateTxnDBOptions( @@ -173,7 +177,7 @@ void TransactionDBImpl::UnLock(TransactionImpl* txn, uint32_t cfh_id, Transaction* TransactionDBImpl::BeginInternalTransaction( const WriteOptions& options) { TransactionOptions txn_options; - Transaction* txn = BeginTransaction(options, txn_options); + Transaction* txn = BeginTransaction(options, txn_options, nullptr); assert(dynamic_cast(txn) != nullptr); auto txn_impl = reinterpret_cast(txn); @@ -302,5 +306,14 @@ bool TransactionDBImpl::TryStealingExpiredTransactionLocks( return tx.TryStealingLocks(); } +void TransactionDBImpl::ReinitializeTransaction( + Transaction* txn, const WriteOptions& write_options, + const TransactionOptions& txn_options) { + assert(dynamic_cast(txn) != nullptr); + auto txn_impl = reinterpret_cast(txn); + + txn_impl->Reinitialize(write_options, txn_options); +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_db_impl.h b/utilities/transactions/transaction_db_impl.h index ace218d19..7b7d646a1 100644 --- a/utilities/transactions/transaction_db_impl.h +++ b/utilities/transactions/transaction_db_impl.h @@ -26,7 +26,8 @@ class TransactionDBImpl : public TransactionDB { ~TransactionDBImpl() {} Transaction* BeginTransaction(const WriteOptions& write_options, - const TransactionOptions& txn_options) override; + const TransactionOptions& txn_options, + Transaction* old_txn) override; using StackableDB::Put; virtual Status Put(const WriteOptions& options, @@ -78,6 +79,10 @@ class TransactionDBImpl : public TransactionDB { bool TryStealingExpiredTransactionLocks(TransactionID tx_id); private: + void ReinitializeTransaction( + Transaction* txn, const WriteOptions& write_options, + const TransactionOptions& txn_options = TransactionOptions()); + const TransactionDBOptions txn_db_options_; TransactionLockMgr lock_mgr_; diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index 7cda2cd0e..33393751d 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -39,21 +39,34 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db, const TransactionOptions& txn_options) : TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_impl_(nullptr), - txn_id_(GenTxnID()), - expiration_time_(txn_options.expiration >= 0 - ? start_time_ + txn_options.expiration * 1000 - : 0), - lock_timeout_(txn_options.lock_timeout * 1000), + txn_id_(0), + expiration_time_(0), + lock_timeout_(0), exec_status_(STARTED) { txn_db_impl_ = dynamic_cast(txn_db); assert(txn_db_impl_); + Initialize(txn_options); +} + +void TransactionImpl::Initialize(const TransactionOptions& txn_options) { + txn_id_ = GenTxnID(); + + exec_status_ = STARTED; + + lock_timeout_ = txn_options.lock_timeout * 1000; if (lock_timeout_ < 0) { // Lock timeout not set, use default lock_timeout_ = txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000; } + if (txn_options.expiration >= 0) { + expiration_time_ = start_time_ + txn_options.expiration * 1000; + } else { + expiration_time_ = 0; + } + if (txn_options.set_snapshot) { SetSnapshot(); } @@ -74,6 +87,12 @@ void TransactionImpl::Clear() { TransactionBaseImpl::Clear(); } +void TransactionImpl::Reinitialize(const WriteOptions& write_options, + const TransactionOptions& txn_options) { + TransactionBaseImpl::Reinitialize(write_options); + Initialize(txn_options); +} + bool TransactionImpl::IsExpired() const { if (expiration_time_ > 0) { if (db_->GetEnv()->NowMicros() >= expiration_time_) { diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index 01521f172..8a8ed6531 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -38,6 +38,9 @@ class TransactionImpl : public TransactionBaseImpl { virtual ~TransactionImpl(); + void Reinitialize(const WriteOptions& write_options, + const TransactionOptions& txn_options); + Status Commit() override; Status CommitBatch(WriteBatch* batch); @@ -82,11 +85,11 @@ class TransactionImpl : public TransactionBaseImpl { static std::atomic txn_id_counter_; // Unique ID for this transaction - const TransactionID txn_id_; + TransactionID txn_id_; // If non-zero, this transaction should not be committed after this time (in // microseconds according to Env->NowMicros()) - const uint64_t expiration_time_; + uint64_t expiration_time_; // Timeout in microseconds when locking a key or -1 if there is no timeout. int64_t lock_timeout_; @@ -96,6 +99,8 @@ class TransactionImpl : public TransactionBaseImpl { void Clear() override; + void Initialize(const TransactionOptions& txn_options); + Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, SequenceNumber prev_seqno, SequenceNumber* new_seqno); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index fec6c974f..809dc9506 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -448,7 +448,6 @@ TEST_F(TransactionTest, FlushTest2) { s = txn->Delete("S"); // Should fail after encountering a write to S in SST file - fprintf(stderr, "%" ROCKSDB_PRIszt " %s\n", n, s.ToString().c_str()); ASSERT_TRUE(s.IsBusy()); // Write a bunch of keys to db to force a compaction @@ -1210,6 +1209,96 @@ TEST_F(TransactionTest, ExpiredTransaction) { delete txn2; } +TEST_F(TransactionTest, ReinitializeTest) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + string value; + Status s; + + // Set txn expiration timeout to 0 microseconds (expires instantly) + txn_options.expiration = 0; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + + // Reinitialize transaction to no long expire + txn_options.expiration = -1; + db->BeginTransaction(write_options, txn_options, txn1); + + s = txn1->Put("Z", "z"); + ASSERT_OK(s); + + // Should commit since not expired + s = txn1->Commit(); + ASSERT_OK(s); + + db->BeginTransaction(write_options, txn_options, txn1); + + s = txn1->Put("Z", "zz"); + ASSERT_OK(s); + + // Reinitilize txn1 and verify that Z gets unlocked + db->BeginTransaction(write_options, txn_options, txn1); + + Transaction* txn2 = db->BeginTransaction(write_options, txn_options, nullptr); + s = txn2->Put("Z", "zzz"); + ASSERT_OK(s); + s = txn2->Commit(); + ASSERT_OK(s); + delete txn2; + + s = db->Get(read_options, "Z", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "zzz"); + + // Verify snapshots get reinitialized correctly + txn1->SetSnapshot(); + s = txn1->Put("Z", "zzzz"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "Z", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "zzzz"); + + db->BeginTransaction(write_options, txn_options, txn1); + const Snapshot* snapshot = txn1->GetSnapshot(); + ASSERT_TRUE(snapshot); + + txn_options.set_snapshot = true; + db->BeginTransaction(write_options, txn_options, txn1); + snapshot = txn1->GetSnapshot(); + ASSERT_TRUE(snapshot); + + s = txn1->Put("Z", "a"); + ASSERT_OK(s); + + txn1->Rollback(); + + s = txn1->Put("Y", "y"); + ASSERT_OK(s); + + txn_options.set_snapshot = false; + db->BeginTransaction(write_options, txn_options, txn1); + snapshot = txn1->GetSnapshot(); + + s = txn1->Put("X", "x"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "Z", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "zzzz"); + + s = db->Get(read_options, "Y", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete txn1; +} + TEST_F(TransactionTest, Rollback) { WriteOptions write_options; ReadOptions read_options;