expose RateLimiter definition

Summary:
User gets undefinied error since the definition is not exposed.
Also re-enable the db test with only upper bound check

Test Plan: db_test, rate_limit_test

Reviewers: igor, yhchiang, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D20403
main
Lei Jin 11 years ago
parent 28b367db15
commit d650612c4c
  1. 11
      db/db_test.cc
  2. 24
      include/rocksdb/options.h
  3. 60
      include/rocksdb/rate_limiter.h
  4. 17
      util/rate_limiter.cc
  5. 34
      util/rate_limiter.h
  6. 4
      util/rate_limiter_test.cc

@ -7442,7 +7442,7 @@ TEST(DBTest, MTRandomTimeoutTest) {
/* /*
* This test is not reliable enough as it heavily depends on disk behavior. * This test is not reliable enough as it heavily depends on disk behavior.
* */
TEST(DBTest, RateLimitingTest) { TEST(DBTest, RateLimitingTest) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.write_buffer_size = 1 << 20; // 1MB options.write_buffer_size = 1 << 20; // 1MB
@ -7473,7 +7473,7 @@ TEST(DBTest, RateLimitingTest) {
// # rate limiting with 0.7 x threshold // # rate limiting with 0.7 x threshold
options.rate_limiter.reset( options.rate_limiter.reset(
NewRateLimiter(static_cast<int64_t>(0.7 * raw_rate))); NewGenericRateLimiter(static_cast<int64_t>(0.7 * raw_rate)));
env_->bytes_written_ = 0; env_->bytes_written_ = 0;
DestroyAndReopen(&options); DestroyAndReopen(&options);
@ -7489,11 +7489,11 @@ TEST(DBTest, RateLimitingTest) {
env_->bytes_written_); env_->bytes_written_);
double ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate; double ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
fprintf(stderr, "write rate ratio = %.2lf, expected 0.7\n", ratio); fprintf(stderr, "write rate ratio = %.2lf, expected 0.7\n", ratio);
ASSERT_TRUE(ratio > 0.6 && ratio < 0.8); ASSERT_TRUE(ratio < 0.8);
// # rate limiting with half of the raw_rate // # rate limiting with half of the raw_rate
options.rate_limiter.reset( options.rate_limiter.reset(
NewRateLimiter(static_cast<int64_t>(raw_rate / 2))); NewGenericRateLimiter(static_cast<int64_t>(raw_rate / 2)));
env_->bytes_written_ = 0; env_->bytes_written_ = 0;
DestroyAndReopen(&options); DestroyAndReopen(&options);
@ -7509,9 +7509,8 @@ TEST(DBTest, RateLimitingTest) {
env_->bytes_written_); env_->bytes_written_);
ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate; ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
fprintf(stderr, "write rate ratio = %.2lf, expected 0.5\n", ratio); fprintf(stderr, "write rate ratio = %.2lf, expected 0.5\n", ratio);
ASSERT_TRUE(ratio > 0.4 && ratio < 0.6); ASSERT_TRUE(ratio < 0.6);
} }
*/
} // namespace rocksdb } // namespace rocksdb

@ -39,7 +39,6 @@ class Slice;
class SliceTransform; class SliceTransform;
class Statistics; class Statistics;
class InternalKeyComparator; class InternalKeyComparator;
class RateLimiter;
// DB contents are stored in a set of blocks, each of which holds a // DB contents are stored in a set of blocks, each of which holds a
// sequence of key,value pairs. Each block may be compressed before // sequence of key,value pairs. Each block may be compressed before
@ -1027,29 +1026,6 @@ struct FlushOptions {
FlushOptions() : wait(true) {} FlushOptions() : wait(true) {}
}; };
// Create a RateLimiter object, which can be shared among RocksDB instances to
// control write rate of flush and compaction.
// @rate_bytes_per_sec: this is the only parameter you want to set most of the
// time. It controls the total write rate of compaction and flush in bytes per
// second. Currently, RocksDB does not enforce rate limit for anything other
// than flush and compaction, e.g. write to WAL.
// @refill_period_us: this controls how often tokens are refilled. For example,
// when rate_bytes_per_sec is set to 10MB/s and refill_period_us is set to
// 100ms, then 1MB is refilled every 100ms internally. Larger value can lead to
// burstier writes while smaller value introduces more CPU overhead.
// The default should work for most cases.
// @fairness: RateLimiter accepts high-pri requests and low-pri requests.
// A low-pri request is usually blocked in favor of hi-pri request. Currently,
// RocksDB assigns low-pri to request from compaciton and high-pri to request
// from flush. Low-pri requests can get blocked if flush requests come in
// continuouly. This fairness parameter grants low-pri requests permission by
// 1/fairness chance even though high-pri requests exist to avoid starvation.
// You should be good by leaving it at default 10.
extern RateLimiter* NewRateLimiter(
int64_t rate_bytes_per_sec,
int64_t refill_period_us = 100 * 1000,
int32_t fairness = 10);
// Get options based on some guidelines. Now only tune parameter based on // Get options based on some guidelines. Now only tune parameter based on
// flush/compaction and fill default parameters for other parameters. // flush/compaction and fill default parameters for other parameters.
// total_write_buffer_limit: budget for memory spent for mem tables // total_write_buffer_limit: budget for memory spent for mem tables

