Stop operating on DB in a stress test background thread (#10373)

Summary:
Stress test background threads do not coordinate with test worker
threads for db reopen in the middle of a test run, thus accessing db
obj in a stress test bg thread can race with test workers. Remove the
TimestampedSnapshotThread.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10373

Test Plan:
```
./db_stress --acquire_snapshot_one_in=0 --adaptive_readahead=0 --allow_concurrent_memtable_write=1 \
--allow_data_in_errors=True --async_io=0 --avoid_flush_during_recovery=0 --avoid_unnecessary_blocking_io=1 \
--backup_max_size=104857600 --backup_one_in=100000 --batch_protection_bytes_per_key=8 \
--block_size=16384 --bloom_bits=7.580319535285394 --bottommost_compression_type=disable \
--bytes_per_sync=262144 --cache_index_and_filter_blocks=0 --cache_size=8388608 --cache_type=lru_cache \
--charge_compression_dictionary_building_buffer=1 --charge_file_metadata=0 --charge_filter_construction=1 \
--charge_table_reader=0 --checkpoint_one_in=0 --checksum_type=kxxHash64 --clear_column_family_one_in=0 \
--compact_files_one_in=1000000 --compact_range_one_in=0 --compaction_pri=1 --compaction_ttl=0 \
--compression_max_dict_buffer_bytes=0 --compression_max_dict_bytes=0 --compression_parallel_threads=1 \
--compression_type=xpress --compression_use_zstd_dict_trainer=1 --compression_zstd_max_train_bytes=0 \
--continuous_verification_interval=0 --create_timestamped_snapshot_one_in=20 --data_block_index_type=0 \
--db=/dev/shm/rocksdb/ --db_write_buffer_size=0 --delpercent=5 --delrangepercent=0 --destroy_db_initially=1 \
--detect_filter_construct_corruption=0 --disable_wal=0 --enable_compaction_filter=1 --enable_pipelined_write=0 \
--fail_if_options_file_error=1 --file_checksum_impl=xxh64 --flush_one_in=1000000 --format_version=2 \
--get_current_wal_file_one_in=0 --get_live_files_one_in=1000000 --get_property_one_in=1000000 \
--get_sorted_wal_files_one_in=0 --index_block_restart_interval=11 --index_type=0 --ingest_external_file_one_in=0 \
--iterpercent=0 --key_len_percent_dist=1,30,69 --level_compaction_dynamic_level_bytes=True \
--log2_keys_per_lock=10 --long_running_snapshots=0 --mark_for_compaction_one_file_in=10 \
--max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=25000000 \
--max_key_len=3 --max_manifest_file_size=1073741824 --max_write_batch_group_size_bytes=64 \
--max_write_buffer_number=3 --max_write_buffer_size_to_maintain=0 --memtable_prefix_bloom_size_ratio=0.5 \
--memtable_whole_key_filtering=1 --memtablerep=skip_list --mmap_read=0 --mock_direct_io=True \
--nooverwritepercent=1 --open_files=500000 --open_metadata_write_fault_one_in=0 \
--open_read_fault_one_in=0 --open_write_fault_one_in=0 --ops_per_thread=20000 \
--optimize_filters_for_memory=1 --paranoid_file_checks=1 --partition_filters=0 --partition_pinning=2 \
--pause_background_one_in=1000000 --periodic_compaction_seconds=0 --prefix_size=1 \
--prefixpercent=5 --prepopulate_block_cache=0 --progress_reports=0 --read_fault_one_in=1000 \
--readpercent=55 --recycle_log_file_num=0 --reopen=100 --ribbon_starting_level=8 \
--secondary_cache_fault_one_in=0 --secondary_cache_uri= --snapshot_hold_ops=100000 \
--sst_file_manager_bytes_per_sec=104857600 --sst_file_manager_bytes_per_truncate=0 \
--subcompactions=3 --sync=0 --sync_fault_injection=0 --target_file_size_base=2097152 \
--target_file_size_multiplier=2 --test_batches_snapshots=0 --top_level_index_pinning=1 \
--txn_write_policy=0 --unordered_write=0 --unpartitioned_pinning=0 \
--use_direct_io_for_flush_and_compaction=0 --use_direct_reads=1 --use_full_merge_v1=1 \
--use_merge=1 --use_multiget=0 --use_txn=1 --user_timestamp_size=0 --value_size_mult=32 \
--verify_checksum=1 --verify_checksum_one_in=1000000 --verify_db_one_in=100000 \
--verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=none \
--write_buffer_size=4194304 --write_dbid_to_manifest=0 --writepercent=35
```
make crash_test_with_txn
make crash_test_with_multiops_wc_txn

Reviewed By: jay-zhuang

Differential Revision: D37903189

Pulled By: riversand963

fbshipit-source-id: cd1728ad7ba4ce4cf47af23c4f65dda0956744f9
main
Yanqin Jin 2 years ago committed by Facebook GitHub Bot
parent e576f2ab19
commit b443d24f4d
  1. 36
      db_stress_tool/db_stress_common.cc
  2. 13
      db_stress_tool/db_stress_driver.cc
  3. 52
      db_stress_tool/db_stress_test_base.cc
  4. 5
      db_stress_tool/db_stress_test_base.h
  5. 7
      db_stress_tool/multi_ops_txns_stress.cc

@ -148,42 +148,6 @@ void DbVerificationThread(void* v) {
} }
} }
void TimestampedSnapshotsThread(void* v) {
assert(FLAGS_create_timestamped_snapshot_one_in > 0);
auto* thread = reinterpret_cast<ThreadState*>(v);
assert(thread);
SharedState* shared = thread->shared;
assert(shared);
StressTest* stress_test = shared->GetStressTest();
assert(stress_test);
while (true) {
{
MutexLock l(shared->GetMutex());
if (shared->ShouldStopBgThread()) {
shared->IncBgThreadsFinished();
if (shared->BgThreadsFinished()) {
shared->GetCondVar()->SignalAll();
}
return;
}
}
uint64_t now = db_stress_env->NowNanos();
std::pair<Status, std::shared_ptr<const Snapshot>> res =
stress_test->CreateTimestampedSnapshot(now);
if (res.first.ok()) {
assert(res.second);
assert(res.second->GetTimestamp() == now);
} else {
assert(!res.second);
}
constexpr uint64_t time_diff = static_cast<uint64_t>(1000) * 1000 * 1000;
stress_test->ReleaseOldTimestampedSnapshots(now - time_diff);
db_stress_env->SleepForMicroseconds(1000 * 1000);
}
}
void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) { void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) {
if (!FLAGS_verbose) { if (!FLAGS_verbose) {
return; return;

@ -84,10 +84,6 @@ bool RunStressTest(StressTest* stress) {
shared.IncBgThreads(); shared.IncBgThreads();
} }
if (FLAGS_create_timestamped_snapshot_one_in > 0) {
shared.IncBgThreads();
}
std::vector<ThreadState*> threads(n); std::vector<ThreadState*> threads(n);
for (uint32_t i = 0; i < n; i++) { for (uint32_t i = 0; i < n; i++) {
threads[i] = new ThreadState(i, &shared); threads[i] = new ThreadState(i, &shared);
@ -105,12 +101,6 @@ bool RunStressTest(StressTest* stress) {
&continuous_verification_thread); &continuous_verification_thread);
} }
ThreadState timestamped_snapshots_thread(0, &shared);
if (FLAGS_create_timestamped_snapshot_one_in > 0) {
db_stress_env->StartThread(TimestampedSnapshotsThread,
&timestamped_snapshots_thread);
}
// Each thread goes through the following states: // Each thread goes through the following states:
// initializing -> wait for others to init -> read/populate/depopulate // initializing -> wait for others to init -> read/populate/depopulate
// wait for others to operate -> verify -> done // wait for others to operate -> verify -> done
@ -179,8 +169,7 @@ bool RunStressTest(StressTest* stress) {
stress->PrintStatistics(); stress->PrintStatistics();
if (FLAGS_compaction_thread_pool_adjust_interval > 0 || if (FLAGS_compaction_thread_pool_adjust_interval > 0 ||
FLAGS_continuous_verification_interval > 0 || FLAGS_continuous_verification_interval > 0) {
FLAGS_create_timestamped_snapshot_one_in > 0) {
MutexLock l(shared.GetMutex()); MutexLock l(shared.GetMutex());
shared.SetShouldStopBgThread(); shared.SetShouldStopBgThread();
while (!shared.BgThreadsFinished()) { while (!shared.BgThreadsFinished()) {

@ -421,35 +421,6 @@ void StressTest::PrintStatistics() {
} }
} }
void StressTest::ReleaseOldTimestampedSnapshots(uint64_t ts) {
#ifndef ROCKSDB_LITE
if (!txn_db_) {
return;
}
assert(txn_db_);
txn_db_->ReleaseTimestampedSnapshotsOlderThan(ts);
#else
(void)ts;
fprintf(stderr, "timestamped snapshots not supported in LITE mode\n");
exit(1);
#endif // ROCKSDB_LITE
}
std::pair<Status, std::shared_ptr<const Snapshot>>
StressTest::CreateTimestampedSnapshot(uint64_t ts) {
#ifndef ROCKSDB_LITE
if (!txn_db_) {
return std::make_pair(Status::InvalidArgument(), nullptr);
}
assert(txn_db_);
return txn_db_->CreateTimestampedSnapshot(ts);
#else
(void)ts;
fprintf(stderr, "timestamped snapshots not supported in LITE mode\n");
exit(1);
#endif // ROCKSDB_LITE
}
// Currently PreloadDb has to be single-threaded. // Currently PreloadDb has to be single-threaded.
void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
SharedState* shared) { SharedState* shared) {
@ -594,6 +565,7 @@ Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) {
if (!FLAGS_use_txn) { if (!FLAGS_use_txn) {
return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set"); return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
} }
assert(txn_db_);
Status s = txn->Prepare(); Status s = txn->Prepare();
std::shared_ptr<const Snapshot> timestamped_snapshot; std::shared_ptr<const Snapshot> timestamped_snapshot;
if (s.ok()) { if (s.ok()) {
@ -602,10 +574,32 @@ Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) {
uint64_t ts = db_stress_env->NowNanos(); uint64_t ts = db_stress_env->NowNanos();
s = txn->CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, s = txn->CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts,
&timestamped_snapshot); &timestamped_snapshot);
std::pair<Status, std::shared_ptr<const Snapshot>> res;
if (thread->tid == 0) {
uint64_t now = db_stress_env->NowNanos();
res = txn_db_->CreateTimestampedSnapshot(now);
if (res.first.ok()) {
assert(res.second);
assert(res.second->GetTimestamp() == now);
if (timestamped_snapshot) {
assert(res.second->GetTimestamp() >
timestamped_snapshot->GetTimestamp());
}
} else {
assert(!res.second);
}
}
} else { } else {
s = txn->Commit(); s = txn->Commit();
} }
} }
if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 &&
thread->rand.OneInOpt(50000)) {
uint64_t now = db_stress_env->NowNanos();
constexpr uint64_t time_diff = static_cast<uint64_t>(1000) * 1000 * 1000;
txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff);
}
delete txn; delete txn;
return s; return s;
} }

@ -43,11 +43,6 @@ class StressTest {
void PrintStatistics(); void PrintStatistics();
void ReleaseOldTimestampedSnapshots(uint64_t ts);
std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot(
uint64_t ts);
protected: protected:
Status AssertSame(DB* db, ColumnFamilyHandle* cf, Status AssertSame(DB* db, ColumnFamilyHandle* cf,
ThreadState::SnapshotState& snap_state); ThreadState::SnapshotState& snap_state);

@ -1382,6 +1382,13 @@ Status MultiOpsTxnsStressTest::CommitAndCreateTimestampedSnapshotIfNeeded(
} else { } else {
s = txn.Commit(); s = txn.Commit();
} }
assert(txn_db_);
if (FLAGS_create_timestamped_snapshot_one_in > 0 &&
thread->rand.OneInOpt(50000)) {
uint64_t now = db_stress_env->NowNanos();
constexpr uint64_t time_diff = static_cast<uint64_t>(1000) * 1000 * 1000;
txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff);
}
return s; return s;
} }

Loading…
Cancel
Save