Set the number of threads up front in db_stress (#9466)

Summary:
With the code on main, `RunStressTest` increments the number of threads
one by one as the threads are created and started. This results in a
data race with `NonBatchedOpsStressTest::VerifyDb`, which reads this
value without synchronization, and is also not correct in the sense
that `VerifyDb` assumes that the number of threads already has its final
value set (e.g. it's checking whether the current thread is the last
one). The patch fixes this by setting the number of threads before
creating/starting any threads. This also eliminates the need for locking
the mutex during thread startup.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9466

Test Plan: Ran the blackbox crash test under TSAN for a while.

Reviewed By: ajkr

Differential Revision: D33858856

Pulled By: ltamasi

fbshipit-source-id: 8a6515a83fd1808b8b8dca61978777c4404f04cc
main
Levi Tamasi 3 years ago committed by Facebook GitHub Bot
parent 42cca28ebb
commit f07c56928f
  1. 41
      db_stress_tool/db_stress_driver.cc
  2. 2
      db_stress_tool/db_stress_shared_state.h

@ -73,26 +73,31 @@ bool RunStressTest(StressTest* stress) {
fprintf(stdout, "%s Initializing worker threads\n", fprintf(stdout, "%s Initializing worker threads\n",
clock->TimeToString(now / 1000000).c_str()); clock->TimeToString(now / 1000000).c_str());
ThreadState bg_thread(0, &shared); shared.SetThreads(n);
ThreadState continuous_verification_thread(0, &shared);
if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
shared.IncBgThreads();
}
if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
shared.IncBgThreads();
}
std::vector<ThreadState*> threads(n); std::vector<ThreadState*> threads(n);
{ for (uint32_t i = 0; i < n; i++) {
MutexLock l(shared.GetMutex()); threads[i] = new ThreadState(i, &shared);
db_stress_env->StartThread(ThreadBody, threads[i]);
}
for (uint32_t i = 0; i < n; i++) { ThreadState bg_thread(0, &shared);
shared.IncThreads(); if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
threads[i] = new ThreadState(i, &shared); db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread);
db_stress_env->StartThread(ThreadBody, threads[i]); }
}
if (FLAGS_compaction_thread_pool_adjust_interval > 0) { ThreadState continuous_verification_thread(0, &shared);
shared.IncBgThreads(); if (FLAGS_continuous_verification_interval > 0) {
db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread); db_stress_env->StartThread(DbVerificationThread,
} &continuous_verification_thread);
if (FLAGS_continuous_verification_interval > 0) {
shared.IncBgThreads();
db_stress_env->StartThread(DbVerificationThread,
&continuous_verification_thread);
}
} }
// Each thread goes through the following states: // Each thread goes through the following states:

@ -191,7 +191,7 @@ class SharedState {
uint32_t GetNumThreads() const { return num_threads_; } uint32_t GetNumThreads() const { return num_threads_; }
void IncThreads() { num_threads_++; } void SetThreads(int num_threads) { num_threads_ = num_threads; }
void IncInitialized() { num_initialized_++; } void IncInitialized() { num_initialized_++; }

Loading…
Cancel
Save