diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 88f4ea032..7830cbd75 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -48,18 +48,21 @@ using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat; TEST(PreparedHeap, BasicsTest) { WritePreparedTxnDB::PreparedHeap heap; - heap.push(14l); - // Test with one element - ASSERT_EQ(14l, heap.top()); - heap.push(24l); - heap.push(34l); - // Test that old min is still on top - ASSERT_EQ(14l, heap.top()); - heap.push(44l); - heap.push(54l); - heap.push(64l); - heap.push(74l); - heap.push(84l); + { + MutexLock ml(heap.push_pop_mutex()); + heap.push(14l); + // Test with one element + ASSERT_EQ(14l, heap.top()); + heap.push(24l); + heap.push(34l); + // Test that old min is still on top + ASSERT_EQ(14l, heap.top()); + heap.push(44l); + heap.push(54l); + heap.push(64l); + heap.push(74l); + heap.push(84l); + } // Test that old min is still on top ASSERT_EQ(14l, heap.top()); heap.erase(24l); @@ -81,11 +84,14 @@ TEST(PreparedHeap, BasicsTest) { ASSERT_EQ(64l, heap.top()); heap.erase(84l); ASSERT_EQ(64l, heap.top()); - heap.push(85l); - heap.push(86l); - heap.push(87l); - heap.push(88l); - heap.push(89l); + { + MutexLock ml(heap.push_pop_mutex()); + heap.push(85l); + heap.push(86l); + heap.push(87l); + heap.push(88l); + heap.push(89l); + } heap.erase(87l); heap.erase(85l); heap.erase(89l); @@ -106,13 +112,19 @@ TEST(PreparedHeap, BasicsTest) { // not resurface again. TEST(PreparedHeap, EmptyAtTheEnd) { WritePreparedTxnDB::PreparedHeap heap; - heap.push(40l); + { + MutexLock ml(heap.push_pop_mutex()); + heap.push(40l); + } ASSERT_EQ(40l, heap.top()); // Although not a recommended scenario, we must be resilient against erase // without a prior push. heap.erase(50l); ASSERT_EQ(40l, heap.top()); - heap.push(60l); + { + MutexLock ml(heap.push_pop_mutex()); + heap.push(60l); + } ASSERT_EQ(40l, heap.top()); heap.erase(60l); @@ -120,11 +132,17 @@ TEST(PreparedHeap, EmptyAtTheEnd) { heap.erase(40l); ASSERT_TRUE(heap.empty()); - heap.push(40l); + { + MutexLock ml(heap.push_pop_mutex()); + heap.push(40l); + } ASSERT_EQ(40l, heap.top()); heap.erase(50l); ASSERT_EQ(40l, heap.top()); - heap.push(60l); + { + MutexLock ml(heap.push_pop_mutex()); + heap.push(60l); + } ASSERT_EQ(40l, heap.top()); heap.erase(40l); @@ -139,30 +157,37 @@ TEST(PreparedHeap, EmptyAtTheEnd) { // successfully emptied at the end. TEST(PreparedHeap, Concurrent) { const size_t t_cnt = 10; - rocksdb::port::Thread t[t_cnt]; - Random rnd(1103); + rocksdb::port::Thread t[t_cnt + 1]; WritePreparedTxnDB::PreparedHeap heap; port::RWMutex prepared_mutex; + std::atomic last; for (size_t n = 0; n < 100; n++) { - for (size_t i = 0; i < t_cnt; i++) { - // This is not recommended usage but we should be resilient against it. - bool skip_push = rnd.OneIn(5); - t[i] = rocksdb::port::Thread([&heap, &prepared_mutex, skip_push, i]() { - auto seq = i; - std::this_thread::yield(); + last = 0; + t[0] = rocksdb::port::Thread([&heap, t_cnt, &last]() { + Random rnd(1103); + for (size_t seq = 1; seq <= t_cnt; seq++) { + // This is not recommended usage but we should be resilient against it. + bool skip_push = rnd.OneIn(5); if (!skip_push) { - WriteLock wl(&prepared_mutex); + MutexLock ml(heap.push_pop_mutex()); + std::this_thread::yield(); heap.push(seq); + last.store(seq); } - std::this_thread::yield(); - { - WriteLock wl(&prepared_mutex); - heap.erase(seq); - } + } + }); + for (size_t i = 1; i <= t_cnt; i++) { + t[i] = rocksdb::port::Thread([&heap, &prepared_mutex, &last, i]() { + auto seq = i; + do { + std::this_thread::yield(); + } while (last.load() < seq); + WriteLock wl(&prepared_mutex); + heap.erase(seq); }); } - for (size_t i = 0; i < t_cnt; i++) { + for (size_t i = 0; i <= t_cnt; i++) { t[i].join(); } ASSERT_TRUE(heap.empty()); @@ -3197,7 +3222,7 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) { ReOpen(); std::atomic snap = {nullptr}; std::atomic exp_prepare = {0}; - std::atomic snapshot_taken = {false}; + rocksdb::port::Thread callback_thread; // Value is synchronized via snap PinnableSlice value; // Take a snapshot after publish and before RemovePrepared:Start @@ -3208,7 +3233,6 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) { roptions.snapshot = snap.load(); auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value); ASSERT_OK(s); - snapshot_taken.store(true); }; auto callback = [&](void* param) { SequenceNumber prep_seq = *((SequenceNumber*)param); @@ -3216,8 +3240,7 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) { // We need to spawn a thread to avoid deadlock since getting a // snpashot might end up calling AdvanceSeqByOne which needs joining // the write queue. - auto t = rocksdb::port::Thread(snap_callback); - t.detach(); + callback_thread = rocksdb::port::Thread(snap_callback); TEST_SYNC_POINT("callback:end"); } }; @@ -3250,15 +3273,12 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) { // Let an eviction to kick in std::this_thread::yield(); - snapshot_taken.store(false); exp_prepare.store(txn->GetId()); ASSERT_OK(txn->Commit()); delete txn; // Wait for the snapshot taking that is triggered by // RemovePrepared:Start callback - while (!snapshot_taken) { - std::this_thread::yield(); - } + callback_thread.join(); // Read with the snapshot taken before delayed_prepared_ cleanup ReadOptions roptions; @@ -3278,9 +3298,9 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) { }); write_thread.join(); eviction_thread.join(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); } - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); } } diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 96e1aa7a7..a3b523a22 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -32,12 +32,19 @@ Status WritePreparedTxnDB::Initialize( auto dbimpl = reinterpret_cast(GetRootDB()); assert(dbimpl != nullptr); auto rtxns = dbimpl->recovered_transactions(); + std::map ordered_seq_cnt; for (auto rtxn : rtxns) { // There should only one batch for WritePrepared policy. assert(rtxn.second->batches_.size() == 1); const auto& seq = rtxn.second->batches_.begin()->first; const auto& batch_info = rtxn.second->batches_.begin()->second; auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1; + ordered_seq_cnt[seq] = cnt; + } + // AddPrepared must be called in order + for (auto seq_cnt: ordered_seq_cnt) { + auto seq = seq_cnt.first; + auto cnt = seq_cnt.second; for (size_t i = 0; i < cnt; i++) { AddPrepared(seq + i); } diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index acf2b97a9..9561bfada 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -511,9 +511,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // The mutex is required for push and pop from PreparedHeap. ::erase will // use external synchronization via prepared_mutex_. port::Mutex push_pop_mutex_; - // TODO(myabandeh): replace it with deque - std::priority_queue, std::greater> - heap_; + std::deque heap_; std::priority_queue, std::greater> erased_heap_; std::atomic heap_top_ = {kMaxSequenceNumber}; @@ -534,21 +532,27 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // Returns kMaxSequenceNumber if empty() and the smallest otherwise. inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); } inline void push(uint64_t v) { - heap_.push(v); - heap_top_.store(heap_.top(), std::memory_order_release); + push_pop_mutex_.AssertHeld(); + if (heap_.empty()) { + heap_top_.store(v, std::memory_order_release); + } else { + assert(heap_top_.load() < v); + } + heap_.push_back(v); } void pop(bool locked = false) { if (!locked) { push_pop_mutex()->Lock(); } - heap_.pop(); + push_pop_mutex_.AssertHeld(); + heap_.pop_front(); while (!heap_.empty() && !erased_heap_.empty() && // heap_.top() > erased_heap_.top() could happen if we have erased // a non-existent entry. Ideally the user should not do that but we // should be resilient against it. - heap_.top() >= erased_heap_.top()) { - if (heap_.top() == erased_heap_.top()) { - heap_.pop(); + heap_.front() >= erased_heap_.top()) { + if (heap_.front() == erased_heap_.top()) { + heap_.pop_front(); } uint64_t erased __attribute__((__unused__)); erased = erased_heap_.top(); @@ -559,7 +563,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { while (heap_.empty() && !erased_heap_.empty()) { erased_heap_.pop(); } - heap_top_.store(!heap_.empty() ? heap_.top() : kMaxSequenceNumber, + heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber, std::memory_order_release); if (!locked) { push_pop_mutex()->Unlock(); @@ -568,13 +572,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // Concurrrent calls needs external synchronization. It is safe to be called // concurrent to push and pop though. void erase(uint64_t seq) { - if (!heap_.empty()) { + if (!empty()) { auto top_seq = top(); if (seq < top_seq) { // Already popped, ignore it. } else if (top_seq == seq) { pop(); - assert(heap_.empty() || heap_.top() != seq); +#ifndef NDEBUG + MutexLock ml(push_pop_mutex()); + assert(heap_.empty() || heap_.front() != seq); +#endif } else { // top() > seq // Down the heap, remember to pop it later erased_heap_.push(seq); diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 0c9418394..9382edfad 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -225,6 +225,7 @@ Status WriteUnpreparedTxnDB::Initialize( // create 'real' transactions from recovered shell transactions auto rtxns = dbimpl->recovered_transactions(); + std::map ordered_seq_cnt; for (auto rtxn : rtxns) { auto recovered_trx = rtxn.second; assert(recovered_trx); @@ -266,9 +267,7 @@ Status WriteUnpreparedTxnDB::Initialize( auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1; assert(batch_info.log_number_); - for (size_t i = 0; i < cnt; i++) { - AddPrepared(seq + i); - } + ordered_seq_cnt[seq] = cnt; assert(wupt->unprep_seqs_.count(seq) == 0); wupt->unprep_seqs_[seq] = cnt; KeySetBuilder keyset_handler(wupt, @@ -288,6 +287,14 @@ Status WriteUnpreparedTxnDB::Initialize( break; } } + // AddPrepared must be called in order + for (auto seq_cnt: ordered_seq_cnt) { + auto seq = seq_cnt.first; + auto cnt = seq_cnt.second; + for (size_t i = 0; i < cnt; i++) { + AddPrepared(seq + i); + } + } SequenceNumber prev_max = max_evicted_seq_; SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();