Make Optimistic Tx database stackable

Summary:
This change models Optimistic Tx db after Pessimistic TX db. The motivation for this change is to make the ptr polymorphic so it can be held by the same raw or smart ptr.

Currently, due to the inheritance of the Opt Tx db not being rooted in the manner of Pess Tx from a single DB root it is more difficult to write clean code and have clear ownership of the database in cases when options dictate instantiate of plan DB, Pess Tx DB or Opt tx db.
Closes https://github.com/facebook/rocksdb/pull/3566

Differential Revision: D7184502

Pulled By: yiwu-arbug

fbshipit-source-id: 31d06efafd79497bb0c230e971857dba3bd962c3
main
Dmitri Smirnov 6 years ago committed by Facebook Github Bot
parent b058a33705
commit 2a62ca1750
  1. 15
      include/rocksdb/utilities/optimistic_transaction_db.h
  2. 12
      utilities/transactions/optimistic_transaction_db_impl.h
  3. 173
      utilities/transactions/optimistic_transaction_test.cc

@ -11,6 +11,7 @@
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/stackable_db.h"
namespace rocksdb {
@ -30,7 +31,7 @@ struct OptimisticTransactionOptions {
const Comparator* cmp = BytewiseComparator();
};
class OptimisticTransactionDB {
class OptimisticTransactionDB : public StackableDB {
public:
// Open an OptimisticTransactionDB similar to DB::Open().
static Status Open(const Options& options, const std::string& dbname,
@ -57,18 +58,12 @@ class OptimisticTransactionDB {
OptimisticTransactionOptions(),
Transaction* old_txn = nullptr) = 0;
// Return the underlying Database that was opened
virtual DB* GetBaseDB() = 0;
OptimisticTransactionDB(const OptimisticTransactionDB&) = delete;
void operator=(const OptimisticTransactionDB&) = delete;
protected:
// To Create an OptimisticTransactionDB, call Open()
explicit OptimisticTransactionDB(DB* /*db*/) {}
OptimisticTransactionDB() {}
private:
// No copying allowed
OptimisticTransactionDB(const OptimisticTransactionDB&);
void operator=(const OptimisticTransactionDB&);
explicit OptimisticTransactionDB(DB* db) : StackableDB(db) {}
};
} // namespace rocksdb

@ -15,11 +15,13 @@ namespace rocksdb {
class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
public:
explicit OptimisticTransactionDBImpl(DB* db, bool take_ownership = true)
: OptimisticTransactionDB(db), db_(db), db_owner_(take_ownership) {}
: OptimisticTransactionDB(db), db_owner_(take_ownership) {}
~OptimisticTransactionDBImpl() {
// Prevent this stackable from destroying
// base db
if (!db_owner_) {
db_.release();
db_ = nullptr;
}
}
@ -27,11 +29,9 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
const OptimisticTransactionOptions& txn_options,
Transaction* old_txn) override;
DB* GetBaseDB() override { return db_.get(); }
private:
std::unique_ptr<DB> db_;
bool db_owner_;
bool db_owner_;
void ReinitializeTransaction(Transaction* txn,
const WriteOptions& write_options,

@ -26,7 +26,6 @@ namespace rocksdb {
class OptimisticTransactionTest : public testing::Test {
public:
OptimisticTransactionDB* txn_db;
DB* db;
string dbname;
Options options;
@ -54,7 +53,6 @@ private:
Status s = OptimisticTransactionDB::Open(options, dbname, &txn_db);
assert(s.ok());
assert(txn_db != nullptr);
db = txn_db->GetBaseDB();
}
};
@ -64,8 +62,8 @@ TEST_F(OptimisticTransactionTest, SuccessTest) {
string value;
Status s;
db->Put(write_options, Slice("foo"), Slice("bar"));
db->Put(write_options, Slice("foo2"), Slice("bar"));
txn_db->Put(write_options, Slice("foo"), Slice("bar"));
txn_db->Put(write_options, Slice("foo2"), Slice("bar"));
Transaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
@ -81,7 +79,7 @@ TEST_F(OptimisticTransactionTest, SuccessTest) {
s = txn->Commit();
ASSERT_OK(s);
db->Get(read_options, "foo", &value);
txn_db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "bar2");
delete txn;
@ -93,8 +91,8 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest) {
string value;
Status s;
db->Put(write_options, "foo", "bar");
db->Put(write_options, "foo2", "bar");
txn_db->Put(write_options, "foo", "bar");
txn_db->Put(write_options, "foo2", "bar");
Transaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
@ -102,10 +100,10 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest) {
txn->Put("foo", "bar2");
// This Put outside of a transaction will conflict with the previous write
s = db->Put(write_options, "foo", "barz");
s = txn_db->Put(write_options, "foo", "barz");
ASSERT_OK(s);
s = db->Get(read_options, "foo", &value);
s = txn_db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
ASSERT_EQ(1, txn->GetNumKeys());
@ -113,9 +111,9 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest) {
ASSERT_TRUE(s.IsBusy()); // Txn should not commit
// Verify that transaction did not write anything
db->Get(read_options, "foo", &value);
txn_db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
db->Get(read_options, "foo2", &value);
txn_db->Get(read_options, "foo2", &value);
ASSERT_EQ(value, "bar");
delete txn;
@ -128,29 +126,29 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest2) {
string value;
Status s;
db->Put(write_options, "foo", "bar");
db->Put(write_options, "foo2", "bar");
txn_db->Put(write_options, "foo", "bar");
txn_db->Put(write_options, "foo2", "bar");
txn_options.set_snapshot = true;
Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn);
// This Put outside of a transaction will conflict with a later write
s = db->Put(write_options, "foo", "barz");
s = txn_db->Put(write_options, "foo", "barz");
ASSERT_OK(s);
txn->Put("foo", "bar2"); // Conflicts with write done after snapshot taken
s = db->Get(read_options, "foo", &value);
s = txn_db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
s = txn->Commit();
ASSERT_TRUE(s.IsBusy()); // Txn should not commit
// Verify that transaction did not write anything
db->Get(read_options, "foo", &value);
txn_db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
db->Get(read_options, "foo2", &value);
txn_db->Get(read_options, "foo2", &value);
ASSERT_EQ(value, "bar");
delete txn;
@ -163,8 +161,8 @@ TEST_F(OptimisticTransactionTest, ReadConflictTest) {
string value;
Status s;
db->Put(write_options, "foo", "bar");
db->Put(write_options, "foo2", "bar");
txn_db->Put(write_options, "foo", "bar");
txn_db->Put(write_options, "foo2", "bar");
txn_options.set_snapshot = true;
Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
@ -177,10 +175,10 @@ TEST_F(OptimisticTransactionTest, ReadConflictTest) {
ASSERT_EQ(value, "bar");
// This Put outside of a transaction will conflict with the previous read
s = db->Put(write_options, "foo", "barz");
s = txn_db->Put(write_options, "foo", "barz");
ASSERT_OK(s);
s = db->Get(read_options, "foo", &value);
s = txn_db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
s = txn->Commit();
@ -221,8 +219,8 @@ TEST_F(OptimisticTransactionTest, FlushTest) {
string value;
Status s;
db->Put(write_options, Slice("foo"), Slice("bar"));
db->Put(write_options, Slice("foo2"), Slice("bar"));
txn_db->Put(write_options, Slice("foo"), Slice("bar"));
txn_db->Put(write_options, Slice("foo2"), Slice("bar"));
Transaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
@ -238,18 +236,18 @@ TEST_F(OptimisticTransactionTest, FlushTest) {
ASSERT_EQ(value, "bar2");
// Put a random key so we have a memtable to flush
s = db->Put(write_options, "dummy", "dummy");
s = txn_db->Put(write_options, "dummy", "dummy");
ASSERT_OK(s);
// force a memtable flush
FlushOptions flush_ops;
db->Flush(flush_ops);
txn_db->Flush(flush_ops);
s = txn->Commit();
// txn should commit since the flushed table is still in MemtableList History
ASSERT_OK(s);
db->Get(read_options, "foo", &value);
txn_db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "bar2");
delete txn;
@ -261,8 +259,8 @@ TEST_F(OptimisticTransactionTest, FlushTest2) {
string value;
Status s;
db->Put(write_options, Slice("foo"), Slice("bar"));
db->Put(write_options, Slice("foo2"), Slice("bar"));
txn_db->Put(write_options, Slice("foo"), Slice("bar"));
txn_db->Put(write_options, Slice("foo2"), Slice("bar"));
Transaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
@ -278,33 +276,33 @@ TEST_F(OptimisticTransactionTest, FlushTest2) {
ASSERT_EQ(value, "bar2");
// Put a random key so we have a MemTable to flush
s = db->Put(write_options, "dummy", "dummy");
s = txn_db->Put(write_options, "dummy", "dummy");
ASSERT_OK(s);
// force a memtable flush
FlushOptions flush_ops;
db->Flush(flush_ops);
txn_db->Flush(flush_ops);
// Put a random key so we have a MemTable to flush
s = db->Put(write_options, "dummy", "dummy2");
s = txn_db->Put(write_options, "dummy", "dummy2");
ASSERT_OK(s);
// force a memtable flush
db->Flush(flush_ops);
txn_db->Flush(flush_ops);
s = db->Put(write_options, "dummy", "dummy3");
s = txn_db->Put(write_options, "dummy", "dummy3");
ASSERT_OK(s);
// force a memtable flush
// Since our test db has max_write_buffer_number=2, this flush will cause
// the first memtable to get purged from the MemtableList history.
db->Flush(flush_ops);
txn_db->Flush(flush_ops);
s = txn->Commit();
// txn should not commit since MemTableList History is not large enough
ASSERT_TRUE(s.IsTryAgain());
db->Get(read_options, "foo", &value);
txn_db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "bar");
delete txn;
@ -316,13 +314,13 @@ TEST_F(OptimisticTransactionTest, NoSnapshotTest) {
string value;
Status s;
db->Put(write_options, "AAA", "bar");
txn_db->Put(write_options, "AAA", "bar");
Transaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
// Modify key after transaction start
db->Put(write_options, "AAA", "bar1");
txn_db->Put(write_options, "AAA", "bar1");
// Read and write without a snapshot
txn->GetForUpdate(read_options, "AAA", &value);
@ -345,14 +343,14 @@ TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) {
string value;
Status s;
db->Put(write_options, "AAA", "bar");
db->Put(write_options, "BBB", "bar");
db->Put(write_options, "CCC", "bar");
txn_db->Put(write_options, "AAA", "bar");
txn_db->Put(write_options, "BBB", "bar");
txn_db->Put(write_options, "CCC", "bar");
Transaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
db->Put(write_options, "AAA", "bar1");
txn_db->Put(write_options, "AAA", "bar1");
// Read and write without a snapshot
txn->GetForUpdate(read_options, "AAA", &value);
@ -360,7 +358,7 @@ TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) {
txn->Put("AAA", "bar2");
// Modify BBB before snapshot is taken
db->Put(write_options, "BBB", "bar1");
txn_db->Put(write_options, "BBB", "bar1");
txn->SetSnapshot();
snapshot_read_options.snapshot = txn->GetSnapshot();
@ -370,7 +368,7 @@ TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) {
ASSERT_EQ(value, "bar1");
txn->Put("BBB", "bar2");
db->Put(write_options, "CCC", "bar1");
txn_db->Put(write_options, "CCC", "bar1");
// Set a new snapshot
txn->SetSnapshot();
@ -391,26 +389,26 @@ TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) {
ASSERT_OK(s);
ASSERT_EQ(value, "bar2");
s = db->Get(read_options, "AAA", &value);
s = txn_db->Get(read_options, "AAA", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar1");
s = db->Get(read_options, "BBB", &value);
s = txn_db->Get(read_options, "BBB", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar1");
s = db->Get(read_options, "CCC", &value);
s = txn_db->Get(read_options, "CCC", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar1");
s = txn->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "AAA", &value);
s = txn_db->Get(read_options, "AAA", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar2");
s = db->Get(read_options, "BBB", &value);
s = txn_db->Get(read_options, "BBB", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar2");
s = db->Get(read_options, "CCC", &value);
s = txn_db->Get(read_options, "CCC", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar2");
@ -419,8 +417,8 @@ TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) {
txn = txn_db->BeginTransaction(write_options);
// Potentially conflicting writes
db->Put(write_options, "ZZZ", "zzz");
db->Put(write_options, "XXX", "xxx");
txn_db->Put(write_options, "ZZZ", "zzz");
txn_db->Put(write_options, "XXX", "xxx");
txn->SetSnapshot();
@ -457,9 +455,9 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
ColumnFamilyOptions cf_options;
// Create 2 new column families
s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
s = txn_db->CreateColumnFamily(cf_options, "CFA", &cfa);
ASSERT_OK(s);
s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
s = txn_db->CreateColumnFamily(cf_options, "CFB", &cfb);
ASSERT_OK(s);
delete cfa;
@ -482,7 +480,6 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
&txn_db);
ASSERT_OK(s);
assert(txn_db != nullptr);
db = txn_db->GetBaseDB();
Transaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
@ -499,9 +496,9 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
batch.Put("foo", "foo");
batch.Put(handles[1], "AAA", "bar");
batch.Put(handles[1], "AAAZZZ", "bar");
s = db->Write(write_options, &batch);
s = txn_db->Write(write_options, &batch);
ASSERT_OK(s);
db->Delete(write_options, handles[1], "AAAZZZ");
txn_db->Delete(write_options, handles[1], "AAAZZZ");
// These keys do no conflict with existing writes since they're in
// different column families
@ -516,9 +513,9 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
// Txn should commit
s = txn->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "AAA", &value);
s = txn_db->Get(read_options, "AAA", &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(read_options, handles[2], "AAAZZZ", &value);
s = txn_db->Get(read_options, handles[2], "AAAZZZ", &value);
ASSERT_EQ(value, "barbar");
Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
@ -534,7 +531,7 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
// Verify txn did not commit
s = txn2->Commit();
ASSERT_TRUE(s.IsBusy());
s = db->Get(read_options, handles[1], "AAAZZZ", &value);
s = txn_db->Get(read_options, handles[1], "AAAZZZ", &value);
ASSERT_EQ(value, "barbar");
delete txn;
@ -572,11 +569,11 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
// Txn should commit
s = txn->Commit();
ASSERT_OK(s);
s = db->Get(read_options, handles[2], "ZZZ", &value);
s = txn_db->Get(read_options, handles[2], "ZZZ", &value);
ASSERT_TRUE(s.IsNotFound());
// Put a key which will conflict with the next txn using the previous snapshot
db->Put(write_options, handles[2], "foo", "000");
txn_db->Put(write_options, handles[2], "foo", "000");
results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
multiget_keys, &values);
@ -592,9 +589,9 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
s = txn2->Commit();
ASSERT_TRUE(s.IsBusy());
s = db->DropColumnFamily(handles[1]);
s = txn_db->DropColumnFamily(handles[1]);
ASSERT_OK(s);
s = db->DropColumnFamily(handles[2]);
s = txn_db->DropColumnFamily(handles[2]);
ASSERT_OK(s);
delete txn;
@ -611,7 +608,7 @@ TEST_F(OptimisticTransactionTest, EmptyTest) {
string value;
Status s;
s = db->Put(write_options, "aaa", "aaa");
s = txn_db->Put(write_options, "aaa", "aaa");
ASSERT_OK(s);
Transaction* txn = txn_db->BeginTransaction(write_options);
@ -636,7 +633,7 @@ TEST_F(OptimisticTransactionTest, EmptyTest) {
s = txn->GetForUpdate(read_options, "aaa", &value);
ASSERT_EQ(value, "aaa");
s = db->Put(write_options, "aaa", "xxx");
s = txn_db->Put(write_options, "aaa", "xxx");
s = txn->Commit();
ASSERT_TRUE(s.IsBusy());
delete txn;
@ -799,7 +796,7 @@ TEST_F(OptimisticTransactionTest, LostUpdate) {
delete txn1;
delete txn2;
s = db->Get(read_options, "1", &value);
s = txn_db->Get(read_options, "1", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "8");
}
@ -814,7 +811,7 @@ TEST_F(OptimisticTransactionTest, UntrackedWrites) {
Transaction* txn = txn_db->BeginTransaction(write_options);
txn->PutUntracked("untracked", "0");
txn->Rollback();
s = db->Get(read_options, "untracked", &value);
s = txn_db->Get(read_options, "untracked", &value);
ASSERT_TRUE(s.IsNotFound());
delete txn;
@ -827,13 +824,13 @@ TEST_F(OptimisticTransactionTest, UntrackedWrites) {
// Write to the untracked key outside of the transaction and verify
// it doesn't prevent the transaction from committing.
s = db->Put(write_options, "untracked", "x");
s = txn_db->Put(write_options, "untracked", "x");
ASSERT_OK(s);
s = txn->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "untracked", &value);
s = txn_db->Get(read_options, "untracked", &value);
ASSERT_TRUE(s.IsNotFound());
delete txn;
@ -844,12 +841,12 @@ TEST_F(OptimisticTransactionTest, UntrackedWrites) {
// Write to tracked key outside of the transaction and verify that the
// untracked keys are not written when the commit fails.
s = db->Delete(write_options, "tracked");
s = txn_db->Delete(write_options, "tracked");
s = txn->Commit();
ASSERT_TRUE(s.IsBusy());
s = db->Get(read_options, "untracked", &value);
s = txn_db->Get(read_options, "untracked", &value);
ASSERT_TRUE(s.IsNotFound());
delete txn;
@ -863,19 +860,19 @@ TEST_F(OptimisticTransactionTest, IteratorTest) {
Status s;
// Write some keys to the db
s = db->Put(write_options, "A", "a");
s = txn_db->Put(write_options, "A", "a");
ASSERT_OK(s);
s = db->Put(write_options, "G", "g");
s = txn_db->Put(write_options, "G", "g");
ASSERT_OK(s);
s = db->Put(write_options, "F", "f");
s = txn_db->Put(write_options, "F", "f");
ASSERT_OK(s);
s = db->Put(write_options, "C", "c");
s = txn_db->Put(write_options, "C", "c");
ASSERT_OK(s);
s = db->Put(write_options, "D", "d");
s = txn_db->Put(write_options, "D", "d");
ASSERT_OK(s);
Transaction* txn = txn_db->BeginTransaction(write_options);
@ -898,10 +895,10 @@ TEST_F(OptimisticTransactionTest, IteratorTest) {
const Snapshot* snapshot = txn->GetSnapshot();
// Write some keys to the db after the snapshot
s = db->Put(write_options, "BB", "xx");
s = txn_db->Put(write_options, "BB", "xx");
ASSERT_OK(s);
s = db->Put(write_options, "C", "xx");
s = txn_db->Put(write_options, "C", "xx");
ASSERT_OK(s);
read_options.snapshot = snapshot;
@ -995,7 +992,7 @@ TEST_F(OptimisticTransactionTest, SavepointTest) {
s = txn->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "B", &value);
s = txn_db->Get(read_options, "B", &value);
ASSERT_OK(s);
ASSERT_EQ("b", value);
@ -1107,28 +1104,28 @@ TEST_F(OptimisticTransactionTest, SavepointTest) {
s = txn->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "F", &value);
s = txn_db->Get(read_options, "F", &value);
ASSERT_OK(s);
ASSERT_EQ("f", value);
s = db->Get(read_options, "G", &value);
s = txn_db->Get(read_options, "G", &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(read_options, "A", &value);
s = txn_db->Get(read_options, "A", &value);
ASSERT_OK(s);
ASSERT_EQ("aa", value);
s = db->Get(read_options, "B", &value);
s = txn_db->Get(read_options, "B", &value);
ASSERT_OK(s);
ASSERT_EQ("b", value);
s = db->Get(read_options, "C", &value);
s = txn_db->Get(read_options, "C", &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(read_options, "D", &value);
s = txn_db->Get(read_options, "D", &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(read_options, "E", &value);
s = txn_db->Get(read_options, "E", &value);
ASSERT_TRUE(s.IsNotFound());
delete txn;
@ -1141,7 +1138,7 @@ TEST_F(OptimisticTransactionTest, UndoGetForUpdateTest) {
string value;
Status s;
db->Put(write_options, "A", "");
txn_db->Put(write_options, "A", "");
Transaction* txn1 = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn1);
@ -1350,7 +1347,7 @@ TEST_F(OptimisticTransactionTest, OptimisticTransactionStressTest) {
}
// Verify that data is consistent
Status s = RandomTransactionInserter::Verify(db, num_sets);
Status s = RandomTransactionInserter::Verify(txn_db, num_sets);
ASSERT_OK(s);
}

Loading…
Cancel
Save