diff --git a/HISTORY.md b/HISTORY.md index 7f8efa535..0a0b60891 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ### New Features * Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info. +* Added a new way to report QPS from db_bench (check out --report_file and --report_interval_seconds) ### Public API changes * EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters. diff --git a/db/db_bench.cc b/db/db_bench.cc index 0de99c974..a50b66bed 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -32,6 +32,12 @@ int main() { #include #include #include + +#include +#include +#include +#include + #include "db/db_impl.h" #include "db/version_set.h" #include "rocksdb/options.h" @@ -522,6 +528,14 @@ DEFINE_int64(stats_interval_seconds, 0, "Report stats every N seconds. This " DEFINE_int32(stats_per_interval, 0, "Reports additional stats per interval when" " this is greater than 0."); +DEFINE_int64(report_interval_seconds, 0, + "If greater than zero, it will write simple stats in CVS format " + "to --report_file every N seconds"); + +DEFINE_string(report_file, "report.csv", + "Filename where some simple stats are reported to (if " + "--report_interval_seconds is bigger than 0)"); + DEFINE_int32(thread_status_per_interval, 0, "Takes and report a snapshot of the current status of each thread" " when this is greater than 0."); @@ -950,6 +964,96 @@ struct DBWithColumnFamilies { } }; +// a class that reports stats to CSV file +class ReporterAgent { + public: + ReporterAgent(Env* env, const std::string& fname, + uint64_t report_interval_secs) + : env_(env), + total_ops_done_(0), + last_report_(0), + report_interval_secs_(report_interval_secs), + stop_(false) { + auto s = env_->NewWritableFile(fname, &report_file_, EnvOptions()); + if (s.ok()) { + s = report_file_->Append(Header() + "\n"); + } + if (s.ok()) { + s = report_file_->Flush(); + } + if (!s.ok()) { + fprintf(stderr, "Can't open %s: %s\n", fname.c_str(), + s.ToString().c_str()); + abort(); + } + + reporting_thread_ = std::thread([&]() { SleepAndReport(); }); + } + + ~ReporterAgent() { + { + std::unique_lock lk(mutex_); + stop_ = true; + stop_cv_.notify_all(); + } + reporting_thread_.join(); + } + + // thread safe + void ReportFinishedOps(int64_t num_ops) { + total_ops_done_.fetch_add(num_ops); + } + + private: + std::string Header() const { return "secs_elapsed,interval_qps"; } + void SleepAndReport() { + uint64_t kMicrosInSecond = 1000 * 1000; + auto time_started = env_->NowMicros(); + while (true) { + { + std::unique_lock lk(mutex_); + if (stop_ || + stop_cv_.wait_for(lk, std::chrono::seconds(report_interval_secs_), + [&]() { return stop_; })) { + // stopping + break; + } + // else -> timeout, which means time for a report! + } + auto total_ops_done_snapshot = total_ops_done_.load(); + // round the seconds elapsed + auto secs_elapsed = + (env_->NowMicros() - time_started + kMicrosInSecond / 2) / + kMicrosInSecond; + std::string report = ToString(secs_elapsed) + "," + + ToString(total_ops_done_snapshot - last_report_) + + "\n"; + auto s = report_file_->Append(report); + if (s.ok()) { + s = report_file_->Flush(); + } + if (!s.ok()) { + fprintf(stderr, + "Can't write to report file (%s), stopping the reporting\n", + s.ToString().c_str()); + break; + } + last_report_ = total_ops_done_snapshot; + } + } + + Env* env_; + std::unique_ptr report_file_; + std::atomic total_ops_done_; + int64_t last_report_; + const uint64_t report_interval_secs_; + std::thread reporting_thread_; + std::mutex mutex_; + // will notify on stop + std::condition_variable stop_cv_; + bool stop_; +}; + class Stats { private: int id_; @@ -965,10 +1069,15 @@ class Stats { HistogramImpl hist_; std::string message_; bool exclude_from_merge_; + ReporterAgent* reporter_agent_; // does not own public: Stats() { Start(-1); } + void SetReporterAgent(ReporterAgent* reporter_agent) { + reporter_agent_ = reporter_agent; + } + void Start(int id) { id_ = id; next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100; @@ -1044,6 +1153,9 @@ class Stats { } void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops) { + if (reporter_agent_) { + reporter_agent_->ReportFinishedOps(num_ops); + } if (FLAGS_histogram) { double now = FLAGS_env->NowMicros(); double micros = now - last_op_finish_; @@ -1839,6 +1951,12 @@ class Benchmark { shared.num_done = 0; shared.start = false; + std::unique_ptr reporter_agent; + if (FLAGS_report_interval_seconds > 0) { + reporter_agent.reset(new ReporterAgent(FLAGS_env, FLAGS_report_file, + FLAGS_report_interval_seconds)); + } + ThreadArg* arg = new ThreadArg[n]; for (int i = 0; i < n; i++) { @@ -1863,6 +1981,7 @@ class Benchmark { arg[i].method = method; arg[i].shared = &shared; arg[i].thread = new ThreadState(i); + arg[i].thread->stats.SetReporterAgent(reporter_agent.get()); arg[i].thread->shared = &shared; FLAGS_env->StartThread(ThreadBody, &arg[i]); }