diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 5621a7fa3..bf59a1c40 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -192,7 +192,48 @@ Status TransactionBaseImpl::PopSavePoint() { } assert(!save_points_->empty()); - save_points_->pop(); + // If there is another savepoint A below the current savepoint B, then A needs + // to inherit tracked_keys in B so that if we rollback to savepoint A, we + // remember to unlock keys in B. If there is no other savepoint below, then we + // can safely discard savepoint info. + if (save_points_->size() == 1) { + save_points_->pop(); + } else { + TransactionBaseImpl::SavePoint top; + std::swap(top, save_points_->top()); + save_points_->pop(); + + const TransactionKeyMap& curr_cf_key_map = top.new_keys_; + TransactionKeyMap& prev_cf_key_map = save_points_->top().new_keys_; + + for (const auto& curr_cf_key_iter : curr_cf_key_map) { + uint32_t column_family_id = curr_cf_key_iter.first; + const std::unordered_map& curr_keys = + curr_cf_key_iter.second; + + // If cfid was not previously tracked, just copy everything over. + auto prev_keys_iter = prev_cf_key_map.find(column_family_id); + if (prev_keys_iter == prev_cf_key_map.end()) { + prev_cf_key_map.emplace(curr_cf_key_iter); + } else { + std::unordered_map& prev_keys = + prev_keys_iter->second; + for (const auto& key_iter : curr_keys) { + const std::string& key = key_iter.first; + const TransactionKeyMapInfo& info = key_iter.second; + // If key was not previously tracked, just copy the whole struct over. + // Otherwise, some merging needs to occur. + auto prev_info = prev_keys.find(key); + if (prev_info == prev_keys.end()) { + prev_keys.emplace(key_iter); + } else { + prev_info->second.Merge(info); + } + } + } + } + } + return write_batch_.PopSavePoint(); } diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 26efd51b3..657e9c596 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -294,11 +294,11 @@ class TransactionBaseImpl : public Transaction { struct SavePoint { std::shared_ptr snapshot_; - bool snapshot_needed_; + bool snapshot_needed_ = false; std::shared_ptr snapshot_notifier_; - uint64_t num_puts_; - uint64_t num_deletes_; - uint64_t num_merges_; + uint64_t num_puts_ = 0; + uint64_t num_deletes_ = 0; + uint64_t num_merges_ = 0; // Record all keys tracked since the last savepoint TransactionKeyMap new_keys_; @@ -312,6 +312,8 @@ class TransactionBaseImpl : public Transaction { num_puts_(num_puts), num_deletes_(num_deletes), num_merges_(num_merges) {} + + SavePoint() = default; }; // Records writes pending in this transaction diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 7868d0060..534103a54 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -4030,6 +4030,58 @@ TEST_P(TransactionTest, SavepointTest3) { ASSERT_TRUE(s.IsNotFound()); } +TEST_P(TransactionTest, SavepointTest4) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + Status s; + + txn_options.lock_timeout = 1; // 1 ms + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + + txn1->SetSavePoint(); // 1 + s = txn1->Put("A", "a"); + ASSERT_OK(s); + + txn1->SetSavePoint(); // 2 + s = txn1->Put("B", "b"); + ASSERT_OK(s); + + s = txn1->PopSavePoint(); // Remove 2 + ASSERT_OK(s); + + // Verify that A/B still exists. + std::string value; + ASSERT_OK(txn1->Get(read_options, "A", &value)); + ASSERT_EQ("a", value); + + ASSERT_OK(txn1->Get(read_options, "B", &value)); + ASSERT_EQ("b", value); + + ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 1 + + // Verify that everything was rolled back. + s = txn1->Get(read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = txn1->Get(read_options, "B", &value); + ASSERT_TRUE(s.IsNotFound()); + + // Nothing should be locked + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); + + s = txn2->Put("A", ""); + ASSERT_OK(s); + + s = txn2->Put("B", ""); + ASSERT_OK(s); + + delete txn2; + delete txn1; +} + TEST_P(TransactionTest, UndoGetForUpdateTest) { WriteOptions write_options; ReadOptions read_options; diff --git a/utilities/transactions/transaction_util.h b/utilities/transactions/transaction_util.h index 1d910134b..b1f9f24cb 100644 --- a/utilities/transactions/transaction_util.h +++ b/utilities/transactions/transaction_util.h @@ -31,6 +31,14 @@ struct TransactionKeyMapInfo { explicit TransactionKeyMapInfo(SequenceNumber seq_no) : seq(seq_no), num_writes(0), num_reads(0), exclusive(false) {} + + // Used in PopSavePoint to collapse two savepoints together. + void Merge(const TransactionKeyMapInfo& info) { + assert(seq <= info.seq); + num_reads += info.num_reads; + num_writes += info.num_writes; + exclusive |= info.exclusive; + } }; using TransactionKeyMap =