Skip concurrency control during recovery of pessimistic txn (#4346)

Summary:
TransactionOptions::skip_concurrency_control allows pessimistic transactions to skip the overhead of concurrency control. This could be as an optimization if the application knows that the transaction would not have any conflict with concurrent transactions. It is currently used during recovery assuming (i) application guarantees no conflict between prepared transactions in the WAL (ii) application guarantees that recovered transactions will be rolled back/commit before new transactions start.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4346

Differential Revision: D9759149

Pulled By: maysamyabandeh

fbshipit-source-id: f896e84fa58b0b584be904c7fd3883a41ea3215b
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent faf529fd7c
commit 3f5282268f
  1. 5
      HISTORY.md
  2. 7
      db/db_compaction_test.cc
  3. 9
      include/rocksdb/utilities/transaction_db.h
  4. 3
      tools/db_stress.cc
  5. 9
      utilities/transactions/pessimistic_transaction.cc
  6. 3
      utilities/transactions/pessimistic_transaction.h
  7. 8
      utilities/transactions/pessimistic_transaction_db.cc
  8. 49
      utilities/transactions/write_prepared_transaction_test.cc

@ -3,8 +3,10 @@
### Public API Change ### Public API Change
* `OnTableFileCreated` will now be called for empty files generated during compaction. In that case, `TableFileCreationInfo::file_path` will be "(nil)" and `TableFileCreationInfo::file_size` will be zero. * `OnTableFileCreated` will now be called for empty files generated during compaction. In that case, `TableFileCreationInfo::file_path` will be "(nil)" and `TableFileCreationInfo::file_size` will be zero.
* Add `FlushOptions::allow_write_stall`, which controls whether Flush calls start working immediately, even if it causes user writes to stall, or will wait until flush can be performed without causing write stall (similar to `CompactRangeOptions::allow_write_stall`). Note that the default value is false, meaning we add delay to Flush calls until stalling can be avoided when possible. This is behavior change compared to previous RocksDB versions, where Flush calls didn't check if they might cause stall or not. * Add `FlushOptions::allow_write_stall`, which controls whether Flush calls start working immediately, even if it causes user writes to stall, or will wait until flush can be performed without causing write stall (similar to `CompactRangeOptions::allow_write_stall`). Note that the default value is false, meaning we add delay to Flush calls until stalling can be avoided when possible. This is behavior change compared to previous RocksDB versions, where Flush calls didn't check if they might cause stall or not.
* Application using PessimisticTransactionDB is expected to rollback/commit recovered transactions before starting new ones. This assumption is used to skip concurrency control during recovery.
### New Features ### New Features
* TransactionOptions::skip_concurrency_control allows pessimistic transactions to skip the overhead of concurrency control. Could be used for optimizing certain transactions or during recovery.
### Bug Fixes ### Bug Fixes
* Avoid creating empty SSTs and subsequently deleting them in certain cases during compaction. * Avoid creating empty SSTs and subsequently deleting them in certain cases during compaction.
* Sync CURRENT file contents during checkpoint. * Sync CURRENT file contents during checkpoint.
@ -157,7 +159,8 @@
* `BackupableDBOptions::max_valid_backups_to_open == 0` now means no backups will be opened during BackupEngine initialization. Previously this condition disabled limiting backups opened. * `BackupableDBOptions::max_valid_backups_to_open == 0` now means no backups will be opened during BackupEngine initialization. Previously this condition disabled limiting backups opened.
* `DBOptions::preserve_deletes` is a new option that allows one to specify that DB should not drop tombstones for regular deletes if they have sequence number larger than what was set by the new API call `DB::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum)`. Disabled by default. * `DBOptions::preserve_deletes` is a new option that allows one to specify that DB should not drop tombstones for regular deletes if they have sequence number larger than what was set by the new API call `DB::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum)`. Disabled by default.
* API call `DB::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum)` was added, users who wish to preserve deletes are expected to periodically call this function to advance the cutoff seqnum (all deletes made before this seqnum can be dropped by DB). It's user responsibility to figure out how to advance the seqnum in the way so the tombstones are kept for the desired period of time, yet are eventually processed in time and don't eat up too much space. * API call `DB::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum)` was added, users who wish to preserve deletes are expected to periodically call this function to advance the cutoff seqnum (all deletes made before this seqnum can be dropped by DB). It's user responsibility to figure out how to advance the seqnum in the way so the tombstones are kept for the desired period of time, yet are eventually processed in time and don't eat up too much space.
* `ReadOptions::iter_start_seqnum` was added; if set to something > 0 user will see 2 changes in iterators behavior 1) only keys written with sequence larger than this parameter would be returned and 2) the `Slice` returned by iter->key() now points to the memory that keep User-oriented representation of the internal key, rather than user key. New struct `FullKey` was added to represent internal keys, along with a new helper function `ParseFullKey(const Slice& internal_key, FullKey* result);`. * `ReadOptions::iter_start_seqnum` was added;
if set to something > 0 user will see 2 changes in iterators behavior 1) only keys written with sequence larger than this parameter would be returned and 2) the `Slice` returned by iter->key() now points to the memory that keep User-oriented representation of the internal key, rather than user key. New struct `FullKey` was added to represent internal keys, along with a new helper function `ParseFullKey(const Slice& internal_key, FullKey* result);`.
* Deprecate trash_dir param in NewSstFileManager, right now we will rename deleted files to <name>.trash instead of moving them to trash directory * Deprecate trash_dir param in NewSstFileManager, right now we will rename deleted files to <name>.trash instead of moving them to trash directory
* Allow setting a custom trash/DB size ratio limit in the SstFileManager, after which files that are to be scheduled for deletion are deleted immediately, regardless of any delete ratelimit. * Allow setting a custom trash/DB size ratio limit in the SstFileManager, after which files that are to be scheduled for deletion are deleted immediately, regardless of any delete ratelimit.
* Return an error on write if write_options.sync = true and write_options.disableWAL = true to warn user of inconsistent options. Previously we will not write to WAL and not respecting the sync options in this case. * Return an error on write if write_options.sync = true and write_options.disableWAL = true to warn user of inconsistent options. Previously we will not write to WAL and not respecting the sync options in this case.

