diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 0b47acd2a..562f27e0b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1539,6 +1539,14 @@ Status DBImpl::LockWAL() { assert(lock_wal_write_token_); ++lock_wal_count_; } else { + // NOTE: this will "unnecessarily" wait for other non-LockWAL() write + // stalls to clear before LockWAL returns, however fixing that would + // not be simple because if we notice the primary queue is already + // stalled, that stall might clear while we release DB mutex in + // EnterUnbatched() for the nonmem queue. And if we work around that in + // the naive way, we could deadlock by locking the two queues in different + // orders. + WriteThread::Writer w; write_thread_.EnterUnbatched(&w, &mutex_); WriteThread::Writer nonmem_w; @@ -1571,6 +1579,8 @@ Status DBImpl::LockWAL() { Status DBImpl::UnlockWAL() { bool signal = false; + uint64_t maybe_stall_begun_count = 0; + uint64_t nonmem_maybe_stall_begun_count = 0; { InstrumentedMutexLock lock(&mutex_); if (lock_wal_count_ == 0) { @@ -1580,12 +1590,28 @@ Status DBImpl::UnlockWAL() { if (lock_wal_count_ == 0) { lock_wal_write_token_.reset(); signal = true; + // For the last UnlockWAL, we don't want to return from UnlockWAL() + // until the thread(s) that called BeginWriteStall() have had a chance to + // call EndWriteStall(), so that no_slowdown writes after UnlockWAL() are + // guaranteed to succeed if there's no other source of stall. + maybe_stall_begun_count = write_thread_.GetBegunCountOfOutstandingStall(); + if (two_write_queues_) { + nonmem_maybe_stall_begun_count = + nonmem_write_thread_.GetBegunCountOfOutstandingStall(); + } } } if (signal) { // SignalAll outside of mutex for efficiency bg_cv_.SignalAll(); } + // Ensure stalls have cleared + if (maybe_stall_begun_count) { + write_thread_.WaitForStallEndedCount(maybe_stall_begun_count); + } + if (nonmem_maybe_stall_begun_count) { + nonmem_write_thread_.WaitForStallEndedCount(nonmem_maybe_stall_begun_count); + } return Status::OK(); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 6a930b2d8..75e048e7f 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1117,6 +1117,8 @@ class DBImpl : public DB { void TEST_UnlockMutex(); + void TEST_SignalAllBgCv(); + // REQUIRES: mutex locked void* TEST_BeginWrite(); diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 9b6685f62..e59a4b981 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -198,6 +198,8 @@ void DBImpl::TEST_LockMutex() { mutex_.Lock(); } void DBImpl::TEST_UnlockMutex() { mutex_.Unlock(); } +void DBImpl::TEST_SignalAllBgCv() { bg_cv_.SignalAll(); } + void* DBImpl::TEST_BeginWrite() { auto w = new WriteThread::Writer(); write_thread_.EnterUnbatched(w, &mutex_); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 15829ccd8..0891e5012 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1833,8 +1833,13 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, // Notify write_thread about the stall so it can setup a barrier and // fail any pending writers with no_slowdown write_thread.BeginWriteStall(); - TEST_SYNC_POINT("DBImpl::DelayWrite:Wait"); + if (&write_thread == &write_thread_) { + TEST_SYNC_POINT("DBImpl::DelayWrite:Wait"); + } else { + TEST_SYNC_POINT("DBImpl::DelayWrite:NonmemWait"); + } bg_cv_.Wait(); + TEST_SYNC_POINT_CALLBACK("DBImpl::DelayWrite:AfterWait", &mutex_); write_thread.EndWriteStall(); } } diff --git a/db/write_thread.cc b/db/write_thread.cc index de1744cf0..798700775 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -228,6 +228,7 @@ bool WriteThread::LinkOne(Writer* w, std::atomic* newest_writer) { assert(w->state == STATE_INIT); Writer* writers = newest_writer->load(std::memory_order_relaxed); while (true) { + assert(writers != w); // If write stall in effect, and w->no_slowdown is not true, // block here until stall is cleared. If its true, then return // immediately @@ -325,6 +326,7 @@ void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) { } void WriteThread::BeginWriteStall() { + ++stall_begun_count_; LinkOne(&write_stall_dummy_, &newest_writer_); // Walk writer list until w->write_group != nullptr. The current write group @@ -367,10 +369,34 @@ void WriteThread::EndWriteStall() { } newest_writer_.exchange(write_stall_dummy_.link_older); + ++stall_ended_count_; + // Wake up writers stall_cv_.SignalAll(); } +uint64_t WriteThread::GetBegunCountOfOutstandingStall() { + if (stall_begun_count_ > stall_ended_count_) { + // Oustanding stall in queue + assert(newest_writer_.load(std::memory_order_relaxed) == + &write_stall_dummy_); + return stall_begun_count_; + } else { + // No stall in queue + assert(newest_writer_.load(std::memory_order_relaxed) != + &write_stall_dummy_); + return 0; + } +} + +void WriteThread::WaitForStallEndedCount(uint64_t stall_count) { + MutexLock lock(&stall_mu_); + + while (stall_ended_count_ < stall_count) { + stall_cv_.Wait(); + } +} + static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup"); void WriteThread::JoinBatchGroup(Writer* w) { TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w); diff --git a/db/write_thread.h b/db/write_thread.h index 0ea51d922..6e5805e37 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -357,11 +357,23 @@ class WriteThread { // Insert a dummy writer at the tail of the write queue to indicate a write // stall, and fail any writers in the queue with no_slowdown set to true + // REQUIRES: db mutex held, no other stall on this queue outstanding void BeginWriteStall(); // Remove the dummy writer and wake up waiting writers + // REQUIRES: db mutex held void EndWriteStall(); + // Number of BeginWriteStall(), or 0 if there is no active stall in the + // write queue. + // REQUIRES: db mutex held + uint64_t GetBegunCountOfOutstandingStall(); + + // Wait for number of completed EndWriteStall() to reach >= `stall_count`, + // which will generally have come from GetBegunCountOfOutstandingStall(). + // (Does not require db mutex held) + void WaitForStallEndedCount(uint64_t stall_count); + private: // See AwaitState. const uint64_t max_yield_usec_; @@ -401,6 +413,18 @@ class WriteThread { port::Mutex stall_mu_; port::CondVar stall_cv_; + // Count the number of stalls begun, so that we can check whether + // a particular stall has cleared (even if caught in another stall). + // Controlled by DB mutex. + // Because of the contract on BeginWriteStall() / EndWriteStall(), + // stall_ended_count_ <= stall_begun_count_ <= stall_ended_count_ + 1. + uint64_t stall_begun_count_ = 0; + // Count the number of stalls ended, so that we can check whether + // a particular stall has cleared (even if caught in another stall). + // Writes controlled by DB mutex + stall_mu_, signalled by stall_cv_. + // Read with stall_mu or DB mutex. + uint64_t stall_ended_count_ = 0; + // Waits for w->state & goal_mask using w->StateMutex(). Returns // the state that satisfies goal_mask. uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 695c7c4c5..2c3b76f7f 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -6640,6 +6640,108 @@ TEST_P(TransactionTest, StallTwoWriteQueues) { ASSERT_TRUE(t2_completed); } +// Make sure UnlockWAL does not return until the stall it controls is cleared. +TEST_P(TransactionTest, UnlockWALStallCleared) { + auto dbimpl = static_cast_with_check(db->GetRootDB()); + for (bool external_stall : {false, true}) { + WriteOptions wopts; + wopts.sync = true; + wopts.disableWAL = false; + + ASSERT_OK(db->Put(wopts, "k1", "val1")); + + // Stall writes + ASSERT_OK(db->LockWAL()); + + std::unique_ptr token; + if (external_stall) { + // Also make sure UnlockWAL can return despite another stall being in + // effect. + token = dbimpl->TEST_write_controler().GetStopToken(); + } + + SyncPoint::GetInstance()->DisableProcessing(); + std::vector sync_deps; + sync_deps.push_back( + {"DBImpl::DelayWrite:Wait", + "TransactionTest::UnlockWALStallCleared:BeforeUnlockWAL1"}); + if (options.two_write_queues && + txn_db_options.write_policy == WRITE_COMMITTED) { + sync_deps.push_back( + {"DBImpl::DelayWrite:NonmemWait", + "TransactionTest::UnlockWALStallCleared:BeforeUnlockWAL2"}); + } + SyncPoint::GetInstance()->LoadDependency(sync_deps); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::DelayWrite:AfterWait", [](void* arg) { + auto& mu = *static_cast(arg); + mu.AssertHeld(); + // Pretend we are slow waking up from bg_cv_, to give a chance for the + // bug to occur if it can. Randomly prefer one queue over the other. + mu.Unlock(); + if (Random::GetTLSInstance()->OneIn(2)) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } else { + std::this_thread::yield(); + } + mu.Lock(); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Create blocking writes (for both queues) in background and use + // sync point dependency to get the stall into the write queue(s) + std::atomic t1_completed{false}; + port::Thread t1{[&]() { + ASSERT_OK(db->Put(wopts, "k2", "val2")); + t1_completed = true; + }}; + + std::atomic t2_completed{false}; + port::Thread t2{[&]() { + std::unique_ptr txn0{db->BeginTransaction(wopts, {})}; + ASSERT_OK(txn0->SetName("x1")); + ASSERT_OK(txn0->Put("k3", "val3")); + ASSERT_OK(txn0->Prepare()); // nonmem + ASSERT_OK(txn0->Commit()); + }}; + + // Be sure the test is set up appropriately + TEST_SYNC_POINT("TransactionTest::UnlockWALStallCleared:BeforeUnlockWAL1"); + TEST_SYNC_POINT("TransactionTest::UnlockWALStallCleared:BeforeUnlockWAL2"); + ASSERT_FALSE(t1_completed.load()); + ASSERT_FALSE(t2_completed.load()); + + // Clear the stall + ASSERT_OK(db->UnlockWAL()); + + WriteOptions wopts2 = wopts; + if (external_stall) { + // We did not deadlock in UnlockWAL, so now async clear the external + // stall and then do a blocking write. + // DB mutex acquire+release is needed to ensure we don't reset token and + // signal while DelayWrite() is between IsStopped() and + // BeginWriteStall(). + token.reset(); + dbimpl->TEST_LockMutex(); + dbimpl->TEST_UnlockMutex(); + dbimpl->TEST_SignalAllBgCv(); + } else { + // To verify the LockWAL stall is guaranteed cleared, do a non-blocking + // write that is attempting to catch a bug by attempting to come before + // the thread that did BeginWriteStall() can do EndWriteStall() + wopts2.no_slowdown = true; + } + std::unique_ptr txn0{db->BeginTransaction(wopts2, {})}; + ASSERT_OK(txn0->SetName("x2")); + ASSERT_OK(txn0->Put("k1", "val4")); + ASSERT_OK(txn0->Prepare()); // nonmem + ASSERT_OK(txn0->Commit()); + + t1.join(); + t2.join(); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {