Long outstanding prepare test

Summary: This tests that a prepared transaction is not lost after several crashes, restarts, and memtable flushes.

Test Plan: TwoPhaseLongPrepareTest

Reviewers: sdong

Subscribers: hermanlee4, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D58185
main
Reid Horuff 9 years ago
parent 2ead115116
commit a6254f2bd4
  1. 8
      db/db_impl.h
  2. 9
      utilities/transactions/transaction_db_impl.cc
  3. 2
      utilities/transactions/transaction_db_impl.h
  4. 12
      utilities/transactions/transaction_impl.cc
  5. 83
      utilities/transactions/transaction_test.cc

@ -476,6 +476,14 @@ class DBImpl : public DB {
delete trx; delete trx;
} }
void DeleteAllRecoveredTransactions() {
for (auto it = recovered_transactions_.begin();
it != recovered_transactions_.end(); it++) {
delete it->second;
}
recovered_transactions_.clear();
}
void MarkLogAsHavingPrepSectionFlushed(uint64_t log); void MarkLogAsHavingPrepSectionFlushed(uint64_t log);
void MarkLogAsContainingPrepSection(uint64_t log); void MarkLogAsContainingPrepSection(uint64_t log);

@ -33,6 +33,12 @@ TransactionDBImpl::TransactionDBImpl(DB* db,
assert(db_impl_ != nullptr); assert(db_impl_ != nullptr);
} }
TransactionDBImpl::~TransactionDBImpl() {
while (!transactions_.empty()) {
delete transactions_.begin()->second;
}
}
Transaction* TransactionDBImpl::BeginTransaction( Transaction* TransactionDBImpl::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options, const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) { Transaction* old_txn) {
@ -158,6 +164,9 @@ Status TransactionDB::Open(
break; break;
} }
} }
if (s.ok()) {
dbimpl->DeleteAllRecoveredTransactions();
}
} }
return s; return s;

@ -25,7 +25,7 @@ class TransactionDBImpl : public TransactionDB {
explicit TransactionDBImpl(DB* db, explicit TransactionDBImpl(DB* db,
const TransactionDBOptions& txn_db_options); const TransactionDBOptions& txn_db_options);
~TransactionDBImpl() {} ~TransactionDBImpl();
Transaction* BeginTransaction(const WriteOptions& write_options, Transaction* BeginTransaction(const WriteOptions& write_options,
const TransactionOptions& txn_options, const TransactionOptions& txn_options,

@ -86,12 +86,6 @@ TransactionImpl::~TransactionImpl() {
if (!name_.empty() && exec_status_ != COMMITED) { if (!name_.empty() && exec_status_ != COMMITED) {
txn_db_impl_->UnregisterTransaction(this); txn_db_impl_->UnregisterTransaction(this);
} }
// if we have a prep section that was never committed
// and we are releasing the transaction then we
// can release that prep section
if (log_number_ != 0 && exec_status_ != COMMITED) {
dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_);
}
} }
void TransactionImpl::Clear() { void TransactionImpl::Clear() {
@ -105,12 +99,6 @@ void TransactionImpl::Reinitialize(TransactionDB* txn_db,
if (!name_.empty() && exec_status_ != COMMITED) { if (!name_.empty() && exec_status_ != COMMITED) {
txn_db_impl_->UnregisterTransaction(this); txn_db_impl_->UnregisterTransaction(this);
} }
// if we have a prep section that was never committed
// and we are releasing the transaction then we
// can release that prep section
if (log_number_ != 0 && exec_status_ != COMMITED) {
dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_);
}
TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options); TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options);
Initialize(txn_options); Initialize(txn_options);
} }