@ -120,13 +120,12 @@ class SstStatsCollector : public EventListener {
public: public:
SstStatsCollector() : num_ssts_creation_started_(0) {} SstStatsCollector() : num_ssts_creation_started_(0) {}
void OnTableFileCreationStarted(const TableFileCreationBriefInfo& /* info */) override { void OnTableFileCreationStarted(
const TableFileCreationBriefInfo& /* info */) override {
++num_ssts_creation_started_; ++num_ssts_creation_started_;
} }
int num_ssts_creation_started() { int num_ssts_creation_started() { return num_ssts_creation_started_; }
return num_ssts_creation_started_;
}
private: private:
std::atomic<int> num_ssts_creation_started_; std::atomic<int> num_ssts_creation_started_;

@ -137,6 +137,15 @@ struct TransactionOptions {
// The maximum number of bytes used for the write batch. 0 means no limit. // The maximum number of bytes used for the write batch. 0 means no limit.
size_t max_write_batch_size = 0; size_t max_write_batch_size = 0;
// Skip Concurrency Control. This could be as an optimization if the
// application knows that the transaction would not have any conflict with
// concurrent transactions. It could also be used during recovery if (i)
// application guarantees no conflict between prepared transactions in the WAL
// (ii) application guarantees that recovered transactions will be rolled
// back/commit before new transactions start.
// Default: false
bool skip_concurrency_control = false;
}; };
// The per-write optimizations that do not involve transactions. TransactionDB // The per-write optimizations that do not involve transactions. TransactionDB

