Fix shutdown in db_stress with `-test_batches_snapshots=1` (#9313)

Summary:
The `SharedState` constructor had an early return in case of
`-test_batches_snapshots=1`. This early return caused `num_bg_threads_`
to never be incremented. Consequently, the driver thread could cleanup
objects like the `SharedState` while BG threads were still running and
accessing it, leading to crash.

The fix is to move the logic for counting threads (both FG and BG) to
the place they are launched. That way we can be sure the counts are
consistent, at least for now.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9313

Test Plan:
below command used to fail, now it passes.

```
$ ./db_stress --db=./test-db/ --expected_values_dir=./test-db-expected/ --max_key=100000 --ops_per_thread=1000 --sync_fault_injection=1 --clear_column_family_one_in=0 --destroy_db_initially=0 -reopen=0 -test_batches_snapshots=1
```

Reviewed By: jay-zhuang

Differential Revision: D33198670

Pulled By: ajkr

fbshipit-source-id: 126592dc1eb31998bc8f82ffbf5a0d4eb8dec317
main
Andrew Kryczka 3 years ago committed by Facebook GitHub Bot
parent cc1d4e3d33
commit 84228e21e8
  1. 5
      db_stress_tool/db_stress_driver.cc
  2. 16
      db_stress_tool/db_stress_shared_state.h

@ -68,22 +68,25 @@ bool RunStressTest(StressTest* stress) {
}
#endif
uint32_t n = shared.GetNumThreads();
uint32_t n = FLAGS_threads;
uint64_t now = clock->NowMicros();
fprintf(stdout, "%s Initializing worker threads\n",
clock->TimeToString(now / 1000000).c_str());
std::vector<ThreadState*> threads(n);
for (uint32_t i = 0; i < n; i++) {
shared.IncThreads();
threads[i] = new ThreadState(i, &shared);
db_stress_env->StartThread(ThreadBody, threads[i]);
}
ThreadState bg_thread(0, &shared);
if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
shared.IncBgThreads();
db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread);
}
ThreadState continuous_verification_thread(0, &shared);
if (FLAGS_continuous_verification_interval > 0) {
shared.IncBgThreads();
db_stress_env->StartThread(DbVerificationThread,
&continuous_verification_thread);
}

@ -68,7 +68,7 @@ class SharedState {
seed_(static_cast<uint32_t>(FLAGS_seed)),
max_key_(FLAGS_max_key),
log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
num_threads_(FLAGS_threads),
num_threads_(0),
num_initialized_(0),
num_populated_(0),
vote_reopen_(0),
@ -163,14 +163,6 @@ class SharedState {
ptr.reset(new port::Mutex);
}
}
if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
++num_bg_threads_;
fprintf(stdout, "Starting compaction_thread_pool_adjust_thread\n");
}
if (FLAGS_continuous_verification_interval > 0) {
++num_bg_threads_;
fprintf(stdout, "Starting continuous_verification_thread\n");
}
#ifndef NDEBUG
if (FLAGS_read_fault_one_in) {
SyncPoint::GetInstance()->SetCallBack("FaultInjectionIgnoreError",
@ -199,6 +191,8 @@ class SharedState {
uint32_t GetNumThreads() const { return num_threads_; }
void IncThreads() { num_threads_++; }
void IncInitialized() { num_initialized_++; }
void IncOperated() { num_populated_++; }
@ -317,6 +311,8 @@ class SharedState {
bool ShouldStopBgThread() { return should_stop_bg_thread_; }
void IncBgThreads() { ++num_bg_threads_; }
void IncBgThreadsFinished() { ++bg_thread_finished_; }
bool BgThreadsFinished() const {
@ -347,7 +343,7 @@ class SharedState {
const uint32_t seed_;
const int64_t max_key_;
const uint32_t log2_keys_per_lock_;
const int num_threads_;
int num_threads_;
long num_initialized_;
long num_populated_;
long vote_reopen_;

Loading…
Cancel
Save