diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 51110292a..007b9ccd4 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1484,6 +1484,80 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMapTest) { } } +// Shows the contract of IsInSnapshot when called on invalid/released snapshots +TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) { + WritePreparedTxnDB* wp_db = dynamic_cast(db); + WriteOptions woptions; + ASSERT_OK(db->Put(woptions, "key", "value")); + // snap seq = 1 + const Snapshot* snap1 = db->GetSnapshot(); + ASSERT_OK(db->Put(woptions, "key", "value")); + ASSERT_OK(db->Put(woptions, "key", "value")); + // snap seq = 3 + const Snapshot* snap2 = db->GetSnapshot(); + const SequenceNumber seq = 1; + // Evict seq out of commit cache + size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq; + wp_db->AddCommitted(overwrite_seq, overwrite_seq); + SequenceNumber snap_seq; + uint64_t min_uncommitted = 0; + bool released; + + released = false; + snap_seq = snap1->GetSequenceNumber(); + ASSERT_LE(seq, snap_seq); + // Valid snapshot lower than max + ASSERT_LE(snap_seq, wp_db->max_evicted_seq_); + ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released)); + ASSERT_FALSE(released); + + released = false; + snap_seq = snap1->GetSequenceNumber(); + // Invaid snapshot lower than max + ASSERT_LE(snap_seq + 1, wp_db->max_evicted_seq_); + ASSERT_TRUE( + wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released)); + ASSERT_TRUE(released); + + db->ReleaseSnapshot(snap1); + + released = false; + // Released snapshot lower than max + ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released)); + // The release does not take affect until the next max advance + ASSERT_FALSE(released); + + released = false; + // Invaid snapshot lower than max + ASSERT_TRUE( + wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released)); + ASSERT_TRUE(released); + + // This make the snapshot release to reflect in txn db structures + wp_db->AdvanceMaxEvictedSeq(wp_db->max_evicted_seq_, + wp_db->max_evicted_seq_ + 1); + + released = false; + // Released snapshot lower than max + ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released)); + ASSERT_TRUE(released); + + released = false; + // Invaid snapshot lower than max + ASSERT_TRUE( + wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released)); + ASSERT_TRUE(released); + + snap_seq = snap2->GetSequenceNumber(); + + released = false; + // Unreleased snapshot lower than max + ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released)); + ASSERT_FALSE(released); + + db->ReleaseSnapshot(snap2); +} + // Test WritePreparedTxnDB's IsInSnapshot against different ordering of // snapshot, max_committed_seq_, prepared, and commit entries. TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index fba2af79f..0502b6d47 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -535,6 +535,15 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max, } if (update_snapshots) { UpdateSnapshots(snapshots, new_snapshots_version); + if (!snapshots.empty()) { + WriteLock wl(&old_commit_map_mutex_); + for (auto snap : snapshots) { + // This allows IsInSnapshot to tell apart the reads from in valid + // snapshots from the reads from committed values in valid snapshots. + old_commit_map_[snap]; + } + old_commit_map_empty_.store(false, std::memory_order_release); + } } auto updated_prev_max = prev_max; while (updated_prev_max < new_max && diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index e0263d4f7..e892c6320 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -115,8 +115,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // Check whether the transaction that wrote the value with sequence number seq // is visible to the snapshot with sequence number snapshot_seq. // Returns true if commit_seq <= snapshot_seq + // If the snapshot_seq is already released and snapshot_seq <= max, sets + // *snap_released to true and returns true as well. inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq, - uint64_t min_uncommitted = 0) const { + uint64_t min_uncommitted = 0, + bool* snap_released = nullptr) const { ROCKS_LOG_DETAILS(info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " min_uncommitted %" PRIu64, @@ -210,9 +213,15 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // snapshot. If there was no overlapping commit entry, then it is committed // with a commit_seq lower than any live snapshot, including snapshot_seq. if (old_commit_map_empty_.load(std::memory_order_acquire)) { - ROCKS_LOG_DETAILS( - info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, - prep_seq, snapshot_seq, 1); + ROCKS_LOG_DETAILS(info_log_, + "IsInSnapshot %" PRIu64 " in %" PRIu64 + " returns %" PRId32 " released=1", + prep_seq, snapshot_seq, 0); + assert(snap_released); + // This snapshot is not valid anymore. We cannot tell if prep_seq is + // committed before or after the snapshot. Return true but also set + // snap_released to true. + *snap_released = true; return true; } { @@ -226,7 +235,20 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { if (found) { auto& vec = prep_set_entry->second; found = std::binary_search(vec.begin(), vec.end(), prep_seq); + } else { + // coming from compaction + ROCKS_LOG_DETAILS(info_log_, + "IsInSnapshot %" PRIu64 " in %" PRIu64 + " returns %" PRId32 " released=1", + prep_seq, snapshot_seq, 0); + // This snapshot is not valid anymore. We cannot tell if prep_seq is + // committed before or after the snapshot. Return true but also set + // snap_released to true. + assert(snap_released); + *snap_released = true; + return true; } + if (!found) { ROCKS_LOG_DETAILS(info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 @@ -379,6 +401,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; friend class WritePreparedTransactionTest_DoubleSnapshot_Test; friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; + friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test; friend class WritePreparedTransactionTest_OldCommitMapGC_Test; friend class WritePreparedTransactionTest_RollbackTest_Test; friend class WriteUnpreparedTxnDB; diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 93a4bbe81..40d8fe091 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -372,15 +372,14 @@ Status WriteUnpreparedTxn::RollbackInternal() { assert(GetId() != kMaxSequenceNumber); assert(GetId() > 0); const auto& cf_map = *wupt_db_->GetCFHandleMap(); - // In WritePrepared, the txn is is the same as prepare seq - auto last_visible_txn = GetId() - 1; + auto read_at_seq = kMaxSequenceNumber; Status s; ReadOptions roptions; // Note that we do not use WriteUnpreparedTxnReadCallback because we do not // need to read our own writes when reading prior versions of the key for // rollback. - WritePreparedTxnReadCallback callback(wpt_db_, last_visible_txn, 0); + WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq, 0); for (const auto& cfkey : write_set_keys_) { const auto cfid = cfkey.first; const auto& keys = cfkey.second; diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 51bb30818..674d19c3b 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -29,6 +29,29 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( // rollback batch. w_options.disableWAL = true; + class InvalidSnapshotReadCallback : public ReadCallback { + public: + InvalidSnapshotReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot, + SequenceNumber min_uncommitted) + : db_(db), snapshot_(snapshot), min_uncommitted_(min_uncommitted) {} + + // Will be called to see if the seq number visible; if not it moves on to + // the next seq number. + inline virtual bool IsVisible(SequenceNumber seq) override { + // Becomes true if it cannot tell by comparing seq with snapshot seq since + // the snapshot_ is not a real snapshot. + bool released = false; + auto ret = db_->IsInSnapshot(seq, snapshot_, min_uncommitted_, &released); + assert(!released || ret); + return ret; + } + + private: + WritePreparedTxnDB* db_; + SequenceNumber snapshot_; + SequenceNumber min_uncommitted_; + }; + // Iterate starting with largest sequence number. for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); it++) { auto last_visible_txn = it->first - 1; @@ -38,7 +61,7 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( struct RollbackWriteBatchBuilder : public WriteBatch::Handler { DBImpl* db_; ReadOptions roptions; - WritePreparedTxnReadCallback callback; + InvalidSnapshotReadCallback callback; WriteBatch* rollback_batch_; std::map& comparators_; std::map& handles_;