WritePrepared Txn: non-2pc write in one round

Summary:
Currently non-2pc writes do the 2nd dummy write to actually commit the transaction. This was necessary to ensure that publishing the commit sequence number will be done only from one queue (the queue that does not write to memtable). This is however not necessary when we have only one write queue, which is actually the setup that would be used by non-2pc writes. This patch eliminates the 2nd write when two_write_queues are disabled by updating the commit map in the 1st write.
Closes https://github.com/facebook/rocksdb/pull/3277

Differential Revision: D6575392

Pulled By: maysamyabandeh

fbshipit-source-id: 8ab458f7ca506905962f9166026b2ec81e749c46
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent fccc12f386
commit a6d3c762df
  1. 8
      utilities/transactions/transaction_test.h
  2. 12
      utilities/transactions/write_prepared_transaction_test.cc
  3. 39
      utilities/transactions/write_prepared_txn.cc
  4. 31
      utilities/transactions/write_prepared_txn_db.h

@ -143,8 +143,10 @@ class TransactionTest : public ::testing::TestWithParam<
} else { } else {
// Consume one seq per batch // Consume one seq per batch
exp_seq++; exp_seq++;
if (options.two_write_queues) {
// Consume one seq for commit // Consume one seq for commit
exp_seq++; exp_seq++;
}
} }
}; };
std::function<void(size_t)> txn_t0 = [&](size_t index) { std::function<void(size_t)> txn_t0 = [&](size_t index) {
@ -166,8 +168,10 @@ class TransactionTest : public ::testing::TestWithParam<
} else { } else {
// Consume one seq per batch // Consume one seq per batch
exp_seq++; exp_seq++;
if (options.two_write_queues) {
// Consume one seq for commit // Consume one seq for commit
exp_seq++; exp_seq++;
}
} }
ASSERT_OK(s); ASSERT_OK(s);
}; };
@ -192,8 +196,10 @@ class TransactionTest : public ::testing::TestWithParam<
} else { } else {
// Consume one seq per batch // Consume one seq per batch
exp_seq++; exp_seq++;
if (options.two_write_queues) {
// Consume one seq for commit // Consume one seq for commit
exp_seq++; exp_seq++;
}
} }
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db); auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
pdb->UnregisterTransaction(txn); pdb->UnregisterTransaction(txn);
@ -258,8 +264,10 @@ class TransactionTest : public ::testing::TestWithParam<
exp_seq++; exp_seq++;
// Consume one seq per rollback batch // Consume one seq per rollback batch
exp_seq++; exp_seq++;
if (options.two_write_queues) {
// Consume one seq for rollback commit // Consume one seq for rollback commit
exp_seq++; exp_seq++;
}
} }
delete txn; delete txn;
}; };

