Add listener to sample file io (#3933)

Summary:
We would like to collect file-system-level statistics, including file name, offset, length, return code, latency, etc. This requires adding callbacks that intercept file IO function calls while RocksDB is running.
To collect file-system-level statistics, users can inherit from the class `EventListener`, as `TestFileOperationListener` does. Note that `TestFileOperationListener::ShouldBeNotifiedOnFileIO()` returns true.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/3933

Differential Revision: D10219571

Pulled By: riversand963

fbshipit-source-id: 7acc577a2d31097766a27adb6f78eaf8b1e8ff15
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent 9c20797136
commit 729a617b5b
  1. 5
      db/builder.cc
  2. 4
      db/compaction_job.cc
  3. 11
      db/db_impl_open.cc
  4. 5
      db/db_impl_write.cc
  5. 53
      db/listener_test.cc
  6. 3
      db/table_cache.cc
  7. 9
      db/version_set.cc
  8. 21
      include/rocksdb/listener.h
  9. 3
      table/sst_file_writer.cc
  10. 49
      util/file_reader_writer.cc
  11. 93
      util/file_reader_writer.h

@ -121,8 +121,9 @@ Status BuildTable(
file->SetIOPriority(io_priority);
file->SetWriteLifeTimeHint(write_hint);
file_writer.reset(new WritableFileWriter(
std::move(file), fname, env_options, ioptions.statistics));
file_writer.reset(new WritableFileWriter(std::move(file), fname,
env_options, ioptions.statistics,
ioptions.listeners));
builder = NewTableBuilder(
ioptions, mutable_cf_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id,

@ -1464,9 +1464,11 @@ Status CompactionJob::OpenCompactionOutputFile(
writable_file->SetWriteLifeTimeHint(write_hint_);
writable_file->SetPreallocationBlockSize(static_cast<size_t>(
sub_compact->compaction->OutputFilePreallocationSize()));
const auto& listeners =
sub_compact->compaction->immutable_cf_options()->listeners;
sub_compact->outfile.reset(
new WritableFileWriter(std::move(writable_file), fname, env_options_,
db_options_.statistics.get()));
db_options_.statistics.get(), listeners));
// If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where

@ -238,8 +238,9 @@ Status DBImpl::NewDB() {
}
file->SetPreallocationBlockSize(
immutable_db_options_.manifest_preallocation_size);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), manifest, env_options));
unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(file), manifest, env_options, nullptr /* stats */,
immutable_db_options_.listeners));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
new_db.EncodeTo(&record);
@ -1144,8 +1145,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
{
InstrumentedMutexLock wl(&impl->log_write_mutex_);
impl->logfile_number_ = new_log_number;
unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(lfile), log_fname, opt_env_options));
const auto& listeners = impl->immutable_db_options_.listeners;
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), log_fname, opt_env_options,
nullptr /* stats */, listeners));
impl->logs_.emplace_back(
new_log_number,
new log::Writer(

@ -1394,8 +1394,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
// of calling GetWalPreallocateBlockSize()
lfile->SetPreallocationBlockSize(preallocate_block_size);
lfile->SetWriteLifeTimeHint(write_hint);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), log_fname, opt_env_opt));
unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(lfile), log_fname, opt_env_opt, nullptr /* stats */,
immutable_db_options_.listeners));
new_log = new log::Writer(
std::move(file_writer), new_log_number,
immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_);

@ -891,6 +891,59 @@ TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) {
ASSERT_LE(1, NumTableFilesAtLevel(0));
}
// Event listener that opts in to file IO notifications and keeps running
// totals of the read/write callbacks it receives, along with how many of those
// reported an OK status. The counters are atomics and are public so the test
// body can read them directly.
class TestFileOperationListener : public EventListener {
 public:
  TestFileOperationListener()
      : file_reads_(0),
        file_reads_success_(0),
        file_writes_(0),
        file_writes_success_(0) {}

  // Counts every finished read; additionally counts it as a success when the
  // reported status is OK.
  void OnFileReadFinish(const FileOperationInfo& info) override {
    file_reads_.fetch_add(1);
    if (!info.status.ok()) {
      return;
    }
    file_reads_success_.fetch_add(1);
  }

