diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index e56f97ece..1bab7c7a9 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1872,7 +1872,8 @@ class DBImpl : public DB { // num_bytes: for slowdown case, delay time is calculated based on // `num_bytes` going through. - Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options); + Status DelayWrite(uint64_t num_bytes, WriteThread& write_thread, + const WriteOptions& write_options); // Begin stalling of writes when memory usage increases beyond a certain // threshold. diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 59c29e424..ee8c98989 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -926,7 +926,8 @@ Status DBImpl::WriteImplWALOnly( } } else { InstrumentedMutexLock lock(&mutex_); - Status status = DelayWrite(/*num_bytes=*/0ull, write_options); + Status status = + DelayWrite(/*num_bytes=*/0ull, *write_thread, write_options); if (!status.ok()) { WriteThread::WriteGroup write_group; write_thread->EnterAsBatchGroupLeader(&w, &write_group); @@ -1201,7 +1202,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, // might happen for smaller writes but larger writes can go through. // Can optimize it if it is an issue. 
InstrumentedMutexLock l(&mutex_); - status = DelayWrite(last_batch_group_size_, write_options); + status = DelayWrite(last_batch_group_size_, write_thread_, write_options); PERF_TIMER_START(write_pre_and_post_process_time); } @@ -1769,8 +1770,8 @@ uint64_t DBImpl::GetMaxTotalWalSize() const { } // REQUIRES: mutex_ is held -// REQUIRES: this thread is currently at the front of the writer queue -Status DBImpl::DelayWrite(uint64_t num_bytes, +// REQUIRES: this thread is currently the leader for write_thread +Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, const WriteOptions& write_options) { mutex_.AssertHeld(); uint64_t time_delayed = 0; @@ -1778,8 +1779,16 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, { StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL, &time_delayed); - uint64_t delay = - write_controller_.GetDelay(immutable_db_options_.clock, num_bytes); + // To avoid parallel timed delays (bad throttling), only support them + // on the primary write queue. 
+ uint64_t delay; + if (&write_thread == &write_thread_) { + delay = + write_controller_.GetDelay(immutable_db_options_.clock, num_bytes); + } else { + assert(num_bytes == 0); + delay = 0; + } TEST_SYNC_POINT("DBImpl::DelayWrite:Start"); if (delay > 0) { if (write_options.no_slowdown) { @@ -1787,9 +1796,9 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, } TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep"); - // Notify write_thread_ about the stall so it can setup a barrier and + // Notify write_thread about the stall so it can setup a barrier and // fail any pending writers with no_slowdown - write_thread_.BeginWriteStall(); + write_thread.BeginWriteStall(); mutex_.Unlock(); TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone"); // We will delay the write until we have slept for `delay` microseconds @@ -1809,7 +1818,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, immutable_db_options_.clock->SleepForMicroseconds(kDelayInterval); } mutex_.Lock(); - write_thread_.EndWriteStall(); + write_thread.EndWriteStall(); } // Don't wait if there's a background error, even if its a soft error. 
We @@ -1823,12 +1832,12 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, } delayed = true; - // Notify write_thread_ about the stall so it can setup a barrier and + // Notify write_thread about the stall so it can setup a barrier and // fail any pending writers with no_slowdown - write_thread_.BeginWriteStall(); + write_thread.BeginWriteStall(); TEST_SYNC_POINT("DBImpl::DelayWrite:Wait"); bg_cv_.Wait(); - write_thread_.EndWriteStall(); + write_thread.EndWriteStall(); } } assert(!delayed || !write_options.no_slowdown); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index e035e1f88..d74a4b8b1 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -6589,6 +6589,58 @@ TEST_P(TransactionTest, LockWal) { SyncPoint::GetInstance()->DisableProcessing(); } +TEST_P(TransactionTest, StallTwoWriteQueues) { + // There was a two_write_queues bug in which both write thread leaders (for + // each queue) would attempt to own the stopping of writes in the primary + // write queue. This nearly worked but could lead to some broken assertions + // and a kind of deadlock in the test below. (Would resume if someone + // eventually signalled bg_cv_ again.) + if (!options.two_write_queues) { + ROCKSDB_GTEST_BYPASS("Test only needed with two_write_queues"); + return; + } + + // Stop writes + ASSERT_OK(db->LockWAL()); + + WriteOptions wopts; + wopts.sync = true; + wopts.disableWAL = false; + + // Create one write thread that blocks in the primary write queue and one + // that blocks in the nonmem queue. 
+ bool t1_completed = false; + bool t2_completed = false; + port::Thread t1{[&]() { + ASSERT_OK(db->Put(wopts, "x", "y")); + t1_completed = true; + }}; + port::Thread t2{[&]() { + std::unique_ptr<Transaction> txn0{db->BeginTransaction(wopts, {})}; + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Prepare()); // nonmem + ASSERT_OK(txn0->Commit()); + t2_completed = true; + }}; + + // Sleep long enough so that above threads can usually reach a waiting point, + // to usually reveal deadlock if the bug is present. + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // Ensure proper test setup + ASSERT_FALSE(t1_completed); + ASSERT_FALSE(t2_completed); + + // Resume writes + ASSERT_OK(db->UnlockWAL()); + + // Wait for writes to finish + t1.join(); + t2.join(); + // Ensure both writes completed + ASSERT_TRUE(t1_completed); + ASSERT_TRUE(t2_completed); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {