WritePrepared: Fix double snapshot release issue (#4727)

Summary:
Currently the garbage collection of items in old_commit_map_ is done upon ::ReleaseSnapshot. The assumption behind this method was that the sequence numbers of snapshots are unique, which is incorrect. In the very rare case that two consecutive snapshots have the same sequence number, this could lead the release of the first snapshot to affect the old_commit_map_ entries that are necessary to service the reads of the second snapshot. The bug would be triggered only if i) two snapshots have the same seq, ii) both of them are very old (older than the last ~4m transactions), and iii) there is a commit entry overlapping with the snapshot seq number.
It is fixed by doing the cleanup of old_commit_map_ in UpdateSnapshots: the new list of snapshots is compared with the old one, and the missing sequence numbers are concluded to be released. If two snapshots have the same seq number, after the release of one of them the seq number still appears in the snapshot list and thus is not cleaned up prematurely.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4727

Differential Revision: D13246495

Pulled By: maysamyabandeh

fbshipit-source-id: 93b87a5042afd8060889df245526d3f5d29de9fe
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent 512a5e3ef8
commit 1a5a93ff74
  1. 46
      utilities/transactions/write_prepared_transaction_test.cc
  2. 34
      utilities/transactions/write_prepared_txn_db.cc
  3. 12
      utilities/transactions/write_prepared_txn_db.h

@ -731,6 +731,52 @@ TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
} }
// Reproduce the bug with two snapshots with the same sequence number and test
// that the release of the first snapshot will not affect the reads by the other
// snapshot.
//
// Outline: write "value1", prepare a transaction that writes "value2", take
// two snapshots before the transaction commits, commit, force an eviction via
// AddCommitted, and then verify that reads through snapshot2 return "value1"
// both before and after snapshot1 is released.
TEST_P(WritePreparedTransactionTest, DoubleSnapshot) {
TransactionOptions txn_options;
Status s;
// Insert initial value
ASSERT_OK(db->Put(WriteOptions(), "key", "value1"));
// NOTE(review): the result of the dynamic_cast is used without a null check;
// presumably the test fixture always opens a WritePreparedTxnDB -- confirm.
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
Transaction* txn =
wp_db->BeginTransaction(WriteOptions(), txn_options, nullptr);
ASSERT_OK(txn->SetName("txn"));
ASSERT_OK(txn->Put("key", "value2"));
ASSERT_OK(txn->Prepare());
// Two snapshots with the same seq number
// Both are taken after Prepare and before Commit, so neither should observe
// "value2".
const Snapshot* snapshot1 = wp_db->GetSnapshot();
const Snapshot* snapshot2 = wp_db->GetSnapshot();
ASSERT_OK(txn->Commit());
// Pick a sequence number exactly COMMIT_CACHE_SIZE past the transaction id.
// Presumably this maps to the same commit cache slot as the transaction's
// entry, so adding it evicts that entry -- confirm against AddCommitted.
SequenceNumber cache_size = wp_db->COMMIT_CACHE_SIZE;
SequenceNumber overlap_seq = txn->GetId() + cache_size;
// txn->GetId() was read above; the transaction object is no longer needed.
delete txn;
// Cause an eviction to advance max evicted seq number
wp_db->AddCommitted(overlap_seq, overlap_seq);
ReadOptions ropt;
// It should see the value before commit
ropt.snapshot = snapshot2;
PinnableSlice pinnable_val;
s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == "value1");
pinnable_val.Reset();
// Release only the first of the two snapshots; snapshot2 is still held and is
// read through again below.
wp_db->ReleaseSnapshot(snapshot1);
// It should still see the value before commit
s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == "value1");
pinnable_val.Reset();
wp_db->ReleaseSnapshot(snapshot2);
}
// Test that the entries in old_commit_map_ get garbage collected properly // Test that the entries in old_commit_map_ get garbage collected properly
TEST_P(WritePreparedTransactionTest, OldCommitMapGC) { TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
const size_t snapshot_cache_bits = 0; const size_t snapshot_cache_bits = 0;

@ -554,12 +554,6 @@ const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
return db_impl_->snapshots().GetAll(nullptr, max); return db_impl_->snapshots().GetAll(nullptr, max);
} }
// Releases `snapshot`: runs ReleaseSnapshotInternal on the snapshot's sequence
// number and then forwards the snapshot to db_impl_->ReleaseSnapshot.
void WritePreparedTxnDB::ReleaseSnapshot(const Snapshot* snapshot) {
  // The sequence number is read before the snapshot is handed to db_impl_.
  ReleaseSnapshotInternal(snapshot->GetSequenceNumber());
  db_impl_->ReleaseSnapshot(snapshot);
}
void WritePreparedTxnDB::ReleaseSnapshotInternal( void WritePreparedTxnDB::ReleaseSnapshotInternal(
const SequenceNumber snap_seq) { const SequenceNumber snap_seq) {
// relax is enough since max increases monotonically, i.e., if snap_seq < // relax is enough since max increases monotonically, i.e., if snap_seq <
@ -590,6 +584,28 @@ void WritePreparedTxnDB::ReleaseSnapshotInternal(
} }
} }
void WritePreparedTxnDB::CleanupReleasedSnapshots(
const std::vector<SequenceNumber>& new_snapshots,
const std::vector<SequenceNumber>& old_snapshots) {
auto newi = new_snapshots.begin();
auto oldi = old_snapshots.begin();
for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) {
assert(*newi >= *oldi); // cannot have new snapshots with lower seq
if (*newi == *oldi) { // still not released
newi++;
oldi++;
} else {
assert(*newi > *oldi); // *oldi is released
ReleaseSnapshotInternal(*oldi);
oldi++;
}
}
// Everything remained in old_snapshots is released and must be cleaned up
for (; oldi != old_snapshots.end(); oldi++) {
ReleaseSnapshotInternal(*oldi);
}
}
void WritePreparedTxnDB::UpdateSnapshots( void WritePreparedTxnDB::UpdateSnapshots(
const std::vector<SequenceNumber>& snapshots, const std::vector<SequenceNumber>& snapshots,
const SequenceNumber& version) { const SequenceNumber& version) {
@ -638,6 +654,12 @@ void WritePreparedTxnDB::UpdateSnapshots(
// Update the size at the end. Otherwise a parallel reader might read // Update the size at the end. Otherwise a parallel reader might read
// items that are not set yet. // items that are not set yet.
snapshots_total_.store(snapshots.size(), std::memory_order_release); snapshots_total_.store(snapshots.size(), std::memory_order_release);
// Note: this must be done after the snapshots data structures are updated
// with the new list of snapshots.
CleanupReleasedSnapshots(snapshots, snapshots_all_);
snapshots_all_ = snapshots;
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end"); TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end"); TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
} }

