diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index cfba5a7a0..90c55df66 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -698,8 +698,10 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
   wp_db->TakeSnapshot(snap_seq1);
   auto commit_seq = ++seq;
   wp_db->AddCommitted(prep_seq, commit_seq);
+  wp_db->RemovePrepared(prep_seq);
   auto commit_seq2 = ++seq;
   wp_db->AddCommitted(prep_seq2, commit_seq2);
+  wp_db->RemovePrepared(prep_seq2);
   // Take the 2nd and 3rd snapshot that overlap with the same txn
   prep_seq = ++seq;
   wp_db->AddPrepared(prep_seq);
@@ -711,12 +713,14 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
   seq++;
   commit_seq = ++seq;
   wp_db->AddCommitted(prep_seq, commit_seq);
+  wp_db->RemovePrepared(prep_seq);
   // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
   // only item in the commit_cache_ via another commit.
   prep_seq = ++seq;
   wp_db->AddPrepared(prep_seq);
   commit_seq = ++seq;
   wp_db->AddCommitted(prep_seq, commit_seq);
+  wp_db->RemovePrepared(prep_seq);
 
   // Verify that the evicted commit entries for all snapshots are in the
   // old_commit_map_
@@ -1367,6 +1371,7 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
       } else {  // else commit it
         seq++;
         wp_db->AddCommitted(cur_txn, seq);
+        wp_db->RemovePrepared(cur_txn);
         commit_seqs.insert(seq);
         if (!snapshot) {
           committed_before.insert(cur_txn);
@@ -1416,9 +1421,11 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
       // they are committed.
       if (cur_txn) {
         wp_db->AddCommitted(cur_txn, seq);
+        wp_db->RemovePrepared(cur_txn);
       }
       for (auto p : prepared) {
         wp_db->AddCommitted(p, seq);
+        wp_db->RemovePrepared(p);
       }
       ASSERT_TRUE(wp_db->delayed_prepared_.empty());
       ASSERT_TRUE(wp_db->prepared_txns_.empty());
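The test updates above all follow one pattern: every AddCommitted is now paired with an explicit RemovePrepared, because committing no longer erases the prepare seq from the prepared list as a side effect. Below is a minimal sketch of the contract these tests now exercise; ToyPreparedTracker and its members are hypothetical stand-ins, not RocksDB types.

```cpp
// Toy model (not RocksDB's actual types) of the AddCommitted/RemovePrepared
// split: committing no longer erases the seq from the prepared set, so the
// caller must do it explicitly afterwards.
#include <cassert>
#include <cstdint>
#include <limits>
#include <set>

struct ToyPreparedTracker {
  std::set<uint64_t> prepared;  // stand-in for prepared_txns_
  void AddPrepared(uint64_t seq) { prepared.insert(seq); }
  void RemovePrepared(uint64_t seq) { prepared.erase(seq); }
  // Smallest seq that might still be uncommitted; max if none is pending.
  uint64_t SmallestUnCommitted() const {
    return prepared.empty() ? std::numeric_limits<uint64_t>::max()
                            : *prepared.begin();
  }
};

int main() {
  ToyPreparedTracker t;
  t.AddPrepared(5);
  // While the txn is prepared-but-uncommitted it pins SmallestUnCommitted.
  assert(t.SmallestUnCommitted() == 5);
  // AddCommitted (not modeled here) would record the commit; only after the
  // commit seq is published does the caller drop the prepare seq, exactly as
  // the updated tests do.
  t.RemovePrepared(5);
  assert(t.SmallestUnCommitted() == std::numeric_limits<uint64_t>::max());
}
```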
diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc
index b7880eed0..77e5b6126 100644
--- a/utilities/transactions/write_prepared_txn.cc
+++ b/utilities/transactions/write_prepared_txn.cc
@@ -138,7 +138,6 @@ Status WritePreparedTxn::CommitInternal() {
     assert(s.ok());
     commit_batch_cnt = counter.BatchCount();
   }
-  const bool PREP_HEAP_SKIPPED = true;
   const bool disable_memtable = !includes_data;
   const bool do_one_write =
       !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
@@ -149,7 +148,7 @@ Status WritePreparedTxn::CommitInternal() {
   // CommitTimeWriteBatch commits with PreReleaseCallback.
   WritePreparedCommitEntryPreReleaseCallback update_commit_map(
       wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt,
-      !PREP_HEAP_SKIPPED, publish_seq);
+      publish_seq);
   uint64_t seq_used = kMaxSequenceNumber;
   // Since the prepared batch is directly written to memtable, there is already
   // a connection between the memtable and its WAL, so there is no need to
@@ -161,6 +160,11 @@ Status WritePreparedTxn::CommitInternal() {
                                  batch_cnt, &update_commit_map);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   if (LIKELY(do_one_write || !s.ok())) {
+    if (LIKELY(s.ok())) {
+      // Note: RemovePrepared should be called after WriteImpl that published
+      // the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
+      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
+    }
     return s;
   }  // else do the 2nd write to publish seq
   // Note: the 2nd write comes with a performance penality. So if we have too
@@ -192,6 +196,9 @@ Status WritePreparedTxn::CommitInternal() {
                             NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                             &publish_seq_callback);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
+  // Note: RemovePrepared should be called after WriteImpl that published the
+  // seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
+  wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
   return s;
 }
 
@@ -324,10 +331,8 @@ Status WritePreparedTxn::RollbackInternal() {
   // Commit the batch by writing an empty batch to the queue that will release
   // the commit sequence number to readers.
   const size_t ZERO_COMMITS = 0;
-  const bool PREP_HEAP_SKIPPED = true;
   WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
-      wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS,
-      PREP_HEAP_SKIPPED);
+      wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS);
   WriteBatch empty_batch;
   empty_batch.PutLogData(Slice());
   // In the absence of Prepare markers, use Noop as a batch separator
@@ -379,10 +384,20 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
 }
 
 void WritePreparedTxn::SetSnapshot() {
+  // Note: for this optimization setting the last sequence number and obtaining
+  // the smallest uncommitted seq should be done atomically. However, to avoid
+  // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
+  // snapshot. Since we always update the list of prepared seqs (via
+  // AddPrepared) AFTER the last sequence is updated, this guarantees that the
+  // smallest uncommitted seq that we pair with the snapshot is smaller than or
+  // equal to the value that would be obtained otherwise atomically. That is ok
+  // since this optimization works as long as min_uncommitted is less than or
+  // equal to the smallest uncommitted seq when the snapshot was taken.
+  auto min_uncommitted = wpt_db_->SmallestUnCommittedSeq();
   const bool FOR_WW_CONFLICT_CHECK = true;
   SnapshotImpl* snapshot = dbimpl_->GetSnapshotImpl(FOR_WW_CONFLICT_CHECK);
   assert(snapshot);
-  wpt_db_->EnhanceSnapshot(snapshot);
+  wpt_db_->EnhanceSnapshot(snapshot, min_uncommitted);
   SetSnapshotInternal(snapshot);
 }
 
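The SetSnapshot comment above argues that reading SmallestUnCommittedSeq before taking the snapshot is safe because the value can only be too small, never too large. Here is a small illustration of why a conservative min_uncommitted cannot produce a wrong visibility decision; DefinitelyVisible is a hypothetical stand-in for the fast path in IsInSnapshot, and the concrete numbers are made up.

```cpp
// A stale, too-small min_uncommitted only shrinks the fast path in
// IsInSnapshot; it never declares an uncommitted write visible.
#include <cassert>
#include <cstdint>

// Fast-path sketch: a seq strictly below min_uncommitted had already
// committed when the snapshot was taken, so it is visible without a
// commit-cache lookup.
bool DefinitelyVisible(uint64_t prep_seq, uint64_t min_uncommitted) {
  return prep_seq < min_uncommitted;
}

int main() {
  const uint64_t true_min_uncommitted = 10;  // value an atomic read would see
  const uint64_t stale_min_uncommitted = 8;  // value read BEFORE the snapshot
  assert(stale_min_uncommitted <= true_min_uncommitted);
  // Every seq the stale value declares visible, the true value would too:
  for (uint64_t seq = 0; seq < stale_min_uncommitted; seq++) {
    assert(DefinitelyVisible(seq, true_min_uncommitted));
  }
  // Seqs in [stale, true) merely fall back to the slower commit-cache check.
}
```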
diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc
index 13acd0a27..3c3d1a7da 100644
--- a/utilities/transactions/write_prepared_txn_db.cc
+++ b/utilities/transactions/write_prepared_txn_db.cc
@@ -55,8 +55,7 @@ Status WritePreparedTxnDB::Initialize(
     virtual Status Callback(SequenceNumber commit_seq,
                             bool is_mem_disabled) override {
       assert(!is_mem_disabled);
-      const bool PREPARE_SKIPPED = true;
-      db_->AddCommitted(commit_seq, commit_seq, PREPARE_SKIPPED);
+      db_->AddCommitted(commit_seq, commit_seq);
       return Status::OK();
     }
 
@@ -167,7 +166,7 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
                            no_log_ref, !DISABLE_MEMTABLE, &seq_used, batch_cnt,
                            pre_release_callback);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
-  uint64_t& prepare_seq = seq_used;
+  uint64_t prepare_seq = seq_used;
   if (txn != nullptr) {
     txn->SetId(prepare_seq);
   }
@@ -185,9 +184,8 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
   // Commit the batch by writing an empty batch to the 2nd queue that will
   // release the commit sequence number to readers.
   const size_t ZERO_COMMITS = 0;
-  const bool PREP_HEAP_SKIPPED = true;
   WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
-      this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS, !PREP_HEAP_SKIPPED);
+      this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
   WriteBatch empty_batch;
   empty_batch.PutLogData(Slice());
   const size_t ONE_BATCH = 1;
@@ -197,6 +195,9 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
                            no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                            &update_commit_map_with_prepare);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
+  // Note: RemovePrepared should be called after WriteImpl that published the
+  // seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
+  RemovePrepared(prepare_seq, batch_cnt);
   return s;
 }
 
@@ -392,24 +393,13 @@ void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq,
     throw std::runtime_error(
         "Rollback reqeust while there are live snapshots.");
   }
-  WriteLock wl(&prepared_mutex_);
-  prepared_txns_.erase(prep_seq);
-  bool was_empty = delayed_prepared_.empty();
-  if (!was_empty) {
-    delayed_prepared_.erase(prep_seq);
-    bool is_empty = delayed_prepared_.empty();
-    if (was_empty != is_empty) {
-      delayed_prepared_empty_.store(is_empty, std::memory_order_release);
-    }
-  }
+  RemovePrepared(prep_seq);
 }
 
 void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
-                                      bool prepare_skipped, uint8_t loop_cnt) {
-  ROCKS_LOG_DETAILS(info_log_,
-                    "Txn %" PRIu64 " Committing with %" PRIu64
-                    "(prepare_skipped=%d)",
-                    prepare_seq, commit_seq, prepare_skipped);
+                                      uint8_t loop_cnt) {
+  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
+                    prepare_seq, commit_seq);
   TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
   TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
   auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
@@ -443,23 +433,27 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
     if (loop_cnt > 100) {
       throw std::runtime_error("Infinite loop in AddCommitted!");
     }
-    AddCommitted(prepare_seq, commit_seq, prepare_skipped, ++loop_cnt);
+    AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
     return;
   }
-  if (!prepare_skipped) {
-    WriteLock wl(&prepared_mutex_);
-    prepared_txns_.erase(prepare_seq);
+  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
+  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
+}
+
+void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
+                                        const size_t batch_cnt) {
+  WriteLock wl(&prepared_mutex_);
+  for (size_t i = 0; i < batch_cnt; i++) {
+    prepared_txns_.erase(prepare_seq + i);
     bool was_empty = delayed_prepared_.empty();
     if (!was_empty) {
-      delayed_prepared_.erase(prepare_seq);
+      delayed_prepared_.erase(prepare_seq + i);
       bool is_empty = delayed_prepared_.empty();
       if (was_empty != is_empty) {
        delayed_prepared_empty_.store(is_empty, std::memory_order_release);
      }
    }
  }
-  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
-  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
 }
 
 bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
@@ -542,10 +536,13 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
 }
 
 const Snapshot* WritePreparedTxnDB::GetSnapshot() {
+  // Note: SmallestUnCommittedSeq must be called before GetSnapshotImpl. Refer
+  // to WritePreparedTxn::SetSnapshot for more explanation.
+  auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
   const bool FOR_WW_CONFLICT_CHECK = true;
   SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(!FOR_WW_CONFLICT_CHECK);
   assert(snap_impl);
-  EnhanceSnapshot(snap_impl);
+  EnhanceSnapshot(snap_impl, min_uncommitted);
   return snap_impl;
 }
 
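The factored-out RemovePrepared above erases batch_cnt consecutive prepare seqs under prepared_mutex_ and keeps the lock-free delayed_prepared_empty_ hint in sync. A self-contained sketch of the same logic with toy types follows; ToyDB, std::mutex, and std::lock_guard stand in for the RocksDB classes and WriteLock, and are assumptions rather than the real API.

```cpp
// Minimal sketch (toy types, not RocksDB's) of RemovePrepared: erase
// batch_cnt consecutive prepare seqs under one lock, and maintain a cheap
// "delayed set is empty" flag that readers can poll without locking.
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <set>

struct ToyDB {
  std::mutex prepared_mutex;
  std::set<uint64_t> prepared_txns;
  std::set<uint64_t> delayed_prepared;
  std::atomic<bool> delayed_prepared_empty{true};

  void RemovePrepared(uint64_t prepare_seq, size_t batch_cnt = 1) {
    std::lock_guard<std::mutex> wl(prepared_mutex);
    for (size_t i = 0; i < batch_cnt; i++) {
      prepared_txns.erase(prepare_seq + i);
      bool was_empty = delayed_prepared.empty();
      if (!was_empty) {
        delayed_prepared.erase(prepare_seq + i);
        bool is_empty = delayed_prepared.empty();
        if (was_empty != is_empty) {
          // Publish the transition so lock-free readers observe it.
          delayed_prepared_empty.store(is_empty, std::memory_order_release);
        }
      }
    }
  }
};

int main() {
  ToyDB db;
  db.prepared_txns = {7, 8};  // e.g. two sub-batches of one prepared txn
  db.RemovePrepared(/*prepare_seq=*/7, /*batch_cnt=*/2);
  return db.prepared_txns.empty() ? 0 : 1;
}
```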
diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
index 70a62b03f..350dc9455 100644
--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -161,6 +161,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     // should check delayed_prepared_ first before applying this optimization.
     // TODO(myabandeh): include delayed_prepared_ in min_uncommitted
     if (prep_seq < min_uncommitted) {
+      ROCKS_LOG_DETAILS(info_log_,
+                        "IsInSnapshot %" PRIu64 " in %" PRIu64
+                        " returns %" PRId32
+                        " because of min_uncommitted %" PRIu64,
+                        prep_seq, snapshot_seq, 1, min_uncommitted);
       return true;
     }
     auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
@@ -242,15 +247,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
 
   // Add the transaction with prepare sequence seq to the prepared list
   void AddPrepared(uint64_t seq);
+  // Remove the transaction with prepare sequence seq from the prepared list
+  void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
   // Rollback a prepared txn identified with prep_seq. rollback_seq is the seq
   // with which the additional data is written to cancel the txn effect. It can
   // be used to identify the snapshots that overlap with the rolled back txn.
   void RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq);
   // Add the transaction with prepare sequence prepare_seq and commit sequence
-  // commit_seq to the commit map. prepare_skipped is set if the prepare phase
-  // is skipped for this commit. loop_cnt is to detect infinite loops.
+  // commit_seq to the commit map. loop_cnt is to detect infinite loops.
   void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
-                    bool prepare_skipped = false, uint8_t loop_cnt = 0);
+                    uint8_t loop_cnt = 0);
 
   struct CommitEntry {
     uint64_t prep_seq;
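The header changes expose RemovePrepared and thread min_uncommitted through the snapshot object, where the IsInSnapshot fast path (with its new ROCKS_LOG_DETAILS line above) consults it. A toy illustration of that check follows; ToySnapshot and FastPathVisible are hypothetical names, not the RocksDB types.

```cpp
// Sketch (toy types) of how the snapshot-carried min_uncommitted drives the
// IsInSnapshot fast path that the new log line reports on.
#include <cassert>
#include <cstdint>

struct ToySnapshot {
  uint64_t snapshot_seq;
  uint64_t min_uncommitted;  // recorded at snapshot creation time
};

// True when prep_seq is certainly visible to the snapshot without consulting
// the commit cache: everything below min_uncommitted had already committed,
// with a commit seq <= snapshot_seq, when the snapshot was taken.
bool FastPathVisible(uint64_t prep_seq, const ToySnapshot& s) {
  return prep_seq < s.min_uncommitted;
}

int main() {
  ToySnapshot s{/*snapshot_seq=*/100, /*min_uncommitted=*/90};
  assert(FastPathVisible(42, s));   // old, committed write: fast path
  assert(!FastPathVisible(95, s));  // needs the commit-cache lookup instead
}
```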
@@ -492,9 +498,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     }
   }
   // Enhance the snapshot object by recording in it the smallest uncommitted seq
-  inline void EnhanceSnapshot(SnapshotImpl* snapshot) {
+  inline void EnhanceSnapshot(SnapshotImpl* snapshot,
+                              SequenceNumber min_uncommitted) {
     assert(snapshot);
-    snapshot->min_uncommitted_ = WritePreparedTxnDB::SmallestUnCommittedSeq();
+    snapshot->min_uncommitted_ = min_uncommitted;
   }
 
   virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
@@ -648,14 +655,12 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
                                              SequenceNumber prep_seq,
                                              size_t prep_batch_cnt,
                                              size_t data_batch_cnt = 0,
-                                             bool prep_heap_skipped = false,
                                              bool publish_seq = true)
       : db_(db),
         db_impl_(db_impl),
         prep_seq_(prep_seq),
         prep_batch_cnt_(prep_batch_cnt),
         data_batch_cnt_(data_batch_cnt),
-        prep_heap_skipped_(prep_heap_skipped),
         includes_data_(data_batch_cnt_ > 0),
         publish_seq_(publish_seq) {
     assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
@@ -670,18 +675,17 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
                                : commit_seq + data_batch_cnt_ - 1;
     if (prep_seq_ != kMaxSequenceNumber) {
       for (size_t i = 0; i < prep_batch_cnt_; i++) {
-        db_->AddCommitted(prep_seq_ + i, last_commit_seq, prep_heap_skipped_);
+        db_->AddCommitted(prep_seq_ + i, last_commit_seq);
       }
     }  // else there was no prepare phase
     if (includes_data_) {
       assert(data_batch_cnt_);
       // Commit the data that is accompanied with the commit request
-      const bool PREPARE_SKIPPED = true;
       for (size_t i = 0; i < data_batch_cnt_; i++) {
         // For commit seq of each batch use the commit seq of the last batch.
         // This would make debugging easier by having all the batches having
         // the same sequence number.
-        db_->AddCommitted(commit_seq + i, last_commit_seq, PREPARE_SKIPPED);
+        db_->AddCommitted(commit_seq + i, last_commit_seq);
       }
     }
     if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) {
@@ -704,9 +708,6 @@
   SequenceNumber prep_seq_;
   size_t prep_batch_cnt_;
   size_t data_batch_cnt_;
-  // An optimization that indicates that there is no need to update the prepare
-  // heap since the prepare sequence number was not added to it.
-  bool prep_heap_skipped_;
   // Either because it is commit without prepare or it has a
   // CommitTimeWriteBatch
   bool includes_data_;
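Taken together, the patch enforces one ordering invariant: a prepare seq leaves the prepared list only after its commit seq has been published by WriteImpl, so SmallestUnCommittedSeq can never skip past a commit that new snapshots cannot yet see. A toy end-to-end walk-through of that ordering is sketched below; ToyTxnDB is hypothetical, and Publish merely models the seq publication that WriteImpl performs.

```cpp
// End-to-end ordering sketch (toy types, not the RocksDB API): the prepare
// seq keeps pinning SmallestUnCommittedSeq until the commit is published.
#include <cassert>
#include <cstdint>
#include <map>
#include <set>

struct ToyTxnDB {
  std::set<uint64_t> prepared;           // prepared_txns_ stand-in
  std::map<uint64_t, uint64_t> commits;  // commit "cache" stand-in
  uint64_t last_published = 0;

  void AddPrepared(uint64_t seq) { prepared.insert(seq); }
  void AddCommitted(uint64_t prep, uint64_t commit) { commits[prep] = commit; }
  void Publish(uint64_t seq) { last_published = seq; }
  void RemovePrepared(uint64_t seq) { prepared.erase(seq); }
  uint64_t SmallestUnCommittedSeq() const {
    return prepared.empty() ? last_published + 1 : *prepared.begin();
  }
};

int main() {
  ToyTxnDB db;
  db.AddPrepared(7);      // prepare phase
  db.AddCommitted(7, 9);  // pre-release callback writes the commit entry
  // Before the commit seq is published, 7 must still pin the minimum, so a
  // concurrent snapshot cannot wrongly take the "already committed" fast path:
  assert(db.SmallestUnCommittedSeq() == 7);
  db.Publish(9);          // WriteImpl publishes the commit seq
  db.RemovePrepared(7);   // only now is it safe to drop the prepare seq
  assert(db.SmallestUnCommittedSeq() > 7);
}
```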