Add file operation callbacks to SequentialFileReader (#8982)

Summary:
This change adds File IO Notifications to the SequentialFileReader The SequentialFileReader is extended
with a listener parameter.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8982

Test Plan:
A new test EventListenerTest::OnWALOperationTest has been added. The
test verifies that during restore the sequential file reader is called
and the notifications are fired.

Reviewed By: riversand963

Differential Revision: D31320844

Pulled By: shrfb

fbshipit-source-id: 040b24da7c010d7c14ebb5c6460fae9a19b8c168
main
Stefan Roesch 3 years ago committed by Facebook GitHub Bot
parent 787229837e
commit a776406de3
  1. 29
      db/listener_test.cc
  2. 4
      db/transaction_log_impl.cc
  3. 18
      db/version_set.cc
  4. 28
      file/sequence_file_reader.cc
  5. 54
      file/sequence_file_reader.h

@ -1000,6 +1000,7 @@ class TestFileOperationListener : public EventListener {
file_syncs_success_.store(0);
file_truncates_.store(0);
file_truncates_success_.store(0);
file_seq_reads_.store(0);
blob_file_reads_.store(0);
blob_file_writes_.store(0);
blob_file_flushes_.store(0);
@ -1013,6 +1014,9 @@ class TestFileOperationListener : public EventListener {
if (info.status.ok()) {
++file_reads_success_;
}
if (info.path.find("MANIFEST") != std::string::npos) {
++file_seq_reads_;
}
if (EndsWith(info.path, ".blob")) {
++blob_file_reads_;
}
@ -1088,6 +1092,7 @@ class TestFileOperationListener : public EventListener {
std::atomic<size_t> file_syncs_success_;
std::atomic<size_t> file_truncates_;
std::atomic<size_t> file_truncates_success_;
std::atomic<size_t> file_seq_reads_;
std::atomic<size_t> blob_file_reads_;
std::atomic<size_t> blob_file_writes_;
std::atomic<size_t> blob_file_flushes_;
@ -1183,6 +1188,30 @@ TEST_F(EventListenerTest, OnBlobFileOperationTest) {
}
}
TEST_F(EventListenerTest, ReadManifestAndWALOnRecovery) {
Options options;
options.env = CurrentOptions().env;
options.create_if_missing = true;
TestFileOperationListener* listener = new TestFileOperationListener();
options.listeners.emplace_back(listener);
options.use_direct_io_for_flush_and_compaction = false;
Status s = TryReopen(options);
if (s.IsInvalidArgument()) {
options.use_direct_io_for_flush_and_compaction = false;
} else {
ASSERT_OK(s);
}
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "aaa"));
Close();
size_t seq_reads = listener->file_seq_reads_.load();
Reopen(options);
ASSERT_GT(listener->file_seq_reads_.load(), seq_reads);
}
class BlobDBJobLevelEventListenerTest : public EventListener {
public:
explicit BlobDBJobLevelEventListenerTest(EventListenerTest* test)

@ -63,8 +63,8 @@ Status TransactionLogIteratorImpl::OpenLogFile(
}
}
if (s.ok()) {
file_reader->reset(
new SequentialFileReader(std::move(file), fname, io_tracer_));
file_reader->reset(new SequentialFileReader(
std::move(file), fname, io_tracer_, options_->listeners));
}
return s;
}

