diff --git a/crash_test.mk b/crash_test.mk index 5e8b3573a..a71a55c15 100644 --- a/crash_test.mk +++ b/crash_test.mk @@ -21,6 +21,8 @@ CRASHTEST_PY=$(PYTHON) -u tools/db_crashtest.py --stress_cmd=$(DB_STRESS_CMD) -- blackbox_crash_test_with_multiops_wp_txn \ crash_test_with_tiered_storage blackbox_crash_test_with_tiered_storage \ whitebox_crash_test_with_tiered_storage \ + whitebox_crash_test_with_optimistic_txn \ + blackbox_crash_test_with_optimistic_txn \ crash_test: $(DB_STRESS_CMD) # Do not parallelize @@ -37,6 +39,11 @@ crash_test_with_txn: $(DB_STRESS_CMD) $(CRASHTEST_MAKE) whitebox_crash_test_with_txn $(CRASHTEST_MAKE) blackbox_crash_test_with_txn +crash_test_with_optimistic_txn: $(DB_STRESS_CMD) +# Do not parallelize + $(CRASHTEST_MAKE) whitebox_crash_test_with_optimistic_txn + $(CRASHTEST_MAKE) blackbox_crash_test_with_optimistic_txn + crash_test_with_best_efforts_recovery: blackbox_crash_test_with_best_efforts_recovery crash_test_with_ts: $(DB_STRESS_CMD) @@ -80,6 +87,9 @@ blackbox_crash_test_with_multiops_wp_txn: $(DB_STRESS_CMD) blackbox_crash_test_with_tiered_storage: $(DB_STRESS_CMD) $(CRASHTEST_PY) --test_tiered_storage blackbox $(CRASH_TEST_EXT_ARGS) +blackbox_crash_test_with_optimistic_txn: $(DB_STRESS_CMD) + $(CRASHTEST_PY) --optimistic_txn blackbox $(CRASH_TEST_EXT_ARGS) + ifeq ($(CRASH_TEST_KILL_ODD),) CRASH_TEST_KILL_ODD=888887 endif @@ -105,3 +115,7 @@ whitebox_crash_test_with_ts: $(DB_STRESS_CMD) whitebox_crash_test_with_tiered_storage: $(DB_STRESS_CMD) $(CRASHTEST_PY) --test_tiered_storage whitebox --random_kill_odd \ $(CRASH_TEST_KILL_ODD) $(CRASH_TEST_EXT_ARGS) + +whitebox_crash_test_with_optimistic_txn: $(DB_STRESS_CMD) + $(CRASHTEST_PY) --optimistic_txn whitebox --random_kill_odd \ + $(CRASH_TEST_KILL_ODD) $(CRASH_TEST_EXT_ARGS) diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 8756932a7..c0b1e6fd2 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -54,6 +54,7 @@ #include "rocksdb/utilities/checkpoint.h" #include "rocksdb/utilities/db_ttl.h" #include "rocksdb/utilities/debug.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/options_util.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" @@ -194,9 +195,6 @@ DECLARE_bool(rate_limit_user_ops); DECLARE_bool(rate_limit_auto_wal_flush); DECLARE_uint64(sst_file_manager_bytes_per_sec); DECLARE_uint64(sst_file_manager_bytes_per_truncate); -DECLARE_bool(use_txn); -DECLARE_uint64(txn_write_policy); -DECLARE_bool(unordered_write); DECLARE_int32(backup_one_in); DECLARE_uint64(backup_max_size); DECLARE_int32(checkpoint_one_in); @@ -255,6 +253,21 @@ DECLARE_int32(continuous_verification_interval); DECLARE_int32(get_property_one_in); DECLARE_string(file_checksum_impl); +// Options for transaction dbs. +// Use TransactionDB (a.k.a. Pessimistic Transaction DB) +// OR OptimisticTransactionDB +DECLARE_bool(use_txn); + +// Options for TransactionDB (a.k.a. Pessimistic Transaction DB) +DECLARE_uint64(txn_write_policy); +DECLARE_bool(unordered_write); + +// Options for OptimisticTransactionDB +DECLARE_bool(use_optimistic_txn); +DECLARE_uint64(occ_validation_policy); +DECLARE_bool(share_occ_lock_buckets); +DECLARE_uint32(occ_lock_bucket_count); + // Options for StackableDB-based BlobDB DECLARE_bool(use_blob_db); DECLARE_uint64(blob_db_min_blob_size); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 9ce10f06c..df4c3be0b 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -669,13 +669,24 @@ DEFINE_uint64(sst_file_manager_bytes_per_truncate, 0, "many bytes. By default whole files will be deleted."); DEFINE_bool(use_txn, false, - "Use TransactionDB. Currently the default write policy is " - "TxnDBWritePolicy::WRITE_PREPARED"); + "Use TransactionDB or OptimisticTransactionDB. When " + "use_optimistic_txn == false (by default), " + "it's (Pessimistic) TransactionDB"); DEFINE_uint64(txn_write_policy, 0, "The transaction write policy. Default is " "TxnDBWritePolicy::WRITE_COMMITTED. Note that this should not be " - "changed accross crashes."); + "changed across crashes."); + +DEFINE_bool(use_optimistic_txn, false, "Use OptimisticTransactionDB."); +DEFINE_uint64(occ_validation_policy, 1, + "Optimistic Concurrency Control Validation Policy for " + "OptimisticTransactionDB"); +DEFINE_bool(share_occ_lock_buckets, false, + "Share a pool of locks across DB instances for buckets"); +DEFINE_uint32( + occ_lock_bucket_count, 500, + "Bucket Count for shared Optimistic Concurrency Control (OCC) locks"); DEFINE_bool(unordered_write, false, "Turn on the unordered_write feature. This options is currently " diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 0b239b0f5..29addfdff 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -301,7 +301,7 @@ void StressTest::FinishInitDb(SharedState* shared) { exit(1); } } - if (FLAGS_use_txn) { + if (FLAGS_use_txn && !FLAGS_use_optimistic_txn) { // It's OK here without sync because unsynced data cannot be lost at this // point // - even with sync_fault_injection=1 as the @@ -556,6 +556,7 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, delete db_; db_ = nullptr; txn_db_ = nullptr; + optimistic_txn_db_ = nullptr; db_preload_finished_.store(true); auto now = clock_->NowMicros(); @@ -634,55 +635,66 @@ Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) { } write_opts.disableWAL = FLAGS_disable_wal; static std::atomic txn_id = {0}; - TransactionOptions txn_options; - txn_options.use_only_the_last_commit_time_batch_for_recovery = - FLAGS_use_only_the_last_commit_time_batch_for_recovery; - txn_options.lock_timeout = 600000; // 10 min - txn_options.deadlock_detect = true; - *txn = txn_db_->BeginTransaction(write_opts, txn_options); - auto istr = std::to_string(txn_id.fetch_add(1)); - Status s = (*txn)->SetName("xid" + istr); - return s; + if (FLAGS_use_optimistic_txn) { + *txn = optimistic_txn_db_->BeginTransaction(write_opts); + return Status::OK(); + } else { + TransactionOptions txn_options; + txn_options.use_only_the_last_commit_time_batch_for_recovery = + FLAGS_use_only_the_last_commit_time_batch_for_recovery; + txn_options.lock_timeout = 600000; // 10 min + txn_options.deadlock_detect = true; + *txn = txn_db_->BeginTransaction(write_opts, txn_options); + auto istr = std::to_string(txn_id.fetch_add(1)); + Status s = (*txn)->SetName("xid" + istr); + return s; + } } Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) { if (!FLAGS_use_txn) { return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set"); } - assert(txn_db_); - Status s = txn->Prepare(); - std::shared_ptr timestamped_snapshot; - if (s.ok()) { - if (thread && FLAGS_create_timestamped_snapshot_one_in && - thread->rand.OneIn(FLAGS_create_timestamped_snapshot_one_in)) { - uint64_t ts = db_stress_env->NowNanos(); - s = txn->CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, - ×tamped_snapshot); - - std::pair> res; - if (thread->tid == 0) { - uint64_t now = db_stress_env->NowNanos(); - res = txn_db_->CreateTimestampedSnapshot(now); - if (res.first.ok()) { - assert(res.second); - assert(res.second->GetTimestamp() == now); - if (timestamped_snapshot) { - assert(res.second->GetTimestamp() > - timestamped_snapshot->GetTimestamp()); + Status s = Status::OK(); + if (FLAGS_use_optimistic_txn) { + assert(optimistic_txn_db_); + s = txn->Commit(); + } else { + assert(txn_db_); + s = txn->Prepare(); + std::shared_ptr timestamped_snapshot; + if (s.ok()) { + if (thread && FLAGS_create_timestamped_snapshot_one_in && + thread->rand.OneIn(FLAGS_create_timestamped_snapshot_one_in)) { + uint64_t ts = db_stress_env->NowNanos(); + s = txn->CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, + ×tamped_snapshot); + + std::pair> res; + if (thread->tid == 0) { + uint64_t now = db_stress_env->NowNanos(); + res = txn_db_->CreateTimestampedSnapshot(now); + if (res.first.ok()) { + assert(res.second); + assert(res.second->GetTimestamp() == now); + if (timestamped_snapshot) { + assert(res.second->GetTimestamp() > + timestamped_snapshot->GetTimestamp()); + } + } else { + assert(!res.second); } - } else { - assert(!res.second); } + } else { + s = txn->Commit(); } - } else { - s = txn->Commit(); } - } - if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 && - thread->rand.OneInOpt(50000)) { - uint64_t now = db_stress_env->NowNanos(); - constexpr uint64_t time_diff = static_cast(1000) * 1000 * 1000; - txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff); + if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 && + thread->rand.OneInOpt(50000)) { + uint64_t now = db_stress_env->NowNanos(); + constexpr uint64_t time_diff = static_cast(1000) * 1000 * 1000; + txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff); + } } delete txn; return s; @@ -2311,24 +2323,39 @@ void StressTest::PrintEnv() const { fprintf(stdout, "Format version : %d\n", FLAGS_format_version); fprintf(stdout, "TransactionDB : %s\n", FLAGS_use_txn ? "true" : "false"); - if (FLAGS_use_txn) { - fprintf(stdout, "Two write queues: : %s\n", - FLAGS_two_write_queues ? "true" : "false"); - fprintf(stdout, "Write policy : %d\n", - static_cast(FLAGS_txn_write_policy)); - if (static_cast(TxnDBWritePolicy::WRITE_PREPARED) == - FLAGS_txn_write_policy || - static_cast(TxnDBWritePolicy::WRITE_UNPREPARED) == - FLAGS_txn_write_policy) { - fprintf(stdout, "Snapshot cache bits : %d\n", - static_cast(FLAGS_wp_snapshot_cache_bits)); - fprintf(stdout, "Commit cache bits : %d\n", - static_cast(FLAGS_wp_commit_cache_bits)); - } - fprintf(stdout, "last cwb for recovery : %s\n", - FLAGS_use_only_the_last_commit_time_batch_for_recovery ? "true" - : "false"); + fprintf(stdout, "TransactionDB Type : %s\n", + FLAGS_use_optimistic_txn ? "Optimistic" : "Pessimistic"); + if (FLAGS_use_optimistic_txn) { + fprintf(stdout, "OCC Validation Type : %d\n", + static_cast(FLAGS_occ_validation_policy)); + if (static_cast(OccValidationPolicy::kValidateParallel) == + FLAGS_occ_validation_policy) { + fprintf(stdout, "Share Lock Buckets : %s\n", + FLAGS_share_occ_lock_buckets ? "true" : "false"); + if (FLAGS_share_occ_lock_buckets) { + fprintf(stdout, "Lock Bucket Count : %d\n", + static_cast(FLAGS_occ_lock_bucket_count)); + } + } + } else { + fprintf(stdout, "Two write queues: : %s\n", + FLAGS_two_write_queues ? "true" : "false"); + fprintf(stdout, "Write policy : %d\n", + static_cast(FLAGS_txn_write_policy)); + if (static_cast(TxnDBWritePolicy::WRITE_PREPARED) == + FLAGS_txn_write_policy || + static_cast(TxnDBWritePolicy::WRITE_UNPREPARED) == + FLAGS_txn_write_policy) { + fprintf(stdout, "Snapshot cache bits : %d\n", + static_cast(FLAGS_wp_snapshot_cache_bits)); + fprintf(stdout, "Commit cache bits : %d\n", + static_cast(FLAGS_wp_commit_cache_bits)); + } + fprintf(stdout, "last cwb for recovery : %s\n", + FLAGS_use_only_the_last_commit_time_batch_for_recovery ? "true" + : "false"); + } } fprintf(stdout, "Stacked BlobDB : %s\n", @@ -2477,6 +2504,7 @@ void StressTest::PrintEnv() const { void StressTest::Open(SharedState* shared) { assert(db_ == nullptr); assert(txn_db_ == nullptr); + assert(optimistic_txn_db_ == nullptr); if (!InitializeOptionsFromFile(options_)) { InitializeOptionsFromFlags(cache_, filter_policy_, options_); } @@ -2704,36 +2732,65 @@ void StressTest::Open(SharedState* shared) { break; } } else { - TransactionDBOptions txn_db_options; - assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED); - txn_db_options.write_policy = - static_cast(FLAGS_txn_write_policy); - if (FLAGS_unordered_write) { - assert(txn_db_options.write_policy == TxnDBWritePolicy::WRITE_PREPARED); - options_.unordered_write = true; - options_.two_write_queues = true; - txn_db_options.skip_concurrency_control = true; + if (FLAGS_use_optimistic_txn) { + OptimisticTransactionDBOptions optimistic_txn_db_options; + optimistic_txn_db_options.validate_policy = + static_cast(FLAGS_occ_validation_policy); + + if (FLAGS_share_occ_lock_buckets) { + optimistic_txn_db_options.shared_lock_buckets = + MakeSharedOccLockBuckets(FLAGS_occ_lock_bucket_count); + } else { + optimistic_txn_db_options.occ_lock_buckets = + FLAGS_occ_lock_bucket_count; + optimistic_txn_db_options.shared_lock_buckets = nullptr; + } + s = OptimisticTransactionDB::Open( + options_, optimistic_txn_db_options, FLAGS_db, cf_descriptors, + &column_families_, &optimistic_txn_db_); + if (!s.ok()) { + fprintf(stderr, "Error in opening the OptimisticTransactionDB [%s]\n", + s.ToString().c_str()); + fflush(stderr); + } + assert(s.ok()); + { + db_ = optimistic_txn_db_; + db_aptr_.store(optimistic_txn_db_, std::memory_order_release); + } } else { - options_.two_write_queues = FLAGS_two_write_queues; - } - txn_db_options.wp_snapshot_cache_bits = - static_cast(FLAGS_wp_snapshot_cache_bits); - txn_db_options.wp_commit_cache_bits = - static_cast(FLAGS_wp_commit_cache_bits); - PrepareTxnDbOptions(shared, txn_db_options); - s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, - cf_descriptors, &column_families_, &txn_db_); - if (!s.ok()) { - fprintf(stderr, "Error in opening the TransactionDB [%s]\n", - s.ToString().c_str()); - fflush(stderr); - } - assert(s.ok()); + TransactionDBOptions txn_db_options; + assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED); + txn_db_options.write_policy = + static_cast(FLAGS_txn_write_policy); + if (FLAGS_unordered_write) { + assert(txn_db_options.write_policy == + TxnDBWritePolicy::WRITE_PREPARED); + options_.unordered_write = true; + options_.two_write_queues = true; + txn_db_options.skip_concurrency_control = true; + } else { + options_.two_write_queues = FLAGS_two_write_queues; + } + txn_db_options.wp_snapshot_cache_bits = + static_cast(FLAGS_wp_snapshot_cache_bits); + txn_db_options.wp_commit_cache_bits = + static_cast(FLAGS_wp_commit_cache_bits); + PrepareTxnDbOptions(shared, txn_db_options); + s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, + cf_descriptors, &column_families_, &txn_db_); + if (!s.ok()) { + fprintf(stderr, "Error in opening the TransactionDB [%s]\n", + s.ToString().c_str()); + fflush(stderr); + } + assert(s.ok()); - // Do not swap the order of the following. - { - db_ = txn_db_; - db_aptr_.store(txn_db_, std::memory_order_release); + // Do not swap the order of the following. + { + db_ = txn_db_; + db_aptr_.store(txn_db_, std::memory_order_release); + } } } if (!s.ok()) { @@ -2811,10 +2868,12 @@ void StressTest::Reopen(ThreadState* thread) { } assert(s.ok()); } - assert(txn_db_ == nullptr || db_ == txn_db_); + assert((txn_db_ == nullptr && optimistic_txn_db_ == nullptr) || + (db_ == txn_db_ || db_ == optimistic_txn_db_)); delete db_; db_ = nullptr; txn_db_ = nullptr; + optimistic_txn_db_ = nullptr; num_times_reopened_++; auto now = clock_->NowMicros(); diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index f7ee247a7..29159a494 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -17,6 +17,7 @@ namespace ROCKSDB_NAMESPACE { class SystemClock; class Transaction; class TransactionDB; +class OptimisticTransactionDB; struct TransactionDBOptions; class StressTest { @@ -261,6 +262,7 @@ class StressTest { std::shared_ptr filter_policy_; DB* db_; TransactionDB* txn_db_; + OptimisticTransactionDB* optimistic_txn_db_; // Currently only used in MultiOpsTxnsStressTest std::atomic db_aptr_; diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 62e96af23..f2b124b4c 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -274,6 +274,14 @@ int db_stress_tool(int argc, char** argv) { CheckAndSetOptionsForMultiOpsTxnStressTest(); } + if (!FLAGS_use_txn && FLAGS_use_optimistic_txn) { + fprintf( + stderr, + "You cannot set use_optimistic_txn true while use_txn is false. Please " + "set use_txn true if you want to use OptimisticTransactionDB\n"); + exit(1); + } + if (FLAGS_create_timestamped_snapshot_one_in > 0) { if (!FLAGS_use_txn) { fprintf(stderr, "timestamped snapshot supported only in TransactionDB\n"); @@ -291,8 +299,8 @@ int db_stress_tool(int argc, char** argv) { exit(1); } - if (FLAGS_use_txn && FLAGS_sync_fault_injection && - FLAGS_txn_write_policy != 0) { + if (FLAGS_use_txn && !FLAGS_use_optimistic_txn && + FLAGS_sync_fault_injection && FLAGS_txn_write_policy != 0) { fprintf(stderr, "For TransactionDB, correctness testing with unsync data loss is " "currently compatible with only write committed policy\n"); diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 6cf3ea4c0..f33afe1bb 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -254,7 +254,7 @@ def setup_expected_values_dir(): # set the value to _TEST_DIR_ENV_VAR if _TEST_EXPECTED_DIR_ENV_VAR is not # specified. - if test_exp_tmpdir is None or test_exp_tmpdir == "": + if test_exp_tmpdir is None or test_exp_tmpdir == "": test_exp_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR) if test_exp_tmpdir is None or test_exp_tmpdir == "": @@ -280,7 +280,7 @@ def setup_multiops_txn_key_spaces_file(): # set the value to _TEST_DIR_ENV_VAR if _TEST_EXPECTED_DIR_ENV_VAR is not # specified. - if test_exp_tmpdir is None or test_exp_tmpdir == "": + if test_exp_tmpdir is None or test_exp_tmpdir == "": test_exp_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR) if test_exp_tmpdir is None or test_exp_tmpdir == "": @@ -369,8 +369,10 @@ cf_consistency_params = { "ingest_external_file_one_in": 0, } +# For pessimistic transaction db txn_params = { "use_txn": 1, + "use_optimistic_txn": 0, # Avoid lambda to set it once for the entire test "txn_write_policy": random.randint(0, 2), "unordered_write": random.randint(0, 1), @@ -383,7 +385,18 @@ txn_params = { "enable_pipelined_write": 0, "create_timestamped_snapshot_one_in": random.choice([0, 20]), # PutEntity in transactions is not yet implemented - "use_put_entity_one_in" : 0, + "use_put_entity_one_in": 0, +} + +# For optimistic transaction db +optimistic_txn_params = { + "use_txn": 1, + "use_optimistic_txn": 1, + "occ_validation_policy": random.randint(0, 1), + "share_occ_lock_buckets": random.randint(0, 1), + "occ_lock_bucket_count": lambda: random.choice([10, 100, 500]), + # PutEntity in transactions is not yet implemented + "use_put_entity_one_in": 0, } best_efforts_recovery_params = { @@ -425,7 +438,7 @@ ts_params = { "use_txn": 0, "ingest_external_file_one_in": 0, # PutEntity with timestamps is not yet implemented - "use_put_entity_one_in" : 0, + "use_put_entity_one_in": 0, } tiered_params = { @@ -481,9 +494,9 @@ multiops_txn_default_params = { "create_timestamped_snapshot_one_in": 50, "sync_fault_injection": 0, # PutEntity in transactions is not yet implemented - "use_put_entity_one_in" : 0, - "use_get_entity" : 0, - "use_multi_get_entity" : 0, + "use_put_entity_one_in": 0, + "use_get_entity": 0, + "use_multi_get_entity": 0, } multiops_wc_txn_params = { @@ -549,12 +562,16 @@ def finalize_and_sanitize(src_params): # Multi-key operations are not currently compatible with transactions or # timestamp. - if (dest_params.get("test_batches_snapshots") == 1 or - dest_params.get("use_txn") == 1 or - dest_params.get("user_timestamp_size") > 0): + if ( + dest_params.get("test_batches_snapshots") == 1 + or dest_params.get("use_txn") == 1 + or dest_params.get("user_timestamp_size") > 0 + ): dest_params["ingest_external_file_one_in"] = 0 - if (dest_params.get("test_batches_snapshots") == 1 or - dest_params.get("use_txn") == 1): + if ( + dest_params.get("test_batches_snapshots") == 1 + or dest_params.get("use_txn") == 1 + ): dest_params["delpercent"] += dest_params["delrangepercent"] dest_params["delrangepercent"] = 0 if ( @@ -632,7 +649,7 @@ def finalize_and_sanitize(src_params): dest_params["unordered_write"] = 0 # For TransactionDB, correctness testing with unsync data loss is currently # compatible with only write committed policy - if (dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0): + if dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0: dest_params["sync_fault_injection"] = 0 dest_params["manual_wal_flush_one_in"] = 0 # PutEntity is currently not supported by SstFileWriter or in conjunction with Merge @@ -662,6 +679,8 @@ def gen_cmd_params(args): params.update(cf_consistency_params) if args.txn: params.update(txn_params) + if args.optimistic_txn: + params.update(optimistic_txn_params) if args.test_best_efforts_recovery: params.update(best_efforts_recovery_params) if args.enable_ts: @@ -707,6 +726,7 @@ def gen_cmd(params, unknown_params): "random_kill_odd", "cf_consistency", "txn", + "optimistic_txn", "test_best_efforts_recovery", "enable_ts", "test_multiops_txn", @@ -879,9 +899,17 @@ def whitebox_crash_main(args, unknown_args): "ops_per_thread": cmd_params["ops_per_thread"], } - cur_compaction_style = additional_opts.get("compaction_style", cmd_params.get("compaction_style", 0)) - if prev_compaction_style != -1 and prev_compaction_style != cur_compaction_style: - print("`compaction_style` is changed in current run so `destroy_db_initially` is set to 1 as a short-term solution to avoid cycling through previous db of different compaction style." + "\n") + cur_compaction_style = additional_opts.get( + "compaction_style", cmd_params.get("compaction_style", 0) + ) + if ( + prev_compaction_style != -1 + and prev_compaction_style != cur_compaction_style + ): + print( + "`compaction_style` is changed in current run so `destroy_db_initially` is set to 1 as a short-term solution to avoid cycling through previous db of different compaction style." + + "\n" + ) additional_opts["destroy_db_initially"] = 1 prev_compaction_style = cur_compaction_style @@ -959,7 +987,7 @@ def whitebox_crash_main(args, unknown_args): os.mkdir(dbname) except OSError: pass - if (expected_values_dir is not None): + if expected_values_dir is not None: shutil.rmtree(expected_values_dir, True) os.mkdir(expected_values_dir) @@ -980,6 +1008,7 @@ def main(): parser.add_argument("--simple", action="store_true") parser.add_argument("--cf_consistency", action="store_true") parser.add_argument("--txn", action="store_true") + parser.add_argument("--optimistic_txn", action="store_true") parser.add_argument("--test_best_efforts_recovery", action="store_true") parser.add_argument("--enable_ts", action="store_true") parser.add_argument("--test_multiops_txn", action="store_true") @@ -1004,6 +1033,7 @@ def main(): + list(cf_consistency_params.items()) + list(tiered_params.items()) + list(txn_params.items()) + + list(optimistic_txn_params.items()) ) for k, v in all_params.items():