diff --git a/db/db_impl.h b/db/db_impl.h index 7e8c05a12..a587a5b27 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -476,6 +476,14 @@ class DBImpl : public DB { delete trx; } + void DeleteAllRecoveredTransactions() { + for (auto it = recovered_transactions_.begin(); + it != recovered_transactions_.end(); it++) { + delete it->second; + } + recovered_transactions_.clear(); + } + void MarkLogAsHavingPrepSectionFlushed(uint64_t log); void MarkLogAsContainingPrepSection(uint64_t log); diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index 273565c2e..ab7d83c84 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -33,6 +33,12 @@ TransactionDBImpl::TransactionDBImpl(DB* db, assert(db_impl_ != nullptr); } +TransactionDBImpl::~TransactionDBImpl() { + while (!transactions_.empty()) { + delete transactions_.begin()->second; + } +} + Transaction* TransactionDBImpl::BeginTransaction( const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) { @@ -158,6 +164,9 @@ Status TransactionDB::Open( break; } } + if (s.ok()) { + dbimpl->DeleteAllRecoveredTransactions(); + } } return s; diff --git a/utilities/transactions/transaction_db_impl.h b/utilities/transactions/transaction_db_impl.h index 61ea86673..7019ba3e6 100644 --- a/utilities/transactions/transaction_db_impl.h +++ b/utilities/transactions/transaction_db_impl.h @@ -25,7 +25,7 @@ class TransactionDBImpl : public TransactionDB { explicit TransactionDBImpl(DB* db, const TransactionDBOptions& txn_db_options); - ~TransactionDBImpl() {} + ~TransactionDBImpl(); Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options, diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index 82bf8ccac..6b3ee7ade 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -86,12 +86,6 @@ TransactionImpl::~TransactionImpl() { if (!name_.empty() && exec_status_ != COMMITED) { txn_db_impl_->UnregisterTransaction(this); } - // if we have a prep section that was never committed - // and we are releasing the transaction then we - // can release that prep section - if (log_number_ != 0 && exec_status_ != COMMITED) { - dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); - } } void TransactionImpl::Clear() { @@ -105,12 +99,6 @@ void TransactionImpl::Reinitialize(TransactionDB* txn_db, if (!name_.empty() && exec_status_ != COMMITED) { txn_db_impl_->UnregisterTransaction(this); } - // if we have a prep section that was never committed - // and we are releasing the transaction then we - // can release that prep section - if (log_number_ != 0 && exec_status_ != COMMITED) { - dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); - } TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options); Initialize(txn_options); } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 8c41aec84..cf9677e60 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -31,7 +31,7 @@ namespace rocksdb { class TransactionTest : public testing::Test { public: TransactionDB* db; - FaultInjectionTestEnv* env_; + FaultInjectionTestEnv* env; string dbname; Options options; @@ -43,8 +43,8 @@ class TransactionTest : public testing::Test { options.write_buffer_size = 4 * 1024; options.level0_file_num_compaction_trigger = 2; options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); - env_ = new FaultInjectionTestEnv(Env::Default()); - options.env = env_; + env = new FaultInjectionTestEnv(Env::Default()); + options.env = env; dbname = test::TmpDir() + "/transaction_testdb"; DestroyDB(dbname, options); @@ -57,15 +57,15 @@ class TransactionTest : public testing::Test { ~TransactionTest() { delete db; DestroyDB(dbname, options); - delete env_; + delete env; } Status ReOpenNoDelete() { delete db; db = nullptr; - env_->AssertNoOpenFile(); - env_->DropUnsyncedFileData(); - env_->ResetState(); + env->AssertNoOpenFile(); + env->DropUnsyncedFileData(); + env->ResetState(); Status s = TransactionDB::Open(options, txn_db_options, dbname, &db); return s; } @@ -643,6 +643,69 @@ TEST_F(TransactionTest, TwoPhaseMultiThreadTest) { } } +TEST_F(TransactionTest, TwoPhaseLongPrepareTest) { + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = false; + ReadOptions read_options; + TransactionOptions txn_options; + + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("bob"); + ASSERT_OK(s); + + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + + // prepare + s = txn->Prepare(); + ASSERT_OK(s); + + delete txn; + + for (int i = 0; i < 1000; i++) { + std::string key(i, 'k'); + std::string val(1000, 'v'); + s = db->Put(write_options, key, val); + ASSERT_OK(s); + + if (i % 29 == 0) { + // crash + env->SetFilesystemActive(false); + ReOpenNoDelete(); + } else if (i % 37 == 0) { + // close + ReOpenNoDelete(); + } + } + + // commit old txn + txn = db->GetTransactionByName("bob"); + ASSERT_TRUE(txn); + s = txn->Commit(); + ASSERT_OK(s); + + // verify data txn data + s = db->Get(read_options, "foo", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar"); + + // verify non txn data + for (int i = 0; i < 1000; i++) { + std::string key(i, 'k'); + std::string val(1000, 'v'); + s = db->Get(read_options, key, &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, val); + } + + delete txn; +} + TEST_F(TransactionTest, TwoPhaseSequenceTest) { WriteOptions write_options; write_options.sync = true; @@ -679,7 +742,7 @@ TEST_F(TransactionTest, TwoPhaseSequenceTest) { delete txn; // kill and reopen - env_->SetFilesystemActive(false); + env->SetFilesystemActive(false); ReOpenNoDelete(); // value is now available @@ -714,7 +777,7 @@ TEST_F(TransactionTest, TwoPhaseDoubleRecoveryTest) { delete txn; // kill and reopen - env_->SetFilesystemActive(false); + env->SetFilesystemActive(false); ReOpenNoDelete(); // commit old txn @@ -744,7 +807,7 @@ TEST_F(TransactionTest, TwoPhaseDoubleRecoveryTest) { delete txn; // kill and reopen - env_->SetFilesystemActive(false); + env->SetFilesystemActive(false); ReOpenNoDelete(); // value is now available