WritePrepared Txn: fix race condition on publishing seq

Summary:
This commit fixes a race condition on calling SetLastPublishedSequence. The function must be called only from the 2nd write queue when two_write_queues is enabled. However, there was a bug that would also call it from the main write queue when a CommitTimeWriteBatch is provided with the commit request and yet the use_only_the_last_commit_time_batch_for_recovery optimization is not enabled. To fix that, we penalize the commit request in such cases by issuing an additional write from the 2nd queue solely to publish the seq number.
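
For context, a minimal sketch of the kind of commit that hits the penalized path: a WritePrepared transaction under two_write_queues whose commit carries a CommitTimeWriteBatch while the use_only_the_last_commit_time_batch_for_recovery optimization is not enabled. This is illustrative only; the db path, transaction name, and key/value strings are made up for the example.

#include <cassert>
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"

using namespace rocksdb;

int main() {
  Options options;
  options.create_if_missing = true;
  // With two write queues, the commit seq may only be published from the 2nd queue.
  options.two_write_queues = true;

  TransactionDBOptions txn_db_options;
  txn_db_options.write_policy = WRITE_PREPARED;
  // The use_only_the_last_commit_time_batch_for_recovery optimization is not
  // enabled here, so this commit cannot rely on it.

  TransactionDB* db = nullptr;
  Status s = TransactionDB::Open(options, txn_db_options, "/tmp/wp_txn_example", &db);
  assert(s.ok());

  Transaction* txn = db->BeginTransaction(WriteOptions());
  txn->SetName("xid1");
  txn->Put("key", "value");
  // The CommitTimeWriteBatch makes the commit carry data; with this fix, such a
  // commit is followed by an extra write, issued from the 2nd queue, solely to
  // publish the sequence number.
  txn->GetCommitTimeWriteBatch()->Put("commit_time_key", "commit_time_value");
  s = txn->Prepare();
  assert(s.ok());
  s = txn->Commit();
  assert(s.ok());

  delete txn;
  delete db;
  return 0;
}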
Closes https://github.com/facebook/rocksdb/pull/3641

Differential Revision: D7361508

Pulled By: maysamyabandeh

fbshipit-source-id: bf8f7a27e5cccf5425dccbce25eb0032e8e5a4d7
Author: Maysam Yabandeh (committed by Facebook Github Bot)
commit 7429b20e39
parent fa8c050e9f
 db/db_impl.h                                   |  2
 db/db_impl_write.cc                            | 10
 db/pre_release_callback.h                      |  4
 db/write_callback_test.cc                      |  3
 utilities/transactions/write_prepared_txn.cc   | 47
 utilities/transactions/write_prepared_txn_db.h | 14
 6 files changed

diff --git a/db/db_impl.h b/db/db_impl.h
@@ -223,6 +223,8 @@ class DBImpl : public DB {
   virtual Status SyncWAL() override;
   virtual SequenceNumber GetLatestSequenceNumber() const override;
+  // REQUIRES: joined the main write queue if two_write_queues is disabled, and
+  // the second write queue otherwise.
   virtual void SetLastPublishedSequence(SequenceNumber seq);
   // Returns LastSequence in last_seq_same_as_publish_seq_
   // mode and LastAllocatedSequence otherwise. This is useful when visiblility

diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc
@@ -133,7 +133,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       for (auto* writer : *(w.write_group)) {
         if (!writer->CallbackFailed() && writer->pre_release_callback) {
           assert(writer->sequence != kMaxSequenceNumber);
-          Status ws = writer->pre_release_callback->Callback(writer->sequence);
+          Status ws = writer->pre_release_callback->Callback(writer->sequence,
+                                                             disable_memtable);
           if (!ws.ok()) {
             status = ws;
             break;
@@ -368,7 +369,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     for (auto* writer : write_group) {
       if (!writer->CallbackFailed() && writer->pre_release_callback) {
         assert(writer->sequence != kMaxSequenceNumber);
-        Status ws = writer->pre_release_callback->Callback(writer->sequence);
+        Status ws = writer->pre_release_callback->Callback(writer->sequence,
+                                                           disable_memtable);
         if (!ws.ok()) {
           status = ws;
           break;
@@ -629,7 +631,9 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
     for (auto* writer : write_group) {
       if (!writer->CallbackFailed() && writer->pre_release_callback) {
         assert(writer->sequence != kMaxSequenceNumber);
-        Status ws = writer->pre_release_callback->Callback(writer->sequence);
+        const bool DISABLE_MEMTABLE = true;
+        Status ws = writer->pre_release_callback->Callback(writer->sequence,
+                                                            DISABLE_MEMTABLE);
         if (!ws.ok()) {
           status = ws;
           break;

diff --git a/db/pre_release_callback.h b/db/pre_release_callback.h
@@ -24,7 +24,9 @@ class PreReleaseCallback {
   // propagated to all the writers in the write group.
   // seq is the sequence number that is used for this write and will be
   // released.
-  virtual Status Callback(SequenceNumber seq) = 0;
+  // is_mem_disabled is currently used for debugging purposes to assert that
+  // the callback is done from the right write queue.
+  virtual Status Callback(SequenceNumber seq, const bool is_mem_disabled) = 0;
 };

 }  // namespace rocksdb

diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc
@@ -294,7 +294,8 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
        public:
         PublishSeqCallback(DBImpl* db_impl_in)
             : db_impl_(db_impl_in) {}
-        virtual Status Callback(SequenceNumber last_seq) {
+        virtual Status Callback(SequenceNumber last_seq,
+                                const bool /*not used*/) override {
          db_impl_->SetLastPublishedSequence(last_seq);
          return Status::OK();
        }

diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc
@@ -128,9 +128,14 @@ Status WritePreparedTxn::CommitInternal() {
     assert(s.ok());
     commit_batch_cnt = counter.BatchCount();
   }
-  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
-      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
+  const bool PREP_HEAP_SKIPPED = true;
   const bool disable_memtable = !includes_data;
+  const bool do_one_write =
+      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
+  const bool publish_seq = do_one_write;
+  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
+      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt,
+      !PREP_HEAP_SKIPPED, publish_seq);
   uint64_t seq_used = kMaxSequenceNumber;
   // Since the prepared batch is directly written to memtable, there is already
   // a connection between the memtable and its WAL, so there is no need to
@@ -141,6 +146,38 @@ Status WritePreparedTxn::CommitInternal() {
                           zero_log_number, disable_memtable, &seq_used,
                           batch_cnt, &update_commit_map);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
+  if (LIKELY(do_one_write || !s.ok())) {
+    return s;
+  }  // else do the 2nd write to publish seq
+  // Note: the 2nd write comes with a performance penalty. So if we have too
+  // many commits accompanied with CommitTimeWriteBatch and yet we cannot
+  // enable the use_only_the_last_commit_time_batch_for_recovery_ optimization,
+  // two_write_queues should be disabled to avoid many additional writes here.
+  class PublishSeqPreReleaseCallback : public PreReleaseCallback {
+   public:
+    explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
+        : db_impl_(db_impl) {}
+    virtual Status Callback(SequenceNumber seq, bool is_mem_disabled) override {
+      assert(is_mem_disabled);
+      assert(db_impl_->immutable_db_options().two_write_queues);
+      db_impl_->SetLastPublishedSequence(seq);
+      return Status::OK();
+    }
+
+   private:
+    DBImpl* db_impl_;
+  } publish_seq_callback(db_impl_);
+  WriteBatch empty_batch;
+  empty_batch.PutLogData(Slice());
+  // In the absence of Prepare markers, use Noop as a batch separator
+  WriteBatchInternal::InsertNoop(&empty_batch);
+  const bool DISABLE_MEMTABLE = true;
+  const size_t ONE_BATCH = 1;
+  const uint64_t NO_REF_LOG = 0;
+  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
+                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
+                          &publish_seq_callback);
+  assert(!s.ok() || seq_used != kMaxSequenceNumber);
   return s;
 }
@@ -240,14 +277,14 @@ Status WritePreparedTxn::RollbackInternal() {
   WriteBatchInternal::MarkRollback(&rollback_batch, name_);
   bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
   const bool DISABLE_MEMTABLE = true;
-  const uint64_t no_log_ref = 0;
+  const uint64_t NO_REF_LOG = 0;
   uint64_t seq_used = kMaxSequenceNumber;
   const size_t ZERO_PREPARES = 0;
   const size_t ONE_BATCH = 1;
   WritePreparedCommitEntryPreReleaseCallback update_commit_map(
       wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, ONE_BATCH);
   s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
-                          no_log_ref, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
+                          NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                           do_one_write ? &update_commit_map : nullptr);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   if (!s.ok()) {
@@ -275,7 +312,7 @@ Status WritePreparedTxn::RollbackInternal() {
   // In the absence of Prepare markers, use Noop as a batch separator
   WriteBatchInternal::InsertNoop(&empty_batch);
   s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
-                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
+                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                           &update_commit_map_with_prepare);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   // Mark the txn as rolled back

diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
@@ -459,19 +459,22 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
                                              SequenceNumber prep_seq,
                                              size_t prep_batch_cnt,
                                              size_t data_batch_cnt = 0,
-                                             bool prep_heap_skipped = false)
+                                             bool prep_heap_skipped = false,
+                                             bool publish_seq = true)
       : db_(db),
         db_impl_(db_impl),
         prep_seq_(prep_seq),
         prep_batch_cnt_(prep_batch_cnt),
         data_batch_cnt_(data_batch_cnt),
         prep_heap_skipped_(prep_heap_skipped),
-        includes_data_(data_batch_cnt_ > 0) {
+        includes_data_(data_batch_cnt_ > 0),
+        publish_seq_(publish_seq) {
     assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
     assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
   }
-  virtual Status Callback(SequenceNumber commit_seq) override {
+  virtual Status Callback(SequenceNumber commit_seq,
+                          bool is_mem_disabled) override {
     assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
     const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
                                          ? commit_seq
@@ -492,7 +495,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
         db_->AddCommitted(commit_seq + i, last_commit_seq, PREPARE_SKIPPED);
       }
     }
-    if (db_impl_->immutable_db_options().two_write_queues) {
+    if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) {
+      assert(is_mem_disabled);  // implies the 2nd queue
       // Publish the sequence number. We can do that here assuming the callback
       // is invoked only from one write queue, which would guarantee that the
       // publish sequence numbers will be in order, i.e., once a seq is
@@ -517,6 +521,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
   // Either because it is commit without prepare or it has a
   // CommitTimeWriteBatch
   bool includes_data_;
+  // Whether the callback should also publish the commit seq number
+  bool publish_seq_;
 };

 // Count the number of sub-batches inside a batch. A sub-batch does not have
