From 7531cbda9177e8d218a287718c4fc2355c6e1597 Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Wed, 10 May 2023 10:18:36 -0700 Subject: [PATCH] Clean up rate limiter refill logic (#11425) Summary: Context: This pull request update is in response to a comment made on https://github.com/facebook/rocksdb/pull/8596#discussion_r680264932. The current implementation of RefillBytesAndGrantRequestsLocked() drains all available_bytes, but the first request after the last wave of requesting/bytes granting is done is not being handled in the same way. This creates a scenario where if a request for a large amount of bytes is enqueued first, but there are not enough available_bytes to fulfill it, the request is put to sleep until the next refill time. Meanwhile, a later request for a smaller number of bytes comes in and is granted immediately. This behavior is not fair as the earlier request was made first. To address this issue, we have made changes to the code to exhaust the remaining available bytes from the request and queue the remaining. With this change, requests are granted in the order they are received, ensuring that earlier requests are not unfairly delayed by later, smaller requests. The specific scenario described above will no longer occur with this change. Also consolidated `granted` and `request_bytes` as part of the change since `granted` is equivalent to `request_bytes == 0` Pull Request resolved: https://github.com/facebook/rocksdb/pull/11425 Test Plan: Added `AvailableByteSizeExhaustTest` Reviewed By: hx235 Differential Revision: D45570711 Pulled By: jaykorean fbshipit-source-id: a7117ed17bf4b8a7ae0f76124cb41902db1a2592 --- util/rate_limiter.cc | 29 ++++++++++++------------- util/rate_limiter_test.cc | 45 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 15 deletions(-) diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 6bbcabfae..1534a7b90 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -38,11 +38,10 @@ size_t RateLimiter::RequestToken(size_t bytes, size_t alignment, // Pending request struct GenericRateLimiter::Req { explicit Req(int64_t _bytes, port::Mutex* _mu) - : request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {} + : request_bytes(_bytes), bytes(_bytes), cv(_mu) {} int64_t request_bytes; int64_t bytes; port::CondVar cv; - bool granted; }; GenericRateLimiter::GenericRateLimiter( @@ -137,12 +136,14 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, ++total_requests_[pri]; - if (available_bytes_ >= bytes) { - // Refill thread assigns quota and notifies requests waiting on - // the queue under mutex. So if we get here, that means nobody - // is waiting? - available_bytes_ -= bytes; - total_bytes_through_[pri] += bytes; + if (available_bytes_ > 0) { + int64_t bytes_through = std::min(available_bytes_, bytes); + total_bytes_through_[pri] += bytes_through; + available_bytes_ -= bytes_through; + bytes -= bytes_through; + } + + if (bytes == 0) { return; } @@ -179,7 +180,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, // Whichever thread reaches here first performs duty (2) as described // above. RefillBytesAndGrantRequestsLocked(); - if (r.granted) { + if (r.request_bytes == 0) { // If there is any remaining requests, make sure there exists at least // one candidate is awake for future duties by signaling a front request // of a queue. @@ -202,13 +203,13 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, ++num_found; } } - if (r.granted) { + if (r.request_bytes == 0) { assert(num_found == 0); } else { assert(num_found == 1); } #endif // NDEBUG - } while (!stop_ && !r.granted); + } while (!stop_ && r.request_bytes > 0); if (stop_) { // It is now in the clean-up of ~GenericRateLimiter(). @@ -265,9 +266,8 @@ void GenericRateLimiter::RefillBytesAndGrantRequestsLocked() { // Carry over the left over quota from the last period auto refill_bytes_per_period = refill_bytes_per_period_.load(std::memory_order_relaxed); - if (available_bytes_ < refill_bytes_per_period) { - available_bytes_ += refill_bytes_per_period; - } + assert(available_bytes_ == 0); + available_bytes_ = refill_bytes_per_period; std::vector pri_iteration_order = GeneratePriorityIterationOrderLocked(); @@ -292,7 +292,6 @@ void GenericRateLimiter::RefillBytesAndGrantRequestsLocked() { total_bytes_through_[current_pri] += next_req->bytes; queue->pop_front(); - next_req->granted = true; // Quota granted, signal the thread to exit next_req->cv.Signal(); } diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index cda134867..93361acba 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -416,6 +416,51 @@ TEST_F(RateLimiterTest, LimitChangeTest) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(RateLimiterTest, AvailableByteSizeExhaustTest) { + SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true); + const std::chrono::seconds kTimePerRefill(1); + + // This test makes sure available_bytes_ get exhausted first before queuing + // any remaining bytes when requested_bytes > available_bytes + const int64_t available_bytes_per_period = 500; + + std::shared_ptr limiter = std::make_shared( + available_bytes_per_period, + std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */, + RateLimiter::Mode::kWritesOnly, special_env.GetSystemClock(), + false /* auto_tuned */); + + // Step 1. Request 100 and wait for the refill + // so that the remaining available bytes are 400 + limiter->Request(100, Env::IO_USER, nullptr /* stats */, + RateLimiter::OpType::kWrite); + special_env.SleepForMicroseconds( + static_cast(std::chrono::microseconds(kTimePerRefill).count())); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) { + port::Mutex* request_mutex = (port::Mutex*)arg; + request_mutex->Unlock(); + // Step 3. Check GetTotalBytesThrough = available_bytes_per_period + // to make sure that the first request (100) and the part of the second + // request (400) made through when the remaining of the second request + // got queued + ASSERT_EQ(available_bytes_per_period, + limiter->GetTotalBytesThrough(Env::IO_USER)); + request_mutex->Lock(); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Step 2. Request 500, which is greater than the remaining available bytes + // (400) + limiter->Request(500, Env::IO_USER, nullptr /* stats */, + RateLimiter::OpType::kWrite); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "GenericRateLimiter::Request:PostEnqueueRequest"); +} + TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { const std::chrono::seconds kTimePerRefill(1); const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc