From 8e0df9050c6c8cadf58920dd35c3826e891e7725 Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Thu, 27 Aug 2020 11:20:08 -0700 Subject: [PATCH] Store FSRandomAccessPtr object in RandomAccessFileReader (#7192) Summary: Replace the FSRandomAccessFile pointer with an FSRandomAccessFilePtr object in RandomAccessFileReader. This new object wraps the FSRandomAccessFile pointer. Objective: If tracing is enabled, FSRandomAccessFilePtr returns an FSRandomAccessFileTracingWrapper pointer, which includes all necessary information in an IORecord, calls the underlying FileSystem, and invokes IOTracer to dump that record to a binary file. If tracing is disabled, the underlying FileSystem pointer is returned directly. The FSRandomAccessFilePtr wrapper class is added to bypass the FSRandomAccessFileTracingWrapper when tracing is disabled. Test Plan: make check -j64 Pull Request resolved: https://github.com/facebook/rocksdb/pull/7192 Reviewed By: anand1976 Differential Revision: D23356867 Pulled By: akankshamahajan15 fbshipit-source-id: 48f31168166a17a7444b40be44a9a9d4a5c7182c --- db/column_family.cc | 15 ++++++---- db/column_family.h | 7 +++-- db/external_sst_file_ingestion_job.cc | 8 +++--- db/import_column_family_job.cc | 4 +-- db/repair.cc | 6 ++-- db/table_cache.cc | 8 ++++-- db/table_cache.h | 4 ++- db/version_set.cc | 17 +++++------ env/file_system_tracer.h | 41 +++++++++++++++------------ file/file_util.cc | 6 ++-- file/file_util.h | 3 +- file/random_access_file_reader.h | 23 ++++----------- file/sequence_file_reader.h | 9 ------ 13 files changed, 74 insertions(+), 77 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index c167dcf29..d6b9c69b9 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -499,7 +499,8 @@ ColumnFamilyData::ColumnFamilyData( Cache* _table_cache, WriteBufferManager* write_buffer_manager, const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options, const FileOptions& file_options, ColumnFamilySet* column_family_set, - BlockCacheTracer* 
const block_cache_tracer) + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr& io_tracer) : id_(id), name_(name), dummy_versions_(_dummy_versions), @@ -557,7 +558,7 @@ ColumnFamilyData::ColumnFamilyData( internal_stats_.reset( new InternalStats(ioptions_.num_levels, db_options.env, this)); table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache, - block_cache_tracer)); + block_cache_tracer, io_tracer)); if (ioptions_.compaction_style == kCompactionStyleLevel) { compaction_picker_.reset( new LevelCompactionPicker(ioptions_, &internal_comparator_)); @@ -1415,12 +1416,13 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, Cache* table_cache, WriteBufferManager* _write_buffer_manager, WriteController* _write_controller, - BlockCacheTracer* const block_cache_tracer) + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr& io_tracer) : max_column_family_(0), dummy_cfd_(new ColumnFamilyData( ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options, file_options, nullptr, - block_cache_tracer)), + block_cache_tracer, io_tracer)), default_cfd_cache_(nullptr), db_name_(dbname), db_options_(db_options), @@ -1428,7 +1430,8 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, table_cache_(table_cache), write_buffer_manager_(_write_buffer_manager), write_controller_(_write_controller), - block_cache_tracer_(block_cache_tracer) { + block_cache_tracer_(block_cache_tracer), + io_tracer_(io_tracer) { // initialize linked list dummy_cfd_->prev_ = dummy_cfd_; dummy_cfd_->next_ = dummy_cfd_; @@ -1494,7 +1497,7 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( assert(column_families_.find(name) == column_families_.end()); ColumnFamilyData* new_cfd = new ColumnFamilyData( id, name, dummy_versions, table_cache_, write_buffer_manager_, options, - *db_options_, file_options_, this, block_cache_tracer_); + *db_options_, file_options_, this, block_cache_tracer_, 
io_tracer_); column_families_.insert({name, id}); column_family_data_.insert({id, new_cfd}); max_column_family_ = std::max(max_column_family_, id); diff --git a/db/column_family.h b/db/column_family.h index b3cb8482e..0a251e545 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -518,7 +518,8 @@ class ColumnFamilyData { const ImmutableDBOptions& db_options, const FileOptions& file_options, ColumnFamilySet* column_family_set, - BlockCacheTracer* const block_cache_tracer); + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr& io_tracer); std::vector GetDbPaths() const; @@ -651,7 +652,8 @@ class ColumnFamilySet { const FileOptions& file_options, Cache* table_cache, WriteBufferManager* _write_buffer_manager, WriteController* _write_controller, - BlockCacheTracer* const block_cache_tracer); + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr& io_tracer); ~ColumnFamilySet(); ColumnFamilyData* GetDefault() const; @@ -715,6 +717,7 @@ class ColumnFamilySet { WriteBufferManager* write_buffer_manager_; WriteController* write_controller_; BlockCacheTracer* const block_cache_tracer_; + std::shared_ptr io_tracer_; }; // We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index fad62004b..a25e83f6e 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -198,7 +198,7 @@ Status ExternalSstFileIngestionJob::Prepare( db_options_.file_checksum_gen_factory.get(), &generated_checksum, &generated_checksum_func_name, ingestion_options_.verify_checksums_readahead_size, - db_options_.allow_mmap_reads); + db_options_.allow_mmap_reads, io_tracer_); if (!io_s.ok()) { status = io_s; ROCKS_LOG_WARN(db_options_.info_log, @@ -509,8 +509,8 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo( if (!status.ok()) { return status; } - sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file), 
- external_file)); + sst_file_reader.reset(new RandomAccessFileReader( + std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_)); status = cfd_->ioptions()->table_factory->NewTableReader( TableReaderOptions(*cfd_->ioptions(), @@ -835,7 +835,7 @@ IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile( db_options_.file_checksum_gen_factory.get(), &file_checksum, &file_checksum_func_name, ingestion_options_.verify_checksums_readahead_size, - db_options_.allow_mmap_reads); + db_options_.allow_mmap_reads, io_tracer_); if (!io_s.ok()) { return io_s; } diff --git a/db/import_column_family_job.cc b/db/import_column_family_job.cc index 593b8cbe7..6dcfc8874 100644 --- a/db/import_column_family_job.cc +++ b/db/import_column_family_job.cc @@ -217,8 +217,8 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo( if (!status.ok()) { return status; } - sst_file_reader.reset( - new RandomAccessFileReader(std::move(sst_file), external_file)); + sst_file_reader.reset(new RandomAccessFileReader( + std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_)); status = cfd_->ioptions()->table_factory->NewTableReader( TableReaderOptions(*cfd_->ioptions(), diff --git a/db/repair.cc b/db/repair.cc index 9fb7ff225..01ad80206 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -110,9 +110,9 @@ class Repairer { // TableCache can be small since we expect each table to be opened // once. 
NewLRUCache(10, db_options_.table_cache_numshardbits)), - table_cache_(new TableCache(default_cf_iopts_, env_options_, - raw_table_cache_.get(), - /*block_cache_tracer=*/nullptr)), + table_cache_(new TableCache( + default_cf_iopts_, env_options_, raw_table_cache_.get(), + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)), wb_(db_options_.db_write_buffer_size), wc_(db_options_.delayed_write_rate), vset_(dbname_, &immutable_db_options_, env_options_, diff --git a/db/table_cache.cc b/db/table_cache.cc index 1ce82c1a4..0a1076e2c 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -67,13 +67,15 @@ const int kLoadConcurency = 128; TableCache::TableCache(const ImmutableCFOptions& ioptions, const FileOptions& file_options, Cache* const cache, - BlockCacheTracer* const block_cache_tracer) + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr& io_tracer) : ioptions_(ioptions), file_options_(file_options), cache_(cache), immortal_tables_(false), block_cache_tracer_(block_cache_tracer), - loader_mutex_(kLoadConcurency, GetSliceNPHash64) { + loader_mutex_(kLoadConcurency, GetSliceNPHash64), + io_tracer_(io_tracer) { if (ioptions_.row_cache) { // If the same cache is shared by multiple instances, we need to // disambiguate its entries. @@ -126,7 +128,7 @@ Status TableCache::GetTableReader( StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS); std::unique_ptr file_reader( new RandomAccessFileReader( - std::move(file), fname, ioptions_.env, + std::move(file), fname, ioptions_.env, io_tracer_, record_read_stats ? 
ioptions_.statistics : nullptr, SST_READ_MICROS, file_read_hist, ioptions_.rate_limiter, ioptions_.listeners)); s = ioptions_.table_factory->NewTableReader( diff --git a/db/table_cache.h b/db/table_cache.h index 2fdb251ac..a834683fc 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -50,7 +50,8 @@ class TableCache { public: TableCache(const ImmutableCFOptions& ioptions, const FileOptions& storage_options, Cache* cache, - BlockCacheTracer* const block_cache_tracer); + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr& io_tracer); ~TableCache(); // Return an iterator for the specified file number (the corresponding @@ -226,6 +227,7 @@ class TableCache { bool immortal_tables_; BlockCacheTracer* const block_cache_tracer_; Striped loader_mutex_; + std::shared_ptr io_tracer_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index d9389e6f2..1a0793e2d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1284,8 +1284,8 @@ Status Version::GetTableProperties(std::shared_ptr* tp, // pass the magic number check in the footer. 
std::unique_ptr file_reader( new RandomAccessFileReader( - std::move(file), file_name, nullptr /* env */, nullptr /* stats */, - 0 /* hist_type */, nullptr /* file_read_hist */, + std::move(file), file_name, nullptr /* env */, nullptr /* IOTracer */, + nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */, nullptr /* rate_limiter */, ioptions->listeners)); s = ReadTableProperties( file_reader.get(), file_meta->fd.GetFileSize(), @@ -3618,9 +3618,10 @@ VersionSet::VersionSet(const std::string& dbname, WriteController* write_controller, BlockCacheTracer* const block_cache_tracer, const std::shared_ptr& io_tracer) - : column_family_set_(new ColumnFamilySet( - dbname, _db_options, storage_options, table_cache, - write_buffer_manager, write_controller, block_cache_tracer)), + : column_family_set_( + new ColumnFamilySet(dbname, _db_options, storage_options, table_cache, + write_buffer_manager, write_controller, + block_cache_tracer, io_tracer)), env_(_db_options->env), fs_(_db_options->fs, io_tracer), dbname_(dbname), @@ -3660,9 +3661,9 @@ void VersionSet::Reset() { Cache* table_cache = column_family_set_->get_table_cache(); WriteBufferManager* wbm = column_family_set_->write_buffer_manager(); WriteController* wc = column_family_set_->write_controller(); - column_family_set_.reset(new ColumnFamilySet(dbname_, db_options_, - file_options_, table_cache, - wbm, wc, block_cache_tracer_)); + column_family_set_.reset( + new ColumnFamilySet(dbname_, db_options_, file_options_, table_cache, + wbm, wc, block_cache_tracer_, io_tracer_)); } db_id_.clear(); next_file_number_.store(2); diff --git a/env/file_system_tracer.h b/env/file_system_tracer.h index 8e91692cc..998deadc6 100644 --- a/env/file_system_tracer.h +++ b/env/file_system_tracer.h @@ -131,16 +131,16 @@ class FSSequentialFileTracingWrapper : public FSSequentialFileWrapper { // FSSequentialFileTracingWrapper when tracing is disabled. 
class FSSequentialFilePtr { public: - FSSequentialFilePtr() {} + FSSequentialFilePtr() = delete; FSSequentialFilePtr(std::unique_ptr&& fs, const std::shared_ptr& io_tracer) - : fs_(std::move(fs)), io_tracer_(io_tracer) { - fs_tracer_.reset(new FSSequentialFileTracingWrapper(fs_.get(), io_tracer_)); - } + : fs_(std::move(fs)), + io_tracer_(io_tracer), + fs_tracer_(fs_.get(), io_tracer_) {} FSSequentialFile* operator->() const { if (io_tracer_ && io_tracer_->is_tracing_enabled()) { - return fs_tracer_.get(); + return const_cast(&fs_tracer_); } else { return fs_.get(); } @@ -148,7 +148,7 @@ class FSSequentialFilePtr { FSSequentialFile* get() const { if (io_tracer_ && io_tracer_->is_tracing_enabled()) { - return fs_tracer_.get(); + return const_cast(&fs_tracer_); } else { return fs_.get(); } @@ -157,7 +157,7 @@ class FSSequentialFilePtr { private: std::unique_ptr fs_; std::shared_ptr io_tracer_; - std::unique_ptr fs_tracer_; + FSSequentialFileTracingWrapper fs_tracer_; }; // FSRandomAccessFileTracingWrapper is a wrapper class above FSRandomAccessFile @@ -200,27 +200,32 @@ class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileWrapper { // FSRandomAccessFileTracingWrapper when tracing is disabled. 
class FSRandomAccessFilePtr { public: - FSRandomAccessFilePtr(FSRandomAccessFile* fs, - std::shared_ptr io_tracer) - : fs_(fs), + FSRandomAccessFilePtr(std::unique_ptr&& fs, + const std::shared_ptr& io_tracer) + : fs_(std::move(fs)), io_tracer_(io_tracer), - fs_tracer_(new FSRandomAccessFileTracingWrapper(fs_, io_tracer_)) {} - - explicit FSRandomAccessFilePtr(FSRandomAccessFile* fs) - : fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {} + fs_tracer_(fs_.get(), io_tracer_) {} FSRandomAccessFile* operator->() const { if (io_tracer_ && io_tracer_->is_tracing_enabled()) { - return fs_tracer_; + return const_cast(&fs_tracer_); } else { - return fs_; + return fs_.get(); + } + } + + FSRandomAccessFile* get() const { + if (io_tracer_ && io_tracer_->is_tracing_enabled()) { + return const_cast(&fs_tracer_); + } else { + return fs_.get(); } } private: - FSRandomAccessFile* fs_; + std::unique_ptr fs_; std::shared_ptr io_tracer_; - FSRandomAccessFileTracingWrapper* fs_tracer_; + FSRandomAccessFileTracingWrapper fs_tracer_; }; // FSWritableFileTracingWrapper is a wrapper class above FSWritableFile that diff --git a/file/file_util.cc b/file/file_util.cc index 9953c8d51..647043f7e 100644 --- a/file/file_util.cc +++ b/file/file_util.cc @@ -128,7 +128,8 @@ IOStatus GenerateOneFileChecksum(FileSystem* fs, const std::string& file_path, std::string* file_checksum, std::string* file_checksum_func_name, size_t verify_checksums_readahead_size, - bool allow_mmap_reads) { + bool allow_mmap_reads, + std::shared_ptr& io_tracer) { if (checksum_factory == nullptr) { return IOStatus::InvalidArgument("Checksum factory is invalid"); } @@ -151,7 +152,8 @@ IOStatus GenerateOneFileChecksum(FileSystem* fs, const std::string& file_path, if (!io_s.ok()) { return io_s; } - reader.reset(new RandomAccessFileReader(std::move(r_file), file_path)); + reader.reset(new RandomAccessFileReader(std::move(r_file), file_path, + nullptr /*Env*/, io_tracer)); } // Found that 256 KB readahead size provides the best 
performance, based on diff --git a/file/file_util.h b/file/file_util.h index 21dee9e93..9aa86ebda 100644 --- a/file/file_util.h +++ b/file/file_util.h @@ -37,7 +37,8 @@ extern IOStatus GenerateOneFileChecksum( FileSystem* fs, const std::string& file_path, FileChecksumGenFactory* checksum_factory, std::string* file_checksum, std::string* file_checksum_func_name, - size_t verify_checksums_readahead_size, bool allow_mmap_reads); + size_t verify_checksums_readahead_size, bool allow_mmap_reads, + std::shared_ptr& io_tracer); inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, Env* env, IOOptions& opts) { diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index b1620e368..a0f7a1917 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -12,6 +12,7 @@ #include #include +#include "env/file_system_tracer.h" #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/file_system.h" @@ -64,7 +65,7 @@ class RandomAccessFileReader { bool ShouldNotifyListeners() const { return !listeners_.empty(); } - std::unique_ptr file_; + FSRandomAccessFilePtr file_; std::string file_name_; Env* env_; Statistics* stats_; @@ -76,11 +77,12 @@ class RandomAccessFileReader { public: explicit RandomAccessFileReader( std::unique_ptr&& raf, const std::string& _file_name, - Env* _env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0, + Env* _env = nullptr, const std::shared_ptr& io_tracer = nullptr, + Statistics* stats = nullptr, uint32_t hist_type = 0, HistogramImpl* file_read_hist = nullptr, RateLimiter* rate_limiter = nullptr, const std::vector>& listeners = {}) - : file_(std::move(raf)), + : file_(std::move(raf), io_tracer), file_name_(std::move(_file_name)), env_(_env), stats_(stats), @@ -100,21 +102,6 @@ class RandomAccessFileReader { #endif } - RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT { - *this = std::move(o); - } - - RandomAccessFileReader& 
operator=(RandomAccessFileReader&& o) - ROCKSDB_NOEXCEPT { - file_ = std::move(o.file_); - env_ = std::move(o.env_); - stats_ = std::move(o.stats_); - hist_type_ = std::move(o.hist_type_); - file_read_hist_ = std::move(o.file_read_hist_); - rate_limiter_ = std::move(o.rate_limiter_); - return *this; - } - RandomAccessFileReader(const RandomAccessFileReader&) = delete; RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete; diff --git a/file/sequence_file_reader.h b/file/sequence_file_reader.h index cd41f187b..139e5255f 100644 --- a/file/sequence_file_reader.h +++ b/file/sequence_file_reader.h @@ -41,15 +41,6 @@ class SequentialFileReader { file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size), io_tracer) {} - SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { - *this = std::move(o); - } - - SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { - file_ = std::move(o.file_); - return *this; - } - SequentialFileReader(const SequentialFileReader&) = delete; SequentialFileReader& operator=(const SequentialFileReader&) = delete;