From a776406de30886d03e550f793000a4856b66ca4b Mon Sep 17 00:00:00 2001 From: Stefan Roesch Date: Tue, 5 Oct 2021 10:50:27 -0700 Subject: [PATCH] Add file operation callbacks to SequentialFileReader (#8982) Summary: This change adds File IO Notifications to the SequentialFileReader. The SequentialFileReader is extended with a listener parameter. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8982 Test Plan: A new test EventListenerTest::ReadManifestAndWALOnRecovery has been added. The test verifies that during recovery (DB reopen) the sequential file reader is called and the file read notifications are fired. Reviewed By: riversand963 Differential Revision: D31320844 Pulled By: shrfb fbshipit-source-id: 040b24da7c010d7c14ebb5c6460fae9a19b8c168 --- db/listener_test.cc | 29 +++++++++++++++++++ db/transaction_log_impl.cc | 4 +-- db/version_set.cc | 18 ++++++------ file/sequence_file_reader.cc | 28 +++++++++++++++++++ file/sequence_file_reader.h | 54 +++++++++++++++++++++++++++++++++--- 5 files changed, 118 insertions(+), 15 deletions(-) diff --git a/db/listener_test.cc b/db/listener_test.cc index e3839fe77..19ec40cf5 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -1000,6 +1000,7 @@ class TestFileOperationListener : public EventListener { file_syncs_success_.store(0); file_truncates_.store(0); file_truncates_success_.store(0); + file_seq_reads_.store(0); blob_file_reads_.store(0); blob_file_writes_.store(0); blob_file_flushes_.store(0); @@ -1013,6 +1014,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_reads_success_; } + if (info.path.find("MANIFEST") != std::string::npos) { + ++file_seq_reads_; + } if (EndsWith(info.path, ".blob")) { ++blob_file_reads_; } @@ -1088,6 +1092,7 @@ class TestFileOperationListener : public EventListener { std::atomic file_syncs_success_; std::atomic file_truncates_; std::atomic file_truncates_success_; + std::atomic file_seq_reads_; std::atomic blob_file_reads_; std::atomic blob_file_writes_; 
std::atomic blob_file_flushes_; @@ -1183,6 +1188,30 @@ TEST_F(EventListenerTest, OnBlobFileOperationTest) { } } +TEST_F(EventListenerTest, ReadManifestAndWALOnRecovery) { + Options options; + options.env = CurrentOptions().env; + options.create_if_missing = true; + + TestFileOperationListener* listener = new TestFileOperationListener(); + options.listeners.emplace_back(listener); + + options.use_direct_io_for_flush_and_compaction = false; + Status s = TryReopen(options); + if (s.IsInvalidArgument()) { + options.use_direct_io_for_flush_and_compaction = false; + } else { + ASSERT_OK(s); + } + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "aaa")); + Close(); + + size_t seq_reads = listener->file_seq_reads_.load(); + Reopen(options); + ASSERT_GT(listener->file_seq_reads_.load(), seq_reads); +} + class BlobDBJobLevelEventListenerTest : public EventListener { public: explicit BlobDBJobLevelEventListenerTest(EventListenerTest* test) diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index ba4c65ff9..e1e168597 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -63,8 +63,8 @@ Status TransactionLogIteratorImpl::OpenLogFile( } } if (s.ok()) { - file_reader->reset( - new SequentialFileReader(std::move(file), fname, io_tracer_)); + file_reader->reset(new SequentialFileReader( + std::move(file), fname, io_tracer_, options_->listeners)); } return s; } diff --git a/db/version_set.cc b/db/version_set.cc index e0b38ef6f..c8cbe3bb4 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4746,9 +4746,9 @@ Status VersionSet::Recover( if (!s.ok()) { return s; } - manifest_file_reader.reset( - new SequentialFileReader(std::move(manifest_file), manifest_path, - db_options_->log_readahead_size, io_tracer_)); + manifest_file_reader.reset(new SequentialFileReader( + std::move(manifest_file), manifest_path, + db_options_->log_readahead_size, io_tracer_, db_options_->listeners)); } uint64_t current_manifest_file_size = 0; uint64_t log_number = 
0; @@ -4918,9 +4918,9 @@ Status VersionSet::TryRecoverFromOneManifest( if (!s.ok()) { return s; } - manifest_file_reader.reset( - new SequentialFileReader(std::move(manifest_file), manifest_path, - db_options_->log_readahead_size, io_tracer_)); + manifest_file_reader.reset(new SequentialFileReader( + std::move(manifest_file), manifest_path, + db_options_->log_readahead_size, io_tracer_, db_options_->listeners)); } assert(s.ok()); @@ -5950,9 +5950,9 @@ Status ReactiveVersionSet::MaybeSwitchManifest( &manifest_file, nullptr); std::unique_ptr manifest_file_reader; if (s.ok()) { - manifest_file_reader.reset( - new SequentialFileReader(std::move(manifest_file), manifest_path, - db_options_->log_readahead_size, io_tracer_)); + manifest_file_reader.reset(new SequentialFileReader( + std::move(manifest_file), manifest_path, + db_options_->log_readahead_size, io_tracer_, db_options_->listeners)); manifest_reader->reset(new log::FragmentBufferedReader( nullptr, std::move(manifest_file_reader), reporter, true /* checksum */, 0 /* log_number */)); diff --git a/file/sequence_file_reader.cc b/file/sequence_file_reader.cc index 614dbb411..6a89d41d5 100644 --- a/file/sequence_file_reader.cc +++ b/file/sequence_file_reader.cc @@ -47,7 +47,14 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { AlignedBuffer buf; buf.Alignment(alignment); buf.AllocateNewBuffer(size); + Slice tmp; + uint64_t orig_offset = 0; + FileOperationInfo::StartTimePoint start_ts; + if (ShouldNotifyListeners()) { + orig_offset = aligned_offset + buf.CurrentSize(); + start_ts = FileOperationInfo::StartNow(); + } io_s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp, buf.BufferStart(), nullptr); if (io_s.ok() && offset_advance < tmp.size()) { @@ -56,6 +63,11 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { std::min(tmp.size() - offset_advance, n)); } *result = Slice(scratch, r); + if (ShouldNotifyListeners()) { + auto finish_ts = 
FileOperationInfo::FinishNow(); + NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts, + io_s); + } #endif // !ROCKSDB_LITE } else { // To be paranoid, modify scratch a little bit, so in case underlying @@ -66,7 +78,23 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { if (n > 0 && scratch != nullptr) { scratch[0]++; } + +#ifndef ROCKSDB_LITE + FileOperationInfo::StartTimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = FileOperationInfo::StartNow(); + } +#endif + io_s = file_->Read(n, IOOptions(), result, scratch, nullptr); + +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = FileOperationInfo::FinishNow(); + size_t offset = offset_.fetch_add(result->size()); + NotifyOnFileReadFinish(offset, result->size(), start_ts, finish_ts, io_s); + } +#endif } IOSTATS_ADD(bytes_read, result->size()); return io_s; diff --git a/file/sequence_file_reader.h b/file/sequence_file_reader.h index 72fe37f7d..c4c6e5b5d 100644 --- a/file/sequence_file_reader.h +++ b/file/sequence_file_reader.h @@ -23,24 +23,70 @@ namespace ROCKSDB_NAMESPACE { // cache disabled) reads appropriately, and also updates the IO stats. 
class SequentialFileReader { private: +#ifndef ROCKSDB_LITE + void NotifyOnFileReadFinish( + uint64_t offset, size_t length, + const FileOperationInfo::StartTimePoint& start_ts, + const FileOperationInfo::FinishTimePoint& finish_ts, + const Status& status) const { + FileOperationInfo info(FileOperationType::kRead, file_name_, start_ts, + finish_ts, status); + info.offset = offset; + info.length = length; + + for (auto& listener : listeners_) { + listener->OnFileReadFinish(info); + } + } + + void AddFileIOListeners( + const std::vector>& listeners) { + std::for_each(listeners.begin(), listeners.end(), + [this](const std::shared_ptr& e) { + if (e->ShouldBeNotifiedOnFileIO()) { + listeners_.emplace_back(e); + } + }); + } +#endif // ROCKSDB_LITE + + bool ShouldNotifyListeners() const { return !listeners_.empty(); } + std::string file_name_; FSSequentialFilePtr file_; std::atomic offset_{0}; // read offset + std::vector> listeners_{}; public: explicit SequentialFileReader( std::unique_ptr&& _file, const std::string& _file_name, - const std::shared_ptr& io_tracer = nullptr) + const std::shared_ptr& io_tracer = nullptr, + const std::vector>& listeners = {}) : file_name_(_file_name), - file_(std::move(_file), io_tracer, _file_name) {} + file_(std::move(_file), io_tracer, _file_name), + listeners_() { +#ifndef ROCKSDB_LITE + AddFileIOListeners(listeners); +#else + (void)listeners; +#endif + } explicit SequentialFileReader( std::unique_ptr&& _file, const std::string& _file_name, size_t _readahead_size, - const std::shared_ptr& io_tracer = nullptr) + const std::shared_ptr& io_tracer = nullptr, + const std::vector>& listeners = {}) : file_name_(_file_name), file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size), - io_tracer, _file_name) {} + io_tracer, _file_name), + listeners_() { +#ifndef ROCKSDB_LITE + AddFileIOListeners(listeners); +#else + (void)listeners; +#endif + } static IOStatus Create(const std::shared_ptr& fs, const std::string& fname, const 
FileOptions& file_opts, std::unique_ptr* reader,