diff --git a/db/db_impl.cc b/db/db_impl.cc index 088860eaf..b4f6104a9 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1644,8 +1644,8 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { uint64_t oldest_snapshot; if (snapshots_.empty()) { oldest_snapshot = concurrent_prepare_ && seq_per_batch_ - ? versions_->LastToBeWrittenSequence() - : versions_->LastSequence(); + ? versions_->LastToBeWrittenSequence() + : versions_->LastSequence(); } else { oldest_snapshot = snapshots_.oldest()->number_; } diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index be7487a83..7b188ffaa 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -145,6 +145,7 @@ class PessimisticTransaction : public TransactionBaseImpl { uint64_t expiration_time_; private: + friend class TransactionTest_ValidateSnapshotTest_Test; // Used to create unique ids for transactions. static std::atomic txn_id_counter_; @@ -179,8 +180,9 @@ class PessimisticTransaction : public TransactionBaseImpl { // Whether to perform deadlock detection or not. 
int64_t deadlock_detect_depth_; - Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, - SequenceNumber prev_seqno, SequenceNumber* new_seqno); + virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family, + const Slice& key, SequenceNumber prev_seqno, + SequenceNumber* new_seqno); void UnlockGetForUpdate(ColumnFamilyHandle* column_family, const Slice& key) override; @@ -208,9 +210,6 @@ class WriteCommittedTxn : public PessimisticTransaction { Status RollbackInternal() override; - Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, - SequenceNumber prev_seqno, SequenceNumber* new_seqno); - // No copying allowed WriteCommittedTxn(const WriteCommittedTxn&); void operator=(const WriteCommittedTxn&); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 9c7d08641..80aabb943 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -110,6 +110,47 @@ TEST_P(TransactionTest, SuccessTest) { delete txn; } +// This test clarifies the contract of ValidateSnapshot +TEST_P(TransactionTest, ValidateSnapshotTest) { + for (bool with_2pc : {true, false}) { + ReOpen(); + WriteOptions write_options; + ReadOptions read_options; + string value; + Status s; + + Transaction* txn1 = + db->BeginTransaction(write_options, TransactionOptions()); + ASSERT_TRUE(txn1); + s = txn1->Put(Slice("foo"), Slice("bar1")); + ASSERT_OK(s); + if (with_2pc) { + s = txn1->SetName("xid1"); + ASSERT_OK(s); + s = txn1->Prepare(); + ASSERT_OK(s); + } + + Transaction* txn2 = + db->BeginTransaction(write_options, TransactionOptions()); + ASSERT_TRUE(txn2); + txn2->SetSnapshot(); + + s = txn1->Commit(); + ASSERT_OK(s); + delete txn1; + + SequenceNumber dont_care; + auto pes_txn2 = dynamic_cast<PessimisticTransaction*>(txn2); + // Test the simple case where the key is not tracked yet + auto tracked_seq = kMaxSequenceNumber; + s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), 
"foo", + tracked_seq, &dont_care); + ASSERT_TRUE(s.IsBusy()); + delete txn2; + } +} + TEST_P(TransactionTest, WaitingTxn) { WriteOptions write_options; ReadOptions read_options; diff --git a/utilities/transactions/transaction_util.cc b/utilities/transactions/transaction_util.cc index ad03a9432..545b92f7f 100644 --- a/utilities/transactions/transaction_util.cc +++ b/utilities/transactions/transaction_util.cc @@ -22,11 +22,9 @@ namespace rocksdb { -Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl, - ColumnFamilyHandle* column_family, - const std::string& key, - SequenceNumber key_seq, - bool cache_only) { +Status TransactionUtil::CheckKeyForConflicts( + DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key, + SequenceNumber snap_seq, bool cache_only, ReadCallback* snap_checker) { Status result; auto cfh = reinterpret_cast(column_family); @@ -42,7 +40,8 @@ Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl, SequenceNumber earliest_seq = db_impl->GetEarliestMemTableSequenceNumber(sv, true); - result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only); + result = CheckKey(db_impl, sv, earliest_seq, snap_seq, key, cache_only, + snap_checker); db_impl->ReturnAndCleanupSuperVersion(cfd, sv); } @@ -52,8 +51,9 @@ Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl, Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, SequenceNumber earliest_seq, - SequenceNumber key_seq, const std::string& key, - bool cache_only) { + SequenceNumber snap_seq, + const std::string& key, bool cache_only, + ReadCallback* snap_checker) { Status result; bool need_to_read_sst = false; @@ -73,9 +73,9 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, result = Status::TryAgain( "Transaction ould not check for conflicts as the MemTable does not " "countain a long enough history to check write at SequenceNumber: ", - ToString(key_seq)); + ToString(snap_seq)); } - } else if (key_seq < earliest_seq) { + } 
else if (snap_seq < earliest_seq) { need_to_read_sst = true; if (cache_only) { @@ -91,7 +91,7 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, "max_write_buffer_number_to_maintain option could reduce the " "frequency " "of this error.", - key_seq, earliest_seq); + snap_seq, earliest_seq); result = Status::TryAgain(msg); } } @@ -105,9 +105,13 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { result = s; - } else if (found_record_for_key && (seq > key_seq)) { - // Write Conflict - result = Status::Busy(); + } else if (found_record_for_key) { + bool write_conflict = snap_checker == nullptr + ? snap_seq < seq + : !snap_checker->IsCommitted(seq); + if (write_conflict) { + result = Status::Busy(); + } } } diff --git a/utilities/transactions/transaction_util.h b/utilities/transactions/transaction_util.h index 5c6b8fa49..7377874e6 100644 --- a/utilities/transactions/transaction_util.h +++ b/utilities/transactions/transaction_util.h @@ -10,6 +10,8 @@ #include #include +#include "db/read_callback.h" + #include "rocksdb/db.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" @@ -40,7 +42,7 @@ class WriteBatchWithIndex; class TransactionUtil { public: - // Verifies there have been no writes to this key in the db since this + // Verifies there have been no commits to this key in the db since this // sequence number. 
// // If cache_only is true, then this function will not attempt to read any @@ -52,7 +54,8 @@ class TransactionUtil { static Status CheckKeyForConflicts(DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key, - SequenceNumber key_seq, bool cache_only); + SequenceNumber snap_seq, bool cache_only, + ReadCallback* snap_checker = nullptr); // For each key,SequenceNumber pair in the TransactionKeyMap, this function // will verify there have been no writes to the key in the db since that @@ -69,8 +72,9 @@ class TransactionUtil { private: static Status CheckKey(DBImpl* db_impl, SuperVersion* sv, - SequenceNumber earliest_seq, SequenceNumber key_seq, - const std::string& key, bool cache_only); + SequenceNumber earliest_seq, SequenceNumber snap_seq, + const std::string& key, bool cache_only, + ReadCallback* snap_checker = nullptr); }; } // namespace rocksdb diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 8ececbcb1..015cd58c6 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -240,6 +240,34 @@ Status WritePreparedTxn::RollbackInternal() { return s; } +Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, + const Slice& key, + SequenceNumber prev_seqno, + SequenceNumber* new_seqno) { + assert(snapshot_); + + SequenceNumber snap_seq = snapshot_->GetSequenceNumber(); + // prev_seqno is either max or the last snapshot with which this key was + // tracked so there is no need to apply the IsInSnapshot to this comparison + // here as prev_seqno is not a prepare seq. + if (prev_seqno <= snap_seq) { + // If the key has been previously validated at a sequence number earlier + // than the current snapshot's sequence number, we already know it has not + // been modified. + return Status::OK(); + } + + *new_seqno = snap_seq; + + ColumnFamilyHandle* cfh = + column_family ? 
column_family : db_impl_->DefaultColumnFamily(); + + WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq); + return TransactionUtil::CheckKeyForConflicts( + db_impl_, cfh, key.ToString(), snap_seq, + false /* cache_only */, &snap_checker); +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 4dc6945c0..f9ae1300f 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -82,12 +82,9 @@ class WritePreparedTxn : public PessimisticTransaction { Status RollbackInternal() override; - // TODO(myabandeh): verify that the current impl work with values being - // written with prepare sequence number too. - // Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& - // key, - // SequenceNumber prev_seqno, SequenceNumber* - // new_seqno); + Status ValidateSnapshot(ColumnFamilyHandle* column_family, + const Slice& key, SequenceNumber prev_seqno, + SequenceNumber* new_seqno) override; // No copying allowed WritePreparedTxn(const WritePreparedTxn&);