From 4edd39fda22f5e281efeb04ca14f5bb4dfdee5a1 Mon Sep 17 00:00:00 2001 From: Manuel Ung Date: Wed, 28 Sep 2016 01:23:33 -0700 Subject: [PATCH] Implement deadlock detection Summary: Implement deadlock detection. This is done by maintaining a TxnID -> TxnID map which represents the edges in the wait-for graph (this is named `wait_txn_map_`). Test Plan: transaction_test Reviewers: IslamAbdelRahman, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D64491 --- include/rocksdb/status.h | 7 + include/rocksdb/utilities/transaction_db.h | 10 +- utilities/transactions/transaction_impl.cc | 7 +- utilities/transactions/transaction_impl.h | 10 ++ .../transactions/transaction_lock_mgr.cc | 101 +++++++++++--- utilities/transactions/transaction_lock_mgr.h | 26 +++- utilities/transactions/transaction_test.cc | 128 +++++++++++++++++- 7 files changed, 261 insertions(+), 28 deletions(-) diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 139942362..2c5090b4f 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -69,6 +69,7 @@ class Status { kLockTimeout = 2, kLockLimit = 3, kNoSpace = 4, + kDeadlock = 5, kMaxSubCode }; @@ -194,10 +195,16 @@ class Status { bool IsAborted() const { return code() == kAborted; } + bool IsLockLimit() const { + return code() == kAborted && subcode() == kLockLimit; + } + // Returns true iff the status indicates that a resource is Busy and // temporarily could not be acquired. bool IsBusy() const { return code() == kBusy; } + bool IsDeadlock() const { return code() == kBusy && subcode() == kDeadlock; } + // Returns true iff the status indicated that the operation has Expired. 
bool IsExpired() const { return code() == kExpired; } diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 5e352798e..77425ec4b 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -73,6 +73,10 @@ struct TransactionOptions { // Transaction::SetSnapshot(). bool set_snapshot = false; + // Setting to true means that before acquiring locks, this transaction will + // check if doing so will cause a deadlock. If so, it will return with + // Status::Busy (subcode kDeadlock). The user should retry their transaction. + bool deadlock_detect = false; // TODO(agiardullo): TransactionDB does not yet support comparators that allow // two non-equal keys to be equivalent. Ie, cmp->Compare(a,b) should only @@ -91,9 +95,11 @@ struct TransactionOptions { // last longer than this many milliseconds will fail to commit. If not set, // a forgotten transaction that is never committed, rolled back, or deleted // will never relinquish any locks it holds. This could prevent keys from - // being - // written by other writers. + // being written by other writers. int64_t expiration = -1; + + // Max wait-for edges to traverse; a longer chain is treated as a deadlock. 
+ int64_t deadlock_detect_depth = 50; }; struct KeyLockInfo { diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index 1425b6361..d6d1c2ac9 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -44,7 +44,9 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db, waiting_cf_id_(0), waiting_key_(nullptr), expiration_time_(0), - lock_timeout_(0) { + lock_timeout_(0), + deadlock_detect_(false), + deadlock_detect_depth_(0) { txn_db_impl_ = dynamic_cast(txn_db); assert(txn_db_impl_); db_impl_ = dynamic_cast(txn_db->GetRootDB()); @@ -57,6 +59,9 @@ void TransactionImpl::Initialize(const TransactionOptions& txn_options) { txn_state_ = STARTED; + deadlock_detect_ = txn_options.deadlock_detect; + deadlock_detect_depth_ = txn_options.deadlock_detect_depth; + lock_timeout_ = txn_options.lock_timeout * 1000; if (lock_timeout_ < 0) { // Lock timeout not set, use default diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index edc23cd3f..252a42ae5 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -91,6 +91,10 @@ class TransactionImpl : public TransactionBaseImpl { // Returns true if locks were stolen successfully, false otherwise. bool TryStealingLocks(); + bool IsDeadlockDetect() const { return deadlock_detect_; } + + int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; } + protected: Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, bool untracked = false) override; @@ -131,6 +135,12 @@ class TransactionImpl : public TransactionBaseImpl { // Timeout in microseconds when locking a key or -1 if there is no timeout. int64_t lock_timeout_; + // Whether to perform deadlock detection or not. + bool deadlock_detect_; + + // Maximum number of traversals to make during deadlock detection. 
+ int64_t deadlock_detect_depth_; + void Clear() override; void Initialize(const TransactionOptions& txn_options); diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 79ae0bd34..792e014b1 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -106,8 +106,8 @@ TransactionLockMgr::TransactionLockMgr( : txn_db_impl_(nullptr), default_num_stripes_(default_num_stripes), max_num_locks_(max_num_locks), - mutex_factory_(mutex_factory), - lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)) { + lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)), + mutex_factory_(mutex_factory) { txn_db_impl_ = dynamic_cast(txn_db); assert(txn_db_impl_); } @@ -290,7 +290,20 @@ Status TransactionLockMgr::AcquireWithTimeout( } assert(result.IsBusy() || wait_id != 0); - txn->SetWaitingTxn(wait_id, column_family_id, &key); + + // We are dependent on a transaction to finish, so perform deadlock + // detection. 
+ if (wait_id != 0) { + if (txn->IsDeadlockDetect()) { + if (IncrementWaiters(txn, wait_id)) { + result = Status::Busy(Status::SubCode::kDeadlock); + stripe->stripe_mutex->UnLock(); + return result; + } + } + txn->SetWaitingTxn(wait_id, column_family_id, &key); + } + TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn"); if (cv_end_time < 0) { // Wait indefinitely @@ -302,7 +315,13 @@ Status TransactionLockMgr::AcquireWithTimeout( cv_end_time - now); } } - txn->SetWaitingTxn(0, 0, nullptr); + + if (wait_id != 0) { + txn->SetWaitingTxn(0, 0, nullptr); + if (txn->IsDeadlockDetect()) { + DecrementWaiters(txn, wait_id); + } + } if (result.IsTimedOut()) { timed_out = true; @@ -323,6 +342,54 @@ Status TransactionLockMgr::AcquireWithTimeout( return result; } +void TransactionLockMgr::DecrementWaiters(const TransactionImpl* txn, + TransactionID wait_id) { + std::lock_guard lock(wait_txn_map_mutex_); + DecrementWaitersImpl(txn, wait_id); +} + +void TransactionLockMgr::DecrementWaitersImpl(const TransactionImpl* txn, + TransactionID wait_id) { + auto id = txn->GetID(); + assert(wait_txn_map_.count(id) > 0); + wait_txn_map_.erase(id); + + rev_wait_txn_map_[wait_id]--; + if (rev_wait_txn_map_[wait_id] == 0) { + rev_wait_txn_map_.erase(wait_id); + } +} + +bool TransactionLockMgr::IncrementWaiters(const TransactionImpl* txn, + TransactionID wait_id) { + auto id = txn->GetID(); + std::lock_guard lock(wait_txn_map_mutex_); + assert(wait_txn_map_.count(id) == 0); + wait_txn_map_[id] = wait_id; + rev_wait_txn_map_[wait_id]++; + + // No deadlock if nobody is waiting on self. + if (rev_wait_txn_map_.count(id) == 0) { + return false; + } + + TransactionID next = wait_id; + for (int i = 0; i < txn->GetDeadlockDetectDepth(); i++) { + if (next == id) { + DecrementWaitersImpl(txn, wait_id); + return true; + } else if (wait_txn_map_.count(next) == 0) { + return false; + } else { + next = wait_txn_map_[next]; + } + } + + // Wait cycle too big, just assume deadlock. 
+ DecrementWaitersImpl(txn, wait_id); + return true; +} + // Try to lock this key after we have acquired the mutex. // Sets *expire_time to the expiration time in microseconds // or 0 if no expiration. @@ -476,32 +543,32 @@ void TransactionLockMgr::UnLock(const TransactionImpl* txn, TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() { LockStatusData data; - std::vector cf_ids; - // Lock order here is important. The correct order is lock_map_mutex_, then // for every column family ID in ascending order lock every stripe in // ascending order. InstrumentedMutexLock l(&lock_map_mutex_); - for (const auto& lock_map_iter : lock_maps_) { - uint32_t cf_id = lock_map_iter.first; - cf_ids.push_back(cf_id); - const auto& lock_map = lock_map_iter.second; + std::vector cf_ids; + for (const auto& map : lock_maps_) { + cf_ids.push_back(map.first); + } + std::sort(cf_ids.begin(), cf_ids.end()); + for (auto i : cf_ids) { + const auto& stripes = lock_maps_[i]->lock_map_stripes_; // Iterate and lock all stripes in ascending order. - for (const auto& stripe : lock_map->lock_map_stripes_) { - stripe->stripe_mutex->Lock(); - // Iterate through all keys in stripe, and copy to data. - for (const auto& it : stripe->keys) { - data.insert({cf_id, {it.first, it.second.txn_id}}); + for (const auto& j : stripes) { + j->stripe_mutex->Lock(); + for (const auto& it : j->keys) { + data.insert({i, {it.first, it.second.txn_id}}); } } } // Unlock everything. Unlocking order is not important. 
for (auto i : cf_ids) { - const auto stripes = lock_maps_[i]->lock_map_stripes_; - for (const auto j : stripes) { + const auto& stripes = lock_maps_[i]->lock_map_stripes_; + for (const auto& j : stripes) { j->stripe_mutex->UnLock(); } } diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index 8c650c03f..da9913e16 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -67,10 +67,13 @@ class TransactionLockMgr { // Limit on number of keys locked per column family const int64_t max_num_locks_; - // Used to allocate mutexes/condvars to use when locking keys - std::shared_ptr mutex_factory_; - - // Must be held when accessing/modifying lock_maps_ + // The following lock order must be satisfied in order to avoid deadlocking + // ourselves. + // - lock_map_mutex_ + // - stripe mutexes in ascending cf id, ascending stripe order + // - wait_txn_map_mutex_ + // + // Must be held when accessing/modifying lock_maps_. InstrumentedMutex lock_map_mutex_; // Map of ColumnFamilyId to locked key info @@ -81,6 +84,17 @@ class TransactionLockMgr { // to avoid acquiring a mutex in order to look up a LockMap std::unique_ptr lock_maps_cache_; + // Must be held when accessing/modifying wait_txn_map_ and rev_wait_txn_map_. + std::mutex wait_txn_map_mutex_; + + // Maps from waitee -> number of waiters. + std::unordered_map rev_wait_txn_map_; + // Maps from waiter -> waitee. 
+ std::unordered_map wait_txn_map_; + + // Used to allocate mutexes/condvars to use when locking keys + std::shared_ptr mutex_factory_; + bool IsLockExpired(const LockInfo& lock_info, Env* env, uint64_t* wait_time); std::shared_ptr GetLockMap(uint32_t column_family_id); @@ -95,6 +109,10 @@ class TransactionLockMgr { const LockInfo& lock_info, uint64_t* wait_time, TransactionID* txn_id); + bool IncrementWaiters(const TransactionImpl* txn, TransactionID wait_id); + void DecrementWaiters(const TransactionImpl* txn, TransactionID wait_id); + void DecrementWaitersImpl(const TransactionImpl* txn, TransactionID wait_id); + // No copying allowed TransactionLockMgr(const TransactionLockMgr&); void operator=(const TransactionLockMgr&); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index ce538a690..3edefb7b9 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -5,6 +5,7 @@ #ifndef ROCKSDB_LITE +#include #include #include @@ -17,6 +18,7 @@ #include "util/fault_injection_test_env.h" #include "util/logging.h" #include "util/random.h" +#include "util/string_util.h" #include "util/sync_point.h" #include "util/testharness.h" #include "util/testutil.h" @@ -219,16 +221,16 @@ TEST_P(TransactionTest, WaitingTxn) { auto cf_iterator = lock_data.begin(); - // Column family is 0 (default). - ASSERT_EQ(cf_iterator->first, 0); + // Column family is 1 (cfa). + ASSERT_EQ(cf_iterator->first, 1); // The locked key is "foo" and is locked by txn1 ASSERT_EQ(cf_iterator->second.key, "foo"); ASSERT_EQ(cf_iterator->second.id, txn1->GetID()); cf_iterator++; - // Column family is 1 (cfa). - ASSERT_EQ(cf_iterator->first, 1); + // Column family is 0 (default). 
+ ASSERT_EQ(cf_iterator->first, 0); // The locked key is "foo" and is locked by txn1 ASSERT_EQ(cf_iterator->second.key, "foo"); ASSERT_EQ(cf_iterator->second.id, txn1->GetID()); @@ -247,6 +249,124 @@ TEST_P(TransactionTest, WaitingTxn) { delete txn2; } +TEST_P(TransactionTest, DeadlockCycle) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + + const uint32_t kMaxCycleLength = 50; + + txn_options.lock_timeout = 1000000; + txn_options.deadlock_detect = true; + + for (uint32_t len = 2; len < kMaxCycleLength; len++) { + // Set up a long wait-for chain like this: + // + // T1 -> T2 -> T3 -> ... -> Tlen + std::vector txns(len); + + for (uint32_t i = 0; i < len; i++) { + txns[i] = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txns[i]); + auto s = txns[i]->GetForUpdate(read_options, ToString(i), nullptr); + ASSERT_OK(s); + } + + std::atomic checkpoints(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", + [&](void* arg) { checkpoints.fetch_add(1); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // We want the last transaction in the chain to block and hold everyone + // back. + std::vector threads; + for (uint32_t i = 0; i < len - 1; i++) { + std::function blocking_thread = [&, i] { + auto s = + txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr); + ASSERT_OK(s); + txns[i]->Rollback(); + delete txns[i]; + }; + threads.emplace_back(blocking_thread); + } + + // Wait until all threads are waiting on each other. + while (checkpoints.load() != len - 1) { + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Complete the cycle Tlen -> T1 + auto s = txns[len - 1]->GetForUpdate(read_options, "0", nullptr); + ASSERT_TRUE(s.IsDeadlock()); + + // Rollback the last transaction. 
+ txns[len - 1]->Rollback(); + delete txns[len - 1]; + + for (auto& t : threads) { + t.join(); + } + } +} + +TEST_P(TransactionTest, DeadlockStress) { + const uint32_t NUM_TXN_THREADS = 10; + const uint32_t NUM_KEYS = 100; + const uint32_t NUM_ITERS = 100000; + + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + + txn_options.lock_timeout = 1000000; + txn_options.deadlock_detect = true; + std::vector keys; + + for (uint32_t i = 0; i < NUM_KEYS; i++) { + db->Put(write_options, Slice(ToString(i)), Slice("")); + keys.push_back(ToString(i)); + } + + size_t tid = std::hash()(std::this_thread::get_id()); + Random rnd(static_cast(tid)); + std::function stress_thread = [&](uint32_t seed) { + std::default_random_engine g(seed); + + Transaction* txn; + for (uint32_t i = 0; i < NUM_ITERS; i++) { + txn = db->BeginTransaction(write_options, txn_options); + auto random_keys = keys; + std::shuffle(random_keys.begin(), random_keys.end(), g); + + // Lock keys in random order. + for (const auto& k : random_keys) { + auto s = txn->GetForUpdate(read_options, k, nullptr); + if (!s.ok()) { + ASSERT_TRUE(s.IsDeadlock()); + txn->Rollback(); + break; + } + } + + delete txn; + } + }; + + std::vector threads; + for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) { + threads.emplace_back(stress_thread, rnd.Next()); + } + + for (auto& t : threads) { + t.join(); + } +} + TEST_P(TransactionTest, CommitTimeBatchFailTest) { WriteOptions write_options; TransactionOptions txn_options;