diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index cbeeceabc..fe5efff66 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -20,6 +20,17 @@ class Iterator; class TransactionDB; class WriteBatchWithIndex; +// Provides notification to the caller of SetSnapshotOnNextOperation when +// the actual snapshot gets created +class TransactionNotifier { + public: + virtual ~TransactionNotifier() {} + + // Implement this method to receive notification when a snapshot is + // requested via SetSnapshotOnNextOperation. + virtual void SnapshotCreated(const Snapshot* newSnapshot) = 0; +}; + // Provides BEGIN/COMMIT/ROLLBACK transactions. // // To use transactions, you must first create either an OptimisticTransactionDB @@ -69,6 +80,9 @@ class Transaction { // Calling SetSnapshotOnNextOperation() will not affect what snapshot is // returned by GetSnapshot() until the next write/GetForUpdate is executed. // + // When the snapshot is created the notifier's SnapshotCreated method will + // be called so that the caller can get access to the snapshot. + // // This is an optimization to reduce the likelyhood of conflicts that // could occur in between the time SetSnapshot() is called and the first // write/GetForUpdate operation. Eg, this prevents the following @@ -78,7 +92,8 @@ class Transaction { // txn2->Put("A", ...); // txn2->Commit(); // txn1->GetForUpdate(opts, "A", ...); // FAIL! - virtual void SetSnapshotOnNextOperation() = 0; + virtual void SetSnapshotOnNextOperation( + std::shared_ptr notifier = nullptr) = 0; // Returns the Snapshot created by the last call to SetSnapshot(). // diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 1f653b8db..d31ac6a26 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -37,15 +37,22 @@ void TransactionBaseImpl::Clear() { void TransactionBaseImpl::SetSnapshot() { snapshot_.reset(new ManagedSnapshot(db_)); snapshot_needed_ = false; + snapshot_notifier_ = nullptr; } -void TransactionBaseImpl::SetSnapshotOnNextOperation() { +void TransactionBaseImpl::SetSnapshotOnNextOperation( + std::shared_ptr notifier) { snapshot_needed_ = true; + snapshot_notifier_ = notifier; } void TransactionBaseImpl::SetSnapshotIfNeeded() { if (snapshot_needed_) { + std::shared_ptr notifier = snapshot_notifier_; SetSnapshot(); + if (notifier != nullptr) { + notifier->SnapshotCreated(GetSnapshot()); + } } } @@ -70,8 +77,8 @@ void TransactionBaseImpl::SetSavePoint() { if (save_points_ == nullptr) { save_points_.reset(new std::stack()); } - save_points_->emplace(snapshot_, snapshot_needed_, num_puts_, num_deletes_, - num_merges_); + save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_, + num_puts_, num_deletes_, num_merges_); write_batch_->SetSavePoint(); } @@ -81,6 +88,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() { TransactionBaseImpl::SavePoint& save_point = save_points_->top(); snapshot_ = save_point.snapshot_; snapshot_needed_ = save_point.snapshot_needed_; + snapshot_notifier_ = save_point.snapshot_notifier_; num_puts_ = save_point.num_puts_; num_deletes_ = save_point.num_deletes_; num_merges_ = save_point.num_merges_; diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 47294b944..90a3b59b3 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -169,11 +169,13 @@ class TransactionBaseImpl : public Transaction { } void SetSnapshot() override; - void SetSnapshotOnNextOperation() override; + void SetSnapshotOnNextOperation( + std::shared_ptr notifier = nullptr) override; void ClearSnapshot() override { snapshot_.reset(); snapshot_needed_ = false; + snapshot_notifier_ = nullptr; } void DisableIndexing() override { indexing_enabled_ = false; } @@ -228,6 +230,7 @@ class TransactionBaseImpl : public Transaction { struct SavePoint { std::shared_ptr snapshot_; bool snapshot_needed_; + std::shared_ptr snapshot_notifier_; uint64_t num_puts_; uint64_t num_deletes_; uint64_t num_merges_; @@ -236,9 +239,11 @@ class TransactionBaseImpl : public Transaction { TransactionKeyMap new_keys_; SavePoint(std::shared_ptr snapshot, bool snapshot_needed, + std::shared_ptr snapshot_notifier, uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges) : snapshot_(snapshot), snapshot_needed_(snapshot_needed), + snapshot_notifier_(snapshot_notifier), num_puts_(num_puts), num_deletes_(num_deletes), num_merges_(num_merges) {} @@ -266,6 +271,10 @@ class TransactionBaseImpl : public Transaction { // been reset. bool snapshot_needed_ = false; + // SetSnapshotOnNextOperation() has been called and the caller would like + // a notification through the TransactionNotifier interface + std::shared_ptr snapshot_notifier_ = nullptr; + Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, bool untracked = false); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 0d6647ae5..73f227ffb 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -2163,6 +2163,65 @@ TEST_F(TransactionTest, DeferSnapshotSavePointTest) { delete txn1; } +TEST_F(TransactionTest, SetSnapshotOnNextOperationWithNotification) { + WriteOptions write_options; + ReadOptions read_options; + string value; + + class Notifier : public TransactionNotifier { + private: + const Snapshot** snapshot_ptr_; + + public: + explicit Notifier(const Snapshot** snapshot_ptr) + : snapshot_ptr_(snapshot_ptr) {} + + void SnapshotCreated(const Snapshot* newSnapshot) { + *snapshot_ptr_ = newSnapshot; + } + }; + + std::shared_ptr notifier = + std::make_shared(&read_options.snapshot); + Status s; + + s = db->Put(write_options, "B", "0"); + ASSERT_OK(s); + + Transaction* txn1 = db->BeginTransaction(write_options); + + txn1->SetSnapshotOnNextOperation(notifier); + ASSERT_FALSE(read_options.snapshot); + + s = db->Put(write_options, "B", "1"); + ASSERT_OK(s); + + // A Get does not generate the snapshot + s = txn1->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_FALSE(read_options.snapshot); + ASSERT_EQ(value, "1"); + + // Any other operation does + s = txn1->Put("A", "0"); + ASSERT_OK(s); + + // Now change "B". + s = db->Put(write_options, "B", "2"); + ASSERT_OK(s); + + // The original value should still be read + s = txn1->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_TRUE(read_options.snapshot); + ASSERT_EQ(value, "1"); + + s = txn1->Commit(); + ASSERT_OK(s); + + delete txn1; +} + TEST_F(TransactionTest, ClearSnapshotTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options;