diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index 2cb91f0d3..1e7384dc7 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -1575,6 +1575,68 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
   delete txn0;
 }
 
+// Stress SmallestUnCommittedSeq, which reads from both prepared_txns_ and
+// delayed_prepared_, when run concurrently with advancing max_evicted_seq,
+// which moves prepared txns from prepared_txns_ to delayed_prepared_.
+TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 1;    // disable commit cache
+  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
+  ReOpen();
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  ReadOptions ropt;
+  PinnableSlice pinnable_val;
+  WriteOptions write_options;
+  TransactionOptions txn_options;
+  std::vector<Transaction*> txns, committed_txns;
+
+  const int cnt = 100;
+  for (int i = 0; i < cnt; i++) {
+    Transaction* txn = db->BeginTransaction(write_options, txn_options);
+    ASSERT_OK(txn->SetName("xid" + ToString(i)));
+    auto key = "key1" + ToString(i);
+    auto value = "value1" + ToString(i);
+    ASSERT_OK(txn->Put(Slice(key), Slice(value)));
+    ASSERT_OK(txn->Prepare());
+    txns.push_back(txn);
+  }
+
+  port::Mutex mutex;
+  Random rnd(1103);
+  rocksdb::port::Thread commit_thread([&]() {
+    for (int i = 0; i < cnt; i++) {
+      uint32_t index = rnd.Uniform(cnt - i);
+      Transaction* txn;
+      {
+        MutexLock l(&mutex);
+        txn = txns[index];
+        txns.erase(txns.begin() + index);
+      }
+      // Since the commit cache is practically disabled, a commit results in an
+      // immediate advance of max_evicted_seq_ and subsequently moves some
+      // prepared txns to delayed_prepared_.
+      txn->Commit();
+      committed_txns.push_back(txn);
+    }
+  });
+  rocksdb::port::Thread read_thread([&]() {
+    while (1) {
+      MutexLock l(&mutex);
+      if (txns.empty()) {
+        break;
+      }
+      auto min_uncommitted = wp_db->SmallestUnCommittedSeq();
+      ASSERT_LE(min_uncommitted, (*txns.begin())->GetId());
+    }
+  });
+
+  commit_thread.join();
+  read_thread.join();
+  for (auto txn : committed_txns) {
+    delete txn;
+  }
+}
+
 TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) {
   // Given the sequential run of txns, with this timeout we should never see a
   // deadlock nor a timeout unless we have a key conflict, which should be
diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc
index 7441cb3c0..7ff89a4f8 100644
--- a/utilities/transactions/write_prepared_txn_db.cc
+++ b/utilities/transactions/write_prepared_txn_db.cc
@@ -432,8 +432,12 @@ void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
                      " new_max=%" PRIu64,
                      static_cast<uint64_t>(delayed_prepared_.size()),
                      to_be_popped, new_max);
-      prepared_txns_.pop();
       delayed_prepared_empty_.store(false, std::memory_order_release);
+      // Update prepared_txns_ only after updating delayed_prepared_empty_;
+      // otherwise there would be a window in which the entry appears in
+      // neither prepared_txns_ nor delayed_prepared_, since readers skip
+      // delayed_prepared_ while delayed_prepared_empty_ is true.
+      prepared_txns_.pop();
     }
     if (locked) {
       prepared_txns_.push_pop_mutex()->Lock();
diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
index 4ee7d8e6c..cdcee4941 100644
--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -500,6 +500,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
   friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
   friend class WritePreparedTransactionTest_RollbackTest_Test;
+  friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test;
   friend class WriteUnpreparedTxn;
   friend class WriteUnpreparedTxnDB;
   friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
@@ -626,6 +627,19 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
                             const SequenceNumber& new_max);
 
   inline SequenceNumber SmallestUnCommittedSeq() {
+    // Note: We have two lists to look into, but for performance reasons they
+    // are not read atomically. Since CheckPreparedAgainstMax copies the entry
+    // to delayed_prepared_ before removing it from prepared_txns_, to ensure
+    // that a prepared entry will not be missed, we look into them in the
+    // opposite order: first read prepared_txns_ and then delayed_prepared_.
+
+    // This must be called before calling ::top. This is because the concurrent
+    // thread would call ::RemovePrepared before updating
+    // GetLatestSequenceNumber(). Reading them in the opposite order here
+    // guarantees that the ::top we read is no larger than the ::top we would
+    // have read had we otherwise updated/read them atomically.
+    auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
+    auto min_prepare = prepared_txns_.top();
     // Since we update the prepare_heap always from the main write queue via
     // PreReleaseCallback, the prepared_txns_.top() indicates the smallest
     // prepared data in 2pc transactions. For non-2pc transactions that are
@@ -638,13 +652,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
         return *delayed_prepared_.begin();
       }
     }
-    // This must be called before calling ::top. This is because the concurrent
-    // thread would call ::RemovePrepared before updating
-    // GetLatestSequenceNumber(). Reading then in opposite order here guarantees
-    // that the ::top that we read would be lower the ::top if we had otherwise
-    // update/read them atomically.
-    auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
-    auto min_prepare = prepared_txns_.top();
     bool empty = min_prepare == kMaxSequenceNumber;
    if (empty) {
      // Since GetLatestSequenceNumber is updated
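
Note (not part of the patch): the ordering argument behind this fix can be shown with a minimal, self-contained C++ sketch that uses generic std:: containers in place of the RocksDB structures. All names below (prepared_set, delayed_set, MoveToDelayed, SmallestUncommitted) are illustrative, not RocksDB APIs. The mover publishes the entry in the "delayed" set and clears the empty flag before erasing it from the "prepared" set, while the reader scans the sets in the opposite order, so an uncommitted entry is visible in at least one of the two sets at every point in time.

// Standalone sketch of the publish-before-remove / read-in-opposite-order
// invariant; assumes nothing about RocksDB beyond the idea described above.
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <set>
#include <thread>

std::mutex prepared_mu, delayed_mu;
std::set<uint64_t> prepared_set;         // stands in for prepared_txns_
std::set<uint64_t> delayed_set;          // stands in for delayed_prepared_
std::atomic<bool> delayed_empty{true};   // stands in for delayed_prepared_empty_
std::atomic<bool> done{false};

void MoveToDelayed(uint64_t seq) {       // mover, like CheckPreparedAgainstMax
  {
    std::lock_guard<std::mutex> l(delayed_mu);
    delayed_set.insert(seq);             // 1) copy into the delayed set first
  }
  delayed_empty.store(false, std::memory_order_release);  // 2) publish the flag
  {
    std::lock_guard<std::mutex> l(prepared_mu);
    prepared_set.erase(seq);             // 3) only then drop it from prepared
  }
}

uint64_t SmallestUncommitted() {         // reader, like SmallestUnCommittedSeq
  {
    std::lock_guard<std::mutex> l(prepared_mu);
    if (!prepared_set.empty()) return *prepared_set.begin();  // prepared first
  }
  if (!delayed_empty.load(std::memory_order_acquire)) {
    std::lock_guard<std::mutex> l(delayed_mu);
    if (!delayed_set.empty()) return *delayed_set.begin();    // delayed second
  }
  return UINT64_MAX;                     // nothing uncommitted
}

int main() {
  {
    std::lock_guard<std::mutex> l(prepared_mu);
    prepared_set.insert(42);             // one "prepared" entry, never committed
  }
  std::thread reader([] {
    while (!done.load(std::memory_order_acquire)) {
      // The entry is never committed here, so the reader must always find it
      // in one of the two sets, no matter how the move interleaves.
      assert(SmallestUncommitted() == 42);
    }
  });
  MoveToDelayed(42);                     // race the reader against the move
  done.store(true, std::memory_order_release);
  reader.join();
  return 0;
}

If the mover instead erased from prepared_set before setting delayed_empty to false, a reader could find prepared_set empty, skip delayed_set because the flag still reads true, and miss the entry, which is exactly the window the reordered prepared_txns_.pop() in the patch closes.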