diff --git a/CMakeLists.txt b/CMakeLists.txt index c06d3ffe0..9ac79153a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -877,6 +877,7 @@ set(SOURCES utilities/env_timed.cc utilities/fault_injection_env.cc utilities/fault_injection_fs.cc + utilities/fault_injection_secondary_cache.cc utilities/leveldb_options/leveldb_options.cc utilities/memory/memory_util.cc utilities/merge_operators.cc diff --git a/TARGETS b/TARGETS index ccae9b57a..a842673a8 100644 --- a/TARGETS +++ b/TARGETS @@ -395,6 +395,7 @@ cpp_library( "utilities/env_timed.cc", "utilities/fault_injection_env.cc", "utilities/fault_injection_fs.cc", + "utilities/fault_injection_secondary_cache.cc", "utilities/leveldb_options/leveldb_options.cc", "utilities/memory/memory_util.cc", "utilities/merge_operators.cc", @@ -722,6 +723,7 @@ cpp_library( "utilities/env_timed.cc", "utilities/fault_injection_env.cc", "utilities/fault_injection_fs.cc", + "utilities/fault_injection_secondary_cache.cc", "utilities/leveldb_options/leveldb_options.cc", "utilities/memory/memory_util.cc", "utilities/merge_operators.cc", diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 87840b0e4..1c415785f 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -266,6 +266,7 @@ DECLARE_uint64(batch_protection_bytes_per_key); DECLARE_uint64(user_timestamp_size); DECLARE_string(secondary_cache_uri); +DECLARE_int32(secondary_cache_fault_one_in); constexpr long KB = 1024; constexpr int kRandomValueMaxFactor = 3; diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 89ccf5bc4..a9bc883f5 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -841,6 +841,9 @@ DEFINE_int32(open_metadata_write_fault_one_in, 0, #ifndef ROCKSDB_LITE DEFINE_string(secondary_cache_uri, "", "Full URI for creating a customized secondary cache object"); +DEFINE_int32(secondary_cache_fault_one_in, 0, + "On non-zero, enables fault injection in secondary cache inserts" + " and lookups"); #endif // ROCKSDB_LITE DEFINE_int32(open_write_fault_one_in, 0, "On non-zero, enables fault injection on file writes " diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 6b2df770a..d16fcd326 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -22,6 +22,7 @@ #include "util/cast_util.h" #include "utilities/backupable/backupable_db_impl.h" #include "utilities/fault_injection_fs.h" +#include "utilities/fault_injection_secondary_cache.h" namespace ROCKSDB_NAMESPACE { @@ -148,6 +149,10 @@ std::shared_ptr StressTest::NewCache(size_t capacity, FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str()); exit(1); } + if (FLAGS_secondary_cache_fault_one_in > 0) { + secondary_cache = std::make_shared( + secondary_cache, FLAGS_seed, FLAGS_secondary_cache_fault_one_in); + } opts.secondary_cache = secondary_cache; } #endif diff --git a/src.mk b/src.mk index c59f5b7e5..46b1afcd9 100644 --- a/src.mk +++ b/src.mk @@ -249,6 +249,7 @@ LIB_SOURCES = \ utilities/env_timed.cc \ utilities/fault_injection_env.cc \ utilities/fault_injection_fs.cc \ + utilities/fault_injection_secondary_cache.cc \ utilities/leveldb_options/leveldb_options.cc \ utilities/memory/memory_util.cc \ utilities/merge_operators.cc \ diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 8ba3b2298..71f44923b 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -153,6 +153,7 @@ default_params = { "max_write_buffer_size_to_maintain": lambda: random.choice( [0, 1024 * 1024, 2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024]), "user_timestamp_size": 0, + "secondary_cache_fault_one_in" : lambda: random.choice([0, 0, 32]), } _TEST_DIR_ENV_VAR = 'TEST_TMPDIR' diff --git a/utilities/fault_injection_secondary_cache.cc b/utilities/fault_injection_secondary_cache.cc new file mode 100644 index 000000000..eb484f8b6 --- /dev/null +++ b/utilities/fault_injection_secondary_cache.cc @@ -0,0 +1,110 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +// This class implements a custom SecondaryCache that randomly injects an +// error status into Inserts/Lookups based on a specified probability. + +#include "utilities/fault_injection_secondary_cache.h" + +namespace ROCKSDB_NAMESPACE { + +void FaultInjectionSecondaryCache::ResultHandle::UpdateHandleValue( + FaultInjectionSecondaryCache::ResultHandle* handle) { + ErrorContext* ctx = handle->cache_->GetErrorContext(); + if (!ctx->rand.OneIn(handle->cache_->prob_)) { + handle->value_ = handle->base_->Value(); + handle->size_ = handle->base_->Size(); + } + handle->base_.reset(); +} + +bool FaultInjectionSecondaryCache::ResultHandle::IsReady() { + bool ready = true; + if (base_) { + ready = base_->IsReady(); + if (ready) { + UpdateHandleValue(this); + } + } + return ready; +} + +void FaultInjectionSecondaryCache::ResultHandle::Wait() { + base_->Wait(); + UpdateHandleValue(this); +} + +void* FaultInjectionSecondaryCache::ResultHandle::Value() { return value_; } + +size_t FaultInjectionSecondaryCache::ResultHandle::Size() { return size_; } + +void FaultInjectionSecondaryCache::ResultHandle::WaitAll( + FaultInjectionSecondaryCache* cache, + std::vector handles) { + std::vector base_handles; + for (SecondaryCacheResultHandle* hdl : handles) { + FaultInjectionSecondaryCache::ResultHandle* handle = + static_cast(hdl); + if (!handle->base_) { + continue; + } + base_handles.emplace_back(handle->base_.get()); + } + + cache->base_->WaitAll(base_handles); + for (SecondaryCacheResultHandle* hdl : handles) { + FaultInjectionSecondaryCache::ResultHandle* handle = + static_cast(hdl); + if (handle->base_) { + UpdateHandleValue(handle); + } + } +} + +FaultInjectionSecondaryCache::ErrorContext* +FaultInjectionSecondaryCache::GetErrorContext() { + ErrorContext* ctx = static_cast(thread_local_error_->Get()); + if (!ctx) { + ctx = new ErrorContext(seed_); + thread_local_error_->Reset(ctx); + } + + return ctx; +} + +Status FaultInjectionSecondaryCache::Insert( + const Slice& key, void* value, const Cache::CacheItemHelper* helper) { + ErrorContext* ctx = GetErrorContext(); + if (ctx->rand.OneIn(prob_)) { + return Status::IOError(); + } + + return base_->Insert(key, value, helper); +} + +std::unique_ptr +FaultInjectionSecondaryCache::Lookup(const Slice& key, + const Cache::CreateCallback& create_cb, + bool wait) { + std::unique_ptr hdl = + base_->Lookup(key, create_cb, wait); + ErrorContext* ctx = GetErrorContext(); + if (wait && ctx->rand.OneIn(prob_)) { + hdl.reset(); + } + return std::unique_ptr( + new FaultInjectionSecondaryCache::ResultHandle(this, std::move(hdl))); +} + +void FaultInjectionSecondaryCache::Erase(const Slice& key) { + base_->Erase(key); +} + +void FaultInjectionSecondaryCache::WaitAll( + std::vector handles) { + FaultInjectionSecondaryCache::ResultHandle::WaitAll(this, handles); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/fault_injection_secondary_cache.h b/utilities/fault_injection_secondary_cache.h new file mode 100644 index 000000000..778fad037 --- /dev/null +++ b/utilities/fault_injection_secondary_cache.h @@ -0,0 +1,94 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "rocksdb/secondary_cache.h" +#include "util/random.h" +#include "util/thread_local.h" + +namespace ROCKSDB_NAMESPACE { + +// This class implements a custom SecondaryCache that randomly injects an +// error status into Inserts/Lookups based on a specified probability. +// Its used by db_stress to verify correctness in the presence of +// secondary cache errors. +// +class FaultInjectionSecondaryCache : public SecondaryCache { + public: + explicit FaultInjectionSecondaryCache( + const std::shared_ptr& base, uint32_t seed, int prob) + : base_(base), + seed_(seed), + prob_(prob), + thread_local_error_(new ThreadLocalPtr(DeleteThreadLocalErrorContext)) { + } + + virtual ~FaultInjectionSecondaryCache() override {} + + const char* Name() const override { return "FaultInjectionSecondaryCache"; } + + Status Insert(const Slice& key, void* value, + const Cache::CacheItemHelper* helper) override; + + std::unique_ptr Lookup( + const Slice& key, const Cache::CreateCallback& create_cb, + bool wait) override; + + void Erase(const Slice& /*key*/) override; + + void WaitAll(std::vector handles) override; + + std::string GetPrintableOptions() const override { return ""; } + + void EnableErrorInjection(uint64_t prob); + + private: + class ResultHandle : public SecondaryCacheResultHandle { + public: + ResultHandle(FaultInjectionSecondaryCache* cache, + std::unique_ptr&& base) + : cache_(cache), base_(std::move(base)), value_(nullptr), size_(0) {} + + ~ResultHandle() override {} + + bool IsReady() override; + + void Wait() override; + + void* Value() override; + + size_t Size() override; + + static void WaitAll(FaultInjectionSecondaryCache* cache, + std::vector handles); + + private: + static void UpdateHandleValue(ResultHandle* handle); + + FaultInjectionSecondaryCache* cache_; + std::unique_ptr base_; + void* value_; + size_t size_; + }; + + static void DeleteThreadLocalErrorContext(void* p) { + ErrorContext* ctx = static_cast(p); + delete ctx; + } + + const std::shared_ptr base_; + uint32_t seed_; + int prob_; + + struct ErrorContext { + Random rand; + + explicit ErrorContext(uint32_t seed) : rand(seed) {} + }; + std::unique_ptr thread_local_error_; + + ErrorContext* GetErrorContext(); +}; + +} // namespace ROCKSDB_NAMESPACE