diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index bec5d9d98..400ca0106 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -143,8 +143,10 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; + if (options.two_write_queues) { // Consume one seq for commit exp_seq++; + } } }; std::function txn_t0 = [&](size_t index) { @@ -166,8 +168,10 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; + if (options.two_write_queues) { // Consume one seq for commit exp_seq++; + } } ASSERT_OK(s); }; @@ -192,8 +196,10 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; + if (options.two_write_queues) { // Consume one seq for commit exp_seq++; + } } auto pdb = reinterpret_cast(db); pdb->UnregisterTransaction(txn); @@ -258,8 +264,10 @@ class TransactionTest : public ::testing::TestWithParam< exp_seq++; // Consume one seq per rollback batch exp_seq++; + if (options.two_write_queues) { // Consume one seq for rollback commit exp_seq++; + } } delete txn; }; diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 8baa09367..4c77162d5 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1361,7 +1361,9 @@ TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) { VerifyKeys({{"foo", v}}); seq++; // one for the key/value KeyVersion kv = {"foo", v, seq, kTypeValue}; + if (options.two_write_queues) { seq++; // one for the commit + } versions.emplace_back(kv); } std::reverse(std::begin(versions), std::end(versions)); @@ -1407,7 +1409,9 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) { auto add_key = [&](std::function func) { ASSERT_OK(func()); expected_seq++; + if (options.two_write_queues) { expected_seq++; // 1 for commit + } ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); snapshots.push_back(db->GetSnapshot()); }; @@ -1516,12 +1520,16 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) { ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2")); expected_seq++; // 1 for write SequenceNumber seq1 = expected_seq; + if (options.two_write_queues) { expected_seq++; // 1 for commit + } ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2")); expected_seq++; // 1 for write SequenceNumber seq2 = expected_seq; + if (options.two_write_queues) { expected_seq++; // 1 for commit + } ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_OK(db->Flush(FlushOptions())); db->ReleaseSnapshot(snapshot1); @@ -1646,7 +1654,9 @@ TEST_P(WritePreparedTransactionTest, ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); expected_seq++; // one for data - expected_seq++; // one for commit + if (options.two_write_queues) { + expected_seq++; // one for commit + } ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_OK(db->Flush(FlushOptions())); // Dummy keys to avoid compaction trivially move files and get around actual diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 0d2710d54..b4caaa4c7 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -93,18 +93,29 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() { Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { // TODO(myabandeh): handle the duplicate keys in the batch + bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; // In the absence of Prepare markers, use Noop as a batch separator WriteBatchInternal::InsertNoop(batch); const bool DISABLE_MEMTABLE = true; const uint64_t no_log_ref = 0; uint64_t seq_used = kMaxSequenceNumber; + const bool INCLUDES_DATA = true; + WritePreparedCommitEntryPreReleaseCallback update_commit_map( + wpt_db_, db_impl_, kMaxSequenceNumber, INCLUDES_DATA); auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr, - no_log_ref, !DISABLE_MEMTABLE, &seq_used); + no_log_ref, !DISABLE_MEMTABLE, &seq_used, + do_one_write ? &update_commit_map : nullptr); assert(seq_used != kMaxSequenceNumber); + if (!s.ok()) { + return s; + } + if (do_one_write) { + return s; + } // else do the 2nd write for commit uint64_t& prepare_seq = seq_used; - // Commit the batch by writing an empty batch to the queue that will release - // the commit sequence number to readers. - WritePreparedCommitEntryPreReleaseCallback update_commit_map( + // Commit the batch by writing an empty batch to the 2nd queue that will + // release the commit sequence number to readers. + WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( wpt_db_, db_impl_, prepare_seq); WriteBatch empty_batch; empty_batch.PutLogData(Slice()); @@ -112,7 +123,7 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { WriteBatchInternal::InsertNoop(&empty_batch); s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, no_log_ref, DISABLE_MEMTABLE, &seq_used, - &update_commit_map); + &update_commit_map_with_prepare); assert(seq_used != kMaxSequenceNumber); return s; } @@ -221,19 +232,31 @@ Status WritePreparedTxn::RollbackInternal() { } // The Rollback marker will be used as a batch separator WriteBatchInternal::MarkRollback(&rollback_batch, name_); + bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; const bool DISABLE_MEMTABLE = true; const uint64_t no_log_ref = 0; uint64_t seq_used = kMaxSequenceNumber; + const bool INCLUDES_DATA = true; + WritePreparedCommitEntryPreReleaseCallback update_commit_map( + wpt_db_, db_impl_, kMaxSequenceNumber, INCLUDES_DATA); s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, - no_log_ref, !DISABLE_MEMTABLE, &seq_used); + no_log_ref, !DISABLE_MEMTABLE, &seq_used, + do_one_write ? &update_commit_map : nullptr); assert(seq_used != kMaxSequenceNumber); if (!s.ok()) { return s; } + if (do_one_write) { + // TODO(myabandeh): what if max has already advanced rollback_seq? + // Mark the txn as rolled back + uint64_t& rollback_seq = seq_used; + wpt_db_->RollbackPrepared(GetId(), rollback_seq); + return s; + } // else do the 2nd write for commit uint64_t& prepare_seq = seq_used; // Commit the batch by writing an empty batch to the queue that will release // the commit sequence number to readers. - WritePreparedCommitEntryPreReleaseCallback update_commit_map( + WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( wpt_db_, db_impl_, prepare_seq); WriteBatch empty_batch; empty_batch.PutLogData(Slice()); @@ -241,7 +264,7 @@ Status WritePreparedTxn::RollbackInternal() { WriteBatchInternal::InsertNoop(&empty_batch); s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, no_log_ref, DISABLE_MEMTABLE, &seq_used, - &update_commit_map); + &update_commit_map_with_prepare); assert(seq_used != kMaxSequenceNumber); // Mark the txn as rolled back uint64_t& rollback_seq = seq_used; diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index cb8c134dd..b2a1af8f6 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -382,35 +382,44 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { public: // includes_data indicates that the commit also writes non-empty // CommitTimeWriteBatch to memtable, which needs to be committed separately. - WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db, - DBImpl* db_impl, - SequenceNumber prep_seq, - bool includes_data = false) + WritePreparedCommitEntryPreReleaseCallback( + WritePreparedTxnDB* db, DBImpl* db_impl, + SequenceNumber prep_seq = kMaxSequenceNumber, bool includes_data = false) : db_(db), db_impl_(db_impl), prep_seq_(prep_seq), includes_data_(includes_data) {} virtual Status Callback(SequenceNumber commit_seq) { - db_->AddCommitted(prep_seq_, commit_seq); + assert(includes_data_ || prep_seq_ != kMaxSequenceNumber); + if (prep_seq_ != kMaxSequenceNumber) { + db_->AddCommitted(prep_seq_, commit_seq); + } // else there was no prepare phase if (includes_data_) { - // Commit the data that is accompnaied with the commit marker + // Commit the data that is accompnaied with the commit request // TODO(myabandeh): skip AddPrepared db_->AddPrepared(commit_seq); db_->AddCommitted(commit_seq, commit_seq); } - // Publish the sequence number. We can do that here assuming the callback is - // invoked only from one write queue, which would guarantee that the publish - // sequence numbers will be in order, i.e., once a seq is published all the - // seq prior to that are also publishable. - db_impl_->SetLastPublishedSequence(commit_seq); + if (db_impl_->immutable_db_options().two_write_queues) { + // Publish the sequence number. We can do that here assuming the callback + // is invoked only from one write queue, which would guarantee that the + // publish sequence numbers will be in order, i.e., once a seq is + // published all the seq prior to that are also publishable. + db_impl_->SetLastPublishedSequence(commit_seq); + } + // else SequenceNumber that is updated as part of the write already does the + // publishing return Status::OK(); } private: WritePreparedTxnDB* db_; DBImpl* db_impl_; + // kMaxSequenceNumber if there was no prepare phase SequenceNumber prep_seq_; + // Either because it is commit without prepare or it has a + // CommitTimeWriteBatch bool includes_data_; };