From 66c7aa32fbc591c0c766024ff51aecd3f1a8f3e1 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Fri, 11 May 2018 15:08:16 -0700 Subject: [PATCH] Clarify the ownership of root db after TransactionDB::Open Summary: The patch clarifies the ownership of the root db after TransactionDB::Open. If it is a success the ownership if with the TransactionDB, and the root db will be deleted when the destructor of the base class, StackableDB, is called. If it is failure, the temporarily created root db will also be deleted properly. The patch also includes lots of useful formatting changes. Closes https://github.com/facebook/rocksdb/pull/3714 upon which this patch is built. Closes https://github.com/facebook/rocksdb/pull/3806 Differential Revision: D7878010 Pulled By: maysamyabandeh fbshipit-source-id: f54f3942e29434143ae5a2423ceec9c7072cd4c2 --- include/rocksdb/utilities/transaction_db.h | 20 +- .../pessimistic_transaction_db.cc | 48 +++-- utilities/transactions/transaction_test.cc | 188 +++++++++--------- utilities/transactions/transaction_test.h | 122 ++++++------ 4 files changed, 194 insertions(+), 184 deletions(-) diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index e55f65006..8435f936a 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -193,6 +193,7 @@ class TransactionDB : public StackableDB { } // Open a TransactionDB similar to DB::Open(). // Internally call PrepareWrap() and WrapDB() + // If the return status is not ok, then dbptr is set to nullptr. static Status Open(const Options& options, const TransactionDBOptions& txn_db_options, const std::string& dbname, TransactionDB** dbptr); @@ -203,27 +204,29 @@ class TransactionDB : public StackableDB { const std::vector& column_families, std::vector* handles, TransactionDB** dbptr); - // The following functions are used to open a TransactionDB internally using - // an opened DB or StackableDB. - // 1. Call prepareWrap(), passing an empty std::vector to - // compaction_enabled_cf_indices. - // 2. Open DB or Stackable DB with db_options and column_families passed to - // prepareWrap() // Note: PrepareWrap() may change parameters, make copies before the // invocation if needed. - // 3. Call Wrap*DB() with compaction_enabled_cf_indices in step 1 and handles - // of the opened DB/StackableDB in step 2 static void PrepareWrap(DBOptions* db_options, std::vector* column_families, std::vector* compaction_enabled_cf_indices); + // If the return status is not ok, then dbptr will bet set to nullptr. The + // input db parameter might or might not be deleted as a result of the + // failure. If it is properly deleted it will be set to nullptr. If the return + // status is ok, the ownership of db is transferred to dbptr. static Status WrapDB(DB* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr); + // If the return status is not ok, then dbptr will bet set to nullptr. The + // input db parameter might or might not be deleted as a result of the + // failure. If it is properly deleted it will be set to nullptr. If the return + // status is ok, the ownership of db is transferred to dbptr. static Status WrapStackableDB( StackableDB* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr); + // Since the destructor in StackableDB is virtual, this destructor is virtual + // too. The root db will be deleted by the base's destructor. ~TransactionDB() override {} // Starts a new Transaction. @@ -252,6 +255,7 @@ class TransactionDB : public StackableDB { protected: // To Create an TransactionDB, call Open() + // The ownership of db is transferred to the base StackableDB explicit TransactionDB(DB* db) : StackableDB(db) {} private: diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index b4bcc34f8..1e72299fa 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -207,7 +207,7 @@ Status TransactionDB::Open( const std::vector& column_families, std::vector* handles, TransactionDB** dbptr) { Status s; - DB* db; + DB* db = nullptr; ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is %" PRId32, static_cast(txn_db_options.write_policy)); @@ -223,6 +223,10 @@ Status TransactionDB::Open( s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, dbptr); } + if (!s.ok()) { + // just in case it was not deleted (and not set to nullptr). + delete db; + } return s; } @@ -254,48 +258,62 @@ Status TransactionDB::WrapDB( DB* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr) { - PessimisticTransactionDB* txn_db; + assert(db != nullptr); + assert(dbptr != nullptr); + *dbptr = nullptr; + std::unique_ptr txn_db; switch (txn_db_options.write_policy) { case WRITE_UNPREPARED: return Status::NotSupported("WRITE_UNPREPARED is not implemented yet"); case WRITE_PREPARED: - txn_db = new WritePreparedTxnDB( - db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)); + txn_db.reset(new WritePreparedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); break; case WRITE_COMMITTED: default: - txn_db = new WriteCommittedTxnDB( - db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)); + txn_db.reset(new WriteCommittedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); } txn_db->UpdateCFComparatorMap(handles); - *dbptr = txn_db; Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles); + // In case of a failure at this point, db is deleted via the txn_db destructor + // and set to nullptr. + if (s.ok()) { + *dbptr = txn_db.release(); + } return s; } Status TransactionDB::WrapStackableDB( // make sure this stackable_db is already opened with memtable history - // enabled, - // auto compaction distabled and 2 phase commit enabled + // enabled, auto compaction distabled and 2 phase commit enabled StackableDB* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr) { - PessimisticTransactionDB* txn_db; + assert(db != nullptr); + assert(dbptr != nullptr); + *dbptr = nullptr; + std::unique_ptr txn_db; + switch (txn_db_options.write_policy) { case WRITE_UNPREPARED: return Status::NotSupported("WRITE_UNPREPARED is not implemented yet"); case WRITE_PREPARED: - txn_db = new WritePreparedTxnDB( - db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)); + txn_db.reset(new WritePreparedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); break; case WRITE_COMMITTED: default: - txn_db = new WriteCommittedTxnDB( - db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)); + txn_db.reset(new WriteCommittedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); } txn_db->UpdateCFComparatorMap(handles); - *dbptr = txn_db; Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles); + // In case of a failure at this point, db is deleted via the txn_db destructor + // and set to nullptr. + if (s.ok()) { + *dbptr = txn_db.release(); + } return s; } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 54c386d62..84f4be5bd 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -97,11 +97,10 @@ TEST_P(TransactionTest, SuccessTest) { WriteOptions write_options; ReadOptions read_options; - string value; - Status s; + std::string value; - db->Put(write_options, Slice("foo"), Slice("bar")); - db->Put(write_options, Slice("foo2"), Slice("bar")); + ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar"))); + ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar"))); Transaction* txn = db->BeginTransaction(write_options, TransactionOptions()); ASSERT_TRUE(txn); @@ -109,24 +108,19 @@ TEST_P(TransactionTest, SuccessTest) { ASSERT_EQ(0, txn->GetNumPuts()); ASSERT_LE(0, txn->GetID()); - s = txn->GetForUpdate(read_options, "foo", &value); - ASSERT_OK(s); + ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value)); ASSERT_EQ(value, "bar"); - s = txn->Put(Slice("foo"), Slice("bar2")); - ASSERT_OK(s); + ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2"))); ASSERT_EQ(1, txn->GetNumPuts()); - s = txn->GetForUpdate(read_options, "foo", &value); - ASSERT_OK(s); + ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value)); ASSERT_EQ(value, "bar2"); - s = txn->Commit(); - ASSERT_OK(s); + ASSERT_OK(txn->Commit()); - s = db->Get(read_options, "foo", &value); - ASSERT_OK(s); + ASSERT_OK(db->Get(read_options, "foo", &value)); ASSERT_EQ(value, "bar2"); delete txn; @@ -135,22 +129,18 @@ TEST_P(TransactionTest, SuccessTest) { // This test clarifies the contract of ValidateSnapshot TEST_P(TransactionTest, ValidateSnapshotTest) { for (bool with_2pc : {true, false}) { - ReOpen(); + ASSERT_OK(ReOpen()); WriteOptions write_options; ReadOptions read_options; - string value; - Status s; + std::string value; Transaction* txn1 = db->BeginTransaction(write_options, TransactionOptions()); ASSERT_TRUE(txn1); - s = txn1->Put(Slice("foo"), Slice("bar1")); - ASSERT_OK(s); + ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1"))); if (with_2pc) { - s = txn1->SetName("xid1"); - ASSERT_OK(s); - s = txn1->Prepare(); - ASSERT_OK(s); + ASSERT_OK(txn1->SetName("xid1")); + ASSERT_OK(txn1->Prepare()); } Transaction* txn2 = @@ -158,15 +148,14 @@ TEST_P(TransactionTest, ValidateSnapshotTest) { ASSERT_TRUE(txn2); txn2->SetSnapshot(); - s = txn1->Commit(); - ASSERT_OK(s); + ASSERT_OK(txn1->Commit()); delete txn1; auto pes_txn2 = dynamic_cast(txn2); // Test the simple case where the key is not tracked yet auto trakced_seq = kMaxSequenceNumber; - s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo", - &trakced_seq); + auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo", + &trakced_seq); ASSERT_TRUE(s.IsBusy()); delete txn2; } @@ -351,8 +340,8 @@ TEST_P(TransactionTest, SharedLocks) { s = txn2->GetForUpdate(read_options, "foo", nullptr); ASSERT_OK(s); - txn1->Rollback(); - txn2->Rollback(); + ASSERT_OK(txn1->Rollback()); + ASSERT_OK(txn2->Rollback()); // Test txn1 trying to downgrade its lock. s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */); @@ -752,13 +741,13 @@ TEST_P(TransactionTest, CommitTimeBatchFailTest) { WriteOptions write_options; TransactionOptions txn_options; - string value; + std::string value; Status s; Transaction* txn1 = db->BeginTransaction(write_options, txn_options); ASSERT_TRUE(txn1); - txn1->GetCommitTimeWriteBatch()->Put("cat", "dog"); + ASSERT_OK(txn1->GetCommitTimeWriteBatch()->Put("cat", "dog")); s = txn1->Put("foo", "bar"); ASSERT_OK(s); @@ -774,7 +763,7 @@ TEST_P(TransactionTest, LogMarkLeakTest) { TransactionOptions txn_options; WriteOptions write_options; options.write_buffer_size = 1024; - ReOpenNoDelete(); + ASSERT_OK(ReOpenNoDelete()); Random rnd(47); std::vector txns; DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); @@ -808,7 +797,7 @@ TEST_P(TransactionTest, LogMarkLeakTest) { TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { for (bool cwb4recovery : {true, false}) { - ReOpen(); + ASSERT_OK(ReOpen()); WriteOptions write_options; ReadOptions read_options; @@ -1011,7 +1000,7 @@ TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) { if (!cwb4recovery && test_with_empty_wal) { continue; } - ReOpen(); + ASSERT_OK(ReOpen()); Status s; std::string value; @@ -1111,7 +1100,7 @@ TEST_P(TransactionTest, TwoPhaseRollbackTest) { TransactionOptions txn_options; - string value; + std::string value; Status s; DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); @@ -1182,7 +1171,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { TransactionOptions txn_options; - string value; + std::string value; Status s; DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); @@ -1575,7 +1564,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) { DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); Status s; - string v; + std::string v; ColumnFamilyHandle *cfa, *cfb; // Create 2 new column families @@ -2017,7 +2006,7 @@ TEST_P(TransactionTest, WriteConflictTest2) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; db->Put(write_options, "foo", "bar"); @@ -2065,7 +2054,7 @@ TEST_P(TransactionTest, ReadConflictTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; db->Put(write_options, "foo", "bar"); @@ -2103,7 +2092,7 @@ TEST_P(TransactionTest, TxnOnlyTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; Transaction* txn = db->BeginTransaction(write_options); @@ -2121,7 +2110,7 @@ TEST_P(TransactionTest, TxnOnlyTest) { TEST_P(TransactionTest, FlushTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - string value; + std::string value; Status s; db->Put(write_options, Slice("foo"), Slice("bar")); @@ -2313,7 +2302,7 @@ TEST_P(TransactionTest, FlushTest2) { TEST_P(TransactionTest, NoSnapshotTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; db->Put(write_options, "AAA", "bar"); @@ -2343,12 +2332,12 @@ TEST_P(TransactionTest, NoSnapshotTest) { TEST_P(TransactionTest, MultipleSnapshotTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - string value; + std::string value; Status s; - db->Put(write_options, "AAA", "bar"); - db->Put(write_options, "BBB", "bar"); - db->Put(write_options, "CCC", "bar"); + ASSERT_OK(db->Put(write_options, "AAA", "bar")); + ASSERT_OK(db->Put(write_options, "BBB", "bar")); + ASSERT_OK(db->Put(write_options, "CCC", "bar")); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); @@ -2356,24 +2345,24 @@ TEST_P(TransactionTest, MultipleSnapshotTest) { db->Put(write_options, "AAA", "bar1"); // Read and write without a snapshot - txn->GetForUpdate(read_options, "AAA", &value); + ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value)); ASSERT_EQ(value, "bar1"); s = txn->Put("AAA", "bar2"); ASSERT_OK(s); // Modify BBB before snapshot is taken - db->Put(write_options, "BBB", "bar1"); + ASSERT_OK(db->Put(write_options, "BBB", "bar1")); txn->SetSnapshot(); snapshot_read_options.snapshot = txn->GetSnapshot(); // Read and write with snapshot - txn->GetForUpdate(snapshot_read_options, "BBB", &value); + ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "BBB", &value)); ASSERT_EQ(value, "bar1"); s = txn->Put("BBB", "bar2"); ASSERT_OK(s); - db->Put(write_options, "CCC", "bar1"); + ASSERT_OK(db->Put(write_options, "CCC", "bar1")); // Set a new snapshot txn->SetSnapshot(); @@ -2812,7 +2801,7 @@ TEST_P(TransactionTest, LostUpdate) { WriteOptions write_options; ReadOptions read_options, read_options1, read_options2; TransactionOptions txn_options; - string value; + std::string value; Status s; // Test 2 transactions writing to the same key in multiple orders and @@ -2937,7 +2926,7 @@ TEST_P(TransactionTest, LostUpdate) { TEST_P(TransactionTest, UntrackedWrites) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; // Verify transaction rollback works for untracked keys. @@ -3028,7 +3017,7 @@ TEST_P(TransactionTest, ReinitializeTest) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; // Set txn expiration timeout to 0 microseconds (expires instantly) @@ -3134,7 +3123,7 @@ TEST_P(TransactionTest, Rollback) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; Transaction* txn1 = db->BeginTransaction(write_options, txn_options); @@ -3277,7 +3266,7 @@ TEST_P(TransactionTest, IteratorTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; // Write some keys to the db @@ -3395,7 +3384,7 @@ TEST_P(TransactionTest, IteratorTest) { TEST_P(TransactionTest, DisableIndexingTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; Transaction* txn = db->BeginTransaction(write_options); @@ -3458,7 +3447,7 @@ TEST_P(TransactionTest, SavepointTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; Transaction* txn = db->BeginTransaction(write_options); @@ -3750,7 +3739,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; txn_options.lock_timeout = 1; // 1 ms @@ -3894,7 +3883,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest2) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; s = db->Put(write_options, "A", ""); @@ -4099,7 +4088,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest2) { TEST_P(TransactionTest, TimeoutTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; delete db; @@ -4236,7 +4225,7 @@ TEST_P(TransactionTest, TimeoutTest) { TEST_P(TransactionTest, SingleDeleteTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; Transaction* txn = db->BeginTransaction(write_options); @@ -4334,7 +4323,7 @@ TEST_P(TransactionTest, SingleDeleteTest) { TEST_P(TransactionTest, MergeTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; Transaction* txn = db->BeginTransaction(write_options, TransactionOptions()); @@ -4390,7 +4379,7 @@ TEST_P(TransactionTest, MergeTest) { TEST_P(TransactionTest, DeferSnapshotTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; s = db->Put(write_options, "A", "a0"); @@ -4441,7 +4430,7 @@ TEST_P(TransactionTest, DeferSnapshotTest) { TEST_P(TransactionTest, DeferSnapshotTest2) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - string value; + std::string value; Status s; Transaction* txn1 = db->BeginTransaction(write_options); @@ -4498,7 +4487,7 @@ TEST_P(TransactionTest, DeferSnapshotTest2) { TEST_P(TransactionTest, DeferSnapshotSavePointTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - string value; + std::string value; Status s; Transaction* txn1 = db->BeginTransaction(write_options); @@ -4606,7 +4595,7 @@ TEST_P(TransactionTest, DeferSnapshotSavePointTest) { TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; class Notifier : public TransactionNotifier { private: @@ -4665,7 +4654,7 @@ TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) { TEST_P(TransactionTest, ClearSnapshotTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - string value; + std::string value; Status s; s = db->Put(write_options, "foo", "0"); @@ -4906,7 +4895,7 @@ TEST_P(TransactionTest, MemoryLimitTest) { TransactionOptions txn_options; // Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data. txn_options.max_write_batch_size = 29; - string value; + std::string value; Status s; Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options); @@ -4942,7 +4931,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) { FlushOptions fopt; options.disable_auto_compactions = true; - ReOpen(); + ASSERT_OK(ReOpen()); // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some // of the branches. This is the same as counting a binary number where i-th @@ -4956,7 +4945,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) { return n & filter; }; const size_t max_n = static_cast(1) << NUM_BRANCHES; - for (size_t n = 0; n < max_n; n++, ReOpen()) { + for (size_t n = 0; n < max_n; n++) { DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); size_t branch = 0; auto seq = db_impl->GetLatestSequenceNumber(); @@ -4966,13 +4955,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) { ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { - db_impl->Flush(fopt); + ASSERT_OK(db_impl->Flush(fopt)); seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (!short_test && branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); db_impl = reinterpret_cast(db->GetRootDB()); seq = db_impl->GetLatestSequenceNumber(); ASSERT_EQ(exp_seq, seq); @@ -4988,13 +4977,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) { ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { - db_impl->Flush(fopt); + ASSERT_OK(db_impl->Flush(fopt)); seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (!short_test && branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); db_impl = reinterpret_cast(db->GetRootDB()); seq = db_impl->GetLatestSequenceNumber(); ASSERT_EQ(exp_seq, seq); @@ -5005,13 +4994,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) { ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { - db_impl->Flush(fopt); + ASSERT_OK(db_impl->Flush(fopt)); seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (!short_test && branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); db_impl = reinterpret_cast(db->GetRootDB()); seq = db_impl->GetLatestSequenceNumber(); ASSERT_EQ(exp_seq, seq); @@ -5023,13 +5012,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) { ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { - db_impl->Flush(fopt); + ASSERT_OK(db_impl->Flush(fopt)); seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (!short_test && branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); db_impl = reinterpret_cast(db->GetRootDB()); seq = db_impl->GetLatestSequenceNumber(); ASSERT_EQ(exp_seq, seq); @@ -5040,17 +5029,18 @@ TEST_P(TransactionTest, SeqAdvanceTest) { ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { - db_impl->Flush(fopt); + ASSERT_OK(db_impl->Flush(fopt)); seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (!short_test && branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); db_impl = reinterpret_cast(db->GetRootDB()); seq = db_impl->GetLatestSequenceNumber(); ASSERT_EQ(exp_seq, seq); } + ASSERT_OK(ReOpen()); } } @@ -5062,7 +5052,7 @@ TEST_P(TransactionTest, Optimizations) { optimizations.skip_concurrency_control = IsInCombination(0, new_comb); optimizations.skip_duplicate_key_check = IsInCombination(1, new_comb); - ReOpen(); + ASSERT_OK(ReOpen()); WriteOptions write_options; WriteBatch batch; batch.Put(Slice("k"), Slice("v1")); @@ -5111,7 +5101,7 @@ TEST_P(TransactionTest, DuplicateKeys) { std::string cf_name = "two"; ColumnFamilyHandle* cf_handle = nullptr; { - db->CreateColumnFamily(cf_options, cf_name, &cf_handle); + ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); WriteOptions write_options; WriteBatch batch; batch.Put(Slice("key"), Slice("value")); @@ -5144,7 +5134,7 @@ TEST_P(TransactionTest, DuplicateKeys) { // Test with non-bytewise comparator { - ReOpen(); + ASSERT_OK(ReOpen()); std::unique_ptr comp_gc(new ThreeBytewiseComparator()); cf_options.comparator = comp_gc.get(); ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); @@ -5188,8 +5178,8 @@ TEST_P(TransactionTest, DuplicateKeys) { if (with_commit_batch && do_rollback) { continue; } - ReOpen(); - db->CreateColumnFamily(cf_options, cf_name, &cf_handle); + ASSERT_OK(ReOpen()); + ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); TransactionOptions txn_options; txn_options.use_only_the_last_commit_time_batch_for_recovery = false; WriteOptions write_options; @@ -5316,11 +5306,11 @@ TEST_P(TransactionTest, DuplicateKeys) { // verify that. cf_options.max_successive_merges = 2; cf_options.merge_operator = MergeOperators::CreateStringAppendOperator(); - ReOpen(); + ASSERT_OK(ReOpen()); db->CreateColumnFamily(cf_options, cf_name, &cf_handle); WriteOptions write_options; // Ensure one value for the key - db->Put(write_options, cf_handle, Slice("key"), Slice("value")); + ASSERT_OK(db->Put(write_options, cf_handle, Slice("key"), Slice("value"))); WriteBatch batch; // Merge more than max_successive_merges times batch.Merge(cf_handle, Slice("key"), Slice("1")); @@ -5344,14 +5334,14 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a"))); ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b"))); txn0->SetSavePoint(); - txn0->RollbackToSavePoint(); + ASSERT_OK(txn0->RollbackToSavePoint()); ASSERT_OK(txn0->Commit()); delete txn0; } // Test sucessfull recovery after a crash { - ReOpen(); + ASSERT_OK(ReOpen()); TransactionOptions txn_options; WriteOptions write_options; ReadOptions ropt; @@ -5383,7 +5373,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Prepare()); delete txn0; // This will check the asserts inside recovery code - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); reinterpret_cast(db)->TEST_Crash(); ASSERT_OK(ReOpenNoDelete(cfds, &handles)); txn0 = db->GetTransactionByName("xid"); @@ -5442,7 +5432,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Prepare()); delete txn0; // This will check the asserts inside recovery code - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 reinterpret_cast(db->GetRootDB()) ->TEST_FlushMemTable(true, handles[1]); @@ -5475,7 +5465,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Prepare()); delete txn0; // This will check the asserts inside recovery code - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 reinterpret_cast(db->GetRootDB()) ->TEST_FlushMemTable(true, handles[1]); @@ -5502,7 +5492,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Prepare()); delete txn0; // This will check the asserts inside recovery code - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 reinterpret_cast(db->GetRootDB()) ->TEST_FlushMemTable(true, handles[1]); @@ -5529,7 +5519,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Prepare()); delete txn0; // This will check the asserts inside recovery code - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 reinterpret_cast(db->GetRootDB()) ->TEST_FlushMemTable(true, handles[1]); diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 56d3d6e2e..d72454750 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -51,7 +51,7 @@ class TransactionTestBase : public ::testing::Test { TransactionTestBase(bool use_stackable_db, bool two_write_queue, TxnDBWritePolicy write_policy) - : use_stackable_db_(use_stackable_db) { + : db(nullptr), env(nullptr), use_stackable_db_(use_stackable_db) { options.create_if_missing = true; options.max_write_buffer_number = 2; options.write_buffer_size = 4 * 1024; @@ -78,6 +78,7 @@ class TransactionTestBase : public ::testing::Test { ~TransactionTestBase() { delete db; + db = nullptr; // This is to skip the assert statement in FaultInjectionTestEnv. There // seems to be a bug in btrfs that the makes readdir return recently // unlink-ed files. By using the default fs we simply ignore errors resulted @@ -125,6 +126,7 @@ class TransactionTestBase : public ::testing::Test { Status ReOpen() { delete db; + db = nullptr; DestroyDB(dbname, options); Status s; if (use_stackable_db_ == false) { @@ -139,16 +141,23 @@ class TransactionTestBase : public ::testing::Test { std::vector* handles) { std::vector compaction_enabled_cf_indices; TransactionDB::PrepareWrap(&options, &cfs, &compaction_enabled_cf_indices); - DB* root_db; + DB* root_db = nullptr; Options options_copy(options); const bool use_seq_per_batch = txn_db_options.write_policy == WRITE_PREPARED; Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db, use_seq_per_batch); + StackableDB* stackable_db = new StackableDB(root_db); if (s.ok()) { - s = TransactionDB::WrapStackableDB( - new StackableDB(root_db), txn_db_options, - compaction_enabled_cf_indices, *handles, &db); + assert(root_db != nullptr); + s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options, + compaction_enabled_cf_indices, + *handles, &db); + } + if (!s.ok()) { + delete stackable_db; + // just in case it was not deleted (and not set to nullptr). + delete root_db; } return s; } @@ -161,19 +170,26 @@ class TransactionTestBase : public ::testing::Test { TransactionDB::PrepareWrap(&options, &column_families, &compaction_enabled_cf_indices); std::vector handles; - DB* root_db; + DB* root_db = nullptr; Options options_copy(options); const bool use_seq_per_batch = txn_db_options.write_policy == WRITE_PREPARED; Status s = DBImpl::Open(options_copy, dbname, column_families, &handles, &root_db, use_seq_per_batch); + StackableDB* stackable_db = new StackableDB(root_db); if (s.ok()) { + assert(root_db != nullptr); assert(handles.size() == 1); - s = TransactionDB::WrapStackableDB( - new StackableDB(root_db), txn_db_options, - compaction_enabled_cf_indices, handles, &db); + s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options, + compaction_enabled_cf_indices, handles, + &db); delete handles[0]; } + if (!s.ok()) { + delete stackable_db; + // just in case it was not deleted (and not set to nullptr). + delete root_db; + } return s; } @@ -207,9 +223,9 @@ class TransactionTestBase : public ::testing::Test { // equivalent to commit without prepare. WriteBatch wb; auto istr = std::to_string(index); - wb.Put("k1" + istr, "v1"); - wb.Put("k2" + istr, "v2"); - wb.Put("k3" + istr, "v3"); + ASSERT_OK(wb.Put("k1" + istr, "v1")); + ASSERT_OK(wb.Put("k2" + istr, "v2")); + ASSERT_OK(wb.Put("k3" + istr, "v3")); WriteOptions wopts; auto s = db->Write(wopts, &wb); if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { @@ -231,15 +247,12 @@ class TransactionTestBase : public ::testing::Test { WriteOptions write_options; Transaction* txn = db->BeginTransaction(write_options, txn_options); auto istr = std::to_string(index); - auto s = txn->SetName("xid" + istr); - ASSERT_OK(s); - s = txn->Put(Slice("foo" + istr), Slice("bar")); - s = txn->Put(Slice("foo2" + istr), Slice("bar2")); - s = txn->Put(Slice("foo3" + istr), Slice("bar3")); - s = txn->Put(Slice("foo4" + istr), Slice("bar4")); - ASSERT_OK(s); - s = txn->Commit(); - ASSERT_OK(s); + ASSERT_OK(txn->SetName("xid" + istr)); + ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar"))); + ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2"))); + ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3"))); + ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4"))); + ASSERT_OK(txn->Commit()); if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { // Consume one seq per key exp_seq += 4; @@ -261,20 +274,16 @@ class TransactionTestBase : public ::testing::Test { WriteOptions write_options; Transaction* txn = db->BeginTransaction(write_options, txn_options); auto istr = std::to_string(index); - auto s = txn->SetName("xid" + istr); - ASSERT_OK(s); - s = txn->Put(Slice("foo" + istr), Slice("bar")); - s = txn->Put(Slice("foo2" + istr), Slice("bar2")); - s = txn->Put(Slice("foo3" + istr), Slice("bar3")); - s = txn->Put(Slice("foo4" + istr), Slice("bar4")); - s = txn->Put(Slice("foo5" + istr), Slice("bar5")); - ASSERT_OK(s); + ASSERT_OK(txn->SetName("xid" + istr)); + ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar"))); + ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2"))); + ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3"))); + ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4"))); + ASSERT_OK(txn->Put(Slice("foo5" + istr), Slice("bar5"))); expected_commits++; - s = txn->Prepare(); - ASSERT_OK(s); + ASSERT_OK(txn->Prepare()); commit_writes++; - s = txn->Commit(); - ASSERT_OK(s); + ASSERT_OK(txn->Commit()); if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { // Consume one seq per key exp_seq += 5; @@ -292,20 +301,16 @@ class TransactionTestBase : public ::testing::Test { WriteOptions write_options; Transaction* txn = db->BeginTransaction(write_options, txn_options); auto istr = std::to_string(index); - auto s = txn->SetName("xid" + istr); - ASSERT_OK(s); - s = txn->Put(Slice("foo" + istr), Slice("bar")); - s = txn->Put(Slice("foo2" + istr), Slice("bar2")); - s = txn->Put(Slice("foo3" + istr), Slice("bar3")); - s = txn->Put(Slice("foo4" + istr), Slice("bar4")); - s = txn->Put(Slice("foo5" + istr), Slice("bar5")); - ASSERT_OK(s); + ASSERT_OK(txn->SetName("xid" + istr)); + ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar"))); + ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2"))); + ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3"))); + ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4"))); + ASSERT_OK(txn->Put(Slice("foo5" + istr), Slice("bar5"))); expected_commits++; - s = txn->Prepare(); - ASSERT_OK(s); + ASSERT_OK(txn->Prepare()); commit_writes++; - s = txn->Rollback(); - ASSERT_OK(s); + ASSERT_OK(txn->Rollback()); if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { // No seq is consumed for deleting the txn buffer exp_seq += 0; @@ -345,15 +350,12 @@ class TransactionTestBase : public ::testing::Test { // For test the duplicate keys auto v2 = Slice("bar2-" + istr).ToString(); auto type = rnd.Uniform(4); - Status s; switch (type) { case 0: committed_kvs[k] = v; - s = db->Put(write_options, k, v); - ASSERT_OK(s); + ASSERT_OK(db->Put(write_options, k, v)); committed_kvs[k] = v2; - s = db->Put(write_options, k, v2); - ASSERT_OK(s); + ASSERT_OK(db->Put(write_options, k, v2)); break; case 1: { WriteBatch wb; @@ -361,26 +363,22 @@ class TransactionTestBase : public ::testing::Test { wb.Put(k, v); committed_kvs[k] = v2; wb.Put(k, v2); - s = db->Write(write_options, &wb); - ASSERT_OK(s); + ASSERT_OK(db->Write(write_options, &wb)); + } break; case 2: case 3: txn = db->BeginTransaction(write_options, txn_options); - s = txn->SetName("xid" + istr); - ASSERT_OK(s); + ASSERT_OK(txn->SetName("xid" + istr)); committed_kvs[k] = v; - s = txn->Put(k, v); - ASSERT_OK(s); + ASSERT_OK(txn->Put(k, v)); committed_kvs[k] = v2; - s = txn->Put(k, v2); - ASSERT_OK(s); + ASSERT_OK(txn->Put(k, v2)); + if (type == 3) { - s = txn->Prepare(); - ASSERT_OK(s); + ASSERT_OK(txn->Prepare()); } - s = txn->Commit(); - ASSERT_OK(s); + ASSERT_OK(txn->Commit()); if (type == 2) { auto pdb = reinterpret_cast(db); // TODO(myabandeh): this is counter-intuitive. The destructor should