WritePrepared: non-atomic commit of delayed prepared (#4947)

Summary:
Commit of delayed prepared has two non-atomic steps: add to commit cache, remove from delayed_prepared_. Similarly in ::IsInSnapshot we read from commit cache first and then look into delayed_prepared_. Due to non-atomicity thus the reader might not find the
prep_seq that is just committed neither in commit cache nor in delayed_prepared_. To fix that i)
we check if there was any delayed prepared BEFORE looking into commit
cache, ii) if there was, we complete the search steps to be these: i)
commit cache, ii) delayed prepared, commit cache again. In this way if
the first query to commit cache missed the commit, the 2nd will catch it. The cost of the redundant read from commit cache is paid only if delayed_prepared_ is nonempty which should be a very rare scenario.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4947

Differential Revision: D13952754

Pulled By: maysamyabandeh

fbshipit-source-id: 8f47826b13f8ce154398d842028342423f4ca2b2
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent d9c9f3c809
commit 199fabc197
  1. 74
      utilities/transactions/write_prepared_transaction_test.cc
  2. 7
      utilities/transactions/write_prepared_txn_db.cc
  3. 28
      utilities/transactions/write_prepared_txn_db.h

@ -2696,6 +2696,80 @@ TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
delete iter;
}
// Committing an delayed prepared has two non-atomic steps: update commit cache,
// remove seq from delayed_prepared_. The read in IsInSnapshot also involves two
// non-atomic steps of checking these two data structures. This test breaks each
// in the middle to ensure correctness in spite of non-atomic execution.
// Note: This test is limitted to the case where snapshot is larger than the
// max_evicted_seq_.
TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfOldPrepared) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 3; // 8 entries
for (auto split_read : {true, false}) {
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits);
// Fill up the commit cache
std::string init_value("value1");
for (int i = 0; i < 10; i++) {
db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
}
// Prepare a transaction but do not commit it
Transaction* txn =
db->BeginTransaction(WriteOptions(), TransactionOptions());
ASSERT_OK(txn->SetName("xid"));
ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
ASSERT_OK(txn->Prepare());
// Commit a bunch of entires to advance max evicted seq and make the
// prepared a delayed prepared
for (int i = 0; i < 10; i++) {
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
}
// The snapshot should not see the delayed prepared entry
auto snap = db->GetSnapshot();
if (split_read) {
// split right after reading from the commit cache
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
"AtomicCommitOfOldPrepared:Commit:before"},
{"AtomicCommitOfOldPrepared:Commit:after",
"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
} else { // split commit
// split right before removing from delayed_preparped_
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"WritePreparedTxnDB::RemovePrepared:pause",
"AtomicCommitOfOldPrepared:Read:before"},
{"AtomicCommitOfOldPrepared:Read:after",
"WritePreparedTxnDB::RemovePrepared:resume"}});
}
SyncPoint::GetInstance()->EnableProcessing();
rocksdb::port::Thread commit_thread([&]() {
TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Commit:before");
ASSERT_OK(txn->Commit());
TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Commit:after");
delete txn;
});
rocksdb::port::Thread read_thread([&]() {
TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Read:before");
ReadOptions roptions;
roptions.snapshot = snap;
PinnableSlice value;
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
ASSERT_OK(s);
// It should not see the commit of delayed prpared
ASSERT_TRUE(value == init_value);
TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Read:after");
db->ReleaseSnapshot(snap);
});
read_thread.join();
commit_thread.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}
}
// When an old prepared entry gets committed, there is a gap between the time
// that it is published and when it is cleaned up from old_prepared_. This test
// stresses such cacese.

@ -489,6 +489,8 @@ void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
TEST_SYNC_POINT_CALLBACK(
"RemovePrepared:Start",
const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
ROCKS_LOG_DETAILS(info_log_,
"RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
prepare_seq, batch_cnt);
@ -544,9 +546,8 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
const SequenceNumber& new_max) {
ROCKS_LOG_DETAILS(info_log_,
"AdvanceMaxEvictedSeq overhead %" PRIu64
" => %" PRIu64 prev_max,
new_max);
"AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
prev_max, new_max);
// Declare the intention before getting snapshot from the DB. This helps a
// concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
// it has not already. Otherwise the new snapshot is when we ask DB for

@ -153,10 +153,21 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
prep_seq, snapshot_seq, 1, min_uncommitted);
return true;
}
// Commit of delayed prepared has two non-atomic steps: add to commit cache,
// remove from delayed prepared. Our reads from these two is also
// non-atomic. By looking into commit cache first thus we might not find the
// prep_seq neither in commit cache not in delayed_prepared_. To fix that i)
// we check if there was any delayed prepared BEFORE looking into commit
// cache, ii) if there was, we complete the search steps to be these: i)
// commit cache, ii) delayed prepared, commit cache again. In this way if
// the first query to commit cache missed the commit, the 2nd will catch it.
bool was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
CommitEntry64b dont_care;
CommitEntry cached;
bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
if (exist && prep_seq == cached.prep_seq) {
// It is committed and also not evicted from commit cache
ROCKS_LOG_DETAILS(
@ -178,12 +189,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
prep_seq, snapshot_seq, 0);
return false;
}
if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
if (!was_empty) {
// We should not normally reach here
WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
ReadLock rl(&prepared_mutex_);
ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()));
ROCKS_LOG_WARN(info_log_,
"prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
// This is the order: 1) delayed_prepared_commits_ update, 2) publish 3)
// delayed_prepared_ clean up. So check if it is the case of a late
@ -204,6 +216,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
snapshot_seq <= it->second);
return it->second <= snapshot_seq;
}
} else {
// 2nd query to commit cache. Refer to was_empty comment above.
exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
if (exist && prep_seq == cached.prep_seq) {
ROCKS_LOG_DETAILS(
info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
return cached.commit_seq <= snapshot_seq;
}
}
}
// When advancing max_evicted_seq_, we move older entires from prepared to

Loading…
Cancel
Save