Use monotonic time points in write_controller.cc and rate_limiter.cc

Summary:
NowMicros() provides non-monotonic time. When wall clock is
synchronized or changed, the non-monotonicity time points will affect write rate
controllers. This patch changes write_controller.cc and rate_limiter.cc to use
monotonic time points.
Closes https://github.com/facebook/rocksdb/pull/1865

Differential Revision: D4561732

Pulled By: siying

fbshipit-source-id: 95ece62
main
Xiaofei Du 8 years ago committed by Facebook Github Bot
parent c2247dc1c7
commit 7106a994fe
  1. 7
      db/write_controller.cc
  2. 3
      db/write_controller.h
  3. 4
      db/write_controller_test.cc
  4. 10
      include/rocksdb/env.h
  5. 13
      util/rate_limiter.cc
  6. 3
      util/rate_limiter.h

@ -7,6 +7,7 @@
#include <atomic> #include <atomic>
#include <cassert> #include <cassert>
#include <ratio>
#include "rocksdb/env.h" #include "rocksdb/env.h"
namespace rocksdb { namespace rocksdb {
@ -56,7 +57,7 @@ uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
} }
// The frequency to get time inside DB mutex is less than one per refill // The frequency to get time inside DB mutex is less than one per refill
// interval. // interval.
auto time_now = env->NowMicros(); auto time_now = NowMicrosMonotonic(env);
uint64_t sleep_debt = 0; uint64_t sleep_debt = 0;
uint64_t time_since_last_refill = 0; uint64_t time_since_last_refill = 0;
@ -103,6 +104,10 @@ uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
return sleep_amount; return sleep_amount;
} }
uint64_t WriteController::NowMicrosMonotonic(Env* env) {
return env->NowNanos() / std::milli::den;
}
StopWriteToken::~StopWriteToken() { StopWriteToken::~StopWriteToken() {
assert(controller_->total_stopped_ >= 1); assert(controller_->total_stopped_ >= 1);
--controller_->total_stopped_; --controller_->total_stopped_;

@ -77,6 +77,9 @@ class WriteController {
uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; } uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
private:
uint64_t NowMicrosMonotonic(Env* env);
private: private:
friend class WriteControllerToken; friend class WriteControllerToken;
friend class StopWriteToken; friend class StopWriteToken;

@ -3,6 +3,8 @@
// LICENSE file in the root directory of this source tree. An additional grant // LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory. // of patent rights can be found in the PATENTS file in the same directory.
// //
#include <ratio>
#include "db/write_controller.h" #include "db/write_controller.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
@ -16,7 +18,7 @@ class TimeSetEnv : public EnvWrapper {
public: public:
explicit TimeSetEnv() : EnvWrapper(nullptr) {} explicit TimeSetEnv() : EnvWrapper(nullptr) {}
uint64_t now_micros_ = 6666; uint64_t now_micros_ = 6666;
virtual uint64_t NowMicros() override { return now_micros_; } virtual uint64_t NowNanos() override { return now_micros_ * std::milli::den; }
}; };
TEST_F(WriteControllerTest, ChangeDelayRateTest) { TEST_F(WriteControllerTest, ChangeDelayRateTest) {

@ -318,15 +318,16 @@ class Env {
virtual Status NewLogger(const std::string& fname, virtual Status NewLogger(const std::string& fname,
shared_ptr<Logger>* result) = 0; shared_ptr<Logger>* result) = 0;
// Returns the number of micro-seconds since some fixed point in time. Only // Returns the number of micro-seconds since some fixed point in time.
// useful for computing deltas of time. // It is often used as system time such as in GenericRateLimiter
// However, it is often used as system time such as in GenericRateLimiter
// and other places so a port needs to return system time in order to work. // and other places so a port needs to return system time in order to work.
virtual uint64_t NowMicros() = 0; virtual uint64_t NowMicros() = 0;
// Returns the number of nano-seconds since some fixed point in time. Only // Returns the number of nano-seconds since some fixed point in time. Only
// useful for computing deltas of time in one run. // useful for computing deltas of time in one run.
// Default implementation simply relies on NowMicros // Default implementation simply relies on NowMicros.
// In platform-specific implementations, NowNanos() should return time points
// that are MONOTONIC.
virtual uint64_t NowNanos() { virtual uint64_t NowNanos() {
return NowMicros() * 1000; return NowMicros() * 1000;
} }
@ -982,6 +983,7 @@ class EnvWrapper : public Env {
return target_->NewLogger(fname, result); return target_->NewLogger(fname, result);
} }
uint64_t NowMicros() override { return target_->NowMicros(); } uint64_t NowMicros() override { return target_->NowMicros(); }
void SleepForMicroseconds(int micros) override { void SleepForMicroseconds(int micros) override {
target_->SleepForMicroseconds(micros); target_->SleepForMicroseconds(micros);
} }

@ -36,7 +36,7 @@ GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
exit_cv_(&request_mutex_), exit_cv_(&request_mutex_),
requests_to_wait_(0), requests_to_wait_(0),
available_bytes_(0), available_bytes_(0),
next_refill_us_(env_->NowMicros()), next_refill_us_(NowMicrosMonotonic(env_)),
fairness_(fairness > 100 ? 100 : fairness), fairness_(fairness > 100 ? 100 : fairness),
rnd_((uint32_t)time(nullptr)), rnd_((uint32_t)time(nullptr)),
leader_(nullptr) { leader_(nullptr) {
@ -107,7 +107,14 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
(!queue_[Env::IO_LOW].empty() && (!queue_[Env::IO_LOW].empty() &&
&r == queue_[Env::IO_LOW].front()))) { &r == queue_[Env::IO_LOW].front()))) {
leader_ = &r; leader_ = &r;
timedout = r.cv.TimedWait(next_refill_us_); int64_t delta = next_refill_us_ - NowMicrosMonotonic(env_);
delta = delta > 0 ? delta : 0;
if (delta == 0) {
timedout = true;
} else {
int64_t wait_until = env_->NowMicros() + delta;
timedout = r.cv.TimedWait(wait_until);
}
} else { } else {
// Not at the front of queue or an leader has already been elected // Not at the front of queue or an leader has already been elected
r.cv.Wait(); r.cv.Wait();
@ -178,7 +185,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
void GenericRateLimiter::Refill() { void GenericRateLimiter::Refill() {
TEST_SYNC_POINT("GenericRateLimiter::Refill"); TEST_SYNC_POINT("GenericRateLimiter::Refill");
next_refill_us_ = env_->NowMicros() + refill_period_us_; next_refill_us_ = NowMicrosMonotonic(env_) + refill_period_us_;
// Carry over the left over quota from the last period // Carry over the left over quota from the last period
auto refill_bytes_per_period = auto refill_bytes_per_period =
refill_bytes_per_period_.load(std::memory_order_relaxed); refill_bytes_per_period_.load(std::memory_order_relaxed);

@ -61,6 +61,9 @@ class GenericRateLimiter : public RateLimiter {
private: private:
void Refill(); void Refill();
int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
uint64_t NowMicrosMonotonic(Env* env) {
return env->NowNanos() / std::milli::den;
}
// This mutex guard all internal states // This mutex guard all internal states
mutable port::Mutex request_mutex_; mutable port::Mutex request_mutex_;

Loading…
Cancel
Save