Expose transaction id, lock state information and transaction wait information

Summary:
This diff does 3 things:

Expose TransactionID so that we can identify transactions when we retrieve locking and lock wait information. This is exposed as `Transaction::GetID`.

Expose lock state information by locking all stripes in all column families and copying their contents to a data structure. This is exposed as `TransactionDB::GetLockStatusData`.

Adds support for tracking the transaction and the key being waited on, and exposes this as `Transaction::GetWaitingTxn`.

Test Plan: unit tests

Reviewers: horuff, sdong

Reviewed By: sdong

Subscribers: vasilep, hermanlee4, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D64413
main
Manuel Ung 8 years ago
parent 6009c473c7
commit be1f1092c9
  1. 16
      include/rocksdb/utilities/transaction.h
  2. 12
      include/rocksdb/utilities/transaction_db.h
  3. 4
      utilities/transactions/transaction_db_impl.cc
  4. 2
      utilities/transactions/transaction_db_impl.h
  5. 3
      utilities/transactions/transaction_impl.cc
  6. 40
      utilities/transactions/transaction_impl.h
  7. 71
      utilities/transactions/transaction_lock_mgr.cc
  8. 12
      utilities/transactions/transaction_lock_mgr.h
  9. 57
      utilities/transactions/transaction_test.cc

