TransactionDB:ReinitializeTransaction

Summary: Add function to reinitialize a transaction object so that it can be reused.  This is an optimization so users can potentially avoid reallocating transaction objects.

Test Plan: added tests

Reviewers: yhchiang, kradhakrishnan, IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: jkedgar, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D53835
main
agiardullo 9 years ago
parent 1f5954147b
commit 5ea9aa3c14
  1. 14
      include/rocksdb/utilities/transaction_db.h
  2. 17
      utilities/transactions/transaction_base.cc
  3. 6
      utilities/transactions/transaction_base.h
  4. 23
      utilities/transactions/transaction_db_impl.cc
  5. 7
      utilities/transactions/transaction_db_impl.h
  6. 29
      utilities/transactions/transaction_impl.cc
  7. 9
      utilities/transactions/transaction_impl.h
  8. 91
      utilities/transactions/transaction_test.cc

@ -111,14 +111,18 @@ class TransactionDB : public StackableDB {
virtual ~TransactionDB() {}
// Starts a new Transaction. Passing set_snapshot=true has the same effect
// as calling Transaction::SetSnapshot().
// Starts a new Transaction.
//
// Caller should delete the returned transaction after calling
// Transaction::Commit() or Transaction::Rollback().
// Caller is responsible for deleting the returned transaction when no
// longer needed.
//
// If old_txn is not null, BeginTransaction will reuse this Transaction
// handle instead of allocating a new one. This is an optimization to avoid
// extra allocations when repeatedly creating transactions.
virtual Transaction* BeginTransaction(
const WriteOptions& write_options,
const TransactionOptions& txn_options = TransactionOptions()) = 0;
const TransactionOptions& txn_options = TransactionOptions(),
Transaction* old_txn = nullptr) = 0;
protected:
// To Create an TransactionDB, call Open()

