Implement deadlock detection

Summary: Implement deadlock detection. This is done by maintaining a TxnID -> TxnID map which represents the edges in the wait for graph (this is named `wait_txn_map_`).

Test Plan: transaction_test

Reviewers: IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D64491
main
Manuel Ung 8 years ago
parent 48fd619a47
commit 4edd39fda2
  1. 7
      include/rocksdb/status.h
  2. 10
      include/rocksdb/utilities/transaction_db.h
  3. 7
      utilities/transactions/transaction_impl.cc
  4. 10
      utilities/transactions/transaction_impl.h
  5. 97
      utilities/transactions/transaction_lock_mgr.cc
  6. 26
      utilities/transactions/transaction_lock_mgr.h
  7. 128
      utilities/transactions/transaction_test.cc

@ -69,6 +69,7 @@ class Status {
kLockTimeout = 2,
kLockLimit = 3,
kNoSpace = 4,
kDeadlock = 5,
kMaxSubCode
};
@ -194,10 +195,16 @@ class Status {
bool IsAborted() const { return code() == kAborted; }
bool IsLockLimit() const {
return code() == kAborted && subcode() == kLockLimit;
}
// Returns true iff the status indicates that a resource is Busy and
// temporarily could not be acquired.
bool IsBusy() const { return code() == kBusy; }
bool IsDeadlock() const { return code() == kBusy && subcode() == kDeadlock; }
// Returns true iff the status indicated that the operation has Expired.
bool IsExpired() const { return code() == kExpired; }

@ -73,6 +73,10 @@ struct TransactionOptions {
// Transaction::SetSnapshot().
bool set_snapshot = false;
// Setting to true means that before acquiring locks, this transaction will
// check if doing so will cause a deadlock. If so, it will return with
// Status::Busy. The user should retry their transaction.
bool deadlock_detect = false;
// TODO(agiardullo): TransactionDB does not yet support comparators that allow
// two non-equal keys to be equivalent. Ie, cmp->Compare(a,b) should only
@ -91,9 +95,11 @@ struct TransactionOptions {
// last longer than this many milliseconds will fail to commit. If not set,
// a forgotten transaction that is never committed, rolled back, or deleted
// will never relinquish any locks it holds. This could prevent keys from
// being
// written by other writers.
// being written by other writers.
int64_t expiration = -1;
// The number of traversals to make during deadlock detection.
int64_t deadlock_detect_depth = 50;
};
struct KeyLockInfo {

@ -44,7 +44,9 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
waiting_cf_id_(0),
waiting_key_(nullptr),
expiration_time_(0),
lock_timeout_(0) {
lock_timeout_(0),
deadlock_detect_(false),
deadlock_detect_depth_(0) {
txn_db_impl_ = dynamic_cast<TransactionDBImpl*>(txn_db);
assert(txn_db_impl_);
db_impl_ = dynamic_cast<DBImpl*>(txn_db->GetRootDB());
@ -57,6 +59,9 @@ void TransactionImpl::Initialize(const TransactionOptions& txn_options) {
txn_state_ = STARTED;
deadlock_detect_ = txn_options.deadlock_detect;
deadlock_detect_depth_ = txn_options.deadlock_detect_depth;
lock_timeout_ = txn_options.lock_timeout * 1000;
if (lock_timeout_ < 0) {
// Lock timeout not set, use default

@ -91,6 +91,10 @@ class TransactionImpl : public TransactionBaseImpl {
// Returns true if locks were stolen successfully, false otherwise.
bool TryStealingLocks();
bool IsDeadlockDetect() const { return deadlock_detect_; }
int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; }
protected:
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool read_only, bool untracked = false) override;
@ -131,6 +135,12 @@ class TransactionImpl : public TransactionBaseImpl {
// Timeout in microseconds when locking a key or -1 if there is no timeout.
int64_t lock_timeout_;
// Whether to perform deadlock detection or not.
bool deadlock_detect_;
// Whether to perform deadlock detection or not.
int64_t deadlock_detect_depth_;
void Clear() override;
void Initialize(const TransactionOptions& txn_options);

@ -106,8 +106,8 @@ TransactionLockMgr::TransactionLockMgr(
: txn_db_impl_(nullptr),
default_num_stripes_(default_num_stripes),
max_num_locks_(max_num_locks),
mutex_factory_(mutex_factory),
lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)) {
lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
mutex_factory_(mutex_factory) {
txn_db_impl_ = dynamic_cast<TransactionDBImpl*>(txn_db);
assert(txn_db_impl_);
}
@ -290,7 +290,20 @@ Status TransactionLockMgr::AcquireWithTimeout(
}
assert(result.IsBusy() || wait_id != 0);
// We are dependent on a transaction to finish, so perform deadlock
// detection.
if (wait_id != 0) {
if (txn->IsDeadlockDetect()) {
if (IncrementWaiters(txn, wait_id)) {
result = Status::Busy(Status::SubCode::kDeadlock);
stripe->stripe_mutex->UnLock();
return result;
}
}
txn->SetWaitingTxn(wait_id, column_family_id, &key);
}
TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn");
if (cv_end_time < 0) {
// Wait indefinitely
@ -302,7 +315,13 @@ Status TransactionLockMgr::AcquireWithTimeout(
cv_end_time - now);
}
}
if (wait_id != 0) {
txn->SetWaitingTxn(0, 0, nullptr);
if (txn->IsDeadlockDetect()) {
DecrementWaiters(txn, wait_id);
}
}
if (result.IsTimedOut()) {
timed_out = true;
@ -323,6 +342,54 @@ Status TransactionLockMgr::AcquireWithTimeout(
return result;
}
void TransactionLockMgr::DecrementWaiters(const TransactionImpl* txn,
TransactionID wait_id) {
std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
DecrementWaitersImpl(txn, wait_id);
}
void TransactionLockMgr::DecrementWaitersImpl(const TransactionImpl* txn,
TransactionID wait_id) {
auto id = txn->GetID();
assert(wait_txn_map_.count(id) > 0);
wait_txn_map_.erase(id);
rev_wait_txn_map_[wait_id]--;
if (rev_wait_txn_map_[wait_id] == 0) {
rev_wait_txn_map_.erase(wait_id);
}
}
bool TransactionLockMgr::IncrementWaiters(const TransactionImpl* txn,
TransactionID wait_id) {
auto id = txn->GetID();
std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
assert(wait_txn_map_.count(id) == 0);
wait_txn_map_[id] = wait_id;
rev_wait_txn_map_[wait_id]++;
// No deadlock if nobody is waiting on self.
if (rev_wait_txn_map_.count(id) == 0) {
return false;
}
TransactionID next = wait_id;
for (int i = 0; i < txn->GetDeadlockDetectDepth(); i++) {
if (next == id) {
DecrementWaitersImpl(txn, wait_id);
return true;
} else if (wait_txn_map_.count(next) == 0) {
return false;
} else {
next = wait_txn_map_[next];
}
}
// Wait cycle too big, just assume deadlock.
DecrementWaitersImpl(txn, wait_id);
return true;
}
// Try to lock this key after we have acquired the mutex.
// Sets *expire_time to the expiration time in microseconds
// or 0 if no expiration.
@ -476,32 +543,32 @@ void TransactionLockMgr::UnLock(const TransactionImpl* txn,
TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
LockStatusData data;
std::vector<uint32_t> cf_ids;
// Lock order here is important. The correct order is lock_map_mutex_, then
// for every column family ID in ascending order lock every stripe in
// ascending order.
InstrumentedMutexLock l(&lock_map_mutex_);
for (const auto& lock_map_iter : lock_maps_) {
uint32_t cf_id = lock_map_iter.first;
cf_ids.push_back(cf_id);
const auto& lock_map = lock_map_iter.second;
std::vector<uint32_t> cf_ids;
for (const auto& map : lock_maps_) {
cf_ids.push_back(map.first);
}
std::sort(cf_ids.begin(), cf_ids.end());
for (auto i : cf_ids) {
const auto& stripes = lock_maps_[i]->lock_map_stripes_;
// Iterate and lock all stripes in ascending order.
for (const auto& stripe : lock_map->lock_map_stripes_) {
stripe->stripe_mutex->Lock();
// Iterate through all keys in stripe, and copy to data.
for (const auto& it : stripe->keys) {
data.insert({cf_id, {it.first, it.second.txn_id}});
for (const auto& j : stripes) {
j->stripe_mutex->Lock();
for (const auto& it : j->keys) {
data.insert({i, {it.first, it.second.txn_id}});
}
}
}
// Unlock everything. Unlocking order is not important.
for (auto i : cf_ids) {
const auto stripes = lock_maps_[i]->lock_map_stripes_;
for (const auto j : stripes) {
const auto& stripes = lock_maps_[i]->lock_map_stripes_;
for (const auto& j : stripes) {
j->stripe_mutex->UnLock();
}
}

@ -67,10 +67,13 @@ class TransactionLockMgr {
// Limit on number of keys locked per column family
const int64_t max_num_locks_;
// Used to allocate mutexes/condvars to use when locking keys
std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
// Must be held when accessing/modifying lock_maps_
// The following lock order must be satisfied in order to avoid deadlocking
// ourselves.
// - lock_map_mutex_
// - stripe mutexes in ascending cf id, ascending stripe order
// - wait_txn_map_mutex_
//
// Must be held when accessing/modifying lock_maps_.
InstrumentedMutex lock_map_mutex_;
// Map of ColumnFamilyId to locked key info
@ -81,6 +84,17 @@ class TransactionLockMgr {
// to avoid acquiring a mutex in order to look up a LockMap
std::unique_ptr<ThreadLocalPtr> lock_maps_cache_;
// Must be held when modifying wait_txn_map_ and rev_wait_txn_map_.
std::mutex wait_txn_map_mutex_;
// Maps from waitee -> number of waiters.
std::unordered_map<TransactionID, int> rev_wait_txn_map_;
// Maps from waiter -> waitee.
std::unordered_map<TransactionID, TransactionID> wait_txn_map_;
// Used to allocate mutexes/condvars to use when locking keys
std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
bool IsLockExpired(const LockInfo& lock_info, Env* env, uint64_t* wait_time);
std::shared_ptr<LockMap> GetLockMap(uint32_t column_family_id);
@ -95,6 +109,10 @@ class TransactionLockMgr {
const LockInfo& lock_info, uint64_t* wait_time,
TransactionID* txn_id);
bool IncrementWaiters(const TransactionImpl* txn, TransactionID wait_id);
void DecrementWaiters(const TransactionImpl* txn, TransactionID wait_id);
void DecrementWaitersImpl(const TransactionImpl* txn, TransactionID wait_id);
// No copying allowed
TransactionLockMgr(const TransactionLockMgr&);
void operator=(const TransactionLockMgr&);

@ -5,6 +5,7 @@
#ifndef ROCKSDB_LITE
#include <algorithm>
#include <string>
#include <thread>
@ -17,6 +18,7 @@
#include "util/fault_injection_test_env.h"
#include "util/logging.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
@ -219,16 +221,16 @@ TEST_P(TransactionTest, WaitingTxn) {
auto cf_iterator = lock_data.begin();
// Column family is 0 (default).
ASSERT_EQ(cf_iterator->first, 0);
// Column family is 1 (cfa).
ASSERT_EQ(cf_iterator->first, 1);
// The locked key is "foo" and is locked by txn1
ASSERT_EQ(cf_iterator->second.key, "foo");
ASSERT_EQ(cf_iterator->second.id, txn1->GetID());
cf_iterator++;
// Column family is 1 (cfa).
ASSERT_EQ(cf_iterator->first, 1);
// Column family is 0 (default).
ASSERT_EQ(cf_iterator->first, 0);
// The locked key is "foo" and is locked by txn1
ASSERT_EQ(cf_iterator->second.key, "foo");
ASSERT_EQ(cf_iterator->second.id, txn1->GetID());
@ -247,6 +249,124 @@ TEST_P(TransactionTest, WaitingTxn) {
delete txn2;
}
TEST_P(TransactionTest, DeadlockCycle) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
const uint32_t kMaxCycleLength = 50;
txn_options.lock_timeout = 1000000;
txn_options.deadlock_detect = true;
for (uint32_t len = 2; len < kMaxCycleLength; len++) {
// Set up a long wait for chain like this:
//
// T1 -> T2 -> T3 -> ... -> Tlen
std::vector<Transaction*> txns(len);
for (uint32_t i = 0; i < len; i++) {
txns[i] = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txns[i]);
auto s = txns[i]->GetForUpdate(read_options, ToString(i), nullptr);
ASSERT_OK(s);
}
std::atomic<uint32_t> checkpoints(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
[&](void* arg) { checkpoints.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// We want the last transaction in the chain to block and hold everyone
// back.
std::vector<std::thread> threads;
for (uint32_t i = 0; i < len - 1; i++) {
std::function<void()> blocking_thread = [&, i] {
auto s =
txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
ASSERT_OK(s);
txns[i]->Rollback();
delete txns[i];
};
threads.emplace_back(blocking_thread);
}
// Wait until all threads are waiting on each other.
while (checkpoints.load() != len - 1) {
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
// Complete the cycle Tlen -> T1
auto s = txns[len - 1]->GetForUpdate(read_options, "0", nullptr);
ASSERT_TRUE(s.IsDeadlock());
// Rollback the last transaction.
txns[len - 1]->Rollback();
delete txns[len - 1];
for (auto& t : threads) {
t.join();
}
}
}
TEST_P(TransactionTest, DeadlockStress) {
const uint32_t NUM_TXN_THREADS = 10;
const uint32_t NUM_KEYS = 100;
const uint32_t NUM_ITERS = 100000;
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
txn_options.lock_timeout = 1000000;
txn_options.deadlock_detect = true;
std::vector<std::string> keys;
for (uint32_t i = 0; i < NUM_KEYS; i++) {
db->Put(write_options, Slice(ToString(i)), Slice(""));
keys.push_back(ToString(i));
}
size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
Random rnd(static_cast<uint32_t>(tid));
std::function<void(uint32_t)> stress_thread = [&](uint32_t seed) {
std::default_random_engine g(seed);
Transaction* txn;
for (uint32_t i = 0; i < NUM_ITERS; i++) {
txn = db->BeginTransaction(write_options, txn_options);
auto random_keys = keys;
std::shuffle(random_keys.begin(), random_keys.end(), g);
// Lock keys in random order.
for (const auto& k : random_keys) {
auto s = txn->GetForUpdate(read_options, k, nullptr);
if (!s.ok()) {
ASSERT_TRUE(s.IsDeadlock());
txn->Rollback();
break;
}
}
delete txn;
}
};
std::vector<std::thread> threads;
for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
threads.emplace_back(stress_thread, rnd.Next());
}
for (auto& t : threads) {
t.join();
}
}
TEST_P(TransactionTest, CommitTimeBatchFailTest) {
WriteOptions write_options;
TransactionOptions txn_options;

Loading…
Cancel
Save