Range Locking: Allow different LockManagers, add Range Lock definitions (#7443)

Summary:
This PR has two commits:
1.  Modify the code to allow different Lock Managers (of any kind) to be used.  It is implied that a LockManager uses its own custom LockTracker.
2.  Add definitions for Range Locking (class Endpoint and the GetRangeLock() function); a usage sketch follows below.
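A minimal usage sketch of the new interface (assuming a TransactionDB opened with a range-capable lock manager; only Endpoint and GetRangeLock() come from this PR, the rest is existing TransactionDB API):

    Transaction* txn = txn_db->BeginTransaction(WriteOptions());
    // Lock the closed range ["a" .. "m"] in the default column family.
    Status s = txn->GetRangeLock(txn_db->DefaultColumnFamily(),
                                 Endpoint("a"), Endpoint("m"));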

cheng-chang, is this what you had in mind? (Should the PR have both item 1 and item 2?)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7443

Reviewed By: zhichao-cao

Differential Revision: D24123172

Pulled By: cheng-chang

fbshipit-source-id: c6548ad6d4cc3c25f68d13b29147bc6fdf357185
Branch: main
Author: Sergei Petrunia, committed by Facebook GitHub Bot (4 years ago)
Parent: db03172d08
Commit: d8bd9fc7b3
  1. include/rocksdb/utilities/transaction.h (82)
  2. include/rocksdb/utilities/transaction_db.h (100)
  3. utilities/transactions/lock/lock_manager.cc (14)
  4. utilities/transactions/lock/lock_manager.h (4)
  5. utilities/transactions/lock/lock_tracker.h (10)
  6. utilities/transactions/lock/point/point_lock_manager.cc (58)
  7. utilities/transactions/lock/point/point_lock_manager.h (76)
  8. utilities/transactions/lock/point/point_lock_manager_test.cc (245)
  9. utilities/transactions/lock/point/point_lock_manager_test.h (276)
  10. utilities/transactions/lock/point/point_lock_tracker.cc (4)
  11. utilities/transactions/lock/point/point_lock_tracker.h (2)
  12. utilities/transactions/lock/range/range_lock_manager.h (30)
  13. utilities/transactions/pessimistic_transaction.cc (48)
  14. utilities/transactions/pessimistic_transaction.h (4)
  15. utilities/transactions/pessimistic_transaction_db.cc (8)
  16. utilities/transactions/pessimistic_transaction_db.h (5)

@ -24,9 +24,81 @@ using TransactionName = std::string;
using TransactionID = uint64_t;
// An endpoint for a range of keys.
/*
class Endpoint allows defining prefix ranges.
Prefix ranges are introduced below.
== Basic Ranges ==
Let's start with basic ranges. The Key Comparator defines the ordering of rowkeys.
Then, one can specify finite closed ranges by just providing rowkeys of their
endpoints:
lower_endpoint <= X <= upper_endpoint
However, our goal is to provide a richer set of endpoints. Read on.
== Lexicographic ordering ==
A lexicographic (or dictionary) ordering satisfies this criterion: if there
are two keys of the form
key_a = {prefix_a, suffix_a}
key_b = {prefix_b, suffix_b}
and
prefix_a < prefix_b
then
key_a < key_b.
== Prefix ranges ==
With lexicographic ordering, one may want to define ranges of the form
"prefix is $PREFIX"
which translates to a range of the form
{$PREFIX, -infinity} < X < {$PREFIX, +infinity}
where -infinity will compare as less than any possible suffix, and +infinity
will compare as greater than any possible suffix.
class Endpoint allows defining this kind of range.
== Notes ==
BytewiseComparator and ReverseBytewiseComparator produce lexicographic
ordering.
The row comparison function is able to compare key prefixes. If the data
domain includes keys A and B, then the comparison function is able to compare
equal-length prefixes:
min_len= min(byte_length(A), byte_length(B));
cmp(Slice(A, min_len), Slice(B, min_len)); // this call is valid
== Other options ==
As far as MyRocks is concerned, the alternative to prefix ranges would be to
support both open (non-inclusive) and closed (inclusive) range endpoints.
*/
class Endpoint {
// TODO
public:
Slice slice;
/*
true : the key has a "+infinity" suffix, i.e. a suffix that compares as
greater than any other suffix
false : otherwise
*/
bool inf_suffix;
explicit Endpoint(const Slice& slice_arg, bool inf_suffix_arg = false)
: slice(slice_arg), inf_suffix(inf_suffix_arg) {}
explicit Endpoint(const char* s, bool inf_suffix_arg = false)
: slice(s), inf_suffix(inf_suffix_arg) {}
Endpoint(const char* s, size_t size, bool inf_suffix_arg = false)
: slice(s, size), inf_suffix(inf_suffix_arg) {}
Endpoint() : inf_suffix(false) {}
};
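// For illustration only (not part of this change): the range of all keys
// with prefix "abc" can be expressed with two such endpoints,
//   Endpoint start("abc", /*inf_suffix=*/false);  // the prefix itself
//   Endpoint end("abc", /*inf_suffix=*/true);     // prefix + "+infinity" suffix
// and then locked with GetRangeLock(cf, start, end), declared further below.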
// Provides notification to the caller of SetSnapshotOnNextOperation when
@ -282,6 +354,12 @@ class Transaction {
}
}
// Get a range lock on [start_endpoint; end_endpoint].
virtual Status GetRangeLock(ColumnFamilyHandle*, const Endpoint&,
const Endpoint&) {
return Status::NotSupported();
}
virtual Status GetForUpdate(const ReadOptions& options, const Slice& key,
std::string* value, bool exclusive = true,
const bool do_validate = true) = 0;

@ -31,6 +31,98 @@ enum TxnDBWritePolicy {
const uint32_t kInitialMaxDeadlocks = 5;
class LockManager;
struct RangeLockInfo;
// A lock manager handle
// The workflow is as follows:
// * Use a factory method (like NewRangeLockManager()) to create a lock
// manager and get its handle.
// * A Handle for a particular kind of lock manager will have extra
// methods and parameters to control the lock manager
// * Pass the handle to RocksDB in TransactionDBOptions::lock_mgr_handle. It
// will be used to perform locking.
class LockManagerHandle {
public:
// PessimisticTransactionDB will call this to get the Lock Manager it's going
// to use.
virtual LockManager* getLockManager() = 0;
virtual ~LockManagerHandle() {}
};
// Same as class Endpoint, but use std::string to manage the buffer allocation
struct EndpointWithString {
std::string slice;
bool inf_suffix;
};
struct RangeDeadlockInfo {
TransactionID m_txn_id;
uint32_t m_cf_id;
bool m_exclusive;
EndpointWithString m_start;
EndpointWithString m_end;
};
struct RangeDeadlockPath {
std::vector<RangeDeadlockInfo> path;
bool limit_exceeded;
int64_t deadlock_time;
explicit RangeDeadlockPath(std::vector<RangeDeadlockInfo> path_entry,
const int64_t& dl_time)
: path(path_entry), limit_exceeded(false), deadlock_time(dl_time) {}
// empty path, limit exceeded constructor and default constructor
explicit RangeDeadlockPath(const int64_t& dl_time = 0, bool limit = false)
: path(0), limit_exceeded(limit), deadlock_time(dl_time) {}
bool empty() { return path.empty() && !limit_exceeded; }
};
// A handle to control RangeLockManager (Range-based lock manager) from outside
// RocksDB
class RangeLockManagerHandle : public LockManagerHandle {
public:
// Total amount of lock memory to use (per column family)
virtual int SetMaxLockMemory(size_t max_lock_memory) = 0;
virtual size_t GetMaxLockMemory() = 0;
using RangeLockStatus =
std::unordered_multimap<ColumnFamilyId, RangeLockInfo>;
virtual RangeLockStatus GetRangeLockStatusData() = 0;
class Counters {
public:
// Number of times lock escalation was triggered (for all column families)
uint64_t escalation_count;
// How much memory is currently used for locks (total for all column
// families)
uint64_t current_lock_memory;
};
// Get the current counter values
virtual Counters GetStatus() = 0;
// Functions for range-based Deadlock reporting.
virtual std::vector<RangeDeadlockPath> GetRangeDeadlockInfoBuffer() = 0;
virtual void SetRangeDeadlockInfoBufferSize(uint32_t target_size) = 0;
virtual ~RangeLockManagerHandle() {}
};
// A factory function to create a Range Lock Manager. The created object should
// be:
// 1. Passed in TransactionDBOptions::lock_mgr_handle to open the database in
// range-locking mode
// 2. Used to control the lock manager when the DB is already open.
RangeLockManagerHandle* NewRangeLockManager(
std::shared_ptr<TransactionDBMutexFactory> mutex_factory);
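// A sketch of the setup described above (TransactionDBMutexFactoryImpl is
// assumed as the mutex factory; a concrete range lock manager may need
// different arguments):
//   auto mutex_factory = std::make_shared<TransactionDBMutexFactoryImpl>();
//   std::shared_ptr<RangeLockManagerHandle> handle(
//       NewRangeLockManager(mutex_factory));
//   TransactionDBOptions txn_db_options;
//   txn_db_options.lock_mgr_handle = handle;
//   // ... TransactionDB::Open(options, txn_db_options, dbname, &txn_db);
// After Open(), the same handle can still be used for control and
// introspection, e.g. handle->SetMaxLockMemory(...) or
// handle->GetRangeLockStatusData().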
struct TransactionDBOptions {
// Specifies the maximum number of keys that can be locked at the same time
// per column family.
@ -92,6 +184,10 @@ struct TransactionDBOptions {
// for the special way that myrocks uses these operands.
bool rollback_merge_operands = false;
// nullptr means use the default lock manager.
// Any other value means the user provides a custom lock manager.
std::shared_ptr<LockManagerHandle> lock_mgr_handle;
// If true, the TransactionDB implementation might skip concurrency control
// unless it is overridden by TransactionOptions or
// TransactionDBWriteOptimizations. This can be used in conjunction with
@ -203,8 +299,8 @@ struct KeyLockInfo {
};
struct RangeLockInfo {
Endpoint start;
Endpoint end;
EndpointWithString start;
EndpointWithString end;
std::vector<TransactionID> ids;
bool exclusive;
};

@ -11,11 +11,17 @@
namespace ROCKSDB_NAMESPACE {
LockManager* NewLockManager(PessimisticTransactionDB* db,
const TransactionDBOptions& opt) {
std::shared_ptr<LockManager> NewLockManager(PessimisticTransactionDB* db,
const TransactionDBOptions& opt) {
assert(db);
// TODO: determine the lock manager implementation based on configuration.
return new PointLockManager(db, opt);
if (opt.lock_mgr_handle) {
// A custom lock manager was provided in options
auto mgr = opt.lock_mgr_handle->getLockManager();
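// The return below uses std::shared_ptr's aliasing constructor: the
// returned pointer exposes the LockManager, but shares ownership with
// (and keeps alive) the user-provided handle.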
return std::shared_ptr<LockManager>(opt.lock_mgr_handle, mgr);
} else {
// Use a point lock manager by default
return std::shared_ptr<LockManager>(new PointLockManager(db, opt));
}
}
} // namespace ROCKSDB_NAMESPACE

@ -74,8 +74,8 @@ class LockManager {
// LockManager should always be constructed through this factory method,
// instead of constructing through concrete implementations' constructor.
// Caller owns the returned pointer.
LockManager* NewLockManager(PessimisticTransactionDB* db,
const TransactionDBOptions& opt);
std::shared_ptr<LockManager> NewLockManager(PessimisticTransactionDB* db,
const TransactionDBOptions& opt);
} // namespace ROCKSDB_NAMESPACE

@ -4,12 +4,14 @@
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <memory>
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/transaction_db.h"
namespace ROCKSDB_NAMESPACE {
@ -29,7 +31,12 @@ struct PointLockRequest {
// Request for locking a range of keys.
struct RangeLockRequest {
// TODO
// The id of the key's column family.
ColumnFamilyId column_family_id;
// The range to be locked
Endpoint start_endp;
Endpoint end_endp;
};
struct PointLockStatus {
@ -199,3 +206,4 @@ class LockTrackerFactory {
};
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -94,64 +94,6 @@ struct LockMap {
size_t GetStripe(const std::string& key) const;
};
void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) {
std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
if (paths_buffer_.empty()) {
return;
}
paths_buffer_[buffer_idx_] = std::move(path);
buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
}
void DeadlockInfoBuffer::Resize(uint32_t target_size) {
std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
paths_buffer_ = Normalize();
// Drop the deadlocks that will no longer be needed after the normalize
if (target_size < paths_buffer_.size()) {
paths_buffer_.erase(
paths_buffer_.begin(),
paths_buffer_.begin() + (paths_buffer_.size() - target_size));
buffer_idx_ = 0;
}
// Resize the buffer to the target size and restore the buffer's idx
else {
auto prev_size = paths_buffer_.size();
paths_buffer_.resize(target_size);
buffer_idx_ = (uint32_t)prev_size;
}
}
std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() {
auto working = paths_buffer_;
if (working.empty()) {
return working;
}
// Next write occurs at a nonexistent path's slot
if (paths_buffer_[buffer_idx_].empty()) {
working.resize(buffer_idx_);
} else {
std::rotate(working.begin(), working.begin() + buffer_idx_, working.end());
}
return working;
}
std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() {
std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
// Reversing the normalized vector returns the latest deadlocks first
auto working = Normalize();
std::reverse(working.begin(), working.end());
return working;
}
namespace {
void UnrefLockMapsCache(void* ptr) {
// Called when a thread exits or a ThreadLocalPtr gets destroyed.

@ -27,21 +27,79 @@ struct LockInfo;
struct LockMap;
struct LockMapStripe;
struct DeadlockInfoBuffer {
template <class Path>
class DeadlockInfoBufferTempl {
private:
std::vector<DeadlockPath> paths_buffer_;
std::vector<Path> paths_buffer_;
uint32_t buffer_idx_;
std::mutex paths_buffer_mutex_;
std::vector<DeadlockPath> Normalize();
std::vector<Path> Normalize() {
auto working = paths_buffer_;
if (working.empty()) {
return working;
}
// Next write occurs at a nonexistent path's slot
if (paths_buffer_[buffer_idx_].empty()) {
working.resize(buffer_idx_);
} else {
std::rotate(working.begin(), working.begin() + buffer_idx_,
working.end());
}
return working;
}
public:
explicit DeadlockInfoBuffer(uint32_t n_latest_dlocks)
explicit DeadlockInfoBufferTempl(uint32_t n_latest_dlocks)
: paths_buffer_(n_latest_dlocks), buffer_idx_(0) {}
void AddNewPath(DeadlockPath path);
void Resize(uint32_t target_size);
std::vector<DeadlockPath> PrepareBuffer();
void AddNewPath(Path path) {
std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
if (paths_buffer_.empty()) {
return;
}
paths_buffer_[buffer_idx_] = std::move(path);
buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
}
void Resize(uint32_t target_size) {
std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
paths_buffer_ = Normalize();
// Drop the deadlocks that will no longer be needed after the normalize
if (target_size < paths_buffer_.size()) {
paths_buffer_.erase(
paths_buffer_.begin(),
paths_buffer_.begin() + (paths_buffer_.size() - target_size));
buffer_idx_ = 0;
}
// Resize the buffer to the target size and restore the buffer's idx
else {
auto prev_size = paths_buffer_.size();
paths_buffer_.resize(target_size);
buffer_idx_ = (uint32_t)prev_size;
}
}
std::vector<Path> PrepareBuffer() {
std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
// Reversing the normalized vector returns the latest deadlocks first
auto working = Normalize();
std::reverse(working.begin(), working.end());
return working;
}
};
typedef DeadlockInfoBufferTempl<DeadlockPath> DeadlockInfoBuffer;
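// A range lock manager can presumably reuse the same template for its own
// deadlock reports, e.g. (hypothetical, not part of this change):
//   typedef DeadlockInfoBufferTempl<RangeDeadlockPath> RangeDeadlockInfoBuffer;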
struct TrackedTrxInfo {
autovector<TransactionID> m_neighbors;
uint32_t m_cf_id;
@ -67,7 +125,11 @@ class PointLockManager : public LockManager {
return PointLockTrackerFactory::Get();
}
// Creates a new LockMap for this column family. Caller should guarantee
// that this column family does not already exist.
void AddColumnFamily(const ColumnFamilyHandle* cf) override;
// Deletes the LockMap for this column family. Caller should guarantee that
// this column family is no longer in use.
void RemoveColumnFamily(const ColumnFamilyHandle* cf) override;
Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,

@ -5,80 +5,12 @@
#ifndef ROCKSDB_LITE
#include "utilities/transactions/lock/point/point_lock_manager.h"
#include "file/file_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
#include "utilities/transactions/lock/point/point_lock_manager_test.h"
namespace ROCKSDB_NAMESPACE {
class MockColumnFamilyHandle : public ColumnFamilyHandle {
public:
explicit MockColumnFamilyHandle(ColumnFamilyId cf_id) : cf_id_(cf_id) {}
~MockColumnFamilyHandle() override {}
const std::string& GetName() const override { return name_; }
ColumnFamilyId GetID() const override { return cf_id_; }
Status GetDescriptor(ColumnFamilyDescriptor*) override {
return Status::OK();
}
const Comparator* GetComparator() const override { return nullptr; }
private:
ColumnFamilyId cf_id_;
std::string name_ = "MockCF";
};
class PointLockManagerTest : public testing::Test {
public:
void SetUp() override {
env_ = Env::Default();
db_dir_ = test::PerThreadDBPath("point_lock_manager_test");
ASSERT_OK(env_->CreateDir(db_dir_));
mutex_factory_ = std::make_shared<TransactionDBMutexFactoryImpl>();
Options opt;
opt.create_if_missing = true;
TransactionDBOptions txn_opt;
txn_opt.transaction_lock_timeout = 0;
txn_opt.custom_mutex_factory = mutex_factory_;
ASSERT_OK(TransactionDB::Open(opt, txn_opt, db_dir_, &db_));
locker_.reset(new PointLockManager(
static_cast<PessimisticTransactionDB*>(db_), txn_opt));
}
void TearDown() override {
delete db_;
EXPECT_OK(DestroyDir(env_, db_dir_));
}
PessimisticTransaction* NewTxn(
TransactionOptions txn_opt = TransactionOptions()) {
Transaction* txn = db_->BeginTransaction(WriteOptions(), txn_opt);
return reinterpret_cast<PessimisticTransaction*>(txn);
}
protected:
Env* env_;
std::unique_ptr<PointLockManager> locker_;
private:
std::string db_dir_;
std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
TransactionDB* db_;
};
// This test is not applicable to the Range Lock Manager, because the Range
// Lock Manager operates on Column Families, not their ids.
TEST_F(PointLockManagerTest, LockNonExistingColumnFamily) {
MockColumnFamilyHandle cf(1024);
locker_->RemoveColumnFamily(&cf);
@ -121,6 +53,12 @@ TEST_F(PointLockManagerTest, LockStatus) {
}
}
// Cleanup
locker_->UnLock(txn1, 1024, "k1", env_);
locker_->UnLock(txn1, 2048, "k1", env_);
locker_->UnLock(txn2, 1024, "k2", env_);
locker_->UnLock(txn2, 2048, "k2", env_);
delete txn1;
delete txn2;
}
@ -136,6 +74,9 @@ TEST_F(PointLockManagerTest, UnlockExclusive) {
auto txn2 = NewTxn();
ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, true));
// Cleanup
locker_->UnLock(txn2, 1, "k", env_);
delete txn1;
delete txn2;
}
@ -151,162 +92,15 @@ TEST_F(PointLockManagerTest, UnlockShared) {
auto txn2 = NewTxn();
ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, true));
delete txn1;
delete txn2;
}
TEST_F(PointLockManagerTest, ReentrantExclusiveLock) {
// Tests that a txn can acquire exclusive lock on the same key repeatedly.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn = NewTxn();
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
delete txn;
}
TEST_F(PointLockManagerTest, ReentrantSharedLock) {
// Tests that a txn can acquire shared lock on the same key repeatedly.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn = NewTxn();
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
delete txn;
}
TEST_F(PointLockManagerTest, LockUpgrade) {
// Tests that a txn can upgrade from a shared lock to an exclusive lock.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn = NewTxn();
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
delete txn;
}
TEST_F(PointLockManagerTest, LockDowngrade) {
// Tests that a txn can acquire a shared lock after acquiring an exclusive
// lock on the same key.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn = NewTxn();
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
delete txn;
}
TEST_F(PointLockManagerTest, LockConflict) {
// Tests that lock conflicts lead to lock timeout.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn1 = NewTxn();
auto txn2 = NewTxn();
{
// exclusive-exclusive conflict.
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
auto s = locker_->TryLock(txn2, 1, "k1", env_, true);
ASSERT_TRUE(s.IsTimedOut());
}
{
// exclusive-shared conflict.
ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, true));
auto s = locker_->TryLock(txn2, 1, "k2", env_, false);
ASSERT_TRUE(s.IsTimedOut());
}
{
// shared-exclusive conflict.
ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, false));
auto s = locker_->TryLock(txn2, 1, "k2", env_, true);
ASSERT_TRUE(s.IsTimedOut());
}
delete txn1;
delete txn2;
}
port::Thread BlockUntilWaitingTxn(std::function<void()> f) {
std::atomic<bool> reached(false);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"PointLockManager::AcquireWithTimeout:WaitingTxn",
[&](void* /*arg*/) { reached.store(true); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Cleanup
locker_->UnLock(txn2, 1, "k", env_);
port::Thread t(f);
while (!reached.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
return t;
}
TEST_F(PointLockManagerTest, SharedLocks) {
// Tests that shared locks can be concurrently held by multiple transactions.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn1 = NewTxn();
auto txn2 = NewTxn();
ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false));
ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false));
delete txn1;
delete txn2;
}
TEST_F(PointLockManagerTest, Deadlock) {
// Tests that deadlock can be detected.
// Deadlock scenario:
// txn1 exclusively locks k1, and wants to lock k2;
// txn2 exclusively locks k2, and wants to lock k1.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.lock_timeout = 1000000;
auto txn1 = NewTxn(txn_opt);
auto txn2 = NewTxn(txn_opt);
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true));
// txn1 tries to lock k2, will block forever.
port::Thread t = BlockUntilWaitingTxn([&]() {
// block because txn2 is holding a lock on k2.
locker_->TryLock(txn1, 1, "k2", env_, true);
});
auto s = locker_->TryLock(txn2, 1, "k1", env_, true);
ASSERT_TRUE(s.IsBusy());
ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock);
std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer();
ASSERT_EQ(deadlock_paths.size(), 1u);
ASSERT_FALSE(deadlock_paths[0].limit_exceeded);
std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path;
ASSERT_EQ(deadlocks.size(), 2u);
ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID());
ASSERT_EQ(deadlocks[0].m_cf_id, 1u);
ASSERT_TRUE(deadlocks[0].m_exclusive);
ASSERT_EQ(deadlocks[0].m_waiting_key, "k2");
ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID());
ASSERT_EQ(deadlocks[1].m_cf_id, 1u);
ASSERT_TRUE(deadlocks[1].m_exclusive);
ASSERT_EQ(deadlocks[1].m_waiting_key, "k1");
locker_->UnLock(txn2, 1, "k2", env_);
t.join();
delete txn2;
delete txn1;
}
// This test doesn't work with Range Lock Manager, because Range Lock Manager
// doesn't support deadlock_detect_depth.
TEST_F(PointLockManagerTest, DeadlockDepthExceeded) {
// Tests that when detecting deadlock, if the detection depth is exceeded,
@ -332,7 +126,7 @@ TEST_F(PointLockManagerTest, DeadlockDepthExceeded) {
// it must have another txn waiting on it, which is txn4 in this case.
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
port::Thread t1 = BlockUntilWaitingTxn([&]() {
port::Thread t1 = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() {
ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true));
// block because txn1 is holding a lock on k1.
locker_->TryLock(txn2, 1, "k1", env_, true);
@ -340,7 +134,7 @@ TEST_F(PointLockManagerTest, DeadlockDepthExceeded) {
ASSERT_OK(locker_->TryLock(txn3, 1, "k3", env_, true));
port::Thread t2 = BlockUntilWaitingTxn([&]() {
port::Thread t2 = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() {
// block because txn3 is holding a lock on k1.
locker_->TryLock(txn4, 1, "k3", env_, true);
});
@ -364,6 +158,9 @@ TEST_F(PointLockManagerTest, DeadlockDepthExceeded) {
delete txn1;
}
INSTANTIATE_TEST_CASE_P(PointLockManager, AnyLockManagerTest,
::testing::Values(nullptr));
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -0,0 +1,276 @@
#include "file/file_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "utilities/transactions/lock/point/point_lock_manager.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
namespace ROCKSDB_NAMESPACE {
class MockColumnFamilyHandle : public ColumnFamilyHandle {
public:
explicit MockColumnFamilyHandle(ColumnFamilyId cf_id) : cf_id_(cf_id) {}
~MockColumnFamilyHandle() override {}
const std::string& GetName() const override { return name_; }
ColumnFamilyId GetID() const override { return cf_id_; }
Status GetDescriptor(ColumnFamilyDescriptor*) override {
return Status::OK();
}
const Comparator* GetComparator() const override {
return BytewiseComparator();
}
private:
ColumnFamilyId cf_id_;
std::string name_ = "MockCF";
};
class PointLockManagerTest : public testing::Test {
public:
void SetUp() override {
env_ = Env::Default();
db_dir_ = test::PerThreadDBPath("point_lock_manager_test");
ASSERT_OK(env_->CreateDir(db_dir_));
Options opt;
opt.create_if_missing = true;
TransactionDBOptions txn_opt;
txn_opt.transaction_lock_timeout = 0;
ASSERT_OK(TransactionDB::Open(opt, txn_opt, db_dir_, &db_));
// CAUTION: This test creates a separate lock manager object (right, NOT
// the one that the TransactionDB is using!), and runs tests on it.
locker_.reset(new PointLockManager(
static_cast<PessimisticTransactionDB*>(db_), txn_opt));
wait_sync_point_name_ = "PointLockManager::AcquireWithTimeout:WaitingTxn";
}
void TearDown() override {
delete db_;
EXPECT_OK(DestroyDir(env_, db_dir_));
}
PessimisticTransaction* NewTxn(
TransactionOptions txn_opt = TransactionOptions()) {
Transaction* txn = db_->BeginTransaction(WriteOptions(), txn_opt);
return reinterpret_cast<PessimisticTransaction*>(txn);
}
protected:
Env* env_;
std::shared_ptr<LockManager> locker_;
const char* wait_sync_point_name_;
friend void PointLockManagerTestExternalSetup(PointLockManagerTest*);
private:
std::string db_dir_;
TransactionDB* db_;
};
typedef void (*init_func_t)(PointLockManagerTest*);
class AnyLockManagerTest : public PointLockManagerTest,
public testing::WithParamInterface<init_func_t> {
public:
void SetUp() override {
// If a custom setup function was provided, use it. Otherwise, use what we
// have inherited.
auto init_func = GetParam();
if (init_func)
(*init_func)(this);
else
PointLockManagerTest::SetUp();
}
};
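// For example (hypothetical sketch), a different lock manager's test could
// define the friend function declared in PointLockManagerTest and register
// itself with this parameterized suite:
//
//   void PointLockManagerTestExternalSetup(PointLockManagerTest* self) {
//     // ... create the lock manager, assign it to self->locker_, and set
//     // self->wait_sync_point_name_ to that manager's wait sync point ...
//   }
//   INSTANTIATE_TEST_CASE_P(MyLockManager, AnyLockManagerTest,
//                           ::testing::Values(PointLockManagerTestExternalSetup));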
TEST_P(AnyLockManagerTest, ReentrantExclusiveLock) {
// Tests that a txn can acquire exclusive lock on the same key repeatedly.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn = NewTxn();
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
// Cleanup
locker_->UnLock(txn, 1, "k", env_);
delete txn;
}
TEST_P(AnyLockManagerTest, ReentrantSharedLock) {
// Tests that a txn can acquire shared lock on the same key repeatedly.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn = NewTxn();
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
// Cleanup
locker_->UnLock(txn, 1, "k", env_);
delete txn;
}
TEST_P(AnyLockManagerTest, LockUpgrade) {
// Tests that a txn can upgrade from a shared lock to an exclusive lock.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn = NewTxn();
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
// Cleanup
locker_->UnLock(txn, 1, "k", env_);
delete txn;
}
TEST_P(AnyLockManagerTest, LockDowngrade) {
// Tests that a txn can acquire a shared lock after acquiring an exclusive
// lock on the same key.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn = NewTxn();
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
// Cleanup
locker_->UnLock(txn, 1, "k", env_);
delete txn;
}
TEST_P(AnyLockManagerTest, LockConflict) {
// Tests that lock conflicts lead to lock timeout.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn1 = NewTxn();
auto txn2 = NewTxn();
{
// exclusive-exclusive conflict.
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
auto s = locker_->TryLock(txn2, 1, "k1", env_, true);
ASSERT_TRUE(s.IsTimedOut());
}
{
// exclusive-shared conflict.
ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, true));
auto s = locker_->TryLock(txn2, 1, "k2", env_, false);
ASSERT_TRUE(s.IsTimedOut());
}
{
// shared-exclusive conflict.
ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, false));
auto s = locker_->TryLock(txn2, 1, "k2", env_, true);
ASSERT_TRUE(s.IsTimedOut());
}
// Cleanup
locker_->UnLock(txn1, 1, "k1", env_);
locker_->UnLock(txn1, 1, "k2", env_);
delete txn1;
delete txn2;
}
port::Thread BlockUntilWaitingTxn(const char* sync_point_name,
std::function<void()> f) {
std::atomic<bool> reached(false);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
sync_point_name, [&](void* /*arg*/) { reached.store(true); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
port::Thread t(f);
while (!reached.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
return t;
}
TEST_P(AnyLockManagerTest, SharedLocks) {
// Tests that shared locks can be concurrently held by multiple transactions.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn1 = NewTxn();
auto txn2 = NewTxn();
ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false));
ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false));
// Cleanup
locker_->UnLock(txn1, 1, "k", env_);
locker_->UnLock(txn2, 1, "k", env_);
delete txn1;
delete txn2;
}
TEST_P(AnyLockManagerTest, Deadlock) {
// Tests that deadlock can be detected.
// Deadlock scenario:
// txn1 exclusively locks k1, and wants to lock k2;
// txn2 exclusively locks k2, and wants to lock k1.
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.lock_timeout = 1000000;
auto txn1 = NewTxn(txn_opt);
auto txn2 = NewTxn(txn_opt);
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true));
ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true));
// txn1 tries to lock k2, will block forever.
port::Thread t = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() {
// block because txn2 is holding a lock on k2.
locker_->TryLock(txn1, 1, "k2", env_, true);
});
auto s = locker_->TryLock(txn2, 1, "k1", env_, true);
ASSERT_TRUE(s.IsBusy());
ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock);
std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer();
ASSERT_EQ(deadlock_paths.size(), 1u);
ASSERT_FALSE(deadlock_paths[0].limit_exceeded);
std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path;
ASSERT_EQ(deadlocks.size(), 2u);
ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID());
ASSERT_EQ(deadlocks[0].m_cf_id, 1u);
ASSERT_TRUE(deadlocks[0].m_exclusive);
ASSERT_EQ(deadlocks[0].m_waiting_key, "k2");
ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID());
ASSERT_EQ(deadlocks[1].m_cf_id, 1u);
ASSERT_TRUE(deadlocks[1].m_exclusive);
ASSERT_EQ(deadlocks[1].m_waiting_key, "k1");
locker_->UnLock(txn2, 1, "k2", env_);
t.join();
// Cleanup
locker_->UnLock(txn1, 1, "k1", env_);
locker_->UnLock(txn1, 1, "k2", env_);
delete txn2;
delete txn1;
}
} // namespace ROCKSDB_NAMESPACE

@ -3,6 +3,8 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/transactions/lock/point/point_lock_tracker.h"
namespace ROCKSDB_NAMESPACE {
@ -264,3 +266,5 @@ LockTracker::KeyIterator* PointLockTracker::GetKeyIterator(
void PointLockTracker::Clear() { tracked_keys_.clear(); }
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -4,6 +4,7 @@
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <memory>
#include <string>
@ -95,3 +96,4 @@ class PointLockTrackerFactory : public LockTrackerFactory {
};
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -0,0 +1,30 @@
//
// Generic definitions for a Range-based Lock Manager
//
#pragma once
#ifndef ROCKSDB_LITE
#include "utilities/transactions/lock/lock_manager.h"
namespace ROCKSDB_NAMESPACE {
/*
A base class for all Range-based lock managers
See also class RangeLockManagerHandle in
include/rocksdb/utilities/transaction_db.h
*/
class RangeLockManagerBase : public LockManager {
public:
// Getting a point lock is reduced to getting a range lock on a single-point
// range
using LockManager::TryLock;
Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
const std::string& key, Env* env, bool exclusive) override {
Endpoint endp(key.data(), key.size(), false);
return TryLock(txn, column_family_id, endp, endp, env, exclusive);
}
};
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -564,9 +564,20 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
}
uint32_t cfh_id = GetColumnFamilyID(column_family);
std::string key_str = key.ToString();
PointLockStatus status = tracked_locks_->GetPointLockStatus(cfh_id, key_str);
bool previously_locked = status.locked;
bool lock_upgrade = previously_locked && exclusive && !status.exclusive;
PointLockStatus status;
bool lock_upgrade;
bool previously_locked;
if (tracked_locks_->IsPointLockSupported()) {
status = tracked_locks_->GetPointLockStatus(cfh_id, key_str);
previously_locked = status.locked;
lock_upgrade = previously_locked && exclusive && !status.exclusive;
} else {
// If the record is tracked, we can assume it was locked, too.
previously_locked = assume_tracked;
status.locked = false;
lock_upgrade = false;
}
// Lock this key if this transaction hasn't already locked it or we require
// an upgrade.
@ -585,7 +596,8 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
SequenceNumber tracked_at_seq =
status.locked ? status.seq : kMaxSequenceNumber;
if (!do_validate || snapshot_ == nullptr) {
if (assume_tracked && !previously_locked) {
if (assume_tracked && !previously_locked &&
tracked_locks_->IsPointLockSupported()) {
s = Status::InvalidArgument(
"assume_tracked is set but it is not tracked yet");
}
@ -642,11 +654,13 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
} else {
#ifndef NDEBUG
PointLockStatus lock_status =
tracked_locks_->GetPointLockStatus(cfh_id, key_str);
assert(lock_status.locked);
assert(lock_status.seq <= tracked_at_seq);
assert(lock_status.exclusive == exclusive);
if (tracked_locks_->IsPointLockSupported()) {
PointLockStatus lock_status =
tracked_locks_->GetPointLockStatus(cfh_id, key_str);
assert(lock_status.locked);
assert(lock_status.seq <= tracked_at_seq);
assert(lock_status.exclusive == exclusive);
}
#endif
}
}
@ -654,6 +668,22 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
return s;
}
Status PessimisticTransaction::GetRangeLock(ColumnFamilyHandle* column_family,
const Endpoint& start_endp,
const Endpoint& end_endp) {
ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily();
uint32_t cfh_id = GetColumnFamilyID(cfh);
Status s = txn_db_impl_->TryRangeLock(this, cfh_id, start_endp, end_endp);
if (s.ok()) {
RangeLockRequest req{cfh_id, start_endp, end_endp};
tracked_locks_->Track(req);
}
return s;
}
// Return OK() if this key has not been modified more recently than the
// transaction snapshot_.
// tracked_at_seq is the global seq at which we either locked the key or already

@ -116,6 +116,10 @@ class PessimisticTransaction : public TransactionBaseImpl {
int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; }
virtual Status GetRangeLock(ColumnFamilyHandle* column_family,
const Endpoint& start_key,
const Endpoint& end_key) override;
protected:
// Refer to
// TransactionOptions::use_only_the_last_commit_time_batch_for_recovery

@ -391,6 +391,14 @@ Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive);
}
Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction* txn,
uint32_t cfh_id,
const Endpoint& start_endp,
const Endpoint& end_endp) {
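// Note: as written here, range locks are always acquired as exclusive locks
// (exclusive is hardcoded to true below).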
return lock_manager_->TryLock(txn, cfh_id, start_endp, end_endp, GetEnv(),
/*exclusive=*/true);
}
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
const LockTracker& keys) {
lock_manager_->UnLock(txn, keys, GetEnv());

@ -21,6 +21,7 @@
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/lock/lock_manager.h"
#include "utilities/transactions/lock/range/range_lock_manager.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn.h"
@ -98,6 +99,8 @@ class PessimisticTransactionDB : public TransactionDB {
Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
const std::string& key, bool exclusive);
Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id,
const Endpoint& start_endp, const Endpoint& end_endp);
void UnLock(PessimisticTransaction* txn, const LockTracker& keys);
void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
@ -172,7 +175,7 @@ class PessimisticTransactionDB : public TransactionDB {
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
std::unique_ptr<LockManager> lock_manager_;
std::shared_ptr<LockManager> lock_manager_;
// Must be held when adding/dropping column families.
InstrumentedMutex column_family_mutex_;
