From 7429b20e3901e0bcc7e028b37536a262cf11a448 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Thu, 22 Mar 2018 14:27:44 -0700 Subject: [PATCH] WritePrepared Txn: fix race condition on publishing seq Summary: This commit fixes a race condition on calling SetLastPublishedSequence. The function must be called only from the 2nd write queue when two_write_queues is enabled. However there was a bug that would also call it from the main write queue if CommitTimeWriteBatch is provided to the commit request and yet use_only_the_last_commit_time_batch_for_recovery optimization is not enabled. To fix that we penalize the commit request in such cases by doing an additional write solely to publish the seq number from the 2nd queue. Closes https://github.com/facebook/rocksdb/pull/3641 Differential Revision: D7361508 Pulled By: maysamyabandeh fbshipit-source-id: bf8f7a27e5cccf5425dccbce25eb0032e8e5a4d7 --- db/db_impl.h | 2 + db/db_impl_write.cc | 10 ++-- db/pre_release_callback.h | 4 +- db/write_callback_test.cc | 3 +- utilities/transactions/write_prepared_txn.cc | 47 +++++++++++++++++-- .../transactions/write_prepared_txn_db.h | 14 ++++-- 6 files changed, 66 insertions(+), 14 deletions(-) diff --git a/db/db_impl.h b/db/db_impl.h index c316b6ad4..c97d407bf 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -223,6 +223,8 @@ class DBImpl : public DB { virtual Status SyncWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; + // REQUIRES: joined the main write queue if two_write_queues is disabled, and + // the second write queue otherwise. virtual void SetLastPublishedSequence(SequenceNumber seq); // Returns LastSequence in last_seq_same_as_publish_seq_ // mode and LastAllocatedSequence otherwise. This is useful when visiblility diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 5c3eaf404..8b26ac511 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -133,7 +133,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, for (auto* writer : *(w.write_group)) { if (!writer->CallbackFailed() && writer->pre_release_callback) { assert(writer->sequence != kMaxSequenceNumber); - Status ws = writer->pre_release_callback->Callback(writer->sequence); + Status ws = writer->pre_release_callback->Callback(writer->sequence, + disable_memtable); if (!ws.ok()) { status = ws; break; @@ -368,7 +369,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, for (auto* writer : write_group) { if (!writer->CallbackFailed() && writer->pre_release_callback) { assert(writer->sequence != kMaxSequenceNumber); - Status ws = writer->pre_release_callback->Callback(writer->sequence); + Status ws = writer->pre_release_callback->Callback(writer->sequence, + disable_memtable); if (!ws.ok()) { status = ws; break; @@ -629,7 +631,9 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, for (auto* writer : write_group) { if (!writer->CallbackFailed() && writer->pre_release_callback) { assert(writer->sequence != kMaxSequenceNumber); - Status ws = writer->pre_release_callback->Callback(writer->sequence); + const bool DISABLE_MEMTABLE = true; + Status ws = writer->pre_release_callback->Callback(writer->sequence, + DISABLE_MEMTABLE); if (!ws.ok()) { status = ws; break; diff --git a/db/pre_release_callback.h b/db/pre_release_callback.h index fdc4d50c5..b3e658577 100644 --- a/db/pre_release_callback.h +++ b/db/pre_release_callback.h @@ -24,7 +24,9 @@ class PreReleaseCallback { // propagated to all the writers in the write group. // seq is the sequence number that is used for this write and will be // released. - virtual Status Callback(SequenceNumber seq) = 0; + // is_mem_disabled is currently used for debugging purposes to assert that + // the callback is done from the right write queue. + virtual Status Callback(SequenceNumber seq, const bool is_mem_disabled) = 0; }; } // namespace rocksdb diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index c4f0e35dc..c91a4305c 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -294,7 +294,8 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { public: PublishSeqCallback(DBImpl* db_impl_in) : db_impl_(db_impl_in) {} - virtual Status Callback(SequenceNumber last_seq) { + virtual Status Callback(SequenceNumber last_seq, + const bool /*not used*/) override { db_impl_->SetLastPublishedSequence(last_seq); return Status::OK(); } diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 591a5d3ab..f6aef7fe2 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -128,9 +128,14 @@ Status WritePreparedTxn::CommitInternal() { assert(s.ok()); commit_batch_cnt = counter.BatchCount(); } - WritePreparedCommitEntryPreReleaseCallback update_commit_map( - wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt); + const bool PREP_HEAP_SKIPPED = true; const bool disable_memtable = !includes_data; + const bool do_one_write = + !db_impl_->immutable_db_options().two_write_queues || disable_memtable; + const bool publish_seq = do_one_write; + WritePreparedCommitEntryPreReleaseCallback update_commit_map( + wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt, + !PREP_HEAP_SKIPPED, publish_seq); uint64_t seq_used = kMaxSequenceNumber; // Since the prepared batch is directly written to memtable, there is already // a connection between the memtable and its WAL, so there is no need to @@ -141,6 +146,38 @@ Status WritePreparedTxn::CommitInternal() { zero_log_number, disable_memtable, &seq_used, batch_cnt, &update_commit_map); assert(!s.ok() || seq_used != kMaxSequenceNumber); + if (LIKELY(do_one_write || !s.ok())) { + return s; + } // else do the 2nd write to publish seq + // Note: the 2nd write comes with a performance penality. So if we have too + // many of commits accompanied with ComitTimeWriteBatch and yet we cannot + // enable use_only_the_last_commit_time_batch_for_recovery_ optimization, + // two_write_queues should be disabled to avoid many additional writes here. + class PublishSeqPreReleaseCallback : public PreReleaseCallback { + public: + explicit PublishSeqPreReleaseCallback(DBImpl* db_impl) + : db_impl_(db_impl) {} + virtual Status Callback(SequenceNumber seq, bool is_mem_disabled) override { + assert(is_mem_disabled); + assert(db_impl_->immutable_db_options().two_write_queues); + db_impl_->SetLastPublishedSequence(seq); + return Status::OK(); + } + + private: + DBImpl* db_impl_; + } publish_seq_callback(db_impl_); + WriteBatch empty_batch; + empty_batch.PutLogData(Slice()); + // In the absence of Prepare markers, use Noop as a batch separator + WriteBatchInternal::InsertNoop(&empty_batch); + const bool DISABLE_MEMTABLE = true; + const size_t ONE_BATCH = 1; + const uint64_t NO_REF_LOG = 0; + s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, + NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, + &publish_seq_callback); + assert(!s.ok() || seq_used != kMaxSequenceNumber); return s; } @@ -240,14 +277,14 @@ Status WritePreparedTxn::RollbackInternal() { WriteBatchInternal::MarkRollback(&rollback_batch, name_); bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; const bool DISABLE_MEMTABLE = true; - const uint64_t no_log_ref = 0; + const uint64_t NO_REF_LOG = 0; uint64_t seq_used = kMaxSequenceNumber; const size_t ZERO_PREPARES = 0; const size_t ONE_BATCH = 1; WritePreparedCommitEntryPreReleaseCallback update_commit_map( wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, ONE_BATCH); s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, - no_log_ref, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH, + NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH, do_one_write ? &update_commit_map : nullptr); assert(!s.ok() || seq_used != kMaxSequenceNumber); if (!s.ok()) { @@ -275,7 +312,7 @@ Status WritePreparedTxn::RollbackInternal() { // In the absence of Prepare markers, use Noop as a batch separator WriteBatchInternal::InsertNoop(&empty_batch); s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, - no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, + NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, &update_commit_map_with_prepare); assert(!s.ok() || seq_used != kMaxSequenceNumber); // Mark the txn as rolled back diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index bb495b6c6..a3d2ab8fc 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -459,19 +459,22 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { SequenceNumber prep_seq, size_t prep_batch_cnt, size_t data_batch_cnt = 0, - bool prep_heap_skipped = false) + bool prep_heap_skipped = false, + bool publish_seq = true) : db_(db), db_impl_(db_impl), prep_seq_(prep_seq), prep_batch_cnt_(prep_batch_cnt), data_batch_cnt_(data_batch_cnt), prep_heap_skipped_(prep_heap_skipped), - includes_data_(data_batch_cnt_ > 0) { + includes_data_(data_batch_cnt_ > 0), + publish_seq_(publish_seq) { assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber)); // xor assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0); } - virtual Status Callback(SequenceNumber commit_seq) override { + virtual Status Callback(SequenceNumber commit_seq, + bool is_mem_disabled) override { assert(includes_data_ || prep_seq_ != kMaxSequenceNumber); const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1) ? commit_seq @@ -492,7 +495,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { db_->AddCommitted(commit_seq + i, last_commit_seq, PREPARE_SKIPPED); } } - if (db_impl_->immutable_db_options().two_write_queues) { + if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) { + assert(is_mem_disabled); // implies the 2nd queue // Publish the sequence number. We can do that here assuming the callback // is invoked only from one write queue, which would guarantee that the // publish sequence numbers will be in order, i.e., once a seq is @@ -517,6 +521,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { // Either because it is commit without prepare or it has a // CommitTimeWriteBatch bool includes_data_; + // Should the callback also publishes the commit seq number + bool publish_seq_; }; // Count the number of sub-batches inside a batch. A sub-batch does not have