Simplify migration to FileSystem API (#6552)

Summary:
The current Env/FileSystem API separation has a couple of issues -
1. It requires the user to specify two options - ```Options::env``` and ```Options::file_system``` - which means they have to make code changes to benefit from the new APIs. Furthermore, there is a risk of accessing the same functionality in two different ways: through Env in the old way and through FileSystem in the new way. The two may not always match. For example, if env is ```PosixEnv``` and file_system is a custom implementation, any stray RocksDB calls to env will use the ```PosixEnv``` implementation rather than the custom file_system implementation.
2. There needs to be a simple way for the FileSystem developer to instantiate an Env for backward compatibility purposes.

This PR solves the above issues and simplifies the migration in the following ways -
1. Embed a shared_ptr to the ```FileSystem``` in the ```Env```, and remove ```Options::file_system``` as a configurable option. This way, no code changes will be required in application code to benefit from the new API. The default Env constructor uses a ```LegacyFileSystemWrapper``` as the embedded ```FileSystem```.
1a. This also makes the design more robust: even if RocksDB still has
  some stray calls to Env APIs rather than FileSystem APIs, they will go
  through the same underlying object, so there is no risk of the two
  getting out of sync.
2. Provide a ```NewCompositeEnv()``` API that can be used to construct a
PosixEnv with a custom FileSystem implementation. This eliminates an
extra level of indirection when calling Env APIs, and relieves the
FileSystem developer of the burden of implementing wrappers for the
Env APIs.
3. Add a couple of missing FileSystem APIs - ```SanitizeEnvOptions()``` and
```NewLogger()```

Tests:
1. New unit tests
2. make check and make asan_check
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6552

Reviewed By: riversand963

Differential Revision: D20592038

Pulled By: anand1976

fbshipit-source-id: c3801ad4153f96d21d5a3ae26c92ba454d1bf1f7
main
anand76 5 years ago committed by Facebook GitHub Bot
parent 43aee93d2b
commit a9d168cfd7
  1. 1
      db/corruption_test.cc
  2. 10
      db/db_impl/db_impl.cc
  3. 12
      db/db_impl/db_impl_open.cc
  4. 88
      db/error_handler_fs_test.cc
  5. 2
      db/memtable_list_test.cc
  6. 4
      db/repair.cc
  7. 5
      db/version_set.cc
  8. 111
      env/composite_env_wrapper.h
  9. 18
      env/env.cc
  10. 106
      env/env_posix.cc
  11. 110
      env/env_test.cc
  12. 12
      env/file_system.cc
  13. 37
      env/fs_posix.cc
  14. 14
      include/rocksdb/env.h
  15. 21
      include/rocksdb/file_system.h
  16. 5
      include/rocksdb/options.h
  17. 2
      options/db_options.cc
  18. 1
      options/options_helper.cc
  19. 2
      options/options_settable_test.cc
  20. 2
      test_util/fault_injection_test_fs.h
  21. 6
      tools/ldb_cmd.cc
  22. 6
      tools/ldb_cmd_test.cc

@ -190,7 +190,6 @@ class CorruptionTest : public testing::Test {
ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_TRUE(s.ok()) << s.ToString();
Options options; Options options;
EnvOptions env_options; EnvOptions env_options;
options.file_system.reset(new LegacyFileSystemWrapper(options.env));
ASSERT_NOK(VerifySstFileChecksum(options, env_options, fname)); ASSERT_NOK(VerifySstFileChecksum(options, env_options, fname));
} }

@ -150,7 +150,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
own_info_log_(options.info_log == nullptr), own_info_log_(options.info_log == nullptr),
initial_db_options_(SanitizeOptions(dbname, options)), initial_db_options_(SanitizeOptions(dbname, options)),
env_(initial_db_options_.env), env_(initial_db_options_.env),
fs_(initial_db_options_.file_system), fs_(initial_db_options_.env->GetFileSystem()),
immutable_db_options_(initial_db_options_), immutable_db_options_(initial_db_options_),
mutable_db_options_(initial_db_options_), mutable_db_options_(initial_db_options_),
stats_(immutable_db_options_.statistics.get()), stats_(immutable_db_options_.statistics.get()),
@ -3478,12 +3478,8 @@ Status DBImpl::Close() {
Status DB::ListColumnFamilies(const DBOptions& db_options, Status DB::ListColumnFamilies(const DBOptions& db_options,
const std::string& name, const std::string& name,
std::vector<std::string>* column_families) { std::vector<std::string>* column_families) {
FileSystem* fs = db_options.file_system.get(); const std::shared_ptr<FileSystem>& fs = db_options.env->GetFileSystem();
LegacyFileSystemWrapper legacy_fs(db_options.env); return VersionSet::ListColumnFamilies(column_families, name, fs.get());
if (!fs) {
fs = &legacy_fs;
}
return VersionSet::ListColumnFamilies(column_families, name, fs);
} }
Snapshot::~Snapshot() {} Snapshot::~Snapshot() {}

@ -35,16 +35,8 @@ Options SanitizeOptions(const std::string& dbname, const Options& src) {
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
DBOptions result(src); DBOptions result(src);
if (result.file_system == nullptr) { if (result.env == nullptr) {
if (result.env == Env::Default()) { result.env = Env::Default();
result.file_system = FileSystem::Default();
} else {
result.file_system.reset(new LegacyFileSystemWrapper(result.env));
}
} else {
if (result.env == nullptr) {
result.env = Env::Default();
}
} }
// result.max_open_files means an "infinite" open files. // result.max_open_files means an "infinite" open files.

@ -44,7 +44,7 @@ class DBErrorHandlingFSTest : public DBTestBase {
class DBErrorHandlingFS : public FileSystemWrapper { class DBErrorHandlingFS : public FileSystemWrapper {
public: public:
DBErrorHandlingFS() DBErrorHandlingFS()
: FileSystemWrapper(FileSystem::Default().get()), : FileSystemWrapper(FileSystem::Default()),
trig_no_space(false), trig_no_space(false),
trig_io_error(false) {} trig_io_error(false) {}
@ -150,12 +150,13 @@ class ErrorHandlerFSListener : public EventListener {
}; };
TEST_F(DBErrorHandlingFSTest, FLushWriteError) { TEST_F(DBErrorHandlingFSTest, FLushWriteError) {
FaultInjectionTestFS* fault_fs = std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default().get()); new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener( std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener()); new ErrorHandlerFSListener());
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.file_system.reset(fault_fs); options.env = fault_fs_env.get();
options.create_if_missing = true; options.create_if_missing = true;
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
Status s; Status s;
@ -181,12 +182,13 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteError) {
} }
TEST_F(DBErrorHandlingFSTest, ManifestWriteError) { TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
FaultInjectionTestFS* fault_fs = std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default().get()); new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener( std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener()); new ErrorHandlerFSListener());
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.file_system.reset(fault_fs); options.env = fault_fs_env.get();
options.create_if_missing = true; options.create_if_missing = true;
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
Status s; Status s;
@ -223,12 +225,13 @@ TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
} }
TEST_F(DBErrorHandlingFSTest, DoubleManifestWriteError) { TEST_F(DBErrorHandlingFSTest, DoubleManifestWriteError) {
FaultInjectionTestFS* fault_fs = std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default().get()); new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener( std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener()); new ErrorHandlerFSListener());
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.file_system.reset(fault_fs); options.env = fault_fs_env.get();
options.create_if_missing = true; options.create_if_missing = true;
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
Status s; Status s;
@ -272,12 +275,13 @@ TEST_F(DBErrorHandlingFSTest, DoubleManifestWriteError) {
} }
TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) { TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
FaultInjectionTestFS* fault_fs = std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default().get()); new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener( std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener()); new ErrorHandlerFSListener());
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.file_system.reset(fault_fs); options.env = fault_fs_env.get();
options.create_if_missing = true; options.create_if_missing = true;
options.level0_file_num_compaction_trigger = 2; options.level0_file_num_compaction_trigger = 2;
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
@ -344,12 +348,13 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
} }
TEST_F(DBErrorHandlingFSTest, CompactionWriteError) { TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
FaultInjectionTestFS* fault_fs = std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default().get()); new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener( std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener()); new ErrorHandlerFSListener());
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.file_system.reset(fault_fs); options.env = fault_fs_env.get();
options.create_if_missing = true; options.create_if_missing = true;
options.level0_file_num_compaction_trigger = 2; options.level0_file_num_compaction_trigger = 2;
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
@ -387,10 +392,11 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
} }
TEST_F(DBErrorHandlingFSTest, CorruptionError) { TEST_F(DBErrorHandlingFSTest, CorruptionError) {
FaultInjectionTestFS* fault_fs = std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default().get()); new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.file_system.reset(fault_fs); options.env = fault_fs_env.get();
options.create_if_missing = true; options.create_if_missing = true;
options.level0_file_num_compaction_trigger = 2; options.level0_file_num_compaction_trigger = 2;
Status s; Status s;
@ -426,12 +432,13 @@ TEST_F(DBErrorHandlingFSTest, CorruptionError) {
} }
TEST_F(DBErrorHandlingFSTest, AutoRecoverFlushError) { TEST_F(DBErrorHandlingFSTest, AutoRecoverFlushError) {
FaultInjectionTestFS* fault_fs = std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default().get()); new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener( std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener()); new ErrorHandlerFSListener());
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.file_system.reset(fault_fs); options.env = fault_fs_env.get();
options.create_if_missing = true; options.create_if_missing = true;
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
Status s; Status s;
@ -460,12 +467,13 @@ TEST_F(DBErrorHandlingFSTest, AutoRecoverFlushError) {
} }
TEST_F(DBErrorHandlingFSTest, FailRecoverFlushError) { TEST_F(DBErrorHandlingFSTest, FailRecoverFlushError) {
FaultInjectionTestFS* fault_fs = std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default().get()); new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener( std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener()); new ErrorHandlerFSListener());
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.file_system.reset(fault_fs); options.env = fault_fs_env.get();
options.create_if_missing = true; options.create_if_missing = true;
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
Status s; Status s;
@ -487,12 +495,13 @@ TEST_F(DBErrorHandlingFSTest, FailRecoverFlushError) {
} }
TEST_F(DBErrorHandlingFSTest, WALWriteError) { TEST_F(DBErrorHandlingFSTest, WALWriteError) {
FaultInjectionTestFS* fault_fs = std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default().get()); new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener( std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener()); new ErrorHandlerFSListener());
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.file_system.reset(fault_fs); options.env = fault_fs_env.get();
options.create_if_missing = true; options.create_if_missing = true;
options.writable_file_max_buffer_size = 32768; options.writable_file_max_buffer_size = 32768;
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
@ -558,12 +567,13 @@ TEST_F(DBErrorHandlingFSTest, WALWriteError) {
} }
TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) { TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) {
FaultInjectionTestFS* fault_fs = std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default().get()); new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener( std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener()); new ErrorHandlerFSListener());
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();
options.file_system.reset(fault_fs); options.env = fault_fs_env.get();
options.create_if_missing = true; options.create_if_missing = true;
options.writable_file_max_buffer_size = 32768; options.writable_file_max_buffer_size = 32768;
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
@ -643,6 +653,7 @@ TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) {
TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) { TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default()); FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default());
std::vector<std::unique_ptr<Env>> fault_envs;
std::vector<FaultInjectionTestFS*> fault_fs; std::vector<FaultInjectionTestFS*> fault_fs;
std::vector<Options> options; std::vector<Options> options;
std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener; std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
@ -654,12 +665,13 @@ TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
for (auto i = 0; i < kNumDbInstances; ++i) { for (auto i = 0; i < kNumDbInstances; ++i) {
listener.emplace_back(new ErrorHandlerFSListener()); listener.emplace_back(new ErrorHandlerFSListener());
options.emplace_back(GetDefaultOptions()); options.emplace_back(GetDefaultOptions());
fault_fs.emplace_back( fault_fs.emplace_back(new FaultInjectionTestFS(FileSystem::Default()));
new FaultInjectionTestFS(FileSystem::Default().get())); std::shared_ptr<FileSystem> fs(fault_fs.back());
fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
options[i].env = fault_envs.back().get();
options[i].create_if_missing = true; options[i].create_if_missing = true;
options[i].level0_file_num_compaction_trigger = 2; options[i].level0_file_num_compaction_trigger = 2;
options[i].writable_file_max_buffer_size = 32768; options[i].writable_file_max_buffer_size = 32768;
options[i].file_system.reset(fault_fs[i]);
options[i].listeners.emplace_back(listener[i]); options[i].listeners.emplace_back(listener[i]);
options[i].sst_file_manager = sfm; options[i].sst_file_manager = sfm;
DB* dbptr; DB* dbptr;
@ -742,6 +754,7 @@ TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) { TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default()); FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default());
std::vector<std::unique_ptr<Env>> fault_envs;
std::vector<FaultInjectionTestFS*> fault_fs; std::vector<FaultInjectionTestFS*> fault_fs;
std::vector<Options> options; std::vector<Options> options;
std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener; std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
@ -753,12 +766,13 @@ TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
for (auto i = 0; i < kNumDbInstances; ++i) { for (auto i = 0; i < kNumDbInstances; ++i) {
listener.emplace_back(new ErrorHandlerFSListener()); listener.emplace_back(new ErrorHandlerFSListener());
options.emplace_back(GetDefaultOptions()); options.emplace_back(GetDefaultOptions());
fault_fs.emplace_back( fault_fs.emplace_back(new FaultInjectionTestFS(FileSystem::Default()));
new FaultInjectionTestFS(FileSystem::Default().get())); std::shared_ptr<FileSystem> fs(fault_fs.back());
fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
options[i].env = fault_envs.back().get();
options[i].create_if_missing = true; options[i].create_if_missing = true;
options[i].level0_file_num_compaction_trigger = 2; options[i].level0_file_num_compaction_trigger = 2;
options[i].writable_file_max_buffer_size = 32768; options[i].writable_file_max_buffer_size = 32768;
options[i].file_system.reset(fault_fs[i]);
options[i].listeners.emplace_back(listener[i]); options[i].listeners.emplace_back(listener[i]);
options[i].sst_file_manager = sfm; options[i].sst_file_manager = sfm;
DB* dbptr; DB* dbptr;

@ -92,7 +92,6 @@ class MemTableListTest : public testing::Test {
CreateDB(); CreateDB();
// Create a mock VersionSet // Create a mock VersionSet
DBOptions db_options; DBOptions db_options;
db_options.file_system = FileSystem::Default();
ImmutableDBOptions immutable_db_options(db_options); ImmutableDBOptions immutable_db_options(db_options);
EnvOptions env_options; EnvOptions env_options;
std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16)); std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
@ -139,7 +138,6 @@ class MemTableListTest : public testing::Test {
CreateDB(); CreateDB();
// Create a mock VersionSet // Create a mock VersionSet
DBOptions db_options; DBOptions db_options;
db_options.file_system.reset(new LegacyFileSystemWrapper(db_options.env));
ImmutableDBOptions immutable_db_options(db_options); ImmutableDBOptions immutable_db_options(db_options);
EnvOptions env_options; EnvOptions env_options;

@ -672,10 +672,6 @@ Status RepairDB(const std::string& dbname, const DBOptions& db_options,
Status RepairDB(const std::string& dbname, const Options& options) { Status RepairDB(const std::string& dbname, const Options& options) {
Options opts(options); Options opts(options);
if (opts.file_system == nullptr) {
opts.file_system.reset(new LegacyFileSystemWrapper(opts.env));
;
}
DBOptions db_options(opts); DBOptions db_options(opts);
ColumnFamilyOptions cf_options(opts); ColumnFamilyOptions cf_options(opts);

@ -4928,9 +4928,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
Status s; Status s;
{ {
std::unique_ptr<FSSequentialFile> file; std::unique_ptr<FSSequentialFile> file;
s = options.file_system->NewSequentialFile( const std::shared_ptr<FileSystem>& fs = options.env->GetFileSystem();
s = fs->NewSequentialFile(
dscname, dscname,
options.file_system->OptimizeForManifestRead(file_options_), &file, fs->OptimizeForManifestRead(file_options_), &file,
nullptr); nullptr);
if (!s.ok()) { if (!s.ok()) {
return s; return s;

@ -291,20 +291,18 @@ class CompositeEnvWrapper : public Env {
public: public:
// Initialize a CompositeEnvWrapper that delegates all thread/time related // Initialize a CompositeEnvWrapper that delegates all thread/time related
// calls to env, and all file operations to fs // calls to env, and all file operations to fs
explicit CompositeEnvWrapper(Env* env, FileSystem* fs) explicit CompositeEnvWrapper(Env* env, std::shared_ptr<FileSystem> fs)
: env_target_(env), fs_env_target_(fs) {} : Env(fs), env_target_(env) {}
~CompositeEnvWrapper() {} ~CompositeEnvWrapper() {}
// Return the target to which this Env forwards all calls // Return the target to which this Env forwards all calls
Env* env_target() const { return env_target_; } Env* env_target() const { return env_target_; }
FileSystem* fs_env_target() const { return fs_env_target_; }
Status RegisterDbPaths(const std::vector<std::string>& paths) override { Status RegisterDbPaths(const std::vector<std::string>& paths) override {
return fs_env_target_->RegisterDbPaths(paths); return file_system_->RegisterDbPaths(paths);
} }
Status UnregisterDbPaths(const std::vector<std::string>& paths) override { Status UnregisterDbPaths(const std::vector<std::string>& paths) override {
return fs_env_target_->UnregisterDbPaths(paths); return file_system_->UnregisterDbPaths(paths);
} }
// The following text is boilerplate that forwards all methods to target() // The following text is boilerplate that forwards all methods to target()
@ -315,7 +313,7 @@ class CompositeEnvWrapper : public Env {
std::unique_ptr<FSSequentialFile> file; std::unique_ptr<FSSequentialFile> file;
Status status; Status status;
status = status =
fs_env_target_->NewSequentialFile(f, FileOptions(options), &file, &dbg); file_system_->NewSequentialFile(f, FileOptions(options), &file, &dbg);
if (status.ok()) { if (status.ok()) {
r->reset(new CompositeSequentialFileWrapper(file)); r->reset(new CompositeSequentialFileWrapper(file));
} }
@ -327,8 +325,8 @@ class CompositeEnvWrapper : public Env {
IODebugContext dbg; IODebugContext dbg;
std::unique_ptr<FSRandomAccessFile> file; std::unique_ptr<FSRandomAccessFile> file;
Status status; Status status;
status = fs_env_target_->NewRandomAccessFile(f, FileOptions(options), &file, status =
&dbg); file_system_->NewRandomAccessFile(f, FileOptions(options), &file, &dbg);
if (status.ok()) { if (status.ok()) {
r->reset(new CompositeRandomAccessFileWrapper(file)); r->reset(new CompositeRandomAccessFileWrapper(file));
} }
@ -340,7 +338,7 @@ class CompositeEnvWrapper : public Env {
std::unique_ptr<FSWritableFile> file; std::unique_ptr<FSWritableFile> file;
Status status; Status status;
status = status =
fs_env_target_->NewWritableFile(f, FileOptions(options), &file, &dbg); file_system_->NewWritableFile(f, FileOptions(options), &file, &dbg);
if (status.ok()) { if (status.ok()) {
r->reset(new CompositeWritableFileWrapper(file)); r->reset(new CompositeWritableFileWrapper(file));
} }
@ -352,8 +350,8 @@ class CompositeEnvWrapper : public Env {
IODebugContext dbg; IODebugContext dbg;
Status status; Status status;
std::unique_ptr<FSWritableFile> file; std::unique_ptr<FSWritableFile> file;
status = fs_env_target_->ReopenWritableFile(fname, FileOptions(options), status = file_system_->ReopenWritableFile(fname, FileOptions(options),
&file, &dbg); &file, &dbg);
if (status.ok()) { if (status.ok()) {
result->reset(new CompositeWritableFileWrapper(file)); result->reset(new CompositeWritableFileWrapper(file));
} }
@ -366,8 +364,8 @@ class CompositeEnvWrapper : public Env {
IODebugContext dbg; IODebugContext dbg;
Status status; Status status;
std::unique_ptr<FSWritableFile> file; std::unique_ptr<FSWritableFile> file;
status = fs_env_target_->ReuseWritableFile( status = file_system_->ReuseWritableFile(fname, old_fname,
fname, old_fname, FileOptions(options), &file, &dbg); FileOptions(options), &file, &dbg);
if (status.ok()) { if (status.ok()) {
r->reset(new CompositeWritableFileWrapper(file)); r->reset(new CompositeWritableFileWrapper(file));
} }
@ -379,8 +377,8 @@ class CompositeEnvWrapper : public Env {
IODebugContext dbg; IODebugContext dbg;
std::unique_ptr<FSRandomRWFile> file; std::unique_ptr<FSRandomRWFile> file;
Status status; Status status;
status = fs_env_target_->NewRandomRWFile(fname, FileOptions(options), &file, status =
&dbg); file_system_->NewRandomRWFile(fname, FileOptions(options), &file, &dbg);
if (status.ok()) { if (status.ok()) {
result->reset(new CompositeRandomRWFileWrapper(file)); result->reset(new CompositeRandomRWFileWrapper(file));
} }
@ -389,7 +387,7 @@ class CompositeEnvWrapper : public Env {
Status NewMemoryMappedFileBuffer( Status NewMemoryMappedFileBuffer(
const std::string& fname, const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result) override { std::unique_ptr<MemoryMappedFileBuffer>* result) override {
return fs_env_target_->NewMemoryMappedFileBuffer(fname, result); return file_system_->NewMemoryMappedFileBuffer(fname, result);
} }
Status NewDirectory(const std::string& name, Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override { std::unique_ptr<Directory>* result) override {
@ -397,7 +395,7 @@ class CompositeEnvWrapper : public Env {
IODebugContext dbg; IODebugContext dbg;
std::unique_ptr<FSDirectory> dir; std::unique_ptr<FSDirectory> dir;
Status status; Status status;
status = fs_env_target_->NewDirectory(name, io_opts, &dir, &dbg); status = file_system_->NewDirectory(name, io_opts, &dir, &dbg);
if (status.ok()) { if (status.ok()) {
result->reset(new CompositeDirectoryWrapper(dir)); result->reset(new CompositeDirectoryWrapper(dir));
} }
@ -406,102 +404,108 @@ class CompositeEnvWrapper : public Env {
Status FileExists(const std::string& f) override { Status FileExists(const std::string& f) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->FileExists(f, io_opts, &dbg); return file_system_->FileExists(f, io_opts, &dbg);
} }
Status GetChildren(const std::string& dir, Status GetChildren(const std::string& dir,
std::vector<std::string>* r) override { std::vector<std::string>* r) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->GetChildren(dir, io_opts, r, &dbg); return file_system_->GetChildren(dir, io_opts, r, &dbg);
} }
Status GetChildrenFileAttributes( Status GetChildrenFileAttributes(
const std::string& dir, std::vector<FileAttributes>* result) override { const std::string& dir, std::vector<FileAttributes>* result) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->GetChildrenFileAttributes(dir, io_opts, result, return file_system_->GetChildrenFileAttributes(dir, io_opts, result, &dbg);
&dbg);
} }
Status DeleteFile(const std::string& f) override { Status DeleteFile(const std::string& f) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->DeleteFile(f, io_opts, &dbg); return file_system_->DeleteFile(f, io_opts, &dbg);
} }
Status Truncate(const std::string& fname, size_t size) override { Status Truncate(const std::string& fname, size_t size) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->Truncate(fname, size, io_opts, &dbg); return file_system_->Truncate(fname, size, io_opts, &dbg);
} }
Status CreateDir(const std::string& d) override { Status CreateDir(const std::string& d) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->CreateDir(d, io_opts, &dbg); return file_system_->CreateDir(d, io_opts, &dbg);
} }
Status CreateDirIfMissing(const std::string& d) override { Status CreateDirIfMissing(const std::string& d) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->CreateDirIfMissing(d, io_opts, &dbg); return file_system_->CreateDirIfMissing(d, io_opts, &dbg);
} }
Status DeleteDir(const std::string& d) override { Status DeleteDir(const std::string& d) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->DeleteDir(d, io_opts, &dbg); return file_system_->DeleteDir(d, io_opts, &dbg);
} }
Status GetFileSize(const std::string& f, uint64_t* s) override { Status GetFileSize(const std::string& f, uint64_t* s) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->GetFileSize(f, io_opts, s, &dbg); return file_system_->GetFileSize(f, io_opts, s, &dbg);
} }
Status GetFileModificationTime(const std::string& fname, Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime) override { uint64_t* file_mtime) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->GetFileModificationTime(fname, io_opts, file_mtime, return file_system_->GetFileModificationTime(fname, io_opts, file_mtime,
&dbg); &dbg);
} }
Status RenameFile(const std::string& s, const std::string& t) override { Status RenameFile(const std::string& s, const std::string& t) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->RenameFile(s, t, io_opts, &dbg); return file_system_->RenameFile(s, t, io_opts, &dbg);
} }
Status LinkFile(const std::string& s, const std::string& t) override { Status LinkFile(const std::string& s, const std::string& t) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->LinkFile(s, t, io_opts, &dbg); return file_system_->LinkFile(s, t, io_opts, &dbg);
} }
Status NumFileLinks(const std::string& fname, uint64_t* count) override { Status NumFileLinks(const std::string& fname, uint64_t* count) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->NumFileLinks(fname, io_opts, count, &dbg); return file_system_->NumFileLinks(fname, io_opts, count, &dbg);
} }
Status AreFilesSame(const std::string& first, const std::string& second, Status AreFilesSame(const std::string& first, const std::string& second,
bool* res) override { bool* res) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->AreFilesSame(first, second, io_opts, res, &dbg); return file_system_->AreFilesSame(first, second, io_opts, res, &dbg);
} }
Status LockFile(const std::string& f, FileLock** l) override { Status LockFile(const std::string& f, FileLock** l) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->LockFile(f, io_opts, l, &dbg); return file_system_->LockFile(f, io_opts, l, &dbg);
} }
Status UnlockFile(FileLock* l) override { Status UnlockFile(FileLock* l) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->UnlockFile(l, io_opts, &dbg); return file_system_->UnlockFile(l, io_opts, &dbg);
} }
Status GetAbsolutePath(const std::string& db_path, Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) override { std::string* output_path) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->GetAbsolutePath(db_path, io_opts, output_path, &dbg); return file_system_->GetAbsolutePath(db_path, io_opts, output_path, &dbg);
}
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override {
IOOptions io_opts;
IODebugContext dbg;
return file_system_->NewLogger(fname, io_opts, result, &dbg);
} }
#if !defined(OS_WIN) && !defined(ROCKSDB_NO_DYNAMIC_EXTENSION) #if !defined(OS_WIN) && !defined(ROCKSDB_NO_DYNAMIC_EXTENSION)
@ -531,10 +535,6 @@ class CompositeEnvWrapper : public Env {
Status GetTestDirectory(std::string* path) override { Status GetTestDirectory(std::string* path) override {
return env_target_->GetTestDirectory(path); return env_target_->GetTestDirectory(path);
} }
// Creates a logger writing to `fname` by handing the request to the wrapped
// Env target; `result` receives whatever logger the target produces.
Status NewLogger(const std::string& fname,
                 std::shared_ptr<Logger>* result) override {
  Status logger_status = env_target_->NewLogger(fname, result);
  return logger_status;
}
uint64_t NowMicros() override { return env_target_->NowMicros(); } uint64_t NowMicros() override { return env_target_->NowMicros(); }
uint64_t NowNanos() override { return env_target_->NowNanos(); } uint64_t NowNanos() override { return env_target_->NowNanos(); }
uint64_t NowCPUNanos() override { return env_target_->NowCPUNanos(); } uint64_t NowCPUNanos() override { return env_target_->NowCPUNanos(); }
@ -590,46 +590,41 @@ class CompositeEnvWrapper : public Env {
} }
EnvOptions OptimizeForLogRead(const EnvOptions& env_options) const override { EnvOptions OptimizeForLogRead(const EnvOptions& env_options) const override {
return fs_env_target_->OptimizeForLogRead(FileOptions(env_options)); return file_system_->OptimizeForLogRead(FileOptions(env_options));
} }
EnvOptions OptimizeForManifestRead( EnvOptions OptimizeForManifestRead(
const EnvOptions& env_options) const override { const EnvOptions& env_options) const override {
return fs_env_target_->OptimizeForManifestRead( return file_system_->OptimizeForManifestRead(FileOptions(env_options));
FileOptions(env_options));
} }
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options, EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const override { const DBOptions& db_options) const override {
return fs_env_target_->OptimizeForLogWrite(FileOptions(env_options), return file_system_->OptimizeForLogWrite(FileOptions(env_options),
db_options); db_options);
} }
EnvOptions OptimizeForManifestWrite( EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const override { const EnvOptions& env_options) const override {
return fs_env_target_->OptimizeForManifestWrite( return file_system_->OptimizeForManifestWrite(FileOptions(env_options));
FileOptions(env_options));
} }
EnvOptions OptimizeForCompactionTableWrite( EnvOptions OptimizeForCompactionTableWrite(
const EnvOptions& env_options, const EnvOptions& env_options,
const ImmutableDBOptions& immutable_ops) const override { const ImmutableDBOptions& immutable_ops) const override {
return fs_env_target_->OptimizeForCompactionTableWrite( return file_system_->OptimizeForCompactionTableWrite(
FileOptions(env_options), FileOptions(env_options), immutable_ops);
immutable_ops);
} }
EnvOptions OptimizeForCompactionTableRead( EnvOptions OptimizeForCompactionTableRead(
const EnvOptions& env_options, const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const override { const ImmutableDBOptions& db_options) const override {
return fs_env_target_->OptimizeForCompactionTableRead( return file_system_->OptimizeForCompactionTableRead(
FileOptions(env_options), FileOptions(env_options), db_options);
db_options);
} }
Status GetFreeSpace(const std::string& path, uint64_t* diskfree) override { Status GetFreeSpace(const std::string& path, uint64_t* diskfree) override {
IOOptions io_opts; IOOptions io_opts;
IODebugContext dbg; IODebugContext dbg;
return fs_env_target_->GetFreeSpace(path, io_opts, diskfree, &dbg); return file_system_->GetFreeSpace(path, io_opts, diskfree, &dbg);
} }
private: private:
Env* env_target_; Env* env_target_;
FileSystem* fs_env_target_;
}; };
class LegacySequentialFileWrapper : public FSSequentialFile { class LegacySequentialFileWrapper : public FSSequentialFile {
@ -1067,6 +1062,10 @@ class LegacyFileSystemWrapper : public FileSystem {
return status_to_io_status(target_->NewLogger(fname, result)); return status_to_io_status(target_->NewLogger(fname, result));
} }
// Lets the wrapped Env sanitize the options in place. FileOptions derives
// from EnvOptions, so the pointer is passed on as an EnvOptions*.
void SanitizeFileOptions(FileOptions* opts) const override {
  EnvOptions* env_opts = opts;
  target_->SanitizeEnvOptions(env_opts);
}
FileOptions OptimizeForLogRead( FileOptions OptimizeForLogRead(
const FileOptions& file_options) const override { const FileOptions& file_options) const override {
return target_->OptimizeForLogRead(file_options); return target_->OptimizeForLogRead(file_options);

18
env/env.cc vendored

@ -22,6 +22,14 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
// Default constructor. No thread status updater is installed, and the
// embedded FileSystem is a LegacyFileSystemWrapper pointing back at this Env.
Env::Env()
    : thread_status_updater_(nullptr),
      file_system_(std::make_shared<LegacyFileSystemWrapper>(this)) {}
Env::Env(std::shared_ptr<FileSystem> fs)
: thread_status_updater_(nullptr),
file_system_(fs) {}
Env::~Env() { Env::~Env() {
} }
@ -472,4 +480,14 @@ Status NewEnvLogger(const std::string& fname, Env* env,
return Status::OK(); return Status::OK();
} }
const std::shared_ptr<FileSystem>& Env::GetFileSystem() const {
return file_system_;
}
#ifdef OS_WIN
std::unique_ptr<Env> NewCompositeEnv(std::shared_ptr<FileSystem> fs) {
return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
}
#endif
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

106
env/env_posix.cc vendored

@ -128,21 +128,25 @@ class PosixDynamicLibrary : public DynamicLibrary {
class PosixEnv : public CompositeEnvWrapper { class PosixEnv : public CompositeEnvWrapper {
public: public:
PosixEnv(); // This constructor is for constructing non-default Envs, mainly by
// NewCompositeEnv(). It allows new instances to share the same
// threadpool and other resources as the default Env, while allowing
// a non-default FileSystem implementation
PosixEnv(const PosixEnv* default_env, std::shared_ptr<FileSystem> fs);
~PosixEnv() override { ~PosixEnv() override {
for (const auto tid : threads_to_join_) { if (this == Env::Default()) {
pthread_join(tid, nullptr); for (const auto tid : threads_to_join_) {
} pthread_join(tid, nullptr);
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { }
thread_pools_[pool_id].JoinAllThreads(); for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
} thread_pools_[pool_id].JoinAllThreads();
// Delete the thread_status_updater_ only when the current Env is not }
// Env::Default(). This is to avoid the free-after-use error when // Do not delete the thread_status_updater_ in order to avoid the
// Env::Default() is destructed while some other child threads are // free after use when Env::Default() is destructed while some other
// still trying to update thread status. // child threads are still trying to update thread status. All
if (this != Env::Default()) { // PosixEnv instances use the same thread_status_updater_, so never
delete thread_status_updater_; // explicitly delete it.
} }
} }
@ -252,34 +256,6 @@ class PosixEnv : public CompositeEnvWrapper {
uint64_t GetThreadID() const override { return gettid(pthread_self()); } uint64_t GetThreadID() const override { return gettid(pthread_self()); }
// Opens `fname` for writing with fopen() and wraps the stream in a
// PosixLogger stored in `*result`.
//
// On fopen() failure `*result` is reset and an IOError built from errno is
// returned; otherwise Status::OK() is returned.
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override {
FILE* f;
{
// The guard is scoped to the fopen() call only.
IOSTATS_TIMER_GUARD(open_nanos);
// The mode string is "w", or "we" when glibc >= 2.7 is detected; the
// adjacent string literals are concatenated by the compiler.
f = fopen(fname.c_str(),
"w"
#ifdef __GLIBC_PREREQ
#if __GLIBC_PREREQ(2, 7)
"e" // glibc extension to enable O_CLOEXEC
#endif
#endif
);
}
if (f == nullptr) {
result->reset();
return IOError("when fopen a file for new logger", fname, errno);
} else {
int fd = fileno(f);
#ifdef ROCKSDB_FALLOCATE_PRESENT
// Best-effort preallocation of 4 KiB; the return value is intentionally
// not checked.
fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
#endif
// Applied unconditionally, including when the "e" mode flag was used above.
SetFD_CLOEXEC(fd, nullptr);
result->reset(new PosixLogger(f, &PosixEnv::gettid, this));
return Status::OK();
}
}
uint64_t NowMicros() override { uint64_t NowMicros() override {
struct timeval tv; struct timeval tv;
gettimeofday(&tv, nullptr); gettimeofday(&tv, nullptr);
@ -406,18 +382,34 @@ class PosixEnv : public CompositeEnvWrapper {
} }
private: private:
std::vector<ThreadPoolImpl> thread_pools_; friend Env* Env::Default();
pthread_mutex_t mu_; // Constructs the default Env, a singleton
std::vector<pthread_t> threads_to_join_; PosixEnv();
// The below 4 members are only used by the default PosixEnv instance.
// Non-default instances simply maintain references to the backing
// members in the default instance.
std::vector<ThreadPoolImpl> thread_pools_storage_;
pthread_mutex_t mu_storage_;
std::vector<pthread_t> threads_to_join_storage_;
bool allow_non_owner_access_storage_;
std::vector<ThreadPoolImpl>& thread_pools_;
pthread_mutex_t& mu_;
std::vector<pthread_t>& threads_to_join_;
// If true, allow non owner read access for db files. Otherwise, non-owner // If true, allow non owner read access for db files. Otherwise, non-owner
// has no access to db files. // has no access to db files.
bool allow_non_owner_access_; bool& allow_non_owner_access_;
}; };
PosixEnv::PosixEnv() PosixEnv::PosixEnv()
: CompositeEnvWrapper(this, FileSystem::Default().get()), : CompositeEnvWrapper(this, FileSystem::Default()),
thread_pools_(Priority::TOTAL), thread_pools_storage_(Priority::TOTAL),
allow_non_owner_access_(true) { allow_non_owner_access_storage_(true),
thread_pools_(thread_pools_storage_),
mu_(mu_storage_),
threads_to_join_(threads_to_join_storage_),
allow_non_owner_access_(allow_non_owner_access_storage_) {
ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].SetThreadPriority( thread_pools_[pool_id].SetThreadPriority(
@ -428,6 +420,15 @@ PosixEnv::PosixEnv()
thread_status_updater_ = CreateThreadStatusUpdater(); thread_status_updater_ = CreateThreadStatusUpdater();
} }
// Constructs a non-default PosixEnv that uses `fs` as its FileSystem while
// sharing state with `default_env`.
//
// `default_env` is dereferenced without a null check, so it must be non-null.
// The reference members are bound to the objects that default_env's own
// reference members refer to; this instance's *_storage_ members are not
// named in the initializer list.
PosixEnv::PosixEnv(const PosixEnv* default_env, std::shared_ptr<FileSystem> fs)
: CompositeEnvWrapper(this, fs),
thread_pools_(default_env->thread_pools_),
mu_(default_env->mu_),
threads_to_join_(default_env->threads_to_join_),
allow_non_owner_access_(default_env->allow_non_owner_access_) {
// Copies the pointer only; the updater object is the one default_env uses.
thread_status_updater_ = default_env->thread_status_updater_;
}
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri, void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
void* tag, void (*unschedFunction)(void* arg)) { void* tag, void (*unschedFunction)(void* arg)) {
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
@ -519,9 +520,12 @@ Env* Env::Default() {
CompressionContextCache::InitSingleton(); CompressionContextCache::InitSingleton();
INIT_SYNC_POINT_SINGLETONS(); INIT_SYNC_POINT_SINGLETONS();
static PosixEnv default_env; static PosixEnv default_env;
static CompositeEnvWrapper composite_env(&default_env, return &default_env;
FileSystem::Default().get()); }
return &composite_env;
std::unique_ptr<Env> NewCompositeEnv(std::shared_ptr<FileSystem> fs) {
PosixEnv* default_env = static_cast<PosixEnv*>(Env::Default());
return std::unique_ptr<Env>(new PosixEnv(default_env, fs));
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

110
env/env_test.cc vendored

@ -35,6 +35,8 @@
#include "port/malloc.h" #include "port/malloc.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "test_util/fault_injection_test_env.h"
#include "test_util/fault_injection_test_fs.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
@ -1980,6 +1982,114 @@ INSTANTIATE_TEST_CASE_P(
::testing::Values(std::pair<Env*, bool>(chroot_env.get(), true))); ::testing::Values(std::pair<Env*, bool>(chroot_env.get(), true)));
#endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN) #endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN)
// Test fixture parameterized on a (bool, bool, bool) tuple that selects which
// Env / FileSystem combination the test runs against:
//   <0> whether an Env is supplied at all,
//   <1> whether that Env is the default one or a custom wrapper,
//   <2> whether the FileSystem is the default one or a fault-injection one.
class EnvFSTestWithParam
    : public ::testing::Test,
      public ::testing::WithParamInterface<std::tuple<bool, bool, bool>> {
 public:
  EnvFSTestWithParam() {
    const bool has_env = std::get<0>(GetParam());
    const bool use_default_env = std::get<1>(GetParam());
    const bool use_default_fs = std::get<2>(GetParam());

    // NOTE(review): this initial value never survives. env_ is reassigned
    // unconditionally at the end of the constructor, so the (non-null,
    // default Env, default FileSystem) combination ends up running with
    // env_ == nullptr rather than Env::Default(). Confirm whether that is
    // intended before relying on coverage of that case.
    env_ = (has_env && use_default_env) ? Env::Default() : nullptr;

    if (use_default_fs) {
      fs_ = FileSystem::Default();
    } else {
      fs_ = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
    }

    if (has_env) {
      if (use_default_env) {
        if (!use_default_fs) {
          // Default Env resources combined with the custom FileSystem.
          env_ptr_ = NewCompositeEnv(fs_);
        }
      } else if (use_default_fs) {
        // Custom Env only; fs_ is cleared so the test knows there is no
        // explicit FileSystem to compare against.
        env_ptr_ =
            std::unique_ptr<Env>(new FaultInjectionTestEnv(Env::Default()));
        fs_.reset();
      } else {
        // Custom Env and custom FileSystem glued together by a wrapper.
        env_ptr_.reset(new FaultInjectionTestEnv(Env::Default()));
        composite_env_ptr_.reset(new CompositeEnvWrapper(env_ptr_.get(), fs_));
      }
    }

    // The composite wrapper takes precedence when it was created; otherwise
    // use env_ptr_, which may be empty.
    env_ = composite_env_ptr_ ? composite_env_ptr_.get() : env_ptr_.get();

    dbname1_ = test::PerThreadDBPath("env_fs_test1");
    dbname2_ = test::PerThreadDBPath("env_fs_test2");
  }

  ~EnvFSTestWithParam() = default;

  // Env handed to Options::env by the tests; not owned through this pointer.
  Env* env_;
  // Owns the custom or NewCompositeEnv()-created Env, when one is created.
  std::unique_ptr<Env> env_ptr_;
  // Owns the CompositeEnvWrapper built on top of env_ptr_; declared after it
  // so that it is destroyed first.
  std::unique_ptr<Env> composite_env_ptr_;
  // FileSystem the Env is expected to expose; empty when not applicable.
  std::shared_ptr<FileSystem> fs_;
  std::string dbname1_;
  std::string dbname2_;
};
// Opens, exercises and destroys two databases using the Env selected by the
// fixture, after checking that the Env exposes the expected FileSystem.
//
// Every DB operation's Status is asserted, so a failure in the write, flush,
// compaction, close or destroy path is reported where it happens instead of
// being silently dropped.
TEST_P(EnvFSTestWithParam, OptionsTest) {
  Options opts;
  opts.env = env_;
  opts.create_if_missing = true;

  if (env_) {
    if (fs_) {
      // The Env must hand back exactly the FileSystem it was built with.
      ASSERT_EQ(fs_.get(), env_->GetFileSystem().get());
    } else {
      // A custom Env without an explicit FileSystem must not expose the
      // default FileSystem object.
      ASSERT_NE(FileSystem::Default().get(), env_->GetFileSystem().get());
    }
  }

  for (const std::string& dbname : {dbname1_, dbname2_}) {
    DB* raw_db = nullptr;
    ASSERT_OK(DB::Open(opts, dbname, &raw_db));
    // Own the handle so it is released even if an assertion below fails.
    std::unique_ptr<DB> db(raw_db);

    const WriteOptions wo;
    ASSERT_OK(db->Put(wo, "a", "a"));
    ASSERT_OK(db->Flush(FlushOptions()));
    ASSERT_OK(db->Put(wo, "b", "b"));
    ASSERT_OK(db->Flush(FlushOptions()));
    ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));

    std::string val;
    ASSERT_OK(db->Get(ReadOptions(), "a", &val));
    ASSERT_EQ("a", val);
    ASSERT_OK(db->Get(ReadOptions(), "b", &val));
    ASSERT_EQ("b", val);

    ASSERT_OK(db->Close());
    // Delete the DB object before removing its files.
    db.reset();
    ASSERT_OK(DestroyDB(dbname, opts));
  }
}
// Instantiates EnvFSTestWithParam over the cross product of three boolean
// generators. The tuple elements are as follows -
// 1. True means Options::env is non-null, false means null
// 2. True means use Env::Default, false means custom
// 3. True means use FileSystem::Default, false means custom
INSTANTIATE_TEST_CASE_P(
EnvFSTest, EnvFSTestWithParam,
::testing::Combine(::testing::Bool(), ::testing::Bool(),
::testing::Bool()));
// Checks that background thread counts set through Env::Default() are
// reported identically by Envs created with NewCompositeEnv(), i.e. that
// they all share the same thread pools.
TEST_F(EnvTest, MultipleCompositeEnv) {
  auto first_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
  auto second_fs =
      std::make_shared<FaultInjectionTestFS>(FileSystem::Default());

  std::unique_ptr<Env> first_env = NewCompositeEnv(first_fs);
  std::unique_ptr<Env> second_env = NewCompositeEnv(second_fs);

  // Configure the pools only through the default Env.
  Env* const default_env = Env::Default();
  default_env->SetBackgroundThreads(8, Env::HIGH);
  default_env->SetBackgroundThreads(16, Env::LOW);

  // Each composite Env must report the same settings.
  for (Env* env : {first_env.get(), second_env.get()}) {
    ASSERT_EQ(env->GetBackgroundThreads(Env::LOW), 16);
    ASSERT_EQ(env->GetBackgroundThreads(Env::HIGH), 8);
  }
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

12
env/file_system.cc vendored

@ -26,6 +26,18 @@ Status FileSystem::Load(const std::string& value,
return s; return s;
} }
// Default implementation of file reuse: renames `old_fname` to `fname` and,
// if the rename succeeds, opens `fname` as a new writable file in `*result`.
// A failed rename is returned as-is and no file is opened.
IOStatus FileSystem::ReuseWritableFile(const std::string& fname,
                                       const std::string& old_fname,
                                       const FileOptions& opts,
                                       std::unique_ptr<FSWritableFile>* result,
                                       IODebugContext* dbg) {
  IOStatus rename_status = RenameFile(old_fname, fname, opts.io_options, dbg);
  if (rename_status.ok()) {
    return NewWritableFile(fname, opts, result, dbg);
  }
  return rename_status;
}
FileOptions FileSystem::OptimizeForLogRead( FileOptions FileSystem::OptimizeForLogRead(
const FileOptions& file_options) const { const FileOptions& file_options) const {
FileOptions optimized_file_options(file_options); FileOptions optimized_file_options(file_options);

37
env/fs_posix.cc vendored

@ -47,6 +47,7 @@
#include <set> #include <set>
#include <vector> #include <vector>
#include "env/composite_env_wrapper.h"
#include "env/io_posix.h" #include "env/io_posix.h"
#include "logging/logging.h" #include "logging/logging.h"
#include "logging/posix_logger.h" #include "logging/posix_logger.h"
@ -81,6 +82,10 @@ inline mode_t GetDBFileMode(bool allow_non_owner_access) {
return allow_non_owner_access ? 0644 : 0600; return allow_non_owner_access ? 0644 : 0600;
} }
static uint64_t gettid() {
return Env::Default()->GetThreadID();
}
// list of pathnames that are locked // list of pathnames that are locked
// Only used for error message. // Only used for error message.
struct LockHoldingInfo { struct LockHoldingInfo {
@ -540,10 +545,34 @@ class PosixFileSystem : public FileSystem {
return IOStatus::OK(); return IOStatus::OK();
} }
IOStatus NewLogger(const std::string& /*fname*/, const IOOptions& /*opts*/, IOStatus NewLogger(const std::string& fname, const IOOptions& /*opts*/,
std::shared_ptr<ROCKSDB_NAMESPACE::Logger>* /*ptr*/, std::shared_ptr<Logger>* result,
IODebugContext* /*dbg*/) override { IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported(); FILE* f;
{
IOSTATS_TIMER_GUARD(open_nanos);
f = fopen(fname.c_str(),
"w"
#ifdef __GLIBC_PREREQ
#if __GLIBC_PREREQ(2, 7)
"e" // glibc extension to enable O_CLOEXEC
#endif
#endif
);
}
if (f == nullptr) {
result->reset();
return status_to_io_status(
IOError("when fopen a file for new logger", fname, errno));
} else {
int fd = fileno(f);
#ifdef ROCKSDB_FALLOCATE_PRESENT
fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
#endif
SetFD_CLOEXEC(fd, nullptr);
result->reset(new PosixLogger(f, &gettid, Env::Default()));
return IOStatus::OK();
}
} }
IOStatus FileExists(const std::string& fname, const IOOptions& /*opts*/, IOStatus FileExists(const std::string& fname, const IOOptions& /*opts*/,

@ -57,6 +57,7 @@ struct MutableDBOptions;
class RateLimiter; class RateLimiter;
class ThreadStatusUpdater; class ThreadStatusUpdater;
struct ThreadStatus; struct ThreadStatus;
class FileSystem;
const size_t kDefaultPageSize = 4 * 1024; const size_t kDefaultPageSize = 4 * 1024;
@ -140,7 +141,9 @@ class Env {
uint64_t size_bytes; uint64_t size_bytes;
}; };
Env() : thread_status_updater_(nullptr) {} Env();
// Construct an Env with a separate FileSystem implementation
Env(std::shared_ptr<FileSystem> fs);
// No copying allowed // No copying allowed
Env(const Env&) = delete; Env(const Env&) = delete;
void operator=(const Env&) = delete; void operator=(const Env&) = delete;
@ -539,12 +542,19 @@ class Env {
virtual void SanitizeEnvOptions(EnvOptions* /*env_opts*/) const {} virtual void SanitizeEnvOptions(EnvOptions* /*env_opts*/) const {}
// Get the FileSystem implementation this Env was constructed with. It
// could be a fully implemented one, or a wrapper class around the Env
const std::shared_ptr<FileSystem>& GetFileSystem() const;
// If you're adding methods here, remember to add them to EnvWrapper too. // If you're adding methods here, remember to add them to EnvWrapper too.
protected: protected:
// The pointer to an internal structure that will update the // The pointer to an internal structure that will update the
// status of each thread. // status of each thread.
ThreadStatusUpdater* thread_status_updater_; ThreadStatusUpdater* thread_status_updater_;
// Pointer to the underlying FileSystem implementation
std::shared_ptr<FileSystem> file_system_;
}; };
// The factory function to construct a ThreadStatusUpdater. Any Env // The factory function to construct a ThreadStatusUpdater. Any Env
@ -1603,4 +1613,6 @@ Env* NewTimedEnv(Env* base_env);
Status NewEnvLogger(const std::string& fname, Env* env, Status NewEnvLogger(const std::string& fname, Env* env,
std::shared_ptr<Logger>* result); std::shared_ptr<Logger>* result);
std::unique_ptr<Env> NewCompositeEnv(std::shared_ptr<FileSystem> fs);
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -102,6 +102,9 @@ struct FileOptions : EnvOptions {
FileOptions(const EnvOptions& opts) FileOptions(const EnvOptions& opts)
: EnvOptions(opts) {} : EnvOptions(opts) {}
// Copy constructor: copies the EnvOptions base and the io_options member.
FileOptions(const FileOptions& opts)
    : EnvOptions(opts), io_options(opts.io_options) {}

// Copy assignment is defaulted explicitly. With a user-provided copy
// constructor the implicitly generated assignment is deprecated, and newer
// compilers warn about it (-Wdeprecated-copy) wherever a FileOptions is
// assigned.
FileOptions& operator=(const FileOptions& opts) = default;
}; };
// A structure to pass back some debugging information from the FileSystem // A structure to pass back some debugging information from the FileSystem
@ -263,7 +266,7 @@ class FileSystem {
const std::string& old_fname, const std::string& old_fname,
const FileOptions& file_opts, const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result, std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) = 0; IODebugContext* dbg);
// Open `fname` for random read and write, if file doesn't exist the file // Open `fname` for random read and write, if file doesn't exist the file
// will be created. On success, stores a pointer to the new file in // will be created. On success, stores a pointer to the new file in
@ -465,6 +468,10 @@ class FileSystem {
std::string* output_path, std::string* output_path,
IODebugContext* dbg) = 0; IODebugContext* dbg) = 0;
// Sanitize the FileOptions. Typically called by a FileOptions/EnvOptions
// copy constructor. The default implementation does nothing and leaves
// the options untouched; implementations override it to adjust them in place.
virtual void SanitizeFileOptions(FileOptions* /*opts*/) const {}
// OptimizeForLogRead will create a new FileOptions object that is a copy of // OptimizeForLogRead will create a new FileOptions object that is a copy of
// the FileOptions in the parameters, but is optimized for reading log files. // the FileOptions in the parameters, but is optimized for reading log files.
virtual FileOptions OptimizeForLogRead(const FileOptions& file_options) const; virtual FileOptions OptimizeForLogRead(const FileOptions& file_options) const;
@ -1001,11 +1008,13 @@ class FSDirectory {
class FileSystemWrapper : public FileSystem { class FileSystemWrapper : public FileSystem {
public: public:
// Initialize an EnvWrapper that delegates all calls to *t // Initialize an EnvWrapper that delegates all calls to *t
explicit FileSystemWrapper(FileSystem* t) : target_(t) {} explicit FileSystemWrapper(std::shared_ptr<FileSystem> t) : target_(t) {}
~FileSystemWrapper() override {} ~FileSystemWrapper() override {}
// Reports the name of the wrapped target FileSystem.
const char* Name() const override { return target_->Name(); }
// Return the target to which this Env forwards all calls // Return the target to which this Env forwards all calls
FileSystem* target() const { return target_; } FileSystem* target() const { return target_.get(); }
// The following text is boilerplate that forwards all methods to target() // The following text is boilerplate that forwards all methods to target()
IOStatus NewSequentialFile(const std::string& f, IOStatus NewSequentialFile(const std::string& f,
@ -1149,6 +1158,10 @@ class FileSystemWrapper : public FileSystem {
return target_->NewLogger(fname, options, result, dbg); return target_->NewLogger(fname, options, result, dbg);
} }
// Forwards the in-place sanitization request to the wrapped FileSystem.
void SanitizeFileOptions(FileOptions* opts) const override {
  target()->SanitizeFileOptions(opts);
}
FileOptions OptimizeForLogRead( FileOptions OptimizeForLogRead(
const FileOptions& file_options) const override { const FileOptions& file_options) const override {
return target_->OptimizeForLogRead(file_options); return target_->OptimizeForLogRead(file_options);
@ -1182,7 +1195,7 @@ class FileSystemWrapper : public FileSystem {
} }
private: private:
FileSystem* target_; std::shared_ptr<FileSystem> target_;
}; };
class FSSequentialFileWrapper : public FSSequentialFile { class FSSequentialFileWrapper : public FSSequentialFile {

@ -396,11 +396,6 @@ struct DBOptions {
// Default: Env::Default() // Default: Env::Default()
Env* env = Env::Default(); Env* env = Env::Default();
// Use the specified object to interact with the storage to
// read/write files. This is in addition to env. This option should be used
// if the desired storage subsystem provides a FileSystem implementation.
std::shared_ptr<FileSystem> file_system = nullptr;
// Use to control write rate of flush and compaction. Flush has higher // Use to control write rate of flush and compaction. Flush has higher
// priority than compaction. Rate limiting is disabled if nullptr. // priority than compaction. Rate limiting is disabled if nullptr.
// If rate limiter is enabled, bytes_per_sync is set to 1MB by default. // If rate limiter is enabled, bytes_per_sync is set to 1MB by default.

@ -26,7 +26,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
error_if_exists(options.error_if_exists), error_if_exists(options.error_if_exists),
paranoid_checks(options.paranoid_checks), paranoid_checks(options.paranoid_checks),
env(options.env), env(options.env),
fs(options.file_system), fs(options.env->GetFileSystem()),
rate_limiter(options.rate_limiter), rate_limiter(options.rate_limiter),
sst_file_manager(options.sst_file_manager), sst_file_manager(options.sst_file_manager),
info_log(options.info_log), info_log(options.info_log),

@ -38,7 +38,6 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
options.error_if_exists = immutable_db_options.error_if_exists; options.error_if_exists = immutable_db_options.error_if_exists;
options.paranoid_checks = immutable_db_options.paranoid_checks; options.paranoid_checks = immutable_db_options.paranoid_checks;
options.env = immutable_db_options.env; options.env = immutable_db_options.env;
options.file_system = immutable_db_options.fs;
options.rate_limiter = immutable_db_options.rate_limiter; options.rate_limiter = immutable_db_options.rate_limiter;
options.sst_file_manager = immutable_db_options.sst_file_manager; options.sst_file_manager = immutable_db_options.sst_file_manager;
options.info_log = immutable_db_options.info_log; options.info_log = immutable_db_options.info_log;

@ -181,8 +181,6 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
const OffsetGap kDBOptionsBlacklist = { const OffsetGap kDBOptionsBlacklist = {
{offsetof(struct DBOptions, env), sizeof(Env*)}, {offsetof(struct DBOptions, env), sizeof(Env*)},
{offsetof(struct DBOptions, file_system),
sizeof(std::shared_ptr<FileSystem>)},
{offsetof(struct DBOptions, rate_limiter), {offsetof(struct DBOptions, rate_limiter),
sizeof(std::shared_ptr<RateLimiter>)}, sizeof(std::shared_ptr<RateLimiter>)},
{offsetof(struct DBOptions, sst_file_manager), {offsetof(struct DBOptions, sst_file_manager),

@ -137,7 +137,7 @@ class TestFSDirectory : public FSDirectory {
class FaultInjectionTestFS : public FileSystemWrapper { class FaultInjectionTestFS : public FileSystemWrapper {
public: public:
explicit FaultInjectionTestFS(FileSystem* base) explicit FaultInjectionTestFS(std::shared_ptr<FileSystem> base)
: FileSystemWrapper(base), filesystem_active_(true) {} : FileSystemWrapper(base), filesystem_active_(true) {}
virtual ~FaultInjectionTestFS() {} virtual ~FaultInjectionTestFS() {}

@ -295,8 +295,6 @@ void LDBCommand::Run() {
options_.env = env; options_.env = env;
} }
options_.file_system.reset(new LegacyFileSystemWrapper(options_.env));
if (db_ == nullptr && !NoDBOpen()) { if (db_ == nullptr && !NoDBOpen()) {
OpenDB(); OpenDB();
if (exec_state_.IsFailed() && try_load_options_) { if (exec_state_.IsFailed() && try_load_options_) {
@ -1170,7 +1168,7 @@ void GetLiveFilesChecksumInfoFromVersionSet(Options options,
/*block_cache_tracer=*/nullptr); /*block_cache_tracer=*/nullptr);
std::vector<std::string> cf_name_list; std::vector<std::string> cf_name_list;
s = versions.ListColumnFamilies(&cf_name_list, db_path, s = versions.ListColumnFamilies(&cf_name_list, db_path,
options.file_system.get()); immutable_db_options.fs.get());
if (s.ok()) { if (s.ok()) {
std::vector<ColumnFamilyDescriptor> cf_list; std::vector<ColumnFamilyDescriptor> cf_list;
for (const auto& name : cf_name_list) { for (const auto& name : cf_name_list) {
@ -1913,8 +1911,6 @@ void ReduceDBLevelsCommand::DoCommand() {
Status st; Status st;
Options opt = PrepareOptionsForOpenDB(); Options opt = PrepareOptionsForOpenDB();
int old_level_num = -1; int old_level_num = -1;
opt.file_system.reset(new LegacyFileSystemWrapper(opt.env));
;
st = GetOldNumOfLevels(opt, &old_level_num); st = GetOldNumOfLevels(opt, &old_level_num);
if (!st.ok()) { if (!st.ok()) {
exec_state_ = LDBCommandExecuteResult::Failed(st.ToString()); exec_state_ = LDBCommandExecuteResult::Failed(st.ToString());

@ -81,8 +81,6 @@ TEST_F(LdbCmdTest, MemEnv) {
opts.env = env.get(); opts.env = env.get();
opts.create_if_missing = true; opts.create_if_missing = true;
opts.file_system.reset(new LegacyFileSystemWrapper(opts.env));
DB* db = nullptr; DB* db = nullptr;
std::string dbname = test::TmpDir(); std::string dbname = test::TmpDir();
ASSERT_OK(DB::Open(opts, dbname, &db)); ASSERT_OK(DB::Open(opts, dbname, &db));
@ -199,7 +197,7 @@ class FileChecksumTestHelper {
std::vector<std::string> cf_name_list; std::vector<std::string> cf_name_list;
Status s; Status s;
s = versions.ListColumnFamilies(&cf_name_list, dbname_, s = versions.ListColumnFamilies(&cf_name_list, dbname_,
options_.file_system.get()); immutable_db_options.fs.get());
if (s.ok()) { if (s.ok()) {
std::vector<ColumnFamilyDescriptor> cf_list; std::vector<ColumnFamilyDescriptor> cf_list;
for (const auto& name : cf_name_list) { for (const auto& name : cf_name_list) {
@ -264,7 +262,6 @@ TEST_F(LdbCmdTest, DumpFileChecksumNoChecksum) {
Options opts; Options opts;
opts.env = env.get(); opts.env = env.get();
opts.create_if_missing = true; opts.create_if_missing = true;
opts.file_system.reset(new LegacyFileSystemWrapper(opts.env));
DB* db = nullptr; DB* db = nullptr;
std::string dbname = test::TmpDir(); std::string dbname = test::TmpDir();
@ -351,7 +348,6 @@ TEST_F(LdbCmdTest, DumpFileChecksumCRC32) {
opts.create_if_missing = true; opts.create_if_missing = true;
opts.sst_file_checksum_func = opts.sst_file_checksum_func =
std::shared_ptr<FileChecksumFunc>(CreateFileChecksumFuncCrc32c()); std::shared_ptr<FileChecksumFunc>(CreateFileChecksumFuncCrc32c());
opts.file_system.reset(new LegacyFileSystemWrapper(opts.env));
DB* db = nullptr; DB* db = nullptr;
std::string dbname = test::TmpDir(); std::string dbname = test::TmpDir();

Loading…
Cancel
Save