Add options.compaction_measure_io_stats to print write I/O stats in compactions

Summary:
Add options.compaction_measure_io_stats to print out / pass to listener accumulated time spent on write calls. Example outputs in info logs:

2015/08/12-16:27:59.463944 7fd428bff700 (Original Log Time 2015/08/12-16:27:59.463922) EVENT_LOG_v1 {"time_micros": 1439422079463897, "job": 6, "event": "compaction_finished", "output_level": 1, "num_output_files": 4, "total_output_size": 6900525, "num_input_records": 111483, "num_output_records": 106877, "file_write_nanos": 15663206, "file_range_sync_nanos": 649588, "file_fsync_nanos": 349614797, "file_prepare_write_nanos": 1505812, "lsm_state": [2, 4, 0, 0, 0, 0, 0]}

Add two more counters in iostats_context.

Also add a parameter of db_bench.

Test Plan: Add a unit test. Also manually verify LOG outputs in db_bench

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D44115
main
sdong 9 years ago
parent dc9d5634fd
commit 603b6da8b8
  1. 46
      db/compaction_job.cc
  2. 5
      db/compaction_job.h
  3. 74
      db/compaction_job_stats_test.cc
  4. 9
      db/compaction_job_test.cc
  5. 4
      db/db_bench.cc
  6. 6
      db/db_impl.cc
  7. 15
      include/rocksdb/compaction_job_stats.h
  8. 5
      include/rocksdb/iostats_context.h
  9. 5
      include/rocksdb/options.h
  10. 5
      util/compaction_job_stats_impl.cc
  11. 8
      util/file_reader_writer.cc
  12. 15
      util/iostats_context.cc
  13. 9
      util/mutable_cf_options.h
  14. 10
      util/options.cc
  15. 4
      util/options_helper.cc
  16. 1
      util/options_test.cc

