diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 79427f6b0..3015524d3 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -473,6 +473,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
   WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                         disable_memtable);
   write_thread_.JoinBatchGroup(&w);
+  TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup");
   if (w.state == WriteThread::STATE_GROUP_LEADER) {
     WriteThread::WriteGroup wal_write_group;
     if (w.callback && !w.callback->AllowWriteBatching()) {
diff --git a/db/db_write_test.cc b/db/db_write_test.cc
index 095292790..b23896050 100644
--- a/db/db_write_test.cc
+++ b/db/db_write_test.cc
@@ -42,6 +42,125 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) {
   ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
 }
 
+TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
+  Options options = GetOptions();
+  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
+      4;
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+  port::Mutex mutex;
+  port::CondVar cv(&mutex);
+  // Guarded by mutex
+  int writers = 0;
+
+  Reopen(options);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    dbfull()->Put(wo, key, "bar");
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    dbfull()->Put(wo, key, "bar");
+  };
+  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
+    mutex.Lock();
+    ++writers;
+    cv.SignalAll();
+    mutex.Unlock();
+  };
+
+  // Create 3 L0 files and schedule 4th without waiting
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
+        "DBImpl::BackgroundCallFlush:start"},
+       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
+        "DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
+       // Make compaction start wait for the write stall to be detected and
+       // implemented by a write group leader
+       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
+        "BackgroundCallCompaction:0"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Schedule creation of 4th L0 file without waiting. This will seal the
+  // memtable and then wait for a sync point before writing the file. We need
+  // to do it this way because SwitchMemtable() needs to enter the
+  // write_thread
+  FlushOptions fopt;
+  fopt.wait = false;
+  dbfull()->Flush(fopt);
+
+  // Create a mix of slowdown/no_slowdown write threads
+  mutex.Lock();
+  // First leader
+  threads.emplace_back(write_slowdown_func);
+  while (writers != 1) {
+    cv.Wait();
+  }
+
+  // Second leader. Will stall writes
+  // Build a writers list with no slowdown in the middle:
+  //  +-------------+
+  //  |  slowdown   +<----+ newest
+  //  +--+----------+
+  //     |
+  //     v
+  //  +--+----------+
+  //  | no slowdown |
+  //  +--+----------+
+  //     |
+  //     v
+  //  +--+----------+
+  //  |  slowdown   +
+  //  +-------------+
+  threads.emplace_back(write_slowdown_func);
+  while (writers != 2) {
+    cv.Wait();
+  }
+  threads.emplace_back(write_no_slowdown_func);
+  while (writers != 3) {
+    cv.Wait();
+  }
+  threads.emplace_back(write_slowdown_func);
+  while (writers != 4) {
+    cv.Wait();
+  }
+
+  mutex.Unlock();
+
+  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
+  dbfull()->TEST_WaitForFlushMemTable(nullptr);
+  // This would have triggered a write stall. Unblock the write group leader
+  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
+  // The leader is going to create missing newer links. When the leader
+  // finishes, the next leader is going to delay writes and fail writers with
+  // no_slowdown
+
+  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
   Options options = GetOptions();
   options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
diff --git a/db/write_thread.cc b/db/write_thread.cc
index 343534705..d26a694aa 100644
--- a/db/write_thread.cc
+++ b/db/write_thread.cc
@@ -344,7 +344,13 @@ void WriteThread::BeginWriteStall() {
       prev->link_older = w->link_older;
       w->status = Status::Incomplete("Write stall");
       SetState(w, STATE_COMPLETED);
-      if (prev->link_older) {
+      // Only update `link_newer` if it's already set.
+      // `CreateMissingNewerLinks()` will update the nullptr `link_newer` later,
+      // which assumes that the first non-nullptr `link_newer` is the last
+      // nullptr link in the writer list.
+      // If `link_newer` is set here, `CreateMissingNewerLinks()` may stop
+      // updating the whole list when it sees the first non-nullptr link.
+      if (prev->link_older && prev->link_older->link_newer) {
         prev->link_older->link_newer = prev;
       }
       w = prev->link_older;
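
Note on the write_thread.cc change: the new comment relies on how `CreateMissingNewerLinks()` walks the writer list from the newest writer towards older ones and stops at the first writer whose `link_newer` is already set. The sketch below is a minimal, self-contained illustration of that stop condition; the `Writer` struct and function name here are stripped-down stand-ins for illustration, not the real `WriteThread::Writer` API.

    // Sketch only: a writer node reduced to just the two list links.
    struct Writer {
      Writer* link_older = nullptr;  // next older writer in the list
      Writer* link_newer = nullptr;  // next newer writer, filled in lazily
    };

    // Mirrors the assumption described in the new comment: walking from the
    // newest writer towards older ones, stop at the first node whose
    // link_newer is already set, treating everything older as already linked.
    void CreateMissingNewerLinksSketch(Writer* head) {
      while (true) {
        Writer* next = head->link_older;
        if (next == nullptr || next->link_newer != nullptr) {
          break;
        }
        next->link_newer = head;
        head = next;
      }
    }

If `BeginWriteStall()` set a `link_newer` in the middle of the still-unlinked portion of the list (the old `if (prev->link_older)` behaviour), a walk like this would stop there early and some writers would never get their `link_newer` filled in, so they could be skipped once the current write group finishes. Guarding on `prev->link_older->link_newer` preserves the invariant that the first non-nullptr `link_newer` marks the end of the unlinked portion, which is what the new test exercises.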