Range Locking: Implementation of range locking (#7506)

Summary:
Range Locking - an implementation based on the locktree library

- Add a RangeTreeLockManager and RangeTreeLockTracker which implement
  range locking using the locktree library.
- Point locks are handled as locks on single-point ranges.
- Add a unit test: range_locking_test

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7506

Reviewed By: akankshamahajan15

Differential Revision: D25320703

Pulled By: cheng-chang

fbshipit-source-id: f86347384b42ba2b0257d67eca0f45f806b69da7
main
Sergei Petrunia 4 years ago committed by Facebook GitHub Bot
parent f4db3e4119
commit daab7603f6
  1. 5
      CMakeLists.txt
  2. 4
      Makefile
  3. 11
      TARGETS
  4. 6
      include/rocksdb/utilities/transaction_db.h
  5. 3
      src.mk
  6. 43
      utilities/transactions/lock/point/point_lock_manager_test.h
  7. 304
      utilities/transactions/lock/range/range_locking_test.cc
  8. 1
      utilities/transactions/lock/range/range_tree/TODO
  9. 42
      utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc
  10. 30
      utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h
  11. 8
      utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc
  12. 479
      utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc
  13. 130
      utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h
  14. 156
      utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc
  15. 146
      utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h
  16. 2
      utilities/transactions/transaction_base.h

@ -814,6 +814,8 @@ set(SOURCES
utilities/transactions/lock/lock_manager.cc
utilities/transactions/lock/point/point_lock_tracker.cc
utilities/transactions/lock/point/point_lock_manager.cc
utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc
utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc
utilities/transactions/optimistic_transaction_db_impl.cc
utilities/transactions/optimistic_transaction.cc
utilities/transactions/pessimistic_transaction.cc
@ -831,7 +833,7 @@ set(SOURCES
utilities/write_batch_with_index/write_batch_with_index_internal.cc
$<TARGET_OBJECTS:build_version>)
list(APPEND SOURCE
list(APPEND SOURCES
utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.cc
utilities/transactions/lock/range/range_tree/lib/locktree/keyrange.cc
utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc
@ -1225,6 +1227,7 @@ if(WITH_TESTS)
utilities/transactions/lock/point/point_lock_manager_test.cc
utilities/transactions/write_prepared_transaction_test.cc
utilities/transactions/write_unprepared_transaction_test.cc
utilities/transactions/lock/range/range_locking_test.cc
utilities/ttl/ttl_test.cc
utilities/write_batch_with_index/write_batch_with_index_test.cc
)

@ -558,6 +558,7 @@ PARALLEL_TEST = \
table_test \
transaction_test \
point_lock_manager_test \
range_locking_test \
write_prepared_transaction_test \
write_unprepared_transaction_test \
@ -1936,6 +1937,9 @@ blob_db_test: $(OBJ_DIR)/utilities/blob_db/blob_db_test.o $(TEST_LIBRARY) $(LIBR
repeatable_thread_test: $(OBJ_DIR)/util/repeatable_thread_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
range_locking_test: utilities/transactions/lock/range/range_locking_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
range_tombstone_fragmenter_test: $(OBJ_DIR)/db/range_tombstone_fragmenter_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

@ -400,6 +400,8 @@ cpp_library(
"utilities/transactions/lock/range/range_tree/lib/standalone_port.cc",
"utilities/transactions/lock/range/range_tree/lib/util/dbt.cc",
"utilities/transactions/lock/range/range_tree/lib/util/memarena.cc",
"utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc",
"utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc",
"utilities/transactions/optimistic_transaction.cc",
"utilities/transactions/optimistic_transaction_db_impl.cc",
"utilities/transactions/pessimistic_transaction.cc",
@ -702,6 +704,8 @@ cpp_library(
"utilities/transactions/lock/range/range_tree/lib/standalone_port.cc",
"utilities/transactions/lock/range/range_tree/lib/util/dbt.cc",
"utilities/transactions/lock/range/range_tree/lib/util/memarena.cc",
"utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc",
"utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc",
"utilities/transactions/optimistic_transaction.cc",
"utilities/transactions/optimistic_transaction_db_impl.cc",
"utilities/transactions/pessimistic_transaction.cc",
@ -1800,6 +1804,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"range_locking_test",
"utilities/transactions/lock/range/range_locking_test.cc",
"parallel",
[],
[],
],
[
"range_tombstone_fragmenter_test",
"db/range_tombstone_fragmenter_test.cc",

@ -86,7 +86,11 @@ struct RangeDeadlockPath {
// RocksDB
class RangeLockManagerHandle : public LockManagerHandle {
public:
// Total amount of lock memory to use (per column family)
// Set total amount of lock memory to use.
//
// @return 0 Ok
// @return EDOM Failed to set because currently using more memory than
// specified
virtual int SetMaxLockMemory(size_t max_lock_memory) = 0;
virtual size_t GetMaxLockMemory() = 0;

@ -267,6 +267,8 @@ LIB_SOURCES = \
utilities/transactions/lock/range/range_tree/lib/standalone_port.cc \
utilities/transactions/lock/range/range_tree/lib/util/dbt.cc \
utilities/transactions/lock/range/range_tree/lib/util/memarena.cc \
utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc \
utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc \
utilities/transactions/optimistic_transaction.cc \
utilities/transactions/optimistic_transaction_db_impl.cc \
utilities/transactions/pessimistic_transaction.cc \
@ -536,6 +538,7 @@ TEST_MAIN_SOURCES = \
utilities/simulator_cache/sim_cache_test.cc \
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \
utilities/transactions/optimistic_transaction_test.cc \
utilities/transactions/lock/range/range_locking_test.cc \
utilities/transactions/transaction_test.cc \
utilities/transactions/lock/point/point_lock_manager_test.cc \
utilities/transactions/write_prepared_transaction_test.cc \

@ -273,4 +273,47 @@ TEST_P(AnyLockManagerTest, Deadlock) {
delete txn1;
}
TEST_P(AnyLockManagerTest, GetWaitingTxns_MultipleTxns) {
MockColumnFamilyHandle cf(1);
locker_->AddColumnFamily(&cf);
auto txn1 = NewTxn();
ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false));
auto txn2 = NewTxn();
ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false));
auto txn3 = NewTxn();
txn3->SetLockTimeout(10000);
port::Thread t1 = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() {
ASSERT_OK(locker_->TryLock(txn3, 1, "k", env_, true));
locker_->UnLock(txn3, 1, "k", env_);
});
// Ok, now txn3 is waiting for lock on "k", which is owned by two
// transactions. Check that GetWaitingTxns reports this correctly
uint32_t wait_cf_id;
std::string wait_key;
auto waiters = txn3->GetWaitingTxns(&wait_cf_id, &wait_key);
ASSERT_EQ(wait_cf_id, 1u);
ASSERT_EQ(wait_key, "k");
ASSERT_EQ(waiters.size(), 2);
bool waits_correct =
(waiters[0] == txn1->GetID() && waiters[1] == txn2->GetID()) ||
(waiters[1] == txn1->GetID() && waiters[0] == txn2->GetID());
ASSERT_EQ(waits_correct, true);
// Release locks so txn3 can proceed with execution
locker_->UnLock(txn1, 1, "k", env_);
locker_->UnLock(txn2, 1, "k", env_);
// Wait until txn3 finishes
t1.join();
delete txn1;
delete txn2;
delete txn3;
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,304 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#ifndef OS_WIN
#include <algorithm>
#include <functional>
#include <string>
#include <thread>
#include "db/db_impl/db_impl.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "utilities/transactions/lock/point/point_lock_manager_test.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_test.h"
using std::string;
namespace ROCKSDB_NAMESPACE {
class RangeLockingTest : public ::testing::Test {
public:
TransactionDB* db;
std::string dbname;
Options options;
std::shared_ptr<RangeLockManagerHandle> range_lock_mgr;
TransactionDBOptions txn_db_options;
RangeLockingTest() : db(nullptr) {
options.create_if_missing = true;
dbname = test::PerThreadDBPath("range_locking_testdb");
DestroyDB(dbname, options);
range_lock_mgr.reset(NewRangeLockManager(nullptr));
txn_db_options.lock_mgr_handle = range_lock_mgr;
auto s = TransactionDB::Open(options, txn_db_options, dbname, &db);
assert(s.ok());
}
~RangeLockingTest() {
delete db;
db = nullptr;
// This is to skip the assert statement in FaultInjectionTestEnv. There
// seems to be a bug in btrfs that the makes readdir return recently
// unlink-ed files. By using the default fs we simply ignore errors resulted
// from attempting to delete such files in DestroyDB.
DestroyDB(dbname, options);
}
PessimisticTransaction* NewTxn(
TransactionOptions txn_opt = TransactionOptions()) {
Transaction* txn = db->BeginTransaction(WriteOptions(), txn_opt);
return reinterpret_cast<PessimisticTransaction*>(txn);
}
};
// TODO: set a smaller lock wait timeout so that the test runs faster.
TEST_F(RangeLockingTest, BasicRangeLocking) {
WriteOptions write_options;
TransactionOptions txn_options;
std::string value;
ReadOptions read_options;
auto cf = db->DefaultColumnFamily();
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
// Get a range lock
ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
// Check that range Lock inhibits an overlapping range lock
{
auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
ASSERT_TRUE(s.IsTimedOut());
}
// Check that range Lock inhibits an overlapping point lock
{
auto s = txn1->GetForUpdate(read_options, cf, Slice("b"), &value);
ASSERT_TRUE(s.IsTimedOut());
}
// Get a point lock, check that it inhibits range locks
ASSERT_OK(txn0->Put(cf, Slice("n"), Slice("value")));
{
auto s = txn1->GetRangeLock(cf, Endpoint("m"), Endpoint("p"));
ASSERT_TRUE(s.IsTimedOut());
}
ASSERT_OK(txn0->Commit());
txn1->Rollback();
delete txn0;
delete txn1;
}
TEST_F(RangeLockingTest, MyRocksLikeUpdate) {
WriteOptions write_options;
TransactionOptions txn_options;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
auto cf = db->DefaultColumnFamily();
Status s;
// Get a range lock for the range we are about to update
ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
bool try_range_lock_called = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"RangeTreeLockManager::TryRangeLock:enter",
[&](void* /*arg*/) { try_range_lock_called = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// For performance reasons, the following must NOT call lock_mgr->TryLock():
// We verify that by checking the value of try_range_lock_called.
ASSERT_OK(txn0->Put(cf, Slice("b"), Slice("value"),
/*assume_tracked=*/true));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_FALSE(try_range_lock_called);
txn0->Rollback();
delete txn0;
}
TEST_F(RangeLockingTest, SnapshotValidation) {
Status s;
Slice key_slice = Slice("k");
ColumnFamilyHandle* cfh = db->DefaultColumnFamily();
auto txn0 = NewTxn();
txn0->Put(key_slice, Slice("initial"));
txn0->Commit();
// txn1
auto txn1 = NewTxn();
txn1->SetSnapshot();
std::string val1;
ASSERT_OK(txn1->Get(ReadOptions(), cfh, key_slice, &val1));
ASSERT_EQ(val1, "initial");
val1 = val1 + std::string("-txn1");
ASSERT_OK(txn1->Put(cfh, key_slice, Slice(val1)));
// txn2
auto txn2 = NewTxn();
txn2->SetSnapshot();
std::string val2;
// This will see the original value as nothing is committed
// This is also Get, so it is doesn't acquire any locks.
ASSERT_OK(txn2->Get(ReadOptions(), cfh, key_slice, &val2));
ASSERT_EQ(val2, "initial");
// txn1
ASSERT_OK(txn1->Commit());
// txn2
val2 = val2 + std::string("-txn2");
// Now, this call should do Snapshot Validation and fail:
s = txn2->Put(cfh, key_slice, Slice(val2));
ASSERT_TRUE(s.IsBusy());
ASSERT_OK(txn2->Commit());
delete txn0;
delete txn1;
delete txn2;
}
TEST_F(RangeLockingTest, MultipleTrxLockStatusData) {
WriteOptions write_options;
TransactionOptions txn_options;
auto cf = db->DefaultColumnFamily();
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
// Get a range lock
ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("z"), Endpoint("z")));
ASSERT_OK(txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("e")));
auto s = range_lock_mgr->GetRangeLockStatusData();
ASSERT_EQ(s.size(), 2);
for (auto it = s.begin(); it != s.end(); ++it) {
ASSERT_EQ(it->first, cf->GetID());
auto val = it->second;
ASSERT_FALSE(val.start.inf_suffix);
ASSERT_FALSE(val.end.inf_suffix);
ASSERT_TRUE(val.exclusive);
ASSERT_EQ(val.ids.size(), 1);
if (val.ids[0] == txn0->GetID()) {
ASSERT_EQ(val.start.slice, "z");
ASSERT_EQ(val.end.slice, "z");
} else if (val.ids[0] == txn1->GetID()) {
ASSERT_EQ(val.start.slice, "b");
ASSERT_EQ(val.end.slice, "e");
} else {
FAIL(); // Unknown transaction ID.
}
}
delete txn0;
delete txn1;
}
#if defined(__has_feature)
#if __has_feature(thread_sanitizer)
#define SKIP_LOCK_ESCALATION_TEST 1
#endif
#endif
#ifndef SKIP_LOCK_ESCALATION_TEST
TEST_F(RangeLockingTest, BasicLockEscalation) {
auto cf = db->DefaultColumnFamily();
auto counters = range_lock_mgr->GetStatus();
// Initially not using any lock memory
ASSERT_EQ(counters.current_lock_memory, 0);
ASSERT_EQ(counters.escalation_count, 0);
ASSERT_EQ(0, range_lock_mgr->SetMaxLockMemory(2000));
// Insert until we see lock escalations
auto txn = NewTxn();
// Get the locks until we hit an escalation
for (int i = 0; i < 2020; i++) {
char buf[32];
snprintf(buf, sizeof(buf) - 1, "%08d", i);
ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf), Endpoint(buf)));
}
counters = range_lock_mgr->GetStatus();
ASSERT_GT(counters.escalation_count, 0);
ASSERT_LE(counters.current_lock_memory, 2000);
delete txn;
}
#endif
void PointLockManagerTestExternalSetup(PointLockManagerTest* self) {
self->env_ = Env::Default();
self->db_dir_ = test::PerThreadDBPath("point_lock_manager_test");
ASSERT_OK(self->env_->CreateDir(self->db_dir_));
Options opt;
opt.create_if_missing = true;
TransactionDBOptions txn_opt;
txn_opt.transaction_lock_timeout = 0;
auto mutex_factory = std::make_shared<TransactionDBMutexFactoryImpl>();
self->locker_.reset(NewRangeLockManager(mutex_factory)->getLockManager());
std::shared_ptr<RangeLockManagerHandle> range_lock_mgr =
std::dynamic_pointer_cast<RangeLockManagerHandle>(self->locker_);
txn_opt.lock_mgr_handle = range_lock_mgr;
ASSERT_OK(TransactionDB::Open(opt, txn_opt, self->db_dir_, &self->db_));
self->wait_sync_point_name_ = "RangeTreeLockManager::TryRangeLock:WaitingTxn";
}
INSTANTIATE_TEST_CASE_P(RangeLockManager, AnyLockManagerTest,
::testing::Values(PointLockManagerTestExternalSetup));
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else // OS_WIN
#include <stdio.h>
int main(int /*argc*/, char** /*argv*/) {
fprintf(stderr, "skipped as Range Locking is not supported on Windows\n");
return 0;
}
#endif // OS_WIN
#else
#include <stdio.h>
int main(int /*argc*/, char** /*argv*/) {
fprintf(stderr,
"skipped as transactions are not supported in rocksdb_lite\n");
return 0;
}
#endif // ROCKSDB_LITE

