Core-local statistics

Summary:
This diff changes `StatisticsImpl` from a thread-local approach to a core-local one. The goal is to perform faster aggregations, particularly for applications that have many threads. There should be no behavior change.
Closes https://github.com/facebook/rocksdb/pull/2258

Differential Revision: D5016258

Pulled By: ajkr

fbshipit-source-id: 7d4d165b4a91d8110f0409d113d1be91f22d31a9
main
Andrew Kryczka 8 years ago committed by Facebook Github Bot
parent 3e86c0f07c
commit ac39d6bec5
  1. 2
      HISTORY.md
  2. 127
      monitoring/statistics.cc
  3. 111
      monitoring/statistics.h
  4. 21
      util/core_local.h

@ -1,5 +1,7 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## Unreleased
### New Features
* Change ticker/histogram statistics implementations to use core-local storage. This improves aggregation speed compared to our previous thread-local approach, particularly for applications with many threads.
## 5.5.0 (05/17/2017) ## 5.5.0 (05/17/2017)
### New Features ### New Features

@ -23,13 +23,9 @@ std::shared_ptr<Statistics> CreateDBStatistics() {
return std::make_shared<StatisticsImpl>(nullptr, false); return std::make_shared<StatisticsImpl>(nullptr, false);
} }
StatisticsImpl::StatisticsImpl( StatisticsImpl::StatisticsImpl(std::shared_ptr<Statistics> stats,
std::shared_ptr<Statistics> stats,
bool enable_internal_stats) bool enable_internal_stats)
: stats_shared_(stats), : stats_(std::move(stats)), enable_internal_stats_(enable_internal_stats) {}
stats_(stats.get()),
enable_internal_stats_(enable_internal_stats) {
}
StatisticsImpl::~StatisticsImpl() {} StatisticsImpl::~StatisticsImpl() {}
@ -43,79 +39,36 @@ uint64_t StatisticsImpl::getTickerCountLocked(uint32_t tickerType) const {
enable_internal_stats_ ? enable_internal_stats_ ?
tickerType < INTERNAL_TICKER_ENUM_MAX : tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX); tickerType < TICKER_ENUM_MAX);
uint64_t thread_local_sum = 0; uint64_t res = 0;
tickers_[tickerType].thread_value->Fold( for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
[](void* curr_ptr, void* res) { res += per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType];
auto* sum_ptr = static_cast<uint64_t*>(res); }
*sum_ptr += static_cast<std::atomic_uint_fast64_t*>(curr_ptr)->load( return res;
std::memory_order_relaxed);
},
&thread_local_sum);
return thread_local_sum +
tickers_[tickerType].merged_sum.load(std::memory_order_relaxed);
}
std::unique_ptr<HistogramImpl>
StatisticsImpl::HistogramInfo::getMergedHistogram() const {
std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
{
MutexLock lock(&merge_lock);
res_hist->Merge(merged_hist);
}
thread_value->Fold(
[](void* curr_ptr, void* res) {
auto tmp_res_hist = static_cast<HistogramImpl*>(res);
auto curr_hist = static_cast<HistogramImpl*>(curr_ptr);
tmp_res_hist->Merge(*curr_hist);
},
res_hist.get());
return res_hist;
} }
void StatisticsImpl::histogramData(uint32_t histogramType, void StatisticsImpl::histogramData(uint32_t histogramType,
HistogramData* const data) const { HistogramData* const data) const {
MutexLock lock(&aggregate_lock_); MutexLock lock(&aggregate_lock_);
histogramDataLocked(histogramType, data); getHistogramImplLocked(histogramType)->Data(data);
} }
void StatisticsImpl::histogramDataLocked(uint32_t histogramType, std::unique_ptr<HistogramImpl> StatisticsImpl::getHistogramImplLocked(
HistogramData* const data) const { uint32_t histogramType) const {
assert( assert(
enable_internal_stats_ ? enable_internal_stats_ ?
histogramType < INTERNAL_HISTOGRAM_ENUM_MAX : histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
histogramType < HISTOGRAM_ENUM_MAX); histogramType < HISTOGRAM_ENUM_MAX);
histograms_[histogramType].getMergedHistogram()->Data(data); std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
res_hist->Merge(
per_core_stats_.AccessAtCore(core_idx)->histograms_[histogramType]);
}
return res_hist;
} }
std::string StatisticsImpl::getHistogramString(uint32_t histogramType) const { std::string StatisticsImpl::getHistogramString(uint32_t histogramType) const {
MutexLock lock(&aggregate_lock_); MutexLock lock(&aggregate_lock_);
assert(enable_internal_stats_ ? histogramType < INTERNAL_HISTOGRAM_ENUM_MAX return getHistogramImplLocked(histogramType)->ToString();
: histogramType < HISTOGRAM_ENUM_MAX);
return histograms_[histogramType].getMergedHistogram()->ToString();
}
StatisticsImpl::ThreadTickerInfo* StatisticsImpl::getThreadTickerInfo(
uint32_t tickerType) {
auto info_ptr =
static_cast<ThreadTickerInfo*>(tickers_[tickerType].thread_value->Get());
if (info_ptr == nullptr) {
info_ptr =
new ThreadTickerInfo(0 /* value */, &tickers_[tickerType].merged_sum);
tickers_[tickerType].thread_value->Reset(info_ptr);
}
return info_ptr;
}
StatisticsImpl::ThreadHistogramInfo* StatisticsImpl::getThreadHistogramInfo(
uint32_t histogram_type) {
auto info_ptr = static_cast<ThreadHistogramInfo*>(
histograms_[histogram_type].thread_value->Get());
if (info_ptr == nullptr) {
info_ptr = new ThreadHistogramInfo(&histograms_[histogram_type].merged_hist,
&histograms_[histogram_type].merge_lock);
histograms_[histogram_type].thread_value->Reset(info_ptr);
}
return info_ptr;
} }
void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) { void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
@ -131,14 +84,12 @@ void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
void StatisticsImpl::setTickerCountLocked(uint32_t tickerType, uint64_t count) { void StatisticsImpl::setTickerCountLocked(uint32_t tickerType, uint64_t count) {
assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
: tickerType < TICKER_ENUM_MAX); : tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) { for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
tickers_[tickerType].thread_value->Fold( if (core_idx == 0) {
[](void* curr_ptr, void* res) { per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = count;
static_cast<std::atomic<uint64_t>*>(curr_ptr)->store( } else {
0, std::memory_order_relaxed); per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = 0;
}, }
nullptr /* res */);
tickers_[tickerType].merged_sum.store(count, std::memory_order_relaxed);
} }
} }
@ -148,15 +99,9 @@ uint64_t StatisticsImpl::getAndResetTickerCount(uint32_t tickerType) {
MutexLock lock(&aggregate_lock_); MutexLock lock(&aggregate_lock_);
assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
: tickerType < TICKER_ENUM_MAX); : tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) { for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
tickers_[tickerType].thread_value->Fold( sum +=
[](void* curr_ptr, void* res) { per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType].exchange(
auto* sum_ptr = static_cast<uint64_t*>(res);
*sum_ptr += static_cast<std::atomic<uint64_t>*>(curr_ptr)->exchange(
0, std::memory_order_relaxed);
},
&sum);
sum += tickers_[tickerType].merged_sum.exchange(
0, std::memory_order_relaxed); 0, std::memory_order_relaxed);
} }
} }
@ -171,10 +116,8 @@ void StatisticsImpl::recordTick(uint32_t tickerType, uint64_t count) {
enable_internal_stats_ ? enable_internal_stats_ ?
tickerType < INTERNAL_TICKER_ENUM_MAX : tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX); tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) { per_core_stats_.Access()->tickers_[tickerType].fetch_add(
auto info_ptr = getThreadTickerInfo(tickerType); count, std::memory_order_relaxed);
info_ptr->value.fetch_add(count, std::memory_order_relaxed);
}
if (stats_ && tickerType < TICKER_ENUM_MAX) { if (stats_ && tickerType < TICKER_ENUM_MAX) {
stats_->recordTick(tickerType, count); stats_->recordTick(tickerType, count);
} }
@ -185,9 +128,7 @@ void StatisticsImpl::measureTime(uint32_t histogramType, uint64_t value) {
enable_internal_stats_ ? enable_internal_stats_ ?
histogramType < INTERNAL_HISTOGRAM_ENUM_MAX : histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
histogramType < HISTOGRAM_ENUM_MAX); histogramType < HISTOGRAM_ENUM_MAX);
if (histogramType < HISTOGRAM_ENUM_MAX || enable_internal_stats_) { per_core_stats_.Access()->histograms_[histogramType].Add(value);
getThreadHistogramInfo(histogramType)->value.Add(value);
}
if (stats_ && histogramType < HISTOGRAM_ENUM_MAX) { if (stats_ && histogramType < HISTOGRAM_ENUM_MAX) {
stats_->measureTime(histogramType, value); stats_->measureTime(histogramType, value);
} }
@ -199,11 +140,9 @@ Status StatisticsImpl::Reset() {
setTickerCountLocked(i, 0); setTickerCountLocked(i, 0);
} }
for (uint32_t i = 0; i < HISTOGRAM_ENUM_MAX; ++i) { for (uint32_t i = 0; i < HISTOGRAM_ENUM_MAX; ++i) {
histograms_[i].thread_value->Fold( for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
[](void* curr_ptr, void* res) { per_core_stats_.AccessAtCore(core_idx)->histograms_[i].Clear();
static_cast<HistogramImpl*>(curr_ptr)->Clear(); }
},
nullptr /* res */);
} }
return Status::OK(); return Status::OK();
} }
@ -231,7 +170,7 @@ std::string StatisticsImpl::ToString() const {
if (h.first < HISTOGRAM_ENUM_MAX || enable_internal_stats_) { if (h.first < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
char buffer[kTmpStrBufferSize]; char buffer[kTmpStrBufferSize];
HistogramData hData; HistogramData hData;
histogramDataLocked(h.first, &hData); getHistogramImplLocked(h.first)->Data(&hData);
snprintf( snprintf(
buffer, kTmpStrBufferSize, buffer, kTmpStrBufferSize,
"%s statistics Percentiles :=> 50 : %f 95 : %f 99 : %f 100 : %f\n", "%s statistics Percentiles :=> 50 : %f 95 : %f 99 : %f 100 : %f\n",

@ -15,8 +15,14 @@
#include "monitoring/histogram.h" #include "monitoring/histogram.h"
#include "port/likely.h" #include "port/likely.h"
#include "port/port.h" #include "port/port.h"
#include "util/core_local.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/thread_local.h"
#ifdef __clang__
#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
#else
#define ROCKSDB_FIELD_UNUSED
#endif // __clang__
namespace rocksdb { namespace rocksdb {
@ -52,97 +58,38 @@ class StatisticsImpl : public Statistics {
virtual bool HistEnabledForType(uint32_t type) const override; virtual bool HistEnabledForType(uint32_t type) const override;
private: private:
std::shared_ptr<Statistics> stats_shared_; // If non-nullptr, forwards updates to the object pointed to by `stats_`.
Statistics* stats_; std::shared_ptr<Statistics> stats_;
// TODO(ajkr): clean this up since there are no internal stats anymore
bool enable_internal_stats_; bool enable_internal_stats_;
// Synchronizes anything that operates on other threads' thread-specific data // Synchronizes anything that operates across other cores' local data,
// such that operations like Reset() can be performed atomically. // such that operations like Reset() can be performed atomically.
mutable port::Mutex aggregate_lock_; mutable port::Mutex aggregate_lock_;
// Holds data maintained by each thread for implementing tickers. // The ticker/histogram data are stored in this structure, which we will store
struct ThreadTickerInfo { // per-core. It is cache-aligned, so tickers/histograms belonging to different
std::atomic_uint_fast64_t value; // cores can never share the same cache line.
// During teardown, value will be summed into *merged_sum. //
std::atomic_uint_fast64_t* merged_sum; // Alignment attributes expand to nothing depending on the platform
struct StatisticsData {
ThreadTickerInfo(uint_fast64_t _value, std::atomic_uint_fast64_t tickers_[INTERNAL_TICKER_ENUM_MAX] = {{0}};
std::atomic_uint_fast64_t* _merged_sum) HistogramImpl histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
: value(_value), merged_sum(_merged_sum) {} char
padding[(CACHE_LINE_SIZE -
(INTERNAL_TICKER_ENUM_MAX * sizeof(std::atomic_uint_fast64_t) +
INTERNAL_HISTOGRAM_ENUM_MAX * sizeof(HistogramImpl)) %
CACHE_LINE_SIZE) %
CACHE_LINE_SIZE] ROCKSDB_FIELD_UNUSED;
}; };
// Holds data maintained by each thread for implementing histograms. static_assert(sizeof(StatisticsData) % 64 == 0, "Expected 64-byte aligned");
struct ThreadHistogramInfo {
HistogramImpl value;
// During teardown, value will be merged into *merged_hist while holding
// *merge_lock, which also syncs with the merges necessary for reads.
HistogramImpl* merged_hist;
port::Mutex* merge_lock;
ThreadHistogramInfo(HistogramImpl* _merged_hist, port::Mutex* _merge_lock) CoreLocalArray<StatisticsData> per_core_stats_;
: value(), merged_hist(_merged_hist), merge_lock(_merge_lock) {}
};
// Holds global data for implementing tickers.
struct TickerInfo {
TickerInfo()
: thread_value(new ThreadLocalPtr(&mergeThreadValue)), merged_sum(0) {}
// Holds thread-specific pointer to ThreadTickerInfo
std::unique_ptr<ThreadLocalPtr> thread_value;
// Sum of thread-specific values for tickers that have been reset due to
// thread termination or ThreadLocalPtr destruction. Also, this is used by
// setTickerCount() to conveniently change the global value by setting this
// while simultaneously zeroing all thread-local values.
std::atomic_uint_fast64_t merged_sum;
static void mergeThreadValue(void* ptr) {
auto info_ptr = static_cast<ThreadTickerInfo*>(ptr);
*info_ptr->merged_sum += info_ptr->value;
delete info_ptr;
}
};
// Holds global data for implementing histograms.
struct HistogramInfo {
HistogramInfo()
: merged_hist(),
merge_lock(),
thread_value(new ThreadLocalPtr(&mergeThreadValue)) {}
// Merged thread-specific values for histograms that have been reset due to
// thread termination or ThreadLocalPtr destruction. Note these must be
// destroyed after thread_value since its destructor accesses them.
HistogramImpl merged_hist;
mutable port::Mutex merge_lock;
// Holds thread-specific pointer to ThreadHistogramInfo
std::unique_ptr<ThreadLocalPtr> thread_value;
static void mergeThreadValue(void* ptr) {
auto info_ptr = static_cast<ThreadHistogramInfo*>(ptr);
{
MutexLock lock(info_ptr->merge_lock);
info_ptr->merged_hist->Merge(info_ptr->value);
}
delete info_ptr;
}
// Returns a histogram that merges all histograms (thread-specific and
// previously merged ones).
std::unique_ptr<HistogramImpl> getMergedHistogram() const;
};
uint64_t getTickerCountLocked(uint32_t ticker_type) const; uint64_t getTickerCountLocked(uint32_t ticker_type) const;
void histogramDataLocked(uint32_t histogram_type, std::unique_ptr<HistogramImpl> getHistogramImplLocked(
HistogramData* const data) const; uint32_t histogram_type) const;
void setTickerCountLocked(uint32_t ticker_type, uint64_t count); void setTickerCountLocked(uint32_t ticker_type, uint64_t count);
// Returns the info for this tickerType/thread. It sets a new info with zeroed
// counter if none exists.
ThreadTickerInfo* getThreadTickerInfo(uint32_t ticker_type);
// Returns the info for this histogramType/thread. It sets a new histogram
// with zeroed data if none exists.
ThreadHistogramInfo* getThreadHistogramInfo(uint32_t histogram_type);
TickerInfo tickers_[INTERNAL_TICKER_ENUM_MAX];
HistogramInfo histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
}; };
// Utility functions // Utility functions

@ -7,19 +7,20 @@
#pragma once #pragma once
#include "port/likely.h"
#include "port/port.h"
#include "util/random.h"
#include <cstddef> #include <cstddef>
#include <thread> #include <thread>
#include <utility>
#include <vector> #include <vector>
#include "port/likely.h"
#include "port/port.h"
#include "util/random.h"
namespace rocksdb { namespace rocksdb {
// An array of core-local values. Ideally the value type, T, is cache aligned to // An array of core-local values. Ideally the value type, T, is cache aligned to
// prevent false sharing. // prevent false sharing.
template<typename T> template <typename T>
class CoreLocalArray { class CoreLocalArray {
public: public:
CoreLocalArray(); CoreLocalArray();
@ -41,7 +42,7 @@ class CoreLocalArray {
int size_shift_; int size_shift_;
}; };
template<typename T> template <typename T>
CoreLocalArray<T>::CoreLocalArray() { CoreLocalArray<T>::CoreLocalArray() {
int num_cpus = static_cast<int>(std::thread::hardware_concurrency()); int num_cpus = static_cast<int>(std::thread::hardware_concurrency());
// find a power of two >= num_cpus and >= 8 // find a power of two >= num_cpus and >= 8
@ -52,17 +53,17 @@ CoreLocalArray<T>::CoreLocalArray() {
data_.reset(new T[static_cast<size_t>(1) << size_shift_]); data_.reset(new T[static_cast<size_t>(1) << size_shift_]);
} }
template<typename T> template <typename T>
size_t CoreLocalArray<T>::Size() const { size_t CoreLocalArray<T>::Size() const {
return static_cast<size_t>(1) << size_shift_; return static_cast<size_t>(1) << size_shift_;
} }
template<typename T> template <typename T>
T* CoreLocalArray<T>::Access() const { T* CoreLocalArray<T>::Access() const {
return AccessElementAndIndex().first; return AccessElementAndIndex().first;
} }
template<typename T> template <typename T>
std::pair<T*, size_t> CoreLocalArray<T>::AccessElementAndIndex() const { std::pair<T*, size_t> CoreLocalArray<T>::AccessElementAndIndex() const {
int cpuid = port::PhysicalCoreID(); int cpuid = port::PhysicalCoreID();
size_t core_idx; size_t core_idx;
@ -75,7 +76,7 @@ std::pair<T*, size_t> CoreLocalArray<T>::AccessElementAndIndex() const {
return {AccessAtCore(core_idx), core_idx}; return {AccessAtCore(core_idx), core_idx};
} }
template<typename T> template <typename T>
T* CoreLocalArray<T>::AccessAtCore(size_t core_idx) const { T* CoreLocalArray<T>::AccessAtCore(size_t core_idx) const {
assert(core_idx < static_cast<size_t>(1) << size_shift_); assert(core_idx < static_cast<size_t>(1) << size_shift_);
return &data_[core_idx]; return &data_[core_idx];

Loading…
Cancel
Save