You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rocksdb/utilities/transactions/pessimistic_transaction_db.cc

696 lines
24 KiB

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/transactions/pessimistic_transaction_db.h"
#include <string>
#include <unordered_set>
#include <vector>
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
namespace rocksdb {
PessimisticTransactionDB::PessimisticTransactionDB(
DB* db, const TransactionDBOptions& txn_db_options)
: TransactionDB(db),
db_impl_(static_cast_with_check<DBImpl, DB>(db)),
txn_db_options_(txn_db_options),
lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
txn_db_options_.max_num_deadlocks,
txn_db_options_.custom_mutex_factory
? txn_db_options_.custom_mutex_factory
: std::shared_ptr<TransactionDBMutexFactory>(
new TransactionDBMutexFactoryImpl())) {
assert(db_impl_ != nullptr);
}
// Support initiliazing PessimisticTransactionDB from a stackable db
//
// PessimisticTransactionDB
// ^ ^
// | |
// | +
// | StackableDB
// | ^
// | |
// + +
// DBImpl
// ^
// |(inherit)
// +
// DB
//
PessimisticTransactionDB::PessimisticTransactionDB(
StackableDB* db, const TransactionDBOptions& txn_db_options)
: TransactionDB(db),
db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())),
txn_db_options_(txn_db_options),
lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
txn_db_options_.max_num_deadlocks,
txn_db_options_.custom_mutex_factory
? txn_db_options_.custom_mutex_factory
: std::shared_ptr<TransactionDBMutexFactory>(
new TransactionDBMutexFactoryImpl())) {
assert(db_impl_ != nullptr);
}
PessimisticTransactionDB::~PessimisticTransactionDB() {
while (!transactions_.empty()) {
delete transactions_.begin()->second;
}
}
Status PessimisticTransactionDB::Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles) {
for (auto cf_ptr : handles) {
AddColumnFamily(cf_ptr);
}
// Re-enable compaction for the column families that initially had
// compaction enabled.
std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
for (auto index : compaction_enabled_cf_indices) {
compaction_enabled_cf_handles.push_back(handles[index]);
}
Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
// create 'real' transactions from recovered shell transactions
auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
assert(dbimpl != nullptr);
auto rtrxs = dbimpl->recovered_transactions();
for (auto it = rtrxs.begin(); it != rtrxs.end(); it++) {
auto recovered_trx = it->second;
assert(recovered_trx);
assert(recovered_trx->log_number_);
assert(recovered_trx->name_.length());
WriteOptions w_options;
w_options.sync = true;
TransactionOptions t_options;
Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
assert(real_trx);
real_trx->SetLogNumber(recovered_trx->log_number_);
s = real_trx->SetName(recovered_trx->name_);
if (!s.ok()) {
break;
}
s = real_trx->RebuildFromWriteBatch(recovered_trx->batch_);
real_trx->SetState(Transaction::PREPARED);
if (!s.ok()) {
break;
}
}
if (s.ok()) {
dbimpl->DeleteAllRecoveredTransactions();
}
return s;
}
Transaction* WriteCommittedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {
if (old_txn != nullptr) {
ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn;
} else {
return new WriteCommittedTxn(this, write_options, txn_options);
}
}
Transaction* WritePreparedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {
if (old_txn != nullptr) {
ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn;
} else {
return new WritePreparedTxn(this, write_options, txn_options);
}
}
TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
const TransactionDBOptions& txn_db_options) {
TransactionDBOptions validated = txn_db_options;
if (txn_db_options.num_stripes == 0) {
validated.num_stripes = 1;
}
return validated;
}
Status TransactionDB::Open(const Options& options,
const TransactionDBOptions& txn_db_options,
const std::string& dbname, TransactionDB** dbptr) {
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
std::vector<ColumnFamilyHandle*> handles;
Status s = TransactionDB::Open(db_options, txn_db_options, dbname,
column_families, &handles, dbptr);
if (s.ok()) {
assert(handles.size() == 1);
// i can delete the handle since DBImpl is always holding a reference to
// default column family
delete handles[0];
}
return s;
}
Status TransactionDB::Open(
const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
Status s;
DB* db;
std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
std::vector<size_t> compaction_enabled_cf_indices;
DBOptions db_options_2pc = db_options;
PrepareWrap(&db_options_2pc, &column_families_copy,
&compaction_enabled_cf_indices);
s = DB::Open(db_options_2pc, dbname, column_families_copy, handles, &db);
if (s.ok()) {
s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
dbptr);
}
return s;
}
void TransactionDB::PrepareWrap(
DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,
std::vector<size_t>* compaction_enabled_cf_indices) {
compaction_enabled_cf_indices->clear();
// Enable MemTable History if not already enabled
for (size_t i = 0; i < column_families->size(); i++) {
ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
if (cf_options->max_write_buffer_number_to_maintain == 0) {
// Setting to -1 will set the History size to max_write_buffer_number.
cf_options->max_write_buffer_number_to_maintain = -1;
}
if (!cf_options->disable_auto_compactions) {
// Disable compactions momentarily to prevent race with DB::Open
cf_options->disable_auto_compactions = true;
compaction_enabled_cf_indices->push_back(i);
}
}
db_options->allow_2pc = true;
}
Status TransactionDB::WrapDB(
// make sure this db is already opened with memtable history enabled,
// auto compaction distabled and 2 phase commit enabled
DB* db, const TransactionDBOptions& txn_db_options,
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
PessimisticTransactionDB* txn_db;
switch (txn_db_options.write_policy) {
case WRITE_UNPREPARED:
return Status::NotSupported("WRITE_UNPREPARED is not implemented yet");
case WRITE_PREPARED:
txn_db = new WritePreparedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
break;
case WRITE_COMMITTED:
default:
txn_db = new WriteCommittedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
}
*dbptr = txn_db;
Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
return s;
}
Status TransactionDB::WrapStackableDB(
// make sure this stackable_db is already opened with memtable history
// enabled,
// auto compaction distabled and 2 phase commit enabled
StackableDB* db, const TransactionDBOptions& txn_db_options,
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
PessimisticTransactionDB* txn_db;
switch (txn_db_options.write_policy) {
case WRITE_UNPREPARED:
return Status::NotSupported("WRITE_UNPREPARED is not implemented yet");
case WRITE_PREPARED:
txn_db = new WritePreparedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
break;
case WRITE_COMMITTED:
default:
txn_db = new WriteCommittedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
}
*dbptr = txn_db;
Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
return s;
}
// Let TransactionLockMgr know that this column family exists so it can
// allocate a LockMap for it.
void PessimisticTransactionDB::AddColumnFamily(
const ColumnFamilyHandle* handle) {
lock_mgr_.AddColumnFamily(handle->GetID());
}
Status PessimisticTransactionDB::CreateColumnFamily(
const ColumnFamilyOptions& options, const std::string& column_family_name,
ColumnFamilyHandle** handle) {
InstrumentedMutexLock l(&column_family_mutex_);
Status s = db_->CreateColumnFamily(options, column_family_name, handle);
if (s.ok()) {
lock_mgr_.AddColumnFamily((*handle)->GetID());
}
return s;
}
// Let TransactionLockMgr know that it can deallocate the LockMap for this
// column family.
Status PessimisticTransactionDB::DropColumnFamily(
ColumnFamilyHandle* column_family) {
InstrumentedMutexLock l(&column_family_mutex_);
Status s = db_->DropColumnFamily(column_family);
if (s.ok()) {
lock_mgr_.RemoveColumnFamily(column_family->GetID());
}
return s;
}
Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
uint32_t cfh_id,
const std::string& key,
bool exclusive) {
return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
}
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
const TransactionKeyMap* keys) {
lock_mgr_.UnLock(txn, keys, GetEnv());
}
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
uint32_t cfh_id, const std::string& key) {
lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
}
// Used when wrapping DB write operations in a transaction
Transaction* PessimisticTransactionDB::BeginInternalTransaction(
const WriteOptions& options) {
TransactionOptions txn_options;
Transaction* txn = BeginTransaction(options, txn_options, nullptr);
// Use default timeout for non-transactional writes
txn->SetLockTimeout(txn_db_options_.default_lock_timeout);
return txn;
}
// All user Put, Merge, Delete, and Write requests must be intercepted to make
// sure that they lock all keys that they are writing to avoid causing conflicts
// with any concurrent transactions. The easiest way to do this is to wrap all
// write operations in a transaction.
//
// Put(), Merge(), and Delete() only lock a single key per call. Write() will
// sort its keys before locking them. This guarantees that TransactionDB write
// methods cannot deadlock with eachother (but still could deadlock with a
// Transaction).
Status PessimisticTransactionDB::Put(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
Status s;
Transaction* txn = BeginInternalTransaction(options);
txn->DisableIndexing();
// Since the client didn't create a transaction, they don't care about
// conflict checking for this write. So we just need to do PutUntracked().
s = txn->PutUntracked(column_family, key, val);
if (s.ok()) {
s = txn->Commit();
}
delete txn;
return s;
}
Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
ColumnFamilyHandle* column_family,
const Slice& key) {
Status s;
Transaction* txn = BeginInternalTransaction(wopts);
txn->DisableIndexing();
// Since the client didn't create a transaction, they don't care about
// conflict checking for this write. So we just need to do
// DeleteUntracked().
s = txn->DeleteUntracked(column_family, key);
if (s.ok()) {
s = txn->Commit();
}
delete txn;
return s;
}
Status PessimisticTransactionDB::Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s;
Transaction* txn = BeginInternalTransaction(options);
txn->DisableIndexing();
// Since the client didn't create a transaction, they don't care about
// conflict checking for this write. So we just need to do
// MergeUntracked().
s = txn->MergeUntracked(column_family, key, value);
if (s.ok()) {
s = txn->Commit();
}
delete txn;
return s;
}
Status PessimisticTransactionDB::Write(const WriteOptions& opts,
WriteBatch* updates) {
// Need to lock all keys in this batch to prevent write conflicts with
// concurrent transactions.
Transaction* txn = BeginInternalTransaction(opts);
txn->DisableIndexing();
auto txn_impl =
static_cast_with_check<PessimisticTransaction, Transaction>(txn);
// Since commitBatch sorts the keys before locking, concurrent Write()
// operations will not cause a deadlock.
// In order to avoid a deadlock with a concurrent Transaction, Transactions
// should use a lock timeout.
Status s = txn_impl->CommitBatch(updates);
delete txn;
return s;
}
void PessimisticTransactionDB::InsertExpirableTransaction(
TransactionID tx_id, PessimisticTransaction* tx) {
assert(tx->GetExpirationTime() > 0);
std::lock_guard<std::mutex> lock(map_mutex_);
expirable_transactions_map_.insert({tx_id, tx});
}
void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
std::lock_guard<std::mutex> lock(map_mutex_);
expirable_transactions_map_.erase(tx_id);
}
bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
TransactionID tx_id) {
std::lock_guard<std::mutex> lock(map_mutex_);
auto tx_it = expirable_transactions_map_.find(tx_id);
if (tx_it == expirable_transactions_map_.end()) {
return true;
}
PessimisticTransaction& tx = *(tx_it->second);
return tx.TryStealingLocks();
}
void PessimisticTransactionDB::ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options) {
auto txn_impl =
static_cast_with_check<PessimisticTransaction, Transaction>(txn);
txn_impl->Reinitialize(this, write_options, txn_options);
}
Transaction* PessimisticTransactionDB::GetTransactionByName(
const TransactionName& name) {
std::lock_guard<std::mutex> lock(name_map_mutex_);
auto it = transactions_.find(name);
if (it == transactions_.end()) {
return nullptr;
} else {
return it->second;
}
}
void PessimisticTransactionDB::GetAllPreparedTransactions(
std::vector<Transaction*>* transv) {
assert(transv);
transv->clear();
std::lock_guard<std::mutex> lock(name_map_mutex_);
for (auto it = transactions_.begin(); it != transactions_.end(); it++) {
if (it->second->GetState() == Transaction::PREPARED) {
transv->push_back(it->second);
}
}
}
TransactionLockMgr::LockStatusData
PessimisticTransactionDB::GetLockStatusData() {
return lock_mgr_.GetLockStatusData();
}
std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
return lock_mgr_.GetDeadlockInfoBuffer();
}
void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
lock_mgr_.Resize(target_size);
}
void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
assert(txn);
assert(txn->GetName().length() > 0);
assert(GetTransactionByName(txn->GetName()) == nullptr);
assert(txn->GetState() == Transaction::STARTED);
std::lock_guard<std::mutex> lock(name_map_mutex_);
transactions_[txn->GetName()] = txn;
}
void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
assert(txn);
std::lock_guard<std::mutex> lock(name_map_mutex_);
auto it = transactions_.find(txn->GetName());
assert(it != transactions_.end());
transactions_.erase(it);
}
// Returns true if commit_seq <= snapshot_seq
bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
uint64_t snapshot_seq) {
// Here we try to infer the return value without looking into prepare list.
// This would help avoiding synchronization over a shared map.
// TODO(myabandeh): read your own writes
// TODO(myabandeh): optimize this. This sequence of checks must be correct but
// not necessary efficient
if (snapshot_seq < prep_seq) {
// snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
return false;
}
if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
// We should not normally reach here
ReadLock rl(&prepared_mutex_);
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
// Then it is not committed yet
return false;
}
}
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
CommitEntry cached;
bool exist = GetCommitEntry(indexed_seq, &cached);
if (!exist) {
// It is not committed, so it must be still prepared
return false;
}
if (prep_seq == cached.prep_seq) {
// It is committed and also not evicted from commit cache
return cached.commit_seq <= snapshot_seq;
}
// At this point we dont know if it was committed or it is still prepared
auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
if (max_evicted_seq < prep_seq) {
// Not evicted from cache and also not present, so must be still prepared
return false;
}
// When advancing max_evicted_seq_, we move older entires from prepared to
// delayed_prepared_. Also we move evicted entries from commit cache to
// old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
// max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
// old_commit_map_, iii) committed with no conflict with any snapshot (i)
// delayed_prepared_ is checked above
if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case
// only (iii) is the case: committed
// commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
// snapshot_seq
return true;
}
// else (ii) might be the case: check the commit data saved for this snapshot.
// If there was no overlapping commit entry, then it is committed with a
// commit_seq lower than any live snapshot, including snapshot_seq.
if (old_commit_map_empty_.load(std::memory_order_acquire)) {
return true;
}
{
// We should not normally reach here
ReadLock rl(&old_commit_map_mutex_);
auto old_commit_entry = old_commit_map_.find(prep_seq);
if (old_commit_entry == old_commit_map_.end() ||
old_commit_entry->second <= snapshot_seq) {
return true;
}
}
// (ii) it the case: it is committed but after the snapshot_seq
return false;
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq) { prepared_txns_.push(seq); }
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
uint64_t commit_seq) {
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
CommitEntry evicted;
bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted);
if (to_be_evicted) {
auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
if (prev_max < evicted.commit_seq) {
auto max_evicted_seq = evicted.commit_seq;
// When max_evicted_seq_ advances, move older entries from prepared_txns_
// to delayed_prepared_. This guarantees that if a seq is lower than max,
// then it is not in prepared_txns_ ans save an expensive, synchronized
// lookup from a shared set. delayed_prepared_ is expected to be empty in
// normal cases.
{
WriteLock wl(&prepared_mutex_);
while (!prepared_txns_.empty() &&
prepared_txns_.top() <= max_evicted_seq) {
auto to_be_popped = prepared_txns_.top();
delayed_prepared_.insert(to_be_popped);
prepared_txns_.pop();
delayed_prepared_empty_.store(false, std::memory_order_release);
}
}
{
WriteLock wl(&snapshots_mutex_);
InstrumentedMutex(db_impl_->mutex());
snapshots_ = db_impl_->snapshots().GetAll();
}
while (prev_max < max_evicted_seq &&
!max_evicted_seq_.compare_exchange_weak(
prev_max, max_evicted_seq, std::memory_order_release,
std::memory_order_acquire)) {
};
}
// After each eviction from commit cache, check if the commit entry should
// be kept around because it overlaps with a live snapshot.
{
ReadLock rl(&snapshots_mutex_);
for (auto snapshot : snapshots_) {
auto snapshot_seq =
reinterpret_cast<const SnapshotImpl*>(snapshot)->number_;
if (evicted.commit_seq <= snapshot_seq) {
break;
}
// then snapshot_seq < evicted.commit_seq
if (evicted.prep_seq <= snapshot_seq) { // overlapping range
WriteLock wl(&old_commit_map_mutex_);
old_commit_map_empty_.store(false, std::memory_order_release);
old_commit_map_[evicted.prep_seq] = evicted.commit_seq;
}
}
}
}
bool succ =
ExchangeCommitEntry(indexed_seq, evicted, {prepare_seq, commit_seq});
if (!succ) {
// A very rare event, in which the commit entry is updated before we do.
// Here we apply a very simple solution of retrying.
// TODO(myabandeh): do precautions to detect bugs that cause infinite loops
AddCommitted(prepare_seq, commit_seq);
return;
}
{
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prepare_seq);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prepare_seq);
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
}
}
}
}
bool WritePreparedTxnDB::GetCommitEntry(uint64_t indexed_seq,
CommitEntry* entry) {
// TODO(myabandeh): implement lock-free commit_cache_
ReadLock rl(&commit_cache_mutex_);
*entry = commit_cache_[indexed_seq];
return (entry->commit_seq != 0); // initialized
}
bool WritePreparedTxnDB::AddCommitEntry(uint64_t indexed_seq,
CommitEntry& new_entry,
CommitEntry* evicted_entry) {
// TODO(myabandeh): implement lock-free commit_cache_
WriteLock wl(&commit_cache_mutex_);
*evicted_entry = commit_cache_[indexed_seq];
commit_cache_[indexed_seq] = new_entry;
return (evicted_entry->commit_seq != 0); // initialized
}
bool WritePreparedTxnDB::ExchangeCommitEntry(uint64_t indexed_seq,
CommitEntry& expected_entry,
CommitEntry new_entry) {
// TODO(myabandeh): implement lock-free commit_cache_
WriteLock wl(&commit_cache_mutex_);
auto& evicted_entry = commit_cache_[indexed_seq];
if (evicted_entry.prep_seq != expected_entry.prep_seq) {
return false;
}
commit_cache_[indexed_seq] = new_entry;
return true;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE