diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc
index 01ec298cf..6f28c2fce 100644
--- a/utilities/transactions/write_unprepared_txn.cc
+++ b/utilities/transactions/write_unprepared_txn.cc
@@ -607,6 +607,8 @@ Status WriteUnpreparedTxn::CommitInternal() {
   // commit write batch as just another "unprepared" batch. This will also
   // update the unprep_seqs_ in the update_commit_map callback.
   unprep_seqs_[commit_batch_seq] = commit_batch_cnt;
+  WriteUnpreparedCommitEntryPreReleaseCallback
+      update_commit_map_with_commit_batch(wpt_db_, db_impl_, unprep_seqs_, 0);
 
   // Note: the 2nd write comes with a performance penality. So if we have too
   // many of commits accompanied with ComitTimeWriteBatch and yet we cannot
@@ -623,7 +625,7 @@ Status WriteUnpreparedTxn::CommitInternal() {
   const uint64_t NO_REF_LOG = 0;
   s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                           NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
-                          &update_commit_map);
+                          &update_commit_map_with_commit_batch);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   // Note RemovePrepared should be called after WriteImpl that publishsed the
   // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
@@ -704,6 +706,9 @@ Status WriteUnpreparedTxn::RollbackInternal() {
   // need to read our own writes when reading prior versions of the key for
   // rollback.
   WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
+  // TODO(lth): We write rollback batch all in a single batch here, but this
+  // should be subdivded into multiple batches as well. In phase 2, when key
+  // sets are read from WAL, this will happen naturally.
   WriteRollbackKeys(GetTrackedKeys(), &rollback_batch, &callback, roptions);
 
   // The Rollback marker will be used as a batch separator
@@ -712,26 +717,29 @@ Status WriteUnpreparedTxn::RollbackInternal() {
   const bool DISABLE_MEMTABLE = true;
   const uint64_t NO_REF_LOG = 0;
   uint64_t seq_used = kMaxSequenceNumber;
-  // TODO(lth): We write rollback batch all in a single batch here, but this
-  // should be subdivded into multiple batches as well. In phase 2, when key
-  // sets are read from WAL, this will happen naturally.
-  const size_t ONE_BATCH = 1;
-  // We commit the rolled back prepared batches. ALthough this is
+  // Rollback batch may contain duplicate keys, because tracked_keys_ is not
+  // comparator aware.
+  auto rollback_batch_cnt = rollback_batch.SubBatchCnt();
+  // We commit the rolled back prepared batches. Although this is
   // counter-intuitive, i) it is safe to do so, since the prepared batches are
   // already canceled out by the rollback batch, ii) adding the commit entry to
   // CommitCache will allow us to benefit from the existing mechanism in
   // CommitCache that keeps an entry evicted due to max advance and yet overlaps
   // with a live snapshot around so that the live snapshot properly skips the
   // entry even if its prepare seq is lower than max_evicted_seq_.
+  //
+  // TODO(lth): RollbackInternal is conceptually very similar to
+  // CommitInternal, with the rollback batch simply taking on the role of
+  // CommitTimeWriteBatch. We should be able to merge the two code paths.
   WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
-      wpt_db_, db_impl_, unprep_seqs_, ONE_BATCH);
+      wpt_db_, db_impl_, unprep_seqs_, rollback_batch_cnt);
   // Note: the rollback batch does not need AddPrepared since it is written to
   // DB in one shot. min_uncommitted still works since it requires capturing
-  // data that is written to DB but not yet committed, while the roolback
+  // data that is written to DB but not yet committed, while the rollback
   // batch commits with PreReleaseCallback.
   s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
                           nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE,
-                          &seq_used, rollback_batch.SubBatchCnt(),
+                          &seq_used, rollback_batch_cnt,
                           do_one_write ? &update_commit_map : nullptr);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   if (!s.ok()) {
@@ -746,21 +754,26 @@ Status WriteUnpreparedTxn::RollbackInternal() {
     unflushed_save_points_.reset(nullptr);
     return s;
   }  // else do the 2nd write for commit
+
+  uint64_t& prepare_seq = seq_used;
+  // Populate unprep_seqs_ with rollback_batch_cnt, since we treat data in the
+  // rollback write batch as just another "unprepared" batch. This will also
+  // update the unprep_seqs_ in the update_commit_map callback.
+  unprep_seqs_[prepare_seq] = rollback_batch_cnt;
+  WriteUnpreparedCommitEntryPreReleaseCallback
+      update_commit_map_with_rollback_batch(wpt_db_, db_impl_, unprep_seqs_, 0);
 
   ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                     "RollbackInternal 2nd write prepare_seq: %" PRIu64,
                     prepare_seq);
-  // Commit the batch by writing an empty batch to the queue that will release
-  // the commit sequence number to readers.
-  WriteUnpreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
-      wpt_db_, db_impl_, unprep_seqs_, prepare_seq);
   WriteBatch empty_batch;
+  const size_t ONE_BATCH = 1;
   empty_batch.PutLogData(Slice());
   // In the absence of Prepare markers, use Noop as a batch separator
   WriteBatchInternal::InsertNoop(&empty_batch);
   s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                           NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
-                          &update_commit_map_with_prepare);
+                          &update_commit_map_with_rollback_batch);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   // Mark the txn as rolled back
   if (s.ok()) {
diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h
index 30c8f4c55..55730e15b 100644
--- a/utilities/transactions/write_unprepared_txn.h
+++ b/utilities/transactions/write_unprepared_txn.h
@@ -262,7 +262,6 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
   // value when calling RollbackToSavepoint.
   SequenceNumber largest_validated_seq_;
 
-  using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
   struct SavePoint {
     // Record of unprep_seqs_ at this savepoint. The set of unprep_seq is
     // used during RollbackToSavepoint to determine visibility when restoring
@@ -333,6 +332,7 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
   // last savepoint. Also, it may make sense to merge this into tracked_keys_
   // and differentiate between tracked but not locked keys to avoid having two
   // very similar data structures.
+  using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
   KeySet untracked_keys_;
 };
 
diff --git a/utilities/transactions/write_unprepared_txn_db.h b/utilities/transactions/write_unprepared_txn_db.h
index ad8e40f94..c40e96d49 100644
--- a/utilities/transactions/write_unprepared_txn_db.h
+++ b/utilities/transactions/write_unprepared_txn_db.h
@@ -104,45 +104,5 @@ class WriteUnpreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
   bool publish_seq_;
 };
 
-class WriteUnpreparedRollbackPreReleaseCallback : public PreReleaseCallback {
-  // TODO(lth): Reduce code duplication with
-  // WritePreparedCommitEntryPreReleaseCallback
- public:
-  WriteUnpreparedRollbackPreReleaseCallback(
-      WritePreparedTxnDB* db, DBImpl* db_impl,
-      const std::map<SequenceNumber, size_t>& unprep_seqs,
-      SequenceNumber rollback_seq)
-      : db_(db),
-        db_impl_(db_impl),
-        unprep_seqs_(unprep_seqs),
-        rollback_seq_(rollback_seq) {
-    assert(unprep_seqs.size() > 0);
-    assert(db_impl_->immutable_db_options().two_write_queues);
-  }
-
-  virtual Status Callback(SequenceNumber commit_seq,
-                          bool is_mem_disabled __attribute__((__unused__)),
-                          uint64_t, size_t /*index*/,
-                          size_t /*total*/) override {
-    assert(is_mem_disabled);  // implies the 2nd queue
-    const uint64_t last_commit_seq = commit_seq;
-    db_->AddCommitted(rollback_seq_, last_commit_seq);
-    // Recall that unprep_seqs maps (un)prepared_seq => prepare_batch_cnt.
-    for (const auto& s : unprep_seqs_) {
-      for (size_t i = 0; i < s.second; i++) {
-        db_->AddCommitted(s.first + i, last_commit_seq);
-      }
-    }
-    db_impl_->SetLastPublishedSequence(last_commit_seq);
-    return Status::OK();
-  }
-
- private:
-  WritePreparedTxnDB* db_;
-  DBImpl* db_impl_;
-  const std::map<SequenceNumber, size_t>& unprep_seqs_;
-  SequenceNumber rollback_seq_;
-};
-
 }  // namespace ROCKSDB_NAMESPACE
 #endif  // ROCKSDB_LITE
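
For context, the patch retires the rollback-specific WriteUnpreparedRollbackPreReleaseCallback and reuses WriteUnpreparedCommitEntryPreReleaseCallback: the rollback batch is counted with SubBatchCnt() and inserted into unprep_seqs_ before the second, memtable-disabled write, so the same callback that serves CommitInternal marks every unprepared sub-batch, rollback batch included, as committed. The standalone C++ sketch below illustrates only that bookkeeping under simplifying assumptions; FakeCommitCache, CommitUnpreparedBatches, and the sequence numbers are hypothetical stand-ins, not RocksDB APIs.

// Standalone sketch (not RocksDB code) of the commit-map update performed by
// the unified pre-release callback once the rollback batch lives in
// unprep_seqs_.
#include <cstdint>
#include <cstdio>
#include <map>

using SequenceNumber = uint64_t;

// Minimal stand-in for the commit cache: records prepare_seq -> commit_seq.
struct FakeCommitCache {
  std::map<SequenceNumber, SequenceNumber> committed;
  void AddCommitted(SequenceNumber prepare_seq, SequenceNumber commit_seq) {
    committed[prepare_seq] = commit_seq;
  }
};

// Every unprepared batch in unprep_seqs, plus an optional data batch written
// together with this commit, is marked committed at commit_seq. With
// data_batch_cnt == 0 (the two-write case), the commit or rollback batch is
// expected to already be present in unprep_seqs.
void CommitUnpreparedBatches(FakeCommitCache& cache,
                             const std::map<SequenceNumber, size_t>& unprep_seqs,
                             SequenceNumber first_data_seq,
                             size_t data_batch_cnt, SequenceNumber commit_seq) {
  for (const auto& s : unprep_seqs) {
    for (size_t i = 0; i < s.second; i++) {
      cache.AddCommitted(s.first + i, commit_seq);
    }
  }
  for (size_t i = 0; i < data_batch_cnt; i++) {
    cache.AddCommitted(first_data_seq + i, commit_seq);
  }
}

int main() {
  FakeCommitCache cache;
  // Unprepared batches written at seq 10 (2 sub-batches) and 20 (1 sub-batch).
  std::map<SequenceNumber, size_t> unprep_seqs = {{10, 2}, {20, 1}};

  // Rollback path after the change: the rollback batch (seq 30, 3 sub-batches
  // because tracked keys may repeat) is folded into unprep_seqs before the
  // second write publishes commit_seq 31 with data_batch_cnt == 0.
  unprep_seqs[30] = 3;
  CommitUnpreparedBatches(cache, unprep_seqs, /*first_data_seq=*/0,
                          /*data_batch_cnt=*/0, /*commit_seq=*/31);

  for (const auto& p : cache.committed) {
    std::printf("prepare_seq %llu -> commit_seq %llu\n",
                (unsigned long long)p.first, (unsigned long long)p.second);
  }
  return 0;
}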