diff --git a/CMakeLists.txt b/CMakeLists.txt index 34f368a2a..81010f972 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1118,6 +1118,7 @@ if(WITH_TESTS) table/table_test.cc table/block_fetcher_test.cc test_util/testutil_test.cc + trace_replay/io_tracer_test.cc tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc tools/ldb_cmd_test.cc tools/reduce_levels_test.cc diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 745a568c6..74f3c7ccd 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -328,8 +328,9 @@ CompactionJob::CompactionJob( const SnapshotChecker* snapshot_checker, std::shared_ptr table_cache, EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats, - Env::Priority thread_pri, const std::atomic* manual_compaction_paused, - const std::string& db_id, const std::string& db_session_id) + Env::Priority thread_pri, const std::shared_ptr& io_tracer, + const std::atomic* manual_compaction_paused, const std::string& db_id, + const std::string& db_session_id) : job_id_(job_id), compact_(new CompactionState(compaction)), compaction_job_stats_(compaction_job_stats), @@ -340,7 +341,7 @@ CompactionJob::CompactionJob( db_options_(db_options), file_options_(file_options), env_(db_options.env), - fs_(db_options.fs.get()), + fs_(db_options.fs, io_tracer), file_options_for_read_( fs_->OptimizeForCompactionTableRead(file_options, db_options_)), versions_(versions), @@ -1564,7 +1565,8 @@ Status CompactionJob::OpenCompactionOutputFile( &syncpoint_arg); #endif Status s; - IOStatus io_s = NewWritableFile(fs_, fname, &writable_file, file_options_); + IOStatus io_s = + NewWritableFile(fs_.get(), fname, &writable_file, file_options_); s = io_s; if (sub_compact->io_status.ok()) { sub_compact->io_status = io_s; diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 7d7c58ca2..4160063b4 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -62,25 +62,23 @@ class VersionSet; // if needed. class CompactionJob { public: - CompactionJob(int job_id, Compaction* compaction, - const ImmutableDBOptions& db_options, - const FileOptions& file_options, VersionSet* versions, - const std::atomic* shutting_down, - const SequenceNumber preserve_deletes_seqnum, - LogBuffer* log_buffer, FSDirectory* db_directory, - FSDirectory* output_directory, Statistics* stats, - InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, - std::vector existing_snapshots, - SequenceNumber earliest_write_conflict_snapshot, - const SnapshotChecker* snapshot_checker, - std::shared_ptr table_cache, EventLogger* event_logger, - bool paranoid_file_checks, bool measure_io_stats, - const std::string& dbname, - CompactionJobStats* compaction_job_stats, - Env::Priority thread_pri, - const std::atomic* manual_compaction_paused = nullptr, - const std::string& db_id = "", - const std::string& db_session_id = ""); + CompactionJob( + int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, + const FileOptions& file_options, VersionSet* versions, + const std::atomic* shutting_down, + const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, + FSDirectory* db_directory, FSDirectory* output_directory, + Statistics* stats, InstrumentedMutex* db_mutex, + ErrorHandler* db_error_handler, + std::vector existing_snapshots, + SequenceNumber earliest_write_conflict_snapshot, + const SnapshotChecker* snapshot_checker, + std::shared_ptr table_cache, EventLogger* event_logger, + bool paranoid_file_checks, bool measure_io_stats, + const std::string& dbname, CompactionJobStats* compaction_job_stats, + Env::Priority thread_pri, const std::shared_ptr& io_tracer, + const std::atomic* manual_compaction_paused = nullptr, + const std::string& db_id = "", const std::string& db_session_id = ""); ~CompactionJob(); @@ -160,7 +158,7 @@ class CompactionJob { const FileOptions file_options_; Env* env_; - FileSystem* fs_; + FileSystemPtr fs_; // env_option optimized for compaction table reads FileOptions file_options_for_read_; VersionSet* versions_; diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 07f338c76..c6751fc15 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -79,10 +79,10 @@ class CompactionJobTest : public testing::Test { mutable_db_options_(), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), - versions_(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_manager_, - &write_controller_, - /*block_cache_tracer=*/nullptr)), + versions_(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)), shutting_down_(false), preserve_deletes_seqnum_(0), mock_table_factory_(new mock::MockTableFactory()), @@ -249,10 +249,10 @@ class CompactionJobTest : public testing::Test { void NewDB() { DestroyDB(dbname_, Options()); EXPECT_OK(env_->CreateDirIfMissing(dbname_)); - versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_manager_, - &write_controller_, - /*block_cache_tracer=*/nullptr)); + versions_.reset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); compaction_job_stats_.Reset(); SetIdentityFile(env_, dbname_); @@ -334,7 +334,7 @@ class CompactionJobTest : public testing::Test { nullptr, nullptr, &mutex_, &error_handler_, snapshots, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger, false, false, dbname_, &compaction_job_stats_, - Env::Priority::USER); + Env::Priority::USER, nullptr /* IOTracer */); VerifyInitializationOfCompactionJobStats(compaction_job_stats_); compaction_job.Prepare(); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index bf4bf0784..1fdd2bb82 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -151,8 +151,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, initial_db_options_(SanitizeOptions(dbname, options)), env_(initial_db_options_.env), io_tracer_(std::make_shared()), - fs_(initial_db_options_.env->GetFileSystem()), immutable_db_options_(initial_db_options_), + fs_(immutable_db_options_.fs, io_tracer_), mutable_db_options_(initial_db_options_), stats_(immutable_db_options_.statistics.get()), mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS, @@ -197,7 +197,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, unable_to_release_oldest_log_(false), num_running_ingest_file_(0), #ifndef ROCKSDB_LITE - wal_manager_(immutable_db_options_, file_options_, seq_per_batch), + wal_manager_(immutable_db_options_, file_options_, io_tracer_, + seq_per_batch), #endif // ROCKSDB_LITE event_logger_(immutable_db_options_.info_log.get()), bg_work_paused_(0), @@ -245,7 +246,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_, table_cache_.get(), write_buffer_manager_, - &write_controller_, &block_cache_tracer_)); + &write_controller_, &block_cache_tracer_, + io_tracer_)); column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); @@ -3931,7 +3933,7 @@ Status DBImpl::WriteOptionsFile(bool need_mutex_lock, std::string file_name = TempOptionsFileName(GetName(), versions_->NewFileNumber()); Status s = PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name, - GetFileSystem()); + fs_.get()); if (s.ok()) { s = RenameTempFileToOptionsFile(file_name); @@ -4278,7 +4280,7 @@ Status DBImpl::IngestExternalFiles( auto* cfd = static_cast(arg.column_family)->cfd(); ingestion_jobs.emplace_back( env_, versions_.get(), cfd, immutable_db_options_, file_options_, - &snapshots_, arg.options, &directories_, &event_logger_); + &snapshots_, arg.options, &directories_, &event_logger_, io_tracer_); } std::vector> exec_results; for (size_t i = 0; i != num_cfs; ++i) { @@ -4552,7 +4554,7 @@ Status DBImpl::CreateColumnFamilyWithImport( auto cfd = cfh->cfd(); ImportColumnFamilyJob import_job(env_, versions_.get(), cfd, immutable_db_options_, file_options_, - import_options, metadata.files); + import_options, metadata.files, io_tracer_); SuperVersionContext dummy_sv_ctx(/* create_superversion */ true); VersionEdit dummy_edit; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index e970761f4..1db89b3b0 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1012,8 +1012,8 @@ class DBImpl : public DB { const DBOptions initial_db_options_; Env* const env_; std::shared_ptr io_tracer_; - std::shared_ptr fs_; const ImmutableDBOptions immutable_db_options_; + FileSystemPtr fs_; MutableDBOptions mutable_db_options_; Statistics* stats_; std::unordered_map diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 04aaaf80f..b427375f7 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1051,8 +1051,8 @@ Status DBImpl::CompactFilesImpl( snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, - &compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_, - db_id_, db_session_id_); + &compaction_job_stats, Env::Priority::USER, io_tracer_, + &manual_compaction_paused_, db_id_, db_session_id_); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -2846,7 +2846,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, - &compaction_job_stats, thread_pri, + &compaction_job_stats, thread_pri, io_tracer_, is_manual ? &manual_compaction_paused_ : nullptr, db_id_, db_session_id_); compaction_job.Prepare(); diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 940675724..446835b3f 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -611,7 +611,7 @@ Status DB::OpenAsSecondary( impl->versions_.reset(new ReactiveVersionSet( dbname, &impl->immutable_db_options_, impl->file_options_, impl->table_cache_.get(), impl->write_buffer_manager_, - &impl->write_controller_)); + &impl->write_controller_, impl->io_tracer_)); impl->column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet())); impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 537ee04a0..b2f8eacc8 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -936,12 +936,13 @@ class RecoveryTestHelper { std::unique_ptr wal_manager; WriteController write_controller; - versions.reset(new VersionSet(test->dbname_, &db_options, env_options, - table_cache.get(), &write_buffer_manager, - &write_controller, - /*block_cache_tracer=*/nullptr)); + versions.reset(new VersionSet( + test->dbname_, &db_options, env_options, table_cache.get(), + &write_buffer_manager, &write_controller, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); - wal_manager.reset(new WalManager(db_options, env_options)); + wal_manager.reset( + new WalManager(db_options, env_options, /*io_tracer=*/nullptr)); std::unique_ptr current_log_writer; diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 7ae2ca375..31cceea51 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -136,7 +136,7 @@ Status ExternalSstFileIngestionJob::Prepare( TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile", nullptr); // CopyFile also sync the new file. - status = CopyFile(fs_, path_outside_db, path_inside_db, 0, + status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0, db_options_.use_fsync); } TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded"); @@ -194,7 +194,7 @@ Status ExternalSstFileIngestionJob::Prepare( for (size_t i = 0; i < files_to_ingest_.size(); i++) { std::string generated_checksum, generated_checksum_func_name; IOStatus io_s = GenerateOneFileChecksum( - fs_, files_to_ingest_[i].internal_file_path, + fs_.get(), files_to_ingest_[i].internal_file_path, db_options_.file_checksum_gen_factory.get(), &generated_checksum, &generated_checksum_func_name, ingestion_options_.verify_checksums_readahead_size, @@ -831,7 +831,7 @@ IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile( } std::string file_checksum, file_checksum_func_name; IOStatus io_s = GenerateOneFileChecksum( - fs_, file_to_ingest->internal_file_path, + fs_.get(), file_to_ingest->internal_file_path, db_options_.file_checksum_gen_factory.get(), &file_checksum, &file_checksum_func_name, ingestion_options_.verify_checksums_readahead_size, diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index f7723989d..3aa244bd8 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -12,6 +12,7 @@ #include "db/dbformat.h" #include "db/internal_stats.h" #include "db/snapshot_impl.h" +#include "env/file_system_tracer.h" #include "logging/event_logger.h" #include "options/db_options.h" #include "rocksdb/db.h" @@ -76,9 +77,10 @@ class ExternalSstFileIngestionJob { const ImmutableDBOptions& db_options, const EnvOptions& env_options, SnapshotList* db_snapshots, const IngestExternalFileOptions& ingestion_options, - Directories* directories, EventLogger* event_logger) + Directories* directories, EventLogger* event_logger, + const std::shared_ptr& io_tracer) : env_(env), - fs_(db_options.fs.get()), + fs_(db_options.fs, io_tracer), versions_(versions), cfd_(cfd), db_options_(db_options), @@ -167,7 +169,7 @@ class ExternalSstFileIngestionJob { Status SyncIngestedFile(TWritableFile* file); Env* env_; - FileSystem* fs_; + FileSystemPtr fs_; VersionSet* versions_; ColumnFamilyData* cfd_; const ImmutableDBOptions& db_options_; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 1f0fdef57..0ea382e33 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -55,10 +55,10 @@ class FlushJobTest : public testing::Test { db_options_.env = env_; db_options_.fs = fs_; - versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_manager_, - &write_controller_, - /*block_cache_tracer=*/nullptr)); + versions_.reset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); EXPECT_OK(versions_->Recover(column_families, false)); } diff --git a/db/import_column_family_job.cc b/db/import_column_family_job.cc index 4b0502c49..20bb27cb8 100644 --- a/db/import_column_family_job.cc +++ b/db/import_column_family_job.cc @@ -100,7 +100,7 @@ Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number, } } if (!hardlink_files) { - status = CopyFile(fs_, path_outside_db, path_inside_db, 0, + status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0, db_options_.use_fsync); } if (!status.ok()) { diff --git a/db/import_column_family_job.h b/db/import_column_family_job.h index 160fd1247..ef681862b 100644 --- a/db/import_column_family_job.h +++ b/db/import_column_family_job.h @@ -24,12 +24,13 @@ class ImportColumnFamilyJob { const ImmutableDBOptions& db_options, const EnvOptions& env_options, const ImportColumnFamilyOptions& import_options, - const std::vector& metadata) + const std::vector& metadata, + const std::shared_ptr& io_tracer) : env_(env), versions_(versions), cfd_(cfd), db_options_(db_options), - fs_(db_options_.fs.get()), + fs_(db_options_.fs, io_tracer), env_options_(env_options), import_options_(import_options), metadata_(metadata) {} @@ -61,7 +62,7 @@ class ImportColumnFamilyJob { VersionSet* versions_; ColumnFamilyData* cfd_; const ImmutableDBOptions& db_options_; - FileSystem* fs_; + const FileSystemPtr fs_; const EnvOptions& env_options_; autovector files_to_import_; VersionEdit edit_; diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 6e0d33651..2fb52af48 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -100,7 +100,8 @@ class MemTableListTest : public testing::Test { VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, - &write_controller, /*block_cache_tracer=*/nullptr); + &write_controller, /*block_cache_tracer=*/nullptr, + /*io_tracer=*/nullptr); std::vector cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions()); @@ -148,7 +149,8 @@ class MemTableListTest : public testing::Test { VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, - &write_controller, /*block_cache_tracer=*/nullptr); + &write_controller, /*block_cache_tracer=*/nullptr, + /*io_tracer=*/nullptr); std::vector cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions()); diff --git a/db/repair.cc b/db/repair.cc index 91ce51e82..9fb7ff225 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -117,7 +117,7 @@ class Repairer { wc_(db_options_.delayed_write_rate), vset_(dbname_, &immutable_db_options_, env_options_, raw_table_cache_.get(), &wb_, &wc_, - /*block_cache_tracer=*/nullptr), + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr), next_file_number_(1), db_lock_(nullptr) { for (const auto& cfd : column_families) { diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 56bc161a3..5e094e414 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -17,7 +17,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( const TransactionLogIterator::ReadOptions& read_options, const EnvOptions& soptions, const SequenceNumber seq, std::unique_ptr files, VersionSet const* const versions, - const bool seq_per_batch) + const bool seq_per_batch, const std::shared_ptr& io_tracer) : dir_(dir), options_(options), read_options_(read_options), @@ -30,7 +30,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( current_batch_seq_(0), current_last_seq_(0), versions_(versions), - seq_per_batch_(seq_per_batch) { + seq_per_batch_(seq_per_batch), + io_tracer_(io_tracer) { assert(files_ != nullptr); assert(versions_ != nullptr); @@ -42,7 +43,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( Status TransactionLogIteratorImpl::OpenLogFile( const LogFile* log_file, std::unique_ptr* file_reader) { - FileSystem* fs = options_->fs.get(); + FileSystemPtr fs(options_->fs, io_tracer_); std::unique_ptr file; std::string fname; Status s; diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index eb53daf2b..1cea6e0b5 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -63,7 +63,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { const TransactionLogIterator::ReadOptions& read_options, const EnvOptions& soptions, const SequenceNumber seqNum, std::unique_ptr files, VersionSet const* const versions, - const bool seq_per_batch); + const bool seq_per_batch, const std::shared_ptr& io_tracer); virtual bool Valid() override; @@ -122,6 +122,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { // Update current batch if a continuous batch is found, else return false void UpdateCurrentWriteBatch(const Slice& record); Status OpenLogReader(const LogFile* file); + std::shared_ptr io_tracer_; }; } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/db/version_set.cc b/db/version_set.cc index d9795b72f..b30befe38 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3615,12 +3615,13 @@ VersionSet::VersionSet(const std::string& dbname, const FileOptions& storage_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, WriteController* write_controller, - BlockCacheTracer* const block_cache_tracer) + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr& io_tracer) : column_family_set_(new ColumnFamilySet( dbname, _db_options, storage_options, table_cache, write_buffer_manager, write_controller, block_cache_tracer)), env_(_db_options->env), - fs_(_db_options->fs.get()), + fs_(_db_options->fs, io_tracer), dbname_(dbname), db_options_(_db_options), next_file_number_(2), @@ -3634,7 +3635,8 @@ VersionSet::VersionSet(const std::string& dbname, current_version_number_(0), manifest_file_size_(0), file_options_(storage_options), - block_cache_tracer_(block_cache_tracer) {} + block_cache_tracer_(block_cache_tracer), + io_tracer_(io_tracer) {} VersionSet::~VersionSet() { // we need to delete column_family_set_ because its destructor depends on @@ -3937,7 +3939,7 @@ Status VersionSet::ProcessManifestWrites( std::string descriptor_fname = DescriptorFileName(dbname_, pending_manifest_file_number_); std::unique_ptr descriptor_file; - io_s = NewWritableFile(fs_, descriptor_fname, &descriptor_file, + io_s = NewWritableFile(fs_.get(), descriptor_fname, &descriptor_file, opt_file_opts); if (io_s.ok()) { descriptor_file->SetPreallocationBlockSize( @@ -4006,7 +4008,7 @@ Status VersionSet::ProcessManifestWrites( // If we just created a new descriptor file, install it by writing a // new CURRENT file that points to it. if (s.ok() && new_descriptor_log) { - io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_, + io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_, db_directory); if (!io_s.ok()) { s = io_s; @@ -4536,7 +4538,7 @@ Status VersionSet::Recover( // Read "CURRENT" file, which contains a pointer to the current manifest file std::string manifest_path; - Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path, + Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path, &manifest_file_number_); if (!s.ok()) { return s; @@ -4943,7 +4945,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, WriteController wc(options->delayed_write_rate); WriteBufferManager wb(options->db_write_buffer_size); VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc, - /*block_cache_tracer=*/nullptr); + nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/); Status status; std::vector dummy; @@ -5925,15 +5927,14 @@ Status VersionSet::VerifyFileMetadata(const std::string& fpath, return status; } -ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname, - const ImmutableDBOptions* _db_options, - const FileOptions& _file_options, - Cache* table_cache, - WriteBufferManager* write_buffer_manager, - WriteController* write_controller) +ReactiveVersionSet::ReactiveVersionSet( + const std::string& dbname, const ImmutableDBOptions* _db_options, + const FileOptions& _file_options, Cache* table_cache, + WriteBufferManager* write_buffer_manager, WriteController* write_controller, + const std::shared_ptr& io_tracer) : VersionSet(dbname, _db_options, _file_options, table_cache, write_buffer_manager, write_controller, - /*block_cache_tracer=*/nullptr), + /*block_cache_tracer=*/nullptr, io_tracer), number_of_edits_to_skip_(0) {} ReactiveVersionSet::~ReactiveVersionSet() {} @@ -6345,7 +6346,7 @@ Status ReactiveVersionSet::MaybeSwitchManifest( Status s; do { std::string manifest_path; - s = GetCurrentManifestPath(dbname_, fs_, &manifest_path, + s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path, &manifest_file_number_); std::unique_ptr manifest_file; if (s.ok()) { diff --git a/db/version_set.h b/db/version_set.h index 741cb7724..3f669576d 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -42,6 +42,7 @@ #include "db/version_builder.h" #include "db/version_edit.h" #include "db/write_controller.h" +#include "env/file_system_tracer.h" #include "monitoring/instrumented_mutex.h" #include "options/db_options.h" #include "port/port.h" @@ -763,7 +764,6 @@ class Version { private: Env* env_; - FileSystem* fs_; friend class ReactiveVersionSet; friend class VersionSet; friend class VersionEditHandler; @@ -897,7 +897,8 @@ class VersionSet { const FileOptions& file_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, WriteController* write_controller, - BlockCacheTracer* const block_cache_tracer); + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr& io_tracer); // No copying allowed VersionSet(const VersionSet&) = delete; void operator=(const VersionSet&) = delete; @@ -1277,7 +1278,7 @@ class VersionSet { std::unique_ptr column_family_set_; Env* const env_; - FileSystem* const fs_; + FileSystemPtr const fs_; const std::string dbname_; std::string db_id_; const ImmutableDBOptions* const db_options_; @@ -1330,6 +1331,8 @@ class VersionSet { // Store the IO status when Manifest is written IOStatus io_status_; + std::shared_ptr io_tracer_; + private: // REQUIRES db mutex at beginning. may release and re-acquire db mutex Status ProcessManifestWrites(std::deque& writers, @@ -1352,7 +1355,8 @@ class ReactiveVersionSet : public VersionSet { const ImmutableDBOptions* _db_options, const FileOptions& _file_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, - WriteController* write_controller); + WriteController* write_controller, + const std::shared_ptr& io_tracer); ~ReactiveVersionSet() override; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index b6d265b22..8f2134dce 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -726,13 +726,13 @@ class VersionSetTestBase { db_options_.env = env_; db_options_.fs = fs_; - versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_manager_, - &write_controller_, - /*block_cache_tracer=*/nullptr)); + versions_.reset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); reactive_versions_ = std::make_shared( dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_); + &write_buffer_manager_, &write_controller_, nullptr); db_options_.db_paths.emplace_back(dbname_, std::numeric_limits::max()); } diff --git a/db/wal_manager.cc b/db/wal_manager.cc index 7d2cd2b6a..f4f3c80a4 100644 --- a/db/wal_manager.cc +++ b/db/wal_manager.cc @@ -121,7 +121,7 @@ Status WalManager::GetUpdatesSince( } iter->reset(new TransactionLogIteratorImpl( db_options_.wal_dir, &db_options_, read_options, file_options_, seq, - std::move(wal_files), version_set, seq_per_batch_)); + std::move(wal_files), version_set, seq_per_batch_, io_tracer_)); return (*iter)->status(); } diff --git a/db/wal_manager.h b/db/wal_manager.h index 783bfe99c..7589bdeb5 100644 --- a/db/wal_manager.h +++ b/db/wal_manager.h @@ -36,14 +36,17 @@ namespace ROCKSDB_NAMESPACE { class WalManager { public: WalManager(const ImmutableDBOptions& db_options, - const FileOptions& file_options, const bool seq_per_batch = false) + const FileOptions& file_options, + const std::shared_ptr& io_tracer, + const bool seq_per_batch = false) : db_options_(db_options), file_options_(file_options), env_(db_options.env), - fs_(db_options.fs.get()), + fs_(db_options.fs, io_tracer), purge_wal_files_last_run_(0), seq_per_batch_(seq_per_batch), - wal_in_db_path_(IsWalDirSameAsDBPath(&db_options)) {} + wal_in_db_path_(IsWalDirSameAsDBPath(&db_options)), + io_tracer_(io_tracer) {} Status GetSortedWalFiles(VectorLogPtr& files); @@ -91,7 +94,7 @@ class WalManager { const ImmutableDBOptions& db_options_; const FileOptions file_options_; Env* env_; - FileSystem* fs_; + const FileSystemPtr fs_; // ------- WalManager state ------- // cache for ReadFirstRecord() calls @@ -108,6 +111,8 @@ class WalManager { // obsolete files will be deleted every this seconds if ttl deletion is // enabled and archive size_limit is disabled. static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600; + + std::shared_ptr io_tracer_; }; #endif // ROCKSDB_LITE diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index 26bad368e..87c168b70 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -50,16 +50,18 @@ class WalManagerTest : public testing::Test { fs_.reset(new LegacyFileSystemWrapper(env_.get())); db_options_.fs = fs_; - versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_manager_, - &write_controller_, - /*block_cache_tracer=*/nullptr)); + versions_.reset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); - wal_manager_.reset(new WalManager(db_options_, env_options_)); + wal_manager_.reset( + new WalManager(db_options_, env_options_, nullptr /*IOTracer*/)); } void Reopen() { - wal_manager_.reset(new WalManager(db_options_, env_options_)); + wal_manager_.reset( + new WalManager(db_options_, env_options_, nullptr /*IOTracer*/)); } // NOT thread safe diff --git a/env/file_system_tracer.cc b/env/file_system_tracer.cc index 55948f8cb..15b65870e 100644 --- a/env/file_system_tracer.cc +++ b/env/file_system_tracer.cc @@ -16,8 +16,8 @@ IOStatus FileSystemTracingWrapper::NewWritableFile( timer.Start(); IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg); uint64_t elapsed = timer.ElapsedNanos(); - IOTraceRecord io_record(elapsed, TraceType::kIOFileName, __func__, elapsed, - s.ToString(), fname); + IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__, + elapsed, s.ToString(), fname); io_tracer_->WriteIOOp(io_record); return s; } diff --git a/env/file_system_tracer.h b/env/file_system_tracer.h index 6899549b8..9a0fd18f1 100644 --- a/env/file_system_tracer.h +++ b/env/file_system_tracer.h @@ -66,11 +66,10 @@ class FileSystemTracingWrapper : public FileSystemWrapper { class FileSystemPtr { public: FileSystemPtr(std::shared_ptr fs, - std::shared_ptr io_tracer) - : fs_(fs), - io_tracer_(io_tracer), - fs_tracer_( - std::make_shared(fs_, io_tracer_)) {} + const std::shared_ptr& io_tracer) + : fs_(fs), io_tracer_(io_tracer) { + fs_tracer_ = std::make_shared(fs_, io_tracer_); + } std::shared_ptr operator->() const { if (io_tracer_ && io_tracer_->is_tracing_enabled()) { @@ -80,6 +79,15 @@ class FileSystemPtr { } } + /* Returns the underlying File System pointer */ + FileSystem* get() const { + if (io_tracer_ && io_tracer_->is_tracing_enabled()) { + return fs_tracer_.get(); + } else { + return fs_.get(); + } + } + private: std::shared_ptr fs_; std::shared_ptr io_tracer_; diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 210669ec4..d5a8b5bbf 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1030,7 +1030,7 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex, WriteBufferManager wb(options.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, - /*block_cache_tracer=*/nullptr); + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr); Status s = versions.DumpManifest(options, file, verbose, hex, json); if (!s.ok()) { fprintf(stderr, "Error in processing file %s %s\n", file.c_str(), @@ -1172,7 +1172,7 @@ void GetLiveFilesChecksumInfoFromVersionSet(Options options, WriteBufferManager wb(options.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, - /*block_cache_tracer=*/nullptr); + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr); std::vector cf_name_list; s = versions.ListColumnFamilies(&cf_name_list, db_path, immutable_db_options.fs.get()); @@ -1892,7 +1892,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, WriteController wc(opt.delayed_write_rate); WriteBufferManager wb(opt.db_write_buffer_size); VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc, - /*block_cache_tracer=*/nullptr); + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr); std::vector dummy; ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName, ColumnFamilyOptions(opt)); diff --git a/tools/ldb_cmd_test.cc b/tools/ldb_cmd_test.cc index fce26aabb..504a5d72c 100644 --- a/tools/ldb_cmd_test.cc +++ b/tools/ldb_cmd_test.cc @@ -205,7 +205,7 @@ class FileChecksumTestHelper { WriteBufferManager wb(options_.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options_); VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb, - &wc, nullptr); + &wc, nullptr, nullptr); std::vector cf_name_list; Status s; s = versions.ListColumnFamilies(&cf_name_list, dbname_,