From 1f9f630b2795f32c72de281d02127bbed4abdb4f Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Wed, 12 Aug 2020 17:28:10 -0700 Subject: [PATCH] Store FileSystemPtr object that contains FileSystem ptr (#7180) Summary: As part of the IOTracing project, this PR: 1. Caches a "FileSystemPtr" object (a wrapper class that returns the file system pointer to use, depending on whether tracing is enabled) instead of a raw "FileSystem" pointer. 2. Creates each FileSystemPtr object from a FileSystem pointer and an IOTracer pointer. 3. Creates the IOTracer shared_ptr in DBImpl and passes it to the other classes through their constructors. 4. When tracing is enabled through DB::StartIOTrace, FileSystemPtr returns the FileSystemTracingWrapper pointer so that operations are traced; when tracing is disabled, the underlying FileSystem pointer is returned. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7180 Test Plan: make check -j64 COMPILE_WITH_TSAN=1 make check -j64 Reviewed By: anand1976 Differential Revision: D22987117 Pulled By: akankshamahajan15 fbshipit-source-id: 6073617e4c2d5bc363914f3a1f55ae3b0a58fbf1 --- CMakeLists.txt | 1 + db/compaction/compaction_job.cc | 10 ++++--- db/compaction/compaction_job.h | 38 ++++++++++++-------------- db/compaction/compaction_job_test.cc | 18 ++++++------ db/db_impl/db_impl.cc | 14 ++++++---- db/db_impl/db_impl.h | 2 +- db/db_impl/db_impl_compaction_flush.cc | 6 ++-- db/db_impl/db_impl_secondary.cc | 2 +- db/db_wal_test.cc | 11 ++++---- db/external_sst_file_ingestion_job.cc | 6 ++-- db/external_sst_file_ingestion_job.h | 8 ++++-- db/flush_job_test.cc | 8 +++--- db/import_column_family_job.cc | 2 +- db/import_column_family_job.h | 7 +++-- db/memtable_list_test.cc | 6 ++-- db/repair.cc | 2 +- db/transaction_log_impl.cc | 7 +++-- db/transaction_log_impl.h | 3 +- db/version_set.cc | 31 +++++++++++---------- db/version_set.h | 12 +++++--- db/version_set_test.cc | 10 +++---- db/wal_manager.cc | 2 +- db/wal_manager.h | 13 ++++++--- db/wal_manager_test.cc | 14 ++++++---- env/file_system_tracer.cc | 
4 +-- env/file_system_tracer.h | 18 ++++++++---- tools/ldb_cmd.cc | 6 ++-- tools/ldb_cmd_test.cc | 2 +- 28 files changed, 147 insertions(+), 116 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 34f368a2a..81010f972 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1118,6 +1118,7 @@ if(WITH_TESTS) table/table_test.cc table/block_fetcher_test.cc test_util/testutil_test.cc + trace_replay/io_tracer_test.cc tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc tools/ldb_cmd_test.cc tools/reduce_levels_test.cc diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 745a568c6..74f3c7ccd 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -328,8 +328,9 @@ CompactionJob::CompactionJob( const SnapshotChecker* snapshot_checker, std::shared_ptr table_cache, EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats, - Env::Priority thread_pri, const std::atomic* manual_compaction_paused, - const std::string& db_id, const std::string& db_session_id) + Env::Priority thread_pri, const std::shared_ptr& io_tracer, + const std::atomic* manual_compaction_paused, const std::string& db_id, + const std::string& db_session_id) : job_id_(job_id), compact_(new CompactionState(compaction)), compaction_job_stats_(compaction_job_stats), @@ -340,7 +341,7 @@ CompactionJob::CompactionJob( db_options_(db_options), file_options_(file_options), env_(db_options.env), - fs_(db_options.fs.get()), + fs_(db_options.fs, io_tracer), file_options_for_read_( fs_->OptimizeForCompactionTableRead(file_options, db_options_)), versions_(versions), @@ -1564,7 +1565,8 @@ Status CompactionJob::OpenCompactionOutputFile( &syncpoint_arg); #endif Status s; - IOStatus io_s = NewWritableFile(fs_, fname, &writable_file, file_options_); + IOStatus io_s = + NewWritableFile(fs_.get(), fname, &writable_file, file_options_); s = io_s; if 
(sub_compact->io_status.ok()) { sub_compact->io_status = io_s; diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 7d7c58ca2..4160063b4 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -62,25 +62,23 @@ class VersionSet; // if needed. class CompactionJob { public: - CompactionJob(int job_id, Compaction* compaction, - const ImmutableDBOptions& db_options, - const FileOptions& file_options, VersionSet* versions, - const std::atomic* shutting_down, - const SequenceNumber preserve_deletes_seqnum, - LogBuffer* log_buffer, FSDirectory* db_directory, - FSDirectory* output_directory, Statistics* stats, - InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, - std::vector existing_snapshots, - SequenceNumber earliest_write_conflict_snapshot, - const SnapshotChecker* snapshot_checker, - std::shared_ptr table_cache, EventLogger* event_logger, - bool paranoid_file_checks, bool measure_io_stats, - const std::string& dbname, - CompactionJobStats* compaction_job_stats, - Env::Priority thread_pri, - const std::atomic* manual_compaction_paused = nullptr, - const std::string& db_id = "", - const std::string& db_session_id = ""); + CompactionJob( + int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, + const FileOptions& file_options, VersionSet* versions, + const std::atomic* shutting_down, + const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, + FSDirectory* db_directory, FSDirectory* output_directory, + Statistics* stats, InstrumentedMutex* db_mutex, + ErrorHandler* db_error_handler, + std::vector existing_snapshots, + SequenceNumber earliest_write_conflict_snapshot, + const SnapshotChecker* snapshot_checker, + std::shared_ptr table_cache, EventLogger* event_logger, + bool paranoid_file_checks, bool measure_io_stats, + const std::string& dbname, CompactionJobStats* compaction_job_stats, + Env::Priority thread_pri, const std::shared_ptr& io_tracer, + const std::atomic* 
manual_compaction_paused = nullptr, + const std::string& db_id = "", const std::string& db_session_id = ""); ~CompactionJob(); @@ -160,7 +158,7 @@ class CompactionJob { const FileOptions file_options_; Env* env_; - FileSystem* fs_; + FileSystemPtr fs_; // env_option optimized for compaction table reads FileOptions file_options_for_read_; VersionSet* versions_; diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 07f338c76..c6751fc15 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -79,10 +79,10 @@ class CompactionJobTest : public testing::Test { mutable_db_options_(), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), - versions_(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_manager_, - &write_controller_, - /*block_cache_tracer=*/nullptr)), + versions_(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)), shutting_down_(false), preserve_deletes_seqnum_(0), mock_table_factory_(new mock::MockTableFactory()), @@ -249,10 +249,10 @@ class CompactionJobTest : public testing::Test { void NewDB() { DestroyDB(dbname_, Options()); EXPECT_OK(env_->CreateDirIfMissing(dbname_)); - versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_manager_, - &write_controller_, - /*block_cache_tracer=*/nullptr)); + versions_.reset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); compaction_job_stats_.Reset(); SetIdentityFile(env_, dbname_); @@ -334,7 +334,7 @@ class CompactionJobTest : public testing::Test { nullptr, nullptr, &mutex_, &error_handler_, snapshots, earliest_write_conflict_snapshot, snapshot_checker, 
table_cache_, &event_logger, false, false, dbname_, &compaction_job_stats_, - Env::Priority::USER); + Env::Priority::USER, nullptr /* IOTracer */); VerifyInitializationOfCompactionJobStats(compaction_job_stats_); compaction_job.Prepare(); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index bf4bf0784..1fdd2bb82 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -151,8 +151,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, initial_db_options_(SanitizeOptions(dbname, options)), env_(initial_db_options_.env), io_tracer_(std::make_shared()), - fs_(initial_db_options_.env->GetFileSystem()), immutable_db_options_(initial_db_options_), + fs_(immutable_db_options_.fs, io_tracer_), mutable_db_options_(initial_db_options_), stats_(immutable_db_options_.statistics.get()), mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS, @@ -197,7 +197,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, unable_to_release_oldest_log_(false), num_running_ingest_file_(0), #ifndef ROCKSDB_LITE - wal_manager_(immutable_db_options_, file_options_, seq_per_batch), + wal_manager_(immutable_db_options_, file_options_, io_tracer_, + seq_per_batch), #endif // ROCKSDB_LITE event_logger_(immutable_db_options_.info_log.get()), bg_work_paused_(0), @@ -245,7 +246,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_, table_cache_.get(), write_buffer_manager_, - &write_controller_, &block_cache_tracer_)); + &write_controller_, &block_cache_tracer_, + io_tracer_)); column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); @@ -3931,7 +3933,7 @@ Status DBImpl::WriteOptionsFile(bool need_mutex_lock, std::string file_name = TempOptionsFileName(GetName(), versions_->NewFileNumber()); Status s = PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name, - GetFileSystem()); + fs_.get()); if (s.ok()) { s = 
RenameTempFileToOptionsFile(file_name); @@ -4278,7 +4280,7 @@ Status DBImpl::IngestExternalFiles( auto* cfd = static_cast(arg.column_family)->cfd(); ingestion_jobs.emplace_back( env_, versions_.get(), cfd, immutable_db_options_, file_options_, - &snapshots_, arg.options, &directories_, &event_logger_); + &snapshots_, arg.options, &directories_, &event_logger_, io_tracer_); } std::vector> exec_results; for (size_t i = 0; i != num_cfs; ++i) { @@ -4552,7 +4554,7 @@ Status DBImpl::CreateColumnFamilyWithImport( auto cfd = cfh->cfd(); ImportColumnFamilyJob import_job(env_, versions_.get(), cfd, immutable_db_options_, file_options_, - import_options, metadata.files); + import_options, metadata.files, io_tracer_); SuperVersionContext dummy_sv_ctx(/* create_superversion */ true); VersionEdit dummy_edit; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index e970761f4..1db89b3b0 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1012,8 +1012,8 @@ class DBImpl : public DB { const DBOptions initial_db_options_; Env* const env_; std::shared_ptr io_tracer_; - std::shared_ptr fs_; const ImmutableDBOptions immutable_db_options_; + FileSystemPtr fs_; MutableDBOptions mutable_db_options_; Statistics* stats_; std::unordered_map diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 04aaaf80f..b427375f7 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1051,8 +1051,8 @@ Status DBImpl::CompactFilesImpl( snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, - &compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_, - db_id_, db_session_id_); + &compaction_job_stats, Env::Priority::USER, io_tracer_, + &manual_compaction_paused_, db_id_, db_session_id_); // Creating a compaction influences the compaction score because the score // takes running compactions into 
account (by skipping files that are already @@ -2846,7 +2846,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, - &compaction_job_stats, thread_pri, + &compaction_job_stats, thread_pri, io_tracer_, is_manual ? &manual_compaction_paused_ : nullptr, db_id_, db_session_id_); compaction_job.Prepare(); diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 940675724..446835b3f 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -611,7 +611,7 @@ Status DB::OpenAsSecondary( impl->versions_.reset(new ReactiveVersionSet( dbname, &impl->immutable_db_options_, impl->file_options_, impl->table_cache_.get(), impl->write_buffer_manager_, - &impl->write_controller_)); + &impl->write_controller_, impl->io_tracer_)); impl->column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet())); impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 537ee04a0..b2f8eacc8 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -936,12 +936,13 @@ class RecoveryTestHelper { std::unique_ptr wal_manager; WriteController write_controller; - versions.reset(new VersionSet(test->dbname_, &db_options, env_options, - table_cache.get(), &write_buffer_manager, - &write_controller, - /*block_cache_tracer=*/nullptr)); + versions.reset(new VersionSet( + test->dbname_, &db_options, env_options, table_cache.get(), + &write_buffer_manager, &write_controller, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); - wal_manager.reset(new WalManager(db_options, env_options)); + wal_manager.reset( + new WalManager(db_options, env_options, /*io_tracer=*/nullptr)); std::unique_ptr current_log_writer; diff --git 
a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 7ae2ca375..31cceea51 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -136,7 +136,7 @@ Status ExternalSstFileIngestionJob::Prepare( TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile", nullptr); // CopyFile also sync the new file. - status = CopyFile(fs_, path_outside_db, path_inside_db, 0, + status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0, db_options_.use_fsync); } TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded"); @@ -194,7 +194,7 @@ Status ExternalSstFileIngestionJob::Prepare( for (size_t i = 0; i < files_to_ingest_.size(); i++) { std::string generated_checksum, generated_checksum_func_name; IOStatus io_s = GenerateOneFileChecksum( - fs_, files_to_ingest_[i].internal_file_path, + fs_.get(), files_to_ingest_[i].internal_file_path, db_options_.file_checksum_gen_factory.get(), &generated_checksum, &generated_checksum_func_name, ingestion_options_.verify_checksums_readahead_size, @@ -831,7 +831,7 @@ IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile( } std::string file_checksum, file_checksum_func_name; IOStatus io_s = GenerateOneFileChecksum( - fs_, file_to_ingest->internal_file_path, + fs_.get(), file_to_ingest->internal_file_path, db_options_.file_checksum_gen_factory.get(), &file_checksum, &file_checksum_func_name, ingestion_options_.verify_checksums_readahead_size, diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index f7723989d..3aa244bd8 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -12,6 +12,7 @@ #include "db/dbformat.h" #include "db/internal_stats.h" #include "db/snapshot_impl.h" +#include "env/file_system_tracer.h" #include "logging/event_logger.h" #include "options/db_options.h" #include "rocksdb/db.h" @@ -76,9 +77,10 @@ class ExternalSstFileIngestionJob { const 
ImmutableDBOptions& db_options, const EnvOptions& env_options, SnapshotList* db_snapshots, const IngestExternalFileOptions& ingestion_options, - Directories* directories, EventLogger* event_logger) + Directories* directories, EventLogger* event_logger, + const std::shared_ptr& io_tracer) : env_(env), - fs_(db_options.fs.get()), + fs_(db_options.fs, io_tracer), versions_(versions), cfd_(cfd), db_options_(db_options), @@ -167,7 +169,7 @@ class ExternalSstFileIngestionJob { Status SyncIngestedFile(TWritableFile* file); Env* env_; - FileSystem* fs_; + FileSystemPtr fs_; VersionSet* versions_; ColumnFamilyData* cfd_; const ImmutableDBOptions& db_options_; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 1f0fdef57..0ea382e33 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -55,10 +55,10 @@ class FlushJobTest : public testing::Test { db_options_.env = env_; db_options_.fs = fs_; - versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_manager_, - &write_controller_, - /*block_cache_tracer=*/nullptr)); + versions_.reset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); EXPECT_OK(versions_->Recover(column_families, false)); } diff --git a/db/import_column_family_job.cc b/db/import_column_family_job.cc index 4b0502c49..20bb27cb8 100644 --- a/db/import_column_family_job.cc +++ b/db/import_column_family_job.cc @@ -100,7 +100,7 @@ Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number, } } if (!hardlink_files) { - status = CopyFile(fs_, path_outside_db, path_inside_db, 0, + status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0, db_options_.use_fsync); } if (!status.ok()) { diff --git a/db/import_column_family_job.h b/db/import_column_family_job.h index 160fd1247..ef681862b 100644 --- a/db/import_column_family_job.h +++ b/db/import_column_family_job.h @@ 
-24,12 +24,13 @@ class ImportColumnFamilyJob { const ImmutableDBOptions& db_options, const EnvOptions& env_options, const ImportColumnFamilyOptions& import_options, - const std::vector& metadata) + const std::vector& metadata, + const std::shared_ptr& io_tracer) : env_(env), versions_(versions), cfd_(cfd), db_options_(db_options), - fs_(db_options_.fs.get()), + fs_(db_options_.fs, io_tracer), env_options_(env_options), import_options_(import_options), metadata_(metadata) {} @@ -61,7 +62,7 @@ class ImportColumnFamilyJob { VersionSet* versions_; ColumnFamilyData* cfd_; const ImmutableDBOptions& db_options_; - FileSystem* fs_; + const FileSystemPtr fs_; const EnvOptions& env_options_; autovector files_to_import_; VersionEdit edit_; diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 6e0d33651..2fb52af48 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -100,7 +100,8 @@ class MemTableListTest : public testing::Test { VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, - &write_controller, /*block_cache_tracer=*/nullptr); + &write_controller, /*block_cache_tracer=*/nullptr, + /*io_tracer=*/nullptr); std::vector cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions()); @@ -148,7 +149,8 @@ class MemTableListTest : public testing::Test { VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, - &write_controller, /*block_cache_tracer=*/nullptr); + &write_controller, /*block_cache_tracer=*/nullptr, + /*io_tracer=*/nullptr); std::vector cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions()); diff --git a/db/repair.cc b/db/repair.cc index 91ce51e82..9fb7ff225 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -117,7 +117,7 @@ class Repairer { wc_(db_options_.delayed_write_rate), 
vset_(dbname_, &immutable_db_options_, env_options_, raw_table_cache_.get(), &wb_, &wc_, - /*block_cache_tracer=*/nullptr), + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr), next_file_number_(1), db_lock_(nullptr) { for (const auto& cfd : column_families) { diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 56bc161a3..5e094e414 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -17,7 +17,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( const TransactionLogIterator::ReadOptions& read_options, const EnvOptions& soptions, const SequenceNumber seq, std::unique_ptr files, VersionSet const* const versions, - const bool seq_per_batch) + const bool seq_per_batch, const std::shared_ptr& io_tracer) : dir_(dir), options_(options), read_options_(read_options), @@ -30,7 +30,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( current_batch_seq_(0), current_last_seq_(0), versions_(versions), - seq_per_batch_(seq_per_batch) { + seq_per_batch_(seq_per_batch), + io_tracer_(io_tracer) { assert(files_ != nullptr); assert(versions_ != nullptr); @@ -42,7 +43,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( Status TransactionLogIteratorImpl::OpenLogFile( const LogFile* log_file, std::unique_ptr* file_reader) { - FileSystem* fs = options_->fs.get(); + FileSystemPtr fs(options_->fs, io_tracer_); std::unique_ptr file; std::string fname; Status s; diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index eb53daf2b..1cea6e0b5 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -63,7 +63,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { const TransactionLogIterator::ReadOptions& read_options, const EnvOptions& soptions, const SequenceNumber seqNum, std::unique_ptr files, VersionSet const* const versions, - const bool seq_per_batch); + const bool seq_per_batch, const std::shared_ptr& io_tracer); virtual bool Valid() override; @@ -122,6 +122,7 @@ 
class TransactionLogIteratorImpl : public TransactionLogIterator { // Update current batch if a continuous batch is found, else return false void UpdateCurrentWriteBatch(const Slice& record); Status OpenLogReader(const LogFile* file); + std::shared_ptr io_tracer_; }; } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/db/version_set.cc b/db/version_set.cc index d9795b72f..b30befe38 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3615,12 +3615,13 @@ VersionSet::VersionSet(const std::string& dbname, const FileOptions& storage_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, WriteController* write_controller, - BlockCacheTracer* const block_cache_tracer) + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr& io_tracer) : column_family_set_(new ColumnFamilySet( dbname, _db_options, storage_options, table_cache, write_buffer_manager, write_controller, block_cache_tracer)), env_(_db_options->env), - fs_(_db_options->fs.get()), + fs_(_db_options->fs, io_tracer), dbname_(dbname), db_options_(_db_options), next_file_number_(2), @@ -3634,7 +3635,8 @@ VersionSet::VersionSet(const std::string& dbname, current_version_number_(0), manifest_file_size_(0), file_options_(storage_options), - block_cache_tracer_(block_cache_tracer) {} + block_cache_tracer_(block_cache_tracer), + io_tracer_(io_tracer) {} VersionSet::~VersionSet() { // we need to delete column_family_set_ because its destructor depends on @@ -3937,7 +3939,7 @@ Status VersionSet::ProcessManifestWrites( std::string descriptor_fname = DescriptorFileName(dbname_, pending_manifest_file_number_); std::unique_ptr descriptor_file; - io_s = NewWritableFile(fs_, descriptor_fname, &descriptor_file, + io_s = NewWritableFile(fs_.get(), descriptor_fname, &descriptor_file, opt_file_opts); if (io_s.ok()) { descriptor_file->SetPreallocationBlockSize( @@ -4006,7 +4008,7 @@ Status VersionSet::ProcessManifestWrites( // If we just created a new descriptor file, install it by 
writing a // new CURRENT file that points to it. if (s.ok() && new_descriptor_log) { - io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_, + io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_, db_directory); if (!io_s.ok()) { s = io_s; @@ -4536,7 +4538,7 @@ Status VersionSet::Recover( // Read "CURRENT" file, which contains a pointer to the current manifest file std::string manifest_path; - Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path, + Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path, &manifest_file_number_); if (!s.ok()) { return s; @@ -4943,7 +4945,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, WriteController wc(options->delayed_write_rate); WriteBufferManager wb(options->db_write_buffer_size); VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc, - /*block_cache_tracer=*/nullptr); + nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/); Status status; std::vector dummy; @@ -5925,15 +5927,14 @@ Status VersionSet::VerifyFileMetadata(const std::string& fpath, return status; } -ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname, - const ImmutableDBOptions* _db_options, - const FileOptions& _file_options, - Cache* table_cache, - WriteBufferManager* write_buffer_manager, - WriteController* write_controller) +ReactiveVersionSet::ReactiveVersionSet( + const std::string& dbname, const ImmutableDBOptions* _db_options, + const FileOptions& _file_options, Cache* table_cache, + WriteBufferManager* write_buffer_manager, WriteController* write_controller, + const std::shared_ptr& io_tracer) : VersionSet(dbname, _db_options, _file_options, table_cache, write_buffer_manager, write_controller, - /*block_cache_tracer=*/nullptr), + /*block_cache_tracer=*/nullptr, io_tracer), number_of_edits_to_skip_(0) {} ReactiveVersionSet::~ReactiveVersionSet() {} @@ -6345,7 +6346,7 @@ Status ReactiveVersionSet::MaybeSwitchManifest( Status s; do { std::string 
manifest_path; - s = GetCurrentManifestPath(dbname_, fs_, &manifest_path, + s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path, &manifest_file_number_); std::unique_ptr manifest_file; if (s.ok()) { diff --git a/db/version_set.h b/db/version_set.h index 741cb7724..3f669576d 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -42,6 +42,7 @@ #include "db/version_builder.h" #include "db/version_edit.h" #include "db/write_controller.h" +#include "env/file_system_tracer.h" #include "monitoring/instrumented_mutex.h" #include "options/db_options.h" #include "port/port.h" @@ -763,7 +764,6 @@ class Version { private: Env* env_; - FileSystem* fs_; friend class ReactiveVersionSet; friend class VersionSet; friend class VersionEditHandler; @@ -897,7 +897,8 @@ class VersionSet { const FileOptions& file_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, WriteController* write_controller, - BlockCacheTracer* const block_cache_tracer); + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr& io_tracer); // No copying allowed VersionSet(const VersionSet&) = delete; void operator=(const VersionSet&) = delete; @@ -1277,7 +1278,7 @@ class VersionSet { std::unique_ptr column_family_set_; Env* const env_; - FileSystem* const fs_; + FileSystemPtr const fs_; const std::string dbname_; std::string db_id_; const ImmutableDBOptions* const db_options_; @@ -1330,6 +1331,8 @@ class VersionSet { // Store the IO status when Manifest is written IOStatus io_status_; + std::shared_ptr io_tracer_; + private: // REQUIRES db mutex at beginning. 
may release and re-acquire db mutex Status ProcessManifestWrites(std::deque& writers, @@ -1352,7 +1355,8 @@ class ReactiveVersionSet : public VersionSet { const ImmutableDBOptions* _db_options, const FileOptions& _file_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, - WriteController* write_controller); + WriteController* write_controller, + const std::shared_ptr& io_tracer); ~ReactiveVersionSet() override; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index b6d265b22..8f2134dce 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -726,13 +726,13 @@ class VersionSetTestBase { db_options_.env = env_; db_options_.fs = fs_; - versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_manager_, - &write_controller_, - /*block_cache_tracer=*/nullptr)); + versions_.reset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); reactive_versions_ = std::make_shared( dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_); + &write_buffer_manager_, &write_controller_, nullptr); db_options_.db_paths.emplace_back(dbname_, std::numeric_limits::max()); } diff --git a/db/wal_manager.cc b/db/wal_manager.cc index 7d2cd2b6a..f4f3c80a4 100644 --- a/db/wal_manager.cc +++ b/db/wal_manager.cc @@ -121,7 +121,7 @@ Status WalManager::GetUpdatesSince( } iter->reset(new TransactionLogIteratorImpl( db_options_.wal_dir, &db_options_, read_options, file_options_, seq, - std::move(wal_files), version_set, seq_per_batch_)); + std::move(wal_files), version_set, seq_per_batch_, io_tracer_)); return (*iter)->status(); } diff --git a/db/wal_manager.h b/db/wal_manager.h index 783bfe99c..7589bdeb5 100644 --- a/db/wal_manager.h +++ b/db/wal_manager.h @@ -36,14 +36,17 @@ namespace ROCKSDB_NAMESPACE { class WalManager { public: 
WalManager(const ImmutableDBOptions& db_options, - const FileOptions& file_options, const bool seq_per_batch = false) + const FileOptions& file_options, + const std::shared_ptr& io_tracer, + const bool seq_per_batch = false) : db_options_(db_options), file_options_(file_options), env_(db_options.env), - fs_(db_options.fs.get()), + fs_(db_options.fs, io_tracer), purge_wal_files_last_run_(0), seq_per_batch_(seq_per_batch), - wal_in_db_path_(IsWalDirSameAsDBPath(&db_options)) {} + wal_in_db_path_(IsWalDirSameAsDBPath(&db_options)), + io_tracer_(io_tracer) {} Status GetSortedWalFiles(VectorLogPtr& files); @@ -91,7 +94,7 @@ class WalManager { const ImmutableDBOptions& db_options_; const FileOptions file_options_; Env* env_; - FileSystem* fs_; + const FileSystemPtr fs_; // ------- WalManager state ------- // cache for ReadFirstRecord() calls @@ -108,6 +111,8 @@ class WalManager { // obsolete files will be deleted every this seconds if ttl deletion is // enabled and archive size_limit is disabled. 
static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600; + + std::shared_ptr io_tracer_; }; #endif // ROCKSDB_LITE diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index 26bad368e..87c168b70 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -50,16 +50,18 @@ class WalManagerTest : public testing::Test { fs_.reset(new LegacyFileSystemWrapper(env_.get())); db_options_.fs = fs_; - versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), &write_buffer_manager_, - &write_controller_, - /*block_cache_tracer=*/nullptr)); + versions_.reset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); - wal_manager_.reset(new WalManager(db_options_, env_options_)); + wal_manager_.reset( + new WalManager(db_options_, env_options_, nullptr /*IOTracer*/)); } void Reopen() { - wal_manager_.reset(new WalManager(db_options_, env_options_)); + wal_manager_.reset( + new WalManager(db_options_, env_options_, nullptr /*IOTracer*/)); } // NOT thread safe diff --git a/env/file_system_tracer.cc b/env/file_system_tracer.cc index 55948f8cb..15b65870e 100644 --- a/env/file_system_tracer.cc +++ b/env/file_system_tracer.cc @@ -16,8 +16,8 @@ IOStatus FileSystemTracingWrapper::NewWritableFile( timer.Start(); IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg); uint64_t elapsed = timer.ElapsedNanos(); - IOTraceRecord io_record(elapsed, TraceType::kIOFileName, __func__, elapsed, - s.ToString(), fname); + IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__, + elapsed, s.ToString(), fname); io_tracer_->WriteIOOp(io_record); return s; } diff --git a/env/file_system_tracer.h b/env/file_system_tracer.h index 6899549b8..9a0fd18f1 100644 --- a/env/file_system_tracer.h +++ b/env/file_system_tracer.h @@ -66,11 +66,10 @@ class FileSystemTracingWrapper : public FileSystemWrapper { 
class FileSystemPtr { public: FileSystemPtr(std::shared_ptr fs, - std::shared_ptr io_tracer) - : fs_(fs), - io_tracer_(io_tracer), - fs_tracer_( - std::make_shared(fs_, io_tracer_)) {} + const std::shared_ptr& io_tracer) + : fs_(fs), io_tracer_(io_tracer) { + fs_tracer_ = std::make_shared(fs_, io_tracer_); + } std::shared_ptr operator->() const { if (io_tracer_ && io_tracer_->is_tracing_enabled()) { @@ -80,6 +79,15 @@ class FileSystemPtr { } } + /* Returns the underlying File System pointer */ + FileSystem* get() const { + if (io_tracer_ && io_tracer_->is_tracing_enabled()) { + return fs_tracer_.get(); + } else { + return fs_.get(); + } + } + private: std::shared_ptr fs_; std::shared_ptr io_tracer_; diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 210669ec4..d5a8b5bbf 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1030,7 +1030,7 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex, WriteBufferManager wb(options.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, - /*block_cache_tracer=*/nullptr); + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr); Status s = versions.DumpManifest(options, file, verbose, hex, json); if (!s.ok()) { fprintf(stderr, "Error in processing file %s %s\n", file.c_str(), @@ -1172,7 +1172,7 @@ void GetLiveFilesChecksumInfoFromVersionSet(Options options, WriteBufferManager wb(options.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, - /*block_cache_tracer=*/nullptr); + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr); std::vector cf_name_list; s = versions.ListColumnFamilies(&cf_name_list, db_path, immutable_db_options.fs.get()); @@ -1892,7 +1892,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, WriteController wc(opt.delayed_write_rate); WriteBufferManager 
wb(opt.db_write_buffer_size); VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc, - /*block_cache_tracer=*/nullptr); + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr); std::vector dummy; ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName, ColumnFamilyOptions(opt)); diff --git a/tools/ldb_cmd_test.cc b/tools/ldb_cmd_test.cc index fce26aabb..504a5d72c 100644 --- a/tools/ldb_cmd_test.cc +++ b/tools/ldb_cmd_test.cc @@ -205,7 +205,7 @@ class FileChecksumTestHelper { WriteBufferManager wb(options_.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options_); VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb, - &wc, nullptr); + &wc, nullptr, nullptr); std::vector cf_name_list; Status s; s = versions.ListColumnFamilies(&cf_name_list, dbname_,