Set WriteCommitted txn id to commit sequence number (#4565)

Summary:
SetId and GetId are the experimental API that so far being used in WritePrepared and WriteUnPrepared transactions, where the id is assigned at the prepare time. The patch extends the API to WriteCommitted transactions, by setting the id at commit time.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4565

Differential Revision: D10557862

Pulled By: maysamyabandeh

fbshipit-source-id: 2b27a140682b6185a4988fa88f8152628e0d67af
main
Simon Grätzer 6 years ago committed by Facebook Github Bot
parent abb8ecb4cd
commit ad21b1af52
  1. 32
      utilities/transactions/pessimistic_transaction.cc
  2. 4
      utilities/transactions/pessimistic_transaction_db.cc

@ -232,7 +232,7 @@ Status WriteCommittedTxn::PrepareInternal() {
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_); WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
Status s = Status s =
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0, /*callback*/ nullptr, &log_number_, /*log_ref*/ 0,
/* disable_memtable*/ true); /* disable_memtable*/ true);
return s; return s;
} }
@ -322,12 +322,27 @@ Status PessimisticTransaction::Commit() {
} }
Status WriteCommittedTxn::CommitWithoutPrepareInternal() { Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
Status s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch()); uint64_t seq_used = kMaxSequenceNumber;
auto s =
db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, /*log_used*/ nullptr,
/*log_ref*/ 0, /*disable_memtable*/ false, &seq_used);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (s.ok()) {
SetId(seq_used);
}
return s; return s;
} }
Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) { Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {
Status s = db_->Write(write_options_, batch); uint64_t seq_used = kMaxSequenceNumber;
auto s = db_impl_->WriteImpl(write_options_, batch, /*callback*/ nullptr,
/*log_used*/ nullptr, /*log_ref*/ 0,
/*disable_memtable*/ false, &seq_used);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (s.ok()) {
SetId(seq_used);
}
return s; return s;
} }
@ -345,8 +360,15 @@ Status WriteCommittedTxn::CommitInternal() {
// in non recovery mode and simply insert the values // in non recovery mode and simply insert the values
WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch()); WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());
auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, uint64_t seq_used = kMaxSequenceNumber;
log_number_); auto s =
db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
/*log_used*/ nullptr, /*log_ref*/ log_number_,
/*disable_memtable*/ false, &seq_used);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (s.ok()) {
SetId(seq_used);
}
return s; return s;
} }

@ -146,7 +146,9 @@ Status PessimisticTransactionDB::Initialize(
assert(real_trx); assert(real_trx);
real_trx->SetLogNumber(batch_info.log_number_); real_trx->SetLogNumber(batch_info.log_number_);
assert(seq != kMaxSequenceNumber); assert(seq != kMaxSequenceNumber);
real_trx->SetId(seq); if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
real_trx->SetId(seq);
}
s = real_trx->SetName(recovered_trx->name_); s = real_trx->SetName(recovered_trx->name_);
if (!s.ok()) { if (!s.ok()) {

Loading…
Cancel
Save