From fce6c892abb79343bc6d543c0823887e1902cc8e Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Fri, 8 Sep 2017 14:32:30 -0700 Subject: [PATCH] Advance max evicted seq in coarser granularity Summary: This patch advances the max_evicted_seq_ in larger granularities to reduce the overhead of updating the relevant data structures. It also refactors the related code and adds tests for it. As part of this patch some of the TODOs for removing usage of non-const static members are also addressed. Closes https://github.com/facebook/rocksdb/pull/2844 Differential Revision: D5772928 Pulled By: maysamyabandeh fbshipit-source-id: f4fcc2948be69c034f10812cf922ce5ab82ef98c --- .../pessimistic_transaction_db.cc | 45 +++-- .../transactions/pessimistic_transaction_db.h | 42 +++-- .../write_prepared_transaction_test.cc | 154 ++++++++++++------ 3 files changed, 154 insertions(+), 87 deletions(-) diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index f98d11d08..8ad7f9540 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -604,8 +604,8 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, if (to_be_evicted) { auto prev_max = max_evicted_seq_.load(std::memory_order_acquire); if (prev_max < evicted.commit_seq) { - // TODO(myabandeh) inc max in larger steps to avoid frequent updates - auto max_evicted_seq = evicted.commit_seq; + // Inc max in larger steps to avoid frequent updates + auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED; AdvanceMaxEvictedSeq(prev_max, max_evicted_seq); } // After each eviction from commit cache, check if the commit entry should @@ -683,30 +683,22 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max, } } - // With each change to max_evicted_seq_ fetch the live snapshots behind it - SequenceNumber curr_seq; + // With each change to max_evicted_seq_ fetch the live
snapshots behind it. + // We use max as the version of snapshots to identify how fresh are the + // snapshot list. This works because the snapshots are between 0 and + // max, so the larger the max, the more complete they are. + SequenceNumber new_snapshots_version = new_max; std::vector snapshots; bool update_snapshots = false; - { - InstrumentedMutex(db_impl_->mutex()); - // We use this to identify how fresh are the snapshot list. Since this - // is done atomically with obtaining the snapshot list, the one with - // the larger seq is more fresh. If the seq is equal the full snapshot - // list could be different since taking snapshots does not increase - // the db seq. However since we only care about snapshots before the - // new max, such recent snapshots would not be included the in the - // list anyway. - curr_seq = db_impl_->GetLatestSequenceNumber(); - if (curr_seq > snapshots_version_) { - // This is to avoid updating the snapshots_ if it already updated - // with a more recent vesion by a concrrent thread - update_snapshots = true; - // We only care about snapshots lower then max - snapshots = db_impl_->snapshots().GetAll(nullptr, new_max); - } + if (new_snapshots_version > snapshots_version_) { + // This is to avoid updating the snapshots_ if it already updated + // with a more recent vesion by a concrrent thread + update_snapshots = true; + // We only care about snapshots lower then max + snapshots = GetSnapshotListFromDB(new_max); } if (update_snapshots) { - UpdateSnapshots(snapshots, curr_seq); + UpdateSnapshots(snapshots, new_snapshots_version); } // TODO(myabandeh): check if it worked with relaxed ordering while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak( @@ -715,10 +707,11 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max, }; } -// 10m entry, 80MB size -size_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = static_cast(1 << 21); -size_t WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = - static_cast(1 << 7); 
+const std::vector WritePreparedTxnDB::GetSnapshotListFromDB( + SequenceNumber max) { + InstrumentedMutex(db_impl_->mutex()); + return db_impl_->snapshots().GetAll(nullptr, max); +} void WritePreparedTxnDB::UpdateSnapshots( const std::vector& snapshots, diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index e9ea8b1ff..23ecdea29 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -123,6 +123,7 @@ class PessimisticTransactionDB : public TransactionDB { private: friend class WritePreparedTxnDB; + friend class WritePreparedTxnDBMock; const TransactionDBOptions txn_db_options_; TransactionLockMgr lock_mgr_; @@ -166,19 +167,23 @@ class WriteCommittedTxnDB : public PessimisticTransactionDB { // mechanisms to tell such data apart from committed data. class WritePreparedTxnDB : public PessimisticTransactionDB { public: - explicit WritePreparedTxnDB(DB* db, - const TransactionDBOptions& txn_db_options) + explicit WritePreparedTxnDB( + DB* db, const TransactionDBOptions& txn_db_options, + size_t snapshot_cache_size = DEF_SNAPSHOT_CACHE_SIZE, + size_t commit_cache_size = DEF_COMMIT_CACHE_SIZE) : PessimisticTransactionDB(db, txn_db_options), - SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE), - COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) { + SNAPSHOT_CACHE_SIZE(snapshot_cache_size), + COMMIT_CACHE_SIZE(commit_cache_size) { init(txn_db_options); } - explicit WritePreparedTxnDB(StackableDB* db, - const TransactionDBOptions& txn_db_options) + explicit WritePreparedTxnDB( + StackableDB* db, const TransactionDBOptions& txn_db_options, + size_t snapshot_cache_size = DEF_SNAPSHOT_CACHE_SIZE, + size_t commit_cache_size = DEF_COMMIT_CACHE_SIZE) : PessimisticTransactionDB(db, txn_db_options), - SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE), - COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) { + SNAPSHOT_CACHE_SIZE(snapshot_cache_size), + 
COMMIT_CACHE_SIZE(commit_cache_size) { init(txn_db_options); } @@ -204,8 +209,14 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WritePreparedTransactionTest_SnapshotConcurrentAccessTest_Test; friend class WritePreparedTransactionTest; friend class PreparedHeap_BasicsTest_Test; + friend class WritePreparedTxnDBMock; + friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; void init(const TransactionDBOptions& /* unused */) { + // Adcance max_evicted_seq_ no more than 100 times before the cache wraps + // around. + INC_STEP_FOR_MAX_EVICTED = + std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast(1)); snapshot_cache_ = unique_ptr[]>( new std::atomic[SNAPSHOT_CACHE_SIZE] {}); commit_cache_ = @@ -275,6 +286,9 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // largetst new_max value. void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max); + virtual const std::vector GetSnapshotListFromDB( + SequenceNumber max); + // Update the list of snapshots corresponding to the soon-to-be-updated // max_eviceted_seq_. Thread-safety: this function can be called concurrently. // The concurrent invocations of this function is equivalent to a serial @@ -310,8 +324,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for // each entry. In x86_64 architecture such reads are compiled to simple read // instructions. 128 entries - // TODO(myabandeh): avoid non-const static variables - static size_t DEF_SNAPSHOT_CACHE_SIZE; + static const size_t DEF_SNAPSHOT_CACHE_SIZE = static_cast(1 << 7); const size_t SNAPSHOT_CACHE_SIZE; unique_ptr[]> snapshot_cache_; // 2nd list for storing snapshots. The list sorted in ascending order. @@ -324,14 +337,19 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // A heap of prepared transactions. Thread-safety is provided with // prepared_mutex_. 
PreparedHeap prepared_txns_; - // TODO(myabandeh): avoid non-const static variables - static size_t DEF_COMMIT_CACHE_SIZE; + // 10m entry, 80MB size + static const size_t DEF_COMMIT_CACHE_SIZE = static_cast(1 << 21); const size_t COMMIT_CACHE_SIZE; // commit_cache_ must be initialized to zero to tell apart an empty index from // a filled one. Thread-safety is provided with commit_cache_mutex_. unique_ptr commit_cache_; // The largest evicted *commit* sequence number from the commit_cache_ std::atomic max_evicted_seq_ = {}; + // Advance max_evicted_seq_ by this value each time it needs an update. The + // larger the value, the less frequent advances we would have. We do not want + // it to be too large either as it would cause stalls by doing too much + // maintenance work under the lock. + size_t INC_STEP_FOR_MAX_EVICTED = 1; // A map of the evicted entries from commit_cache_ that has to be kept around // to service the old snapshots. This is expected to be empty normally. // Thread-safety is provided with old_commit_map_mutex_. 
diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index ffb4a7bc5..8abae6fa8 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -106,6 +106,32 @@ TEST(PreparedHeap, BasicsTest) { ASSERT_TRUE(heap.empty()); } +class WritePreparedTxnDBMock : public WritePreparedTxnDB { + public: + WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt) + : WritePreparedTxnDB(db_impl, opt) {} + WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt, + size_t snapshot_cache_size) + : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size) {} + WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt, + size_t snapshot_cache_size, size_t commit_cache_size) + : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size, + commit_cache_size) {} + void SetDBSnapshots(const std::vector& snapshots) { + snapshots_ = snapshots; + } + void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); } + + protected: + virtual const std::vector GetSnapshotListFromDB( + SequenceNumber /* unused */) override { + return snapshots_; + } + + private: + std::vector snapshots_; +}; + class WritePreparedTransactionTest : public TransactionTest { protected: // If expect_update is set, check if it actually updated old_commit_map_. 
If @@ -309,12 +335,9 @@ TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) { TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) { std::vector snapshots = {100l, 200l, 300l, 400l, 500l, 600l, 700l}; - // will take effect after ReOpen - WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = snapshots.size() / 2; - ReOpen(); // to restart the db - WritePreparedTxnDB* wp_db = dynamic_cast(db); - assert(wp_db); - assert(wp_db->db_impl_); + DBImpl* mock_db = new DBImpl(options, dbname); + std::unique_ptr wp_db(new WritePreparedTxnDBMock( + mock_db, txn_db_options, snapshots.size() / 2)); SequenceNumber version = 1000l; ASSERT_EQ(0, wp_db->snapshots_total_); wp_db->UpdateSnapshots(snapshots, version); @@ -353,16 +376,12 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) { 60l, 70l, 80l, 90l, 100l}; SequenceNumber version = 1000l; // Choose the cache size so that the new snapshot list could replace all the - // existing items in the cache and also have some overflow Will take effect - // after ReOpen - WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = (snapshots.size() - 2) / 2; - ReOpen(); // to restart the db - WritePreparedTxnDB* wp_db = dynamic_cast(db); - assert(wp_db); - assert(wp_db->db_impl_); + // existing items in the cache and also have some overflow. + DBImpl* mock_db = new DBImpl(options, dbname); + std::unique_ptr wp_db(new WritePreparedTxnDBMock( + mock_db, txn_db_options, (snapshots.size() - 2) / 2)); // Add up to 2 items that do not fit into the cache - for (size_t old_size = 1; - old_size <= WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE + 2; + for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + 2; old_size++) { const std::vector old_snapshots( snapshots.begin(), snapshots.begin() + old_size); @@ -397,21 +416,17 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) { // The critical part is when iterating the snapshot cache. 
Afterwards, // we are operating under the lock size_t a_range = - std::min(old_snapshots.size(), - WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE) + - 1; + std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1; size_t b_range = - std::min(new_snapshots.size(), - WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE) + - 1; + std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1; // Break each thread at two points for (size_t a1 = 1; a1 <= a_range; a1++) { for (size_t a2 = a1 + 1; a2 <= a_range; a2++) { for (size_t b1 = 1; b1 <= b_range; b1++) { for (size_t b2 = b1 + 1; b2 <= b_range; b2++) { - SnapshotConcurrentAccessTestInternal(wp_db, old_snapshots, - new_snapshots, entry, - version, a1, a2, b1, b2); + SnapshotConcurrentAccessTestInternal( + wp_db.get(), old_snapshots, new_snapshots, entry, version, + a1, a2, b1, b2); } } } @@ -423,16 +438,73 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) { } #endif +// This test clarifies the contract of AdvanceMaxEvictedSeq method +TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) { + DBImpl* mock_db = new DBImpl(options, dbname); + std::unique_ptr wp_db( + new WritePreparedTxnDBMock(mock_db, txn_db_options)); + + // 1. Set the initial values for max, prepared, and snapshots + SequenceNumber zero_max = 0l; + // Set the initial list of prepared txns + const std::vector initial_prepared = {10, 30, 50, 100, + 150, 200, 250}; + for (auto p : initial_prepared) { + wp_db->AddPrepared(p); + } + // This updates the max value and also set old prepared + SequenceNumber init_max = 100; + wp_db->AdvanceMaxEvictedSeq(zero_max, init_max); + const std::vector initial_snapshots = {20, 40}; + wp_db->SetDBSnapshots(initial_snapshots); + // This will update the internal cache of snapshots from the DB + wp_db->UpdateSnapshots(initial_snapshots, init_max); + + // 2. 
Invoke AdvanceMaxEvictedSeq + const std::vector latest_snapshots = {20, 110, 220, 300}; + wp_db->SetDBSnapshots(latest_snapshots); + SequenceNumber new_max = 200; + wp_db->AdvanceMaxEvictedSeq(init_max, new_max); + + // 3. Verify that the state matches with AdvanceMaxEvictedSeq contract + // a. max should be updated to new_max + ASSERT_EQ(wp_db->max_evicted_seq_, new_max); + // b. delayed prepared should contain every txn <= max and prepared should + // only contian txns > max + auto it = initial_prepared.begin(); + for (; it != initial_prepared.end() && *it <= new_max; it++) { + ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it)); + } + ASSERT_TRUE(wp_db->delayed_prepared_.empty()); + for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty(); + it++, wp_db->prepared_txns_.pop()) { + ASSERT_EQ(*it, wp_db->prepared_txns_.top()); + } + ASSERT_TRUE(it == initial_prepared.end()); + ASSERT_TRUE(wp_db->prepared_txns_.empty()); + // c. snapshots should contain everything below new_max + auto sit = latest_snapshots.begin(); + for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max && + i < wp_db->snapshots_total_; + sit++, i++) { + ASSERT_TRUE(i < wp_db->snapshots_total_); + // This test is in small scale and the list of snapshots are assumed to be + // within the cache size limit. This is just a safety check to double check + // that assumption. + ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE); + ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]); + } +} + // Test WritePreparedTxnDB's IsInSnapshot against different ordering of // snapshot, max_committed_seq_, prepared, and commit entries. 
TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { WriteOptions wo; // Use small commit cache to trigger lots of eviction and fast advance of // max_evicted_seq_ - // will take effect after ReOpen - WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = 8; + const size_t commit_cache_size = 8; // Same for snapshot cache size - WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = 5; + const size_t snapshot_cache_size = 5; // Take some preliminary snapshots first. This is to stress the data structure // that holds the old snapshots as it will be designed to be efficient when @@ -452,7 +524,6 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { uint64_t cur_txn = 0; // Number of snapshots taken so far int num_snapshots = 0; - std::vector to_be_released; // Number of gaps applied so far int gap_cnt = 0; // The final snapshot that we will inspect @@ -465,30 +536,23 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { // We keep the list of txns comitted before we take the last snaphot. // These should be the only seq numbers that will be found in the snapshot std::set committed_before; - ReOpen(); // to restart the db - WritePreparedTxnDB* wp_db = dynamic_cast(db); - assert(wp_db); - assert(wp_db->db_impl_); + DBImpl* mock_db = new DBImpl(options, dbname); + std::unique_ptr wp_db(new WritePreparedTxnDBMock( + mock_db, txn_db_options, snapshot_cache_size, commit_cache_size)); // We continue until max advances a bit beyond the snapshot. while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) { // do prepare for a transaction - wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq seq++; - ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq); wp_db->AddPrepared(seq); prepared.insert(seq); // If cur_txn is not started, do prepare for it. 
if (!cur_txn) { - wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq seq++; - ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq); cur_txn = seq; wp_db->AddPrepared(cur_txn); } else { // else commit it - wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq seq++; - ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq); wp_db->AddCommitted(cur_txn, seq); if (!snapshot) { committed_before.insert(cur_txn); @@ -498,20 +562,15 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { if (num_snapshots < max_snapshots - 1) { // Take preliminary snapshots - auto tmp_snapshot = db->GetSnapshot(); - to_be_released.push_back(tmp_snapshot); + wp_db->TakeSnapshot(seq); num_snapshots++; } else if (gap_cnt < max_gap) { // Wait for some gap before taking the final snapshot gap_cnt++; } else if (!snapshot) { // Take the final snapshot if it is not already taken - auto tmp_snapshot = db->GetSnapshot(); - to_be_released.push_back(tmp_snapshot); - snapshot = tmp_snapshot->GetSequenceNumber(); - // We increase the db seq artificailly by a dummy Put. Check that this - // technique is effective and db seq is that same as ours. - ASSERT_EQ(snapshot, seq); + snapshot = seq; + wp_db->TakeSnapshot(snapshot); num_snapshots++; } @@ -548,9 +607,6 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { } ASSERT_TRUE(wp_db->delayed_prepared_.empty()); ASSERT_TRUE(wp_db->prepared_txns_.empty()); - for (auto s : to_be_released) { - db->ReleaseSnapshot(s); - } } } }