Make RateLimiter not Customizable (#10378)

Summary:
(PR created for informational/testing purposes only.)

- Fixes lost dynamic updates to GenericRateLimiter bandwidth using `SetBytesPerSecond()`
- Benefit over #10374 is eliminating race conditions with Configurable framework.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10378

Reviewed By: pdillinger

Differential Revision: D37914865

fbshipit-source-id: d4f566d60ec9726d26932388c61671adf0ee0f30
main
Andrew Kryczka 2 years ago committed by Facebook GitHub Bot
parent d9deffba57
commit 25cc564ff7
  1. 5
      HISTORY.md
  2. 3
      db/db_test.cc
  3. 16
      include/rocksdb/rate_limiter.h
  4. 56
      options/customizable_test.cc
  5. 9
      options/db_options.cc
  6. 179
      util/rate_limiter.cc
  7. 50
      util/rate_limiter.h
  8. 92
      util/rate_limiter_test.cc
  9. 49
      utilities/backup/backup_engine.cc

@ -1,5 +1,10 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## Unreleased
### Public API changes
* Removed Customizable support for RateLimiter and removed its CreateFromString() and Type() functions.
### Bug Fixes
* Fix a bug where `GenericRateLimiter` could revert the bandwidth set dynamically using `SetBytesPerSecond()` when a user configures a structure enclosing it, e.g., using `GetOptionsFromString()` to configure an `Options` that references an existing `RateLimiter` object.
## 7.5.0 (07/15/2022) ## 7.5.0 (07/15/2022)
### New Features ### New Features

@ -4101,9 +4101,6 @@ class MockedRateLimiterWithNoOptionalAPIImpl : public RateLimiter {
~MockedRateLimiterWithNoOptionalAPIImpl() override {} ~MockedRateLimiterWithNoOptionalAPIImpl() override {}
const char* Name() const override {
return "MockedRateLimiterWithNoOptionalAPI";
}
void SetBytesPerSecond(int64_t bytes_per_second) override { void SetBytesPerSecond(int64_t bytes_per_second) override {
(void)bytes_per_second; (void)bytes_per_second;
} }

@ -9,7 +9,6 @@
#pragma once #pragma once
#include "rocksdb/customizable.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
@ -19,7 +18,7 @@ namespace ROCKSDB_NAMESPACE {
// Exceptions MUST NOT propagate out of overridden functions into RocksDB, // Exceptions MUST NOT propagate out of overridden functions into RocksDB,
// because RocksDB is not exception-safe. This could cause undefined behavior // because RocksDB is not exception-safe. This could cause undefined behavior
// including data loss, unreported corruption, deadlocks, and more. // including data loss, unreported corruption, deadlocks, and more.
class RateLimiter : public Customizable { class RateLimiter {
public: public:
enum class OpType { enum class OpType {
kRead, kRead,
@ -32,20 +31,11 @@ class RateLimiter : public Customizable {
kAllIo, kAllIo,
}; };
static const char* Type() { return "RateLimiter"; }
static Status CreateFromString(const ConfigOptions& options,
const std::string& value,
std::shared_ptr<RateLimiter>* result);
// For API compatibility, default to rate-limiting writes only. // For API compatibility, default to rate-limiting writes only.
explicit RateLimiter(Mode mode = Mode::kWritesOnly); explicit RateLimiter(Mode mode = Mode::kWritesOnly) : mode_(mode) {}
virtual ~RateLimiter() {} virtual ~RateLimiter() {}
// Deprecated. Will be removed in a major release. Derived classes
// should implement this method.
virtual const char* Name() const override { return ""; }
// This API allows user to dynamically change rate limiter's bytes per second. // This API allows user to dynamically change rate limiter's bytes per second.
// REQUIRED: bytes_per_second > 0 // REQUIRED: bytes_per_second > 0
virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0; virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0;
@ -135,7 +125,7 @@ class RateLimiter : public Customizable {
Mode GetMode() { return mode_; } Mode GetMode() { return mode_; }
private: private:
Mode mode_; const Mode mode_;
}; };
// Create a RateLimiter object, which can be shared among RocksDB instances to // Create a RateLimiter object, which can be shared among RocksDB instances to

@ -27,7 +27,6 @@
#include "rocksdb/filter_policy.h" #include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h" #include "rocksdb/flush_block_policy.h"
#include "rocksdb/memory_allocator.h" #include "rocksdb/memory_allocator.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/secondary_cache.h" #include "rocksdb/secondary_cache.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/sst_partitioner.h" #include "rocksdb/sst_partitioner.h"
@ -42,7 +41,6 @@
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/file_checksum_helper.h" #include "util/file_checksum_helper.h"
#include "util/rate_limiter.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h" #include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h"
#include "utilities/memory_allocators.h" #include "utilities/memory_allocators.h"
@ -1472,21 +1470,6 @@ class MockFileChecksumGenFactory : public FileChecksumGenFactory {
} }
}; };
class MockRateLimiter : public RateLimiter {
public:
static const char* kClassName() { return "MockRateLimiter"; }
const char* Name() const override { return kClassName(); }
void SetBytesPerSecond(int64_t /*bytes_per_second*/) override {}
int64_t GetBytesPerSecond() const override { return 0; }
int64_t GetSingleBurstBytes() const override { return 0; }
int64_t GetTotalBytesThrough(const Env::IOPriority /*pri*/) const override {
return 0;
}
int64_t GetTotalRequests(const Env::IOPriority /*pri*/) const override {
return 0;
}
};
class MockFilterPolicy : public FilterPolicy { class MockFilterPolicy : public FilterPolicy {
public: public:
static const char* kClassName() { return "MockFilterPolicy"; } static const char* kClassName() { return "MockFilterPolicy"; }
@ -1618,14 +1601,6 @@ static int RegisterLocalObjects(ObjectLibrary& library,
return guard->get(); return guard->get();
}); });
library.AddFactory<RateLimiter>(
MockRateLimiter::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<RateLimiter>* guard,
std::string* /* errmsg */) {
guard->reset(new MockRateLimiter());
return guard->get();
});
library.AddFactory<const FilterPolicy>( library.AddFactory<const FilterPolicy>(
MockFilterPolicy::kClassName(), MockFilterPolicy::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<const FilterPolicy>* guard, [](const std::string& /*uri*/, std::unique_ptr<const FilterPolicy>* guard,
@ -2149,37 +2124,6 @@ TEST_F(LoadCustomizableTest, LoadMemoryAllocatorTest) {
} }
} }
TEST_F(LoadCustomizableTest, LoadRateLimiterTest) {
#ifndef ROCKSDB_LITE
ASSERT_OK(TestSharedBuiltins<RateLimiter>(MockRateLimiter::kClassName(),
GenericRateLimiter::kClassName()));
#else
ASSERT_OK(TestSharedBuiltins<RateLimiter>(MockRateLimiter::kClassName(), ""));
#endif // ROCKSDB_LITE
std::shared_ptr<RateLimiter> result;
ASSERT_OK(RateLimiter::CreateFromString(
config_options_, std::string(GenericRateLimiter::kClassName()) + ":1234",
&result));
ASSERT_NE(result, nullptr);
ASSERT_TRUE(result->IsInstanceOf(GenericRateLimiter::kClassName()));
#ifndef ROCKSDB_LITE
ASSERT_OK(GetDBOptionsFromString(
config_options_, db_opts_,
std::string("rate_limiter=") + GenericRateLimiter::kClassName(),
&db_opts_));
ASSERT_NE(db_opts_.rate_limiter, nullptr);
if (RegisterTests("Test")) {
ExpectCreateShared<RateLimiter>(MockRateLimiter::kClassName());
ASSERT_OK(GetDBOptionsFromString(
config_options_, db_opts_,
std::string("rate_limiter=") + MockRateLimiter::kClassName(),
&db_opts_));
ASSERT_NE(db_opts_.rate_limiter, nullptr);
}
#endif // ROCKSDB_LITE
}
TEST_F(LoadCustomizableTest, LoadFilterPolicyTest) { TEST_F(LoadCustomizableTest, LoadFilterPolicyTest) {
const std::string kAutoBloom = BloomFilterPolicy::kClassName(); const std::string kAutoBloom = BloomFilterPolicy::kClassName();
const std::string kAutoRibbon = RibbonFilterPolicy::kClassName(); const std::string kAutoRibbon = RibbonFilterPolicy::kClassName();

@ -421,12 +421,11 @@ static std::unordered_map<std::string, OptionTypeInfo>
{"db_host_id", {"db_host_id",
{offsetof(struct ImmutableDBOptions, db_host_id), OptionType::kString, {offsetof(struct ImmutableDBOptions, db_host_id), OptionType::kString,
OptionVerificationType::kNormal, OptionTypeFlags::kCompareNever}}, OptionVerificationType::kNormal, OptionTypeFlags::kCompareNever}},
// Temporarily deprecated due to race conditions (examples in PR 10375).
{"rate_limiter", {"rate_limiter",
OptionTypeInfo::AsCustomSharedPtr<RateLimiter>( {offsetof(struct ImmutableDBOptions, rate_limiter),
offsetof(struct ImmutableDBOptions, rate_limiter), OptionType::kUnknown, OptionVerificationType::kDeprecated,
OptionVerificationType::kNormal, OptionTypeFlags::kDontSerialize | OptionTypeFlags::kCompareNever}},
OptionTypeFlags::kCompareNever | OptionTypeFlags::kAllowNull)},
// The following properties were handled as special cases in ParseOption // The following properties were handled as special cases in ParseOption
// This means that the properties could be read from the options file // This means that the properties could be read from the options file
// but never written to the file or compared to each other. // but never written to the file or compared to each other.

@ -13,14 +13,9 @@
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/convenience.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "rocksdb/utilities/customizable_util.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/aligned_buffer.h" #include "util/aligned_buffer.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
size_t RateLimiter::RequestToken(size_t bytes, size_t alignment, size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
@ -50,68 +45,33 @@ struct GenericRateLimiter::Req {
bool granted; bool granted;
}; };
static std::unordered_map<std::string, OptionTypeInfo>
generic_rate_limiter_type_info = {
#ifndef ROCKSDB_LITE
{"rate_bytes_per_sec",
{offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
max_bytes_per_sec),
OptionType::kInt64T}},
{"refill_period_us",
{offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
refill_period_us),
OptionType::kInt64T}},
{"fairness",
{offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
fairness),
OptionType::kInt32T}},
{"auto_tuned",
{offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
auto_tuned),
OptionType::kBoolean}},
{"clock",
OptionTypeInfo::AsCustomSharedPtr<SystemClock>(
offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
clock),
OptionVerificationType::kByNameAllowFromNull,
OptionTypeFlags::kAllowNull)},
#endif // ROCKSDB_LITE
};
GenericRateLimiter::GenericRateLimiter( GenericRateLimiter::GenericRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness, int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness,
RateLimiter::Mode mode, const std::shared_ptr<SystemClock>& clock, RateLimiter::Mode mode, const std::shared_ptr<SystemClock>& clock,
bool auto_tuned) bool auto_tuned)
: RateLimiter(mode), : RateLimiter(mode),
options_(rate_bytes_per_sec, refill_period_us, fairness, clock, refill_period_us_(refill_period_us),
auto_tuned), rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
: rate_bytes_per_sec),
refill_bytes_per_period_(
CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)),
clock_(clock),
stop_(false), stop_(false),
exit_cv_(&request_mutex_), exit_cv_(&request_mutex_),
requests_to_wait_(0), requests_to_wait_(0),
available_bytes_(0), available_bytes_(0),
next_refill_us_(NowMicrosMonotonic()),
fairness_(fairness > 100 ? 100 : fairness),
rnd_((uint32_t)time(nullptr)), rnd_((uint32_t)time(nullptr)),
wait_until_refill_pending_(false), wait_until_refill_pending_(false),
num_drains_(0) { auto_tuned_(auto_tuned),
RegisterOptions(&options_, &generic_rate_limiter_type_info); num_drains_(0),
max_bytes_per_sec_(rate_bytes_per_sec),
tuned_time_(NowMicrosMonotonic()) {
for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
total_requests_[i] = 0; total_requests_[i] = 0;
total_bytes_through_[i] = 0; total_bytes_through_[i] = 0;
} }
Initialize();
}
void GenericRateLimiter::Initialize() {
if (options_.clock == nullptr) {
options_.clock = SystemClock::Default();
}
options_.fairness = std::min(options_.fairness, 100);
next_refill_us_ = NowMicrosMonotonic();
tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic());
if (options_.auto_tuned) {
rate_bytes_per_sec_ = options_.max_bytes_per_sec / 2;
} else {
rate_bytes_per_sec_ = options_.max_bytes_per_sec;
}
refill_bytes_per_period_ = CalculateRefillBytesPerPeriod(rate_bytes_per_sec_);
} }
GenericRateLimiter::~GenericRateLimiter() { GenericRateLimiter::~GenericRateLimiter() {
@ -135,18 +95,6 @@ GenericRateLimiter::~GenericRateLimiter() {
} }
} }
Status GenericRateLimiter::PrepareOptions(const ConfigOptions& options) {
if (options_.fairness <= 0) {
return Status::InvalidArgument("Fairness must be > 0");
} else if (options_.max_bytes_per_sec <= 0) {
return Status::InvalidArgument("max_bytes_per_sec must be > 0");
} else if (options_.refill_period_us <= 0) {
return Status::InvalidArgument("Refill_period_us must be > 0");
}
Initialize();
return RateLimiter::PrepareOptions(options);
}
// This API allows user to dynamically change rate limiter's bytes per second. // This API allows user to dynamically change rate limiter's bytes per second.
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
assert(bytes_per_second > 0); assert(bytes_per_second > 0);
@ -165,11 +113,11 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
&rate_bytes_per_sec_); &rate_bytes_per_sec_);
MutexLock g(&request_mutex_); MutexLock g(&request_mutex_);
if (options_.auto_tuned) { if (auto_tuned_) {
static const int kRefillsPerTune = 100; static const int kRefillsPerTune = 100;
std::chrono::microseconds now(NowMicrosMonotonic()); std::chrono::microseconds now(NowMicrosMonotonic());
if (now - tuned_time_ >= kRefillsPerTune * std::chrono::microseconds( if (now - tuned_time_ >=
options_.refill_period_us)) { kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
Status s = Tune(); Status s = Tune();
s.PermitUncheckedError(); //**TODO: What to do on error? s.PermitUncheckedError(); //**TODO: What to do on error?
} }
@ -213,7 +161,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
} else { } else {
// Whichever thread reaches here first performs duty (1) as described // Whichever thread reaches here first performs duty (1) as described
// above. // above.
int64_t wait_until = options_.clock->NowMicros() + time_until_refill_us; int64_t wait_until = clock_->NowMicros() + time_until_refill_us;
RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
++num_drains_; ++num_drains_;
wait_until_refill_pending_ = true; wait_until_refill_pending_ = true;
@ -273,12 +221,12 @@ GenericRateLimiter::GeneratePriorityIterationOrder() {
// first // first
pri_iteration_order[0] = Env::IO_USER; pri_iteration_order[0] = Env::IO_USER;
bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(options_.fairness); bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_);
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrder::"
"PostRandomOneInFairnessForHighPri", "PostRandomOneInFairnessForHighPri",
&high_pri_iterated_after_mid_low_pri); &high_pri_iterated_after_mid_low_pri);
bool mid_pri_itereated_after_low_pri = rnd_.OneIn(options_.fairness); bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_);
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrder::"
"PostRandomOneInFairnessForMidPri", "PostRandomOneInFairnessForMidPri",
@ -307,7 +255,7 @@ GenericRateLimiter::GeneratePriorityIterationOrder() {
void GenericRateLimiter::RefillBytesAndGrantRequests() { void GenericRateLimiter::RefillBytesAndGrantRequests() {
TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests"); TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests");
next_refill_us_ = NowMicrosMonotonic() + options_.refill_period_us; next_refill_us_ = NowMicrosMonotonic() + refill_period_us_;
// Carry over the left over quota from the last period // Carry over the left over quota from the last period
auto refill_bytes_per_period = auto refill_bytes_per_period =
refill_bytes_per_period_.load(std::memory_order_relaxed); refill_bytes_per_period_.load(std::memory_order_relaxed);
@ -348,12 +296,12 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() {
int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
int64_t rate_bytes_per_sec) { int64_t rate_bytes_per_sec) {
if (std::numeric_limits<int64_t>::max() / rate_bytes_per_sec < if (std::numeric_limits<int64_t>::max() / rate_bytes_per_sec <
options_.refill_period_us) { refill_period_us_) {
// Avoid unexpected result in the overflow case. The result now is still // Avoid unexpected result in the overflow case. The result now is still
// inaccurate but is a number that is large enough. // inaccurate but is a number that is large enough.
return std::numeric_limits<int64_t>::max() / 1000000; return std::numeric_limits<int64_t>::max() / 1000000;
} else { } else {
return rate_bytes_per_sec * options_.refill_period_us / 1000000; return rate_bytes_per_sec * refill_period_us_ / 1000000;
} }
} }
@ -368,11 +316,10 @@ Status GenericRateLimiter::Tune() {
std::chrono::microseconds prev_tuned_time = tuned_time_; std::chrono::microseconds prev_tuned_time = tuned_time_;
tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic()); tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic());
int64_t elapsed_intervals = int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
(tuned_time_ - prev_tuned_time + std::chrono::microseconds(refill_period_us_) -
std::chrono::microseconds(options_.refill_period_us) -
std::chrono::microseconds(1)) / std::chrono::microseconds(1)) /
std::chrono::microseconds(options_.refill_period_us); std::chrono::microseconds(refill_period_us_);
// We tune every kRefillsPerTune intervals, so the overflow and division-by- // We tune every kRefillsPerTune intervals, so the overflow and division-by-
// zero conditions should never happen. // zero conditions should never happen.
assert(num_drains_ <= std::numeric_limits<int64_t>::max() / 100); assert(num_drains_ <= std::numeric_limits<int64_t>::max() / 100);
@ -382,13 +329,13 @@ Status GenericRateLimiter::Tune() {
int64_t prev_bytes_per_sec = GetBytesPerSecond(); int64_t prev_bytes_per_sec = GetBytesPerSecond();
int64_t new_bytes_per_sec; int64_t new_bytes_per_sec;
if (drained_pct == 0) { if (drained_pct == 0) {
new_bytes_per_sec = options_.max_bytes_per_sec / kAllowedRangeFactor; new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor;
} else if (drained_pct < kLowWatermarkPct) { } else if (drained_pct < kLowWatermarkPct) {
// sanitize to prevent overflow // sanitize to prevent overflow
int64_t sanitized_prev_bytes_per_sec = int64_t sanitized_prev_bytes_per_sec =
std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() / 100); std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() / 100);
new_bytes_per_sec = new_bytes_per_sec =
std::max(options_.max_bytes_per_sec / kAllowedRangeFactor, std::max(max_bytes_per_sec_ / kAllowedRangeFactor,
sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct)); sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
} else if (drained_pct > kHighWatermarkPct) { } else if (drained_pct > kHighWatermarkPct) {
// sanitize to prevent overflow // sanitize to prevent overflow
@ -396,7 +343,7 @@ Status GenericRateLimiter::Tune() {
std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() / std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() /
(100 + kAdjustFactorPct)); (100 + kAdjustFactorPct));
new_bytes_per_sec = new_bytes_per_sec =
std::min(options_.max_bytes_per_sec, std::min(max_bytes_per_sec_,
sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100); sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
} else { } else {
new_bytes_per_sec = prev_bytes_per_sec; new_bytes_per_sec = prev_bytes_per_sec;
@ -419,79 +366,7 @@ RateLimiter* NewGenericRateLimiter(
std::unique_ptr<RateLimiter> limiter( std::unique_ptr<RateLimiter> limiter(
new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness, new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
mode, SystemClock::Default(), auto_tuned)); mode, SystemClock::Default(), auto_tuned));
Status s = limiter->PrepareOptions(ConfigOptions());
if (s.ok()) {
return limiter.release(); return limiter.release();
} else {
assert(false);
return nullptr;
}
}
namespace {
#ifndef ROCKSDB_LITE
static int RegisterBuiltinRateLimiters(ObjectLibrary& library,
const std::string& /*arg*/) {
library.AddFactory<RateLimiter>(
GenericRateLimiter::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<RateLimiter>* guard,
std::string* /*errmsg*/) {
guard->reset(
new GenericRateLimiter(std::numeric_limits<int64_t>::max()));
return guard->get();
});
size_t num_types;
return static_cast<int>(library.GetFactoryCount(&num_types));
}
static std::unordered_map<std::string, RateLimiter::Mode>
rate_limiter_mode_map = {
{"kReadsOnly", RateLimiter::Mode::kReadsOnly},
{"kWritesOnly", RateLimiter::Mode::kWritesOnly},
{"kAllIo", RateLimiter::Mode::kAllIo},
};
#endif // ROCKSDB_LITE
static bool LoadRateLimiter(const std::string& name,
std::shared_ptr<RateLimiter>* limiter) {
auto plen = strlen(GenericRateLimiter::kClassName());
if (name.size() > plen + 2 && name[plen] == ':' &&
StartsWith(name, GenericRateLimiter::kClassName())) {
auto rate = ParseInt64(name.substr(plen + 1));
limiter->reset(new GenericRateLimiter(rate));
return true;
} else {
return false;
}
}
static std::unordered_map<std::string, OptionTypeInfo> rate_limiter_type_info =
{
#ifndef ROCKSDB_LITE
{"mode",
OptionTypeInfo::Enum<RateLimiter::Mode>(0, &rate_limiter_mode_map)},
#endif // ROCKSDB_LITE
};
} // namespace
RateLimiter::RateLimiter(Mode mode) : mode_(mode) {
RegisterOptions("", &mode_, &rate_limiter_type_info);
}
Status RateLimiter::CreateFromString(const ConfigOptions& config_options,
const std::string& value,
std::shared_ptr<RateLimiter>* result) {
if (value.empty()) {
result->reset();
return Status::OK();
} else {
#ifndef ROCKSDB_LITE
static std::once_flag once;
std::call_once(once, [&]() {
RegisterBuiltinRateLimiters(*(ObjectLibrary::Default().get()), "");
});
#endif // ROCKSDB_LITE
return LoadSharedObject<RateLimiter>(config_options, value, LoadRateLimiter,
result);
}
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -26,38 +26,13 @@ namespace ROCKSDB_NAMESPACE {
class GenericRateLimiter : public RateLimiter { class GenericRateLimiter : public RateLimiter {
public: public:
struct GenericRateLimiterOptions { GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us,
static const char* kName() { return "GenericRateLimiterOptions"; } int32_t fairness, RateLimiter::Mode mode,
GenericRateLimiterOptions(int64_t _rate_bytes_per_sec, const std::shared_ptr<SystemClock>& clock,
int64_t _refill_period_us, int32_t _fairness, bool auto_tuned);
const std::shared_ptr<SystemClock>& _clock,
bool _auto_tuned)
: max_bytes_per_sec(_rate_bytes_per_sec),
refill_period_us(_refill_period_us),
clock(_clock),
fairness(_fairness > 100 ? 100 : _fairness),
auto_tuned(_auto_tuned) {}
int64_t max_bytes_per_sec;
int64_t refill_period_us;
std::shared_ptr<SystemClock> clock;
int32_t fairness;
bool auto_tuned;
};
public:
explicit GenericRateLimiter(
int64_t refill_bytes, int64_t refill_period_us = 100 * 1000,
int32_t fairness = 10,
RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly,
const std::shared_ptr<SystemClock>& clock = nullptr,
bool auto_tuned = false);
virtual ~GenericRateLimiter(); virtual ~GenericRateLimiter();
static const char* kClassName() { return "GenericRateLimiter"; }
const char* Name() const override { return kClassName(); }
Status PrepareOptions(const ConfigOptions& options) override;
// This API allows user to dynamically change rate limiter's bytes per second. // This API allows user to dynamically change rate limiter's bytes per second.
virtual void SetBytesPerSecond(int64_t bytes_per_second) override; virtual void SetBytesPerSecond(int64_t bytes_per_second) override;
@ -120,25 +95,29 @@ class GenericRateLimiter : public RateLimiter {
return rate_bytes_per_sec_; return rate_bytes_per_sec_;
} }
virtual void TEST_SetClock(std::shared_ptr<SystemClock> clock) {
MutexLock g(&request_mutex_);
clock_ = std::move(clock);
next_refill_us_ = NowMicrosMonotonic();
}
private: private:
void Initialize();
void RefillBytesAndGrantRequests(); void RefillBytesAndGrantRequests();
std::vector<Env::IOPriority> GeneratePriorityIterationOrder(); std::vector<Env::IOPriority> GeneratePriorityIterationOrder();
int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
Status Tune(); Status Tune();
uint64_t NowMicrosMonotonic() { uint64_t NowMicrosMonotonic() { return clock_->NowNanos() / std::milli::den; }
return options_.clock->NowNanos() / std::milli::den;
}
// This mutex guard all internal states // This mutex guard all internal states
mutable port::Mutex request_mutex_; mutable port::Mutex request_mutex_;
GenericRateLimiterOptions options_; const int64_t refill_period_us_;
int64_t rate_bytes_per_sec_; int64_t rate_bytes_per_sec_;
// This variable can be changed dynamically. // This variable can be changed dynamically.
std::atomic<int64_t> refill_bytes_per_period_; std::atomic<int64_t> refill_bytes_per_period_;
std::shared_ptr<SystemClock> clock_;
bool stop_; bool stop_;
port::CondVar exit_cv_; port::CondVar exit_cv_;
@ -149,13 +128,16 @@ class GenericRateLimiter : public RateLimiter {
int64_t available_bytes_; int64_t available_bytes_;
int64_t next_refill_us_; int64_t next_refill_us_;
int32_t fairness_;
Random rnd_; Random rnd_;
struct Req; struct Req;
std::deque<Req*> queue_[Env::IO_TOTAL]; std::deque<Req*> queue_[Env::IO_TOTAL];
bool wait_until_refill_pending_; bool wait_until_refill_pending_;
bool auto_tuned_;
int64_t num_drains_; int64_t num_drains_;
const int64_t max_bytes_per_sec_;
std::chrono::microseconds tuned_time_; std::chrono::microseconds tuned_time_;
}; };

@ -15,11 +15,8 @@
#include <limits> #include <limits>
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "options/options_parser.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/convenience.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "rocksdb/utilities/options_type.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "util/random.h" #include "util/random.h"
@ -466,95 +463,6 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec); ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec);
} }
TEST_F(RateLimiterTest, CreateGenericRateLimiterFromString) {
std::shared_ptr<RateLimiter> limiter;
ConfigOptions config_options;
std::string limiter_id = GenericRateLimiter::kClassName();
ASSERT_OK(RateLimiter::CreateFromString(config_options, limiter_id + ":1024",
&limiter));
ASSERT_NE(limiter, nullptr);
ASSERT_EQ(limiter->GetBytesPerSecond(), 1024U);
#ifndef ROCKSDB_LITE
ASSERT_OK(RateLimiter::CreateFromString(
config_options, "rate_bytes_per_sec=2048;id=" + limiter_id, &limiter));
ASSERT_NE(limiter, nullptr);
ASSERT_EQ(limiter->GetBytesPerSecond(), 2048U);
ASSERT_NOK(RateLimiter::CreateFromString(
config_options, "rate_bytes_per_sec=0;id=" + limiter_id, &limiter));
ASSERT_NOK(RateLimiter::CreateFromString(
config_options, "rate_bytes_per_sec=2048;fairness=0;id=" + limiter_id,
&limiter));
ASSERT_OK(
RateLimiter::CreateFromString(config_options,
"rate_bytes_per_sec=2048;refill_period_us="
"1024;fairness=42;auto_tuned=true;"
"mode=kReadsOnly;id=" +
limiter_id,
&limiter));
ASSERT_NE(limiter, nullptr);
auto opts =
limiter->GetOptions<GenericRateLimiter::GenericRateLimiterOptions>();
ASSERT_NE(opts, nullptr);
ASSERT_EQ(opts->max_bytes_per_sec, 2048);
ASSERT_EQ(opts->refill_period_us, 1024);
ASSERT_EQ(opts->fairness, 42);
ASSERT_EQ(opts->auto_tuned, true);
ASSERT_TRUE(limiter->IsRateLimited(RateLimiter::OpType::kRead));
ASSERT_FALSE(limiter->IsRateLimited(RateLimiter::OpType::kWrite));
#endif // ROCKSDB_LITE
}
#ifndef ROCKSDB_LITE
// This test is for a rate limiter that has no name (Name() returns "").
// When the default Name() method is deprecated, this test should be removed.
TEST_F(RateLimiterTest, NoNameRateLimiter) {
static std::unordered_map<std::string, OptionTypeInfo> dummy_limiter_options =
{
{"dummy",
{0, OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
class NoNameRateLimiter : public RateLimiter {
public:
explicit NoNameRateLimiter(bool do_register) {
if (do_register) {
RegisterOptions("", &dummy, &dummy_limiter_options);
}
}
void SetBytesPerSecond(int64_t /*bytes_per_second*/) override {}
int64_t GetSingleBurstBytes() const override { return 0; }
int64_t GetTotalBytesThrough(const Env::IOPriority /*pri*/) const override {
return 0;
}
int64_t GetTotalRequests(const Env::IOPriority /*pri*/) const override {
return 0;
}
int64_t GetBytesPerSecond() const override { return 0; }
private:
int dummy;
};
ConfigOptions config_options;
DBOptions db_opts, copy;
db_opts.rate_limiter.reset(new NoNameRateLimiter(false));
ASSERT_EQ(db_opts.rate_limiter->GetId(), "");
ASSERT_EQ(db_opts.rate_limiter->ToString(config_options), "");
db_opts.rate_limiter.reset(new NoNameRateLimiter(true));
ASSERT_EQ(db_opts.rate_limiter->GetId(), "");
ASSERT_EQ(db_opts.rate_limiter->ToString(config_options), "");
std::string opt_str;
ASSERT_OK(GetStringFromDBOptions(config_options, db_opts, &opt_str));
ASSERT_OK(
GetDBOptionsFromString(config_options, DBOptions(), opt_str, &copy));
ASSERT_OK(
RocksDBOptionsParser::VerifyDBOptions(config_options, db_opts, copy));
ASSERT_EQ(copy.rate_limiter, nullptr);
ASSERT_NE(copy.rate_limiter, db_opts.rate_limiter);
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -186,54 +186,13 @@ class BackupEngineImpl {
const std::shared_ptr<SystemClock>& backup_rate_limiter_clock, const std::shared_ptr<SystemClock>& backup_rate_limiter_clock,
const std::shared_ptr<SystemClock>& restore_rate_limiter_clock) { const std::shared_ptr<SystemClock>& restore_rate_limiter_clock) {
if (backup_rate_limiter_clock) { if (backup_rate_limiter_clock) {
assert(options_.backup_rate_limiter->IsInstanceOf( static_cast<GenericRateLimiter*>(options_.backup_rate_limiter.get())
GenericRateLimiter::kClassName())); ->TEST_SetClock(backup_rate_limiter_clock);
auto* backup_rate_limiter_options =
options_.backup_rate_limiter
->GetOptions<GenericRateLimiter::GenericRateLimiterOptions>();
assert(backup_rate_limiter_options);
RateLimiter::Mode backup_rate_limiter_mode;
if (!options_.backup_rate_limiter->IsRateLimited(
RateLimiter::OpType::kRead)) {
backup_rate_limiter_mode = RateLimiter::Mode::kWritesOnly;
} else if (!options_.backup_rate_limiter->IsRateLimited(
RateLimiter::OpType::kWrite)) {
backup_rate_limiter_mode = RateLimiter::Mode::kReadsOnly;
} else {
backup_rate_limiter_mode = RateLimiter::Mode::kAllIo;
}
options_.backup_rate_limiter.reset(new GenericRateLimiter(
backup_rate_limiter_options->max_bytes_per_sec,
backup_rate_limiter_options->refill_period_us,
backup_rate_limiter_options->fairness, backup_rate_limiter_mode,
backup_rate_limiter_clock, backup_rate_limiter_options->auto_tuned));
} }
if (restore_rate_limiter_clock) { if (restore_rate_limiter_clock) {
assert(options_.restore_rate_limiter->IsInstanceOf( static_cast<GenericRateLimiter*>(options_.restore_rate_limiter.get())
GenericRateLimiter::kClassName())); ->TEST_SetClock(restore_rate_limiter_clock);
auto* restore_rate_limiter_options =
options_.restore_rate_limiter
->GetOptions<GenericRateLimiter::GenericRateLimiterOptions>();
assert(restore_rate_limiter_options);
RateLimiter::Mode restore_rate_limiter_mode;
if (!options_.restore_rate_limiter->IsRateLimited(
RateLimiter::OpType::kRead)) {
restore_rate_limiter_mode = RateLimiter::Mode::kWritesOnly;
} else if (!options_.restore_rate_limiter->IsRateLimited(
RateLimiter::OpType::kWrite)) {
restore_rate_limiter_mode = RateLimiter::Mode::kReadsOnly;
} else {
restore_rate_limiter_mode = RateLimiter::Mode::kAllIo;
}
options_.restore_rate_limiter.reset(new GenericRateLimiter(
restore_rate_limiter_options->max_bytes_per_sec,
restore_rate_limiter_options->refill_period_us,
restore_rate_limiter_options->fairness, restore_rate_limiter_mode,
restore_rate_limiter_clock,
restore_rate_limiter_options->auto_tuned));
} }
} }

Loading…
Cancel
Save