diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index a8d01e034..b6b78ef99 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -324,6 +324,8 @@ enum Tickers : uint32_t { TXN_DUPLICATE_KEY_OVERHEAD, // # of times snapshot_mutex_ is acquired in the fast path. TXN_SNAPSHOT_MUTEX_OVERHEAD, + // # of times ::Get returned TryAgain due to expired snapshot seq + TXN_GET_TRY_AGAIN, // Number of keys actually found in MultiGet calls (vs number requested by // caller) diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 667af809b..e9dc3fb82 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -4620,6 +4620,8 @@ class TickerTypeJni { return -0x0B; case rocksdb::Tickers::TXN_SNAPSHOT_MUTEX_OVERHEAD: return -0x0C; + case rocksdb::Tickers::TXN_GET_TRY_AGAIN: + return -0x0D; case rocksdb::Tickers::TICKER_ENUM_MAX: // 0x5F for backwards compatibility on current minor version. return 0x5F; @@ -4912,6 +4914,8 @@ class TickerTypeJni { return rocksdb::Tickers::TXN_DUPLICATE_KEY_OVERHEAD; case -0x0C: return rocksdb::Tickers::TXN_SNAPSHOT_MUTEX_OVERHEAD; + case -0x0D: + return rocksdb::Tickers::TXN_GET_TRY_AGAIN; case 0x5F: // 0x5F for backwards compatibility on current minor version. return rocksdb::Tickers::TICKER_ENUM_MAX; diff --git a/java/src/main/java/org/rocksdb/TickerType.java b/java/src/main/java/org/rocksdb/TickerType.java index 551e366dc..40a642bd6 100644 --- a/java/src/main/java/org/rocksdb/TickerType.java +++ b/java/src/main/java/org/rocksdb/TickerType.java @@ -717,6 +717,11 @@ public enum TickerType { */ TXN_SNAPSHOT_MUTEX_OVERHEAD((byte) -0x0C), + /** + * # of times ::Get returned TryAgain due to expired snapshot seq + */ + TXN_GET_TRY_AGAIN((byte) -0x0D), + TICKER_ENUM_MAX((byte) 0x5F); private final byte value; diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index 15d702d1f..70c993b20 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -162,6 +162,7 @@ const std::vector> TickersNameMap = { "rocksdb.txn.overhead.mutex.old.commit.map"}, {TXN_DUPLICATE_KEY_OVERHEAD, "rocksdb.txn.overhead.duplicate.key"}, {TXN_SNAPSHOT_MUTEX_OVERHEAD, "rocksdb.txn.overhead.mutex.snapshot"}, + {TXN_GET_TRY_AGAIN, "rocksdb.txn.get.tryagain"}, {NUMBER_MULTIGET_KEYS_FOUND, "rocksdb.number.multiget.keys.found"}, {NO_ITERATOR_CREATED, "rocksdb.num.iterator.created"}, {NO_ITERATOR_DELETED, "rocksdb.num.iterator.deleted"}, diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index ef89aaeb8..2cb91f0d3 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1372,7 +1372,7 @@ TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) { for (int i = 0; i < writes; i++) { WriteBatch batch; // For duplicate keys cause 4 commit entries, each evicting an entry that - // is not published yet, thus causing max ecited seq go higher than last + // is not published yet, thus causing max evicted seq go higher than last // published. for (int b = 0; b < batch_cnt; b++) { batch.Put("foo", "foo"); @@ -1404,6 +1404,64 @@ TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) { db->ReleaseSnapshot(snap); } +// Test that reads without snapshots would not hit an undefined state +TEST_P(WritePreparedTransactionTest, MaxCatchupWithUnbackedSnapshot) { + const size_t snapshot_cache_bits = 7; // same as default + const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + ReOpen(); + WriteOptions woptions; + WritePreparedTxnDB* wp_db = dynamic_cast(db); + + const int writes = 50; + rocksdb::port::Thread t1([&]() { + for (int i = 0; i < writes; i++) { + WriteBatch batch; + batch.Put("key", "foo"); + db->Write(woptions, &batch); + } + }); + + rocksdb::port::Thread t2([&]() { + while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread + std::this_thread::yield(); + } + ReadOptions ropt; + PinnableSlice pinnable_val; + TransactionOptions txn_options; + for (int i = 0; i < 10; i++) { + auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); + ASSERT_TRUE(s.ok() || s.IsTryAgain()); + pinnable_val.Reset(); + Transaction* txn = db->BeginTransaction(woptions, txn_options); + s = txn->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); + ASSERT_TRUE(s.ok() || s.IsTryAgain()); + pinnable_val.Reset(); + std::vector values; + auto s_vec = + txn->MultiGet(ropt, {db->DefaultColumnFamily()}, {"key"}, &values); + ASSERT_EQ(1, values.size()); + ASSERT_EQ(1, s_vec.size()); + s = s_vec[0]; + ASSERT_TRUE(s.ok() || s.IsTryAgain()); + Slice key("key"); + txn->MultiGet(ropt, db->DefaultColumnFamily(), 1, &key, &pinnable_val, + &s, true); + ASSERT_TRUE(s.ok() || s.IsTryAgain()); + delete txn; + } + }); + + t1.join(); + t2.join(); + + // Make sure that the test has worked and seq number has advanced as we + // thought + auto snap = db->GetSnapshot(); + ASSERT_GT(snap->GetSequenceNumber(), writes - 1); + db->ReleaseSnapshot(snap); +} + // Check that old_commit_map_ cleanup works correctly if the snapshot equals // max_evicted_seq_. TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 97bebac5d..188f61120 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -46,13 +46,16 @@ void WritePreparedTxn::MultiGet(const ReadOptions& options, PinnableSlice* values, Status* statuses, bool sorted_input) { SequenceNumber min_uncommitted, snap_seq; - const bool backed_by_snapshot = + const SnapshotBackup backed_by_snapshot = wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); - WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted); + WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted, + backed_by_snapshot); write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys, keys, values, statuses, sorted_input, &callback); - if (UNLIKELY(!wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { + if (UNLIKELY(!callback.valid() || + !wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { + wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN); for (size_t i = 0; i < num_keys; i++) { statuses[i] = Status::TryAgain(); } @@ -63,15 +66,18 @@ Status WritePreparedTxn::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val) { SequenceNumber min_uncommitted, snap_seq; - const bool backed_by_snapshot = + const SnapshotBackup backed_by_snapshot = wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); - WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted); + WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted, + backed_by_snapshot); auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key, pinnable_val, &callback); - if (LIKELY(wpt_db_->ValidateSnapshot(callback.max_visible_seq(), + if (LIKELY(callback.valid() && + wpt_db_->ValidateSnapshot(callback.max_visible_seq(), backed_by_snapshot))) { return res; } else { + wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN); return Status::TryAgain(); } } @@ -241,9 +247,11 @@ Status WritePreparedTxn::RollbackInternal() { auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap(); auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap(); auto read_at_seq = kMaxSequenceNumber; + ReadOptions roptions; + // to prevent callback's seq to be overrriden inside DBImpk::Get + roptions.snapshot = wpt_db_->GetMaxSnapshot(); struct RollbackWriteBatchBuilder : public WriteBatch::Handler { DBImpl* db_; - ReadOptions roptions; WritePreparedTxnReadCallback callback; WriteBatch* rollback_batch_; std::map& comparators_; @@ -251,18 +259,20 @@ Status WritePreparedTxn::RollbackInternal() { using CFKeys = std::set; std::map keys_; bool rollback_merge_operands_; + ReadOptions roptions_; RollbackWriteBatchBuilder( DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq, WriteBatch* dst_batch, std::map& comparators, std::map& handles, - bool rollback_merge_operands) + bool rollback_merge_operands, ReadOptions _roptions) : db_(db), callback(wpt_db, snap_seq), // disable min_uncommitted optimization rollback_batch_(dst_batch), comparators_(comparators), handles_(handles), - rollback_merge_operands_(rollback_merge_operands) {} + rollback_merge_operands_(rollback_merge_operands), + roptions_(_roptions) {} Status Rollback(uint32_t cf, const Slice& key) { Status s; @@ -280,7 +290,7 @@ Status WritePreparedTxn::RollbackInternal() { PinnableSlice pinnable_val; bool not_used; auto cf_handle = handles_[cf]; - s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, ¬_used, + s = db_->GetImpl(roptions_, cf_handle, key, &pinnable_val, ¬_used, &callback); assert(s.ok() || s.IsNotFound()); if (s.ok()) { @@ -330,7 +340,8 @@ Status WritePreparedTxn::RollbackInternal() { bool WriteAfterCommit() const override { return false; } } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch, *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(), - wpt_db_->txn_db_options_.rollback_merge_operands); + wpt_db_->txn_db_options_.rollback_merge_operands, + roptions); auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler); assert(s.ok()); if (!s.ok()) { @@ -434,7 +445,8 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, ColumnFamilyHandle* cfh = column_family ? column_family : db_impl_->DefaultColumnFamily(); - WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted); + WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted, + kBackedByDBSnapshot); return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */, &snap_checker, min_uncommitted); diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index a3b523a22..e6d710206 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -226,16 +226,18 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { SequenceNumber min_uncommitted, snap_seq; - const bool backed_by_snapshot = + const SnapshotBackup backed_by_snapshot = AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); - WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted); + WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted, + backed_by_snapshot); bool* dont_care = nullptr; auto res = db_impl_->GetImpl(options, column_family, key, value, dont_care, &callback); - if (LIKELY( - ValidateSnapshot(callback.max_visible_seq(), backed_by_snapshot))) { + if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(), + backed_by_snapshot))) { return res; } else { + WPRecordTick(TXN_GET_TRY_AGAIN); return Status::TryAgain(); } } @@ -298,7 +300,8 @@ struct WritePreparedTxnDB::IteratorState { IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence, std::shared_ptr s, SequenceNumber min_uncommitted) - : callback(txn_db, sequence, min_uncommitted), snapshot(s) {} + : callback(txn_db, sequence, min_uncommitted, kBackedByDBSnapshot), + snapshot(s) {} WritePreparedTxnReadCallback callback; std::shared_ptr snapshot; @@ -392,6 +395,7 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { new std::atomic[SNAPSHOT_CACHE_SIZE] {}); commit_cache_ = std::unique_ptr[]>( new std::atomic[COMMIT_CACHE_SIZE] {}); + dummy_max_snapshot_.number_ = kMaxSequenceNumber; } void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max, diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 9561bfada..4ee7d8e6c 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -30,6 +30,7 @@ #include "utilities/transactions/write_prepared_txn.h" namespace rocksdb { +enum SnapshotBackup : bool { kUnbackedByDBSnapshot, kBackedByDBSnapshot }; // A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC. // In this way some data in the DB might not be committed. The DB provides @@ -448,18 +449,21 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { const ColumnFamilyOptions& cf_options) override; // Assign the min and max sequence numbers for reading from the db. A seq > // max is not valid, and a seq < min is valid, and a min <= seq < max requires - // further checkings. Normally max is defined by the snapshot and min is by + // further checking. Normally max is defined by the snapshot and min is by // minimum uncommitted seq. - inline bool AssignMinMaxSeqs(const Snapshot* snapshot, SequenceNumber* min, - SequenceNumber* max); + inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot, + SequenceNumber* min, + SequenceNumber* max); // Validate is a snapshot sequence number is still valid based on the latest // db status. backed_by_snapshot specifies if the number is baked by an actual // snapshot object. order specified the memory order with which we load the // atomic variables: relax is enough for the default since we care about last // value seen by same thread. inline bool ValidateSnapshot( - const SequenceNumber snap_seq, const bool backed_by_snapshot, + const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot, std::memory_order order = std::memory_order_relaxed); + // Get a dummy snapshot that refers to kMaxSequenceNumber + Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; } private: friend class AddPreparedCallback; @@ -488,6 +492,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test; friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test; + friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test; friend class WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test; friend class @@ -783,26 +788,55 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // Thread safety: since the handle is read-only object it is a const it is // safe to read it concurrently std::shared_ptr> handle_map_; + // A dummy snapshot object that refers to kMaxSequenceNumber + SnapshotImpl dummy_max_snapshot_; }; class WritePreparedTxnReadCallback : public ReadCallback { public: WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot) - : ReadCallback(snapshot), db_(db) {} + : ReadCallback(snapshot), + db_(db), + backed_by_snapshot_(kBackedByDBSnapshot) {} WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot, - SequenceNumber min_uncommitted) - : ReadCallback(snapshot, min_uncommitted), db_(db) {} + SequenceNumber min_uncommitted, + SnapshotBackup backed_by_snapshot) + : ReadCallback(snapshot, min_uncommitted), + db_(db), + backed_by_snapshot_(backed_by_snapshot) { + (void)backed_by_snapshot_; // to silence unused private field warning + } + + virtual ~WritePreparedTxnReadCallback() { + // If it is not backed by snapshot, the caller must check validity + assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot); + } // Will be called to see if the seq number visible; if not it moves on to // the next seq number. inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override { auto snapshot = max_visible_seq_; - return db_->IsInSnapshot(seq, snapshot, min_uncommitted_); + bool snap_released = false; + auto ret = + db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released); + assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot); + snap_released_ |= snap_released; + return ret; + } + + inline bool valid() { + valid_checked_ = true; + return snap_released_ == false; } // TODO(myabandeh): override Refresh when Iterator::Refresh is supported private: WritePreparedTxnDB* db_; + // Whether max_visible_seq_ is backed by a snapshot + const SnapshotBackup backed_by_snapshot_; + bool snap_released_ = false; + // Safety check to ensure that the caller has checked invalid statuses + bool valid_checked_ = false; }; class AddPreparedCallback : public PreReleaseCallback { @@ -1034,26 +1068,26 @@ struct SubBatchCounter : public WriteBatch::Handler { bool WriteAfterCommit() const override { return false; } }; -bool WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot, - SequenceNumber* min, - SequenceNumber* max) { +SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot, + SequenceNumber* min, + SequenceNumber* max) { if (snapshot != nullptr) { *min = static_cast_with_check(snapshot) ->min_uncommitted_; *max = static_cast_with_check(snapshot) ->number_; - return true; + return kBackedByDBSnapshot; } else { *min = SmallestUnCommittedSeq(); *max = 0; // to be assigned later after sv is referenced. - return false; + return kUnbackedByDBSnapshot; } } -bool WritePreparedTxnDB::ValidateSnapshot(const SequenceNumber snap_seq, - const bool backed_by_snapshot, - std::memory_order order) { - if (backed_by_snapshot) { +bool WritePreparedTxnDB::ValidateSnapshot( + const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot, + std::memory_order order) { + if (backed_by_snapshot == kBackedByDBSnapshot) { return true; } else { SequenceNumber max = max_evicted_seq_.load(order); diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 993c3b8b6..a1862d32d 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -25,7 +25,11 @@ bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) { } } - return db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_); + bool snap_released = false; + auto ret = db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_); + assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot); + snap_released_ |= snap_released; + return ret; } WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db, @@ -547,8 +551,9 @@ Status WriteUnpreparedTxn::RollbackInternal() { Status s; const auto& cf_map = *wupt_db_->GetCFHandleMap(); auto read_at_seq = kMaxSequenceNumber; - ReadOptions roptions; + // to prevent callback's seq to be overrriden inside DBImpk::Get + roptions.snapshot = wpt_db_->GetMaxSnapshot(); // Note that we do not use WriteUnpreparedTxnReadCallback because we do not // need to read our own writes when reading prior versions of the key for // rollback. @@ -704,7 +709,8 @@ Status WriteUnpreparedTxn::RollbackToSavePointInternal() { ->min_uncommitted_; SequenceNumber snap_seq = roptions.snapshot->GetSequenceNumber(); WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted, - top.unprep_seqs_); + top.unprep_seqs_, + kBackedByDBSnapshot); const auto& cf_map = *wupt_db_->GetCFHandleMap(); for (const auto& cfkey : tracked_keys) { const auto cfid = cfkey.first; @@ -784,14 +790,16 @@ void WriteUnpreparedTxn::MultiGet(const ReadOptions& options, PinnableSlice* values, Status* statuses, bool sorted_input) { SequenceNumber min_uncommitted, snap_seq; - const bool backed_by_snapshot = + const SnapshotBackup backed_by_snapshot = wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted, - unprep_seqs_); + unprep_seqs_, backed_by_snapshot); write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys, keys, values, statuses, sorted_input, &callback); - if (UNLIKELY(!wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { + if (UNLIKELY(!callback.valid() || + !wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { + wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN); for (size_t i = 0; i < num_keys; i++) { statuses[i] = Status::TryAgain(); } @@ -802,15 +810,17 @@ Status WriteUnpreparedTxn::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { SequenceNumber min_uncommitted, snap_seq; - const bool backed_by_snapshot = + const SnapshotBackup backed_by_snapshot = wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted, - unprep_seqs_); + unprep_seqs_, backed_by_snapshot); auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key, value, &callback); - if (LIKELY(wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { + if (LIKELY(callback.valid() && + wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { return res; } else { + wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN); return Status::TryAgain(); } } @@ -854,8 +864,8 @@ Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, ColumnFamilyHandle* cfh = column_family ? column_family : db_impl_->DefaultColumnFamily(); - WriteUnpreparedTxnReadCallback snap_checker(wupt_db_, snap_seq, - min_uncommitted, unprep_seqs_); + WriteUnpreparedTxnReadCallback snap_checker( + wupt_db_, snap_seq, min_uncommitted, unprep_seqs_, kBackedByDBSnapshot); return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */, &snap_checker, min_uncommitted); diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index 774d90e8d..5c654b05b 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -56,7 +56,8 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback { WriteUnpreparedTxnReadCallback( WritePreparedTxnDB* db, SequenceNumber snapshot, SequenceNumber min_uncommitted, - const std::map& unprep_seqs) + const std::map& unprep_seqs, + SnapshotBackup backed_by_snapshot) // Pass our last uncommitted seq as the snapshot to the parent class to // ensure that the parent will not prematurely filter out own writes. We // will do the exact comparison against snapshots in IsVisibleFullCheck @@ -64,10 +65,23 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback { : ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot), min_uncommitted), db_(db), unprep_seqs_(unprep_seqs), - wup_snapshot_(snapshot) {} + wup_snapshot_(snapshot), + backed_by_snapshot_(backed_by_snapshot) { + (void)backed_by_snapshot_; // to silence unused private field warning + } + + virtual ~WriteUnpreparedTxnReadCallback() { + // If it is not backed by snapshot, the caller must check validity + assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot); + } virtual bool IsVisibleFullCheck(SequenceNumber seq) override; + inline bool valid() { + valid_checked_ = true; + return snap_released_ == false; + } + void Refresh(SequenceNumber seq) override { max_visible_seq_ = std::max(max_visible_seq_, seq); wup_snapshot_ = seq; @@ -88,6 +102,11 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback { WritePreparedTxnDB* db_; const std::map& unprep_seqs_; SequenceNumber wup_snapshot_; + // Whether max_visible_seq_ is backed by a snapshot + const SnapshotBackup backed_by_snapshot_; + bool snap_released_ = false; + // Safety check to ensure that the caller has checked invalid statuses + bool valid_checked_ = false; }; class WriteUnpreparedTxn : public WritePreparedTxn { diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 4381619e7..defaf9fce 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -348,7 +348,8 @@ struct WriteUnpreparedTxnDB::IteratorState { IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence, std::shared_ptr s, SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn) - : callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_), + : callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_, + kBackedByDBSnapshot), snapshot(s) {} SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }