Add stress test for best-efforts recovery (#6819)

Summary:
Add crash test for the case of best-efforts recovery.
After a certain amount of time, we kill the db_stress process, randomly delete some certain table files and restart db_stress. Given the randomness of file deletion, it is difficult to verify against a reference for data correctness. Therefore, we just check that the db can restart successfully.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6819

Test Plan:
```
./db_stress -best_efforts_recovery=true -disable_wal=1 -reopen=0
./db_stress -best_efforts_recovery=true -disable_wal=0 -skip_verifydb=1 -verify_db_one_in=0 -continuous_verification_interval=0
make crash_test_with_best_efforts_recovery
```

Reviewed By: anand1976

Differential Revision: D21436753

Pulled By: riversand963

fbshipit-source-id: 0b3605c922a16c37ed17d5ab6682ca4240e47926
main
Yanqin Jin 5 years ago committed by Facebook GitHub Bot
parent bacd6edcbe
commit 15d9f28da5
  1. 18
      Makefile
  2. 3
      db_stress_tool/db_stress_common.h
  3. 12
      db_stress_tool/db_stress_driver.cc
  4. 4
      db_stress_tool/db_stress_gflags.cc
  5. 4
      db_stress_tool/db_stress_test_base.cc
  6. 20
      db_stress_tool/db_stress_tool.cc
  7. 56
      tools/db_crashtest.py

@ -844,7 +844,8 @@ endif # PLATFORM_SHARED_EXT
dbg rocksdbjavastatic rocksdbjava install install-static install-shared uninstall \ dbg rocksdbjavastatic rocksdbjava install install-static install-shared uninstall \
analyze tools tools_lib \ analyze tools tools_lib \
blackbox_crash_test_with_atomic_flush whitebox_crash_test_with_atomic_flush \ blackbox_crash_test_with_atomic_flush whitebox_crash_test_with_atomic_flush \
blackbox_crash_test_with_txn whitebox_crash_test_with_txn blackbox_crash_test_with_txn whitebox_crash_test_with_txn \
blackbox_crash_test_with_best_efforts_recovery
all: $(LIBRARY) $(BENCHMARKS) tools tools_lib test_libs $(TESTS) all: $(LIBRARY) $(BENCHMARKS) tools tools_lib test_libs $(TESTS)
@ -1077,6 +1078,8 @@ crash_test_with_atomic_flush: whitebox_crash_test_with_atomic_flush blackbox_cra
crash_test_with_txn: whitebox_crash_test_with_txn blackbox_crash_test_with_txn crash_test_with_txn: whitebox_crash_test_with_txn blackbox_crash_test_with_txn
crash_test_with_best_efforts_recovery: blackbox_crash_test_with_best_efforts_recovery
blackbox_crash_test: db_stress blackbox_crash_test: db_stress
$(PYTHON) -u tools/db_crashtest.py --simple blackbox $(CRASH_TEST_EXT_ARGS) $(PYTHON) -u tools/db_crashtest.py --simple blackbox $(CRASH_TEST_EXT_ARGS)
$(PYTHON) -u tools/db_crashtest.py blackbox $(CRASH_TEST_EXT_ARGS) $(PYTHON) -u tools/db_crashtest.py blackbox $(CRASH_TEST_EXT_ARGS)
@ -1087,6 +1090,9 @@ blackbox_crash_test_with_atomic_flush: db_stress
blackbox_crash_test_with_txn: db_stress blackbox_crash_test_with_txn: db_stress
$(PYTHON) -u tools/db_crashtest.py --txn blackbox $(CRASH_TEST_EXT_ARGS) $(PYTHON) -u tools/db_crashtest.py --txn blackbox $(CRASH_TEST_EXT_ARGS)
blackbox_crash_test_with_best_efforts_recovery: db_stress
$(PYTHON) -u tools/db_crashtest.py --test_best_efforts_recovery blackbox $(CRASH_TEST_EXT_ARGS)
ifeq ($(CRASH_TEST_KILL_ODD),) ifeq ($(CRASH_TEST_KILL_ODD),)
CRASH_TEST_KILL_ODD=888887 CRASH_TEST_KILL_ODD=888887
endif endif
@ -1125,6 +1131,11 @@ asan_crash_test_with_txn:
COMPILE_WITH_ASAN=1 $(MAKE) crash_test_with_txn COMPILE_WITH_ASAN=1 $(MAKE) crash_test_with_txn
$(MAKE) clean $(MAKE) clean
asan_crash_test_with_best_efforts_recovery:
$(MAKE) clean
COMPILE_WITH_ASAN=1 $(MAKE) crash_test_with_best_efforts_recovery
$(MAKE) clean
ubsan_check: ubsan_check:
$(MAKE) clean $(MAKE) clean
COMPILE_WITH_UBSAN=1 $(MAKE) check -j32 COMPILE_WITH_UBSAN=1 $(MAKE) check -j32
@ -1145,6 +1156,11 @@ ubsan_crash_test_with_txn:
COMPILE_WITH_UBSAN=1 $(MAKE) crash_test_with_txn COMPILE_WITH_UBSAN=1 $(MAKE) crash_test_with_txn
$(MAKE) clean $(MAKE) clean
ubsan_crash_test_with_best_efforts_recovery:
$(MAKE) clean
COMPILE_WITH_UBSAN=1 $(MAKE) crash_test_with_best_efforts_recovery
$(MAKE) clean
valgrind_test: valgrind_test:
ROCKSDB_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 $(MAKE) valgrind_check ROCKSDB_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 $(MAKE) valgrind_check

@ -236,6 +236,9 @@ DECLARE_double(blob_db_gc_cutoff);
DECLARE_int32(approximate_size_one_in); DECLARE_int32(approximate_size_one_in);
DECLARE_bool(sync_fault_injection); DECLARE_bool(sync_fault_injection);
DECLARE_bool(best_efforts_recovery);
DECLARE_bool(skip_verifydb);
const long KB = 1024; const long KB = 1024;
const int kRandomValueMaxFactor = 3; const int kRandomValueMaxFactor = 3;
const int kValueMaxLen = 100; const int kValueMaxLen = 100;

@ -16,7 +16,7 @@ void ThreadBody(void* v) {
ThreadState* thread = reinterpret_cast<ThreadState*>(v); ThreadState* thread = reinterpret_cast<ThreadState*>(v);
SharedState* shared = thread->shared; SharedState* shared = thread->shared;
if (shared->ShouldVerifyAtBeginning()) { if (!FLAGS_skip_verifydb && shared->ShouldVerifyAtBeginning()) {
thread->shared->GetStressTest()->VerifyDb(thread); thread->shared->GetStressTest()->VerifyDb(thread);
} }
{ {
@ -42,7 +42,9 @@ void ThreadBody(void* v) {
} }
} }
thread->shared->GetStressTest()->VerifyDb(thread); if (!FLAGS_skip_verifydb) {
thread->shared->GetStressTest()->VerifyDb(thread);
}
{ {
MutexLock l(shared->GetMutex()); MutexLock l(shared->GetMutex());
@ -118,6 +120,9 @@ bool RunStressTest(StressTest* stress) {
if (FLAGS_test_batches_snapshots) { if (FLAGS_test_batches_snapshots) {
fprintf(stdout, "%s Limited verification already done during gets\n", fprintf(stdout, "%s Limited verification already done during gets\n",
db_stress_env->TimeToString((uint64_t)now / 1000000).c_str()); db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
} else if (FLAGS_skip_verifydb) {
fprintf(stdout, "%s Verification skipped\n",
db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
} else { } else {
fprintf(stdout, "%s Starting verification\n", fprintf(stdout, "%s Starting verification\n",
db_stress_env->TimeToString((uint64_t)now / 1000000).c_str()); db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
@ -140,7 +145,8 @@ bool RunStressTest(StressTest* stress) {
threads[i] = nullptr; threads[i] = nullptr;
} }
now = db_stress_env->NowMicros(); now = db_stress_env->NowMicros();
if (!FLAGS_test_batches_snapshots && !shared.HasVerificationFailedYet()) { if (!FLAGS_skip_verifydb && !FLAGS_test_batches_snapshots &&
!shared.HasVerificationFailedYet()) {
fprintf(stdout, "%s Verification successful\n", fprintf(stdout, "%s Verification successful\n",
db_stress_env->TimeToString(now / 1000000).c_str()); db_stress_env->TimeToString(now / 1000000).c_str());
} }

@ -691,4 +691,8 @@ DEFINE_int32(read_fault_one_in, 1000,
DEFINE_bool(sync_fault_injection, false, DEFINE_bool(sync_fault_injection, false,
"If true, FaultInjectionTestFS will be used for write operations, " "If true, FaultInjectionTestFS will be used for write operations, "
" and unsynced data in DB will lost after crash."); " and unsynced data in DB will lost after crash.");
DEFINE_bool(best_efforts_recovery, false,
"If true, use best efforts recovery.");
DEFINE_bool(skip_verifydb, false, "If true, skip VerifyDb() calls.");
#endif // GFLAGS #endif // GFLAGS

@ -1729,6 +1729,8 @@ void StressTest::PrintEnv() const {
static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes)); static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));
fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in); fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in);
fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection); fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection);
fprintf(stdout, "Best efforts recovery : %d\n",
static_cast<int>(FLAGS_best_efforts_recovery));
fprintf(stdout, "------------------------------------------------\n"); fprintf(stdout, "------------------------------------------------\n");
} }
@ -1913,6 +1915,8 @@ void StressTest::Open() {
options_.merge_operator = MergeOperators::CreatePutOperator(); options_.merge_operator = MergeOperators::CreatePutOperator();
} }
options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
Status s; Status s;

