From 0acaa1a8464f35d0f4cf83a1bafbad662bfe0c99 Mon Sep 17 00:00:00 2001 From: Manuel Ung Date: Tue, 16 Jul 2019 15:19:45 -0700 Subject: [PATCH] WriteUnPrepared: use tracked_keys_ to track keys needed for rollback (#5562) Summary: Currently, we are tracking keys we need to rollback via a separate structure specific to WriteUnprepared in write_set_keys_. We already have a data structure called tracked_keys_ used to track which keys to unlock on transaction termination. This is exactly what we want, since we should only rollback keys that we have locked anyway. Save some memory by reusing that data structure instead of making our own. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5562 Differential Revision: D16206484 Pulled By: lth fbshipit-source-id: 5894d2b824a4b19062d84adbd6e6e86f00047488 --- utilities/transactions/transaction_base.h | 12 +- .../transactions/write_unprepared_txn.cc | 119 ++++++++++++++---- utilities/transactions/write_unprepared_txn.h | 22 ++-- .../transactions/write_unprepared_txn_db.cc | 36 +----- .../transactions/write_unprepared_txn_db.h | 27 ---- 5 files changed, 111 insertions(+), 105 deletions(-) diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 04274866a..26efd51b3 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -317,6 +317,12 @@ class TransactionBaseImpl : public Transaction { // Records writes pending in this transaction WriteBatchWithIndex write_batch_; + // Map from column_family_id to map of keys that are involved in this + // transaction. + // For Pessimistic Transactions this is the list of locked keys. + // Optimistic Transactions will wait till commit time to do conflict checking. + TransactionKeyMap tracked_keys_; + private: friend class WritePreparedTxn; // Extra data to be persisted with the commit. 
Note this is only used when @@ -327,12 +333,6 @@ class TransactionBaseImpl : public Transaction { // nullptr if there was no snapshot at the time SetSavePoint() was called. std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint, autovector<TransactionBaseImpl::SavePoint>>> save_points_; - // Map from column_family_id to map of keys that are involved in this - // transaction. - // For Pessimistic Transactions this is the list of locked keys. - // Optimistic Transactions will wait till commit time to do conflict checking. - TransactionKeyMap tracked_keys_; - // If true, future Put/Merge/Deletes will be indexed in the // WriteBatchWithIndex. // If false, future Put/Merge/Deletes will be inserted directly into the diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 54d478c94..d127220e4 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -42,7 +42,9 @@ SequenceNumber WriteUnpreparedTxnReadCallback::CalcMaxUnpreparedSequenceNumber( WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) - : WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db) { + : WritePreparedTxn(txn_db, write_options, txn_options), + wupt_db_(txn_db), + recovered_txn_(false) { max_write_batch_size_ = txn_options.max_write_batch_size; // We set max bytes to zero so that we don't get a memory limit error. // Instead of trying to keep write batch strictly under the size limit, we @@ -69,6 +71,12 @@ WriteUnpreparedTxn::~WriteUnpreparedTxn() { log_number_); } } + + // Call tracked_keys_.clear() so that ~PessimisticTransaction does not + // try to unlock keys for recovered transactions.
+ if (recovered_txn_) { + tracked_keys_.clear(); + } } void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) { @@ -76,7 +84,7 @@ void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) { max_write_batch_size_ = txn_options.max_write_batch_size; write_batch_.SetMaxBytes(0); unprep_seqs_.clear(); - write_set_keys_.clear(); + recovered_txn_ = false; } Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family, @@ -148,6 +156,72 @@ Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family, return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked); } +// WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. For +// WriteUnprepared, the write batches have already been written into the +// database during WAL replay, so all we have to do is just to "retrack" the key +// so that rollbacks are possible. +// +// Calling TryLock instead of TrackKey is also possible, but as an optimization, +// recovered transactions do not hold locks on their keys. This follows the +// implementation in PessimisticTransactionDB::Initialize where we set +// skip_concurrency_control to true. 
+Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) { + struct TrackKeyHandler : public WriteBatch::Handler { + WriteUnpreparedTxn* txn_; + bool rollback_merge_operands_; + + TrackKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands) + : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {} + + Status PutCF(uint32_t cf, const Slice& key, const Slice&) override { + txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber, + false /* read_only */, true /* exclusive */); + return Status::OK(); + } + + Status DeleteCF(uint32_t cf, const Slice& key) override { + txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber, + false /* read_only */, true /* exclusive */); + return Status::OK(); + } + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override { + txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber, + false /* read_only */, true /* exclusive */); + return Status::OK(); + } + + Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override { + if (rollback_merge_operands_) { + txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber, + false /* read_only */, true /* exclusive */); + } + return Status::OK(); + } + + // Recovered batches do not contain 2PC markers. 
+ Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); } + + Status MarkEndPrepare(const Slice&) override { + return Status::InvalidArgument(); + } + + Status MarkNoop(bool) override { return Status::InvalidArgument(); } + + Status MarkCommit(const Slice&) override { + return Status::InvalidArgument(); + } + + Status MarkRollback(const Slice&) override { + return Status::InvalidArgument(); + } + }; + + TrackKeyHandler handler(this, + wupt_db_->txn_db_options_.rollback_merge_operands); + return wb->Iterate(&handler); +} + Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() { const bool kPrepared = true; Status s; @@ -159,25 +233,11 @@ Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() { return s; } -void WriteUnpreparedTxn::UpdateWriteKeySet(uint32_t cfid, const Slice& key) { - // TODO(lth): write_set_keys_ can just be a std::string instead of a vector. - write_set_keys_[cfid].push_back(key.ToString()); -} - Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) { if (name_.empty()) { return Status::InvalidArgument("Cannot write to DB without SetName."); } - // Update write_key_set_ for rollback purposes. - KeySetBuilder keyset_handler( - this, wupt_db_->txn_db_options_.rollback_merge_operands); - auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&keyset_handler); - assert(s.ok()); - if (!s.ok()) { - return s; - } - // TODO(lth): Reduce duplicate code with WritePrepared prepare logic. WriteOptions write_options = write_options_; write_options.disableWAL = false; @@ -204,10 +264,10 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) { // WriteImpl should not overwrite that value, so set log_used to nullptr if // log_number_ is already set. uint64_t* log_used = log_number_ ? 
nullptr : &log_number_; - s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), - /*callback*/ nullptr, log_used, /*log ref*/ - 0, !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_, - &add_prepared_callback); + auto s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), + /*callback*/ nullptr, log_used, /*log ref*/ + 0, !DISABLE_MEMTABLE, &seq_used, + prepare_batch_cnt_, &add_prepared_callback); assert(!s.ok() || seq_used != kMaxSequenceNumber); auto prepare_seq = seq_used; @@ -317,7 +377,6 @@ Status WriteUnpreparedTxn::CommitInternal() { wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt); } unprep_seqs_.clear(); - write_set_keys_.clear(); return s; } // else do the 2nd write to publish seq @@ -349,7 +408,6 @@ Status WriteUnpreparedTxn::CommitInternal() { wpt_db_->RemovePrepared(seq.first, seq.second); } unprep_seqs_.clear(); - write_set_keys_.clear(); return s; } @@ -359,19 +417,21 @@ Status WriteUnpreparedTxn::RollbackInternal() { wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0); assert(GetId() != kMaxSequenceNumber); assert(GetId() > 0); + Status s; const auto& cf_map = *wupt_db_->GetCFHandleMap(); auto read_at_seq = kMaxSequenceNumber; - Status s; ReadOptions roptions; // Note that we do not use WriteUnpreparedTxnReadCallback because we do not // need to read our own writes when reading prior versions of the key for // rollback. 
+ const auto& tracked_keys = GetTrackedKeys(); WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq); - for (const auto& cfkey : write_set_keys_) { + for (const auto& cfkey : tracked_keys) { const auto cfid = cfkey.first; const auto& keys = cfkey.second; - for (const auto& key : keys) { + for (const auto& pair : keys) { + const auto& key = pair.first; const auto& cf_handle = cf_map.at(cfid); PinnableSlice pinnable_val; bool not_used; @@ -426,7 +486,6 @@ Status WriteUnpreparedTxn::RollbackInternal() { wpt_db_->RemovePrepared(seq.first, seq.second); } unprep_seqs_.clear(); - write_set_keys_.clear(); return s; } // else do the 2nd write for commit uint64_t& prepare_seq = seq_used; @@ -453,10 +512,16 @@ Status WriteUnpreparedTxn::RollbackInternal() { } unprep_seqs_.clear(); - write_set_keys_.clear(); return s; } +void WriteUnpreparedTxn::Clear() { + if (!recovered_txn_) { + txn_db_impl_->UnLock(this, &GetTrackedKeys()); + } + TransactionBaseImpl::Clear(); +} + Status WriteUnpreparedTxn::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index 751d36c23..15a76d134 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -94,20 +94,10 @@ class WriteUnpreparedTxn : public WritePreparedTxn { const SliceParts& key, const bool assume_tracked = false) override; - virtual Status RebuildFromWriteBatch(WriteBatch*) override { - // This function was only useful for recovering prepared transactions, but - // is unused for write prepared because a transaction may consist of - // multiple write batches. - // - // If there are use cases outside of recovery that can make use of this, - // then support could be added. 
- return Status::NotSupported("Not supported for WriteUnprepared"); - } + virtual Status RebuildFromWriteBatch(WriteBatch*) override; const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers(); - void UpdateWriteKeySet(uint32_t cfid, const Slice& key); - protected: void Initialize(const TransactionOptions& txn_options) override; @@ -118,6 +108,8 @@ class WriteUnpreparedTxn : public WritePreparedTxn { Status RollbackInternal() override; + void Clear() override; + // Get and GetIterator needs to be overridden so that a ReadCallback to // handle read-your-own-write is used. using Transaction::Get; @@ -157,10 +149,10 @@ class WriteUnpreparedTxn : public WritePreparedTxn { // commit callbacks. std::map<SequenceNumber, size_t> unprep_seqs_; - // Set of keys that have written to that have already been written to DB - // (ie. not in write_batch_). - // - std::map<uint32_t, std::vector<std::string>> write_set_keys_; + // Recovered transactions have tracked_keys_ populated, but are not actually + // locked for efficiency reasons. For recovered transactions, skip unlocking + // keys when transaction ends.
+ bool recovered_txn_; }; } // namespace rocksdb diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 9382edfad..c4be058bb 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -252,12 +252,13 @@ Status WriteUnpreparedTxnDB::Initialize( assert(real_trx); auto wupt = static_cast_with_check<WriteUnpreparedTxn, Transaction>(real_trx); + wupt->recovered_txn_ = true; real_trx->SetLogNumber(first_log_number); real_trx->SetId(first_seq); Status s = real_trx->SetName(recovered_trx->name_); if (!s.ok()) { - break; + return s; } wupt->prepare_batch_cnt_ = last_prepare_batch_cnt; @@ -270,12 +271,11 @@ Status WriteUnpreparedTxnDB::Initialize( ordered_seq_cnt[seq] = cnt; assert(wupt->unprep_seqs_.count(seq) == 0); wupt->unprep_seqs_[seq] = cnt; - KeySetBuilder keyset_handler(wupt, - txn_db_options_.rollback_merge_operands); - s = batch_info.batch_->Iterate(&keyset_handler); + + s = wupt->RebuildFromWriteBatch(batch_info.batch_); assert(s.ok()); if (!s.ok()) { - break; + return s; } } @@ -284,7 +284,7 @@ Status WriteUnpreparedTxnDB::Initialize( real_trx->SetState(Transaction::PREPARED); if (!s.ok()) { - break; + return s; } } // AddPrepared must be called in order @@ -397,29 +397,5 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options, return db_iter; } -Status KeySetBuilder::PutCF(uint32_t cf, const Slice& key, - const Slice& /*val*/) { - txn_->UpdateWriteKeySet(cf, key); - return Status::OK(); -} - -Status KeySetBuilder::DeleteCF(uint32_t cf, const Slice& key) { - txn_->UpdateWriteKeySet(cf, key); - return Status::OK(); -} - -Status KeySetBuilder::SingleDeleteCF(uint32_t cf, const Slice& key) { - txn_->UpdateWriteKeySet(cf, key); - return Status::OK(); -} - -Status KeySetBuilder::MergeCF(uint32_t cf, const Slice& key, - const Slice& /*val*/) { - if (rollback_merge_operands_) { - txn_->UpdateWriteKeySet(cf, key); - } - return Status::OK(); -} - } //
namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/write_unprepared_txn_db.h b/utilities/transactions/write_unprepared_txn_db.h index 6405ba683..65cb4b919 100644 --- a/utilities/transactions/write_unprepared_txn_db.h +++ b/utilities/transactions/write_unprepared_txn_db.h @@ -144,32 +144,5 @@ class WriteUnpreparedRollbackPreReleaseCallback : public PreReleaseCallback { SequenceNumber rollback_seq_; }; -struct KeySetBuilder : public WriteBatch::Handler { - WriteUnpreparedTxn* txn_; - bool rollback_merge_operands_; - - KeySetBuilder(WriteUnpreparedTxn* txn, bool rollback_merge_operands) - : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {} - - Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override; - - Status DeleteCF(uint32_t cf, const Slice& key) override; - - Status SingleDeleteCF(uint32_t cf, const Slice& key) override; - - Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override; - - // Recovered batches do not contain 2PC markers. - Status MarkNoop(bool) override { return Status::InvalidArgument(); } - Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); } - Status MarkEndPrepare(const Slice&) override { - return Status::InvalidArgument(); - } - Status MarkCommit(const Slice&) override { return Status::InvalidArgument(); } - Status MarkRollback(const Slice&) override { - return Status::InvalidArgument(); - } -}; - } // namespace rocksdb #endif // ROCKSDB_LITE