Create a CustomEnv class; Add WinFileSystem; Make LegacyFileSystemWrapper private (#7703)

Summary:
This PR does the following:
-> Creates a WinFileSystem class.  This class is the Windows equivalent of the PosixFileSystem and will be used on Windows systems.
-> Introduces a CustomEnv class.  A CustomEnv is an Env that takes a FileSystem as constructor argument.  I believe there will only ever be two implementations of this class (PosixEnv and WinEnv).  There is still a CustomEnvWrapper class that takes an Env and a FileSystem and wraps the Env calls with the input Env but uses the FileSystem for the FileSystem calls
-> Eliminates the public uses of the LegacyFileSystemWrapper.

With this change in place, there are effectively the following patterns of Env:
- "Base Env classes" (PosixEnv, WinEnv).  These classes implement the core Env functions (e.g. Threads) and have a hard-coded input FileSystem.  These classes inherit from CompositeEnv, implement the core Env functions (threads) and delegate the FileSystem-like calls to the input file system.
- Wrapped Composite Env classes (MemEnv).  These classes take in an Env and a FileSystem.  The core env functions are re-directed to the wrapped env.  The file system calls are redirected to the input file system
- Legacy Wrapped Env classes.  These classes take in an Env input (but no FileSystem).  The core env functions are re-directed to the wrapped env.  A "Legacy File System" is created using this env and the file system calls directed to the env itself.

With these changes in place, the PosixEnv becomes a singleton -- there is only ever one created.  Any other use of the PosixEnv is via another wrapped env.  This cleans up some of the issues with the env construction and destruction.

Additionally, there were places in the code that required had an Env when they required a FileSystem.  Many of these places would wrap the Env with a LegacyFileSystemWrapper instead of using the env->GetFileSystem().  These places were changed, thereby removing layers of additional redirection (LegacyFileSystem --> Env --> Env::FileSystem).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7703

Reviewed By: zhichao-cao

Differential Revision: D25762190

Pulled By: anand1976

fbshipit-source-id: 1a088e97fc916f28ac69c149cd1dcad0ab31704b
main
mrambacher 4 years ago committed by Facebook GitHub Bot
parent c1a65a4de4
commit e628f59e87
  1. 27
      db/blob/blob_file_builder_test.cc
  2. 2
      db/compaction/compaction_job_test.cc
  3. 4
      db/db_basic_test.cc
  4. 5
      db/db_impl/db_impl.cc
  5. 41
      db/db_test2.cc
  6. 2
      db/flush_job_test.cc
  7. 1
      db/repair.cc
  8. 8
      db/repair_test.cc
  9. 4
      db/wal_manager_test.cc
  10. 364
      env/composite_env_wrapper.h
  11. 257
      env/env.cc
  12. 26
      env/env_posix.cc
  13. 9
      env/file_system.cc
  14. 9
      file/delete_scheduler_test.cc
  15. 26
      file/file_util.h
  16. 4
      file/prefetch_test.cc
  17. 9
      file/sst_file_manager_impl.cc
  18. 3
      include/rocksdb/env.h
  19. 7
      include/rocksdb/file_system.h
  20. 2
      port/win/env_default.cc
  21. 956
      port/win/env_win.cc
  22. 397
      port/win/env_win.h
  23. 553
      port/win/io_win.cc
  24. 258
      port/win/io_win.h
  25. 18
      port/win/win_logger.cc
  26. 15
      port/win/win_logger.h
  27. 20
      test_util/testutil.cc
  28. 4
      test_util/testutil.h
  29. 6
      tools/db_bench_tool_test.cc
  30. 8
      utilities/blob_db/blob_db_test.cc
  31. 10
      utilities/options/options_util.cc

@ -15,7 +15,6 @@
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_sequential_reader.h"
#include "env/composite_env_wrapper.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
@ -40,7 +39,9 @@ class TestFileNumberGenerator {
class BlobFileBuilderTest : public testing::Test {
protected:
BlobFileBuilderTest() : mock_env_(Env::Default()), fs_(&mock_env_) {}
BlobFileBuilderTest() : mock_env_(Env::Default()) {
fs_ = mock_env_.GetFileSystem();
}
void VerifyBlobFile(uint64_t blob_file_number,
const std::string& blob_file_path,
@ -54,7 +55,7 @@ class BlobFileBuilderTest : public testing::Test {
std::unique_ptr<FSRandomAccessFile> file;
constexpr IODebugContext* dbg = nullptr;
ASSERT_OK(
fs_.NewRandomAccessFile(blob_file_path, file_options_, &file, dbg));
fs_->NewRandomAccessFile(blob_file_path, file_options_, &file, dbg));
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(std::move(file), blob_file_path,
@ -108,7 +109,7 @@ class BlobFileBuilderTest : public testing::Test {
}
MockEnv mock_env_;
LegacyFileSystemWrapper fs_;
std::shared_ptr<FileSystem> fs_;
FileOptions file_options_;
};
@ -138,7 +139,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_,
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
@ -221,7 +222,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_,
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
@ -306,7 +307,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_,
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
@ -358,7 +359,7 @@ TEST_F(BlobFileBuilderTest, Compression) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_,
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
@ -440,7 +441,7 @@ TEST_F(BlobFileBuilderTest, CompressionError) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_,
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
@ -517,7 +518,7 @@ TEST_F(BlobFileBuilderTest, Checksum) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_,
BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(),
&immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
@ -571,12 +572,12 @@ class BlobFileBuilderIOErrorTest
BlobFileBuilderIOErrorTest()
: mock_env_(Env::Default()),
fault_injection_env_(&mock_env_),
fs_(&fault_injection_env_),
fs_(fault_injection_env_.GetFileSystem()),
sync_point_(GetParam()) {}
MockEnv mock_env_;
FaultInjectionTestEnv fault_injection_env_;
LegacyFileSystemWrapper fs_;
std::shared_ptr<FileSystem> fs_;
FileOptions file_options_;
std::string sync_point_;
};
@ -616,7 +617,7 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) {
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(TestFileNumberGenerator(), &fault_injection_env_,
&fs_, &immutable_cf_options, &mutable_cf_options,
fs_.get(), &immutable_cf_options, &mutable_cf_options,
&file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint,
&blob_file_paths, &blob_file_additions);

@ -72,7 +72,7 @@ class CompactionJobTestBase : public testing::Test {
CompactionJobTestBase(std::string dbname, const Comparator* ucmp,
std::function<std::string(uint64_t)> encode_u64_ts)
: env_(Env::Default()),
fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
fs_(env_->GetFileSystem()),
dbname_(std::move(dbname)),
ucmp_(ucmp),
db_options_(),

@ -3274,7 +3274,9 @@ class DeadlineFS : public FileSystemWrapper {
// Increment the IO counter and return a delay in microseconds
IOStatus ShouldDelay(const IOOptions& opts) {
if (!deadline_.count() && !io_timeout_.count()) {
if (timedout_) {
return IOStatus::TimedOut();
} else if (!deadline_.count() && !io_timeout_.count()) {
return IOStatus::OK();
}
if (!ignore_deadline_ && delay_trigger_ == io_count_++) {

@ -53,7 +53,6 @@
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/write_callback.h"
#include "env/composite_env_wrapper.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
@ -3099,8 +3098,8 @@ const std::string& DBImpl::GetName() const { return dbname_; }
Env* DBImpl::GetEnv() const { return env_; }
FileSystem* DB::GetFileSystem() const {
static LegacyFileSystemWrapper fs_wrap(GetEnv());
return &fs_wrap;
const auto& fs = GetEnv()->GetFileSystem();
return fs.get();
}
FileSystem* DBImpl::GetFileSystem() const {

@ -3685,20 +3685,26 @@ TEST_F(DBTest2, LiveFilesOmitObsoleteFiles) {
TEST_F(DBTest2, TestNumPread) {
Options options = CurrentOptions();
bool prefetch_supported =
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
// disable block cache
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options);
env_->count_random_reads_ = true;
env_->random_file_open_counter_.store(0);
ASSERT_OK(Put("bar", "foo"));
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
// After flush, we'll open the file and read footer, meta block,
// property block and index block.
ASSERT_EQ(4, env_->random_read_counter_.Read());
if (prefetch_supported) {
// After flush, we'll open the file and read footer, meta block,
// property block and index block.
ASSERT_EQ(4, env_->random_read_counter_.Read());
} else {
// With prefetch not supported, we will do a single read into a buffer
ASSERT_EQ(1, env_->random_read_counter_.Read());
}
ASSERT_EQ(1, env_->random_file_open_counter_.load());
// One pread per a normal data block read
@ -3714,19 +3720,30 @@ TEST_F(DBTest2, TestNumPread) {
ASSERT_OK(Put("bar2", "foo2"));
ASSERT_OK(Put("foo2", "bar2"));
ASSERT_OK(Flush());
// After flush, we'll open the file and read footer, meta block,
// property block and index block.
ASSERT_EQ(4, env_->random_read_counter_.Read());
if (prefetch_supported) {
// After flush, we'll open the file and read footer, meta block,
// property block and index block.
ASSERT_EQ(4, env_->random_read_counter_.Read());
} else {
// With prefetch not supported, we will do a single read into a buffer
ASSERT_EQ(1, env_->random_read_counter_.Read());
}
ASSERT_EQ(1, env_->random_file_open_counter_.load());
// Compaction needs two input blocks, which requires 2 preads, and
// generate a new SST file which needs 4 preads (footer, meta block,
// property block and index block). In total 6.
env_->random_file_open_counter_.store(0);
env_->random_read_counter_.Reset();
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(6, env_->random_read_counter_.Read());
// All compactin input files should have already been opened.
if (prefetch_supported) {
// Compaction needs two input blocks, which requires 2 preads, and
// generate a new SST file which needs 4 preads (footer, meta block,
// property block and index block). In total 6.
ASSERT_EQ(6, env_->random_read_counter_.Read());
} else {
// With prefetch off, compaction needs two input blocks,
// followed by a single buffered read. In total 3.
ASSERT_EQ(3, env_->random_read_counter_.Read());
}
// All compaction input files should have already been opened.
ASSERT_EQ(1, env_->random_file_open_counter_.load());
// One pread per a normal data block read

@ -32,7 +32,7 @@ class FlushJobTestBase : public testing::Test {
protected:
FlushJobTestBase(std::string dbname, const Comparator* ucmp)
: env_(Env::Default()),
fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
fs_(env_->GetFileSystem()),
dbname_(std::move(dbname)),
ucmp_(ucmp),
options_(),

@ -439,7 +439,6 @@ class Repairer {
range_del_iters.emplace_back(range_del_iter);
}
LegacyFileSystemWrapper fs(env_);
IOStatus io_s;
status = BuildTable(
dbname_, /* versions */ nullptr, immutable_db_options_,

@ -80,8 +80,8 @@ TEST_F(RepairTest, CorruptManifest) {
Close();
ASSERT_OK(env_->FileExists(manifest_path));
LegacyFileSystemWrapper fs(env_);
ASSERT_OK(CreateFile(&fs, manifest_path, "blah", false /* use_fsync */));
ASSERT_OK(CreateFile(env_->GetFileSystem(), manifest_path, "blah",
false /* use_fsync */));
ASSERT_OK(RepairDB(dbname_, CurrentOptions()));
Reopen(CurrentOptions());
@ -163,8 +163,8 @@ TEST_F(RepairTest, CorruptSst) {
ASSERT_OK(GetFirstSstPath(&sst_path));
ASSERT_FALSE(sst_path.empty());
LegacyFileSystemWrapper fs(env_);
ASSERT_OK(CreateFile(&fs, sst_path, "blah", false /* use_fsync */));
ASSERT_OK(CreateFile(env_->GetFileSystem(), sst_path, "blah",
false /* use_fsync */));
Close();
ASSERT_OK(RepairDB(dbname_, CurrentOptions()));

@ -47,8 +47,7 @@ class WalManagerTest : public testing::Test {
std::numeric_limits<uint64_t>::max());
db_options_.wal_dir = dbname_;
db_options_.env = env_.get();
fs_.reset(new LegacyFileSystemWrapper(env_.get()));
db_options_.fs = fs_;
db_options_.fs = env_->GetFileSystem();
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
@ -116,7 +115,6 @@ class WalManagerTest : public testing::Test {
WriteBufferManager write_buffer_manager_;
std::unique_ptr<VersionSet> versions_;
std::unique_ptr<WalManager> wal_manager_;
std::shared_ptr<LegacyFileSystemWrapper> fs_;
std::unique_ptr<log::Writer> current_log_writer_;
uint64_t current_log_number_;

@ -8,6 +8,13 @@
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#ifdef _WIN32
// Windows API macro interference
#undef DeleteFile
#undef GetCurrentTime
#undef LoadLibrary
#endif
namespace ROCKSDB_NAMESPACE {
// The CompositeEnvWrapper class provides an interface that is compatible
@ -271,16 +278,11 @@ class CompositeDirectoryWrapper : public Directory {
std::unique_ptr<FSDirectory> target_;
};
class CompositeEnvWrapper : public Env {
class CompositeEnv : public Env {
public:
// Initialize a CompositeEnvWrapper that delegates all thread/time related
// calls to env, and all file operations to fs
explicit CompositeEnvWrapper(Env* env, std::shared_ptr<FileSystem> fs)
: Env(fs), env_target_(env) {}
~CompositeEnvWrapper() {}
// Return the target to which this Env forwards all calls
Env* env_target() const { return env_target_; }
explicit CompositeEnv(const std::shared_ptr<FileSystem>& fs) : Env(fs) {}
Status RegisterDbPaths(const std::vector<std::string>& paths) override {
return file_system_->RegisterDbPaths(paths);
@ -498,6 +500,66 @@ class CompositeEnvWrapper : public Env {
return file_system_->IsDirectory(path, io_opts, is_dir, &dbg);
}
Status GetTestDirectory(std::string* path) override {
IOOptions io_opts;
IODebugContext dbg;
return file_system_->GetTestDirectory(io_opts, path, &dbg);
}
EnvOptions OptimizeForLogRead(const EnvOptions& env_options) const override {
return file_system_->OptimizeForLogRead(FileOptions(env_options));
}
EnvOptions OptimizeForManifestRead(
const EnvOptions& env_options) const override {
return file_system_->OptimizeForManifestRead(FileOptions(env_options));
}
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const override {
return file_system_->OptimizeForLogWrite(FileOptions(env_options),
db_options);
}
EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const override {
return file_system_->OptimizeForManifestWrite(FileOptions(env_options));
}
EnvOptions OptimizeForCompactionTableWrite(
const EnvOptions& env_options,
const ImmutableDBOptions& immutable_ops) const override {
return file_system_->OptimizeForCompactionTableWrite(
FileOptions(env_options), immutable_ops);
}
EnvOptions OptimizeForCompactionTableRead(
const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const override {
return file_system_->OptimizeForCompactionTableRead(
FileOptions(env_options), db_options);
}
// This seems to clash with a macro on Windows, so #undef it here
#ifdef GetFreeSpace
#undef GetFreeSpace
#endif
Status GetFreeSpace(const std::string& path, uint64_t* diskfree) override {
IOOptions io_opts;
IODebugContext dbg;
return file_system_->GetFreeSpace(path, io_opts, diskfree, &dbg);
}
};
class CompositeEnvWrapper : public CompositeEnv {
public:
// Initialize a CompositeEnvWrapper that delegates all thread/time related
// calls to env, and all file operations to fs
explicit CompositeEnvWrapper(Env* env, const std::shared_ptr<FileSystem>& fs)
: CompositeEnv(fs), env_target_(env) {}
// Return the target to which this Env forwards all calls
Env* env_target() const { return env_target_; }
#if !defined(OS_WIN) && !defined(ROCKSDB_NO_DYNAMIC_EXTENSION)
Status LoadLibrary(const std::string& lib_name,
const std::string& search_path,
@ -522,11 +584,7 @@ class CompositeEnvWrapper : public Env {
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
return env_target_->GetThreadPoolQueueLen(pri);
}
Status GetTestDirectory(std::string* path) override {
IOOptions io_opts;
IODebugContext dbg;
return file_system_->GetTestDirectory(io_opts, path, &dbg);
}
uint64_t NowMicros() override { return env_target_->NowMicros(); }
uint64_t NowNanos() override { return env_target_->NowNanos(); }
uint64_t NowCPUNanos() override { return env_target_->NowCPUNanos(); }
@ -585,45 +643,6 @@ class CompositeEnvWrapper : public Env {
return env_target_->GenerateUniqueId();
}
EnvOptions OptimizeForLogRead(const EnvOptions& env_options) const override {
return file_system_->OptimizeForLogRead(FileOptions(env_options));
}
EnvOptions OptimizeForManifestRead(
const EnvOptions& env_options) const override {
return file_system_->OptimizeForManifestRead(FileOptions(env_options));
}
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const override {
return file_system_->OptimizeForLogWrite(FileOptions(env_options),
db_options);
}
EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const override {
return file_system_->OptimizeForManifestWrite(FileOptions(env_options));
}
EnvOptions OptimizeForCompactionTableWrite(
const EnvOptions& env_options,
const ImmutableDBOptions& immutable_ops) const override {
return file_system_->OptimizeForCompactionTableWrite(
FileOptions(env_options), immutable_ops);
}
EnvOptions OptimizeForCompactionTableRead(
const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const override {
return file_system_->OptimizeForCompactionTableRead(
FileOptions(env_options), db_options);
}
// This seems to clash with a macro on Windows, so #undef it here
#ifdef GetFreeSpace
#undef GetFreeSpace
#endif
Status GetFreeSpace(const std::string& path, uint64_t* diskfree) override {
IOOptions io_opts;
IODebugContext dbg;
return file_system_->GetFreeSpace(path, io_opts, diskfree, &dbg);
}
private:
Env* env_target_;
};
@ -880,249 +899,6 @@ class LegacyDirectoryWrapper : public FSDirectory {
std::unique_ptr<Directory> target_;
};
class LegacyFileSystemWrapper : public FileSystem {
public:
// Initialize an EnvWrapper that delegates all calls to *t
explicit LegacyFileSystemWrapper(Env* t) : target_(t) {}
~LegacyFileSystemWrapper() override {}
const char* Name() const override { return "Legacy File System"; }
// Return the target to which this Env forwards all calls
Env* target() const { return target_; }
// The following text is boilerplate that forwards all methods to target()
IOStatus NewSequentialFile(const std::string& f,
const FileOptions& file_opts,
std::unique_ptr<FSSequentialFile>* r,
IODebugContext* /*dbg*/) override {
std::unique_ptr<SequentialFile> file;
Status s = target_->NewSequentialFile(f, &file, file_opts);
if (s.ok()) {
r->reset(new LegacySequentialFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus NewRandomAccessFile(const std::string& f,
const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* r,
IODebugContext* /*dbg*/) override {
std::unique_ptr<RandomAccessFile> file;
Status s = target_->NewRandomAccessFile(f, &file, file_opts);
if (s.ok()) {
r->reset(new LegacyRandomAccessFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus NewWritableFile(const std::string& f, const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* r,
IODebugContext* /*dbg*/) override {
std::unique_ptr<WritableFile> file;
Status s = target_->NewWritableFile(f, &file, file_opts);
if (s.ok()) {
r->reset(new LegacyWritableFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus ReopenWritableFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* /*dbg*/) override {
std::unique_ptr<WritableFile> file;
Status s = target_->ReopenWritableFile(fname, &file, file_opts);
if (s.ok()) {
result->reset(new LegacyWritableFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* r,
IODebugContext* /*dbg*/) override {
std::unique_ptr<WritableFile> file;
Status s = target_->ReuseWritableFile(fname, old_fname, &file, file_opts);
if (s.ok()) {
r->reset(new LegacyWritableFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus NewRandomRWFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSRandomRWFile>* result,
IODebugContext* /*dbg*/) override {
std::unique_ptr<RandomRWFile> file;
Status s = target_->NewRandomRWFile(fname, &file, file_opts);
if (s.ok()) {
result->reset(new LegacyRandomRWFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus NewMemoryMappedFileBuffer(
const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result) override {
return status_to_io_status(
target_->NewMemoryMappedFileBuffer(fname, result));
}
IOStatus NewDirectory(const std::string& name, const IOOptions& /*io_opts*/,
std::unique_ptr<FSDirectory>* result,
IODebugContext* /*dbg*/) override {
std::unique_ptr<Directory> dir;
Status s = target_->NewDirectory(name, &dir);
if (s.ok()) {
result->reset(new LegacyDirectoryWrapper(std::move(dir)));
}
return status_to_io_status(std::move(s));
}
IOStatus FileExists(const std::string& f, const IOOptions& /*io_opts*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->FileExists(f));
}
IOStatus GetChildren(const std::string& dir, const IOOptions& /*io_opts*/,
std::vector<std::string>* r,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetChildren(dir, r));
}
IOStatus GetChildrenFileAttributes(const std::string& dir,
const IOOptions& /*options*/,
std::vector<FileAttributes>* result,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetChildrenFileAttributes(dir, result));
}
IOStatus DeleteFile(const std::string& f, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->DeleteFile(f));
}
IOStatus Truncate(const std::string& fname, size_t size,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Truncate(fname, size));
}
IOStatus CreateDir(const std::string& d, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->CreateDir(d));
}
IOStatus CreateDirIfMissing(const std::string& d,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->CreateDirIfMissing(d));
}
IOStatus DeleteDir(const std::string& d, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->DeleteDir(d));
}
IOStatus GetFileSize(const std::string& f, const IOOptions& /*options*/,
uint64_t* s, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetFileSize(f, s));
}
IOStatus GetFileModificationTime(const std::string& fname,
const IOOptions& /*options*/,
uint64_t* file_mtime,
IODebugContext* /*dbg*/) override {
return status_to_io_status(
target_->GetFileModificationTime(fname, file_mtime));
}
IOStatus GetAbsolutePath(const std::string& db_path,
const IOOptions& /*options*/,
std::string* output_path,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetAbsolutePath(db_path, output_path));
}
IOStatus RenameFile(const std::string& s, const std::string& t,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->RenameFile(s, t));
}
IOStatus LinkFile(const std::string& s, const std::string& t,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->LinkFile(s, t));
}
IOStatus NumFileLinks(const std::string& fname, const IOOptions& /*options*/,
uint64_t* count, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->NumFileLinks(fname, count));
}
IOStatus AreFilesSame(const std::string& first, const std::string& second,
const IOOptions& /*options*/, bool* res,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->AreFilesSame(first, second, res));
}
IOStatus LockFile(const std::string& f, const IOOptions& /*options*/,
FileLock** l, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->LockFile(f, l));
}
IOStatus UnlockFile(FileLock* l, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->UnlockFile(l));
}
IOStatus GetTestDirectory(const IOOptions& /*options*/, std::string* path,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetTestDirectory(path));
}
IOStatus NewLogger(const std::string& fname, const IOOptions& /*options*/,
std::shared_ptr<Logger>* result,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->NewLogger(fname, result));
}
void SanitizeFileOptions(FileOptions* opts) const override {
target_->SanitizeEnvOptions(opts);
}
FileOptions OptimizeForLogRead(
const FileOptions& file_options) const override {
return target_->OptimizeForLogRead(file_options);
}
FileOptions OptimizeForManifestRead(
const FileOptions& file_options) const override {
return target_->OptimizeForManifestRead(file_options);
}
FileOptions OptimizeForLogWrite(const FileOptions& file_options,
const DBOptions& db_options) const override {
return target_->OptimizeForLogWrite(file_options, db_options);
}
FileOptions OptimizeForManifestWrite(
const FileOptions& file_options) const override {
return target_->OptimizeForManifestWrite(file_options);
}
FileOptions OptimizeForCompactionTableWrite(
const FileOptions& file_options,
const ImmutableDBOptions& immutable_ops) const override {
return target_->OptimizeForCompactionTableWrite(file_options,
immutable_ops);
}
FileOptions OptimizeForCompactionTableRead(
const FileOptions& file_options,
const ImmutableDBOptions& db_options) const override {
return target_->OptimizeForCompactionTableRead(file_options, db_options);
}
// This seems to clash with a macro on Windows, so #undef it here
#ifdef GetFreeSpace
#undef GetFreeSpace
#endif
IOStatus GetFreeSpace(const std::string& path, const IOOptions& /*options*/,
uint64_t* diskfree, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetFreeSpace(path, diskfree));
}
IOStatus IsDirectory(const std::string& path, const IOOptions& /*options*/,
bool* is_dir, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->IsDirectory(path, is_dir));
}
private:
Env* target_;
};
inline std::unique_ptr<FSSequentialFile> NewLegacySequentialFileWrapper(
std::unique_ptr<SequentialFile>& file) {
return std::unique_ptr<FSSequentialFile>(

257
env/env.cc vendored

@ -20,6 +20,248 @@
#include "util/autovector.h"
namespace ROCKSDB_NAMESPACE {
namespace {
class LegacyFileSystemWrapper : public FileSystem {
public:
// Initialize an EnvWrapper that delegates all calls to *t
explicit LegacyFileSystemWrapper(Env* t) : target_(t) {}
~LegacyFileSystemWrapper() override {}
const char* Name() const override { return "Legacy File System"; }
// Return the target to which this Env forwards all calls
Env* target() const { return target_; }
// The following text is boilerplate that forwards all methods to target()
IOStatus NewSequentialFile(const std::string& f, const FileOptions& file_opts,
std::unique_ptr<FSSequentialFile>* r,
IODebugContext* /*dbg*/) override {
std::unique_ptr<SequentialFile> file;
Status s = target_->NewSequentialFile(f, &file, file_opts);
if (s.ok()) {
r->reset(new LegacySequentialFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus NewRandomAccessFile(const std::string& f,
const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* r,
IODebugContext* /*dbg*/) override {
std::unique_ptr<RandomAccessFile> file;
Status s = target_->NewRandomAccessFile(f, &file, file_opts);
if (s.ok()) {
r->reset(new LegacyRandomAccessFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus NewWritableFile(const std::string& f, const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* r,
IODebugContext* /*dbg*/) override {
std::unique_ptr<WritableFile> file;
Status s = target_->NewWritableFile(f, &file, file_opts);
if (s.ok()) {
r->reset(new LegacyWritableFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus ReopenWritableFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* /*dbg*/) override {
std::unique_ptr<WritableFile> file;
Status s = target_->ReopenWritableFile(fname, &file, file_opts);
if (s.ok()) {
result->reset(new LegacyWritableFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* r,
IODebugContext* /*dbg*/) override {
std::unique_ptr<WritableFile> file;
Status s = target_->ReuseWritableFile(fname, old_fname, &file, file_opts);
if (s.ok()) {
r->reset(new LegacyWritableFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus NewRandomRWFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSRandomRWFile>* result,
IODebugContext* /*dbg*/) override {
std::unique_ptr<RandomRWFile> file;
Status s = target_->NewRandomRWFile(fname, &file, file_opts);
if (s.ok()) {
result->reset(new LegacyRandomRWFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus NewMemoryMappedFileBuffer(
const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result) override {
return status_to_io_status(
target_->NewMemoryMappedFileBuffer(fname, result));
}
IOStatus NewDirectory(const std::string& name, const IOOptions& /*io_opts*/,
std::unique_ptr<FSDirectory>* result,
IODebugContext* /*dbg*/) override {
std::unique_ptr<Directory> dir;
Status s = target_->NewDirectory(name, &dir);
if (s.ok()) {
result->reset(new LegacyDirectoryWrapper(std::move(dir)));
}
return status_to_io_status(std::move(s));
}
IOStatus FileExists(const std::string& f, const IOOptions& /*io_opts*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->FileExists(f));
}
IOStatus GetChildren(const std::string& dir, const IOOptions& /*io_opts*/,
std::vector<std::string>* r,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetChildren(dir, r));
}
IOStatus GetChildrenFileAttributes(const std::string& dir,
const IOOptions& /*options*/,
std::vector<FileAttributes>* result,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetChildrenFileAttributes(dir, result));
}
IOStatus DeleteFile(const std::string& f, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->DeleteFile(f));
}
IOStatus Truncate(const std::string& fname, size_t size,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Truncate(fname, size));
}
IOStatus CreateDir(const std::string& d, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->CreateDir(d));
}
IOStatus CreateDirIfMissing(const std::string& d,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->CreateDirIfMissing(d));
}
IOStatus DeleteDir(const std::string& d, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->DeleteDir(d));
}
IOStatus GetFileSize(const std::string& f, const IOOptions& /*options*/,
uint64_t* s, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetFileSize(f, s));
}
IOStatus GetFileModificationTime(const std::string& fname,
const IOOptions& /*options*/,
uint64_t* file_mtime,
IODebugContext* /*dbg*/) override {
return status_to_io_status(
target_->GetFileModificationTime(fname, file_mtime));
}
IOStatus GetAbsolutePath(const std::string& db_path,
const IOOptions& /*options*/,
std::string* output_path,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetAbsolutePath(db_path, output_path));
}
IOStatus RenameFile(const std::string& s, const std::string& t,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->RenameFile(s, t));
}
IOStatus LinkFile(const std::string& s, const std::string& t,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->LinkFile(s, t));
}
IOStatus NumFileLinks(const std::string& fname, const IOOptions& /*options*/,
uint64_t* count, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->NumFileLinks(fname, count));
}
IOStatus AreFilesSame(const std::string& first, const std::string& second,
const IOOptions& /*options*/, bool* res,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->AreFilesSame(first, second, res));
}
IOStatus LockFile(const std::string& f, const IOOptions& /*options*/,
FileLock** l, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->LockFile(f, l));
}
IOStatus UnlockFile(FileLock* l, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->UnlockFile(l));
}
IOStatus GetTestDirectory(const IOOptions& /*options*/, std::string* path,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetTestDirectory(path));
}
IOStatus NewLogger(const std::string& fname, const IOOptions& /*options*/,
std::shared_ptr<Logger>* result,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->NewLogger(fname, result));
}
void SanitizeFileOptions(FileOptions* opts) const override {
target_->SanitizeEnvOptions(opts);
}
FileOptions OptimizeForLogRead(
const FileOptions& file_options) const override {
return target_->OptimizeForLogRead(file_options);
}
FileOptions OptimizeForManifestRead(
const FileOptions& file_options) const override {
return target_->OptimizeForManifestRead(file_options);
}
FileOptions OptimizeForLogWrite(const FileOptions& file_options,
const DBOptions& db_options) const override {
return target_->OptimizeForLogWrite(file_options, db_options);
}
FileOptions OptimizeForManifestWrite(
const FileOptions& file_options) const override {
return target_->OptimizeForManifestWrite(file_options);
}
FileOptions OptimizeForCompactionTableWrite(
const FileOptions& file_options,
const ImmutableDBOptions& immutable_ops) const override {
return target_->OptimizeForCompactionTableWrite(file_options,
immutable_ops);
}
FileOptions OptimizeForCompactionTableRead(
const FileOptions& file_options,
const ImmutableDBOptions& db_options) const override {
return target_->OptimizeForCompactionTableRead(file_options, db_options);
}
#ifdef GetFreeSpace
#undef GetFreeSpace
#endif
IOStatus GetFreeSpace(const std::string& path, const IOOptions& /*options*/,
uint64_t* diskfree, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetFreeSpace(path, diskfree));
}
IOStatus IsDirectory(const std::string& path, const IOOptions& /*options*/,
bool* is_dir, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->IsDirectory(path, is_dir));
}
private:
Env* target_;
};
} // end anonymous namespace
Env::Env() : thread_status_updater_(nullptr) {
file_system_ = std::make_shared<LegacyFileSystemWrapper>(this);
@ -386,13 +628,13 @@ void Log(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
Status WriteStringToFile(Env* env, const Slice& data, const std::string& fname,
bool should_sync) {
LegacyFileSystemWrapper lfsw(env);
return WriteStringToFile(&lfsw, data, fname, should_sync);
const auto& fs = env->GetFileSystem();
return WriteStringToFile(fs.get(), data, fname, should_sync);
}
Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
LegacyFileSystemWrapper lfsw(env);
return ReadFileToString(&lfsw, fname, data);
const auto& fs = env->GetFileSystem();
return ReadFileToString(fs.get(), fname, data);
}
EnvWrapper::~EnvWrapper() {
@ -488,11 +730,4 @@ Status NewEnvLogger(const std::string& fname, Env* env,
const std::shared_ptr<FileSystem>& Env::GetFileSystem() const {
return file_system_;
}
#ifdef OS_WIN
std::unique_ptr<Env> NewCompositeEnv(std::shared_ptr<FileSystem> fs) {
return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
}
#endif
} // namespace ROCKSDB_NAMESPACE

26
env/env_posix.cc vendored

@ -122,14 +122,9 @@ class PosixDynamicLibrary : public DynamicLibrary {
};
#endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
class PosixEnv : public CompositeEnvWrapper {
class PosixEnv : public CompositeEnv {
public:
// This constructor is for constructing non-default Envs, mainly by
// NewCompositeEnv(). It allows new instances to share the same
// threadpool and other resources as the default Env, while allowing
// a non-default FileSystem implementation
PosixEnv(const PosixEnv* default_env, std::shared_ptr<FileSystem> fs);
PosixEnv(const PosixEnv* default_env, const std::shared_ptr<FileSystem>& fs);
~PosixEnv() override {
if (this == Env::Default()) {
for (const auto tid : threads_to_join_) {
@ -387,7 +382,7 @@ class PosixEnv : public CompositeEnvWrapper {
};
PosixEnv::PosixEnv()
: CompositeEnvWrapper(this, FileSystem::Default()),
: CompositeEnv(FileSystem::Default()),
thread_pools_storage_(Priority::TOTAL),
allow_non_owner_access_storage_(true),
thread_pools_(thread_pools_storage_),
@ -404,12 +399,13 @@ PosixEnv::PosixEnv()
thread_status_updater_ = CreateThreadStatusUpdater();
}
PosixEnv::PosixEnv(const PosixEnv* default_env, std::shared_ptr<FileSystem> fs)
: CompositeEnvWrapper(this, fs),
thread_pools_(default_env->thread_pools_),
mu_(default_env->mu_),
threads_to_join_(default_env->threads_to_join_),
allow_non_owner_access_(default_env->allow_non_owner_access_) {
PosixEnv::PosixEnv(const PosixEnv* default_env,
const std::shared_ptr<FileSystem>& fs)
: CompositeEnv(fs),
thread_pools_(default_env->thread_pools_),
mu_(default_env->mu_),
threads_to_join_(default_env->threads_to_join_),
allow_non_owner_access_(default_env->allow_non_owner_access_) {
thread_status_updater_ = default_env->thread_status_updater_;
}
@ -508,7 +504,7 @@ Env* Env::Default() {
return &default_env;
}
std::unique_ptr<Env> NewCompositeEnv(std::shared_ptr<FileSystem> fs) {
std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) {
PosixEnv* default_env = static_cast<PosixEnv*>(Env::Default());
return std::unique_ptr<Env>(new PosixEnv(default_env, fs));
}

@ -129,13 +129,4 @@ IOStatus ReadFileToString(FileSystem* fs, const std::string& fname,
return s;
}
#ifdef OS_WIN
std::shared_ptr<FileSystem> FileSystem::Default() {
static LegacyFileSystemWrapper default_fs(Env::Default());
static std::shared_ptr<LegacyFileSystemWrapper> default_fs_ptr(
&default_fs, [](LegacyFileSystemWrapper*) {});
return default_fs_ptr;
}
#endif
} // namespace ROCKSDB_NAMESPACE

@ -10,7 +10,6 @@
#include <thread>
#include <vector>
#include "env/composite_env_wrapper.h"
#include "file/file_util.h"
#include "file/sst_file_manager_impl.h"
#include "rocksdb/env.h"
@ -96,11 +95,9 @@ class DeleteSchedulerTest : public testing::Test {
// Tests in this file are for DeleteScheduler component and don't create any
// DBs, so we need to set max_trash_db_ratio to 100% (instead of default
// 25%)
std::shared_ptr<FileSystem>
fs(std::make_shared<LegacyFileSystemWrapper>(env_));
sst_file_mgr_.reset(
new SstFileManagerImpl(env_, fs, nullptr, rate_bytes_per_sec_,
/* max_trash_db_ratio= */ 1.1, 128 * 1024));
sst_file_mgr_.reset(new SstFileManagerImpl(
env_, env_->GetFileSystem(), nullptr, rate_bytes_per_sec_,
/* max_trash_db_ratio= */ 1.1, 128 * 1024));
delete_scheduler_ = sst_file_mgr_->delete_scheduler();
sst_file_mgr_->SetStatisticsPtr(stats_);
}

@ -22,10 +22,23 @@ extern IOStatus CopyFile(FileSystem* fs, const std::string& source,
const std::string& destination, uint64_t size,
bool use_fsync,
const std::shared_ptr<IOTracer>& io_tracer = nullptr);
inline IOStatus CopyFile(const std::shared_ptr<FileSystem>& fs,
const std::string& source,
const std::string& destination, uint64_t size,
bool use_fsync,
const std::shared_ptr<IOTracer>& io_tracer = nullptr) {
return CopyFile(fs.get(), source, destination, size, use_fsync, io_tracer);
}
extern IOStatus CreateFile(FileSystem* fs, const std::string& destination,
const std::string& contents, bool use_fsync);
inline IOStatus CreateFile(const std::shared_ptr<FileSystem>& fs,
const std::string& destination,
const std::string& contents, bool use_fsync) {
return CreateFile(fs.get(), destination, contents, use_fsync);
}
extern Status DeleteDBFile(const ImmutableDBOptions* db_options,
const std::string& fname,
const std::string& path_to_sync, const bool force_bg,
@ -41,6 +54,19 @@ extern IOStatus GenerateOneFileChecksum(
size_t verify_checksums_readahead_size, bool allow_mmap_reads,
std::shared_ptr<IOTracer>& io_tracer, RateLimiter* rate_limiter = nullptr);
inline IOStatus GenerateOneFileChecksum(
const std::shared_ptr<FileSystem>& fs, const std::string& file_path,
FileChecksumGenFactory* checksum_factory,
const std::string& requested_checksum_func_name, std::string* file_checksum,
std::string* file_checksum_func_name,
size_t verify_checksums_readahead_size, bool allow_mmap_reads,
std::shared_ptr<IOTracer>& io_tracer) {
return GenerateOneFileChecksum(
fs.get(), file_path, checksum_factory, requested_checksum_func_name,
file_checksum, file_checksum_func_name, verify_checksums_readahead_size,
allow_mmap_reads, io_tracer);
}
inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, Env* env,
IOOptions& opts) {
if (!env) {

@ -75,7 +75,9 @@ std::string BuildKey(int num, std::string postfix = "") {
TEST_P(PrefetchTest, Basic) {
// First param is if the mockFS support_prefetch or not
bool support_prefetch = std::get<0>(GetParam());
bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
// Second param is if directIO is enabled or not
bool use_direct_io = std::get<1>(GetParam());

@ -9,7 +9,6 @@
#include <vector>
#include "db/db_impl/db_impl.h"
#include "env/composite_env_wrapper.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/sst_file_manager.h"
@ -485,13 +484,7 @@ SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<Logger> info_log,
bool delete_existing_trash, Status* status,
double max_trash_db_ratio,
uint64_t bytes_max_delete_chunk) {
std::shared_ptr<FileSystem> fs;
if (env == Env::Default()) {
fs = FileSystem::Default();
} else {
fs.reset(new LegacyFileSystemWrapper(env));
}
const auto& fs = env->GetFileSystem();
return NewSstFileManager(env, fs, info_log, trash_dir, rate_bytes_per_sec,
delete_existing_trash, status, max_trash_db_ratio,

@ -30,6 +30,7 @@
// Windows API macro interference
#undef DeleteFile
#undef GetCurrentTime
#undef LoadLibrary
#endif
#if defined(__GNUC__) || defined(__clang__)
@ -1663,6 +1664,6 @@ Env* NewTimedEnv(Env* base_env);
Status NewEnvLogger(const std::string& fname, Env* env,
std::shared_ptr<Logger>* result);
std::unique_ptr<Env> NewCompositeEnv(std::shared_ptr<FileSystem> fs);
std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs);
} // namespace ROCKSDB_NAMESPACE

@ -366,6 +366,10 @@ class FileSystem {
return IOStatus::OK();
}
// This seems to clash with a macro on Windows, so #undef it here
#ifdef DeleteFile
#undef DeleteFile
#endif
// Delete the named file.
virtual IOStatus DeleteFile(const std::string& fname,
const IOOptions& options,
@ -1048,7 +1052,8 @@ class FSDirectory {
class FileSystemWrapper : public FileSystem {
public:
// Initialize an EnvWrapper that delegates all calls to *t
explicit FileSystemWrapper(std::shared_ptr<FileSystem> t) : target_(t) {}
explicit FileSystemWrapper(const std::shared_ptr<FileSystem>& t)
: target_(t) {}
~FileSystemWrapper() override {}
const char* Name() const override { return target_->Name(); }

@ -11,8 +11,8 @@
#include <mutex>
#include <rocksdb/env.h>
#include "port/win/env_win.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "util/compression_context_cache.h"
#include "util/thread_local.h"

File diff suppressed because it is too large Load Diff

@ -15,30 +15,29 @@
// multiple threads without any external synchronization.
#pragma once
#include "port/win/win_thread.h"
#include <rocksdb/env.h>
#include "util/threadpool_imp.h"
#include <stdint.h>
#include <windows.h>
#include <mutex>
#include <vector>
#include <string>
#include <vector>
#include "env/composite_env_wrapper.h"
#include "port/win/win_thread.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "util/threadpool_imp.h"
#undef GetCurrentTime
#undef DeleteFile
#undef GetTickCount
#undef LoadLibrary
namespace ROCKSDB_NAMESPACE {
namespace port {
// Currently not designed for inheritance but rather a replacement
class WinEnvThreads {
public:
public:
explicit WinEnvThreads(Env* hosted_env);
~WinEnvThreads();
@ -46,12 +45,12 @@ public:
WinEnvThreads(const WinEnvThreads&) = delete;
WinEnvThreads& operator=(const WinEnvThreads&) = delete;
void Schedule(void(*function)(void*), void* arg, Env::Priority pri,
void* tag, void(*unschedFunction)(void* arg));
void Schedule(void (*function)(void*), void* arg, Env::Priority pri,
void* tag, void (*unschedFunction)(void* arg));
int UnSchedule(void* arg, Env::Priority pri);
void StartThread(void(*function)(void* arg), void* arg);
void StartThread(void (*function)(void* arg), void* arg);
void WaitForJoin();
@ -61,235 +60,198 @@ public:
uint64_t GetThreadID() const;
void SleepForMicroseconds(int micros);
// Allow increasing the number of worker threads.
void SetBackgroundThreads(int num, Env::Priority pri);
int GetBackgroundThreads(Env::Priority pri);
void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri);
private:
private:
Env* hosted_env_;
mutable std::mutex mu_;
std::vector<ThreadPoolImpl> thread_pools_;
std::vector<WindowsThread> threads_to_join_;
};
// Designed for inheritance so can be re-used
// but certain parts replaced
class WinEnvIO {
public:
explicit WinEnvIO(Env* hosted_env);
virtual ~WinEnvIO();
virtual Status DeleteFile(const std::string& fname);
Status Truncate(const std::string& fname, size_t size);
virtual Status GetCurrentTime(int64_t* unix_time);
virtual Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options);
// Helper for NewWritable and ReopenWritableFile
virtual Status OpenWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options,
bool reopen);
virtual Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options);
// The returned file will only be accessed by one thread at a time.
virtual Status NewRandomRWFile(const std::string& fname,
std::unique_ptr<RandomRWFile>* result,
const EnvOptions& options);
virtual Status NewMemoryMappedFileBuffer(
const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result);
virtual Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result);
virtual Status FileExists(const std::string& fname);
virtual Status GetChildren(const std::string& dir,
std::vector<std::string>* result);
virtual Status CreateDir(const std::string& name);
virtual Status CreateDirIfMissing(const std::string& name);
virtual Status DeleteDir(const std::string& name);
virtual Status GetFileSize(const std::string& fname, uint64_t* size);
static uint64_t FileTimeToUnixTime(const FILETIME& ftTime);
virtual Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime);
virtual Status RenameFile(const std::string& src, const std::string& target);
virtual Status LinkFile(const std::string& src, const std::string& target);
virtual Status NumFileLinks(const std::string& /*fname*/,
uint64_t* /*count*/);
virtual Status AreFilesSame(const std::string& first,
const std::string& second, bool* res);
virtual Status LockFile(const std::string& lockFname, FileLock** lock);
virtual Status UnlockFile(FileLock* lock);
virtual Status GetTestDirectory(std::string* result);
virtual Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result);
virtual Status IsDirectory(const std::string& path, bool* is_dir);
class WinClock {
public:
static const std::shared_ptr<WinClock>& Default();
WinClock();
virtual ~WinClock() {}
virtual uint64_t NowMicros();
virtual uint64_t NowNanos();
virtual Status GetHostName(char* name, uint64_t len);
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path);
// This seems to clash with a macro on Windows, so #undef it here
#undef GetFreeSpace
// Get the amount of free disk space
virtual Status GetFreeSpace(const std::string& path, uint64_t* diskfree);
virtual std::string TimeToString(uint64_t secondsSince1970);
virtual EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const;
virtual EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const;
virtual EnvOptions OptimizeForManifestRead(
const EnvOptions& env_options) const;
size_t GetPageSize() const { return page_size_; }
virtual void SleepForMicroseconds(int micros);
size_t GetAllocationGranularity() const { return allocation_granularity_; }
virtual Status GetCurrentTime(int64_t* unix_time);
// Converts seconds-since-Jan-01-1970 to a printable string
virtual std::string TimeToString(uint64_t time);
uint64_t GetPerfCounterFrequency() const { return perf_counter_frequency_; }
static size_t GetSectorSize(const std::string& fname);
private:
// Returns true iff the named directory exists and is a directory.
virtual bool DirExists(const std::string& dname);
typedef VOID(WINAPI * FnGetSystemTimePreciseAsFileTime)(LPFILETIME);
private:
typedef VOID(WINAPI* FnGetSystemTimePreciseAsFileTime)(LPFILETIME);
Env* hosted_env_;
size_t page_size_;
size_t allocation_granularity_;
uint64_t perf_counter_frequency_;
uint64_t nano_seconds_per_period_;
FnGetSystemTimePreciseAsFileTime GetSystemTimePreciseAsFileTime_;
};
class WinEnv : public Env {
public:
WinEnv();
~WinEnv();
Status DeleteFile(const std::string& fname) override;
Status Truncate(const std::string& fname, size_t size) override;
Status GetCurrentTime(int64_t* unix_time) override;
class WinFileSystem : public FileSystem {
public:
static const std::shared_ptr<WinFileSystem>& Default();
WinFileSystem(const std::shared_ptr<WinClock>& clock);
~WinFileSystem() {}
const char* Name() const { return "WinFS"; }
static size_t GetSectorSize(const std::string& fname);
size_t GetPageSize() const { return page_size_; }
size_t GetAllocationGranularity() const { return allocation_granularity_; }
Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options) override;
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) override;
Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override;
// Create an object that writes to a new file with the specified
// name. Deletes any existing file with the same name and creates a
// new file. On success, stores a pointer to the new file in
// *result and returns OK. On failure stores nullptr in *result and
// returns non-OK.
//
// The returned file will only be accessed by one thread at a time.
Status ReopenWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override;
// The returned file will only be accessed by one thread at a time.
Status NewRandomRWFile(const std::string& fname,
std::unique_ptr<RandomRWFile>* result,
const EnvOptions& options) override;
Status NewMemoryMappedFileBuffer(
IOStatus DeleteFile(const std::string& fname, const IOOptions& options,
IODebugContext* dbg) override;
// Truncate the named file to the specified size.
IOStatus Truncate(const std::string& /*fname*/, size_t /*size*/,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override;
IOStatus NewSequentialFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSSequentialFile>* result,
IODebugContext* dbg) override;
IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& options,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* /*dbg*/) override;
IOStatus NewWritableFile(const std::string& f, const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* r,
IODebugContext* dbg) override;
IOStatus ReopenWritableFile(const std::string& fname,
const FileOptions& options,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override;
IOStatus NewRandomRWFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSRandomRWFile>* result,
IODebugContext* dbg) override;
IOStatus NewMemoryMappedFileBuffer(
const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result) override;
Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override;
Status FileExists(const std::string& fname) override;
Status GetChildren(const std::string& dir,
std::vector<std::string>* result) override;
Status CreateDir(const std::string& name) override;
Status CreateDirIfMissing(const std::string& name) override;
Status DeleteDir(const std::string& name) override;
Status GetFileSize(const std::string& fname,
uint64_t* size) override;
Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime) override;
Status RenameFile(const std::string& src,
const std::string& target) override;
IOStatus NewDirectory(const std::string& name, const IOOptions& io_opts,
std::unique_ptr<FSDirectory>* result,
IODebugContext* dbg) override;
IOStatus FileExists(const std::string& f, const IOOptions& io_opts,
IODebugContext* dbg) override;
IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts,
std::vector<std::string>* r,
IODebugContext* dbg) override;
IOStatus CreateDir(const std::string& dirname, const IOOptions& options,
IODebugContext* dbg) override;
// Creates directory if missing. Return Ok if it exists, or successful in
// Creating.
IOStatus CreateDirIfMissing(const std::string& dirname,
const IOOptions& options,
IODebugContext* dbg) override;
// Delete the specified directory.
IOStatus DeleteDir(const std::string& dirname, const IOOptions& options,
IODebugContext* dbg) override;
// Store the size of fname in *file_size.
IOStatus GetFileSize(const std::string& fname, const IOOptions& options,
uint64_t* file_size, IODebugContext* dbg) override;
// Store the last modification time of fname in *file_mtime.
IOStatus GetFileModificationTime(const std::string& fname,
const IOOptions& options,
uint64_t* file_mtime,
IODebugContext* dbg) override;
// Rename file src to target.
IOStatus RenameFile(const std::string& src, const std::string& target,
const IOOptions& options, IODebugContext* dbg) override;
// Hard Link file src to target.
IOStatus LinkFile(const std::string& /*src*/, const std::string& /*target*/,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override;
IOStatus NumFileLinks(const std::string& /*fname*/,
const IOOptions& /*options*/, uint64_t* /*count*/,
IODebugContext* /*dbg*/) override;
IOStatus AreFilesSame(const std::string& /*first*/,
const std::string& /*second*/,
const IOOptions& /*options*/, bool* /*res*/,
IODebugContext* /*dbg*/) override;
IOStatus LockFile(const std::string& fname, const IOOptions& options,
FileLock** lock, IODebugContext* dbg) override;
IOStatus UnlockFile(FileLock* lock, const IOOptions& options,
IODebugContext* dbg) override;
IOStatus GetTestDirectory(const IOOptions& options, std::string* path,
IODebugContext* dbg) override;
// Create and returns a default logger (an instance of EnvLogger) for storing
// informational messages. Derived classes can overide to provide custom
// logger.
IOStatus NewLogger(const std::string& fname, const IOOptions& io_opts,
std::shared_ptr<Logger>* result,
IODebugContext* dbg) override;
// Get full directory name for this db.
IOStatus GetAbsolutePath(const std::string& db_path, const IOOptions& options,
std::string* output_path,
IODebugContext* dbg) override;
IOStatus IsDirectory(const std::string& /*path*/, const IOOptions& options,
bool* is_dir, IODebugContext* /*dgb*/) override;
// This seems to clash with a macro on Windows, so #undef it here
#undef GetFreeSpace
IOStatus GetFreeSpace(const std::string& /*path*/,
const IOOptions& /*options*/, uint64_t* /*diskfree*/,
IODebugContext* /*dbg*/) override;
FileOptions OptimizeForLogWrite(const FileOptions& file_options,
const DBOptions& db_options) const override;
FileOptions OptimizeForManifestRead(
const FileOptions& file_options) const override;
FileOptions OptimizeForManifestWrite(
const FileOptions& file_options) const override;
protected:
static uint64_t FileTimeToUnixTime(const FILETIME& ftTime);
// Returns true iff the named directory exists and is a directory.
Status LinkFile(const std::string& src,
const std::string& target) override;
virtual bool DirExists(const std::string& dname);
// Helper for NewWritable and ReopenWritableFile
virtual IOStatus OpenWritableFile(const std::string& fname,
const FileOptions& options,
std::unique_ptr<FSWritableFile>* result,
bool reopen);
Status NumFileLinks(const std::string& fname, uint64_t* count) override;
private:
std::shared_ptr<WinClock> clock_;
size_t page_size_;
size_t allocation_granularity_;
};
Status AreFilesSame(const std::string& first,
const std::string& second, bool* res) override;
// Designed for inheritance so can be re-used
// but certain parts replaced
class WinEnvIO {
public:
explicit WinEnvIO(Env* hosted_env);
Status LockFile(const std::string& lockFname, FileLock** lock) override;
virtual ~WinEnvIO();
Status UnlockFile(FileLock* lock) override;
virtual Status GetHostName(char* name, uint64_t len);
Status GetTestDirectory(std::string* result) override;
private:
Env* hosted_env_;
};
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override;
class WinEnv : public CompositeEnv {
public:
WinEnv();
Status IsDirectory(const std::string& path, bool* is_dir) override;
~WinEnv();
Status GetCurrentTime(int64_t* unix_time) override;
uint64_t NowMicros() override;
@ -297,19 +259,16 @@ public:
Status GetHostName(char* name, uint64_t len) override;
Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) override;
std::string TimeToString(uint64_t secondsSince1970) override;
Status GetThreadList(std::vector<ThreadStatus>* thread_list) override;
void Schedule(void(*function)(void*), void* arg, Env::Priority pri,
void* tag, void(*unschedFunction)(void* arg)) override;
void Schedule(void (*function)(void*), void* arg, Env::Priority pri,
void* tag, void (*unschedFunction)(void* arg)) override;
int UnSchedule(void* arg, Env::Priority pri) override;
void StartThread(void(*function)(void* arg), void* arg) override;
void StartThread(void (*function)(void* arg), void* arg) override;
void WaitForJoin() override;
@ -317,12 +276,6 @@ public:
uint64_t GetThreadID() const override;
// This seems to clash with a macro on Windows, so #undef it here
#undef GetFreeSpace
// Get the amount of free disk space
Status GetFreeSpace(const std::string& path, uint64_t* diskfree) override;
void SleepForMicroseconds(int micros) override;
// Allow increasing the number of worker threads.
@ -331,21 +284,11 @@ public:
void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) override;
EnvOptions OptimizeForManifestRead(
const EnvOptions& env_options) const override;
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const override;
EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const override;
private:
private:
std::shared_ptr<WinClock> clock_;
WinEnvIO winenv_io_;
WinEnvThreads winenv_threads_;
};
} // namespace port
} // namespace port
} // namespace ROCKSDB_NAMESPACE

File diff suppressed because it is too large Load Diff

@ -9,51 +9,51 @@
#pragma once
#include <stdint.h>
#include <windows.h>
#include <mutex>
#include <string>
#include "rocksdb/file_system.h"
#include "rocksdb/status.h"
#include "rocksdb/env.h"
#include "util/aligned_buffer.h"
#include <windows.h>
namespace ROCKSDB_NAMESPACE {
namespace port {
std::string GetWindowsErrSz(DWORD err);
inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) {
inline IOStatus IOErrorFromWindowsError(const std::string& context, DWORD err) {
return ((err == ERROR_HANDLE_DISK_FULL) || (err == ERROR_DISK_FULL))
? Status::NoSpace(context, GetWindowsErrSz(err))
? IOStatus::NoSpace(context, GetWindowsErrSz(err))
: ((err == ERROR_FILE_NOT_FOUND) || (err == ERROR_PATH_NOT_FOUND))
? Status::PathNotFound(context, GetWindowsErrSz(err))
: Status::IOError(context, GetWindowsErrSz(err));
? IOStatus::PathNotFound(context, GetWindowsErrSz(err))
: IOStatus::IOError(context, GetWindowsErrSz(err));
}
inline Status IOErrorFromLastWindowsError(const std::string& context) {
inline IOStatus IOErrorFromLastWindowsError(const std::string& context) {
return IOErrorFromWindowsError(context, GetLastError());
}
inline Status IOError(const std::string& context, int err_number) {
inline IOStatus IOError(const std::string& context, int err_number) {
return (err_number == ENOSPC)
? Status::NoSpace(context, strerror(err_number))
? IOStatus::NoSpace(context, strerror(err_number))
: (err_number == ENOENT)
? Status::PathNotFound(context, strerror(err_number))
: Status::IOError(context, strerror(err_number));
? IOStatus::PathNotFound(context, strerror(err_number))
: IOStatus::IOError(context, strerror(err_number));
}
class WinFileData;
Status pwrite(const WinFileData* file_data, const Slice& data,
uint64_t offset, size_t& bytes_written);
IOStatus pwrite(const WinFileData* file_data, const Slice& data,
uint64_t offset, size_t& bytes_written);
Status pread(const WinFileData* file_data, char* src, size_t num_bytes,
uint64_t offset, size_t& bytes_read);
IOStatus pread(const WinFileData* file_data, char* src, size_t num_bytes,
uint64_t offset, size_t& bytes_read);
Status fallocate(const std::string& filename, HANDLE hFile, uint64_t to_size);
IOStatus fallocate(const std::string& filename, HANDLE hFile, uint64_t to_size);
Status ftruncate(const std::string& filename, HANDLE hFile, uint64_t toSize);
IOStatus ftruncate(const std::string& filename, HANDLE hFile, uint64_t toSize);
size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size);
@ -95,34 +95,38 @@ class WinFileData {
WinFileData& operator=(const WinFileData&) = delete;
};
class WinSequentialFile : protected WinFileData, public SequentialFile {
class WinSequentialFile : protected WinFileData, public FSSequentialFile {
// Override for behavior change when creating a custom env
virtual Status PositionedReadInternal(char* src, size_t numBytes,
uint64_t offset, size_t& bytes_read) const;
virtual IOStatus PositionedReadInternal(char* src, size_t numBytes,
uint64_t offset,
size_t& bytes_read) const;
public:
public:
WinSequentialFile(const std::string& fname, HANDLE f,
const EnvOptions& options);
const FileOptions& options);
~WinSequentialFile();
WinSequentialFile(const WinSequentialFile&) = delete;
WinSequentialFile& operator=(const WinSequentialFile&) = delete;
virtual Status Read(size_t n, Slice* result, char* scratch) override;
virtual Status PositionedRead(uint64_t offset, size_t n, Slice* result,
char* scratch) override;
IOStatus Read(size_t n, const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) override;
IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) override;
virtual Status Skip(uint64_t n) override;
IOStatus Skip(uint64_t n) override;
virtual Status InvalidateCache(size_t offset, size_t length) override;
IOStatus InvalidateCache(size_t offset, size_t length) override;
virtual bool use_direct_io() const override { return WinFileData::use_direct_io(); }
virtual bool use_direct_io() const override {
return WinFileData::use_direct_io();
}
};
// mmap() based random-access
class WinMmapReadableFile : private WinFileData, public RandomAccessFile {
class WinMmapReadableFile : private WinFileData, public FSRandomAccessFile {
HANDLE hMap_;
const void* mapped_region_;
@ -138,10 +142,11 @@ class WinMmapReadableFile : private WinFileData, public RandomAccessFile {
WinMmapReadableFile(const WinMmapReadableFile&) = delete;
WinMmapReadableFile& operator=(const WinMmapReadableFile&) = delete;
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const override;
virtual Status InvalidateCache(size_t offset, size_t length) override;
virtual IOStatus InvalidateCache(size_t offset, size_t length) override;
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
};
@ -150,7 +155,7 @@ class WinMmapReadableFile : private WinFileData, public RandomAccessFile {
// data to the file. This is safe since we either properly close the
// file before reading from it, or for log files, the reading code
// knows enough to skip zero suffixes.
class WinMmapFile : private WinFileData, public WritableFile {
class WinMmapFile : private WinFileData, public FSWritableFile {
private:
HANDLE hMap_;
@ -179,51 +184,59 @@ class WinMmapFile : private WinFileData, public WritableFile {
// Can only truncate or reserve to a sector size aligned if
// used on files that are opened with Unbuffered I/O
Status TruncateFile(uint64_t toSize);
IOStatus TruncateFile(uint64_t toSize);
Status UnmapCurrentRegion();
IOStatus UnmapCurrentRegion();
Status MapNewRegion();
IOStatus MapNewRegion(const IOOptions& options, IODebugContext* dbg);
virtual Status PreallocateInternal(uint64_t spaceToReserve);
virtual IOStatus PreallocateInternal(uint64_t spaceToReserve);
public:
WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size,
size_t allocation_granularity, const EnvOptions& options);
size_t allocation_granularity, const FileOptions& options);
~WinMmapFile();
WinMmapFile(const WinMmapFile&) = delete;
WinMmapFile& operator=(const WinMmapFile&) = delete;
virtual Status Append(const Slice& data) override;
IOStatus Append(const Slice& data, const IOOptions& options,
IODebugContext* dbg) override;
IOStatus Append(const Slice& data, const IOOptions& opts,
const DataVerificationInfo& /* verification_info */,
IODebugContext* dbg) override {
return Append(data, opts, dbg);
}
// Means Close() will properly take care of truncate
// and it does not need any additional information
virtual Status Truncate(uint64_t size) override;
IOStatus Truncate(uint64_t size, const IOOptions& options,
IODebugContext* dbg) override;
virtual Status Close() override;
IOStatus Close(const IOOptions& options, IODebugContext* dbg) override;
virtual Status Flush() override;
IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override;
// Flush only data
virtual Status Sync() override;
IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override;
/**
* Flush data as well as metadata to stable storage.
*/
virtual Status Fsync() override;
* Flush data as well as metadata to stable storage.
*/
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override;
/**
* Get the size of valid data in the file. This will not match the
* size that is returned from the filesystem because we use mmap
* to extend file by map_size every time.
*/
virtual uint64_t GetFileSize() override;
* Get the size of valid data in the file. This will not match the
* size that is returned from the filesystem because we use mmap
* to extend file by map_size every time.
*/
uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override;
virtual Status InvalidateCache(size_t offset, size_t length) override;
IOStatus InvalidateCache(size_t offset, size_t length) override;
virtual Status Allocate(uint64_t offset, uint64_t len) override;
IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& options,
IODebugContext* dbg) override;
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
};
@ -231,24 +244,24 @@ class WinMmapFile : private WinFileData, public WritableFile {
class WinRandomAccessImpl {
protected:
WinFileData* file_base_;
size_t alignment_;
size_t alignment_;
// Override for behavior change when creating a custom env
virtual Status PositionedReadInternal(char* src, size_t numBytes,
uint64_t offset, size_t& bytes_read) const;
virtual IOStatus PositionedReadInternal(char* src, size_t numBytes,
uint64_t offset,
size_t& bytes_read) const;
WinRandomAccessImpl(WinFileData* file_base, size_t alignment,
const EnvOptions& options);
const FileOptions& options);
virtual ~WinRandomAccessImpl() {}
Status ReadImpl(uint64_t offset, size_t n, Slice* result,
char* scratch) const;
IOStatus ReadImpl(uint64_t offset, size_t n, Slice* result,
char* scratch) const;
size_t GetAlignment() const { return alignment_; }
public:
WinRandomAccessImpl(const WinRandomAccessImpl&) = delete;
WinRandomAccessImpl& operator=(const WinRandomAccessImpl&) = delete;
};
@ -258,21 +271,24 @@ class WinRandomAccessFile
: private WinFileData,
protected WinRandomAccessImpl, // Want to be able to override
// PositionedReadInternal
public RandomAccessFile {
public FSRandomAccessFile {
public:
WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
const EnvOptions& options);
const FileOptions& options);
~WinRandomAccessFile();
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const override;
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
virtual bool use_direct_io() const override { return WinFileData::use_direct_io(); }
virtual bool use_direct_io() const override {
return WinFileData::use_direct_io();
}
virtual Status InvalidateCache(size_t offset, size_t length) override;
IOStatus InvalidateCache(size_t offset, size_t length) override;
virtual size_t GetRequiredBufferAlignment() const override;
};
@ -293,10 +309,11 @@ class WinWritableImpl {
protected:
WinFileData* file_data_;
const uint64_t alignment_;
uint64_t next_write_offset_; // Needed because Windows does not support O_APPEND
uint64_t
next_write_offset_; // Needed because Windows does not support O_APPEND
uint64_t reservedsize_; // how far we have reserved space
virtual Status PreallocateInternal(uint64_t spaceToReserve);
virtual IOStatus PreallocateInternal(uint64_t spaceToReserve);
WinWritableImpl(WinFileData* file_data, size_t alignment);
@ -304,17 +321,17 @@ class WinWritableImpl {
uint64_t GetAlignement() const { return alignment_; }
Status AppendImpl(const Slice& data);
IOStatus AppendImpl(const Slice& data);
// Requires that the data is aligned as specified by
// GetRequiredBufferAlignment()
Status PositionedAppendImpl(const Slice& data, uint64_t offset);
IOStatus PositionedAppendImpl(const Slice& data, uint64_t offset);
Status TruncateImpl(uint64_t size);
IOStatus TruncateImpl(uint64_t size);
Status CloseImpl();
IOStatus CloseImpl();
Status SyncImpl();
IOStatus SyncImpl(const IOOptions& options, IODebugContext* dbg);
uint64_t GetFileNextWriteOffset() {
// Double accounting now here with WritableFileWriter
@ -326,7 +343,7 @@ class WinWritableImpl {
return next_write_offset_;
}
Status AllocateImpl(uint64_t offset, uint64_t len);
IOStatus AllocateImpl(uint64_t offset, uint64_t len);
public:
WinWritableImpl(const WinWritableImpl&) = delete;
@ -335,32 +352,47 @@ class WinWritableImpl {
class WinWritableFile : private WinFileData,
protected WinWritableImpl,
public WritableFile {
public FSWritableFile {
public:
WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment,
size_t capacity, const EnvOptions& options);
size_t capacity, const FileOptions& options);
~WinWritableFile();
virtual Status Append(const Slice& data) override;
IOStatus Append(const Slice& data, const IOOptions& options,
IODebugContext* dbg) override;
IOStatus Append(const Slice& data, const IOOptions& opts,
const DataVerificationInfo& /* verification_info */,
IODebugContext* dbg) override {
return Append(data, opts, dbg);
}
// Requires that the data is aligned as specified by
// GetRequiredBufferAlignment()
virtual Status PositionedAppend(const Slice& data, uint64_t offset) override;
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& options,
IODebugContext* dbg) override;
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& opts,
const DataVerificationInfo& /* verification_info */,
IODebugContext* dbg) override {
return PositionedAppend(data, offset, opts, dbg);
}
// Need to implement this so the file is truncated correctly
// when buffered and unbuffered mode
virtual Status Truncate(uint64_t size) override;
IOStatus Truncate(uint64_t size, const IOOptions& options,
IODebugContext* dbg) override;
virtual Status Close() override;
IOStatus Close(const IOOptions& options, IODebugContext* dbg) override;
// write out the cached data to the OS cache
// This is now taken care of the WritableFileWriter
virtual Status Flush() override;
IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override;
virtual Status Sync() override;
IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override;
virtual Status Fsync() override;
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override;
virtual bool IsSyncThreadSafe() const override;
@ -370,9 +402,10 @@ class WinWritableFile : private WinFileData,
virtual size_t GetRequiredBufferAlignment() const override;
virtual uint64_t GetFileSize() override;
uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override;
virtual Status Allocate(uint64_t offset, uint64_t len) override;
IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& options,
IODebugContext* dbg) override;
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
};
@ -380,10 +413,10 @@ class WinWritableFile : private WinFileData,
class WinRandomRWFile : private WinFileData,
protected WinRandomAccessImpl,
protected WinWritableImpl,
public RandomRWFile {
public FSRandomRWFile {
public:
WinRandomRWFile(const std::string& fname, HANDLE hFile, size_t alignment,
const EnvOptions& options);
const FileOptions& options);
~WinRandomRWFile() {}
@ -397,45 +430,50 @@ class WinRandomRWFile : private WinFileData,
// Write bytes in `data` at offset `offset`, Returns Status::OK() on success.
// Pass aligned buffer when use_direct_io() returns true.
virtual Status Write(uint64_t offset, const Slice& data) override;
IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& options,
IODebugContext* dbg) override;
// Read up to `n` bytes starting from offset `offset` and store them in
// result, provided `scratch` size should be at least `n`.
// Returns Status::OK() on success.
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const override;
virtual Status Flush() override;
IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override;
virtual Status Sync() override;
IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override;
virtual Status Fsync() override { return Sync(); }
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
return Sync(options, dbg);
}
virtual Status Close() override;
IOStatus Close(const IOOptions& options, IODebugContext* dbg) override;
};
class WinMemoryMappedBuffer : public MemoryMappedFileBuffer {
private:
HANDLE file_handle_;
HANDLE map_handle_;
public:
WinMemoryMappedBuffer(HANDLE file_handle, HANDLE map_handle, void* base, size_t size) :
MemoryMappedFileBuffer(base, size),
file_handle_(file_handle),
map_handle_(map_handle) {}
private:
HANDLE file_handle_;
HANDLE map_handle_;
public:
WinMemoryMappedBuffer(HANDLE file_handle, HANDLE map_handle, void* base,
size_t size)
: MemoryMappedFileBuffer(base, size),
file_handle_(file_handle),
map_handle_(map_handle) {}
~WinMemoryMappedBuffer() override;
};
class WinDirectory : public Directory {
class WinDirectory : public FSDirectory {
HANDLE handle_;
public:
explicit WinDirectory(HANDLE h) noexcept : handle_(h) {
assert(handle_ != INVALID_HANDLE_VALUE);
}
~WinDirectory() {
::CloseHandle(handle_);
}
virtual Status Fsync() override;
~WinDirectory() { ::CloseHandle(handle_); }
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override;
size_t GetUniqueId(char* id, size_t max_size) const override;
};
@ -452,5 +490,5 @@ class WinFileLock : public FileLock {
private:
HANDLE hFile_;
};
}
} // namespace port
} // namespace ROCKSDB_NAMESPACE

@ -13,31 +13,33 @@
#if defined(OS_WIN)
#include "port/win/win_logger.h"
#include "port/win/io_win.h"
#include <algorithm>
#include <fcntl.h>
#include <stdio.h>
#include <time.h>
#include <fcntl.h>
#include <atomic>
#include "rocksdb/env.h"
#include <algorithm>
#include <atomic>
#include "monitoring/iostats_context_imp.h"
#include "port/sys_time.h"
#include "port/win/env_win.h"
#include "port/win/io_win.h"
#include "rocksdb/env.h"
namespace ROCKSDB_NAMESPACE {
namespace port {
WinLogger::WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file,
WinLogger::WinLogger(uint64_t (*gettid)(),
const std::shared_ptr<WinClock>& clock, HANDLE file,
const InfoLogLevel log_level)
: Logger(log_level),
file_(file),
gettid_(gettid),
log_size_(0),
last_flush_micros_(0),
env_(env),
clock_(clock),
flush_pending_(false) {
assert(file_ != NULL);
assert(file_ != INVALID_HANDLE_VALUE);
@ -88,7 +90,7 @@ void WinLogger::Flush() {
// for perf reasons.
}
last_flush_micros_ = env_->NowMicros();
last_flush_micros_ = clock_->NowMicros();
}
void WinLogger::Logv(const char* format, va_list ap) {

@ -12,22 +12,23 @@
#pragma once
#include <stdint.h>
#include <windows.h>
#include <atomic>
#include <memory>
#include "rocksdb/env.h"
#include <stdint.h>
#include <windows.h>
namespace ROCKSDB_NAMESPACE {
class Env;
namespace port {
class WinClock;
class WinLogger : public ROCKSDB_NAMESPACE::Logger {
public:
WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file,
WinLogger(uint64_t (*gettid)(), const std::shared_ptr<WinClock>& clock,
HANDLE file,
const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL);
virtual ~WinLogger();
@ -54,7 +55,7 @@ protected:
uint64_t (*gettid_)(); // Return the thread id for the current thread
std::atomic_size_t log_size_;
std::atomic_uint_fast64_t last_flush_micros_;
Env* env_;
std::shared_ptr<WinClock> clock_;
bool flush_pending_;
Status CloseInternal();

@ -452,6 +452,26 @@ bool IsDirectIOSupported(Env* env, const std::string& dir) {
return s.ok();
}
bool IsPrefetchSupported(const std::shared_ptr<FileSystem>& fs,
const std::string& dir) {
bool supported = false;
std::string tmp = TempFileName(dir, 999);
Random rnd(301);
std::string test_string = rnd.RandomString(4096);
Slice data(test_string);
Status s = WriteStringToFile(fs.get(), data, tmp, true);
if (s.ok()) {
std::unique_ptr<FSRandomAccessFile> file;
auto io_s = fs->NewRandomAccessFile(tmp, FileOptions(), &file, nullptr);
if (io_s.ok()) {
supported = !(file->Prefetch(0, data.size(), IOOptions(), nullptr)
.IsNotSupported());
}
s = fs->DeleteFile(tmp, IOOptions(), nullptr);
}
return s.ok() && supported;
}
size_t GetLinesCount(const std::string& fname, const std::string& pattern) {
std::stringstream ssbuf;
std::string line;

@ -26,6 +26,7 @@
#include "util/mutexlock.h"
namespace ROCKSDB_NAMESPACE {
class FileSystem;
class Random;
class SequentialFile;
class SequentialFileReader;
@ -861,6 +862,9 @@ std::string RandomName(Random* rnd, const size_t len);
bool IsDirectIOSupported(Env* env, const std::string& dir);
bool IsPrefetchSupported(const std::shared_ptr<FileSystem>& fs,
const std::string& dir);
// Return the number of lines where a given pattern was found in a file.
size_t GetLinesCount(const std::string& fname, const std::string& pattern);

@ -32,7 +32,6 @@ class DBBenchTest : public testing::Test {
Env::Default()->CreateDir(test_path_);
db_path_ = test_path_ + "/db";
wal_path_ = test_path_ + "/wal";
fs_.reset(new LegacyFileSystemWrapper(Env::Default()));
}
~DBBenchTest() {
@ -114,7 +113,6 @@ class DBBenchTest : public testing::Test {
std::string db_path_;
std::string test_path_;
std::string wal_path_;
std::unique_ptr<LegacyFileSystemWrapper> fs_;
char arg_buffer_[kArgBufferSize];
char* argv_[kMaxArgCount];
@ -130,7 +128,7 @@ TEST_F(DBBenchTest, OptionsFile) {
Options opt = GetDefaultOptions();
ASSERT_OK(PersistRocksDBOptions(DBOptions(opt), {"default"},
{ColumnFamilyOptions()}, kOptionsFileName,
fs_.get()));
opt.env->GetFileSystem().get()));
// override the following options as db_bench will not take these
// options from the options file
@ -149,7 +147,7 @@ TEST_F(DBBenchTest, OptionsFileUniversal) {
ASSERT_OK(PersistRocksDBOptions(DBOptions(opt), {"default"},
{ColumnFamilyOptions(opt)}, kOptionsFileName,
fs_.get()));
opt.env->GetFileSystem().get()));
// override the following options as db_bench will not take these
// options from the options file

@ -824,10 +824,10 @@ TEST_F(BlobDBTest, SstFileManagerRestart) {
Close();
// Create 3 dummy trash files under the blob_dir
LegacyFileSystemWrapper fs(db_options.env);
CreateFile(&fs, blob_dir + "/000666.blob.trash", "", false);
CreateFile(&fs, blob_dir + "/000888.blob.trash", "", true);
CreateFile(&fs, blob_dir + "/something_not_match.trash", "", false);
const auto &fs = db_options.env->GetFileSystem();
CreateFile(fs, blob_dir + "/000666.blob.trash", "", false);
CreateFile(fs, blob_dir + "/000888.blob.trash", "", true);
CreateFile(fs, blob_dir + "/something_not_match.trash", "", false);
// Make sure that reopening the DB rescan the existing trash files
Open(bdb_options, db_options);

@ -7,7 +7,6 @@
#include "rocksdb/utilities/options_util.h"
#include "env/composite_env_wrapper.h"
#include "file/filename.h"
#include "options/options_parser.h"
#include "rocksdb/convenience.h"
@ -34,8 +33,8 @@ Status LoadOptionsFromFile(const ConfigOptions& config_options,
std::vector<ColumnFamilyDescriptor>* cf_descs,
std::shared_ptr<Cache>* cache) {
RocksDBOptionsParser parser;
LegacyFileSystemWrapper fs(config_options.env);
Status s = parser.Parse(config_options, file_name, &fs);
const auto& fs = config_options.env->GetFileSystem();
Status s = parser.Parse(config_options, file_name, fs.get());
if (!s.ok()) {
return s;
}
@ -149,12 +148,11 @@ Status CheckOptionsCompatibility(
cf_opts.push_back(cf_desc.options);
}
LegacyFileSystemWrapper fs(config_options.env);
const auto& fs = config_options.env->GetFileSystem();
return RocksDBOptionsParser::VerifyRocksDBOptionsFromFile(
config_options, db_options, cf_names, cf_opts,
dbpath + "/" + options_file_name, &fs);
dbpath + "/" + options_file_name, fs.get());
}
} // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save