WritePrepared: Fix SmallestUnCommittedSeq bug (#5683)

Summary:
SmallestUnCommittedSeq reads two data structures, prepared_txns_ and delayed_prepared_. These two are updated in CheckPreparedAgainstMax when max_evicted_seq_ advances some prepared entires. To avoid the cost of acquiring a mutex, the read from them in SmallestUnCommittedSeq is not atomic. This creates a potential race condition.
The fix is to read the two data structures in the reverse order of their update. CheckPreparedAgainstMax copies the prepared entry to delayed_prepared_ before removing it from prepared_txns_ and SmallestUnCommittedSeq looks into prepared_txns_ before reading delayed_prepared_.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5683

Differential Revision: D16744699

Pulled By: maysamyabandeh

fbshipit-source-id: b1bdb134018beb0b9de58827f512662bea35cad0
main
Maysam Yabandeh 5 years ago committed by Facebook Github Bot
parent 5d9a67e718
commit 12eaacb71d
  1. 62
      utilities/transactions/write_prepared_transaction_test.cc
  2. 6
      utilities/transactions/write_prepared_txn_db.cc
  3. 21
      utilities/transactions/write_prepared_txn_db.h

@ -1575,6 +1575,68 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
delete txn0;
}
// Stress SmallestUnCommittedSeq, which reads from both prepared_txns_ and
// delayed_prepared_, when is run concurrently with advancing max_evicted_seq,
// which moves prepared txns from prepared_txns_ to delayed_prepared_.
TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 1; // disable commit cache
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
ReadOptions ropt;
PinnableSlice pinnable_val;
WriteOptions write_options;
TransactionOptions txn_options;
std::vector<Transaction*> txns, committed_txns;
const int cnt = 100;
for (int i = 0; i < cnt; i++) {
Transaction* txn = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn->SetName("xid" + ToString(i)));
auto key = "key1" + ToString(i);
auto value = "value1" + ToString(i);
ASSERT_OK(txn->Put(Slice(key), Slice(value)));
ASSERT_OK(txn->Prepare());
txns.push_back(txn);
}
port::Mutex mutex;
Random rnd(1103);
rocksdb::port::Thread commit_thread([&]() {
for (int i = 0; i < cnt; i++) {
uint32_t index = rnd.Uniform(cnt - i);
Transaction* txn;
{
MutexLock l(&mutex);
txn = txns[index];
txns.erase(txns.begin() + index);
}
// Since commit cahce is practically disabled, commit results in immediate
// advance in max_evicted_seq_ and subsequently moving some prepared txns
// to delayed_prepared_.
txn->Commit();
committed_txns.push_back(txn);
}
});
rocksdb::port::Thread read_thread([&]() {
while (1) {
MutexLock l(&mutex);
if (txns.empty()) {
break;
}
auto min_uncommitted = wp_db->SmallestUnCommittedSeq();
ASSERT_LE(min_uncommitted, (*txns.begin())->GetId());
}
});
commit_thread.join();
read_thread.join();
for (auto txn : committed_txns) {
delete txn;
}
}
TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) {
// Given the sequential run of txns, with this timeout we should never see a
// deadlock nor a timeout unless we have a key conflict, which should be

@ -432,8 +432,12 @@ void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
" new_max=%" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()),
to_be_popped, new_max);
prepared_txns_.pop();
delayed_prepared_empty_.store(false, std::memory_order_release);
// Update prepared_txns_ after updating delayed_prepared_empty_ otherwise
// there will be a point in time that the entry is neither in
// prepared_txns_ nor in delayed_prepared_, which will not be checked if
// delayed_prepared_empty_ is false.
prepared_txns_.pop();
}
if (locked) {
prepared_txns_.push_pop_mutex()->Lock();

@ -500,6 +500,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
friend class WritePreparedTransactionTest_RollbackTest_Test;
friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test;
friend class WriteUnpreparedTxn;
friend class WriteUnpreparedTxnDB;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
@ -626,6 +627,19 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
const SequenceNumber& new_max);
inline SequenceNumber SmallestUnCommittedSeq() {
// Note: We have two lists to look into, but for performance reasons they
// are not read atomically. Since CheckPreparedAgainstMax copies the entry
// to delayed_prepared_ before removing it from prepared_txns_, to ensure
// that a prepared entry will not go unmissed, we look into them in opposite
// order: first read prepared_txns_ and then delayed_prepared_.
// This must be called before calling ::top. This is because the concurrent
// thread would call ::RemovePrepared before updating
// GetLatestSequenceNumber(). Reading then in opposite order here guarantees
// that the ::top that we read would be lower the ::top if we had otherwise
// update/read them atomically.
auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
auto min_prepare = prepared_txns_.top();
// Since we update the prepare_heap always from the main write queue via
// PreReleaseCallback, the prepared_txns_.top() indicates the smallest
// prepared data in 2pc transactions. For non-2pc transactions that are
@ -638,13 +652,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
return *delayed_prepared_.begin();
}
}
// This must be called before calling ::top. This is because the concurrent
// thread would call ::RemovePrepared before updating
// GetLatestSequenceNumber(). Reading then in opposite order here guarantees
// that the ::top that we read would be lower the ::top if we had otherwise
// update/read them atomically.
auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
auto min_prepare = prepared_txns_.top();
bool empty = min_prepare == kMaxSequenceNumber;
if (empty) {
// Since GetLatestSequenceNumber is updated

Loading…
Cancel
Save