optimistic transactions support for reinitialization

Summary: Extend optimization in D53835 to optimistic transactions for completeness.

Test Plan: added test

Reviewers: sdong, IslamAbdelRahman, horuff, jkedgar

Reviewed By: horuff

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D55059
main
agiardullo 9 years ago
parent badd6b7846
commit 2200295ee1
  1. 16
      include/rocksdb/utilities/optimistic_transaction_db.h
  2. 25
      utilities/transactions/optimistic_transaction_db_impl.cc
  3. 11
      utilities/transactions/optimistic_transaction_db_impl.h
  4. 12
      utilities/transactions/optimistic_transaction_impl.cc
  5. 6
      utilities/transactions/optimistic_transaction_impl.h
  6. 84
      utilities/transactions/optimistic_transaction_test.cc

@ -43,15 +43,19 @@ class OptimisticTransactionDB {
virtual ~OptimisticTransactionDB() {}
// Starts a new Transaction. Passing set_snapshot=true has the same effect
// as calling SetSnapshot().
// Starts a new Transaction.
//
// Caller should delete the returned transaction after calling
// Commit() or Rollback().
// Caller is responsible for deleting the returned transaction when no
// longer needed.
//
// If old_txn is not null, BeginTransaction will reuse this Transaction
// handle instead of allocating a new one. This is an optimization to avoid
// extra allocations when repeatedly creating transactions.
virtual Transaction* BeginTransaction(
const WriteOptions& write_options,
const OptimisticTransactionOptions&
txn_options = OptimisticTransactionOptions()) = 0;
const OptimisticTransactionOptions& txn_options =
OptimisticTransactionOptions(),
Transaction* old_txn = nullptr) = 0;
// Return the underlying Database that was opened
virtual DB* GetBaseDB() = 0;

@ -5,11 +5,11 @@
#ifndef ROCKSDB_LITE
#include "utilities/transactions/optimistic_transaction_db_impl.h"
#include <string>
#include <vector>
#include "utilities/transactions/optimistic_transaction_db_impl.h"
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
@ -20,11 +20,13 @@ namespace rocksdb {
Transaction* OptimisticTransactionDBImpl::BeginTransaction(
const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options) {
Transaction* txn =
new OptimisticTransactionImpl(this, write_options, txn_options);
return txn;
const OptimisticTransactionOptions& txn_options, Transaction* old_txn) {
if (old_txn != nullptr) {
ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn;
} else {
return new OptimisticTransactionImpl(this, write_options, txn_options);
}
}
Status OptimisticTransactionDB::Open(const Options& options,
@ -76,5 +78,14 @@ Status OptimisticTransactionDB::Open(
return s;
}
void OptimisticTransactionDBImpl::ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options) {
assert(dynamic_cast<OptimisticTransactionImpl*>(txn) != nullptr);
auto txn_impl = reinterpret_cast<OptimisticTransactionImpl*>(txn);
txn_impl->Reinitialize(this, write_options, txn_options);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -19,14 +19,19 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
~OptimisticTransactionDBImpl() {}
Transaction* BeginTransaction(
const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options) override;
Transaction* BeginTransaction(const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options,
Transaction* old_txn) override;
DB* GetBaseDB() override { return db_.get(); }
private:
std::unique_ptr<DB> db_;
void ReinitializeTransaction(Transaction* txn,
const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options =
OptimisticTransactionOptions());
};
} // namespace rocksdb

@ -28,11 +28,23 @@ OptimisticTransactionImpl::OptimisticTransactionImpl(
OptimisticTransactionDB* txn_db, const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options)
: TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_(txn_db) {
Initialize(txn_options);
}
void OptimisticTransactionImpl::Initialize(
const OptimisticTransactionOptions& txn_options) {
if (txn_options.set_snapshot) {
SetSnapshot();
}
}
void OptimisticTransactionImpl::Reinitialize(
OptimisticTransactionDB* txn_db, const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options) {
TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options);
Initialize(txn_options);
}
OptimisticTransactionImpl::~OptimisticTransactionImpl() {
}

@ -34,6 +34,10 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
virtual ~OptimisticTransactionImpl();
void Reinitialize(OptimisticTransactionDB* txn_db,
const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options);
Status Commit() override;
void Rollback() override;
@ -47,6 +51,8 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
friend class OptimisticTransactionCallback;
void Initialize(const OptimisticTransactionOptions& txn_options);
// Returns OK if it is safe to commit this transaction. Returns Status::Busy
// if there are read or write conflicts that would prevent us from committing
// OR if we can not determine whether there would be any such conflicts.

@ -1267,6 +1267,90 @@ TEST_F(OptimisticTransactionTest, UndoGetForUpdateTest) {
delete txn1;
}
TEST_F(OptimisticTransactionTest, ReinitializeTest) {
WriteOptions write_options;
ReadOptions read_options;
OptimisticTransactionOptions txn_options;
string value;
Status s;
Transaction* txn1 = txn_db->BeginTransaction(write_options, txn_options);
txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1);
s = txn1->Put("Z", "z");
ASSERT_OK(s);
s = txn1->Commit();
ASSERT_OK(s);
txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1);
s = txn1->Put("Z", "zz");
ASSERT_OK(s);
// Reinitilize txn1 and verify that zz is not written
txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1);
s = txn1->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "Z", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "z");
// Verify snapshots get reinitialized correctly
txn1->SetSnapshot();
s = txn1->Put("Z", "zzzz");
ASSERT_OK(s);
s = txn1->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "Z", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "zzzz");
const Snapshot* snapshot = txn1->GetSnapshot();
ASSERT_TRUE(snapshot);
txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1);
snapshot = txn1->GetSnapshot();
ASSERT_FALSE(snapshot);
txn_options.set_snapshot = true;
txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1);
snapshot = txn1->GetSnapshot();
ASSERT_TRUE(snapshot);
s = txn1->Put("Z", "a");
ASSERT_OK(s);
txn1->Rollback();
s = txn1->Put("Y", "y");
ASSERT_OK(s);
txn_options.set_snapshot = false;
txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1);
snapshot = txn1->GetSnapshot();
ASSERT_FALSE(snapshot);
s = txn1->Put("X", "x");
ASSERT_OK(s);
s = txn1->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "Z", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "zzzz");
s = db->Get(read_options, "Y", &value);
ASSERT_TRUE(s.IsNotFound());
delete txn1;
}
} // namespace rocksdb
int main(int argc, char** argv) {

Loading…
Cancel
Save