WriteUnPrepared: Pass in correct subbatch count during rollback (#6463)

Summary:
Today `WriteUnpreparedTxn::RollbackInternal` will write the rollback batch assuming that there is only a single subbatch. However, because untracked_keys_ are currently not deduplicated, it's possible for duplicate keys to exist, and thus split the batch. Also, tracked_keys_ also does not support compators outside of the bytewise comparators, so it's possible for duplicates to occur there as well.

To solve this, just pass in the correct subbatch count.

Also, removed `WriteUnpreparedRollbackPreReleaseCallback` to unify the Commit/Rollback codepaths some more.

Also, fixed a bug in `CommitInternal` where if 1. two_write_queue is true and 2. include_data is true, then `WriteUnpreparedCommitEntryPreReleaseCallback` ends up calling `AddCommitted` on the commit time write batch a second time on the second write. To fix, `WriteUnpreparedCommitEntryPreReleaseCallback` is re-initialized.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6463

Differential Revision: D20150153

Pulled By: lth

fbshipit-source-id: df0b42d39406c75af73df995aa1138f0db539cd1
main
Manuel Ung 5 years ago committed by Facebook Github Bot
parent 72ee067b90
commit 41535d0218
  1. 41
      utilities/transactions/write_unprepared_txn.cc
  2. 2
      utilities/transactions/write_unprepared_txn.h
  3. 40
      utilities/transactions/write_unprepared_txn_db.h

@ -607,6 +607,8 @@ Status WriteUnpreparedTxn::CommitInternal() {
// commit write batch as just another "unprepared" batch. This will also
// update the unprep_seqs_ in the update_commit_map callback.
unprep_seqs_[commit_batch_seq] = commit_batch_cnt;
WriteUnpreparedCommitEntryPreReleaseCallback
update_commit_map_with_commit_batch(wpt_db_, db_impl_, unprep_seqs_, 0);
// Note: the 2nd write comes with a performance penality. So if we have too
// many of commits accompanied with ComitTimeWriteBatch and yet we cannot
@ -623,7 +625,7 @@ Status WriteUnpreparedTxn::CommitInternal() {
const uint64_t NO_REF_LOG = 0;
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map);
&update_commit_map_with_commit_batch);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Note RemovePrepared should be called after WriteImpl that publishsed the
// seq. Otherwise SmallestUnCommittedSeq optimization breaks.
@ -704,6 +706,9 @@ Status WriteUnpreparedTxn::RollbackInternal() {
// need to read our own writes when reading prior versions of the key for
// rollback.
WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
// TODO(lth): We write rollback batch all in a single batch here, but this
// should be subdivded into multiple batches as well. In phase 2, when key
// sets are read from WAL, this will happen naturally.
WriteRollbackKeys(GetTrackedKeys(), &rollback_batch, &callback, roptions);
// The Rollback marker will be used as a batch separator
@ -712,26 +717,29 @@ Status WriteUnpreparedTxn::RollbackInternal() {
const bool DISABLE_MEMTABLE = true;
const uint64_t NO_REF_LOG = 0;
uint64_t seq_used = kMaxSequenceNumber;
// TODO(lth): We write rollback batch all in a single batch here, but this
// should be subdivded into multiple batches as well. In phase 2, when key
// sets are read from WAL, this will happen naturally.
const size_t ONE_BATCH = 1;
// We commit the rolled back prepared batches. ALthough this is
// Rollback batch may contain duplicate keys, because tracked_keys_ is not
// comparator aware.
auto rollback_batch_cnt = rollback_batch.SubBatchCnt();
// We commit the rolled back prepared batches. Although this is
// counter-intuitive, i) it is safe to do so, since the prepared batches are
// already canceled out by the rollback batch, ii) adding the commit entry to
// CommitCache will allow us to benefit from the existing mechanism in
// CommitCache that keeps an entry evicted due to max advance and yet overlaps
// with a live snapshot around so that the live snapshot properly skips the
// entry even if its prepare seq is lower than max_evicted_seq_.
//
// TODO(lth): RollbackInternal is conceptually very similar to
// CommitInternal, with the rollback batch simply taking on the role of
// CommitTimeWriteBatch. We should be able to merge the two code paths.
WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, unprep_seqs_, ONE_BATCH);
wpt_db_, db_impl_, unprep_seqs_, rollback_batch_cnt);
// Note: the rollback batch does not need AddPrepared since it is written to
// DB in one shot. min_uncommitted still works since it requires capturing
// data that is written to DB but not yet committed, while the roolback
// data that is written to DB but not yet committed, while the rollback
// batch commits with PreReleaseCallback.
s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE,
&seq_used, rollback_batch.SubBatchCnt(),
&seq_used, rollback_batch_cnt,
do_one_write ? &update_commit_map : nullptr);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (!s.ok()) {
@ -746,21 +754,26 @@ Status WriteUnpreparedTxn::RollbackInternal() {
unflushed_save_points_.reset(nullptr);
return s;
} // else do the 2nd write for commit
uint64_t& prepare_seq = seq_used;
// Populate unprep_seqs_ with rollback_batch_cnt, since we treat data in the
// rollback write batch as just another "unprepared" batch. This will also
// update the unprep_seqs_ in the update_commit_map callback.
unprep_seqs_[prepare_seq] = rollback_batch_cnt;
WriteUnpreparedCommitEntryPreReleaseCallback
update_commit_map_with_rollback_batch(wpt_db_, db_impl_, unprep_seqs_, 0);
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"RollbackInternal 2nd write prepare_seq: %" PRIu64,
prepare_seq);
// Commit the batch by writing an empty batch to the queue that will release
// the commit sequence number to readers.
WriteUnpreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
wpt_db_, db_impl_, unprep_seqs_, prepare_seq);
WriteBatch empty_batch;
const size_t ONE_BATCH = 1;
empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&empty_batch);
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_prepare);
&update_commit_map_with_rollback_batch);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Mark the txn as rolled back
if (s.ok()) {

@ -262,7 +262,6 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
// value when calling RollbackToSavepoint.
SequenceNumber largest_validated_seq_;
using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
struct SavePoint {
// Record of unprep_seqs_ at this savepoint. The set of unprep_seq is
// used during RollbackToSavepoint to determine visibility when restoring
@ -333,6 +332,7 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
// last savepoint. Also, it may make sense to merge this into tracked_keys_
// and differentiate between tracked but not locked keys to avoid having two
// very similar data structures.
using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
KeySet untracked_keys_;
};

@ -104,45 +104,5 @@ class WriteUnpreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
bool publish_seq_;
};
class WriteUnpreparedRollbackPreReleaseCallback : public PreReleaseCallback {
// TODO(lth): Reduce code duplication with
// WritePreparedCommitEntryPreReleaseCallback
public:
WriteUnpreparedRollbackPreReleaseCallback(
WritePreparedTxnDB* db, DBImpl* db_impl,
const std::map<SequenceNumber, size_t>& unprep_seqs,
SequenceNumber rollback_seq)
: db_(db),
db_impl_(db_impl),
unprep_seqs_(unprep_seqs),
rollback_seq_(rollback_seq) {
assert(unprep_seqs.size() > 0);
assert(db_impl_->immutable_db_options().two_write_queues);
}
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled __attribute__((__unused__)),
uint64_t, size_t /*index*/,
size_t /*total*/) override {
assert(is_mem_disabled); // implies the 2nd queue
const uint64_t last_commit_seq = commit_seq;
db_->AddCommitted(rollback_seq_, last_commit_seq);
// Recall that unprep_seqs maps (un)prepared_seq => prepare_batch_cnt.
for (const auto& s : unprep_seqs_) {
for (size_t i = 0; i < s.second; i++) {
db_->AddCommitted(s.first + i, last_commit_seq);
}
}
db_impl_->SetLastPublishedSequence(last_commit_seq);
return Status::OK();
}
private:
WritePreparedTxnDB* db_;
DBImpl* db_impl_;
const std::map<SequenceNumber, size_t>& unprep_seqs_;
SequenceNumber rollback_seq_;
};
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

Loading…
Cancel
Save