WritePrepared: commit of delayed prepared entries (#4894)

Summary:
Here is the order of ops in a commit: 1) update commit cache 2) publish seq, 3) RemovePrepared. In case of a delayed prepared, there will be a gap between when the commit is visible to snapshots until delayed_prepared_ is cleaned up. To tell apart this case from a delayed uncommitted txn from, the commit entry of a delayed prepared is also stored in delayed_prepared_commits_, which is updated before publishing the commit.
Also logic in GetSnapshotInternal that ensures that each new snapshot is always larger than max_evicted_seq_ is updated to check against the upcoming value of max_evicted_seq_ rather than its current one. This is because AdvanceMaxEvictedSeq gets the list of snapshots lower than the new max, before updating max_evicted_seq_.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4894

Differential Revision: D13726988

Pulled By: maysamyabandeh

fbshipit-source-id: 1e70d78061b50c944c9816bf4b6dac405ab4ccd3
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent 73ff15c07b
commit 7fd9813b9f
  1. 75
      utilities/transactions/write_prepared_transaction_test.cc
  2. 94
      utilities/transactions/write_prepared_txn_db.cc
  3. 57
      utilities/transactions/write_prepared_txn_db.h

@ -2557,6 +2557,81 @@ TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
delete iter;
}
// When an old prepared entry gets committed, there is a gap between the time
// that it is published and when it is cleaned up from old_prepared_. This test
// stresses such cacese.
TEST_P(WritePreparedTransactionTest, CommitOfOldPrepared) {
const size_t snapshot_cache_bits = 7; // same as default
for (const size_t commit_cache_bits : {0, 2, 3}) {
for (const size_t sub_batch_cnt : {1, 2, 3}) {
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits);
std::atomic<const Snapshot*> snap = {nullptr};
std::atomic<SequenceNumber> exp_prepare = {0};
// Value is synchronized via snap
PinnableSlice value;
// Take a snapshot after publish and before RemovePrepared:Start
auto callback = [&](void* param) {
SequenceNumber prep_seq = *((SequenceNumber*)param);
if (prep_seq == exp_prepare.load()) { // only for write_thread
ASSERT_EQ(nullptr, snap.load());
snap.store(db->GetSnapshot());
ReadOptions roptions;
roptions.snapshot = snap.load();
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value);
ASSERT_OK(s);
}
};
SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
SyncPoint::GetInstance()->EnableProcessing();
// Thread to cause frequent evictions
rocksdb::port::Thread eviction_thread([&]() {
// Too many txns might cause commit_seq - prepare_seq in another thread
// to go beyond DELTA_UPPERBOUND
for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
db->Put(WriteOptions(), Slice("key1"), Slice("value1"));
}
});
rocksdb::port::Thread write_thread([&]() {
for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
Transaction* txn =
db->BeginTransaction(WriteOptions(), TransactionOptions());
ASSERT_OK(txn->SetName("xid"));
std::string val_str = "value" + ToString(i);
for (size_t b = 0; b < sub_batch_cnt; b++) {
ASSERT_OK(txn->Put(Slice("key"), val_str));
}
ASSERT_OK(txn->Prepare());
// Let an eviction to kick in
std::this_thread::yield();
exp_prepare.store(txn->GetId());
ASSERT_OK(txn->Commit());
delete txn;
// Read with the snapshot taken before delayed_prepared_ cleanup
ReadOptions roptions;
roptions.snapshot = snap.load();
ASSERT_NE(nullptr, roptions.snapshot);
PinnableSlice value2;
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value2);
ASSERT_OK(s);
// It should see its own write
ASSERT_TRUE(val_str == value2);
// The value read by snapshot should not change
ASSERT_STREQ(value2.ToString().c_str(), value.ToString().c_str());
db->ReleaseSnapshot(roptions.snapshot);
snap.store(nullptr);
}
});
write_thread.join();
eviction_thread.join();
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}
}
// Test that updating the commit map will not affect the existing snapshots
TEST_P(WritePreparedTransactionTest, AtomicCommit) {
for (bool skip_prepare : {true, false}) {

@ -394,16 +394,22 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq);
assert(seq > max_evicted_seq_);
if (seq <= max_evicted_seq_) {
throw std::runtime_error(
"Added prepare_seq is larger than max_evicted_seq_: " + ToString(seq) +
" <= " + ToString(max_evicted_seq_.load()));
}
ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing with max %" PRIu64,
seq, max_evicted_seq_.load());
WriteLock wl(&prepared_mutex_);
if (UNLIKELY(seq <= max_evicted_seq_)) {
// This should not happen in normal case
ROCKS_LOG_ERROR(
info_log_,
"Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
" <= %" PRIu64,
seq, max_evicted_seq_.load());
delayed_prepared_.insert(seq);
delayed_prepared_empty_.store(false, std::memory_order_release);
} else {
prepared_txns_.push(seq);
}
}
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
uint8_t loop_cnt) {
@ -443,6 +449,21 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
// After each eviction from commit cache, check if the commit entry should
// be kept around because it overlaps with a live snapshot.
CheckAgainstSnapshots(evicted);
if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
WriteLock wl(&prepared_mutex_);
for (auto dp : delayed_prepared_) {
if (dp == evicted.prep_seq) {
// This is a rare case that txn is committed but prepared_txns_ is not
// cleaned up yet. Refer to delayed_prepared_commits_ definition for
// why it should be kept updated.
delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
ROCKS_LOG_DEBUG(info_log_,
"delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
evicted.prep_seq, evicted.commit_seq);
break;
}
}
}
}
bool succ =
ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
@ -465,12 +486,24 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
const size_t batch_cnt) {
TEST_SYNC_POINT_CALLBACK(
"RemovePrepared:Start",
const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
ROCKS_LOG_DETAILS(info_log_,
"RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
prepare_seq, batch_cnt);
WriteLock wl(&prepared_mutex_);
for (size_t i = 0; i < batch_cnt; i++) {
prepared_txns_.erase(prepare_seq + i);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prepare_seq + i);
auto it = delayed_prepared_commits_.find(prepare_seq + i);
if (it != delayed_prepared_commits_.end()) {
ROCKS_LOG_DETAILS(info_log_, "delayed_prepared_commits_.erase %" PRIu64,
prepare_seq + i);
delayed_prepared_commits_.erase(it);
}
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
@ -511,8 +544,19 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
const SequenceNumber& new_max) {
ROCKS_LOG_DETAILS(info_log_,
"AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
prev_max, new_max);
"AdvanceMaxEvictedSeq overhead %" PRIu64
" => %" PRIu64 prev_max,
new_max);
// Declare the intention before getting snapshot from the DB. This helps a
// concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
// it has not already. Otherwise the new snapshot is when we ask DB for
// snapshots smaller than future max.
auto updated_future_max = prev_max;
while (updated_future_max < new_max &&
!future_max_evicted_seq_.compare_exchange_weak(
updated_future_max, new_max, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
};
// When max_evicted_seq_ advances, move older entries from prepared_txns_
// to delayed_prepared_. This guarantees that if a seq is lower than max,
// then it is not in prepared_txns_ ans save an expensive, synchronized
@ -520,6 +564,11 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
// normal cases.
{
WriteLock wl(&prepared_mutex_);
ROCKS_LOG_DETAILS(
info_log_,
"AdvanceMaxEvictedSeq prepared_txns_.empty() %d top: %" PRIu64,
prepared_txns_.empty(),
prepared_txns_.empty() ? 0 : prepared_txns_.top());
while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
auto to_be_popped = prepared_txns_.top();
delayed_prepared_.insert(to_be_popped);
@ -587,15 +636,21 @@ SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
assert(snap_impl);
SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
if (UNLIKELY(snap_seq != 0 && snap_seq <= max_evicted_seq_)) {
// Note: Check against future_max_evicted_seq_ (in contrast with
// max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
// There is a very rare case in which the commit entry evicts another commit
// entry that is not published yet thus advancing max evicted seq beyond the
// last published seq. This case is not likely in real-world setup so we
// handle it with a few retries.
size_t retry = 0;
while (snap_impl->GetSequenceNumber() <= max_evicted_seq_ && retry < 100) {
ROCKS_LOG_WARN(info_log_, "GetSnapshot retry %" PRIu64,
snap_impl->GetSequenceNumber());
SequenceNumber max;
while ((max = future_max_evicted_seq_.load()) != 0 &&
snap_impl->GetSequenceNumber() <= max && retry < 100) {
ROCKS_LOG_WARN(info_log_,
"GetSnapshot snap: %" PRIu64 " max: %" PRIu64
" retry %" ROCKSDB_PRIszt,
snap_impl->GetSequenceNumber(), max, retry);
ReleaseSnapshot(snap_impl);
// Wait for last visible seq to catch up with max, and also go beyond it
// by one.
@ -604,20 +659,19 @@ SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
assert(snap_impl);
retry++;
}
assert(snap_impl->GetSequenceNumber() > max_evicted_seq_);
if (snap_impl->GetSequenceNumber() <= max_evicted_seq_) {
throw std::runtime_error("Snapshot seq " +
ToString(snap_impl->GetSequenceNumber()) +
assert(snap_impl->GetSequenceNumber() > max);
if (snap_impl->GetSequenceNumber() <= max) {
throw std::runtime_error(
"Snapshot seq " + ToString(snap_impl->GetSequenceNumber()) +
" after " + ToString(retry) +
" retries is still less than max_evicted_seq_" +
ToString(max_evicted_seq_.load()));
" retries is still less than futre_max_evicted_seq_" + ToString(max));
}
}
EnhanceSnapshot(snap_impl, min_uncommitted);
ROCKS_LOG_DETAILS(
db_impl_->immutable_db_options().info_log,
"GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
for_ww_conflict_check, snap_impl->GetSequenceNumber(), min_uncommitted);
snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
return snap_impl;
}

@ -153,21 +153,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
prep_seq, snapshot_seq, 1, min_uncommitted);
return true;
}
if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
// We should not normally reach here
WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
ReadLock rl(&prepared_mutex_);
ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()));
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
// Then it is not committed yet
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
" returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
}
}
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
CommitEntry64b dont_care;
CommitEntry cached;
@ -193,6 +178,34 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
prep_seq, snapshot_seq, 0);
return false;
}
if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
// We should not normally reach here
WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
ReadLock rl(&prepared_mutex_);
ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()));
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
// This is the order: 1) delayed_prepared_commits_ update, 2) publish 3)
// delayed_prepared_ clean up. So check if it is the case of a late
// clenaup.
auto it = delayed_prepared_commits_.find(prep_seq);
if (it == delayed_prepared_commits_.end()) {
// Then it is not committed yet
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
" returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
} else {
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
" commit: %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, it->second,
snapshot_seq <= it->second);
return it->second <= snapshot_seq;
}
}
}
// When advancing max_evicted_seq_, we move older entires from prepared to
// delayed_prepared_. Also we move evicted entries from commit cache to
// old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
@ -267,7 +280,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
void AddPrepared(uint64_t seq);
// Remove the transaction with prepare sequence seq from the prepared list
void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
// Add the transaction with prepare sequence prepare_seq and commit sequence
// Add the transaction with prepare sequence prepare_seq and comtit sequence
// commit_seq to the commit map. loop_cnt is to detect infinite loops.
void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
uint8_t loop_cnt = 0);
@ -383,6 +396,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
const ColumnFamilyOptions& cf_options) override;
private:
friend class WritePreparedCommitEntryPreReleaseCallback;
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
friend class WritePreparedTransactionTest_CommitMapTest_Test;
@ -614,6 +628,12 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// commit_cache_. So commit_cache_ must first be checked before consulting
// with max_evicted_seq_.
std::atomic<uint64_t> max_evicted_seq_ = {};
// Order: 1) update future_max_evicted_seq_ = new_max, 2)
// GetSnapshotListFromDB(new_max), max_evicted_seq_ = new_max. Since
// GetSnapshotInternal guarantess that the snapshot seq is larger than
// future_max_evicted_seq_, this guarantes that if a snapshot is not larger
// than max has already being looked at via a GetSnapshotListFromDB(new_max).
std::atomic<uint64_t> future_max_evicted_seq_ = {};
// Advance max_evicted_seq_ by this value each time it needs an update. The
// larger the value, the less frequent advances we would have. We do not want
// it to be too large either as it would cause stalls by doing too much
@ -631,6 +651,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// time max_evicted_seq_ advances their sequence number. This is expected to
// be empty normally. Thread-safety is provided with prepared_mutex_.
std::set<uint64_t> delayed_prepared_;
// Commit of a delayed prepared: 1) update commit cache, 2) update
// delayed_prepared_commits_, 3) publish seq, 3) clean up delayed_prepared_.
// delayed_prepared_commits_ will help us tell apart the unprepared txns from
// the ones that are committed but not cleaned up yet.
std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
// Update when delayed_prepared_.empty() changes. Expected to be true
// normally.
std::atomic<bool> delayed_prepared_empty_ = {true};

Loading…
Cancel
Save