Deferred snapshot creation in transactions

Summary: Support for Transaction::CreateSnapshotOnNextOperation().  This is to fix a write-conflict race-condition that Yoshinori was running into when testing MyRocks with LinkBench.

Test Plan: New tests

Reviewers: yhchiang, spetrunia, rven, igor, yoshinorim, sdong

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D48099
main
agiardullo 9 years ago
parent c5f3707d42
commit def74f8763
  1. 22
      include/rocksdb/utilities/transaction.h
  2. 2
      utilities/transactions/optimistic_transaction_impl.cc
  3. 15
      utilities/transactions/transaction_base.cc
  4. 14
      utilities/transactions/transaction_base.h
  5. 110
      utilities/transactions/transaction_impl.cc
  6. 3
      utilities/transactions/transaction_impl.h
  7. 216
      utilities/transactions/transaction_test.cc

@ -61,10 +61,30 @@ class Transaction {
// methods. See Transaction::Get() for more details. // methods. See Transaction::Get() for more details.
virtual void SetSnapshot() = 0; virtual void SetSnapshot() = 0;
// Similar to SetSnapshot(), but will not change the current snapshot
// until Put/Merge/Delete/GetForUpdate/MultigetForUpdate is called.
// By calling this function, the transaction will essentially call
// SetSnapshot() for you right before performing the next write/GetForUpdate.
//
// Calling SetSnapshotOnNextOperation() will not affect what snapshot is
// returned by GetSnapshot() until the next write/GetForUpdate is executed.
//
// This is an optimization to reduce the likelyhood of conflicts that
// could occur in between the time SetSnapshot() is called and the first
// write/GetForUpdate operation. Eg, this prevents the following
// race-condition:
//
// txn1->SetSnapshot();
// txn2->Put("A", ...);
// txn2->Commit();
// txn1->GetForUpdate(opts, "A", ...); // FAIL!
virtual void SetSnapshotOnNextOperation() = 0;
// Returns the Snapshot created by the last call to SetSnapshot(). // Returns the Snapshot created by the last call to SetSnapshot().
// //
// REQUIRED: The returned Snapshot is only valid up until the next time // REQUIRED: The returned Snapshot is only valid up until the next time
// SetSnapshot() is called or the Transaction is deleted. // SetSnapshot()/SetSnapshotOnNextSavePoint() is called or the Transaction
// is deleted.
virtual const Snapshot* GetSnapshot() const = 0; virtual const Snapshot* GetSnapshot() const = 0;
// Write all batched keys to the db atomically. // Write all batched keys to the db atomically.

@ -73,6 +73,8 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
} }
uint32_t cfh_id = GetColumnFamilyID(column_family); uint32_t cfh_id = GetColumnFamilyID(column_family);
SetSnapshotIfNeeded();
SequenceNumber seq; SequenceNumber seq;
if (snapshot_) { if (snapshot_) {
seq = snapshot_->snapshot()->GetSequenceNumber(); seq = snapshot_->snapshot()->GetSequenceNumber();

@ -36,6 +36,17 @@ void TransactionBaseImpl::Clear() {
void TransactionBaseImpl::SetSnapshot() { void TransactionBaseImpl::SetSnapshot() {
snapshot_.reset(new ManagedSnapshot(db_)); snapshot_.reset(new ManagedSnapshot(db_));
snapshot_needed_ = false;
}
void TransactionBaseImpl::SetSnapshotOnNextOperation() {
snapshot_needed_ = true;
}
void TransactionBaseImpl::SetSnapshotIfNeeded() {
if (snapshot_needed_) {
SetSnapshot();
}
} }
Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
@ -59,7 +70,8 @@ void TransactionBaseImpl::SetSavePoint() {
if (save_points_ == nullptr) { if (save_points_ == nullptr) {
save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint>()); save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint>());
} }
save_points_->emplace(snapshot_, num_puts_, num_deletes_, num_merges_); save_points_->emplace(snapshot_, snapshot_needed_, num_puts_, num_deletes_,
num_merges_);
write_batch_->SetSavePoint(); write_batch_->SetSavePoint();
} }
@ -68,6 +80,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
// Restore saved SavePoint // Restore saved SavePoint
TransactionBaseImpl::SavePoint& save_point = save_points_->top(); TransactionBaseImpl::SavePoint& save_point = save_points_->top();
snapshot_ = save_point.snapshot_; snapshot_ = save_point.snapshot_;
snapshot_needed_ = save_point.snapshot_needed_;
num_puts_ = save_point.num_puts_; num_puts_ = save_point.num_puts_;
num_deletes_ = save_point.num_deletes_; num_deletes_ = save_point.num_deletes_;
num_merges_ = save_point.num_merges_; num_merges_ = save_point.num_merges_;

