diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index e55f65006..8435f936a 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -193,6 +193,7 @@ class TransactionDB : public StackableDB { } // Open a TransactionDB similar to DB::Open(). // Internally call PrepareWrap() and WrapDB() + // If the return status is not ok, then dbptr is set to nullptr. static Status Open(const Options& options, const TransactionDBOptions& txn_db_options, const std::string& dbname, TransactionDB** dbptr); @@ -203,27 +204,29 @@ class TransactionDB : public StackableDB { const std::vector& column_families, std::vector* handles, TransactionDB** dbptr); - // The following functions are used to open a TransactionDB internally using - // an opened DB or StackableDB. - // 1. Call prepareWrap(), passing an empty std::vector to - // compaction_enabled_cf_indices. - // 2. Open DB or Stackable DB with db_options and column_families passed to - // prepareWrap() // Note: PrepareWrap() may change parameters, make copies before the // invocation if needed. - // 3. Call Wrap*DB() with compaction_enabled_cf_indices in step 1 and handles - // of the opened DB/StackableDB in step 2 static void PrepareWrap(DBOptions* db_options, std::vector* column_families, std::vector* compaction_enabled_cf_indices); + // If the return status is not ok, then dbptr will bet set to nullptr. The + // input db parameter might or might not be deleted as a result of the + // failure. If it is properly deleted it will be set to nullptr. If the return + // status is ok, the ownership of db is transferred to dbptr. static Status WrapDB(DB* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr); + // If the return status is not ok, then dbptr will bet set to nullptr. The + // input db parameter might or might not be deleted as a result of the + // failure. If it is properly deleted it will be set to nullptr. If the return + // status is ok, the ownership of db is transferred to dbptr. static Status WrapStackableDB( StackableDB* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr); + // Since the destructor in StackableDB is virtual, this destructor is virtual + // too. The root db will be deleted by the base's destructor. ~TransactionDB() override {} // Starts a new Transaction. @@ -252,6 +255,7 @@ class TransactionDB : public StackableDB { protected: // To Create an TransactionDB, call Open() + // The ownership of db is transferred to the base StackableDB explicit TransactionDB(DB* db) : StackableDB(db) {} private: diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index b4bcc34f8..1e72299fa 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -207,7 +207,7 @@ Status TransactionDB::Open( const std::vector& column_families, std::vector* handles, TransactionDB** dbptr) { Status s; - DB* db; + DB* db = nullptr; ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is %" PRId32, static_cast(txn_db_options.write_policy)); @@ -223,6 +223,10 @@ Status TransactionDB::Open( s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, dbptr); } + if (!s.ok()) { + // just in case it was not deleted (and not set to nullptr). + delete db; + } return s; } @@ -254,48 +258,62 @@ Status TransactionDB::WrapDB( DB* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr) { - PessimisticTransactionDB* txn_db; + assert(db != nullptr); + assert(dbptr != nullptr); + *dbptr = nullptr; + std::unique_ptr txn_db; switch (txn_db_options.write_policy) { case WRITE_UNPREPARED: return Status::NotSupported("WRITE_UNPREPARED is not implemented yet"); case WRITE_PREPARED: - txn_db = new WritePreparedTxnDB( - db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)); + txn_db.reset(new WritePreparedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); break; case WRITE_COMMITTED: default: - txn_db = new WriteCommittedTxnDB( - db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)); + txn_db.reset(new WriteCommittedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); } txn_db->UpdateCFComparatorMap(handles); - *dbptr = txn_db; Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles); + // In case of a failure at this point, db is deleted via the txn_db destructor + // and set to nullptr. + if (s.ok()) { + *dbptr = txn_db.release(); + } return s; } Status TransactionDB::WrapStackableDB( // make sure this stackable_db is already opened with memtable history - // enabled, - // auto compaction distabled and 2 phase commit enabled + // enabled, auto compaction distabled and 2 phase commit enabled StackableDB* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr) { - PessimisticTransactionDB* txn_db; + assert(db != nullptr); + assert(dbptr != nullptr); + *dbptr = nullptr; + std::unique_ptr txn_db; + switch (txn_db_options.write_policy) { case WRITE_UNPREPARED: return Status::NotSupported("WRITE_UNPREPARED is not implemented yet"); case WRITE_PREPARED: - txn_db = new WritePreparedTxnDB( - db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)); + txn_db.reset(new WritePreparedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); break; case WRITE_COMMITTED: default: - txn_db = new WriteCommittedTxnDB( - db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)); + txn_db.reset(new WriteCommittedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); } txn_db->UpdateCFComparatorMap(handles); - *dbptr = txn_db; Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles); + // In case of a failure at this point, db is deleted via the txn_db destructor + // and set to nullptr. + if (s.ok()) { + *dbptr = txn_db.release(); + } return s; } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 54c386d62..84f4be5bd 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -97,11 +97,10 @@ TEST_P(TransactionTest, SuccessTest) { WriteOptions write_options; ReadOptions read_options; - string value; - Status s; + std::string value; - db->Put(write_options, Slice("foo"), Slice("bar")); - db->Put(write_options, Slice("foo2"), Slice("bar")); + ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar"))); + ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar"))); Transaction* txn = db->BeginTransaction(write_options, TransactionOptions()); ASSERT_TRUE(txn); @@ -109,24 +108,19 @@ TEST_P(TransactionTest, SuccessTest) { ASSERT_EQ(0, txn->GetNumPuts()); ASSERT_LE(0, txn->GetID()); - s = txn->GetForUpdate(read_options, "foo", &value); - ASSERT_OK(s); + ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value)); ASSERT_EQ(value, "bar"); - s = txn->Put(Slice("foo"), Slice("bar2")); - ASSERT_OK(s); + ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2"))); ASSERT_EQ(1, txn->GetNumPuts()); - s = txn->GetForUpdate(read_options, "foo", &value); - ASSERT_OK(s); + ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value)); ASSERT_EQ(value, "bar2"); - s = txn->Commit(); - ASSERT_OK(s); + ASSERT_OK(txn->Commit()); - s = db->Get(read_options, "foo", &value); - ASSERT_OK(s); + ASSERT_OK(db->Get(read_options, "foo", &value)); ASSERT_EQ(value, "bar2"); delete txn; @@ -135,22 +129,18 @@ TEST_P(TransactionTest, SuccessTest) { // This test clarifies the contract of ValidateSnapshot TEST_P(TransactionTest, ValidateSnapshotTest) { for (bool with_2pc : {true, false}) { - ReOpen(); + ASSERT_OK(ReOpen()); WriteOptions write_options; ReadOptions read_options; - string value; - Status s; + std::string value; Transaction* txn1 = db->BeginTransaction(write_options, TransactionOptions()); ASSERT_TRUE(txn1); - s = txn1->Put(Slice("foo"), Slice("bar1")); - ASSERT_OK(s); + ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1"))); if (with_2pc) { - s = txn1->SetName("xid1"); - ASSERT_OK(s); - s = txn1->Prepare(); - ASSERT_OK(s); + ASSERT_OK(txn1->SetName("xid1")); + ASSERT_OK(txn1->Prepare()); } Transaction* txn2 = @@ -158,15 +148,14 @@ TEST_P(TransactionTest, ValidateSnapshotTest) { ASSERT_TRUE(txn2); txn2->SetSnapshot(); - s = txn1->Commit(); - ASSERT_OK(s); + ASSERT_OK(txn1->Commit()); delete txn1; auto pes_txn2 = dynamic_cast(txn2); // Test the simple case where the key is not tracked yet auto trakced_seq = kMaxSequenceNumber; - s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo", - &trakced_seq); + auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo", + &trakced_seq); ASSERT_TRUE(s.IsBusy()); delete txn2; } @@ -351,8 +340,8 @@ TEST_P(TransactionTest, SharedLocks) { s = txn2->GetForUpdate(read_options, "foo", nullptr); ASSERT_OK(s); - txn1->Rollback(); - txn2->Rollback(); + ASSERT_OK(txn1->Rollback()); + ASSERT_OK(txn2->Rollback()); // Test txn1 trying to downgrade its lock. s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */); @@ -752,13 +741,13 @@ TEST_P(TransactionTest, CommitTimeBatchFailTest) { WriteOptions write_options; TransactionOptions txn_options; - string value; + std::string value; Status s; Transaction* txn1 = db->BeginTransaction(write_options, txn_options); ASSERT_TRUE(txn1); - txn1->GetCommitTimeWriteBatch()->Put("cat", "dog"); + ASSERT_OK(txn1->GetCommitTimeWriteBatch()->Put("cat", "dog")); s = txn1->Put("foo", "bar"); ASSERT_OK(s); @@ -774,7 +763,7 @@ TEST_P(TransactionTest, LogMarkLeakTest) { TransactionOptions txn_options; WriteOptions write_options; options.write_buffer_size = 1024; - ReOpenNoDelete(); + ASSERT_OK(ReOpenNoDelete()); Random rnd(47); std::vector txns; DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); @@ -808,7 +797,7 @@ TEST_P(TransactionTest, LogMarkLeakTest) { TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { for (bool cwb4recovery : {true, false}) { - ReOpen(); + ASSERT_OK(ReOpen()); WriteOptions write_options; ReadOptions read_options; @@ -1011,7 +1000,7 @@ TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) { if (!cwb4recovery && test_with_empty_wal) { continue; } - ReOpen(); + ASSERT_OK(ReOpen()); Status s; std::string value; @@ -1111,7 +1100,7 @@ TEST_P(TransactionTest, TwoPhaseRollbackTest) { TransactionOptions txn_options; - string value; + std::string value; Status s; DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); @@ -1182,7 +1171,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { TransactionOptions txn_options; - string value; + std::string value; Status s; DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); @@ -1575,7 +1564,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) { DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); Status s; - string v; + std::string v; ColumnFamilyHandle *cfa, *cfb; // Create 2 new column families @@ -2017,7 +2006,7 @@ TEST_P(TransactionTest, WriteConflictTest2) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; db->Put(write_options, "foo", "bar"); @@ -2065,7 +2054,7 @@ TEST_P(TransactionTest, ReadConflictTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; db->Put(write_options, "foo", "bar"); @@ -2103,7 +2092,7 @@ TEST_P(TransactionTest, TxnOnlyTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; Transaction* txn = db->BeginTransaction(write_options); @@ -2121,7 +2110,7 @@ TEST_P(TransactionTest, TxnOnlyTest) { TEST_P(TransactionTest, FlushTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - string value; + std::string value; Status s; db->Put(write_options, Slice("foo"), Slice("bar")); @@ -2313,7 +2302,7 @@ TEST_P(TransactionTest, FlushTest2) { TEST_P(TransactionTest, NoSnapshotTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; db->Put(write_options, "AAA", "bar"); @@ -2343,12 +2332,12 @@ TEST_P(TransactionTest, NoSnapshotTest) { TEST_P(TransactionTest, MultipleSnapshotTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - string value; + std::string value; Status s; - db->Put(write_options, "AAA", "bar"); - db->Put(write_options, "BBB", "bar"); - db->Put(write_options, "CCC", "bar"); + ASSERT_OK(db->Put(write_options, "AAA", "bar")); + ASSERT_OK(db->Put(write_options, "BBB", "bar")); + ASSERT_OK(db->Put(write_options, "CCC", "bar")); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); @@ -2356,24 +2345,24 @@ TEST_P(TransactionTest, MultipleSnapshotTest) { db->Put(write_options, "AAA", "bar1"); // Read and write without a snapshot - txn->GetForUpdate(read_options, "AAA", &value); + ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value)); ASSERT_EQ(value, "bar1"); s = txn->Put("AAA", "bar2"); ASSERT_OK(s); // Modify BBB before snapshot is taken - db->Put(write_options, "BBB", "bar1"); + ASSERT_OK(db->Put(write_options, "BBB", "bar1")); txn->SetSnapshot(); snapshot_read_options.snapshot = txn->GetSnapshot(); // Read and write with snapshot - txn->GetForUpdate(snapshot_read_options, "BBB", &value); + ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "BBB", &value)); ASSERT_EQ(value, "bar1"); s = txn->Put("BBB", "bar2"); ASSERT_OK(s); - db->Put(write_options, "CCC", "bar1"); + ASSERT_OK(db->Put(write_options, "CCC", "bar1")); // Set a new snapshot txn->SetSnapshot(); @@ -2812,7 +2801,7 @@ TEST_P(TransactionTest, LostUpdate) { WriteOptions write_options; ReadOptions read_options, read_options1, read_options2; TransactionOptions txn_options; - string value; + std::string value; Status s; // Test 2 transactions writing to the same key in multiple orders and @@ -2937,7 +2926,7 @@ TEST_P(TransactionTest, LostUpdate) { TEST_P(TransactionTest, UntrackedWrites) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; // Verify transaction rollback works for untracked keys. @@ -3028,7 +3017,7 @@ TEST_P(TransactionTest, ReinitializeTest) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; // Set txn expiration timeout to 0 microseconds (expires instantly) @@ -3134,7 +3123,7 @@ TEST_P(TransactionTest, Rollback) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; Transaction* txn1 = db->BeginTransaction(write_options, txn_options); @@ -3277,7 +3266,7 @@ TEST_P(TransactionTest, IteratorTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; // Write some keys to the db @@ -3395,7 +3384,7 @@ TEST_P(TransactionTest, IteratorTest) { TEST_P(TransactionTest, DisableIndexingTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; Transaction* txn = db->BeginTransaction(write_options); @@ -3458,7 +3447,7 @@ TEST_P(TransactionTest, SavepointTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; Transaction* txn = db->BeginTransaction(write_options); @@ -3750,7 +3739,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; txn_options.lock_timeout = 1; // 1 ms @@ -3894,7 +3883,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest2) { WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; - string value; + std::string value; Status s; s = db->Put(write_options, "A", ""); @@ -4099,7 +4088,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest2) { TEST_P(TransactionTest, TimeoutTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; delete db; @@ -4236,7 +4225,7 @@ TEST_P(TransactionTest, TimeoutTest) { TEST_P(TransactionTest, SingleDeleteTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; Transaction* txn = db->BeginTransaction(write_options); @@ -4334,7 +4323,7 @@ TEST_P(TransactionTest, SingleDeleteTest) { TEST_P(TransactionTest, MergeTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; Transaction* txn = db->BeginTransaction(write_options, TransactionOptions()); @@ -4390,7 +4379,7 @@ TEST_P(TransactionTest, MergeTest) { TEST_P(TransactionTest, DeferSnapshotTest) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; Status s; s = db->Put(write_options, "A", "a0"); @@ -4441,7 +4430,7 @@ TEST_P(TransactionTest, DeferSnapshotTest) { TEST_P(TransactionTest, DeferSnapshotTest2) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - string value; + std::string value; Status s; Transaction* txn1 = db->BeginTransaction(write_options); @@ -4498,7 +4487,7 @@ TEST_P(TransactionTest, DeferSnapshotTest2) { TEST_P(TransactionTest, DeferSnapshotSavePointTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - string value; + std::string value; Status s; Transaction* txn1 = db->BeginTransaction(write_options); @@ -4606,7 +4595,7 @@ TEST_P(TransactionTest, DeferSnapshotSavePointTest) { TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) { WriteOptions write_options; ReadOptions read_options; - string value; + std::string value; class Notifier : public TransactionNotifier { private: @@ -4665,7 +4654,7 @@ TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) { TEST_P(TransactionTest, ClearSnapshotTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - string value; + std::string value; Status s; s = db->Put(write_options, "foo", "0"); @@ -4906,7 +4895,7 @@ TEST_P(TransactionTest, MemoryLimitTest) { TransactionOptions txn_options; // Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data. txn_options.max_write_batch_size = 29; - string value; + std::string value; Status s; Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options); @@ -4942,7 +4931,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) { FlushOptions fopt; options.disable_auto_compactions = true; - ReOpen(); + ASSERT_OK(ReOpen()); // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some // of the branches. This is the same as counting a binary number where i-th @@ -4956,7 +4945,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) { return n & filter; }; const size_t max_n = static_cast(1) << NUM_BRANCHES; - for (size_t n = 0; n < max_n; n++, ReOpen()) { + for (size_t n = 0; n < max_n; n++) { DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); size_t branch = 0; auto seq = db_impl->GetLatestSequenceNumber(); @@ -4966,13 +4955,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) { ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { - db_impl->Flush(fopt); + ASSERT_OK(db_impl->Flush(fopt)); seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (!short_test && branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); db_impl = reinterpret_cast(db->GetRootDB()); seq = db_impl->GetLatestSequenceNumber(); ASSERT_EQ(exp_seq, seq); @@ -4988,13 +4977,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) { ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { - db_impl->Flush(fopt); + ASSERT_OK(db_impl->Flush(fopt)); seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (!short_test && branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); db_impl = reinterpret_cast(db->GetRootDB()); seq = db_impl->GetLatestSequenceNumber(); ASSERT_EQ(exp_seq, seq); @@ -5005,13 +4994,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) { ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { - db_impl->Flush(fopt); + ASSERT_OK(db_impl->Flush(fopt)); seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (!short_test && branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); db_impl = reinterpret_cast(db->GetRootDB()); seq = db_impl->GetLatestSequenceNumber(); ASSERT_EQ(exp_seq, seq); @@ -5023,13 +5012,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) { ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { - db_impl->Flush(fopt); + ASSERT_OK(db_impl->Flush(fopt)); seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (!short_test && branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); db_impl = reinterpret_cast(db->GetRootDB()); seq = db_impl->GetLatestSequenceNumber(); ASSERT_EQ(exp_seq, seq); @@ -5040,17 +5029,18 @@ TEST_P(TransactionTest, SeqAdvanceTest) { ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { - db_impl->Flush(fopt); + ASSERT_OK(db_impl->Flush(fopt)); seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (!short_test && branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); + ASSERT_OK(db_impl->FlushWAL(true)); + ASSERT_OK(ReOpenNoDelete()); db_impl = reinterpret_cast(db->GetRootDB()); seq = db_impl->GetLatestSequenceNumber(); ASSERT_EQ(exp_seq, seq); } + ASSERT_OK(ReOpen()); } } @@ -5062,7 +5052,7 @@ TEST_P(TransactionTest, Optimizations) { optimizations.skip_concurrency_control = IsInCombination(0, new_comb); optimizations.skip_duplicate_key_check = IsInCombination(1, new_comb); - ReOpen(); + ASSERT_OK(ReOpen()); WriteOptions write_options; WriteBatch batch; batch.Put(Slice("k"), Slice("v1")); @@ -5111,7 +5101,7 @@ TEST_P(TransactionTest, DuplicateKeys) { std::string cf_name = "two"; ColumnFamilyHandle* cf_handle = nullptr; { - db->CreateColumnFamily(cf_options, cf_name, &cf_handle); + ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); WriteOptions write_options; WriteBatch batch; batch.Put(Slice("key"), Slice("value")); @@ -5144,7 +5134,7 @@ TEST_P(TransactionTest, DuplicateKeys) { // Test with non-bytewise comparator { - ReOpen(); + ASSERT_OK(ReOpen()); std::unique_ptr comp_gc(new ThreeBytewiseComparator()); cf_options.comparator = comp_gc.get(); ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); @@ -5188,8 +5178,8 @@ TEST_P(TransactionTest, DuplicateKeys) { if (with_commit_batch && do_rollback) { continue; } - ReOpen(); - db->CreateColumnFamily(cf_options, cf_name, &cf_handle); + ASSERT_OK(ReOpen()); + ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); TransactionOptions txn_options; txn_options.use_only_the_last_commit_time_batch_for_recovery = false; WriteOptions write_options; @@ -5316,11 +5306,11 @@ TEST_P(TransactionTest, DuplicateKeys) { // verify that. cf_options.max_successive_merges = 2; cf_options.merge_operator = MergeOperators::CreateStringAppendOperator(); - ReOpen(); + ASSERT_OK(ReOpen()); db->CreateColumnFamily(cf_options, cf_name, &cf_handle); WriteOptions write_options; // Ensure one value for the key - db->Put(write_options, cf_handle, Slice("key"), Slice("value")); + ASSERT_OK(db->Put(write_options, cf_handle, Slice("key"), Slice("value"))); WriteBatch batch; // Merge more than max_successive_merges times batch.Merge(cf_handle, Slice("key"), Slice("1")); @@ -5344,14 +5334,14 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a"))); ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b"))); txn0->SetSavePoint(); - txn0->RollbackToSavePoint(); + ASSERT_OK(txn0->RollbackToSavePoint()); ASSERT_OK(txn0->Commit()); delete txn0; } // Test sucessfull recovery after a crash { - ReOpen(); + ASSERT_OK(ReOpen()); TransactionOptions txn_options; WriteOptions write_options; ReadOptions ropt; @@ -5383,7 +5373,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Prepare()); delete txn0; // This will check the asserts inside recovery code - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); reinterpret_cast(db)->TEST_Crash(); ASSERT_OK(ReOpenNoDelete(cfds, &handles)); txn0 = db->GetTransactionByName("xid"); @@ -5442,7 +5432,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Prepare()); delete txn0; // This will check the asserts inside recovery code - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 reinterpret_cast(db->GetRootDB()) ->TEST_FlushMemTable(true, handles[1]); @@ -5475,7 +5465,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Prepare()); delete txn0; // This will check the asserts inside recovery code - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 reinterpret_cast(db->GetRootDB()) ->TEST_FlushMemTable(true, handles[1]); @@ -5502,7 +5492,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Prepare()); delete txn0; // This will check the asserts inside recovery code - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 reinterpret_cast(db->GetRootDB()) ->TEST_FlushMemTable(true, handles[1]); @@ -5529,7 +5519,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Prepare()); delete txn0; // This will check the asserts inside recovery code - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 reinterpret_cast(db->GetRootDB()) ->TEST_FlushMemTable(true, handles[1]); diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 56d3d6e2e..d72454750 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -51,7 +51,7 @@ class TransactionTestBase : public ::testing::Test { TransactionTestBase(bool use_stackable_db, bool two_write_queue, TxnDBWritePolicy write_policy) - : use_stackable_db_(use_stackable_db) { + : db(nullptr), env(nullptr), use_stackable_db_(use_stackable_db) { options.create_if_missing = true; options.max_write_buffer_number = 2; options.write_buffer_size = 4 * 1024; @@ -78,6 +78,7 @@ class TransactionTestBase : public ::testing::Test { ~TransactionTestBase() { delete db; + db = nullptr; // This is to skip the assert statement in FaultInjectionTestEnv. There // seems to be a bug in btrfs that the makes readdir return recently // unlink-ed files. By using the default fs we simply ignore errors resulted @@ -125,6 +126,7 @@ class TransactionTestBase : public ::testing::Test { Status ReOpen() { delete db; + db = nullptr; DestroyDB(dbname, options); Status s; if (use_stackable_db_ == false) { @@ -139,16 +141,23 @@ class TransactionTestBase : public ::testing::Test { std::vector* handles) { std::vector compaction_enabled_cf_indices; TransactionDB::PrepareWrap(&options, &cfs, &compaction_enabled_cf_indices); - DB* root_db; + DB* root_db = nullptr; Options options_copy(options); const bool use_seq_per_batch = txn_db_options.write_policy == WRITE_PREPARED; Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db, use_seq_per_batch); + StackableDB* stackable_db = new StackableDB(root_db); if (s.ok()) { - s = TransactionDB::WrapStackableDB( - new StackableDB(root_db), txn_db_options, - compaction_enabled_cf_indices, *handles, &db); + assert(root_db != nullptr); + s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options, + compaction_enabled_cf_indices, + *handles, &db); + } + if (!s.ok()) { + delete stackable_db; + // just in case it was not deleted (and not set to nullptr). + delete root_db; } return s; } @@ -161,19 +170,26 @@ class TransactionTestBase : public ::testing::Test { TransactionDB::PrepareWrap(&options, &column_families, &compaction_enabled_cf_indices); std::vector handles; - DB* root_db; + DB* root_db = nullptr; Options options_copy(options); const bool use_seq_per_batch = txn_db_options.write_policy == WRITE_PREPARED; Status s = DBImpl::Open(options_copy, dbname, column_families, &handles, &root_db, use_seq_per_batch); + StackableDB* stackable_db = new StackableDB(root_db); if (s.ok()) { + assert(root_db != nullptr); assert(handles.size() == 1); - s = TransactionDB::WrapStackableDB( - new StackableDB(root_db), txn_db_options, - compaction_enabled_cf_indices, handles, &db); + s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options, + compaction_enabled_cf_indices, handles, + &db); delete handles[0]; } + if (!s.ok()) { + delete stackable_db; + // just in case it was not deleted (and not set to nullptr). + delete root_db; + } return s; } @@ -207,9 +223,9 @@ class TransactionTestBase : public ::testing::Test { // equivalent to commit without prepare. WriteBatch wb; auto istr = std::to_string(index); - wb.Put("k1" + istr, "v1"); - wb.Put("k2" + istr, "v2"); - wb.Put("k3" + istr, "v3"); + ASSERT_OK(wb.Put("k1" + istr, "v1")); + ASSERT_OK(wb.Put("k2" + istr, "v2")); + ASSERT_OK(wb.Put("k3" + istr, "v3")); WriteOptions wopts; auto s = db->Write(wopts, &wb); if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { @@ -231,15 +247,12 @@ class TransactionTestBase : public ::testing::Test { WriteOptions write_options; Transaction* txn = db->BeginTransaction(write_options, txn_options); auto istr = std::to_string(index); - auto s = txn->SetName("xid" + istr); - ASSERT_OK(s); - s = txn->Put(Slice("foo" + istr), Slice("bar")); - s = txn->Put(Slice("foo2" + istr), Slice("bar2")); - s = txn->Put(Slice("foo3" + istr), Slice("bar3")); - s = txn->Put(Slice("foo4" + istr), Slice("bar4")); - ASSERT_OK(s); - s = txn->Commit(); - ASSERT_OK(s); + ASSERT_OK(txn->SetName("xid" + istr)); + ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar"))); + ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2"))); + ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3"))); + ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4"))); + ASSERT_OK(txn->Commit()); if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { // Consume one seq per key exp_seq += 4; @@ -261,20 +274,16 @@ class TransactionTestBase : public ::testing::Test { WriteOptions write_options; Transaction* txn = db->BeginTransaction(write_options, txn_options); auto istr = std::to_string(index); - auto s = txn->SetName("xid" + istr); - ASSERT_OK(s); - s = txn->Put(Slice("foo" + istr), Slice("bar")); - s = txn->Put(Slice("foo2" + istr), Slice("bar2")); - s = txn->Put(Slice("foo3" + istr), Slice("bar3")); - s = txn->Put(Slice("foo4" + istr), Slice("bar4")); - s = txn->Put(Slice("foo5" + istr), Slice("bar5")); - ASSERT_OK(s); + ASSERT_OK(txn->SetName("xid" + istr)); + ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar"))); + ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2"))); + ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3"))); + ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4"))); + ASSERT_OK(txn->Put(Slice("foo5" + istr), Slice("bar5"))); expected_commits++; - s = txn->Prepare(); - ASSERT_OK(s); + ASSERT_OK(txn->Prepare()); commit_writes++; - s = txn->Commit(); - ASSERT_OK(s); + ASSERT_OK(txn->Commit()); if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { // Consume one seq per key exp_seq += 5; @@ -292,20 +301,16 @@ class TransactionTestBase : public ::testing::Test { WriteOptions write_options; Transaction* txn = db->BeginTransaction(write_options, txn_options); auto istr = std::to_string(index); - auto s = txn->SetName("xid" + istr); - ASSERT_OK(s); - s = txn->Put(Slice("foo" + istr), Slice("bar")); - s = txn->Put(Slice("foo2" + istr), Slice("bar2")); - s = txn->Put(Slice("foo3" + istr), Slice("bar3")); - s = txn->Put(Slice("foo4" + istr), Slice("bar4")); - s = txn->Put(Slice("foo5" + istr), Slice("bar5")); - ASSERT_OK(s); + ASSERT_OK(txn->SetName("xid" + istr)); + ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar"))); + ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2"))); + ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3"))); + ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4"))); + ASSERT_OK(txn->Put(Slice("foo5" + istr), Slice("bar5"))); expected_commits++; - s = txn->Prepare(); - ASSERT_OK(s); + ASSERT_OK(txn->Prepare()); commit_writes++; - s = txn->Rollback(); - ASSERT_OK(s); + ASSERT_OK(txn->Rollback()); if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { // No seq is consumed for deleting the txn buffer exp_seq += 0; @@ -345,15 +350,12 @@ class TransactionTestBase : public ::testing::Test { // For test the duplicate keys auto v2 = Slice("bar2-" + istr).ToString(); auto type = rnd.Uniform(4); - Status s; switch (type) { case 0: committed_kvs[k] = v; - s = db->Put(write_options, k, v); - ASSERT_OK(s); + ASSERT_OK(db->Put(write_options, k, v)); committed_kvs[k] = v2; - s = db->Put(write_options, k, v2); - ASSERT_OK(s); + ASSERT_OK(db->Put(write_options, k, v2)); break; case 1: { WriteBatch wb; @@ -361,26 +363,22 @@ class TransactionTestBase : public ::testing::Test { wb.Put(k, v); committed_kvs[k] = v2; wb.Put(k, v2); - s = db->Write(write_options, &wb); - ASSERT_OK(s); + ASSERT_OK(db->Write(write_options, &wb)); + } break; case 2: case 3: txn = db->BeginTransaction(write_options, txn_options); - s = txn->SetName("xid" + istr); - ASSERT_OK(s); + ASSERT_OK(txn->SetName("xid" + istr)); committed_kvs[k] = v; - s = txn->Put(k, v); - ASSERT_OK(s); + ASSERT_OK(txn->Put(k, v)); committed_kvs[k] = v2; - s = txn->Put(k, v2); - ASSERT_OK(s); + ASSERT_OK(txn->Put(k, v2)); + if (type == 3) { - s = txn->Prepare(); - ASSERT_OK(s); + ASSERT_OK(txn->Prepare()); } - s = txn->Commit(); - ASSERT_OK(s); + ASSERT_OK(txn->Commit()); if (type == 2) { auto pdb = reinterpret_cast(db); // TODO(myabandeh): this is counter-intuitive. The destructor should