diff --git a/db/db_bench.cc b/db/db_bench.cc index fcc930e67..79572e875 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -575,6 +575,8 @@ DEFINE_string(merge_operator, "", "The merge operator to use with the database." DEFINE_int32(skip_list_lookahead, 0, "Used with skip_list memtablerep; try " "linear search first for this many steps from the previous " "position"); +DEFINE_bool(report_file_operations, false, "if report number of file " + "operations"); static const bool FLAGS_soft_rate_limit_dummy __attribute__((unused)) = RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit); @@ -606,6 +608,131 @@ static const bool FLAGS_table_cache_numshardbits_dummy __attribute__((unused)) = namespace rocksdb { +namespace { +struct ReportFileOpCounters { + std::atomic open_counter_; + std::atomic read_counter_; + std::atomic append_counter_; + std::atomic bytes_read_; + std::atomic bytes_written_; +}; + +// A special Env to records and report file operations in db_bench +class ReportFileOpEnv : public EnvWrapper { + public: + explicit ReportFileOpEnv(Env* base) : EnvWrapper(base) { reset(); } + + void reset() { + counters_.open_counter_ = 0; + counters_.read_counter_ = 0; + counters_.append_counter_ = 0; + counters_.bytes_read_ = 0; + counters_.bytes_written_ = 0; + } + + Status NewSequentialFile(const std::string& f, unique_ptr* r, + const EnvOptions& soptions) { + class CountingFile : public SequentialFile { + private: + unique_ptr target_; + ReportFileOpCounters* counters_; + + public: + CountingFile(unique_ptr&& target, + ReportFileOpCounters* counters) + : target_(std::move(target)), counters_(counters) {} + + virtual Status Read(size_t n, Slice* result, char* scratch) { + counters_->read_counter_.fetch_add(1, std::memory_order_relaxed); + Status rv = target_->Read(n, result, scratch); + counters_->bytes_read_.fetch_add(result->size(), + std::memory_order_relaxed); + return rv; + } + + virtual Status Skip(uint64_t n) { return target_->Skip(n); } + }; + + Status s = target()->NewSequentialFile(f, r, soptions); + if (s.ok()) { + counters()->open_counter_.fetch_add(1, std::memory_order_relaxed); + r->reset(new CountingFile(std::move(*r), counters())); + } + return s; + } + + Status NewRandomAccessFile(const std::string& f, + unique_ptr* r, + const EnvOptions& soptions) { + class CountingFile : public RandomAccessFile { + private: + unique_ptr target_; + ReportFileOpCounters* counters_; + + public: + CountingFile(unique_ptr&& target, + ReportFileOpCounters* counters) + : target_(std::move(target)), counters_(counters) {} + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + counters_->read_counter_.fetch_add(1, std::memory_order_relaxed); + Status rv = target_->Read(offset, n, result, scratch); + counters_->bytes_read_.fetch_add(result->size(), + std::memory_order_relaxed); + return rv; + } + }; + + Status s = target()->NewRandomAccessFile(f, r, soptions); + if (s.ok()) { + counters()->open_counter_.fetch_add(1, std::memory_order_relaxed); + r->reset(new CountingFile(std::move(*r), counters())); + } + return s; + } + + Status NewWritableFile(const std::string& f, unique_ptr* r, + const EnvOptions& soptions) { + class CountingFile : public WritableFile { + private: + unique_ptr target_; + ReportFileOpCounters* counters_; + + public: + CountingFile(unique_ptr&& target, + ReportFileOpCounters* counters) + : target_(std::move(target)), counters_(counters) {} + + Status Append(const Slice& data) { + counters_->append_counter_.fetch_add(1, std::memory_order_relaxed); + Status rv = target_->Append(data); + counters_->bytes_written_.fetch_add(data.size(), + std::memory_order_relaxed); + return rv; + } + + Status Close() { return target_->Close(); } + Status Flush() { return target_->Flush(); } + Status Sync() { return target_->Sync(); } + }; + + Status s = target()->NewWritableFile(f, r, soptions); + if (s.ok()) { + counters()->open_counter_.fetch_add(1, std::memory_order_relaxed); + r->reset(new CountingFile(std::move(*r), counters())); + } + return s; + } + + // getter + ReportFileOpCounters* counters() { return &counters_; } + + private: + ReportFileOpCounters counters_; +}; + +} // namespace + // Helper for quickly generating random data. class RandomGenerator { private: @@ -810,6 +937,21 @@ class Stats { if (FLAGS_histogram) { fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); } + if (FLAGS_report_file_operations) { + ReportFileOpEnv* env = static_cast(FLAGS_env); + ReportFileOpCounters* counters = env->counters(); + fprintf(stdout, "Num files opened: %d\n", + counters->open_counter_.load(std::memory_order_relaxed)); + fprintf(stdout, "Num Read(): %d\n", + counters->read_counter_.load(std::memory_order_relaxed)); + fprintf(stdout, "Num Append(): %d\n", + counters->append_counter_.load(std::memory_order_relaxed)); + fprintf(stdout, "Num bytes read: %" PRIu64 "\n", + counters->bytes_read_.load(std::memory_order_relaxed)); + fprintf(stdout, "Num bytes written: %" PRIu64 "\n", + counters->bytes_written_.load(std::memory_order_relaxed)); + env->reset(); + } fflush(stdout); } }; @@ -899,6 +1041,7 @@ class Benchmark { int64_t writes_; int64_t readwrites_; int64_t merge_keys_; + bool report_file_operations_; bool SanityCheck() { if (FLAGS_compression_ratio > 1) { @@ -1118,7 +1261,18 @@ class Benchmark { readwrites_((FLAGS_writes < 0 && FLAGS_reads < 0)? FLAGS_num : ((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads) ), - merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys) { + merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys), + report_file_operations_(FLAGS_report_file_operations) { + if (report_file_operations_) { + if (!FLAGS_hdfs.empty()) { + fprintf(stderr, + "--hdfs and --report_file_operations cannot be enabled " + "at the same time"); + exit(1); + } + FLAGS_env = new ReportFileOpEnv(rocksdb::Env::Default()); + } + if (FLAGS_prefix_size > FLAGS_key_size) { fprintf(stderr, "prefix size is larger than key size"); exit(1);