From 32a6dd9a4198023b641864f0b60913af8dbd8aab Mon Sep 17 00:00:00 2001 From: Alexander Zinoviev Date: Tue, 29 Jan 2019 16:23:21 -0800 Subject: [PATCH] Add a new CPU time counter to compaction report (#4889) Summary: Measure CPU time consumed for a compaction and report it in the stats report Enable NowCPUNanos() to work for MacOS Pull Request resolved: https://github.com/facebook/rocksdb/pull/4889 Differential Revision: D13701276 Pulled By: zinoale fbshipit-source-id: 5024e5bbccd4dd10fd90d947870237f436445055 --- db/builder.cc | 6 ++--- db/compaction_job.cc | 27 ++++++++++++++++--- db/db_impl_open.cc | 4 +-- db/db_impl_write.cc | 2 +- db/db_test2.cc | 2 +- db/flush_job.cc | 10 +++++++ db/internal_stats.cc | 15 +++++++---- db/internal_stats.h | 12 +++++++-- db/merge_helper.cc | 4 +-- db/version_set.cc | 4 +-- env/env_posix.cc | 3 ++- include/rocksdb/compaction_job_stats.h | 3 +++ include/rocksdb/iostats_context.h | 4 +++ include/rocksdb/statistics.h | 1 + monitoring/iostats_context_imp.h | 7 +++++ monitoring/statistics.cc | 1 + table/sst_file_writer.cc | 6 ++--- tools/db_bench_tool.cc | 8 +++--- util/compaction_job_stats_impl.cc | 2 ++ util/compression.h | 8 ++---- util/file_reader_writer.cc | 25 ++++++++++++++--- util/file_reader_writer.h | 5 +++- .../transactions/write_prepared_txn_db.h | 11 ++++---- 23 files changed, 125 insertions(+), 45 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index b13b68aeb..06eea2405 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -121,9 +121,9 @@ Status BuildTable( file->SetIOPriority(io_priority); file->SetWriteLifeTimeHint(write_hint); - file_writer.reset(new WritableFileWriter(std::move(file), fname, - env_options, ioptions.statistics, - ioptions.listeners)); + file_writer.reset( + new WritableFileWriter(std::move(file), fname, env_options, env, + ioptions.statistics, ioptions.listeners)); builder = NewTableBuilder( ioptions, mutable_cf_options, internal_comparator, int_tbl_prop_collector_factories, column_family_id, diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 55ae0d864..58f34d165 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -604,7 +604,14 @@ Status CompactionJob::Run() { } compaction_stats_.micros = env_->NowMicros() - start_micros; + compaction_stats_.cpu_micros = 0; + for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) { + compaction_stats_.cpu_micros += + compact_->sub_compact_states[i].compaction_job_stats.cpu_micros; + } + MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros); + MeasureTime(stats_, COMPACTION_CPU_TIME, compaction_stats_.cpu_micros); TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify"); @@ -767,6 +774,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { stream << "job" << job_id_ << "event" << "compaction_finished" << "compaction_time_micros" << compaction_stats_.micros + << "compaction_time_cpu_micros" << compaction_stats_.cpu_micros << "output_level" << compact_->compaction->output_level() << "num_output_files" << compact_->NumOutputFiles() << "total_output_size" << compact_->total_bytes << "num_input_records" @@ -804,6 +812,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { assert(sub_compact != nullptr); + + uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000; + ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(), existing_snapshots_); @@ -823,13 +834,17 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { uint64_t prev_fsync_nanos = 0; uint64_t prev_range_sync_nanos = 0; uint64_t prev_prepare_write_nanos = 0; + uint64_t prev_cpu_write_nanos = 0; + uint64_t prev_cpu_read_nanos = 0; if (measure_io_stats_) { prev_perf_level = GetPerfLevel(); - SetPerfLevel(PerfLevel::kEnableTime); + SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); prev_write_nanos = IOSTATS(write_nanos); prev_fsync_nanos = IOSTATS(fsync_nanos); prev_range_sync_nanos = IOSTATS(range_sync_nanos); prev_prepare_write_nanos = IOSTATS(prepare_write_nanos); + prev_cpu_write_nanos = IOSTATS(cpu_write_nanos); + prev_cpu_read_nanos = IOSTATS(cpu_read_nanos); } const MutableCFOptions* mutable_cf_options = @@ -1107,6 +1122,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); } + sub_compact->compaction_job_stats.cpu_micros = + env_->NowCPUNanos() / 1000 - prev_cpu_micros; + if (measure_io_stats_) { sub_compact->compaction_job_stats.file_write_nanos += IOSTATS(write_nanos) - prev_write_nanos; @@ -1116,7 +1134,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { IOSTATS(range_sync_nanos) - prev_range_sync_nanos; sub_compact->compaction_job_stats.file_prepare_write_nanos += IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos; - if (prev_perf_level != PerfLevel::kEnableTime) { + sub_compact->compaction_job_stats.cpu_micros -= + (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos + + IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) / 1000; + if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) { SetPerfLevel(prev_perf_level); } } @@ -1527,7 +1548,7 @@ Status CompactionJob::OpenCompactionOutputFile( sub_compact->compaction->immutable_cf_options()->listeners; sub_compact->outfile.reset( new WritableFileWriter(std::move(writable_file), fname, env_options_, - db_options_.statistics.get(), listeners)); + env_, db_options_.statistics.get(), listeners)); // If the Column family flag is to only optimize filters for hits, // we can skip creating filters if this is the bottommost_level where diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 51c9fb7ca..cff8e7e9f 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -237,7 +237,7 @@ Status DBImpl::NewDB() { file->SetPreallocationBlockSize( immutable_db_options_.manifest_preallocation_size); std::unique_ptr file_writer(new WritableFileWriter( - std::move(file), manifest, env_options, nullptr /* stats */, + std::move(file), manifest, env_options, env_, nullptr /* stats */, immutable_db_options_.listeners)); log::Writer log(std::move(file_writer), 0, false); std::string record; @@ -1176,7 +1176,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, const auto& listeners = impl->immutable_db_options_.listeners; std::unique_ptr file_writer( new WritableFileWriter(std::move(lfile), log_fname, opt_env_options, - nullptr /* stats */, listeners)); + impl->env_, nullptr /* stats */, listeners)); impl->logs_.emplace_back( new_log_number, new log::Writer( diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 885832f5a..40bd6e26c 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -1456,7 +1456,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { lfile->SetPreallocationBlockSize(preallocate_block_size); lfile->SetWriteLifeTimeHint(write_hint); std::unique_ptr file_writer(new WritableFileWriter( - std::move(lfile), log_fname, opt_env_opt, nullptr /* stats */, + std::move(lfile), log_fname, opt_env_opt, env_, nullptr /* stats */, immutable_db_options_.listeners)); new_log = new log::Writer( std::move(file_writer), new_log_number, diff --git a/db/db_test2.cc b/db/db_test2.cc index d0638875c..869920c17 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -1763,7 +1763,7 @@ TEST_F(DBTest2, TestPerfContextCpuTime) { SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); ASSERT_EQ("bar", Get("foo")); - ASSERT_EQ(env_->now_cpu_count_.load(), 2); + ASSERT_GT(env_->now_cpu_count_.load(), 2); ASSERT_LT(get_perf_context()->get_cpu_nanos, kDummyAddonTime); ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonTime); diff --git a/db/flush_job.cc b/db/flush_job.cc index 8769c849e..8398bd8f8 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -211,6 +211,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, uint64_t prev_fsync_nanos = 0; uint64_t prev_range_sync_nanos = 0; uint64_t prev_prepare_write_nanos = 0; + uint64_t prev_cpu_write_nanos = 0; + uint64_t prev_cpu_read_nanos = 0; if (measure_io_stats_) { prev_perf_level = GetPerfLevel(); SetPerfLevel(PerfLevel::kEnableTime); @@ -218,6 +220,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, prev_fsync_nanos = IOSTATS(fsync_nanos); prev_range_sync_nanos = IOSTATS(range_sync_nanos); prev_prepare_write_nanos = IOSTATS(prepare_write_nanos); + prev_cpu_write_nanos = IOSTATS(cpu_write_nanos); + prev_cpu_read_nanos = IOSTATS(cpu_read_nanos); } // This will release and re-acquire the mutex. @@ -269,6 +273,10 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos); stream << "file_prepare_write_nanos" << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos); + stream << "file_cpu_write_nanos" + << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos); + stream << "file_cpu_read_nanos" + << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos); } return s; @@ -285,6 +293,7 @@ Status FlushJob::WriteLevel0Table() { ThreadStatus::STAGE_FLUSH_WRITE_L0); db_mutex_->AssertHeld(); const uint64_t start_micros = db_options_.env->NowMicros(); + const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000; Status s; { auto write_hint = cfd_->CalculateSSTWriteHint(0); @@ -401,6 +410,7 @@ Status FlushJob::WriteLevel0Table() { // Note that here we treat flush as level 0 compaction in internal stats InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); stats.micros = db_options_.env->NowMicros() - start_micros; + stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros; stats.bytes_written = meta_.fd.GetFileSize(); MeasureTime(stats_, FLUSH_TIME, stats.micros); cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index b330a40f1..6d58552e2 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -45,6 +45,8 @@ const std::map InternalStats::compaction_level_stats = {LevelStatType::READ_MBPS, LevelStat{"ReadMBps", "Rd(MB/s)"}}, {LevelStatType::WRITE_MBPS, LevelStat{"WriteMBps", "Wr(MB/s)"}}, {LevelStatType::COMP_SEC, LevelStat{"CompSec", "Comp(sec)"}}, + {LevelStatType::COMP_CPU_SEC, + LevelStat{"CompMergeCPU", "CompMergeCPU(sec)"}}, {LevelStatType::COMP_COUNT, LevelStat{"CompCount", "Comp(cnt)"}}, {LevelStatType::AVG_SEC, LevelStat{"AvgSec", "Avg(sec)"}}, {LevelStatType::KEY_IN, LevelStat{"KeyIn", "KeyIn"}}, @@ -64,7 +66,7 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name) { }; int line_size = snprintf( buf + written_size, len - written_size, - "Level %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n", + "Level %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n", // Note that we skip COMPACTED_FILES and merge it with Files column hdr(LevelStatType::NUM_FILES), hdr(LevelStatType::SIZE_BYTES), hdr(LevelStatType::SCORE), hdr(LevelStatType::READ_GB), @@ -72,9 +74,9 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name) { hdr(LevelStatType::WRITE_GB), hdr(LevelStatType::W_NEW_GB), hdr(LevelStatType::MOVED_GB), hdr(LevelStatType::WRITE_AMP), hdr(LevelStatType::READ_MBPS), hdr(LevelStatType::WRITE_MBPS), - hdr(LevelStatType::COMP_SEC), hdr(LevelStatType::COMP_COUNT), - hdr(LevelStatType::AVG_SEC), hdr(LevelStatType::KEY_IN), - hdr(LevelStatType::KEY_DROP)); + hdr(LevelStatType::COMP_SEC), hdr(LevelStatType::COMP_CPU_SEC), + hdr(LevelStatType::COMP_COUNT), hdr(LevelStatType::AVG_SEC), + hdr(LevelStatType::KEY_IN), hdr(LevelStatType::KEY_DROP)); written_size += line_size; snprintf(buf + written_size, len - written_size, "%s\n", @@ -106,6 +108,7 @@ void PrepareLevelStats(std::map* level_stats, (*level_stats)[LevelStatType::WRITE_MBPS] = stats.bytes_written / kMB / elapsed; (*level_stats)[LevelStatType::COMP_SEC] = stats.micros / kMicrosInSec; + (*level_stats)[LevelStatType::COMP_CPU_SEC] = stats.cpu_micros / kMicrosInSec; (*level_stats)[LevelStatType::COMP_COUNT] = stats.count; (*level_stats)[LevelStatType::AVG_SEC] = stats.count == 0 ? 0 : stats.micros / kMicrosInSec / stats.count; @@ -132,7 +135,8 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name, "%5.1f " /* W-Amp */ "%8.1f " /* Rd(MB/s) */ "%8.1f " /* Wr(MB/s) */ - "%9.0f " /* Comp(sec) */ + "%9.2f " /* Comp(sec) */ + "%17.2f " /* CompMergeCPU(sec) */ "%9d " /* Comp(cnt) */ "%8.3f " /* Avg(sec) */ "%7s " /* KeyIn */ @@ -153,6 +157,7 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name, stat_value.at(LevelStatType::READ_MBPS), stat_value.at(LevelStatType::WRITE_MBPS), stat_value.at(LevelStatType::COMP_SEC), + stat_value.at(LevelStatType::COMP_CPU_SEC), static_cast(stat_value.at(LevelStatType::COMP_COUNT)), stat_value.at(LevelStatType::AVG_SEC), NumberToHumanString( diff --git a/db/internal_stats.h b/db/internal_stats.h index 6f0127513..6fa8727a4 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -71,6 +71,7 @@ enum class LevelStatType { READ_MBPS, WRITE_MBPS, COMP_SEC, + COMP_CPU_SEC, COMP_COUNT, AVG_SEC, KEY_IN, @@ -135,6 +136,7 @@ class InternalStats { // compactions that produced data for the specified "level". struct CompactionStats { uint64_t micros; + uint64_t cpu_micros; // The number of bytes read from all non-output levels uint64_t bytes_read_non_output_levels; @@ -172,6 +174,7 @@ class InternalStats { explicit CompactionStats() : micros(0), + cpu_micros(0), bytes_read_non_output_levels(0), bytes_read_output_level(0), bytes_written(0), @@ -190,6 +193,7 @@ class InternalStats { explicit CompactionStats(CompactionReason reason, int c) : micros(0), + cpu_micros(0), bytes_read_non_output_levels(0), bytes_read_output_level(0), bytes_written(0), @@ -214,14 +218,14 @@ class InternalStats { explicit CompactionStats(const CompactionStats& c) : micros(c.micros), + cpu_micros(c.cpu_micros), bytes_read_non_output_levels(c.bytes_read_non_output_levels), bytes_read_output_level(c.bytes_read_output_level), bytes_written(c.bytes_written), bytes_moved(c.bytes_moved), num_input_files_in_non_output_levels( c.num_input_files_in_non_output_levels), - num_input_files_in_output_level( - c.num_input_files_in_output_level), + num_input_files_in_output_level(c.num_input_files_in_output_level), num_output_files(c.num_output_files), num_input_records(c.num_input_records), num_dropped_records(c.num_dropped_records), @@ -234,6 +238,7 @@ class InternalStats { void Clear() { this->micros = 0; + this->cpu_micros = 0; this->bytes_read_non_output_levels = 0; this->bytes_read_output_level = 0; this->bytes_written = 0; @@ -252,6 +257,7 @@ class InternalStats { void Add(const CompactionStats& c) { this->micros += c.micros; + this->cpu_micros += c.cpu_micros; this->bytes_read_non_output_levels += c.bytes_read_non_output_levels; this->bytes_read_output_level += c.bytes_read_output_level; this->bytes_written += c.bytes_written; @@ -272,6 +278,7 @@ class InternalStats { void Subtract(const CompactionStats& c) { this->micros -= c.micros; + this->cpu_micros -= c.cpu_micros; this->bytes_read_non_output_levels -= c.bytes_read_non_output_levels; this->bytes_read_output_level -= c.bytes_read_output_level; this->bytes_written -= c.bytes_written; @@ -595,6 +602,7 @@ class InternalStats { struct CompactionStats { uint64_t micros; + uint64_t cpu_micros; uint64_t bytes_read_non_output_levels; uint64_t bytes_read_output_level; uint64_t bytes_written; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 2c8046fbc..0f73576c0 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -298,8 +298,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // to combine the keys. Since VersionSet::SetupOtherInputs() always makes // sure that all merge-operands on the same level get compacted together, // this will simply lead to these merge operands moving to the next level. - bool surely_seen_the_beginning = (hit_the_next_user_key || !iter->Valid()) - && at_bottom; + bool surely_seen_the_beginning = + (hit_the_next_user_key || !iter->Valid()) && at_bottom; if (surely_seen_the_beginning) { // do a final merge with nullptr as the existing value and say // bye to the merge type (it's now converted to a Put) diff --git a/db/version_set.cc b/db/version_set.cc index 8cead5dc6..231771bdc 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3044,8 +3044,8 @@ Status VersionSet::ProcessManifestWrites( db_options_->manifest_preallocation_size); std::unique_ptr file_writer(new WritableFileWriter( - std::move(descriptor_file), descriptor_fname, opt_env_opts, nullptr, - db_options_->listeners)); + std::move(descriptor_file), descriptor_fname, opt_env_opts, env_, + nullptr, db_options_->listeners)); descriptor_log_.reset( new log::Writer(std::move(file_writer), 0, false)); s = WriteSnapshot(descriptor_log_.get()); diff --git a/env/env_posix.cc b/env/env_posix.cc index b64a13bb4..591be68f9 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -844,7 +844,8 @@ class PosixEnv : public Env { } virtual uint64_t NowCPUNanos() override { -#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) +#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \ + defined(__MACH__) struct timespec ts; clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts); return static_cast(ts.tv_sec) * 1000000000 + ts.tv_nsec; diff --git a/include/rocksdb/compaction_job_stats.h b/include/rocksdb/compaction_job_stats.h index e5d8af8bd..4021fcab2 100644 --- a/include/rocksdb/compaction_job_stats.h +++ b/include/rocksdb/compaction_job_stats.h @@ -18,6 +18,9 @@ struct CompactionJobStats { // the elapsed time of this compaction in microseconds. uint64_t elapsed_micros; + // the elapsed CPU time of this compaction in microseconds. + uint64_t cpu_micros; + // the number of compaction input records. uint64_t num_input_records; // the number of compaction input files. diff --git a/include/rocksdb/iostats_context.h b/include/rocksdb/iostats_context.h index 77a59643a..67f2b3217 100644 --- a/include/rocksdb/iostats_context.h +++ b/include/rocksdb/iostats_context.h @@ -44,6 +44,10 @@ struct IOStatsContext { uint64_t prepare_write_nanos; // time spent in Logger::Logv(). uint64_t logger_nanos; + // CPU time spent in write() and pwrite() + uint64_t cpu_write_nanos; + // CPU time spent in read() and pread() + uint64_t cpu_read_nanos; }; // Get Thread-local IOStatsContext object pointer diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index a814ea0b3..dc7750283 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -352,6 +352,7 @@ enum Histograms : uint32_t { DB_GET = 0, DB_WRITE, COMPACTION_TIME, + COMPACTION_CPU_TIME, SUBCOMPACTION_SETUP_TIME, TABLE_SYNC_MICROS, COMPACTION_OUTFILE_SYNC_MICROS, diff --git a/monitoring/iostats_context_imp.h b/monitoring/iostats_context_imp.h index 8af64f1fa..60370daae 100644 --- a/monitoring/iostats_context_imp.h +++ b/monitoring/iostats_context_imp.h @@ -37,6 +37,13 @@ extern __thread IOStatsContext iostats_context; PerfStepTimer iostats_step_timer_##metric(&(iostats_context.metric)); \ iostats_step_timer_##metric.Start(); +// Declare and set start time of the timer +#define IOSTATS_CPU_TIMER_GUARD(metric, env) \ + PerfStepTimer iostats_step_timer_##metric( \ + &(iostats_context.metric), env, true, \ + PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \ + iostats_step_timer_##metric.Start(); + #else // ROCKSDB_SUPPORT_THREAD_LOCAL #define IOSTATS_ADD(metric, value) diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index df843889b..a9479269d 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -184,6 +184,7 @@ const std::vector> HistogramsNameMap = { {DB_GET, "rocksdb.db.get.micros"}, {DB_WRITE, "rocksdb.db.write.micros"}, {COMPACTION_TIME, "rocksdb.compaction.times.micros"}, + {COMPACTION_CPU_TIME, "rocksdb.compaction.times.cpu_micros"}, {SUBCOMPACTION_SETUP_TIME, "rocksdb.subcompaction.setup.times.micros"}, {TABLE_SYNC_MICROS, "rocksdb.table.sync.micros"}, {COMPACTION_OUTFILE_SYNC_MICROS, "rocksdb.compaction.outfile.sync.micros"}, diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index a752504c8..9612aa5cd 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -237,9 +237,9 @@ Status SstFileWriter::Open(const std::string& file_path) { &int_tbl_prop_collector_factories, compression_type, compression_opts, nullptr /* compression_dict */, r->skip_filters, r->column_family_name, unknown_level); - r->file_writer.reset( - new WritableFileWriter(std::move(sst_file), file_path, r->env_options, - nullptr /* stats */, r->ioptions.listeners)); + r->file_writer.reset(new WritableFileWriter( + std::move(sst_file), file_path, r->env_options, r->ioptions.env, + nullptr /* stats */, r->ioptions.listeners)); // TODO(tec) : If table_factory is using compressed block cache, we will // be adding the external sst file blocks into it, which is wasteful. diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 70746d0e2..20f3cb273 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -4667,18 +4667,18 @@ void VerifyDBFromDB(std::string& truth_db_name) { int64_t seek = 0; int64_t seek_found = 0; int64_t bytes = 0; - const int64_t default_value_max = 64*1024*1024; + const int64_t default_value_max = 64 * 1024 * 1024; int64_t value_max = default_value_max; int64_t scan_len_max = FLAGS_mix_max_scan_len; double write_rate = 1000000.0; double read_rate = 1000000.0; - std::vector ratio {FLAGS_mix_get_ratio, - FLAGS_mix_put_ratio, FLAGS_mix_seek_ratio}; + std::vector ratio{FLAGS_mix_get_ratio, FLAGS_mix_put_ratio, + FLAGS_mix_seek_ratio}; char value_buffer[default_value_max]; QueryDecider query; RandomGenerator gen; Status s; - if(value_max > FLAGS_mix_max_value_size) { + if (value_max > FLAGS_mix_max_value_size) { value_max = FLAGS_mix_max_value_size; } diff --git a/util/compaction_job_stats_impl.cc b/util/compaction_job_stats_impl.cc index 612af6f27..a1ebc8b96 100644 --- a/util/compaction_job_stats_impl.cc +++ b/util/compaction_job_stats_impl.cc @@ -11,6 +11,7 @@ namespace rocksdb { void CompactionJobStats::Reset() { elapsed_micros = 0; + cpu_micros = 0; num_input_records = 0; num_input_files = 0; @@ -45,6 +46,7 @@ void CompactionJobStats::Reset() { void CompactionJobStats::Add(const CompactionJobStats& stats) { elapsed_micros += stats.elapsed_micros; + cpu_micros += stats.cpu_micros; num_input_records += stats.num_input_records; num_input_files += stats.num_input_files; diff --git a/util/compression.h b/util/compression.h index 3076b6260..9a1358a58 100644 --- a/util/compression.h +++ b/util/compression.h @@ -195,9 +195,7 @@ struct CompressionDict { } #if ZSTD_VERSION_NUMBER >= 700 - const ZSTD_CDict* GetDigestedZstdCDict() const { - return zstd_cdict_; - } + const ZSTD_CDict* GetDigestedZstdCDict() const { return zstd_cdict_; } #endif // ZSTD_VERSION_NUMBER >= 700 Slice GetRawDict() const { return dict_; } @@ -259,9 +257,7 @@ struct UncompressionDict { } #ifdef ROCKSDB_ZSTD_DDICT - const ZSTD_DDict* GetDigestedZstdDDict() const { - return zstd_ddict_; - } + const ZSTD_DDict* GetDigestedZstdDDict() const { return zstd_ddict_; } #endif // ROCKSDB_ZSTD_DDICT Slice GetRawDict() const { return dict_; } diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 42f7e2e35..6740a34f0 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -77,6 +77,7 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, StopWatch sw(env_, stats_, hist_type_, (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/, true /*delay_enabled*/); + auto prev_perf_level = GetPerfLevel(); IOSTATS_TIMER_GUARD(read_nanos); if (use_direct_io()) { #ifndef ROCKSDB_LITE @@ -105,8 +106,11 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, start_ts = std::chrono::system_clock::now(); orig_offset = aligned_offset + buf.CurrentSize(); } - s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp, - buf.Destination()); + { + IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); + s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp, + buf.Destination()); + } if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::system_clock::now(); NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts, @@ -151,7 +155,10 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, start_ts = std::chrono::system_clock::now(); } #endif - s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos); + { + IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); + s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos); + } #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::system_clock::now(); @@ -176,10 +183,12 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, *result = Slice(res_scratch, s.ok() ? pos : 0); } IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); + SetPerfLevel(prev_perf_level); } if (stats_ != nullptr && file_read_hist_ != nullptr) { file_read_hist_->Add(elapsed); } + return s; } @@ -409,11 +418,14 @@ Status WritableFileWriter::SyncInternal(bool use_fsync) { Status s; IOSTATS_TIMER_GUARD(fsync_nanos); TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); + auto prev_perf_level = GetPerfLevel(); + IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_); if (use_fsync) { s = writable_file_->Fsync(); } else { s = writable_file_->Sync(); } + SetPerfLevel(prev_perf_level); return s; } @@ -453,7 +465,12 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { old_size = next_write_offset_; } #endif - s = writable_file_->Append(Slice(src, allowed)); + { + auto prev_perf_level = GetPerfLevel(); + IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_); + s = writable_file_->Append(Slice(src, allowed)); + SetPerfLevel(prev_perf_level); + } #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::system_clock::now(); diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 1083c685c..4451f8b81 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -174,6 +174,7 @@ class WritableFileWriter { std::unique_ptr writable_file_; std::string file_name_; + Env* env_; AlignedBuffer buf_; size_t max_buffer_size_; // Actually written data size can be used for truncate @@ -195,10 +196,12 @@ class WritableFileWriter { public: WritableFileWriter( std::unique_ptr&& file, const std::string& _file_name, - const EnvOptions& options, Statistics* stats = nullptr, + const EnvOptions& options, Env* env = nullptr, + Statistics* stats = nullptr, const std::vector>& listeners = {}) : writable_file_(std::move(file)), file_name_(_file_name), + env_(env), buf_(), max_buffer_size_(options.writable_file_max_buffer_size), filesize_(0), diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index c16bbea38..d053b80a8 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -791,14 +791,15 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { bool publish_seq_; }; -// For two_write_queues commit both the aborted batch and the cleanup batch and then published the seq +// For two_write_queues commit both the aborted batch and the cleanup batch and +// then published the seq class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback { public: WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db, - DBImpl* db_impl, - SequenceNumber prep_seq, - SequenceNumber rollback_seq, - size_t prep_batch_cnt) + DBImpl* db_impl, + SequenceNumber prep_seq, + SequenceNumber rollback_seq, + size_t prep_batch_cnt) : db_(db), db_impl_(db_impl), prep_seq_(prep_seq),