extend listener callback functions to more file I/O operations (#7055)

Summary:
Currently, `EventListener` in listner.h only have callback functions for file read and write. One may favor extended callback functions for more file I/O operations like flush, sync and close. This PR tries to add those interface and have them called when appropriate throughout the code base.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7055

Test Plan:
Write an experimental listener with those new callback functions with log output in them; run experiments and check logs to see those functions are actually called.
Default test suits `make check` should also be included.

Reviewed By: riversand963

Differential Revision: D22380624

Pulled By: roghnin

fbshipit-source-id: 4121491d45c2c2aae8c255e7998090559a241c6a
main
wenh 4 years ago committed by Facebook GitHub Bot
parent dd29ad4223
commit 226d1f9c73
  1. 1
      HISTORY.md
  2. 68
      db/listener_test.cc
  3. 4
      file/random_access_file_reader.h
  4. 94
      file/writable_file_writer.cc
  5. 63
      file/writable_file_writer.h
  6. 45
      include/rocksdb/listener.h

@ -2,6 +2,7 @@
## Unreleased
### Public API Change
* Encryption file classes now exposed for inheritance in env_encryption.h
* File I/O listener is extended to cover more I/O operations. Now class `EventListener` in listener.h contains new callback functions: `OnFileFlushFinish()`, `OnFileSyncFinish()`, `OnFileRangeSyncFinish()`, `OnFileTruncateFinish()`, and ``OnFileCloseFinish()``.
### Behavior Changes
* Best-efforts recovery ignores CURRENT file completely. If CURRENT file is missing during recovery, best-efforts recovery still proceeds with MANIFEST file(s).

@ -977,6 +977,14 @@ class TestFileOperationListener : public EventListener {
file_reads_success_.store(0);
file_writes_.store(0);
file_writes_success_.store(0);
file_flushes_.store(0);
file_flushes_success_.store(0);
file_closes_.store(0);
file_closes_success_.store(0);
file_syncs_.store(0);
file_syncs_success_.store(0);
file_truncates_.store(0);
file_truncates_success_.store(0);
}
void OnFileReadFinish(const FileOperationInfo& info) override {
@ -995,12 +1003,52 @@ class TestFileOperationListener : public EventListener {
ReportDuration(info);
}
void OnFileFlushFinish(const FileOperationInfo& info) override {
++file_flushes_;
if (info.status.ok()) {
++file_flushes_success_;
}
ReportDuration(info);
}
void OnFileCloseFinish(const FileOperationInfo& info) override {
++file_closes_;
if (info.status.ok()) {
++file_closes_success_;
}
ReportDuration(info);
}
void OnFileSyncFinish(const FileOperationInfo& info) override {
++file_syncs_;
if (info.status.ok()) {
++file_syncs_success_;
}
ReportDuration(info);
}
void OnFileTruncateFinish(const FileOperationInfo& info) override {
++file_truncates_;
if (info.status.ok()) {
++file_truncates_success_;
}
ReportDuration(info);
}
bool ShouldBeNotifiedOnFileIO() override { return true; }
std::atomic<size_t> file_reads_;
std::atomic<size_t> file_reads_success_;
std::atomic<size_t> file_writes_;
std::atomic<size_t> file_writes_success_;
std::atomic<size_t> file_flushes_;
std::atomic<size_t> file_flushes_success_;
std::atomic<size_t> file_closes_;
std::atomic<size_t> file_closes_success_;
std::atomic<size_t> file_syncs_;
std::atomic<size_t> file_syncs_success_;
std::atomic<size_t> file_truncates_;
std::atomic<size_t> file_truncates_success_;
private:
void ReportDuration(const FileOperationInfo& info) const {
@ -1018,6 +1066,13 @@ TEST_F(EventListenerTest, OnFileOperationTest) {
TestFileOperationListener* listener = new TestFileOperationListener();
options.listeners.emplace_back(listener);
options.use_direct_io_for_flush_and_compaction = true;
Status s = TryReopen(options);
if (s.IsInvalidArgument()) {
options.use_direct_io_for_flush_and_compaction = false;
} else {
ASSERT_OK(s);
}
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "aaa"));
dbfull()->Flush(FlushOptions());
@ -1025,11 +1080,24 @@ TEST_F(EventListenerTest, OnFileOperationTest) {
ASSERT_GE(listener->file_writes_.load(),
listener->file_writes_success_.load());
ASSERT_GT(listener->file_writes_.load(), 0);
ASSERT_GE(listener->file_flushes_.load(),
listener->file_flushes_success_.load());
ASSERT_GT(listener->file_flushes_.load(), 0);
Close();
Reopen(options);
ASSERT_GE(listener->file_reads_.load(), listener->file_reads_success_.load());
ASSERT_GT(listener->file_reads_.load(), 0);
ASSERT_GE(listener->file_closes_.load(),
listener->file_closes_success_.load());
ASSERT_GT(listener->file_closes_.load(), 0);
ASSERT_GE(listener->file_syncs_.load(), listener->file_syncs_success_.load());
ASSERT_GT(listener->file_syncs_.load(), 0);
if (true == options.use_direct_io_for_flush_and_compaction) {
ASSERT_GE(listener->file_truncates_.load(),
listener->file_truncates_success_.load());
ASSERT_GT(listener->file_truncates_.load(), 0);
}
}
} // namespace ROCKSDB_NAMESPACE

@ -39,10 +39,10 @@ class RandomAccessFileReader {
const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const Status& status) const {
FileOperationInfo info(file_name_, start_ts, finish_ts);
FileOperationInfo info(FileOperationType::kRead, file_name_, start_ts,
finish_ts, status);
info.offset = offset;
info.length = length;
info.status = status;
for (auto& listener : listeners_) {
listener->OnFileReadFinish(info);

@ -139,9 +139,38 @@ IOStatus WritableFileWriter::Close() {
// In direct I/O mode we write whole pages so
// we need to let the file know where data ends.
if (use_direct_io()) {
interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
{
#ifndef ROCKSDB_LITE
FileOperationInfo::TimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
}
#endif
interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileTruncateFinish(start_ts, finish_ts, s);
}
#endif
}
if (interim.ok()) {
interim = writable_file_->Fsync(IOOptions(), nullptr);
{
#ifndef ROCKSDB_LITE
FileOperationInfo::TimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
}
#endif
interim = writable_file_->Fsync(IOOptions(), nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileSyncFinish(start_ts, finish_ts, s,
FileOperationType::kFsync);
}
#endif
}
}
if (!interim.ok() && s.ok()) {
s = interim;
@ -149,7 +178,21 @@ IOStatus WritableFileWriter::Close() {
}
TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
interim = writable_file_->Close(IOOptions(), nullptr);
{
#ifndef ROCKSDB_LITE
FileOperationInfo::TimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
}
#endif
interim = writable_file_->Close(IOOptions(), nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileCloseFinish(start_ts, finish_ts, s);
}
#endif
}
if (!interim.ok() && s.ok()) {
s = interim;
}
@ -187,7 +230,21 @@ IOStatus WritableFileWriter::Flush() {
}
}
s = writable_file_->Flush(IOOptions(), nullptr);
{
#ifndef ROCKSDB_LITE
FileOperationInfo::TimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
}
#endif
s = writable_file_->Flush(IOOptions(), nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileFlushFinish(start_ts, finish_ts, s);
}
#endif
}
if (!s.ok()) {
return s;
@ -275,11 +332,25 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
auto prev_perf_level = GetPerfLevel();
IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
#ifndef ROCKSDB_LITE
FileOperationInfo::TimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
}
#endif
if (use_fsync) {
s = writable_file_->Fsync(IOOptions(), nullptr);
} else {
s = writable_file_->Sync(IOOptions(), nullptr);
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileSyncFinish(
start_ts, finish_ts, s,
use_fsync ? FileOperationType::kFsync : FileOperationType::kSync);
}
#endif
SetPerfLevel(prev_perf_level);
return s;
}
@ -287,7 +358,20 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
IOSTATS_TIMER_GUARD(range_sync_nanos);
TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
#ifndef ROCKSDB_LITE
FileOperationInfo::TimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
}
#endif
IOStatus s = writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileRangeSyncFinish(offset, nbytes, start_ts, finish_ts, s);
}
#endif
return s;
}
// This method writes to disk the specified data and makes use of the rate

@ -38,16 +38,75 @@ class WritableFileWriter {
const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const IOStatus& io_status) {
FileOperationInfo info(file_name_, start_ts, finish_ts);
FileOperationInfo info(FileOperationType::kWrite, file_name_, start_ts,
finish_ts, io_status);
info.offset = offset;
info.length = length;
info.status = io_status;
for (auto& listener : listeners_) {
listener->OnFileWriteFinish(info);
}
info.status.PermitUncheckedError();
}
void NotifyOnFileFlushFinish(const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const IOStatus& io_status) {
FileOperationInfo info(FileOperationType::kFlush, file_name_, start_ts,
finish_ts, io_status);
for (auto& listener : listeners_) {
listener->OnFileFlushFinish(info);
}
info.status.PermitUncheckedError();
}
void NotifyOnFileSyncFinish(
const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts, const IOStatus& io_status,
FileOperationType type = FileOperationType::kSync) {
FileOperationInfo info(type, file_name_, start_ts, finish_ts, io_status);
for (auto& listener : listeners_) {
listener->OnFileSyncFinish(info);
}
info.status.PermitUncheckedError();
}
void NotifyOnFileRangeSyncFinish(
uint64_t offset, size_t length,
const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const IOStatus& io_status) {
FileOperationInfo info(FileOperationType::kRangeSync, file_name_, start_ts,
finish_ts, io_status);
info.offset = offset;
info.length = length;
for (auto& listener : listeners_) {
listener->OnFileRangeSyncFinish(info);
}
info.status.PermitUncheckedError();
}
void NotifyOnFileTruncateFinish(const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const IOStatus& io_status) {
FileOperationInfo info(FileOperationType::kTruncate, file_name_, start_ts,
finish_ts, io_status);
for (auto& listener : listeners_) {
listener->OnFileTruncateFinish(info);
}
info.status.PermitUncheckedError();
}
void NotifyOnFileCloseFinish(const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const IOStatus& io_status) {
FileOperationInfo info(FileOperationType::kClose, file_name_, start_ts,
finish_ts, io_status);
for (auto& listener : listeners_) {
listener->OnFileCloseFinish(info);
}
info.status.PermitUncheckedError();
}
#endif // ROCKSDB_LITE
bool ShouldNotifyListeners() const { return !listeners_.empty(); }

