From ffb5f1f445a18266b5c65b212b73b2ad78a3b553 Mon Sep 17 00:00:00 2001
From: Yu Zhang
Date: Mon, 22 May 2023 12:31:52 -0700
Subject: [PATCH] Refactor WriteUnpreparedStressTest to be a unit test (#11424)

Summary:
This patch removes the "stress" aspect from WriteUnpreparedStressTest and
turns it into a unit test that checks correctness w.r.t. the snapshot
functionality. I also added read-your-own-write verification to the
transaction path of db_stress's MultiGet test.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11424

Test Plan:
`./write_unprepared_transaction_test`
`./db_crashtest.py whitebox --txn`
`./db_crashtest.py blackbox --txn`

Reviewed By: hx235

Differential Revision: D45551521

Pulled By: jowlyzhang

fbshipit-source-id: 20c3d510eb4255b08ddd7b6c85bdb4945436f6e8
---
 db_stress_tool/expected_value.h           |   2 +
 db_stress_tool/no_batched_ops_stress.cc   | 140 +++++++++++++--
 .../write_unprepared_transaction_test.cc  | 167 +++++++-----------
 3 files changed, 185 insertions(+), 124 deletions(-)
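Editorial note, not part of the patch: the property that the new
read-your-own-write bookkeeping in db_stress's TestMultiGet verifies can be
seen in isolation with the public Transaction API. The sketch below is a
minimal, standalone illustration under assumed defaults; the DB path is
arbitrary and error handling is reduced to asserts. It shows that a
transaction's pending Put is visible to its own Get while staying invisible to
reads through the TransactionDB handle until commit, which is the per-key
relationship that the ryw_expected_values vector in the diff below tracks.

  // Minimal read-your-own-write illustration (assumed setup, not from the PR).
  #include <cassert>
  #include <string>

  #include "rocksdb/options.h"
  #include "rocksdb/utilities/transaction.h"
  #include "rocksdb/utilities/transaction_db.h"

  using namespace ROCKSDB_NAMESPACE;

  int main() {
    Options options;
    options.create_if_missing = true;
    TransactionDBOptions txn_db_options;
    // Same write policy the refactored test targets.
    txn_db_options.write_policy = TxnDBWritePolicy::WRITE_UNPREPARED;

    TransactionDB* db = nullptr;
    Status s = TransactionDB::Open(options, txn_db_options, "/tmp/ryw_demo", &db);
    assert(s.ok());

    Transaction* txn = db->BeginTransaction(WriteOptions());
    s = txn->Put("k1", "v1");
    assert(s.ok());

    std::string value;
    // The uncommitted Put is visible to the transaction's own read ...
    s = txn->Get(ReadOptions(), "k1", &value);
    assert(s.ok() && value == "v1");
    // ... but not to a read through the DB handle, which filters out
    // uncommitted data.
    s = db->Get(ReadOptions(), "k1", &value);
    assert(s.IsNotFound());

    s = txn->Commit();
    assert(s.ok());
    delete txn;
    delete db;
    return 0;
  }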
diff --git a/db_stress_tool/expected_value.h b/db_stress_tool/expected_value.h
index 2e41b130c..338afc049 100644
--- a/db_stress_tool/expected_value.h
+++ b/db_stress_tool/expected_value.h
@@ -31,6 +31,8 @@ class ExpectedValue {
     return IsValuePartValid(value_base, VALUE_BASE_MASK);
   }
 
+  ExpectedValue() : expected_value_(DEL_MASK) {}
+
   explicit ExpectedValue(uint32_t expected_value)
       : expected_value_(expected_value) {}
 
diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc
index 966558243..2b2e62489 100644
--- a/db_stress_tool/no_batched_ops_stress.cc
+++ b/db_stress_tool/no_batched_ops_stress.cc
@@ -571,7 +571,14 @@ class NonBatchedOpsStressTest : public StressTest {
     keys.reserve(num_keys);
     std::vector<PinnableSlice> values(num_keys);
     std::vector<Status> statuses(num_keys);
-    ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+    // When FLAGS_use_txn is enabled, we also do a read-your-own-write check.
+    std::vector<std::optional<ExpectedValue>> ryw_expected_values;
+    ryw_expected_values.reserve(num_keys);
+
+    SharedState* shared = thread->shared;
+
+    int column_family = rand_column_families[0];
+    ColumnFamilyHandle* cfh = column_families_[column_family];
     int error_count = 0;
     // Do a consistency check between Get and MultiGet. Don't do it too
     // often as it will slow db_stress down
@@ -609,21 +616,32 @@ class NonBatchedOpsStressTest : public StressTest {
       }
     }
     for (size_t i = 0; i < num_keys; ++i) {
-      key_str.emplace_back(Key(rand_keys[i]));
+      uint64_t rand_key = rand_keys[i];
+      key_str.emplace_back(Key(rand_key));
       keys.emplace_back(key_str.back());
       if (use_txn) {
+        if (!shared->AllowsOverwrite(rand_key) &&
+            shared->Exists(column_family, rand_key)) {
+          // Only do read-your-own-write checks for keys that allow overwrites.
+          ryw_expected_values.push_back(std::nullopt);
+          continue;
+        }
         // With a 1 in 10 probability, insert the just added key in the batch
         // into the transaction. This will create an overlap with the MultiGet
         // keys and exercise some corner cases in the code
         if (thread->rand.OneIn(10)) {
           int op = thread->rand.Uniform(2);
           Status s;
+          assert(txn);
           switch (op) {
             case 0:
             case 1: {
-              const uint32_t value_base = 0;
+              ExpectedValue put_value;
+              put_value.Put(false /* pending */);
+              ryw_expected_values.emplace_back(put_value);
               char value[100];
-              size_t sz = GenerateValue(value_base, value, sizeof(value));
+              size_t sz =
+                  GenerateValue(put_value.GetValueBase(), value, sizeof(value));
               Slice v(value, sz);
               if (op == 0) {
                 s = txn->Put(cfh, keys.back(), v);
@@ -632,9 +650,13 @@ class NonBatchedOpsStressTest : public StressTest {
               }
               break;
             }
-            case 2:
+            case 2: {
+              ExpectedValue delete_value;
+              delete_value.Delete(false /* pending */);
+              ryw_expected_values.emplace_back(delete_value);
               s = txn->Delete(cfh, keys.back());
               break;
+            }
             default:
               assert(false);
           }
@@ -642,6 +664,8 @@ class NonBatchedOpsStressTest : public StressTest {
             fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str());
             std::terminate();
           }
+        } else {
+          ryw_expected_values.push_back(std::nullopt);
         }
       }
     }
@@ -657,6 +681,7 @@ class NonBatchedOpsStressTest : public StressTest {
         error_count = fault_fs_guard->GetAndResetErrorCount();
       }
     } else {
+      assert(txn);
       txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
                     statuses.data());
     }
@@ -685,50 +710,119 @@ class NonBatchedOpsStressTest : public StressTest {
       fault_fs_guard->DisableErrorInjection();
    }
 
-    for (size_t i = 0; i < statuses.size(); ++i) {
-      Status s = statuses[i];
+    auto ryw_check =
+        [](const Slice& key, const PinnableSlice& value, const Status& s,
+           const std::optional<ExpectedValue>& ryw_expected_value) -> bool {
+      if (!ryw_expected_value.has_value()) {
+        return true;
+      }
+      const ExpectedValue& expected = ryw_expected_value.value();
+      char expected_value[100];
+      if (s.ok() &&
+          ExpectedValueHelper::MustHaveNotExisted(expected, expected)) {
+        fprintf(stderr,
+                "MultiGet returned value different from what was "
+                "written for key %s\n",
+                key.ToString(true).c_str());
+        fprintf(stderr,
+                "MultiGet returned ok, transaction has non-committed "
+                "delete.\n");
+        return false;
+      } else if (s.IsNotFound() &&
+                 ExpectedValueHelper::MustHaveExisted(expected, expected)) {
+        fprintf(stderr,
+                "MultiGet returned value different from what was "
+                "written for key %s\n",
+                key.ToString(true).c_str());
+        fprintf(stderr,
+                "MultiGet returned not found, transaction has "
+                "non-committed value.\n");
+        return false;
+      } else if (s.ok() &&
+                 ExpectedValueHelper::MustHaveExisted(expected, expected)) {
+        Slice from_txn_slice(value);
+        size_t sz = GenerateValue(expected.GetValueBase(), expected_value,
+                                  sizeof(expected_value));
+        Slice expected_value_slice(expected_value, sz);
+        if (expected_value_slice.compare(from_txn_slice) == 0) {
+          return true;
+        }
+        fprintf(stderr,
+                "MultiGet returned value different from what was "
+                "written for key %s\n",
+                key.ToString(true /* hex */).c_str());
+        fprintf(stderr, "MultiGet returned value %s\n",
+                from_txn_slice.ToString(true /* hex */).c_str());
+        fprintf(stderr, "Transaction has non-committed value %s\n",
+                expected_value_slice.ToString(true /* hex */).c_str());
+        return false;
+      }
+      return true;
+    };
+
+    auto check_multiget =
+        [&](const Slice& key, const PinnableSlice& expected_value,
+            const Status& s,
+            const std::optional<ExpectedValue>& ryw_expected_value) -> bool {
       bool is_consistent = true;
-      // Only do the consistency check if no error was injected and MultiGet
-      // didn't return an unexpected error
+      bool is_ryw_correct = true;
+      // Only do the consistency check if no error was injected and MultiGet
+      // didn't return an unexpected error. If the test does not use
+      // transactions, the consistency check verifies that the results of db
+      // `Get` and db `MultiGet` agree for each key. If the test uses
+      // transactions, a read-your-own-write check follows the consistency
+      // check.
       if (do_consistency_check && !error_count &&
           (s.ok() || s.IsNotFound())) {
         Status tmp_s;
         std::string value;
 
         if (use_txn) {
-          tmp_s = txn->Get(readoptionscopy, cfh, keys[i], &value);
+          assert(txn);
+          tmp_s = txn->Get(readoptionscopy, cfh, key, &value);
         } else {
-          tmp_s = db_->Get(readoptionscopy, cfh, keys[i], &value);
+          tmp_s = db_->Get(readoptionscopy, cfh, key, &value);
         }
         if (!tmp_s.ok() && !tmp_s.IsNotFound()) {
           fprintf(stderr, "Get error: %s\n", s.ToString().c_str());
           is_consistent = false;
         } else if (!s.ok() && tmp_s.ok()) {
           fprintf(stderr, "MultiGet returned different results with key %s\n",
-                  keys[i].ToString(true).c_str());
+                  key.ToString(true).c_str());
           fprintf(stderr, "Get returned ok, MultiGet returned not found\n");
           is_consistent = false;
         } else if (s.ok() && tmp_s.IsNotFound()) {
           fprintf(stderr, "MultiGet returned different results with key %s\n",
-                  keys[i].ToString(true).c_str());
+                  key.ToString(true).c_str());
           fprintf(stderr, "MultiGet returned ok, Get returned not found\n");
           is_consistent = false;
-        } else if (s.ok() && value != values[i].ToString()) {
+        } else if (s.ok() && value != expected_value.ToString()) {
           fprintf(stderr, "MultiGet returned different results with key %s\n",
-                  keys[i].ToString(true).c_str());
+                  key.ToString(true).c_str());
           fprintf(stderr, "MultiGet returned value %s\n",
-                  values[i].ToString(true).c_str());
+                  expected_value.ToString(true).c_str());
           fprintf(stderr, "Get returned value %s\n",
                   Slice(value).ToString(true /* hex */).c_str());
           is_consistent = false;
         }
       }
 
+      // If the test uses transactions, continue with a read-your-own-write
+      // check.
+      if (is_consistent && use_txn) {
+        is_ryw_correct = ryw_check(key, expected_value, s, ryw_expected_value);
+      }
+
       if (!is_consistent) {
         fprintf(stderr, "TestMultiGet error: is_consistent is false\n");
         thread->stats.AddErrors(1);
         // Fail fast to preserve the DB state
         thread->shared->SetVerificationFailure();
-        break;
+        return false;
+      } else if (!is_ryw_correct) {
+        fprintf(stderr, "TestMultiGet error: is_ryw_correct is false\n");
+        thread->stats.AddErrors(1);
+        // Fail fast to preserve the DB state
+        thread->shared->SetVerificationFailure();
+        return false;
       } else if (s.ok()) {
         // found case
         thread->stats.AddGets(1, 1);
@@ -747,6 +841,18 @@ class NonBatchedOpsStressTest : public StressTest {
         thread->stats.AddVerifiedErrors(1);
       }
+      return true;
+    };
+
+    size_t num_of_keys = keys.size();
+    assert(values.size() == num_of_keys);
+    assert(statuses.size() == num_of_keys);
+    assert(ryw_expected_values.size() == num_of_keys);
+    for (size_t i = 0; i < num_of_keys; ++i) {
+      if (!check_multiget(keys[i], values[i], statuses[i],
+                          ryw_expected_values[i])) {
+        break;
+      }
     }
 
     if (readoptionscopy.snapshot) {
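Editorial note, not part of the patch: the refactored test below drops the
multi-threaded "stress" machinery and instead checks one transaction's reads
against a snapshot. The invariant it relies on, sketched here under the same
assumed setup as the earlier example (arbitrary DB path, asserts instead of
error handling), is that setting ReadOptions::snapshot to a snapshot taken
before the transaction's writes does not hide those writes from the
transaction itself; the snapshot only limits visibility of other writers'
data.

  // Snapshot vs. read-your-own-write illustration (assumed setup, not from
  // the PR).
  #include <cassert>
  #include <string>

  #include "rocksdb/options.h"
  #include "rocksdb/utilities/transaction.h"
  #include "rocksdb/utilities/transaction_db.h"

  using namespace ROCKSDB_NAMESPACE;

  int main() {
    Options options;
    options.create_if_missing = true;
    TransactionDBOptions txn_db_options;
    txn_db_options.write_policy = TxnDBWritePolicy::WRITE_UNPREPARED;

    TransactionDB* db = nullptr;
    Status s =
        TransactionDB::Open(options, txn_db_options, "/tmp/snapshot_demo", &db);
    assert(s.ok());

    Transaction* txn = db->BeginTransaction(WriteOptions());
    txn->SetSnapshot();  // Snapshot taken before this transaction writes.

    ReadOptions read_options;
    read_options.snapshot = txn->GetSnapshot();

    s = txn->Put("k", "own-write");
    assert(s.ok());

    std::string value;
    // The write happened after the snapshot, yet the transaction still reads
    // it back: own writes take precedence over the snapshot.
    s = txn->Get(read_options, "k", &value);
    assert(s.ok() && value == "own-write");

    s = txn->Commit();
    assert(s.ok());
    delete txn;
    delete db;
    return 0;
  }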
diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc
index bdc324634..d1307d760 100644
--- a/utilities/transactions/write_unprepared_transaction_test.cc
+++ b/utilities/transactions/write_unprepared_transaction_test.cc
@@ -35,26 +35,32 @@ INSTANTIATE_TEST_CASE_P(
     ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED),
                       std::make_tuple(false, true, WRITE_UNPREPARED)));
 
-enum StressAction { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT };
-class WriteUnpreparedStressTest : public WriteUnpreparedTransactionTestBase,
-                                  virtual public ::testing::WithParamInterface<
-                                      std::tuple<bool, StressAction>> {
+enum SnapshotAction { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT };
+enum VerificationOperation { VERIFY_GET, VERIFY_NEXT, VERIFY_PREV };
+class WriteUnpreparedSnapshotTest
+    : public WriteUnpreparedTransactionTestBase,
+      virtual public ::testing::WithParamInterface<
+          std::tuple<bool, SnapshotAction, VerificationOperation>> {
  public:
-  WriteUnpreparedStressTest()
+  WriteUnpreparedSnapshotTest()
       : WriteUnpreparedTransactionTestBase(false, std::get<0>(GetParam()),
                                            WRITE_UNPREPARED),
-        action_(std::get<1>(GetParam())) {}
-  StressAction action_;
+        action_(std::get<1>(GetParam())),
+        verify_op_(std::get<2>(GetParam())) {}
+  SnapshotAction action_;
+  VerificationOperation verify_op_;
 };
 
+// Test parameters:
+// Param 0): use stackable db, parameterization hard coded to be overwritten
+//           to false.
+// Param 1): test mode for snapshot action.
+// Param 2): test mode for verification operation.
 INSTANTIATE_TEST_CASE_P(
-    WriteUnpreparedStressTest, WriteUnpreparedStressTest,
-    ::testing::Values(std::make_tuple(false, NO_SNAPSHOT),
-                      std::make_tuple(false, RO_SNAPSHOT),
-                      std::make_tuple(false, REFRESH_SNAPSHOT),
-                      std::make_tuple(true, NO_SNAPSHOT),
-                      std::make_tuple(true, RO_SNAPSHOT),
-                      std::make_tuple(true, REFRESH_SNAPSHOT)));
+    WriteUnpreparedSnapshotTest, WriteUnpreparedSnapshotTest,
+    ::testing::Combine(
+        ::testing::Bool(),
+        ::testing::Values(NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT),
+        ::testing::Values(VERIFY_GET, VERIFY_NEXT, VERIFY_PREV)));
 
 TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
   // The following tests checks whether reading your own write for
@@ -135,42 +141,33 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
   }
 }
 
-#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
-TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
-  // This is a stress test where different threads are writing random keys, and
-  // then before committing or aborting the transaction, it validates to see
-  // that it can read the keys it wrote, and the keys it did not write respect
-  // the snapshot. To avoid row lock contention (and simply stressing the
-  // locking system), each thread is mostly only writing to its own set of keys.
+TEST_P(WriteUnpreparedSnapshotTest, ReadYourOwnWrite) {
+  // This test validates that a transaction can read its own writes, and that
+  // such reads are correct with respect to a mocked snapshot sequence.
   const uint32_t kNumIter = 1000;
-  const uint32_t kNumThreads = 10;
   const uint32_t kNumKeys = 5;
 
   // Test with
   // 1. no snapshots set
   // 2. snapshot set on ReadOptions
   // 3. snapshot set, and refreshing after every write.
-  StressAction a = action_;
+  SnapshotAction snapshot_action = action_;
   WriteOptions write_options;
   txn_db_options.transaction_lock_timeout = -1;
   options.disable_auto_compactions = true;
   ASSERT_OK(ReOpen());
 
   std::vector<std::string> keys;
-  for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) {
+  for (uint32_t k = 0; k < kNumKeys; k++) {
     keys.push_back("k" + std::to_string(k));
   }
-  RandomShuffle(keys.begin(), keys.end());
 
   // This counter will act as a "sequence number" to help us validate
   // visibility logic with snapshots. If we had direct access to the seqno of
   // snapshots and key/values, then we should directly compare those instead.
   std::atomic<int64_t> counter(0);
 
-  std::function<void(int)> stress_thread = [&](int id) {
-    size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
-    Random64 rnd(static_cast<uint64_t>(tid));
-
+  std::function<void()> check_correctness_wrt_snapshot = [&]() {
     Transaction* txn;
     TransactionOptions txn_options;
     // batch_size of 1 causes writes to DB for every marker.
@@ -178,114 +175,82 @@ TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
     ReadOptions read_options;
 
     for (uint32_t i = 0; i < kNumIter; i++) {
-      std::set<std::string> owned_keys(keys.begin() + id * kNumKeys,
-                                       keys.begin() + (id + 1) * kNumKeys);
-      // Add unowned keys to make the workload more interesting, but this
-      // increases row lock contention, so just do it sometimes.
-      if (rnd.OneIn(2)) {
-        owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]);
-      }
-
       txn = db->BeginTransaction(write_options, txn_options);
-      ASSERT_OK(txn->SetName(std::to_string(id)));
       txn->SetSnapshot();
-      if (a >= RO_SNAPSHOT) {
+      if (snapshot_action >= RO_SNAPSHOT) {
         read_options.snapshot = txn->GetSnapshot();
         ASSERT_TRUE(read_options.snapshot != nullptr);
       }
 
-      uint64_t buf[2];
-      buf[0] = id;
+      uint64_t buf[1];
 
       // When scanning through the database, make sure that all unprepared
-      // keys have value >= snapshot and all other keys have value < snapshot.
+      // keys have value >= snapshot.
       int64_t snapshot_num = counter.fetch_add(1);
 
       Status s;
-      for (const auto& key : owned_keys) {
-        buf[1] = counter.fetch_add(1);
+      for (const auto& key : keys) {
+        buf[0] = counter.fetch_add(1);
         s = txn->Put(key, Slice((const char*)buf, sizeof(buf)));
         if (!s.ok()) {
           break;
         }
-        if (a == REFRESH_SNAPSHOT) {
+        if (snapshot_action == REFRESH_SNAPSHOT) {
           txn->SetSnapshot();
           read_options.snapshot = txn->GetSnapshot();
           snapshot_num = counter.fetch_add(1);
         }
       }
-
-      // Failure is possible due to snapshot validation. In this case,
-      // rollback and move onto next iteration.
-      if (!s.ok()) {
-        ASSERT_TRUE(s.IsBusy());
-        ASSERT_OK(txn->Rollback());
-        delete txn;
-        continue;
-      }
+      ASSERT_OK(s);
 
-      auto verify_key = [&owned_keys, &a, &id, &snapshot_num](
-                            const std::string& key, const std::string& value) {
-        if (owned_keys.count(key) > 0) {
-          ASSERT_EQ(value.size(), 16);
-
-          // Since this key is part of owned_keys, then this key must be
-          // unprepared by this transaction identified by 'id'
-          ASSERT_EQ(((int64_t*)value.c_str())[0], id);
-          if (a == REFRESH_SNAPSHOT) {
-            // If refresh snapshot is true, then the snapshot is refreshed
-            // after every Put(), meaning that the current snapshot in
-            // snapshot_num must be greater than the "seqno" of any keys
-            // written by the current transaction.
-            ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
-          } else {
-            // If refresh snapshot is not on, then the snapshot was taken at
-            // the beginning of the transaction, meaning all writes must come
-            // after snapshot_num
-            ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num);
-          }
-        } else if (a >= RO_SNAPSHOT) {
-          // If this is not an unprepared key, just assert that the key
-          // "seqno" is smaller than the snapshot seqno.
-          ASSERT_EQ(value.size(), 16);
-          ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
+      auto verify_key = [&snapshot_action,
+                         &snapshot_num](const std::string& value) {
+        ASSERT_EQ(value.size(), 8);
+
+        if (snapshot_action == REFRESH_SNAPSHOT) {
+          // If refresh snapshot is true, then the snapshot is refreshed
+          // after every Put(), meaning that the current snapshot in
+          // snapshot_num must be greater than the "seqno" of any keys
+          // written by the current transaction.
+          ASSERT_LT(((int64_t*)value.c_str())[0], snapshot_num);
+        } else {
+          // If refresh snapshot is not on, then the snapshot was taken at
+          // the beginning of the transaction, meaning all writes must come
+          // after snapshot_num
+          ASSERT_GT(((int64_t*)value.c_str())[0], snapshot_num);
        }
      };
 
-      // Validate Get()/Next()/Prev(). Do only one of them to save time, and
-      // reduce lock contention.
+      // Validate one of Get()/Next()/Prev() depending on the verification
+      // operation to use.
-      switch (rnd.Uniform(3)) {
-        case 0:  // Validate Get()
+      switch (verify_op_) {
+        case VERIFY_GET:  // Validate Get()
         {
           for (const auto& key : keys) {
             std::string value;
-            s = txn->Get(read_options, Slice(key), &value);
-            if (!s.ok()) {
-              ASSERT_TRUE(s.IsNotFound());
-              ASSERT_EQ(owned_keys.count(key), 0);
-            } else {
-              verify_key(key, value);
-            }
+            ASSERT_OK(txn->Get(read_options, Slice(key), &value));
+            verify_key(value);
           }
           break;
         }
-        case 1:  // Validate Next()
+        case VERIFY_NEXT:  // Validate Next()
         {
           Iterator* iter = txn->GetIterator(read_options);
           ASSERT_OK(iter->status());
           for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
-            verify_key(iter->key().ToString(), iter->value().ToString());
+            verify_key(iter->value().ToString());
           }
           ASSERT_OK(iter->status());
           delete iter;
           break;
         }
-        case 2:  // Validate Prev()
+        case VERIFY_PREV:  // Validate Prev()
         {
           Iterator* iter = txn->GetIterator(read_options);
           ASSERT_OK(iter->status());
           for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
-            verify_key(iter->key().ToString(), iter->value().ToString());
+            verify_key(iter->value().ToString());
           }
           ASSERT_OK(iter->status());
           delete iter;
@@ -295,25 +260,13 @@ TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
           FAIL();
       }
 
-      if (rnd.OneIn(2)) {
-        ASSERT_OK(txn->Commit());
-      } else {
-        ASSERT_OK(txn->Rollback());
-      }
+      ASSERT_OK(txn->Commit());
       delete txn;
     }
   };
 
-  std::vector<port::Thread> threads;
-  for (uint32_t i = 0; i < kNumThreads; i++) {
-    threads.emplace_back(stress_thread, i);
-  }
-
-  for (auto& t : threads) {
-    t.join();
-  }
+  check_correctness_wrt_snapshot();
 }
-#endif  // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
 
 // This tests how write unprepared behaves during recovery when the DB crashes
 // after a transaction has either been unprepared or prepared, and tests if