Introduce a CountedFileSystem for counting file operations (#9283)

Summary:
Added a CountedFileSystem that tracks a number of file operations (opens, closes, deletes, renames, flushes, syncs, fsyncs, reads, writes).    This class was based on the ReportFileOpEnv from db_bench.

This is a stepping stone PR to be able to change the SpecialEnv into a SpecialFileSystem, where several of the file varieties wish to do operation counting.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9283

Reviewed By: pdillinger

Differential Revision: D33062004

Pulled By: mrambacher

fbshipit-source-id: d0d297a7fb9c48c06cbf685e5fa755c27193b6f5
main
mrambacher 3 years ago committed by Facebook GitHub Bot
parent 5104c10ffb
commit aae3093719
  1. 1
      CMakeLists.txt
  2. 2
      TARGETS
  3. 32
      env/env_test.cc
  4. 8
      env/file_system.cc
  5. 1
      src.mk
  6. 221
      tools/db_bench_tool.cc
  7. 355
      utilities/counted_fs.cc
  8. 152
      utilities/counted_fs.h

@ -870,6 +870,7 @@ set(SOURCES
utilities/checkpoint/checkpoint_impl.cc utilities/checkpoint/checkpoint_impl.cc
utilities/compaction_filters.cc utilities/compaction_filters.cc
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc
utilities/counted_fs.cc
utilities/debug.cc utilities/debug.cc
utilities/env_mirror.cc utilities/env_mirror.cc
utilities/env_timed.cc utilities/env_timed.cc

@ -393,6 +393,7 @@ cpp_library(
"utilities/compaction_filters.cc", "utilities/compaction_filters.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc", "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc", "utilities/convenience/info_log_finder.cc",
"utilities/counted_fs.cc",
"utilities/debug.cc", "utilities/debug.cc",
"utilities/env_mirror.cc", "utilities/env_mirror.cc",
"utilities/env_timed.cc", "utilities/env_timed.cc",
@ -721,6 +722,7 @@ cpp_library(
"utilities/compaction_filters.cc", "utilities/compaction_filters.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc", "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc", "utilities/convenience/info_log_finder.cc",
"utilities/counted_fs.cc",
"utilities/debug.cc", "utilities/debug.cc",
"utilities/env_mirror.cc", "utilities/env_mirror.cc",
"utilities/env_timed.cc", "utilities/env_timed.cc",

32
env/env_test.cc vendored

@ -62,6 +62,7 @@
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/random.h" #include "util/random.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "utilities/counted_fs.h"
#include "utilities/env_timed.h" #include "utilities/env_timed.h"
#include "utilities/fault_injection_env.h" #include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h" #include "utilities/fault_injection_fs.h"
@ -2582,6 +2583,37 @@ TEST_F(CreateEnvTest, CreateTimedFileSystem) {
ASSERT_OK(FileSystem::CreateFromString(config_options_, opts_str, &copy)); ASSERT_OK(FileSystem::CreateFromString(config_options_, opts_str, &copy));
ASSERT_TRUE(fs->AreEquivalent(config_options_, copy.get(), &mismatch)); ASSERT_TRUE(fs->AreEquivalent(config_options_, copy.get(), &mismatch));
} }
TEST_F(CreateEnvTest, CreateCountedFileSystem) {
std::shared_ptr<FileSystem> fs, copy;
ASSERT_OK(FileSystem::CreateFromString(config_options_,
CountedFileSystem::kClassName(), &fs));
ASSERT_NE(fs, nullptr);
ASSERT_STREQ(fs->Name(), CountedFileSystem::kClassName());
ASSERT_EQ(fs->Inner(), FileSystem::Default().get());
std::string opts_str = fs->ToString(config_options_);
std::string mismatch;
ASSERT_OK(FileSystem::CreateFromString(config_options_, opts_str, &copy));
ASSERT_TRUE(fs->AreEquivalent(config_options_, copy.get(), &mismatch));
ASSERT_OK(FileSystem::CreateFromString(
config_options_,
std::string("id=") + CountedFileSystem::kClassName() +
"; target=" + ReadOnlyFileSystem::kClassName(),
&fs));
ASSERT_NE(fs, nullptr);
opts_str = fs->ToString(config_options_);
ASSERT_STREQ(fs->Name(), CountedFileSystem::kClassName());
ASSERT_NE(fs->Inner(), nullptr);
ASSERT_STREQ(fs->Inner()->Name(), ReadOnlyFileSystem::kClassName());
ASSERT_EQ(fs->Inner()->Inner(), FileSystem::Default().get());
ASSERT_OK(FileSystem::CreateFromString(config_options_, opts_str, &copy));
ASSERT_TRUE(fs->AreEquivalent(config_options_, copy.get(), &mismatch));
}
#ifndef OS_WIN #ifndef OS_WIN
TEST_F(CreateEnvTest, CreateChrootFileSystem) { TEST_F(CreateEnvTest, CreateChrootFileSystem) {
std::shared_ptr<FileSystem> fs, copy; std::shared_ptr<FileSystem> fs, copy;

@ -16,6 +16,7 @@
#include "rocksdb/utilities/object_registry.h" #include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h" #include "rocksdb/utilities/options_type.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "utilities/counted_fs.h"
#include "utilities/env_timed.h" #include "utilities/env_timed.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -56,6 +57,13 @@ static int RegisterBuiltinFileSystems(ObjectLibrary& library,
} }
return guard->get(); return guard->get();
}); });
library.AddFactory<FileSystem>(
CountedFileSystem::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<FileSystem>* guard,
std::string* /*errmsg*/) {
guard->reset(new CountedFileSystem(FileSystem::Default()));
return guard->get();
});
library.AddFactory<FileSystem>( library.AddFactory<FileSystem>(
MockFileSystem::kClassName(), MockFileSystem::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<FileSystem>* guard, [](const std::string& /*uri*/, std::unique_ptr<FileSystem>* guard,

@ -245,6 +245,7 @@ LIB_SOURCES = \
utilities/compaction_filters.cc \ utilities/compaction_filters.cc \
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \ utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \
utilities/convenience/info_log_finder.cc \ utilities/convenience/info_log_finder.cc \
utilities/counted_fs.cc \
utilities/debug.cc \ utilities/debug.cc \
utilities/env_mirror.cc \ utilities/env_mirror.cc \
utilities/env_timed.cc \ utilities/env_timed.cc \

@ -84,6 +84,7 @@
#include "util/string_util.h" #include "util/string_util.h"
#include "util/xxhash.h" #include "util/xxhash.h"
#include "utilities/blob_db/blob_db.h" #include "utilities/blob_db/blob_db.h"
#include "utilities/counted_fs.h"
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
#include "utilities/merge_operators/bytesxor.h" #include "utilities/merge_operators/bytesxor.h"
#include "utilities/merge_operators/sortlist.h" #include "utilities/merge_operators/sortlist.h"
@ -1569,192 +1570,6 @@ static Status CreateMemTableRepFactory(
return s; return s;
} }
struct ReportFileOpCounters {
std::atomic<int> open_counter_;
std::atomic<int> delete_counter_;
std::atomic<int> rename_counter_;
std::atomic<int> flush_counter_;
std::atomic<int> sync_counter_;
std::atomic<int> fsync_counter_;
std::atomic<int> close_counter_;
std::atomic<int> read_counter_;
std::atomic<int> append_counter_;
std::atomic<uint64_t> bytes_read_;
std::atomic<uint64_t> bytes_written_;
};
// A special Env to records and report file operations in db_bench
class ReportFileOpEnv : public EnvWrapper {
public:
explicit ReportFileOpEnv(Env* base) : EnvWrapper(base) { reset(); }
const char* Name() const override { return "ReportFileOpEnv"; }
void reset() {
counters_.open_counter_ = 0;
counters_.delete_counter_ = 0;
counters_.rename_counter_ = 0;
counters_.flush_counter_ = 0;
counters_.sync_counter_ = 0;
counters_.fsync_counter_ = 0;
counters_.close_counter_ = 0;
counters_.read_counter_ = 0;
counters_.append_counter_ = 0;
counters_.bytes_read_ = 0;
counters_.bytes_written_ = 0;
}
Status NewSequentialFile(const std::string& f,
std::unique_ptr<SequentialFile>* r,
const EnvOptions& soptions) override {
class CountingFile : public SequentialFile {
private:
std::unique_ptr<SequentialFile> target_;
ReportFileOpCounters* counters_;
public:
CountingFile(std::unique_ptr<SequentialFile>&& target,
ReportFileOpCounters* counters)
: target_(std::move(target)), counters_(counters) {}
Status Read(size_t n, Slice* result, char* scratch) override {
counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
Status rv = target_->Read(n, result, scratch);
counters_->bytes_read_.fetch_add(result->size(),
std::memory_order_relaxed);
return rv;
}
Status Skip(uint64_t n) override { return target_->Skip(n); }
};
Status s = target()->NewSequentialFile(f, r, soptions);
if (s.ok()) {
counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
r->reset(new CountingFile(std::move(*r), counters()));
}
return s;
}
Status DeleteFile(const std::string& fname) override {
Status s = target()->DeleteFile(fname);
if (s.ok()) {
counters()->delete_counter_.fetch_add(1, std::memory_order_relaxed);
}
return s;
}
Status RenameFile(const std::string& s, const std::string& t) override {
Status st = target()->RenameFile(s, t);
if (st.ok()) {
counters()->rename_counter_.fetch_add(1, std::memory_order_relaxed);
}
return st;
}
Status NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r,
const EnvOptions& soptions) override {
class CountingFile : public RandomAccessFile {
private:
std::unique_ptr<RandomAccessFile> target_;
ReportFileOpCounters* counters_;
public:
CountingFile(std::unique_ptr<RandomAccessFile>&& target,
ReportFileOpCounters* counters)
: target_(std::move(target)), counters_(counters) {}
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
Status rv = target_->Read(offset, n, result, scratch);
counters_->bytes_read_.fetch_add(result->size(),
std::memory_order_relaxed);
return rv;
}
};
Status s = target()->NewRandomAccessFile(f, r, soptions);
if (s.ok()) {
counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
r->reset(new CountingFile(std::move(*r), counters()));
}
return s;
}
Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
const EnvOptions& soptions) override {
class CountingFile : public WritableFile {
private:
std::unique_ptr<WritableFile> target_;
ReportFileOpCounters* counters_;
public:
CountingFile(std::unique_ptr<WritableFile>&& target,
ReportFileOpCounters* counters)
: target_(std::move(target)), counters_(counters) {}
Status Append(const Slice& data) override {
counters_->append_counter_.fetch_add(1, std::memory_order_relaxed);
Status rv = target_->Append(data);
counters_->bytes_written_.fetch_add(data.size(),
std::memory_order_relaxed);
return rv;
}
Status Append(
const Slice& data,
const DataVerificationInfo& /* verification_info */) override {
return Append(data);
}
Status Truncate(uint64_t size) override {
return target_->Truncate(size);
}
Status Close() override {
Status s = target_->Close();
if (s.ok()) {
counters_->close_counter_.fetch_add(1, std::memory_order_relaxed);
}
return s;
}
Status Flush() override {
Status s = target_->Flush();
if (s.ok()) {
counters_->flush_counter_.fetch_add(1, std::memory_order_relaxed);
}
return s;
}
Status Sync() override {
Status s = target_->Sync();
if (s.ok()) {
counters_->sync_counter_.fetch_add(1, std::memory_order_relaxed);
}
return s;
}
Status Fsync() override {
Status s = target_->Fsync();
if (s.ok()) {
counters_->fsync_counter_.fetch_add(1, std::memory_order_relaxed);
}
return s;
}
};
Status s = target()->NewWritableFile(f, r, soptions);
if (s.ok()) {
counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
r->reset(new CountingFile(std::move(*r), counters()));
}
return s;
}
// getter
ReportFileOpCounters* counters() { return &counters_; }
private:
ReportFileOpCounters counters_;
};
} // namespace } // namespace
enum DistributionType : unsigned char { enum DistributionType : unsigned char {
@ -2410,31 +2225,11 @@ class Stats {
} }
} }
if (FLAGS_report_file_operations) { if (FLAGS_report_file_operations) {
ReportFileOpEnv* env = static_cast<ReportFileOpEnv*>(FLAGS_env); auto* counted_fs =
ReportFileOpCounters* counters = env->counters(); FLAGS_env->GetFileSystem()->CheckedCast<CountedFileSystem>();
fprintf(stdout, "Num files opened: %d\n", assert(counted_fs);
counters->open_counter_.load(std::memory_order_relaxed)); fprintf(stdout, "%s", counted_fs->PrintCounters().c_str());
fprintf(stdout, "Num files deleted: %d\n", counted_fs->ResetCounters();
counters->delete_counter_.load(std::memory_order_relaxed));
fprintf(stdout, "Num files renamed: %d\n",
counters->rename_counter_.load(std::memory_order_relaxed));
fprintf(stdout, "Num Flush(): %d\n",
counters->flush_counter_.load(std::memory_order_relaxed));
fprintf(stdout, "Num Sync(): %d\n",
counters->sync_counter_.load(std::memory_order_relaxed));
fprintf(stdout, "Num Fsync(): %d\n",
counters->fsync_counter_.load(std::memory_order_relaxed));
fprintf(stdout, "Num Close(): %d\n",
counters->close_counter_.load(std::memory_order_relaxed));
fprintf(stdout, "Num Read(): %d\n",
counters->read_counter_.load(std::memory_order_relaxed));
fprintf(stdout, "Num Append(): %d\n",
counters->append_counter_.load(std::memory_order_relaxed));
fprintf(stdout, "Num bytes read: %" PRIu64 "\n",
counters->bytes_read_.load(std::memory_order_relaxed));
fprintf(stdout, "Num bytes written: %" PRIu64 "\n",
counters->bytes_written_.load(std::memory_order_relaxed));
env->reset();
} }
fflush(stdout); fflush(stdout);
} }
@ -3031,7 +2826,9 @@ class Benchmark {
} }
if (report_file_operations_) { if (report_file_operations_) {
FLAGS_env = new ReportFileOpEnv(FLAGS_env); FLAGS_env = new CompositeEnvWrapper(
FLAGS_env,
std::make_shared<CountedFileSystem>(FLAGS_env->GetFileSystem()));
} }
if (FLAGS_prefix_size > FLAGS_key_size) { if (FLAGS_prefix_size > FLAGS_key_size) {

@ -0,0 +1,355 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/counted_fs.h"
#include <sstream>
#include "rocksdb/file_system.h"
#include "rocksdb/utilities/options_type.h"
namespace ROCKSDB_NAMESPACE {
namespace {
class CountedSequentialFile : public FSSequentialFileOwnerWrapper {
private:
CountedFileSystem* fs_;
public:
CountedSequentialFile(std::unique_ptr<FSSequentialFile>&& f,
CountedFileSystem* fs)
: FSSequentialFileOwnerWrapper(std::move(f)), fs_(fs) {}
~CountedSequentialFile() override { fs_->counters()->closes++; }
IOStatus Read(size_t n, const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) override {
IOStatus rv = target()->Read(n, options, result, scratch, dbg);
fs_->counters()->reads.RecordOp(rv, result->size());
return rv;
}
IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) override {
IOStatus rv =
target()->PositionedRead(offset, n, options, result, scratch, dbg);
fs_->counters()->reads.RecordOp(rv, result->size());
return rv;
}
};
class CountedRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
private:
CountedFileSystem* fs_;
public:
CountedRandomAccessFile(std::unique_ptr<FSRandomAccessFile>&& f,
CountedFileSystem* fs)
: FSRandomAccessFileOwnerWrapper(std::move(f)), fs_(fs) {}
~CountedRandomAccessFile() override { fs_->counters()->closes++; }
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const override {
IOStatus rv = target()->Read(offset, n, options, result, scratch, dbg);
fs_->counters()->reads.RecordOp(rv, result->size());
return rv;
}
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options, IODebugContext* dbg) override {
IOStatus rv = target()->MultiRead(reqs, num_reqs, options, dbg);
for (size_t r = 0; r < num_reqs; r++) {
fs_->counters()->reads.RecordOp(reqs[r].status, reqs[r].result.size());
}
return rv;
}
};
class CountedWritableFile : public FSWritableFileOwnerWrapper {
private:
CountedFileSystem* fs_;
public:
CountedWritableFile(std::unique_ptr<FSWritableFile>&& f,
CountedFileSystem* fs)
: FSWritableFileOwnerWrapper(std::move(f)), fs_(fs) {}
IOStatus Append(const Slice& data, const IOOptions& options,
IODebugContext* dbg) override {
IOStatus rv = target()->Append(data, options, dbg);
fs_->counters()->writes.RecordOp(rv, data.size());
return rv;
}
IOStatus Append(const Slice& data, const IOOptions& options,
const DataVerificationInfo& info,
IODebugContext* dbg) override {
IOStatus rv = target()->Append(data, options, info, dbg);
fs_->counters()->writes.RecordOp(rv, data.size());
return rv;
}
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& options,
IODebugContext* dbg) override {
IOStatus rv = target()->PositionedAppend(data, offset, options, dbg);
fs_->counters()->writes.RecordOp(rv, data.size());
return rv;
}
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& options,
const DataVerificationInfo& info,
IODebugContext* dbg) override {
IOStatus rv = target()->PositionedAppend(data, offset, options, info, dbg);
fs_->counters()->writes.RecordOp(rv, data.size());
return rv;
}
IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
IOStatus rv = target()->Close(options, dbg);
if (rv.ok()) {
fs_->counters()->closes++;
}
return rv;
}
IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override {
IOStatus rv = target()->Flush(options, dbg);
if (rv.ok()) {
fs_->counters()->flushes++;
}
return rv;
}
IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
IOStatus rv = target()->Sync(options, dbg);
if (rv.ok()) {
fs_->counters()->syncs++;
}
return rv;
}
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
IOStatus rv = target()->Fsync(options, dbg);
if (rv.ok()) {
fs_->counters()->fsyncs++;
}
return rv;
}
IOStatus RangeSync(uint64_t offset, uint64_t nbytes, const IOOptions& options,
IODebugContext* dbg) override {
IOStatus rv = target()->RangeSync(offset, nbytes, options, dbg);
if (rv.ok()) {
fs_->counters()->syncs++;
}
return rv;
}
};
class CountedRandomRWFile : public FSRandomRWFileOwnerWrapper {
private:
mutable CountedFileSystem* fs_;
public:
CountedRandomRWFile(std::unique_ptr<FSRandomRWFile>&& f,
CountedFileSystem* fs)
: FSRandomRWFileOwnerWrapper(std::move(f)), fs_(fs) {}
IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& options,
IODebugContext* dbg) override {
IOStatus rv = target()->Write(offset, data, options, dbg);
fs_->counters()->writes.RecordOp(rv, data.size());
return rv;
}
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const override {
IOStatus rv = target()->Read(offset, n, options, result, scratch, dbg);
fs_->counters()->reads.RecordOp(rv, result->size());
return rv;
}
IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override {
IOStatus rv = target()->Flush(options, dbg);
if (rv.ok()) {
fs_->counters()->flushes++;
}
return rv;
}
IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
IOStatus rv = target()->Sync(options, dbg);
if (rv.ok()) {
fs_->counters()->syncs++;
}
return rv;
}
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
IOStatus rv = target()->Fsync(options, dbg);
if (rv.ok()) {
fs_->counters()->fsyncs++;
}
return rv;
}
IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
IOStatus rv = target()->Close(options, dbg);
if (rv.ok()) {
fs_->counters()->closes++;
}
return rv;
}
};
class CountedDirectory : public FSDirectoryWrapper {
private:
mutable CountedFileSystem* fs_;
public:
CountedDirectory(std::unique_ptr<FSDirectory>&& f, CountedFileSystem* fs)
: FSDirectoryWrapper(std::move(f)), fs_(fs) {}
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
IOStatus rv = FSDirectoryWrapper::Fsync(options, dbg);
if (rv.ok()) {
fs_->counters()->dsyncs++;
}
return rv;
}
IOStatus FsyncWithDirOptions(const IOOptions& options, IODebugContext* dbg,
const DirFsyncOptions& dir_options) override {
IOStatus rv =
FSDirectoryWrapper::FsyncWithDirOptions(options, dbg, dir_options);
if (rv.ok()) {
fs_->counters()->dsyncs++;
}
return rv;
}
};
} // anonymous namespace
std::string FileOpCounters::PrintCounters() const {
std::stringstream ss;
ss << "Num files opened: " << opens.load(std::memory_order_relaxed)
<< std::endl;
ss << "Num files deleted: " << deletes.load(std::memory_order_relaxed)
<< std::endl;
ss << "Num files renamed: " << renames.load(std::memory_order_relaxed)
<< std::endl;
ss << "Num Flush(): " << flushes.load(std::memory_order_relaxed) << std::endl;
ss << "Num Sync(): " << syncs.load(std::memory_order_relaxed) << std::endl;
ss << "Num Fsync(): " << fsyncs.load(std::memory_order_relaxed) << std::endl;
ss << "Num Dir Fsync(): " << dsyncs.load(std::memory_order_relaxed)
<< std::endl;
ss << "Num Close(): " << closes.load(std::memory_order_relaxed) << std::endl;
ss << "Num Read(): " << reads.ops.load(std::memory_order_relaxed)
<< std::endl;
ss << "Num Append(): " << writes.ops.load(std::memory_order_relaxed)
<< std::endl;
ss << "Num bytes read: " << reads.bytes.load(std::memory_order_relaxed)
<< std::endl;
ss << "Num bytes written: " << writes.bytes.load(std::memory_order_relaxed)
<< std::endl;
return ss.str();
}
CountedFileSystem::CountedFileSystem(const std::shared_ptr<FileSystem>& base)
: FileSystemWrapper(base) {}
IOStatus CountedFileSystem::NewSequentialFile(
const std::string& f, const FileOptions& options,
std::unique_ptr<FSSequentialFile>* r, IODebugContext* dbg) {
std::unique_ptr<FSSequentialFile> base;
IOStatus s = target()->NewSequentialFile(f, options, &base, dbg);
if (s.ok()) {
counters_.opens++;
r->reset(new CountedSequentialFile(std::move(base), this));
}
return s;
}
IOStatus CountedFileSystem::NewRandomAccessFile(
const std::string& f, const FileOptions& options,
std::unique_ptr<FSRandomAccessFile>* r, IODebugContext* dbg) {
std::unique_ptr<FSRandomAccessFile> base;
IOStatus s = target()->NewRandomAccessFile(f, options, &base, dbg);
if (s.ok()) {
counters_.opens++;
r->reset(new CountedRandomAccessFile(std::move(base), this));
}
return s;
}
IOStatus CountedFileSystem::NewWritableFile(const std::string& f,
const FileOptions& options,
std::unique_ptr<FSWritableFile>* r,
IODebugContext* dbg) {
std::unique_ptr<FSWritableFile> base;
IOStatus s = target()->NewWritableFile(f, options, &base, dbg);
if (s.ok()) {
counters_.opens++;
r->reset(new CountedWritableFile(std::move(base), this));
}
return s;
}
IOStatus CountedFileSystem::ReopenWritableFile(
const std::string& fname, const FileOptions& options,
std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
std::unique_ptr<FSWritableFile> base;
IOStatus s = target()->ReopenWritableFile(fname, options, &base, dbg);
if (s.ok()) {
counters_.opens++;
result->reset(new CountedWritableFile(std::move(base), this));
}
return s;
}
IOStatus CountedFileSystem::ReuseWritableFile(
const std::string& fname, const std::string& old_fname,
const FileOptions& options, std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) {
std::unique_ptr<FSWritableFile> base;
IOStatus s =
target()->ReuseWritableFile(fname, old_fname, options, &base, dbg);
if (s.ok()) {
counters_.opens++;
result->reset(new CountedWritableFile(std::move(base), this));
}
return s;
}
IOStatus CountedFileSystem::NewRandomRWFile(
const std::string& name, const FileOptions& options,
std::unique_ptr<FSRandomRWFile>* result, IODebugContext* dbg) {
std::unique_ptr<FSRandomRWFile> base;
IOStatus s = target()->NewRandomRWFile(name, options, &base, dbg);
if (s.ok()) {
counters_.opens++;
result->reset(new CountedRandomRWFile(std::move(base), this));
}
return s;
}
IOStatus CountedFileSystem::NewDirectory(const std::string& name,
const IOOptions& options,
std::unique_ptr<FSDirectory>* result,
IODebugContext* dbg) {
std::unique_ptr<FSDirectory> base;
IOStatus s = target()->NewDirectory(name, options, &base, dbg);
if (s.ok()) {
counters_.opens++;
result->reset(new CountedDirectory(std::move(base), this));
}
return s;
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,152 @@
// Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <atomic>
#include <memory>
#include "rocksdb/file_system.h"
#include "rocksdb/io_status.h"
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
class Logger;
struct OpCounter {
std::atomic<int> ops;
std::atomic<uint64_t> bytes;
OpCounter() : ops(0), bytes(0) {}
void Reset() {
ops = 0;
bytes = 0;
}
void RecordOp(const IOStatus& io_s, size_t added_bytes) {
if (!io_s.IsNotSupported()) {
ops.fetch_add(1, std::memory_order_relaxed);
}
if (io_s.ok()) {
bytes.fetch_add(added_bytes, std::memory_order_relaxed);
}
}
};
struct FileOpCounters {
static const char* kName() { return "FileOpCounters"; }
std::atomic<int> opens;
std::atomic<int> closes;
std::atomic<int> deletes;
std::atomic<int> renames;
std::atomic<int> flushes;
std::atomic<int> syncs;
std::atomic<int> dsyncs;
std::atomic<int> fsyncs;
OpCounter reads;
OpCounter writes;
FileOpCounters()
: opens(0),
closes(0),
deletes(0),
renames(0),
flushes(0),
syncs(0),
dsyncs(0),
fsyncs(0) {}
void Reset() {
opens = 0;
closes = 0;
deletes = 0;
renames = 0;
flushes = 0;
syncs = 0;
dsyncs = 0;
fsyncs = 0;
reads.Reset();
writes.Reset();
}
std::string PrintCounters() const;
};
// A FileSystem class that counts operations (reads, writes, opens, closes, etc)
class CountedFileSystem : public FileSystemWrapper {
public:
private:
FileOpCounters counters_;
public:
explicit CountedFileSystem(const std::shared_ptr<FileSystem>& base);
static const char* kClassName() { return "CountedFileSystem"; }
const char* Name() const override { return kClassName(); }
IOStatus NewSequentialFile(const std::string& f, const FileOptions& options,
std::unique_ptr<FSSequentialFile>* r,
IODebugContext* dbg) override;
IOStatus NewRandomAccessFile(const std::string& f,
const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* r,
IODebugContext* dbg) override;
IOStatus NewWritableFile(const std::string& f, const FileOptions& options,
std::unique_ptr<FSWritableFile>* r,
IODebugContext* dbg) override;
IOStatus ReopenWritableFile(const std::string& fname,
const FileOptions& options,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override;
IOStatus ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override;
IOStatus NewRandomRWFile(const std::string& name, const FileOptions& options,
std::unique_ptr<FSRandomRWFile>* result,
IODebugContext* dbg) override;
IOStatus NewDirectory(const std::string& name, const IOOptions& io_opts,
std::unique_ptr<FSDirectory>* result,
IODebugContext* dbg) override;
IOStatus DeleteFile(const std::string& fname, const IOOptions& options,
IODebugContext* dbg) override {
IOStatus s = target()->DeleteFile(fname, options, dbg);
if (s.ok()) {
counters_.deletes++;
}
return s;
}
IOStatus RenameFile(const std::string& s, const std::string& t,
const IOOptions& options, IODebugContext* dbg) override {
IOStatus st = target()->RenameFile(s, t, options, dbg);
if (st.ok()) {
counters_.renames++;
}
return st;
}
const FileOpCounters* counters() const { return &counters_; }
FileOpCounters* counters() { return &counters_; }
const void* GetOptionsPtr(const std::string& name) const override {
if (name == FileOpCounters::kName()) {
return counters();
} else {
return FileSystemWrapper::GetOptionsPtr(name);
}
}
// Prints the counters to a string
std::string PrintCounters() const { return counters_.PrintCounters(); }
void ResetCounters() { counters_.Reset(); }
};
} // namespace ROCKSDB_NAMESPACE
Loading…
Cancel
Save