From 0f4244fe00f4c90f15212dead896a2ab7b49e0c1 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Tue, 19 Feb 2019 16:52:50 -0800 Subject: [PATCH] WritePrepared: Improve stress tests with slow threads (#4974) Summary: The transaction stress tests, stress a high concurrency scenario. In WritePrepared/WriteUnPrepared we need to also stress the scenarios where an inserting/reading transaction is very slow. This would stress the corner cases that the caching is not sufficient and other slower data structures are engaged. To emulate such cases we make use of slow inserter/verifier threads and also reduce the size of cache data structures. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4974 Differential Revision: D14143070 Pulled By: maysamyabandeh fbshipit-source-id: 81eb674678faf9fae0f654cd60ebcc74e26aeee7 --- include/rocksdb/utilities/transaction_db.h | 1 + util/logging.h | 4 ++ utilities/transactions/transaction_base.cc | 3 + utilities/transactions/transaction_test.cc | 63 +++++++++++++++---- utilities/transactions/transaction_test.h | 27 +++++++- .../transactions/write_prepared_txn_db.h | 4 -- 6 files changed, 86 insertions(+), 16 deletions(-) diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index d4bbe4eda..00103f444 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -102,6 +102,7 @@ struct TransactionDBOptions { friend class WritePreparedTxnDB; friend class WritePreparedTransactionTestBase; + friend class MySQLStyleTransactionTest; }; struct TransactionOptions { diff --git a/util/logging.h b/util/logging.h index f605d36a5..a4ef31bd6 100644 --- a/util/logging.h +++ b/util/logging.h @@ -55,3 +55,7 @@ inline const char* RocksLogShorterFileName(const char* file) #define ROCKS_LOG_BUFFER_MAX_SZ(LOG_BUF, MAX_LOG_SIZE, FMT, ...) \ rocksdb::LogToBuffer(LOG_BUF, MAX_LOG_SIZE, ROCKS_LOG_PREPEND_FILE_LINE(FMT), \ RocksLogShorterFileName(__FILE__), ##__VA_ARGS__) + +#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \ + ; // due to overhead by default skip such lines +// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__) diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 2b0519cf8..e73062396 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -655,6 +655,9 @@ WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() { void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) { if (snapshot != nullptr) { + ROCKS_LOG_DETAILS(dbimpl_->immutable_db_options().info_log, + "ReleaseSnapshot %" PRIu64 " Set", + snapshot->GetSequenceNumber()); db->ReleaseSnapshot(snapshot); } } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 22e315c1a..b5dc32b26 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -66,12 +66,16 @@ INSTANTIATE_TEST_CASE_P( #ifndef ROCKSDB_VALGRIND_RUN INSTANTIATE_TEST_CASE_P( MySQLStyleTransactionTest, MySQLStyleTransactionTest, - ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), - std::make_tuple(false, true, WRITE_COMMITTED), - std::make_tuple(false, false, WRITE_PREPARED), - std::make_tuple(false, true, WRITE_PREPARED), - std::make_tuple(false, false, WRITE_UNPREPARED), - std::make_tuple(false, true, WRITE_UNPREPARED))); + ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED, false), + std::make_tuple(false, true, WRITE_COMMITTED, false), + std::make_tuple(false, false, WRITE_PREPARED, false), + std::make_tuple(false, false, WRITE_PREPARED, true), + std::make_tuple(false, true, WRITE_PREPARED, false), + std::make_tuple(false, true, WRITE_PREPARED, true), + std::make_tuple(false, false, WRITE_UNPREPARED, false), + std::make_tuple(false, false, WRITE_UNPREPARED, true), + std::make_tuple(false, true, WRITE_UNPREPARED, false), + std::make_tuple(false, true, WRITE_UNPREPARED, true))); #endif // ROCKSDB_VALGRIND_RUN TEST_P(TransactionTest, DoubleEmptyWrite) { @@ -4990,10 +4994,14 @@ TEST_P(TransactionStressTest, ExpiredTransactionDataRace1) { #ifndef ROCKSDB_VALGRIND_RUN namespace { +// cmt_delay_ms is the delay between prepare and commit +// first_id is the id of the first transaction Status TransactionStressTestInserter(TransactionDB* db, const size_t num_transactions, const size_t num_sets, - const size_t num_keys_per_set) { + const size_t num_keys_per_set, + const uint64_t cmt_delay_ms = 0, + const uint64_t first_id = 0) { size_t seed = std::hash()(std::this_thread::get_id()); Random64 _rand(seed); WriteOptions write_options; @@ -5003,9 +5011,9 @@ Status TransactionStressTestInserter(TransactionDB* db, // separte functions are engaged for each. txn_options.set_snapshot = _rand.OneIn(2); - RandomTransactionInserter inserter(&_rand, write_options, read_options, - num_keys_per_set, - static_cast(num_sets)); + RandomTransactionInserter inserter( + &_rand, write_options, read_options, num_keys_per_set, + static_cast(num_sets), cmt_delay_ms, first_id); for (size_t t = 0; t < num_transactions; t++) { bool success = inserter.TransactionDBInsert(db, txn_options); @@ -5017,7 +5025,8 @@ Status TransactionStressTestInserter(TransactionDB* db, // Make sure at least some of the transactions succeeded. It's ok if // some failed due to write-conflicts. - if (inserter.GetFailureCount() > num_transactions / 2) { + if (num_transactions != 1 && + inserter.GetFailureCount() > num_transactions / 2) { return Status::TryAgain("Too many transactions failed! " + std::to_string(inserter.GetFailureCount()) + " / " + std::to_string(num_transactions)); @@ -5035,6 +5044,8 @@ TEST_P(MySQLStyleTransactionTest, TransactionStressTest) { ReOpenNoDelete(); const size_t num_workers = 4; // worker threads count const size_t num_checkers = 2; // checker threads count + const size_t num_slow_checkers = 2; // checker threads emulating backups + const size_t num_slow_workers = 1; // slow worker threads count const size_t num_transactions_per_thread = 10000; const uint16_t num_sets = 3; const size_t num_keys_per_set = 100; @@ -5060,6 +5071,28 @@ TEST_P(MySQLStyleTransactionTest, TransactionStressTest) { ASSERT_OK(s); } }; + std::function call_slow_checker = [&] { + size_t seed = std::hash()(std::this_thread::get_id()); + Random64 rand(seed); + // Verify that data is consistent + while (finished < num_workers) { + uint64_t delay_ms = rand.Uniform(100) + 1; + Status s = RandomTransactionInserter::Verify( + db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand, delay_ms); + ASSERT_OK(s); + } + }; + std::function call_slow_inserter = [&] { + size_t seed = std::hash()(std::this_thread::get_id()); + Random64 rand(seed); + uint64_t id = 0; + // Verify that data is consistent + while (finished < num_workers) { + uint64_t delay_ms = rand.Uniform(500) + 1; + ASSERT_OK(TransactionStressTestInserter(db, 1, num_sets, num_keys_per_set, + delay_ms, id++)); + } + }; for (uint32_t i = 0; i < num_workers; i++) { threads.emplace_back(call_inserter); @@ -5067,6 +5100,14 @@ TEST_P(MySQLStyleTransactionTest, TransactionStressTest) { for (uint32_t i = 0; i < num_checkers; i++) { threads.emplace_back(call_checker); } + if (with_slow_threads_) { + for (uint32_t i = 0; i < num_slow_checkers; i++) { + threads.emplace_back(call_slow_checker); + } + for (uint32_t i = 0; i < num_slow_workers; i++) { + threads.emplace_back(call_slow_inserter); + } + } // Wait for all threads to finish for (auto& t : threads) { diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index cdc014acb..33b2c51ea 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -448,6 +448,31 @@ class TransactionTest : public TransactionTestBase, class TransactionStressTest : public TransactionTest {}; -class MySQLStyleTransactionTest : public TransactionTest {}; +class MySQLStyleTransactionTest + : public TransactionTestBase, + virtual public ::testing::WithParamInterface< + std::tuple> { + public: + MySQLStyleTransactionTest() + : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()), + std::get<2>(GetParam())), + with_slow_threads_(std::get<3>(GetParam())) { + if (with_slow_threads_ && + (txn_db_options.write_policy == WRITE_PREPARED || + txn_db_options.write_policy == WRITE_UNPREPARED)) { + // The corner case with slow threads involves the caches filling + // over which would not happen even with artifial delays. To help + // such cases to show up we lower the size of the cache-related data + // structures. + txn_db_options.wp_snapshot_cache_bits = 1; + txn_db_options.wp_commit_cache_bits = 10; + EXPECT_OK(ReOpen()); + } + }; + + protected: + // Also emulate slow threads by addin artiftial delays + const bool with_slow_threads_; +}; } // namespace rocksdb diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index e1190a864..2c642c814 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -34,10 +34,6 @@ namespace rocksdb { -#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \ - ; // due to overhead by default skip such lines -// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__) - // A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC. // In this way some data in the DB might not be committed. The DB provides // mechanisms to tell such data apart from committed data.