Added callback notification when a snapshot is created

Summary: When SetSnapshot() is used the caller immediately knows a snapshot has been created, but when SetSnapshotOnNextOperation() is used the caller needs a way to get notified when that snapshot has been generated.  This creates an interface that the client can implement that will be called at the time the snapshot is created.

Test Plan: Added a new SetSnapshotOnNextOperationWithNotification test into the transaction_test.

Reviewers: sdong, anthony

Reviewed By: anthony

Subscribers: yoshinorim, leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D51177
main
Jay Edgar 9 years ago
parent e8180f9901
commit b28b7c6dd9
  1. 17
      include/rocksdb/utilities/transaction.h
  2. 14
      utilities/transactions/transaction_base.cc
  3. 11
      utilities/transactions/transaction_base.h
  4. 59
      utilities/transactions/transaction_test.cc

@ -20,6 +20,17 @@ class Iterator;
class TransactionDB; class TransactionDB;
class WriteBatchWithIndex; class WriteBatchWithIndex;
// Provides notification to the caller of SetSnapshotOnNextOperation when
// the actual snapshot gets created
class TransactionNotifier {
public:
virtual ~TransactionNotifier() {}
// Implement this method to receive notification when a snapshot is
// requested via SetSnapshotOnNextOperation.
virtual void SnapshotCreated(const Snapshot* newSnapshot) = 0;
};
// Provides BEGIN/COMMIT/ROLLBACK transactions. // Provides BEGIN/COMMIT/ROLLBACK transactions.
// //
// To use transactions, you must first create either an OptimisticTransactionDB // To use transactions, you must first create either an OptimisticTransactionDB
@ -69,6 +80,9 @@ class Transaction {
// Calling SetSnapshotOnNextOperation() will not affect what snapshot is // Calling SetSnapshotOnNextOperation() will not affect what snapshot is
// returned by GetSnapshot() until the next write/GetForUpdate is executed. // returned by GetSnapshot() until the next write/GetForUpdate is executed.
// //
// When the snapshot is created the notifier's SnapshotCreated method will
// be called so that the caller can get access to the snapshot.
//
// This is an optimization to reduce the likelyhood of conflicts that // This is an optimization to reduce the likelyhood of conflicts that
// could occur in between the time SetSnapshot() is called and the first // could occur in between the time SetSnapshot() is called and the first
// write/GetForUpdate operation. Eg, this prevents the following // write/GetForUpdate operation. Eg, this prevents the following
@ -78,7 +92,8 @@ class Transaction {
// txn2->Put("A", ...); // txn2->Put("A", ...);
// txn2->Commit(); // txn2->Commit();
// txn1->GetForUpdate(opts, "A", ...); // FAIL! // txn1->GetForUpdate(opts, "A", ...); // FAIL!
virtual void SetSnapshotOnNextOperation() = 0; virtual void SetSnapshotOnNextOperation(
std::shared_ptr<TransactionNotifier> notifier = nullptr) = 0;
// Returns the Snapshot created by the last call to SetSnapshot(). // Returns the Snapshot created by the last call to SetSnapshot().
// //

@ -37,15 +37,22 @@ void TransactionBaseImpl::Clear() {
void TransactionBaseImpl::SetSnapshot() { void TransactionBaseImpl::SetSnapshot() {
snapshot_.reset(new ManagedSnapshot(db_)); snapshot_.reset(new ManagedSnapshot(db_));
snapshot_needed_ = false; snapshot_needed_ = false;
snapshot_notifier_ = nullptr;
} }
void TransactionBaseImpl::SetSnapshotOnNextOperation() { void TransactionBaseImpl::SetSnapshotOnNextOperation(
std::shared_ptr<TransactionNotifier> notifier) {
snapshot_needed_ = true; snapshot_needed_ = true;
snapshot_notifier_ = notifier;
} }
void TransactionBaseImpl::SetSnapshotIfNeeded() { void TransactionBaseImpl::SetSnapshotIfNeeded() {
if (snapshot_needed_) { if (snapshot_needed_) {
std::shared_ptr<TransactionNotifier> notifier = snapshot_notifier_;
SetSnapshot(); SetSnapshot();
if (notifier != nullptr) {
notifier->SnapshotCreated(GetSnapshot());
}
} }
} }
@ -70,8 +77,8 @@ void TransactionBaseImpl::SetSavePoint() {
if (save_points_ == nullptr) { if (save_points_ == nullptr) {
save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint>()); save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint>());
} }
save_points_->emplace(snapshot_, snapshot_needed_, num_puts_, num_deletes_, save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
num_merges_); num_puts_, num_deletes_, num_merges_);
write_batch_->SetSavePoint(); write_batch_->SetSavePoint();
} }
@ -81,6 +88,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
TransactionBaseImpl::SavePoint& save_point = save_points_->top(); TransactionBaseImpl::SavePoint& save_point = save_points_->top();
snapshot_ = save_point.snapshot_; snapshot_ = save_point.snapshot_;
snapshot_needed_ = save_point.snapshot_needed_; snapshot_needed_ = save_point.snapshot_needed_;
snapshot_notifier_ = save_point.snapshot_notifier_;
num_puts_ = save_point.num_puts_; num_puts_ = save_point.num_puts_;
num_deletes_ = save_point.num_deletes_; num_deletes_ = save_point.num_deletes_;
num_merges_ = save_point.num_merges_; num_merges_ = save_point.num_merges_;

