diff --git a/CMakeLists.txt b/CMakeLists.txt index 1f5207366..4d130260f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -225,6 +225,7 @@ set(SOURCES utilities/transactions/transaction_base.cc utilities/transactions/transaction_impl.cc utilities/transactions/transaction_db_impl.cc + utilities/transactions/transaction_db_mutex_impl.cc utilities/transactions/transaction_lock_mgr.cc utilities/transactions/transaction_util.cc utilities/ttl/db_ttl_impl.cc diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 0f9a1773e..f9023fc21 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -20,6 +20,8 @@ namespace rocksdb { +class TransactionDBMutexFactory; + struct TransactionDBOptions { // Specifies the maximum number of keys that can be locked at the same time // per column family. @@ -58,6 +60,11 @@ struct TransactionDBOptions { // A negative timeout should only be used if all transactions have an small // expiration set. int64_t default_lock_timeout = 1000; // 1 second + + // If set, the TransactionDB will use this implemenation of a mutex and + // condition variable for all transaction locking instead of the default + // mutex/condvar implementation. + std::shared_ptr custom_mutex_factory; }; struct TransactionOptions { diff --git a/include/rocksdb/utilities/transaction_db_mutex.h b/include/rocksdb/utilities/transaction_db_mutex.h new file mode 100644 index 000000000..773ebc106 --- /dev/null +++ b/include/rocksdb/utilities/transaction_db_mutex.h @@ -0,0 +1,92 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#ifndef ROCKSDB_LITE + +#include + +#include "rocksdb/status.h" + +namespace rocksdb { + +// TransactionDBMutex and TransactionDBCondVar APIs allows applications to +// implement custom mutexes and condition variables to be used by a +// TransactionDB when locking keys. +// +// To open a TransactionDB with a custom TransactionDBMutexFactory, set +// TransactionDBOptions.custom_mutex_factory. + +class TransactionDBMutex { + public: + virtual ~TransactionDBMutex() {} + + // Attempt to acquire lock. Return OK on success, or other Status on failure. + // If returned status is OK, TransactionDB will eventually call UnLock(). + virtual Status Lock() = 0; + + // Attempt to acquire lock. If timeout is non-negative, operation should be + // failed after this many microseconds. + // Returns OK on success, + // TimedOut if timed out, + // or other Status on failure. + // If returned status is OK, TransactionDB will eventually call UnLock(). + virtual Status TryLockFor(int64_t timeout_time) = 0; + + // Unlock Mutex that was successfully locked by Lock() or TryLockUntil() + virtual void UnLock() = 0; +}; + +class TransactionDBCondVar { + public: + virtual ~TransactionDBCondVar() {} + + // Block current thread until condition variable is notified by a call to + // Notify() or NotifyAll(). Wait() will be called with mutex locked. + // Returns OK if notified. + // Returns non-OK if TransactionDB should stop waiting and fail the operation. + // May return OK spuriously even if not notified. + virtual Status Wait(std::shared_ptr mutex) = 0; + + // Block current thread until condition variable is notified by a call to + // Notify() or NotifyAll(), or if the timeout is reached. + // Wait() will be called with mutex locked. + // + // If timeout is non-negative, operation should be failed after this many + // microseconds. + // If implementing a custom version of this class, the implementation may + // choose to ignore the timeout. + // + // Returns OK if notified. + // Returns TimedOut if timeout is reached. + // Returns other status if TransactionDB should otherwis stop waiting and + // fail the operation. + // May return OK spuriously even if not notified. + virtual Status WaitFor(std::shared_ptr mutex, + int64_t timeout_time) = 0; + + // If any threads are waiting on *this, unblock at least one of the + // waiting threads. + virtual void Notify() = 0; + + // Unblocks all threads waiting on *this. + virtual void NotifyAll() = 0; +}; + +// Factory class that can allocate mutexes and condition variables. +class TransactionDBMutexFactory { + public: + // Create a TransactionDBMutex object. + virtual std::shared_ptr AllocateMutex() = 0; + + // Create a TransactionDBCondVar object. + virtual std::shared_ptr AllocateCondVar() = 0; + + virtual ~TransactionDBMutexFactory() {} +}; + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/src.mk b/src.mk index 96b3cadb1..cd0849d57 100644 --- a/src.mk +++ b/src.mk @@ -120,6 +120,7 @@ LIB_SOURCES = \ utilities/transactions/optimistic_transaction_db_impl.cc \ utilities/transactions/transaction_base.cc \ utilities/transactions/transaction_db_impl.cc \ + utilities/transactions/transaction_db_mutex_impl.cc \ utilities/transactions/transaction_lock_mgr.cc \ utilities/transactions/transaction_impl.cc \ utilities/transactions/transaction_util.cc \ diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index 84baf4b40..98ca6f5b7 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -5,15 +5,16 @@ #ifndef ROCKSDB_LITE +#include "utilities/transactions/transaction_db_impl.h" + #include #include -#include "utilities/transactions/transaction_db_impl.h" - #include "db/db_impl.h" #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" +#include "utilities/transactions/transaction_db_mutex_impl.h" #include "utilities/transactions/transaction_impl.h" namespace rocksdb { @@ -22,7 +23,11 @@ TransactionDBImpl::TransactionDBImpl(DB* db, const TransactionDBOptions& txn_db_options) : TransactionDB(db), txn_db_options_(txn_db_options), - lock_mgr_(txn_db_options_.num_stripes, txn_db_options.max_num_locks) {} + lock_mgr_(txn_db_options_.num_stripes, txn_db_options.max_num_locks, + txn_db_options_.custom_mutex_factory + ? txn_db_options_.custom_mutex_factory + : std::shared_ptr( + new TransactionDBMutexFactoryImpl())) {} Transaction* TransactionDBImpl::BeginTransaction( const WriteOptions& write_options, const TransactionOptions& txn_options) { diff --git a/utilities/transactions/transaction_db_mutex_impl.cc b/utilities/transactions/transaction_db_mutex_impl.cc new file mode 100644 index 000000000..185f8c725 --- /dev/null +++ b/utilities/transactions/transaction_db_mutex_impl.cc @@ -0,0 +1,121 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE + +#include "utilities/transactions/transaction_db_mutex_impl.h" + +#include +#include +#include +#include + +#include "rocksdb/utilities/transaction_db_mutex.h" + +namespace rocksdb { + +class TransactionDBMutexImpl : public TransactionDBMutex { + public: + TransactionDBMutexImpl() {} + ~TransactionDBMutexImpl() {} + + Status Lock() override; + + Status TryLockFor(int64_t timeout_time) override; + + void UnLock() override { mutex_.unlock(); } + + friend class TransactionDBCondVarImpl; + + private: + std::timed_mutex mutex_; +}; + +class TransactionDBCondVarImpl : public TransactionDBCondVar { + public: + TransactionDBCondVarImpl() {} + ~TransactionDBCondVarImpl() {} + + Status Wait(std::shared_ptr mutex) override; + + Status WaitFor(std::shared_ptr mutex, + int64_t timeout_time) override; + + void Notify() override { cv_.notify_one(); } + + void NotifyAll() override { cv_.notify_all(); } + + private: + std::condition_variable_any cv_; +}; + +std::shared_ptr +TransactionDBMutexFactoryImpl::AllocateMutex() { + return std::shared_ptr(new TransactionDBMutexImpl()); +} + +std::shared_ptr +TransactionDBMutexFactoryImpl::AllocateCondVar() { + return std::shared_ptr(new TransactionDBCondVarImpl()); +} + +Status TransactionDBMutexImpl::Lock() { + mutex_.lock(); + return Status::OK(); +} + +Status TransactionDBMutexImpl::TryLockFor(int64_t timeout_time) { + bool locked = true; + + if (timeout_time < 0) { + // If timeout is negative, we wait indefinitely to acquire the lock + mutex_.lock(); + } else if (timeout_time == 0) { + locked = mutex_.try_lock(); + } else { + // Attempt to acquire the lock unless we timeout + auto duration = std::chrono::microseconds(timeout_time); + locked = mutex_.try_lock_for(duration); + } + + if (!locked) { + // timeout acquiring mutex + return Status::TimedOut(Status::SubCode::kMutexTimeout); + } + + return Status::OK(); +} + +Status TransactionDBCondVarImpl::Wait( + std::shared_ptr mutex) { + auto mutex_impl = reinterpret_cast(mutex.get()); + cv_.wait(mutex_impl->mutex_); + return Status::OK(); +} + +Status TransactionDBCondVarImpl::WaitFor( + std::shared_ptr mutex, int64_t timeout_time) { + auto mutex_impl = reinterpret_cast(mutex.get()); + + if (timeout_time < 0) { + // If timeout is negative, do not use a timeout + cv_.wait(mutex_impl->mutex_); + } else { + auto duration = std::chrono::microseconds(timeout_time); + auto cv_status = cv_.wait_for(mutex_impl->mutex_, duration); + + // Check if the wait stopped due to timing out. + if (cv_status == std::cv_status::timeout) { + return Status::TimedOut(Status::SubCode::kMutexTimeout); + } + } + + // CV was signaled, or we spuriously woke up (but didn't time out) + return Status::OK(); +} + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_db_mutex_impl.h b/utilities/transactions/transaction_db_mutex_impl.h new file mode 100644 index 000000000..7c915ca56 --- /dev/null +++ b/utilities/transactions/transaction_db_mutex_impl.h @@ -0,0 +1,26 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#ifndef ROCKSDB_LITE + +#include "rocksdb/utilities/transaction_db_mutex.h" + +namespace rocksdb { + +class TransactionDBMutex; +class TransactionDBCondVar; + +// Default implementation of TransactionDBMutexFactory. May be overridden +// by TransactionDBOptions.custom_mutex_factory. +class TransactionDBMutexFactoryImpl : public TransactionDBMutexFactory { + public: + std::shared_ptr AllocateMutex() override; + std::shared_ptr AllocateCondVar() override; +}; + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index cfaf0b2ac..7012caff9 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -40,15 +40,16 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db, txn_db_impl_(nullptr), txn_id_(GenTxnID()), expiration_time_(txn_options.expiration >= 0 - ? start_time_ / 1000 + txn_options.expiration + ? start_time_ + txn_options.expiration * 1000 : 0), - lock_timeout_(txn_options.lock_timeout) { + lock_timeout_(txn_options.lock_timeout * 1000) { txn_db_impl_ = dynamic_cast(txn_db); assert(txn_db_impl_); if (lock_timeout_ < 0) { // Lock timeout not set, use default - lock_timeout_ = txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout; + lock_timeout_ = + txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000; } if (txn_options.set_snapshot) { @@ -69,7 +70,7 @@ void TransactionImpl::Cleanup() { bool TransactionImpl::IsExpired() const { if (expiration_time_ > 0) { - if (db_->GetEnv()->NowMicros() >= expiration_time_ * 1000) { + if (db_->GetEnv()->NowMicros() >= expiration_time_) { // Transaction is expired. return true; } diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index 6b0901dcd..a02c2091a 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -49,7 +49,7 @@ class TransactionImpl : public TransactionBaseImpl { TransactionID GetTxnID() const { return txn_id_; } - // Returns the time (in milliseconds according to Env->GetMicros()*1000) + // Returns the time (in microseconds according to Env->GetMicros()) // that this transaction will be expired. Returns 0 if this transaction does // not expire. uint64_t GetExpirationTime() const { return expiration_time_; } @@ -57,10 +57,12 @@ class TransactionImpl : public TransactionBaseImpl { // returns true if this transaction has an expiration_time and has expired. bool IsExpired() const; - // Returns the number of milliseconds a transaction can wait on acquiring a + // Returns the number of microseconds a transaction can wait on acquiring a // lock or -1 if there is no timeout. int64_t GetLockTimeout() const { return lock_timeout_; } - void SetLockTimeout(int64_t timeout) override { lock_timeout_ = timeout; } + void SetLockTimeout(int64_t timeout) override { + lock_timeout_ = timeout * 1000; + } protected: Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, @@ -76,7 +78,7 @@ class TransactionImpl : public TransactionBaseImpl { const TransactionID txn_id_; // If non-zero, this transaction should not be committed after this time (in - // milliseconds) + // microseconds according to Env->NowMicros()) const uint64_t expiration_time_; // Timeout in microseconds when locking a key or -1 if there is no timeout. diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 882c441d9..80e4fb8d9 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -21,6 +21,7 @@ #include #include "rocksdb/slice.h" +#include "rocksdb/utilities/transaction_db_mutex.h" #include "util/autovector.h" #include "util/murmurhash.h" #include "util/thread_local.h" @@ -29,8 +30,10 @@ namespace rocksdb { struct LockInfo { TransactionID txn_id; - uint64_t - expiration_time; // Transaction locks are not valid after this time in ms + + // Transaction locks are not valid after this time in us + uint64_t expiration_time; + LockInfo(TransactionID id, uint64_t time) : txn_id(id), expiration_time(time) {} LockInfo(const LockInfo& lock_info) @@ -38,11 +41,18 @@ struct LockInfo { }; struct LockMapStripe { + explicit LockMapStripe(std::shared_ptr factory) { + stripe_mutex = factory->AllocateMutex(); + stripe_cv = factory->AllocateCondVar(); + assert(stripe_mutex); + assert(stripe_cv); + } + // Mutex must be held before modifying keys map - std::timed_mutex stripe_mutex; + std::shared_ptr stripe_mutex; // Condition Variable per stripe for waiting on a lock - std::condition_variable_any stripe_cv; + std::shared_ptr stripe_cv; // Locked keys mapped to the info about the transactions that locked them. // TODO(agiardullo): Explore performance of other data structures. @@ -51,11 +61,21 @@ struct LockMapStripe { // Map of #num_stripes LockMapStripes struct LockMap { - explicit LockMap(size_t num_stripes) - : num_stripes_(num_stripes), lock_map_stripes_(num_stripes) {} + explicit LockMap(size_t num_stripes, + std::shared_ptr factory) + : num_stripes_(num_stripes) { + lock_map_stripes_.reserve(num_stripes); + for (size_t i = 0; i < num_stripes; i++) { + LockMapStripe* stripe = new LockMapStripe(factory); + lock_map_stripes_.push_back(stripe); + } + } - LockMap(const LockMap& lock_map) - : num_stripes_(lock_map.num_stripes_), lock_map_stripes_(num_stripes_) {} + ~LockMap() { + for (auto stripe : lock_map_stripes_) { + delete stripe; + } + } // Number of sepearate LockMapStripes to create, each with their own Mutex const size_t num_stripes_; @@ -64,7 +84,7 @@ struct LockMap { // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.) std::atomic lock_cnt{0}; - std::vector lock_map_stripes_; + std::vector lock_map_stripes_; size_t GetStripe(const std::string& key) const; }; @@ -78,10 +98,12 @@ void UnrefLockMapsCache(void* ptr) { } } // anonymous namespace -TransactionLockMgr::TransactionLockMgr(size_t default_num_stripes, - int64_t max_num_locks) +TransactionLockMgr::TransactionLockMgr( + size_t default_num_stripes, int64_t max_num_locks, + std::shared_ptr mutex_factory) : default_num_stripes_(default_num_stripes), max_num_locks_(max_num_locks), + mutex_factory_(mutex_factory), lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)) {} TransactionLockMgr::~TransactionLockMgr() {} @@ -97,9 +119,9 @@ void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) { InstrumentedMutexLock l(&lock_map_mutex_); if (lock_maps_.find(column_family_id) == lock_maps_.end()) { - lock_maps_.emplace( - column_family_id, - std::shared_ptr(new LockMap(default_num_stripes_))); + lock_maps_.emplace(column_family_id, + std::shared_ptr( + new LockMap(default_num_stripes_, mutex_factory_))); } else { // column_family already exists in lock map assert(false); @@ -162,18 +184,20 @@ std::shared_ptr TransactionLockMgr::GetLockMap( // Returns true if this lock has expired and can be acquired by another // transaction. -// If false, returns the number of microseconds until expiration in -// *wait_time_us, or 0 if no expiration. +// If false, sets *expire_time to the expiration time of the lock according +// to Env->GetMicros() or 0 if no expiration. bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env, - uint64_t* wait_time_us) { + uint64_t* expire_time) { auto now = env->NowMicros(); - bool expired = (lock_info.expiration_time > 0 && - lock_info.expiration_time * 1000 <= now); + bool expired = + (lock_info.expiration_time > 0 && lock_info.expiration_time <= now); - if (!expired && lock_info.expiration_time > 0 && wait_time_us != nullptr) { + if (!expired && lock_info.expiration_time > 0) { // return how many microseconds until lock will be expired - *wait_time_us = (lock_info.expiration_time * 1000 - now); + *expire_time = lock_info.expiration_time; + } else { + *expire_time = 0; } return expired; @@ -196,7 +220,7 @@ Status TransactionLockMgr::TryLock(const TransactionImpl* txn, // Need to lock the mutex for the stripe that this key hashes to size_t stripe_num = lock_map->GetStripe(key); assert(lock_map->lock_map_stripes_.size() > stripe_num); - LockMapStripe* stripe = &lock_map->lock_map_stripes_.at(stripe_num); + LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); LockInfo lock_info(txn->GetTxnID(), txn->GetExpirationTime()); int64_t timeout = txn->GetLockTimeout(); @@ -210,95 +234,88 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map, const std::string& key, Env* env, int64_t timeout, const LockInfo& lock_info) { - std::chrono::system_clock::time_point end_time; + Status result; + uint64_t start_time = 0; + uint64_t end_time = 0; if (timeout > 0) { - end_time = - std::chrono::system_clock::now() + std::chrono::milliseconds(timeout); + start_time = env->NowMicros(); + end_time = start_time + timeout; } - bool locked = true; - if (timeout == 0) { - // If timeout is 0, we do not wait to acquire the lock if it is not - // available - locked = stripe->stripe_mutex.try_lock(); - } else if (timeout < 0) { + if (timeout < 0) { // If timeout is negative, we wait indefinitely to acquire the lock - stripe->stripe_mutex.lock(); + result = stripe->stripe_mutex->Lock(); } else { - // If timeout is positive, we attempt to acquire the lock unless we timeout - locked = stripe->stripe_mutex.try_lock_until(end_time); + result = stripe->stripe_mutex->TryLockFor(timeout); } - if (!locked) { - // timeout acquiring mutex - return Status::TimedOut(Status::SubCode::kMutexTimeout); + if (!result.ok()) { + // failed to acquire mutex + return result; } // Acquire lock if we are able to - uint64_t wait_time_us = 0; - Status result = - AcquireLocked(lock_map, stripe, key, env, lock_info, &wait_time_us); + uint64_t expire_time_hint = 0; + result = + AcquireLocked(lock_map, stripe, key, env, lock_info, &expire_time_hint); if (!result.ok() && timeout != 0) { // If we weren't able to acquire the lock, we will keep retrying as long - // as the - // timeout allows. + // as the timeout allows. bool timed_out = false; do { - // Check to see if the lock expires sooner than our timeout. - std::chrono::system_clock::time_point wait_time_end; - if (wait_time_us > 0 && - (timeout < 0 || - wait_time_us < static_cast(timeout * 1000))) { - wait_time_end = std::chrono::system_clock::now() + - std::chrono::microseconds(wait_time_us); - if (timeout > 0 && wait_time_end >= end_time) { - // lock expiration time is after our timeout. - wait_time_us = 0; - } - } else { - wait_time_us = 0; + // Decide how long to wait + int64_t cv_end_time = -1; + + // Check if held lock's expiration time is sooner than our timeout + if (expire_time_hint > 0 && + (timeout < 0 || (timeout > 0 && expire_time_hint < end_time))) { + // expiration time is sooner than our timeout + cv_end_time = expire_time_hint; + } else if (timeout >= 0) { + cv_end_time = end_time; } - if (wait_time_us > 0) { - // Wait up to the locks current expiration time - stripe->stripe_cv.wait_until(stripe->stripe_mutex, wait_time_end); - } else if (timeout > 0) { - // Wait until we timeout - auto cv_status = - stripe->stripe_cv.wait_until(stripe->stripe_mutex, end_time); + if (cv_end_time < 0) { + // Wait indefinitely + result = stripe->stripe_cv->Wait(stripe->stripe_mutex); + } else { + uint64_t now = env->NowMicros(); + if (static_cast(cv_end_time) > now) { + result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex, + cv_end_time - now); + } + } - if (cv_status == std::cv_status::timeout) { + if (result.IsTimedOut()) { timed_out = true; // Even though we timed out, we will still make one more attempt to // acquire lock below (it is possible the lock expired and we // were never signaled). - } - } else { - // No wait timeout. - stripe->stripe_cv.wait(stripe->stripe_mutex); } - result = - AcquireLocked(lock_map, stripe, key, env, lock_info, &wait_time_us); + if (result.ok() || result.IsTimedOut()) { + result = AcquireLocked(lock_map, stripe, key, env, lock_info, + &expire_time_hint); + } } while (!result.ok() && !timed_out); } - stripe->stripe_mutex.unlock(); + stripe->stripe_mutex->UnLock(); return result; } // Try to lock this key after we have acquired the mutex. -// Returns the number of microseconds until expiration in *wait_time_us, +// Sets *expire_time to the expiration time in microseconds // or 0 if no expiration. // REQUIRED: Stripe mutex must be held. Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, LockMapStripe* stripe, const std::string& key, Env* env, const LockInfo& txn_lock_info, - uint64_t* wait_time_us) { + uint64_t* expire_time) { Status result; // Check if this key is already locked if (stripe->keys.find(key) != stripe->keys.end()) { @@ -307,7 +324,7 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, LockInfo& lock_info = stripe->keys.at(key); if (lock_info.txn_id != txn_lock_info.txn_id) { // locked by another txn. Check if it's expired - if (IsLockExpired(lock_info, env, wait_time_us)) { + if (IsLockExpired(lock_info, env, expire_time)) { // lock is expired, can steal it lock_info.txn_id = txn_lock_info.txn_id; lock_info.expiration_time = txn_lock_info.expiration_time; @@ -347,31 +364,32 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, // Lock the mutex for the stripe that this key hashes to size_t stripe_num = lock_map->GetStripe(key); assert(lock_map->lock_map_stripes_.size() > stripe_num); - LockMapStripe* stripe = &lock_map->lock_map_stripes_.at(stripe_num); + LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); TransactionID txn_id = txn->GetTxnID(); - { - std::lock_guard lock(stripe->stripe_mutex); - - const auto& iter = stripe->keys.find(key); - if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) { - // Found the key we locked. unlock it. - stripe->keys.erase(iter); - if (max_num_locks_ > 0) { - // Maintain lock count if there is a limit on the number of locks. - assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); - lock_map->lock_cnt--; - } - } else { - // This key is either not locked or locked by someone else. This should - // only happen if the unlocking transaction has expired. - assert(txn->GetExpirationTime() > 0 && - txn->GetExpirationTime() * 1000 < env->NowMicros()); + + stripe->stripe_mutex->Lock(); + + const auto& iter = stripe->keys.find(key); + if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) { + // Found the key we locked. unlock it. + stripe->keys.erase(iter); + if (max_num_locks_ > 0) { + // Maintain lock count if there is a limit on the number of locks. + assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); + lock_map->lock_cnt--; } - } // stripe_mutex unlocked + } else { + // This key is either not locked or locked by someone else. This should + // only happen if the unlocking transaction has expired. + assert(txn->GetExpirationTime() > 0 && + txn->GetExpirationTime() < env->NowMicros()); + } + + stripe->stripe_mutex->UnLock(); // Signal waiting threads to retry locking - stripe->stripe_cv.notify_all(); + stripe->stripe_cv->NotifyAll(); } void TransactionLockMgr::UnLock(const TransactionImpl* txn, @@ -407,33 +425,33 @@ void TransactionLockMgr::UnLock(const TransactionImpl* txn, auto& stripe_keys = stripe_iter.second; assert(lock_map->lock_map_stripes_.size() > stripe_num); - LockMapStripe* stripe = &lock_map->lock_map_stripes_.at(stripe_num); - - { - std::lock_guard lock(stripe->stripe_mutex); - - for (const std::string* key : stripe_keys) { - const auto& iter = stripe->keys.find(*key); - if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) { - // Found the key we locked. unlock it. - stripe->keys.erase(iter); - if (max_num_locks_ > 0) { - // Maintain lock count if there is a limit on the number of locks. - assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); - lock_map->lock_cnt--; - } - } else { - // This key is either not locked or locked by someone else. This - // should only - // happen if the unlocking transaction has expired. - assert(txn->GetExpirationTime() > 0 && - txn->GetExpirationTime() * 1000 < env->NowMicros()); + LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); + + stripe->stripe_mutex->Lock(); + + for (const std::string* key : stripe_keys) { + const auto& iter = stripe->keys.find(*key); + if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) { + // Found the key we locked. unlock it. + stripe->keys.erase(iter); + if (max_num_locks_ > 0) { + // Maintain lock count if there is a limit on the number of locks. + assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0); + lock_map->lock_cnt--; } + } else { + // This key is either not locked or locked by someone else. This + // should only + // happen if the unlocking transaction has expired. + assert(txn->GetExpirationTime() > 0 && + txn->GetExpirationTime() < env->NowMicros()); } - } // stripe_mutex unlocked + } + + stripe->stripe_mutex->UnLock(); // Signal waiting threads to retry locking - stripe->stripe_cv.notify_all(); + stripe->stripe_cv->NotifyAll(); } } } diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index 7768496a2..8f640d4ca 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -27,7 +27,8 @@ class Slice; class TransactionLockMgr { public: - TransactionLockMgr(size_t default_num_stripes, int64_t max_num_locks); + TransactionLockMgr(size_t default_num_stripes, int64_t max_num_locks, + std::shared_ptr factory); ~TransactionLockMgr(); @@ -58,6 +59,9 @@ class TransactionLockMgr { // Limit on number of keys locked per column family const int64_t max_num_locks_; + // Used to allocate mutexes/condvars to use when locking keys + std::shared_ptr mutex_factory_; + // Must be held when accessing/modifying lock_maps_ InstrumentedMutex lock_map_mutex_;