@ -20,7 +20,9 @@ class Iterator;
class TransactionDB;
class WriteBatchWithIndex;
typedef std::string TransactionName;
using TransactionName = std::string;
using TransactionID = uint64_t;
// Provides notification to the caller of SetSnapshotOnNextOperation when
// the actual snapshot gets created
@ -389,11 +391,19 @@ class Transaction {
virtual void SetLogNumber(uint64_t log) { log_number_ = log; }
virtual uint64_t GetLogNumber() { return log_number_; }
virtual uint64_t GetLogNumber() const { return log_number_; }
virtual Status SetName(const TransactionName& name) = 0;
virtual TransactionName GetName() { return name_; }
virtual TransactionName GetName() const { return name_; }
virtual TransactionID GetID() const { return 0; }
virtual TransactionID GetWaitingTxn(uint32_t* column_family_id,
const std::string** key) const {
assert(false);
return 0;
}
enum ExecutionStatus {
STARTED = 0,

@ -7,6 +7,7 @@
#ifndef ROCKSDB_LITE
#include <string>
#include <utility>
#include <vector>
#include "rocksdb/comparator.h"
@ -95,6 +96,11 @@ struct TransactionOptions {
int64_t expiration = -1;
};
struct KeyLockInfo {
std::string key;
TransactionID id;
};
class TransactionDB : public StackableDB {
public:
// Open a TransactionDB similar to DB::Open().
@ -148,6 +154,12 @@ class TransactionDB : public StackableDB {
virtual Transaction* GetTransactionByName(const TransactionName& name) = 0;
virtual void GetAllPreparedTransactions(std::vector<Transaction*>* trans) = 0;
// Returns set of all locks held.
//
// The mapping is column family id -> KeyLockInfo
virtual std::unordered_multimap<uint32_t, KeyLockInfo>
GetLockStatusData() = 0;
protected:
// To Create an TransactionDB, call Open()
explicit TransactionDB(DB* db) : StackableDB(db) {}

@ -441,6 +441,10 @@ void TransactionDBImpl::GetAllPreparedTransactions(
}
}
TransactionLockMgr::LockStatusData TransactionDBImpl::GetLockStatusData() {
return lock_mgr_.GetLockStatusData();
}
void TransactionDBImpl::RegisterTransaction(Transaction* txn) {
assert(txn);
assert(txn->GetName().length() > 0);

@ -94,6 +94,8 @@ class TransactionDBImpl : public TransactionDB {
// not thread safe. current use case is during recovery (single thread)
void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;
TransactionLockMgr::LockStatusData GetLockStatusData() override;
private:
void ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,

@ -40,6 +40,9 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
: TransactionBaseImpl(txn_db->GetRootDB(), write_options),
txn_db_impl_(nullptr),
txn_id_(0),
waiting_txn_id_(0),
waiting_cf_id_(0),
waiting_key_(nullptr),
expiration_time_(0),
lock_timeout_(0) {
txn_db_impl_ = dynamic_cast<TransactionDBImpl*>(txn_db);

@ -8,6 +8,7 @@
#ifndef ROCKSDB_LITE
#include <atomic>
#include <mutex>
#include <stack>
#include <string>
#include <unordered_map>
@ -27,8 +28,6 @@
namespace rocksdb {
using TransactionID = uint64_t;
class TransactionDBImpl;
class TransactionImpl : public TransactionBaseImpl {
@ -56,7 +55,23 @@ class TransactionImpl : public TransactionBaseImpl {
// Generate a new unique transaction identifier
static TransactionID GenTxnID();
TransactionID GetTxnID() const { return txn_id_; }
TransactionID GetID() const override { return txn_id_; }
TransactionID GetWaitingTxn(uint32_t* column_family_id,
const std::string** key) const override {
std::lock_guard<std::mutex> lock(wait_mutex_);
if (key) *key = waiting_key_;
if (column_family_id) *column_family_id = waiting_cf_id_;
return waiting_txn_id_;
}
void SetWaitingTxn(TransactionID id, uint32_t column_family_id,
const std::string* key) {
std::lock_guard<std::mutex> lock(wait_mutex_);
waiting_txn_id_ = id;
waiting_cf_id_ = column_family_id;
waiting_key_ = key;
}
// Returns the time (in microseconds according to Env->GetMicros())
// that this transaction will be expired. Returns 0 if this transaction does
@ -90,6 +105,25 @@ class TransactionImpl : public TransactionBaseImpl {
// Unique ID for this transaction
TransactionID txn_id_;
// ID for the transaction that is blocking the current transaction.
//
// 0 if current transaction is not waiting.
TransactionID waiting_txn_id_;
// The following two represents the (cf, key) that a transaction is waiting
// on.
//
// If waiting_key_ is not null, then the pointer should always point to
// a valid string object. The reason is that it is only non-null when the
// transaction is blocked in the TransactionLockMgr::AcquireWithTimeout
// function. At that point, the key string object is one of the function
// parameters.
uint32_t waiting_cf_id_;
const std::string* waiting_key_;
// Mutex protecting waiting_txn_id_, waiting_cf_id_ and waiting_key_.
mutable std::mutex wait_mutex_;
// If non-zero, this transaction should not be committed after this time (in
// microseconds according to Env->NowMicros())
uint64_t expiration_time_;

@ -24,6 +24,7 @@
#include "rocksdb/utilities/transaction_db_mutex.h"
#include "util/autovector.h"
#include "util/murmurhash.h"
#include "util/sync_point.h"
#include "util/thread_local.h"
#include "utilities/transactions/transaction_db_impl.h"
@ -213,7 +214,7 @@ bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env,
return expired;
}
Status TransactionLockMgr::TryLock(const TransactionImpl* txn,
Status TransactionLockMgr::TryLock(TransactionImpl* txn,
uint32_t column_family_id,
const std::string& key, Env* env) {
// Lookup lock map for this column family id
@ -232,18 +233,18 @@ Status TransactionLockMgr::TryLock(const TransactionImpl* txn,
assert(lock_map->lock_map_stripes_.size() > stripe_num);
LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
LockInfo lock_info(txn->GetTxnID(), txn->GetExpirationTime());
LockInfo lock_info(txn->GetID(), txn->GetExpirationTime());
int64_t timeout = txn->GetLockTimeout();
return AcquireWithTimeout(lock_map, stripe, key, env, timeout, lock_info);
return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
timeout, lock_info);
}
// Helper function for TryLock().
Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map,
LockMapStripe* stripe,
const std::string& key, Env* env,
int64_t timeout,
const LockInfo& lock_info) {
Status TransactionLockMgr::AcquireWithTimeout(
TransactionImpl* txn, LockMap* lock_map, LockMapStripe* stripe,
uint32_t column_family_id, const std::string& key, Env* env,
int64_t timeout, const LockInfo& lock_info) {
Status result;
uint64_t start_time = 0;
uint64_t end_time = 0;
@ -267,8 +268,9 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map,
// Acquire lock if we are able to
uint64_t expire_time_hint = 0;
result =
AcquireLocked(lock_map, stripe, key, env, lock_info, &expire_time_hint);
TransactionID wait_id = 0;
result = AcquireLocked(lock_map, stripe, key, env, lock_info,
&expire_time_hint, &wait_id);
if (!result.ok() && timeout != 0) {
// If we weren't able to acquire the lock, we will keep retrying as long
@ -287,6 +289,9 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map,
cv_end_time = end_time;
}
assert(result.IsBusy() || wait_id != 0);
txn->SetWaitingTxn(wait_id, column_family_id, &key);
TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn");
if (cv_end_time < 0) {
// Wait indefinitely
result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
@ -297,6 +302,7 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map,
cv_end_time - now);
}
}
txn->SetWaitingTxn(0, 0, nullptr);
if (result.IsTimedOut()) {
timed_out = true;
@ -307,7 +313,7 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map,
if (result.ok() || result.IsTimedOut()) {
result = AcquireLocked(lock_map, stripe, key, env, lock_info,
&expire_time_hint);
&expire_time_hint, &wait_id);
}
} while (!result.ok() && !timed_out);
}
@ -325,7 +331,8 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
LockMapStripe* stripe,
const std::string& key, Env* env,
const LockInfo& txn_lock_info,
uint64_t* expire_time) {
uint64_t* expire_time,
TransactionID* txn_id) {
Status result;
// Check if this key is already locked
if (stripe->keys.find(key) != stripe->keys.end()) {
@ -341,6 +348,7 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
// lock_cnt does not change
} else {
result = Status::TimedOut(Status::SubCode::kLockTimeout);
*txn_id = lock_info.txn_id;
}
}
} else { // Lock not held.
@ -376,7 +384,7 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id,
assert(lock_map->lock_map_stripes_.size() > stripe_num);
LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
TransactionID txn_id = txn->GetTxnID();
TransactionID txn_id = txn->GetID();
stripe->stripe_mutex->Lock();
@ -404,7 +412,7 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id,
void TransactionLockMgr::UnLock(const TransactionImpl* txn,
const TransactionKeyMap* key_map, Env* env) {
TransactionID txn_id = txn->GetTxnID();
TransactionID txn_id = txn->GetID();
for (auto& key_map_iter : *key_map) {
uint32_t column_family_id = key_map_iter.first;
@ -466,5 +474,40 @@ void TransactionLockMgr::UnLock(const TransactionImpl* txn,
}
}
TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
LockStatusData data;
std::vector<uint32_t> cf_ids;
// Lock order here is important. The correct order is lock_map_mutex_, then
// for every column family ID in ascending order lock every stripe in
// ascending order.
InstrumentedMutexLock l(&lock_map_mutex_);
for (const auto& lock_map_iter : lock_maps_) {
uint32_t cf_id = lock_map_iter.first;
cf_ids.push_back(cf_id);
const auto& lock_map = lock_map_iter.second;
// Iterate and lock all stripes in ascending order.
for (const auto& stripe : lock_map->lock_map_stripes_) {
stripe->stripe_mutex->Lock();
// Iterate through all keys in stripe, and copy to data.
for (const auto& it : stripe->keys) {
data.insert({cf_id, {it.first, it.second.txn_id}});
}
}
}
// Unlock everything. Unlocking order is not important.
for (auto i : cf_ids) {
const auto stripes = lock_maps_[i]->lock_map_stripes_;
for (const auto j : stripes) {
j->stripe_mutex->UnLock();
}
}
return data;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -9,6 +9,7 @@
#include <chrono>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "rocksdb/utilities/transaction.h"
@ -44,7 +45,7 @@ class TransactionLockMgr {
// Attempt to lock key. If OK status is returned, the caller is responsible
// for calling UnLock() on this key.
Status TryLock(const TransactionImpl* txn, uint32_t column_family_id,
Status TryLock(TransactionImpl* txn, uint32_t column_family_id,
const std::string& key, Env* env);
// Unlock a key locked by TryLock(). txn must be the same Transaction that
@ -54,6 +55,9 @@ class TransactionLockMgr {
void UnLock(TransactionImpl* txn, uint32_t column_family_id,
const std::string& key, Env* env);
using LockStatusData = std::unordered_multimap<uint32_t, KeyLockInfo>;
LockStatusData GetLockStatusData();
private:
TransactionDBImpl* txn_db_impl_;
@ -81,13 +85,15 @@ class TransactionLockMgr {
std::shared_ptr<LockMap> GetLockMap(uint32_t column_family_id);
Status AcquireWithTimeout(LockMap* lock_map, LockMapStripe* stripe,
Status AcquireWithTimeout(TransactionImpl* txn, LockMap* lock_map,
LockMapStripe* stripe, uint32_t column_family_id,
const std::string& key, Env* env, int64_t timeout,
const LockInfo& lock_info);
Status AcquireLocked(LockMap* lock_map, LockMapStripe* stripe,
const std::string& key, Env* env,
const LockInfo& lock_info, uint64_t* wait_time);
const LockInfo& lock_info, uint64_t* wait_time,
TransactionID* txn_id);
// No copying allowed
TransactionLockMgr(const TransactionLockMgr&);

@ -143,6 +143,7 @@ TEST_P(TransactionTest, SuccessTest) {
ASSERT_TRUE(txn);
ASSERT_EQ(0, txn->GetNumPuts());
ASSERT_LE(0, txn->GetID());
s = txn->GetForUpdate(read_options, "foo", &value);
ASSERT_OK(s);
@ -167,6 +168,62 @@ TEST_P(TransactionTest, SuccessTest) {
delete txn;
}
TEST_P(TransactionTest, WaitingTxn) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
Status s;
txn_options.lock_timeout = 1;
db->Put(write_options, Slice("foo"), Slice("bar"));
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
TransactionID id1 = txn1->GetID();
ASSERT_TRUE(txn1);
ASSERT_TRUE(txn2);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"TransactionLockMgr::AcquireWithTimeout:WaitingTxn", [&](void* arg) {
const std::string* key;
uint32_t cf_id;
TransactionID wait = txn2->GetWaitingTxn(&cf_id, &key);
ASSERT_EQ(*key, "foo");
ASSERT_EQ(wait, id1);
ASSERT_EQ(cf_id, 0);
});
s = txn1->GetForUpdate(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
auto lock_data = db->GetLockStatusData();
// Locked keys exist in one column family.
ASSERT_EQ(lock_data.size(), 1);
// Column family is 0 (default).
const auto& cf = *lock_data.cbegin();
ASSERT_EQ(cf.first, 0);
// The locked key is "foo" and is locked by txn1
const auto& key = cf.second;
ASSERT_EQ(key.key, "foo");
ASSERT_EQ(key.id, txn1->GetID());
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
s = txn2->GetForUpdate(read_options, "foo", &value);
ASSERT_TRUE(s.IsTimedOut());
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
delete txn1;
delete txn2;
}
TEST_P(TransactionTest, CommitTimeBatchFailTest) {
WriteOptions write_options;
TransactionOptions txn_options;

Loading…
Cancel
Save