|
|
|
@ -4036,7 +4036,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { |
|
|
|
|
|
|
|
|
|
if (UNLIKELY(status.ok()) && |
|
|
|
|
(write_controller_.IsStopped() || write_controller_.GetDelay() > 0)) { |
|
|
|
|
DelayWrite(expiration_time); |
|
|
|
|
status = DelayWrite(expiration_time); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (UNLIKELY(status.ok() && has_timeout && |
|
|
|
@ -4151,7 +4151,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { |
|
|
|
|
|
|
|
|
|
// REQUIRES: mutex_ is held
|
|
|
|
|
// REQUIRES: this thread is currently at the front of the writer queue
|
|
|
|
|
void DBImpl::DelayWrite(uint64_t expiration_time) { |
|
|
|
|
Status DBImpl::DelayWrite(uint64_t expiration_time) { |
|
|
|
|
StopWatch sw(env_, stats_, WRITE_STALL); |
|
|
|
|
bool has_timeout = (expiration_time > 0); |
|
|
|
|
auto delay = write_controller_.GetDelay(); |
|
|
|
@ -4161,16 +4161,18 @@ void DBImpl::DelayWrite(uint64_t expiration_time) { |
|
|
|
|
mutex_.Lock(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
while (write_controller_.IsStopped()) { |
|
|
|
|
while (bg_error_.ok() && write_controller_.IsStopped()) { |
|
|
|
|
if (has_timeout) { |
|
|
|
|
bg_cv_.TimedWait(expiration_time); |
|
|
|
|
if (env_->NowMicros() > expiration_time) { |
|
|
|
|
break; |
|
|
|
|
return Status::TimedOut(); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
bg_cv_.Wait(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return bg_error_; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Status DBImpl::ScheduleFlushes(WriteContext* context) { |
|
|
|
|