diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index fe5fd1b3a..c2bd61379 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -329,13 +329,13 @@ Status WriteCommittedTxn::CommitInternal() { return s; } -Status WriteCommittedTxn::Rollback() { +Status PessimisticTransaction::Rollback() { Status s; if (txn_state_ == PREPARED) { - WriteBatch rollback_marker; - WriteBatchInternal::MarkRollback(&rollback_marker, name_); txn_state_.store(AWAITING_ROLLBACK); - s = db_impl_->WriteImpl(write_options_, &rollback_marker); + + s = RollbackInternal(); + if (s.ok()) { // we do not need to keep our prepared section around assert(log_number_ > 0); @@ -356,6 +356,13 @@ Status WriteCommittedTxn::Rollback() { return s; } +Status WriteCommittedTxn::RollbackInternal() { + WriteBatch rollback_marker; + WriteBatchInternal::MarkRollback(&rollback_marker, name_); + auto s = db_impl_->WriteImpl(write_options_, &rollback_marker); + return s; +} + Status PessimisticTransaction::RollbackToSavePoint() { if (txn_state_ != STARTED) { return Status::InvalidArgument("Transaction is beyond state for rollback."); diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 057eaf88a..95045c04b 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -54,7 +54,7 @@ class PessimisticTransaction : public TransactionBaseImpl { // transactions writes to an internal write batch. Status CommitBatch(WriteBatch* batch); - Status Rollback() override = 0; + Status Rollback() override; Status RollbackToSavePoint() override; @@ -121,6 +121,8 @@ class PessimisticTransaction : public TransactionBaseImpl { virtual Status CommitInternal() = 0; + virtual Status RollbackInternal() = 0; + void Initialize(const TransactionOptions& txn_options); Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); @@ -191,8 +193,6 @@ class WriteCommittedTxn : public PessimisticTransaction { virtual ~WriteCommittedTxn() {} - Status Rollback() override; - private: Status PrepareInternal() override; @@ -202,6 +202,8 @@ class WriteCommittedTxn : public PessimisticTransaction { Status CommitInternal() override; + Status RollbackInternal() override; + Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, SequenceNumber prev_seqno, SequenceNumber* new_seqno); diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 301c5d5b9..38191972d 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -655,6 +655,34 @@ void WritePreparedTxnDB::AddPrepared(uint64_t seq) { prepared_txns_.push(seq); } +void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq, + uint64_t rollback_seq) { + ROCKS_LOG_DEBUG( + info_log_, "Txn %" PRIu64 " rolling back with rollback seq of " PRIu64 "", + prep_seq, rollback_seq); + std::vector snapshots = + GetSnapshotListFromDB(kMaxSequenceNumber); + // TODO(myabandeh): currently we are assuming that there is no snapshot taken + // when a transaciton is rolled back. This is the case the way MySQL does + // rollback which is after recovery. We should extend it to be able to + // rollback txns that overlap with exsiting snapshots. + assert(snapshots.size() == 0); + if (snapshots.size()) { + throw std::runtime_error( + "Rollback reqeust while there are live snapshots."); + } + WriteLock wl(&prepared_mutex_); + prepared_txns_.erase(prep_seq); + bool was_empty = delayed_prepared_.empty(); + if (!was_empty) { + delayed_prepared_.erase(prep_seq); + bool is_empty = delayed_prepared_.empty(); + if (was_empty != is_empty) { + delayed_prepared_empty_.store(is_empty, std::memory_order_release); + } + } +} + void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq) { ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64, diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 4a75990b2..f0eb87857 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -210,6 +210,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq); // Add the trasnaction with prepare sequence seq to the prepared list void AddPrepared(uint64_t seq); + // Rollback a prepared txn identified with prep_seq. rollback_seq is the seq + // with which the additional data is written to cancel the txn effect. It can + // be used to idenitfy the snapshots that overlap with the rolled back txn. + void RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq); // Add the transaction with prepare sequence prepare_seq and commit sequence // commit_seq to the commit map void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq); @@ -306,6 +310,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; + friend class WritePreparedTransactionTest_RollbackTest_Test; void init(const TransactionDBOptions& /* unused */) { // Adcance max_evicted_seq_ no more than 100 times before the cache wraps diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index fb0ff7a8e..8558e7a13 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1109,6 +1109,134 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { } } +void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v, + Slice key) { + Status s; + PinnableSlice v; + ReadOptions roptions; + s = db->Get(roptions, db->DefaultColumnFamily(), key, &v); + ASSERT_TRUE(exp_s == s); + ASSERT_TRUE(s.ok() || s.IsNotFound()); + if (s.ok()) { + ASSERT_TRUE(exp_v == v); + } +} + +TEST_P(WritePreparedTransactionTest, RollbackTest) { + ReadOptions roptions; + WriteOptions woptions; + TransactionOptions txn_options; + const size_t num_keys = 4; + const size_t num_values = 5; + for (size_t ikey = 1; ikey <= num_keys; ikey++) { + for (size_t ivalue = 0; ivalue < num_values; ivalue++) { + for (bool crash : {false, true}) { + ReOpen(); + WritePreparedTxnDB* wp_db = dynamic_cast(db); + std::string key_str = "key" + ToString(ikey); + switch (ivalue) { + case 0: + break; + case 1: + ASSERT_OK(db->Put(woptions, key_str, "initvalue1")); + break; + case 2: + ASSERT_OK(db->Merge(woptions, key_str, "initvalue2")); + break; + case 3: + ASSERT_OK(db->Delete(woptions, key_str)); + break; + case 4: + ASSERT_OK(db->SingleDelete(woptions, key_str)); + break; + default: + assert(0); + } + + PinnableSlice v1; + auto s1 = + db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1); + PinnableSlice v2; + auto s2 = + db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2); + PinnableSlice v3; + auto s3 = + db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3); + PinnableSlice v4; + auto s4 = + db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4); + Transaction* txn = db->BeginTransaction(woptions, txn_options); + auto s = txn->SetName("xid0"); + ASSERT_OK(s); + s = txn->Put(Slice("key1"), Slice("value1")); + ASSERT_OK(s); + s = txn->Merge(Slice("key2"), Slice("value2")); + ASSERT_OK(s); + s = txn->Delete(Slice("key3")); + ASSERT_OK(s); + s = txn->SingleDelete(Slice("key4")); + ASSERT_OK(s); + s = txn->Prepare(); + ASSERT_OK(s); + + { + ReadLock rl(&wp_db->prepared_mutex_); + ASSERT_FALSE(wp_db->prepared_txns_.empty()); + ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top()); + } + + ASSERT_SAME(db, s1, v1, "key1"); + ASSERT_SAME(db, s2, v2, "key2"); + ASSERT_SAME(db, s3, v3, "key3"); + ASSERT_SAME(db, s4, v4, "key4"); + + if (crash) { + // TODO(myabandeh): replace it with true crash (commented lines below) + // after compaction PR is landed. + auto db_impl = reinterpret_cast(db->GetRootDB()); + auto seq = db_impl->GetLatestSequenceNumber(); + wp_db = dynamic_cast(db); + SequenceNumber prev_max = wp_db->max_evicted_seq_; + wp_db->AdvanceMaxEvictedSeq(prev_max, seq); + // delete txn; + // auto db_impl = reinterpret_cast(db->GetRootDB()); + // db_impl->FlushWAL(true); + // ReOpenNoDelete(); + // wp_db = dynamic_cast(db); + // txn = db->GetTransactionByName("xid0"); + // ASSERT_FALSE(wp_db->delayed_prepared_empty_); + // ReadLock rl(&wp_db->prepared_mutex_); + // ASSERT_TRUE(wp_db->prepared_txns_.empty()); + // ASSERT_FALSE(wp_db->delayed_prepared_.empty()); + // ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) != + // wp_db->delayed_prepared_.end()); + } + + ASSERT_SAME(db, s1, v1, "key1"); + ASSERT_SAME(db, s2, v2, "key2"); + ASSERT_SAME(db, s3, v3, "key3"); + ASSERT_SAME(db, s4, v4, "key4"); + + s = txn->Rollback(); + ASSERT_OK(s); + + { + ASSERT_TRUE(wp_db->delayed_prepared_empty_); + ReadLock rl(&wp_db->prepared_mutex_); + ASSERT_TRUE(wp_db->prepared_txns_.empty()); + ASSERT_TRUE(wp_db->delayed_prepared_.empty()); + } + + ASSERT_SAME(db, s1, v1, "key1"); + ASSERT_SAME(db, s2, v2, "key2"); + ASSERT_SAME(db, s3, v3, "key3"); + ASSERT_SAME(db, s4, v4, "key4"); + delete txn; + } + } + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 4b2e918e4..86bc6fc62 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -63,7 +63,7 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() { } Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { - // In the absenese of Prepare markers, use Noop as a batch separator + // In the absense of Prepare markers, use Noop as a batch separator WriteBatchInternal::InsertNoop(batch); const bool disable_memtable = true; const uint64_t no_log_ref = 0; @@ -108,10 +108,89 @@ Status WritePreparedTxn::CommitInternal() { return s; } -Status WritePreparedTxn::Rollback() { - // TODO(myabandeh) Implement this - throw std::runtime_error("Rollback not Implemented"); - return Status::OK(); +Status WritePreparedTxn::RollbackInternal() { + WriteBatch rollback_batch; + assert(GetId() != kMaxSequenceNumber); + assert(GetId() > 0); + // In the absense of Prepare markers, use Noop as a batch separator + WriteBatchInternal::InsertNoop(&rollback_batch); + // In WritePrepared, the txn is is the same as prepare seq + auto last_visible_txn = GetId() - 1; + struct RollbackWriteBatchBuilder : public WriteBatch::Handler { + DBImpl* db_; + ReadOptions roptions; + WritePreparedTxnReadCallback callback; + WriteBatch* rollback_batch_; + RollbackWriteBatchBuilder(DBImpl* db, WritePreparedTxnDB* wpt_db, + SequenceNumber snap_seq, WriteBatch* dst_batch) + : db_(db), callback(wpt_db, snap_seq), rollback_batch_(dst_batch) {} + + Status Rollback(uint32_t cf, const Slice& key) { + PinnableSlice pinnable_val; + bool not_used; + auto cf_handle = db_->GetColumnFamilyHandle(cf); + auto s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, ¬_used, + &callback); + assert(s.ok() || s.IsNotFound()); + if (s.ok()) { + s = rollback_batch_->Put(cf_handle, key, pinnable_val); + assert(s.ok()); + } else if (s.IsNotFound()) { + // There has been no readable value before txn. By adding a delete we + // make sure that there will be none afterwards either. + s = rollback_batch_->Delete(cf_handle, key); + assert(s.ok()); + } else { + // Unexpected status. Return it to the user. + } + return s; + } + + Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override { + return Rollback(cf, key); + } + + Status DeleteCF(uint32_t cf, const Slice& key) override { + return Rollback(cf, key); + } + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override { + return Rollback(cf, key); + } + + Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override { + return Rollback(cf, key); + } + + Status MarkNoop(bool) override { return Status::OK(); } + Status MarkBeginPrepare() override { return Status::OK(); } + Status MarkEndPrepare(const Slice&) override { return Status::OK(); } + Status MarkCommit(const Slice&) override { return Status::OK(); } + Status MarkRollback(const Slice&) override { + return Status::InvalidArgument(); + } + } rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch); + auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler); + assert(s.ok()); + if (!s.ok()) { + return s; + } + WriteBatchInternal::MarkRollback(&rollback_batch, name_); + const bool disable_memtable = true; + const uint64_t no_log_ref = 0; + uint64_t seq_used = kMaxSequenceNumber; + s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, + no_log_ref, !disable_memtable, &seq_used); + assert(seq_used != kMaxSequenceNumber); + uint64_t& prepare_seq = seq_used; + uint64_t& commit_seq = seq_used; + // TODO(myabandeh): skip AddPrepared + wpt_db_->AddPrepared(prepare_seq); + wpt_db_->AddCommitted(prepare_seq, commit_seq); + // Mark the txn as rolled back + wpt_db_->RollbackPrepared(GetId(), commit_seq); + + return s; } } // namespace rocksdb diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 131a27575..0ae9887c3 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -50,8 +50,6 @@ class WritePreparedTxn : public PessimisticTransaction { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; - Status Rollback() override; - private: friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; @@ -63,6 +61,8 @@ class WritePreparedTxn : public PessimisticTransaction { Status CommitInternal() override; + Status RollbackInternal() override; + // TODO(myabandeh): verify that the current impl work with values being // written with prepare sequence number too. // Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice&