diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index 7fede0e51..8baa09367 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -436,6 +436,83 @@ TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
   MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
 }
 
+// Test that the entries in old_commit_map_ get garbage collected properly
+TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
+  const size_t snapshot_cache_bits = 0;
+  const size_t commit_cache_bits = 0;
+  DBImpl* mock_db = new DBImpl(options, dbname);
+  std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
+      mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
+
+  SequenceNumber seq = 0;
+  // Take the first snapshot that overlaps with two txn
+  auto prep_seq = ++seq;
+  wp_db->AddPrepared(prep_seq);
+  auto prep_seq2 = ++seq;
+  wp_db->AddPrepared(prep_seq2);
+  auto snap_seq1 = seq;
+  wp_db->TakeSnapshot(snap_seq1);
+  auto commit_seq = ++seq;
+  wp_db->AddCommitted(prep_seq, commit_seq);
+  auto commit_seq2 = ++seq;
+  wp_db->AddCommitted(prep_seq2, commit_seq2);
+  // Take the 2nd and 3rd snapshot that overlap with the same txn
+  prep_seq = ++seq;
+  wp_db->AddPrepared(prep_seq);
+  auto snap_seq2 = seq;
+  wp_db->TakeSnapshot(snap_seq2);
+  seq++;
+  auto snap_seq3 = seq;
+  wp_db->TakeSnapshot(snap_seq3);
+  seq++;
+  commit_seq = ++seq;
+  wp_db->AddCommitted(prep_seq, commit_seq);
+  // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
+  // only item in the commit_cache_ via another commit.
+  prep_seq = ++seq;
+  wp_db->AddPrepared(prep_seq);
+  commit_seq = ++seq;
+  wp_db->AddCommitted(prep_seq, commit_seq);
+
+  // Verify that the evicted commit entries for all snapshots are in the
+  // old_commit_map_
+  {
+    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
+    ReadLock rl(&wp_db->old_commit_map_mutex_);
+    ASSERT_EQ(3, wp_db->old_commit_map_.size());
+    ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
+    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq2].size());
+    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
+  }
+
+  // Verify that the 2nd snapshot is cleaned up after the release
+  wp_db->ReleaseSnapshotInternal(snap_seq2);
+  {
+    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
+    ReadLock rl(&wp_db->old_commit_map_mutex_);
+    ASSERT_EQ(2, wp_db->old_commit_map_.size());
+    ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
+    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
+  }
+
+  // Verify that the 1st snapshot is cleaned up after the release
+  wp_db->ReleaseSnapshotInternal(snap_seq1);
+  {
+    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
+    ReadLock rl(&wp_db->old_commit_map_mutex_);
+    ASSERT_EQ(1, wp_db->old_commit_map_.size());
+    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
+  }
+
+  // Verify that the 3rd snapshot is cleaned up after the release
+  wp_db->ReleaseSnapshotInternal(snap_seq3);
+  {
+    ASSERT_TRUE(wp_db->old_commit_map_empty_.load());
+    ReadLock rl(&wp_db->old_commit_map_mutex_);
+    ASSERT_EQ(0, wp_db->old_commit_map_.size());
+  }
+}
+
 TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
   std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
                                            600l, 700l, 800l, 900l};
@@ -480,20 +557,24 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) {
   // We have a sync point in the method under test after checking each snapshot.
   // If you increase the max number of snapshots in this test, more sync points
   // in the methods must also be added.
-  const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
-                                                 60l, 70l, 80l, 90l, 100l};
+  const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l, 60l};
+  // TODO(myabandeh): increase the snapshots list for pre-release tests
+  // const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
+  //                                                60l, 70l, 80l, 90l, 100l};
   const size_t snapshot_cache_bits = 2;
   // Safety check to express the intended size in the test. Can be adjusted if
   // the snapshots lists changed.
