From fbfa3e7a43526bdf38183f78383db62b430e3885 Mon Sep 17 00:00:00 2001
From: Maysam Yabandeh
Date: Sat, 26 Aug 2017 00:53:13 -0700
Subject: [PATCH] WriteAtPrepare: Efficient read from snapshot list

Summary:
Divide the old snapshots into two lists: a few that fit into a cached array,
and the rest in a vector, which is expected to be empty in normal cases. The
former optimizes concurrent reads from the snapshot list without requiring
locks. It is done with an array of std::atomic entries, from which
std::memory_order_acquire reads compile down to simple read instructions on
most x86_64 architectures.
Closes https://github.com/facebook/rocksdb/pull/2758

Differential Revision: D5660504

Pulled By: maysamyabandeh

fbshipit-source-id: 524fcf9a8e7f90a92324536456912a99aaa6740c
---
 db/snapshot_impl.h                                  |   9 +-
 include/rocksdb/utilities/transaction_db.h          |   6 +-
 .../transactions/pessimistic_transaction_db.cc      | 135 ++++++++++++++++--
 utilities/transactions/pessimistic_transaction_db.h |  51 ++++++-
 utilities/transactions/transaction_test.cc          |  33 +++--
 5 files changed, 201 insertions(+), 33 deletions(-)
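
Before the diff, a sketch of the idea in the summary: the low end of the sorted snapshot list lives in a fixed array of atomics that readers scan with acquire loads and no lock, while the (normally empty) remainder spills into a mutex-protected vector. This is a minimal standalone illustration, not code from the patch; the names SnapshotCache, Update, and ForEachCached are invented for the example:

    #include <atomic>
    #include <cstddef>
    #include <cstdint>
    #include <mutex>
    #include <vector>

    // Two-list snapshot store: a fixed array of std::atomic slots for
    // lock-free reads, plus a mutex-protected overflow vector.
    class SnapshotCache {
     public:
      static const size_t kCacheSize = 128;  // the patch's default is 1 << 7

      SnapshotCache() {
        for (size_t i = 0; i < kCacheSize; i++) {
          cache_[i].store(0, std::memory_order_relaxed);
        }
      }

      // Writer: snapshots must be sorted in ascending order.
      void Update(const std::vector<uint64_t>& sorted_snapshots) {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t i = 0;
        for (; i < sorted_snapshots.size() && i < kCacheSize; i++) {
          cache_[i].store(sorted_snapshots[i], std::memory_order_release);
        }
        overflow_.assign(sorted_snapshots.begin() + i, sorted_snapshots.end());
        // Publish the count last so readers never observe unwritten slots.
        total_.store(sorted_snapshots.size(), std::memory_order_release);
      }

      // Reader: scan the cached prefix from the top (largest index) down
      // without taking the mutex; visit returns false to stop the scan.
      template <typename Visitor>
      void ForEachCached(Visitor visit) const {
        size_t n = total_.load(std::memory_order_acquire);
        if (n > kCacheSize) {
          n = kCacheSize;
        }
        for (size_t i = n; i > 0; i--) {
          if (!visit(cache_[i - 1].load(std::memory_order_acquire))) {
            break;
          }
        }
      }

     private:
      std::atomic<uint64_t> cache_[kCacheSize];
      std::atomic<size_t> total_{0};
      mutable std::mutex mutex_;        // guards overflow_, serializes writers
      std::vector<uint64_t> overflow_;  // spill-over beyond kCacheSize
    };

Publishing total_ last with a release store is what lets a reader trust the first n slots after an acquire load of total_; on x86_64 those acquire loads compile to plain MOVs. The descending scan mirrors the patch's bottom-up argument for why a reader cannot miss a snapshot that stays valid across a concurrent update.
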
diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h
index 8441050fd..ad9c1a9fb 100644
--- a/db/snapshot_impl.h
+++ b/db/snapshot_impl.h
@@ -74,9 +74,11 @@ class SnapshotList {
     count_--;
   }
 
-  // retrieve all snapshot numbers. They are sorted in ascending order.
+  // retrieve all snapshot numbers up until max_seq. They are sorted in
+  // ascending order.
   std::vector<SequenceNumber> GetAll(
-      SequenceNumber* oldest_write_conflict_snapshot = nullptr) const {
+      SequenceNumber* oldest_write_conflict_snapshot = nullptr,
+      const SequenceNumber& max_seq = kMaxSequenceNumber) const {
     std::vector<SequenceNumber> ret;
 
     if (oldest_write_conflict_snapshot != nullptr) {
@@ -88,6 +90,9 @@ class SnapshotList {
     }
     const SnapshotImpl* s = &list_;
     while (s->next_ != &list_) {
+      if (s->next_->number_ > max_seq) {
+        break;
+      }
       ret.push_back(s->next_->number_);
       if (oldest_write_conflict_snapshot != nullptr &&
diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h
index 7a592c4f6..77043897a 100644
--- a/include/rocksdb/utilities/transaction_db.h
+++ b/include/rocksdb/utilities/transaction_db.h
@@ -25,8 +25,10 @@ class TransactionDBMutexFactory;
 
 enum TxnDBWritePolicy {
   WRITE_COMMITTED = 0,  // write only the committed data
-  WRITE_PREPARED,  // write data after the prepare phase of 2pc
-  WRITE_UNPREPARED  // write data before the prepare phase of 2pc
+  // TODO(myabandeh): Not implemented yet
+  WRITE_PREPARED,  // write data after the prepare phase of 2pc
+  // TODO(myabandeh): Not implemented yet
+  WRITE_UNPREPARED  // write data before the prepare phase of 2pc
 };
 
 const uint32_t kInitialMaxDeadlocks = 5;
diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc
index 07c3eeeeb..8fa9575e4 100644
--- a/utilities/transactions/pessimistic_transaction_db.cc
+++ b/utilities/transactions/pessimistic_transaction_db.cc
@@ -5,8 +5,13 @@
 
 #ifndef ROCKSDB_LITE
 
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
 #include "utilities/transactions/pessimistic_transaction_db.h"
 
+#include <inttypes.h>
 #include <string>
 #include <unordered_set>
 #include <vector>
@@ -34,6 +39,7 @@ PessimisticTransactionDB::PessimisticTransactionDB(
           : std::shared_ptr<TransactionDBMutexFactory>(
                 new TransactionDBMutexFactoryImpl())) {
   assert(db_impl_ != nullptr);
+  info_log_ = db_impl_->GetDBOptions().info_log;
 }
 
 // Support initiliazing PessimisticTransactionDB from a stackable db
@@ -581,16 +587,23 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
   return false;
 }
 
-void WritePreparedTxnDB::AddPrepared(uint64_t seq) { prepared_txns_.push(seq); }
+void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
+  ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Preparing", seq);
+  WriteLock wl(&prepared_mutex_);
+  prepared_txns_.push(seq);
+}
 
 void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
                                       uint64_t commit_seq) {
+  ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
+                  prepare_seq, commit_seq);
   auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
   CommitEntry evicted;
   bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted);
   if (to_be_evicted) {
     auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
     if (prev_max < evicted.commit_seq) {
+      // TODO(myabandeh): inc max in larger steps to avoid frequent updates
       auto max_evicted_seq = evicted.commit_seq;
       // When max_evicted_seq_ advances, move older entries from prepared_txns_
       // to delayed_prepared_. This guarantees that if a seq is lower than max,
@@ -607,11 +620,59 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
         delayed_prepared_empty_.store(false, std::memory_order_release);
       }
     }
+    // With each change to max_evicted_seq_, fetch the live snapshots behind it
+    SequenceNumber curr_seq;
+    std::vector<SequenceNumber> all_snapshots;
+    bool update_snapshots = false;
     {
-      WriteLock wl(&snapshots_mutex_);
       InstrumentedMutex(db_impl_->mutex());
-      snapshots_ = db_impl_->snapshots().GetAll();
+      // We use this to identify how fresh the snapshot list is. Since this
+      // is done atomically with obtaining the snapshot list, the one with
+      // the larger seq is fresher. If the seqs are equal, the full snapshot
+      // lists could still differ, since taking snapshots does not increase
+      // the db seq. However, since we only care about snapshots before the
+      // new max, such recent snapshots would not be included in the list
+      // anyway.
+      curr_seq = db_impl_->GetLatestSequenceNumber();
+      if (curr_seq > snapshots_version_) {
+        // This is to avoid updating snapshots_ if it was already updated
+        // with a more recent version by a concurrent thread
+        update_snapshots = true;
+        // We only care about snapshots lower than max
+        all_snapshots =
+            db_impl_->snapshots().GetAll(nullptr, max_evicted_seq);
+      }
+    }
+    if (update_snapshots) {
+      WriteLock wl(&snapshots_mutex_);
+      snapshots_version_ = curr_seq;
+      // We update the list concurrently with the readers.
+      // Both the new and the old lists are sorted and the new list is a
+      // subset of the previous list plus some new items. Thus if a snapshot
+      // repeats in both the new and the old lists, it will appear higher in
+      // the new list. So if we simply insert the new snapshots in order, an
+      // overwritten item that is still valid in the new list is either
+      // written to the same place in the array or to a higher place before
+      // it gets overwritten by another item. This guarantees that a reader
+      // that reads the list bottom-up will eventually see a snapshot that
+      // repeats in the update, either before it gets overwritten by the
+      // writer or afterwards.
+      size_t i = 0;
+      auto it = all_snapshots.begin();
+      for (; it != all_snapshots.end() && i < SNAPSHOT_CACHE_SIZE;
+           it++, i++) {
+        snapshot_cache_[i].store(*it, std::memory_order_release);
+      }
+      snapshots_.clear();
+      for (; it != all_snapshots.end(); it++) {
+        // Insert them into a vector that is less efficient to access
+        // concurrently
+        snapshots_.push_back(*it);
+      }
+      // Update the size at the end. Otherwise a parallel reader might read
+      // items that are not set yet.
+      snapshots_total_.store(all_snapshots.size(), std::memory_order_release);
     }
     while (prev_max < max_evicted_seq &&
            !max_evicted_seq_.compare_exchange_weak(
@@ -621,17 +682,41 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
     }
     // After each eviction from commit cache, check if the commit entry should
     // be kept around because it overlaps with a live snapshot.
-    {
+    // First check the snapshot cache that is efficient for concurrent access
+    auto cnt = snapshots_total_.load(std::memory_order_acquire);
+    // The list might get updated concurrently as we are reading from it. The
+    // reader should be able to read all the snapshots that are still valid
+    // after the update. Since surviving snapshots are written to a higher
+    // place before being overwritten, a reader that reads bottom-up will
+    // eventually see them.
+    const bool next_is_larger = true;
+    SequenceNumber snapshot_seq = kMaxSequenceNumber;
+    size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
+    for (; 0 < ip1; ip1--) {
+      snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
+      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
+                                   snapshot_seq, !next_is_larger)) {
+        break;
+      }
+    }
+    if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
+                 snapshot_seq < evicted.prep_seq)) {
+      // Then access the less efficient list of snapshots_
       ReadLock rl(&snapshots_mutex_);
-      for (auto snapshot_seq : snapshots_) {
-        if (evicted.commit_seq <= snapshot_seq) {
+      // Items could have moved from snapshots_ to snapshot_cache_ before
+      // acquiring the lock. To make sure that we do not miss a valid snapshot,
+      // read snapshot_cache_ again while holding the lock.
+      for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
+        snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
+        if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
+                                     snapshot_seq, next_is_larger)) {
           break;
         }
-        // then snapshot_seq < evicted.commit_seq
-        if (evicted.prep_seq <= snapshot_seq) {  // overlapping range
-          WriteLock wl(&old_commit_map_mutex_);
-          old_commit_map_empty_.store(false, std::memory_order_release);
-          old_commit_map_[evicted.prep_seq] = evicted.commit_seq;
+      }
+      for (auto snapshot_seq_2 : snapshots_) {
+        if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
+                                     snapshot_seq_2, next_is_larger)) {
+          break;
         }
       }
     }
@@ -691,7 +776,31 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(uint64_t indexed_seq,
 }
 
 // 10m entry, 80MB size
-uint64_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE =
-    static_cast<uint64_t>(1 << 21);
+size_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = static_cast<size_t>(1 << 21);
+size_t WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE =
+    static_cast<size_t>(1 << 7);
+
+bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
+    const uint64_t& prep_seq, const uint64_t& commit_seq,
+    const uint64_t& snapshot_seq, const bool next_is_larger = true) {
+  // If we do not store an entry in old_commit_map_ we assume it is committed
+  // in all snapshots. If commit_seq <= snapshot_seq, it is considered already
+  // in the snapshot so we need not keep the entry around for this snapshot.
+  if (commit_seq <= snapshot_seq) {
+    // continue the search if the next snapshot could be smaller than commit_seq
+    return !next_is_larger;
+  }
+  // then snapshot_seq < commit_seq
+  if (prep_seq <= snapshot_seq) {  // overlapping range
+    WriteLock wl(&old_commit_map_mutex_);
+    old_commit_map_empty_.store(false, std::memory_order_release);
+    old_commit_map_[prep_seq] = commit_seq;
+    // Storing once is enough. No need to check it for other snapshots.
+    return false;
+  }
+  // continue the search if the next snapshot could be larger than prep_seq
+  return next_is_larger;
+}
+
 }  // namespace rocksdb
 #endif  // ROCKSDB_LITE
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h
index 489da30bf..e3eec6b60 100644
--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -107,6 +107,8 @@ class PessimisticTransactionDB : public TransactionDB {
   struct CommitEntry {
     uint64_t prep_seq;
     uint64_t commit_seq;
+    CommitEntry() : prep_seq(0), commit_seq(0) {}
+    CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
   };
 
  protected:
@@ -114,8 +116,10 @@ class PessimisticTransactionDB : public TransactionDB {
       Transaction* txn, const WriteOptions& write_options,
       const TransactionOptions& txn_options = TransactionOptions());
   DBImpl* db_impl_;
+  std::shared_ptr<Logger> info_log_;
 
  private:
+  friend class WritePreparedTxnDB;
   const TransactionDBOptions txn_db_options_;
   TransactionLockMgr lock_mgr_;
@@ -162,6 +166,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   explicit WritePreparedTxnDB(DB* db,
                               const TransactionDBOptions& txn_db_options)
       : PessimisticTransactionDB(db, txn_db_options),
+        SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE),
         COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) {
     init(txn_db_options);
   }
@@ -169,6 +174,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   explicit WritePreparedTxnDB(StackableDB* db,
                               const TransactionDBOptions& txn_db_options)
       : PessimisticTransactionDB(db, txn_db_options),
+        SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE),
         COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) {
     init(txn_db_options);
   }
@@ -192,6 +198,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
 
   void init(const TransactionDBOptions& /* unused */) {
+    snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
+        new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
     commit_cache_ =
         unique_ptr<CommitEntry[]>(new CommitEntry[COMMIT_CACHE_SIZE]{});
   }
@@ -199,8 +207,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // A heap with the amortized O(1) complexity for erase. It uses one extra heap
   // to keep track of erased entries that are not yet on top of the main heap.
   class PreparedHeap {
-    std::priority_queue<uint64_t> heap_;
-    std::priority_queue<uint64_t> erased_heap_;
+    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
+        heap_;
+    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
+        erased_heap_;
 
    public:
     bool empty() { return heap_.empty(); }
@@ -216,7 +226,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     }
     void erase(uint64_t seq) {
       if (!heap_.empty()) {
-        if (heap_.top() < seq) {
+        if (seq < heap_.top()) {
           // Already popped, ignore it.
         } else if (heap_.top() == seq) {
           heap_.pop();
@@ -242,15 +252,42 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   bool ExchangeCommitEntry(uint64_t indexed_seq, CommitEntry& expected_entry,
                            CommitEntry new_entry);
 
+  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
+  // commit_seq. Return false if checking the next snapshot(s) is not needed.
+  // This is the case if the entry was already added to old_commit_map_ or
+  // none of the next snapshots could satisfy the condition.
+  // next_is_larger: the next snapshot will be a larger value
+  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
+                               const uint64_t& commit_seq,
+                               const uint64_t& snapshot_seq,
+                               const bool next_is_larger);
+
   // The list of live snapshots at the last time that max_evicted_seq_ advanced.
-  // The list sorted in ascending order. Thread-safety is provided with
-  // snapshots_mutex_.
+  // The list is stored in two data structures: in snapshot_cache_, which is
+  // efficient for concurrent reads, and in snapshots_ if the data does not
+  // fit into snapshot_cache_. The total number of snapshots in the two lists:
+  std::atomic<size_t> snapshots_total_ = {};
+  // The list sorted in ascending order. Thread-safety for writes is provided
+  // with snapshots_mutex_ and concurrent reads are safe due to std::atomic
+  // for each entry. On x86_64 architectures such reads are compiled to simple
+  // read instructions. 128 entries
+  // TODO(myabandeh): avoid non-const static variables
+  static size_t DEF_SNAPSHOT_CACHE_SIZE;
+  const size_t SNAPSHOT_CACHE_SIZE;
+  unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
+  // 2nd list for storing snapshots. The list sorted in ascending order.
+  // Thread-safety is provided with snapshots_mutex_.
   std::vector<SequenceNumber> snapshots_;
+  // The version of the latest list of snapshots. This can be used to avoid
+  // rewriting a list that is concurrently updated with a more recent version.
+  SequenceNumber snapshots_version_ = 0;
+
   // A heap of prepared transactions. Thread-safety is provided with
   // prepared_mutex_.
   PreparedHeap prepared_txns_;
-  static uint64_t DEF_COMMIT_CACHE_SIZE;
-  const uint64_t COMMIT_CACHE_SIZE;
+  // TODO(myabandeh): avoid non-const static variables
+  static size_t DEF_COMMIT_CACHE_SIZE;
+  const size_t COMMIT_CACHE_SIZE;
   // commit_cache_ must be initialized to zero to tell apart an empty index from
   // a filled one. Thread-safety is provided with commit_cache_mutex_.
   unique_ptr<CommitEntry[]> commit_cache_;
diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc
index 2e8c87f49..eac8e563d 100644
--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -5,6 +5,11 @@
 
 #ifndef ROCKSDB_LITE
 
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
 #include <algorithm>
 #include <functional>
 #include <string>
@@ -4734,8 +4739,10 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
   WriteOptions wo;
   // Use small commit cache to trigger lots of eviction and fast advance of
   // max_evicted_seq_
-  WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE =
-      8;  // will take effect after ReOpen
+  // will take effect after ReOpen
+  WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = 8;
+  // Same for snapshot cache size
+  WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = 5;
   // Take some preliminary snapshots first. This is to stress the data structure
   // that holds the old snapshots as it will be designed to be efficient when
   // there are very few snapshots
@@ -4755,6 +4762,7 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
       uint64_t cur_txn = 0;
       // Number of snapshots taken so far
       int num_snapshots = 0;
+      std::vector<const Snapshot*> to_be_released;
       // Number of gaps applied so far
       int gap_cnt = 0;
       // The final snapshot that we will inspect
@@ -4800,14 +4808,17 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
 
         if (num_snapshots < max_snapshots - 1) {
           // Take preliminary snapshots
-          db->GetSnapshot();
+          auto tmp_snapshot = db->GetSnapshot();
+          to_be_released.push_back(tmp_snapshot);
           num_snapshots++;
         } else if (gap_cnt < max_gap) {
           // Wait for some gap before taking the final snapshot
          gap_cnt++;
        } else if (!snapshot) {
          // Take the final snapshot if it is not already taken
-          snapshot = db->GetSnapshot()->GetSequenceNumber();
+          auto tmp_snapshot = db->GetSnapshot();
+          to_be_released.push_back(tmp_snapshot);
+          snapshot = tmp_snapshot->GetSequenceNumber();
           // We increase the db seq artificailly by a dummy Put. Check that this
           // technique is effective and db seq is that same as ours.
           ASSERT_EQ(snapshot, seq);
@@ -4823,11 +4834,12 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
             (committed_before.find(s) != committed_before.end());
         bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
         if (was_committed != is_in_snapshot) {
-          printf(
-              "max_snapshots %d max_gap %d seq %lu max %lu snapshot %lu "
-              "gap_cnt %d num_snapshots %d\n",
-              max_snapshots, max_gap, seq, wp_db->max_evicted_seq_.load(),
-              snapshot, gap_cnt, num_snapshots);
+          printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
+                 " snapshot %" PRIu64
+                 " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
+                 max_snapshots, max_gap, seq,
+                 wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
+                 num_snapshots, s);
         }
         ASSERT_EQ(was_committed, is_in_snapshot);
         found_committed = found_committed || is_in_snapshot;
@@ -4846,6 +4858,9 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
       }
       ASSERT_TRUE(wp_db->delayed_prepared_.empty());
       ASSERT_TRUE(wp_db->prepared_txns_.empty());
+      for (auto s : to_be_released) {
+        db->ReleaseSnapshot(s);
+      }
     }
   }
 }
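
A closing note on the PreparedHeap change in pessimistic_transaction_db.h: both heaps become min-heaps (via std::greater), so the smallest prepared seq sits at the top and erasing a buried element can simply be deferred. The sketch below illustrates that deferred-erase idea only; LazyEraseMinHeap is an invented name and the drain-on-pop step is a simplification, not the patch's exact class:

    #include <cstdint>
    #include <functional>
    #include <queue>
    #include <vector>

    // Min-heap with deferred erase: elements removed while buried in heap_
    // are parked in erased_heap_ and dropped once they surface at the top.
    class LazyEraseMinHeap {
      std::priority_queue<uint64_t, std::vector<uint64_t>,
                          std::greater<uint64_t>>
          heap_, erased_heap_;

     public:
      bool empty() const { return heap_.empty(); }
      uint64_t top() const { return heap_.top(); }
      void push(uint64_t seq) { heap_.push(seq); }

      void pop() {
        heap_.pop();
        // Drain deferred erasures that have now reached the top.
        while (!heap_.empty() && !erased_heap_.empty() &&
               heap_.top() == erased_heap_.top()) {
          heap_.pop();
          erased_heap_.pop();
        }
      }

      void erase(uint64_t seq) {
        if (heap_.empty() || seq < heap_.top()) {
          return;  // already popped (the patch's corrected comparison)
        }
        if (seq == heap_.top()) {
          pop();  // erasing the top is an ordinary pop
        } else {
          erased_heap_.push(seq);  // defer until seq bubbles up to the top
        }
      }
    };

Most erases hit the top, since transactions typically prepare and commit in order, so the common path is a plain pop; only out-of-order erases pay for a parked entry, which is later discarded during a pop.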