Fix DelayWrite() calls for two_write_queues (#11130)

Summary:
PR https://github.com/facebook/rocksdb/issues/11020 fixed a case where it was easy to deadlock the DB with LockWAL() but introduced a bug showing up as a rare assertion failure in the stress test. Specifically, `assert(w->state == STATE_INIT)` in `WriteThread::LinkOne()` called from `BeginWriteStall()`, `DelayWrite()`, `WriteImplWALOnly()`. I haven't been about to generate a unit test that reproduces this failure but I believe the root cause is that DelayWrite() was never meant to be re-entrant, only called from the DB's write_thread_ leader. https://github.com/facebook/rocksdb/issues/11020 introduced a call to DelayWrite() from the nonmem_write_thread_ group leader.

This fix is to make DelayWrite() apply to the specific write queue that it is being called from (inject a dummy write stall entry to the head of the appropriate write queue). WriteController is re-entrant, based on polling and state changes signalled with bg_cv_, so can manage stalling two queues. The only anticipated complication (called out by Andrew in previous PR) is that we don't want timed write delays being injected in parallel for the two queues, because that dimishes the intended throttling effect. Thus, we only allow timed delays for the primary write queue.

HISTORY not updated because this is intended for the same release where the bug was introduced.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11130

Test Plan:
Although I was not able to reproduce the assertion failure, I was able to reproduce a distinct flaw with what I believe is the same root cause: a kind of deadlock if both write queues need to wake up from stopped writes. Only one will be waiting on bg_cv_ (the other waiting in `LinkOne()` for the write queue to open up), so a single SignalAll() will only unblock one of the queues, with the other re-instating the stop until another signal on bg_cv_. A simple unit test is added for this case.

Will also run crash_test_with_multiops_wc_txn for a while looking for issues.

Reviewed By: ajkr

Differential Revision: D42749330

Pulled By: pdillinger

fbshipit-source-id: 4317dd899a93d57c26fd5af7143038f82d4d4d1b
oxigraph-8.1.1
Peter Dillinger 2 years ago committed by Facebook GitHub Bot
parent 9afa0f05ad
commit 546e213c4f
  1. 3
      db/db_impl/db_impl.h
  2. 33
      db/db_impl/db_impl_write.cc
  3. 52
      utilities/transactions/transaction_test.cc

@ -1872,7 +1872,8 @@ class DBImpl : public DB {
// num_bytes: for slowdown case, delay time is calculated based on
// `num_bytes` going through.
Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);
Status DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
const WriteOptions& write_options);
// Begin stalling of writes when memory usage increases beyond a certain
// threshold.

@ -926,7 +926,8 @@ Status DBImpl::WriteImplWALOnly(
}
} else {
InstrumentedMutexLock lock(&mutex_);
Status status = DelayWrite(/*num_bytes=*/0ull, write_options);
Status status =
DelayWrite(/*num_bytes=*/0ull, *write_thread, write_options);
if (!status.ok()) {
WriteThread::WriteGroup write_group;
write_thread->EnterAsBatchGroupLeader(&w, &write_group);
@ -1201,7 +1202,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
// might happen for smaller writes but larger writes can go through.
// Can optimize it if it is an issue.
InstrumentedMutexLock l(&mutex_);
status = DelayWrite(last_batch_group_size_, write_options);
status = DelayWrite(last_batch_group_size_, write_thread_, write_options);
PERF_TIMER_START(write_pre_and_post_process_time);
}
@ -1769,8 +1770,8 @@ uint64_t DBImpl::GetMaxTotalWalSize() const {
}
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::DelayWrite(uint64_t num_bytes,
// REQUIRES: this thread is currently at the leader for write_thread
Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
const WriteOptions& write_options) {
mutex_.AssertHeld();
uint64_t time_delayed = 0;
@ -1778,8 +1779,16 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
{
StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL,
&time_delayed);
uint64_t delay =
write_controller_.GetDelay(immutable_db_options_.clock, num_bytes);
// To avoid parallel timed delays (bad throttling), only support them
// on the primary write queue.
uint64_t delay;
if (&write_thread == &write_thread_) {
delay =
write_controller_.GetDelay(immutable_db_options_.clock, num_bytes);
} else {
assert(num_bytes == 0);
delay = 0;
}
TEST_SYNC_POINT("DBImpl::DelayWrite:Start");
if (delay > 0) {
if (write_options.no_slowdown) {
@ -1787,9 +1796,9 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
}
TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
// Notify write_thread_ about the stall so it can setup a barrier and
// Notify write_thread about the stall so it can setup a barrier and
// fail any pending writers with no_slowdown
write_thread_.BeginWriteStall();
write_thread.BeginWriteStall();
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
// We will delay the write until we have slept for `delay` microseconds
@ -1809,7 +1818,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
immutable_db_options_.clock->SleepForMicroseconds(kDelayInterval);
}
mutex_.Lock();
write_thread_.EndWriteStall();
write_thread.EndWriteStall();
}
// Don't wait if there's a background error, even if its a soft error. We
@ -1823,12 +1832,12 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
}
delayed = true;
// Notify write_thread_ about the stall so it can setup a barrier and
// Notify write_thread about the stall so it can setup a barrier and
// fail any pending writers with no_slowdown
write_thread_.BeginWriteStall();
write_thread.BeginWriteStall();
TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
bg_cv_.Wait();
write_thread_.EndWriteStall();
write_thread.EndWriteStall();
}
}
assert(!delayed || !write_options.no_slowdown);

@ -6589,6 +6589,58 @@ TEST_P(TransactionTest, LockWal) {
SyncPoint::GetInstance()->DisableProcessing();
}
TEST_P(TransactionTest, StallTwoWriteQueues) {
// There was a two_write_queues bug in which both write thread leaders (for
// each queue) would attempt to own the stopping of writes in the primary
// write queue. This nearly worked but could lead to some broken assertions
// and a kind of deadlock in the test below. (Would resume if someone
// eventually signalled bg_cv_ again.)
if (!options.two_write_queues) {
ROCKSDB_GTEST_BYPASS("Test only needed with two_write_queues");
return;
}
// Stop writes
ASSERT_OK(db->LockWAL());
WriteOptions wopts;
wopts.sync = true;
wopts.disableWAL = false;
// Create one write thread that blocks in the primary write queue and one
// that blocks in the nonmem queue.
bool t1_completed = false;
bool t2_completed = false;
port::Thread t1{[&]() {
ASSERT_OK(db->Put(wopts, "x", "y"));
t1_completed = true;
}};
port::Thread t2{[&]() {
std::unique_ptr<Transaction> txn0{db->BeginTransaction(wopts, {})};
ASSERT_OK(txn0->SetName("xid"));
ASSERT_OK(txn0->Prepare()); // nonmem
ASSERT_OK(txn0->Commit());
t2_completed = true;
}};
// Sleep long enough to that above threads can usually reach a waiting point,
// to usually reveal deadlock if the bug is present.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Ensure proper test setup
ASSERT_FALSE(t1_completed);
ASSERT_FALSE(t2_completed);
// Resume writes
ASSERT_OK(db->UnlockWAL());
// Wait for writes to finish
t1.join();
t2.join();
// Ensure proper test setup
ASSERT_TRUE(t1_completed);
ASSERT_TRUE(t2_completed);
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

Loading…
Cancel
Save