Add slow checker/inserter threads to MySQLStyleTransactionTest.

Summary of what the hunks below do:
  * TransactionDBOptions declares MySQLStyleTransactionTest as a friend
    (presumably so the fixture can set wp_snapshot_cache_bits and
    wp_commit_cache_bits; confirm their access level in the header).
  * ROCKS_LOG_DETAILS moves from write_prepared_txn_db.h to util/logging.h.
    It still expands to an empty statement, so its arguments are never
    evaluated.
  * TransactionBaseImpl::ReleaseSnapshot gains a ROCKS_LOG_DETAILS call.
  * TransactionStressTestInserter gains two defaulted parameters,
    cmt_delay_ms and first_id, which are forwarded to
    RandomTransactionInserter. The "too many failures" check is skipped
    when num_transactions == 1.
  * MySQLStyleTransactionTest becomes its own fixture with a fourth bool
    test parameter, with_slow_threads_. When it is set, the stress test
    also starts 2 slow checker threads and 1 slow inserter thread. For
    WRITE_PREPARED / WRITE_UNPREPARED the fixture additionally shrinks
    the cache bit settings and reopens the DB.

NOTE(review): this text was re-flowed from a whitespace-collapsed copy.
Indentation and blank context lines are reconstructed from the hunk line
counts; verify against the tree (or apply ignoring whitespace).

diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h
index d4bbe4eda..00103f444 100644
--- a/include/rocksdb/utilities/transaction_db.h
+++ b/include/rocksdb/utilities/transaction_db.h
@@ -102,6 +102,7 @@ struct TransactionDBOptions {
 
   friend class WritePreparedTxnDB;
   friend class WritePreparedTransactionTestBase;
+  friend class MySQLStyleTransactionTest;
 };
 
 struct TransactionOptions {
diff --git a/util/logging.h b/util/logging.h
index f605d36a5..a4ef31bd6 100644
--- a/util/logging.h
+++ b/util/logging.h
@@ -55,3 +55,7 @@ inline const char* RocksLogShorterFileName(const char* file)
 #define ROCKS_LOG_BUFFER_MAX_SZ(LOG_BUF, MAX_LOG_SIZE, FMT, ...)                 \
   rocksdb::LogToBuffer(LOG_BUF, MAX_LOG_SIZE, ROCKS_LOG_PREPEND_FILE_LINE(FMT), \
                        RocksLogShorterFileName(__FILE__), ##__VA_ARGS__)
+
+#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \
+  ;  // due to overhead by default skip such lines
+// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__)
diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc
index 2b0519cf8..e73062396 100644
--- a/utilities/transactions/transaction_base.cc
+++ b/utilities/transactions/transaction_base.cc
@@ -655,6 +655,9 @@ WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() {
 
 void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
   if (snapshot != nullptr) {
+    ROCKS_LOG_DETAILS(dbimpl_->immutable_db_options().info_log,
+                      "ReleaseSnapshot %" PRIu64 " Set",
+                      snapshot->GetSequenceNumber());
     db->ReleaseSnapshot(snapshot);
   }
 }
diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc
index 22e315c1a..b5dc32b26 100644
--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -66,12 +66,16 @@ INSTANTIATE_TEST_CASE_P(
 #ifndef ROCKSDB_VALGRIND_RUN
 INSTANTIATE_TEST_CASE_P(
     MySQLStyleTransactionTest, MySQLStyleTransactionTest,
-    ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED),
-                      std::make_tuple(false, true, WRITE_COMMITTED),
-                      std::make_tuple(false, false, WRITE_PREPARED),
-                      std::make_tuple(false, true, WRITE_PREPARED),
-                      std::make_tuple(false, false, WRITE_UNPREPARED),
-                      std::make_tuple(false, true, WRITE_UNPREPARED)));
+    ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED, false),
+                      std::make_tuple(false, true, WRITE_COMMITTED, false),
+                      std::make_tuple(false, false, WRITE_PREPARED, false),
+                      std::make_tuple(false, false, WRITE_PREPARED, true),
+                      std::make_tuple(false, true, WRITE_PREPARED, false),
+                      std::make_tuple(false, true, WRITE_PREPARED, true),
+                      std::make_tuple(false, false, WRITE_UNPREPARED, false),
+                      std::make_tuple(false, false, WRITE_UNPREPARED, true),
+                      std::make_tuple(false, true, WRITE_UNPREPARED, false),
+                      std::make_tuple(false, true, WRITE_UNPREPARED, true)));
 #endif  // ROCKSDB_VALGRIND_RUN
 
 TEST_P(TransactionTest, DoubleEmptyWrite) {
@@ -4990,10 +4994,14 @@ TEST_P(TransactionStressTest, ExpiredTransactionDataRace1) {
 
 #ifndef ROCKSDB_VALGRIND_RUN
 namespace {
+// cmt_delay_ms is the delay between prepare and commit
+// first_id is the id of the first transaction
 Status TransactionStressTestInserter(TransactionDB* db,
                                      const size_t num_transactions,
                                      const size_t num_sets,
-                                     const size_t num_keys_per_set) {
+                                     const size_t num_keys_per_set,
+                                     const uint64_t cmt_delay_ms = 0,
+                                     const uint64_t first_id = 0) {
   size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
   Random64 _rand(seed);
   WriteOptions write_options;
@@ -5003,9 +5011,9 @@ Status TransactionStressTestInserter(TransactionDB* db,
   // separte functions are engaged for each.
   txn_options.set_snapshot = _rand.OneIn(2);
 
-  RandomTransactionInserter inserter(&_rand, write_options, read_options,
-                                     num_keys_per_set,
-                                     static_cast<uint16_t>(num_sets));
+  RandomTransactionInserter inserter(
+      &_rand, write_options, read_options, num_keys_per_set,
+      static_cast<uint16_t>(num_sets), cmt_delay_ms, first_id);
 
   for (size_t t = 0; t < num_transactions; t++) {
     bool success = inserter.TransactionDBInsert(db, txn_options);
@@ -5017,7 +5025,8 @@ Status TransactionStressTestInserter(TransactionDB* db,
 
   // Make sure at least some of the transactions succeeded. It's ok if
   // some failed due to write-conflicts.
-  if (inserter.GetFailureCount() > num_transactions / 2) {
+  if (num_transactions != 1 &&
+      inserter.GetFailureCount() > num_transactions / 2) {
     return Status::TryAgain("Too many transactions failed! " +
                             std::to_string(inserter.GetFailureCount()) + " / " +
                             std::to_string(num_transactions));
@@ -5035,6 +5044,8 @@ TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
   ReOpenNoDelete();
   const size_t num_workers = 4;   // worker threads count
   const size_t num_checkers = 2;  // checker threads count
+  const size_t num_slow_checkers = 2;  // checker threads emulating backups
+  const size_t num_slow_workers = 1;   // slow worker threads count
   const size_t num_transactions_per_thread = 10000;
   const uint16_t num_sets = 3;
   const size_t num_keys_per_set = 100;
@@ -5060,6 +5071,28 @@ TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
       ASSERT_OK(s);
     }
   };
+  std::function<void()> call_slow_checker = [&] {
+    size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
+    Random64 rand(seed);
+    // Verify that data is consistent
+    while (finished < num_workers) {
+      uint64_t delay_ms = rand.Uniform(100) + 1;
+      Status s = RandomTransactionInserter::Verify(
+          db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand, delay_ms);
+      ASSERT_OK(s);
+    }
+  };
+  std::function<void()> call_slow_inserter = [&] {
+    size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
+    Random64 rand(seed);
+    uint64_t id = 0;
+    // Issue one delayed transaction at a time until the workers finish
+    while (finished < num_workers) {
+      uint64_t delay_ms = rand.Uniform(500) + 1;
+      ASSERT_OK(TransactionStressTestInserter(db, 1, num_sets, num_keys_per_set,
+                                              delay_ms, id++));
+    }
+  };
 
   for (uint32_t i = 0; i < num_workers; i++) {
     threads.emplace_back(call_inserter);
@@ -5067,6 +5100,14 @@ TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
   for (uint32_t i = 0; i < num_checkers; i++) {
     threads.emplace_back(call_checker);
   }
+  if (with_slow_threads_) {
+    for (uint32_t i = 0; i < num_slow_checkers; i++) {
+      threads.emplace_back(call_slow_checker);
+    }
+    for (uint32_t i = 0; i < num_slow_workers; i++) {
+      threads.emplace_back(call_slow_inserter);
+    }
+  }
 
   // Wait for all threads to finish
   for (auto& t : threads) {
diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h
index cdc014acb..33b2c51ea 100644
--- a/utilities/transactions/transaction_test.h
+++ b/utilities/transactions/transaction_test.h
@@ -448,6 +448,31 @@ class TransactionTest : public TransactionTestBase,
 
 class TransactionStressTest : public TransactionTest {};
 
-class MySQLStyleTransactionTest : public TransactionTest {};
+class MySQLStyleTransactionTest
+    : public TransactionTestBase,
+      virtual public ::testing::WithParamInterface<
+          std::tuple<bool, bool, TxnDBWritePolicy, bool>> {
+ public:
+  MySQLStyleTransactionTest()
+      : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()),
+                            std::get<2>(GetParam())),
+        with_slow_threads_(std::get<3>(GetParam())) {
+    if (with_slow_threads_ &&
+        (txn_db_options.write_policy == WRITE_PREPARED ||
+         txn_db_options.write_policy == WRITE_UNPREPARED)) {
+      // The corner case with slow threads involves the caches filling
+      // over which would not happen even with artificial delays. To help
+      // such cases to show up we lower the size of the cache-related data
+      // structures.
+      txn_db_options.wp_snapshot_cache_bits = 1;
+      txn_db_options.wp_commit_cache_bits = 10;
+      EXPECT_OK(ReOpen());
+    }
+  }
+
+ protected:
+  // Also emulate slow threads by adding artificial delays
+  const bool with_slow_threads_;
+};
 
 }  // namespace rocksdb
diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
index e1190a864..2c642c814 100644
--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -34,10 +34,6 @@
 
 namespace rocksdb {
 
-#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \
-  ;  // due to overhead by default skip such lines
-// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__)
-
 // A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
 // In this way some data in the DB might not be committed. The DB provides
 // mechanisms to tell such data apart from committed data.