From b175eceb09c119588dd22368a7319eb80e865a57 Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Tue, 8 Sep 2020 10:49:01 -0700 Subject: [PATCH] Store FSWritableFilePtr object in WritableFileWriter (#7193) Summary: Replace FSWritableFile pointer with FSWritableFilePtr object in WritableFileWriter. This new object wraps FSWritableFile pointer. Objective: If tracing is enabled, FSWritableFile Ptr returns FSWritableFileTracingWrapper pointer that includes all necessary information in IORecord and calls underlying FileSystem and invokes IOTracer to dump that record in a binary file. If tracing is disabled then, underlying FileSystem pointer is returned directly. FSWritableFilePtr wrapper class is added to bypass the FSWritableFileWrapper when tracing is disabled. Test Plan: make check -j64 Pull Request resolved: https://github.com/facebook/rocksdb/pull/7193 Reviewed By: anand1976 Differential Revision: D23355915 Pulled By: akankshamahajan15 fbshipit-source-id: e62a27a13c1fd77e36a6dbafc7006d969bed25cf --- db/blob/blob_file_builder.cc | 8 ++-- db/builder.cc | 8 ++-- db/builder.h | 3 +- db/compaction/compaction_job.cc | 9 ++-- db/compaction/compaction_job.h | 1 + db/db_impl/db_impl_compaction_flush.cc | 4 +- db/db_impl/db_impl_open.cc | 17 +++---- db/flush_job.cc | 37 +++++++-------- db/flush_job.h | 5 +- db/flush_job_test.cc | 66 +++++++++++++------------- db/repair.cc | 10 ++-- db/version_edit_handler.cc | 15 +++--- db/version_edit_handler.h | 6 ++- db/version_set.cc | 26 +++++----- db/version_set.h | 7 ++- env/file_system_tracer.h | 34 ++++++++----- file/writable_file_writer.cc | 2 +- file/writable_file_writer.h | 9 ++-- table/sst_file_writer.cc | 5 +- 19 files changed, 153 insertions(+), 119 deletions(-) diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index 2cc653f9c..4f84dd360 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -167,10 +167,10 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { Statistics* const statistics = immutable_cf_options_->statistics; - std::unique_ptr file_writer( - new WritableFileWriter(std::move(file), blob_file_path, *file_options_, - env_, statistics, immutable_cf_options_->listeners, - immutable_cf_options_->file_checksum_gen_factory)); + std::unique_ptr file_writer(new WritableFileWriter( + std::move(file), blob_file_path, *file_options_, env_, + nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners, + immutable_cf_options_->file_checksum_gen_factory)); std::unique_ptr blob_log_writer( new BlobLogWriter(std::move(file_writer), env_, statistics, diff --git a/db/builder.cc b/db/builder.cc index 8507135a8..9dfdf4217 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -83,7 +83,8 @@ Status BuildTable( uint64_t sample_for_compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, TableFileCreationReason reason, IOStatus* io_status, - EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, + const std::shared_ptr& io_tracer, EventLogger* event_logger, + int job_id, const Env::IOPriority io_priority, TableProperties* table_properties, int level, const uint64_t creation_time, const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint, const uint64_t file_creation_time, const std::string& db_id, @@ -143,8 +144,9 @@ Status BuildTable( file->SetWriteLifeTimeHint(write_hint); file_writer.reset(new WritableFileWriter( - std::move(file), fname, file_options, env, ioptions.statistics, - ioptions.listeners, ioptions.file_checksum_gen_factory)); + std::move(file), fname, file_options, env, io_tracer, + ioptions.statistics, ioptions.listeners, + ioptions.file_checksum_gen_factory)); builder = NewTableBuilder( ioptions, mutable_cf_options, internal_comparator, diff --git a/db/builder.h b/db/builder.h index 4b960c0b1..bcabb7935 100644 --- a/db/builder.h +++ b/db/builder.h @@ -79,7 +79,8 @@ extern Status BuildTable( const uint64_t sample_for_compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, TableFileCreationReason reason, - IOStatus* io_status, EventLogger* event_logger = nullptr, int job_id = 0, + IOStatus* io_status, const std::shared_ptr& io_tracer, + EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, TableProperties* table_properties = nullptr, int level = -1, const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 3f477300f..603a0c981 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -303,6 +303,7 @@ CompactionJob::CompactionJob( db_options_(db_options), file_options_(file_options), env_(db_options.env), + io_tracer_(io_tracer), fs_(db_options.fs, io_tracer), file_options_for_read_( fs_->OptimizeForCompactionTableRead(file_options, db_options_)), @@ -1592,10 +1593,10 @@ Status CompactionJob::OpenCompactionOutputFile( sub_compact->compaction->OutputFilePreallocationSize())); const auto& listeners = sub_compact->compaction->immutable_cf_options()->listeners; - sub_compact->outfile.reset( - new WritableFileWriter(std::move(writable_file), fname, file_options_, - env_, db_options_.statistics.get(), listeners, - db_options_.file_checksum_gen_factory.get())); + sub_compact->outfile.reset(new WritableFileWriter( + std::move(writable_file), fname, file_options_, env_, io_tracer_, + db_options_.statistics.get(), listeners, + db_options_.file_checksum_gen_factory.get())); // If the Column family flag is to only optimize filters for hits, // we can skip creating filters if this is the bottommost_level where diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 14c28b5f5..aafad8d3a 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -158,6 +158,7 @@ class CompactionJob { const FileOptions file_options_; Env* env_; + std::shared_ptr io_tracer_; FileSystemPtr fs_; // env_option optimized for compaction table reads FileOptions file_options_for_read_; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 73f6f79c0..d5dea4323 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -154,7 +154,7 @@ Status DBImpl::FlushMemTableToOutputFile( GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, true /* sync_output_directory */, true /* write_manifest */, thread_pri, - db_id_, db_session_id_); + io_tracer_, db_id_, db_session_id_); FileMetaData file_meta; TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"); @@ -359,7 +359,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, false /* sync_output_directory */, false /* write_manifest */, - thread_pri, db_id_, db_session_id_)); + thread_pri, io_tracer_, db_id_, db_session_id_)); jobs.back()->PickMemTable(); } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 2e89316be..307ce99e1 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -290,8 +290,8 @@ Status DBImpl::NewDB(std::vector* new_filenames) { file->SetPreallocationBlockSize( immutable_db_options_.manifest_preallocation_size); std::unique_ptr file_writer(new WritableFileWriter( - std::move(file), manifest, file_options, env_, nullptr /* stats */, - immutable_db_options_.listeners)); + std::move(file), manifest, file_options, env_, io_tracer_, + nullptr /* stats */, immutable_db_options_.listeners)); log::Writer log(std::move(file_writer), 0, false); std::string record; new_db.EncodeTo(&record); @@ -1330,9 +1330,10 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, mutable_cf_options.sample_for_compression, mutable_cf_options.compression_opts, paranoid_file_checks, cfd->internal_stats(), TableFileCreationReason::kRecovery, &io_s, - &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */, - -1 /* level */, current_time, 0 /* oldest_key_time */, write_hint, - 0 /* file_creation_time */, db_id_, db_session_id_); + io_tracer_, &event_logger_, job_id, Env::IO_HIGH, + nullptr /* table_properties */, -1 /* level */, current_time, + 0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */, + db_id_, db_session_id_); LogFlush(immutable_db_options_.info_log); ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" @@ -1436,9 +1437,9 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, lfile->SetPreallocationBlockSize(preallocate_block_size); const auto& listeners = immutable_db_options_.listeners; - std::unique_ptr file_writer( - new WritableFileWriter(std::move(lfile), log_fname, opt_file_options, - env_, nullptr /* stats */, listeners)); + std::unique_ptr file_writer(new WritableFileWriter( + std::move(lfile), log_fname, opt_file_options, env_, io_tracer_, + nullptr /* stats */, listeners)); *new_log = new log::Writer(std::move(file_writer), log_file_num, immutable_db_options_.recycle_log_file_num > 0, immutable_db_options_.manual_wal_flush); diff --git a/db/flush_job.cc b/db/flush_job.cc index 31fdaba15..b0e316a1e 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -82,23 +82,21 @@ const char* GetFlushReasonString (FlushReason flush_reason) { } } -FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, - const ImmutableDBOptions& db_options, - const MutableCFOptions& mutable_cf_options, - const uint64_t* max_memtable_id, - const FileOptions& file_options, VersionSet* versions, - InstrumentedMutex* db_mutex, - std::atomic* shutting_down, - std::vector existing_snapshots, - SequenceNumber earliest_write_conflict_snapshot, - SnapshotChecker* snapshot_checker, JobContext* job_context, - LogBuffer* log_buffer, FSDirectory* db_directory, - FSDirectory* output_file_directory, - CompressionType output_compression, Statistics* stats, - EventLogger* event_logger, bool measure_io_stats, - const bool sync_output_directory, const bool write_manifest, - Env::Priority thread_pri, const std::string& db_id, - const std::string& db_session_id) +FlushJob::FlushJob( + const std::string& dbname, ColumnFamilyData* cfd, + const ImmutableDBOptions& db_options, + const MutableCFOptions& mutable_cf_options, const uint64_t* max_memtable_id, + const FileOptions& file_options, VersionSet* versions, + InstrumentedMutex* db_mutex, std::atomic* shutting_down, + std::vector existing_snapshots, + SequenceNumber earliest_write_conflict_snapshot, + SnapshotChecker* snapshot_checker, JobContext* job_context, + LogBuffer* log_buffer, FSDirectory* db_directory, + FSDirectory* output_file_directory, CompressionType output_compression, + Statistics* stats, EventLogger* event_logger, bool measure_io_stats, + const bool sync_output_directory, const bool write_manifest, + Env::Priority thread_pri, const std::shared_ptr& io_tracer, + const std::string& db_id, const std::string& db_session_id) : dbname_(dbname), db_id_(db_id), db_session_id_(db_session_id), @@ -126,7 +124,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, edit_(nullptr), base_(nullptr), pick_memtable_called(false), - thread_pri_(thread_pri) { + thread_pri_(thread_pri), + io_tracer_(io_tracer) { // Update the thread status to indicate flush. ReportStartedFlush(); TEST_SYNC_POINT("FlushJob::FlushJob()"); @@ -400,7 +399,7 @@ Status FlushJob::WriteLevel0Table() { output_compression_, mutable_cf_options_.sample_for_compression, mutable_cf_options_.compression_opts, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), - TableFileCreationReason::kFlush, &io_s, event_logger_, + TableFileCreationReason::kFlush, &io_s, io_tracer_, event_logger_, job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */, creation_time, oldest_key_time, write_hint, current_time, db_id_, db_session_id_); diff --git a/db/flush_job.h b/db/flush_job.h index 980d92eb4..b724b2464 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -72,7 +72,8 @@ class FlushJob { CompressionType output_compression, Statistics* stats, EventLogger* event_logger, bool measure_io_stats, const bool sync_output_directory, const bool write_manifest, - Env::Priority thread_pri, const std::string& db_id = "", + Env::Priority thread_pri, const std::shared_ptr& io_tracer, + const std::string& db_id = "", const std::string& db_session_id = ""); ~FlushJob(); @@ -161,6 +162,8 @@ class FlushJob { bool pick_memtable_called; Env::Priority thread_pri_; IOStatus io_status_; + + const std::shared_ptr io_tracer_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 4b855eef2..3f41e4e6a 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -135,14 +135,14 @@ TEST_F(FlushJobTest, Empty) { auto cfd = versions_->GetColumnFamilySet()->GetDefault(); EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relavant - FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), - db_options_, *cfd->GetLatestMutableCFOptions(), - nullptr /* memtable_id */, env_options_, versions_.get(), - &mutex_, &shutting_down_, {}, kMaxSequenceNumber, - snapshot_checker, &job_context, nullptr, nullptr, nullptr, - kNoCompression, nullptr, &event_logger, false, - true /* sync_output_directory */, - true /* write_manifest */, Env::Priority::USER); + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, + env_options_, versions_.get(), &mutex_, &shutting_down_, {}, + kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, + nullptr, kNoCompression, nullptr, &event_logger, false, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/); { InstrumentedMutexLock l(&mutex_); flush_job.PickMemTable(); @@ -216,14 +216,14 @@ TEST_F(FlushJobTest, NonEmpty) { EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relavant - FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), - db_options_, *cfd->GetLatestMutableCFOptions(), - nullptr /* memtable_id */, env_options_, versions_.get(), - &mutex_, &shutting_down_, {}, kMaxSequenceNumber, - snapshot_checker, &job_context, nullptr, nullptr, nullptr, - kNoCompression, db_options_.statistics.get(), - &event_logger, true, true /* sync_output_directory */, - true /* write_manifest */, Env::Priority::USER); + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, + env_options_, versions_.get(), &mutex_, &shutting_down_, {}, + kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, + nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, + true, true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/); HistogramData hist; FileMetaData file_meta; @@ -278,14 +278,14 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) { assert(memtable_ids.size() == num_mems); uint64_t smallest_memtable_id = memtable_ids.front(); uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1; - FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), - db_options_, *cfd->GetLatestMutableCFOptions(), - &flush_memtable_id, env_options_, versions_.get(), &mutex_, - &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, - &job_context, nullptr, nullptr, nullptr, kNoCompression, - db_options_.statistics.get(), &event_logger, true, - true /* sync_output_directory */, - true /* write_manifest */, Env::Priority::USER); + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), &flush_memtable_id, env_options_, + versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/); HistogramData hist; FileMetaData file_meta; mutex_.Lock(); @@ -357,7 +357,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, false /* sync_output_directory */, false /* write_manifest */, - Env::Priority::USER)); + Env::Priority::USER, nullptr /*IOTracer*/)); k++; } HistogramData hist; @@ -466,14 +466,14 @@ TEST_F(FlushJobTest, Snapshots) { EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relavant - FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), - db_options_, *cfd->GetLatestMutableCFOptions(), - nullptr /* memtable_id */, env_options_, versions_.get(), - &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, - snapshot_checker, &job_context, nullptr, nullptr, nullptr, - kNoCompression, db_options_.statistics.get(), - &event_logger, true, true /* sync_output_directory */, - true /* write_manifest */, Env::Priority::USER); + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, + env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots, + kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, + nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, + true, true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/); mutex_.Lock(); flush_job.PickMemTable(); ASSERT_OK(flush_job.Run()); diff --git a/db/repair.cc b/db/repair.cc index 01ad80206..259892698 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -436,11 +436,11 @@ class Repairer { cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, snapshot_checker, kNoCompression, 0 /* sample_for_compression */, CompressionOptions(), false, nullptr /* internal_stats */, - TableFileCreationReason::kRecovery, &io_s, nullptr /* event_logger */, - 0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */, - -1 /* level */, current_time, 0 /* oldest_key_time */, write_hint, - 0 /* file_creation_time */, "DB Repairer" /* db_id */, - db_session_id_); + TableFileCreationReason::kRecovery, &io_s, nullptr /*IOTracer*/, + nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, + nullptr /* table_properties */, -1 /* level */, current_time, + 0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */, + "DB Repairer" /* db_id */, db_session_id_); ROCKS_LOG_INFO(db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 888e94101..75fe107c5 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -16,14 +16,16 @@ namespace ROCKSDB_NAMESPACE { VersionEditHandler::VersionEditHandler( bool read_only, const std::vector& column_families, VersionSet* version_set, bool track_missing_files, - bool no_error_if_table_files_missing) + bool no_error_if_table_files_missing, + const std::shared_ptr& io_tracer) : read_only_(read_only), column_families_(column_families), status_(), version_set_(version_set), track_missing_files_(track_missing_files), no_error_if_table_files_missing_(no_error_if_table_files_missing), - initialized_(false) { + initialized_(false), + io_tracer_(io_tracer) { assert(version_set_ != nullptr); } @@ -390,7 +392,7 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/, assert(builder_iter != builders_.end()); auto* builder = builder_iter->second->version_builder(); auto* v = new Version(cfd, version_set_, version_set_->file_options_, - *cfd->GetLatestMutableCFOptions(), + *cfd->GetLatestMutableCFOptions(), io_tracer_, version_set_->current_version_number_++); s = builder->SaveTo(v->storage_info()); if (s.ok()) { @@ -485,10 +487,11 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, VersionEditHandlerPointInTime::VersionEditHandlerPointInTime( bool read_only, const std::vector& column_families, - VersionSet* version_set) + VersionSet* version_set, const std::shared_ptr& io_tracer) : VersionEditHandler(read_only, column_families, version_set, /*track_missing_files=*/true, - /*no_error_if_table_files_missing=*/true) {} + /*no_error_if_table_files_missing=*/true, io_tracer), + io_tracer_(io_tracer) {} VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() { for (const auto& elem : versions_) { @@ -573,7 +576,7 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion( assert(builder_iter != builders_.end()); auto* builder = builder_iter->second->version_builder(); auto* version = new Version(cfd, version_set_, version_set_->file_options_, - *cfd->GetLatestMutableCFOptions(), + *cfd->GetLatestMutableCFOptions(), io_tracer_, version_set_->current_version_number_++); s = builder->SaveTo(version->storage_info()); if (s.ok()) { diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index 3c239bdf7..da222a8f3 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -36,7 +36,7 @@ class VersionEditHandler { bool read_only, const std::vector& column_families, VersionSet* version_set, bool track_missing_files, - bool ignore_missing_files); + bool ignore_missing_files, const std::shared_ptr& io_tracer); virtual ~VersionEditHandler() {} @@ -95,6 +95,7 @@ class VersionEditHandler { const VersionEdit& edit); bool initialized_; + std::shared_ptr io_tracer_; }; // A class similar to its base class, i.e. VersionEditHandler. @@ -108,7 +109,7 @@ class VersionEditHandlerPointInTime : public VersionEditHandler { VersionEditHandlerPointInTime( bool read_only, const std::vector& column_families, - VersionSet* version_set); + VersionSet* version_set, const std::shared_ptr& io_tracer); ~VersionEditHandlerPointInTime() override; protected: @@ -119,6 +120,7 @@ class VersionEditHandlerPointInTime : public VersionEditHandler { private: std::unordered_map versions_; + std::shared_ptr io_tracer_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index 4530b689a..e94f5870d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1284,7 +1284,7 @@ Status Version::GetTableProperties(std::shared_ptr* tp, // pass the magic number check in the footer. std::unique_ptr file_reader( new RandomAccessFileReader( - std::move(file), file_name, nullptr /* env */, nullptr /* IOTracer */, + std::move(file), file_name, nullptr /* env */, io_tracer_, nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */, nullptr /* rate_limiter */, ioptions->listeners)); s = ReadTableProperties( @@ -1750,6 +1750,7 @@ VersionStorageInfo::VersionStorageInfo( Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, const FileOptions& file_opt, const MutableCFOptions mutable_cf_options, + const std::shared_ptr& io_tracer, uint64_t version_number) : env_(vset->env_), cfd_(column_family_data), @@ -1777,7 +1778,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, mutable_cf_options_(mutable_cf_options), max_file_size_for_l0_meta_pin_( MaxFileSizeForL0MetaPin(mutable_cf_options_)), - version_number_(version_number) {} + version_number_(version_number), + io_tracer_(io_tracer) {} void Version::Get(const ReadOptions& read_options, const LookupKey& k, PinnableSlice* value, std::string* timestamp, Status* status, @@ -3805,7 +3807,7 @@ Status VersionSet::ProcessManifestWrites( } if (version == nullptr) { version = new Version(last_writer->cfd, this, file_options_, - last_writer->mutable_cf_options, + last_writer->mutable_cf_options, io_tracer_, current_version_number_++); versions.push_back(version); mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options); @@ -3962,7 +3964,7 @@ Status VersionSet::ProcessManifestWrites( std::unique_ptr file_writer(new WritableFileWriter( std::move(descriptor_file), descriptor_fname, opt_file_opts, env_, - nullptr, db_options_->listeners)); + io_tracer_, nullptr, db_options_->listeners)); descriptor_log_.reset( new log::Writer(std::move(file_writer), 0, false)); s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get(), @@ -4687,7 +4689,7 @@ Status VersionSet::Recover( } Version* v = new Version(cfd, this, file_options_, - *cfd->GetLatestMutableCFOptions(), + *cfd->GetLatestMutableCFOptions(), io_tracer_, current_version_number_++); s = builder->SaveTo(v->storage_info()); if (!s.ok()) { @@ -4863,8 +4865,8 @@ Status VersionSet::TryRecoverFromOneManifest( reporter.status = &s; log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, /*checksum=*/true, /*log_num=*/0); - VersionEditHandlerPointInTime handler_pit(read_only, column_families, - const_cast(this)); + VersionEditHandlerPointInTime handler_pit( + read_only, column_families, const_cast(this), io_tracer_); handler_pit.Iterate(reader, &s, db_id); @@ -5242,7 +5244,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, auto builder = builders_iter->second->version_builder(); Version* v = new Version(cfd, this, file_options_, - *cfd->GetLatestMutableCFOptions(), + *cfd->GetLatestMutableCFOptions(), io_tracer_, current_version_number_++); s = builder->SaveTo(v->storage_info()); v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false); @@ -5881,7 +5883,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( MutableCFOptions dummy_cf_options; Version* dummy_versions = - new Version(nullptr, this, file_options_, dummy_cf_options); + new Version(nullptr, this, file_options_, dummy_cf_options, io_tracer_); // Ref() dummy version once so that later we can call Unref() to delete it // by avoiding calling "delete" explicitly (~Version is private) dummy_versions->Ref(); @@ -5890,7 +5892,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( cf_options); Version* v = new Version(new_cfd, this, file_options_, - *new_cfd->GetLatestMutableCFOptions(), + *new_cfd->GetLatestMutableCFOptions(), io_tracer_, current_version_number_++); // Fill level target base information. @@ -6079,7 +6081,7 @@ Status ReactiveVersionSet::Recover( auto* builder = builders_iter->second->version_builder(); Version* v = new Version(cfd, this, file_options_, - *cfd->GetLatestMutableCFOptions(), + *cfd->GetLatestMutableCFOptions(), io_tracer_, current_version_number_++); s = builder->SaveTo(v->storage_info()); @@ -6314,7 +6316,7 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( if (s.ok()) { auto version = new Version(cfd, this, file_options_, - *cfd->GetLatestMutableCFOptions(), + *cfd->GetLatestMutableCFOptions(), io_tracer_, current_version_number_++); s = builder->SaveTo(version->storage_info()); if (s.ok()) { diff --git a/db/version_set.h b/db/version_set.h index 3f669576d..6d439e14e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -815,9 +815,12 @@ class Version { // A version number that uniquely represents this version. This is // used for debugging and logging purposes only. uint64_t version_number_; + std::shared_ptr io_tracer_; Version(ColumnFamilyData* cfd, VersionSet* vset, const FileOptions& file_opt, - MutableCFOptions mutable_cf_options, uint64_t version_number = 0); + MutableCFOptions mutable_cf_options, + const std::shared_ptr& io_tracer, + uint64_t version_number = 0); ~Version(); @@ -1193,7 +1196,7 @@ class VersionSet { const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions(); Version* const version = - new Version(cfd, this, file_options_, mutable_cf_options); + new Version(cfd, this, file_options_, mutable_cf_options, io_tracer_); constexpr bool update_stats = false; version->PrepareApply(mutable_cf_options, update_stats); diff --git a/env/file_system_tracer.h b/env/file_system_tracer.h index 998deadc6..e2ade0967 100644 --- a/env/file_system_tracer.h +++ b/env/file_system_tracer.h @@ -270,26 +270,38 @@ class FSWritableFileTracingWrapper : public FSWritableFileWrapper { // FSWritableFileTracingWrapper when tracing is disabled. class FSWritableFilePtr { public: - FSWritableFilePtr(FSWritableFile* fs, std::shared_ptr io_tracer) - : fs_(fs), - io_tracer_(io_tracer), - fs_tracer_(new FSWritableFileTracingWrapper(fs_, io_tracer_)) {} - - explicit FSWritableFilePtr(FSWritableFile* fs) - : fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {} + FSWritableFilePtr(std::unique_ptr&& fs, + const std::shared_ptr& io_tracer) + : fs_(std::move(fs)), io_tracer_(io_tracer) { + fs_tracer_.reset(new FSWritableFileTracingWrapper(fs_.get(), io_tracer_)); + } FSWritableFile* operator->() const { if (io_tracer_ && io_tracer_->is_tracing_enabled()) { - return fs_tracer_; + return fs_tracer_.get(); } else { - return fs_; + return fs_.get(); } } + FSWritableFile* get() const { + if (io_tracer_ && io_tracer_->is_tracing_enabled()) { + return fs_tracer_.get(); + } else { + return fs_.get(); + } + } + + void reset() { + fs_.reset(); + fs_tracer_.reset(); + io_tracer_ = nullptr; + } + private: - FSWritableFile* fs_; + std::unique_ptr fs_; std::shared_ptr io_tracer_; - FSWritableFileTracingWrapper* fs_tracer_; + std::unique_ptr fs_tracer_; }; // FSRandomRWFileTracingWrapper is a wrapper class above FSRandomRWFile that diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index f225ae3e2..eafd8b66a 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -129,7 +129,7 @@ IOStatus WritableFileWriter::Close() { // in __dtor, simply flushing is not enough // Windows when pre-allocating does not fill with zeros // also with unbuffered access we also set the end of data. - if (!writable_file_) { + if (writable_file_.get() == nullptr) { return s; } diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index 971ad6e21..51fbcc04b 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -10,7 +10,9 @@ #pragma once #include #include + #include "db/version_edit.h" +#include "env/file_system_tracer.h" #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/file_checksum.h" @@ -117,8 +119,8 @@ class WritableFileWriter { bool ShouldNotifyListeners() const { return !listeners_.empty(); } void UpdateFileChecksum(const Slice& data); - std::unique_ptr writable_file_; std::string file_name_; + FSWritableFilePtr writable_file_; Env* env_; AlignedBuffer buf_; size_t max_buffer_size_; @@ -144,11 +146,12 @@ class WritableFileWriter { WritableFileWriter( std::unique_ptr&& file, const std::string& _file_name, const FileOptions& options, Env* env = nullptr, + const std::shared_ptr& io_tracer = nullptr, Statistics* stats = nullptr, const std::vector>& listeners = {}, FileChecksumGenFactory* file_checksum_gen_factory = nullptr) - : writable_file_(std::move(file)), - file_name_(_file_name), + : file_name_(_file_name), + writable_file_(std::move(file), io_tracer), env_(env), buf_(), max_buffer_size_(options.writable_file_max_buffer_size), diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index 2d0338d23..9d4ffd27d 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -255,8 +255,9 @@ Status SstFileWriter::Open(const std::string& file_path) { 0 /* file_creation_time */, "SST Writer" /* db_id */, db_session_id); r->file_writer.reset(new WritableFileWriter( NewLegacyWritableFileWrapper(std::move(sst_file)), file_path, - r->env_options, r->ioptions.env, nullptr /* stats */, - r->ioptions.listeners, r->ioptions.file_checksum_gen_factory)); + r->env_options, r->ioptions.env, nullptr /* io_tracer */, + nullptr /* stats */, r->ioptions.listeners, + r->ioptions.file_checksum_gen_factory)); // TODO(tec) : If table_factory is using compressed block cache, we will // be adding the external sst file blocks into it, which is wasteful.