@ -169,6 +169,7 @@ class TransactionBaseImpl : public Transaction {
} }
void SetSnapshot() override; void SetSnapshot() override;
void SetSnapshotOnNextOperation() override;
void DisableIndexing() override { indexing_enabled_ = false; } void DisableIndexing() override { indexing_enabled_ = false; }
@ -195,6 +196,9 @@ class TransactionBaseImpl : public Transaction {
const TransactionKeyMap* GetTrackedKeysSinceSavePoint(); const TransactionKeyMap* GetTrackedKeysSinceSavePoint();
// Sets a snapshot if SetSnapshotOnNextOperation() has been called.
void SetSnapshotIfNeeded();
DB* const db_; DB* const db_;
const WriteOptions write_options_; const WriteOptions write_options_;
@ -218,6 +222,7 @@ class TransactionBaseImpl : public Transaction {
struct SavePoint { struct SavePoint {
std::shared_ptr<ManagedSnapshot> snapshot_; std::shared_ptr<ManagedSnapshot> snapshot_;
bool snapshot_needed_;
uint64_t num_puts_; uint64_t num_puts_;
uint64_t num_deletes_; uint64_t num_deletes_;
uint64_t num_merges_; uint64_t num_merges_;
@ -225,9 +230,10 @@ class TransactionBaseImpl : public Transaction {
// Record all keys tracked since the last savepoint // Record all keys tracked since the last savepoint
TransactionKeyMap new_keys_; TransactionKeyMap new_keys_;
SavePoint(std::shared_ptr<ManagedSnapshot> snapshot, uint64_t num_puts, SavePoint(std::shared_ptr<ManagedSnapshot> snapshot, bool snapshot_needed,
uint64_t num_deletes, uint64_t num_merges) uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges)
: snapshot_(snapshot), : snapshot_(snapshot),
snapshot_needed_(snapshot_needed),
num_puts_(num_puts), num_puts_(num_puts),
num_deletes_(num_deletes), num_deletes_(num_deletes),
num_merges_(num_merges) {} num_merges_(num_merges) {}
@ -251,6 +257,10 @@ class TransactionBaseImpl : public Transaction {
// underlying WriteBatch and not indexed in the WriteBatchWithIndex. // underlying WriteBatch and not indexed in the WriteBatchWithIndex.
bool indexing_enabled_ = true; bool indexing_enabled_ = true;
// SetSnapshotOnNextOperation() has been called and the snapshot has not yet
// been reset.
bool snapshot_needed_ = false;
Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
bool untracked = false); bool untracked = false);

@ -220,16 +220,10 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
bool previously_locked; bool previously_locked;
Status s; Status s;
// Even though we do not care about doing conflict checking for this write, // lock this key if this transactions hasn't already locked it
// we still need to take a lock to make sure we do not cause a conflict with SequenceNumber current_seqno = kMaxSequenceNumber;
// some other write. However, we do not need to check if there have been SequenceNumber new_seqno = kMaxSequenceNumber;
// any writes since this transaction's snapshot.
// TODO(agiardullo): could optimize by supporting shared txn locks in the
// future
bool check_snapshot = !untracked;
SequenceNumber tracked_seqno = kMaxSequenceNumber;
// Lookup whether this key has already been locked by this transaction
const auto& tracked_keys = GetTrackedKeys(); const auto& tracked_keys = GetTrackedKeys();
const auto tracked_keys_cf = tracked_keys.find(cfh_id); const auto tracked_keys_cf = tracked_keys.find(cfh_id);
if (tracked_keys_cf == tracked_keys.end()) { if (tracked_keys_cf == tracked_keys.end()) {
@ -240,7 +234,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
previously_locked = false; previously_locked = false;
} else { } else {
previously_locked = true; previously_locked = true;
tracked_seqno = iter->second; current_seqno = iter->second;
} }
} }
@ -249,39 +243,37 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
s = txn_db_impl_->TryLock(this, cfh_id, key_str); s = txn_db_impl_->TryLock(this, cfh_id, key_str);
} }
if (s.ok()) { SetSnapshotIfNeeded();
// Even though we do not care about doing conflict checking for this write,
// we still need to take a lock to make sure we do not cause a conflict with
// some other write. However, we do not need to check if there have been
// any writes since this transaction's snapshot.
// TODO(agiardullo): could optimize by supporting shared txn locks in the
// future
if (untracked || snapshot_ == nullptr) {
// Need to remember the earliest sequence number that we know that this
// key has not been modified after. This is useful if this same
// transaction
// later tries to lock this key again.
if (current_seqno == kMaxSequenceNumber) {
// Since we haven't checked a snapshot, we only know this key has not
// been modified since after we locked it.
new_seqno = db_->GetLatestSequenceNumber();
} else {
new_seqno = current_seqno;
}
} else {
// If a snapshot is set, we need to make sure the key hasn't been modified // If a snapshot is set, we need to make sure the key hasn't been modified
// since the snapshot. This must be done after we locked the key. // since the snapshot. This must be done after we locked the key.
if (!check_snapshot || snapshot_ == nullptr) { if (s.ok()) {
// Need to remember the earliest sequence number that we know that this s = ValidateSnapshot(column_family, key, current_seqno, &new_seqno);
// key has not been modified after. This is useful if this same
// transaction if (!s.ok()) {
// later tries to lock this key again. // Failed to validate key
if (tracked_seqno == kMaxSequenceNumber) { if (!previously_locked) {
// Since we haven't checked a snapshot, we only know this key has not // Unlock key we just locked
// been modified since after we locked it. txn_db_impl_->UnLock(this, cfh_id, key.ToString());
tracked_seqno = db_->GetLatestSequenceNumber();
}
} else {
// If the key has been previous validated at a sequence number earlier
// than the curent snapshot's sequence number, we already know it has not
// been modified.
SequenceNumber seq = snapshot_->snapshot()->GetSequenceNumber();
bool already_validated = tracked_seqno <= seq;
if (!already_validated) {
s = CheckKeySequence(column_family, key);
if (s.ok()) {
// Record that there have been no writes to this key after this
// sequence.
tracked_seqno = seq;
} else {
// Failed to validate key
if (!previously_locked) {
// Unlock key we just locked
txn_db_impl_->UnLock(this, cfh_id, key.ToString());
}
} }
} }
} }
@ -289,7 +281,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
if (s.ok()) { if (s.ok()) {
// Let base class know we've conflict checked this key. // Let base class know we've conflict checked this key.
TrackKey(cfh_id, key_str, tracked_seqno); TrackKey(cfh_id, key_str, new_seqno);
} }
return s; return s;
@ -297,22 +289,30 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
// Return OK() if this key has not been modified more recently than the // Return OK() if this key has not been modified more recently than the
// transaction snapshot_. // transaction snapshot_.
Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family, Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family,
const Slice& key) { const Slice& key,
Status result; SequenceNumber prev_seqno,
if (snapshot_ != nullptr) { SequenceNumber* new_seqno) {
assert(dynamic_cast<DBImpl*>(db_) != nullptr); assert(snapshot_);
auto db_impl = reinterpret_cast<DBImpl*>(db_);
SequenceNumber seq = snapshot_->snapshot()->GetSequenceNumber();
if (prev_seqno <= seq) {
// If the key has been previous validated at a sequence number earlier
// than the curent snapshot's sequence number, we already know it has not
// been modified.
return Status::OK();
}
ColumnFamilyHandle* cfh = column_family ? column_family : *new_seqno = seq;
db_impl->DefaultColumnFamily();
result = TransactionUtil::CheckKeyForConflicts( assert(dynamic_cast<DBImpl*>(db_) != nullptr);
db_impl, cfh, key.ToString(), auto db_impl = reinterpret_cast<DBImpl*>(db_);
snapshot_->snapshot()->GetSequenceNumber());
} ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl->DefaultColumnFamily();
return result; return TransactionUtil::CheckKeyForConflicts(
db_impl, cfh, key.ToString(), snapshot_->snapshot()->GetSequenceNumber());
} }
} // namespace rocksdb } // namespace rocksdb

