From be1f1092c9247a25a75847d277c4d5392ca3d60d Mon Sep 17 00:00:00 2001 From: Manuel Ung Date: Tue, 27 Sep 2016 17:43:06 -0700 Subject: [PATCH] Expose transaction id, lock state information and transaction wait information Summary: This diff does 3 things: Expose TransactionID so that we can identify transactions when we retrieve locking and lock wait information. This is exposed as `Transaction::GetID`. Expose lock state information by locking all stripes in all column families and copying their contents to a data structure. This is exposed as `TransactionDB::GetLockStatusData`. Adds support for tracking the transaction and the key being waited on, and exposes this as `Transaction::GetWaitingTxn`. Test Plan: unit tests Reviewers: horuff, sdong Reviewed By: sdong Subscribers: vasilep, hermanlee4, andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D64413 --- include/rocksdb/utilities/transaction.h | 16 ++++- include/rocksdb/utilities/transaction_db.h | 12 ++++ utilities/transactions/transaction_db_impl.cc | 4 ++ utilities/transactions/transaction_db_impl.h | 2 + utilities/transactions/transaction_impl.cc | 3 + utilities/transactions/transaction_impl.h | 40 ++++++++++- .../transactions/transaction_lock_mgr.cc | 71 +++++++++++++++---- utilities/transactions/transaction_lock_mgr.h | 12 +++- utilities/transactions/transaction_test.cc | 57 +++++++++++++++ 9 files changed, 194 insertions(+), 23 deletions(-) diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 5a6c0cf45..9669c9f49 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -20,7 +20,9 @@ class Iterator; class TransactionDB; class WriteBatchWithIndex; -typedef std::string TransactionName; +using TransactionName = std::string; + +using TransactionID = uint64_t; // Provides notification to the caller of SetSnapshotOnNextOperation when // the actual snapshot gets created @@ -389,11 +391,19 @@ class Transaction { virtual void SetLogNumber(uint64_t log) { log_number_ = log; } - virtual uint64_t GetLogNumber() { return log_number_; } + virtual uint64_t GetLogNumber() const { return log_number_; } virtual Status SetName(const TransactionName& name) = 0; - virtual TransactionName GetName() { return name_; } + virtual TransactionName GetName() const { return name_; } + + virtual TransactionID GetID() const { return 0; } + + virtual TransactionID GetWaitingTxn(uint32_t* column_family_id, + const std::string** key) const { + assert(false); + return 0; + } enum ExecutionStatus { STARTED = 0, diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index f93f624bb..5e352798e 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -7,6 +7,7 @@ #ifndef ROCKSDB_LITE #include +#include #include #include "rocksdb/comparator.h" @@ -95,6 +96,11 @@ struct TransactionOptions { int64_t expiration = -1; }; +struct KeyLockInfo { + std::string key; + TransactionID id; +}; + class TransactionDB : public StackableDB { public: // Open a TransactionDB similar to DB::Open(). @@ -148,6 +154,12 @@ class TransactionDB : public StackableDB { virtual Transaction* GetTransactionByName(const TransactionName& name) = 0; virtual void GetAllPreparedTransactions(std::vector* trans) = 0; + // Returns set of all locks held. + // + // The mapping is column family id -> KeyLockInfo + virtual std::unordered_multimap + GetLockStatusData() = 0; + protected: // To Create an TransactionDB, call Open() explicit TransactionDB(DB* db) : StackableDB(db) {} diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index 3971591cd..8d3874d48 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -441,6 +441,10 @@ void TransactionDBImpl::GetAllPreparedTransactions( } } +TransactionLockMgr::LockStatusData TransactionDBImpl::GetLockStatusData() { + return lock_mgr_.GetLockStatusData(); +} + void TransactionDBImpl::RegisterTransaction(Transaction* txn) { assert(txn); assert(txn->GetName().length() > 0); diff --git a/utilities/transactions/transaction_db_impl.h b/utilities/transactions/transaction_db_impl.h index cb1bbb589..a4792ddb6 100644 --- a/utilities/transactions/transaction_db_impl.h +++ b/utilities/transactions/transaction_db_impl.h @@ -94,6 +94,8 @@ class TransactionDBImpl : public TransactionDB { // not thread safe. current use case is during recovery (single thread) void GetAllPreparedTransactions(std::vector* trans) override; + TransactionLockMgr::LockStatusData GetLockStatusData() override; + private: void ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index b3e79487e..fe4a959b3 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -40,6 +40,9 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db, : TransactionBaseImpl(txn_db->GetRootDB(), write_options), txn_db_impl_(nullptr), txn_id_(0), + waiting_txn_id_(0), + waiting_cf_id_(0), + waiting_key_(nullptr), expiration_time_(0), lock_timeout_(0) { txn_db_impl_ = dynamic_cast(txn_db); diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index f1bd10bc1..edc23cd3f 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -8,6 +8,7 @@ #ifndef ROCKSDB_LITE #include +#include #include #include #include @@ -27,8 +28,6 @@ namespace rocksdb { -using TransactionID = uint64_t; - class TransactionDBImpl; class TransactionImpl : public TransactionBaseImpl { @@ -56,7 +55,23 @@ class TransactionImpl : public TransactionBaseImpl { // Generate a new unique transaction identifier static TransactionID GenTxnID(); - TransactionID GetTxnID() const { return txn_id_; } + TransactionID GetID() const override { return txn_id_; } + + TransactionID GetWaitingTxn(uint32_t* column_family_id, + const std::string** key) const override { + std::lock_guard lock(wait_mutex_); + if (key) *key = waiting_key_; + if (column_family_id) *column_family_id = waiting_cf_id_; + return waiting_txn_id_; + } + + void SetWaitingTxn(TransactionID id, uint32_t column_family_id, + const std::string* key) { + std::lock_guard lock(wait_mutex_); + waiting_txn_id_ = id; + waiting_cf_id_ = column_family_id; + waiting_key_ = key; + } // Returns the time (in microseconds according to Env->GetMicros()) // that this transaction will be expired. Returns 0 if this transaction does @@ -90,6 +105,25 @@ class TransactionImpl : public TransactionBaseImpl { // Unique ID for this transaction TransactionID txn_id_; + // ID for the transaction that is blocking the current transaction. + // + // 0 if current transaction is not waiting. + TransactionID waiting_txn_id_; + + // The following two represents the (cf, key) that a transaction is waiting + // on. + // + // If waiting_key_ is not null, then the pointer should always point to + // a valid string object. The reason is that it is only non-null when the + // transaction is blocked in the TransactionLockMgr::AcquireWithTimeout + // function. At that point, the key string object is one of the function + // parameters. + uint32_t waiting_cf_id_; + const std::string* waiting_key_; + + // Mutex protecting waiting_txn_id_, waiting_cf_id_ and waiting_key_. + mutable std::mutex wait_mutex_; + // If non-zero, this transaction should not be committed after this time (in // microseconds according to Env->NowMicros()) uint64_t expiration_time_; diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 28e85986a..79ae0bd34 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -24,6 +24,7 @@ #include "rocksdb/utilities/transaction_db_mutex.h" #include "util/autovector.h" #include "util/murmurhash.h" +#include "util/sync_point.h" #include "util/thread_local.h" #include "utilities/transactions/transaction_db_impl.h" @@ -213,7 +214,7 @@ bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env, return expired; } -Status TransactionLockMgr::TryLock(const TransactionImpl* txn, +Status TransactionLockMgr::TryLock(TransactionImpl* txn, uint32_t column_family_id, const std::string& key, Env* env) { // Lookup lock map for this column family id @@ -232,18 +233,18 @@ Status TransactionLockMgr::TryLock(const TransactionImpl* txn, assert(lock_map->lock_map_stripes_.size() > stripe_num); LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); - LockInfo lock_info(txn->GetTxnID(), txn->GetExpirationTime()); + LockInfo lock_info(txn->GetID(), txn->GetExpirationTime()); int64_t timeout = txn->GetLockTimeout(); - return AcquireWithTimeout(lock_map, stripe, key, env, timeout, lock_info); + return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env, + timeout, lock_info); } // Helper function for TryLock(). -Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map, - LockMapStripe* stripe, - const std::string& key, Env* env, - int64_t timeout, - const LockInfo& lock_info) { +Status TransactionLockMgr::AcquireWithTimeout( + TransactionImpl* txn, LockMap* lock_map, LockMapStripe* stripe, + uint32_t column_family_id, const std::string& key, Env* env, + int64_t timeout, const LockInfo& lock_info) { Status result; uint64_t start_time = 0; uint64_t end_time = 0; @@ -267,8 +268,9 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map, // Acquire lock if we are able to uint64_t expire_time_hint = 0; - result = - AcquireLocked(lock_map, stripe, key, env, lock_info, &expire_time_hint); + TransactionID wait_id = 0; + result = AcquireLocked(lock_map, stripe, key, env, lock_info, + &expire_time_hint, &wait_id); if (!result.ok() && timeout != 0) { // If we weren't able to acquire the lock, we will keep retrying as long @@ -287,6 +289,9 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map, cv_end_time = end_time; } + assert(result.IsBusy() || wait_id != 0); + txn->SetWaitingTxn(wait_id, column_family_id, &key); + TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn"); if (cv_end_time < 0) { // Wait indefinitely result = stripe->stripe_cv->Wait(stripe->stripe_mutex); @@ -297,6 +302,7 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map, cv_end_time - now); } } + txn->SetWaitingTxn(0, 0, nullptr); if (result.IsTimedOut()) { timed_out = true; @@ -307,7 +313,7 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map, if (result.ok() || result.IsTimedOut()) { result = AcquireLocked(lock_map, stripe, key, env, lock_info, - &expire_time_hint); + &expire_time_hint, &wait_id); } } while (!result.ok() && !timed_out); } @@ -325,7 +331,8 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, LockMapStripe* stripe, const std::string& key, Env* env, const LockInfo& txn_lock_info, - uint64_t* expire_time) { + uint64_t* expire_time, + TransactionID* txn_id) { Status result; // Check if this key is already locked if (stripe->keys.find(key) != stripe->keys.end()) { @@ -341,6 +348,7 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, // lock_cnt does not change } else { result = Status::TimedOut(Status::SubCode::kLockTimeout); + *txn_id = lock_info.txn_id; } } } else { // Lock not held. @@ -376,7 +384,7 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, assert(lock_map->lock_map_stripes_.size() > stripe_num); LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); - TransactionID txn_id = txn->GetTxnID(); + TransactionID txn_id = txn->GetID(); stripe->stripe_mutex->Lock(); @@ -404,7 +412,7 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, void TransactionLockMgr::UnLock(const TransactionImpl* txn, const TransactionKeyMap* key_map, Env* env) { - TransactionID txn_id = txn->GetTxnID(); + TransactionID txn_id = txn->GetID(); for (auto& key_map_iter : *key_map) { uint32_t column_family_id = key_map_iter.first; @@ -466,5 +474,40 @@ void TransactionLockMgr::UnLock(const TransactionImpl* txn, } } +TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() { + LockStatusData data; + std::vector cf_ids; + + // Lock order here is important. The correct order is lock_map_mutex_, then + // for every column family ID in ascending order lock every stripe in + // ascending order. + InstrumentedMutexLock l(&lock_map_mutex_); + + for (const auto& lock_map_iter : lock_maps_) { + uint32_t cf_id = lock_map_iter.first; + cf_ids.push_back(cf_id); + const auto& lock_map = lock_map_iter.second; + + // Iterate and lock all stripes in ascending order. + for (const auto& stripe : lock_map->lock_map_stripes_) { + stripe->stripe_mutex->Lock(); + // Iterate through all keys in stripe, and copy to data. + for (const auto& it : stripe->keys) { + data.insert({cf_id, {it.first, it.second.txn_id}}); + } + } + } + + // Unlock everything. Unlocking order is not important. + for (auto i : cf_ids) { + const auto stripes = lock_maps_[i]->lock_map_stripes_; + for (const auto j : stripes) { + j->stripe_mutex->UnLock(); + } + } + + return data; +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index 5018f39c3..8c650c03f 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include "rocksdb/utilities/transaction.h" @@ -44,7 +45,7 @@ class TransactionLockMgr { // Attempt to lock key. If OK status is returned, the caller is responsible // for calling UnLock() on this key. - Status TryLock(const TransactionImpl* txn, uint32_t column_family_id, + Status TryLock(TransactionImpl* txn, uint32_t column_family_id, const std::string& key, Env* env); // Unlock a key locked by TryLock(). txn must be the same Transaction that @@ -54,6 +55,9 @@ class TransactionLockMgr { void UnLock(TransactionImpl* txn, uint32_t column_family_id, const std::string& key, Env* env); + using LockStatusData = std::unordered_multimap; + LockStatusData GetLockStatusData(); + private: TransactionDBImpl* txn_db_impl_; @@ -81,13 +85,15 @@ class TransactionLockMgr { std::shared_ptr GetLockMap(uint32_t column_family_id); - Status AcquireWithTimeout(LockMap* lock_map, LockMapStripe* stripe, + Status AcquireWithTimeout(TransactionImpl* txn, LockMap* lock_map, + LockMapStripe* stripe, uint32_t column_family_id, const std::string& key, Env* env, int64_t timeout, const LockInfo& lock_info); Status AcquireLocked(LockMap* lock_map, LockMapStripe* stripe, const std::string& key, Env* env, - const LockInfo& lock_info, uint64_t* wait_time); + const LockInfo& lock_info, uint64_t* wait_time, + TransactionID* txn_id); // No copying allowed TransactionLockMgr(const TransactionLockMgr&); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 5bf683d4a..ac91b0c90 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -143,6 +143,7 @@ TEST_P(TransactionTest, SuccessTest) { ASSERT_TRUE(txn); ASSERT_EQ(0, txn->GetNumPuts()); + ASSERT_LE(0, txn->GetID()); s = txn->GetForUpdate(read_options, "foo", &value); ASSERT_OK(s); @@ -167,6 +168,62 @@ TEST_P(TransactionTest, SuccessTest) { delete txn; } +TEST_P(TransactionTest, WaitingTxn) { + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + string value; + Status s; + + txn_options.lock_timeout = 1; + db->Put(write_options, Slice("foo"), Slice("bar")); + + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + TransactionID id1 = txn1->GetID(); + ASSERT_TRUE(txn1); + ASSERT_TRUE(txn2); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", [&](void* arg) { + const std::string* key; + uint32_t cf_id; + TransactionID wait = txn2->GetWaitingTxn(&cf_id, &key); + ASSERT_EQ(*key, "foo"); + ASSERT_EQ(wait, id1); + ASSERT_EQ(cf_id, 0); + }); + + s = txn1->GetForUpdate(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + + auto lock_data = db->GetLockStatusData(); + // Locked keys exist in one column family. + ASSERT_EQ(lock_data.size(), 1); + + // Column family is 0 (default). + const auto& cf = *lock_data.cbegin(); + ASSERT_EQ(cf.first, 0); + + // The locked key is "foo" and is locked by txn1 + const auto& key = cf.second; + ASSERT_EQ(key.key, "foo"); + ASSERT_EQ(key.id, txn1->GetID()); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + s = txn2->GetForUpdate(read_options, "foo", &value); + ASSERT_TRUE(s.IsTimedOut()); + ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + + delete txn1; + delete txn2; +} + TEST_P(TransactionTest, CommitTimeBatchFailTest) { WriteOptions write_options; TransactionOptions txn_options;