Ensure LockWAL() stall cleared for UnlockWAL() return (#11172)

Summary:
Fixes https://github.com/facebook/rocksdb/issues/11160

By counting the number of stalls placed on a write queue, UnlockWAL() can check whether the stall that was in effect when it was called has cleared by the time it returns, and if not, wait until it clears.

More details in code comments and new unit test.
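
To illustrate the idea in isolation, here is a minimal sketch of the counting scheme (a hypothetical `StallCounter` class invented for this example, not RocksDB's actual WriteThread; as in the real code, Begin/End/Get are assumed to be serialized by an external mutex, i.e. the DB mutex):

    #include <condition_variable>
    #include <cstdint>
    #include <mutex>

    // Sketch only. Contract (mirroring WriteThread): at most one stall is
    // outstanding at a time, so ended_ <= begun_ <= ended_ + 1, and
    // BeginStall()/EndStall()/GetBegunCountOfOutstandingStall() are all
    // called under one external (DB) mutex. mu_ exists only so that
    // WaitForStallEndedCount() can block without holding that mutex.
    class StallCounter {
     public:
      // A stall has been placed on the write queue.
      void BeginStall() { ++begun_; }

      // The stall has been removed from the write queue.
      void EndStall() {
        {
          std::lock_guard<std::mutex> lock(mu_);
          ++ended_;
        }
        cv_.notify_all();  // wake threads waiting for this stall to clear
      }

      // Begun-count identifying the outstanding stall, or 0 if none.
      uint64_t GetBegunCountOfOutstandingStall() const {
        return begun_ > ended_ ? begun_ : 0;
      }

      // Block until the stall identified by `stall_count` has ended.
      void WaitForStallEndedCount(uint64_t stall_count) {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [&] { return ended_ >= stall_count; });
      }

     private:
      uint64_t begun_ = 0;  // written under the external mutex
      uint64_t ended_ = 0;  // written under external mutex + mu_
      std::mutex mu_;
      std::condition_variable cv_;
    };

Because at most one stall is outstanding per queue, the begun-count doubles as an identifier for "the stall in effect when UnlockWAL() started": a waiter only needs the ended-count to catch up to that value, even if another stall begins in the meantime.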

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11172

Test Plan: unit test added. Yes, it uses sleep to amplify the failure when the buggy behavior is present, but gating the test on a sync point inside UnlockWAL() would fail with the old code only because the old code doesn't contain the new sync point. Basically, a sync point in UnlockWAL() could easily mask a regression by artificially limiting key behaviors: the test would only check that UnlockWAL() invokes code that *should* do the right thing, without checking that it *does* the right thing.
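
For reference, the user-visible guarantee the test checks can be sketched roughly as follows (hypothetical usage, assuming `db` is an open DB and there is no other source of write stall):

    // After the last UnlockWAL() returns, a no_slowdown write must not
    // fail due to the LockWAL() stall still sitting in the write queue.
    ASSERT_OK(db->LockWAL());
    // ... WAL is locked; incoming writes queue up behind a stall ...
    ASSERT_OK(db->UnlockWAL());  // with this fix: returns only after the
                                 // stall is removed from the queue(s)

    WriteOptions wo;
    wo.no_slowdown = true;  // fail fast instead of waiting out a stall
    // Before this fix, this Put() could fail with Status::Incomplete() if
    // UnlockWAL() returned before the stalling thread ran EndWriteStall().
    ASSERT_OK(db->Put(wo, "key", "value"));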

Reviewed By: ajkr

Differential Revision: D42894341

Pulled By: pdillinger

fbshipit-source-id: 15c9da0ca383e6aec845b29f5447d76cecbf46c3
Author: Peter Dillinger
Committed by: Facebook GitHub Bot
commit 390cc0b156
parent 63da9cfa26
 db/db_impl/db_impl.cc                      |  26
 db/db_impl/db_impl.h                       |   2
 db/db_impl/db_impl_debug.cc                |   2
 db/db_impl/db_impl_write.cc                |   5
 db/write_thread.cc                         |  26
 db/write_thread.h                          |  24
 utilities/transactions/transaction_test.cc | 102
 7 files changed

db/db_impl/db_impl.cc:

@@ -1539,6 +1539,14 @@ Status DBImpl::LockWAL() {
     assert(lock_wal_write_token_);
     ++lock_wal_count_;
   } else {
+    // NOTE: this will "unnecessarily" wait for other non-LockWAL() write
+    // stalls to clear before LockWAL returns, however fixing that would
+    // not be simple because if we notice the primary queue is already
+    // stalled, that stall might clear while we release DB mutex in
+    // EnterUnbatched() for the nonmem queue. And if we work around that in
+    // the naive way, we could deadlock by locking the two queues in different
+    // orders.
     WriteThread::Writer w;
     write_thread_.EnterUnbatched(&w, &mutex_);
     WriteThread::Writer nonmem_w;

@@ -1571,6 +1579,8 @@ Status DBImpl::LockWAL() {

 Status DBImpl::UnlockWAL() {
   bool signal = false;
+  uint64_t maybe_stall_begun_count = 0;
+  uint64_t nonmem_maybe_stall_begun_count = 0;
   {
     InstrumentedMutexLock lock(&mutex_);
     if (lock_wal_count_ == 0) {

@@ -1580,12 +1590,28 @@ Status DBImpl::UnlockWAL() {
     if (lock_wal_count_ == 0) {
       lock_wal_write_token_.reset();
       signal = true;
+      // For the last UnlockWAL, we don't want to return from UnlockWAL()
+      // until the thread(s) that called BeginWriteStall() have had a chance to
+      // call EndWriteStall(), so that no_slowdown writes after UnlockWAL() are
+      // guaranteed to succeed if there's no other source of stall.
+      maybe_stall_begun_count = write_thread_.GetBegunCountOfOutstandingStall();
+      if (two_write_queues_) {
+        nonmem_maybe_stall_begun_count =
+            nonmem_write_thread_.GetBegunCountOfOutstandingStall();
+      }
     }
   }
   if (signal) {
     // SignalAll outside of mutex for efficiency
     bg_cv_.SignalAll();
   }
+  // Ensure stalls have cleared
+  if (maybe_stall_begun_count) {
+    write_thread_.WaitForStallEndedCount(maybe_stall_begun_count);
+  }
+  if (nonmem_maybe_stall_begun_count) {
+    nonmem_write_thread_.WaitForStallEndedCount(nonmem_maybe_stall_begun_count);
+  }
   return Status::OK();
 }

db/db_impl/db_impl.h:

@@ -1117,6 +1117,8 @@ class DBImpl : public DB {
   void TEST_UnlockMutex();

+  void TEST_SignalAllBgCv();
+
   // REQUIRES: mutex locked
   void* TEST_BeginWrite();

db/db_impl/db_impl_debug.cc:

@@ -198,6 +198,8 @@ void DBImpl::TEST_LockMutex() { mutex_.Lock(); }

 void DBImpl::TEST_UnlockMutex() { mutex_.Unlock(); }

+void DBImpl::TEST_SignalAllBgCv() { bg_cv_.SignalAll(); }
+
 void* DBImpl::TEST_BeginWrite() {
   auto w = new WriteThread::Writer();
   write_thread_.EnterUnbatched(w, &mutex_);

db/db_impl/db_impl_write.cc:

@@ -1833,8 +1833,13 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
       // Notify write_thread about the stall so it can setup a barrier and
       // fail any pending writers with no_slowdown
       write_thread.BeginWriteStall();
+      if (&write_thread == &write_thread_) {
         TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
+      } else {
+        TEST_SYNC_POINT("DBImpl::DelayWrite:NonmemWait");
+      }
       bg_cv_.Wait();
+      TEST_SYNC_POINT_CALLBACK("DBImpl::DelayWrite:AfterWait", &mutex_);
       write_thread.EndWriteStall();
     }
   }

db/write_thread.cc:

@@ -228,6 +228,7 @@ bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
   assert(w->state == STATE_INIT);
   Writer* writers = newest_writer->load(std::memory_order_relaxed);
   while (true) {
+    assert(writers != w);
     // If write stall in effect, and w->no_slowdown is not true,
     // block here until stall is cleared. If its true, then return
     // immediately

@@ -325,6 +326,7 @@ void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
 }

 void WriteThread::BeginWriteStall() {
+  ++stall_begun_count_;
   LinkOne(&write_stall_dummy_, &newest_writer_);

   // Walk writer list until w->write_group != nullptr. The current write group

@@ -367,10 +369,34 @@ void WriteThread::EndWriteStall() {
   }

   newest_writer_.exchange(write_stall_dummy_.link_older);

+  ++stall_ended_count_;
+
   // Wake up writers
   stall_cv_.SignalAll();
 }

+uint64_t WriteThread::GetBegunCountOfOutstandingStall() {
+  if (stall_begun_count_ > stall_ended_count_) {
+    // Outstanding stall in queue
+    assert(newest_writer_.load(std::memory_order_relaxed) ==
+           &write_stall_dummy_);
+    return stall_begun_count_;
+  } else {
+    // No stall in queue
+    assert(newest_writer_.load(std::memory_order_relaxed) !=
+           &write_stall_dummy_);
+    return 0;
+  }
+}
+
+void WriteThread::WaitForStallEndedCount(uint64_t stall_count) {
+  MutexLock lock(&stall_mu_);
+
+  while (stall_ended_count_ < stall_count) {
+    stall_cv_.Wait();
+  }
+}
+
 static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
 void WriteThread::JoinBatchGroup(Writer* w) {
   TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);

db/write_thread.h:

@@ -357,11 +357,23 @@ class WriteThread {
   // Insert a dummy writer at the tail of the write queue to indicate a write
   // stall, and fail any writers in the queue with no_slowdown set to true
+  // REQUIRES: db mutex held, no other stall on this queue outstanding
   void BeginWriteStall();

   // Remove the dummy writer and wake up waiting writers
+  // REQUIRES: db mutex held
   void EndWriteStall();

+  // Number of BeginWriteStall(), or 0 if there is no active stall in the
+  // write queue.
+  // REQUIRES: db mutex held
+  uint64_t GetBegunCountOfOutstandingStall();
+
+  // Wait for number of completed EndWriteStall() to reach >= `stall_count`,
+  // which will generally have come from GetBegunCountOfOutstandingStall().
+  // (Does not require db mutex held)
+  void WaitForStallEndedCount(uint64_t stall_count);
+
  private:
   // See AwaitState.
   const uint64_t max_yield_usec_;

@@ -401,6 +413,18 @@ class WriteThread {
   port::Mutex stall_mu_;
   port::CondVar stall_cv_;

+  // Count the number of stalls begun, so that we can check whether
+  // a particular stall has cleared (even if caught in another stall).
+  // Controlled by DB mutex.
+  // Because of the contract on BeginWriteStall() / EndWriteStall(),
+  // stall_ended_count_ <= stall_begun_count_ <= stall_ended_count_ + 1.
+  uint64_t stall_begun_count_ = 0;
+  // Count the number of stalls ended, so that we can check whether
+  // a particular stall has cleared (even if caught in another stall).
+  // Writes controlled by DB mutex + stall_mu_, signalled by stall_cv_.
+  // Read with stall_mu_ or DB mutex.
+  uint64_t stall_ended_count_ = 0;
+
   // Waits for w->state & goal_mask using w->StateMutex(). Returns
   // the state that satisfies goal_mask.
   uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);

utilities/transactions/transaction_test.cc:

@@ -6640,6 +6640,108 @@ TEST_P(TransactionTest, StallTwoWriteQueues) {
   ASSERT_TRUE(t2_completed);
 }

+// Make sure UnlockWAL does not return until the stall it controls is cleared.
+TEST_P(TransactionTest, UnlockWALStallCleared) {
+  auto dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
+  for (bool external_stall : {false, true}) {
+    WriteOptions wopts;
+    wopts.sync = true;
+    wopts.disableWAL = false;
+    ASSERT_OK(db->Put(wopts, "k1", "val1"));
+
+    // Stall writes
+    ASSERT_OK(db->LockWAL());
+
+    std::unique_ptr<WriteControllerToken> token;
+    if (external_stall) {
+      // Also make sure UnlockWAL can return despite another stall being in
+      // effect.
+      token = dbimpl->TEST_write_controler().GetStopToken();
+    }
+
+    SyncPoint::GetInstance()->DisableProcessing();
+    std::vector<SyncPoint::SyncPointPair> sync_deps;
+    sync_deps.push_back(
+        {"DBImpl::DelayWrite:Wait",
+         "TransactionTest::UnlockWALStallCleared:BeforeUnlockWAL1"});
+    if (options.two_write_queues &&
+        txn_db_options.write_policy == WRITE_COMMITTED) {
+      sync_deps.push_back(
+          {"DBImpl::DelayWrite:NonmemWait",
+           "TransactionTest::UnlockWALStallCleared:BeforeUnlockWAL2"});
+    }
+    SyncPoint::GetInstance()->LoadDependency(sync_deps);
+    SyncPoint::GetInstance()->SetCallBack(
+        "DBImpl::DelayWrite:AfterWait", [](void* arg) {
+          auto& mu = *static_cast<CacheAlignedInstrumentedMutex*>(arg);
+          mu.AssertHeld();
+          // Pretend we are slow waking up from bg_cv_, to give a chance for
+          // the bug to occur if it can. Randomly prefer one queue over the
+          // other.
+          mu.Unlock();
+          if (Random::GetTLSInstance()->OneIn(2)) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+          } else {
+            std::this_thread::yield();
+          }
+          mu.Lock();
+        });
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    // Create blocking writes (for both queues) in background and use
+    // sync point dependency to get the stall into the write queue(s)
+    std::atomic<bool> t1_completed{false};
+    port::Thread t1{[&]() {
+      ASSERT_OK(db->Put(wopts, "k2", "val2"));
+      t1_completed = true;
+    }};
+
+    std::atomic<bool> t2_completed{false};
+    port::Thread t2{[&]() {
+      std::unique_ptr<Transaction> txn0{db->BeginTransaction(wopts, {})};
+      ASSERT_OK(txn0->SetName("x1"));
+      ASSERT_OK(txn0->Put("k3", "val3"));
+      ASSERT_OK(txn0->Prepare());  // nonmem
+      ASSERT_OK(txn0->Commit());
+      t2_completed = true;
+    }};
+
+    // Be sure the test is set up appropriately
+    TEST_SYNC_POINT("TransactionTest::UnlockWALStallCleared:BeforeUnlockWAL1");
+    TEST_SYNC_POINT("TransactionTest::UnlockWALStallCleared:BeforeUnlockWAL2");
+    ASSERT_FALSE(t1_completed.load());
+    ASSERT_FALSE(t2_completed.load());
+
+    // Clear the stall
+    ASSERT_OK(db->UnlockWAL());
+
+    WriteOptions wopts2 = wopts;
+    if (external_stall) {
+      // We did not deadlock in UnlockWAL, so now async clear the external
+      // stall and then do a blocking write.
+      // DB mutex acquire+release is needed to ensure we don't reset token and
+      // signal while DelayWrite() is between IsStopped() and
+      // BeginWriteStall().
+      token.reset();
+      dbimpl->TEST_LockMutex();
+      dbimpl->TEST_UnlockMutex();
+      dbimpl->TEST_SignalAllBgCv();
+    } else {
+      // To verify the LockWAL stall is guaranteed cleared, do a non-blocking
+      // write that attempts to catch the bug by coming in before the thread
+      // that did BeginWriteStall() can do EndWriteStall()
+      wopts2.no_slowdown = true;
+    }
+    std::unique_ptr<Transaction> txn0{db->BeginTransaction(wopts2, {})};
+    ASSERT_OK(txn0->SetName("x2"));
+    ASSERT_OK(txn0->Put("k1", "val4"));
+    ASSERT_OK(txn0->Prepare());  // nonmem
+    ASSERT_OK(txn0->Commit());
+
+    t1.join();
+    t2.join();
+  }
+}
+
 }  // namespace ROCKSDB_NAMESPACE

 int main(int argc, char** argv) {
