Fix race conditions in GenericRateLimiter (#10374)

Summary:
Made locking strict for all accesses of `GenericRateLimiter` internal state.

`SetBytesPerSecond()` was the main problem since it had no locking, while the two updates it makes need to be done as one atomic operation.

The test case, "ConfigOptionsTest.ConfiguringOptionsDoesNotRevertRateLimiterBandwidth", is for the issue fixed in https://github.com/facebook/rocksdb/issues/10378, but I forgot to include the test there.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10374

Reviewed By: pdillinger

Differential Revision: D37906367

Pulled By: ajkr

fbshipit-source-id: ccde620d2a7f96d1401bdafd2bdb685cbefbafa5
main
Andrew Kryczka 2 years ago committed by Facebook GitHub Bot
parent 0b6bc101ba
commit e576f2ab19
  1. 1
      HISTORY.md
  2. 16
      options/options_test.cc
  3. 48
      util/rate_limiter.cc
  4. 22
      util/rate_limiter.h
  5. 17
      util/rate_limiter_test.cc

@ -9,6 +9,7 @@
### Bug Fixes ### Bug Fixes
* Fix a bug where `GenericRateLimiter` could revert the bandwidth set dynamically using `SetBytesPerSecond()` when a user configures a structure enclosing it, e.g., using `GetOptionsFromString()` to configure an `Options` that references an existing `RateLimiter` object. * Fix a bug where `GenericRateLimiter` could revert the bandwidth set dynamically using `SetBytesPerSecond()` when a user configures a structure enclosing it, e.g., using `GetOptionsFromString()` to configure an `Options` that references an existing `RateLimiter` object.
* Fix race conditions in `GenericRateLimiter`.
## 7.5.0 (07/15/2022) ## 7.5.0 (07/15/2022)
### New Features ### New Features

@ -4958,6 +4958,22 @@ TEST_F(ConfigOptionsTest, MergeOperatorFromString) {
ASSERT_EQ(*delimiter, "&&"); ASSERT_EQ(*delimiter, "&&");
} }
TEST_F(ConfigOptionsTest, ConfiguringOptionsDoesNotRevertRateLimiterBandwidth) {
// Regression test for bug where rate limiter's dynamically set bandwidth
// could be silently reverted when configuring an options structure with an
// existing `rate_limiter`.
Options base_options;
base_options.rate_limiter.reset(
NewGenericRateLimiter(1 << 20 /* rate_bytes_per_sec */));
Options copy_options(base_options);
base_options.rate_limiter->SetBytesPerSecond(2 << 20);
ASSERT_EQ(2 << 20, base_options.rate_limiter->GetBytesPerSecond());
ASSERT_OK(GetOptionsFromString(base_options, "", &copy_options));
ASSERT_EQ(2 << 20, base_options.rate_limiter->GetBytesPerSecond());
}
INSTANTIATE_TEST_CASE_P(OptionsSanityCheckTest, OptionsSanityCheckTest, INSTANTIATE_TEST_CASE_P(OptionsSanityCheckTest, OptionsSanityCheckTest,
::testing::Bool()); ::testing::Bool());
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE

@ -54,20 +54,20 @@ GenericRateLimiter::GenericRateLimiter(
rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2 rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
: rate_bytes_per_sec), : rate_bytes_per_sec),
refill_bytes_per_period_( refill_bytes_per_period_(
CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)), CalculateRefillBytesPerPeriodLocked(rate_bytes_per_sec_)),
clock_(clock), clock_(clock),
stop_(false), stop_(false),
exit_cv_(&request_mutex_), exit_cv_(&request_mutex_),
requests_to_wait_(0), requests_to_wait_(0),
available_bytes_(0), available_bytes_(0),
next_refill_us_(NowMicrosMonotonic()), next_refill_us_(NowMicrosMonotonicLocked()),
fairness_(fairness > 100 ? 100 : fairness), fairness_(fairness > 100 ? 100 : fairness),
rnd_((uint32_t)time(nullptr)), rnd_((uint32_t)time(nullptr)),
wait_until_refill_pending_(false), wait_until_refill_pending_(false),
auto_tuned_(auto_tuned), auto_tuned_(auto_tuned),
num_drains_(0), num_drains_(0),
max_bytes_per_sec_(rate_bytes_per_sec), max_bytes_per_sec_(rate_bytes_per_sec),
tuned_time_(NowMicrosMonotonic()) { tuned_time_(NowMicrosMonotonicLocked()) {
for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
total_requests_[i] = 0; total_requests_[i] = 0;
total_bytes_through_[i] = 0; total_bytes_through_[i] = 0;
@ -97,10 +97,15 @@ GenericRateLimiter::~GenericRateLimiter() {
// This API allows user to dynamically change rate limiter's bytes per second. // This API allows user to dynamically change rate limiter's bytes per second.
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
MutexLock g(&request_mutex_);
SetBytesPerSecondLocked(bytes_per_second);
}
void GenericRateLimiter::SetBytesPerSecondLocked(int64_t bytes_per_second) {
assert(bytes_per_second > 0); assert(bytes_per_second > 0);
rate_bytes_per_sec_ = bytes_per_second; rate_bytes_per_sec_.store(bytes_per_second, std::memory_order_relaxed);
refill_bytes_per_period_.store( refill_bytes_per_period_.store(
CalculateRefillBytesPerPeriod(bytes_per_second), CalculateRefillBytesPerPeriodLocked(bytes_per_second),
std::memory_order_relaxed); std::memory_order_relaxed);
} }
@ -115,10 +120,10 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
if (auto_tuned_) { if (auto_tuned_) {
static const int kRefillsPerTune = 100; static const int kRefillsPerTune = 100;
std::chrono::microseconds now(NowMicrosMonotonic()); std::chrono::microseconds now(NowMicrosMonotonicLocked());
if (now - tuned_time_ >= if (now - tuned_time_ >=
kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) { kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
Status s = Tune(); Status s = TuneLocked();
s.PermitUncheckedError(); //**TODO: What to do on error? s.PermitUncheckedError(); //**TODO: What to do on error?
} }
} }
@ -152,7 +157,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
// (1) Waiting for the next refill time. // (1) Waiting for the next refill time.
// (2) Refilling the bytes and granting requests. // (2) Refilling the bytes and granting requests.
do { do {
int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonic(); int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonicLocked();
if (time_until_refill_us > 0) { if (time_until_refill_us > 0) {
if (wait_until_refill_pending_) { if (wait_until_refill_pending_) {
// Somebody is performing (1). Trust we'll be woken up when our request // Somebody is performing (1). Trust we'll be woken up when our request
@ -173,7 +178,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
} else { } else {
// Whichever thread reaches here first performs duty (2) as described // Whichever thread reaches here first performs duty (2) as described
// above. // above.
RefillBytesAndGrantRequests(); RefillBytesAndGrantRequestsLocked();
if (r.granted) { if (r.granted) {
// If there is any remaining requests, make sure there exists at least // If there is any remaining requests, make sure there exists at least
// one candidate is awake for future duties by signaling a front request // one candidate is awake for future duties by signaling a front request
@ -215,7 +220,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
} }
std::vector<Env::IOPriority> std::vector<Env::IOPriority>
GenericRateLimiter::GeneratePriorityIterationOrder() { GenericRateLimiter::GeneratePriorityIterationOrderLocked() {
std::vector<Env::IOPriority> pri_iteration_order(Env::IO_TOTAL /* 4 */); std::vector<Env::IOPriority> pri_iteration_order(Env::IO_TOTAL /* 4 */);
// We make Env::IO_USER a superior priority by always iterating its queue // We make Env::IO_USER a superior priority by always iterating its queue
// first // first
@ -223,12 +228,12 @@ GenericRateLimiter::GeneratePriorityIterationOrder() {
bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_); bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_);
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PostRandomOneInFairnessForHighPri", "PostRandomOneInFairnessForHighPri",
&high_pri_iterated_after_mid_low_pri); &high_pri_iterated_after_mid_low_pri);
bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_); bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_);
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PostRandomOneInFairnessForMidPri", "PostRandomOneInFairnessForMidPri",
&mid_pri_itereated_after_low_pri); &mid_pri_itereated_after_low_pri);
@ -247,15 +252,16 @@ GenericRateLimiter::GeneratePriorityIterationOrder() {
} }
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PreReturnPriIterationOrder", "PreReturnPriIterationOrder",
&pri_iteration_order); &pri_iteration_order);
return pri_iteration_order; return pri_iteration_order;
} }
void GenericRateLimiter::RefillBytesAndGrantRequests() { void GenericRateLimiter::RefillBytesAndGrantRequestsLocked() {
TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests"); TEST_SYNC_POINT_CALLBACK(
next_refill_us_ = NowMicrosMonotonic() + refill_period_us_; "GenericRateLimiter::RefillBytesAndGrantRequestsLocked", &request_mutex_);
next_refill_us_ = NowMicrosMonotonicLocked() + refill_period_us_;
// Carry over the left over quota from the last period // Carry over the left over quota from the last period
auto refill_bytes_per_period = auto refill_bytes_per_period =
refill_bytes_per_period_.load(std::memory_order_relaxed); refill_bytes_per_period_.load(std::memory_order_relaxed);
@ -264,7 +270,7 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() {
} }
std::vector<Env::IOPriority> pri_iteration_order = std::vector<Env::IOPriority> pri_iteration_order =
GeneratePriorityIterationOrder(); GeneratePriorityIterationOrderLocked();
for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
assert(!pri_iteration_order.empty()); assert(!pri_iteration_order.empty());
@ -293,7 +299,7 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() {
} }
} }
int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( int64_t GenericRateLimiter::CalculateRefillBytesPerPeriodLocked(
int64_t rate_bytes_per_sec) { int64_t rate_bytes_per_sec) {
if (std::numeric_limits<int64_t>::max() / rate_bytes_per_sec < if (std::numeric_limits<int64_t>::max() / rate_bytes_per_sec <
refill_period_us_) { refill_period_us_) {
@ -305,7 +311,7 @@ int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
} }
} }
Status GenericRateLimiter::Tune() { Status GenericRateLimiter::TuneLocked() {
const int kLowWatermarkPct = 50; const int kLowWatermarkPct = 50;
const int kHighWatermarkPct = 90; const int kHighWatermarkPct = 90;
const int kAdjustFactorPct = 5; const int kAdjustFactorPct = 5;
@ -314,7 +320,7 @@ Status GenericRateLimiter::Tune() {
const int kAllowedRangeFactor = 20; const int kAllowedRangeFactor = 20;
std::chrono::microseconds prev_tuned_time = tuned_time_; std::chrono::microseconds prev_tuned_time = tuned_time_;
tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic()); tuned_time_ = std::chrono::microseconds(NowMicrosMonotonicLocked());
int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time + int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
std::chrono::microseconds(refill_period_us_) - std::chrono::microseconds(refill_period_us_) -
@ -349,7 +355,7 @@ Status GenericRateLimiter::Tune() {
new_bytes_per_sec = prev_bytes_per_sec; new_bytes_per_sec = prev_bytes_per_sec;
} }
if (new_bytes_per_sec != prev_bytes_per_sec) { if (new_bytes_per_sec != prev_bytes_per_sec) {
SetBytesPerSecond(new_bytes_per_sec); SetBytesPerSecondLocked(new_bytes_per_sec);
} }
num_drains_ = 0; num_drains_ = 0;
return Status::OK(); return Status::OK();

@ -92,30 +92,32 @@ class GenericRateLimiter : public RateLimiter {
} }
virtual int64_t GetBytesPerSecond() const override { virtual int64_t GetBytesPerSecond() const override {
return rate_bytes_per_sec_; return rate_bytes_per_sec_.load(std::memory_order_relaxed);
} }
virtual void TEST_SetClock(std::shared_ptr<SystemClock> clock) { virtual void TEST_SetClock(std::shared_ptr<SystemClock> clock) {
MutexLock g(&request_mutex_); MutexLock g(&request_mutex_);
clock_ = std::move(clock); clock_ = std::move(clock);
next_refill_us_ = NowMicrosMonotonic(); next_refill_us_ = NowMicrosMonotonicLocked();
} }
private: private:
void RefillBytesAndGrantRequests(); void RefillBytesAndGrantRequestsLocked();
std::vector<Env::IOPriority> GeneratePriorityIterationOrder(); std::vector<Env::IOPriority> GeneratePriorityIterationOrderLocked();
int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); int64_t CalculateRefillBytesPerPeriodLocked(int64_t rate_bytes_per_sec);
Status Tune(); Status TuneLocked();
void SetBytesPerSecondLocked(int64_t bytes_per_second);
uint64_t NowMicrosMonotonic() { return clock_->NowNanos() / std::milli::den; }
uint64_t NowMicrosMonotonicLocked() {
return clock_->NowNanos() / std::milli::den;
}
// This mutex guard all internal states // This mutex guard all internal states
mutable port::Mutex request_mutex_; mutable port::Mutex request_mutex_;
const int64_t refill_period_us_; const int64_t refill_period_us_;
int64_t rate_bytes_per_sec_; std::atomic<int64_t> rate_bytes_per_sec_;
// This variable can be changed dynamically.
std::atomic<int64_t> refill_bytes_per_period_; std::atomic<int64_t> refill_bytes_per_period_;
std::shared_ptr<SystemClock> clock_; std::shared_ptr<SystemClock> clock_;

