Secondary cache error injection (#9002)

Summary:
Implement secondary cache error injection in db_stress.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9002

Reviewed By: zhichao-cao

Differential Revision: D31874896

Pulled By: anand1976

fbshipit-source-id: 8cf04c061a4a44efa0fe88423d05cade67b85f73
main
anand76 3 years ago committed by Facebook GitHub Bot
parent e5b34f5867
commit 78556c14dd
  1. 1
      CMakeLists.txt
  2. 2
      TARGETS
  3. 1
      db_stress_tool/db_stress_common.h
  4. 3
      db_stress_tool/db_stress_gflags.cc
  5. 5
      db_stress_tool/db_stress_test_base.cc
  6. 1
      src.mk
  7. 1
      tools/db_crashtest.py
  8. 110
      utilities/fault_injection_secondary_cache.cc
  9. 94
      utilities/fault_injection_secondary_cache.h

@ -877,6 +877,7 @@ set(SOURCES
utilities/env_timed.cc
utilities/fault_injection_env.cc
utilities/fault_injection_fs.cc
utilities/fault_injection_secondary_cache.cc
utilities/leveldb_options/leveldb_options.cc
utilities/memory/memory_util.cc
utilities/merge_operators.cc

@ -395,6 +395,7 @@ cpp_library(
"utilities/env_timed.cc",
"utilities/fault_injection_env.cc",
"utilities/fault_injection_fs.cc",
"utilities/fault_injection_secondary_cache.cc",
"utilities/leveldb_options/leveldb_options.cc",
"utilities/memory/memory_util.cc",
"utilities/merge_operators.cc",
@ -722,6 +723,7 @@ cpp_library(
"utilities/env_timed.cc",
"utilities/fault_injection_env.cc",
"utilities/fault_injection_fs.cc",
"utilities/fault_injection_secondary_cache.cc",
"utilities/leveldb_options/leveldb_options.cc",
"utilities/memory/memory_util.cc",
"utilities/merge_operators.cc",

@ -266,6 +266,7 @@ DECLARE_uint64(batch_protection_bytes_per_key);
DECLARE_uint64(user_timestamp_size);
DECLARE_string(secondary_cache_uri);
DECLARE_int32(secondary_cache_fault_one_in);
constexpr long KB = 1024;
constexpr int kRandomValueMaxFactor = 3;

@ -841,6 +841,9 @@ DEFINE_int32(open_metadata_write_fault_one_in, 0,
#ifndef ROCKSDB_LITE
DEFINE_string(secondary_cache_uri, "",
"Full URI for creating a customized secondary cache object");
DEFINE_int32(secondary_cache_fault_one_in, 0,
"On non-zero, enables fault injection in secondary cache inserts"
" and lookups");
#endif // ROCKSDB_LITE
DEFINE_int32(open_write_fault_one_in, 0,
"On non-zero, enables fault injection on file writes "

@ -22,6 +22,7 @@
#include "util/cast_util.h"
#include "utilities/backupable/backupable_db_impl.h"
#include "utilities/fault_injection_fs.h"
#include "utilities/fault_injection_secondary_cache.h"
namespace ROCKSDB_NAMESPACE {
@ -148,6 +149,10 @@ std::shared_ptr<Cache> StressTest::NewCache(size_t capacity,
FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str());
exit(1);
}
if (FLAGS_secondary_cache_fault_one_in > 0) {
secondary_cache = std::make_shared<FaultInjectionSecondaryCache>(
secondary_cache, FLAGS_seed, FLAGS_secondary_cache_fault_one_in);
}
opts.secondary_cache = secondary_cache;
}
#endif

@ -249,6 +249,7 @@ LIB_SOURCES = \
utilities/env_timed.cc \
utilities/fault_injection_env.cc \
utilities/fault_injection_fs.cc \
utilities/fault_injection_secondary_cache.cc \
utilities/leveldb_options/leveldb_options.cc \
utilities/memory/memory_util.cc \
utilities/merge_operators.cc \

@ -153,6 +153,7 @@ default_params = {
"max_write_buffer_size_to_maintain": lambda: random.choice(
[0, 1024 * 1024, 2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024]),
"user_timestamp_size": 0,
"secondary_cache_fault_one_in" : lambda: random.choice([0, 0, 32]),
}
_TEST_DIR_ENV_VAR = 'TEST_TMPDIR'

@ -0,0 +1,110 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// This class implements a custom SecondaryCache that randomly injects an
// error status into Inserts/Lookups based on a specified probability.
#include "utilities/fault_injection_secondary_cache.h"
namespace ROCKSDB_NAMESPACE {
void FaultInjectionSecondaryCache::ResultHandle::UpdateHandleValue(
FaultInjectionSecondaryCache::ResultHandle* handle) {
ErrorContext* ctx = handle->cache_->GetErrorContext();
if (!ctx->rand.OneIn(handle->cache_->prob_)) {
handle->value_ = handle->base_->Value();
handle->size_ = handle->base_->Size();
}
handle->base_.reset();
}
bool FaultInjectionSecondaryCache::ResultHandle::IsReady() {
bool ready = true;
if (base_) {
ready = base_->IsReady();
if (ready) {
UpdateHandleValue(this);
}
}
return ready;
}
void FaultInjectionSecondaryCache::ResultHandle::Wait() {
base_->Wait();
UpdateHandleValue(this);
}
void* FaultInjectionSecondaryCache::ResultHandle::Value() { return value_; }
size_t FaultInjectionSecondaryCache::ResultHandle::Size() { return size_; }
void FaultInjectionSecondaryCache::ResultHandle::WaitAll(
FaultInjectionSecondaryCache* cache,
std::vector<SecondaryCacheResultHandle*> handles) {
std::vector<SecondaryCacheResultHandle*> base_handles;
for (SecondaryCacheResultHandle* hdl : handles) {
FaultInjectionSecondaryCache::ResultHandle* handle =
static_cast<FaultInjectionSecondaryCache::ResultHandle*>(hdl);
if (!handle->base_) {
continue;
}
base_handles.emplace_back(handle->base_.get());
}
cache->base_->WaitAll(base_handles);
for (SecondaryCacheResultHandle* hdl : handles) {
FaultInjectionSecondaryCache::ResultHandle* handle =
static_cast<FaultInjectionSecondaryCache::ResultHandle*>(hdl);
if (handle->base_) {
UpdateHandleValue(handle);
}
}
}
FaultInjectionSecondaryCache::ErrorContext*
FaultInjectionSecondaryCache::GetErrorContext() {
ErrorContext* ctx = static_cast<ErrorContext*>(thread_local_error_->Get());
if (!ctx) {
ctx = new ErrorContext(seed_);
thread_local_error_->Reset(ctx);
}
return ctx;
}
Status FaultInjectionSecondaryCache::Insert(
const Slice& key, void* value, const Cache::CacheItemHelper* helper) {
ErrorContext* ctx = GetErrorContext();
if (ctx->rand.OneIn(prob_)) {
return Status::IOError();
}
return base_->Insert(key, value, helper);
}
std::unique_ptr<SecondaryCacheResultHandle>
FaultInjectionSecondaryCache::Lookup(const Slice& key,
const Cache::CreateCallback& create_cb,
bool wait) {
std::unique_ptr<SecondaryCacheResultHandle> hdl =
base_->Lookup(key, create_cb, wait);
ErrorContext* ctx = GetErrorContext();
if (wait && ctx->rand.OneIn(prob_)) {
hdl.reset();
}
return std::unique_ptr<FaultInjectionSecondaryCache::ResultHandle>(
new FaultInjectionSecondaryCache::ResultHandle(this, std::move(hdl)));
}
void FaultInjectionSecondaryCache::Erase(const Slice& key) {
base_->Erase(key);
}
void FaultInjectionSecondaryCache::WaitAll(
std::vector<SecondaryCacheResultHandle*> handles) {
FaultInjectionSecondaryCache::ResultHandle::WaitAll(this, handles);
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,94 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/secondary_cache.h"
#include "util/random.h"
#include "util/thread_local.h"
namespace ROCKSDB_NAMESPACE {
// This class implements a custom SecondaryCache that randomly injects an
// error status into Inserts/Lookups based on a specified probability.
// Its used by db_stress to verify correctness in the presence of
// secondary cache errors.
//
class FaultInjectionSecondaryCache : public SecondaryCache {
public:
explicit FaultInjectionSecondaryCache(
const std::shared_ptr<SecondaryCache>& base, uint32_t seed, int prob)
: base_(base),
seed_(seed),
prob_(prob),
thread_local_error_(new ThreadLocalPtr(DeleteThreadLocalErrorContext)) {
}
virtual ~FaultInjectionSecondaryCache() override {}
const char* Name() const override { return "FaultInjectionSecondaryCache"; }
Status Insert(const Slice& key, void* value,
const Cache::CacheItemHelper* helper) override;
std::unique_ptr<SecondaryCacheResultHandle> Lookup(
const Slice& key, const Cache::CreateCallback& create_cb,
bool wait) override;
void Erase(const Slice& /*key*/) override;
void WaitAll(std::vector<SecondaryCacheResultHandle*> handles) override;
std::string GetPrintableOptions() const override { return ""; }
void EnableErrorInjection(uint64_t prob);
private:
class ResultHandle : public SecondaryCacheResultHandle {
public:
ResultHandle(FaultInjectionSecondaryCache* cache,
std::unique_ptr<SecondaryCacheResultHandle>&& base)
: cache_(cache), base_(std::move(base)), value_(nullptr), size_(0) {}
~ResultHandle() override {}
bool IsReady() override;
void Wait() override;
void* Value() override;
size_t Size() override;
static void WaitAll(FaultInjectionSecondaryCache* cache,
std::vector<SecondaryCacheResultHandle*> handles);
private:
static void UpdateHandleValue(ResultHandle* handle);
FaultInjectionSecondaryCache* cache_;
std::unique_ptr<SecondaryCacheResultHandle> base_;
void* value_;
size_t size_;
};
static void DeleteThreadLocalErrorContext(void* p) {
ErrorContext* ctx = static_cast<ErrorContext*>(p);
delete ctx;
}
const std::shared_ptr<SecondaryCache> base_;
uint32_t seed_;
int prob_;
struct ErrorContext {
Random rand;
explicit ErrorContext(uint32_t seed) : rand(seed) {}
};
std::unique_ptr<ThreadLocalPtr> thread_local_error_;
ErrorContext* GetErrorContext();
};
} // namespace ROCKSDB_NAMESPACE
Loading…
Cancel
Save