@ -118,6 +118,9 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
}
if ( stop_ ) {
// It is now in the clean-up of ~GenericRateLimiter().
// Therefore any new incoming request will exit from here
// and not get satiesfied.
return ;
}
@ -138,77 +141,125 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
do {
bool timedout = false ;
// Leader election, candidates can be:
// (1) a new incoming request,
// (2) a previous leader, whose quota has not been not assigned yet due
// to lower priority
// (3) a previous waiter at the front of queue, who got notified by
// previous leader
// Leader election:
// Leader request's duty:
// (1) Waiting for the next refill time;
// (2) Refilling the bytes and granting requests.
//
// If the following three conditions are all true for a request,
// then the request is selected as a leader:
// (1) The request thread acquired the request_mutex_ and is running;
// (2) There is currently no leader;
// (3) The request sits at the front of a queue.
//
// If not selected as a leader, the request thread will wait
// for one of the following signals to wake up and
// compete for the request_mutex_:
// (1) Signal from the previous leader to exit since its requested bytes
// are fully granted;
// (2) Signal from the previous leader to particpate in next-round
// leader election;
// (3) Signal from rate limiter's destructor as part of the clean-up.
//
// Therefore, a leader request can only be one of the following types:
// (1) a new incoming request placed at the front of a queue;
// (2) a previous leader request whose quota has not been not fully
// granted yet due to its lower priority, hence still at
// the front of a queue;
// (3) a waiting request at the front of a queue, which got
// signaled by the previous leader to participate in leader election.
if ( leader_ = = nullptr & &
( ( ! queue_ [ Env : : IO_HIGH ] . empty ( ) & &
& r = = queue_ [ Env : : IO_HIGH ] . front ( ) ) | |
( ! queue_ [ Env : : IO_LOW ] . empty ( ) & &
& r = = queue_ [ Env : : IO_LOW ] . front ( ) ) ) ) {
leader_ = & r ;
int64_t delta = next_refill_us_ - NowMicrosMonotonic ( ) ;
delta = delta > 0 ? delta : 0 ;
if ( delta = = 0 ) {
timedout = true ;
} else {
// The leader request thread waits till next_refill_us_
int64_t wait_until = clock_ - > NowMicros ( ) + delta ;
RecordTick ( stats , NUMBER_RATE_LIMITER_DRAINS ) ;
+ + num_drains_ ;
timedout = r . cv . TimedWait ( wait_until ) ;
}
} else {
// Not at the front of queue or an leader has already been elected
r . cv . Wait ( ) ;
}
// request_mutex_ is held from now on
if ( stop_ ) {
// It is now in the clean-up of ~GenericRateLimiter().
// Therefore any woken-up request will exit here,
// might or might not has been satiesfied.
- - requests_to_wait_ ;
exit_cv_ . Signal ( ) ;
return ;
}
// Make sure the waken up request is always the header of its queue
// Assertion: request thread running through this point is one of the
// following in terms of the request type and quota granting situation:
// (1) a leader request that is not fully granted with quota and about
// to carry out its leader's work;
// (2) a non-leader request that got fully granted with quota and is
// running to exit;
// (3) a non-leader request that is not fully granted with quota and
// is running to particpate in next-round leader election.
assert ( ( & r = = leader_ & & ! r . granted ) | | ( & r ! = leader_ & & r . granted ) | |
( & r ! = leader_ & & ! r . granted ) ) ;
// Assertion: request thread running through this point is one of the
// following in terms of its position in queue:
// (1) a request got popped off the queue because it is fully granted
// with bytes;
// (2) a request sits at the front of its queue.
assert ( r . granted | |
( ! queue_ [ Env : : IO_HIGH ] . empty ( ) & &
& r = = queue_ [ Env : : IO_HIGH ] . front ( ) ) | |
( ! queue_ [ Env : : IO_LOW ] . empty ( ) & &
& r = = queue_ [ Env : : IO_LOW ] . front ( ) ) ) ;
assert ( leader_ = = nullptr | |
( ! queue_ [ Env : : IO_HIGH ] . empty ( ) & &
leader_ = = queue_ [ Env : : IO_HIGH ] . front ( ) ) | |
( ! queue_ [ Env : : IO_LOW ] . empty ( ) & &
leader_ = = queue_ [ Env : : IO_LOW ] . front ( ) ) ) ;
if ( leader_ = = & r ) {
// Waken up from TimedWait()
// The leader request thread is now running.
// It might or might not has been TimedWait().
if ( timedout ) {
// Time to do refill!
Refill ( ) ;
// Time for the leader to do refill and grant bytes to requests
RefillBytesAndGrantRequests ( ) ;
// Re-elect a new leader regardless. This is to simplify the
// election handling.
// The leader request retires after refilling and granting bytes
// regardless. This is to simplify the election handling.
leader_ = nullptr ;
// Notify the header of queue if current leader is going away
if ( r . granted ) {
// Current leader already got granted with quota. Notify header
// of waiting queue to participate next round of election.
// The leader request (that was just retired)
// already got fully granted with quota and will soon exit
// Assertion: the fully granted leader request is popped off its queue
assert ( ( queue_ [ Env : : IO_HIGH ] . empty ( ) | |
& r ! = queue_ [ Env : : IO_HIGH ] . front ( ) ) & &
( queue_ [ Env : : IO_LOW ] . empty ( ) | |
& r ! = queue_ [ Env : : IO_LOW ] . front ( ) ) ) ;
// If there is any remaining requests, the leader request (that was
// just retired) makes sure there exists at least one leader candidate
// by signaling a front request of a queue to particpate in
// next-round leader election
if ( ! queue_ [ Env : : IO_HIGH ] . empty ( ) ) {
queue_ [ Env : : IO_HIGH ] . front ( ) - > cv . Signal ( ) ;
} else if ( ! queue_ [ Env : : IO_LOW ] . empty ( ) ) {
queue_ [ Env : : IO_LOW ] . front ( ) - > cv . Signal ( ) ;
}
// Done
// The leader request (that was just retired) exits
break ;
} else {
// The leader request (that was just retired) is not fully granted
// with quota. It will particpate in leader election and claim back
// the leader position immediately.
assert ( ! r . granted ) ;
}
} else {
// Spontaneous wake up, need to continue to wait
@ -216,20 +267,24 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
leader_ = nullptr ;
}
} else {
// Waken up by previous leader:
// (1) if requested quota is granted, it is done.
// (2) if requested quota is not granted, this means current thread
// was picked as a new leader candidate (previous leader got quota).
// It needs to participate leader election because a new request may
// come in before this thread gets waken up. So it may actually need
// to do Wait() again.
assert ( ! timedout ) ;
// The non-leader request thread is running.
// It is one of the following request types:
// (1) The request got fully granted with quota and signaled to run to
// exit by the previous leader;
// (2) The request is not fully granted with quota and signaled to run to
// particpate in next-round leader election by the previous leader.
// It might or might not become the next-round leader because a new
// request may come in and acquire the request_mutex_ before this
// request thread does after it was signaled. The new request might
// sit at front of a queue and hence become the next-round leader
// instead.
assert ( & r ! = leader_ ) ;
}
} while ( ! r . granted ) ;
}
void GenericRateLimiter : : Refill ( ) {
TEST_SYNC_POINT ( " GenericRateLimiter::Refill " ) ;
void GenericRateLimiter : : RefillBytesAndGrantRequests ( ) {
TEST_SYNC_POINT ( " GenericRateLimiter::RefillBytesAndGrantRequests " ) ;
next_refill_us_ = NowMicrosMonotonic ( ) + refill_period_us_ ;
// Carry over the left over quota from the last period
auto refill_bytes_per_period =
@ -245,7 +300,10 @@ void GenericRateLimiter::Refill() {
while ( ! queue - > empty ( ) ) {
auto * next_req = queue - > front ( ) ;
if ( available_bytes_ < next_req - > request_bytes ) {
// avoid starvation
// Grant partial request_bytes to avoid starvation of requests
// that become asking for more bytes than available_bytes_
// due to dynamically reduced rate limiter's bytes_per_second that
// leads to reduced refill_bytes_per_period hence available_bytes_
next_req - > request_bytes - = available_bytes_ ;
available_bytes_ = 0 ;
break ;
@ -257,7 +315,7 @@ void GenericRateLimiter::Refill() {
next_req - > granted = true ;
if ( next_req ! = leader_ ) {
// Quota granted, signal the thread
// Quota granted, signal the thread to exit
next_req - > cv . Signal ( ) ;
}
}