From 4a09d632c48a0ccd58cea5faefca333c7d64e978 Mon Sep 17 00:00:00 2001 From: mrambacher Date: Thu, 28 Jan 2021 22:08:46 -0800 Subject: [PATCH] Remove Legacy and Custom FileWrapper classes from header files (#7851) Summary: Removed the uses of the Legacy FileWrapper classes from the source code. The wrappers were creating an additional layer of indirection/wrapping, as the Env already has a FileSystem. Moved the Custom FileWrapper classes into the CustomEnv, as these classes are really for the private use the the CustomEnv class. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7851 Reviewed By: anand1976 Differential Revision: D26114816 Pulled By: mrambacher fbshipit-source-id: db32840e58d969d3a0fa6c25aaf13d6dcdc74150 --- db/compaction/compaction_job_test.cc | 15 +- db/corruption_test.cc | 14 +- db/db_wal_test.cc | 16 +- db/flush_job_test.cc | 12 +- db/repair.cc | 11 +- db/version_set_test.cc | 43 ++- db/wal_manager_test.cc | 28 +- env/composite_env_wrapper.h | 272 ------------------ env/env.cc | 262 ++++++++++++++++- env/env_posix.cc | 1 + file/random_access_file_reader.cc | 11 + file/random_access_file_reader.h | 4 + file/sequence_file_reader.cc | 12 + file/sequence_file_reader.h | 4 + file/writable_file_writer.cc | 13 + file/writable_file_writer.h | 4 + fuzz/sst_file_writer_fuzzer.cc | 13 +- port/win/env_win.cc | 1 + port/win/win_logger.h | 1 - table/block_fetcher_test.cc | 10 +- table/cuckoo/cuckoo_table_builder_test.cc | 110 +++---- table/cuckoo/cuckoo_table_reader_test.cc | 76 +++-- table/mock_table.cc | 11 +- table/sst_file_dumper.cc | 29 +- table/sst_file_reader.cc | 15 +- table/sst_file_writer.cc | 14 +- table/table_reader_bench.cc | 21 +- tools/ldb_cmd.cc | 17 +- tools/sst_dump_test.cc | 11 +- tools/trace_analyzer_test.cc | 11 +- tools/trace_analyzer_tool.cc | 11 +- util/file_reader_writer_test.cc | 157 ++++++---- utilities/backupable/backupable_db.cc | 38 ++- utilities/blob_db/blob_db_impl.cc | 22 +- utilities/blob_db/blob_db_impl.h | 3 +- utilities/blob_db/blob_file.cc | 26 +- utilities/blob_db/blob_file.h | 6 +- .../persistent_cache/block_cache_tier_file.cc | 23 +- utilities/simulator_cache/sim_cache.cc | 13 +- utilities/trace/file_trace_reader_writer.cc | 21 +- 40 files changed, 714 insertions(+), 668 deletions(-) diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index fa0393a9a..839ba35c8 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -5,6 +5,8 @@ #ifndef ROCKSDB_LITE +#include "db/compaction/compaction_job.h" + #include #include #include @@ -14,13 +16,13 @@ #include "db/blob/blob_index.h" #include "db/column_family.h" -#include "db/compaction/compaction_job.h" #include "db/db_impl/db_impl.h" #include "db/error_handler.h" #include "db/version_set.h" #include "file/writable_file_writer.h" #include "rocksdb/cache.h" #include "rocksdb/db.h" +#include "rocksdb/file_system.h" #include "rocksdb/options.h" #include "rocksdb/write_buffer_manager.h" #include "table/mock_table.h" @@ -277,12 +279,13 @@ class CompactionJobTestBase : public testing::Test { new_db.SetLastSequence(0); const std::string manifest = DescriptorFileName(dbname_, 1); - std::unique_ptr file; - Status s = env_->NewWritableFile( - manifest, &file, env_->OptimizeForManifestWrite(env_options_)); + std::unique_ptr file_writer; + const auto& fs = env_->GetFileSystem(); + Status s = WritableFileWriter::Create( + fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer, + nullptr); + ASSERT_OK(s); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_)); { log::Writer log(std::move(file_writer), 0, false); std::string record; diff --git a/db/corruption_test.cc b/db/corruption_test.cc index af987f3e1..4fb1f7403 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -19,7 +19,6 @@ #include "db/db_test_util.h" #include "db/log_format.h" #include "db/version_set.h" -#include "env/composite_env_wrapper.h" #include "file/filename.h" #include "port/stack_trace.h" #include "rocksdb/cache.h" @@ -539,14 +538,15 @@ TEST_F(CorruptionTest, RangeDeletionCorrupted) { ASSERT_EQ(static_cast(1), metadata.size()); std::string filename = dbname_ + metadata[0].name; - std::unique_ptr file; - ASSERT_OK(options_.env->NewRandomAccessFile(filename, &file, EnvOptions())); - std::unique_ptr file_reader( - new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), - filename)); + FileOptions file_opts; + const auto& fs = options_.env->GetFileSystem(); + std::unique_ptr file_reader; + ASSERT_OK(RandomAccessFileReader::Create(fs, filename, file_opts, + &file_reader, nullptr)); uint64_t file_size; - ASSERT_OK(options_.env->GetFileSize(filename, &file_size)); + ASSERT_OK( + fs->GetFileSize(filename, file_opts.io_options, &file_size, nullptr)); BlockHandle range_del_handle; ASSERT_OK(FindMetaBlock( diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index f74de2846..1cb0391b8 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -8,10 +8,10 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/db_test_util.h" -#include "env/composite_env_wrapper.h" #include "options/options_helper.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/file_system.h" #include "test_util/sync_point.h" #include "utilities/fault_injection_env.h" @@ -1147,7 +1147,7 @@ class RecoveryTestHelper { *count = 0; std::shared_ptr table_cache = NewLRUCache(50, 0); - EnvOptions env_options; + FileOptions file_options; WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); std::unique_ptr versions; @@ -1155,22 +1155,22 @@ class RecoveryTestHelper { WriteController write_controller; versions.reset(new VersionSet( - test->dbname_, &db_options, env_options, table_cache.get(), + test->dbname_, &db_options, file_options, table_cache.get(), &write_buffer_manager, &write_controller, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); wal_manager.reset( - new WalManager(db_options, env_options, /*io_tracer=*/nullptr)); + new WalManager(db_options, file_options, /*io_tracer=*/nullptr)); std::unique_ptr current_log_writer; for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) { uint64_t current_log_number = j; std::string fname = LogFileName(test->dbname_, current_log_number); - std::unique_ptr file; - ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(file)), fname, env_options)); + std::unique_ptr file_writer; + ASSERT_OK(WritableFileWriter::Create(db_options.env->GetFileSystem(), + fname, file_options, &file_writer, + nullptr)); current_log_writer.reset( new log::Writer(std::move(file_writer), current_log_number, db_options.recycle_log_file_num > 0)); diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index eab2e7f1a..d68e1fe8f 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -16,6 +16,7 @@ #include "db/version_set.h" #include "file/writable_file_writer.h" #include "rocksdb/cache.h" +#include "rocksdb/file_system.h" #include "rocksdb/write_buffer_manager.h" #include "table/mock_table.h" #include "test_util/testharness.h" @@ -74,12 +75,13 @@ class FlushJobTestBase : public testing::Test { } const std::string manifest = DescriptorFileName(dbname_, 1); - std::unique_ptr file; - Status s = env_->NewWritableFile( - manifest, &file, env_->OptimizeForManifestWrite(env_options_)); + const auto& fs = env_->GetFileSystem(); + std::unique_ptr file_writer; + Status s = WritableFileWriter::Create( + fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer, + nullptr); ASSERT_OK(s); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(file)), manifest, EnvOptions())); + { log::Writer log(std::move(file_writer), 0, false); std::string record; diff --git a/db/repair.cc b/db/repair.cc index 3795db371..94c31cac3 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -71,7 +71,6 @@ #include "db/table_cache.h" #include "db/version_edit.h" #include "db/write_batch_internal.h" -#include "env/composite_env_wrapper.h" #include "file/filename.h" #include "file/writable_file_writer.h" #include "options/cf_options.h" @@ -358,14 +357,14 @@ class Repairer { // Open the log file std::string logname = LogFileName(db_options_.wal_dir, log); - std::unique_ptr lfile; - Status status = env_->NewSequentialFile( - logname, &lfile, env_->OptimizeForLogRead(env_options_)); + const auto& fs = env_->GetFileSystem(); + std::unique_ptr lfile_reader; + Status status = SequentialFileReader::Create( + fs, logname, fs->OptimizeForLogRead(env_options_), &lfile_reader, + nullptr); if (!status.ok()) { return status; } - std::unique_ptr lfile_reader(new SequentialFileReader( - NewLegacySequentialFileWrapper(lfile), logname)); // Create the log reader. LogReporter reporter; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index ecc47b207..92232b87c 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -11,6 +11,7 @@ #include "db/db_impl/db_impl.h" #include "db/log_writer.h" +#include "rocksdb/file_system.h" #include "table/block_based/block_based_table_factory.h" #include "table/mock_table.h" #include "test_util/testharness.h" @@ -783,13 +784,13 @@ class VersionSetTestBase { } *last_seqno = last_seq; num_initial_edits_ = static_cast(new_cfs.size() + 1); + std::unique_ptr file_writer; const std::string manifest = DescriptorFileName(dbname_, 1); - std::unique_ptr file; - Status s = env_->NewWritableFile( - manifest, &file, env_->OptimizeForManifestWrite(env_options_)); + const auto& fs = env_->GetFileSystem(); + Status s = WritableFileWriter::Create( + fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer, + nullptr); ASSERT_OK(s); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_)); { log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); std::string record; @@ -2312,14 +2313,13 @@ class EmptyDefaultCfNewManifest : public VersionSetTestBase, assert(log_writer != nullptr); VersionEdit new_db; new_db.SetLogNumber(0); - std::unique_ptr file; const std::string manifest_path = DescriptorFileName(dbname_, 1); - Status s = env_->NewWritableFile( - manifest_path, &file, env_->OptimizeForManifestWrite(env_options_)); + const auto& fs = env_->GetFileSystem(); + std::unique_ptr file_writer; + Status s = WritableFileWriter::Create( + fs, manifest_path, fs->OptimizeForManifestWrite(env_options_), + &file_writer, nullptr); ASSERT_OK(s); - std::unique_ptr file_writer( - new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)), - manifest_path, env_options_)); log_writer->reset(new log::Writer(std::move(file_writer), 0, true)); std::string record; ASSERT_TRUE(new_db.EncodeTo(&record)); @@ -2387,13 +2387,12 @@ class VersionSetTestEmptyDb new_db.SetDBId(db_id); } const std::string manifest_path = DescriptorFileName(dbname_, 1); - std::unique_ptr file; - Status s = env_->NewWritableFile( - manifest_path, &file, env_->OptimizeForManifestWrite(env_options_)); + const auto& fs = env_->GetFileSystem(); + std::unique_ptr file_writer; + Status s = WritableFileWriter::Create( + fs, manifest_path, fs->OptimizeForManifestWrite(env_options_), + &file_writer, nullptr); ASSERT_OK(s); - std::unique_ptr file_writer( - new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)), - manifest_path, env_options_)); { log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); std::string record; @@ -2697,12 +2696,12 @@ class VersionSetTestMissingFiles : public VersionSetTestBase, assert(last_seqno != nullptr); assert(log_writer != nullptr); const std::string manifest = DescriptorFileName(dbname_, 1); - std::unique_ptr file; - Status s = env_->NewWritableFile( - manifest, &file, env_->OptimizeForManifestWrite(env_options_)); + const auto& fs = env_->GetFileSystem(); + std::unique_ptr file_writer; + Status s = WritableFileWriter::Create( + fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer, + nullptr); ASSERT_OK(s); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_)); log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); VersionEdit new_db; if (db_options_.write_dbid_to_manifest) { diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index 618a50d75..8dc05b982 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -5,20 +5,21 @@ #ifndef ROCKSDB_LITE +#include "db/wal_manager.h" + #include #include -#include "rocksdb/cache.h" -#include "rocksdb/write_batch.h" -#include "rocksdb/write_buffer_manager.h" - #include "db/column_family.h" #include "db/db_impl/db_impl.h" #include "db/log_writer.h" #include "db/version_set.h" -#include "db/wal_manager.h" #include "env/mock_env.h" #include "file/writable_file_writer.h" +#include "rocksdb/cache.h" +#include "rocksdb/file_system.h" +#include "rocksdb/write_batch.h" +#include "rocksdb/write_buffer_manager.h" #include "table/mock_table.h" #include "test_util/testharness.h" #include "test_util/testutil.h" @@ -81,10 +82,10 @@ class WalManagerTest : public testing::Test { void RollTheLog(bool /*archived*/) { current_log_number_++; std::string fname = ArchivedLogFileName(dbname_, current_log_number_); - std::unique_ptr file; - ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(file)), fname, env_options_)); + const auto& fs = env_->GetFileSystem(); + std::unique_ptr file_writer; + ASSERT_OK(WritableFileWriter::Create(fs, fname, env_options_, &file_writer, + nullptr)); current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false)); } @@ -123,8 +124,9 @@ class WalManagerTest : public testing::Test { TEST_F(WalManagerTest, ReadFirstRecordCache) { Init(); std::string path = dbname_ + "/000001.log"; - std::unique_ptr file; - ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions())); + std::unique_ptr file; + ASSERT_OK(env_->GetFileSystem()->NewWritableFile(path, FileOptions(), &file, + nullptr)); SequenceNumber s; ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, 1 /* number */, &s)); @@ -134,8 +136,8 @@ TEST_F(WalManagerTest, ReadFirstRecordCache) { wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1 /* number */, &s)); ASSERT_EQ(s, 0U); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(file)), path, EnvOptions())); + std::unique_ptr file_writer( + new WritableFileWriter(std::move(file), path, FileOptions())); log::Writer writer(std::move(file_writer), 1, db_options_.recycle_log_file_num > 0); WriteBatch batch; diff --git a/env/composite_env_wrapper.h b/env/composite_env_wrapper.h index 5eafa43b6..c2752fcee 100644 --- a/env/composite_env_wrapper.h +++ b/env/composite_env_wrapper.h @@ -18,7 +18,6 @@ namespace ROCKSDB_NAMESPACE { - class CompositeEnv : public Env { public: // Initialize a CompositeEnvWrapper that delegates all thread/time related @@ -335,275 +334,4 @@ class CompositeEnvWrapper : public CompositeEnv { private: Env* env_target_; }; - -class LegacySequentialFileWrapper : public FSSequentialFile { - public: - explicit LegacySequentialFileWrapper( - std::unique_ptr&& _target) - : target_(std::move(_target)) {} - - IOStatus Read(size_t n, const IOOptions& /*options*/, Slice* result, - char* scratch, IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Read(n, result, scratch)); - } - IOStatus Skip(uint64_t n) override { - return status_to_io_status(target_->Skip(n)); - } - bool use_direct_io() const override { return target_->use_direct_io(); } - size_t GetRequiredBufferAlignment() const override { - return target_->GetRequiredBufferAlignment(); - } - IOStatus InvalidateCache(size_t offset, size_t length) override { - return status_to_io_status(target_->InvalidateCache(offset, length)); - } - IOStatus PositionedRead(uint64_t offset, size_t n, - const IOOptions& /*options*/, Slice* result, - char* scratch, IODebugContext* /*dbg*/) override { - return status_to_io_status( - target_->PositionedRead(offset, n, result, scratch)); - } - SequentialFile* target() { return target_.get(); } - - private: - std::unique_ptr target_; -}; - -class LegacyRandomAccessFileWrapper : public FSRandomAccessFile { - public: - explicit LegacyRandomAccessFileWrapper( - std::unique_ptr&& target) - : target_(std::move(target)) {} - - IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/, - Slice* result, char* scratch, - IODebugContext* /*dbg*/) const override { - return status_to_io_status(target_->Read(offset, n, result, scratch)); - } - IOStatus MultiRead(FSReadRequest* fs_reqs, size_t num_reqs, - const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - std::vector reqs; - Status status; - - reqs.reserve(num_reqs); - for (size_t i = 0; i < num_reqs; ++i) { - ReadRequest req; - - req.offset = fs_reqs[i].offset; - req.len = fs_reqs[i].len; - req.scratch = fs_reqs[i].scratch; - req.status = Status::OK(); - - reqs.emplace_back(req); - } - status = target_->MultiRead(reqs.data(), num_reqs); - for (size_t i = 0; i < num_reqs; ++i) { - fs_reqs[i].result = reqs[i].result; - fs_reqs[i].status = status_to_io_status(std::move(reqs[i].status)); - } - return status_to_io_status(std::move(status)); - ; - } - IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Prefetch(offset, n)); - } - size_t GetUniqueId(char* id, size_t max_size) const override { - return target_->GetUniqueId(id, max_size); - }; - void Hint(AccessPattern pattern) override { - target_->Hint((RandomAccessFile::AccessPattern)pattern); - } - bool use_direct_io() const override { return target_->use_direct_io(); } - size_t GetRequiredBufferAlignment() const override { - return target_->GetRequiredBufferAlignment(); - } - IOStatus InvalidateCache(size_t offset, size_t length) override { - return status_to_io_status(target_->InvalidateCache(offset, length)); - } - - private: - std::unique_ptr target_; -}; - -class LegacyWritableFileWrapper : public FSWritableFile { - public: - explicit LegacyWritableFileWrapper(std::unique_ptr&& _target) - : target_(std::move(_target)) {} - - IOStatus Append(const Slice& data, const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Append(data)); - } - IOStatus Append(const Slice& data, const IOOptions& /*options*/, - const DataVerificationInfo& /*verification_info*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Append(data)); - } - IOStatus PositionedAppend(const Slice& data, uint64_t offset, - const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->PositionedAppend(data, offset)); - } - IOStatus PositionedAppend(const Slice& data, uint64_t offset, - const IOOptions& /*options*/, - const DataVerificationInfo& /*verification_info*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->PositionedAppend(data, offset)); - } - IOStatus Truncate(uint64_t size, const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Truncate(size)); - } - IOStatus Close(const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Close()); - } - IOStatus Flush(const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Flush()); - } - IOStatus Sync(const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Sync()); - } - IOStatus Fsync(const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Fsync()); - } - bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); } - - bool use_direct_io() const override { return target_->use_direct_io(); } - - size_t GetRequiredBufferAlignment() const override { - return target_->GetRequiredBufferAlignment(); - } - - void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override { - target_->SetWriteLifeTimeHint(hint); - } - - Env::WriteLifeTimeHint GetWriteLifeTimeHint() override { - return target_->GetWriteLifeTimeHint(); - } - - uint64_t GetFileSize(const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return target_->GetFileSize(); - } - - void SetPreallocationBlockSize(size_t size) override { - target_->SetPreallocationBlockSize(size); - } - - void GetPreallocationStatus(size_t* block_size, - size_t* last_allocated_block) override { - target_->GetPreallocationStatus(block_size, last_allocated_block); - } - - size_t GetUniqueId(char* id, size_t max_size) const override { - return target_->GetUniqueId(id, max_size); - } - - IOStatus InvalidateCache(size_t offset, size_t length) override { - return status_to_io_status(target_->InvalidateCache(offset, length)); - } - - IOStatus RangeSync(uint64_t offset, uint64_t nbytes, - const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->RangeSync(offset, nbytes)); - } - - void PrepareWrite(size_t offset, size_t len, const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - target_->PrepareWrite(offset, len); - } - - IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Allocate(offset, len)); - } - - WritableFile* target() { return target_.get(); } - - private: - std::unique_ptr target_; -}; - -class LegacyRandomRWFileWrapper : public FSRandomRWFile { - public: - explicit LegacyRandomRWFileWrapper(std::unique_ptr&& target) - : target_(std::move(target)) {} - - bool use_direct_io() const override { return target_->use_direct_io(); } - size_t GetRequiredBufferAlignment() const override { - return target_->GetRequiredBufferAlignment(); - } - IOStatus Write(uint64_t offset, const Slice& data, - const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Write(offset, data)); - } - IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/, - Slice* result, char* scratch, - IODebugContext* /*dbg*/) const override { - return status_to_io_status(target_->Read(offset, n, result, scratch)); - } - IOStatus Flush(const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Flush()); - } - IOStatus Sync(const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Sync()); - } - IOStatus Fsync(const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Fsync()); - } - IOStatus Close(const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Close()); - } - - private: - std::unique_ptr target_; -}; - -class LegacyDirectoryWrapper : public FSDirectory { - public: - explicit LegacyDirectoryWrapper(std::unique_ptr&& target) - : target_(std::move(target)) {} - - IOStatus Fsync(const IOOptions& /*options*/, - IODebugContext* /*dbg*/) override { - return status_to_io_status(target_->Fsync()); - } - size_t GetUniqueId(char* id, size_t max_size) const override { - return target_->GetUniqueId(id, max_size); - } - - private: - std::unique_ptr target_; -}; - -inline std::unique_ptr NewLegacySequentialFileWrapper( - std::unique_ptr& file) { - return std::unique_ptr( - new LegacySequentialFileWrapper(std::move(file))); -} - -inline std::unique_ptr NewLegacyRandomAccessFileWrapper( - std::unique_ptr& file) { - return std::unique_ptr( - new LegacyRandomAccessFileWrapper(std::move(file))); -} - -inline std::unique_ptr NewLegacyWritableFileWrapper( - std::unique_ptr&& file) { - return std::unique_ptr( - new LegacyWritableFileWrapper(std::move(file))); -} - } // namespace ROCKSDB_NAMESPACE diff --git a/env/env.cc b/env/env.cc index 04d64fa3b..bf3ec3231 100644 --- a/env/env.cc +++ b/env/env.cc @@ -62,6 +62,256 @@ class LegacySystemClock : public SystemClock { } }; +class LegacySequentialFileWrapper : public FSSequentialFile { + public: + explicit LegacySequentialFileWrapper( + std::unique_ptr&& _target) + : target_(std::move(_target)) {} + + IOStatus Read(size_t n, const IOOptions& /*options*/, Slice* result, + char* scratch, IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Read(n, result, scratch)); + } + IOStatus Skip(uint64_t n) override { + return status_to_io_status(target_->Skip(n)); + } + bool use_direct_io() const override { return target_->use_direct_io(); } + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + IOStatus InvalidateCache(size_t offset, size_t length) override { + return status_to_io_status(target_->InvalidateCache(offset, length)); + } + IOStatus PositionedRead(uint64_t offset, size_t n, + const IOOptions& /*options*/, Slice* result, + char* scratch, IODebugContext* /*dbg*/) override { + return status_to_io_status( + target_->PositionedRead(offset, n, result, scratch)); + } + + private: + std::unique_ptr target_; +}; + +class LegacyRandomAccessFileWrapper : public FSRandomAccessFile { + public: + explicit LegacyRandomAccessFileWrapper( + std::unique_ptr&& target) + : target_(std::move(target)) {} + + IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/, + Slice* result, char* scratch, + IODebugContext* /*dbg*/) const override { + return status_to_io_status(target_->Read(offset, n, result, scratch)); + } + + IOStatus MultiRead(FSReadRequest* fs_reqs, size_t num_reqs, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + std::vector reqs; + Status status; + + reqs.reserve(num_reqs); + for (size_t i = 0; i < num_reqs; ++i) { + ReadRequest req; + + req.offset = fs_reqs[i].offset; + req.len = fs_reqs[i].len; + req.scratch = fs_reqs[i].scratch; + req.status = Status::OK(); + + reqs.emplace_back(req); + } + status = target_->MultiRead(reqs.data(), num_reqs); + for (size_t i = 0; i < num_reqs; ++i) { + fs_reqs[i].result = reqs[i].result; + fs_reqs[i].status = status_to_io_status(std::move(reqs[i].status)); + } + return status_to_io_status(std::move(status)); + } + + IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Prefetch(offset, n)); + } + size_t GetUniqueId(char* id, size_t max_size) const override { + return target_->GetUniqueId(id, max_size); + } + void Hint(AccessPattern pattern) override { + target_->Hint((RandomAccessFile::AccessPattern)pattern); + } + bool use_direct_io() const override { return target_->use_direct_io(); } + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + IOStatus InvalidateCache(size_t offset, size_t length) override { + return status_to_io_status(target_->InvalidateCache(offset, length)); + } + + private: + std::unique_ptr target_; +}; + +class LegacyRandomRWFileWrapper : public FSRandomRWFile { + public: + explicit LegacyRandomRWFileWrapper(std::unique_ptr&& target) + : target_(std::move(target)) {} + + bool use_direct_io() const override { return target_->use_direct_io(); } + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + IOStatus Write(uint64_t offset, const Slice& data, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Write(offset, data)); + } + IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/, + Slice* result, char* scratch, + IODebugContext* /*dbg*/) const override { + return status_to_io_status(target_->Read(offset, n, result, scratch)); + } + IOStatus Flush(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Flush()); + } + IOStatus Sync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Sync()); + } + IOStatus Fsync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Fsync()); + } + IOStatus Close(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Close()); + } + + private: + std::unique_ptr target_; +}; + +class LegacyWritableFileWrapper : public FSWritableFile { + public: + explicit LegacyWritableFileWrapper(std::unique_ptr&& _target) + : target_(std::move(_target)) {} + + IOStatus Append(const Slice& data, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Append(data)); + } + IOStatus Append(const Slice& data, const IOOptions& /*options*/, + const DataVerificationInfo& /*verification_info*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Append(data)); + } + IOStatus PositionedAppend(const Slice& data, uint64_t offset, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->PositionedAppend(data, offset)); + } + IOStatus PositionedAppend(const Slice& data, uint64_t offset, + const IOOptions& /*options*/, + const DataVerificationInfo& /*verification_info*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->PositionedAppend(data, offset)); + } + IOStatus Truncate(uint64_t size, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Truncate(size)); + } + IOStatus Close(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Close()); + } + IOStatus Flush(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Flush()); + } + IOStatus Sync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Sync()); + } + IOStatus Fsync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Fsync()); + } + bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); } + + bool use_direct_io() const override { return target_->use_direct_io(); } + + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + + void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override { + target_->SetWriteLifeTimeHint(hint); + } + + Env::WriteLifeTimeHint GetWriteLifeTimeHint() override { + return target_->GetWriteLifeTimeHint(); + } + + uint64_t GetFileSize(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return target_->GetFileSize(); + } + + void SetPreallocationBlockSize(size_t size) override { + target_->SetPreallocationBlockSize(size); + } + + void GetPreallocationStatus(size_t* block_size, + size_t* last_allocated_block) override { + target_->GetPreallocationStatus(block_size, last_allocated_block); + } + + size_t GetUniqueId(char* id, size_t max_size) const override { + return target_->GetUniqueId(id, max_size); + } + + IOStatus InvalidateCache(size_t offset, size_t length) override { + return status_to_io_status(target_->InvalidateCache(offset, length)); + } + + IOStatus RangeSync(uint64_t offset, uint64_t nbytes, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->RangeSync(offset, nbytes)); + } + + void PrepareWrite(size_t offset, size_t len, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + target_->PrepareWrite(offset, len); + } + + IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Allocate(offset, len)); + } + + private: + std::unique_ptr target_; +}; + +class LegacyDirectoryWrapper : public FSDirectory { + public: + explicit LegacyDirectoryWrapper(std::unique_ptr&& target) + : target_(std::move(target)) {} + + IOStatus Fsync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return status_to_io_status(target_->Fsync()); + } + size_t GetUniqueId(char* id, size_t max_size) const override { + return target_->GetUniqueId(id, max_size); + } + + private: + std::unique_ptr target_; +}; + class LegacyFileSystemWrapper : public FileSystem { public: // Initialize an EnvWrapper that delegates all calls to *t @@ -759,18 +1009,18 @@ EnvOptions::EnvOptions() { Status NewEnvLogger(const std::string& fname, Env* env, std::shared_ptr* result) { - EnvOptions options; + FileOptions options; // TODO: Tune the buffer size. options.writable_file_max_buffer_size = 1024 * 1024; - std::unique_ptr writable_file; - const auto status = env->NewWritableFile(fname, &writable_file, options); + std::unique_ptr writable_file; + const auto status = env->GetFileSystem()->NewWritableFile( + fname, options, &writable_file, nullptr); if (!status.ok()) { return status; } - *result = std::make_shared( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, options, - env); + *result = std::make_shared(std::move(writable_file), fname, + options, env); return Status::OK(); } diff --git a/env/env_posix.cc b/env/env_posix.cc index 8c20f1eb5..98ac66ad4 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -123,6 +123,7 @@ class PosixDynamicLibrary : public DynamicLibrary { void* handle_; }; #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION + class PosixClock : public SystemClock { public: const char* Name() const override { return "PosixClock"; } diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index e330db342..21d471f09 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -22,6 +22,17 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { +Status RandomAccessFileReader::Create( + const std::shared_ptr& fs, const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* reader, IODebugContext* dbg) { + std::unique_ptr file; + Status s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg); + if (s.ok()) { + reader->reset(new RandomAccessFileReader(std::move(file), fname)); + } + return s; +} Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result, char* scratch, diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index c13b4ceb1..29106b5fc 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -103,6 +103,10 @@ class RandomAccessFileReader { #endif } + static Status Create(const std::shared_ptr& fs, + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* reader, + IODebugContext* dbg); RandomAccessFileReader(const RandomAccessFileReader&) = delete; RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete; diff --git a/file/sequence_file_reader.cc b/file/sequence_file_reader.cc index 81c5e5d1d..3a87b6d10 100644 --- a/file/sequence_file_reader.cc +++ b/file/sequence_file_reader.cc @@ -22,6 +22,18 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { +Status SequentialFileReader::Create( + const std::shared_ptr& fs, const std::string& fname, + const FileOptions& file_opts, std::unique_ptr* reader, + IODebugContext* dbg) { + std::unique_ptr file; + Status s = fs->NewSequentialFile(fname, file_opts, &file, dbg); + if (s.ok()) { + reader->reset(new SequentialFileReader(std::move(file), fname)); + } + return s; +} + Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { Status s; if (use_direct_io()) { diff --git a/file/sequence_file_reader.h b/file/sequence_file_reader.h index c04fc10f9..ea315f853 100644 --- a/file/sequence_file_reader.h +++ b/file/sequence_file_reader.h @@ -41,6 +41,10 @@ class SequentialFileReader { : file_name_(_file_name), file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size), io_tracer, _file_name) {} + static Status Create(const std::shared_ptr& fs, + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* reader, + IODebugContext* dbg); SequentialFileReader(const SequentialFileReader&) = delete; SequentialFileReader& operator=(const SequentialFileReader&) = delete; diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 1c18b3eb1..f344297a6 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -22,6 +22,19 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { +Status WritableFileWriter::Create(const std::shared_ptr& fs, + const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* writer, + IODebugContext* dbg) { + std::unique_ptr file; + Status s = fs->NewWritableFile(fname, file_opts, &file, dbg); + if (s.ok()) { + writer->reset(new WritableFileWriter(std::move(file), fname, file_opts)); + } + return s; +} + IOStatus WritableFileWriter::Append(const Slice& data) { const char* src = data.data(); size_t left = data.size(); diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index c1375af5a..1b7b3cb4c 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -191,6 +191,10 @@ class WritableFileWriter { } } + static Status Create(const std::shared_ptr& fs, + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* writer, + IODebugContext* dbg); WritableFileWriter(const WritableFileWriter&) = delete; WritableFileWriter& operator=(const WritableFileWriter&) = delete; diff --git a/fuzz/sst_file_writer_fuzzer.cc b/fuzz/sst_file_writer_fuzzer.cc index 7bc128fc0..a21544943 100644 --- a/fuzz/sst_file_writer_fuzzer.cc +++ b/fuzz/sst_file_writer_fuzzer.cc @@ -58,16 +58,15 @@ TableReader* NewTableReader(const std::string& sst_file_path, // This code block is similar to SstFileReader::Open. uint64_t file_size = 0; - std::unique_ptr file; std::unique_ptr file_reader; std::unique_ptr table_reader; - Status s = options.env->GetFileSize(sst_file_path, &file_size); + const auto& fs = options.env->GetFileSystem(); + FileOptions fopts(env_options); + Status s = options.env->GetFileSize(sst_file_path, fopts.io_options, + &file_size, nullptr); if (s.ok()) { - s = options.env->NewRandomAccessFile(sst_file_path, &file, env_options); - } - if (s.ok()) { - file_reader.reset(new RandomAccessFileReader( - NewLegacyRandomAccessFileWrapper(file), sst_file_path)); + s = RandomAccessFileReader::Create(fs, sst_file_path, fopts, &file_reader, + nullptr); } if (s.ok()) { TableReaderOptions t_opt(cf_ioptions, /*prefix_extractor=*/nullptr, diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 05bab78b2..c28144938 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -1360,6 +1360,7 @@ Status WinEnv::GetThreadList(std::vector* thread_list) { Status WinEnv::GetHostName(char* name, uint64_t len) { return winenv_io_.GetHostName(name, len); } + void WinEnv::Schedule(void (*function)(void*), void* arg, Env::Priority pri, void* tag, void (*unschedFunction)(void* arg)) { return winenv_threads_.Schedule(function, arg, pri, tag, unschedFunction); diff --git a/port/win/win_logger.h b/port/win/win_logger.h index 08be85089..7cd39f6c9 100644 --- a/port/win/win_logger.h +++ b/port/win/win_logger.h @@ -24,7 +24,6 @@ namespace ROCKSDB_NAMESPACE { class SystemClock; namespace port { - class WinLogger : public ROCKSDB_NAMESPACE::Logger { public: WinLogger(uint64_t (*gettid)(), const std::shared_ptr& clock, diff --git a/table/block_fetcher_test.cc b/table/block_fetcher_test.cc index 5d3c90858..cf5a4151c 100644 --- a/table/block_fetcher_test.cc +++ b/table/block_fetcher_test.cc @@ -6,11 +6,11 @@ #include "table/block_fetcher.h" #include "db/table_properties_collector.h" -#include "env/composite_env_wrapper.h" #include "file/file_util.h" #include "options/options_helper.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/file_system.h" #include "table/block_based/binary_search_index_reader.h" #include "table/block_based/block_based_table_builder.h" #include "table/block_based/block_based_table_factory.h" @@ -248,11 +248,9 @@ class BlockFetcherTest : public testing::Test { void NewFileWriter(const std::string& filename, std::unique_ptr* writer) { std::string path = Path(filename); - EnvOptions env_options; - std::unique_ptr file; - ASSERT_OK(env_->NewWritableFile(path, &file, env_options)); - writer->reset(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(file)), path, env_options)); + FileOptions file_options; + ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), path, + file_options, writer, nullptr)); } void NewFileReader(const std::string& filename, const FileOptions& opt, diff --git a/table/cuckoo/cuckoo_table_builder_test.cc b/table/cuckoo/cuckoo_table_builder_test.cc index 2f6db70d9..01f543dca 100644 --- a/table/cuckoo/cuckoo_table_builder_test.cc +++ b/table/cuckoo/cuckoo_table_builder_test.cc @@ -5,14 +5,16 @@ #ifndef ROCKSDB_LITE -#include -#include +#include "table/cuckoo/cuckoo_table_builder.h" + #include +#include #include +#include #include "file/random_access_file_reader.h" #include "file/writable_file_writer.h" -#include "table/cuckoo/cuckoo_table_builder.h" +#include "rocksdb/file_system.h" #include "table/meta_blocks.h" #include "test_util/testharness.h" #include "test_util/testutil.h" @@ -35,7 +37,7 @@ class CuckooBuilderTest : public testing::Test { env_ = Env::Default(); Options options; options.allow_mmap_reads = true; - env_options_ = EnvOptions(options); + file_options_ = FileOptions(options); } void CheckFileContents(const std::vector& keys, @@ -54,10 +56,11 @@ class CuckooBuilderTest : public testing::Test { } } // Read file - std::unique_ptr read_file; - ASSERT_OK(env_->NewRandomAccessFile(fname, &read_file, env_options_)); uint64_t read_file_size; ASSERT_OK(env_->GetFileSize(fname, &read_file_size)); + std::unique_ptr file_reader; + ASSERT_OK(RandomAccessFileReader::Create( + env_->GetFileSystem(), fname, file_options_, &file_reader, nullptr)); Options options; options.allow_mmap_reads = true; @@ -65,9 +68,6 @@ class CuckooBuilderTest : public testing::Test { // Assert Table Properties. TableProperties* props = nullptr; - std::unique_ptr file_reader( - new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(read_file), - fname)); ASSERT_OK(ReadTableProperties(file_reader.get(), read_file_size, kCuckooTableMagicNumber, ioptions, &props, true /* compression_type_missing */)); @@ -158,7 +158,7 @@ class CuckooBuilderTest : public testing::Test { Env* env_; - EnvOptions env_options_; + FileOptions file_options_; std::string fname; const double kHashTableRatio = 0.9; }; @@ -166,10 +166,9 @@ class CuckooBuilderTest : public testing::Test { TEST_F(CuckooBuilderTest, SuccessWithEmptyFile) { std::unique_ptr writable_file; fname = test::PerThreadDBPath("EmptyFile"); - ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - EnvOptions())); + std::unique_ptr file_writer; + ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname, + file_options_, &file_writer, nullptr)); CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, 4, 100, BytewiseComparator(), 1, false, false, GetSliceHash, 0 /* column_family_id */, @@ -207,12 +206,10 @@ TEST_F(CuckooBuilderTest, WriteSuccessNoCollisionFullKey) { } uint64_t expected_table_size = GetExpectedTableSize(keys.size()); - std::unique_ptr writable_file; fname = test::PerThreadDBPath("NoCollisionFullKey"); - ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - EnvOptions())); + std::unique_ptr file_writer; + ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname, + file_options_, &file_writer, nullptr)); CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, 100, BytewiseComparator(), 1, false, false, GetSliceHash, 0 /* column_family_id */, @@ -257,12 +254,10 @@ TEST_F(CuckooBuilderTest, WriteSuccessWithCollisionFullKey) { } uint64_t expected_table_size = GetExpectedTableSize(keys.size()); - std::unique_ptr writable_file; fname = test::PerThreadDBPath("WithCollisionFullKey"); - ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - EnvOptions())); + std::unique_ptr file_writer; + ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname, + file_options_, &file_writer, nullptr)); CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, 100, BytewiseComparator(), 1, false, false, GetSliceHash, 0 /* column_family_id */, @@ -306,13 +301,11 @@ TEST_F(CuckooBuilderTest, WriteSuccessWithCollisionAndCuckooBlock) { } uint64_t expected_table_size = GetExpectedTableSize(keys.size()); - std::unique_ptr writable_file; + std::unique_ptr file_writer; uint32_t cuckoo_block_size = 2; fname = test::PerThreadDBPath("WithCollisionFullKey2"); - ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - EnvOptions())); + ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname, + file_options_, &file_writer, nullptr)); CuckooTableBuilder builder( file_writer.get(), kHashTableRatio, num_hash_fun, 100, BytewiseComparator(), cuckoo_block_size, false, false, GetSliceHash, @@ -361,12 +354,10 @@ TEST_F(CuckooBuilderTest, WithCollisionPathFullKey) { } uint64_t expected_table_size = GetExpectedTableSize(keys.size()); - std::unique_ptr writable_file; + std::unique_ptr file_writer; fname = test::PerThreadDBPath("WithCollisionPathFullKey"); - ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - EnvOptions())); + ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname, + file_options_, &file_writer, nullptr)); CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, 100, BytewiseComparator(), 1, false, false, GetSliceHash, 0 /* column_family_id */, @@ -412,12 +403,10 @@ TEST_F(CuckooBuilderTest, WithCollisionPathFullKeyAndCuckooBlock) { } uint64_t expected_table_size = GetExpectedTableSize(keys.size()); - std::unique_ptr writable_file; + std::unique_ptr file_writer; fname = test::PerThreadDBPath("WithCollisionPathFullKeyAndCuckooBlock"); - ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - EnvOptions())); + ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname, + file_options_, &file_writer, nullptr)); CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, 100, BytewiseComparator(), 2, false, false, GetSliceHash, 0 /* column_family_id */, @@ -456,12 +445,11 @@ TEST_F(CuckooBuilderTest, WriteSuccessNoCollisionUserKey) { std::vector expected_locations = {0, 1, 2, 3}; uint64_t expected_table_size = GetExpectedTableSize(user_keys.size()); - std::unique_ptr writable_file; + std::unique_ptr file_writer; fname = test::PerThreadDBPath("NoCollisionUserKey"); - ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - EnvOptions())); + ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname, + file_options_, &file_writer, nullptr)); + CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, 100, BytewiseComparator(), 1, false, false, GetSliceHash, 0 /* column_family_id */, @@ -501,12 +489,11 @@ TEST_F(CuckooBuilderTest, WriteSuccessWithCollisionUserKey) { std::vector expected_locations = {0, 1, 2, 3}; uint64_t expected_table_size = GetExpectedTableSize(user_keys.size()); - std::unique_ptr writable_file; + std::unique_ptr file_writer; fname = test::PerThreadDBPath("WithCollisionUserKey"); - ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - EnvOptions())); + ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname, + file_options_, &file_writer, nullptr)); + CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, 100, BytewiseComparator(), 1, false, false, GetSliceHash, 0 /* column_family_id */, @@ -548,12 +535,11 @@ TEST_F(CuckooBuilderTest, WithCollisionPathUserKey) { std::vector expected_locations = {0, 1, 3, 4, 2}; uint64_t expected_table_size = GetExpectedTableSize(user_keys.size()); - std::unique_ptr writable_file; + std::unique_ptr file_writer; fname = test::PerThreadDBPath("WithCollisionPathUserKey"); - ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - EnvOptions())); + ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname, + file_options_, &file_writer, nullptr)); + CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, 2, BytewiseComparator(), 1, false, false, GetSliceHash, 0 /* column_family_id */, @@ -594,12 +580,10 @@ TEST_F(CuckooBuilderTest, FailWhenCollisionPathTooLong) { }; hash_map = std::move(hm); - std::unique_ptr writable_file; + std::unique_ptr file_writer; fname = test::PerThreadDBPath("WithCollisionPathUserKey"); - ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - EnvOptions())); + ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname, + file_options_, &file_writer, nullptr)); CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, 2, BytewiseComparator(), 1, false, false, GetSliceHash, 0 /* column_family_id */, @@ -623,12 +607,10 @@ TEST_F(CuckooBuilderTest, FailWhenSameKeyInserted) { uint32_t num_hash_fun = 4; std::string user_key = "repeatedkey"; - std::unique_ptr writable_file; + std::unique_ptr file_writer; fname = test::PerThreadDBPath("FailWhenSameKeyInserted"); - ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - EnvOptions())); + ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname, + file_options_, &file_writer, nullptr)); CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, 100, BytewiseComparator(), 1, false, false, GetSliceHash, 0 /* column_family_id */, diff --git a/table/cuckoo/cuckoo_table_reader_test.cc b/table/cuckoo/cuckoo_table_reader_test.cc index 56a0b2393..3c6357c38 100644 --- a/table/cuckoo/cuckoo_table_reader_test.cc +++ b/table/cuckoo/cuckoo_table_reader_test.cc @@ -68,7 +68,7 @@ class CuckooReaderTest : public testing::Test { CuckooReaderTest() { options.allow_mmap_reads = true; env = options.env; - env_options = EnvOptions(options); + file_options = FileOptions(options); } void SetUp(int num) { @@ -88,12 +88,9 @@ class CuckooReaderTest : public testing::Test { void CreateCuckooFileAndCheckReader( const Comparator* ucomp = BytewiseComparator()) { - std::unique_ptr writable_file; - ASSERT_OK(env->NewWritableFile(fname, &writable_file, env_options)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - env_options)); - + std::unique_ptr file_writer; + ASSERT_OK(WritableFileWriter::Create(env->GetFileSystem(), fname, + file_options, &file_writer, nullptr)); CuckooTableBuilder builder( file_writer.get(), 0.9, kNumHashFunc, 100, ucomp, 2, false, false, GetSliceHash, 0 /* column_family_id */, kDefaultColumnFamilyName); @@ -109,11 +106,9 @@ class CuckooReaderTest : public testing::Test { ASSERT_OK(file_writer->Close()); // Check reader now. - std::unique_ptr read_file; - ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options)); - std::unique_ptr file_reader( - new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(read_file), - fname)); + std::unique_ptr file_reader; + ASSERT_OK(RandomAccessFileReader::Create( + env->GetFileSystem(), fname, file_options, &file_reader, nullptr)); const ImmutableCFOptions ioptions(options); CuckooTableReader reader(ioptions, std::move(file_reader), file_size, ucomp, GetSliceHash); @@ -139,11 +134,9 @@ class CuckooReaderTest : public testing::Test { } void CheckIterator(const Comparator* ucomp = BytewiseComparator()) { - std::unique_ptr read_file; - ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options)); - std::unique_ptr file_reader( - new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(read_file), - fname)); + std::unique_ptr file_reader; + ASSERT_OK(RandomAccessFileReader::Create( + env->GetFileSystem(), fname, file_options, &file_reader, nullptr)); const ImmutableCFOptions ioptions(options); CuckooTableReader reader(ioptions, std::move(file_reader), file_size, ucomp, GetSliceHash); @@ -211,7 +204,7 @@ class CuckooReaderTest : public testing::Test { uint64_t file_size; Options options; Env* env; - EnvOptions env_options; + FileOptions file_options; }; TEST_F(CuckooReaderTest, FileNotMmaped) { @@ -330,11 +323,11 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { } auto* ucmp = BytewiseComparator(); CreateCuckooFileAndCheckReader(); - std::unique_ptr read_file; - ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options)); - std::unique_ptr file_reader( - new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(read_file), - fname)); + + std::unique_ptr file_reader; + ASSERT_OK(RandomAccessFileReader::Create( + env->GetFileSystem(), fname, file_options, &file_reader, nullptr)); + const ImmutableCFOptions ioptions(options); CuckooTableReader reader(ioptions, std::move(file_reader), file_size, ucmp, GetSliceHash); @@ -415,15 +408,13 @@ void WriteFile(const std::vector& keys, const uint64_t num, double hash_ratio) { Options options; options.allow_mmap_reads = true; - Env* env = options.env; - EnvOptions env_options = EnvOptions(options); + const auto& fs = options.env->GetFileSystem(); + FileOptions file_options(options); std::string fname = GetFileName(num); - std::unique_ptr writable_file; - ASSERT_OK(env->NewWritableFile(fname, &writable_file, env_options)); - std::unique_ptr file_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), fname, - env_options)); + std::unique_ptr file_writer; + ASSERT_OK(WritableFileWriter::Create(fs, fname, file_options, &file_writer, + nullptr)); CuckooTableBuilder builder( file_writer.get(), hash_ratio, 64, 1000, test::Uint64Comparator(), 5, false, FLAGS_identity_as_first_hash, nullptr, 0 /* column_family_id */, @@ -440,12 +431,11 @@ void WriteFile(const std::vector& keys, ASSERT_OK(file_writer->Close()); uint64_t file_size; - env->GetFileSize(fname, &file_size); - std::unique_ptr read_file; - ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options)); - std::unique_ptr file_reader( - new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(read_file), - fname)); + ASSERT_OK( + fs->GetFileSize(fname, file_options.io_options, &file_size, nullptr)); + std::unique_ptr file_reader; + ASSERT_OK(RandomAccessFileReader::Create(fs, fname, file_options, + &file_reader, nullptr)); const ImmutableCFOptions ioptions(options); CuckooTableReader reader(ioptions, std::move(file_reader), file_size, @@ -469,16 +459,16 @@ void ReadKeys(uint64_t num, uint32_t batch_size) { Options options; options.allow_mmap_reads = true; Env* env = options.env; - EnvOptions env_options = EnvOptions(options); + const auto& fs = options.env->GetFileSystem(); + FileOptions file_options(options); std::string fname = GetFileName(num); uint64_t file_size; - env->GetFileSize(fname, &file_size); - std::unique_ptr read_file; - ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options)); - std::unique_ptr file_reader( - new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(read_file), - fname)); + ASSERT_OK( + fs->GetFileSize(fname, file_options.io_options, &file_size, nullptr)); + std::unique_ptr file_reader; + ASSERT_OK(RandomAccessFileReader::Create(fs, fname, file_options, + &file_reader, nullptr)); const ImmutableCFOptions ioptions(options); CuckooTableReader reader(ioptions, std::move(file_reader), file_size, diff --git a/table/mock_table.cc b/table/mock_table.cc index d7439c0e3..a05c1075e 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -265,17 +265,14 @@ TableBuilder* MockTableFactory::NewTableBuilder( Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname, KVVector file_contents) { - std::unique_ptr file; - auto s = env->NewWritableFile(fname, &file, EnvOptions()); + std::unique_ptr file_writer; + auto s = WritableFileWriter::Create(env->GetFileSystem(), fname, + FileOptions(), &file_writer, nullptr); if (!s.ok()) { return s; } - - WritableFileWriter file_writer(NewLegacyWritableFileWrapper(std::move(file)), - fname, EnvOptions()); - uint32_t id; - s = GetAndWriteNextID(&file_writer, &id); + s = GetAndWriteNextID(file_writer.get(), &id); if (s.ok()) { file_system_.files.insert({id, std::move(file_contents)}); } diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc index c405609bc..8a715c120 100644 --- a/table/sst_file_dumper.cc +++ b/table/sst_file_dumper.cc @@ -18,7 +18,6 @@ #include "db/blob/blob_index.h" #include "db/memtable.h" #include "db/write_batch_internal.h" -#include "env/composite_env_wrapper.h" #include "options/cf_options.h" #include "port/port.h" #include "rocksdb/db.h" @@ -80,11 +79,13 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) { // read table magic number Footer footer; - std::unique_ptr file; + const auto& fs = options_.env->GetFileSystem(); + std::unique_ptr file; uint64_t file_size = 0; - Status s = options_.env->NewRandomAccessFile(file_path, &file, soptions_); + Status s = fs->NewRandomAccessFile(file_path, FileOptions(soptions_), &file, + nullptr); if (s.ok()) { - s = options_.env->GetFileSize(file_path, &file_size); + s = fs->GetFileSize(file_path, IOOptions(), &file_size, nullptr); } // check empty file @@ -93,8 +94,7 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) { return Status::Aborted(file_path, "Empty file"); } - file_.reset(new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), - file_path)); + file_.reset(new RandomAccessFileReader(std::move(file), file_path)); FilePrefetchBuffer prefetch_buffer(nullptr, 0, 0, true /* enable */, false /* track_min_offset */); @@ -119,9 +119,10 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) { if (magic_number == kPlainTableMagicNumber || magic_number == kLegacyPlainTableMagicNumber) { soptions_.use_mmap_reads = true; - options_.env->NewRandomAccessFile(file_path, &file, soptions_); - file_.reset(new RandomAccessFileReader( - NewLegacyRandomAccessFileWrapper(file), file_path)); + + fs->NewRandomAccessFile(file_path, FileOptions(soptions_), &file, + nullptr); + file_.reset(new RandomAccessFileReader(std::move(file), file_path)); } options_.comparator = &internal_comparator_; // For old sst format, ReadTableProperties might fail but file can be read @@ -192,16 +193,14 @@ Status SstFileDumper::DumpTable(const std::string& out_filename) { Status SstFileDumper::CalculateCompressedTableSize( const TableBuilderOptions& tb_options, size_t block_size, uint64_t* num_data_blocks, uint64_t* compressed_table_size) { - std::unique_ptr out_file; std::unique_ptr env(NewMemEnv(options_.env)); - Status s = env->NewWritableFile(testFileName, &out_file, soptions_); + std::unique_ptr dest_writer; + Status s = + WritableFileWriter::Create(env->GetFileSystem(), testFileName, + FileOptions(soptions_), &dest_writer, nullptr); if (!s.ok()) { return s; } - std::unique_ptr dest_writer; - dest_writer.reset( - new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(out_file)), - testFileName, soptions_)); BlockBasedTableOptions table_options; table_options.block_size = block_size; BlockBasedTableFactory block_based_tf(table_options); diff --git a/table/sst_file_reader.cc b/table/sst_file_reader.cc index 9bb8bdd71..3f9344154 100644 --- a/table/sst_file_reader.cc +++ b/table/sst_file_reader.cc @@ -10,9 +10,10 @@ #include "db/arena_wrapped_db_iter.h" #include "db/db_iter.h" #include "db/dbformat.h" -#include "env/composite_env_wrapper.h" #include "file/random_access_file_reader.h" #include "options/cf_options.h" +#include "rocksdb/env.h" +#include "rocksdb/file_system.h" #include "table/get_context.h" #include "table/table_builder.h" #include "table/table_reader.h" @@ -42,15 +43,17 @@ Status SstFileReader::Open(const std::string& file_path) { auto r = rep_.get(); Status s; uint64_t file_size = 0; - std::unique_ptr file; + std::unique_ptr file; std::unique_ptr file_reader; - s = r->options.env->GetFileSize(file_path, &file_size); + FileOptions fopts(r->soptions); + const auto& fs = r->options.env->GetFileSystem(); + + s = fs->GetFileSize(file_path, fopts.io_options, &file_size, nullptr); if (s.ok()) { - s = r->options.env->NewRandomAccessFile(file_path, &file, r->soptions); + s = fs->NewRandomAccessFile(file_path, fopts, &file, nullptr); } if (s.ok()) { - file_reader.reset(new RandomAccessFileReader( - NewLegacyRandomAccessFileWrapper(file), file_path)); + file_reader.reset(new RandomAccessFileReader(std::move(file), file_path)); } if (s.ok()) { TableReaderOptions t_opt(r->ioptions, r->moptions.prefix_extractor.get(), diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index d90f97881..8e4e264e2 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -8,8 +8,8 @@ #include #include "db/dbformat.h" -#include "env/composite_env_wrapper.h" #include "file/writable_file_writer.h" +#include "rocksdb/file_system.h" #include "rocksdb/table.h" #include "table/block_based/block_based_table_builder.h" #include "table/sst_file_writer_collectors.h" @@ -182,8 +182,9 @@ SstFileWriter::~SstFileWriter() { Status SstFileWriter::Open(const std::string& file_path) { Rep* r = rep_.get(); Status s; - std::unique_ptr sst_file; - s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options); + std::unique_ptr sst_file; + s = r->ioptions.env->GetFileSystem()->NewWritableFile( + file_path, r->env_options, &sst_file, nullptr); if (!s.ok()) { return s; } @@ -255,10 +256,11 @@ Status SstFileWriter::Open(const std::string& file_path) { r->column_family_name, unknown_level, 0 /* creation_time */, 0 /* oldest_key_time */, 0 /* target_file_size */, 0 /* file_creation_time */, "SST Writer" /* db_id */, db_session_id); + r->file_writer.reset(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(sst_file)), file_path, - r->env_options, r->ioptions.env->GetSystemClock(), - nullptr /* io_tracer */, nullptr /* stats */, r->ioptions.listeners, + std::move(sst_file), file_path, r->env_options, + r->ioptions.env->GetSystemClock(), nullptr /* io_tracer */, + nullptr /* stats */, r->ioptions.listeners, r->ioptions.file_checksum_gen_factory)); // TODO(tec) : If table_factory is using compressed block cache, we will diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index f2825dccb..9982c5748 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -13,10 +13,10 @@ int main() { #include "db/db_impl/db_impl.h" #include "db/dbformat.h" -#include "env/composite_env_wrapper.h" #include "file/random_access_file_reader.h" #include "monitoring/histogram.h" #include "rocksdb/db.h" +#include "rocksdb/file_system.h" #include "rocksdb/slice_transform.h" #include "rocksdb/system_clock.h" #include "rocksdb/table.h" @@ -92,14 +92,13 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, const MutableCFOptions moptions(cfo); std::unique_ptr file_writer; if (!through_db) { - std::unique_ptr file; - env->NewWritableFile(file_name, &file, env_options); + ASSERT_OK(WritableFileWriter::Create(env->GetFileSystem(), file_name, + FileOptions(env_options), &file_writer, + nullptr)); std::vector > int_tbl_prop_collector_factories; - file_writer.reset(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(file)), file_name, env_options)); int unknown_level = -1; tb = opts.table_factory->NewTableBuilder( TableBuilderOptions( @@ -133,17 +132,19 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, std::unique_ptr table_reader; if (!through_db) { - std::unique_ptr raf; - s = env->NewRandomAccessFile(file_name, &raf, env_options); + const auto& fs = env->GetFileSystem(); + FileOptions fopts(env_options); + + std::unique_ptr raf; + s = fs->NewRandomAccessFile(file_name, fopts, &raf, nullptr); if (!s.ok()) { fprintf(stderr, "Create File Error: %s\n", s.ToString().c_str()); exit(1); } uint64_t file_size; - env->GetFileSize(file_name, &file_size); + fs->GetFileSize(file_name, fopts.io_options, &file_size, nullptr); std::unique_ptr file_reader( - new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(raf), - file_name)); + new RandomAccessFileReader(std::move(raf), file_name)); s = opts.table_factory->NewTableReader( TableReaderOptions(ioptions, moptions.prefix_extractor.get(), env_options, ikc), diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index e1d0f6084..02643cfae 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -22,7 +22,6 @@ #include "db/dbformat.h" #include "db/log_reader.h" #include "db/write_batch_internal.h" -#include "env/composite_env_wrapper.h" #include "file/filename.h" #include "rocksdb/cache.h" #include "rocksdb/file_checksum.h" @@ -2290,19 +2289,11 @@ class InMemoryHandler : public WriteBatch::Handler { void DumpWalFile(Options options, std::string wal_file, bool print_header, bool print_values, bool is_write_committed, LDBCommandExecuteResult* exec_state) { - Env* env = options.env; - EnvOptions soptions(options); + const auto& fs = options.env->GetFileSystem(); + FileOptions soptions(options); std::unique_ptr wal_file_reader; - - Status status; - { - std::unique_ptr file; - status = env->NewSequentialFile(wal_file, &file, soptions); - if (status.ok()) { - wal_file_reader.reset(new SequentialFileReader( - NewLegacySequentialFileWrapper(file), wal_file)); - } - } + Status status = SequentialFileReader::Create(fs, wal_file, soptions, + &wal_file_reader, nullptr); if (!status.ok()) { if (exec_state) { *exec_state = LDBCommandExecuteResult::Failed("Failed to open WAL file " + diff --git a/tools/sst_dump_test.cc b/tools/sst_dump_test.cc index 4342d9364..e0db79aa9 100644 --- a/tools/sst_dump_test.cc +++ b/tools/sst_dump_test.cc @@ -94,21 +94,20 @@ class SSTDumpToolTest : public testing::Test { void createSST(const Options& opts, const std::string& file_name) { Env* test_env = opts.env; - EnvOptions env_options(opts); + FileOptions file_options(opts); ReadOptions read_options; const ImmutableCFOptions imoptions(opts); const MutableCFOptions moptions(opts); ROCKSDB_NAMESPACE::InternalKeyComparator ikc(opts.comparator); std::unique_ptr tb; - std::unique_ptr file; - ASSERT_OK(test_env->NewWritableFile(file_name, &file, env_options)); std::vector > int_tbl_prop_collector_factories; - std::unique_ptr file_writer( - new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)), - file_name, EnvOptions())); + std::unique_ptr file_writer; + ASSERT_OK(WritableFileWriter::Create(test_env->GetFileSystem(), file_name, + file_options, &file_writer, nullptr)); + std::string column_family_name; int unknown_level = -1; tb.reset(opts.table_factory->NewTableBuilder( diff --git a/tools/trace_analyzer_test.cc b/tools/trace_analyzer_test.cc index cb3ec3b01..dc41e9069 100644 --- a/tools/trace_analyzer_test.cc +++ b/tools/trace_analyzer_test.cc @@ -120,9 +120,12 @@ class TraceAnalyzerTest : public testing::Test { void CheckFileContent(const std::vector& cnt, std::string file_path, bool full_content) { - ASSERT_OK(env_->FileExists(file_path)); - std::unique_ptr f_ptr; - ASSERT_OK(env_->NewSequentialFile(file_path, &f_ptr, env_options_)); + const auto& fs = env_->GetFileSystem(); + FileOptions fopts(env_options_); + + ASSERT_OK(fs->FileExists(file_path, fopts.io_options, nullptr)); + std::unique_ptr file; + ASSERT_OK(fs->NewSequentialFile(file_path, fopts, &file, nullptr)); std::string get_line; std::istringstream iss; @@ -130,8 +133,6 @@ class TraceAnalyzerTest : public testing::Test { std::vector result; uint32_t count; Status s; - std::unique_ptr file = - NewLegacySequentialFileWrapper(f_ptr); SequentialFileReader sf_reader(std::move(file), file_path, 4096 /* filereadahead_size */); diff --git a/tools/trace_analyzer_tool.cc b/tools/trace_analyzer_tool.cc index c861225b1..9f9a36636 100644 --- a/tools/trace_analyzer_tool.cc +++ b/tools/trace_analyzer_tool.cc @@ -1047,18 +1047,17 @@ Status TraceAnalyzer::ReProcessing() { std::vector prefix(kTaTypeNum); std::istringstream iss; bool has_data = true; - std::unique_ptr wkey_input_f; + std::unique_ptr file; - s = env_->NewSequentialFile(whole_key_path, &wkey_input_f, env_options_); + s = env_->GetFileSystem()->NewSequentialFile( + whole_key_path, FileOptions(env_options_), &file, nullptr); if (!s.ok()) { fprintf(stderr, "Cannot open the whole key space file of CF: %u\n", cf_id); - wkey_input_f.reset(); + file.reset(); } - if (wkey_input_f) { - std::unique_ptr file; - file = NewLegacySequentialFileWrapper(wkey_input_f); + if (file) { size_t kTraceFileReadaheadSize = 2 * 1024 * 1024; SequentialFileReader sf_reader( std::move(file), whole_key_path, diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index c883e9016..cf0b4718f 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -5,11 +5,12 @@ // #include #include -#include "env/composite_env_wrapper.h" + #include "file/random_access_file_reader.h" #include "file/readahead_raf.h" #include "file/sequence_file_reader.h" #include "file/writable_file_writer.h" +#include "rocksdb/file_system.h" #include "test_util/testharness.h" #include "test_util/testutil.h" #include "util/random.h" @@ -21,42 +22,64 @@ class WritableFileWriterTest : public testing::Test {}; const uint32_t kMb = 1 << 20; TEST_F(WritableFileWriterTest, RangeSync) { - class FakeWF : public WritableFile { + class FakeWF : public FSWritableFile { public: explicit FakeWF() : size_(0), last_synced_(0) {} ~FakeWF() override {} - Status Append(const Slice& data) override { + using FSWritableFile::Append; + IOStatus Append(const Slice& data, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { size_ += data.size(); - return Status::OK(); + return IOStatus::OK(); + } + IOStatus Truncate(uint64_t /*size*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); } - Status Truncate(uint64_t /*size*/) override { return Status::OK(); } - Status Close() override { + IOStatus Close(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { EXPECT_GE(size_, last_synced_ + kMb); EXPECT_LT(size_, last_synced_ + 2 * kMb); // Make sure random writes generated enough writes. EXPECT_GT(size_, 10 * kMb); - return Status::OK(); + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Fsync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); } - Status Flush() override { return Status::OK(); } - Status Sync() override { return Status::OK(); } - Status Fsync() override { return Status::OK(); } void SetIOPriority(Env::IOPriority /*pri*/) override {} - uint64_t GetFileSize() override { return size_; } + uint64_t GetFileSize(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return size_; + } void GetPreallocationStatus(size_t* /*block_size*/, size_t* /*last_allocated_block*/) override {} size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override { return 0; } - Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override { - return Status::OK(); + IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override { + return IOStatus::OK(); } protected: - Status Allocate(uint64_t /*offset*/, uint64_t /*len*/) override { - return Status::OK(); + IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); } - Status RangeSync(uint64_t offset, uint64_t nbytes) override { + IOStatus RangeSync(uint64_t offset, uint64_t nbytes, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { EXPECT_EQ(offset % 4096, 0u); EXPECT_EQ(nbytes % 4096, 0u); @@ -66,7 +89,7 @@ TEST_F(WritableFileWriterTest, RangeSync) { if (size_ > 2 * kMb) { EXPECT_LT(size_, last_synced_ + 2 * kMb); } - return Status::OK(); + return IOStatus::OK(); } uint64_t size_; @@ -77,8 +100,7 @@ TEST_F(WritableFileWriterTest, RangeSync) { env_options.bytes_per_sync = kMb; std::unique_ptr wf(new FakeWF); std::unique_ptr writer( - new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)), - "" /* don't care */, env_options)); + new WritableFileWriter(std::move(wf), "" /* don't care */, env_options)); Random r(301); Status s; std::unique_ptr large_buf(new char[10 * kMb]); @@ -99,7 +121,7 @@ TEST_F(WritableFileWriterTest, RangeSync) { } TEST_F(WritableFileWriterTest, IncrementalBuffer) { - class FakeWF : public WritableFile { + class FakeWF : public FSWritableFile { public: explicit FakeWF(std::string* _file_data, bool _use_direct_io, bool _no_flush) @@ -108,37 +130,58 @@ TEST_F(WritableFileWriterTest, IncrementalBuffer) { no_flush_(_no_flush) {} ~FakeWF() override {} - Status Append(const Slice& data) override { + using FSWritableFile::Append; + IOStatus Append(const Slice& data, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { file_data_->append(data.data(), data.size()); size_ += data.size(); - return Status::OK(); + return IOStatus::OK(); } - Status PositionedAppend(const Slice& data, uint64_t pos) override { + using FSWritableFile::PositionedAppend; + IOStatus PositionedAppend(const Slice& data, uint64_t pos, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { EXPECT_TRUE(pos % 512 == 0); EXPECT_TRUE(data.size() % 512 == 0); file_data_->resize(pos); file_data_->append(data.data(), data.size()); size_ += data.size(); - return Status::OK(); + return IOStatus::OK(); } - Status Truncate(uint64_t size) override { + IOStatus Truncate(uint64_t size, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { file_data_->resize(size); - return Status::OK(); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Fsync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); } - Status Close() override { return Status::OK(); } - Status Flush() override { return Status::OK(); } - Status Sync() override { return Status::OK(); } - Status Fsync() override { return Status::OK(); } void SetIOPriority(Env::IOPriority /*pri*/) override {} - uint64_t GetFileSize() override { return size_; } + uint64_t GetFileSize(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return size_; + } void GetPreallocationStatus(size_t* /*block_size*/, size_t* /*last_allocated_block*/) override {} size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override { return 0; } - Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override { - return Status::OK(); + IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override { + return IOStatus::OK(); } bool use_direct_io() const override { return use_direct_io_; } @@ -163,9 +206,8 @@ TEST_F(WritableFileWriterTest, IncrementalBuffer) { false, #endif no_flush)); - std::unique_ptr writer( - new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)), - "" /* don't care */, env_options)); + std::unique_ptr writer(new WritableFileWriter( + std::move(wf), "" /* don't care */, env_options)); std::string target; for (int i = 0; i < 20; i++) { @@ -188,26 +230,41 @@ TEST_F(WritableFileWriterTest, IncrementalBuffer) { #ifndef ROCKSDB_LITE TEST_F(WritableFileWriterTest, AppendStatusReturn) { - class FakeWF : public WritableFile { + class FakeWF : public FSWritableFile { public: explicit FakeWF() : use_direct_io_(false), io_error_(false) {} bool use_direct_io() const override { return use_direct_io_; } - Status Append(const Slice& /*data*/) override { + + using FSWritableFile::Append; + IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { if (io_error_) { - return Status::IOError("Fake IO error"); + return IOStatus::IOError("Fake IO error"); } - return Status::OK(); + return IOStatus::OK(); } - Status PositionedAppend(const Slice& /*data*/, uint64_t) override { + using FSWritableFile::PositionedAppend; + IOStatus PositionedAppend(const Slice& /*data*/, uint64_t, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { if (io_error_) { - return Status::IOError("Fake IO error"); + return IOStatus::IOError("Fake IO error"); } - return Status::OK(); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); } - Status Close() override { return Status::OK(); } - Status Flush() override { return Status::OK(); } - Status Sync() override { return Status::OK(); } void Setuse_direct_io(bool val) { use_direct_io_ = val; } void SetIOError(bool val) { io_error_ = val; } @@ -218,15 +275,13 @@ TEST_F(WritableFileWriterTest, AppendStatusReturn) { std::unique_ptr wf(new FakeWF()); wf->Setuse_direct_io(true); std::unique_ptr writer( - new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)), - "" /* don't care */, EnvOptions())); + new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions())); ASSERT_OK(writer->Append(std::string(2 * kMb, 'a'))); // Next call to WritableFile::Append() should fail - LegacyWritableFileWrapper* file = - static_cast(writer->writable_file()); - static_cast(file->target())->SetIOError(true); + FakeWF* fwf = static_cast(writer->writable_file()); + fwf->SetIOError(true); ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b'))); } #endif diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 10245ad22..211da07fe 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -1505,10 +1505,10 @@ Status BackupEngineImpl::CopyOrCreateFile( uint64_t size_limit, std::function progress_callback) { assert(src.empty() != contents.empty()); Status s; - std::unique_ptr dst_file; - std::unique_ptr src_file; - EnvOptions dst_env_options; - dst_env_options.use_mmap_writes = false; + std::unique_ptr dst_file; + std::unique_ptr src_file; + FileOptions dst_file_options; + dst_file_options.use_mmap_writes = false; // TODO:(gzh) maybe use direct reads/writes here if possible if (size != nullptr) { *size = 0; @@ -1520,21 +1520,22 @@ Status BackupEngineImpl::CopyOrCreateFile( size_limit = std::numeric_limits::max(); } - s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options); + s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options, + &dst_file, nullptr); if (s.ok() && !src.empty()) { - s = src_env->NewSequentialFile(src, &src_file, src_env_options); + s = src_env->GetFileSystem()->NewSequentialFile( + src, FileOptions(src_env_options), &src_file, nullptr); } if (!s.ok()) { return s; } - std::unique_ptr dest_writer(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(dst_file)), dst, dst_env_options)); + std::unique_ptr dest_writer( + new WritableFileWriter(std::move(dst_file), dst, dst_file_options)); std::unique_ptr src_reader; std::unique_ptr buf; if (!src.empty()) { - src_reader.reset(new SequentialFileReader( - NewLegacySequentialFileWrapper(src_file), src)); + src_reader.reset(new SequentialFileReader(std::move(src_file), src)); buf.reset(new char[copy_file_buffer_size_]); } @@ -1825,14 +1826,14 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum( size_limit = std::numeric_limits::max(); } - std::unique_ptr src_file; - Status s = src_env->NewSequentialFile(src, &src_file, src_env_options); + std::unique_ptr src_reader; + Status s = SequentialFileReader::Create(src_env->GetFileSystem(), src, + FileOptions(src_env_options), + &src_reader, nullptr); if (!s.ok()) { return s; } - std::unique_ptr src_reader( - new SequentialFileReader(NewLegacySequentialFileWrapper(src_file), src)); std::unique_ptr buf(new char[copy_file_buffer_size_]); Slice data; @@ -2142,15 +2143,12 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( const std::unordered_map& abs_path_to_size) { assert(Empty()); Status s; - std::unique_ptr backup_meta_file; - s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions()); + std::unique_ptr backup_meta_reader; + s = SequentialFileReader::Create(env_->GetFileSystem(), meta_filename_, + FileOptions(), &backup_meta_reader, nullptr); if (!s.ok()) { return s; } - - std::unique_ptr backup_meta_reader( - new SequentialFileReader(NewLegacySequentialFileWrapper(backup_meta_file), - meta_filename_)); std::unique_ptr buf(new char[max_backup_meta_file_size_ + 1]); Slice data; s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get()); diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 608cd334b..777f54887 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -15,7 +15,6 @@ #include "db/blob/blob_index.h" #include "db/db_impl/db_impl.h" #include "db/write_batch_internal.h" -#include "env/composite_env_wrapper.h" #include "file/file_util.h" #include "file/filename.h" #include "file/random_access_file_reader.h" @@ -81,7 +80,7 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, bdb_options_(blob_db_options), db_options_(db_options), cf_options_(cf_options), - env_options_(db_options), + file_options_(db_options), statistics_(db_options_.statistics.get()), next_file_number_(1), flush_sequence_(0), @@ -96,7 +95,7 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, blob_dir_ = (bdb_options_.path_relative) ? dbname + "/" + bdb_options_.blob_dir : bdb_options_.blob_dir; - env_options_.bytes_per_sync = blob_db_options.bytes_per_sync; + file_options_.bytes_per_sync = blob_db_options.bytes_per_sync; } BlobDBImpl::~BlobDBImpl() { @@ -346,7 +345,8 @@ Status BlobDBImpl::OpenAllBlobFiles() { blob_file->MarkImmutable(/* sequence */ 0); // Read file header and footer - Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_); + Status read_metadata_status = + blob_file->ReadMetadata(env_->GetFileSystem(), file_options_); if (read_metadata_status.IsCorruption()) { // Remove incomplete file. if (!obsolete_files_.empty()) { @@ -679,7 +679,7 @@ Status BlobDBImpl::GetBlobFileReader( std::shared_ptr* reader) { assert(reader != nullptr); bool fresh_open = false; - Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open); + Status s = blob_file->GetReader(env_, file_options_, reader, &fresh_open); if (s.ok() && fresh_open) { assert(*reader != nullptr); open_file_count_++; @@ -720,21 +720,23 @@ void BlobDBImpl::RegisterBlobFile(std::shared_ptr blob_file) { Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { std::string fpath(bfile->PathName()); - std::unique_ptr wfile; + std::unique_ptr wfile; + const auto& fs = env_->GetFileSystem(); - Status s = env_->ReopenWritableFile(fpath, &wfile, env_options_); + Status s = fs->ReopenWritableFile(fpath, file_options_, &wfile, nullptr); if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, "Failed to open blob file for write: %s status: '%s'" " exists: '%s'", fpath.c_str(), s.ToString().c_str(), - env_->FileExists(fpath).ToString().c_str()); + fs->FileExists(fpath, file_options_.io_options, nullptr) + .ToString() + .c_str()); return s; } std::unique_ptr fwriter; - fwriter.reset(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(wfile)), fpath, env_options_)); + fwriter.reset(new WritableFileWriter(std::move(wfile), fpath, file_options_)); uint64_t boffset = bfile->GetFileSize(); if (debug_level_ >= 2 && boffset) { diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index ecc302ff1..14f45e5b6 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -24,6 +24,7 @@ #include "db/db_iter.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" +#include "rocksdb/file_system.h" #include "rocksdb/listener.h" #include "rocksdb/options.h" #include "rocksdb/statistics.h" @@ -407,7 +408,7 @@ class BlobDBImpl : public BlobDB { BlobDBOptions bdb_options_; DBOptions db_options_; ColumnFamilyOptions cf_options_; - EnvOptions env_options_; + FileOptions file_options_; // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold // ownership. diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index 8c95b78f4..f12c3b9b3 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -15,7 +15,6 @@ #include "db/column_family.h" #include "db/db_impl/db_impl.h" #include "db/dbformat.h" -#include "env/composite_env_wrapper.h" #include "file/filename.h" #include "file/readahead_raf.h" #include "logging/logging.h" @@ -151,7 +150,7 @@ void BlobFile::CloseRandomAccessLocked() { last_access_ = -1; } -Status BlobFile::GetReader(Env* env, const EnvOptions& env_options, +Status BlobFile::GetReader(Env* env, const FileOptions& file_options, std::shared_ptr* reader, bool* fresh_open) { assert(reader != nullptr); @@ -178,8 +177,9 @@ Status BlobFile::GetReader(Env* env, const EnvOptions& env_options, return s; } - std::unique_ptr rfile; - s = env->NewRandomAccessFile(PathName(), &rfile, env_options); + std::unique_ptr rfile; + s = env->GetFileSystem()->NewRandomAccessFile(PathName(), file_options, + &rfile, nullptr); if (!s.ok()) { ROCKS_LOG_ERROR(info_log_, "Failed to open blob file for random-read: %s status: '%s'" @@ -189,18 +189,20 @@ Status BlobFile::GetReader(Env* env, const EnvOptions& env_options, return s; } - ra_file_reader_ = std::make_shared( - NewLegacyRandomAccessFileWrapper(rfile), PathName()); + ra_file_reader_ = + std::make_shared(std::move(rfile), PathName()); *reader = ra_file_reader_; *fresh_open = true; return s; } -Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) { +Status BlobFile::ReadMetadata(const std::shared_ptr& fs, + const FileOptions& file_options) { assert(Immutable()); // Get file size. uint64_t file_size = 0; - Status s = env->GetFileSize(PathName(), &file_size); + Status s = + fs->GetFileSize(PathName(), file_options.io_options, &file_size, nullptr); if (s.ok()) { file_size_ = file_size; } else { @@ -219,17 +221,15 @@ Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) { } // Create file reader. - std::unique_ptr file; - s = env->NewRandomAccessFile(PathName(), &file, env_options); + std::unique_ptr file_reader; + s = RandomAccessFileReader::Create(fs, PathName(), file_options, &file_reader, + nullptr); if (!s.ok()) { ROCKS_LOG_ERROR(info_log_, "Failed to open blob file %" PRIu64 ", status: %s", file_number_, s.ToString().c_str()); return s; } - std::unique_ptr file_reader( - new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), - PathName())); // Read file header. std::string header_buf; diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h index 9de9a3e91..6f3f2bea7 100644 --- a/utilities/blob_db/blob_file.h +++ b/utilities/blob_db/blob_file.h @@ -15,6 +15,7 @@ #include "file/random_access_file_reader.h" #include "port/port.h" #include "rocksdb/env.h" +#include "rocksdb/file_system.h" #include "rocksdb/options.h" namespace ROCKSDB_NAMESPACE { @@ -208,9 +209,10 @@ class BlobFile { // Read blob file header and footer. Return corruption if file header is // malform or incomplete. If footer is malform or incomplete, set // footer_valid_ to false and return Status::OK. - Status ReadMetadata(Env* env, const EnvOptions& env_options); + Status ReadMetadata(const std::shared_ptr& fs, + const FileOptions& file_options); - Status GetReader(Env* env, const EnvOptions& env_options, + Status GetReader(Env* env, const FileOptions& file_options, std::shared_ptr* reader, bool* fresh_open); diff --git a/utilities/persistent_cache/block_cache_tier_file.cc b/utilities/persistent_cache/block_cache_tier_file.cc index d069acb89..2dcabc850 100644 --- a/utilities/persistent_cache/block_cache_tier_file.cc +++ b/utilities/persistent_cache/block_cache_tier_file.cc @@ -33,15 +33,15 @@ Status NewWritableCacheFile(Env* const env, const std::string& filepath, return s; } -Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath, - std::unique_ptr* file, +Status NewRandomAccessCacheFile(const std::shared_ptr& fs, + const std::string& filepath, + std::unique_ptr* file, const bool use_direct_reads = true) { - assert(env); + assert(fs.get()); - EnvOptions opt; + FileOptions opt; opt.use_direct_reads = use_direct_reads; - Status s = env->NewRandomAccessFile(filepath, file, opt); - return s; + return fs->NewRandomAccessFile(filepath, opt, file, nullptr); } // @@ -210,17 +210,18 @@ bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) { rwlock_.AssertHeld(); ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str()); + assert(env_); - std::unique_ptr file; - Status status = - NewRandomAccessCacheFile(env_, Path(), &file, enable_direct_reads); + std::unique_ptr file; + Status status = NewRandomAccessCacheFile(env_->GetFileSystem(), Path(), &file, + enable_direct_reads); if (!status.ok()) { Error(log_, "Error opening random access file %s. %s", Path().c_str(), status.ToString().c_str()); return false; } - freader_.reset(new RandomAccessFileReader( - NewLegacyRandomAccessFileWrapper(file), Path(), env_->GetSystemClock())); + freader_.reset(new RandomAccessFileReader(std::move(file), Path(), + env_->GetSystemClock())); return true; } diff --git a/utilities/simulator_cache/sim_cache.cc b/utilities/simulator_cache/sim_cache.cc index 5d528fc85..0da71ab16 100644 --- a/utilities/simulator_cache/sim_cache.cc +++ b/utilities/simulator_cache/sim_cache.cc @@ -4,12 +4,14 @@ // (found in the LICENSE.Apache file in the root directory). #include "rocksdb/utilities/sim_cache.h" + #include -#include "env/composite_env_wrapper.h" + #include "file/writable_file_writer.h" #include "monitoring/statistics.h" #include "port/port.h" #include "rocksdb/env.h" +#include "rocksdb/file_system.h" #include "util/mutexlock.h" #include "util/string_util.h" @@ -35,8 +37,7 @@ class CacheActivityLogger { assert(env != nullptr); Status status; - EnvOptions env_opts; - std::unique_ptr log_file; + FileOptions file_opts; MutexLock l(&mutex_); @@ -44,13 +45,11 @@ class CacheActivityLogger { StopLoggingInternal(); // Open log file - status = env->NewWritableFile(activity_log_file, &log_file, env_opts); + status = WritableFileWriter::Create(env->GetFileSystem(), activity_log_file, + file_opts, &file_writer_, nullptr); if (!status.ok()) { return status; } - file_writer_.reset(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(log_file)), activity_log_file, - env_opts)); max_logging_size_ = max_logging_size; activity_logging_enabled_.store(true); diff --git a/utilities/trace/file_trace_reader_writer.cc b/utilities/trace/file_trace_reader_writer.cc index 3ee096a4b..d553e2434 100644 --- a/utilities/trace/file_trace_reader_writer.cc +++ b/utilities/trace/file_trace_reader_writer.cc @@ -92,15 +92,13 @@ uint64_t FileTraceWriter::GetFileSize() { return file_writer_->GetFileSize(); } Status NewFileTraceReader(Env* env, const EnvOptions& env_options, const std::string& trace_filename, std::unique_ptr* trace_reader) { - std::unique_ptr trace_file; - Status s = env->NewRandomAccessFile(trace_filename, &trace_file, env_options); + std::unique_ptr file_reader; + Status s = RandomAccessFileReader::Create( + env->GetFileSystem(), trace_filename, FileOptions(env_options), + &file_reader, nullptr); if (!s.ok()) { return s; } - - std::unique_ptr file_reader; - file_reader.reset(new RandomAccessFileReader( - NewLegacyRandomAccessFileWrapper(trace_file), trace_filename)); trace_reader->reset(new FileTraceReader(std::move(file_reader))); return s; } @@ -108,16 +106,13 @@ Status NewFileTraceReader(Env* env, const EnvOptions& env_options, Status NewFileTraceWriter(Env* env, const EnvOptions& env_options, const std::string& trace_filename, std::unique_ptr* trace_writer) { - std::unique_ptr trace_file; - Status s = env->NewWritableFile(trace_filename, &trace_file, env_options); + std::unique_ptr file_writer; + Status s = WritableFileWriter::Create(env->GetFileSystem(), trace_filename, + FileOptions(env_options), &file_writer, + nullptr); if (!s.ok()) { return s; } - - std::unique_ptr file_writer; - file_writer.reset(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(trace_file)), trace_filename, - env_options)); trace_writer->reset(new FileTraceWriter(std::move(file_writer))); return s; }