Add coverage for timestamped snapshot to MultiOpsTxnsStressTest (#10325)

Summary:
As title.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10325

Test Plan:
```bash
TEST_TMPDIR=/dev/shm/rocksdb/ make crash_test_with_multiops_wc_txn
TEST_TMPDIR=/dev/shm/rocksdb/ make crash_test_with_txn
```

Reviewed By: akankshamahajan15

Differential Revision: D37688742

Pulled By: riversand963

fbshipit-source-id: e198ace921898af63f99e869568c1a7bbf69f1a4
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent 96206531bc
commit 2f13f5f7d0
  1. 10
      db_stress_tool/db_stress_common.cc
  2. 2
      db_stress_tool/db_stress_common.h
  3. 5
      db_stress_tool/db_stress_driver.cc
  4. 15
      db_stress_tool/db_stress_test_base.cc
  5. 3
      db_stress_tool/db_stress_test_base.h
  6. 53
      db_stress_tool/multi_ops_txns_stress.cc
  7. 7
      db_stress_tool/multi_ops_txns_stress.h
  8. 2
      tools/db_crashtest.py

@ -148,7 +148,7 @@ void DbVerificationThread(void* v) {
} }
} }
void SnapshotGcThread(void* v) { void TimestampedSnapshotsThread(void* v) {
assert(FLAGS_create_timestamped_snapshot_one_in > 0); assert(FLAGS_create_timestamped_snapshot_one_in > 0);
auto* thread = reinterpret_cast<ThreadState*>(v); auto* thread = reinterpret_cast<ThreadState*>(v);
assert(thread); assert(thread);
@ -169,6 +169,14 @@ void SnapshotGcThread(void* v) {
} }
uint64_t now = db_stress_env->NowNanos(); uint64_t now = db_stress_env->NowNanos();
std::pair<Status, std::shared_ptr<const Snapshot>> res =
stress_test->CreateTimestampedSnapshot(now);
if (res.first.ok()) {
assert(res.second);
assert(res.second->GetTimestamp() == now);
} else {
assert(!res.second);
}
constexpr uint64_t time_diff = static_cast<uint64_t>(1000) * 1000 * 1000; constexpr uint64_t time_diff = static_cast<uint64_t>(1000) * 1000 * 1000;
stress_test->ReleaseOldTimestampedSnapshots(now - time_diff); stress_test->ReleaseOldTimestampedSnapshots(now - time_diff);

@ -593,7 +593,7 @@ extern void PoolSizeChangeThread(void* v);
extern void DbVerificationThread(void* v); extern void DbVerificationThread(void* v);
extern void SnapshotGcThread(void* v); extern void TimestampedSnapshotsThread(void* v);
extern void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz); extern void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz);

@ -105,9 +105,10 @@ bool RunStressTest(StressTest* stress) {
&continuous_verification_thread); &continuous_verification_thread);
} }
ThreadState snapshots_gc_thread(0, &shared); ThreadState timestamped_snapshots_thread(0, &shared);
if (FLAGS_create_timestamped_snapshot_one_in > 0) { if (FLAGS_create_timestamped_snapshot_one_in > 0) {
db_stress_env->StartThread(SnapshotGcThread, &snapshots_gc_thread); db_stress_env->StartThread(TimestampedSnapshotsThread,
&timestamped_snapshots_thread);
} }
// Each thread goes through the following states: // Each thread goes through the following states:

