diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 50b2167cd..92d33739b 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -209,10 +209,11 @@ class Transaction { // or other errors if this key could not be read. virtual Status GetForUpdate(const ReadOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) = 0; + const Slice& key, std::string* value, + bool exclusive = true) = 0; virtual Status GetForUpdate(const ReadOptions& options, const Slice& key, - std::string* value) = 0; + std::string* value, bool exclusive = true) = 0; virtual std::vector MultiGetForUpdate( const ReadOptions& options, @@ -401,10 +402,10 @@ class Transaction { virtual bool IsDeadlockDetect() const { return false; } - virtual TransactionID GetWaitingTxn(uint32_t* column_family_id, - const std::string** key) const { + virtual std::vector GetWaitingTxns(uint32_t* column_family_id, + std::string* key) const { assert(false); - return 0; + return std::vector(); } enum TransactionState { diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 77425ec4b..dd495e08f 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -104,7 +104,8 @@ struct TransactionOptions { struct KeyLockInfo { std::string key; - TransactionID id; + std::vector ids; + bool exclusive; }; class TransactionDB : public StackableDB { diff --git a/util/autovector.h b/util/autovector.h index cfe703ed9..12b1f5195 100644 --- a/util/autovector.h +++ b/util/autovector.h @@ -283,11 +283,6 @@ class autovector { autovector& operator=(const autovector& other) { return assign(other); } - // move operation are disallowed since it is very hard to make sure both - // autovectors are allocated from the same function stack. 
- autovector& operator=(autovector&& other) = delete; - autovector(autovector&& other) = delete; - // -- Iterator Operations iterator begin() { return iterator(this, 0); } diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction_impl.cc index 1a9b6436b..373df6d78 100644 --- a/utilities/transactions/optimistic_transaction_impl.cc +++ b/utilities/transactions/optimistic_transaction_impl.cc @@ -86,9 +86,11 @@ Status OptimisticTransactionImpl::Rollback() { } // Record this key so that we can check it for conflicts at commit time. +// +// 'exclusive' is unused for OptimisticTransaction. Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, - bool untracked) { + bool exclusive, bool untracked) { if (untracked) { return Status::OK(); } diff --git a/utilities/transactions/optimistic_transaction_impl.h b/utilities/transactions/optimistic_transaction_impl.h index b09fe6706..f8c98e1e2 100644 --- a/utilities/transactions/optimistic_transaction_impl.h +++ b/utilities/transactions/optimistic_transaction_impl.h @@ -48,7 +48,8 @@ class OptimisticTransactionImpl : public TransactionBaseImpl { protected: Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, - bool read_only, bool untracked = false) override; + bool read_only, bool exclusive, + bool untracked = false) override; private: OptimisticTransactionDB* const txn_db_; diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index a65e6331c..2312ced9e 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -96,7 +96,7 @@ void TransactionBaseImpl::SetSnapshotIfNeeded() { Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, bool read_only, - bool untracked) { + bool exclusive, bool untracked) { size_t key_size = 0; for (int i = 0; i < key.num_parts; ++i) { key_size += 
key.parts[i].size(); @@ -109,7 +109,7 @@ Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, str.append(key.parts[i].data(), key.parts[i].size()); } - return TryLock(column_family, str, read_only, untracked); + return TryLock(column_family, str, read_only, exclusive, untracked); } void TransactionBaseImpl::SetSavePoint() { @@ -187,8 +187,9 @@ Status TransactionBaseImpl::Get(const ReadOptions& read_options, Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) { - Status s = TryLock(column_family, key, true /* read_only */); + const Slice& key, std::string* value, + bool exclusive) { + Status s = TryLock(column_family, key, true /* read_only */, exclusive); if (s.ok() && value != nullptr) { s = Get(read_options, column_family, key, value); @@ -222,7 +223,8 @@ std::vector TransactionBaseImpl::MultiGetForUpdate( // Lock all keys for (size_t i = 0; i < num_keys; ++i) { - Status s = TryLock(column_family[i], keys[i], true /* read_only */); + Status s = TryLock(column_family[i], keys[i], true /* read_only */, + true /* exclusive */); if (!s.ok()) { // Fail entire multiget if we cannot lock all keys return std::vector(num_keys, s); @@ -256,7 +258,8 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options, Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - Status s = TryLock(column_family, key, false /* read_only */); + Status s = + TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->Put(column_family, key, value); @@ -269,7 +272,8 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, const SliceParts& key, const SliceParts& value) { - Status s = TryLock(column_family, key, false /* read_only */); + Status s = + TryLock(column_family, key, false 
/* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->Put(column_family, key, value); @@ -281,7 +285,8 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - Status s = TryLock(column_family, key, false /* read_only */); + Status s = + TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->Merge(column_family, key, value); @@ -293,7 +298,8 @@ Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, const Slice& key) { - Status s = TryLock(column_family, key, false /* read_only */); + Status s = + TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->Delete(column_family, key); @@ -305,7 +311,8 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, const SliceParts& key) { - Status s = TryLock(column_family, key, false /* read_only */); + Status s = + TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->Delete(column_family, key); @@ -317,7 +324,8 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, const Slice& key) { - Status s = TryLock(column_family, key, false /* read_only */); + Status s = + TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->SingleDelete(column_family, key); @@ -329,7 +337,8 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key) { - Status s = TryLock(column_family, key, false /* read_only */); + Status 
s = + TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { GetBatchForWrite()->SingleDelete(column_family, key); @@ -341,8 +350,8 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* untracked */); + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, true /* untracked */); if (s.ok()) { GetBatchForWrite()->Put(column_family, key, value); @@ -355,8 +364,8 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, const SliceParts& value) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* untracked */); + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, true /* untracked */); if (s.ok()) { GetBatchForWrite()->Put(column_family, key, value); @@ -369,8 +378,8 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* untracked */); + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, true /* untracked */); if (s.ok()) { GetBatchForWrite()->Merge(column_family, key, value); @@ -382,8 +391,8 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, const Slice& key) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* untracked */); + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, true /* untracked */); if 
(s.ok()) { GetBatchForWrite()->Delete(column_family, key); @@ -395,8 +404,8 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, const SliceParts& key) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* untracked */); + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, true /* untracked */); if (s.ok()) { GetBatchForWrite()->Delete(column_family, key); diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index b5f9a6222..29fd26af8 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -39,7 +39,8 @@ class TransactionBaseImpl : public Transaction { // untracked will be true if called from PutUntracked, DeleteUntracked, or // MergeUntracked. virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, - bool read_only, bool untracked = false) = 0; + bool read_only, bool exclusive, + bool untracked = false) = 0; void SetSavePoint() override; @@ -55,11 +56,12 @@ class TransactionBaseImpl : public Transaction { Status GetForUpdate(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; + std::string* value, bool exclusive) override; Status GetForUpdate(const ReadOptions& options, const Slice& key, - std::string* value) override { - return GetForUpdate(options, db_->DefaultColumnFamily(), key, value); + std::string* value, bool exclusive) override { + return GetForUpdate(options, db_->DefaultColumnFamily(), key, value, + exclusive); } std::vector MultiGet( @@ -315,7 +317,7 @@ class TransactionBaseImpl : public Transaction { std::shared_ptr snapshot_notifier_ = nullptr; Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, - bool read_only, bool untracked = false); + bool read_only, bool exclusive, bool untracked = 
false); WriteBatchBase* GetBatchForWrite(); diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index bd10e6fb7..9b530c170 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -266,8 +266,8 @@ Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { } Status TransactionDBImpl::TryLock(TransactionImpl* txn, uint32_t cfh_id, - const std::string& key) { - return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv()); + const std::string& key, bool exclusive) { + return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); } void TransactionDBImpl::UnLock(TransactionImpl* txn, diff --git a/utilities/transactions/transaction_db_impl.h b/utilities/transactions/transaction_db_impl.h index a4792ddb6..c377e2c0f 100644 --- a/utilities/transactions/transaction_db_impl.h +++ b/utilities/transactions/transaction_db_impl.h @@ -63,7 +63,8 @@ class TransactionDBImpl : public TransactionDB { using StackableDB::DropColumnFamily; virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; - Status TryLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key); + Status TryLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key, + bool exclusive); void UnLock(TransactionImpl* txn, const TransactionKeyMap* keys); void UnLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key); diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index d6d1c2ac9..2ee775306 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -40,7 +40,6 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db, : TransactionBaseImpl(txn_db->GetRootDB(), write_options), txn_db_impl_(nullptr), txn_id_(0), - waiting_txn_id_(0), waiting_cf_id_(0), waiting_key_(nullptr), expiration_time_(0), @@ -395,7 +394,7 @@ Status 
TransactionImpl::LockBatch(WriteBatch* batch, for (const auto& key_iter : cfh_keys) { const std::string& key = key_iter; - s = txn_db_impl_->TryLock(this, cfh_id, key); + s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */); if (!s.ok()) { break; } @@ -422,7 +421,7 @@ Status TransactionImpl::LockBatch(WriteBatch* batch, // the snapshot time. Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, - bool untracked) { + bool exclusive, bool untracked) { uint32_t cfh_id = GetColumnFamilyID(column_family); std::string key_str = key.ToString(); bool previously_locked; @@ -448,7 +447,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, // lock this key if this transactions hasn't already locked it if (!previously_locked) { - s = txn_db_impl_->TryLock(this, cfh_id, key_str); + s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive); } SetSnapshotIfNeeded(); diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index 4dfea9be5..0ffcbf9b5 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -7,6 +7,7 @@ #ifndef ROCKSDB_LITE +#include #include #include #include @@ -23,6 +24,7 @@ #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/write_batch_with_index.h" +#include "util/autovector.h" #include "utilities/transactions/transaction_base.h" #include "utilities/transactions/transaction_util.h" @@ -57,22 +59,31 @@ class TransactionImpl : public TransactionBaseImpl { TransactionID GetID() const override { return txn_id_; } - TransactionID GetWaitingTxn(uint32_t* column_family_id, - const std::string** key) const override { + std::vector GetWaitingTxns(uint32_t* column_family_id, + std::string* key) const override { std::lock_guard lock(wait_mutex_); - if (key) *key = waiting_key_; + std::vector ids(waiting_txn_ids_.size()); + if (key) *key = 
waiting_key_ ? *waiting_key_ : ""; if (column_family_id) *column_family_id = waiting_cf_id_; - return waiting_txn_id_; + std::copy(waiting_txn_ids_.begin(), waiting_txn_ids_.end(), ids.begin()); + return ids; } - void SetWaitingTxn(TransactionID id, uint32_t column_family_id, + void SetWaitingTxn(autovector ids, uint32_t column_family_id, const std::string* key) { std::lock_guard lock(wait_mutex_); - waiting_txn_id_ = id; + waiting_txn_ids_ = ids; waiting_cf_id_ = column_family_id; waiting_key_ = key; } + void ClearWaitingTxn() { + std::lock_guard lock(wait_mutex_); + waiting_txn_ids_.clear(); + waiting_cf_id_ = 0; + waiting_key_ = nullptr; + } + // Returns the time (in microseconds according to Env->GetMicros()) // that this transaction will be expired. Returns 0 if this transaction does // not expire. @@ -97,7 +108,8 @@ class TransactionImpl : public TransactionBaseImpl { protected: Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, - bool read_only, bool untracked = false) override; + bool read_only, bool exclusive, + bool untracked = false) override; private: TransactionDBImpl* txn_db_impl_; @@ -109,10 +121,10 @@ class TransactionImpl : public TransactionBaseImpl { // Unique ID for this transaction TransactionID txn_id_; - // ID for the transaction that is blocking the current transaction. + // IDs for the transactions that are blocking the current transaction. // - // 0 if current transaction is not waiting. - TransactionID waiting_txn_id_; + // empty if current transaction is not waiting. + autovector waiting_txn_ids_; // The following two represents the (cf, key) that a transaction is waiting // on. @@ -125,7 +137,7 @@ class TransactionImpl : public TransactionBaseImpl { uint32_t waiting_cf_id_; const std::string* waiting_key_; - // Mutex protecting waiting_txn_id_, waiting_cf_id_ and waiting_key_. + // Mutex protecting waiting_txn_ids_, waiting_cf_id_ and waiting_key_. 
mutable std::mutex wait_mutex_; // If non-zero, this transaction should not be committed after this time (in diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 8a49b221c..2f645e1bc 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -22,7 +22,6 @@ #include "rocksdb/slice.h" #include "rocksdb/utilities/transaction_db_mutex.h" -#include "util/autovector.h" #include "util/murmurhash.h" #include "util/sync_point.h" #include "util/thread_local.h" @@ -31,15 +30,20 @@ namespace rocksdb { struct LockInfo { - TransactionID txn_id; + bool exclusive; + autovector txn_ids; // Transaction locks are not valid after this time in us uint64_t expiration_time; - LockInfo(TransactionID id, uint64_t time) - : txn_id(id), expiration_time(time) {} + LockInfo(TransactionID id, uint64_t time, bool ex) + : exclusive(ex), expiration_time(time) { + txn_ids.push_back(id); + } LockInfo(const LockInfo& lock_info) - : txn_id(lock_info.txn_id), expiration_time(lock_info.expiration_time) {} + : exclusive(lock_info.exclusive), + txn_ids(lock_info.txn_ids), + expiration_time(lock_info.expiration_time) {} }; struct LockMapStripe { @@ -192,7 +196,8 @@ std::shared_ptr TransactionLockMgr::GetLockMap( // transaction. // If false, sets *expire_time to the expiration time of the lock according // to Env->GetMicros() or 0 if no expiration. 
-bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env, +bool TransactionLockMgr::IsLockExpired(TransactionID txn_id, + const LockInfo& lock_info, Env* env, uint64_t* expire_time) { auto now = env->NowMicros(); @@ -203,12 +208,18 @@ bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env, // return how many microseconds until lock will be expired *expire_time = lock_info.expiration_time; } else { - bool success = - txn_db_impl_->TryStealingExpiredTransactionLocks(lock_info.txn_id); - if (!success) { - expired = false; + for (auto id : lock_info.txn_ids) { + if (txn_id == id) { + continue; + } + + bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id); + if (!success) { + expired = false; + break; + } + *expire_time = 0; } - *expire_time = 0; } return expired; @@ -216,7 +227,8 @@ bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env, Status TransactionLockMgr::TryLock(TransactionImpl* txn, uint32_t column_family_id, - const std::string& key, Env* env) { + const std::string& key, Env* env, + bool exclusive) { // Lookup lock map for this column family id std::shared_ptr lock_map_ptr = GetLockMap(column_family_id); LockMap* lock_map = lock_map_ptr.get(); @@ -233,7 +245,7 @@ Status TransactionLockMgr::TryLock(TransactionImpl* txn, assert(lock_map->lock_map_stripes_.size() > stripe_num); LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); - LockInfo lock_info(txn->GetID(), txn->GetExpirationTime()); + LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive); int64_t timeout = txn->GetLockTimeout(); return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env, @@ -268,9 +280,9 @@ Status TransactionLockMgr::AcquireWithTimeout( // Acquire lock if we are able to uint64_t expire_time_hint = 0; - TransactionID wait_id = 0; + autovector wait_ids; result = AcquireLocked(lock_map, stripe, key, env, lock_info, - &expire_time_hint, &wait_id); + &expire_time_hint, 
&wait_ids); if (!result.ok() && timeout != 0) { // If we weren't able to acquire the lock, we will keep retrying as long @@ -289,19 +301,19 @@ Status TransactionLockMgr::AcquireWithTimeout( cv_end_time = end_time; } - assert(result.IsBusy() || wait_id != 0); + assert(result.IsBusy() || wait_ids.size() != 0); // We are dependent on a transaction to finish, so perform deadlock // detection. - if (wait_id != 0) { + if (wait_ids.size() != 0) { if (txn->IsDeadlockDetect()) { - if (IncrementWaiters(txn, wait_id)) { + if (IncrementWaiters(txn, wait_ids)) { result = Status::Busy(Status::SubCode::kDeadlock); stripe->stripe_mutex->UnLock(); return result; } } - txn->SetWaitingTxn(wait_id, column_family_id, &key); + txn->SetWaitingTxn(wait_ids, column_family_id, &key); } TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn"); @@ -316,10 +328,10 @@ Status TransactionLockMgr::AcquireWithTimeout( } } - if (wait_id != 0) { - txn->SetWaitingTxn(0, 0, nullptr); + if (wait_ids.size() != 0) { + txn->ClearWaitingTxn(); if (txn->IsDeadlockDetect()) { - DecrementWaiters(txn, wait_id); + DecrementWaiters(txn, wait_ids); } } @@ -332,7 +344,7 @@ Status TransactionLockMgr::AcquireWithTimeout( if (result.ok() || result.IsTimedOut()) { result = AcquireLocked(lock_map, stripe, key, env, lock_info, - &expire_time_hint, &wait_id); + &expire_time_hint, &wait_ids); } } while (!result.ok() && !timed_out); } @@ -342,35 +354,40 @@ Status TransactionLockMgr::AcquireWithTimeout( return result; } -void TransactionLockMgr::DecrementWaiters(const TransactionImpl* txn, - TransactionID wait_id) { +void TransactionLockMgr::DecrementWaiters( + const TransactionImpl* txn, const autovector& wait_ids) { std::lock_guard lock(wait_txn_map_mutex_); - DecrementWaitersImpl(txn, wait_id); + DecrementWaitersImpl(txn, wait_ids); } -void TransactionLockMgr::DecrementWaitersImpl(const TransactionImpl* txn, - TransactionID wait_id) { +void TransactionLockMgr::DecrementWaitersImpl( + const TransactionImpl* 
txn, const autovector& wait_ids) { auto id = txn->GetID(); assert(wait_txn_map_.Contains(id)); wait_txn_map_.Delete(id); - rev_wait_txn_map_.Get(wait_id)--; - if (rev_wait_txn_map_.Get(wait_id) == 0) { - rev_wait_txn_map_.Delete(wait_id); + for (auto wait_id : wait_ids) { + rev_wait_txn_map_.Get(wait_id)--; + if (rev_wait_txn_map_.Get(wait_id) == 0) { + rev_wait_txn_map_.Delete(wait_id); + } } } -bool TransactionLockMgr::IncrementWaiters(const TransactionImpl* txn, - TransactionID wait_id) { +bool TransactionLockMgr::IncrementWaiters( + const TransactionImpl* txn, const autovector& wait_ids) { auto id = txn->GetID(); + std::vector queue(txn->GetDeadlockDetectDepth()); std::lock_guard lock(wait_txn_map_mutex_); assert(!wait_txn_map_.Contains(id)); - wait_txn_map_.Insert(id, wait_id); + wait_txn_map_.Insert(id, wait_ids); - if (rev_wait_txn_map_.Contains(wait_id)) { - rev_wait_txn_map_.Get(wait_id)++; - } else { - rev_wait_txn_map_.Insert(wait_id, 1); + for (auto wait_id : wait_ids) { + if (rev_wait_txn_map_.Contains(wait_id)) { + rev_wait_txn_map_.Get(wait_id)++; + } else { + rev_wait_txn_map_.Insert(wait_id, 1); + } } // No deadlock if nobody is waiting on self. @@ -378,20 +395,36 @@ bool TransactionLockMgr::IncrementWaiters(const TransactionImpl* txn, return false; } - TransactionID next = wait_id; - for (int i = 0; i < txn->GetDeadlockDetectDepth(); i++) { + const auto* next_ids = &wait_ids; + for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) { + uint i = 0; + if (next_ids) { + for (; i < next_ids->size() && tail + i < txn->GetDeadlockDetectDepth(); + i++) { + queue[tail + i] = (*next_ids)[i]; + } + tail += i; + } + + // No more items in the list, meaning no deadlock. 
+ if (tail == head) { + return false; + } + + auto next = queue[head]; if (next == id) { - DecrementWaitersImpl(txn, wait_id); + DecrementWaitersImpl(txn, wait_ids); return true; } else if (!wait_txn_map_.Contains(next)) { - return false; + next_ids = nullptr; + continue; } else { - next = wait_txn_map_.Get(next); + next_ids = &wait_txn_map_.Get(next); } } // Wait cycle too big, just assume deadlock. - DecrementWaitersImpl(txn, wait_id); + DecrementWaitersImpl(txn, wait_ids); return true; } @@ -404,24 +437,47 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, const std::string& key, Env* env, const LockInfo& txn_lock_info, uint64_t* expire_time, - TransactionID* txn_id) { + autovector* txn_ids) { + assert(txn_lock_info.txn_ids.size() == 1); + Status result; // Check if this key is already locked if (stripe->keys.find(key) != stripe->keys.end()) { // Lock already held - LockInfo& lock_info = stripe->keys.at(key); - if (lock_info.txn_id != txn_lock_info.txn_id) { - // locked by another txn. Check if it's expired - if (IsLockExpired(lock_info, env, expire_time)) { - // lock is expired, can steal it - lock_info.txn_id = txn_lock_info.txn_id; + assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive); + + if (lock_info.exclusive || txn_lock_info.exclusive) { + if (lock_info.txn_ids.size() == 1 && + lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) { + // The list contains one txn and we're it, so just take it. + lock_info.exclusive = txn_lock_info.exclusive; lock_info.expiration_time = txn_lock_info.expiration_time; - // lock_cnt does not change } else { - result = Status::TimedOut(Status::SubCode::kLockTimeout); - *txn_id = lock_info.txn_id; + // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case + // it's there for a shared lock with multiple holders which was not + // caught in the first case. 
+ if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env, + expire_time)) { + // lock is expired, can steal it + lock_info.txn_ids = txn_lock_info.txn_ids; + lock_info.exclusive = txn_lock_info.exclusive; + lock_info.expiration_time = txn_lock_info.expiration_time; + // lock_cnt does not change + } else { + result = Status::TimedOut(Status::SubCode::kLockTimeout); + *txn_ids = lock_info.txn_ids; + } } + } else { + // We are requesting shared access to a shared lock, so just grant it. + lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]); + // Using std::max means that expiration time never goes down even when + // a transaction is removed from the list. The correct solution would be + // to track expiry for every transaction, but this would also work for + // now. + lock_info.expiration_time = + std::max(lock_info.expiration_time, txn_lock_info.expiration_time); } } else { // Lock not held. // Check lock limit @@ -442,6 +498,42 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, return result; } +void TransactionLockMgr::UnLockKey(const TransactionImpl* txn, + const std::string& key, + LockMapStripe* stripe, LockMap* lock_map, + Env* env) { + TransactionID txn_id = txn->GetID(); + + auto stripe_iter = stripe->keys.find(key); + if (stripe_iter != stripe->keys.end()) { + auto& txns = stripe_iter->second.txn_ids; + auto txn_it = std::find(txns.begin(), txns.end(), txn_id); + // Found the key we locked. unlock it. + if (txn_it != txns.end()) { + if (txns.size() == 1) { + stripe->keys.erase(stripe_iter); + } else { + auto last_it = txns.end() - 1; + if (txn_it != last_it) { + *txn_it = *last_it; + } + txns.pop_back(); + } + + if (max_num_locks_ > 0) { + // Maintain lock count if there is a limit on the number of locks. + assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); + lock_map->lock_cnt--; + } + } + } else { + // This key is either not locked or locked by someone else. 
This should + // only happen if the unlocking transaction has expired. + assert(txn->GetExpirationTime() > 0 && + txn->GetExpirationTime() < env->NowMicros()); + } +} + void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, const std::string& key, Env* env) { std::shared_ptr lock_map_ptr = GetLockMap(column_family_id); @@ -456,26 +548,8 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, assert(lock_map->lock_map_stripes_.size() > stripe_num); LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); - TransactionID txn_id = txn->GetID(); - stripe->stripe_mutex->Lock(); - - const auto& iter = stripe->keys.find(key); - if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) { - // Found the key we locked. unlock it. - stripe->keys.erase(iter); - if (max_num_locks_ > 0) { - // Maintain lock count if there is a limit on the number of locks. - assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); - lock_map->lock_cnt--; - } - } else { - // This key is either not locked or locked by someone else. This should - // only happen if the unlocking transaction has expired. 
- assert(txn->GetExpirationTime() > 0 && - txn->GetExpirationTime() < env->NowMicros()); - } - + UnLockKey(txn, key, stripe, lock_map, env); stripe->stripe_mutex->UnLock(); // Signal waiting threads to retry locking @@ -484,8 +558,6 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, void TransactionLockMgr::UnLock(const TransactionImpl* txn, const TransactionKeyMap* key_map, Env* env) { - TransactionID txn_id = txn->GetID(); - for (auto& key_map_iter : *key_map) { uint32_t column_family_id = key_map_iter.first; auto& keys = key_map_iter.second; @@ -520,22 +592,7 @@ void TransactionLockMgr::UnLock(const TransactionImpl* txn, stripe->stripe_mutex->Lock(); for (const std::string* key : stripe_keys) { - const auto& iter = stripe->keys.find(*key); - if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) { - // Found the key we locked. unlock it. - stripe->keys.erase(iter); - if (max_num_locks_ > 0) { - // Maintain lock count if there is a limit on the number of locks. - assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); - lock_map->lock_cnt--; - } - } else { - // This key is either not locked or locked by someone else. This - // should only - // happen if the unlocking transaction has expired. 
- assert(txn->GetExpirationTime() > 0 && - txn->GetExpirationTime() < env->NowMicros()); - } + UnLockKey(txn, *key, stripe, lock_map, env); } stripe->stripe_mutex->UnLock(); @@ -565,7 +622,13 @@ TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() { for (const auto& j : stripes) { j->stripe_mutex->Lock(); for (const auto& it : j->keys) { - data.insert({i, {it.first, it.second.txn_id}}); + struct KeyLockInfo info; + info.exclusive = it.second.exclusive; + info.key = it.first; + for (const auto& id : it.second.txn_ids) { + info.ids.push_back(id); + } + data.insert({i, info}); } } } diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index ea0352d53..1bfee89a9 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -13,6 +13,7 @@ #include #include "rocksdb/utilities/transaction.h" +#include "util/autovector.h" #include "util/hash_map.h" #include "util/instrumented_mutex.h" #include "util/thread_local.h" @@ -47,7 +48,7 @@ class TransactionLockMgr { // Attempt to lock key. If OK status is returned, the caller is responsible // for calling UnLock() on this key. Status TryLock(TransactionImpl* txn, uint32_t column_family_id, - const std::string& key, Env* env); + const std::string& key, Env* env, bool exclusive); // Unlock a key locked by TryLock(). txn must be the same Transaction that // locked this key. @@ -91,12 +92,13 @@ class TransactionLockMgr { // Maps from waitee -> number of waiters. HashMap rev_wait_txn_map_; // Maps from waiter -> waitee. 
- HashMap wait_txn_map_; + HashMap> wait_txn_map_; // Used to allocate mutexes/condvars to use when locking keys std::shared_ptr mutex_factory_; - bool IsLockExpired(const LockInfo& lock_info, Env* env, uint64_t* wait_time); + bool IsLockExpired(TransactionID txn_id, const LockInfo& lock_info, Env* env, + uint64_t* wait_time); std::shared_ptr GetLockMap(uint32_t column_family_id); @@ -108,11 +110,17 @@ class TransactionLockMgr { Status AcquireLocked(LockMap* lock_map, LockMapStripe* stripe, const std::string& key, Env* env, const LockInfo& lock_info, uint64_t* wait_time, - TransactionID* txn_id); + autovector* txn_ids); - bool IncrementWaiters(const TransactionImpl* txn, TransactionID wait_id); - void DecrementWaiters(const TransactionImpl* txn, TransactionID wait_id); - void DecrementWaitersImpl(const TransactionImpl* txn, TransactionID wait_id); + void UnLockKey(const TransactionImpl* txn, const std::string& key, + LockMapStripe* stripe, LockMap* lock_map, Env* env); + + bool IncrementWaiters(const TransactionImpl* txn, + const autovector& wait_ids); + void DecrementWaiters(const TransactionImpl* txn, + const autovector& wait_ids); + void DecrementWaitersImpl(const TransactionImpl* txn, + const autovector& wait_ids); // No copying allowed TransactionLockMgr(const TransactionLockMgr&); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 917dbb7d2..349e5a5f0 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -197,11 +197,12 @@ TEST_P(TransactionTest, WaitingTxn) { rocksdb::SyncPoint::GetInstance()->SetCallBack( "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", [&](void* arg) { - const std::string* key; + std::string key; uint32_t cf_id; - TransactionID wait = txn2->GetWaitingTxn(&cf_id, &key); - ASSERT_EQ(*key, "foo"); - ASSERT_EQ(wait, id1); + std::vector wait = txn2->GetWaitingTxns(&cf_id, &key); + ASSERT_EQ(key, "foo"); + ASSERT_EQ(wait.size(), 1); 
+ ASSERT_EQ(wait[0], id1); ASSERT_EQ(cf_id, 0); }); @@ -225,7 +226,8 @@ TEST_P(TransactionTest, WaitingTxn) { ASSERT_EQ(cf_iterator->first, 1); // The locked key is "foo" and is locked by txn1 ASSERT_EQ(cf_iterator->second.key, "foo"); - ASSERT_EQ(cf_iterator->second.id, txn1->GetID()); + ASSERT_EQ(cf_iterator->second.ids.size(), 1); + ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID()); cf_iterator++; @@ -233,7 +235,8 @@ TEST_P(TransactionTest, WaitingTxn) { ASSERT_EQ(cf_iterator->first, 0); // The locked key is "foo" and is locked by txn1 ASSERT_EQ(cf_iterator->second.key, "foo"); - ASSERT_EQ(cf_iterator->second.id, txn1->GetID()); + ASSERT_EQ(cf_iterator->second.ids.size(), 1); + ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID()); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); @@ -249,6 +252,170 @@ TEST_P(TransactionTest, WaitingTxn) { delete txn2; } +TEST_P(TransactionTest, SharedLocks) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + Status s; + + txn_options.lock_timeout = 1; + s = db->Put(write_options, Slice("foo"), Slice("bar")); + ASSERT_OK(s); + + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + Transaction* txn3 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + ASSERT_TRUE(txn2); + ASSERT_TRUE(txn3); + + // Test shared access between txns + s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn3->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + auto lock_data = db->GetLockStatusData(); + ASSERT_EQ(lock_data.size(), 1); + + auto cf_iterator = lock_data.begin(); + ASSERT_EQ(cf_iterator->second.key, "foo"); + + // We compare whether the set of txns locking this key is the same. 
To do + // so, the vectors are compared element-wise WITHOUT sorting; this relies + // on the ids being reported in the order the txns took the lock. + std::vector expected_txns = {txn1->GetID(), txn2->GetID(), + txn3->GetID()}; + std::vector lock_txns = cf_iterator->second.ids; + ASSERT_EQ(expected_txns, lock_txns); + ASSERT_FALSE(cf_iterator->second.exclusive); + + txn1->Rollback(); + txn2->Rollback(); + txn3->Rollback(); + + // Test txn1 and txn2 sharing a lock and txn3 trying to obtain it. + s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + s = txn3->GetForUpdate(read_options, "foo", nullptr); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn1->UndoGetForUpdate("foo"); + s = txn3->GetForUpdate(read_options, "foo", nullptr); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn2->UndoGetForUpdate("foo"); + s = txn3->GetForUpdate(read_options, "foo", nullptr); + ASSERT_OK(s); + + txn1->Rollback(); + txn2->Rollback(); + txn3->Rollback(); + + // Test txn1 holding an exclusive lock and txn2 trying to obtain shared + // access. 
+ s = txn1->GetForUpdate(read_options, "foo", nullptr); + ASSERT_OK(s); + + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + txn1->UndoGetForUpdate("foo"); + s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); + ASSERT_OK(s); + + delete txn1; + delete txn2; + delete txn3; +} + +TEST_P(TransactionTest, DeadlockCycleShared) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + + txn_options.lock_timeout = 1000000; + txn_options.deadlock_detect = true; + + // Set up a wait for chain like this: + // + // Tn -> T(n*2) + // Tn -> T(n*2 + 1) + // + // So we have: + // T1 -> T2 -> T4 ... + // | |> T5 ... + // |> T3 -> T6 ... + // |> T7 ... + // up to T31, then T[16 - 31] -> T1. + // Note that Tn holds lock on floor(n / 2). + + std::vector txns(31); + + for (uint32_t i = 0; i < 31; i++) { + txns[i] = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txns[i]); + auto s = txns[i]->GetForUpdate(read_options, ToString((i + 1) / 2), nullptr, + false /* exclusive */); + ASSERT_OK(s); + } + + std::atomic checkpoints(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", + [&](void* arg) { checkpoints.fetch_add(1); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // We want the leaf transactions to block and hold everyone back. + std::vector threads; + for (uint32_t i = 0; i < 15; i++) { + std::function blocking_thread = [&, i] { + auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr, + true /* exclusive */); + ASSERT_OK(s); + txns[i]->Rollback(); + delete txns[i]; + }; + threads.emplace_back(blocking_thread); + } + + // Wait until all threads are waiting on each other. 
+ while (checkpoints.load() != 15) { + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Complete the cycle T[16 - 31] -> T1 + for (uint32_t i = 15; i < 31; i++) { + auto s = + txns[i]->GetForUpdate(read_options, "0", nullptr, true /* exclusive */); + ASSERT_TRUE(s.IsDeadlock()); + } + + // Roll back the leaf transactions (T16..T31). + for (uint32_t i = 15; i < 31; i++) { + txns[i]->Rollback(); + delete txns[i]; + } + + for (auto& t : threads) { + t.join(); + } +} + TEST_P(TransactionTest, DeadlockCycle) { WriteOptions write_options; ReadOptions read_options; @@ -345,7 +512,9 @@ TEST_P(TransactionTest, DeadlockStress) { // Lock keys in random order. for (const auto& k : random_keys) { - auto s = txn->GetForUpdate(read_options, k, nullptr); + // Lock mostly for shared access, but exclusive 1/4 of the time. + auto s = + txn->GetForUpdate(read_options, k, nullptr, txn->GetID() % 4 == 0); if (!s.ok()) { ASSERT_TRUE(s.IsDeadlock()); txn->Rollback();