diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index 0137f0b2e..604e8c631 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -342,6 +342,13 @@ class SharedState { uint64_t GetStartTimestamp() const { return start_timestamp_; } + void SafeTerminate() { + // Grab mutex so that we don't call terminate while another thread is + // attempting to print a stack trace due to the first one + MutexLock l(&mu_); + std::terminate(); + } + private: static void IgnoreReadErrorCallback(void*) { ignore_read_error = true; } diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 2c62049c3..ff004ae0f 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -504,14 +504,9 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, s = db_->Merge(write_opts, cfh, key, v); } } else { - Transaction* txn; - s = NewTxn(write_opts, &txn); - if (s.ok()) { - s = txn->Merge(cfh, key, v); - if (s.ok()) { - s = CommitTxn(txn); - } - } + s = ExecuteTransaction( + write_opts, /*thread=*/nullptr, + [&](Transaction& txn) { return txn.Merge(cfh, key, v); }); } } else if (FLAGS_use_put_entity_one_in > 0) { s = db_->PutEntity(write_opts, cfh, key, @@ -524,14 +519,9 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, s = db_->Put(write_opts, cfh, key, v); } } else { - Transaction* txn; - s = NewTxn(write_opts, &txn); - if (s.ok()) { - s = txn->Put(cfh, key, v); - if (s.ok()) { - s = CommitTxn(txn); - } - } + s = ExecuteTransaction( + write_opts, /*thread=*/nullptr, + [&](Transaction& txn) { return txn.Put(cfh, key, v); }); } } @@ -629,14 +619,15 @@ void StressTest::ProcessRecoveredPreparedTxnsHelper(Transaction* txn, } } -Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) { +Status StressTest::NewTxn(WriteOptions& write_opts, + std::unique_ptr* out_txn) { if (!FLAGS_use_txn) { return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set"); } write_opts.disableWAL = FLAGS_disable_wal; static std::atomic txn_id = {0}; if (FLAGS_use_optimistic_txn) { - *txn = optimistic_txn_db_->BeginTransaction(write_opts); + out_txn->reset(optimistic_txn_db_->BeginTransaction(write_opts)); return Status::OK(); } else { TransactionOptions txn_options; @@ -644,31 +635,31 @@ Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) { FLAGS_use_only_the_last_commit_time_batch_for_recovery; txn_options.lock_timeout = 600000; // 10 min txn_options.deadlock_detect = true; - *txn = txn_db_->BeginTransaction(write_opts, txn_options); + out_txn->reset(txn_db_->BeginTransaction(write_opts, txn_options)); auto istr = std::to_string(txn_id.fetch_add(1)); - Status s = (*txn)->SetName("xid" + istr); + Status s = (*out_txn)->SetName("xid" + istr); return s; } } -Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) { +Status StressTest::CommitTxn(Transaction& txn, ThreadState* thread) { if (!FLAGS_use_txn) { return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set"); } Status s = Status::OK(); if (FLAGS_use_optimistic_txn) { assert(optimistic_txn_db_); - s = txn->Commit(); + s = txn.Commit(); } else { assert(txn_db_); - s = txn->Prepare(); + s = txn.Prepare(); std::shared_ptr timestamped_snapshot; if (s.ok()) { if (thread && FLAGS_create_timestamped_snapshot_one_in && thread->rand.OneIn(FLAGS_create_timestamped_snapshot_one_in)) { uint64_t ts = db_stress_env->NowNanos(); - s = txn->CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, - ×tamped_snapshot); + s = txn.CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, + ×tamped_snapshot); std::pair> res; if (thread->tid == 0) { @@ -686,7 +677,7 @@ Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) { } } } else { - s = txn->Commit(); + s = txn.Commit(); } } if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 && @@ -696,18 +687,37 @@ Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) { txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff); } } - delete txn; return s; } -Status StressTest::RollbackTxn(Transaction* txn) { - if (!FLAGS_use_txn) { - return Status::InvalidArgument( - "RollbackTxn when FLAGS_use_txn is not" - " set"); +Status StressTest::ExecuteTransaction( + WriteOptions& write_opts, ThreadState* thread, + std::function&& ops) { + std::unique_ptr txn; + Status s = NewTxn(write_opts, &txn); + if (s.ok()) { + for (int tries = 1;; ++tries) { + s = ops(*txn); + if (s.ok()) { + s = CommitTxn(*txn, thread); + if (s.ok()) { + break; + } + } + // Optimistic txn might return TryAgain, in which case rollback + // and try again. But that shouldn't happen too many times in a row. + if (!s.IsTryAgain() || !FLAGS_use_optimistic_txn) { + break; + } + if (tries >= 5) { + break; + } + s = txn->Rollback(); + if (!s.ok()) { + break; + } + } } - Status s = txn->Rollback(); - delete txn; return s; } diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index 29159a494..dc235fcdf 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -64,11 +64,14 @@ class StressTest { virtual void ProcessRecoveredPreparedTxnsHelper(Transaction* txn, SharedState* shared); - Status NewTxn(WriteOptions& write_opts, Transaction** txn); - - Status CommitTxn(Transaction* txn, ThreadState* thread = nullptr); - - Status RollbackTxn(Transaction* txn); + // ExecuteTransaction is recommended instead + Status NewTxn(WriteOptions& write_opts, + std::unique_ptr* out_txn); + Status CommitTxn(Transaction& txn, ThreadState* thread = nullptr); + + // Creates a transaction, executes `ops`, and tries to commit + Status ExecuteTransaction(WriteOptions& write_opts, ThreadState* thread, + std::function&& ops); virtual void MaybeClearOneColumnFamily(ThreadState* /* thread */) {} diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc index 89b061004..23850da5c 100644 --- a/db_stress_tool/multi_ops_txns_stress.cc +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -560,7 +560,7 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t new_a) { std::string old_pk = Record::EncodePrimaryKey(old_a); std::string new_pk = Record::EncodePrimaryKey(new_a); - Transaction* txn = nullptr; + std::unique_ptr txn; WriteOptions wopts; Status s = NewTxn(wopts, &txn); if (!s.ok()) { @@ -572,7 +572,7 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, assert(txn); txn->SetSnapshotOnNextOperation(/*notifier=*/nullptr); - const Defer cleanup([new_a, &s, thread, txn, this]() { + const Defer cleanup([new_a, &s, thread, this]() { if (s.ok()) { // Two gets, one for existing pk, one for locking potential new pk. thread->stats.AddGets(/*ngets=*/2, /*nfounds=*/1); @@ -594,7 +594,6 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, } auto& key_gen = key_gen_for_a_[thread->tid]; key_gen->UndoAllocation(new_a); - RollbackTxn(txn).PermitUncheckedError(); }); ReadOptions ropts; @@ -671,7 +670,6 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, auto& key_gen = key_gen_for_a_.at(thread->tid); if (s.ok()) { - delete txn; key_gen->Replace(old_a, old_a_pos, new_a); } return s; @@ -681,7 +679,7 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, uint32_t old_c, uint32_t old_c_pos, uint32_t new_c) { - Transaction* txn = nullptr; + std::unique_ptr txn; WriteOptions wopts; Status s = NewTxn(wopts, &txn); if (!s.ok()) { @@ -694,7 +692,7 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, Iterator* it = nullptr; long iterations = 0; - const Defer cleanup([new_c, &s, thread, &it, txn, this, &iterations]() { + const Defer cleanup([new_c, &s, thread, &it, this, &iterations]() { delete it; if (s.ok()) { thread->stats.AddIterations(iterations); @@ -719,7 +717,6 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, } auto& key_gen = key_gen_for_c_[thread->tid]; key_gen->UndoAllocation(new_c); - RollbackTxn(txn).PermitUncheckedError(); }); // TODO (yanqin) try SetSnapshotOnNextOperation(). We currently need to take @@ -868,7 +865,6 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn); if (s.ok()) { - delete txn; auto& key_gen = key_gen_for_c_.at(thread->tid); key_gen->Replace(old_c, old_c_pos, new_c); } @@ -880,7 +876,7 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, uint32_t a, uint32_t b_delta) { std::string pk_str = Record::EncodePrimaryKey(a); - Transaction* txn = nullptr; + std::unique_ptr txn; WriteOptions wopts; Status s = NewTxn(wopts, &txn); if (!s.ok()) { @@ -891,7 +887,7 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, assert(txn); - const Defer cleanup([&s, thread, txn, this]() { + const Defer cleanup([&s, thread]() { if (s.ok()) { thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1); thread->stats.AddBytesForWrites( @@ -908,7 +904,6 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, } else { thread->stats.AddErrors(1); } - RollbackTxn(txn).PermitUncheckedError(); }); ReadOptions ropts; ropts.rate_limiter_priority = @@ -952,9 +947,6 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn); - if (s.ok()) { - delete txn; - } return s; } @@ -964,7 +956,7 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread, // pk may or may not exist PinnableSlice value; - Transaction* txn = nullptr; + std::unique_ptr txn; WriteOptions wopts; Status s = NewTxn(wopts, &txn); if (!s.ok()) { @@ -975,7 +967,7 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread, assert(txn); - const Defer cleanup([&s, thread, txn, this]() { + const Defer cleanup([&s, thread]() { if (s.ok()) { thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1); return; @@ -984,7 +976,6 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread, } else { thread->stats.AddErrors(1); } - RollbackTxn(txn).PermitUncheckedError(); }); std::shared_ptr snapshot; @@ -1001,9 +992,6 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread, if (s.ok()) { s = txn->Commit(); } - if (s.ok()) { - delete txn; - } return s; } @@ -1011,7 +999,7 @@ Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread, ReadOptions ropts, uint32_t c) { std::string sk = Record::EncodeSecondaryKey(c); - Transaction* txn = nullptr; + std::unique_ptr txn; WriteOptions wopts; Status s = NewTxn(wopts, &txn); if (!s.ok()) { @@ -1022,13 +1010,12 @@ Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread, assert(txn); - const Defer cleanup([&s, thread, txn, this]() { + const Defer cleanup([&s, thread]() { if (s.ok()) { thread->stats.AddIterations(1); return; } thread->stats.AddErrors(1); - RollbackTxn(txn).PermitUncheckedError(); }); std::shared_ptr snapshot; @@ -1056,10 +1043,6 @@ Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread, s = iter->status(); } - if (s.ok()) { - delete txn; - } - return s; } diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index de03e9795..31aac13ee 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -442,7 +442,7 @@ class NonBatchedOpsStressTest : public StressTest { if (!s.ok()) { fprintf(stderr, "dropping column family error: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name, &column_families_[cf]); @@ -451,7 +451,7 @@ class NonBatchedOpsStressTest : public StressTest { if (!s.ok()) { fprintf(stderr, "creating column family error: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } thread->shared->UnlockColumnFamily(cf); } @@ -603,7 +603,7 @@ class NonBatchedOpsStressTest : public StressTest { // Create a transaction in order to write some data. The purpose is to // exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction // will be rolled back once MultiGet returns. - Transaction* txn = nullptr; + std::unique_ptr txn; if (use_txn) { WriteOptions wo; if (FLAGS_rate_limit_auto_wal_flush) { @@ -612,7 +612,7 @@ class NonBatchedOpsStressTest : public StressTest { Status s = NewTxn(wo, &txn); if (!s.ok()) { fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } } for (size_t i = 0; i < num_keys; ++i) { @@ -662,7 +662,7 @@ class NonBatchedOpsStressTest : public StressTest { } if (!s.ok()) { fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } } else { ryw_expected_values.push_back(std::nullopt); @@ -865,9 +865,6 @@ class NonBatchedOpsStressTest : public StressTest { if (readoptionscopy.snapshot) { db_->ReleaseSnapshot(readoptionscopy.snapshot); } - if (use_txn) { - RollbackTxn(txn); - } return statuses; } @@ -1278,14 +1275,9 @@ class NonBatchedOpsStressTest : public StressTest { s = db_->Merge(write_opts, cfh, k, write_ts, v); } } else { - Transaction* txn; - s = NewTxn(write_opts, &txn); - if (s.ok()) { - s = txn->Merge(cfh, k, v); - if (s.ok()) { - s = CommitTxn(txn, thread); - } - } + s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) { + return txn.Merge(cfh, k, v); + }); } } else if (FLAGS_use_put_entity_one_in > 0 && (value_base % FLAGS_use_put_entity_one_in) == 0) { @@ -1299,14 +1291,9 @@ class NonBatchedOpsStressTest : public StressTest { s = db_->Put(write_opts, cfh, k, write_ts, v); } } else { - Transaction* txn; - s = NewTxn(write_opts, &txn); - if (s.ok()) { - s = txn->Put(cfh, k, v); - if (s.ok()) { - s = CommitTxn(txn, thread); - } - } + s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) { + return txn.Put(cfh, k, v); + }); } } @@ -1319,11 +1306,11 @@ class NonBatchedOpsStressTest : public StressTest { } else if (!is_db_stopped_ || s.severity() < Status::Severity::kFatalError) { fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } } else { fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } } @@ -1364,14 +1351,9 @@ class NonBatchedOpsStressTest : public StressTest { s = db_->Delete(write_opts, cfh, key, write_ts); } } else { - Transaction* txn; - s = NewTxn(write_opts, &txn); - if (s.ok()) { - s = txn->Delete(cfh, key); - if (s.ok()) { - s = CommitTxn(txn, thread); - } - } + s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) { + return txn.Delete(cfh, key); + }); } pending_expected_value.Commit(); @@ -1384,11 +1366,11 @@ class NonBatchedOpsStressTest : public StressTest { } else if (!is_db_stopped_ || s.severity() < Status::Severity::kFatalError) { fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } } else { fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } } } else { @@ -1401,14 +1383,9 @@ class NonBatchedOpsStressTest : public StressTest { s = db_->SingleDelete(write_opts, cfh, key, write_ts); } } else { - Transaction* txn; - s = NewTxn(write_opts, &txn); - if (s.ok()) { - s = txn->SingleDelete(cfh, key); - if (s.ok()) { - s = CommitTxn(txn, thread); - } - } + s = ExecuteTransaction(write_opts, thread, [&](Transaction& txn) { + return txn.SingleDelete(cfh, key); + }); } pending_expected_value.Commit(); thread->stats.AddSingleDeletes(1); @@ -1420,11 +1397,11 @@ class NonBatchedOpsStressTest : public StressTest { } else if (!is_db_stopped_ || s.severity() < Status::Severity::kFatalError) { fprintf(stderr, "single delete error: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } } else { fprintf(stderr, "single delete error: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } } } @@ -1481,11 +1458,11 @@ class NonBatchedOpsStressTest : public StressTest { } else if (!is_db_stopped_ || s.severity() < Status::Severity::kFatalError) { fprintf(stderr, "delete range error: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } } else { fprintf(stderr, "delete range error: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } } for (PendingExpectedValue& pending_expected_value : @@ -1567,7 +1544,7 @@ class NonBatchedOpsStressTest : public StressTest { } if (!s.ok()) { fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str()); - std::terminate(); + thread->shared->SafeTerminate(); } for (size_t i = 0; i < pending_expected_values.size(); ++i) { diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 947fcec55..4ac47ec04 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -140,6 +140,9 @@ class Transaction { Transaction(const Transaction&) = delete; void operator=(const Transaction&) = delete; + // The transaction is safely discarded on destruction, though must be + // discarded before the DB is closed or destroyed. (Calling Rollback() + // is not necessary before destruction.) virtual ~Transaction() {} // If a transaction has a snapshot set, the transaction will ensure that @@ -227,7 +230,8 @@ class Transaction { // Status::Busy() may be returned if the transaction could not guarantee // that there are no write conflicts. Status::TryAgain() may be returned // if the memtable history size is not large enough - // (See max_write_buffer_size_to_maintain). + // (see max_write_buffer_size_to_maintain). In either case, a Rollback() + // or new transaction is required to expect a different result. // // If this transaction was created by a TransactionDB(), Status::Expired() // may be returned if this transaction has lived for longer than diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index 46d51956f..04e443a74 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -322,17 +322,11 @@ TEST_P(OptimisticTransactionTest, FlushTest) { delete txn; } -TEST_P(OptimisticTransactionTest, FlushTest2) { - WriteOptions write_options; - ReadOptions read_options, snapshot_read_options; +namespace { +void FlushTest2PopulateTxn(Transaction* txn) { + ReadOptions snapshot_read_options; std::string value; - ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar"))); - ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar"))); - - Transaction* txn = txn_db->BeginTransaction(write_options); - ASSERT_NE(txn, nullptr); - snapshot_read_options.snapshot = txn->GetSnapshot(); ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value)); @@ -342,6 +336,21 @@ TEST_P(OptimisticTransactionTest, FlushTest2) { ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value)); ASSERT_EQ(value, "bar2"); +} +} // namespace + +TEST_P(OptimisticTransactionTest, FlushTest2) { + WriteOptions write_options; + ReadOptions read_options; + std::string value; + + ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar"))); + ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar"))); + + Transaction* txn = txn_db->BeginTransaction(write_options); + ASSERT_NE(txn, nullptr); + + FlushTest2PopulateTxn(txn); // Put a random key so we have a MemTable to flush ASSERT_OK(txn_db->Put(write_options, "dummy", "dummy")); @@ -367,9 +376,23 @@ TEST_P(OptimisticTransactionTest, FlushTest2) { // txn should not commit since MemTableList History is not large enough ASSERT_TRUE(s.IsTryAgain()); + // simply trying Commit again doesn't help + s = txn->Commit(); + ASSERT_TRUE(s.IsTryAgain()); + ASSERT_OK(txn_db->Get(read_options, "foo", &value)); ASSERT_EQ(value, "bar"); + // But rolling back and redoing does + ASSERT_OK(txn->Rollback()); + + FlushTest2PopulateTxn(txn); + + ASSERT_OK(txn->Commit()); + + ASSERT_OK(txn_db->Get(read_options, "foo", &value)); + ASSERT_EQ(value, "bar2"); + delete txn; }