diff --git a/CMakeLists.txt b/CMakeLists.txt index 43e1480b5..b1f4a3cc7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -870,6 +870,7 @@ set(SOURCES utilities/checkpoint/checkpoint_impl.cc utilities/compaction_filters.cc utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc + utilities/counted_fs.cc utilities/debug.cc utilities/env_mirror.cc utilities/env_timed.cc diff --git a/TARGETS b/TARGETS index be6ef9f25..98d53beb6 100644 --- a/TARGETS +++ b/TARGETS @@ -393,6 +393,7 @@ cpp_library( "utilities/compaction_filters.cc", "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc", "utilities/convenience/info_log_finder.cc", + "utilities/counted_fs.cc", "utilities/debug.cc", "utilities/env_mirror.cc", "utilities/env_timed.cc", @@ -721,6 +722,7 @@ cpp_library( "utilities/compaction_filters.cc", "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc", "utilities/convenience/info_log_finder.cc", + "utilities/counted_fs.cc", "utilities/debug.cc", "utilities/env_mirror.cc", "utilities/env_timed.cc", diff --git a/env/env_test.cc b/env/env_test.cc index 7333df580..081fb225d 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -62,6 +62,7 @@ #include "util/mutexlock.h" #include "util/random.h" #include "util/string_util.h" +#include "utilities/counted_fs.h" #include "utilities/env_timed.h" #include "utilities/fault_injection_env.h" #include "utilities/fault_injection_fs.h" @@ -2582,6 +2583,37 @@ TEST_F(CreateEnvTest, CreateTimedFileSystem) { ASSERT_OK(FileSystem::CreateFromString(config_options_, opts_str, ©)); ASSERT_TRUE(fs->AreEquivalent(config_options_, copy.get(), &mismatch)); } + +TEST_F(CreateEnvTest, CreateCountedFileSystem) { + std::shared_ptr fs, copy; + + ASSERT_OK(FileSystem::CreateFromString(config_options_, + CountedFileSystem::kClassName(), &fs)); + ASSERT_NE(fs, nullptr); + ASSERT_STREQ(fs->Name(), CountedFileSystem::kClassName()); + ASSERT_EQ(fs->Inner(), FileSystem::Default().get()); + + std::string opts_str = fs->ToString(config_options_); + std::string mismatch; + + ASSERT_OK(FileSystem::CreateFromString(config_options_, opts_str, ©)); + ASSERT_TRUE(fs->AreEquivalent(config_options_, copy.get(), &mismatch)); + + ASSERT_OK(FileSystem::CreateFromString( + config_options_, + std::string("id=") + CountedFileSystem::kClassName() + + "; target=" + ReadOnlyFileSystem::kClassName(), + &fs)); + ASSERT_NE(fs, nullptr); + opts_str = fs->ToString(config_options_); + ASSERT_STREQ(fs->Name(), CountedFileSystem::kClassName()); + ASSERT_NE(fs->Inner(), nullptr); + ASSERT_STREQ(fs->Inner()->Name(), ReadOnlyFileSystem::kClassName()); + ASSERT_EQ(fs->Inner()->Inner(), FileSystem::Default().get()); + ASSERT_OK(FileSystem::CreateFromString(config_options_, opts_str, ©)); + ASSERT_TRUE(fs->AreEquivalent(config_options_, copy.get(), &mismatch)); +} + #ifndef OS_WIN TEST_F(CreateEnvTest, CreateChrootFileSystem) { std::shared_ptr fs, copy; diff --git a/env/file_system.cc b/env/file_system.cc index 618b6915d..e1cb19c01 100644 --- a/env/file_system.cc +++ b/env/file_system.cc @@ -16,6 +16,7 @@ #include "rocksdb/utilities/object_registry.h" #include "rocksdb/utilities/options_type.h" #include "util/string_util.h" +#include "utilities/counted_fs.h" #include "utilities/env_timed.h" namespace ROCKSDB_NAMESPACE { @@ -56,6 +57,13 @@ static int RegisterBuiltinFileSystems(ObjectLibrary& library, } return guard->get(); }); + library.AddFactory( + CountedFileSystem::kClassName(), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /*errmsg*/) { + guard->reset(new CountedFileSystem(FileSystem::Default())); + return guard->get(); + }); library.AddFactory( MockFileSystem::kClassName(), [](const std::string& /*uri*/, std::unique_ptr* guard, diff --git a/src.mk b/src.mk index a0c0191b6..6c6dee439 100644 --- a/src.mk +++ b/src.mk @@ -245,6 +245,7 @@ LIB_SOURCES = \ utilities/compaction_filters.cc \ utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \ utilities/convenience/info_log_finder.cc \ + utilities/counted_fs.cc \ utilities/debug.cc \ utilities/env_mirror.cc \ utilities/env_timed.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 704d3814d..d2caf6082 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -84,6 +84,7 @@ #include "util/string_util.h" #include "util/xxhash.h" #include "utilities/blob_db/blob_db.h" +#include "utilities/counted_fs.h" #include "utilities/merge_operators.h" #include "utilities/merge_operators/bytesxor.h" #include "utilities/merge_operators/sortlist.h" @@ -1569,192 +1570,6 @@ static Status CreateMemTableRepFactory( return s; } -struct ReportFileOpCounters { - std::atomic open_counter_; - std::atomic delete_counter_; - std::atomic rename_counter_; - std::atomic flush_counter_; - std::atomic sync_counter_; - std::atomic fsync_counter_; - std::atomic close_counter_; - std::atomic read_counter_; - std::atomic append_counter_; - std::atomic bytes_read_; - std::atomic bytes_written_; -}; - -// A special Env to records and report file operations in db_bench -class ReportFileOpEnv : public EnvWrapper { - public: - explicit ReportFileOpEnv(Env* base) : EnvWrapper(base) { reset(); } - const char* Name() const override { return "ReportFileOpEnv"; } - - void reset() { - counters_.open_counter_ = 0; - counters_.delete_counter_ = 0; - counters_.rename_counter_ = 0; - counters_.flush_counter_ = 0; - counters_.sync_counter_ = 0; - counters_.fsync_counter_ = 0; - counters_.close_counter_ = 0; - counters_.read_counter_ = 0; - counters_.append_counter_ = 0; - counters_.bytes_read_ = 0; - counters_.bytes_written_ = 0; - } - - Status NewSequentialFile(const std::string& f, - std::unique_ptr* r, - const EnvOptions& soptions) override { - class CountingFile : public SequentialFile { - private: - std::unique_ptr target_; - ReportFileOpCounters* counters_; - - public: - CountingFile(std::unique_ptr&& target, - ReportFileOpCounters* counters) - : target_(std::move(target)), counters_(counters) {} - - Status Read(size_t n, Slice* result, char* scratch) override { - counters_->read_counter_.fetch_add(1, std::memory_order_relaxed); - Status rv = target_->Read(n, result, scratch); - counters_->bytes_read_.fetch_add(result->size(), - std::memory_order_relaxed); - return rv; - } - - Status Skip(uint64_t n) override { return target_->Skip(n); } - }; - - Status s = target()->NewSequentialFile(f, r, soptions); - if (s.ok()) { - counters()->open_counter_.fetch_add(1, std::memory_order_relaxed); - r->reset(new CountingFile(std::move(*r), counters())); - } - return s; - } - - Status DeleteFile(const std::string& fname) override { - Status s = target()->DeleteFile(fname); - if (s.ok()) { - counters()->delete_counter_.fetch_add(1, std::memory_order_relaxed); - } - return s; - } - - Status RenameFile(const std::string& s, const std::string& t) override { - Status st = target()->RenameFile(s, t); - if (st.ok()) { - counters()->rename_counter_.fetch_add(1, std::memory_order_relaxed); - } - return st; - } - - Status NewRandomAccessFile(const std::string& f, - std::unique_ptr* r, - const EnvOptions& soptions) override { - class CountingFile : public RandomAccessFile { - private: - std::unique_ptr target_; - ReportFileOpCounters* counters_; - - public: - CountingFile(std::unique_ptr&& target, - ReportFileOpCounters* counters) - : target_(std::move(target)), counters_(counters) {} - Status Read(uint64_t offset, size_t n, Slice* result, - char* scratch) const override { - counters_->read_counter_.fetch_add(1, std::memory_order_relaxed); - Status rv = target_->Read(offset, n, result, scratch); - counters_->bytes_read_.fetch_add(result->size(), - std::memory_order_relaxed); - return rv; - } - }; - - Status s = target()->NewRandomAccessFile(f, r, soptions); - if (s.ok()) { - counters()->open_counter_.fetch_add(1, std::memory_order_relaxed); - r->reset(new CountingFile(std::move(*r), counters())); - } - return s; - } - - Status NewWritableFile(const std::string& f, std::unique_ptr* r, - const EnvOptions& soptions) override { - class CountingFile : public WritableFile { - private: - std::unique_ptr target_; - ReportFileOpCounters* counters_; - - public: - CountingFile(std::unique_ptr&& target, - ReportFileOpCounters* counters) - : target_(std::move(target)), counters_(counters) {} - - Status Append(const Slice& data) override { - counters_->append_counter_.fetch_add(1, std::memory_order_relaxed); - Status rv = target_->Append(data); - counters_->bytes_written_.fetch_add(data.size(), - std::memory_order_relaxed); - return rv; - } - - Status Append( - const Slice& data, - const DataVerificationInfo& /* verification_info */) override { - return Append(data); - } - - Status Truncate(uint64_t size) override { - return target_->Truncate(size); - } - Status Close() override { - Status s = target_->Close(); - if (s.ok()) { - counters_->close_counter_.fetch_add(1, std::memory_order_relaxed); - } - return s; - } - Status Flush() override { - Status s = target_->Flush(); - if (s.ok()) { - counters_->flush_counter_.fetch_add(1, std::memory_order_relaxed); - } - return s; - } - Status Sync() override { - Status s = target_->Sync(); - if (s.ok()) { - counters_->sync_counter_.fetch_add(1, std::memory_order_relaxed); - } - return s; - } - Status Fsync() override { - Status s = target_->Fsync(); - if (s.ok()) { - counters_->fsync_counter_.fetch_add(1, std::memory_order_relaxed); - } - return s; - } - }; - - Status s = target()->NewWritableFile(f, r, soptions); - if (s.ok()) { - counters()->open_counter_.fetch_add(1, std::memory_order_relaxed); - r->reset(new CountingFile(std::move(*r), counters())); - } - return s; - } - - // getter - ReportFileOpCounters* counters() { return &counters_; } - - private: - ReportFileOpCounters counters_; -}; - } // namespace enum DistributionType : unsigned char { @@ -2410,31 +2225,11 @@ class Stats { } } if (FLAGS_report_file_operations) { - ReportFileOpEnv* env = static_cast(FLAGS_env); - ReportFileOpCounters* counters = env->counters(); - fprintf(stdout, "Num files opened: %d\n", - counters->open_counter_.load(std::memory_order_relaxed)); - fprintf(stdout, "Num files deleted: %d\n", - counters->delete_counter_.load(std::memory_order_relaxed)); - fprintf(stdout, "Num files renamed: %d\n", - counters->rename_counter_.load(std::memory_order_relaxed)); - fprintf(stdout, "Num Flush(): %d\n", - counters->flush_counter_.load(std::memory_order_relaxed)); - fprintf(stdout, "Num Sync(): %d\n", - counters->sync_counter_.load(std::memory_order_relaxed)); - fprintf(stdout, "Num Fsync(): %d\n", - counters->fsync_counter_.load(std::memory_order_relaxed)); - fprintf(stdout, "Num Close(): %d\n", - counters->close_counter_.load(std::memory_order_relaxed)); - fprintf(stdout, "Num Read(): %d\n", - counters->read_counter_.load(std::memory_order_relaxed)); - fprintf(stdout, "Num Append(): %d\n", - counters->append_counter_.load(std::memory_order_relaxed)); - fprintf(stdout, "Num bytes read: %" PRIu64 "\n", - counters->bytes_read_.load(std::memory_order_relaxed)); - fprintf(stdout, "Num bytes written: %" PRIu64 "\n", - counters->bytes_written_.load(std::memory_order_relaxed)); - env->reset(); + auto* counted_fs = + FLAGS_env->GetFileSystem()->CheckedCast(); + assert(counted_fs); + fprintf(stdout, "%s", counted_fs->PrintCounters().c_str()); + counted_fs->ResetCounters(); } fflush(stdout); } @@ -3031,7 +2826,9 @@ class Benchmark { } if (report_file_operations_) { - FLAGS_env = new ReportFileOpEnv(FLAGS_env); + FLAGS_env = new CompositeEnvWrapper( + FLAGS_env, + std::make_shared(FLAGS_env->GetFileSystem())); } if (FLAGS_prefix_size > FLAGS_key_size) { diff --git a/utilities/counted_fs.cc b/utilities/counted_fs.cc new file mode 100644 index 000000000..f62158c8d --- /dev/null +++ b/utilities/counted_fs.cc @@ -0,0 +1,355 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "utilities/counted_fs.h" + +#include + +#include "rocksdb/file_system.h" +#include "rocksdb/utilities/options_type.h" + +namespace ROCKSDB_NAMESPACE { +namespace { +class CountedSequentialFile : public FSSequentialFileOwnerWrapper { + private: + CountedFileSystem* fs_; + + public: + CountedSequentialFile(std::unique_ptr&& f, + CountedFileSystem* fs) + : FSSequentialFileOwnerWrapper(std::move(f)), fs_(fs) {} + + ~CountedSequentialFile() override { fs_->counters()->closes++; } + + IOStatus Read(size_t n, const IOOptions& options, Slice* result, + char* scratch, IODebugContext* dbg) override { + IOStatus rv = target()->Read(n, options, result, scratch, dbg); + fs_->counters()->reads.RecordOp(rv, result->size()); + return rv; + } + + IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& options, + Slice* result, char* scratch, + IODebugContext* dbg) override { + IOStatus rv = + target()->PositionedRead(offset, n, options, result, scratch, dbg); + fs_->counters()->reads.RecordOp(rv, result->size()); + return rv; + } +}; + +class CountedRandomAccessFile : public FSRandomAccessFileOwnerWrapper { + private: + CountedFileSystem* fs_; + + public: + CountedRandomAccessFile(std::unique_ptr&& f, + CountedFileSystem* fs) + : FSRandomAccessFileOwnerWrapper(std::move(f)), fs_(fs) {} + + ~CountedRandomAccessFile() override { fs_->counters()->closes++; } + + IOStatus Read(uint64_t offset, size_t n, const IOOptions& options, + Slice* result, char* scratch, + IODebugContext* dbg) const override { + IOStatus rv = target()->Read(offset, n, options, result, scratch, dbg); + fs_->counters()->reads.RecordOp(rv, result->size()); + return rv; + } + + IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs, + const IOOptions& options, IODebugContext* dbg) override { + IOStatus rv = target()->MultiRead(reqs, num_reqs, options, dbg); + for (size_t r = 0; r < num_reqs; r++) { + fs_->counters()->reads.RecordOp(reqs[r].status, reqs[r].result.size()); + } + return rv; + } +}; + +class CountedWritableFile : public FSWritableFileOwnerWrapper { + private: + CountedFileSystem* fs_; + + public: + CountedWritableFile(std::unique_ptr&& f, + CountedFileSystem* fs) + : FSWritableFileOwnerWrapper(std::move(f)), fs_(fs) {} + + IOStatus Append(const Slice& data, const IOOptions& options, + IODebugContext* dbg) override { + IOStatus rv = target()->Append(data, options, dbg); + fs_->counters()->writes.RecordOp(rv, data.size()); + return rv; + } + + IOStatus Append(const Slice& data, const IOOptions& options, + const DataVerificationInfo& info, + IODebugContext* dbg) override { + IOStatus rv = target()->Append(data, options, info, dbg); + fs_->counters()->writes.RecordOp(rv, data.size()); + return rv; + } + + IOStatus PositionedAppend(const Slice& data, uint64_t offset, + const IOOptions& options, + IODebugContext* dbg) override { + IOStatus rv = target()->PositionedAppend(data, offset, options, dbg); + fs_->counters()->writes.RecordOp(rv, data.size()); + return rv; + } + + IOStatus PositionedAppend(const Slice& data, uint64_t offset, + const IOOptions& options, + const DataVerificationInfo& info, + IODebugContext* dbg) override { + IOStatus rv = target()->PositionedAppend(data, offset, options, info, dbg); + fs_->counters()->writes.RecordOp(rv, data.size()); + return rv; + } + + IOStatus Close(const IOOptions& options, IODebugContext* dbg) override { + IOStatus rv = target()->Close(options, dbg); + if (rv.ok()) { + fs_->counters()->closes++; + } + return rv; + } + + IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override { + IOStatus rv = target()->Flush(options, dbg); + if (rv.ok()) { + fs_->counters()->flushes++; + } + return rv; + } + + IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override { + IOStatus rv = target()->Sync(options, dbg); + if (rv.ok()) { + fs_->counters()->syncs++; + } + return rv; + } + + IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override { + IOStatus rv = target()->Fsync(options, dbg); + if (rv.ok()) { + fs_->counters()->fsyncs++; + } + return rv; + } + + IOStatus RangeSync(uint64_t offset, uint64_t nbytes, const IOOptions& options, + IODebugContext* dbg) override { + IOStatus rv = target()->RangeSync(offset, nbytes, options, dbg); + if (rv.ok()) { + fs_->counters()->syncs++; + } + return rv; + } +}; + +class CountedRandomRWFile : public FSRandomRWFileOwnerWrapper { + private: + mutable CountedFileSystem* fs_; + + public: + CountedRandomRWFile(std::unique_ptr&& f, + CountedFileSystem* fs) + : FSRandomRWFileOwnerWrapper(std::move(f)), fs_(fs) {} + IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& options, + IODebugContext* dbg) override { + IOStatus rv = target()->Write(offset, data, options, dbg); + fs_->counters()->writes.RecordOp(rv, data.size()); + return rv; + } + + IOStatus Read(uint64_t offset, size_t n, const IOOptions& options, + Slice* result, char* scratch, + IODebugContext* dbg) const override { + IOStatus rv = target()->Read(offset, n, options, result, scratch, dbg); + fs_->counters()->reads.RecordOp(rv, result->size()); + return rv; + } + + IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override { + IOStatus rv = target()->Flush(options, dbg); + if (rv.ok()) { + fs_->counters()->flushes++; + } + return rv; + } + + IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override { + IOStatus rv = target()->Sync(options, dbg); + if (rv.ok()) { + fs_->counters()->syncs++; + } + return rv; + } + + IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override { + IOStatus rv = target()->Fsync(options, dbg); + if (rv.ok()) { + fs_->counters()->fsyncs++; + } + return rv; + } + + IOStatus Close(const IOOptions& options, IODebugContext* dbg) override { + IOStatus rv = target()->Close(options, dbg); + if (rv.ok()) { + fs_->counters()->closes++; + } + return rv; + } +}; + +class CountedDirectory : public FSDirectoryWrapper { + private: + mutable CountedFileSystem* fs_; + + public: + CountedDirectory(std::unique_ptr&& f, CountedFileSystem* fs) + : FSDirectoryWrapper(std::move(f)), fs_(fs) {} + + IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override { + IOStatus rv = FSDirectoryWrapper::Fsync(options, dbg); + if (rv.ok()) { + fs_->counters()->dsyncs++; + } + return rv; + } + + IOStatus FsyncWithDirOptions(const IOOptions& options, IODebugContext* dbg, + const DirFsyncOptions& dir_options) override { + IOStatus rv = + FSDirectoryWrapper::FsyncWithDirOptions(options, dbg, dir_options); + if (rv.ok()) { + fs_->counters()->dsyncs++; + } + return rv; + } +}; +} // anonymous namespace + +std::string FileOpCounters::PrintCounters() const { + std::stringstream ss; + ss << "Num files opened: " << opens.load(std::memory_order_relaxed) + << std::endl; + ss << "Num files deleted: " << deletes.load(std::memory_order_relaxed) + << std::endl; + ss << "Num files renamed: " << renames.load(std::memory_order_relaxed) + << std::endl; + ss << "Num Flush(): " << flushes.load(std::memory_order_relaxed) << std::endl; + ss << "Num Sync(): " << syncs.load(std::memory_order_relaxed) << std::endl; + ss << "Num Fsync(): " << fsyncs.load(std::memory_order_relaxed) << std::endl; + ss << "Num Dir Fsync(): " << dsyncs.load(std::memory_order_relaxed) + << std::endl; + ss << "Num Close(): " << closes.load(std::memory_order_relaxed) << std::endl; + ss << "Num Read(): " << reads.ops.load(std::memory_order_relaxed) + << std::endl; + ss << "Num Append(): " << writes.ops.load(std::memory_order_relaxed) + << std::endl; + ss << "Num bytes read: " << reads.bytes.load(std::memory_order_relaxed) + << std::endl; + ss << "Num bytes written: " << writes.bytes.load(std::memory_order_relaxed) + << std::endl; + return ss.str(); +} + +CountedFileSystem::CountedFileSystem(const std::shared_ptr& base) + : FileSystemWrapper(base) {} + +IOStatus CountedFileSystem::NewSequentialFile( + const std::string& f, const FileOptions& options, + std::unique_ptr* r, IODebugContext* dbg) { + std::unique_ptr base; + IOStatus s = target()->NewSequentialFile(f, options, &base, dbg); + if (s.ok()) { + counters_.opens++; + r->reset(new CountedSequentialFile(std::move(base), this)); + } + return s; +} + +IOStatus CountedFileSystem::NewRandomAccessFile( + const std::string& f, const FileOptions& options, + std::unique_ptr* r, IODebugContext* dbg) { + std::unique_ptr base; + IOStatus s = target()->NewRandomAccessFile(f, options, &base, dbg); + if (s.ok()) { + counters_.opens++; + r->reset(new CountedRandomAccessFile(std::move(base), this)); + } + return s; +} + +IOStatus CountedFileSystem::NewWritableFile(const std::string& f, + const FileOptions& options, + std::unique_ptr* r, + IODebugContext* dbg) { + std::unique_ptr base; + IOStatus s = target()->NewWritableFile(f, options, &base, dbg); + if (s.ok()) { + counters_.opens++; + r->reset(new CountedWritableFile(std::move(base), this)); + } + return s; +} + +IOStatus CountedFileSystem::ReopenWritableFile( + const std::string& fname, const FileOptions& options, + std::unique_ptr* result, IODebugContext* dbg) { + std::unique_ptr base; + IOStatus s = target()->ReopenWritableFile(fname, options, &base, dbg); + if (s.ok()) { + counters_.opens++; + result->reset(new CountedWritableFile(std::move(base), this)); + } + return s; +} + +IOStatus CountedFileSystem::ReuseWritableFile( + const std::string& fname, const std::string& old_fname, + const FileOptions& options, std::unique_ptr* result, + IODebugContext* dbg) { + std::unique_ptr base; + IOStatus s = + target()->ReuseWritableFile(fname, old_fname, options, &base, dbg); + if (s.ok()) { + counters_.opens++; + result->reset(new CountedWritableFile(std::move(base), this)); + } + return s; +} + +IOStatus CountedFileSystem::NewRandomRWFile( + const std::string& name, const FileOptions& options, + std::unique_ptr* result, IODebugContext* dbg) { + std::unique_ptr base; + IOStatus s = target()->NewRandomRWFile(name, options, &base, dbg); + if (s.ok()) { + counters_.opens++; + result->reset(new CountedRandomRWFile(std::move(base), this)); + } + return s; +} + +IOStatus CountedFileSystem::NewDirectory(const std::string& name, + const IOOptions& options, + std::unique_ptr* result, + IODebugContext* dbg) { + std::unique_ptr base; + IOStatus s = target()->NewDirectory(name, options, &base, dbg); + if (s.ok()) { + counters_.opens++; + result->reset(new CountedDirectory(std::move(base), this)); + } + return s; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/counted_fs.h b/utilities/counted_fs.h new file mode 100644 index 000000000..f05a37b8d --- /dev/null +++ b/utilities/counted_fs.h @@ -0,0 +1,152 @@ +// Copyright (c) 2016-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include + +#include "rocksdb/file_system.h" +#include "rocksdb/io_status.h" +#include "rocksdb/rocksdb_namespace.h" + +namespace ROCKSDB_NAMESPACE { +class Logger; + +struct OpCounter { + std::atomic ops; + std::atomic bytes; + + OpCounter() : ops(0), bytes(0) {} + + void Reset() { + ops = 0; + bytes = 0; + } + void RecordOp(const IOStatus& io_s, size_t added_bytes) { + if (!io_s.IsNotSupported()) { + ops.fetch_add(1, std::memory_order_relaxed); + } + if (io_s.ok()) { + bytes.fetch_add(added_bytes, std::memory_order_relaxed); + } + } +}; + +struct FileOpCounters { + static const char* kName() { return "FileOpCounters"; } + + std::atomic opens; + std::atomic closes; + std::atomic deletes; + std::atomic renames; + std::atomic flushes; + std::atomic syncs; + std::atomic dsyncs; + std::atomic fsyncs; + OpCounter reads; + OpCounter writes; + + FileOpCounters() + : opens(0), + closes(0), + deletes(0), + renames(0), + flushes(0), + syncs(0), + dsyncs(0), + fsyncs(0) {} + + void Reset() { + opens = 0; + closes = 0; + deletes = 0; + renames = 0; + flushes = 0; + syncs = 0; + dsyncs = 0; + fsyncs = 0; + reads.Reset(); + writes.Reset(); + } + std::string PrintCounters() const; +}; + +// A FileSystem class that counts operations (reads, writes, opens, closes, etc) +class CountedFileSystem : public FileSystemWrapper { + public: + private: + FileOpCounters counters_; + + public: + explicit CountedFileSystem(const std::shared_ptr& base); + static const char* kClassName() { return "CountedFileSystem"; } + const char* Name() const override { return kClassName(); } + + IOStatus NewSequentialFile(const std::string& f, const FileOptions& options, + std::unique_ptr* r, + IODebugContext* dbg) override; + + IOStatus NewRandomAccessFile(const std::string& f, + const FileOptions& file_opts, + std::unique_ptr* r, + IODebugContext* dbg) override; + + IOStatus NewWritableFile(const std::string& f, const FileOptions& options, + std::unique_ptr* r, + IODebugContext* dbg) override; + IOStatus ReopenWritableFile(const std::string& fname, + const FileOptions& options, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + IOStatus NewRandomRWFile(const std::string& name, const FileOptions& options, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus NewDirectory(const std::string& name, const IOOptions& io_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus DeleteFile(const std::string& fname, const IOOptions& options, + IODebugContext* dbg) override { + IOStatus s = target()->DeleteFile(fname, options, dbg); + if (s.ok()) { + counters_.deletes++; + } + return s; + } + + IOStatus RenameFile(const std::string& s, const std::string& t, + const IOOptions& options, IODebugContext* dbg) override { + IOStatus st = target()->RenameFile(s, t, options, dbg); + if (st.ok()) { + counters_.renames++; + } + return st; + } + + const FileOpCounters* counters() const { return &counters_; } + + FileOpCounters* counters() { return &counters_; } + + const void* GetOptionsPtr(const std::string& name) const override { + if (name == FileOpCounters::kName()) { + return counters(); + } else { + return FileSystemWrapper::GetOptionsPtr(name); + } + } + + // Prints the counters to a string + std::string PrintCounters() const { return counters_.PrintCounters(); } + void ResetCounters() { counters_.Reset(); } +}; +} // namespace ROCKSDB_NAMESPACE