diff --git a/db/db_bench.cc b/db/db_bench.cc index 15167b8bb..ca05b2b20 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -487,10 +487,18 @@ DEFINE_uint64(transaction_sets, 2, "Number of keys each transaction will " "modify (use in RandomTransaction only). Max: 9999"); +DEFINE_bool(transaction_set_snapshot, false, + "Setting to true will have each transaction call SetSnapshot()" + " upon creation."); + DEFINE_int32(transaction_sleep, 0, "Max microseconds to sleep in between " "reading and writing a value (used in RandomTransaction only). "); +DEFINE_uint64(transaction_lock_timeout, 100, + "If using a transaction_db, specifies the lock wait timeout in" + " milliseconds before failing a transaction waiting on a lock"); + DEFINE_bool(compaction_measure_io_stats, false, "Measure times spents on I/Os while in compactions. "); @@ -3645,13 +3653,20 @@ class Benchmark { assert(txn); } else if (FLAGS_transaction_db) { TransactionDB* txn_db = reinterpret_cast(db_.db); + TransactionOptions txn_options; - txn_options.expiration = 10000000; + txn_options.lock_timeout = FLAGS_transaction_lock_timeout; + txn = txn_db->BeginTransaction(write_options_, txn_options); + assert(txn); } else { batch = new WriteBatch(); } + if (txn && FLAGS_transaction_set_snapshot) { + txn->SetSnapshot(); + } + // pick a random number to use to increment a key in each set uint64_t incr = (thread->rand.Next() % 100) + 1; @@ -3686,9 +3701,13 @@ class Benchmark { } } else if (s.IsNotFound()) { int_value = 0; - } else { - fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str()); + } else if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { + fprintf(stderr, "Get returned an unexpected error: %s\n", + s.ToString().c_str()); abort(); + } else { + failed = true; + break; } if (FLAGS_transaction_sleep > 0) { @@ -3700,8 +3719,10 @@ class Benchmark { if (txn) { s = txn->Put(key, sum); if (!s.ok()) { - failed = true; - break; + // Since we did a GetForUpdate, Put should not fail. + fprintf(stderr, "Put returned an unexpected error: %s\n", + s.ToString().c_str()); + abort(); } } else { batch->Put(key, sum); @@ -3710,7 +3731,9 @@ class Benchmark { if (txn) { if (failed) { + transactions_aborted++; txn->Rollback(); + s = Status::OK(); } else { s = txn->Commit(); } @@ -3719,10 +3742,15 @@ class Benchmark { } if (!s.ok()) { + failed = true; + // Ideally, we'd want to run this stress test with enough concurrency // on a small enough set of keys that we get some failed transactions // due to conflicts. - if (txn && s.IsBusy()) { + if (FLAGS_optimistic_transaction_db && + (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { + transactions_aborted++; + } else if (FLAGS_transaction_db && s.IsExpired()) { transactions_aborted++; } else { fprintf(stderr, "Unexpected write error: %s\n", s.ToString().c_str()); @@ -3737,6 +3765,10 @@ class Benchmark { delete batch; } + if (!failed) { + thread->stats.FinishedOps(nullptr, db, 1); + } + transactions_done++; } @@ -3807,8 +3839,7 @@ class Benchmark { prev_total = total; } - fprintf(stdout, "RandomTransactionVerify Success! Total:%" PRIu64 "\n", - prev_total); + fprintf(stdout, "RandomTransactionVerify Success!\n"); } void Compact(ThreadState* thread) {