WriteUnprepared: commit only from the 2nd queue (#5439)

Summary:
This is a port of this PR into WriteUnprepared:
https://github.com/facebook/rocksdb/pull/5014

This also reverts this test change to restore some flaky write unprepared
tests: https://github.com/facebook/rocksdb/pull/5315

Tested with:
$ gtest-parallel ./transaction_test --gtest_filter=MySQLStyleTransactionTest/MySQLStyleTransactionTest.TransactionStressTest/9 --repeat=128
[128/128] MySQLStyleTransactionTest/MySQLStyleTransactionTest.TransactionStressTest/9 (18250 ms)
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5439

Differential Revision: D15761405

Pulled By: lth

fbshipit-source-id: ae2581fd942d8a5b3f9278fd6bc3c1ac0b2c964c
main
Manuel Ung 6 years ago committed by Facebook Github Bot
parent ba64a4cf52
commit ca1aee2a19
  1. 4
      utilities/transactions/transaction_test.cc
  2. 56
      utilities/transactions/write_unprepared_txn.cc

@ -74,6 +74,10 @@ INSTANTIATE_TEST_CASE_P(
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, true), std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, true),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, false), std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, false),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, true), std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, true),
std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, false),
std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, true),
std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, false),
std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, true),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, false), std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, false),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, true))); std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, true)));
#endif // ROCKSDB_VALGRIND_RUN #endif // ROCKSDB_VALGRIND_RUN

@ -281,23 +281,30 @@ Status WriteUnpreparedTxn::CommitInternal() {
const bool disable_memtable = !includes_data; const bool disable_memtable = !includes_data;
const bool do_one_write = const bool do_one_write =
!db_impl_->immutable_db_options().two_write_queues || disable_memtable; !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
const bool publish_seq = do_one_write;
// Note: CommitTimeWriteBatch does not need AddPrepared since it is written to
// DB in one shot. min_uncommitted still works since it requires capturing
// data that is written to DB but not yet committed, while
// CommitTimeWriteBatch commits with PreReleaseCallback.
WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map( WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt, publish_seq); wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt);
const bool kFirstPrepareBatch = true;
AddPreparedCallback add_prepared_callback(
wpt_db_, db_impl_, commit_batch_cnt,
db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
PreReleaseCallback* pre_release_callback;
if (do_one_write) {
pre_release_callback = &update_commit_map;
} else {
pre_release_callback = &add_prepared_callback;
}
uint64_t seq_used = kMaxSequenceNumber; uint64_t seq_used = kMaxSequenceNumber;
// Since the prepared batch is directly written to memtable, there is already // Since the prepared batch is directly written to memtable, there is
// a connection between the memtable and its WAL, so there is no need to // already a connection between the memtable and its WAL, so there is no
// redundantly reference the log that contains the prepared data. // need to redundantly reference the log that contains the prepared data.
const uint64_t zero_log_number = 0ull; const uint64_t zero_log_number = 0ull;
size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1; size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
zero_log_number, disable_memtable, &seq_used, zero_log_number, disable_memtable, &seq_used,
batch_cnt, &update_commit_map); batch_cnt, pre_release_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber); assert(!s.ok() || seq_used != kMaxSequenceNumber);
const SequenceNumber commit_batch_seq = seq_used;
if (LIKELY(do_one_write || !s.ok())) { if (LIKELY(do_one_write || !s.ok())) {
if (LIKELY(s.ok())) { if (LIKELY(s.ok())) {
// Note RemovePrepared should be called after WriteImpl that publishsed // Note RemovePrepared should be called after WriteImpl that publishsed
@ -306,30 +313,25 @@ Status WriteUnpreparedTxn::CommitInternal() {
wpt_db_->RemovePrepared(seq.first, seq.second); wpt_db_->RemovePrepared(seq.first, seq.second);
} }
} }
if (UNLIKELY(!do_one_write)) {
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
}
unprep_seqs_.clear(); unprep_seqs_.clear();
write_set_keys_.clear(); write_set_keys_.clear();
return s; return s;
} // else do the 2nd write to publish seq } // else do the 2nd write to publish seq
// Populate unprep_seqs_ with commit_batch_seq, since we treat data in the
// commit write batch as just another "unprepared" batch. This will also
// update the unprep_seqs_ in the update_commit_map callback.
unprep_seqs_[commit_batch_seq] = commit_batch_cnt;
// Note: the 2nd write comes with a performance penality. So if we have too // Note: the 2nd write comes with a performance penality. So if we have too
// many of commits accompanied with ComitTimeWriteBatch and yet we cannot // many of commits accompanied with ComitTimeWriteBatch and yet we cannot
// enable use_only_the_last_commit_time_batch_for_recovery_ optimization, // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
// two_write_queues should be disabled to avoid many additional writes here. // two_write_queues should be disabled to avoid many additional writes here.
class PublishSeqPreReleaseCallback : public PreReleaseCallback {
public: // Update commit map only from the 2nd queue
explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
: db_impl_(db_impl) {}
Status Callback(SequenceNumber seq,
bool is_mem_disabled __attribute__((__unused__)), uint64_t,
size_t /*index*/, size_t /*total*/) override {
assert(is_mem_disabled);
assert(db_impl_->immutable_db_options().two_write_queues);
db_impl_->SetLastPublishedSequence(seq);
return Status::OK();
}
private:
DBImpl* db_impl_;
} publish_seq_callback(db_impl_);
WriteBatch empty_batch; WriteBatch empty_batch;
empty_batch.PutLogData(Slice()); empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator // In the absence of Prepare markers, use Noop as a batch separator
@ -339,7 +341,7 @@ Status WriteUnpreparedTxn::CommitInternal() {
const uint64_t NO_REF_LOG = 0; const uint64_t NO_REF_LOG = 0;
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&publish_seq_callback); &update_commit_map);
assert(!s.ok() || seq_used != kMaxSequenceNumber); assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Note RemovePrepared should be called after WriteImpl that publishsed the // Note RemovePrepared should be called after WriteImpl that publishsed the
// seq. Otherwise SmallestUnCommittedSeq optimization breaks. // seq. Otherwise SmallestUnCommittedSeq optimization breaks.

Loading…
Cancel
Save