From f07c56928f998649cfd110854a7898dad532e1d3 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Sat, 29 Jan 2022 10:44:22 -0800 Subject: [PATCH] Set the number of threads up front in db_stress (#9466) Summary: With the code on main, `RunStressTest` increments the number of threads one by one as the threads are created and started. This results in a data race with `NonBatchedOpsStressTest::VerifyDb`, which reads this value without synchronization, and is also not correct in the sense that `VerifyDb` assumes that the number of threads already has its final value set (e.g. it's checking whether the current thread is the last one). The patch fixes this by setting the number of threads before creating/starting any threads. This also eliminates the need for locking the mutex during thread startup. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9466 Test Plan: Ran the blackbox crash test under TSAN for a while. Reviewed By: ajkr Differential Revision: D33858856 Pulled By: ltamasi fbshipit-source-id: 8a6515a83fd1808b8b8dca61978777c4404f04cc --- db_stress_tool/db_stress_driver.cc | 41 ++++++++++++++----------- db_stress_tool/db_stress_shared_state.h | 2 +- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc index 7ab9c5a9a..6bf46861f 100644 --- a/db_stress_tool/db_stress_driver.cc +++ b/db_stress_tool/db_stress_driver.cc @@ -73,26 +73,31 @@ bool RunStressTest(StressTest* stress) { fprintf(stdout, "%s Initializing worker threads\n", clock->TimeToString(now / 1000000).c_str()); - ThreadState bg_thread(0, &shared); - ThreadState continuous_verification_thread(0, &shared); + shared.SetThreads(n); + + if (FLAGS_compaction_thread_pool_adjust_interval > 0) { + shared.IncBgThreads(); + } + + if (FLAGS_compaction_thread_pool_adjust_interval > 0) { + shared.IncBgThreads(); + } + std::vector threads(n); - { - MutexLock l(shared.GetMutex()); + for (uint32_t i = 0; i < n; i++) { + threads[i] = new ThreadState(i, &shared); + db_stress_env->StartThread(ThreadBody, threads[i]); + } - for (uint32_t i = 0; i < n; i++) { - shared.IncThreads(); - threads[i] = new ThreadState(i, &shared); - db_stress_env->StartThread(ThreadBody, threads[i]); - } - if (FLAGS_compaction_thread_pool_adjust_interval > 0) { - shared.IncBgThreads(); - db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread); - } - if (FLAGS_continuous_verification_interval > 0) { - shared.IncBgThreads(); - db_stress_env->StartThread(DbVerificationThread, - &continuous_verification_thread); - } + ThreadState bg_thread(0, &shared); + if (FLAGS_compaction_thread_pool_adjust_interval > 0) { + db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread); + } + + ThreadState continuous_verification_thread(0, &shared); + if (FLAGS_continuous_verification_interval > 0) { + db_stress_env->StartThread(DbVerificationThread, + &continuous_verification_thread); } // Each thread goes through the following states: diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index d48aeb131..ddfb67261 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -191,7 +191,7 @@ class SharedState { uint32_t GetNumThreads() const { return num_threads_; } - void IncThreads() { num_threads_++; } + void SetThreads(int num_threads) { num_threads_ = num_threads; } void IncInitialized() { num_initialized_++; }