WritePrepared: skip_concurrency_control option (#5330)

Summary:
This enables the user to set TransactionDBOptions::skip_concurrency_control so the standard `DB::Write(const WriteOptions& opts, WriteBatch* updates)` would skip the concurrency control. This would give higher throughput to the users who know their use case doesn't need concurrency control.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5330

Differential Revision: D15525932

Pulled By: maysamyabandeh

fbshipit-source-id: 68421ac1ba34f549a4a8de9ce4c2dccf6fb4b06b
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent f5576c3317
commit eab4f49a2c
  1. 7
      include/rocksdb/utilities/transaction_db.h
  2. 10
      tools/db_bench_tool.cc
  3. 27
      utilities/transactions/pessimistic_transaction_db.cc
  4. 22
      utilities/transactions/pessimistic_transaction_db.h
  5. 14
      utilities/transactions/write_prepared_txn_db.cc
  6. 3
      utilities/transactions/write_prepared_txn_db.h

@ -94,6 +94,13 @@ struct TransactionDBOptions {
// for the special way that myrocks uses this operands. // for the special way that myrocks uses this operands.
bool rollback_merge_operands = false; bool rollback_merge_operands = false;
// If true, the TransactionDB implementation might skip concurrency control
// unless it is overridden by TransactionOptions or
// TransactionDBWriteOptimizations. This can be used in conjuction with
// DBOptions::unordered_write when the TransactionDB is used solely for write
// ordering rather than concurrency control.
bool skip_concurrency_control = false;
private: private:
// 128 entries // 128 entries
size_t wp_snapshot_cache_bits = static_cast<size_t>(7); size_t wp_snapshot_cache_bits = static_cast<size_t>(7);

@ -3788,6 +3788,11 @@ void VerifyDBFromDB(std::string& truth_db_name) {
} else if (FLAGS_transaction_db) { } else if (FLAGS_transaction_db) {
TransactionDB* ptr; TransactionDB* ptr;
TransactionDBOptions txn_db_options; TransactionDBOptions txn_db_options;
if (options.unordered_write) {
options.two_write_queues = true;
txn_db_options.skip_concurrency_control = true;
txn_db_options.write_policy = WRITE_PREPARED;
}
s = TransactionDB::Open(options, txn_db_options, db_name, s = TransactionDB::Open(options, txn_db_options, db_name,
column_families, &db->cfh, &ptr); column_families, &db->cfh, &ptr);
if (s.ok()) { if (s.ok()) {
@ -3814,6 +3819,11 @@ void VerifyDBFromDB(std::string& truth_db_name) {
} else if (FLAGS_transaction_db) { } else if (FLAGS_transaction_db) {
TransactionDB* ptr = nullptr; TransactionDB* ptr = nullptr;
TransactionDBOptions txn_db_options; TransactionDBOptions txn_db_options;
if (options.unordered_write) {
options.two_write_queues = true;
txn_db_options.skip_concurrency_control = true;
txn_db_options.write_policy = WRITE_PREPARED;
}
s = CreateLoggerFromOptions(db_name, options, &options.info_log); s = CreateLoggerFromOptions(db_name, options, &options.info_log);
if (s.ok()) { if (s.ok()) {
s = TransactionDB::Open(options, txn_db_options, db_name, &ptr); s = TransactionDB::Open(options, txn_db_options, db_name, &ptr);

@ -522,23 +522,16 @@ Status PessimisticTransactionDB::Merge(const WriteOptions& options,
Status PessimisticTransactionDB::Write(const WriteOptions& opts, Status PessimisticTransactionDB::Write(const WriteOptions& opts,
WriteBatch* updates) { WriteBatch* updates) {
// Need to lock all keys in this batch to prevent write conflicts with return WriteWithConcurrencyControl(opts, updates);
// concurrent transactions. }
Transaction* txn = BeginInternalTransaction(opts);
txn->DisableIndexing();
auto txn_impl =
static_cast_with_check<PessimisticTransaction, Transaction>(txn);
// Since commitBatch sorts the keys before locking, concurrent Write()
// operations will not cause a deadlock.
// In order to avoid a deadlock with a concurrent Transaction, Transactions
// should use a lock timeout.
Status s = txn_impl->CommitBatch(updates);
delete txn;
return s; Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
WriteBatch* updates) {
if (txn_db_options_.skip_concurrency_control) {
return db_impl_->Write(opts, updates);
} else {
return WriteWithConcurrencyControl(opts, updates);
}
} }
Status WriteCommittedTxnDB::Write( Status WriteCommittedTxnDB::Write(
@ -547,7 +540,7 @@ Status WriteCommittedTxnDB::Write(
if (optimizations.skip_concurrency_control) { if (optimizations.skip_concurrency_control) {
return db_impl_->Write(opts, updates); return db_impl_->Write(opts, updates);
} else { } else {
return Write(opts, updates); return WriteWithConcurrencyControl(opts, updates);
} }
} }

@ -19,6 +19,7 @@
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_lock_mgr.h" #include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/write_prepared_txn.h" #include "utilities/transactions/write_prepared_txn.h"
@ -67,6 +68,26 @@ class PessimisticTransactionDB : public TransactionDB {
using TransactionDB::Write; using TransactionDB::Write;
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
WriteBatch* updates) {
// Need to lock all keys in this batch to prevent write conflicts with
// concurrent transactions.
Transaction* txn = BeginInternalTransaction(opts);
txn->DisableIndexing();
auto txn_impl =
static_cast_with_check<PessimisticTransaction, Transaction>(txn);
// Since commitBatch sorts the keys before locking, concurrent Write()
// operations will not cause a deadlock.
// In order to avoid a deadlock with a concurrent Transaction, Transactions
// should use a lock timeout.
Status s = txn_impl->CommitBatch(updates);
delete txn;
return s;
}
using StackableDB::CreateColumnFamily; using StackableDB::CreateColumnFamily;
virtual Status CreateColumnFamily(const ColumnFamilyOptions& options, virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
@ -191,6 +212,7 @@ class WriteCommittedTxnDB : public PessimisticTransactionDB {
virtual Status Write(const WriteOptions& opts, virtual Status Write(const WriteOptions& opts,
const TransactionDBWriteOptimizations& optimizations, const TransactionDBWriteOptimizations& optimizations,
WriteBatch* updates) override; WriteBatch* updates) override;
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -108,6 +108,18 @@ Transaction* WritePreparedTxnDB::BeginTransaction(
} }
} }
Status WritePreparedTxnDB::Write(const WriteOptions& opts,
WriteBatch* updates) {
if (txn_db_options_.skip_concurrency_control) {
// Skip locking the rows
const size_t UNKNOWN_BATCH_CNT = 0;
WritePreparedTxn* NO_TXN = nullptr;
return WriteInternal(opts, updates, UNKNOWN_BATCH_CNT, NO_TXN);
} else {
return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
}
}
Status WritePreparedTxnDB::Write( Status WritePreparedTxnDB::Write(
const WriteOptions& opts, const WriteOptions& opts,
const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) { const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
@ -123,7 +135,7 @@ Status WritePreparedTxnDB::Write(
} else { } else {
// TODO(myabandeh): Make use of skip_duplicate_key_check hint // TODO(myabandeh): Make use of skip_duplicate_key_check hint
// Fall back to unoptimized version // Fall back to unoptimized version
return PessimisticTransactionDB::Write(opts, updates); return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
} }
} }

@ -72,6 +72,9 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
const TransactionOptions& txn_options, const TransactionOptions& txn_options,
Transaction* old_txn) override; Transaction* old_txn) override;
using TransactionDB::Write;
Status Write(const WriteOptions& opts, WriteBatch* updates) override;
// Optimized version of ::Write that receives more optimization request such // Optimized version of ::Write that receives more optimization request such
// as skip_concurrency_control. // as skip_concurrency_control.
using PessimisticTransactionDB::Write; using PessimisticTransactionDB::Write;

Loading…
Cancel
Save