diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 6bbcabfae..1534a7b90 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -38,11 +38,10 @@ size_t RateLimiter::RequestToken(size_t bytes, size_t alignment, // Pending request struct GenericRateLimiter::Req { explicit Req(int64_t _bytes, port::Mutex* _mu) - : request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {} + : request_bytes(_bytes), bytes(_bytes), cv(_mu) {} int64_t request_bytes; int64_t bytes; port::CondVar cv; - bool granted; }; GenericRateLimiter::GenericRateLimiter( @@ -137,12 +136,14 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, ++total_requests_[pri]; - if (available_bytes_ >= bytes) { - // Refill thread assigns quota and notifies requests waiting on - // the queue under mutex. So if we get here, that means nobody - // is waiting? - available_bytes_ -= bytes; - total_bytes_through_[pri] += bytes; + if (available_bytes_ > 0) { + int64_t bytes_through = std::min(available_bytes_, bytes); + total_bytes_through_[pri] += bytes_through; + available_bytes_ -= bytes_through; + bytes -= bytes_through; + } + + if (bytes == 0) { return; } @@ -179,7 +180,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, // Whichever thread reaches here first performs duty (2) as described // above. RefillBytesAndGrantRequestsLocked(); - if (r.granted) { + if (r.request_bytes == 0) { // If there is any remaining requests, make sure there exists at least // one candidate is awake for future duties by signaling a front request // of a queue. @@ -202,13 +203,13 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, ++num_found; } } - if (r.granted) { + if (r.request_bytes == 0) { assert(num_found == 0); } else { assert(num_found == 1); } #endif // NDEBUG - } while (!stop_ && !r.granted); + } while (!stop_ && r.request_bytes > 0); if (stop_) { // It is now in the clean-up of ~GenericRateLimiter(). 
@@ -265,9 +266,8 @@ void GenericRateLimiter::RefillBytesAndGrantRequestsLocked() { // Carry over the left over quota from the last period auto refill_bytes_per_period = refill_bytes_per_period_.load(std::memory_order_relaxed); - if (available_bytes_ < refill_bytes_per_period) { - available_bytes_ += refill_bytes_per_period; - } + assert(available_bytes_ == 0); + available_bytes_ = refill_bytes_per_period; std::vector<Env::IOPriority> pri_iteration_order = GeneratePriorityIterationOrderLocked(); @@ -292,7 +292,6 @@ void GenericRateLimiter::RefillBytesAndGrantRequestsLocked() { total_bytes_through_[current_pri] += next_req->bytes; queue->pop_front(); - next_req->granted = true; // Quota granted, signal the thread to exit next_req->cv.Signal(); } diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index cda134867..93361acba 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -416,6 +416,51 @@ TEST_F(RateLimiterTest, LimitChangeTest) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(RateLimiterTest, AvailableByteSizeExhaustTest) { + SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true); + const std::chrono::seconds kTimePerRefill(1); + + // This test makes sure available_bytes_ get exhausted first before queuing + // any remaining bytes when requested_bytes > available_bytes + const int64_t available_bytes_per_period = 500; + + std::shared_ptr<RateLimiter> limiter = std::make_shared<GenericRateLimiter>( + available_bytes_per_period, + std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */, + RateLimiter::Mode::kWritesOnly, special_env.GetSystemClock(), + false /* auto_tuned */); + + // Step 1. 
Request 100 and wait for the refill + // so that the remaining available bytes are 400 + limiter->Request(100, Env::IO_USER, nullptr /* stats */, + RateLimiter::OpType::kWrite); + special_env.SleepForMicroseconds( + static_cast<int>(std::chrono::microseconds(kTimePerRefill).count())); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) { + port::Mutex* request_mutex = (port::Mutex*)arg; + request_mutex->Unlock(); + // Step 3. Check GetTotalBytesThrough = available_bytes_per_period + // to make sure that the first request (100) and the part of the second + // request (400) made through when the remaining of the second request + // got queued + ASSERT_EQ(available_bytes_per_period, + limiter->GetTotalBytesThrough(Env::IO_USER)); + request_mutex->Lock(); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Step 2. Request 500, which is greater than the remaining available bytes + // (400) + limiter->Request(500, Env::IO_USER, nullptr /* stats */, + RateLimiter::OpType::kWrite); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "GenericRateLimiter::Request:PostEnqueueRequest"); +} + TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { const std::chrono::seconds kTimePerRefill(1); const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc