WritePrepared Txn: Refactoring TrackKeys

Summary:
This patch clarifies and refactors the logic around tracked keys in transactions.
Closes https://github.com/facebook/rocksdb/pull/3140

Differential Revision: D6290258

Pulled By: maysamyabandeh

fbshipit-source-id: 03b50646264cbcc550813c060b180fc7451a55c1
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 2edc92bc28
commit 2515266725
  1. 3
      db/db_iter.cc
  2. 6
      include/rocksdb/utilities/transaction.h
  3. 57
      utilities/transactions/pessimistic_transaction.cc
  4. 6
      utilities/transactions/pessimistic_transaction.h
  5. 22
      utilities/transactions/transaction_base.cc
  6. 9
      utilities/transactions/transaction_base.h
  7. 3
      utilities/transactions/transaction_test.cc
  8. 17
      utilities/transactions/write_prepared_txn.cc
  9. 4
      utilities/transactions/write_prepared_txn.h
  10. 4
      utilities/transactions/write_prepared_txn_db.cc

@ -1359,6 +1359,9 @@ Status ArenaWrappedDBIter::Refresh() {
return Status::NotSupported("Creating renew iterator is not allowed.");
}
assert(db_iter_ != nullptr);
// TODO(yiwu): For allocate_seq_only_for_data_==false, this is not the correct
// behavior. Will be corrected automatically when we take a snapshot here for
// the case of WritePreparedTxnDB.
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
if (sv_number_ != cur_sv_number) {

@ -321,9 +321,9 @@ class Transaction {
// gets committed successfully. But unlike Transaction::Put(),
// no conflict checking will be done for this key.
//
// If this Transaction was created on a TransactionDB, this function will
// still acquire locks necessary to make sure this write doesn't cause
// conflicts in other transactions and may return Status::Busy().
// If this Transaction was created on a PessimisticTransactionDB, this
// function will still acquire locks necessary to make sure this write doesn't
// cause conflicts in other transactions and may return Status::Busy().
virtual Status PutUntracked(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) = 0;
virtual Status PutUntracked(const Slice& key, const Slice& value) = 0;

@ -463,7 +463,7 @@ Status PessimisticTransaction::LockBatch(WriteBatch* batch,
// the snapshot time.
Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only,
bool exclusive, bool untracked) {
bool exclusive, bool skip_validate) {
uint32_t cfh_id = GetColumnFamilyID(column_family);
std::string key_str = key.ToString();
bool previously_locked;
@ -471,8 +471,7 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
Status s;
// lock this key if this transactions hasn't already locked it
SequenceNumber current_seqno = kMaxSequenceNumber;
SequenceNumber new_seqno = kMaxSequenceNumber;
SequenceNumber tracked_at_seq = kMaxSequenceNumber;
const auto& tracked_keys = GetTrackedKeys();
const auto tracked_keys_cf = tracked_keys.find(cfh_id);
@ -487,7 +486,7 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
lock_upgrade = true;
}
previously_locked = true;
current_seqno = iter->second.seq;
tracked_at_seq = iter->second.seq;
}
}
@ -505,23 +504,28 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
// any writes since this transaction's snapshot.
// TODO(agiardullo): could optimize by supporting shared txn locks in the
// future
if (untracked || snapshot_ == nullptr) {
if (skip_validate || snapshot_ == nullptr) {
// Need to remember the earliest sequence number that we know that this
// key has not been modified after. This is useful if this same
// transaction
// later tries to lock this key again.
if (current_seqno == kMaxSequenceNumber) {
if (tracked_at_seq == kMaxSequenceNumber) {
// Since we haven't checked a snapshot, we only know this key has not
// been modified since after we locked it.
new_seqno = db_->GetLatestSequenceNumber();
} else {
new_seqno = current_seqno;
// Note: when allocate_seq_only_for_data_==false this is less than the
// latest allocated seq but it is ok since i) this is just a heuristic
// used only as a hint to avoid actual check for conflicts, ii) this would
// cause a false positive only if the snapthot is taken right after the
// lock, which would be an unusual sequence.
tracked_at_seq = db_->GetLatestSequenceNumber();
}
} else {
// If a snapshot is set, we need to make sure the key hasn't been modified
// since the snapshot. This must be done after we locked the key.
// If we already have validated an earilier snapshot it must has been
// reflected in tracked_at_seq and ValidateSnapshot will return OK.
if (s.ok()) {
s = ValidateSnapshot(column_family, key, current_seqno, &new_seqno);
s = ValidateSnapshot(column_family, key, &tracked_at_seq);
if (!s.ok()) {
// Failed to validate key
@ -540,8 +544,11 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
}
if (s.ok()) {
// Let base class know we've conflict checked this key.
TrackKey(cfh_id, key_str, new_seqno, read_only, exclusive);
// We must track all the locked keys so that we can unlock them later. If
// the key is already locked, this func will update some stats on the
// tracked key. It could also update the tracked_at_seq if it is lower than
// the existing trackey seq.
TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
}
return s;
@ -549,27 +556,33 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
// Return OK() if this key has not been modified more recently than the
// transaction snapshot_.
// tracked_at_seq is the global seq at which we either locked the key or already
// have done ValidateSnapshot.
Status PessimisticTransaction::ValidateSnapshot(
ColumnFamilyHandle* column_family, const Slice& key,
SequenceNumber prev_seqno, SequenceNumber* new_seqno) {
SequenceNumber* tracked_at_seq) {
assert(snapshot_);
SequenceNumber seq = snapshot_->GetSequenceNumber();
if (prev_seqno <= seq) {
// If the key has been previous validated at a sequence number earlier
// than the curent snapshot's sequence number, we already know it has not
// been modified.
SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
if (*tracked_at_seq <= snap_seq) {
// If the key has been previous validated (or locked) at a sequence number
// earlier than the current snapshot's sequence number, we already know it
// has not been modified aftter snap_seq either.
return Status::OK();
}
// Otherwise we have either
// 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
// 2: snap_seq < tracked_at_seq: last time we lock the key was via
// skip_validate option which means we had skipped ValidateSnapshot. In both
// cases we should do ValidateSnapshot now.
*new_seqno = seq;
*tracked_at_seq = snap_seq;
ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily();
return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
snapshot_->GetSequenceNumber(),
false /* cache_only */);
return TransactionUtil::CheckKeyForConflicts(
db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */);
}
bool PessimisticTransaction::TryStealingLocks() {

@ -133,7 +133,7 @@ class PessimisticTransaction : public TransactionBaseImpl {
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool read_only, bool exclusive,
bool untracked = false) override;
bool skip_validate = false) override;
void Clear() override;
@ -181,8 +181,8 @@ class PessimisticTransaction : public TransactionBaseImpl {
int64_t deadlock_detect_depth_;
virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
const Slice& key, SequenceNumber prev_seqno,
SequenceNumber* new_seqno);
const Slice& key,
SequenceNumber* tracked_at_seq);
void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) override;

@ -97,7 +97,7 @@ void TransactionBaseImpl::SetSnapshotIfNeeded() {
Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
const SliceParts& key, bool read_only,
bool exclusive, bool untracked) {
bool exclusive, bool skip_validate) {
size_t key_size = 0;
for (int i = 0; i < key.num_parts; ++i) {
key_size += key.parts[i].size();
@ -110,7 +110,7 @@ Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
str.append(key.parts[i].data(), key.parts[i].size());
}
return TryLock(column_family, str, read_only, exclusive, untracked);
return TryLock(column_family, str, read_only, exclusive, skip_validate);
}
void TransactionBaseImpl::SetSavePoint() {
@ -398,7 +398,7 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* untracked */);
true /* exclusive */, true /* skip_validate */);
if (s.ok()) {
s = GetBatchForWrite()->Put(column_family, key, value);
@ -414,7 +414,7 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* untracked */);
true /* exclusive */, true /* skip_validate */);
if (s.ok()) {
s = GetBatchForWrite()->Put(column_family, key, value);
@ -430,7 +430,7 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
const Slice& key,
const Slice& value) {
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* untracked */);
true /* exclusive */, true /* skip_validate */);
if (s.ok()) {
s = GetBatchForWrite()->Merge(column_family, key, value);
@ -445,7 +445,7 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) {
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* untracked */);
true /* exclusive */, true /* skip_validate */);
if (s.ok()) {
s = GetBatchForWrite()->Delete(column_family, key);
@ -460,7 +460,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) {
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* untracked */);
true /* exclusive */, true /* skip_validate */);
if (s.ok()) {
s = GetBatchForWrite()->Delete(column_family, key);
@ -475,7 +475,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::SingleDeleteUntracked(
ColumnFamilyHandle* column_family, const Slice& key) {
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* untracked */);
true /* exclusive */, true /* skip_validate */);
if (s.ok()) {
s = GetBatchForWrite()->SingleDelete(column_family, key);
@ -531,6 +531,8 @@ void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
}
// Add a key to the given TransactionKeyMap
// seq for pessimistic transactions is the sequence number from which we know
// there has not been a concurrent update to the key.
void TransactionBaseImpl::TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
const std::string& key, SequenceNumber seq,
bool read_only, bool exclusive) {
@ -543,6 +545,10 @@ void TransactionBaseImpl::TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
// Now tracking this key with an earlier sequence number
iter->second.seq = seq;
}
// else we do not update the seq. The smaller the tracked seq, the stronger it
// the guarantee since it implies from the seq onward there has not been a
// concurrent update to the key. So we update the seq if it implies stronger
// guarantees, i.e., if it is smaller than the existing trakced seq.
if (read_only) {
iter->second.num_reads++;

@ -36,11 +36,11 @@ class TransactionBaseImpl : public Transaction {
// Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock
// returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed.
// untracked will be true if called from PutUntracked, DeleteUntracked, or
// skip_validate will be true if called from PutUntracked, DeleteUntracked, or
// MergeUntracked.
virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool read_only, bool exclusive,
bool untracked = false) = 0;
bool skip_validate = false) = 0;
void SetSavePoint() override;
@ -313,8 +313,7 @@ class TransactionBaseImpl : public Transaction {
// Map from column_family_id to map of keys that are involved in this
// transaction.
// Pessimistic Transactions will do conflict checking before adding a key
// by calling TrackKey().
// For Pessimistic Transactions this is the list of locked keys.
// Optimistic Transactions will wait till commit time to do conflict checking.
TransactionKeyMap tracked_keys_;
@ -333,7 +332,7 @@ class TransactionBaseImpl : public Transaction {
std::shared_ptr<TransactionNotifier> snapshot_notifier_ = nullptr;
Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
bool read_only, bool exclusive, bool untracked = false);
bool read_only, bool exclusive, bool skip_validate = false);
WriteBatchBase* GetBatchForWrite();

