From c3f720c60db59c27486d8f18e094f9d1eb3c33cf Mon Sep 17 00:00:00 2001 From: anand76 Date: Thu, 15 Dec 2022 15:48:50 -0800 Subject: [PATCH] Enable ReadAsync testing and fault injection in db_stress (#11037) Summary: The db_stress code uses a wrapper Env on top of the raw/fault injection Env. The wrapper, DbStressEnvWrapper, is a legacy Env and thus has a default implementation of ReadAsync that just does a sync read. As a result, the ReadAsync implementations of PosixFileSystem and other file systems weren't being tested. Also, the ReadAsync interface wasn't implemented in FaultInjectionTestFS. This change implements the necessary interfaces in FaultInjectionTestFS and derives DbStressEnvWrapper from FileSystemWrapper rather than EnvWrapper. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11037 Test Plan: Run db_stress standalone and crash test. With this change, db_stress is able to repro the bug fixed in https://github.com/facebook/rocksdb/issues/10890. Reviewed By: akankshamahajan15 Differential Revision: D42061290 Pulled By: anand1976 fbshipit-source-id: 7f0331fd15ee33fb4f7f0f4b22b206fe801ba074 --- db_stress_tool/db_stress_env_wrapper.h | 14 +++++---- db_stress_tool/db_stress_test_base.cc | 10 ++++--- db_stress_tool/db_stress_tool.cc | 19 ++++-------- db_stress_tool/no_batched_ops_stress.cc | 14 ++++++++- utilities/fault_injection_fs.cc | 39 +++++++++++++++++++++++++ utilities/fault_injection_fs.h | 9 ++++++ 6 files changed, 81 insertions(+), 24 deletions(-) diff --git a/db_stress_tool/db_stress_env_wrapper.h b/db_stress_tool/db_stress_env_wrapper.h index 21f6db2ab..af60df9bc 100644 --- a/db_stress_tool/db_stress_env_wrapper.h +++ b/db_stress_tool/db_stress_env_wrapper.h @@ -12,13 +12,15 @@ #include "db_stress_tool/db_stress_common.h" namespace ROCKSDB_NAMESPACE { -class DbStressEnvWrapper : public EnvWrapper { +class DbStressFSWrapper : public FileSystemWrapper { public: - explicit DbStressEnvWrapper(Env* t) : EnvWrapper(t) {} - static const char* kClassName() { return "DbStressEnv"; } + explicit DbStressFSWrapper(const std::shared_ptr& t) + : FileSystemWrapper(t) {} + static const char* kClassName() { return "DbStressFS"; } const char* Name() const override { return kClassName(); } - Status DeleteFile(const std::string& f) override { + IOStatus DeleteFile(const std::string& f, const IOOptions& opts, + IODebugContext* dbg) override { // We determine whether it is a manifest file by searching a strong, // so that there will be false positive if the directory path contains the // keyword but it is unlikely. @@ -28,11 +30,11 @@ class DbStressEnvWrapper : public EnvWrapper { f.find("checkpoint") != std::string::npos || f.find(".backup") != std::string::npos || f.find(".restore") != std::string::npos) { - return target()->DeleteFile(f); + return target()->DeleteFile(f, opts, dbg); } // Rename the file instead of deletion to keep the history, and // at the same time it is not visible to RocksDB. - return target()->RenameFile(f, f + "_renamed_"); + return target()->RenameFile(f, f + "_renamed_", opts, dbg); } // If true, all manifest files will not be delted in DeleteFile(). diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index e51b43176..a02d6bac1 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -1046,9 +1046,11 @@ void StressTest::OperateDb(ThreadState* thread) { TestIterateAgainstExpected(thread, read_opts, rand_column_families, rand_keys); } else { - int num_seeks = static_cast( - std::min(static_cast(thread->rand.Uniform(4)), - FLAGS_ops_per_thread - i - 1)); + int num_seeks = static_cast(std::min( + std::max(static_cast(thread->rand.Uniform(4)), + static_cast(1)), + std::max(static_cast(FLAGS_ops_per_thread - i - 1), + static_cast(1)))); rand_keys = GenerateNKeys(thread, num_seeks, i); i += num_seeks - 1; TestIterate(thread, read_opts, rand_column_families, rand_keys); @@ -3025,7 +3027,7 @@ bool InitializeOptionsFromFile(Options& options) { FLAGS_options_file.c_str(), s.ToString().c_str()); exit(1); } - db_options.env = new DbStressEnvWrapper(db_stress_env); + db_options.env = new CompositeEnvWrapper(db_stress_env); options = Options(db_options, cf_descriptors[0].options); return true; } diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 6c5e952db..7f86ee8a6 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -29,8 +29,9 @@ namespace ROCKSDB_NAMESPACE { namespace { static std::shared_ptr env_guard; -static std::shared_ptr env_wrapper_guard; -static std::shared_ptr +static std::shared_ptr + env_wrapper_guard; +static std::shared_ptr dbsl_env_wrapper_guard; static std::shared_ptr fault_env_guard; } // namespace @@ -77,7 +78,7 @@ int db_stress_tool(int argc, char** argv) { s.ToString().c_str()); exit(1); } - dbsl_env_wrapper_guard = std::make_shared(raw_env); + dbsl_env_wrapper_guard = std::make_shared(raw_env); db_stress_listener_env = dbsl_env_wrapper_guard.get(); if (FLAGS_read_fault_one_in || FLAGS_sync_fault_injection || @@ -96,18 +97,10 @@ int db_stress_tool(int argc, char** argv) { raw_env = fault_env_guard.get(); } - env_wrapper_guard = std::make_shared(raw_env); + env_wrapper_guard = std::make_shared( + raw_env, std::make_shared(raw_env->GetFileSystem())); db_stress_env = env_wrapper_guard.get(); - if (FLAGS_write_fault_one_in) { - // In the write injection case, we need to use the FS interface and returns - // the IOStatus with different error and flags. Therefore, - // DbStressEnvWrapper cannot be used which will swallow the FS - // implementations. We should directly use the raw_env which is the - // CompositeEnvWrapper of env and fault_fs. - db_stress_env = raw_env; - } - FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str()); // The number of background threads should be at least as much the diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index bf01b788f..01f5d6763 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -700,6 +700,11 @@ class NonBatchedOpsStressTest : public StressTest { uint64_t count = 0; Status s; + if (fault_fs_guard) { + fault_fs_guard->EnableErrorInjection(); + SharedState::ignore_read_error = false; + } + for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ++count; @@ -733,13 +738,20 @@ class NonBatchedOpsStressTest : public StressTest { s = iter->status(); } - if (!s.ok()) { + uint64_t error_count = 0; + if (fault_fs_guard) { + error_count = fault_fs_guard->GetAndResetErrorCount(); + } + if (!s.ok() && (!fault_fs_guard || (fault_fs_guard && !error_count))) { fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str()); thread->stats.AddErrors(1); return s; } + if (fault_fs_guard) { + fault_fs_guard->DisableErrorInjection(); + } thread->stats.AddPrefixes(1, count); return Status::OK(); diff --git a/utilities/fault_injection_fs.cc b/utilities/fault_injection_fs.cc index 549051856..5261d79ea 100644 --- a/utilities/fault_injection_fs.cc +++ b/utilities/fault_injection_fs.cc @@ -26,6 +26,7 @@ #include "test_util/sync_point.h" #include "util/coding.h" #include "util/crc32c.h" +#include "util/mutexlock.h" #include "util/random.h" #include "util/string_util.h" #include "util/xxhash.h" @@ -412,6 +413,35 @@ IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n, return s; } +IOStatus TestFSRandomAccessFile::ReadAsync( + FSReadRequest& req, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) { + IOStatus ret; + IOStatus s; + FSReadRequest res; + if (!fs_->IsFilesystemActive()) { + ret = fs_->GetError(); + } else { + ret = fs_->InjectThreadSpecificReadError( + FaultInjectionTestFS::ErrorOperation::kRead, &res.result, + use_direct_io(), req.scratch, /*need_count_increase=*/true, + /*fault_injected=*/nullptr); + } + if (ret.ok()) { + if (fs_->ShouldInjectRandomReadError()) { + ret = IOStatus::IOError("Injected read error"); + } else { + s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr); + } + } + if (!ret.ok()) { + res.status = ret; + cb(res, cb_arg); + } + return s; +} + IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs, const IOOptions& options, IODebugContext* dbg) { @@ -803,6 +833,15 @@ IOStatus FaultInjectionTestFS::LinkFile(const std::string& s, return io_s; } +IOStatus FaultInjectionTestFS::Poll(std::vector& io_handles, + size_t min_completions) { + return target()->Poll(io_handles, min_completions); +} + +IOStatus FaultInjectionTestFS::AbortIO(std::vector& io_handles) { + return target()->AbortIO(io_handles); +} + void FaultInjectionTestFS::WritableFileClosed(const FSFileState& state) { MutexLock l(&mutex_); if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) { diff --git a/utilities/fault_injection_fs.h b/utilities/fault_injection_fs.h index 53c9ccb6f..cab0051bd 100644 --- a/utilities/fault_injection_fs.h +++ b/utilities/fault_injection_fs.h @@ -141,6 +141,10 @@ class TestFSRandomAccessFile : public FSRandomAccessFile { IOStatus Read(uint64_t offset, size_t n, const IOOptions& options, Slice* result, char* scratch, IODebugContext* dbg) const override; + IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, + std::function cb, + void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, + IODebugContext* dbg) override; IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs, const IOOptions& options, IODebugContext* dbg) override; size_t GetRequiredBufferAlignment() const override { @@ -266,6 +270,11 @@ class FaultInjectionTestFS : public FileSystemWrapper { return io_s; } + virtual IOStatus Poll(std::vector& io_handles, + size_t min_completions) override; + + virtual IOStatus AbortIO(std::vector& io_handles) override; + void WritableFileClosed(const FSFileState& state); void WritableFileSynced(const FSFileState& state);