@ -88,7 +88,8 @@ class TransactionImpl : public TransactionBaseImpl {
void Clear() override; void Clear() override;
Status CheckKeySequence(ColumnFamilyHandle* column_family, const Slice& key); Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key,
SequenceNumber prev_seqno, SequenceNumber* new_seqno);
Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock);

@ -1945,6 +1945,222 @@ TEST_F(TransactionTest, MergeTest) {
ASSERT_EQ("a,3", value); ASSERT_EQ("a,3", value);
} }
TEST_F(TransactionTest, DeferSnapshotTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
Status s;
s = db->Put(write_options, "A", "a0");
ASSERT_OK(s);
Transaction* txn1 = db->BeginTransaction(write_options);
Transaction* txn2 = db->BeginTransaction(write_options);
txn1->SetSnapshotOnNextOperation();
auto snapshot = txn1->GetSnapshot();
ASSERT_FALSE(snapshot);
s = txn2->Put("A", "a2");
ASSERT_OK(s);
s = txn2->Commit();
ASSERT_OK(s);
delete txn2;
s = txn1->GetForUpdate(read_options, "A", &value);
// Should not conflict with txn2 since snapshot wasn't set until
// GetForUpdate was called.
ASSERT_OK(s);
ASSERT_EQ("a2", value);
s = txn1->Put("A", "a1");
ASSERT_OK(s);
s = db->Put(write_options, "B", "b0");
ASSERT_OK(s);
// Cannot lock B since it was written after the snapshot was set
s = txn1->Put("B", "b1");
ASSERT_TRUE(s.IsBusy());
s = txn1->Commit();
ASSERT_OK(s);
delete txn1;
s = db->Get(read_options, "A", &value);
ASSERT_OK(s);
ASSERT_EQ("a1", value);
s = db->Get(read_options, "B", &value);
ASSERT_OK(s);
ASSERT_EQ("b0", value);
}
TEST_F(TransactionTest, DeferSnapshotTest2) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
Status s;
Transaction* txn1 = db->BeginTransaction(write_options);
txn1->SetSnapshot();
s = txn1->Put("A", "a1");
ASSERT_OK(s);
s = db->Put(write_options, "C", "c0");
ASSERT_OK(s);
s = db->Put(write_options, "D", "d0");
ASSERT_OK(s);
snapshot_read_options.snapshot = txn1->GetSnapshot();
txn1->SetSnapshotOnNextOperation();
s = txn1->Get(snapshot_read_options, "C", &value);
// Snapshot was set before C was written
ASSERT_TRUE(s.IsNotFound());
s = txn1->Get(snapshot_read_options, "D", &value);
// Snapshot was set before D was written
ASSERT_TRUE(s.IsNotFound());
// Snapshot should not have changed yet.
snapshot_read_options.snapshot = txn1->GetSnapshot();
s = txn1->Get(snapshot_read_options, "C", &value);
// Snapshot was set before C was written
ASSERT_TRUE(s.IsNotFound());
s = txn1->Get(snapshot_read_options, "D", &value);
// Snapshot was set before D was written
ASSERT_TRUE(s.IsNotFound());
s = txn1->GetForUpdate(read_options, "C", &value);
ASSERT_OK(s);
ASSERT_EQ("c0", value);
s = db->Put(write_options, "D", "d00");
ASSERT_OK(s);
// Snapshot is now set
snapshot_read_options.snapshot = txn1->GetSnapshot();
s = txn1->Get(snapshot_read_options, "D", &value);
ASSERT_OK(s);
ASSERT_EQ("d0", value);
s = txn1->Commit();
ASSERT_OK(s);
delete txn1;
}
TEST_F(TransactionTest, DeferSnapshotSavePointTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
Status s;
Transaction* txn1 = db->BeginTransaction(write_options);
txn1->SetSavePoint(); // 1
s = db->Put(write_options, "T", "1");
ASSERT_OK(s);
txn1->SetSnapshotOnNextOperation();
s = db->Put(write_options, "T", "2");
ASSERT_OK(s);
txn1->SetSavePoint(); // 2
s = db->Put(write_options, "T", "3");
ASSERT_OK(s);
s = txn1->Put("A", "a");
ASSERT_OK(s);
txn1->SetSavePoint(); // 3
s = db->Put(write_options, "T", "4");
ASSERT_OK(s);
txn1->SetSnapshot();
txn1->SetSnapshotOnNextOperation();
txn1->SetSavePoint(); // 4
s = db->Put(write_options, "T", "5");
ASSERT_OK(s);
snapshot_read_options.snapshot = txn1->GetSnapshot();
s = txn1->Get(snapshot_read_options, "T", &value);
ASSERT_OK(s);
ASSERT_EQ("4", value);
s = txn1->Put("A", "a1");
ASSERT_OK(s);
snapshot_read_options.snapshot = txn1->GetSnapshot();
s = txn1->Get(snapshot_read_options, "T", &value);
ASSERT_OK(s);
ASSERT_EQ("5", value);
s = txn1->RollbackToSavePoint(); // Rollback to 4
ASSERT_OK(s);
snapshot_read_options.snapshot = txn1->GetSnapshot();
s = txn1->Get(snapshot_read_options, "T", &value);
ASSERT_OK(s);
ASSERT_EQ("4", value);
s = txn1->RollbackToSavePoint(); // Rollback to 3
ASSERT_OK(s);
snapshot_read_options.snapshot = txn1->GetSnapshot();
s = txn1->Get(snapshot_read_options, "T", &value);
ASSERT_OK(s);
ASSERT_EQ("3", value);
s = txn1->Get(read_options, "T", &value);
ASSERT_OK(s);
ASSERT_EQ("5", value);
s = txn1->RollbackToSavePoint(); // Rollback to 2
ASSERT_OK(s);
snapshot_read_options.snapshot = txn1->GetSnapshot();
ASSERT_FALSE(snapshot_read_options.snapshot);
s = txn1->Get(snapshot_read_options, "T", &value);
ASSERT_OK(s);
ASSERT_EQ("5", value);
s = txn1->Delete("A");
ASSERT_OK(s);
snapshot_read_options.snapshot = txn1->GetSnapshot();
ASSERT_TRUE(snapshot_read_options.snapshot);
s = txn1->Get(snapshot_read_options, "T", &value);
ASSERT_OK(s);
ASSERT_EQ("5", value);
s = txn1->RollbackToSavePoint(); // Rollback to 1
ASSERT_OK(s);
s = txn1->Delete("A");
ASSERT_OK(s);
snapshot_read_options.snapshot = txn1->GetSnapshot();
ASSERT_FALSE(snapshot_read_options.snapshot);
s = txn1->Get(snapshot_read_options, "T", &value);
ASSERT_OK(s);
ASSERT_EQ("5", value);
s = txn1->Commit();
ASSERT_OK(s);
delete txn1;
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save