diff --git a/db/builder.cc b/db/builder.cc
index 5c90d348c..2192d4c9a 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -121,8 +121,9 @@ Status BuildTable(
     file->SetIOPriority(io_priority);
     file->SetWriteLifeTimeHint(write_hint);
 
-    file_writer.reset(new WritableFileWriter(
-        std::move(file), fname, env_options, ioptions.statistics));
+    file_writer.reset(new WritableFileWriter(std::move(file), fname,
+                                             env_options, ioptions.statistics,
+                                             ioptions.listeners));
     builder = NewTableBuilder(
         ioptions, mutable_cf_options, internal_comparator,
         int_tbl_prop_collector_factories, column_family_id,
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index b62c3bf69..a3c5488c3 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -1464,9 +1464,11 @@ Status CompactionJob::OpenCompactionOutputFile(
   writable_file->SetWriteLifeTimeHint(write_hint_);
   writable_file->SetPreallocationBlockSize(static_cast<size_t>(
       sub_compact->compaction->OutputFilePreallocationSize()));
+  const auto& listeners =
+      sub_compact->compaction->immutable_cf_options()->listeners;
   sub_compact->outfile.reset(
       new WritableFileWriter(std::move(writable_file), fname, env_options_,
-                             db_options_.statistics.get()));
+                             db_options_.statistics.get(), listeners));
 
   // If the Column family flag is to only optimize filters for hits,
   // we can skip creating filters if this is the bottommost_level where
diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc
index fc2b0e195..b2cdb2c2c 100644
--- a/db/db_impl_open.cc
+++ b/db/db_impl_open.cc
@@ -238,8 +238,9 @@ Status DBImpl::NewDB() {
   }
   file->SetPreallocationBlockSize(
       immutable_db_options_.manifest_preallocation_size);
-  unique_ptr<WritableFileWriter> file_writer(
-      new WritableFileWriter(std::move(file), manifest, env_options));
+  unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+      std::move(file), manifest, env_options, nullptr /* stats */,
+      immutable_db_options_.listeners));
   log::Writer log(std::move(file_writer), 0, false);
   std::string record;
   new_db.EncodeTo(&record);
@@ -1144,8 +1145,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
     {
       InstrumentedMutexLock wl(&impl->log_write_mutex_);
       impl->logfile_number_ = new_log_number;
-      unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
-          std::move(lfile), log_fname, opt_env_options));
+      const auto& listeners = impl->immutable_db_options_.listeners;
+      unique_ptr<WritableFileWriter> file_writer(
+          new WritableFileWriter(std::move(lfile), log_fname, opt_env_options,
+                                 nullptr /* stats */, listeners));
       impl->logs_.emplace_back(
           new_log_number,
           new log::Writer(
diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc
index 62fbfb320..8910ab08e 100644
--- a/db/db_impl_write.cc
+++ b/db/db_impl_write.cc
@@ -1394,8 +1394,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
       // of calling GetWalPreallocateBlockSize()
       lfile->SetPreallocationBlockSize(preallocate_block_size);
       lfile->SetWriteLifeTimeHint(write_hint);
-      unique_ptr<WritableFileWriter> file_writer(
-          new WritableFileWriter(std::move(lfile), log_fname, opt_env_opt));
+      unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+          std::move(lfile), log_fname, opt_env_opt, nullptr /* stats */,
+          immutable_db_options_.listeners));
       new_log = new log::Writer(
           std::move(file_writer), new_log_number,
           immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_);
diff --git a/db/listener_test.cc b/db/listener_test.cc
index 77afcd9ed..cbbffc8cb 100644
--- a/db/listener_test.cc
+++ b/db/listener_test.cc
@@ -891,6 +891,59 @@ TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) {
   ASSERT_LE(1, NumTableFilesAtLevel(0));
 }
 
+class TestFileOperationListener : public EventListener {
+ public:
+  TestFileOperationListener() {
+    file_reads_.store(0);
+    file_reads_success_.store(0);
+    file_writes_.store(0);
+    file_writes_success_.store(0);
+  }
+
+  void OnFileReadFinish(const FileOperationInfo& info) override {
+    ++file_reads_;
+    if (info.status.ok()) {
+      ++file_reads_success_;
+    }
+  }
+
+  void OnFileWriteFinish(const FileOperationInfo& info) override {
+    ++file_writes_;
+    if (info.status.ok()) {
+      ++file_writes_success_;
+    }
+  }
+
+  bool ShouldBeNotifiedOnFileIO() override { return true; }
+
+  std::atomic<size_t> file_reads_;
+  std::atomic<size_t> file_reads_success_;
+  std::atomic<size_t> file_writes_;
+  std::atomic<size_t> file_writes_success_;
+};
+
+TEST_F(EventListenerTest, OnFileOperationTest) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+
+  TestFileOperationListener* listener = new TestFileOperationListener();
+  options.listeners.emplace_back(listener);
+
+  DestroyAndReopen(options);
+  ASSERT_OK(Put("foo", "aaa"));
+  dbfull()->Flush(FlushOptions());
+  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_GE(listener->file_writes_.load(),
+            listener->file_writes_success_.load());
+  ASSERT_GT(listener->file_writes_.load(), 0);
+  Close();
+
+  Reopen(options);
+  ASSERT_GE(listener->file_reads_.load(), listener->file_reads_success_.load());
+  ASSERT_GT(listener->file_reads_.load(), 0);
+}
+
 }  // namespace rocksdb
 
 #endif  // ROCKSDB_LITE
diff --git a/db/table_cache.cc b/db/table_cache.cc
index e9a5ee9ab..d28d067c3 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -116,7 +116,8 @@ Status TableCache::GetTableReader(
         new RandomAccessFileReader(
             std::move(file), fname, ioptions_.env,
             record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
-            file_read_hist, ioptions_.rate_limiter, for_compaction));
+            file_read_hist, ioptions_.rate_limiter, for_compaction,
+            ioptions_.listeners));
     s = ioptions_.table_factory->NewTableReader(
         TableReaderOptions(ioptions_, prefix_extractor, env_options,
                            internal_comparator, skip_filters, immortal_tables_,
diff --git a/db/version_set.cc b/db/version_set.cc
index 0f9bf6fb3..bbcbc3af7 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -766,7 +766,11 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
   // By setting the magic number to kInvalidTableMagicNumber, we can by
   // pass the magic number check in the footer.
   std::unique_ptr<RandomAccessFileReader> file_reader(
-      new RandomAccessFileReader(std::move(file), file_name));
+      new RandomAccessFileReader(
+          std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
+          0 /* hist_type */, nullptr /* file_read_hist */,
+          nullptr /* rate_limiter */, false /* for_compaction*/,
+          ioptions->listeners));
   s = ReadTableProperties(
       file_reader.get(), file_meta->fd.GetFileSize(),
       Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
@@ -2886,7 +2890,8 @@ Status VersionSet::ProcessManifestWrites(
             db_options_->manifest_preallocation_size);
 
         unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
-            std::move(descriptor_file), descriptor_fname, opt_env_opts));
+            std::move(descriptor_file), descriptor_fname, opt_env_opts, nullptr,
+            db_options_->listeners));
         descriptor_log_.reset(
             new log::Writer(std::move(file_writer), 0, false));
         s = WriteSnapshot(descriptor_log_.get());
diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h
index b55a33982..e61410c85 100644
--- a/include/rocksdb/listener.h
+++ b/include/rocksdb/listener.h
@@ -143,6 +143,15 @@ struct TableFileDeletionInfo {
   Status status;
 };
 
+struct FileOperationInfo {
+  std::string path;
+  uint64_t offset;
+  size_t length;
+  time_t start_timestamp;
+  time_t finish_timestamp;
+  Status status;
+};
+
 struct FlushJobInfo {
   // the name of the column family
   std::string cf_name;
@@ -407,6 +416,18 @@ class EventListener {
   // returns. Otherwise, RocksDB may be blocked.
   virtual void OnStallConditionsChanged(const WriteStallInfo& /*info*/) {}
 
+  // A callback function for RocksDB which will be called whenever a file read
+  // operation finishes.
+  virtual void OnFileReadFinish(const FileOperationInfo& /* info */) {}
+
+  // A callback function for RocksDB which will be called whenever a file write
+  // operation finishes.
+  virtual void OnFileWriteFinish(const FileOperationInfo& /* info */) {}
+
+  // If true, the OnFileReadFinish and OnFileWriteFinish callbacks will be
+  // called. If false, then they won't be called.
+  virtual bool ShouldBeNotifiedOnFileIO() { return false; }
+
   // A callback function for RocksDB which will be called just before
   // starting the automatic recovery process for recoverable background
   // errors, such as NoSpace().  The callback can suppress the automatic
diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc
index e0c4c3189..a752504c8 100644
--- a/table/sst_file_writer.cc
+++ b/table/sst_file_writer.cc
@@ -238,7 +238,8 @@ Status SstFileWriter::Open(const std::string& file_path) {
       nullptr /* compression_dict */, r->skip_filters, r->column_family_name,
       unknown_level);
   r->file_writer.reset(
-      new WritableFileWriter(std::move(sst_file), file_path, r->env_options));
+      new WritableFileWriter(std::move(sst_file), file_path, r->env_options,
+                             nullptr /* stats */, r->ioptions.listeners));
 
   // TODO(tec) : If table_factory is using compressed block cache, we will
   // be adding the external sst file blocks into it, which is wasteful.
diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc
index cd09f7122..4810a318d 100644
--- a/util/file_reader_writer.cc
+++ b/util/file_reader_writer.cc
@@ -98,8 +98,20 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
         allowed = read_size;
       }
       Slice tmp;
+
+      time_t start_ts = 0;
+      uint64_t orig_offset = 0;
+      if (ShouldNotifyListeners()) {
+        start_ts = std::chrono::system_clock::to_time_t(
+            std::chrono::system_clock::now());
+        orig_offset = aligned_offset + buf.CurrentSize();
+      }
       s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp,
                       buf.Destination());
+      if (ShouldNotifyListeners()) {
+        NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, s);
+      }
+
       buf.Size(buf.CurrentSize() + tmp.size());
       if (!s.ok() || tmp.size() < allowed) {
         break;
@@ -131,7 +143,21 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
         allowed = n;
       }
       Slice tmp_result;
+
+#ifndef ROCKSDB_LITE
+      time_t start_ts = 0;
+      if (ShouldNotifyListeners()) {
+        start_ts = std::chrono::system_clock::to_time_t(
+            std::chrono::system_clock::now());
+      }
+#endif
       s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos);
+#ifndef ROCKSDB_LITE
+      if (ShouldNotifyListeners()) {
+        NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts, s);
+      }
+#endif
+
       if (res_scratch == nullptr) {
         // we can't simply use `scratch` because reads of mmap'd files return
         // data in a different buffer.
@@ -414,7 +440,22 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
     {
       IOSTATS_TIMER_GUARD(write_nanos);
       TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
+
+#ifndef ROCKSDB_LITE
+      time_t start_ts = 0;
+      uint64_t old_size = writable_file_->GetFileSize();
+      if (ShouldNotifyListeners()) {
+        start_ts = std::chrono::system_clock::to_time_t(
+            std::chrono::system_clock::now());
+        old_size = next_write_offset_;
+      }
+#endif
       s = writable_file_->Append(Slice(src, allowed));
+#ifndef ROCKSDB_LITE
+      if (ShouldNotifyListeners()) {
+        NotifyOnFileWriteFinish(old_size, allowed, start_ts, s);
+      }
+#endif
       if (!s.ok()) {
         return s;
       }
@@ -477,8 +518,16 @@ Status WritableFileWriter::WriteDirect() {
     {
       IOSTATS_TIMER_GUARD(write_nanos);
       TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
+      time_t start_ts(0);
+      if (ShouldNotifyListeners()) {
+        start_ts = std::chrono::system_clock::to_time_t(
+            std::chrono::system_clock::now());
+      }
       // direct writes must be positional
       s = writable_file_->PositionedAppend(Slice(src, size), write_offset);
+      if (ShouldNotifyListeners()) {
+        NotifyOnFileWriteFinish(write_offset, size, start_ts, s);
+      }
       if (!s.ok()) {
         buf_.Size(file_advance + leftover_tail);
         return s;
diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h
index a2c90f2b3..f73383080 100644
--- a/util/file_reader_writer.h
+++ b/util/file_reader_writer.h
@@ -12,6 +12,7 @@
 #include <string>
 #include "port/port.h"
 #include "rocksdb/env.h"
+#include "rocksdb/listener.h"
 #include "rocksdb/rate_limiter.h"
 #include "util/aligned_buffer.h"
 #include "util/sync_point.h"
@@ -62,6 +63,27 @@ class SequentialFileReader {
 
 class RandomAccessFileReader {
  private:
+#ifndef ROCKSDB_LITE
+  void NotifyOnFileReadFinish(uint64_t offset, size_t length, time_t start_ts,
+                              const Status& status) const {
+    FileOperationInfo info;
+    info.path = file_name_;
+    info.offset = offset;
+    info.length = length;
+    info.start_timestamp = start_ts;
+    time_t finish_ts =
+        std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
+    info.finish_timestamp = finish_ts;
+    info.status = status;
+
+    for (auto& listener : listeners_) {
+      listener->OnFileReadFinish(info);
+    }
+  }
+#endif  // ROCKSDB_LITE
+
+  bool ShouldNotifyListeners() const { return !listeners_.empty(); }
+
   std::unique_ptr<RandomAccessFile> file_;
   std::string file_name_;
   Env* env_;
@@ -70,16 +92,15 @@ class RandomAccessFileReader {
   HistogramImpl* file_read_hist_;
   RateLimiter* rate_limiter_;
   bool for_compaction_;
+  std::vector<std::shared_ptr<EventListener>> listeners_;
 
  public:
-  explicit RandomAccessFileReader(std::unique_ptr<RandomAccessFile>&& raf,
-                                  std::string _file_name,
-                                  Env* env = nullptr,
-                                  Statistics* stats = nullptr,
-                                  uint32_t hist_type = 0,
-                                  HistogramImpl* file_read_hist = nullptr,
-                                  RateLimiter* rate_limiter = nullptr,
-                                  bool for_compaction = false)
+  explicit RandomAccessFileReader(
+      std::unique_ptr<RandomAccessFile>&& raf, std::string _file_name,
+      Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
+      HistogramImpl* file_read_hist = nullptr,
+      RateLimiter* rate_limiter = nullptr, bool for_compaction = false,
+      const std::vector<std::shared_ptr<EventListener>>& listeners = {})
       : file_(std::move(raf)),
         file_name_(std::move(_file_name)),
         env_(env),
@@ -87,7 +108,19 @@ class RandomAccessFileReader {
         hist_type_(hist_type),
         file_read_hist_(file_read_hist),
         rate_limiter_(rate_limiter),
-        for_compaction_(for_compaction) {}
+        for_compaction_(for_compaction),
+        listeners_() {
+#ifndef ROCKSDB_LITE
+    std::for_each(listeners.begin(), listeners.end(),
+                  [this](const std::shared_ptr<EventListener>& e) {
+                    if (e->ShouldBeNotifiedOnFileIO()) {
+                      listeners_.emplace_back(e);
+                    }
+                  });
+#else  // !ROCKSDB_LITE
+    (void)listeners;
+#endif
+  }
 
   RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
     *this = std::move(o);
@@ -124,6 +157,27 @@ class RandomAccessFileReader {
 // Use posix write to write data to a file.
 class WritableFileWriter {
  private:
+#ifndef ROCKSDB_LITE
+  void NotifyOnFileWriteFinish(uint64_t offset, size_t length, time_t start_ts,
+                               const Status& status) {
+    FileOperationInfo info;
+    info.path = file_name_;
+    info.offset = offset;
+    info.length = length;
+    info.start_timestamp = start_ts;
+    time_t finish_ts =
+        std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
+    info.finish_timestamp = finish_ts;
+    info.status = status;
+
+    for (auto& listener : listeners_) {
+      listener->OnFileWriteFinish(info);
+    }
+  }
+#endif  // ROCKSDB_LITE
+
+  bool ShouldNotifyListeners() const { return !listeners_.empty(); }
+
   std::unique_ptr<WritableFile> writable_file_;
   std::string file_name_;
   AlignedBuffer buf_;
@@ -142,11 +196,13 @@ class WritableFileWriter {
   uint64_t bytes_per_sync_;
   RateLimiter* rate_limiter_;
   Statistics* stats_;
+  std::vector<std::shared_ptr<EventListener>> listeners_;
 
  public:
-  WritableFileWriter(std::unique_ptr<WritableFile>&& file,
-                     const std::string& _file_name, const EnvOptions& options,
-                     Statistics* stats = nullptr)
+  WritableFileWriter(
+      std::unique_ptr<WritableFile>&& file, const std::string& _file_name,
+      const EnvOptions& options, Statistics* stats = nullptr,
+      const std::vector<std::shared_ptr<EventListener>>& listeners = {})
       : writable_file_(std::move(file)),
         file_name_(_file_name),
         buf_(),
@@ -159,11 +215,22 @@ class WritableFileWriter {
         last_sync_size_(0),
         bytes_per_sync_(options.bytes_per_sync),
         rate_limiter_(options.rate_limiter),
-        stats_(stats) {
+        stats_(stats),
+        listeners_() {
     TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
                              reinterpret_cast<void*>(max_buffer_size_));
     buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
     buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
+#ifndef ROCKSDB_LITE
+    std::for_each(listeners.begin(), listeners.end(),
+                  [this](const std::shared_ptr<EventListener>& e) {
+                    if (e->ShouldBeNotifiedOnFileIO()) {
+                      listeners_.emplace_back(e);
+                    }
+                  });
+#else  // !ROCKSDB_LITE
+    (void)listeners;
+#endif
   }
 
   WritableFileWriter(const WritableFileWriter&) = delete;
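
Usage sketch (editor's addition, not part of the patch): the hunks above introduce FileOperationInfo plus the OnFileReadFinish(), OnFileWriteFinish(), and ShouldBeNotifiedOnFileIO() hooks on EventListener, and thread DBOptions::listeners into WritableFileWriter and RandomAccessFileReader. A minimal way an application could consume the new callbacks is shown below, assuming a non-LITE build (the notifications are compiled out under ROCKSDB_LITE per the #ifndef guards above). The class name, counters, and the "/tmp/file_io_listener_example" path are illustrative only; because the callbacks fire from whichever thread performs the I/O (foreground writes, background flushes and compactions), the example keeps its counters atomic.

// file_io_listener_example.cc -- illustrative sketch only, not part of the diff.
#include <atomic>
#include <iostream>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"

// Counts finished file reads/writes via the callbacks added above.
class FileIoCounter : public rocksdb::EventListener {
 public:
  void OnFileReadFinish(const rocksdb::FileOperationInfo& info) override {
    reads_.fetch_add(1, std::memory_order_relaxed);
    if (!info.status.ok()) {
      failed_reads_.fetch_add(1, std::memory_order_relaxed);
    }
  }

  void OnFileWriteFinish(const rocksdb::FileOperationInfo& info) override {
    writes_.fetch_add(1, std::memory_order_relaxed);
    if (!info.status.ok()) {
      failed_writes_.fetch_add(1, std::memory_order_relaxed);
    }
  }

  // Opt in: listeners returning false are filtered out by the
  // WritableFileWriter/RandomAccessFileReader constructors and never invoked.
  bool ShouldBeNotifiedOnFileIO() override { return true; }

  std::atomic<uint64_t> reads_{0};
  std::atomic<uint64_t> failed_reads_{0};
  std::atomic<uint64_t> writes_{0};
  std::atomic<uint64_t> failed_writes_{0};
};

int main() {
  auto counter = std::make_shared<FileIoCounter>();

  rocksdb::Options options;
  options.create_if_missing = true;
  // Propagated by the patch to the SST, WAL and MANIFEST writers/readers.
  options.listeners.push_back(counter);

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/file_io_listener_example", &db);
  if (!s.ok()) {
    std::cerr << "Open failed: " << s.ToString() << std::endl;
    return 1;
  }

  db->Put(rocksdb::WriteOptions(), "key", "value");
  db->Flush(rocksdb::FlushOptions());  // force SST writes through WritableFileWriter

  std::cout << "writes=" << counter->writes_.load()
            << " failed_writes=" << counter->failed_writes_.load()
            << " reads=" << counter->reads_.load()
            << " failed_reads=" << counter->failed_reads_.load() << std::endl;

  delete db;
  return 0;
}

Note that a listener is registered for file I/O notifications only if ShouldBeNotifiedOnFileIO() returns true; the reader/writer constructors filter out everything else up front, so with no opted-in listeners each I/O pays only an empty-vector check in ShouldNotifyListeners().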