@ -24,7 +24,10 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db,
start_time_(db_->GetEnv()->NowMicros()),
write_batch_(cmp_, 0, true) {}
TransactionBaseImpl::~TransactionBaseImpl() {}
TransactionBaseImpl::~TransactionBaseImpl() {
// Release snapshot if snapshot is set
SetSnapshotInternal(nullptr);
}
void TransactionBaseImpl::Clear() {
save_points_.reset(nullptr);
@ -35,12 +38,22 @@ void TransactionBaseImpl::Clear() {
num_merges_ = 0;
}
void TransactionBaseImpl::Reinitialize(const WriteOptions& write_options) {
Clear();
write_options_ = write_options;
start_time_ = db_->GetEnv()->NowMicros();
}
void TransactionBaseImpl::SetSnapshot() {
assert(dynamic_cast<DBImpl*>(db_) != nullptr);
auto db_impl = reinterpret_cast<DBImpl*>(db_);
const Snapshot* snapshot = db_impl->GetSnapshotForWriteConflictBoundary();
SetSnapshotInternal(snapshot);
}
void TransactionBaseImpl::SetSnapshotInternal(const Snapshot* snapshot) {
// Set a custom deleter for the snapshot_ SharedPtr as the snapshot needs to
// be released, not deleted when it is no longer referenced.
snapshot_.reset(snapshot, std::bind(&TransactionBaseImpl::ReleaseSnapshot,
@ -493,7 +506,9 @@ WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() {
}
void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
if (snapshot != nullptr) {
db->ReleaseSnapshot(snapshot);
}
}
void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family,

@ -32,6 +32,8 @@ class TransactionBaseImpl : public Transaction {
// Remove pending operations queued in this transaction.
virtual void Clear();
void Reinitialize(const WriteOptions& write_options);
// Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock
// returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed.
// untracked will be true if called from PutUntracked, DeleteUntracked, or
@ -240,7 +242,7 @@ class TransactionBaseImpl : public Transaction {
const Comparator* cmp_;
// Stores that time the txn was constructed, in microseconds.
const uint64_t start_time_;
uint64_t start_time_;
// Stores the current snapshot that was was set by SetSnapshot or null if
// no snapshot is currently set.
@ -306,6 +308,8 @@ class TransactionBaseImpl : public Transaction {
bool read_only, bool untracked = false);
WriteBatchBase* GetBatchForWrite();
void SetSnapshotInternal(const Snapshot* snapshot);
};
} // namespace rocksdb

@ -31,10 +31,14 @@ TransactionDBImpl::TransactionDBImpl(DB* db,
new TransactionDBMutexFactoryImpl())) {}
Transaction* TransactionDBImpl::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options) {
Transaction* txn = new TransactionImpl(this, write_options, txn_options);
return txn;
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {
if (old_txn != nullptr) {
ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn;
} else {
return new TransactionImpl(this, write_options, txn_options);
}
}
TransactionDBOptions TransactionDBImpl::ValidateTxnDBOptions(
@ -173,7 +177,7 @@ void TransactionDBImpl::UnLock(TransactionImpl* txn, uint32_t cfh_id,
Transaction* TransactionDBImpl::BeginInternalTransaction(
const WriteOptions& options) {
TransactionOptions txn_options;
Transaction* txn = BeginTransaction(options, txn_options);
Transaction* txn = BeginTransaction(options, txn_options, nullptr);
assert(dynamic_cast<TransactionImpl*>(txn) != nullptr);
auto txn_impl = reinterpret_cast<TransactionImpl*>(txn);
@ -302,5 +306,14 @@ bool TransactionDBImpl::TryStealingExpiredTransactionLocks(
return tx.TryStealingLocks();
}
void TransactionDBImpl::ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options) {
assert(dynamic_cast<TransactionImpl*>(txn) != nullptr);
auto txn_impl = reinterpret_cast<TransactionImpl*>(txn);
txn_impl->Reinitialize(write_options, txn_options);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -26,7 +26,8 @@ class TransactionDBImpl : public TransactionDB {
~TransactionDBImpl() {}
Transaction* BeginTransaction(const WriteOptions& write_options,
const TransactionOptions& txn_options) override;
const TransactionOptions& txn_options,
Transaction* old_txn) override;
using StackableDB::Put;
virtual Status Put(const WriteOptions& options,
@ -78,6 +79,10 @@ class TransactionDBImpl : public TransactionDB {
bool TryStealingExpiredTransactionLocks(TransactionID tx_id);
private:
void ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options = TransactionOptions());
const TransactionDBOptions txn_db_options_;
TransactionLockMgr lock_mgr_;

@ -39,21 +39,34 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
const TransactionOptions& txn_options)
: TransactionBaseImpl(txn_db->GetBaseDB(), write_options),
txn_db_impl_(nullptr),
txn_id_(GenTxnID()),
expiration_time_(txn_options.expiration >= 0
? start_time_ + txn_options.expiration * 1000
: 0),
lock_timeout_(txn_options.lock_timeout * 1000),
txn_id_(0),
expiration_time_(0),
lock_timeout_(0),
exec_status_(STARTED) {
txn_db_impl_ = dynamic_cast<TransactionDBImpl*>(txn_db);
assert(txn_db_impl_);
Initialize(txn_options);
}
void TransactionImpl::Initialize(const TransactionOptions& txn_options) {
txn_id_ = GenTxnID();
exec_status_ = STARTED;
lock_timeout_ = txn_options.lock_timeout * 1000;
if (lock_timeout_ < 0) {
// Lock timeout not set, use default
lock_timeout_ =
txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000;
}
if (txn_options.expiration >= 0) {
expiration_time_ = start_time_ + txn_options.expiration * 1000;
} else {
expiration_time_ = 0;
}
if (txn_options.set_snapshot) {
SetSnapshot();
}
@ -74,6 +87,12 @@ void TransactionImpl::Clear() {
TransactionBaseImpl::Clear();
}
void TransactionImpl::Reinitialize(const WriteOptions& write_options,
const TransactionOptions& txn_options) {
TransactionBaseImpl::Reinitialize(write_options);
Initialize(txn_options);
}
bool TransactionImpl::IsExpired() const {
if (expiration_time_ > 0) {
if (db_->GetEnv()->NowMicros() >= expiration_time_) {

@ -38,6 +38,9 @@ class TransactionImpl : public TransactionBaseImpl {
virtual ~TransactionImpl();
void Reinitialize(const WriteOptions& write_options,
const TransactionOptions& txn_options);
Status Commit() override;
Status CommitBatch(WriteBatch* batch);
@ -82,11 +85,11 @@ class TransactionImpl : public TransactionBaseImpl {
static std::atomic<TransactionID> txn_id_counter_;
// Unique ID for this transaction
const TransactionID txn_id_;
TransactionID txn_id_;
// If non-zero, this transaction should not be committed after this time (in
// microseconds according to Env->NowMicros())
const uint64_t expiration_time_;
uint64_t expiration_time_;
// Timeout in microseconds when locking a key or -1 if there is no timeout.
int64_t lock_timeout_;
@ -96,6 +99,8 @@ class TransactionImpl : public TransactionBaseImpl {
void Clear() override;
void Initialize(const TransactionOptions& txn_options);
Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key,
SequenceNumber prev_seqno, SequenceNumber* new_seqno);

@ -448,7 +448,6 @@ TEST_F(TransactionTest, FlushTest2) {
s = txn->Delete("S");
// Should fail after encountering a write to S in SST file
fprintf(stderr, "%" ROCKSDB_PRIszt " %s\n", n, s.ToString().c_str());
ASSERT_TRUE(s.IsBusy());
// Write a bunch of keys to db to force a compaction
@ -1210,6 +1209,96 @@ TEST_F(TransactionTest, ExpiredTransaction) {
delete txn2;
}
TEST_F(TransactionTest, ReinitializeTest) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
Status s;
// Set txn expiration timeout to 0 microseconds (expires instantly)
txn_options.expiration = 0;
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
// Reinitialize transaction to no long expire
txn_options.expiration = -1;
db->BeginTransaction(write_options, txn_options, txn1);
s = txn1->Put("Z", "z");
ASSERT_OK(s);
// Should commit since not expired
s = txn1->Commit();
ASSERT_OK(s);
db->BeginTransaction(write_options, txn_options, txn1);
s = txn1->Put("Z", "zz");
ASSERT_OK(s);
// Reinitilize txn1 and verify that Z gets unlocked
db->BeginTransaction(write_options, txn_options, txn1);
Transaction* txn2 = db->BeginTransaction(write_options, txn_options, nullptr);
s = txn2->Put("Z", "zzz");
ASSERT_OK(s);
s = txn2->Commit();
ASSERT_OK(s);
delete txn2;
s = db->Get(read_options, "Z", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "zzz");
// Verify snapshots get reinitialized correctly
txn1->SetSnapshot();
s = txn1->Put("Z", "zzzz");
ASSERT_OK(s);
s = txn1->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "Z", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "zzzz");
db->BeginTransaction(write_options, txn_options, txn1);
const Snapshot* snapshot = txn1->GetSnapshot();
ASSERT_TRUE(snapshot);
txn_options.set_snapshot = true;
db->BeginTransaction(write_options, txn_options, txn1);
snapshot = txn1->GetSnapshot();
ASSERT_TRUE(snapshot);
s = txn1->Put("Z", "a");
ASSERT_OK(s);
txn1->Rollback();
s = txn1->Put("Y", "y");
ASSERT_OK(s);
txn_options.set_snapshot = false;
db->BeginTransaction(write_options, txn_options, txn1);
snapshot = txn1->GetSnapshot();
s = txn1->Put("X", "x");
ASSERT_OK(s);
s = txn1->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "Z", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "zzzz");
s = db->Get(read_options, "Y", &value);
ASSERT_TRUE(s.IsNotFound());
delete txn1;
}
TEST_F(TransactionTest, Rollback) {
WriteOptions write_options;
ReadOptions read_options;

Loading…
Cancel
Save