-  assert((1ul << snapshot_cache_bits) * 2 + 2 == snapshots.size());
+  assert((1ul << snapshot_cache_bits) + 2 == snapshots.size());
   SequenceNumber version = 1000l;
   // Choose the cache size so that the new snapshot list could replace all the
   // existing items in the cache and also have some overflow.
   DBImpl* mock_db = new DBImpl(options, dbname);
   std::unique_ptr<WritePreparedTxnDBMock> wp_db(
       new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
-  // Add up to 2 items that do not fit into the cache
-  for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + 2;
+  // TODO(myabandeh): increase this number for pre-release tests
+  const size_t extra = 1;
+  // Add up to extra items that do not fit into the cache
+  for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + extra;
        old_size++) {
     const std::vector<SequenceNumber> old_snapshots(
         snapshots.begin(), snapshots.begin() + old_size);
diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc
index d83aae812..54b86c1c8 100644
--- a/utilities/transactions/write_prepared_txn_db.cc
+++ b/utilities/transactions/write_prepared_txn_db.cc
@@ -12,6 +12,7 @@
 #include "utilities/transactions/write_prepared_txn_db.h"
 
 #include <inttypes.h>
+#include <algorithm>
 #include <string>
 #include <unordered_set>
 #include <vector>
@@ -234,8 +235,8 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
   // delayed_prepared_. Also we move evicted entries from commit cache to
   // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
   // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
-  // old_commit_map_, iii) committed with no conflict with any snapshot (i)
-  // delayed_prepared_ is checked above
+  // old_commit_map_, iii) committed with no conflict with any snapshot. Case
+  // (i) delayed_prepared_ is checked above
   if (max_evicted_seq < snapshot_seq) {  // then (ii) cannot be the case
     // only (iii) is the case: committed
     // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
@@ -255,12 +256,17 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
     return true;
   }
   {
-    // We should not normally reach here
-    // TODO(myabandeh): check only if snapshot_seq is in the list of snaphots
+    // We should not normally reach here unless snapshot_seq is old. This is a
+    // rare case and it is ok to pay the cost of mutex ReadLock for such old,
+    // reading transactions.
     ReadLock rl(&old_commit_map_mutex_);
-    auto old_commit_entry = old_commit_map_.find(prep_seq);
-    if (old_commit_entry == old_commit_map_.end() ||
-        old_commit_entry->second <= snapshot_seq) {
+    auto prep_set_entry = old_commit_map_.find(snapshot_seq);
+    bool found = prep_set_entry != old_commit_map_.end();
+    if (found) {
+      auto& vec = prep_set_entry->second;
+      found = std::binary_search(vec.begin(), vec.end(), prep_seq);
+    }
+    if (!found) {
       ROCKSDB_LOG_DETAILS(
           info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
           prep_seq, snapshot_seq, 1);
@@ -322,6 +328,9 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
   bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
   if (to_be_evicted) {
     auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
+    ROCKSDB_LOG_DETAILS(info_log_,
+                        "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
+                        evicted.prep_seq, evicted.commit_seq, prev_max);
     if (prev_max < evicted.commit_seq) {
       // Inc max in larger steps to avoid frequent updates
       auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED;
@@ -431,6 +440,36 @@ const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
   return db_impl_->snapshots().GetAll(nullptr, max);
 }
 
+void WritePreparedTxnDB::ReleaseSnapshot(const Snapshot* snapshot) {
+  auto snap_seq = snapshot->GetSequenceNumber();
+  ReleaseSnapshotInternal(snap_seq);
+  db_impl_->ReleaseSnapshot(snapshot);
+}
+
+void WritePreparedTxnDB::ReleaseSnapshotInternal(
+    const SequenceNumber snap_seq) {
+  // relaxed is enough since max increases monotonically, i.e., if snap_seq <
+  // old_max => snap_seq < new_max as well.
+  if (snap_seq < max_evicted_seq_.load(std::memory_order_relaxed)) {
+    // Then this is a rare case that a transaction did not finish before max
+    // advances. It is expected for a few read-only backup snapshots. For such
+    // snapshots we might have kept around a couple of entries in the
+    // old_commit_map_. Check and do garbage collection if that is the case.
+    bool need_gc = false;
+    {
+      ReadLock rl(&old_commit_map_mutex_);
+      auto prep_set_entry = old_commit_map_.find(snap_seq);
+      need_gc = prep_set_entry != old_commit_map_.end();
+    }
+    if (need_gc) {
+      WriteLock wl(&old_commit_map_mutex_);
+      old_commit_map_.erase(snap_seq);
+      old_commit_map_empty_.store(old_commit_map_.empty(),
+                                  std::memory_order_release);
+    }
+  }
+}
+
 void WritePreparedTxnDB::UpdateSnapshots(
     const std::vector<SequenceNumber>& snapshots,
     const SequenceNumber& version) {
@@ -541,8 +580,8 @@ void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
 bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
     const uint64_t& prep_seq, const uint64_t& commit_seq,
     const uint64_t& snapshot_seq, const bool next_is_larger = true) {
-  // If we do not store an entry in old_commit_map we assume it is committed in
-  // all snapshots. if commit_seq <= snapshot_seq, it is considered already in
+  // If we do not store an entry in old_commit_map_ we assume it is committed in
+  // all snapshots. If commit_seq <= snapshot_seq, it is considered already in
   // the snapshot so we need not to keep the entry around for this snapshot.
   if (commit_seq <= snapshot_seq) {
     // continue the search if the next snapshot could be smaller than commit_seq
@@ -552,9 +591,11 @@ bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
   if (prep_seq <= snapshot_seq) {  // overlapping range
     WriteLock wl(&old_commit_map_mutex_);
     old_commit_map_empty_.store(false, std::memory_order_release);
-    old_commit_map_[prep_seq] = commit_seq;
-    // Storing once is enough. No need to check it for other snapshots.
-    return false;
+    auto& vec = old_commit_map_[snapshot_seq];
+    vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
+    // We need to store it once for each overlapping snapshot. Return true to
+    // continue the search if there are more overlapping snapshots.
+    return true;
   }
   // continue the search if the next snapshot could be larger than prep_seq
   return next_is_larger;
diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
index a67f5ac25..cb8c134dd 100644
--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -90,6 +90,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
       const std::vector<ColumnFamilyHandle*>& column_families,
       std::vector<Iterator*>* iterators) override;
 
+  virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
+
   // Check whether the transaction that wrote the value with seqeunce number seq
   // is visible to the snapshot with sequence number snapshot_seq
   bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq) const;
@@ -198,6 +200,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
   friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
   friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
+  friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
   friend class WritePreparedTransactionTest_RollbackTest_Test;
 
   void Init(const TransactionDBOptions& /* unused */);
@@ -269,6 +272,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
       SequenceNumber max);
 
+  // Will be called by the public ReleaseSnapshot method. Does the maintenance
+  // internal to WritePreparedTxnDB.
+  void ReleaseSnapshotInternal(const SequenceNumber snap_seq);
+
   // Update the list of snapshots corresponding to the soon-to-be-updated
   // max_eviceted_seq_. Thread-safety: this function can be called concurrently.
   // The concurrent invocations of this function is equivalent to a serial
@@ -287,9 +294,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
 
   // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
   // commit_seq. Return false if checking the next snapshot(s) is not needed.
-  // This is the case if the entry already added to old_commit_map_ or none of
-  // the next snapshots could satisfy the condition. next_is_larger: the next
-  // snapshot will be a larger value
+  // This is the case if none of the next snapshots could satisfy the condition.
+  // next_is_larger: the next snapshot will be a larger value
   bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
                                const uint64_t& commit_seq,
                                const uint64_t& snapshot_seq,
@@ -333,10 +339,14 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // it to be too large either as it would cause stalls by doing too much
   // maintenance work under the lock.
   size_t INC_STEP_FOR_MAX_EVICTED = 1;
-  // A map of the evicted entries from commit_cache_ that has to be kept around
-  // to service the old snapshots. This is expected to be empty normally.
+  // A map from old snapshots (expected to be used by a few read-only txns) to
+  // the prepared sequence numbers of the evicted entries from commit_cache_
+  // that overlap with that snapshot. These are the prepared sequence numbers
These are the prepared sequence numbers that + // the snapshot, to which they are mapped, cannot assume to be committed just + // because it is no longer in the commit_cache_. The vector must be sorted + // after each update. // Thread-safety is provided with old_commit_map_mutex_. - std::map old_commit_map_; + std::map> old_commit_map_; // A set of long-running prepared transactions that are not finished by the // time max_evicted_seq_ advances their sequence number. This is expected to // be empty normally. Thread-safety is provided with prepared_mutex_.