WriteUnPrepared: use tracked_keys_ to track keys needed for rollback (#5562)

Summary:
Currently, we are tracking keys we need to rollback via a separate structure specific to WriteUnprepared in write_set_keys_.

We already have a data structure called tracked_keys_ used to track which keys to unlock on transaction termination. This is exactly what we want, since we should only rollback keys that we have locked anyway.

Save some memory by reusing that data structure instead of making our own.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5562

Differential Revision: D16206484

Pulled By: lth

fbshipit-source-id: 5894d2b824a4b19062d84adbd6e6e86f00047488
main
Manuel Ung 5 years ago committed by Facebook Github Bot
parent 3bde41b5a3
commit 0acaa1a846
  1. 12
      utilities/transactions/transaction_base.h
  2. 117
      utilities/transactions/write_unprepared_txn.cc
  3. 22
      utilities/transactions/write_unprepared_txn.h
  4. 36
      utilities/transactions/write_unprepared_txn_db.cc
  5. 27
      utilities/transactions/write_unprepared_txn_db.h

@ -317,6 +317,12 @@ class TransactionBaseImpl : public Transaction {
// Records writes pending in this transaction // Records writes pending in this transaction
WriteBatchWithIndex write_batch_; WriteBatchWithIndex write_batch_;
// Map from column_family_id to map of keys that are involved in this
// transaction.
// For Pessimistic Transactions this is the list of locked keys.
// Optimistic Transactions will wait till commit time to do conflict checking.
TransactionKeyMap tracked_keys_;
private: private:
friend class WritePreparedTxn; friend class WritePreparedTxn;
// Extra data to be persisted with the commit. Note this is only used when // Extra data to be persisted with the commit. Note this is only used when
@ -327,12 +333,6 @@ class TransactionBaseImpl : public Transaction {
// nullptr if there was no snapshot at the time SetSavePoint() was called. // nullptr if there was no snapshot at the time SetSavePoint() was called.
std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint, autovector<TransactionBaseImpl::SavePoint>>> save_points_; std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint, autovector<TransactionBaseImpl::SavePoint>>> save_points_;
// Map from column_family_id to map of keys that are involved in this
// transaction.
// For Pessimistic Transactions this is the list of locked keys.
// Optimistic Transactions will wait till commit time to do conflict checking.
TransactionKeyMap tracked_keys_;
// If true, future Put/Merge/Deletes will be indexed in the // If true, future Put/Merge/Deletes will be indexed in the
// WriteBatchWithIndex. // WriteBatchWithIndex.
// If false, future Put/Merge/Deletes will be inserted directly into the // If false, future Put/Merge/Deletes will be inserted directly into the

@ -42,7 +42,9 @@ SequenceNumber WriteUnpreparedTxnReadCallback::CalcMaxUnpreparedSequenceNumber(
WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db, WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
const WriteOptions& write_options, const WriteOptions& write_options,
const TransactionOptions& txn_options) const TransactionOptions& txn_options)
: WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db) { : WritePreparedTxn(txn_db, write_options, txn_options),
wupt_db_(txn_db),
recovered_txn_(false) {
max_write_batch_size_ = txn_options.max_write_batch_size; max_write_batch_size_ = txn_options.max_write_batch_size;
// We set max bytes to zero so that we don't get a memory limit error. // We set max bytes to zero so that we don't get a memory limit error.
// Instead of trying to keep write batch strictly under the size limit, we // Instead of trying to keep write batch strictly under the size limit, we
@ -69,6 +71,12 @@ WriteUnpreparedTxn::~WriteUnpreparedTxn() {
log_number_); log_number_);
} }
} }
// Call tracked_keys_.clear() so that ~PessimisticTransaction does not
// try to unlock keys for recovered transactions.
if (recovered_txn_) {
tracked_keys_.clear();
}
} }
void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) { void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
@ -76,7 +84,7 @@ void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
max_write_batch_size_ = txn_options.max_write_batch_size; max_write_batch_size_ = txn_options.max_write_batch_size;
write_batch_.SetMaxBytes(0); write_batch_.SetMaxBytes(0);
unprep_seqs_.clear(); unprep_seqs_.clear();
write_set_keys_.clear(); recovered_txn_ = false;
} }
Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family, Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
@ -148,6 +156,72 @@ Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked); return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
} }
// WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. For
// WriteUnprepared, the write batches have already been written into the
// database during WAL replay, so all we have to do is just to "retrack" the key
// so that rollbacks are possible.
//
// Calling TryLock instead of TrackKey is also possible, but as an optimization,
// recovered transactions do not hold locks on their keys. This follows the
// implementation in PessimisticTransactionDB::Initialize where we set
// skip_concurrency_control to true.
Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) {
struct TrackKeyHandler : public WriteBatch::Handler {
WriteUnpreparedTxn* txn_;
bool rollback_merge_operands_;
TrackKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
: txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}
Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
false /* read_only */, true /* exclusive */);
return Status::OK();
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
false /* read_only */, true /* exclusive */);
return Status::OK();
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
false /* read_only */, true /* exclusive */);
return Status::OK();
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
if (rollback_merge_operands_) {
txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
false /* read_only */, true /* exclusive */);
}
return Status::OK();
}
// Recovered batches do not contain 2PC markers.
Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
Status MarkEndPrepare(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkNoop(bool) override { return Status::InvalidArgument(); }
Status MarkCommit(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkRollback(const Slice&) override {
return Status::InvalidArgument();
}
};
TrackKeyHandler handler(this,
wupt_db_->txn_db_options_.rollback_merge_operands);
return wb->Iterate(&handler);
}
Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() { Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
const bool kPrepared = true; const bool kPrepared = true;
Status s; Status s;
@ -159,25 +233,11 @@ Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
return s; return s;
} }
void WriteUnpreparedTxn::UpdateWriteKeySet(uint32_t cfid, const Slice& key) {
// TODO(lth): write_set_keys_ can just be a std::string instead of a vector.
write_set_keys_[cfid].push_back(key.ToString());
}
Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) { Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
if (name_.empty()) { if (name_.empty()) {
return Status::InvalidArgument("Cannot write to DB without SetName."); return Status::InvalidArgument("Cannot write to DB without SetName.");
} }
// Update write_key_set_ for rollback purposes.
KeySetBuilder keyset_handler(
this, wupt_db_->txn_db_options_.rollback_merge_operands);
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&keyset_handler);
assert(s.ok());
if (!s.ok()) {
return s;
}
// TODO(lth): Reduce duplicate code with WritePrepared prepare logic. // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
WriteOptions write_options = write_options_; WriteOptions write_options = write_options_;
write_options.disableWAL = false; write_options.disableWAL = false;
@ -204,10 +264,10 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
// WriteImpl should not overwrite that value, so set log_used to nullptr if // WriteImpl should not overwrite that value, so set log_used to nullptr if
// log_number_ is already set. // log_number_ is already set.
uint64_t* log_used = log_number_ ? nullptr : &log_number_; uint64_t* log_used = log_number_ ? nullptr : &log_number_;
s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), auto s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, log_used, /*log ref*/ /*callback*/ nullptr, log_used, /*log ref*/
0, !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_, 0, !DISABLE_MEMTABLE, &seq_used,
&add_prepared_callback); prepare_batch_cnt_, &add_prepared_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber); assert(!s.ok() || seq_used != kMaxSequenceNumber);
auto prepare_seq = seq_used; auto prepare_seq = seq_used;
@ -317,7 +377,6 @@ Status WriteUnpreparedTxn::CommitInternal() {
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt); wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
} }
unprep_seqs_.clear(); unprep_seqs_.clear();
write_set_keys_.clear();
return s; return s;
} // else do the 2nd write to publish seq } // else do the 2nd write to publish seq
@ -349,7 +408,6 @@ Status WriteUnpreparedTxn::CommitInternal() {
wpt_db_->RemovePrepared(seq.first, seq.second); wpt_db_->RemovePrepared(seq.first, seq.second);
} }
unprep_seqs_.clear(); unprep_seqs_.clear();
write_set_keys_.clear();
return s; return s;
} }
@ -359,19 +417,21 @@ Status WriteUnpreparedTxn::RollbackInternal() {
wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0); wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0);
assert(GetId() != kMaxSequenceNumber); assert(GetId() != kMaxSequenceNumber);
assert(GetId() > 0); assert(GetId() > 0);
Status s;
const auto& cf_map = *wupt_db_->GetCFHandleMap(); const auto& cf_map = *wupt_db_->GetCFHandleMap();
auto read_at_seq = kMaxSequenceNumber; auto read_at_seq = kMaxSequenceNumber;
Status s;
ReadOptions roptions; ReadOptions roptions;
// Note that we do not use WriteUnpreparedTxnReadCallback because we do not // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
// need to read our own writes when reading prior versions of the key for // need to read our own writes when reading prior versions of the key for
// rollback. // rollback.
const auto& tracked_keys = GetTrackedKeys();
WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq); WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
for (const auto& cfkey : write_set_keys_) { for (const auto& cfkey : tracked_keys) {
const auto cfid = cfkey.first; const auto cfid = cfkey.first;
const auto& keys = cfkey.second; const auto& keys = cfkey.second;
for (const auto& key : keys) { for (const auto& pair : keys) {
const auto& key = pair.first;
const auto& cf_handle = cf_map.at(cfid); const auto& cf_handle = cf_map.at(cfid);
PinnableSlice pinnable_val; PinnableSlice pinnable_val;
bool not_used; bool not_used;
@ -426,7 +486,6 @@ Status WriteUnpreparedTxn::RollbackInternal() {
wpt_db_->RemovePrepared(seq.first, seq.second); wpt_db_->RemovePrepared(seq.first, seq.second);
} }
unprep_seqs_.clear(); unprep_seqs_.clear();
write_set_keys_.clear();
return s; return s;
} // else do the 2nd write for commit } // else do the 2nd write for commit
uint64_t& prepare_seq = seq_used; uint64_t& prepare_seq = seq_used;
@ -453,10 +512,16 @@ Status WriteUnpreparedTxn::RollbackInternal() {
} }
unprep_seqs_.clear(); unprep_seqs_.clear();
write_set_keys_.clear();
return s; return s;
} }
void WriteUnpreparedTxn::Clear() {
if (!recovered_txn_) {
txn_db_impl_->UnLock(this, &GetTrackedKeys());
}
TransactionBaseImpl::Clear();
}
Status WriteUnpreparedTxn::Get(const ReadOptions& options, Status WriteUnpreparedTxn::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) { const Slice& key, PinnableSlice* value) {

@ -94,20 +94,10 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
const SliceParts& key, const SliceParts& key,
const bool assume_tracked = false) override; const bool assume_tracked = false) override;
virtual Status RebuildFromWriteBatch(WriteBatch*) override { virtual Status RebuildFromWriteBatch(WriteBatch*) override;
// This function was only useful for recovering prepared transactions, but
// is unused for write prepared because a transaction may consist of
// multiple write batches.
//
// If there are use cases outside of recovery that can make use of this,
// then support could be added.
return Status::NotSupported("Not supported for WriteUnprepared");
}
const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers(); const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
void UpdateWriteKeySet(uint32_t cfid, const Slice& key);
protected: protected:
void Initialize(const TransactionOptions& txn_options) override; void Initialize(const TransactionOptions& txn_options) override;
@ -118,6 +108,8 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
Status RollbackInternal() override; Status RollbackInternal() override;
void Clear() override;
// Get and GetIterator needs to be overridden so that a ReadCallback to // Get and GetIterator needs to be overridden so that a ReadCallback to
// handle read-your-own-write is used. // handle read-your-own-write is used.
using Transaction::Get; using Transaction::Get;
@ -157,10 +149,10 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
// commit callbacks. // commit callbacks.
std::map<SequenceNumber, size_t> unprep_seqs_; std::map<SequenceNumber, size_t> unprep_seqs_;
// Set of keys that have written to that have already been written to DB // Recovered transactions have tracked_keys_ populated, but are not actually
// (ie. not in write_batch_). // locked for efficiency reasons. For recovered transactions, skip unlocking
// // keys when transaction ends.
std::map<uint32_t, std::vector<std::string>> write_set_keys_; bool recovered_txn_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -252,12 +252,13 @@ Status WriteUnpreparedTxnDB::Initialize(
assert(real_trx); assert(real_trx);
auto wupt = auto wupt =
static_cast_with_check<WriteUnpreparedTxn, Transaction>(real_trx); static_cast_with_check<WriteUnpreparedTxn, Transaction>(real_trx);
wupt->recovered_txn_ = true;
real_trx->SetLogNumber(first_log_number); real_trx->SetLogNumber(first_log_number);
real_trx->SetId(first_seq); real_trx->SetId(first_seq);
Status s = real_trx->SetName(recovered_trx->name_); Status s = real_trx->SetName(recovered_trx->name_);
if (!s.ok()) { if (!s.ok()) {
break; return s;
} }
wupt->prepare_batch_cnt_ = last_prepare_batch_cnt; wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;
@ -270,12 +271,11 @@ Status WriteUnpreparedTxnDB::Initialize(
ordered_seq_cnt[seq] = cnt; ordered_seq_cnt[seq] = cnt;
assert(wupt->unprep_seqs_.count(seq) == 0); assert(wupt->unprep_seqs_.count(seq) == 0);
wupt->unprep_seqs_[seq] = cnt; wupt->unprep_seqs_[seq] = cnt;
KeySetBuilder keyset_handler(wupt,
txn_db_options_.rollback_merge_operands); s = wupt->RebuildFromWriteBatch(batch_info.batch_);
s = batch_info.batch_->Iterate(&keyset_handler);
assert(s.ok()); assert(s.ok());
if (!s.ok()) { if (!s.ok()) {
break; return s;
} }
} }
@ -284,7 +284,7 @@ Status WriteUnpreparedTxnDB::Initialize(
real_trx->SetState(Transaction::PREPARED); real_trx->SetState(Transaction::PREPARED);
if (!s.ok()) { if (!s.ok()) {
break; return s;
} }
} }
// AddPrepared must be called in order // AddPrepared must be called in order
@ -397,29 +397,5 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
return db_iter; return db_iter;
} }
Status KeySetBuilder::PutCF(uint32_t cf, const Slice& key,
const Slice& /*val*/) {
txn_->UpdateWriteKeySet(cf, key);
return Status::OK();
}
Status KeySetBuilder::DeleteCF(uint32_t cf, const Slice& key) {
txn_->UpdateWriteKeySet(cf, key);
return Status::OK();
}
Status KeySetBuilder::SingleDeleteCF(uint32_t cf, const Slice& key) {
txn_->UpdateWriteKeySet(cf, key);
return Status::OK();
}
Status KeySetBuilder::MergeCF(uint32_t cf, const Slice& key,
const Slice& /*val*/) {
if (rollback_merge_operands_) {
txn_->UpdateWriteKeySet(cf, key);
}
return Status::OK();
}
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -144,32 +144,5 @@ class WriteUnpreparedRollbackPreReleaseCallback : public PreReleaseCallback {
SequenceNumber rollback_seq_; SequenceNumber rollback_seq_;
}; };
struct KeySetBuilder : public WriteBatch::Handler {
WriteUnpreparedTxn* txn_;
bool rollback_merge_operands_;
KeySetBuilder(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
: txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}
Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override;
Status DeleteCF(uint32_t cf, const Slice& key) override;
Status SingleDeleteCF(uint32_t cf, const Slice& key) override;
Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override;
// Recovered batches do not contain 2PC markers.
Status MarkNoop(bool) override { return Status::InvalidArgument(); }
Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
Status MarkEndPrepare(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkCommit(const Slice&) override { return Status::InvalidArgument(); }
Status MarkRollback(const Slice&) override {
return Status::InvalidArgument();
}
};
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

Loading…
Cancel
Save