diff --git a/HISTORY.md b/HISTORY.md index 7d1dad42b..5a5d63ea5 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -10,6 +10,7 @@ * Change ticker/histogram statistics implementations to use core-local storage. This improves aggregation speed compared to our previous thread-local approach, particularly for applications with many threads. * Users can pass a cache object to write buffer manager, so that they can cap memory usage for memtable and block cache using one single limit. * Flush will be triggered when 7/8 of the limit introduced by write_buffer_manager or db_write_buffer_size is triggered, so that the hard threshold is hard to hit. +* Introduce WriteOptions.low_pri. If it is true, low priority writes will be throttled if the compaction is behind. ## 5.5.0 (05/17/2017) ### New Features diff --git a/db/column_family.cc b/db/column_family.cc index bf4d8a72d..72186b706 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -760,6 +760,12 @@ void ColumnFamilyData::RecalculateWriteStallConditions( uint64_t write_rate = write_controller->delayed_write_rate(); write_controller->set_delayed_write_rate(static_cast( static_cast(write_rate) * kDelayRecoverSlowdownRatio)); + // Set the low pri limit to be 1/4 the delayed write rate. + // Note we don't reset this value even after delay condition is released. + // Low-pri rate will continue to apply if there is a compaction + // pressure. 
+ write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate / + 4); } } prev_compaction_needed_bytes_ = compaction_needed_bytes; diff --git a/db/db_impl.cc b/db/db_impl.cc index 37eb0f203..0f412a754 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -134,6 +134,8 @@ void DumpSupportInfo(Logger* logger) { ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %d", crc32c::IsFastCrc32Supported()); } + +int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024; } // namespace DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) @@ -159,6 +161,11 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), write_thread_(immutable_db_options_), write_controller_(mutable_db_options_.delayed_write_rate), + // Use delayed_write_rate as a base line to determine the initial + // low pri write rate limit. It may be adjusted later. + low_pri_write_rate_limiter_(NewGenericRateLimiter(std::min( + static_cast(mutable_db_options_.delayed_write_rate / 8), + kDefaultLowPriThrottledRate))), last_batch_group_size_(0), unscheduled_flushes_(0), unscheduled_compactions_(0), diff --git a/db/db_impl.h b/db/db_impl.h index a35312f04..fa3a294df 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -722,6 +722,9 @@ class DBImpl : public DB { // `num_bytes` going through. Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options); + Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, + WriteBatch* my_batch); + Status ScheduleFlushes(WriteContext* context); Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); @@ -941,6 +944,8 @@ class DBImpl : public DB { WriteController write_controller_; + unique_ptr low_pri_write_rate_limiter_; + // Size of the last batch group. In slowdown mode, next write needs to // sleep if it uses up the quota. 
uint64_t last_batch_group_size_; diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 3fa103ff7..477fc7469 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -66,13 +66,19 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::Corruption("Batch is nullptr!"); } + Status status; + if (write_options.low_pri) { + status = ThrottleLowPriWritesIfNeeded(write_options, my_batch); + if (!status.ok()) { + return status; + } + } + if (immutable_db_options_.enable_pipelined_write) { return PipelinedWriteImpl(write_options, my_batch, callback, log_used, log_ref, disable_memtable); } - Status status; - PERF_TIMER_GUARD(write_pre_and_post_process_time); WriteThread::Writer w(write_options, my_batch, callback, log_ref, disable_memtable); @@ -742,6 +748,34 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, return bg_error_; } +Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, + WriteBatch* my_batch) { + assert(write_options.low_pri); + // This is called outside the DB mutex. Although it is safe to make the call, + // the consistency condition is not guaranteed to hold. It's OK to live with + // it in this case. + // If we need to speed compaction, it means the compaction is left behind + // and we start to limit low pri writes to a limit. + if (write_controller_.NeedSpeedupCompaction()) { + if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) { + // For 2PC, we only rate limit prepare, not commit. + return Status::OK(); + } + if (write_options.no_slowdown) { + return Status::Incomplete(); + } else { + assert(my_batch != nullptr); + // Rate limit those writes. The reason that we don't completely wait + // is that in case the write is heavy, low pri writes may never have + // a chance to run. Now we guarantee we are still slowly making + // progress. 
+ write_controller_.low_pri_rate_limiter()->Request(my_batch->GetDataSize(), + Env::IO_HIGH, nullptr); + } + } + return Status::OK(); +} + Status DBImpl::ScheduleFlushes(WriteContext* context) { ColumnFamilyData* cfd; while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { diff --git a/db/db_test.cc b/db/db_test.cc index 93e5fe609..52ee73068 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5155,7 +5155,6 @@ TEST_F(DBTest, PauseBackgroundWorkTest) { // now it's done ASSERT_TRUE(done.load()); } - } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_test2.cc b/db/db_test2.cc index 714342a8b..42403f2aa 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2186,6 +2186,56 @@ TEST_F(DBTest2, MemtableOnlyIterator) { ASSERT_EQ(1, count); delete it; } + +TEST_F(DBTest2, LowPriWrite) { + Options options = CurrentOptions(); + // Compaction pressure should trigger since 6 files + options.level0_file_num_compaction_trigger = 4; + options.level0_slowdown_writes_trigger = 12; + options.level0_stop_writes_trigger = 30; + options.delayed_write_rate = 8 * 1024 * 1024; + Reopen(options); + + std::atomic rate_limit_count(0); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "GenericRateLimiter::Request:1", [&](void* arg) { + rate_limit_count.fetch_add(1); + int64_t* rate_bytes_per_sec = static_cast(arg); + ASSERT_EQ(1024 * 1024, *rate_bytes_per_sec); + }); + // Block compaction + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"DBTest.LowPriWrite:0", "DBImpl::BGWorkCompaction"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + WriteOptions wo; + for (int i = 0; i < 6; i++) { + wo.low_pri = false; + Put("", "", wo); + wo.low_pri = true; + Put("", "", wo); + Flush(); + } + ASSERT_EQ(0, rate_limit_count.load()); + wo.low_pri = true; + Put("", "", wo); + ASSERT_EQ(1, rate_limit_count.load()); + wo.low_pri = false; + Put("", "", wo); + ASSERT_EQ(1, rate_limit_count.load()); + + TEST_SYNC_POINT("DBTest.LowPriWrite:0"); + 
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + dbfull()->TEST_WaitForCompact(); + wo.low_pri = true; + Put("", "", wo); + ASSERT_EQ(1, rate_limit_count.load()); + wo.low_pri = false; + Put("", "", wo); + ASSERT_EQ(1, rate_limit_count.load()); +} } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/write_controller.cc b/db/write_controller.cc index 3437127d8..699044ec2 100644 --- a/db/write_controller.cc +++ b/db/write_controller.cc @@ -36,17 +36,19 @@ WriteController::GetCompactionPressureToken() { new CompactionPressureToken(this)); } -bool WriteController::IsStopped() const { return total_stopped_ > 0; } +bool WriteController::IsStopped() const { + return total_stopped_.load(std::memory_order_relaxed) > 0; +} // This is inside DB mutex, so we can't sleep and need to minimize // frequency to get time. // If it turns out to be a performance issue, we can redesign the thread // synchronization model here. // The function trust caller will sleep micros returned. 
uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) { - if (total_stopped_ > 0) { + if (total_stopped_.load(std::memory_order_relaxed) > 0) { return 0; } - if (total_delayed_.load() == 0) { + if (total_delayed_.load(std::memory_order_relaxed) == 0) { return 0; } diff --git a/db/write_controller.h b/db/write_controller.h index 8e4c20826..c344a74d8 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -11,6 +11,7 @@ #include #include +#include "rocksdb/rate_limiter.h" namespace rocksdb { @@ -23,12 +24,15 @@ class WriteControllerToken; // to be called while holding DB mutex class WriteController { public: - explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u) + explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u, + int64_t low_pri_rate_bytes_per_sec = 1024 * 1024) : total_stopped_(0), total_delayed_(0), total_compaction_pressure_(0), bytes_left_(0), - last_refill_time_(0) { + last_refill_time_(0), + low_pri_rate_limiter_( + NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) { set_max_delayed_write_rate(_delayed_write_rate); } ~WriteController() = default; @@ -80,6 +84,8 @@ class WriteController { uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; } + RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); } + private: uint64_t NowMicrosMonotonic(Env* env); @@ -88,15 +94,17 @@ class WriteController { friend class DelayWriteToken; friend class CompactionPressureToken; - int total_stopped_; + std::atomic total_stopped_; std::atomic total_delayed_; - int total_compaction_pressure_; + std::atomic total_compaction_pressure_; uint64_t bytes_left_; uint64_t last_refill_time_; // write rate set when initialization or by `DBImpl::SetDBOptions` uint64_t max_delayed_write_rate_; // current write rate uint64_t delayed_write_rate_; + + std::unique_ptr low_pri_rate_limiter_; }; class WriteControllerToken { diff --git a/include/rocksdb/options.h 
b/include/rocksdb/options.h index 75f7a7a2a..283085a53 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1079,11 +1079,21 @@ struct WriteOptions { // immediately with Status::Incomplete(). bool no_slowdown; + // If true, this write request is of lower priority if compaction is + // behind. In this case, if no_slowdown = true, the request will be + // cancelled immediately with Status::Incomplete() returned. Otherwise, it + // will be slowed down. The slowdown value is determined by RocksDB to + // guarantee it introduces minimum impacts to high priority writes. + // + // Default: false + bool low_pri; + WriteOptions() : sync(false), disableWAL(false), ignore_missing_column_families(false), - no_slowdown(false) {} + no_slowdown(false), + low_pri(false) {} }; // Options that control flush operations diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 8175745ee..064764cb6 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -79,6 +79,8 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, Statistics* stats) { assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed)); TEST_SYNC_POINT("GenericRateLimiter::Request"); + TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1", + &rate_bytes_per_sec_); MutexLock g(&request_mutex_); if (stop_) { return;