diff --git a/db/write_batch.cc b/db/write_batch.cc index d70014f0a..4bc4bae92 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -988,6 +988,16 @@ class MemTableInserter : public WriteBatch::Handler { virtual bool WriterAfterCommit() const { return write_after_commit_; } + // The batch seq is regularly restarted; In normal mode it is set when + // MemTableInserter is constructed in the write thread and in recovery mode it + // is set when a batch, which is tagged with seq, is read from the WAL. + // Within a sequenced batch, which could be a merge of multiple batches, we + // have two policies to advance the seq: i) seq_per_key (default) and ii) + // seq_per_batch. To implement the latter we need to mark the boundry between + // the individual batches. The approach is this: 1) Use the terminating + // markers to indicate the boundry (kTypeEndPrepareXID, kTypeCommitXID, + // kTypeRollbackXID) 2) Terminate a batch with kTypeNoop in the absense of a + // natural boundy marker. void MaybeAdvanceSeq(bool batch_boundry = false) { if (batch_boundry == seq_per_batch_) { sequence_++; @@ -1430,6 +1440,9 @@ class MemTableInserter : public WriteBatch::Handler { // in non recovery we simply ignore this tag } + const bool batch_boundry = true; + MaybeAdvanceSeq(batch_boundry); + return Status::OK(); } diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 71ad14362..1e54c9a27 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -924,6 +924,7 @@ class DB { // Retrieve the sorted list of all wal files with earliest file first virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0; + // Note: this API is not yet consistent with WritePrepared transactions. // Sets iter to an iterator that is positioned at a write-batch containing // seq_number. If the sequence number is non existent, it returns an iterator // at the first available seq_no after the requested seq_no diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 88c8767c4..85662362b 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -4837,6 +4837,8 @@ TEST_P(TransactionTest, MemoryLimitTest) { // necessarily the one acceptable way. If the algorithm is legitimately changed, // this unit test should be updated as well. TEST_P(TransactionTest, SeqAdvanceTest) { + // TODO(myabandeh): must be test with false before new releases + const bool short_test = true; WriteOptions wopts; FlushOptions fopt; @@ -4846,7 +4848,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) { // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some // of the branches. This is the same as counting a binary number where i-th // bit represents whether we take branch i in the represented by the number. - const size_t NUM_BRANCHES = 8; + const size_t NUM_BRANCHES = short_test ? 6 : 10; // Helper function that shows if the branch is to be taken in the run // represented by the number n. auto branch_do = [&](size_t n, size_t* branch) { @@ -4869,7 +4871,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) { seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } - if (branch_do(n, &branch)) { + if (!short_test && branch_do(n, &branch)) { db_impl->FlushWAL(true); ReOpenNoDelete(); db_impl = reinterpret_cast(db->GetRootDB()); @@ -4891,7 +4893,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) { seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } - if (branch_do(n, &branch)) { + if (!short_test && branch_do(n, &branch)) { db_impl->FlushWAL(true); ReOpenNoDelete(); db_impl = reinterpret_cast(db->GetRootDB()); @@ -4908,7 +4910,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) { seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } - if (branch_do(n, &branch)) { + if (!short_test && branch_do(n, &branch)) { db_impl->FlushWAL(true); ReOpenNoDelete(); db_impl = reinterpret_cast(db->GetRootDB()); @@ -4916,10 +4918,24 @@ TEST_P(TransactionTest, SeqAdvanceTest) { ASSERT_EQ(exp_seq, seq); } - txn_t0(0); + txn_t4(0); seq = db_impl->TEST_GetLastVisibleSequence(); + ASSERT_EQ(exp_seq, seq); + if (branch_do(n, &branch)) { + db_impl->Flush(fopt); + seq = db_impl->TEST_GetLastVisibleSequence(); + ASSERT_EQ(exp_seq, seq); + } + if (!short_test && branch_do(n, &branch)) { + db_impl->FlushWAL(true); + ReOpenNoDelete(); + db_impl = reinterpret_cast(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + txn_t2(0); seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); @@ -4929,7 +4945,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) { seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } - if (branch_do(n, &branch)) { + if (!short_test && branch_do(n, &branch)) { db_impl->FlushWAL(true); ReOpenNoDelete(); db_impl = reinterpret_cast(db->GetRootDB()); diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 06caaec80..400ca0106 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -236,6 +236,41 @@ class TransactionTest : public ::testing::TestWithParam< } delete txn; }; + std::function txn_t4 = [&](size_t index) { + // A full 2pc txn that also involves a commit marker. + TransactionOptions txn_options; + WriteOptions write_options; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + auto istr = std::to_string(index); + auto s = txn->SetName("xid" + istr); + ASSERT_OK(s); + s = txn->Put(Slice("foo" + istr), Slice("bar")); + s = txn->Put(Slice("foo2" + istr), Slice("bar2")); + s = txn->Put(Slice("foo3" + istr), Slice("bar3")); + s = txn->Put(Slice("foo4" + istr), Slice("bar4")); + s = txn->Put(Slice("foo5" + istr), Slice("bar5")); + ASSERT_OK(s); + expected_commits++; + s = txn->Prepare(); + ASSERT_OK(s); + commit_writes++; + s = txn->Rollback(); + ASSERT_OK(s); + if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { + // No seq is consumed for deleting the txn buffer + exp_seq += 0; + } else { + // Consume one seq per batch + exp_seq++; + // Consume one seq per rollback batch + exp_seq++; + if (options.two_write_queues) { + // Consume one seq for rollback commit + exp_seq++; + } + } + delete txn; + }; // Test that we can change write policy after a clean shutdown (which would // empty the WAL) diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index dfa1579ad..3a731ae9c 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -605,12 +605,13 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) { FlushOptions fopt; // Number of different txn types we use in this test - const size_t type_cnt = 4; + const size_t type_cnt = 5; // The size of the first write group // TODO(myabandeh): This should be increase for pre-release tests const size_t first_group_size = 2; // Total number of txns we run in each test - const size_t txn_cnt = first_group_size * 2; + // TODO(myabandeh): This should be increase for pre-release tests + const size_t txn_cnt = first_group_size + 1; size_t base[txn_cnt + 1] = { 1, @@ -675,6 +676,9 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) { case 3: threads.emplace_back(txn_t3, bi); break; + case 4: + threads.emplace_back(txn_t3, bi); + break; default: assert(false); } @@ -710,16 +714,30 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) { rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); // The latest seq might be due to a commit without prepare and hence not - // persisted in the WAL. To make the verification of seq after recovery - // easier we write in a transaction with prepare which makes the latest seq - // to be persisted via the commitmarker. - txn_t3(0); + // persisted in the WAL. We need to discount such seqs if they are not + // continued by any seq consued by a value write. + if (options.two_write_queues) { + WritePreparedTxnDB* wp_db = dynamic_cast(db); + MutexLock l(&wp_db->seq_for_metadata_mutex_); + auto& vec = wp_db->seq_for_metadata; + std::sort(vec.begin(), vec.end()); + // going backward discount any last seq consumed for metadata until we see + // a seq that is consumed for actualy key/values. + auto rit = vec.rbegin(); + for (; rit != vec.rend(); ++rit) { + if (*rit == exp_seq) { + exp_seq--; + } else { + break; + } + } + } // Check if recovery preserves the last sequence number db_impl->FlushWAL(true); ReOpenNoDelete(); db_impl = reinterpret_cast(db->GetRootDB()); - seq = db_impl->GetLatestSequenceNumber(); + seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); // Check if flush preserves the last sequence number @@ -1134,25 +1152,18 @@ TEST_P(WritePreparedTransactionTest, RollbackTest) { ASSERT_SAME(db, s4, v4, "key4"); if (crash) { - // TODO(myabandeh): replace it with true crash (commented lines below) - // after compaction PR is landed. + delete txn; auto db_impl = reinterpret_cast(db->GetRootDB()); - auto seq = db_impl->GetLatestSequenceNumber(); + db_impl->FlushWAL(true); + ReOpenNoDelete(); wp_db = dynamic_cast(db); - SequenceNumber prev_max = wp_db->max_evicted_seq_; - wp_db->AdvanceMaxEvictedSeq(prev_max, seq); - // delete txn; - // auto db_impl = reinterpret_cast(db->GetRootDB()); - // db_impl->FlushWAL(true); - // ReOpenNoDelete(); - // wp_db = dynamic_cast(db); - // txn = db->GetTransactionByName("xid0"); - // ASSERT_FALSE(wp_db->delayed_prepared_empty_); - // ReadLock rl(&wp_db->prepared_mutex_); - // ASSERT_TRUE(wp_db->prepared_txns_.empty()); - // ASSERT_FALSE(wp_db->delayed_prepared_.empty()); - // ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) != - // wp_db->delayed_prepared_.end()); + txn = db->GetTransactionByName("xid0"); + ASSERT_FALSE(wp_db->delayed_prepared_empty_); + ReadLock rl(&wp_db->prepared_mutex_); + ASSERT_TRUE(wp_db->prepared_txns_.empty()); + ASSERT_FALSE(wp_db->delayed_prepared_.empty()); + ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) != + wp_db->delayed_prepared_.end()); } ASSERT_SAME(db, s1, v1, "key1"); diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 2d48a32ad..6e601235f 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -93,7 +93,12 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() { SequenceNumber WritePreparedTxn::GetACommitSeqNumber(SequenceNumber prep_seq) { if (db_impl_->immutable_db_options().two_write_queues) { - return db_impl_->IncAndFetchSequenceNumber(); + auto s = db_impl_->IncAndFetchSequenceNumber(); +#ifndef NDEBUG + MutexLock l(&wpt_db_->seq_for_metadata_mutex_); + wpt_db_->seq_for_metadata.push_back(s); +#endif + return s; } else { return prep_seq; } @@ -161,8 +166,6 @@ Status WritePreparedTxn::RollbackInternal() { WriteBatch rollback_batch; assert(GetId() != kMaxSequenceNumber); assert(GetId() > 0); - // In the absence of Prepare markers, use Noop as a batch separator - WriteBatchInternal::InsertNoop(&rollback_batch); // In WritePrepared, the txn is is the same as prepare seq auto last_visible_txn = GetId() - 1; struct RollbackWriteBatchBuilder : public WriteBatch::Handler { @@ -227,6 +230,7 @@ Status WritePreparedTxn::RollbackInternal() { if (!s.ok()) { return s; } + // The Rollback marker will be used as a batch separator WriteBatchInternal::MarkRollback(&rollback_batch, name_); const bool disable_memtable = true; const uint64_t no_log_ref = 0; diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index c50ac02d4..43b578ebf 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -215,6 +215,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, } { // We should not normally reach here + // TODO(myabandeh): check only if snapshot_seq is in the list of snaphots ReadLock rl(&old_commit_map_mutex_); auto old_commit_entry = old_commit_map_.find(prep_seq); if (old_commit_entry == old_commit_map_.end() || diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 02427433c..004f1ef67 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -179,6 +179,12 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // Struct to hold ownership of snapshot and read callback for cleanup. struct IteratorState; +#ifndef NDEBUG + // For unit tests we can track of the seq numbers that are used for metadata as opposed to actual key/values + std::vector seq_for_metadata; + mutable port::Mutex seq_for_metadata_mutex_; +#endif + private: friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;