@ -96,7 +96,7 @@ CompactionJob::CompactionJob(
Directory* db_directory, Directory* output_directory, Statistics* stats,
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, const std::string& dbname,
bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
CompactionJobStats* compaction_job_stats)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
@ -115,7 +115,8 @@ CompactionJob::CompactionJob(
existing_snapshots_(std::move(existing_snapshots)),
table_cache_(std::move(table_cache)),
event_logger_(event_logger),
paranoid_file_checks_(paranoid_file_checks) {
paranoid_file_checks_(paranoid_file_checks),
measure_io_stats_(measure_io_stats) {
assert(log_buffer_ != nullptr);
ThreadStatusUtil::SetColumnFamily(compact_->compaction->column_family_data());
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
@ -270,6 +271,22 @@ Status CompactionJob::Run() {
}
Status CompactionJob::SubCompactionRun(Slice* start, Slice* end) {
PerfLevel prev_perf_level = PerfLevel::kEnableTime;
uint64_t prev_write_nanos = 0;
uint64_t prev_fsync_nanos = 0;
uint64_t prev_range_sync_nanos = 0;
uint64_t prev_prepare_write_nanos = 0;
bool enabled_io_stats = false;
if (measure_io_stats_ && compaction_job_stats_ != nullptr) {
prev_perf_level = GetPerfLevel();
SetPerfLevel(PerfLevel::kEnableTime);
prev_write_nanos = iostats_context.write_nanos;
prev_fsync_nanos = iostats_context.fsync_nanos;
prev_range_sync_nanos = iostats_context.range_sync_nanos;
prev_prepare_write_nanos = iostats_context.prepare_write_nanos;
enabled_io_stats = true;
}
auto* compaction = compact_->compaction;
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
@ -286,6 +303,21 @@ Status CompactionJob::SubCompactionRun(Slice* start, Slice* end) {
compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros;
MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
if (enabled_io_stats) {
// Not supporting parallel running yet
compaction_job_stats_->file_write_nanos +=
iostats_context.write_nanos - prev_write_nanos;
compaction_job_stats_->file_fsync_nanos +=
iostats_context.fsync_nanos - prev_fsync_nanos;
compaction_job_stats_->file_range_sync_nanos +=
iostats_context.range_sync_nanos - prev_range_sync_nanos;
compaction_job_stats_->file_prepare_write_nanos +=
iostats_context.prepare_write_nanos - prev_prepare_write_nanos;
if (prev_perf_level != PerfLevel::kEnableTime) {
SetPerfLevel(prev_perf_level);
}
}
return status;
}
@ -340,6 +372,16 @@ void CompactionJob::Install(Status* status,
<< "total_output_size" << compact_->total_bytes
<< "num_input_records" << compact_->num_input_records
<< "num_output_records" << compact_->num_output_records;
if (measure_io_stats_ && compaction_job_stats_ != nullptr) {
stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
stream << "file_range_sync_nanos"
<< compaction_job_stats_->file_range_sync_nanos;
stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos;
stream << "file_prepare_write_nanos"
<< compaction_job_stats_->file_prepare_write_nanos;
}
stream << "lsm_state";
stream.StartArray();
for (int level = 0; level < vstorage->num_levels(); ++level) {

@ -57,8 +57,8 @@ class CompactionJob {
Directory* db_directory, Directory* output_directory,
Statistics* stats,
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache,
EventLogger* event_logger, bool paranoid_file_checks,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname,
CompactionJobStats* compaction_job_stats);
@ -153,6 +153,7 @@ class CompactionJob {
EventLogger* event_logger_;
bool paranoid_file_checks_;
bool measure_io_stats_;
std::vector<Slice> sub_compaction_boundaries_;
};

@ -417,14 +417,25 @@ class CompactionJobStatsTest : public testing::Test,
// test CompactionJobStatsTest.
class CompactionJobStatsChecker : public EventListener {
public:
CompactionJobStatsChecker() : compression_enabled_(false) {}
CompactionJobStatsChecker()
: compression_enabled_(false), verify_next_comp_io_stats_(false) {}
size_t NumberOfUnverifiedStats() { return expected_stats_.size(); }
void set_verify_next_comp_io_stats(bool v) { verify_next_comp_io_stats_ = v; }
// Once a compaction completed, this function will verify the returned
// CompactionJobInfo with the oldest CompactionJobInfo added earlier
// in "expected_stats_" which has not yet being used for verification.
virtual void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) {
if (verify_next_comp_io_stats_) {
ASSERT_GT(ci.stats.file_write_nanos, 0);
ASSERT_GT(ci.stats.file_range_sync_nanos, 0);
ASSERT_GT(ci.stats.file_fsync_nanos, 0);
ASSERT_GT(ci.stats.file_prepare_write_nanos, 0);
verify_next_comp_io_stats_ = false;
}
std::lock_guard<std::mutex> lock(mutex_);
if (expected_stats_.size()) {
Verify(ci.stats, expected_stats_.front());
@ -498,10 +509,13 @@ class CompactionJobStatsChecker : public EventListener {
compression_enabled_ = flag;
}
bool verify_next_comp_io_stats() const { return verify_next_comp_io_stats_; }
private:
std::mutex mutex_;
std::queue<CompactionJobStats> expected_stats_;
bool compression_enabled_;
bool verify_next_comp_io_stats_;
};
// An EventListener which helps verify the compaction statistics in
@ -643,7 +657,9 @@ TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
options.num_levels = 3;
options.compression = kNoCompression;
options.num_subcompactions = num_subcompactions_;
options.bytes_per_sync = 512 * 1024;
options.compaction_measure_io_stats = true;
for (int test = 0; test < 2; ++test) {
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
@ -766,6 +782,7 @@ TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
num_keys_per_L0_file));
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
Compact(1, smallest_key, largest_key);
num_L1_files = options.num_subcompactions > 1 ? 7 : 4;
char L1_buf[4];
snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files);
@ -777,6 +794,61 @@ TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
}
stats_checker->EnableCompression(true);
compression_ratio = kCompressionRatio;
for (int i = 0; i < 5; i++) {
ASSERT_OK(Put(1, Slice(Key(key_base + i, 10)),
Slice(RandomString(&rnd, 512 * 1024, 1))));
}
ASSERT_OK(Flush(1));
reinterpret_cast<DBImpl*>(db_)->TEST_WaitForCompact();
stats_checker->set_verify_next_comp_io_stats(true);
std::atomic<bool> first_prepare_write(true);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WritableFileWriter::Append:BeforePrepareWrite", [&](void* arg) {
if (first_prepare_write.load()) {
options.env->SleepForMicroseconds(3);
first_prepare_write.store(false);
}
});
std::atomic<bool> first_flush(true);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WritableFileWriter::Flush:BeforeAppend", [&](void* arg) {
if (first_flush.load()) {
options.env->SleepForMicroseconds(3);
first_flush.store(false);
}
});
std::atomic<bool> first_sync(true);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WritableFileWriter::SyncInternal:0", [&](void* arg) {
if (first_sync.load()) {
options.env->SleepForMicroseconds(3);
first_sync.store(false);
}
});
std::atomic<bool> first_range_sync(true);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WritableFileWriter::RangeSync:0", [&](void* arg) {
if (first_range_sync.load()) {
options.env->SleepForMicroseconds(3);
first_range_sync.store(false);
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Compact(1, smallest_key, largest_key);
ASSERT_TRUE(!stats_checker->verify_next_comp_io_stats());
ASSERT_TRUE(!first_prepare_write.load());
ASSERT_TRUE(!first_flush.load());
ASSERT_TRUE(!first_sync.load());
ASSERT_TRUE(!first_range_sync.load());
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

@ -246,10 +246,11 @@ class CompactionJobTest : public testing::Test {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
mutex_.Lock();
EventLogger event_logger(db_options_.info_log.get());
CompactionJob compaction_job(
0, &compaction, db_options_, env_options_, versions_.get(),
&shutting_down_, &log_buffer, nullptr, nullptr, nullptr, {},
table_cache_, &event_logger, false, dbname_, &compaction_job_stats_);
CompactionJob compaction_job(0, &compaction, db_options_, env_options_,
versions_.get(), &shutting_down_, &log_buffer,
nullptr, nullptr, nullptr, {}, table_cache_,
&event_logger, false, false, dbname_,
&compaction_job_stats_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);

@ -465,6 +465,9 @@ DEFINE_int32(transaction_sleep, 0,
"Max microseconds to sleep in between "
"reading and writing a value (used in RandomTransaction only). ");
DEFINE_bool(compaction_measure_io_stats, false,
"Measure times spents on I/Os while in compactions. ");
namespace {
enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
assert(ctype);
@ -2379,6 +2382,7 @@ class Benchmark {
exit(1);
}
options.max_successive_merges = FLAGS_max_successive_merges;
options.compaction_measure_io_stats = FLAGS_compaction_measure_io_stats;
// set universal style compaction configurations, if applicable
if (FLAGS_universal_size_ratio != 0) {

@ -1669,7 +1669,8 @@ Status DBImpl::CompactFilesImpl(
&shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, snapshots_.GetAll(),
table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks, dbname_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
nullptr); // Here we pass a nullptr for CompactionJobStats because
// CompactFiles does not trigger OnCompactionCompleted(),
// which is the only place where CompactionJobStats is
@ -2683,7 +2684,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_,
snapshots_.GetAll(), table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks, dbname_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
&compaction_job_stats);
compaction_job.Prepare();

@ -58,6 +58,21 @@ struct CompactionJobStats {
// the key) encountered and written out.
uint64_t num_corrupt_keys;
// Following counters are only populated if
// options.compaction_measure_io_stats = true;
// Time spent on file's Append() call.
uint64_t file_write_nanos;
// Time spent on sync file range.
uint64_t file_range_sync_nanos;
// Time spent on file fsync.
uint64_t file_fsync_nanos;
// Time spent on preparing file write (falocate, etc)
uint64_t file_prepare_write_nanos;
// 0-terminated strings storing the first 8 bytes of the smallest and
// largest key in the output.
static const size_t kMaxPrefixLength = 8;

@ -38,7 +38,10 @@ struct IOStatsContext {
uint64_t read_nanos;
// time spent in sync_file_range().
uint64_t range_sync_nanos;
// time spent in fsync
uint64_t fsync_nanos;
// time spent in preparing write (fallocate etc).
uint64_t prepare_write_nanos;
// time spent in Logger::Logv().
uint64_t logger_nanos;
};

@ -15,7 +15,6 @@
#include <memory>
#include <vector>
#include <limits>
#include <stdint.h>
#include <unordered_map>
#include "rocksdb/version.h"
@ -736,6 +735,10 @@ struct ColumnFamilyOptions {
// Default: false
bool paranoid_file_checks;
// Measure IO stats in compactions, if true.
// Default: false
bool compaction_measure_io_stats;
// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options

@ -33,6 +33,11 @@ void CompactionJobStats::Reset() {
num_expired_deletion_records = 0;
num_corrupt_keys = 0;
file_write_nanos = 0;
file_range_sync_nanos = 0;
file_fsync_nanos = 0;
file_prepare_write_nanos = 0;
}
#else

@ -43,7 +43,11 @@ Status WritableFileWriter::Append(const Slice& data) {
TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
{
IOSTATS_TIMER_GUARD(prepare_write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left);
}
// if there is no space in the cache, then flush
if (cursize_ + left > capacity_) {
s = Flush();
@ -105,6 +109,7 @@ Status WritableFileWriter::Flush() {
size_t size = RequestToken(left);
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
Status s = writable_file_->Append(Slice(src, size));
if (!s.ok()) {
return s;
@ -182,6 +187,8 @@ Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
Status WritableFileWriter::SyncInternal(bool use_fsync) {
Status s;
IOSTATS_TIMER_GUARD(fsync_nanos);
TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
if (use_fsync) {
s = writable_file_->Fsync();
} else {
@ -192,6 +199,7 @@ Status WritableFileWriter::SyncInternal(bool use_fsync) {
Status WritableFileWriter::RangeSync(off_t offset, off_t nbytes) {
IOSTATS_TIMER_GUARD(range_sync_nanos);
TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
return writable_file_->RangeSync(offset, nbytes);
}

@ -26,6 +26,8 @@ void IOStatsContext::Reset() {
write_nanos = 0;
read_nanos = 0;
range_sync_nanos = 0;
prepare_write_nanos = 0;
fsync_nanos = 0;
logger_nanos = 0;
}
@ -33,15 +35,10 @@ void IOStatsContext::Reset() {
std::string IOStatsContext::ToString() const {
std::ostringstream ss;
ss << OUTPUT(thread_pool_id)
<< OUTPUT(bytes_read)
<< OUTPUT(bytes_written)
<< OUTPUT(open_nanos)
<< OUTPUT(allocate_nanos)
<< OUTPUT(write_nanos)
<< OUTPUT(read_nanos)
<< OUTPUT(range_sync_nanos)
<< OUTPUT(logger_nanos);
ss << OUTPUT(thread_pool_id) << OUTPUT(bytes_read) << OUTPUT(bytes_written)
<< OUTPUT(open_nanos) << OUTPUT(allocate_nanos) << OUTPUT(write_nanos)
<< OUTPUT(read_nanos) << OUTPUT(range_sync_nanos) << OUTPUT(fsync_nanos)
<< OUTPUT(prepare_write_nanos) << OUTPUT(logger_nanos);
return ss.str();
}

@ -43,7 +43,9 @@ struct MutableCFOptions {
num_subcompactions(options.num_subcompactions),
max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations),
paranoid_file_checks(options.paranoid_file_checks)
paranoid_file_checks(options.paranoid_file_checks),
compaction_measure_io_stats(options.compaction_measure_io_stats)
{
RefreshDerivedOptions(ioptions);
}
@ -73,8 +75,8 @@ struct MutableCFOptions {
verify_checksums_in_compaction(false),
num_subcompactions(1),
max_sequential_skip_in_iterations(0),
paranoid_file_checks(false)
{}
paranoid_file_checks(false),
compaction_measure_io_stats(false) {}
// Must be called after any change to MutableCFOptions
void RefreshDerivedOptions(const ImmutableCFOptions& ioptions);
@ -128,6 +130,7 @@ struct MutableCFOptions {
// Misc options
uint64_t max_sequential_skip_in_iterations;
bool paranoid_file_checks;
bool compaction_measure_io_stats;
// Derived options
// Per-level target file size.

@ -121,7 +121,8 @@ ColumnFamilyOptions::ColumnFamilyOptions()
max_successive_merges(0),
min_partial_merge_operands(2),
optimize_filters_for_hits(false),
paranoid_file_checks(false) {
paranoid_file_checks(false),
compaction_measure_io_stats(false) {
assert(memtable_factory.get() != nullptr);
}
@ -185,7 +186,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
max_successive_merges(options.max_successive_merges),
min_partial_merge_operands(options.min_partial_merge_operands),
optimize_filters_for_hits(options.optimize_filters_for_hits),
paranoid_file_checks(options.paranoid_file_checks) {
paranoid_file_checks(options.paranoid_file_checks),
compaction_measure_io_stats(options.compaction_measure_io_stats) {
assert(memtable_factory.get() != nullptr);
if (max_bytes_for_level_multiplier_additional.size() <
static_cast<unsigned int>(num_levels)) {
@ -514,6 +516,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
max_successive_merges);
Warn(log, " Options.optimize_fllters_for_hits: %d",
optimize_filters_for_hits);
Warn(log, " Options.paranoid_file_checks: %d",
paranoid_file_checks);
Warn(log, " Options.compaction_measure_io_stats: %d",
compaction_measure_io_stats);
} // ColumnFamilyOptions::Dump
void Options::Dump(Logger* log) const {

@ -2,6 +2,7 @@
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "util/options_helper.h"
#include <cassert>
#include <cctype>
@ -16,7 +17,6 @@
#include "rocksdb/table.h"
#include "table/block_based_table_factory.h"
#include "util/logging.h"
#include "util/options_helper.h"
namespace rocksdb {
@ -449,6 +449,8 @@ bool ParseColumnFamilyOption(const std::string& name, const std::string& value,
new_options->min_partial_merge_operands = ParseUint32(value);
} else if (name == "inplace_update_support") {
new_options->inplace_update_support = ParseBoolean(name, value);
} else if (name == "compaction_measure_io_stats") {
new_options->compaction_measure_io_stats = ParseBoolean(name, value);
} else if (name == "prefix_extractor") {
const std::string kFixedPrefixName = "fixed:";
const std::string kCappedPrefixName = "capped:";

@ -129,6 +129,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"filter_deletes", "0"},
{"max_sequential_skip_in_iterations", "24"},
{"inplace_update_support", "true"},
{"compaction_measure_io_stats", "true"},
{"inplace_update_num_locks", "25"},
{"memtable_prefix_bloom_bits", "26"},
{"memtable_prefix_bloom_probes", "27"},

Loading…
Cancel
Save