@ -169,11 +169,13 @@ class TransactionBaseImpl : public Transaction {
} }
void SetSnapshot() override; void SetSnapshot() override;
void SetSnapshotOnNextOperation() override; void SetSnapshotOnNextOperation(
std::shared_ptr<TransactionNotifier> notifier = nullptr) override;
void ClearSnapshot() override { void ClearSnapshot() override {
snapshot_.reset(); snapshot_.reset();
snapshot_needed_ = false; snapshot_needed_ = false;
snapshot_notifier_ = nullptr;
} }
void DisableIndexing() override { indexing_enabled_ = false; } void DisableIndexing() override { indexing_enabled_ = false; }
@ -228,6 +230,7 @@ class TransactionBaseImpl : public Transaction {
struct SavePoint { struct SavePoint {
std::shared_ptr<ManagedSnapshot> snapshot_; std::shared_ptr<ManagedSnapshot> snapshot_;
bool snapshot_needed_; bool snapshot_needed_;
std::shared_ptr<TransactionNotifier> snapshot_notifier_;
uint64_t num_puts_; uint64_t num_puts_;
uint64_t num_deletes_; uint64_t num_deletes_;
uint64_t num_merges_; uint64_t num_merges_;
@ -236,9 +239,11 @@ class TransactionBaseImpl : public Transaction {
TransactionKeyMap new_keys_; TransactionKeyMap new_keys_;
SavePoint(std::shared_ptr<ManagedSnapshot> snapshot, bool snapshot_needed, SavePoint(std::shared_ptr<ManagedSnapshot> snapshot, bool snapshot_needed,
std::shared_ptr<TransactionNotifier> snapshot_notifier,
uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges) uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges)
: snapshot_(snapshot), : snapshot_(snapshot),
snapshot_needed_(snapshot_needed), snapshot_needed_(snapshot_needed),
snapshot_notifier_(snapshot_notifier),
num_puts_(num_puts), num_puts_(num_puts),
num_deletes_(num_deletes), num_deletes_(num_deletes),
num_merges_(num_merges) {} num_merges_(num_merges) {}
@ -266,6 +271,10 @@ class TransactionBaseImpl : public Transaction {
// been reset. // been reset.
bool snapshot_needed_ = false; bool snapshot_needed_ = false;
// SetSnapshotOnNextOperation() has been called and the caller would like
// a notification through the TransactionNotifier interface
std::shared_ptr<TransactionNotifier> snapshot_notifier_ = nullptr;
Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
bool untracked = false); bool untracked = false);

@ -2163,6 +2163,65 @@ TEST_F(TransactionTest, DeferSnapshotSavePointTest) {
delete txn1; delete txn1;
} }
TEST_F(TransactionTest, SetSnapshotOnNextOperationWithNotification) {
WriteOptions write_options;
ReadOptions read_options;
string value;
class Notifier : public TransactionNotifier {
private:
const Snapshot** snapshot_ptr_;
public:
explicit Notifier(const Snapshot** snapshot_ptr)
: snapshot_ptr_(snapshot_ptr) {}
void SnapshotCreated(const Snapshot* newSnapshot) {
*snapshot_ptr_ = newSnapshot;
}
};
std::shared_ptr<Notifier> notifier =
std::make_shared<Notifier>(&read_options.snapshot);
Status s;
s = db->Put(write_options, "B", "0");
ASSERT_OK(s);
Transaction* txn1 = db->BeginTransaction(write_options);
txn1->SetSnapshotOnNextOperation(notifier);
ASSERT_FALSE(read_options.snapshot);
s = db->Put(write_options, "B", "1");
ASSERT_OK(s);
// A Get does not generate the snapshot
s = txn1->Get(read_options, "B", &value);
ASSERT_OK(s);
ASSERT_FALSE(read_options.snapshot);
ASSERT_EQ(value, "1");
// Any other operation does
s = txn1->Put("A", "0");
ASSERT_OK(s);
// Now change "B".
s = db->Put(write_options, "B", "2");
ASSERT_OK(s);
// The original value should still be read
s = txn1->Get(read_options, "B", &value);
ASSERT_OK(s);
ASSERT_TRUE(read_options.snapshot);
ASSERT_EQ(value, "1");
s = txn1->Commit();
ASSERT_OK(s);
delete txn1;
}
TEST_F(TransactionTest, ClearSnapshotTest) { TEST_F(TransactionTest, ClearSnapshotTest) {
WriteOptions write_options; WriteOptions write_options;
ReadOptions read_options, snapshot_read_options; ReadOptions read_options, snapshot_read_options;

Loading…
Cancel
Save