@ -1 +0,0 @@
Implement LockTracker and LockManager for range lock with range tree.

@ -1,5 +1,5 @@
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
// vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2:
#ifndef ROCKSDB_LITE
#ifndef OS_WIN
#ident "$Id$"
@ -240,7 +240,7 @@ int lock_request::wait(uint64_t wait_time_ms) {
int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms,
int (*killed_callback)(void),
void (*lock_wait_callback)(void *, TXNID, TXNID),
void (*lock_wait_callback)(void *, lock_wait_infos *),
void *callback_arg) {
uint64_t t_now = toku_current_time_microsec();
uint64_t t_start = t_now;
@ -250,13 +250,11 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms,
// check again, this time locking out other retry calls
if (m_state == state::PENDING) {
GrowableArray<TXNID> conflicts_collector;
conflicts_collector.init();
lock_wait_infos conflicts_collector;
retry(&conflicts_collector);
if (m_state == state::PENDING) {
report_waits(&conflicts_collector, lock_wait_callback, callback_arg);
}
conflicts_collector.deinit();
}
while (m_state == state::PENDING) {
@ -325,7 +323,7 @@ TXNID lock_request::get_conflicting_txnid(void) const {
return m_conflicting_txnid;
}
int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) {
int lock_request::retry(lock_wait_infos *conflicts_collector) {
invariant(m_state == state::PENDING);
int r;
txnid_set conflicts;
@ -356,7 +354,7 @@ int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) {
}
void lock_request::retry_all_lock_requests(
locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID),
locktree *lt, void (*lock_wait_callback)(void *, lock_wait_infos *),
void *callback_arg, void (*after_retry_all_test_callback)(void)) {
lt_lock_request_info *info = lt->get_lock_request_info();
@ -370,8 +368,7 @@ void lock_request::retry_all_lock_requests(
toku_mutex_lock(&info->retry_mutex);
GrowableArray<TXNID> conflicts_collector;
conflicts_collector.init();
lock_wait_infos conflicts_collector;
// here is the group retry algorithm.
// get the latest retry_want count and use it as the generation number of
@ -398,11 +395,10 @@ void lock_request::retry_all_lock_requests(
toku_mutex_unlock(&info->retry_mutex);
report_waits(&conflicts_collector, lock_wait_callback, callback_arg);
conflicts_collector.deinit();
}
void lock_request::retry_all_lock_requests_info(
lt_lock_request_info *info, GrowableArray<TXNID> *collector) {
void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info,
lock_wait_infos *collector) {
toku_external_mutex_lock(&info->mutex);
// retry all of the pending lock requests.
for (uint32_t i = 0; i < info->pending_lock_requests.size();) {
@ -425,26 +421,20 @@ void lock_request::retry_all_lock_requests_info(
toku_external_mutex_unlock(&info->mutex);
}
void lock_request::add_conflicts_to_waits(
txnid_set *conflicts, GrowableArray<TXNID> *wait_conflicts) {
void lock_request::add_conflicts_to_waits(txnid_set *conflicts,
lock_wait_infos *wait_conflicts) {
wait_conflicts->push_back({m_lt, get_txnid(), m_extra, {}});
uint32_t num_conflicts = conflicts->size();
for (uint32_t i = 0; i < num_conflicts; i++) {
wait_conflicts->push(m_txnid);
wait_conflicts->push(conflicts->get(i));
wait_conflicts->back().waitees.push_back(conflicts->get(i));
}
}
void lock_request::report_waits(GrowableArray<TXNID> *wait_conflicts,
void (*lock_wait_callback)(void *, TXNID,
TXNID),
void lock_request::report_waits(lock_wait_infos *wait_conflicts,
void (*lock_wait_callback)(void *,
lock_wait_infos *),
void *callback_arg) {
if (!lock_wait_callback) return;
size_t num_conflicts = wait_conflicts->get_size();
for (size_t i = 0; i < num_conflicts; i += 2) {
TXNID blocked_txnid = wait_conflicts->fetch_unchecked(i);
TXNID blocking_txnid = wait_conflicts->fetch_unchecked(i + 1);
(*lock_wait_callback)(callback_arg, blocked_txnid, blocking_txnid);
}
if (lock_wait_callback) (*lock_wait_callback)(callback_arg, wait_conflicts);
}
void *lock_request::get_extra(void) const { return m_extra; }

@ -1,5 +1,5 @@
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
// vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2:
#ident "$Id$"
/*======
This file is part of PerconaFT.
@ -62,6 +62,18 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
namespace toku {
// Information about a lock wait
struct lock_wait_info {
locktree *ltree; // the tree where wait happens
TXNID waiter; // the waiting transaction
void *m_extra; // lock_request's m_extra
// The transactions that are waited for.
std::vector<TXNID> waitees;
};
typedef std::vector<lock_wait_info> lock_wait_infos;
// A lock request contains the db, the key range, the lock type, and
// the transaction id that describes a potential row range lock.
//
@ -101,7 +113,7 @@ class lock_request {
int wait(uint64_t wait_time_ms);
int wait(uint64_t wait_time_ms, uint64_t killed_time_ms,
int (*killed_callback)(void),
void (*lock_wait_callback)(void *, TXNID, TXNID) = nullptr,
void (*lock_wait_callback)(void *, lock_wait_infos *) = nullptr,
void *callback_arg = nullptr);
// return: left end-point of the lock range
@ -124,11 +136,12 @@ class lock_request {
// up.
// The rest remain pending.
static void retry_all_lock_requests(
locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID) = nullptr,
locktree *lt,
void (*lock_wait_callback)(void *, lock_wait_infos *) = nullptr,
void *callback_arg = nullptr,
void (*after_retry_test_callback)(void) = nullptr);
static void retry_all_lock_requests_info(lt_lock_request_info *info,
GrowableArray<TXNID> *collector);
lock_wait_infos *collector);
void set_start_test_callback(void (*f)(void));
void set_start_before_pending_test_callback(void (*f)(void));
@ -181,7 +194,7 @@ class lock_request {
// effect: tries again to acquire the lock described by this lock request
// returns: 0 if retrying the request succeeded and is now complete
int retry(GrowableArray<TXNID> *conflict_collector);
int retry(lock_wait_infos *collector);
void complete(int complete_r);
@ -216,11 +229,12 @@ class lock_request {
static int find_by_txnid(lock_request *const &request, const TXNID &txnid);
// Report list of conflicts to lock wait callback.
static void report_waits(GrowableArray<TXNID> *wait_conflicts,
void (*lock_wait_callback)(void *, TXNID, TXNID),
static void report_waits(lock_wait_infos *wait_conflicts,
void (*lock_wait_callback)(void *,
lock_wait_infos *),
void *callback_arg);
void add_conflicts_to_waits(txnid_set *conflicts,
GrowableArray<TXNID> *wait_conflicts);
lock_wait_infos *wait_conflicts);
void (*m_start_test_callback)(void);
void (*m_start_before_pending_test_callback)(void);

@ -1,5 +1,5 @@
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
// vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2:
#ifndef ROCKSDB_LITE
#ifndef OS_WIN
#ident "$Id$"
@ -186,8 +186,14 @@ static bool determine_conflicting_txnids(
const TXNID other_txnid = lock.txnid;
if (other_txnid != txnid) {
if (conflicts) {
if (other_txnid == TXNID_SHARED) {
for (TXNID shared_id : *lock.owners) {
conflicts->add(shared_id);
}
} else {
conflicts->add(other_txnid);
}
}
conflicts_exist = true;
}
}

@ -0,0 +1,479 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#ifndef OS_WIN
#include "utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h"
#include <algorithm>
#include <cinttypes>
#include <mutex>
#include "monitoring/perf_context_imp.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/transaction_db_mutex.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/hash.h"
#include "util/thread_local.h"
#include "utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
namespace ROCKSDB_NAMESPACE {
RangeLockManagerHandle* NewRangeLockManager(
std::shared_ptr<TransactionDBMutexFactory> mutex_factory) {
std::shared_ptr<TransactionDBMutexFactory> use_factory;
if (mutex_factory) {
use_factory = mutex_factory;
} else {
use_factory.reset(new TransactionDBMutexFactoryImpl());
}
return new RangeTreeLockManager(use_factory);
}
static const char SUFFIX_INFIMUM = 0x0;
static const char SUFFIX_SUPREMUM = 0x1;
// Convert Endpoint into an internal format used for storing it in locktree
// (DBT structure is used for passing endpoints to locktree and getting back)
void serialize_endpoint(const Endpoint& endp, std::string* buf) {
buf->push_back(endp.inf_suffix ? SUFFIX_SUPREMUM : SUFFIX_INFIMUM);
buf->append(endp.slice.data(), endp.slice.size());
}
// Decode the endpoint from the format it is stored in the locktree (DBT) to
// one used outside (EndpointWithString)
void deserialize_endpoint(const DBT* dbt, EndpointWithString* endp) {
assert(dbt->size >= 1);
const char* dbt_data = (const char*)dbt->data;
char suffix = dbt_data[0];
assert(suffix == SUFFIX_INFIMUM || suffix == SUFFIX_SUPREMUM);
endp->inf_suffix = (suffix == SUFFIX_SUPREMUM);
endp->slice.assign(dbt_data + 1, dbt->size - 1);
}
// Get a range lock on [start_key; end_key] range
Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn,
uint32_t column_family_id,
const Endpoint& start_endp,
const Endpoint& end_endp, Env*,
bool exclusive) {
toku::lock_request request;
request.create(mutex_factory_);
DBT start_key_dbt, end_key_dbt;
TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:enter");
std::string start_key;
std::string end_key;
serialize_endpoint(start_endp, &start_key);
serialize_endpoint(end_endp, &end_key);
toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size());
toku_fill_dbt(&end_key_dbt, end_key.data(), end_key.size());
auto lt = GetLockTreeForCF(column_family_id);
// Put the key waited on into request's m_extra. See
// wait_callback_for_locktree for details.
std::string wait_key(start_endp.slice.data(), start_endp.slice.size());
request.set(lt.get(), (TXNID)txn, &start_key_dbt, &end_key_dbt,
exclusive ? toku::lock_request::WRITE : toku::lock_request::READ,
false /* not a big txn */, &wait_key);
// This is for "periodically wake up and check if the wait is killed" feature
// which we are not using.
uint64_t killed_time_msec = 0;
uint64_t wait_time_msec = txn->GetLockTimeout();
if (wait_time_msec == static_cast<uint64_t>(-1)) {
// The transaction has no wait timeout. lock_request::wait doesn't support
// this, it needs a number of milliseconds to wait. Pass it one year to
// be safe.
wait_time_msec = uint64_t(1000) * 60 * 60 * 24 * 365;
} else {
// convert microseconds to milliseconds
wait_time_msec = (wait_time_msec + 500) / 1000;
}
std::vector<RangeDeadlockInfo> di_path;
request.m_deadlock_cb = [&](TXNID txnid, bool is_exclusive,
const DBT* start_dbt, const DBT* end_dbt) {
EndpointWithString start;
EndpointWithString end;
deserialize_endpoint(start_dbt, &start);
deserialize_endpoint(end_dbt, &end);
di_path.push_back({((PessimisticTransaction*)txnid)->GetID(),
column_family_id, is_exclusive, std::move(start),
std::move(end)});
};
request.start();
const int r = request.wait(wait_time_msec, killed_time_msec,
nullptr, // killed_callback
wait_callback_for_locktree, nullptr);
// Inform the txn that we are no longer waiting:
txn->ClearWaitingTxn();
request.destroy();
switch (r) {
case 0:
break; // fall through
case DB_LOCK_NOTGRANTED:
return Status::TimedOut(Status::SubCode::kLockTimeout);
case TOKUDB_OUT_OF_LOCKS:
return Status::Busy(Status::SubCode::kLockLimit);
case DB_LOCK_DEADLOCK: {
std::reverse(di_path.begin(), di_path.end());
dlock_buffer_.AddNewPath(
RangeDeadlockPath(di_path, request.get_start_time()));
return Status::Busy(Status::SubCode::kDeadlock);
}
default:
assert(0);
return Status::Busy(Status::SubCode::kLockLimit);
}
return Status::OK();
}
// Wait callback that locktree library will call to inform us about
// the lock waits that are in progress.
void wait_callback_for_locktree(void*, lock_wait_infos* infos) {
for (auto wait_info : *infos) {
auto txn = (PessimisticTransaction*)wait_info.waiter;
auto cf_id = (ColumnFamilyId)wait_info.ltree->get_dict_id().dictid;
autovector<TransactionID> waitee_ids;
for (auto waitee : wait_info.waitees) {
waitee_ids.push_back(((PessimisticTransaction*)waitee)->GetID());
}
txn->SetWaitingTxn(waitee_ids, cf_id, (std::string*)wait_info.m_extra);
}
// Here we can assume that the locktree code will now wait for some lock
TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:WaitingTxn");
}
void RangeTreeLockManager::UnLock(PessimisticTransaction* txn,
ColumnFamilyId column_family_id,
const std::string& key, Env*) {
auto locktree = GetLockTreeForCF(column_family_id);
std::string endp_image;
serialize_endpoint({key.data(), key.size(), false}, &endp_image);
DBT key_dbt;
toku_fill_dbt(&key_dbt, endp_image.data(), endp_image.size());
toku::range_buffer range_buf;
range_buf.create();
range_buf.append(&key_dbt, &key_dbt);
locktree->release_locks((TXNID)txn, &range_buf);
range_buf.destroy();
toku::lock_request::retry_all_lock_requests(
locktree.get(), wait_callback_for_locktree, nullptr);
}
void RangeTreeLockManager::UnLock(PessimisticTransaction* txn,
const LockTracker& tracker, Env*) {
const RangeTreeLockTracker* range_tracker =
static_cast<const RangeTreeLockTracker*>(&tracker);
RangeTreeLockTracker* range_trx_tracker =
static_cast<RangeTreeLockTracker*>(&txn->GetTrackedLocks());
bool all_keys = (range_trx_tracker == range_tracker);
// tracked_locks_->range_list may hold nullptr if the transaction has never
// acquired any locks.
((RangeTreeLockTracker*)range_tracker)->ReleaseLocks(this, txn, all_keys);
}
int RangeTreeLockManager::CompareDbtEndpoints(void* arg, const DBT* a_key,
const DBT* b_key) {
const char* a = (const char*)a_key->data;
const char* b = (const char*)b_key->data;
size_t a_len = a_key->size;
size_t b_len = b_key->size;
size_t min_len = std::min(a_len, b_len);
// Compare the values. The first byte encodes the endpoint type, its value
// is either SUFFIX_INFIMUM or SUFFIX_SUPREMUM.
Comparator* cmp = (Comparator*)arg;
int res = cmp->Compare(Slice(a + 1, min_len - 1), Slice(b + 1, min_len - 1));
if (!res) {
if (b_len > min_len) {
// a is shorter;
if (a[0] == SUFFIX_INFIMUM) {
return -1; //"a is smaller"
} else {
// a is considered padded with 0xFF:FF:FF:FF...
return 1; // "a" is bigger
}
} else if (a_len > min_len) {
// the opposite of the above: b is shorter.
if (b[0] == SUFFIX_INFIMUM) {
return 1; //"b is smaller"
} else {
// b is considered padded with 0xFF:FF:FF:FF...
return -1; // "b" is bigger
}
} else {
// the lengths are equal (and the key values, too)
if (a[0] < b[0]) {
return -1;
} else if (a[0] > b[0]) {
return 1;
} else {
return 0;
}
}
} else {
return res;
}
}
namespace {
void UnrefLockTreeMapsCache(void* ptr) {
// Called when a thread exits or a ThreadLocalPtr gets destroyed.
auto lock_tree_map_cache = static_cast<
std::unordered_map<ColumnFamilyId, std::shared_ptr<locktree>>*>(ptr);
delete lock_tree_map_cache;
}
} // anonymous namespace
RangeTreeLockManager::RangeTreeLockManager(
std::shared_ptr<TransactionDBMutexFactory> mutex_factory)
: mutex_factory_(mutex_factory),
ltree_lookup_cache_(new ThreadLocalPtr(&UnrefLockTreeMapsCache)),
dlock_buffer_(10) {
ltm_.create(on_create, on_destroy, on_escalate, nullptr, mutex_factory_);
}
void RangeTreeLockManager::SetRangeDeadlockInfoBufferSize(
uint32_t target_size) {
dlock_buffer_.Resize(target_size);
}
void RangeTreeLockManager::Resize(uint32_t target_size) {
SetRangeDeadlockInfoBufferSize(target_size);
}
std::vector<RangeDeadlockPath>
RangeTreeLockManager::GetRangeDeadlockInfoBuffer() {
return dlock_buffer_.PrepareBuffer();
}
std::vector<DeadlockPath> RangeTreeLockManager::GetDeadlockInfoBuffer() {
std::vector<DeadlockPath> res;
std::vector<RangeDeadlockPath> data = GetRangeDeadlockInfoBuffer();
// report left endpoints
for (auto it = data.begin(); it != data.end(); ++it) {
std::vector<DeadlockInfo> path;
for (auto it2 = it->path.begin(); it2 != it->path.end(); ++it2) {
path.push_back(
{it2->m_txn_id, it2->m_cf_id, it2->m_exclusive, it2->m_start.slice});
}
res.push_back(DeadlockPath(path, it->deadlock_time));
}
return res;
}
// @brief Lock Escalation Callback function
//
// @param txnid Transaction whose locks got escalated
// @param lt Lock Tree where escalation is happening
// @param buffer Escalation result: list of locks that this transaction now
// owns in this lock tree.
// @param void* Callback context
void RangeTreeLockManager::on_escalate(TXNID txnid, const locktree* lt,
const range_buffer& buffer, void*) {
auto txn = (PessimisticTransaction*)txnid;
((RangeTreeLockTracker*)&txn->GetTrackedLocks())->ReplaceLocks(lt, buffer);
}
RangeTreeLockManager::~RangeTreeLockManager() {
autovector<void*> local_caches;
ltree_lookup_cache_->Scrape(&local_caches, nullptr);
for (auto cache : local_caches) {
delete static_cast<LockTreeMap*>(cache);
}
ltree_map_.clear(); // this will call release_lt() for all locktrees
ltm_.destroy();
}
RangeLockManagerHandle::Counters RangeTreeLockManager::GetStatus() {
LTM_STATUS_S ltm_status_test;
ltm_.get_status(&ltm_status_test);
Counters res;
// Searching status variable by its string name is how Toku's unit tests
// do it (why didn't they make LTM_ESCALATION_COUNT constant visible?)
// lookup keyname in status
for (int i = 0; i < LTM_STATUS_S::LTM_STATUS_NUM_ROWS; i++) {
TOKU_ENGINE_STATUS_ROW status = &ltm_status_test.status[i];
if (strcmp(status->keyname, "LTM_ESCALATION_COUNT") == 0) {
res.escalation_count = status->value.num;
continue;
}
if (strcmp(status->keyname, "LTM_SIZE_CURRENT") == 0) {
res.current_lock_memory = status->value.num;
}
}
return res;
}
std::shared_ptr<locktree> RangeTreeLockManager::MakeLockTreePtr(locktree* lt) {
locktree_manager* ltm = &ltm_;
return std::shared_ptr<locktree>(lt,
[ltm](locktree* p) { ltm->release_lt(p); });
}
void RangeTreeLockManager::AddColumnFamily(const ColumnFamilyHandle* cfh) {
uint32_t column_family_id = cfh->GetID();
InstrumentedMutexLock l(&ltree_map_mutex_);
if (ltree_map_.find(column_family_id) == ltree_map_.end()) {
DICTIONARY_ID dict_id = {.dictid = column_family_id};
toku::comparator cmp;
cmp.create(CompareDbtEndpoints, (void*)cfh->GetComparator());
toku::locktree* ltree = ltm_.get_lt(dict_id, cmp,
/* on_create_extra*/ nullptr);
// This is ok to because get_lt has copied the comparator:
cmp.destroy();
ltree_map_.insert({column_family_id, MakeLockTreePtr(ltree)});
}
}
void RangeTreeLockManager::RemoveColumnFamily(const ColumnFamilyHandle* cfh) {
uint32_t column_family_id = cfh->GetID();
// Remove lock_map for this column family. Since the lock map is stored
// as a shared ptr, concurrent transactions can still keep using it
// until they release their references to it.
// TODO what if one drops a column family while transaction(s) still have
// locks in it?
// locktree uses column family'c Comparator* as the criteria to do tree
// ordering. If the comparator is gone, we won't even be able to remove the
// elements from the locktree.
// A possible solution might be to remove everything right now:
// - wait until everyone traversing the locktree are gone
// - remove everything from the locktree.
// - some transactions may have acquired locks in their LockTracker objects.
// Arrange something so we don't blow up when they try to release them.
// - ...
// This use case (drop column family while somebody is using it) doesn't seem
// the priority, though.
{
InstrumentedMutexLock l(&ltree_map_mutex_);
auto lock_maps_iter = ltree_map_.find(column_family_id);
assert(lock_maps_iter != ltree_map_.end());
ltree_map_.erase(lock_maps_iter);
} // lock_map_mutex_
autovector<void*> local_caches;
ltree_lookup_cache_->Scrape(&local_caches, nullptr);
for (auto cache : local_caches) {
delete static_cast<LockTreeMap*>(cache);
}
}
std::shared_ptr<locktree> RangeTreeLockManager::GetLockTreeForCF(
ColumnFamilyId column_family_id) {
// First check thread-local cache
if (ltree_lookup_cache_->Get() == nullptr) {
ltree_lookup_cache_->Reset(new LockTreeMap());
}
auto ltree_map_cache = static_cast<LockTreeMap*>(ltree_lookup_cache_->Get());
auto it = ltree_map_cache->find(column_family_id);
if (it != ltree_map_cache->end()) {
// Found lock map for this column family.
return it->second;
}
// Not found in local cache, grab mutex and check shared LockMaps
InstrumentedMutexLock l(&ltree_map_mutex_);
it = ltree_map_.find(column_family_id);
if (it == ltree_map_.end()) {
return nullptr;
} else {
// Found lock map. Store in thread-local cache and return.
ltree_map_cache->insert({column_family_id, it->second});
return it->second;
}
}
struct LOCK_PRINT_CONTEXT {
RangeLockManagerHandle::RangeLockStatus* data; // Save locks here
uint32_t cfh_id; // Column Family whose tree we are traversing
};
// Report left endpoints of the acquired locks
LockManager::PointLockStatus RangeTreeLockManager::GetPointLockStatus() {
PointLockStatus res;
LockManager::RangeLockStatus data = GetRangeLockStatus();
// report left endpoints
for (auto it = data.begin(); it != data.end(); ++it) {
auto& val = it->second;
res.insert({it->first, {val.start.slice, val.ids, val.exclusive}});
}
return res;
}
static void push_into_lock_status_data(void* param, const DBT* left,
const DBT* right, TXNID txnid_arg,
bool is_shared, TxnidVector* owners) {
struct LOCK_PRINT_CONTEXT* ctx = (LOCK_PRINT_CONTEXT*)param;
struct RangeLockInfo info;
info.exclusive = !is_shared;
deserialize_endpoint(left, &info.start);
deserialize_endpoint(right, &info.end);
if (txnid_arg != TXNID_SHARED) {
TXNID txnid = ((PessimisticTransaction*)txnid_arg)->GetID();
info.ids.push_back(txnid);
} else {
for (auto it : *owners) {
TXNID real_id = ((PessimisticTransaction*)it)->GetID();
info.ids.push_back(real_id);
}
}
ctx->data->insert({ctx->cfh_id, info});
}
LockManager::RangeLockStatus RangeTreeLockManager::GetRangeLockStatus() {
LockManager::RangeLockStatus data;
{
InstrumentedMutexLock l(&ltree_map_mutex_);
for (auto it : ltree_map_) {
LOCK_PRINT_CONTEXT ctx = {&data, it.first};
it.second->dump_locks((void*)&ctx, push_into_lock_status_data);
}
}
return data;
}
} // namespace ROCKSDB_NAMESPACE
#endif // OS_WIN
#endif // ROCKSDB_LITE

@ -0,0 +1,130 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#ifndef OS_WIN
// For DeadlockInfoBuffer:
#include "util/thread_local.h"
#include "utilities/transactions/lock/point/point_lock_manager.h"
#include "utilities/transactions/lock/range/range_lock_manager.h"
// Lock Tree library:
#include "utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h"
#include "utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h"
#include "utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h"
namespace ROCKSDB_NAMESPACE {
using namespace toku;
typedef DeadlockInfoBufferTempl<RangeDeadlockPath> RangeDeadlockInfoBuffer;
// A Range Lock Manager that uses PerconaFT's locktree library
class RangeTreeLockManager : public RangeLockManagerBase,
public RangeLockManagerHandle {
public:
LockManager* getLockManager() override { return this; }
void AddColumnFamily(const ColumnFamilyHandle* cfh) override;
void RemoveColumnFamily(const ColumnFamilyHandle* cfh) override;
void Resize(uint32_t) override;
std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
std::vector<RangeDeadlockPath> GetRangeDeadlockInfoBuffer() override;
void SetRangeDeadlockInfoBufferSize(uint32_t target_size) override;
// Get a lock on a range
// @note only exclusive locks are currently supported (requesting a
// non-exclusive lock will get an exclusive one)
using LockManager::TryLock;
Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
const Endpoint& start_endp, const Endpoint& end_endp, Env* env,
bool exclusive) override;
void UnLock(PessimisticTransaction* txn, const LockTracker& tracker,
Env* env) override;
void UnLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
const std::string& key, Env* env) override;
void UnLock(PessimisticTransaction*, ColumnFamilyId, const Endpoint&,
const Endpoint&, Env*) override {
// TODO: range unlock does nothing...
}
explicit RangeTreeLockManager(
std::shared_ptr<TransactionDBMutexFactory> mutex_factory);
~RangeTreeLockManager() override;
int SetMaxLockMemory(size_t max_lock_memory) override {
return ltm_.set_max_lock_memory(max_lock_memory);
}
size_t GetMaxLockMemory() override { return ltm_.get_max_lock_memory(); }
Counters GetStatus() override;
bool IsPointLockSupported() const override {
// One could have acquired a point lock (it is reduced to range lock)
return true;
}
PointLockStatus GetPointLockStatus() override;
// This is from LockManager
LockManager::RangeLockStatus GetRangeLockStatus() override;
// This has the same meaning as GetRangeLockStatus but is from
// RangeLockManagerHandle
RangeLockManagerHandle::RangeLockStatus GetRangeLockStatusData() override {
return GetRangeLockStatus();
}
bool IsRangeLockSupported() const override { return true; }
const LockTrackerFactory& GetLockTrackerFactory() const override {
return RangeTreeLockTrackerFactory::Get();
}
// Get the locktree which stores locks for the Column Family with given cf_id
std::shared_ptr<locktree> GetLockTreeForCF(ColumnFamilyId cf_id);
private:
toku::locktree_manager ltm_;
std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
// Map from cf_id to locktree*. Can only be accessed while holding the
// ltree_map_mutex_. Must use a custom deleter that calls ltm_.release_lt
using LockTreeMap =
std::unordered_map<ColumnFamilyId, std::shared_ptr<locktree>>;
LockTreeMap ltree_map_;
InstrumentedMutex ltree_map_mutex_;
// Per-thread cache of ltree_map_.
// (uses the same approach as TransactionLockMgr::lock_maps_cache_)
std::unique_ptr<ThreadLocalPtr> ltree_lookup_cache_;
RangeDeadlockInfoBuffer dlock_buffer_;
std::shared_ptr<locktree> MakeLockTreePtr(locktree* lt);
static int CompareDbtEndpoints(void* arg, const DBT* a_key, const DBT* b_key);
// Callbacks
static int on_create(locktree*, void*) { return 0; /* no error */ }
static void on_destroy(locktree*) {}
static void on_escalate(TXNID txnid, const locktree* lt,
const range_buffer& buffer, void* extra);
};
void serialize_endpoint(const Endpoint& endp, std::string* buf);
void wait_callback_for_locktree(void* cdata, lock_wait_infos* infos);
} // namespace ROCKSDB_NAMESPACE
#endif // OS_WIN
#endif // ROCKSDB_LITE

@ -0,0 +1,156 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#ifndef OS_WIN
#include "utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h"
#include "utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h"
namespace ROCKSDB_NAMESPACE {
RangeLockList *RangeTreeLockTracker::getOrCreateList() {
if (range_list_) return range_list_.get();
// Doesn't exist, create
range_list_.reset(new RangeLockList());
return range_list_.get();
}
void RangeTreeLockTracker::Track(const PointLockRequest &lock_req) {
DBT key_dbt;
std::string key;
serialize_endpoint(Endpoint(lock_req.key, false), &key);
toku_fill_dbt(&key_dbt, key.data(), key.size());
RangeLockList *rl = getOrCreateList();
rl->Append(lock_req.column_family_id, &key_dbt, &key_dbt);
}
void RangeTreeLockTracker::Track(const RangeLockRequest &lock_req) {
DBT start_dbt, end_dbt;
std::string start_key, end_key;
serialize_endpoint(lock_req.start_endp, &start_key);
serialize_endpoint(lock_req.end_endp, &end_key);
toku_fill_dbt(&start_dbt, start_key.data(), start_key.size());
toku_fill_dbt(&end_dbt, end_key.data(), end_key.size());
RangeLockList *rl = getOrCreateList();
rl->Append(lock_req.column_family_id, &start_dbt, &end_dbt);
}
PointLockStatus RangeTreeLockTracker::GetPointLockStatus(
ColumnFamilyId /*cf_id*/, const std::string & /*key*/) const {
// This function is not expected to be called as RangeTreeLockTracker::
// IsPointLockSupported() returns false. Return the status which indicates
// the point is not locked.
PointLockStatus p;
p.locked = false;
p.exclusive = true;
p.seq = 0;
return p;
}
void RangeTreeLockTracker::Clear() { range_list_.reset(); }
void RangeLockList::Append(ColumnFamilyId cf_id, const DBT *left_key,
const DBT *right_key) {
MutexLock l(&mutex_);
// Only the transaction owner thread calls this function.
// The same thread does the lock release, so we can be certain nobody is
// releasing the locks concurrently.
assert(!releasing_locks_.load());
auto it = buffers_.find(cf_id);
if (it == buffers_.end()) {
// create a new one
it = buffers_.emplace(cf_id, std::make_shared<toku::range_buffer>()).first;
it->second->create();
}
it->second->append(left_key, right_key);
}
void RangeLockList::ReleaseLocks(RangeTreeLockManager *mgr,
PessimisticTransaction *txn,
bool all_trx_locks) {
{
MutexLock l(&mutex_);
// The lt->release_locks() call below will walk range_list->buffer_. We
// need to prevent lock escalation callback from replacing
// range_list->buffer_ while we are doing that.
//
// Additional complication here is internal mutex(es) in the locktree
// (let's call them latches):
// - Lock escalation first obtains latches on the lock tree
// - Then, it calls RangeTreeLockManager::on_escalate to replace
// transaction's range_list->buffer_. = Access to that buffer must be
// synchronized, so it will want to acquire the range_list->mutex_.
//
// While in this function we would want to do the reverse:
// - Acquire range_list->mutex_ to prevent access to the range_list.
// - Then, lt->release_locks() call will walk through the range_list
// - and acquire latches on parts of the lock tree to remove locks from
// it.
//
// How do we avoid the deadlock? The idea is that here we set
// releasing_locks_=true, and release the mutex.
// All other users of the range_list must:
// - Acquire the mutex, then check that releasing_locks_=false.
// (the code in this function doesnt do that as there's only one thread
// that releases transaction's locks)
releasing_locks_.store(true);
}
for (auto it : buffers_) {
// Don't try to call release_locks() if the buffer is empty! if we are
// not holding any locks, the lock tree might be in the STO-mode with
// another transaction, and our attempt to release an empty set of locks
// will cause an assertion failure.
if (it.second->get_num_ranges()) {
auto lt_ptr = mgr->GetLockTreeForCF(it.first);
toku::locktree *lt = lt_ptr.get();
lt->release_locks((TXNID)txn, it.second.get(), all_trx_locks);
it.second->destroy();
it.second->create();
toku::lock_request::retry_all_lock_requests(lt,
wait_callback_for_locktree);
}
}
Clear();
releasing_locks_.store(false);
}
void RangeLockList::ReplaceLocks(const toku::locktree *lt,
const toku::range_buffer &buffer) {
MutexLock l(&mutex_);
if (releasing_locks_.load()) {
// Do nothing. The transaction is releasing its locks, so it will not care
// about having a correct list of ranges. (In TokuDB,
// toku_db_txn_escalate_callback() makes use of this property, too)
return;
}
ColumnFamilyId cf_id = (ColumnFamilyId)lt->get_dict_id().dictid;
auto it = buffers_.find(cf_id);
it->second->destroy();
it->second->create();
toku::range_buffer::iterator iter(&buffer);
toku::range_buffer::iterator::record rec;
while (iter.current(&rec)) {
it->second->append(rec.get_left_key(), rec.get_right_key());
iter.next();
}
}
} // namespace ROCKSDB_NAMESPACE
#endif // OS_WIN
#endif // ROCKSDB_LITE

@ -0,0 +1,146 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include "util/mutexlock.h"
#include "utilities/transactions/lock/lock_tracker.h"
#include "utilities/transactions/pessimistic_transaction.h"
// Range Locking:
#include "lib/locktree/lock_request.h"
#include "lib/locktree/locktree.h"
namespace ROCKSDB_NAMESPACE {
class RangeTreeLockManager;
// Storage for locks that are currently held by a transaction.
//
// Locks are kept in toku::range_buffer because toku::locktree::release_locks()
// accepts that as an argument.
//
// Note: the list of locks may differ slighly from the contents of the lock
// tree, due to concurrency between lock acquisition, lock release, and lock
// escalation. See MDEV-18227 and RangeTreeLockManager::UnLock for details.
// This property is currently harmless.
//
// Append() and ReleaseLocks() are not thread-safe, as they are expected to be
// called only by the owner transaction. ReplaceLocks() is safe to call from
// other threads.
class RangeLockList {
public:
~RangeLockList() { Clear(); }
RangeLockList() : releasing_locks_(false) {}
void Append(ColumnFamilyId cf_id, const DBT* left_key, const DBT* right_key);
void ReleaseLocks(RangeTreeLockManager* mgr, PessimisticTransaction* txn,
bool all_trx_locks);
void ReplaceLocks(const toku::locktree* lt, const toku::range_buffer& buffer);
private:
void Clear() {
for (auto it : buffers_) {
it.second->destroy();
}
buffers_.clear();
}
std::unordered_map<ColumnFamilyId, std::shared_ptr<toku::range_buffer>>
buffers_;
port::Mutex mutex_;
std::atomic<bool> releasing_locks_;
};
// A LockTracker-based object that is used together with RangeTreeLockManager.
class RangeTreeLockTracker : public LockTracker {
public:
RangeTreeLockTracker() : range_list_(nullptr) {}
RangeTreeLockTracker(const RangeTreeLockTracker&) = delete;
RangeTreeLockTracker& operator=(const RangeTreeLockTracker&) = delete;
void Track(const PointLockRequest&) override;
void Track(const RangeLockRequest&) override;
bool IsPointLockSupported() const override {
// This indicates that we don't implement GetPointLockStatus()
return false;
}
bool IsRangeLockSupported() const override { return true; }
// a Not-supported dummy implementation.
UntrackStatus Untrack(const RangeLockRequest& /*lock_request*/) override {
return UntrackStatus::NOT_TRACKED;
}
UntrackStatus Untrack(const PointLockRequest& /*lock_request*/) override {
return UntrackStatus::NOT_TRACKED;
}
// "If this method is not supported, leave it as a no-op."
void Merge(const LockTracker&) override {}
// "If this method is not supported, leave it as a no-op."
void Subtract(const LockTracker&) override {}
void Clear() override;
// "If this method is not supported, returns nullptr."
virtual LockTracker* GetTrackedLocksSinceSavePoint(
const LockTracker&) const override {
return nullptr;
}
PointLockStatus GetPointLockStatus(ColumnFamilyId column_family_id,
const std::string& key) const override;
// The return value is only used for tests
uint64_t GetNumPointLocks() const override { return 0; }
ColumnFamilyIterator* GetColumnFamilyIterator() const override {
return nullptr;
}
KeyIterator* GetKeyIterator(
ColumnFamilyId /*column_family_id*/) const override {
return nullptr;
}
void ReleaseLocks(RangeTreeLockManager* mgr, PessimisticTransaction* txn,
bool all_trx_locks) {
if (range_list_) range_list_->ReleaseLocks(mgr, txn, all_trx_locks);
}
void ReplaceLocks(const toku::locktree* lt,
const toku::range_buffer& buffer) {
// range_list_ cannot be NULL here
range_list_->ReplaceLocks(lt, buffer);
}
private:
RangeLockList* getOrCreateList();
std::unique_ptr<RangeLockList> range_list_;
};
class RangeTreeLockTrackerFactory : public LockTrackerFactory {
public:
static const RangeTreeLockTrackerFactory& Get() {
static const RangeTreeLockTrackerFactory instance;
return instance;
}
LockTracker* Create() const override { return new RangeTreeLockTracker(); }
private:
RangeTreeLockTrackerFactory() {}
};
} // namespace ROCKSDB_NAMESPACE

@ -250,6 +250,8 @@ class TransactionBaseImpl : public Transaction {
WriteBatch* GetCommitTimeWriteBatch() override;
LockTracker& GetTrackedLocks() { return *tracked_locks_; }
protected:
// Add a key to the list of tracked keys.
//

Loading…
Cancel
Save