diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 0fa7e98e6..5d04d61de 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -87,10 +87,7 @@ Status WritePreparedTxn::PrepareInternal() { !WRITE_AFTER_COMMIT); // For each duplicate key we account for a new sub-batch prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); - // AddPrepared better to be called in the pre-release callback otherwise there - // is a non-zero chance of max advancing prepare_seq and readers assume the - // data as committed. - // Also having it in the PreReleaseCallback allows in-order addition of + // Having AddPrepared in the PreReleaseCallback allows in-order addition of // prepared entries to PrepareHeap and hence enables an optimization. Refer to // SmallestUnCommittedSeq for more details. AddPreparedCallback add_prepared_callback( @@ -151,14 +148,18 @@ Status WritePreparedTxn::CommitInternal() { const bool disable_memtable = !includes_data; const bool do_one_write = !db_impl_->immutable_db_options().two_write_queues || disable_memtable; - const bool publish_seq = do_one_write; - // Note: CommitTimeWriteBatch does not need AddPrepared since it is written to - // DB in one shot. min_uncommitted still works since it requires capturing - // data that is written to DB but not yet committed, while - // CommitTimeWriteBatch commits with PreReleaseCallback. 
WritePreparedCommitEntryPreReleaseCallback update_commit_map( - wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt, - publish_seq); + wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt); + // This is to call AddPrepared on CommitTimeWriteBatch + AddPreparedCallback add_prepared_callback( + wpt_db_, commit_batch_cnt, + db_impl_->immutable_db_options().two_write_queues); + PreReleaseCallback* pre_release_callback; + if (do_one_write) { + pre_release_callback = &update_commit_map; + } else { + pre_release_callback = &add_prepared_callback; + } uint64_t seq_used = kMaxSequenceNumber; // Since the prepared batch is directly written to memtable, there is already // a connection between the memtable and its WAL, so there is no need to @@ -167,37 +168,29 @@ Status WritePreparedTxn::CommitInternal() { size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1; auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, zero_log_number, disable_memtable, &seq_used, - batch_cnt, &update_commit_map); + batch_cnt, pre_release_callback); assert(!s.ok() || seq_used != kMaxSequenceNumber); + const SequenceNumber commit_batch_seq = seq_used; if (LIKELY(do_one_write || !s.ok())) { if (LIKELY(s.ok())) { // Note RemovePrepared should be called after WriteImpl that publishsed // the seq. Otherwise SmallestUnCommittedSeq optimization breaks. wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_); } + if (UNLIKELY(!do_one_write)) { + wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt); + } return s; } // else do the 2nd write to publish seq // Note: the 2nd write comes with a performance penality. So if we have too // many of commits accompanied with ComitTimeWriteBatch and yet we cannot // enable use_only_the_last_commit_time_batch_for_recovery_ optimization, // two_write_queues should be disabled to avoid many additional writes here. 
- class PublishSeqPreReleaseCallback : public PreReleaseCallback { - public: - explicit PublishSeqPreReleaseCallback(DBImpl* db_impl) - : db_impl_(db_impl) {} - Status Callback(SequenceNumber seq, bool is_mem_disabled) override { -#ifdef NDEBUG - (void)is_mem_disabled; -#endif - assert(is_mem_disabled); - assert(db_impl_->immutable_db_options().two_write_queues); - db_impl_->SetLastPublishedSequence(seq); - return Status::OK(); - } - - private: - DBImpl* db_impl_; - } publish_seq_callback(db_impl_); + const size_t kZeroData = 0; + // Update commit map only from the 2nd queue + WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch( + wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData, + commit_batch_seq, commit_batch_cnt); WriteBatch empty_batch; empty_batch.PutLogData(Slice()); // In the absence of Prepare markers, use Noop as a batch separator @@ -207,11 +200,12 @@ Status WritePreparedTxn::CommitInternal() { const uint64_t NO_REF_LOG = 0; s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, - &publish_seq_callback); + &update_commit_map_with_aux_batch); assert(!s.ok() || seq_used != kMaxSequenceNumber); // Note RemovePrepared should be called after WriteImpl that publishsed the // seq. Otherwise SmallestUnCommittedSeq optimization breaks. wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_); + wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt); return s; } diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 50245c6f9..7dcc8f6a8 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -321,12 +321,14 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { return false; } - // Add the transaction with prepare sequence seq to the prepared list + // Add the transaction with prepare sequence seq to the prepared list. 
+ // Note: must be called serially with increasing seq on each call. void AddPrepared(uint64_t seq); // Remove the transaction with prepare sequence seq from the prepared list void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1); - // Add the transaction with prepare sequence prepare_seq and comtit sequence + // Add the transaction with prepare sequence prepare_seq and commit sequence // commit_seq to the commit map. loop_cnt is to detect infinite loops. + // Note: must be called serially. void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, uint8_t loop_cnt = 0); @@ -752,6 +754,7 @@ class AddPreparedCallback : public PreReleaseCallback { #ifdef NDEBUG (void)is_mem_disabled; #endif + // Always Prepare from the main queue assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue for (size_t i = 0; i < sub_batch_cnt_; i++) { db_->AddPrepared(prepare_seq + i); @@ -769,21 +772,22 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { public: // includes_data indicates that the commit also writes non-empty // CommitTimeWriteBatch to memtable, which needs to be committed separately. 
- WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db, - DBImpl* db_impl, - SequenceNumber prep_seq, - size_t prep_batch_cnt, - size_t data_batch_cnt = 0, - bool publish_seq = true) + WritePreparedCommitEntryPreReleaseCallback( + WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq, + size_t prep_batch_cnt, size_t data_batch_cnt = 0, + SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0) : db_(db), db_impl_(db_impl), prep_seq_(prep_seq), prep_batch_cnt_(prep_batch_cnt), data_batch_cnt_(data_batch_cnt), includes_data_(data_batch_cnt_ > 0), - publish_seq_(publish_seq) { + aux_seq_(aux_seq), + aux_batch_cnt_(aux_batch_cnt), + includes_aux_batch_(aux_batch_cnt > 0) { assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber)); // xor assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0); + assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber)); // xor } virtual Status Callback(SequenceNumber commit_seq, @@ -791,7 +795,12 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { #ifdef NDEBUG (void)is_mem_disabled; #endif + // Always commit from the 2nd queue + assert(!db_impl_->immutable_db_options().two_write_queues || + is_mem_disabled); assert(includes_data_ || prep_seq_ != kMaxSequenceNumber); + // Data batch is what accompanies the commit marker and affects the + // last seq in the commit batch. const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1) ? 
commit_seq : commit_seq + data_batch_cnt_ - 1; @@ -800,6 +809,11 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { db_->AddCommitted(prep_seq_ + i, last_commit_seq); } } // else there was no prepare phase + if (includes_aux_batch_) { + for (size_t i = 0; i < aux_batch_cnt_; i++) { + db_->AddCommitted(aux_seq_ + i, last_commit_seq); + } + } if (includes_data_) { assert(data_batch_cnt_); // Commit the data that is accompanied with the commit request @@ -810,7 +824,7 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { db_->AddCommitted(commit_seq + i, last_commit_seq); } } - if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) { + if (db_impl_->immutable_db_options().two_write_queues) { assert(is_mem_disabled); // implies the 2nd queue // Publish the sequence number. We can do that here assuming the callback // is invoked only from one write queue, which would guarantee that the @@ -830,11 +844,16 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { SequenceNumber prep_seq_; size_t prep_batch_cnt_; size_t data_batch_cnt_; - // Either because it is commit without prepare or it has a - // CommitTimeWriteBatch + // Data here is the batch that is written with the commit marker, either + // because it is commit without prepare or commit has a CommitTimeWriteBatch. bool includes_data_; - // Should the callback also publishes the commit seq number - bool publish_seq_; + // Auxiliary batch (if there is any) is a batch that is written before, but + // gets the same commit seq as prepare batch or data batch. This is used in + // two write queues where the CommitTimeWriteBatch becomes the aux batch and + // we do a separate write to actually commit everything. 
+ SequenceNumber aux_seq_; + size_t aux_batch_cnt_; + bool includes_aux_batch_; }; // For two_write_queues commit both the aborted batch and the cleanup batch and @@ -858,7 +877,9 @@ class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback { virtual Status Callback(SequenceNumber commit_seq, bool is_mem_disabled) override { + // Always commit from the 2nd queue assert(is_mem_disabled); // implies the 2nd queue + assert(db_impl_->immutable_db_options().two_write_queues); #ifdef NDEBUG (void)is_mem_disabled; #endif