From 5c19a441c42fe516e3b9685a7d9b30d2a9234065 Mon Sep 17 00:00:00 2001 From: anand76 Date: Fri, 10 Apr 2020 17:18:56 -0700 Subject: [PATCH] Fault injection in db_stress (#6538) Summary: This PR implements a fault injection mechanism for injecting errors in reads in db_stress. The FaultInjectionTestFS is used for this purpose. A thread local structure is used to track the errors, so that each db_stress thread can independently enable/disable error injection and verify observed errors against expected errors. This is initially enabled only for Get and MultiGet, but can be extended to iterator as well once its proven stable. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6538 Test Plan: crash_test make check Reviewed By: riversand963 Differential Revision: D20714347 Pulled By: anand1976 fbshipit-source-id: d7598321d4a2d72bda0ced57411a337a91d87dc7 --- Makefile | 5 + TARGETS | 5 +- build_tools/build_detect_platform | 4 + db_stress_tool/db_stress_common.cc | 5 + db_stress_tool/db_stress_common.h | 10 +- db_stress_tool/db_stress_gflags.cc | 3 + db_stress_tool/db_stress_shared_state.cc | 9 ++ db_stress_tool/db_stress_shared_state.h | 39 +++++- db_stress_tool/db_stress_stat.h | 7 + db_stress_tool/db_stress_test_base.cc | 7 + db_stress_tool/db_stress_tool.cc | 15 +++ db_stress_tool/no_batched_ops_stress.cc | 88 +++++++++++-- port/stack_trace.cc | 36 +++++- port/stack_trace.h | 6 + table/block_based/block_based_table_reader.cc | 5 + table/block_based/partitioned_filter_block.cc | 13 ++ test_util/fault_injection_test_fs.cc | 112 +++++++++++++++- test_util/fault_injection_test_fs.h | 120 +++++++++++++++++- tools/db_crashtest.py | 3 +- 19 files changed, 469 insertions(+), 23 deletions(-) diff --git a/Makefile b/Makefile index b7feaa017..095ef87a5 100644 --- a/Makefile +++ b/Makefile @@ -441,7 +441,12 @@ BENCHTOOLOBJECTS = $(BENCH_LIB_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL) ANALYZETOOLOBJECTS = $(ANALYZER_LIB_SOURCES:.cc=.o) +ifeq ($(DEBUG_LEVEL),0) STRESSTOOLOBJECTS = $(STRESS_LIB_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL) +else +STRESSTOOLOBJECTS = $(STRESS_LIB_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL) \ + $(TESTHARNESS) +endif EXPOBJECTS = $(LIBOBJECTS) $(TESTUTIL) diff --git a/TARGETS b/TARGETS index 3943ad313..f9b998c05 100644 --- a/TARGETS +++ b/TARGETS @@ -94,6 +94,9 @@ is_opt_mode = build_mode.startswith("opt") # -DNDEBUG is added by default in opt mode in fbcode. But adding it twice # doesn't harm and avoid forgetting to add it. ROCKSDB_COMPILER_FLAGS += (["-DNDEBUG"] if is_opt_mode else []) +ROCKSDB_STRESS_DEPS = ( + [":rocksdb_lib", ":rocksdb_test_lib"] if not is_opt_mode else [":rocksdb_lib"] +) sanitizer = read_config("fbcode", "sanitizer") @@ -436,7 +439,7 @@ cpp_library( os_deps = ROCKSDB_OS_DEPS, os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS, preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS, - deps = [":rocksdb_lib"], + deps = ROCKSDB_STRESS_DEPS, external_deps = ROCKSDB_EXTERNAL_DEPS, ) diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index 7680aceab..f7250e9f5 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -246,6 +246,10 @@ JAVAC_ARGS="-source 7" if [ "$CROSS_COMPILE" = "true" -o "$FBCODE_BUILD" = "true" ]; then # Cross-compiling; do not try any compilation tests. # Also don't need any compilation tests if compiling on fbcode + if [ "$FBCODE_BUILD" = "true" ]; then + # Enable backtrace on fbcode since the necessary libraries are present + COMMON_FLAGS="$COMMON_FLAGS -DROCKSDB_BACKTRACE" + fi true else if ! test $ROCKSDB_DISABLE_FALLOCATE; then diff --git a/db_stress_tool/db_stress_common.cc b/db_stress_tool/db_stress_common.cc index 953cfe505..f86d75644 100644 --- a/db_stress_tool/db_stress_common.cc +++ b/db_stress_tool/db_stress_common.cc @@ -13,6 +13,11 @@ #include ROCKSDB_NAMESPACE::DbStressEnvWrapper* db_stress_env = nullptr; +#ifndef NDEBUG +// If non-null, injects read error at a rate specified by the +// read_fault_one_in flag +std::shared_ptr fault_fs_guard; +#endif // NDEBUG enum ROCKSDB_NAMESPACE::CompressionType compression_type_e = ROCKSDB_NAMESPACE::kSnappyCompression; enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e = diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 135a613eb..7f23062c7 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -58,6 +58,9 @@ #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "rocksdb/write_batch.h" +#ifndef NDEBUG +#include "test_util/fault_injection_test_fs.h" +#endif #include "util/coding.h" #include "util/compression.h" #include "util/crc32c.h" @@ -66,10 +69,6 @@ #include "util/random.h" #include "util/string_util.h" #include "utilities/blob_db/blob_db.h" -// SyncPoint is not supported in Released Windows Mode. -#if !(defined NDEBUG) || !defined(OS_WIN) -#include "test_util/sync_point.h" -#endif // !(defined NDEBUG) || !defined(OS_WIN) #include "test_util/testutil.h" #include "utilities/merge_operators.h" @@ -237,6 +236,9 @@ const int kValueMaxLen = 100; // wrapped posix or hdfs environment extern ROCKSDB_NAMESPACE::DbStressEnvWrapper* db_stress_env; +#ifndef NDEBUG +extern std::shared_ptr fault_fs_guard; +#endif extern enum ROCKSDB_NAMESPACE::CompressionType compression_type_e; extern enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e; diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 48b50a9a1..f1c10b0a1 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -671,4 +671,7 @@ DEFINE_int32(continuous_verification_interval, 1000, DEFINE_int32(approximate_size_one_in, 64, "If non-zero, DB::GetApproximateSizes() will be called against" " random key ranges."); + +DEFINE_int32(read_fault_one_in, 1000, + "On non-zero, enables fault injection on read"); #endif // GFLAGS diff --git a/db_stress_tool/db_stress_shared_state.cc b/db_stress_tool/db_stress_shared_state.cc index d25a47b1b..d64a4edd4 100644 --- a/db_stress_tool/db_stress_shared_state.cc +++ b/db_stress_tool/db_stress_shared_state.cc @@ -14,5 +14,14 @@ namespace ROCKSDB_NAMESPACE { const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe; const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff; +#if defined(ROCKSDB_SUPPORT_THREAD_LOCAL) +#if defined(OS_SOLARIS) +__thread bool SharedState::filter_read_error; +#else +thread_local bool SharedState::filter_read_error; +#endif // OS_SOLARIS +#else +bool SharedState::filter_read_error; +#endif // ROCKSDB_SUPPORT_THREAD_LOCAL } // namespace ROCKSDB_NAMESPACE #endif // GFLAGS diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index b68670b58..e7a3853eb 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -11,6 +11,10 @@ #pragma once #include "db_stress_tool/db_stress_stat.h" +// SyncPoint is not supported in Released Windows Mode. +#if !(defined NDEBUG) || !defined(OS_WIN) +#include "test_util/sync_point.h" +#endif // !(defined NDEBUG) || !defined(OS_WIN) #include "util/gflags_compat.h" DECLARE_uint64(seed); @@ -24,6 +28,7 @@ DECLARE_int32(clear_column_family_one_in); DECLARE_bool(test_batches_snapshots); DECLARE_int32(compaction_thread_pool_adjust_interval); DECLARE_int32(continuous_verification_interval); +DECLARE_int32(read_fault_one_in); namespace ROCKSDB_NAMESPACE { class StressTest; @@ -37,6 +42,20 @@ class SharedState { // indicates a key should definitely be deleted static const uint32_t DELETION_SENTINEL; + // Errors when reading filter blocks are ignored, so we use a thread + // local variable updated via sync points to keep track of errors injected + // while reading filter blocks in order to ignore the Get/MultiGet result + // for those calls +#if defined(ROCKSDB_SUPPORT_THREAD_LOCAL) +#if defined(OS_SOLARIS) + static __thread bool filter_read_error; +#else + static thread_local bool filter_read_error; +#endif // OS_SOLARIS +#else + static bool filter_read_error; +#endif // ROCKSDB_SUPPORT_THREAD_LOCAL + SharedState(Env* env, StressTest* stress_test) : cv_(&mu_), seed_(static_cast(FLAGS_seed)), @@ -171,9 +190,23 @@ class SharedState { ++num_bg_threads_; fprintf(stdout, "Starting continuous_verification_thread\n"); } +#ifndef NDEBUG + if (FLAGS_read_fault_one_in) { + SyncPoint::GetInstance()->SetCallBack("FilterReadError", + FilterReadErrorCallback); + SyncPoint::GetInstance()->EnableProcessing(); + } +#endif // NDEBUG } - ~SharedState() {} + ~SharedState() { +#ifndef NDEBUG + if (FLAGS_read_fault_one_in) { + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + } +#endif + } port::Mutex* GetMutex() { return &mu_; } @@ -329,6 +362,10 @@ class SharedState { } private: + static void FilterReadErrorCallback(void*) { + filter_read_error = true; + } + port::Mutex mu_; port::CondVar cv_; const uint32_t seed_; diff --git a/db_stress_tool/db_stress_stat.h b/db_stress_tool/db_stress_stat.h index a38c87334..a0accc4d9 100644 --- a/db_stress_tool/db_stress_stat.h +++ b/db_stress_tool/db_stress_stat.h @@ -42,6 +42,7 @@ class Stats { long range_deletions_; long covered_by_range_deletions_; long errors_; + long verified_errors_; long num_compact_files_succeed_; long num_compact_files_failed_; int next_report_; @@ -67,6 +68,7 @@ class Stats { range_deletions_ = 0; covered_by_range_deletions_ = 0; errors_ = 0; + verified_errors_ = 0; bytes_ = 0; seconds_ = 0; num_compact_files_succeed_ = 0; @@ -90,6 +92,7 @@ class Stats { range_deletions_ += other.range_deletions_; covered_by_range_deletions_ = other.covered_by_range_deletions_; errors_ += other.errors_; + verified_errors_ += other.verified_errors_; bytes_ += other.bytes_; seconds_ += other.seconds_; num_compact_files_succeed_ += other.num_compact_files_succeed_; @@ -163,6 +166,8 @@ class Stats { void AddErrors(long n) { errors_ += n; } + void AddVerifiedErrors(long n) { verified_errors_ += n; } + void AddNumCompactFilesSucceed(long n) { num_compact_files_succeed_ += n; } void AddNumCompactFilesFailed(long n) { num_compact_files_failed_ += n; } @@ -199,6 +204,8 @@ class Stats { covered_by_range_deletions_); fprintf(stdout, "%-12s: Got errors %ld times\n", "", errors_); + fprintf(stdout, "%-12s: Got expected errors %ld times\n", "", + verified_errors_); fprintf(stdout, "%-12s: %ld CompactFiles() succeed\n", "", num_compact_files_succeed_); fprintf(stdout, "%-12s: %ld CompactFiles() did not succeed\n", "", diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 9f5de4dcd..68e3102a7 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -502,6 +502,12 @@ void StressTest::OperateDb(ThreadState* thread) { const int delRangeBound = delBound + static_cast(FLAGS_delrangepercent); const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1); +#ifndef NDEBUG + if (FLAGS_read_fault_one_in) { + fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(), + FLAGS_read_fault_one_in); + } +#endif // NDEBUG thread->stats.Start(); for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) { if (thread->shared->HasVerificationFailedYet() || @@ -1721,6 +1727,7 @@ void StressTest::PrintEnv() const { FLAGS_max_write_batch_group_size_bytes); fprintf(stdout, "Use dynamic level : %d\n", static_cast(FLAGS_level_compaction_dynamic_level_bytes)); + fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in); fprintf(stdout, "------------------------------------------------\n"); } diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index ddb4c340d..9f9080336 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -23,11 +23,15 @@ #ifdef GFLAGS #include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_driver.h" +#ifndef NDEBUG +#include "test_util/fault_injection_test_fs.h" +#endif namespace ROCKSDB_NAMESPACE { namespace { static std::shared_ptr env_guard; static std::shared_ptr env_wrapper_guard; +static std::shared_ptr fault_env_guard; } // namespace KeyGenContext key_gen_ctx; @@ -69,6 +73,17 @@ int db_stress_tool(int argc, char** argv) { } else { raw_env = Env::Default(); } +#ifndef NDEBUG + if (FLAGS_read_fault_one_in) { + FaultInjectionTestFS* fs = + new FaultInjectionTestFS(raw_env->GetFileSystem()); + fault_fs_guard.reset(fs); + fault_fs_guard->SetFilesystemDirectWritable(true); + fault_env_guard = + std::make_shared(raw_env, fault_fs_guard); + raw_env = fault_env_guard.get(); + } +#endif env_wrapper_guard = std::make_shared(raw_env); db_stress_env = env_wrapper_guard.get(); diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 2e6734f21..ed5586162 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -9,6 +9,9 @@ #ifdef GFLAGS #include "db_stress_tool/db_stress_common.h" +#ifndef NDEBUG +#include "test_util/fault_injection_test_fs.h" +#endif // NDEBUG namespace ROCKSDB_NAMESPACE { class NonBatchedOpsStressTest : public StressTest { @@ -144,18 +147,52 @@ class NonBatchedOpsStressTest : public StressTest { std::string key_str = Key(rand_keys[0]); Slice key = key_str; std::string from_db; + int error_count = 0; + +#ifndef NDEBUG + if (fault_fs_guard) { + fault_fs_guard->EnableErrorInjection(); + SharedState::filter_read_error = false; + } +#endif // NDEBUG Status s = db_->Get(read_opts, cfh, key, &from_db); +#ifndef NDEBUG + if (fault_fs_guard) { + error_count = fault_fs_guard->GetAndResetErrorCount(); + } +#endif // NDEBUG if (s.ok()) { +#ifndef NDEBUG + if (fault_fs_guard) { + if (error_count && !SharedState::filter_read_error) { + // Grab mutex so multiple thread don't try to print the + // stack trace at the same time + MutexLock l(thread->shared->GetMutex()); + fprintf(stderr, "Didn't get expected error from Get\n"); + fprintf(stderr, "Callstack that injected the error\n"); + fault_fs_guard->PrintFaultBacktrace(); + std::terminate(); + } + } +#endif // NDEBUG // found case thread->stats.AddGets(1, 1); } else if (s.IsNotFound()) { // not found case thread->stats.AddGets(1, 0); } else { - // errors case - fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str()); - thread->stats.AddErrors(1); + if (error_count == 0) { + // errors case + thread->stats.AddErrors(1); + } else { + thread->stats.AddVerifiedErrors(1); + } } +#ifndef NDEBUG + if (fault_fs_guard) { + fault_fs_guard->DisableErrorInjection(); + } +#endif // NDEBUG return s; } @@ -171,6 +208,7 @@ class NonBatchedOpsStressTest : public StressTest { std::vector values(num_keys); std::vector statuses(num_keys); ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]]; + int error_count = 0; // To appease clang analyzer const bool use_txn = FLAGS_use_txn; @@ -231,8 +269,19 @@ class NonBatchedOpsStressTest : public StressTest { } if (!use_txn) { +#ifndef NDEBUG + if (fault_fs_guard) { + fault_fs_guard->EnableErrorInjection(); + SharedState::filter_read_error = false; + } +#endif // NDEBUG db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(), statuses.data()); +#ifndef NDEBUG + if (fault_fs_guard) { + error_count = fault_fs_guard->GetAndResetErrorCount(); + } +#endif // NDEBUG } else { #ifndef ROCKSDB_LITE txn->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(), @@ -243,8 +292,22 @@ class NonBatchedOpsStressTest : public StressTest { for (const auto& s : statuses) { if (s.ok()) { - // found case - thread->stats.AddGets(1, 1); +#ifndef NDEBUG + if (fault_fs_guard && error_count && !SharedState::filter_read_error) { + // Grab mutex so multiple thread don't try to print the + // stack trace at the same time + MutexLock l(thread->shared->GetMutex()); + fprintf(stderr, "Didn't get expected error from MultiGet\n"); + fprintf(stderr, "Callstack that injected the error\n"); + fault_fs_guard->PrintFaultBacktrace(); + std::terminate(); + } else { +#endif // NDEBUG + // found case + thread->stats.AddGets(1, 1); +#ifndef NDEBUG + } +#endif // NDEBUG } else if (s.IsNotFound()) { // not found case thread->stats.AddGets(1, 0); @@ -252,11 +315,20 @@ class NonBatchedOpsStressTest : public StressTest { // With txn this is sometimes expected. thread->stats.AddGets(1, 1); } else { - // errors case - fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str()); - thread->stats.AddErrors(1); + if (error_count == 0) { + // errors case + fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str()); + thread->stats.AddErrors(1); + } else { + thread->stats.AddVerifiedErrors(1); + } } } +#ifndef NDEBUG + if (fault_fs_guard) { + fault_fs_guard->DisableErrorInjection(); + } +#endif // NDEBUG return statuses; } diff --git a/port/stack_trace.cc b/port/stack_trace.cc index 32eb45bfb..d3975c686 100644 --- a/port/stack_trace.cc +++ b/port/stack_trace.cc @@ -14,6 +14,10 @@ namespace ROCKSDB_NAMESPACE { namespace port { void InstallStackTraceHandler() {} void PrintStack(int /*first_frames_to_skip*/) {} +void PrintAndFreeStack(void* /*callstack*/, int /*num_frames*/) {} +void* SaveStack(int* /*num_frames*/, int /*first_frames_to_skip*/) { + return nullptr; +} } // namespace port } // namespace ROCKSDB_NAMESPACE @@ -99,18 +103,38 @@ void PrintStackTraceLine(const char* symbol, void* frame) { } // namespace +void PrintStack(void* frames[], int num_frames) { + auto symbols = backtrace_symbols(frames, num_frames); + + for (int i = 0; i < num_frames; ++i) { + fprintf(stderr, "#%-2d ", i); + PrintStackTraceLine((symbols != nullptr) ? symbols[i] : nullptr, frames[i]); + } + free(symbols); +} + void PrintStack(int first_frames_to_skip) { const int kMaxFrames = 100; void* frames[kMaxFrames]; auto num_frames = backtrace(frames, kMaxFrames); - auto symbols = backtrace_symbols(frames, num_frames); + PrintStack(&frames[first_frames_to_skip], num_frames - first_frames_to_skip); +} - for (int i = first_frames_to_skip; i < num_frames; ++i) { - fprintf(stderr, "#%-2d ", i - first_frames_to_skip); - PrintStackTraceLine((symbols != nullptr) ? symbols[i] : nullptr, frames[i]); - } - free(symbols); +void PrintAndFreeStack(void* callstack, int num_frames) { + PrintStack(static_cast(callstack), num_frames); + free(callstack); +} + +void* SaveStack(int* num_frames, int first_frames_to_skip) { + const int kMaxFrames = 100; + void* frames[kMaxFrames]; + + auto count = backtrace(frames, kMaxFrames); + *num_frames = count - first_frames_to_skip; + void* callstack = malloc(sizeof(void*) * *num_frames); + memcpy(callstack, &frames[first_frames_to_skip], sizeof(void*) * *num_frames); + return callstack; } static void StackTraceHandler(int sig) { diff --git a/port/stack_trace.h b/port/stack_trace.h index 4924f5b1b..b3474ca3e 100644 --- a/port/stack_trace.h +++ b/port/stack_trace.h @@ -18,5 +18,11 @@ void InstallStackTraceHandler(); // Prints stack, skips skip_first_frames frames void PrintStack(int first_frames_to_skip = 0); +// Prints the given callstack +void PrintAndFreeStack(void* callstack, int num_frames); + +// Save the current callstack +void* SaveStack(int* num_frame, int first_frames_to_skip = 0); + } // namespace port } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 01557231b..f6a5068fa 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -994,6 +994,11 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( auto filter = new_table->CreateFilterBlockReader( prefetch_buffer, use_cache, prefetch_filter, pin_filter, lookup_context); +#ifndef NDEBUG + if (rep_->filter_type != Rep::FilterType::kNoFilter && !filter) { + TEST_SYNC_POINT("FilterReadError"); + } +#endif if (filter) { // Refer to the comment above about paritioned indexes always being cached if (prefetch_all) { diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc index d0e79563b..04ce2c439 100644 --- a/table/block_based/partitioned_filter_block.cc +++ b/table/block_based/partitioned_filter_block.cc @@ -254,6 +254,7 @@ bool PartitionedFilterBlockReader::MayMatch( Status s = GetOrReadFilterBlock(no_io, get_context, lookup_context, &filter_block); if (UNLIKELY(!s.ok())) { + TEST_SYNC_POINT("FilterReadError"); return true; } @@ -271,6 +272,7 @@ bool PartitionedFilterBlockReader::MayMatch( no_io, get_context, lookup_context, &filter_partition_block); if (UNLIKELY(!s.ok())) { + TEST_SYNC_POINT("FilterReadError"); return true; } @@ -310,6 +312,7 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) { "Error retrieving top-level filter block while trying to " "cache filter partitions: %s", s.ToString().c_str()); + TEST_SYNC_POINT("FilterReadError"); return; } @@ -340,6 +343,11 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) { prefetch_buffer.reset(new FilePrefetchBuffer()); s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off, static_cast(prefetch_len)); +#ifndef NDEBUG + if (!s.ok()) { + TEST_SYNC_POINT("FilterReadError"); + } +#endif // After prefetch, read the partitions one by one ReadOptions read_options; @@ -362,6 +370,11 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) { } } } +#ifndef NDEBUG + if (!s.ok()) { + TEST_SYNC_POINT("FilterReadError"); + } +#endif } } diff --git a/test_util/fault_injection_test_fs.cc b/test_util/fault_injection_test_fs.cc index d2106ce7b..57848645e 100644 --- a/test_util/fault_injection_test_fs.cc +++ b/test_util/fault_injection_test_fs.cc @@ -17,6 +17,8 @@ #include "test_util/fault_injection_test_fs.h" #include #include +#include "port/stack_trace.h" +#include "util/util.h" namespace ROCKSDB_NAMESPACE { @@ -195,6 +197,27 @@ IOStatus TestFSRandomRWFile::Sync(const IOOptions& options, return target_->Sync(options, dbg); } +TestFSRandomAccessFile::TestFSRandomAccessFile(const std::string& /*fname*/, + std::unique_ptr&& f, + FaultInjectionTestFS* fs) + : target_(std::move(f)), fs_(fs) { + assert(target_ != nullptr); +} + +IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n, + const IOOptions& options, Slice* result, + char* scratch, IODebugContext* dbg) const { + if (!fs_->IsFilesystemActive()) { + return fs_->GetError(); + } + IOStatus s = target_->Read(offset, n, options, result, scratch, dbg); + if (s.ok()) { + s = fs_->InjectError(FaultInjectionTestFS::ErrorOperation::kRead, result, + scratch); + } + return s; +} + IOStatus FaultInjectionTestFS::NewDirectory( const std::string& name, const IOOptions& options, std::unique_ptr* result, IODebugContext* dbg) { @@ -215,6 +238,9 @@ IOStatus FaultInjectionTestFS::NewWritableFile( if (!IsFilesystemActive()) { return GetError(); } + if (IsFilesystemDirectWritable()) { + return target()->NewWritableFile(fname, file_opts, result, dbg); + } // Not allow overwriting files IOStatus io_s = target()->FileExists(fname, IOOptions(), dbg); if (io_s.ok()) { @@ -244,6 +270,9 @@ IOStatus FaultInjectionTestFS::ReopenWritableFile( if (!IsFilesystemActive()) { return GetError(); } + if (IsFilesystemDirectWritable()) { + return target()->ReopenWritableFile(fname, file_opts, result, dbg); + } IOStatus io_s = target()->ReopenWritableFile(fname, file_opts, result, dbg); if (io_s.ok()) { result->reset(new TestFSWritableFile(fname, std::move(*result), this)); @@ -265,6 +294,9 @@ IOStatus FaultInjectionTestFS::NewRandomRWFile( if (!IsFilesystemActive()) { return GetError(); } + if (IsFilesystemDirectWritable()) { + return target()->NewRandomRWFile(fname, file_opts, result, dbg); + } IOStatus io_s = target()->NewRandomRWFile(fname, file_opts, result, dbg); if (io_s.ok()) { result->reset(new TestFSRandomRWFile(fname, std::move(*result), this)); @@ -286,7 +318,14 @@ IOStatus FaultInjectionTestFS::NewRandomAccessFile( if (!IsFilesystemActive()) { return GetError(); } - return target()->NewRandomAccessFile(fname, file_opts, result, dbg); + IOStatus io_s = InjectError(ErrorOperation::kOpen, nullptr, nullptr); + if (io_s.ok()) { + io_s = target()->NewRandomAccessFile(fname, file_opts, result, dbg); + } + if (io_s.ok()) { + result->reset(new TestFSRandomAccessFile(fname, std::move(*result), this)); + } + return io_s; } IOStatus FaultInjectionTestFS::DeleteFile(const std::string& f, @@ -427,4 +466,75 @@ void FaultInjectionTestFS::UntrackFile(const std::string& f) { open_files_.erase(f); } +IOStatus FaultInjectionTestFS::InjectError(ErrorOperation op, + Slice* result, + char* scratch) { + ErrorContext* ctx = + static_cast(thread_local_error_->Get()); + if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in) { + return IOStatus::OK(); + } + + if (ctx->rand.OneIn(ctx->one_in)) { + ctx->count++; + ctx->callstack = port::SaveStack(&ctx->frames); + switch (op) { + case kRead: + { + uint32_t type = ctx->rand.Uniform(3); + switch (type) { + // Inject IO error + case 0: + return IOStatus::IOError(); + // Inject random corruption + case 1: + { + if (result->data() == scratch) { + uint64_t offset = ctx->rand.Uniform((uint32_t)result->size()); + uint64_t len = + std::min(result->size() - offset, 64UL); + assert(offset < result->size()); + assert(offset + len <= result->size()); + std::string str = DBTestBase::RandomString(&ctx->rand, + static_cast(len)); + memcpy(scratch + offset, str.data(), len); + break; + } else { + FALLTHROUGH_INTENDED; + } + } + // Truncate the result + case 2: + { + assert(result->size() > 0); + uint64_t offset = ctx->rand.Uniform((uint32_t)result->size()); + assert(offset < result->size()); + *result = Slice(result->data(), offset); + break; + } + default: + assert(false); + } + break; + } + case kOpen: + return IOStatus::IOError(); + default: + assert(false); + } + } + return IOStatus::OK(); +} + +void FaultInjectionTestFS::PrintFaultBacktrace() { +#if defined(OS_LINUX) + ErrorContext* ctx = + static_cast(thread_local_error_->Get()); + if (ctx == nullptr) { + return; + } + port::PrintAndFreeStack(ctx->callstack, ctx->frames); +#endif +} + } // namespace ROCKSDB_NAMESPACE diff --git a/test_util/fault_injection_test_fs.h b/test_util/fault_injection_test_fs.h index 3c0c53007..353cbda50 100644 --- a/test_util/fault_injection_test_fs.h +++ b/test_util/fault_injection_test_fs.h @@ -16,10 +16,12 @@ #pragma once +#include #include #include #include +#include "db/db_test_util.h" #include "db/version_set.h" #include "env/mock_env.h" #include "file/filename.h" @@ -80,6 +82,9 @@ class TestFSWritableFile : public FSWritableFile { IODebugContext* dbg) override { return target_->PositionedAppend(data, offset, options, dbg); } + virtual size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } virtual bool use_direct_io() const override { return target_->use_direct_io(); }; @@ -119,6 +124,25 @@ class TestFSRandomRWFile : public FSRandomRWFile { FaultInjectionTestFS* fs_; }; +class TestFSRandomAccessFile : public FSRandomAccessFile { + public: + explicit TestFSRandomAccessFile(const std::string& fname, + std::unique_ptr&& f, + FaultInjectionTestFS* fs); + ~TestFSRandomAccessFile() override {} + IOStatus Read(uint64_t offset, size_t n, const IOOptions& options, + Slice* result, char* scratch, + IODebugContext* dbg) const override; + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + bool use_direct_io() const override { return target_->use_direct_io(); } + + private: + std::unique_ptr target_; + FaultInjectionTestFS* fs_; +}; + class TestFSDirectory : public FSDirectory { public: explicit TestFSDirectory(FaultInjectionTestFS* fs, std::string dirname, @@ -138,7 +162,10 @@ class TestFSDirectory : public FSDirectory { class FaultInjectionTestFS : public FileSystemWrapper { public: explicit FaultInjectionTestFS(std::shared_ptr base) - : FileSystemWrapper(base), filesystem_active_(true) {} + : FileSystemWrapper(base), + filesystem_active_(true), + filesystem_writable_(false), + thread_local_error_(new ThreadLocalPtr(nullptr)) {} virtual ~FaultInjectionTestFS() {} const char* Name() const override { return "FaultInjectionTestFS"; } @@ -217,6 +244,14 @@ class FaultInjectionTestFS : public FileSystemWrapper { MutexLock l(&mutex_); return filesystem_active_; } + + // Setting filesystem_writable_ makes NewWritableFile. ReopenWritableFile, + // and NewRandomRWFile bypass FaultInjectionTestFS and go directly to the + // target FS + bool IsFilesystemDirectWritable() { + MutexLock l(&mutex_); + return filesystem_writable_; + } void SetFilesystemActiveNoLock( bool active, IOStatus error = IOStatus::Corruption("Not active")) { filesystem_active_ = active; @@ -229,6 +264,11 @@ class FaultInjectionTestFS : public FileSystemWrapper { MutexLock l(&mutex_); SetFilesystemActiveNoLock(active, error); } + void SetFilesystemDirectWritable( + bool writable) { + MutexLock l(&mutex_); + filesystem_writable_ = writable; + } void AssertNoOpenFile() { assert(open_files_.empty()); } IOStatus GetError() { return error_; } @@ -238,6 +278,66 @@ class FaultInjectionTestFS : public FileSystemWrapper { error_ = io_error; } + // Specify what the operation, so we can inject the right type of error + enum ErrorOperation : char { + kRead = 0, + kOpen, + }; + + // Set thread-local parameters for error injection. The first argument, + // seed is the seed for the random number generator, and one_in determines + // the probability of injecting error (i.e an error is injected with + // 1/one_in probability) + void SetThreadLocalReadErrorContext(uint32_t seed, int one_in) { + struct ErrorContext* ctx = + static_cast(thread_local_error_->Get()); + if (ctx == nullptr) { + ctx = new ErrorContext(seed); + thread_local_error_->Reset(ctx); + } + ctx->one_in = one_in; + ctx->count = 0; + } + + // Inject an error. For a READ operation, a status of IOError(), a + // corruption in the contents of scratch, or truncation of slice + // are the types of error with equal probability. For OPEN, + // its always an IOError. + IOStatus InjectError(ErrorOperation op, Slice* slice, char* scratch); + + // Get the count of how many times we injected since the previous call + int GetAndResetErrorCount() { + ErrorContext* ctx = + static_cast(thread_local_error_->Get()); + int count = 0; + if (ctx != nullptr) { + count = ctx->count; + ctx->count = 0; + } + return count; + } + + void EnableErrorInjection() { + ErrorContext* ctx = + static_cast(thread_local_error_->Get()); + if (ctx) { + ctx->enable_error_injection = true; + } + } + + void DisableErrorInjection() { + ErrorContext* ctx = + static_cast(thread_local_error_->Get()); + if (ctx) { + ctx->enable_error_injection = false; + } + } + + // We capture a backtrace every time a fault is injected, for debugging + // purposes. This call prints the backtrace to stderr and frees the + // saved callstack + void PrintFaultBacktrace(); + private: port::Mutex mutex_; std::map db_file_state_; @@ -245,7 +345,25 @@ class FaultInjectionTestFS : public FileSystemWrapper { std::unordered_map> dir_to_new_files_since_last_sync_; bool filesystem_active_; // Record flushes, syncs, writes + bool filesystem_writable_; // Bypass FaultInjectionTestFS and go directly + // to underlying FS for writable files IOStatus error_; + + struct ErrorContext { + Random rand; + int one_in; + int count; + bool enable_error_injection; + void* callstack; + int frames; + + explicit ErrorContext(uint32_t seed) + : rand(seed), + enable_error_injection(false), + frames(0) {} + }; + + std::unique_ptr thread_local_error_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index dea2a3fef..e947bf604 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -113,7 +113,8 @@ default_params = { "verify_db_one_in": 100000, "continuous_verification_interval" : 0, "max_key_len": 3, - "key_len_percent_dist": "1,30,69" + "key_len_percent_dist": "1,30,69", + "read_fault_one_in": lambda: random.choice([0, 1000]) } _TEST_DIR_ENV_VAR = 'TEST_TMPDIR'