diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 8514bb1c4..75e823972 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -861,7 +861,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, } } file_reader.reset(new SequentialFileReader( - std::move(file), fname, immutable_db_options_.log_readahead_size)); + std::move(file), fname, immutable_db_options_.log_readahead_size, + io_tracer_)); } // Create the log reader. diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 446835b3f..63fb1c1db 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -153,7 +153,8 @@ Status DBImplSecondary::MaybeInitLogReader( return status; } file_reader.reset(new SequentialFileReader( - std::move(file), fname, immutable_db_options_.log_readahead_size)); + std::move(file), fname, immutable_db_options_.log_readahead_size, + io_tracer_)); } // Create the log reader. diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 31cceea51..fad62004b 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -137,7 +137,7 @@ Status ExternalSstFileIngestionJob::Prepare( nullptr); // CopyFile also sync the new file. status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0, - db_options_.use_fsync); + db_options_.use_fsync, io_tracer_); } TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded"); if (!status.ok()) { diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 3aa244bd8..a2782f54a 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -90,7 +90,8 @@ class ExternalSstFileIngestionJob { directories_(directories), event_logger_(event_logger), job_start_time_(env_->NowMicros()), - consumed_seqno_count_(0) { + consumed_seqno_count_(0), + io_tracer_(io_tracer) { assert(directories != nullptr); } @@ -188,6 +189,7 @@ class ExternalSstFileIngestionJob { // Set in ExternalSstFileIngestionJob::Prepare(), if true and DB // file_checksum_gen_factory is set, DB will generate checksum each file. bool need_generate_file_checksum_{true}; + std::shared_ptr io_tracer_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/import_column_family_job.cc b/db/import_column_family_job.cc index 20bb27cb8..593b8cbe7 100644 --- a/db/import_column_family_job.cc +++ b/db/import_column_family_job.cc @@ -101,7 +101,7 @@ Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number, } if (!hardlink_files) { status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0, - db_options_.use_fsync); + db_options_.use_fsync, io_tracer_); } if (!status.ok()) { break; diff --git a/db/import_column_family_job.h b/db/import_column_family_job.h index ef681862b..6cdde2473 100644 --- a/db/import_column_family_job.h +++ b/db/import_column_family_job.h @@ -33,7 +33,8 @@ class ImportColumnFamilyJob { fs_(db_options_.fs, io_tracer), env_options_(env_options), import_options_(import_options), - metadata_(metadata) {} + metadata_(metadata), + io_tracer_(io_tracer) {} // Prepare the job by copying external files into the DB. Status Prepare(uint64_t next_file_number, SuperVersion* sv); @@ -68,6 +69,7 @@ class ImportColumnFamilyJob { VersionEdit edit_; const ImportColumnFamilyOptions& import_options_; std::vector metadata_; + const std::shared_ptr io_tracer_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 5e094e414..e6180903f 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -63,7 +63,8 @@ Status TransactionLogIteratorImpl::OpenLogFile( } } if (s.ok()) { - file_reader->reset(new SequentialFileReader(std::move(file), fname)); + file_reader->reset( + new SequentialFileReader(std::move(file), fname, io_tracer_)); } return s; } diff --git a/db/version_set.cc b/db/version_set.cc index b30befe38..1c8242c23 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4558,7 +4558,7 @@ Status VersionSet::Recover( } manifest_file_reader.reset( new SequentialFileReader(std::move(manifest_file), manifest_path, - db_options_->log_readahead_size)); + db_options_->log_readahead_size, io_tracer_)); } VersionBuilderMap builders; @@ -4840,7 +4840,7 @@ Status VersionSet::TryRecoverFromOneManifest( } manifest_file_reader.reset( new SequentialFileReader(std::move(manifest_file), manifest_path, - db_options_->log_readahead_size)); + db_options_->log_readahead_size, io_tracer_)); } assert(s.ok()); @@ -4881,7 +4881,8 @@ Status VersionSet::ListColumnFamilies(std::vector* column_families, if (!s.ok()) { return s; } - file_reader.reset(new SequentialFileReader(std::move(file), manifest_path)); + file_reader.reset(new SequentialFileReader(std::move(file), manifest_path, + nullptr /*IOTracer*/)); } std::map column_family_names; @@ -5070,7 +5071,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, return s; } file_reader.reset(new SequentialFileReader( - std::move(file), dscname, db_options_->log_readahead_size)); + std::move(file), dscname, db_options_->log_readahead_size, io_tracer_)); } bool have_prev_log_number = false; @@ -6368,9 +6369,9 @@ Status ReactiveVersionSet::MaybeSwitchManifest( } std::unique_ptr manifest_file_reader; if (s.ok()) { - manifest_file_reader.reset( - new SequentialFileReader(std::move(manifest_file), manifest_path, - db_options_->log_readahead_size)); + manifest_file_reader.reset(new SequentialFileReader( + std::move(manifest_file), manifest_path, + db_options_->log_readahead_size, io_tracer_)); manifest_reader->reset(new log::FragmentBufferedReader( nullptr, std::move(manifest_file_reader), reporter, true /* checksum */, 0 /* log_number */)); diff --git a/db/wal_manager.cc b/db/wal_manager.cc index f4f3c80a4..a2e88e2c2 100644 --- a/db/wal_manager.cc +++ b/db/wal_manager.cc @@ -467,7 +467,7 @@ Status WalManager::ReadFirstLine(const std::string& fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr); std::unique_ptr file_reader( - new SequentialFileReader(std::move(file), fname)); + new SequentialFileReader(std::move(file), fname, io_tracer_)); if (!status.ok()) { return status; diff --git a/env/file_system_tracer.h b/env/file_system_tracer.h index 9a0fd18f1..8e91692cc 100644 --- a/env/file_system_tracer.h +++ b/env/file_system_tracer.h @@ -131,26 +131,33 @@ class FSSequentialFileTracingWrapper : public FSSequentialFileWrapper { // FSSequentialFileTracingWrapper when tracing is disabled. class FSSequentialFilePtr { public: - FSSequentialFilePtr(FSSequentialFile* fs, std::shared_ptr io_tracer) - : fs_(fs), io_tracer_(io_tracer) { - fs_tracer_ = new FSSequentialFileTracingWrapper(fs_, io_tracer_); + FSSequentialFilePtr() {} + FSSequentialFilePtr(std::unique_ptr&& fs, + const std::shared_ptr& io_tracer) + : fs_(std::move(fs)), io_tracer_(io_tracer) { + fs_tracer_.reset(new FSSequentialFileTracingWrapper(fs_.get(), io_tracer_)); } - explicit FSSequentialFilePtr(FSSequentialFile* fs) - : fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {} - FSSequentialFile* operator->() const { if (io_tracer_ && io_tracer_->is_tracing_enabled()) { - return fs_tracer_; + return fs_tracer_.get(); } else { - return fs_; + return fs_.get(); + } + } + + FSSequentialFile* get() const { + if (io_tracer_ && io_tracer_->is_tracing_enabled()) { + return fs_tracer_.get(); + } else { + return fs_.get(); } } private: - FSSequentialFile* fs_; + std::unique_ptr fs_; std::shared_ptr io_tracer_; - FSSequentialFileTracingWrapper* fs_tracer_; + std::unique_ptr fs_tracer_; }; // FSRandomAccessFileTracingWrapper is a wrapper class above FSRandomAccessFile diff --git a/file/file_util.cc b/file/file_util.cc index 0fda7aa83..9953c8d51 100644 --- a/file/file_util.cc +++ b/file/file_util.cc @@ -18,8 +18,8 @@ namespace ROCKSDB_NAMESPACE { // Utility function to copy a file up to a specified length IOStatus CopyFile(FileSystem* fs, const std::string& source, - const std::string& destination, uint64_t size, - bool use_fsync) { + const std::string& destination, uint64_t size, bool use_fsync, + const std::shared_ptr& io_tracer) { const FileOptions soptions; IOStatus io_s; std::unique_ptr src_reader; @@ -44,7 +44,8 @@ IOStatus CopyFile(FileSystem* fs, const std::string& source, return io_s; } } - src_reader.reset(new SequentialFileReader(std::move(srcfile), source)); + src_reader.reset( + new SequentialFileReader(std::move(srcfile), source, io_tracer)); dest_writer.reset( new WritableFileWriter(std::move(destfile), destination, soptions)); } diff --git a/file/file_util.h b/file/file_util.h index 17b058038..21dee9e93 100644 --- a/file/file_util.h +++ b/file/file_util.h @@ -13,13 +13,15 @@ #include "rocksdb/sst_file_writer.h" #include "rocksdb/status.h" #include "rocksdb/types.h" +#include "trace_replay/io_tracer.h" namespace ROCKSDB_NAMESPACE { // use_fsync maps to options.use_fsync, which determines the way that // the file is synced after copying. extern IOStatus CopyFile(FileSystem* fs, const std::string& source, const std::string& destination, uint64_t size, - bool use_fsync); + bool use_fsync, + const std::shared_ptr& io_tracer = nullptr); extern IOStatus CreateFile(FileSystem* fs, const std::string& destination, const std::string& contents, bool use_fsync); diff --git a/file/sequence_file_reader.h b/file/sequence_file_reader.h index 2f0898b42..cd41f187b 100644 --- a/file/sequence_file_reader.h +++ b/file/sequence_file_reader.h @@ -10,6 +10,8 @@ #pragma once #include #include + +#include "env/file_system_tracer.h" #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/file_system.h" @@ -21,20 +23,23 @@ namespace ROCKSDB_NAMESPACE { // cache disabled) reads appropriately, and also updates the IO stats. class SequentialFileReader { private: - std::unique_ptr file_; std::string file_name_; + FSSequentialFilePtr file_; std::atomic offset_{0}; // read offset public: - explicit SequentialFileReader(std::unique_ptr&& _file, - const std::string& _file_name) - : file_(std::move(_file)), file_name_(_file_name) {} + explicit SequentialFileReader( + std::unique_ptr&& _file, const std::string& _file_name, + const std::shared_ptr& io_tracer = nullptr) + : file_name_(_file_name), file_(std::move(_file), io_tracer) {} - explicit SequentialFileReader(std::unique_ptr&& _file, - const std::string& _file_name, - size_t _readahead_size) - : file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size)), - file_name_(_file_name) {} + explicit SequentialFileReader( + std::unique_ptr&& _file, const std::string& _file_name, + size_t _readahead_size, + const std::shared_ptr& io_tracer = nullptr) + : file_name_(_file_name), + file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size), + io_tracer) {} SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { *this = std::move(o);