@ -4746,9 +4746,9 @@ Status VersionSet::Recover(
if (!s.ok()) {
return s;
}
manifest_file_reader.reset(
new SequentialFileReader(std::move(manifest_file), manifest_path,
db_options_->log_readahead_size, io_tracer_));
manifest_file_reader.reset(new SequentialFileReader(
std::move(manifest_file), manifest_path,
db_options_->log_readahead_size, io_tracer_, db_options_->listeners));
}
uint64_t current_manifest_file_size = 0;
uint64_t log_number = 0;
@ -4918,9 +4918,9 @@ Status VersionSet::TryRecoverFromOneManifest(
if (!s.ok()) {
return s;
}
manifest_file_reader.reset(
new SequentialFileReader(std::move(manifest_file), manifest_path,
db_options_->log_readahead_size, io_tracer_));
manifest_file_reader.reset(new SequentialFileReader(
std::move(manifest_file), manifest_path,
db_options_->log_readahead_size, io_tracer_, db_options_->listeners));
}
assert(s.ok());
@ -5950,9 +5950,9 @@ Status ReactiveVersionSet::MaybeSwitchManifest(
&manifest_file, nullptr);
std::unique_ptr<SequentialFileReader> manifest_file_reader;
if (s.ok()) {
manifest_file_reader.reset(
new SequentialFileReader(std::move(manifest_file), manifest_path,
db_options_->log_readahead_size, io_tracer_));
manifest_file_reader.reset(new SequentialFileReader(
std::move(manifest_file), manifest_path,
db_options_->log_readahead_size, io_tracer_, db_options_->listeners));
manifest_reader->reset(new log::FragmentBufferedReader(
nullptr, std::move(manifest_file_reader), reporter, true /* checksum */,
0 /* log_number */));

@ -47,7 +47,14 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
AlignedBuffer buf;
buf.Alignment(alignment);
buf.AllocateNewBuffer(size);
Slice tmp;
uint64_t orig_offset = 0;
FileOperationInfo::StartTimePoint start_ts;
if (ShouldNotifyListeners()) {
orig_offset = aligned_offset + buf.CurrentSize();
start_ts = FileOperationInfo::StartNow();
}
io_s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
buf.BufferStart(), nullptr);
if (io_s.ok() && offset_advance < tmp.size()) {
@ -56,6 +63,11 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
std::min(tmp.size() - offset_advance, n));
}
*result = Slice(scratch, r);
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
io_s);
}
#endif // !ROCKSDB_LITE
} else {
// To be paranoid, modify scratch a little bit, so in case underlying
@ -66,7 +78,23 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
if (n > 0 && scratch != nullptr) {
scratch[0]++;
}
#ifndef ROCKSDB_LITE
FileOperationInfo::StartTimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow();
}
#endif
io_s = file_->Read(n, IOOptions(), result, scratch, nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
size_t offset = offset_.fetch_add(result->size());
NotifyOnFileReadFinish(offset, result->size(), start_ts, finish_ts, io_s);
}
#endif
}
IOSTATS_ADD(bytes_read, result->size());
return io_s;

@ -23,24 +23,70 @@ namespace ROCKSDB_NAMESPACE {
// cache disabled) reads appropriately, and also updates the IO stats.
class SequentialFileReader {
private:
#ifndef ROCKSDB_LITE
void NotifyOnFileReadFinish(
uint64_t offset, size_t length,
const FileOperationInfo::StartTimePoint& start_ts,
const FileOperationInfo::FinishTimePoint& finish_ts,
const Status& status) const {
FileOperationInfo info(FileOperationType::kRead, file_name_, start_ts,
finish_ts, status);
info.offset = offset;
info.length = length;
for (auto& listener : listeners_) {
listener->OnFileReadFinish(info);
}
}
void AddFileIOListeners(
const std::vector<std::shared_ptr<EventListener>>& listeners) {
std::for_each(listeners.begin(), listeners.end(),
[this](const std::shared_ptr<EventListener>& e) {
if (e->ShouldBeNotifiedOnFileIO()) {
listeners_.emplace_back(e);
}
});
}
#endif // ROCKSDB_LITE
bool ShouldNotifyListeners() const { return !listeners_.empty(); }
std::string file_name_;
FSSequentialFilePtr file_;
std::atomic<size_t> offset_{0}; // read offset
std::vector<std::shared_ptr<EventListener>> listeners_{};
public:
explicit SequentialFileReader(
std::unique_ptr<FSSequentialFile>&& _file, const std::string& _file_name,
const std::shared_ptr<IOTracer>& io_tracer = nullptr)
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: file_name_(_file_name),
file_(std::move(_file), io_tracer, _file_name) {}
file_(std::move(_file), io_tracer, _file_name),
listeners_() {
#ifndef ROCKSDB_LITE
AddFileIOListeners(listeners);
#else
(void)listeners;
#endif
}
explicit SequentialFileReader(
std::unique_ptr<FSSequentialFile>&& _file, const std::string& _file_name,
size_t _readahead_size,
const std::shared_ptr<IOTracer>& io_tracer = nullptr)
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: file_name_(_file_name),
file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size),
io_tracer, _file_name) {}
io_tracer, _file_name),
listeners_() {
#ifndef ROCKSDB_LITE
AddFileIOListeners(listeners);
#else
(void)listeners;
#endif
}
static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<SequentialFileReader>* reader,

Loading…
Cancel
Save