From 7cd5835a282e3eadec12e2223ee1f6e888897d01 Mon Sep 17 00:00:00 2001 From: mrambacher Date: Wed, 1 Dec 2021 06:55:59 -0800 Subject: [PATCH] Make RateLimiter Customizable (#9141) Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/9141 Reviewed By: zhichao-cao Differential Revision: D32432190 Pulled By: mrambacher fbshipit-source-id: 7930ed88a02412128cd407b5063522484e45c6ce --- HISTORY.md | 2 +- db/db_test.cc | 3 + include/rocksdb/rate_limiter.h | 16 ++- options/customizable_test.cc | 56 ++++++++++ options/db_options.cc | 6 ++ util/rate_limiter.cc | 187 +++++++++++++++++++++++++++------ util/rate_limiter.h | 44 ++++++-- util/rate_limiter_test.cc | 92 ++++++++++++++++ 8 files changed, 361 insertions(+), 45 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index e2f274bed..9932fae1c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -40,7 +40,7 @@ ### Public API change * When options.ttl is used with leveled compaction with compactinon priority kMinOverlappingRatio, files exceeding half of TTL value will be prioritized more, so that by the time TTL is reached, fewer extra compactions will be scheduled to clear them up. At the same time, when compacting files with data older than half of TTL, output files may be cut off based on those files' boundaries, in order for the early TTL compaction to work properly. -* Made FileSystem extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method. +* Made FileSystem and RateLimiter extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method. * Clarified in API comments that RocksDB is not exception safe for callbacks and custom extensions. An exception propagating into RocksDB can lead to undefined behavior, including data loss, unreported corruption, deadlocks, and more. * Marked `WriteBufferManager` as `final` because it is not intended for extension. * Removed unimportant implementation details from table_properties.h diff --git a/db/db_test.cc b/db/db_test.cc index c3f1954c6..970725aa7 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3981,6 +3981,9 @@ class MockedRateLimiterWithNoOptionalAPIImpl : public RateLimiter { ~MockedRateLimiterWithNoOptionalAPIImpl() override {} + const char* Name() const override { + return "MockedRateLimiterWithNoOptionalAPI"; + } void SetBytesPerSecond(int64_t bytes_per_second) override { (void)bytes_per_second; } diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index d605c65b9..a6c606e4c 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -9,6 +9,7 @@ #pragma once +#include "rocksdb/customizable.h" #include "rocksdb/env.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" @@ -18,7 +19,7 @@ namespace ROCKSDB_NAMESPACE { // Exceptions MUST NOT propagate out of overridden functions into RocksDB, // because RocksDB is not exception-safe. This could cause undefined behavior // including data loss, unreported corruption, deadlocks, and more. -class RateLimiter { +class RateLimiter : public Customizable { public: enum class OpType { // Limitation: we currently only invoke Request() with OpType::kRead for @@ -32,11 +33,20 @@ class RateLimiter { kAllIo, }; + static const char* Type() { return "RateLimiter"; } + static Status CreateFromString(const ConfigOptions& options, + const std::string& value, + std::shared_ptr* result); + // For API compatibility, default to rate-limiting writes only. - explicit RateLimiter(Mode mode = Mode::kWritesOnly) : mode_(mode) {} + explicit RateLimiter(Mode mode = Mode::kWritesOnly); virtual ~RateLimiter() {} + // Deprecated. Will be removed in a major release. Derived classes + // should implement this method. + virtual const char* Name() const override { return ""; } + // This API allows user to dynamically change rate limiter's bytes per second. // REQUIRED: bytes_per_second > 0 virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0; @@ -124,7 +134,7 @@ class RateLimiter { Mode GetMode() { return mode_; } private: - const Mode mode_; + Mode mode_; }; // Create a RateLimiter object, which can be shared among RocksDB instances to diff --git a/options/customizable_test.cc b/options/customizable_test.cc index 98e953e3e..dd00c730b 100644 --- a/options/customizable_test.cc +++ b/options/customizable_test.cc @@ -22,6 +22,7 @@ #include "rocksdb/env_encryption.h" #include "rocksdb/file_checksum.h" #include "rocksdb/flush_block_policy.h" +#include "rocksdb/rate_limiter.h" #include "rocksdb/secondary_cache.h" #include "rocksdb/slice_transform.h" #include "rocksdb/sst_partitioner.h" @@ -35,6 +36,7 @@ #include "test_util/testharness.h" #include "test_util/testutil.h" #include "util/file_checksum_helper.h" +#include "util/rate_limiter.h" #include "util/string_util.h" #include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h" @@ -1391,6 +1393,21 @@ class MockFileChecksumGenFactory : public FileChecksumGenFactory { } }; +class MockRateLimiter : public RateLimiter { + public: + static const char* kClassName() { return "MockRateLimiter"; } + const char* Name() const override { return kClassName(); } + void SetBytesPerSecond(int64_t /*bytes_per_second*/) override {} + int64_t GetBytesPerSecond() const override { return 0; } + int64_t GetSingleBurstBytes() const override { return 0; } + int64_t GetTotalBytesThrough(const Env::IOPriority /*pri*/) const override { + return 0; + } + int64_t GetTotalRequests(const Env::IOPriority /*pri*/) const override { + return 0; + } +}; + #ifndef ROCKSDB_LITE static int RegisterLocalObjects(ObjectLibrary& library, const std::string& /*arg*/) { @@ -1497,6 +1514,15 @@ static int RegisterLocalObjects(ObjectLibrary& library, guard->reset(new MockTablePropertiesCollectorFactory()); return guard->get(); }); + + library.Register( + MockRateLimiter::kClassName(), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /* errmsg */) { + guard->reset(new MockRateLimiter()); + return guard->get(); + }); + return static_cast(library.GetFactoryCount(&num_types)); } #endif // !ROCKSDB_LITE @@ -1895,6 +1921,36 @@ TEST_F(LoadCustomizableTest, LoadSystemClockTest) { } } +TEST_F(LoadCustomizableTest, LoadRateLimiterTest) { + std::shared_ptr result; + ASSERT_NOK(RateLimiter::CreateFromString( + config_options_, MockRateLimiter::kClassName(), &result)); + ASSERT_OK(RateLimiter::CreateFromString( + config_options_, std::string(GenericRateLimiter::kClassName()) + ":1234", + &result)); + ASSERT_NE(result, nullptr); +#ifndef ROCKSDB_LITE + ASSERT_OK(RateLimiter::CreateFromString( + config_options_, GenericRateLimiter::kClassName(), &result)); + ASSERT_NE(result, nullptr); + ASSERT_OK(GetDBOptionsFromString( + config_options_, db_opts_, + std::string("rate_limiter=") + GenericRateLimiter::kClassName(), + &db_opts_)); + ASSERT_NE(db_opts_.rate_limiter, nullptr); + if (RegisterTests("Test")) { + ASSERT_OK(RateLimiter::CreateFromString( + config_options_, MockRateLimiter::kClassName(), &result)); + ASSERT_NE(result, nullptr); + ASSERT_OK(GetDBOptionsFromString( + config_options_, db_opts_, + std::string("rate_limiter=") + MockRateLimiter::kClassName(), + &db_opts_)); + ASSERT_NE(db_opts_.rate_limiter, nullptr); + } +#endif // ROCKSDB_LITE +} + TEST_F(LoadCustomizableTest, LoadFlushBlockPolicyFactoryTest) { std::shared_ptr table; std::shared_ptr result; diff --git a/options/db_options.cc b/options/db_options.cc index bd1cbda88..0fd23a9e9 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -418,6 +418,12 @@ static std::unordered_map {"db_host_id", {offsetof(struct ImmutableDBOptions, db_host_id), OptionType::kString, OptionVerificationType::kNormal, OptionTypeFlags::kCompareNever}}, + {"rate_limiter", + OptionTypeInfo::AsCustomSharedPtr( + offsetof(struct ImmutableDBOptions, rate_limiter), + OptionVerificationType::kNormal, + OptionTypeFlags::kCompareNever | OptionTypeFlags::kAllowNull)}, + // The following properties were handled as special cases in ParseOption // This means that the properties could be read from the options file // but never written to the file or compared to each other. diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index c59a05c54..ef9f1ca94 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -13,12 +13,16 @@ #include "monitoring/statistics.h" #include "port/port.h" +#include "rocksdb/convenience.h" #include "rocksdb/system_clock.h" +#include "rocksdb/utilities/customizable_util.h" +#include "rocksdb/utilities/object_registry.h" +#include "rocksdb/utilities/options_type.h" #include "test_util/sync_point.h" #include "util/aligned_buffer.h" +#include "util/string_util.h" namespace ROCKSDB_NAMESPACE { - size_t RateLimiter::RequestToken(size_t bytes, size_t alignment, Env::IOPriority io_priority, Statistics* stats, RateLimiter::OpType op_type) { @@ -46,34 +50,69 @@ struct GenericRateLimiter::Req { bool granted; }; +static std::unordered_map + generic_rate_limiter_type_info = { +#ifndef ROCKSDB_LITE + {"rate_bytes_per_sec", + {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, + max_bytes_per_sec), + OptionType::kInt64T}}, + {"refill_period_us", + {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, + refill_period_us), + OptionType::kInt64T}}, + {"fairness", + {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, + fairness), + OptionType::kInt32T}}, + {"auto_tuned", + {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, + auto_tuned), + OptionType::kBoolean}}, + {"clock", + OptionTypeInfo::AsCustomSharedPtr( + offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, + clock), + OptionVerificationType::kByNameAllowFromNull, + OptionTypeFlags::kAllowNull)}, +#endif // ROCKSDB_LITE +}; + GenericRateLimiter::GenericRateLimiter( int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness, RateLimiter::Mode mode, const std::shared_ptr& clock, bool auto_tuned) : RateLimiter(mode), - refill_period_us_(refill_period_us), - rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2 - : rate_bytes_per_sec), - refill_bytes_per_period_( - CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)), - clock_(clock), + options_(rate_bytes_per_sec, refill_period_us, fairness, clock, + auto_tuned), stop_(false), exit_cv_(&request_mutex_), requests_to_wait_(0), available_bytes_(0), - next_refill_us_(NowMicrosMonotonic()), - fairness_(fairness > 100 ? 100 : fairness), rnd_((uint32_t)time(nullptr)), wait_until_refill_pending_(false), - auto_tuned_(auto_tuned), num_drains_(0), - prev_num_drains_(0), - max_bytes_per_sec_(rate_bytes_per_sec), - tuned_time_(NowMicrosMonotonic()) { + prev_num_drains_(0) { + RegisterOptions(&options_, &generic_rate_limiter_type_info); for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { total_requests_[i] = 0; total_bytes_through_[i] = 0; } + Initialize(); +} +void GenericRateLimiter::Initialize() { + if (options_.clock == nullptr) { + options_.clock = SystemClock::Default(); + } + options_.fairness = std::min(options_.fairness, 100); + next_refill_us_ = NowMicrosMonotonic(); + tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic()); + if (options_.auto_tuned) { + rate_bytes_per_sec_ = options_.max_bytes_per_sec / 2; + } else { + rate_bytes_per_sec_ = options_.max_bytes_per_sec; + } + refill_bytes_per_period_ = CalculateRefillBytesPerPeriod(rate_bytes_per_sec_); } GenericRateLimiter::~GenericRateLimiter() { @@ -97,6 +136,18 @@ GenericRateLimiter::~GenericRateLimiter() { } } +Status GenericRateLimiter::PrepareOptions(const ConfigOptions& options) { + if (options_.fairness <= 0) { + return Status::InvalidArgument("Fairness must be > 0"); + } else if (options_.max_bytes_per_sec <= 0) { + return Status::InvalidArgument("max_bytes_per_sec must be > 0"); + } else if (options_.refill_period_us <= 0) { + return Status::InvalidArgument("Refill_period_us must be > 0"); + } + Initialize(); + return RateLimiter::PrepareOptions(options); +} + // This API allows user to dynamically change rate limiter's bytes per second. void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { assert(bytes_per_second > 0); @@ -115,11 +166,11 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, &rate_bytes_per_sec_); MutexLock g(&request_mutex_); - if (auto_tuned_) { + if (options_.auto_tuned) { static const int kRefillsPerTune = 100; std::chrono::microseconds now(NowMicrosMonotonic()); - if (now - tuned_time_ >= - kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) { + if (now - tuned_time_ >= kRefillsPerTune * std::chrono::microseconds( + options_.refill_period_us)) { Status s = Tune(); s.PermitUncheckedError(); //**TODO: What to do on error? } @@ -163,7 +214,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, } else { // Whichever thread reaches here first performs duty (1) as described // above. - int64_t wait_until = clock_->NowMicros() + time_until_refill_us; + int64_t wait_until = options_.clock->NowMicros() + time_until_refill_us; RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); ++num_drains_; wait_until_refill_pending_ = true; @@ -223,12 +274,12 @@ GenericRateLimiter::GeneratePriorityIterationOrder() { // first pri_iteration_order[0] = Env::IO_USER; - bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_); + bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(options_.fairness); TEST_SYNC_POINT_CALLBACK( "GenericRateLimiter::GeneratePriorityIterationOrder::" "PostRandomOneInFairnessForHighPri", &high_pri_iterated_after_mid_low_pri); - bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_); + bool mid_pri_itereated_after_low_pri = rnd_.OneIn(options_.fairness); TEST_SYNC_POINT_CALLBACK( "GenericRateLimiter::GeneratePriorityIterationOrder::" "PostRandomOneInFairnessForMidPri", @@ -257,7 +308,7 @@ GenericRateLimiter::GeneratePriorityIterationOrder() { void GenericRateLimiter::RefillBytesAndGrantRequests() { TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests"); - next_refill_us_ = NowMicrosMonotonic() + refill_period_us_; + next_refill_us_ = NowMicrosMonotonic() + options_.refill_period_us; // Carry over the left over quota from the last period auto refill_bytes_per_period = refill_bytes_per_period_.load(std::memory_order_relaxed); @@ -297,12 +348,12 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() { int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( int64_t rate_bytes_per_sec) { - if (port::kMaxInt64 / rate_bytes_per_sec < refill_period_us_) { + if (port::kMaxInt64 / rate_bytes_per_sec < options_.refill_period_us) { // Avoid unexpected result in the overflow case. The result now is still // inaccurate but is a number that is large enough. return port::kMaxInt64 / 1000000; } else { - return rate_bytes_per_sec * refill_period_us_ / 1000000; + return rate_bytes_per_sec * options_.refill_period_us / 1000000; } } @@ -317,10 +368,11 @@ Status GenericRateLimiter::Tune() { std::chrono::microseconds prev_tuned_time = tuned_time_; tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic()); - int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time + - std::chrono::microseconds(refill_period_us_) - - std::chrono::microseconds(1)) / - std::chrono::microseconds(refill_period_us_); + int64_t elapsed_intervals = + (tuned_time_ - prev_tuned_time + + std::chrono::microseconds(options_.refill_period_us) - + std::chrono::microseconds(1)) / + std::chrono::microseconds(options_.refill_period_us); // We tune every kRefillsPerTune intervals, so the overflow and division-by- // zero conditions should never happen. assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100); @@ -331,20 +383,20 @@ Status GenericRateLimiter::Tune() { int64_t prev_bytes_per_sec = GetBytesPerSecond(); int64_t new_bytes_per_sec; if (drained_pct == 0) { - new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor; + new_bytes_per_sec = options_.max_bytes_per_sec / kAllowedRangeFactor; } else if (drained_pct < kLowWatermarkPct) { // sanitize to prevent overflow int64_t sanitized_prev_bytes_per_sec = std::min(prev_bytes_per_sec, port::kMaxInt64 / 100); new_bytes_per_sec = - std::max(max_bytes_per_sec_ / kAllowedRangeFactor, + std::max(options_.max_bytes_per_sec / kAllowedRangeFactor, sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct)); } else if (drained_pct > kHighWatermarkPct) { // sanitize to prevent overflow int64_t sanitized_prev_bytes_per_sec = std::min( prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct)); new_bytes_per_sec = - std::min(max_bytes_per_sec_, + std::min(options_.max_bytes_per_sec, sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100); } else { new_bytes_per_sec = prev_bytes_per_sec; @@ -364,8 +416,81 @@ RateLimiter* NewGenericRateLimiter( assert(rate_bytes_per_sec > 0); assert(refill_period_us > 0); assert(fairness > 0); - return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness, - mode, SystemClock::Default(), auto_tuned); + std::unique_ptr limiter( + new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness, + mode, SystemClock::Default(), auto_tuned)); + Status s = limiter->PrepareOptions(ConfigOptions()); + if (s.ok()) { + return limiter.release(); + } else { + assert(false); + return nullptr; + } +} +namespace { +#ifndef ROCKSDB_LITE +static int RegisterBuiltinRateLimiters(ObjectLibrary& library, + const std::string& /*arg*/) { + library.Register( + GenericRateLimiter::kClassName(), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /*errmsg*/) { + guard->reset(new GenericRateLimiter(port::kMaxInt64)); + return guard->get(); + }); + size_t num_types; + return static_cast(library.GetFactoryCount(&num_types)); +} + +static std::unordered_map + rate_limiter_mode_map = { + {"kReadsOnly", RateLimiter::Mode::kReadsOnly}, + {"kWritesOnly", RateLimiter::Mode::kWritesOnly}, + {"kAllIo", RateLimiter::Mode::kAllIo}, +}; +#endif // ROCKSDB_LITE +static bool LoadRateLimiter(const std::string& name, + std::shared_ptr* limiter) { + auto plen = strlen(GenericRateLimiter::kClassName()); + if (name.size() > plen + 2 && name[plen] == ':' && + StartsWith(name, GenericRateLimiter::kClassName())) { + auto rate = ParseInt64(name.substr(plen + 1)); + limiter->reset(new GenericRateLimiter(rate)); + return true; + } else { + return false; + } +} + +static std::unordered_map rate_limiter_type_info = + { +#ifndef ROCKSDB_LITE + {"mode", + OptionTypeInfo::Enum(0, &rate_limiter_mode_map)}, +#endif // ROCKSDB_LITE +}; +} // namespace + +RateLimiter::RateLimiter(Mode mode) : mode_(mode) { + RegisterOptions("", &mode_, &rate_limiter_type_info); +} + +Status RateLimiter::CreateFromString(const ConfigOptions& config_options, + const std::string& value, + std::shared_ptr* result) { + if (value.empty()) { + result->reset(); + return Status::OK(); + } else { +#ifndef ROCKSDB_LITE + static std::once_flag once; + std::call_once(once, [&]() { + RegisterBuiltinRateLimiters(*(ObjectLibrary::Default().get()), ""); + }); +#endif // ROCKSDB_LITE + return LoadSharedObject(config_options, value, LoadRateLimiter, + result); + } } } // namespace ROCKSDB_NAMESPACE diff --git a/util/rate_limiter.h b/util/rate_limiter.h index 3752bf88d..0e170c971 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -26,13 +26,38 @@ namespace ROCKSDB_NAMESPACE { class GenericRateLimiter : public RateLimiter { public: - GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us, - int32_t fairness, RateLimiter::Mode mode, - const std::shared_ptr& clock, - bool auto_tuned); + struct GenericRateLimiterOptions { + static const char* kName() { return "GenericRateLimiterOptions"; } + GenericRateLimiterOptions(int64_t _rate_bytes_per_sec, + int64_t _refill_period_us, int32_t _fairness, + const std::shared_ptr& _clock, + bool _auto_tuned) + : max_bytes_per_sec(_rate_bytes_per_sec), + refill_period_us(_refill_period_us), + clock(_clock), + fairness(_fairness > 100 ? 100 : _fairness), + auto_tuned(_auto_tuned) {} + int64_t max_bytes_per_sec; + int64_t refill_period_us; + std::shared_ptr clock; + int32_t fairness; + bool auto_tuned; + }; + + public: + explicit GenericRateLimiter( + int64_t refill_bytes, int64_t refill_period_us = 100 * 1000, + int32_t fairness = 10, + RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly, + const std::shared_ptr& clock = nullptr, + bool auto_tuned = false); virtual ~GenericRateLimiter(); + static const char* kClassName() { return "GenericRateLimiter"; } + const char* Name() const override { return kClassName(); } + Status PrepareOptions(const ConfigOptions& options) override; + // This API allows user to dynamically change rate limiter's bytes per second. virtual void SetBytesPerSecond(int64_t bytes_per_second) override; @@ -96,22 +121,24 @@ class GenericRateLimiter : public RateLimiter { } private: + void Initialize(); void RefillBytesAndGrantRequests(); std::vector GeneratePriorityIterationOrder(); int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); Status Tune(); - uint64_t NowMicrosMonotonic() { return clock_->NowNanos() / std::milli::den; } + uint64_t NowMicrosMonotonic() { + return options_.clock->NowNanos() / std::milli::den; + } // This mutex guard all internal states mutable port::Mutex request_mutex_; - const int64_t refill_period_us_; + GenericRateLimiterOptions options_; int64_t rate_bytes_per_sec_; // This variable can be changed dynamically. std::atomic refill_bytes_per_period_; - std::shared_ptr clock_; bool stop_; port::CondVar exit_cv_; @@ -122,17 +149,14 @@ class GenericRateLimiter : public RateLimiter { int64_t available_bytes_; int64_t next_refill_us_; - int32_t fairness_; Random rnd_; struct Req; std::deque queue_[Env::IO_TOTAL]; bool wait_until_refill_pending_; - bool auto_tuned_; int64_t num_drains_; int64_t prev_num_drains_; - const int64_t max_bytes_per_sec_; std::chrono::microseconds tuned_time_; }; diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index 5ea3da475..ad44d5736 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -15,8 +15,11 @@ #include #include "db/db_test_util.h" +#include "options/options_parser.h" #include "port/port.h" +#include "rocksdb/convenience.h" #include "rocksdb/system_clock.h" +#include "rocksdb/utilities/options_type.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" #include "util/random.h" @@ -463,6 +466,95 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec); } +TEST_F(RateLimiterTest, CreateGenericRateLimiterFromString) { + std::shared_ptr limiter; + ConfigOptions config_options; + std::string limiter_id = GenericRateLimiter::kClassName(); + ASSERT_OK(RateLimiter::CreateFromString(config_options, limiter_id + ":1024", + &limiter)); + ASSERT_NE(limiter, nullptr); + ASSERT_EQ(limiter->GetBytesPerSecond(), 1024U); +#ifndef ROCKSDB_LITE + ASSERT_OK(RateLimiter::CreateFromString( + config_options, "rate_bytes_per_sec=2048;id=" + limiter_id, &limiter)); + ASSERT_NE(limiter, nullptr); + ASSERT_EQ(limiter->GetBytesPerSecond(), 2048U); + ASSERT_NOK(RateLimiter::CreateFromString( + config_options, "rate_bytes_per_sec=0;id=" + limiter_id, &limiter)); + ASSERT_NOK(RateLimiter::CreateFromString( + config_options, "rate_bytes_per_sec=2048;fairness=0;id=" + limiter_id, + &limiter)); + + ASSERT_OK( + RateLimiter::CreateFromString(config_options, + "rate_bytes_per_sec=2048;refill_period_us=" + "1024;fairness=42;auto_tuned=true;" + "mode=kReadsOnly;id=" + + limiter_id, + &limiter)); + ASSERT_NE(limiter, nullptr); + auto opts = + limiter->GetOptions(); + ASSERT_NE(opts, nullptr); + ASSERT_EQ(opts->max_bytes_per_sec, 2048); + ASSERT_EQ(opts->refill_period_us, 1024); + ASSERT_EQ(opts->fairness, 42); + ASSERT_EQ(opts->auto_tuned, true); + ASSERT_TRUE(limiter->IsRateLimited(RateLimiter::OpType::kRead)); + ASSERT_FALSE(limiter->IsRateLimited(RateLimiter::OpType::kWrite)); +#endif // ROCKSDB_LITE +} + +#ifndef ROCKSDB_LITE +// This test is for a rate limiter that has no name (Name() returns ""). +// When the default Name() method is deprecated, this test should be removed. +TEST_F(RateLimiterTest, NoNameRateLimiter) { + static std::unordered_map dummy_limiter_options = + { + {"dummy", + {0, OptionType::kInt, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + }; + class NoNameRateLimiter : public RateLimiter { + public: + explicit NoNameRateLimiter(bool do_register) { + if (do_register) { + RegisterOptions("", &dummy, &dummy_limiter_options); + } + } + void SetBytesPerSecond(int64_t /*bytes_per_second*/) override {} + int64_t GetSingleBurstBytes() const override { return 0; } + int64_t GetTotalBytesThrough(const Env::IOPriority /*pri*/) const override { + return 0; + } + int64_t GetTotalRequests(const Env::IOPriority /*pri*/) const override { + return 0; + } + int64_t GetBytesPerSecond() const override { return 0; } + + private: + int dummy; + }; + + ConfigOptions config_options; + DBOptions db_opts, copy; + db_opts.rate_limiter.reset(new NoNameRateLimiter(false)); + ASSERT_EQ(db_opts.rate_limiter->GetId(), ""); + ASSERT_EQ(db_opts.rate_limiter->ToString(config_options), ""); + db_opts.rate_limiter.reset(new NoNameRateLimiter(true)); + ASSERT_EQ(db_opts.rate_limiter->GetId(), ""); + ASSERT_EQ(db_opts.rate_limiter->ToString(config_options), ""); + std::string opt_str; + ASSERT_OK(GetStringFromDBOptions(config_options, db_opts, &opt_str)); + ASSERT_OK( + GetDBOptionsFromString(config_options, DBOptions(), opt_str, ©)); + ASSERT_OK( + RocksDBOptionsParser::VerifyDBOptions(config_options, db_opts, copy)); + ASSERT_EQ(copy.rate_limiter, nullptr); + ASSERT_NE(copy.rate_limiter, db_opts.rate_limiter); +} +#endif // ROCKSDB_LITE + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {