Make the Env class Customizable (#9293)

Summary:
Allows the Env to have options (Configurable) and loads like other Customizable classes.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9293

Reviewed By: pdillinger, zhichao-cao

Differential Revision: D33181591

Pulled By: mrambacher

fbshipit-source-id: 55e823886c654d214eda9eedd45ccdc54dac14d7
main
mrambacher 3 years ago committed by Facebook GitHub Bot
parent 677d2b4a8f
commit fe31dc53ca
  1. 1
      HISTORY.md
  2. 30
      db/corruption_test.cc
  3. 4
      db/db_basic_test.cc
  4. 3
      db/db_secondary_test.cc
  5. 4
      db/db_sst_test.cc
  6. 3
      db/db_test_util.h
  7. 2
      db/external_sst_file_test.cc
  8. 2
      db/listener_test.cc
  9. 2
      db/merge_test.cc
  10. 2
      db_stress_tool/db_stress_env_wrapper.h
  11. 2
      db_stress_tool/multi_ops_txns_stress.cc
  12. 81
      env/composite_env.cc
  13. 72
      env/composite_env_wrapper.h
  14. 195
      env/env.cc
  15. 20
      env/env_posix.cc
  16. 218
      env/env_test.cc
  17. 7
      env/file_system.cc
  18. 19
      env/mock_env.cc
  19. 4
      env/mock_env.h
  20. 8
      hdfs/env_hdfs.h
  21. 184
      include/rocksdb/env.h
  22. 21
      options/options_test.cc
  23. 4
      port/win/env_win.cc
  24. 3
      port/win/env_win.h
  25. 23
      test_util/testutil.h
  26. 1
      tools/db_bench_tool.cc
  27. 11
      tools/ldb_cmd_test.cc
  28. 2
      utilities/backupable/backupable_db_test.cc
  29. 3
      utilities/fault_injection_env.h
  30. 73
      utilities/object_registry_test.cc
  31. 2
      utilities/ttl/ttl_test.cc

@ -3,6 +3,7 @@
### Public API change
* Added values to `TraceFilterType`: `kTraceFilterIteratorSeek`, `kTraceFilterIteratorSeekForPrev`, and `kTraceFilterMultiGet`. They can be set in `TraceOptions` to filter out the operation types after which they are named.
* Added `TraceOptions::preserve_write_order`. When enabled it guarantees write records are traced in the same order they are logged to WAL and applied to the DB. By default it is disabled (false) to match the legacy behavior and prevent regression.
* Made the Env class extend the Customizable class. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method.
## 6.28.0 (2021-12-17)
### New Features

@ -39,11 +39,35 @@
namespace ROCKSDB_NAMESPACE {
static constexpr int kValueSize = 1000;
namespace {
// A wrapper that allows injection of errors.
class ErrorEnv : public EnvWrapper {
public:
bool writable_file_error_;
int num_writable_file_errors_;
explicit ErrorEnv(Env* _target)
: EnvWrapper(_target),
writable_file_error_(false),
num_writable_file_errors_(0) {}
const char* Name() const override { return "ErrorEnv"; }
virtual Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override {
result->reset();
if (writable_file_error_) {
++num_writable_file_errors_;
return Status::IOError(fname, "fake error");
}
return target()->NewWritableFile(fname, result, soptions);
}
};
} // namespace
class CorruptionTest : public testing::Test {
public:
std::shared_ptr<Env> env_guard_;
test::ErrorEnv* env_;
ErrorEnv* env_;
std::string dbname_;
std::shared_ptr<Cache> tiny_cache_;
Options options_;
@ -58,7 +82,7 @@ class CorruptionTest : public testing::Test {
EXPECT_OK(
test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
EXPECT_NE(base_env, nullptr);
env_ = new test::ErrorEnv(base_env);
env_ = new ErrorEnv(base_env);
options_.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
options_.env = env_;
dbname_ = test::PerThreadDBPath(env_, "corruption_test");

@ -1048,6 +1048,8 @@ TEST_F(DBBasicTest, MmapAndBufferOptions) {
class TestEnv : public EnvWrapper {
public:
explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
static const char* kClassName() { return "TestEnv"; }
const char* Name() const override { return kClassName(); }
class TestLogger : public Logger {
public:
@ -3064,6 +3066,8 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetDirectIO) {
public:
FakeDirectIOEnv(Env* env) : EnvWrapper(env) {}
static const char* kClassName() { return "FakeDirectIOEnv"; }
const char* Name() const override { return kClassName(); }
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,

@ -426,6 +426,9 @@ namespace {
class TraceFileEnv : public EnvWrapper {
public:
explicit TraceFileEnv(Env* _target) : EnvWrapper(_target) {}
static const char* kClassName() { return "TraceFileEnv"; }
const char* Name() const override { return kClassName(); }
Status NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r,
const EnvOptions& env_options) override {

@ -789,8 +789,8 @@ TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
class MyEnv : public EnvWrapper {
public:
MyEnv(Env* t) : EnvWrapper(t), fake_log_delete(false) {}
Status DeleteFile(const std::string& fname) {
const char* Name() const override { return "MyEnv"; }
Status DeleteFile(const std::string& fname) override {
if (fname.find(".log.trash") != std::string::npos && fake_log_delete) {
return Status::OK();
}

@ -115,6 +115,9 @@ class SpecialEnv : public EnvWrapper {
public:
explicit SpecialEnv(Env* base, bool time_elapse_only_sleep = false);
static const char* kClassName() { return "SpecialEnv"; }
const char* Name() const override { return kClassName(); }
Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
const EnvOptions& soptions) override {
class SSTableFile : public WritableFile {

@ -27,6 +27,8 @@ class ExternalSSTTestEnv : public EnvWrapper {
public:
ExternalSSTTestEnv(Env* t, bool fail_link)
: EnvWrapper(t), fail_link_(fail_link) {}
static const char* kClassName() { return "ExternalSSTTestEnv"; }
const char* Name() const override { return kClassName(); }
Status LinkFile(const std::string& s, const std::string& t) override {
if (fail_link_) {

@ -690,6 +690,8 @@ class TableFileCreationListener : public EventListener {
class TestEnv : public EnvWrapper {
public:
explicit TestEnv(Env* t) : EnvWrapper(t) {}
static const char* kClassName() { return "TestEnv"; }
const char* Name() const override { return kClassName(); }
void SetStatus(Status s) { status_ = s; }

@ -72,6 +72,8 @@ class CountMergeOperator : public AssociativeMergeOperator {
class EnvMergeTest : public EnvWrapper {
public:
EnvMergeTest() : EnvWrapper(Env::Default()) {}
static const char* kClassName() { return "MergeEnv"; }
const char* Name() const override { return kClassName(); }
// ~EnvMergeTest() override {}
uint64_t NowNanos() override {

@ -15,6 +15,8 @@ namespace ROCKSDB_NAMESPACE {
class DbStressEnvWrapper : public EnvWrapper {
public:
explicit DbStressEnvWrapper(Env* t) : EnvWrapper(t) {}
static const char* kClassName() { return "DbStressEnv"; }
const char* Name() const override { return kClassName(); }
Status DeleteFile(const std::string& f) override {
// We determine whether it is a manifest file by searching a strong,

@ -20,7 +20,9 @@ namespace ROCKSDB_NAMESPACE {
// TODO: move these to gflags.
static constexpr uint32_t kInitNumC = 1000;
#ifndef ROCKSDB_LITE
static constexpr uint32_t kInitialCARatio = 3;
#endif // ROCKSDB_LITE
static constexpr bool kDoPreload = true;
std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey(uint32_t a) {

@ -4,6 +4,7 @@
// (found in the LICENSE.Apache file in the root directory).
//
#include "env/composite_env_wrapper.h"
#include "rocksdb/utilities/options_type.h"
namespace ROCKSDB_NAMESPACE {
namespace {
@ -380,4 +381,84 @@ Status CompositeEnv::NewDirectory(const std::string& name,
return status;
}
namespace {
static std::unordered_map<std::string, OptionTypeInfo>
composite_env_wrapper_type_info = {
#ifndef ROCKSDB_LITE
{"target",
{0, OptionType::kCustomizable, OptionVerificationType::kByName,
OptionTypeFlags::kDontSerialize | OptionTypeFlags::kRawPointer,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto target = static_cast<EnvWrapper::Target*>(addr);
return Env::CreateFromString(opts, value, &(target->env),
&(target->guard));
},
nullptr, nullptr}},
#endif // ROCKSDB_LITE
};
static std::unordered_map<std::string, OptionTypeInfo>
composite_fs_wrapper_type_info = {
#ifndef ROCKSDB_LITE
{"file_system",
OptionTypeInfo::AsCustomSharedPtr<FileSystem>(
0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
#endif // ROCKSDB_LITE
};
static std::unordered_map<std::string, OptionTypeInfo>
composite_clock_wrapper_type_info = {
#ifndef ROCKSDB_LITE
{"clock",
OptionTypeInfo::AsCustomSharedPtr<SystemClock>(
0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
#endif // ROCKSDB_LITE
};
} // namespace
std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) {
return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
}
CompositeEnvWrapper::CompositeEnvWrapper(Env* env,
const std::shared_ptr<FileSystem>& fs,
const std::shared_ptr<SystemClock>& sc)
: CompositeEnv(fs, sc), target_(env) {
RegisterOptions("", &target_, &composite_env_wrapper_type_info);
RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
}
CompositeEnvWrapper::CompositeEnvWrapper(const std::shared_ptr<Env>& env,
const std::shared_ptr<FileSystem>& fs,
const std::shared_ptr<SystemClock>& sc)
: CompositeEnv(fs, sc), target_(env) {
RegisterOptions("", &target_, &composite_env_wrapper_type_info);
RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
}
Status CompositeEnvWrapper::PrepareOptions(const ConfigOptions& options) {
target_.Prepare();
if (file_system_ == nullptr) {
file_system_ = target_.env->GetFileSystem();
}
if (system_clock_ == nullptr) {
system_clock_ = target_.env->GetSystemClock();
}
return Env::PrepareOptions(options);
}
#ifndef ROCKSDB_LITE
std::string CompositeEnvWrapper::SerializeOptions(
const ConfigOptions& config_options, const std::string& header) const {
auto options = CompositeEnv::SerializeOptions(config_options, header);
if (target_.env != nullptr && target_.env != Env::Default()) {
options.append("target=");
options.append(target_.env->ToString(config_options));
}
return options;
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

@ -254,6 +254,8 @@ class CompositeEnvWrapper : public CompositeEnv {
public:
// Initialize a CompositeEnvWrapper that delegates all thread/time related
// calls to env, and all file operations to fs
explicit CompositeEnvWrapper(Env* env)
: CompositeEnvWrapper(env, env->GetFileSystem(), env->GetSystemClock()) {}
explicit CompositeEnvWrapper(Env* env, const std::shared_ptr<FileSystem>& fs)
: CompositeEnvWrapper(env, fs, env->GetSystemClock()) {}
@ -261,82 +263,110 @@ class CompositeEnvWrapper : public CompositeEnv {
: CompositeEnvWrapper(env, env->GetFileSystem(), sc) {}
explicit CompositeEnvWrapper(Env* env, const std::shared_ptr<FileSystem>& fs,
const std::shared_ptr<SystemClock>& sc);
explicit CompositeEnvWrapper(const std::shared_ptr<Env>& env,
const std::shared_ptr<FileSystem>& fs)
: CompositeEnvWrapper(env, fs, env->GetSystemClock()) {}
explicit CompositeEnvWrapper(const std::shared_ptr<Env>& env,
const std::shared_ptr<SystemClock>& sc)
: CompositeEnv(fs, sc), env_target_(env) {}
: CompositeEnvWrapper(env, env->GetFileSystem(), sc) {}
explicit CompositeEnvWrapper(const std::shared_ptr<Env>& env,
const std::shared_ptr<FileSystem>& fs,
const std::shared_ptr<SystemClock>& sc);
static const char* kClassName() { return "CompositeEnv"; }
const char* Name() const override { return kClassName(); }
bool IsInstanceOf(const std::string& name) const override {
if (name == kClassName()) {
return true;
} else {
return CompositeEnv::IsInstanceOf(name);
}
}
const Customizable* Inner() const override { return target_.env; }
Status PrepareOptions(const ConfigOptions& options) override;
#ifndef ROCKSDB_LITE
std::string SerializeOptions(const ConfigOptions& config_options,
const std::string& header) const override;
#endif // ROCKSDB_LITE
// Return the target to which this Env forwards all calls
Env* env_target() const { return env_target_; }
Env* env_target() const { return target_.env; }
#if !defined(OS_WIN) && !defined(ROCKSDB_NO_DYNAMIC_EXTENSION)
Status LoadLibrary(const std::string& lib_name,
const std::string& search_path,
std::shared_ptr<DynamicLibrary>* result) override {
return env_target_->LoadLibrary(lib_name, search_path, result);
return target_.env->LoadLibrary(lib_name, search_path, result);
}
#endif
void Schedule(void (*f)(void* arg), void* a, Priority pri,
void* tag = nullptr, void (*u)(void* arg) = nullptr) override {
return env_target_->Schedule(f, a, pri, tag, u);
return target_.env->Schedule(f, a, pri, tag, u);
}
int UnSchedule(void* tag, Priority pri) override {
return env_target_->UnSchedule(tag, pri);
return target_.env->UnSchedule(tag, pri);
}
void StartThread(void (*f)(void*), void* a) override {
return env_target_->StartThread(f, a);
return target_.env->StartThread(f, a);
}
void WaitForJoin() override { return env_target_->WaitForJoin(); }
void WaitForJoin() override { return target_.env->WaitForJoin(); }
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
return env_target_->GetThreadPoolQueueLen(pri);
return target_.env->GetThreadPoolQueueLen(pri);
}
Status GetHostName(char* name, uint64_t len) override {
return env_target_->GetHostName(name, len);
return target_.env->GetHostName(name, len);
}
void SetBackgroundThreads(int num, Priority pri) override {
return env_target_->SetBackgroundThreads(num, pri);
return target_.env->SetBackgroundThreads(num, pri);
}
int GetBackgroundThreads(Priority pri) override {
return env_target_->GetBackgroundThreads(pri);
return target_.env->GetBackgroundThreads(pri);
}
Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
return env_target_->SetAllowNonOwnerAccess(allow_non_owner_access);
return target_.env->SetAllowNonOwnerAccess(allow_non_owner_access);
}
void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
return env_target_->IncBackgroundThreadsIfNeeded(num, pri);
return target_.env->IncBackgroundThreadsIfNeeded(num, pri);
}
void LowerThreadPoolIOPriority(Priority pool) override {
env_target_->LowerThreadPoolIOPriority(pool);
target_.env->LowerThreadPoolIOPriority(pool);
}
void LowerThreadPoolCPUPriority(Priority pool) override {
env_target_->LowerThreadPoolCPUPriority(pool);
target_.env->LowerThreadPoolCPUPriority(pool);
}
Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override {
return env_target_->LowerThreadPoolCPUPriority(pool, pri);
return target_.env->LowerThreadPoolCPUPriority(pool, pri);
}
Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
return env_target_->GetThreadList(thread_list);
return target_.env->GetThreadList(thread_list);
}
ThreadStatusUpdater* GetThreadStatusUpdater() const override {
return env_target_->GetThreadStatusUpdater();
return target_.env->GetThreadStatusUpdater();
}
uint64_t GetThreadID() const override { return env_target_->GetThreadID(); }
uint64_t GetThreadID() const override { return target_.env->GetThreadID(); }
std::string GenerateUniqueId() override {
return env_target_->GenerateUniqueId();
return target_.env->GenerateUniqueId();
}
private:
Env* env_target_;
EnvWrapper::Target target_;
};
} // namespace ROCKSDB_NAMESPACE

195
env/env.cc vendored

@ -13,6 +13,7 @@
#include "env/composite_env_wrapper.h"
#include "env/emulated_clock.h"
#include "env/mock_env.h"
#include "env/unique_id_gen.h"
#include "logging/env_logger.h"
#include "memory/arena.h"
@ -29,13 +30,43 @@
namespace ROCKSDB_NAMESPACE {
namespace {
#ifndef ROCKSDB_LITE
static int RegisterBuiltinEnvs(ObjectLibrary& library,
const std::string& /*arg*/) {
library.Register<Env>(MockEnv::kClassName(), [](const std::string& /*uri*/,
std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(MockEnv::Create(Env::Default()));
return guard->get();
});
library.Register<Env>(
CompositeEnvWrapper::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new CompositeEnvWrapper(Env::Default()));
return guard->get();
});
size_t num_types;
return static_cast<int>(library.GetFactoryCount(&num_types));
}
#endif // ROCKSDB_LITE
static void RegisterSystemEnvs() {
#ifndef ROCKSDB_LITE
static std::once_flag loaded;
std::call_once(loaded, [&]() {
RegisterBuiltinEnvs(*(ObjectLibrary::Default().get()), "");
});
#endif // ROCKSDB_LITE
}
class LegacySystemClock : public SystemClock {
private:
Env* env_;
public:
explicit LegacySystemClock(Env* env) : env_(env) {}
const char* Name() const override { return "Legacy System Clock"; }
const char* Name() const override { return "LegacySystemClock"; }
// Returns the number of micro-seconds since some fixed point in time.
// It is often used as system time such as in GenericRateLimiter
@ -66,6 +97,16 @@ class LegacySystemClock : public SystemClock {
std::string TimeToString(uint64_t time) override {
return env_->TimeToString(time);
}
#ifndef ROCKSDB_LITE
std::string SerializeOptions(const ConfigOptions& /*config_options*/,
const std::string& /*prefix*/) const override {
// We do not want the LegacySystemClock to appear in the serialized output.
// This clock is an internal class for those who do not implement one and
// would be part of the Env. As such, do not serialize it here.
return "";
}
#endif // ROCKSDB_LITE
};
class LegacySequentialFileWrapper : public FSSequentialFile {
@ -561,6 +602,15 @@ class LegacyFileSystemWrapper : public FileSystem {
return status_to_io_status(target_->IsDirectory(path, is_dir));
}
#ifndef ROCKSDB_LITE
std::string SerializeOptions(const ConfigOptions& /*config_options*/,
const std::string& /*prefix*/) const override {
// We do not want the LegacyFileSystem to appear in the serialized output.
// This clock is an internal class for those who do not implement one and
// would be part of the Env. As such, do not serialize it here.
return "";
}
#endif // ROCKSDB_LITE
private:
Env* target_;
};
@ -594,19 +644,19 @@ Status Env::LoadEnv(const std::string& value, Env** result) {
Status Env::CreateFromString(const ConfigOptions& config_options,
const std::string& value, Env** result) {
Env* env = *result;
Status s;
#ifndef ROCKSDB_LITE
(void)config_options;
s = ObjectRegistry::NewInstance()->NewStaticObject<Env>(value, &env);
#else
(void)config_options;
s = Status::NotSupported("Cannot load environment in LITE mode", value);
#endif
if (s.ok()) {
*result = env;
Env* base = Env::Default();
if (value.empty() || base->IsInstanceOf(value)) {
*result = base;
return Status::OK();
} else {
RegisterSystemEnvs();
Env* env = *result;
Status s = LoadStaticObject<Env>(config_options, value, nullptr, &env);
if (s.ok()) {
*result = env;
}
return s;
}
return s;
}
Status Env::LoadEnv(const std::string& value, Env** result,
@ -618,37 +668,46 @@ Status Env::CreateFromString(const ConfigOptions& config_options,
const std::string& value, Env** result,
std::shared_ptr<Env>* guard) {
assert(result);
if (value.empty()) {
*result = Env::Default();
return Status::OK();
}
Status s;
#ifndef ROCKSDB_LITE
Env* env = nullptr;
std::unique_ptr<Env> uniq_guard;
std::string err_msg;
assert(guard != nullptr);
(void)config_options;
env = ObjectRegistry::NewInstance()->NewObject<Env>(value, &uniq_guard,
&err_msg);
if (!env) {
s = Status::NotSupported(std::string("Cannot load ") + Env::Type() + ": " +
value);
env = Env::Default();
}
if (s.ok() && uniq_guard) {
guard->reset(uniq_guard.release());
*result = guard->get();
} else {
*result = env;
std::unique_ptr<Env> uniq;
Env* env = *result;
std::string id;
std::unordered_map<std::string, std::string> opt_map;
Status status =
Customizable::GetOptionsMap(config_options, env, value, &id, &opt_map);
if (!status.ok()) { // GetOptionsMap failed
return status;
}
Env* base = Env::Default();
if (id.empty() || base->IsInstanceOf(id)) {
env = base;
status = Status::OK();
} else {
RegisterSystemEnvs();
#ifndef ROCKSDB_LITE
std::string errmsg;
env = config_options.registry->NewObject<Env>(id, &uniq, &errmsg);
if (!env) {
status = Status::NotSupported(
std::string("Cannot load environment[") + id + "]: ", errmsg);
}
#else
(void)config_options;
(void)result;
(void)guard;
s = Status::NotSupported("Cannot load environment in LITE mode", value);
status =
Status::NotSupported("Cannot load environment in LITE mode", value);
#endif
return s;
}
if (config_options.ignore_unsupported_options && status.IsNotSupported()) {
status = Status::OK();
} else if (status.ok()) {
status = Customizable::ConfigureNewObject(config_options, env, opt_map);
}
if (status.ok()) {
guard->reset(uniq.release());
*result = env;
}
return status;
}
Status Env::CreateFromUri(const ConfigOptions& config_options,
@ -1029,9 +1088,65 @@ Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
return ReadFileToString(fs.get(), fname, data);
}
namespace {
static std::unordered_map<std::string, OptionTypeInfo> env_wrapper_type_info = {
#ifndef ROCKSDB_LITE
{"target",
{0, OptionType::kCustomizable, OptionVerificationType::kByName,
OptionTypeFlags::kDontSerialize | OptionTypeFlags::kRawPointer,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
EnvWrapper::Target* target = static_cast<EnvWrapper::Target*>(addr);
return Env::CreateFromString(opts, value, &(target->env),
&(target->guard));
},
nullptr, nullptr}},
#endif // ROCKSDB_LITE
};
} // namespace
EnvWrapper::EnvWrapper(Env* t) : target_(t) {
RegisterOptions("", &target_, &env_wrapper_type_info);
}
EnvWrapper::EnvWrapper(std::unique_ptr<Env>&& t) : target_(std::move(t)) {
RegisterOptions("", &target_, &env_wrapper_type_info);
}
EnvWrapper::EnvWrapper(const std::shared_ptr<Env>& t) : target_(t) {
RegisterOptions("", &target_, &env_wrapper_type_info);
}
EnvWrapper::~EnvWrapper() {
}
Status EnvWrapper::PrepareOptions(const ConfigOptions& options) {
target_.Prepare();
return Env::PrepareOptions(options);
}
#ifndef ROCKSDB_LITE
std::string EnvWrapper::SerializeOptions(const ConfigOptions& config_options,
const std::string& header) const {
auto parent = Env::SerializeOptions(config_options, "");
if (config_options.IsShallow() || target_.env == nullptr ||
target_.env == Env::Default()) {
return parent;
} else {
std::string result = header;
if (!StartsWith(parent, OptionTypeInfo::kIdPropName())) {
result.append(OptionTypeInfo::kIdPropName()).append("=");
}
result.append(parent);
if (!EndsWith(result, config_options.delimiter)) {
result.append(config_options.delimiter);
}
result.append("target=").append(target_.env->ToString(config_options));
return result;
}
}
#endif // ROCKSDB_LITE
namespace { // anonymous namespace
void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {

20
env/env_posix.cc vendored

@ -209,7 +209,10 @@ class PosixClock : public SystemClock {
class PosixEnv : public CompositeEnv {
public:
PosixEnv(const PosixEnv* default_env, const std::shared_ptr<FileSystem>& fs);
static const char* kClassName() { return "PosixEnv"; }
const char* Name() const override { return kClassName(); }
const char* NickName() const override { return kDefaultName(); }
~PosixEnv() override {
if (this == Env::Default()) {
for (const auto tid : threads_to_join_) {
@ -419,16 +422,6 @@ PosixEnv::PosixEnv()
thread_status_updater_ = CreateThreadStatusUpdater();
}
PosixEnv::PosixEnv(const PosixEnv* default_env,
const std::shared_ptr<FileSystem>& fs)
: CompositeEnv(fs, default_env->GetSystemClock()),
thread_pools_(default_env->thread_pools_),
mu_(default_env->mu_),
threads_to_join_(default_env->threads_to_join_),
allow_non_owner_access_(default_env->allow_non_owner_access_) {
thread_status_updater_ = default_env->thread_status_updater_;
}
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
void* tag, void (*unschedFunction)(void* arg)) {
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
@ -499,11 +492,6 @@ Env* Env::Default() {
return &default_env;
}
std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) {
PosixEnv* default_env = static_cast<PosixEnv*>(Env::Default());
return std::unique_ptr<Env>(new PosixEnv(default_env, fs));
}
//
// Default Posix SystemClock
//

218
env/env_test.cc vendored

@ -40,11 +40,13 @@
#include "env/env_chroot.h"
#include "env/env_encryption_ctr.h"
#include "env/fs_readonly.h"
#include "env/mock_env.h"
#include "env/unique_id_gen.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "port/malloc.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/env_encryption.h"
@ -2136,29 +2138,29 @@ class TestEnv : public EnvWrapper {
public:
explicit TestEnv() : EnvWrapper(Env::Default()),
close_count(0) { }
class TestLogger : public Logger {
public:
using Logger::Logv;
TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
~TestLogger() override {
if (!closed_) {
Status s = CloseHelper();
s.PermitUncheckedError();
const char* Name() const override { return "TestEnv"; }
class TestLogger : public Logger {
public:
using Logger::Logv;
explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
~TestLogger() override {
if (!closed_) {
Status s = CloseHelper();
s.PermitUncheckedError();
}
}
}
void Logv(const char* /*format*/, va_list /*ap*/) override{};
void Logv(const char* /*format*/, va_list /*ap*/) override {}
protected:
Status CloseImpl() override { return CloseHelper(); }
protected:
Status CloseImpl() override { return CloseHelper(); }
private:
Status CloseHelper() {
env->CloseCountInc();;
return Status::OK();
}
TestEnv* env;
};
private:
Status CloseHelper() {
env->CloseCountInc();
return Status::OK();
}
TestEnv* env;
};
void CloseCountInc() { close_count++; }
@ -2894,9 +2896,185 @@ TEST_F(EnvTest, FailureToCreateLockFile) {
// Clean up
ASSERT_OK(DestroyDir(env, dir));
}
TEST_F(EnvTest, CreateDefaultEnv) {
ConfigOptions options;
options.ignore_unsupported_options = false;
std::shared_ptr<Env> guard;
Env* env = nullptr;
ASSERT_OK(Env::CreateFromString(options, "", &env));
ASSERT_EQ(env, Env::Default());
env = nullptr;
ASSERT_OK(Env::CreateFromString(options, Env::kDefaultName(), &env));
ASSERT_EQ(env, Env::Default());
env = nullptr;
ASSERT_OK(Env::CreateFromString(options, "", &env, &guard));
ASSERT_EQ(env, Env::Default());
ASSERT_EQ(guard, nullptr);
env = nullptr;
ASSERT_OK(Env::CreateFromString(options, Env::kDefaultName(), &env, &guard));
ASSERT_EQ(env, Env::Default());
ASSERT_EQ(guard, nullptr);
#ifndef ROCKSDB_LITE
std::string opt_str = env->ToString(options);
ASSERT_OK(Env::CreateFromString(options, opt_str, &env));
ASSERT_EQ(env, Env::Default());
ASSERT_OK(Env::CreateFromString(options, opt_str, &env, &guard));
ASSERT_EQ(env, Env::Default());
ASSERT_EQ(guard, nullptr);
#endif // ROCKSDB_LITE
}
#ifndef ROCKSDB_LITE
namespace {
class WrappedEnv : public EnvWrapper {
public:
explicit WrappedEnv(Env* t) : EnvWrapper(t) {}
explicit WrappedEnv(const std::shared_ptr<Env>& t) : EnvWrapper(t) {}
static const char* kClassName() { return "WrappedEnv"; }
const char* Name() const override { return kClassName(); }
static void Register(ObjectLibrary& lib, const std::string& /*arg*/) {
lib.Register<Env>(WrappedEnv::kClassName(), [](const std::string& /*uri*/,
std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new WrappedEnv(nullptr));
return guard->get();
});
}
};
} // namespace
TEST_F(EnvTest, CreateMockEnv) {
ConfigOptions options;
options.ignore_unsupported_options = false;
WrappedEnv::Register(*(options.registry->AddLibrary("test")), "");
std::shared_ptr<Env> guard, copy;
std::string opt_str;
Env* env = nullptr;
ASSERT_NOK(Env::CreateFromString(options, MockEnv::kClassName(), &env));
ASSERT_OK(
Env::CreateFromString(options, MockEnv::kClassName(), &env, &guard));
ASSERT_NE(env, nullptr);
ASSERT_NE(env, Env::Default());
opt_str = env->ToString(options);
ASSERT_OK(Env::CreateFromString(options, opt_str, &env, &copy));
ASSERT_NE(copy, guard);
std::string mismatch;
ASSERT_TRUE(guard->AreEquivalent(options, copy.get(), &mismatch));
guard.reset(MockEnv::Create(Env::Default(), SystemClock::Default()));
opt_str = guard->ToString(options);
ASSERT_OK(Env::CreateFromString(options, opt_str, &env, &copy));
std::unique_ptr<Env> wrapped_env(new WrappedEnv(Env::Default()));
guard.reset(MockEnv::Create(wrapped_env.get(), SystemClock::Default()));
opt_str = guard->ToString(options);
ASSERT_OK(Env::CreateFromString(options, opt_str, &env, &copy));
opt_str = copy->ToString(options);
}
TEST_F(EnvTest, CreateWrappedEnv) {
ConfigOptions options;
options.ignore_unsupported_options = false;
WrappedEnv::Register(*(options.registry->AddLibrary("test")), "");
Env* env = nullptr;
std::shared_ptr<Env> guard, copy;
std::string opt_str;
std::string mismatch;
ASSERT_NOK(Env::CreateFromString(options, WrappedEnv::kClassName(), &env));
ASSERT_OK(
Env::CreateFromString(options, WrappedEnv::kClassName(), &env, &guard));
ASSERT_NE(env, nullptr);
ASSERT_NE(env, Env::Default());
ASSERT_FALSE(guard->AreEquivalent(options, Env::Default(), &mismatch));
opt_str = env->ToString(options);
ASSERT_OK(Env::CreateFromString(options, opt_str, &env, &copy));
ASSERT_NE(copy, guard);
ASSERT_TRUE(guard->AreEquivalent(options, copy.get(), &mismatch));
guard.reset(new WrappedEnv(std::make_shared<WrappedEnv>(Env::Default())));
ASSERT_NE(guard.get(), env);
opt_str = guard->ToString(options);
ASSERT_OK(Env::CreateFromString(options, opt_str, &env, &copy));
ASSERT_NE(copy, guard);
ASSERT_TRUE(guard->AreEquivalent(options, copy.get(), &mismatch));
guard.reset(new WrappedEnv(std::make_shared<WrappedEnv>(
std::make_shared<WrappedEnv>(Env::Default()))));
ASSERT_NE(guard.get(), env);
opt_str = guard->ToString(options);
ASSERT_OK(Env::CreateFromString(options, opt_str, &env, &copy));
ASSERT_NE(copy, guard);
ASSERT_TRUE(guard->AreEquivalent(options, copy.get(), &mismatch));
}
TEST_F(EnvTest, CreateCompositeEnv) {
ConfigOptions options;
options.ignore_unsupported_options = false;
std::shared_ptr<Env> guard, copy;
Env* env = nullptr;
std::string mismatch, opt_str;
WrappedEnv::Register(*(options.registry->AddLibrary("test")), "");
std::unique_ptr<Env> base(NewCompositeEnv(FileSystem::Default()));
std::unique_ptr<Env> wrapped(new WrappedEnv(Env::Default()));
std::shared_ptr<FileSystem> timed_fs =
std::make_shared<TimedFileSystem>(FileSystem::Default());
std::shared_ptr<SystemClock> clock =
std::make_shared<EmulatedSystemClock>(SystemClock::Default());
opt_str = base->ToString(options);
ASSERT_NOK(Env::CreateFromString(options, opt_str, &env));
ASSERT_OK(Env::CreateFromString(options, opt_str, &env, &guard));
ASSERT_NE(env, nullptr);
ASSERT_NE(env, Env::Default());
ASSERT_EQ(env->GetFileSystem(), FileSystem::Default());
ASSERT_EQ(env->GetSystemClock(), SystemClock::Default());
base = NewCompositeEnv(timed_fs);
opt_str = base->ToString(options);
ASSERT_NOK(Env::CreateFromString(options, opt_str, &env));
ASSERT_OK(Env::CreateFromString(options, opt_str, &env, &guard));
ASSERT_NE(env, nullptr);
ASSERT_NE(env, Env::Default());
ASSERT_NE(env->GetFileSystem(), FileSystem::Default());
ASSERT_EQ(env->GetSystemClock(), SystemClock::Default());
env = nullptr;
guard.reset(new CompositeEnvWrapper(wrapped.get(), timed_fs));
opt_str = guard->ToString(options);
ASSERT_OK(Env::CreateFromString(options, opt_str, &env, &copy));
ASSERT_NE(env, nullptr);
ASSERT_NE(env, Env::Default());
ASSERT_TRUE(guard->AreEquivalent(options, copy.get(), &mismatch));
env = nullptr;
guard.reset(new CompositeEnvWrapper(wrapped.get(), clock));
opt_str = guard->ToString(options);
ASSERT_OK(Env::CreateFromString(options, opt_str, &env, &copy));
ASSERT_NE(env, nullptr);
ASSERT_NE(env, Env::Default());
ASSERT_TRUE(guard->AreEquivalent(options, copy.get(), &mismatch));
env = nullptr;
guard.reset(new CompositeEnvWrapper(wrapped.get(), timed_fs, clock));
opt_str = guard->ToString(options);
ASSERT_OK(Env::CreateFromString(options, opt_str, &env, &copy));
ASSERT_NE(env, nullptr);
ASSERT_NE(env, Env::Default());
ASSERT_TRUE(guard->AreEquivalent(options, copy.get(), &mismatch));
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -56,6 +56,13 @@ static int RegisterBuiltinFileSystems(ObjectLibrary& library,
}
return guard->get();
});
library.Register<FileSystem>(
MockFileSystem::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<FileSystem>* guard,
std::string* /*errmsg*/) {
guard->reset(new MockFileSystem(SystemClock::Default()));
return guard->get();
});
#ifndef OS_WIN
library.Register<FileSystem>(
ChrootFileSystem::kClassName(),

19
env/mock_env.cc vendored

@ -565,12 +565,21 @@ class TestMemLogger : public Logger {
}
size_t GetLogFileSize() const override { return log_size_; }
};
static std::unordered_map<std::string, OptionTypeInfo> mock_fs_type_info = {
#ifndef ROCKSDB_LITE
{"supports_direct_io",
{0, OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
#endif // ROCKSDB_LITE
};
} // namespace
MockFileSystem::MockFileSystem(const std::shared_ptr<SystemClock>& clock,
bool supports_direct_io)
: system_clock_(clock), supports_direct_io_(supports_direct_io) {
clock_ = system_clock_.get();
RegisterOptions("", &supports_direct_io_, &mock_fs_type_info);
}
MockFileSystem::~MockFileSystem() {
@ -578,6 +587,16 @@ MockFileSystem::~MockFileSystem() {
i->second->Unref();
}
}
Status MockFileSystem::PrepareOptions(const ConfigOptions& options) {
Status s = FileSystem::PrepareOptions(options);
if (s.ok() && system_clock_ == SystemClock::Default()) {
system_clock_ = options.env->GetSystemClock();
clock_ = system_clock_.get();
}
return s;
}
IOStatus MockFileSystem::GetAbsolutePath(const std::string& db_path,
const IOOptions& /*options*/,
std::string* output_path,

4
env/mock_env.h vendored

@ -107,6 +107,7 @@ class MockFileSystem : public FileSystem {
}
Status CorruptBuffer(const std::string& fname);
Status PrepareOptions(const ConfigOptions& options) override;
private:
bool RenameFileInternal(const std::string& src, const std::string& dest);
@ -130,6 +131,9 @@ class MockEnv : public CompositeEnvWrapper {
static MockEnv* Create(Env* base);
static MockEnv* Create(Env* base, const std::shared_ptr<SystemClock>& clock);
static const char* kClassName() { return "MockEnv"; }
const char* Name() const override { return kClassName(); }
Status CorruptBuffer(const std::string& fname);
private:
MockEnv(Env* env, const std::shared_ptr<FileSystem>& fs,

@ -48,6 +48,10 @@ class HdfsEnv : public Env {
posixEnv = Env::Default();
fileSys_ = connectToPath(fsname_);
}
static const char* kClassName() { return "HdfsEnv"; }
const char* Name() const override { return kClassName(); }
static const char* kNickName() { return "hdfs"; }
const char* NickName() const override { return kNickName(); }
virtual ~HdfsEnv() {
fprintf(stderr, "Destroying HdfsEnv::Default()\n");
@ -242,6 +246,10 @@ class HdfsEnv : public Env {
fprintf(stderr, "Please see hdfs/README for details\n");
abort();
}
static const char* kClassName() { return "HdfsEnv"; }
const char* Name() const override { return kClassName(); }
static const char* kNickName() { return "hdfs"; }
const char* NickName() const override { return kNickName(); }
virtual ~HdfsEnv() {
}

@ -25,6 +25,7 @@
#include <string>
#include <vector>
#include "rocksdb/customizable.h"
#include "rocksdb/functor_wrapper.h"
#include "rocksdb/status.h"
#include "rocksdb/thread_status.h"
@ -148,8 +149,9 @@ struct EnvOptions {
// Exceptions MUST NOT propagate out of overridden functions into RocksDB,
// because RocksDB is not exception-safe. This could cause undefined behavior
// including data loss, unreported corruption, deadlocks, and more.
class Env {
class Env : public Customizable {
public:
static const char* kDefaultName() { return "DefaultEnv"; }
struct FileAttributes {
// File name
std::string name;
@ -172,6 +174,10 @@ class Env {
static const char* Type() { return "Environment"; }
// Deprecated. Will be removed in a major release. Derived classes
// should implement this method.
const char* Name() const override { return ""; }
// Loads the environment specified by the input value into the result
// The CreateFromString alternative should be used; this method may be
// deprecated in a future release.
@ -1339,253 +1345,297 @@ extern Status ReadFileToString(Env* env, const std::string& fname,
// functionality of another Env.
class EnvWrapper : public Env {
public:
// The Target struct allows an Env to be stored as a raw (Env*) or
// std::shared_ptr<Env>. By using this struct, the wrapping/calling
// class does not need to worry about the ownership/lifetime of the
// wrapped target env. If the guard is set, then the Env will point
// to the guard.get().
struct Target {
Env* env; // The raw Env
std::shared_ptr<Env> guard; // The guarded Env
// Creates a Target without assuming ownership of the target Env
explicit Target(Env* t) : env(t) {}
// Creates a Target from the guarded env, assuming ownership
explicit Target(std::unique_ptr<Env>&& t) : guard(t.release()) {
env = guard.get();
}
// Creates a Target from the guarded env, assuming ownership
explicit Target(const std::shared_ptr<Env>& t) : guard(t) {
env = guard.get();
}
// Makes sure the raw Env is not nullptr
void Prepare() {
if (guard.get() != nullptr) {
env = guard.get();
} else if (env == nullptr) {
env = Env::Default();
}
}
};
// Initialize an EnvWrapper that delegates all calls to *t
explicit EnvWrapper(Env* t) : target_(t) {}
explicit EnvWrapper(Env* t);
explicit EnvWrapper(std::unique_ptr<Env>&& t);
explicit EnvWrapper(const std::shared_ptr<Env>& t);
~EnvWrapper() override;
// Return the target to which this Env forwards all calls
Env* target() const { return target_; }
Env* target() const { return target_.env; }
// Deprecated. Will be removed in a major release. Derived classes
// should implement this method.
const char* Name() const override { return target_.env->Name(); }
// The following text is boilerplate that forwards all methods to target()
Status RegisterDbPaths(const std::vector<std::string>& paths) override {
return target_->RegisterDbPaths(paths);
return target_.env->RegisterDbPaths(paths);
}
Status UnregisterDbPaths(const std::vector<std::string>& paths) override {
return target_->UnregisterDbPaths(paths);
return target_.env->UnregisterDbPaths(paths);
}
Status NewSequentialFile(const std::string& f,
std::unique_ptr<SequentialFile>* r,
const EnvOptions& options) override {
return target_->NewSequentialFile(f, r, options);
return target_.env->NewSequentialFile(f, r, options);
}
Status NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r,
const EnvOptions& options) override {
return target_->NewRandomAccessFile(f, r, options);
return target_.env->NewRandomAccessFile(f, r, options);
}
Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
const EnvOptions& options) override {
return target_->NewWritableFile(f, r, options);
return target_.env->NewWritableFile(f, r, options);
}
Status ReopenWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override {
return target_->ReopenWritableFile(fname, result, options);
return target_.env->ReopenWritableFile(fname, result, options);
}
Status ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
std::unique_ptr<WritableFile>* r,
const EnvOptions& options) override {
return target_->ReuseWritableFile(fname, old_fname, r, options);
return target_.env->ReuseWritableFile(fname, old_fname, r, options);
}
Status NewRandomRWFile(const std::string& fname,
std::unique_ptr<RandomRWFile>* result,
const EnvOptions& options) override {
return target_->NewRandomRWFile(fname, result, options);
return target_.env->NewRandomRWFile(fname, result, options);
}
Status NewMemoryMappedFileBuffer(
const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result) override {
return target_->NewMemoryMappedFileBuffer(fname, result);
return target_.env->NewMemoryMappedFileBuffer(fname, result);
}
Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override {
return target_->NewDirectory(name, result);
return target_.env->NewDirectory(name, result);
}
Status FileExists(const std::string& f) override {
return target_->FileExists(f);
return target_.env->FileExists(f);
}
Status GetChildren(const std::string& dir,
std::vector<std::string>* r) override {
return target_->GetChildren(dir, r);
return target_.env->GetChildren(dir, r);
}
Status GetChildrenFileAttributes(
const std::string& dir, std::vector<FileAttributes>* result) override {
return target_->GetChildrenFileAttributes(dir, result);
return target_.env->GetChildrenFileAttributes(dir, result);
}
Status DeleteFile(const std::string& f) override {
return target_->DeleteFile(f);
return target_.env->DeleteFile(f);
}
Status Truncate(const std::string& fname, size_t size) override {
return target_->Truncate(fname, size);
return target_.env->Truncate(fname, size);
}
Status CreateDir(const std::string& d) override {
return target_->CreateDir(d);
return target_.env->CreateDir(d);
}
Status CreateDirIfMissing(const std::string& d) override {
return target_->CreateDirIfMissing(d);
return target_.env->CreateDirIfMissing(d);
}
Status DeleteDir(const std::string& d) override {
return target_->DeleteDir(d);
return target_.env->DeleteDir(d);
}
Status GetFileSize(const std::string& f, uint64_t* s) override {
return target_->GetFileSize(f, s);
return target_.env->GetFileSize(f, s);
}
Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime) override {
return target_->GetFileModificationTime(fname, file_mtime);
return target_.env->GetFileModificationTime(fname, file_mtime);
}
Status RenameFile(const std::string& s, const std::string& t) override {
return target_->RenameFile(s, t);
return target_.env->RenameFile(s, t);
}
Status LinkFile(const std::string& s, const std::string& t) override {
return target_->LinkFile(s, t);
return target_.env->LinkFile(s, t);
}
Status NumFileLinks(const std::string& fname, uint64_t* count) override {
return target_->NumFileLinks(fname, count);
return target_.env->NumFileLinks(fname, count);
}
Status AreFilesSame(const std::string& first, const std::string& second,
bool* res) override {
return target_->AreFilesSame(first, second, res);
return target_.env->AreFilesSame(first, second, res);
}
Status LockFile(const std::string& f, FileLock** l) override {
return target_->LockFile(f, l);
return target_.env->LockFile(f, l);
}
Status UnlockFile(FileLock* l) override { return target_->UnlockFile(l); }
Status UnlockFile(FileLock* l) override { return target_.env->UnlockFile(l); }
Status IsDirectory(const std::string& path, bool* is_dir) override {
return target_->IsDirectory(path, is_dir);
return target_.env->IsDirectory(path, is_dir);
}
Status LoadLibrary(const std::string& lib_name,
const std::string& search_path,
std::shared_ptr<DynamicLibrary>* result) override {
return target_->LoadLibrary(lib_name, search_path, result);
return target_.env->LoadLibrary(lib_name, search_path, result);
}
void Schedule(void (*f)(void* arg), void* a, Priority pri,
void* tag = nullptr, void (*u)(void* arg) = nullptr) override {
return target_->Schedule(f, a, pri, tag, u);
return target_.env->Schedule(f, a, pri, tag, u);
}
int UnSchedule(void* tag, Priority pri) override {
return target_->UnSchedule(tag, pri);
return target_.env->UnSchedule(tag, pri);
}
void StartThread(void (*f)(void*), void* a) override {
return target_->StartThread(f, a);
return target_.env->StartThread(f, a);
}
void WaitForJoin() override { return target_->WaitForJoin(); }
void WaitForJoin() override { return target_.env->WaitForJoin(); }
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
return target_->GetThreadPoolQueueLen(pri);
return target_.env->GetThreadPoolQueueLen(pri);
}
Status GetTestDirectory(std::string* path) override {
return target_->GetTestDirectory(path);
return target_.env->GetTestDirectory(path);
}
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override {
return target_->NewLogger(fname, result);
return target_.env->NewLogger(fname, result);
}
uint64_t NowMicros() override { return target_->NowMicros(); }
uint64_t NowNanos() override { return target_->NowNanos(); }
uint64_t NowCPUNanos() override { return target_->NowCPUNanos(); }
uint64_t NowMicros() override { return target_.env->NowMicros(); }
uint64_t NowNanos() override { return target_.env->NowNanos(); }
uint64_t NowCPUNanos() override { return target_.env->NowCPUNanos(); }
void SleepForMicroseconds(int micros) override {
target_->SleepForMicroseconds(micros);
target_.env->SleepForMicroseconds(micros);
}
Status GetHostName(char* name, uint64_t len) override {
return target_->GetHostName(name, len);
return target_.env->GetHostName(name, len);
}
Status GetCurrentTime(int64_t* unix_time) override {
return target_->GetCurrentTime(unix_time);
return target_.env->GetCurrentTime(unix_time);
}
Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) override {
return target_->GetAbsolutePath(db_path, output_path);
return target_.env->GetAbsolutePath(db_path, output_path);
}
void SetBackgroundThreads(int num, Priority pri) override {
return target_->SetBackgroundThreads(num, pri);
return target_.env->SetBackgroundThreads(num, pri);
}
int GetBackgroundThreads(Priority pri) override {
return target_->GetBackgroundThreads(pri);
return target_.env->GetBackgroundThreads(pri);
}
Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
return target_->SetAllowNonOwnerAccess(allow_non_owner_access);
return target_.env->SetAllowNonOwnerAccess(allow_non_owner_access);
}
void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
return target_->IncBackgroundThreadsIfNeeded(num, pri);
return target_.env->IncBackgroundThreadsIfNeeded(num, pri);
}
void LowerThreadPoolIOPriority(Priority pool) override {
target_->LowerThreadPoolIOPriority(pool);
target_.env->LowerThreadPoolIOPriority(pool);
}
void LowerThreadPoolCPUPriority(Priority pool) override {
target_->LowerThreadPoolCPUPriority(pool);
target_.env->LowerThreadPoolCPUPriority(pool);
}
Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override {
return target_->LowerThreadPoolCPUPriority(pool, pri);
return target_.env->LowerThreadPoolCPUPriority(pool, pri);
}
std::string TimeToString(uint64_t time) override {
return target_->TimeToString(time);
return target_.env->TimeToString(time);
}
Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
return target_->GetThreadList(thread_list);
return target_.env->GetThreadList(thread_list);
}
ThreadStatusUpdater* GetThreadStatusUpdater() const override {
return target_->GetThreadStatusUpdater();
return target_.env->GetThreadStatusUpdater();
}
uint64_t GetThreadID() const override { return target_->GetThreadID(); }
uint64_t GetThreadID() const override { return target_.env->GetThreadID(); }
std::string GenerateUniqueId() override {
return target_->GenerateUniqueId();
return target_.env->GenerateUniqueId();
}
EnvOptions OptimizeForLogRead(const EnvOptions& env_options) const override {
return target_->OptimizeForLogRead(env_options);
return target_.env->OptimizeForLogRead(env_options);
}
EnvOptions OptimizeForManifestRead(
const EnvOptions& env_options) const override {
return target_->OptimizeForManifestRead(env_options);
return target_.env->OptimizeForManifestRead(env_options);
}
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const override {
return target_->OptimizeForLogWrite(env_options, db_options);
return target_.env->OptimizeForLogWrite(env_options, db_options);
}
EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const override {
return target_->OptimizeForManifestWrite(env_options);
return target_.env->OptimizeForManifestWrite(env_options);
}
EnvOptions OptimizeForCompactionTableWrite(
const EnvOptions& env_options,
const ImmutableDBOptions& immutable_ops) const override {
return target_->OptimizeForCompactionTableWrite(env_options, immutable_ops);
return target_.env->OptimizeForCompactionTableWrite(env_options,
immutable_ops);
}
EnvOptions OptimizeForCompactionTableRead(
const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const override {
return target_->OptimizeForCompactionTableRead(env_options, db_options);
return target_.env->OptimizeForCompactionTableRead(env_options, db_options);
}
EnvOptions OptimizeForBlobFileRead(
const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const override {
return target_->OptimizeForBlobFileRead(env_options, db_options);
return target_.env->OptimizeForBlobFileRead(env_options, db_options);
}
Status GetFreeSpace(const std::string& path, uint64_t* diskfree) override {
return target_->GetFreeSpace(path, diskfree);
return target_.env->GetFreeSpace(path, diskfree);
}
void SanitizeEnvOptions(EnvOptions* env_opts) const override {
target_->SanitizeEnvOptions(env_opts);
target_.env->SanitizeEnvOptions(env_opts);
}
Status PrepareOptions(const ConfigOptions& options) override;
#ifndef ROCKSDB_LITE
std::string SerializeOptions(const ConfigOptions& config_options,
const std::string& header) const override;
#endif // ROCKSDB_LITE
private:
Env* target_;
Target target_;
};
class SequentialFileWrapper : public SequentialFile {

@ -1270,6 +1270,13 @@ TEST_F(OptionsTest, MemTableRepFactoryCreateFromString) {
}
#ifndef ROCKSDB_LITE // GetOptionsFromString is not supported in RocksDB Lite
class CustomEnv : public EnvWrapper {
public:
explicit CustomEnv(Env* _target) : EnvWrapper(_target) {}
static const char* kClassName() { return "CustomEnv"; }
const char* Name() const override { return kClassName(); }
};
TEST_F(OptionsTest, GetOptionsFromStringTest) {
Options base_options, new_options;
ConfigOptions config_options;
@ -1284,14 +1291,8 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) {
NewBlockBasedTableFactory(block_based_table_options));
// Register an Env with object registry.
const static char* kCustomEnvName = "CustomEnv";
class CustomEnv : public EnvWrapper {
public:
explicit CustomEnv(Env* _target) : EnvWrapper(_target) {}
};
ObjectLibrary::Default()->Register<Env>(
kCustomEnvName,
CustomEnv::kClassName(),
[](const std::string& /*name*/, std::unique_ptr<Env>* /*env_guard*/,
std::string* /* errmsg */) {
static CustomEnv env(Env::Default());
@ -1337,7 +1338,7 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) {
ASSERT_EQ(new_options.max_open_files, 1);
ASSERT_TRUE(new_options.rate_limiter.get() != nullptr);
Env* newEnv = new_options.env;
ASSERT_OK(Env::LoadEnv(kCustomEnvName, &newEnv));
ASSERT_OK(Env::LoadEnv(CustomEnv::kClassName(), &newEnv));
ASSERT_EQ(newEnv, new_options.env);
config_options.ignore_unknown_options = false;
@ -2192,10 +2193,6 @@ TEST_F(OptionsTest, OptionsListenerTest) {
#ifndef ROCKSDB_LITE
const static std::string kCustomEnvName = "Custom";
const static std::string kCustomEnvProp = "env=" + kCustomEnvName;
class CustomEnv : public EnvWrapper {
public:
explicit CustomEnv(Env* _target) : EnvWrapper(_target) {}
};
static int RegisterCustomEnv(ObjectLibrary& library, const std::string& arg) {
library.Register<Env>(

@ -1413,10 +1413,6 @@ const std::shared_ptr<SystemClock>& SystemClock::Default() {
std::make_shared<port::WinClock>();
return clock;
}
std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) {
return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
}
} // namespace ROCKSDB_NAMESPACE
#endif

@ -260,6 +260,9 @@ class WinEnv : public CompositeEnv {
WinEnv();
~WinEnv();
static const char* kClassName() { return "WinEnv"; }
const char* Name() const override { return kClassName(); }
const char* NickName() const override { return kDefaultName(); }
Status GetHostName(char* name, uint64_t len) override;

@ -58,29 +58,6 @@ extern std::string RandomKey(Random* rnd, int len,
extern Slice CompressibleString(Random* rnd, double compressed_fraction,
int len, std::string* dst);
// A wrapper that allows injection of errors.
class ErrorEnv : public EnvWrapper {
public:
bool writable_file_error_;
int num_writable_file_errors_;
ErrorEnv(Env* _target)
: EnvWrapper(_target),
writable_file_error_(false),
num_writable_file_errors_(0) {}
virtual Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override {
result->reset();
if (writable_file_error_) {
++num_writable_file_errors_;
return Status::IOError(fname, "fake error");
}
return target()->NewWritableFile(fname, result, soptions);
}
};
#ifndef NDEBUG
// An internal comparator that just forward comparing results from the
// user comparator in it. Can be used to test entities that have no dependency

@ -1592,6 +1592,7 @@ struct ReportFileOpCounters {
class ReportFileOpEnv : public EnvWrapper {
public:
explicit ReportFileOpEnv(Env* base) : EnvWrapper(base) { reset(); }
const char* Name() const override { return "ReportFileOpEnv"; }
void reset() {
counters_.open_counter_ = 0;

@ -866,11 +866,18 @@ TEST_F(LdbCmdTest, TestBadDbPath) {
ASSERT_EQ(1,
LDBCommandRunner::RunCommand(4, argv, opts, LDBOptions(), nullptr));
}
namespace {
class WrappedEnv : public EnvWrapper {
public:
explicit WrappedEnv(Env* t) : EnvWrapper(t) {}
static const char* kClassName() { return "WrappedEnv"; }
const char* Name() const override { return kClassName(); }
};
} // namespace
TEST_F(LdbCmdTest, LoadCFOptionsAndOverride) {
// Env* base_env = TryLoadCustomOrDefaultEnv();
// std::unique_ptr<Env> env(NewMemEnv(base_env));
std::unique_ptr<Env> env(new EnvWrapper(Env::Default()));
std::unique_ptr<Env> env(new WrappedEnv(Env::Default()));
Options opts;
opts.env = env.get();
opts.create_if_missing = true;

@ -172,6 +172,7 @@ class DummyDB : public StackableDB {
class TestEnv : public EnvWrapper {
public:
explicit TestEnv(Env* t) : EnvWrapper(t) {}
const char* Name() const override { return "TestEnv"; }
class DummySequentialFile : public SequentialFile {
public:
@ -417,6 +418,7 @@ class TestEnv : public EnvWrapper {
class FileManager : public EnvWrapper {
public:
explicit FileManager(Env* t) : EnvWrapper(t), rnd_(5) {}
const char* Name() const override { return "FileManager"; }
Status GetRandomFileInDir(const std::string& dir, std::string* fname,
uint64_t* fsize) {

@ -151,6 +151,9 @@ class FaultInjectionTestEnv : public EnvWrapper {
: EnvWrapper(base), filesystem_active_(true) {}
virtual ~FaultInjectionTestEnv() { error_.PermitUncheckedError(); }
static const char* kClassName() { return "FaultInjectionTestEnv"; }
const char* Name() const override { return kClassName(); }
Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override;

@ -27,14 +27,23 @@ static FactoryFunc<Env> test_reg_a = ObjectLibrary::Default()->Register<Env>(
return Env::Default();
});
class WrappedEnv : public EnvWrapper {
private:
std::string id_;
public:
WrappedEnv(Env* t, const std::string& id) : EnvWrapper(t), id_(id) {}
const char* Name() const override { return id_.c_str(); }
std::string GetId() const override { return id_; }
};
static FactoryFunc<Env> test_reg_b = ObjectLibrary::Default()->Register<Env>(
ObjectLibrary::PatternEntry("b", false).AddSeparator("://"),
[](const std::string& /*uri*/, std::unique_ptr<Env>* env_guard,
[](const std::string& uri, std::unique_ptr<Env>* env_guard,
std::string* /* errmsg */) {
++ObjRegistryTest::num_b;
// Env::Default() is a singleton so we can't grant ownership directly to
// the caller - we must wrap it first.
env_guard->reset(new EnvWrapper(Env::Default()));
env_guard->reset(new WrappedEnv(Env::Default(), uri));
return env_guard->get();
});
@ -99,12 +108,12 @@ TEST_F(ObjRegistryTest, CheckShared) {
[](const std::string& /*uri*/, std::unique_ptr<Env>* /*guard */,
std::string* /* errmsg */) { return Env::Default(); });
library->Register<Env>(
"guarded", [](const std::string& /*uri*/, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new EnvWrapper(Env::Default()));
return guard->get();
});
library->Register<Env>("guarded",
[](const std::string& uri, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new WrappedEnv(Env::Default(), uri));
return guard->get();
});
ASSERT_OK(registry->NewSharedObject<Env>("guarded", &shared));
ASSERT_NE(shared, nullptr);
@ -124,12 +133,12 @@ TEST_F(ObjRegistryTest, CheckStatic) {
[](const std::string& /*uri*/, std::unique_ptr<Env>* /*guard */,
std::string* /* errmsg */) { return Env::Default(); });
library->Register<Env>(
"guarded", [](const std::string& /*uri*/, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new EnvWrapper(Env::Default()));
return guard->get();
});
library->Register<Env>("guarded",
[](const std::string& uri, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new WrappedEnv(Env::Default(), uri));
return guard->get();
});
ASSERT_NOK(registry->NewStaticObject<Env>("guarded", &env));
ASSERT_EQ(env, nullptr);
@ -149,12 +158,12 @@ TEST_F(ObjRegistryTest, CheckUnique) {
[](const std::string& /*uri*/, std::unique_ptr<Env>* /*guard */,
std::string* /* errmsg */) { return Env::Default(); });
library->Register<Env>(
"guarded", [](const std::string& /*uri*/, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new EnvWrapper(Env::Default()));
return guard->get();
});
library->Register<Env>("guarded",
[](const std::string& uri, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new WrappedEnv(Env::Default(), uri));
return guard->get();
});
ASSERT_OK(registry->NewUniqueObject<Env>("guarded", &unique));
ASSERT_NE(unique, nullptr);
@ -171,19 +180,19 @@ TEST_F(ObjRegistryTest, TestRegistryParents) {
auto cousin = ObjectRegistry::NewInstance(uncle);
auto library = parent->AddLibrary("parent");
library->Register<Env>(
"parent", [](const std::string& /*uri*/, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new EnvWrapper(Env::Default()));
return guard->get();
});
library->Register<Env>("parent",
[](const std::string& uri, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new WrappedEnv(Env::Default(), uri));
return guard->get();
});
library = cousin->AddLibrary("cousin");
library->Register<Env>(
"cousin", [](const std::string& /*uri*/, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new EnvWrapper(Env::Default()));
return guard->get();
});
library->Register<Env>("cousin",
[](const std::string& uri, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new WrappedEnv(Env::Default(), uri));
return guard->get();
});
std::unique_ptr<Env> guard;
std::string msg;

@ -35,7 +35,7 @@ class SpecialTimeEnv : public EnvWrapper {
explicit SpecialTimeEnv(Env* base) : EnvWrapper(base) {
EXPECT_OK(base->GetCurrentTime(&current_time_));
}
const char* Name() const override { return "SpecialTimeEnv"; }
void Sleep(int64_t sleep_time) { current_time_ += sleep_time; }
Status GetCurrentTime(int64_t* current_time) override {
*current_time = current_time_;

Loading…
Cancel
Save