diff --git a/db_stress_tool/db_stress_common.cc b/db_stress_tool/db_stress_common.cc index 532a8dfaf..7ee722de3 100644 --- a/db_stress_tool/db_stress_common.cc +++ b/db_stress_tool/db_stress_common.cc @@ -148,7 +148,7 @@ void DbVerificationThread(void* v) { } } -void SnapshotGcThread(void* v) { +void TimestampedSnapshotsThread(void* v) { assert(FLAGS_create_timestamped_snapshot_one_in > 0); auto* thread = reinterpret_cast(v); assert(thread); @@ -169,6 +169,14 @@ void SnapshotGcThread(void* v) { } uint64_t now = db_stress_env->NowNanos(); + std::pair> res = + stress_test->CreateTimestampedSnapshot(now); + if (res.first.ok()) { + assert(res.second); + assert(res.second->GetTimestamp() == now); + } else { + assert(!res.second); + } constexpr uint64_t time_diff = static_cast(1000) * 1000 * 1000; stress_test->ReleaseOldTimestampedSnapshots(now - time_diff); diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 562b46a85..b822518eb 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -593,7 +593,7 @@ extern void PoolSizeChangeThread(void* v); extern void DbVerificationThread(void* v); -extern void SnapshotGcThread(void* v); +extern void TimestampedSnapshotsThread(void* v); extern void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz); diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc index 3c69f5408..06aa1c823 100644 --- a/db_stress_tool/db_stress_driver.cc +++ b/db_stress_tool/db_stress_driver.cc @@ -105,9 +105,10 @@ bool RunStressTest(StressTest* stress) { &continuous_verification_thread); } - ThreadState snapshots_gc_thread(0, &shared); + ThreadState timestamped_snapshots_thread(0, &shared); if (FLAGS_create_timestamped_snapshot_one_in > 0) { - db_stress_env->StartThread(SnapshotGcThread, &snapshots_gc_thread); + db_stress_env->StartThread(TimestampedSnapshotsThread, + ×tamped_snapshots_thread); } // Each thread goes through the following states: diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index d70c9553b..63fae30e6 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -432,6 +432,21 @@ void StressTest::ReleaseOldTimestampedSnapshots(uint64_t ts) { #endif // ROCKSDB_LITE } +std::pair> +StressTest::CreateTimestampedSnapshot(uint64_t ts) { +#ifndef ROCKSDB_LITE + if (!txn_db_) { + return std::make_pair(Status::InvalidArgument(), nullptr); + } + assert(txn_db_); + return txn_db_->CreateTimestampedSnapshot(ts); +#else + (void)ts; + fprintf(stderr, "timestamped snapshots not supported in LITE mode\n"); + exit(1); +#endif // ROCKSDB_LITE +} + // Currently PreloadDb has to be single-threaded. void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, SharedState* shared) { diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index 31ce19123..94cefaeb7 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -45,6 +45,9 @@ class StressTest { void ReleaseOldTimestampedSnapshots(uint64_t ts); + std::pair> CreateTimestampedSnapshot( + uint64_t ts); + protected: Status AssertSame(DB* db, ColumnFamilyHandle* cf, ThreadState::SnapshotState& snap_state); diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc index 31c88e6d5..95f883ec0 100644 --- a/db_stress_tool/multi_ops_txns_stress.cc +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -670,7 +670,7 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, return s; } - s = txn->Commit(); + s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn); auto& key_gen = key_gen_for_a_.at(thread->tid); if (s.ok()) { @@ -876,7 +876,7 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, return s; } - s = txn->Commit(); + s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn); if (s.ok()) { delete txn; @@ -968,7 +968,8 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, return s; } - s = txn->Commit(); + s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn); + if (s.ok()) { delete txn; } @@ -1011,8 +1012,8 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread, RollbackTxn(txn).PermitUncheckedError(); }); - txn->SetSnapshot(); - ropts.snapshot = txn->GetSnapshot(); + std::shared_ptr snapshot; + SetupSnapshot(thread, ropts, *txn, snapshot); if (FLAGS_delay_snapshot_read_one_in > 0 && thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) { @@ -1062,8 +1063,8 @@ Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread, RollbackTxn(txn).PermitUncheckedError(); }); - txn->SetSnapshot(); - ropts.snapshot = txn->GetSnapshot(); + std::shared_ptr snapshot; + SetupSnapshot(thread, ropts, *txn, snapshot); if (FLAGS_delay_snapshot_read_one_in > 0 && thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) { @@ -1369,6 +1370,35 @@ Status MultiOpsTxnsStressTest::WriteToCommitTimeWriteBatch(Transaction& txn) { return ctwb->Put(Slice(key_buf, sizeof(key_buf)), Slice(val_buf, sizeof(val_buf))); } + +Status MultiOpsTxnsStressTest::CommitAndCreateTimestampedSnapshotIfNeeded( + ThreadState* thread, Transaction& txn) { + Status s; + if (FLAGS_create_timestamped_snapshot_one_in > 0 && + thread->rand.OneInOpt(FLAGS_create_timestamped_snapshot_one_in)) { + uint64_t ts = db_stress_env->NowNanos(); + std::shared_ptr snapshot; + s = txn.CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, &snapshot); + } else { + s = txn.Commit(); + } + return s; +} + +void MultiOpsTxnsStressTest::SetupSnapshot( + ThreadState* thread, ReadOptions& read_opts, Transaction& txn, + std::shared_ptr& snapshot) { + if (thread->rand.OneInOpt(2)) { + snapshot = txn_db_->GetLatestTimestampedSnapshot(); + } + + if (snapshot) { + read_opts.snapshot = snapshot.get(); + } else { + txn.SetSnapshot(); + read_opts.snapshot = txn.GetSnapshot(); + } +} #endif // !ROCKSDB_LITE std::string MultiOpsTxnsStressTest::KeySpaces::EncodeTo() const { @@ -1734,6 +1764,15 @@ void CheckAndSetOptionsForMultiOpsTxnStressTest() { "-key_spaces_path\n"); exit(1); } + if (FLAGS_create_timestamped_snapshot_one_in > 0) { + if (FLAGS_txn_write_policy != + static_cast(TxnDBWritePolicy::WRITE_COMMITTED)) { + fprintf(stderr, + "Timestamped snapshot is not yet supported by " + "write-prepared/write-unprepared transactions\n"); + exit(1); + } + } #else fprintf(stderr, "-test_multi_ops_txns not supported in ROCKSDB_LITE mode\n"); exit(1); diff --git a/db_stress_tool/multi_ops_txns_stress.h b/db_stress_tool/multi_ops_txns_stress.h index d711f2357..1c0f618f0 100644 --- a/db_stress_tool/multi_ops_txns_stress.h +++ b/db_stress_tool/multi_ops_txns_stress.h @@ -356,6 +356,13 @@ class MultiOpsTxnsStressTest : public StressTest { // actual value of the metadata. Method WriteToCommitTimeWriteBatch() // emulates this scenario. Status WriteToCommitTimeWriteBatch(Transaction& txn); + + Status CommitAndCreateTimestampedSnapshotIfNeeded(ThreadState* thread, + Transaction& txn); + + void SetupSnapshot(ThreadState* thread, ReadOptions& read_opts, + Transaction& txn, + std::shared_ptr& snapshot); #endif //! ROCKSDB_LITE std::vector> key_gen_for_a_; diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 4449a8132..c968949c8 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -403,6 +403,7 @@ multiops_txn_default_params = { "rollback_one_in": 4, # Re-enable once we have a compaction for MultiOpsTxnStressTest "enable_compaction_filter": 0, + "create_timestamped_snapshot_one_in": 50, } multiops_wc_txn_params = { @@ -425,6 +426,7 @@ multiops_wp_txn_params = { "use_only_the_last_commit_time_batch_for_recovery": 1, "recycle_log_file_num": 0, "clear_wp_commit_cache_one_in": 10, + "create_timestamped_snapshot_one_in": 0, } def finalize_and_sanitize(src_params):