diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index a625f5000..12be5fd24 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -61,10 +61,30 @@ class Transaction { // methods. See Transaction::Get() for more details. virtual void SetSnapshot() = 0; + // Similar to SetSnapshot(), but will not change the current snapshot + // until Put/Merge/Delete/GetForUpdate/MultigetForUpdate is called. + // By calling this function, the transaction will essentially call + // SetSnapshot() for you right before performing the next write/GetForUpdate. + // + // Calling SetSnapshotOnNextOperation() will not affect what snapshot is + // returned by GetSnapshot() until the next write/GetForUpdate is executed. + // + // This is an optimization to reduce the likelihood of conflicts that + // could occur in between the time SetSnapshot() is called and the first + // write/GetForUpdate operation. Eg, this prevents the following + // race-condition: + // + // txn1->SetSnapshot(); + // txn2->Put("A", ...); + // txn2->Commit(); + // txn1->GetForUpdate(opts, "A", ...); // FAIL! + virtual void SetSnapshotOnNextOperation() = 0; + // Returns the Snapshot created by the last call to SetSnapshot(). // // REQUIRED: The returned Snapshot is only valid up until the next time - // SetSnapshot() is called or the Transaction is deleted. + // SetSnapshot()/SetSnapshotOnNextOperation() is called or the Transaction + // is deleted. virtual const Snapshot* GetSnapshot() const = 0; // Write all batched keys to the db atomically. 
diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction_impl.cc index 4bd262efc..028bed4c9 100644 --- a/utilities/transactions/optimistic_transaction_impl.cc +++ b/utilities/transactions/optimistic_transaction_impl.cc @@ -73,6 +73,8 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, } uint32_t cfh_id = GetColumnFamilyID(column_family); + SetSnapshotIfNeeded(); + SequenceNumber seq; if (snapshot_) { seq = snapshot_->snapshot()->GetSequenceNumber(); diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 37f8c75db..1f653b8db 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -36,6 +36,17 @@ void TransactionBaseImpl::Clear() { void TransactionBaseImpl::SetSnapshot() { snapshot_.reset(new ManagedSnapshot(db_)); + snapshot_needed_ = false; +} + +void TransactionBaseImpl::SetSnapshotOnNextOperation() { + snapshot_needed_ = true; +} + +void TransactionBaseImpl::SetSnapshotIfNeeded() { + if (snapshot_needed_) { + SetSnapshot(); + } } Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, @@ -59,7 +70,8 @@ void TransactionBaseImpl::SetSavePoint() { if (save_points_ == nullptr) { save_points_.reset(new std::stack()); } - save_points_->emplace(snapshot_, num_puts_, num_deletes_, num_merges_); + save_points_->emplace(snapshot_, snapshot_needed_, num_puts_, num_deletes_, + num_merges_); write_batch_->SetSavePoint(); } @@ -68,6 +80,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() { // Restore saved SavePoint TransactionBaseImpl::SavePoint& save_point = save_points_->top(); snapshot_ = save_point.snapshot_; + snapshot_needed_ = save_point.snapshot_needed_; num_puts_ = save_point.num_puts_; num_deletes_ = save_point.num_deletes_; num_merges_ = save_point.num_merges_; diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h 
index aac882eef..e609e563b 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -169,6 +169,7 @@ class TransactionBaseImpl : public Transaction { } void SetSnapshot() override; + void SetSnapshotOnNextOperation() override; void DisableIndexing() override { indexing_enabled_ = false; } @@ -195,6 +196,9 @@ class TransactionBaseImpl : public Transaction { const TransactionKeyMap* GetTrackedKeysSinceSavePoint(); + // Sets a snapshot if SetSnapshotOnNextOperation() has been called. + void SetSnapshotIfNeeded(); + DB* const db_; const WriteOptions write_options_; @@ -218,6 +222,7 @@ class TransactionBaseImpl : public Transaction { struct SavePoint { std::shared_ptr snapshot_; + bool snapshot_needed_; uint64_t num_puts_; uint64_t num_deletes_; uint64_t num_merges_; @@ -225,9 +230,10 @@ class TransactionBaseImpl : public Transaction { // Record all keys tracked since the last savepoint TransactionKeyMap new_keys_; - SavePoint(std::shared_ptr snapshot, uint64_t num_puts, - uint64_t num_deletes, uint64_t num_merges) + SavePoint(std::shared_ptr snapshot, bool snapshot_needed, + uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges) : snapshot_(snapshot), + snapshot_needed_(snapshot_needed), num_puts_(num_puts), num_deletes_(num_deletes), num_merges_(num_merges) {} @@ -251,6 +257,10 @@ class TransactionBaseImpl : public Transaction { // underlying WriteBatch and not indexed in the WriteBatchWithIndex. bool indexing_enabled_ = true; + // SetSnapshotOnNextOperation() has been called and the snapshot has not yet + // been reset. 
+ bool snapshot_needed_ = false; + Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, bool untracked = false); diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index c2a93cf33..33eb6509b 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -220,16 +220,10 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, bool previously_locked; Status s; - // Even though we do not care about doing conflict checking for this write, - // we still need to take a lock to make sure we do not cause a conflict with - // some other write. However, we do not need to check if there have been - // any writes since this transaction's snapshot. - // TODO(agiardullo): could optimize by supporting shared txn locks in the - // future - bool check_snapshot = !untracked; - SequenceNumber tracked_seqno = kMaxSequenceNumber; + // lock this key if this transaction hasn't already locked it + SequenceNumber current_seqno = kMaxSequenceNumber; + SequenceNumber new_seqno = kMaxSequenceNumber; - // Lookup whether this key has already been locked by this transaction const auto& tracked_keys = GetTrackedKeys(); const auto tracked_keys_cf = tracked_keys.find(cfh_id); if (tracked_keys_cf == tracked_keys.end()) { @@ -240,7 +234,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, previously_locked = false; } else { previously_locked = true; - tracked_seqno = iter->second; + current_seqno = iter->second; } } @@ -249,39 +243,37 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, s = txn_db_impl_->TryLock(this, cfh_id, key_str); } - if (s.ok()) { + SetSnapshotIfNeeded(); + + // Even though we do not care about doing conflict checking for this write, + // we still need to take a lock to make sure we do not cause a conflict with + // some other write. 
However, we do not need to check if there have been + // any writes since this transaction's snapshot. + // TODO(agiardullo): could optimize by supporting shared txn locks in the + // future + if (untracked || snapshot_ == nullptr) { + // Need to remember the earliest sequence number that we know that this + // key has not been modified after. This is useful if this same + // transaction + // later tries to lock this key again. + if (current_seqno == kMaxSequenceNumber) { + // Since we haven't checked a snapshot, we only know this key has not + // been modified since after we locked it. + new_seqno = db_->GetLatestSequenceNumber(); + } else { + new_seqno = current_seqno; + } + } else { // If a snapshot is set, we need to make sure the key hasn't been modified // since the snapshot. This must be done after we locked the key. - if (!check_snapshot || snapshot_ == nullptr) { - // Need to remember the earliest sequence number that we know that this - // key has not been modified after. This is useful if this same - // transaction - // later tries to lock this key again. - if (tracked_seqno == kMaxSequenceNumber) { - // Since we haven't checked a snapshot, we only know this key has not - // been modified since after we locked it. - tracked_seqno = db_->GetLatestSequenceNumber(); - } - } else { - // If the key has been previous validated at a sequence number earlier - // than the curent snapshot's sequence number, we already know it has not - // been modified. - SequenceNumber seq = snapshot_->snapshot()->GetSequenceNumber(); - bool already_validated = tracked_seqno <= seq; - - if (!already_validated) { - s = CheckKeySequence(column_family, key); - - if (s.ok()) { - // Record that there have been no writes to this key after this - // sequence. 
- tracked_seqno = seq; - } else { - // Failed to validate key - if (!previously_locked) { - // Unlock key we just locked - txn_db_impl_->UnLock(this, cfh_id, key.ToString()); - } + if (s.ok()) { + s = ValidateSnapshot(column_family, key, current_seqno, &new_seqno); + + if (!s.ok()) { + // Failed to validate key + if (!previously_locked) { + // Unlock key we just locked + txn_db_impl_->UnLock(this, cfh_id, key.ToString()); } } } @@ -289,7 +281,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, if (s.ok()) { // Let base class know we've conflict checked this key. - TrackKey(cfh_id, key_str, tracked_seqno); + TrackKey(cfh_id, key_str, new_seqno); } return s; @@ -297,22 +289,30 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, // Return OK() if this key has not been modified more recently than the // transaction snapshot_. -Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family, - const Slice& key) { - Status result; - if (snapshot_ != nullptr) { - assert(dynamic_cast(db_) != nullptr); - auto db_impl = reinterpret_cast(db_); +Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family, + const Slice& key, + SequenceNumber prev_seqno, + SequenceNumber* new_seqno) { + assert(snapshot_); + + SequenceNumber seq = snapshot_->snapshot()->GetSequenceNumber(); + if (prev_seqno <= seq) { + // If the key has been previously validated at a sequence number earlier + // than the current snapshot's sequence number, we already know it has not + // been modified. + return Status::OK(); + } - ColumnFamilyHandle* cfh = column_family ? column_family : - db_impl->DefaultColumnFamily(); + *new_seqno = seq; - result = TransactionUtil::CheckKeyForConflicts( - db_impl, cfh, key.ToString(), - snapshot_->snapshot()->GetSequenceNumber()); - } + assert(dynamic_cast(db_) != nullptr); + auto db_impl = reinterpret_cast(db_); + + ColumnFamilyHandle* cfh = + column_family ? 
column_family : db_impl->DefaultColumnFamily(); - return result; + return TransactionUtil::CheckKeyForConflicts( + db_impl, cfh, key.ToString(), snapshot_->snapshot()->GetSequenceNumber()); } } // namespace rocksdb diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index 57ceacb4b..0fa087d67 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -88,7 +88,8 @@ class TransactionImpl : public TransactionBaseImpl { void Clear() override; - Status CheckKeySequence(ColumnFamilyHandle* column_family, const Slice& key); + Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, + SequenceNumber prev_seqno, SequenceNumber* new_seqno); Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index fd6bb2a33..3f792a99d 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -1945,6 +1945,222 @@ TEST_F(TransactionTest, MergeTest) { ASSERT_EQ("a,3", value); } +TEST_F(TransactionTest, DeferSnapshotTest) { + WriteOptions write_options; + ReadOptions read_options; + string value; + Status s; + + s = db->Put(write_options, "A", "a0"); + ASSERT_OK(s); + + Transaction* txn1 = db->BeginTransaction(write_options); + Transaction* txn2 = db->BeginTransaction(write_options); + + txn1->SetSnapshotOnNextOperation(); + auto snapshot = txn1->GetSnapshot(); + ASSERT_FALSE(snapshot); + + s = txn2->Put("A", "a2"); + ASSERT_OK(s); + s = txn2->Commit(); + ASSERT_OK(s); + delete txn2; + + s = txn1->GetForUpdate(read_options, "A", &value); + // Should not conflict with txn2 since snapshot wasn't set until + // GetForUpdate was called. 
+ ASSERT_OK(s); + ASSERT_EQ("a2", value); + + s = txn1->Put("A", "a1"); + ASSERT_OK(s); + + s = db->Put(write_options, "B", "b0"); + ASSERT_OK(s); + + // Cannot lock B since it was written after the snapshot was set + s = txn1->Put("B", "b1"); + ASSERT_TRUE(s.IsBusy()); + + s = txn1->Commit(); + ASSERT_OK(s); + delete txn1; + + s = db->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a1", value); + + s = db->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_EQ("b0", value); +} + +TEST_F(TransactionTest, DeferSnapshotTest2) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + string value; + Status s; + + Transaction* txn1 = db->BeginTransaction(write_options); + + txn1->SetSnapshot(); + + s = txn1->Put("A", "a1"); + ASSERT_OK(s); + + s = db->Put(write_options, "C", "c0"); + ASSERT_OK(s); + s = db->Put(write_options, "D", "d0"); + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + + txn1->SetSnapshotOnNextOperation(); + + s = txn1->Get(snapshot_read_options, "C", &value); + // Snapshot was set before C was written + ASSERT_TRUE(s.IsNotFound()); + s = txn1->Get(snapshot_read_options, "D", &value); + // Snapshot was set before D was written + ASSERT_TRUE(s.IsNotFound()); + + // Snapshot should not have changed yet. 
+ snapshot_read_options.snapshot = txn1->GetSnapshot(); + + s = txn1->Get(snapshot_read_options, "C", &value); + // Snapshot was set before C was written + ASSERT_TRUE(s.IsNotFound()); + s = txn1->Get(snapshot_read_options, "D", &value); + // Snapshot was set before D was written + ASSERT_TRUE(s.IsNotFound()); + + s = txn1->GetForUpdate(read_options, "C", &value); + ASSERT_OK(s); + ASSERT_EQ("c0", value); + + s = db->Put(write_options, "D", "d00"); + ASSERT_OK(s); + + // Snapshot is now set + snapshot_read_options.snapshot = txn1->GetSnapshot(); + s = txn1->Get(snapshot_read_options, "D", &value); + ASSERT_OK(s); + ASSERT_EQ("d0", value); + + s = txn1->Commit(); + ASSERT_OK(s); + delete txn1; +} + +TEST_F(TransactionTest, DeferSnapshotSavePointTest) { + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + string value; + Status s; + + Transaction* txn1 = db->BeginTransaction(write_options); + + txn1->SetSavePoint(); // 1 + + s = db->Put(write_options, "T", "1"); + ASSERT_OK(s); + + txn1->SetSnapshotOnNextOperation(); + + s = db->Put(write_options, "T", "2"); + ASSERT_OK(s); + + txn1->SetSavePoint(); // 2 + + s = db->Put(write_options, "T", "3"); + ASSERT_OK(s); + + s = txn1->Put("A", "a"); + ASSERT_OK(s); + + txn1->SetSavePoint(); // 3 + + s = db->Put(write_options, "T", "4"); + ASSERT_OK(s); + + txn1->SetSnapshot(); + txn1->SetSnapshotOnNextOperation(); + + txn1->SetSavePoint(); // 4 + + s = db->Put(write_options, "T", "5"); + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("4", value); + + s = txn1->Put("A", "a1"); + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("5", value); + + s = txn1->RollbackToSavePoint(); // Rollback to 4 + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + s = 
txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("4", value); + + s = txn1->RollbackToSavePoint(); // Rollback to 3 + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("3", value); + + s = txn1->Get(read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("5", value); + + s = txn1->RollbackToSavePoint(); // Rollback to 2 + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + ASSERT_FALSE(snapshot_read_options.snapshot); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("5", value); + + s = txn1->Delete("A"); + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + ASSERT_TRUE(snapshot_read_options.snapshot); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("5", value); + + s = txn1->RollbackToSavePoint(); // Rollback to 1 + ASSERT_OK(s); + + s = txn1->Delete("A"); + ASSERT_OK(s); + + snapshot_read_options.snapshot = txn1->GetSnapshot(); + ASSERT_FALSE(snapshot_read_options.snapshot); + s = txn1->Get(snapshot_read_options, "T", &value); + ASSERT_OK(s); + ASSERT_EQ("5", value); + + s = txn1->Commit(); + ASSERT_OK(s); + + delete txn1; +} + } // namespace rocksdb int main(int argc, char** argv) {