Add a new CPU time counter to compaction report (#4889)

Summary:
Measure the CPU time consumed by a compaction and report it in the stats report: a new per-level CompMergeCPU(sec) column, a COMPACTION_CPU_TIME histogram, and a cpu_micros field in CompactionJobStats.
Also enable NowCPUNanos() to work on macOS.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4889

Differential Revision: D13701276

Pulled By: zinoale

fbshipit-source-id: 5024e5bbccd4dd10fd90d947870237f436445055
Author: Alexander Zinoviev
Committed by: Facebook Github Bot
Parent: 158da7a6ee
Commit: 32a6dd9a41
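For context, a minimal sketch (not part of this diff) of how the new counter can be consumed, assuming the standard EventListener API; the listener class name is hypothetical:

    #include <cinttypes>
    #include <cstdio>

    #include "rocksdb/listener.h"

    // Logs wall-clock vs. CPU time for every finished compaction using the
    // cpu_micros field added by this change.
    class CompactionCpuLogger : public rocksdb::EventListener {
     public:
      void OnCompactionCompleted(rocksdb::DB* /*db*/,
                                 const rocksdb::CompactionJobInfo& info) override {
        std::fprintf(stderr, "compaction: wall=%" PRIu64 " us, cpu=%" PRIu64 " us\n",
                     info.stats.elapsed_micros, info.stats.cpu_micros);
      }
    };

Register it via DBOptions::listeners to see both figures per compaction.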
Files changed (23):
  1. db/builder.cc (6)
  2. db/compaction_job.cc (27)
  3. db/db_impl_open.cc (4)
  4. db/db_impl_write.cc (2)
  5. db/db_test2.cc (2)
  6. db/flush_job.cc (10)
  7. db/internal_stats.cc (15)
  8. db/internal_stats.h (12)
  9. db/merge_helper.cc (4)
  10. db/version_set.cc (4)
  11. env/env_posix.cc (3)
  12. include/rocksdb/compaction_job_stats.h (3)
  13. include/rocksdb/iostats_context.h (4)
  14. include/rocksdb/statistics.h (1)
  15. monitoring/iostats_context_imp.h (7)
  16. monitoring/statistics.cc (1)
  17. table/sst_file_writer.cc (6)
  18. tools/db_bench_tool.cc (8)
  19. util/compaction_job_stats_impl.cc (2)
  20. util/compression.h (8)
  21. util/file_reader_writer.cc (25)
  22. util/file_reader_writer.h (5)
  23. utilities/transactions/write_prepared_txn_db.h (11)

@ -121,9 +121,9 @@ Status BuildTable(
file->SetIOPriority(io_priority);
file->SetWriteLifeTimeHint(write_hint);
file_writer.reset(new WritableFileWriter(std::move(file), fname,
env_options, ioptions.statistics,
ioptions.listeners));
file_writer.reset(
new WritableFileWriter(std::move(file), fname, env_options, env,
ioptions.statistics, ioptions.listeners));
builder = NewTableBuilder(
ioptions, mutable_cf_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id,

@ -604,7 +604,14 @@ Status CompactionJob::Run() {
}
compaction_stats_.micros = env_->NowMicros() - start_micros;
compaction_stats_.cpu_micros = 0;
for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
compaction_stats_.cpu_micros +=
compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
}
MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
MeasureTime(stats_, COMPACTION_CPU_TIME, compaction_stats_.cpu_micros);
TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
@ -767,6 +774,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
stream << "job" << job_id_ << "event"
<< "compaction_finished"
<< "compaction_time_micros" << compaction_stats_.micros
<< "compaction_time_cpu_micros" << compaction_stats_.cpu_micros
<< "output_level" << compact_->compaction->output_level()
<< "num_output_files" << compact_->NumOutputFiles()
<< "total_output_size" << compact_->total_bytes << "num_input_records"
@ -804,6 +812,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact != nullptr);
uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
existing_snapshots_);
@ -823,13 +834,17 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
uint64_t prev_fsync_nanos = 0;
uint64_t prev_range_sync_nanos = 0;
uint64_t prev_prepare_write_nanos = 0;
uint64_t prev_cpu_write_nanos = 0;
uint64_t prev_cpu_read_nanos = 0;
if (measure_io_stats_) {
prev_perf_level = GetPerfLevel();
SetPerfLevel(PerfLevel::kEnableTime);
SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
prev_write_nanos = IOSTATS(write_nanos);
prev_fsync_nanos = IOSTATS(fsync_nanos);
prev_range_sync_nanos = IOSTATS(range_sync_nanos);
prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
}
const MutableCFOptions* mutable_cf_options =
@ -1107,6 +1122,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
}
sub_compact->compaction_job_stats.cpu_micros =
env_->NowCPUNanos() / 1000 - prev_cpu_micros;
if (measure_io_stats_) {
sub_compact->compaction_job_stats.file_write_nanos +=
IOSTATS(write_nanos) - prev_write_nanos;
@ -1116,7 +1134,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
sub_compact->compaction_job_stats.file_prepare_write_nanos +=
IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
if (prev_perf_level != PerfLevel::kEnableTime) {
sub_compact->compaction_job_stats.cpu_micros -=
(IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos
+ IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) / 1000;
if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
SetPerfLevel(prev_perf_level);
}
}
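Reading the hunk above: when per-compaction I/O stats are measured, the CPU time already booked against read()/write() (the new cpu_read_nanos/cpu_write_nanos iostats) is subtracted back out, so cpu_micros covers only the compaction's own processing. A minimal sketch of that accounting, with hypothetical names:

    #include <cstdint>

    // Total thread CPU time minus the CPU time attributed to file I/O calls,
    // reported in microseconds (mirrors the subtraction above).
    uint64_t CompactionOnlyCpuMicros(uint64_t total_cpu_nanos,
                                     uint64_t io_read_cpu_nanos,
                                     uint64_t io_write_cpu_nanos) {
      return total_cpu_nanos / 1000 -
             (io_read_cpu_nanos + io_write_cpu_nanos) / 1000;
    }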
@ -1527,7 +1548,7 @@ Status CompactionJob::OpenCompactionOutputFile(
sub_compact->compaction->immutable_cf_options()->listeners;
sub_compact->outfile.reset(
new WritableFileWriter(std::move(writable_file), fname, env_options_,
db_options_.statistics.get(), listeners));
env_, db_options_.statistics.get(), listeners));
// If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where

@ -237,7 +237,7 @@ Status DBImpl::NewDB() {
file->SetPreallocationBlockSize(
immutable_db_options_.manifest_preallocation_size);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(file), manifest, env_options, nullptr /* stats */,
std::move(file), manifest, env_options, env_, nullptr /* stats */,
immutable_db_options_.listeners));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
@ -1176,7 +1176,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
const auto& listeners = impl->immutable_db_options_.listeners;
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), log_fname, opt_env_options,
nullptr /* stats */, listeners));
impl->env_, nullptr /* stats */, listeners));
impl->logs_.emplace_back(
new_log_number,
new log::Writer(

@ -1456,7 +1456,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
lfile->SetPreallocationBlockSize(preallocate_block_size);
lfile->SetWriteLifeTimeHint(write_hint);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(lfile), log_fname, opt_env_opt, nullptr /* stats */,
std::move(lfile), log_fname, opt_env_opt, env_, nullptr /* stats */,
immutable_db_options_.listeners));
new_log = new log::Writer(
std::move(file_writer), new_log_number,

@ -1763,7 +1763,7 @@ TEST_F(DBTest2, TestPerfContextCpuTime) {
SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
ASSERT_EQ("bar", Get("foo"));
ASSERT_EQ(env_->now_cpu_count_.load(), 2);
ASSERT_GT(env_->now_cpu_count_.load(), 2);
ASSERT_LT(get_perf_context()->get_cpu_nanos, kDummyAddonTime);
ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonTime);

@ -211,6 +211,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
uint64_t prev_fsync_nanos = 0;
uint64_t prev_range_sync_nanos = 0;
uint64_t prev_prepare_write_nanos = 0;
uint64_t prev_cpu_write_nanos = 0;
uint64_t prev_cpu_read_nanos = 0;
if (measure_io_stats_) {
prev_perf_level = GetPerfLevel();
SetPerfLevel(PerfLevel::kEnableTime);
@ -218,6 +220,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
prev_fsync_nanos = IOSTATS(fsync_nanos);
prev_range_sync_nanos = IOSTATS(range_sync_nanos);
prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
}
// This will release and re-acquire the mutex.
@ -269,6 +273,10 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
stream << "file_prepare_write_nanos"
<< (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
stream << "file_cpu_write_nanos"
<< (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
stream << "file_cpu_read_nanos"
<< (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
}
return s;
@ -285,6 +293,7 @@ Status FlushJob::WriteLevel0Table() {
ThreadStatus::STAGE_FLUSH_WRITE_L0);
db_mutex_->AssertHeld();
const uint64_t start_micros = db_options_.env->NowMicros();
const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000;
Status s;
{
auto write_hint = cfd_->CalculateSSTWriteHint(0);
@ -401,6 +410,7 @@ Status FlushJob::WriteLevel0Table() {
// Note that here we treat flush as level 0 compaction in internal stats
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
stats.micros = db_options_.env->NowMicros() - start_micros;
stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros;
stats.bytes_written = meta_.fd.GetFileSize();
MeasureTime(stats_, FLUSH_TIME, stats.micros);
cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);

@ -45,6 +45,8 @@ const std::map<LevelStatType, LevelStat> InternalStats::compaction_level_stats =
{LevelStatType::READ_MBPS, LevelStat{"ReadMBps", "Rd(MB/s)"}},
{LevelStatType::WRITE_MBPS, LevelStat{"WriteMBps", "Wr(MB/s)"}},
{LevelStatType::COMP_SEC, LevelStat{"CompSec", "Comp(sec)"}},
{LevelStatType::COMP_CPU_SEC,
LevelStat{"CompMergeCPU", "CompMergeCPU(sec)"}},
{LevelStatType::COMP_COUNT, LevelStat{"CompCount", "Comp(cnt)"}},
{LevelStatType::AVG_SEC, LevelStat{"AvgSec", "Avg(sec)"}},
{LevelStatType::KEY_IN, LevelStat{"KeyIn", "KeyIn"}},
@ -64,7 +66,7 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name) {
};
int line_size = snprintf(
buf + written_size, len - written_size,
"Level %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n",
"Level %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n",
// Note that we skip COMPACTED_FILES and merge it with Files column
hdr(LevelStatType::NUM_FILES), hdr(LevelStatType::SIZE_BYTES),
hdr(LevelStatType::SCORE), hdr(LevelStatType::READ_GB),
@ -72,9 +74,9 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name) {
hdr(LevelStatType::WRITE_GB), hdr(LevelStatType::W_NEW_GB),
hdr(LevelStatType::MOVED_GB), hdr(LevelStatType::WRITE_AMP),
hdr(LevelStatType::READ_MBPS), hdr(LevelStatType::WRITE_MBPS),
hdr(LevelStatType::COMP_SEC), hdr(LevelStatType::COMP_COUNT),
hdr(LevelStatType::AVG_SEC), hdr(LevelStatType::KEY_IN),
hdr(LevelStatType::KEY_DROP));
hdr(LevelStatType::COMP_SEC), hdr(LevelStatType::COMP_CPU_SEC),
hdr(LevelStatType::COMP_COUNT), hdr(LevelStatType::AVG_SEC),
hdr(LevelStatType::KEY_IN), hdr(LevelStatType::KEY_DROP));
written_size += line_size;
snprintf(buf + written_size, len - written_size, "%s\n",
@ -106,6 +108,7 @@ void PrepareLevelStats(std::map<LevelStatType, double>* level_stats,
(*level_stats)[LevelStatType::WRITE_MBPS] =
stats.bytes_written / kMB / elapsed;
(*level_stats)[LevelStatType::COMP_SEC] = stats.micros / kMicrosInSec;
(*level_stats)[LevelStatType::COMP_CPU_SEC] = stats.cpu_micros / kMicrosInSec;
(*level_stats)[LevelStatType::COMP_COUNT] = stats.count;
(*level_stats)[LevelStatType::AVG_SEC] =
stats.count == 0 ? 0 : stats.micros / kMicrosInSec / stats.count;
@ -132,7 +135,8 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name,
"%5.1f " /* W-Amp */
"%8.1f " /* Rd(MB/s) */
"%8.1f " /* Wr(MB/s) */
"%9.0f " /* Comp(sec) */
"%9.2f " /* Comp(sec) */
"%17.2f " /* CompMergeCPU(sec) */
"%9d " /* Comp(cnt) */
"%8.3f " /* Avg(sec) */
"%7s " /* KeyIn */
@ -153,6 +157,7 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name,
stat_value.at(LevelStatType::READ_MBPS),
stat_value.at(LevelStatType::WRITE_MBPS),
stat_value.at(LevelStatType::COMP_SEC),
stat_value.at(LevelStatType::COMP_CPU_SEC),
static_cast<int>(stat_value.at(LevelStatType::COMP_COUNT)),
stat_value.at(LevelStatType::AVG_SEC),
NumberToHumanString(
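The new column shows up in the regular compaction stats dump; a quick way to see it (this uses the long-standing "rocksdb.stats" property, not anything introduced here):

    #include <cstdio>
    #include <string>

    #include "rocksdb/db.h"

    // Print the compaction stats table; after this change it carries an extra
    // CompMergeCPU(sec) column next to Comp(sec).
    void DumpCompactionStats(rocksdb::DB* db) {
      std::string stats;
      if (db->GetProperty("rocksdb.stats", &stats)) {
        std::printf("%s\n", stats.c_str());
      }
    }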

@ -71,6 +71,7 @@ enum class LevelStatType {
READ_MBPS,
WRITE_MBPS,
COMP_SEC,
COMP_CPU_SEC,
COMP_COUNT,
AVG_SEC,
KEY_IN,
@ -135,6 +136,7 @@ class InternalStats {
// compactions that produced data for the specified "level".
struct CompactionStats {
uint64_t micros;
uint64_t cpu_micros;
// The number of bytes read from all non-output levels
uint64_t bytes_read_non_output_levels;
@ -172,6 +174,7 @@ class InternalStats {
explicit CompactionStats()
: micros(0),
cpu_micros(0),
bytes_read_non_output_levels(0),
bytes_read_output_level(0),
bytes_written(0),
@ -190,6 +193,7 @@ class InternalStats {
explicit CompactionStats(CompactionReason reason, int c)
: micros(0),
cpu_micros(0),
bytes_read_non_output_levels(0),
bytes_read_output_level(0),
bytes_written(0),
@ -214,14 +218,14 @@ class InternalStats {
explicit CompactionStats(const CompactionStats& c)
: micros(c.micros),
cpu_micros(c.cpu_micros),
bytes_read_non_output_levels(c.bytes_read_non_output_levels),
bytes_read_output_level(c.bytes_read_output_level),
bytes_written(c.bytes_written),
bytes_moved(c.bytes_moved),
num_input_files_in_non_output_levels(
c.num_input_files_in_non_output_levels),
num_input_files_in_output_level(
c.num_input_files_in_output_level),
num_input_files_in_output_level(c.num_input_files_in_output_level),
num_output_files(c.num_output_files),
num_input_records(c.num_input_records),
num_dropped_records(c.num_dropped_records),
@ -234,6 +238,7 @@ class InternalStats {
void Clear() {
this->micros = 0;
this->cpu_micros = 0;
this->bytes_read_non_output_levels = 0;
this->bytes_read_output_level = 0;
this->bytes_written = 0;
@ -252,6 +257,7 @@ class InternalStats {
void Add(const CompactionStats& c) {
this->micros += c.micros;
this->cpu_micros += c.cpu_micros;
this->bytes_read_non_output_levels += c.bytes_read_non_output_levels;
this->bytes_read_output_level += c.bytes_read_output_level;
this->bytes_written += c.bytes_written;
@ -272,6 +278,7 @@ class InternalStats {
void Subtract(const CompactionStats& c) {
this->micros -= c.micros;
this->cpu_micros -= c.cpu_micros;
this->bytes_read_non_output_levels -= c.bytes_read_non_output_levels;
this->bytes_read_output_level -= c.bytes_read_output_level;
this->bytes_written -= c.bytes_written;
@ -595,6 +602,7 @@ class InternalStats {
struct CompactionStats {
uint64_t micros;
uint64_t cpu_micros;
uint64_t bytes_read_non_output_levels;
uint64_t bytes_read_output_level;
uint64_t bytes_written;

@ -298,8 +298,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
// to combine the keys. Since VersionSet::SetupOtherInputs() always makes
// sure that all merge-operands on the same level get compacted together,
// this will simply lead to these merge operands moving to the next level.
bool surely_seen_the_beginning = (hit_the_next_user_key || !iter->Valid())
&& at_bottom;
bool surely_seen_the_beginning =
(hit_the_next_user_key || !iter->Valid()) && at_bottom;
if (surely_seen_the_beginning) {
// do a final merge with nullptr as the existing value and say
// bye to the merge type (it's now converted to a Put)

@ -3044,8 +3044,8 @@ Status VersionSet::ProcessManifestWrites(
db_options_->manifest_preallocation_size);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(descriptor_file), descriptor_fname, opt_env_opts, nullptr,
db_options_->listeners));
std::move(descriptor_file), descriptor_fname, opt_env_opts, env_,
nullptr, db_options_->listeners));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
s = WriteSnapshot(descriptor_log_.get());

env/env_posix.cc

@ -844,7 +844,8 @@ class PosixEnv : public Env {
}
virtual uint64_t NowCPUNanos() override {
#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX)
#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \
defined(__MACH__)
struct timespec ts;
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
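clock_gettime(CLOCK_THREAD_CPUTIME_ID, ...) is available on macOS 10.12 and later, which is what makes the added __MACH__ branch work. The counter is then used with the start/stop pattern that appears throughout this patch; a rough, self-contained sketch:

    #include <cstdint>

    #include "rocksdb/env.h"

    // Measure the thread CPU time spent in some work, in microseconds.
    // On platforms where NowCPUNanos() is unsupported it returns 0, so the
    // delta simply comes out as 0.
    template <typename Work>
    uint64_t MeasureCpuMicros(rocksdb::Env* env, Work&& work) {
      const uint64_t start = env->NowCPUNanos();
      work();
      return (env->NowCPUNanos() - start) / 1000;
    }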

@ -18,6 +18,9 @@ struct CompactionJobStats {
// the elapsed time of this compaction in microseconds.
uint64_t elapsed_micros;
// the elapsed CPU time of this compaction in microseconds.
uint64_t cpu_micros;
// the number of compaction input records.
uint64_t num_input_records;
// the number of compaction input files.

@ -44,6 +44,10 @@ struct IOStatsContext {
uint64_t prepare_write_nanos;
// time spent in Logger::Logv().
uint64_t logger_nanos;
// CPU time spent in write() and pwrite()
uint64_t cpu_write_nanos;
// CPU time spent in read() and pread()
uint64_t cpu_read_nanos;
};
// Get Thread-local IOStatsContext object pointer
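These counters are only populated when the perf level is high enough; a minimal sketch of reading them on the current thread (assumes the thread-local iostats context declared above):

    #include <cinttypes>
    #include <cstdio>

    #include "rocksdb/iostats_context.h"
    #include "rocksdb/perf_level.h"

    // Enable CPU timing, do some RocksDB I/O on this thread, then inspect the
    // two new counters via the thread-local IOStatsContext.
    void ReportIoCpuTime() {
      rocksdb::SetPerfLevel(
          rocksdb::PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
      rocksdb::get_iostats_context()->Reset();

      // ... issue reads/writes through RocksDB here ...

      std::printf("cpu_read_nanos=%" PRIu64 " cpu_write_nanos=%" PRIu64 "\n",
                  rocksdb::get_iostats_context()->cpu_read_nanos,
                  rocksdb::get_iostats_context()->cpu_write_nanos);
      rocksdb::SetPerfLevel(rocksdb::PerfLevel::kDisable);
    }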

@ -352,6 +352,7 @@ enum Histograms : uint32_t {
DB_GET = 0,
DB_WRITE,
COMPACTION_TIME,
COMPACTION_CPU_TIME,
SUBCOMPACTION_SETUP_TIME,
TABLE_SYNC_MICROS,
COMPACTION_OUTFILE_SYNC_MICROS,
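The matching histogram can be pulled from the Statistics object like any other; a small sketch:

    #include <cstdio>

    #include "rocksdb/statistics.h"

    // Read the aggregated compaction CPU time histogram added by this change.
    void PrintCompactionCpuHistogram(rocksdb::Statistics* stats) {
      rocksdb::HistogramData data;
      stats->histogramData(rocksdb::COMPACTION_CPU_TIME, &data);
      std::printf("compaction cpu micros: p50=%.1f p99=%.1f max=%.1f\n",
                  data.median, data.percentile99, data.max);
    }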

@ -37,6 +37,13 @@ extern __thread IOStatsContext iostats_context;
PerfStepTimer iostats_step_timer_##metric(&(iostats_context.metric)); \
iostats_step_timer_##metric.Start();
// Declare and set start time of the timer
#define IOSTATS_CPU_TIMER_GUARD(metric, env) \
PerfStepTimer iostats_step_timer_##metric( \
&(iostats_context.metric), env, true, \
PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \
iostats_step_timer_##metric.Start();
#else // ROCKSDB_SUPPORT_THREAD_LOCAL
#define IOSTATS_ADD(metric, value)

@ -184,6 +184,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{DB_GET, "rocksdb.db.get.micros"},
{DB_WRITE, "rocksdb.db.write.micros"},
{COMPACTION_TIME, "rocksdb.compaction.times.micros"},
{COMPACTION_CPU_TIME, "rocksdb.compaction.times.cpu_micros"},
{SUBCOMPACTION_SETUP_TIME, "rocksdb.subcompaction.setup.times.micros"},
{TABLE_SYNC_MICROS, "rocksdb.table.sync.micros"},
{COMPACTION_OUTFILE_SYNC_MICROS, "rocksdb.compaction.outfile.sync.micros"},

@ -237,9 +237,9 @@ Status SstFileWriter::Open(const std::string& file_path) {
&int_tbl_prop_collector_factories, compression_type, compression_opts,
nullptr /* compression_dict */, r->skip_filters, r->column_family_name,
unknown_level);
r->file_writer.reset(
new WritableFileWriter(std::move(sst_file), file_path, r->env_options,
nullptr /* stats */, r->ioptions.listeners));
r->file_writer.reset(new WritableFileWriter(
std::move(sst_file), file_path, r->env_options, r->ioptions.env,
nullptr /* stats */, r->ioptions.listeners));
// TODO(tec) : If table_factory is using compressed block cache, we will
// be adding the external sst file blocks into it, which is wasteful.

@ -4667,18 +4667,18 @@ void VerifyDBFromDB(std::string& truth_db_name) {
int64_t seek = 0;
int64_t seek_found = 0;
int64_t bytes = 0;
const int64_t default_value_max = 64*1024*1024;
const int64_t default_value_max = 64 * 1024 * 1024;
int64_t value_max = default_value_max;
int64_t scan_len_max = FLAGS_mix_max_scan_len;
double write_rate = 1000000.0;
double read_rate = 1000000.0;
std::vector<double> ratio {FLAGS_mix_get_ratio,
FLAGS_mix_put_ratio, FLAGS_mix_seek_ratio};
std::vector<double> ratio{FLAGS_mix_get_ratio, FLAGS_mix_put_ratio,
FLAGS_mix_seek_ratio};
char value_buffer[default_value_max];
QueryDecider query;
RandomGenerator gen;
Status s;
if(value_max > FLAGS_mix_max_value_size) {
if (value_max > FLAGS_mix_max_value_size) {
value_max = FLAGS_mix_max_value_size;
}

@ -11,6 +11,7 @@ namespace rocksdb {
void CompactionJobStats::Reset() {
elapsed_micros = 0;
cpu_micros = 0;
num_input_records = 0;
num_input_files = 0;
@ -45,6 +46,7 @@ void CompactionJobStats::Reset() {
void CompactionJobStats::Add(const CompactionJobStats& stats) {
elapsed_micros += stats.elapsed_micros;
cpu_micros += stats.cpu_micros;
num_input_records += stats.num_input_records;
num_input_files += stats.num_input_files;

@ -195,9 +195,7 @@ struct CompressionDict {
}
#if ZSTD_VERSION_NUMBER >= 700
const ZSTD_CDict* GetDigestedZstdCDict() const {
return zstd_cdict_;
}
const ZSTD_CDict* GetDigestedZstdCDict() const { return zstd_cdict_; }
#endif // ZSTD_VERSION_NUMBER >= 700
Slice GetRawDict() const { return dict_; }
@ -259,9 +257,7 @@ struct UncompressionDict {
}
#ifdef ROCKSDB_ZSTD_DDICT
const ZSTD_DDict* GetDigestedZstdDDict() const {
return zstd_ddict_;
}
const ZSTD_DDict* GetDigestedZstdDDict() const { return zstd_ddict_; }
#endif // ROCKSDB_ZSTD_DDICT
Slice GetRawDict() const { return dict_; }

@ -77,6 +77,7 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
StopWatch sw(env_, stats_, hist_type_,
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
auto prev_perf_level = GetPerfLevel();
IOSTATS_TIMER_GUARD(read_nanos);
if (use_direct_io()) {
#ifndef ROCKSDB_LITE
@ -105,8 +106,11 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
start_ts = std::chrono::system_clock::now();
orig_offset = aligned_offset + buf.CurrentSize();
}
s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp,
buf.Destination());
{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp,
buf.Destination());
}
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
@ -151,7 +155,10 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
start_ts = std::chrono::system_clock::now();
}
#endif
s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos);
{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos);
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
@ -176,10 +183,12 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
*result = Slice(res_scratch, s.ok() ? pos : 0);
}
IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
SetPerfLevel(prev_perf_level);
}
if (stats_ != nullptr && file_read_hist_ != nullptr) {
file_read_hist_->Add(elapsed);
}
return s;
}
@ -409,11 +418,14 @@ Status WritableFileWriter::SyncInternal(bool use_fsync) {
Status s;
IOSTATS_TIMER_GUARD(fsync_nanos);
TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
auto prev_perf_level = GetPerfLevel();
IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
if (use_fsync) {
s = writable_file_->Fsync();
} else {
s = writable_file_->Sync();
}
SetPerfLevel(prev_perf_level);
return s;
}
@ -453,7 +465,12 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
old_size = next_write_offset_;
}
#endif
s = writable_file_->Append(Slice(src, allowed));
{
auto prev_perf_level = GetPerfLevel();
IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
s = writable_file_->Append(Slice(src, allowed));
SetPerfLevel(prev_perf_level);
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();

@ -174,6 +174,7 @@ class WritableFileWriter {
std::unique_ptr<WritableFile> writable_file_;
std::string file_name_;
Env* env_;
AlignedBuffer buf_;
size_t max_buffer_size_;
// Actually written data size can be used for truncate
@ -195,10 +196,12 @@ class WritableFileWriter {
public:
WritableFileWriter(
std::unique_ptr<WritableFile>&& file, const std::string& _file_name,
const EnvOptions& options, Statistics* stats = nullptr,
const EnvOptions& options, Env* env = nullptr,
Statistics* stats = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: writable_file_(std::move(file)),
file_name_(_file_name),
env_(env),
buf_(),
max_buffer_size_(options.writable_file_max_buffer_size),
filesize_(0),

@ -791,14 +791,15 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
bool publish_seq_;
};
// For two_write_queues commit both the aborted batch and the cleanup batch and then published the seq
// For two_write_queues commit both the aborted batch and the cleanup batch and
// then published the seq
class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
public:
WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
DBImpl* db_impl,
SequenceNumber prep_seq,
SequenceNumber rollback_seq,
size_t prep_batch_cnt)
DBImpl* db_impl,
SequenceNumber prep_seq,
SequenceNumber rollback_seq,
size_t prep_batch_cnt)
: db_(db),
db_impl_(db_impl),
prep_seq_(prep_seq),