  // Counts every finished write; additionally counts it as a success when the
  // reported status is OK.
  void OnFileWriteFinish(const FileOperationInfo& info) override {
    file_writes_.fetch_add(1);
    if (!info.status.ok()) {
      return;
    }
    file_writes_success_.fetch_add(1);
  }

  // Opt in: without this the file reader/writer wrappers drop the listener.
  bool ShouldBeNotifiedOnFileIO() override { return true; }

  std::atomic<size_t> file_reads_;
  std::atomic<size_t> file_reads_success_;
  std::atomic<size_t> file_writes_;
  std::atomic<size_t> file_writes_success_;
};
// Checks that a listener which opts in to file IO notifications actually
// receives them: a Put followed by a flush must produce at least one write
// callback, and reopening the DB must produce at least one read callback.
TEST_F(EventListenerTest, OnFileOperationTest) {
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  // Ownership passes to the shared_ptr created inside options.listeners; the
  // raw pointer is kept only to inspect the counters below.
  TestFileOperationListener* listener = new TestFileOperationListener();
  options.listeners.emplace_back(listener);
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "aaa"));
  // Check the flush statuses so that a failed flush is reported as such
  // instead of showing up later as a misleading counter mismatch.
  ASSERT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_GE(listener->file_writes_.load(),
            listener->file_writes_success_.load());
  ASSERT_GT(listener->file_writes_.load(), 0);
  Close();

  // Reopening forces files to be read back, which should trigger read
  // callbacks.
  Reopen(options);
  ASSERT_GE(listener->file_reads_.load(), listener->file_reads_success_.load());
  ASSERT_GT(listener->file_reads_.load(), 0);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -116,7 +116,8 @@ Status TableCache::GetTableReader(
new RandomAccessFileReader(
std::move(file), fname, ioptions_.env,
record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
file_read_hist, ioptions_.rate_limiter, for_compaction));
file_read_hist, ioptions_.rate_limiter, for_compaction,
ioptions_.listeners));
s = ioptions_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, prefix_extractor, env_options,
internal_comparator, skip_filters, immortal_tables_,

@ -766,7 +766,11 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
// By setting the magic number to kInvalidTableMagicNumber, we can by
// pass the magic number check in the footer.
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(std::move(file), file_name));
new RandomAccessFileReader(
std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
0 /* hist_type */, nullptr /* file_read_hist */,
nullptr /* rate_limiter */, false /* for_compaction*/,
ioptions->listeners));
s = ReadTableProperties(
file_reader.get(), file_meta->fd.GetFileSize(),
Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
@ -2886,7 +2890,8 @@ Status VersionSet::ProcessManifestWrites(
db_options_->manifest_preallocation_size);
unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(descriptor_file), descriptor_fname, opt_env_opts));
std::move(descriptor_file), descriptor_fname, opt_env_opts, nullptr,
db_options_->listeners));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
s = WriteSnapshot(descriptor_log_.get());

