diff --git a/TARGETS b/TARGETS
index c888ce5b0..6db4905d7 100644
--- a/TARGETS
+++ b/TARGETS
@@ -193,6 +193,7 @@ cpp_library(
         "env/env_hdfs.cc",
         "env/env_posix.cc",
         "env/file_system.cc",
+        "env/file_system_tracer.cc",
         "env/fs_posix.cc",
         "env/io_posix.cc",
         "env/mock_env.cc",
diff --git a/env/file_system_tracer.cc b/env/file_system_tracer.cc
new file mode 100644
index 000000000..55948f8cb
--- /dev/null
+++ b/env/file_system_tracer.cc
@@ -0,0 +1,353 @@
+// Copyright (c) 2019-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "env/file_system_tracer.h"
+
+#include "rocksdb/env.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Every traced method below follows the same sequence: start a nanosecond
+// timer, forward the call to target(), then hand IOTracer a record holding
+// env_->NowNanos() as the access timestamp, the operation name, the measured
+// latency, the returned status string and the operation-specific fields.
+
+// Traces NewWritableFile; the record carries the file name.
+IOStatus FileSystemTracingWrapper::NewWritableFile(
+    const std::string& fname, const FileOptions& file_opts,
+    std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
+                          elapsed, s.ToString(), fname);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces NewDirectory; the record carries the directory name.
+IOStatus FileSystemTracingWrapper::NewDirectory(
+    const std::string& name, const IOOptions& io_opts,
+    std::unique_ptr<FSDirectory>* result, IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->NewDirectory(name, io_opts, result, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
+                          elapsed, s.ToString(), name);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces GetChildren; the record carries the directory name.
+IOStatus FileSystemTracingWrapper::GetChildren(const std::string& dir,
+                                               const IOOptions& io_opts,
+                                               std::vector<std::string>* r,
+                                               IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->GetChildren(dir, io_opts, r, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
+                          elapsed, s.ToString(), dir);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces DeleteFile; the record carries the file name.
+IOStatus FileSystemTracingWrapper::DeleteFile(const std::string& fname,
+                                              const IOOptions& options,
+                                              IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->DeleteFile(fname, options, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
+                          elapsed, s.ToString(), fname);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces CreateDir; the record carries the directory name.
+IOStatus FileSystemTracingWrapper::CreateDir(const std::string& dirname,
+                                             const IOOptions& options,
+                                             IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->CreateDir(dirname, options, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
+                          elapsed, s.ToString(), dirname);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces CreateDirIfMissing; the record carries the directory name.
+IOStatus FileSystemTracingWrapper::CreateDirIfMissing(
+    const std::string& dirname, const IOOptions& options, IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->CreateDirIfMissing(dirname, options, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
+                          elapsed, s.ToString(), dirname);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces DeleteDir; the record carries the directory name.
+IOStatus FileSystemTracingWrapper::DeleteDir(const std::string& dirname,
+                                             const IOOptions& options,
+                                             IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->DeleteDir(dirname, options, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileName, __func__,
+                          elapsed, s.ToString(), dirname);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces GetFileSize; the record carries the file name and the value of
+// *file_size after the call (it is read regardless of the returned status).
+IOStatus FileSystemTracingWrapper::GetFileSize(const std::string& fname,
+                                               const IOOptions& options,
+                                               uint64_t* file_size,
+                                               IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->GetFileSize(fname, options, file_size, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileNameAndFileSize,
+                          __func__, elapsed, s.ToString(), fname, *file_size);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces Read; the recorded length is result->size(), not the requested n.
+IOStatus FSSequentialFileTracingWrapper::Read(size_t n,
+                                              const IOOptions& options,
+                                              Slice* result, char* scratch,
+                                              IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->Read(n, options, result, scratch, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLen, __func__,
+                          elapsed, s.ToString(), result->size());
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces InvalidateCache; the record carries length and offset.
+IOStatus FSSequentialFileTracingWrapper::InvalidateCache(size_t offset,
+                                                         size_t length) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->InvalidateCache(offset, length);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
+                          __func__, elapsed, s.ToString(), length, offset);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces PositionedRead; the record carries result->size() and offset.
+IOStatus FSSequentialFileTracingWrapper::PositionedRead(
+    uint64_t offset, size_t n, const IOOptions& options, Slice* result,
+    char* scratch, IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s =
+      target()->PositionedRead(offset, n, options, result, scratch, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
+                          __func__, elapsed, s.ToString(), result->size(),
+                          offset);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces Read; the record carries the requested length n and offset.
+IOStatus FSRandomAccessFileTracingWrapper::Read(uint64_t offset, size_t n,
+                                                const IOOptions& options,
+                                                Slice* result, char* scratch,
+                                                IODebugContext* dbg) const {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->Read(offset, n, options, result, scratch, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
+                          __func__, elapsed, s.ToString(), n, offset);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces MultiRead by writing one record per request. Each record carries
+// that request's own status, len and offset, but the latency of the whole
+// batch call.
+IOStatus FSRandomAccessFileTracingWrapper::MultiRead(FSReadRequest* reqs,
+                                                     size_t num_reqs,
+                                                     const IOOptions& options,
+                                                     IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->MultiRead(reqs, num_reqs, options, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  uint64_t latency = elapsed;
+  for (size_t i = 0; i < num_reqs; i++) {
+    IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
+                            __func__, latency, reqs[i].status.ToString(),
+                            reqs[i].len, reqs[i].offset);
+    io_tracer_->WriteIOOp(io_record);
+  }
+  return s;
+}
+
+// Traces Prefetch; the record carries the requested length n and offset.
+IOStatus FSRandomAccessFileTracingWrapper::Prefetch(uint64_t offset, size_t n,
+                                                    const IOOptions& options,
+                                                    IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->Prefetch(offset, n, options, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
+                          __func__, elapsed, s.ToString(), n, offset);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces InvalidateCache; the record carries length and offset.
+IOStatus FSRandomAccessFileTracingWrapper::InvalidateCache(size_t offset,
+                                                           size_t length) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->InvalidateCache(offset, length);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
+                          __func__, elapsed, s.ToString(), length,
+                          static_cast<uint64_t>(offset));
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces Append; the record carries data.size().
+IOStatus FSWritableFileTracingWrapper::Append(const Slice& data,
+                                              const IOOptions& options,
+                                              IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->Append(data, options, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLen, __func__,
+                          elapsed, s.ToString(), data.size());
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces PositionedAppend; the record carries data.size() and offset.
+IOStatus FSWritableFileTracingWrapper::PositionedAppend(
+    const Slice& data, uint64_t offset, const IOOptions& options,
+    IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->PositionedAppend(data, offset, options, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
+                          __func__, elapsed, s.ToString(), data.size(), offset);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces Truncate; the target size is stored in the record's len field.
+IOStatus FSWritableFileTracingWrapper::Truncate(uint64_t size,
+                                                const IOOptions& options,
+                                                IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->Truncate(size, options, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLen, __func__,
+                          elapsed, s.ToString(), size);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces Close; a kIOGeneral record has no operation-specific fields.
+IOStatus FSWritableFileTracingWrapper::Close(const IOOptions& options,
+                                             IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->Close(options, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOGeneral, __func__,
+                          elapsed, s.ToString());
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces GetFileSize. The target returns the size directly instead of an
+// IOStatus, so the record stores the literal status "OK" and an empty file
+// name. All seven arguments are passed so that the (file_name, file_size)
+// constructor is chosen and the size is stored in file_size, not in len.
+uint64_t FSWritableFileTracingWrapper::GetFileSize(const IOOptions& options,
+                                                   IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  uint64_t file_size = target()->GetFileSize(options, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOFileNameAndFileSize,
+                          "GetFileSize", elapsed, "OK", "" /* file_name */,
+                          file_size);
+  io_tracer_->WriteIOOp(io_record);
+  return file_size;
+}
+
+// Traces InvalidateCache; the record carries length and offset.
+IOStatus FSWritableFileTracingWrapper::InvalidateCache(size_t offset,
+                                                       size_t length) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->InvalidateCache(offset, length);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
+                          __func__, elapsed, s.ToString(), length,
+                          static_cast<uint64_t>(offset));
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces Write; the record carries data.size() and offset.
+IOStatus FSRandomRWFileTracingWrapper::Write(uint64_t offset, const Slice& data,
+                                             const IOOptions& options,
+                                             IODebugContext* dbg) {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->Write(offset, data, options, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
+                          __func__, elapsed, s.ToString(), data.size(), offset);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+
+// Traces Read; the record carries the requested length n and offset.
+IOStatus FSRandomRWFileTracingWrapper::Read(uint64_t offset, size_t n,
+                                            const IOOptions& options,
+                                            Slice* result, char* scratch,
+                                            IODebugContext* dbg) const {
+  StopWatchNano timer(Env::Default());
+  timer.Start();
+  IOStatus s = target()->Read(offset, n, options, result, scratch, dbg);
+  uint64_t elapsed = timer.ElapsedNanos();
+  IOTraceRecord io_record(env_->NowNanos(), TraceType::kIOLenAndOffset,
+                          __func__, elapsed, s.ToString(), n, offset);
+  io_tracer_->WriteIOOp(io_record);
+  return s;
+}
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/env/file_system_tracer.h b/env/file_system_tracer.h
new file mode 100644
index 000000000..f5c92e54c
--- /dev/null
+++ b/env/file_system_tracer.h
@@ -0,0 +1,333 @@
+// Copyright (c) 2019-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include "rocksdb/file_system.h"
+#include "trace_replay/io_tracer.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// FileSystemTracingWrapper is a wrapper class above FileSystem that forwards
+// the call to the underlying storage system. It then invokes IOTracer to record
+// file operations and other contextual information in a binary format for
+// tracing. It overrides methods we are interested in tracing and extends
+// FileSystemWrapper, which forwards all methods that are not explicitly
+// overridden.
+class FileSystemTracingWrapper : public FileSystemWrapper {
+ public:
+  // t is the file system that calls are forwarded to; io_tracer receives the
+  // trace records. env_ is always Env::Default().
+  FileSystemTracingWrapper(std::shared_ptr<FileSystem> t,
+                           std::shared_ptr<IOTracer> io_tracer)
+      : FileSystemWrapper(t), io_tracer_(io_tracer), env_(Env::Default()) {}
+
+  ~FileSystemTracingWrapper() override {}
+
+  IOStatus NewWritableFile(const std::string& fname,
+                           const FileOptions& file_opts,
+                           std::unique_ptr<FSWritableFile>* result,
+                           IODebugContext* dbg) override;
+
+  IOStatus NewDirectory(const std::string& name, const IOOptions& io_opts,
+                        std::unique_ptr<FSDirectory>* result,
+                        IODebugContext* dbg) override;
+
+  IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts,
+                       std::vector<std::string>* r,
+                       IODebugContext* dbg) override;
+
+  IOStatus DeleteFile(const std::string& fname, const IOOptions& options,
+                      IODebugContext* dbg) override;
+
+  IOStatus CreateDir(const std::string& dirname, const IOOptions& options,
+                     IODebugContext* dbg) override;
+
+  IOStatus CreateDirIfMissing(const std::string& dirname,
+                              const IOOptions& options,
+                              IODebugContext* dbg) override;
+
+  IOStatus DeleteDir(const std::string& dirname, const IOOptions& options,
+                     IODebugContext* dbg) override;
+
+  IOStatus GetFileSize(const std::string& fname, const IOOptions& options,
+                       uint64_t* file_size, IODebugContext* dbg) override;
+
+ private:
+  std::shared_ptr<IOTracer> io_tracer_;  // Sink for the trace records.
+  Env* env_;                             // Clock source for timestamps.
+};
+
+// The FileSystemPtr is a wrapper class that takes pointer to storage systems
+// (such as posix filesystems). It overloads operator -> and returns a pointer
+// of either FileSystem or FileSystemTracingWrapper based on whether tracing is
+// enabled or not. It is added to bypass FileSystemTracingWrapper when tracing
+// is disabled.
+class FileSystemPtr {
+ public:
+  // Builds the tracing wrapper up front; operator-> picks between it and the
+  // plain file system on every call.
+  FileSystemPtr(std::shared_ptr<FileSystem> fs,
+                std::shared_ptr<IOTracer> io_tracer)
+      : fs_(fs),
+        io_tracer_(io_tracer),
+        fs_tracer_(
+            std::make_shared<FileSystemTracingWrapper>(fs_, io_tracer_)) {}
+
+  // Without a tracer, operator-> always returns the plain file system.
+  explicit FileSystemPtr(std::shared_ptr<FileSystem> fs)
+      : fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
+
+  // Returns the tracing wrapper only while a tracer is present and reports
+  // that tracing is enabled; otherwise returns the wrapped file system.
+  std::shared_ptr<FileSystem> operator->() const {
+    if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
+      return fs_tracer_;
+    } else {
+      return fs_;
+    }
+  }
+
+ private:
+  std::shared_ptr<FileSystem> fs_;
+  std::shared_ptr<IOTracer> io_tracer_;
+  std::shared_ptr<FileSystemTracingWrapper> fs_tracer_;
+};
+
+// FSSequentialFileTracingWrapper is a wrapper class above FSSequentialFile that
+// forwards the call to the underlying storage system. It then invokes IOTracer
+// to record file operations and other contextual information in a binary format
+// for tracing. It overrides methods we are interested in tracing and extends
+// FSSequentialFileWrapper, which forwards all methods that are not explicitly
+// overridden.
+class FSSequentialFileTracingWrapper : public FSSequentialFileWrapper {
+ public:
+  // t is the file that calls are forwarded to; io_tracer receives the trace
+  // records. env_ is always Env::Default().
+  FSSequentialFileTracingWrapper(FSSequentialFile* t,
+                                 std::shared_ptr<IOTracer> io_tracer)
+      : FSSequentialFileWrapper(t),
+        io_tracer_(io_tracer),
+        env_(Env::Default()) {}
+
+  ~FSSequentialFileTracingWrapper() override {}
+
+  IOStatus Read(size_t n, const IOOptions& options, Slice* result,
+                char* scratch, IODebugContext* dbg) override;
+
+  IOStatus InvalidateCache(size_t offset, size_t length) override;
+
+  IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& options,
+                          Slice* result, char* scratch,
+                          IODebugContext* dbg) override;
+
+ private:
+  std::shared_ptr<IOTracer> io_tracer_;  // Sink for the trace records.
+  Env* env_;                             // Clock source for timestamps.
+};
+
+// The FSSequentialFilePtr is a wrapper class that takes pointer to storage
+// systems (such as posix filesystems). It overloads operator -> and returns a
+// pointer of either FSSequentialFile or FSSequentialFileTracingWrapper based on
+// whether tracing is enabled or not. It is added to bypass
+// FSSequentialFileTracingWrapper when tracing is disabled.
+// The wrapped file is not owned by this class; the tracing wrapper is.
+class FSSequentialFilePtr {
+ public:
+  // The tracing wrapper is held by shared_ptr so it is released with the last
+  // copy of this object instead of leaking, and the class stays copyable.
+  // NOTE(review): assumes destroying the wrapper does not delete its target;
+  // confirm against FSSequentialFileWrapper.
+  FSSequentialFilePtr(FSSequentialFile* fs, std::shared_ptr<IOTracer> io_tracer)
+      : fs_(fs),
+        io_tracer_(io_tracer),
+        fs_tracer_(std::make_shared<FSSequentialFileTracingWrapper>(
+            fs_, io_tracer_)) {}
+
+  // Without a tracer, operator-> always returns the plain file.
+  explicit FSSequentialFilePtr(FSSequentialFile* fs)
+      : fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
+
+  // Returns the tracing wrapper only while a tracer is present and reports
+  // that tracing is enabled; otherwise returns the wrapped file.
+  FSSequentialFile* operator->() const {
+    if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
+      return fs_tracer_.get();
+    } else {
+      return fs_;
+    }
+  }
+
+ private:
+  FSSequentialFile* fs_;
+  std::shared_ptr<IOTracer> io_tracer_;
+  std::shared_ptr<FSSequentialFileTracingWrapper> fs_tracer_;
+};
+
+// FSRandomAccessFileTracingWrapper is a wrapper class above FSRandomAccessFile
+// that forwards the call to the underlying storage system. It then invokes
+// IOTracer to record file operations and other contextual information in a
+// binary format for tracing. It overrides methods we are interested in tracing
+// and extends FSRandomAccessFileWrapper, which forwards all methods that are
+// not explicitly overridden.
+class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileWrapper {
+ public:
+  // t is the file that calls are forwarded to; io_tracer receives the trace
+  // records. env_ is always Env::Default().
+  FSRandomAccessFileTracingWrapper(FSRandomAccessFile* t,
+                                   std::shared_ptr<IOTracer> io_tracer)
+      : FSRandomAccessFileWrapper(t),
+        io_tracer_(io_tracer),
+        env_(Env::Default()) {}
+
+  ~FSRandomAccessFileTracingWrapper() override {}
+
+  IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
+                Slice* result, char* scratch,
+                IODebugContext* dbg) const override;
+
+  IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
+                     const IOOptions& options, IODebugContext* dbg) override;
+
+  IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
+                    IODebugContext* dbg) override;
+
+  IOStatus InvalidateCache(size_t offset, size_t length) override;
+
+ private:
+  std::shared_ptr<IOTracer> io_tracer_;  // Sink for the trace records.
+  Env* env_;                             // Clock source for timestamps.
+};
+
+// The FSRandomAccessFilePtr is a wrapper class that takes pointer to storage
+// systems (such as posix filesystems). It overloads operator -> and returns a
+// pointer of either FSRandomAccessFile or FSRandomAccessFileTracingWrapper
+// based on whether tracing is enabled or not. It is added to bypass
+// FSRandomAccessFileTracingWrapper when tracing is disabled.
+class FSRandomAccessFilePtr {
+ public:
+  // The wrapped file is not owned by this class. The tracing wrapper is held
+  // by shared_ptr so it is released with the last copy of this object instead
+  // of leaking, and the class stays copyable.
+  // NOTE(review): assumes destroying the wrapper does not delete its target;
+  // confirm against FSRandomAccessFileWrapper.
+  FSRandomAccessFilePtr(FSRandomAccessFile* fs,
+                        std::shared_ptr<IOTracer> io_tracer)
+      : fs_(fs),
+        io_tracer_(io_tracer),
+        fs_tracer_(std::make_shared<FSRandomAccessFileTracingWrapper>(
+            fs_, io_tracer_)) {}
+
+  // Without a tracer, operator-> always returns the plain file.
+  explicit FSRandomAccessFilePtr(FSRandomAccessFile* fs)
+      : fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
+
+  // Returns the tracing wrapper only while a tracer is present and reports
+  // that tracing is enabled; otherwise returns the wrapped file.
+  FSRandomAccessFile* operator->() const {
+    if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
+      return fs_tracer_.get();
+    } else {
+      return fs_;
+    }
+  }
+
+ private:
+  FSRandomAccessFile* fs_;
+  std::shared_ptr<IOTracer> io_tracer_;
+  std::shared_ptr<FSRandomAccessFileTracingWrapper> fs_tracer_;
+};
+
+// FSWritableFileTracingWrapper is a wrapper class above FSWritableFile that
+// forwards the call to the underlying storage system. It then invokes IOTracer
+// to record file operations and other contextual information in a binary format
+// for tracing. It overrides methods we are interested in tracing and extends
+// FSWritableFileWrapper, which forwards all methods that are not explicitly
+// overridden.
+class FSWritableFileTracingWrapper : public FSWritableFileWrapper {
+ public:
+  // t is the file that calls are forwarded to; io_tracer receives the trace
+  // records. env_ is always Env::Default().
+  FSWritableFileTracingWrapper(FSWritableFile* t,
+                               std::shared_ptr<IOTracer> io_tracer)
+      : FSWritableFileWrapper(t), io_tracer_(io_tracer), env_(Env::Default()) {}
+
+  ~FSWritableFileTracingWrapper() override {}
+
+  IOStatus Append(const Slice& data, const IOOptions& options,
+                  IODebugContext* dbg) override;
+
+  IOStatus PositionedAppend(const Slice& data, uint64_t offset,
+                            const IOOptions& options,
+                            IODebugContext* dbg) override;
+
+  IOStatus Truncate(uint64_t size, const IOOptions& options,
+                    IODebugContext* dbg) override;
+
+  IOStatus Close(const IOOptions& options, IODebugContext* dbg) override;
+
+  uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override;
+
+  IOStatus InvalidateCache(size_t offset, size_t length) override;
+
+ private:
+  std::shared_ptr<IOTracer> io_tracer_;  // Sink for the trace records.
+  Env* env_;                             // Clock source for timestamps.
+};
+
+// The FSWritableFilePtr is a wrapper class that takes pointer to storage
+// systems (such as posix filesystems). It overloads operator -> and returns a
+// pointer of either FSWritableFile or FSWritableFileTracingWrapper based on
+// whether tracing is enabled or not. It is added to bypass
+// FSWritableFileTracingWrapper when tracing is disabled.
+// The wrapped file is not owned by this class; the tracing wrapper is.
+class FSWritableFilePtr {
+ public:
+  // The tracing wrapper is held by shared_ptr so it is released with the last
+  // copy of this object instead of leaking, and the class stays copyable.
+  // NOTE(review): assumes destroying the wrapper does not delete its target;
+  // confirm against FSWritableFileWrapper.
+  FSWritableFilePtr(FSWritableFile* fs, std::shared_ptr<IOTracer> io_tracer)
+      : fs_(fs),
+        io_tracer_(io_tracer),
+        fs_tracer_(std::make_shared<FSWritableFileTracingWrapper>(
+            fs_, io_tracer_)) {}
+
+  // Without a tracer, operator-> always returns the plain file.
+  explicit FSWritableFilePtr(FSWritableFile* fs)
+      : fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
+
+  // Returns the tracing wrapper only while a tracer is present and reports
+  // that tracing is enabled; otherwise returns the wrapped file.
+  FSWritableFile* operator->() const {
+    if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
+      return fs_tracer_.get();
+    } else {
+      return fs_;
+    }
+  }
+
+ private:
+  FSWritableFile* fs_;
+  std::shared_ptr<IOTracer> io_tracer_;
+  std::shared_ptr<FSWritableFileTracingWrapper> fs_tracer_;
+};
+
+// FSRandomRWFileTracingWrapper is a wrapper class above FSRandomRWFile that
+// forwards the call to the underlying storage system. It then invokes IOTracer
+// to record file operations and other contextual information in a binary format
+// for tracing. It overrides methods we are interested in tracing and extends
+// FSRandomRWFileWrapper, which forwards all methods that are not explicitly
+// overridden.
+class FSRandomRWFileTracingWrapper : public FSRandomRWFileWrapper {
+ public:
+  // t is the file that calls are forwarded to; io_tracer receives the trace
+  // records. env_ is always Env::Default().
+  FSRandomRWFileTracingWrapper(FSRandomRWFile* t,
+                               std::shared_ptr<IOTracer> io_tracer)
+      : FSRandomRWFileWrapper(t), io_tracer_(io_tracer), env_(Env::Default()) {}
+
+  ~FSRandomRWFileTracingWrapper() override {}
+
+  IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& options,
+                 IODebugContext* dbg) override;
+
+  IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
+                Slice* result, char* scratch,
+                IODebugContext* dbg) const override;
+
+ private:
+  std::shared_ptr<IOTracer> io_tracer_;  // Sink for the trace records.
+  Env* env_;                             // Clock source for timestamps.
+};
+
+// The FSRandomRWFilePtr is a wrapper class that takes pointer to storage
+// systems (such as posix filesystems). It overloads operator -> and returns a
+// pointer of either FSRandomRWFile or FSRandomRWFileTracingWrapper based on
+// whether tracing is enabled or not. It is added to bypass
+// FSRandomRWFileTracingWrapper when tracing is disabled.
+class FSRandomRWFilePtr {
+ public:
+  // The wrapped file is not owned by this class. The tracing wrapper is held
+  // by shared_ptr so it is released with the last copy of this object instead
+  // of leaking, and the class stays copyable.
+  // NOTE(review): assumes destroying the wrapper does not delete its target;
+  // confirm against FSRandomRWFileWrapper.
+  FSRandomRWFilePtr(FSRandomRWFile* fs, std::shared_ptr<IOTracer> io_tracer)
+      : fs_(fs),
+        io_tracer_(io_tracer),
+        fs_tracer_(std::make_shared<FSRandomRWFileTracingWrapper>(
+            fs_, io_tracer_)) {}
+
+  // Without a tracer, operator-> always returns the plain file.
+  explicit FSRandomRWFilePtr(FSRandomRWFile* fs)
+      : fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
+
+  // Returns the tracing wrapper only while a tracer is present and reports
+  // that tracing is enabled; otherwise returns the wrapped file.
+  FSRandomRWFile* operator->() const {
+    if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
+      return fs_tracer_.get();
+    } else {
+      return fs_;
+    }
+  }
+
+ private:
+  FSRandomRWFile* fs_;
+  std::shared_ptr<IOTracer> io_tracer_;
+  std::shared_ptr<FSRandomRWFileTracingWrapper> fs_tracer_;
+};
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h
index 13a260225..559d210d6 100644
--- a/include/rocksdb/file_system.h
+++ b/include/rocksdb/file_system.h
@@ -1212,8 +1212,9 @@ class FileSystemWrapper : public FileSystem {
 
 class FSSequentialFileWrapper : public FSSequentialFile {
  public:
-  explicit FSSequentialFileWrapper(FSSequentialFile* target)
-      : target_(target) {}
+  explicit FSSequentialFileWrapper(FSSequentialFile* t) : target_(t) {}
+
+  FSSequentialFile* target() const { return target_; }
 
   IOStatus Read(size_t n, const IOOptions& options, Slice* result,
                 char* scratch, IODebugContext* dbg) override {
@@ -1239,8 +1240,9 @@ class FSSequentialFileWrapper : public FSSequentialFile {
 
 class FSRandomAccessFileWrapper : public FSRandomAccessFile {
  public:
-  explicit FSRandomAccessFileWrapper(FSRandomAccessFile* target)
-      : target_(target) {}
+  explicit FSRandomAccessFileWrapper(FSRandomAccessFile* t) : target_(t) {}
+
+  FSRandomAccessFile* target() const { return target_; }
 
   IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
                 Slice* result, char* scratch,
@@ -1275,6 +1277,8 @@ class FSWritableFileWrapper : public FSWritableFile {
  public:
   explicit FSWritableFileWrapper(FSWritableFile* t) : target_(t) {}
 
+  FSWritableFile* target() const { return target_; }
+
   IOStatus Append(const Slice& data, const IOOptions& options,
IODebugContext* dbg) override { return target_->Append(data, options, dbg); @@ -1358,7 +1362,9 @@ class FSWritableFileWrapper : public FSWritableFile { class FSRandomRWFileWrapper : public FSRandomRWFile { public: - explicit FSRandomRWFileWrapper(FSRandomRWFile* target) : target_(target) {} + explicit FSRandomRWFileWrapper(FSRandomRWFile* t) : target_(t) {} + + FSRandomRWFile* target() const { return target_; } bool use_direct_io() const override { return target_->use_direct_io(); } size_t GetRequiredBufferAlignment() const override { @@ -1392,7 +1398,7 @@ class FSRandomRWFileWrapper : public FSRandomRWFile { class FSDirectoryWrapper : public FSDirectory { public: - explicit FSDirectoryWrapper(FSDirectory* target) : target_(target) {} + explicit FSDirectoryWrapper(FSDirectory* t) : target_(t) {} IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override { return target_->Fsync(options, dbg); diff --git a/src.mk b/src.mk index 2e438f9df..a8592ff3a 100644 --- a/src.mk +++ b/src.mk @@ -78,6 +78,7 @@ LIB_SOURCES = \ env/env_posix.cc \ env/file_system.cc \ env/fs_posix.cc \ + env/file_system_tracer.cc \ env/io_posix.cc \ env/mock_env.cc \ file/delete_scheduler.cc \ diff --git a/trace_replay/io_tracer.cc b/trace_replay/io_tracer.cc index d9f701587..fde7eb6d0 100644 --- a/trace_replay/io_tracer.cc +++ b/trace_replay/io_tracer.cc @@ -33,13 +33,30 @@ Status IOTraceWriter::WriteIOOp(const IOTraceRecord& record) { trace.type = record.trace_type; Slice file_operation(record.file_operation); PutLengthPrefixedSlice(&trace.payload, file_operation); + PutFixed64(&trace.payload, record.latency); Slice io_status(record.io_status); PutLengthPrefixedSlice(&trace.payload, io_status); - Slice file_name(record.file_name); - PutLengthPrefixedSlice(&trace.payload, file_name); - // TODO: add below options based on file_operation - trace.payload.push_back(record.len); - PutFixed64(&trace.payload, record.offset); + /* Write remaining options based on trace_type set by file 
operation */ + switch (record.trace_type) { + case TraceType::kIOGeneral: + break; + case TraceType::kIOFileNameAndFileSize: + PutFixed64(&trace.payload, record.file_size); + FALLTHROUGH_INTENDED; + case TraceType::kIOFileName: { + Slice file_name(record.file_name); + PutLengthPrefixedSlice(&trace.payload, file_name); + break; + } + case TraceType::kIOLenAndOffset: + PutFixed64(&trace.payload, record.offset); + FALLTHROUGH_INTENDED; + case TraceType::kIOLen: + trace.payload.push_back(record.len); + break; + default: + assert(false); + } std::string encoded_trace; TracerHelper::EncodeTrace(trace, &encoded_trace); return trace_writer_->Write(encoded_trace); @@ -124,29 +141,53 @@ Status IOTraceReader::ReadIOOp(IOTraceRecord* record) { "Incomplete access record: Failed to read file operation."); } record->file_operation = file_operation.ToString(); + if (!GetFixed64(&enc_slice, &record->latency)) { + return Status::Incomplete( + "Incomplete access record: Failed to read latency."); + } Slice io_status; if (!GetLengthPrefixedSlice(&enc_slice, &io_status)) { return Status::Incomplete( "Incomplete access record: Failed to read IO status."); } record->io_status = io_status.ToString(); - Slice file_name; - if (!GetLengthPrefixedSlice(&enc_slice, &file_name)) { - return Status::Incomplete( - "Incomplete access record: Failed to read file name."); - } - record->file_name = file_name.ToString(); - // TODO: Read below options based on file_operation. 
- record->len = static_cast(enc_slice[0]); - const unsigned int kCharSize = 1; - enc_slice.remove_prefix(kCharSize); - if (enc_slice.empty()) { - return Status::Incomplete( - "Incomplete access record: Failed to read is_cache_hit."); - } - if (!GetFixed64(&enc_slice, &record->offset)) { - return Status::Incomplete( - "Incomplete access record: Failed to read offset."); + /* Read remaining options based on trace_type set by file operation */ + switch (record->trace_type) { + case TraceType::kIOGeneral: + break; + case TraceType::kIOFileNameAndFileSize: + if (!GetFixed64(&enc_slice, &record->file_size)) { + return Status::Incomplete( + "Incomplete access record: Failed to read file size."); + } + FALLTHROUGH_INTENDED; + case TraceType::kIOFileName: { + Slice file_name; + if (!GetLengthPrefixedSlice(&enc_slice, &file_name)) { + return Status::Incomplete( + "Incomplete access record: Failed to read file name."); + } + record->file_name = file_name.ToString(); + break; + } + case TraceType::kIOLenAndOffset: + if (!GetFixed64(&enc_slice, &record->offset)) { + return Status::Incomplete( + "Incomplete access record: Failed to read offset."); + } + FALLTHROUGH_INTENDED; + case TraceType::kIOLen: { + if (enc_slice.empty()) { + return Status::Incomplete( + "Incomplete access record: Failed to read length."); + } + record->len = static_cast(enc_slice[0]); + const unsigned int kCharSize = 1; + enc_slice.remove_prefix(kCharSize); + break; + } + default: + assert(false); } return Status::OK(); } diff --git a/trace_replay/io_tracer.h b/trace_replay/io_tracer.h index fce9735d1..9a793d05d 100644 --- a/trace_replay/io_tracer.h +++ b/trace_replay/io_tracer.h @@ -21,45 +21,49 @@ struct IOTraceRecord { uint64_t access_timestamp = 0; TraceType trace_type = TraceType::kTraceMax; std::string file_operation; + uint64_t latency = 0; std::string io_status; // Required fields for read. 
std::string file_name; - uint64_t offset = 0; size_t len = 0; + uint64_t offset = 0; uint64_t file_size = 0; IOTraceRecord() {} IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type, - const std::string& _file_operation, + const std::string& _file_operation, const uint64_t& _latency, const std::string& _io_status, const std::string& _file_name) : access_timestamp(_access_timestamp), trace_type(_trace_type), file_operation(_file_operation), + latency(_latency), io_status(_io_status), file_name(_file_name) {} IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type, - const std::string& _file_operation, + const std::string& _file_operation, const uint64_t& _latency, const std::string& _io_status, const std::string& _file_name, const uint64_t& _file_size) : access_timestamp(_access_timestamp), trace_type(_trace_type), file_operation(_file_operation), + latency(_latency), io_status(_io_status), file_name(_file_name), file_size(_file_size) {} IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type, - const std::string& _file_operation, - const std::string& _io_status, const uint64_t& _offset = 0, - const uint64_t& _len = 0) + const std::string& _file_operation, const uint64_t& _latency, + const std::string& _io_status, const uint64_t& _len = 0, + const uint64_t& _offset = 0) : access_timestamp(_access_timestamp), trace_type(_trace_type), file_operation(_file_operation), + latency(_latency), io_status(_io_status), - offset(_offset), - len(_len) {} + len(_len), + offset(_offset) {} }; struct IOTraceHeader { diff --git a/trace_replay/io_tracer_test.cc b/trace_replay/io_tracer_test.cc index 3ec312eda..c83be285a 100644 --- a/trace_replay/io_tracer_test.cc +++ b/trace_replay/io_tracer_test.cc @@ -50,6 +50,7 @@ class IOTracerTest : public testing::Test { assert(writer); for (uint32_t i = 0; i < nrecords; i++) { IOTraceRecord record; + record.trace_type = TraceType::kIOLenAndOffset; record.file_operation = 
GetFileOperation(i); record.io_status = IOStatus::OK().ToString(); record.file_name = kDummyFile + std::to_string(i); @@ -59,16 +60,6 @@ class IOTracerTest : public testing::Test { } } - IOTraceRecord GenerateAccessRecord(int i) { - IOTraceRecord record; - record.file_operation = GetFileOperation(i); - record.io_status = IOStatus::OK().ToString(); - record.file_name = kDummyFile + std::to_string(i); - record.len = i; - record.offset = i + 20; - return record; - } - void VerifyIOOp(IOTraceReader* reader, uint32_t nrecords) { assert(reader); for (uint32_t i = 0; i < nrecords; i++) { @@ -78,7 +69,6 @@ class IOTracerTest : public testing::Test { ASSERT_EQ(record.io_status, IOStatus::OK().ToString()); ASSERT_EQ(record.len, i); ASSERT_EQ(record.offset, i + 20); - ASSERT_EQ(record.file_name, kDummyFile + std::to_string(i)); } } @@ -89,8 +79,10 @@ class IOTracerTest : public testing::Test { }; TEST_F(IOTracerTest, AtomicWrite) { - IOTraceRecord record = GenerateAccessRecord(0); + std::string file_name = kDummyFile + std::to_string(0); { + IOTraceRecord record(0, TraceType::kIOFileName, GetFileOperation(0), 0, + IOStatus::OK().ToString(), file_name); TraceOptions trace_opt; std::unique_ptr trace_writer; ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, @@ -110,14 +102,20 @@ TEST_F(IOTracerTest, AtomicWrite) { ASSERT_OK(reader.ReadHeader(&header)); ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); - VerifyIOOp(&reader, 1); - ASSERT_NOK(reader.ReadIOOp(&record)); + // Read record and verify data. 
+ IOTraceRecord access_record; + ASSERT_OK(reader.ReadIOOp(&access_record)); + ASSERT_EQ(access_record.file_operation, GetFileOperation(0)); + ASSERT_EQ(access_record.io_status, IOStatus::OK().ToString()); + ASSERT_EQ(access_record.file_name, file_name); + ASSERT_NOK(reader.ReadIOOp(&access_record)); } } TEST_F(IOTracerTest, AtomicWriteBeforeStartTrace) { - IOTraceRecord record = GenerateAccessRecord(0); { + IOTraceRecord record(0, TraceType::kIOGeneral, GetFileOperation(0), 0, + IOStatus::OK().ToString()); std::unique_ptr trace_writer; ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer)); @@ -139,8 +137,10 @@ TEST_F(IOTracerTest, AtomicWriteBeforeStartTrace) { } TEST_F(IOTracerTest, AtomicNoWriteAfterEndTrace) { - IOTraceRecord record = GenerateAccessRecord(0); { + IOTraceRecord record(0, TraceType::kIOFileNameAndFileSize, + GetFileOperation(2), 0 /*latency*/, + IOStatus::OK().ToString(), "", 10 /*file_size*/); TraceOptions trace_opt; std::unique_ptr trace_writer; ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, @@ -164,8 +164,14 @@ TEST_F(IOTracerTest, AtomicNoWriteAfterEndTrace) { ASSERT_OK(reader.ReadHeader(&header)); ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); - VerifyIOOp(&reader, 1); - ASSERT_NOK(reader.ReadIOOp(&record)); + + IOTraceRecord access_record; + ASSERT_OK(reader.ReadIOOp(&access_record)); + ASSERT_EQ(access_record.file_operation, GetFileOperation(2)); + ASSERT_EQ(access_record.io_status, IOStatus::OK().ToString()); + ASSERT_EQ(access_record.file_size, 10); + // No more record. 
+ ASSERT_NOK(reader.ReadIOOp(&access_record)); } } diff --git a/trace_replay/trace_replay.h b/trace_replay/trace_replay.h index e7ef598f0..3c3be1cd1 100644 --- a/trace_replay/trace_replay.h +++ b/trace_replay/trace_replay.h @@ -46,6 +46,12 @@ enum TraceType : char { kBlockTraceDataBlock = 9, kBlockTraceUncompressionDictBlock = 10, kBlockTraceRangeDeletionBlock = 11, + // IO Trace related types based on options that will be added in trace file. + kIOGeneral = 12, + kIOFileName = 13, + kIOFileNameAndFileSize = 14, + kIOLen = 15, + kIOLenAndOffset = 16, // All trace types should be added before kTraceMax kTraceMax, };