diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index bb69878a1..17be134cb 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -1217,6 +1217,10 @@ DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
 DEFINE_uint64(rate_limiter_bytes_per_sec, 0,
               "Set options.rate_limiter value.");
 
+DEFINE_int64(rate_limiter_refill_period_us, 100 * 1000,
+             "Set refill period on "
+             "rate limiter.");
+
 DEFINE_bool(rate_limiter_auto_tuned, false,
             "Enable dynamic adjustment of rate limit according to demand for "
             "background I/O");
@@ -4443,7 +4447,7 @@ class Benchmark {
         exit(1);
       }
       options.rate_limiter.reset(NewGenericRateLimiter(
-          FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */,
+          FLAGS_rate_limiter_bytes_per_sec, FLAGS_rate_limiter_refill_period_us,
           10 /* fairness */,
           FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
                                     : RateLimiter::Mode::kWritesOnly,
diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc
index 70bdac026..a98400a9b 100644
--- a/util/rate_limiter.cc
+++ b/util/rate_limiter.cc
@@ -62,7 +62,7 @@ GenericRateLimiter::GenericRateLimiter(
       next_refill_us_(NowMicrosMonotonic()),
       fairness_(fairness > 100 ? 100 : fairness),
       rnd_((uint32_t)time(nullptr)),
-      leader_(nullptr),
+      wait_until_refill_pending_(false),
       auto_tuned_(auto_tuned),
       num_drains_(0),
       prev_num_drains_(0),
@@ -139,148 +139,70 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
   Req r(bytes, &request_mutex_);
   queue_[pri].push_back(&r);
 
+  // A thread representing a queued request coordinates with other such threads.
+  // There are two main duties.
+  //
+  // (1) Waiting for the next refill time.
+  // (2) Refilling the bytes and granting requests.
   do {
-    bool timedout = false;
-
-    // Leader election:
-    // Leader request's duty:
-    // (1) Waiting for the next refill time;
-    // (2) Refilling the bytes and granting requests.
-    //
-    // If the following three conditions are all true for a request,
-    // then the request is selected as a leader:
-    // (1) The request thread acquired the request_mutex_ and is running;
-    // (2) There is currently no leader;
-    // (3) The request sits at the front of a queue.
-    //
-    // If not selected as a leader, the request thread will wait
-    // for one of the following signals to wake up and
-    // compete for the request_mutex_:
-    // (1) Signal from the previous leader to exit since its requested bytes
-    // are fully granted;
-    // (2) Signal from the previous leader to particpate in next-round
-    // leader election;
-    // (3) Signal from rate limiter's destructor as part of the clean-up.
-    //
-    // Therefore, a leader request can only be one of the following types:
-    // (1) a new incoming request placed at the front of a queue;
-    // (2) a previous leader request whose quota has not been not fully
-    // granted yet due to its lower priority, hence still at
-    // the front of a queue;
-    // (3) a waiting request at the front of a queue, which got
-    // signaled by the previous leader to participate in leader election.
-    if (leader_ == nullptr &&
-        ((!queue_[Env::IO_HIGH].empty() &&
-          &r == queue_[Env::IO_HIGH].front()) ||
-         (!queue_[Env::IO_LOW].empty() &&
-          &r == queue_[Env::IO_LOW].front()))) {
-      leader_ = &r;
-
-      int64_t delta = next_refill_us_ - NowMicrosMonotonic();
-      delta = delta > 0 ? delta : 0;
-      if (delta == 0) {
-        timedout = true;
+    int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonic();
+    if (time_until_refill_us > 0) {
+      if (wait_until_refill_pending_) {
+        // Somebody is performing (1). Trust we'll be woken up when our request
+        // is granted or we are needed for future duties.
+        r.cv.Wait();
       } else {
-        // The leader request thread waits till next_refill_us_
-        int64_t wait_until = clock_->NowMicros() + delta;
+        // Whichever thread reaches here first performs duty (1) as described
+        // above.
+        int64_t wait_until = clock_->NowMicros() + time_until_refill_us;
         RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
         ++num_drains_;
-        timedout = r.cv.TimedWait(wait_until);
+        wait_until_refill_pending_ = true;
+        r.cv.TimedWait(wait_until);
+        TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostTimedWait",
+                                 &time_until_refill_us);
+        wait_until_refill_pending_ = false;
       }
     } else {
-      r.cv.Wait();
-    }
-
-    if (stop_) {
-      // It is now in the clean-up of ~GenericRateLimiter().
-      // Therefore any woken-up request will exit here,
-      // might or might not has been satiesfied.
-      --requests_to_wait_;
-      exit_cv_.Signal();
-      return;
-    }
-
-    // Assertion: request thread running through this point is one of the
-    // following in terms of the request type and quota granting situation:
-    // (1) a leader request that is not fully granted with quota and about
-    // to carry out its leader's work;
-    // (2) a non-leader request that got fully granted with quota and is
-    // running to exit;
-    // (3) a non-leader request that is not fully granted with quota and
-    // is running to particpate in next-round leader election.
-    assert((&r == leader_ && !r.granted) || (&r != leader_ && r.granted) ||
-           (&r != leader_ && !r.granted));
-
-    // Assertion: request thread running through this point is one of the
-    // following in terms of its position in queue:
-    // (1) a request got popped off the queue because it is fully granted
-    // with bytes;
-    // (2) a request sits at the front of its queue.
-    assert(r.granted ||
-           (!queue_[Env::IO_HIGH].empty() &&
-            &r == queue_[Env::IO_HIGH].front()) ||
-           (!queue_[Env::IO_LOW].empty() &&
-            &r == queue_[Env::IO_LOW].front()));
-
-    if (leader_ == &r) {
-      // The leader request thread is now running.
-      // It might or might not has been TimedWait().
-      if (timedout) {
-        // Time for the leader to do refill and grant bytes to requests
-        RefillBytesAndGrantRequests();
-
-        // The leader request retires after refilling and granting bytes
-        // regardless. This is to simplify the election handling.
-        leader_ = nullptr;
-
-        if (r.granted) {
-          // The leader request (that was just retired)
-          // already got fully granted with quota and will soon exit
-
-          // Assertion: the fully granted leader request is popped off its queue
-          assert((queue_[Env::IO_HIGH].empty() ||
-                  &r != queue_[Env::IO_HIGH].front()) &&
-                 (queue_[Env::IO_LOW].empty() ||
-                  &r != queue_[Env::IO_LOW].front()));
-
-          // If there is any remaining requests, the leader request (that was
-          // just retired) makes sure there exists at least one leader candidate
-          // by signaling a front request of a queue to particpate in
-          // next-round leader election
-          if (!queue_[Env::IO_HIGH].empty()) {
-            queue_[Env::IO_HIGH].front()->cv.Signal();
-          } else if (!queue_[Env::IO_LOW].empty()) {
-            queue_[Env::IO_LOW].front()->cv.Signal();
-          }
-
-          // The leader request (that was just retired) exits
-          break;
-        } else {
-          // The leader request (that was just retired) is not fully granted
-          // with quota. It will particpate in leader election and claim back
-          // the leader position immediately.
-          assert(!r.granted);
+      // Whichever thread reaches here first performs duty (2) as described
+      // above.
+      RefillBytesAndGrantRequests();
+      if (r.granted) {
+        // If there are any remaining requests, make sure at least one
+        // candidate is awake for future duties by signaling a front request
+        // of a queue.
+        if (!queue_[Env::IO_HIGH].empty()) {
+          queue_[Env::IO_HIGH].front()->cv.Signal();
+        } else if (!queue_[Env::IO_LOW].empty()) {
+          queue_[Env::IO_LOW].front()->cv.Signal();
         }
-      } else {
-        // Spontaneous wake up, need to continue to wait
-        assert(!r.granted);
-        leader_ = nullptr;
       }
+    }
+    // Invariant: non-granted request is always in one queue, and granted
+    // request is always in zero queues.
+#ifndef NDEBUG
+    int num_found = 0;
+    for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+      if (std::find(queue_[i].begin(), queue_[i].end(), &r) !=
+          queue_[i].end()) {
+        ++num_found;
+      }
+    }
+    if (r.granted) {
+      assert(num_found == 0);
     } else {
-      // The non-leader request thread is running.
-      // It is one of the following request types:
-      // (1) The request got fully granted with quota and signaled to run to
-      // exit by the previous leader;
-      // (2) The request is not fully granted with quota and signaled to run to
-      // particpate in next-round leader election by the previous leader.
-      // It might or might not become the next-round leader because a new
-      // request may come in and acquire the request_mutex_ before this
-      // request thread does after it was signaled. The new request might
-      // sit at front of a queue and hence become the next-round leader
-      // instead.
-      assert(&r != leader_);
+      assert(num_found == 1);
     }
-  } while (!r.granted);
+#endif  // NDEBUG
+  } while (!stop_ && !r.granted);
+
+  if (stop_) {
+    // It is now in the clean-up of ~GenericRateLimiter().
+    // Therefore any woken-up request will have come out of the loop and then
+    // exit here. It might or might not have been satisfied.
+    --requests_to_wait_;
+    exit_cv_.Signal();
+  }
 }
 
 void GenericRateLimiter::RefillBytesAndGrantRequests() {
@@ -314,10 +236,8 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() {
       queue->pop_front();
       next_req->granted = true;
-      if (next_req != leader_) {
-        // Quota granted, signal the thread to exit
-        next_req->cv.Signal();
-      }
+      // Quota granted, signal the thread to exit
+      next_req->cv.Signal();
     }
   }
 }
diff --git a/util/rate_limiter.h b/util/rate_limiter.h
index 58342a097..aa5e6e5e1 100644
--- a/util/rate_limiter.h
+++ b/util/rate_limiter.h
@@ -101,8 +101,8 @@ class GenericRateLimiter : public RateLimiter {
   Random rnd_;
 
   struct Req;
-  Req* leader_;
   std::deque<Req*> queue_[Env::IO_TOTAL];
+  bool wait_until_refill_pending_;
 
   bool auto_tuned_;
   int64_t num_drains_;
diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc
index 04625964c..a4b1de2f4 100644
--- a/util/rate_limiter_test.cc
+++ b/util/rate_limiter_test.cc
@@ -211,15 +211,13 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
       RateLimiter::Mode::kWritesOnly, special_env.GetSystemClock(),
       true /* auto_tuned */));
 
-  // Use callback to advance time because we need to advance (1) after Request()
-  // has determined the bytes are not available; and (2) before
-  // RefillBytesAndGrantRequests() computes the next refill time (ensuring
-  // refill time in the future allows the next request to drain the rate
-  // limiter).
+  // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the
+  // `Env` to advance its time according to the fake wait duration. The
+  // workaround is to install a callback that advances the `Env`'s mock time.
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "GenericRateLimiter::RefillBytesAndGrantRequests", [&](void* /*arg*/) {
-        special_env.SleepForMicroseconds(static_cast<int>(
-            std::chrono::microseconds(kTimePerRefill).count()));
+      "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) {
+        int64_t time_waited_us = *static_cast<int64_t*>(arg);
+        special_env.SleepForMicroseconds(static_cast<int>(time_waited_us));
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
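
The following is a minimal, self-contained sketch (not RocksDB code) of the coordination scheme this patch introduces: queued requests loop, and whichever thread gets the mutex first either performs duty (1), waiting out the remaining refill interval, or duty (2), refilling the budget and granting queued requests. Names such as `ToyRateLimiter` are invented for illustration; priorities, partial grants for oversized requests, fairness, statistics, and shutdown handling are all omitted, and it broadcasts instead of signaling a single front request.

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>

// Illustrative stand-in for the patched request path; simplified to a single
// priority with no partial grants and no shutdown support.
class ToyRateLimiter {
 public:
  ToyRateLimiter(int64_t bytes_per_refill,
                 std::chrono::microseconds refill_period)
      : bytes_per_refill_(bytes_per_refill),
        refill_period_(refill_period),
        next_refill_(std::chrono::steady_clock::now()) {}

  // Blocks until `bytes` have been granted. Requires bytes <= bytes_per_refill_,
  // otherwise the request can never be satisfied in this sketch.
  void Request(int64_t bytes) {
    std::unique_lock<std::mutex> lock(mutex_);
    Req r{bytes, false};
    queue_.push_back(&r);
    do {
      auto now = std::chrono::steady_clock::now();
      if (now < next_refill_) {
        if (wait_until_refill_pending_) {
          // Somebody else took duty (1); sleep until granted or signaled.
          cv_.wait(lock);
        } else {
          // Duty (1): wait out the remainder of the refill period.
          wait_until_refill_pending_ = true;
          cv_.wait_until(lock, next_refill_);
          wait_until_refill_pending_ = false;
        }
      } else {
        // Duty (2): refill the budget and grant queued requests in FIFO order.
        available_bytes_ = bytes_per_refill_;
        next_refill_ = now + refill_period_;
        while (!queue_.empty() && queue_.front()->bytes <= available_bytes_) {
          Req* front = queue_.front();
          queue_.pop_front();
          available_bytes_ -= front->bytes;
          front->granted = true;
        }
        // Wake all waiters: granted requests exit, and one of the remaining
        // requests takes over future duties on its next loop iteration.
        cv_.notify_all();
      }
    } while (!r.granted);
  }

 private:
  struct Req {
    int64_t bytes;
    bool granted;
  };

  const int64_t bytes_per_refill_;
  const std::chrono::microseconds refill_period_;
  std::chrono::steady_clock::time_point next_refill_;
  int64_t available_bytes_ = 0;
  bool wait_until_refill_pending_ = false;
  std::deque<Req*> queue_;
  std::mutex mutex_;
  std::condition_variable cv_;
};

The broadcast at the end of duty (2) is only to keep the sketch short; the patch instead signals the front request of one queue, which is enough to guarantee at least one awake candidate for future duties without waking every waiter.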