@ -0,0 +1,60 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/env.h"
namespace rocksdb {
class RateLimiter {
public:
virtual ~RateLimiter() {}
// Request for token to write bytes. If this request can not be satisfied,
// the call is blocked. Caller is responsible to make sure
// bytes < GetSingleBurstBytes()
virtual void Request(const int64_t bytes, const Env::IOPriority pri) = 0;
// Max bytes can be granted in a single burst
virtual int64_t GetSingleBurstBytes() const = 0;
// Total bytes that go though rate limiter
virtual int64_t GetTotalBytesThrough(
const Env::IOPriority pri = Env::IO_TOTAL) const = 0;
// Total # of requests that go though rate limiter
virtual int64_t GetTotalRequests(
const Env::IOPriority pri = Env::IO_TOTAL) const = 0;
};
// Create a RateLimiter object, which can be shared among RocksDB instances to
// control write rate of flush and compaction.
// @rate_bytes_per_sec: this is the only parameter you want to set most of the
// time. It controls the total write rate of compaction and flush in bytes per
// second. Currently, RocksDB does not enforce rate limit for anything other
// than flush and compaction, e.g. write to WAL.
// @refill_period_us: this controls how often tokens are refilled. For example,
// when rate_bytes_per_sec is set to 10MB/s and refill_period_us is set to
// 100ms, then 1MB is refilled every 100ms internally. Larger value can lead to
// burstier writes while smaller value introduces more CPU overhead.
// The default should work for most cases.
// @fairness: RateLimiter accepts high-pri requests and low-pri requests.
// A low-pri request is usually blocked in favor of hi-pri request. Currently,
// RocksDB assigns low-pri to request from compaciton and high-pri to request
// from flush. Low-pri requests can get blocked if flush requests come in
// continuouly. This fairness parameter grants low-pri requests permission by
// 1/fairness chance even though high-pri requests exist to avoid starvation.
// You should be good by leaving it at default 10.
extern RateLimiter* NewGenericRateLimiter(
int64_t rate_bytes_per_sec,
int64_t refill_period_us = 100 * 1000,
int32_t fairness = 10);
} // namespace rocksdb

@ -14,7 +14,7 @@ namespace rocksdb {
// Pending request // Pending request
struct RateLimiter::Req { struct GenericRateLimiter::Req {
explicit Req(int64_t bytes, port::Mutex* mu) : explicit Req(int64_t bytes, port::Mutex* mu) :
bytes(bytes), cv(mu), granted(false) {} bytes(bytes), cv(mu), granted(false) {}
int64_t bytes; int64_t bytes;
@ -23,7 +23,9 @@ struct RateLimiter::Req {
}; };
RateLimiter::RateLimiter(int64_t rate_bytes_per_sec, int64_t refill_period_us, GenericRateLimiter::GenericRateLimiter(
int64_t rate_bytes_per_sec,
int64_t refill_period_us,
int32_t fairness) int32_t fairness)
: refill_period_us_(refill_period_us), : refill_period_us_(refill_period_us),
refill_bytes_per_period_(rate_bytes_per_sec * refill_period_us / 1000000.0), refill_bytes_per_period_(rate_bytes_per_sec * refill_period_us / 1000000.0),
@ -42,7 +44,7 @@ RateLimiter::RateLimiter(int64_t rate_bytes_per_sec, int64_t refill_period_us,
total_bytes_through_[1] = 0; total_bytes_through_[1] = 0;
} }
RateLimiter::~RateLimiter() { GenericRateLimiter::~GenericRateLimiter() {
MutexLock g(&request_mutex_); MutexLock g(&request_mutex_);
stop_ = true; stop_ = true;
requests_to_wait_ = queue_[Env::IO_LOW].size() + queue_[Env::IO_HIGH].size(); requests_to_wait_ = queue_[Env::IO_LOW].size() + queue_[Env::IO_HIGH].size();
@ -57,7 +59,7 @@ RateLimiter::~RateLimiter() {
} }
} }
void RateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
assert(bytes < refill_bytes_per_period_); assert(bytes < refill_bytes_per_period_);
MutexLock g(&request_mutex_); MutexLock g(&request_mutex_);
@ -163,7 +165,7 @@ void RateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
} while (!r.granted); } while (!r.granted);
} }
void RateLimiter::Refill() { void GenericRateLimiter::Refill() {
next_refill_us_ = env_->NowMicros() + refill_period_us_; next_refill_us_ = env_->NowMicros() + refill_period_us_;
// Carry over the left over quota from the last period // Carry over the left over quota from the last period
if (available_bytes_ < refill_bytes_per_period_) { if (available_bytes_ < refill_bytes_per_period_) {
@ -192,9 +194,10 @@ void RateLimiter::Refill() {
} }
} }
RateLimiter* NewRateLimiter( RateLimiter* NewGenericRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) { int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) {
return new RateLimiter(rate_bytes_per_sec, refill_period_us, fairness); return new GenericRateLimiter(
rate_bytes_per_sec, refill_period_us, fairness);
} }
} // namespace rocksdb } // namespace rocksdb

