From 390cc0b1560decc80ef92306721b549607eaa6e0 Mon Sep 17 00:00:00 2001 From: Peter Dillinger Date: Fri, 3 Feb 2023 12:08:37 -0800 Subject: [PATCH] Ensure LockWAL() stall cleared for UnlockWAL() return (#11172) Summary: Fixes https://github.com/facebook/rocksdb/issues/11160 By counting the number of stalls placed on a write queue, we can check in UnlockWAL() whether the stall present at the start of UnlockWAL() has been cleared by the end, or wait until it's cleared. More details in code comments and new unit test. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11172 Test Plan: unit test added. Yes, it uses sleep to amplify failure on buggy behavior if present, but using a sync point to only allow new behavior would fail with the old code only because it doesn't contain the new sync point. Basically, using a sync point in UnlockWAL() could easily mask a regression by artificially limiting key behaviors. The test would only check that UnlockWAL() invokes code that *should* do the right thing, without checking that it *does* the right thing. Reviewed By: ajkr Differential Revision: D42894341 Pulled By: pdillinger fbshipit-source-id: 15c9da0ca383e6aec845b29f5447d76cecbf46c3 --- db/db_impl/db_impl.cc | 26 ++++++ db/db_impl/db_impl.h | 2 + db/db_impl/db_impl_debug.cc | 2 + db/db_impl/db_impl_write.cc | 7 +- db/write_thread.cc | 26 ++++++ db/write_thread.h | 24 +++++ utilities/transactions/transaction_test.cc | 102 +++++++++++++++++++++ 7 files changed, 188 insertions(+), 1 deletion(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 0b47acd2a..562f27e0b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1539,6 +1539,14 @@ Status DBImpl::LockWAL() { assert(lock_wal_write_token_); ++lock_wal_count_; } else { + // NOTE: this will "unnecessarily" wait for other non-LockWAL() write + // stalls to clear before LockWAL returns, however fixing that would + // not be simple because if we notice the primary queue is already + // stalled, that stall might clear while we release DB mutex in + // EnterUnbatched() for the nonmem queue. And if we work around that in + // the naive way, we could deadlock by locking the two queues in different + // orders. + WriteThread::Writer w; write_thread_.EnterUnbatched(&w, &mutex_); WriteThread::Writer nonmem_w; @@ -1571,6 +1579,8 @@ Status DBImpl::LockWAL() { Status DBImpl::UnlockWAL() { bool signal = false; + uint64_t maybe_stall_begun_count = 0; + uint64_t nonmem_maybe_stall_begun_count = 0; { InstrumentedMutexLock lock(&mutex_); if (lock_wal_count_ == 0) { @@ -1580,12 +1590,28 @@ Status DBImpl::UnlockWAL() { if (lock_wal_count_ == 0) { lock_wal_write_token_.reset(); signal = true; + // For the last UnlockWAL, we don't want to return from UnlockWAL() + // until the thread(s) that called BeginWriteStall() have had a chance to + // call EndWriteStall(), so that no_slowdown writes after UnlockWAL() are + // guaranteed to succeed if there's no other source of stall. + maybe_stall_begun_count = write_thread_.GetBegunCountOfOutstandingStall(); + if (two_write_queues_) { + nonmem_maybe_stall_begun_count = + nonmem_write_thread_.GetBegunCountOfOutstandingStall(); + } } } if (signal) { // SignalAll outside of mutex for efficiency bg_cv_.SignalAll(); } + // Ensure stalls have cleared + if (maybe_stall_begun_count) { + write_thread_.WaitForStallEndedCount(maybe_stall_begun_count); + } + if (nonmem_maybe_stall_begun_count) { + nonmem_write_thread_.WaitForStallEndedCount(nonmem_maybe_stall_begun_count); + } return Status::OK(); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 6a930b2d8..75e048e7f 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1117,6 +1117,8 @@ class DBImpl : public DB { void TEST_UnlockMutex(); + void TEST_SignalAllBgCv(); + // REQUIRES: mutex locked void* TEST_BeginWrite(); diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 9b6685f62..e59a4b981 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -198,6 +198,8 @@ void DBImpl::TEST_LockMutex() { mutex_.Lock(); } void DBImpl::TEST_UnlockMutex() { mutex_.Unlock(); } +void DBImpl::TEST_SignalAllBgCv() { bg_cv_.SignalAll(); } + void* DBImpl::TEST_BeginWrite() { auto w = new WriteThread::Writer(); write_thread_.EnterUnbatched(w, &mutex_); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 15829ccd8..0891e5012 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1833,8 +1833,13 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, // Notify write_thread about the stall so it can setup a barrier and // fail any pending writers with no_slowdown write_thread.BeginWriteStall(); - TEST_SYNC_POINT("DBImpl::DelayWrite:Wait"); + if (&write_thread == &write_thread_) { + TEST_SYNC_POINT("DBImpl::DelayWrite:Wait"); + } else { + TEST_SYNC_POINT("DBImpl::DelayWrite:NonmemWait"); + } bg_cv_.Wait(); + TEST_SYNC_POINT_CALLBACK("DBImpl::DelayWrite:AfterWait", &mutex_); write_thread.EndWriteStall(); } } diff --git a/db/write_thread.cc b/db/write_thread.cc index de1744cf0..798700775 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -228,6 +228,7 @@ bool WriteThread::LinkOne(Writer* w, std::atomic* newest_writer) { assert(w->state == STATE_INIT); Writer* writers = newest_writer->load(std::memory_order_relaxed); while (true) { + assert(writers != w); // If write stall in effect, and w->no_slowdown is not true, // block here until stall is cleared. If its true, then return // immediately @@ -325,6 +326,7 @@ void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) { } void WriteThread::BeginWriteStall() { + ++stall_begun_count_; LinkOne(&write_stall_dummy_, &newest_writer_); // Walk writer list until w->write_group != nullptr. The current write group @@ -367,10 +369,34 @@ void WriteThread::EndWriteStall() { } newest_writer_.exchange(write_stall_dummy_.link_older); + ++stall_ended_count_; + // Wake up writers stall_cv_.SignalAll(); } +uint64_t WriteThread::GetBegunCountOfOutstandingStall() { + if (stall_begun_count_ > stall_ended_count_) { + // Oustanding stall in queue + assert(newest_writer_.load(std::memory_order_relaxed) == + &write_stall_dummy_); + return stall_begun_count_; + } else { + // No stall in queue + assert(newest_writer_.load(std::memory_order_relaxed) != + &write_stall_dummy_); + return 0; + } +} + +void WriteThread::WaitForStallEndedCount(uint64_t stall_count) { + MutexLock lock(&stall_mu_); + + while (stall_ended_count_ < stall_count) { + stall_cv_.Wait(); + } +} + static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup"); void WriteThread::JoinBatchGroup(Writer* w) { TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w); diff --git a/db/write_thread.h b/db/write_thread.h index 0ea51d922..6e5805e37 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -357,11 +357,23 @@ class WriteThread { // Insert a dummy writer at the tail of the write queue to indicate a write // stall, and fail any writers in the queue with no_slowdown set to true + // REQUIRES: db mutex held, no other stall on this queue outstanding void BeginWriteStall(); // Remove the dummy writer and wake up waiting writers + // REQUIRES: db mutex held void EndWriteStall(); + // Number of BeginWriteStall(), or 0 if there is no active stall in the + // write queue. + // REQUIRES: db mutex held + uint64_t GetBegunCountOfOutstandingStall(); + + // Wait for number of completed EndWriteStall() to reach >= `stall_count`, + // which will generally have come from GetBegunCountOfOutstandingStall(). + // (Does not require db mutex held) + void WaitForStallEndedCount(uint64_t stall_count); + private: // See AwaitState. const uint64_t max_yield_usec_; @@ -401,6 +413,18 @@ class WriteThread { port::Mutex stall_mu_; port::CondVar stall_cv_; + // Count the number of stalls begun, so that we can check whether + // a particular stall has cleared (even if caught in another stall). + // Controlled by DB mutex. + // Because of the contract on BeginWriteStall() / EndWriteStall(), + // stall_ended_count_ <= stall_begun_count_ <= stall_ended_count_ + 1. + uint64_t stall_begun_count_ = 0; + // Count the number of stalls ended, so that we can check whether + // a particular stall has cleared (even if caught in another stall). + // Writes controlled by DB mutex + stall_mu_, signalled by stall_cv_. + // Read with stall_mu or DB mutex. + uint64_t stall_ended_count_ = 0; + // Waits for w->state & goal_mask using w->StateMutex(). Returns // the state that satisfies goal_mask. uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 695c7c4c5..2c3b76f7f 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -6640,6 +6640,108 @@ TEST_P(TransactionTest, StallTwoWriteQueues) { ASSERT_TRUE(t2_completed); } +// Make sure UnlockWAL does not return until the stall it controls is cleared. +TEST_P(TransactionTest, UnlockWALStallCleared) { + auto dbimpl = static_cast_with_check(db->GetRootDB()); + for (bool external_stall : {false, true}) { + WriteOptions wopts; + wopts.sync = true; + wopts.disableWAL = false; + + ASSERT_OK(db->Put(wopts, "k1", "val1")); + + // Stall writes + ASSERT_OK(db->LockWAL()); + + std::unique_ptr token; + if (external_stall) { + // Also make sure UnlockWAL can return despite another stall being in + // effect. + token = dbimpl->TEST_write_controler().GetStopToken(); + } + + SyncPoint::GetInstance()->DisableProcessing(); + std::vector sync_deps; + sync_deps.push_back( + {"DBImpl::DelayWrite:Wait", + "TransactionTest::UnlockWALStallCleared:BeforeUnlockWAL1"}); + if (options.two_write_queues && + txn_db_options.write_policy == WRITE_COMMITTED) { + sync_deps.push_back( + {"DBImpl::DelayWrite:NonmemWait", + "TransactionTest::UnlockWALStallCleared:BeforeUnlockWAL2"}); + } + SyncPoint::GetInstance()->LoadDependency(sync_deps); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::DelayWrite:AfterWait", [](void* arg) { + auto& mu = *static_cast(arg); + mu.AssertHeld(); + // Pretend we are slow waking up from bg_cv_, to give a chance for the + // bug to occur if it can. Randomly prefer one queue over the other. + mu.Unlock(); + if (Random::GetTLSInstance()->OneIn(2)) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } else { + std::this_thread::yield(); + } + mu.Lock(); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Create blocking writes (for both queues) in background and use + // sync point dependency to get the stall into the write queue(s) + std::atomic t1_completed{false}; + port::Thread t1{[&]() { + ASSERT_OK(db->Put(wopts, "k2", "val2")); + t1_completed = true; + }}; + + std::atomic t2_completed{false}; + port::Thread t2{[&]() { + std::unique_ptr txn0{db->BeginTransaction(wopts, {})}; + ASSERT_OK(txn0->SetName("x1")); + ASSERT_OK(txn0->Put("k3", "val3")); + ASSERT_OK(txn0->Prepare()); // nonmem + ASSERT_OK(txn0->Commit()); + }}; + + // Be sure the test is set up appropriately + TEST_SYNC_POINT("TransactionTest::UnlockWALStallCleared:BeforeUnlockWAL1"); + TEST_SYNC_POINT("TransactionTest::UnlockWALStallCleared:BeforeUnlockWAL2"); + ASSERT_FALSE(t1_completed.load()); + ASSERT_FALSE(t2_completed.load()); + + // Clear the stall + ASSERT_OK(db->UnlockWAL()); + + WriteOptions wopts2 = wopts; + if (external_stall) { + // We did not deadlock in UnlockWAL, so now async clear the external + // stall and then do a blocking write. + // DB mutex acquire+release is needed to ensure we don't reset token and + // signal while DelayWrite() is between IsStopped() and + // BeginWriteStall(). + token.reset(); + dbimpl->TEST_LockMutex(); + dbimpl->TEST_UnlockMutex(); + dbimpl->TEST_SignalAllBgCv(); + } else { + // To verify the LockWAL stall is guaranteed cleared, do a non-blocking + // write that is attempting to catch a bug by attempting to come before + // the thread that did BeginWriteStall() can do EndWriteStall() + wopts2.no_slowdown = true; + } + std::unique_ptr txn0{db->BeginTransaction(wopts2, {})}; + ASSERT_OK(txn0->SetName("x2")); + ASSERT_OK(txn0->Put("k1", "val4")); + ASSERT_OK(txn0->Prepare()); // nonmem + ASSERT_OK(txn0->Commit()); + + t1.join(); + t2.join(); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {