diff --git a/db/db_impl.cc b/db/db_impl.cc
index 2830ee055..4e3d7dcb9 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -4739,7 +4739,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       // for previous one. It might create a fairness issue that expiration
       // might happen for smaller writes but larger writes can go through.
       // Can optimize it if it is an issue.
-      status = DelayWrite(last_batch_group_size_);
+      status = DelayWrite(last_batch_group_size_, write_options);
       PERF_TIMER_START(write_pre_and_post_process_time);
     }
 
@@ -4749,6 +4749,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
 
   bool need_log_sync = !write_options.disableWAL && write_options.sync;
   bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
+  bool logs_getting_synced = false;
   if (status.ok()) {
     if (need_log_sync) {
       while (logs_.front().getting_synced) {
@@ -4758,6 +4759,7 @@
         assert(!log.getting_synced);
         log.getting_synced = true;
       }
+      logs_getting_synced = true;
     }
 
     // Add to log and apply to memtable. We can release the lock
@@ -4977,7 +4979,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   PERF_TIMER_START(write_pre_and_post_process_time);
 
   if (immutable_db_options_.paranoid_checks && !status.ok() &&
-      !w.CallbackFailed() && !status.IsBusy()) {
+      !w.CallbackFailed() && !status.IsBusy() && !status.IsIncomplete()) {
     mutex_.Lock();
     if (bg_error_.ok()) {
       bg_error_ = status;  // stop compaction & fail any further writes
@@ -4985,7 +4987,7 @@
     mutex_.Unlock();
   }
 
-  if (need_log_sync) {
+  if (logs_getting_synced) {
     mutex_.Lock();
     MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
     mutex_.Unlock();
@@ -5040,13 +5042,17 @@ uint64_t DBImpl::GetMaxTotalWalSize() const {
 
 // REQUIRES: mutex_ is held
 // REQUIRES: this thread is currently at the front of the writer queue
-Status DBImpl::DelayWrite(uint64_t num_bytes) {
+Status DBImpl::DelayWrite(uint64_t num_bytes,
+                          const WriteOptions& write_options) {
   uint64_t time_delayed = 0;
   bool delayed = false;
   {
     StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
     auto delay = write_controller_.GetDelay(env_, num_bytes);
     if (delay > 0) {
+      if (write_options.no_slowdown) {
+        return Status::Incomplete();
+      }
       mutex_.Unlock();
       delayed = true;
       TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
@@ -5056,11 +5062,15 @@ Status DBImpl::DelayWrite(uint64_t num_bytes) {
     }
 
     while (bg_error_.ok() && write_controller_.IsStopped()) {
+      if (write_options.no_slowdown) {
+        return Status::Incomplete();
+      }
       delayed = true;
       TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
       bg_cv_.Wait();
     }
   }
+  assert(!delayed || !write_options.no_slowdown);
   if (delayed) {
     default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_STALL_MICROS,
                                            time_delayed);
diff --git a/db/db_impl.h b/db/db_impl.h
index 6e3c11049..cfedd8103 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -648,7 +648,7 @@ class DBImpl : public DB {
 
   // num_bytes: for slowdown case, delay time is calculated based on
   // `num_bytes` going through.
-  Status DelayWrite(uint64_t num_bytes);
+  Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);
 
   Status ScheduleFlushes(WriteContext* context);
 
diff --git a/db/db_io_failure_test.cc b/db/db_io_failure_test.cc
index 3b13e7727..a0e66dd9f 100644
--- a/db/db_io_failure_test.cc
+++ b/db/db_io_failure_test.cc
@@ -33,7 +33,7 @@ TEST_F(DBIOFailureTest, DropWrites) {
     // Force out-of-space errors
    env_->drop_writes_.store(true, std::memory_order_release);
    env_->sleep_counter_.Reset();
-    env_->no_sleep_ = true;
+    env_->no_slowdown_ = true;
    for (int i = 0; i < 5; i++) {
      if (option_config_ != kUniversalCompactionMultiLevel &&
          option_config_ != kUniversalSubcompactions) {
diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc
index cc421fa08..06d140f75 100644
--- a/db/db_sst_test.cc
+++ b/db/db_sst_test.cc
@@ -322,7 +322,7 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
 
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
-  env_->no_sleep_ = true;
+  env_->no_slowdown_ = true;
   env_->time_elapse_only_sleep_ = true;
   Options options = CurrentOptions();
   options.disable_auto_compactions = true;
diff --git a/db/db_test.cc b/db/db_test.cc
index 0e6d5eee4..2ca7de3dc 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -215,6 +215,50 @@ TEST_F(DBTest, WriteEmptyBatch) {
   ASSERT_EQ("bar", Get(1, "foo"));
 }
 
+TEST_F(DBTest, SkipDelay) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = 100000;
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  for (bool sync : {true, false}) {
+    for (bool disableWAL : {true, false}) {
+      // Use a small number to ensure a large delay that is still effective
+      // when we do Put
+      // TODO(myabandeh): this is time-dependent and could potentially make
+      // the test flaky
+      auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
+      std::atomic<int> sleep_count(0);
+      rocksdb::SyncPoint::GetInstance()->SetCallBack(
+          "DBImpl::DelayWrite:Sleep",
+          [&](void* arg) { sleep_count.fetch_add(1); });
+      std::atomic<int> wait_count(0);
+      rocksdb::SyncPoint::GetInstance()->SetCallBack(
+          "DBImpl::DelayWrite:Wait",
+          [&](void* arg) { wait_count.fetch_add(1); });
+      rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+      WriteOptions wo;
+      wo.sync = sync;
+      wo.disableWAL = disableWAL;
+      wo.no_slowdown = true;
+      dbfull()->Put(wo, "foo", "bar");
+      // We need the 2nd write to trigger delay. This is because delay is
+      // estimated based on the last write size, which is 0 for the first write.
+      ASSERT_NOK(dbfull()->Put(wo, "foo2", "bar2"));
+      ASSERT_GE(sleep_count.load(), 0);
+      ASSERT_GE(wait_count.load(), 0);
+      token.reset();
+
+      token = dbfull()->TEST_write_controler().GetDelayToken(1000000000);
+      wo.no_slowdown = false;
+      ASSERT_OK(dbfull()->Put(wo, "foo3", "bar3"));
+      ASSERT_GE(sleep_count.load(), 1);
+      token.reset();
+    }
+  }
+}
+
 #ifndef ROCKSDB_LITE
 TEST_F(DBTest, ReadOnlyDB) {
   ASSERT_OK(Put("foo", "v1"));
@@ -4894,7 +4938,7 @@ TEST_F(DBTest, MergeTestTime) {
   SetPerfLevel(kEnableTime);
   this->env_->addon_time_.store(0);
   this->env_->time_elapse_only_sleep_ = true;
-  this->env_->no_sleep_ = true;
+  this->env_->no_slowdown_ = true;
   Options options = CurrentOptions();
   options.statistics = rocksdb::CreateDBStatistics();
   options.merge_operator.reset(new DelayedMergeOperator(this));
@@ -5339,7 +5383,7 @@ TEST_F(DBTest, DelayedWriteRate) {
   Options options = CurrentOptions();
   env_->SetBackgroundThreads(1, Env::LOW);
   options.env = env_;
-  env_->no_sleep_ = true;
+  env_->no_slowdown_ = true;
   options.write_buffer_size = 100000000;
   options.max_write_buffer_number = 256;
   options.max_background_compactions = 1;
@@ -5393,7 +5437,7 @@ TEST_F(DBTest, DelayedWriteRate) {
   ASSERT_LT(env_->addon_time_.load(),
             static_cast<int64_t>(estimated_sleep_time * 2));
 
-  env_->no_sleep_ = false;
+  env_->no_slowdown_ = false;
   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
   sleeping_task_low.WakeUp();
   sleeping_task_low.WaitUntilDone();
diff --git a/db/db_test_util.cc b/db/db_test_util.cc
index c54dfcc27..5508d6a13 100644
--- a/db/db_test_util.cc
+++ b/db/db_test_util.cc
@@ -19,7 +19,7 @@ SpecialEnv::SpecialEnv(Env* base)
       sleep_counter_(this),
       addon_time_(0),
       time_elapse_only_sleep_(false),
-      no_sleep_(false) {
+      no_slowdown_(false) {
   delay_sstable_sync_.store(false, std::memory_order_release);
   drop_writes_.store(false, std::memory_order_release);
   no_space_.store(false, std::memory_order_release);
diff --git a/db/db_test_util.h b/db/db_test_util.h
index 9aa86e4c7..c02a79a1e 100644
--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -432,10 +432,10 @@ class SpecialEnv : public EnvWrapper {
 
   virtual void SleepForMicroseconds(int micros) override {
     sleep_counter_.Increment();
-    if (no_sleep_ || time_elapse_only_sleep_) {
+    if (no_slowdown_ || time_elapse_only_sleep_) {
       addon_time_.fetch_add(micros);
     }
-    if (!no_sleep_) {
+    if (!no_slowdown_) {
       target()->SleepForMicroseconds(micros);
     }
   }
@@ -524,7 +524,7 @@ class SpecialEnv : public EnvWrapper {
 
   bool time_elapse_only_sleep_;
 
-  bool no_sleep_;
+  bool no_slowdown_;
 
   std::atomic<bool> is_wal_sync_thread_safe_{true};
 };
diff --git a/db/write_thread.cc b/db/write_thread.cc
index 9ee87dbd2..face62911 100644
--- a/db/write_thread.cc
+++ b/db/write_thread.cc
@@ -268,6 +268,12 @@ size_t WriteThread::EnterAsBatchGroupLeader(
       break;
     }
 
+    if (w->no_slowdown != leader->no_slowdown) {
+      // Do not mix writes that are ok with delays with ones that
+      // request failure on delays.
+      break;
+    }
+
     if (!w->disableWAL && leader->disableWAL) {
       // Do not include a write that needs WAL into a batch that has
       // WAL disabled.
diff --git a/db/write_thread.h b/db/write_thread.h
index 87ffff6f9..7cad1d0d5 100644
--- a/db/write_thread.h
+++ b/db/write_thread.h
@@ -79,6 +79,7 @@ class WriteThread {
   struct Writer {
     WriteBatch* batch;
     bool sync;
+    bool no_slowdown;
     bool disableWAL;
     bool disable_memtable;
     uint64_t log_used;  // log number that this batch was inserted into
@@ -99,6 +100,7 @@ class WriteThread {
     Writer()
         : batch(nullptr),
           sync(false),
+          no_slowdown(false),
          disableWAL(false),
          disable_memtable(false),
          log_used(0),
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index fe8b7128a..8906c5fbd 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1593,11 +1593,16 @@ struct WriteOptions {
   // Default: false
   bool ignore_missing_column_families;
 
+  // If true and we need to wait or sleep for the write request, the write
+  // fails immediately with Status::Incomplete().
+  bool no_slowdown;
+
   WriteOptions()
       : sync(false),
         disableWAL(false),
         timeout_hint_us(0),
-        ignore_missing_column_families(false) {}
+        ignore_missing_column_families(false),
+        no_slowdown(false) {}
 };
 
 // Options that control flush operations
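
Usage note: with no_slowdown set, a write that would otherwise be delayed or stopped by the write controller fails fast with Status::Incomplete() instead of blocking, and EnterAsBatchGroupLeader will not batch it with writers that are willing to wait. Below is a minimal client-side sketch of the new flag; the PutNoStall helper and its fallback policy are illustrative assumptions, not part of this patch:

#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Hypothetical helper: a latency-sensitive write path that refuses to block
// on write stalls and optionally falls back to a normal, blocking write.
rocksdb::Status PutNoStall(rocksdb::DB* db, const rocksdb::Slice& key,
                           const rocksdb::Slice& value,
                           bool fall_back_to_blocking) {
  rocksdb::WriteOptions wo;
  wo.no_slowdown = true;  // fail with Status::Incomplete() instead of stalling
  rocksdb::Status s = db->Put(wo, key, value);
  if (s.IsIncomplete() && fall_back_to_blocking) {
    // The write controller had a delay or stop in effect; retry as a write
    // that is allowed to wait out the stall.
    wo.no_slowdown = false;
    s = db->Put(wo, key, value);
  }
  return s;
}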