diff --git a/db/db_impl.cc b/db/db_impl.cc index 6e6408c9f..a8412f21e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1681,7 +1681,7 @@ const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() { } #endif // ROCKSDB_LITE -const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) { +SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) { int64_t unix_time = 0; env_->GetCurrentTime(&unix_time); // Ignore error SnapshotImpl* s = new SnapshotImpl; diff --git a/db/db_impl.h b/db/db_impl.h index d6ac0bbcd..357458cbb 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -731,6 +731,7 @@ class DBImpl : public DB { friend class DB; friend class InternalStats; friend class PessimisticTransaction; + friend class TransactionBaseImpl; friend class WriteCommittedTxn; friend class WritePreparedTxn; friend class WritePreparedTxnDB; @@ -955,7 +956,7 @@ class DBImpl : public DB { // helper function to call after some of the logs_ were synced void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status); - const Snapshot* GetSnapshotImpl(bool is_write_conflict_boundary); + SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary); uint64_t GetMaxTotalWalSize() const; diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h index 185daa4e1..c9ddabdec 100644 --- a/db/snapshot_impl.h +++ b/db/snapshot_impl.h @@ -21,6 +21,10 @@ class SnapshotList; class SnapshotImpl : public Snapshot { public: SequenceNumber number_; // const after creation + // It indicates the smallest uncommitted data at the time the snapshot was + // taken. This is currently used by WritePrepared transactions to limit the + // scope of queries to IsInSnapshot. + SequenceNumber min_uncommitted_ = 0; virtual SequenceNumber GetSequenceNumber() const override { return number_; } @@ -56,8 +60,8 @@ class SnapshotList { SnapshotImpl* oldest() const { assert(!empty()); return list_.next_; } SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; } - const SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, - uint64_t unix_time, bool is_write_conflict_boundary) { + SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, uint64_t unix_time, + bool is_write_conflict_boundary) { s->number_ = seq; s->unix_time_ = unix_time; s->is_write_conflict_boundary_ = is_write_conflict_boundary; diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index af4114cc2..30bd2e4ea 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -35,6 +35,8 @@ class PessimisticTransactionDB : public TransactionDB { virtual ~PessimisticTransactionDB(); + virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); } + virtual Status Initialize( const std::vector& compaction_enabled_cf_indices, const std::vector& handles); diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index d42a6d1ba..3facd437e 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -187,7 +187,7 @@ class TransactionBaseImpl : public Transaction { return snapshot_ ?
snapshot_.get() : nullptr; } - void SetSnapshot() override; + virtual void SetSnapshot() override; void SetSnapshotOnNextOperation( std::shared_ptr notifier = nullptr) override; @@ -303,6 +303,7 @@ class TransactionBaseImpl : public Transaction { WriteBatchWithIndex write_batch_; private: + friend class WritePreparedTxn; // Extra data to be persisted with the commit. Note this is only used when // prepare phase is not skipped. WriteBatch commit_time_batch_; @@ -335,7 +336,6 @@ class TransactionBaseImpl : public Transaction { bool read_only, bool exclusive, bool skip_validate = false); WriteBatchBase* GetBatchForWrite(); - void SetSnapshotInternal(const Snapshot* snapshot); }; diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 9a8022ae1..b7880eed0 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -20,6 +20,7 @@ #include "rocksdb/db.h" #include "rocksdb/status.h" #include "rocksdb/utilities/transaction_db.h" +#include "util/cast_util.h" #include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/write_prepared_txn_db.h" @@ -39,8 +40,14 @@ Status WritePreparedTxn::Get(const ReadOptions& read_options, auto snapshot = read_options.snapshot; auto snap_seq = snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber; + SequenceNumber min_uncommitted = 0; // by default disable the optimization + if (snapshot != nullptr) { + min_uncommitted = + static_cast_with_check(snapshot) + ->min_uncommitted_; + } - WritePreparedTxnReadCallback callback(wpt_db_, snap_seq); + WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted); return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key, pinnable_val, &callback); } @@ -68,25 +75,26 @@ Status WritePreparedTxn::PrepareInternal() { const bool WRITE_AFTER_COMMIT = true; WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_, !WRITE_AFTER_COMMIT); - const bool DISABLE_MEMTABLE = true; - uint64_t seq_used = kMaxSequenceNumber; // For each duplicate key we account for a new sub-batch prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); - Status s = - db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), - /*callback*/ nullptr, &log_number_, /*log ref*/ 0, - !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_); + // AddPrepared better to be called in the pre-release callback otherwise there + // is a non-zero chance of max advancing prepare_seq and readers assume the + // data as committed. + // Also having it in the PreReleaseCallback allows in-order addition of + // prepared entries to PrepareHeap and hence enables an optimization. Refer to + // SmallestUnCommittedSeq for more details. + AddPreparedCallback add_prepared_callback( + wpt_db_, prepare_batch_cnt_, + db_impl_->immutable_db_options().two_write_queues); + const bool DISABLE_MEMTABLE = true; + uint64_t seq_used = kMaxSequenceNumber; + Status s = db_impl_->WriteImpl( + write_options, GetWriteBatch()->GetWriteBatch(), + /*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE, + &seq_used, prepare_batch_cnt_, &add_prepared_callback); assert(!s.ok() || seq_used != kMaxSequenceNumber); auto prepare_seq = seq_used; SetId(prepare_seq); - // TODO(myabandeh): AddPrepared better to be called in the pre-release - // callback otherwise there is a non-zero chance of max dvancing prepare_seq - // and readers assume the data as committed. 
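(Illustrative aside, not part of the patch.) The Get() hunk above passes the snapshot's min_uncommitted_ into WritePreparedTxnReadCallback. The idea it exploits can be stated as a tiny standalone sketch; VisibleFastPath is a hypothetical helper name and only mirrors the short-circuit that the new IsInSnapshot overload performs after the delayed_prepared_ check and before consulting the commit cache:

#include <cstdint>

// Hypothetical helper, for illustration only. A write whose prepare sequence is
// below the smallest uncommitted sequence recorded at snapshot time was already
// committed when the snapshot was taken, so if it is also <= snapshot_seq it is
// visible without any commit-cache or prepared-list lookup.
inline bool VisibleFastPath(uint64_t prep_seq, uint64_t snapshot_seq,
                            uint64_t min_uncommitted, bool* decided) {
  if (prep_seq <= snapshot_seq && prep_seq < min_uncommitted) {
    *decided = true;  // fast path: known committed before the snapshot
    return true;
  }
  *decided = false;   // caller falls back to the full IsInSnapshot logic
  return false;
}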
- if (s.ok()) { - for (size_t i = 0; i < prepare_batch_cnt_; i++) { - wpt_db_->AddPrepared(prepare_seq + i); - } - } return s; } @@ -135,6 +143,10 @@ Status WritePreparedTxn::CommitInternal() { const bool do_one_write = !db_impl_->immutable_db_options().two_write_queues || disable_memtable; const bool publish_seq = do_one_write; + // Note: CommitTimeWriteBatch does not need AddPrepared since it is written to + // DB in one shot. min_uncommitted still works since it requires capturing + // data that is written to DB but not yet committed, while + // CommitTimeWriteBatch commits with PreReleaseCallback. WritePreparedCommitEntryPreReleaseCallback update_commit_map( wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt, !PREP_HEAP_SKIPPED, publish_seq); @@ -204,7 +216,8 @@ Status WritePreparedTxn::RollbackInternal() { WriteBatch* dst_batch, std::map& comparators) : db_(db), - callback(wpt_db, snap_seq), + callback(wpt_db, snap_seq, + 0), // 0 disables min_uncommitted optimization rollback_batch_(dst_batch), comparators_(comparators) {} @@ -285,6 +298,10 @@ Status WritePreparedTxn::RollbackInternal() { const size_t ONE_BATCH = 1; WritePreparedCommitEntryPreReleaseCallback update_commit_map( wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, ONE_BATCH); + // Note: the rollback batch does not need AddPrepared since it is written to + // DB in one shot. min_uncommitted still works since it requires capturing + // data that is written to DB but not yet committed, while + // the rollback batch commits with PreReleaseCallback. s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH, do_one_write ? &update_commit_map : nullptr); @@ -335,6 +352,10 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, SequenceNumber* tracked_at_seq) { assert(snapshot_); + SequenceNumber min_uncommitted = + static_cast_with_check( + snapshot_.get()) + ->min_uncommitted_; SequenceNumber snap_seq = snapshot_->GetSequenceNumber(); // tracked_at_seq is either max or the last snapshot with which this key was // trackeed so there is no need to apply the IsInSnapshot to this comparison @@ -351,12 +372,20 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, ColumnFamilyHandle* cfh = column_family ?
column_family : db_impl_->DefaultColumnFamily(); - WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq); + WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted); return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */, &snap_checker); } +void WritePreparedTxn::SetSnapshot() { + const bool FOR_WW_CONFLICT_CHECK = true; + SnapshotImpl* snapshot = dbimpl_->GetSnapshotImpl(FOR_WW_CONFLICT_CHECK); + assert(snapshot); + wpt_db_->EnhanceSnapshot(snapshot); + SetSnapshotInternal(snapshot); +} + Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) { auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch); prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 50ce89930..6685c8278 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -61,6 +61,8 @@ class WritePreparedTxn : public PessimisticTransaction { virtual Iterator* GetIterator(const ReadOptions& options, ColumnFamilyHandle* column_family) override; + virtual void SetSnapshot() override; + protected: // Override the protected SetId to make it visible to the friend class // WritePreparedTxnDB diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index c30ab46fd..89a60d537 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -21,6 +21,7 @@ #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" +#include "util/cast_util.h" #include "util/mutexlock.h" #include "util/string_util.h" #include "util/sync_point.h" @@ -149,11 +150,22 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig, const uint64_t no_log_ref = 0; uint64_t seq_used = kMaxSequenceNumber; const size_t ZERO_PREPARES = 0; + // Since this is not 2pc, there is no need for AddPrepared but having it in + // the PreReleaseCallback enables an optimization. Refer to + // SmallestUnCommittedSeq for more details. + AddPreparedCallback add_prepared_callback( + this, batch_cnt, db_impl_->immutable_db_options().two_write_queues); WritePreparedCommitEntryPreReleaseCallback update_commit_map( this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt); - auto s = db_impl_->WriteImpl( - write_options, batch, nullptr, nullptr, no_log_ref, !DISABLE_MEMTABLE, - &seq_used, batch_cnt, do_one_write ? &update_commit_map : nullptr); + PreReleaseCallback* pre_release_callback; + if (do_one_write) { + pre_release_callback = &update_commit_map; + } else { + pre_release_callback = &add_prepared_callback; + } + auto s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, + no_log_ref, !DISABLE_MEMTABLE, &seq_used, + batch_cnt, pre_release_callback); assert(!s.ok() || seq_used != kMaxSequenceNumber); uint64_t& prepare_seq = seq_used; if (txn != nullptr) { @@ -170,15 +182,12 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig, ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, "CommitBatchInternal 2nd write prepare_seq: %" PRIu64, prepare_seq); - // TODO(myabandeh): What if max advances the prepare_seq_ in the meanwhile and - // readers assume the prepared data as committed? Almost zero probability. 
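(Hedged usage sketch, not part of the patch; txn_db, the key, and the wrapper function are assumptions for illustration.) With the SetSnapshot() override above, a write-prepared transaction's snapshot is stamped with min_uncommitted_ by EnhanceSnapshot(), and subsequent reads and validations hand that bound to WritePreparedTxnReadCallback:

#include <string>
#include "rocksdb/utilities/transaction_db.h"

void SnapshotUsageSketch(rocksdb::TransactionDB* txn_db) {
  using namespace rocksdb;
  Transaction* txn = txn_db->BeginTransaction(WriteOptions());
  txn->SetSnapshot();                   // routes to WritePreparedTxn::SetSnapshot above
  ReadOptions ropts;
  ropts.snapshot = txn->GetSnapshot();  // SnapshotImpl carrying min_uncommitted_
  std::string value;
  Status s = txn->Get(ropts, "key", &value);  // read callback receives the bound
  (void)s;  // error handling elided in this sketch
  delete txn;
}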
- // Commit the batch by writing an empty batch to the 2nd queue that will // release the commit sequence number to readers. const size_t ZERO_COMMITS = 0; const bool PREP_HEAP_SKIPPED = true; WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( - this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS, PREP_HEAP_SKIPPED); + this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS, !PREP_HEAP_SKIPPED); WriteBatch empty_batch; empty_batch.PutLogData(Slice()); const size_t ONE_BATCH = 1; @@ -197,10 +206,16 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options, // We are fine with the latest committed value. This could be done by // specifying the snapshot as kMaxSequenceNumber. SequenceNumber seq = kMaxSequenceNumber; + SequenceNumber min_uncommitted = 0; if (options.snapshot != nullptr) { seq = options.snapshot->GetSequenceNumber(); + min_uncommitted = static_cast_with_check( + options.snapshot) + ->min_uncommitted_; + } else { + min_uncommitted = SmallestUnCommittedSeq(); } - WritePreparedTxnReadCallback callback(this, seq); + WritePreparedTxnReadCallback callback(this, seq, min_uncommitted); bool* dont_care = nullptr; // Note: no need to specify a snapshot for read options as no specific // snapshot is requested by the user. @@ -252,8 +267,9 @@ std::vector WritePreparedTxnDB::MultiGet( // Struct to hold ownership of snapshot and read callback for iterator cleanup. struct WritePreparedTxnDB::IteratorState { IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence, - std::shared_ptr s) - : callback(txn_db, sequence), snapshot(s) {} + std::shared_ptr s, + SequenceNumber min_uncommitted) + : callback(txn_db, sequence, min_uncommitted), snapshot(s) {} WritePreparedTxnReadCallback callback; std::shared_ptr snapshot; @@ -271,18 +287,26 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options, constexpr bool ALLOW_REFRESH = true; std::shared_ptr own_snapshot = nullptr; SequenceNumber snapshot_seq = kMaxSequenceNumber; + SequenceNumber min_uncommitted = 0; if (options.snapshot != nullptr) { snapshot_seq = options.snapshot->GetSequenceNumber(); + min_uncommitted = static_cast_with_check( + options.snapshot) + ->min_uncommitted_; } else { - auto* snapshot = db_impl_->GetSnapshot(); + auto* snapshot = GetSnapshot(); // We take a snapshot to make sure that the related data in the commit map // are not deleted. snapshot_seq = snapshot->GetSequenceNumber(); + min_uncommitted = + static_cast_with_check(snapshot) + ->min_uncommitted_; own_snapshot = std::make_shared(db_impl_, snapshot); } assert(snapshot_seq != kMaxSequenceNumber); auto* cfd = reinterpret_cast(column_family)->cfd(); - auto* state = new IteratorState(this, snapshot_seq, own_snapshot); + auto* state = + new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted); auto* db_iter = db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback, !ALLOW_BLOB, !ALLOW_REFRESH); @@ -298,20 +322,28 @@ Status WritePreparedTxnDB::NewIterators( constexpr bool ALLOW_REFRESH = true; std::shared_ptr own_snapshot = nullptr; SequenceNumber snapshot_seq = kMaxSequenceNumber; + SequenceNumber min_uncommitted = 0; if (options.snapshot != nullptr) { snapshot_seq = options.snapshot->GetSequenceNumber(); + min_uncommitted = static_cast_with_check( + options.snapshot) + ->min_uncommitted_; } else { - auto* snapshot = db_impl_->GetSnapshot(); + auto* snapshot = GetSnapshot(); // We take a snapshot to make sure that the related data in the commit map // are not deleted. 
snapshot_seq = snapshot->GetSequenceNumber(); own_snapshot = std::make_shared(db_impl_, snapshot); + min_uncommitted = + static_cast_with_check(snapshot) + ->min_uncommitted_; } iterators->clear(); iterators->reserve(column_families.size()); for (auto* column_family : column_families) { auto* cfd = reinterpret_cast(column_family)->cfd(); - auto* state = new IteratorState(this, snapshot_seq, own_snapshot); + auto* state = + new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted); auto* db_iter = db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback, !ALLOW_BLOB, !ALLOW_REFRESH); @@ -332,118 +364,6 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { new std::atomic[COMMIT_CACHE_SIZE] {}); } -// Returns true if commit_seq <= snapshot_seq -bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, - uint64_t snapshot_seq) const { - // Here we try to infer the return value without looking into prepare list. - // This would help avoiding synchronization over a shared map. - // TODO(myabandeh): optimize this. This sequence of checks must be correct but - // not necessary efficient - if (prep_seq == 0) { - // Compaction will output keys to bottom-level with sequence number 0 if - // it is visible to the earliest snapshot. - ROCKS_LOG_DETAILS( - info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, - prep_seq, snapshot_seq, 1); - return true; - } - if (snapshot_seq < prep_seq) { - // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq - ROCKS_LOG_DETAILS( - info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, - prep_seq, snapshot_seq, 0); - return false; - } - if (!delayed_prepared_empty_.load(std::memory_order_acquire)) { - // We should not normally reach here - ReadLock rl(&prepared_mutex_); - // TODO(myabandeh): also add a stat - ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64, - static_cast(delayed_prepared_.size())); - if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) { - // Then it is not committed yet - ROCKS_LOG_DETAILS( - info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, - prep_seq, snapshot_seq, 0); - return false; - } - } - auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE; - CommitEntry64b dont_care; - CommitEntry cached; - bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached); - if (exist && prep_seq == cached.prep_seq) { - // It is committed and also not evicted from commit cache - ROCKS_LOG_DETAILS( - info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, - prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq); - return cached.commit_seq <= snapshot_seq; - } - // else it could be committed but not inserted in the map which could happen - // after recovery, or it could be committed and evicted by another commit, or - // never committed. - - // At this point we dont know if it was committed or it is still prepared - auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire); - // max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq now - if (max_evicted_seq < prep_seq) { - // Not evicted from cache and also not present, so must be still prepared - ROCKS_LOG_DETAILS( - info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, - prep_seq, snapshot_seq, 0); - return false; - } - // When advancing max_evicted_seq_, we move older entires from prepared to - // delayed_prepared_. 
Also we move evicted entries from commit cache to - // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <= - // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in - // old_commit_map_, iii) committed with no conflict with any snapshot. Case - // (i) delayed_prepared_ is checked above - if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case - // only (iii) is the case: committed - // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq < - // snapshot_seq - ROCKS_LOG_DETAILS( - info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, - prep_seq, snapshot_seq, 1); - return true; - } - // else (ii) might be the case: check the commit data saved for this snapshot. - // If there was no overlapping commit entry, then it is committed with a - // commit_seq lower than any live snapshot, including snapshot_seq. - if (old_commit_map_empty_.load(std::memory_order_acquire)) { - ROCKS_LOG_DETAILS( - info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, - prep_seq, snapshot_seq, 1); - return true; - } - { - // We should not normally reach here unless sapshot_seq is old. This is a - // rare case and it is ok to pay the cost of mutex ReadLock for such old, - // reading transactions. - // TODO(myabandeh): also add a stat - ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead"); - ReadLock rl(&old_commit_map_mutex_); - auto prep_set_entry = old_commit_map_.find(snapshot_seq); - bool found = prep_set_entry != old_commit_map_.end(); - if (found) { - auto& vec = prep_set_entry->second; - found = std::binary_search(vec.begin(), vec.end(), prep_seq); - } - if (!found) { - ROCKS_LOG_DETAILS( - info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, - prep_seq, snapshot_seq, 1); - return true; - } - } - // (ii) it the case: it is committed but after the snapshot_seq - ROCKS_LOG_DETAILS(info_log_, - "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, - prep_seq, snapshot_seq, 0); - return false; -} - void WritePreparedTxnDB::AddPrepared(uint64_t seq) { ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq); assert(seq > max_evicted_seq_); @@ -622,6 +542,14 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max, }; } +const Snapshot* WritePreparedTxnDB::GetSnapshot() { + const bool FOR_WW_CONFLICT_CHECK = true; + SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(!FOR_WW_CONFLICT_CHECK); + assert(snap_impl); + EnhanceSnapshot(snap_impl); + return snap_impl; +} + const std::vector WritePreparedTxnDB::GetSnapshotListFromDB( SequenceNumber max) { ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max); diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 5de30ab8f..b620db077 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -6,6 +6,11 @@ #pragma once #ifndef ROCKSDB_LITE +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include #include #include #include @@ -110,8 +115,131 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { virtual void ReleaseSnapshot(const Snapshot* snapshot) override; // Check whether the transaction that wrote the value with sequence number seq - // is visible to the snapshot with sequence number snapshot_seq - bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq) const; + // is visible to the snapshot with sequence number snapshot_seq. 
+ // Returns true if commit_seq <= snapshot_seq + inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq, + uint64_t min_uncommitted = 0) const { + ROCKS_LOG_DETAILS(info_log_, + "IsInSnapshot %" PRIu64 " in %" PRIu64 + " min_uncommitted %" PRIu64, + prep_seq, snapshot_seq, min_uncommitted); + // Here we try to infer the return value without looking into prepare list. + // This would help avoid synchronization over a shared map. + // TODO(myabandeh): optimize this. This sequence of checks must be correct + // but not necessarily efficient + if (prep_seq == 0) { + // Compaction will output keys to bottom-level with sequence number 0 if + // it is visible to the earliest snapshot. + ROCKS_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 1); + return true; + } + if (snapshot_seq < prep_seq) { + // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq + ROCKS_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 0); + return false; + } + if (!delayed_prepared_empty_.load(std::memory_order_acquire)) { + // We should not normally reach here + ReadLock rl(&prepared_mutex_); + // TODO(myabandeh): also add a stat + ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64, + static_cast(delayed_prepared_.size())); + if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) { + // Then it is not committed yet + ROCKS_LOG_DETAILS(info_log_, + "IsInSnapshot %" PRIu64 " in %" PRIu64 + " returns %" PRId32, + prep_seq, snapshot_seq, 0); + return false; + } + } + // Note: since min_uncommitted does not include the delayed_prepared_, we + // should check delayed_prepared_ first before applying this optimization. + // TODO(myabandeh): include delayed_prepared_ in min_uncommitted + if (prep_seq < min_uncommitted) { + return true; + } + auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE; + CommitEntry64b dont_care; + CommitEntry cached; + bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached); + if (exist && prep_seq == cached.prep_seq) { + // It is committed and also not evicted from commit cache + ROCKS_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq); + return cached.commit_seq <= snapshot_seq; + } + // else it could be committed but not inserted in the map which could happen + // after recovery, or it could be committed and evicted by another commit, + // or never committed. + + // At this point we don't know if it was committed or it is still prepared + auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire); + // max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq now + if (max_evicted_seq < prep_seq) { + // Not evicted from cache and also not present, so must be still prepared + ROCKS_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 0); + return false; + } + // When advancing max_evicted_seq_, we move older entries from prepared to + // delayed_prepared_. Also we move evicted entries from commit cache to + // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <= + // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in + // old_commit_map_, iii) committed with no conflict with any snapshot.
Case + // (i) delayed_prepared_ is checked above + if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case + // only (iii) is the case: committed + // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq < + // snapshot_seq + ROCKS_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 1); + return true; + } + // else (ii) might be the case: check the commit data saved for this + // snapshot. If there was no overlapping commit entry, then it is committed + // with a commit_seq lower than any live snapshot, including snapshot_seq. + if (old_commit_map_empty_.load(std::memory_order_acquire)) { + ROCKS_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 1); + return true; + } + { + // We should not normally reach here unless snapshot_seq is old. This is a + // rare case and it is ok to pay the cost of mutex ReadLock for such old, + // reading transactions. + // TODO(myabandeh): also add a stat + ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead"); + ReadLock rl(&old_commit_map_mutex_); + auto prep_set_entry = old_commit_map_.find(snapshot_seq); + bool found = prep_set_entry != old_commit_map_.end(); + if (found) { + auto& vec = prep_set_entry->second; + found = std::binary_search(vec.begin(), vec.end(), prep_seq); + } + if (!found) { + ROCKS_LOG_DETAILS(info_log_, + "IsInSnapshot %" PRIu64 " in %" PRIu64 + " returns %" PRId32, + prep_seq, snapshot_seq, 1); + return true; + } + } + // (ii) is the case: it is committed but after the snapshot_seq + ROCKS_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 0); + return false; + } + // Add the transaction with prepare sequence seq to the prepared list void AddPrepared(uint64_t seq); // Rollback a prepared txn identified with prep_seq. rollback_seq is the seq @@ -224,6 +352,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { const std::vector& handles) override; void UpdateCFComparatorMap(const ColumnFamilyHandle* handle) override; + virtual const Snapshot* GetSnapshot() override; + protected: virtual Status VerifyCFOptions( const ColumnFamilyOptions& cf_options) override; @@ -239,6 +369,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class PreparedHeap_BasicsTest_Test; friend class PreparedHeap_EmptyAtTheEnd_Test; friend class PreparedHeap_Concurrent_Test; + friend class WritePreparedTxn; friend class WritePreparedTxnDBMock; friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; friend class @@ -336,6 +467,32 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max, const SequenceNumber& new_max); + inline SequenceNumber SmallestUnCommittedSeq() { + // Since we update the prepare_heap always from the main write queue via + // PreReleaseCallback, the prepared_txns_.top() indicates the smallest + // prepared data in 2pc transactions. For non-2pc transactions that are + // written in two steps, we also update prepared_txns_ at the first step + // (via the same mechanism) so that their uncommitted data is reflected in + // SmallestUnCommittedSeq. + ReadLock rl(&prepared_mutex_); + // Since we are holding the mutex, and GetLatestSequenceNumber is updated + // after prepared_txns_ are, the value of GetLatestSequenceNumber would + // reflect any uncommitted data that is not added to prepared_txns_ yet.
+ // Otherwise, if there is no concurrent txn, this value simply reflects that + // latest value in the memtable. + if (prepared_txns_.empty()) { + return db_impl_->GetLatestSequenceNumber() + 1; + } else { + return std::min(prepared_txns_.top(), + db_impl_->GetLatestSequenceNumber() + 1); + } + } + // Enhance the snapshot object by recording in it the smallest uncommitted seq + inline void EnhanceSnapshot(SnapshotImpl* snapshot) { + assert(snapshot); + snapshot->min_uncommitted_ = WritePreparedTxnDB::SmallestUnCommittedSeq(); + } + virtual const std::vector GetSnapshotListFromDB( SequenceNumber max); @@ -438,18 +595,44 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { class WritePreparedTxnReadCallback : public ReadCallback { public: - WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot) - : db_(db), snapshot_(snapshot) {} + WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot, + SequenceNumber min_uncommitted) + : db_(db), snapshot_(snapshot), min_uncommitted_(min_uncommitted) {} // Will be called to see if the seq number accepted; if not it moves on to the // next seq number. - virtual bool IsCommitted(SequenceNumber seq) override { - return db_->IsInSnapshot(seq, snapshot_); + inline virtual bool IsCommitted(SequenceNumber seq) override { + return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_); } private: WritePreparedTxnDB* db_; SequenceNumber snapshot_; + SequenceNumber min_uncommitted_; +}; + +class AddPreparedCallback : public PreReleaseCallback { + public: + AddPreparedCallback(WritePreparedTxnDB* db, size_t sub_batch_cnt, + bool two_write_queues) + : db_(db), + sub_batch_cnt_(sub_batch_cnt), + two_write_queues_(two_write_queues) { + (void)two_write_queues_; // to silence unused private field warning + } + virtual Status Callback(SequenceNumber prepare_seq, + bool is_mem_disabled) override { + assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue + for (size_t i = 0; i < sub_batch_cnt_; i++) { + db_->AddPrepared(prepare_seq + i); + } + return Status::OK(); + } + + private: + WritePreparedTxnDB* db_; + size_t sub_batch_cnt_; + bool two_write_queues_; }; class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
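(Worked example, illustrative only; the helper, container, and numbers below are assumptions, not part of the patch.) SmallestUnCommittedSeq() above returns min(prepared_txns_.top(), GetLatestSequenceNumber() + 1). For instance, with prepared sequences {8, 11} still pending and a latest sequence number of 14, the bound is min(8, 15) = 8, so IsInSnapshot() can declare anything with prep_seq < 8 committed without touching the commit cache:

#include <algorithm>
#include <cstdint>
#include <vector>

// Stand-in for SmallestUnCommittedSeq(): `prepared` plays the role of
// prepared_txns_ with its smallest element first, and `latest_seq` the role of
// GetLatestSequenceNumber().
uint64_t SmallestUncommittedSketch(const std::vector<uint64_t>& prepared,
                                   uint64_t latest_seq) {
  if (prepared.empty()) {
    // No prepared entries: any still-uncommitted write must get a sequence
    // number above latest_seq, so latest_seq + 1 is a valid lower bound.
    return latest_seq + 1;
  }
  return std::min(prepared.front(), latest_seq + 1);
}
// SmallestUncommittedSketch({8, 11}, 14) == 8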