Histogram Concurrency Improvement and Time-Windowing Support

main
SherlockNoMad 9 years ago
parent 451678c8c9
commit 54f6b9e162
  1. 1
      CMakeLists.txt
  2. 1
      src.mk
  3. 12
      tools/db_bench_tool.cc
  4. 224
      util/histogram.cc
  5. 111
      util/histogram.h
  6. 192
      util/histogram_test.cc
  7. 193
      util/histogram_windowing.cc
  8. 80
      util/histogram_windowing.h

@ -207,6 +207,7 @@ set(SOURCES
util/filter_policy.cc
util/hash.cc
util/histogram.cc
util/histogram_windowing.cc
util/instrumented_mutex.cc
util/iostats_context.cc
tools/ldb_cmd.cc

@ -107,6 +107,7 @@ LIB_SOURCES = \
util/filter_policy.cc \
util/hash.cc \
util/histogram.cc \
util/histogram_windowing.cc \
util/instrumented_mutex.cc \
util/iostats_context.cc \
utilities/backupable/backupable_db.cc \

@ -1205,7 +1205,7 @@ class Stats {
uint64_t bytes_;
uint64_t last_op_finish_;
uint64_t last_report_finish_;
std::unordered_map<OperationType, HistogramImpl,
std::unordered_map<OperationType, std::shared_ptr<HistogramImpl>,
std::hash<unsigned char>> hist_;
std::string message_;
bool exclude_from_merge_;
@ -1242,7 +1242,7 @@ class Stats {
for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) {
auto this_it = hist_.find(it->first);
if (this_it != hist_.end()) {
this_it->second.Merge(other.hist_.at(it->first));
this_it->second->Merge(*(other.hist_.at(it->first)));
} else {
hist_.insert({ it->first, it->second });
}
@ -1316,10 +1316,10 @@ class Stats {
if (hist_.find(op_type) == hist_.end())
{
HistogramImpl hist_temp;
hist_.insert({op_type, hist_temp});
auto hist_temp = std::make_shared<HistogramImpl>();
hist_.insert({op_type, std::move(hist_temp)});
}
hist_[op_type].Add(micros);
hist_[op_type]->Add(micros);
if (micros > 20000 && !FLAGS_stats_interval) {
fprintf(stderr, "long op: %" PRIu64 " micros%30s\r", micros, "");
@ -1452,7 +1452,7 @@ class Stats {
for (auto it = hist_.begin(); it != hist_.end(); ++it) {
fprintf(stdout, "Microseconds per %s:\n%s\n",
OperationTypeString[it->first].c_str(),
it->second.ToString().c_str());
it->second->ToString().c_str());
}
}
if (FLAGS_report_file_operations) {

@ -7,11 +7,15 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/histogram.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <cassert>
#include <math.h>
#include <stdio.h>
#include "util/histogram.h"
#include "port/port.h"
namespace rocksdb {
@ -73,90 +77,126 @@ namespace {
const HistogramBucketMapper bucketMapper;
}
void HistogramImpl::Clear() {
min_ = static_cast<double>(bucketMapper.LastValue());
max_ = 0;
num_ = 0;
sum_ = 0;
sum_squares_ = 0;
memset(buckets_, 0, sizeof buckets_);
HistogramStat::HistogramStat()
: num_buckets_(bucketMapper.BucketCount()) {
assert(num_buckets_ == sizeof(buckets_) / sizeof(*buckets_));
Clear();
}
bool HistogramImpl::Empty() { return num_ == 0; }
void HistogramStat::Clear() {
min_.store(bucketMapper.LastValue(), std::memory_order_relaxed);
max_.store(0, std::memory_order_relaxed);
num_.store(0, std::memory_order_relaxed);
sum_.store(0, std::memory_order_relaxed);
sum_squares_.store(0, std::memory_order_relaxed);
for (unsigned int b = 0; b < num_buckets_; b++) {
buckets_[b].store(0, std::memory_order_relaxed);
}
};
void HistogramImpl::Add(uint64_t value) {
bool HistogramStat::Empty() const { return num() == 0; }
void HistogramStat::Add(uint64_t value) {
// This function is designed to be lock free, as it's in the critical path
// of any operation. Each individual value is atomic and the order of updates
// by concurrent threads is tolerable.
const size_t index = bucketMapper.IndexForValue(value);
buckets_[index] += 1;
if (min_ > value) min_ = static_cast<double>(value);
if (max_ < value) max_ = static_cast<double>(value);
num_++;
sum_ += value;
sum_squares_ += (value * value);
assert(index < num_buckets_ && index >= 0);
buckets_[index].fetch_add(1, std::memory_order_relaxed);
uint64_t old_min = min();
while (value < old_min && !min_.compare_exchange_weak(old_min, value)) {}
uint64_t old_max = max();
while (value > old_max && !max_.compare_exchange_weak(old_max, value)) {}
num_.fetch_add(1, std::memory_order_relaxed);
sum_.fetch_add(value, std::memory_order_relaxed);
sum_squares_.fetch_add(value * value, std::memory_order_relaxed);
}
void HistogramImpl::Merge(const HistogramImpl& other) {
if (other.min_ < min_) min_ = other.min_;
if (other.max_ > max_) max_ = other.max_;
num_ += other.num_;
sum_ += other.sum_;
sum_squares_ += other.sum_squares_;
for (unsigned int b = 0; b < bucketMapper.BucketCount(); b++) {
buckets_[b] += other.buckets_[b];
void HistogramStat::Merge(const HistogramStat& other) {
// This function needs to be performned with the outer lock acquired
// However, atomic operation on every member is still need, since Add()
// requires no lock and value update can still happen concurrently
uint64_t old_min = min();
uint64_t other_min = other.min();
while (other_min < old_min &&
!min_.compare_exchange_weak(old_min, other_min)) {}
uint64_t old_max = max();
uint64_t other_max = other.max();
while (other_max > old_max &&
!max_.compare_exchange_weak(old_max, other_max)) {}
num_.fetch_add(other.num(), std::memory_order_relaxed);
sum_.fetch_add(other.sum(), std::memory_order_relaxed);
sum_squares_.fetch_add(other.sum_squares(), std::memory_order_relaxed);
for (unsigned int b = 0; b < num_buckets_; b++) {
buckets_[b].fetch_add(other.bucket_at(b), std::memory_order_relaxed);
}
}
double HistogramImpl::Median() const {
double HistogramStat::Median() const {
return Percentile(50.0);
}
double HistogramImpl::Percentile(double p) const {
double threshold = num_ * (p / 100.0);
double sum = 0;
for (unsigned int b = 0; b < bucketMapper.BucketCount(); b++) {
sum += buckets_[b];
if (sum >= threshold) {
double HistogramStat::Percentile(double p) const {
double threshold = num() * (p / 100.0);
uint64_t cumulative_sum = 0;
for (unsigned int b = 0; b < num_buckets_; b++) {
uint64_t bucket_value = bucket_at(b);
cumulative_sum += bucket_value;
if (cumulative_sum >= threshold) {
// Scale linearly within this bucket
double left_point =
static_cast<double>((b == 0) ? 0 : bucketMapper.BucketLimit(b-1));
double right_point =
static_cast<double>(bucketMapper.BucketLimit(b));
double left_sum = sum - buckets_[b];
double right_sum = sum;
uint64_t left_point = (b == 0) ? 0 : bucketMapper.BucketLimit(b-1);
uint64_t right_point = bucketMapper.BucketLimit(b);
uint64_t left_sum = cumulative_sum - bucket_value;
uint64_t right_sum = cumulative_sum;
double pos = 0;
double right_left_diff = right_sum - left_sum;
uint64_t right_left_diff = right_sum - left_sum;
if (right_left_diff != 0) {
pos = (threshold - left_sum) / (right_sum - left_sum);
pos = (threshold - left_sum) / right_left_diff;
}
double r = left_point + (right_point - left_point) * pos;
if (r < min_) r = min_;
if (r > max_) r = max_;
uint64_t cur_min = min();
uint64_t cur_max = max();
if (r < cur_min) r = static_cast<double>(cur_min);
if (r > cur_max) r = static_cast<double>(cur_max);
return r;
}
}
return max_;
return static_cast<double>(max());
}
double HistogramImpl::Average() const {
if (num_ == 0.0) return 0;
return sum_ / num_;
double HistogramStat::Average() const {
uint64_t cur_num = num();
uint64_t cur_sum = sum();
if (cur_num == 0) return 0;
return static_cast<double>(cur_sum) / static_cast<double>(cur_num);
}
double HistogramImpl::StandardDeviation() const {
if (num_ == 0.0) return 0;
double variance = (sum_squares_ * num_ - sum_ * sum_) / (num_ * num_);
double HistogramStat::StandardDeviation() const {
uint64_t cur_num = num();
uint64_t cur_sum = sum();
uint64_t cur_sum_squares = sum_squares();
if (cur_num == 0) return 0;
double variance =
static_cast<double>(cur_sum_squares * cur_num - cur_sum * cur_sum) /
static_cast<double>(cur_num * cur_num);
return sqrt(variance);
}
std::string HistogramImpl::ToString() const {
std::string HistogramStat::ToString() const {
uint64_t cur_num = num();
std::string r;
char buf[200];
snprintf(buf, sizeof(buf),
"Count: %.0f Average: %.4f StdDev: %.2f\n",
num_, Average(), StandardDeviation());
"Count: %" PRIu64 " Average: %.4f StdDev: %.2f\n",
cur_num, Average(), StandardDeviation());
r.append(buf);
snprintf(buf, sizeof(buf),
"Min: %.4f Median: %.4f Max: %.4f\n",
(num_ == 0.0 ? 0.0 : min_), Median(), max_);
"Min: %" PRIu64 " Median: %.4f Max: %" PRIu64 "\n",
(cur_num == 0 ? 0 : min()), Median(), (cur_num == 0 ? 0 : max()));
r.append(buf);
snprintf(buf, sizeof(buf),
"Percentiles: "
@ -165,30 +205,30 @@ std::string HistogramImpl::ToString() const {
Percentile(99.99));
r.append(buf);
r.append("------------------------------------------------------\n");
const double mult = 100.0 / num_;
double sum = 0;
for (unsigned int b = 0; b < bucketMapper.BucketCount(); b++) {
if (buckets_[b] <= 0.0) continue;
sum += buckets_[b];
const double mult = 100.0 / cur_num;
uint64_t cumulative_sum = 0;
for (unsigned int b = 0; b < num_buckets_; b++) {
uint64_t bucket_value = bucket_at(b);
if (bucket_value <= 0.0) continue;
cumulative_sum += bucket_value;
snprintf(buf, sizeof(buf),
"[ %7lu, %7lu ) %8lu %7.3f%% %7.3f%% ",
// left
(unsigned long)((b == 0) ? 0 : bucketMapper.BucketLimit(b-1)),
(unsigned long)bucketMapper.BucketLimit(b), // right
(unsigned long)buckets_[b], // count
(mult * buckets_[b]), // percentage
(mult * sum)); // cumulative percentage
"[ %7" PRIu64 ", %7" PRIu64 " ) %8" PRIu64 " %7.3f%% %7.3f%% ",
(b == 0) ? 0 : bucketMapper.BucketLimit(b-1), // left
bucketMapper.BucketLimit(b), // right
bucket_value, // count
(mult * bucket_value), // percentage
(mult * cumulative_sum)); // cumulative percentage
r.append(buf);
// Add hash marks based on percentage; 20 marks for 100%.
int marks = static_cast<int>(20*(buckets_[b] / num_) + 0.5);
size_t marks = static_cast<size_t>(mult * bucket_value / 5 + 0.5);
r.append(marks, '#');
r.push_back('\n');
}
return r;
}
void HistogramImpl::Data(HistogramData * const data) const {
void HistogramStat::Data(HistogramData * const data) const {
assert(data);
data->median = Median();
data->percentile95 = Percentile(95);
@ -197,4 +237,52 @@ void HistogramImpl::Data(HistogramData * const data) const {
data->standard_deviation = StandardDeviation();
}
void HistogramImpl::Clear() {
std::lock_guard<std::mutex> lock(mutex_);
stats_.Clear();
}
bool HistogramImpl::Empty() const {
return stats_.Empty();
}
void HistogramImpl::Add(uint64_t value) {
stats_.Add(value);
}
void HistogramImpl::Merge(const Histogram& other) {
if (strcmp(Name(), other.Name()) == 0) {
Merge(dynamic_cast<const HistogramImpl&>(other));
}
}
void HistogramImpl::Merge(const HistogramImpl& other) {
std::lock_guard<std::mutex> lock(mutex_);
stats_.Merge(other.stats_);
}
double HistogramImpl::Median() const {
return stats_.Median();
}
double HistogramImpl::Percentile(double p) const {
return stats_.Percentile(p);
}
double HistogramImpl::Average() const {
return stats_.Average();
}
double HistogramImpl::StandardDeviation() const {
return stats_.StandardDeviation();
}
std::string HistogramImpl::ToString() const {
return stats_.ToString();
}
void HistogramImpl::Data(HistogramData * const data) const {
stats_.Data(data);
}
} // namespace levedb

@ -14,8 +14,7 @@
#include <string>
#include <vector>
#include <map>
#include <string.h>
#include <mutex>
namespace rocksdb {
@ -25,7 +24,7 @@ class HistogramBucketMapper {
HistogramBucketMapper();
// converts a value to the bucket index.
size_t IndexForValue(const uint64_t value) const;
size_t IndexForValue(uint64_t value) const;
// number of buckets required.
size_t BucketCount() const {
@ -52,33 +51,99 @@ class HistogramBucketMapper {
std::map<uint64_t, uint64_t> valueIndexMap_;
};
class HistogramImpl {
struct HistogramStat {
HistogramStat();
~HistogramStat() {}
HistogramStat(const HistogramStat&) = delete;
HistogramStat& operator=(const HistogramStat&) = delete;
void Clear();
bool Empty() const;
void Add(uint64_t value);
void Merge(const HistogramStat& other);
inline uint64_t min() const { return min_.load(std::memory_order_relaxed); }
inline uint64_t max() const { return max_.load(std::memory_order_relaxed); }
inline uint64_t num() const { return num_.load(std::memory_order_relaxed); }
inline uint64_t sum() const { return sum_.load(std::memory_order_relaxed); }
inline uint64_t sum_squares() const {
return sum_squares_.load(std::memory_order_relaxed);
}
inline uint64_t bucket_at(size_t b) const {
return buckets_[b].load(std::memory_order_relaxed);
}
double Median() const;
double Percentile(double p) const;
double Average() const;
double StandardDeviation() const;
void Data(HistogramData* const data) const;
std::string ToString() const;
// To be able to use HistogramStat as thread local variable, it
// cannot have dynamic allocated member. That's why we're
// using manually values from BucketMapper
std::atomic_uint_fast64_t min_;
std::atomic_uint_fast64_t max_;
std::atomic_uint_fast64_t num_;
std::atomic_uint_fast64_t sum_;
std::atomic_uint_fast64_t sum_squares_;
std::atomic_uint_fast64_t buckets_[138]; // 138==BucketMapper::BucketCount()
const uint64_t num_buckets_;
};
class Histogram {
public:
Histogram() {}
virtual ~Histogram() {};
virtual void Clear() = 0;
virtual bool Empty() const = 0;
virtual void Add(uint64_t value) = 0;
virtual void Merge(const Histogram&) = 0;
virtual std::string ToString() const = 0;
virtual const char* Name() const = 0;
virtual uint64_t min() const = 0;
virtual uint64_t max() const = 0;
virtual uint64_t num() const = 0;
virtual double Median() const = 0;
virtual double Percentile(double p) const = 0;
virtual double Average() const = 0;
virtual double StandardDeviation() const = 0;
virtual void Data(HistogramData* const data) const = 0;
};
class HistogramImpl : public Histogram {
public:
HistogramImpl() { memset(buckets_, 0, sizeof(buckets_)); }
virtual void Clear();
virtual bool Empty();
virtual void Add(uint64_t value);
void Merge(const HistogramImpl& other);
HistogramImpl() { Clear(); }
HistogramImpl(const HistogramImpl&) = delete;
HistogramImpl& operator=(const HistogramImpl&) = delete;
virtual std::string ToString() const;
virtual void Clear() override;
virtual bool Empty() const override;
virtual void Add(uint64_t value) override;
virtual void Merge(const Histogram& other) override;
void Merge(const HistogramImpl& other);
virtual double Median() const;
virtual double Percentile(double p) const;
virtual double Average() const;
virtual double StandardDeviation() const;
virtual void Data(HistogramData * const data) const;
virtual std::string ToString() const override;
virtual const char* Name() const override { return "HistogramImpl"; }
virtual uint64_t min() const override { return stats_.min(); }
virtual uint64_t max() const override { return stats_.max(); }
virtual uint64_t num() const override { return stats_.num(); }
virtual double Median() const override;
virtual double Percentile(double p) const override;
virtual double Average() const override;
virtual double StandardDeviation() const override;
virtual void Data(HistogramData* const data) const override;
virtual ~HistogramImpl() {}
private:
// To be able to use HistogramImpl as thread local variable, its constructor
// has to be static. That's why we're using manually values from BucketMapper
double min_ = 1000000000; // this is BucketMapper:LastValue()
double max_ = 0;
double num_ = 0;
double sum_ = 0;
double sum_squares_ = 0;
uint64_t buckets_[138]; // this is BucketMapper::BucketCount()
HistogramStat stats_;
std::mutex mutex_;
};
} // namespace rocksdb

@ -4,56 +4,202 @@
// of patent rights can be found in the PATENTS file in the same directory.
//
#include "util/histogram.h"
#include "util/histogram_windowing.h"
#include "util/testharness.h"
namespace rocksdb {
class HistogramTest : public testing::Test {};
TEST_F(HistogramTest, BasicOperation) {
HistogramImpl histogram;
for (uint64_t i = 1; i <= 100; i++) {
namespace {
const double kIota = 0.1;
const HistogramBucketMapper bucketMapper;
Env* env = Env::Default();
}
void PopulateHistogram(Histogram& histogram,
uint64_t low, uint64_t high, uint64_t loop = 1) {
for (; loop > 0; loop--) {
for (uint64_t i = low; i <= high; i++) {
histogram.Add(i);
}
{
double median = histogram.Median();
// ASSERT_LE(median, 50);
ASSERT_GT(median, 0);
}
}
{
double percentile100 = histogram.Percentile(100.0);
ASSERT_LE(percentile100, 100.0);
ASSERT_GT(percentile100, 0.0);
double percentile99 = histogram.Percentile(99.0);
double percentile85 = histogram.Percentile(85.0);
ASSERT_LE(percentile99, 99.0);
ASSERT_TRUE(percentile99 >= percentile85);
}
void BasicOperation(Histogram& histogram) {
PopulateHistogram(histogram, 1, 100, 10);
ASSERT_EQ(histogram.Average(), 50.5); // avg is acurately calculated.
HistogramData data;
histogram.Data(&data);
ASSERT_LE(std::fabs(histogram.Percentile(100.0) - 100.0), kIota);
ASSERT_LE(std::fabs(data.percentile99 - 99.0), kIota);
ASSERT_LE(std::fabs(data.percentile95 - 95.0), kIota);
ASSERT_LE(std::fabs(data.median - 50.0), kIota);
ASSERT_EQ(data.average, 50.5); // avg is acurately calculated.
ASSERT_LT(std::fabs(data.standard_deviation- 28.86), kIota); //sd is ~= 28.86
}
TEST_F(HistogramTest, EmptyHistogram) {
HistogramImpl histogram;
void MergeHistogram(Histogram& histogram, Histogram& other) {
PopulateHistogram(histogram, 1, 100);
PopulateHistogram(other, 101, 200);
histogram.Merge(other);
HistogramData data;
histogram.Data(&data);
ASSERT_LE(std::fabs(histogram.Percentile(100.0) - 200.0), kIota);
ASSERT_LE(std::fabs(data.percentile99 - 198.0), kIota);
ASSERT_LE(std::fabs(data.percentile95 - 190.0), kIota);
ASSERT_LE(std::fabs(data.median - 100.0), kIota);
ASSERT_EQ(data.average, 100.5); // avg is acurately calculated.
ASSERT_LT(std::fabs(data.standard_deviation - 57.73), kIota); //sd is ~= 57.73
}
void EmptyHistogram(Histogram& histogram) {
ASSERT_EQ(histogram.min(), bucketMapper.LastValue());
ASSERT_EQ(histogram.max(), 0);
ASSERT_EQ(histogram.num(), 0);
ASSERT_EQ(histogram.Median(), 0.0);
ASSERT_EQ(histogram.Percentile(85.0), 0.0);
ASSERT_EQ(histogram.Average(), 0.0);
ASSERT_EQ(histogram.StandardDeviation(), 0.0);
}
TEST_F(HistogramTest, ClearHistogram) {
HistogramImpl histogram;
void ClearHistogram(Histogram& histogram) {
for (uint64_t i = 1; i <= 100; i++) {
histogram.Add(i);
}
histogram.Clear();
ASSERT_TRUE(histogram.Empty());
ASSERT_EQ(histogram.Median(), 0);
ASSERT_EQ(histogram.Percentile(85.0), 0);
ASSERT_EQ(histogram.Average(), 0);
}
TEST_F(HistogramTest, BasicOperation) {
HistogramImpl histogram;
BasicOperation(histogram);
HistogramWindowingImpl histogramWindowing;
BasicOperation(histogramWindowing);
}
TEST_F(HistogramTest, MergeHistogram) {
HistogramImpl histogram;
HistogramImpl other;
MergeHistogram(histogram, other);
HistogramWindowingImpl histogramWindowing;
HistogramWindowingImpl otherWindowing;
MergeHistogram(histogramWindowing, otherWindowing);
}
TEST_F(HistogramTest, EmptyHistogram) {
HistogramImpl histogram;
EmptyHistogram(histogram);
HistogramWindowingImpl histogramWindowing;
EmptyHistogram(histogramWindowing);
}
TEST_F(HistogramTest, ClearHistogram) {
HistogramImpl histogram;
ClearHistogram(histogram);
HistogramWindowingImpl histogramWindowing;
ClearHistogram(histogramWindowing);
}
TEST_F(HistogramTest, HistogramWindowingExpire) {
uint64_t num_windows = 3;
int micros_per_window = 1000000;
uint64_t min_num_per_window = 0;
HistogramWindowingImpl
histogramWindowing(num_windows, micros_per_window, min_num_per_window);
PopulateHistogram(histogramWindowing, 1, 1, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 100);
ASSERT_EQ(histogramWindowing.min(), 1);
ASSERT_EQ(histogramWindowing.max(), 1);
ASSERT_EQ(histogramWindowing.Average(), 1);
PopulateHistogram(histogramWindowing, 2, 2, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 200);
ASSERT_EQ(histogramWindowing.min(), 1);
ASSERT_EQ(histogramWindowing.max(), 2);
ASSERT_EQ(histogramWindowing.Average(), 1.5);
PopulateHistogram(histogramWindowing, 3, 3, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 300);
ASSERT_EQ(histogramWindowing.min(), 1);
ASSERT_EQ(histogramWindowing.max(), 3);
ASSERT_EQ(histogramWindowing.Average(), 2.0);
// dropping oldest window with value 1, remaining 2 ~ 4
PopulateHistogram(histogramWindowing, 4, 4, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 300);
ASSERT_EQ(histogramWindowing.min(), 2);
ASSERT_EQ(histogramWindowing.max(), 4);
ASSERT_EQ(histogramWindowing.Average(), 3.0);
// dropping oldest window with value 2, remaining 3 ~ 5
PopulateHistogram(histogramWindowing, 5, 5, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 300);
ASSERT_EQ(histogramWindowing.min(), 3);
ASSERT_EQ(histogramWindowing.max(), 5);
ASSERT_EQ(histogramWindowing.Average(), 4.0);
}
TEST_F(HistogramTest, HistogramWindowingMerge) {
uint64_t num_windows = 3;
int micros_per_window = 1000000;
uint64_t min_num_per_window = 0;
HistogramWindowingImpl
histogramWindowing(num_windows, micros_per_window, min_num_per_window);
HistogramWindowingImpl
otherWindowing(num_windows, micros_per_window, min_num_per_window);
PopulateHistogram(histogramWindowing, 1, 1, 100);
PopulateHistogram(otherWindowing, 1, 1, 100);
env->SleepForMicroseconds(micros_per_window);
PopulateHistogram(histogramWindowing, 2, 2, 100);
PopulateHistogram(otherWindowing, 2, 2, 100);
env->SleepForMicroseconds(micros_per_window);
PopulateHistogram(histogramWindowing, 3, 3, 100);
PopulateHistogram(otherWindowing, 3, 3, 100);
env->SleepForMicroseconds(micros_per_window);
histogramWindowing.Merge(otherWindowing);
ASSERT_EQ(histogramWindowing.num(), 600);
ASSERT_EQ(histogramWindowing.min(), 1);
ASSERT_EQ(histogramWindowing.max(), 3);
ASSERT_EQ(histogramWindowing.Average(), 2.0);
// dropping oldest window with value 1, remaining 2 ~ 4
PopulateHistogram(histogramWindowing, 4, 4, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 500);
ASSERT_EQ(histogramWindowing.min(), 2);
ASSERT_EQ(histogramWindowing.max(), 4);
// dropping oldest window with value 2, remaining 3 ~ 5
PopulateHistogram(histogramWindowing, 5, 5, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 400);
ASSERT_EQ(histogramWindowing.min(), 3);
ASSERT_EQ(histogramWindowing.max(), 5);
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -0,0 +1,193 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/histogram.h"
#include "util/histogram_windowing.h"
#include <algorithm>
namespace rocksdb {
namespace {
const HistogramBucketMapper bucketMapper;
}
HistogramWindowingImpl::HistogramWindowingImpl() {
env_ = Env::Default();
window_stats_.reset(new HistogramStat[num_windows_]);
Clear();
}
HistogramWindowingImpl::HistogramWindowingImpl(
uint64_t num_windows,
uint64_t micros_per_window,
uint64_t min_num_per_window) :
num_windows_(num_windows),
micros_per_window_(micros_per_window),
min_num_per_window_(min_num_per_window) {
env_ = Env::Default();
window_stats_.reset(new HistogramStat[num_windows_]);
Clear();
}
HistogramWindowingImpl::~HistogramWindowingImpl(){
window_stats_.release();
}
void HistogramWindowingImpl::Clear() {
std::lock_guard<std::mutex> lock(mutex_);
stats_.Clear();
for (size_t i = 0; i < num_windows_; i++) {
window_stats_[i].Clear();
}
current_window_.store(0, std::memory_order_relaxed);
last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
}
bool HistogramWindowingImpl::Empty() const { return stats_.Empty(); }
// This function is designed to be lock free, as it's in the critical path
// of any operation.
// Each individual value is atomic, it is just that some samples can go
// in the older bucket which is tolerable.
void HistogramWindowingImpl::Add(uint64_t value){
TimerTick();
// Parent (global) member update
stats_.Add(value);
// Current window update
window_stats_[current_window()].Add(value);
}
void HistogramWindowingImpl::Merge(const Histogram& other) {
if (strcmp(Name(), other.Name()) == 0) {
Merge(dynamic_cast<const HistogramWindowingImpl&>(other));
}
}
void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) {
std::lock_guard<std::mutex> lock(mutex_);
stats_.Merge(other.stats_);
if (stats_.num_buckets_ != other.stats_.num_buckets_ ||
micros_per_window_ != other.micros_per_window_) {
return;
}
uint64_t cur_window = current_window();
uint64_t other_cur_window = other.current_window();
// going backwards for alignment
for (unsigned int i = 0;
i < std::min(num_windows_, other.num_windows_); i++) {
uint64_t window_index =
(cur_window + num_windows_ - i) % num_windows_;
uint64_t other_window_index =
(other_cur_window + other.num_windows_ - i) % other.num_windows_;
window_stats_[window_index].Merge(other.window_stats_[other_window_index]);
}
}
std::string HistogramWindowingImpl::ToString() const {
return stats_.ToString();
}
double HistogramWindowingImpl::Median() const {
return Percentile(50.0);
}
double HistogramWindowingImpl::Percentile(double p) const {
// Retry 3 times in total
for (int retry = 0; retry < 3; retry++) {
uint64_t start_num = stats_.num();
double result = stats_.Percentile(p);
// Detect if swap buckets or Clear() was called during calculation
if (stats_.num() >= start_num) {
return result;
}
}
return 0.0;
}
double HistogramWindowingImpl::Average() const {
return stats_.Average();
}
double HistogramWindowingImpl::StandardDeviation() const {
return stats_.StandardDeviation();
}
void HistogramWindowingImpl::Data(HistogramData * const data) const {
stats_.Data(data);
}
void HistogramWindowingImpl::TimerTick() {
uint64_t curr_time = env_->NowMicros();
if (curr_time - last_swap_time() > micros_per_window_ &&
window_stats_[current_window()].num() >= min_num_per_window_) {
SwapHistoryBucket();
}
}
void HistogramWindowingImpl::SwapHistoryBucket() {
// Threads executing Add() would be competing for this mutex, the first one
// who got the metex would take care of the bucket swap, other threads
// can skip this.
// If mutex is held by Merge() or Clear(), next Add() will take care of the
// swap, if needed.
if (mutex_.try_lock()) {
last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
uint64_t next_window = (current_window() + 1) % num_windows_;
// subtract next buckets from totals and swap to next buckets
HistogramStat& stats_to_drop = window_stats_[next_window];
if (!stats_to_drop.Empty()) {
for (size_t b = 0; b < stats_.num_buckets_; b++){
stats_.buckets_[b].fetch_sub(
stats_to_drop.bucket_at(b), std::memory_order_relaxed);
}
if (stats_.min() == stats_to_drop.min()) {
uint64_t new_min = bucketMapper.LastValue();
for (unsigned int i = 1; i < num_windows_; i++) {
uint64_t m = window_stats_[(next_window + i) % num_windows_].min();
if (m < new_min) new_min = m;
}
stats_.min_.store(new_min, std::memory_order_relaxed);
}
if (stats_.max() == stats_to_drop.max()) {
uint64_t new_max = 0;
for (unsigned int i = 1; i < num_windows_; i++) {
uint64_t m = window_stats_[(next_window + i) % num_windows_].max();
if (m > new_max) new_max = m;
}
stats_.max_.store(new_max, std::memory_order_relaxed);
}
stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed);
stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed);
stats_.sum_squares_.fetch_sub(
stats_to_drop.sum_squares(), std::memory_order_relaxed);
stats_to_drop.Clear();
}
// advance to next window bucket
current_window_.store(next_window, std::memory_order_relaxed);
mutex_.unlock();
}
}
} // namespace rocksdb

@ -0,0 +1,80 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "util/histogram.h"
#include "rocksdb/env.h"
namespace rocksdb {
class HistogramWindowingImpl : public Histogram
{
public:
HistogramWindowingImpl();
HistogramWindowingImpl(uint64_t num_windows,
uint64_t micros_per_window,
uint64_t min_num_per_window);
HistogramWindowingImpl(const HistogramImpl&) = delete;
HistogramWindowingImpl& operator=(const HistogramImpl&) = delete;
~HistogramWindowingImpl();
virtual void Clear() override;
virtual bool Empty() const override;
virtual void Add(uint64_t value) override;
virtual void Merge(const Histogram& other) override;
void Merge(const HistogramWindowingImpl& other);
virtual std::string ToString() const override;
virtual const char* Name() const override { return "HistogramWindowingImpl"; }
virtual uint64_t min() const override { return stats_.min(); }
virtual uint64_t max() const override { return stats_.max(); }
virtual uint64_t num() const override { return stats_.num(); }
virtual double Median() const override;
virtual double Percentile(double p) const override;
virtual double Average() const override;
virtual double StandardDeviation() const override;
virtual void Data(HistogramData* const data) const override;
private:
void TimerTick();
void SwapHistoryBucket();
inline uint64_t current_window() const {
return current_window_.load(std::memory_order_relaxed);
}
inline uint64_t last_swap_time() const{
return last_swap_time_.load(std::memory_order_relaxed);
}
Env* env_;
std::mutex mutex_;
// Aggregated stats over windows_stats_, all the computation is done
// upon aggregated values
HistogramStat stats_;
// This is a circular array representing the latest N time-windows.
// Each entry stores a time-window of data. Expiration is done
// on window-based.
std::unique_ptr<HistogramStat[]> window_stats_;
std::atomic_uint_fast64_t current_window_;
std::atomic_uint_fast64_t last_swap_time_;
// Following parameters are configuable
uint64_t num_windows_ = 5;
uint64_t micros_per_window_ = 60000000;
// By default, don't care about the number of values in current window
// when decide whether to swap windows or not.
uint64_t min_num_per_window_ = 0;
};
} // namespace rocksdb
Loading…
Cancel
Save