Enable ReadAsync testing and fault injection in db_stress (#11037)

Summary:
The db_stress code uses a wrapper Env on top of the raw/fault injection Env. The wrapper, DbStressEnvWrapper, is a legacy Env and thus has a default implementation of ReadAsync that just does a sync read. As a result, the ReadAsync implementations of PosixFileSystem and other file systems weren't being tested. Also, the ReadAsync interface wasn't implemented in FaultInjectionTestFS. This change implements the necessary interfaces in FaultInjectionTestFS and derives DbStressEnvWrapper from FileSystemWrapper rather than EnvWrapper.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11037

Test Plan: Run db_stress standalone and crash test. With this change, db_stress is able to repro the bug fixed in https://github.com/facebook/rocksdb/issues/10890.

Reviewed By: akankshamahajan15

Differential Revision: D42061290

Pulled By: anand1976

fbshipit-source-id: 7f0331fd15ee33fb4f7f0f4b22b206fe801ba074
main
anand76 2 years ago committed by Facebook GitHub Bot
parent f02c708aa3
commit c3f720c60d
  1. 14
      db_stress_tool/db_stress_env_wrapper.h
  2. 10
      db_stress_tool/db_stress_test_base.cc
  3. 19
      db_stress_tool/db_stress_tool.cc
  4. 14
      db_stress_tool/no_batched_ops_stress.cc
  5. 39
      utilities/fault_injection_fs.cc
  6. 9
      utilities/fault_injection_fs.h

@ -12,13 +12,15 @@
#include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_common.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class DbStressEnvWrapper : public EnvWrapper { class DbStressFSWrapper : public FileSystemWrapper {
public: public:
explicit DbStressEnvWrapper(Env* t) : EnvWrapper(t) {} explicit DbStressFSWrapper(const std::shared_ptr<FileSystem>& t)
static const char* kClassName() { return "DbStressEnv"; } : FileSystemWrapper(t) {}
static const char* kClassName() { return "DbStressFS"; }
const char* Name() const override { return kClassName(); } const char* Name() const override { return kClassName(); }
Status DeleteFile(const std::string& f) override { IOStatus DeleteFile(const std::string& f, const IOOptions& opts,
IODebugContext* dbg) override {
// We determine whether it is a manifest file by searching a string, // We determine whether it is a manifest file by searching a string,
// so that there will be false positive if the directory path contains the // so that there will be false positive if the directory path contains the
// keyword but it is unlikely. // keyword but it is unlikely.
@ -28,11 +30,11 @@ class DbStressEnvWrapper : public EnvWrapper {
f.find("checkpoint") != std::string::npos || f.find("checkpoint") != std::string::npos ||
f.find(".backup") != std::string::npos || f.find(".backup") != std::string::npos ||
f.find(".restore") != std::string::npos) { f.find(".restore") != std::string::npos) {
return target()->DeleteFile(f); return target()->DeleteFile(f, opts, dbg);
} }
// Rename the file instead of deletion to keep the history, and // Rename the file instead of deletion to keep the history, and
// at the same time it is not visible to RocksDB. // at the same time it is not visible to RocksDB.
return target()->RenameFile(f, f + "_renamed_"); return target()->RenameFile(f, f + "_renamed_", opts, dbg);
} }
// If true, all manifest files will not be deleted in DeleteFile(). // If true, all manifest files will not be deleted in DeleteFile().

@ -1046,9 +1046,11 @@ void StressTest::OperateDb(ThreadState* thread) {
TestIterateAgainstExpected(thread, read_opts, rand_column_families, TestIterateAgainstExpected(thread, read_opts, rand_column_families,
rand_keys); rand_keys);
} else { } else {
int num_seeks = static_cast<int>( int num_seeks = static_cast<int>(std::min(
std::min(static_cast<uint64_t>(thread->rand.Uniform(4)), std::max(static_cast<uint64_t>(thread->rand.Uniform(4)),
FLAGS_ops_per_thread - i - 1)); static_cast<uint64_t>(1)),
std::max(static_cast<uint64_t>(FLAGS_ops_per_thread - i - 1),
static_cast<uint64_t>(1))));
rand_keys = GenerateNKeys(thread, num_seeks, i); rand_keys = GenerateNKeys(thread, num_seeks, i);
i += num_seeks - 1; i += num_seeks - 1;
TestIterate(thread, read_opts, rand_column_families, rand_keys); TestIterate(thread, read_opts, rand_column_families, rand_keys);
@ -3025,7 +3027,7 @@ bool InitializeOptionsFromFile(Options& options) {
FLAGS_options_file.c_str(), s.ToString().c_str()); FLAGS_options_file.c_str(), s.ToString().c_str());
exit(1); exit(1);
} }
db_options.env = new DbStressEnvWrapper(db_stress_env); db_options.env = new CompositeEnvWrapper(db_stress_env);
options = Options(db_options, cf_descriptors[0].options); options = Options(db_options, cf_descriptors[0].options);
return true; return true;
} }

@ -29,8 +29,9 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
namespace { namespace {
static std::shared_ptr<ROCKSDB_NAMESPACE::Env> env_guard; static std::shared_ptr<ROCKSDB_NAMESPACE::Env> env_guard;
static std::shared_ptr<ROCKSDB_NAMESPACE::DbStressEnvWrapper> env_wrapper_guard; static std::shared_ptr<ROCKSDB_NAMESPACE::CompositeEnvWrapper>
static std::shared_ptr<ROCKSDB_NAMESPACE::DbStressEnvWrapper> env_wrapper_guard;
static std::shared_ptr<ROCKSDB_NAMESPACE::CompositeEnvWrapper>
dbsl_env_wrapper_guard; dbsl_env_wrapper_guard;
static std::shared_ptr<CompositeEnvWrapper> fault_env_guard; static std::shared_ptr<CompositeEnvWrapper> fault_env_guard;
} // namespace } // namespace
@ -77,7 +78,7 @@ int db_stress_tool(int argc, char** argv) {
s.ToString().c_str()); s.ToString().c_str());
exit(1); exit(1);
} }
dbsl_env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env); dbsl_env_wrapper_guard = std::make_shared<CompositeEnvWrapper>(raw_env);
db_stress_listener_env = dbsl_env_wrapper_guard.get(); db_stress_listener_env = dbsl_env_wrapper_guard.get();
if (FLAGS_read_fault_one_in || FLAGS_sync_fault_injection || if (FLAGS_read_fault_one_in || FLAGS_sync_fault_injection ||
@ -96,18 +97,10 @@ int db_stress_tool(int argc, char** argv) {
raw_env = fault_env_guard.get(); raw_env = fault_env_guard.get();
} }
env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env); env_wrapper_guard = std::make_shared<CompositeEnvWrapper>(
raw_env, std::make_shared<DbStressFSWrapper>(raw_env->GetFileSystem()));
db_stress_env = env_wrapper_guard.get(); db_stress_env = env_wrapper_guard.get();
if (FLAGS_write_fault_one_in) {
// In the write injection case, we need to use the FS interface and returns
// the IOStatus with different error and flags. Therefore,
// DbStressEnvWrapper cannot be used which will swallow the FS
// implementations. We should directly use the raw_env which is the
// CompositeEnvWrapper of env and fault_fs.
db_stress_env = raw_env;
}
FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str()); FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
// The number of background threads should be at least as much the // The number of background threads should be at least as much the

@ -700,6 +700,11 @@ class NonBatchedOpsStressTest : public StressTest {
uint64_t count = 0; uint64_t count = 0;
Status s; Status s;
if (fault_fs_guard) {
fault_fs_guard->EnableErrorInjection();
SharedState::ignore_read_error = false;
}
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
iter->Next()) { iter->Next()) {
++count; ++count;
@ -733,13 +738,20 @@ class NonBatchedOpsStressTest : public StressTest {
s = iter->status(); s = iter->status();
} }
if (!s.ok()) { uint64_t error_count = 0;
if (fault_fs_guard) {
error_count = fault_fs_guard->GetAndResetErrorCount();
}
if (!s.ok() && (!fault_fs_guard || (fault_fs_guard && !error_count))) {
fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str()); fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1); thread->stats.AddErrors(1);
return s; return s;
} }
if (fault_fs_guard) {
fault_fs_guard->DisableErrorInjection();
}
thread->stats.AddPrefixes(1, count); thread->stats.AddPrefixes(1, count);
return Status::OK(); return Status::OK();