@ -1361,7 +1361,9 @@ TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) {
VerifyKeys({{"foo", v}}); VerifyKeys({{"foo", v}});
seq++; // one for the key/value seq++; // one for the key/value
KeyVersion kv = {"foo", v, seq, kTypeValue}; KeyVersion kv = {"foo", v, seq, kTypeValue};
if (options.two_write_queues) {
seq++; // one for the commit seq++; // one for the commit
}
versions.emplace_back(kv); versions.emplace_back(kv);
} }
std::reverse(std::begin(versions), std::end(versions)); std::reverse(std::begin(versions), std::end(versions));
@ -1407,7 +1409,9 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
auto add_key = [&](std::function<Status()> func) { auto add_key = [&](std::function<Status()> func) {
ASSERT_OK(func()); ASSERT_OK(func());
expected_seq++; expected_seq++;
if (options.two_write_queues) {
expected_seq++; // 1 for commit expected_seq++; // 1 for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
snapshots.push_back(db->GetSnapshot()); snapshots.push_back(db->GetSnapshot());
}; };
@ -1516,12 +1520,16 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2")); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
expected_seq++; // 1 for write expected_seq++; // 1 for write
SequenceNumber seq1 = expected_seq; SequenceNumber seq1 = expected_seq;
if (options.two_write_queues) {
expected_seq++; // 1 for commit expected_seq++; // 1 for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2")); ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
expected_seq++; // 1 for write expected_seq++; // 1 for write
SequenceNumber seq2 = expected_seq; SequenceNumber seq2 = expected_seq;
if (options.two_write_queues) {
expected_seq++; // 1 for commit expected_seq++; // 1 for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
ASSERT_OK(db->Flush(FlushOptions())); ASSERT_OK(db->Flush(FlushOptions()));
db->ReleaseSnapshot(snapshot1); db->ReleaseSnapshot(snapshot1);
@ -1646,7 +1654,9 @@ TEST_P(WritePreparedTransactionTest,
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
expected_seq++; // one for data expected_seq++; // one for data
expected_seq++; // one for commit if (options.two_write_queues) {
expected_seq++; // one for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
ASSERT_OK(db->Flush(FlushOptions())); ASSERT_OK(db->Flush(FlushOptions()));
// Dummy keys to avoid compaction trivially move files and get around actual // Dummy keys to avoid compaction trivially move files and get around actual

@ -93,18 +93,29 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() {
Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
// TODO(myabandeh): handle the duplicate keys in the batch // TODO(myabandeh): handle the duplicate keys in the batch
bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
// In the absence of Prepare markers, use Noop as a batch separator // In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(batch); WriteBatchInternal::InsertNoop(batch);
const bool DISABLE_MEMTABLE = true; const bool DISABLE_MEMTABLE = true;
const uint64_t no_log_ref = 0; const uint64_t no_log_ref = 0;
uint64_t seq_used = kMaxSequenceNumber; uint64_t seq_used = kMaxSequenceNumber;
const bool INCLUDES_DATA = true;
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, kMaxSequenceNumber, INCLUDES_DATA);
auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr, auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr,
no_log_ref, !DISABLE_MEMTABLE, &seq_used); no_log_ref, !DISABLE_MEMTABLE, &seq_used,
do_one_write ? &update_commit_map : nullptr);
assert(seq_used != kMaxSequenceNumber); assert(seq_used != kMaxSequenceNumber);
if (!s.ok()) {
return s;
}
if (do_one_write) {
return s;
} // else do the 2nd write for commit
uint64_t& prepare_seq = seq_used; uint64_t& prepare_seq = seq_used;
// Commit the batch by writing an empty batch to the queue that will release // Commit the batch by writing an empty batch to the 2nd queue that will
// the commit sequence number to readers. // release the commit sequence number to readers.
WritePreparedCommitEntryPreReleaseCallback update_commit_map( WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
wpt_db_, db_impl_, prepare_seq); wpt_db_, db_impl_, prepare_seq);
WriteBatch empty_batch; WriteBatch empty_batch;
empty_batch.PutLogData(Slice()); empty_batch.PutLogData(Slice());
@ -112,7 +123,7 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
WriteBatchInternal::InsertNoop(&empty_batch); WriteBatchInternal::InsertNoop(&empty_batch);
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
no_log_ref, DISABLE_MEMTABLE, &seq_used, no_log_ref, DISABLE_MEMTABLE, &seq_used,
&update_commit_map); &update_commit_map_with_prepare);
assert(seq_used != kMaxSequenceNumber); assert(seq_used != kMaxSequenceNumber);
return s; return s;
} }
@ -221,19 +232,31 @@ Status WritePreparedTxn::RollbackInternal() {
} }
// The Rollback marker will be used as a batch separator // The Rollback marker will be used as a batch separator
WriteBatchInternal::MarkRollback(&rollback_batch, name_); WriteBatchInternal::MarkRollback(&rollback_batch, name_);
bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
const bool DISABLE_MEMTABLE = true; const bool DISABLE_MEMTABLE = true;
const uint64_t no_log_ref = 0; const uint64_t no_log_ref = 0;
uint64_t seq_used = kMaxSequenceNumber; uint64_t seq_used = kMaxSequenceNumber;
const bool INCLUDES_DATA = true;
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, kMaxSequenceNumber, INCLUDES_DATA);
s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
no_log_ref, !DISABLE_MEMTABLE, &seq_used); no_log_ref, !DISABLE_MEMTABLE, &seq_used,
do_one_write ? &update_commit_map : nullptr);
assert(seq_used != kMaxSequenceNumber); assert(seq_used != kMaxSequenceNumber);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
if (do_one_write) {
// TODO(myabandeh): what if max has already advanced rollback_seq?
// Mark the txn as rolled back
uint64_t& rollback_seq = seq_used;
wpt_db_->RollbackPrepared(GetId(), rollback_seq);
return s;
} // else do the 2nd write for commit
uint64_t& prepare_seq = seq_used; uint64_t& prepare_seq = seq_used;
// Commit the batch by writing an empty batch to the queue that will release // Commit the batch by writing an empty batch to the queue that will release
// the commit sequence number to readers. // the commit sequence number to readers.
WritePreparedCommitEntryPreReleaseCallback update_commit_map( WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
wpt_db_, db_impl_, prepare_seq); wpt_db_, db_impl_, prepare_seq);
WriteBatch empty_batch; WriteBatch empty_batch;
empty_batch.PutLogData(Slice()); empty_batch.PutLogData(Slice());
@ -241,7 +264,7 @@ Status WritePreparedTxn::RollbackInternal() {
WriteBatchInternal::InsertNoop(&empty_batch); WriteBatchInternal::InsertNoop(&empty_batch);
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
no_log_ref, DISABLE_MEMTABLE, &seq_used, no_log_ref, DISABLE_MEMTABLE, &seq_used,
&update_commit_map); &update_commit_map_with_prepare);
assert(seq_used != kMaxSequenceNumber); assert(seq_used != kMaxSequenceNumber);
// Mark the txn as rolled back // Mark the txn as rolled back
uint64_t& rollback_seq = seq_used; uint64_t& rollback_seq = seq_used;

