Abstract out LockManager interface (#7532)

Summary:
In order to be able to introduce more locking protocols, we need to abstract out the locking subsystem in TransactionDB into a set of interfaces.

PR https://github.com/facebook/rocksdb/pull/7013 introduces interface `LockTracker`. This PR is a follow up to take the first step to abstract out a `LockManager` interface.

Further modifications to the interface may be needed when introducing the first implementation of range lock. But the idea here is to put the range lock implementation based on range tree under the `utilities/transactions/lock/range/range_tree`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7532

Test Plan: point_lock_manager_test

Reviewed By: ajkr

Differential Revision: D24238731

Pulled By: cheng-chang

fbshipit-source-id: 2a9458cd8b3fb008d9529dbc4d3b28c24631f463
main
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent ed90e2a450
commit 0ea7db768e
  1. 8
      CMakeLists.txt
  2. 4
      Makefile
  3. 26
      TARGETS
  4. 2
      include/rocksdb/types.h
  5. 5
      include/rocksdb/utilities/transaction.h
  6. 8
      include/rocksdb/utilities/transaction_db.h
  7. 8
      src.mk
  8. 23
      utilities/transactions/lock/lock_manager.cc
  9. 82
      utilities/transactions/lock/lock_manager.h
  10. 17
      utilities/transactions/lock/lock_tracker.cc
  11. 14
      utilities/transactions/lock/lock_tracker.h
  12. 134
      utilities/transactions/lock/point/point_lock_manager.cc
  13. 70
      utilities/transactions/lock/point/point_lock_manager.h
  14. 102
      utilities/transactions/lock/point/point_lock_manager_test.cc
  15. 2
      utilities/transactions/lock/point/point_lock_tracker.cc
  16. 13
      utilities/transactions/lock/point/point_lock_tracker.h
  17. 1
      utilities/transactions/lock/range/range_tree/TODO
  18. 7
      utilities/transactions/optimistic_transaction.cc
  19. 7
      utilities/transactions/pessimistic_transaction.cc
  20. 2
      utilities/transactions/pessimistic_transaction.h
  21. 39
      utilities/transactions/pessimistic_transaction_db.cc
  22. 12
      utilities/transactions/pessimistic_transaction_db.h
  23. 13
      utilities/transactions/transaction_base.cc
  24. 13
      utilities/transactions/transaction_base.h
  25. 8
      utilities/transactions/transaction_test.cc
  26. 1
      utilities/transactions/write_prepared_txn_db.h

@ -810,8 +810,9 @@ set(SOURCES
utilities/simulator_cache/sim_cache.cc
utilities/table_properties_collectors/compact_on_deletion_collector.cc
utilities/trace/file_trace_reader_writer.cc
utilities/transactions/lock/lock_tracker.cc
utilities/transactions/lock/point_lock_tracker.cc
utilities/transactions/lock/lock_manager.cc
utilities/transactions/lock/point/point_lock_tracker.cc
utilities/transactions/lock/point/point_lock_manager.cc
utilities/transactions/optimistic_transaction_db_impl.cc
utilities/transactions/optimistic_transaction.cc
utilities/transactions/pessimistic_transaction.cc
@ -819,7 +820,6 @@ set(SOURCES
utilities/transactions/snapshot_checker.cc
utilities/transactions/transaction_base.cc
utilities/transactions/transaction_db_mutex_impl.cc
utilities/transactions/transaction_lock_mgr.cc
utilities/transactions/transaction_util.cc
utilities/transactions/write_prepared_txn.cc
utilities/transactions/write_prepared_txn_db.cc
@ -1204,7 +1204,7 @@ if(WITH_TESTS)
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc
utilities/transactions/optimistic_transaction_test.cc
utilities/transactions/transaction_test.cc
utilities/transactions/transaction_lock_mgr_test.cc
utilities/transactions/lock/point/point_lock_manager_test.cc
utilities/transactions/write_prepared_transaction_test.cc
utilities/transactions/write_unprepared_transaction_test.cc
utilities/ttl/ttl_test.cc

@ -553,7 +553,7 @@ PARALLEL_TEST = \
persistent_cache_test \
table_test \
transaction_test \
transaction_lock_mgr_test \
point_lock_manager_test \
write_prepared_transaction_test \
write_unprepared_transaction_test \
@ -1842,7 +1842,7 @@ write_callback_test: $(OBJ_DIR)/db/write_callback_test.o $(TEST_LIBRARY) $(LIBRA
heap_test: $(OBJ_DIR)/util/heap_test.o $(GTEST)
$(AM_LINK)
transaction_lock_mgr_test: utilities/transactions/transaction_lock_mgr_test.o $(TEST_LIBRARY) $(LIBRARY)
point_lock_manager_test: utilities/transactions/lock/point/point_lock_manager_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
transaction_test: $(OBJ_DIR)/utilities/transactions/transaction_test.o $(TEST_LIBRARY) $(LIBRARY)

@ -384,8 +384,9 @@ cpp_library(
"utilities/simulator_cache/sim_cache.cc",
"utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/trace/file_trace_reader_writer.cc",
"utilities/transactions/lock/lock_tracker.cc",
"utilities/transactions/lock/point_lock_tracker.cc",
"utilities/transactions/lock/lock_manager.cc",
"utilities/transactions/lock/point/point_lock_manager.cc",
"utilities/transactions/lock/point/point_lock_tracker.cc",
"utilities/transactions/optimistic_transaction.cc",
"utilities/transactions/optimistic_transaction_db_impl.cc",
"utilities/transactions/pessimistic_transaction.cc",
@ -393,7 +394,6 @@ cpp_library(
"utilities/transactions/snapshot_checker.cc",
"utilities/transactions/transaction_base.cc",
"utilities/transactions/transaction_db_mutex_impl.cc",
"utilities/transactions/transaction_lock_mgr.cc",
"utilities/transactions/transaction_util.cc",
"utilities/transactions/write_prepared_txn.cc",
"utilities/transactions/write_prepared_txn_db.cc",
@ -673,8 +673,9 @@ cpp_library(
"utilities/simulator_cache/sim_cache.cc",
"utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/trace/file_trace_reader_writer.cc",
"utilities/transactions/lock/lock_tracker.cc",
"utilities/transactions/lock/point_lock_tracker.cc",
"utilities/transactions/lock/lock_manager.cc",
"utilities/transactions/lock/point/point_lock_manager.cc",
"utilities/transactions/lock/point/point_lock_tracker.cc",
"utilities/transactions/optimistic_transaction.cc",
"utilities/transactions/optimistic_transaction_db_impl.cc",
"utilities/transactions/pessimistic_transaction.cc",
@ -682,7 +683,6 @@ cpp_library(
"utilities/transactions/snapshot_checker.cc",
"utilities/transactions/transaction_base.cc",
"utilities/transactions/transaction_db_mutex_impl.cc",
"utilities/transactions/transaction_lock_mgr.cc",
"utilities/transactions/transaction_util.cc",
"utilities/transactions/write_prepared_txn.cc",
"utilities/transactions/write_prepared_txn_db.cc",
@ -1727,6 +1727,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"point_lock_manager_test",
"utilities/transactions/lock/point/point_lock_manager_test.cc",
"parallel",
[],
[],
],
[
"prefetch_test",
"file/prefetch_test.cc",
@ -1916,13 +1923,6 @@ ROCKS_TESTS = [
[],
[],
],
[
"transaction_lock_mgr_test",
"utilities/transactions/transaction_lock_mgr_test.cc",
"parallel",
[],
[],
],
[
"transaction_test",
"utilities/transactions/transaction_test.cc",

@ -12,6 +12,8 @@ namespace ROCKSDB_NAMESPACE {
// Define all public custom types here.
using ColumnFamilyId = uint32_t;
// Represents a sequence number in a WAL file.
typedef uint64_t SequenceNumber;

@ -24,6 +24,11 @@ using TransactionName = std::string;
using TransactionID = uint64_t;
// An endpoint for a range of keys.
class Endpoint {
// TODO
};
// Provides notification to the caller of SetSnapshotOnNextOperation when
// the actual snapshot gets created
class TransactionNotifier {

@ -202,6 +202,13 @@ struct KeyLockInfo {
bool exclusive;
};
struct RangeLockInfo {
Endpoint start;
Endpoint end;
std::vector<TransactionID> ids;
bool exclusive;
};
struct DeadlockInfo {
TransactionID m_txn_id;
uint32_t m_cf_id;
@ -296,6 +303,7 @@ class TransactionDB : public StackableDB {
// The mapping is column family id -> KeyLockInfo
virtual std::unordered_multimap<uint32_t, KeyLockInfo>
GetLockStatusData() = 0;
virtual std::vector<DeadlockPath> GetDeadlockInfoBuffer() = 0;
virtual void SetDeadlockInfoBufferSize(uint32_t target_size) = 0;

@ -251,8 +251,9 @@ LIB_SOURCES = \
utilities/simulator_cache/sim_cache.cc \
utilities/table_properties_collectors/compact_on_deletion_collector.cc \
utilities/trace/file_trace_reader_writer.cc \
utilities/transactions/lock/lock_tracker.cc \
utilities/transactions/lock/point_lock_tracker.cc \
utilities/transactions/lock/lock_manager.cc \
utilities/transactions/lock/point/point_lock_tracker.cc \
utilities/transactions/lock/point/point_lock_manager.cc \
utilities/transactions/optimistic_transaction.cc \
utilities/transactions/optimistic_transaction_db_impl.cc \
utilities/transactions/pessimistic_transaction.cc \
@ -260,7 +261,6 @@ LIB_SOURCES = \
utilities/transactions/snapshot_checker.cc \
utilities/transactions/transaction_base.cc \
utilities/transactions/transaction_db_mutex_impl.cc \
utilities/transactions/transaction_lock_mgr.cc \
utilities/transactions/transaction_util.cc \
utilities/transactions/write_prepared_txn.cc \
utilities/transactions/write_prepared_txn_db.cc \
@ -522,7 +522,7 @@ TEST_MAIN_SOURCES = \
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \
utilities/transactions/optimistic_transaction_test.cc \
utilities/transactions/transaction_test.cc \
utilities/transactions/transaction_lock_mgr_test.cc \
utilities/transactions/lock/point/point_lock_manager_test.cc \
utilities/transactions/write_prepared_transaction_test.cc \
utilities/transactions/write_unprepared_transaction_test.cc \
utilities/ttl/ttl_test.cc \

@ -0,0 +1,23 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/transactions/lock/lock_manager.h"
#include "utilities/transactions/lock/point/point_lock_manager.h"
namespace ROCKSDB_NAMESPACE {
LockManager* NewLockManager(PessimisticTransactionDB* db,
const TransactionDBOptions& opt) {
assert(db);
// TODO: determine the lock manager implementation based on configuration.
return new PointLockManager(db, opt);
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -0,0 +1,82 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include "rocksdb/types.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "utilities/transactions/lock/lock_tracker.h"
#include "utilities/transactions/pessimistic_transaction.h"
namespace ROCKSDB_NAMESPACE {
class PessimisticTransactionDB;
class LockManager {
public:
virtual ~LockManager() {}
// Whether supports locking a specific key.
virtual bool IsPointLockSupported() const = 0;
// Whether supports locking a range of keys.
virtual bool IsRangeLockSupported() const = 0;
// Locks acquired through this LockManager should be tracked by
// the LockTrackers created through the returned factory.
virtual const LockTrackerFactory& GetLockTrackerFactory() const = 0;
// Enable locking for the specified column family.
// Caller should guarantee that this column family is not already enabled.
virtual void AddColumnFamily(const ColumnFamilyHandle* cf) = 0;
// Disable locking for the specified column family.
// Caller should guarantee that this column family is no longer used.
virtual void RemoveColumnFamily(const ColumnFamilyHandle* cf) = 0;
// Attempt to lock a key or a key range. If OK status is returned, the caller
// is responsible for calling UnLock() on this key.
virtual Status TryLock(PessimisticTransaction* txn,
ColumnFamilyId column_family_id,
const std::string& key, Env* env, bool exclusive) = 0;
// The range [start, end] are inclusive at both sides.
virtual Status TryLock(PessimisticTransaction* txn,
ColumnFamilyId column_family_id, const Endpoint& start,
const Endpoint& end, Env* env, bool exclusive) = 0;
// Unlock a key or a range locked by TryLock(). txn must be the same
// Transaction that locked this key.
virtual void UnLock(PessimisticTransaction* txn, const LockTracker& tracker,
Env* env) = 0;
virtual void UnLock(PessimisticTransaction* txn,
ColumnFamilyId column_family_id, const std::string& key,
Env* env) = 0;
virtual void UnLock(PessimisticTransaction* txn,
ColumnFamilyId column_family_id, const Endpoint& start,
const Endpoint& end, Env* env) = 0;
using PointLockStatus = std::unordered_multimap<ColumnFamilyId, KeyLockInfo>;
virtual PointLockStatus GetPointLockStatus() = 0;
using RangeLockStatus =
std::unordered_multimap<ColumnFamilyId, RangeLockInfo>;
virtual RangeLockStatus GetRangeLockStatus() = 0;
virtual std::vector<DeadlockPath> GetDeadlockInfoBuffer() = 0;
virtual void Resize(uint32_t new_size) = 0;
};
// LockManager should always be constructed through this factory method,
// instead of constructing through concrete implementations' constructor.
// Caller owns the returned pointer.
LockManager* NewLockManager(PessimisticTransactionDB* db,
const TransactionDBOptions& opt);
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -1,17 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/transactions/lock/lock_tracker.h"
#include "utilities/transactions/lock/point_lock_tracker.h"
namespace ROCKSDB_NAMESPACE {
LockTracker* NewLockTracker() {
// TODO: determine the lock tracker implementation based on configuration.
return new PointLockTracker();
}
} // namespace ROCKSDB_NAMESPACE

@ -13,8 +13,6 @@
namespace ROCKSDB_NAMESPACE {
using ColumnFamilyId = uint32_t;
// Request for locking a single key.
struct PointLockRequest {
// The id of the key's column family.
@ -191,9 +189,13 @@ class LockTracker {
ColumnFamilyId /*column_family_id*/) const = 0;
};
// LockTracker should always be constructed through this factory method,
// instead of constructing through concrete implementations' constructor.
// Caller owns the returned pointer.
LockTracker* NewLockTracker();
// LockTracker should always be constructed through this factory.
// Each LockManager owns a LockTrackerFactory.
class LockTrackerFactory {
public:
// Caller owns the returned pointer.
virtual LockTracker* Create() const = 0;
virtual ~LockTrackerFactory() {}
};
} // namespace ROCKSDB_NAMESPACE

@ -5,10 +5,10 @@
#ifndef ROCKSDB_LITE
#include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/lock/point/point_lock_manager.h"
#include <cinttypes>
#include <algorithm>
#include <cinttypes>
#include <mutex>
#include "monitoring/perf_context_imp.h"
@ -19,6 +19,7 @@
#include "util/hash.h"
#include "util/thread_local.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
namespace ROCKSDB_NAMESPACE {
@ -37,6 +38,11 @@ struct LockInfo {
: exclusive(lock_info.exclusive),
txn_ids(lock_info.txn_ids),
expiration_time(lock_info.expiration_time) {}
void operator=(const LockInfo& lock_info) {
exclusive = lock_info.exclusive;
txn_ids = lock_info.txn_ids;
expiration_time = lock_info.expiration_time;
}
};
struct LockMapStripe {
@ -80,7 +86,7 @@ struct LockMap {
const size_t num_stripes_;
// Count of keys that are currently locked in this column family.
// (Only maintained if TransactionLockMgr::max_num_locks_ is positive.)
// (Only maintained if PointLockManager::max_num_locks_ is positive.)
std::atomic<int64_t> lock_cnt{0};
std::vector<LockMapStripe*> lock_map_stripes_;
@ -155,47 +161,44 @@ void UnrefLockMapsCache(void* ptr) {
}
} // anonymous namespace
TransactionLockMgr::TransactionLockMgr(
TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks,
uint32_t max_num_deadlocks,
std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
: txn_db_impl_(nullptr),
default_num_stripes_(default_num_stripes),
max_num_locks_(max_num_locks),
PointLockManager::PointLockManager(PessimisticTransactionDB* txn_db,
const TransactionDBOptions& opt)
: txn_db_impl_(txn_db),
default_num_stripes_(opt.num_stripes),
max_num_locks_(opt.max_num_locks),
lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
dlock_buffer_(max_num_deadlocks),
mutex_factory_(mutex_factory) {
assert(txn_db);
txn_db_impl_ = static_cast_with_check<PessimisticTransactionDB>(txn_db);
}
dlock_buffer_(opt.max_num_deadlocks),
mutex_factory_(opt.custom_mutex_factory
? opt.custom_mutex_factory
: std::make_shared<TransactionDBMutexFactoryImpl>()) {}
TransactionLockMgr::~TransactionLockMgr() {}
PointLockManager::~PointLockManager() {}
size_t LockMap::GetStripe(const std::string& key) const {
assert(num_stripes_ > 0);
return FastRange64(GetSliceNPHash64(key), num_stripes_);
}
void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) {
void PointLockManager::AddColumnFamily(const ColumnFamilyHandle* cf) {
InstrumentedMutexLock l(&lock_map_mutex_);
if (lock_maps_.find(column_family_id) == lock_maps_.end()) {
lock_maps_.emplace(column_family_id,
std::make_shared<LockMap>(default_num_stripes_, mutex_factory_));
if (lock_maps_.find(cf->GetID()) == lock_maps_.end()) {
lock_maps_.emplace(cf->GetID(), std::make_shared<LockMap>(
default_num_stripes_, mutex_factory_));
} else {
// column_family already exists in lock map
assert(false);
}
}
void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) {
void PointLockManager::RemoveColumnFamily(const ColumnFamilyHandle* cf) {
// Remove lock_map for this column family. Since the lock map is stored
// as a shared ptr, concurrent transactions can still keep using it
// until they release their references to it.
{
InstrumentedMutexLock l(&lock_map_mutex_);
auto lock_maps_iter = lock_maps_.find(column_family_id);
auto lock_maps_iter = lock_maps_.find(cf->GetID());
if (lock_maps_iter == lock_maps_.end()) {
return;
}
@ -214,8 +217,8 @@ void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) {
// Look up the LockMap std::shared_ptr for a given column_family_id.
// Note: The LockMap is only valid as long as the caller is still holding on
// to the returned std::shared_ptr.
std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
uint32_t column_family_id) {
std::shared_ptr<LockMap> PointLockManager::GetLockMap(
ColumnFamilyId column_family_id) {
// First check thread-local cache
if (lock_maps_cache_->Get() == nullptr) {
lock_maps_cache_->Reset(new LockMaps());
@ -248,9 +251,9 @@ std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
// transaction.
// If false, sets *expire_time to the expiration time of the lock according
// to Env->GetMicros() or 0 if no expiration.
bool TransactionLockMgr::IsLockExpired(TransactionID txn_id,
const LockInfo& lock_info, Env* env,
uint64_t* expire_time) {
bool PointLockManager::IsLockExpired(TransactionID txn_id,
const LockInfo& lock_info, Env* env,
uint64_t* expire_time) {
if (lock_info.expiration_time == 0) {
*expire_time = 0;
return false;
@ -279,10 +282,10 @@ bool TransactionLockMgr::IsLockExpired(TransactionID txn_id,
return expired;
}
Status TransactionLockMgr::TryLock(PessimisticTransaction* txn,
uint32_t column_family_id,
const std::string& key, Env* env,
bool exclusive) {
Status PointLockManager::TryLock(PessimisticTransaction* txn,
ColumnFamilyId column_family_id,
const std::string& key, Env* env,
bool exclusive) {
// Lookup lock map for this column family id
std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
LockMap* lock_map = lock_map_ptr.get();
@ -307,9 +310,9 @@ Status TransactionLockMgr::TryLock(PessimisticTransaction* txn,
}
// Helper function for TryLock().
Status TransactionLockMgr::AcquireWithTimeout(
Status PointLockManager::AcquireWithTimeout(
PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
uint32_t column_family_id, const std::string& key, Env* env,
ColumnFamilyId column_family_id, const std::string& key, Env* env,
int64_t timeout, LockInfo&& lock_info) {
Status result;
uint64_t end_time = 0;
@ -370,7 +373,7 @@ Status TransactionLockMgr::AcquireWithTimeout(
txn->SetWaitingTxn(wait_ids, column_family_id, &key);
}
TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn");
TEST_SYNC_POINT("PointLockManager::AcquireWithTimeout:WaitingTxn");
if (cv_end_time < 0) {
// Wait indefinitely
result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
@ -408,14 +411,14 @@ Status TransactionLockMgr::AcquireWithTimeout(
return result;
}
void TransactionLockMgr::DecrementWaiters(
void PointLockManager::DecrementWaiters(
const PessimisticTransaction* txn,
const autovector<TransactionID>& wait_ids) {
std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
DecrementWaitersImpl(txn, wait_ids);
}
void TransactionLockMgr::DecrementWaitersImpl(
void PointLockManager::DecrementWaitersImpl(
const PessimisticTransaction* txn,
const autovector<TransactionID>& wait_ids) {
auto id = txn->GetID();
@ -430,7 +433,7 @@ void TransactionLockMgr::DecrementWaitersImpl(
}
}
bool TransactionLockMgr::IncrementWaiters(
bool PointLockManager::IncrementWaiters(
const PessimisticTransaction* txn,
const autovector<TransactionID>& wait_ids, const std::string& key,
const uint32_t& cf_id, const bool& exclusive, Env* const env) {
@ -513,12 +516,11 @@ bool TransactionLockMgr::IncrementWaiters(
// Sets *expire_time to the expiration time in microseconds
// or 0 if no expiration.
// REQUIRED: Stripe mutex must be held.
Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
LockMapStripe* stripe,
const std::string& key, Env* env,
LockInfo&& txn_lock_info,
uint64_t* expire_time,
autovector<TransactionID>* txn_ids) {
Status PointLockManager::AcquireLocked(LockMap* lock_map, LockMapStripe* stripe,
const std::string& key, Env* env,
LockInfo&& txn_lock_info,
uint64_t* expire_time,
autovector<TransactionID>* txn_ids) {
assert(txn_lock_info.txn_ids.size() == 1);
Status result;
@ -580,10 +582,9 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
return result;
}
void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn,
const std::string& key,
LockMapStripe* stripe, LockMap* lock_map,
Env* env) {
void PointLockManager::UnLockKey(PessimisticTransaction* txn,
const std::string& key, LockMapStripe* stripe,
LockMap* lock_map, Env* env) {
#ifdef NDEBUG
(void)env;
#endif
@ -619,9 +620,9 @@ void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn,
}
}
void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
uint32_t column_family_id,
const std::string& key, Env* env) {
void PointLockManager::UnLock(PessimisticTransaction* txn,
ColumnFamilyId column_family_id,
const std::string& key, Env* env) {
std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
LockMap* lock_map = lock_map_ptr.get();
if (lock_map == nullptr) {
@ -642,8 +643,8 @@ void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
stripe->stripe_cv->NotifyAll();
}
void TransactionLockMgr::UnLock(const PessimisticTransaction* txn,
const LockTracker& tracker, Env* env) {
void PointLockManager::UnLock(PessimisticTransaction* txn,
const LockTracker& tracker, Env* env) {
std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
tracker.GetColumnFamilyIterator());
assert(cf_it != nullptr);
@ -690,8 +691,8 @@ void TransactionLockMgr::UnLock(const PessimisticTransaction* txn,
}
}
TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
LockStatusData data;
PointLockManager::PointLockStatus PointLockManager::GetPointLockStatus() {
PointLockStatus data;
// Lock order here is important. The correct order is lock_map_mutex_, then
// for every column family ID in ascending order lock every stripe in
// ascending order.
@ -730,13 +731,34 @@ TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
return data;
}
std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() {
std::vector<DeadlockPath> PointLockManager::GetDeadlockInfoBuffer() {
return dlock_buffer_.PrepareBuffer();
}
void TransactionLockMgr::Resize(uint32_t target_size) {
void PointLockManager::Resize(uint32_t target_size) {
dlock_buffer_.Resize(target_size);
}
PointLockManager::RangeLockStatus PointLockManager::GetRangeLockStatus() {
return {};
}
Status PointLockManager::TryLock(PessimisticTransaction* /* txn */,
ColumnFamilyId /* cf_id */,
const Endpoint& /* start */,
const Endpoint& /* end */, Env* /* env */,
bool /* exclusive */) {
return Status::NotSupported(
"PointLockManager does not support range locking");
}
void PointLockManager::UnLock(PessimisticTransaction* /* txn */,
ColumnFamilyId /* cf_id */,
const Endpoint& /* start */,
const Endpoint& /* end */, Env* /* env */) {
// no-op
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -6,9 +6,9 @@
#pragma once
#ifndef ROCKSDB_LITE
#include <memory>
#include <string>
#include <unordered_map>
#include <memory>
#include <utility>
#include <vector>
@ -17,7 +17,8 @@
#include "util/autovector.h"
#include "util/hash_map.h"
#include "util/thread_local.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/lock/lock_manager.h"
#include "utilities/transactions/lock/point/point_lock_tracker.h"
namespace ROCKSDB_NAMESPACE {
@ -48,44 +49,47 @@ struct TrackedTrxInfo {
std::string m_waiting_key;
};
class Slice;
class PessimisticTransactionDB;
class TransactionLockMgr {
class PointLockManager : public LockManager {
public:
TransactionLockMgr(TransactionDB* txn_db, size_t default_num_stripes,
int64_t max_num_locks, uint32_t max_num_deadlocks,
std::shared_ptr<TransactionDBMutexFactory> factory);
PointLockManager(PessimisticTransactionDB* db,
const TransactionDBOptions& opt);
// No copying allowed
TransactionLockMgr(const TransactionLockMgr&) = delete;
void operator=(const TransactionLockMgr&) = delete;
PointLockManager(const PointLockManager&) = delete;
PointLockManager& operator=(const PointLockManager&) = delete;
~PointLockManager() override;
bool IsPointLockSupported() const override { return true; }
bool IsRangeLockSupported() const override { return false; }
const LockTrackerFactory& GetLockTrackerFactory() const override {
return PointLockTrackerFactory::Get();
}
void AddColumnFamily(const ColumnFamilyHandle* cf) override;
void RemoveColumnFamily(const ColumnFamilyHandle* cf) override;
~TransactionLockMgr();
Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
const std::string& key, Env* env, bool exclusive) override;
Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
const Endpoint& start, const Endpoint& end, Env* env,
bool exclusive) override;
// Creates a new LockMap for this column family. Caller should guarantee
// that this column family does not already exist.
void AddColumnFamily(uint32_t column_family_id);
void UnLock(PessimisticTransaction* txn, const LockTracker& tracker,
Env* env) override;
void UnLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
const std::string& key, Env* env) override;
void UnLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
const Endpoint& start, const Endpoint& end, Env* env) override;
// Deletes the LockMap for this column family. Caller should guarantee that
// this column family is no longer in use.
void RemoveColumnFamily(uint32_t column_family_id);
PointLockStatus GetPointLockStatus() override;
// Attempt to lock key. If OK status is returned, the caller is responsible
// for calling UnLock() on this key.
Status TryLock(PessimisticTransaction* txn, uint32_t column_family_id,
const std::string& key, Env* env, bool exclusive);
RangeLockStatus GetRangeLockStatus() override;
// Unlock a key locked by TryLock(). txn must be the same Transaction that
// locked this key.
void UnLock(const PessimisticTransaction* txn, const LockTracker& tracker,
Env* env);
void UnLock(PessimisticTransaction* txn, uint32_t column_family_id,
const std::string& key, Env* env);
std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
using LockStatusData = std::unordered_multimap<uint32_t, KeyLockInfo>;
LockStatusData GetLockStatusData();
std::vector<DeadlockPath> GetDeadlockInfoBuffer();
void Resize(uint32_t);
void Resize(uint32_t new_size) override;
private:
PessimisticTransactionDB* txn_db_impl_;
@ -140,7 +144,7 @@ class TransactionLockMgr {
LockInfo&& lock_info, uint64_t* wait_time,
autovector<TransactionID>* txn_ids);
void UnLockKey(const PessimisticTransaction* txn, const std::string& key,
void UnLockKey(PessimisticTransaction* txn, const std::string& key,
LockMapStripe* stripe, LockMap* lock_map, Env* env);
bool IncrementWaiters(const PessimisticTransaction* txn,

@ -5,7 +5,7 @@
#ifndef ROCKSDB_LITE
#include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/lock/point/point_lock_manager.h"
#include "file/file_util.h"
#include "port/port.h"
@ -13,15 +13,37 @@
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
namespace ROCKSDB_NAMESPACE {
class TransactionLockMgrTest : public testing::Test {
class MockColumnFamilyHandle : public ColumnFamilyHandle {
public:
explicit MockColumnFamilyHandle(ColumnFamilyId cf_id) : cf_id_(cf_id) {}
~MockColumnFamilyHandle() override {}
const std::string& GetName() const override { return name_; }
ColumnFamilyId GetID() const override { return cf_id_; }
Status GetDescriptor(ColumnFamilyDescriptor*) override {
return Status::OK();
}
const Comparator* GetComparator() const override { return nullptr; }
private:
ColumnFamilyId cf_id_;
std::string name_ = "MockCF";
};
class PointLockManagerTest : public testing::Test {
public:
void SetUp() override {
env_ = Env::Default();
db_dir_ = test::PerThreadDBPath("transaction_lock_mgr_test");
db_dir_ = test::PerThreadDBPath("point_lock_manager_test");
ASSERT_OK(env_->CreateDir(db_dir_));
mutex_factory_ = std::make_shared<TransactionDBMutexFactoryImpl>();
@ -29,11 +51,11 @@ class TransactionLockMgrTest : public testing::Test {
opt.create_if_missing = true;
TransactionDBOptions txn_opt;
txn_opt.transaction_lock_timeout = 0;
txn_opt.custom_mutex_factory = mutex_factory_;
ASSERT_OK(TransactionDB::Open(opt, txn_opt, db_dir_, &db_));
locker_.reset(
new TransactionLockMgr(db_, txn_opt.num_stripes, txn_opt.max_num_locks,
txn_opt.max_num_deadlocks, mutex_factory_));
locker_.reset(new PointLockManager(
static_cast<PessimisticTransactionDB*>(db_), txn_opt));
}
void TearDown() override {
@ -49,7 +71,7 @@ class TransactionLockMgrTest : public testing::Test {
protected:
Env* env_;
std::unique_ptr<TransactionLockMgr> locker_;
std::unique_ptr<PointLockManager> locker_;
private:
std::string db_dir_;
@ -57,8 +79,9 @@ class TransactionLockMgrTest : public testing::Test {
TransactionDB* db_;
};
TEST_F(TransactionLockMgrTest, LockNonExistingColumnFamily) {
locker_->RemoveColumnFamily(1024);
TEST_F(PointLockManagerTest, LockNonExistingColumnFamily) {
MockColumnFamilyHandle cf(1024);
locker_->RemoveColumnFamily(&cf);
auto txn = NewTxn();
auto s = locker_->TryLock(txn, 1024, "k", env_, true);
ASSERT_TRUE(s.IsInvalidArgument());
@ -66,9 +89,10 @@ TEST_F(TransactionLockMgrTest, LockNonExistingColumnFamily) {
delete txn;
}
TEST_F(TransactionLockMgrTest, LockStatus) {
locker_->AddColumnFamily(1024);
locker_->AddColumnFamily(2048);
TEST_F(PointLockManagerTest, LockStatus) {
MockColumnFamilyHandle cf1(1024), cf2(2048);
locker_->AddColumnFamily(&cf1);
locker_->AddColumnFamily(&cf2);
auto txn1 = NewTxn();
ASSERT_OK(locker_->TryLock(txn1, 1024, "k1", env_, true));
@ -78,7 +102,7 @@ TEST_F(TransactionLockMgrTest, LockStatus) {
ASSERT_OK(locker_->TryLock(txn2, 1024, "k2", env_, false));
ASSERT_OK(locker_->TryLock(txn2, 2048, "k2", env_, false));
auto s = locker_->GetLockStatusData();
auto s = locker_->GetPointLockStatus();
ASSERT_EQ(s.size(), 4u);
for (uint32_t cf_id : {1024, 2048}) {
ASSERT_EQ(s.count(cf_id), 2u);
@ -101,8 +125,9 @@ TEST_F(TransactionLockMgrTest, LockStatus) {
delete txn2;
}
TEST_F(TransactionLockMgrTest, UnlockExclusive) {
locker_->AddColumnFamily(1);
TEST_F(PointLockManagerTest, UnlockExclusive) {
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn1 = NewTxn();
ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, true));
@ -115,8 +140,9 @@ TEST_F(TransactionLockMgrTest, UnlockExclusive) {
delete txn2;
}
TEST_F(TransactionLockMgrTest, UnlockShared) {
locker_->AddColumnFamily(1);
TEST_F(PointLockManagerTest, UnlockShared) {
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn1 = NewTxn();
ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false));
@ -129,46 +155,51 @@ TEST_F(TransactionLockMgrTest, UnlockShared) {
delete txn2;
}
TEST_F(TransactionLockMgrTest, ReentrantExclusiveLock) {
TEST_F(PointLockManagerTest, ReentrantExclusiveLock) {
// Tests that a txn can acquire exclusive lock on the same key repeatedly.
locker_->AddColumnFamily(1);
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn = NewTxn();
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
delete txn;
}
TEST_F(TransactionLockMgrTest, ReentrantSharedLock) {
TEST_F(PointLockManagerTest, ReentrantSharedLock) {
// Tests that a txn can acquire shared lock on the same key repeatedly.
locker_->AddColumnFamily(1);
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn = NewTxn();
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
delete txn;
}
TEST_F(TransactionLockMgrTest, LockUpgrade) {
TEST_F(PointLockManagerTest, LockUpgrade) {
// Tests that a txn can upgrade from a shared lock to an exclusive lock.
locker_->AddColumnFamily(1);
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn = NewTxn();
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
delete txn;
}
TEST_F(TransactionLockMgrTest, LockDowngrade) {
TEST_F(PointLockManagerTest, LockDowngrade) {
// Tests that a txn can acquire a shared lock after acquiring an exclusive
// lock on the same key.
locker_->AddColumnFamily(1);
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn = NewTxn();
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true));
ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false));
delete txn;
}
TEST_F(TransactionLockMgrTest, LockConflict) {
TEST_F(PointLockManagerTest, LockConflict) {
// Tests that lock conflicts lead to lock timeout.
locker_->AddColumnFamily(1);
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn1 = NewTxn();
auto txn2 = NewTxn();
@ -200,7 +231,7 @@ TEST_F(TransactionLockMgrTest, LockConflict) {
port::Thread BlockUntilWaitingTxn(std::function<void()> f) {
std::atomic<bool> reached(false);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
"PointLockManager::AcquireWithTimeout:WaitingTxn",
[&](void* /*arg*/) { reached.store(true); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@ -215,9 +246,10 @@ port::Thread BlockUntilWaitingTxn(std::function<void()> f) {
return t;
}
TEST_F(TransactionLockMgrTest, SharedLocks) {
TEST_F(PointLockManagerTest, SharedLocks) {
// Tests that shared locks can be concurrently held by multiple transactions.
locker_->AddColumnFamily(1);
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn1 = NewTxn();
auto txn2 = NewTxn();
ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false));
@ -226,12 +258,13 @@ TEST_F(TransactionLockMgrTest, SharedLocks) {
delete txn2;
}
TEST_F(TransactionLockMgrTest, Deadlock) {
TEST_F(PointLockManagerTest, Deadlock) {
// Tests that deadlock can be detected.
// Deadlock scenario:
// txn1 exclusively locks k1, and wants to lock k2;
// txn2 exclusively locks k2, and wants to lock k1.
locker_->AddColumnFamily(1);
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.lock_timeout = 1000000;
@ -275,10 +308,11 @@ TEST_F(TransactionLockMgrTest, Deadlock) {
delete txn1;
}
TEST_F(TransactionLockMgrTest, DeadlockDepthExceeded) {
TEST_F(PointLockManagerTest, DeadlockDepthExceeded) {
// Tests that when detecting deadlock, if the detection depth is exceeded,
// it's also viewed as deadlock.
locker_->AddColumnFamily(1);
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
TransactionOptions txn_opt;
txn_opt.deadlock_detect = true;
txn_opt.deadlock_detect_depth = 1;

@ -3,7 +3,7 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/transactions/lock/point_lock_tracker.h"
#include "utilities/transactions/lock/point/point_lock_tracker.h"
namespace ROCKSDB_NAMESPACE {

@ -81,4 +81,17 @@ class PointLockTracker : public LockTracker {
TrackedKeys tracked_keys_;
};
class PointLockTrackerFactory : public LockTrackerFactory {
public:
static const PointLockTrackerFactory& Get() {
static const PointLockTrackerFactory instance;
return instance;
}
LockTracker* Create() const override { return new PointLockTracker(); }
private:
PointLockTrackerFactory() {}
};
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1 @@
Implement LockTracker and LockManager for range lock with range tree.

@ -17,9 +17,10 @@
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "util/cast_util.h"
#include "util/string_util.h"
#include "utilities/transactions/transaction_util.h"
#include "utilities/transactions/lock/point/point_lock_tracker.h"
#include "utilities/transactions/optimistic_transaction.h"
#include "utilities/transactions/optimistic_transaction_db_impl.h"
#include "utilities/transactions/transaction_util.h"
namespace ROCKSDB_NAMESPACE {
@ -28,7 +29,9 @@ struct WriteOptions;
OptimisticTransaction::OptimisticTransaction(
OptimisticTransactionDB* txn_db, const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options)
: TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_(txn_db) {
: TransactionBaseImpl(txn_db->GetBaseDB(), write_options,
PointLockTrackerFactory::Get()),
txn_db_(txn_db) {
Initialize(txn_options);
}

@ -38,7 +38,10 @@ TransactionID PessimisticTransaction::GenTxnID() {
PessimisticTransaction::PessimisticTransaction(
TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options, const bool init)
: TransactionBaseImpl(txn_db->GetRootDB(), write_options),
: TransactionBaseImpl(
txn_db->GetRootDB(), write_options,
static_cast_with_check<PessimisticTransactionDB>(txn_db)
->GetLockTrackerFactory()),
txn_db_impl_(nullptr),
expiration_time_(0),
txn_id_(0),
@ -132,7 +135,7 @@ WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
: PessimisticTransaction(txn_db, write_options, txn_options){};
Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
std::unique_ptr<LockTracker> keys_to_unlock(NewLockTracker());
std::unique_ptr<LockTracker> keys_to_unlock(lock_tracker_factory_.Create());
Status s = LockBatch(batch, keys_to_unlock.get());
if (!s.ok()) {

@ -172,7 +172,7 @@ class PessimisticTransaction : public TransactionBaseImpl {
//
// If waiting_key_ is not null, then the pointer should always point to
// a valid string object. The reason is that it is only non-null when the
// transaction is blocked in the TransactionLockMgr::AcquireWithTimeout
// transaction is blocked in the PointLockManager::AcquireWithTimeout
// function. At that point, the key string object is one of the function
// parameters.
uint32_t waiting_cf_id_;

@ -31,12 +31,7 @@ PessimisticTransactionDB::PessimisticTransactionDB(
: TransactionDB(db),
db_impl_(static_cast_with_check<DBImpl>(db)),
txn_db_options_(txn_db_options),
lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
txn_db_options_.max_num_deadlocks,
txn_db_options_.custom_mutex_factory
? txn_db_options_.custom_mutex_factory
: std::shared_ptr<TransactionDBMutexFactory>(
new TransactionDBMutexFactoryImpl())) {
lock_manager_(NewLockManager(this, txn_db_options)) {
assert(db_impl_ != nullptr);
info_log_ = db_impl_->GetDBOptions().info_log;
}
@ -62,12 +57,7 @@ PessimisticTransactionDB::PessimisticTransactionDB(
: TransactionDB(db),
db_impl_(static_cast_with_check<DBImpl>(db->GetRootDB())),
txn_db_options_(txn_db_options),
lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
txn_db_options_.max_num_deadlocks,
txn_db_options_.custom_mutex_factory
? txn_db_options_.custom_mutex_factory
: std::shared_ptr<TransactionDBMutexFactory>(
new TransactionDBMutexFactoryImpl())) {
lock_manager_(NewLockManager(this, txn_db_options)) {
assert(db_impl_ != nullptr);
}
@ -355,11 +345,11 @@ Status TransactionDB::WrapStackableDB(
return s;
}
// Let TransactionLockMgr know that this column family exists so it can
// Let LockManager know that this column family exists so it can
// allocate a LockMap for it.
void PessimisticTransactionDB::AddColumnFamily(
const ColumnFamilyHandle* handle) {
lock_mgr_.AddColumnFamily(handle->GetID());
lock_manager_->AddColumnFamily(handle);
}
Status PessimisticTransactionDB::CreateColumnFamily(
@ -373,14 +363,14 @@ Status PessimisticTransactionDB::CreateColumnFamily(
s = db_->CreateColumnFamily(options, column_family_name, handle);
if (s.ok()) {
lock_mgr_.AddColumnFamily((*handle)->GetID());
lock_manager_->AddColumnFamily(*handle);
UpdateCFComparatorMap(*handle);
}
return s;
}
// Let TransactionLockMgr know that it can deallocate the LockMap for this
// Let LockManager know that it can deallocate the LockMap for this
// column family.
Status PessimisticTransactionDB::DropColumnFamily(
ColumnFamilyHandle* column_family) {
@ -388,7 +378,7 @@ Status PessimisticTransactionDB::DropColumnFamily(
Status s = db_->DropColumnFamily(column_family);
if (s.ok()) {
lock_mgr_.RemoveColumnFamily(column_family->GetID());
lock_manager_->RemoveColumnFamily(column_family);
}
return s;
@ -398,17 +388,17 @@ Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
uint32_t cfh_id,
const std::string& key,
bool exclusive) {
return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive);
}
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
const LockTracker& keys) {
lock_mgr_.UnLock(txn, keys, GetEnv());
lock_manager_->UnLock(txn, keys, GetEnv());
}
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
uint32_t cfh_id, const std::string& key) {
lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
lock_manager_->UnLock(txn, cfh_id, key, GetEnv());
}
// Used when wrapping DB write operations in a transaction
@ -597,17 +587,16 @@ void PessimisticTransactionDB::GetAllPreparedTransactions(
}
}
TransactionLockMgr::LockStatusData
PessimisticTransactionDB::GetLockStatusData() {
return lock_mgr_.GetLockStatusData();
LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() {
return lock_manager_->GetPointLockStatus();
}
std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
return lock_mgr_.GetDeadlockInfoBuffer();
return lock_manager_->GetDeadlockInfoBuffer();
}
void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
lock_mgr_.Resize(target_size);
lock_manager_->Resize(target_size);
}
void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {

@ -20,8 +20,8 @@
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/lock/lock_manager.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/write_prepared_txn.h"
namespace ROCKSDB_NAMESPACE {
@ -130,7 +130,7 @@ class PessimisticTransactionDB : public TransactionDB {
// not thread safe. current use case is during recovery (single thread)
void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;
TransactionLockMgr::LockStatusData GetLockStatusData() override;
LockManager::PointLockStatus GetLockStatusData() override;
std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
void SetDeadlockInfoBufferSize(uint32_t target_size) override;
@ -142,6 +142,11 @@ class PessimisticTransactionDB : public TransactionDB {
virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {}
virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}
// Use the returned factory to create LockTrackers in transactions.
const LockTrackerFactory& GetLockTrackerFactory() const {
return lock_manager_->GetLockTrackerFactory();
}
protected:
DBImpl* db_impl_;
std::shared_ptr<Logger> info_log_;
@ -166,7 +171,8 @@ class PessimisticTransactionDB : public TransactionDB {
friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
TransactionLockMgr lock_mgr_;
std::unique_ptr<LockManager> lock_manager_;
// Must be held when adding/dropping column families.
InstrumentedMutex column_family_mutex_;

@ -20,15 +20,17 @@
namespace ROCKSDB_NAMESPACE {
TransactionBaseImpl::TransactionBaseImpl(DB* db,
const WriteOptions& write_options)
TransactionBaseImpl::TransactionBaseImpl(
DB* db, const WriteOptions& write_options,
const LockTrackerFactory& lock_tracker_factory)
: db_(db),
dbimpl_(static_cast_with_check<DBImpl>(db)),
write_options_(write_options),
cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
lock_tracker_factory_(lock_tracker_factory),
start_time_(db_->GetEnv()->NowMicros()),
write_batch_(cmp_, 0, true, 0),
tracked_locks_(NewLockTracker()),
tracked_locks_(lock_tracker_factory_.Create()),
indexing_enabled_(true) {
assert(dynamic_cast<DBImpl*>(db_) != nullptr);
log_number_ = 0;
@ -125,7 +127,8 @@ void TransactionBaseImpl::SetSavePoint() {
save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint, autovector<TransactionBaseImpl::SavePoint>>());
}
save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
num_puts_, num_deletes_, num_merges_);
num_puts_, num_deletes_, num_merges_,
lock_tracker_factory_);
write_batch_.SetSavePoint();
}
@ -172,7 +175,7 @@ Status TransactionBaseImpl::PopSavePoint() {
if (save_points_->size() == 1) {
save_points_->pop();
} else {
TransactionBaseImpl::SavePoint top;
TransactionBaseImpl::SavePoint top(lock_tracker_factory_);
std::swap(top, save_points_->top());
save_points_->pop();

@ -28,7 +28,8 @@ namespace ROCKSDB_NAMESPACE {
class TransactionBaseImpl : public Transaction {
public:
TransactionBaseImpl(DB* db, const WriteOptions& write_options);
TransactionBaseImpl(DB* db, const WriteOptions& write_options,
const LockTrackerFactory& lock_tracker_factory);
virtual ~TransactionBaseImpl();
@ -280,6 +281,8 @@ class TransactionBaseImpl : public Transaction {
const Comparator* cmp_;
const LockTrackerFactory& lock_tracker_factory_;
// Stores that time the txn was constructed, in microseconds.
uint64_t start_time_;
@ -305,16 +308,18 @@ class TransactionBaseImpl : public Transaction {
SavePoint(std::shared_ptr<const Snapshot> snapshot, bool snapshot_needed,
std::shared_ptr<TransactionNotifier> snapshot_notifier,
uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges)
uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges,
const LockTrackerFactory& lock_tracker_factory)
: snapshot_(snapshot),
snapshot_needed_(snapshot_needed),
snapshot_notifier_(snapshot_notifier),
num_puts_(num_puts),
num_deletes_(num_deletes),
num_merges_(num_merges),
new_locks_(NewLockTracker()) {}
new_locks_(lock_tracker_factory.Create()) {}
SavePoint() : new_locks_(NewLockTracker()) {}
explicit SavePoint(const LockTrackerFactory& lock_tracker_factory)
: new_locks_(lock_tracker_factory.Create()) {}
};
// Records writes pending in this transaction

@ -282,7 +282,7 @@ TEST_P(TransactionTest, WaitingTxn) {
ASSERT_TRUE(txn2);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TransactionLockMgr::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) {
"PointLockManager::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) {
std::string key;
uint32_t cf_id;
std::vector<TransactionID> wait = txn2->GetWaitingTxns(&cf_id, &key);
@ -508,7 +508,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
std::atomic<uint32_t> checkpoints(0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
"PointLockManager::AcquireWithTimeout:WaitingTxn",
[&](void* /*arg*/) { checkpoints.fetch_add(1); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@ -641,7 +641,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
std::atomic<uint32_t> checkpoints_shared(0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
"PointLockManager::AcquireWithTimeout:WaitingTxn",
[&](void* /*arg*/) { checkpoints_shared.fetch_add(1); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@ -714,7 +714,7 @@ TEST_P(TransactionStressTest, DeadlockCycle) {
std::atomic<uint32_t> checkpoints(0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
"PointLockManager::AcquireWithTimeout:WaitingTxn",
[&](void* /*arg*/) { checkpoints.fetch_add(1); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

@ -26,7 +26,6 @@
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/write_prepared_txn.h"
namespace ROCKSDB_NAMESPACE {

Loading…
Cancel
Save