move db_stress locking to `StressTest::Test*()` functions (#10678)

Summary:
One problem of the previous strategy was `NonBatchedOpsStressTest::TestIngestExternalFile()` could release the lock for `rand_keys[0]` in `rand_column_families[0]`, and then subsequent operations in the same loop iteration (e.g., `TestPut()`) would run without locking. This PR changes the strategy so each `Test*()` function is responsible for acquiring and releasing its own locks.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10678

Reviewed By: hx235

Differential Revision: D39516401

Pulled By: ajkr

fbshipit-source-id: bf67f12ebbd293ba8c24fdf8754ff28737bcd758
main
Andrew Kryczka 2 years ago committed by Facebook GitHub Bot
parent 7dad485278
commit 6ce782beaf
  1. 13
      db_stress_tool/batched_ops_stress.cc
  2. 13
      db_stress_tool/cf_consistency_stress.cc
  3. 36
      db_stress_tool/db_stress_test_base.cc
  4. 16
      db_stress_tool/db_stress_test_base.h
  5. 12
      db_stress_tool/multi_ops_txns_stress.cc
  6. 12
      db_stress_tool/multi_ops_txns_stress.h
  7. 51
      db_stress_tool/no_batched_ops_stress.cc

@ -24,8 +24,8 @@ class BatchedOpsStressTest : public StressTest {
Status TestPut(ThreadState* thread, WriteOptions& write_opts, Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& /* read_opts */, const ReadOptions& /* read_opts */,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, char (&value)[100], const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& /* lock */) override { char (&value)[100]) override {
uint32_t value_base = uint32_t value_base =
thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL; thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
size_t sz = GenerateValue(value_base, value, sizeof(value)); size_t sz = GenerateValue(value_base, value, sizeof(value));
@ -66,8 +66,7 @@ class BatchedOpsStressTest : public StressTest {
// in DB atomically i.e in a single batch. Also refer MultiGet. // in DB atomically i.e in a single batch. Also refer MultiGet.
Status TestDelete(ThreadState* thread, WriteOptions& writeoptions, Status TestDelete(ThreadState* thread, WriteOptions& writeoptions,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) override {
std::unique_ptr<MutexLock>& /* lock */) override {
std::string keys[10] = {"9", "7", "5", "3", "1", "8", "6", "4", "2", "0"}; std::string keys[10] = {"9", "7", "5", "3", "1", "8", "6", "4", "2", "0"};
WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
@ -95,8 +94,7 @@ class BatchedOpsStressTest : public StressTest {
Status TestDeleteRange(ThreadState* /* thread */, Status TestDeleteRange(ThreadState* /* thread */,
WriteOptions& /* write_opts */, WriteOptions& /* write_opts */,
const std::vector<int>& /* rand_column_families */, const std::vector<int>& /* rand_column_families */,
const std::vector<int64_t>& /* rand_keys */, const std::vector<int64_t>& /* rand_keys */) override {
std::unique_ptr<MutexLock>& /* lock */) override {
assert(false); assert(false);
return Status::NotSupported( return Status::NotSupported(
"BatchedOpsStressTest does not support " "BatchedOpsStressTest does not support "
@ -106,8 +104,7 @@ class BatchedOpsStressTest : public StressTest {
void TestIngestExternalFile( void TestIngestExternalFile(
ThreadState* /* thread */, ThreadState* /* thread */,
const std::vector<int>& /* rand_column_families */, const std::vector<int>& /* rand_column_families */,
const std::vector<int64_t>& /* rand_keys */, const std::vector<int64_t>& /* rand_keys */) override {
std::unique_ptr<MutexLock>& /* lock */) override {
assert(false); assert(false);
fprintf(stderr, fprintf(stderr,
"BatchedOpsStressTest does not support " "BatchedOpsStressTest does not support "

@ -23,8 +23,8 @@ class CfConsistencyStressTest : public StressTest {
Status TestPut(ThreadState* thread, WriteOptions& write_opts, Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& /* read_opts */, const ReadOptions& /* read_opts */,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, char (&value)[100], const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& /* lock */) override { char (&value)[100]) override {
std::string key_str = Key(rand_keys[0]); std::string key_str = Key(rand_keys[0]);
Slice key = key_str; Slice key = key_str;
uint64_t value_base = batch_id_.fetch_add(1); uint64_t value_base = batch_id_.fetch_add(1);
@ -54,8 +54,7 @@ class CfConsistencyStressTest : public StressTest {
Status TestDelete(ThreadState* thread, WriteOptions& write_opts, Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) override {
std::unique_ptr<MutexLock>& /* lock */) override {
std::string key_str = Key(rand_keys[0]); std::string key_str = Key(rand_keys[0]);
Slice key = key_str; Slice key = key_str;
WriteBatch batch; WriteBatch batch;
@ -75,8 +74,7 @@ class CfConsistencyStressTest : public StressTest {
Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts, Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) override {
std::unique_ptr<MutexLock>& /* lock */) override {
int64_t rand_key = rand_keys[0]; int64_t rand_key = rand_keys[0];
auto shared = thread->shared; auto shared = thread->shared;
int64_t max_key = shared->GetMaxKey(); int64_t max_key = shared->GetMaxKey();
@ -107,8 +105,7 @@ class CfConsistencyStressTest : public StressTest {
void TestIngestExternalFile( void TestIngestExternalFile(
ThreadState* /* thread */, ThreadState* /* thread */,
const std::vector<int>& /* rand_column_families */, const std::vector<int>& /* rand_column_families */,
const std::vector<int64_t>& /* rand_keys */, const std::vector<int64_t>& /* rand_keys */) override {
std::unique_ptr<MutexLock>& /* lock */) override {
assert(false); assert(false);
fprintf(stderr, fprintf(stderr,
"CfConsistencyStressTest does not support TestIngestExternalFile " "CfConsistencyStressTest does not support TestIngestExternalFile "

@ -748,11 +748,6 @@ void StressTest::OperateDb(ThreadState* thread) {
int64_t rand_key = GenerateOneKey(thread, i); int64_t rand_key = GenerateOneKey(thread, i);
std::string keystr = Key(rand_key); std::string keystr = Key(rand_key);
Slice key = keystr; Slice key = keystr;
std::unique_ptr<MutexLock> lock;
if (ShouldAcquireMutexOnKey()) {
lock.reset(new MutexLock(
shared->GetMutexForKey(rand_column_family, rand_key)));
}
if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) { if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) {
TestCompactRange(thread, rand_key, key, column_family); TestCompactRange(thread, rand_key, key, column_family);
@ -825,7 +820,7 @@ void StressTest::OperateDb(ThreadState* thread) {
std::vector<int64_t> rand_keys = GenerateKeys(rand_key); std::vector<int64_t> rand_keys = GenerateKeys(rand_key);
if (thread->rand.OneInOpt(FLAGS_ingest_external_file_one_in)) { if (thread->rand.OneInOpt(FLAGS_ingest_external_file_one_in)) {
TestIngestExternalFile(thread, rand_column_families, rand_keys, lock); TestIngestExternalFile(thread, rand_column_families, rand_keys);
} }
if (thread->rand.OneInOpt(FLAGS_backup_one_in)) { if (thread->rand.OneInOpt(FLAGS_backup_one_in)) {
@ -881,7 +876,7 @@ void StressTest::OperateDb(ThreadState* thread) {
std::string write_ts_str; std::string write_ts_str;
Slice read_ts; Slice read_ts;
Slice write_ts; Slice write_ts;
if (ShouldAcquireMutexOnKey() && FLAGS_user_timestamp_size > 0) { if (FLAGS_user_timestamp_size > 0) {
read_ts_str = GetNowNanos(); read_ts_str = GetNowNanos();
read_ts = read_ts_str; read_ts = read_ts_str;
read_opts.timestamp = &read_ts; read_opts.timestamp = &read_ts;
@ -923,16 +918,15 @@ void StressTest::OperateDb(ThreadState* thread) {
assert(prefix_bound <= prob_op); assert(prefix_bound <= prob_op);
// OPERATION write // OPERATION write
TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys, TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
value, lock); value);
} else if (prob_op < del_bound) { } else if (prob_op < del_bound) {
assert(write_bound <= prob_op); assert(write_bound <= prob_op);
// OPERATION delete // OPERATION delete
TestDelete(thread, write_opts, rand_column_families, rand_keys, lock); TestDelete(thread, write_opts, rand_column_families, rand_keys);
} else if (prob_op < delrange_bound) { } else if (prob_op < delrange_bound) {
assert(del_bound <= prob_op); assert(del_bound <= prob_op);
// OPERATION delete range // OPERATION delete range
TestDeleteRange(thread, write_opts, rand_column_families, rand_keys, TestDeleteRange(thread, write_opts, rand_column_families, rand_keys);
lock);
} else if (prob_op < iterate_bound) { } else if (prob_op < iterate_bound) {
assert(delrange_bound <= prob_op); assert(delrange_bound <= prob_op);
// OPERATION iterate // OPERATION iterate
@ -940,7 +934,7 @@ void StressTest::OperateDb(ThreadState* thread) {
thread->rand.OneInOpt( thread->rand.OneInOpt(
FLAGS_verify_iterator_with_expected_state_one_in)) { FLAGS_verify_iterator_with_expected_state_one_in)) {
TestIterateAgainstExpected(thread, read_opts, rand_column_families, TestIterateAgainstExpected(thread, read_opts, rand_column_families,
rand_keys, lock); rand_keys);
} else { } else {
int num_seeks = static_cast<int>( int num_seeks = static_cast<int>(
std::min(static_cast<uint64_t>(thread->rand.Uniform(4)), std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
@ -1423,6 +1417,15 @@ void StressTest::TestCompactFiles(ThreadState* /* thread */,
Status StressTest::TestBackupRestore( Status StressTest::TestBackupRestore(
ThreadState* thread, const std::vector<int>& rand_column_families, ThreadState* thread, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) { const std::vector<int64_t>& rand_keys) {
std::vector<std::unique_ptr<MutexLock>> locks;
if (ShouldAcquireMutexOnKey()) {
for (int rand_column_family : rand_column_families) {
// `rand_keys[0]` on each chosen CF will be verified.
locks.emplace_back(new MutexLock(
thread->shared->GetMutexForKey(rand_column_family, rand_keys[0])));
}
}
const std::string backup_dir = const std::string backup_dir =
FLAGS_db + "/.backup" + std::to_string(thread->tid); FLAGS_db + "/.backup" + std::to_string(thread->tid);
const std::string restore_dir = const std::string restore_dir =
@ -1724,6 +1727,15 @@ Status StressTest::TestApproximateSize(
Status StressTest::TestCheckpoint(ThreadState* thread, Status StressTest::TestCheckpoint(ThreadState* thread,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) { const std::vector<int64_t>& rand_keys) {
std::vector<std::unique_ptr<MutexLock>> locks;
if (ShouldAcquireMutexOnKey()) {
for (int rand_column_family : rand_column_families) {
// `rand_keys[0]` on each chosen CF will be verified.
locks.emplace_back(new MutexLock(
thread->shared->GetMutexForKey(rand_column_family, rand_keys[0])));
}
}
std::string checkpoint_dir = std::string checkpoint_dir =
FLAGS_db + "/.checkpoint" + std::to_string(thread->tid); FLAGS_db + "/.checkpoint" + std::to_string(thread->tid);
Options tmp_opts(options_); Options tmp_opts(options_);

@ -94,23 +94,20 @@ class StressTest {
virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts, virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& read_opts, const ReadOptions& read_opts,
const std::vector<int>& cf_ids, const std::vector<int>& cf_ids,
const std::vector<int64_t>& keys, char (&value)[100], const std::vector<int64_t>& keys,
std::unique_ptr<MutexLock>& lock) = 0; char (&value)[100]) = 0;
virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts, virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) = 0;
std::unique_ptr<MutexLock>& lock) = 0;
virtual Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts, virtual Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) = 0;
std::unique_ptr<MutexLock>& lock) = 0;
virtual void TestIngestExternalFile( virtual void TestIngestExternalFile(
ThreadState* thread, const std::vector<int>& rand_column_families, ThreadState* thread, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) = 0;
std::unique_ptr<MutexLock>& lock) = 0;
// Issue compact range, starting with start_key, whose integer value // Issue compact range, starting with start_key, whose integer value
// is rand_key. // is rand_key.
@ -155,8 +152,7 @@ class StressTest {
virtual Status TestIterateAgainstExpected( virtual Status TestIterateAgainstExpected(
ThreadState* /* thread */, const ReadOptions& /* read_opts */, ThreadState* /* thread */, const ReadOptions& /* read_opts */,
const std::vector<int>& /* rand_column_families */, const std::vector<int>& /* rand_column_families */,
const std::vector<int64_t>& /* rand_keys */, const std::vector<int64_t>& /* rand_keys */) {
std::unique_ptr<MutexLock>& /* lock */) {
return Status::NotSupported(); return Status::NotSupported();
} }

@ -416,8 +416,7 @@ Status MultiOpsTxnsStressTest::TestPut(ThreadState* /*thread*/,
const ReadOptions& /*read_opts*/, const ReadOptions& /*read_opts*/,
const std::vector<int>& /*cf_ids*/, const std::vector<int>& /*cf_ids*/,
const std::vector<int64_t>& /*keys*/, const std::vector<int64_t>& /*keys*/,
char (&value)[100], char (&value)[100]) {
std::unique_ptr<MutexLock>& /*lock*/) {
(void)value; (void)value;
return Status::NotSupported(); return Status::NotSupported();
} }
@ -426,8 +425,7 @@ Status MultiOpsTxnsStressTest::TestPut(ThreadState* /*thread*/,
Status MultiOpsTxnsStressTest::TestDelete( Status MultiOpsTxnsStressTest::TestDelete(
ThreadState* /*thread*/, WriteOptions& /*write_opts*/, ThreadState* /*thread*/, WriteOptions& /*write_opts*/,
const std::vector<int>& /*rand_column_families*/, const std::vector<int>& /*rand_column_families*/,
const std::vector<int64_t>& /*rand_keys*/, const std::vector<int64_t>& /*rand_keys*/) {
std::unique_ptr<MutexLock>& /*lock*/) {
return Status::NotSupported(); return Status::NotSupported();
} }
@ -435,15 +433,13 @@ Status MultiOpsTxnsStressTest::TestDelete(
Status MultiOpsTxnsStressTest::TestDeleteRange( Status MultiOpsTxnsStressTest::TestDeleteRange(
ThreadState* /*thread*/, WriteOptions& /*write_opts*/, ThreadState* /*thread*/, WriteOptions& /*write_opts*/,
const std::vector<int>& /*rand_column_families*/, const std::vector<int>& /*rand_column_families*/,
const std::vector<int64_t>& /*rand_keys*/, const std::vector<int64_t>& /*rand_keys*/) {
std::unique_ptr<MutexLock>& /*lock*/) {
return Status::NotSupported(); return Status::NotSupported();
} }
void MultiOpsTxnsStressTest::TestIngestExternalFile( void MultiOpsTxnsStressTest::TestIngestExternalFile(
ThreadState* thread, const std::vector<int>& rand_column_families, ThreadState* thread, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& /*rand_keys*/, const std::vector<int64_t>& /*rand_keys*/) {
std::unique_ptr<MutexLock>& /*lock*/) {
// TODO (yanqin) // TODO (yanqin)
(void)thread; (void)thread;
(void)rand_column_families; (void)rand_column_families;

@ -222,23 +222,19 @@ class MultiOpsTxnsStressTest : public StressTest {
Status TestPut(ThreadState* thread, WriteOptions& write_opts, Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& read_opts, const std::vector<int>& cf_ids, const ReadOptions& read_opts, const std::vector<int>& cf_ids,
const std::vector<int64_t>& keys, char (&value)[100], const std::vector<int64_t>& keys, char (&value)[100]) override;
std::unique_ptr<MutexLock>& lock) override;
Status TestDelete(ThreadState* thread, WriteOptions& write_opts, Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) override;
std::unique_ptr<MutexLock>& lock) override;
Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts, Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) override;
std::unique_ptr<MutexLock>& lock) override;
void TestIngestExternalFile(ThreadState* thread, void TestIngestExternalFile(ThreadState* thread,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) override;
std::unique_ptr<MutexLock>& lock) override;
void TestCompactRange(ThreadState* thread, int64_t rand_key, void TestCompactRange(ThreadState* thread, int64_t rand_key,
const Slice& start_key, const Slice& start_key,

@ -336,6 +336,9 @@ class NonBatchedOpsStressTest : public StressTest {
SharedState::ignore_read_error = false; SharedState::ignore_read_error = false;
} }
std::unique_ptr<MutexLock> lock(new MutexLock(
thread->shared->GetMutexForKey(rand_column_families[0], rand_keys[0])));
ReadOptions read_opts_copy = read_opts; ReadOptions read_opts_copy = read_opts;
std::string read_ts_str; std::string read_ts_str;
Slice read_ts_slice; Slice read_ts_slice;
@ -656,14 +659,16 @@ class NonBatchedOpsStressTest : public StressTest {
Status TestPut(ThreadState* thread, WriteOptions& write_opts, Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& read_opts, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, char (&value)[100], const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& lock) override { char (&value)[100]) override {
auto shared = thread->shared; auto shared = thread->shared;
int64_t max_key = shared->GetMaxKey(); int64_t max_key = shared->GetMaxKey();
int64_t rand_key = rand_keys[0]; int64_t rand_key = rand_keys[0];
int rand_column_family = rand_column_families[0]; int rand_column_family = rand_column_families[0];
std::string write_ts_str; std::string write_ts_str;
Slice write_ts; Slice write_ts;
std::unique_ptr<MutexLock> lock(
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
while (!shared->AllowsOverwrite(rand_key) && while (!shared->AllowsOverwrite(rand_key) &&
(FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) { (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
lock.reset(); lock.reset();
@ -758,12 +763,14 @@ class NonBatchedOpsStressTest : public StressTest {
Status TestDelete(ThreadState* thread, WriteOptions& write_opts, Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) override {
std::unique_ptr<MutexLock>& /* lock */) override {
int64_t rand_key = rand_keys[0]; int64_t rand_key = rand_keys[0];
int rand_column_family = rand_column_families[0]; int rand_column_family = rand_column_families[0];
auto shared = thread->shared; auto shared = thread->shared;
std::unique_ptr<MutexLock> lock(
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
// OPERATION delete // OPERATION delete
std::string write_ts_str = GetNowNanos(); std::string write_ts_str = GetNowNanos();
Slice write_ts = write_ts_str; Slice write_ts = write_ts_str;
@ -855,8 +862,7 @@ class NonBatchedOpsStressTest : public StressTest {
Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts, Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) override {
std::unique_ptr<MutexLock>& lock) override {
// OPERATION delete range // OPERATION delete range
std::vector<std::unique_ptr<MutexLock>> range_locks; std::vector<std::unique_ptr<MutexLock>> range_locks;
// delete range does not respect disallowed overwrites. the keys for // delete range does not respect disallowed overwrites. the keys for
@ -868,16 +874,12 @@ class NonBatchedOpsStressTest : public StressTest {
auto shared = thread->shared; auto shared = thread->shared;
int64_t max_key = shared->GetMaxKey(); int64_t max_key = shared->GetMaxKey();
if (rand_key > max_key - FLAGS_range_deletion_width) { if (rand_key > max_key - FLAGS_range_deletion_width) {
lock.reset();
rand_key = rand_key =
thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1); thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
range_locks.emplace_back(
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
} else {
range_locks.emplace_back(std::move(lock));
} }
for (int j = 1; j < FLAGS_range_deletion_width; ++j) { for (int j = 0; j < FLAGS_range_deletion_width; ++j) {
if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) { if (j == 0 ||
((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
range_locks.emplace_back(new MutexLock( range_locks.emplace_back(new MutexLock(
shared->GetMutexForKey(rand_column_family, rand_key + j))); shared->GetMutexForKey(rand_column_family, rand_key + j)));
} }
@ -918,8 +920,7 @@ class NonBatchedOpsStressTest : public StressTest {
void TestIngestExternalFile( void TestIngestExternalFile(
ThreadState* /* thread */, ThreadState* /* thread */,
const std::vector<int>& /* rand_column_families */, const std::vector<int>& /* rand_column_families */,
const std::vector<int64_t>& /* rand_keys */, const std::vector<int64_t>& /* rand_keys */) override {
std::unique_ptr<MutexLock>& /* lock */) override {
assert(false); assert(false);
fprintf(stderr, fprintf(stderr,
"RocksDB lite does not support " "RocksDB lite does not support "
@ -929,8 +930,7 @@ class NonBatchedOpsStressTest : public StressTest {
#else #else
void TestIngestExternalFile(ThreadState* thread, void TestIngestExternalFile(ThreadState* thread,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) override {
std::unique_ptr<MutexLock>& lock) override {
const std::string sst_filename = const std::string sst_filename =
FLAGS_db + "/." + std::to_string(thread->tid) + ".sst"; FLAGS_db + "/." + std::to_string(thread->tid) + ".sst";
Status s; Status s;
@ -960,9 +960,8 @@ class NonBatchedOpsStressTest : public StressTest {
s.ok() && key < shared->GetMaxKey() && s.ok() && key < shared->GetMaxKey() &&
static_cast<int32_t>(keys.size()) < FLAGS_ingest_external_file_width; static_cast<int32_t>(keys.size()) < FLAGS_ingest_external_file_width;
++key) { ++key) {
if (key == key_base) { if (key == key_base ||
range_locks.emplace_back(std::move(lock)); (key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
} else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
range_locks.emplace_back( range_locks.emplace_back(
new MutexLock(shared->GetMutexForKey(column_family, key))); new MutexLock(shared->GetMutexForKey(column_family, key)));
} }
@ -1006,8 +1005,7 @@ class NonBatchedOpsStressTest : public StressTest {
Status TestIterateAgainstExpected( Status TestIterateAgainstExpected(
ThreadState* thread, const ReadOptions& read_opts, ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys) override {
std::unique_ptr<MutexLock>& lock) override {
// Lock the whole range over which we might iterate to ensure it doesn't // Lock the whole range over which we might iterate to ensure it doesn't
// change under us. // change under us.
std::vector<std::unique_ptr<MutexLock>> range_locks; std::vector<std::unique_ptr<MutexLock>> range_locks;
@ -1016,15 +1014,10 @@ class NonBatchedOpsStressTest : public StressTest {
auto shared = thread->shared; auto shared = thread->shared;
int64_t max_key = shared->GetMaxKey(); int64_t max_key = shared->GetMaxKey();
if (static_cast<uint64_t>(lb) > max_key - FLAGS_num_iterations) { if (static_cast<uint64_t>(lb) > max_key - FLAGS_num_iterations) {
lock.reset();
lb = thread->rand.Next() % (max_key - FLAGS_num_iterations + 1); lb = thread->rand.Next() % (max_key - FLAGS_num_iterations + 1);
range_locks.emplace_back(
new MutexLock(shared->GetMutexForKey(rand_column_family, lb)));
} else {
range_locks.emplace_back(std::move(lock));
} }
for (int j = 1; j < static_cast<int>(FLAGS_num_iterations); ++j) { for (int j = 0; j < static_cast<int>(FLAGS_num_iterations); ++j) {
if (((lb + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) { if (j == 0 || ((lb + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
range_locks.emplace_back( range_locks.emplace_back(
new MutexLock(shared->GetMutexForKey(rand_column_family, lb + j))); new MutexLock(shared->GetMutexForKey(rand_column_family, lb + j)));
} }

Loading…
Cancel
Save