@ -143,6 +143,15 @@ struct TableFileDeletionInfo {
Status status;
};
// Describes a single finished file read or write. An instance is handed to
// EventListener::OnFileReadFinish() / EventListener::OnFileWriteFinish().
struct FileOperationInfo {
// Name of the file the operation was issued against (the file name held by
// the reader/writer wrapper that performed it).
std::string path;
// Offset within the file at which the operation started.
uint64_t offset;
// Number of bytes involved. The read paths shown in this change report the
// size of the slice returned by the read; the write paths report the number
// of bytes passed to the append call.
size_t length;
// Wall-clock time taken from std::chrono::system_clock just before the
// operation was issued.
// NOTE(review): time_t is commonly whole seconds - confirm that this
// resolution is adequate for latency measurements.
time_t start_timestamp;
// Wall-clock time taken from std::chrono::system_clock after the operation
// returned, when the notification is assembled.
time_t finish_timestamp;
// Status returned by the underlying read or write call.
Status status;
};
struct FlushJobInfo {
// the name of the column family
std::string cf_name;
@ -407,6 +416,18 @@ class EventListener {
// returns. Otherwise, RocksDB may be blocked.
virtual void OnStallConditionsChanged(const WriteStallInfo& /*info*/) {}
// A callback function for RocksDB which will be called whenever a file read
// operation finishes. `info` carries the file path, offset, length, start and
// finish timestamps, and the status of the read. Only invoked for listeners
// whose ShouldBeNotifiedOnFileIO() returns true. The default implementation
// does nothing.
virtual void OnFileReadFinish(const FileOperationInfo& /* info */) {}
// A callback function for RocksDB which will be called whenever a file write
// operation finishes. `info` carries the file path, offset, length, start and
// finish timestamps, and the status of the write. Only invoked for listeners
// whose ShouldBeNotifiedOnFileIO() returns true. The default implementation
// does nothing.
virtual void OnFileWriteFinish(const FileOperationInfo& /* info */) {}
// If true, the OnFileReadFinish and OnFileWriteFinish will be called. If
// false, then they won't be called. Defaults to false, so listeners must
// override this to opt in.
// NOTE(review): the file reader/writer wrappers in this change query this
// once, in their constructors, and keep only the listeners that returned
// true - so changing the answer later presumably has no effect on files that
// are already open; confirm before relying on dynamic toggling.
virtual bool ShouldBeNotifiedOnFileIO() { return false; }
// A callback function for RocksDB which will be called just before
// starting the automatic recovery process for recoverable background
// errors, such as NoSpace(). The callback can suppress the automatic

@ -238,7 +238,8 @@ Status SstFileWriter::Open(const std::string& file_path) {
nullptr /* compression_dict */, r->skip_filters, r->column_family_name,
unknown_level);
r->file_writer.reset(
new WritableFileWriter(std::move(sst_file), file_path, r->env_options));
new WritableFileWriter(std::move(sst_file), file_path, r->env_options,
nullptr /* stats */, r->ioptions.listeners));
// TODO(tec) : If table_factory is using compressed block cache, we will
// be adding the external sst file blocks into it, which is wasteful.

@ -98,8 +98,20 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
allowed = read_size;
}
Slice tmp;
time_t start_ts = 0;
uint64_t orig_offset = 0;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::to_time_t(
std::chrono::system_clock::now());
orig_offset = aligned_offset + buf.CurrentSize();
}
s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp,
buf.Destination());
if (ShouldNotifyListeners()) {
NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, s);
}
buf.Size(buf.CurrentSize() + tmp.size());
if (!s.ok() || tmp.size() < allowed) {
break;
@ -131,7 +143,21 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
allowed = n;
}
Slice tmp_result;
#ifndef ROCKSDB_LITE
time_t start_ts = 0;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::to_time_t(
std::chrono::system_clock::now());
}
#endif
s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts, s);
}
#endif
if (res_scratch == nullptr) {
// we can't simply use `scratch` because reads of mmap'd files return
// data in a different buffer.
@ -414,7 +440,22 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
#ifndef ROCKSDB_LITE
time_t start_ts = 0;
uint64_t old_size = writable_file_->GetFileSize();
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::to_time_t(
std::chrono::system_clock::now());
old_size = next_write_offset_;
}
#endif
s = writable_file_->Append(Slice(src, allowed));
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
NotifyOnFileWriteFinish(old_size, allowed, start_ts, s);
}
#endif
if (!s.ok()) {
return s;
}
@ -477,8 +518,16 @@ Status WritableFileWriter::WriteDirect() {
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
time_t start_ts(0);
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::to_time_t(
std::chrono::system_clock::now());
}
// direct writes must be positional
s = writable_file_->PositionedAppend(Slice(src, size), write_offset);
if (ShouldNotifyListeners()) {
NotifyOnFileWriteFinish(write_offset, size, start_ts, s);
}
if (!s.ok()) {
buf_.Size(file_advance + leftover_tail);
return s;

@ -12,6 +12,7 @@
#include <string>
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/listener.h"
#include "rocksdb/rate_limiter.h"
#include "util/aligned_buffer.h"
#include "util/sync_point.h"
@ -62,6 +63,27 @@ class SequentialFileReader {
class RandomAccessFileReader {
private:
#ifndef ROCKSDB_LITE
void NotifyOnFileReadFinish(uint64_t offset, size_t length, time_t start_ts,
const Status& status) const {
FileOperationInfo info;
info.path = file_name_;
info.offset = offset;
info.length = length;
info.start_timestamp = start_ts;
time_t finish_ts =
std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
info.finish_timestamp = finish_ts;
info.status = status;
for (auto& listener : listeners_) {
listener->OnFileReadFinish(info);
}
}
#endif // ROCKSDB_LITE
// True when this reader holds at least one listener. listeners_ is filled in
// the constructor with only those listeners whose ShouldBeNotifiedOnFileIO()
// returned true, so callers use this to skip timestamp capture and
// notification when nobody is interested.
bool ShouldNotifyListeners() const { return !listeners_.empty(); }
std::unique_ptr<RandomAccessFile> file_;
std::string file_name_;
Env* env_;
@ -70,16 +92,15 @@ class RandomAccessFileReader {
HistogramImpl* file_read_hist_;
RateLimiter* rate_limiter_;
bool for_compaction_;
std::vector<std::shared_ptr<EventListener>> listeners_;
public:
explicit RandomAccessFileReader(std::unique_ptr<RandomAccessFile>&& raf,
std::string _file_name,
Env* env = nullptr,
Statistics* stats = nullptr,
uint32_t hist_type = 0,
HistogramImpl* file_read_hist = nullptr,
RateLimiter* rate_limiter = nullptr,
bool for_compaction = false)
explicit RandomAccessFileReader(
std::unique_ptr<RandomAccessFile>&& raf, std::string _file_name,
Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
HistogramImpl* file_read_hist = nullptr,
RateLimiter* rate_limiter = nullptr, bool for_compaction = false,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: file_(std::move(raf)),
file_name_(std::move(_file_name)),
env_(env),
@ -87,7 +108,19 @@ class RandomAccessFileReader {
hist_type_(hist_type),
file_read_hist_(file_read_hist),
rate_limiter_(rate_limiter),
for_compaction_(for_compaction) {}
for_compaction_(for_compaction),
listeners_() {
#ifndef ROCKSDB_LITE
std::for_each(listeners.begin(), listeners.end(),
[this](const std::shared_ptr<EventListener>& e) {
if (e->ShouldBeNotifiedOnFileIO()) {
listeners_.emplace_back(e);
}
});
#else // !ROCKSDB_LITE
(void)listeners;
#endif
}
RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
@ -124,6 +157,27 @@ class RandomAccessFileReader {
// Use posix write to write data to a file.
class WritableFileWriter {
private:
#ifndef ROCKSDB_LITE
void NotifyOnFileWriteFinish(uint64_t offset, size_t length, time_t start_ts,
const Status& status) {
FileOperationInfo info;
info.path = file_name_;
info.offset = offset;
info.length = length;
info.start_timestamp = start_ts;
time_t finish_ts =
std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
info.finish_timestamp = finish_ts;
info.status = status;
for (auto& listener : listeners_) {
listener->OnFileWriteFinish(info);
}
}
#endif // ROCKSDB_LITE
// True when this writer holds at least one listener. listeners_ is filled in
// the constructor with only those listeners whose ShouldBeNotifiedOnFileIO()
// returned true, so callers use this to skip timestamp capture and
// notification when nobody is interested.
bool ShouldNotifyListeners() const { return !listeners_.empty(); }
std::unique_ptr<WritableFile> writable_file_;
std::string file_name_;
AlignedBuffer buf_;
@ -142,11 +196,13 @@ class WritableFileWriter {
uint64_t bytes_per_sync_;
RateLimiter* rate_limiter_;
Statistics* stats_;
std::vector<std::shared_ptr<EventListener>> listeners_;
public:
WritableFileWriter(std::unique_ptr<WritableFile>&& file,
const std::string& _file_name, const EnvOptions& options,
Statistics* stats = nullptr)
WritableFileWriter(
std::unique_ptr<WritableFile>&& file, const std::string& _file_name,
const EnvOptions& options, Statistics* stats = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: writable_file_(std::move(file)),
file_name_(_file_name),
buf_(),
@ -159,11 +215,22 @@ class WritableFileWriter {
last_sync_size_(0),
bytes_per_sync_(options.bytes_per_sync),
rate_limiter_(options.rate_limiter),
stats_(stats) {
stats_(stats),
listeners_() {
TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
reinterpret_cast<void*>(max_buffer_size_));
buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
#ifndef ROCKSDB_LITE
std::for_each(listeners.begin(), listeners.end(),
[this](const std::shared_ptr<EventListener>& e) {
if (e->ShouldBeNotifiedOnFileIO()) {
listeners_.emplace_back(e);
}
});
#else // !ROCKSDB_LITE
(void)listeners;
#endif
}
WritableFileWriter(const WritableFileWriter&) = delete;

Loading…
Cancel
Save