Refactor TransactionDBImpl

Summary:
This opens space for the new implementations of TransactionDBImpl such as WritePreparedTxnDBImpl that has a different policy of how to write to DB.
Closes https://github.com/facebook/rocksdb/pull/2689

Differential Revision: D5568918

Pulled By: maysamyabandeh

fbshipit-source-id: f7eac866e175daf3793ae79da108f65cc7dc7b25
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 20dc5e74f2
commit c9804e007a
  1. 2
      CMakeLists.txt
  2. 3
      TARGETS
  3. 12
      include/rocksdb/utilities/transaction_db.h
  4. 2
      src.mk
  5. 128
      utilities/transactions/pessimistic_transaction_db.cc
  6. 57
      utilities/transactions/pessimistic_transaction_db.h
  7. 4
      utilities/transactions/transaction_impl.cc
  8. 4
      utilities/transactions/transaction_impl.h
  9. 4
      utilities/transactions/transaction_lock_mgr.cc
  10. 4
      utilities/transactions/transaction_lock_mgr.h
  11. 2
      utilities/transactions/write_prepared_transaction_impl.cc

@ -520,7 +520,7 @@ set(SOURCES
utilities/transactions/optimistic_transaction_db_impl.cc
utilities/transactions/optimistic_transaction_impl.cc
utilities/transactions/transaction_base.cc
utilities/transactions/transaction_db_impl.cc
utilities/transactions/pessimistic_transaction_db.cc
utilities/transactions/transaction_db_mutex_impl.cc
utilities/transactions/transaction_impl.cc
utilities/transactions/transaction_lock_mgr.cc

@ -247,11 +247,12 @@ cpp_library(
"utilities/transactions/optimistic_transaction_db_impl.cc",
"utilities/transactions/optimistic_transaction_impl.cc",
"utilities/transactions/transaction_base.cc",
"utilities/transactions/transaction_db_impl.cc",
"utilities/transactions/pessimistic_transaction_db.cc",
"utilities/transactions/transaction_db_mutex_impl.cc",
"utilities/transactions/transaction_impl.cc",
"utilities/transactions/transaction_lock_mgr.cc",
"utilities/transactions/transaction_util.cc",
"utilities/transactions/write_prepared_transaction_impl.cc",
"utilities/ttl/db_ttl_impl.cc",
"utilities/write_batch_with_index/write_batch_with_index.cc",
"utilities/write_batch_with_index/write_batch_with_index_internal.cc",

@ -23,6 +23,12 @@ namespace rocksdb {
class TransactionDBMutexFactory;
enum TxnDBWritePolicy {
WRITE_COMMITTED = 0, // write only the committed data
WRITE_PREPARED, // write data after the prepare phase of 2pc
WRITE_UNPREPARED // write data before the prepare phase of 2pc
};
struct TransactionDBOptions {
// Specifies the maximum number of keys that can be locked at the same time
// per column family.
@ -66,6 +72,12 @@ struct TransactionDBOptions {
// condition variable for all transaction locking instead of the default
// mutex/condvar implementation.
std::shared_ptr<TransactionDBMutexFactory> custom_mutex_factory;
// The policy for when to write the data into the DB. The default policy is to
// write only the committed data (WRITE_COMMITTED). The data could be written
// before the commit phase. The DB then needs to provide the mechanisms to
// tell apart committed from uncommitted data.
TxnDBWritePolicy write_policy;
};
struct TransactionOptions {

@ -195,7 +195,7 @@ LIB_SOURCES = \
utilities/transactions/optimistic_transaction_db_impl.cc \
utilities/transactions/optimistic_transaction_impl.cc \
utilities/transactions/transaction_base.cc \
utilities/transactions/transaction_db_impl.cc \
utilities/transactions/pessimistic_transaction_db.cc \
utilities/transactions/transaction_db_mutex_impl.cc \
utilities/transactions/transaction_impl.cc \
utilities/transactions/transaction_lock_mgr.cc \

@ -5,7 +5,7 @@
#ifndef ROCKSDB_LITE
#include "utilities/transactions/transaction_db_impl.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include <string>
#include <unordered_set>
@ -21,8 +21,8 @@
namespace rocksdb {
TransactionDBImpl::TransactionDBImpl(DB* db,
const TransactionDBOptions& txn_db_options)
PessimisticTransactionDB::PessimisticTransactionDB(
DB* db, const TransactionDBOptions& txn_db_options)
: TransactionDB(db),
db_impl_(static_cast_with_check<DBImpl, DB>(db)),
txn_db_options_(txn_db_options),
@ -34,9 +34,9 @@ TransactionDBImpl::TransactionDBImpl(DB* db,
assert(db_impl_ != nullptr);
}
// Support initiliazing TransactionDBImpl from a stackable db
// Support initiliazing PessimisticTransactionDB from a stackable db
//
// TransactionDBImpl
// PessimisticTransactionDB
// ^ ^
// | |
// | +
@ -50,8 +50,8 @@ TransactionDBImpl::TransactionDBImpl(DB* db,
// +
// DB
//
TransactionDBImpl::TransactionDBImpl(StackableDB* db,
const TransactionDBOptions& txn_db_options)
PessimisticTransactionDB::PessimisticTransactionDB(
StackableDB* db, const TransactionDBOptions& txn_db_options)
: TransactionDB(db),
db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())),
txn_db_options_(txn_db_options),
@ -63,13 +63,13 @@ TransactionDBImpl::TransactionDBImpl(StackableDB* db,
assert(db_impl_ != nullptr);
}
TransactionDBImpl::~TransactionDBImpl() {
PessimisticTransactionDB::~PessimisticTransactionDB() {
while (!transactions_.empty()) {
delete transactions_.begin()->second;
}
}
Status TransactionDBImpl::Initialize(
Status PessimisticTransactionDB::Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles) {
for (auto cf_ptr : handles) {
@ -121,7 +121,7 @@ Status TransactionDBImpl::Initialize(
return s;
}
Transaction* TransactionDBImpl::BeginTransaction(
Transaction* WriteCommittedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {
if (old_txn != nullptr) {
@ -132,7 +132,18 @@ Transaction* TransactionDBImpl::BeginTransaction(
}
}
TransactionDBOptions TransactionDBImpl::ValidateTxnDBOptions(
Transaction* WritePreparedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {
if (old_txn != nullptr) {
ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn;
} else {
return new WritePreparedTxnImpl(this, write_options, txn_options);
}
}
TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
const TransactionDBOptions& txn_db_options) {
TransactionDBOptions validated = txn_db_options;
@ -213,8 +224,19 @@ Status TransactionDB::WrapDB(
DB* db, const TransactionDBOptions& txn_db_options,
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
TransactionDBImpl* txn_db = new TransactionDBImpl(
db, TransactionDBImpl::ValidateTxnDBOptions(txn_db_options));
PessimisticTransactionDB* txn_db;
switch (txn_db_options.write_policy) {
case WRITE_UNPREPARED:
return Status::NotSupported("WRITE_UNPREPARED is not implemented yet");
case WRITE_PREPARED:
txn_db = new WritePreparedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
break;
case WRITE_COMMITTED:
default:
txn_db = new WriteCommittedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
}
*dbptr = txn_db;
Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
return s;
@ -227,8 +249,19 @@ Status TransactionDB::WrapStackableDB(
StackableDB* db, const TransactionDBOptions& txn_db_options,
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
TransactionDBImpl* txn_db = new TransactionDBImpl(
db, TransactionDBImpl::ValidateTxnDBOptions(txn_db_options));
PessimisticTransactionDB* txn_db;
switch (txn_db_options.write_policy) {
case WRITE_UNPREPARED:
return Status::NotSupported("WRITE_UNPREPARED is not implemented yet");
case WRITE_PREPARED:
txn_db = new WritePreparedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
break;
case WRITE_COMMITTED:
default:
txn_db = new WriteCommittedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
}
*dbptr = txn_db;
Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
return s;
@ -236,11 +269,12 @@ Status TransactionDB::WrapStackableDB(
// Let TransactionLockMgr know that this column family exists so it can
// allocate a LockMap for it.
void TransactionDBImpl::AddColumnFamily(const ColumnFamilyHandle* handle) {
void PessimisticTransactionDB::AddColumnFamily(
const ColumnFamilyHandle* handle) {
lock_mgr_.AddColumnFamily(handle->GetID());
}
Status TransactionDBImpl::CreateColumnFamily(
Status PessimisticTransactionDB::CreateColumnFamily(
const ColumnFamilyOptions& options, const std::string& column_family_name,
ColumnFamilyHandle** handle) {
InstrumentedMutexLock l(&column_family_mutex_);
@ -255,7 +289,8 @@ Status TransactionDBImpl::CreateColumnFamily(
// Let TransactionLockMgr know that it can deallocate the LockMap for this
// column family.
Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
Status PessimisticTransactionDB::DropColumnFamily(
ColumnFamilyHandle* column_family) {
InstrumentedMutexLock l(&column_family_mutex_);
Status s = db_->DropColumnFamily(column_family);
@ -266,23 +301,24 @@ Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
return s;
}
Status TransactionDBImpl::TryLock(PessimisticTxn* txn, uint32_t cfh_id,
const std::string& key, bool exclusive) {
Status PessimisticTransactionDB::TryLock(PessimisticTxn* txn, uint32_t cfh_id,
const std::string& key,
bool exclusive) {
return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
}
void TransactionDBImpl::UnLock(PessimisticTxn* txn,
const TransactionKeyMap* keys) {
void PessimisticTransactionDB::UnLock(PessimisticTxn* txn,
const TransactionKeyMap* keys) {
lock_mgr_.UnLock(txn, keys, GetEnv());
}
void TransactionDBImpl::UnLock(PessimisticTxn* txn, uint32_t cfh_id,
const std::string& key) {
void PessimisticTransactionDB::UnLock(PessimisticTxn* txn, uint32_t cfh_id,
const std::string& key) {
lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
}
// Used when wrapping DB write operations in a transaction
Transaction* TransactionDBImpl::BeginInternalTransaction(
Transaction* PessimisticTransactionDB::BeginInternalTransaction(
const WriteOptions& options) {
TransactionOptions txn_options;
Transaction* txn = BeginTransaction(options, txn_options, nullptr);
@ -301,9 +337,9 @@ Transaction* TransactionDBImpl::BeginInternalTransaction(
// sort its keys before locking them. This guarantees that TransactionDB write
// methods cannot deadlock with eachother (but still could deadlock with a
// Transaction).
Status TransactionDBImpl::Put(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
Status PessimisticTransactionDB::Put(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
Status s;
Transaction* txn = BeginInternalTransaction(options);
@ -322,9 +358,9 @@ Status TransactionDBImpl::Put(const WriteOptions& options,
return s;
}
Status TransactionDBImpl::Delete(const WriteOptions& wopts,
ColumnFamilyHandle* column_family,
const Slice& key) {
Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
ColumnFamilyHandle* column_family,
const Slice& key) {
Status s;
Transaction* txn = BeginInternalTransaction(wopts);
@ -344,9 +380,9 @@ Status TransactionDBImpl::Delete(const WriteOptions& wopts,
return s;
}
Status TransactionDBImpl::Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status PessimisticTransactionDB::Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s;
Transaction* txn = BeginInternalTransaction(options);
@ -366,7 +402,8 @@ Status TransactionDBImpl::Merge(const WriteOptions& options,
return s;
}
Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
Status PessimisticTransactionDB::Write(const WriteOptions& opts,
WriteBatch* updates) {
// Need to lock all keys in this batch to prevent write conflicts with
// concurrent transactions.
Transaction* txn = BeginInternalTransaction(opts);
@ -385,19 +422,19 @@ Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
return s;
}
void TransactionDBImpl::InsertExpirableTransaction(TransactionID tx_id,
PessimisticTxn* tx) {
void PessimisticTransactionDB::InsertExpirableTransaction(TransactionID tx_id,
PessimisticTxn* tx) {
assert(tx->GetExpirationTime() > 0);
std::lock_guard<std::mutex> lock(map_mutex_);
expirable_transactions_map_.insert({tx_id, tx});
}
void TransactionDBImpl::RemoveExpirableTransaction(TransactionID tx_id) {
void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
std::lock_guard<std::mutex> lock(map_mutex_);
expirable_transactions_map_.erase(tx_id);
}
bool TransactionDBImpl::TryStealingExpiredTransactionLocks(
bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
TransactionID tx_id) {
std::lock_guard<std::mutex> lock(map_mutex_);
@ -409,7 +446,7 @@ bool TransactionDBImpl::TryStealingExpiredTransactionLocks(
return tx.TryStealingLocks();
}
void TransactionDBImpl::ReinitializeTransaction(
void PessimisticTransactionDB::ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options) {
auto txn_impl = static_cast_with_check<PessimisticTxn, Transaction>(txn);
@ -417,7 +454,7 @@ void TransactionDBImpl::ReinitializeTransaction(
txn_impl->Reinitialize(this, write_options, txn_options);
}
Transaction* TransactionDBImpl::GetTransactionByName(
Transaction* PessimisticTransactionDB::GetTransactionByName(
const TransactionName& name) {
std::lock_guard<std::mutex> lock(name_map_mutex_);
auto it = transactions_.find(name);
@ -428,7 +465,7 @@ Transaction* TransactionDBImpl::GetTransactionByName(
}
}
void TransactionDBImpl::GetAllPreparedTransactions(
void PessimisticTransactionDB::GetAllPreparedTransactions(
std::vector<Transaction*>* transv) {
assert(transv);
transv->clear();
@ -440,11 +477,12 @@ void TransactionDBImpl::GetAllPreparedTransactions(
}
}
TransactionLockMgr::LockStatusData TransactionDBImpl::GetLockStatusData() {
TransactionLockMgr::LockStatusData
PessimisticTransactionDB::GetLockStatusData() {
return lock_mgr_.GetLockStatusData();
}
void TransactionDBImpl::RegisterTransaction(Transaction* txn) {
void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
assert(txn);
assert(txn->GetName().length() > 0);
assert(GetTransactionByName(txn->GetName()) == nullptr);
@ -453,7 +491,7 @@ void TransactionDBImpl::RegisterTransaction(Transaction* txn) {
transactions_[txn->GetName()] = txn;
}
void TransactionDBImpl::UnregisterTransaction(Transaction* txn) {
void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
assert(txn);
std::lock_guard<std::mutex> lock(name_map_mutex_);
auto it = transactions_.find(txn->GetName());

@ -17,25 +17,26 @@
#include "rocksdb/utilities/transaction_db.h"
#include "utilities/transactions/transaction_impl.h"
#include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/write_prepared_transaction_impl.h"
namespace rocksdb {
class TransactionDBImpl : public TransactionDB {
class PessimisticTransactionDB : public TransactionDB {
public:
explicit TransactionDBImpl(DB* db,
const TransactionDBOptions& txn_db_options);
explicit PessimisticTransactionDB(DB* db,
const TransactionDBOptions& txn_db_options);
explicit TransactionDBImpl(StackableDB* db,
const TransactionDBOptions& txn_db_options);
explicit PessimisticTransactionDB(StackableDB* db,
const TransactionDBOptions& txn_db_options);
~TransactionDBImpl();
virtual ~PessimisticTransactionDB();
Status Initialize(const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles);
Transaction* BeginTransaction(const WriteOptions& write_options,
const TransactionOptions& txn_options,
Transaction* old_txn) override;
Transaction* old_txn) override = 0;
using StackableDB::Put;
virtual Status Put(const WriteOptions& options,
@ -97,11 +98,12 @@ class TransactionDBImpl : public TransactionDB {
TransactionLockMgr::LockStatusData GetLockStatusData() override;
private:
protected:
void ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options = TransactionOptions());
private:
DBImpl* db_impl_;
const TransactionDBOptions txn_db_options_;
TransactionLockMgr lock_mgr_;
@ -122,5 +124,44 @@ class TransactionDBImpl : public TransactionDB {
std::unordered_map<TransactionName, Transaction*> transactions_;
};
// A PessimisticTransactionDB that writes the data to the DB after the commit.
// In this way the DB only contains the committed data.
class WriteCommittedTxnDB : public PessimisticTransactionDB {
public:
explicit WriteCommittedTxnDB(DB* db,
const TransactionDBOptions& txn_db_options)
: PessimisticTransactionDB(db, txn_db_options) {}
explicit WriteCommittedTxnDB(StackableDB* db,
const TransactionDBOptions& txn_db_options)
: PessimisticTransactionDB(db, txn_db_options) {}
virtual ~WriteCommittedTxnDB() {}
Transaction* BeginTransaction(const WriteOptions& write_options,
const TransactionOptions& txn_options,
Transaction* old_txn) override;
};
// A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
// In this way some data in the DB might not be committed. The DB provides
// mechanisms to tell such data apart from committed data.
class WritePreparedTxnDB : public PessimisticTransactionDB {
public:
explicit WritePreparedTxnDB(DB* db,
const TransactionDBOptions& txn_db_options)
: PessimisticTransactionDB(db, txn_db_options) {}
explicit WritePreparedTxnDB(StackableDB* db,
const TransactionDBOptions& txn_db_options)
: PessimisticTransactionDB(db, txn_db_options) {}
virtual ~WritePreparedTxnDB() {}
Transaction* BeginTransaction(const WriteOptions& write_options,
const TransactionOptions& txn_options,
Transaction* old_txn) override;
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -22,7 +22,7 @@
#include "util/cast_util.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "utilities/transactions/transaction_db_impl.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_util.h"
namespace rocksdb {
@ -48,7 +48,7 @@ PessimisticTxn::PessimisticTxn(TransactionDB* txn_db,
deadlock_detect_(false),
deadlock_detect_depth_(0) {
txn_db_impl_ =
static_cast_with_check<TransactionDBImpl, TransactionDB>(txn_db);
static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
db_impl_ = static_cast_with_check<DBImpl, DB>(db_);
Initialize(txn_options);
}

@ -30,7 +30,7 @@
namespace rocksdb {
class TransactionDBImpl;
class PessimisticTransactionDB;
class PessimisticTxn;
// A transaction under pessimistic concurrency control. This class implements
@ -121,7 +121,7 @@ class PessimisticTxn : public TransactionBaseImpl {
void Clear() override;
TransactionDBImpl* txn_db_impl_;
PessimisticTransactionDB* txn_db_impl_;
DBImpl* db_impl_;
// If non-zero, this transaction should not be committed after this time (in

@ -26,7 +26,7 @@
#include "util/murmurhash.h"
#include "util/sync_point.h"
#include "util/thread_local.h"
#include "utilities/transactions/transaction_db_impl.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
namespace rocksdb {
@ -115,7 +115,7 @@ TransactionLockMgr::TransactionLockMgr(
mutex_factory_(mutex_factory) {
assert(txn_db);
txn_db_impl_ =
static_cast_with_check<TransactionDBImpl, TransactionDB>(txn_db);
static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
}
TransactionLockMgr::~TransactionLockMgr() {}

@ -27,7 +27,7 @@ struct LockMap;
struct LockMapStripe;
class Slice;
class TransactionDBImpl;
class PessimisticTransactionDB;
class TransactionLockMgr {
public:
@ -61,7 +61,7 @@ class TransactionLockMgr {
LockStatusData GetLockStatusData();
private:
TransactionDBImpl* txn_db_impl_;
PessimisticTransactionDB* txn_db_impl_;
// Default number of lock map stripes per column family
const size_t default_num_stripes_;

@ -21,7 +21,7 @@
#include "rocksdb/utilities/transaction_db.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "utilities/transactions/transaction_db_impl.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_impl.h"
#include "utilities/transactions/transaction_util.h"

Loading…
Cancel
Save