From e576f2ab1920b05892d6ae330f982963e02b6a14 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Tue, 19 Jul 2022 09:31:14 -0700 Subject: [PATCH] Fix race conditions in GenericRateLimiter (#10374) Summary: Made locking strict for all accesses of `GenericRateLimiter` internal state. `SetBytesPerSecond()` was the main problem since it had no locking, while the two updates it makes need to be done as one atomic operation. The test case, "ConfigOptionsTest.ConfiguringOptionsDoesNotRevertRateLimiterBandwidth", is for the issue fixed in https://github.com/facebook/rocksdb/issues/10378, but I forgot to include the test there. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10374 Reviewed By: pdillinger Differential Revision: D37906367 Pulled By: ajkr fbshipit-source-id: ccde620d2a7f96d1401bdafd2bdb685cbefbafa5 --- HISTORY.md | 1 + options/options_test.cc | 16 +++++++++++++ util/rate_limiter.cc | 48 ++++++++++++++++++++++----------------- util/rate_limiter.h | 22 ++++++++++-------- util/rate_limiter_test.cc | 17 ++++++++------ 5 files changed, 66 insertions(+), 38 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index e6d01636e..5b8c42207 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,7 @@ ### Bug Fixes * Fix a bug where `GenericRateLimiter` could revert the bandwidth set dynamically using `SetBytesPerSecond()` when a user configures a structure enclosing it, e.g., using `GetOptionsFromString()` to configure an `Options` that references an existing `RateLimiter` object. +* Fix race conditions in `GenericRateLimiter`. 
## 7.5.0 (07/15/2022) ### New Features diff --git a/options/options_test.cc b/options/options_test.cc index fbd078ba6..0124175bc 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -4958,6 +4958,22 @@ TEST_F(ConfigOptionsTest, MergeOperatorFromString) { ASSERT_EQ(*delimiter, "&&"); } +TEST_F(ConfigOptionsTest, ConfiguringOptionsDoesNotRevertRateLimiterBandwidth) { + // Regression test for bug where rate limiter's dynamically set bandwidth + // could be silently reverted when configuring an options structure with an + // existing `rate_limiter`. + Options base_options; + base_options.rate_limiter.reset( + NewGenericRateLimiter(1 << 20 /* rate_bytes_per_sec */)); + Options copy_options(base_options); + + base_options.rate_limiter->SetBytesPerSecond(2 << 20); + ASSERT_EQ(2 << 20, base_options.rate_limiter->GetBytesPerSecond()); + + ASSERT_OK(GetOptionsFromString(base_options, "", &copy_options)); + ASSERT_EQ(2 << 20, base_options.rate_limiter->GetBytesPerSecond()); +} + INSTANTIATE_TEST_CASE_P(OptionsSanityCheckTest, OptionsSanityCheckTest, ::testing::Bool()); #endif // !ROCKSDB_LITE diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 3e3fe1787..6bbcabfae 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -54,20 +54,20 @@ GenericRateLimiter::GenericRateLimiter( rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2 : rate_bytes_per_sec), refill_bytes_per_period_( - CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)), + CalculateRefillBytesPerPeriodLocked(rate_bytes_per_sec_)), clock_(clock), stop_(false), exit_cv_(&request_mutex_), requests_to_wait_(0), available_bytes_(0), - next_refill_us_(NowMicrosMonotonic()), + next_refill_us_(NowMicrosMonotonicLocked()), fairness_(fairness > 100 ?
100 : fairness), rnd_((uint32_t)time(nullptr)), wait_until_refill_pending_(false), auto_tuned_(auto_tuned), num_drains_(0), max_bytes_per_sec_(rate_bytes_per_sec), - tuned_time_(NowMicrosMonotonic()) { + tuned_time_(NowMicrosMonotonicLocked()) { for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { total_requests_[i] = 0; total_bytes_through_[i] = 0; @@ -97,10 +97,15 @@ GenericRateLimiter::~GenericRateLimiter() { // This API allows user to dynamically change rate limiter's bytes per second. void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { + MutexLock g(&request_mutex_); + SetBytesPerSecondLocked(bytes_per_second); +} + +void GenericRateLimiter::SetBytesPerSecondLocked(int64_t bytes_per_second) { assert(bytes_per_second > 0); - rate_bytes_per_sec_ = bytes_per_second; + rate_bytes_per_sec_.store(bytes_per_second, std::memory_order_relaxed); refill_bytes_per_period_.store( - CalculateRefillBytesPerPeriod(bytes_per_second), + CalculateRefillBytesPerPeriodLocked(bytes_per_second), std::memory_order_relaxed); } @@ -115,10 +120,10 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, if (auto_tuned_) { static const int kRefillsPerTune = 100; - std::chrono::microseconds now(NowMicrosMonotonic()); + std::chrono::microseconds now(NowMicrosMonotonicLocked()); if (now - tuned_time_ >= kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) { - Status s = Tune(); + Status s = TuneLocked(); s.PermitUncheckedError(); //**TODO: What to do on error? } } @@ -152,7 +157,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, // (1) Waiting for the next refill time. // (2) Refilling the bytes and granting requests. do { - int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonic(); + int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonicLocked(); if (time_until_refill_us > 0) { if (wait_until_refill_pending_) { // Somebody is performing (1). 
Trust we'll be woken up when our request @@ -173,7 +178,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, } else { // Whichever thread reaches here first performs duty (2) as described // above. - RefillBytesAndGrantRequests(); + RefillBytesAndGrantRequestsLocked(); if (r.granted) { // If there is any remaining requests, make sure there exists at least // one candidate is awake for future duties by signaling a front request @@ -215,7 +220,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, } std::vector<Env::IOPriority> -GenericRateLimiter::GeneratePriorityIterationOrder() { +GenericRateLimiter::GeneratePriorityIterationOrderLocked() { std::vector<Env::IOPriority> pri_iteration_order(Env::IO_TOTAL /* 4 */); // We make Env::IO_USER a superior priority by always iterating its queue // first @@ -223,12 +228,12 @@ bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_); TEST_SYNC_POINT_CALLBACK( - "GenericRateLimiter::GeneratePriorityIterationOrder::" + "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PostRandomOneInFairnessForHighPri", &high_pri_iterated_after_mid_low_pri); bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_); TEST_SYNC_POINT_CALLBACK( - "GenericRateLimiter::GeneratePriorityIterationOrder::" + "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PostRandomOneInFairnessForMidPri", &mid_pri_itereated_after_low_pri); @@ -247,15 +252,16 @@ } TEST_SYNC_POINT_CALLBACK( - "GenericRateLimiter::GeneratePriorityIterationOrder::" + "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PreReturnPriIterationOrder", &pri_iteration_order); return pri_iteration_order; } -void GenericRateLimiter::RefillBytesAndGrantRequests() { - TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests"); - next_refill_us_ = NowMicrosMonotonic() + refill_period_us_; +void
GenericRateLimiter::RefillBytesAndGrantRequestsLocked() { + TEST_SYNC_POINT_CALLBACK( + "GenericRateLimiter::RefillBytesAndGrantRequestsLocked", &request_mutex_); + next_refill_us_ = NowMicrosMonotonicLocked() + refill_period_us_; // Carry over the left over quota from the last period auto refill_bytes_per_period = refill_bytes_per_period_.load(std::memory_order_relaxed); @@ -264,7 +270,7 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() { } std::vector<Env::IOPriority> pri_iteration_order = - GeneratePriorityIterationOrder(); + GeneratePriorityIterationOrderLocked(); for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { assert(!pri_iteration_order.empty()); @@ -293,7 +299,7 @@ } } -int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( +int64_t GenericRateLimiter::CalculateRefillBytesPerPeriodLocked( int64_t rate_bytes_per_sec) { if (std::numeric_limits<int64_t>::max() / rate_bytes_per_sec < refill_period_us_) { @@ -305,7 +311,7 @@ } } -Status GenericRateLimiter::Tune() { +Status GenericRateLimiter::TuneLocked() { const int kLowWatermarkPct = 50; const int kHighWatermarkPct = 90; const int kAdjustFactorPct = 5; @@ -314,7 +320,7 @@ const int kAllowedRangeFactor = 20; std::chrono::microseconds prev_tuned_time = tuned_time_; - tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic()); + tuned_time_ = std::chrono::microseconds(NowMicrosMonotonicLocked()); int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time + std::chrono::microseconds(refill_period_us_) - @@ -349,7 +355,7 @@ new_bytes_per_sec = prev_bytes_per_sec; } if (new_bytes_per_sec != prev_bytes_per_sec) { - SetBytesPerSecond(new_bytes_per_sec); + SetBytesPerSecondLocked(new_bytes_per_sec); } num_drains_ = 0; return Status::OK(); diff --git a/util/rate_limiter.h b/util/rate_limiter.h index 7f01864c5..4c078f5a0 100644 ---
a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -92,30 +92,32 @@ class GenericRateLimiter : public RateLimiter { } virtual int64_t GetBytesPerSecond() const override { - return rate_bytes_per_sec_; + return rate_bytes_per_sec_.load(std::memory_order_relaxed); } virtual void TEST_SetClock(std::shared_ptr<SystemClock> clock) { MutexLock g(&request_mutex_); clock_ = std::move(clock); - next_refill_us_ = NowMicrosMonotonic(); + next_refill_us_ = NowMicrosMonotonicLocked(); } private: - void RefillBytesAndGrantRequests(); - std::vector<Env::IOPriority> GeneratePriorityIterationOrder(); - int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); - Status Tune(); - - uint64_t NowMicrosMonotonic() { return clock_->NowNanos() / std::milli::den; } + void RefillBytesAndGrantRequestsLocked(); + std::vector<Env::IOPriority> GeneratePriorityIterationOrderLocked(); + int64_t CalculateRefillBytesPerPeriodLocked(int64_t rate_bytes_per_sec); + Status TuneLocked(); + void SetBytesPerSecondLocked(int64_t bytes_per_second); + + uint64_t NowMicrosMonotonicLocked() { + return clock_->NowNanos() / std::milli::den; + } // This mutex guard all internal states mutable port::Mutex request_mutex_; const int64_t refill_period_us_; - int64_t rate_bytes_per_sec_; - // This variable can be changed dynamically.
+ std::atomic<int64_t> rate_bytes_per_sec_; std::atomic<int64_t> refill_bytes_per_period_; std::shared_ptr<SystemClock> clock_; diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index 5691ab26c..0721c1b69 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -202,7 +202,7 @@ TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) { bool mid_pri_itereated_after_low_pri_set = false; bool pri_iteration_order_verified = false; SyncPoint::GetInstance()->SetCallBack( - "GenericRateLimiter::GeneratePriorityIterationOrder::" + "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PostRandomOneInFairnessForHighPri", [&](void* arg) { bool* high_pri_iterated_after_mid_low_pri = (bool*)arg; @@ -212,7 +212,7 @@ }); SyncPoint::GetInstance()->SetCallBack( - "GenericRateLimiter::GeneratePriorityIterationOrder::" + "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PostRandomOneInFairnessForMidPri", [&](void* arg) { bool* mid_pri_itereated_after_low_pri = (bool*)arg; @@ -222,7 +222,7 @@ }); SyncPoint::GetInstance()->SetCallBack( - "GenericRateLimiter::GeneratePriorityIterationOrder::" + "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PreReturnPriIterationOrder", [&](void* arg) { std::vector<Env::IOPriority>* pri_iteration_order = @@ -249,13 +249,13 @@ ASSERT_EQ(pri_iteration_order_verified, true); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearCallBack( - "GenericRateLimiter::GeneratePriorityIterationOrder::" + "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PreReturnPriIterationOrder"); SyncPoint::GetInstance()->ClearCallBack( - "GenericRateLimiter::GeneratePriorityIterationOrder::" + "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PostRandomOneInFairnessForMidPri"); SyncPoint::GetInstance()->ClearCallBack( -
"GenericRateLimiter::GeneratePriorityIterationOrder::" + "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PostRandomOneInFairnessForHighPri"); } } @@ -387,11 +387,14 @@ TEST_F(RateLimiterTest, LimitChangeTest) { std::make_shared<GenericRateLimiter>( target, refill_period, 10, RateLimiter::Mode::kWritesOnly, SystemClock::Default(), false /* auto_tuned */); + // After "GenericRateLimiter::Request:1" the mutex is held until the bytes + // are refilled. This test could be improved to change the limit when lock + // is released in `TimedWait()`. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( {{"GenericRateLimiter::Request", "RateLimiterTest::LimitChangeTest:changeLimitStart"}, {"RateLimiterTest::LimitChangeTest:changeLimitEnd", - "GenericRateLimiter::RefillBytesAndGrantRequests"}}); + "GenericRateLimiter::Request:1"}}); Arg arg(target, Env::IO_HIGH, limiter); // The idea behind is to start a request first, then before it refills, // update limit to a different value (2X/0.5X). No starvation should