diff --git a/CMakeLists.txt b/CMakeLists.txt index d81c94048..1979b3d54 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -554,6 +554,7 @@ set(SOURCES utilities/transactions/transaction_lock_mgr.cc utilities/transactions/transaction_util.cc utilities/transactions/write_prepared_txn.cc + utilities/transactions/write_prepared_txn_db.cc utilities/ttl/db_ttl_impl.cc utilities/write_batch_with_index/write_batch_with_index.cc utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -576,12 +577,12 @@ if(WIN32) port/win/win_logger.cc port/win/win_thread.cc port/win/xpress_win.cc) - + if(WITH_JEMALLOC) list(APPEND SOURCES port/win/win_jemalloc.cc) endif() - + else() list(APPEND SOURCES port/port_posix.cc diff --git a/TARGETS b/TARGETS index e4250ae0b..971250269 100644 --- a/TARGETS +++ b/TARGETS @@ -256,6 +256,7 @@ cpp_library( "utilities/transactions/transaction_lock_mgr.cc", "utilities/transactions/transaction_util.cc", "utilities/transactions/write_prepared_txn.cc", + "utilities/transactions/write_prepared_txn_db.cc", "utilities/ttl/db_ttl_impl.cc", "utilities/write_batch_with_index/write_batch_with_index.cc", "utilities/write_batch_with_index/write_batch_with_index_internal.cc", diff --git a/src.mk b/src.mk index 6753197a1..dc9e571fc 100644 --- a/src.mk +++ b/src.mk @@ -202,6 +202,7 @@ LIB_SOURCES = \ utilities/transactions/transaction_lock_mgr.cc \ utilities/transactions/transaction_util.cc \ utilities/transactions/write_prepared_txn.cc \ + utilities/transactions/write_prepared_txn_db.cc \ utilities/ttl/db_ttl_impl.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \ utilities/write_batch_with_index/write_batch_with_index_internal.cc \ diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index eed413778..335f05dc6 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -11,7 +11,6 @@ #include "utilities/transactions/pessimistic_transaction_db.h" -#include #include #include #include @@ -25,6 +24,7 @@ #include "util/sync_point.h" #include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/transaction_db_mutex_impl.h" +#include "utilities/transactions/write_prepared_txn_db.h" namespace rocksdb { @@ -135,26 +135,6 @@ Status PessimisticTransactionDB::Initialize( return s; } -Status WritePreparedTxnDB::Initialize( - const std::vector& compaction_enabled_cf_indices, - const std::vector& handles) { - auto dbimpl = reinterpret_cast(GetRootDB()); - assert(dbimpl != nullptr); - auto rtxns = dbimpl->recovered_transactions(); - for (auto rtxn : rtxns) { - AddPrepared(rtxn.second->seq_); - } - SequenceNumber prev_max = max_evicted_seq_; - SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber(); - AdvanceMaxEvictedSeq(prev_max, last_seq); - - db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this)); - - auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices, - handles); - return s; -} - Transaction* WriteCommittedTxnDB::BeginTransaction( const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) { @@ -166,17 +146,6 @@ Transaction* WriteCommittedTxnDB::BeginTransaction( } } -Transaction* WritePreparedTxnDB::BeginTransaction( - const WriteOptions& write_options, const TransactionOptions& txn_options, - Transaction* old_txn) { - if (old_txn != nullptr) { - ReinitializeTransaction(old_txn, write_options, txn_options); - return old_txn; - 
} else { - return new WritePreparedTxn(this, write_options, txn_options); - } -} - TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions( const TransactionDBOptions& txn_db_options) { TransactionDBOptions validated = txn_db_options; @@ -571,458 +540,5 @@ void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) { transactions_.erase(it); } -Status WritePreparedTxnDB::Get(const ReadOptions& options, - ColumnFamilyHandle* column_family, - const Slice& key, PinnableSlice* value) { - // We are fine with the latest committed value. This could be done by - // specifying the snapshot as kMaxSequenceNumber. - SequenceNumber seq = kMaxSequenceNumber; - if (options.snapshot != nullptr) { - seq = options.snapshot->GetSequenceNumber(); - } - WritePreparedTxnReadCallback callback(this, seq); - bool* dont_care = nullptr; - // Note: no need to specify a snapshot for read options as no specific - // snapshot is requested by the user. - return db_impl_->GetImpl(options, column_family, key, value, dont_care, - &callback); -} - -// Struct to hold ownership of snapshot and read callback for iterator cleanup. -struct WritePreparedTxnDB::IteratorState { - IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence, - std::shared_ptr s) - : callback(txn_db, sequence), snapshot(s) {} - - WritePreparedTxnReadCallback callback; - std::shared_ptr snapshot; -}; - -namespace { -static void CleanupWritePreparedTxnDBIterator(void* arg1, void* arg2) { - delete reinterpret_cast(arg1); -} -} // anonymous namespace - -Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options, - ColumnFamilyHandle* column_family) { - std::shared_ptr own_snapshot = nullptr; - SequenceNumber snapshot_seq = kMaxSequenceNumber; - if (options.snapshot != nullptr) { - snapshot_seq = options.snapshot->GetSequenceNumber(); - } else { - auto* snapshot = db_impl_->GetSnapshot(); - snapshot_seq = snapshot->GetSequenceNumber(); - own_snapshot = std::make_shared(db_impl_, snapshot); - } - assert(snapshot_seq != kMaxSequenceNumber); - auto* cfd = reinterpret_cast(column_family)->cfd(); - auto* state = new IteratorState(this, snapshot_seq, own_snapshot); - auto* db_iter = - db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback); - db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); - return db_iter; -} - -Status WritePreparedTxnDB::NewIterators( - const ReadOptions& options, - const std::vector& column_families, - std::vector* iterators) { - std::shared_ptr own_snapshot = nullptr; - SequenceNumber snapshot_seq = kMaxSequenceNumber; - if (options.snapshot != nullptr) { - snapshot_seq = options.snapshot->GetSequenceNumber(); - } else { - auto* snapshot = db_impl_->GetSnapshot(); - snapshot_seq = snapshot->GetSequenceNumber(); - own_snapshot = std::make_shared(db_impl_, snapshot); - } - iterators->clear(); - iterators->reserve(column_families.size()); - for (auto* column_family : column_families) { - auto* cfd = reinterpret_cast(column_family)->cfd(); - auto* state = new IteratorState(this, snapshot_seq, own_snapshot); - auto* db_iter = - db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback); - db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); - iterators->push_back(db_iter); - } - return Status::OK(); -} - -void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { - // Adcance max_evicted_seq_ no more than 100 times before the cache wraps - // around. 
- INC_STEP_FOR_MAX_EVICTED = - std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast(1)); - snapshot_cache_ = unique_ptr[]>( - new std::atomic[SNAPSHOT_CACHE_SIZE] {}); - commit_cache_ = unique_ptr[]>( - new std::atomic[COMMIT_CACHE_SIZE] {}); -} - -// Returns true if commit_seq <= snapshot_seq -bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, - uint64_t snapshot_seq) const { - // Here we try to infer the return value without looking into prepare list. - // This would help avoiding synchronization over a shared map. - // TODO(myabandeh): read your own writes - // TODO(myabandeh): optimize this. This sequence of checks must be correct but - // not necessary efficient - if (prep_seq == 0) { - // Compaction will output keys to bottom-level with sequence number 0 if - // it is visible to the earliest snapshot. - return true; - } - if (snapshot_seq < prep_seq) { - // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq - return false; - } - if (!delayed_prepared_empty_.load(std::memory_order_acquire)) { - // We should not normally reach here - ReadLock rl(&prepared_mutex_); - if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) { - // Then it is not committed yet - return false; - } - } - auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE; - CommitEntry64b dont_care; - CommitEntry cached; - bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached); - if (exist && prep_seq == cached.prep_seq) { - // It is committed and also not evicted from commit cache - return cached.commit_seq <= snapshot_seq; - } - // else it could be committed but not inserted in the map which could happen - // after recovery, or it could be committed and evicted by another commit, or - // never committed. - - // At this point we dont know if it was committed or it is still prepared - auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire); - if (max_evicted_seq < prep_seq) { - // Not evicted from cache and also not present, so must be still prepared - return false; - } - // When advancing max_evicted_seq_, we move older entires from prepared to - // delayed_prepared_. Also we move evicted entries from commit cache to - // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <= - // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in - // old_commit_map_, iii) committed with no conflict with any snapshot (i) - // delayed_prepared_ is checked above - if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case - // only (iii) is the case: committed - // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq < - // snapshot_seq - return true; - } - // else (ii) might be the case: check the commit data saved for this snapshot. - // If there was no overlapping commit entry, then it is committed with a - // commit_seq lower than any live snapshot, including snapshot_seq. - if (old_commit_map_empty_.load(std::memory_order_acquire)) { - return true; - } - { - // We should not normally reach here - ReadLock rl(&old_commit_map_mutex_); - auto old_commit_entry = old_commit_map_.find(prep_seq); - if (old_commit_entry == old_commit_map_.end() || - old_commit_entry->second <= snapshot_seq) { - return true; - } - } - // (ii) it the case: it is committed but after the snapshot_seq - return false; -} - -void WritePreparedTxnDB::AddPrepared(uint64_t seq) { - ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Prepareing", seq); - // TODO(myabandeh): Add a runtime check to ensure the following assert. 
- assert(seq > max_evicted_seq_); - WriteLock wl(&prepared_mutex_); - prepared_txns_.push(seq); -} - -void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq, - uint64_t rollback_seq) { - ROCKS_LOG_DEBUG( - info_log_, "Txn %" PRIu64 " rolling back with rollback seq of " PRIu64 "", - prep_seq, rollback_seq); - std::vector snapshots = - GetSnapshotListFromDB(kMaxSequenceNumber); - // TODO(myabandeh): currently we are assuming that there is no snapshot taken - // when a transaciton is rolled back. This is the case the way MySQL does - // rollback which is after recovery. We should extend it to be able to - // rollback txns that overlap with exsiting snapshots. - assert(snapshots.size() == 0); - if (snapshots.size()) { - throw std::runtime_error( - "Rollback reqeust while there are live snapshots."); - } - WriteLock wl(&prepared_mutex_); - prepared_txns_.erase(prep_seq); - bool was_empty = delayed_prepared_.empty(); - if (!was_empty) { - delayed_prepared_.erase(prep_seq); - bool is_empty = delayed_prepared_.empty(); - if (was_empty != is_empty) { - delayed_prepared_empty_.store(is_empty, std::memory_order_release); - } - } -} - -void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, - uint64_t commit_seq) { - ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64, - prepare_seq, commit_seq); - auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE; - CommitEntry64b evicted_64b; - CommitEntry evicted; - bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted); - if (to_be_evicted) { - auto prev_max = max_evicted_seq_.load(std::memory_order_acquire); - if (prev_max < evicted.commit_seq) { - // Inc max in larger steps to avoid frequent updates - auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED; - AdvanceMaxEvictedSeq(prev_max, max_evicted_seq); - } - // After each eviction from commit cache, check if the commit entry should - // be kept around because it overlaps with a live snapshot. - CheckAgainstSnapshots(evicted); - } - bool succ = - ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq}); - if (!succ) { - // A very rare event, in which the commit entry is updated before we do. - // Here we apply a very simple solution of retrying. 
- // TODO(myabandeh): do precautions to detect bugs that cause infinite loops - AddCommitted(prepare_seq, commit_seq); - return; - } - { - WriteLock wl(&prepared_mutex_); - prepared_txns_.erase(prepare_seq); - bool was_empty = delayed_prepared_.empty(); - if (!was_empty) { - delayed_prepared_.erase(prepare_seq); - bool is_empty = delayed_prepared_.empty(); - if (was_empty != is_empty) { - delayed_prepared_empty_.store(is_empty, std::memory_order_release); - } - } - } -} - -bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq, - CommitEntry64b* entry_64b, - CommitEntry* entry) const { - *entry_64b = commit_cache_[indexed_seq].load(std::memory_order_acquire); - bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT); - return valid; -} - -bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq, - const CommitEntry& new_entry, - CommitEntry* evicted_entry) { - CommitEntry64b new_entry_64b(new_entry, FORMAT); - CommitEntry64b evicted_entry_64b = commit_cache_[indexed_seq].exchange( - new_entry_64b, std::memory_order_acq_rel); - bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT); - return valid; -} - -bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq, - CommitEntry64b& expected_entry_64b, - const CommitEntry& new_entry) { - auto& atomic_entry = commit_cache_[indexed_seq]; - CommitEntry64b new_entry_64b(new_entry, FORMAT); - bool succ = atomic_entry.compare_exchange_strong( - expected_entry_64b, new_entry_64b, std::memory_order_acq_rel, - std::memory_order_acquire); - return succ; -} - -void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max, - SequenceNumber& new_max) { - // When max_evicted_seq_ advances, move older entries from prepared_txns_ - // to delayed_prepared_. This guarantees that if a seq is lower than max, - // then it is not in prepared_txns_ ans save an expensive, synchronized - // lookup from a shared set. delayed_prepared_ is expected to be empty in - // normal cases. - { - WriteLock wl(&prepared_mutex_); - while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) { - auto to_be_popped = prepared_txns_.top(); - delayed_prepared_.insert(to_be_popped); - prepared_txns_.pop(); - delayed_prepared_empty_.store(false, std::memory_order_release); - } - } - - // With each change to max_evicted_seq_ fetch the live snapshots behind it. - // We use max as the version of snapshots to identify how fresh are the - // snapshot list. This works because the snapshots are between 0 and - // max, so the larger the max, the more complete they are. 
- SequenceNumber new_snapshots_version = new_max; - std::vector snapshots; - bool update_snapshots = false; - if (new_snapshots_version > snapshots_version_) { - // This is to avoid updating the snapshots_ if it already updated - // with a more recent vesion by a concrrent thread - update_snapshots = true; - // We only care about snapshots lower then max - snapshots = GetSnapshotListFromDB(new_max); - } - if (update_snapshots) { - UpdateSnapshots(snapshots, new_snapshots_version); - } - while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak( - prev_max, new_max, std::memory_order_acq_rel, - std::memory_order_relaxed)) { - }; -} - -const std::vector WritePreparedTxnDB::GetSnapshotListFromDB( - SequenceNumber max) { - InstrumentedMutex(db_impl_->mutex()); - return db_impl_->snapshots().GetAll(nullptr, max); -} - -void WritePreparedTxnDB::UpdateSnapshots( - const std::vector& snapshots, - const SequenceNumber& version) { - TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start"); - TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start"); -#ifndef NDEBUG - size_t sync_i = 0; -#endif - WriteLock wl(&snapshots_mutex_); - snapshots_version_ = version; - // We update the list concurrently with the readers. - // Both new and old lists are sorted and the new list is subset of the - // previous list plus some new items. Thus if a snapshot repeats in - // both new and old lists, it will appear upper in the new list. So if - // we simply insert the new snapshots in order, if an overwritten item - // is still valid in the new list is either written to the same place in - // the array or it is written in a higher palce before it gets - // overwritten by another item. This guarantess a reader that reads the - // list bottom-up will eventaully see a snapshot that repeats in the - // update, either before it gets overwritten by the writer or - // afterwards. - size_t i = 0; - auto it = snapshots.begin(); - for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) { - snapshot_cache_[i].store(*it, std::memory_order_release); - TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i); - TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i); - } -#ifndef NDEBUG - // Release the remaining sync points since they are useless given that the - // reader would also use lock to access snapshots - for (++sync_i; sync_i <= 10; ++sync_i) { - TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i); - TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i); - } -#endif - snapshots_.clear(); - for (; it != snapshots.end(); it++) { - // Insert them to a vector that is less efficient to access - // concurrently - snapshots_.push_back(*it); - } - // Update the size at the end. Otherwise a parallel reader might read - // items that are not set yet. - snapshots_total_.store(snapshots.size(), std::memory_order_release); - TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end"); - TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end"); -} - -void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) { - TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start"); - TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start"); -#ifndef NDEBUG - size_t sync_i = 0; -#endif - // First check the snapshot cache that is efficient for concurrent access - auto cnt = snapshots_total_.load(std::memory_order_acquire); - // The list might get updated concurrently as we are reading from it. 
The - // reader should be able to read all the snapshots that are still valid - // after the update. Since the survived snapshots are written in a higher - // place before gets overwritten the reader that reads bottom-up will - // eventully see it. - const bool next_is_larger = true; - SequenceNumber snapshot_seq = kMaxSequenceNumber; - size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE); - for (; 0 < ip1; ip1--) { - snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire); - TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", - ++sync_i); - TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i); - if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq, - snapshot_seq, !next_is_larger)) { - break; - } - } -#ifndef NDEBUG - // Release the remaining sync points before accquiring the lock - for (++sync_i; sync_i <= 10; ++sync_i) { - TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i); - TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i); - } -#endif - TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end"); - TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end"); - if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE && - snapshot_seq < evicted.prep_seq)) { - // Then access the less efficient list of snapshots_ - ReadLock rl(&snapshots_mutex_); - // Items could have moved from the snapshots_ to snapshot_cache_ before - // accquiring the lock. To make sure that we do not miss a valid snapshot, - // read snapshot_cache_ again while holding the lock. - for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) { - snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire); - if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq, - snapshot_seq, next_is_larger)) { - break; - } - } - for (auto snapshot_seq_2 : snapshots_) { - if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq, - snapshot_seq_2, next_is_larger)) { - break; - } - } - } -} - -bool WritePreparedTxnDB::MaybeUpdateOldCommitMap( - const uint64_t& prep_seq, const uint64_t& commit_seq, - const uint64_t& snapshot_seq, const bool next_is_larger = true) { - // If we do not store an entry in old_commit_map we assume it is committed in - // all snapshots. if commit_seq <= snapshot_seq, it is considered already in - // the snapshot so we need not to keep the entry around for this snapshot. - if (commit_seq <= snapshot_seq) { - // continue the search if the next snapshot could be smaller than commit_seq - return !next_is_larger; - } - // then snapshot_seq < commit_seq - if (prep_seq <= snapshot_seq) { // overlapping range - WriteLock wl(&old_commit_map_mutex_); - old_commit_map_empty_.store(false, std::memory_order_release); - old_commit_map_[prep_seq] = commit_seq; - // Storing once is enough. No need to check it for other snapshots. - return false; - } - // continue the search if the next snapshot could be larger than prep_seq - return next_is_larger; -} - -WritePreparedTxnDB::~WritePreparedTxnDB() { - // At this point there could be running compaction/flush holding a - // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB. - // Make sure those jobs finished before destructing WritePreparedTxnDB. 
- db_impl_->CancelAllBackgroundWork(true/*wait*/); -} - } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 312bd9efc..4311e88c3 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -162,339 +162,5 @@ class WriteCommittedTxnDB : public PessimisticTransactionDB { Transaction* old_txn) override; }; -// A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC. -// In this way some data in the DB might not be committed. The DB provides -// mechanisms to tell such data apart from committed data. -class WritePreparedTxnDB : public PessimisticTransactionDB { - public: - explicit WritePreparedTxnDB( - DB* db, const TransactionDBOptions& txn_db_options, - size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS, - size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS) - : PessimisticTransactionDB(db, txn_db_options), - SNAPSHOT_CACHE_BITS(snapshot_cache_bits), - SNAPSHOT_CACHE_SIZE(static_cast(1ull << SNAPSHOT_CACHE_BITS)), - COMMIT_CACHE_BITS(commit_cache_bits), - COMMIT_CACHE_SIZE(static_cast(1ull << COMMIT_CACHE_BITS)), - FORMAT(COMMIT_CACHE_BITS) { - Init(txn_db_options); - } - - explicit WritePreparedTxnDB( - StackableDB* db, const TransactionDBOptions& txn_db_options, - size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS, - size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS) - : PessimisticTransactionDB(db, txn_db_options), - SNAPSHOT_CACHE_BITS(snapshot_cache_bits), - SNAPSHOT_CACHE_SIZE(static_cast(1ull << SNAPSHOT_CACHE_BITS)), - COMMIT_CACHE_BITS(commit_cache_bits), - COMMIT_CACHE_SIZE(static_cast(1ull << COMMIT_CACHE_BITS)), - FORMAT(COMMIT_CACHE_BITS) { - Init(txn_db_options); - } - - virtual ~WritePreparedTxnDB(); - - virtual Status Initialize( - const std::vector& compaction_enabled_cf_indices, - const std::vector& handles) override; - - Transaction* BeginTransaction(const WriteOptions& write_options, - const TransactionOptions& txn_options, - Transaction* old_txn) override; - - using DB::Get; - virtual Status Get(const ReadOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - PinnableSlice* value) override; - - using DB::NewIterator; - virtual Iterator* NewIterator(const ReadOptions& options, - ColumnFamilyHandle* column_family) override; - - using DB::NewIterators; - virtual Status NewIterators( - const ReadOptions& options, - const std::vector& column_families, - std::vector* iterators) override; - - // Check whether the transaction that wrote the value with seqeunce number seq - // is visible to the snapshot with sequence number snapshot_seq - bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq) const; - // Add the trasnaction with prepare sequence seq to the prepared list - void AddPrepared(uint64_t seq); - // Rollback a prepared txn identified with prep_seq. rollback_seq is the seq - // with which the additional data is written to cancel the txn effect. It can - // be used to idenitfy the snapshots that overlap with the rolled back txn. 
- void RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq); - // Add the transaction with prepare sequence prepare_seq and commit sequence - // commit_seq to the commit map - void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq); - - struct CommitEntry { - uint64_t prep_seq; - uint64_t commit_seq; - CommitEntry() : prep_seq(0), commit_seq(0) {} - CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {} - bool operator==(const CommitEntry& rhs) const { - return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq; - } - }; - - struct CommitEntry64bFormat { - explicit CommitEntry64bFormat(size_t index_bits) - : INDEX_BITS(index_bits), - PREP_BITS(static_cast(64 - PAD_BITS - INDEX_BITS)), - COMMIT_BITS(static_cast(64 - PREP_BITS)), - COMMIT_FILTER(static_cast((1ull << COMMIT_BITS) - 1)) {} - // Number of higher bits of a sequence number that is not used. They are - // used to encode the value type, ... - const size_t PAD_BITS = static_cast(8); - // Number of lower bits from prepare seq that can be skipped as they are - // implied by the index of the entry in the array - const size_t INDEX_BITS; - // Number of bits we use to encode the prepare seq - const size_t PREP_BITS; - // Number of bits we use to encode the commit seq. - const size_t COMMIT_BITS; - // Filter to encode/decode commit seq - const uint64_t COMMIT_FILTER; - }; - - // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... - // INDEX Detal Seq (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ... - // DELTA DELTA Encoded Value = PREP PREP .... PREP PREP DELTA DELTA - // ... DELTA DELTA PAD: first bits of a seq that is reserved for tagging and - // hence ignored PREP/INDEX: the used bits in a prepare seq number INDEX: the - // bits that do not have to be encoded (will be provided externally) DELTA: - // prep seq - commit seq + 1 Number of DELTA bits should be equal to number of - // index bits + PADs - struct CommitEntry64b { - constexpr CommitEntry64b() noexcept : rep_(0) {} - - CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format) - : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {} - - CommitEntry64b(const uint64_t ps, const uint64_t cs, - const CommitEntry64bFormat& format) { - assert(ps < static_cast( - (1ull << (format.PREP_BITS + format.INDEX_BITS)))); - assert(ps <= cs); - uint64_t delta = cs - ps + 1; // make initialized delta always >= 1 - // zero is reserved for uninitialized entries - assert(0 < delta); - assert(delta < static_cast((1ull << format.COMMIT_BITS))); - rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER; - rep_ = rep_ | delta; - } - - // Return false if the entry is empty - bool Parse(const uint64_t indexed_seq, CommitEntry* entry, - const CommitEntry64bFormat& format) { - uint64_t delta = rep_ & format.COMMIT_FILTER; - // zero is reserved for uninitialized entries - assert(delta < static_cast((1ull << format.COMMIT_BITS))); - if (delta == 0) { - return false; // initialized entry would have non-zero delta - } - - assert(indexed_seq < static_cast((1ull << format.INDEX_BITS))); - uint64_t prep_up = rep_ & ~format.COMMIT_FILTER; - prep_up >>= format.PAD_BITS; - const uint64_t& prep_low = indexed_seq; - entry->prep_seq = prep_up | prep_low; - - entry->commit_seq = entry->prep_seq + delta - 1; - return true; - } - - private: - uint64_t rep_; - }; - - // Struct to hold ownership of snapshot and read callback for cleanup. 
- struct IteratorState; - - private: - friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; - friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; - friend class WritePreparedTransactionTest_CommitMapTest_Test; - friend class WritePreparedTransactionTest_SnapshotConcurrentAccessTest_Test; - friend class WritePreparedTransactionTest; - friend class PreparedHeap_BasicsTest_Test; - friend class WritePreparedTxnDBMock; - friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; - friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; - friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; - friend class WritePreparedTransactionTest_RollbackTest_Test; - - void Init(const TransactionDBOptions& /* unused */); - - // A heap with the amortized O(1) complexity for erase. It uses one extra heap - // to keep track of erased entries that are not yet on top of the main heap. - class PreparedHeap { - std::priority_queue, std::greater> - heap_; - std::priority_queue, std::greater> - erased_heap_; - - public: - bool empty() { return heap_.empty(); } - uint64_t top() { return heap_.top(); } - void push(uint64_t v) { heap_.push(v); } - void pop() { - heap_.pop(); - while (!heap_.empty() && !erased_heap_.empty() && - heap_.top() == erased_heap_.top()) { - heap_.pop(); - erased_heap_.pop(); - } - while (heap_.empty() && !erased_heap_.empty()) { - erased_heap_.pop(); - } - } - void erase(uint64_t seq) { - if (!heap_.empty()) { - if (seq < heap_.top()) { - // Already popped, ignore it. - } else if (heap_.top() == seq) { - pop(); - } else { // (heap_.top() > seq) - // Down the heap, remember to pop it later - erased_heap_.push(seq); - } - } - } - }; - - // Get the commit entry with index indexed_seq from the commit table. It - // returns true if such entry exists. - bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b, - CommitEntry* entry) const; - - // Rewrite the entry with the index indexed_seq in the commit table with the - // commit entry . If the rewrite results into eviction, - // sets the evicted_entry and returns true. - bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry, - CommitEntry* evicted_entry); - - // Rewrite the entry with the index indexed_seq in the commit table with the - // commit entry new_entry only if the existing entry matches the - // expected_entry. Returns false otherwise. - bool ExchangeCommitEntry(const uint64_t indexed_seq, - CommitEntry64b& expected_entry, - const CommitEntry& new_entry); - - // Increase max_evicted_seq_ from the previous value prev_max to the new - // value. This also involves taking care of prepared txns that are not - // committed before new_max, as well as updating the list of live snapshots at - // the time of updating the max. Thread-safety: this function can be called - // concurrently. The concurrent invocations of this function is equivalent to - // a serial invocation in which the last invocation is the one with the - // largetst new_max value. - void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max); - - virtual const std::vector GetSnapshotListFromDB( - SequenceNumber max); - - // Update the list of snapshots corresponding to the soon-to-be-updated - // max_eviceted_seq_. Thread-safety: this function can be called concurrently. - // The concurrent invocations of this function is equivalent to a serial - // invocation in which the last invocation is the one with the largetst - // version value. 
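The PreparedHeap declared above gets amortized O(1) erase by deferring erases of entries that are not yet at the top into a second min-heap, resolving them lazily during pop. A standalone sketch of that lazy-deletion idea, using illustrative names rather than the RocksDB types:

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <vector>

// Min-heap with deferred erase: erases below the top are parked in a second
// heap and discarded later, when the main heap pops down to them.
class LazyMinHeap {
  std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> heap_;
  std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> erased_;

 public:
  bool empty() const { return heap_.empty(); }
  uint64_t top() const { return heap_.top(); }
  void push(uint64_t v) { heap_.push(v); }
  void pop() {
    heap_.pop();
    // Drop any deferred erases that have now surfaced at the top.
    while (!heap_.empty() && !erased_.empty() && heap_.top() == erased_.top()) {
      heap_.pop();
      erased_.pop();
    }
    while (heap_.empty() && !erased_.empty()) {
      erased_.pop();
    }
  }
  void erase(uint64_t v) {
    if (heap_.empty()) return;
    if (v < heap_.top()) return;          // already popped earlier, ignore
    if (v == heap_.top()) { pop(); return; }
    erased_.push(v);                      // below the top: remember for later
  }
};

int main() {
  LazyMinHeap h;
  for (uint64_t s : {10, 20, 30}) h.push(s);
  h.erase(20);              // deferred, 20 is not at the top
  assert(h.top() == 10);
  h.pop();                  // pops 10, then lazily discards 20
  assert(h.top() == 30);
  return 0;
}
```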
- void UpdateSnapshots(const std::vector& snapshots, - const SequenceNumber& version); - - // Check an evicted entry against live snapshots to see if it should be kept - // around or it can be safely discarded (and hence assume committed for all - // snapshots). Thread-safety: this function can be called concurrently. If it - // is called concurrently with multiple UpdateSnapshots, the result is the - // same as checking the intersection of the snapshot list before updates with - // the snapshot list of all the concurrent updates. - void CheckAgainstSnapshots(const CommitEntry& evicted); - - // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq < - // commit_seq. Return false if checking the next snapshot(s) is not needed. - // This is the case if the entry already added to old_commit_map_ or none of - // the next snapshots could satisfy the condition. next_is_larger: the next - // snapshot will be a larger value - bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq, - const uint64_t& commit_seq, - const uint64_t& snapshot_seq, - const bool next_is_larger); - - // The list of live snapshots at the last time that max_evicted_seq_ advanced. - // The list stored into two data structures: in snapshot_cache_ that is - // efficient for concurrent reads, and in snapshots_ if the data does not fit - // into snapshot_cache_. The total number of snapshots in the two lists - std::atomic snapshots_total_ = {}; - // The list sorted in ascending order. Thread-safety for writes is provided - // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for - // each entry. In x86_64 architecture such reads are compiled to simple read - // instructions. 128 entries - static const size_t DEF_SNAPSHOT_CACHE_BITS = static_cast(7); - const size_t SNAPSHOT_CACHE_BITS; - const size_t SNAPSHOT_CACHE_SIZE; - unique_ptr[]> snapshot_cache_; - // 2nd list for storing snapshots. The list sorted in ascending order. - // Thread-safety is provided with snapshots_mutex_. - std::vector snapshots_; - // The version of the latest list of snapshots. This can be used to avoid - // rewrittiing a list that is concurrently updated with a more recent version. - SequenceNumber snapshots_version_ = 0; - - // A heap of prepared transactions. Thread-safety is provided with - // prepared_mutex_. - PreparedHeap prepared_txns_; - // 10m entry, 80MB size - static const size_t DEF_COMMIT_CACHE_BITS = static_cast(21); - const size_t COMMIT_CACHE_BITS; - const size_t COMMIT_CACHE_SIZE; - const CommitEntry64bFormat FORMAT; - // commit_cache_ must be initialized to zero to tell apart an empty index from - // a filled one. Thread-safety is provided with commit_cache_mutex_. - unique_ptr[]> commit_cache_; - // The largest evicted *commit* sequence number from the commit_cache_ - std::atomic max_evicted_seq_ = {}; - // Advance max_evicted_seq_ by this value each time it needs an update. The - // larger the value, the less frequent advances we would have. We do not want - // it to be too large either as it would cause stalls by doing too much - // maintenance work under the lock. - size_t INC_STEP_FOR_MAX_EVICTED = 1; - // A map of the evicted entries from commit_cache_ that has to be kept around - // to service the old snapshots. This is expected to be empty normally. - // Thread-safety is provided with old_commit_map_mutex_. - std::map old_commit_map_; - // A set of long-running prepared transactions that are not finished by the - // time max_evicted_seq_ advances their sequence number. 
This is expected to - // be empty normally. Thread-safety is provided with prepared_mutex_. - std::set delayed_prepared_; - // Update when delayed_prepared_.empty() changes. Expected to be true - // normally. - std::atomic delayed_prepared_empty_ = {true}; - // Update when old_commit_map_.empty() changes. Expected to be true normally. - std::atomic old_commit_map_empty_ = {true}; - mutable port::RWMutex prepared_mutex_; - mutable port::RWMutex old_commit_map_mutex_; - mutable port::RWMutex commit_cache_mutex_; - mutable port::RWMutex snapshots_mutex_; -}; - -class WritePreparedTxnReadCallback : public ReadCallback { - public: - WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot) - : db_(db), snapshot_(snapshot) {} - - // Will be called to see if the seq number accepted; if not it moves on to the - // next seq number. - virtual bool IsCommitted(SequenceNumber seq) override { - return db_->IsInSnapshot(seq, snapshot_); - } - - private: - WritePreparedTxnDB* db_; - SequenceNumber snapshot_; -}; - } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/snapshot_checker.cc b/utilities/transactions/snapshot_checker.cc index 6a718fa8e..4293feaea 100644 --- a/utilities/transactions/snapshot_checker.cc +++ b/utilities/transactions/snapshot_checker.cc @@ -9,7 +9,7 @@ #include #endif // ROCKSDB_LITE -#include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/write_prepared_txn_db.h" namespace rocksdb { diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index bbae310ed..84ea6cdf3 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -36,6 +36,7 @@ #include "utilities/merge_operators.h" #include "utilities/merge_operators/string_append/stringappend.h" #include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/write_prepared_txn_db.h" #include "port/port.h" diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 015cd58c6..d11d667c6 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -15,7 +15,7 @@ #include "rocksdb/status.h" #include "rocksdb/utilities/transaction_db.h" #include "utilities/transactions/pessimistic_transaction.h" -#include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/write_prepared_txn_db.h" namespace rocksdb { diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc new file mode 100644 index 000000000..b48b6a3a7 --- /dev/null +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -0,0 +1,515 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
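The new write_prepared_txn_db.{h,cc} files introduced below carry the WRITE_PREPARED implementation moved out of pessimistic_transaction_db. For orientation, a hedged sketch of how such a DB is typically opened and driven through two-phase commit via the public TransactionDB API; the write_policy option value and the database path are assumptions of this sketch, not something this diff changes:

```cpp
#include <cassert>
#include <string>

#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"

using namespace rocksdb;

int main() {
  Options options;
  options.create_if_missing = true;
  TransactionDBOptions txn_db_options;
  // Assumption: the write-prepared policy is selected via TransactionDBOptions.
  txn_db_options.write_policy = WRITE_PREPARED;

  TransactionDB* db = nullptr;
  Status s = TransactionDB::Open(options, txn_db_options,
                                 "/tmp/wp_txn_example", &db);
  assert(s.ok());

  // Two-phase commit: under this policy the data reaches the DB at Prepare()
  // time and only becomes visible once the prepare seq is mapped to a commit
  // seq (AddCommitted in the code below).
  Transaction* txn = db->BeginTransaction(WriteOptions());
  s = txn->SetName("xid-1");    // a name is required before Prepare()
  assert(s.ok());
  s = txn->Put("key", "value");
  assert(s.ok());
  s = txn->Prepare();
  assert(s.ok());
  s = txn->Commit();
  assert(s.ok());

  delete txn;
  delete db;
  return 0;
}
```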
+ +#ifndef ROCKSDB_LITE + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "utilities/transactions/write_prepared_txn_db.h" + +#include +#include +#include +#include + +#include "db/db_impl.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/utilities/transaction_db.h" +#include "util/mutexlock.h" +#include "util/sync_point.h" +#include "utilities/transactions/pessimistic_transaction.h" +#include "utilities/transactions/transaction_db_mutex_impl.h" + +namespace rocksdb { + +Status WritePreparedTxnDB::Initialize( + const std::vector& compaction_enabled_cf_indices, + const std::vector& handles) { + auto dbimpl = reinterpret_cast(GetRootDB()); + assert(dbimpl != nullptr); + auto rtxns = dbimpl->recovered_transactions(); + for (auto rtxn : rtxns) { + AddPrepared(rtxn.second->seq_); + } + SequenceNumber prev_max = max_evicted_seq_; + SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber(); + AdvanceMaxEvictedSeq(prev_max, last_seq); + + db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this)); + + auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices, + handles); + return s; +} + +Transaction* WritePreparedTxnDB::BeginTransaction( + const WriteOptions& write_options, const TransactionOptions& txn_options, + Transaction* old_txn) { + if (old_txn != nullptr) { + ReinitializeTransaction(old_txn, write_options, txn_options); + return old_txn; + } else { + return new WritePreparedTxn(this, write_options, txn_options); + } +} + +Status WritePreparedTxnDB::Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value) { + // We are fine with the latest committed value. This could be done by + // specifying the snapshot as kMaxSequenceNumber. + SequenceNumber seq = kMaxSequenceNumber; + if (options.snapshot != nullptr) { + seq = options.snapshot->GetSequenceNumber(); + } + WritePreparedTxnReadCallback callback(this, seq); + bool* dont_care = nullptr; + // Note: no need to specify a snapshot for read options as no specific + // snapshot is requested by the user. + return db_impl_->GetImpl(options, column_family, key, value, dont_care, + &callback); +} + +// Struct to hold ownership of snapshot and read callback for iterator cleanup. 
+struct WritePreparedTxnDB::IteratorState { + IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence, + std::shared_ptr s) + : callback(txn_db, sequence), snapshot(s) {} + + WritePreparedTxnReadCallback callback; + std::shared_ptr snapshot; +}; + +namespace { +static void CleanupWritePreparedTxnDBIterator(void* arg1, void* arg2) { + delete reinterpret_cast(arg1); +} +} // anonymous namespace + +Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) { + std::shared_ptr own_snapshot = nullptr; + SequenceNumber snapshot_seq = kMaxSequenceNumber; + if (options.snapshot != nullptr) { + snapshot_seq = options.snapshot->GetSequenceNumber(); + } else { + auto* snapshot = db_impl_->GetSnapshot(); + snapshot_seq = snapshot->GetSequenceNumber(); + own_snapshot = std::make_shared(db_impl_, snapshot); + } + assert(snapshot_seq != kMaxSequenceNumber); + auto* cfd = reinterpret_cast(column_family)->cfd(); + auto* state = new IteratorState(this, snapshot_seq, own_snapshot); + auto* db_iter = + db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback); + db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); + return db_iter; +} + +Status WritePreparedTxnDB::NewIterators( + const ReadOptions& options, + const std::vector& column_families, + std::vector* iterators) { + std::shared_ptr own_snapshot = nullptr; + SequenceNumber snapshot_seq = kMaxSequenceNumber; + if (options.snapshot != nullptr) { + snapshot_seq = options.snapshot->GetSequenceNumber(); + } else { + auto* snapshot = db_impl_->GetSnapshot(); + snapshot_seq = snapshot->GetSequenceNumber(); + own_snapshot = std::make_shared(db_impl_, snapshot); + } + iterators->clear(); + iterators->reserve(column_families.size()); + for (auto* column_family : column_families) { + auto* cfd = reinterpret_cast(column_family)->cfd(); + auto* state = new IteratorState(this, snapshot_seq, own_snapshot); + auto* db_iter = + db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback); + db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); + iterators->push_back(db_iter); + } + return Status::OK(); +} + +void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { + // Adcance max_evicted_seq_ no more than 100 times before the cache wraps + // around. + INC_STEP_FOR_MAX_EVICTED = + std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast(1)); + snapshot_cache_ = unique_ptr[]>( + new std::atomic[SNAPSHOT_CACHE_SIZE] {}); + commit_cache_ = unique_ptr[]>( + new std::atomic[COMMIT_CACHE_SIZE] {}); +} + +// Returns true if commit_seq <= snapshot_seq +bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, + uint64_t snapshot_seq) const { + // Here we try to infer the return value without looking into prepare list. + // This would help avoiding synchronization over a shared map. + // TODO(myabandeh): read your own writes + // TODO(myabandeh): optimize this. This sequence of checks must be correct but + // not necessary efficient + if (prep_seq == 0) { + // Compaction will output keys to bottom-level with sequence number 0 if + // it is visible to the earliest snapshot. 
+ return true; + } + if (snapshot_seq < prep_seq) { + // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq + return false; + } + if (!delayed_prepared_empty_.load(std::memory_order_acquire)) { + // We should not normally reach here + ReadLock rl(&prepared_mutex_); + if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) { + // Then it is not committed yet + return false; + } + } + auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE; + CommitEntry64b dont_care; + CommitEntry cached; + bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached); + if (exist && prep_seq == cached.prep_seq) { + // It is committed and also not evicted from commit cache + return cached.commit_seq <= snapshot_seq; + } + // else it could be committed but not inserted in the map which could happen + // after recovery, or it could be committed and evicted by another commit, or + // never committed. + + // At this point we dont know if it was committed or it is still prepared + auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire); + if (max_evicted_seq < prep_seq) { + // Not evicted from cache and also not present, so must be still prepared + return false; + } + // When advancing max_evicted_seq_, we move older entires from prepared to + // delayed_prepared_. Also we move evicted entries from commit cache to + // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <= + // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in + // old_commit_map_, iii) committed with no conflict with any snapshot (i) + // delayed_prepared_ is checked above + if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case + // only (iii) is the case: committed + // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq < + // snapshot_seq + return true; + } + // else (ii) might be the case: check the commit data saved for this snapshot. + // If there was no overlapping commit entry, then it is committed with a + // commit_seq lower than any live snapshot, including snapshot_seq. + if (old_commit_map_empty_.load(std::memory_order_acquire)) { + return true; + } + { + // We should not normally reach here + ReadLock rl(&old_commit_map_mutex_); + auto old_commit_entry = old_commit_map_.find(prep_seq); + if (old_commit_entry == old_commit_map_.end() || + old_commit_entry->second <= snapshot_seq) { + return true; + } + } + // (ii) it the case: it is committed but after the snapshot_seq + return false; +} + +void WritePreparedTxnDB::AddPrepared(uint64_t seq) { + ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Prepareing", seq); + // TODO(myabandeh): Add a runtime check to ensure the following assert. + assert(seq > max_evicted_seq_); + WriteLock wl(&prepared_mutex_); + prepared_txns_.push(seq); +} + +void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq, + uint64_t rollback_seq) { + ROCKS_LOG_DEBUG( + info_log_, "Txn %" PRIu64 " rolling back with rollback seq of " PRIu64 "", + prep_seq, rollback_seq); + std::vector snapshots = + GetSnapshotListFromDB(kMaxSequenceNumber); + // TODO(myabandeh): currently we are assuming that there is no snapshot taken + // when a transaciton is rolled back. This is the case the way MySQL does + // rollback which is after recovery. We should extend it to be able to + // rollback txns that overlap with exsiting snapshots. 
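IsInSnapshot above resolves visibility in a fixed order: zeroed sequence, snapshot older than prepare, delayed prepared set, commit cache, max_evicted_seq_, and finally old_commit_map_. A simplified single-threaded sketch of that decision order (illustrative names, no atomics or locking):

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <set>

// Simplified visibility check mirroring the order of the checks above:
// commit cache first, then max_evicted, then the overflow structures.
struct SimpleCommitTable {
  static const uint64_t kCacheSize = 8;  // tiny for illustration
  struct Entry { uint64_t prep; uint64_t commit; };
  Entry cache[kCacheSize] = {};          // indexed by prep % kCacheSize
  uint64_t max_evicted = 0;
  std::set<uint64_t> delayed_prepared;   // prepared txns older than max_evicted
  std::map<uint64_t, uint64_t> old_commit_map;  // evicted entries overlapping snapshots

  void AddCommitted(uint64_t prep, uint64_t commit) {
    cache[prep % kCacheSize] = {prep, commit};
  }

  bool IsInSnapshot(uint64_t prep, uint64_t snapshot) const {
    if (prep == 0) return true;                       // compaction-zeroed seq
    if (snapshot < prep) return false;                // committed after the snapshot
    if (delayed_prepared.count(prep)) return false;   // still prepared
    const Entry& e = cache[prep % kCacheSize];
    if (e.prep == prep) return e.commit <= snapshot;  // hit in the commit cache
    if (max_evicted < prep) return false;             // not evicted, so not committed
    if (max_evicted < snapshot) return true;          // commit <= max_evicted < snapshot
    auto it = old_commit_map.find(prep);
    return it == old_commit_map.end() || it->second <= snapshot;
  }
};

int main() {
  SimpleCommitTable t;
  t.AddCommitted(/*prep=*/5, /*commit=*/7);
  std::cout << t.IsInSnapshot(5, 10) << "\n";  // 1: committed at 7 <= snapshot 10
  std::cout << t.IsInSnapshot(5, 6) << "\n";   // 0: committed at 7 > snapshot 6
  return 0;
}
```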
+ assert(snapshots.size() == 0); + if (snapshots.size()) { + throw std::runtime_error( + "Rollback reqeust while there are live snapshots."); + } + WriteLock wl(&prepared_mutex_); + prepared_txns_.erase(prep_seq); + bool was_empty = delayed_prepared_.empty(); + if (!was_empty) { + delayed_prepared_.erase(prep_seq); + bool is_empty = delayed_prepared_.empty(); + if (was_empty != is_empty) { + delayed_prepared_empty_.store(is_empty, std::memory_order_release); + } + } +} + +void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, + uint64_t commit_seq) { + ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64, + prepare_seq, commit_seq); + auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE; + CommitEntry64b evicted_64b; + CommitEntry evicted; + bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted); + if (to_be_evicted) { + auto prev_max = max_evicted_seq_.load(std::memory_order_acquire); + if (prev_max < evicted.commit_seq) { + // Inc max in larger steps to avoid frequent updates + auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED; + AdvanceMaxEvictedSeq(prev_max, max_evicted_seq); + } + // After each eviction from commit cache, check if the commit entry should + // be kept around because it overlaps with a live snapshot. + CheckAgainstSnapshots(evicted); + } + bool succ = + ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq}); + if (!succ) { + // A very rare event, in which the commit entry is updated before we do. + // Here we apply a very simple solution of retrying. + // TODO(myabandeh): do precautions to detect bugs that cause infinite loops + AddCommitted(prepare_seq, commit_seq); + return; + } + { + WriteLock wl(&prepared_mutex_); + prepared_txns_.erase(prepare_seq); + bool was_empty = delayed_prepared_.empty(); + if (!was_empty) { + delayed_prepared_.erase(prepare_seq); + bool is_empty = delayed_prepared_.empty(); + if (was_empty != is_empty) { + delayed_prepared_empty_.store(is_empty, std::memory_order_release); + } + } + } +} + +bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq, + CommitEntry64b* entry_64b, + CommitEntry* entry) const { + *entry_64b = commit_cache_[indexed_seq].load(std::memory_order_acquire); + bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT); + return valid; +} + +bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq, + const CommitEntry& new_entry, + CommitEntry* evicted_entry) { + CommitEntry64b new_entry_64b(new_entry, FORMAT); + CommitEntry64b evicted_entry_64b = commit_cache_[indexed_seq].exchange( + new_entry_64b, std::memory_order_acq_rel); + bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT); + return valid; +} + +bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq, + CommitEntry64b& expected_entry_64b, + const CommitEntry& new_entry) { + auto& atomic_entry = commit_cache_[indexed_seq]; + CommitEntry64b new_entry_64b(new_entry, FORMAT); + bool succ = atomic_entry.compare_exchange_strong( + expected_entry_64b, new_entry_64b, std::memory_order_acq_rel, + std::memory_order_acquire); + return succ; +} + +void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max, + SequenceNumber& new_max) { + // When max_evicted_seq_ advances, move older entries from prepared_txns_ + // to delayed_prepared_. This guarantees that if a seq is lower than max, + // then it is not in prepared_txns_ ans save an expensive, synchronized + // lookup from a shared set. 
delayed_prepared_ is expected to be empty in + // normal cases. + { + WriteLock wl(&prepared_mutex_); + while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) { + auto to_be_popped = prepared_txns_.top(); + delayed_prepared_.insert(to_be_popped); + prepared_txns_.pop(); + delayed_prepared_empty_.store(false, std::memory_order_release); + } + } + + // With each change to max_evicted_seq_ fetch the live snapshots behind it. + // We use max as the version of snapshots to identify how fresh are the + // snapshot list. This works because the snapshots are between 0 and + // max, so the larger the max, the more complete they are. + SequenceNumber new_snapshots_version = new_max; + std::vector snapshots; + bool update_snapshots = false; + if (new_snapshots_version > snapshots_version_) { + // This is to avoid updating the snapshots_ if it already updated + // with a more recent vesion by a concrrent thread + update_snapshots = true; + // We only care about snapshots lower then max + snapshots = GetSnapshotListFromDB(new_max); + } + if (update_snapshots) { + UpdateSnapshots(snapshots, new_snapshots_version); + } + while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak( + prev_max, new_max, std::memory_order_acq_rel, + std::memory_order_relaxed)) { + }; +} + +const std::vector WritePreparedTxnDB::GetSnapshotListFromDB( + SequenceNumber max) { + InstrumentedMutex(db_impl_->mutex()); + return db_impl_->snapshots().GetAll(nullptr, max); +} + +void WritePreparedTxnDB::UpdateSnapshots( + const std::vector& snapshots, + const SequenceNumber& version) { + TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start"); + TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start"); +#ifndef NDEBUG + size_t sync_i = 0; +#endif + WriteLock wl(&snapshots_mutex_); + snapshots_version_ = version; + // We update the list concurrently with the readers. + // Both new and old lists are sorted and the new list is subset of the + // previous list plus some new items. Thus if a snapshot repeats in + // both new and old lists, it will appear upper in the new list. So if + // we simply insert the new snapshots in order, if an overwritten item + // is still valid in the new list is either written to the same place in + // the array or it is written in a higher palce before it gets + // overwritten by another item. This guarantess a reader that reads the + // list bottom-up will eventaully see a snapshot that repeats in the + // update, either before it gets overwritten by the writer or + // afterwards. + size_t i = 0; + auto it = snapshots.begin(); + for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) { + snapshot_cache_[i].store(*it, std::memory_order_release); + TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i); + TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i); + } +#ifndef NDEBUG + // Release the remaining sync points since they are useless given that the + // reader would also use lock to access snapshots + for (++sync_i; sync_i <= 10; ++sync_i) { + TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i); + TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i); + } +#endif + snapshots_.clear(); + for (; it != snapshots.end(); it++) { + // Insert them to a vector that is less efficient to access + // concurrently + snapshots_.push_back(*it); + } + // Update the size at the end. Otherwise a parallel reader might read + // items that are not set yet. 
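The compare_exchange_weak loop at the end of AdvanceMaxEvictedSeq publishes the new maximum so that concurrent calls behave like a serial run that ends with the largest proposed value. The monotonic-max pattern in isolation, with illustrative names:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Raise an atomic to at least new_max; concurrent calls end with the largest
// proposed value, matching the loop used for max_evicted_seq_.
void AdvanceMax(std::atomic<uint64_t>* max_seq, uint64_t new_max) {
  uint64_t prev = max_seq->load(std::memory_order_acquire);
  while (prev < new_max &&
         !max_seq->compare_exchange_weak(prev, new_max,
                                         std::memory_order_acq_rel,
                                         std::memory_order_relaxed)) {
    // On failure compare_exchange_weak reloads prev; retry until either we
    // install new_max or another thread installed something >= new_max.
  }
}

int main() {
  std::atomic<uint64_t> max_seq{0};
  std::vector<std::thread> threads;
  for (uint64_t v : {10, 40, 20, 30}) {
    threads.emplace_back(AdvanceMax, &max_seq, v);
  }
  for (auto& t : threads) t.join();
  assert(max_seq.load() == 40);  // the largest proposal always wins
  return 0;
}
```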
+ snapshots_total_.store(snapshots.size(), std::memory_order_release); + TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end"); + TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end"); +} + +void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) { + TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start"); + TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start"); +#ifndef NDEBUG + size_t sync_i = 0; +#endif + // First check the snapshot cache that is efficient for concurrent access + auto cnt = snapshots_total_.load(std::memory_order_acquire); + // The list might get updated concurrently as we are reading from it. The + // reader should be able to read all the snapshots that are still valid + // after the update. Since the survived snapshots are written in a higher + // place before gets overwritten the reader that reads bottom-up will + // eventully see it. + const bool next_is_larger = true; + SequenceNumber snapshot_seq = kMaxSequenceNumber; + size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE); + for (; 0 < ip1; ip1--) { + snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire); + TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", + ++sync_i); + TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i); + if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq, + snapshot_seq, !next_is_larger)) { + break; + } + } +#ifndef NDEBUG + // Release the remaining sync points before accquiring the lock + for (++sync_i; sync_i <= 10; ++sync_i) { + TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i); + TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i); + } +#endif + TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end"); + TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end"); + if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE && + snapshot_seq < evicted.prep_seq)) { + // Then access the less efficient list of snapshots_ + ReadLock rl(&snapshots_mutex_); + // Items could have moved from the snapshots_ to snapshot_cache_ before + // accquiring the lock. To make sure that we do not miss a valid snapshot, + // read snapshot_cache_ again while holding the lock. + for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) { + snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire); + if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq, + snapshot_seq, next_is_larger)) { + break; + } + } + for (auto snapshot_seq_2 : snapshots_) { + if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq, + snapshot_seq_2, next_is_larger)) { + break; + } + } + } +} + +bool WritePreparedTxnDB::MaybeUpdateOldCommitMap( + const uint64_t& prep_seq, const uint64_t& commit_seq, + const uint64_t& snapshot_seq, const bool next_is_larger = true) { + // If we do not store an entry in old_commit_map we assume it is committed in + // all snapshots. if commit_seq <= snapshot_seq, it is considered already in + // the snapshot so we need not to keep the entry around for this snapshot. + if (commit_seq <= snapshot_seq) { + // continue the search if the next snapshot could be smaller than commit_seq + return !next_is_larger; + } + // then snapshot_seq < commit_seq + if (prep_seq <= snapshot_seq) { // overlapping range + WriteLock wl(&old_commit_map_mutex_); + old_commit_map_empty_.store(false, std::memory_order_release); + old_commit_map_[prep_seq] = commit_seq; + // Storing once is enough. 
+    return false;
+  }
+  // continue the search if the next snapshot could be larger than prep_seq
+  return next_is_larger;
+}
+
+WritePreparedTxnDB::~WritePreparedTxnDB() {
+  // At this point there could be running compaction/flush jobs holding a
+  // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
+  // Make sure those jobs finish before destructing WritePreparedTxnDB.
+  db_impl_->CancelAllBackgroundWork(true /*wait*/);
+}
+
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
new file mode 100644
index 000000000..02427433c
--- /dev/null
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -0,0 +1,364 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <mutex>
+#include <queue>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "db/db_iter.h"
+#include "db/read_callback.h"
+#include "db/snapshot_checker.h"
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/utilities/transaction_db.h"
+#include "utilities/transactions/pessimistic_transaction.h"
+#include "utilities/transactions/pessimistic_transaction_db.h"
+#include "utilities/transactions/transaction_lock_mgr.h"
+#include "utilities/transactions/write_prepared_txn.h"
+
+namespace rocksdb {
+
+// A PessimisticTransactionDB that writes data to the DB after the prepare
+// phase of 2PC. In this way some data in the DB might not be committed. The
+// DB provides mechanisms to tell such data apart from committed data.
+class WritePreparedTxnDB : public PessimisticTransactionDB {
+ public:
+  explicit WritePreparedTxnDB(
+      DB* db, const TransactionDBOptions& txn_db_options,
+      size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
+      size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
+      : PessimisticTransactionDB(db, txn_db_options),
+        SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
+        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
+        COMMIT_CACHE_BITS(commit_cache_bits),
+        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
+        FORMAT(COMMIT_CACHE_BITS) {
+    Init(txn_db_options);
+  }
+
+  explicit WritePreparedTxnDB(
+      StackableDB* db, const TransactionDBOptions& txn_db_options,
+      size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
+      size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
+      : PessimisticTransactionDB(db, txn_db_options),
+        SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
+        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
+        COMMIT_CACHE_BITS(commit_cache_bits),
+        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
+        FORMAT(COMMIT_CACHE_BITS) {
+    Init(txn_db_options);
+  }
+
+  virtual ~WritePreparedTxnDB();
+
+  virtual Status Initialize(
+      const std::vector<size_t>& compaction_enabled_cf_indices,
+      const std::vector<ColumnFamilyHandle*>& handles) override;
+
+  Transaction* BeginTransaction(const WriteOptions& write_options,
+                                const TransactionOptions& txn_options,
+                                Transaction* old_txn) override;
+
+  using DB::Get;
+  virtual Status Get(const ReadOptions& options,
+                     ColumnFamilyHandle* column_family, const Slice& key,
+                     PinnableSlice* value) override;
+
+  using DB::NewIterator;
+  virtual Iterator* NewIterator(const ReadOptions& options,
+                                ColumnFamilyHandle* column_family) override;
+
+  using DB::NewIterators;
+  virtual Status NewIterators(
+      const ReadOptions& options,
+      const std::vector<ColumnFamilyHandle*>& column_families,
+      std::vector<Iterator*>* iterators) override;
+
+  // Check whether the transaction that wrote the value with sequence number
+  // seq is visible to the snapshot with sequence number snapshot_seq
+  bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq) const;
+  // Add the transaction with prepare sequence seq to the prepared list
+  void AddPrepared(uint64_t seq);
+  // Rollback a prepared txn identified with prep_seq. rollback_seq is the seq
+  // with which the additional data is written to cancel the txn effect. It
+  // can be used to identify the snapshots that overlap with the rolled back
+  // txn.
+  void RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq);
+  // Add the transaction with prepare sequence prepare_seq and commit sequence
+  // commit_seq to the commit map
+  void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq);
+
+  struct CommitEntry {
+    uint64_t prep_seq;
+    uint64_t commit_seq;
+    CommitEntry() : prep_seq(0), commit_seq(0) {}
+    CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
+    bool operator==(const CommitEntry& rhs) const {
+      return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
+    }
+  };
+
+  struct CommitEntry64bFormat {
+    explicit CommitEntry64bFormat(size_t index_bits)
+        : INDEX_BITS(index_bits),
+          PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
+          COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
+          COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)) {}
+    // Number of higher bits of a sequence number that are not used. They are
+    // used to encode the value type, ...
+    const size_t PAD_BITS = static_cast<size_t>(8);
+    // Number of lower bits from prepare seq that can be skipped as they are
+    // implied by the index of the entry in the array
+    const size_t INDEX_BITS;
+    // Number of bits we use to encode the prepare seq
+    const size_t PREP_BITS;
+    // Number of bits we use to encode the commit seq.
+    const size_t COMMIT_BITS;
+    // Filter to encode/decode commit seq
+    const uint64_t COMMIT_FILTER;
+  };
+
+  // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... INDEX
+  // Delta Seq   (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ... DELTA DELTA
+  // Encoded Value         = PREP PREP ... PREP PREP DELTA DELTA ... DELTA DELTA
+  // PAD: the first bits of a seq that are reserved for tagging and hence
+  // ignored
+  // PREP/INDEX: the used bits in a prepare seq number
+  // INDEX: the bits that do not have to be encoded (will be provided
+  // externally)
+  // DELTA: prep seq - commit seq + 1
+  // The number of DELTA bits should be equal to the number of index bits +
+  // PADs
+  struct CommitEntry64b {
+    constexpr CommitEntry64b() noexcept : rep_(0) {}
+
+    CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
+        : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}
+
+    CommitEntry64b(const uint64_t ps, const uint64_t cs,
+                   const CommitEntry64bFormat& format) {
+      assert(ps < static_cast<uint64_t>(
+                      (1ull << (format.PREP_BITS + format.INDEX_BITS))));
+      assert(ps <= cs);
+      uint64_t delta = cs - ps + 1;  // make initialized delta always >= 1
+      // zero is reserved for uninitialized entries
+      assert(0 < delta);
+      assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
+      rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
+      rep_ = rep_ | delta;
+    }
+
+    // Return false if the entry is empty
+    bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
+               const CommitEntry64bFormat& format) {
+      uint64_t delta = rep_ & format.COMMIT_FILTER;
+      // zero is reserved for uninitialized entries
+      assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
+      if (delta == 0) {
+        return false;  // initialized entry would have non-zero delta
+      }
+
+      assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
+      uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
+      prep_up >>= format.PAD_BITS;
+      const uint64_t& prep_low = indexed_seq;
+      entry->prep_seq = prep_up | prep_low;
+
+      entry->commit_seq = entry->prep_seq + delta - 1;
+      return true;
+    }
+
+   private:
+    uint64_t rep_;
+  };
+
+  // Struct to hold ownership of snapshot and read callback for cleanup.
+  struct IteratorState;
+
+ private:
+  friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
+  friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
+  friend class WritePreparedTransactionTest_CommitMapTest_Test;
+  friend class WritePreparedTransactionTest_SnapshotConcurrentAccessTest_Test;
+  friend class WritePreparedTransactionTest;
+  friend class PreparedHeap_BasicsTest_Test;
+  friend class WritePreparedTxnDBMock;
+  friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
+  friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
+  friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
+  friend class WritePreparedTransactionTest_RollbackTest_Test;
+
+  void Init(const TransactionDBOptions& /* unused */);
+
+  // A heap with amortized O(1) complexity for erase. It uses one extra heap
+  // to keep track of erased entries that are not yet on top of the main heap.
+  class PreparedHeap {
+    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
+        heap_;
+    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
+        erased_heap_;
+
+   public:
+    bool empty() { return heap_.empty(); }
+    uint64_t top() { return heap_.top(); }
+    void push(uint64_t v) { heap_.push(v); }
+    void pop() {
+      heap_.pop();
+      while (!heap_.empty() && !erased_heap_.empty() &&
+             heap_.top() == erased_heap_.top()) {
+        heap_.pop();
+        erased_heap_.pop();
+      }
+      while (heap_.empty() && !erased_heap_.empty()) {
+        erased_heap_.pop();
+      }
+    }
+    void erase(uint64_t seq) {
+      if (!heap_.empty()) {
+        if (seq < heap_.top()) {
+          // Already popped, ignore it.
+        } else if (heap_.top() == seq) {
+          pop();
+        } else {  // (heap_.top() > seq)
+          // Down the heap, remember to pop it later
+          erased_heap_.push(seq);
+        }
+      }
+    }
+  };
+
+  // Get the commit entry with index indexed_seq from the commit table. It
+  // returns true if such an entry exists.
+  bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
+                      CommitEntry* entry) const;
+
+  // Rewrite the entry with the index indexed_seq in the commit table with the
+  // commit entry <prep_seq, commit_seq>. If the rewrite results in eviction,
+  // sets the evicted_entry and returns true.
+  bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
+                      CommitEntry* evicted_entry);
+
+  // Rewrite the entry with the index indexed_seq in the commit table with the
+  // commit entry new_entry only if the existing entry matches the
+  // expected_entry. Returns false otherwise.
+  bool ExchangeCommitEntry(const uint64_t indexed_seq,
+                           CommitEntry64b& expected_entry,
+                           const CommitEntry& new_entry);
+
+  // Increase max_evicted_seq_ from the previous value prev_max to the new
+  // value. This also involves taking care of prepared txns that are not
+  // committed before new_max, as well as updating the list of live snapshots
+  // at the time of updating the max. Thread-safety: this function can be
+  // called concurrently. The concurrent invocations of this function are
+  // equivalent to a serial invocation in which the last invocation is the one
+  // with the largest new_max value.
+  void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max);
+
+  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
+      SequenceNumber max);
+
+  // Update the list of snapshots corresponding to the soon-to-be-updated
+  // max_evicted_seq_. Thread-safety: this function can be called
+  // concurrently. The concurrent invocations of this function are equivalent
+  // to a serial invocation in which the last invocation is the one with the
+  // largest version value.
+  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
+                       const SequenceNumber& version);
+
+  // Check an evicted entry against live snapshots to see if it should be kept
+  // around or whether it can be safely discarded (and hence assumed committed
+  // for all snapshots). Thread-safety: this function can be called
+  // concurrently. If it is called concurrently with multiple UpdateSnapshots,
+  // the result is the same as checking the intersection of the snapshot list
+  // before the updates with the snapshot list of all the concurrent updates.
+  void CheckAgainstSnapshots(const CommitEntry& evicted);
+
+  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
+  // commit_seq. Return false if checking the next snapshot(s) is not needed.
+  // This is the case if the entry has already been added to old_commit_map_
+  // or none of the next snapshots could satisfy the condition.
+  // next_is_larger: the next snapshot will be a larger value
+  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
+                               const uint64_t& commit_seq,
+                               const uint64_t& snapshot_seq,
+                               const bool next_is_larger);
+
+  // The list of live snapshots at the last time that max_evicted_seq_
+  // advanced. The list is stored in two data structures: in snapshot_cache_
+  // that is efficient for concurrent reads, and in snapshots_ if the data
+  // does not fit into snapshot_cache_. The total number of snapshots in the
+  // two lists
+  std::atomic<size_t> snapshots_total_ = {};
+  // The list sorted in ascending order. Thread-safety for writes is provided
+  // with snapshots_mutex_ and concurrent reads are safe due to std::atomic
+  // for each entry. On the x86_64 architecture such reads are compiled to
+  // simple read instructions. 128 entries
+  static const size_t DEF_SNAPSHOT_CACHE_BITS = static_cast<size_t>(7);
+  const size_t SNAPSHOT_CACHE_BITS;
+  const size_t SNAPSHOT_CACHE_SIZE;
+  unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
+  // 2nd list for storing snapshots. The list is sorted in ascending order.
+  // Thread-safety is provided with snapshots_mutex_.
+  std::vector<SequenceNumber> snapshots_;
+  // The version of the latest list of snapshots. This can be used to avoid
+  // rewriting a list that is concurrently updated with a more recent version.
+  SequenceNumber snapshots_version_ = 0;
+
+  // A heap of prepared transactions. Thread-safety is provided with
+  // prepared_mutex_.
+  PreparedHeap prepared_txns_;
+  // 2m entries, 16MB in size
+  static const size_t DEF_COMMIT_CACHE_BITS = static_cast<size_t>(21);
+  const size_t COMMIT_CACHE_BITS;
+  const size_t COMMIT_CACHE_SIZE;
+  const CommitEntry64bFormat FORMAT;
+  // commit_cache_ must be initialized to zero to tell apart an empty index
+  // from a filled one. Thread-safety is provided with commit_cache_mutex_.
+  unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
+  // The largest evicted *commit* sequence number from the commit_cache_
+  std::atomic<uint64_t> max_evicted_seq_ = {};
+  // Advance max_evicted_seq_ by this value each time it needs an update. The
+  // larger the value, the less frequent advances we would have. We do not
+  // want it to be too large either as it would cause stalls by doing too much
+  // maintenance work under the lock.
+  size_t INC_STEP_FOR_MAX_EVICTED = 1;
+  // A map of the evicted entries from commit_cache_ that have to be kept
+  // around to service the old snapshots. This is expected to be empty
+  // normally. Thread-safety is provided with old_commit_map_mutex_.
+  std::map<uint64_t, uint64_t> old_commit_map_;
+  // A set of long-running prepared transactions that are not finished by the
+  // time max_evicted_seq_ advances their sequence number. This is expected to
+  // be empty normally. Thread-safety is provided with prepared_mutex_.
+  std::set<uint64_t> delayed_prepared_;
+  // Updated when delayed_prepared_.empty() changes. Expected to be true
+  // normally.
+  std::atomic<bool> delayed_prepared_empty_ = {true};
+  // Updated when old_commit_map_.empty() changes. Expected to be true
+  // normally.
+  std::atomic<bool> old_commit_map_empty_ = {true};
+  mutable port::RWMutex prepared_mutex_;
+  mutable port::RWMutex old_commit_map_mutex_;
+  mutable port::RWMutex commit_cache_mutex_;
+  mutable port::RWMutex snapshots_mutex_;
+};
+
+class WritePreparedTxnReadCallback : public ReadCallback {
+ public:
+  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
+      : db_(db), snapshot_(snapshot) {}
+
+  // Will be called to see if the seq number is accepted; if not it moves on
+  // to the next seq number.
+  virtual bool IsCommitted(SequenceNumber seq) override {
+    return db_->IsInSnapshot(seq, snapshot_);
+  }
+
+ private:
+  WritePreparedTxnDB* db_;
+  SequenceNumber snapshot_;
+};
+
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
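
The CAS loop at the end of AdvanceMaxEvictedSeq implements a "monotonic maximum" update: each caller only installs its value if it is still larger than the published one, so concurrent calls behave like a serial run ending with the largest argument. A minimal standalone sketch of the same idiom follows; the names max_seq and AdvanceTo are illustrative and not part of the patch.

#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Stand-in for max_evicted_seq_: only ever moves forward.
std::atomic<uint64_t> max_seq{0};

void AdvanceTo(uint64_t new_max) {
  uint64_t prev = max_seq.load(std::memory_order_acquire);
  while (prev < new_max &&
         !max_seq.compare_exchange_weak(prev, new_max,
                                        std::memory_order_acq_rel,
                                        std::memory_order_relaxed)) {
    // On failure prev is refreshed with the current value; the loop
    // re-checks whether our candidate is still larger.
  }
}

int main() {
  std::vector<std::thread> threads;
  for (uint64_t v : {5, 42, 17, 30}) {
    threads.emplace_back(AdvanceTo, v);
  }
  for (auto& t : threads) t.join();
  assert(max_seq.load() == 42);  // the largest requested value wins
  return 0;
}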
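
The decision logic in MaybeUpdateOldCommitMap is compact but easy to misread. The pure function below restates just the comparisons, with no locking and no old_commit_map_; it is only an illustration of when an evicted (prep_seq, commit_seq) pair has to be remembered for a given snapshot, and the enum and function names are made up for this sketch.

#include <cassert>
#include <cstdint>

// What the evicted entry means for one snapshot:
//   kVisible      commit_seq <= snapshot_seq: committed before the snapshot
//                 was taken, nothing to remember.
//   kNeedsRecord  prep_seq <= snapshot_seq < commit_seq: the snapshot must
//                 not see this write, so the pair has to go to old_commit_map_.
//   kNotStarted   snapshot_seq < prep_seq: the snapshot predates the prepare,
//                 so it can never see the write anyway.
enum class Overlap { kVisible, kNeedsRecord, kNotStarted };

Overlap Classify(uint64_t prep_seq, uint64_t commit_seq,
                 uint64_t snapshot_seq) {
  if (commit_seq <= snapshot_seq) return Overlap::kVisible;
  if (prep_seq <= snapshot_seq) return Overlap::kNeedsRecord;
  return Overlap::kNotStarted;
}

int main() {
  // Evicted entry: prepared at 100, committed at 150.
  assert(Classify(100, 150, 200) == Overlap::kVisible);      // after commit
  assert(Classify(100, 150, 120) == Overlap::kNeedsRecord);  // in between
  assert(Classify(100, 150, 50) == Overlap::kNotStarted);    // before prepare

  // The next_is_larger flag in the patch only decides whether scanning more
  // snapshots can still change the answer: when walking snapshots in
  // ascending order, kNotStarted means a larger snapshot may still overlap,
  // while kVisible means every larger snapshot also sees the commit, so the
  // scan can stop.
  return 0;
}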
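
The PAD/PREP/INDEX/DELTA layout documented above CommitEntry64b can be checked by hand with tiny parameters. The sketch below repeats the same shift-and-mask arithmetic as the patch's constructor and Parse, but with a made-up 16-slot cache (INDEX_BITS = 4) so the numbers are easy to follow; it is a standalone illustration, not the patch's class.

#include <cassert>
#include <cstdint>

int main() {
  const uint64_t PAD_BITS = 8;
  const uint64_t INDEX_BITS = 4;  // toy 16-slot commit cache
  const uint64_t PREP_BITS = 64 - PAD_BITS - INDEX_BITS;
  const uint64_t COMMIT_BITS = 64 - PREP_BITS;  // = PAD_BITS + INDEX_BITS
  const uint64_t COMMIT_FILTER = (1ull << COMMIT_BITS) - 1;

  const uint64_t prep_seq = 1000;    // sequence assigned at Prepare()
  const uint64_t commit_seq = 1015;  // sequence assigned at Commit()

  // The slot in the cache is the low INDEX_BITS of prep_seq; those bits need
  // not be stored because the reader already knows the index it looked up.
  const uint64_t indexed_seq = prep_seq % (1ull << INDEX_BITS);

  // Encode: upper bits hold prep_seq shifted past the pad, lower bits hold
  // delta = commit_seq - prep_seq + 1 (zero marks an empty slot).
  const uint64_t delta = commit_seq - prep_seq + 1;
  uint64_t rep = (prep_seq << PAD_BITS) & ~COMMIT_FILTER;
  rep |= delta;

  // Decode: recombine the stored upper bits with the externally known index.
  const uint64_t decoded_delta = rep & COMMIT_FILTER;
  assert(decoded_delta != 0);  // otherwise the slot would be empty
  const uint64_t prep_up = (rep & ~COMMIT_FILTER) >> PAD_BITS;
  const uint64_t decoded_prep = prep_up | indexed_seq;
  const uint64_t decoded_commit = decoded_prep + decoded_delta - 1;

  assert(decoded_prep == prep_seq);
  assert(decoded_commit == commit_seq);
  return 0;
}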
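
The two-heap trick behind PreparedHeap is easiest to see with a concrete sequence of operations. The container below is a stripped-down stand-in (it omits the empty-heap cleanup the patch performs), written only to demonstrate that erasing a buried element is deferred until that element reaches the top during a later pop.

#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <vector>

// Erasing an element that is not at the top only records it in a second
// min-heap; the real removal happens lazily when it surfaces in pop().
class DeferredEraseHeap {
  std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
      heap_, erased_;

 public:
  bool empty() const { return heap_.empty(); }
  uint64_t top() const { return heap_.top(); }
  void push(uint64_t v) { heap_.push(v); }
  void pop() {
    heap_.pop();
    // Skip over entries that were logically erased earlier.
    while (!heap_.empty() && !erased_.empty() && heap_.top() == erased_.top()) {
      heap_.pop();
      erased_.pop();
    }
  }
  void erase(uint64_t v) {
    if (!heap_.empty() && heap_.top() == v) {
      pop();            // cheap: it is already at the top
    } else if (!heap_.empty() && v > heap_.top()) {
      erased_.push(v);  // deferred: drop it lazily later
    }                   // v < top(): it was already popped, nothing to do
  }
};

int main() {
  DeferredEraseHeap h;
  for (uint64_t seq : {10, 20, 30}) h.push(seq);
  h.erase(20);            // 20 is buried below 10, so it is only recorded
  assert(h.top() == 10);  // nothing visible has changed yet
  h.pop();                // removes 10, then lazily drops 20
  assert(h.top() == 30);
  return 0;
}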