Stress/Crash Test for OptimisticTransactionDB (#11513)

Summary:
Context:
OptimisticTransactionDB has not been covered by db_stress (including crash test) like TransactionDB.
1. Adding the following gflag options to to test OptimisticTransactionDB
- `use_optimistic_txn`: When true, open OptimisticTransactionDB to test
- `occ_validation_policy`: `OccValidationPolicy::kValidateParallel = 1` by default.
- `share_occ_lock_buckets`: Use shared occ locks
- `occ_lock_bucket_count`: 500 by default. Number of buckets to use for shared occ lock.
2. Opening OptimisticTransactionDB and NewTxn/Commit added per `use_optimistic_txn` flag in `db_stress_test_base.cc`
3. OptimisticTransactionDB blackbox/whitebox test added in crash_test.mk

Please note that the existing flag `use_txn` is being used here. When `use_txn == true` and `use_optimistic_txn == false`, we use `TransactionDB` (a.k.a. pessimistic transaction db). When both `use_txn` and `use_optimistic_txn` are true, we use `OptimisticTransactionDB`. If `use_txn == false` but `use_optimistic_txn == true` throw error with message _"You cannot set use_optimistic_txn true while use_txn is false. Please set use_txn true if you want to use OptimisticTransactionDB"_.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11513

Test Plan:
**Crash Test**
Serial Validation
```
export CRASH_TEST_EXT_ARGS="--use_optimistic_txn=1 --use_txn=1 --use_put_entity_one_in=0 --occ_validation_policy=0"
make crash_test -j
```
Parallel Validation (no share bucket)
```
export CRASH_TEST_EXT_ARGS="--use_optimistic_txn=1 --use_txn=1 --use_put_entity_one_in=0 --occ_validation_policy=1 --share_occ_lock_buckets=0"
make crash_test -j
```
Parallel Validation (share bucket)
```
export CRASH_TEST_EXT_ARGS="--use_optimistic_txn=1 --use_txn=1 --use_put_entity_one_in=0 --occ_validation_policy=1 --share_occ_lock_buckets=1 --occ_lock_bucket_count=500"
make crash_test -j
```

**Stress Test**
```
./db_stress -use_optimistic_txn -threads=32
```

Reviewed By: pdillinger

Differential Revision: D46547387

Pulled By: jaykorean

fbshipit-source-id: ca19819ca6e0281694966998014b40d95d4e5960
oxigraph-main
Jay Huh 2 years ago committed by Facebook GitHub Bot
parent 1da9ac2363
commit 17d5200504
  1. 14
      crash_test.mk
  2. 19
      db_stress_tool/db_stress_common.h
  3. 17
      db_stress_tool/db_stress_gflags.cc
  4. 231
      db_stress_tool/db_stress_test_base.cc
  5. 2
      db_stress_tool/db_stress_test_base.h
  6. 12
      db_stress_tool/db_stress_tool.cc
  7. 64
      tools/db_crashtest.py

@ -21,6 +21,8 @@ CRASHTEST_PY=$(PYTHON) -u tools/db_crashtest.py --stress_cmd=$(DB_STRESS_CMD) --
blackbox_crash_test_with_multiops_wp_txn \ blackbox_crash_test_with_multiops_wp_txn \
crash_test_with_tiered_storage blackbox_crash_test_with_tiered_storage \ crash_test_with_tiered_storage blackbox_crash_test_with_tiered_storage \
whitebox_crash_test_with_tiered_storage \ whitebox_crash_test_with_tiered_storage \
whitebox_crash_test_with_optimistic_txn \
blackbox_crash_test_with_optimistic_txn \
crash_test: $(DB_STRESS_CMD) crash_test: $(DB_STRESS_CMD)
# Do not parallelize # Do not parallelize
@ -37,6 +39,11 @@ crash_test_with_txn: $(DB_STRESS_CMD)
$(CRASHTEST_MAKE) whitebox_crash_test_with_txn $(CRASHTEST_MAKE) whitebox_crash_test_with_txn
$(CRASHTEST_MAKE) blackbox_crash_test_with_txn $(CRASHTEST_MAKE) blackbox_crash_test_with_txn
crash_test_with_optimistic_txn: $(DB_STRESS_CMD)
# Do not parallelize
$(CRASHTEST_MAKE) whitebox_crash_test_with_optimistic_txn
$(CRASHTEST_MAKE) blackbox_crash_test_with_optimistic_txn
crash_test_with_best_efforts_recovery: blackbox_crash_test_with_best_efforts_recovery crash_test_with_best_efforts_recovery: blackbox_crash_test_with_best_efforts_recovery
crash_test_with_ts: $(DB_STRESS_CMD) crash_test_with_ts: $(DB_STRESS_CMD)
@ -80,6 +87,9 @@ blackbox_crash_test_with_multiops_wp_txn: $(DB_STRESS_CMD)
blackbox_crash_test_with_tiered_storage: $(DB_STRESS_CMD) blackbox_crash_test_with_tiered_storage: $(DB_STRESS_CMD)
$(CRASHTEST_PY) --test_tiered_storage blackbox $(CRASH_TEST_EXT_ARGS) $(CRASHTEST_PY) --test_tiered_storage blackbox $(CRASH_TEST_EXT_ARGS)
blackbox_crash_test_with_optimistic_txn: $(DB_STRESS_CMD)
$(CRASHTEST_PY) --optimistic_txn blackbox $(CRASH_TEST_EXT_ARGS)
ifeq ($(CRASH_TEST_KILL_ODD),) ifeq ($(CRASH_TEST_KILL_ODD),)
CRASH_TEST_KILL_ODD=888887 CRASH_TEST_KILL_ODD=888887
endif endif
@ -105,3 +115,7 @@ whitebox_crash_test_with_ts: $(DB_STRESS_CMD)
whitebox_crash_test_with_tiered_storage: $(DB_STRESS_CMD) whitebox_crash_test_with_tiered_storage: $(DB_STRESS_CMD)
$(CRASHTEST_PY) --test_tiered_storage whitebox --random_kill_odd \ $(CRASHTEST_PY) --test_tiered_storage whitebox --random_kill_odd \
$(CRASH_TEST_KILL_ODD) $(CRASH_TEST_EXT_ARGS) $(CRASH_TEST_KILL_ODD) $(CRASH_TEST_EXT_ARGS)
whitebox_crash_test_with_optimistic_txn: $(DB_STRESS_CMD)
$(CRASHTEST_PY) --optimistic_txn whitebox --random_kill_odd \
$(CRASH_TEST_KILL_ODD) $(CRASH_TEST_EXT_ARGS)

@ -54,6 +54,7 @@
#include "rocksdb/utilities/checkpoint.h" #include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/db_ttl.h" #include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/utilities/debug.h" #include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/options_util.h" #include "rocksdb/utilities/options_util.h"
#include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
@ -194,9 +195,6 @@ DECLARE_bool(rate_limit_user_ops);
DECLARE_bool(rate_limit_auto_wal_flush); DECLARE_bool(rate_limit_auto_wal_flush);
DECLARE_uint64(sst_file_manager_bytes_per_sec); DECLARE_uint64(sst_file_manager_bytes_per_sec);
DECLARE_uint64(sst_file_manager_bytes_per_truncate); DECLARE_uint64(sst_file_manager_bytes_per_truncate);
DECLARE_bool(use_txn);
DECLARE_uint64(txn_write_policy);
DECLARE_bool(unordered_write);
DECLARE_int32(backup_one_in); DECLARE_int32(backup_one_in);
DECLARE_uint64(backup_max_size); DECLARE_uint64(backup_max_size);
DECLARE_int32(checkpoint_one_in); DECLARE_int32(checkpoint_one_in);
@ -255,6 +253,21 @@ DECLARE_int32(continuous_verification_interval);
DECLARE_int32(get_property_one_in); DECLARE_int32(get_property_one_in);
DECLARE_string(file_checksum_impl); DECLARE_string(file_checksum_impl);
// Options for transaction dbs.
// Use TransactionDB (a.k.a. Pessimistic Transaction DB)
// OR OptimisticTransactionDB
DECLARE_bool(use_txn);
// Options for TransactionDB (a.k.a. Pessimistic Transaction DB)
DECLARE_uint64(txn_write_policy);
DECLARE_bool(unordered_write);
// Options for OptimisticTransactionDB
DECLARE_bool(use_optimistic_txn);
DECLARE_uint64(occ_validation_policy);
DECLARE_bool(share_occ_lock_buckets);
DECLARE_uint32(occ_lock_bucket_count);
// Options for StackableDB-based BlobDB // Options for StackableDB-based BlobDB
DECLARE_bool(use_blob_db); DECLARE_bool(use_blob_db);
DECLARE_uint64(blob_db_min_blob_size); DECLARE_uint64(blob_db_min_blob_size);

@ -669,13 +669,24 @@ DEFINE_uint64(sst_file_manager_bytes_per_truncate, 0,
"many bytes. By default whole files will be deleted."); "many bytes. By default whole files will be deleted.");
DEFINE_bool(use_txn, false, DEFINE_bool(use_txn, false,
"Use TransactionDB. Currently the default write policy is " "Use TransactionDB or OptimisticTransactionDB. When "
"TxnDBWritePolicy::WRITE_PREPARED"); "use_optimistic_txn == false (by default), "
"it's (Pessimistic) TransactionDB");
DEFINE_uint64(txn_write_policy, 0, DEFINE_uint64(txn_write_policy, 0,
"The transaction write policy. Default is " "The transaction write policy. Default is "
"TxnDBWritePolicy::WRITE_COMMITTED. Note that this should not be " "TxnDBWritePolicy::WRITE_COMMITTED. Note that this should not be "
"changed accross crashes."); "changed across crashes.");
DEFINE_bool(use_optimistic_txn, false, "Use OptimisticTransactionDB.");
DEFINE_uint64(occ_validation_policy, 1,
"Optimistic Concurrency Control Validation Policy for "
"OptimisticTransactionDB");
DEFINE_bool(share_occ_lock_buckets, false,
"Share a pool of locks across DB instances for buckets");
DEFINE_uint32(
occ_lock_bucket_count, 500,
"Bucket Count for shared Optimistic Concurrency Control (OCC) locks");
DEFINE_bool(unordered_write, false, DEFINE_bool(unordered_write, false,
"Turn on the unordered_write feature. This options is currently " "Turn on the unordered_write feature. This options is currently "

@ -301,7 +301,7 @@ void StressTest::FinishInitDb(SharedState* shared) {
exit(1); exit(1);
} }
} }
if (FLAGS_use_txn) { if (FLAGS_use_txn && !FLAGS_use_optimistic_txn) {
// It's OK here without sync because unsynced data cannot be lost at this // It's OK here without sync because unsynced data cannot be lost at this
// point // point
// - even with sync_fault_injection=1 as the // - even with sync_fault_injection=1 as the
@ -556,6 +556,7 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
delete db_; delete db_;
db_ = nullptr; db_ = nullptr;
txn_db_ = nullptr; txn_db_ = nullptr;
optimistic_txn_db_ = nullptr;
db_preload_finished_.store(true); db_preload_finished_.store(true);
auto now = clock_->NowMicros(); auto now = clock_->NowMicros();
@ -634,55 +635,66 @@ Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) {
} }
write_opts.disableWAL = FLAGS_disable_wal; write_opts.disableWAL = FLAGS_disable_wal;
static std::atomic<uint64_t> txn_id = {0}; static std::atomic<uint64_t> txn_id = {0};
TransactionOptions txn_options; if (FLAGS_use_optimistic_txn) {
txn_options.use_only_the_last_commit_time_batch_for_recovery = *txn = optimistic_txn_db_->BeginTransaction(write_opts);
FLAGS_use_only_the_last_commit_time_batch_for_recovery; return Status::OK();
txn_options.lock_timeout = 600000; // 10 min } else {
txn_options.deadlock_detect = true; TransactionOptions txn_options;
*txn = txn_db_->BeginTransaction(write_opts, txn_options); txn_options.use_only_the_last_commit_time_batch_for_recovery =
auto istr = std::to_string(txn_id.fetch_add(1)); FLAGS_use_only_the_last_commit_time_batch_for_recovery;
Status s = (*txn)->SetName("xid" + istr); txn_options.lock_timeout = 600000; // 10 min
return s; txn_options.deadlock_detect = true;
*txn = txn_db_->BeginTransaction(write_opts, txn_options);
auto istr = std::to_string(txn_id.fetch_add(1));
Status s = (*txn)->SetName("xid" + istr);
return s;
}
} }
Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) { Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) {
if (!FLAGS_use_txn) { if (!FLAGS_use_txn) {
return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set"); return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
} }
assert(txn_db_); Status s = Status::OK();
Status s = txn->Prepare(); if (FLAGS_use_optimistic_txn) {
std::shared_ptr<const Snapshot> timestamped_snapshot; assert(optimistic_txn_db_);
if (s.ok()) { s = txn->Commit();
if (thread && FLAGS_create_timestamped_snapshot_one_in && } else {
thread->rand.OneIn(FLAGS_create_timestamped_snapshot_one_in)) { assert(txn_db_);
uint64_t ts = db_stress_env->NowNanos(); s = txn->Prepare();
s = txn->CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, std::shared_ptr<const Snapshot> timestamped_snapshot;
&timestamped_snapshot); if (s.ok()) {
if (thread && FLAGS_create_timestamped_snapshot_one_in &&
std::pair<Status, std::shared_ptr<const Snapshot>> res; thread->rand.OneIn(FLAGS_create_timestamped_snapshot_one_in)) {
if (thread->tid == 0) { uint64_t ts = db_stress_env->NowNanos();
uint64_t now = db_stress_env->NowNanos(); s = txn->CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts,
res = txn_db_->CreateTimestampedSnapshot(now); &timestamped_snapshot);
if (res.first.ok()) {
assert(res.second); std::pair<Status, std::shared_ptr<const Snapshot>> res;
assert(res.second->GetTimestamp() == now); if (thread->tid == 0) {
if (timestamped_snapshot) { uint64_t now = db_stress_env->NowNanos();
assert(res.second->GetTimestamp() > res = txn_db_->CreateTimestampedSnapshot(now);
timestamped_snapshot->GetTimestamp()); if (res.first.ok()) {
assert(res.second);
assert(res.second->GetTimestamp() == now);
if (timestamped_snapshot) {
assert(res.second->GetTimestamp() >
timestamped_snapshot->GetTimestamp());
}
} else {
assert(!res.second);
} }
} else {
assert(!res.second);
} }
} else {
s = txn->Commit();
} }
} else {
s = txn->Commit();
} }
} if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 &&
if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 && thread->rand.OneInOpt(50000)) {
thread->rand.OneInOpt(50000)) { uint64_t now = db_stress_env->NowNanos();
uint64_t now = db_stress_env->NowNanos(); constexpr uint64_t time_diff = static_cast<uint64_t>(1000) * 1000 * 1000;
constexpr uint64_t time_diff = static_cast<uint64_t>(1000) * 1000 * 1000; txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff);
txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff); }
} }
delete txn; delete txn;
return s; return s;
@ -2311,24 +2323,39 @@ void StressTest::PrintEnv() const {
fprintf(stdout, "Format version : %d\n", FLAGS_format_version); fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
fprintf(stdout, "TransactionDB : %s\n", fprintf(stdout, "TransactionDB : %s\n",
FLAGS_use_txn ? "true" : "false"); FLAGS_use_txn ? "true" : "false");
if (FLAGS_use_txn) { if (FLAGS_use_txn) {
fprintf(stdout, "Two write queues: : %s\n", fprintf(stdout, "TransactionDB Type : %s\n",
FLAGS_two_write_queues ? "true" : "false"); FLAGS_use_optimistic_txn ? "Optimistic" : "Pessimistic");
fprintf(stdout, "Write policy : %d\n", if (FLAGS_use_optimistic_txn) {
static_cast<int>(FLAGS_txn_write_policy)); fprintf(stdout, "OCC Validation Type : %d\n",
if (static_cast<uint64_t>(TxnDBWritePolicy::WRITE_PREPARED) == static_cast<int>(FLAGS_occ_validation_policy));
FLAGS_txn_write_policy || if (static_cast<uint64_t>(OccValidationPolicy::kValidateParallel) ==
static_cast<uint64_t>(TxnDBWritePolicy::WRITE_UNPREPARED) == FLAGS_occ_validation_policy) {
FLAGS_txn_write_policy) { fprintf(stdout, "Share Lock Buckets : %s\n",
fprintf(stdout, "Snapshot cache bits : %d\n", FLAGS_share_occ_lock_buckets ? "true" : "false");
static_cast<int>(FLAGS_wp_snapshot_cache_bits)); if (FLAGS_share_occ_lock_buckets) {
fprintf(stdout, "Commit cache bits : %d\n", fprintf(stdout, "Lock Bucket Count : %d\n",
static_cast<int>(FLAGS_wp_commit_cache_bits)); static_cast<int>(FLAGS_occ_lock_bucket_count));
} }
fprintf(stdout, "last cwb for recovery : %s\n", }
FLAGS_use_only_the_last_commit_time_batch_for_recovery ? "true" } else {
: "false"); fprintf(stdout, "Two write queues: : %s\n",
FLAGS_two_write_queues ? "true" : "false");
fprintf(stdout, "Write policy : %d\n",
static_cast<int>(FLAGS_txn_write_policy));
if (static_cast<uint64_t>(TxnDBWritePolicy::WRITE_PREPARED) ==
FLAGS_txn_write_policy ||
static_cast<uint64_t>(TxnDBWritePolicy::WRITE_UNPREPARED) ==
FLAGS_txn_write_policy) {
fprintf(stdout, "Snapshot cache bits : %d\n",
static_cast<int>(FLAGS_wp_snapshot_cache_bits));
fprintf(stdout, "Commit cache bits : %d\n",
static_cast<int>(FLAGS_wp_commit_cache_bits));
}
fprintf(stdout, "last cwb for recovery : %s\n",
FLAGS_use_only_the_last_commit_time_batch_for_recovery ? "true"
: "false");
}
} }
fprintf(stdout, "Stacked BlobDB : %s\n", fprintf(stdout, "Stacked BlobDB : %s\n",
@ -2477,6 +2504,7 @@ void StressTest::PrintEnv() const {
void StressTest::Open(SharedState* shared) { void StressTest::Open(SharedState* shared) {
assert(db_ == nullptr); assert(db_ == nullptr);
assert(txn_db_ == nullptr); assert(txn_db_ == nullptr);
assert(optimistic_txn_db_ == nullptr);
if (!InitializeOptionsFromFile(options_)) { if (!InitializeOptionsFromFile(options_)) {
InitializeOptionsFromFlags(cache_, filter_policy_, options_); InitializeOptionsFromFlags(cache_, filter_policy_, options_);
} }
@ -2704,36 +2732,65 @@ void StressTest::Open(SharedState* shared) {
break; break;
} }
} else { } else {
TransactionDBOptions txn_db_options; if (FLAGS_use_optimistic_txn) {
assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED); OptimisticTransactionDBOptions optimistic_txn_db_options;
txn_db_options.write_policy = optimistic_txn_db_options.validate_policy =
static_cast<TxnDBWritePolicy>(FLAGS_txn_write_policy); static_cast<OccValidationPolicy>(FLAGS_occ_validation_policy);
if (FLAGS_unordered_write) {
assert(txn_db_options.write_policy == TxnDBWritePolicy::WRITE_PREPARED); if (FLAGS_share_occ_lock_buckets) {
options_.unordered_write = true; optimistic_txn_db_options.shared_lock_buckets =
options_.two_write_queues = true; MakeSharedOccLockBuckets(FLAGS_occ_lock_bucket_count);
txn_db_options.skip_concurrency_control = true; } else {
optimistic_txn_db_options.occ_lock_buckets =
FLAGS_occ_lock_bucket_count;
optimistic_txn_db_options.shared_lock_buckets = nullptr;
}
s = OptimisticTransactionDB::Open(
options_, optimistic_txn_db_options, FLAGS_db, cf_descriptors,
&column_families_, &optimistic_txn_db_);
if (!s.ok()) {
fprintf(stderr, "Error in opening the OptimisticTransactionDB [%s]\n",
s.ToString().c_str());
fflush(stderr);
}
assert(s.ok());
{
db_ = optimistic_txn_db_;
db_aptr_.store(optimistic_txn_db_, std::memory_order_release);
}
} else { } else {
options_.two_write_queues = FLAGS_two_write_queues; TransactionDBOptions txn_db_options;
} assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED);
txn_db_options.wp_snapshot_cache_bits = txn_db_options.write_policy =
static_cast<size_t>(FLAGS_wp_snapshot_cache_bits); static_cast<TxnDBWritePolicy>(FLAGS_txn_write_policy);
txn_db_options.wp_commit_cache_bits = if (FLAGS_unordered_write) {
static_cast<size_t>(FLAGS_wp_commit_cache_bits); assert(txn_db_options.write_policy ==
PrepareTxnDbOptions(shared, txn_db_options); TxnDBWritePolicy::WRITE_PREPARED);
s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, options_.unordered_write = true;
cf_descriptors, &column_families_, &txn_db_); options_.two_write_queues = true;
if (!s.ok()) { txn_db_options.skip_concurrency_control = true;
fprintf(stderr, "Error in opening the TransactionDB [%s]\n", } else {
s.ToString().c_str()); options_.two_write_queues = FLAGS_two_write_queues;
fflush(stderr); }
} txn_db_options.wp_snapshot_cache_bits =
assert(s.ok()); static_cast<size_t>(FLAGS_wp_snapshot_cache_bits);
txn_db_options.wp_commit_cache_bits =
static_cast<size_t>(FLAGS_wp_commit_cache_bits);
PrepareTxnDbOptions(shared, txn_db_options);
s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
cf_descriptors, &column_families_, &txn_db_);
if (!s.ok()) {
fprintf(stderr, "Error in opening the TransactionDB [%s]\n",
s.ToString().c_str());
fflush(stderr);
}
assert(s.ok());
// Do not swap the order of the following. // Do not swap the order of the following.
{ {
db_ = txn_db_; db_ = txn_db_;
db_aptr_.store(txn_db_, std::memory_order_release); db_aptr_.store(txn_db_, std::memory_order_release);
}
} }
} }
if (!s.ok()) { if (!s.ok()) {
@ -2811,10 +2868,12 @@ void StressTest::Reopen(ThreadState* thread) {
} }
assert(s.ok()); assert(s.ok());
} }
assert(txn_db_ == nullptr || db_ == txn_db_); assert((txn_db_ == nullptr && optimistic_txn_db_ == nullptr) ||
(db_ == txn_db_ || db_ == optimistic_txn_db_));
delete db_; delete db_;
db_ = nullptr; db_ = nullptr;
txn_db_ = nullptr; txn_db_ = nullptr;
optimistic_txn_db_ = nullptr;
num_times_reopened_++; num_times_reopened_++;
auto now = clock_->NowMicros(); auto now = clock_->NowMicros();