@ -9,29 +9,34 @@
#pragma once #pragma once
#include <thread>
#include <deque> #include <deque>
#include "port/port_posix.h" #include "port/port_posix.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/random.h" #include "util/random.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/rate_limiter.h"
namespace rocksdb { namespace rocksdb {
class RateLimiter { class GenericRateLimiter : public RateLimiter {
public: public:
RateLimiter(int64_t refill_bytes, int64_t refill_period_us, int32_t fairness); GenericRateLimiter(int64_t refill_bytes,
int64_t refill_period_us, int32_t fairness);
~RateLimiter(); virtual ~GenericRateLimiter();
// Request for token to write bytes. If this request can not be satisfied, // Request for token to write bytes. If this request can not be satisfied,
// the call is blocked. Caller is responsible to make sure // the call is blocked. Caller is responsible to make sure
// bytes < GetSingleBurstBytes() // bytes < GetSingleBurstBytes()
void Request(const int64_t bytes, const Env::IOPriority pri); virtual void Request(const int64_t bytes, const Env::IOPriority pri) override;
virtual int64_t GetSingleBurstBytes() const override {
// const var
return refill_bytes_per_period_;
}
int64_t GetTotalBytesThrough( virtual int64_t GetTotalBytesThrough(
const Env::IOPriority pri = Env::IO_TOTAL) const { const Env::IOPriority pri = Env::IO_TOTAL) const override {
MutexLock g(&request_mutex_); MutexLock g(&request_mutex_);
if (pri == Env::IO_TOTAL) { if (pri == Env::IO_TOTAL) {
return total_bytes_through_[Env::IO_LOW] + return total_bytes_through_[Env::IO_LOW] +
@ -40,7 +45,8 @@ class RateLimiter {
return total_bytes_through_[pri]; return total_bytes_through_[pri];
} }
int64_t GetTotalRequests(const Env::IOPriority pri = Env::IO_TOTAL) const { virtual int64_t GetTotalRequests(
const Env::IOPriority pri = Env::IO_TOTAL) const override {
MutexLock g(&request_mutex_); MutexLock g(&request_mutex_);
if (pri == Env::IO_TOTAL) { if (pri == Env::IO_TOTAL) {
return total_requests_[Env::IO_LOW] + total_requests_[Env::IO_HIGH]; return total_requests_[Env::IO_LOW] + total_requests_[Env::IO_HIGH];
@ -48,16 +54,6 @@ class RateLimiter {
return total_requests_[pri]; return total_requests_[pri];
} }
int64_t GetSingleBurstBytes() const {
// const var
return refill_bytes_per_period_;
}
int64_t GetAvailableBytes() const {
MutexLock g(&request_mutex_);
return available_bytes_;
}
private: private:
void Refill(); void Refill();

@ -21,14 +21,14 @@ class RateLimiterTest {
}; };
TEST(RateLimiterTest, StartStop) { TEST(RateLimiterTest, StartStop) {
std::unique_ptr<RateLimiter> limiter(new RateLimiter(100, 100, 10)); std::unique_ptr<RateLimiter> limiter(new GenericRateLimiter(100, 100, 10));
} }
TEST(RateLimiterTest, Rate) { TEST(RateLimiterTest, Rate) {
auto* env = Env::Default(); auto* env = Env::Default();
struct Arg { struct Arg {
Arg(int64_t target_rate, int burst) Arg(int64_t target_rate, int burst)
: limiter(new RateLimiter(target_rate, 100 * 1000, 10)), : limiter(new GenericRateLimiter(target_rate, 100 * 1000, 10)),
request_size(target_rate / 10), request_size(target_rate / 10),
burst(burst) {} burst(burst) {}
std::unique_ptr<RateLimiter> limiter; std::unique_ptr<RateLimiter> limiter;

Loading…
Cancel
Save