Expand stress test coverage for user-defined timestamp (#10280)

Summary:
Before this PR, we call `now()` to get the wall time before performing point-lookup and range
scans when user-defined timestamp is enabled.

With this PR, we expand the coverage to:
- read with an older timestamp which is larger then the wall time when the process starts but potentially smaller than now()
- add coverage for `ReadOptions::iter_start_ts != nullptr`

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10280

Test Plan:
```bash
make check
```

Also,
```bash
TEST_TMPDIR=/dev/shm/rocksdb make crash_test_with_ts
```

So far, we have had four successful runs of the above

In addition,
```bash
TEST_TMPDIR=/dev/shm/rocksdb make crash_test
```
Succeeded twice showing no regression.

Reviewed By: ltamasi

Differential Revision: D37539805

Pulled By: riversand963

fbshipit-source-id: f2d9887ad95245945ce17a014d55bb93f00e1cb5
main
Yanqin Jin 2 years ago committed by Facebook GitHub Bot
parent 9eced1a344
commit caced09e79
  1. 4
      db_stress_tool/db_stress_common.cc
  2. 3
      db_stress_tool/db_stress_common.h
  3. 6
      db_stress_tool/db_stress_shared_state.h
  4. 97
      db_stress_tool/db_stress_test_base.cc
  5. 9
      db_stress_tool/db_stress_test_base.h
  6. 32
      db_stress_tool/no_batched_ops_stress.cc

@ -267,15 +267,13 @@ uint32_t GetValueBase(Slice s) {
return res;
}
std::string NowNanosStr() {
std::string GetNowNanos() {
uint64_t t = db_stress_env->NowNanos();
std::string ret;
PutFixed64(&ret, t);
return ret;
}
std::string GenerateTimestampForRead() { return NowNanosStr(); }
namespace {
class MyXXH64Checksum : public FileChecksumGenerator {

@ -613,8 +613,7 @@ extern void CheckAndSetOptionsForMultiOpsTxnStressTest();
extern void InitializeHotKeyGenerator(double alpha);
extern int64_t GetOneHotKeyID(double rand_seed, int64_t max_key);
extern std::string GenerateTimestampForRead();
extern std::string NowNanosStr();
extern std::string GetNowNanos();
std::shared_ptr<FileChecksumGenFactory> GetFileChecksumImpl(
const std::string& name);

@ -75,7 +75,8 @@ class SharedState {
should_stop_test_(false),
no_overwrite_ids_(GenerateNoOverwriteIds()),
expected_state_manager_(nullptr),
printing_verification_results_(false) {
printing_verification_results_(false),
start_timestamp_(Env::Default()->NowNanos()) {
Status status;
// TODO: We should introduce a way to explicitly disable verification
// during shutdown. When that is disabled and FLAGS_expected_values_dir
@ -303,6 +304,8 @@ class SharedState {
printing_verification_results_.store(false, std::memory_order_relaxed);
}
uint64_t GetStartTimestamp() const { return start_timestamp_; }
private:
static void IgnoreReadErrorCallback(void*) {
ignore_read_error = true;
@ -365,6 +368,7 @@ class SharedState {
// and storing it in the container may require copying depending on the impl.
std::vector<std::unique_ptr<port::Mutex[]>> key_locks_;
std::atomic<bool> printing_verification_results_;
const uint64_t start_timestamp_;
};
// Per-thread state for concurrent executions of the same benchmark.

@ -474,7 +474,7 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = NowNanosStr();
ts_str = GetNowNanos();
ts = ts_str;
s = db_->Put(write_opts, cfh, key, ts, v);
} else {
@ -856,10 +856,10 @@ void StressTest::OperateDb(ThreadState* thread) {
Slice read_ts;
Slice write_ts;
if (ShouldAcquireMutexOnKey() && FLAGS_user_timestamp_size > 0) {
read_ts_str = GenerateTimestampForRead();
read_ts_str = GetNowNanos();
read_ts = read_ts_str;
read_opts.timestamp = &read_ts;
write_ts_str = NowNanosStr();
write_ts_str = GetNowNanos();
write_ts = write_ts_str;
}
@ -1004,6 +1004,11 @@ Status StressTest::TestIterate(ThreadState* thread,
ReadOptions readoptionscopy = read_opts;
readoptionscopy.snapshot = snapshot;
std::string read_ts_str;
Slice read_ts_slice;
MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice,
readoptionscopy);
bool expect_total_order = false;
if (thread->rand.OneIn(16)) {
// When prefix extractor is used, it's useful to cover total order seek.
@ -1106,6 +1111,7 @@ Status StressTest::TestIterate(ThreadState* thread,
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions cmp_ro;
cmp_ro.timestamp = readoptionscopy.timestamp;
cmp_ro.iter_start_ts = readoptionscopy.iter_start_ts;
cmp_ro.snapshot = snapshot;
cmp_ro.total_order_seek = true;
ColumnFamilyHandle* cmp_cfh =
@ -1216,6 +1222,14 @@ void StressTest::VerifyIterator(ThreadState* thread,
return;
}
if (ro.iter_start_ts != nullptr) {
assert(FLAGS_user_timestamp_size > 0);
// We currently do not verify iterator when dumping history of internal
// keys.
*diverged = true;
return;
}
if (op == kLastOpSeekToFirst && ro.iterate_lower_bound != nullptr) {
// SeekToFirst() with lower bound is not well defined.
*diverged = true;
@ -1558,7 +1572,7 @@ Status StressTest::TestBackupRestore(
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GenerateTimestampForRead();
ts_str = GetNowNanos();
ts = ts_str;
read_opts.timestamp = &ts;
}
@ -1749,7 +1763,7 @@ Status StressTest::TestCheckpoint(ThreadState* thread,
Slice ts;
ReadOptions read_opts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GenerateTimestampForRead();
ts_str = GetNowNanos();
ts = ts_str;
read_opts.timestamp = &ts;
}
@ -1976,7 +1990,7 @@ void StressTest::TestAcquireSnapshot(ThreadState* thread,
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GenerateTimestampForRead();
ts_str = GetNowNanos();
ts = ts_str;
ropt.timestamp = &ts;
}
@ -2129,7 +2143,7 @@ uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GenerateTimestampForRead();
ts_str = GetNowNanos();
ts = ts_str;
ro.timestamp = &ts;
}
@ -2700,6 +2714,75 @@ void StressTest::Reopen(ThreadState* thread) {
}
}
void StressTest::MaybeUseOlderTimestampForPointLookup(ThreadState* thread,
std::string& ts_str,
Slice& ts_slice,
ReadOptions& read_opts) {
if (FLAGS_user_timestamp_size == 0) {
return;
}
assert(thread);
if (!thread->rand.OneInOpt(3)) {
return;
}
const SharedState* const shared = thread->shared;
assert(shared);
const uint64_t start_ts = shared->GetStartTimestamp();
uint64_t now = db_stress_env->NowNanos();
assert(now > start_ts);
uint64_t time_diff = now - start_ts;
uint64_t ts = start_ts + (thread->rand.Next64() % time_diff);
ts_str.clear();
PutFixed64(&ts_str, ts);
ts_slice = ts_str;
read_opts.timestamp = &ts_slice;
}
void StressTest::MaybeUseOlderTimestampForRangeScan(ThreadState* thread,
std::string& ts_str,
Slice& ts_slice,
ReadOptions& read_opts) {
if (FLAGS_user_timestamp_size == 0) {
return;
}
assert(thread);
if (!thread->rand.OneInOpt(3)) {
return;
}
const Slice* const saved_ts = read_opts.timestamp;
assert(saved_ts != nullptr);
const SharedState* const shared = thread->shared;
assert(shared);
const uint64_t start_ts = shared->GetStartTimestamp();
uint64_t now = db_stress_env->NowNanos();
assert(now > start_ts);
uint64_t time_diff = now - start_ts;
uint64_t ts = start_ts + (thread->rand.Next64() % time_diff);
ts_str.clear();
PutFixed64(&ts_str, ts);
ts_slice = ts_str;
read_opts.timestamp = &ts_slice;
if (!thread->rand.OneInOpt(3)) {
return;
}
ts_str.clear();
PutFixed64(&ts_str, start_ts);
ts_slice = ts_str;
read_opts.iter_start_ts = &ts_slice;
read_opts.timestamp = saved_ts;
}
void CheckAndSetOptionsForUserTimestamp(Options& options) {
assert(FLAGS_user_timestamp_size > 0);
const Comparator* const cmp = test::BytewiseComparatorWithU64TsWrapper();

@ -229,6 +229,15 @@ class StressTest {
TransactionDBOptions& /*txn_db_opts*/) {}
#endif
void MaybeUseOlderTimestampForPointLookup(ThreadState* thread,
std::string& ts_str,
Slice& ts_slice,
ReadOptions& read_opts);
void MaybeUseOlderTimestampForRangeScan(ThreadState* thread,
std::string& ts_str, Slice& ts_slice,
ReadOptions& read_opts);
std::shared_ptr<Cache> cache_;
std::shared_ptr<Cache> compressed_cache_;
std::shared_ptr<const FilterPolicy> filter_policy_;

@ -26,7 +26,7 @@ class NonBatchedOpsStressTest : public StressTest {
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GenerateTimestampForRead();
ts_str = GetNowNanos();
ts = ts_str;
options.timestamp = &ts;
}
@ -216,7 +216,7 @@ class NonBatchedOpsStressTest : public StressTest {
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GenerateTimestampForRead();
ts_str = GetNowNanos();
ts = ts_str;
read_opts.timestamp = &ts;
}
@ -335,7 +335,14 @@ class NonBatchedOpsStressTest : public StressTest {
fault_fs_guard->EnableErrorInjection();
SharedState::ignore_read_error = false;
}
Status s = db_->Get(read_opts, cfh, key, &from_db);
ReadOptions read_opts_copy = read_opts;
std::string read_ts_str;
Slice read_ts_slice;
MaybeUseOlderTimestampForPointLookup(thread, read_ts_str, read_ts_slice,
read_opts_copy);
Status s = db_->Get(read_opts_copy, cfh, key, &from_db);
if (fault_fs_guard) {
error_count = fault_fs_guard->GetAndResetErrorCount();
}
@ -391,6 +398,12 @@ class NonBatchedOpsStressTest : public StressTest {
if (do_consistency_check) {
readoptionscopy.snapshot = db_->GetSnapshot();
}
std::string read_ts_str;
Slice read_ts_slice;
MaybeUseOlderTimestampForPointLookup(thread, read_ts_str, read_ts_slice,
readoptionscopy);
readoptionscopy.rate_limiter_priority =
FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
@ -591,6 +604,11 @@ class NonBatchedOpsStressTest : public StressTest {
ro_copy.iterate_upper_bound = &ub_slice;
}
std::string read_ts_str;
Slice read_ts_slice;
MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice,
ro_copy);
Iterator* iter = db_->NewIterator(ro_copy, cfh);
unsigned long count = 0;
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
@ -598,7 +616,9 @@ class NonBatchedOpsStressTest : public StressTest {
++count;
}
if (ro_copy.iter_start_ts == nullptr) {
assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
}
Status s = iter->status();
if (iter->status().ok()) {
@ -630,12 +650,12 @@ class NonBatchedOpsStressTest : public StressTest {
lock.reset(
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
if (FLAGS_user_timestamp_size > 0) {
write_ts_str = NowNanosStr();
write_ts_str = GetNowNanos();
write_ts = write_ts_str;
}
}
if (write_ts.size() == 0 && FLAGS_user_timestamp_size) {
write_ts_str = NowNanosStr();
write_ts_str = GetNowNanos();
write_ts = write_ts_str;
}
@ -723,7 +743,7 @@ class NonBatchedOpsStressTest : public StressTest {
auto shared = thread->shared;
// OPERATION delete
std::string write_ts_str = NowNanosStr();
std::string write_ts_str = GetNowNanos();
Slice write_ts = write_ts_str;
std::string key_str = Key(rand_key);

Loading…
Cancel
Save