@ -31,7 +31,7 @@ namespace rocksdb {
class TransactionTest : public testing::Test { class TransactionTest : public testing::Test {
public: public:
TransactionDB* db; TransactionDB* db;
FaultInjectionTestEnv* env_; FaultInjectionTestEnv* env;
string dbname; string dbname;
Options options; Options options;
@ -43,8 +43,8 @@ class TransactionTest : public testing::Test {
options.write_buffer_size = 4 * 1024; options.write_buffer_size = 4 * 1024;
options.level0_file_num_compaction_trigger = 2; options.level0_file_num_compaction_trigger = 2;
options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
env_ = new FaultInjectionTestEnv(Env::Default()); env = new FaultInjectionTestEnv(Env::Default());
options.env = env_; options.env = env;
dbname = test::TmpDir() + "/transaction_testdb"; dbname = test::TmpDir() + "/transaction_testdb";
DestroyDB(dbname, options); DestroyDB(dbname, options);
@ -57,15 +57,15 @@ class TransactionTest : public testing::Test {
~TransactionTest() { ~TransactionTest() {
delete db; delete db;
DestroyDB(dbname, options); DestroyDB(dbname, options);
delete env_; delete env;
} }
Status ReOpenNoDelete() { Status ReOpenNoDelete() {
delete db; delete db;
db = nullptr; db = nullptr;
env_->AssertNoOpenFile(); env->AssertNoOpenFile();
env_->DropUnsyncedFileData(); env->DropUnsyncedFileData();
env_->ResetState(); env->ResetState();
Status s = TransactionDB::Open(options, txn_db_options, dbname, &db); Status s = TransactionDB::Open(options, txn_db_options, dbname, &db);
return s; return s;
} }
@ -643,6 +643,69 @@ TEST_F(TransactionTest, TwoPhaseMultiThreadTest) {
} }
} }
TEST_F(TransactionTest, TwoPhaseLongPrepareTest) {
WriteOptions write_options;
write_options.sync = true;
write_options.disableWAL = false;
ReadOptions read_options;
TransactionOptions txn_options;
std::string value;
Status s;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("bob");
ASSERT_OK(s);
// transaction put
s = txn->Put(Slice("foo"), Slice("bar"));
ASSERT_OK(s);
// prepare
s = txn->Prepare();
ASSERT_OK(s);
delete txn;
for (int i = 0; i < 1000; i++) {
std::string key(i, 'k');
std::string val(1000, 'v');
s = db->Put(write_options, key, val);
ASSERT_OK(s);
if (i % 29 == 0) {
// crash
env->SetFilesystemActive(false);
ReOpenNoDelete();
} else if (i % 37 == 0) {
// close
ReOpenNoDelete();
}
}
// commit old txn
txn = db->GetTransactionByName("bob");
ASSERT_TRUE(txn);
s = txn->Commit();
ASSERT_OK(s);
// verify data txn data
s = db->Get(read_options, "foo", &value);
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(value, "bar");
// verify non txn data
for (int i = 0; i < 1000; i++) {
std::string key(i, 'k');
std::string val(1000, 'v');
s = db->Get(read_options, key, &value);
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(value, val);
}
delete txn;
}
TEST_F(TransactionTest, TwoPhaseSequenceTest) { TEST_F(TransactionTest, TwoPhaseSequenceTest) {
WriteOptions write_options; WriteOptions write_options;
write_options.sync = true; write_options.sync = true;
@ -679,7 +742,7 @@ TEST_F(TransactionTest, TwoPhaseSequenceTest) {
delete txn; delete txn;
// kill and reopen // kill and reopen
env_->SetFilesystemActive(false); env->SetFilesystemActive(false);
ReOpenNoDelete(); ReOpenNoDelete();
// value is now available // value is now available
@ -714,7 +777,7 @@ TEST_F(TransactionTest, TwoPhaseDoubleRecoveryTest) {
delete txn; delete txn;
// kill and reopen // kill and reopen
env_->SetFilesystemActive(false); env->SetFilesystemActive(false);
ReOpenNoDelete(); ReOpenNoDelete();
// commit old txn // commit old txn
@ -744,7 +807,7 @@ TEST_F(TransactionTest, TwoPhaseDoubleRecoveryTest) {
delete txn; delete txn;
// kill and reopen // kill and reopen
env_->SetFilesystemActive(false); env->SetFilesystemActive(false);
ReOpenNoDelete(); ReOpenNoDelete();
// value is now available // value is now available

Loading…
Cancel
Save