From d27258d3a6865f34527e633ff0ac60d888706dff Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Mon, 2 Oct 2017 19:46:42 -0700 Subject: [PATCH] WritePrepared Txn: Rollback Summary: Implement the rollback of WritePrepared txns. For each modified value, it reads the value before the txn and write it back. This would cancel out the effect of transaction. It also remove the rolled back txn from prepared heap. Closes https://github.com/facebook/rocksdb/pull/2946 Differential Revision: D5937575 Pulled By: maysamyabandeh fbshipit-source-id: a6d3c47f44db3729f44b287a80f97d08dc4e888d --- .../transactions/pessimistic_transaction.cc | 15 +- .../transactions/pessimistic_transaction.h | 8 +- .../pessimistic_transaction_db.cc | 28 ++++ .../transactions/pessimistic_transaction_db.h | 5 + .../write_prepared_transaction_test.cc | 128 ++++++++++++++++++ utilities/transactions/write_prepared_txn.cc | 89 +++++++++++- utilities/transactions/write_prepared_txn.h | 4 +- 7 files changed, 263 insertions(+), 14 deletions(-) diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index fe5fd1b3a..c2bd61379 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -329,13 +329,13 @@ Status WriteCommittedTxn::CommitInternal() { return s; } -Status WriteCommittedTxn::Rollback() { +Status PessimisticTransaction::Rollback() { Status s; if (txn_state_ == PREPARED) { - WriteBatch rollback_marker; - WriteBatchInternal::MarkRollback(&rollback_marker, name_); txn_state_.store(AWAITING_ROLLBACK); - s = db_impl_->WriteImpl(write_options_, &rollback_marker); + + s = RollbackInternal(); + if (s.ok()) { // we do not need to keep our prepared section around assert(log_number_ > 0); @@ -356,6 +356,13 @@ Status WriteCommittedTxn::Rollback() { return s; } +Status WriteCommittedTxn::RollbackInternal() { + WriteBatch rollback_marker; + WriteBatchInternal::MarkRollback(&rollback_marker, name_); + auto s = db_impl_->WriteImpl(write_options_, &rollback_marker); + return s; +} + Status PessimisticTransaction::RollbackToSavePoint() { if (txn_state_ != STARTED) { return Status::InvalidArgument("Transaction is beyond state for rollback."); diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 057eaf88a..95045c04b 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -54,7 +54,7 @@ class PessimisticTransaction : public TransactionBaseImpl { // transactions writes to an internal write batch. Status CommitBatch(WriteBatch* batch); - Status Rollback() override = 0; + Status Rollback() override; Status RollbackToSavePoint() override; @@ -121,6 +121,8 @@ class PessimisticTransaction : public TransactionBaseImpl { virtual Status CommitInternal() = 0; + virtual Status RollbackInternal() = 0; + void Initialize(const TransactionOptions& txn_options); Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); @@ -191,8 +193,6 @@ class WriteCommittedTxn : public PessimisticTransaction { virtual ~WriteCommittedTxn() {} - Status Rollback() override; - private: Status PrepareInternal() override; @@ -202,6 +202,8 @@ class WriteCommittedTxn : public PessimisticTransaction { Status CommitInternal() override; + Status RollbackInternal() override; + Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, SequenceNumber prev_seqno, SequenceNumber* new_seqno); diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 301c5d5b9..38191972d 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -655,6 +655,34 @@ void WritePreparedTxnDB::AddPrepared(uint64_t seq) { prepared_txns_.push(seq); } +void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq, + uint64_t rollback_seq) { + ROCKS_LOG_DEBUG( + info_log_, "Txn %" PRIu64 " rolling back with rollback seq of " PRIu64 "", + prep_seq, rollback_seq); + std::vector snapshots = + GetSnapshotListFromDB(kMaxSequenceNumber); + // TODO(myabandeh): currently we are assuming that there is no snapshot taken + // when a transaciton is rolled back. This is the case the way MySQL does + // rollback which is after recovery. We should extend it to be able to + // rollback txns that overlap with exsiting snapshots. + assert(snapshots.size() == 0); + if (snapshots.size()) { + throw std::runtime_error( + "Rollback reqeust while there are live snapshots."); + } + WriteLock wl(&prepared_mutex_); + prepared_txns_.erase(prep_seq); + bool was_empty = delayed_prepared_.empty(); + if (!was_empty) { + delayed_prepared_.erase(prep_seq); + bool is_empty = delayed_prepared_.empty(); + if (was_empty != is_empty) { + delayed_prepared_empty_.store(is_empty, std::memory_order_release); + } + } +} + void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq) { ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64, diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 4a75990b2..f0eb87857 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -210,6 +210,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq); // Add the trasnaction with prepare sequence seq to the prepared list void AddPrepared(uint64_t seq); + // Rollback a prepared txn identified with prep_seq. rollback_seq is the seq + // with which the additional data is written to cancel the txn effect. It can + // be used to idenitfy the snapshots that overlap with the rolled back txn. + void RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq); // Add the transaction with prepare sequence prepare_seq and commit sequence // commit_seq to the commit map void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq); @@ -306,6 +310,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; + friend class WritePreparedTransactionTest_RollbackTest_Test; void init(const TransactionDBOptions& /* unused */) { // Adcance max_evicted_seq_ no more than 100 times before the cache wraps diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index fb0ff7a8e..8558e7a13 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1109,6 +1109,134 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { } } +void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v, + Slice key) { + Status s; + PinnableSlice v; + ReadOptions roptions; + s = db->Get(roptions, db->DefaultColumnFamily(), key, &v); + ASSERT_TRUE(exp_s == s); + ASSERT_TRUE(s.ok() || s.IsNotFound()); + if (s.ok()) { + ASSERT_TRUE(exp_v == v); + } +} + +TEST_P(WritePreparedTransactionTest, RollbackTest) { + ReadOptions roptions; + WriteOptions woptions; + TransactionOptions txn_options; + const size_t num_keys = 4; + const size_t num_values = 5; + for (size_t ikey = 1; ikey <= num_keys; ikey++) { + for (size_t ivalue = 0; ivalue < num_values; ivalue++) { + for (bool crash : {false, true}) { + ReOpen(); + WritePreparedTxnDB* wp_db = dynamic_cast(db); + std::string key_str = "key" + ToString(ikey); + switch (ivalue) { + case 0: + break; + case 1: + ASSERT_OK(db->Put(woptions, key_str, "initvalue1")); + break; + case 2: + ASSERT_OK(db->Merge(woptions, key_str, "initvalue2")); + break; + case 3: + ASSERT_OK(db->Delete(woptions, key_str)); + break; + case 4: + ASSERT_OK(db->SingleDelete(woptions, key_str)); + break; + default: + assert(0); + } + + PinnableSlice v1; + auto s1 = + db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1); + PinnableSlice v2; + auto s2 = + db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2); + PinnableSlice v3; + auto s3 = + db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3); + PinnableSlice v4; + auto s4 = + db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4); + Transaction* txn = db->BeginTransaction(woptions, txn_options); + auto s = txn->SetName("xid0"); + ASSERT_OK(s); + s = txn->Put(Slice("key1"), Slice("value1")); + ASSERT_OK(s); + s = txn->Merge(Slice("key2"), Slice("value2")); + ASSERT_OK(s); + s = txn->Delete(Slice("key3")); + ASSERT_OK(s); + s = txn->SingleDelete(Slice("key4")); + ASSERT_OK(s); + s = txn->Prepare(); + ASSERT_OK(s); + + { + ReadLock rl(&wp_db->prepared_mutex_); + ASSERT_FALSE(wp_db->prepared_txns_.empty()); + ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top()); + } + + ASSERT_SAME(db, s1, v1, "key1"); + ASSERT_SAME(db, s2, v2, "key2"); + ASSERT_SAME(db, s3, v3, "key3"); + ASSERT_SAME(db, s4, v4, "key4"); + + if (crash) { + // TODO(myabandeh): replace it with true crash (commented lines below) + // after compaction PR is landed. + auto db_impl = reinterpret_cast(db->GetRootDB()); + auto seq = db_impl->GetLatestSequenceNumber(); + wp_db = dynamic_cast(db); + SequenceNumber prev_max = wp_db->max_evicted_seq_; + wp_db->AdvanceMaxEvictedSeq(prev_max, seq); + // delete txn; + // auto db_impl = reinterpret_cast(db->GetRootDB()); + // db_impl->FlushWAL(true); + // ReOpenNoDelete(); + // wp_db = dynamic_cast(db); + // txn = db->GetTransactionByName("xid0"); + // ASSERT_FALSE(wp_db->delayed_prepared_empty_); + // ReadLock rl(&wp_db->prepared_mutex_); + // ASSERT_TRUE(wp_db->prepared_txns_.empty()); + // ASSERT_FALSE(wp_db->delayed_prepared_.empty()); + // ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) != + // wp_db->delayed_prepared_.end()); + } + + ASSERT_SAME(db, s1, v1, "key1"); + ASSERT_SAME(db, s2, v2, "key2"); + ASSERT_SAME(db, s3, v3, "key3"); + ASSERT_SAME(db, s4, v4, "key4"); + + s = txn->Rollback(); + ASSERT_OK(s); + + { + ASSERT_TRUE(wp_db->delayed_prepared_empty_); + ReadLock rl(&wp_db->prepared_mutex_); + ASSERT_TRUE(wp_db->prepared_txns_.empty()); + ASSERT_TRUE(wp_db->delayed_prepared_.empty()); + } + + ASSERT_SAME(db, s1, v1, "key1"); + ASSERT_SAME(db, s2, v2, "key2"); + ASSERT_SAME(db, s3, v3, "key3"); + ASSERT_SAME(db, s4, v4, "key4"); + delete txn; + } + } + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 4b2e918e4..86bc6fc62 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -63,7 +63,7 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() { } Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { - // In the absenese of Prepare markers, use Noop as a batch separator + // In the absense of Prepare markers, use Noop as a batch separator WriteBatchInternal::InsertNoop(batch); const bool disable_memtable = true; const uint64_t no_log_ref = 0; @@ -108,10 +108,89 @@ Status WritePreparedTxn::CommitInternal() { return s; } -Status WritePreparedTxn::Rollback() { - // TODO(myabandeh) Implement this - throw std::runtime_error("Rollback not Implemented"); - return Status::OK(); +Status WritePreparedTxn::RollbackInternal() { + WriteBatch rollback_batch; + assert(GetId() != kMaxSequenceNumber); + assert(GetId() > 0); + // In the absense of Prepare markers, use Noop as a batch separator + WriteBatchInternal::InsertNoop(&rollback_batch); + // In WritePrepared, the txn is is the same as prepare seq + auto last_visible_txn = GetId() - 1; + struct RollbackWriteBatchBuilder : public WriteBatch::Handler { + DBImpl* db_; + ReadOptions roptions; + WritePreparedTxnReadCallback callback; + WriteBatch* rollback_batch_; + RollbackWriteBatchBuilder(DBImpl* db, WritePreparedTxnDB* wpt_db, + SequenceNumber snap_seq, WriteBatch* dst_batch) + : db_(db), callback(wpt_db, snap_seq), rollback_batch_(dst_batch) {} + + Status Rollback(uint32_t cf, const Slice& key) { + PinnableSlice pinnable_val; + bool not_used; + auto cf_handle = db_->GetColumnFamilyHandle(cf); + auto s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, ¬_used, + &callback); + assert(s.ok() || s.IsNotFound()); + if (s.ok()) { + s = rollback_batch_->Put(cf_handle, key, pinnable_val); + assert(s.ok()); + } else if (s.IsNotFound()) { + // There has been no readable value before txn. By adding a delete we + // make sure that there will be none afterwards either. + s = rollback_batch_->Delete(cf_handle, key); + assert(s.ok()); + } else { + // Unexpected status. Return it to the user. + } + return s; + } + + Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override { + return Rollback(cf, key); + } + + Status DeleteCF(uint32_t cf, const Slice& key) override { + return Rollback(cf, key); + } + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override { + return Rollback(cf, key); + } + + Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override { + return Rollback(cf, key); + } + + Status MarkNoop(bool) override { return Status::OK(); } + Status MarkBeginPrepare() override { return Status::OK(); } + Status MarkEndPrepare(const Slice&) override { return Status::OK(); } + Status MarkCommit(const Slice&) override { return Status::OK(); } + Status MarkRollback(const Slice&) override { + return Status::InvalidArgument(); + } + } rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch); + auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler); + assert(s.ok()); + if (!s.ok()) { + return s; + } + WriteBatchInternal::MarkRollback(&rollback_batch, name_); + const bool disable_memtable = true; + const uint64_t no_log_ref = 0; + uint64_t seq_used = kMaxSequenceNumber; + s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, + no_log_ref, !disable_memtable, &seq_used); + assert(seq_used != kMaxSequenceNumber); + uint64_t& prepare_seq = seq_used; + uint64_t& commit_seq = seq_used; + // TODO(myabandeh): skip AddPrepared + wpt_db_->AddPrepared(prepare_seq); + wpt_db_->AddCommitted(prepare_seq, commit_seq); + // Mark the txn as rolled back + wpt_db_->RollbackPrepared(GetId(), commit_seq); + + return s; } } // namespace rocksdb diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 131a27575..0ae9887c3 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -50,8 +50,6 @@ class WritePreparedTxn : public PessimisticTransaction { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; - Status Rollback() override; - private: friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; @@ -63,6 +61,8 @@ class WritePreparedTxn : public PessimisticTransaction { Status CommitInternal() override; + Status RollbackInternal() override; + // TODO(myabandeh): verify that the current impl work with values being // written with prepare sequence number too. // Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice&