Allow TryAgain in db_stress with optimistic txn, and refactoring (#11653)

Summary:
In rare cases, optimistic transaction commit returns TryAgain. This change tolerates that intentional behavior in db_stress, up to a small limit in a row. This way, we don't miss a possible regression with excessive TryAgain, and trying again (rolling back the transaction) should have a well renewed chance of success as the writes will be associated with fresh sequence numbers.

Also, some of the APIs were not clear about Transaction semantics, so I have clarified:
* (Best I can tell....) Destroying a Transaction is safe without calling Rollback() (or at least should be). I don't know why it's a common pattern in our test code and examples to rollback before unconditional destruction. Stress test updated not to call Rollback unnecessarily (to test safe destruction).
* Despite essentially doing what is asked, simply trying Commit() again when it returns TryAgain does not have a chance of success, because of the transaction being bound to the DB state at the time of operations before Commit. Similar logic applies to Busy AFAIK. Commit() API comments updated, and expanded unit test in optimistic_transaction_test.

Also also, because I can't stop myself, I refactored a good portion of the transaction handling code in db_stress.
* Avoid existing and new copy-paste for most transaction interactions with a new ExecuteTransaction (higher-order) function.
* Use unique_ptr (nicely complements removing unnecessary Rollbacks)
* Abstract out a pattern for safely calling std::terminate() and use it in more places. (The TryAgain errors we saw did not have stack traces because of "terminate called recursively".)

Intended follow-up: resurrect use of `FLAGS_rollback_one_in` but also include non-trivial cases

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11653

Test Plan:
this is the test :)

Also, temporarily bypassed the new retry logic and boosted the chance of hitting TryAgain. Quickly reproduced the TryAgain error. Then re-enabled the new retry logic, and was not able to hit the error after running for tens of minutes, even with the boosted chances.

Reviewed By: cbi42

Differential Revision: D47882995

Pulled By: pdillinger

fbshipit-source-id: 21eadb1525423340dbf28d17cf166b9583311a0d
oxigraph-main
Peter Dillinger 1 year ago committed by Facebook GitHub Bot
parent c205a217e6
commit b3c54186ab
  1. 7
      db_stress_tool/db_stress_shared_state.h
  2. 76
      db_stress_tool/db_stress_test_base.cc
  3. 13
      db_stress_tool/db_stress_test_base.h
  4. 37
      db_stress_tool/multi_ops_txns_stress.cc
  5. 75
      db_stress_tool/no_batched_ops_stress.cc
  6. 6
      include/rocksdb/utilities/transaction.h
  7. 41
      utilities/transactions/optimistic_transaction_test.cc

@ -342,6 +342,13 @@ class SharedState {
uint64_t GetStartTimestamp() const { return start_timestamp_; } uint64_t GetStartTimestamp() const { return start_timestamp_; }
void SafeTerminate() {
// Grab mutex so that we don't call terminate while another thread is
// attempting to print a stack trace due to the first one
MutexLock l(&mu_);
std::terminate();
}
private: private:
static void IgnoreReadErrorCallback(void*) { ignore_read_error = true; } static void IgnoreReadErrorCallback(void*) { ignore_read_error = true; }

@ -504,14 +504,9 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
s = db_->Merge(write_opts, cfh, key, v); s = db_->Merge(write_opts, cfh, key, v);
} }
} else { } else {
Transaction* txn; s = ExecuteTransaction(
s = NewTxn(write_opts, &txn); write_opts, /*thread=*/nullptr,
if (s.ok()) { [&](Transaction& txn) { return txn.Merge(cfh, key, v); });
s = txn->Merge(cfh, key, v);
if (s.ok()) {
s = CommitTxn(txn);
}
}
} }
} else if (FLAGS_use_put_entity_one_in > 0) { } else if (FLAGS_use_put_entity_one_in > 0) {
s = db_->PutEntity(write_opts, cfh, key, s = db_->PutEntity(write_opts, cfh, key,
@ -524,14 +519,9 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
s = db_->Put(write_opts, cfh, key, v); s = db_->Put(write_opts, cfh, key, v);
} }
} else { } else {
Transaction* txn; s = ExecuteTransaction(
s = NewTxn(write_opts, &txn); write_opts, /*thread=*/nullptr,
if (s.ok()) { [&](Transaction& txn) { return txn.Put(cfh, key, v); });
s = txn->Put(cfh, key, v);
if (s.ok()) {
s = CommitTxn(txn);
}
}
} }
} }
@ -629,14 +619,15 @@ void StressTest::ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
} }
} }
Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) { Status StressTest::NewTxn(WriteOptions& write_opts,
std::unique_ptr<Transaction>* out_txn) {
if (!FLAGS_use_txn) { if (!FLAGS_use_txn) {
return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set"); return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
} }
write_opts.disableWAL = FLAGS_disable_wal; write_opts.disableWAL = FLAGS_disable_wal;
static std::atomic<uint64_t> txn_id = {0}; static std::atomic<uint64_t> txn_id = {0};
if (FLAGS_use_optimistic_txn) { if (FLAGS_use_optimistic_txn) {
*txn = optimistic_txn_db_->BeginTransaction(write_opts); out_txn->reset(optimistic_txn_db_->BeginTransaction(write_opts));
return Status::OK(); return Status::OK();
} else { } else {
TransactionOptions txn_options; TransactionOptions txn_options;
@ -644,30 +635,30 @@ Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) {
FLAGS_use_only_the_last_commit_time_batch_for_recovery; FLAGS_use_only_the_last_commit_time_batch_for_recovery;
txn_options.lock_timeout = 600000; // 10 min txn_options.lock_timeout = 600000; // 10 min
txn_options.deadlock_detect = true; txn_options.deadlock_detect = true;
*txn = txn_db_->BeginTransaction(write_opts, txn_options); out_txn->reset(txn_db_->BeginTransaction(write_opts, txn_options));
auto istr = std::to_string(txn_id.fetch_add(1)); auto istr = std::to_string(txn_id.fetch_add(1));
Status s = (*txn)->SetName("xid" + istr); Status s = (*out_txn)->SetName("xid" + istr);
return s; return s;
} }
} }
Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) { Status StressTest::CommitTxn(Transaction& txn, ThreadState* thread) {
if (!FLAGS_use_txn) { if (!FLAGS_use_txn) {
return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set"); return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
} }
Status s = Status::OK(); Status s = Status::OK();
if (FLAGS_use_optimistic_txn) { if (FLAGS_use_optimistic_txn) {
assert(optimistic_txn_db_); assert(optimistic_txn_db_);
s = txn->Commit(); s = txn.Commit();
} else { } else {
assert(txn_db_); assert(txn_db_);
s = txn->Prepare(); s = txn.Prepare();
std::shared_ptr<const Snapshot> timestamped_snapshot; std::shared_ptr<const Snapshot> timestamped_snapshot;
if (s.ok()) { if (s.ok()) {
if (thread && FLAGS_create_timestamped_snapshot_one_in && if (thread && FLAGS_create_timestamped_snapshot_one_in &&
thread->rand.OneIn(FLAGS_create_timestamped_snapshot_one_in)) { thread->rand.OneIn(FLAGS_create_timestamped_snapshot_one_in)) {
uint64_t ts = db_stress_env->NowNanos(); uint64_t ts = db_stress_env->NowNanos();
s = txn->CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, s = txn.CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts,
&timestamped_snapshot); &timestamped_snapshot);
std::pair<Status, std::shared_ptr<const Snapshot>> res; std::pair<Status, std::shared_ptr<const Snapshot>> res;
@ -686,7 +677,7 @@ Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) {
} }
} }
} else { } else {
s = txn->Commit(); s = txn.Commit();
} }
} }
if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 && if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 &&
@ -696,18 +687,37 @@ Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) {
txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff); txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff);
} }
} }
delete txn;
return s; return s;
} }
Status StressTest::RollbackTxn(Transaction* txn) { Status StressTest::ExecuteTransaction(
if (!FLAGS_use_txn) { WriteOptions& write_opts, ThreadState* thread,
return Status::InvalidArgument( std::function<Status(Transaction&)>&& ops) {
"RollbackTxn when FLAGS_use_txn is not" std::unique_ptr<Transaction> txn;
" set"); Status s = NewTxn(write_opts, &txn);
if (s.ok()) {
for (int tries = 1;; ++tries) {
s = ops(*txn);
if (s.ok()) {
s = CommitTxn(*txn, thread);
if (s.ok()) {
break;
}
}
// Optimistic txn might return TryAgain, in which case rollback
// and try again. But that shouldn't happen too many times in a row.
if (!s.IsTryAgain() || !FLAGS_use_optimistic_txn) {
break;
}
if (tries >= 5) {
break;
}
s = txn->Rollback();
if (!s.ok()) {
break;
}
}
} }
Status s = txn->Rollback();
delete txn;
return s; return s;
} }

@ -64,11 +64,14 @@ class StressTest {
virtual void ProcessRecoveredPreparedTxnsHelper(Transaction* txn, virtual void ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
SharedState* shared); SharedState* shared);
Status NewTxn(WriteOptions& write_opts, Transaction** txn); // ExecuteTransaction is recommended instead
Status NewTxn(WriteOptions& write_opts,
Status CommitTxn(Transaction* txn, ThreadState* thread = nullptr); std::unique_ptr<Transaction>* out_txn);
Status CommitTxn(Transaction& txn, ThreadState* thread = nullptr);
Status RollbackTxn(Transaction* txn);
// Creates a transaction, executes `ops`, and tries to commit
Status ExecuteTransaction(WriteOptions& write_opts, ThreadState* thread,
std::function<Status(Transaction&)>&& ops);
virtual void MaybeClearOneColumnFamily(ThreadState* /* thread */) {} virtual void MaybeClearOneColumnFamily(ThreadState* /* thread */) {}

@ -560,7 +560,7 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
uint32_t new_a) { uint32_t new_a) {
std::string old_pk = Record::EncodePrimaryKey(old_a); std::string old_pk = Record::EncodePrimaryKey(old_a);
std::string new_pk = Record::EncodePrimaryKey(new_a); std::string new_pk = Record::EncodePrimaryKey(new_a);
Transaction* txn = nullptr; std::unique_ptr<Transaction> txn;
WriteOptions wopts; WriteOptions wopts;
Status s = NewTxn(wopts, &txn); Status s = NewTxn(wopts, &txn);
if (!s.ok()) { if (!s.ok()) {
@ -572,7 +572,7 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
assert(txn); assert(txn);
txn->SetSnapshotOnNextOperation(/*notifier=*/nullptr); txn->SetSnapshotOnNextOperation(/*notifier=*/nullptr);
const Defer cleanup([new_a, &s, thread, txn, this]() { const Defer cleanup([new_a, &s, thread, this]() {
if (s.ok()) { if (s.ok()) {
// Two gets, one for existing pk, one for locking potential new pk. // Two gets, one for existing pk, one for locking potential new pk.
thread->stats.AddGets(/*ngets=*/2, /*nfounds=*/1); thread->stats.AddGets(/*ngets=*/2, /*nfounds=*/1);
@ -594,7 +594,6 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
} }
auto& key_gen = key_gen_for_a_[thread->tid]; auto& key_gen = key_gen_for_a_[thread->tid];
key_gen->UndoAllocation(new_a); key_gen->UndoAllocation(new_a);
RollbackTxn(txn).PermitUncheckedError();
}); });
ReadOptions ropts; ReadOptions ropts;
@ -671,7 +670,6 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
auto& key_gen = key_gen_for_a_.at(thread->tid); auto& key_gen = key_gen_for_a_.at(thread->tid);
if (s.ok()) { if (s.ok()) {
delete txn;
key_gen->Replace(old_a, old_a_pos, new_a); key_gen->Replace(old_a, old_a_pos, new_a);
} }
return s; return s;
@ -681,7 +679,7 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
uint32_t old_c, uint32_t old_c,
uint32_t old_c_pos, uint32_t old_c_pos,
uint32_t new_c) { uint32_t new_c) {
Transaction* txn = nullptr; std::unique_ptr<Transaction> txn;
WriteOptions wopts; WriteOptions wopts;
Status s = NewTxn(wopts, &txn); Status s = NewTxn(wopts, &txn);
if (!s.ok()) { if (!s.ok()) {
@ -694,7 +692,7 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
Iterator* it = nullptr; Iterator* it = nullptr;
long iterations = 0; long iterations = 0;
const Defer cleanup([new_c, &s, thread, &it, txn, this, &iterations]() { const Defer cleanup([new_c, &s, thread, &it, this, &iterations]() {
delete it; delete it;
if (s.ok()) { if (s.ok()) {
thread->stats.AddIterations(iterations); thread->stats.AddIterations(iterations);
@ -719,7 +717,6 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
} }
auto& key_gen = key_gen_for_c_[thread->tid]; auto& key_gen = key_gen_for_c_[thread->tid];
key_gen->UndoAllocation(new_c); key_gen->UndoAllocation(new_c);
RollbackTxn(txn).PermitUncheckedError();
}); });
// TODO (yanqin) try SetSnapshotOnNextOperation(). We currently need to take // TODO (yanqin) try SetSnapshotOnNextOperation(). We currently need to take
@ -868,7 +865,6 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn); s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn);
if (s.ok()) { if (s.ok()) {
delete txn;
auto& key_gen = key_gen_for_c_.at(thread->tid); auto& key_gen = key_gen_for_c_.at(thread->tid);
key_gen->Replace(old_c, old_c_pos, new_c); key_gen->Replace(old_c, old_c_pos, new_c);
} }
@ -880,7 +876,7 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
uint32_t a, uint32_t a,
uint32_t b_delta) { uint32_t b_delta) {
std::string pk_str = Record::EncodePrimaryKey(a); std::string pk_str = Record::EncodePrimaryKey(a);
Transaction* txn = nullptr; std::unique_ptr<Transaction> txn;
WriteOptions wopts; WriteOptions wopts;
Status s = NewTxn(wopts, &txn); Status s = NewTxn(wopts, &txn);
if (!s.ok()) { if (!s.ok()) {
@ -891,7 +887,7 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
assert(txn); assert(txn);
const Defer cleanup([&s, thread, txn, this]() { const Defer cleanup([&s, thread]() {
if (s.ok()) { if (s.ok()) {
thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1); thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1);
thread->stats.AddBytesForWrites( thread->stats.AddBytesForWrites(
@ -908,7 +904,6 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
} else { } else {
thread->stats.AddErrors(1); thread->stats.AddErrors(1);
} }
RollbackTxn(txn).PermitUncheckedError();
}); });
ReadOptions ropts; ReadOptions ropts;
ropts.rate_limiter_priority = ropts.rate_limiter_priority =
@ -952,9 +947,6 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn); s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn);
if (s.ok()) {
delete txn;
}
return s; return s;
} }
@ -964,7 +956,7 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread,
// pk may or may not exist // pk may or may not exist
PinnableSlice value; PinnableSlice value;
Transaction* txn = nullptr; std::unique_ptr<Transaction> txn;
WriteOptions wopts; WriteOptions wopts;
Status s = NewTxn(wopts, &txn); Status s = NewTxn(wopts, &txn);
if (!s.ok()) { if (!s.ok()) {
@ -975,7 +967,7 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread,
assert(txn); assert(txn);
const Defer cleanup([&s, thread, txn, this]() { const Defer cleanup([&s, thread]() {
if (s.ok()) { if (s.ok()) {
thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1); thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1);
return; return;
@ -984,7 +976,6 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread,
} else { } else {
thread->stats.AddErrors(1); thread->stats.AddErrors(1);
} }
RollbackTxn(txn).PermitUncheckedError();
}); });
std::shared_ptr<const Snapshot> snapshot; std::shared_ptr<const Snapshot> snapshot;
@ -1001,9 +992,6 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread,
if (s.ok()) { if (s.ok()) {
s = txn->Commit(); s = txn->Commit();
} }
if (s.ok()) {
delete txn;
}
return s; return s;
} }
@ -1011,7 +999,7 @@ Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread,
ReadOptions ropts, uint32_t c) { ReadOptions ropts, uint32_t c) {
std::string sk = Record::EncodeSecondaryKey(c); std::string sk = Record::EncodeSecondaryKey(c);
Transaction* txn = nullptr; std::unique_ptr<Transaction> txn;
WriteOptions wopts; WriteOptions wopts;
Status s = NewTxn(wopts, &txn); Status s = NewTxn(wopts, &txn);
if (!s.ok()) { if (!s.ok()) {
@ -1022,13 +1010,12 @@ Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread,
assert(txn); assert(txn);
const Defer cleanup([&s, thread, txn, this]() { const Defer cleanup([&s, thread]() {
if (s.ok()) { if (s.ok()) {
thread->stats.AddIterations(1); thread->stats.AddIterations(1);
return; return;
} }
thread->stats.AddErrors(1); thread->stats.AddErrors(1);
RollbackTxn(txn).PermitUncheckedError();
}); });
std::shared_ptr<const Snapshot> snapshot; std::shared_ptr<const Snapshot> snapshot;
@ -1056,10 +1043,6 @@ Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread,
s = iter->status(); s = iter->status();
} }
if (s.ok()) {
delete txn;
}
return s; return s;
} }

