Add FileSystem wrapper classes for IO tracing. (#7002)

Summary:
1. Add the wrapper classes FileSystemTracingWrapper, FSSequentialFileTracingWrapper, FSRandomAccessFileTracingWrapper, FSWritableFileTracingWrapper, FSRandomRWFileTracingWrapper that forward the calls to underlying storage system and then pass the file operation information to IOTracer. IOTracer dumps the record in binary format for tracing.
2. Add the wrapper classes FileSystemPtr, FSSequentialFilePtr, FSRandomAccessFilePtr, FSWritableFilePtr and FSRandomRWFilePtr that overload operator-> and return ptr to underlying storage system or Tracing wrapper class based on enabling/disabling of IO tracing. These classes are added to bypass Tracing Wrapper classes when we disable tracing.
3. Add enums in trace.h that distinguish which options need to be added for different file operations(Read, close, write etc) as part of tracing record.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7002

Test Plan: make check -j64

Reviewed By: anand1976

Differential Revision: D22127897

Pulled By: akankshamahajan15

fbshipit-source-id: 74cff58ce5661c9a3832dfaa52483f3b2d8565e0
main
Akanksha Mahajan 4 years ago committed by Facebook GitHub Bot
parent 0ff752cf0d
commit d93bd3ce25
  1. 1
      TARGETS
  2. 319
      env/file_system_tracer.cc
  3. 333
      env/file_system_tracer.h
  4. 18
      include/rocksdb/file_system.h
  5. 1
      src.mk
  6. 59
      trace_replay/io_tracer.cc
  7. 20
      trace_replay/io_tracer.h
  8. 42
      trace_replay/io_tracer_test.cc
  9. 6
      trace_replay/trace_replay.h

@ -193,6 +193,7 @@ cpp_library(
"env/env_hdfs.cc", "env/env_hdfs.cc",
"env/env_posix.cc", "env/env_posix.cc",
"env/file_system.cc", "env/file_system.cc",
"env/file_system_tracer.cc",
"env/fs_posix.cc", "env/fs_posix.cc",
"env/io_posix.cc", "env/io_posix.cc",
"env/mock_env.cc", "env/mock_env.cc",

@ -0,0 +1,319 @@
// Copyright (c) 2019-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "env/file_system_tracer.h"
#include "rocksdb/env.h"
namespace ROCKSDB_NAMESPACE {
IOStatus FileSystemTracingWrapper::NewWritableFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(elapsed, TraceType::kIOFileName, __func__, elapsed,
s.ToString(), fname);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FileSystemTracingWrapper::NewDirectory(
const std::string& name, const IOOptions& io_opts,
std::unique_ptr<FSDirectory>* result, IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->NewDirectory(name, io_opts, result, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
elapsed, s.ToString(), name);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FileSystemTracingWrapper::GetChildren(const std::string& dir,
const IOOptions& io_opts,
std::vector<std::string>* r,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->GetChildren(dir, io_opts, r, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
elapsed, s.ToString(), dir);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FileSystemTracingWrapper::DeleteFile(const std::string& fname,
const IOOptions& options,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->DeleteFile(fname, options, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
elapsed, s.ToString(), fname);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FileSystemTracingWrapper::CreateDir(const std::string& dirname,
const IOOptions& options,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->CreateDir(dirname, options, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
elapsed, s.ToString(), dirname);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FileSystemTracingWrapper::CreateDirIfMissing(
const std::string& dirname, const IOOptions& options, IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->CreateDirIfMissing(dirname, options, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
elapsed, s.ToString(), dirname);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FileSystemTracingWrapper::DeleteDir(const std::string& dirname,
const IOOptions& options,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->DeleteDir(dirname, options, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
elapsed, s.ToString(), dirname);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FileSystemTracingWrapper::GetFileSize(const std::string& fname,
const IOOptions& options,
uint64_t* file_size,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->GetFileSize(fname, options, file_size, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileNameAndFileSize,
__func__, elapsed, s.ToString(), fname, *file_size);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FSSequentialFileTracingWrapper::Read(size_t n,
const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->Read(n, options, result, scratch, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLen, __func__,
elapsed, s.ToString(), result->size());
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FSSequentialFileTracingWrapper::InvalidateCache(size_t offset,
size_t length) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->InvalidateCache(offset, length);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
__func__, elapsed, s.ToString(), length, offset);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FSSequentialFileTracingWrapper::PositionedRead(
uint64_t offset, size_t n, const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s =
target()->PositionedRead(offset, n, options, result, scratch, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
__func__, elapsed, s.ToString(), result->size(),
offset);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FSRandomAccessFileTracingWrapper::Read(uint64_t offset, size_t n,
const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->Read(offset, n, options, result, scratch, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
__func__, elapsed, s.ToString(), n, offset);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FSRandomAccessFileTracingWrapper::MultiRead(FSReadRequest* reqs,
size_t num_reqs,
const IOOptions& options,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->MultiRead(reqs, num_reqs, options, dbg);
uint64_t elapsed = timer.ElapsedNanos();
uint64_t latency = elapsed;
for (size_t i = 0; i < num_reqs; i++) {
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
__func__, latency, reqs[i].status.ToString(),
reqs[i].len, reqs[i].offset);
io_tracer_->WriteIOOp(io_record);
}
return s;
}
IOStatus FSRandomAccessFileTracingWrapper::Prefetch(uint64_t offset, size_t n,
const IOOptions& options,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->Prefetch(offset, n, options, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
__func__, elapsed, s.ToString(), n, offset);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FSRandomAccessFileTracingWrapper::InvalidateCache(size_t offset,
size_t length) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->InvalidateCache(offset, length);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
__func__, elapsed, s.ToString(), length,
static_cast<uint64_t>(offset));
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FSWritableFileTracingWrapper::Append(const Slice& data,
const IOOptions& options,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->Append(data, options, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLen, __func__,
elapsed, s.ToString(), data.size());
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FSWritableFileTracingWrapper::PositionedAppend(
const Slice& data, uint64_t offset, const IOOptions& options,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->PositionedAppend(data, offset, options, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
__func__, elapsed, s.ToString(), data.size(), offset);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FSWritableFileTracingWrapper::Truncate(uint64_t size,
const IOOptions& options,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->Truncate(size, options, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLen, __func__,
elapsed, s.ToString(), size);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FSWritableFileTracingWrapper::Close(const IOOptions& options,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->Close(options, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOGeneral, __func__,
elapsed, s.ToString());
io_tracer_->WriteIOOp(io_record);
return s;
}
uint64_t FSWritableFileTracingWrapper::GetFileSize(const IOOptions& options,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
uint64_t file_size = target()->GetFileSize(options, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileNameAndFileSize,
"GetFileSize", elapsed, "" /* file_name */,
file_size);
io_tracer_->WriteIOOp(io_record);
return file_size;
}
IOStatus FSWritableFileTracingWrapper::InvalidateCache(size_t offset,
size_t length) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->InvalidateCache(offset, length);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
__func__, elapsed, s.ToString(), length,
static_cast<uint64_t>(offset));
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FSRandomRWFileTracingWrapper::Write(uint64_t offset, const Slice& data,
const IOOptions& options,
IODebugContext* dbg) {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->Write(offset, data, options, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
__func__, elapsed, s.ToString(), data.size(), offset);
io_tracer_->WriteIOOp(io_record);
return s;
}
IOStatus FSRandomRWFileTracingWrapper::Read(uint64_t offset, size_t n,
const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const {
StopWatchNano timer(Env::Default());
timer.Start();
IOStatus s = target()->Read(offset, n, options, result, scratch, dbg);
uint64_t elapsed = timer.ElapsedNanos();
IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
__func__, elapsed, s.ToString(), n, offset);
io_tracer_->WriteIOOp(io_record);
return s;
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,333 @@
// Copyright (c) 2019-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/file_system.h"
#include "trace_replay/io_tracer.h"
namespace ROCKSDB_NAMESPACE {
// FileSystemTracingWrapper is a wrapper class above FileSystem that forwards
// the call to the underlying storage system. It then invokes IOTracer to record
// file operations and other contextual information in a binary format for
// tracing. It overrides methods we are interested in tracing and extends
// FileSystemWrapper, which forwards all methods that are not explicitly
// overridden.
class FileSystemTracingWrapper : public FileSystemWrapper {
public:
FileSystemTracingWrapper(std::shared_ptr<FileSystem> t,
std::shared_ptr<IOTracer> io_tracer)
: FileSystemWrapper(t), io_tracer_(io_tracer), env_(Env::Default()) {}
~FileSystemTracingWrapper() override {}
IOStatus NewWritableFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override;
IOStatus NewDirectory(const std::string& name, const IOOptions& io_opts,
std::unique_ptr<FSDirectory>* result,
IODebugContext* dbg) override;
IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts,
std::vector<std::string>* r,
IODebugContext* dbg) override;
IOStatus DeleteFile(const std::string& fname, const IOOptions& options,
IODebugContext* dbg) override;
IOStatus CreateDir(const std::string& dirname, const IOOptions& options,
IODebugContext* dbg) override;
IOStatus CreateDirIfMissing(const std::string& dirname,
const IOOptions& options,
IODebugContext* dbg) override;
IOStatus DeleteDir(const std::string& dirname, const IOOptions& options,
IODebugContext* dbg) override;
IOStatus GetFileSize(const std::string& fname, const IOOptions& options,
uint64_t* file_size, IODebugContext* dbg) override;
private:
std::shared_ptr<IOTracer> io_tracer_;
Env* env_;
};
// The FileSystemPtr is a wrapper class that takes pointer to storage systems
// (such as posix filesystems). It overloads operator -> and returns a pointer
// of either FileSystem or FileSystemTracingWrapper based on whether tracing is
// enabled or not. It is added to bypass FileSystemTracingWrapper when tracing
// is disabled.
class FileSystemPtr {
public:
FileSystemPtr(std::shared_ptr<FileSystem> fs,
std::shared_ptr<IOTracer> io_tracer)
: fs_(fs),
io_tracer_(io_tracer),
fs_tracer_(
std::make_shared<FileSystemTracingWrapper>(fs_, io_tracer_)) {}
explicit FileSystemPtr(std::shared_ptr<FileSystem> fs)
: fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
std::shared_ptr<FileSystem> operator->() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_;
} else {
return fs_;
}
}
private:
std::shared_ptr<FileSystem> fs_;
std::shared_ptr<IOTracer> io_tracer_;
std::shared_ptr<FileSystemTracingWrapper> fs_tracer_;
};
// FSSequentialFileTracingWrapper is a wrapper class above FSSequentialFile that
// forwards the call to the underlying storage system. It then invokes IOTracer
// to record file operations and other contextual information in a binary format
// for tracing. It overrides methods we are interested in tracing and extends
// FSSequentialFileWrapper, which forwards all methods that are not explicitly
// overridden.
class FSSequentialFileTracingWrapper : public FSSequentialFileWrapper {
public:
FSSequentialFileTracingWrapper(FSSequentialFile* t,
std::shared_ptr<IOTracer> io_tracer)
: FSSequentialFileWrapper(t),
io_tracer_(io_tracer),
env_(Env::Default()) {}
~FSSequentialFileTracingWrapper() override {}
IOStatus Read(size_t n, const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) override;
IOStatus InvalidateCache(size_t offset, size_t length) override;
IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) override;
private:
std::shared_ptr<IOTracer> io_tracer_;
Env* env_;
};
// The FSSequentialFilePtr is a wrapper class that takes pointer to storage
// systems (such as posix filesystems). It overloads operator -> and returns a
// pointer of either FSSequentialFile or FSSequentialFileTracingWrapper based on
// whether tracing is enabled or not. It is added to bypass
// FSSequentialFileTracingWrapper when tracing is disabled.
class FSSequentialFilePtr {
public:
FSSequentialFilePtr(FSSequentialFile* fs, std::shared_ptr<IOTracer> io_tracer)
: fs_(fs), io_tracer_(io_tracer) {
fs_tracer_ = new FSSequentialFileTracingWrapper(fs_, io_tracer_);
}
explicit FSSequentialFilePtr(FSSequentialFile* fs)
: fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
FSSequentialFile* operator->() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_;
} else {
return fs_;
}
}
private:
FSSequentialFile* fs_;
std::shared_ptr<IOTracer> io_tracer_;
FSSequentialFileTracingWrapper* fs_tracer_;
};
// FSRandomAccessFileTracingWrapper is a wrapper class above FSRandomAccessFile
// that forwards the call to the underlying storage system. It then invokes
// IOTracer to record file operations and other contextual information in a
// binary format for tracing. It overrides methods we are interested in tracing
// and extends FSRandomAccessFileWrapper, which forwards all methods that are
// not explicitly overridden.
class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileWrapper {
public:
FSRandomAccessFileTracingWrapper(FSRandomAccessFile* t,
std::shared_ptr<IOTracer> io_tracer)
: FSRandomAccessFileWrapper(t),
io_tracer_(io_tracer),
env_(Env::Default()) {}
~FSRandomAccessFileTracingWrapper() override {}
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const override;
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options, IODebugContext* dbg) override;
IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
IODebugContext* dbg) override;
IOStatus InvalidateCache(size_t offset, size_t length) override;
private:
std::shared_ptr<IOTracer> io_tracer_;
Env* env_;
};
// The FSRandomAccessFilePtr is a wrapper class that takes pointer to storage
// systems (such as posix filesystems). It overloads operator -> and returns a
// pointer of either FSRandomAccessFile or FSRandomAccessFileTracingWrapper
// based on whether tracing is enabled or not. It is added to bypass
// FSRandomAccessFileTracingWrapper when tracing is disabled.
class FSRandomAccessFilePtr {
public:
FSRandomAccessFilePtr(FSRandomAccessFile* fs,
std::shared_ptr<IOTracer> io_tracer)
: fs_(fs),
io_tracer_(io_tracer),
fs_tracer_(new FSRandomAccessFileTracingWrapper(fs_, io_tracer_)) {}
explicit FSRandomAccessFilePtr(FSRandomAccessFile* fs)
: fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
FSRandomAccessFile* operator->() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_;
} else {
return fs_;
}
}
private:
FSRandomAccessFile* fs_;
std::shared_ptr<IOTracer> io_tracer_;
FSRandomAccessFileTracingWrapper* fs_tracer_;
};
// FSWritableFileTracingWrapper is a wrapper class above FSWritableFile that
// forwards the call to the underlying storage system. It then invokes IOTracer
// to record file operations and other contextual information in a binary format
// for tracing. It overrides methods we are interested in tracing and extends
// FSWritableFileWrapper, which forwards all methods that are not explicitly
// overridden.
class FSWritableFileTracingWrapper : public FSWritableFileWrapper {
public:
FSWritableFileTracingWrapper(FSWritableFile* t,
std::shared_ptr<IOTracer> io_tracer)
: FSWritableFileWrapper(t), io_tracer_(io_tracer), env_(Env::Default()) {}
~FSWritableFileTracingWrapper() override {}
IOStatus Append(const Slice& data, const IOOptions& options,
IODebugContext* dbg) override;
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& options,
IODebugContext* dbg) override;
IOStatus Truncate(uint64_t size, const IOOptions& options,
IODebugContext* dbg) override;
IOStatus Close(const IOOptions& options, IODebugContext* dbg) override;
uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override;
IOStatus InvalidateCache(size_t offset, size_t length) override;
private:
std::shared_ptr<IOTracer> io_tracer_;
Env* env_;
};
// The FSWritableFilePtr is a wrapper class that takes pointer to storage
// systems (such as posix filesystems). It overloads operator -> and returns a
// pointer of either FSWritableFile or FSWritableFileTracingWrapper based on
// whether tracing is enabled or not. It is added to bypass
// FSWritableFileTracingWrapper when tracing is disabled.
class FSWritableFilePtr {
public:
FSWritableFilePtr(FSWritableFile* fs, std::shared_ptr<IOTracer> io_tracer)
: fs_(fs),
io_tracer_(io_tracer),
fs_tracer_(new FSWritableFileTracingWrapper(fs_, io_tracer_)) {}
explicit FSWritableFilePtr(FSWritableFile* fs)
: fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
FSWritableFile* operator->() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_;
} else {
return fs_;
}
}
private:
FSWritableFile* fs_;
std::shared_ptr<IOTracer> io_tracer_;
FSWritableFileTracingWrapper* fs_tracer_;
};
// FSRandomRWFileTracingWrapper is a wrapper class above FSRandomRWFile that
// forwards the call to the underlying storage system. It then invokes IOTracer
// to record file operations and other contextual information in a binary format
// for tracing. It overrides methods we are interested in tracing and extends
// FSRandomRWFileWrapper, which forwards all methods that are not explicitly
// overridden.
class FSRandomRWFileTracingWrapper : public FSRandomRWFileWrapper {
public:
FSRandomRWFileTracingWrapper(FSRandomRWFile* t,
std::shared_ptr<IOTracer> io_tracer)
: FSRandomRWFileWrapper(t), io_tracer_(io_tracer), env_(Env::Default()) {}
~FSRandomRWFileTracingWrapper() override {}
IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& options,
IODebugContext* dbg) override;
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const override;
private:
std::shared_ptr<IOTracer> io_tracer_;
Env* env_;
};
// The FSRandomRWFilePtr is a wrapper class that takes pointer to storage
// systems (such as posix filesystems). It overloads operator -> and returns a
// pointer of either FSRandomRWFile or FSRandomRWFileTracingWrapper based on
// whether tracing is enabled or not. It is added to bypass
// FSRandomRWFileTracingWrapper when tracing is disabled.
class FSRandomRWFilePtr {
public:
FSRandomRWFilePtr(FSRandomRWFile* fs, std::shared_ptr<IOTracer> io_tracer)
: fs_(fs),
io_tracer_(io_tracer),
fs_tracer_(new FSRandomRWFileTracingWrapper(fs_, io_tracer_)) {}
explicit FSRandomRWFilePtr(FSRandomRWFile* fs)
: fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
FSRandomRWFile* operator->() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_;
} else {
return fs_;
}
}
private:
FSRandomRWFile* fs_;
std::shared_ptr<IOTracer> io_tracer_;
FSRandomRWFileTracingWrapper* fs_tracer_;
};
} // namespace ROCKSDB_NAMESPACE