@ -1237,7 +1237,8 @@ class DbStressListener : public EventListener {
DbStressListener(const std::string& db_name, DbStressListener(const std::string& db_name,
const std::vector<DbPath>& db_paths, const std::vector<DbPath>& db_paths,
const std::vector<ColumnFamilyDescriptor>& column_families) const std::vector<ColumnFamilyDescriptor>& column_families)
: db_name_(db_name), db_paths_(db_paths), : db_name_(db_name),
db_paths_(db_paths),
column_families_(column_families), column_families_(column_families),
num_pending_file_creations_(0) {} num_pending_file_creations_(0) {}
virtual ~DbStressListener() { virtual ~DbStressListener() {

@ -46,7 +46,8 @@ PessimisticTransaction::PessimisticTransaction(
waiting_key_(nullptr), waiting_key_(nullptr),
lock_timeout_(0), lock_timeout_(0),
deadlock_detect_(false), deadlock_detect_(false),
deadlock_detect_depth_(0) { deadlock_detect_depth_(0),
skip_concurrency_control_(false) {
txn_db_impl_ = txn_db_impl_ =
static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db); static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
db_impl_ = static_cast_with_check<DBImpl, DB>(db_); db_impl_ = static_cast_with_check<DBImpl, DB>(db_);
@ -61,6 +62,7 @@ void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
deadlock_detect_ = txn_options.deadlock_detect; deadlock_detect_ = txn_options.deadlock_detect;
deadlock_detect_depth_ = txn_options.deadlock_detect_depth; deadlock_detect_depth_ = txn_options.deadlock_detect_depth;
write_batch_.SetMaxBytes(txn_options.max_write_batch_size); write_batch_.SetMaxBytes(txn_options.max_write_batch_size);
skip_concurrency_control_ = txn_options.skip_concurrency_control;
lock_timeout_ = txn_options.lock_timeout * 1000; lock_timeout_ = txn_options.lock_timeout * 1000;
if (lock_timeout_ < 0) { if (lock_timeout_ < 0) {
@ -492,11 +494,14 @@ Status PessimisticTransaction::LockBatch(WriteBatch* batch,
Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only, const Slice& key, bool read_only,
bool exclusive, bool skip_validate) { bool exclusive, bool skip_validate) {
Status s;
if (UNLIKELY(skip_concurrency_control_)) {
return s;
}
uint32_t cfh_id = GetColumnFamilyID(column_family); uint32_t cfh_id = GetColumnFamilyID(column_family);
std::string key_str = key.ToString(); std::string key_str = key.ToString();
bool previously_locked; bool previously_locked;
bool lock_upgrade = false; bool lock_upgrade = false;
Status s;
// lock this key if this transactions hasn't already locked it // lock this key if this transactions hasn't already locked it
SequenceNumber tracked_at_seq = kMaxSequenceNumber; SequenceNumber tracked_at_seq = kMaxSequenceNumber;

@ -183,6 +183,9 @@ class PessimisticTransaction : public TransactionBaseImpl {
// Whether to perform deadlock detection or not. // Whether to perform deadlock detection or not.
int64_t deadlock_detect_depth_; int64_t deadlock_detect_depth_;
// Refer to TransactionOptions::skip_concurrency_control
bool skip_concurrency_control_;
virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family, virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& key,
SequenceNumber* tracked_at_seq); SequenceNumber* tracked_at_seq);

@ -133,6 +133,14 @@ Status PessimisticTransactionDB::Initialize(
WriteOptions w_options; WriteOptions w_options;
w_options.sync = true; w_options.sync = true;
TransactionOptions t_options; TransactionOptions t_options;
// This would help avoiding deadlock for keys that although exist in the WAL
// did not go through concurrency control. This includes the merge that
// MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
// there is a conflict between the keys of two transactions that must be
// avoided, it is already avoided by the application, MyRocks, before the
// restart (ii) application, MyRocks, guarntees to rollback/commit the
// recovered transactions before new transactions start.
t_options.skip_concurrency_control = true;
Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr); Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
assert(real_trx); assert(real_trx);

@ -1307,55 +1307,6 @@ TEST_P(WritePreparedTransactionTest, BasicRecoveryTest) {
pinnable_val.Reset(); pinnable_val.Reset();
} }
// After recovery the new transactions should still conflict with recovered
// transactions.
TEST_P(WritePreparedTransactionTest, ConflictDetectionAfterRecoveryTest) {
options.disable_auto_compactions = true;
ReOpen();
TransactionOptions txn_options;
WriteOptions write_options;
size_t index = 0;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
auto istr0 = std::to_string(index);
auto s = txn0->SetName("xid" + istr0);
ASSERT_OK(s);
s = txn0->Put(Slice("key" + istr0), Slice("bar0" + istr0));
ASSERT_OK(s);
s = txn0->Prepare();
// With the same index 0 and key prefix, txn_t0 should conflict with txn0
txn_t0_with_status(0, Status::TimedOut());
delete txn0;
auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true);
dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
ReOpenNoDelete();
// It should still conflict after the recovery
txn_t0_with_status(0, Status::TimedOut());
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true);
ReOpenNoDelete();
// Check that a recovered txn will still cause conflicts after 2nd recovery
txn_t0_with_status(0, Status::TimedOut());
txn0 = db->GetTransactionByName("xid" + istr0);
ASSERT_NE(txn0, nullptr);
txn0->Commit();
delete txn0;
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true);
ReOpenNoDelete();
// tnx0 is now committed and should no longer cause a conflict
txn_t0_with_status(0, Status::OK());
}
// After recovery the commit map is empty while the max is set. The code would // After recovery the commit map is empty while the max is set. The code would
// go through a different path which requires a separate test. // go through a different path which requires a separate test.
TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMapTest) { TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMapTest) {

Loading…
Cancel
Save