Added mechanism to track deadlock chain

Summary:
Changes:
* extended the wait_txn_map to track additional information
* designed circular buffer to store n latest deadlocks' information
* added test coverage to verify the additional information tracked is accurately stored in the buffer
Closes https://github.com/facebook/rocksdb/pull/2630

Differential Revision: D5478025

Pulled By: armishra

fbshipit-source-id: 2b138de7b5a73f5ca554fc3ff8220a3be49f39e7
main
Archit Mishra 8 years ago committed by Facebook Github Bot
parent c1384a7076
commit bddd5d3630
  1. 27
      include/rocksdb/utilities/transaction_db.h
  2. 10
      utilities/transactions/pessimistic_transaction_db.cc
  3. 4
      utilities/transactions/pessimistic_transaction_db.h
  4. 101
      utilities/transactions/transaction_lock_mgr.cc
  5. 33
      utilities/transactions/transaction_lock_mgr.h
  6. 167
      utilities/transactions/transaction_test.cc

@ -29,6 +29,8 @@ enum TxnDBWritePolicy {
WRITE_UNPREPARED // write data before the prepare phase of 2pc
};
const uint32_t kInitialMaxDeadlocks = 5;
struct TransactionDBOptions {
// Specifies the maximum number of keys that can be locked at the same time
// per column family.
@ -37,6 +39,9 @@ struct TransactionDBOptions {
// If this value is not positive, no limit will be enforced.
int64_t max_num_locks = -1;
// Stores the number of latest deadlocks to track
uint32_t max_num_deadlocks = kInitialMaxDeadlocks;
// Increasing this value will increase the concurrency by dividing the lock
// table (per column family) into more sub-tables, each with their own
// separate
@ -123,6 +128,26 @@ struct KeyLockInfo {
bool exclusive;
};
struct DeadlockInfo {
TransactionID m_txn_id;
uint32_t m_cf_id;
std::string m_waiting_key;
bool m_exclusive;
};
struct DeadlockPath {
std::vector<DeadlockInfo> path;
bool limit_exceeded;
explicit DeadlockPath(std::vector<DeadlockInfo> path_entry)
: path(path_entry), limit_exceeded(false) {}
// empty path, limit exceeded constructor and default constructor
explicit DeadlockPath(bool limit = false) : path(0), limit_exceeded(limit) {}
bool empty() { return path.empty() && !limit_exceeded; }
};
class TransactionDB : public StackableDB {
public:
// Open a TransactionDB similar to DB::Open().
@ -181,6 +206,8 @@ class TransactionDB : public StackableDB {
// The mapping is column family id -> KeyLockInfo
virtual std::unordered_multimap<uint32_t, KeyLockInfo>
GetLockStatusData() = 0;
virtual std::vector<DeadlockPath> GetDeadlockInfoBuffer() = 0;
virtual void SetDeadlockInfoBufferSize(uint32_t target_size) = 0;
protected:
// To Create an TransactionDB, call Open()

@ -28,6 +28,7 @@ PessimisticTransactionDB::PessimisticTransactionDB(
db_impl_(static_cast_with_check<DBImpl, DB>(db)),
txn_db_options_(txn_db_options),
lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
txn_db_options_.max_num_deadlocks,
txn_db_options_.custom_mutex_factory
? txn_db_options_.custom_mutex_factory
: std::shared_ptr<TransactionDBMutexFactory>(
@ -57,6 +58,7 @@ PessimisticTransactionDB::PessimisticTransactionDB(
db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())),
txn_db_options_(txn_db_options),
lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
txn_db_options_.max_num_deadlocks,
txn_db_options_.custom_mutex_factory
? txn_db_options_.custom_mutex_factory
: std::shared_ptr<TransactionDBMutexFactory>(
@ -486,6 +488,14 @@ PessimisticTransactionDB::GetLockStatusData() {
return lock_mgr_.GetLockStatusData();
}
std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
return lock_mgr_.GetDeadlockInfoBuffer();
}
void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
lock_mgr_.Resize(target_size);
}
void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
assert(txn);
assert(txn->GetName().length() > 0);

@ -100,6 +100,10 @@ class PessimisticTransactionDB : public TransactionDB {
void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;
TransactionLockMgr::LockStatusData GetLockStatusData() override;
std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
void SetDeadlockInfoBufferSize(uint32_t target_size) override;
struct CommitEntry {
uint64_t prep_seq;
uint64_t commit_seq;

@ -96,6 +96,64 @@ struct LockMap {
size_t GetStripe(const std::string& key) const;
};
void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) {
std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
if (paths_buffer_.empty()) {
return;
}
paths_buffer_[buffer_idx_] = path;
buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
}
void DeadlockInfoBuffer::Resize(uint32_t target_size) {
std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
paths_buffer_ = Normalize();
// Drop the deadlocks that will no longer be needed ater the normalize
if (target_size < paths_buffer_.size()) {
paths_buffer_.erase(
paths_buffer_.begin(),
paths_buffer_.begin() + (paths_buffer_.size() - target_size));
buffer_idx_ = 0;
}
// Resize the buffer to the target size and restore the buffer's idx
else {
auto prev_size = paths_buffer_.size();
paths_buffer_.resize(target_size);
buffer_idx_ = (uint32_t)prev_size;
}
}
std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() {
auto working = paths_buffer_;
if (working.empty()) {
return working;
}
// Next write occurs at a nonexistent path's slot
if (paths_buffer_[buffer_idx_].empty()) {
working.resize(buffer_idx_);
} else {
std::rotate(working.begin(), working.begin() + buffer_idx_, working.end());
}
return working;
}
std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() {
std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
// Reversing the normalized vector returns the latest deadlocks first
auto working = Normalize();
std::reverse(working.begin(), working.end());
return working;
}
namespace {
void UnrefLockMapsCache(void* ptr) {
// Called when a thread exits or a ThreadLocalPtr gets destroyed.
@ -107,11 +165,13 @@ void UnrefLockMapsCache(void* ptr) {
TransactionLockMgr::TransactionLockMgr(
TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks,
uint32_t max_num_deadlocks,
std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
: txn_db_impl_(nullptr),
default_num_stripes_(default_num_stripes),
max_num_locks_(max_num_locks),
lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
dlock_buffer_(max_num_deadlocks),
mutex_factory_(mutex_factory) {
assert(txn_db);
txn_db_impl_ =
@ -309,7 +369,8 @@ Status TransactionLockMgr::AcquireWithTimeout(
// detection.
if (wait_ids.size() != 0) {
if (txn->IsDeadlockDetect()) {
if (IncrementWaiters(txn, wait_ids)) {
if (IncrementWaiters(txn, wait_ids, key, column_family_id,
lock_info.exclusive)) {
result = Status::Busy(Status::SubCode::kDeadlock);
stripe->stripe_mutex->UnLock();
return result;
@ -380,12 +441,15 @@ void TransactionLockMgr::DecrementWaitersImpl(
bool TransactionLockMgr::IncrementWaiters(
const PessimisticTransaction* txn,
const autovector<TransactionID>& wait_ids) {
const autovector<TransactionID>& wait_ids, const std::string& key,
const uint32_t& cf_id, const bool& exclusive) {
auto id = txn->GetID();
std::vector<TransactionID> queue(txn->GetDeadlockDetectDepth());
std::vector<int> queue_parents(txn->GetDeadlockDetectDepth());
std::vector<TransactionID> queue_values(txn->GetDeadlockDetectDepth());
std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
assert(!wait_txn_map_.Contains(id));
wait_txn_map_.Insert(id, wait_ids);
wait_txn_map_.Insert(id, {wait_ids, cf_id, key, exclusive});
for (auto wait_id : wait_ids) {
if (rev_wait_txn_map_.Contains(wait_id)) {
@ -401,13 +465,15 @@ bool TransactionLockMgr::IncrementWaiters(
}
const auto* next_ids = &wait_ids;
int parent = -1;
for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
int i = 0;
if (next_ids) {
for (; i < static_cast<int>(next_ids->size()) &&
tail + i < txn->GetDeadlockDetectDepth();
i++) {
queue[tail + i] = (*next_ids)[i];
queue_values[tail + i] = (*next_ids)[i];
queue_parents[tail + i] = parent;
}
tail += i;
}
@ -417,19 +483,33 @@ bool TransactionLockMgr::IncrementWaiters(
return false;
}
auto next = queue[head];
auto next = queue_values[head];
if (next == id) {
std::vector<DeadlockInfo> path;
while (head != -1) {
assert(wait_txn_map_.Contains(queue_values[head]));
auto extracted_info = wait_txn_map_.Get(queue_values[head]);
path.push_back({queue_values[head], extracted_info.m_cf_id,
extracted_info.m_waiting_key,
extracted_info.m_exclusive});
head = queue_parents[head];
}
std::reverse(path.begin(), path.end());
dlock_buffer_.AddNewPath(DeadlockPath(path));
DecrementWaitersImpl(txn, wait_ids);
return true;
} else if (!wait_txn_map_.Contains(next)) {
next_ids = nullptr;
continue;
} else {
next_ids = &wait_txn_map_.Get(next);
parent = head;
next_ids = &(wait_txn_map_.Get(next).m_neighbors);
}
}
// Wait cycle too big, just assume deadlock.
dlock_buffer_.AddNewPath(DeadlockPath(true));
DecrementWaitersImpl(txn, wait_ids);
return true;
}
@ -650,6 +730,13 @@ TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
return data;
}
std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() {
return dlock_buffer_.PrepareBuffer();
}
void TransactionLockMgr::Resize(uint32_t target_size) {
dlock_buffer_.Resize(target_size);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -26,13 +26,35 @@ struct LockInfo;
struct LockMap;
struct LockMapStripe;
struct DeadlockInfoBuffer {
private:
std::vector<DeadlockPath> paths_buffer_;
uint32_t buffer_idx_;
std::mutex paths_buffer_mutex_;
std::vector<DeadlockPath> Normalize();
public:
explicit DeadlockInfoBuffer(uint32_t n_latest_dlocks)
: paths_buffer_(n_latest_dlocks), buffer_idx_(0) {}
void AddNewPath(DeadlockPath path);
void Resize(uint32_t target_size);
std::vector<DeadlockPath> PrepareBuffer();
};
struct TrackedTrxInfo {
autovector<TransactionID> m_neighbors;
uint32_t m_cf_id;
std::string m_waiting_key;
bool m_exclusive;
};
class Slice;
class PessimisticTransactionDB;
class TransactionLockMgr {
public:
TransactionLockMgr(TransactionDB* txn_db, size_t default_num_stripes,
int64_t max_num_locks,
int64_t max_num_locks, uint32_t max_num_deadlocks,
std::shared_ptr<TransactionDBMutexFactory> factory);
~TransactionLockMgr();
@ -59,6 +81,8 @@ class TransactionLockMgr {
using LockStatusData = std::unordered_multimap<uint32_t, KeyLockInfo>;
LockStatusData GetLockStatusData();
std::vector<DeadlockPath> GetDeadlockInfoBuffer();
void Resize(uint32_t);
private:
PessimisticTransactionDB* txn_db_impl_;
@ -92,7 +116,8 @@ class TransactionLockMgr {
// Maps from waitee -> number of waiters.
HashMap<TransactionID, int> rev_wait_txn_map_;
// Maps from waiter -> waitee.
HashMap<TransactionID, autovector<TransactionID>> wait_txn_map_;
HashMap<TransactionID, TrackedTrxInfo> wait_txn_map_;
DeadlockInfoBuffer dlock_buffer_;
// Used to allocate mutexes/condvars to use when locking keys
std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
@ -116,7 +141,9 @@ class TransactionLockMgr {
LockMapStripe* stripe, LockMap* lock_map, Env* env);
bool IncrementWaiters(const PessimisticTransaction* txn,
const autovector<TransactionID>& wait_ids);
const autovector<TransactionID>& wait_ids,
const std::string& key, const uint32_t& cf_id,
const bool& exclusive);
void DecrementWaiters(const PessimisticTransaction* txn,
const autovector<TransactionID>& wait_ids);
void DecrementWaitersImpl(const PessimisticTransaction* txn,

@ -462,6 +462,37 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
auto s =
txns[i]->GetForUpdate(read_options, "0", nullptr, true /* exclusive */);
ASSERT_TRUE(s.IsDeadlock());
// Calculate next buffer len, plateau at 5 when 5 records are inserted.
const uint32_t curr_dlock_buffer_len_ =
(i - 14 > kInitialMaxDeadlocks) ? kInitialMaxDeadlocks : (i - 14);
auto dlock_buffer = db->GetDeadlockInfoBuffer();
ASSERT_EQ(dlock_buffer.size(), curr_dlock_buffer_len_);
auto dlock_entry = dlock_buffer[0].path;
ASSERT_EQ(dlock_entry.size(), kInitialMaxDeadlocks);
int64_t curr_waiting_key = 0;
// Offset of each txn id from the root of the shared dlock tree's txn id.
int64_t offset_root = dlock_entry[0].m_txn_id - 1;
// Offset of the final entry in the dlock path from the root's txn id.
TransactionID leaf_id =
dlock_entry[dlock_entry.size() - 1].m_txn_id - offset_root;
for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); it++) {
auto dl_node = *it;
ASSERT_EQ(dl_node.m_txn_id, offset_root + leaf_id);
ASSERT_EQ(dl_node.m_cf_id, 0);
ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
ASSERT_EQ(dl_node.m_exclusive, true);
if (curr_waiting_key == 0) {
curr_waiting_key = leaf_id;
}
curr_waiting_key /= 2;
leaf_id /= 2;
}
}
// Rollback the leaf transaction.
@ -473,6 +504,102 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
for (auto& t : threads) {
t.join();
}
// Downsize the buffer and verify the 3 latest deadlocks are preserved.
auto dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
db->SetDeadlockInfoBufferSize(3);
auto dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
ASSERT_EQ(dlock_buffer_after_resize.size(), 3);
for (uint32_t i = 0; i < dlock_buffer_after_resize.size(); i++) {
for (uint32_t j = 0; j < dlock_buffer_after_resize[i].path.size(); j++) {
ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
dlock_buffer_before_resize[i].path[j].m_txn_id);
}
}
// Upsize the buffer and verify the 3 latest dealocks are preserved.
dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
db->SetDeadlockInfoBufferSize(5);
dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
ASSERT_EQ(dlock_buffer_after_resize.size(), 3);
for (uint32_t i = 0; i < dlock_buffer_before_resize.size(); i++) {
for (uint32_t j = 0; j < dlock_buffer_before_resize[i].path.size(); j++) {
ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
dlock_buffer_before_resize[i].path[j].m_txn_id);
}
}
// Downsize to 0 and verify the size is consistent.
dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
db->SetDeadlockInfoBufferSize(0);
dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
ASSERT_EQ(dlock_buffer_after_resize.size(), 0);
// Upsize from 0 to verify the size is persistent.
dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
db->SetDeadlockInfoBufferSize(3);
dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
ASSERT_EQ(dlock_buffer_after_resize.size(), 0);
// Contrived case of shared lock of cycle size 2 to verify that a shared
// lock causing a deadlock is correctly reported as "shared" in the buffer.
std::vector<Transaction*> txns_shared(2);
// Create a cycle of size 2.
for (uint32_t i = 0; i < 2; i++) {
txns_shared[i] = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txns_shared[i]);
auto s = txns_shared[i]->GetForUpdate(read_options, ToString(i), nullptr);
ASSERT_OK(s);
}
std::atomic<uint32_t> checkpoints_shared(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
[&](void* arg) { checkpoints_shared.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::vector<port::Thread> threads_shared;
for (uint32_t i = 0; i < 1; i++) {
std::function<void()> blocking_thread = [&, i] {
auto s =
txns_shared[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
ASSERT_OK(s);
txns_shared[i]->Rollback();
delete txns_shared[i];
};
threads_shared.emplace_back(blocking_thread);
}
// Wait until all threads are waiting on each other.
while (checkpoints_shared.load() != 1) {
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
// Complete the cycle T2 -> T1 with a shared lock.
auto s = txns_shared[1]->GetForUpdate(read_options, "0", nullptr, false);
ASSERT_TRUE(s.IsDeadlock());
auto dlock_buffer = db->GetDeadlockInfoBuffer();
// Verify the size of the buffer and the single path.
ASSERT_EQ(dlock_buffer.size(), 1);
ASSERT_EQ(dlock_buffer[0].path.size(), 2);
// Verify the exclusivity field of the transactions in the deadlock path.
ASSERT_TRUE(dlock_buffer[0].path[0].m_exclusive);
ASSERT_FALSE(dlock_buffer[0].path[1].m_exclusive);
txns_shared[1]->Rollback();
delete txns_shared[1];
for (auto& t : threads_shared) {
t.join();
}
}
TEST_P(TransactionTest, DeadlockCycle) {
@ -480,7 +607,8 @@ TEST_P(TransactionTest, DeadlockCycle) {
ReadOptions read_options;
TransactionOptions txn_options;
const uint32_t kMaxCycleLength = 50;
// offset by 2 from the max depth to test edge case
const uint32_t kMaxCycleLength = 52;
txn_options.lock_timeout = 1000000;
txn_options.deadlock_detect = true;
@ -489,6 +617,7 @@ TEST_P(TransactionTest, DeadlockCycle) {
// Set up a long wait for chain like this:
//
// T1 -> T2 -> T3 -> ... -> Tlen
std::vector<Transaction*> txns(len);
for (uint32_t i = 0; i < len; i++) {
@ -509,8 +638,7 @@ TEST_P(TransactionTest, DeadlockCycle) {
std::vector<port::Thread> threads;
for (uint32_t i = 0; i < len - 1; i++) {
std::function<void()> blocking_thread = [&, i] {
auto s =
txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
ASSERT_OK(s);
txns[i]->Rollback();
delete txns[i];
@ -530,6 +658,39 @@ TEST_P(TransactionTest, DeadlockCycle) {
auto s = txns[len - 1]->GetForUpdate(read_options, "0", nullptr);
ASSERT_TRUE(s.IsDeadlock());
const uint32_t dlock_buffer_size_ = (len - 1 > 5) ? 5 : (len - 1);
uint32_t curr_waiting_key = 0;
TransactionID curr_txn_id = txns[0]->GetID();
auto dlock_buffer = db->GetDeadlockInfoBuffer();
ASSERT_EQ(dlock_buffer.size(), dlock_buffer_size_);
uint32_t check_len = len;
bool check_limit_flag = false;
// Special case for a deadlock path that exceeds the maximum depth.
if (len > 50) {
check_len = 0;
check_limit_flag = true;
}
auto dlock_entry = dlock_buffer[0].path;
ASSERT_EQ(dlock_entry.size(), check_len);
ASSERT_EQ(dlock_buffer[0].limit_exceeded, check_limit_flag);
// Iterates backwards over path verifying decreasing txn_ids.
for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); it++) {
auto dl_node = *it;
ASSERT_EQ(dl_node.m_txn_id, len + curr_txn_id - 1);
ASSERT_EQ(dl_node.m_cf_id, 0);
ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
ASSERT_EQ(dl_node.m_exclusive, true);
curr_txn_id--;
if (curr_waiting_key == 0) {
curr_waiting_key = len;
}
curr_waiting_key--;
}
// Rollback the last transaction.
txns[len - 1]->Rollback();
delete txns[len - 1];

Loading…
Cancel
Save