From bddd5d3630694a6abecec02050d7d5bf55189c64 Mon Sep 17 00:00:00 2001
From: Archit Mishra
Date: Thu, 17 Aug 2017 18:49:30 -0700
Subject: [PATCH] Added mechanism to track deadlock chain

Summary:
Changes:
* extended the wait_txn_map to track additional information
* designed a circular buffer to store the n latest deadlocks' information
* added test coverage to verify that the additional information tracked is
  accurately stored in the buffer

Closes https://github.com/facebook/rocksdb/pull/2630

Differential Revision: D5478025

Pulled By: armishra

fbshipit-source-id: 2b138de7b5a73f5ca554fc3ff8220a3be49f39e7
---
 include/rocksdb/utilities/transaction_db.h     |  27 +++
 .../pessimistic_transaction_db.cc              |  10 ++
 .../transactions/pessimistic_transaction_db.h  |   4 +
 .../transactions/transaction_lock_mgr.cc       | 101 ++++++++++-
 utilities/transactions/transaction_lock_mgr.h  |  33 +++-
 utilities/transactions/transaction_test.cc     | 167 +++++++++++++++++-
 6 files changed, 329 insertions(+), 13 deletions(-)

diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h
index a61234adc..7a592c4f6 100644
--- a/include/rocksdb/utilities/transaction_db.h
+++ b/include/rocksdb/utilities/transaction_db.h
@@ -29,6 +29,8 @@ enum TxnDBWritePolicy {
   WRITE_UNPREPARED  // write data before the prepare phase of 2pc
 };
 
+const uint32_t kInitialMaxDeadlocks = 5;
+
 struct TransactionDBOptions {
   // Specifies the maximum number of keys that can be locked at the same time
   // per column family.
@@ -37,6 +39,9 @@ struct TransactionDBOptions {
   // If this value is not positive, no limit will be enforced.
   int64_t max_num_locks = -1;
 
+  // Stores the number of latest deadlocks to track
+  uint32_t max_num_deadlocks = kInitialMaxDeadlocks;
+
   // Increasing this value will increase the concurrency by dividing the lock
   // table (per column family) into more sub-tables, each with their own
   // separate
@@ -123,6 +128,26 @@ struct KeyLockInfo {
   bool exclusive;
 };
 
+struct DeadlockInfo {
+  TransactionID m_txn_id;
+  uint32_t m_cf_id;
+  std::string m_waiting_key;
+  bool m_exclusive;
+};
+
+struct DeadlockPath {
+  std::vector<DeadlockInfo> path;
+  bool limit_exceeded;
+
+  explicit DeadlockPath(std::vector<DeadlockInfo> path_entry)
+      : path(path_entry), limit_exceeded(false) {}
+
+  // empty path, limit exceeded constructor and default constructor
+  explicit DeadlockPath(bool limit = false) : path(0), limit_exceeded(limit) {}
+
+  bool empty() { return path.empty() && !limit_exceeded; }
+};
+
 class TransactionDB : public StackableDB {
  public:
   // Open a TransactionDB similar to DB::Open().
@@ -181,6 +206,8 @@ class TransactionDB : public StackableDB {
   // The mapping is column family id -> KeyLockInfo
   virtual std::unordered_multimap<uint32_t, KeyLockInfo>
       GetLockStatusData() = 0;
+  virtual std::vector<DeadlockPath> GetDeadlockInfoBuffer() = 0;
+  virtual void SetDeadlockInfoBufferSize(uint32_t target_size) = 0;
 
  protected:
   // To Create an TransactionDB, call Open()
diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc
index 08b0dfa7b..530434054 100644
--- a/utilities/transactions/pessimistic_transaction_db.cc
+++ b/utilities/transactions/pessimistic_transaction_db.cc
@@ -28,6 +28,7 @@ PessimisticTransactionDB::PessimisticTransactionDB(
       db_impl_(static_cast_with_check<DBImpl, DB>(db)),
       txn_db_options_(txn_db_options),
       lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
+                txn_db_options_.max_num_deadlocks,
                 txn_db_options_.custom_mutex_factory ?
                    txn_db_options_.custom_mutex_factory :
                    std::shared_ptr<TransactionDBMutexFactory>(
@@ -57,6 +58,7 @@ PessimisticTransactionDB::PessimisticTransactionDB(
       db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())),
       txn_db_options_(txn_db_options),
       lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
+                txn_db_options_.max_num_deadlocks,
                 txn_db_options_.custom_mutex_factory ?
                     txn_db_options_.custom_mutex_factory :
                     std::shared_ptr<TransactionDBMutexFactory>(
@@ -486,6 +488,14 @@ PessimisticTransactionDB::GetLockStatusData() {
   return lock_mgr_.GetLockStatusData();
 }
 
+std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
+  return lock_mgr_.GetDeadlockInfoBuffer();
+}
+
+void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
+  lock_mgr_.Resize(target_size);
+}
+
 void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
   assert(txn);
   assert(txn->GetName().length() > 0);
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h
index 35c2a0143..4d1a5f4b5 100644
--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -100,6 +100,10 @@ class PessimisticTransactionDB : public TransactionDB {
   void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;
 
   TransactionLockMgr::LockStatusData GetLockStatusData() override;
+
+  std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
+  void SetDeadlockInfoBufferSize(uint32_t target_size) override;
+
   struct CommitEntry {
     uint64_t prep_seq;
     uint64_t commit_seq;
diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc
index 9b7a4e640..a72c2a12f 100644
--- a/utilities/transactions/transaction_lock_mgr.cc
+++ b/utilities/transactions/transaction_lock_mgr.cc
@@ -96,6 +96,64 @@ struct LockMap {
   size_t GetStripe(const std::string& key) const;
 };
 
+void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) {
+  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
+
+  if (paths_buffer_.empty()) {
+    return;
+  }
+
+  paths_buffer_[buffer_idx_] = path;
+  buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
+}
+
+void DeadlockInfoBuffer::Resize(uint32_t target_size) {
+  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
+
+  paths_buffer_ = Normalize();
+
+  // Drop the deadlocks that will no longer be needed after the normalize
+  if (target_size < paths_buffer_.size()) {
+    paths_buffer_.erase(
+        paths_buffer_.begin(),
+        paths_buffer_.begin() + (paths_buffer_.size() - target_size));
+    buffer_idx_ = 0;
+  }
+  // Resize the buffer to the target size and restore the buffer's idx
+  else {
+    auto prev_size = paths_buffer_.size();
+    paths_buffer_.resize(target_size);
+    buffer_idx_ = (uint32_t)prev_size;
+  }
+}
+
+std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() {
+  auto working = paths_buffer_;
+
+  if (working.empty()) {
+    return working;
+  }
+
+  // Next write occurs at a nonexistent path's slot
+  if (paths_buffer_[buffer_idx_].empty()) {
+    working.resize(buffer_idx_);
+  } else {
+    std::rotate(working.begin(), working.begin() + buffer_idx_, working.end());
+  }
+
+  return working;
+}
+
+std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() {
+  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
+
+  // Reversing the normalized vector returns the latest deadlocks first
+  auto working = Normalize();
+  std::reverse(working.begin(), working.end());
+
+  return working;
+}
+
 namespace {
 void UnrefLockMapsCache(void* ptr) {
   // Called when a thread exits or a ThreadLocalPtr gets destroyed.
@@ -107,11 +165,13 @@ void UnrefLockMapsCache(void* ptr) {
 
 TransactionLockMgr::TransactionLockMgr(
     TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks,
+    uint32_t max_num_deadlocks,
    std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
     : txn_db_impl_(nullptr),
       default_num_stripes_(default_num_stripes),
       max_num_locks_(max_num_locks),
       lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
+      dlock_buffer_(max_num_deadlocks),
       mutex_factory_(mutex_factory) {
   assert(txn_db);
   txn_db_impl_ =
@@ -309,7 +369,8 @@ Status TransactionLockMgr::AcquireWithTimeout(
       // detection.
       if (wait_ids.size() != 0) {
         if (txn->IsDeadlockDetect()) {
-          if (IncrementWaiters(txn, wait_ids)) {
+          if (IncrementWaiters(txn, wait_ids, key, column_family_id,
+                               lock_info.exclusive)) {
             result = Status::Busy(Status::SubCode::kDeadlock);
             stripe->stripe_mutex->UnLock();
             return result;
@@ -380,12 +441,15 @@ void TransactionLockMgr::DecrementWaitersImpl(
 bool TransactionLockMgr::IncrementWaiters(
     const PessimisticTransaction* txn,
-    const autovector<TransactionID>& wait_ids) {
+    const autovector<TransactionID>& wait_ids, const std::string& key,
+    const uint32_t& cf_id, const bool& exclusive) {
   auto id = txn->GetID();
-  std::vector<TransactionID> queue(txn->GetDeadlockDetectDepth());
+  std::vector<int> queue_parents(txn->GetDeadlockDetectDepth());
+  std::vector<TransactionID> queue_values(txn->GetDeadlockDetectDepth());
   std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
   assert(!wait_txn_map_.Contains(id));
-  wait_txn_map_.Insert(id, wait_ids);
+
+  wait_txn_map_.Insert(id, {wait_ids, cf_id, key, exclusive});
 
   for (auto wait_id : wait_ids) {
     if (rev_wait_txn_map_.Contains(wait_id)) {
@@ -401,13 +465,15 @@ bool TransactionLockMgr::IncrementWaiters(
   }
 
   const auto* next_ids = &wait_ids;
+  int parent = -1;
   for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
     int i = 0;
     if (next_ids) {
       for (; i < static_cast<int>(next_ids->size()) &&
              tail + i < txn->GetDeadlockDetectDepth();
            i++) {
-        queue[tail + i] = (*next_ids)[i];
+        queue_values[tail + i] = (*next_ids)[i];
+        queue_parents[tail + i] = parent;
       }
       tail += i;
     }
@@ -417,19 +483,33 @@ bool TransactionLockMgr::IncrementWaiters(
       return false;
     }
 
-    auto next = queue[head];
+    auto next = queue_values[head];
     if (next == id) {
+      std::vector<DeadlockInfo> path;
+      while (head != -1) {
+        assert(wait_txn_map_.Contains(queue_values[head]));
+
+        auto extracted_info = wait_txn_map_.Get(queue_values[head]);
+        path.push_back({queue_values[head], extracted_info.m_cf_id,
+                        extracted_info.m_waiting_key,
+                        extracted_info.m_exclusive});
+        head = queue_parents[head];
+      }
+      std::reverse(path.begin(), path.end());
+      dlock_buffer_.AddNewPath(DeadlockPath(path));
       DecrementWaitersImpl(txn, wait_ids);
       return true;
     } else if (!wait_txn_map_.Contains(next)) {
       next_ids = nullptr;
       continue;
     } else {
-      next_ids = &wait_txn_map_.Get(next);
+      parent = head;
+      next_ids = &(wait_txn_map_.Get(next).m_neighbors);
     }
   }
 
   // Wait cycle too big, just assume deadlock.
+  dlock_buffer_.AddNewPath(DeadlockPath(true));
   DecrementWaitersImpl(txn, wait_ids);
   return true;
 }
@@ -650,6 +730,13 @@ TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
   return data;
 }
 
+std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() {
+  return dlock_buffer_.PrepareBuffer();
+}
+
+void TransactionLockMgr::Resize(uint32_t target_size) {
+  dlock_buffer_.Resize(target_size);
+}
+
 }  // namespace rocksdb
 #endif  // ROCKSDB_LITE
diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h
index 6e542071c..abf7c5d3d 100644
--- a/utilities/transactions/transaction_lock_mgr.h
+++ b/utilities/transactions/transaction_lock_mgr.h
@@ -26,13 +26,35 @@ struct LockInfo;
 struct LockMap;
 struct LockMapStripe;
 
+struct DeadlockInfoBuffer {
+ private:
+  std::vector<DeadlockPath> paths_buffer_;
+  uint32_t buffer_idx_;
+  std::mutex paths_buffer_mutex_;
+  std::vector<DeadlockPath> Normalize();
+
+ public:
+  explicit DeadlockInfoBuffer(uint32_t n_latest_dlocks)
+      : paths_buffer_(n_latest_dlocks), buffer_idx_(0) {}
+  void AddNewPath(DeadlockPath path);
+  void Resize(uint32_t target_size);
+  std::vector<DeadlockPath> PrepareBuffer();
+};
+
+struct TrackedTrxInfo {
+  autovector<TransactionID> m_neighbors;
+  uint32_t m_cf_id;
+  std::string m_waiting_key;
+  bool m_exclusive;
+};
+
 class Slice;
 class PessimisticTransactionDB;
 
 class TransactionLockMgr {
  public:
   TransactionLockMgr(TransactionDB* txn_db, size_t default_num_stripes,
-                     int64_t max_num_locks,
+                     int64_t max_num_locks, uint32_t max_num_deadlocks,
                      std::shared_ptr<TransactionDBMutexFactory> factory);
 
   ~TransactionLockMgr();
@@ -59,6 +81,8 @@ class TransactionLockMgr {
   using LockStatusData = std::unordered_multimap<uint32_t, KeyLockInfo>;
   LockStatusData GetLockStatusData();
+  std::vector<DeadlockPath> GetDeadlockInfoBuffer();
+  void Resize(uint32_t);
 
  private:
   PessimisticTransactionDB* txn_db_impl_;
@@ -92,7 +116,8 @@ class TransactionLockMgr {
   // Maps from waitee -> number of waiters.
   HashMap<TransactionID, int> rev_wait_txn_map_;
   // Maps from waiter -> waitee.
-  HashMap<TransactionID, autovector<TransactionID>> wait_txn_map_;
+  HashMap<TransactionID, TrackedTrxInfo> wait_txn_map_;
+  DeadlockInfoBuffer dlock_buffer_;
 
   // Used to allocate mutexes/condvars to use when locking keys
   std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
@@ -116,7 +141,9 @@ class TransactionLockMgr {
                    LockMapStripe* stripe, LockMap* lock_map, Env* env);
 
   bool IncrementWaiters(const PessimisticTransaction* txn,
-                        const autovector<TransactionID>& wait_ids);
+                        const autovector<TransactionID>& wait_ids,
+                        const std::string& key, const uint32_t& cf_id,
+                        const bool& exclusive);
   void DecrementWaiters(const PessimisticTransaction* txn,
                         const autovector<TransactionID>& wait_ids);
   void DecrementWaitersImpl(const PessimisticTransaction* txn,
diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc
index 148f1c41c..0eaaf20ac 100644
--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -462,6 +462,37 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
     auto s = txns[i]->GetForUpdate(read_options, "0", nullptr,
                                    true /* exclusive */);
     ASSERT_TRUE(s.IsDeadlock());
+
+    // Calculate next buffer len, plateau at 5 when 5 records are inserted.
+    const uint32_t curr_dlock_buffer_len_ =
+        (i - 14 > kInitialMaxDeadlocks) ? kInitialMaxDeadlocks : (i - 14);
+
+    auto dlock_buffer = db->GetDeadlockInfoBuffer();
+    ASSERT_EQ(dlock_buffer.size(), curr_dlock_buffer_len_);
+    auto dlock_entry = dlock_buffer[0].path;
+    ASSERT_EQ(dlock_entry.size(), kInitialMaxDeadlocks);
+
+    int64_t curr_waiting_key = 0;
+
+    // Offset of each txn id from the root of the shared dlock tree's txn id.
+    int64_t offset_root = dlock_entry[0].m_txn_id - 1;
+    // Offset of the final entry in the dlock path from the root's txn id.
+    TransactionID leaf_id =
+        dlock_entry[dlock_entry.size() - 1].m_txn_id - offset_root;
+
+    for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); it++) {
+      auto dl_node = *it;
+      ASSERT_EQ(dl_node.m_txn_id, offset_root + leaf_id);
+      ASSERT_EQ(dl_node.m_cf_id, 0);
+      ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
+      ASSERT_EQ(dl_node.m_exclusive, true);
+
+      if (curr_waiting_key == 0) {
+        curr_waiting_key = leaf_id;
+      }
+      curr_waiting_key /= 2;
+      leaf_id /= 2;
+    }
   }
 
   // Rollback the leaf transaction.
@@ -473,6 +504,102 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
   for (auto& t : threads) {
     t.join();
   }
+
+  // Downsize the buffer and verify the 3 latest deadlocks are preserved.
+  auto dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
+  db->SetDeadlockInfoBufferSize(3);
+  auto dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
+  ASSERT_EQ(dlock_buffer_after_resize.size(), 3);
+
+  for (uint32_t i = 0; i < dlock_buffer_after_resize.size(); i++) {
+    for (uint32_t j = 0; j < dlock_buffer_after_resize[i].path.size(); j++) {
+      ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
+                dlock_buffer_before_resize[i].path[j].m_txn_id);
+    }
+  }
+
+  // Upsize the buffer and verify the 3 latest deadlocks are preserved.
+  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
+  db->SetDeadlockInfoBufferSize(5);
+  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
+  ASSERT_EQ(dlock_buffer_after_resize.size(), 3);
+
+  for (uint32_t i = 0; i < dlock_buffer_before_resize.size(); i++) {
+    for (uint32_t j = 0; j < dlock_buffer_before_resize[i].path.size(); j++) {
+      ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
+                dlock_buffer_before_resize[i].path[j].m_txn_id);
+    }
+  }
+
+  // Downsize to 0 and verify the size is consistent.
+  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
+  db->SetDeadlockInfoBufferSize(0);
+  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
+  ASSERT_EQ(dlock_buffer_after_resize.size(), 0);
+
+  // Upsize from 0 to verify the size is persistent.
+  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
+  db->SetDeadlockInfoBufferSize(3);
+  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
+  ASSERT_EQ(dlock_buffer_after_resize.size(), 0);
+
+  // Contrived case of shared lock of cycle size 2 to verify that a shared
+  // lock causing a deadlock is correctly reported as "shared" in the buffer.
+  std::vector<Transaction*> txns_shared(2);
+
+  // Create a cycle of size 2.
+  for (uint32_t i = 0; i < 2; i++) {
+    txns_shared[i] = db->BeginTransaction(write_options, txn_options);
+    ASSERT_TRUE(txns_shared[i]);
+    auto s = txns_shared[i]->GetForUpdate(read_options, ToString(i), nullptr);
+    ASSERT_OK(s);
+  }
+
+  std::atomic<uint32_t> checkpoints_shared(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
+      [&](void* arg) { checkpoints_shared.fetch_add(1); });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  std::vector<port::Thread> threads_shared;
+  for (uint32_t i = 0; i < 1; i++) {
+    std::function<void()> blocking_thread = [&, i] {
+      auto s =
+          txns_shared[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
+      ASSERT_OK(s);
+      txns_shared[i]->Rollback();
+      delete txns_shared[i];
+    };
+    threads_shared.emplace_back(blocking_thread);
+  }
+
+  // Wait until all threads are waiting on each other.
+  while (checkpoints_shared.load() != 1) {
+    /* sleep override */
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  }
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  // Complete the cycle T2 -> T1 with a shared lock.
+  auto s = txns_shared[1]->GetForUpdate(read_options, "0", nullptr, false);
+  ASSERT_TRUE(s.IsDeadlock());
+
+  auto dlock_buffer = db->GetDeadlockInfoBuffer();
+
+  // Verify the size of the buffer and the single path.
+  ASSERT_EQ(dlock_buffer.size(), 1);
+  ASSERT_EQ(dlock_buffer[0].path.size(), 2);
+
+  // Verify the exclusivity field of the transactions in the deadlock path.
+  ASSERT_TRUE(dlock_buffer[0].path[0].m_exclusive);
+  ASSERT_FALSE(dlock_buffer[0].path[1].m_exclusive);
+  txns_shared[1]->Rollback();
+  delete txns_shared[1];
+
+  for (auto& t : threads_shared) {
+    t.join();
+  }
 }
 
 TEST_P(TransactionTest, DeadlockCycle) {
@@ -480,7 +607,8 @@ TEST_P(TransactionTest, DeadlockCycle) {
   ReadOptions read_options;
   TransactionOptions txn_options;
 
-  const uint32_t kMaxCycleLength = 50;
+  // offset by 2 from the max depth to test edge case
+  const uint32_t kMaxCycleLength = 52;
 
   txn_options.lock_timeout = 1000000;
   txn_options.deadlock_detect = true;
@@ -489,6 +617,7 @@ TEST_P(TransactionTest, DeadlockCycle) {
     // Set up a long wait for chain like this:
     //
     // T1 -> T2 -> T3 -> ... -> Tlen
+
     std::vector<Transaction*> txns(len);
 
     for (uint32_t i = 0; i < len; i++) {
@@ -509,8 +638,7 @@ TEST_P(TransactionTest, DeadlockCycle) {
     std::vector<port::Thread> threads;
     for (uint32_t i = 0; i < len - 1; i++) {
       std::function<void()> blocking_thread = [&, i] {
-        auto s =
-            txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
+        auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
         ASSERT_OK(s);
         txns[i]->Rollback();
         delete txns[i];
@@ -530,6 +658,39 @@ TEST_P(TransactionTest, DeadlockCycle) {
     auto s = txns[len - 1]->GetForUpdate(read_options, "0", nullptr);
     ASSERT_TRUE(s.IsDeadlock());
 
+    const uint32_t dlock_buffer_size_ = (len - 1 > 5) ? 5 : (len - 1);
+    uint32_t curr_waiting_key = 0;
+    TransactionID curr_txn_id = txns[0]->GetID();
+
+    auto dlock_buffer = db->GetDeadlockInfoBuffer();
+    ASSERT_EQ(dlock_buffer.size(), dlock_buffer_size_);
+    uint32_t check_len = len;
+    bool check_limit_flag = false;
+
+    // Special case for a deadlock path that exceeds the maximum depth.
+    if (len > 50) {
+      check_len = 0;
+      check_limit_flag = true;
+    }
+    auto dlock_entry = dlock_buffer[0].path;
+    ASSERT_EQ(dlock_entry.size(), check_len);
+    ASSERT_EQ(dlock_buffer[0].limit_exceeded, check_limit_flag);
+
+    // Iterates backwards over path verifying decreasing txn_ids.
+    for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); it++) {
+      auto dl_node = *it;
+      ASSERT_EQ(dl_node.m_txn_id, len + curr_txn_id - 1);
+      ASSERT_EQ(dl_node.m_cf_id, 0);
+      ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
+      ASSERT_EQ(dl_node.m_exclusive, true);
+
+      curr_txn_id--;
+      if (curr_waiting_key == 0) {
+        curr_waiting_key = len;
+      }
+      curr_waiting_key--;
+    }
+
     // Rollback the last transaction.
     txns[len - 1]->Rollback();
     delete txns[len - 1];
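Note for reviewers: a minimal consumer of the new API might look like the
sketch below. It is illustrative only and not part of the patch; it uses only
the types and methods added above (DeadlockPath, DeadlockInfo,
TransactionDB::GetDeadlockInfoBuffer(), SetDeadlockInfoBufferSize()) and
assumes a TransactionDB* `txn_db` opened elsewhere via TransactionDB::Open().

    #include <iostream>
    #include "rocksdb/utilities/transaction_db.h"

    // Print every recorded deadlock, most recent first (PrepareBuffer()
    // places the latest deadlocks at the front of the returned vector).
    void DumpDeadlocks(rocksdb::TransactionDB* txn_db) {
      // Keep only the five most recent deadlocks (the default capacity,
      // kInitialMaxDeadlocks, introduced by this patch).
      txn_db->SetDeadlockInfoBufferSize(5);

      for (const auto& p : txn_db->GetDeadlockInfoBuffer()) {
        if (p.limit_exceeded) {
          // The wait cycle was longer than the deadlock detection depth, so
          // no per-transaction path was recorded.
          std::cout << "deadlock cycle exceeded detection depth" << std::endl;
          continue;
        }
        for (const auto& info : p.path) {
          std::cout << "txn " << info.m_txn_id << " waits on key '"
                    << info.m_waiting_key << "' in cf " << info.m_cf_id
                    << " (" << (info.m_exclusive ? "exclusive" : "shared")
                    << ")" << std::endl;
        }
      }
    }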