//  Copyright (c) 2014, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "util/rate_limiter.h"
#include "rocksdb/env.h"

namespace rocksdb {

// A single pending request for quota.
// Each instance bundles the requested byte count, a condition variable built
// on the mutex handed to the constructor, and a flag recording whether quota
// has been assigned. Request() creates one of these on its stack, and Refill()
// sets `granted` to true after deducting the request's bytes.
// NOTE(review): no closing `};` for this struct is visible in this view of
// the file -- confirm against the complete source.
struct RateLimiter::Req {
  // bytes: amount of quota being requested.
  // mu:    mutex the condition variable is constructed with.
  explicit Req(int64_t bytes, port::Mutex* mu) :
    bytes(bytes), cv(mu), granted(false) {}
  int64_t bytes;      // requested quota, in bytes
  port::CondVar cv;   // waited on by the requesting thread in Request()
  bool granted;       // starts false; set to true by Refill()

// Constructs a rate limiter.
//   rate_bytes_per_sec: target throughput, in bytes per second.
//   refill_period_us:   length of one refill period (the `_us` suffix
//                       suggests microseconds, matching the 1000000.0 divisor).
//   fairness:           values above 100 are clamped to 100; Refill() passes
//                       the stored value to rnd_.OneIn() when deciding which
//                       priority queue to serve first.
// NOTE(review): members read elsewhere in this file (stop_, available_bytes_,
// next_refill_us_, requests_to_wait_, env_, rnd_) do not appear in this
// initializer list -- confirm they are initialized elsewhere. The closing
// brace of the constructor body is also not visible in this view.
RateLimiter::RateLimiter(int64_t rate_bytes_per_sec, int64_t refill_period_us,
    int32_t fairness)
  : refill_period_us_(refill_period_us),
    // Quota per period = rate * period / 1000000.0. The divisor is a
    // floating-point literal, so the division is done in floating point
    // before the result is stored in the member.
    refill_bytes_per_period_(rate_bytes_per_sec * refill_period_us / 1000000.0),
    total_requests_{0, 0},
    total_bytes_through_{0, 0},
    fairness_(fairness > 100 ? 100 : fairness),
    leader_(nullptr) {
  // These two assignments repeat the {0, 0} initialization of
  // total_bytes_through_ performed in the initializer list above.
  total_bytes_through_[0] = 0;
  total_bytes_through_[1] = 0;

// Shuts the limiter down.
// After acquiring request_mutex_ through MutexLock, it sets stop_ (which
// Request() checks), records in requests_to_wait_ the number of requests
// currently queued at both priorities, iterates over each queue, and then
// loops for as long as requests_to_wait_ is positive.
// NOTE(review): the bodies of the two for-loops and of the while-loop are not
// visible in this view; presumably they wake each queued request and then wait
// for them to drain -- confirm against the complete source.
RateLimiter::~RateLimiter() {
  MutexLock g(&request_mutex_);
  stop_ = true;
  // Snapshot of how many requests are still queued across both priorities.
  requests_to_wait_ = queue_[Env::IO_LOW].size() + queue_[Env::IO_HIGH].size();
  for (auto& r : queue_[Env::IO_HIGH]) {
  for (auto& r : queue_[Env::IO_LOW]) {
  while (requests_to_wait_ > 0) {

// Requests `bytes` of quota at priority `pri`.
//   bytes: amount requested; asserted to be smaller than
//          refill_bytes_per_period_.
//   pri:   priority of the request; used to index total_bytes_through_.
//
// The function acquires request_mutex_ through MutexLock and checks stop_.
// If available_bytes_ already covers the request, the bytes are deducted and
// added to total_bytes_through_[pri]. Otherwise a Req is created on the stack
// and the thread loops until r.granted becomes true. In each iteration the
// thread becomes leader_ if no leader exists and it sits at the front of
// either queue; the leader does a timed wait with next_refill_us_ as its
// argument.
//
// NOTE(review): several statement bodies and closing braces are not visible
// in this view (e.g. the bodies of both `if (stop_)` checks, whatever follows
// the fast-path accounting, the enqueue of `r`, the non-leader wait, the
// refill call and the signalling of queue heads). The comments below describe
// only what is visible; confirm the rest against the complete source.
void RateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
  // A single request must be smaller than one period's quota.
  assert(bytes < refill_bytes_per_period_);

  MutexLock g(&request_mutex_);
  if (stop_) {


  if (available_bytes_ >= bytes) {
    // Refill thread assigns quota and notifies requests waiting on
    // the queue under mutex. So if we get here, that means nobody
    // is waiting?
    available_bytes_ -= bytes;
    total_bytes_through_[pri] += bytes;

  // Request cannot be satisfied at this moment, enqueue
  // `r` lives on this thread's stack; elsewhere in this function it is
  // compared by address against the queue fronts and against leader_.
  Req r(bytes, &request_mutex_);

  do {
    // Holds the return value of TimedWait() when this thread waits as leader.
    bool timedout = false;
    // Leader election, candidates can be:
    // (1) a new incoming request,
    // (2) a previous leader, whose quota has not been assigned yet due
    //     to lower priority
    // (3) a previous waiter at the front of queue, who got notified by
    //     previous leader
    if (leader_ == nullptr &&
        ((!queue_[Env::IO_HIGH].empty() &&
            &r == queue_[Env::IO_HIGH].front()) ||
         (!queue_[Env::IO_LOW].empty() &&
            &r == queue_[Env::IO_LOW].front()))) {
      // No leader exists and this request is at the front of one of the
      // queues: take leadership and wait with the next refill time as the
      // argument to TimedWait().
      leader_ = &r;
      timedout = r.cv.TimedWait(next_refill_us_);
    } else {
      // Not at the front of queue or a leader has already been elected

    // request_mutex_ is held from now on
    if (stop_) {

    // Make sure the woken-up request is always the head of its queue
    assert(r.granted ||
           (!queue_[Env::IO_HIGH].empty() &&
            &r == queue_[Env::IO_HIGH].front()) ||
           (!queue_[Env::IO_LOW].empty() &&
            &r == queue_[Env::IO_LOW].front()));
    // Likewise, any current leader must be at the front of one of the queues.
    assert(leader_ == nullptr ||
           (!queue_[Env::IO_HIGH].empty() &&
            leader_ == queue_[Env::IO_HIGH].front()) ||
           (!queue_[Env::IO_LOW].empty() &&
            leader_ == queue_[Env::IO_LOW].front()));

    if (leader_ == &r) {
      // Woken up from TimedWait()
      if (timedout) {
        // Time to do refill!

        // Re-elect a new leader regardless. This is to simplify the
        // election handling.
        leader_ = nullptr;

        // Notify the head of queue if current leader is going away
        if (r.granted) {
          // Current leader already got granted with quota. Notify head
          // of waiting queue to participate next round of election.
          // A granted leader must no longer be at the front of either queue.
          assert((queue_[Env::IO_HIGH].empty() ||
                    &r != queue_[Env::IO_HIGH].front()) &&
                 (queue_[Env::IO_LOW].empty() ||
                    &r != queue_[Env::IO_LOW].front()));
          // The high-priority queue is checked before the low-priority one.
          if (!queue_[Env::IO_HIGH].empty()) {
          } else if (!queue_[Env::IO_LOW].empty()) {
          // Done
      } else {
        // Spontaneous wake up, need to continue to wait
        // Leadership is given up here; the next loop iteration runs the
        // election again.
        leader_ = nullptr;
    } else {
      // Woken up by previous leader:
      // (1) if requested quota is granted, it is done.
      // (2) if requested quota is not granted, this means current thread
      // was picked as a new leader candidate (previous leader got quota).
      // It needs to participate leader election because a new request may
      // come in before this thread gets woken up. So it may actually need
      // to do Wait() again.
  } while (!r.granted);

// Starts a new refill period and hands quota to queued requests.
// It sets next_refill_us_ to the current time plus refill_period_us_, tops up
// available_bytes_, and then visits the two priority queues in an order chosen
// via rnd_.OneIn(fairness_). For each request at the front of a queue it
// deducts the request's bytes from available_bytes_, adds them to
// total_bytes_through_ for that priority, and marks the request granted.
// NOTE(review): the statement executed when quota runs out, the removal of a
// granted request from its queue, the signalling of its thread, and the
// closing braces are not visible in this view -- confirm against the complete
// source. No locking is visible in this function; presumably the caller
// already holds request_mutex_ -- confirm.
void RateLimiter::Refill() {
  next_refill_us_ = env_->NowMicros() + refill_period_us_;
  // Carry over the left over quota from the last period
  // Quota is added only while the balance is below one period's worth, so
  // the balance always stays below two periods' worth.
  if (available_bytes_ < refill_bytes_per_period_) {
    available_bytes_ += refill_bytes_per_period_;

  // use_low_pri_first is the loop index at which IO_LOW is served: 0 (first)
  // when rnd_.OneIn(fairness_) returns true, otherwise 1 (after IO_HIGH).
  // Presumably OneIn(n) is true with probability 1/n -- confirm.
  int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1;
  for (int q = 0; q < 2; ++q) {
    auto use_pri = (use_low_pri_first == q) ? Env::IO_LOW : Env::IO_HIGH;
    auto* queue = &queue_[use_pri];
    while (!queue->empty()) {
      auto* next_req = queue->front();
      // Not enough quota left for the request at the front of this queue.
      if (available_bytes_ < next_req->bytes) {
      available_bytes_ -= next_req->bytes;
      total_bytes_through_[use_pri] += next_req->bytes;

      next_req->granted = true;
      // The leader is treated differently from the other waiters here.
      if (next_req != leader_) {
        // Quota granted, signal the thread

// Factory function: allocates a RateLimiter with `new`, forwarding the three
// arguments unchanged to the constructor, and returns the raw pointer.
//   rate_bytes_per_sec: target throughput, in bytes per second.
//   refill_period_us:   length of one refill period.
//   fairness:           fairness setting; the constructor clamps it to 100.
// Nothing in this function releases the object; presumably the caller takes
// ownership and must delete it -- confirm against the public header.
// NOTE(review): the function's closing brace is not visible in this view.
RateLimiter* NewRateLimiter(
    int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) {
  return new RateLimiter(rate_bytes_per_sec, refill_period_us, fairness);

}  // namespace rocksdb