Make RateLimiter Customizable (#9141)

Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/9141

Reviewed By: zhichao-cao

Differential Revision: D32432190

Pulled By: mrambacher

fbshipit-source-id: 7930ed88a02412128cd407b5063522484e45c6ce
main
mrambacher 3 years ago committed by Facebook GitHub Bot
parent 04b2c16f9b
commit 7cd5835a28
  1. 2
      HISTORY.md
  2. 3
      db/db_test.cc
  3. 16
      include/rocksdb/rate_limiter.h
  4. 56
      options/customizable_test.cc
  5. 6
      options/db_options.cc
  6. 185
      util/rate_limiter.cc
  7. 44
      util/rate_limiter.h
  8. 92
      util/rate_limiter_test.cc

@ -40,7 +40,7 @@
### Public API change ### Public API change
* When options.ttl is used with leveled compaction with compactinon priority kMinOverlappingRatio, files exceeding half of TTL value will be prioritized more, so that by the time TTL is reached, fewer extra compactions will be scheduled to clear them up. At the same time, when compacting files with data older than half of TTL, output files may be cut off based on those files' boundaries, in order for the early TTL compaction to work properly. * When options.ttl is used with leveled compaction with compactinon priority kMinOverlappingRatio, files exceeding half of TTL value will be prioritized more, so that by the time TTL is reached, fewer extra compactions will be scheduled to clear them up. At the same time, when compacting files with data older than half of TTL, output files may be cut off based on those files' boundaries, in order for the early TTL compaction to work properly.
* Made FileSystem extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method. * Made FileSystem and RateLimiter extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method.
* Clarified in API comments that RocksDB is not exception safe for callbacks and custom extensions. An exception propagating into RocksDB can lead to undefined behavior, including data loss, unreported corruption, deadlocks, and more. * Clarified in API comments that RocksDB is not exception safe for callbacks and custom extensions. An exception propagating into RocksDB can lead to undefined behavior, including data loss, unreported corruption, deadlocks, and more.
* Marked `WriteBufferManager` as `final` because it is not intended for extension. * Marked `WriteBufferManager` as `final` because it is not intended for extension.
* Removed unimportant implementation details from table_properties.h * Removed unimportant implementation details from table_properties.h

@ -3981,6 +3981,9 @@ class MockedRateLimiterWithNoOptionalAPIImpl : public RateLimiter {
~MockedRateLimiterWithNoOptionalAPIImpl() override {} ~MockedRateLimiterWithNoOptionalAPIImpl() override {}
const char* Name() const override {
return "MockedRateLimiterWithNoOptionalAPI";
}
void SetBytesPerSecond(int64_t bytes_per_second) override { void SetBytesPerSecond(int64_t bytes_per_second) override {
(void)bytes_per_second; (void)bytes_per_second;
} }

@ -9,6 +9,7 @@
#pragma once #pragma once
#include "rocksdb/customizable.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
@ -18,7 +19,7 @@ namespace ROCKSDB_NAMESPACE {
// Exceptions MUST NOT propagate out of overridden functions into RocksDB, // Exceptions MUST NOT propagate out of overridden functions into RocksDB,
// because RocksDB is not exception-safe. This could cause undefined behavior // because RocksDB is not exception-safe. This could cause undefined behavior
// including data loss, unreported corruption, deadlocks, and more. // including data loss, unreported corruption, deadlocks, and more.
class RateLimiter { class RateLimiter : public Customizable {
public: public:
enum class OpType { enum class OpType {
// Limitation: we currently only invoke Request() with OpType::kRead for // Limitation: we currently only invoke Request() with OpType::kRead for
@ -32,11 +33,20 @@ class RateLimiter {
kAllIo, kAllIo,
}; };
static const char* Type() { return "RateLimiter"; }
static Status CreateFromString(const ConfigOptions& options,
const std::string& value,
std::shared_ptr<RateLimiter>* result);
// For API compatibility, default to rate-limiting writes only. // For API compatibility, default to rate-limiting writes only.
explicit RateLimiter(Mode mode = Mode::kWritesOnly) : mode_(mode) {} explicit RateLimiter(Mode mode = Mode::kWritesOnly);
virtual ~RateLimiter() {} virtual ~RateLimiter() {}
// Deprecated. Will be removed in a major release. Derived classes
// should implement this method.
virtual const char* Name() const override { return ""; }
// This API allows user to dynamically change rate limiter's bytes per second. // This API allows user to dynamically change rate limiter's bytes per second.
// REQUIRED: bytes_per_second > 0 // REQUIRED: bytes_per_second > 0
virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0; virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0;
@ -124,7 +134,7 @@ class RateLimiter {
Mode GetMode() { return mode_; } Mode GetMode() { return mode_; }
private: private:
const Mode mode_; Mode mode_;
}; };
// Create a RateLimiter object, which can be shared among RocksDB instances to // Create a RateLimiter object, which can be shared among RocksDB instances to

@ -22,6 +22,7 @@
#include "rocksdb/env_encryption.h" #include "rocksdb/env_encryption.h"
#include "rocksdb/file_checksum.h" #include "rocksdb/file_checksum.h"
#include "rocksdb/flush_block_policy.h" #include "rocksdb/flush_block_policy.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/secondary_cache.h" #include "rocksdb/secondary_cache.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/sst_partitioner.h" #include "rocksdb/sst_partitioner.h"
@ -35,6 +36,7 @@
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/file_checksum_helper.h" #include "util/file_checksum_helper.h"
#include "util/rate_limiter.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h" #include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h"
@ -1391,6 +1393,21 @@ class MockFileChecksumGenFactory : public FileChecksumGenFactory {
} }
}; };
class MockRateLimiter : public RateLimiter {
public:
static const char* kClassName() { return "MockRateLimiter"; }
const char* Name() const override { return kClassName(); }
void SetBytesPerSecond(int64_t /*bytes_per_second*/) override {}
int64_t GetBytesPerSecond() const override { return 0; }
int64_t GetSingleBurstBytes() const override { return 0; }
int64_t GetTotalBytesThrough(const Env::IOPriority /*pri*/) const override {
return 0;
}
int64_t GetTotalRequests(const Env::IOPriority /*pri*/) const override {
return 0;
}
};
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
static int RegisterLocalObjects(ObjectLibrary& library, static int RegisterLocalObjects(ObjectLibrary& library,
const std::string& /*arg*/) { const std::string& /*arg*/) {
@ -1497,6 +1514,15 @@ static int RegisterLocalObjects(ObjectLibrary& library,
guard->reset(new MockTablePropertiesCollectorFactory()); guard->reset(new MockTablePropertiesCollectorFactory());
return guard->get(); return guard->get();
}); });
library.Register<RateLimiter>(
MockRateLimiter::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<RateLimiter>* guard,
std::string* /* errmsg */) {
guard->reset(new MockRateLimiter());
return guard->get();
});
return static_cast<int>(library.GetFactoryCount(&num_types)); return static_cast<int>(library.GetFactoryCount(&num_types));
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
@ -1895,6 +1921,36 @@ TEST_F(LoadCustomizableTest, LoadSystemClockTest) {
} }
} }
TEST_F(LoadCustomizableTest, LoadRateLimiterTest) {
std::shared_ptr<RateLimiter> result;
ASSERT_NOK(RateLimiter::CreateFromString(
config_options_, MockRateLimiter::kClassName(), &result));
ASSERT_OK(RateLimiter::CreateFromString(
config_options_, std::string(GenericRateLimiter::kClassName()) + ":1234",
&result));
ASSERT_NE(result, nullptr);
#ifndef ROCKSDB_LITE
ASSERT_OK(RateLimiter::CreateFromString(
config_options_, GenericRateLimiter::kClassName(), &result));
ASSERT_NE(result, nullptr);
ASSERT_OK(GetDBOptionsFromString(
config_options_, db_opts_,
std::string("rate_limiter=") + GenericRateLimiter::kClassName(),
&db_opts_));
ASSERT_NE(db_opts_.rate_limiter, nullptr);
if (RegisterTests("Test")) {
ASSERT_OK(RateLimiter::CreateFromString(
config_options_, MockRateLimiter::kClassName(), &result));
ASSERT_NE(result, nullptr);
ASSERT_OK(GetDBOptionsFromString(
config_options_, db_opts_,
std::string("rate_limiter=") + MockRateLimiter::kClassName(),
&db_opts_));
ASSERT_NE(db_opts_.rate_limiter, nullptr);
}
#endif // ROCKSDB_LITE
}
TEST_F(LoadCustomizableTest, LoadFlushBlockPolicyFactoryTest) { TEST_F(LoadCustomizableTest, LoadFlushBlockPolicyFactoryTest) {
std::shared_ptr<TableFactory> table; std::shared_ptr<TableFactory> table;
std::shared_ptr<FlushBlockPolicyFactory> result; std::shared_ptr<FlushBlockPolicyFactory> result;

@ -418,6 +418,12 @@ static std::unordered_map<std::string, OptionTypeInfo>
{"db_host_id", {"db_host_id",
{offsetof(struct ImmutableDBOptions, db_host_id), OptionType::kString, {offsetof(struct ImmutableDBOptions, db_host_id), OptionType::kString,
OptionVerificationType::kNormal, OptionTypeFlags::kCompareNever}}, OptionVerificationType::kNormal, OptionTypeFlags::kCompareNever}},
{"rate_limiter",
OptionTypeInfo::AsCustomSharedPtr<RateLimiter>(
offsetof(struct ImmutableDBOptions, rate_limiter),
OptionVerificationType::kNormal,
OptionTypeFlags::kCompareNever | OptionTypeFlags::kAllowNull)},
// The following properties were handled as special cases in ParseOption // The following properties were handled as special cases in ParseOption
// This means that the properties could be read from the options file // This means that the properties could be read from the options file
// but never written to the file or compared to each other. // but never written to the file or compared to each other.

@ -13,12 +13,16 @@
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/convenience.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "rocksdb/utilities/customizable_util.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/aligned_buffer.h" #include "util/aligned_buffer.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
size_t RateLimiter::RequestToken(size_t bytes, size_t alignment, size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
Env::IOPriority io_priority, Statistics* stats, Env::IOPriority io_priority, Statistics* stats,
RateLimiter::OpType op_type) { RateLimiter::OpType op_type) {
@ -46,34 +50,69 @@ struct GenericRateLimiter::Req {
bool granted; bool granted;
}; };
static std::unordered_map<std::string, OptionTypeInfo>
generic_rate_limiter_type_info = {
#ifndef ROCKSDB_LITE
{"rate_bytes_per_sec",
{offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
max_bytes_per_sec),
OptionType::kInt64T}},
{"refill_period_us",
{offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
refill_period_us),
OptionType::kInt64T}},
{"fairness",
{offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
fairness),
OptionType::kInt32T}},
{"auto_tuned",
{offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
auto_tuned),
OptionType::kBoolean}},
{"clock",
OptionTypeInfo::AsCustomSharedPtr<SystemClock>(
offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
clock),
OptionVerificationType::kByNameAllowFromNull,
OptionTypeFlags::kAllowNull)},
#endif // ROCKSDB_LITE
};
GenericRateLimiter::GenericRateLimiter( GenericRateLimiter::GenericRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness, int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness,
RateLimiter::Mode mode, const std::shared_ptr<SystemClock>& clock, RateLimiter::Mode mode, const std::shared_ptr<SystemClock>& clock,
bool auto_tuned) bool auto_tuned)
: RateLimiter(mode), : RateLimiter(mode),
refill_period_us_(refill_period_us), options_(rate_bytes_per_sec, refill_period_us, fairness, clock,
rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2 auto_tuned),
: rate_bytes_per_sec),
refill_bytes_per_period_(
CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)),
clock_(clock),
stop_(false), stop_(false),
exit_cv_(&request_mutex_), exit_cv_(&request_mutex_),
requests_to_wait_(0), requests_to_wait_(0),
available_bytes_(0), available_bytes_(0),
next_refill_us_(NowMicrosMonotonic()),
fairness_(fairness > 100 ? 100 : fairness),
rnd_((uint32_t)time(nullptr)), rnd_((uint32_t)time(nullptr)),
wait_until_refill_pending_(false), wait_until_refill_pending_(false),
auto_tuned_(auto_tuned),
num_drains_(0), num_drains_(0),
prev_num_drains_(0), prev_num_drains_(0) {
max_bytes_per_sec_(rate_bytes_per_sec), RegisterOptions(&options_, &generic_rate_limiter_type_info);
tuned_time_(NowMicrosMonotonic()) {
for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
total_requests_[i] = 0; total_requests_[i] = 0;
total_bytes_through_[i] = 0; total_bytes_through_[i] = 0;
} }
Initialize();
}
void GenericRateLimiter::Initialize() {
if (options_.clock == nullptr) {
options_.clock = SystemClock::Default();
}
options_.fairness = std::min(options_.fairness, 100);
next_refill_us_ = NowMicrosMonotonic();
tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic());
if (options_.auto_tuned) {
rate_bytes_per_sec_ = options_.max_bytes_per_sec / 2;
} else {
rate_bytes_per_sec_ = options_.max_bytes_per_sec;
}
refill_bytes_per_period_ = CalculateRefillBytesPerPeriod(rate_bytes_per_sec_);
} }
GenericRateLimiter::~GenericRateLimiter() { GenericRateLimiter::~GenericRateLimiter() {
@ -97,6 +136,18 @@ GenericRateLimiter::~GenericRateLimiter() {
} }
} }
Status GenericRateLimiter::PrepareOptions(const ConfigOptions& options) {
if (options_.fairness <= 0) {
return Status::InvalidArgument("Fairness must be > 0");
} else if (options_.max_bytes_per_sec <= 0) {
return Status::InvalidArgument("max_bytes_per_sec must be > 0");
} else if (options_.refill_period_us <= 0) {
return Status::InvalidArgument("Refill_period_us must be > 0");
}
Initialize();
return RateLimiter::PrepareOptions(options);
}
// This API allows user to dynamically change rate limiter's bytes per second. // This API allows user to dynamically change rate limiter's bytes per second.
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
assert(bytes_per_second > 0); assert(bytes_per_second > 0);
@ -115,11 +166,11 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
&rate_bytes_per_sec_); &rate_bytes_per_sec_);
MutexLock g(&request_mutex_); MutexLock g(&request_mutex_);
if (auto_tuned_) { if (options_.auto_tuned) {
static const int kRefillsPerTune = 100; static const int kRefillsPerTune = 100;
std::chrono::microseconds now(NowMicrosMonotonic()); std::chrono::microseconds now(NowMicrosMonotonic());
if (now - tuned_time_ >= if (now - tuned_time_ >= kRefillsPerTune * std::chrono::microseconds(
kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) { options_.refill_period_us)) {
Status s = Tune(); Status s = Tune();
s.PermitUncheckedError(); //**TODO: What to do on error? s.PermitUncheckedError(); //**TODO: What to do on error?
} }
@ -163,7 +214,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
} else { } else {
// Whichever thread reaches here first performs duty (1) as described // Whichever thread reaches here first performs duty (1) as described
// above. // above.
int64_t wait_until = clock_->NowMicros() + time_until_refill_us; int64_t wait_until = options_.clock->NowMicros() + time_until_refill_us;
RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
++num_drains_; ++num_drains_;
wait_until_refill_pending_ = true; wait_until_refill_pending_ = true;
@ -223,12 +274,12 @@ GenericRateLimiter::GeneratePriorityIterationOrder() {
// first // first
pri_iteration_order[0] = Env::IO_USER; pri_iteration_order[0] = Env::IO_USER;
bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_); bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(options_.fairness);
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrder::"
"PostRandomOneInFairnessForHighPri", "PostRandomOneInFairnessForHighPri",
&high_pri_iterated_after_mid_low_pri); &high_pri_iterated_after_mid_low_pri);
bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_); bool mid_pri_itereated_after_low_pri = rnd_.OneIn(options_.fairness);
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrder::"
"PostRandomOneInFairnessForMidPri", "PostRandomOneInFairnessForMidPri",
@ -257,7 +308,7 @@ GenericRateLimiter::GeneratePriorityIterationOrder() {
void GenericRateLimiter::RefillBytesAndGrantRequests() { void GenericRateLimiter::RefillBytesAndGrantRequests() {
TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests"); TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests");
next_refill_us_ = NowMicrosMonotonic() + refill_period_us_; next_refill_us_ = NowMicrosMonotonic() + options_.refill_period_us;
// Carry over the left over quota from the last period // Carry over the left over quota from the last period
auto refill_bytes_per_period = auto refill_bytes_per_period =
refill_bytes_per_period_.load(std::memory_order_relaxed); refill_bytes_per_period_.load(std::memory_order_relaxed);
@ -297,12 +348,12 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() {
int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
int64_t rate_bytes_per_sec) { int64_t rate_bytes_per_sec) {
if (port::kMaxInt64 / rate_bytes_per_sec < refill_period_us_) { if (port::kMaxInt64 / rate_bytes_per_sec < options_.refill_period_us) {
// Avoid unexpected result in the overflow case. The result now is still // Avoid unexpected result in the overflow case. The result now is still
// inaccurate but is a number that is large enough. // inaccurate but is a number that is large enough.
return port::kMaxInt64 / 1000000; return port::kMaxInt64 / 1000000;
} else { } else {
return rate_bytes_per_sec * refill_period_us_ / 1000000; return rate_bytes_per_sec * options_.refill_period_us / 1000000;
} }
} }
@ -317,10 +368,11 @@ Status GenericRateLimiter::Tune() {
std::chrono::microseconds prev_tuned_time = tuned_time_; std::chrono::microseconds prev_tuned_time = tuned_time_;
tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic()); tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic());
int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time + int64_t elapsed_intervals =
std::chrono::microseconds(refill_period_us_) - (tuned_time_ - prev_tuned_time +
std::chrono::microseconds(options_.refill_period_us) -
std::chrono::microseconds(1)) / std::chrono::microseconds(1)) /
std::chrono::microseconds(refill_period_us_); std::chrono::microseconds(options_.refill_period_us);
// We tune every kRefillsPerTune intervals, so the overflow and division-by- // We tune every kRefillsPerTune intervals, so the overflow and division-by-
// zero conditions should never happen. // zero conditions should never happen.
assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100); assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100);
@ -331,20 +383,20 @@ Status GenericRateLimiter::Tune() {
int64_t prev_bytes_per_sec = GetBytesPerSecond(); int64_t prev_bytes_per_sec = GetBytesPerSecond();
int64_t new_bytes_per_sec; int64_t new_bytes_per_sec;
if (drained_pct == 0) { if (drained_pct == 0) {
new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor; new_bytes_per_sec = options_.max_bytes_per_sec / kAllowedRangeFactor;
} else if (drained_pct < kLowWatermarkPct) { } else if (drained_pct < kLowWatermarkPct) {
// sanitize to prevent overflow // sanitize to prevent overflow
int64_t sanitized_prev_bytes_per_sec = int64_t sanitized_prev_bytes_per_sec =
std::min(prev_bytes_per_sec, port::kMaxInt64 / 100); std::min(prev_bytes_per_sec, port::kMaxInt64 / 100);
new_bytes_per_sec = new_bytes_per_sec =
std::max(max_bytes_per_sec_ / kAllowedRangeFactor, std::max(options_.max_bytes_per_sec / kAllowedRangeFactor,
sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct)); sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
} else if (drained_pct > kHighWatermarkPct) { } else if (drained_pct > kHighWatermarkPct) {
// sanitize to prevent overflow // sanitize to prevent overflow
int64_t sanitized_prev_bytes_per_sec = std::min( int64_t sanitized_prev_bytes_per_sec = std::min(
prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct)); prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct));
new_bytes_per_sec = new_bytes_per_sec =
std::min(max_bytes_per_sec_, std::min(options_.max_bytes_per_sec,
sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100); sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
} else { } else {
new_bytes_per_sec = prev_bytes_per_sec; new_bytes_per_sec = prev_bytes_per_sec;
@ -364,8 +416,81 @@ RateLimiter* NewGenericRateLimiter(
assert(rate_bytes_per_sec > 0); assert(rate_bytes_per_sec > 0);
assert(refill_period_us > 0); assert(refill_period_us > 0);
assert(fairness > 0); assert(fairness > 0);
return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness, std::unique_ptr<RateLimiter> limiter(
mode, SystemClock::Default(), auto_tuned); new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
mode, SystemClock::Default(), auto_tuned));
Status s = limiter->PrepareOptions(ConfigOptions());
if (s.ok()) {
return limiter.release();
} else {
assert(false);
return nullptr;
}
}
namespace {
#ifndef ROCKSDB_LITE
static int RegisterBuiltinRateLimiters(ObjectLibrary& library,
const std::string& /*arg*/) {
library.Register<RateLimiter>(
GenericRateLimiter::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<RateLimiter>* guard,
std::string* /*errmsg*/) {
guard->reset(new GenericRateLimiter(port::kMaxInt64));
return guard->get();
});
size_t num_types;
return static_cast<int>(library.GetFactoryCount(&num_types));
}
static std::unordered_map<std::string, RateLimiter::Mode>
rate_limiter_mode_map = {
{"kReadsOnly", RateLimiter::Mode::kReadsOnly},
{"kWritesOnly", RateLimiter::Mode::kWritesOnly},
{"kAllIo", RateLimiter::Mode::kAllIo},
};
#endif // ROCKSDB_LITE
static bool LoadRateLimiter(const std::string& name,
std::shared_ptr<RateLimiter>* limiter) {
auto plen = strlen(GenericRateLimiter::kClassName());
if (name.size() > plen + 2 && name[plen] == ':' &&
StartsWith(name, GenericRateLimiter::kClassName())) {
auto rate = ParseInt64(name.substr(plen + 1));
limiter->reset(new GenericRateLimiter(rate));
return true;
} else {
return false;
}
}
static std::unordered_map<std::string, OptionTypeInfo> rate_limiter_type_info =
{
#ifndef ROCKSDB_LITE
{"mode",
OptionTypeInfo::Enum<RateLimiter::Mode>(0, &rate_limiter_mode_map)},
#endif // ROCKSDB_LITE
};
} // namespace
RateLimiter::RateLimiter(Mode mode) : mode_(mode) {
RegisterOptions("", &mode_, &rate_limiter_type_info);
}
Status RateLimiter::CreateFromString(const ConfigOptions& config_options,
const std::string& value,
std::shared_ptr<RateLimiter>* result) {
if (value.empty()) {
result->reset();
return Status::OK();
} else {
#ifndef ROCKSDB_LITE
static std::once_flag once;
std::call_once(once, [&]() {
RegisterBuiltinRateLimiters(*(ObjectLibrary::Default().get()), "");
});
#endif // ROCKSDB_LITE
return LoadSharedObject<RateLimiter>(config_options, value, LoadRateLimiter,
result);
}
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -26,13 +26,38 @@ namespace ROCKSDB_NAMESPACE {
class GenericRateLimiter : public RateLimiter { class GenericRateLimiter : public RateLimiter {
public: public:
GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us, struct GenericRateLimiterOptions {
int32_t fairness, RateLimiter::Mode mode, static const char* kName() { return "GenericRateLimiterOptions"; }
const std::shared_ptr<SystemClock>& clock, GenericRateLimiterOptions(int64_t _rate_bytes_per_sec,
bool auto_tuned); int64_t _refill_period_us, int32_t _fairness,
const std::shared_ptr<SystemClock>& _clock,
bool _auto_tuned)
: max_bytes_per_sec(_rate_bytes_per_sec),
refill_period_us(_refill_period_us),
clock(_clock),
fairness(_fairness > 100 ? 100 : _fairness),
auto_tuned(_auto_tuned) {}
int64_t max_bytes_per_sec;
int64_t refill_period_us;
std::shared_ptr<SystemClock> clock;
int32_t fairness;
bool auto_tuned;
};
public:
explicit GenericRateLimiter(
int64_t refill_bytes, int64_t refill_period_us = 100 * 1000,
int32_t fairness = 10,
RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly,
const std::shared_ptr<SystemClock>& clock = nullptr,
bool auto_tuned = false);
virtual ~GenericRateLimiter(); virtual ~GenericRateLimiter();
static const char* kClassName() { return "GenericRateLimiter"; }
const char* Name() const override { return kClassName(); }
Status PrepareOptions(const ConfigOptions& options) override;
// This API allows user to dynamically change rate limiter's bytes per second. // This API allows user to dynamically change rate limiter's bytes per second.
virtual void SetBytesPerSecond(int64_t bytes_per_second) override; virtual void SetBytesPerSecond(int64_t bytes_per_second) override;
@ -96,22 +121,24 @@ class GenericRateLimiter : public RateLimiter {
} }
private: private:
void Initialize();
void RefillBytesAndGrantRequests(); void RefillBytesAndGrantRequests();
std::vector<Env::IOPriority> GeneratePriorityIterationOrder(); std::vector<Env::IOPriority> GeneratePriorityIterationOrder();
int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
Status Tune(); Status Tune();
uint64_t NowMicrosMonotonic() { return clock_->NowNanos() / std::milli::den; } uint64_t NowMicrosMonotonic() {
return options_.clock->NowNanos() / std::milli::den;
}
// This mutex guard all internal states // This mutex guard all internal states
mutable port::Mutex request_mutex_; mutable port::Mutex request_mutex_;
const int64_t refill_period_us_; GenericRateLimiterOptions options_;
int64_t rate_bytes_per_sec_; int64_t rate_bytes_per_sec_;
// This variable can be changed dynamically. // This variable can be changed dynamically.
std::atomic<int64_t> refill_bytes_per_period_; std::atomic<int64_t> refill_bytes_per_period_;
std::shared_ptr<SystemClock> clock_;
bool stop_; bool stop_;
port::CondVar exit_cv_; port::CondVar exit_cv_;
@ -122,17 +149,14 @@ class GenericRateLimiter : public RateLimiter {
int64_t available_bytes_; int64_t available_bytes_;
int64_t next_refill_us_; int64_t next_refill_us_;
int32_t fairness_;
Random rnd_; Random rnd_;
struct Req; struct Req;
std::deque<Req*> queue_[Env::IO_TOTAL]; std::deque<Req*> queue_[Env::IO_TOTAL];
bool wait_until_refill_pending_; bool wait_until_refill_pending_;
bool auto_tuned_;
int64_t num_drains_; int64_t num_drains_;
int64_t prev_num_drains_; int64_t prev_num_drains_;
const int64_t max_bytes_per_sec_;
std::chrono::microseconds tuned_time_; std::chrono::microseconds tuned_time_;
}; };

@ -15,8 +15,11 @@
#include <limits> #include <limits>
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "options/options_parser.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/convenience.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "rocksdb/utilities/options_type.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "util/random.h" #include "util/random.h"
@ -463,6 +466,95 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec); ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec);
} }
TEST_F(RateLimiterTest, CreateGenericRateLimiterFromString) {
std::shared_ptr<RateLimiter> limiter;
ConfigOptions config_options;
std::string limiter_id = GenericRateLimiter::kClassName();
ASSERT_OK(RateLimiter::CreateFromString(config_options, limiter_id + ":1024",
&limiter));
ASSERT_NE(limiter, nullptr);
ASSERT_EQ(limiter->GetBytesPerSecond(), 1024U);
#ifndef ROCKSDB_LITE
ASSERT_OK(RateLimiter::CreateFromString(
config_options, "rate_bytes_per_sec=2048;id=" + limiter_id, &limiter));
ASSERT_NE(limiter, nullptr);
ASSERT_EQ(limiter->GetBytesPerSecond(), 2048U);
ASSERT_NOK(RateLimiter::CreateFromString(
config_options, "rate_bytes_per_sec=0;id=" + limiter_id, &limiter));
ASSERT_NOK(RateLimiter::CreateFromString(
config_options, "rate_bytes_per_sec=2048;fairness=0;id=" + limiter_id,
&limiter));
ASSERT_OK(
RateLimiter::CreateFromString(config_options,
"rate_bytes_per_sec=2048;refill_period_us="
"1024;fairness=42;auto_tuned=true;"
"mode=kReadsOnly;id=" +
limiter_id,
&limiter));
ASSERT_NE(limiter, nullptr);
auto opts =
limiter->GetOptions<GenericRateLimiter::GenericRateLimiterOptions>();
ASSERT_NE(opts, nullptr);
ASSERT_EQ(opts->max_bytes_per_sec, 2048);
ASSERT_EQ(opts->refill_period_us, 1024);
ASSERT_EQ(opts->fairness, 42);
ASSERT_EQ(opts->auto_tuned, true);
ASSERT_TRUE(limiter->IsRateLimited(RateLimiter::OpType::kRead));
ASSERT_FALSE(limiter->IsRateLimited(RateLimiter::OpType::kWrite));
#endif // ROCKSDB_LITE
}
#ifndef ROCKSDB_LITE
// This test is for a rate limiter that has no name (Name() returns "").
// When the default Name() method is deprecated, this test should be removed.
TEST_F(RateLimiterTest, NoNameRateLimiter) {
static std::unordered_map<std::string, OptionTypeInfo> dummy_limiter_options =
{
{"dummy",
{0, OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
class NoNameRateLimiter : public RateLimiter {
public:
explicit NoNameRateLimiter(bool do_register) {
if (do_register) {
RegisterOptions("", &dummy, &dummy_limiter_options);
}
}
void SetBytesPerSecond(int64_t /*bytes_per_second*/) override {}
int64_t GetSingleBurstBytes() const override { return 0; }
int64_t GetTotalBytesThrough(const Env::IOPriority /*pri*/) const override {
return 0;
}
int64_t GetTotalRequests(const Env::IOPriority /*pri*/) const override {
return 0;
}
int64_t GetBytesPerSecond() const override { return 0; }
private:
int dummy;
};
ConfigOptions config_options;
DBOptions db_opts, copy;
db_opts.rate_limiter.reset(new NoNameRateLimiter(false));
ASSERT_EQ(db_opts.rate_limiter->GetId(), "");
ASSERT_EQ(db_opts.rate_limiter->ToString(config_options), "");
db_opts.rate_limiter.reset(new NoNameRateLimiter(true));
ASSERT_EQ(db_opts.rate_limiter->GetId(), "");
ASSERT_EQ(db_opts.rate_limiter->ToString(config_options), "");
std::string opt_str;
ASSERT_OK(GetStringFromDBOptions(config_options, db_opts, &opt_str));
ASSERT_OK(
GetDBOptionsFromString(config_options, DBOptions(), opt_str, &copy));
ASSERT_OK(
RocksDBOptionsParser::VerifyDBOptions(config_options, db_opts, copy));
ASSERT_EQ(copy.rate_limiter, nullptr);
ASSERT_NE(copy.rate_limiter, db_opts.rate_limiter);
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save