make statistics forward-able

Summary:
Make StatisticsImpl being able to forward stats to provided statistics
implementation. The main purpose is to allow us to collect internal
stats in the future even when user supplies custom statistics
implementation. It avoids intrumenting 2 sets of stats collection code.
One immediate use case is tuning advisor, which needs to collect some
internal stats, users may not be interested.

Test Plan:
ran db_bench and see stats show up at the end of run
Will run make all check since some tests rely on statistics

Reviewers: yhchiang, sdong, igor

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D20145
main
Lei Jin 10 years ago
parent 4a8f0c957c
commit 40fa8a4cd5
  1. 2
      HISTORY.md
  2. 4
      db/compaction_picker.cc
  3. 116
      db/db_impl.cc
  4. 1
      db/db_impl.h
  5. 2
      db/db_iter.cc
  6. 3
      db/version_set.cc
  7. 27
      include/rocksdb/statistics.h
  8. 3
      table/block_based_table_reader.cc
  9. 1
      util/options.cc
  10. 129
      util/statistics.cc
  11. 59
      util/statistics.h
  12. 29
      util/stop_watch.h

@ -9,7 +9,7 @@
* DBOptions.db_paths now is a vector of a DBPath structure which indicates both of path and target size * DBOptions.db_paths now is a vector of a DBPath structure which indicates both of path and target size
* NewPlainTableFactory instead of bunch of parameters now accepts PlainTableOptions, which is defined in include/rocksdb/table.h * NewPlainTableFactory instead of bunch of parameters now accepts PlainTableOptions, which is defined in include/rocksdb/table.h
* Moved include/utilities/*.h to include/rocksdb/utilities/*.h * Moved include/utilities/*.h to include/rocksdb/utilities/*.h
* Statistics APIs now take uint32_t as type instead of Tickers. Also make two access functions getTickerCount and histogramData const
## 3.3.0 (7/10/2014) ## 3.3.0 (7/10/2014)
### New Features ### New Features

@ -622,8 +622,8 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
c->bottommost_level_ = c->inputs_[0].files.back() == last_file; c->bottommost_level_ = c->inputs_[0].files.back() == last_file;
// update statistics // update statistics
MeasureTime(options_->statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION, MeasureTime(options_->statistics.get(),
c->inputs_[0].size()); NUM_FILES_IN_SINGLE_COMPACTION, c->inputs_[0].size());
// mark all the files that are being compacted // mark all the files that are being compacted
c->MarkFilesBeingCompacted(true); c->MarkFilesBeingCompacted(true);

@ -334,6 +334,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
: env_(options.env), : env_(options.env),
dbname_(dbname), dbname_(dbname),
options_(SanitizeOptions(dbname, options)), options_(SanitizeOptions(dbname, options)),
stats_(options_.statistics.get()),
db_lock_(nullptr), db_lock_(nullptr),
mutex_(options.use_adaptive_mutex), mutex_(options.use_adaptive_mutex),
shutting_down_(nullptr), shutting_down_(nullptr),
@ -1221,8 +1222,7 @@ Status DBImpl::Recover(
versions_->MarkFileNumberUsed(log); versions_->MarkFileNumberUsed(log);
s = RecoverLogFile(log, &max_sequence, read_only); s = RecoverLogFile(log, &max_sequence, read_only);
} }
SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER, SetTickerCount(stats_, SEQUENCE_NUMBER, versions_->LastSequence());
versions_->LastSequence());
} }
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
@ -1436,8 +1436,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
cfd->internal_stats()->AddCompactionStats(level, stats); cfd->internal_stats()->AddCompactionStats(level, stats);
cfd->internal_stats()->AddCFStats( cfd->internal_stats()->AddCFStats(
InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize()); InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize());
RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
meta.fd.GetFileSize());
return s; return s;
} }
@ -1528,8 +1527,7 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
cfd->internal_stats()->AddCompactionStats(level, stats); cfd->internal_stats()->AddCompactionStats(level, stats);
cfd->internal_stats()->AddCFStats( cfd->internal_stats()->AddCFStats(
InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize()); InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize());
RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
meta.fd.GetFileSize());
return s; return s;
} }
@ -1933,17 +1931,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
} }
void DBImpl::RecordFlushIOStats() { void DBImpl::RecordFlushIOStats() {
RecordTick(options_.statistics.get(), FLUSH_WRITE_BYTES, RecordTick(stats_, FLUSH_WRITE_BYTES, iostats_context.bytes_written);
iostats_context.bytes_written);
IOSTATS_RESET(bytes_written); IOSTATS_RESET(bytes_written);
} }
void DBImpl::RecordCompactionIOStats() { void DBImpl::RecordCompactionIOStats() {
RecordTick(options_.statistics.get(), COMPACT_READ_BYTES, RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
IOSTATS(bytes_read));
IOSTATS_RESET(bytes_read); IOSTATS_RESET(bytes_read);
RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
IOSTATS(bytes_written));
IOSTATS_RESET(bytes_written); IOSTATS_RESET(bytes_written);
} }
@ -2206,7 +2201,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
c.reset(cfd->PickCompaction(log_buffer)); c.reset(cfd->PickCompaction(log_buffer));
if (c != nullptr) { if (c != nullptr) {
// update statistics // update statistics
MeasureTime(options_.statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION, MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
c->inputs(0)->size()); c->inputs(0)->size());
break; break;
} }
@ -2431,12 +2426,10 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
// Finish and check for file errors // Finish and check for file errors
if (s.ok() && !options_.disableDataSync) { if (s.ok() && !options_.disableDataSync) {
if (options_.use_fsync) { if (options_.use_fsync) {
StopWatch sw(env_, options_.statistics.get(), StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
COMPACTION_OUTFILE_SYNC_MICROS, false);
s = compact->outfile->Fsync(); s = compact->outfile->Fsync();
} else { } else {
StopWatch sw(env_, options_.statistics.get(), StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
COMPACTION_OUTFILE_SYNC_MICROS, false);
s = compact->outfile->Sync(); s = compact->outfile->Sync();
} }
} }
@ -2668,7 +2661,7 @@ Status DBImpl::ProcessKeyValueCompaction(
ParseInternalKey(key, &ikey); ParseInternalKey(key, &ikey);
// no value associated with delete // no value associated with delete
value.clear(); value.clear();
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_USER); RecordTick(stats_, COMPACTION_KEY_DROP_USER);
} else if (value_changed) { } else if (value_changed) {
value = compaction_filter_value; value = compaction_filter_value;
} }
@ -2692,7 +2685,7 @@ Status DBImpl::ProcessKeyValueCompaction(
// TODO: why not > ? // TODO: why not > ?
assert(last_sequence_for_key >= ikey.sequence); assert(last_sequence_for_key >= ikey.sequence);
drop = true; // (A) drop = true; // (A)
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_NEWER_ENTRY); RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY);
} else if (ikey.type == kTypeDeletion && } else if (ikey.type == kTypeDeletion &&
ikey.sequence <= earliest_snapshot && ikey.sequence <= earliest_snapshot &&
compact->compaction->KeyNotExistsBeyondOutputLevel(ikey.user_key)) { compact->compaction->KeyNotExistsBeyondOutputLevel(ikey.user_key)) {
@ -2704,7 +2697,7 @@ Status DBImpl::ProcessKeyValueCompaction(
// few iterations of this loop (by rule (A) above). // few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped. // Therefore this deletion marker is obsolete and can be dropped.
drop = true; drop = true;
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_OBSOLETE); RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE);
} else if (ikey.type == kTypeMerge) { } else if (ikey.type == kTypeMerge) {
// We know the merge type entry is not hidden, otherwise we would // We know the merge type entry is not hidden, otherwise we would
// have hit (A) // have hit (A)
@ -2894,7 +2887,7 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact,
// no value associated with delete // no value associated with delete
compact->existing_value_buf_[i].clear(); compact->existing_value_buf_[i].clear();
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_USER); RecordTick(stats_, COMPACTION_KEY_DROP_USER);
} else if (compact->value_changed_buf_[i]) { } else if (compact->value_changed_buf_[i]) {
compact->existing_value_buf_[i] = compact->existing_value_buf_[i] =
Slice(compact->new_value_buf_[new_value_idx++]); Slice(compact->new_value_buf_[new_value_idx++]);
@ -3143,9 +3136,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
InternalStats::CompactionStats stats(1); InternalStats::CompactionStats stats(1);
stats.micros = env_->NowMicros() - start_micros - imm_micros; stats.micros = env_->NowMicros() - start_micros - imm_micros;
MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros);
stats.files_in_leveln = compact->compaction->num_input_files(0); stats.files_in_leveln = compact->compaction->num_input_files(0);
stats.files_in_levelnp1 = compact->compaction->num_input_files(1); stats.files_in_levelnp1 = compact->compaction->num_input_files(1);
MeasureTime(stats_, COMPACTION_TIME, stats.micros);
int num_output_files = compact->outputs.size(); int num_output_files = compact->outputs.size();
if (compact->builder != nullptr) { if (compact->builder != nullptr) {
@ -3306,7 +3299,7 @@ void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
Status DBImpl::GetImpl(const ReadOptions& options, Status DBImpl::GetImpl(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found) { std::string* value, bool* value_found) {
StopWatch sw(env_, options_.statistics.get(), DB_GET, false); StopWatch sw(env_, stats_, DB_GET);
PERF_TIMER_AUTO(get_snapshot_time); PERF_TIMER_AUTO(get_snapshot_time);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
@ -3341,16 +3334,16 @@ Status DBImpl::GetImpl(const ReadOptions& options,
PERF_TIMER_STOP(get_snapshot_time); PERF_TIMER_STOP(get_snapshot_time);
if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->options())) { if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->options())) {
// Done // Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} else if (sv->imm->Get(lkey, value, &s, merge_context, *cfd->options())) { } else if (sv->imm->Get(lkey, value, &s, merge_context, *cfd->options())) {
// Done // Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} else { } else {
PERF_TIMER_START(get_from_output_files_time); PERF_TIMER_START(get_from_output_files_time);
sv->current->Get(options, lkey, value, &s, &merge_context, value_found); sv->current->Get(options, lkey, value, &s, &merge_context, value_found);
PERF_TIMER_STOP(get_from_output_files_time); PERF_TIMER_STOP(get_from_output_files_time);
RecordTick(options_.statistics.get(), MEMTABLE_MISS); RecordTick(stats_, MEMTABLE_MISS);
} }
PERF_TIMER_START(get_post_process_time); PERF_TIMER_START(get_post_process_time);
@ -3367,13 +3360,13 @@ Status DBImpl::GetImpl(const ReadOptions& options,
sv->Cleanup(); sv->Cleanup();
mutex_.Unlock(); mutex_.Unlock();
delete sv; delete sv;
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS); RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
} }
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_RELEASES); RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
} }
RecordTick(options_.statistics.get(), NUMBER_KEYS_READ); RecordTick(stats_, NUMBER_KEYS_READ);
RecordTick(options_.statistics.get(), BYTES_READ, value->size()); RecordTick(stats_, BYTES_READ, value->size());
PERF_TIMER_STOP(get_post_process_time); PERF_TIMER_STOP(get_post_process_time);
return s; return s;
} }
@ -3383,7 +3376,7 @@ std::vector<Status> DBImpl::MultiGet(
const std::vector<ColumnFamilyHandle*>& column_family, const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) { const std::vector<Slice>& keys, std::vector<std::string>* values) {
StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false); StopWatch sw(env_, stats_, DB_MULTIGET);
PERF_TIMER_AUTO(get_snapshot_time); PERF_TIMER_AUTO(get_snapshot_time);
SequenceNumber snapshot; SequenceNumber snapshot;
@ -3481,9 +3474,9 @@ std::vector<Status> DBImpl::MultiGet(
delete mgd.second; delete mgd.second;
} }
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS); RecordTick(stats_, NUMBER_MULTIGET_CALLS);
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, num_keys); RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_BYTES_READ, bytes_read); RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
PERF_TIMER_STOP(get_post_process_time); PERF_TIMER_STOP(get_post_process_time);
return stat_list; return stat_list;
@ -3798,7 +3791,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
} }
if (!options.disableWAL) { if (!options.disableWAL) {
RecordTick(options_.statistics.get(), WRITE_WITH_WAL, 1); RecordTick(stats_, WRITE_WITH_WAL);
default_cf_internal_stats_->AddDBStats( default_cf_internal_stats_->AddDBStats(
InternalStats::WRITE_WITH_WAL, 1); InternalStats::WRITE_WITH_WAL, 1);
} }
@ -3807,7 +3800,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
default_cf_internal_stats_->AddDBStats( default_cf_internal_stats_->AddDBStats(
InternalStats::WRITE_DONE_BY_OTHER, 1); InternalStats::WRITE_DONE_BY_OTHER, 1);
mutex_.Unlock(); mutex_.Unlock();
RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1); RecordTick(stats_, WRITE_DONE_BY_OTHER);
return w.status; return w.status;
} else if (timed_out) { } else if (timed_out) {
#ifndef NDEBUG #ifndef NDEBUG
@ -3832,10 +3825,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
writers_.front()->cv.Signal(); writers_.front()->cv.Signal();
} }
mutex_.Unlock(); mutex_.Unlock();
RecordTick(options_.statistics.get(), WRITE_TIMEDOUT, 1); RecordTick(stats_, WRITE_TIMEDOUT);
return Status::TimedOut(); return Status::TimedOut();
} else { } else {
RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1); RecordTick(stats_, WRITE_DONE_BY_SELF);
default_cf_internal_stats_->AddDBStats( default_cf_internal_stats_->AddDBStats(
InternalStats::WRITE_DONE_BY_SELF, 1); InternalStats::WRITE_DONE_BY_SELF, 1);
} }
@ -3925,11 +3918,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
last_sequence += my_batch_count; last_sequence += my_batch_count;
const uint64_t batch_size = WriteBatchInternal::ByteSize(updates); const uint64_t batch_size = WriteBatchInternal::ByteSize(updates);
// Record statistics // Record statistics
RecordTick(options_.statistics.get(), RecordTick(stats_, NUMBER_KEYS_WRITTEN, my_batch_count);
NUMBER_KEYS_WRITTEN, my_batch_count); RecordTick(stats_, BYTES_WRITTEN, WriteBatchInternal::ByteSize(updates));
RecordTick(options_.statistics.get(),
BYTES_WRITTEN,
batch_size);
if (options.disableWAL) { if (options.disableWAL) {
flush_on_destroy_ = true; flush_on_destroy_ = true;
} }
@ -3944,14 +3934,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
alive_log_files_.back().AddSize(log_entry.size()); alive_log_files_.back().AddSize(log_entry.size());
log_empty_ = false; log_empty_ = false;
log_size = log_entry.size(); log_size = log_entry.size();
RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1); RecordTick(stats_, WAL_FILE_SYNCED);
RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_size); RecordTick(stats_, WAL_FILE_BYTES, log_size);
if (status.ok() && options.sync) { if (status.ok() && options.sync) {
if (options_.use_fsync) { if (options_.use_fsync) {
StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS); StopWatch(env_, stats_, WAL_FILE_SYNC_MICROS);
status = log_->file()->Fsync(); status = log_->file()->Fsync();
} else { } else {
StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS); StopWatch(env_, stats_, WAL_FILE_SYNC_MICROS);
status = log_->file()->Sync(); status = log_->file()->Sync();
} }
} }
@ -3972,8 +3962,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
PERF_TIMER_STOP(write_memtable_time); PERF_TIMER_STOP(write_memtable_time);
SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER, SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
last_sequence);
} }
PERF_TIMER_START(write_pre_and_post_process_time); PERF_TIMER_START(write_pre_and_post_process_time);
if (updates == &tmp_batch_) { if (updates == &tmp_batch_) {
@ -4019,7 +4008,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
mutex_.Unlock(); mutex_.Unlock();
if (status.IsTimedOut()) { if (status.IsTimedOut()) {
RecordTick(options_.statistics.get(), WRITE_TIMEDOUT, 1); RecordTick(stats_, WRITE_TIMEDOUT);
} }
for (auto& sv : superversions_to_free) { for (auto& sv : superversions_to_free) {
@ -4166,11 +4155,11 @@ Status DBImpl::MakeRoomForWrite(
mutex_.Unlock(); mutex_.Unlock();
uint64_t delayed; uint64_t delayed;
{ {
StopWatch sw(env_, options_.statistics.get(), STALL_L0_SLOWDOWN_COUNT); StopWatch sw(env_, stats_, STALL_L0_SLOWDOWN_COUNT, true);
env_->SleepForMicroseconds(slowdown); env_->SleepForMicroseconds(slowdown);
delayed = sw.ElapsedMicros(); delayed = sw.ElapsedMicros();
} }
RecordTick(options_.statistics.get(), STALL_L0_SLOWDOWN_MICROS, delayed); RecordTick(stats_, STALL_L0_SLOWDOWN_MICROS, delayed);
allow_delay = false; // Do not delay a single write more than once allow_delay = false; // Do not delay a single write more than once
mutex_.Lock(); mutex_.Lock();
cfd->internal_stats()->AddCFStats( cfd->internal_stats()->AddCFStats(
@ -4194,8 +4183,7 @@ Status DBImpl::MakeRoomForWrite(
} }
uint64_t stall; uint64_t stall;
{ {
StopWatch sw(env_, options_.statistics.get(), StopWatch sw(env_, stats_, STALL_MEMTABLE_COMPACTION_COUNT, true);
STALL_MEMTABLE_COMPACTION_COUNT);
if (!has_timeout) { if (!has_timeout) {
bg_cv_.Wait(); bg_cv_.Wait();
} else { } else {
@ -4203,8 +4191,7 @@ Status DBImpl::MakeRoomForWrite(
} }
stall = sw.ElapsedMicros(); stall = sw.ElapsedMicros();
} }
RecordTick(options_.statistics.get(), RecordTick(stats_, STALL_MEMTABLE_COMPACTION_MICROS, stall);
STALL_MEMTABLE_COMPACTION_MICROS, stall);
cfd->internal_stats()->AddCFStats( cfd->internal_stats()->AddCFStats(
InternalStats::MEMTABLE_COMPACTION, stall); InternalStats::MEMTABLE_COMPACTION, stall);
} else if (cfd->NeedWaitForNumLevel0Files()) { } else if (cfd->NeedWaitForNumLevel0Files()) {
@ -4213,8 +4200,7 @@ Status DBImpl::MakeRoomForWrite(
cfd->GetName().c_str()); cfd->GetName().c_str());
uint64_t stall; uint64_t stall;
{ {
StopWatch sw(env_, options_.statistics.get(), StopWatch sw(env_, stats_, STALL_L0_NUM_FILES_COUNT, true);
STALL_L0_NUM_FILES_COUNT);
if (!has_timeout) { if (!has_timeout) {
bg_cv_.Wait(); bg_cv_.Wait();
} else { } else {
@ -4222,8 +4208,7 @@ Status DBImpl::MakeRoomForWrite(
} }
stall = sw.ElapsedMicros(); stall = sw.ElapsedMicros();
} }
RecordTick(options_.statistics.get(), RecordTick(stats_, STALL_L0_NUM_FILES_MICROS, stall);
STALL_L0_NUM_FILES_MICROS, stall);
cfd->internal_stats()->AddCFStats( cfd->internal_stats()->AddCFStats(
InternalStats::LEVEL0_NUM_FILES, stall); InternalStats::LEVEL0_NUM_FILES, stall);
} else if (allow_hard_rate_limit_delay && cfd->ExceedsHardRateLimit()) { } else if (allow_hard_rate_limit_delay && cfd->ExceedsHardRateLimit()) {
@ -4233,16 +4218,14 @@ Status DBImpl::MakeRoomForWrite(
mutex_.Unlock(); mutex_.Unlock();
uint64_t delayed; uint64_t delayed;
{ {
StopWatch sw(env_, options_.statistics.get(), StopWatch sw(env_, stats_, HARD_RATE_LIMIT_DELAY_COUNT, true);
HARD_RATE_LIMIT_DELAY_COUNT);
env_->SleepForMicroseconds(1000); env_->SleepForMicroseconds(1000);
delayed = sw.ElapsedMicros(); delayed = sw.ElapsedMicros();
} }
// Make sure the following value doesn't round to zero. // Make sure the following value doesn't round to zero.
uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1); uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1);
rate_limit_delay_millis += rate_limit; rate_limit_delay_millis += rate_limit;
RecordTick(options_.statistics.get(), RecordTick(stats_, RATE_LIMIT_DELAY_MILLIS, rate_limit);
RATE_LIMIT_DELAY_MILLIS, rate_limit);
if (cfd->options()->rate_limit_delay_max_milliseconds > 0 && if (cfd->options()->rate_limit_delay_max_milliseconds > 0 &&
rate_limit_delay_millis >= rate_limit_delay_millis >=
(unsigned)cfd->options()->rate_limit_delay_max_milliseconds) { (unsigned)cfd->options()->rate_limit_delay_max_milliseconds) {
@ -4259,8 +4242,7 @@ Status DBImpl::MakeRoomForWrite(
cfd->options()->hard_rate_limit); cfd->options()->hard_rate_limit);
mutex_.Unlock(); mutex_.Unlock();
{ {
StopWatch sw(env_, options_.statistics.get(), StopWatch sw(env_, stats_, SOFT_RATE_LIMIT_DELAY_COUNT, true);
SOFT_RATE_LIMIT_DELAY_COUNT);
env_->SleepForMicroseconds(slowdown); env_->SleepForMicroseconds(slowdown);
slowdown = sw.ElapsedMicros(); slowdown = sw.ElapsedMicros();
rate_limit_delay_millis += slowdown; rate_limit_delay_millis += slowdown;
@ -4434,7 +4416,7 @@ Status DBImpl::GetUpdatesSince(
SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter, SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions& read_options) { const TransactionLogIterator::ReadOptions& read_options) {
RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS); RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
if (seq > versions_->LastSequence()) { if (seq > versions_->LastSequence()) {
return Status::NotFound("Requested sequence not yet written in the db"); return Status::NotFound("Requested sequence not yet written in the db");
} }

@ -290,6 +290,7 @@ class DBImpl : public DB {
const std::string dbname_; const std::string dbname_;
unique_ptr<VersionSet> versions_; unique_ptr<VersionSet> versions_;
const DBOptions options_; const DBOptions options_;
Statistics* stats_;
Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd, Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd,
SuperVersion* super_version, SuperVersion* super_version,

@ -70,7 +70,7 @@ class DBIter: public Iterator {
valid_(false), valid_(false),
current_entry_is_merged_(false), current_entry_is_merged_(false),
statistics_(options.statistics.get()) { statistics_(options.statistics.get()) {
RecordTick(statistics_, NO_ITERATORS, 1); RecordTick(statistics_, NO_ITERATORS);
max_skip_ = options.max_sequential_skip_in_iterations; max_skip_ = options.max_sequential_skip_in_iterations;
} }
virtual ~DBIter() { virtual ~DBIter() {

@ -566,8 +566,7 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
RecordTick(options->statistics.get(), RecordTick(options->statistics.get(), NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
*tp = std::shared_ptr<const TableProperties>(raw_table_properties); *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
return s; return s;

@ -20,12 +20,12 @@ namespace rocksdb {
* 1. Any ticker should be added before TICKER_ENUM_MAX. * 1. Any ticker should be added before TICKER_ENUM_MAX.
* 2. Add a readable string in TickersNameMap below for the newly added ticker. * 2. Add a readable string in TickersNameMap below for the newly added ticker.
*/ */
enum Tickers { enum Tickers : uint32_t {
// total block cache misses // total block cache misses
// REQUIRES: BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS + // REQUIRES: BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS +
// BLOCK_CACHE_FILTER_MISS + // BLOCK_CACHE_FILTER_MISS +
// BLOCK_CACHE_DATA_MISS; // BLOCK_CACHE_DATA_MISS;
BLOCK_CACHE_MISS, BLOCK_CACHE_MISS = 0,
// total block cache hit // total block cache hit
// REQUIRES: BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT + // REQUIRES: BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT +
// BLOCK_CACHE_FILTER_HIT + // BLOCK_CACHE_FILTER_HIT +
@ -198,8 +198,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
* Add a string representation in HistogramsNameMap below * Add a string representation in HistogramsNameMap below
* And increment HISTOGRAM_ENUM_MAX * And increment HISTOGRAM_ENUM_MAX
*/ */
enum Histograms { enum Histograms : uint32_t {
DB_GET, DB_GET = 0,
DB_WRITE, DB_WRITE,
COMPACTION_TIME, COMPACTION_TIME,
TABLE_SYNC_MICROS, TABLE_SYNC_MICROS,
@ -256,14 +256,21 @@ class Statistics {
public: public:
virtual ~Statistics() {} virtual ~Statistics() {}
virtual long getTickerCount(Tickers tickerType) = 0; virtual uint64_t getTickerCount(uint32_t tickerType) const = 0;
virtual void recordTick(Tickers tickerType, uint64_t count = 0) = 0; virtual void histogramData(uint32_t type,
virtual void setTickerCount(Tickers tickerType, uint64_t count) = 0; HistogramData* const data) const = 0;
virtual void measureTime(Histograms histogramType, uint64_t time) = 0;
virtual void recordTick(uint32_t tickerType, uint64_t count = 0) = 0;
virtual void setTickerCount(uint32_t tickerType, uint64_t count) = 0;
virtual void measureTime(uint32_t histogramType, uint64_t time) = 0;
virtual void histogramData(Histograms type, HistogramData* const data) = 0;
// String representation of the statistic object. // String representation of the statistic object.
std::string ToString(); virtual std::string ToString() const = 0;
// Override this function to disable particular histogram collection
virtual bool HistEnabledForType(uint32_t type) const {
return type < HISTOGRAM_ENUM_MAX;
}
}; };
// Create a concrete DBStatistics object // Create a concrete DBStatistics object

@ -870,10 +870,9 @@ Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep,
statistics, ro, &block); statistics, ro, &block);
if (block.value == nullptr && !no_io && ro.fill_cache) { if (block.value == nullptr && !no_io && ro.fill_cache) {
Histograms histogram = READ_BLOCK_GET_MICROS;
Block* raw_block = nullptr; Block* raw_block = nullptr;
{ {
StopWatch sw(rep->options.env, statistics, histogram); StopWatch sw(rep->options.env, statistics, READ_BLOCK_GET_MICROS);
s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle, s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
&raw_block, rep->options.env, &raw_block, rep->options.env,
block_cache_compressed == nullptr); block_cache_compressed == nullptr);

@ -25,6 +25,7 @@
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "table/block_based_table_factory.h" #include "table/block_based_table_factory.h"
#include "util/statistics.h"
namespace rocksdb { namespace rocksdb {

@ -5,43 +5,82 @@
// //
#include "util/statistics.h" #include "util/statistics.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "port/likely.h"
#include <algorithm> #include <algorithm>
#include <cstdio> #include <cstdio>
namespace rocksdb { namespace rocksdb {
std::shared_ptr<Statistics> CreateDBStatistics() { std::shared_ptr<Statistics> CreateDBStatistics() {
return std::make_shared<StatisticsImpl>(); return std::make_shared<StatisticsImpl>(nullptr, false);
} }
StatisticsImpl::StatisticsImpl() {} StatisticsImpl::StatisticsImpl(
std::shared_ptr<Statistics> stats,
bool enable_internal_stats)
: stats_shared_(stats),
stats_(stats.get()),
enable_internal_stats_(enable_internal_stats) {
}
StatisticsImpl::~StatisticsImpl() {} StatisticsImpl::~StatisticsImpl() {}
long StatisticsImpl::getTickerCount(Tickers tickerType) { uint64_t StatisticsImpl::getTickerCount(uint32_t tickerType) const {
assert(tickerType < TICKER_ENUM_MAX); assert(
enable_internal_stats_ ?
tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX);
// Return its own ticker version
return tickers_[tickerType].value; return tickers_[tickerType].value;
} }
void StatisticsImpl::setTickerCount(Tickers tickerType, uint64_t count) { void StatisticsImpl::histogramData(uint32_t histogramType,
assert(tickerType < TICKER_ENUM_MAX); HistogramData* const data) const {
assert(
enable_internal_stats_ ?
histogramType < INTERNAL_TICKER_ENUM_MAX :
histogramType < TICKER_ENUM_MAX);
// Return its own ticker version
histograms_[histogramType].Data(data);
}
void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
assert(
enable_internal_stats_ ?
tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
tickers_[tickerType].value = count; tickers_[tickerType].value = count;
}
if (stats_ && tickerType < TICKER_ENUM_MAX) {
stats_->setTickerCount(tickerType, count);
}
} }
void StatisticsImpl::recordTick(Tickers tickerType, uint64_t count) { void StatisticsImpl::recordTick(uint32_t tickerType, uint64_t count) {
assert(tickerType < TICKER_ENUM_MAX); assert(
enable_internal_stats_ ?
tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
tickers_[tickerType].value += count; tickers_[tickerType].value += count;
}
if (stats_ && tickerType < TICKER_ENUM_MAX) {
stats_->recordTick(tickerType, count);
}
} }
void StatisticsImpl::measureTime(Histograms histogramType, uint64_t value) { void StatisticsImpl::measureTime(uint32_t histogramType, uint64_t value) {
assert(histogramType < HISTOGRAM_ENUM_MAX); assert(
enable_internal_stats_ ?
histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
histogramType < HISTOGRAM_ENUM_MAX);
if (histogramType < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
histograms_[histogramType].Add(value); histograms_[histogramType].Add(value);
} }
if (stats_ && histogramType < HISTOGRAM_ENUM_MAX) {
void StatisticsImpl::histogramData(Histograms histogramType, stats_->measureTime(histogramType, value);
HistogramData* const data) { }
assert(histogramType < HISTOGRAM_ENUM_MAX);
histograms_[histogramType].Data(data);
} }
namespace { namespace {
@ -49,46 +88,44 @@ namespace {
// a buffer size used for temp string buffers // a buffer size used for temp string buffers
const int kBufferSize = 200; const int kBufferSize = 200;
std::string HistogramToString (
Statistics* dbstats,
const Histograms& histogram_type,
const std::string& name) {
char buffer[kBufferSize];
HistogramData histogramData;
dbstats->histogramData(histogram_type, &histogramData);
snprintf(
buffer,
kBufferSize,
"%s statistics Percentiles :=> 50 : %f 95 : %f 99 : %f\n",
name.c_str(),
histogramData.median,
histogramData.percentile95,
histogramData.percentile99
);
return std::string(buffer);
};
std::string TickerToString(Statistics* dbstats, const Tickers& ticker,
const std::string& name) {
char buffer[kBufferSize];
snprintf(buffer, kBufferSize, "%s COUNT : %ld\n",
name.c_str(), dbstats->getTickerCount(ticker));
return std::string(buffer);
};
} // namespace } // namespace
std::string Statistics::ToString() { std::string StatisticsImpl::ToString() const {
std::string res; std::string res;
res.reserve(20000); res.reserve(20000);
for (const auto& t : TickersNameMap) { for (const auto& t : TickersNameMap) {
res.append(TickerToString(this, t.first, t.second)); if (t.first < TICKER_ENUM_MAX || enable_internal_stats_) {
char buffer[kBufferSize];
snprintf(buffer, kBufferSize, "%s COUNT : %ld\n",
t.second.c_str(), getTickerCount(t.first));
res.append(buffer);
}
} }
for (const auto& h : HistogramsNameMap) { for (const auto& h : HistogramsNameMap) {
res.append(HistogramToString(this, h.first, h.second)); if (h.first < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
char buffer[kBufferSize];
HistogramData hData;
histogramData(h.first, &hData);
snprintf(
buffer,
kBufferSize,
"%s statistics Percentiles :=> 50 : %f 95 : %f 99 : %f\n",
h.second.c_str(),
hData.median,
hData.percentile95,
hData.percentile99);
res.append(buffer);
}
} }
res.shrink_to_fit(); res.shrink_to_fit();
return res; return res;
} }
bool StatisticsImpl::HistEnabledForType(uint32_t type) const {
if (LIKELY(!enable_internal_stats_)) {
return type < HISTOGRAM_ENUM_MAX;
}
return true;
}
} // namespace rocksdb } // namespace rocksdb

@ -5,29 +5,51 @@
// //
#pragma once #pragma once
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "util/histogram.h"
#include "util/mutexlock.h"
#include "port/likely.h"
#include <vector> #include <vector>
#include <atomic> #include <atomic>
#include <string>
#include "util/histogram.h"
#include "util/mutexlock.h"
#include "port/likely.h"
namespace rocksdb { namespace rocksdb {
enum TickersInternal : uint32_t {
INTERNAL_TICKER_ENUM_START = TICKER_ENUM_MAX,
INTERNAL_TICKER_ENUM_MAX
};
enum HistogramsInternal : uint32_t {
INTERNAL_HISTOGRAM_START = HISTOGRAM_ENUM_MAX,
INTERNAL_HISTOGRAM_ENUM_MAX
};
class StatisticsImpl : public Statistics { class StatisticsImpl : public Statistics {
public: public:
StatisticsImpl(); StatisticsImpl(std::shared_ptr<Statistics> stats,
bool enable_internal_stats);
virtual ~StatisticsImpl(); virtual ~StatisticsImpl();
virtual long getTickerCount(Tickers tickerType); virtual uint64_t getTickerCount(uint32_t ticker_type) const override;
virtual void setTickerCount(Tickers tickerType, uint64_t count); virtual void histogramData(uint32_t histogram_type,
virtual void recordTick(Tickers tickerType, uint64_t count); HistogramData* const data) const override;
virtual void measureTime(Histograms histogramType, uint64_t value);
virtual void histogramData(Histograms histogramType, virtual void setTickerCount(uint32_t ticker_type, uint64_t count) override;
HistogramData* const data); virtual void recordTick(uint32_t ticker_type, uint64_t count) override;
virtual void measureTime(uint32_t histogram_type, uint64_t value) override;
virtual std::string ToString() const override;
virtual bool HistEnabledForType(uint32_t type) const override;
private: private:
std::shared_ptr<Statistics> stats_shared_;
Statistics* stats_;
bool enable_internal_stats_;
struct Ticker { struct Ticker {
Ticker() : value(uint_fast64_t()) {} Ticker() : value(uint_fast64_t()) {}
@ -38,29 +60,30 @@ class StatisticsImpl : public Statistics {
char padding[64 - sizeof(std::atomic_uint_fast64_t)]; char padding[64 - sizeof(std::atomic_uint_fast64_t)];
}; };
Ticker tickers_[TICKER_ENUM_MAX] __attribute__((aligned(64))); Ticker tickers_[INTERNAL_TICKER_ENUM_MAX] __attribute__((aligned(64)));
HistogramImpl histograms_[HISTOGRAM_ENUM_MAX] __attribute__((aligned(64))); HistogramImpl histograms_[INTERNAL_HISTOGRAM_ENUM_MAX]
__attribute__((aligned(64)));
}; };
// Utility functions // Utility functions
inline void MeasureTime(Statistics* statistics, Histograms histogramType, inline void MeasureTime(Statistics* statistics, uint32_t histogram_type,
uint64_t value) { uint64_t value) {
if (statistics) { if (statistics) {
statistics->measureTime(histogramType, value); statistics->measureTime(histogram_type, value);
} }
} }
inline void RecordTick(Statistics* statistics, Tickers ticker, inline void RecordTick(Statistics* statistics, uint32_t ticker_type,
uint64_t count = 1) { uint64_t count = 1) {
if (statistics) { if (statistics) {
statistics->recordTick(ticker, count); statistics->recordTick(ticker_type, count);
} }
} }
inline void SetTickerCount(Statistics* statistics, Tickers ticker, inline void SetTickerCount(Statistics* statistics, uint32_t ticker_type,
uint64_t count) { uint64_t count) {
if (statistics) { if (statistics) {
statistics->setTickerCount(ticker, count); statistics->setTickerCount(ticker_type, count);
} }
} }
} }

@ -12,30 +12,31 @@ namespace rocksdb {
// Records the statistic into the corresponding histogram. // Records the statistic into the corresponding histogram.
class StopWatch { class StopWatch {
public: public:
explicit StopWatch( StopWatch(Env * const env, Statistics* statistics,
Env * const env, const uint32_t hist_type, bool force_enable = false)
Statistics* statistics = nullptr, : env_(env),
const Histograms histogram_name = DB_GET,
bool auto_start = true) :
env_(env),
start_time_((!auto_start && !statistics) ? 0 : env->NowMicros()),
statistics_(statistics), statistics_(statistics),
histogram_name_(histogram_name) {} hist_type_(hist_type),
enabled_(statistics && statistics->HistEnabledForType(hist_type)),
start_time_(enabled_ || force_enable ? env->NowMicros() : 0) {
}
uint64_t ElapsedMicros() const { uint64_t ElapsedMicros() const {
return env_->NowMicros() - start_time_; return env_->NowMicros() - start_time_;
} }
~StopWatch() { MeasureTime(statistics_, histogram_name_, ElapsedMicros()); } ~StopWatch() {
if (enabled_) {
statistics_->measureTime(hist_type_, ElapsedMicros());
}
}
private: private:
Env* const env_; Env* const env_;
const uint64_t start_time_;
Statistics* statistics_; Statistics* statistics_;
const Histograms histogram_name_; const uint32_t hist_type_;
bool enabled_;
const uint64_t start_time_;
}; };
// a nano second precision stopwatch // a nano second precision stopwatch

Loading…
Cancel
Save