Fault injection in db_stress (#6538)

Summary:
This PR implements a fault injection mechanism for injecting errors in reads in db_stress. The FaultInjectionTestFS is used for this purpose. A thread local structure is used to track the errors, so that each db_stress thread can independently enable/disable error injection and verify observed errors against expected errors. This is initially enabled only for Get and MultiGet, but can be extended to iterator as well once its proven stable.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6538

Test Plan:
crash_test
make check

Reviewed By: riversand963

Differential Revision: D20714347

Pulled By: anand1976

fbshipit-source-id: d7598321d4a2d72bda0ced57411a337a91d87dc7
main
anand76 5 years ago committed by Facebook GitHub Bot
parent 0c05624d50
commit 5c19a441c4
  1. 5
      Makefile
  2. 5
      TARGETS
  3. 4
      build_tools/build_detect_platform
  4. 5
      db_stress_tool/db_stress_common.cc
  5. 10
      db_stress_tool/db_stress_common.h
  6. 3
      db_stress_tool/db_stress_gflags.cc
  7. 9
      db_stress_tool/db_stress_shared_state.cc
  8. 39
      db_stress_tool/db_stress_shared_state.h
  9. 7
      db_stress_tool/db_stress_stat.h
  10. 7
      db_stress_tool/db_stress_test_base.cc
  11. 15
      db_stress_tool/db_stress_tool.cc
  12. 74
      db_stress_tool/no_batched_ops_stress.cc
  13. 36
      port/stack_trace.cc
  14. 6
      port/stack_trace.h
  15. 5
      table/block_based/block_based_table_reader.cc
  16. 13
      table/block_based/partitioned_filter_block.cc
  17. 112
      test_util/fault_injection_test_fs.cc
  18. 120
      test_util/fault_injection_test_fs.h
  19. 3
      tools/db_crashtest.py

@ -441,7 +441,12 @@ BENCHTOOLOBJECTS = $(BENCH_LIB_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL)
ANALYZETOOLOBJECTS = $(ANALYZER_LIB_SOURCES:.cc=.o)
ifeq ($(DEBUG_LEVEL),0)
STRESSTOOLOBJECTS = $(STRESS_LIB_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL)
else
STRESSTOOLOBJECTS = $(STRESS_LIB_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL) \
$(TESTHARNESS)
endif
EXPOBJECTS = $(LIBOBJECTS) $(TESTUTIL)

@ -94,6 +94,9 @@ is_opt_mode = build_mode.startswith("opt")
# -DNDEBUG is added by default in opt mode in fbcode. But adding it twice
# doesn't harm and avoid forgetting to add it.
ROCKSDB_COMPILER_FLAGS += (["-DNDEBUG"] if is_opt_mode else [])
ROCKSDB_STRESS_DEPS = (
[":rocksdb_lib", ":rocksdb_test_lib"] if not is_opt_mode else [":rocksdb_lib"]
)
sanitizer = read_config("fbcode", "sanitizer")
@ -436,7 +439,7 @@ cpp_library(
os_deps = ROCKSDB_OS_DEPS,
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
deps = [":rocksdb_lib"],
deps = ROCKSDB_STRESS_DEPS,
external_deps = ROCKSDB_EXTERNAL_DEPS,
)

@ -246,6 +246,10 @@ JAVAC_ARGS="-source 7"
if [ "$CROSS_COMPILE" = "true" -o "$FBCODE_BUILD" = "true" ]; then
# Cross-compiling; do not try any compilation tests.
# Also don't need any compilation tests if compiling on fbcode
if [ "$FBCODE_BUILD" = "true" ]; then
# Enable backtrace on fbcode since the necessary libraries are present
COMMON_FLAGS="$COMMON_FLAGS -DROCKSDB_BACKTRACE"
fi
true
else
if ! test $ROCKSDB_DISABLE_FALLOCATE; then

@ -13,6 +13,11 @@
#include <cmath>
ROCKSDB_NAMESPACE::DbStressEnvWrapper* db_stress_env = nullptr;
#ifndef NDEBUG
// If non-null, injects read error at a rate specified by the
// read_fault_one_in flag
std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
#endif // NDEBUG
enum ROCKSDB_NAMESPACE::CompressionType compression_type_e =
ROCKSDB_NAMESPACE::kSnappyCompression;
enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e =

@ -58,6 +58,9 @@
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/write_batch.h"
#ifndef NDEBUG
#include "test_util/fault_injection_test_fs.h"
#endif
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
@ -66,10 +69,6 @@
#include "util/random.h"
#include "util/string_util.h"
#include "utilities/blob_db/blob_db.h"
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
#include "test_util/sync_point.h"
#endif // !(defined NDEBUG) || !defined(OS_WIN)
#include "test_util/testutil.h"
#include "utilities/merge_operators.h"
@ -237,6 +236,9 @@ const int kValueMaxLen = 100;
// wrapped posix or hdfs environment
extern ROCKSDB_NAMESPACE::DbStressEnvWrapper* db_stress_env;
#ifndef NDEBUG
extern std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
#endif
extern enum ROCKSDB_NAMESPACE::CompressionType compression_type_e;
extern enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e;

@ -671,4 +671,7 @@ DEFINE_int32(continuous_verification_interval, 1000,
DEFINE_int32(approximate_size_one_in, 64,
"If non-zero, DB::GetApproximateSizes() will be called against"
" random key ranges.");
DEFINE_int32(read_fault_one_in, 1000,
"On non-zero, enables fault injection on read");
#endif // GFLAGS

@ -14,5 +14,14 @@
namespace ROCKSDB_NAMESPACE {
const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe;
const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff;
#if defined(ROCKSDB_SUPPORT_THREAD_LOCAL)
#if defined(OS_SOLARIS)
__thread bool SharedState::filter_read_error;
#else
thread_local bool SharedState::filter_read_error;
#endif // OS_SOLARIS
#else
bool SharedState::filter_read_error;
#endif // ROCKSDB_SUPPORT_THREAD_LOCAL
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS

@ -11,6 +11,10 @@
#pragma once
#include "db_stress_tool/db_stress_stat.h"
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
#include "test_util/sync_point.h"
#endif // !(defined NDEBUG) || !defined(OS_WIN)
#include "util/gflags_compat.h"
DECLARE_uint64(seed);
@ -24,6 +28,7 @@ DECLARE_int32(clear_column_family_one_in);
DECLARE_bool(test_batches_snapshots);
DECLARE_int32(compaction_thread_pool_adjust_interval);
DECLARE_int32(continuous_verification_interval);
DECLARE_int32(read_fault_one_in);
namespace ROCKSDB_NAMESPACE {
class StressTest;
@ -37,6 +42,20 @@ class SharedState {
// indicates a key should definitely be deleted
static const uint32_t DELETION_SENTINEL;
// Errors when reading filter blocks are ignored, so we use a thread
// local variable updated via sync points to keep track of errors injected
// while reading filter blocks in order to ignore the Get/MultiGet result
// for those calls
#if defined(ROCKSDB_SUPPORT_THREAD_LOCAL)
#if defined(OS_SOLARIS)
static __thread bool filter_read_error;
#else
static thread_local bool filter_read_error;
#endif // OS_SOLARIS
#else
static bool filter_read_error;
#endif // ROCKSDB_SUPPORT_THREAD_LOCAL
SharedState(Env* env, StressTest* stress_test)
: cv_(&mu_),
seed_(static_cast<uint32_t>(FLAGS_seed)),
@ -171,9 +190,23 @@ class SharedState {
++num_bg_threads_;
fprintf(stdout, "Starting continuous_verification_thread\n");
}
#ifndef NDEBUG
if (FLAGS_read_fault_one_in) {
SyncPoint::GetInstance()->SetCallBack("FilterReadError",
FilterReadErrorCallback);
SyncPoint::GetInstance()->EnableProcessing();
}
#endif // NDEBUG
}
~SharedState() {}
~SharedState() {
#ifndef NDEBUG
if (FLAGS_read_fault_one_in) {
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}
#endif
}
port::Mutex* GetMutex() { return &mu_; }
@ -329,6 +362,10 @@ class SharedState {
}
private:
static void FilterReadErrorCallback(void*) {
filter_read_error = true;
}
port::Mutex mu_;
port::CondVar cv_;
const uint32_t seed_;

@ -42,6 +42,7 @@ class Stats {
long range_deletions_;
long covered_by_range_deletions_;
long errors_;
long verified_errors_;
long num_compact_files_succeed_;
long num_compact_files_failed_;
int next_report_;
@ -67,6 +68,7 @@ class Stats {
range_deletions_ = 0;
covered_by_range_deletions_ = 0;
errors_ = 0;
verified_errors_ = 0;
bytes_ = 0;
seconds_ = 0;
num_compact_files_succeed_ = 0;
@ -90,6 +92,7 @@ class Stats {
range_deletions_ += other.range_deletions_;
covered_by_range_deletions_ = other.covered_by_range_deletions_;
errors_ += other.errors_;
verified_errors_ += other.verified_errors_;
bytes_ += other.bytes_;
seconds_ += other.seconds_;
num_compact_files_succeed_ += other.num_compact_files_succeed_;
@ -163,6 +166,8 @@ class Stats {
void AddErrors(long n) { errors_ += n; }
void AddVerifiedErrors(long n) { verified_errors_ += n; }
void AddNumCompactFilesSucceed(long n) { num_compact_files_succeed_ += n; }
void AddNumCompactFilesFailed(long n) { num_compact_files_failed_ += n; }
@ -199,6 +204,8 @@ class Stats {
covered_by_range_deletions_);
fprintf(stdout, "%-12s: Got errors %ld times\n", "", errors_);
fprintf(stdout, "%-12s: Got expected errors %ld times\n", "",
verified_errors_);
fprintf(stdout, "%-12s: %ld CompactFiles() succeed\n", "",
num_compact_files_succeed_);
fprintf(stdout, "%-12s: %ld CompactFiles() did not succeed\n", "",

@ -502,6 +502,12 @@ void StressTest::OperateDb(ThreadState* thread) {
const int delRangeBound = delBound + static_cast<int>(FLAGS_delrangepercent);
const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);
#ifndef NDEBUG
if (FLAGS_read_fault_one_in) {
fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(),
FLAGS_read_fault_one_in);
}
#endif // NDEBUG
thread->stats.Start();
for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
if (thread->shared->HasVerificationFailedYet() ||
@ -1721,6 +1727,7 @@ void StressTest::PrintEnv() const {
FLAGS_max_write_batch_group_size_bytes);
fprintf(stdout, "Use dynamic level : %d\n",
static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));
fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in);
fprintf(stdout, "------------------------------------------------\n");
}

@ -23,11 +23,15 @@
#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_driver.h"
#ifndef NDEBUG
#include "test_util/fault_injection_test_fs.h"
#endif
namespace ROCKSDB_NAMESPACE {
namespace {
static std::shared_ptr<ROCKSDB_NAMESPACE::Env> env_guard;
static std::shared_ptr<ROCKSDB_NAMESPACE::DbStressEnvWrapper> env_wrapper_guard;
static std::shared_ptr<CompositeEnvWrapper> fault_env_guard;
} // namespace
KeyGenContext key_gen_ctx;
@ -69,6 +73,17 @@ int db_stress_tool(int argc, char** argv) {
} else {
raw_env = Env::Default();
}
#ifndef NDEBUG
if (FLAGS_read_fault_one_in) {
FaultInjectionTestFS* fs =
new FaultInjectionTestFS(raw_env->GetFileSystem());
fault_fs_guard.reset(fs);
fault_fs_guard->SetFilesystemDirectWritable(true);
fault_env_guard =
std::make_shared<CompositeEnvWrapper>(raw_env, fault_fs_guard);
raw_env = fault_env_guard.get();
}
#endif
env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env);
db_stress_env = env_wrapper_guard.get();

@ -9,6 +9,9 @@
#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#ifndef NDEBUG
#include "test_util/fault_injection_test_fs.h"
#endif // NDEBUG
namespace ROCKSDB_NAMESPACE {
class NonBatchedOpsStressTest : public StressTest {
@ -144,18 +147,52 @@ class NonBatchedOpsStressTest : public StressTest {
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
std::string from_db;
int error_count = 0;
#ifndef NDEBUG
if (fault_fs_guard) {
fault_fs_guard->EnableErrorInjection();
SharedState::filter_read_error = false;
}
#endif // NDEBUG
Status s = db_->Get(read_opts, cfh, key, &from_db);
#ifndef NDEBUG
if (fault_fs_guard) {
error_count = fault_fs_guard->GetAndResetErrorCount();
}
#endif // NDEBUG
if (s.ok()) {
#ifndef NDEBUG
if (fault_fs_guard) {
if (error_count && !SharedState::filter_read_error) {
// Grab mutex so multiple thread don't try to print the
// stack trace at the same time
MutexLock l(thread->shared->GetMutex());
fprintf(stderr, "Didn't get expected error from Get\n");
fprintf(stderr, "Callstack that injected the error\n");
fault_fs_guard->PrintFaultBacktrace();
std::terminate();
}
}
#endif // NDEBUG
// found case
thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) {
// not found case
thread->stats.AddGets(1, 0);
} else {
if (error_count == 0) {
// errors case
fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else {
thread->stats.AddVerifiedErrors(1);
}
}
#ifndef NDEBUG
if (fault_fs_guard) {
fault_fs_guard->DisableErrorInjection();
}
#endif // NDEBUG
return s;
}
@ -171,6 +208,7 @@ class NonBatchedOpsStressTest : public StressTest {
std::vector<PinnableSlice> values(num_keys);
std::vector<Status> statuses(num_keys);
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
int error_count = 0;
// To appease clang analyzer
const bool use_txn = FLAGS_use_txn;
@ -231,8 +269,19 @@ class NonBatchedOpsStressTest : public StressTest {
}
if (!use_txn) {
#ifndef NDEBUG
if (fault_fs_guard) {
fault_fs_guard->EnableErrorInjection();
SharedState::filter_read_error = false;
}
#endif // NDEBUG
db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
statuses.data());
#ifndef NDEBUG
if (fault_fs_guard) {
error_count = fault_fs_guard->GetAndResetErrorCount();
}
#endif // NDEBUG
} else {
#ifndef ROCKSDB_LITE
txn->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
@ -243,8 +292,22 @@ class NonBatchedOpsStressTest : public StressTest {
for (const auto& s : statuses) {
if (s.ok()) {
#ifndef NDEBUG
if (fault_fs_guard && error_count && !SharedState::filter_read_error) {
// Grab mutex so multiple thread don't try to print the
// stack trace at the same time
MutexLock l(thread->shared->GetMutex());
fprintf(stderr, "Didn't get expected error from MultiGet\n");
fprintf(stderr, "Callstack that injected the error\n");
fault_fs_guard->PrintFaultBacktrace();
std::terminate();
} else {
#endif // NDEBUG
// found case
thread->stats.AddGets(1, 1);
#ifndef NDEBUG
}
#endif // NDEBUG
} else if (s.IsNotFound()) {
// not found case
thread->stats.AddGets(1, 0);
@ -252,11 +315,20 @@ class NonBatchedOpsStressTest : public StressTest {
// With txn this is sometimes expected.
thread->stats.AddGets(1, 1);
} else {
if (error_count == 0) {
// errors case
fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else {
thread->stats.AddVerifiedErrors(1);
}
}
}
#ifndef NDEBUG
if (fault_fs_guard) {
fault_fs_guard->DisableErrorInjection();
}
#endif // NDEBUG
return statuses;
}

@ -14,6 +14,10 @@ namespace ROCKSDB_NAMESPACE {
namespace port {
void InstallStackTraceHandler() {}
void PrintStack(int /*first_frames_to_skip*/) {}
void PrintAndFreeStack(void* /*callstack*/, int /*num_frames*/) {}
void* SaveStack(int* /*num_frames*/, int /*first_frames_to_skip*/) {
return nullptr;
}
} // namespace port
} // namespace ROCKSDB_NAMESPACE
@ -99,18 +103,38 @@ void PrintStackTraceLine(const char* symbol, void* frame) {
} // namespace
void PrintStack(void* frames[], int num_frames) {
auto symbols = backtrace_symbols(frames, num_frames);
for (int i = 0; i < num_frames; ++i) {
fprintf(stderr, "#%-2d ", i);
PrintStackTraceLine((symbols != nullptr) ? symbols[i] : nullptr, frames[i]);
}
free(symbols);
}
void PrintStack(int first_frames_to_skip) {
const int kMaxFrames = 100;
void* frames[kMaxFrames];
auto num_frames = backtrace(frames, kMaxFrames);
auto symbols = backtrace_symbols(frames, num_frames);
PrintStack(&frames[first_frames_to_skip], num_frames - first_frames_to_skip);
}
for (int i = first_frames_to_skip; i < num_frames; ++i) {
fprintf(stderr, "#%-2d ", i - first_frames_to_skip);
PrintStackTraceLine((symbols != nullptr) ? symbols[i] : nullptr, frames[i]);
}
free(symbols);
void PrintAndFreeStack(void* callstack, int num_frames) {
PrintStack(static_cast<void**>(callstack), num_frames);
free(callstack);
}
void* SaveStack(int* num_frames, int first_frames_to_skip) {
const int kMaxFrames = 100;
void* frames[kMaxFrames];
auto count = backtrace(frames, kMaxFrames);
*num_frames = count - first_frames_to_skip;
void* callstack = malloc(sizeof(void*) * *num_frames);
memcpy(callstack, &frames[first_frames_to_skip], sizeof(void*) * *num_frames);
return callstack;
}
static void StackTraceHandler(int sig) {

@ -18,5 +18,11 @@ void InstallStackTraceHandler();
// Prints stack, skips skip_first_frames frames
void PrintStack(int first_frames_to_skip = 0);
// Prints the given callstack
void PrintAndFreeStack(void* callstack, int num_frames);
// Save the current callstack
void* SaveStack(int* num_frame, int first_frames_to_skip = 0);
} // namespace port
} // namespace ROCKSDB_NAMESPACE

@ -994,6 +994,11 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
auto filter = new_table->CreateFilterBlockReader(
prefetch_buffer, use_cache, prefetch_filter, pin_filter,
lookup_context);
#ifndef NDEBUG
if (rep_->filter_type != Rep::FilterType::kNoFilter && !filter) {
TEST_SYNC_POINT("FilterReadError");
}
#endif
if (filter) {
// Refer to the comment above about paritioned indexes always being cached
if (prefetch_all) {

@ -254,6 +254,7 @@ bool PartitionedFilterBlockReader::MayMatch(
Status s =
GetOrReadFilterBlock(no_io, get_context, lookup_context, &filter_block);
if (UNLIKELY(!s.ok())) {
TEST_SYNC_POINT("FilterReadError");
return true;
}
@ -271,6 +272,7 @@ bool PartitionedFilterBlockReader::MayMatch(
no_io, get_context, lookup_context,
&filter_partition_block);
if (UNLIKELY(!s.ok())) {
TEST_SYNC_POINT("FilterReadError");
return true;
}
@ -310,6 +312,7 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
"Error retrieving top-level filter block while trying to "
"cache filter partitions: %s",
s.ToString().c_str());
TEST_SYNC_POINT("FilterReadError");
return;
}
@ -340,6 +343,11 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
prefetch_buffer.reset(new FilePrefetchBuffer());
s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off,
static_cast<size_t>(prefetch_len));
#ifndef NDEBUG
if (!s.ok()) {
TEST_SYNC_POINT("FilterReadError");
}
#endif
// After prefetch, read the partitions one by one
ReadOptions read_options;
@ -362,6 +370,11 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
}
}
}
#ifndef NDEBUG
if (!s.ok()) {
TEST_SYNC_POINT("FilterReadError");
}
#endif
}
}

@ -17,6 +17,8 @@
#include "test_util/fault_injection_test_fs.h"
#include <functional>
#include <utility>
#include "port/stack_trace.h"
#include "util/util.h"
namespace ROCKSDB_NAMESPACE {
@ -195,6 +197,27 @@ IOStatus TestFSRandomRWFile::Sync(const IOOptions& options,
return target_->Sync(options, dbg);
}
TestFSRandomAccessFile::TestFSRandomAccessFile(const std::string& /*fname*/,
std::unique_ptr<FSRandomAccessFile>&& f,
FaultInjectionTestFS* fs)
: target_(std::move(f)), fs_(fs) {
assert(target_ != nullptr);
}
IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n,
const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) const {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
IOStatus s = target_->Read(offset, n, options, result, scratch, dbg);
if (s.ok()) {
s = fs_->InjectError(FaultInjectionTestFS::ErrorOperation::kRead, result,
scratch);
}
return s;
}
IOStatus FaultInjectionTestFS::NewDirectory(
const std::string& name, const IOOptions& options,
std::unique_ptr<FSDirectory>* result, IODebugContext* dbg) {
@ -215,6 +238,9 @@ IOStatus FaultInjectionTestFS::NewWritableFile(
if (!IsFilesystemActive()) {
return GetError();
}
if (IsFilesystemDirectWritable()) {
return target()->NewWritableFile(fname, file_opts, result, dbg);
}
// Not allow overwriting files
IOStatus io_s = target()->FileExists(fname, IOOptions(), dbg);
if (io_s.ok()) {
@ -244,6 +270,9 @@ IOStatus FaultInjectionTestFS::ReopenWritableFile(
if (!IsFilesystemActive()) {
return GetError();
}
if (IsFilesystemDirectWritable()) {
return target()->ReopenWritableFile(fname, file_opts, result, dbg);
}
IOStatus io_s = target()->ReopenWritableFile(fname, file_opts, result, dbg);
if (io_s.ok()) {
result->reset(new TestFSWritableFile(fname, std::move(*result), this));
@ -265,6 +294,9 @@ IOStatus FaultInjectionTestFS::NewRandomRWFile(
if (!IsFilesystemActive()) {
return GetError();
}
if (IsFilesystemDirectWritable()) {
return target()->NewRandomRWFile(fname, file_opts, result, dbg);
}
IOStatus io_s = target()->NewRandomRWFile(fname, file_opts, result, dbg);
if (io_s.ok()) {
result->reset(new TestFSRandomRWFile(fname, std::move(*result), this));
@ -286,7 +318,14 @@ IOStatus FaultInjectionTestFS::NewRandomAccessFile(
if (!IsFilesystemActive()) {
return GetError();
}
return target()->NewRandomAccessFile(fname, file_opts, result, dbg);
IOStatus io_s = InjectError(ErrorOperation::kOpen, nullptr, nullptr);
if (io_s.ok()) {
io_s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
}
if (io_s.ok()) {
result->reset(new TestFSRandomAccessFile(fname, std::move(*result), this));
}
return io_s;
}
IOStatus FaultInjectionTestFS::DeleteFile(const std::string& f,
@ -427,4 +466,75 @@ void FaultInjectionTestFS::UntrackFile(const std::string& f) {
open_files_.erase(f);
}
IOStatus FaultInjectionTestFS::InjectError(ErrorOperation op,
Slice* result,
char* scratch) {
ErrorContext* ctx =
static_cast<ErrorContext*>(thread_local_error_->Get());
if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in) {
return IOStatus::OK();
}
if (ctx->rand.OneIn(ctx->one_in)) {
ctx->count++;
ctx->callstack = port::SaveStack(&ctx->frames);
switch (op) {
case kRead:
{
uint32_t type = ctx->rand.Uniform(3);
switch (type) {
// Inject IO error
case 0:
return IOStatus::IOError();
// Inject random corruption
case 1:
{
if (result->data() == scratch) {
uint64_t offset = ctx->rand.Uniform((uint32_t)result->size());
uint64_t len =
std::min<uint64_t>(result->size() - offset, 64UL);
assert(offset < result->size());
assert(offset + len <= result->size());
std::string str = DBTestBase::RandomString(&ctx->rand,
static_cast<int>(len));
memcpy(scratch + offset, str.data(), len);
break;
} else {
FALLTHROUGH_INTENDED;
}
}
// Truncate the result
case 2:
{
assert(result->size() > 0);
uint64_t offset = ctx->rand.Uniform((uint32_t)result->size());
assert(offset < result->size());
*result = Slice(result->data(), offset);
break;
}
default:
assert(false);
}
break;
}
case kOpen:
return IOStatus::IOError();
default:
assert(false);
}
}
return IOStatus::OK();
}
void FaultInjectionTestFS::PrintFaultBacktrace() {
#if defined(OS_LINUX)
ErrorContext* ctx =
static_cast<ErrorContext*>(thread_local_error_->Get());
if (ctx == nullptr) {
return;
}
port::PrintAndFreeStack(ctx->callstack, ctx->frames);
#endif
}
} // namespace ROCKSDB_NAMESPACE

@ -16,10 +16,12 @@
#pragma once
#include <algorithm>
#include <map>
#include <set>
#include <string>
#include "db/db_test_util.h"
#include "db/version_set.h"
#include "env/mock_env.h"
#include "file/filename.h"
@ -80,6 +82,9 @@ class TestFSWritableFile : public FSWritableFile {
IODebugContext* dbg) override {
return target_->PositionedAppend(data, offset, options, dbg);
}
virtual size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
virtual bool use_direct_io() const override {
return target_->use_direct_io();
};
@ -119,6 +124,25 @@ class TestFSRandomRWFile : public FSRandomRWFile {
FaultInjectionTestFS* fs_;
};
class TestFSRandomAccessFile : public FSRandomAccessFile {
public:
explicit TestFSRandomAccessFile(const std::string& fname,
std::unique_ptr<FSRandomAccessFile>&& f,
FaultInjectionTestFS* fs);
~TestFSRandomAccessFile() override {}
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const override;
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
bool use_direct_io() const override { return target_->use_direct_io(); }
private:
std::unique_ptr<FSRandomAccessFile> target_;
FaultInjectionTestFS* fs_;
};
class TestFSDirectory : public FSDirectory {
public:
explicit TestFSDirectory(FaultInjectionTestFS* fs, std::string dirname,
@ -138,7 +162,10 @@ class TestFSDirectory : public FSDirectory {
class FaultInjectionTestFS : public FileSystemWrapper {
public:
explicit FaultInjectionTestFS(std::shared_ptr<FileSystem> base)
: FileSystemWrapper(base), filesystem_active_(true) {}
: FileSystemWrapper(base),
filesystem_active_(true),
filesystem_writable_(false),
thread_local_error_(new ThreadLocalPtr(nullptr)) {}
virtual ~FaultInjectionTestFS() {}
const char* Name() const override { return "FaultInjectionTestFS"; }
@ -217,6 +244,14 @@ class FaultInjectionTestFS : public FileSystemWrapper {
MutexLock l(&mutex_);
return filesystem_active_;
}
// Setting filesystem_writable_ makes NewWritableFile. ReopenWritableFile,
// and NewRandomRWFile bypass FaultInjectionTestFS and go directly to the
// target FS
bool IsFilesystemDirectWritable() {
MutexLock l(&mutex_);
return filesystem_writable_;
}
void SetFilesystemActiveNoLock(
bool active, IOStatus error = IOStatus::Corruption("Not active")) {
filesystem_active_ = active;
@ -229,6 +264,11 @@ class FaultInjectionTestFS : public FileSystemWrapper {
MutexLock l(&mutex_);
SetFilesystemActiveNoLock(active, error);
}
void SetFilesystemDirectWritable(
bool writable) {
MutexLock l(&mutex_);
filesystem_writable_ = writable;
}
void AssertNoOpenFile() { assert(open_files_.empty()); }
IOStatus GetError() { return error_; }
@ -238,6 +278,66 @@ class FaultInjectionTestFS : public FileSystemWrapper {
error_ = io_error;
}
// Specify what the operation, so we can inject the right type of error
enum ErrorOperation : char {
kRead = 0,
kOpen,
};
// Set thread-local parameters for error injection. The first argument,
// seed is the seed for the random number generator, and one_in determines
// the probability of injecting error (i.e an error is injected with
// 1/one_in probability)
void SetThreadLocalReadErrorContext(uint32_t seed, int one_in) {
struct ErrorContext* ctx =
static_cast<struct ErrorContext*>(thread_local_error_->Get());
if (ctx == nullptr) {
ctx = new ErrorContext(seed);
thread_local_error_->Reset(ctx);
}
ctx->one_in = one_in;
ctx->count = 0;
}
// Inject an error. For a READ operation, a status of IOError(), a
// corruption in the contents of scratch, or truncation of slice
// are the types of error with equal probability. For OPEN,
// its always an IOError.
IOStatus InjectError(ErrorOperation op, Slice* slice, char* scratch);
// Get the count of how many times we injected since the previous call
int GetAndResetErrorCount() {
ErrorContext* ctx =
static_cast<ErrorContext*>(thread_local_error_->Get());
int count = 0;
if (ctx != nullptr) {
count = ctx->count;
ctx->count = 0;
}
return count;
}
void EnableErrorInjection() {
ErrorContext* ctx =
static_cast<ErrorContext*>(thread_local_error_->Get());
if (ctx) {
ctx->enable_error_injection = true;
}
}
void DisableErrorInjection() {
ErrorContext* ctx =
static_cast<ErrorContext*>(thread_local_error_->Get());
if (ctx) {
ctx->enable_error_injection = false;
}
}
// We capture a backtrace every time a fault is injected, for debugging
// purposes. This call prints the backtrace to stderr and frees the
// saved callstack
void PrintFaultBacktrace();
private:
port::Mutex mutex_;
std::map<std::string, FSFileState> db_file_state_;
@ -245,7 +345,25 @@ class FaultInjectionTestFS : public FileSystemWrapper {
std::unordered_map<std::string, std::set<std::string>>
dir_to_new_files_since_last_sync_;
bool filesystem_active_; // Record flushes, syncs, writes
bool filesystem_writable_; // Bypass FaultInjectionTestFS and go directly
// to underlying FS for writable files
IOStatus error_;
struct ErrorContext {
Random rand;
int one_in;
int count;
bool enable_error_injection;
void* callstack;
int frames;
explicit ErrorContext(uint32_t seed)
: rand(seed),
enable_error_injection(false),
frames(0) {}
};
std::unique_ptr<ThreadLocalPtr> thread_local_error_;
};
} // namespace ROCKSDB_NAMESPACE

@ -113,7 +113,8 @@ default_params = {
"verify_db_one_in": 100000,
"continuous_verification_interval" : 0,
"max_key_len": 3,
"key_len_percent_dist": "1,30,69"
"key_len_percent_dist": "1,30,69",
"read_fault_one_in": lambda: random.choice([0, 1000])
}
_TEST_DIR_ENV_VAR = 'TEST_TMPDIR'

Loading…
Cancel
Save