Thread-specific ticker statistics

Summary:
The global atomics we previously used for tickers had poor cache performance
since they were typically updated from different threads, causing frequent
cache-line invalidations. In this diff,

- recordTick() updates a local ticker value specific to the thread in which it was called
- When a thread exits, its local ticker value is added into merged_sum
- getTickerCount() returns the sum of all threads' local ticker values and the merged_sum
- setTickerCount() resets all threads' local ticker values and sets merged_sum to the value provided by the caller.

In a follow-up diff I will make a similar change for histogram stats.

Test Plan:
before:

  $ TEST_TMPDIR=/dev/shm/ perf record -g ./db_bench --benchmarks=readwhilewriting --statistics --num=1000000 --use_existing_db --threads=64 --cache_size=250000000 --compression_type=lz4
  $ perf report -g --stdio | grep recordTick
  7.59%  db_bench     db_bench             [.] rocksdb::StatisticsImpl::recordTick
  ...

after:

  $ TEST_TMPDIR=/dev/shm/ perf record -g ./db_bench --benchmarks=readwhilewriting --statistics --num=1000000 --use_existing_db --threads=64 --cache_size=250000000 --compression_type=lz4
  $ perf report -g --stdio | grep recordTick
  1.46%  db_bench     db_bench             [.] rocksdb::StatisticsImpl::recordTick
  ...

Reviewers: kradhakrishnan, MarkCallaghan, IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: yiwu, andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D62337
main
Andrew Kryczka 8 years ago
parent ea9e0757ff
commit 7c95868378
  1. 42
      util/statistics.cc
  2. 49
      util/statistics.h

@ -32,12 +32,19 @@ StatisticsImpl::StatisticsImpl(
StatisticsImpl::~StatisticsImpl() {} StatisticsImpl::~StatisticsImpl() {}
uint64_t StatisticsImpl::getTickerCount(uint32_t tickerType) const { uint64_t StatisticsImpl::getTickerCount(uint32_t tickerType) const {
MutexLock lock(&aggregate_lock_);
assert( assert(
enable_internal_stats_ ? enable_internal_stats_ ?
tickerType < INTERNAL_TICKER_ENUM_MAX : tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX); tickerType < TICKER_ENUM_MAX);
// Return its own ticker version uint64_t thread_local_sum = 0;
return tickers_[tickerType].value; tickers_[tickerType].thread_value->Fold(
[](void* curr_ptr, void* res) {
auto* sum_ptr = static_cast<uint64_t*>(res);
*sum_ptr += static_cast<std::atomic_uint_fast64_t*>(curr_ptr)->load();
},
&thread_local_sum);
return thread_local_sum + tickers_[tickerType].merged_sum.load();
} }
void StatisticsImpl::histogramData(uint32_t histogramType, void StatisticsImpl::histogramData(uint32_t histogramType,
@ -56,13 +63,31 @@ std::string StatisticsImpl::getHistogramString(uint32_t histogramType) const {
return histograms_[histogramType].ToString(); return histograms_[histogramType].ToString();
} }
StatisticsImpl::ThreadTickerInfo* StatisticsImpl::getThreadTickerInfo(
uint32_t tickerType) {
auto info_ptr =
static_cast<ThreadTickerInfo*>(tickers_[tickerType].thread_value->Get());
if (info_ptr == nullptr) {
info_ptr =
new ThreadTickerInfo(0 /* value */, &tickers_[tickerType].merged_sum);
tickers_[tickerType].thread_value->Reset(info_ptr);
}
return info_ptr;
}
void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) { void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
assert( {
enable_internal_stats_ ? MutexLock lock(&aggregate_lock_);
tickerType < INTERNAL_TICKER_ENUM_MAX : assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
tickerType < TICKER_ENUM_MAX); : tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) { if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
tickers_[tickerType].value.store(count, std::memory_order_relaxed); tickers_[tickerType].thread_value->Fold(
[](void* curr_ptr, void* res) {
static_cast<std::atomic<uint64_t>*>(curr_ptr)->store(0);
},
nullptr /* res */);
tickers_[tickerType].merged_sum.store(count);
}
} }
if (stats_ && tickerType < TICKER_ENUM_MAX) { if (stats_ && tickerType < TICKER_ENUM_MAX) {
stats_->setTickerCount(tickerType, count); stats_->setTickerCount(tickerType, count);
@ -75,7 +100,8 @@ void StatisticsImpl::recordTick(uint32_t tickerType, uint64_t count) {
tickerType < INTERNAL_TICKER_ENUM_MAX : tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX); tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) { if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
tickers_[tickerType].value.fetch_add(count, std::memory_order_relaxed); auto info_ptr = getThreadTickerInfo(tickerType);
info_ptr->value.fetch_add(count);
} }
if (stats_ && tickerType < TICKER_ENUM_MAX) { if (stats_ && tickerType < TICKER_ENUM_MAX) {
stats_->recordTick(tickerType, count); stats_->recordTick(tickerType, count);

@ -10,10 +10,11 @@
#include <atomic> #include <atomic>
#include <string> #include <string>
#include "port/likely.h"
#include "port/port.h"
#include "util/histogram.h" #include "util/histogram.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "port/likely.h" #include "util/thread_local.h"
namespace rocksdb { namespace rocksdb {
@ -50,24 +51,46 @@ class StatisticsImpl : public Statistics {
std::shared_ptr<Statistics> stats_shared_; std::shared_ptr<Statistics> stats_shared_;
Statistics* stats_; Statistics* stats_;
bool enable_internal_stats_; bool enable_internal_stats_;
// Synchronizes setTickerCount()/getTickerCount() operations so partially
// completed setTickerCount() won't be visible.
mutable port::Mutex aggregate_lock_;
struct Ticker { // Holds data maintained by each thread for implementing tickers.
Ticker() : value(uint_fast64_t()) {} struct ThreadTickerInfo {
std::atomic_uint_fast64_t value; std::atomic_uint_fast64_t value;
// Pad the structure to make it size of 64 bytes. A plain array of // During teardown, value will be summed into *merged_sum.
// std::atomic_uint_fast64_t results in huge performance degradataion std::atomic_uint_fast64_t* merged_sum;
// due to false sharing.
char padding[64 - sizeof(std::atomic_uint_fast64_t)]; ThreadTickerInfo(uint_fast64_t _value,
std::atomic_uint_fast64_t* _merged_sum)
: value(_value), merged_sum(_merged_sum) {}
}; };
static_assert(sizeof(Ticker) == 64, "Expecting to fit into 64 bytes"); struct Ticker {
Ticker()
: thread_value(new ThreadLocalPtr(&mergeThreadValue)), merged_sum(0) {}
// Holds thread-specific pointer to ThreadTickerInfo
std::unique_ptr<ThreadLocalPtr> thread_value;
// Sum of thread-specific values for tickers that have been reset due to
// thread termination or ThreadLocalPtr destruction. Also, this is used by
// setTickerCount() to conveniently change the global value by setting this
// while simultaneously zeroing all thread-local values.
std::atomic_uint_fast64_t merged_sum;
static void mergeThreadValue(void* ptr) {
auto info_ptr = static_cast<ThreadTickerInfo*>(ptr);
*info_ptr->merged_sum += info_ptr->value;
delete info_ptr;
}
};
// Returns the info for this tickerType/thread. It sets a new info with zeroed
// counter if none exists.
ThreadTickerInfo* getThreadTickerInfo(uint32_t tickerType);
Ticker tickers_[INTERNAL_TICKER_ENUM_MAX];
// Attributes expand to nothing depending on the platform // Attributes expand to nothing depending on the platform
__declspec(align(64)) __declspec(align(64))
Ticker tickers_[INTERNAL_TICKER_ENUM_MAX]
__attribute__((aligned(64)));
__declspec(align(64))
HistogramImpl histograms_[INTERNAL_HISTOGRAM_ENUM_MAX] HistogramImpl histograms_[INTERNAL_HISTOGRAM_ENUM_MAX]
__attribute__((aligned(64))); __attribute__((aligned(64)));
}; };

Loading…
Cancel
Save