@ -442,7 +442,7 @@ class NonBatchedOpsStressTest : public StressTest {
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "dropping column family error: %s\n", fprintf(stderr, "dropping column family error: %s\n",
s.ToString().c_str()); s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name, s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
&column_families_[cf]); &column_families_[cf]);
@ -451,7 +451,7 @@ class NonBatchedOpsStressTest : public StressTest {
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "creating column family error: %s\n", fprintf(stderr, "creating column family error: %s\n",
s.ToString().c_str()); s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
thread->shared->UnlockColumnFamily(cf); thread->shared->UnlockColumnFamily(cf);
} }
@ -603,7 +603,7 @@ class NonBatchedOpsStressTest : public StressTest {
// Create a transaction in order to write some data. The purpose is to // Create a transaction in order to write some data. The purpose is to
// exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction // exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
// will be rolled back once MultiGet returns. // will be rolled back once MultiGet returns.
Transaction* txn = nullptr; std::unique_ptr<Transaction> txn;
if (use_txn) { if (use_txn) {
WriteOptions wo; WriteOptions wo;
if (FLAGS_rate_limit_auto_wal_flush) { if (FLAGS_rate_limit_auto_wal_flush) {
@ -612,7 +612,7 @@ class NonBatchedOpsStressTest : public StressTest {
Status s = NewTxn(wo, &txn); Status s = NewTxn(wo, &txn);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str()); fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
} }
for (size_t i = 0; i < num_keys; ++i) { for (size_t i = 0; i < num_keys; ++i) {
@ -662,7 +662,7 @@ class NonBatchedOpsStressTest : public StressTest {
} }
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str()); fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
} else { } else {
ryw_expected_values.push_back(std::nullopt); ryw_expected_values.push_back(std::nullopt);
@ -865,9 +865,6 @@ class NonBatchedOpsStressTest : public StressTest {
if (readoptionscopy.snapshot) { if (readoptionscopy.snapshot) {
db_->ReleaseSnapshot(readoptionscopy.snapshot); db_->ReleaseSnapshot(readoptionscopy.snapshot);
} }
if (use_txn) {
RollbackTxn(txn);
}
return statuses; return statuses;
} }
@ -1278,14 +1275,9 @@ class NonBatchedOpsStressTest : public StressTest {
s = db_->Merge(write_opts, cfh, k, write_ts, v); s = db_->Merge(write_opts, cfh, k, write_ts, v);
} }
} else { } else {
Transaction* txn; s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) {
s = NewTxn(write_opts, &txn); return txn.Merge(cfh, k, v);
if (s.ok()) { });
s = txn->Merge(cfh, k, v);
if (s.ok()) {
s = CommitTxn(txn, thread);
}
}
} }
} else if (FLAGS_use_put_entity_one_in > 0 && } else if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) { (value_base % FLAGS_use_put_entity_one_in) == 0) {
@ -1299,14 +1291,9 @@ class NonBatchedOpsStressTest : public StressTest {
s = db_->Put(write_opts, cfh, k, write_ts, v); s = db_->Put(write_opts, cfh, k, write_ts, v);
} }
} else { } else {
Transaction* txn; s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) {
s = NewTxn(write_opts, &txn); return txn.Put(cfh, k, v);
if (s.ok()) { });
s = txn->Put(cfh, k, v);
if (s.ok()) {
s = CommitTxn(txn, thread);
}
}
} }
} }
@ -1319,11 +1306,11 @@ class NonBatchedOpsStressTest : public StressTest {
} else if (!is_db_stopped_ || } else if (!is_db_stopped_ ||
s.severity() < Status::Severity::kFatalError) { s.severity() < Status::Severity::kFatalError) {
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
} else { } else {
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
} }
@ -1364,14 +1351,9 @@ class NonBatchedOpsStressTest : public StressTest {
s = db_->Delete(write_opts, cfh, key, write_ts); s = db_->Delete(write_opts, cfh, key, write_ts);
} }
} else { } else {
Transaction* txn; s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) {
s = NewTxn(write_opts, &txn); return txn.Delete(cfh, key);
if (s.ok()) { });
s = txn->Delete(cfh, key);
if (s.ok()) {
s = CommitTxn(txn, thread);
}
}
} }
pending_expected_value.Commit(); pending_expected_value.Commit();
@ -1384,11 +1366,11 @@ class NonBatchedOpsStressTest : public StressTest {
} else if (!is_db_stopped_ || } else if (!is_db_stopped_ ||
s.severity() < Status::Severity::kFatalError) { s.severity() < Status::Severity::kFatalError) {
fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
} else { } else {
fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
} }
} else { } else {
@ -1401,14 +1383,9 @@ class NonBatchedOpsStressTest : public StressTest {
s = db_->SingleDelete(write_opts, cfh, key, write_ts); s = db_->SingleDelete(write_opts, cfh, key, write_ts);
} }
} else { } else {
Transaction* txn; s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) {
s = NewTxn(write_opts, &txn); return txn.SingleDelete(cfh, key);
if (s.ok()) { });
s = txn->SingleDelete(cfh, key);
if (s.ok()) {
s = CommitTxn(txn, thread);
}
}
} }
pending_expected_value.Commit(); pending_expected_value.Commit();
thread->stats.AddSingleDeletes(1); thread->stats.AddSingleDeletes(1);
@ -1420,11 +1397,11 @@ class NonBatchedOpsStressTest : public StressTest {
} else if (!is_db_stopped_ || } else if (!is_db_stopped_ ||
s.severity() < Status::Severity::kFatalError) { s.severity() < Status::Severity::kFatalError) {
fprintf(stderr, "single delete error: %s\n", s.ToString().c_str()); fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
} else { } else {
fprintf(stderr, "single delete error: %s\n", s.ToString().c_str()); fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
} }
} }
@ -1481,11 +1458,11 @@ class NonBatchedOpsStressTest : public StressTest {
} else if (!is_db_stopped_ || } else if (!is_db_stopped_ ||
s.severity() < Status::Severity::kFatalError) { s.severity() < Status::Severity::kFatalError) {
fprintf(stderr, "delete range error: %s\n", s.ToString().c_str()); fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
} else { } else {
fprintf(stderr, "delete range error: %s\n", s.ToString().c_str()); fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
} }
for (PendingExpectedValue& pending_expected_value : for (PendingExpectedValue& pending_expected_value :
@ -1567,7 +1544,7 @@ class NonBatchedOpsStressTest : public StressTest {
} }
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str()); fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
std::terminate(); thread->shared->SafeTerminate();
} }
for (size_t i = 0; i < pending_expected_values.size(); ++i) { for (size_t i = 0; i < pending_expected_values.size(); ++i) {

@ -140,6 +140,9 @@ class Transaction {
Transaction(const Transaction&) = delete; Transaction(const Transaction&) = delete;
void operator=(const Transaction&) = delete; void operator=(const Transaction&) = delete;
// The transaction is safely discarded on destruction, though must be
// discarded before the DB is closed or destroyed. (Calling Rollback()
// is not necessary before destruction.)
virtual ~Transaction() {} virtual ~Transaction() {}
// If a transaction has a snapshot set, the transaction will ensure that // If a transaction has a snapshot set, the transaction will ensure that
@ -227,7 +230,8 @@ class Transaction {
// Status::Busy() may be returned if the transaction could not guarantee // Status::Busy() may be returned if the transaction could not guarantee
// that there are no write conflicts. Status::TryAgain() may be returned // that there are no write conflicts. Status::TryAgain() may be returned
// if the memtable history size is not large enough // if the memtable history size is not large enough
// (See max_write_buffer_size_to_maintain). // (see max_write_buffer_size_to_maintain). In either case, a Rollback()
// or new transaction is required to expect a different result.
// //
// If this transaction was created by a TransactionDB(), Status::Expired() // If this transaction was created by a TransactionDB(), Status::Expired()
// may be returned if this transaction has lived for longer than // may be returned if this transaction has lived for longer than

@ -322,17 +322,11 @@ TEST_P(OptimisticTransactionTest, FlushTest) {
delete txn; delete txn;
} }
TEST_P(OptimisticTransactionTest, FlushTest2) { namespace {
WriteOptions write_options; void FlushTest2PopulateTxn(Transaction* txn) {
ReadOptions read_options, snapshot_read_options; ReadOptions snapshot_read_options;
std::string value; std::string value;
ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar")));
ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar")));
Transaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_NE(txn, nullptr);
snapshot_read_options.snapshot = txn->GetSnapshot(); snapshot_read_options.snapshot = txn->GetSnapshot();
ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value)); ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
@ -342,6 +336,21 @@ TEST_P(OptimisticTransactionTest, FlushTest2) {
ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value)); ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
ASSERT_EQ(value, "bar2"); ASSERT_EQ(value, "bar2");
}
} // namespace
TEST_P(OptimisticTransactionTest, FlushTest2) {
WriteOptions write_options;
ReadOptions read_options;
std::string value;
ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar")));
ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar")));
Transaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_NE(txn, nullptr);
FlushTest2PopulateTxn(txn);
// Put a random key so we have a MemTable to flush // Put a random key so we have a MemTable to flush
ASSERT_OK(txn_db->Put(write_options, "dummy", "dummy")); ASSERT_OK(txn_db->Put(write_options, "dummy", "dummy"));
@ -367,9 +376,23 @@ TEST_P(OptimisticTransactionTest, FlushTest2) {
// txn should not commit since MemTableList History is not large enough // txn should not commit since MemTableList History is not large enough
ASSERT_TRUE(s.IsTryAgain()); ASSERT_TRUE(s.IsTryAgain());
// simply trying Commit again doesn't help
s = txn->Commit();
ASSERT_TRUE(s.IsTryAgain());
ASSERT_OK(txn_db->Get(read_options, "foo", &value)); ASSERT_OK(txn_db->Get(read_options, "foo", &value));
ASSERT_EQ(value, "bar"); ASSERT_EQ(value, "bar");
// But rolling back and redoing does
ASSERT_OK(txn->Rollback());
FlushTest2PopulateTxn(txn);
ASSERT_OK(txn->Commit());
ASSERT_OK(txn_db->Get(read_options, "foo", &value));
ASSERT_EQ(value, "bar2");
delete txn; delete txn;
} }

Loading…
Cancel
Save