Improve snapshot handling for Transaction reinitialization

Summary: Previously, reusing a transaction (by passing it as an argument to BeginTransaction) would not clear the transaction's snapshot.  This is not a clear, well-definited behavior.

Test Plan: improved test

Reviewers: sdong, IslamAbdelRahman, horuff, jkedgar

Reviewed By: jkedgar

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D55053
main
agiardullo 9 years ago
parent 171c8e80b1
commit 200080ed72
  1. 10
      utilities/transactions/transaction_base.cc
  2. 6
      utilities/transactions/transaction_base.h
  3. 2
      utilities/transactions/transaction_db_impl.cc
  4. 6
      utilities/transactions/transaction_impl.cc
  5. 2
      utilities/transactions/transaction_impl.h
  6. 15
      utilities/transactions/transaction_test.cc

@ -22,7 +22,8 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db,
write_options_(write_options), write_options_(write_options),
cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
start_time_(db_->GetEnv()->NowMicros()), start_time_(db_->GetEnv()->NowMicros()),
write_batch_(cmp_, 0, true) {} write_batch_(cmp_, 0, true),
indexing_enabled_(true) {}
TransactionBaseImpl::~TransactionBaseImpl() { TransactionBaseImpl::~TransactionBaseImpl() {
// Release snapshot if snapshot is set // Release snapshot if snapshot is set
@ -38,10 +39,15 @@ void TransactionBaseImpl::Clear() {
num_merges_ = 0; num_merges_ = 0;
} }
void TransactionBaseImpl::Reinitialize(const WriteOptions& write_options) { void TransactionBaseImpl::Reinitialize(DB* db,
const WriteOptions& write_options) {
Clear(); Clear();
ClearSnapshot();
db_ = db;
write_options_ = write_options; write_options_ = write_options;
start_time_ = db_->GetEnv()->NowMicros(); start_time_ = db_->GetEnv()->NowMicros();
indexing_enabled_ = true;
cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily());
} }
void TransactionBaseImpl::SetSnapshot() { void TransactionBaseImpl::SetSnapshot() {

@ -32,7 +32,7 @@ class TransactionBaseImpl : public Transaction {
// Remove pending operations queued in this transaction. // Remove pending operations queued in this transaction.
virtual void Clear(); virtual void Clear();
void Reinitialize(const WriteOptions& write_options); void Reinitialize(DB* db, const WriteOptions& write_options);
// Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock // Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock
// returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed. // returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed.
@ -235,7 +235,7 @@ class TransactionBaseImpl : public Transaction {
// Sets a snapshot if SetSnapshotOnNextOperation() has been called. // Sets a snapshot if SetSnapshotOnNextOperation() has been called.
void SetSnapshotIfNeeded(); void SetSnapshotIfNeeded();
DB* const db_; DB* db_;
WriteOptions write_options_; WriteOptions write_options_;
@ -294,7 +294,7 @@ class TransactionBaseImpl : public Transaction {
// WriteBatchWithIndex. // WriteBatchWithIndex.
// If false, future Put/Merge/Deletes will be inserted directly into the // If false, future Put/Merge/Deletes will be inserted directly into the
// underlying WriteBatch and not indexed in the WriteBatchWithIndex. // underlying WriteBatch and not indexed in the WriteBatchWithIndex.
bool indexing_enabled_ = true; bool indexing_enabled_;
// SetSnapshotOnNextOperation() has been called and the snapshot has not yet // SetSnapshotOnNextOperation() has been called and the snapshot has not yet
// been reset. // been reset.

@ -312,7 +312,7 @@ void TransactionDBImpl::ReinitializeTransaction(
assert(dynamic_cast<TransactionImpl*>(txn) != nullptr); assert(dynamic_cast<TransactionImpl*>(txn) != nullptr);
auto txn_impl = reinterpret_cast<TransactionImpl*>(txn); auto txn_impl = reinterpret_cast<TransactionImpl*>(txn);
txn_impl->Reinitialize(write_options, txn_options); txn_impl->Reinitialize(this, write_options, txn_options);
} }
} // namespace rocksdb } // namespace rocksdb

@ -70,6 +70,7 @@ void TransactionImpl::Initialize(const TransactionOptions& txn_options) {
if (txn_options.set_snapshot) { if (txn_options.set_snapshot) {
SetSnapshot(); SetSnapshot();
} }
if (expiration_time_ > 0) { if (expiration_time_ > 0) {
txn_db_impl_->InsertExpirableTransaction(txn_id_, this); txn_db_impl_->InsertExpirableTransaction(txn_id_, this);
} }
@ -87,9 +88,10 @@ void TransactionImpl::Clear() {
TransactionBaseImpl::Clear(); TransactionBaseImpl::Clear();
} }
void TransactionImpl::Reinitialize(const WriteOptions& write_options, void TransactionImpl::Reinitialize(TransactionDB* txn_db,
const WriteOptions& write_options,
const TransactionOptions& txn_options) { const TransactionOptions& txn_options) {
TransactionBaseImpl::Reinitialize(write_options); TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options);
Initialize(txn_options); Initialize(txn_options);
} }

@ -38,7 +38,7 @@ class TransactionImpl : public TransactionBaseImpl {
virtual ~TransactionImpl(); virtual ~TransactionImpl();
void Reinitialize(const WriteOptions& write_options, void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options); const TransactionOptions& txn_options);
Status Commit() override; Status Commit() override;

@ -1222,7 +1222,7 @@ TEST_F(TransactionTest, ReinitializeTest) {
// Reinitialize transaction to no long expire // Reinitialize transaction to no long expire
txn_options.expiration = -1; txn_options.expiration = -1;
db->BeginTransaction(write_options, txn_options, txn1); txn1 = db->BeginTransaction(write_options, txn_options, txn1);
s = txn1->Put("Z", "z"); s = txn1->Put("Z", "z");
ASSERT_OK(s); ASSERT_OK(s);
@ -1231,13 +1231,13 @@ TEST_F(TransactionTest, ReinitializeTest) {
s = txn1->Commit(); s = txn1->Commit();
ASSERT_OK(s); ASSERT_OK(s);
db->BeginTransaction(write_options, txn_options, txn1); txn1 = db->BeginTransaction(write_options, txn_options, txn1);
s = txn1->Put("Z", "zz"); s = txn1->Put("Z", "zz");
ASSERT_OK(s); ASSERT_OK(s);
// Reinitilize txn1 and verify that Z gets unlocked // Reinitilize txn1 and verify that Z gets unlocked
db->BeginTransaction(write_options, txn_options, txn1); txn1 = db->BeginTransaction(write_options, txn_options, txn1);
Transaction* txn2 = db->BeginTransaction(write_options, txn_options, nullptr); Transaction* txn2 = db->BeginTransaction(write_options, txn_options, nullptr);
s = txn2->Put("Z", "zzz"); s = txn2->Put("Z", "zzz");
@ -1262,12 +1262,12 @@ TEST_F(TransactionTest, ReinitializeTest) {
ASSERT_OK(s); ASSERT_OK(s);
ASSERT_EQ(value, "zzzz"); ASSERT_EQ(value, "zzzz");
db->BeginTransaction(write_options, txn_options, txn1); txn1 = db->BeginTransaction(write_options, txn_options, txn1);
const Snapshot* snapshot = txn1->GetSnapshot(); const Snapshot* snapshot = txn1->GetSnapshot();
ASSERT_TRUE(snapshot); ASSERT_FALSE(snapshot);
txn_options.set_snapshot = true; txn_options.set_snapshot = true;
db->BeginTransaction(write_options, txn_options, txn1); txn1 = db->BeginTransaction(write_options, txn_options, txn1);
snapshot = txn1->GetSnapshot(); snapshot = txn1->GetSnapshot();
ASSERT_TRUE(snapshot); ASSERT_TRUE(snapshot);
@ -1280,8 +1280,9 @@ TEST_F(TransactionTest, ReinitializeTest) {
ASSERT_OK(s); ASSERT_OK(s);
txn_options.set_snapshot = false; txn_options.set_snapshot = false;
db->BeginTransaction(write_options, txn_options, txn1); txn1 = db->BeginTransaction(write_options, txn_options, txn1);
snapshot = txn1->GetSnapshot(); snapshot = txn1->GetSnapshot();
ASSERT_FALSE(snapshot);
s = txn1->Put("X", "x"); s = txn1->Put("X", "x");
ASSERT_OK(s); ASSERT_OK(s);

Loading…
Cancel
Save