diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index 4e61d2252..c53a0742b 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -215,6 +215,32 @@ class SharedState { } } + // Returns a collection of mutex locks covering the key range [start, end) in + // `cf`. + std::vector> GetLocksForKeyRange(int cf, + int64_t start, + int64_t end) { + std::vector> range_locks; + + if (start >= end) { + return range_locks; + } + + const int64_t start_idx = start >> log2_keys_per_lock_; + + int64_t end_idx = end >> log2_keys_per_lock_; + if ((end & ((1 << log2_keys_per_lock_) - 1)) == 0) { + --end_idx; + } + + for (int64_t idx = start_idx; idx <= end_idx; ++idx) { + range_locks.emplace_back( + std::make_unique(&key_locks_[cf][idx])); + } + + return range_locks; + } + Status SaveAtAndAfter(DB* db) { return expected_state_manager_->SaveAtAndAfter(db); } diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 533d0b95c..031134a0c 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -1127,33 +1127,41 @@ class NonBatchedOpsStressTest : public StressTest { ThreadState* thread, const ReadOptions& read_opts, const std::vector& rand_column_families, const std::vector& rand_keys) override { - // Lock the whole range over which we might iterate to ensure it doesn't - // change under us. - std::vector> range_locks; - int64_t lb = rand_keys[0]; - int rand_column_family = rand_column_families[0]; + assert(thread); + assert(!rand_column_families.empty()); + assert(!rand_keys.empty()); + auto shared = thread->shared; + assert(shared); + int64_t max_key = shared->GetMaxKey(); - if (static_cast(lb) > max_key - FLAGS_num_iterations) { - lb = thread->rand.Next() % (max_key - FLAGS_num_iterations + 1); - } - for (int j = 0; j < static_cast(FLAGS_num_iterations); ++j) { - if (j == 0 || ((lb + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) { - range_locks.emplace_back( - new MutexLock(shared->GetMutexForKey(rand_column_family, lb + j))); - } + + const int64_t num_iter = static_cast(FLAGS_num_iterations); + + int64_t lb = rand_keys[0]; + if (lb > max_key - num_iter) { + lb = thread->rand.Next() % (max_key - num_iter + 1); } - int64_t ub = lb + FLAGS_num_iterations; - // Locks acquired for [lb, ub) - ReadOptions readoptscopy(read_opts); + + const int64_t ub = lb + num_iter; + + // Lock the whole range over which we might iterate to ensure it doesn't + // change under us. + const int rand_column_family = rand_column_families[0]; + std::vector> range_locks = + shared->GetLocksForKeyRange(rand_column_family, lb, ub); + + ReadOptions ro(read_opts); + ro.total_order_seek = true; + std::string read_ts_str; Slice read_ts; if (FLAGS_user_timestamp_size > 0) { read_ts_str = GetNowNanos(); read_ts = read_ts_str; - readoptscopy.timestamp = &read_ts; + ro.timestamp = &read_ts; } - readoptscopy.total_order_seek = true; + std::string max_key_str; Slice max_key_slice; if (!FLAGS_destroy_db_initially) { @@ -1162,11 +1170,41 @@ class NonBatchedOpsStressTest : public StressTest { // to restrict iterator from reading keys written in batched_op_stress // that do not have expected state updated and may not be parseable by // GetIntVal(). - readoptscopy.iterate_upper_bound = &max_key_slice; + ro.iterate_upper_bound = &max_key_slice; } - auto cfh = column_families_[rand_column_family]; + + ColumnFamilyHandle* const cfh = column_families_[rand_column_family]; + assert(cfh); + + std::unique_ptr iter(db_->NewIterator(ro, cfh)); + std::string op_logs; - std::unique_ptr iter(db_->NewIterator(readoptscopy, cfh)); + + auto check_columns = [&]() { + assert(iter); + assert(iter->Valid()); + + const WideColumns expected_columns = GenerateExpectedWideColumns( + GetValueBase(iter->value()), iter->value()); + if (iter->columns() != expected_columns) { + shared->SetVerificationFailure(); + + fprintf(stderr, + "Verification failed for key %s: " + "Value and columns inconsistent: %s\n", + Slice(iter->key()).ToString(/* hex */ true).c_str(), + DebugString(iter->value(), iter->columns(), expected_columns) + .c_str()); + fprintf(stderr, "Column family: %s, op_logs: %s\n", + cfh->GetName().c_str(), op_logs.c_str()); + + thread->stats.AddErrors(1); + + return false; + } + + return true; + }; auto check_no_key_in_range = [&](int64_t start, int64_t end) { for (auto j = std::max(start, lb); j < std::min(end, ub); ++j) { @@ -1198,10 +1236,13 @@ class NonBatchedOpsStressTest : public StressTest { // The random sequence Next and Prev test below tends to be very short // ranged. int64_t last_key = lb - 1; + std::string key_str = Key(lb); - iter->Seek(Slice(key_str)); + iter->Seek(key_str); + op_logs += "S " + Slice(key_str).ToString(true) + " "; - uint64_t curr; + + uint64_t curr = 0; while (true) { if (!iter->Valid()) { if (!iter->status().ok()) { @@ -1213,29 +1254,38 @@ class NonBatchedOpsStressTest : public StressTest { thread->stats.AddErrors(1); return iter->status(); } - if (!check_no_key_in_range(last_key + 1, static_cast(ub))) { - // error reported in check_no_key_in_range() + if (!check_no_key_in_range(last_key + 1, ub)) { return Status::OK(); } break; } + + if (!check_columns()) { + return Status::OK(); + } + // iter is valid, the range (last_key, current key) was skipped GetIntVal(iter->key().ToString(), &curr); if (!check_no_key_in_range(last_key + 1, static_cast(curr))) { return Status::OK(); } + last_key = static_cast(curr); if (last_key >= ub - 1) { break; } + iter->Next(); + op_logs += "N"; } // backward scan key_str = Key(ub - 1); - iter->SeekForPrev(Slice(key_str)); + iter->SeekForPrev(key_str); + op_logs += " SFP " + Slice(key_str).ToString(true) + " "; + last_key = ub; while (true) { if (!iter->Valid()) { @@ -1253,16 +1303,24 @@ class NonBatchedOpsStressTest : public StressTest { } break; } + + if (!check_columns()) { + return Status::OK(); + } + // the range (current key, last key) was skipped GetIntVal(iter->key().ToString(), &curr); if (!check_no_key_in_range(static_cast(curr + 1), last_key)) { return Status::OK(); } + last_key = static_cast(curr); if (last_key <= lb) { break; } + iter->Prev(); + op_logs += "P"; } @@ -1271,11 +1329,14 @@ class NonBatchedOpsStressTest : public StressTest { // change. It is safe to refresh since the testing key range is locked. iter->Refresh(); } + // start from middle of [lb, ub) otherwise it is easy to iterate out of // locked range - int64_t mid = lb + static_cast(FLAGS_num_iterations / 2); + const int64_t mid = lb + num_iter / 2; + key_str = Key(mid); - Slice key = key_str; + const Slice key(key_str); + if (thread->rand.OneIn(2)) { iter->Seek(key); op_logs += " S " + key.ToString(true) + " "; @@ -1295,16 +1356,20 @@ class NonBatchedOpsStressTest : public StressTest { } } - for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) { + for (int64_t i = 0; i < num_iter && iter->Valid(); ++i) { + if (!check_columns()) { + return Status::OK(); + } + GetIntVal(iter->key().ToString(), &curr); - if (curr < static_cast(lb)) { + if (static_cast(curr) < lb) { iter->Next(); op_logs += "N"; - } else if (curr >= static_cast(ub)) { + } else if (static_cast(curr) >= ub) { iter->Prev(); op_logs += "P"; } else { - uint32_t expected_value = + const uint32_t expected_value = shared->Get(rand_column_family, static_cast(curr)); if (expected_value == shared->DELETION_SENTINEL) { // Fail fast to preserve the DB state. @@ -1316,13 +1381,14 @@ class NonBatchedOpsStressTest : public StressTest { thread->stats.AddErrors(1); break; } + if (thread->rand.OneIn(2)) { iter->Next(); op_logs += "N"; if (!iter->Valid()) { break; } - uint64_t next; + uint64_t next = 0; GetIntVal(iter->key().ToString(), &next); if (!check_no_key_in_range(static_cast(curr + 1), static_cast(next))) { @@ -1334,7 +1400,7 @@ class NonBatchedOpsStressTest : public StressTest { if (!iter->Valid()) { break; } - uint64_t prev; + uint64_t prev = 0; GetIntVal(iter->key().ToString(), &prev); if (!check_no_key_in_range(static_cast(prev + 1), static_cast(curr))) { @@ -1343,6 +1409,7 @@ class NonBatchedOpsStressTest : public StressTest { } } } + if (!iter->status().ok()) { thread->shared->SetVerificationFailure(); fprintf(stderr, "TestIterate against expected state error: %s\n", @@ -1352,7 +1419,9 @@ class NonBatchedOpsStressTest : public StressTest { thread->stats.AddErrors(1); return iter->status(); } + thread->stats.AddIterations(1); + return Status::OK(); }