Atomic Flush Crash Test also covers the case that WAL is enabled. (#5729)

Summary:
AtomicFlushStressTest is a powerful test, but right now we only run it for atomic_flush=true + disable_wal=true. We further extend it to the case where atomic_flush=false + disable_wal = false. All the workload generation and validation can stay the same.
Atomic flush crash test is also changed to switch between the two test scenarios. It makes the name "atomic flush crash test" out of sync from what it really does. We leave it as it is to avoid troubles with continous test set-up.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5729

Test Plan: Run "CRASH_TEST_KILL_ODD=188 TEST_TMPDIR=/dev/shm/ USE_CLANG=1 make whitebox_crash_test_with_atomic_flush", observe the settings used and see it passed.

Differential Revision: D16969791

fbshipit-source-id: 56e37487000ae631e31b0100acd7bdc441c04163
main
sdong 5 years ago committed by Facebook Github Bot
parent 202942b20c
commit d8a27d9331
  1. 4
      Makefile
  2. 20
      tools/db_crashtest.py
  3. 22
      tools/db_stress.cc

@ -950,7 +950,7 @@ blackbox_crash_test: db_stress
python -u tools/db_crashtest.py blackbox $(CRASH_TEST_EXT_ARGS) python -u tools/db_crashtest.py blackbox $(CRASH_TEST_EXT_ARGS)
blackbox_crash_test_with_atomic_flush: db_stress blackbox_crash_test_with_atomic_flush: db_stress
python -u tools/db_crashtest.py --enable_atomic_flush blackbox $(CRASH_TEST_EXT_ARGS) python -u tools/db_crashtest.py --cf_consistency blackbox $(CRASH_TEST_EXT_ARGS)
ifeq ($(CRASH_TEST_KILL_ODD),) ifeq ($(CRASH_TEST_KILL_ODD),)
CRASH_TEST_KILL_ODD=888887 CRASH_TEST_KILL_ODD=888887
@ -963,7 +963,7 @@ whitebox_crash_test: db_stress
$(CRASH_TEST_KILL_ODD) $(CRASH_TEST_EXT_ARGS) $(CRASH_TEST_KILL_ODD) $(CRASH_TEST_EXT_ARGS)
whitebox_crash_test_with_atomic_flush: db_stress whitebox_crash_test_with_atomic_flush: db_stress
python -u tools/db_crashtest.py --enable_atomic_flush whitebox --random_kill_odd \ python -u tools/db_crashtest.py --cf_consistency whitebox --random_kill_odd \
$(CRASH_TEST_KILL_ODD) $(CRASH_TEST_EXT_ARGS) $(CRASH_TEST_KILL_ODD) $(CRASH_TEST_EXT_ARGS)
asan_check: asan_check:

@ -16,9 +16,9 @@ import argparse
# default_params < {blackbox,whitebox}_default_params < # default_params < {blackbox,whitebox}_default_params <
# simple_default_params < # simple_default_params <
# {blackbox,whitebox}_simple_default_params < args # {blackbox,whitebox}_simple_default_params < args
# for enable_atomic_flush: # for cf_consistency:
# default_params < {blackbox,whitebox}_default_params < # default_params < {blackbox,whitebox}_default_params <
# atomic_flush_params < args # cf_consistency_params < args
expected_values_file = tempfile.NamedTemporaryFile() expected_values_file = tempfile.NamedTemporaryFile()
@ -132,10 +132,10 @@ blackbox_simple_default_params = {
whitebox_simple_default_params = {} whitebox_simple_default_params = {}
atomic_flush_params = { cf_consistency_params = {
"disable_wal": 1, "disable_wal": lambda: random.randint(0, 1),
"reopen": 0, "reopen": 0,
"test_atomic_flush": 1, "test_cf_consistency": 1,
# use small value for write_buffer_size so that RocksDB triggers flush # use small value for write_buffer_size so that RocksDB triggers flush
# more frequently # more frequently
"write_buffer_size": 1024 * 1024, "write_buffer_size": 1024 * 1024,
@ -160,6 +160,8 @@ def finalize_and_sanitize(src_params):
if dest_params.get("test_batches_snapshots") == 1: if dest_params.get("test_batches_snapshots") == 1:
dest_params["delpercent"] += dest_params["delrangepercent"] dest_params["delpercent"] += dest_params["delrangepercent"]
dest_params["delrangepercent"] = 0 dest_params["delrangepercent"] = 0
if dest_params.get("disable_wal", 0) == 1:
dest_params["atomic_flush"] = 1
return dest_params return dest_params
@ -177,8 +179,8 @@ def gen_cmd_params(args):
params.update(blackbox_simple_default_params) params.update(blackbox_simple_default_params)
if args.test_type == 'whitebox': if args.test_type == 'whitebox':
params.update(whitebox_simple_default_params) params.update(whitebox_simple_default_params)
if args.enable_atomic_flush: if args.cf_consistency:
params.update(atomic_flush_params) params.update(cf_consistency_params)
for k, v in vars(args).items(): for k, v in vars(args).items():
if v is not None: if v is not None:
@ -191,7 +193,7 @@ def gen_cmd(params, unknown_params):
'--{0}={1}'.format(k, v) '--{0}={1}'.format(k, v)
for k, v in finalize_and_sanitize(params).items() for k, v in finalize_and_sanitize(params).items()
if k not in set(['test_type', 'simple', 'duration', 'interval', if k not in set(['test_type', 'simple', 'duration', 'interval',
'random_kill_odd', 'enable_atomic_flush']) 'random_kill_odd', 'cf_consistency'])
and v is not None] + unknown_params and v is not None] + unknown_params
return cmd return cmd
@ -388,7 +390,7 @@ def main():
db_stress multiple times") db_stress multiple times")
parser.add_argument("test_type", choices=["blackbox", "whitebox"]) parser.add_argument("test_type", choices=["blackbox", "whitebox"])
parser.add_argument("--simple", action="store_true") parser.add_argument("--simple", action="store_true")
parser.add_argument("--enable_atomic_flush", action='store_true') parser.add_argument("--cf_consistency", action='store_true')
all_params = dict(default_params.items() all_params = dict(default_params.items()
+ blackbox_default_params.items() + blackbox_default_params.items()

@ -136,9 +136,10 @@ DEFINE_bool(test_batches_snapshots, false,
DEFINE_bool(atomic_flush, false, DEFINE_bool(atomic_flush, false,
"If set, enables atomic flush in the options.\n"); "If set, enables atomic flush in the options.\n");
DEFINE_bool(test_atomic_flush, false, DEFINE_bool(test_cf_consistency, false,
"If set, runs the stress test dedicated to verifying atomic flush " "If set, runs the stress test dedicated to verifying writes to "
"functionality. Setting this implies `atomic_flush=true`.\n"); "multiple column families are consistent. Setting this implies "
"`atomic_flush=true` is set true if `disable_wal=false`.\n");
DEFINE_int32(threads, 32, "Number of concurrent threads to run."); DEFINE_int32(threads, 32, "Number of concurrent threads to run.");
@ -3950,11 +3951,11 @@ class BatchedOpsStressTest : public StressTest {
virtual void VerifyDb(ThreadState* /* thread */) const {} virtual void VerifyDb(ThreadState* /* thread */) const {}
}; };
class AtomicFlushStressTest : public StressTest { class CfConsistencyStressTest : public StressTest {
public: public:
AtomicFlushStressTest() : batch_id_(0) {} CfConsistencyStressTest() : batch_id_(0) {}
virtual ~AtomicFlushStressTest() {} virtual ~CfConsistencyStressTest() {}
virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts, virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& /* read_opts */, const ReadOptions& /* read_opts */,
@ -4048,7 +4049,7 @@ class AtomicFlushStressTest : public StressTest {
std::unique_ptr<MutexLock>& /* lock */) { std::unique_ptr<MutexLock>& /* lock */) {
assert(false); assert(false);
fprintf(stderr, fprintf(stderr,
"AtomicFlushStressTest does not support TestIngestExternalFile " "CfConsistencyStressTest does not support TestIngestExternalFile "
"because it's not possible to verify the result\n"); "because it's not possible to verify the result\n");
std::terminate(); std::terminate();
} }
@ -4461,9 +4462,10 @@ int main(int argc, char** argv) {
"Error: clear_column_family_one_in must be 0 when using backup\n"); "Error: clear_column_family_one_in must be 0 when using backup\n");
exit(1); exit(1);
} }
if (FLAGS_test_atomic_flush) { if (FLAGS_test_cf_consistency && FLAGS_disable_wal) {
FLAGS_atomic_flush = true; FLAGS_atomic_flush = true;
} }
if (FLAGS_read_only) { if (FLAGS_read_only) {
if (FLAGS_writepercent != 0 || FLAGS_delpercent != 0 || if (FLAGS_writepercent != 0 || FLAGS_delpercent != 0 ||
FLAGS_delrangepercent != 0) { FLAGS_delrangepercent != 0) {
@ -4507,8 +4509,8 @@ int main(int argc, char** argv) {
rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist); rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);
std::unique_ptr<rocksdb::StressTest> stress; std::unique_ptr<rocksdb::StressTest> stress;
if (FLAGS_test_atomic_flush) { if (FLAGS_test_cf_consistency) {
stress.reset(new rocksdb::AtomicFlushStressTest()); stress.reset(new rocksdb::CfConsistencyStressTest());
} else if (FLAGS_test_batches_snapshots) { } else if (FLAGS_test_batches_snapshots) {
stress.reset(new rocksdb::BatchedOpsStressTest()); stress.reset(new rocksdb::BatchedOpsStressTest());
} else { } else {

Loading…
Cancel
Save