@ -26,6 +26,7 @@
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/mutexlock.h"
#include "util/random.h" #include "util/random.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/xxhash.h" #include "util/xxhash.h"
@ -412,6 +413,35 @@ IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n,
return s; return s;
} }
// Fault-injecting front end for an asynchronous read.
//
// Either forwards the request to the wrapped file (target_) or, when an error
// is injected, reports that error by invoking `cb` directly from this call.
//
// req       - the read request; forwarded unchanged to target_ on the
//             non-injected path. Only req.scratch is read on the injection
//             path.
// opts      - forwarded to target_->ReadAsync().
// cb/cb_arg - completion callback and its opaque argument. Called here,
//             before this function returns, when an error is injected.
// io_handle, del_fn - forwarded to target_->ReadAsync(); this function does
//             not write to them itself, so they are left untouched when an
//             error is injected.
// dbg       - unused; nullptr is passed to target_ instead.
//
// Returns the status of target_->ReadAsync() when the read is forwarded.
// On every injected-error path `s` is never assigned, so the
// default-constructed IOStatus is returned and the error is visible only
// through the callback's request status.
// NOTE(review): a default-constructed IOStatus is presumably OK() - confirm.
IOStatus TestFSRandomAccessFile::ReadAsync(
FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) {
// `ret` tracks the injected/filesystem error, `s` the submission status
// returned to the caller.
IOStatus ret;
IOStatus s;
// Separate request object handed to `cb` on the error path. Only its
// `result` (via the injector) and `status` are set in this function; the
// other fields are not copied from `req`.
FSReadRequest res;
if (!fs_->IsFilesystemActive()) {
// Inactive file system: report the file system's stored error.
ret = fs_->GetError();
} else {
// Give the thread-specific injector a chance to fail this read.
ret = fs_->InjectThreadSpecificReadError(
FaultInjectionTestFS::ErrorOperation::kRead, &res.result,
use_direct_io(), req.scratch, /*need_count_increase=*/true,
/*fault_injected=*/nullptr);
}
if (ret.ok()) {
// Second injection point, consulted only if nothing has failed so far.
if (fs_->ShouldInjectRandomReadError()) {
ret = IOStatus::IOError("Injected read error");
} else {
// No fault injected: hand the request to the wrapped file.
s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr);
}
}
if (!ret.ok()) {
// Deliver the injected error through the callback rather than through the
// return value.
res.status = ret;
cb(res, cb_arg);
}
return s;
}
IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs, IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options, const IOOptions& options,
IODebugContext* dbg) { IODebugContext* dbg) {
@ -803,6 +833,15 @@ IOStatus FaultInjectionTestFS::LinkFile(const std::string& s,
return io_s; return io_s;
} }
// Passes the poll request straight through to the wrapped file system and
// returns its status unchanged. Nothing is injected or altered here.
IOStatus FaultInjectionTestFS::Poll(std::vector<void*>& io_handles,
                                    size_t min_completions) {
  const auto& wrapped_fs = target();
  return wrapped_fs->Poll(io_handles, min_completions);
}
// Passes the abort request for the given handles straight through to the
// wrapped file system and returns its status unchanged.
IOStatus FaultInjectionTestFS::AbortIO(std::vector<void*>& io_handles) {
  const auto& wrapped_fs = target();
  return wrapped_fs->AbortIO(io_handles);
}
void FaultInjectionTestFS::WritableFileClosed(const FSFileState& state) { void FaultInjectionTestFS::WritableFileClosed(const FSFileState& state) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) { if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {

@ -141,6 +141,10 @@ class TestFSRandomAccessFile : public FSRandomAccessFile {
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options, IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch, Slice* result, char* scratch,
IODebugContext* dbg) const override; IODebugContext* dbg) const override;
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
IODebugContext* dbg) override;
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs, IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options, IODebugContext* dbg) override; const IOOptions& options, IODebugContext* dbg) override;
size_t GetRequiredBufferAlignment() const override { size_t GetRequiredBufferAlignment() const override {
@ -266,6 +270,11 @@ class FaultInjectionTestFS : public FileSystemWrapper {
return io_s; return io_s;
} }
virtual IOStatus Poll(std::vector<void*>& io_handles,
size_t min_completions) override;
virtual IOStatus AbortIO(std::vector<void*>& io_handles) override;
void WritableFileClosed(const FSFileState& state); void WritableFileClosed(const FSFileState& state);
void WritableFileSynced(const FSFileState& state); void WritableFileSynced(const FSFileState& state);

Loading…
Cancel
Save