From aed30ddf21d5b3425028b95fda721b0988cf10b9 Mon Sep 17 00:00:00 2001 From: Hui Xiao Date: Mon, 26 Sep 2022 18:01:59 -0700 Subject: [PATCH] Support WriteCommit policy with sync_fault_injection=1 (#10624) Summary: **Context:** Prior to this PR, correctness testing with un-sync data loss [disabled](https://github.com/facebook/rocksdb/pull/10605) transaction (`use_txn=1`) thus all of the `txn_write_policy` . This PR improved that by adding support for one policy - WriteCommit (`txn_write_policy=0`). **Summary:** They key to this support is (a) handle Mark{Begin, End}Prepare/MarkCommit/MarkRollback in constructing ExpectedState under WriteCommit policy correctly and (b) monitor CI jobs and solve any test incompatibility issue till jobs are stable. (b) will be part of the test plan. For (a) - During prepare (i.e, between `MarkBeginPrepare()` and `MarkEndPrepare(xid)`), `ExpectedStateTraceRecordHandler` will buffer all writes by adding all writes to an internal `WriteBatch`. - On `MarkEndPrepare()`, that `WriteBatch` will be associated with the transaction's `xid`. - During the commit (i.e, on `MarkCommit(xid)`), `ExpectedStateTraceRecordHandler` will retrieve and iterate the internal `WriteBatch` and finally apply those writes to `ExpectedState` - During the rollback (i.e, on `MarkRollback(xid)`), `ExpectedStateTraceRecordHandler` will erase the internal `WriteBatch` from the map. For (b) - one major issue described below: - TransactionsDB in db stress recovers prepared-but-not-committed txns from the previous crashed run by randomly committing or rolling back it at the start of the current run, see a historical [PR](https://github.com/facebook/rocksdb/commit/6d06be22c083ccf185fd38dba49fde73b644b4c1) predated correctness testing. - And we will verify those processed keys in a recovered db against their expected state. - However since now we turn on `sync_fault_injection=1` where the expected state is constructed from the trace instead of using the LATEST.state from previous run. The expected state now used to verify those processed keys won't contain UNKNOWN_SENTINEL as they should - see test 1 for a failed case. - Therefore, we decided to manually update its expected state to be UNKNOWN_SENTINEL as part of the processing. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10624 Test Plan: 1. Test exposed the major issue described above. This test will fail without setting UNKNOWN_SENTINEL in expected state during the processing and pass after ``` db=/dev/shm/rocksdb_crashtest_blackbox exp=/dev/shm/rocksdb_crashtest_expected dbt=$db.tmp expt=$exp.tmp rm -rf $db $exp mkdir -p $exp echo "RUN 1" ./db_stress \ --clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \ --use_txn=1 --txn_write_policy=0 --sync_fault_injection=1 & pid=$! sleep 0.2 sleep 20 kill $pid sleep 0.2 echo "RUN 2" ./db_stress \ --clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \ --use_txn=1 --txn_write_policy=0 --sync_fault_injection=1 & pid=$! sleep 0.2 sleep 20 kill $pid sleep 0.2 echo "RUN 3" ./db_stress \ --clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \ --use_txn=1 --txn_write_policy=0 --sync_fault_injection=1 ``` 2. Manual testing to ensure ExpectedState is constructed correctly during recovery by verifying it against previously crashed TransactionDB's WAL. - Run the following command to crash a TransactionDB with WriteCommit policy. Then `./ldb dump_wal` on its WAL file ``` db=/dev/shm/rocksdb_crashtest_blackbox exp=/dev/shm/rocksdb_crashtest_expected rm -rf $db $exp mkdir -p $exp ./db_stress \ --clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \ --use_txn=1 --txn_write_policy=0 --sync_fault_injection=1 & pid=$! sleep 30 kill $pid sleep 1 ``` - Run the following command to verify recovery of the crashed db under debugger. Compare the step-wise result with WAL records (e.g, WriteBatch content, xid, prepare/commit/rollback marker) ``` ./db_stress \ --clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \ --use_txn=1 --txn_write_policy=0 --sync_fault_injection=1 ``` 3. Automatic testing by triggering all RocksDB stress/crash test jobs for 3 rounds with no failure. Reviewed By: ajkr, riversand963 Differential Revision: D39199373 Pulled By: hx235 fbshipit-source-id: 7a1dec0e3e2ee6ea86ddf5dd19ceb5543a3d6f0c --- db_stress_tool/db_stress_test_base.cc | 65 +++++++++++++------ db_stress_tool/db_stress_test_base.h | 15 ++++- db_stress_tool/db_stress_tool.cc | 8 +++ db_stress_tool/expected_state.cc | 85 ++++++++++++++++++++++++- db_stress_tool/multi_ops_txns_stress.cc | 22 +++++++ db_stress_tool/multi_ops_txns_stress.h | 4 ++ tools/db_crashtest.py | 10 +-- 7 files changed, 181 insertions(+), 28 deletions(-) diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 1b850d425..f8fd1e5dc 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -21,6 +21,7 @@ #include "rocksdb/sst_file_manager.h" #include "rocksdb/types.h" #include "rocksdb/utilities/object_registry.h" +#include "rocksdb/utilities/write_batch_with_index.h" #include "test_util/testutil.h" #include "util/cast_util.h" #include "utilities/backup/backup_engine_impl.h" @@ -309,7 +310,15 @@ void StressTest::FinishInitDb(SharedState* shared) { exit(1); } } - +#ifndef ROCKSDB_LITE + if (FLAGS_use_txn) { + // It's OK here without sync because unsynced data cannot be lost at this + // point + // - even with sync_fault_injection=1 as the + // file is still directly writable until after FinishInitDb() + ProcessRecoveredPreparedTxns(shared); + } +#endif if (FLAGS_enable_compaction_filter) { auto* compaction_filter_factory = reinterpret_cast( @@ -555,6 +564,42 @@ Status StressTest::SetOptions(ThreadState* thread) { } #ifndef ROCKSDB_LITE +void StressTest::ProcessRecoveredPreparedTxns(SharedState* shared) { + assert(txn_db_); + std::vector recovered_prepared_trans; + txn_db_->GetAllPreparedTransactions(&recovered_prepared_trans); + for (Transaction* txn : recovered_prepared_trans) { + ProcessRecoveredPreparedTxnsHelper(txn, shared); + delete txn; + } + recovered_prepared_trans.clear(); + txn_db_->GetAllPreparedTransactions(&recovered_prepared_trans); + assert(recovered_prepared_trans.size() == 0); +} + +void StressTest::ProcessRecoveredPreparedTxnsHelper(Transaction* txn, + SharedState* shared) { + thread_local Random rand(static_cast(FLAGS_seed)); + for (size_t i = 0; i < column_families_.size(); ++i) { + std::unique_ptr wbwi_iter( + txn->GetWriteBatch()->NewIterator(column_families_[i])); + for (wbwi_iter->SeekToFirst(); wbwi_iter->Valid(); wbwi_iter->Next()) { + uint64_t key_val; + if (GetIntVal(wbwi_iter->Entry().key.ToString(), &key_val)) { + shared->Put(static_cast(i) /* cf_idx */, key_val, + 0 /* value_base */, true /* pending */); + } + } + } + if (rand.OneIn(2)) { + Status s = txn->Commit(); + assert(s.ok()); + } else { + Status s = txn->Rollback(); + assert(s.ok()); + } +} + Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) { if (!FLAGS_use_txn) { return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set"); @@ -2648,24 +2693,6 @@ void StressTest::Open(SharedState* shared) { db_ = txn_db_; db_aptr_.store(txn_db_, std::memory_order_release); } - - // after a crash, rollback to commit recovered transactions - std::vector trans; - txn_db_->GetAllPreparedTransactions(&trans); - Random rand(static_cast(FLAGS_seed)); - for (auto txn : trans) { - if (rand.OneIn(2)) { - s = txn->Commit(); - assert(s.ok()); - } else { - s = txn->Rollback(); - assert(s.ok()); - } - delete txn; - } - trans.clear(); - txn_db_->GetAllPreparedTransactions(&trans); - assert(trans.size() == 0); #endif } if (!s.ok()) { diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index b963642a6..374cb9349 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -34,13 +34,10 @@ class StressTest { // The initialization work is split into two parts to avoid a circular // dependency with `SharedState`. virtual void FinishInitDb(SharedState*); - void TrackExpectedState(SharedState* shared); - void OperateDb(ThreadState* thread); virtual void VerifyDb(ThreadState* thread) const = 0; virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const = 0; - void PrintStatistics(); protected: @@ -54,6 +51,18 @@ class StressTest { Status SetOptions(ThreadState* thread); #ifndef ROCKSDB_LITE + // For transactionsDB, there can be txns prepared but not yet committeed + // right before previous stress run crash. + // They will be recovered and processed through + // ProcessRecoveredPreparedTxnsHelper on the start of current stress run. + void ProcessRecoveredPreparedTxns(SharedState* shared); + + // Default implementation will first update ExpectedState to be + // `SharedState::UNKNOWN` for each keys in `txn` and then randomly + // commit or rollback `txn`. + virtual void ProcessRecoveredPreparedTxnsHelper(Transaction* txn, + SharedState* shared); + Status NewTxn(WriteOptions& write_opts, Transaction** txn); Status CommitTxn(Transaction* txn, ThreadState* thread = nullptr); diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 8aaab31c0..6a85b7f30 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -286,6 +286,14 @@ int db_stress_tool(int argc, char** argv) { exit(1); } + if (FLAGS_use_txn && FLAGS_sync_fault_injection && + FLAGS_txn_write_policy != 0) { + fprintf(stderr, + "For TransactionDB, correctness testing with unsync data loss is " + "currently compatible with only write committed policy\n"); + exit(1); + } + #ifndef NDEBUG KillPoint* kp = KillPoint::GetInstance(); kp->rocksdb_kill_odds = FLAGS_kill_random_test; diff --git a/db_stress_tool/expected_state.cc b/db_stress_tool/expected_state.cc index f014b2e73..c9720e59d 100644 --- a/db_stress_tool/expected_state.cc +++ b/db_stress_tool/expected_state.cc @@ -343,7 +343,9 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, public WriteBatch::Handler { public: ExpectedStateTraceRecordHandler(uint64_t max_write_ops, ExpectedState* state) - : max_write_ops_(max_write_ops), state_(state) {} + : max_write_ops_(max_write_ops), + state_(state), + buffered_writes_(nullptr) {} ~ExpectedStateTraceRecordHandler() { assert(IsDone()); } @@ -391,6 +393,12 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, } uint32_t value_id = GetValueBase(value); + bool should_buffer_write = !(buffered_writes_ == nullptr); + if (should_buffer_write) { + return WriteBatchInternal::Put(buffered_writes_.get(), column_family_id, + key, value); + } + state_->Put(column_family_id, static_cast(key_id), value_id, false /* pending */); ++num_write_ops_; @@ -406,6 +414,12 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, return Status::Corruption("unable to parse key", key.ToString()); } + bool should_buffer_write = !(buffered_writes_ == nullptr); + if (should_buffer_write) { + return WriteBatchInternal::Delete(buffered_writes_.get(), + column_family_id, key); + } + state_->Delete(column_family_id, static_cast(key_id), false /* pending */); ++num_write_ops_; @@ -414,6 +428,18 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, Status SingleDeleteCF(uint32_t column_family_id, const Slice& key_with_ts) override { + bool should_buffer_write = !(buffered_writes_ == nullptr); + if (should_buffer_write) { + Slice key = + StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size); + Slice ts = + ExtractTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size); + std::array key_with_ts_arr{{key, ts}}; + return WriteBatchInternal::SingleDelete( + buffered_writes_.get(), column_family_id, + SliceParts(key_with_ts_arr.data(), 2)); + } + return DeleteCF(column_family_id, key_with_ts); } @@ -433,6 +459,12 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, return Status::Corruption("unable to parse end key", end_key.ToString()); } + bool should_buffer_write = !(buffered_writes_ == nullptr); + if (should_buffer_write) { + return WriteBatchInternal::DeleteRange( + buffered_writes_.get(), column_family_id, begin_key, end_key); + } + state_->DeleteRange(column_family_id, static_cast(begin_key_id), static_cast(end_key_id), false /* pending */); ++num_write_ops_; @@ -443,13 +475,64 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, const Slice& value) override { Slice key = StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size); + + bool should_buffer_write = !(buffered_writes_ == nullptr); + if (should_buffer_write) { + return WriteBatchInternal::Merge(buffered_writes_.get(), column_family_id, + key, value); + } + return PutCF(column_family_id, key, value); } + Status MarkBeginPrepare(bool = false) override { + assert(!buffered_writes_); + buffered_writes_.reset(new WriteBatch()); + return Status::OK(); + } + + Status MarkEndPrepare(const Slice& xid) override { + assert(buffered_writes_); + std::string xid_str = xid.ToString(); + assert(xid_to_buffered_writes_.find(xid_str) == + xid_to_buffered_writes_.end()); + + xid_to_buffered_writes_[xid_str].swap(buffered_writes_); + + buffered_writes_.reset(); + + return Status::OK(); + } + + Status MarkCommit(const Slice& xid) override { + std::string xid_str = xid.ToString(); + assert(xid_to_buffered_writes_.find(xid_str) != + xid_to_buffered_writes_.end()); + assert(xid_to_buffered_writes_.at(xid_str)); + + Status s = xid_to_buffered_writes_.at(xid_str)->Iterate(this); + xid_to_buffered_writes_.erase(xid_str); + + return s; + } + + Status MarkRollback(const Slice& xid) override { + std::string xid_str = xid.ToString(); + assert(xid_to_buffered_writes_.find(xid_str) != + xid_to_buffered_writes_.end()); + assert(xid_to_buffered_writes_.at(xid_str)); + xid_to_buffered_writes_.erase(xid_str); + + return Status::OK(); + } + private: uint64_t num_write_ops_ = 0; uint64_t max_write_ops_; ExpectedState* state_; + std::unordered_map> + xid_to_buffered_writes_; + std::unique_ptr buffered_writes_; }; } // anonymous namespace diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc index 7c6cf8d1a..7db5e8942 100644 --- a/db_stress_tool/multi_ops_txns_stress.cc +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -329,6 +329,10 @@ void MultiOpsTxnsStressTest::FinishInitDb(SharedState* shared) { if (FLAGS_enable_compaction_filter) { // TODO (yanqin) enable compaction filter } +#ifndef ROCKSDB_LITE + ProcessRecoveredPreparedTxns(shared); +#endif + ReopenAndPreloadDbIfNeeded(shared); // TODO (yanqin) parallelize if key space is large for (auto& key_gen : key_gen_for_a_) { @@ -1353,6 +1357,18 @@ uint32_t MultiOpsTxnsStressTest::GenerateNextC(ThreadState* thread) { } #ifndef ROCKSDB_LITE +void MultiOpsTxnsStressTest::ProcessRecoveredPreparedTxnsHelper( + Transaction* txn, SharedState*) { + thread_local Random rand(static_cast(FLAGS_seed)); + if (rand.OneIn(2)) { + Status s = txn->Commit(); + assert(s.ok()); + } else { + Status s = txn->Rollback(); + assert(s.ok()); + } +} + Status MultiOpsTxnsStressTest::WriteToCommitTimeWriteBatch(Transaction& txn) { WriteBatch* ctwb = txn.GetCommitTimeWriteBatch(); assert(ctwb); @@ -1776,6 +1792,12 @@ void CheckAndSetOptionsForMultiOpsTxnStressTest() { exit(1); } } + if (FLAGS_sync_fault_injection == 1) { + fprintf(stderr, + "Sync fault injection is currently not supported in " + "-test_multi_ops_txns\n"); + exit(1); + } #else fprintf(stderr, "-test_multi_ops_txns not supported in ROCKSDB_LITE mode\n"); exit(1); diff --git a/db_stress_tool/multi_ops_txns_stress.h b/db_stress_tool/multi_ops_txns_stress.h index ddb3ebfaa..7463d05d7 100644 --- a/db_stress_tool/multi_ops_txns_stress.h +++ b/db_stress_tool/multi_ops_txns_stress.h @@ -345,6 +345,10 @@ class MultiOpsTxnsStressTest : public StressTest { uint32_t GenerateNextC(ThreadState* thread); #ifndef ROCKSDB_LITE + // Randomly commit or rollback `txn` + void ProcessRecoveredPreparedTxnsHelper(Transaction* txn, + SharedState*) override; + // Some applications, e.g. MyRocks writes a KV pair to the database via // commit-time-write-batch (ctwb) in additional to the transaction's regular // write batch. The key is usually constant representing some system diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 5f2e3e2e8..048fffc81 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -445,6 +445,7 @@ multiops_txn_default_params = { # Re-enable once we have a compaction for MultiOpsTxnStressTest "enable_compaction_filter": 0, "create_timestamped_snapshot_one_in": 50, + "sync_fault_injection": 0, } multiops_wc_txn_params = { @@ -513,10 +514,6 @@ def finalize_and_sanitize(src_params): dest_params["delpercent"] += dest_params["delrangepercent"] dest_params["delrangepercent"] = 0 dest_params["ingest_external_file_one_in"] = 0 - # Correctness testing with unsync data loss is not currently compatible - # with transactions - if dest_params.get("use_txn") == 1: - dest_params["sync_fault_injection"] = 0 if ( dest_params.get("disable_wal") == 1 or dest_params.get("sync_fault_injection") == 1 @@ -594,7 +591,10 @@ def finalize_and_sanitize(src_params): if dest_params.get("create_timestamped_snapshot_one_in", 0) > 0: dest_params["txn_write_policy"] = 0 dest_params["unordered_write"] = 0 - + # For TransactionDB, correctness testing with unsync data loss is currently + # compatible with only write committed policy + if (dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0): + dest_params["sync_fault_injection"] = 0 return dest_params