@ -149,19 +149,36 @@ struct TableFileDeletionInfo {
Status status;
};
enum class FileOperationType {
kRead,
kWrite,
kTruncate,
kClose,
kFlush,
kSync,
kFsync,
kRangeSync
};
struct FileOperationInfo {
using TimePoint = std::chrono::time_point<std::chrono::system_clock,
std::chrono::nanoseconds>;
FileOperationType type;
const std::string& path;
uint64_t offset;
size_t length;
const TimePoint& start_timestamp;
const TimePoint& finish_timestamp;
Status status;
FileOperationInfo(const std::string& _path, const TimePoint& start,
const TimePoint& finish)
: path(_path), start_timestamp(start), finish_timestamp(finish) {}
FileOperationInfo(const FileOperationType _type, const std::string& _path,
const TimePoint& start, const TimePoint& finish,
const Status& _status)
: type(_type),
path(_path),
start_timestamp(start),
finish_timestamp(finish),
status(_status) {}
};
struct FlushJobInfo {
@ -460,7 +477,27 @@ class EventListener {
// operation finishes.
virtual void OnFileWriteFinish(const FileOperationInfo& /* info */) {}
// If true, the OnFileReadFinish and OnFileWriteFinish will be called. If
// A callback function for RocksDB which will be called whenever a file flush
// operation finishes.
virtual void OnFileFlushFinish(const FileOperationInfo& /* info */) {}
// A callback function for RocksDB which will be called whenever a file sync
// operation finishes.
virtual void OnFileSyncFinish(const FileOperationInfo& /* info */) {}
// A callback function for RocksDB which will be called whenever a file
// rangeSync operation finishes.
virtual void OnFileRangeSyncFinish(const FileOperationInfo& /* info */) {}
// A callback function for RocksDB which will be called whenever a file
// truncate operation finishes.
virtual void OnFileTruncateFinish(const FileOperationInfo& /* info */) {}
// A callback function for RocksDB which will be called whenever a file close
// operation finishes.
virtual void OnFileCloseFinish(const FileOperationInfo& /* info */) {}
// If true, the OnFile*Finish functions will be called. If
// false, then they won't be called.
virtual bool ShouldBeNotifiedOnFileIO() { return false; }

Loading…
Cancel
Save