From d59d90bb1f86934f0effc0b92c3d062eb41cdcb6 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 12 Jun 2015 14:31:53 -0700 Subject: [PATCH] db_bench periodically writes QPS to CSV file Summary: This is part of an effort to better understand and optimize RocksDB stalls under high load. I added a feature to db_bench to periodically write QPS to CSV files. That way we can nicely see how our QPS changes in time (especially when DB is stalled) and can do a better job of evaluating our stall system (i.e. we want the QPS to be as constant as possible, as opposed to having bunch of stalls) Cool part of CSV files is that we can easily graph them -- there are a bunch of tools available. Test Plan: Ran ./db_bench --report_interval_seconds=10 --benchmarks=fillrandom --num=10000000 and observed this in report.csv: secs_elapsed,interval_qps 10,2725860 20,1980480 30,1863456 40,1454359 50,1460389 Reviewers: sdong, MarkCallaghan, rven, yhchiang Reviewed By: yhchiang Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D40047 --- HISTORY.md | 1 + db/db_bench.cc | 119 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+) diff --git a/HISTORY.md b/HISTORY.md index 7f8efa535..0a0b60891 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ### New Features * Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info. +* Added a new way to report QPS from db_bench (check out --report_file and --report_interval_seconds) ### Public API changes * EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters. diff --git a/db/db_bench.cc b/db/db_bench.cc index 0de99c974..a50b66bed 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -32,6 +32,12 @@ int main() { #include #include #include + +#include +#include +#include +#include + #include "db/db_impl.h" #include "db/version_set.h" #include "rocksdb/options.h" @@ -522,6 +528,14 @@ DEFINE_int64(stats_interval_seconds, 0, "Report stats every N seconds. This " DEFINE_int32(stats_per_interval, 0, "Reports additional stats per interval when" " this is greater than 0."); +DEFINE_int64(report_interval_seconds, 0, + "If greater than zero, it will write simple stats in CVS format " + "to --report_file every N seconds"); + +DEFINE_string(report_file, "report.csv", + "Filename where some simple stats are reported to (if " + "--report_interval_seconds is bigger than 0)"); + DEFINE_int32(thread_status_per_interval, 0, "Takes and report a snapshot of the current status of each thread" " when this is greater than 0."); @@ -950,6 +964,96 @@ struct DBWithColumnFamilies { } }; +// a class that reports stats to CSV file +class ReporterAgent { + public: + ReporterAgent(Env* env, const std::string& fname, + uint64_t report_interval_secs) + : env_(env), + total_ops_done_(0), + last_report_(0), + report_interval_secs_(report_interval_secs), + stop_(false) { + auto s = env_->NewWritableFile(fname, &report_file_, EnvOptions()); + if (s.ok()) { + s = report_file_->Append(Header() + "\n"); + } + if (s.ok()) { + s = report_file_->Flush(); + } + if (!s.ok()) { + fprintf(stderr, "Can't open %s: %s\n", fname.c_str(), + s.ToString().c_str()); + abort(); + } + + reporting_thread_ = std::thread([&]() { SleepAndReport(); }); + } + + ~ReporterAgent() { + { + std::unique_lock lk(mutex_); + stop_ = true; + stop_cv_.notify_all(); + } + reporting_thread_.join(); + } + + // thread safe + void ReportFinishedOps(int64_t num_ops) { + total_ops_done_.fetch_add(num_ops); + } + + private: + std::string Header() const { return "secs_elapsed,interval_qps"; } + void SleepAndReport() { + uint64_t kMicrosInSecond = 1000 * 1000; + auto time_started = env_->NowMicros(); + while (true) { + { + std::unique_lock lk(mutex_); + if (stop_ || + stop_cv_.wait_for(lk, std::chrono::seconds(report_interval_secs_), + [&]() { return stop_; })) { + // stopping + break; + } + // else -> timeout, which means time for a report! + } + auto total_ops_done_snapshot = total_ops_done_.load(); + // round the seconds elapsed + auto secs_elapsed = + (env_->NowMicros() - time_started + kMicrosInSecond / 2) / + kMicrosInSecond; + std::string report = ToString(secs_elapsed) + "," + + ToString(total_ops_done_snapshot - last_report_) + + "\n"; + auto s = report_file_->Append(report); + if (s.ok()) { + s = report_file_->Flush(); + } + if (!s.ok()) { + fprintf(stderr, + "Can't write to report file (%s), stopping the reporting\n", + s.ToString().c_str()); + break; + } + last_report_ = total_ops_done_snapshot; + } + } + + Env* env_; + std::unique_ptr report_file_; + std::atomic total_ops_done_; + int64_t last_report_; + const uint64_t report_interval_secs_; + std::thread reporting_thread_; + std::mutex mutex_; + // will notify on stop + std::condition_variable stop_cv_; + bool stop_; +}; + class Stats { private: int id_; @@ -965,10 +1069,15 @@ class Stats { HistogramImpl hist_; std::string message_; bool exclude_from_merge_; + ReporterAgent* reporter_agent_; // does not own public: Stats() { Start(-1); } + void SetReporterAgent(ReporterAgent* reporter_agent) { + reporter_agent_ = reporter_agent; + } + void Start(int id) { id_ = id; next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100; @@ -1044,6 +1153,9 @@ class Stats { } void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops) { + if (reporter_agent_) { + reporter_agent_->ReportFinishedOps(num_ops); + } if (FLAGS_histogram) { double now = FLAGS_env->NowMicros(); double micros = now - last_op_finish_; @@ -1839,6 +1951,12 @@ class Benchmark { shared.num_done = 0; shared.start = false; + std::unique_ptr reporter_agent; + if (FLAGS_report_interval_seconds > 0) { + reporter_agent.reset(new ReporterAgent(FLAGS_env, FLAGS_report_file, + FLAGS_report_interval_seconds)); + } + ThreadArg* arg = new ThreadArg[n]; for (int i = 0; i < n; i++) { @@ -1863,6 +1981,7 @@ class Benchmark { arg[i].method = method; arg[i].shared = &shared; arg[i].thread = new ThreadState(i); + arg[i].thread->stats.SetReporterAgent(reporter_agent.get()); arg[i].thread->shared = &shared; FLAGS_env->StartThread(ThreadBody, &arg[i]); }