From 2005c88a756b0f88b92086a323b87ba4a7254dad Mon Sep 17 00:00:00 2001 From: Manuel Ung Date: Mon, 5 Dec 2016 17:18:14 -0800 Subject: [PATCH] Implement non-exclusive locks Summary: This is an implementation of non-exclusive locks for pessimistic transactions. It is relatively simple and does not prevent starvation (i.e. it's possible that a request for exclusive access will never be granted if there are always threads holding shared access). It is done by changing `KeyLockInfo` to hold a set of transaction ids, instead of just one, and adding a flag specifying whether this lock is currently held with exclusive access or not. Some implementation notes: - Some lock diagnostic functions had to be updated to return a set of transaction ids for a given lock, e.g. `GetWaitingTxn` and `GetLockStatusData`. - Deadlock detection is a bit more complicated since a transaction can now wait on multiple other transactions. A BFS is done in this case, and deadlock detection depth is now just a limit on the number of transactions we visit. - Expirable transactions do not work efficiently with shared locks at the moment, but that's okay for now. 
Closes https://github.com/facebook/rocksdb/pull/1573 Differential Revision: D4239097 Pulled By: lth fbshipit-source-id: da7c074 --- include/rocksdb/utilities/transaction.h | 11 +- include/rocksdb/utilities/transaction_db.h | 3 +- util/autovector.h | 5 - .../optimistic_transaction_impl.cc | 4 +- .../optimistic_transaction_impl.h | 3 +- utilities/transactions/transaction_base.cc | 53 ++-- utilities/transactions/transaction_base.h | 12 +- utilities/transactions/transaction_db_impl.cc | 4 +- utilities/transactions/transaction_db_impl.h | 3 +- utilities/transactions/transaction_impl.cc | 7 +- utilities/transactions/transaction_impl.h | 34 ++- .../transactions/transaction_lock_mgr.cc | 247 +++++++++++------- utilities/transactions/transaction_lock_mgr.h | 22 +- utilities/transactions/transaction_test.cc | 183 ++++++++++++- 14 files changed, 427 insertions(+), 164 deletions(-) diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 50b2167cd..92d33739b 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -209,10 +209,11 @@ class Transaction { // or other errors if this key could not be read. 
virtual Status GetForUpdate(const ReadOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) = 0; + const Slice& key, std::string* value, + bool exclusive = true) = 0; virtual Status GetForUpdate(const ReadOptions& options, const Slice& key, - std::string* value) = 0; + std::string* value, bool exclusive = true) = 0; virtual std::vector MultiGetForUpdate( const ReadOptions& options, @@ -401,10 +402,10 @@ class Transaction { virtual bool IsDeadlockDetect() const { return false; } - virtual TransactionID GetWaitingTxn(uint32_t* column_family_id, - const std::string** key) const { + virtual std::vector GetWaitingTxns(uint32_t* column_family_id, + std::string* key) const { assert(false); - return 0; + return std::vector(); } enum TransactionState { diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 77425ec4b..dd495e08f 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -104,7 +104,8 @@ struct TransactionOptions { struct KeyLockInfo { std::string key; - TransactionID id; + std::vector ids; + bool exclusive; }; class TransactionDB : public StackableDB { diff --git a/util/autovector.h b/util/autovector.h index cfe703ed9..12b1f5195 100644 --- a/util/autovector.h +++ b/util/autovector.h @@ -283,11 +283,6 @@ class autovector { autovector& operator=(const autovector& other) { return assign(other); } - // move operation are disallowed since it is very hard to make sure both - // autovectors are allocated from the same function stack. 
- autovector& operator=(autovector&& other) = delete; - autovector(autovector&& other) = delete; - // -- Iterator Operations iterator begin() { return iterator(this, 0); } diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction_impl.cc index 1a9b6436b..373df6d78 100644 --- a/utilities/transactions/optimistic_transaction_impl.cc +++ b/utilities/transactions/optimistic_transaction_impl.cc @@ -86,9 +86,11 @@ Status OptimisticTransactionImpl::Rollback() { } // Record this key so that we can check it for conflicts at commit time. +// +// 'exclusive' is unused for OptimisticTransaction. Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, - bool untracked) { + bool exclusive, bool untracked) { if (untracked) { return Status::OK(); } diff --git a/utilities/transactions/optimistic_transaction_impl.h b/utilities/transactions/optimistic_transaction_impl.h index b09fe6706..f8c98e1e2 100644 --- a/utilities/transactions/optimistic_transaction_impl.h +++ b/utilities/transactions/optimistic_transaction_impl.h @@ -48,7 +48,8 @@ class OptimisticTransactionImpl : public TransactionBaseImpl { protected: Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, - bool read_only, bool untracked = false) override; + bool read_only, bool exclusive, + bool untracked = false) override; private: OptimisticTransactionDB* const txn_db_; diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index a65e6331c..2312ced9e 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -96,7 +96,7 @@ void TransactionBaseImpl::SetSnapshotIfNeeded() { Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, bool read_only, - bool untracked) { + bool exclusive, bool untracked) { size_t key_size = 0; for (int i = 0; i < key.num_parts; ++i) { key_size += 
key.parts[i].size(); @@ -109,7 +109,7 @@ Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, str.append(key.parts[i].data(), key.parts[i].size()); } - return TryLock(column_family, str, read_only, untracked); + return TryLock(column_family, str, read_only, exclusive, untracked); } void TransactionBaseImpl::SetSavePoint() { @@ -187,8 +187,9 @@ Status TransactionBaseImpl::Get(const ReadOptions& read_options, Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) { - Status s = TryLock(column_family, key, true /* read_only */); + const Slice& key, std::string* value, + bool exclusive) { + Status s = TryLock(column_family, key, true /* read_only */, exclusive); if (s.ok() && value != nullptr) { s = Get(read_options, column_family, key, value); @@ -222,7 +223,8 @@ std::vector TransactionBaseImpl::MultiGetForUpdate( // Lock all keys for (size_t i = 0; i < num_keys; ++i) { - Status s = TryLock(column_family[i], keys[i], true /* read_only */); + Status s = TryLock(column_family[i], keys[i], true /* read_only */, + true /* exclusive */); if (!s.ok()) { // Fail entire multiget if we cannot lock all keys return std::vector(num_keys, s); @@ -256,7 +258,8 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options, Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - Status s = TryLock(column_family, key, false /* read_only */); + Status s = + TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->Put(column_family, key, value); @@ -269,7 +272,8 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, const SliceParts& key, const SliceParts& value) { - Status s = TryLock(column_family, key, false /* read_only */); + Status s = + TryLock(column_family, key, false 
/* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->Put(column_family, key, value); @@ -281,7 +285,8 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - Status s = TryLock(column_family, key, false /* read_only */); + Status s = + TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->Merge(column_family, key, value); @@ -293,7 +298,8 @@ Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, const Slice& key) { - Status s = TryLock(column_family, key, false /* read_only */); + Status s = + TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->Delete(column_family, key); @@ -305,7 +311,8 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, const SliceParts& key) { - Status s = TryLock(column_family, key, false /* read_only */); + Status s = + TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->Delete(column_family, key); @@ -317,7 +324,8 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, const Slice& key) { - Status s = TryLock(column_family, key, false /* read_only */); + Status s = + TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->SingleDelete(column_family, key); @@ -329,7 +337,8 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key) { - Status s = TryLock(column_family, key, false /* read_only */); + Status 
s = + TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->SingleDelete(column_family, key); @@ -341,8 +350,8 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* untracked */); + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, true /* untracked */); if (s.ok()) { GetBatchForWrite()->Put(column_family, key, value); @@ -355,8 +364,8 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, const SliceParts& value) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* untracked */); + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, true /* untracked */); if (s.ok()) { GetBatchForWrite()->Put(column_family, key, value); @@ -369,8 +378,8 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* untracked */); + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, true /* untracked */); if (s.ok()) { GetBatchForWrite()->Merge(column_family, key, value); @@ -382,8 +391,8 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, const Slice& key) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* untracked */); + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, true /* untracked */); if 
(s.ok()) { GetBatchForWrite()->Delete(column_family, key); @@ -395,8 +404,8 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, const SliceParts& key) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* untracked */); + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, true /* untracked */); if (s.ok()) { GetBatchForWrite()->Delete(column_family, key); diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index b5f9a6222..29fd26af8 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -39,7 +39,8 @@ class TransactionBaseImpl : public Transaction { // untracked will be true if called from PutUntracked, DeleteUntracked, or // MergeUntracked. virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, - bool read_only, bool untracked = false) = 0; + bool read_only, bool exclusive, + bool untracked = false) = 0; void SetSavePoint() override; @@ -55,11 +56,12 @@ class TransactionBaseImpl : public Transaction { Status GetForUpdate(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + std::string* value, bool exclusive) override; Status GetForUpdate(const ReadOptions& options, const Slice& key, - std::string* value) override { - return GetForUpdate(options, db_->DefaultColumnFamily(), key, value); + std::string* value, bool exclusive) override { + return GetForUpdate(options, db_->DefaultColumnFamily(), key, value, + exclusive); } std::vector MultiGet( @@ -315,7 +317,7 @@ class TransactionBaseImpl : public Transaction { std::shared_ptr snapshot_notifier_ = nullptr; Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, - bool read_only, bool untracked = false); + bool read_only, bool exclusive, bool untracked = 
false); WriteBatchBase* GetBatchForWrite(); diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index bd10e6fb7..9b530c170 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -266,8 +266,8 @@ Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { } Status TransactionDBImpl::TryLock(TransactionImpl* txn, uint32_t cfh_id, - const std::string& key) { - return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv()); + const std::string& key, bool exclusive) { + return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); } void TransactionDBImpl::UnLock(TransactionImpl* txn, diff --git a/utilities/transactions/transaction_db_impl.h b/utilities/transactions/transaction_db_impl.h index a4792ddb6..c377e2c0f 100644 --- a/utilities/transactions/transaction_db_impl.h +++ b/utilities/transactions/transaction_db_impl.h @@ -63,7 +63,8 @@ class TransactionDBImpl : public TransactionDB { using StackableDB::DropColumnFamily; virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; - Status TryLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key); + Status TryLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key, + bool exclusive); void UnLock(TransactionImpl* txn, const TransactionKeyMap* keys); void UnLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key); diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index d6d1c2ac9..2ee775306 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -40,7 +40,6 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db, : TransactionBaseImpl(txn_db->GetRootDB(), write_options), txn_db_impl_(nullptr), txn_id_(0), - waiting_txn_id_(0), waiting_cf_id_(0), waiting_key_(nullptr), expiration_time_(0), @@ -395,7 +394,7 @@ Status 
TransactionImpl::LockBatch(WriteBatch* batch, for (const auto& key_iter : cfh_keys) { const std::string& key = key_iter; - s = txn_db_impl_->TryLock(this, cfh_id, key); + s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */); if (!s.ok()) { break; } @@ -422,7 +421,7 @@ Status TransactionImpl::LockBatch(WriteBatch* batch, // the snapshot time. Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, - bool untracked) { + bool exclusive, bool untracked) { uint32_t cfh_id = GetColumnFamilyID(column_family); std::string key_str = key.ToString(); bool previously_locked; @@ -448,7 +447,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, // lock this key if this transactions hasn't already locked it if (!previously_locked) { - s = txn_db_impl_->TryLock(this, cfh_id, key_str); + s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive); } SetSnapshotIfNeeded(); diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index 4dfea9be5..0ffcbf9b5 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -7,6 +7,7 @@ #ifndef ROCKSDB_LITE +#include #include #include #include @@ -23,6 +24,7 @@ #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/write_batch_with_index.h" +#include "util/autovector.h" #include "utilities/transactions/transaction_base.h" #include "utilities/transactions/transaction_util.h" @@ -57,22 +59,31 @@ class TransactionImpl : public TransactionBaseImpl { TransactionID GetID() const override { return txn_id_; } - TransactionID GetWaitingTxn(uint32_t* column_family_id, - const std::string** key) const override { + std::vector GetWaitingTxns(uint32_t* column_family_id, + std::string* key) const override { std::lock_guard lock(wait_mutex_); - if (key) *key = waiting_key_; + std::vector ids(waiting_txn_ids_.size()); + if (key) *key = 
waiting_key_ ? *waiting_key_ : ""; if (column_family_id) *column_family_id = waiting_cf_id_; - return waiting_txn_id_; + std::copy(waiting_txn_ids_.begin(), waiting_txn_ids_.end(), ids.begin()); + return ids; } - void SetWaitingTxn(TransactionID id, uint32_t column_family_id, + void SetWaitingTxn(autovector ids, uint32_t column_family_id, const std::string* key) { std::lock_guard lock(wait_mutex_); - waiting_txn_id_ = id; + waiting_txn_ids_ = ids; waiting_cf_id_ = column_family_id; waiting_key_ = key; } + void ClearWaitingTxn() { + std::lock_guard lock(wait_mutex_); + waiting_txn_ids_.clear(); + waiting_cf_id_ = 0; + waiting_key_ = nullptr; + } + // Returns the time (in microseconds according to Env->GetMicros()) // that this transaction will be expired. Returns 0 if this transaction does // not expire. @@ -97,7 +108,8 @@ class TransactionImpl : public TransactionBaseImpl { protected: Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, - bool read_only, bool untracked = false) override; + bool read_only, bool exclusive, + bool untracked = false) override; private: TransactionDBImpl* txn_db_impl_; @@ -109,10 +121,10 @@ class TransactionImpl : public TransactionBaseImpl { // Unique ID for this transaction TransactionID txn_id_; - // ID for the transaction that is blocking the current transaction. + // IDs for the transactions that are blocking the current transaction. // - // 0 if current transaction is not waiting. - TransactionID waiting_txn_id_; + // empty if current transaction is not waiting. + autovector waiting_txn_ids_; // The following two represents the (cf, key) that a transaction is waiting // on. @@ -125,7 +137,7 @@ class TransactionImpl : public TransactionBaseImpl { uint32_t waiting_cf_id_; const std::string* waiting_key_; - // Mutex protecting waiting_txn_id_, waiting_cf_id_ and waiting_key_. + // Mutex protecting waiting_txn_ids_, waiting_cf_id_ and waiting_key_. 
mutable std::mutex wait_mutex_; // If non-zero, this transaction should not be committed after this time (in diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 8a49b221c..2f645e1bc 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -22,7 +22,6 @@ #include "rocksdb/slice.h" #include "rocksdb/utilities/transaction_db_mutex.h" -#include "util/autovector.h" #include "util/murmurhash.h" #include "util/sync_point.h" #include "util/thread_local.h" @@ -31,15 +30,20 @@ namespace rocksdb { struct LockInfo { - TransactionID txn_id; + bool exclusive; + autovector txn_ids; // Transaction locks are not valid after this time in us uint64_t expiration_time; - LockInfo(TransactionID id, uint64_t time) - : txn_id(id), expiration_time(time) {} + LockInfo(TransactionID id, uint64_t time, bool ex) + : exclusive(ex), expiration_time(time) { + txn_ids.push_back(id); + } LockInfo(const LockInfo& lock_info) - : txn_id(lock_info.txn_id), expiration_time(lock_info.expiration_time) {} + : exclusive(lock_info.exclusive), + txn_ids(lock_info.txn_ids), + expiration_time(lock_info.expiration_time) {} }; struct LockMapStripe { @@ -192,7 +196,8 @@ std::shared_ptr TransactionLockMgr::GetLockMap( // transaction. // If false, sets *expire_time to the expiration time of the lock according // to Env->GetMicros() or 0 if no expiration. 
-bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env, +bool TransactionLockMgr::IsLockExpired(TransactionID txn_id, + const LockInfo& lock_info, Env* env, uint64_t* expire_time) { auto now = env->NowMicros(); @@ -203,12 +208,18 @@ bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env, // return how many microseconds until lock will be expired *expire_time = lock_info.expiration_time; } else { - bool success = - txn_db_impl_->TryStealingExpiredTransactionLocks(lock_info.txn_id); - if (!success) { - expired = false; + for (auto id : lock_info.txn_ids) { + if (txn_id == id) { + continue; + } + + bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id); + if (!success) { + expired = false; + break; + } + *expire_time = 0; } - *expire_time = 0; } return expired; @@ -216,7 +227,8 @@ bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env, Status TransactionLockMgr::TryLock(TransactionImpl* txn, uint32_t column_family_id, - const std::string& key, Env* env) { + const std::string& key, Env* env, + bool exclusive) { // Lookup lock map for this column family id std::shared_ptr lock_map_ptr = GetLockMap(column_family_id); LockMap* lock_map = lock_map_ptr.get(); @@ -233,7 +245,7 @@ Status TransactionLockMgr::TryLock(TransactionImpl* txn, assert(lock_map->lock_map_stripes_.size() > stripe_num); LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); - LockInfo lock_info(txn->GetID(), txn->GetExpirationTime()); + LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive); int64_t timeout = txn->GetLockTimeout(); return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env, @@ -268,9 +280,9 @@ Status TransactionLockMgr::AcquireWithTimeout( // Acquire lock if we are able to uint64_t expire_time_hint = 0; - TransactionID wait_id = 0; + autovector wait_ids; result = AcquireLocked(lock_map, stripe, key, env, lock_info, - &expire_time_hint, &wait_id); + &expire_time_hint, 
&wait_ids); if (!result.ok() && timeout != 0) { // If we weren't able to acquire the lock, we will keep retrying as long @@ -289,19 +301,19 @@ Status TransactionLockMgr::AcquireWithTimeout( cv_end_time = end_time; } - assert(result.IsBusy() || wait_id != 0); + assert(result.IsBusy() || wait_ids.size() != 0); // We are dependent on a transaction to finish, so perform deadlock // detection. - if (wait_id != 0) { + if (wait_ids.size() != 0) { if (txn->IsDeadlockDetect()) { - if (IncrementWaiters(txn, wait_id)) { + if (IncrementWaiters(txn, wait_ids)) { result = Status::Busy(Status::SubCode::kDeadlock); stripe->stripe_mutex->UnLock(); return result; } } - txn->SetWaitingTxn(wait_id, column_family_id, &key); + txn->SetWaitingTxn(wait_ids, column_family_id, &key); } TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn"); @@ -316,10 +328,10 @@ Status TransactionLockMgr::AcquireWithTimeout( } } - if (wait_id != 0) { - txn->SetWaitingTxn(0, 0, nullptr); + if (wait_ids.size() != 0) { + txn->ClearWaitingTxn(); if (txn->IsDeadlockDetect()) { - DecrementWaiters(txn, wait_id); + DecrementWaiters(txn, wait_ids); } } @@ -332,7 +344,7 @@ Status TransactionLockMgr::AcquireWithTimeout( if (result.ok() || result.IsTimedOut()) { result = AcquireLocked(lock_map, stripe, key, env, lock_info, - &expire_time_hint, &wait_id); + &expire_time_hint, &wait_ids); } } while (!result.ok() && !timed_out); } @@ -342,35 +354,40 @@ Status TransactionLockMgr::AcquireWithTimeout( return result; } -void TransactionLockMgr::DecrementWaiters(const TransactionImpl* txn, - TransactionID wait_id) { +void TransactionLockMgr::DecrementWaiters( + const TransactionImpl* txn, const autovector& wait_ids) { std::lock_guard lock(wait_txn_map_mutex_); - DecrementWaitersImpl(txn, wait_id); + DecrementWaitersImpl(txn, wait_ids); } -void TransactionLockMgr::DecrementWaitersImpl(const TransactionImpl* txn, - TransactionID wait_id) { +void TransactionLockMgr::DecrementWaitersImpl( + const TransactionImpl* 
txn, const autovector& wait_ids) { auto id = txn->GetID(); assert(wait_txn_map_.Contains(id)); wait_txn_map_.Delete(id); - rev_wait_txn_map_.Get(wait_id)--; - if (rev_wait_txn_map_.Get(wait_id) == 0) { - rev_wait_txn_map_.Delete(wait_id); + for (auto wait_id : wait_ids) { + rev_wait_txn_map_.Get(wait_id)--; + if (rev_wait_txn_map_.Get(wait_id) == 0) { + rev_wait_txn_map_.Delete(wait_id); + } } } -bool TransactionLockMgr::IncrementWaiters(const TransactionImpl* txn, - TransactionID wait_id) { +bool TransactionLockMgr::IncrementWaiters( + const TransactionImpl* txn, const autovector& wait_ids) { auto id = txn->GetID(); + std::vector queue(txn->GetDeadlockDetectDepth()); std::lock_guard lock(wait_txn_map_mutex_); assert(!wait_txn_map_.Contains(id)); - wait_txn_map_.Insert(id, wait_id); + wait_txn_map_.Insert(id, wait_ids); - if (rev_wait_txn_map_.Contains(wait_id)) { - rev_wait_txn_map_.Get(wait_id)++; - } else { - rev_wait_txn_map_.Insert(wait_id, 1); + for (auto wait_id : wait_ids) { + if (rev_wait_txn_map_.Contains(wait_id)) { + rev_wait_txn_map_.Get(wait_id)++; + } else { + rev_wait_txn_map_.Insert(wait_id, 1); + } } // No deadlock if nobody is waiting on self. @@ -378,20 +395,36 @@ bool TransactionLockMgr::IncrementWaiters(const TransactionImpl* txn, return false; } - TransactionID next = wait_id; - for (int i = 0; i < txn->GetDeadlockDetectDepth(); i++) { + const auto* next_ids = &wait_ids; + for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) { + uint i = 0; + if (next_ids) { + for (; i < next_ids->size() && tail + i < txn->GetDeadlockDetectDepth(); + i++) { + queue[tail + i] = (*next_ids)[i]; + } + tail += i; + } + + // No more items in the list, meaning no deadlock. 
+ if (tail == head) { + return false; + } + + auto next = queue[head]; if (next == id) { - DecrementWaitersImpl(txn, wait_id); + DecrementWaitersImpl(txn, wait_ids); return true; } else if (!wait_txn_map_.Contains(next)) { - return false; + next_ids = nullptr; + continue; } else { - next = wait_txn_map_.Get(next); + next_ids = &wait_txn_map_.Get(next); } } // Wait cycle too big, just assume deadlock. - DecrementWaitersImpl(txn, wait_id); + DecrementWaitersImpl(txn, wait_ids); return true; } @@ -404,24 +437,47 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, const std::string& key, Env* env, const LockInfo& txn_lock_info, uint64_t* expire_time, - TransactionID* txn_id) { + autovector* txn_ids) { + assert(txn_lock_info.txn_ids.size() == 1); + Status result; // Check if this key is already locked if (stripe->keys.find(key) != stripe->keys.end()) { // Lock already held - LockInfo& lock_info = stripe->keys.at(key); - if (lock_info.txn_id != txn_lock_info.txn_id) { - // locked by another txn. Check if it's expired - if (IsLockExpired(lock_info, env, expire_time)) { - // lock is expired, can steal it - lock_info.txn_id = txn_lock_info.txn_id; + assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive); + + if (lock_info.exclusive || txn_lock_info.exclusive) { + if (lock_info.txn_ids.size() == 1 && + lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) { + // The list contains one txn and we're it, so just take it. + lock_info.exclusive = txn_lock_info.exclusive; lock_info.expiration_time = txn_lock_info.expiration_time; - // lock_cnt does not change } else { - result = Status::TimedOut(Status::SubCode::kLockTimeout); - *txn_id = lock_info.txn_id; + // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case + // it's there for a shared lock with multiple holders which was not + // caught in the first case. 
+ if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env, + expire_time)) { + // lock is expired, can steal it + lock_info.txn_ids = txn_lock_info.txn_ids; + lock_info.exclusive = txn_lock_info.exclusive; + lock_info.expiration_time = txn_lock_info.expiration_time; + // lock_cnt does not change + } else { + result = Status::TimedOut(Status::SubCode::kLockTimeout); + *txn_ids = lock_info.txn_ids; + } } + } else { + // We are requesting shared access to a shared lock, so just grant it. + lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]); + // Using std::max means that expiration time never goes down even when + // a transaction is removed from the list. The correct solution would be + // to track expiry for every transaction, but this would also work for + // now. + lock_info.expiration_time = + std::max(lock_info.expiration_time, txn_lock_info.expiration_time); } } else { // Lock not held. // Check lock limit @@ -442,6 +498,42 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, return result; } +void TransactionLockMgr::UnLockKey(const TransactionImpl* txn, + const std::string& key, + LockMapStripe* stripe, LockMap* lock_map, + Env* env) { + TransactionID txn_id = txn->GetID(); + + auto stripe_iter = stripe->keys.find(key); + if (stripe_iter != stripe->keys.end()) { + auto& txns = stripe_iter->second.txn_ids; + auto txn_it = std::find(txns.begin(), txns.end(), txn_id); + // Found the key we locked. unlock it. + if (txn_it != txns.end()) { + if (txns.size() == 1) { + stripe->keys.erase(stripe_iter); + } else { + auto last_it = txns.end() - 1; + if (txn_it != last_it) { + *txn_it = *last_it; + } + txns.pop_back(); + } + + if (max_num_locks_ > 0) { + // Maintain lock count if there is a limit on the number of locks. + assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); + lock_map->lock_cnt--; + } + } + } else { + // This key is either not locked or locked by someone else. 
This should + // only happen if the unlocking transaction has expired. + assert(txn->GetExpirationTime() > 0 && + txn->GetExpirationTime() < env->NowMicros()); + } +} + void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, const std::string& key, Env* env) { std::shared_ptr lock_map_ptr = GetLockMap(column_family_id); @@ -456,26 +548,8 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, assert(lock_map->lock_map_stripes_.size() > stripe_num); LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); - TransactionID txn_id = txn->GetID(); - stripe->stripe_mutex->Lock(); - - const auto& iter = stripe->keys.find(key); - if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) { - // Found the key we locked. unlock it. - stripe->keys.erase(iter); - if (max_num_locks_ > 0) { - // Maintain lock count if there is a limit on the number of locks. - assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); - lock_map->lock_cnt--; - } - } else { - // This key is either not locked or locked by someone else. This should - // only happen if the unlocking transaction has expired. 
- assert(txn->GetExpirationTime() > 0 && - txn->GetExpirationTime() < env->NowMicros()); - } - + UnLockKey(txn, key, stripe, lock_map, env); stripe->stripe_mutex->UnLock(); // Signal waiting threads to retry locking @@ -484,8 +558,6 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, void TransactionLockMgr::UnLock(const TransactionImpl* txn, const TransactionKeyMap* key_map, Env* env) { - TransactionID txn_id = txn->GetID(); - for (auto& key_map_iter : *key_map) { uint32_t column_family_id = key_map_iter.first; auto& keys = key_map_iter.second; @@ -520,22 +592,7 @@ void TransactionLockMgr::UnLock(const TransactionImpl* txn, stripe->stripe_mutex->Lock(); for (const std::string* key : stripe_keys) { - const auto& iter = stripe->keys.find(*key); - if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) { - // Found the key we locked. unlock it. - stripe->keys.erase(iter); - if (max_num_locks_ > 0) { - // Maintain lock count if there is a limit on the number of locks. - assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); - lock_map->lock_cnt--; - } - } else { - // This key is either not locked or locked by someone else. This - // should only - // happen if the unlocking transaction has expired. 
- assert(txn->GetExpirationTime() > 0 && - txn->GetExpirationTime() < env->NowMicros()); - } + UnLockKey(txn, *key, stripe, lock_map, env); } stripe->stripe_mutex->UnLock(); @@ -565,7 +622,13 @@ TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() { for (const auto& j : stripes) { j->stripe_mutex->Lock(); for (const auto& it : j->keys) { - data.insert({i, {it.first, it.second.txn_id}}); + struct KeyLockInfo info; + info.exclusive = it.second.exclusive; + info.key = it.first; + for (const auto& id : it.second.txn_ids) { + info.ids.push_back(id); + } + data.insert({i, info}); } } } diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index ea0352d53..1bfee89a9 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -13,6 +13,7 @@ #include #include "rocksdb/utilities/transaction.h" +#include "util/autovector.h" #include "util/hash_map.h" #include "util/instrumented_mutex.h" #include "util/thread_local.h" @@ -47,7 +48,7 @@ class TransactionLockMgr { // Attempt to lock key. If OK status is returned, the caller is responsible // for calling UnLock() on this key. Status TryLock(TransactionImpl* txn, uint32_t column_family_id, - const std::string& key, Env* env); + const std::string& key, Env* env, bool exclusive); // Unlock a key locked by TryLock(). txn must be the same Transaction that // locked this key. @@ -91,12 +92,13 @@ class TransactionLockMgr { // Maps from waitee -> number of waiters. HashMap rev_wait_txn_map_; // Maps from waiter -> waitee. 
- HashMap wait_txn_map_; + HashMap> wait_txn_map_; // Used to allocate mutexes/condvars to use when locking keys std::shared_ptr mutex_factory_; - bool IsLockExpired(const LockInfo& lock_info, Env* env, uint64_t* wait_time); + bool IsLockExpired(TransactionID txn_id, const LockInfo& lock_info, Env* env, + uint64_t* wait_time); std::shared_ptr GetLockMap(uint32_t column_family_id); @@ -108,11 +110,17 @@ class TransactionLockMgr { Status AcquireLocked(LockMap* lock_map, LockMapStripe* stripe, const std::string& key, Env* env, const LockInfo& lock_info, uint64_t* wait_time, - TransactionID* txn_id); + autovector* txn_ids); - bool IncrementWaiters(const TransactionImpl* txn, TransactionID wait_id); - void DecrementWaiters(const TransactionImpl* txn, TransactionID wait_id); - void DecrementWaitersImpl(const TransactionImpl* txn, TransactionID wait_id); + void UnLockKey(const TransactionImpl* txn, const std::string& key, + LockMapStripe* stripe, LockMap* lock_map, Env* env); + + bool IncrementWaiters(const TransactionImpl* txn, + const autovector& wait_ids); + void DecrementWaiters(const TransactionImpl* txn, + const autovector& wait_ids); + void DecrementWaitersImpl(const TransactionImpl* txn, + const autovector& wait_ids); // No copying allowed TransactionLockMgr(const TransactionLockMgr&); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 917dbb7d2..349e5a5f0 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -197,11 +197,12 @@ TEST_P(TransactionTest, WaitingTxn) { rocksdb::SyncPoint::GetInstance()->SetCallBack( "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", [&](void* arg) { - const std::string* key; + std::string key; uint32_t cf_id; - TransactionID wait = txn2->GetWaitingTxn(&cf_id, &key); - ASSERT_EQ(*key, "foo"); - ASSERT_EQ(wait, id1); + std::vector wait = txn2->GetWaitingTxns(&cf_id, &key); + ASSERT_EQ(key, "foo"); + ASSERT_EQ(wait.size(), 1); 
+ ASSERT_EQ(wait[0], id1); ASSERT_EQ(cf_id, 0); }); @@ -225,7 +226,8 @@ TEST_P(TransactionTest, WaitingTxn) { ASSERT_EQ(cf_iterator->first, 1); // The locked key is "foo" and is locked by txn1 ASSERT_EQ(cf_iterator->second.key, "foo"); - ASSERT_EQ(cf_iterator->second.id, txn1->GetID()); + ASSERT_EQ(cf_iterator->second.ids.size(), 1); + ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID()); cf_iterator++; @@ -233,7 +235,8 @@ TEST_P(TransactionTest, WaitingTxn) { ASSERT_EQ(cf_iterator->first, 0); // The locked key is "foo" and is locked by txn1 ASSERT_EQ(cf_iterator->second.key, "foo"); - ASSERT_EQ(cf_iterator->second.id, txn1->GetID()); + ASSERT_EQ(cf_iterator->second.ids.size(), 1); + ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID()); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); @@ -249,6 +252,170 @@ TEST_P(TransactionTest, WaitingTxn) { delete txn2; } +TEST_P(TransactionTest, SharedLocks) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + Status s; + + txn_options.lock_timeout = 1; + s = db->Put(write_options, Slice("foo"), Slice("bar")); + ASSERT_OK(s); + + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + Transaction* txn3 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + ASSERT_TRUE(txn2); + ASSERT_TRUE(txn3); + + // Test shared access between txns + s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn3->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + auto lock_data = db->GetLockStatusData(); + ASSERT_EQ(lock_data.size(), 1); + + auto cf_iterator = lock_data.begin(); + ASSERT_EQ(cf_iterator->second.key, "foo"); + + // We compare whether the set of txns locking this key is the same. 
Neither + // vector is sorted here, so this relies on the ids being reported in the + // order the locks were taken (txn1, txn2, txn3). + std::vector expected_txns = {txn1->GetID(), txn2->GetID(), + txn3->GetID()}; + std::vector lock_txns = cf_iterator->second.ids; + ASSERT_EQ(expected_txns, lock_txns); + ASSERT_FALSE(cf_iterator->second.exclusive); + + txn1->Rollback(); + txn2->Rollback(); + txn3->Rollback(); + + // Test txn1 and txn2 sharing a lock and txn3 trying to obtain it. + s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn3->GetForUpdate(read_options, "foo", nullptr); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn1->UndoGetForUpdate("foo"); + s = txn3->GetForUpdate(read_options, "foo", nullptr); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn2->UndoGetForUpdate("foo"); + s = txn3->GetForUpdate(read_options, "foo", nullptr); + ASSERT_OK(s); + + txn1->Rollback(); + txn2->Rollback(); + txn3->Rollback(); + + // Test txn1 holding an exclusive lock and txn2 trying to obtain shared + // access.
+ s = txn1->GetForUpdate(read_options, "foo", nullptr); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn1->UndoGetForUpdate("foo"); + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + delete txn1; + delete txn2; + delete txn3; +} + +TEST_P(TransactionTest, DeadlockCycleShared) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + + txn_options.lock_timeout = 1000000; + txn_options.deadlock_detect = true; + + // Set up a wait for chain like this: + // + // Tn -> T(n*2) + // Tn -> T(n*2 + 1) + // + // So we have: + // T1 -> T2 -> T4 ... + // | |> T5 ... + // |> T3 -> T6 ... + // |> T7 ... + // up to T31, then T[16 - 31] -> T1. + // Note that Tn holds lock on floor(n / 2). + + std::vector txns(31); + + for (uint32_t i = 0; i < 31; i++) { + txns[i] = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txns[i]); + auto s = txns[i]->GetForUpdate(read_options, ToString((i + 1) / 2), nullptr, + false /* exclusive */); + ASSERT_OK(s); + } + + std::atomic checkpoints(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", + [&](void* arg) { checkpoints.fetch_add(1); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // We want the leaf transactions to block and hold everyone back. + std::vector threads; + for (uint32_t i = 0; i < 15; i++) { + std::function blocking_thread = [&, i] { + auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr, + true /* exclusive */); + ASSERT_OK(s); + txns[i]->Rollback(); + delete txns[i]; + }; + threads.emplace_back(blocking_thread); + } + + // Wait until all threads are waiting on each other. 
+ while (checkpoints.load() != 15) { + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Complete the cycle T[16 - 31] -> T1 + for (uint32_t i = 15; i < 31; i++) { + auto s = + txns[i]->GetForUpdate(read_options, "0", nullptr, true /* exclusive */); + ASSERT_TRUE(s.IsDeadlock()); + } + + // Rollback the leaf transaction. + for (uint32_t i = 15; i < 31; i++) { + txns[i]->Rollback(); + delete txns[i]; + } + + for (auto& t : threads) { + t.join(); + } +} + TEST_P(TransactionTest, DeadlockCycle) { WriteOptions write_options; ReadOptions read_options; @@ -345,7 +512,9 @@ TEST_P(TransactionTest, DeadlockStress) { // Lock keys in random order. for (const auto& k : random_keys) { - auto s = txn->GetForUpdate(read_options, k, nullptr); + // Lock mostly for shared access, but exclusive 1/4 of the time. + auto s = + txn->GetForUpdate(read_options, k, nullptr, txn->GetID() % 4 == 0); if (!s.ok()) { ASSERT_TRUE(s.IsDeadlock()); txn->Rollback();