@ -17,6 +17,7 @@ namespace ROCKSDB_NAMESPACE {
class SystemClock; class SystemClock;
class Transaction; class Transaction;
class TransactionDB; class TransactionDB;
class OptimisticTransactionDB;
struct TransactionDBOptions; struct TransactionDBOptions;
class StressTest { class StressTest {
@ -261,6 +262,7 @@ class StressTest {
std::shared_ptr<const FilterPolicy> filter_policy_; std::shared_ptr<const FilterPolicy> filter_policy_;
DB* db_; DB* db_;
TransactionDB* txn_db_; TransactionDB* txn_db_;
OptimisticTransactionDB* optimistic_txn_db_;
// Currently only used in MultiOpsTxnsStressTest // Currently only used in MultiOpsTxnsStressTest
std::atomic<DB*> db_aptr_; std::atomic<DB*> db_aptr_;

@ -274,6 +274,14 @@ int db_stress_tool(int argc, char** argv) {
CheckAndSetOptionsForMultiOpsTxnStressTest(); CheckAndSetOptionsForMultiOpsTxnStressTest();
} }
if (!FLAGS_use_txn && FLAGS_use_optimistic_txn) {
fprintf(
stderr,
"You cannot set use_optimistic_txn true while use_txn is false. Please "
"set use_txn true if you want to use OptimisticTransactionDB\n");
exit(1);
}
if (FLAGS_create_timestamped_snapshot_one_in > 0) { if (FLAGS_create_timestamped_snapshot_one_in > 0) {
if (!FLAGS_use_txn) { if (!FLAGS_use_txn) {
fprintf(stderr, "timestamped snapshot supported only in TransactionDB\n"); fprintf(stderr, "timestamped snapshot supported only in TransactionDB\n");
@ -291,8 +299,8 @@ int db_stress_tool(int argc, char** argv) {
exit(1); exit(1);
} }
if (FLAGS_use_txn && FLAGS_sync_fault_injection && if (FLAGS_use_txn && !FLAGS_use_optimistic_txn &&
FLAGS_txn_write_policy != 0) { FLAGS_sync_fault_injection && FLAGS_txn_write_policy != 0) {
fprintf(stderr, fprintf(stderr,
"For TransactionDB, correctness testing with unsync data loss is " "For TransactionDB, correctness testing with unsync data loss is "
"currently compatible with only write committed policy\n"); "currently compatible with only write committed policy\n");

@ -254,7 +254,7 @@ def setup_expected_values_dir():
# set the value to _TEST_DIR_ENV_VAR if _TEST_EXPECTED_DIR_ENV_VAR is not # set the value to _TEST_DIR_ENV_VAR if _TEST_EXPECTED_DIR_ENV_VAR is not
# specified. # specified.
if test_exp_tmpdir is None or test_exp_tmpdir == "": if test_exp_tmpdir is None or test_exp_tmpdir == "":
test_exp_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR) test_exp_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
if test_exp_tmpdir is None or test_exp_tmpdir == "": if test_exp_tmpdir is None or test_exp_tmpdir == "":
@ -280,7 +280,7 @@ def setup_multiops_txn_key_spaces_file():
# set the value to _TEST_DIR_ENV_VAR if _TEST_EXPECTED_DIR_ENV_VAR is not # set the value to _TEST_DIR_ENV_VAR if _TEST_EXPECTED_DIR_ENV_VAR is not
# specified. # specified.
if test_exp_tmpdir is None or test_exp_tmpdir == "": if test_exp_tmpdir is None or test_exp_tmpdir == "":
test_exp_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR) test_exp_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
if test_exp_tmpdir is None or test_exp_tmpdir == "": if test_exp_tmpdir is None or test_exp_tmpdir == "":
@ -369,8 +369,10 @@ cf_consistency_params = {
"ingest_external_file_one_in": 0, "ingest_external_file_one_in": 0,
} }
# For pessimistic transaction db
txn_params = { txn_params = {
"use_txn": 1, "use_txn": 1,
"use_optimistic_txn": 0,
# Avoid lambda to set it once for the entire test # Avoid lambda to set it once for the entire test
"txn_write_policy": random.randint(0, 2), "txn_write_policy": random.randint(0, 2),
"unordered_write": random.randint(0, 1), "unordered_write": random.randint(0, 1),
@ -383,7 +385,18 @@ txn_params = {
"enable_pipelined_write": 0, "enable_pipelined_write": 0,
"create_timestamped_snapshot_one_in": random.choice([0, 20]), "create_timestamped_snapshot_one_in": random.choice([0, 20]),
# PutEntity in transactions is not yet implemented # PutEntity in transactions is not yet implemented
"use_put_entity_one_in" : 0, "use_put_entity_one_in": 0,
}
# For optimistic transaction db
optimistic_txn_params = {
"use_txn": 1,
"use_optimistic_txn": 1,
"occ_validation_policy": random.randint(0, 1),
"share_occ_lock_buckets": random.randint(0, 1),
"occ_lock_bucket_count": lambda: random.choice([10, 100, 500]),
# PutEntity in transactions is not yet implemented
"use_put_entity_one_in": 0,
} }
best_efforts_recovery_params = { best_efforts_recovery_params = {
@ -425,7 +438,7 @@ ts_params = {
"use_txn": 0, "use_txn": 0,
"ingest_external_file_one_in": 0, "ingest_external_file_one_in": 0,
# PutEntity with timestamps is not yet implemented # PutEntity with timestamps is not yet implemented
"use_put_entity_one_in" : 0, "use_put_entity_one_in": 0,
} }
tiered_params = { tiered_params = {
@ -481,9 +494,9 @@ multiops_txn_default_params = {
"create_timestamped_snapshot_one_in": 50, "create_timestamped_snapshot_one_in": 50,
"sync_fault_injection": 0, "sync_fault_injection": 0,
# PutEntity in transactions is not yet implemented # PutEntity in transactions is not yet implemented
"use_put_entity_one_in" : 0, "use_put_entity_one_in": 0,
"use_get_entity" : 0, "use_get_entity": 0,
"use_multi_get_entity" : 0, "use_multi_get_entity": 0,
} }
multiops_wc_txn_params = { multiops_wc_txn_params = {
@ -549,12 +562,16 @@ def finalize_and_sanitize(src_params):
# Multi-key operations are not currently compatible with transactions or # Multi-key operations are not currently compatible with transactions or
# timestamp. # timestamp.
if (dest_params.get("test_batches_snapshots") == 1 or if (
dest_params.get("use_txn") == 1 or dest_params.get("test_batches_snapshots") == 1
dest_params.get("user_timestamp_size") > 0): or dest_params.get("use_txn") == 1
or dest_params.get("user_timestamp_size") > 0
):
dest_params["ingest_external_file_one_in"] = 0 dest_params["ingest_external_file_one_in"] = 0
if (dest_params.get("test_batches_snapshots") == 1 or if (
dest_params.get("use_txn") == 1): dest_params.get("test_batches_snapshots") == 1
or dest_params.get("use_txn") == 1
):
dest_params["delpercent"] += dest_params["delrangepercent"] dest_params["delpercent"] += dest_params["delrangepercent"]
dest_params["delrangepercent"] = 0 dest_params["delrangepercent"] = 0
if ( if (
@ -632,7 +649,7 @@ def finalize_and_sanitize(src_params):
dest_params["unordered_write"] = 0 dest_params["unordered_write"] = 0
# For TransactionDB, correctness testing with unsync data loss is currently # For TransactionDB, correctness testing with unsync data loss is currently
# compatible with only write committed policy # compatible with only write committed policy
if (dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0): if dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0:
dest_params["sync_fault_injection"] = 0 dest_params["sync_fault_injection"] = 0
dest_params["manual_wal_flush_one_in"] = 0 dest_params["manual_wal_flush_one_in"] = 0
# PutEntity is currently not supported by SstFileWriter or in conjunction with Merge # PutEntity is currently not supported by SstFileWriter or in conjunction with Merge
@ -662,6 +679,8 @@ def gen_cmd_params(args):
params.update(cf_consistency_params) params.update(cf_consistency_params)
if args.txn: if args.txn:
params.update(txn_params) params.update(txn_params)
if args.optimistic_txn:
params.update(optimistic_txn_params)
if args.test_best_efforts_recovery: if args.test_best_efforts_recovery:
params.update(best_efforts_recovery_params) params.update(best_efforts_recovery_params)
if args.enable_ts: if args.enable_ts:
@ -707,6 +726,7 @@ def gen_cmd(params, unknown_params):
"random_kill_odd", "random_kill_odd",
"cf_consistency", "cf_consistency",
"txn", "txn",
"optimistic_txn",
"test_best_efforts_recovery", "test_best_efforts_recovery",
"enable_ts", "enable_ts",
"test_multiops_txn", "test_multiops_txn",
@ -879,9 +899,17 @@ def whitebox_crash_main(args, unknown_args):
"ops_per_thread": cmd_params["ops_per_thread"], "ops_per_thread": cmd_params["ops_per_thread"],
} }
cur_compaction_style = additional_opts.get("compaction_style", cmd_params.get("compaction_style", 0)) cur_compaction_style = additional_opts.get(
if prev_compaction_style != -1 and prev_compaction_style != cur_compaction_style: "compaction_style", cmd_params.get("compaction_style", 0)
print("`compaction_style` is changed in current run so `destroy_db_initially` is set to 1 as a short-term solution to avoid cycling through previous db of different compaction style." + "\n") )
if (
prev_compaction_style != -1
and prev_compaction_style != cur_compaction_style
):
print(
"`compaction_style` is changed in current run so `destroy_db_initially` is set to 1 as a short-term solution to avoid cycling through previous db of different compaction style."
+ "\n"
)
additional_opts["destroy_db_initially"] = 1 additional_opts["destroy_db_initially"] = 1
prev_compaction_style = cur_compaction_style prev_compaction_style = cur_compaction_style
@ -959,7 +987,7 @@ def whitebox_crash_main(args, unknown_args):
os.mkdir(dbname) os.mkdir(dbname)
except OSError: except OSError:
pass pass
if (expected_values_dir is not None): if expected_values_dir is not None:
shutil.rmtree(expected_values_dir, True) shutil.rmtree(expected_values_dir, True)
os.mkdir(expected_values_dir) os.mkdir(expected_values_dir)
@ -980,6 +1008,7 @@ def main():
parser.add_argument("--simple", action="store_true") parser.add_argument("--simple", action="store_true")
parser.add_argument("--cf_consistency", action="store_true") parser.add_argument("--cf_consistency", action="store_true")
parser.add_argument("--txn", action="store_true") parser.add_argument("--txn", action="store_true")
parser.add_argument("--optimistic_txn", action="store_true")
parser.add_argument("--test_best_efforts_recovery", action="store_true") parser.add_argument("--test_best_efforts_recovery", action="store_true")
parser.add_argument("--enable_ts", action="store_true") parser.add_argument("--enable_ts", action="store_true")
parser.add_argument("--test_multiops_txn", action="store_true") parser.add_argument("--test_multiops_txn", action="store_true")
@ -1004,6 +1033,7 @@ def main():
+ list(cf_consistency_params.items()) + list(cf_consistency_params.items())
+ list(tiered_params.items()) + list(tiered_params.items())
+ list(txn_params.items()) + list(txn_params.items())
+ list(optimistic_txn_params.items())
) )
for k, v in all_params.items(): for k, v in all_params.items():

Loading…
Cancel
Save