diff --git a/db/db_impl.cc b/db/db_impl.cc index 138cf024b..852e16deb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -5134,17 +5134,30 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, bool delayed = false; { StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed); - auto delay = write_controller_.GetDelay(env_, num_bytes); + uint64_t delay = write_controller_.GetDelay(env_, num_bytes); if (delay > 0) { if (write_options.no_slowdown) { return Status::Incomplete(); } - mutex_.Unlock(); - delayed = true; TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep"); - // hopefully we don't have to sleep more than 2 billion microseconds - env_->SleepForMicroseconds(static_cast(delay)); + + mutex_.Unlock(); + // We will delay the write until we have slept for delay ms or + // we don't need a delay anymore + const uint64_t kDelayInterval = 1000; + uint64_t stall_end = sw.start_time() + delay; + while (write_controller_.NeedsDelay()) { + if (env_->NowMicros() >= stall_end) { + // We already delayed this write `delay` microseconds + break; + } + + delayed = true; + // Sleep for 0.001 seconds + env_->SleepForMicroseconds(kDelayInterval); + } mutex_.Lock(); + } while (bg_error_.ok() && write_controller_.IsStopped()) { diff --git a/db/write_controller.cc b/db/write_controller.cc index d6c379fd6..2b5d8bf8c 100644 --- a/db/write_controller.cc +++ b/db/write_controller.cc @@ -44,7 +44,7 @@ uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) { if (total_stopped_ > 0) { return 0; } - if (total_delayed_ == 0) { + if (total_delayed_.load() == 0) { return 0; } @@ -115,7 +115,7 @@ StopWriteToken::~StopWriteToken() { DelayWriteToken::~DelayWriteToken() { controller_->total_delayed_--; - assert(controller_->total_delayed_ >= 0); + assert(controller_->total_delayed_.load() >= 0); } CompactionPressureToken::~CompactionPressureToken() { diff --git a/db/write_controller.h b/db/write_controller.h index b84092ca6..b3f6ba8c7 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -7,6 +7,7 @@ #include +#include #include namespace rocksdb { @@ -45,7 +46,7 @@ class WriteController { // these three metods are querying the state of the WriteController bool IsStopped() const; - bool NeedsDelay() const { return total_delayed_ > 0; } + bool NeedsDelay() const { return total_delayed_.load() > 0; } bool NeedSpeedupCompaction() const { return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0; } @@ -87,7 +88,7 @@ class WriteController { friend class CompactionPressureToken; int total_stopped_; - int total_delayed_; + std::atomic total_delayed_; int total_compaction_pressure_; uint64_t bytes_left_; uint64_t last_refill_time_; diff --git a/util/stop_watch.h b/util/stop_watch.h index bf2f6ebd0..de1772809 100644 --- a/util/stop_watch.h +++ b/util/stop_watch.h @@ -40,6 +40,8 @@ class StopWatch { } } + uint64_t start_time() const { return start_time_; } + private: Env* const env_; Statistics* statistics_;