diff --git a/HISTORY.md b/HISTORY.md
index c82d0dfaa..020ada336 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -20,6 +20,7 @@
 * Delcare kHashSearch index type feature-incompatible with index_block_restart_interval larger than 1.
 * Fix incorrect results while block-based table uses kHashSearch, together with Prev()/SeekForPrev().
 * Fixed an issue where the thread pools were not resized upon setting `max_background_jobs` dynamically through the `SetDBOptions` interface.
+* Fix a bug that can cause write threads to hang when a slowdown/stall happens and there is a mix of writers with WriteOptions::no_slowdown set/unset.
 
 ### New Features
 * It is now possible to enable periodic compactions for the base DB when using BlobDB.
diff --git a/db/db_write_test.cc b/db/db_write_test.cc
index 8931f1752..74a8c89fa 100644
--- a/db/db_write_test.cc
+++ b/db/db_write_test.cc
@@ -40,6 +40,97 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) {
   ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
 }
 
+TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
+  Options options = GetOptions();
+  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+  port::Mutex mutex;
+  port::CondVar cv(&mutex);
+
+  Reopen(options);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    dbfull()->Put(wo, key, "bar");
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    dbfull()->Put(wo, key, "bar");
+  };
+  std::function<void(void *)> unblock_main_thread_func = [&](void *) {
+    mutex.Lock();
+    cv.SignalAll();
+    mutex.Unlock();
+  };
+
+  // Create 3 L0 files and schedule 4th without waiting
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
+        "DBImpl::BackgroundCallFlush:start"},
+       {"DBWriteTest::WriteThreadHangOnWriteStall:2",
+        "DBImpl::WriteImpl:BeforeLeaderEnters"},
+       // Make compaction start wait for the write stall to be detected and
+       // implemented by a write group leader
+       {"DBWriteTest::WriteThreadHangOnWriteStall:3",
+        "BackgroundCallCompaction:0"}});
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Schedule creation of 4th L0 file without waiting. This will seal the
+  // memtable and then wait for a sync point before writing the file. We need
+  // to do it this way because SwitchMemtable() needs to enter the
+  // write_thread
+  FlushOptions fopt;
+  fopt.wait = false;
+  dbfull()->Flush(fopt);
+
+  // Create a mix of slowdown/no_slowdown write threads
+  mutex.Lock();
+  // First leader
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  // Second leader. Will stall writes
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_no_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_no_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  mutex.Unlock();
+
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
+  dbfull()->TEST_WaitForFlushMemTable(nullptr);
+  // This would have triggered a write stall. Unblock the write group leader
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
+  // The leader is going to create missing newer links. When the leader finishes,
+  // the next leader is going to delay writes and fail writers with no_slowdown
+
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
 TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
   constexpr int kNumThreads = 5;
   std::unique_ptr<FaultInjectionTestEnv> mock_env(
diff --git a/db/write_thread.cc b/db/write_thread.cc
index 46859e814..01342d124 100644
--- a/db/write_thread.cc
+++ b/db/write_thread.cc
@@ -344,6 +344,9 @@ void WriteThread::BeginWriteStall() {
       prev->link_older = w->link_older;
       w->status = Status::Incomplete("Write stall");
       SetState(w, STATE_COMPLETED);
+      if (prev->link_older) {
+        prev->link_older->link_newer = prev;
+      }
       w = prev->link_older;
     } else {
       prev = w;
@@ -355,7 +358,11 @@ void WriteThread::EndWriteStall() {
   MutexLock lock(&stall_mu_);
 
+  // Unlink write_stall_dummy_ from the write queue. This will unblock
+  // pending write threads to enqueue themselves
   assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
+  assert(write_stall_dummy_.link_older != nullptr);
+  write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
   newest_writer_.exchange(write_stall_dummy_.link_older);
 
   // Wake up writers