From a6d3c762df2dc40c913ee6c460ad1b7d2f7363bd Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Mon, 18 Dec 2017 08:03:18 -0800 Subject: [PATCH] WritePrepared Txn: non-2pc write in one round Summary: Currently non-2pc writes do the 2nd dummy write to actually commit the transaction. This was necessary to ensure that publishing the commit sequence number will be done only from one queue (the queue that does not write to memtable). This is however not necessary when we have only one write queue, which is actually the setup that would be used by non-2pc writes. This patch eliminates the 2nd write when two_write_queues are disabled by updating the commit map in the 1st write. Closes https://github.com/facebook/rocksdb/pull/3277 Differential Revision: D6575392 Pulled By: maysamyabandeh fbshipit-source-id: 8ab458f7ca506905962f9166026b2ec81e749c46 --- utilities/transactions/transaction_test.h | 8 ++++ .../write_prepared_transaction_test.cc | 12 +++++- utilities/transactions/write_prepared_txn.cc | 39 +++++++++++++++---- .../transactions/write_prepared_txn_db.h | 31 +++++++++------ 4 files changed, 70 insertions(+), 20 deletions(-) diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index bec5d9d98..400ca0106 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -143,8 +143,10 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; + if (options.two_write_queues) { // Consume one seq for commit exp_seq++; + } } }; std::function txn_t0 = [&](size_t index) { @@ -166,8 +168,10 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; + if (options.two_write_queues) { // Consume one seq for commit exp_seq++; + } } ASSERT_OK(s); }; @@ -192,8 +196,10 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; + if (options.two_write_queues) { // Consume one seq for commit exp_seq++; + } } auto pdb = reinterpret_cast(db); pdb->UnregisterTransaction(txn); @@ -258,8 +264,10 @@ class TransactionTest : public ::testing::TestWithParam< exp_seq++; // Consume one seq per rollback batch exp_seq++; + if (options.two_write_queues) { // Consume one seq for rollback commit exp_seq++; + } } delete txn; }; diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 8baa09367..4c77162d5 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1361,7 +1361,9 @@ TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) { VerifyKeys({{"foo", v}}); seq++; // one for the key/value KeyVersion kv = {"foo", v, seq, kTypeValue}; + if (options.two_write_queues) { seq++; // one for the commit + } versions.emplace_back(kv); } std::reverse(std::begin(versions), std::end(versions)); @@ -1407,7 +1409,9 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) { auto add_key = [&](std::function func) { ASSERT_OK(func()); expected_seq++; + if (options.two_write_queues) { expected_seq++; // 1 for commit + } ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); snapshots.push_back(db->GetSnapshot()); }; @@ -1516,12 +1520,16 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) { ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2")); expected_seq++; // 1 for write SequenceNumber seq1 = expected_seq; + if (options.two_write_queues) { expected_seq++; // 1 for commit + } ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2")); expected_seq++; // 1 for write SequenceNumber seq2 = expected_seq; + if (options.two_write_queues) { expected_seq++; // 1 for commit + } ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_OK(db->Flush(FlushOptions())); db->ReleaseSnapshot(snapshot1); @@ -1646,7 +1654,9 @@ TEST_P(WritePreparedTransactionTest, ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); expected_seq++; // one for data - expected_seq++; // one for commit + if (options.two_write_queues) { + expected_seq++; // one for commit + } ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_OK(db->Flush(FlushOptions())); // Dummy keys to avoid compaction trivially move files and get around actual diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 0d2710d54..b4caaa4c7 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -93,18 +93,29 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() { Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { // TODO(myabandeh): handle the duplicate keys in the batch + bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; // In the absence of Prepare markers, use Noop as a batch separator WriteBatchInternal::InsertNoop(batch); const bool DISABLE_MEMTABLE = true; const uint64_t no_log_ref = 0; uint64_t seq_used = kMaxSequenceNumber; + const bool INCLUDES_DATA = true; + WritePreparedCommitEntryPreReleaseCallback update_commit_map( + wpt_db_, db_impl_, kMaxSequenceNumber, INCLUDES_DATA); auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr, - no_log_ref, !DISABLE_MEMTABLE, &seq_used); + no_log_ref, !DISABLE_MEMTABLE, &seq_used, + do_one_write ? &update_commit_map : nullptr); assert(seq_used != kMaxSequenceNumber); + if (!s.ok()) { + return s; + } + if (do_one_write) { + return s; + } // else do the 2nd write for commit uint64_t& prepare_seq = seq_used; - // Commit the batch by writing an empty batch to the queue that will release - // the commit sequence number to readers. - WritePreparedCommitEntryPreReleaseCallback update_commit_map( + // Commit the batch by writing an empty batch to the 2nd queue that will + // release the commit sequence number to readers. + WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( wpt_db_, db_impl_, prepare_seq); WriteBatch empty_batch; empty_batch.PutLogData(Slice()); @@ -112,7 +123,7 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { WriteBatchInternal::InsertNoop(&empty_batch); s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, no_log_ref, DISABLE_MEMTABLE, &seq_used, - &update_commit_map); + &update_commit_map_with_prepare); assert(seq_used != kMaxSequenceNumber); return s; } @@ -221,19 +232,31 @@ Status WritePreparedTxn::RollbackInternal() { } // The Rollback marker will be used as a batch separator WriteBatchInternal::MarkRollback(&rollback_batch, name_); + bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; const bool DISABLE_MEMTABLE = true; const uint64_t no_log_ref = 0; uint64_t seq_used = kMaxSequenceNumber; + const bool INCLUDES_DATA = true; + WritePreparedCommitEntryPreReleaseCallback update_commit_map( + wpt_db_, db_impl_, kMaxSequenceNumber, INCLUDES_DATA); s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, - no_log_ref, !DISABLE_MEMTABLE, &seq_used); + no_log_ref, !DISABLE_MEMTABLE, &seq_used, + do_one_write ? &update_commit_map : nullptr); assert(seq_used != kMaxSequenceNumber); if (!s.ok()) { return s; } + if (do_one_write) { + // TODO(myabandeh): what if max has already advanced rollback_seq? + // Mark the txn as rolled back + uint64_t& rollback_seq = seq_used; + wpt_db_->RollbackPrepared(GetId(), rollback_seq); + return s; + } // else do the 2nd write for commit uint64_t& prepare_seq = seq_used; // Commit the batch by writing an empty batch to the queue that will release // the commit sequence number to readers. - WritePreparedCommitEntryPreReleaseCallback update_commit_map( + WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( wpt_db_, db_impl_, prepare_seq); WriteBatch empty_batch; empty_batch.PutLogData(Slice()); @@ -241,7 +264,7 @@ Status WritePreparedTxn::RollbackInternal() { WriteBatchInternal::InsertNoop(&empty_batch); s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, no_log_ref, DISABLE_MEMTABLE, &seq_used, - &update_commit_map); + &update_commit_map_with_prepare); assert(seq_used != kMaxSequenceNumber); // Mark the txn as rolled back uint64_t& rollback_seq = seq_used; diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index cb8c134dd..b2a1af8f6 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -382,35 +382,44 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { public: // includes_data indicates that the commit also writes non-empty // CommitTimeWriteBatch to memtable, which needs to be committed separately. - WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db, - DBImpl* db_impl, - SequenceNumber prep_seq, - bool includes_data = false) + WritePreparedCommitEntryPreReleaseCallback( + WritePreparedTxnDB* db, DBImpl* db_impl, + SequenceNumber prep_seq = kMaxSequenceNumber, bool includes_data = false) : db_(db), db_impl_(db_impl), prep_seq_(prep_seq), includes_data_(includes_data) {} virtual Status Callback(SequenceNumber commit_seq) { - db_->AddCommitted(prep_seq_, commit_seq); + assert(includes_data_ || prep_seq_ != kMaxSequenceNumber); + if (prep_seq_ != kMaxSequenceNumber) { + db_->AddCommitted(prep_seq_, commit_seq); + } // else there was no prepare phase if (includes_data_) { - // Commit the data that is accompnaied with the commit marker + // Commit the data that is accompnaied with the commit request // TODO(myabandeh): skip AddPrepared db_->AddPrepared(commit_seq); db_->AddCommitted(commit_seq, commit_seq); } - // Publish the sequence number. We can do that here assuming the callback is - // invoked only from one write queue, which would guarantee that the publish - // sequence numbers will be in order, i.e., once a seq is published all the - // seq prior to that are also publishable. - db_impl_->SetLastPublishedSequence(commit_seq); + if (db_impl_->immutable_db_options().two_write_queues) { + // Publish the sequence number. We can do that here assuming the callback + // is invoked only from one write queue, which would guarantee that the + // publish sequence numbers will be in order, i.e., once a seq is + // published all the seq prior to that are also publishable. + db_impl_->SetLastPublishedSequence(commit_seq); + } + // else SequenceNumber that is updated as part of the write already does the + // publishing return Status::OK(); } private: WritePreparedTxnDB* db_; DBImpl* db_impl_; + // kMaxSequenceNumber if there was no prepare phase SequenceNumber prep_seq_; + // Either because it is commit without prepare or it has a + // CommitTimeWriteBatch bool includes_data_; };