@ -432,6 +432,21 @@ void StressTest::ReleaseOldTimestampedSnapshots(uint64_t ts) {
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }
std::pair<Status, std::shared_ptr<const Snapshot>>
StressTest::CreateTimestampedSnapshot(uint64_t ts) {
#ifndef ROCKSDB_LITE
if (!txn_db_) {
return std::make_pair(Status::InvalidArgument(), nullptr);
}
assert(txn_db_);
return txn_db_->CreateTimestampedSnapshot(ts);
#else
(void)ts;
fprintf(stderr, "timestamped snapshots not supported in LITE mode\n");
exit(1);
#endif // ROCKSDB_LITE
}
// Currently PreloadDb has to be single-threaded. // Currently PreloadDb has to be single-threaded.
void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
SharedState* shared) { SharedState* shared) {

@ -45,6 +45,9 @@ class StressTest {
void ReleaseOldTimestampedSnapshots(uint64_t ts); void ReleaseOldTimestampedSnapshots(uint64_t ts);
std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot(
uint64_t ts);
protected: protected:
Status AssertSame(DB* db, ColumnFamilyHandle* cf, Status AssertSame(DB* db, ColumnFamilyHandle* cf,
ThreadState::SnapshotState& snap_state); ThreadState::SnapshotState& snap_state);

@ -670,7 +670,7 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
return s; return s;
} }
s = txn->Commit(); s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn);
auto& key_gen = key_gen_for_a_.at(thread->tid); auto& key_gen = key_gen_for_a_.at(thread->tid);
if (s.ok()) { if (s.ok()) {
@ -876,7 +876,7 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
return s; return s;
} }
s = txn->Commit(); s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn);
if (s.ok()) { if (s.ok()) {
delete txn; delete txn;
@ -968,7 +968,8 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
return s; return s;
} }
s = txn->Commit(); s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn);
if (s.ok()) { if (s.ok()) {
delete txn; delete txn;
} }
@ -1011,8 +1012,8 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread,
RollbackTxn(txn).PermitUncheckedError(); RollbackTxn(txn).PermitUncheckedError();
}); });
txn->SetSnapshot(); std::shared_ptr<const Snapshot> snapshot;
ropts.snapshot = txn->GetSnapshot(); SetupSnapshot(thread, ropts, *txn, snapshot);
if (FLAGS_delay_snapshot_read_one_in > 0 && if (FLAGS_delay_snapshot_read_one_in > 0 &&
thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) { thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) {
@ -1062,8 +1063,8 @@ Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread,
RollbackTxn(txn).PermitUncheckedError(); RollbackTxn(txn).PermitUncheckedError();
}); });
txn->SetSnapshot(); std::shared_ptr<const Snapshot> snapshot;
ropts.snapshot = txn->GetSnapshot(); SetupSnapshot(thread, ropts, *txn, snapshot);
if (FLAGS_delay_snapshot_read_one_in > 0 && if (FLAGS_delay_snapshot_read_one_in > 0 &&
thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) { thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) {
@ -1369,6 +1370,35 @@ Status MultiOpsTxnsStressTest::WriteToCommitTimeWriteBatch(Transaction& txn) {
return ctwb->Put(Slice(key_buf, sizeof(key_buf)), return ctwb->Put(Slice(key_buf, sizeof(key_buf)),
Slice(val_buf, sizeof(val_buf))); Slice(val_buf, sizeof(val_buf)));
} }
Status MultiOpsTxnsStressTest::CommitAndCreateTimestampedSnapshotIfNeeded(
ThreadState* thread, Transaction& txn) {
Status s;
if (FLAGS_create_timestamped_snapshot_one_in > 0 &&
thread->rand.OneInOpt(FLAGS_create_timestamped_snapshot_one_in)) {
uint64_t ts = db_stress_env->NowNanos();
std::shared_ptr<const Snapshot> snapshot;
s = txn.CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, &snapshot);
} else {
s = txn.Commit();
}
return s;
}
void MultiOpsTxnsStressTest::SetupSnapshot(
ThreadState* thread, ReadOptions& read_opts, Transaction& txn,
std::shared_ptr<const Snapshot>& snapshot) {
if (thread->rand.OneInOpt(2)) {
snapshot = txn_db_->GetLatestTimestampedSnapshot();
}
if (snapshot) {
read_opts.snapshot = snapshot.get();
} else {
txn.SetSnapshot();
read_opts.snapshot = txn.GetSnapshot();
}
}
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
std::string MultiOpsTxnsStressTest::KeySpaces::EncodeTo() const { std::string MultiOpsTxnsStressTest::KeySpaces::EncodeTo() const {
@ -1734,6 +1764,15 @@ void CheckAndSetOptionsForMultiOpsTxnStressTest() {
"-key_spaces_path\n"); "-key_spaces_path\n");
exit(1); exit(1);
} }
if (FLAGS_create_timestamped_snapshot_one_in > 0) {
if (FLAGS_txn_write_policy !=
static_cast<uint64_t>(TxnDBWritePolicy::WRITE_COMMITTED)) {
fprintf(stderr,
"Timestamped snapshot is not yet supported by "
"write-prepared/write-unprepared transactions\n");
exit(1);
}
}
#else #else
fprintf(stderr, "-test_multi_ops_txns not supported in ROCKSDB_LITE mode\n"); fprintf(stderr, "-test_multi_ops_txns not supported in ROCKSDB_LITE mode\n");
exit(1); exit(1);

@ -356,6 +356,13 @@ class MultiOpsTxnsStressTest : public StressTest {
// actual value of the metadata. Method WriteToCommitTimeWriteBatch() // actual value of the metadata. Method WriteToCommitTimeWriteBatch()
// emulates this scenario. // emulates this scenario.
Status WriteToCommitTimeWriteBatch(Transaction& txn); Status WriteToCommitTimeWriteBatch(Transaction& txn);
Status CommitAndCreateTimestampedSnapshotIfNeeded(ThreadState* thread,
Transaction& txn);
void SetupSnapshot(ThreadState* thread, ReadOptions& read_opts,
Transaction& txn,
std::shared_ptr<const Snapshot>& snapshot);
#endif //! ROCKSDB_LITE #endif //! ROCKSDB_LITE
std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_a_; std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_a_;

@ -403,6 +403,7 @@ multiops_txn_default_params = {
"rollback_one_in": 4, "rollback_one_in": 4,
# Re-enable once we have a compaction for MultiOpsTxnStressTest # Re-enable once we have a compaction for MultiOpsTxnStressTest
"enable_compaction_filter": 0, "enable_compaction_filter": 0,
"create_timestamped_snapshot_one_in": 50,
} }
multiops_wc_txn_params = { multiops_wc_txn_params = {
@ -425,6 +426,7 @@ multiops_wp_txn_params = {
"use_only_the_last_commit_time_batch_for_recovery": 1, "use_only_the_last_commit_time_batch_for_recovery": 1,
"recycle_log_file_num": 0, "recycle_log_file_num": 0,
"clear_wp_commit_cache_one_in": 10, "clear_wp_commit_cache_one_in": 10,
"create_timestamped_snapshot_one_in": 0,
} }
def finalize_and_sanitize(src_params): def finalize_and_sanitize(src_params):

Loading…
Cancel
Save