From caced09e79c378124de0b330d9d389cf9950cc45 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 5 Jul 2022 13:30:15 -0700 Subject: [PATCH] Expand stress test coverage for user-defined timestamp (#10280) Summary: Before this PR, we call `now()` to get the wall time before performing point-lookup and range scans when user-defined timestamp is enabled. With this PR, we expand the coverage to: - read with an older timestamp which is larger than the wall time when the process starts but potentially smaller than now() - add coverage for `ReadOptions::iter_start_ts != nullptr` Pull Request resolved: https://github.com/facebook/rocksdb/pull/10280 Test Plan: ```bash make check ``` Also, ```bash TEST_TMPDIR=/dev/shm/rocksdb make crash_test_with_ts ``` So far, we have had four successful runs of the above. In addition, ```bash TEST_TMPDIR=/dev/shm/rocksdb make crash_test ``` Succeeded twice showing no regression. Reviewed By: ltamasi Differential Revision: D37539805 Pulled By: riversand963 fbshipit-source-id: f2d9887ad95245945ce17a014d55bb93f00e1cb5 --- db_stress_tool/db_stress_common.cc | 4 +- db_stress_tool/db_stress_common.h | 3 +- db_stress_tool/db_stress_shared_state.h | 6 +- db_stress_tool/db_stress_test_base.cc | 97 +++++++++++++++++++++++-- db_stress_tool/db_stress_test_base.h | 9 +++ db_stress_tool/no_batched_ops_stress.cc | 34 +++++++-- 6 files changed, 133 insertions(+), 20 deletions(-) diff --git a/db_stress_tool/db_stress_common.cc b/db_stress_tool/db_stress_common.cc index c20bcd719..532a8dfaf 100644 --- a/db_stress_tool/db_stress_common.cc +++ b/db_stress_tool/db_stress_common.cc @@ -267,15 +267,13 @@ uint32_t GetValueBase(Slice s) { return res; } -std::string NowNanosStr() { +std::string GetNowNanos() { uint64_t t = db_stress_env->NowNanos(); std::string ret; PutFixed64(&ret, t); return ret; } -std::string GenerateTimestampForRead() { return NowNanosStr(); } - namespace { class MyXXH64Checksum : public FileChecksumGenerator { diff --git 
a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 55e43ee3d..562b46a85 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -613,8 +613,7 @@ extern void CheckAndSetOptionsForMultiOpsTxnStressTest(); extern void InitializeHotKeyGenerator(double alpha); extern int64_t GetOneHotKeyID(double rand_seed, int64_t max_key); -extern std::string GenerateTimestampForRead(); -extern std::string NowNanosStr(); +extern std::string GetNowNanos(); std::shared_ptr GetFileChecksumImpl( const std::string& name); diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index de928fd82..4e61d2252 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -75,7 +75,8 @@ class SharedState { should_stop_test_(false), no_overwrite_ids_(GenerateNoOverwriteIds()), expected_state_manager_(nullptr), - printing_verification_results_(false) { + printing_verification_results_(false), + start_timestamp_(Env::Default()->NowNanos()) { Status status; // TODO: We should introduce a way to explicitly disable verification // during shutdown. When that is disabled and FLAGS_expected_values_dir @@ -303,6 +304,8 @@ class SharedState { printing_verification_results_.store(false, std::memory_order_relaxed); } + uint64_t GetStartTimestamp() const { return start_timestamp_; } + private: static void IgnoreReadErrorCallback(void*) { ignore_read_error = true; @@ -365,6 +368,7 @@ class SharedState { // and storing it in the container may require copying depending on the impl. std::vector> key_locks_; std::atomic printing_verification_results_; + const uint64_t start_timestamp_; }; // Per-thread state for concurrent executions of the same benchmark. 
diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index cd41a79dd..d70c9553b 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -474,7 +474,7 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, std::string ts_str; Slice ts; if (FLAGS_user_timestamp_size > 0) { - ts_str = NowNanosStr(); + ts_str = GetNowNanos(); ts = ts_str; s = db_->Put(write_opts, cfh, key, ts, v); } else { @@ -856,10 +856,10 @@ void StressTest::OperateDb(ThreadState* thread) { Slice read_ts; Slice write_ts; if (ShouldAcquireMutexOnKey() && FLAGS_user_timestamp_size > 0) { - read_ts_str = GenerateTimestampForRead(); + read_ts_str = GetNowNanos(); read_ts = read_ts_str; read_opts.timestamp = &read_ts; - write_ts_str = NowNanosStr(); + write_ts_str = GetNowNanos(); write_ts = write_ts_str; } @@ -1004,6 +1004,11 @@ Status StressTest::TestIterate(ThreadState* thread, ReadOptions readoptionscopy = read_opts; readoptionscopy.snapshot = snapshot; + std::string read_ts_str; + Slice read_ts_slice; + MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice, + readoptionscopy); + bool expect_total_order = false; if (thread->rand.OneIn(16)) { // When prefix extractor is used, it's useful to cover total order seek. @@ -1106,6 +1111,7 @@ Status StressTest::TestIterate(ThreadState* thread, // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions cmp_ro; cmp_ro.timestamp = readoptionscopy.timestamp; + cmp_ro.iter_start_ts = readoptionscopy.iter_start_ts; cmp_ro.snapshot = snapshot; cmp_ro.total_order_seek = true; ColumnFamilyHandle* cmp_cfh = @@ -1216,6 +1222,14 @@ void StressTest::VerifyIterator(ThreadState* thread, return; } + if (ro.iter_start_ts != nullptr) { + assert(FLAGS_user_timestamp_size > 0); + // We currently do not verify iterator when dumping history of internal + // keys. 
+ *diverged = true; + return; + } + if (op == kLastOpSeekToFirst && ro.iterate_lower_bound != nullptr) { // SeekToFirst() with lower bound is not well defined. *diverged = true; @@ -1558,7 +1572,7 @@ Status StressTest::TestBackupRestore( std::string ts_str; Slice ts; if (FLAGS_user_timestamp_size > 0) { - ts_str = GenerateTimestampForRead(); + ts_str = GetNowNanos(); ts = ts_str; read_opts.timestamp = &ts; } @@ -1749,7 +1763,7 @@ Status StressTest::TestCheckpoint(ThreadState* thread, Slice ts; ReadOptions read_opts; if (FLAGS_user_timestamp_size > 0) { - ts_str = GenerateTimestampForRead(); + ts_str = GetNowNanos(); ts = ts_str; read_opts.timestamp = &ts; } @@ -1976,7 +1990,7 @@ void StressTest::TestAcquireSnapshot(ThreadState* thread, std::string ts_str; Slice ts; if (FLAGS_user_timestamp_size > 0) { - ts_str = GenerateTimestampForRead(); + ts_str = GetNowNanos(); ts = ts_str; ropt.timestamp = &ts; } @@ -2129,7 +2143,7 @@ uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot, std::string ts_str; Slice ts; if (FLAGS_user_timestamp_size > 0) { - ts_str = GenerateTimestampForRead(); + ts_str = GetNowNanos(); ts = ts_str; ro.timestamp = &ts; } @@ -2700,6 +2714,75 @@ void StressTest::Reopen(ThreadState* thread) { } } +void StressTest::MaybeUseOlderTimestampForPointLookup(ThreadState* thread, + std::string& ts_str, + Slice& ts_slice, + ReadOptions& read_opts) { + if (FLAGS_user_timestamp_size == 0) { + return; + } + + assert(thread); + if (!thread->rand.OneInOpt(3)) { + return; + } + + const SharedState* const shared = thread->shared; + assert(shared); + const uint64_t start_ts = shared->GetStartTimestamp(); + + uint64_t now = db_stress_env->NowNanos(); + + assert(now > start_ts); + uint64_t time_diff = now - start_ts; + uint64_t ts = start_ts + (thread->rand.Next64() % time_diff); + ts_str.clear(); + PutFixed64(&ts_str, ts); + ts_slice = ts_str; + read_opts.timestamp = &ts_slice; +} + +void 
StressTest::MaybeUseOlderTimestampForRangeScan(ThreadState* thread, + std::string& ts_str, + Slice& ts_slice, + ReadOptions& read_opts) { + if (FLAGS_user_timestamp_size == 0) { + return; + } + + assert(thread); + if (!thread->rand.OneInOpt(3)) { + return; + } + + const Slice* const saved_ts = read_opts.timestamp; + assert(saved_ts != nullptr); + + const SharedState* const shared = thread->shared; + assert(shared); + const uint64_t start_ts = shared->GetStartTimestamp(); + + uint64_t now = db_stress_env->NowNanos(); + + assert(now > start_ts); + uint64_t time_diff = now - start_ts; + uint64_t ts = start_ts + (thread->rand.Next64() % time_diff); + ts_str.clear(); + PutFixed64(&ts_str, ts); + ts_slice = ts_str; + read_opts.timestamp = &ts_slice; + + if (!thread->rand.OneInOpt(3)) { + return; + } + + ts_str.clear(); + PutFixed64(&ts_str, start_ts); + ts_slice = ts_str; + read_opts.iter_start_ts = &ts_slice; + read_opts.timestamp = saved_ts; +} + void CheckAndSetOptionsForUserTimestamp(Options& options) { assert(FLAGS_user_timestamp_size > 0); const Comparator* const cmp = test::BytewiseComparatorWithU64TsWrapper(); diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index d4b2c5e21..31ce19123 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -229,6 +229,15 @@ class StressTest { TransactionDBOptions& /*txn_db_opts*/) {} #endif + void MaybeUseOlderTimestampForPointLookup(ThreadState* thread, + std::string& ts_str, + Slice& ts_slice, + ReadOptions& read_opts); + + void MaybeUseOlderTimestampForRangeScan(ThreadState* thread, + std::string& ts_str, Slice& ts_slice, + ReadOptions& read_opts); + std::shared_ptr cache_; std::shared_ptr compressed_cache_; std::shared_ptr filter_policy_; diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 16e238501..377749f81 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ 
b/db_stress_tool/no_batched_ops_stress.cc @@ -26,7 +26,7 @@ class NonBatchedOpsStressTest : public StressTest { std::string ts_str; Slice ts; if (FLAGS_user_timestamp_size > 0) { - ts_str = GenerateTimestampForRead(); + ts_str = GetNowNanos(); ts = ts_str; options.timestamp = &ts; } @@ -216,7 +216,7 @@ class NonBatchedOpsStressTest : public StressTest { std::string ts_str; Slice ts; if (FLAGS_user_timestamp_size > 0) { - ts_str = GenerateTimestampForRead(); + ts_str = GetNowNanos(); ts = ts_str; read_opts.timestamp = &ts; } @@ -335,7 +335,14 @@ class NonBatchedOpsStressTest : public StressTest { fault_fs_guard->EnableErrorInjection(); SharedState::ignore_read_error = false; } - Status s = db_->Get(read_opts, cfh, key, &from_db); + + ReadOptions read_opts_copy = read_opts; + std::string read_ts_str; + Slice read_ts_slice; + MaybeUseOlderTimestampForPointLookup(thread, read_ts_str, read_ts_slice, + read_opts_copy); + + Status s = db_->Get(read_opts_copy, cfh, key, &from_db); if (fault_fs_guard) { error_count = fault_fs_guard->GetAndResetErrorCount(); } @@ -391,6 +398,12 @@ class NonBatchedOpsStressTest : public StressTest { if (do_consistency_check) { readoptionscopy.snapshot = db_->GetSnapshot(); } + + std::string read_ts_str; + Slice read_ts_slice; + MaybeUseOlderTimestampForPointLookup(thread, read_ts_str, read_ts_slice, + readoptionscopy); + readoptionscopy.rate_limiter_priority = FLAGS_rate_limit_user_ops ? 
Env::IO_USER : Env::IO_TOTAL; @@ -591,6 +604,11 @@ class NonBatchedOpsStressTest : public StressTest { ro_copy.iterate_upper_bound = &ub_slice; } + std::string read_ts_str; + Slice read_ts_slice; + MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice, + ro_copy); + Iterator* iter = db_->NewIterator(ro_copy, cfh); unsigned long count = 0; for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); @@ -598,7 +616,9 @@ class NonBatchedOpsStressTest : public StressTest { ++count; } - assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound)); + if (ro_copy.iter_start_ts == nullptr) { + assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound)); + } Status s = iter->status(); if (iter->status().ok()) { @@ -630,12 +650,12 @@ class NonBatchedOpsStressTest : public StressTest { lock.reset( new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key))); if (FLAGS_user_timestamp_size > 0) { - write_ts_str = NowNanosStr(); + write_ts_str = GetNowNanos(); write_ts = write_ts_str; } } if (write_ts.size() == 0 && FLAGS_user_timestamp_size) { - write_ts_str = NowNanosStr(); + write_ts_str = GetNowNanos(); write_ts = write_ts_str; } @@ -723,7 +743,7 @@ class NonBatchedOpsStressTest : public StressTest { auto shared = thread->shared; // OPERATION delete - std::string write_ts_str = NowNanosStr(); + std::string write_ts_str = GetNowNanos(); Slice write_ts = write_ts_str; std::string key_str = Key(rand_key);