diff --git a/util/transaction_test_util.cc b/util/transaction_test_util.cc index 58d95b2ae..240295a81 100644 --- a/util/transaction_test_util.cc +++ b/util/transaction_test_util.cc @@ -21,6 +21,10 @@ #include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" + +#include "db/dbformat.h" +#include "db/snapshot_impl.h" +#include "util/logging.h" #include "util/random.h" #include "util/string_util.h" @@ -28,13 +32,15 @@ namespace rocksdb { RandomTransactionInserter::RandomTransactionInserter( Random64* rand, const WriteOptions& write_options, - const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets) + const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets, + const uint64_t cmt_delay_ms, const uint64_t first_id) : rand_(rand), write_options_(write_options), read_options_(read_options), num_keys_(num_keys), num_sets_(num_sets), - txn_id_(0) {} + txn_id_(first_id), + cmt_delay_ms_(cmt_delay_ms) {} RandomTransactionInserter::~RandomTransactionInserter() { if (txn_ != nullptr) { @@ -51,17 +57,18 @@ bool RandomTransactionInserter::TransactionDBInsert( std::hash hasher; char name[64]; - snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%d", + snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%" PRIu64, hasher(std::this_thread::get_id()), txn_id_++); assert(strlen(name) < 64 - 1); - txn_->SetName(name); + assert(txn_->SetName(name).ok()); - bool take_snapshot = rand_->OneIn(2); + // Take a snapshot if set_snapshot was not set or with 50% chance otherwise + bool take_snapshot = txn_->GetSnapshot() == nullptr || rand_->OneIn(2); if (take_snapshot) { txn_->SetSnapshot(); read_options_.snapshot = txn_->GetSnapshot(); } - auto res = DoInsert(nullptr, txn_, false); + auto res = DoInsert(db, txn_, false); if (take_snapshot) { read_options_.snapshot = nullptr; } @@ -74,7 +81,7 @@ bool RandomTransactionInserter::OptimisticTransactionDBInsert( optimistic_txn_ = 
db->BeginTransaction(write_options_, txn_options, optimistic_txn_); - return DoInsert(nullptr, optimistic_txn_, true); + return DoInsert(db, optimistic_txn_, true); } bool RandomTransactionInserter::DBInsert(DB* db) { @@ -178,20 +185,39 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, } bytes_inserted_ += key.size() + sum.size(); } + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, + "Insert (%s) %s snap: %" PRIu64 " key:%s value: %" PRIu64 + "+%" PRIu64 "=%" PRIu64, + txn->GetName().c_str(), s.ToString().c_str(), + txn->GetSnapshot()->GetSequenceNumber(), full_key.c_str(), + int_value, incr, int_value + incr); } if (s.ok()) { if (txn != nullptr) { - if (!is_optimistic && !rand_->OneIn(10)) { - // also try commit without prpare + bool with_prepare = !is_optimistic && !rand_->OneIn(10); + if (with_prepare) { + // Also try commit without prepare s = txn->Prepare(); assert(s.ok()); + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, + "Prepare of %" PRIu64 " %s (%s)", txn->GetId(), + s.ToString().c_str(), txn->GetName().c_str()); + db->GetDBOptions().env->SleepForMicroseconds( + static_cast(cmt_delay_ms_ * 1000)); } if (!rand_->OneIn(20)) { s = txn->Commit(); + assert(!with_prepare || s.ok()); + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, + "Commit of %" PRIu64 " %s (%s)", txn->GetId(), + s.ToString().c_str(), txn->GetName().c_str()); } else { // Also try 5% rollback s = txn->Rollback(); + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, + "Rollback %" PRIu64 " %s %s", txn->GetId(), + txn->GetName().c_str(), s.ToString().c_str()); assert(s.ok()); } assert(is_optimistic || s.ok()); @@ -226,6 +252,8 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, } } } else { + ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, "Error %s for txn %s", + s.ToString().c_str(), txn->GetName().c_str()); if (txn != nullptr) { assert(txn->Rollback().ok()); } @@ -246,7 +274,11 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, // Verify that the sum 
of the keys in each set are equal Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets, uint64_t num_keys_per_set, - bool take_snapshot, Random64* rand) { + bool take_snapshot, Random64* rand, + uint64_t delay_ms) { + // delay_ms is the delay between taking a snapshot and doing the reads. It + // emulates reads from a long-running backup job. + assert(delay_ms == 0 || take_snapshot); uint64_t prev_total = 0; uint32_t prev_i = 0; bool prev_assigned = false; @@ -254,6 +286,8 @@ Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets, ReadOptions roptions; if (take_snapshot) { roptions.snapshot = db->GetSnapshot(); + db->GetDBOptions().env->SleepForMicroseconds( + static_cast(delay_ms * 1000)); } std::vector set_vec(num_sets); @@ -271,7 +305,9 @@ Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets, // Use either point lookup or iterator. Point lookups are slower so we use // it less often. - if (num_keys_per_set != 0 && rand && rand->OneIn(10)) { // use point lookup + const bool use_point_lookup = + num_keys_per_set != 0 && rand && rand->OneIn(10); + if (use_point_lookup) { ReadOptions read_options; for (uint64_t k = 0; k < num_keys_per_set; k++) { std::string dont_care; @@ -299,17 +335,37 @@ Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets, value.ToString().c_str()); return Status::Corruption(); } + ROCKS_LOG_DEBUG( + db->GetDBOptions().info_log, + "VerifyRead at %" PRIu64 " (%" PRIu64 "): %.*s value: %" PRIu64, + roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul, + roptions.snapshot + ? ((SnapshotImpl*)roptions.snapshot)->min_uncommitted_ + : 0ul, + key.size(), key.data(), int_value); total += int_value; } delete iter; } if (prev_assigned && total != prev_total) { + db->GetDBOptions().info_log->Flush(); fprintf(stdout, - "RandomTransactionVerify found inconsistent totals. 
" - "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 " \n", - prev_i, prev_total, set_i, total); + "RandomTransactionVerify found inconsistent totals using " + "pointlookup? %d " + "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 + " at snapshot %" PRIu64 "\n", + use_point_lookup, prev_i, prev_total, set_i, total, + roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul); + fflush(stdout); return Status::Corruption(); + } else { + ROCKS_LOG_DEBUG( + db->GetDBOptions().info_log, + "RandomTransactionVerify pass pointlookup? %d total: %" PRIu64 + " snap: %" PRIu64, + use_point_lookup, total, + roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul); } prev_total = total; prev_i = set_i; diff --git a/util/transaction_test_util.h b/util/transaction_test_util.h index 414a4267e..1aa4196ab 100644 --- a/util/transaction_test_util.h +++ b/util/transaction_test_util.h @@ -35,10 +35,13 @@ class RandomTransactionInserter { public: // num_keys is the number of keys in each set. // num_sets is the number of sets of keys. + // cmt_delay_ms is the delay between prepare (if there is any) and commit + // first_id is the id of the first transaction explicit RandomTransactionInserter( Random64* rand, const WriteOptions& write_options = WriteOptions(), const ReadOptions& read_options = ReadOptions(), uint64_t num_keys = 1000, - uint16_t num_sets = 3); + uint16_t num_sets = 3, const uint64_t cmt_delay_ms = 0, + const uint64_t first_id = 0); ~RandomTransactionInserter(); @@ -76,7 +79,8 @@ class RandomTransactionInserter { // Returns OK if Invariant is true. 
static Status Verify(DB* db, uint16_t num_sets, uint64_t num_keys_per_set = 0, - bool take_snapshot = false, Random64* rand = nullptr); + bool take_snapshot = false, Random64* rand = nullptr, + uint64_t delay_ms = 0); // Returns the status of the previous Insert operation Status GetLastStatus() { return last_status_; } @@ -116,7 +120,9 @@ class RandomTransactionInserter { Transaction* txn_ = nullptr; Transaction* optimistic_txn_ = nullptr; - std::atomic txn_id_; + uint64_t txn_id_; + // The delay between ::Prepare and ::Commit + const uint64_t cmt_delay_ms_; bool DoInsert(DB* db, Transaction* txn, bool is_optimistic); }; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 792f34730..58812ef8c 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -4999,7 +4999,9 @@ Status TransactionStressTestInserter(TransactionDB* db, WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; - txn_options.set_snapshot = true; + // Inside the inserter we might also retake the snapshot. We do both since two + // separate functions are engaged for each. + txn_options.set_snapshot = _rand.OneIn(2); RandomTransactionInserter inserter(&_rand, write_options, read_options, num_keys_per_set,