@ -112,8 +112,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
const std::vector<ColumnFamilyHandle*>& column_families, const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override; std::vector<Iterator*>* iterators) override;
virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
// Check whether the transaction that wrote the value with sequence number seq // Check whether the transaction that wrote the value with sequence number seq
// is visible to the snapshot with sequence number snapshot_seq. // is visible to the snapshot with sequence number snapshot_seq.
// Returns true if commit_seq <= snapshot_seq // Returns true if commit_seq <= snapshot_seq
@ -379,6 +377,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class friend class
WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test; WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test;
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
friend class WritePreparedTransactionTest_OldCommitMapGC_Test; friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
friend class WritePreparedTransactionTest_RollbackTest_Test; friend class WritePreparedTransactionTest_RollbackTest_Test;
@ -518,6 +517,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// version value. // version value.
void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots, void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
const SequenceNumber& version); const SequenceNumber& version);
// Compare the new list of snapshots against the old one to detect released
// snapshots, and do the cleanup for each released snapshot.
void CleanupReleasedSnapshots(
const std::vector<SequenceNumber>& new_snapshots,
const std::vector<SequenceNumber>& old_snapshots);
// Check an evicted entry against live snapshots to see if it should be kept // Check an evicted entry against live snapshots to see if it should be kept
// around or it can be safely discarded (and hence assume committed for all // around or it can be safely discarded (and hence assume committed for all
@ -552,6 +556,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// 2nd list for storing snapshots. The list sorted in ascending order. // 2nd list for storing snapshots. The list sorted in ascending order.
// Thread-safety is provided with snapshots_mutex_. // Thread-safety is provided with snapshots_mutex_.
std::vector<SequenceNumber> snapshots_; std::vector<SequenceNumber> snapshots_;
// The list of all snapshots: snapshots_ + snapshot_cache_. Although redundant,
// this list simplifies the CleanupReleasedSnapshots implementation.
// Thread-safety is provided with snapshots_mutex_.
std::vector<SequenceNumber> snapshots_all_;
// The version of the latest list of snapshots. This can be used to avoid // The version of the latest list of snapshots. This can be used to avoid
// rewriting a list that is concurrently updated with a more recent version. // rewriting a list that is concurrently updated with a more recent version.
SequenceNumber snapshots_version_ = 0; SequenceNumber snapshots_version_ = 0;

Loading…
Cancel
Save