@ -202,7 +202,7 @@ TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) {
bool mid_pri_itereated_after_low_pri_set = false; bool mid_pri_itereated_after_low_pri_set = false;
bool pri_iteration_order_verified = false; bool pri_iteration_order_verified = false;
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PostRandomOneInFairnessForHighPri", "PostRandomOneInFairnessForHighPri",
[&](void* arg) { [&](void* arg) {
bool* high_pri_iterated_after_mid_low_pri = (bool*)arg; bool* high_pri_iterated_after_mid_low_pri = (bool*)arg;
@ -212,7 +212,7 @@ TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) {
}); });
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PostRandomOneInFairnessForMidPri", "PostRandomOneInFairnessForMidPri",
[&](void* arg) { [&](void* arg) {
bool* mid_pri_itereated_after_low_pri = (bool*)arg; bool* mid_pri_itereated_after_low_pri = (bool*)arg;
@ -222,7 +222,7 @@ TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) {
}); });
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PreReturnPriIterationOrder", "PreReturnPriIterationOrder",
[&](void* arg) { [&](void* arg) {
std::vector<Env::IOPriority>* pri_iteration_order = std::vector<Env::IOPriority>* pri_iteration_order =
@ -249,13 +249,13 @@ TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) {
ASSERT_EQ(pri_iteration_order_verified, true); ASSERT_EQ(pri_iteration_order_verified, true);
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearCallBack( SyncPoint::GetInstance()->ClearCallBack(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PreReturnPriIterationOrder"); "PreReturnPriIterationOrder");
SyncPoint::GetInstance()->ClearCallBack( SyncPoint::GetInstance()->ClearCallBack(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PostRandomOneInFairnessForMidPri"); "PostRandomOneInFairnessForMidPri");
SyncPoint::GetInstance()->ClearCallBack( SyncPoint::GetInstance()->ClearCallBack(
"GenericRateLimiter::GeneratePriorityIterationOrder::" "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PostRandomOneInFairnessForHighPri"); "PostRandomOneInFairnessForHighPri");
} }
} }
@ -387,11 +387,14 @@ TEST_F(RateLimiterTest, LimitChangeTest) {
std::make_shared<GenericRateLimiter>( std::make_shared<GenericRateLimiter>(
target, refill_period, 10, RateLimiter::Mode::kWritesOnly, target, refill_period, 10, RateLimiter::Mode::kWritesOnly,
SystemClock::Default(), false /* auto_tuned */); SystemClock::Default(), false /* auto_tuned */);
// After "GenericRateLimiter::Request:1" the mutex is held until the bytes
// are refilled. This test could be improved to change the limit when lock
// is released in `TimedWait()`.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"GenericRateLimiter::Request", {{"GenericRateLimiter::Request",
"RateLimiterTest::LimitChangeTest:changeLimitStart"}, "RateLimiterTest::LimitChangeTest:changeLimitStart"},
{"RateLimiterTest::LimitChangeTest:changeLimitEnd", {"RateLimiterTest::LimitChangeTest:changeLimitEnd",
"GenericRateLimiter::RefillBytesAndGrantRequests"}}); "GenericRateLimiter::Request:1"}});
Arg arg(target, Env::IO_HIGH, limiter); Arg arg(target, Env::IO_HIGH, limiter);
// The idea behind is to start a request first, then before it refills, // The idea behind is to start a request first, then before it refills,
// update limit to a different value (2X/0.5X). No starvation should // update limit to a different value (2X/0.5X). No starvation should

Loading…
Cancel
Save