Clarify the ownership of root db after TransactionDB::Open

Summary:
The patch clarifies the ownership of the root db after TransactionDB::Open. If it is a success the ownership if with the TransactionDB, and the root db will be deleted when the destructor of the base class, StackableDB, is called. If it is failure, the temporarily created root db will also be deleted properly.
The patch also includes lots of useful formatting changes.

Closes https://github.com/facebook/rocksdb/pull/3714 upon which this patch is built.
Closes https://github.com/facebook/rocksdb/pull/3806

Differential Revision: D7878010

Pulled By: maysamyabandeh

fbshipit-source-id: f54f3942e29434143ae5a2423ceec9c7072cd4c2
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 3272bc07c6
commit 66c7aa32fb
  1. 20
      include/rocksdb/utilities/transaction_db.h
  2. 48
      utilities/transactions/pessimistic_transaction_db.cc
  3. 186
      utilities/transactions/transaction_test.cc
  4. 122
      utilities/transactions/transaction_test.h

@ -193,6 +193,7 @@ class TransactionDB : public StackableDB {
}
// Open a TransactionDB similar to DB::Open().
// Internally call PrepareWrap() and WrapDB()
// If the return status is not ok, then dbptr is set to nullptr.
static Status Open(const Options& options,
const TransactionDBOptions& txn_db_options,
const std::string& dbname, TransactionDB** dbptr);
@ -203,27 +204,29 @@ class TransactionDB : public StackableDB {
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
TransactionDB** dbptr);
// The following functions are used to open a TransactionDB internally using
// an opened DB or StackableDB.
// 1. Call prepareWrap(), passing an empty std::vector<size_t> to
// compaction_enabled_cf_indices.
// 2. Open DB or Stackable DB with db_options and column_families passed to
// prepareWrap()
// Note: PrepareWrap() may change parameters, make copies before the
// invocation if needed.
// 3. Call Wrap*DB() with compaction_enabled_cf_indices in step 1 and handles
// of the opened DB/StackableDB in step 2
static void PrepareWrap(DBOptions* db_options,
std::vector<ColumnFamilyDescriptor>* column_families,
std::vector<size_t>* compaction_enabled_cf_indices);
// If the return status is not ok, then dbptr will bet set to nullptr. The
// input db parameter might or might not be deleted as a result of the
// failure. If it is properly deleted it will be set to nullptr. If the return
// status is ok, the ownership of db is transferred to dbptr.
static Status WrapDB(DB* db, const TransactionDBOptions& txn_db_options,
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles,
TransactionDB** dbptr);
// If the return status is not ok, then dbptr will bet set to nullptr. The
// input db parameter might or might not be deleted as a result of the
// failure. If it is properly deleted it will be set to nullptr. If the return
// status is ok, the ownership of db is transferred to dbptr.
static Status WrapStackableDB(
StackableDB* db, const TransactionDBOptions& txn_db_options,
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr);
// Since the destructor in StackableDB is virtual, this destructor is virtual
// too. The root db will be deleted by the base's destructor.
~TransactionDB() override {}
// Starts a new Transaction.
@ -252,6 +255,7 @@ class TransactionDB : public StackableDB {
protected:
// To Create an TransactionDB, call Open()
// The ownership of db is transferred to the base StackableDB
explicit TransactionDB(DB* db) : StackableDB(db) {}
private:

@ -207,7 +207,7 @@ Status TransactionDB::Open(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
Status s;
DB* db;
DB* db = nullptr;
ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is %" PRId32,
static_cast<int>(txn_db_options.write_policy));
@ -223,6 +223,10 @@ Status TransactionDB::Open(
s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
dbptr);
}
if (!s.ok()) {
// just in case it was not deleted (and not set to nullptr).
delete db;
}
return s;
}
@ -254,48 +258,62 @@ Status TransactionDB::WrapDB(
DB* db, const TransactionDBOptions& txn_db_options,
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
PessimisticTransactionDB* txn_db;
assert(db != nullptr);
assert(dbptr != nullptr);
*dbptr = nullptr;
std::unique_ptr<PessimisticTransactionDB> txn_db;
switch (txn_db_options.write_policy) {
case WRITE_UNPREPARED:
return Status::NotSupported("WRITE_UNPREPARED is not implemented yet");
case WRITE_PREPARED:
txn_db = new WritePreparedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
txn_db.reset(new WritePreparedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
break;
case WRITE_COMMITTED:
default:
txn_db = new WriteCommittedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
txn_db.reset(new WriteCommittedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
}
txn_db->UpdateCFComparatorMap(handles);
*dbptr = txn_db;
Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
// In case of a failure at this point, db is deleted via the txn_db destructor
// and set to nullptr.
if (s.ok()) {
*dbptr = txn_db.release();
}
return s;
}
Status TransactionDB::WrapStackableDB(
// make sure this stackable_db is already opened with memtable history
// enabled,
// auto compaction distabled and 2 phase commit enabled
// enabled, auto compaction distabled and 2 phase commit enabled
StackableDB* db, const TransactionDBOptions& txn_db_options,
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
PessimisticTransactionDB* txn_db;
assert(db != nullptr);
assert(dbptr != nullptr);
*dbptr = nullptr;
std::unique_ptr<PessimisticTransactionDB> txn_db;
switch (txn_db_options.write_policy) {
case WRITE_UNPREPARED:
return Status::NotSupported("WRITE_UNPREPARED is not implemented yet");
case WRITE_PREPARED:
txn_db = new WritePreparedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
txn_db.reset(new WritePreparedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
break;
case WRITE_COMMITTED:
default:
txn_db = new WriteCommittedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
txn_db.reset(new WriteCommittedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
}
txn_db->UpdateCFComparatorMap(handles);
*dbptr = txn_db;
Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
// In case of a failure at this point, db is deleted via the txn_db destructor
// and set to nullptr.
if (s.ok()) {
*dbptr = txn_db.release();
}
return s;
}

@ -97,11 +97,10 @@ TEST_P(TransactionTest, SuccessTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
Status s;
std::string value;
db->Put(write_options, Slice("foo"), Slice("bar"));
db->Put(write_options, Slice("foo2"), Slice("bar"));
ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
ASSERT_TRUE(txn);
@ -109,24 +108,19 @@ TEST_P(TransactionTest, SuccessTest) {
ASSERT_EQ(0, txn->GetNumPuts());
ASSERT_LE(0, txn->GetID());
s = txn->GetForUpdate(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value));
ASSERT_EQ(value, "bar");
s = txn->Put(Slice("foo"), Slice("bar2"));
ASSERT_OK(s);
ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
ASSERT_EQ(1, txn->GetNumPuts());
s = txn->GetForUpdate(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value));
ASSERT_EQ(value, "bar2");
s = txn->Commit();
ASSERT_OK(s);
ASSERT_OK(txn->Commit());
s = db->Get(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_OK(db->Get(read_options, "foo", &value));
ASSERT_EQ(value, "bar2");
delete txn;
@ -135,22 +129,18 @@ TEST_P(TransactionTest, SuccessTest) {
// This test clarifies the contract of ValidateSnapshot
TEST_P(TransactionTest, ValidateSnapshotTest) {
for (bool with_2pc : {true, false}) {
ReOpen();
ASSERT_OK(ReOpen());
WriteOptions write_options;
ReadOptions read_options;
string value;
Status s;
std::string value;
Transaction* txn1 =
db->BeginTransaction(write_options, TransactionOptions());
ASSERT_TRUE(txn1);
s = txn1->Put(Slice("foo"), Slice("bar1"));
ASSERT_OK(s);
ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1")));
if (with_2pc) {
s = txn1->SetName("xid1");
ASSERT_OK(s);
s = txn1->Prepare();
ASSERT_OK(s);
ASSERT_OK(txn1->SetName("xid1"));
ASSERT_OK(txn1->Prepare());
}
Transaction* txn2 =
@ -158,14 +148,13 @@ TEST_P(TransactionTest, ValidateSnapshotTest) {
ASSERT_TRUE(txn2);
txn2->SetSnapshot();
s = txn1->Commit();
ASSERT_OK(s);
ASSERT_OK(txn1->Commit());
delete txn1;
auto pes_txn2 = dynamic_cast<PessimisticTransaction*>(txn2);
// Test the simple case where the key is not tracked yet
auto trakced_seq = kMaxSequenceNumber;
s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo",
auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo",
&trakced_seq);
ASSERT_TRUE(s.IsBusy());
delete txn2;
@ -351,8 +340,8 @@ TEST_P(TransactionTest, SharedLocks) {
s = txn2->GetForUpdate(read_options, "foo", nullptr);
ASSERT_OK(s);
txn1->Rollback();
txn2->Rollback();
ASSERT_OK(txn1->Rollback());
ASSERT_OK(txn2->Rollback());
// Test txn1 trying to downgrade its lock.
s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */);
@ -752,13 +741,13 @@ TEST_P(TransactionTest, CommitTimeBatchFailTest) {
WriteOptions write_options;
TransactionOptions txn_options;
string value;
std::string value;
Status s;
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn1);
txn1->GetCommitTimeWriteBatch()->Put("cat", "dog");
ASSERT_OK(txn1->GetCommitTimeWriteBatch()->Put("cat", "dog"));
s = txn1->Put("foo", "bar");
ASSERT_OK(s);
@ -774,7 +763,7 @@ TEST_P(TransactionTest, LogMarkLeakTest) {
TransactionOptions txn_options;
WriteOptions write_options;
options.write_buffer_size = 1024;
ReOpenNoDelete();
ASSERT_OK(ReOpenNoDelete());
Random rnd(47);
std::vector<Transaction*> txns;
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
@ -808,7 +797,7 @@ TEST_P(TransactionTest, LogMarkLeakTest) {
TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
for (bool cwb4recovery : {true, false}) {
ReOpen();
ASSERT_OK(ReOpen());
WriteOptions write_options;
ReadOptions read_options;
@ -1011,7 +1000,7 @@ TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
if (!cwb4recovery && test_with_empty_wal) {
continue;
}
ReOpen();
ASSERT_OK(ReOpen());
Status s;
std::string value;
@ -1111,7 +1100,7 @@ TEST_P(TransactionTest, TwoPhaseRollbackTest) {
TransactionOptions txn_options;
string value;
std::string value;
Status s;
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
@ -1182,7 +1171,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
TransactionOptions txn_options;
string value;
std::string value;
Status s;
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
@ -1575,7 +1564,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
Status s;
string v;
std::string v;
ColumnFamilyHandle *cfa, *cfb;
// Create 2 new column families
@ -2017,7 +2006,7 @@ TEST_P(TransactionTest, WriteConflictTest2) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
std::string value;
Status s;
db->Put(write_options, "foo", "bar");
@ -2065,7 +2054,7 @@ TEST_P(TransactionTest, ReadConflictTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
TransactionOptions txn_options;
string value;
std::string value;
Status s;
db->Put(write_options, "foo", "bar");
@ -2103,7 +2092,7 @@ TEST_P(TransactionTest, TxnOnlyTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
std::string value;
Status s;
Transaction* txn = db->BeginTransaction(write_options);
@ -2121,7 +2110,7 @@ TEST_P(TransactionTest, TxnOnlyTest) {
TEST_P(TransactionTest, FlushTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
std::string value;
Status s;
db->Put(write_options, Slice("foo"), Slice("bar"));
@ -2313,7 +2302,7 @@ TEST_P(TransactionTest, FlushTest2) {
TEST_P(TransactionTest, NoSnapshotTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
std::string value;
Status s;
db->Put(write_options, "AAA", "bar");
@ -2343,12 +2332,12 @@ TEST_P(TransactionTest, NoSnapshotTest) {
TEST_P(TransactionTest, MultipleSnapshotTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
std::string value;
Status s;
db->Put(write_options, "AAA", "bar");
db->Put(write_options, "BBB", "bar");
db->Put(write_options, "CCC", "bar");
ASSERT_OK(db->Put(write_options, "AAA", "bar"));
ASSERT_OK(db->Put(write_options, "BBB", "bar"));
ASSERT_OK(db->Put(write_options, "CCC", "bar"));
Transaction* txn = db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
@ -2356,24 +2345,24 @@ TEST_P(TransactionTest, MultipleSnapshotTest) {
db->Put(write_options, "AAA", "bar1");
// Read and write without a snapshot
txn->GetForUpdate(read_options, "AAA", &value);
ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
ASSERT_EQ(value, "bar1");
s = txn->Put("AAA", "bar2");
ASSERT_OK(s);
// Modify BBB before snapshot is taken
db->Put(write_options, "BBB", "bar1");
ASSERT_OK(db->Put(write_options, "BBB", "bar1"));
txn->SetSnapshot();
snapshot_read_options.snapshot = txn->GetSnapshot();
// Read and write with snapshot
txn->GetForUpdate(snapshot_read_options, "BBB", &value);
ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "BBB", &value));
ASSERT_EQ(value, "bar1");
s = txn->Put("BBB", "bar2");
ASSERT_OK(s);
db->Put(write_options, "CCC", "bar1");
ASSERT_OK(db->Put(write_options, "CCC", "bar1"));
// Set a new snapshot
txn->SetSnapshot();
@ -2812,7 +2801,7 @@ TEST_P(TransactionTest, LostUpdate) {
WriteOptions write_options;
ReadOptions read_options, read_options1, read_options2;
TransactionOptions txn_options;
string value;
std::string value;
Status s;
// Test 2 transactions writing to the same key in multiple orders and
@ -2937,7 +2926,7 @@ TEST_P(TransactionTest, LostUpdate) {
TEST_P(TransactionTest, UntrackedWrites) {
WriteOptions write_options;
ReadOptions read_options;
string value;
std::string value;
Status s;
// Verify transaction rollback works for untracked keys.
@ -3028,7 +3017,7 @@ TEST_P(TransactionTest, ReinitializeTest) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
std::string value;
Status s;
// Set txn expiration timeout to 0 microseconds (expires instantly)
@ -3134,7 +3123,7 @@ TEST_P(TransactionTest, Rollback) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
std::string value;
Status s;
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
@ -3277,7 +3266,7 @@ TEST_P(TransactionTest, IteratorTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
TransactionOptions txn_options;
string value;
std::string value;
Status s;
// Write some keys to the db
@ -3395,7 +3384,7 @@ TEST_P(TransactionTest, IteratorTest) {
TEST_P(TransactionTest, DisableIndexingTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
std::string value;
Status s;
Transaction* txn = db->BeginTransaction(write_options);
@ -3458,7 +3447,7 @@ TEST_P(TransactionTest, SavepointTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
TransactionOptions txn_options;
string value;
std::string value;
Status s;
Transaction* txn = db->BeginTransaction(write_options);
@ -3750,7 +3739,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
std::string value;
Status s;
txn_options.lock_timeout = 1; // 1 ms
@ -3894,7 +3883,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest2) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
std::string value;
Status s;
s = db->Put(write_options, "A", "");
@ -4099,7 +4088,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest2) {
TEST_P(TransactionTest, TimeoutTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
std::string value;
Status s;
delete db;
@ -4236,7 +4225,7 @@ TEST_P(TransactionTest, TimeoutTest) {
TEST_P(TransactionTest, SingleDeleteTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
std::string value;
Status s;
Transaction* txn = db->BeginTransaction(write_options);
@ -4334,7 +4323,7 @@ TEST_P(TransactionTest, SingleDeleteTest) {
TEST_P(TransactionTest, MergeTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
std::string value;
Status s;
Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
@ -4390,7 +4379,7 @@ TEST_P(TransactionTest, MergeTest) {
TEST_P(TransactionTest, DeferSnapshotTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
std::string value;
Status s;
s = db->Put(write_options, "A", "a0");
@ -4441,7 +4430,7 @@ TEST_P(TransactionTest, DeferSnapshotTest) {
TEST_P(TransactionTest, DeferSnapshotTest2) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
std::string value;
Status s;
Transaction* txn1 = db->BeginTransaction(write_options);
@ -4498,7 +4487,7 @@ TEST_P(TransactionTest, DeferSnapshotTest2) {
TEST_P(TransactionTest, DeferSnapshotSavePointTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
std::string value;
Status s;
Transaction* txn1 = db->BeginTransaction(write_options);
@ -4606,7 +4595,7 @@ TEST_P(TransactionTest, DeferSnapshotSavePointTest) {
TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) {
WriteOptions write_options;
ReadOptions read_options;
string value;
std::string value;
class Notifier : public TransactionNotifier {
private:
@ -4665,7 +4654,7 @@ TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) {
TEST_P(TransactionTest, ClearSnapshotTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
std::string value;
Status s;
s = db->Put(write_options, "foo", "0");
@ -4906,7 +4895,7 @@ TEST_P(TransactionTest, MemoryLimitTest) {
TransactionOptions txn_options;
// Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data.
txn_options.max_write_batch_size = 29;
string value;
std::string value;
Status s;
Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options);
@ -4942,7 +4931,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
FlushOptions fopt;
options.disable_auto_compactions = true;
ReOpen();
ASSERT_OK(ReOpen());
// Do the test with NUM_BRANCHES branches in it. Each run of a test takes some
// of the branches. This is the same as counting a binary number where i-th
@ -4956,7 +4945,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
return n & filter;
};
const size_t max_n = static_cast<size_t>(1) << NUM_BRANCHES;
for (size_t n = 0; n < max_n; n++, ReOpen()) {
for (size_t n = 0; n < max_n; n++) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
size_t branch = 0;
auto seq = db_impl->GetLatestSequenceNumber();
@ -4966,13 +4955,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
ASSERT_OK(db_impl->Flush(fopt));
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
}
if (!short_test && branch_do(n, &branch)) {
db_impl->FlushWAL(true);
ReOpenNoDelete();
ASSERT_OK(db_impl->FlushWAL(true));
ASSERT_OK(ReOpenNoDelete());
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
@ -4988,13 +4977,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
ASSERT_OK(db_impl->Flush(fopt));
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
}
if (!short_test && branch_do(n, &branch)) {
db_impl->FlushWAL(true);
ReOpenNoDelete();
ASSERT_OK(db_impl->FlushWAL(true));
ASSERT_OK(ReOpenNoDelete());
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
@ -5005,13 +4994,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
ASSERT_OK(db_impl->Flush(fopt));
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
}
if (!short_test && branch_do(n, &branch)) {
db_impl->FlushWAL(true);
ReOpenNoDelete();
ASSERT_OK(db_impl->FlushWAL(true));
ASSERT_OK(ReOpenNoDelete());
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
@ -5023,13 +5012,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
ASSERT_OK(db_impl->Flush(fopt));
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
}
if (!short_test && branch_do(n, &branch)) {
db_impl->FlushWAL(true);
ReOpenNoDelete();
ASSERT_OK(db_impl->FlushWAL(true));
ASSERT_OK(ReOpenNoDelete());
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
@ -5040,17 +5029,18 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
ASSERT_OK(db_impl->Flush(fopt));
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
}
if (!short_test && branch_do(n, &branch)) {
db_impl->FlushWAL(true);
ReOpenNoDelete();
ASSERT_OK(db_impl->FlushWAL(true));
ASSERT_OK(ReOpenNoDelete());
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
ASSERT_OK(ReOpen());
}
}
@ -5062,7 +5052,7 @@ TEST_P(TransactionTest, Optimizations) {
optimizations.skip_concurrency_control = IsInCombination(0, new_comb);
optimizations.skip_duplicate_key_check = IsInCombination(1, new_comb);
ReOpen();
ASSERT_OK(ReOpen());
WriteOptions write_options;
WriteBatch batch;
batch.Put(Slice("k"), Slice("v1"));
@ -5111,7 +5101,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
std::string cf_name = "two";
ColumnFamilyHandle* cf_handle = nullptr;
{
db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
WriteOptions write_options;
WriteBatch batch;
batch.Put(Slice("key"), Slice("value"));
@ -5144,7 +5134,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
// Test with non-bytewise comparator
{
ReOpen();
ASSERT_OK(ReOpen());
std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator());
cf_options.comparator = comp_gc.get();
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
@ -5188,8 +5178,8 @@ TEST_P(TransactionTest, DuplicateKeys) {
if (with_commit_batch && do_rollback) {
continue;
}
ReOpen();
db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
ASSERT_OK(ReOpen());
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
TransactionOptions txn_options;
txn_options.use_only_the_last_commit_time_batch_for_recovery = false;
WriteOptions write_options;
@ -5316,11 +5306,11 @@ TEST_P(TransactionTest, DuplicateKeys) {
// verify that.
cf_options.max_successive_merges = 2;
cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
ReOpen();
ASSERT_OK(ReOpen());
db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
WriteOptions write_options;
// Ensure one value for the key
db->Put(write_options, cf_handle, Slice("key"), Slice("value"));
ASSERT_OK(db->Put(write_options, cf_handle, Slice("key"), Slice("value")));
WriteBatch batch;
// Merge more than max_successive_merges times
batch.Merge(cf_handle, Slice("key"), Slice("1"));
@ -5344,14 +5334,14 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
txn0->SetSavePoint();
txn0->RollbackToSavePoint();
ASSERT_OK(txn0->RollbackToSavePoint());
ASSERT_OK(txn0->Commit());
delete txn0;
}
// Test sucessfull recovery after a crash
{
ReOpen();
ASSERT_OK(ReOpen());
TransactionOptions txn_options;
WriteOptions write_options;
ReadOptions ropt;
@ -5383,7 +5373,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(txn0->Prepare());
delete txn0;
// This will check the asserts inside recovery code
db->FlushWAL(true);
ASSERT_OK(db->FlushWAL(true));
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
@ -5442,7 +5432,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(txn0->Prepare());
delete txn0;
// This will check the asserts inside recovery code
db->FlushWAL(true);
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
@ -5475,7 +5465,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(txn0->Prepare());
delete txn0;
// This will check the asserts inside recovery code
db->FlushWAL(true);
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
@ -5502,7 +5492,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(txn0->Prepare());
delete txn0;
// This will check the asserts inside recovery code
db->FlushWAL(true);
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
@ -5529,7 +5519,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(txn0->Prepare());
delete txn0;
// This will check the asserts inside recovery code
db->FlushWAL(true);
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);

@ -51,7 +51,7 @@ class TransactionTestBase : public ::testing::Test {
TransactionTestBase(bool use_stackable_db, bool two_write_queue,
TxnDBWritePolicy write_policy)
: use_stackable_db_(use_stackable_db) {
: db(nullptr), env(nullptr), use_stackable_db_(use_stackable_db) {
options.create_if_missing = true;
options.max_write_buffer_number = 2;
options.write_buffer_size = 4 * 1024;
@ -78,6 +78,7 @@ class TransactionTestBase : public ::testing::Test {
~TransactionTestBase() {
delete db;
db = nullptr;
// This is to skip the assert statement in FaultInjectionTestEnv. There
// seems to be a bug in btrfs that the makes readdir return recently
// unlink-ed files. By using the default fs we simply ignore errors resulted
@ -125,6 +126,7 @@ class TransactionTestBase : public ::testing::Test {
Status ReOpen() {
delete db;
db = nullptr;
DestroyDB(dbname, options);
Status s;
if (use_stackable_db_ == false) {
@ -139,16 +141,23 @@ class TransactionTestBase : public ::testing::Test {
std::vector<ColumnFamilyHandle*>* handles) {
std::vector<size_t> compaction_enabled_cf_indices;
TransactionDB::PrepareWrap(&options, &cfs, &compaction_enabled_cf_indices);
DB* root_db;
DB* root_db = nullptr;
Options options_copy(options);
const bool use_seq_per_batch =
txn_db_options.write_policy == WRITE_PREPARED;
Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db,
use_seq_per_batch);
StackableDB* stackable_db = new StackableDB(root_db);
if (s.ok()) {
s = TransactionDB::WrapStackableDB(
new StackableDB(root_db), txn_db_options,
compaction_enabled_cf_indices, *handles, &db);
assert(root_db != nullptr);
s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options,
compaction_enabled_cf_indices,
*handles, &db);
}
if (!s.ok()) {
delete stackable_db;
// just in case it was not deleted (and not set to nullptr).
delete root_db;
}
return s;
}
@ -161,19 +170,26 @@ class TransactionTestBase : public ::testing::Test {
TransactionDB::PrepareWrap(&options, &column_families,
&compaction_enabled_cf_indices);
std::vector<ColumnFamilyHandle*> handles;
DB* root_db;
DB* root_db = nullptr;
Options options_copy(options);
const bool use_seq_per_batch =
txn_db_options.write_policy == WRITE_PREPARED;
Status s = DBImpl::Open(options_copy, dbname, column_families, &handles,
&root_db, use_seq_per_batch);
StackableDB* stackable_db = new StackableDB(root_db);
if (s.ok()) {
assert(root_db != nullptr);
assert(handles.size() == 1);
s = TransactionDB::WrapStackableDB(
new StackableDB(root_db), txn_db_options,
compaction_enabled_cf_indices, handles, &db);
s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options,
compaction_enabled_cf_indices, handles,
&db);
delete handles[0];
}
if (!s.ok()) {
delete stackable_db;
// just in case it was not deleted (and not set to nullptr).
delete root_db;
}
return s;
}
@ -207,9 +223,9 @@ class TransactionTestBase : public ::testing::Test {
// equivalent to commit without prepare.
WriteBatch wb;
auto istr = std::to_string(index);
wb.Put("k1" + istr, "v1");
wb.Put("k2" + istr, "v2");
wb.Put("k3" + istr, "v3");
ASSERT_OK(wb.Put("k1" + istr, "v1"));
ASSERT_OK(wb.Put("k2" + istr, "v2"));
ASSERT_OK(wb.Put("k3" + istr, "v3"));
WriteOptions wopts;
auto s = db->Write(wopts, &wb);
if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
@ -231,15 +247,12 @@ class TransactionTestBase : public ::testing::Test {
WriteOptions write_options;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
auto istr = std::to_string(index);
auto s = txn->SetName("xid" + istr);
ASSERT_OK(s);
s = txn->Put(Slice("foo" + istr), Slice("bar"));
s = txn->Put(Slice("foo2" + istr), Slice("bar2"));
s = txn->Put(Slice("foo3" + istr), Slice("bar3"));
s = txn->Put(Slice("foo4" + istr), Slice("bar4"));
ASSERT_OK(s);
s = txn->Commit();
ASSERT_OK(s);
ASSERT_OK(txn->SetName("xid" + istr));
ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar")));
ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2")));
ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3")));
ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4")));
ASSERT_OK(txn->Commit());
if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
// Consume one seq per key
exp_seq += 4;
@ -261,20 +274,16 @@ class TransactionTestBase : public ::testing::Test {
WriteOptions write_options;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
auto istr = std::to_string(index);
auto s = txn->SetName("xid" + istr);
ASSERT_OK(s);
s = txn->Put(Slice("foo" + istr), Slice("bar"));
s = txn->Put(Slice("foo2" + istr), Slice("bar2"));
s = txn->Put(Slice("foo3" + istr), Slice("bar3"));
s = txn->Put(Slice("foo4" + istr), Slice("bar4"));
s = txn->Put(Slice("foo5" + istr), Slice("bar5"));
ASSERT_OK(s);
ASSERT_OK(txn->SetName("xid" + istr));
ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar")));
ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2")));
ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3")));
ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4")));
ASSERT_OK(txn->Put(Slice("foo5" + istr), Slice("bar5")));
expected_commits++;
s = txn->Prepare();
ASSERT_OK(s);
ASSERT_OK(txn->Prepare());
commit_writes++;
s = txn->Commit();
ASSERT_OK(s);
ASSERT_OK(txn->Commit());
if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
// Consume one seq per key
exp_seq += 5;
@ -292,20 +301,16 @@ class TransactionTestBase : public ::testing::Test {
WriteOptions write_options;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
auto istr = std::to_string(index);
auto s = txn->SetName("xid" + istr);
ASSERT_OK(s);
s = txn->Put(Slice("foo" + istr), Slice("bar"));
s = txn->Put(Slice("foo2" + istr), Slice("bar2"));
s = txn->Put(Slice("foo3" + istr), Slice("bar3"));
s = txn->Put(Slice("foo4" + istr), Slice("bar4"));
s = txn->Put(Slice("foo5" + istr), Slice("bar5"));
ASSERT_OK(s);
ASSERT_OK(txn->SetName("xid" + istr));
ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar")));
ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2")));
ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3")));
ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4")));
ASSERT_OK(txn->Put(Slice("foo5" + istr), Slice("bar5")));
expected_commits++;
s = txn->Prepare();
ASSERT_OK(s);
ASSERT_OK(txn->Prepare());
commit_writes++;
s = txn->Rollback();
ASSERT_OK(s);
ASSERT_OK(txn->Rollback());
if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
// No seq is consumed for deleting the txn buffer
exp_seq += 0;
@ -345,15 +350,12 @@ class TransactionTestBase : public ::testing::Test {
// For test the duplicate keys
auto v2 = Slice("bar2-" + istr).ToString();
auto type = rnd.Uniform(4);
Status s;
switch (type) {
case 0:
committed_kvs[k] = v;
s = db->Put(write_options, k, v);
ASSERT_OK(s);
ASSERT_OK(db->Put(write_options, k, v));
committed_kvs[k] = v2;
s = db->Put(write_options, k, v2);
ASSERT_OK(s);
ASSERT_OK(db->Put(write_options, k, v2));
break;
case 1: {
WriteBatch wb;
@ -361,26 +363,22 @@ class TransactionTestBase : public ::testing::Test {
wb.Put(k, v);
committed_kvs[k] = v2;
wb.Put(k, v2);
s = db->Write(write_options, &wb);
ASSERT_OK(s);
ASSERT_OK(db->Write(write_options, &wb));
} break;
case 2:
case 3:
txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid" + istr);
ASSERT_OK(s);
ASSERT_OK(txn->SetName("xid" + istr));
committed_kvs[k] = v;
s = txn->Put(k, v);
ASSERT_OK(s);
ASSERT_OK(txn->Put(k, v));
committed_kvs[k] = v2;
s = txn->Put(k, v2);
ASSERT_OK(s);
ASSERT_OK(txn->Put(k, v2));
if (type == 3) {
s = txn->Prepare();
ASSERT_OK(s);
ASSERT_OK(txn->Prepare());
}
s = txn->Commit();
ASSERT_OK(s);
ASSERT_OK(txn->Commit());
if (type == 2) {
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
// TODO(myabandeh): this is counter-intuitive. The destructor should

Loading…
Cancel
Save