@ -216,6 +216,26 @@ int db_stress_tool(int argc, char** argv) {
"Must set -test_secondary=true if secondary_catch_up_one_in > 0.\n"); "Must set -test_secondary=true if secondary_catch_up_one_in > 0.\n");
exit(1); exit(1);
} }
if (FLAGS_best_efforts_recovery && !FLAGS_skip_verifydb &&
!FLAGS_disable_wal) {
fprintf(stderr,
"With best-efforts recovery, either skip_verifydb or disable_wal "
"should be set to true.\n");
exit(1);
}
if (FLAGS_skip_verifydb) {
if (FLAGS_verify_db_one_in > 0) {
fprintf(stderr,
"Must set -verify_db_one_in=0 if skip_verifydb is true.\n");
exit(1);
}
if (FLAGS_continuous_verification_interval > 0) {
fprintf(stderr,
"Must set -continuous_verification_interval=0 if skip_verifydb "
"is true.\n");
exit(1);
}
}
rocksdb_kill_odds = FLAGS_kill_random_test; rocksdb_kill_odds = FLAGS_kill_random_test;
rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist); rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);

@ -6,6 +6,7 @@ import os
import sys import sys
import time import time
import random import random
import re
import tempfile import tempfile
import subprocess import subprocess
import shutil import shutil
@ -217,6 +218,13 @@ txn_params = {
"enable_pipelined_write": 0, "enable_pipelined_write": 0,
} }
best_efforts_recovery_params = {
"best_efforts_recovery": True,
"skip_verifydb": True,
"verify_db_one_in": 0,
"continuous_verification_interval": 0,
}
def finalize_and_sanitize(src_params): def finalize_and_sanitize(src_params):
dest_params = dict([(k, v() if callable(v) else v) dest_params = dict([(k, v() if callable(v) else v)
for (k, v) in src_params.items()]) for (k, v) in src_params.items()])
@ -287,6 +295,8 @@ def gen_cmd_params(args):
params.update(cf_consistency_params) params.update(cf_consistency_params)
if args.txn: if args.txn:
params.update(txn_params) params.update(txn_params)
if args.test_best_efforts_recovery:
params.update(best_efforts_recovery_params)
for k, v in vars(args).items(): for k, v in vars(args).items():
if v is not None: if v is not None:
@ -300,11 +310,49 @@ def gen_cmd(params, unknown_params):
'--{0}={1}'.format(k, v) '--{0}={1}'.format(k, v)
for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)] for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)]
if k not in set(['test_type', 'simple', 'duration', 'interval', if k not in set(['test_type', 'simple', 'duration', 'interval',
'random_kill_odd', 'cf_consistency', 'txn']) 'random_kill_odd', 'cf_consistency', 'txn',
'test_best_efforts_recovery'])
and v is not None] + unknown_params and v is not None] + unknown_params
return cmd return cmd
# Inject inconsistency to db directory.
def inject_inconsistencies_to_db_dir(dir_path):
files = os.listdir(dir_path)
file_num_rgx = re.compile(r'(?P<number>[0-9]{6})')
largest_fnum = 0
for f in files:
m = file_num_rgx.search(f)
if m and not f.startswith('LOG'):
largest_fnum = max(largest_fnum, int(m.group('number')))
candidates = [
f for f in files if re.search(r'[0-9]+\.sst', f)
]
deleted = 0
corrupted = 0
for f in candidates:
rnd = random.randint(0, 99)
f_path = os.path.join(dir_path, f)
if rnd < 10:
os.unlink(f_path)
deleted = deleted + 1
elif 10 <= rnd and rnd < 30:
with open(f_path, "a") as fd:
fd.write('12345678')
corrupted = corrupted + 1
print('Removed %d table files' % deleted)
print('Corrupted %d table files' % corrupted)
# Add corrupted MANIFEST and SST
for num in range(largest_fnum + 1, largest_fnum + 10):
rnd = random.randint(0, 1)
fname = ("MANIFEST-%06d" % num) if rnd == 0 else ("%06d.sst" % num)
print('Write %s' % fname)
with open(os.path.join(dir_path, fname), "w") as fd:
fd.write("garbage")
# This script runs and kills db_stress multiple times. It checks consistency # This script runs and kills db_stress multiple times. It checks consistency
# in case of unsafe crashes in RocksDB. # in case of unsafe crashes in RocksDB.
def blackbox_crash_main(args, unknown_args): def blackbox_crash_main(args, unknown_args):
@ -360,6 +408,11 @@ def blackbox_crash_main(args, unknown_args):
time.sleep(1) # time to stabilize before the next run time.sleep(1) # time to stabilize before the next run
if args.test_best_efforts_recovery:
inject_inconsistencies_to_db_dir(dbname)
time.sleep(1) # time to stabilize before the next run
# we need to clean up after ourselves -- only do this on test success # we need to clean up after ourselves -- only do this on test success
shutil.rmtree(dbname, True) shutil.rmtree(dbname, True)
@ -510,6 +563,7 @@ def main():
parser.add_argument("--simple", action="store_true") parser.add_argument("--simple", action="store_true")
parser.add_argument("--cf_consistency", action='store_true') parser.add_argument("--cf_consistency", action='store_true')
parser.add_argument("--txn", action='store_true') parser.add_argument("--txn", action='store_true')
parser.add_argument("--test_best_efforts_recovery", action='store_true')
all_params = dict(list(default_params.items()) all_params = dict(list(default_params.items())
+ list(blackbox_default_params.items()) + list(blackbox_default_params.items())

Loading…
Cancel
Save