@ -382,35 +382,44 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
public: public:
// includes_data indicates that the commit also writes non-empty // includes_data indicates that the commit also writes non-empty
// CommitTimeWriteBatch to memtable, which needs to be committed separately. // CommitTimeWriteBatch to memtable, which needs to be committed separately.
WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db, WritePreparedCommitEntryPreReleaseCallback(
DBImpl* db_impl, WritePreparedTxnDB* db, DBImpl* db_impl,
SequenceNumber prep_seq, SequenceNumber prep_seq = kMaxSequenceNumber, bool includes_data = false)
bool includes_data = false)
: db_(db), : db_(db),
db_impl_(db_impl), db_impl_(db_impl),
prep_seq_(prep_seq), prep_seq_(prep_seq),
includes_data_(includes_data) {} includes_data_(includes_data) {}
virtual Status Callback(SequenceNumber commit_seq) { virtual Status Callback(SequenceNumber commit_seq) {
db_->AddCommitted(prep_seq_, commit_seq); assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
if (prep_seq_ != kMaxSequenceNumber) {
db_->AddCommitted(prep_seq_, commit_seq);
} // else there was no prepare phase
if (includes_data_) { if (includes_data_) {
// Commit the data that is accompnaied with the commit marker // Commit the data that is accompnaied with the commit request
// TODO(myabandeh): skip AddPrepared // TODO(myabandeh): skip AddPrepared
db_->AddPrepared(commit_seq); db_->AddPrepared(commit_seq);
db_->AddCommitted(commit_seq, commit_seq); db_->AddCommitted(commit_seq, commit_seq);
} }
// Publish the sequence number. We can do that here assuming the callback is if (db_impl_->immutable_db_options().two_write_queues) {
// invoked only from one write queue, which would guarantee that the publish // Publish the sequence number. We can do that here assuming the callback
// sequence numbers will be in order, i.e., once a seq is published all the // is invoked only from one write queue, which would guarantee that the
// seq prior to that are also publishable. // publish sequence numbers will be in order, i.e., once a seq is
db_impl_->SetLastPublishedSequence(commit_seq); // published all the seq prior to that are also publishable.
db_impl_->SetLastPublishedSequence(commit_seq);
}
// else SequenceNumber that is updated as part of the write already does the
// publishing
return Status::OK(); return Status::OK();
} }
private: private:
WritePreparedTxnDB* db_; WritePreparedTxnDB* db_;
DBImpl* db_impl_; DBImpl* db_impl_;
// kMaxSequenceNumber if there was no prepare phase
SequenceNumber prep_seq_; SequenceNumber prep_seq_;
// Either because it is commit without prepare or it has a
// CommitTimeWriteBatch
bool includes_data_; bool includes_data_;
}; };

Loading…
Cancel
Save