From ca0ef54f16fadd8375156a6b97045f4cb7261d4a Mon Sep 17 00:00:00 2001
From: Hui Xiao
Date: Tue, 8 Mar 2022 13:19:39 -0800
Subject: [PATCH] Rate-limit automatic WAL flush after each user write (#9607)

Summary: **Context:** WAL flush is currently not rate-limited by `Options::rate_limiter`. This PR is to provide rate-limiting to auto WAL flush, the one that automatically happens after each user write operation (i.e., `Options::manual_wal_flush == false`), by adding `WriteOptions::rate_limiter_priority`. Note that we are NOT rate-limiting WAL flushes that do NOT automatically happen after each user write, such as `Options::manual_wal_flush == true + manual FlushWAL()` (rate-limiting multiple WAL flushes), for the benefits of: - being consistent with [ReadOptions::rate_limiter_priority](https://github.com/facebook/rocksdb/blob/7.0.fb/include/rocksdb/options.h#L515) - being able to turn off some WAL flushes' rate-limiting but not all (e.g., turn off rate-limiting for the WAL flush of a critical user write like a service's heartbeat) `WriteOptions::rate_limiter_priority` only accepts `Env::IO_USER` and `Env::IO_TOTAL` currently due to an implementation constraint. - The constraint is that we currently queue parallel writes (including WAL writes) based on FIFO policy which does not factor rate limiter priority into this layer's scheduling. If we allowed lower priorities such as `Env::IO_HIGH/MID/LOW` and writes specified with lower priorities occurred before ones specified with higher priorities (even just by a tiny bit in arrival time), the former would block the latter, leading to a "priority inversion" issue and contradicting what we promise for rate-limiting priority. Therefore we only allow `Env::IO_USER` and `Env::IO_TOTAL` right now before improving that scheduling. A pre-requisite to this feature is to support operation-level rate limiting in `WritableFileWriter`, which is also included in this PR. 
**Summary:** - Renamed test suite `DBRateLimiterTest` to `DBRateLimiterOnReadTest` to make room for adding a new test suite - Accepted `rate_limiter_priority` in `WritableFileWriter`'s private and public write functions - Passed `WriteOptions::rate_limiter_priority` to `WritableFileWriter` in the path of automatic WAL flush. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9607 Test Plan: - Added new unit test to verify existing flush/compaction rate-limiting does not break, since `DBTest, RateLimitingTest` is disabled and current db-level rate-limiting tests focus on read only (e.g., `db_rate_limiter_test`, `DBTest2, RateLimitedCompactionReads`). - Added new unit test `DBRateLimiterOnWriteWALTest, AutoWalFlush` - `strace -ftt -e trace=write ./db_bench -benchmarks=fillseq -db=/dev/shm/testdb -rate_limit_auto_wal_flush=1 -rate_limiter_bytes_per_sec=15 -rate_limiter_refill_period_us=1000000 -write_buffer_size=100000000 -disable_auto_compactions=1 -num=100` - verified that WAL flushes (i.e., the system call _write_) were chunked into 15 bytes and each _write_ was roughly 1 second apart - verified the chunking disappeared when `-rate_limit_auto_wal_flush=0` - crash test: `python3 tools/db_crashtest.py blackbox --disable_wal=0 --rate_limit_auto_wal_flush=1 --rate_limiter_bytes_per_sec=10485760 --interval=10` killed as normal **Benchmarked on flush/compaction to ensure no performance regression:** - compaction with rate-limiting (see table 1, avg over 1280-run): pre-change: **915635 micros/op**; post-change: **907350 micros/op (improved by 0.106%)** ``` #!/bin/bash TEST_TMPDIR=/dev/shm/testdb START=1 NUM_DATA_ENTRY=8 N=10 rm -f compact_bmk_output.txt compact_bmk_output_2.txt dont_care_output.txt for i in $(eval echo "{$START..$NUM_DATA_ENTRY}") do NUM_RUN=$(($N*(2**($i-1)))) for j in $(eval echo "{$START..$NUM_RUN}") do ./db_bench --benchmarks=fillrandom -db=$TEST_TMPDIR -disable_auto_compactions=1 -write_buffer_size=6710886 > dont_care_output.txt && ./db_bench --benchmarks=compact 
-use_existing_db=1 -db=$TEST_TMPDIR -level0_file_num_compaction_trigger=1 -rate_limiter_bytes_per_sec=100000000 | egrep 'compact' done > compact_bmk_output.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' compact_bmk_output.txt >> compact_bmk_output_2.txt done ``` - compaction w/o rate-limiting (see table 2, avg over 640-run): pre-change: **822197 micros/op**; post-change: **823148 micros/op (regressed by 0.12%)** ``` Same as above script, except that -rate_limiter_bytes_per_sec=0 ``` - flush with rate-limiting (see table 3, avg over 320-run, run on the [patch](https://github.com/hx235/rocksdb/commit/ee5c6023a9f6533fab9afdc681568daa21da4953) to augment current db_bench ): pre-change: **745752 micros/op**; post-change: **745331 micros/op (regressed by 0.06 %)** ``` #!/bin/bash TEST_TMPDIR=/dev/shm/testdb START=1 NUM_DATA_ENTRY=8 N=10 rm -f flush_bmk_output.txt flush_bmk_output_2.txt for i in $(eval echo "{$START..$NUM_DATA_ENTRY}") do NUM_RUN=$(($N*(2**($i-1)))) for j in $(eval echo "{$START..$NUM_RUN}") do ./db_bench -db=$TEST_TMPDIR -write_buffer_size=1048576000 -num=1000000 -rate_limiter_bytes_per_sec=100000000 -benchmarks=fillseq,flush | egrep 'flush' done > flush_bmk_output.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' flush_bmk_output.txt >> flush_bmk_output_2.txt done ``` - flush w/o rate-limiting (see table 4, avg over 320-run, run on the [patch](https://github.com/hx235/rocksdb/commit/ee5c6023a9f6533fab9afdc681568daa21da4953) to augment current db_bench): pre-change: **487512 micros/op**, post-change: **485856 micors/ops (improved by 0.34%)** ``` Same as above script, except that -rate_limiter_bytes_per_sec=0 ``` | table 1 - compact with rate-limiting| #-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%) -- | -- | -- | -- | -- | -- 10 | 
896978 | 16046.9 | 901242 | 15670.9 | 0.475373978 20 | 893718 | 15813 | 886505 | 17544.7 | -0.8070778478 40 | 900426 | 23882.2 | 894958 | 15104.5 | -0.6072681153 80 | 906635 | 21761.5 | 903332 | 23948.3 | -0.3643141948 160 | 898632 | 21098.9 | 907583 | 21145 | 0.9960695813 3.20E+02 | 905252 | 22785.5 | 908106 | 25325.5 | 0.3152713278 6.40E+02 | 905213 | 23598.6 | 906741 | 21370.5 | 0.1688000504 **1.28E+03** | **908316** | **23533.1** | **907350** | **24626.8** | **-0.1063506533** average over #-run | 901896.25 | 21064.9625 | 901977.125 | 20592.025 | 0.008967217682 | table 2 - compact w/o rate-limiting| #-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%) -- | -- | -- | -- | -- | -- 10 | 811211 | 26996.7 | 807586 | 28456.4 | -0.4468627768 20 | 815465 | 14803.7 | 814608 | 28719.7 | -0.105093413 40 | 809203 | 26187.1 | 797835 | 25492.1 | -1.404839082 80 | 822088 | 28765.3 | 822192 | 32840.4 | 0.01265071379 160 | 821719 | 36344.7 | 821664 | 29544.9 | -0.006693285661 3.20E+02 | 820921 | 27756.4 | 821403 | 28347.7 | 0.05871454135 **6.40E+02** | **822197** | **28960.6** | **823148** | **30055.1** | **0.1156657103** average over #-run | 8.18E+05 | 2.71E+04 | 8.15E+05 | 2.91E+04 | -0.25 | table 3 - flush with rate-limiting| #-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%) -- | -- | -- | -- | -- | -- 10 | 741721 | 11770.8 | 740345 | 5949.76 | -0.1855144994 20 | 735169 | 3561.83 | 743199 | 9755.77 | 1.09226586 40 | 743368 | 8891.03 | 742102 | 8683.22 | -0.1703059588 80 | 742129 | 8148.51 | 743417 | 9631.58| 0.1735547324 160 | 749045 | 9757.21 | 746256 | 9191.86 | -0.3723407806 **3.20E+02** | **745752** | **9819.65** | **745331** | **9840.62** | **-0.0564530836** 6.40E+02 | 749006 | 11080.5 | 748173 | 10578.7 | -0.1112140624 average over #-run | 743741.4286 | 9004.218571 | 744117.5714 | 9090.215714 | 0.05057441238 | table 4 - 
flush w/o rate-limiting| #-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%) -- | -- | -- | -- | -- | -- 10 | 477283 | 24719.6 | 473864 | 12379 | -0.7163464863 20 | 486743 | 20175.2 | 502296 | 23931.3 | 3.195320734 40 | 482846 | 15309.2 | 489820 | 22259.5 | 1.444352858 80 | 491490 | 21883.1 | 490071 | 23085.7 | -0.2887139108 160 | 493347 | 28074.3 | 483609 | 21211.7 | -1.973864238 **3.20E+02** | **487512** | **21401.5** | **485856** | **22195.2** | **-0.3396839462** 6.40E+02 | 490307 | 25418.6 | 485435 | 22405.2 | -0.9936631539 average over #-run | 4.87E+05 | 2.24E+04 | 4.87E+05 | 2.11E+04 | 0.00E+00 Reviewed By: ajkr Differential Revision: D34442441 Pulled By: hx235 fbshipit-source-id: 4790f13e1e5c0a95ae1d1cc93ffcf69dc6e78bdd --- HISTORY.md | 1 + db/db_impl/db_impl.h | 6 +- db/db_impl/db_impl_open.cc | 3 +- db/db_impl/db_impl_write.cc | 23 ++- db/db_rate_limiter_test.cc | 206 ++++++++++++++++++++++-- db/log_writer.cc | 18 ++- db/log_writer.h | 8 +- db/write_thread.cc | 5 + db/write_thread.h | 3 + db_stress_tool/db_stress_common.h | 1 + db_stress_tool/db_stress_gflags.cc | 6 + db_stress_tool/db_stress_test_base.cc | 6 + db_stress_tool/no_batched_ops_stress.cc | 3 + file/writable_file_writer.cc | 94 +++++++---- file/writable_file_writer.h | 24 ++- include/rocksdb/env.h | 10 +- include/rocksdb/file_system.h | 11 ++ include/rocksdb/options.h | 18 ++- tools/db_bench_tool.cc | 8 + 19 files changed, 388 insertions(+), 66 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 52cba2724..03720b69f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,7 @@ ### Public API changes * Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect. * `options.compression_per_level` is dynamically changeable with `SetOptions()`. +* Added `WriteOptions::rate_limiter_priority`. 
When set to something other than `Env::IO_TOTAL`, the internal rate limiter (`DBOptions::rate_limiter`) will be charged at the specified priority for writes associated with the API to which the `WriteOptions` was provided. Currently the support covers automatic WAL flushes, which happen during live updates (`Put()`, `Write()`, `Delete()`, etc.) when `WriteOptions::disableWAL == false` and `DBOptions::manual_wal_flush == false`. ### Bug Fixes * Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread. diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index d1e95c75a..c28248708 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1769,8 +1769,12 @@ class DBImpl : public DB { WriteBatch* tmp_batch, size_t* write_with_wal, WriteBatch** to_be_cached_state); + // rate_limiter_priority is used to charge `DBOptions::rate_limiter` + // for automatic WAL flush (`Options::manual_wal_flush` == false) + // associated with this WriteToWAL IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, - uint64_t* log_used, uint64_t* log_size); + uint64_t* log_used, uint64_t* log_size, + Env::IOPriority rate_limiter_priority); IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, uint64_t* log_used, diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index ee79eb9f0..5dd8be655 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1729,7 +1729,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, WriteOptions write_options; uint64_t log_used, log_size; log::Writer* log_writer = impl->logs_.back().writer; - s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size); + s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size, + Env::IO_TOTAL); if (s.ok()) { // Need to fsync, otherwise it might get lost after a power reset. 
s = impl->FlushWAL(false); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index a5bd89a7e..61910ede4 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -132,6 +132,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::Corruption("Batch is nullptr!"); } else if (WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) { return Status::InvalidArgument("write batch must have timestamp(s) set"); + } else if (write_options.rate_limiter_priority != Env::IO_TOTAL && + write_options.rate_limiter_priority != Env::IO_USER) { + return Status::InvalidArgument( + "WriteOptions::rate_limiter_priority only allows " + "Env::IO_TOTAL and Env::IO_USER due to implementation constraints"); + } else if (write_options.rate_limiter_priority != Env::IO_TOTAL && + (write_options.disableWAL || manual_wal_flush_)) { + return Status::InvalidArgument( + "WriteOptions::rate_limiter_priority currently only supports " + "rate-limiting automatic WAL flush, which requires " + "`WriteOptions::disableWAL` and " + "`DBOptions::manual_wal_flush` both set to false"); } // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock // grabs but does not seem thread-safe. @@ -1147,7 +1159,8 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, // write thread. Otherwise this must be called holding log_write_mutex_. 
IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, - uint64_t* log_size) { + uint64_t* log_size, + Env::IOPriority rate_limiter_priority) { assert(log_size != nullptr); Slice log_entry = WriteBatchInternal::Contents(&merged_batch); *log_size = log_entry.size(); @@ -1162,7 +1175,7 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, if (UNLIKELY(needs_locking)) { log_write_mutex_.Lock(); } - IOStatus io_s = log_writer->AddRecord(log_entry); + IOStatus io_s = log_writer->AddRecord(log_entry, rate_limiter_priority); if (UNLIKELY(needs_locking)) { log_write_mutex_.Unlock(); @@ -1200,7 +1213,8 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, WriteBatchInternal::SetSequence(merged_batch, sequence); uint64_t log_size; - io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, + write_group.leader->rate_limiter_priority); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -1294,7 +1308,8 @@ IOStatus DBImpl::ConcurrentWriteToWAL( log::Writer* log_writer = logs_.back().writer; uint64_t log_size; - io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, + write_group.leader->rate_limiter_priority); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; diff --git a/db/db_rate_limiter_test.cc b/db/db_rate_limiter_test.cc index 7b200add1..f30af1974 100644 --- a/db/db_rate_limiter_test.cc +++ b/db/db_rate_limiter_test.cc @@ -3,18 +3,26 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). 
+#include + +#include +#include + #include "db/db_test_util.h" #include "port/stack_trace.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "test_util/testharness.h" #include "util/file_checksum_helper.h" namespace ROCKSDB_NAMESPACE { -class DBRateLimiterTest +class DBRateLimiterOnReadTest : public DBTestBase, public ::testing::WithParamInterface> { public: - DBRateLimiterTest() - : DBTestBase("db_rate_limiter_test", /*env_do_fsync=*/false), + explicit DBRateLimiterOnReadTest() + : DBTestBase("db_rate_limiter_on_read_test", /*env_do_fsync=*/false), use_direct_io_(std::get<0>(GetParam())), use_block_cache_(std::get<1>(GetParam())), use_readahead_(std::get<2>(GetParam())) {} @@ -89,20 +97,20 @@ std::string GetTestNameSuffix( } #ifndef ROCKSDB_LITE -INSTANTIATE_TEST_CASE_P(DBRateLimiterTest, DBRateLimiterTest, +INSTANTIATE_TEST_CASE_P(DBRateLimiterOnReadTest, DBRateLimiterOnReadTest, ::testing::Combine(::testing::Bool(), ::testing::Bool(), ::testing::Bool()), GetTestNameSuffix); #else // ROCKSDB_LITE // Cannot use direct I/O in lite mode. -INSTANTIATE_TEST_CASE_P(DBRateLimiterTest, DBRateLimiterTest, +INSTANTIATE_TEST_CASE_P(DBRateLimiterOnReadTest, DBRateLimiterOnReadTest, ::testing::Combine(::testing::Values(false), ::testing::Bool(), ::testing::Bool()), GetTestNameSuffix); #endif // ROCKSDB_LITE -TEST_P(DBRateLimiterTest, Get) { +TEST_P(DBRateLimiterOnReadTest, Get) { if (use_direct_io_ && !IsDirectIOSupported()) { return; } @@ -130,7 +138,7 @@ TEST_P(DBRateLimiterTest, Get) { } } -TEST_P(DBRateLimiterTest, NewMultiGet) { +TEST_P(DBRateLimiterOnReadTest, NewMultiGet) { // The new void-returning `MultiGet()` APIs use `MultiRead()`, which does not // yet support rate limiting. 
if (use_direct_io_ && !IsDirectIOSupported()) { @@ -161,7 +169,7 @@ TEST_P(DBRateLimiterTest, NewMultiGet) { ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); } -TEST_P(DBRateLimiterTest, OldMultiGet) { +TEST_P(DBRateLimiterOnReadTest, OldMultiGet) { // The old `vector`-returning `MultiGet()` APIs use `Read()`, which // supports rate limiting. if (use_direct_io_ && !IsDirectIOSupported()) { @@ -193,7 +201,7 @@ TEST_P(DBRateLimiterTest, OldMultiGet) { ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); } -TEST_P(DBRateLimiterTest, Iterator) { +TEST_P(DBRateLimiterOnReadTest, Iterator) { if (use_direct_io_ && !IsDirectIOSupported()) { return; } @@ -223,7 +231,7 @@ TEST_P(DBRateLimiterTest, Iterator) { #if !defined(ROCKSDB_LITE) -TEST_P(DBRateLimiterTest, VerifyChecksum) { +TEST_P(DBRateLimiterOnReadTest, VerifyChecksum) { if (use_direct_io_ && !IsDirectIOSupported()) { return; } @@ -237,7 +245,7 @@ TEST_P(DBRateLimiterTest, VerifyChecksum) { ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); } -TEST_P(DBRateLimiterTest, VerifyFileChecksums) { +TEST_P(DBRateLimiterOnReadTest, VerifyFileChecksums) { if (use_direct_io_ && !IsDirectIOSupported()) { return; } @@ -253,6 +261,182 @@ TEST_P(DBRateLimiterTest, VerifyFileChecksums) { #endif // !defined(ROCKSDB_LITE) +class DBRateLimiterOnWriteTest : public DBTestBase { + public: + explicit DBRateLimiterOnWriteTest() + : DBTestBase("db_rate_limiter_on_write_test", /*env_do_fsync=*/false) {} + + void Init() { + options_ = GetOptions(); + ASSERT_OK(TryReopenWithColumnFamilies({"default"}, options_)); + Random rnd(301); + for (int i = 0; i < kNumFiles; i++) { + ASSERT_OK(Put(0, kStartKey, rnd.RandomString(2))); + ASSERT_OK(Put(0, kEndKey, rnd.RandomString(2))); + ASSERT_OK(Flush(0)); + } + } + + Options GetOptions() { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.rate_limiter.reset(NewGenericRateLimiter( + 1 << 20 /* 
rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kWritesOnly)); + options.table_factory.reset( + NewBlockBasedTableFactory(BlockBasedTableOptions())); + return options; + } + + protected: + inline const static int64_t kNumFiles = 3; + inline const static std::string kStartKey = "a"; + inline const static std::string kEndKey = "b"; + Options options_; +}; + +TEST_F(DBRateLimiterOnWriteTest, Flush) { + std::int64_t prev_total_request = 0; + + Init(); + + std::int64_t actual_flush_request = + options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL) - + prev_total_request; + std::int64_t exepcted_flush_request = kNumFiles; + EXPECT_EQ(actual_flush_request, exepcted_flush_request); + EXPECT_EQ(actual_flush_request, + options_.rate_limiter->GetTotalRequests(Env::IO_HIGH)); +} + +TEST_F(DBRateLimiterOnWriteTest, Compact) { + Init(); + + // Pre-comaction: + // level-0 : `kNumFiles` SST files overlapping on [kStartKey, kEndKey] +#ifndef ROCKSDB_LITE + std::string files_per_level_pre_compaction = std::to_string(kNumFiles); + ASSERT_EQ(files_per_level_pre_compaction, FilesPerLevel(0 /* cf */)); +#endif // !ROCKSDB_LITE + + std::int64_t prev_total_request = + options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL); + ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_LOW)); + + Compact(kStartKey, kEndKey); + + std::int64_t actual_compaction_request = + options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL) - + prev_total_request; + + // Post-comaction: + // level-0 : 0 SST file + // level-1 : 1 SST file +#ifndef ROCKSDB_LITE + std::string files_per_level_post_compaction = "0,1"; + ASSERT_EQ(files_per_level_post_compaction, FilesPerLevel(0 /* cf */)); +#endif // !ROCKSDB_LITE + + std::int64_t exepcted_compaction_request = 1; + EXPECT_EQ(actual_compaction_request, exepcted_compaction_request); + EXPECT_EQ(actual_compaction_request, + options_.rate_limiter->GetTotalRequests(Env::IO_LOW)); +} + +class 
DBRateLimiterOnWriteWALTest + : public DBRateLimiterOnWriteTest, + public ::testing::WithParamInterface> { + public: + static std::string GetTestNameSuffix( + ::testing::TestParamInfo> info) { + std::ostringstream oss; + if (std::get<0>(info.param)) { + oss << "DisableWAL"; + } else { + oss << "EnableWAL"; + } + if (std::get<1>(info.param)) { + oss << "_ManualWALFlush"; + } else { + oss << "_AutoWALFlush"; + } + if (std::get<2>(info.param) == Env::IO_USER) { + oss << "_RateLimitAutoWALFlush"; + } else if (std::get<2>(info.param) == Env::IO_TOTAL) { + oss << "_NoRateLimitAutoWALFlush"; + } else { + oss << "_RateLimitAutoWALFlushWithIncorrectPriority"; + } + return oss.str(); + } + + explicit DBRateLimiterOnWriteWALTest() + : disable_wal_(std::get<0>(GetParam())), + manual_wal_flush_(std::get<1>(GetParam())), + rate_limiter_priority_(std::get<2>(GetParam())) {} + + void Init() { + options_ = GetOptions(); + options_.manual_wal_flush = manual_wal_flush_; + Reopen(options_); + } + + WriteOptions GetWriteOptions() { + WriteOptions write_options; + write_options.disableWAL = disable_wal_; + write_options.rate_limiter_priority = rate_limiter_priority_; + return write_options; + } + + protected: + bool disable_wal_; + bool manual_wal_flush_; + Env::IOPriority rate_limiter_priority_; +}; + +INSTANTIATE_TEST_CASE_P( + DBRateLimiterOnWriteWALTest, DBRateLimiterOnWriteWALTest, + ::testing::Values(std::make_tuple(false, false, Env::IO_TOTAL), + std::make_tuple(false, false, Env::IO_USER), + std::make_tuple(false, false, Env::IO_HIGH), + std::make_tuple(false, true, Env::IO_USER), + std::make_tuple(true, false, Env::IO_USER)), + DBRateLimiterOnWriteWALTest::GetTestNameSuffix); + +TEST_P(DBRateLimiterOnWriteWALTest, AutoWalFlush) { + Init(); + + const bool no_rate_limit_auto_wal_flush = + (rate_limiter_priority_ == Env::IO_TOTAL); + const bool valid_arg = (rate_limiter_priority_ == Env::IO_USER && + !disable_wal_ && !manual_wal_flush_); + + std::int64_t prev_total_request = + 
options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL); + ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + + Status s = Put("foo", "v1", GetWriteOptions()); + + if (no_rate_limit_auto_wal_flush || valid_arg) { + EXPECT_TRUE(s.ok()); + } else { + EXPECT_TRUE(s.IsInvalidArgument()); + EXPECT_TRUE(s.ToString().find("WriteOptions::rate_limiter_priority") != + std::string::npos); + } + + std::int64_t actual_auto_wal_flush_request = + options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL) - + prev_total_request; + std::int64_t expected_auto_wal_flush_request = valid_arg ? 1 : 0; + + EXPECT_EQ(actual_auto_wal_flush_request, expected_auto_wal_flush_request); + EXPECT_EQ(actual_auto_wal_flush_request, + options_.rate_limiter->GetTotalRequests(Env::IO_USER)); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/log_writer.cc b/db/log_writer.cc index 410c634cc..77b82950a 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -50,7 +50,8 @@ IOStatus Writer::Close() { return s; } -IOStatus Writer::AddRecord(const Slice& slice) { +IOStatus Writer::AddRecord(const Slice& slice, + Env::IOPriority rate_limiter_priority) { const char* ptr = slice.data(); size_t left = slice.size(); @@ -73,7 +74,8 @@ IOStatus Writer::AddRecord(const Slice& slice) { // kRecyclableHeaderSize being <= 11) assert(header_size <= 11); s = dest_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", - static_cast(leftover))); + static_cast(leftover)), + 0 /* crc32c_checksum */, rate_limiter_priority); if (!s.ok()) { break; } @@ -99,7 +101,7 @@ IOStatus Writer::AddRecord(const Slice& slice) { type = recycle_log_files_ ? 
kRecyclableMiddleType : kMiddleType; } - s = EmitPhysicalRecord(type, ptr, fragment_length); + s = EmitPhysicalRecord(type, ptr, fragment_length, rate_limiter_priority); ptr += fragment_length; left -= fragment_length; begin = false; @@ -107,7 +109,7 @@ IOStatus Writer::AddRecord(const Slice& slice) { if (s.ok()) { if (!manual_flush_) { - s = dest_->Flush(); + s = dest_->Flush(rate_limiter_priority); } } @@ -141,7 +143,8 @@ IOStatus Writer::AddCompressionTypeRecord() { bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); } -IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { +IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, + Env::IOPriority rate_limiter_priority) { assert(n <= 0xffff); // Must fit in two bytes size_t header_size; @@ -180,9 +183,10 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { EncodeFixed32(buf, crc); // Write the header and the payload - IOStatus s = dest_->Append(Slice(buf, header_size)); + IOStatus s = dest_->Append(Slice(buf, header_size), 0 /* crc32c_checksum */, + rate_limiter_priority); if (s.ok()) { - s = dest_->Append(Slice(ptr, n), payload_crc); + s = dest_->Append(Slice(ptr, n), payload_crc, rate_limiter_priority); } block_offset_ += header_size + n; return s; diff --git a/db/log_writer.h b/db/log_writer.h index 8f60049db..ec0e4788e 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -13,6 +13,7 @@ #include "db/log_format.h" #include "rocksdb/compression_type.h" +#include "rocksdb/env.h" #include "rocksdb/io_status.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" @@ -81,7 +82,8 @@ class Writer { ~Writer(); - IOStatus AddRecord(const Slice& slice); + IOStatus AddRecord(const Slice& slice, + Env::IOPriority rate_limiter_priority = Env::IO_TOTAL); IOStatus AddCompressionTypeRecord(); WritableFileWriter* file() { return dest_.get(); } @@ -106,7 +108,9 @@ class Writer { // record type stored in the header. 
uint32_t type_crc_[kMaxRecordType + 1]; - IOStatus EmitPhysicalRecord(RecordType type, const char* ptr, size_t length); + IOStatus EmitPhysicalRecord( + RecordType type, const char* ptr, size_t length, + Env::IOPriority rate_limiter_priority = Env::IO_TOTAL); // If true, it does not flush after each write. Instead it relies on the upper // layer to manually does the flush by calling ::WriteBuffer() diff --git a/db/write_thread.cc b/db/write_thread.cc index ac3a2f869..d59eba263 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -471,6 +471,11 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader, break; } + if (w->rate_limiter_priority != leader->rate_limiter_priority) { + // Do not mix writes with different rate limiter priorities. + break; + } + if (w->batch == nullptr) { // Do not include those writes with nullptr batch. Those are not writes, // those are something else. They want to be alone diff --git a/db/write_thread.h b/db/write_thread.h index ac2c8696d..af4d0967e 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -117,6 +117,7 @@ class WriteThread { bool sync; bool no_slowdown; bool disable_wal; + Env::IOPriority rate_limiter_priority; bool disable_memtable; size_t batch_cnt; // if non-zero, number of sub-batches in the write batch size_t protection_bytes_per_key; @@ -141,6 +142,7 @@ class WriteThread { sync(false), no_slowdown(false), disable_wal(false), + rate_limiter_priority(Env::IOPriority::IO_TOTAL), disable_memtable(false), batch_cnt(0), protection_bytes_per_key(0), @@ -163,6 +165,7 @@ class WriteThread { sync(write_options.sync), no_slowdown(write_options.no_slowdown), disable_wal(write_options.disableWAL), + rate_limiter_priority(write_options.rate_limiter_priority), disable_memtable(_disable_memtable), batch_cnt(_batch_cnt), protection_bytes_per_key(_batch->GetProtectionBytesPerKey()), diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 5f67b0b8e..137339f6c 100644 --- 
a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -176,6 +176,7 @@ DECLARE_int32(range_deletion_width); DECLARE_uint64(rate_limiter_bytes_per_sec); DECLARE_bool(rate_limit_bg_reads); DECLARE_bool(rate_limit_user_ops); +DECLARE_bool(rate_limit_auto_wal_flush); DECLARE_uint64(sst_file_manager_bytes_per_sec); DECLARE_uint64(sst_file_manager_bytes_per_truncate); DECLARE_bool(use_txn); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 4933b35a4..c973eeb55 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -550,6 +550,12 @@ DEFINE_bool(rate_limit_user_ops, false, "When true use Env::IO_USER priority level to charge internal rate " "limiter for reads associated with user operations."); +DEFINE_bool(rate_limit_auto_wal_flush, false, + "When true use Env::IO_USER priority level to charge internal rate " + "limiter for automatic WAL flush (`Options::manual_wal_flush` == " + "false) after the user " + "write operation."); + DEFINE_uint64(sst_file_manager_bytes_per_sec, 0, "Set `Options::sst_file_manager` to delete at this rate. By " "default the deletion rate is unbounded."); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index c30a069ec..c2c793b9f 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -485,6 +485,9 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, if (FLAGS_sync) { write_opts.sync = true; } + if (FLAGS_rate_limit_auto_wal_flush) { + write_opts.rate_limiter_priority = Env::IO_USER; + } char value[100]; int cf_idx = 0; Status s; @@ -640,6 +643,9 @@ void StressTest::OperateDb(ThreadState* thread) { read_opts.rate_limiter_priority = FLAGS_rate_limit_user_ops ? 
Env::IO_USER : Env::IO_TOTAL; WriteOptions write_opts; + if (FLAGS_rate_limit_auto_wal_flush) { + write_opts.rate_limiter_priority = Env::IO_USER; + } auto shared = thread->shared; char value[100]; std::string from_db; diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 02a71e30a..b8b51d81e 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -271,6 +271,9 @@ class NonBatchedOpsStressTest : public StressTest { Transaction* txn = nullptr; if (use_txn) { WriteOptions wo; + if (FLAGS_rate_limit_auto_wal_flush) { + wo.rate_limiter_priority = Env::IO_USER; + } Status s = NewTxn(wo, &txn); if (!s.ok()) { fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str()); diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index f345c6572..84be9b689 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -41,8 +41,8 @@ IOStatus WritableFileWriter::Create(const std::shared_ptr& fs, return io_s; } -IOStatus WritableFileWriter::Append(const Slice& data, - uint32_t crc32c_checksum) { +IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, + Env::IOPriority op_rate_limiter_priority) { const char* src = data.data(); size_t left = data.size(); IOStatus s; @@ -79,7 +79,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, // Flush only when buffered I/O if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) { if (buf_.CurrentSize() > 0) { - s = Flush(); + s = Flush(op_rate_limiter_priority); if (!s.ok()) { return s; } @@ -109,7 +109,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, src += appended; if (left > 0) { - s = Flush(); + s = Flush(op_rate_limiter_priority); if (!s.ok()) { break; } @@ -119,7 +119,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, } else { assert(buf_.CurrentSize() == 0); buffered_data_crc32c_checksum_ = crc32c_checksum; - s = WriteBufferedWithChecksum(src, 
left); + s = WriteBufferedWithChecksum(src, left, op_rate_limiter_priority); } } else { // In this case, either we do not need to do the data verification or @@ -139,7 +139,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, src += appended; if (left > 0) { - s = Flush(); + s = Flush(op_rate_limiter_priority); if (!s.ok()) { break; } @@ -150,9 +150,9 @@ IOStatus WritableFileWriter::Append(const Slice& data, assert(buf_.CurrentSize() == 0); if (perform_data_verification_ && buffered_data_with_checksum_) { buffered_data_crc32c_checksum_ = crc32c::Value(src, left); - s = WriteBufferedWithChecksum(src, left); + s = WriteBufferedWithChecksum(src, left, op_rate_limiter_priority); } else { - s = WriteBuffered(src, left); + s = WriteBuffered(src, left, op_rate_limiter_priority); } } } @@ -164,7 +164,8 @@ IOStatus WritableFileWriter::Append(const Slice& data, return s; } -IOStatus WritableFileWriter::Pad(const size_t pad_bytes) { +IOStatus WritableFileWriter::Pad(const size_t pad_bytes, + Env::IOPriority op_rate_limiter_priority) { assert(pad_bytes < kDefaultPageSize); size_t left = pad_bytes; size_t cap = buf_.Capacity() - buf_.CurrentSize(); @@ -178,7 +179,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes) { buf_.PadWith(append_bytes, 0); left -= append_bytes; if (left > 0) { - IOStatus s = Flush(); + IOStatus s = Flush(op_rate_limiter_priority); if (!s.ok()) { return s; } @@ -294,7 +295,7 @@ IOStatus WritableFileWriter::Close() { // write out the cached data to the OS cache or storage if direct I/O // enabled -IOStatus WritableFileWriter::Flush() { +IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { IOStatus s; TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2); @@ -303,17 +304,19 @@ IOStatus WritableFileWriter::Flush() { #ifndef ROCKSDB_LITE if (pending_sync_) { if (perform_data_verification_ && buffered_data_with_checksum_) { - s = WriteDirectWithChecksum(); + s = 
WriteDirectWithChecksum(op_rate_limiter_priority); } else { - s = WriteDirect(); + s = WriteDirect(op_rate_limiter_priority); } } #endif // !ROCKSDB_LITE } else { if (perform_data_verification_ && buffered_data_with_checksum_) { - s = WriteBufferedWithChecksum(buf_.BufferStart(), buf_.CurrentSize()); + s = WriteBufferedWithChecksum(buf_.BufferStart(), buf_.CurrentSize(), + op_rate_limiter_priority); } else { - s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); + s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize(), + op_rate_limiter_priority); } } if (!s.ok()) { @@ -479,7 +482,8 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { // This method writes to disk the specified data and makes use of the rate // limiter if available -IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { +IOStatus WritableFileWriter::WriteBuffered( + const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { IOStatus s; assert(!use_direct_io()); const char* src = data; @@ -489,10 +493,14 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { while (left > 0) { size_t allowed; - if (rate_limiter_ != nullptr) { - allowed = rate_limiter_->RequestToken( - left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_, - RateLimiter::OpType::kWrite); + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority); + if (rate_limiter_ != nullptr && + rate_limiter_priority_used != Env::IO_TOTAL) { + allowed = rate_limiter_->RequestToken(left, 0 /* alignment */, + rate_limiter_priority_used, stats_, + RateLimiter::OpType::kWrite); } else { allowed = left; } @@ -562,8 +570,8 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { return s; } -IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data, - size_t size) { +IOStatus 
WritableFileWriter::WriteBufferedWithChecksum( + const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { IOStatus s; assert(!use_direct_io()); assert(perform_data_verification_ && buffered_data_with_checksum_); @@ -577,12 +585,15 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data, // TODO: need to be improved since it sort of defeats the purpose of the rate // limiter size_t data_size = left; - if (rate_limiter_ != nullptr) { + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority); + if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { while (data_size > 0) { size_t tmp_size; - tmp_size = rate_limiter_->RequestToken( - data_size, buf_.Alignment(), writable_file_->GetIOPriority(), stats_, - RateLimiter::OpType::kWrite); + tmp_size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), + rate_limiter_priority_used, stats_, + RateLimiter::OpType::kWrite); data_size -= tmp_size; } } @@ -674,7 +685,8 @@ void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data, // only write on aligned // offsets. 
#ifndef ROCKSDB_LITE -IOStatus WritableFileWriter::WriteDirect() { +IOStatus WritableFileWriter::WriteDirect( + Env::IOPriority op_rate_limiter_priority) { assert(use_direct_io()); IOStatus s; const size_t alignment = buf_.Alignment(); @@ -701,7 +713,11 @@ while (left > 0) { // Check how much is allowed size_t size; - if (rate_limiter_ != nullptr) { + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority); + if (rate_limiter_ != nullptr && + rate_limiter_priority_used != Env::IO_TOTAL) { - size = rate_limiter_->RequestToken(left, buf_.Alignment(), writable_file_->GetIOPriority(), stats_, RateLimiter::OpType::kWrite); + size = rate_limiter_->RequestToken(left, buf_.Alignment(), rate_limiter_priority_used, stats_, RateLimiter::OpType::kWrite); @@ -762,7 +778,8 @@ return s; } -IOStatus WritableFileWriter::WriteDirectWithChecksum() { +IOStatus WritableFileWriter::WriteDirectWithChecksum( + Env::IOPriority op_rate_limiter_priority) { assert(use_direct_io()); assert(perform_data_verification_ && buffered_data_with_checksum_); IOStatus s; @@ -798,7 +815,10 @@ // TODO: need to be improved since it sort of defeats the purpose of the rate // limiter size_t data_size = left; - if (rate_limiter_ != nullptr) { + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority); + if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { while (data_size > 0) { size_t size; size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), @@ -860,4 +880,18 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum() { return s; } #endif // !ROCKSDB_LITE +Env::IOPriority WritableFileWriter::DecideRateLimiterPriority( + Env::IOPriority writable_file_io_priority, + Env::IOPriority op_rate_limiter_priority) { + if (writable_file_io_priority == Env::IO_TOTAL && + 
op_rate_limiter_priority == Env::IO_TOTAL) { + return Env::IO_TOTAL; + } else if (writable_file_io_priority == Env::IO_TOTAL) { + return op_rate_limiter_priority; + } else if (op_rate_limiter_priority == Env::IO_TOTAL) { + return writable_file_io_priority; + } else { + return op_rate_limiter_priority; + } +} } // namespace ROCKSDB_NAMESPACE diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index ede71d218..bfc756388 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -234,11 +234,13 @@ class WritableFileWriter { // When this Append API is called, if the crc32c_checksum is not provided, we // will calculate the checksum internally. - IOStatus Append(const Slice& data, uint32_t crc32c_checksum = 0); + IOStatus Append(const Slice& data, uint32_t crc32c_checksum = 0, + Env::IOPriority op_rate_limiter_priority = Env::IO_TOTAL); - IOStatus Pad(const size_t pad_bytes); + IOStatus Pad(const size_t pad_bytes, + Env::IOPriority op_rate_limiter_priority = Env::IO_TOTAL); - IOStatus Flush(); + IOStatus Flush(Env::IOPriority op_rate_limiter_priority = Env::IO_TOTAL); IOStatus Close(); @@ -271,15 +273,21 @@ class WritableFileWriter { const char* GetFileChecksumFuncName() const; private: + static Env::IOPriority DecideRateLimiterPriority( + Env::IOPriority writable_file_io_priority, + Env::IOPriority op_rate_limiter_priority); + // Used when os buffering is OFF and we are writing // DMA such as in Direct I/O mode #ifndef ROCKSDB_LITE - IOStatus WriteDirect(); - IOStatus WriteDirectWithChecksum(); + IOStatus WriteDirect(Env::IOPriority op_rate_limiter_priority); + IOStatus WriteDirectWithChecksum(Env::IOPriority op_rate_limiter_priority); #endif // !ROCKSDB_LITE - // Normal write - IOStatus WriteBuffered(const char* data, size_t size); - IOStatus WriteBufferedWithChecksum(const char* data, size_t size); + // Normal write. 
+ IOStatus WriteBuffered(const char* data, size_t size, + Env::IOPriority op_rate_limiter_priority); + IOStatus WriteBufferedWithChecksum(const char* data, size_t size, + Env::IOPriority op_rate_limiter_priority); IOStatus RangeSync(uint64_t offset, uint64_t nbytes); IOStatus SyncInternal(bool use_fsync); }; diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 21a272a13..6966022aa 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -959,8 +959,16 @@ class WritableFile { // Use the returned alignment value to allocate // aligned buffer for Direct I/O virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; } + /* - * Change the priority in rate limiter if rate limiting is enabled. + * If rate limiting is enabled, change the file-granularity priority used in + * rate-limiting writes. + * + * In the presence of finer-granularity priority such as + * `WriteOptions::rate_limiter_priority`, this file-granularity priority may + * be overridden by a non-Env::IO_TOTAL finer-granularity priority and used as + * a fallback for Env::IO_TOTAL finer-granularity priority. + * * If rate limiting is not enabled, this call has no effect. */ virtual void SetIOPriority(Env::IOPriority pri) { io_priority_ = pri; } diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index 13c459ee8..201cee95a 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -1031,6 +1031,17 @@ class FSWritableFile { write_hint_ = hint; } + /* + * If rate limiting is enabled, change the file-granularity priority used in + * rate-limiting writes. + * + * In the presence of finer-granularity priority such as + * `WriteOptions::rate_limiter_priority`, this file-granularity priority may + * be overridden by a non-Env::IO_TOTAL finer-granularity priority and used as + * a fallback for Env::IO_TOTAL finer-granularity priority. + * + * If rate limiting is not enabled, this call has no effect. 
+ */ virtual void SetIOPriority(Env::IOPriority pri) { io_priority_ = pri; } virtual Env::IOPriority GetIOPriority() { return io_priority_; } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index edbd40b9d..44c30447f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1659,13 +1659,29 @@ struct WriteOptions { // Default: false bool memtable_insert_hint_per_batch; + // For writes associated with this option, charge the internal rate + // limiter (see `DBOptions::rate_limiter`) at the specified priority. The + // special value `Env::IO_TOTAL` disables charging the rate limiter. + // + // Currently the support covers automatic WAL flushes, which happen during + // live updates (`Put()`, `Write()`, `Delete()`, etc.) + // when `WriteOptions::disableWAL == false` + // and `DBOptions::manual_wal_flush == false`. + // + // Only `Env::IO_USER` and `Env::IO_TOTAL` are allowed + // due to implementation constraints. + // + // Default: `Env::IO_TOTAL` + Env::IOPriority rate_limiter_priority; + WriteOptions() : sync(false), disableWAL(false), ignore_missing_column_families(false), no_slowdown(false), low_pri(false), - memtable_insert_hint_per_batch(false) {} + memtable_insert_hint_per_batch(false), + rate_limiter_priority(Env::IO_TOTAL) {} }; // Options that control flush operations diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index dec6e7520..a405685a2 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1118,6 +1118,12 @@ DEFINE_bool(file_checksum, false, "When true use FileChecksumGenCrc32cFactory for " "file_checksum_gen_factory."); +DEFINE_bool(rate_limit_auto_wal_flush, false, + "When true use Env::IO_USER priority level to charge internal rate " + "limiter for automatic WAL flush (`Options::manual_wal_flush` == " + "false) after the user " + "write operation"); + static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( const char* ctype) { assert(ctype); @@ -3100,6 +3106,8 @@ class 
Benchmark { write_options_.sync = true; } write_options_.disableWAL = FLAGS_disable_wal; + write_options_.rate_limiter_priority = + FLAGS_rate_limit_auto_wal_flush ? Env::IO_USER : Env::IO_TOTAL; read_options_ = ReadOptions(FLAGS_verify_checksum, true); read_options_.total_order_seek = FLAGS_total_order_seek; read_options_.prefix_same_as_start = FLAGS_prefix_same_as_start;