From daab7603f6fdfbfedecb1c530a16152a22d11397 Mon Sep 17 00:00:00 2001 From: Sergei Petrunia Date: Tue, 22 Dec 2020 19:10:56 -0800 Subject: [PATCH] Range Locking: Implementation of range locking (#7506) Summary: Range Locking - an implementation based on the locktree library - Add a RangeTreeLockManager and RangeTreeLockTracker which implement range locking using the locktree library. - Point locks are handled as locks on single-point ranges. - Add a unit test: range_locking_test Pull Request resolved: https://github.com/facebook/rocksdb/pull/7506 Reviewed By: akankshamahajan15 Differential Revision: D25320703 Pulled By: cheng-chang fbshipit-source-id: f86347384b42ba2b0257d67eca0f45f806b69da7 --- CMakeLists.txt | 5 +- Makefile | 4 + TARGETS | 11 + include/rocksdb/utilities/transaction_db.h | 6 +- src.mk | 3 + .../lock/point/point_lock_manager_test.h | 43 ++ .../lock/range/range_locking_test.cc | 304 +++++++++++ .../transactions/lock/range/range_tree/TODO | 1 - .../range_tree/lib/locktree/lock_request.cc | 42 +- .../range_tree/lib/locktree/lock_request.h | 30 +- .../range/range_tree/lib/locktree/locktree.cc | 10 +- .../range_tree/range_tree_lock_manager.cc | 479 ++++++++++++++++++ .../range_tree/range_tree_lock_manager.h | 130 +++++ .../range_tree/range_tree_lock_tracker.cc | 156 ++++++ .../range_tree/range_tree_lock_tracker.h | 146 ++++++ utilities/transactions/transaction_base.h | 2 + 16 files changed, 1333 insertions(+), 39 deletions(-) create mode 100644 utilities/transactions/lock/range/range_locking_test.cc delete mode 100644 utilities/transactions/lock/range/range_tree/TODO create mode 100644 utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc create mode 100644 utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h create mode 100644 utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc create mode 100644 utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 929073c38..648de5965 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -814,6 +814,8 @@ set(SOURCES utilities/transactions/lock/lock_manager.cc utilities/transactions/lock/point/point_lock_tracker.cc utilities/transactions/lock/point/point_lock_manager.cc + utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc + utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc utilities/transactions/optimistic_transaction_db_impl.cc utilities/transactions/optimistic_transaction.cc utilities/transactions/pessimistic_transaction.cc @@ -831,7 +833,7 @@ set(SOURCES utilities/write_batch_with_index/write_batch_with_index_internal.cc $) -list(APPEND SOURCE +list(APPEND SOURCES utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.cc utilities/transactions/lock/range/range_tree/lib/locktree/keyrange.cc utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc @@ -1225,6 +1227,7 @@ if(WITH_TESTS) utilities/transactions/lock/point/point_lock_manager_test.cc utilities/transactions/write_prepared_transaction_test.cc utilities/transactions/write_unprepared_transaction_test.cc + utilities/transactions/lock/range/range_locking_test.cc utilities/ttl/ttl_test.cc utilities/write_batch_with_index/write_batch_with_index_test.cc ) diff --git a/Makefile b/Makefile index 6051b55e1..d1eaed72f 100644 --- a/Makefile +++ b/Makefile @@ -558,6 +558,7 @@ PARALLEL_TEST = \ table_test \ transaction_test \ point_lock_manager_test \ + range_locking_test \ write_prepared_transaction_test \ write_unprepared_transaction_test \ @@ -1936,6 +1937,9 @@ blob_db_test: $(OBJ_DIR)/utilities/blob_db/blob_db_test.o $(TEST_LIBRARY) $(LIBR repeatable_thread_test: $(OBJ_DIR)/util/repeatable_thread_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +range_locking_test: utilities/transactions/lock/range/range_locking_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + range_tombstone_fragmenter_test: $(OBJ_DIR)/db/range_tombstone_fragmenter_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 66e45cd32..020b3e985 100644 --- a/TARGETS +++ b/TARGETS @@ -400,6 +400,8 @@ cpp_library( "utilities/transactions/lock/range/range_tree/lib/standalone_port.cc", "utilities/transactions/lock/range/range_tree/lib/util/dbt.cc", "utilities/transactions/lock/range/range_tree/lib/util/memarena.cc", + "utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc", + "utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc", "utilities/transactions/optimistic_transaction.cc", "utilities/transactions/optimistic_transaction_db_impl.cc", "utilities/transactions/pessimistic_transaction.cc", @@ -702,6 +704,8 @@ cpp_library( "utilities/transactions/lock/range/range_tree/lib/standalone_port.cc", "utilities/transactions/lock/range/range_tree/lib/util/dbt.cc", "utilities/transactions/lock/range/range_tree/lib/util/memarena.cc", + "utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc", + "utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc", "utilities/transactions/optimistic_transaction.cc", "utilities/transactions/optimistic_transaction_db_impl.cc", "utilities/transactions/pessimistic_transaction.cc", @@ -1800,6 +1804,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "range_locking_test", + "utilities/transactions/lock/range/range_locking_test.cc", + "parallel", + [], + [], + ], [ "range_tombstone_fragmenter_test", "db/range_tombstone_fragmenter_test.cc", diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index a48847cd2..580c6f6bb 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -86,7 +86,11 @@ struct RangeDeadlockPath { // RocksDB class RangeLockManagerHandle : public LockManagerHandle { public: - // Total amount of lock memory to use (per column family) + // Set total amount of lock memory to use. + // + // @return 0 Ok + // @return EDOM Failed to set because currently using more memory than + // specified virtual int SetMaxLockMemory(size_t max_lock_memory) = 0; virtual size_t GetMaxLockMemory() = 0; diff --git a/src.mk b/src.mk index 2f8077d5b..2bb45f3eb 100644 --- a/src.mk +++ b/src.mk @@ -267,6 +267,8 @@ LIB_SOURCES = \ utilities/transactions/lock/range/range_tree/lib/standalone_port.cc \ utilities/transactions/lock/range/range_tree/lib/util/dbt.cc \ utilities/transactions/lock/range/range_tree/lib/util/memarena.cc \ + utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc \ + utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc \ utilities/transactions/optimistic_transaction.cc \ utilities/transactions/optimistic_transaction_db_impl.cc \ utilities/transactions/pessimistic_transaction.cc \ @@ -536,6 +538,7 @@ TEST_MAIN_SOURCES = \ utilities/simulator_cache/sim_cache_test.cc \ utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \ utilities/transactions/optimistic_transaction_test.cc \ + utilities/transactions/lock/range/range_locking_test.cc \ utilities/transactions/transaction_test.cc \ utilities/transactions/lock/point/point_lock_manager_test.cc \ utilities/transactions/write_prepared_transaction_test.cc \ diff --git a/utilities/transactions/lock/point/point_lock_manager_test.h b/utilities/transactions/lock/point/point_lock_manager_test.h index 63c580501..d4011c024 100644 --- a/utilities/transactions/lock/point/point_lock_manager_test.h +++ b/utilities/transactions/lock/point/point_lock_manager_test.h @@ -273,4 +273,47 @@ TEST_P(AnyLockManagerTest, Deadlock) { delete txn1; } +TEST_P(AnyLockManagerTest, GetWaitingTxns_MultipleTxns) { + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + + auto txn1 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false)); + + auto txn2 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false)); + + auto txn3 = NewTxn(); + txn3->SetLockTimeout(10000); + port::Thread t1 = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { + ASSERT_OK(locker_->TryLock(txn3, 1, "k", env_, true)); + locker_->UnLock(txn3, 1, "k", env_); + }); + + // Ok, now txn3 is waiting for lock on "k", which is owned by two + // transactions. Check that GetWaitingTxns reports this correctly + uint32_t wait_cf_id; + std::string wait_key; + auto waiters = txn3->GetWaitingTxns(&wait_cf_id, &wait_key); + + ASSERT_EQ(wait_cf_id, 1u); + ASSERT_EQ(wait_key, "k"); + ASSERT_EQ(waiters.size(), 2); + bool waits_correct = + (waiters[0] == txn1->GetID() && waiters[1] == txn2->GetID()) || + (waiters[1] == txn1->GetID() && waiters[0] == txn2->GetID()); + ASSERT_EQ(waits_correct, true); + + // Release locks so txn3 can proceed with execution + locker_->UnLock(txn1, 1, "k", env_); + locker_->UnLock(txn2, 1, "k", env_); + + // Wait until txn3 finishes + t1.join(); + + delete txn1; + delete txn2; + delete txn3; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/lock/range/range_locking_test.cc b/utilities/transactions/lock/range/range_locking_test.cc new file mode 100644 index 000000000..458ba4524 --- /dev/null +++ b/utilities/transactions/lock/range/range_locking_test.cc @@ -0,0 +1,304 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE +#ifndef OS_WIN + +#include +#include +#include +#include + +#include "db/db_impl/db_impl.h" +#include "port/port.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/perf_context.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" +#include "utilities/transactions/lock/point/point_lock_manager_test.h" +#include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/transaction_test.h" + +using std::string; + +namespace ROCKSDB_NAMESPACE { + +class RangeLockingTest : public ::testing::Test { + public: + TransactionDB* db; + std::string dbname; + Options options; + + std::shared_ptr range_lock_mgr; + TransactionDBOptions txn_db_options; + + RangeLockingTest() : db(nullptr) { + options.create_if_missing = true; + dbname = test::PerThreadDBPath("range_locking_testdb"); + + DestroyDB(dbname, options); + + range_lock_mgr.reset(NewRangeLockManager(nullptr)); + txn_db_options.lock_mgr_handle = range_lock_mgr; + + auto s = TransactionDB::Open(options, txn_db_options, dbname, &db); + assert(s.ok()); + } + + ~RangeLockingTest() { + delete db; + db = nullptr; + // This is to skip the assert statement in FaultInjectionTestEnv. There + // seems to be a bug in btrfs that the makes readdir return recently + // unlink-ed files. By using the default fs we simply ignore errors resulted + // from attempting to delete such files in DestroyDB. + DestroyDB(dbname, options); + } + + PessimisticTransaction* NewTxn( + TransactionOptions txn_opt = TransactionOptions()) { + Transaction* txn = db->BeginTransaction(WriteOptions(), txn_opt); + return reinterpret_cast(txn); + } +}; + +// TODO: set a smaller lock wait timeout so that the test runs faster. +TEST_F(RangeLockingTest, BasicRangeLocking) { + WriteOptions write_options; + TransactionOptions txn_options; + std::string value; + ReadOptions read_options; + auto cf = db->DefaultColumnFamily(); + + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + + // Get a range lock + ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c"))); + + // Check that range Lock inhibits an overlapping range lock + { + auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z")); + ASSERT_TRUE(s.IsTimedOut()); + } + + // Check that range Lock inhibits an overlapping point lock + { + auto s = txn1->GetForUpdate(read_options, cf, Slice("b"), &value); + ASSERT_TRUE(s.IsTimedOut()); + } + + // Get a point lock, check that it inhibits range locks + ASSERT_OK(txn0->Put(cf, Slice("n"), Slice("value"))); + { + auto s = txn1->GetRangeLock(cf, Endpoint("m"), Endpoint("p")); + ASSERT_TRUE(s.IsTimedOut()); + } + + ASSERT_OK(txn0->Commit()); + txn1->Rollback(); + + delete txn0; + delete txn1; +} + +TEST_F(RangeLockingTest, MyRocksLikeUpdate) { + WriteOptions write_options; + TransactionOptions txn_options; + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + auto cf = db->DefaultColumnFamily(); + Status s; + + // Get a range lock for the range we are about to update + ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c"))); + + bool try_range_lock_called = false; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "RangeTreeLockManager::TryRangeLock:enter", + [&](void* /*arg*/) { try_range_lock_called = true; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // For performance reasons, the following must NOT call lock_mgr->TryLock(): + // We verify that by checking the value of try_range_lock_called. + ASSERT_OK(txn0->Put(cf, Slice("b"), Slice("value"), + /*assume_tracked=*/true)); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ASSERT_FALSE(try_range_lock_called); + + txn0->Rollback(); + + delete txn0; +} + +TEST_F(RangeLockingTest, SnapshotValidation) { + Status s; + Slice key_slice = Slice("k"); + ColumnFamilyHandle* cfh = db->DefaultColumnFamily(); + + auto txn0 = NewTxn(); + txn0->Put(key_slice, Slice("initial")); + txn0->Commit(); + + // txn1 + auto txn1 = NewTxn(); + txn1->SetSnapshot(); + std::string val1; + ASSERT_OK(txn1->Get(ReadOptions(), cfh, key_slice, &val1)); + ASSERT_EQ(val1, "initial"); + val1 = val1 + std::string("-txn1"); + + ASSERT_OK(txn1->Put(cfh, key_slice, Slice(val1))); + + // txn2 + auto txn2 = NewTxn(); + txn2->SetSnapshot(); + std::string val2; + // This will see the original value as nothing is committed + // This is also Get, so it is doesn't acquire any locks. + ASSERT_OK(txn2->Get(ReadOptions(), cfh, key_slice, &val2)); + ASSERT_EQ(val2, "initial"); + + // txn1 + ASSERT_OK(txn1->Commit()); + + // txn2 + val2 = val2 + std::string("-txn2"); + // Now, this call should do Snapshot Validation and fail: + s = txn2->Put(cfh, key_slice, Slice(val2)); + ASSERT_TRUE(s.IsBusy()); + + ASSERT_OK(txn2->Commit()); + + delete txn0; + delete txn1; + delete txn2; +} + +TEST_F(RangeLockingTest, MultipleTrxLockStatusData) { + WriteOptions write_options; + TransactionOptions txn_options; + auto cf = db->DefaultColumnFamily(); + + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + + // Get a range lock + ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("z"), Endpoint("z"))); + ASSERT_OK(txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("e"))); + + auto s = range_lock_mgr->GetRangeLockStatusData(); + ASSERT_EQ(s.size(), 2); + for (auto it = s.begin(); it != s.end(); ++it) { + ASSERT_EQ(it->first, cf->GetID()); + auto val = it->second; + ASSERT_FALSE(val.start.inf_suffix); + ASSERT_FALSE(val.end.inf_suffix); + ASSERT_TRUE(val.exclusive); + ASSERT_EQ(val.ids.size(), 1); + if (val.ids[0] == txn0->GetID()) { + ASSERT_EQ(val.start.slice, "z"); + ASSERT_EQ(val.end.slice, "z"); + } else if (val.ids[0] == txn1->GetID()) { + ASSERT_EQ(val.start.slice, "b"); + ASSERT_EQ(val.end.slice, "e"); + } else { + FAIL(); // Unknown transaction ID. + } + } + + delete txn0; + delete txn1; +} + +#if defined(__has_feature) +#if __has_feature(thread_sanitizer) +#define SKIP_LOCK_ESCALATION_TEST 1 +#endif +#endif + +#ifndef SKIP_LOCK_ESCALATION_TEST +TEST_F(RangeLockingTest, BasicLockEscalation) { + auto cf = db->DefaultColumnFamily(); + + auto counters = range_lock_mgr->GetStatus(); + + // Initially not using any lock memory + ASSERT_EQ(counters.current_lock_memory, 0); + ASSERT_EQ(counters.escalation_count, 0); + + ASSERT_EQ(0, range_lock_mgr->SetMaxLockMemory(2000)); + + // Insert until we see lock escalations + auto txn = NewTxn(); + + // Get the locks until we hit an escalation + for (int i = 0; i < 2020; i++) { + char buf[32]; + snprintf(buf, sizeof(buf) - 1, "%08d", i); + ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf), Endpoint(buf))); + } + counters = range_lock_mgr->GetStatus(); + ASSERT_GT(counters.escalation_count, 0); + ASSERT_LE(counters.current_lock_memory, 2000); + + delete txn; +} +#endif + +void PointLockManagerTestExternalSetup(PointLockManagerTest* self) { + self->env_ = Env::Default(); + self->db_dir_ = test::PerThreadDBPath("point_lock_manager_test"); + ASSERT_OK(self->env_->CreateDir(self->db_dir_)); + + Options opt; + opt.create_if_missing = true; + TransactionDBOptions txn_opt; + txn_opt.transaction_lock_timeout = 0; + + auto mutex_factory = std::make_shared(); + self->locker_.reset(NewRangeLockManager(mutex_factory)->getLockManager()); + std::shared_ptr range_lock_mgr = + std::dynamic_pointer_cast(self->locker_); + txn_opt.lock_mgr_handle = range_lock_mgr; + + ASSERT_OK(TransactionDB::Open(opt, txn_opt, self->db_dir_, &self->db_)); + self->wait_sync_point_name_ = "RangeTreeLockManager::TryRangeLock:WaitingTxn"; +} + +INSTANTIATE_TEST_CASE_P(RangeLockManager, AnyLockManagerTest, + ::testing::Values(PointLockManagerTestExternalSetup)); + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else // OS_WIN + +#include +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "skipped as Range Locking is not supported on Windows\n"); + return 0; +} + +#endif // OS_WIN + +#else +#include + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, + "skipped as transactions are not supported in rocksdb_lite\n"); + return 0; +} + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/range/range_tree/TODO b/utilities/transactions/lock/range/range_tree/TODO deleted file mode 100644 index 4bab2347e..000000000 --- a/utilities/transactions/lock/range/range_tree/TODO +++ /dev/null @@ -1 +0,0 @@ -Implement LockTracker and LockManager for range lock with range tree. diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc index fb14f98bd..ec7bd04dc 100644 --- a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc +++ b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc @@ -1,5 +1,5 @@ /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ -// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +// vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2: #ifndef ROCKSDB_LITE #ifndef OS_WIN #ident "$Id$" @@ -240,7 +240,7 @@ int lock_request::wait(uint64_t wait_time_ms) { int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void), - void (*lock_wait_callback)(void *, TXNID, TXNID), + void (*lock_wait_callback)(void *, lock_wait_infos *), void *callback_arg) { uint64_t t_now = toku_current_time_microsec(); uint64_t t_start = t_now; @@ -250,13 +250,11 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, // check again, this time locking out other retry calls if (m_state == state::PENDING) { - GrowableArray conflicts_collector; - conflicts_collector.init(); + lock_wait_infos conflicts_collector; retry(&conflicts_collector); if (m_state == state::PENDING) { report_waits(&conflicts_collector, lock_wait_callback, callback_arg); } - conflicts_collector.deinit(); } while (m_state == state::PENDING) { @@ -325,7 +323,7 @@ TXNID lock_request::get_conflicting_txnid(void) const { return m_conflicting_txnid; } -int lock_request::retry(GrowableArray *conflicts_collector) { +int lock_request::retry(lock_wait_infos *conflicts_collector) { invariant(m_state == state::PENDING); int r; txnid_set conflicts; @@ -356,7 +354,7 @@ int lock_request::retry(GrowableArray *conflicts_collector) { } void lock_request::retry_all_lock_requests( - locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID), + locktree *lt, void (*lock_wait_callback)(void *, lock_wait_infos *), void *callback_arg, void (*after_retry_all_test_callback)(void)) { lt_lock_request_info *info = lt->get_lock_request_info(); @@ -370,8 +368,7 @@ void lock_request::retry_all_lock_requests( toku_mutex_lock(&info->retry_mutex); - GrowableArray conflicts_collector; - conflicts_collector.init(); + lock_wait_infos conflicts_collector; // here is the group retry algorithm. // get the latest retry_want count and use it as the generation number of @@ -398,11 +395,10 @@ void lock_request::retry_all_lock_requests( toku_mutex_unlock(&info->retry_mutex); report_waits(&conflicts_collector, lock_wait_callback, callback_arg); - conflicts_collector.deinit(); } -void lock_request::retry_all_lock_requests_info( - lt_lock_request_info *info, GrowableArray *collector) { +void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info, + lock_wait_infos *collector) { toku_external_mutex_lock(&info->mutex); // retry all of the pending lock requests. for (uint32_t i = 0; i < info->pending_lock_requests.size();) { @@ -425,26 +421,20 @@ void lock_request::retry_all_lock_requests_info( toku_external_mutex_unlock(&info->mutex); } -void lock_request::add_conflicts_to_waits( - txnid_set *conflicts, GrowableArray *wait_conflicts) { +void lock_request::add_conflicts_to_waits(txnid_set *conflicts, + lock_wait_infos *wait_conflicts) { + wait_conflicts->push_back({m_lt, get_txnid(), m_extra, {}}); uint32_t num_conflicts = conflicts->size(); for (uint32_t i = 0; i < num_conflicts; i++) { - wait_conflicts->push(m_txnid); - wait_conflicts->push(conflicts->get(i)); + wait_conflicts->back().waitees.push_back(conflicts->get(i)); } } -void lock_request::report_waits(GrowableArray *wait_conflicts, - void (*lock_wait_callback)(void *, TXNID, - TXNID), +void lock_request::report_waits(lock_wait_infos *wait_conflicts, + void (*lock_wait_callback)(void *, + lock_wait_infos *), void *callback_arg) { - if (!lock_wait_callback) return; - size_t num_conflicts = wait_conflicts->get_size(); - for (size_t i = 0; i < num_conflicts; i += 2) { - TXNID blocked_txnid = wait_conflicts->fetch_unchecked(i); - TXNID blocking_txnid = wait_conflicts->fetch_unchecked(i + 1); - (*lock_wait_callback)(callback_arg, blocked_txnid, blocking_txnid); - } + if (lock_wait_callback) (*lock_wait_callback)(callback_arg, wait_conflicts); } void *lock_request::get_extra(void) const { return m_extra; } diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h index ac47c3428..3544f102f 100644 --- a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h +++ b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h @@ -1,5 +1,5 @@ /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ -// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +// vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2: #ident "$Id$" /*====== This file is part of PerconaFT. @@ -62,6 +62,18 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. namespace toku { +// Information about a lock wait +struct lock_wait_info { + locktree *ltree; // the tree where wait happens + TXNID waiter; // the waiting transaction + void *m_extra; // lock_request's m_extra + + // The transactions that are waited for. + std::vector waitees; +}; + +typedef std::vector lock_wait_infos; + // A lock request contains the db, the key range, the lock type, and // the transaction id that describes a potential row range lock. // @@ -101,7 +113,7 @@ class lock_request { int wait(uint64_t wait_time_ms); int wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void), - void (*lock_wait_callback)(void *, TXNID, TXNID) = nullptr, + void (*lock_wait_callback)(void *, lock_wait_infos *) = nullptr, void *callback_arg = nullptr); // return: left end-point of the lock range @@ -124,11 +136,12 @@ class lock_request { // up. // The rest remain pending. static void retry_all_lock_requests( - locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID) = nullptr, + locktree *lt, + void (*lock_wait_callback)(void *, lock_wait_infos *) = nullptr, void *callback_arg = nullptr, void (*after_retry_test_callback)(void) = nullptr); static void retry_all_lock_requests_info(lt_lock_request_info *info, - GrowableArray *collector); + lock_wait_infos *collector); void set_start_test_callback(void (*f)(void)); void set_start_before_pending_test_callback(void (*f)(void)); @@ -181,7 +194,7 @@ class lock_request { // effect: tries again to acquire the lock described by this lock request // returns: 0 if retrying the request succeeded and is now complete - int retry(GrowableArray *conflict_collector); + int retry(lock_wait_infos *collector); void complete(int complete_r); @@ -216,11 +229,12 @@ class lock_request { static int find_by_txnid(lock_request *const &request, const TXNID &txnid); // Report list of conflicts to lock wait callback. - static void report_waits(GrowableArray *wait_conflicts, - void (*lock_wait_callback)(void *, TXNID, TXNID), + static void report_waits(lock_wait_infos *wait_conflicts, + void (*lock_wait_callback)(void *, + lock_wait_infos *), void *callback_arg); void add_conflicts_to_waits(txnid_set *conflicts, - GrowableArray *wait_conflicts); + lock_wait_infos *wait_conflicts); void (*m_start_test_callback)(void); void (*m_start_before_pending_test_callback)(void); diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc index b8482a227..b7086bc8a 100644 --- a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc +++ b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc @@ -1,5 +1,5 @@ /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ -// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +// vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2: #ifndef ROCKSDB_LITE #ifndef OS_WIN #ident "$Id$" @@ -186,7 +186,13 @@ static bool determine_conflicting_txnids( const TXNID other_txnid = lock.txnid; if (other_txnid != txnid) { if (conflicts) { - conflicts->add(other_txnid); + if (other_txnid == TXNID_SHARED) { + for (TXNID shared_id : *lock.owners) { + conflicts->add(shared_id); + } + } else { + conflicts->add(other_txnid); + } } conflicts_exist = true; } diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc new file mode 100644 index 000000000..6dfb78d3f --- /dev/null +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc @@ -0,0 +1,479 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE +#ifndef OS_WIN + +#include "utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h" + +#include +#include +#include + +#include "monitoring/perf_context_imp.h" +#include "rocksdb/slice.h" +#include "rocksdb/utilities/transaction_db_mutex.h" +#include "test_util/sync_point.h" +#include "util/cast_util.h" +#include "util/hash.h" +#include "util/thread_local.h" +#include "utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h" +#include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/transaction_db_mutex_impl.h" + +namespace ROCKSDB_NAMESPACE { + +RangeLockManagerHandle* NewRangeLockManager( + std::shared_ptr mutex_factory) { + std::shared_ptr use_factory; + + if (mutex_factory) { + use_factory = mutex_factory; + } else { + use_factory.reset(new TransactionDBMutexFactoryImpl()); + } + return new RangeTreeLockManager(use_factory); +} + +static const char SUFFIX_INFIMUM = 0x0; +static const char SUFFIX_SUPREMUM = 0x1; + +// Convert Endpoint into an internal format used for storing it in locktree +// (DBT structure is used for passing endpoints to locktree and getting back) +void serialize_endpoint(const Endpoint& endp, std::string* buf) { + buf->push_back(endp.inf_suffix ? SUFFIX_SUPREMUM : SUFFIX_INFIMUM); + buf->append(endp.slice.data(), endp.slice.size()); +} + +// Decode the endpoint from the format it is stored in the locktree (DBT) to +// one used outside (EndpointWithString) +void deserialize_endpoint(const DBT* dbt, EndpointWithString* endp) { + assert(dbt->size >= 1); + const char* dbt_data = (const char*)dbt->data; + char suffix = dbt_data[0]; + assert(suffix == SUFFIX_INFIMUM || suffix == SUFFIX_SUPREMUM); + endp->inf_suffix = (suffix == SUFFIX_SUPREMUM); + endp->slice.assign(dbt_data + 1, dbt->size - 1); +} + +// Get a range lock on [start_key; end_key] range +Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn, + uint32_t column_family_id, + const Endpoint& start_endp, + const Endpoint& end_endp, Env*, + bool exclusive) { + toku::lock_request request; + request.create(mutex_factory_); + DBT start_key_dbt, end_key_dbt; + + TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:enter"); + std::string start_key; + std::string end_key; + serialize_endpoint(start_endp, &start_key); + serialize_endpoint(end_endp, &end_key); + + toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size()); + toku_fill_dbt(&end_key_dbt, end_key.data(), end_key.size()); + + auto lt = GetLockTreeForCF(column_family_id); + + // Put the key waited on into request's m_extra. See + // wait_callback_for_locktree for details. + std::string wait_key(start_endp.slice.data(), start_endp.slice.size()); + + request.set(lt.get(), (TXNID)txn, &start_key_dbt, &end_key_dbt, + exclusive ? toku::lock_request::WRITE : toku::lock_request::READ, + false /* not a big txn */, &wait_key); + + // This is for "periodically wake up and check if the wait is killed" feature + // which we are not using. + uint64_t killed_time_msec = 0; + uint64_t wait_time_msec = txn->GetLockTimeout(); + + if (wait_time_msec == static_cast(-1)) { + // The transaction has no wait timeout. lock_request::wait doesn't support + // this, it needs a number of milliseconds to wait. Pass it one year to + // be safe. + wait_time_msec = uint64_t(1000) * 60 * 60 * 24 * 365; + } else { + // convert microseconds to milliseconds + wait_time_msec = (wait_time_msec + 500) / 1000; + } + + std::vector di_path; + request.m_deadlock_cb = [&](TXNID txnid, bool is_exclusive, + const DBT* start_dbt, const DBT* end_dbt) { + EndpointWithString start; + EndpointWithString end; + deserialize_endpoint(start_dbt, &start); + deserialize_endpoint(end_dbt, &end); + + di_path.push_back({((PessimisticTransaction*)txnid)->GetID(), + column_family_id, is_exclusive, std::move(start), + std::move(end)}); + }; + + request.start(); + + const int r = request.wait(wait_time_msec, killed_time_msec, + nullptr, // killed_callback + wait_callback_for_locktree, nullptr); + + // Inform the txn that we are no longer waiting: + txn->ClearWaitingTxn(); + + request.destroy(); + switch (r) { + case 0: + break; // fall through + case DB_LOCK_NOTGRANTED: + return Status::TimedOut(Status::SubCode::kLockTimeout); + case TOKUDB_OUT_OF_LOCKS: + return Status::Busy(Status::SubCode::kLockLimit); + case DB_LOCK_DEADLOCK: { + std::reverse(di_path.begin(), di_path.end()); + dlock_buffer_.AddNewPath( + RangeDeadlockPath(di_path, request.get_start_time())); + return Status::Busy(Status::SubCode::kDeadlock); + } + default: + assert(0); + return Status::Busy(Status::SubCode::kLockLimit); + } + + return Status::OK(); +} + +// Wait callback that locktree library will call to inform us about +// the lock waits that are in progress. +void wait_callback_for_locktree(void*, lock_wait_infos* infos) { + for (auto wait_info : *infos) { + auto txn = (PessimisticTransaction*)wait_info.waiter; + auto cf_id = (ColumnFamilyId)wait_info.ltree->get_dict_id().dictid; + + autovector waitee_ids; + for (auto waitee : wait_info.waitees) { + waitee_ids.push_back(((PessimisticTransaction*)waitee)->GetID()); + } + txn->SetWaitingTxn(waitee_ids, cf_id, (std::string*)wait_info.m_extra); + } + + // Here we can assume that the locktree code will now wait for some lock + TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:WaitingTxn"); +} + +void RangeTreeLockManager::UnLock(PessimisticTransaction* txn, + ColumnFamilyId column_family_id, + const std::string& key, Env*) { + auto locktree = GetLockTreeForCF(column_family_id); + std::string endp_image; + serialize_endpoint({key.data(), key.size(), false}, &endp_image); + + DBT key_dbt; + toku_fill_dbt(&key_dbt, endp_image.data(), endp_image.size()); + + toku::range_buffer range_buf; + range_buf.create(); + range_buf.append(&key_dbt, &key_dbt); + + locktree->release_locks((TXNID)txn, &range_buf); + range_buf.destroy(); + + toku::lock_request::retry_all_lock_requests( + locktree.get(), wait_callback_for_locktree, nullptr); +} + +void RangeTreeLockManager::UnLock(PessimisticTransaction* txn, + const LockTracker& tracker, Env*) { + const RangeTreeLockTracker* range_tracker = + static_cast(&tracker); + + RangeTreeLockTracker* range_trx_tracker = + static_cast(&txn->GetTrackedLocks()); + bool all_keys = (range_trx_tracker == range_tracker); + + // tracked_locks_->range_list may hold nullptr if the transaction has never + // acquired any locks. + ((RangeTreeLockTracker*)range_tracker)->ReleaseLocks(this, txn, all_keys); +} + +int RangeTreeLockManager::CompareDbtEndpoints(void* arg, const DBT* a_key, + const DBT* b_key) { + const char* a = (const char*)a_key->data; + const char* b = (const char*)b_key->data; + + size_t a_len = a_key->size; + size_t b_len = b_key->size; + + size_t min_len = std::min(a_len, b_len); + + // Compare the values. The first byte encodes the endpoint type, its value + // is either SUFFIX_INFIMUM or SUFFIX_SUPREMUM. + Comparator* cmp = (Comparator*)arg; + int res = cmp->Compare(Slice(a + 1, min_len - 1), Slice(b + 1, min_len - 1)); + if (!res) { + if (b_len > min_len) { + // a is shorter; + if (a[0] == SUFFIX_INFIMUM) { + return -1; //"a is smaller" + } else { + // a is considered padded with 0xFF:FF:FF:FF... + return 1; // "a" is bigger + } + } else if (a_len > min_len) { + // the opposite of the above: b is shorter. + if (b[0] == SUFFIX_INFIMUM) { + return 1; //"b is smaller" + } else { + // b is considered padded with 0xFF:FF:FF:FF... + return -1; // "b" is bigger + } + } else { + // the lengths are equal (and the key values, too) + if (a[0] < b[0]) { + return -1; + } else if (a[0] > b[0]) { + return 1; + } else { + return 0; + } + } + } else { + return res; + } +} + +namespace { +void UnrefLockTreeMapsCache(void* ptr) { + // Called when a thread exits or a ThreadLocalPtr gets destroyed. + auto lock_tree_map_cache = static_cast< + std::unordered_map>*>(ptr); + delete lock_tree_map_cache; +} +} // anonymous namespace + +RangeTreeLockManager::RangeTreeLockManager( + std::shared_ptr mutex_factory) + : mutex_factory_(mutex_factory), + ltree_lookup_cache_(new ThreadLocalPtr(&UnrefLockTreeMapsCache)), + dlock_buffer_(10) { + ltm_.create(on_create, on_destroy, on_escalate, nullptr, mutex_factory_); +} + +void RangeTreeLockManager::SetRangeDeadlockInfoBufferSize( + uint32_t target_size) { + dlock_buffer_.Resize(target_size); +} + +void RangeTreeLockManager::Resize(uint32_t target_size) { + SetRangeDeadlockInfoBufferSize(target_size); +} + +std::vector +RangeTreeLockManager::GetRangeDeadlockInfoBuffer() { + return dlock_buffer_.PrepareBuffer(); +} + +std::vector RangeTreeLockManager::GetDeadlockInfoBuffer() { + std::vector res; + std::vector data = GetRangeDeadlockInfoBuffer(); + // report left endpoints + for (auto it = data.begin(); it != data.end(); ++it) { + std::vector path; + + for (auto it2 = it->path.begin(); it2 != it->path.end(); ++it2) { + path.push_back( + {it2->m_txn_id, it2->m_cf_id, it2->m_exclusive, it2->m_start.slice}); + } + res.push_back(DeadlockPath(path, it->deadlock_time)); + } + return res; +} + +// @brief Lock Escalation Callback function +// +// @param txnid Transaction whose locks got escalated +// @param lt Lock Tree where escalation is happening +// @param buffer Escalation result: list of locks that this transaction now +// owns in this lock tree. +// @param void* Callback context +void RangeTreeLockManager::on_escalate(TXNID txnid, const locktree* lt, + const range_buffer& buffer, void*) { + auto txn = (PessimisticTransaction*)txnid; + ((RangeTreeLockTracker*)&txn->GetTrackedLocks())->ReplaceLocks(lt, buffer); +} + +RangeTreeLockManager::~RangeTreeLockManager() { + autovector local_caches; + ltree_lookup_cache_->Scrape(&local_caches, nullptr); + for (auto cache : local_caches) { + delete static_cast(cache); + } + ltree_map_.clear(); // this will call release_lt() for all locktrees + ltm_.destroy(); +} + +RangeLockManagerHandle::Counters RangeTreeLockManager::GetStatus() { + LTM_STATUS_S ltm_status_test; + ltm_.get_status(<m_status_test); + Counters res; + + // Searching status variable by its string name is how Toku's unit tests + // do it (why didn't they make LTM_ESCALATION_COUNT constant visible?) + // lookup keyname in status + for (int i = 0; i < LTM_STATUS_S::LTM_STATUS_NUM_ROWS; i++) { + TOKU_ENGINE_STATUS_ROW status = <m_status_test.status[i]; + if (strcmp(status->keyname, "LTM_ESCALATION_COUNT") == 0) { + res.escalation_count = status->value.num; + continue; + } + if (strcmp(status->keyname, "LTM_SIZE_CURRENT") == 0) { + res.current_lock_memory = status->value.num; + } + } + return res; +} + +std::shared_ptr RangeTreeLockManager::MakeLockTreePtr(locktree* lt) { + locktree_manager* ltm = <m_; + return std::shared_ptr(lt, + [ltm](locktree* p) { ltm->release_lt(p); }); +} + +void RangeTreeLockManager::AddColumnFamily(const ColumnFamilyHandle* cfh) { + uint32_t column_family_id = cfh->GetID(); + + InstrumentedMutexLock l(<ree_map_mutex_); + if (ltree_map_.find(column_family_id) == ltree_map_.end()) { + DICTIONARY_ID dict_id = {.dictid = column_family_id}; + toku::comparator cmp; + cmp.create(CompareDbtEndpoints, (void*)cfh->GetComparator()); + toku::locktree* ltree = ltm_.get_lt(dict_id, cmp, + /* on_create_extra*/ nullptr); + // This is ok to because get_lt has copied the comparator: + cmp.destroy(); + + ltree_map_.insert({column_family_id, MakeLockTreePtr(ltree)}); + } +} + +void RangeTreeLockManager::RemoveColumnFamily(const ColumnFamilyHandle* cfh) { + uint32_t column_family_id = cfh->GetID(); + // Remove lock_map for this column family. Since the lock map is stored + // as a shared ptr, concurrent transactions can still keep using it + // until they release their references to it. + + // TODO what if one drops a column family while transaction(s) still have + // locks in it? + // locktree uses column family'c Comparator* as the criteria to do tree + // ordering. If the comparator is gone, we won't even be able to remove the + // elements from the locktree. + // A possible solution might be to remove everything right now: + // - wait until everyone traversing the locktree are gone + // - remove everything from the locktree. + // - some transactions may have acquired locks in their LockTracker objects. + // Arrange something so we don't blow up when they try to release them. + // - ... + // This use case (drop column family while somebody is using it) doesn't seem + // the priority, though. + + { + InstrumentedMutexLock l(<ree_map_mutex_); + + auto lock_maps_iter = ltree_map_.find(column_family_id); + assert(lock_maps_iter != ltree_map_.end()); + ltree_map_.erase(lock_maps_iter); + } // lock_map_mutex_ + + autovector local_caches; + ltree_lookup_cache_->Scrape(&local_caches, nullptr); + for (auto cache : local_caches) { + delete static_cast(cache); + } +} + +std::shared_ptr RangeTreeLockManager::GetLockTreeForCF( + ColumnFamilyId column_family_id) { + // First check thread-local cache + if (ltree_lookup_cache_->Get() == nullptr) { + ltree_lookup_cache_->Reset(new LockTreeMap()); + } + + auto ltree_map_cache = static_cast(ltree_lookup_cache_->Get()); + + auto it = ltree_map_cache->find(column_family_id); + if (it != ltree_map_cache->end()) { + // Found lock map for this column family. + return it->second; + } + + // Not found in local cache, grab mutex and check shared LockMaps + InstrumentedMutexLock l(<ree_map_mutex_); + + it = ltree_map_.find(column_family_id); + if (it == ltree_map_.end()) { + return nullptr; + } else { + // Found lock map. Store in thread-local cache and return. + ltree_map_cache->insert({column_family_id, it->second}); + return it->second; + } +} + +struct LOCK_PRINT_CONTEXT { + RangeLockManagerHandle::RangeLockStatus* data; // Save locks here + uint32_t cfh_id; // Column Family whose tree we are traversing +}; + +// Report left endpoints of the acquired locks +LockManager::PointLockStatus RangeTreeLockManager::GetPointLockStatus() { + PointLockStatus res; + LockManager::RangeLockStatus data = GetRangeLockStatus(); + // report left endpoints + for (auto it = data.begin(); it != data.end(); ++it) { + auto& val = it->second; + res.insert({it->first, {val.start.slice, val.ids, val.exclusive}}); + } + return res; +} + +static void push_into_lock_status_data(void* param, const DBT* left, + const DBT* right, TXNID txnid_arg, + bool is_shared, TxnidVector* owners) { + struct LOCK_PRINT_CONTEXT* ctx = (LOCK_PRINT_CONTEXT*)param; + struct RangeLockInfo info; + + info.exclusive = !is_shared; + + deserialize_endpoint(left, &info.start); + deserialize_endpoint(right, &info.end); + + if (txnid_arg != TXNID_SHARED) { + TXNID txnid = ((PessimisticTransaction*)txnid_arg)->GetID(); + info.ids.push_back(txnid); + } else { + for (auto it : *owners) { + TXNID real_id = ((PessimisticTransaction*)it)->GetID(); + info.ids.push_back(real_id); + } + } + ctx->data->insert({ctx->cfh_id, info}); +} + +LockManager::RangeLockStatus RangeTreeLockManager::GetRangeLockStatus() { + LockManager::RangeLockStatus data; + { + InstrumentedMutexLock l(<ree_map_mutex_); + for (auto it : ltree_map_) { + LOCK_PRINT_CONTEXT ctx = {&data, it.first}; + it.second->dump_locks((void*)&ctx, push_into_lock_status_data); + } + } + return data; +} + +} // namespace ROCKSDB_NAMESPACE +#endif // OS_WIN +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h new file mode 100644 index 000000000..5d55ded02 --- /dev/null +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h @@ -0,0 +1,130 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE +#ifndef OS_WIN + +// For DeadlockInfoBuffer: +#include "util/thread_local.h" +#include "utilities/transactions/lock/point/point_lock_manager.h" +#include "utilities/transactions/lock/range/range_lock_manager.h" + +// Lock Tree library: +#include "utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h" +#include "utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h" +#include "utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h" + +namespace ROCKSDB_NAMESPACE { + +using namespace toku; + +typedef DeadlockInfoBufferTempl RangeDeadlockInfoBuffer; + +// A Range Lock Manager that uses PerconaFT's locktree library +class RangeTreeLockManager : public RangeLockManagerBase, + public RangeLockManagerHandle { + public: + LockManager* getLockManager() override { return this; } + + void AddColumnFamily(const ColumnFamilyHandle* cfh) override; + void RemoveColumnFamily(const ColumnFamilyHandle* cfh) override; + + void Resize(uint32_t) override; + std::vector GetDeadlockInfoBuffer() override; + + std::vector GetRangeDeadlockInfoBuffer() override; + void SetRangeDeadlockInfoBufferSize(uint32_t target_size) override; + + // Get a lock on a range + // @note only exclusive locks are currently supported (requesting a + // non-exclusive lock will get an exclusive one) + using LockManager::TryLock; + Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const Endpoint& start_endp, const Endpoint& end_endp, Env* env, + bool exclusive) override; + + void UnLock(PessimisticTransaction* txn, const LockTracker& tracker, + Env* env) override; + void UnLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const std::string& key, Env* env) override; + void UnLock(PessimisticTransaction*, ColumnFamilyId, const Endpoint&, + const Endpoint&, Env*) override { + // TODO: range unlock does nothing... + } + + explicit RangeTreeLockManager( + std::shared_ptr mutex_factory); + + ~RangeTreeLockManager() override; + + int SetMaxLockMemory(size_t max_lock_memory) override { + return ltm_.set_max_lock_memory(max_lock_memory); + } + + size_t GetMaxLockMemory() override { return ltm_.get_max_lock_memory(); } + + Counters GetStatus() override; + + bool IsPointLockSupported() const override { + // One could have acquired a point lock (it is reduced to range lock) + return true; + } + + PointLockStatus GetPointLockStatus() override; + + // This is from LockManager + LockManager::RangeLockStatus GetRangeLockStatus() override; + + // This has the same meaning as GetRangeLockStatus but is from + // RangeLockManagerHandle + RangeLockManagerHandle::RangeLockStatus GetRangeLockStatusData() override { + return GetRangeLockStatus(); + } + + bool IsRangeLockSupported() const override { return true; } + + const LockTrackerFactory& GetLockTrackerFactory() const override { + return RangeTreeLockTrackerFactory::Get(); + } + + // Get the locktree which stores locks for the Column Family with given cf_id + std::shared_ptr GetLockTreeForCF(ColumnFamilyId cf_id); + + private: + toku::locktree_manager ltm_; + + std::shared_ptr mutex_factory_; + + // Map from cf_id to locktree*. Can only be accessed while holding the + // ltree_map_mutex_. Must use a custom deleter that calls ltm_.release_lt + using LockTreeMap = + std::unordered_map>; + LockTreeMap ltree_map_; + + InstrumentedMutex ltree_map_mutex_; + + // Per-thread cache of ltree_map_. + // (uses the same approach as TransactionLockMgr::lock_maps_cache_) + std::unique_ptr ltree_lookup_cache_; + + RangeDeadlockInfoBuffer dlock_buffer_; + + std::shared_ptr MakeLockTreePtr(locktree* lt); + static int CompareDbtEndpoints(void* arg, const DBT* a_key, const DBT* b_key); + + // Callbacks + static int on_create(locktree*, void*) { return 0; /* no error */ } + static void on_destroy(locktree*) {} + static void on_escalate(TXNID txnid, const locktree* lt, + const range_buffer& buffer, void* extra); +}; + +void serialize_endpoint(const Endpoint& endp, std::string* buf); +void wait_callback_for_locktree(void* cdata, lock_wait_infos* infos); + +} // namespace ROCKSDB_NAMESPACE +#endif // OS_WIN +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc new file mode 100644 index 000000000..be1e1478b --- /dev/null +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc @@ -0,0 +1,156 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE +#ifndef OS_WIN + +#include "utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h" + +#include "utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h" + +namespace ROCKSDB_NAMESPACE { + +RangeLockList *RangeTreeLockTracker::getOrCreateList() { + if (range_list_) return range_list_.get(); + + // Doesn't exist, create + range_list_.reset(new RangeLockList()); + return range_list_.get(); +} + +void RangeTreeLockTracker::Track(const PointLockRequest &lock_req) { + DBT key_dbt; + std::string key; + serialize_endpoint(Endpoint(lock_req.key, false), &key); + toku_fill_dbt(&key_dbt, key.data(), key.size()); + RangeLockList *rl = getOrCreateList(); + rl->Append(lock_req.column_family_id, &key_dbt, &key_dbt); +} + +void RangeTreeLockTracker::Track(const RangeLockRequest &lock_req) { + DBT start_dbt, end_dbt; + std::string start_key, end_key; + + serialize_endpoint(lock_req.start_endp, &start_key); + serialize_endpoint(lock_req.end_endp, &end_key); + + toku_fill_dbt(&start_dbt, start_key.data(), start_key.size()); + toku_fill_dbt(&end_dbt, end_key.data(), end_key.size()); + + RangeLockList *rl = getOrCreateList(); + rl->Append(lock_req.column_family_id, &start_dbt, &end_dbt); +} + +PointLockStatus RangeTreeLockTracker::GetPointLockStatus( + ColumnFamilyId /*cf_id*/, const std::string & /*key*/) const { + // This function is not expected to be called as RangeTreeLockTracker:: + // IsPointLockSupported() returns false. Return the status which indicates + // the point is not locked. + PointLockStatus p; + p.locked = false; + p.exclusive = true; + p.seq = 0; + return p; +} + +void RangeTreeLockTracker::Clear() { range_list_.reset(); } + +void RangeLockList::Append(ColumnFamilyId cf_id, const DBT *left_key, + const DBT *right_key) { + MutexLock l(&mutex_); + // Only the transaction owner thread calls this function. + // The same thread does the lock release, so we can be certain nobody is + // releasing the locks concurrently. + assert(!releasing_locks_.load()); + auto it = buffers_.find(cf_id); + if (it == buffers_.end()) { + // create a new one + it = buffers_.emplace(cf_id, std::make_shared()).first; + it->second->create(); + } + it->second->append(left_key, right_key); +} + +void RangeLockList::ReleaseLocks(RangeTreeLockManager *mgr, + PessimisticTransaction *txn, + bool all_trx_locks) { + { + MutexLock l(&mutex_); + // The lt->release_locks() call below will walk range_list->buffer_. We + // need to prevent lock escalation callback from replacing + // range_list->buffer_ while we are doing that. + // + // Additional complication here is internal mutex(es) in the locktree + // (let's call them latches): + // - Lock escalation first obtains latches on the lock tree + // - Then, it calls RangeTreeLockManager::on_escalate to replace + // transaction's range_list->buffer_. = Access to that buffer must be + // synchronized, so it will want to acquire the range_list->mutex_. + // + // While in this function we would want to do the reverse: + // - Acquire range_list->mutex_ to prevent access to the range_list. + // - Then, lt->release_locks() call will walk through the range_list + // - and acquire latches on parts of the lock tree to remove locks from + // it. + // + // How do we avoid the deadlock? The idea is that here we set + // releasing_locks_=true, and release the mutex. + // All other users of the range_list must: + // - Acquire the mutex, then check that releasing_locks_=false. + // (the code in this function doesnt do that as there's only one thread + // that releases transaction's locks) + releasing_locks_.store(true); + } + + for (auto it : buffers_) { + // Don't try to call release_locks() if the buffer is empty! if we are + // not holding any locks, the lock tree might be in the STO-mode with + // another transaction, and our attempt to release an empty set of locks + // will cause an assertion failure. + if (it.second->get_num_ranges()) { + auto lt_ptr = mgr->GetLockTreeForCF(it.first); + toku::locktree *lt = lt_ptr.get(); + + lt->release_locks((TXNID)txn, it.second.get(), all_trx_locks); + + it.second->destroy(); + it.second->create(); + + toku::lock_request::retry_all_lock_requests(lt, + wait_callback_for_locktree); + } + } + + Clear(); + releasing_locks_.store(false); +} + +void RangeLockList::ReplaceLocks(const toku::locktree *lt, + const toku::range_buffer &buffer) { + MutexLock l(&mutex_); + if (releasing_locks_.load()) { + // Do nothing. The transaction is releasing its locks, so it will not care + // about having a correct list of ranges. (In TokuDB, + // toku_db_txn_escalate_callback() makes use of this property, too) + return; + } + + ColumnFamilyId cf_id = (ColumnFamilyId)lt->get_dict_id().dictid; + + auto it = buffers_.find(cf_id); + it->second->destroy(); + it->second->create(); + + toku::range_buffer::iterator iter(&buffer); + toku::range_buffer::iterator::record rec; + while (iter.current(&rec)) { + it->second->append(rec.get_left_key(), rec.get_right_key()); + iter.next(); + } +} + +} // namespace ROCKSDB_NAMESPACE +#endif // OS_WIN +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h new file mode 100644 index 000000000..4ef48d252 --- /dev/null +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h @@ -0,0 +1,146 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include +#include + +#include "util/mutexlock.h" +#include "utilities/transactions/lock/lock_tracker.h" +#include "utilities/transactions/pessimistic_transaction.h" + +// Range Locking: +#include "lib/locktree/lock_request.h" +#include "lib/locktree/locktree.h" + +namespace ROCKSDB_NAMESPACE { + +class RangeTreeLockManager; + +// Storage for locks that are currently held by a transaction. +// +// Locks are kept in toku::range_buffer because toku::locktree::release_locks() +// accepts that as an argument. +// +// Note: the list of locks may differ slighly from the contents of the lock +// tree, due to concurrency between lock acquisition, lock release, and lock +// escalation. See MDEV-18227 and RangeTreeLockManager::UnLock for details. +// This property is currently harmless. +// +// Append() and ReleaseLocks() are not thread-safe, as they are expected to be +// called only by the owner transaction. ReplaceLocks() is safe to call from +// other threads. +class RangeLockList { + public: + ~RangeLockList() { Clear(); } + + RangeLockList() : releasing_locks_(false) {} + + void Append(ColumnFamilyId cf_id, const DBT* left_key, const DBT* right_key); + void ReleaseLocks(RangeTreeLockManager* mgr, PessimisticTransaction* txn, + bool all_trx_locks); + void ReplaceLocks(const toku::locktree* lt, const toku::range_buffer& buffer); + + private: + void Clear() { + for (auto it : buffers_) { + it.second->destroy(); + } + buffers_.clear(); + } + + std::unordered_map> + buffers_; + port::Mutex mutex_; + std::atomic releasing_locks_; +}; + +// A LockTracker-based object that is used together with RangeTreeLockManager. +class RangeTreeLockTracker : public LockTracker { + public: + RangeTreeLockTracker() : range_list_(nullptr) {} + + RangeTreeLockTracker(const RangeTreeLockTracker&) = delete; + RangeTreeLockTracker& operator=(const RangeTreeLockTracker&) = delete; + + void Track(const PointLockRequest&) override; + void Track(const RangeLockRequest&) override; + + bool IsPointLockSupported() const override { + // This indicates that we don't implement GetPointLockStatus() + return false; + } + bool IsRangeLockSupported() const override { return true; } + + // a Not-supported dummy implementation. + UntrackStatus Untrack(const RangeLockRequest& /*lock_request*/) override { + return UntrackStatus::NOT_TRACKED; + } + + UntrackStatus Untrack(const PointLockRequest& /*lock_request*/) override { + return UntrackStatus::NOT_TRACKED; + } + + // "If this method is not supported, leave it as a no-op." + void Merge(const LockTracker&) override {} + + // "If this method is not supported, leave it as a no-op." + void Subtract(const LockTracker&) override {} + + void Clear() override; + + // "If this method is not supported, returns nullptr." + virtual LockTracker* GetTrackedLocksSinceSavePoint( + const LockTracker&) const override { + return nullptr; + } + + PointLockStatus GetPointLockStatus(ColumnFamilyId column_family_id, + const std::string& key) const override; + + // The return value is only used for tests + uint64_t GetNumPointLocks() const override { return 0; } + + ColumnFamilyIterator* GetColumnFamilyIterator() const override { + return nullptr; + } + + KeyIterator* GetKeyIterator( + ColumnFamilyId /*column_family_id*/) const override { + return nullptr; + } + + void ReleaseLocks(RangeTreeLockManager* mgr, PessimisticTransaction* txn, + bool all_trx_locks) { + if (range_list_) range_list_->ReleaseLocks(mgr, txn, all_trx_locks); + } + + void ReplaceLocks(const toku::locktree* lt, + const toku::range_buffer& buffer) { + // range_list_ cannot be NULL here + range_list_->ReplaceLocks(lt, buffer); + } + + private: + RangeLockList* getOrCreateList(); + std::unique_ptr range_list_; +}; + +class RangeTreeLockTrackerFactory : public LockTrackerFactory { + public: + static const RangeTreeLockTrackerFactory& Get() { + static const RangeTreeLockTrackerFactory instance; + return instance; + } + + LockTracker* Create() const override { return new RangeTreeLockTracker(); } + + private: + RangeTreeLockTrackerFactory() {} +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 14f507b81..2c5770d8a 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -250,6 +250,8 @@ class TransactionBaseImpl : public Transaction { WriteBatch* GetCommitTimeWriteBatch() override; + LockTracker& GetTrackedLocks() { return *tracked_locks_; } + protected: // Add a key to the list of tracked keys. //