@ -141,12 +141,11 @@ TEST_P(TransactionTest, ValidateSnapshotTest) {
ASSERT_OK(s);
delete txn1;
SequenceNumber dont_care;
auto pes_txn2 = dynamic_cast<PessimisticTransaction*>(txn2);
// Test the simple case where the key is not tracked yet
auto trakced_seq = kMaxSequenceNumber;
s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo",
trakced_seq, &dont_care);
&trakced_seq);
ASSERT_TRUE(s.IsBusy());
delete txn2;
}

@ -247,30 +247,29 @@ Status WritePreparedTxn::RollbackInternal() {
Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
const Slice& key,
SequenceNumber prev_seqno,
SequenceNumber* new_seqno) {
SequenceNumber* tracked_at_seq) {
assert(snapshot_);
SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
// prev_seqno is either max or the last snapshot with which this key was
// tracked_at_seq is either max or the last snapshot with which this key was
// trackeed so there is no need to apply the IsInSnapshot to this comparison
// here as prev_seqno is not a prepare seq.
if (prev_seqno <= snap_seq) {
// here as tracked_at_seq is not a prepare seq.
if (*tracked_at_seq <= snap_seq) {
// If the key has been previous validated at a sequence number earlier
// than the curent snapshot's sequence number, we already know it has not
// been modified.
return Status::OK();
}
*new_seqno = snap_seq;
*tracked_at_seq = snap_seq;
ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily();
WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq);
return TransactionUtil::CheckKeyForConflicts(
db_impl_, cfh, key.ToString(), snapshot_->GetSequenceNumber(),
false /* cache_only */, &snap_checker);
return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
snap_seq, false /* cache_only */,
&snap_checker);
}
} // namespace rocksdb

@ -83,8 +83,8 @@ class WritePreparedTxn : public PessimisticTransaction {
Status RollbackInternal() override;
virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
const Slice& key, SequenceNumber prev_seqno,
SequenceNumber* new_seqno) override;
const Slice& key,
SequenceNumber* tracked_at_seq) override;
// No copying allowed
WritePreparedTxn(const WritePreparedTxn&);

@ -99,6 +99,8 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
snapshot_seq = options.snapshot->GetSequenceNumber();
} else {
auto* snapshot = db_impl_->GetSnapshot();
// We take a snapshot to make sure that the related data in the commit map
// are not deleted.
snapshot_seq = snapshot->GetSequenceNumber();
own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
}
@ -121,6 +123,8 @@ Status WritePreparedTxnDB::NewIterators(
snapshot_seq = options.snapshot->GetSequenceNumber();
} else {
auto* snapshot = db_impl_->GetSnapshot();
// We take a snapshot to make sure that the related data in the commit map
// are not deleted.
snapshot_seq = snapshot->GetSequenceNumber();
own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
}

Loading…
Cancel
Save