From 226d1f9c73443891d044f3697549923408fe26db Mon Sep 17 00:00:00 2001 From: wenh Date: Tue, 7 Jul 2020 18:19:32 -0700 Subject: [PATCH] extend listener callback functions to more file I/O operations (#7055) Summary: Currently, `EventListener` in listner.h only have callback functions for file read and write. One may favor extended callback functions for more file I/O operations like flush, sync and close. This PR tries to add those interface and have them called when appropriate throughout the code base. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7055 Test Plan: Write an experimental listener with those new callback functions with log output in them; run experiments and check logs to see those functions are actually called. Default test suits `make check` should also be included. Reviewed By: riversand963 Differential Revision: D22380624 Pulled By: roghnin fbshipit-source-id: 4121491d45c2c2aae8c255e7998090559a241c6a --- HISTORY.md | 1 + db/listener_test.cc | 68 +++++++++++++++++++++++ file/random_access_file_reader.h | 4 +- file/writable_file_writer.cc | 94 ++++++++++++++++++++++++++++++-- file/writable_file_writer.h | 63 ++++++++++++++++++++- include/rocksdb/listener.h | 45 +++++++++++++-- 6 files changed, 262 insertions(+), 13 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 0b011dc7e..9d084f6c6 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Public API Change * Encryption file classes now exposed for inheritance in env_encryption.h +* File I/O listener is extended to cover more I/O operations. Now class `EventListener` in listener.h contains new callback functions: `OnFileFlushFinish()`, `OnFileSyncFinish()`, `OnFileRangeSyncFinish()`, `OnFileTruncateFinish()`, and ``OnFileCloseFinish()``. ### Behavior Changes * Best-efforts recovery ignores CURRENT file completely. If CURRENT file is missing during recovery, best-efforts recovery still proceeds with MANIFEST file(s). diff --git a/db/listener_test.cc b/db/listener_test.cc index 33cf51036..c12ca1ef2 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -977,6 +977,14 @@ class TestFileOperationListener : public EventListener { file_reads_success_.store(0); file_writes_.store(0); file_writes_success_.store(0); + file_flushes_.store(0); + file_flushes_success_.store(0); + file_closes_.store(0); + file_closes_success_.store(0); + file_syncs_.store(0); + file_syncs_success_.store(0); + file_truncates_.store(0); + file_truncates_success_.store(0); } void OnFileReadFinish(const FileOperationInfo& info) override { @@ -995,12 +1003,52 @@ class TestFileOperationListener : public EventListener { ReportDuration(info); } + void OnFileFlushFinish(const FileOperationInfo& info) override { + ++file_flushes_; + if (info.status.ok()) { + ++file_flushes_success_; + } + ReportDuration(info); + } + + void OnFileCloseFinish(const FileOperationInfo& info) override { + ++file_closes_; + if (info.status.ok()) { + ++file_closes_success_; + } + ReportDuration(info); + } + + void OnFileSyncFinish(const FileOperationInfo& info) override { + ++file_syncs_; + if (info.status.ok()) { + ++file_syncs_success_; + } + ReportDuration(info); + } + + void OnFileTruncateFinish(const FileOperationInfo& info) override { + ++file_truncates_; + if (info.status.ok()) { + ++file_truncates_success_; + } + ReportDuration(info); + } + bool ShouldBeNotifiedOnFileIO() override { return true; } std::atomic file_reads_; std::atomic file_reads_success_; std::atomic file_writes_; std::atomic file_writes_success_; + std::atomic file_flushes_; + std::atomic file_flushes_success_; + std::atomic file_closes_; + std::atomic file_closes_success_; + std::atomic file_syncs_; + std::atomic file_syncs_success_; + std::atomic file_truncates_; + std::atomic file_truncates_success_; private: void ReportDuration(const FileOperationInfo& info) const { @@ -1018,6 +1066,13 @@ TEST_F(EventListenerTest, OnFileOperationTest) { TestFileOperationListener* listener = new TestFileOperationListener(); options.listeners.emplace_back(listener); + options.use_direct_io_for_flush_and_compaction = true; + Status s = TryReopen(options); + if (s.IsInvalidArgument()) { + options.use_direct_io_for_flush_and_compaction = false; + } else { + ASSERT_OK(s); + } DestroyAndReopen(options); ASSERT_OK(Put("foo", "aaa")); dbfull()->Flush(FlushOptions()); @@ -1025,11 +1080,24 @@ TEST_F(EventListenerTest, OnFileOperationTest) { ASSERT_GE(listener->file_writes_.load(), listener->file_writes_success_.load()); ASSERT_GT(listener->file_writes_.load(), 0); + ASSERT_GE(listener->file_flushes_.load(), + listener->file_flushes_success_.load()); + ASSERT_GT(listener->file_flushes_.load(), 0); Close(); Reopen(options); ASSERT_GE(listener->file_reads_.load(), listener->file_reads_success_.load()); ASSERT_GT(listener->file_reads_.load(), 0); + ASSERT_GE(listener->file_closes_.load(), + listener->file_closes_success_.load()); + ASSERT_GT(listener->file_closes_.load(), 0); + ASSERT_GE(listener->file_syncs_.load(), listener->file_syncs_success_.load()); + ASSERT_GT(listener->file_syncs_.load(), 0); + if (true == options.use_direct_io_for_flush_and_compaction) { + ASSERT_GE(listener->file_truncates_.load(), + listener->file_truncates_success_.load()); + ASSERT_GT(listener->file_truncates_.load(), 0); + } } } // namespace ROCKSDB_NAMESPACE diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index f0f6801f6..a134b8254 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -39,10 +39,10 @@ class RandomAccessFileReader { const FileOperationInfo::TimePoint& start_ts, const FileOperationInfo::TimePoint& finish_ts, const Status& status) const { - FileOperationInfo info(file_name_, start_ts, finish_ts); + FileOperationInfo info(FileOperationType::kRead, file_name_, start_ts, + finish_ts, status); info.offset = offset; info.length = length; - info.status = status; for (auto& listener : listeners_) { listener->OnFileReadFinish(info); diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 8e3bb131c..836ed38f6 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -139,9 +139,38 @@ IOStatus WritableFileWriter::Close() { // In direct I/O mode we write whole pages so // we need to let the file know where data ends. if (use_direct_io()) { - interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr); + { +#ifndef ROCKSDB_LITE + FileOperationInfo::TimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + } +#endif + interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr); +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileTruncateFinish(start_ts, finish_ts, s); + } +#endif + } if (interim.ok()) { - interim = writable_file_->Fsync(IOOptions(), nullptr); + { +#ifndef ROCKSDB_LITE + FileOperationInfo::TimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + } +#endif + interim = writable_file_->Fsync(IOOptions(), nullptr); +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileSyncFinish(start_ts, finish_ts, s, + FileOperationType::kFsync); + } +#endif + } } if (!interim.ok() && s.ok()) { s = interim; @@ -149,7 +178,21 @@ IOStatus WritableFileWriter::Close() { } TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds); - interim = writable_file_->Close(IOOptions(), nullptr); + { +#ifndef ROCKSDB_LITE + FileOperationInfo::TimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + } +#endif + interim = writable_file_->Close(IOOptions(), nullptr); +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileCloseFinish(start_ts, finish_ts, s); + } +#endif + } if (!interim.ok() && s.ok()) { s = interim; } @@ -187,7 +230,21 @@ IOStatus WritableFileWriter::Flush() { } } - s = writable_file_->Flush(IOOptions(), nullptr); + { +#ifndef ROCKSDB_LITE + FileOperationInfo::TimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + } +#endif + s = writable_file_->Flush(IOOptions(), nullptr); +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileFlushFinish(start_ts, finish_ts, s); + } +#endif + } if (!s.ok()) { return s; @@ -275,11 +332,25 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); auto prev_perf_level = GetPerfLevel(); IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_); +#ifndef ROCKSDB_LITE + FileOperationInfo::TimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + } +#endif if (use_fsync) { s = writable_file_->Fsync(IOOptions(), nullptr); } else { s = writable_file_->Sync(IOOptions(), nullptr); } +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileSyncFinish( + start_ts, finish_ts, s, + use_fsync ? FileOperationType::kFsync : FileOperationType::kSync); + } +#endif SetPerfLevel(prev_perf_level); return s; } @@ -287,7 +358,20 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { IOSTATS_TIMER_GUARD(range_sync_nanos); TEST_SYNC_POINT("WritableFileWriter::RangeSync:0"); - return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr); +#ifndef ROCKSDB_LITE + FileOperationInfo::TimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + } +#endif + IOStatus s = writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr); +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileRangeSyncFinish(offset, nbytes, start_ts, finish_ts, s); + } +#endif + return s; } // This method writes to disk the specified data and makes use of the rate diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index 8fa5d53f6..1a4f49fca 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -38,16 +38,75 @@ class WritableFileWriter { const FileOperationInfo::TimePoint& start_ts, const FileOperationInfo::TimePoint& finish_ts, const IOStatus& io_status) { - FileOperationInfo info(file_name_, start_ts, finish_ts); + FileOperationInfo info(FileOperationType::kWrite, file_name_, start_ts, + finish_ts, io_status); info.offset = offset; info.length = length; - info.status = io_status; for (auto& listener : listeners_) { listener->OnFileWriteFinish(info); } info.status.PermitUncheckedError(); } + void NotifyOnFileFlushFinish(const FileOperationInfo::TimePoint& start_ts, + const FileOperationInfo::TimePoint& finish_ts, + const IOStatus& io_status) { + FileOperationInfo info(FileOperationType::kFlush, file_name_, start_ts, + finish_ts, io_status); + + for (auto& listener : listeners_) { + listener->OnFileFlushFinish(info); + } + info.status.PermitUncheckedError(); + } + void NotifyOnFileSyncFinish( + const FileOperationInfo::TimePoint& start_ts, + const FileOperationInfo::TimePoint& finish_ts, const IOStatus& io_status, + FileOperationType type = FileOperationType::kSync) { + FileOperationInfo info(type, file_name_, start_ts, finish_ts, io_status); + + for (auto& listener : listeners_) { + listener->OnFileSyncFinish(info); + } + info.status.PermitUncheckedError(); + } + void NotifyOnFileRangeSyncFinish( + uint64_t offset, size_t length, + const FileOperationInfo::TimePoint& start_ts, + const FileOperationInfo::TimePoint& finish_ts, + const IOStatus& io_status) { + FileOperationInfo info(FileOperationType::kRangeSync, file_name_, start_ts, + finish_ts, io_status); + info.offset = offset; + info.length = length; + + for (auto& listener : listeners_) { + listener->OnFileRangeSyncFinish(info); + } + info.status.PermitUncheckedError(); + } + void NotifyOnFileTruncateFinish(const FileOperationInfo::TimePoint& start_ts, + const FileOperationInfo::TimePoint& finish_ts, + const IOStatus& io_status) { + FileOperationInfo info(FileOperationType::kTruncate, file_name_, start_ts, + finish_ts, io_status); + + for (auto& listener : listeners_) { + listener->OnFileTruncateFinish(info); + } + info.status.PermitUncheckedError(); + } + void NotifyOnFileCloseFinish(const FileOperationInfo::TimePoint& start_ts, + const FileOperationInfo::TimePoint& finish_ts, + const IOStatus& io_status) { + FileOperationInfo info(FileOperationType::kClose, file_name_, start_ts, + finish_ts, io_status); + + for (auto& listener : listeners_) { + listener->OnFileCloseFinish(info); + } + info.status.PermitUncheckedError(); + } #endif // ROCKSDB_LITE bool ShouldNotifyListeners() const { return !listeners_.empty(); } diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 97570713f..6257b8f62 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -149,19 +149,36 @@ struct TableFileDeletionInfo { Status status; }; +enum class FileOperationType { + kRead, + kWrite, + kTruncate, + kClose, + kFlush, + kSync, + kFsync, + kRangeSync +}; + struct FileOperationInfo { using TimePoint = std::chrono::time_point; + FileOperationType type; const std::string& path; uint64_t offset; size_t length; const TimePoint& start_timestamp; const TimePoint& finish_timestamp; Status status; - FileOperationInfo(const std::string& _path, const TimePoint& start, - const TimePoint& finish) - : path(_path), start_timestamp(start), finish_timestamp(finish) {} + FileOperationInfo(const FileOperationType _type, const std::string& _path, + const TimePoint& start, const TimePoint& finish, + const Status& _status) + : type(_type), + path(_path), + start_timestamp(start), + finish_timestamp(finish), + status(_status) {} }; struct FlushJobInfo { @@ -460,7 +477,27 @@ class EventListener { // operation finishes. virtual void OnFileWriteFinish(const FileOperationInfo& /* info */) {} - // If true, the OnFileReadFinish and OnFileWriteFinish will be called. If + // A callback function for RocksDB which will be called whenever a file flush + // operation finishes. + virtual void OnFileFlushFinish(const FileOperationInfo& /* info */) {} + + // A callback function for RocksDB which will be called whenever a file sync + // operation finishes. + virtual void OnFileSyncFinish(const FileOperationInfo& /* info */) {} + + // A callback function for RocksDB which will be called whenever a file + // rangeSync operation finishes. + virtual void OnFileRangeSyncFinish(const FileOperationInfo& /* info */) {} + + // A callback function for RocksDB which will be called whenever a file + // truncate operation finishes. + virtual void OnFileTruncateFinish(const FileOperationInfo& /* info */) {} + + // A callback function for RocksDB which will be called whenever a file close + // operation finishes. + virtual void OnFileCloseFinish(const FileOperationInfo& /* info */) {} + + // If true, the OnFile*Finish functions will be called. If // false, then they won't be called. virtual bool ShouldBeNotifiedOnFileIO() { return false; }