diff --git a/utilities/column_aware_encoding_util.cc b/utilities/column_aware_encoding_util.cc
index 9320c3482..18b24d5b8 100644
--- a/utilities/column_aware_encoding_util.cc
+++ b/utilities/column_aware_encoding_util.cc
@@ -59,7 +59,8 @@ void ColumnAwareEncodingReader::InitTableReader(const std::string& file_path) {
                         /*skip_filters=*/false),
       std::move(file_), file_size, &table_reader,
       /*enable_prefetch=*/false);
-  table_reader_.reset(static_cast_with_check<BlockBasedTable, TableReader>(table_reader.release()));
+  table_reader_.reset(static_cast_with_check<BlockBasedTable, TableReader>(
+      table_reader.release()));
 }
 
 void ColumnAwareEncodingReader::GetKVPairsFromDataBlocks(
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h
index 2e1dc04d6..1386b5c22 100644
--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -134,6 +134,11 @@ class PessimisticTransactionDB : public TransactionDB {
  private:
   friend class WritePreparedTxnDB;
   friend class WritePreparedTxnDBMock;
+  friend class TransactionTest_DoubleEmptyWrite_Test;
+  friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test;
+  friend class TransactionTest_TwoPhaseLongPrepareTest_Test;
+  friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;
+  friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test;
   TransactionLockMgr lock_mgr_;
 
   // Must be held when adding/dropping column families.
@@ -150,6 +155,10 @@ class PessimisticTransactionDB : public TransactionDB {
   // map from name to two phase transaction instance
   std::mutex name_map_mutex_;
   std::unordered_map<std::string, Transaction*> transactions_;
+
+  // Signal that we are testing a crash scenario. Some asserts could be relaxed
+  // in such cases.
+  virtual void TEST_Crash() {}
 };
 
 // A PessimisticTransactionDB that writes the data to the DB after the commit.
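The TEST_Crash() hook added above follows a small pattern: the base class exposes a no-op virtual that crash-recovery tests call right before simulating a crash, and subclasses use the flag it sets to relax destructor-time invariants that only hold on a clean shutdown; the extra friend declarations let the individual gtest cases reach the private hook. A minimal standalone sketch of that idea, with made-up class names (BaseDB, TrackingDB), not the RocksDB types:

#include <cassert>
#include <set>

class BaseDB {
 public:
  virtual ~BaseDB() {}
  // Tests call this before simulating a crash; the default is a no-op.
  virtual void TEST_Crash() {}
};

class TrackingDB : public BaseDB {
 public:
  void TEST_Crash() override { crash_mode_ = true; }
  void AddPending(int seq) { pending_.insert(seq); }
  ~TrackingDB() override {
    // Normally every pending entry must be drained before destruction, but a
    // simulated crash legitimately leaves entries behind, so relax the check.
    if (!crash_mode_) {
      assert(pending_.empty());
    }
  }

 private:
  bool crash_mode_ = false;
  std::set<int> pending_;
};

int main() {
  TrackingDB db;
  db.AddPending(42);
  db.TEST_Crash();  // tell the object we are about to "crash"
  return 0;         // destructor skips the emptiness assert
}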
diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc
index cbf44b647..7df5afa7c 100644
--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -85,6 +85,7 @@ TEST_P(TransactionTest, DoubleEmptyWrite) {
   txn0->Put(Slice("foo0"), Slice("bar0a"));
   ASSERT_OK(txn0->Prepare());
   delete txn0;
+  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
   ASSERT_OK(ReOpenNoDelete());
   txn0 = db->GetTransactionByName("xid2");
   ASSERT_OK(txn0->Commit());
@@ -950,6 +951,8 @@ TEST_P(TransactionTest, TwoPhaseNameTest) {
   s = txn1->SetName("name4");
   ASSERT_EQ(s, Status::InvalidArgument());
 
+  txn1->Rollback();
+  txn2->Rollback();
   delete txn1;
   delete txn2;
 }
@@ -1173,6 +1176,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
   db->FlushWAL(false);
   delete txn;
   // kill and reopen
+  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
   s = ReOpenNoDelete();
   ASSERT_OK(s);
   db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
@@ -1361,6 +1365,7 @@ TEST_P(TransactionTest, TwoPhaseLongPrepareTest) {
     if (i % 29 == 0) {
       // crash
       env->SetFilesystemActive(false);
+      reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
       ReOpenNoDelete();
     } else if (i % 37 == 0) {
       // close
@@ -1463,6 +1468,7 @@ TEST_P(TransactionTest, TwoPhaseDoubleRecoveryTest) {
 
   // kill and reopen
   env->SetFilesystemActive(false);
+  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
   ReOpenNoDelete();
 
   // commit old txn
@@ -1844,6 +1850,7 @@ TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) {
 
   // kill and reopen
   env->SetFilesystemActive(false);
+  reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
   ReOpenNoDelete();
 
   s = db->Get(read_options, "first", &value);
diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index cb51a894b..8e1276558 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -112,6 +112,73 @@ TEST(PreparedHeap, BasicsTest) {
   ASSERT_TRUE(heap.empty());
 }
 
+// This is a scenario reconstructed from a buggy trace. Test that the bug does
+// not resurface again.
+TEST(PreparedHeap, EmptyAtTheEnd) {
+  WritePreparedTxnDB::PreparedHeap heap;
+  heap.push(40l);
+  ASSERT_EQ(40l, heap.top());
+  // Although not a recommended scenario, we must be resilient against erase
+  // without a prior push.
+  heap.erase(50l);
+  ASSERT_EQ(40l, heap.top());
+  heap.push(60l);
+  ASSERT_EQ(40l, heap.top());
+
+  heap.erase(60l);
+  ASSERT_EQ(40l, heap.top());
+  heap.erase(40l);
+  ASSERT_TRUE(heap.empty());
+
+  heap.push(40l);
+  ASSERT_EQ(40l, heap.top());
+  heap.erase(50l);
+  ASSERT_EQ(40l, heap.top());
+  heap.push(60l);
+  ASSERT_EQ(40l, heap.top());
+
+  heap.erase(40l);
+  // Test that the erase has not emptied the heap (we had a bug doing that)
+  ASSERT_FALSE(heap.empty());
+  ASSERT_EQ(60l, heap.top());
+  heap.erase(60l);
+  ASSERT_TRUE(heap.empty());
+}
+
+// Generate random order of PreparedHeap access and test that the heap will be
+// successfully emptied at the end.
+TEST(PreparedHeap, Concurrent) {
+  const size_t t_cnt = 10;
+  rocksdb::port::Thread t[t_cnt];
+  Random rnd(1103);
+  WritePreparedTxnDB::PreparedHeap heap;
+  port::RWMutex prepared_mutex;
+
+  for (size_t n = 0; n < 100; n++) {
+    for (size_t i = 0; i < t_cnt; i++) {
+      // This is not recommended usage but we should be resilient against it.
+      bool skip_push = rnd.OneIn(5);
+      t[i] = rocksdb::port::Thread([&heap, &prepared_mutex, skip_push, i]() {
+        auto seq = i;
+        std::this_thread::yield();
+        if (!skip_push) {
+          WriteLock wl(&prepared_mutex);
+          heap.push(seq);
+        }
+        std::this_thread::yield();
+        {
+          WriteLock wl(&prepared_mutex);
+          heap.erase(seq);
+        }
+      });
+    }
+    for (size_t i = 0; i < t_cnt; i++) {
+      t[i].join();
+    }
+    ASSERT_TRUE(heap.empty());
+  }
+}
+
 TEST(CommitEntry64b, BasicTest) {
   const size_t INDEX_BITS = static_cast<size_t>(21);
   const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
@@ -952,6 +1019,7 @@ TEST_P(WritePreparedTransactionTest, BasicRecoveryTest) {
   delete txn0;
   delete txn1;
   wp_db->db_impl_->FlushWAL(true);
+  wp_db->TEST_Crash();
   ReOpenNoDelete();
   wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
   // After recovery, all the uncommitted txns (0 and 1) should be inserted into
@@ -995,6 +1063,7 @@ TEST_P(WritePreparedTransactionTest, BasicRecoveryTest) {
   delete txn2;
   wp_db->db_impl_->FlushWAL(true);
+  wp_db->TEST_Crash();
   ReOpenNoDelete();
   wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
   ASSERT_TRUE(wp_db->prepared_txns_.empty());
@@ -1064,6 +1133,7 @@ TEST_P(WritePreparedTransactionTest, ConflictDetectionAfterRecoveryTest) {
 
   auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
   db_impl->FlushWAL(true);
+  dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
   ReOpenNoDelete();
 
   // It should still conflict after the recovery
@@ -1324,6 +1394,7 @@ TEST_P(WritePreparedTransactionTest, RollbackTest) {
     delete txn;
     auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
     db_impl->FlushWAL(true);
+    dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
     ReOpenNoDelete();
     wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
     txn = db->GetTransactionByName("xid0");
diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc
index 7aedb1a15..017a4d7d8 100644
--- a/utilities/transactions/write_prepared_txn.cc
+++ b/utilities/transactions/write_prepared_txn.cc
@@ -275,8 +275,11 @@ Status WritePreparedTxn::RollbackInternal() {
                     prepare_seq);
   // Commit the batch by writing an empty batch to the queue that will release
   // the commit sequence number to readers.
+  const size_t ZERO_COMMITS = 0;
+  const bool PREP_HEAP_SKIPPED = true;
   WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
-      wpt_db_, db_impl_, prepare_seq, ONE_BATCH);
+      wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS,
+      PREP_HEAP_SKIPPED);
   WriteBatch empty_batch;
   empty_batch.PutLogData(Slice());
   // In the absence of Prepare markers, use Noop as a batch separator
diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc
index 882b7fe14..d91bdccaa 100644
--- a/utilities/transactions/write_prepared_txn_db.cc
+++ b/utilities/transactions/write_prepared_txn_db.cc
@@ -11,6 +11,7 @@
 
 #include "utilities/transactions/write_prepared_txn_db.h"
 
+#include <inttypes.h>
 #include
 #include
 #include
@@ -104,12 +105,13 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
   }
   if (batch_cnt == 0) {  // not provided, then compute it
     // TODO(myabandeh): add an option to allow user skipping this cost
-    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
-                   "Duplicate key overhead");
     SubBatchCounter counter(*GetCFComparatorMap());
     auto s = batch->Iterate(&counter);
     assert(s.ok());
     batch_cnt = counter.BatchCount();
+    // TODO(myabandeh): replace me with a stat
+    ROCKS_LOG_WARN(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
+                   static_cast<uint64_t>(batch_cnt));
   }
   assert(batch_cnt);
@@ -334,6 +336,9 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
   if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
     // We should not normally reach here
     ReadLock rl(&prepared_mutex_);
+    // TODO(myabandeh): also add a stat
+    ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
+                   static_cast<uint64_t>(delayed_prepared_.size()));
     if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
       // Then it is not committed yet
       ROCKS_LOG_DETAILS(
@@ -395,6 +400,8 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
   // We should not normally reach here unless sapshot_seq is old. This is a
   // rare case and it is ok to pay the cost of mutex ReadLock for such old,
   // reading transactions.
+  // TODO(myabandeh): also add a stat
+  ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
   ReadLock rl(&old_commit_map_mutex_);
   auto prep_set_entry = old_commit_map_.find(snapshot_seq);
   bool found = prep_set_entry != old_commit_map_.end();
@@ -458,8 +465,10 @@ void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq,
 
 void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                                       bool prepare_skipped, uint8_t loop_cnt) {
-  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
-                    prepare_seq, commit_seq);
+  ROCKS_LOG_DETAILS(info_log_,
+                    "Txn %" PRIu64 " Committing with %" PRIu64
+                    "(prepare_skipped=%d)",
+                    prepare_seq, commit_seq, prepare_skipped);
   TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
   TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
   auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
@@ -536,8 +545,11 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
   return succ;
 }
 
-void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
-                                              SequenceNumber& new_max) {
+void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
+                                              const SequenceNumber& new_max) {
+  ROCKS_LOG_DETAILS(info_log_,
+                    "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
+                    prev_max, new_max);
   // When max_evicted_seq_ advances, move older entries from prepared_txns_
   // to delayed_prepared_. This guarantees that if a seq is lower than max,
   // then it is not in prepared_txns_ ans save an expensive, synchronized
@@ -548,6 +560,12 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
   while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
     auto to_be_popped = prepared_txns_.top();
     delayed_prepared_.insert(to_be_popped);
+    // TODO(myabandeh): also add a stat
+    ROCKS_LOG_WARN(info_log_,
+                   "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
+                   " new_max=%" PRIu64 " oldmax=%" PRIu64,
+                   static_cast<uint64_t>(delayed_prepared_.size()),
+                   to_be_popped, new_max, prev_max);
     prepared_txns_.pop();
     delayed_prepared_empty_.store(false, std::memory_order_release);
   }
@@ -570,9 +588,11 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
   if (update_snapshots) {
     UpdateSnapshots(snapshots, new_snapshots_version);
   }
-  while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak(
-                                   prev_max, new_max, std::memory_order_acq_rel,
-                                   std::memory_order_relaxed)) {
+  auto updated_prev_max = prev_max;
+  while (updated_prev_max < new_max &&
+         !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
+                                                 std::memory_order_acq_rel,
+                                                 std::memory_order_relaxed)) {
   };
 }
 
@@ -600,11 +620,15 @@ void WritePreparedTxnDB::ReleaseSnapshotInternal(
   // old_commit_map_. Check and do garbage collection if that is the case.
   bool need_gc = false;
   {
+    // TODO(myabandeh): also add a stat
+    ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
     ReadLock rl(&old_commit_map_mutex_);
     auto prep_set_entry = old_commit_map_.find(snap_seq);
     need_gc = prep_set_entry != old_commit_map_.end();
   }
   if (need_gc) {
+    // TODO(myabandeh): also add a stat
+    ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
     WriteLock wl(&old_commit_map_mutex_);
     old_commit_map_.erase(snap_seq);
     old_commit_map_empty_.store(old_commit_map_.empty(),
@@ -623,6 +647,8 @@ void WritePreparedTxnDB::UpdateSnapshots(
 #ifndef NDEBUG
   size_t sync_i = 0;
 #endif
+  // TODO(myabandeh): replace me with a stat
+  ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
   WriteLock wl(&snapshots_mutex_);
   snapshots_version_ = version;
   // We update the list concurrently with the readers.
@@ -702,6 +728,8 @@ void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
   if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
                snapshot_seq < evicted.prep_seq)) {
     // Then access the less efficient list of snapshots_
+    // TODO(myabandeh): also add a stat
+    ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
     ReadLock rl(&snapshots_mutex_);
     // Items could have moved from the snapshots_ to snapshot_cache_ before
    // accquiring the lock. To make sure that we do not miss a valid snapshot,
@@ -734,6 +762,8 @@ bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
   }
   // then snapshot_seq < commit_seq
   if (prep_seq <= snapshot_seq) {  // overlapping range
+    ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
+    // TODO(myabandeh): also add a stat
     WriteLock wl(&old_commit_map_mutex_);
     old_commit_map_empty_.store(false, std::memory_order_release);
     auto& vec = old_commit_map_[snapshot_seq];
diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
index 6e73496c2..63b66a753 100644
--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -231,9 +231,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
   friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
   friend class WritePreparedTransactionTest_CommitMapTest_Test;
+  friend class
+      WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
   friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
   friend class WritePreparedTransactionTestBase;
   friend class PreparedHeap_BasicsTest_Test;
+  friend class PreparedHeap_EmptyAtTheEnd_Test;
+  friend class PreparedHeap_Concurrent_Test;
   friend class WritePreparedTxnDBMock;
   friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
   friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
@@ -250,17 +254,34 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
         heap_;
     std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
         erased_heap_;
+    // True when testing crash recovery
+    bool TEST_CRASH_ = false;
+    friend class WritePreparedTxnDB;
 
    public:
+    ~PreparedHeap() {
+      if (!TEST_CRASH_) {
+        assert(heap_.empty());
+        assert(erased_heap_.empty());
+      }
+    }
     bool empty() { return heap_.empty(); }
     uint64_t top() { return heap_.top(); }
     void push(uint64_t v) { heap_.push(v); }
     void pop() {
       heap_.pop();
       while (!heap_.empty() && !erased_heap_.empty() &&
-             heap_.top() == erased_heap_.top()) {
-        heap_.pop();
+             // heap_.top() > erased_heap_.top() could happen if we have erased
+             // a non-existent entry. Ideally the user should not do that but we
+             // should be resilient against it.
+             heap_.top() >= erased_heap_.top()) {
+        if (heap_.top() == erased_heap_.top()) {
+          heap_.pop();
+        }
+        auto erased __attribute__((__unused__)) = erased_heap_.top();
         erased_heap_.pop();
+        // No duplicate prepare sequence numbers
+        assert(erased_heap_.empty() || erased_heap_.top() != erased);
       }
       while (heap_.empty() && !erased_heap_.empty()) {
         erased_heap_.pop();
@@ -272,6 +293,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
         // Already popped, ignore it.
       } else if (heap_.top() == seq) {
         pop();
+        assert(heap_.empty() || heap_.top() != seq);
       } else {  // (heap_.top() > seq)
         // Down the heap, remember to pop it later
         erased_heap_.push(seq);
@@ -280,6 +302,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     }
   };
 
+  void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }
+
   // Get the commit entry with index indexed_seq from the commit table. It
   // returns true if such entry exists.
   bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
@@ -305,7 +329,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // concurrently. The concurrent invocations of this function is equivalent to
   // a serial invocation in which the last invocation is the one with the
   // largetst new_max value.
-  void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max);
+  void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
+                            const SequenceNumber& new_max);
 
   virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
       SequenceNumber max);
@@ -362,7 +387,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // A heap of prepared transactions. Thread-safety is provided with
   // prepared_mutex_.
   PreparedHeap prepared_txns_;
-  // 10m entry, 80MB size
+  // 2m entry, 16MB size
   static const size_t DEF_COMMIT_CACHE_BITS = static_cast<size_t>(21);
   const size_t COMMIT_CACHE_BITS;
   const size_t COMMIT_CACHE_SIZE;
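For reference, the PreparedHeap changes above implement amortized O(1) erase with two min-heaps: erasing an entry that is not currently at the top is deferred into erased_heap_ and drained during pop(), and the new >= comparison keeps the drain loop from wedging when a sequence number is erased without ever having been pushed. A self-contained sketch of that technique under this reading (TwoHeap is an illustrative stand-in, not the actual PreparedHeap class):

#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <vector>

class TwoHeap {
  // Min-heap of live entries and min-heap of deferred ("down the heap") erases.
  std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
      heap_, erased_heap_;

 public:
  bool empty() const { return heap_.empty(); }
  uint64_t top() const { return heap_.top(); }
  void push(uint64_t v) { heap_.push(v); }

  void pop() {
    heap_.pop();
    // Drain deferred erases that have reached the front. Using ">=" rather
    // than "==" stays resilient against erasing a value that was never
    // pushed, which would otherwise sit at the front of erased_heap_ forever.
    while (!heap_.empty() && !erased_heap_.empty() &&
           heap_.top() >= erased_heap_.top()) {
      if (heap_.top() == erased_heap_.top()) {
        heap_.pop();
      }
      erased_heap_.pop();
    }
    // If the main heap emptied, any leftover deferred erases are moot.
    while (heap_.empty() && !erased_heap_.empty()) {
      erased_heap_.pop();
    }
  }

  void erase(uint64_t seq) {
    if (heap_.empty()) {
      return;  // nothing tracked, ignore
    }
    if (seq < heap_.top()) {
      // Below the current minimum: it was already popped (or never pushed).
    } else if (seq == heap_.top()) {
      pop();  // erasing the minimum: pop it right away
    } else {
      // Somewhere down the heap: remember to drop it during a later pop().
      erased_heap_.push(seq);
    }
  }
};

int main() {
  // Replays the gist of the EmptyAtTheEnd scenario from the new test.
  TwoHeap heap;
  heap.push(40);
  heap.erase(50);  // erase without a prior push must not corrupt the heap
  heap.push(60);
  heap.erase(40);
  assert(!heap.empty() && heap.top() == 60);  // erase must not empty the heap
  heap.erase(60);
  assert(heap.empty());
  return 0;
}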