From b28b7c6dd928e7fceb8e86e21b9f1a416532b720 Mon Sep 17 00:00:00 2001 From: Jay Edgar Date: Fri, 4 Dec 2015 10:12:27 -0800 Subject: [PATCH] Added callback notification when a snapshot is created Summary: When SetSnapshot() is used the caller immediately knows a snapshot has been created, but when SetSnapshotOnNextOperation() is used the caller needs a way to get notified when that snapshot has been generated. This creates an interface that the client can implement that will be called at the time the snapshot is created. Test Plan: Added a new SetSnapshotOnNextOperationWithNotification test into the transaction_test. Reviewers: sdong, anthony Reviewed By: anthony Subscribers: yoshinorim, leveldb, dhruba Differential Revision: https://reviews.facebook.net/D51177 --- include/rocksdb/utilities/transaction.h | 17 ++++++- utilities/transactions/transaction_base.cc | 14 +++-- utilities/transactions/transaction_base.h | 11 +++- utilities/transactions/transaction_test.cc | 59 ++++++++++++++++++++++ 4 files changed, 96 insertions(+), 5 deletions(-) diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index cbeeceabc..fe5efff66 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -20,6 +20,17 @@ class Iterator; class TransactionDB; class WriteBatchWithIndex; +// Provides notification to the caller of SetSnapshotOnNextOperation when +// the actual snapshot gets created +class TransactionNotifier { + public: + virtual ~TransactionNotifier() {} + + // Implement this method to receive notification when a snapshot is + // requested via SetSnapshotOnNextOperation. + virtual void SnapshotCreated(const Snapshot* newSnapshot) = 0; +}; + // Provides BEGIN/COMMIT/ROLLBACK transactions. // // To use transactions, you must first create either an OptimisticTransactionDB @@ -69,6 +80,9 @@ class Transaction { // Calling SetSnapshotOnNextOperation() will not affect what snapshot is // returned by GetSnapshot() until the next write/GetForUpdate is executed. // + // When the snapshot is created the notifier's SnapshotCreated method will + // be called so that the caller can get access to the snapshot. + // // This is an optimization to reduce the likelyhood of conflicts that // could occur in between the time SetSnapshot() is called and the first // write/GetForUpdate operation. Eg, this prevents the following @@ -78,7 +92,8 @@ class Transaction { // txn2->Put("A", ...); // txn2->Commit(); // txn1->GetForUpdate(opts, "A", ...); // FAIL! - virtual void SetSnapshotOnNextOperation() = 0; + virtual void SetSnapshotOnNextOperation( + std::shared_ptr notifier = nullptr) = 0; // Returns the Snapshot created by the last call to SetSnapshot(). // diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 1f653b8db..d31ac6a26 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -37,15 +37,22 @@ void TransactionBaseImpl::Clear() { void TransactionBaseImpl::SetSnapshot() { snapshot_.reset(new ManagedSnapshot(db_)); snapshot_needed_ = false; + snapshot_notifier_ = nullptr; } -void TransactionBaseImpl::SetSnapshotOnNextOperation() { +void TransactionBaseImpl::SetSnapshotOnNextOperation( + std::shared_ptr notifier) { snapshot_needed_ = true; + snapshot_notifier_ = notifier; } void TransactionBaseImpl::SetSnapshotIfNeeded() { if (snapshot_needed_) { + std::shared_ptr notifier = snapshot_notifier_; SetSnapshot(); + if (notifier != nullptr) { + notifier->SnapshotCreated(GetSnapshot()); + } } } @@ -70,8 +77,8 @@ void TransactionBaseImpl::SetSavePoint() { if (save_points_ == nullptr) { save_points_.reset(new std::stack()); } - save_points_->emplace(snapshot_, snapshot_needed_, num_puts_, num_deletes_, - num_merges_); + save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_, + num_puts_, num_deletes_, num_merges_); write_batch_->SetSavePoint(); } @@ -81,6 +88,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() { TransactionBaseImpl::SavePoint& save_point = save_points_->top(); snapshot_ = save_point.snapshot_; snapshot_needed_ = save_point.snapshot_needed_; + snapshot_notifier_ = save_point.snapshot_notifier_; num_puts_ = save_point.num_puts_; num_deletes_ = save_point.num_deletes_; num_merges_ = save_point.num_merges_; diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 47294b944..90a3b59b3 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -169,11 +169,13 @@ class TransactionBaseImpl : public Transaction { } void SetSnapshot() override; - void SetSnapshotOnNextOperation() override; + void SetSnapshotOnNextOperation( + std::shared_ptr notifier = nullptr) override; void ClearSnapshot() override { snapshot_.reset(); snapshot_needed_ = false; + snapshot_notifier_ = nullptr; } void DisableIndexing() override { indexing_enabled_ = false; } @@ -228,6 +230,7 @@ class TransactionBaseImpl : public Transaction { struct SavePoint { std::shared_ptr snapshot_; bool snapshot_needed_; + std::shared_ptr snapshot_notifier_; uint64_t num_puts_; uint64_t num_deletes_; uint64_t num_merges_; @@ -236,9 +239,11 @@ class TransactionBaseImpl : public Transaction { TransactionKeyMap new_keys_; SavePoint(std::shared_ptr snapshot, bool snapshot_needed, + std::shared_ptr snapshot_notifier, uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges) : snapshot_(snapshot), snapshot_needed_(snapshot_needed), + snapshot_notifier_(snapshot_notifier), num_puts_(num_puts), num_deletes_(num_deletes), num_merges_(num_merges) {} @@ -266,6 +271,10 @@ class TransactionBaseImpl : public Transaction { // been reset. bool snapshot_needed_ = false; + // SetSnapshotOnNextOperation() has been called and the caller would like + // a notification through the TransactionNotifier interface + std::shared_ptr snapshot_notifier_ = nullptr; + Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, bool untracked = false); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 0d6647ae5..73f227ffb 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -2163,6 +2163,65 @@ TEST_F(TransactionTest, DeferSnapshotSavePointTest) { delete txn1; } +TEST_F(TransactionTest, SetSnapshotOnNextOperationWithNotification) { + WriteOptions write_options; + ReadOptions read_options; + string value; + + class Notifier : public TransactionNotifier { + private: + const Snapshot** snapshot_ptr_; + + public: + explicit Notifier(const Snapshot** snapshot_ptr) + : snapshot_ptr_(snapshot_ptr) {} + + void SnapshotCreated(const Snapshot* newSnapshot) { + *snapshot_ptr_ = newSnapshot; + } + }; + + std::shared_ptr notifier = + std::make_shared(&read_options.snapshot); + Status s; + + s = db->Put(write_options, "B", "0"); + ASSERT_OK(s); + + Transaction* txn1 = db->BeginTransaction(write_options); + + txn1->SetSnapshotOnNextOperation(notifier); + ASSERT_FALSE(read_options.snapshot); + + s = db->Put(write_options, "B", "1"); + ASSERT_OK(s); + + // A Get does not generate the snapshot + s = txn1->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_FALSE(read_options.snapshot); + ASSERT_EQ(value, "1"); + + // Any other operation does + s = txn1->Put("A", "0"); + ASSERT_OK(s); + + // Now change "B". + s = db->Put(write_options, "B", "2"); + ASSERT_OK(s); + + // The original value should still be read + s = txn1->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_TRUE(read_options.snapshot); + ASSERT_EQ(value, "1"); + + s = txn1->Commit(); + ASSERT_OK(s); + + delete txn1; +} + TEST_F(TransactionTest, ClearSnapshotTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options;