Improve stress test for transactions (#9568)

Summary:
Test only, no change to functionality.
Extremely low risk of library regression.

Update test key generation by maintaining existing and non-existing keys.
Update db_crashtest.py to drive multiops_txn stress test for both write-committed and write-prepared.
Add make targets 'blackbox_crash_test_with_multiops_wc_txn' and 'blackbox_crash_test_with_multiops_wp_txn'.

Running the following commands caught the bug exposed in https://github.com/facebook/rocksdb/issues/9571.
```
$rm -rf /tmp/rocksdbtest/*
$./db_stress -progress_reports=0 -test_multi_ops_txns -use_txn -clear_column_family_one_in=0 \
    -column_families=1 -writepercent=0 -delpercent=0 -delrangepercent=0 -customopspercent=60 \
   -readpercent=20 -prefixpercent=0 -iterpercent=20 -reopen=0 -ops_per_thread=1000 -ub_a=10000 \
   -ub_c=100 -destroy_db_initially=0 -key_spaces_path=/dev/shm/key_spaces_desc -threads=32 -read_fault_one_in=0
$./db_stress -progress_reports=0 -test_multi_ops_txns -use_txn -clear_column_family_one_in=0 \
   -column_families=1 -writepercent=0 -delpercent=0 -delrangepercent=0 -customopspercent=60 -readpercent=20 \
   -prefixpercent=0 -iterpercent=20 -reopen=0 -ops_per_thread=1000 -ub_a=10000 -ub_c=100 -destroy_db_initially=0 \
   -key_spaces_path=/dev/shm/key_spaces_desc -threads=32 -read_fault_one_in=0
```

Running the following command caught a bug which will be fixed in https://github.com/facebook/rocksdb/issues/9648 .
```
$TEST_TMPDIR=/dev/shm make blackbox_crash_test_with_multiops_wc_txn
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9568

Reviewed By: jay-zhuang

Differential Revision: D34308154

Pulled By: riversand963

fbshipit-source-id: 99ff1b65c19b46c471d2f2d3b47adcd342a1b9e7
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent fe9a344c55
commit 5894761056
  1. 4
      Makefile
  2. 10
      crash_test.mk
  3. 5
      db/write_batch.cc
  4. 8
      db_stress_tool/db_stress_common.h
  5. 22
      db_stress_tool/db_stress_gflags.cc
  6. 38
      db_stress_tool/db_stress_test_base.cc
  7. 788
      db_stress_tool/multi_ops_txns_stress.cc
  8. 88
      db_stress_tool/multi_ops_txns_stress.h
  9. 5
      include/rocksdb/utilities/transaction_db.h
  10. 94
      tools/db_crashtest.py

@ -789,7 +789,9 @@ endif # PLATFORM_SHARED_EXT
blackbox_crash_test_with_atomic_flush whitebox_crash_test_with_atomic_flush \ blackbox_crash_test_with_atomic_flush whitebox_crash_test_with_atomic_flush \
blackbox_crash_test_with_txn whitebox_crash_test_with_txn \ blackbox_crash_test_with_txn whitebox_crash_test_with_txn \
blackbox_crash_test_with_best_efforts_recovery \ blackbox_crash_test_with_best_efforts_recovery \
blackbox_crash_test_with_ts whitebox_crash_test_with_ts blackbox_crash_test_with_ts whitebox_crash_test_with_ts \
blackbox_crash_test_with_multiops_wc_txn \
blackbox_crash_test_with_multiops_wp_txn
all: $(LIBRARY) $(BENCHMARKS) tools tools_lib test_libs $(TESTS) all: $(LIBRARY) $(BENCHMARKS) tools tools_lib test_libs $(TESTS)

@ -16,7 +16,9 @@ CRASHTEST_PY=$(PYTHON) -u tools/db_crashtest.py --stress_cmd=$(DB_STRESS_CMD)
blackbox_crash_test_with_txn blackbox_crash_test_with_ts \ blackbox_crash_test_with_txn blackbox_crash_test_with_ts \
blackbox_crash_test_with_best_efforts_recovery \ blackbox_crash_test_with_best_efforts_recovery \
whitebox_crash_test whitebox_crash_test_with_atomic_flush \ whitebox_crash_test whitebox_crash_test_with_atomic_flush \
whitebox_crash_test_with_txn whitebox_crash_test_with_ts whitebox_crash_test_with_txn whitebox_crash_test_with_ts \
blackbox_crash_test_with_multiops_wc_txn \
blackbox_crash_test_with_multiops_wp_txn
crash_test: $(DB_STRESS_CMD) crash_test: $(DB_STRESS_CMD)
# Do not parallelize # Do not parallelize
@ -56,6 +58,12 @@ blackbox_crash_test_with_best_efforts_recovery: $(DB_STRESS_CMD)
blackbox_crash_test_with_ts: $(DB_STRESS_CMD) blackbox_crash_test_with_ts: $(DB_STRESS_CMD)
$(CRASHTEST_PY) --enable_ts blackbox $(CRASH_TEST_EXT_ARGS) $(CRASHTEST_PY) --enable_ts blackbox $(CRASH_TEST_EXT_ARGS)
blackbox_crash_test_with_multiops_wc_txn: $(DB_STRESS_CMD)
$(PYTHON) -u tools/db_crashtest.py --test_multiops_txn --write_policy write_committed blackbox $(CRASH_TEST_EXT_ARGS)
blackbox_crash_test_with_multiops_wp_txn: $(DB_STRESS_CMD)
$(PYTHON) -u tools/db_crashtest.py --test_multiops_txn --write_policy write_prepared blackbox $(CRASH_TEST_EXT_ARGS)
ifeq ($(CRASH_TEST_KILL_ODD),) ifeq ($(CRASH_TEST_KILL_ODD),)
CRASH_TEST_KILL_ODD=888887 CRASH_TEST_KILL_ODD=888887
endif endif

@ -2307,6 +2307,7 @@ class MemTableInserter : public WriteBatch::Handler {
assert(db_); assert(db_);
if (recovering_log_number_ != 0) { if (recovering_log_number_ != 0) {
db_->mutex()->AssertHeld();
// during recovery we rebuild a hollow transaction // during recovery we rebuild a hollow transaction
// from all encountered prepare sections of the wal // from all encountered prepare sections of the wal
if (db_->allow_2pc() == false) { if (db_->allow_2pc() == false) {
@ -2337,6 +2338,7 @@ class MemTableInserter : public WriteBatch::Handler {
assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0)); assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));
if (recovering_log_number_ != 0) { if (recovering_log_number_ != 0) {
db_->mutex()->AssertHeld();
assert(db_->allow_2pc()); assert(db_->allow_2pc());
size_t batch_cnt = size_t batch_cnt =
write_after_commit_ write_after_commit_
@ -2357,6 +2359,9 @@ class MemTableInserter : public WriteBatch::Handler {
} }
Status MarkNoop(bool empty_batch) override { Status MarkNoop(bool empty_batch) override {
if (recovering_log_number_ != 0) {
db_->mutex()->AssertHeld();
}
// A hack in pessimistic transaction could result into a noop at the start // A hack in pessimistic transaction could result into a noop at the start
// of the write batch, that should be ignored. // of the write batch, that should be ignored.
if (!empty_batch) { if (!empty_batch) {

@ -115,6 +115,7 @@ DECLARE_int32(level0_stop_writes_trigger);
DECLARE_int32(block_size); DECLARE_int32(block_size);
DECLARE_int32(format_version); DECLARE_int32(format_version);
DECLARE_int32(index_block_restart_interval); DECLARE_int32(index_block_restart_interval);
DECLARE_bool(disable_auto_compactions);
DECLARE_int32(max_background_compactions); DECLARE_int32(max_background_compactions);
DECLARE_int32(num_bottom_pri_threads); DECLARE_int32(num_bottom_pri_threads);
DECLARE_int32(compaction_thread_pool_adjust_interval); DECLARE_int32(compaction_thread_pool_adjust_interval);
@ -274,6 +275,13 @@ DECLARE_int32(secondary_cache_fault_one_in);
DECLARE_int32(prepopulate_block_cache); DECLARE_int32(prepopulate_block_cache);
DECLARE_bool(two_write_queues);
#ifndef ROCKSDB_LITE
DECLARE_bool(use_only_the_last_commit_time_batch_for_recovery);
DECLARE_uint64(wp_snapshot_cache_bits);
DECLARE_uint64(wp_commit_cache_bits);
#endif // !ROCKSDB_LITE
constexpr long KB = 1024; constexpr long KB = 1024;
constexpr int kRandomValueMaxFactor = 3; constexpr int kRandomValueMaxFactor = 3;
constexpr int kValueMaxLen = 100; constexpr int kValueMaxLen = 100;

@ -225,6 +225,10 @@ DEFINE_int32(
"Number of keys between restart points " "Number of keys between restart points "
"for delta encoding of keys in index block."); "for delta encoding of keys in index block.");
DEFINE_bool(disable_auto_compactions,
ROCKSDB_NAMESPACE::Options().disable_auto_compactions,
"If true, RocksDB internally will not trigger compactions.");
DEFINE_int32(max_background_compactions, DEFINE_int32(max_background_compactions,
ROCKSDB_NAMESPACE::Options().max_background_compactions, ROCKSDB_NAMESPACE::Options().max_background_compactions,
"The maximum number of concurrent background compactions " "The maximum number of concurrent background compactions "
@ -890,4 +894,22 @@ DEFINE_int32(prepopulate_block_cache,
"Options related to cache warming (see `enum " "Options related to cache warming (see `enum "
"PrepopulateBlockCache` in table.h)"); "PrepopulateBlockCache` in table.h)");
DEFINE_bool(two_write_queues, false,
"Set to true to enable two write queues. Default: false");
#ifndef ROCKSDB_LITE
DEFINE_bool(use_only_the_last_commit_time_batch_for_recovery, false,
"If true, the commit-time write batch will not be immediately "
"inserted into the memtables. Default: false");
DEFINE_uint64(
wp_snapshot_cache_bits, 7ull,
"Number of bits to represent write-prepared transaction db's snapshot "
"cache. Default: 7 (128 entries)");
DEFINE_uint64(wp_commit_cache_bits, 23ull,
"Number of bits to represent write-prepared transaction db's "
"commit cache. Default: 23 (8M entries)");
#endif // !ROCKSDB_LITE
#endif // GFLAGS #endif // GFLAGS

@ -604,8 +604,11 @@ Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) {
if (!FLAGS_use_txn) { if (!FLAGS_use_txn) {
return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set"); return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
} }
write_opts.disableWAL = FLAGS_disable_wal;
static std::atomic<uint64_t> txn_id = {0}; static std::atomic<uint64_t> txn_id = {0};
TransactionOptions txn_options; TransactionOptions txn_options;
txn_options.use_only_the_last_commit_time_batch_for_recovery =
FLAGS_use_only_the_last_commit_time_batch_for_recovery;
txn_options.lock_timeout = 600000; // 10 min txn_options.lock_timeout = 600000; // 10 min
txn_options.deadlock_detect = true; txn_options.deadlock_detect = true;
*txn = txn_db_->BeginTransaction(write_opts, txn_options); *txn = txn_db_->BeginTransaction(write_opts, txn_options);
@ -2153,6 +2156,28 @@ void StressTest::PrintEnv() const {
fprintf(stdout, "Format version : %d\n", FLAGS_format_version); fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
fprintf(stdout, "TransactionDB : %s\n", fprintf(stdout, "TransactionDB : %s\n",
FLAGS_use_txn ? "true" : "false"); FLAGS_use_txn ? "true" : "false");
if (FLAGS_use_txn) {
#ifndef ROCKSDB_LITE
fprintf(stdout, "Two write queues: : %s\n",
FLAGS_two_write_queues ? "true" : "false");
fprintf(stdout, "Write policy : %d\n",
static_cast<int>(FLAGS_txn_write_policy));
if (static_cast<uint64_t>(TxnDBWritePolicy::WRITE_PREPARED) ==
FLAGS_txn_write_policy ||
static_cast<uint64_t>(TxnDBWritePolicy::WRITE_UNPREPARED) ==
FLAGS_txn_write_policy) {
fprintf(stdout, "Snapshot cache bits : %d\n",
static_cast<int>(FLAGS_wp_snapshot_cache_bits));
fprintf(stdout, "Commit cache bits : %d\n",
static_cast<int>(FLAGS_wp_commit_cache_bits));
}
fprintf(stdout, "last cwb for recovery : %s\n",
FLAGS_use_only_the_last_commit_time_batch_for_recovery ? "true"
: "false");
#endif // !ROCKSDB_LITE
}
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
fprintf(stdout, "Stacked BlobDB : %s\n", fprintf(stdout, "Stacked BlobDB : %s\n",
FLAGS_use_blob_db ? "true" : "false"); FLAGS_use_blob_db ? "true" : "false");
@ -2316,6 +2341,7 @@ void StressTest::Open() {
options_.memtable_prefix_bloom_size_ratio = options_.memtable_prefix_bloom_size_ratio =
FLAGS_memtable_prefix_bloom_size_ratio; FLAGS_memtable_prefix_bloom_size_ratio;
options_.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering; options_.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
options_.disable_auto_compactions = FLAGS_disable_auto_compactions;
options_.max_background_compactions = FLAGS_max_background_compactions; options_.max_background_compactions = FLAGS_max_background_compactions;
options_.max_background_flushes = FLAGS_max_background_flushes; options_.max_background_flushes = FLAGS_max_background_flushes;
options_.compaction_style = options_.compaction_style =
@ -2704,7 +2730,13 @@ void StressTest::Open() {
options_.unordered_write = true; options_.unordered_write = true;
options_.two_write_queues = true; options_.two_write_queues = true;
txn_db_options.skip_concurrency_control = true; txn_db_options.skip_concurrency_control = true;
} else {
options_.two_write_queues = FLAGS_two_write_queues;
} }
txn_db_options.wp_snapshot_cache_bits =
static_cast<size_t>(FLAGS_wp_snapshot_cache_bits);
txn_db_options.wp_commit_cache_bits =
static_cast<size_t>(FLAGS_wp_commit_cache_bits);
s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
cf_descriptors, &column_families_, &txn_db_); cf_descriptors, &column_families_, &txn_db_);
if (!s.ok()) { if (!s.ok()) {
@ -2762,7 +2794,11 @@ void StressTest::Open() {
exit(1); exit(1);
#endif #endif
} }
if (s.ok() && FLAGS_continuous_verification_interval > 0 && !cmp_db_) { // Secondary instance does not support write-prepared/write-unprepared
// transactions, thus just disable secondary instance if we use
// transaction.
if (s.ok() && FLAGS_continuous_verification_interval > 0 &&
!FLAGS_use_txn && !cmp_db_) {
Options tmp_opts; Options tmp_opts;
// TODO(yanqin) support max_open_files != -1 for secondary instance. // TODO(yanqin) support max_open_files != -1 for secondary instance.
tmp_opts.max_open_files = -1; tmp_opts.max_open_files = -1;

File diff suppressed because it is too large Load Diff

@ -196,7 +196,7 @@ class MultiOpsTxnsStressTest : public StressTest {
void FinishInitDb(SharedState*) override; void FinishInitDb(SharedState*) override;
void ReopenAndPreloadDb(SharedState* shared); void ReopenAndPreloadDbIfNeeded(SharedState* shared);
bool IsStateTracked() const override { return false; } bool IsStateTracked() const override { return false; }
@ -262,10 +262,10 @@ class MultiOpsTxnsStressTest : public StressTest {
const std::vector<int>& rand_column_families) override; const std::vector<int>& rand_column_families) override;
Status PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a, Status PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a,
uint32_t new_a); uint32_t old_a_pos, uint32_t new_a);
Status SecondaryKeyUpdateTxn(ThreadState* thread, uint32_t old_c, Status SecondaryKeyUpdateTxn(ThreadState* thread, uint32_t old_c,
uint32_t new_c); uint32_t old_c_pos, uint32_t new_c);
Status UpdatePrimaryIndexValueTxn(ThreadState* thread, uint32_t a, Status UpdatePrimaryIndexValueTxn(ThreadState* thread, uint32_t a,
uint32_t b_delta); uint32_t b_delta);
@ -276,16 +276,88 @@ class MultiOpsTxnsStressTest : public StressTest {
void VerifyDb(ThreadState* thread) const override; void VerifyDb(ThreadState* thread) const override;
void ContinuouslyVerifyDb(ThreadState* thread) const override {
VerifyDb(thread);
}
protected: protected:
uint32_t ChooseA(ThreadState* thread); using KeySet = std::set<uint32_t>;
class KeyGenerator {
public:
explicit KeyGenerator(uint32_t s, uint32_t low, uint32_t high,
KeySet&& existing_uniq, KeySet&& non_existing_uniq)
: rand_(s),
low_(low),
high_(high),
existing_uniq_(std::move(existing_uniq)),
non_existing_uniq_(std::move(non_existing_uniq)) {}
~KeyGenerator() {
assert(!existing_uniq_.empty());
assert(!non_existing_uniq_.empty());
}
void FinishInit();
std::pair<uint32_t, uint32_t> ChooseExisting();
void Replace(uint32_t old_val, uint32_t old_pos, uint32_t new_val);
uint32_t Allocate();
void UndoAllocation(uint32_t new_val);
std::string ToString() const {
std::ostringstream oss;
oss << "[" << low_ << ", " << high_ << "): " << existing_.size()
<< " elements, " << existing_uniq_.size() << " unique values, "
<< non_existing_uniq_.size() << " unique non-existing values";
return oss.str();
}
private:
Random rand_;
uint32_t low_ = 0;
uint32_t high_ = 0;
std::vector<uint32_t> existing_{};
KeySet existing_uniq_{};
KeySet non_existing_uniq_{};
bool initialized_ = false;
};
// Return <a, pos>
std::pair<uint32_t, uint32_t> ChooseExistingA(ThreadState* thread);
uint32_t GenerateNextA(ThreadState* thread);
// Return <c, pos>
std::pair<uint32_t, uint32_t> ChooseExistingC(ThreadState* thread);
uint32_t GenerateNextC(ThreadState* thread);
uint32_t GenerateNextA(); std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_a_;
std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_c_;
private: private:
void PreloadDb(SharedState* shared, size_t num_c); struct KeySpaces {
uint32_t lb_a = 0;
uint32_t ub_a = 0;
uint32_t lb_c = 0;
uint32_t ub_c = 0;
explicit KeySpaces() = default;
explicit KeySpaces(uint32_t _lb_a, uint32_t _ub_a, uint32_t _lb_c,
uint32_t _ub_c)
: lb_a(_lb_a), ub_a(_ub_a), lb_c(_lb_c), ub_c(_ub_c) {}
std::string EncodeTo() const;
bool DecodeFrom(Slice data);
};
void PersistKeySpacesDesc(const std::string& key_spaces_path, uint32_t lb_a,
uint32_t ub_a, uint32_t lb_c, uint32_t ub_c);
KeySpaces ReadKeySpacesDesc(const std::string& key_spaces_path);
void PreloadDb(SharedState* shared, int threads, uint32_t lb_a, uint32_t ub_a,
uint32_t lb_c, uint32_t ub_c);
// TODO (yanqin) encapsulate the selection of keys a separate class. void ScanExistingDb(SharedState* shared, int threads);
std::atomic<uint32_t> next_a_{0};
}; };
class InvariantChecker { class InvariantChecker {

@ -226,8 +226,12 @@ struct TransactionDBOptions {
private: private:
// 128 entries // 128 entries
// Should the default value change, please also update wp_snapshot_cache_bits
// in db_stress_gflags.cc
size_t wp_snapshot_cache_bits = static_cast<size_t>(7); size_t wp_snapshot_cache_bits = static_cast<size_t>(7);
// 8m entry, 64MB size // 8m entry, 64MB size
// Should the default value change, please also update wp_commit_cache_bits
// in db_stress_gflags.cc
size_t wp_commit_cache_bits = static_cast<size_t>(23); size_t wp_commit_cache_bits = static_cast<size_t>(23);
// For testing, whether transaction name should be auto-generated or not. This // For testing, whether transaction name should be auto-generated or not. This
@ -239,6 +243,7 @@ struct TransactionDBOptions {
friend class WritePreparedTransactionTestBase; friend class WritePreparedTransactionTestBase;
friend class TransactionTestBase; friend class TransactionTestBase;
friend class MySQLStyleTransactionTest; friend class MySQLStyleTransactionTest;
friend class StressTest;
}; };
struct TransactionOptions { struct TransactionOptions {

@ -24,6 +24,10 @@ import argparse
# cf_consistency_params < args # cf_consistency_params < args
# for txn: # for txn:
# default_params < {blackbox,whitebox}_default_params < txn_params < args # default_params < {blackbox,whitebox}_default_params < txn_params < args
# for ts:
# default_params < {blackbox,whitebox}_default_params < ts_params < args
# for multiops_txn:
# default_params < {blackbox,whitebox}_default_params < multiops_txn_params < args
default_params = { default_params = {
@ -199,6 +203,21 @@ def setup_expected_values_dir():
os.mkdir(expected_values_dir) os.mkdir(expected_values_dir)
return expected_values_dir return expected_values_dir
multiops_txn_key_spaces_file = None
def setup_multiops_txn_key_spaces_file():
global multiops_txn_key_spaces_file
if multiops_txn_key_spaces_file is not None:
return multiops_txn_key_spaces_file
key_spaces_file_prefix = "rocksdb_crashtest_multiops_txn_key_spaces"
test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
if test_tmpdir is None or test_tmpdir == "":
multiops_txn_key_spaces_file = tempfile.mkstemp(
prefix=key_spaces_file_prefix)[1]
else:
multiops_txn_key_spaces_file = tempfile.mkstemp(
prefix=key_spaces_file_prefix, dir=test_tmpdir)[1]
return multiops_txn_key_spaces_file
def is_direct_io_supported(dbname): def is_direct_io_supported(dbname):
with tempfile.NamedTemporaryFile(dir=dbname) as f: with tempfile.NamedTemporaryFile(dir=dbname) as f:
@ -323,6 +342,61 @@ ts_params = {
"use_block_based_filter": 0, "use_block_based_filter": 0,
} }
multiops_txn_default_params = {
"test_cf_consistency": 0,
"test_batches_snapshots": 0,
"test_multi_ops_txns": 1,
"use_txn": 1,
"two_write_queues": lambda: random.choice([0, 1]),
# TODO: enable write-prepared
"disable_wal": 0,
"use_only_the_last_commit_time_batch_for_recovery": lambda: random.choice([0, 1]),
"clear_column_family_one_in": 0,
"column_families": 1,
"enable_pipelined_write": lambda: random.choice([0, 1]),
# This test already acquires snapshots in reads
"acquire_snapshot_one_in": 0,
"backup_one_in": 0,
"writepercent": 0,
"delpercent": 0,
"delrangepercent": 0,
"customopspercent": 80,
"readpercent": 5,
"iterpercent": 15,
"prefixpercent": 0,
"verify_db_one_in": 1000,
"continuous_verification_interval": 1000,
"delay_snapshot_read_one_in": 3,
# 65536 is the smallest possible value for write_buffer_size. Smaller
# values will be sanitized to 65536 during db open. SetOptions currently
# does not sanitize options, but very small write_buffer_size may cause
# assertion failure in
# https://github.com/facebook/rocksdb/blob/7.0.fb/db/memtable.cc#L117.
"write_buffer_size": 65536,
# flush more frequently to generate more files, thus trigger more
# compactions.
"flush_one_in": 1000,
"key_spaces_path": setup_multiops_txn_key_spaces_file(),
}
multiops_wc_txn_params = {
"txn_write_policy": 0,
# TODO re-enable pipelined write. Not well tested atm
"enable_pipelined_write": 0,
}
multiops_wp_txn_params = {
"txn_write_policy": 1,
"wp_snapshot_cache_bits": 1,
# try small wp_commit_cache_bits, e.g. 0 once we explore storing full
# commit sequence numbers in commit cache
"wp_commit_cache_bits": 10,
    # pipeline write is not currently compatible with WritePrepared txns
"enable_pipelined_write": 0,
    # OpenReadOnly after checkpoint is not currently compatible with WritePrepared txns
"checkpoint_one_in": 0,
}
def finalize_and_sanitize(src_params): def finalize_and_sanitize(src_params):
dest_params = dict([(k, v() if callable(v) else v) dest_params = dict([(k, v() if callable(v) else v)
for (k, v) in src_params.items()]) for (k, v) in src_params.items()])
@ -407,6 +481,8 @@ def finalize_and_sanitize(src_params):
if (dest_params.get("prefix_size") == -1 and if (dest_params.get("prefix_size") == -1 and
dest_params.get("memtable_whole_key_filtering") == 0): dest_params.get("memtable_whole_key_filtering") == 0):
dest_params["memtable_prefix_bloom_size_ratio"] = 0 dest_params["memtable_prefix_bloom_size_ratio"] = 0
if dest_params.get("two_write_queues") == 1:
dest_params["enable_pipelined_write"] = 0
return dest_params return dest_params
def gen_cmd_params(args): def gen_cmd_params(args):
@ -431,6 +507,12 @@ def gen_cmd_params(args):
params.update(best_efforts_recovery_params) params.update(best_efforts_recovery_params)
if args.enable_ts: if args.enable_ts:
params.update(ts_params) params.update(ts_params)
if args.test_multiops_txn:
params.update(multiops_txn_default_params)
if args.write_policy == 'write_committed':
params.update(multiops_wc_txn_params)
elif args.write_policy == 'write_prepared':
params.update(multiops_wp_txn_params)
# Best-effort recovery and BlobDB are currently incompatible. Test BE recovery # Best-effort recovery and BlobDB are currently incompatible. Test BE recovery
# if specified on the command line; otherwise, apply BlobDB related overrides # if specified on the command line; otherwise, apply BlobDB related overrides
@ -453,7 +535,8 @@ def gen_cmd(params, unknown_params):
for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)] for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)]
if k not in set(['test_type', 'simple', 'duration', 'interval', if k not in set(['test_type', 'simple', 'duration', 'interval',
'random_kill_odd', 'cf_consistency', 'txn', 'random_kill_odd', 'cf_consistency', 'txn',
'test_best_efforts_recovery', 'enable_ts', 'stress_cmd']) 'test_best_efforts_recovery', 'enable_ts',
'test_multiops_txn', 'write_policy', 'stress_cmd'])
and v is not None] + unknown_params and v is not None] + unknown_params
return cmd return cmd
@ -713,6 +796,8 @@ def main():
parser.add_argument("--txn", action='store_true') parser.add_argument("--txn", action='store_true')
parser.add_argument("--test_best_efforts_recovery", action='store_true') parser.add_argument("--test_best_efforts_recovery", action='store_true')
parser.add_argument("--enable_ts", action='store_true') parser.add_argument("--enable_ts", action='store_true')
parser.add_argument("--test_multiops_txn", action='store_true')
parser.add_argument("--write_policy", choices=["write_committed", "write_prepared"])
parser.add_argument("--stress_cmd") parser.add_argument("--stress_cmd")
all_params = dict(list(default_params.items()) all_params = dict(list(default_params.items())
@ -722,7 +807,10 @@ def main():
+ list(blackbox_simple_default_params.items()) + list(blackbox_simple_default_params.items())
+ list(whitebox_simple_default_params.items()) + list(whitebox_simple_default_params.items())
+ list(blob_params.items()) + list(blob_params.items())
+ list(ts_params.items())) + list(ts_params.items())
+ list(multiops_txn_default_params.items())
+ list(multiops_wc_txn_params.items())
+ list(multiops_wp_txn_params.items()))
for k, v in all_params.items(): for k, v in all_params.items():
parser.add_argument("--" + k, type=type(v() if callable(v) else v)) parser.add_argument("--" + k, type=type(v() if callable(v) else v))
@ -744,6 +832,8 @@ def main():
# Only delete the `expected_values_dir` if test passes # Only delete the `expected_values_dir` if test passes
if expected_values_dir is not None: if expected_values_dir is not None:
shutil.rmtree(expected_values_dir) shutil.rmtree(expected_values_dir)
if multiops_txn_key_spaces_file is not None:
os.remove(multiops_txn_key_spaces_file)
if __name__ == '__main__': if __name__ == '__main__':

Loading…
Cancel
Save