TransactionDB Custom Locking API

Summary:
Prototype of API to allow MyRocks to override default Mutex/CondVar used by transactions with their own implementations.  They would simply need to pass their own implementations of Mutex/CondVar to the templated TransactionDB::Open().

Default implementation of TransactionDBMutex/TransactionDBCondVar provided (but the code is not currently changed to use this).

Let me know if this API makes sense or if it should be changed

Test Plan: n/a

Reviewers: yhchiang, rven, igor, sdong, spetrunia

Reviewed By: spetrunia

Subscribers: maykov, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D43761
main
agiardullo 9 years ago
parent 0ccf2db385
commit 5e94f68f35
  1. 1
      CMakeLists.txt
  2. 7
      include/rocksdb/utilities/transaction_db.h
  3. 92
      include/rocksdb/utilities/transaction_db_mutex.h
  4. 1
      src.mk
  5. 11
      utilities/transactions/transaction_db_impl.cc
  6. 121
      utilities/transactions/transaction_db_mutex_impl.cc
  7. 26
      utilities/transactions/transaction_db_mutex_impl.h
  8. 9
      utilities/transactions/transaction_impl.cc
  9. 10
      utilities/transactions/transaction_impl.h
  10. 250
      utilities/transactions/transaction_lock_mgr.cc
  11. 6
      utilities/transactions/transaction_lock_mgr.h

