diff --git a/CMakeLists.txt b/CMakeLists.txt index e2685f706..505d400f3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -810,8 +810,9 @@ set(SOURCES utilities/simulator_cache/sim_cache.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc utilities/trace/file_trace_reader_writer.cc - utilities/transactions/lock/lock_tracker.cc - utilities/transactions/lock/point_lock_tracker.cc + utilities/transactions/lock/lock_manager.cc + utilities/transactions/lock/point/point_lock_tracker.cc + utilities/transactions/lock/point/point_lock_manager.cc utilities/transactions/optimistic_transaction_db_impl.cc utilities/transactions/optimistic_transaction.cc utilities/transactions/pessimistic_transaction.cc @@ -819,7 +820,6 @@ set(SOURCES utilities/transactions/snapshot_checker.cc utilities/transactions/transaction_base.cc utilities/transactions/transaction_db_mutex_impl.cc - utilities/transactions/transaction_lock_mgr.cc utilities/transactions/transaction_util.cc utilities/transactions/write_prepared_txn.cc utilities/transactions/write_prepared_txn_db.cc @@ -1204,7 +1204,7 @@ if(WITH_TESTS) utilities/table_properties_collectors/compact_on_deletion_collector_test.cc utilities/transactions/optimistic_transaction_test.cc utilities/transactions/transaction_test.cc - utilities/transactions/transaction_lock_mgr_test.cc + utilities/transactions/lock/point/point_lock_manager_test.cc utilities/transactions/write_prepared_transaction_test.cc utilities/transactions/write_unprepared_transaction_test.cc utilities/ttl/ttl_test.cc diff --git a/Makefile b/Makefile index 68c3dc79c..c89358db4 100644 --- a/Makefile +++ b/Makefile @@ -553,7 +553,7 @@ PARALLEL_TEST = \ persistent_cache_test \ table_test \ transaction_test \ - transaction_lock_mgr_test \ + point_lock_manager_test \ write_prepared_transaction_test \ write_unprepared_transaction_test \ @@ -1842,7 +1842,7 @@ write_callback_test: $(OBJ_DIR)/db/write_callback_test.o $(TEST_LIBRARY) $(LIBRA heap_test: $(OBJ_DIR)/util/heap_test.o $(GTEST) $(AM_LINK) -transaction_lock_mgr_test: utilities/transactions/transaction_lock_mgr_test.o $(TEST_LIBRARY) $(LIBRARY) +point_lock_manager_test: utilities/transactions/lock/point/point_lock_manager_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) transaction_test: $(OBJ_DIR)/utilities/transactions/transaction_test.o $(TEST_LIBRARY) $(LIBRARY) diff --git a/TARGETS b/TARGETS index d53d78f20..6433dbe06 100644 --- a/TARGETS +++ b/TARGETS @@ -384,8 +384,9 @@ cpp_library( "utilities/simulator_cache/sim_cache.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/trace/file_trace_reader_writer.cc", - "utilities/transactions/lock/lock_tracker.cc", - "utilities/transactions/lock/point_lock_tracker.cc", + "utilities/transactions/lock/lock_manager.cc", + "utilities/transactions/lock/point/point_lock_manager.cc", + "utilities/transactions/lock/point/point_lock_tracker.cc", "utilities/transactions/optimistic_transaction.cc", "utilities/transactions/optimistic_transaction_db_impl.cc", "utilities/transactions/pessimistic_transaction.cc", @@ -393,7 +394,6 @@ cpp_library( "utilities/transactions/snapshot_checker.cc", "utilities/transactions/transaction_base.cc", "utilities/transactions/transaction_db_mutex_impl.cc", - "utilities/transactions/transaction_lock_mgr.cc", "utilities/transactions/transaction_util.cc", "utilities/transactions/write_prepared_txn.cc", "utilities/transactions/write_prepared_txn_db.cc", @@ -673,8 +673,9 @@ cpp_library( "utilities/simulator_cache/sim_cache.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/trace/file_trace_reader_writer.cc", - "utilities/transactions/lock/lock_tracker.cc", - "utilities/transactions/lock/point_lock_tracker.cc", + "utilities/transactions/lock/lock_manager.cc", + "utilities/transactions/lock/point/point_lock_manager.cc", + "utilities/transactions/lock/point/point_lock_tracker.cc", "utilities/transactions/optimistic_transaction.cc", "utilities/transactions/optimistic_transaction_db_impl.cc", "utilities/transactions/pessimistic_transaction.cc", @@ -682,7 +683,6 @@ cpp_library( "utilities/transactions/snapshot_checker.cc", "utilities/transactions/transaction_base.cc", "utilities/transactions/transaction_db_mutex_impl.cc", - "utilities/transactions/transaction_lock_mgr.cc", "utilities/transactions/transaction_util.cc", "utilities/transactions/write_prepared_txn.cc", "utilities/transactions/write_prepared_txn_db.cc", @@ -1727,6 +1727,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "point_lock_manager_test", + "utilities/transactions/lock/point/point_lock_manager_test.cc", + "parallel", + [], + [], + ], [ "prefetch_test", "file/prefetch_test.cc", @@ -1916,13 +1923,6 @@ ROCKS_TESTS = [ [], [], ], - [ - "transaction_lock_mgr_test", - "utilities/transactions/transaction_lock_mgr_test.cc", - "parallel", - [], - [], - ], [ "transaction_test", "utilities/transactions/transaction_test.cc", diff --git a/include/rocksdb/types.h b/include/rocksdb/types.h index a4ab9c07a..28136d5f5 100644 --- a/include/rocksdb/types.h +++ b/include/rocksdb/types.h @@ -12,6 +12,8 @@ namespace ROCKSDB_NAMESPACE { // Define all public custom types here. +using ColumnFamilyId = uint32_t; + // Represents a sequence number in a WAL file. typedef uint64_t SequenceNumber; diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 6ebdbcc40..b553100f3 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -24,6 +24,11 @@ using TransactionName = std::string; using TransactionID = uint64_t; +// An endpoint for a range of keys. +class Endpoint { + // TODO +}; + // Provides notification to the caller of SetSnapshotOnNextOperation when // the actual snapshot gets created class TransactionNotifier { diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 8967b7eef..2e1a0a171 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -202,6 +202,13 @@ struct KeyLockInfo { bool exclusive; }; +struct RangeLockInfo { + Endpoint start; + Endpoint end; + std::vector ids; + bool exclusive; +}; + struct DeadlockInfo { TransactionID m_txn_id; uint32_t m_cf_id; @@ -296,6 +303,7 @@ class TransactionDB : public StackableDB { // The mapping is column family id -> KeyLockInfo virtual std::unordered_multimap GetLockStatusData() = 0; + virtual std::vector GetDeadlockInfoBuffer() = 0; virtual void SetDeadlockInfoBufferSize(uint32_t target_size) = 0; diff --git a/src.mk b/src.mk index 8ee3c68f3..251228c89 100644 --- a/src.mk +++ b/src.mk @@ -251,8 +251,9 @@ LIB_SOURCES = \ utilities/simulator_cache/sim_cache.cc \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \ utilities/trace/file_trace_reader_writer.cc \ - utilities/transactions/lock/lock_tracker.cc \ - utilities/transactions/lock/point_lock_tracker.cc \ + utilities/transactions/lock/lock_manager.cc \ + utilities/transactions/lock/point/point_lock_tracker.cc \ + utilities/transactions/lock/point/point_lock_manager.cc \ utilities/transactions/optimistic_transaction.cc \ utilities/transactions/optimistic_transaction_db_impl.cc \ utilities/transactions/pessimistic_transaction.cc \ @@ -260,7 +261,6 @@ LIB_SOURCES = \ utilities/transactions/snapshot_checker.cc \ utilities/transactions/transaction_base.cc \ utilities/transactions/transaction_db_mutex_impl.cc \ - utilities/transactions/transaction_lock_mgr.cc \ utilities/transactions/transaction_util.cc \ utilities/transactions/write_prepared_txn.cc \ utilities/transactions/write_prepared_txn_db.cc \ @@ -522,7 +522,7 @@ TEST_MAIN_SOURCES = \ utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \ utilities/transactions/optimistic_transaction_test.cc \ utilities/transactions/transaction_test.cc \ - utilities/transactions/transaction_lock_mgr_test.cc \ + utilities/transactions/lock/point/point_lock_manager_test.cc \ utilities/transactions/write_prepared_transaction_test.cc \ utilities/transactions/write_unprepared_transaction_test.cc \ utilities/ttl/ttl_test.cc \ diff --git a/utilities/transactions/lock/lock_manager.cc b/utilities/transactions/lock/lock_manager.cc new file mode 100644 index 000000000..200b15390 --- /dev/null +++ b/utilities/transactions/lock/lock_manager.cc @@ -0,0 +1,23 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/transactions/lock/lock_manager.h" + +#include "utilities/transactions/lock/point/point_lock_manager.h" + +namespace ROCKSDB_NAMESPACE { + +LockManager* NewLockManager(PessimisticTransactionDB* db, + const TransactionDBOptions& opt) { + assert(db); + // TODO: determine the lock manager implementation based on configuration. + return new PointLockManager(db, opt); +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/lock_manager.h b/utilities/transactions/lock/lock_manager.h new file mode 100644 index 000000000..32b3f9473 --- /dev/null +++ b/utilities/transactions/lock/lock_manager.h @@ -0,0 +1,82 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include "rocksdb/types.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" +#include "utilities/transactions/lock/lock_tracker.h" +#include "utilities/transactions/pessimistic_transaction.h" + +namespace ROCKSDB_NAMESPACE { + +class PessimisticTransactionDB; + +class LockManager { + public: + virtual ~LockManager() {} + + // Whether supports locking a specific key. + virtual bool IsPointLockSupported() const = 0; + + // Whether supports locking a range of keys. + virtual bool IsRangeLockSupported() const = 0; + + // Locks acquired through this LockManager should be tracked by + // the LockTrackers created through the returned factory. + virtual const LockTrackerFactory& GetLockTrackerFactory() const = 0; + + // Enable locking for the specified column family. + // Caller should guarantee that this column family is not already enabled. + virtual void AddColumnFamily(const ColumnFamilyHandle* cf) = 0; + + // Disable locking for the specified column family. + // Caller should guarantee that this column family is no longer used. + virtual void RemoveColumnFamily(const ColumnFamilyHandle* cf) = 0; + + // Attempt to lock a key or a key range. If OK status is returned, the caller + // is responsible for calling UnLock() on this key. + virtual Status TryLock(PessimisticTransaction* txn, + ColumnFamilyId column_family_id, + const std::string& key, Env* env, bool exclusive) = 0; + // The range [start, end] are inclusive at both sides. + virtual Status TryLock(PessimisticTransaction* txn, + ColumnFamilyId column_family_id, const Endpoint& start, + const Endpoint& end, Env* env, bool exclusive) = 0; + + // Unlock a key or a range locked by TryLock(). txn must be the same + // Transaction that locked this key. + virtual void UnLock(PessimisticTransaction* txn, const LockTracker& tracker, + Env* env) = 0; + virtual void UnLock(PessimisticTransaction* txn, + ColumnFamilyId column_family_id, const std::string& key, + Env* env) = 0; + virtual void UnLock(PessimisticTransaction* txn, + ColumnFamilyId column_family_id, const Endpoint& start, + const Endpoint& end, Env* env) = 0; + + using PointLockStatus = std::unordered_multimap; + virtual PointLockStatus GetPointLockStatus() = 0; + + using RangeLockStatus = + std::unordered_multimap; + virtual RangeLockStatus GetRangeLockStatus() = 0; + + virtual std::vector GetDeadlockInfoBuffer() = 0; + + virtual void Resize(uint32_t new_size) = 0; +}; + +// LockManager should always be constructed through this factory method, +// instead of constructing through concrete implementations' constructor. +// Caller owns the returned pointer. +LockManager* NewLockManager(PessimisticTransactionDB* db, + const TransactionDBOptions& opt); + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/lock_tracker.cc b/utilities/transactions/lock/lock_tracker.cc deleted file mode 100644 index c367c273d..000000000 --- a/utilities/transactions/lock/lock_tracker.cc +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). - -#include "utilities/transactions/lock/lock_tracker.h" - -#include "utilities/transactions/lock/point_lock_tracker.h" - -namespace ROCKSDB_NAMESPACE { - -LockTracker* NewLockTracker() { - // TODO: determine the lock tracker implementation based on configuration. - return new PointLockTracker(); -} - -} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/lock/lock_tracker.h b/utilities/transactions/lock/lock_tracker.h index 2129dd2a6..0d3abded7 100644 --- a/utilities/transactions/lock/lock_tracker.h +++ b/utilities/transactions/lock/lock_tracker.h @@ -13,8 +13,6 @@ namespace ROCKSDB_NAMESPACE { -using ColumnFamilyId = uint32_t; - // Request for locking a single key. struct PointLockRequest { // The id of the key's column family. @@ -191,9 +189,13 @@ class LockTracker { ColumnFamilyId /*column_family_id*/) const = 0; }; -// LockTracker should always be constructed through this factory method, -// instead of constructing through concrete implementations' constructor. -// Caller owns the returned pointer. -LockTracker* NewLockTracker(); +// LockTracker should always be constructed through this factory. +// Each LockManager owns a LockTrackerFactory. +class LockTrackerFactory { + public: + // Caller owns the returned pointer. + virtual LockTracker* Create() const = 0; + virtual ~LockTrackerFactory() {} +}; } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/lock/point/point_lock_manager.cc similarity index 84% rename from utilities/transactions/transaction_lock_mgr.cc rename to utilities/transactions/lock/point/point_lock_manager.cc index 84d93b304..0ca6e38f0 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/lock/point/point_lock_manager.cc @@ -5,10 +5,10 @@ #ifndef ROCKSDB_LITE -#include "utilities/transactions/transaction_lock_mgr.h" +#include "utilities/transactions/lock/point/point_lock_manager.h" -#include #include +#include #include #include "monitoring/perf_context_imp.h" @@ -19,6 +19,7 @@ #include "util/hash.h" #include "util/thread_local.h" #include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/transaction_db_mutex_impl.h" namespace ROCKSDB_NAMESPACE { @@ -37,6 +38,11 @@ struct LockInfo { : exclusive(lock_info.exclusive), txn_ids(lock_info.txn_ids), expiration_time(lock_info.expiration_time) {} + void operator=(const LockInfo& lock_info) { + exclusive = lock_info.exclusive; + txn_ids = lock_info.txn_ids; + expiration_time = lock_info.expiration_time; + } }; struct LockMapStripe { @@ -80,7 +86,7 @@ struct LockMap { const size_t num_stripes_; // Count of keys that are currently locked in this column family. - // (Only maintained if TransactionLockMgr::max_num_locks_ is positive.) + // (Only maintained if PointLockManager::max_num_locks_ is positive.) std::atomic lock_cnt{0}; std::vector lock_map_stripes_; @@ -155,47 +161,44 @@ void UnrefLockMapsCache(void* ptr) { } } // anonymous namespace -TransactionLockMgr::TransactionLockMgr( - TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks, - uint32_t max_num_deadlocks, - std::shared_ptr mutex_factory) - : txn_db_impl_(nullptr), - default_num_stripes_(default_num_stripes), - max_num_locks_(max_num_locks), +PointLockManager::PointLockManager(PessimisticTransactionDB* txn_db, + const TransactionDBOptions& opt) + : txn_db_impl_(txn_db), + default_num_stripes_(opt.num_stripes), + max_num_locks_(opt.max_num_locks), lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)), - dlock_buffer_(max_num_deadlocks), - mutex_factory_(mutex_factory) { - assert(txn_db); - txn_db_impl_ = static_cast_with_check(txn_db); -} + dlock_buffer_(opt.max_num_deadlocks), + mutex_factory_(opt.custom_mutex_factory + ? opt.custom_mutex_factory + : std::make_shared()) {} -TransactionLockMgr::~TransactionLockMgr() {} +PointLockManager::~PointLockManager() {} size_t LockMap::GetStripe(const std::string& key) const { assert(num_stripes_ > 0); return FastRange64(GetSliceNPHash64(key), num_stripes_); } -void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) { +void PointLockManager::AddColumnFamily(const ColumnFamilyHandle* cf) { InstrumentedMutexLock l(&lock_map_mutex_); - if (lock_maps_.find(column_family_id) == lock_maps_.end()) { - lock_maps_.emplace(column_family_id, - std::make_shared(default_num_stripes_, mutex_factory_)); + if (lock_maps_.find(cf->GetID()) == lock_maps_.end()) { + lock_maps_.emplace(cf->GetID(), std::make_shared( + default_num_stripes_, mutex_factory_)); } else { // column_family already exists in lock map assert(false); } } -void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) { +void PointLockManager::RemoveColumnFamily(const ColumnFamilyHandle* cf) { // Remove lock_map for this column family. Since the lock map is stored // as a shared ptr, concurrent transactions can still keep using it // until they release their references to it. { InstrumentedMutexLock l(&lock_map_mutex_); - auto lock_maps_iter = lock_maps_.find(column_family_id); + auto lock_maps_iter = lock_maps_.find(cf->GetID()); if (lock_maps_iter == lock_maps_.end()) { return; } @@ -214,8 +217,8 @@ void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) { // Look up the LockMap std::shared_ptr for a given column_family_id. // Note: The LockMap is only valid as long as the caller is still holding on // to the returned std::shared_ptr. -std::shared_ptr TransactionLockMgr::GetLockMap( - uint32_t column_family_id) { +std::shared_ptr PointLockManager::GetLockMap( + ColumnFamilyId column_family_id) { // First check thread-local cache if (lock_maps_cache_->Get() == nullptr) { lock_maps_cache_->Reset(new LockMaps()); @@ -248,9 +251,9 @@ std::shared_ptr TransactionLockMgr::GetLockMap( // transaction. // If false, sets *expire_time to the expiration time of the lock according // to Env->GetMicros() or 0 if no expiration. -bool TransactionLockMgr::IsLockExpired(TransactionID txn_id, - const LockInfo& lock_info, Env* env, - uint64_t* expire_time) { +bool PointLockManager::IsLockExpired(TransactionID txn_id, + const LockInfo& lock_info, Env* env, + uint64_t* expire_time) { if (lock_info.expiration_time == 0) { *expire_time = 0; return false; @@ -279,10 +282,10 @@ bool TransactionLockMgr::IsLockExpired(TransactionID txn_id, return expired; } -Status TransactionLockMgr::TryLock(PessimisticTransaction* txn, - uint32_t column_family_id, - const std::string& key, Env* env, - bool exclusive) { +Status PointLockManager::TryLock(PessimisticTransaction* txn, + ColumnFamilyId column_family_id, + const std::string& key, Env* env, + bool exclusive) { // Lookup lock map for this column family id std::shared_ptr lock_map_ptr = GetLockMap(column_family_id); LockMap* lock_map = lock_map_ptr.get(); @@ -307,9 +310,9 @@ Status TransactionLockMgr::TryLock(PessimisticTransaction* txn, } // Helper function for TryLock(). -Status TransactionLockMgr::AcquireWithTimeout( +Status PointLockManager::AcquireWithTimeout( PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe, - uint32_t column_family_id, const std::string& key, Env* env, + ColumnFamilyId column_family_id, const std::string& key, Env* env, int64_t timeout, LockInfo&& lock_info) { Status result; uint64_t end_time = 0; @@ -370,7 +373,7 @@ Status TransactionLockMgr::AcquireWithTimeout( txn->SetWaitingTxn(wait_ids, column_family_id, &key); } - TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn"); + TEST_SYNC_POINT("PointLockManager::AcquireWithTimeout:WaitingTxn"); if (cv_end_time < 0) { // Wait indefinitely result = stripe->stripe_cv->Wait(stripe->stripe_mutex); @@ -408,14 +411,14 @@ Status TransactionLockMgr::AcquireWithTimeout( return result; } -void TransactionLockMgr::DecrementWaiters( +void PointLockManager::DecrementWaiters( const PessimisticTransaction* txn, const autovector& wait_ids) { std::lock_guard lock(wait_txn_map_mutex_); DecrementWaitersImpl(txn, wait_ids); } -void TransactionLockMgr::DecrementWaitersImpl( +void PointLockManager::DecrementWaitersImpl( const PessimisticTransaction* txn, const autovector& wait_ids) { auto id = txn->GetID(); @@ -430,7 +433,7 @@ void TransactionLockMgr::DecrementWaitersImpl( } } -bool TransactionLockMgr::IncrementWaiters( +bool PointLockManager::IncrementWaiters( const PessimisticTransaction* txn, const autovector& wait_ids, const std::string& key, const uint32_t& cf_id, const bool& exclusive, Env* const env) { @@ -513,12 +516,11 @@ bool TransactionLockMgr::IncrementWaiters( // Sets *expire_time to the expiration time in microseconds // or 0 if no expiration. // REQUIRED: Stripe mutex must be held. -Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, - LockMapStripe* stripe, - const std::string& key, Env* env, - LockInfo&& txn_lock_info, - uint64_t* expire_time, - autovector* txn_ids) { +Status PointLockManager::AcquireLocked(LockMap* lock_map, LockMapStripe* stripe, + const std::string& key, Env* env, + LockInfo&& txn_lock_info, + uint64_t* expire_time, + autovector* txn_ids) { assert(txn_lock_info.txn_ids.size() == 1); Status result; @@ -580,10 +582,9 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, return result; } -void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn, - const std::string& key, - LockMapStripe* stripe, LockMap* lock_map, - Env* env) { +void PointLockManager::UnLockKey(PessimisticTransaction* txn, + const std::string& key, LockMapStripe* stripe, + LockMap* lock_map, Env* env) { #ifdef NDEBUG (void)env; #endif @@ -619,9 +620,9 @@ void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn, } } -void TransactionLockMgr::UnLock(PessimisticTransaction* txn, - uint32_t column_family_id, - const std::string& key, Env* env) { +void PointLockManager::UnLock(PessimisticTransaction* txn, + ColumnFamilyId column_family_id, + const std::string& key, Env* env) { std::shared_ptr lock_map_ptr = GetLockMap(column_family_id); LockMap* lock_map = lock_map_ptr.get(); if (lock_map == nullptr) { @@ -642,8 +643,8 @@ void TransactionLockMgr::UnLock(PessimisticTransaction* txn, stripe->stripe_cv->NotifyAll(); } -void TransactionLockMgr::UnLock(const PessimisticTransaction* txn, - const LockTracker& tracker, Env* env) { +void PointLockManager::UnLock(PessimisticTransaction* txn, + const LockTracker& tracker, Env* env) { std::unique_ptr cf_it( tracker.GetColumnFamilyIterator()); assert(cf_it != nullptr); @@ -690,8 +691,8 @@ void TransactionLockMgr::UnLock(const PessimisticTransaction* txn, } } -TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() { - LockStatusData data; +PointLockManager::PointLockStatus PointLockManager::GetPointLockStatus() { + PointLockStatus data; // Lock order here is important. The correct order is lock_map_mutex_, then // for every column family ID in ascending order lock every stripe in // ascending order. @@ -730,13 +731,34 @@ TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() { return data; } -std::vector TransactionLockMgr::GetDeadlockInfoBuffer() { + +std::vector PointLockManager::GetDeadlockInfoBuffer() { return dlock_buffer_.PrepareBuffer(); } -void TransactionLockMgr::Resize(uint32_t target_size) { +void PointLockManager::Resize(uint32_t target_size) { dlock_buffer_.Resize(target_size); } +PointLockManager::RangeLockStatus PointLockManager::GetRangeLockStatus() { + return {}; +} + +Status PointLockManager::TryLock(PessimisticTransaction* /* txn */, + ColumnFamilyId /* cf_id */, + const Endpoint& /* start */, + const Endpoint& /* end */, Env* /* env */, + bool /* exclusive */) { + return Status::NotSupported( + "PointLockManager does not support range locking"); +} + +void PointLockManager::UnLock(PessimisticTransaction* /* txn */, + ColumnFamilyId /* cf_id */, + const Endpoint& /* start */, + const Endpoint& /* end */, Env* /* env */) { + // no-op +} + } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/lock/point/point_lock_manager.h similarity index 70% rename from utilities/transactions/transaction_lock_mgr.h rename to utilities/transactions/lock/point/point_lock_manager.h index 0a9474488..b22884424 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/lock/point/point_lock_manager.h @@ -6,9 +6,9 @@ #pragma once #ifndef ROCKSDB_LITE +#include #include #include -#include #include #include @@ -17,7 +17,8 @@ #include "util/autovector.h" #include "util/hash_map.h" #include "util/thread_local.h" -#include "utilities/transactions/pessimistic_transaction.h" +#include "utilities/transactions/lock/lock_manager.h" +#include "utilities/transactions/lock/point/point_lock_tracker.h" namespace ROCKSDB_NAMESPACE { @@ -48,44 +49,47 @@ struct TrackedTrxInfo { std::string m_waiting_key; }; -class Slice; -class PessimisticTransactionDB; - -class TransactionLockMgr { +class PointLockManager : public LockManager { public: - TransactionLockMgr(TransactionDB* txn_db, size_t default_num_stripes, - int64_t max_num_locks, uint32_t max_num_deadlocks, - std::shared_ptr factory); + PointLockManager(PessimisticTransactionDB* db, + const TransactionDBOptions& opt); // No copying allowed - TransactionLockMgr(const TransactionLockMgr&) = delete; - void operator=(const TransactionLockMgr&) = delete; + PointLockManager(const PointLockManager&) = delete; + PointLockManager& operator=(const PointLockManager&) = delete; + + ~PointLockManager() override; + + bool IsPointLockSupported() const override { return true; } + + bool IsRangeLockSupported() const override { return false; } + + const LockTrackerFactory& GetLockTrackerFactory() const override { + return PointLockTrackerFactory::Get(); + } + + void AddColumnFamily(const ColumnFamilyHandle* cf) override; + void RemoveColumnFamily(const ColumnFamilyHandle* cf) override; - ~TransactionLockMgr(); + Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const std::string& key, Env* env, bool exclusive) override; + Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const Endpoint& start, const Endpoint& end, Env* env, + bool exclusive) override; - // Creates a new LockMap for this column family. Caller should guarantee - // that this column family does not already exist. - void AddColumnFamily(uint32_t column_family_id); + void UnLock(PessimisticTransaction* txn, const LockTracker& tracker, + Env* env) override; + void UnLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const std::string& key, Env* env) override; + void UnLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const Endpoint& start, const Endpoint& end, Env* env) override; - // Deletes the LockMap for this column family. Caller should guarantee that - // this column family is no longer in use. - void RemoveColumnFamily(uint32_t column_family_id); + PointLockStatus GetPointLockStatus() override; - // Attempt to lock key. If OK status is returned, the caller is responsible - // for calling UnLock() on this key. - Status TryLock(PessimisticTransaction* txn, uint32_t column_family_id, - const std::string& key, Env* env, bool exclusive); + RangeLockStatus GetRangeLockStatus() override; - // Unlock a key locked by TryLock(). txn must be the same Transaction that - // locked this key. - void UnLock(const PessimisticTransaction* txn, const LockTracker& tracker, - Env* env); - void UnLock(PessimisticTransaction* txn, uint32_t column_family_id, - const std::string& key, Env* env); + std::vector GetDeadlockInfoBuffer() override; - using LockStatusData = std::unordered_multimap; - LockStatusData GetLockStatusData(); - std::vector GetDeadlockInfoBuffer(); - void Resize(uint32_t); + void Resize(uint32_t new_size) override; private: PessimisticTransactionDB* txn_db_impl_; @@ -140,7 +144,7 @@ class TransactionLockMgr { LockInfo&& lock_info, uint64_t* wait_time, autovector* txn_ids); - void UnLockKey(const PessimisticTransaction* txn, const std::string& key, + void UnLockKey(PessimisticTransaction* txn, const std::string& key, LockMapStripe* stripe, LockMap* lock_map, Env* env); bool IncrementWaiters(const PessimisticTransaction* txn, diff --git a/utilities/transactions/transaction_lock_mgr_test.cc b/utilities/transactions/lock/point/point_lock_manager_test.cc similarity index 78% rename from utilities/transactions/transaction_lock_mgr_test.cc rename to utilities/transactions/lock/point/point_lock_manager_test.cc index e67b453ca..211f17b99 100644 --- a/utilities/transactions/transaction_lock_mgr_test.cc +++ b/utilities/transactions/lock/point/point_lock_manager_test.cc @@ -5,7 +5,7 @@ #ifndef ROCKSDB_LITE -#include "utilities/transactions/transaction_lock_mgr.h" +#include "utilities/transactions/lock/point/point_lock_manager.h" #include "file/file_util.h" #include "port/port.h" @@ -13,15 +13,37 @@ #include "rocksdb/utilities/transaction_db.h" #include "test_util/testharness.h" #include "test_util/testutil.h" +#include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/transaction_db_mutex_impl.h" namespace ROCKSDB_NAMESPACE { -class TransactionLockMgrTest : public testing::Test { +class MockColumnFamilyHandle : public ColumnFamilyHandle { + public: + explicit MockColumnFamilyHandle(ColumnFamilyId cf_id) : cf_id_(cf_id) {} + + ~MockColumnFamilyHandle() override {} + + const std::string& GetName() const override { return name_; } + + ColumnFamilyId GetID() const override { return cf_id_; } + + Status GetDescriptor(ColumnFamilyDescriptor*) override { + return Status::OK(); + } + + const Comparator* GetComparator() const override { return nullptr; } + + private: + ColumnFamilyId cf_id_; + std::string name_ = "MockCF"; +}; + +class PointLockManagerTest : public testing::Test { public: void SetUp() override { env_ = Env::Default(); - db_dir_ = test::PerThreadDBPath("transaction_lock_mgr_test"); + db_dir_ = test::PerThreadDBPath("point_lock_manager_test"); ASSERT_OK(env_->CreateDir(db_dir_)); mutex_factory_ = std::make_shared(); @@ -29,11 +51,11 @@ class TransactionLockMgrTest : public testing::Test { opt.create_if_missing = true; TransactionDBOptions txn_opt; txn_opt.transaction_lock_timeout = 0; + txn_opt.custom_mutex_factory = mutex_factory_; ASSERT_OK(TransactionDB::Open(opt, txn_opt, db_dir_, &db_)); - locker_.reset( - new TransactionLockMgr(db_, txn_opt.num_stripes, txn_opt.max_num_locks, - txn_opt.max_num_deadlocks, mutex_factory_)); + locker_.reset(new PointLockManager( + static_cast(db_), txn_opt)); } void TearDown() override { @@ -49,7 +71,7 @@ class TransactionLockMgrTest : public testing::Test { protected: Env* env_; - std::unique_ptr locker_; + std::unique_ptr locker_; private: std::string db_dir_; @@ -57,8 +79,9 @@ class TransactionLockMgrTest : public testing::Test { TransactionDB* db_; }; -TEST_F(TransactionLockMgrTest, LockNonExistingColumnFamily) { - locker_->RemoveColumnFamily(1024); +TEST_F(PointLockManagerTest, LockNonExistingColumnFamily) { + MockColumnFamilyHandle cf(1024); + locker_->RemoveColumnFamily(&cf); auto txn = NewTxn(); auto s = locker_->TryLock(txn, 1024, "k", env_, true); ASSERT_TRUE(s.IsInvalidArgument()); @@ -66,9 +89,10 @@ TEST_F(TransactionLockMgrTest, LockNonExistingColumnFamily) { delete txn; } -TEST_F(TransactionLockMgrTest, LockStatus) { - locker_->AddColumnFamily(1024); - locker_->AddColumnFamily(2048); +TEST_F(PointLockManagerTest, LockStatus) { + MockColumnFamilyHandle cf1(1024), cf2(2048); + locker_->AddColumnFamily(&cf1); + locker_->AddColumnFamily(&cf2); auto txn1 = NewTxn(); ASSERT_OK(locker_->TryLock(txn1, 1024, "k1", env_, true)); @@ -78,7 +102,7 @@ TEST_F(TransactionLockMgrTest, LockStatus) { ASSERT_OK(locker_->TryLock(txn2, 1024, "k2", env_, false)); ASSERT_OK(locker_->TryLock(txn2, 2048, "k2", env_, false)); - auto s = locker_->GetLockStatusData(); + auto s = locker_->GetPointLockStatus(); ASSERT_EQ(s.size(), 4u); for (uint32_t cf_id : {1024, 2048}) { ASSERT_EQ(s.count(cf_id), 2u); @@ -101,8 +125,9 @@ TEST_F(TransactionLockMgrTest, LockStatus) { delete txn2; } -TEST_F(TransactionLockMgrTest, UnlockExclusive) { - locker_->AddColumnFamily(1); +TEST_F(PointLockManagerTest, UnlockExclusive) { + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); auto txn1 = NewTxn(); ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, true)); @@ -115,8 +140,9 @@ TEST_F(TransactionLockMgrTest, UnlockExclusive) { delete txn2; } -TEST_F(TransactionLockMgrTest, UnlockShared) { - locker_->AddColumnFamily(1); +TEST_F(PointLockManagerTest, UnlockShared) { + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); auto txn1 = NewTxn(); ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false)); @@ -129,46 +155,51 @@ TEST_F(TransactionLockMgrTest, UnlockShared) { delete txn2; } -TEST_F(TransactionLockMgrTest, ReentrantExclusiveLock) { +TEST_F(PointLockManagerTest, ReentrantExclusiveLock) { // Tests that a txn can acquire exclusive lock on the same key repeatedly. - locker_->AddColumnFamily(1); + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); auto txn = NewTxn(); ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); delete txn; } -TEST_F(TransactionLockMgrTest, ReentrantSharedLock) { +TEST_F(PointLockManagerTest, ReentrantSharedLock) { // Tests that a txn can acquire shared lock on the same key repeatedly. - locker_->AddColumnFamily(1); + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); auto txn = NewTxn(); ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); delete txn; } -TEST_F(TransactionLockMgrTest, LockUpgrade) { +TEST_F(PointLockManagerTest, LockUpgrade) { // Tests that a txn can upgrade from a shared lock to an exclusive lock. - locker_->AddColumnFamily(1); + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); auto txn = NewTxn(); ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); delete txn; } -TEST_F(TransactionLockMgrTest, LockDowngrade) { +TEST_F(PointLockManagerTest, LockDowngrade) { // Tests that a txn can acquire a shared lock after acquiring an exclusive // lock on the same key. - locker_->AddColumnFamily(1); + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); auto txn = NewTxn(); ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); delete txn; } -TEST_F(TransactionLockMgrTest, LockConflict) { +TEST_F(PointLockManagerTest, LockConflict) { // Tests that lock conflicts lead to lock timeout. - locker_->AddColumnFamily(1); + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); auto txn1 = NewTxn(); auto txn2 = NewTxn(); @@ -200,7 +231,7 @@ TEST_F(TransactionLockMgrTest, LockConflict) { port::Thread BlockUntilWaitingTxn(std::function f) { std::atomic reached(false); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", + "PointLockManager::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) { reached.store(true); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -215,9 +246,10 @@ port::Thread BlockUntilWaitingTxn(std::function f) { return t; } -TEST_F(TransactionLockMgrTest, SharedLocks) { +TEST_F(PointLockManagerTest, SharedLocks) { // Tests that shared locks can be concurrently held by multiple transactions. - locker_->AddColumnFamily(1); + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); auto txn1 = NewTxn(); auto txn2 = NewTxn(); ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false)); @@ -226,12 +258,13 @@ TEST_F(TransactionLockMgrTest, SharedLocks) { delete txn2; } -TEST_F(TransactionLockMgrTest, Deadlock) { +TEST_F(PointLockManagerTest, Deadlock) { // Tests that deadlock can be detected. // Deadlock scenario: // txn1 exclusively locks k1, and wants to lock k2; // txn2 exclusively locks k2, and wants to lock k1. - locker_->AddColumnFamily(1); + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); TransactionOptions txn_opt; txn_opt.deadlock_detect = true; txn_opt.lock_timeout = 1000000; @@ -275,10 +308,11 @@ TEST_F(TransactionLockMgrTest, Deadlock) { delete txn1; } -TEST_F(TransactionLockMgrTest, DeadlockDepthExceeded) { +TEST_F(PointLockManagerTest, DeadlockDepthExceeded) { // Tests that when detecting deadlock, if the detection depth is exceeded, // it's also viewed as deadlock. - locker_->AddColumnFamily(1); + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); TransactionOptions txn_opt; txn_opt.deadlock_detect = true; txn_opt.deadlock_detect_depth = 1; diff --git a/utilities/transactions/lock/point_lock_tracker.cc b/utilities/transactions/lock/point/point_lock_tracker.cc similarity index 99% rename from utilities/transactions/lock/point_lock_tracker.cc rename to utilities/transactions/lock/point/point_lock_tracker.cc index d6f609ee4..22eb6a0b8 100644 --- a/utilities/transactions/lock/point_lock_tracker.cc +++ b/utilities/transactions/lock/point/point_lock_tracker.cc @@ -3,7 +3,7 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -#include "utilities/transactions/lock/point_lock_tracker.h" +#include "utilities/transactions/lock/point/point_lock_tracker.h" namespace ROCKSDB_NAMESPACE { diff --git a/utilities/transactions/lock/point_lock_tracker.h b/utilities/transactions/lock/point/point_lock_tracker.h similarity index 88% rename from utilities/transactions/lock/point_lock_tracker.h rename to utilities/transactions/lock/point/point_lock_tracker.h index f307d1892..57e1b8437 100644 --- a/utilities/transactions/lock/point_lock_tracker.h +++ b/utilities/transactions/lock/point/point_lock_tracker.h @@ -81,4 +81,17 @@ class PointLockTracker : public LockTracker { TrackedKeys tracked_keys_; }; +class PointLockTrackerFactory : public LockTrackerFactory { + public: + static const PointLockTrackerFactory& Get() { + static const PointLockTrackerFactory instance; + return instance; + } + + LockTracker* Create() const override { return new PointLockTracker(); } + + private: + PointLockTrackerFactory() {} +}; + } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/lock/range/range_tree/TODO b/utilities/transactions/lock/range/range_tree/TODO new file mode 100644 index 000000000..4bab2347e --- /dev/null +++ b/utilities/transactions/lock/range/range_tree/TODO @@ -0,0 +1 @@ +Implement LockTracker and LockManager for range lock with range tree. diff --git a/utilities/transactions/optimistic_transaction.cc b/utilities/transactions/optimistic_transaction.cc index ae95efdd7..fd70927f6 100644 --- a/utilities/transactions/optimistic_transaction.cc +++ b/utilities/transactions/optimistic_transaction.cc @@ -17,9 +17,10 @@ #include "rocksdb/utilities/optimistic_transaction_db.h" #include "util/cast_util.h" #include "util/string_util.h" -#include "utilities/transactions/transaction_util.h" +#include "utilities/transactions/lock/point/point_lock_tracker.h" #include "utilities/transactions/optimistic_transaction.h" #include "utilities/transactions/optimistic_transaction_db_impl.h" +#include "utilities/transactions/transaction_util.h" namespace ROCKSDB_NAMESPACE { @@ -28,7 +29,9 @@ struct WriteOptions; OptimisticTransaction::OptimisticTransaction( OptimisticTransactionDB* txn_db, const WriteOptions& write_options, const OptimisticTransactionOptions& txn_options) - : TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_(txn_db) { + : TransactionBaseImpl(txn_db->GetBaseDB(), write_options, + PointLockTrackerFactory::Get()), + txn_db_(txn_db) { Initialize(txn_options); } diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index d92818528..6531773ec 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -38,7 +38,10 @@ TransactionID PessimisticTransaction::GenTxnID() { PessimisticTransaction::PessimisticTransaction( TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options, const bool init) - : TransactionBaseImpl(txn_db->GetRootDB(), write_options), + : TransactionBaseImpl( + txn_db->GetRootDB(), write_options, + static_cast_with_check(txn_db) + ->GetLockTrackerFactory()), txn_db_impl_(nullptr), expiration_time_(0), txn_id_(0), @@ -132,7 +135,7 @@ WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db, : PessimisticTransaction(txn_db, write_options, txn_options){}; Status PessimisticTransaction::CommitBatch(WriteBatch* batch) { - std::unique_ptr keys_to_unlock(NewLockTracker()); + std::unique_ptr keys_to_unlock(lock_tracker_factory_.Create()); Status s = LockBatch(batch, keys_to_unlock.get()); if (!s.ok()) { diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 308d7460f..98eea7e2d 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -172,7 +172,7 @@ class PessimisticTransaction : public TransactionBaseImpl { // // If waiting_key_ is not null, then the pointer should always point to // a valid string object. The reason is that it is only non-null when the - // transaction is blocked in the TransactionLockMgr::AcquireWithTimeout + // transaction is blocked in the PointLockManager::AcquireWithTimeout // function. At that point, the key string object is one of the function // parameters. uint32_t waiting_cf_id_; diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index a15df47e1..73520f9ab 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -31,12 +31,7 @@ PessimisticTransactionDB::PessimisticTransactionDB( : TransactionDB(db), db_impl_(static_cast_with_check(db)), txn_db_options_(txn_db_options), - lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks, - txn_db_options_.max_num_deadlocks, - txn_db_options_.custom_mutex_factory - ? txn_db_options_.custom_mutex_factory - : std::shared_ptr( - new TransactionDBMutexFactoryImpl())) { + lock_manager_(NewLockManager(this, txn_db_options)) { assert(db_impl_ != nullptr); info_log_ = db_impl_->GetDBOptions().info_log; } @@ -62,12 +57,7 @@ PessimisticTransactionDB::PessimisticTransactionDB( : TransactionDB(db), db_impl_(static_cast_with_check(db->GetRootDB())), txn_db_options_(txn_db_options), - lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks, - txn_db_options_.max_num_deadlocks, - txn_db_options_.custom_mutex_factory - ? txn_db_options_.custom_mutex_factory - : std::shared_ptr( - new TransactionDBMutexFactoryImpl())) { + lock_manager_(NewLockManager(this, txn_db_options)) { assert(db_impl_ != nullptr); } @@ -355,11 +345,11 @@ Status TransactionDB::WrapStackableDB( return s; } -// Let TransactionLockMgr know that this column family exists so it can +// Let LockManager know that this column family exists so it can // allocate a LockMap for it. void PessimisticTransactionDB::AddColumnFamily( const ColumnFamilyHandle* handle) { - lock_mgr_.AddColumnFamily(handle->GetID()); + lock_manager_->AddColumnFamily(handle); } Status PessimisticTransactionDB::CreateColumnFamily( @@ -373,14 +363,14 @@ Status PessimisticTransactionDB::CreateColumnFamily( s = db_->CreateColumnFamily(options, column_family_name, handle); if (s.ok()) { - lock_mgr_.AddColumnFamily((*handle)->GetID()); + lock_manager_->AddColumnFamily(*handle); UpdateCFComparatorMap(*handle); } return s; } -// Let TransactionLockMgr know that it can deallocate the LockMap for this +// Let LockManager know that it can deallocate the LockMap for this // column family. Status PessimisticTransactionDB::DropColumnFamily( ColumnFamilyHandle* column_family) { @@ -388,7 +378,7 @@ Status PessimisticTransactionDB::DropColumnFamily( Status s = db_->DropColumnFamily(column_family); if (s.ok()) { - lock_mgr_.RemoveColumnFamily(column_family->GetID()); + lock_manager_->RemoveColumnFamily(column_family); } return s; @@ -398,17 +388,17 @@ Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key, bool exclusive) { - return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); + return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive); } void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, const LockTracker& keys) { - lock_mgr_.UnLock(txn, keys, GetEnv()); + lock_manager_->UnLock(txn, keys, GetEnv()); } void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key) { - lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); + lock_manager_->UnLock(txn, cfh_id, key, GetEnv()); } // Used when wrapping DB write operations in a transaction @@ -597,17 +587,16 @@ void PessimisticTransactionDB::GetAllPreparedTransactions( } } -TransactionLockMgr::LockStatusData -PessimisticTransactionDB::GetLockStatusData() { - return lock_mgr_.GetLockStatusData(); +LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() { + return lock_manager_->GetPointLockStatus(); } std::vector PessimisticTransactionDB::GetDeadlockInfoBuffer() { - return lock_mgr_.GetDeadlockInfoBuffer(); + return lock_manager_->GetDeadlockInfoBuffer(); } void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) { - lock_mgr_.Resize(target_size); + lock_manager_->Resize(target_size); } void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) { diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index e2b548121..619a83e97 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -20,8 +20,8 @@ #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" #include "util/cast_util.h" +#include "utilities/transactions/lock/lock_manager.h" #include "utilities/transactions/pessimistic_transaction.h" -#include "utilities/transactions/transaction_lock_mgr.h" #include "utilities/transactions/write_prepared_txn.h" namespace ROCKSDB_NAMESPACE { @@ -130,7 +130,7 @@ class PessimisticTransactionDB : public TransactionDB { // not thread safe. current use case is during recovery (single thread) void GetAllPreparedTransactions(std::vector* trans) override; - TransactionLockMgr::LockStatusData GetLockStatusData() override; + LockManager::PointLockStatus GetLockStatusData() override; std::vector GetDeadlockInfoBuffer() override; void SetDeadlockInfoBufferSize(uint32_t target_size) override; @@ -142,6 +142,11 @@ class PessimisticTransactionDB : public TransactionDB { virtual void UpdateCFComparatorMap(const std::vector&) {} virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {} + // Use the returned factory to create LockTrackers in transactions. + const LockTrackerFactory& GetLockTrackerFactory() const { + return lock_manager_->GetLockTrackerFactory(); + } + protected: DBImpl* db_impl_; std::shared_ptr info_log_; @@ -166,7 +171,8 @@ class PessimisticTransactionDB : public TransactionDB { friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test; friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test; - TransactionLockMgr lock_mgr_; + + std::unique_ptr lock_manager_; // Must be held when adding/dropping column families. InstrumentedMutex column_family_mutex_; diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 4c4234027..50bfd038c 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -20,15 +20,17 @@ namespace ROCKSDB_NAMESPACE { -TransactionBaseImpl::TransactionBaseImpl(DB* db, - const WriteOptions& write_options) +TransactionBaseImpl::TransactionBaseImpl( + DB* db, const WriteOptions& write_options, + const LockTrackerFactory& lock_tracker_factory) : db_(db), dbimpl_(static_cast_with_check(db)), write_options_(write_options), cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), + lock_tracker_factory_(lock_tracker_factory), start_time_(db_->GetEnv()->NowMicros()), write_batch_(cmp_, 0, true, 0), - tracked_locks_(NewLockTracker()), + tracked_locks_(lock_tracker_factory_.Create()), indexing_enabled_(true) { assert(dynamic_cast(db_) != nullptr); log_number_ = 0; @@ -125,7 +127,8 @@ void TransactionBaseImpl::SetSavePoint() { save_points_.reset(new std::stack>()); } save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_, - num_puts_, num_deletes_, num_merges_); + num_puts_, num_deletes_, num_merges_, + lock_tracker_factory_); write_batch_.SetSavePoint(); } @@ -172,7 +175,7 @@ Status TransactionBaseImpl::PopSavePoint() { if (save_points_->size() == 1) { save_points_->pop(); } else { - TransactionBaseImpl::SavePoint top; + TransactionBaseImpl::SavePoint top(lock_tracker_factory_); std::swap(top, save_points_->top()); save_points_->pop(); diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index c7832bdc8..449fea71e 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -28,7 +28,8 @@ namespace ROCKSDB_NAMESPACE { class TransactionBaseImpl : public Transaction { public: - TransactionBaseImpl(DB* db, const WriteOptions& write_options); + TransactionBaseImpl(DB* db, const WriteOptions& write_options, + const LockTrackerFactory& lock_tracker_factory); virtual ~TransactionBaseImpl(); @@ -280,6 +281,8 @@ class TransactionBaseImpl : public Transaction { const Comparator* cmp_; + const LockTrackerFactory& lock_tracker_factory_; + // Stores that time the txn was constructed, in microseconds. uint64_t start_time_; @@ -305,16 +308,18 @@ class TransactionBaseImpl : public Transaction { SavePoint(std::shared_ptr snapshot, bool snapshot_needed, std::shared_ptr snapshot_notifier, - uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges) + uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges, + const LockTrackerFactory& lock_tracker_factory) : snapshot_(snapshot), snapshot_needed_(snapshot_needed), snapshot_notifier_(snapshot_notifier), num_puts_(num_puts), num_deletes_(num_deletes), num_merges_(num_merges), - new_locks_(NewLockTracker()) {} + new_locks_(lock_tracker_factory.Create()) {} - SavePoint() : new_locks_(NewLockTracker()) {} + explicit SavePoint(const LockTrackerFactory& lock_tracker_factory) + : new_locks_(lock_tracker_factory.Create()) {} }; // Records writes pending in this transaction diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 5effb19d2..d8d0b3f81 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -282,7 +282,7 @@ TEST_P(TransactionTest, WaitingTxn) { ASSERT_TRUE(txn2); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) { + "PointLockManager::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) { std::string key; uint32_t cf_id; std::vector wait = txn2->GetWaitingTxns(&cf_id, &key); @@ -508,7 +508,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) { std::atomic checkpoints(0); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", + "PointLockManager::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) { checkpoints.fetch_add(1); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -641,7 +641,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) { std::atomic checkpoints_shared(0); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", + "PointLockManager::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) { checkpoints_shared.fetch_add(1); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -714,7 +714,7 @@ TEST_P(TransactionStressTest, DeadlockCycle) { std::atomic checkpoints(0); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", + "PointLockManager::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) { checkpoints.fetch_add(1); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 7245330ea..375ae76d5 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -26,7 +26,6 @@ #include "util/string_util.h" #include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/pessimistic_transaction_db.h" -#include "utilities/transactions/transaction_lock_mgr.h" #include "utilities/transactions/write_prepared_txn.h" namespace ROCKSDB_NAMESPACE {