@ -1212,8 +1212,9 @@ class FileSystemWrapper : public FileSystem {
class FSSequentialFileWrapper : public FSSequentialFile { class FSSequentialFileWrapper : public FSSequentialFile {
public: public:
explicit FSSequentialFileWrapper(FSSequentialFile* target) explicit FSSequentialFileWrapper(FSSequentialFile* t) : target_(t) {}
: target_(target) {}
FSSequentialFile* target() const { return target_; }
IOStatus Read(size_t n, const IOOptions& options, Slice* result, IOStatus Read(size_t n, const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) override { char* scratch, IODebugContext* dbg) override {
@ -1239,8 +1240,9 @@ class FSSequentialFileWrapper : public FSSequentialFile {
class FSRandomAccessFileWrapper : public FSRandomAccessFile { class FSRandomAccessFileWrapper : public FSRandomAccessFile {
public: public:
explicit FSRandomAccessFileWrapper(FSRandomAccessFile* target) explicit FSRandomAccessFileWrapper(FSRandomAccessFile* t) : target_(t) {}
: target_(target) {}
FSRandomAccessFile* target() const { return target_; }
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options, IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch, Slice* result, char* scratch,
@ -1275,6 +1277,8 @@ class FSWritableFileWrapper : public FSWritableFile {
public: public:
explicit FSWritableFileWrapper(FSWritableFile* t) : target_(t) {} explicit FSWritableFileWrapper(FSWritableFile* t) : target_(t) {}
FSWritableFile* target() const { return target_; }
IOStatus Append(const Slice& data, const IOOptions& options, IOStatus Append(const Slice& data, const IOOptions& options,
IODebugContext* dbg) override { IODebugContext* dbg) override {
return target_->Append(data, options, dbg); return target_->Append(data, options, dbg);
@ -1358,7 +1362,9 @@ class FSWritableFileWrapper : public FSWritableFile {
class FSRandomRWFileWrapper : public FSRandomRWFile { class FSRandomRWFileWrapper : public FSRandomRWFile {
public: public:
explicit FSRandomRWFileWrapper(FSRandomRWFile* target) : target_(target) {} explicit FSRandomRWFileWrapper(FSRandomRWFile* t) : target_(t) {}
FSRandomRWFile* target() const { return target_; }
bool use_direct_io() const override { return target_->use_direct_io(); } bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override { size_t GetRequiredBufferAlignment() const override {
@ -1392,7 +1398,7 @@ class FSRandomRWFileWrapper : public FSRandomRWFile {
class FSDirectoryWrapper : public FSDirectory { class FSDirectoryWrapper : public FSDirectory {
public: public:
explicit FSDirectoryWrapper(FSDirectory* target) : target_(target) {} explicit FSDirectoryWrapper(FSDirectory* t) : target_(t) {}
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override { IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
return target_->Fsync(options, dbg); return target_->Fsync(options, dbg);

@ -78,6 +78,7 @@ LIB_SOURCES = \
env/env_posix.cc \ env/env_posix.cc \
env/file_system.cc \ env/file_system.cc \
env/fs_posix.cc \ env/fs_posix.cc \
env/file_system_tracer.cc \
env/io_posix.cc \ env/io_posix.cc \
env/mock_env.cc \ env/mock_env.cc \
file/delete_scheduler.cc \ file/delete_scheduler.cc \

@ -33,13 +33,30 @@ Status IOTraceWriter::WriteIOOp(const IOTraceRecord& record) {
trace.type = record.trace_type; trace.type = record.trace_type;
Slice file_operation(record.file_operation); Slice file_operation(record.file_operation);
PutLengthPrefixedSlice(&trace.payload, file_operation); PutLengthPrefixedSlice(&trace.payload, file_operation);
PutFixed64(&trace.payload, record.latency);
Slice io_status(record.io_status); Slice io_status(record.io_status);
PutLengthPrefixedSlice(&trace.payload, io_status); PutLengthPrefixedSlice(&trace.payload, io_status);
/* Write remaining options based on trace_type set by file operation */
switch (record.trace_type) {
case TraceType::kIOGeneral:
break;
case TraceType::kIOFileNameAndFileSize:
PutFixed64(&trace.payload, record.file_size);
FALLTHROUGH_INTENDED;
case TraceType::kIOFileName: {
Slice file_name(record.file_name); Slice file_name(record.file_name);
PutLengthPrefixedSlice(&trace.payload, file_name); PutLengthPrefixedSlice(&trace.payload, file_name);
// TODO: add below options based on file_operation break;
trace.payload.push_back(record.len); }
case TraceType::kIOLenAndOffset:
PutFixed64(&trace.payload, record.offset); PutFixed64(&trace.payload, record.offset);
FALLTHROUGH_INTENDED;
case TraceType::kIOLen:
trace.payload.push_back(record.len);
break;
default:
assert(false);
}
std::string encoded_trace; std::string encoded_trace;
TracerHelper::EncodeTrace(trace, &encoded_trace); TracerHelper::EncodeTrace(trace, &encoded_trace);
return trace_writer_->Write(encoded_trace); return trace_writer_->Write(encoded_trace);
@ -124,30 +141,54 @@ Status IOTraceReader::ReadIOOp(IOTraceRecord* record) {
"Incomplete access record: Failed to read file operation."); "Incomplete access record: Failed to read file operation.");
} }
record->file_operation = file_operation.ToString(); record->file_operation = file_operation.ToString();
if (!GetFixed64(&enc_slice, &record->latency)) {
return Status::Incomplete(
"Incomplete access record: Failed to read latency.");
}
Slice io_status; Slice io_status;
if (!GetLengthPrefixedSlice(&enc_slice, &io_status)) { if (!GetLengthPrefixedSlice(&enc_slice, &io_status)) {
return Status::Incomplete( return Status::Incomplete(
"Incomplete access record: Failed to read IO status."); "Incomplete access record: Failed to read IO status.");
} }
record->io_status = io_status.ToString(); record->io_status = io_status.ToString();
/* Read remaining options based on trace_type set by file operation */
switch (record->trace_type) {
case TraceType::kIOGeneral:
break;
case TraceType::kIOFileNameAndFileSize:
if (!GetFixed64(&enc_slice, &record->file_size)) {
return Status::Incomplete(
"Incomplete access record: Failed to read file size.");
}
FALLTHROUGH_INTENDED;
case TraceType::kIOFileName: {
Slice file_name; Slice file_name;
if (!GetLengthPrefixedSlice(&enc_slice, &file_name)) { if (!GetLengthPrefixedSlice(&enc_slice, &file_name)) {
return Status::Incomplete( return Status::Incomplete(
"Incomplete access record: Failed to read file name."); "Incomplete access record: Failed to read file name.");
} }
record->file_name = file_name.ToString(); record->file_name = file_name.ToString();
// TODO: Read below options based on file_operation. break;
record->len = static_cast<size_t>(enc_slice[0]);
const unsigned int kCharSize = 1;
enc_slice.remove_prefix(kCharSize);
if (enc_slice.empty()) {
return Status::Incomplete(
"Incomplete access record: Failed to read is_cache_hit.");
} }
case TraceType::kIOLenAndOffset:
if (!GetFixed64(&enc_slice, &record->offset)) { if (!GetFixed64(&enc_slice, &record->offset)) {
return Status::Incomplete( return Status::Incomplete(
"Incomplete access record: Failed to read offset."); "Incomplete access record: Failed to read offset.");
} }
FALLTHROUGH_INTENDED;
case TraceType::kIOLen: {
if (enc_slice.empty()) {
return Status::Incomplete(
"Incomplete access record: Failed to read length.");
}
record->len = static_cast<size_t>(enc_slice[0]);
const unsigned int kCharSize = 1;
enc_slice.remove_prefix(kCharSize);
break;
}
default:
assert(false);
}
return Status::OK(); return Status::OK();
} }

@ -21,45 +21,49 @@ struct IOTraceRecord {
uint64_t access_timestamp = 0; uint64_t access_timestamp = 0;
TraceType trace_type = TraceType::kTraceMax; TraceType trace_type = TraceType::kTraceMax;
std::string file_operation; std::string file_operation;
uint64_t latency = 0;
std::string io_status; std::string io_status;
// Required fields for read. // Required fields for read.
std::string file_name; std::string file_name;
uint64_t offset = 0;
size_t len = 0; size_t len = 0;
uint64_t offset = 0;
uint64_t file_size = 0; uint64_t file_size = 0;
IOTraceRecord() {} IOTraceRecord() {}
IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type, IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type,
const std::string& _file_operation, const std::string& _file_operation, const uint64_t& _latency,
const std::string& _io_status, const std::string& _file_name) const std::string& _io_status, const std::string& _file_name)
: access_timestamp(_access_timestamp), : access_timestamp(_access_timestamp),
trace_type(_trace_type), trace_type(_trace_type),
file_operation(_file_operation), file_operation(_file_operation),
latency(_latency),
io_status(_io_status), io_status(_io_status),
file_name(_file_name) {} file_name(_file_name) {}
IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type, IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type,
const std::string& _file_operation, const std::string& _file_operation, const uint64_t& _latency,
const std::string& _io_status, const std::string& _file_name, const std::string& _io_status, const std::string& _file_name,
const uint64_t& _file_size) const uint64_t& _file_size)
: access_timestamp(_access_timestamp), : access_timestamp(_access_timestamp),
trace_type(_trace_type), trace_type(_trace_type),
file_operation(_file_operation), file_operation(_file_operation),
latency(_latency),
io_status(_io_status), io_status(_io_status),
file_name(_file_name), file_name(_file_name),
file_size(_file_size) {} file_size(_file_size) {}
IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type, IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type,
const std::string& _file_operation, const std::string& _file_operation, const uint64_t& _latency,
const std::string& _io_status, const uint64_t& _offset = 0, const std::string& _io_status, const uint64_t& _len = 0,
const uint64_t& _len = 0) const uint64_t& _offset = 0)
: access_timestamp(_access_timestamp), : access_timestamp(_access_timestamp),
trace_type(_trace_type), trace_type(_trace_type),
file_operation(_file_operation), file_operation(_file_operation),
latency(_latency),
io_status(_io_status), io_status(_io_status),
offset(_offset), len(_len),
len(_len) {} offset(_offset) {}
}; };
struct IOTraceHeader { struct IOTraceHeader {

@ -50,6 +50,7 @@ class IOTracerTest : public testing::Test {
assert(writer); assert(writer);
for (uint32_t i = 0; i < nrecords; i++) { for (uint32_t i = 0; i < nrecords; i++) {
IOTraceRecord record; IOTraceRecord record;
record.trace_type = TraceType::kIOLenAndOffset;
record.file_operation = GetFileOperation(i); record.file_operation = GetFileOperation(i);
record.io_status = IOStatus::OK().ToString(); record.io_status = IOStatus::OK().ToString();
record.file_name = kDummyFile + std::to_string(i); record.file_name = kDummyFile + std::to_string(i);
@ -59,16 +60,6 @@ class IOTracerTest : public testing::Test {
} }
} }
IOTraceRecord GenerateAccessRecord(int i) {
IOTraceRecord record;
record.file_operation = GetFileOperation(i);
record.io_status = IOStatus::OK().ToString();
record.file_name = kDummyFile + std::to_string(i);
record.len = i;
record.offset = i + 20;
return record;
}
void VerifyIOOp(IOTraceReader* reader, uint32_t nrecords) { void VerifyIOOp(IOTraceReader* reader, uint32_t nrecords) {
assert(reader); assert(reader);
for (uint32_t i = 0; i < nrecords; i++) { for (uint32_t i = 0; i < nrecords; i++) {
@ -78,7 +69,6 @@ class IOTracerTest : public testing::Test {
ASSERT_EQ(record.io_status, IOStatus::OK().ToString()); ASSERT_EQ(record.io_status, IOStatus::OK().ToString());
ASSERT_EQ(record.len, i); ASSERT_EQ(record.len, i);
ASSERT_EQ(record.offset, i + 20); ASSERT_EQ(record.offset, i + 20);
ASSERT_EQ(record.file_name, kDummyFile + std::to_string(i));
} }
} }
@ -89,8 +79,10 @@ class IOTracerTest : public testing::Test {
}; };
TEST_F(IOTracerTest, AtomicWrite) { TEST_F(IOTracerTest, AtomicWrite) {
IOTraceRecord record = GenerateAccessRecord(0); std::string file_name = kDummyFile + std::to_string(0);
{ {
IOTraceRecord record(0, TraceType::kIOFileName, GetFileOperation(0), 0,
IOStatus::OK().ToString(), file_name);
TraceOptions trace_opt; TraceOptions trace_opt;
std::unique_ptr<TraceWriter> trace_writer; std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
@ -110,14 +102,20 @@ TEST_F(IOTracerTest, AtomicWrite) {
ASSERT_OK(reader.ReadHeader(&header)); ASSERT_OK(reader.ReadHeader(&header));
ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
VerifyIOOp(&reader, 1); // Read record and verify data.
ASSERT_NOK(reader.ReadIOOp(&record)); IOTraceRecord access_record;
ASSERT_OK(reader.ReadIOOp(&access_record));
ASSERT_EQ(access_record.file_operation, GetFileOperation(0));
ASSERT_EQ(access_record.io_status, IOStatus::OK().ToString());
ASSERT_EQ(access_record.file_name, file_name);
ASSERT_NOK(reader.ReadIOOp(&access_record));
} }
} }
TEST_F(IOTracerTest, AtomicWriteBeforeStartTrace) { TEST_F(IOTracerTest, AtomicWriteBeforeStartTrace) {
IOTraceRecord record = GenerateAccessRecord(0);
{ {
IOTraceRecord record(0, TraceType::kIOGeneral, GetFileOperation(0), 0,
IOStatus::OK().ToString());
std::unique_ptr<TraceWriter> trace_writer; std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer)); &trace_writer));
@ -139,8 +137,10 @@ TEST_F(IOTracerTest, AtomicWriteBeforeStartTrace) {
} }
TEST_F(IOTracerTest, AtomicNoWriteAfterEndTrace) { TEST_F(IOTracerTest, AtomicNoWriteAfterEndTrace) {
IOTraceRecord record = GenerateAccessRecord(0);
{ {
IOTraceRecord record(0, TraceType::kIOFileNameAndFileSize,
GetFileOperation(2), 0 /*latency*/,
IOStatus::OK().ToString(), "", 10 /*file_size*/);
TraceOptions trace_opt; TraceOptions trace_opt;
std::unique_ptr<TraceWriter> trace_writer; std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
@ -164,8 +164,14 @@ TEST_F(IOTracerTest, AtomicNoWriteAfterEndTrace) {
ASSERT_OK(reader.ReadHeader(&header)); ASSERT_OK(reader.ReadHeader(&header));
ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
VerifyIOOp(&reader, 1);
ASSERT_NOK(reader.ReadIOOp(&record)); IOTraceRecord access_record;
ASSERT_OK(reader.ReadIOOp(&access_record));
ASSERT_EQ(access_record.file_operation, GetFileOperation(2));
ASSERT_EQ(access_record.io_status, IOStatus::OK().ToString());
ASSERT_EQ(access_record.file_size, 10);
// No more record.
ASSERT_NOK(reader.ReadIOOp(&access_record));
} }
} }

@ -46,6 +46,12 @@ enum TraceType : char {
kBlockTraceDataBlock = 9, kBlockTraceDataBlock = 9,
kBlockTraceUncompressionDictBlock = 10, kBlockTraceUncompressionDictBlock = 10,
kBlockTraceRangeDeletionBlock = 11, kBlockTraceRangeDeletionBlock = 11,
// IO Trace related types based on options that will be added in trace file.
kIOGeneral = 12,
kIOFileName = 13,
kIOFileNameAndFileSize = 14,
kIOLen = 15,
kIOLenAndOffset = 16,
// All trace types should be added before kTraceMax // All trace types should be added before kTraceMax
kTraceMax, kTraceMax,
}; };

Loading…
Cancel
Save