Enable dynamic changing of rate limiter's bytes_per_second

Summary: This feature is going to be useful for mongodb+rocksdb. I'll expose it through mongo's API.

Test Plan: added new unit test. also will run TSAN on the new unit test

Reviewers: meyering, sdong

Reviewed By: meyering, sdong

Subscribers: meyering, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D35307
main
Igor Canadi 10 years ago
parent 652db51a31
commit 51301b869f
  1. 4
      include/rocksdb/rate_limiter.h
  2. 54
      util/rate_limiter.cc
  3. 13
      util/rate_limiter.h
  4. 41
      util/rate_limiter_test.cc

@ -17,6 +17,10 @@ class RateLimiter {
public:
virtual ~RateLimiter() {}
// This API allows user to dynamically change rate limiter's bytes per second.
// REQUIRED: bytes_per_second > 0
virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0;
// Request for token to write bytes. If this request can not be satisfied,
// the call is blocked. Caller is responsible to make sure
// bytes < GetSingleBurstBytes()

@ -22,24 +22,23 @@ struct GenericRateLimiter::Req {
bool granted;
};
GenericRateLimiter::GenericRateLimiter(
int64_t rate_bytes_per_sec,
int64_t refill_period_us,
int32_t fairness)
: refill_period_us_(refill_period_us),
refill_bytes_per_period_(rate_bytes_per_sec * refill_period_us / 1000000.0),
env_(Env::Default()),
stop_(false),
exit_cv_(&request_mutex_),
requests_to_wait_(0),
total_requests_{0, 0},
total_bytes_through_{0, 0},
available_bytes_(0),
next_refill_us_(env_->NowMicros()),
fairness_(fairness > 100 ? 100 : fairness),
rnd_((uint32_t)time(nullptr)),
leader_(nullptr) {
GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
int64_t refill_period_us,
int32_t fairness)
: refill_period_us_(refill_period_us),
refill_bytes_per_period_(
CalculateRefillBytesPerPeriod(rate_bytes_per_sec)),
env_(Env::Default()),
stop_(false),
exit_cv_(&request_mutex_),
requests_to_wait_(0),
total_requests_{0, 0},
total_bytes_through_{0, 0},
available_bytes_(0),
next_refill_us_(env_->NowMicros()),
fairness_(fairness > 100 ? 100 : fairness),
rnd_((uint32_t)time(nullptr)),
leader_(nullptr) {
total_bytes_through_[0] = 0;
total_bytes_through_[1] = 0;
}
@ -60,8 +59,16 @@ GenericRateLimiter::~GenericRateLimiter() {
}
}
// This API allows user to dynamically change rate limiter's bytes per second.
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
assert(bytes_per_second > 0);
refill_bytes_per_period_.store(
CalculateRefillBytesPerPeriod(bytes_per_second),
std::memory_order_relaxed);
}
void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
assert(bytes <= refill_bytes_per_period_);
assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));
MutexLock g(&request_mutex_);
if (stop_) {
@ -169,8 +176,10 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
void GenericRateLimiter::Refill() {
next_refill_us_ = env_->NowMicros() + refill_period_us_;
// Carry over the left over quota from the last period
if (available_bytes_ < refill_bytes_per_period_) {
available_bytes_ += refill_bytes_per_period_;
auto refill_bytes_per_period =
refill_bytes_per_period_.load(std::memory_order_relaxed);
if (available_bytes_ < refill_bytes_per_period) {
available_bytes_ += refill_bytes_per_period;
}
int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1;
@ -197,6 +206,9 @@ void GenericRateLimiter::Refill() {
RateLimiter* NewGenericRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) {
assert(rate_bytes_per_sec > 0);
assert(refill_period_us > 0);
assert(fairness > 0);
return new GenericRateLimiter(
rate_bytes_per_sec, refill_period_us, fairness);
}

@ -9,6 +9,7 @@
#pragma once
#include <atomic>
#include <deque>
#include "port/port_posix.h"
#include "util/mutexlock.h"
@ -25,14 +26,16 @@ class GenericRateLimiter : public RateLimiter {
virtual ~GenericRateLimiter();
// This API allows user to dynamically change rate limiter's bytes per second.
virtual void SetBytesPerSecond(int64_t bytes_per_second) override;
// Request for token to write bytes. If this request can not be satisfied,
// the call is blocked. Caller is responsible to make sure
// bytes < GetSingleBurstBytes()
virtual void Request(const int64_t bytes, const Env::IOPriority pri) override;
virtual int64_t GetSingleBurstBytes() const override {
// const var
return refill_bytes_per_period_;
return refill_bytes_per_period_.load(std::memory_order_relaxed);
}
virtual int64_t GetTotalBytesThrough(
@ -56,12 +59,16 @@ class GenericRateLimiter : public RateLimiter {
private:
void Refill();
int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec) {
return rate_bytes_per_sec * refill_period_us_ / 1000000.0;
}
// This mutex guard all internal states
mutable port::Mutex request_mutex_;
const int64_t refill_period_us_;
const int64_t refill_bytes_per_period_;
// This variable can be changed dynamically.
std::atomic<int64_t> refill_bytes_per_period_;
Env* const env_;
bool stop_;

@ -54,25 +54,36 @@ TEST_F(RateLimiterTest, Rate) {
}
};
for (int i = 1; i <= 16; i*=2) {
for (int i = 1; i <= 16; i *= 2) {
int32_t target = i * 1024 * 10;
Arg arg(target, i / 4 + 1);
auto start = env->NowMicros();
for (int t = 0; t < i; ++t) {
env->StartThread(writer, &arg);
}
env->WaitForJoin();
int64_t old_total_bytes_through = 0;
for (int iter = 1; iter <= 2; ++iter) {
// second iteration changes the target dynamically
if (iter == 2) {
target *= 2;
arg.limiter->SetBytesPerSecond(target);
}
auto start = env->NowMicros();
for (int t = 0; t < i; ++t) {
env->StartThread(writer, &arg);
}
env->WaitForJoin();
auto elapsed = env->NowMicros() - start;
double rate = arg.limiter->GetTotalBytesThrough()
* 1000000.0 / elapsed;
fprintf(stderr, "request size [1 - %" PRIi32 "], limit %" PRIi32
" KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n",
arg.request_size - 1, target / 1024, rate / 1024,
elapsed / 1000000.0);
auto elapsed = env->NowMicros() - start;
double rate =
(arg.limiter->GetTotalBytesThrough() - old_total_bytes_through) *
1000000.0 / elapsed;
old_total_bytes_through = arg.limiter->GetTotalBytesThrough();
fprintf(stderr,
"request size [1 - %" PRIi32 "], limit %" PRIi32
" KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n",
arg.request_size - 1, target / 1024, rate / 1024,
elapsed / 1000000.0);
ASSERT_GE(rate / target, 0.95);
ASSERT_LE(rate / target, 1.05);
ASSERT_GE(rate / target, 0.9);
ASSERT_LE(rate / target, 1.1);
}
}
}

Loading…
Cancel
Save