Merge pull request #1026 from SherlockNoMad/Hist

Histogram Concurrency Improvement and Time-Windowing Support
main
Siying Dong 9 years ago
commit 774922c680
  1. 1
      CMakeLists.txt
  2. 1
      src.mk
  3. 12
      tools/db_bench_tool.cc
  4. 224
      util/histogram.cc
  5. 111
      util/histogram.h
  6. 194
      util/histogram_test.cc
  7. 199
      util/histogram_windowing.cc
  8. 80
      util/histogram_windowing.h

@ -207,6 +207,7 @@ set(SOURCES
util/filter_policy.cc util/filter_policy.cc
util/hash.cc util/hash.cc
util/histogram.cc util/histogram.cc
util/histogram_windowing.cc
util/instrumented_mutex.cc util/instrumented_mutex.cc
util/iostats_context.cc util/iostats_context.cc
tools/ldb_cmd.cc tools/ldb_cmd.cc

@ -108,6 +108,7 @@ LIB_SOURCES = \
util/filter_policy.cc \ util/filter_policy.cc \
util/hash.cc \ util/hash.cc \
util/histogram.cc \ util/histogram.cc \
util/histogram_windowing.cc \
util/instrumented_mutex.cc \ util/instrumented_mutex.cc \
util/iostats_context.cc \ util/iostats_context.cc \
utilities/backupable/backupable_db.cc \ utilities/backupable/backupable_db.cc \

@ -1206,7 +1206,7 @@ class Stats {
uint64_t bytes_; uint64_t bytes_;
uint64_t last_op_finish_; uint64_t last_op_finish_;
uint64_t last_report_finish_; uint64_t last_report_finish_;
std::unordered_map<OperationType, HistogramImpl, std::unordered_map<OperationType, std::shared_ptr<HistogramImpl>,
std::hash<unsigned char>> hist_; std::hash<unsigned char>> hist_;
std::string message_; std::string message_;
bool exclude_from_merge_; bool exclude_from_merge_;
@ -1243,7 +1243,7 @@ class Stats {
for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) { for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) {
auto this_it = hist_.find(it->first); auto this_it = hist_.find(it->first);
if (this_it != hist_.end()) { if (this_it != hist_.end()) {
this_it->second.Merge(other.hist_.at(it->first)); this_it->second->Merge(*(other.hist_.at(it->first)));
} else { } else {
hist_.insert({ it->first, it->second }); hist_.insert({ it->first, it->second });
} }
@ -1317,10 +1317,10 @@ class Stats {
if (hist_.find(op_type) == hist_.end()) if (hist_.find(op_type) == hist_.end())
{ {
HistogramImpl hist_temp; auto hist_temp = std::make_shared<HistogramImpl>();
hist_.insert({op_type, hist_temp}); hist_.insert({op_type, std::move(hist_temp)});
} }
hist_[op_type].Add(micros); hist_[op_type]->Add(micros);
if (micros > 20000 && !FLAGS_stats_interval) { if (micros > 20000 && !FLAGS_stats_interval) {
fprintf(stderr, "long op: %" PRIu64 " micros%30s\r", micros, ""); fprintf(stderr, "long op: %" PRIu64 " micros%30s\r", micros, "");
@ -1453,7 +1453,7 @@ class Stats {
for (auto it = hist_.begin(); it != hist_.end(); ++it) { for (auto it = hist_.begin(); it != hist_.end(); ++it) {
fprintf(stdout, "Microseconds per %s:\n%s\n", fprintf(stdout, "Microseconds per %s:\n%s\n",
OperationTypeString[it->first].c_str(), OperationTypeString[it->first].c_str(),
it->second.ToString().c_str()); it->second->ToString().c_str());
} }
} }
if (FLAGS_report_file_operations) { if (FLAGS_report_file_operations) {

@ -7,11 +7,15 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/histogram.h" #ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <cassert> #include <cassert>
#include <math.h> #include <math.h>
#include <stdio.h> #include <stdio.h>
#include "util/histogram.h"
#include "port/port.h" #include "port/port.h"
namespace rocksdb { namespace rocksdb {
@ -73,90 +77,126 @@ namespace {
const HistogramBucketMapper bucketMapper; const HistogramBucketMapper bucketMapper;
} }
void HistogramImpl::Clear() { HistogramStat::HistogramStat()
min_ = static_cast<double>(bucketMapper.LastValue()); : num_buckets_(bucketMapper.BucketCount()) {
max_ = 0; assert(num_buckets_ == sizeof(buckets_) / sizeof(*buckets_));
num_ = 0; Clear();
sum_ = 0;
sum_squares_ = 0;
memset(buckets_, 0, sizeof buckets_);
} }
bool HistogramImpl::Empty() { return num_ == 0; } void HistogramStat::Clear() {
min_.store(bucketMapper.LastValue(), std::memory_order_relaxed);
max_.store(0, std::memory_order_relaxed);
num_.store(0, std::memory_order_relaxed);
sum_.store(0, std::memory_order_relaxed);
sum_squares_.store(0, std::memory_order_relaxed);
for (unsigned int b = 0; b < num_buckets_; b++) {
buckets_[b].store(0, std::memory_order_relaxed);
}
};
void HistogramImpl::Add(uint64_t value) { bool HistogramStat::Empty() const { return num() == 0; }
void HistogramStat::Add(uint64_t value) {
// This function is designed to be lock free, as it's in the critical path
// of any operation. Each individual value is atomic and the order of updates
// by concurrent threads is tolerable.
const size_t index = bucketMapper.IndexForValue(value); const size_t index = bucketMapper.IndexForValue(value);
buckets_[index] += 1; assert(index < num_buckets_ && index >= 0);
if (min_ > value) min_ = static_cast<double>(value); buckets_[index].fetch_add(1, std::memory_order_relaxed);
if (max_ < value) max_ = static_cast<double>(value);
num_++; uint64_t old_min = min();
sum_ += value; while (value < old_min && !min_.compare_exchange_weak(old_min, value)) {}
sum_squares_ += (value * value);
uint64_t old_max = max();
while (value > old_max && !max_.compare_exchange_weak(old_max, value)) {}
num_.fetch_add(1, std::memory_order_relaxed);
sum_.fetch_add(value, std::memory_order_relaxed);
sum_squares_.fetch_add(value * value, std::memory_order_relaxed);
} }
void HistogramImpl::Merge(const HistogramImpl& other) { void HistogramStat::Merge(const HistogramStat& other) {
if (other.min_ < min_) min_ = other.min_; // This function needs to be performned with the outer lock acquired
if (other.max_ > max_) max_ = other.max_; // However, atomic operation on every member is still need, since Add()
num_ += other.num_; // requires no lock and value update can still happen concurrently
sum_ += other.sum_; uint64_t old_min = min();
sum_squares_ += other.sum_squares_; uint64_t other_min = other.min();
for (unsigned int b = 0; b < bucketMapper.BucketCount(); b++) { while (other_min < old_min &&
buckets_[b] += other.buckets_[b]; !min_.compare_exchange_weak(old_min, other_min)) {}
uint64_t old_max = max();
uint64_t other_max = other.max();
while (other_max > old_max &&
!max_.compare_exchange_weak(old_max, other_max)) {}
num_.fetch_add(other.num(), std::memory_order_relaxed);
sum_.fetch_add(other.sum(), std::memory_order_relaxed);
sum_squares_.fetch_add(other.sum_squares(), std::memory_order_relaxed);
for (unsigned int b = 0; b < num_buckets_; b++) {
buckets_[b].fetch_add(other.bucket_at(b), std::memory_order_relaxed);
} }
} }
double HistogramImpl::Median() const { double HistogramStat::Median() const {
return Percentile(50.0); return Percentile(50.0);
} }
double HistogramImpl::Percentile(double p) const { double HistogramStat::Percentile(double p) const {
double threshold = num_ * (p / 100.0); double threshold = num() * (p / 100.0);
double sum = 0; uint64_t cumulative_sum = 0;
for (unsigned int b = 0; b < bucketMapper.BucketCount(); b++) { for (unsigned int b = 0; b < num_buckets_; b++) {
sum += buckets_[b]; uint64_t bucket_value = bucket_at(b);
if (sum >= threshold) { cumulative_sum += bucket_value;
if (cumulative_sum >= threshold) {
// Scale linearly within this bucket // Scale linearly within this bucket
double left_point = uint64_t left_point = (b == 0) ? 0 : bucketMapper.BucketLimit(b-1);
static_cast<double>((b == 0) ? 0 : bucketMapper.BucketLimit(b-1)); uint64_t right_point = bucketMapper.BucketLimit(b);
double right_point = uint64_t left_sum = cumulative_sum - bucket_value;
static_cast<double>(bucketMapper.BucketLimit(b)); uint64_t right_sum = cumulative_sum;
double left_sum = sum - buckets_[b];
double right_sum = sum;
double pos = 0; double pos = 0;
double right_left_diff = right_sum - left_sum; uint64_t right_left_diff = right_sum - left_sum;
if (right_left_diff != 0) { if (right_left_diff != 0) {
pos = (threshold - left_sum) / (right_sum - left_sum); pos = (threshold - left_sum) / right_left_diff;
} }
double r = left_point + (right_point - left_point) * pos; double r = left_point + (right_point - left_point) * pos;
if (r < min_) r = min_; uint64_t cur_min = min();
if (r > max_) r = max_; uint64_t cur_max = max();
if (r < cur_min) r = static_cast<double>(cur_min);
if (r > cur_max) r = static_cast<double>(cur_max);
return r; return r;
} }
} }
return max_; return static_cast<double>(max());
} }
double HistogramImpl::Average() const { double HistogramStat::Average() const {
if (num_ == 0.0) return 0; uint64_t cur_num = num();
return sum_ / num_; uint64_t cur_sum = sum();
if (cur_num == 0) return 0;
return static_cast<double>(cur_sum) / static_cast<double>(cur_num);
} }
double HistogramImpl::StandardDeviation() const { double HistogramStat::StandardDeviation() const {
if (num_ == 0.0) return 0; uint64_t cur_num = num();
double variance = (sum_squares_ * num_ - sum_ * sum_) / (num_ * num_); uint64_t cur_sum = sum();
uint64_t cur_sum_squares = sum_squares();
if (cur_num == 0) return 0;
double variance =
static_cast<double>(cur_sum_squares * cur_num - cur_sum * cur_sum) /
static_cast<double>(cur_num * cur_num);
return sqrt(variance); return sqrt(variance);
} }
std::string HistogramStat::ToString() const {
std::string HistogramImpl::ToString() const { uint64_t cur_num = num();
std::string r; std::string r;
char buf[200]; char buf[200];
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
"Count: %.0f Average: %.4f StdDev: %.2f\n", "Count: %" PRIu64 " Average: %.4f StdDev: %.2f\n",
num_, Average(), StandardDeviation()); cur_num, Average(), StandardDeviation());
r.append(buf); r.append(buf);
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
"Min: %.4f Median: %.4f Max: %.4f\n", "Min: %" PRIu64 " Median: %.4f Max: %" PRIu64 "\n",
(num_ == 0.0 ? 0.0 : min_), Median(), max_); (cur_num == 0 ? 0 : min()), Median(), (cur_num == 0 ? 0 : max()));
r.append(buf); r.append(buf);
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
"Percentiles: " "Percentiles: "
@ -165,30 +205,30 @@ std::string HistogramImpl::ToString() const {
Percentile(99.99)); Percentile(99.99));
r.append(buf); r.append(buf);
r.append("------------------------------------------------------\n"); r.append("------------------------------------------------------\n");
const double mult = 100.0 / num_; const double mult = 100.0 / cur_num;
double sum = 0; uint64_t cumulative_sum = 0;
for (unsigned int b = 0; b < bucketMapper.BucketCount(); b++) { for (unsigned int b = 0; b < num_buckets_; b++) {
if (buckets_[b] <= 0.0) continue; uint64_t bucket_value = bucket_at(b);
sum += buckets_[b]; if (bucket_value <= 0.0) continue;
cumulative_sum += bucket_value;
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
"[ %7lu, %7lu ) %8lu %7.3f%% %7.3f%% ", "[ %7" PRIu64 ", %7" PRIu64 " ) %8" PRIu64 " %7.3f%% %7.3f%% ",
// left (b == 0) ? 0 : bucketMapper.BucketLimit(b-1), // left
(unsigned long)((b == 0) ? 0 : bucketMapper.BucketLimit(b-1)), bucketMapper.BucketLimit(b), // right
(unsigned long)bucketMapper.BucketLimit(b), // right bucket_value, // count
(unsigned long)buckets_[b], // count (mult * bucket_value), // percentage
(mult * buckets_[b]), // percentage (mult * cumulative_sum)); // cumulative percentage
(mult * sum)); // cumulative percentage
r.append(buf); r.append(buf);
// Add hash marks based on percentage; 20 marks for 100%. // Add hash marks based on percentage; 20 marks for 100%.
int marks = static_cast<int>(20*(buckets_[b] / num_) + 0.5); size_t marks = static_cast<size_t>(mult * bucket_value / 5 + 0.5);
r.append(marks, '#'); r.append(marks, '#');
r.push_back('\n'); r.push_back('\n');
} }
return r; return r;
} }
void HistogramImpl::Data(HistogramData * const data) const { void HistogramStat::Data(HistogramData * const data) const {
assert(data); assert(data);
data->median = Median(); data->median = Median();
data->percentile95 = Percentile(95); data->percentile95 = Percentile(95);
@ -197,4 +237,52 @@ void HistogramImpl::Data(HistogramData * const data) const {
data->standard_deviation = StandardDeviation(); data->standard_deviation = StandardDeviation();
} }
void HistogramImpl::Clear() {
std::lock_guard<std::mutex> lock(mutex_);
stats_.Clear();
}
bool HistogramImpl::Empty() const {
return stats_.Empty();
}
void HistogramImpl::Add(uint64_t value) {
stats_.Add(value);
}
void HistogramImpl::Merge(const Histogram& other) {
if (strcmp(Name(), other.Name()) == 0) {
Merge(dynamic_cast<const HistogramImpl&>(other));
}
}
void HistogramImpl::Merge(const HistogramImpl& other) {
std::lock_guard<std::mutex> lock(mutex_);
stats_.Merge(other.stats_);
}
double HistogramImpl::Median() const {
return stats_.Median();
}
double HistogramImpl::Percentile(double p) const {
return stats_.Percentile(p);
}
double HistogramImpl::Average() const {
return stats_.Average();
}
double HistogramImpl::StandardDeviation() const {
return stats_.StandardDeviation();
}
std::string HistogramImpl::ToString() const {
return stats_.ToString();
}
void HistogramImpl::Data(HistogramData * const data) const {
stats_.Data(data);
}
} // namespace levedb } // namespace levedb

@ -14,8 +14,7 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <map> #include <map>
#include <mutex>
#include <string.h>
namespace rocksdb { namespace rocksdb {
@ -25,7 +24,7 @@ class HistogramBucketMapper {
HistogramBucketMapper(); HistogramBucketMapper();
// converts a value to the bucket index. // converts a value to the bucket index.
size_t IndexForValue(const uint64_t value) const; size_t IndexForValue(uint64_t value) const;
// number of buckets required. // number of buckets required.
size_t BucketCount() const { size_t BucketCount() const {
@ -52,33 +51,99 @@ class HistogramBucketMapper {
std::map<uint64_t, uint64_t> valueIndexMap_; std::map<uint64_t, uint64_t> valueIndexMap_;
}; };
class HistogramImpl { struct HistogramStat {
HistogramStat();
~HistogramStat() {}
HistogramStat(const HistogramStat&) = delete;
HistogramStat& operator=(const HistogramStat&) = delete;
void Clear();
bool Empty() const;
void Add(uint64_t value);
void Merge(const HistogramStat& other);
inline uint64_t min() const { return min_.load(std::memory_order_relaxed); }
inline uint64_t max() const { return max_.load(std::memory_order_relaxed); }
inline uint64_t num() const { return num_.load(std::memory_order_relaxed); }
inline uint64_t sum() const { return sum_.load(std::memory_order_relaxed); }
inline uint64_t sum_squares() const {
return sum_squares_.load(std::memory_order_relaxed);
}
inline uint64_t bucket_at(size_t b) const {
return buckets_[b].load(std::memory_order_relaxed);
}
double Median() const;
double Percentile(double p) const;
double Average() const;
double StandardDeviation() const;
void Data(HistogramData* const data) const;
std::string ToString() const;
// To be able to use HistogramStat as thread local variable, it
// cannot have dynamic allocated member. That's why we're
// using manually values from BucketMapper
std::atomic_uint_fast64_t min_;
std::atomic_uint_fast64_t max_;
std::atomic_uint_fast64_t num_;
std::atomic_uint_fast64_t sum_;
std::atomic_uint_fast64_t sum_squares_;
std::atomic_uint_fast64_t buckets_[138]; // 138==BucketMapper::BucketCount()
const uint64_t num_buckets_;
};
class Histogram {
public:
Histogram() {}
virtual ~Histogram() {};
virtual void Clear() = 0;
virtual bool Empty() const = 0;
virtual void Add(uint64_t value) = 0;
virtual void Merge(const Histogram&) = 0;
virtual std::string ToString() const = 0;
virtual const char* Name() const = 0;
virtual uint64_t min() const = 0;
virtual uint64_t max() const = 0;
virtual uint64_t num() const = 0;
virtual double Median() const = 0;
virtual double Percentile(double p) const = 0;
virtual double Average() const = 0;
virtual double StandardDeviation() const = 0;
virtual void Data(HistogramData* const data) const = 0;
};
class HistogramImpl : public Histogram {
public: public:
HistogramImpl() { memset(buckets_, 0, sizeof(buckets_)); } HistogramImpl() { Clear(); }
virtual void Clear();
virtual bool Empty(); HistogramImpl(const HistogramImpl&) = delete;
virtual void Add(uint64_t value); HistogramImpl& operator=(const HistogramImpl&) = delete;
void Merge(const HistogramImpl& other);
virtual std::string ToString() const; virtual void Clear() override;
virtual bool Empty() const override;
virtual void Add(uint64_t value) override;
virtual void Merge(const Histogram& other) override;
void Merge(const HistogramImpl& other);
virtual double Median() const; virtual std::string ToString() const override;
virtual double Percentile(double p) const; virtual const char* Name() const override { return "HistogramImpl"; }
virtual double Average() const; virtual uint64_t min() const override { return stats_.min(); }
virtual double StandardDeviation() const; virtual uint64_t max() const override { return stats_.max(); }
virtual void Data(HistogramData * const data) const; virtual uint64_t num() const override { return stats_.num(); }
virtual double Median() const override;
virtual double Percentile(double p) const override;
virtual double Average() const override;
virtual double StandardDeviation() const override;
virtual void Data(HistogramData* const data) const override;
virtual ~HistogramImpl() {} virtual ~HistogramImpl() {}
private: private:
// To be able to use HistogramImpl as thread local variable, its constructor HistogramStat stats_;
// has to be static. That's why we're using manually values from BucketMapper std::mutex mutex_;
double min_ = 1000000000; // this is BucketMapper:LastValue()
double max_ = 0;
double num_ = 0;
double sum_ = 0;
double sum_squares_ = 0;
uint64_t buckets_[138]; // this is BucketMapper::BucketCount()
}; };
} // namespace rocksdb } // namespace rocksdb

@ -3,57 +3,205 @@
// LICENSE file in the root directory of this source tree. An additional grant // LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory. // of patent rights can be found in the PATENTS file in the same directory.
// //
#include "util/histogram.h" #include <cmath>
#include "util/histogram.h"
#include "util/histogram_windowing.h"
#include "util/testharness.h" #include "util/testharness.h"
namespace rocksdb { namespace rocksdb {
class HistogramTest : public testing::Test {}; class HistogramTest : public testing::Test {};
TEST_F(HistogramTest, BasicOperation) { namespace {
HistogramImpl histogram; const double kIota = 0.1;
for (uint64_t i = 1; i <= 100; i++) { const HistogramBucketMapper bucketMapper;
Env* env = Env::Default();
}
void PopulateHistogram(Histogram& histogram,
uint64_t low, uint64_t high, uint64_t loop = 1) {
for (; loop > 0; loop--) {
for (uint64_t i = low; i <= high; i++) {
histogram.Add(i); histogram.Add(i);
} }
{
double median = histogram.Median();
// ASSERT_LE(median, 50);
ASSERT_GT(median, 0);
} }
}
{ void BasicOperation(Histogram& histogram) {
double percentile100 = histogram.Percentile(100.0); PopulateHistogram(histogram, 1, 100, 10);
ASSERT_LE(percentile100, 100.0);
ASSERT_GT(percentile100, 0.0); HistogramData data;
double percentile99 = histogram.Percentile(99.0); histogram.Data(&data);
double percentile85 = histogram.Percentile(85.0);
ASSERT_LE(percentile99, 99.0);
ASSERT_TRUE(percentile99 >= percentile85);
}
ASSERT_EQ(histogram.Average(), 50.5); // avg is acurately calculated. ASSERT_LE(fabs(histogram.Percentile(100.0) - 100.0), kIota);
ASSERT_LE(fabs(data.percentile99 - 99.0), kIota);
ASSERT_LE(fabs(data.percentile95 - 95.0), kIota);
ASSERT_LE(fabs(data.median - 50.0), kIota);
ASSERT_EQ(data.average, 50.5); // avg is acurately calculated.
ASSERT_LT(fabs(data.standard_deviation- 28.86), kIota); //sd is ~= 28.86
} }
TEST_F(HistogramTest, EmptyHistogram) { void MergeHistogram(Histogram& histogram, Histogram& other) {
HistogramImpl histogram; PopulateHistogram(histogram, 1, 100);
PopulateHistogram(other, 101, 200);
histogram.Merge(other);
HistogramData data;
histogram.Data(&data);
ASSERT_LE(fabs(histogram.Percentile(100.0) - 200.0), kIota);
ASSERT_LE(fabs(data.percentile99 - 198.0), kIota);
ASSERT_LE(fabs(data.percentile95 - 190.0), kIota);
ASSERT_LE(fabs(data.median - 100.0), kIota);
ASSERT_EQ(data.average, 100.5); // avg is acurately calculated.
ASSERT_LT(fabs(data.standard_deviation - 57.73), kIota); //sd is ~= 57.73
}
void EmptyHistogram(Histogram& histogram) {
ASSERT_EQ(histogram.min(), bucketMapper.LastValue());
ASSERT_EQ(histogram.max(), 0);
ASSERT_EQ(histogram.num(), 0);
ASSERT_EQ(histogram.Median(), 0.0); ASSERT_EQ(histogram.Median(), 0.0);
ASSERT_EQ(histogram.Percentile(85.0), 0.0); ASSERT_EQ(histogram.Percentile(85.0), 0.0);
ASSERT_EQ(histogram.Average(), 0.0); ASSERT_EQ(histogram.Average(), 0.0);
ASSERT_EQ(histogram.StandardDeviation(), 0.0);
} }
TEST_F(HistogramTest, ClearHistogram) { void ClearHistogram(Histogram& histogram) {
HistogramImpl histogram;
for (uint64_t i = 1; i <= 100; i++) { for (uint64_t i = 1; i <= 100; i++) {
histogram.Add(i); histogram.Add(i);
} }
histogram.Clear(); histogram.Clear();
ASSERT_TRUE(histogram.Empty());
ASSERT_EQ(histogram.Median(), 0); ASSERT_EQ(histogram.Median(), 0);
ASSERT_EQ(histogram.Percentile(85.0), 0); ASSERT_EQ(histogram.Percentile(85.0), 0);
ASSERT_EQ(histogram.Average(), 0); ASSERT_EQ(histogram.Average(), 0);
} }
TEST_F(HistogramTest, BasicOperation) {
HistogramImpl histogram;
BasicOperation(histogram);
HistogramWindowingImpl histogramWindowing;
BasicOperation(histogramWindowing);
}
TEST_F(HistogramTest, MergeHistogram) {
HistogramImpl histogram;
HistogramImpl other;
MergeHistogram(histogram, other);
HistogramWindowingImpl histogramWindowing;
HistogramWindowingImpl otherWindowing;
MergeHistogram(histogramWindowing, otherWindowing);
}
TEST_F(HistogramTest, EmptyHistogram) {
HistogramImpl histogram;
EmptyHistogram(histogram);
HistogramWindowingImpl histogramWindowing;
EmptyHistogram(histogramWindowing);
}
TEST_F(HistogramTest, ClearHistogram) {
HistogramImpl histogram;
ClearHistogram(histogram);
HistogramWindowingImpl histogramWindowing;
ClearHistogram(histogramWindowing);
}
TEST_F(HistogramTest, HistogramWindowingExpire) {
uint64_t num_windows = 3;
int micros_per_window = 1000000;
uint64_t min_num_per_window = 0;
HistogramWindowingImpl
histogramWindowing(num_windows, micros_per_window, min_num_per_window);
PopulateHistogram(histogramWindowing, 1, 1, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 100);
ASSERT_EQ(histogramWindowing.min(), 1);
ASSERT_EQ(histogramWindowing.max(), 1);
ASSERT_EQ(histogramWindowing.Average(), 1);
PopulateHistogram(histogramWindowing, 2, 2, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 200);
ASSERT_EQ(histogramWindowing.min(), 1);
ASSERT_EQ(histogramWindowing.max(), 2);
ASSERT_EQ(histogramWindowing.Average(), 1.5);
PopulateHistogram(histogramWindowing, 3, 3, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 300);
ASSERT_EQ(histogramWindowing.min(), 1);
ASSERT_EQ(histogramWindowing.max(), 3);
ASSERT_EQ(histogramWindowing.Average(), 2.0);
// dropping oldest window with value 1, remaining 2 ~ 4
PopulateHistogram(histogramWindowing, 4, 4, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 300);
ASSERT_EQ(histogramWindowing.min(), 2);
ASSERT_EQ(histogramWindowing.max(), 4);
ASSERT_EQ(histogramWindowing.Average(), 3.0);
// dropping oldest window with value 2, remaining 3 ~ 5
PopulateHistogram(histogramWindowing, 5, 5, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 300);
ASSERT_EQ(histogramWindowing.min(), 3);
ASSERT_EQ(histogramWindowing.max(), 5);
ASSERT_EQ(histogramWindowing.Average(), 4.0);
}
TEST_F(HistogramTest, HistogramWindowingMerge) {
uint64_t num_windows = 3;
int micros_per_window = 1000000;
uint64_t min_num_per_window = 0;
HistogramWindowingImpl
histogramWindowing(num_windows, micros_per_window, min_num_per_window);
HistogramWindowingImpl
otherWindowing(num_windows, micros_per_window, min_num_per_window);
PopulateHistogram(histogramWindowing, 1, 1, 100);
PopulateHistogram(otherWindowing, 1, 1, 100);
env->SleepForMicroseconds(micros_per_window);
PopulateHistogram(histogramWindowing, 2, 2, 100);
PopulateHistogram(otherWindowing, 2, 2, 100);
env->SleepForMicroseconds(micros_per_window);
PopulateHistogram(histogramWindowing, 3, 3, 100);
PopulateHistogram(otherWindowing, 3, 3, 100);
env->SleepForMicroseconds(micros_per_window);
histogramWindowing.Merge(otherWindowing);
ASSERT_EQ(histogramWindowing.num(), 600);
ASSERT_EQ(histogramWindowing.min(), 1);
ASSERT_EQ(histogramWindowing.max(), 3);
ASSERT_EQ(histogramWindowing.Average(), 2.0);
// dropping oldest window with value 1, remaining 2 ~ 4
PopulateHistogram(histogramWindowing, 4, 4, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 500);
ASSERT_EQ(histogramWindowing.min(), 2);
ASSERT_EQ(histogramWindowing.max(), 4);
// dropping oldest window with value 2, remaining 3 ~ 5
PopulateHistogram(histogramWindowing, 5, 5, 100);
env->SleepForMicroseconds(micros_per_window);
ASSERT_EQ(histogramWindowing.num(), 400);
ASSERT_EQ(histogramWindowing.min(), 3);
ASSERT_EQ(histogramWindowing.max(), 5);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -0,0 +1,199 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/histogram.h"
#include "util/histogram_windowing.h"
#include <algorithm>
namespace rocksdb {
namespace {
const HistogramBucketMapper bucketMapper;
}
HistogramWindowingImpl::HistogramWindowingImpl() {
env_ = Env::Default();
window_stats_.reset(new HistogramStat[num_windows_]);
Clear();
}
HistogramWindowingImpl::HistogramWindowingImpl(
uint64_t num_windows,
uint64_t micros_per_window,
uint64_t min_num_per_window) :
num_windows_(num_windows),
micros_per_window_(micros_per_window),
min_num_per_window_(min_num_per_window) {
env_ = Env::Default();
window_stats_.reset(new HistogramStat[num_windows_]);
Clear();
}
HistogramWindowingImpl::~HistogramWindowingImpl(){
window_stats_.release();
}
void HistogramWindowingImpl::Clear() {
std::lock_guard<std::mutex> lock(mutex_);
stats_.Clear();
for (size_t i = 0; i < num_windows_; i++) {
window_stats_[i].Clear();
}
current_window_.store(0, std::memory_order_relaxed);
last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
}
bool HistogramWindowingImpl::Empty() const { return stats_.Empty(); }
// This function is designed to be lock free, as it's in the critical path
// of any operation.
// Each individual value is atomic, it is just that some samples can go
// in the older bucket which is tolerable.
void HistogramWindowingImpl::Add(uint64_t value){
TimerTick();
// Parent (global) member update
stats_.Add(value);
// Current window update
window_stats_[current_window()].Add(value);
}
void HistogramWindowingImpl::Merge(const Histogram& other) {
if (strcmp(Name(), other.Name()) == 0) {
Merge(dynamic_cast<const HistogramWindowingImpl&>(other));
}
}
void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) {
std::lock_guard<std::mutex> lock(mutex_);
stats_.Merge(other.stats_);
if (stats_.num_buckets_ != other.stats_.num_buckets_ ||
micros_per_window_ != other.micros_per_window_) {
return;
}
uint64_t cur_window = current_window();
uint64_t other_cur_window = other.current_window();
// going backwards for alignment
for (unsigned int i = 0;
i < std::min(num_windows_, other.num_windows_); i++) {
uint64_t window_index =
(cur_window + num_windows_ - i) % num_windows_;
uint64_t other_window_index =
(other_cur_window + other.num_windows_ - i) % other.num_windows_;
window_stats_[window_index].Merge(other.window_stats_[other_window_index]);
}
}
std::string HistogramWindowingImpl::ToString() const {
return stats_.ToString();
}
double HistogramWindowingImpl::Median() const {
return Percentile(50.0);
}
double HistogramWindowingImpl::Percentile(double p) const {
// Retry 3 times in total
for (int retry = 0; retry < 3; retry++) {
uint64_t start_num = stats_.num();
double result = stats_.Percentile(p);
// Detect if swap buckets or Clear() was called during calculation
if (stats_.num() >= start_num) {
return result;
}
}
return 0.0;
}
double HistogramWindowingImpl::Average() const {
return stats_.Average();
}
double HistogramWindowingImpl::StandardDeviation() const {
return stats_.StandardDeviation();
}
void HistogramWindowingImpl::Data(HistogramData * const data) const {
stats_.Data(data);
}
void HistogramWindowingImpl::TimerTick() {
uint64_t curr_time = env_->NowMicros();
if (curr_time - last_swap_time() > micros_per_window_ &&
window_stats_[current_window()].num() >= min_num_per_window_) {
SwapHistoryBucket();
}
}
void HistogramWindowingImpl::SwapHistoryBucket() {
// Threads executing Add() would be competing for this mutex, the first one
// who got the metex would take care of the bucket swap, other threads
// can skip this.
// If mutex is held by Merge() or Clear(), next Add() will take care of the
// swap, if needed.
if (mutex_.try_lock()) {
last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
uint64_t curr_window = current_window();
uint64_t next_window = (curr_window == num_windows_ - 1) ?
0 : curr_window + 1;
// subtract next buckets from totals and swap to next buckets
HistogramStat& stats_to_drop = window_stats_[next_window];
if (!stats_to_drop.Empty()) {
for (size_t b = 0; b < stats_.num_buckets_; b++){
stats_.buckets_[b].fetch_sub(
stats_to_drop.bucket_at(b), std::memory_order_relaxed);
}
if (stats_.min() == stats_to_drop.min()) {
uint64_t new_min = bucketMapper.LastValue();
for (unsigned int i = 0; i < num_windows_; i++) {
if (i != next_window) {
uint64_t m = window_stats_[i].min();
if (m < new_min) new_min = m;
}
}
stats_.min_.store(new_min, std::memory_order_relaxed);
}
if (stats_.max() == stats_to_drop.max()) {
uint64_t new_max = 0;
for (unsigned int i = 0; i < num_windows_; i++) {
if (i != next_window) {
uint64_t m = window_stats_[i].max();
if (m > new_max) new_max = m;
}
}
stats_.max_.store(new_max, std::memory_order_relaxed);
}
stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed);
stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed);
stats_.sum_squares_.fetch_sub(
stats_to_drop.sum_squares(), std::memory_order_relaxed);
stats_to_drop.Clear();
}
// advance to next window bucket
current_window_.store(next_window, std::memory_order_relaxed);
mutex_.unlock();
}
}
} // namespace rocksdb

@ -0,0 +1,80 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "util/histogram.h"
#include "rocksdb/env.h"
namespace rocksdb {
class HistogramWindowingImpl : public Histogram
{
public:
HistogramWindowingImpl();
HistogramWindowingImpl(uint64_t num_windows,
uint64_t micros_per_window,
uint64_t min_num_per_window);
HistogramWindowingImpl(const HistogramImpl&) = delete;
HistogramWindowingImpl& operator=(const HistogramImpl&) = delete;
~HistogramWindowingImpl();
virtual void Clear() override;
virtual bool Empty() const override;
virtual void Add(uint64_t value) override;
virtual void Merge(const Histogram& other) override;
void Merge(const HistogramWindowingImpl& other);
virtual std::string ToString() const override;
virtual const char* Name() const override { return "HistogramWindowingImpl"; }
virtual uint64_t min() const override { return stats_.min(); }
virtual uint64_t max() const override { return stats_.max(); }
virtual uint64_t num() const override { return stats_.num(); }
virtual double Median() const override;
virtual double Percentile(double p) const override;
virtual double Average() const override;
virtual double StandardDeviation() const override;
virtual void Data(HistogramData* const data) const override;
private:
void TimerTick();
void SwapHistoryBucket();
inline uint64_t current_window() const {
return current_window_.load(std::memory_order_relaxed);
}
inline uint64_t last_swap_time() const{
return last_swap_time_.load(std::memory_order_relaxed);
}
Env* env_;
std::mutex mutex_;
// Aggregated stats over windows_stats_, all the computation is done
// upon aggregated values
HistogramStat stats_;
// This is a circular array representing the latest N time-windows.
// Each entry stores a time-window of data. Expiration is done
// on window-based.
std::unique_ptr<HistogramStat[]> window_stats_;
std::atomic_uint_fast64_t current_window_;
std::atomic_uint_fast64_t last_swap_time_;
// Following parameters are configuable
uint64_t num_windows_ = 5;
uint64_t micros_per_window_ = 60000000;
// By default, don't care about the number of values in current window
// when decide whether to swap windows or not.
uint64_t min_num_per_window_ = 0;
};
} // namespace rocksdb
Loading…
Cancel
Save