diff --git a/db_stress_tool/db_stress_env_wrapper.h b/db_stress_tool/db_stress_env_wrapper.h index 21f6db2ab..af60df9bc 100644 --- a/db_stress_tool/db_stress_env_wrapper.h +++ b/db_stress_tool/db_stress_env_wrapper.h @@ -12,13 +12,15 @@ #include "db_stress_tool/db_stress_common.h" namespace ROCKSDB_NAMESPACE { -class DbStressEnvWrapper : public EnvWrapper { +class DbStressFSWrapper : public FileSystemWrapper { public: - explicit DbStressEnvWrapper(Env* t) : EnvWrapper(t) {} - static const char* kClassName() { return "DbStressEnv"; } + explicit DbStressFSWrapper(const std::shared_ptr& t) + : FileSystemWrapper(t) {} + static const char* kClassName() { return "DbStressFS"; } const char* Name() const override { return kClassName(); } - Status DeleteFile(const std::string& f) override { + IOStatus DeleteFile(const std::string& f, const IOOptions& opts, + IODebugContext* dbg) override { // We determine whether it is a manifest file by searching a strong, // so that there will be false positive if the directory path contains the // keyword but it is unlikely. @@ -28,11 +30,11 @@ class DbStressEnvWrapper : public EnvWrapper { f.find("checkpoint") != std::string::npos || f.find(".backup") != std::string::npos || f.find(".restore") != std::string::npos) { - return target()->DeleteFile(f); + return target()->DeleteFile(f, opts, dbg); } // Rename the file instead of deletion to keep the history, and // at the same time it is not visible to RocksDB. - return target()->RenameFile(f, f + "_renamed_"); + return target()->RenameFile(f, f + "_renamed_", opts, dbg); } // If true, all manifest files will not be delted in DeleteFile(). diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index e51b43176..a02d6bac1 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -1046,9 +1046,11 @@ void StressTest::OperateDb(ThreadState* thread) { TestIterateAgainstExpected(thread, read_opts, rand_column_families, rand_keys); } else { - int num_seeks = static_cast( - std::min(static_cast(thread->rand.Uniform(4)), - FLAGS_ops_per_thread - i - 1)); + int num_seeks = static_cast(std::min( + std::max(static_cast(thread->rand.Uniform(4)), + static_cast(1)), + std::max(static_cast(FLAGS_ops_per_thread - i - 1), + static_cast(1)))); rand_keys = GenerateNKeys(thread, num_seeks, i); i += num_seeks - 1; TestIterate(thread, read_opts, rand_column_families, rand_keys); @@ -3025,7 +3027,7 @@ bool InitializeOptionsFromFile(Options& options) { FLAGS_options_file.c_str(), s.ToString().c_str()); exit(1); } - db_options.env = new DbStressEnvWrapper(db_stress_env); + db_options.env = new CompositeEnvWrapper(db_stress_env); options = Options(db_options, cf_descriptors[0].options); return true; } diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 6c5e952db..7f86ee8a6 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -29,8 +29,9 @@ namespace ROCKSDB_NAMESPACE { namespace { static std::shared_ptr env_guard; -static std::shared_ptr env_wrapper_guard; -static std::shared_ptr +static std::shared_ptr + env_wrapper_guard; +static std::shared_ptr dbsl_env_wrapper_guard; static std::shared_ptr fault_env_guard; } // namespace @@ -77,7 +78,7 @@ int db_stress_tool(int argc, char** argv) { s.ToString().c_str()); exit(1); } - dbsl_env_wrapper_guard = std::make_shared(raw_env); + dbsl_env_wrapper_guard = std::make_shared(raw_env); db_stress_listener_env = dbsl_env_wrapper_guard.get(); if (FLAGS_read_fault_one_in || FLAGS_sync_fault_injection || @@ -96,18 +97,10 @@ int db_stress_tool(int argc, char** argv) { raw_env = fault_env_guard.get(); } - env_wrapper_guard = std::make_shared(raw_env); + env_wrapper_guard = std::make_shared( + raw_env, std::make_shared(raw_env->GetFileSystem())); db_stress_env = env_wrapper_guard.get(); - if (FLAGS_write_fault_one_in) { - // In the write injection case, we need to use the FS interface and returns - // the IOStatus with different error and flags. Therefore, - // DbStressEnvWrapper cannot be used which will swallow the FS - // implementations. We should directly use the raw_env which is the - // CompositeEnvWrapper of env and fault_fs. - db_stress_env = raw_env; - } - FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str()); // The number of background threads should be at least as much the diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index bf01b788f..01f5d6763 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -700,6 +700,11 @@ class NonBatchedOpsStressTest : public StressTest { uint64_t count = 0; Status s; + if (fault_fs_guard) { + fault_fs_guard->EnableErrorInjection(); + SharedState::ignore_read_error = false; + } + for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ++count; @@ -733,13 +738,20 @@ class NonBatchedOpsStressTest : public StressTest { s = iter->status(); } - if (!s.ok()) { + uint64_t error_count = 0; + if (fault_fs_guard) { + error_count = fault_fs_guard->GetAndResetErrorCount(); + } + if (!s.ok() && (!fault_fs_guard || (fault_fs_guard && !error_count))) { fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str()); thread->stats.AddErrors(1); return s; } + if (fault_fs_guard) { + fault_fs_guard->DisableErrorInjection(); + } thread->stats.AddPrefixes(1, count); return Status::OK(); diff --git a/utilities/fault_injection_fs.cc b/utilities/fault_injection_fs.cc index 549051856..5261d79ea 100644 --- a/utilities/fault_injection_fs.cc +++ b/utilities/fault_injection_fs.cc @@ -26,6 +26,7 @@ #include "test_util/sync_point.h" #include "util/coding.h" #include "util/crc32c.h" +#include "util/mutexlock.h" #include "util/random.h" #include "util/string_util.h" #include "util/xxhash.h" @@ -412,6 +413,35 @@ IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n, return s; } +IOStatus TestFSRandomAccessFile::ReadAsync( + FSReadRequest& req, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) { + IOStatus ret; + IOStatus s; + FSReadRequest res; + if (!fs_->IsFilesystemActive()) { + ret = fs_->GetError(); + } else { + ret = fs_->InjectThreadSpecificReadError( + FaultInjectionTestFS::ErrorOperation::kRead, &res.result, + use_direct_io(), req.scratch, /*need_count_increase=*/true, + /*fault_injected=*/nullptr); + } + if (ret.ok()) { + if (fs_->ShouldInjectRandomReadError()) { + ret = IOStatus::IOError("Injected read error"); + } else { + s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr); + } + } + if (!ret.ok()) { + res.status = ret; + cb(res, cb_arg); + } + return s; +} + IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs, const IOOptions& options, IODebugContext* dbg) { @@ -803,6 +833,15 @@ IOStatus FaultInjectionTestFS::LinkFile(const std::string& s, return io_s; } +IOStatus FaultInjectionTestFS::Poll(std::vector& io_handles, + size_t min_completions) { + return target()->Poll(io_handles, min_completions); +} + +IOStatus FaultInjectionTestFS::AbortIO(std::vector& io_handles) { + return target()->AbortIO(io_handles); +} + void FaultInjectionTestFS::WritableFileClosed(const FSFileState& state) { MutexLock l(&mutex_); if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) { diff --git a/utilities/fault_injection_fs.h b/utilities/fault_injection_fs.h index 53c9ccb6f..cab0051bd 100644 --- a/utilities/fault_injection_fs.h +++ b/utilities/fault_injection_fs.h @@ -141,6 +141,10 @@ class TestFSRandomAccessFile : public FSRandomAccessFile { IOStatus Read(uint64_t offset, size_t n, const IOOptions& options, Slice* result, char* scratch, IODebugContext* dbg) const override; + IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, + std::function cb, + void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, + IODebugContext* dbg) override; IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs, const IOOptions& options, IODebugContext* dbg) override; size_t GetRequiredBufferAlignment() const override { @@ -266,6 +270,11 @@ class FaultInjectionTestFS : public FileSystemWrapper { return io_s; } + virtual IOStatus Poll(std::vector& io_handles, + size_t min_completions) override; + + virtual IOStatus AbortIO(std::vector& io_handles) override; + void WritableFileClosed(const FSFileState& state); void WritableFileSynced(const FSFileState& state);