diff --git a/db/db_impl.h b/db/db_impl.h index 29acca5e1..166dc6abe 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -238,6 +238,13 @@ class DBImpl : public DB { virtual Status SyncWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; + virtual SequenceNumber GetLastPublishedSequence() const { + if (last_seq_same_as_publish_seq_) { + return versions_->LastSequence(); + } else { + return versions_->LastPublishedSequence(); + } + } // REQUIRES: joined the main write queue if two_write_queues is disabled, and // the second write queue otherwise. virtual void SetLastPublishedSequence(SequenceNumber seq); diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index 0a0163ecc..8795e78dd 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -37,7 +37,7 @@ TransactionID PessimisticTransaction::GenTxnID() { PessimisticTransaction::PessimisticTransaction( TransactionDB* txn_db, const WriteOptions& write_options, - const TransactionOptions& txn_options) + const TransactionOptions& txn_options, const bool init) : TransactionBaseImpl(txn_db->GetRootDB(), write_options), txn_db_impl_(nullptr), expiration_time_(0), @@ -51,7 +51,9 @@ PessimisticTransaction::PessimisticTransaction( txn_db_impl_ = static_cast_with_check(txn_db); db_impl_ = static_cast_with_check(db_); - Initialize(txn_options); + if (init) { + Initialize(txn_options); + } } void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) { diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index d09c239ce..1f851818e 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -38,7 +38,8 @@ class PessimisticTransactionDB; class PessimisticTransaction : public TransactionBaseImpl { public: PessimisticTransaction(TransactionDB* db, 
const WriteOptions& write_options, - const TransactionOptions& txn_options); + const TransactionOptions& txn_options, + const bool init = true); virtual ~PessimisticTransaction(); diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 400cd2084..d0085de40 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -353,7 +353,7 @@ class WritePreparedTransactionTestBase : public TransactionTestBase { : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){}; protected: - // TODO(mayabndeh): Avoid duplicating PessimisticTransaction::Open logic here. + // TODO(myabandeh): Avoid duplicating PessimisticTransaction::Open logic here. void DestroyAndReopenWithExtraOptions(size_t snapshot_cache_bits, size_t commit_cache_bits) { delete db; @@ -376,8 +376,8 @@ class WritePreparedTransactionTestBase : public TransactionTestBase { // The following is equivalent of WrapDB(). 
txn_db_options.write_policy = WRITE_PREPARED; - auto* wp_db = new WritePreparedTxnDB(base_db, txn_db_options, snapshot_cache_bits, - commit_cache_bits); + auto* wp_db = new WritePreparedTxnDB( + base_db, txn_db_options, snapshot_cache_bits, commit_cache_bits); wp_db->UpdateCFComparatorMap(handles); ASSERT_OK(wp_db->Initialize(compaction_enabled_cf_indices, handles)); @@ -831,6 +831,13 @@ TEST_P(WritePreparedTransactionTest, DoubleSnapshot) { wp_db->ReleaseSnapshot(snapshot3); } +size_t UniqueCnt(std::vector vec) { + std::set aset; + for (auto i : vec) { + aset.insert(i); + } + return aset.size(); +} // Test that the entries in old_commit_map_ get garbage collected properly TEST_P(WritePreparedTransactionTest, OldCommitMapGC) { const size_t snapshot_cache_bits = 0; @@ -879,9 +886,9 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) { ASSERT_FALSE(wp_db->old_commit_map_empty_.load()); ReadLock rl(&wp_db->old_commit_map_mutex_); ASSERT_EQ(3, wp_db->old_commit_map_.size()); - ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size()); - ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq2].size()); - ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size()); + ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1])); + ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq2])); + ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3])); } // Verify that the 2nd snapshot is cleaned up after the release @@ -890,8 +897,8 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) { ASSERT_FALSE(wp_db->old_commit_map_empty_.load()); ReadLock rl(&wp_db->old_commit_map_mutex_); ASSERT_EQ(2, wp_db->old_commit_map_.size()); - ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size()); - ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size()); + ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1])); + ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3])); } // Verify that the 1st snapshot is cleaned up after the release @@ -900,7 +907,7 @@ TEST_P(WritePreparedTransactionTest, 
OldCommitMapGC) { ASSERT_FALSE(wp_db->old_commit_map_empty_.load()); ReadLock rl(&wp_db->old_commit_map_mutex_); ASSERT_EQ(1, wp_db->old_commit_map_.size()); - ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size()); + ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3])); } // Verify that the 3rd snapshot is cleaned up after the release @@ -1139,6 +1146,139 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) { } } +// A new snapshot should always be larger than max_evicted_seq_ +// Otherwise the snapshot does not go through AdvanceMaxEvictedSeq +TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) { + WriteOptions woptions; + TransactionOptions txn_options; + WritePreparedTxnDB* wp_db = dynamic_cast(db); + Transaction* txn0 = db->BeginTransaction(woptions, txn_options); + ASSERT_OK(txn0->Put(Slice("key"), Slice("value"))); + ASSERT_OK(txn0->Commit()); + const SequenceNumber seq = txn0->GetId(); // is also prepare seq + delete txn0; + std::vector txns; + // Inc seq without committing anything + for (int i = 0; i < 10; i++) { + Transaction* txn = db->BeginTransaction(woptions, txn_options); + ASSERT_OK(txn->SetName("xid" + std::to_string(i))); + ASSERT_OK(txn->Put(Slice("key" + std::to_string(i)), Slice("value"))); + ASSERT_OK(txn->Prepare()); + txns.push_back(txn); + } + + // The new commit is seq + 10 + ASSERT_OK(db->Put(woptions, "key", "value")); + auto snap = wp_db->GetSnapshot(); + const SequenceNumber last_seq = snap->GetSequenceNumber(); + wp_db->ReleaseSnapshot(snap); + ASSERT_LT(seq, last_seq); + // Otherwise our test is not effective + ASSERT_LT(last_seq - seq, wp_db->INC_STEP_FOR_MAX_EVICTED); + + // Evict seq out of commit cache + const SequenceNumber overwrite_seq = seq + wp_db->COMMIT_CACHE_SIZE; + // Check that the next write could make max go beyond last + auto last_max = wp_db->max_evicted_seq_.load(); + wp_db->AddCommitted(overwrite_seq, overwrite_seq); + // Check that eviction has advanced the max + 
ASSERT_LT(last_max, wp_db->max_evicted_seq_.load()); + // Check that the new max has not advanced beyond the last seq + ASSERT_LT(wp_db->max_evicted_seq_.load(), last_seq); + for (auto txn : txns) { + ASSERT_OK(txn->Rollback()); + delete txn; + } +} + +// A new snapshot should always be larger than max_evicted_seq_. +// In very rare cases max could go above the last published seq. Test that +// taking a snapshot waits for the last published seq to catch up with max. +TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) { + const size_t snapshot_cache_bits = 7; // same as default + const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction + DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + WriteOptions woptions; + TransactionOptions txn_options; + WritePreparedTxnDB* wp_db = dynamic_cast(db); + + const int writes = 50; + const int batch_cnt = 4; + rocksdb::port::Thread t1([&]() { + for (int i = 0; i < writes; i++) { + WriteBatch batch; + // The duplicate keys cause 4 commit entries, each evicting an entry that + // is not published yet, thus causing max evicted seq to go higher than + // last published. 
+ for (int b = 0; b < batch_cnt; b++) { + batch.Put("foo", "foo"); + } + ASSERT_OK(db->Write(woptions, &batch)); + } + }); + + rocksdb::port::Thread t2([&]() { + while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread + std::this_thread::yield(); + } + for (int i = 0; i < 10; i++) { + auto snap = db->GetSnapshot(); + if (snap->GetSequenceNumber() != 0) { + ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber()); + } // seq 0 is ok to be less than max since nothing is visible to it + db->ReleaseSnapshot(snap); + } + }); + + t1.join(); + t2.join(); + + // Make sure that the test has worked and seq number has advanced as we + // thought + auto snap = db->GetSnapshot(); + ASSERT_GT(snap->GetSequenceNumber(), batch_cnt * writes - 1); + db->ReleaseSnapshot(snap); +} + +TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) { + auto snap = db->GetSnapshot(); + auto seq1 = snap->GetSequenceNumber(); + db->ReleaseSnapshot(snap); + + WritePreparedTxnDB* wp_db = dynamic_cast(db); + wp_db->AdvanceSeqByOne(); + + snap = db->GetSnapshot(); + auto seq2 = snap->GetSequenceNumber(); + db->ReleaseSnapshot(snap); + + ASSERT_LT(seq1, seq2); +} + +// Test that the txn Initialize calls the overridden functions +TEST_P(WritePreparedTransactionTest, TxnInitialize) { + TransactionOptions txn_options; + WriteOptions write_options; + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(Slice("key"), Slice("value1"))); + ASSERT_OK(txn0->Prepare()); + + // SetSnapshot is overridden to update min_uncommitted_ + txn_options.set_snapshot = true; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + auto snap = txn1->GetSnapshot(); + auto snap_impl = reinterpret_cast(snap); + // If ::Initialize calls the overridden SetSnapshot, min_uncommitted_ must be + // updated + ASSERT_GT(snap_impl->min_uncommitted_, 0); + + ASSERT_OK(txn0->Rollback()); + ASSERT_OK(txn1->Rollback()); + delete txn0; + delete txn1; +} + // This tests that 
transactions with duplicate keys perform correctly after max // is advancing their prepared sequence numbers. This will not be the case if // for example the txn does not add the prepared seq for the second sub-batch to @@ -2059,8 +2199,8 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) { } TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) { - const size_t snapshot_cache_bits = 7; // same as default - const size_t commit_cache_bits = 0; // disable commit cache + const size_t snapshot_cache_bits = 7; // same as default + const size_t commit_cache_bits = 0; // disable commit cache for (bool has_recent_prepare : {true, false}) { DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); @@ -2072,7 +2212,8 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) { ASSERT_OK(transaction->Prepare()); // snapshot1 should get min_uncommitted from prepared_txns_ heap. auto snapshot1 = db->GetSnapshot(); - ASSERT_EQ(transaction->GetId(), ((SnapshotImpl*)snapshot1)->min_uncommitted_); + ASSERT_EQ(transaction->GetId(), + ((SnapshotImpl*)snapshot1)->min_uncommitted_); // Add a commit to advance max_evicted_seq and move the prepared transaction // into delayed_prepared_ set. ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); @@ -2086,7 +2227,8 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) { } // snapshot2 should get min_uncommitted from delayed_prepared_ set. 
auto snapshot2 = db->GetSnapshot(); - ASSERT_EQ(transaction->GetId(), ((SnapshotImpl*)snapshot1)->min_uncommitted_); + ASSERT_EQ(transaction->GetId(), + ((SnapshotImpl*)snapshot2)->min_uncommitted_); ASSERT_OK(transaction->Commit()); delete transaction; if (has_recent_prepare) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index ed3c72ccb..1aa689ade 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -31,8 +31,13 @@ struct WriteOptions; WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) - : PessimisticTransaction(txn_db, write_options, txn_options), - wpt_db_(txn_db) {} + : PessimisticTransaction(txn_db, write_options, txn_options, false), + wpt_db_(txn_db) { + // Call Initialize outside PessimisticTransaction constructor otherwise it + // would skip overridden functions in WritePreparedTxn since they are not + // defined yet in the constructor of PessimisticTransaction + Initialize(txn_options); +} void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) { PessimisticTransaction::Initialize(txn_options); @@ -413,20 +418,8 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, } void WritePreparedTxn::SetSnapshot() { - // Note: for this optimization setting the last sequence number and obtaining - // the smallest uncommitted seq should be done atomically. However to avoid - // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the - // snapshot. Since we always updated the list of unprepared seq (via - // AddPrepared) AFTER the last sequence is updated, this guarantees that the - // smallest uncommitted seq that we pair with the snapshot is smaller or equal - // the value that would be obtained otherwise atomically. 
That is ok since - // this optimization works as long as min_uncommitted is less than or equal - // than the smallest uncommitted seq when the snapshot was taken. - auto min_uncommitted = wpt_db_->SmallestUnCommittedSeq(); - const bool FOR_WW_CONFLICT_CHECK = true; - SnapshotImpl* snapshot = dbimpl_->GetSnapshotImpl(FOR_WW_CONFLICT_CHECK); - assert(snapshot); - wpt_db_->EnhanceSnapshot(snapshot, min_uncommitted); + const bool kForWWConflictCheck = true; + SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck); SetSnapshotInternal(snapshot); } diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 8ac08f964..893a2e011 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -422,8 +422,22 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64, evicted.prep_seq, evicted.commit_seq, prev_max); if (prev_max < evicted.commit_seq) { - // Inc max in larger steps to avoid frequent updates - auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED; + auto last = db_impl_->GetLastPublishedSequence(); // could be 0 + SequenceNumber max_evicted_seq; + if (LIKELY(evicted.commit_seq < last)) { + assert(last > 0); + // Inc max in larger steps to avoid frequent updates + max_evicted_seq = + std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1); + } else { + // legit when a commit entry in a write batch overwrites the previous one + max_evicted_seq = evicted.commit_seq; + } + ROCKS_LOG_DETAILS(info_log_, + "%" PRIu64 " Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64 + " => %" PRIu64, + prepare_seq, evicted.prep_seq, evicted.commit_seq, + prev_max, max_evicted_seq); AdvanceMaxEvictedSeq(prev_max, max_evicted_seq); } // After each eviction from commit cache, check if the commit entry should @@ -554,16 +568,84 @@ void 
WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max, } const Snapshot* WritePreparedTxnDB::GetSnapshot() { - // Note: SmallestUnCommittedSeq must be called before GetSnapshotImpl. Refer - // to WritePreparedTxn::SetSnapshot for more explanation. + const bool kForWWConflictCheck = true; + return GetSnapshotInternal(!kForWWConflictCheck); +} + +SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal( + bool for_ww_conflict_check) { + // Note: for this optimization setting the last sequence number and obtaining + // the smallest uncommitted seq should be done atomically. However to avoid + // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the + // snapshot. Since we always update the list of unprepared seq (via + // AddPrepared) AFTER the last sequence is updated, this guarantees that the + // smallest uncommitted seq that we pair with the snapshot is smaller than or + // equal to the value that would be obtained otherwise atomically. That is ok + // since this optimization works as long as min_uncommitted is less than or + // equal to the smallest uncommitted seq when the snapshot was taken. auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq(); - const bool FOR_WW_CONFLICT_CHECK = true; - SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(!FOR_WW_CONFLICT_CHECK); + SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check); assert(snap_impl); + SequenceNumber snap_seq = snap_impl->GetSequenceNumber(); + if (UNLIKELY(snap_seq != 0 && snap_seq <= max_evicted_seq_)) { + // There is a very rare case in which the commit entry evicts another commit + // entry that is not published yet thus advancing max evicted seq beyond the + // last published seq. This case is not likely in real-world setup so we + // handle it with a few retries. 
+ size_t retry = 0; + while (snap_impl->GetSequenceNumber() <= max_evicted_seq_ && retry < 100) { + ROCKS_LOG_WARN(info_log_, "GetSnapshot retry %" PRIu64, + snap_impl->GetSequenceNumber()); + ReleaseSnapshot(snap_impl); + // Wait for last visible seq to catch up with max, and also go beyond it + // by one. + AdvanceSeqByOne(); + snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check); + assert(snap_impl); + retry++; + } + assert(snap_impl->GetSequenceNumber() > max_evicted_seq_); + if (snap_impl->GetSequenceNumber() <= max_evicted_seq_) { + throw std::runtime_error("Snapshot seq " + + ToString(snap_impl->GetSequenceNumber()) + + " after " + ToString(retry) + + " retries is still less than max_evicted_seq_ " + + ToString(max_evicted_seq_.load())); + } + } EnhanceSnapshot(snap_impl, min_uncommitted); + ROCKS_LOG_DETAILS( + db_impl_->immutable_db_options().info_log, + "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64, + snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted); return snap_impl; } +void WritePreparedTxnDB::AdvanceSeqByOne() { + // Inserting an empty value will i) let the max evicted entry be + // published, i.e., max == last_published, ii) increase the last published to + // be one beyond max, i.e., max < last_published. 
+ WriteOptions woptions; + TransactionOptions txn_options; + Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr); + std::hash hasher; + char name[64]; + snprintf(name, 64, "txn%" ROCKSDB_PRIszt, hasher(std::this_thread::get_id())); + assert(strlen(name) < 64 - 1); + Status s = txn0->SetName(name); + assert(s.ok()); + if (s.ok()) { + // Without prepare it would simply skip the commit + s = txn0->Prepare(); + } + assert(s.ok()); + if (s.ok()) { + s = txn0->Commit(); + } + assert(s.ok()); + delete txn0; +} + const std::vector WritePreparedTxnDB::GetSnapshotListFromDB( SequenceNumber max) { ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max); diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index e77422f70..7a134a9f5 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -374,6 +374,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override; virtual const Snapshot* GetSnapshot() override; + SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check); protected: virtual Status VerifyCFOptions( @@ -395,10 +396,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test; + friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test; friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; friend class WritePreparedTransactionTest_DoubleSnapshot_Test; friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test; + friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test; + friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test; friend class 
WritePreparedTransactionTest_OldCommitMapGC_Test; friend class WritePreparedTransactionTest_RollbackTest_Test; friend class WriteUnpreparedTxnDB; @@ -564,6 +568,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { const uint64_t& snapshot_seq, const bool next_is_larger); + // A trick to increase the last visible sequence number by one and also wait + // for the in-flight commits to be visible. + void AdvanceSeqByOne(); + // The list of live snapshots at the last time that max_evicted_seq_ advanced. // The list stored into two data structures: in snapshot_cache_ that is // efficient for concurrent reads, and in snapshots_ if the data does not fit