diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction_impl.cc index 373df6d78..60385d6cb 100644 --- a/utilities/transactions/optimistic_transaction_impl.cc +++ b/utilities/transactions/optimistic_transaction_impl.cc @@ -107,7 +107,7 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, std::string key_str = key.ToString(); - TrackKey(cfh_id, key_str, seq, read_only); + TrackKey(cfh_id, key_str, seq, read_only, exclusive); // Always return OK. Confilct checking will happen at commit time. return Status::OK(); diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index ba4c3766a..0042649ad 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -470,20 +470,22 @@ uint64_t TransactionBaseImpl::GetNumKeys() const { } void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key, - SequenceNumber seq, bool read_only) { + SequenceNumber seq, bool read_only, + bool exclusive) { // Update map of all tracked keys for this transaction - TrackKey(&tracked_keys_, cfh_id, key, seq, read_only); + TrackKey(&tracked_keys_, cfh_id, key, seq, read_only, exclusive); if (save_points_ != nullptr && !save_points_->empty()) { // Update map of tracked keys in this SavePoint - TrackKey(&save_points_->top().new_keys_, cfh_id, key, seq, read_only); + TrackKey(&save_points_->top().new_keys_, cfh_id, key, seq, read_only, + exclusive); } } // Add a key to the given TransactionKeyMap void TransactionBaseImpl::TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id, const std::string& key, SequenceNumber seq, - bool read_only) { + bool read_only, bool exclusive) { auto& cf_key_map = (*key_map)[cfh_id]; auto iter = cf_key_map.find(key); if (iter == cf_key_map.end()) { @@ -499,6 +501,7 @@ void TransactionBaseImpl::TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id, } else { iter->second.num_writes++; } + iter->second.exclusive |= exclusive; } std::unique_ptr @@ -529,7 +532,7 @@ TransactionBaseImpl::GetTrackedKeysSinceSavePoint() { // All the reads/writes to this key were done in the last savepoint. bool read_only = (num_writes == 0); TrackKey(result, column_family_id, key, key_iter.second.seq, - read_only); + read_only, key_iter.second.exclusive); } } } diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index dd045218e..b1b8c02db 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -227,12 +227,12 @@ class TransactionBaseImpl : public Transaction { // seqno is the earliest seqno this key was involved with this transaction. // readonly should be set to true if no data was written for this key void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno, - bool readonly); + bool readonly, bool exclusive); // Helper function to add a key to the given TransactionKeyMap static void TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id, const std::string& key, SequenceNumber seqno, - bool readonly); + bool readonly, bool exclusive); // Called when UndoGetForUpdate determines that this key can be unlocked. virtual void UnlockGetForUpdate(ColumnFamilyHandle* column_family, diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index 8a57c9684..e81d87ed7 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -400,7 +400,7 @@ Status TransactionImpl::LockBatch(WriteBatch* batch, break; } TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber, - false); + false, true /* exclusive */); } if (!s.ok()) { @@ -426,6 +426,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, uint32_t cfh_id = GetColumnFamilyID(column_family); std::string key_str = key.ToString(); bool previously_locked; + bool lock_upgrade = false; Status s; // lock this key if this transactions hasn't already locked it @@ -441,13 +442,17 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, if (iter == tracked_keys_cf->second.end()) { previously_locked = false; } else { + if (!iter->second.exclusive && exclusive) { + lock_upgrade = true; + } previously_locked = true; current_seqno = iter->second.seq; } } - // lock this key if this transactions hasn't already locked it - if (!previously_locked) { + // Lock this key if this transactions hasn't already locked it or we require + // an upgrade. + if (!previously_locked || lock_upgrade) { s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive); } @@ -481,7 +486,13 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, // Failed to validate key if (!previously_locked) { // Unlock key we just locked - txn_db_impl_->UnLock(this, cfh_id, key.ToString()); + if (lock_upgrade) { + s = txn_db_impl_->TryLock(this, cfh_id, key_str, + false /* exclusive */); + assert(s.ok()); + } else { + txn_db_impl_->UnLock(this, cfh_id, key.ToString()); + } } } } @@ -489,7 +500,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, if (s.ok()) { // Let base class know we've conflict checked this key. - TrackKey(cfh_id, key_str, new_seqno, read_only); + TrackKey(cfh_id, key_str, new_seqno, read_only, exclusive); } return s; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 626b8acb4..48e7f06a7 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -332,6 +332,43 @@ TEST_P(TransactionTest, SharedLocks) { txn2->Rollback(); txn3->Rollback(); + // Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock. + s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn1->UndoGetForUpdate("foo"); + s = txn2->GetForUpdate(read_options, "foo", nullptr); + ASSERT_OK(s); + + txn1->Rollback(); + txn2->Rollback(); + + // Test txn1 trying to downgrade its lock. + s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + // Should still fail after "downgrading". + s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn1->Rollback(); + txn2->Rollback(); + // Test txn1 holding an exclusive lock and txn2 trying to obtain shared // access. s = txn1->GetForUpdate(read_options, "foo", nullptr); diff --git a/utilities/transactions/transaction_util.h b/utilities/transactions/transaction_util.h index b9579f7f1..8dea428bf 100644 --- a/utilities/transactions/transaction_util.h +++ b/utilities/transactions/transaction_util.h @@ -24,8 +24,10 @@ struct TransactionKeyMapInfo { uint32_t num_writes; uint32_t num_reads; + bool exclusive; + explicit TransactionKeyMapInfo(SequenceNumber seq_no) - : seq(seq_no), num_writes(0), num_reads(0) {} + : seq(seq_no), num_writes(0), num_reads(0), exclusive(false) {} }; using TransactionKeyMap =