// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #pragma once #ifndef ROCKSDB_LITE #include #include #include #include #include #include #include "db/read_callback.h" #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" #include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/transaction_lock_mgr.h" #include "utilities/transactions/write_prepared_txn.h" namespace rocksdb { class PessimisticTransactionDB : public TransactionDB { public: explicit PessimisticTransactionDB(DB* db, const TransactionDBOptions& txn_db_options); explicit PessimisticTransactionDB(StackableDB* db, const TransactionDBOptions& txn_db_options); virtual ~PessimisticTransactionDB(); Status Initialize(const std::vector& compaction_enabled_cf_indices, const std::vector& handles); Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) override = 0; using StackableDB::Put; virtual Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) override; using StackableDB::Delete; virtual Status Delete(const WriteOptions& wopts, ColumnFamilyHandle* column_family, const Slice& key) override; using StackableDB::Merge; virtual Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) override; using StackableDB::Write; virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; using StackableDB::CreateColumnFamily; virtual Status CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle) override; using StackableDB::DropColumnFamily; virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key, bool exclusive); void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys); void UnLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key); void AddColumnFamily(const ColumnFamilyHandle* handle); static TransactionDBOptions ValidateTxnDBOptions( const TransactionDBOptions& txn_db_options); const TransactionDBOptions& GetTxnDBOptions() const { return txn_db_options_; } void InsertExpirableTransaction(TransactionID tx_id, PessimisticTransaction* tx); void RemoveExpirableTransaction(TransactionID tx_id); // If transaction is no longer available, locks can be stolen // If transaction is available, try stealing locks directly from transaction // It is the caller's responsibility to ensure that the referred transaction // is expirable (GetExpirationTime() > 0) and that it is expired. bool TryStealingExpiredTransactionLocks(TransactionID tx_id); Transaction* GetTransactionByName(const TransactionName& name) override; void RegisterTransaction(Transaction* txn); void UnregisterTransaction(Transaction* txn); // not thread safe. current use case is during recovery (single thread) void GetAllPreparedTransactions(std::vector* trans) override; TransactionLockMgr::LockStatusData GetLockStatusData() override; std::vector GetDeadlockInfoBuffer() override; void SetDeadlockInfoBufferSize(uint32_t target_size) override; struct CommitEntry { uint64_t prep_seq; uint64_t commit_seq; CommitEntry() : prep_seq(0), commit_seq(0) {} CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {} bool operator==(const CommitEntry& rhs) const { return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq; } }; protected: void ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options = TransactionOptions()); DBImpl* db_impl_; std::shared_ptr info_log_; private: friend class WritePreparedTxnDB; friend class WritePreparedTxnDBMock; const TransactionDBOptions txn_db_options_; TransactionLockMgr lock_mgr_; // Must be held when adding/dropping column families. InstrumentedMutex column_family_mutex_; Transaction* BeginInternalTransaction(const WriteOptions& options); // Used to ensure that no locks are stolen from an expirable transaction // that has started a commit. Only transactions with an expiration time // should be in this map. std::mutex map_mutex_; std::unordered_map expirable_transactions_map_; // map from name to two phase transaction instance std::mutex name_map_mutex_; std::unordered_map transactions_; }; // A PessimisticTransactionDB that writes the data to the DB after the commit. // In this way the DB only contains the committed data. class WriteCommittedTxnDB : public PessimisticTransactionDB { public: explicit WriteCommittedTxnDB(DB* db, const TransactionDBOptions& txn_db_options) : PessimisticTransactionDB(db, txn_db_options) {} explicit WriteCommittedTxnDB(StackableDB* db, const TransactionDBOptions& txn_db_options) : PessimisticTransactionDB(db, txn_db_options) {} virtual ~WriteCommittedTxnDB() {} Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) override; }; // A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC. // In this way some data in the DB might not be committed. The DB provides // mechanisms to tell such data apart from committed data. class WritePreparedTxnDB : public PessimisticTransactionDB { public: explicit WritePreparedTxnDB( DB* db, const TransactionDBOptions& txn_db_options, size_t snapshot_cache_size = DEF_SNAPSHOT_CACHE_SIZE, size_t commit_cache_size = DEF_COMMIT_CACHE_SIZE) : PessimisticTransactionDB(db, txn_db_options), SNAPSHOT_CACHE_SIZE(snapshot_cache_size), COMMIT_CACHE_SIZE(commit_cache_size) { init(txn_db_options); } explicit WritePreparedTxnDB( StackableDB* db, const TransactionDBOptions& txn_db_options, size_t snapshot_cache_size = DEF_SNAPSHOT_CACHE_SIZE, size_t commit_cache_size = DEF_COMMIT_CACHE_SIZE) : PessimisticTransactionDB(db, txn_db_options), SNAPSHOT_CACHE_SIZE(snapshot_cache_size), COMMIT_CACHE_SIZE(commit_cache_size) { init(txn_db_options); } virtual ~WritePreparedTxnDB() {} Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) override; // Check whether the transaction that wrote the value with seqeunce number seq // is visible to the snapshot with sequence number snapshot_seq bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq); // Add the trasnaction with prepare sequence seq to the prepared list void AddPrepared(uint64_t seq); // Add the transaction with prepare sequence prepare_seq and commit sequence // commit_seq to the commit map void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq); private: friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; friend class WritePreparedTransactionTest_CommitMapTest_Test; friend class WritePreparedTransactionTest_SnapshotConcurrentAccessTest_Test; friend class WritePreparedTransactionTest; friend class PreparedHeap_BasicsTest_Test; friend class WritePreparedTxnDBMock; friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; void init(const TransactionDBOptions& /* unused */) { // Adcance max_evicted_seq_ no more than 100 times before the cache wraps // around. INC_STEP_FOR_MAX_EVICTED = std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast(1)); snapshot_cache_ = unique_ptr[]>( new std::atomic[SNAPSHOT_CACHE_SIZE] {}); commit_cache_ = unique_ptr(new CommitEntry[COMMIT_CACHE_SIZE]{}); } // A heap with the amortized O(1) complexity for erase. It uses one extra heap // to keep track of erased entries that are not yet on top of the main heap. class PreparedHeap { std::priority_queue, std::greater> heap_; std::priority_queue, std::greater> erased_heap_; public: bool empty() { return heap_.empty(); } uint64_t top() { return heap_.top(); } void push(uint64_t v) { heap_.push(v); } void pop() { heap_.pop(); while (!heap_.empty() && !erased_heap_.empty() && heap_.top() == erased_heap_.top()) { heap_.pop(); erased_heap_.pop(); } while (heap_.empty() && !erased_heap_.empty()) { erased_heap_.pop(); } } void erase(uint64_t seq) { if (!heap_.empty()) { if (seq < heap_.top()) { // Already popped, ignore it. } else if (heap_.top() == seq) { pop(); } else { // (heap_.top() > seq) // Down the heap, remember to pop it later erased_heap_.push(seq); } } } }; // Get the commit entry with index indexed_seq from the commit table. It // returns true if such entry exists. bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry* entry); // Rewrite the entry with the index indexed_seq in the commit table with the // commit entry . If the rewrite results into eviction, // sets the evicted_entry and returns true. bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry, CommitEntry* evicted_entry); // Rewrite the entry with the index indexed_seq in the commit table with the // commit entry new_entry only if the existing entry matches the // expected_entry. Returns false otherwise. bool ExchangeCommitEntry(const uint64_t indexed_seq, const CommitEntry& expected_entry, const CommitEntry& new_entry); // Increase max_evicted_seq_ from the previous value prev_max to the new // value. This also involves taking care of prepared txns that are not // committed before new_max, as well as updating the list of live snapshots at // the time of updating the max. Thread-safety: this function can be called // concurrently. The concurrent invocations of this function is equivalent to // a serial invocation in which the last invocation is the one with the // largetst new_max value. void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max); virtual const std::vector GetSnapshotListFromDB( SequenceNumber max); // Update the list of snapshots corresponding to the soon-to-be-updated // max_eviceted_seq_. Thread-safety: this function can be called concurrently. // The concurrent invocations of this function is equivalent to a serial // invocation in which the last invocation is the one with the largetst // version value. void UpdateSnapshots(const std::vector& snapshots, const SequenceNumber& version); // Check an evicted entry against live snapshots to see if it should be kept // around or it can be safely discarded (and hence assume committed for all // snapshots). Thread-safety: this function can be called concurrently. If it // is called concurrently with multiple UpdateSnapshots, the result is the // same as checking the intersection of the snapshot list before updates with // the snapshot list of all the concurrent updates. void CheckAgainstSnapshots(const CommitEntry& evicted); // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq < // commit_seq. Return false if checking the next snapshot(s) is not needed. // This is the case if the entry already added to old_commit_map_ or none of // the next snapshots could satisfy the condition. next_is_larger: the next // snapshot will be a larger value bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq, const uint64_t& commit_seq, const uint64_t& snapshot_seq, const bool next_is_larger); // The list of live snapshots at the last time that max_evicted_seq_ advanced. // The list stored into two data structures: in snapshot_cache_ that is // efficient for concurrent reads, and in snapshots_ if the data does not fit // into snapshot_cache_. The total number of snapshots in the two lists std::atomic snapshots_total_ = {}; // The list sorted in ascending order. Thread-safety for writes is provided // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for // each entry. In x86_64 architecture such reads are compiled to simple read // instructions. 128 entries static const size_t DEF_SNAPSHOT_CACHE_SIZE = static_cast(1 << 7); const size_t SNAPSHOT_CACHE_SIZE; unique_ptr[]> snapshot_cache_; // 2nd list for storing snapshots. The list sorted in ascending order. // Thread-safety is provided with snapshots_mutex_. std::vector snapshots_; // The version of the latest list of snapshots. This can be used to avoid // rewrittiing a list that is concurrently updated with a more recent version. SequenceNumber snapshots_version_ = 0; // A heap of prepared transactions. Thread-safety is provided with // prepared_mutex_. PreparedHeap prepared_txns_; // 10m entry, 80MB size static const size_t DEF_COMMIT_CACHE_SIZE = static_cast(1 << 21); const size_t COMMIT_CACHE_SIZE; // commit_cache_ must be initialized to zero to tell apart an empty index from // a filled one. Thread-safety is provided with commit_cache_mutex_. unique_ptr commit_cache_; // The largest evicted *commit* sequence number from the commit_cache_ std::atomic max_evicted_seq_ = {}; // Advance max_evicted_seq_ by this value each time it needs an update. The // larger the value, the less frequent advances we would have. We do not want // it to be too large either as it would cause stalls by doing too much // maintenance work under the lock. size_t INC_STEP_FOR_MAX_EVICTED = 1; // A map of the evicted entries from commit_cache_ that has to be kept around // to service the old snapshots. This is expected to be empty normally. // Thread-safety is provided with old_commit_map_mutex_. std::map old_commit_map_; // A set of long-running prepared transactions that are not finished by the // time max_evicted_seq_ advances their sequence number. This is expected to // be empty normally. Thread-safety is provided with prepared_mutex_. std::set delayed_prepared_; // Update when delayed_prepared_.empty() changes. Expected to be true // normally. std::atomic delayed_prepared_empty_ = {true}; // Update when old_commit_map_.empty() changes. Expected to be true normally. std::atomic old_commit_map_empty_ = {true}; port::RWMutex prepared_mutex_; port::RWMutex old_commit_map_mutex_; port::RWMutex commit_cache_mutex_; port::RWMutex snapshots_mutex_; }; class WritePreparedTxnReadCallback : public ReadCallback { public: WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot) : db_(db), snapshot_(snapshot) {} // Will be called to see if the seq number accepted; if not it moves on to the // next seq number. virtual bool IsCommitted(SequenceNumber seq) override { return db_->IsInSnapshot(seq, snapshot_); } private: WritePreparedTxnDB* db_; SequenceNumber snapshot_; }; } // namespace rocksdb #endif // ROCKSDB_LITE