From 25cc564ff7c396e4a9728a6dfa5fd053a5181545 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Mon, 18 Jul 2022 14:48:42 -0700 Subject: [PATCH] Make RateLimiter not Customizable (#10378) Summary: (PR created for informational/testing purposes only.) - Fixes lost dynamic updates to GenericRateLimiter bandwidth using `SetBytesPerSecond()` - Benefit over #10374 is eliminating race conditions with Configurable framework. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10378 Reviewed By: pdillinger Differential Revision: D37914865 fbshipit-source-id: d4f566d60ec9726d26932388c61671adf0ee0f30 --- HISTORY.md | 5 + db/db_test.cc | 3 - include/rocksdb/rate_limiter.h | 16 +-- options/customizable_test.cc | 56 --------- options/db_options.cc | 9 +- util/rate_limiter.cc | 183 +++++------------------------- util/rate_limiter.h | 50 +++----- util/rate_limiter_test.cc | 92 --------------- utilities/backup/backup_engine.cc | 49 +------- 9 files changed, 61 insertions(+), 402 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 0ad4471d4..330a01bf6 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,10 @@ # Rocksdb Change Log ## Unreleased +### Public API changes +* Removed Customizable support for RateLimiter and removed its CreateFromString() and Type() functions. + +### Bug Fixes +* Fix a bug where `GenericRateLimiter` could revert the bandwidth set dynamically using `SetBytesPerSecond()` when a user configures a structure enclosing it, e.g., using `GetOptionsFromString()` to configure an `Options` that references an existing `RateLimiter` object. ## 7.5.0 (07/15/2022) ### New Features diff --git a/db/db_test.cc b/db/db_test.cc index 1e430d08c..9defb81d0 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4101,9 +4101,6 @@ class MockedRateLimiterWithNoOptionalAPIImpl : public RateLimiter { ~MockedRateLimiterWithNoOptionalAPIImpl() override {} - const char* Name() const override { - return "MockedRateLimiterWithNoOptionalAPI"; - } void SetBytesPerSecond(int64_t bytes_per_second) override { (void)bytes_per_second; } diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index 203d73dcf..9cad6edf4 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -9,7 +9,6 @@ #pragma once -#include "rocksdb/customizable.h" #include "rocksdb/env.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" @@ -19,7 +18,7 @@ namespace ROCKSDB_NAMESPACE { // Exceptions MUST NOT propagate out of overridden functions into RocksDB, // because RocksDB is not exception-safe. This could cause undefined behavior // including data loss, unreported corruption, deadlocks, and more. -class RateLimiter : public Customizable { +class RateLimiter { public: enum class OpType { kRead, @@ -32,20 +31,11 @@ class RateLimiter : public Customizable { kAllIo, }; - static const char* Type() { return "RateLimiter"; } - static Status CreateFromString(const ConfigOptions& options, - const std::string& value, - std::shared_ptr* result); - // For API compatibility, default to rate-limiting writes only. - explicit RateLimiter(Mode mode = Mode::kWritesOnly); + explicit RateLimiter(Mode mode = Mode::kWritesOnly) : mode_(mode) {} virtual ~RateLimiter() {} - // Deprecated. Will be removed in a major release. Derived classes - // should implement this method. - virtual const char* Name() const override { return ""; } - // This API allows user to dynamically change rate limiter's bytes per second. // REQUIRED: bytes_per_second > 0 virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0; @@ -135,7 +125,7 @@ class RateLimiter : public Customizable { Mode GetMode() { return mode_; } private: - Mode mode_; + const Mode mode_; }; // Create a RateLimiter object, which can be shared among RocksDB instances to diff --git a/options/customizable_test.cc b/options/customizable_test.cc index 61c0eb368..e7ab2cd08 100644 --- a/options/customizable_test.cc +++ b/options/customizable_test.cc @@ -27,7 +27,6 @@ #include "rocksdb/filter_policy.h" #include "rocksdb/flush_block_policy.h" #include "rocksdb/memory_allocator.h" -#include "rocksdb/rate_limiter.h" #include "rocksdb/secondary_cache.h" #include "rocksdb/slice_transform.h" #include "rocksdb/sst_partitioner.h" @@ -42,7 +41,6 @@ #include "test_util/testharness.h" #include "test_util/testutil.h" #include "util/file_checksum_helper.h" -#include "util/rate_limiter.h" #include "util/string_util.h" #include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h" #include "utilities/memory_allocators.h" @@ -1472,21 +1470,6 @@ class MockFileChecksumGenFactory : public FileChecksumGenFactory { } }; -class MockRateLimiter : public RateLimiter { - public: - static const char* kClassName() { return "MockRateLimiter"; } - const char* Name() const override { return kClassName(); } - void SetBytesPerSecond(int64_t /*bytes_per_second*/) override {} - int64_t GetBytesPerSecond() const override { return 0; } - int64_t GetSingleBurstBytes() const override { return 0; } - int64_t GetTotalBytesThrough(const Env::IOPriority /*pri*/) const override { - return 0; - } - int64_t GetTotalRequests(const Env::IOPriority /*pri*/) const override { - return 0; - } -}; - class MockFilterPolicy : public FilterPolicy { public: static const char* kClassName() { return "MockFilterPolicy"; } @@ -1618,14 +1601,6 @@ static int RegisterLocalObjects(ObjectLibrary& library, return guard->get(); }); - library.AddFactory( - MockRateLimiter::kClassName(), - [](const std::string& /*uri*/, std::unique_ptr* guard, - std::string* /* errmsg */) { - guard->reset(new MockRateLimiter()); - return guard->get(); - }); - library.AddFactory( MockFilterPolicy::kClassName(), [](const std::string& /*uri*/, std::unique_ptr* guard, @@ -2149,37 +2124,6 @@ TEST_F(LoadCustomizableTest, LoadMemoryAllocatorTest) { } } -TEST_F(LoadCustomizableTest, LoadRateLimiterTest) { -#ifndef ROCKSDB_LITE - ASSERT_OK(TestSharedBuiltins(MockRateLimiter::kClassName(), - GenericRateLimiter::kClassName())); -#else - ASSERT_OK(TestSharedBuiltins(MockRateLimiter::kClassName(), "")); -#endif // ROCKSDB_LITE - - std::shared_ptr result; - ASSERT_OK(RateLimiter::CreateFromString( - config_options_, std::string(GenericRateLimiter::kClassName()) + ":1234", - &result)); - ASSERT_NE(result, nullptr); - ASSERT_TRUE(result->IsInstanceOf(GenericRateLimiter::kClassName())); -#ifndef ROCKSDB_LITE - ASSERT_OK(GetDBOptionsFromString( - config_options_, db_opts_, - std::string("rate_limiter=") + GenericRateLimiter::kClassName(), - &db_opts_)); - ASSERT_NE(db_opts_.rate_limiter, nullptr); - if (RegisterTests("Test")) { - ExpectCreateShared(MockRateLimiter::kClassName()); - ASSERT_OK(GetDBOptionsFromString( - config_options_, db_opts_, - std::string("rate_limiter=") + MockRateLimiter::kClassName(), - &db_opts_)); - ASSERT_NE(db_opts_.rate_limiter, nullptr); - } -#endif // ROCKSDB_LITE -} - TEST_F(LoadCustomizableTest, LoadFilterPolicyTest) { const std::string kAutoBloom = BloomFilterPolicy::kClassName(); const std::string kAutoRibbon = RibbonFilterPolicy::kClassName(); diff --git a/options/db_options.cc b/options/db_options.cc index 92c56398d..e0bc892fc 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -421,12 +421,11 @@ static std::unordered_map {"db_host_id", {offsetof(struct ImmutableDBOptions, db_host_id), OptionType::kString, OptionVerificationType::kNormal, OptionTypeFlags::kCompareNever}}, + // Temporarily deprecated due to race conditions (examples in PR 10375). {"rate_limiter", - OptionTypeInfo::AsCustomSharedPtr( - offsetof(struct ImmutableDBOptions, rate_limiter), - OptionVerificationType::kNormal, - OptionTypeFlags::kCompareNever | OptionTypeFlags::kAllowNull)}, - + {offsetof(struct ImmutableDBOptions, rate_limiter), + OptionType::kUnknown, OptionVerificationType::kDeprecated, + OptionTypeFlags::kDontSerialize | OptionTypeFlags::kCompareNever}}, // The following properties were handled as special cases in ParseOption // This means that the properties could be read from the options file // but never written to the file or compared to each other. diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index f369e3220..3e3fe1787 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -13,14 +13,9 @@ #include "monitoring/statistics.h" #include "port/port.h" -#include "rocksdb/convenience.h" #include "rocksdb/system_clock.h" -#include "rocksdb/utilities/customizable_util.h" -#include "rocksdb/utilities/object_registry.h" -#include "rocksdb/utilities/options_type.h" #include "test_util/sync_point.h" #include "util/aligned_buffer.h" -#include "util/string_util.h" namespace ROCKSDB_NAMESPACE { size_t RateLimiter::RequestToken(size_t bytes, size_t alignment, @@ -50,68 +45,33 @@ struct GenericRateLimiter::Req { bool granted; }; -static std::unordered_map - generic_rate_limiter_type_info = { -#ifndef ROCKSDB_LITE - {"rate_bytes_per_sec", - {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, - max_bytes_per_sec), - OptionType::kInt64T}}, - {"refill_period_us", - {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, - refill_period_us), - OptionType::kInt64T}}, - {"fairness", - {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, - fairness), - OptionType::kInt32T}}, - {"auto_tuned", - {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, - auto_tuned), - OptionType::kBoolean}}, - {"clock", - OptionTypeInfo::AsCustomSharedPtr( - offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, - clock), - OptionVerificationType::kByNameAllowFromNull, - OptionTypeFlags::kAllowNull)}, -#endif // ROCKSDB_LITE -}; - GenericRateLimiter::GenericRateLimiter( int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness, RateLimiter::Mode mode, const std::shared_ptr& clock, bool auto_tuned) : RateLimiter(mode), - options_(rate_bytes_per_sec, refill_period_us, fairness, clock, - auto_tuned), + refill_period_us_(refill_period_us), + rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2 + : rate_bytes_per_sec), + refill_bytes_per_period_( + CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)), + clock_(clock), stop_(false), exit_cv_(&request_mutex_), requests_to_wait_(0), available_bytes_(0), + next_refill_us_(NowMicrosMonotonic()), + fairness_(fairness > 100 ? 100 : fairness), rnd_((uint32_t)time(nullptr)), wait_until_refill_pending_(false), - num_drains_(0) { - RegisterOptions(&options_, &generic_rate_limiter_type_info); + auto_tuned_(auto_tuned), + num_drains_(0), + max_bytes_per_sec_(rate_bytes_per_sec), + tuned_time_(NowMicrosMonotonic()) { for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { total_requests_[i] = 0; total_bytes_through_[i] = 0; } - Initialize(); -} -void GenericRateLimiter::Initialize() { - if (options_.clock == nullptr) { - options_.clock = SystemClock::Default(); - } - options_.fairness = std::min(options_.fairness, 100); - next_refill_us_ = NowMicrosMonotonic(); - tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic()); - if (options_.auto_tuned) { - rate_bytes_per_sec_ = options_.max_bytes_per_sec / 2; - } else { - rate_bytes_per_sec_ = options_.max_bytes_per_sec; - } - refill_bytes_per_period_ = CalculateRefillBytesPerPeriod(rate_bytes_per_sec_); } GenericRateLimiter::~GenericRateLimiter() { @@ -135,18 +95,6 @@ GenericRateLimiter::~GenericRateLimiter() { } } -Status GenericRateLimiter::PrepareOptions(const ConfigOptions& options) { - if (options_.fairness <= 0) { - return Status::InvalidArgument("Fairness must be > 0"); - } else if (options_.max_bytes_per_sec <= 0) { - return Status::InvalidArgument("max_bytes_per_sec must be > 0"); - } else if (options_.refill_period_us <= 0) { - return Status::InvalidArgument("Refill_period_us must be > 0"); - } - Initialize(); - return RateLimiter::PrepareOptions(options); -} - // This API allows user to dynamically change rate limiter's bytes per second. void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { assert(bytes_per_second > 0); @@ -165,11 +113,11 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, &rate_bytes_per_sec_); MutexLock g(&request_mutex_); - if (options_.auto_tuned) { + if (auto_tuned_) { static const int kRefillsPerTune = 100; std::chrono::microseconds now(NowMicrosMonotonic()); - if (now - tuned_time_ >= kRefillsPerTune * std::chrono::microseconds( - options_.refill_period_us)) { + if (now - tuned_time_ >= + kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) { Status s = Tune(); s.PermitUncheckedError(); //**TODO: What to do on error? } @@ -213,7 +161,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, } else { // Whichever thread reaches here first performs duty (1) as described // above. - int64_t wait_until = options_.clock->NowMicros() + time_until_refill_us; + int64_t wait_until = clock_->NowMicros() + time_until_refill_us; RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); ++num_drains_; wait_until_refill_pending_ = true; @@ -273,12 +221,12 @@ GenericRateLimiter::GeneratePriorityIterationOrder() { // first pri_iteration_order[0] = Env::IO_USER; - bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(options_.fairness); + bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_); TEST_SYNC_POINT_CALLBACK( "GenericRateLimiter::GeneratePriorityIterationOrder::" "PostRandomOneInFairnessForHighPri", &high_pri_iterated_after_mid_low_pri); - bool mid_pri_itereated_after_low_pri = rnd_.OneIn(options_.fairness); + bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_); TEST_SYNC_POINT_CALLBACK( "GenericRateLimiter::GeneratePriorityIterationOrder::" "PostRandomOneInFairnessForMidPri", @@ -307,7 +255,7 @@ GenericRateLimiter::GeneratePriorityIterationOrder() { void GenericRateLimiter::RefillBytesAndGrantRequests() { TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests"); - next_refill_us_ = NowMicrosMonotonic() + options_.refill_period_us; + next_refill_us_ = NowMicrosMonotonic() + refill_period_us_; // Carry over the left over quota from the last period auto refill_bytes_per_period = refill_bytes_per_period_.load(std::memory_order_relaxed); @@ -348,12 +296,12 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() { int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( int64_t rate_bytes_per_sec) { if (std::numeric_limits::max() / rate_bytes_per_sec < - options_.refill_period_us) { + refill_period_us_) { // Avoid unexpected result in the overflow case. The result now is still // inaccurate but is a number that is large enough. return std::numeric_limits::max() / 1000000; } else { - return rate_bytes_per_sec * options_.refill_period_us / 1000000; + return rate_bytes_per_sec * refill_period_us_ / 1000000; } } @@ -368,11 +316,10 @@ Status GenericRateLimiter::Tune() { std::chrono::microseconds prev_tuned_time = tuned_time_; tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic()); - int64_t elapsed_intervals = - (tuned_time_ - prev_tuned_time + - std::chrono::microseconds(options_.refill_period_us) - - std::chrono::microseconds(1)) / - std::chrono::microseconds(options_.refill_period_us); + int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time + + std::chrono::microseconds(refill_period_us_) - + std::chrono::microseconds(1)) / + std::chrono::microseconds(refill_period_us_); // We tune every kRefillsPerTune intervals, so the overflow and division-by- // zero conditions should never happen. assert(num_drains_ <= std::numeric_limits::max() / 100); @@ -382,13 +329,13 @@ Status GenericRateLimiter::Tune() { int64_t prev_bytes_per_sec = GetBytesPerSecond(); int64_t new_bytes_per_sec; if (drained_pct == 0) { - new_bytes_per_sec = options_.max_bytes_per_sec / kAllowedRangeFactor; + new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor; } else if (drained_pct < kLowWatermarkPct) { // sanitize to prevent overflow int64_t sanitized_prev_bytes_per_sec = std::min(prev_bytes_per_sec, std::numeric_limits::max() / 100); new_bytes_per_sec = - std::max(options_.max_bytes_per_sec / kAllowedRangeFactor, + std::max(max_bytes_per_sec_ / kAllowedRangeFactor, sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct)); } else if (drained_pct > kHighWatermarkPct) { // sanitize to prevent overflow @@ -396,7 +343,7 @@ Status GenericRateLimiter::Tune() { std::min(prev_bytes_per_sec, std::numeric_limits::max() / (100 + kAdjustFactorPct)); new_bytes_per_sec = - std::min(options_.max_bytes_per_sec, + std::min(max_bytes_per_sec_, sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100); } else { new_bytes_per_sec = prev_bytes_per_sec; @@ -419,79 +366,7 @@ RateLimiter* NewGenericRateLimiter( std::unique_ptr limiter( new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness, mode, SystemClock::Default(), auto_tuned)); - Status s = limiter->PrepareOptions(ConfigOptions()); - if (s.ok()) { - return limiter.release(); - } else { - assert(false); - return nullptr; - } -} -namespace { -#ifndef ROCKSDB_LITE -static int RegisterBuiltinRateLimiters(ObjectLibrary& library, - const std::string& /*arg*/) { - library.AddFactory( - GenericRateLimiter::kClassName(), - [](const std::string& /*uri*/, std::unique_ptr* guard, - std::string* /*errmsg*/) { - guard->reset( - new GenericRateLimiter(std::numeric_limits::max())); - return guard->get(); - }); - size_t num_types; - return static_cast(library.GetFactoryCount(&num_types)); -} - -static std::unordered_map - rate_limiter_mode_map = { - {"kReadsOnly", RateLimiter::Mode::kReadsOnly}, - {"kWritesOnly", RateLimiter::Mode::kWritesOnly}, - {"kAllIo", RateLimiter::Mode::kAllIo}, -}; -#endif // ROCKSDB_LITE -static bool LoadRateLimiter(const std::string& name, - std::shared_ptr* limiter) { - auto plen = strlen(GenericRateLimiter::kClassName()); - if (name.size() > plen + 2 && name[plen] == ':' && - StartsWith(name, GenericRateLimiter::kClassName())) { - auto rate = ParseInt64(name.substr(plen + 1)); - limiter->reset(new GenericRateLimiter(rate)); - return true; - } else { - return false; - } -} - -static std::unordered_map rate_limiter_type_info = - { -#ifndef ROCKSDB_LITE - {"mode", - OptionTypeInfo::Enum(0, &rate_limiter_mode_map)}, -#endif // ROCKSDB_LITE -}; -} // namespace - -RateLimiter::RateLimiter(Mode mode) : mode_(mode) { - RegisterOptions("", &mode_, &rate_limiter_type_info); -} - -Status RateLimiter::CreateFromString(const ConfigOptions& config_options, - const std::string& value, - std::shared_ptr* result) { - if (value.empty()) { - result->reset(); - return Status::OK(); - } else { -#ifndef ROCKSDB_LITE - static std::once_flag once; - std::call_once(once, [&]() { - RegisterBuiltinRateLimiters(*(ObjectLibrary::Default().get()), ""); - }); -#endif // ROCKSDB_LITE - return LoadSharedObject(config_options, value, LoadRateLimiter, - result); - } + return limiter.release(); } } // namespace ROCKSDB_NAMESPACE diff --git a/util/rate_limiter.h b/util/rate_limiter.h index 75751d3c5..7f01864c5 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -26,38 +26,13 @@ namespace ROCKSDB_NAMESPACE { class GenericRateLimiter : public RateLimiter { public: - struct GenericRateLimiterOptions { - static const char* kName() { return "GenericRateLimiterOptions"; } - GenericRateLimiterOptions(int64_t _rate_bytes_per_sec, - int64_t _refill_period_us, int32_t _fairness, - const std::shared_ptr& _clock, - bool _auto_tuned) - : max_bytes_per_sec(_rate_bytes_per_sec), - refill_period_us(_refill_period_us), - clock(_clock), - fairness(_fairness > 100 ? 100 : _fairness), - auto_tuned(_auto_tuned) {} - int64_t max_bytes_per_sec; - int64_t refill_period_us; - std::shared_ptr clock; - int32_t fairness; - bool auto_tuned; - }; - - public: - explicit GenericRateLimiter( - int64_t refill_bytes, int64_t refill_period_us = 100 * 1000, - int32_t fairness = 10, - RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly, - const std::shared_ptr& clock = nullptr, - bool auto_tuned = false); + GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us, + int32_t fairness, RateLimiter::Mode mode, + const std::shared_ptr& clock, + bool auto_tuned); virtual ~GenericRateLimiter(); - static const char* kClassName() { return "GenericRateLimiter"; } - const char* Name() const override { return kClassName(); } - Status PrepareOptions(const ConfigOptions& options) override; - // This API allows user to dynamically change rate limiter's bytes per second. virtual void SetBytesPerSecond(int64_t bytes_per_second) override; @@ -120,25 +95,29 @@ class GenericRateLimiter : public RateLimiter { return rate_bytes_per_sec_; } + virtual void TEST_SetClock(std::shared_ptr clock) { + MutexLock g(&request_mutex_); + clock_ = std::move(clock); + next_refill_us_ = NowMicrosMonotonic(); + } + private: - void Initialize(); void RefillBytesAndGrantRequests(); std::vector GeneratePriorityIterationOrder(); int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); Status Tune(); - uint64_t NowMicrosMonotonic() { - return options_.clock->NowNanos() / std::milli::den; - } + uint64_t NowMicrosMonotonic() { return clock_->NowNanos() / std::milli::den; } // This mutex guard all internal states mutable port::Mutex request_mutex_; - GenericRateLimiterOptions options_; + const int64_t refill_period_us_; int64_t rate_bytes_per_sec_; // This variable can be changed dynamically. std::atomic refill_bytes_per_period_; + std::shared_ptr clock_; bool stop_; port::CondVar exit_cv_; @@ -149,13 +128,16 @@ class GenericRateLimiter : public RateLimiter { int64_t available_bytes_; int64_t next_refill_us_; + int32_t fairness_; Random rnd_; struct Req; std::deque queue_[Env::IO_TOTAL]; bool wait_until_refill_pending_; + bool auto_tuned_; int64_t num_drains_; + const int64_t max_bytes_per_sec_; std::chrono::microseconds tuned_time_; }; diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index cd809d183..5691ab26c 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -15,11 +15,8 @@ #include #include "db/db_test_util.h" -#include "options/options_parser.h" #include "port/port.h" -#include "rocksdb/convenience.h" #include "rocksdb/system_clock.h" -#include "rocksdb/utilities/options_type.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" #include "util/random.h" @@ -466,95 +463,6 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec); } -TEST_F(RateLimiterTest, CreateGenericRateLimiterFromString) { - std::shared_ptr limiter; - ConfigOptions config_options; - std::string limiter_id = GenericRateLimiter::kClassName(); - ASSERT_OK(RateLimiter::CreateFromString(config_options, limiter_id + ":1024", - &limiter)); - ASSERT_NE(limiter, nullptr); - ASSERT_EQ(limiter->GetBytesPerSecond(), 1024U); -#ifndef ROCKSDB_LITE - ASSERT_OK(RateLimiter::CreateFromString( - config_options, "rate_bytes_per_sec=2048;id=" + limiter_id, &limiter)); - ASSERT_NE(limiter, nullptr); - ASSERT_EQ(limiter->GetBytesPerSecond(), 2048U); - ASSERT_NOK(RateLimiter::CreateFromString( - config_options, "rate_bytes_per_sec=0;id=" + limiter_id, &limiter)); - ASSERT_NOK(RateLimiter::CreateFromString( - config_options, "rate_bytes_per_sec=2048;fairness=0;id=" + limiter_id, - &limiter)); - - ASSERT_OK( - RateLimiter::CreateFromString(config_options, - "rate_bytes_per_sec=2048;refill_period_us=" - "1024;fairness=42;auto_tuned=true;" - "mode=kReadsOnly;id=" + - limiter_id, - &limiter)); - ASSERT_NE(limiter, nullptr); - auto opts = - limiter->GetOptions(); - ASSERT_NE(opts, nullptr); - ASSERT_EQ(opts->max_bytes_per_sec, 2048); - ASSERT_EQ(opts->refill_period_us, 1024); - ASSERT_EQ(opts->fairness, 42); - ASSERT_EQ(opts->auto_tuned, true); - ASSERT_TRUE(limiter->IsRateLimited(RateLimiter::OpType::kRead)); - ASSERT_FALSE(limiter->IsRateLimited(RateLimiter::OpType::kWrite)); -#endif // ROCKSDB_LITE -} - -#ifndef ROCKSDB_LITE -// This test is for a rate limiter that has no name (Name() returns ""). -// When the default Name() method is deprecated, this test should be removed. -TEST_F(RateLimiterTest, NoNameRateLimiter) { - static std::unordered_map dummy_limiter_options = - { - {"dummy", - {0, OptionType::kInt, OptionVerificationType::kNormal, - OptionTypeFlags::kNone}}, - }; - class NoNameRateLimiter : public RateLimiter { - public: - explicit NoNameRateLimiter(bool do_register) { - if (do_register) { - RegisterOptions("", &dummy, &dummy_limiter_options); - } - } - void SetBytesPerSecond(int64_t /*bytes_per_second*/) override {} - int64_t GetSingleBurstBytes() const override { return 0; } - int64_t GetTotalBytesThrough(const Env::IOPriority /*pri*/) const override { - return 0; - } - int64_t GetTotalRequests(const Env::IOPriority /*pri*/) const override { - return 0; - } - int64_t GetBytesPerSecond() const override { return 0; } - - private: - int dummy; - }; - - ConfigOptions config_options; - DBOptions db_opts, copy; - db_opts.rate_limiter.reset(new NoNameRateLimiter(false)); - ASSERT_EQ(db_opts.rate_limiter->GetId(), ""); - ASSERT_EQ(db_opts.rate_limiter->ToString(config_options), ""); - db_opts.rate_limiter.reset(new NoNameRateLimiter(true)); - ASSERT_EQ(db_opts.rate_limiter->GetId(), ""); - ASSERT_EQ(db_opts.rate_limiter->ToString(config_options), ""); - std::string opt_str; - ASSERT_OK(GetStringFromDBOptions(config_options, db_opts, &opt_str)); - ASSERT_OK( - GetDBOptionsFromString(config_options, DBOptions(), opt_str, ©)); - ASSERT_OK( - RocksDBOptionsParser::VerifyDBOptions(config_options, db_opts, copy)); - ASSERT_EQ(copy.rate_limiter, nullptr); - ASSERT_NE(copy.rate_limiter, db_opts.rate_limiter); -} -#endif // ROCKSDB_LITE - } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/utilities/backup/backup_engine.cc b/utilities/backup/backup_engine.cc index f6f795993..754293e74 100644 --- a/utilities/backup/backup_engine.cc +++ b/utilities/backup/backup_engine.cc @@ -186,54 +186,13 @@ class BackupEngineImpl { const std::shared_ptr& backup_rate_limiter_clock, const std::shared_ptr& restore_rate_limiter_clock) { if (backup_rate_limiter_clock) { - assert(options_.backup_rate_limiter->IsInstanceOf( - GenericRateLimiter::kClassName())); - auto* backup_rate_limiter_options = - options_.backup_rate_limiter - ->GetOptions(); - - assert(backup_rate_limiter_options); - RateLimiter::Mode backup_rate_limiter_mode; - if (!options_.backup_rate_limiter->IsRateLimited( - RateLimiter::OpType::kRead)) { - backup_rate_limiter_mode = RateLimiter::Mode::kWritesOnly; - } else if (!options_.backup_rate_limiter->IsRateLimited( - RateLimiter::OpType::kWrite)) { - backup_rate_limiter_mode = RateLimiter::Mode::kReadsOnly; - } else { - backup_rate_limiter_mode = RateLimiter::Mode::kAllIo; - } - options_.backup_rate_limiter.reset(new GenericRateLimiter( - backup_rate_limiter_options->max_bytes_per_sec, - backup_rate_limiter_options->refill_period_us, - backup_rate_limiter_options->fairness, backup_rate_limiter_mode, - backup_rate_limiter_clock, backup_rate_limiter_options->auto_tuned)); + static_cast(options_.backup_rate_limiter.get()) + ->TEST_SetClock(backup_rate_limiter_clock); } if (restore_rate_limiter_clock) { - assert(options_.restore_rate_limiter->IsInstanceOf( - GenericRateLimiter::kClassName())); - auto* restore_rate_limiter_options = - options_.restore_rate_limiter - ->GetOptions(); - assert(restore_rate_limiter_options); - - RateLimiter::Mode restore_rate_limiter_mode; - if (!options_.restore_rate_limiter->IsRateLimited( - RateLimiter::OpType::kRead)) { - restore_rate_limiter_mode = RateLimiter::Mode::kWritesOnly; - } else if (!options_.restore_rate_limiter->IsRateLimited( - RateLimiter::OpType::kWrite)) { - restore_rate_limiter_mode = RateLimiter::Mode::kReadsOnly; - } else { - restore_rate_limiter_mode = RateLimiter::Mode::kAllIo; - } - options_.restore_rate_limiter.reset(new GenericRateLimiter( - restore_rate_limiter_options->max_bytes_per_sec, - restore_rate_limiter_options->refill_period_us, - restore_rate_limiter_options->fairness, restore_rate_limiter_mode, - restore_rate_limiter_clock, - restore_rate_limiter_options->auto_tuned)); + static_cast(options_.restore_rate_limiter.get()) + ->TEST_SetClock(restore_rate_limiter_clock); } }