Simplify GenericRateLimiter algorithm (#8602)

Summary:
`GenericRateLimiter` slow path handles requests that cannot be satisfied
immediately.  Such requests enter a queue, and their thread stays in `Request()`
until they are granted or the rate limiter is stopped.  These threads are
responsible for unblocking themselves.  The work to do so is split into two main
duties.

(1) Waiting for the next refill time.
(2) Refilling the bytes and granting requests.

Prior to this PR, the slow path logic involved a leader election algorithm to
pick one thread to perform (1) followed by (2).  It elected the thread whose
request was at the front of the highest priority non-empty queue since that
request was most likely to be granted.  This algorithm was efficient in terms of
reducing intermediate wakeups, which is a thread waking up only to resume
waiting after finding its request is not granted.  However, the conceptual
complexity of this algorithm was too high.  It took me a long time to draw a
timeline to understand how it works for just one edge case yet there were so
many.

This PR drops the leader election to reduce conceptual complexity.  Now, the two
duties can be performed by whichever thread acquires the lock first.  The risk
of this change is increasing the number of intermediate wakeups, however, we
took steps to mitigate that.

- `wait_until_refill_pending_` flag ensures only one thread performs (1). This\
prevents the thundering herd problem at the next refill time. The remaining\
threads wait on their condition variable with an unbounded duration -- thus we\
must remember to notify them to ensure forward progress.
- (1) is typically done by a thread at the front of a queue. This is trivial\
when the queues are initially empty as the first choice that arrives must be\
the only entry in its queue. When queues are initially non-empty, we achieve\
this by having (2) notify a thread at the front of a queue (preferring higher\
priority) to perform the next duty.
- We do not require any additional wakeup for (2). Typically it will just be\
done by the thread that finished (1).

Combined, the second and third bullet points above suggest the refill/granting
will typically be done by a request at the front of its queue.  This is
important because one wakeup is saved when a granted request happens to be in an
already running thread.

Note there are a few cases that still lead to intermediate wakeup, however.  The
first two are existing issues that also apply to the old algorithm, however, the
third (including both subpoints) is new.

- No request may be granted (only possible when rate limit dynamically\
decreases).
- Requests from a different queue may be granted.
- (2) may be run by a non-front request thread causing it to not be granted even\
if some requests in that same queue are granted. It can happen for a couple\
(unlikely) reasons.
  - A new request may sneak in and grab the lock at the refill time, before the\
thread finishing (1) can wake up and grab it.
  - A new request may sneak in and grab the lock and execute (1) before (2)'s\
chosen candidate can wake up and grab the lock. Then that non-front request\
thread performing (1) can carry over to perform (2).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8602

Test Plan:
- Use existing tests. The edge cases listed in the comment are all performance\
related; I could not really think of any related to correctness. The logic\
looks the same whether a thread wakes up/finishes its work early/on-time/late,\
or whether the thread is chosen vs. "steals" the work.
- Verified write throughput and CPU overhead are basically the same with and\
  without this change, even in a rate limiter heavy workload:

Test command:
```
$ rm -rf /dev/shm/dbbench/ && TEST_TMPDIR=/dev/shm /usr/bin/time ./db_bench -benchmarks=fillrandom -num_multi_db=64 -num_low_pri_threads=64 -num_high_pri_threads=64 -write_buffer_size=262144 -target_file_size_base=262144 -max_bytes_for_level_base=1048576 -rate_limiter_bytes_per_sec=16777216 -key_size=24 -value_size=1000 -num=10000 -compression_type=none -rate_limiter_refill_period_us=1000
```

Results before this PR:

```
fillrandom   :     108.463 micros/op 9219 ops/sec;    9.0 MB/s
7.40user 8.84system 1:26.20elapsed 18%CPU (0avgtext+0avgdata 256140maxresident)k
```

Results after this PR:

```
fillrandom   :     108.108 micros/op 9250 ops/sec;    9.0 MB/s
7.45user 8.23system 1:26.68elapsed 18%CPU (0avgtext+0avgdata 255688maxresident)k
```

Reviewed By: hx235

Differential Revision: D30048013

Pulled By: ajkr

fbshipit-source-id: 6741bba9d9dfbccab359806d725105817fef818b
main
Andrew Kryczka 3 years ago committed by Facebook GitHub Bot
parent a756fb9c85
commit 82b81dc8b5
  1. 6
      tools/db_bench_tool.cc
  2. 194
      util/rate_limiter.cc
  3. 2
      util/rate_limiter.h
  4. 14
      util/rate_limiter_test.cc

@ -1217,6 +1217,10 @@ DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");
DEFINE_int64(rate_limiter_refill_period_us, 100 * 1000,
"Set refill period on "
"rate limiter.");
DEFINE_bool(rate_limiter_auto_tuned, false,
"Enable dynamic adjustment of rate limit according to demand for "
"background I/O");
@ -4443,7 +4447,7 @@ class Benchmark {
exit(1);
}
options.rate_limiter.reset(NewGenericRateLimiter(
FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */,
FLAGS_rate_limiter_bytes_per_sec, FLAGS_rate_limiter_refill_period_us,
10 /* fairness */,
FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
: RateLimiter::Mode::kWritesOnly,

@ -62,7 +62,7 @@ GenericRateLimiter::GenericRateLimiter(
next_refill_us_(NowMicrosMonotonic()),
fairness_(fairness > 100 ? 100 : fairness),
rnd_((uint32_t)time(nullptr)),
leader_(nullptr),
wait_until_refill_pending_(false),
auto_tuned_(auto_tuned),
num_drains_(0),
prev_num_drains_(0),
@ -139,148 +139,70 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
Req r(bytes, &request_mutex_);
queue_[pri].push_back(&r);
// A thread representing a queued request coordinates with other such threads.
// There are two main duties.
//
// (1) Waiting for the next refill time.
// (2) Refilling the bytes and granting requests.
do {
bool timedout = false;
// Leader election:
// Leader request's duty:
// (1) Waiting for the next refill time;
// (2) Refilling the bytes and granting requests.
//
// If the following three conditions are all true for a request,
// then the request is selected as a leader:
// (1) The request thread acquired the request_mutex_ and is running;
// (2) There is currently no leader;
// (3) The request sits at the front of a queue.
//
// If not selected as a leader, the request thread will wait
// for one of the following signals to wake up and
// compete for the request_mutex_:
// (1) Signal from the previous leader to exit since its requested bytes
// are fully granted;
// (2) Signal from the previous leader to particpate in next-round
// leader election;
// (3) Signal from rate limiter's destructor as part of the clean-up.
//
// Therefore, a leader request can only be one of the following types:
// (1) a new incoming request placed at the front of a queue;
// (2) a previous leader request whose quota has not been not fully
// granted yet due to its lower priority, hence still at
// the front of a queue;
// (3) a waiting request at the front of a queue, which got
// signaled by the previous leader to participate in leader election.
if (leader_ == nullptr &&
((!queue_[Env::IO_HIGH].empty() &&
&r == queue_[Env::IO_HIGH].front()) ||
(!queue_[Env::IO_LOW].empty() &&
&r == queue_[Env::IO_LOW].front()))) {
leader_ = &r;
int64_t delta = next_refill_us_ - NowMicrosMonotonic();
delta = delta > 0 ? delta : 0;
if (delta == 0) {
timedout = true;
int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonic();
if (time_until_refill_us > 0) {
if (wait_until_refill_pending_) {
// Somebody is performing (1). Trust we'll be woken up when our request
// is granted or we are needed for future duties.
r.cv.Wait();
} else {
// The leader request thread waits till next_refill_us_
int64_t wait_until = clock_->NowMicros() + delta;
// Whichever thread reaches here first performs duty (1) as described
// above.
int64_t wait_until = clock_->NowMicros() + time_until_refill_us;
RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
++num_drains_;
timedout = r.cv.TimedWait(wait_until);
wait_until_refill_pending_ = true;
r.cv.TimedWait(wait_until);
TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostTimedWait",
&time_until_refill_us);
wait_until_refill_pending_ = false;
}
} else {
r.cv.Wait();
}
if (stop_) {
// It is now in the clean-up of ~GenericRateLimiter().
// Therefore any woken-up request will exit here,
// might or might not has been satiesfied.
--requests_to_wait_;
exit_cv_.Signal();
return;
}
// Assertion: request thread running through this point is one of the
// following in terms of the request type and quota granting situation:
// (1) a leader request that is not fully granted with quota and about
// to carry out its leader's work;
// (2) a non-leader request that got fully granted with quota and is
// running to exit;
// (3) a non-leader request that is not fully granted with quota and
// is running to particpate in next-round leader election.
assert((&r == leader_ && !r.granted) || (&r != leader_ && r.granted) ||
(&r != leader_ && !r.granted));
// Assertion: request thread running through this point is one of the
// following in terms of its position in queue:
// (1) a request got popped off the queue because it is fully granted
// with bytes;
// (2) a request sits at the front of its queue.
assert(r.granted ||
(!queue_[Env::IO_HIGH].empty() &&
&r == queue_[Env::IO_HIGH].front()) ||
(!queue_[Env::IO_LOW].empty() &&
&r == queue_[Env::IO_LOW].front()));
if (leader_ == &r) {
// The leader request thread is now running.
// It might or might not has been TimedWait().
if (timedout) {
// Time for the leader to do refill and grant bytes to requests
RefillBytesAndGrantRequests();
// The leader request retires after refilling and granting bytes
// regardless. This is to simplify the election handling.
leader_ = nullptr;
if (r.granted) {
// The leader request (that was just retired)
// already got fully granted with quota and will soon exit
// Assertion: the fully granted leader request is popped off its queue
assert((queue_[Env::IO_HIGH].empty() ||
&r != queue_[Env::IO_HIGH].front()) &&
(queue_[Env::IO_LOW].empty() ||
&r != queue_[Env::IO_LOW].front()));
// If there is any remaining requests, the leader request (that was
// just retired) makes sure there exists at least one leader candidate
// by signaling a front request of a queue to particpate in
// next-round leader election
if (!queue_[Env::IO_HIGH].empty()) {
queue_[Env::IO_HIGH].front()->cv.Signal();
} else if (!queue_[Env::IO_LOW].empty()) {
queue_[Env::IO_LOW].front()->cv.Signal();
}
// The leader request (that was just retired) exits
break;
} else {
// The leader request (that was just retired) is not fully granted
// with quota. It will particpate in leader election and claim back
// the leader position immediately.
assert(!r.granted);
// Whichever thread reaches here first performs duty (2) as described
// above.
RefillBytesAndGrantRequests();
if (r.granted) {
// If there is any remaining requests, make sure there exists at least
// one candidate is awake for future duties by signaling a front request
// of a queue.
if (!queue_[Env::IO_HIGH].empty()) {
queue_[Env::IO_HIGH].front()->cv.Signal();
} else if (!queue_[Env::IO_LOW].empty()) {
queue_[Env::IO_LOW].front()->cv.Signal();
}
} else {
// Spontaneous wake up, need to continue to wait
assert(!r.granted);
leader_ = nullptr;
}
}
// Invariant: non-granted request is always in one queue, and granted
// request is always in zero queues.
#ifndef NDEBUG
int num_found = 0;
for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
if (std::find(queue_[i].begin(), queue_[i].end(), &r) !=
queue_[i].end()) {
++num_found;
}
}
if (r.granted) {
assert(num_found == 0);
} else {
// The non-leader request thread is running.
// It is one of the following request types:
// (1) The request got fully granted with quota and signaled to run to
// exit by the previous leader;
// (2) The request is not fully granted with quota and signaled to run to
// particpate in next-round leader election by the previous leader.
// It might or might not become the next-round leader because a new
// request may come in and acquire the request_mutex_ before this
// request thread does after it was signaled. The new request might
// sit at front of a queue and hence become the next-round leader
// instead.
assert(&r != leader_);
assert(num_found == 1);
}
} while (!r.granted);
#endif // NDEBUG
} while (!stop_ && !r.granted);
if (stop_) {
// It is now in the clean-up of ~GenericRateLimiter().
// Therefore any woken-up request will have come out of the loop and then
// exit here. It might or might not have been satisfied.
--requests_to_wait_;
exit_cv_.Signal();
}
}
void GenericRateLimiter::RefillBytesAndGrantRequests() {
@ -314,10 +236,8 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() {
queue->pop_front();
next_req->granted = true;
if (next_req != leader_) {
// Quota granted, signal the thread to exit
next_req->cv.Signal();
}
// Quota granted, signal the thread to exit
next_req->cv.Signal();
}
}
}

@ -101,8 +101,8 @@ class GenericRateLimiter : public RateLimiter {
Random rnd_;
struct Req;
Req* leader_;
std::deque<Req*> queue_[Env::IO_TOTAL];
bool wait_until_refill_pending_;
bool auto_tuned_;
int64_t num_drains_;

@ -211,15 +211,13 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
RateLimiter::Mode::kWritesOnly, special_env.GetSystemClock(),
true /* auto_tuned */));
// Use callback to advance time because we need to advance (1) after Request()
// has determined the bytes are not available; and (2) before
// RefillBytesAndGrantRequests() computes the next refill time (ensuring
// refill time in the future allows the next request to drain the rate
// limiter).
// Rate limiter uses `CondVar::TimedWait()`, which does not have access to the
// `Env` to advance its time according to the fake wait duration. The
// workaround is to install a callback that advance the `Env`'s mock time.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"GenericRateLimiter::RefillBytesAndGrantRequests", [&](void* /*arg*/) {
special_env.SleepForMicroseconds(static_cast<int>(
std::chrono::microseconds(kTimePerRefill).count()));
"GenericRateLimiter::Request:PostTimedWait", [&](void* arg) {
int64_t time_waited_us = *static_cast<int64_t*>(arg);
special_env.SleepForMicroseconds(static_cast<int>(time_waited_us));
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

Loading…
Cancel
Save