rocksdb/monitoring/histogram_windowing.cc

197 lines
6.1 KiB

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "monitoring/histogram_windowing.h"
#include "monitoring/histogram.h"
#include <algorithm>
namespace rocksdb {
HistogramWindowingImpl::HistogramWindowingImpl() {
env_ = Env::Default();
window_stats_.reset(new HistogramStat[num_windows_]);
Clear();
}
HistogramWindowingImpl::HistogramWindowingImpl(
uint64_t num_windows,
uint64_t micros_per_window,
uint64_t min_num_per_window) :
num_windows_(num_windows),
micros_per_window_(micros_per_window),
min_num_per_window_(min_num_per_window) {
env_ = Env::Default();
window_stats_.reset(new HistogramStat[num_windows_]);
Clear();
}
HistogramWindowingImpl::~HistogramWindowingImpl() {
}
void HistogramWindowingImpl::Clear() {
std::lock_guard<std::mutex> lock(mutex_);
stats_.Clear();
for (size_t i = 0; i < num_windows_; i++) {
window_stats_[i].Clear();
}
current_window_.store(0, std::memory_order_relaxed);
last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
}
bool HistogramWindowingImpl::Empty() const { return stats_.Empty(); }
// This function is designed to be lock free, as it's in the critical path
// of any operation.
// Each individual value is atomic, it is just that some samples can go
// in the older bucket which is tolerable.
void HistogramWindowingImpl::Add(uint64_t value){
TimerTick();
// Parent (global) member update
stats_.Add(value);
// Current window update
window_stats_[current_window()].Add(value);
}
void HistogramWindowingImpl::Merge(const Histogram& other) {
if (strcmp(Name(), other.Name()) == 0) {
Merge(dynamic_cast<const HistogramWindowingImpl&>(other));
}
}
void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) {
std::lock_guard<std::mutex> lock(mutex_);
stats_.Merge(other.stats_);
if (stats_.num_buckets_ != other.stats_.num_buckets_ ||
micros_per_window_ != other.micros_per_window_) {
return;
}
uint64_t cur_window = current_window();
uint64_t other_cur_window = other.current_window();
// going backwards for alignment
for (unsigned int i = 0;
i < std::min(num_windows_, other.num_windows_); i++) {
uint64_t window_index =
(cur_window + num_windows_ - i) % num_windows_;
uint64_t other_window_index =
(other_cur_window + other.num_windows_ - i) % other.num_windows_;
window_stats_[window_index].Merge(other.window_stats_[other_window_index]);
}
}
std::string HistogramWindowingImpl::ToString() const {
return stats_.ToString();
}
double HistogramWindowingImpl::Median() const {
return Percentile(50.0);
}
double HistogramWindowingImpl::Percentile(double p) const {
// Retry 3 times in total
for (int retry = 0; retry < 3; retry++) {
uint64_t start_num = stats_.num();
double result = stats_.Percentile(p);
// Detect if swap buckets or Clear() was called during calculation
if (stats_.num() >= start_num) {
return result;
}
}
return 0.0;
}
double HistogramWindowingImpl::Average() const {
return stats_.Average();
}
double HistogramWindowingImpl::StandardDeviation() const {
return stats_.StandardDeviation();
}
void HistogramWindowingImpl::Data(HistogramData * const data) const {
stats_.Data(data);
}
void HistogramWindowingImpl::TimerTick() {
uint64_t curr_time = env_->NowMicros();
if (curr_time - last_swap_time() > micros_per_window_ &&
window_stats_[current_window()].num() >= min_num_per_window_) {
SwapHistoryBucket();
}
}
void HistogramWindowingImpl::SwapHistoryBucket() {
// Threads executing Add() would be competing for this mutex, the first one
// who got the metex would take care of the bucket swap, other threads
// can skip this.
// If mutex is held by Merge() or Clear(), next Add() will take care of the
// swap, if needed.
if (mutex_.try_lock()) {
last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
uint64_t curr_window = current_window();
uint64_t next_window = (curr_window == num_windows_ - 1) ?
0 : curr_window + 1;
// subtract next buckets from totals and swap to next buckets
HistogramStat& stats_to_drop = window_stats_[next_window];
if (!stats_to_drop.Empty()) {
for (size_t b = 0; b < stats_.num_buckets_; b++){
stats_.buckets_[b].fetch_sub(
stats_to_drop.bucket_at(b), std::memory_order_relaxed);
}
if (stats_.min() == stats_to_drop.min()) {
uint64_t new_min = std::numeric_limits<uint64_t>::max();
for (unsigned int i = 0; i < num_windows_; i++) {
if (i != next_window) {
uint64_t m = window_stats_[i].min();
if (m < new_min) new_min = m;
}
}
stats_.min_.store(new_min, std::memory_order_relaxed);
}
if (stats_.max() == stats_to_drop.max()) {
uint64_t new_max = 0;
for (unsigned int i = 0; i < num_windows_; i++) {
if (i != next_window) {
uint64_t m = window_stats_[i].max();
if (m > new_max) new_max = m;
}
}
stats_.max_.store(new_max, std::memory_order_relaxed);
}
stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed);
stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed);
stats_.sum_squares_.fetch_sub(
stats_to_drop.sum_squares(), std::memory_order_relaxed);
stats_to_drop.Clear();
}
// advance to next window bucket
current_window_.store(next_window, std::memory_order_relaxed);
mutex_.unlock();
}
}
} // namespace rocksdb