@ -225,6 +225,7 @@ set(SOURCES
utilities/transactions/transaction_base.cc
utilities/transactions/transaction_impl.cc
utilities/transactions/transaction_db_impl.cc
utilities/transactions/transaction_db_mutex_impl.cc
utilities/transactions/transaction_lock_mgr.cc
utilities/transactions/transaction_util.cc
utilities/ttl/db_ttl_impl.cc

@ -20,6 +20,8 @@
namespace rocksdb {
class TransactionDBMutexFactory;
struct TransactionDBOptions {
// Specifies the maximum number of keys that can be locked at the same time
// per column family.
@ -58,6 +60,11 @@ struct TransactionDBOptions {
// A negative timeout should only be used if all transactions have an small
// expiration set.
int64_t default_lock_timeout = 1000; // 1 second
// If set, the TransactionDB will use this implemenation of a mutex and
// condition variable for all transaction locking instead of the default
// mutex/condvar implementation.
std::shared_ptr<TransactionDBMutexFactory> custom_mutex_factory;
};
struct TransactionOptions {

@ -0,0 +1,92 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <memory>
#include "rocksdb/status.h"
namespace rocksdb {
// TransactionDBMutex and TransactionDBCondVar APIs allows applications to
// implement custom mutexes and condition variables to be used by a
// TransactionDB when locking keys.
//
// To open a TransactionDB with a custom TransactionDBMutexFactory, set
// TransactionDBOptions.custom_mutex_factory.
class TransactionDBMutex {
public:
virtual ~TransactionDBMutex() {}
// Attempt to acquire lock. Return OK on success, or other Status on failure.
// If returned status is OK, TransactionDB will eventually call UnLock().
virtual Status Lock() = 0;
// Attempt to acquire lock. If timeout is non-negative, operation should be
// failed after this many microseconds.
// Returns OK on success,
// TimedOut if timed out,
// or other Status on failure.
// If returned status is OK, TransactionDB will eventually call UnLock().
virtual Status TryLockFor(int64_t timeout_time) = 0;
// Unlock Mutex that was successfully locked by Lock() or TryLockUntil()
virtual void UnLock() = 0;
};
class TransactionDBCondVar {
public:
virtual ~TransactionDBCondVar() {}
// Block current thread until condition variable is notified by a call to
// Notify() or NotifyAll(). Wait() will be called with mutex locked.
// Returns OK if notified.
// Returns non-OK if TransactionDB should stop waiting and fail the operation.
// May return OK spuriously even if not notified.
virtual Status Wait(std::shared_ptr<TransactionDBMutex> mutex) = 0;
// Block current thread until condition variable is notified by a call to
// Notify() or NotifyAll(), or if the timeout is reached.
// Wait() will be called with mutex locked.
//
// If timeout is non-negative, operation should be failed after this many
// microseconds.
// If implementing a custom version of this class, the implementation may
// choose to ignore the timeout.
//
// Returns OK if notified.
// Returns TimedOut if timeout is reached.
// Returns other status if TransactionDB should otherwis stop waiting and
// fail the operation.
// May return OK spuriously even if not notified.
virtual Status WaitFor(std::shared_ptr<TransactionDBMutex> mutex,
int64_t timeout_time) = 0;
// If any threads are waiting on *this, unblock at least one of the
// waiting threads.
virtual void Notify() = 0;
// Unblocks all threads waiting on *this.
virtual void NotifyAll() = 0;
};
// Factory class that can allocate mutexes and condition variables.
class TransactionDBMutexFactory {
public:
// Create a TransactionDBMutex object.
virtual std::shared_ptr<TransactionDBMutex> AllocateMutex() = 0;
// Create a TransactionDBCondVar object.
virtual std::shared_ptr<TransactionDBCondVar> AllocateCondVar() = 0;
virtual ~TransactionDBMutexFactory() {}
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -120,6 +120,7 @@ LIB_SOURCES = \
utilities/transactions/optimistic_transaction_db_impl.cc \
utilities/transactions/transaction_base.cc \
utilities/transactions/transaction_db_impl.cc \
utilities/transactions/transaction_db_mutex_impl.cc \
utilities/transactions/transaction_lock_mgr.cc \
utilities/transactions/transaction_impl.cc \
utilities/transactions/transaction_util.cc \

@ -5,15 +5,16 @@
#ifndef ROCKSDB_LITE
#include "utilities/transactions/transaction_db_impl.h"
#include <string>
#include <vector>
#include "utilities/transactions/transaction_db_impl.h"
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
#include "utilities/transactions/transaction_impl.h"
namespace rocksdb {
@ -22,7 +23,11 @@ TransactionDBImpl::TransactionDBImpl(DB* db,
const TransactionDBOptions& txn_db_options)
: TransactionDB(db),
txn_db_options_(txn_db_options),
lock_mgr_(txn_db_options_.num_stripes, txn_db_options.max_num_locks) {}
lock_mgr_(txn_db_options_.num_stripes, txn_db_options.max_num_locks,
txn_db_options_.custom_mutex_factory
? txn_db_options_.custom_mutex_factory
: std::shared_ptr<TransactionDBMutexFactory>(
new TransactionDBMutexFactoryImpl())) {}
Transaction* TransactionDBImpl::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options) {

@ -0,0 +1,121 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include "utilities/transactions/transaction_db_mutex_impl.h"
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include "rocksdb/utilities/transaction_db_mutex.h"
namespace rocksdb {
class TransactionDBMutexImpl : public TransactionDBMutex {
public:
TransactionDBMutexImpl() {}
~TransactionDBMutexImpl() {}
Status Lock() override;
Status TryLockFor(int64_t timeout_time) override;
void UnLock() override { mutex_.unlock(); }
friend class TransactionDBCondVarImpl;
private:
std::timed_mutex mutex_;
};
class TransactionDBCondVarImpl : public TransactionDBCondVar {
public:
TransactionDBCondVarImpl() {}
~TransactionDBCondVarImpl() {}
Status Wait(std::shared_ptr<TransactionDBMutex> mutex) override;
Status WaitFor(std::shared_ptr<TransactionDBMutex> mutex,
int64_t timeout_time) override;
void Notify() override { cv_.notify_one(); }
void NotifyAll() override { cv_.notify_all(); }
private:
std::condition_variable_any cv_;
};
std::shared_ptr<TransactionDBMutex>
TransactionDBMutexFactoryImpl::AllocateMutex() {
return std::shared_ptr<TransactionDBMutex>(new TransactionDBMutexImpl());
}
std::shared_ptr<TransactionDBCondVar>
TransactionDBMutexFactoryImpl::AllocateCondVar() {
return std::shared_ptr<TransactionDBCondVar>(new TransactionDBCondVarImpl());
}
Status TransactionDBMutexImpl::Lock() {
mutex_.lock();
return Status::OK();
}
Status TransactionDBMutexImpl::TryLockFor(int64_t timeout_time) {
bool locked = true;
if (timeout_time < 0) {
// If timeout is negative, we wait indefinitely to acquire the lock
mutex_.lock();
} else if (timeout_time == 0) {
locked = mutex_.try_lock();
} else {
// Attempt to acquire the lock unless we timeout
auto duration = std::chrono::microseconds(timeout_time);
locked = mutex_.try_lock_for(duration);
}
if (!locked) {
// timeout acquiring mutex
return Status::TimedOut(Status::SubCode::kMutexTimeout);
}
return Status::OK();
}
Status TransactionDBCondVarImpl::Wait(
std::shared_ptr<TransactionDBMutex> mutex) {
auto mutex_impl = reinterpret_cast<TransactionDBMutexImpl*>(mutex.get());
cv_.wait(mutex_impl->mutex_);
return Status::OK();
}
Status TransactionDBCondVarImpl::WaitFor(
std::shared_ptr<TransactionDBMutex> mutex, int64_t timeout_time) {
auto mutex_impl = reinterpret_cast<TransactionDBMutexImpl*>(mutex.get());
if (timeout_time < 0) {
// If timeout is negative, do not use a timeout
cv_.wait(mutex_impl->mutex_);
} else {
auto duration = std::chrono::microseconds(timeout_time);
auto cv_status = cv_.wait_for(mutex_impl->mutex_, duration);
// Check if the wait stopped due to timing out.
if (cv_status == std::cv_status::timeout) {
return Status::TimedOut(Status::SubCode::kMutexTimeout);
}
}
// CV was signaled, or we spuriously woke up (but didn't time out)
return Status::OK();
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,26 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/transaction_db_mutex.h"
namespace rocksdb {
class TransactionDBMutex;
class TransactionDBCondVar;
// Default implementation of TransactionDBMutexFactory. May be overridden
// by TransactionDBOptions.custom_mutex_factory.
class TransactionDBMutexFactoryImpl : public TransactionDBMutexFactory {
public:
std::shared_ptr<TransactionDBMutex> AllocateMutex() override;
std::shared_ptr<TransactionDBCondVar> AllocateCondVar() override;
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -40,15 +40,16 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
txn_db_impl_(nullptr),
txn_id_(GenTxnID()),
expiration_time_(txn_options.expiration >= 0
? start_time_ / 1000 + txn_options.expiration
? start_time_ + txn_options.expiration * 1000
: 0),
lock_timeout_(txn_options.lock_timeout) {
lock_timeout_(txn_options.lock_timeout * 1000) {
txn_db_impl_ = dynamic_cast<TransactionDBImpl*>(txn_db);
assert(txn_db_impl_);
if (lock_timeout_ < 0) {
// Lock timeout not set, use default
lock_timeout_ = txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout;
lock_timeout_ =
txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000;
}
if (txn_options.set_snapshot) {
@ -69,7 +70,7 @@ void TransactionImpl::Cleanup() {
bool TransactionImpl::IsExpired() const {
if (expiration_time_ > 0) {
if (db_->GetEnv()->NowMicros() >= expiration_time_ * 1000) {
if (db_->GetEnv()->NowMicros() >= expiration_time_) {
// Transaction is expired.
return true;
}

@ -49,7 +49,7 @@ class TransactionImpl : public TransactionBaseImpl {
TransactionID GetTxnID() const { return txn_id_; }
// Returns the time (in milliseconds according to Env->GetMicros()*1000)
// Returns the time (in microseconds according to Env->GetMicros())
// that this transaction will be expired. Returns 0 if this transaction does
// not expire.
uint64_t GetExpirationTime() const { return expiration_time_; }
@ -57,10 +57,12 @@ class TransactionImpl : public TransactionBaseImpl {
// returns true if this transaction has an expiration_time and has expired.
bool IsExpired() const;
// Returns the number of milliseconds a transaction can wait on acquiring a
// Returns the number of microseconds a transaction can wait on acquiring a
// lock or -1 if there is no timeout.
int64_t GetLockTimeout() const { return lock_timeout_; }
void SetLockTimeout(int64_t timeout) override { lock_timeout_ = timeout; }
void SetLockTimeout(int64_t timeout) override {
lock_timeout_ = timeout * 1000;
}
protected:
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
@ -76,7 +78,7 @@ class TransactionImpl : public TransactionBaseImpl {
const TransactionID txn_id_;
// If non-zero, this transaction should not be committed after this time (in
// milliseconds)
// microseconds according to Env->NowMicros())
const uint64_t expiration_time_;
// Timeout in microseconds when locking a key or -1 if there is no timeout.

@ -21,6 +21,7 @@
#include <vector>
#include "rocksdb/slice.h"
#include "rocksdb/utilities/transaction_db_mutex.h"
#include "util/autovector.h"
#include "util/murmurhash.h"
#include "util/thread_local.h"
@ -29,8 +30,10 @@ namespace rocksdb {
struct LockInfo {
TransactionID txn_id;
uint64_t
expiration_time; // Transaction locks are not valid after this time in ms
// Transaction locks are not valid after this time in us
uint64_t expiration_time;
LockInfo(TransactionID id, uint64_t time)
: txn_id(id), expiration_time(time) {}
LockInfo(const LockInfo& lock_info)
@ -38,11 +41,18 @@ struct LockInfo {
};
struct LockMapStripe {
explicit LockMapStripe(std::shared_ptr<TransactionDBMutexFactory> factory) {
stripe_mutex = factory->AllocateMutex();
stripe_cv = factory->AllocateCondVar();
assert(stripe_mutex);
assert(stripe_cv);
}
// Mutex must be held before modifying keys map
std::timed_mutex stripe_mutex;
std::shared_ptr<TransactionDBMutex> stripe_mutex;
// Condition Variable per stripe for waiting on a lock
std::condition_variable_any stripe_cv;
std::shared_ptr<TransactionDBCondVar> stripe_cv;
// Locked keys mapped to the info about the transactions that locked them.
// TODO(agiardullo): Explore performance of other data structures.
@ -51,11 +61,21 @@ struct LockMapStripe {
// Map of #num_stripes LockMapStripes
struct LockMap {
explicit LockMap(size_t num_stripes)
: num_stripes_(num_stripes), lock_map_stripes_(num_stripes) {}
explicit LockMap(size_t num_stripes,
std::shared_ptr<TransactionDBMutexFactory> factory)
: num_stripes_(num_stripes) {
lock_map_stripes_.reserve(num_stripes);
for (size_t i = 0; i < num_stripes; i++) {
LockMapStripe* stripe = new LockMapStripe(factory);
lock_map_stripes_.push_back(stripe);
}
}
LockMap(const LockMap& lock_map)
: num_stripes_(lock_map.num_stripes_), lock_map_stripes_(num_stripes_) {}
~LockMap() {
for (auto stripe : lock_map_stripes_) {
delete stripe;
}
}
// Number of sepearate LockMapStripes to create, each with their own Mutex
const size_t num_stripes_;
@ -64,7 +84,7 @@ struct LockMap {
// (Only maintained if TransactionLockMgr::max_num_locks_ is positive.)
std::atomic<int64_t> lock_cnt{0};
std::vector<LockMapStripe> lock_map_stripes_;
std::vector<LockMapStripe*> lock_map_stripes_;
size_t GetStripe(const std::string& key) const;
};
@ -78,10 +98,12 @@ void UnrefLockMapsCache(void* ptr) {
}
} // anonymous namespace
TransactionLockMgr::TransactionLockMgr(size_t default_num_stripes,
int64_t max_num_locks)
TransactionLockMgr::TransactionLockMgr(
size_t default_num_stripes, int64_t max_num_locks,
std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
: default_num_stripes_(default_num_stripes),
max_num_locks_(max_num_locks),
mutex_factory_(mutex_factory),
lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)) {}
TransactionLockMgr::~TransactionLockMgr() {}
@ -97,9 +119,9 @@ void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) {
InstrumentedMutexLock l(&lock_map_mutex_);
if (lock_maps_.find(column_family_id) == lock_maps_.end()) {
lock_maps_.emplace(
column_family_id,
std::shared_ptr<LockMap>(new LockMap(default_num_stripes_)));
lock_maps_.emplace(column_family_id,
std::shared_ptr<LockMap>(
new LockMap(default_num_stripes_, mutex_factory_)));
} else {
// column_family already exists in lock map
assert(false);
@ -162,18 +184,20 @@ std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
// Returns true if this lock has expired and can be acquired by another
// transaction.
// If false, returns the number of microseconds until expiration in
// *wait_time_us, or 0 if no expiration.
// If false, sets *expire_time to the expiration time of the lock according
// to Env->GetMicros() or 0 if no expiration.
bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env,
uint64_t* wait_time_us) {
uint64_t* expire_time) {
auto now = env->NowMicros();
bool expired = (lock_info.expiration_time > 0 &&
lock_info.expiration_time * 1000 <= now);
bool expired =
(lock_info.expiration_time > 0 && lock_info.expiration_time <= now);
if (!expired && lock_info.expiration_time > 0 && wait_time_us != nullptr) {
if (!expired && lock_info.expiration_time > 0) {
// return how many microseconds until lock will be expired
*wait_time_us = (lock_info.expiration_time * 1000 - now);
*expire_time = lock_info.expiration_time;
} else {
*expire_time = 0;
}
return expired;
@ -196,7 +220,7 @@ Status TransactionLockMgr::TryLock(const TransactionImpl* txn,
// Need to lock the mutex for the stripe that this key hashes to
size_t stripe_num = lock_map->GetStripe(key);
assert(lock_map->lock_map_stripes_.size() > stripe_num);
LockMapStripe* stripe = &lock_map->lock_map_stripes_.at(stripe_num);
LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
LockInfo lock_info(txn->GetTxnID(), txn->GetExpirationTime());
int64_t timeout = txn->GetLockTimeout();
@ -210,95 +234,88 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map,
const std::string& key, Env* env,
int64_t timeout,
const LockInfo& lock_info) {
std::chrono::system_clock::time_point end_time;
Status result;
uint64_t start_time = 0;
uint64_t end_time = 0;
if (timeout > 0) {
end_time =
std::chrono::system_clock::now() + std::chrono::milliseconds(timeout);
start_time = env->NowMicros();
end_time = start_time + timeout;
}
bool locked = true;
if (timeout == 0) {
// If timeout is 0, we do not wait to acquire the lock if it is not
// available
locked = stripe->stripe_mutex.try_lock();
} else if (timeout < 0) {
if (timeout < 0) {
// If timeout is negative, we wait indefinitely to acquire the lock
stripe->stripe_mutex.lock();
result = stripe->stripe_mutex->Lock();
} else {
// If timeout is positive, we attempt to acquire the lock unless we timeout
locked = stripe->stripe_mutex.try_lock_until(end_time);
result = stripe->stripe_mutex->TryLockFor(timeout);
}
if (!locked) {
// timeout acquiring mutex
return Status::TimedOut(Status::SubCode::kMutexTimeout);
if (!result.ok()) {
// failed to acquire mutex
return result;
}
// Acquire lock if we are able to
uint64_t wait_time_us = 0;
Status result =
AcquireLocked(lock_map, stripe, key, env, lock_info, &wait_time_us);
uint64_t expire_time_hint = 0;
result =
AcquireLocked(lock_map, stripe, key, env, lock_info, &expire_time_hint);
if (!result.ok() && timeout != 0) {
// If we weren't able to acquire the lock, we will keep retrying as long
// as the
// timeout allows.
// as the timeout allows.
bool timed_out = false;
do {
// Check to see if the lock expires sooner than our timeout.
std::chrono::system_clock::time_point wait_time_end;
if (wait_time_us > 0 &&
(timeout < 0 ||
wait_time_us < static_cast<uint64_t>(timeout * 1000))) {
wait_time_end = std::chrono::system_clock::now() +
std::chrono::microseconds(wait_time_us);
if (timeout > 0 && wait_time_end >= end_time) {
// lock expiration time is after our timeout.
wait_time_us = 0;
}
} else {
wait_time_us = 0;
// Decide how long to wait
int64_t cv_end_time = -1;
// Check if held lock's expiration time is sooner than our timeout
if (expire_time_hint > 0 &&
(timeout < 0 || (timeout > 0 && expire_time_hint < end_time))) {
// expiration time is sooner than our timeout
cv_end_time = expire_time_hint;
} else if (timeout >= 0) {
cv_end_time = end_time;
}
if (wait_time_us > 0) {
// Wait up to the locks current expiration time
stripe->stripe_cv.wait_until(stripe->stripe_mutex, wait_time_end);
} else if (timeout > 0) {
// Wait until we timeout
auto cv_status =
stripe->stripe_cv.wait_until(stripe->stripe_mutex, end_time);
if (cv_end_time < 0) {
// Wait indefinitely
result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
} else {
uint64_t now = env->NowMicros();
if (static_cast<uint64_t>(cv_end_time) > now) {
result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex,
cv_end_time - now);
}
}
if (cv_status == std::cv_status::timeout) {
if (result.IsTimedOut()) {
timed_out = true;
// Even though we timed out, we will still make one more attempt to
// acquire lock below (it is possible the lock expired and we
// were never signaled).
}
} else {
// No wait timeout.
stripe->stripe_cv.wait(stripe->stripe_mutex);
}
result =
AcquireLocked(lock_map, stripe, key, env, lock_info, &wait_time_us);
if (result.ok() || result.IsTimedOut()) {
result = AcquireLocked(lock_map, stripe, key, env, lock_info,
&expire_time_hint);
}
} while (!result.ok() && !timed_out);
}
stripe->stripe_mutex.unlock();
stripe->stripe_mutex->UnLock();
return result;
}
// Try to lock this key after we have acquired the mutex.
// Returns the number of microseconds until expiration in *wait_time_us,
// Sets *expire_time to the expiration time in microseconds
// or 0 if no expiration.
// REQUIRED: Stripe mutex must be held.
Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
LockMapStripe* stripe,
const std::string& key, Env* env,
const LockInfo& txn_lock_info,
uint64_t* wait_time_us) {
uint64_t* expire_time) {
Status result;
// Check if this key is already locked
if (stripe->keys.find(key) != stripe->keys.end()) {
@ -307,7 +324,7 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
LockInfo& lock_info = stripe->keys.at(key);
if (lock_info.txn_id != txn_lock_info.txn_id) {
// locked by another txn. Check if it's expired
if (IsLockExpired(lock_info, env, wait_time_us)) {
if (IsLockExpired(lock_info, env, expire_time)) {
// lock is expired, can steal it
lock_info.txn_id = txn_lock_info.txn_id;
lock_info.expiration_time = txn_lock_info.expiration_time;
@ -347,31 +364,32 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id,
// Lock the mutex for the stripe that this key hashes to
size_t stripe_num = lock_map->GetStripe(key);
assert(lock_map->lock_map_stripes_.size() > stripe_num);
LockMapStripe* stripe = &lock_map->lock_map_stripes_.at(stripe_num);
LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
TransactionID txn_id = txn->GetTxnID();
{
std::lock_guard<std::timed_mutex> lock(stripe->stripe_mutex);
const auto& iter = stripe->keys.find(key);
if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) {
// Found the key we locked. unlock it.
stripe->keys.erase(iter);
if (max_num_locks_ > 0) {
// Maintain lock count if there is a limit on the number of locks.
assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
lock_map->lock_cnt--;
}
} else {
// This key is either not locked or locked by someone else. This should
// only happen if the unlocking transaction has expired.
assert(txn->GetExpirationTime() > 0 &&
txn->GetExpirationTime() * 1000 < env->NowMicros());
stripe->stripe_mutex->Lock();
const auto& iter = stripe->keys.find(key);
if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) {
// Found the key we locked. unlock it.
stripe->keys.erase(iter);
if (max_num_locks_ > 0) {
// Maintain lock count if there is a limit on the number of locks.
assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
lock_map->lock_cnt--;
}
} // stripe_mutex unlocked
} else {
// This key is either not locked or locked by someone else. This should
// only happen if the unlocking transaction has expired.
assert(txn->GetExpirationTime() > 0 &&
txn->GetExpirationTime() < env->NowMicros());
}
stripe->stripe_mutex->UnLock();
// Signal waiting threads to retry locking
stripe->stripe_cv.notify_all();
stripe->stripe_cv->NotifyAll();
}
void TransactionLockMgr::UnLock(const TransactionImpl* txn,
@ -407,33 +425,33 @@ void TransactionLockMgr::UnLock(const TransactionImpl* txn,
auto& stripe_keys = stripe_iter.second;
assert(lock_map->lock_map_stripes_.size() > stripe_num);
LockMapStripe* stripe = &lock_map->lock_map_stripes_.at(stripe_num);
{
std::lock_guard<std::timed_mutex> lock(stripe->stripe_mutex);
for (const std::string* key : stripe_keys) {
const auto& iter = stripe->keys.find(*key);
if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) {
// Found the key we locked. unlock it.
stripe->keys.erase(iter);
if (max_num_locks_ > 0) {
// Maintain lock count if there is a limit on the number of locks.
assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
lock_map->lock_cnt--;
}
} else {
// This key is either not locked or locked by someone else. This
// should only
// happen if the unlocking transaction has expired.
assert(txn->GetExpirationTime() > 0 &&
txn->GetExpirationTime() * 1000 < env->NowMicros());
LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
stripe->stripe_mutex->Lock();
for (const std::string* key : stripe_keys) {
const auto& iter = stripe->keys.find(*key);
if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) {
// Found the key we locked. unlock it.
stripe->keys.erase(iter);
if (max_num_locks_ > 0) {
// Maintain lock count if there is a limit on the number of locks.
assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
lock_map->lock_cnt--;
}
} else {
// This key is either not locked or locked by someone else. This
// should only
// happen if the unlocking transaction has expired.
assert(txn->GetExpirationTime() > 0 &&
txn->GetExpirationTime() < env->NowMicros());
}
} // stripe_mutex unlocked
}
stripe->stripe_mutex->UnLock();
// Signal waiting threads to retry locking
stripe->stripe_cv.notify_all();
stripe->stripe_cv->NotifyAll();
}
}
}

@ -27,7 +27,8 @@ class Slice;
class TransactionLockMgr {
public:
TransactionLockMgr(size_t default_num_stripes, int64_t max_num_locks);
TransactionLockMgr(size_t default_num_stripes, int64_t max_num_locks,
std::shared_ptr<TransactionDBMutexFactory> factory);
~TransactionLockMgr();
@ -58,6 +59,9 @@ class TransactionLockMgr {
// Limit on number of keys locked per column family
const int64_t max_num_locks_;
// Used to allocate mutexes/condvars to use when locking keys
std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
// Must be held when accessing/modifying lock_maps_
InstrumentedMutex lock_map_mutex_;

Loading…
Cancel
Save