diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 6cede20fc..f84514ec1 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -2929,6 +2929,81 @@ TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) { rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); } +// Test when we add a prepared seq when the max_evicted_seq_ already goes beyond +// that. The test focuses on a race condition between AddPrepared and +// AdvanceMaxEvictedSeq functions. +TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) { + if (!options.two_write_queues) { + // This test is only for two write queues + return; + } + const size_t snapshot_cache_bits = 7; // same as default + // 1 entry to advance max after the 2nd commit + const size_t commit_cache_bits = 0; + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + ReOpen(); + WritePreparedTxnDB* wp_db = dynamic_cast(db); + std::string some_value("value_some"); + std::string uncommitted_value("value_uncommitted"); + // Prepare two uncommitted transactions + Transaction* txn1 = + db->BeginTransaction(WriteOptions(), TransactionOptions()); + ASSERT_OK(txn1->SetName("xid1")); + ASSERT_OK(txn1->Put(Slice("key1"), some_value)); + ASSERT_OK(txn1->Prepare()); + Transaction* txn2 = + db->BeginTransaction(WriteOptions(), TransactionOptions()); + ASSERT_OK(txn2->SetName("xid2")); + ASSERT_OK(txn2->Put(Slice("key2"), some_value)); + ASSERT_OK(txn2->Prepare()); + // Start the txn here so the other thread could get its id + Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions()); + ASSERT_OK(txn->SetName("xid")); + ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value)); + + // t1) Insert prepared entry, t2) commit other entires to advance max + // evicted sec and finish checking the existing prepared entires, t1) + // AddPrepared, t2) update max_evicted_seq_ + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"AddPrepared::begin:pause", "AddPreparedBeforeMax::read_thread:start"}, + {"AdvanceMaxEvictedSeq::update_max:pause", "AddPrepared::begin:resume"}, + {"AddPrepared::end", "AdvanceMaxEvictedSeq::update_max:resume"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + + rocksdb::port::Thread write_thread([&]() { ASSERT_OK(txn->Prepare()); }); + + rocksdb::port::Thread read_thread([&]() { + TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start"); + // Publish seq number with a commit + ASSERT_OK(txn1->Commit()); + // Since the commit cache size is one the 2nd commit evict the 1st one and + // invokes AdcanceMaxEvictedSeq + ASSERT_OK(txn2->Commit()); + + ReadOptions roptions; + PinnableSlice value; + // The snapshot should not see the uncommitted value from write_thread + auto snap = db->GetSnapshot(); + ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber()); + // This is the scenario that we test for + ASSERT_GT(wp_db->max_evicted_seq_, txn->GetId()); + roptions.snapshot = snap; + auto s = db->Get(roptions, db->DefaultColumnFamily(), "key0", &value); + ASSERT_TRUE(s.IsNotFound()); + db->ReleaseSnapshot(snap); + }); + + read_thread.join(); + write_thread.join(); + delete txn1; + delete txn2; + ASSERT_OK(txn->Commit()); + delete txn; + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + // When an old prepared entry gets committed, there is a gap between the time // that it is published and when it is cleaned up from old_prepared_. This test // stresses such cases. diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 800fd5f6e..2430615c0 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -392,22 +392,49 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { new std::atomic[COMMIT_CACHE_SIZE] {}); } +void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max) { + prepared_mutex_.AssertHeld(); + // When max_evicted_seq_ advances, move older entries from prepared_txns_ + // to delayed_prepared_. This guarantees that if a seq is lower than max, + // then it is not in prepared_txns_ and save an expensive, synchronized + // lookup from a shared set. delayed_prepared_ is expected to be empty in + // normal cases. + ROCKS_LOG_DETAILS( + info_log_, + "CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64, + prepared_txns_.empty(), + prepared_txns_.empty() ? 0 : prepared_txns_.top()); + while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) { + auto to_be_popped = prepared_txns_.top(); + delayed_prepared_.insert(to_be_popped); + ROCKS_LOG_WARN(info_log_, + "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64 + " new_max=%" PRIu64, + static_cast(delayed_prepared_.size()), + to_be_popped, new_max); + prepared_txns_.pop(); + delayed_prepared_empty_.store(false, std::memory_order_release); + } +} + void WritePreparedTxnDB::AddPrepared(uint64_t seq) { - ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing with max %" PRIu64, + ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64, seq, max_evicted_seq_.load()); + TEST_SYNC_POINT("AddPrepared::begin:pause"); + TEST_SYNC_POINT("AddPrepared::begin:resume"); WriteLock wl(&prepared_mutex_); - if (UNLIKELY(seq <= max_evicted_seq_)) { + prepared_txns_.push(seq); + auto new_max = future_max_evicted_seq_.load(); + if (UNLIKELY(seq <= new_max)) { // This should not happen in normal case ROCKS_LOG_ERROR( info_log_, "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64 " <= %" PRIu64, - seq, max_evicted_seq_.load()); - delayed_prepared_.insert(seq); - delayed_prepared_empty_.store(false, std::memory_order_release); - } else { - prepared_txns_.push(seq); + seq, new_max); + CheckPreparedAgainstMax(new_max); } + TEST_SYNC_POINT("AddPrepared::end"); } void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, @@ -557,29 +584,10 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max, updated_future_max, new_max, std::memory_order_acq_rel, std::memory_order_relaxed)) { }; - // When max_evicted_seq_ advances, move older entries from prepared_txns_ - // to delayed_prepared_. This guarantees that if a seq is lower than max, - // then it is not in prepared_txns_ ans save an expensive, synchronized - // lookup from a shared set. delayed_prepared_ is expected to be empty in - // normal cases. + { WriteLock wl(&prepared_mutex_); - ROCKS_LOG_DETAILS( - info_log_, - "AdvanceMaxEvictedSeq prepared_txns_.empty() %d top: %" PRIu64, - prepared_txns_.empty(), - prepared_txns_.empty() ? 0 : prepared_txns_.top()); - while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) { - auto to_be_popped = prepared_txns_.top(); - delayed_prepared_.insert(to_be_popped); - ROCKS_LOG_WARN(info_log_, - "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64 - " new_max=%" PRIu64 " oldmax=%" PRIu64, - static_cast(delayed_prepared_.size()), - to_be_popped, new_max, prev_max); - prepared_txns_.pop(); - delayed_prepared_empty_.store(false, std::memory_order_release); - } + CheckPreparedAgainstMax(new_max); } // With each change to max_evicted_seq_ fetch the live snapshots behind it. @@ -609,6 +617,8 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max, } } auto updated_prev_max = prev_max; + TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause"); + TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume"); while (updated_prev_max < new_max && !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max, std::memory_order_acq_rel, diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 7dcc8f6a8..bc284029a 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -324,6 +324,9 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // Add the transaction with prepare sequence seq to the prepared list. // Note: must be called serially with increasing seq on each call. void AddPrepared(uint64_t seq); + // Check if any of the prepared txns are less than new max_evicted_seq_. Must + // be called with prepared_mutex_ write locked. + void CheckPreparedAgainstMax(SequenceNumber new_max); // Remove the transaction with prepare sequence seq from the prepared list void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1); // Add the transaction with prepare sequence prepare_seq and commit sequence @@ -443,28 +446,29 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { const ColumnFamilyOptions& cf_options) override; private: - friend class WritePreparedCommitEntryPreReleaseCallback; - friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; - friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; - friend class WritePreparedTransactionTest_CommitMapTest_Test; - friend class - WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test; - friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test; - friend class WritePreparedTransactionTestBase; friend class PreparedHeap_BasicsTest_Test; - friend class PreparedHeap_EmptyAtTheEnd_Test; friend class PreparedHeap_Concurrent_Test; + friend class PreparedHeap_EmptyAtTheEnd_Test; + friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test; + friend class WritePreparedCommitEntryPreReleaseCallback; + friend class WritePreparedTransactionTestBase; friend class WritePreparedTxn; friend class WritePreparedTxnDBMock; + friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test; friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test; friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test; friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; + friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test; + friend class + WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test; + friend class WritePreparedTransactionTest_CommitMapTest_Test; friend class WritePreparedTransactionTest_DoubleSnapshot_Test; friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test; + friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test; friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test; friend class