Rate-limit automatic WAL flush after each user write (#9607)

Summary:
**Context:**
WAL flush is currently not rate-limited by `Options::rate_limiter`. This PR provides rate-limiting for automatic WAL flush, i.e., the flush that automatically happens after each user write operation when `Options::manual_wal_flush == false`, by adding `WriteOptions::rate_limiter_priority`.

Note that we are NOT rate-limiting WAL flushes that do NOT automatically happen after each user write, such as `Options::manual_wal_flush == true` combined with a manual `FlushWAL()` (that is, rate-limiting multiple WAL flushes at once), for the benefits of:
- being consistent with [ReadOptions::rate_limiter_priority](https://github.com/facebook/rocksdb/blob/7.0.fb/include/rocksdb/options.h#L515)
- being able to turn off rate-limiting for some WAL flushes but not all (e.g., turn it off specifically for the WAL flush of a critical user write such as a service's heartbeat)

`WriteOptions::rate_limiter_priority` currently accepts only `Env::IO_USER` and `Env::IO_TOTAL` due to an implementation constraint.
- The constraint is that parallel writes (including WAL writes) are currently queued with a FIFO policy, which does not factor rate limiter priority into this layer's scheduling. If we allowed lower priorities such as `Env::IO_HIGH/MID/LOW`, and a write with a lower priority arrived before one with a higher priority (even by a tiny bit), the former would block the latter. That is a "priority inversion" and contradicts what we promise for rate-limiting priority. Therefore we only allow `Env::IO_USER` and `Env::IO_TOTAL` until that scheduling is improved.
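
The constraint can be illustrated with a toy model. This is not RocksDB code: `ToyWrite`, `FifoCompletionTimes`, and the wait times are hypothetical names and values chosen for illustration. Writes are served strictly in arrival order, so a low-priority write that the rate limiter stalls also delays a higher-priority write queued behind it.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical toy model of a FIFO write queue; not RocksDB code.
struct ToyWrite {
  int priority;             // larger value = higher priority (assumption)
  int64_t limiter_wait_us;  // how long the rate limiter would stall this write
};

// Serves writes strictly in arrival order, ignoring `priority`, and returns
// the completion time of each write in microseconds.
std::vector<int64_t> FifoCompletionTimes(const std::vector<ToyWrite>& queue) {
  std::vector<int64_t> done;
  done.reserve(queue.size());
  int64_t now_us = 0;
  for (const ToyWrite& w : queue) {
    assert(w.limiter_wait_us >= 0);
    now_us += w.limiter_wait_us;  // every later write waits for this one
    done.push_back(now_us);
  }
  return done;
}
```

For the queue `{{0, 1000000}, {3, 10}}`, the higher-priority second write completes at 1000010 µs instead of 10 µs, which is the inversion described above.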

A pre-requisite to this feature is to support operation-level rate limiting in `WritableFileWriter`, which is also included in this PR.

**Summary:**
- Renamed test suite `DBRateLimiterTest` to `DBRateLimiterOnReadTest` to make room for a new test suite
- Accepted `rate_limiter_priority` in `WritableFileWriter`'s private and public write functions
- Passed `WriteOptions::rate_limiter_priority` to `WritableFileWriter` in the path of automatic WAL flush

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9607

Test Plan:
- Added new unit tests to verify that existing flush/compaction rate-limiting does not break, since `DBTest, RateLimitingTest` is disabled and the current db-level rate-limiting tests focus on reads only (e.g., `db_rate_limiter_test`, `DBTest2, RateLimitedCompactionReads`).
- Added new unit test `DBRateLimiterOnWriteWALTest, AutoWalFlush`
- `strace -ftt -e trace=write ./db_bench -benchmarks=fillseq -db=/dev/shm/testdb -rate_limit_auto_wal_flush=1 -rate_limiter_bytes_per_sec=15 -rate_limiter_refill_period_us=1000000 -write_buffer_size=100000000 -disable_auto_compactions=1 -num=100`
   - verified that WAL flushes (i.e., the _write_ system calls) were chunked into 15 bytes and each _write_ was roughly 1 second apart
   - verified the chunking disappeared when `-rate_limit_auto_wal_flush=0`
- crash test: `python3 tools/db_crashtest.py blackbox --disable_wal=0  --rate_limit_auto_wal_flush=1 --rate_limiter_bytes_per_sec=10485760 --interval=10` killed as normal
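
The 15-byte, 1-second chunking seen in the `strace` run is consistent with simple arithmetic on the two rate limiter flags. The sketch below assumes that the limiter grants at most rate × refill period bytes per refill; `BytesPerRefill` and `NumChunks` are hypothetical helper names, not RocksDB functions.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: bytes granted per refill period, assuming the budget
// is rate_bytes_per_sec scaled by the refill period.
int64_t BytesPerRefill(int64_t rate_bytes_per_sec, int64_t refill_period_us) {
  assert(rate_bytes_per_sec > 0 && refill_period_us > 0);
  return rate_bytes_per_sec * refill_period_us / 1000000;
}

// Hypothetical helper: number of write() calls needed when each call is
// capped at `bytes_per_refill` bytes (ceiling division).
int64_t NumChunks(int64_t total_bytes, int64_t bytes_per_refill) {
  assert(total_bytes >= 0 && bytes_per_refill > 0);
  return (total_bytes + bytes_per_refill - 1) / bytes_per_refill;
}
```

With `-rate_limiter_bytes_per_sec=15` and `-rate_limiter_refill_period_us=1000000`, `BytesPerRefill(15, 1000000)` is 15, so an illustrative 100-byte flush would be split into `NumChunks(100, 15)` = 7 writes, one per refill period.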

**Benchmarked on flush/compaction to ensure no performance regression:**
- compaction with rate-limiting (see table 1, avg over 1280-run): pre-change: **908316 micros/op**; post-change: **907350 micros/op (improved by 0.106%)**
```
#!/bin/bash
TEST_TMPDIR=/dev/shm/testdb
START=1
NUM_DATA_ENTRY=8
N=10

rm -f compact_bmk_output.txt compact_bmk_output_2.txt dont_care_output.txt
for i in $(eval echo "{$START..$NUM_DATA_ENTRY}")
do
    NUM_RUN=$(($N*(2**($i-1))))
    for j in $(eval echo "{$START..$NUM_RUN}")
    do
       ./db_bench --benchmarks=fillrandom -db=$TEST_TMPDIR -disable_auto_compactions=1 -write_buffer_size=6710886 > dont_care_output.txt && ./db_bench --benchmarks=compact -use_existing_db=1 -db=$TEST_TMPDIR -level0_file_num_compaction_trigger=1 -rate_limiter_bytes_per_sec=100000000 | egrep 'compact'
    done > compact_bmk_output.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' compact_bmk_output.txt >> compact_bmk_output_2.txt
done
```
- compaction w/o rate-limiting  (see table 2, avg over 640-run):  pre-change: **822197 micros/op**; post-change: **823148 micros/op (regressed by 0.12%)**
```
Same as above script, except that -rate_limiter_bytes_per_sec=0
```
- flush with rate-limiting (see table 3, avg over 320-run, run on the [patch](ee5c6023a9) to augment current db_bench): pre-change: **745752 micros/op**; post-change: **745331 micros/op (improved by 0.06%)**
```
#!/bin/bash
TEST_TMPDIR=/dev/shm/testdb
START=1
NUM_DATA_ENTRY=8
N=10

rm -f flush_bmk_output.txt flush_bmk_output_2.txt

for i in $(eval echo "{$START..$NUM_DATA_ENTRY}")
do
    NUM_RUN=$(($N*(2**($i-1))))
    for j in $(eval echo "{$START..$NUM_RUN}")
    do
       ./db_bench -db=$TEST_TMPDIR -write_buffer_size=1048576000 -num=1000000 -rate_limiter_bytes_per_sec=100000000 -benchmarks=fillseq,flush | egrep 'flush'
    done > flush_bmk_output.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' flush_bmk_output.txt >> flush_bmk_output_2.txt
done

```
- flush w/o rate-limiting (see table 4, avg over 320-run, run on the [patch](ee5c6023a9) to augment current db_bench): pre-change: **487512 micros/op**; post-change: **485856 micros/op (improved by 0.34%)**
```
Same as above script, except that -rate_limiter_bytes_per_sec=0
```
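
The `awk` one-liner in the scripts above reports, for each batch of runs, the mean of the third output field and its population standard deviation, computed as sqrt(E[x²] − mean²). A minimal C++ sketch of the same computation follows; `MeanAndStd` is a hypothetical name, and the clamp at zero is an addition that the `awk` version does not have.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <utility>
#include <vector>

// Returns {mean, population standard deviation} of `samples`, using the same
// sum / sum-of-squares formula as the awk one-liner.
std::pair<double, double> MeanAndStd(const std::vector<double>& samples) {
  assert(!samples.empty());
  double sum = 0.0;
  double sum_sq = 0.0;
  for (double x : samples) {
    sum += x;
    sum_sq += x * x;
  }
  const double n = static_cast<double>(samples.size());
  const double mean = sum / n;
  // Clamp at zero: rounding can make the computed variance slightly negative.
  const double variance = std::max(0.0, sum_sq / n - mean * mean);
  return {mean, std::sqrt(variance)};
}
```

For the samples `{2, 4, 4, 4, 5, 5, 7, 9}` this yields a mean of 5 and a standard deviation of 2.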

| table 1 - compact with rate-limiting|
#-run | (pre-change) avg micros/op | std micros/op | (post-change)  avg micros/op | std micros/op | change in avg micros/op  (%)
-- | -- | -- | -- | -- | --
10 | 896978 | 16046.9 | 901242 | 15670.9 | 0.475373978
20 | 893718 | 15813 | 886505 | 17544.7 | -0.8070778478
40 | 900426 | 23882.2 | 894958 | 15104.5 | -0.6072681153
80 | 906635 | 21761.5 | 903332 | 23948.3 | -0.3643141948
160 | 898632 | 21098.9 | 907583 | 21145 | 0.9960695813
320 | 905252 | 22785.5 | 908106 | 25325.5 | 0.3152713278
640 | 905213 | 23598.6 | 906741 | 21370.5 | 0.1688000504
**1280** | **908316** | **23533.1** | **907350** | **24626.8** | **-0.1063506533**
average over #-run | 901896.25 | 21064.9625 | 901977.125 | 20592.025 | 0.008967217682

| table 2 - compact w/o rate-limiting|
#-run | (pre-change) avg micros/op | std micros/op | (post-change)  avg micros/op | std micros/op | change in avg micros/op  (%)
-- | -- | -- | -- | -- | --
10 | 811211 | 26996.7 | 807586 | 28456.4 | -0.4468627768
20 | 815465 | 14803.7 | 814608 | 28719.7 | -0.105093413
40 | 809203 | 26187.1 | 797835 | 25492.1 | -1.404839082
80 | 822088 | 28765.3 | 822192 | 32840.4 | 0.01265071379
160 | 821719 | 36344.7 | 821664 | 29544.9 | -0.006693285661
320 | 820921 | 27756.4 | 821403 | 28347.7 | 0.05871454135
**640** | **822197** | **28960.6** | **823148** | **30055.1** | **0.1156657103**
average over #-run | 8.18E+05 | 2.71E+04 | 8.15E+05 | 2.91E+04 |  -0.25

| table 3 - flush with rate-limiting|
#-run | (pre-change) avg micros/op | std micros/op | (post-change)  avg micros/op | std micros/op | change in avg micros/op  (%)
-- | -- | -- | -- | -- | --
10 | 741721 | 11770.8 | 740345 | 5949.76 | -0.1855144994
20 | 735169 | 3561.83 | 743199 | 9755.77 | 1.09226586
40 | 743368 | 8891.03 | 742102 | 8683.22 | -0.1703059588
80 | 742129 | 8148.51 | 743417 | 9631.58 | 0.1735547324
160 | 749045 | 9757.21 | 746256 | 9191.86 | -0.3723407806
**320** | **745752** | **9819.65** | **745331** | **9840.62** | **-0.0564530836**
640 | 749006 | 11080.5 | 748173 | 10578.7 | -0.1112140624
average over #-run | 743741.4286 | 9004.218571 | 744117.5714 | 9090.215714 | 0.05057441238

| table 4 - flush w/o rate-limiting|
#-run | (pre-change) avg micros/op | std micros/op | (post-change)  avg micros/op | std micros/op | change in avg micros/op (%)
-- | -- | -- | -- | -- | --
10 | 477283 | 24719.6 | 473864 | 12379 | -0.7163464863
20 | 486743 | 20175.2 | 502296 | 23931.3 | 3.195320734
40 | 482846 | 15309.2 | 489820 | 22259.5 | 1.444352858
80 | 491490 | 21883.1 | 490071 | 23085.7 | -0.2887139108
160 | 493347 | 28074.3 | 483609 | 21211.7 | -1.973864238
**320** | **487512** | **21401.5** | **485856** | **22195.2** | **-0.3396839462**
640 | 490307 | 25418.6 | 485435 | 22405.2 | -0.9936631539
average over #-run | 4.87E+05 | 2.24E+04 | 4.87E+05 | 2.11E+04 | 0.00E+00
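
The last column of each table is consistent with a relative change measured against the pre-change average, where a negative value means fewer micros/op and therefore an improvement. A minimal sketch, with `PercentChange` as a hypothetical helper name:

```cpp
#include <cassert>
#include <cmath>

// Relative change of `post` against `pre`, in percent.
// Negative means micros/op went down, i.e. the change is an improvement.
double PercentChange(double pre, double post) {
  assert(std::isfinite(pre) && pre != 0.0);
  return (post - pre) / pre * 100.0;
}
```

For the 1280-run row of table 1, `PercentChange(908316, 907350)` gives about -0.106, matching the table.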

Reviewed By: ajkr

Differential Revision: D34442441

Pulled By: hx235

fbshipit-source-id: 4790f13e1e5c0a95ae1d1cc93ffcf69dc6e78bdd
Hui Xiao 3 years ago committed by Facebook GitHub Bot
parent 27d6ef8e60
commit ca0ef54f16
HISTORY.md | 1
db/db_impl/db_impl.h | 6
db/db_impl/db_impl_open.cc | 3
db/db_impl/db_impl_write.cc | 23
db/db_rate_limiter_test.cc | 206
db/log_writer.cc | 18
db/log_writer.h | 8
db/write_thread.cc | 5
db/write_thread.h | 3
db_stress_tool/db_stress_common.h | 1
db_stress_tool/db_stress_gflags.cc | 6
db_stress_tool/db_stress_test_base.cc | 6
db_stress_tool/no_batched_ops_stress.cc | 3
file/writable_file_writer.cc | 90
file/writable_file_writer.h | 24
include/rocksdb/env.h | 10
include/rocksdb/file_system.h | 11
include/rocksdb/options.h | 18
tools/db_bench_tool.cc | 8

@@ -13,6 +13,7 @@
 ### Public API changes
 * Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect.
 * `options.compression_per_level` is dynamically changeable with `SetOptions()`.
+* Added `WriteOptions::rate_limiter_priority`. When set to something other than `Env::IO_TOTAL`, the internal rate limiter (`DBOptions::rate_limiter`) will be charged at the specified priority for writes associated with the API to which the `WriteOptions` was provided. Currently the support covers automatic WAL flushes, which happen during live updates (`Put()`, `Write()`, `Delete()`, etc.) when `WriteOptions::disableWAL == false` and `DBOptions::manual_wal_flush == false`.
 ### Bug Fixes
 * Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread.

@@ -1769,8 +1769,12 @@ class DBImpl : public DB {
                           WriteBatch* tmp_batch, size_t* write_with_wal,
                           WriteBatch** to_be_cached_state);
+  // rate_limiter_priority is used to charge `DBOptions::rate_limiter`
+  // for automatic WAL flush (`Options::manual_wal_flush` == false)
+  // associated with this WriteToWAL
   IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
-                      uint64_t* log_used, uint64_t* log_size);
+                      uint64_t* log_used, uint64_t* log_size,
+                      Env::IOPriority rate_limiter_priority);
   IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
                       log::Writer* log_writer, uint64_t* log_used,

@@ -1729,7 +1729,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
       WriteOptions write_options;
       uint64_t log_used, log_size;
       log::Writer* log_writer = impl->logs_.back().writer;
-      s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
+      s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size,
+                           Env::IO_TOTAL);
       if (s.ok()) {
         // Need to fsync, otherwise it might get lost after a power reset.
         s = impl->FlushWAL(false);

@@ -132,6 +132,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     return Status::Corruption("Batch is nullptr!");
   } else if (WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) {
     return Status::InvalidArgument("write batch must have timestamp(s) set");
+  } else if (write_options.rate_limiter_priority != Env::IO_TOTAL &&
+             write_options.rate_limiter_priority != Env::IO_USER) {
+    return Status::InvalidArgument(
+        "WriteOptions::rate_limiter_priority only allows "
+        "Env::IO_TOTAL and Env::IO_USER due to implementation constraints");
+  } else if (write_options.rate_limiter_priority != Env::IO_TOTAL &&
+             (write_options.disableWAL || manual_wal_flush_)) {
+    return Status::InvalidArgument(
+        "WriteOptions::rate_limiter_priority currently only supports "
+        "rate-limiting automatic WAL flush, which requires "
+        "`WriteOptions::disableWAL` and "
+        "`DBOptions::manual_wal_flush` both set to false");
   }
   // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock
   // grabs but does not seem thread-safe.
@@ -1147,7 +1159,8 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
 // write thread. Otherwise this must be called holding log_write_mutex_.
 IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
                             log::Writer* log_writer, uint64_t* log_used,
-                            uint64_t* log_size) {
+                            uint64_t* log_size,
+                            Env::IOPriority rate_limiter_priority) {
   assert(log_size != nullptr);
   Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
   *log_size = log_entry.size();
@@ -1162,7 +1175,7 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
   if (UNLIKELY(needs_locking)) {
     log_write_mutex_.Lock();
   }
-  IOStatus io_s = log_writer->AddRecord(log_entry);
+  IOStatus io_s = log_writer->AddRecord(log_entry, rate_limiter_priority);
   if (UNLIKELY(needs_locking)) {
     log_write_mutex_.Unlock();
@@ -1200,7 +1213,8 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
   WriteBatchInternal::SetSequence(merged_batch, sequence);
   uint64_t log_size;
-  io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
+  io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
+                    write_group.leader->rate_limiter_priority);
   if (to_be_cached_state) {
     cached_recoverable_state_ = *to_be_cached_state;
     cached_recoverable_state_empty_ = false;
@@ -1294,7 +1308,8 @@ IOStatus DBImpl::ConcurrentWriteToWAL(
   log::Writer* log_writer = logs_.back().writer;
   uint64_t log_size;
-  io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
+  io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
+                    write_group.leader->rate_limiter_priority);
   if (to_be_cached_state) {
     cached_recoverable_state_ = *to_be_cached_state;
     cached_recoverable_state_empty_ = false;
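
The two argument checks added to `DBImpl::WriteImpl` above can be read as a small decision function. The sketch below is a standalone model and not the RocksDB implementation: `ToyIOPriority` is a hypothetical stand-in for `Env::IOPriority`, and `ValidateRateLimiterPriority` and its messages are illustrative names.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for rocksdb::Env::IOPriority.
enum class ToyIOPriority { kLow, kMid, kHigh, kUser, kTotal };

// Returns an empty string when the combination is accepted, otherwise a
// short reason. Mirrors the order of the two checks in the diff above.
std::string ValidateRateLimiterPriority(ToyIOPriority pri, bool disable_wal,
                                        bool manual_wal_flush) {
  if (pri != ToyIOPriority::kTotal && pri != ToyIOPriority::kUser) {
    return "only IO_TOTAL and IO_USER are allowed";
  }
  if (pri != ToyIOPriority::kTotal && (disable_wal || manual_wal_flush)) {
    return "requires WAL enabled and automatic WAL flush";
  }
  return "";
}

// Usage example: IO_USER is accepted only with WAL on and automatic flush.
inline void ExampleUsage() {
  assert(ValidateRateLimiterPriority(ToyIOPriority::kUser, false, false).empty());
  assert(!ValidateRateLimiterPriority(ToyIOPriority::kUser, false, true).empty());
}
```

Note that `IO_TOTAL` (no rate-limiting) passes both checks regardless of the WAL settings, which matches the test parameters in `DBRateLimiterOnWriteWALTest`.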

@@ -3,18 +3,26 @@
 // COPYING file in the root directory) and Apache 2.0 License
 // (found in the LICENSE.Apache file in the root directory).
+#include <gtest/gtest.h>
+#include <cstdint>
+#include <string>
 #include "db/db_test_util.h"
 #include "port/stack_trace.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "test_util/testharness.h"
 #include "util/file_checksum_helper.h"
 namespace ROCKSDB_NAMESPACE {
-class DBRateLimiterTest
+class DBRateLimiterOnReadTest
     : public DBTestBase,
       public ::testing::WithParamInterface<std::tuple<bool, bool, bool>> {
  public:
-  DBRateLimiterTest()
-      : DBTestBase("db_rate_limiter_test", /*env_do_fsync=*/false),
+  explicit DBRateLimiterOnReadTest()
+      : DBTestBase("db_rate_limiter_on_read_test", /*env_do_fsync=*/false),
         use_direct_io_(std::get<0>(GetParam())),
         use_block_cache_(std::get<1>(GetParam())),
         use_readahead_(std::get<2>(GetParam())) {}
@@ -89,20 +97,20 @@ std::string GetTestNameSuffix(
 }
 #ifndef ROCKSDB_LITE
-INSTANTIATE_TEST_CASE_P(DBRateLimiterTest, DBRateLimiterTest,
+INSTANTIATE_TEST_CASE_P(DBRateLimiterOnReadTest, DBRateLimiterOnReadTest,
                         ::testing::Combine(::testing::Bool(), ::testing::Bool(),
                                            ::testing::Bool()),
                         GetTestNameSuffix);
 #else // ROCKSDB_LITE
 // Cannot use direct I/O in lite mode.
-INSTANTIATE_TEST_CASE_P(DBRateLimiterTest, DBRateLimiterTest,
+INSTANTIATE_TEST_CASE_P(DBRateLimiterOnReadTest, DBRateLimiterOnReadTest,
                         ::testing::Combine(::testing::Values(false),
                                            ::testing::Bool(),
                                            ::testing::Bool()),
                         GetTestNameSuffix);
 #endif // ROCKSDB_LITE
-TEST_P(DBRateLimiterTest, Get) {
+TEST_P(DBRateLimiterOnReadTest, Get) {
   if (use_direct_io_ && !IsDirectIOSupported()) {
     return;
   }
@@ -130,7 +138,7 @@ TEST_P(DBRateLimiterTest, Get) {
   }
 }
-TEST_P(DBRateLimiterTest, NewMultiGet) {
+TEST_P(DBRateLimiterOnReadTest, NewMultiGet) {
   // The new void-returning `MultiGet()` APIs use `MultiRead()`, which does not
   // yet support rate limiting.
   if (use_direct_io_ && !IsDirectIOSupported()) {
@@ -161,7 +169,7 @@ TEST_P(DBRateLimiterTest, NewMultiGet) {
   ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
 }
-TEST_P(DBRateLimiterTest, OldMultiGet) {
+TEST_P(DBRateLimiterOnReadTest, OldMultiGet) {
   // The old `vector<Status>`-returning `MultiGet()` APIs use `Read()`, which
   // supports rate limiting.
   if (use_direct_io_ && !IsDirectIOSupported()) {
@@ -193,7 +201,7 @@ TEST_P(DBRateLimiterTest, OldMultiGet) {
   ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
 }
-TEST_P(DBRateLimiterTest, Iterator) {
+TEST_P(DBRateLimiterOnReadTest, Iterator) {
   if (use_direct_io_ && !IsDirectIOSupported()) {
     return;
   }
@@ -223,7 +231,7 @@ TEST_P(DBRateLimiterTest, Iterator) {
 #if !defined(ROCKSDB_LITE)
-TEST_P(DBRateLimiterTest, VerifyChecksum) {
+TEST_P(DBRateLimiterOnReadTest, VerifyChecksum) {
   if (use_direct_io_ && !IsDirectIOSupported()) {
     return;
   }
@@ -237,7 +245,7 @@ TEST_P(DBRateLimiterTest, VerifyChecksum) {
   ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
 }
-TEST_P(DBRateLimiterTest, VerifyFileChecksums) {
+TEST_P(DBRateLimiterOnReadTest, VerifyFileChecksums) {
   if (use_direct_io_ && !IsDirectIOSupported()) {
     return;
   }
@@ -253,6 +261,182 @@ TEST_P(DBRateLimiterTest, VerifyFileChecksums) {
 #endif // !defined(ROCKSDB_LITE)
+class DBRateLimiterOnWriteTest : public DBTestBase {
+ public:
+  explicit DBRateLimiterOnWriteTest()
+      : DBTestBase("db_rate_limiter_on_write_test", /*env_do_fsync=*/false) {}
+  void Init() {
+    options_ = GetOptions();
+    ASSERT_OK(TryReopenWithColumnFamilies({"default"}, options_));
+    Random rnd(301);
+    for (int i = 0; i < kNumFiles; i++) {
+      ASSERT_OK(Put(0, kStartKey, rnd.RandomString(2)));
+      ASSERT_OK(Put(0, kEndKey, rnd.RandomString(2)));
+      ASSERT_OK(Flush(0));
+    }
+  }
+  Options GetOptions() {
+    Options options = CurrentOptions();
+    options.disable_auto_compactions = true;
+    options.rate_limiter.reset(NewGenericRateLimiter(
+        1 << 20 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */,
+        10 /* fairness */, RateLimiter::Mode::kWritesOnly));
+    options.table_factory.reset(
+        NewBlockBasedTableFactory(BlockBasedTableOptions()));
+    return options;
+  }
+ protected:
+  inline const static int64_t kNumFiles = 3;
+  inline const static std::string kStartKey = "a";
+  inline const static std::string kEndKey = "b";
+  Options options_;
+};
+TEST_F(DBRateLimiterOnWriteTest, Flush) {
+  std::int64_t prev_total_request = 0;
+  Init();
+  std::int64_t actual_flush_request =
+      options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL) -
+      prev_total_request;
+  std::int64_t exepcted_flush_request = kNumFiles;
+  EXPECT_EQ(actual_flush_request, exepcted_flush_request);
+  EXPECT_EQ(actual_flush_request,
+            options_.rate_limiter->GetTotalRequests(Env::IO_HIGH));
+}
+TEST_F(DBRateLimiterOnWriteTest, Compact) {
+  Init();
+  // Pre-comaction:
+  // level-0 : `kNumFiles` SST files overlapping on [kStartKey, kEndKey]
+#ifndef ROCKSDB_LITE
+  std::string files_per_level_pre_compaction = std::to_string(kNumFiles);
+  ASSERT_EQ(files_per_level_pre_compaction, FilesPerLevel(0 /* cf */));
+#endif // !ROCKSDB_LITE
+  std::int64_t prev_total_request =
+      options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL);
+  ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_LOW));
+  Compact(kStartKey, kEndKey);
+  std::int64_t actual_compaction_request =
+      options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL) -
+      prev_total_request;
+  // Post-comaction:
+  // level-0 : 0 SST file
+  // level-1 : 1 SST file
+#ifndef ROCKSDB_LITE
+  std::string files_per_level_post_compaction = "0,1";
+  ASSERT_EQ(files_per_level_post_compaction, FilesPerLevel(0 /* cf */));
+#endif // !ROCKSDB_LITE
+  std::int64_t exepcted_compaction_request = 1;
+  EXPECT_EQ(actual_compaction_request, exepcted_compaction_request);
+  EXPECT_EQ(actual_compaction_request,
+            options_.rate_limiter->GetTotalRequests(Env::IO_LOW));
+}
+class DBRateLimiterOnWriteWALTest
+    : public DBRateLimiterOnWriteTest,
+      public ::testing::WithParamInterface<std::tuple<
+          bool /* WriteOptions::disableWal */,
+          bool /* Options::manual_wal_flush */,
+          Env::IOPriority /* WriteOptions::rate_limiter_priority */>> {
+ public:
+  static std::string GetTestNameSuffix(
+      ::testing::TestParamInfo<std::tuple<bool, bool, Env::IOPriority>> info) {
+    std::ostringstream oss;
+    if (std::get<0>(info.param)) {
+      oss << "DisableWAL";
+    } else {
+      oss << "EnableWAL";
+    }
+    if (std::get<1>(info.param)) {
+      oss << "_ManualWALFlush";
+    } else {
+      oss << "_AutoWALFlush";
+    }
+    if (std::get<2>(info.param) == Env::IO_USER) {
+      oss << "_RateLimitAutoWALFlush";
+    } else if (std::get<2>(info.param) == Env::IO_TOTAL) {
+      oss << "_NoRateLimitAutoWALFlush";
+    } else {
+      oss << "_RateLimitAutoWALFlushWithIncorrectPriority";
+    }
+    return oss.str();
+  }
+  explicit DBRateLimiterOnWriteWALTest()
+      : disable_wal_(std::get<0>(GetParam())),
+        manual_wal_flush_(std::get<1>(GetParam())),
+        rate_limiter_priority_(std::get<2>(GetParam())) {}
+  void Init() {
+    options_ = GetOptions();
+    options_.manual_wal_flush = manual_wal_flush_;
+    Reopen(options_);
+  }
+  WriteOptions GetWriteOptions() {
+    WriteOptions write_options;
+    write_options.disableWAL = disable_wal_;
+    write_options.rate_limiter_priority = rate_limiter_priority_;
+    return write_options;
+  }
+ protected:
+  bool disable_wal_;
+  bool manual_wal_flush_;
+  Env::IOPriority rate_limiter_priority_;
+};
+INSTANTIATE_TEST_CASE_P(
+    DBRateLimiterOnWriteWALTest, DBRateLimiterOnWriteWALTest,
+    ::testing::Values(std::make_tuple(false, false, Env::IO_TOTAL),
+                      std::make_tuple(false, false, Env::IO_USER),
+                      std::make_tuple(false, false, Env::IO_HIGH),
+                      std::make_tuple(false, true, Env::IO_USER),
+                      std::make_tuple(true, false, Env::IO_USER)),
+    DBRateLimiterOnWriteWALTest::GetTestNameSuffix);
+TEST_P(DBRateLimiterOnWriteWALTest, AutoWalFlush) {
+  Init();
+  const bool no_rate_limit_auto_wal_flush =
+      (rate_limiter_priority_ == Env::IO_TOTAL);
+  const bool valid_arg = (rate_limiter_priority_ == Env::IO_USER &&
+                          !disable_wal_ && !manual_wal_flush_);
+  std::int64_t prev_total_request =
+      options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL);
+  ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
+  Status s = Put("foo", "v1", GetWriteOptions());
+  if (no_rate_limit_auto_wal_flush || valid_arg) {
+    EXPECT_TRUE(s.ok());
+  } else {
+    EXPECT_TRUE(s.IsInvalidArgument());
+    EXPECT_TRUE(s.ToString().find("WriteOptions::rate_limiter_priority") !=
+                std::string::npos);
+  }
+  std::int64_t actual_auto_wal_flush_request =
+      options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL) -
+      prev_total_request;
+  std::int64_t expected_auto_wal_flush_request = valid_arg ? 1 : 0;
+  EXPECT_EQ(actual_auto_wal_flush_request, expected_auto_wal_flush_request);
+  EXPECT_EQ(actual_auto_wal_flush_request,
+            options_.rate_limiter->GetTotalRequests(Env::IO_USER));
+}
 } // namespace ROCKSDB_NAMESPACE
 int main(int argc, char** argv) {

@@ -50,7 +50,8 @@ IOStatus Writer::Close() {
   return s;
 }
-IOStatus Writer::AddRecord(const Slice& slice) {
+IOStatus Writer::AddRecord(const Slice& slice,
+                           Env::IOPriority rate_limiter_priority) {
   const char* ptr = slice.data();
   size_t left = slice.size();
@@ -73,7 +74,8 @@ IOStatus Writer::AddRecord(const Slice& slice) {
         // kRecyclableHeaderSize being <= 11)
         assert(header_size <= 11);
         s = dest_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
-                                static_cast<size_t>(leftover)));
+                                static_cast<size_t>(leftover)),
+                          0 /* crc32c_checksum */, rate_limiter_priority);
         if (!s.ok()) {
           break;
         }
@@ -99,7 +101,7 @@ IOStatus Writer::AddRecord(const Slice& slice) {
       type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType;
     }
-    s = EmitPhysicalRecord(type, ptr, fragment_length);
+    s = EmitPhysicalRecord(type, ptr, fragment_length, rate_limiter_priority);
     ptr += fragment_length;
     left -= fragment_length;
     begin = false;
@@ -107,7 +109,7 @@ IOStatus Writer::AddRecord(const Slice& slice) {
   if (s.ok()) {
     if (!manual_flush_) {
-      s = dest_->Flush();
+      s = dest_->Flush(rate_limiter_priority);
     }
   }
@@ -141,7 +143,8 @@ IOStatus Writer::AddCompressionTypeRecord() {
 bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); }
-IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
+IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n,
+                                    Env::IOPriority rate_limiter_priority) {
   assert(n <= 0xffff); // Must fit in two bytes
   size_t header_size;
@@ -180,9 +183,10 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
   EncodeFixed32(buf, crc);
   // Write the header and the payload
-  IOStatus s = dest_->Append(Slice(buf, header_size));
+  IOStatus s = dest_->Append(Slice(buf, header_size), 0 /* crc32c_checksum */,
+                             rate_limiter_priority);
   if (s.ok()) {
-    s = dest_->Append(Slice(ptr, n), payload_crc);
+    s = dest_->Append(Slice(ptr, n), payload_crc, rate_limiter_priority);
   }
   block_offset_ += header_size + n;
   return s;

@@ -13,6 +13,7 @@
 #include "db/log_format.h"
 #include "rocksdb/compression_type.h"
+#include "rocksdb/env.h"
 #include "rocksdb/io_status.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/status.h"
@@ -81,7 +82,8 @@ class Writer {
   ~Writer();
-  IOStatus AddRecord(const Slice& slice);
+  IOStatus AddRecord(const Slice& slice,
+                     Env::IOPriority rate_limiter_priority = Env::IO_TOTAL);
   IOStatus AddCompressionTypeRecord();
   WritableFileWriter* file() { return dest_.get(); }
@@ -106,7 +108,9 @@ class Writer {
   // record type stored in the header.
   uint32_t type_crc_[kMaxRecordType + 1];
-  IOStatus EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);
+  IOStatus EmitPhysicalRecord(
+      RecordType type, const char* ptr, size_t length,
+      Env::IOPriority rate_limiter_priority = Env::IO_TOTAL);
   // If true, it does not flush after each write. Instead it relies on the upper
   // layer to manually does the flush by calling ::WriteBuffer()

@@ -471,6 +471,11 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
       break;
     }
+    if (w->rate_limiter_priority != leader->rate_limiter_priority) {
+      // Do not mix writes with different rate limiter priorities.
+      break;
+    }
     if (w->batch == nullptr) {
       // Do not include those writes with nullptr batch. Those are not writes,
       // those are something else. They want to be alone
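
The check added above stops the leader from batching a writer whose rate limiter priority differs from its own. The sketch below isolates only that rule: `GroupSizeForLeader` is a hypothetical function, priorities are plain integers, and the other break conditions of the real `EnterAsBatchGroupLeader` are deliberately left out.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model: each element is the rate limiter priority of a pending
// writer in arrival order, with element 0 being the leader. Returns how many
// writers (including the leader) end up in the leader's group.
size_t GroupSizeForLeader(const std::vector<int>& pending_priorities) {
  assert(!pending_priorities.empty());
  const int leader_priority = pending_priorities.front();
  size_t group_size = 1;  // the leader itself
  for (size_t i = 1; i < pending_priorities.size(); ++i) {
    if (pending_priorities[i] != leader_priority) {
      break;  // do not mix writes with different rate limiter priorities
    }
    ++group_size;
  }
  return group_size;
}
```

Because the scan stops at the first mismatch, `GroupSizeForLeader({3, 3, 4, 3})` is 2: the last writer shares the leader's priority but still has to wait for a later group.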

@@ -117,6 +117,7 @@ class WriteThread {
     bool sync;
     bool no_slowdown;
     bool disable_wal;
+    Env::IOPriority rate_limiter_priority;
     bool disable_memtable;
     size_t batch_cnt; // if non-zero, number of sub-batches in the write batch
     size_t protection_bytes_per_key;
@@ -141,6 +142,7 @@ class WriteThread {
           sync(false),
           no_slowdown(false),
           disable_wal(false),
+          rate_limiter_priority(Env::IOPriority::IO_TOTAL),
           disable_memtable(false),
           batch_cnt(0),
           protection_bytes_per_key(0),
@@ -163,6 +165,7 @@ class WriteThread {
           sync(write_options.sync),
           no_slowdown(write_options.no_slowdown),
           disable_wal(write_options.disableWAL),
+          rate_limiter_priority(write_options.rate_limiter_priority),
           disable_memtable(_disable_memtable),
           batch_cnt(_batch_cnt),
           protection_bytes_per_key(_batch->GetProtectionBytesPerKey()),

@@ -176,6 +176,7 @@ DECLARE_int32(range_deletion_width);
 DECLARE_uint64(rate_limiter_bytes_per_sec);
 DECLARE_bool(rate_limit_bg_reads);
 DECLARE_bool(rate_limit_user_ops);
+DECLARE_bool(rate_limit_auto_wal_flush);
 DECLARE_uint64(sst_file_manager_bytes_per_sec);
 DECLARE_uint64(sst_file_manager_bytes_per_truncate);
 DECLARE_bool(use_txn);

@@ -550,6 +550,12 @@ DEFINE_bool(rate_limit_user_ops, false,
             "When true use Env::IO_USER priority level to charge internal rate "
             "limiter for reads associated with user operations.");
+DEFINE_bool(rate_limit_auto_wal_flush, false,
+            "When true use Env::IO_USER priority level to charge internal rate "
+            "limiter for automatic WAL flush (`Options::manual_wal_flush` == "
+            "false) after the user "
+            "write operation.");
 DEFINE_uint64(sst_file_manager_bytes_per_sec, 0,
               "Set `Options::sst_file_manager` to delete at this rate. By "
               "default the deletion rate is unbounded.");

@@ -485,6 +485,9 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
   if (FLAGS_sync) {
     write_opts.sync = true;
   }
+  if (FLAGS_rate_limit_auto_wal_flush) {
+    write_opts.rate_limiter_priority = Env::IO_USER;
+  }
   char value[100];
   int cf_idx = 0;
   Status s;
@@ -640,6 +643,9 @@ void StressTest::OperateDb(ThreadState* thread) {
   read_opts.rate_limiter_priority =
       FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
   WriteOptions write_opts;
+  if (FLAGS_rate_limit_auto_wal_flush) {
+    write_opts.rate_limiter_priority = Env::IO_USER;
+  }
   auto shared = thread->shared;
   char value[100];
   std::string from_db;

@@ -271,6 +271,9 @@ class NonBatchedOpsStressTest : public StressTest {
     Transaction* txn = nullptr;
     if (use_txn) {
       WriteOptions wo;
+      if (FLAGS_rate_limit_auto_wal_flush) {
+        wo.rate_limiter_priority = Env::IO_USER;
+      }
       Status s = NewTxn(wo, &txn);
       if (!s.ok()) {
         fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str());

@@ -41,8 +41,8 @@ IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
   return io_s;
 }
-IOStatus WritableFileWriter::Append(const Slice& data,
-                                    uint32_t crc32c_checksum) {
+IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
+                                    Env::IOPriority op_rate_limiter_priority) {
   const char* src = data.data();
   size_t left = data.size();
   IOStatus s;
@@ -79,7 +79,7 @@ IOStatus WritableFileWriter::Append(const Slice& data,
   // Flush only when buffered I/O
   if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
     if (buf_.CurrentSize() > 0) {
-      s = Flush();
+      s = Flush(op_rate_limiter_priority);
       if (!s.ok()) {
         return s;
       }
@@ -109,7 +109,7 @@ IOStatus WritableFileWriter::Append(const Slice& data,
         src += appended;
         if (left > 0) {
-          s = Flush();
+          s = Flush(op_rate_limiter_priority);
           if (!s.ok()) {
             break;
           }
@@ -119,7 +119,7 @@ IOStatus WritableFileWriter::Append(const Slice& data,
     } else {
       assert(buf_.CurrentSize() == 0);
       buffered_data_crc32c_checksum_ = crc32c_checksum;
-      s = WriteBufferedWithChecksum(src, left);
+      s = WriteBufferedWithChecksum(src, left, op_rate_limiter_priority);
     }
   } else {
     // In this case, either we do not need to do the data verification or
@@ -139,7 +139,7 @@ IOStatus WritableFileWriter::Append(const Slice& data,
         src += appended;
         if (left > 0) {
-          s = Flush();
+          s = Flush(op_rate_limiter_priority);
           if (!s.ok()) {
             break;
           }
@@ -150,9 +150,9 @@ IOStatus WritableFileWriter::Append(const Slice& data,
       assert(buf_.CurrentSize() == 0);
       if (perform_data_verification_ && buffered_data_with_checksum_) {
         buffered_data_crc32c_checksum_ = crc32c::Value(src, left);
-        s = WriteBufferedWithChecksum(src, left);
+        s = WriteBufferedWithChecksum(src, left, op_rate_limiter_priority);
       } else {
-        s = WriteBuffered(src, left);
+        s = WriteBuffered(src, left, op_rate_limiter_priority);
       }
     }
   }
@@ -164,7 +164,8 @@ IOStatus WritableFileWriter::Append(const Slice& data,
   return s;
 }
-IOStatus WritableFileWriter::Pad(const size_t pad_bytes) {
+IOStatus WritableFileWriter::Pad(const size_t pad_bytes,
+                                 Env::IOPriority op_rate_limiter_priority) {
   assert(pad_bytes < kDefaultPageSize);
   size_t left = pad_bytes;
   size_t cap = buf_.Capacity() - buf_.CurrentSize();
@@ -178,7 +179,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes) {
     buf_.PadWith(append_bytes, 0);
     left -= append_bytes;
     if (left > 0) {
-      IOStatus s = Flush();
+      IOStatus s = Flush(op_rate_limiter_priority);
       if (!s.ok()) {
         return s;
       }
@@ -294,7 +295,7 @@ IOStatus WritableFileWriter::Close() {
 // write out the cached data to the OS cache or storage if direct I/O
 // enabled
-IOStatus WritableFileWriter::Flush() {
+IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
   IOStatus s;
   TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2);
@@ -303,17 +304,19 @@ IOStatus WritableFileWriter::Flush() {
 #ifndef ROCKSDB_LITE
       if (pending_sync_) {
         if (perform_data_verification_ && buffered_data_with_checksum_) {
-          s = WriteDirectWithChecksum();
+          s = WriteDirectWithChecksum(op_rate_limiter_priority);
         } else {
-          s = WriteDirect();
+          s = WriteDirect(op_rate_limiter_priority);
         }
       }
 #endif  // !ROCKSDB_LITE
     } else {
       if (perform_data_verification_ && buffered_data_with_checksum_) {
-        s = WriteBufferedWithChecksum(buf_.BufferStart(), buf_.CurrentSize());
+        s = WriteBufferedWithChecksum(buf_.BufferStart(), buf_.CurrentSize(),
+                                      op_rate_limiter_priority);
       } else {
-        s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
+        s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize(),
+                          op_rate_limiter_priority);
       }
     }
     if (!s.ok()) {
@@ -479,7 +482,8 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
 // This method writes to disk the specified data and makes use of the rate
 // limiter if available
-IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
+IOStatus WritableFileWriter::WriteBuffered(
+    const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) {
   IOStatus s;
   assert(!use_direct_io());
   const char* src = data;
@@ -489,9 +493,13 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
   while (left > 0) {
     size_t allowed;
-    if (rate_limiter_ != nullptr) {
-      allowed = rate_limiter_->RequestToken(
-          left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
-          RateLimiter::OpType::kWrite);
+    Env::IOPriority rate_limiter_priority_used =
+        WritableFileWriter::DecideRateLimiterPriority(
+            writable_file_->GetIOPriority(), op_rate_limiter_priority);
+    if (rate_limiter_ != nullptr &&
+        rate_limiter_priority_used != Env::IO_TOTAL) {
+      allowed = rate_limiter_->RequestToken(left, 0 /* alignment */,
+                                            rate_limiter_priority_used, stats_,
+                                            RateLimiter::OpType::kWrite);
     } else {
       allowed = left;
@@ -562,8 +570,8 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
   return s;
 }
-IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data,
-                                                       size_t size) {
+IOStatus WritableFileWriter::WriteBufferedWithChecksum(
+    const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) {
   IOStatus s;
   assert(!use_direct_io());
   assert(perform_data_verification_ && buffered_data_with_checksum_);
@@ -577,11 +585,14 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data,
   // TODO: need to be improved since it sort of defeats the purpose of the rate
   // limiter
   size_t data_size = left;
-  if (rate_limiter_ != nullptr) {
+  Env::IOPriority rate_limiter_priority_used =
+      WritableFileWriter::DecideRateLimiterPriority(
+          writable_file_->GetIOPriority(), op_rate_limiter_priority);
+  if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
     while (data_size > 0) {
       size_t tmp_size;
-      tmp_size = rate_limiter_->RequestToken(
-          data_size, buf_.Alignment(), writable_file_->GetIOPriority(), stats_,
-          RateLimiter::OpType::kWrite);
+      tmp_size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
+                                             rate_limiter_priority_used, stats_,
+                                             RateLimiter::OpType::kWrite);
       data_size -= tmp_size;
     }
@@ -674,7 +685,8 @@ void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data,
 // only write on aligned
 // offsets.
 #ifndef ROCKSDB_LITE
-IOStatus WritableFileWriter::WriteDirect() {
+IOStatus WritableFileWriter::WriteDirect(
+    Env::IOPriority op_rate_limiter_priority) {
   assert(use_direct_io());
   IOStatus s;
   const size_t alignment = buf_.Alignment();
@@ -701,7 +713,11 @@ IOStatus WritableFileWriter::WriteDirect() {
   while (left > 0) {
     // Check how much is allowed
     size_t size;
-    if (rate_limiter_ != nullptr) {
+    Env::IOPriority rate_limiter_priority_used =
+        WritableFileWriter::DecideRateLimiterPriority(
+            writable_file_->GetIOPriority(), op_rate_limiter_priority);
+    if (rate_limiter_ != nullptr &&
+        rate_limiter_priority_used != Env::IO_TOTAL) {
       size = rate_limiter_->RequestToken(left, buf_.Alignment(),
                                          writable_file_->GetIOPriority(),
                                          stats_, RateLimiter::OpType::kWrite);
@@ -762,7 +778,8 @@ IOStatus WritableFileWriter::WriteDirect() {
   return s;
 }
-IOStatus WritableFileWriter::WriteDirectWithChecksum() {
+IOStatus WritableFileWriter::WriteDirectWithChecksum(
+    Env::IOPriority op_rate_limiter_priority) {
   assert(use_direct_io());
   assert(perform_data_verification_ && buffered_data_with_checksum_);
   IOStatus s;
@@ -798,7 +815,10 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum() {
   // TODO: need to be improved since it sort of defeats the purpose of the rate
   // limiter
   size_t data_size = left;
-  if (rate_limiter_ != nullptr) {
+  Env::IOPriority rate_limiter_priority_used =
+      WritableFileWriter::DecideRateLimiterPriority(
+          writable_file_->GetIOPriority(), op_rate_limiter_priority);
+  if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
     while (data_size > 0) {
       size_t size;
       size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
@@ -860,4 +880,18 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum() {
   return s;
 }
 #endif  // !ROCKSDB_LITE
+Env::IOPriority WritableFileWriter::DecideRateLimiterPriority(
+    Env::IOPriority writable_file_io_priority,
+    Env::IOPriority op_rate_limiter_priority) {
+  if (writable_file_io_priority == Env::IO_TOTAL &&
+      op_rate_limiter_priority == Env::IO_TOTAL) {
+    return Env::IO_TOTAL;
+  } else if (writable_file_io_priority == Env::IO_TOTAL) {
+    return op_rate_limiter_priority;
+  } else if (op_rate_limiter_priority == Env::IO_TOTAL) {
+    return writable_file_io_priority;
+  } else {
+    return op_rate_limiter_priority;
+  }
+}
 }  // namespace ROCKSDB_NAMESPACE
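The four branches of `DecideRateLimiterPriority` above reduce to one rule: a non-`IO_TOTAL` operation-level priority wins, otherwise the file-level priority is used. The following standalone sketch shows that equivalence; the plain `IOPriority` enum and the `Decide` helper are stand-ins invented for illustration and are not part of the PR.

```cpp
// Stand-in for Env::IOPriority; the names mirror those used in the diff, the
// numeric values are irrelevant to this sketch.
enum IOPriority { IO_LOW, IO_MID, IO_HIGH, IO_USER, IO_TOTAL };

// Hypothetical condensed form of the decision table:
//   file == TOTAL, op == TOTAL  -> TOTAL (no rate limiting)
//   file == TOTAL, op != TOTAL  -> op
//   file != TOTAL, op == TOTAL  -> file (fallback)
//   file != TOTAL, op != TOTAL  -> op   (override)
constexpr IOPriority Decide(IOPriority file_priority, IOPriority op_priority) {
  return op_priority != IO_TOTAL ? op_priority : file_priority;
}

// Compile-time checks of all four rows of the table.
static_assert(Decide(IO_TOTAL, IO_TOTAL) == IO_TOTAL, "both unset");
static_assert(Decide(IO_TOTAL, IO_USER) == IO_USER, "op priority only");
static_assert(Decide(IO_HIGH, IO_TOTAL) == IO_HIGH, "file priority fallback");
static_assert(Decide(IO_LOW, IO_USER) == IO_USER, "op priority overrides");
```

The explicit four-way form in the PR spells out each case; the condensed form is shown only to make the override and fallback behaviour easy to verify.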

@@ -234,11 +234,13 @@ class WritableFileWriter {
   // When this Append API is called, if the crc32c_checksum is not provided, we
   // will calculate the checksum internally.
-  IOStatus Append(const Slice& data, uint32_t crc32c_checksum = 0);
+  IOStatus Append(const Slice& data, uint32_t crc32c_checksum = 0,
+                  Env::IOPriority op_rate_limiter_priority = Env::IO_TOTAL);
-  IOStatus Pad(const size_t pad_bytes);
+  IOStatus Pad(const size_t pad_bytes,
+               Env::IOPriority op_rate_limiter_priority = Env::IO_TOTAL);
-  IOStatus Flush();
+  IOStatus Flush(Env::IOPriority op_rate_limiter_priority = Env::IO_TOTAL);
   IOStatus Close();
@@ -271,15 +273,21 @@ class WritableFileWriter {
   const char* GetFileChecksumFuncName() const;
  private:
+  static Env::IOPriority DecideRateLimiterPriority(
+      Env::IOPriority writable_file_io_priority,
+      Env::IOPriority op_rate_limiter_priority);
   // Used when os buffering is OFF and we are writing
   // DMA such as in Direct I/O mode
 #ifndef ROCKSDB_LITE
-  IOStatus WriteDirect();
-  IOStatus WriteDirectWithChecksum();
+  IOStatus WriteDirect(Env::IOPriority op_rate_limiter_priority);
+  IOStatus WriteDirectWithChecksum(Env::IOPriority op_rate_limiter_priority);
 #endif  // !ROCKSDB_LITE
-  // Normal write
-  IOStatus WriteBuffered(const char* data, size_t size);
-  IOStatus WriteBufferedWithChecksum(const char* data, size_t size);
+  // Normal write.
+  IOStatus WriteBuffered(const char* data, size_t size,
+                         Env::IOPriority op_rate_limiter_priority);
+  IOStatus WriteBufferedWithChecksum(const char* data, size_t size,
+                                     Env::IOPriority op_rate_limiter_priority);
   IOStatus RangeSync(uint64_t offset, uint64_t nbytes);
   IOStatus SyncInternal(bool use_fsync);
 };
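The buffered write path declared above asks the rate limiter for tokens in a loop and writes only as many bytes as each request grants. When the effective priority is `IO_TOTAL`, the limiter is bypassed. The sketch below models just that loop. `ToyLimiter` and `WriteChunks` are hypothetical helpers and do not reproduce the real `RateLimiter::RequestToken` signature; the toy limiter simply caps each grant at a fixed burst size.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Stand-in for Env::IOPriority, reduced to the two values relevant here.
enum IOPriority { IO_USER, IO_TOTAL };

// Hypothetical limiter: grants at most `burst` bytes per request and records
// the total charged. `burst` must be non-zero or the loop below never ends.
struct ToyLimiter {
  explicit ToyLimiter(std::size_t burst_bytes)
      : burst(burst_bytes), charged(0) {}
  std::size_t RequestToken(std::size_t bytes) {
    const std::size_t granted = std::min(bytes, burst);
    charged += granted;
    return granted;
  }
  std::size_t burst;
  std::size_t charged;
};

// Returns the sizes of the chunks that would be written for `size` bytes.
// With a limiter and a non-IO_TOTAL priority, each chunk is bounded by the
// granted tokens; otherwise everything goes out in one chunk, uncharged.
std::vector<std::size_t> WriteChunks(std::size_t size, ToyLimiter* limiter,
                                     IOPriority priority_used) {
  std::vector<std::size_t> chunks;
  std::size_t left = size;
  while (left > 0) {
    std::size_t allowed;
    if (limiter != nullptr && priority_used != IO_TOTAL) {
      allowed = limiter->RequestToken(left);
    } else {
      allowed = left;
    }
    chunks.push_back(allowed);  // A real writer would append `allowed` bytes.
    left -= allowed;
  }
  return chunks;
}
```

For example, a 10-byte write against a limiter with a 4-byte burst is split into chunks of 4, 4, and 2 bytes at `IO_USER`, while the same write at `IO_TOTAL` goes out as a single 10-byte chunk.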

@@ -959,8 +959,16 @@ class WritableFile {
   // Use the returned alignment value to allocate
   // aligned buffer for Direct I/O
   virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; }
   /*
-   * Change the priority in rate limiter if rate limiting is enabled.
+   * If rate limiting is enabled, change the file-granularity priority used in
+   * rate-limiting writes.
+   *
+   * In the presence of finer-granularity priority such as
+   * `WriteOptions::rate_limiter_priority`, this file-granularity priority may
+   * be overridden by a non-Env::IO_TOTAL finer-granularity priority and used as
+   * a fallback for Env::IO_TOTAL finer-granularity priority.
+   *
    * If rate limiting is not enabled, this call has no effect.
    */
   virtual void SetIOPriority(Env::IOPriority pri) { io_priority_ = pri; }

@@ -1031,6 +1031,17 @@ class FSWritableFile {
     write_hint_ = hint;
   }
+  /*
+   * If rate limiting is enabled, change the file-granularity priority used in
+   * rate-limiting writes.
+   *
+   * In the presence of finer-granularity priority such as
+   * `WriteOptions::rate_limiter_priority`, this file-granularity priority may
+   * be overridden by a non-Env::IO_TOTAL finer-granularity priority and used as
+   * a fallback for Env::IO_TOTAL finer-granularity priority.
+   *
+   * If rate limiting is not enabled, this call has no effect.
+   */
   virtual void SetIOPriority(Env::IOPriority pri) { io_priority_ = pri; }
   virtual Env::IOPriority GetIOPriority() { return io_priority_; }

@@ -1659,13 +1659,29 @@ struct WriteOptions {
   // Default: false
   bool memtable_insert_hint_per_batch;
+  // For writes associated with this option, charge the internal rate
+  // limiter (see `DBOptions::rate_limiter`) at the specified priority. The
+  // special value `Env::IO_TOTAL` disables charging the rate limiter.
+  //
+  // Currently the support covers automatic WAL flushes, which happen during
+  // live updates (`Put()`, `Write()`, `Delete()`, etc.)
+  // when `WriteOptions::disableWAL == false`
+  // and `DBOptions::manual_wal_flush == false`.
+  //
+  // Only `Env::IO_USER` and `Env::IO_TOTAL` are allowed
+  // due to implementation constraints.
+  //
+  // Default: `Env::IO_TOTAL`
+  Env::IOPriority rate_limiter_priority;
   WriteOptions()
       : sync(false),
         disableWAL(false),
         ignore_missing_column_families(false),
         no_slowdown(false),
         low_pri(false),
-        memtable_insert_hint_per_batch(false) {}
+        memtable_insert_hint_per_batch(false),
+        rate_limiter_priority(Env::IO_TOTAL) {}
 };
 // Options that control flush operations
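The conditions in the `rate_limiter_priority` comment above can be written down as two small predicates. This is a sketch under stated assumptions, not the check the PR performs: `WriteOptionsSketch`, `IsAllowedPriority`, and `AutoWalFlushIsCharged` are hypothetical names, and the diff shown here does not say how a disallowed priority is reported, so the sketch only answers yes or no.

```cpp
// Stand-in for Env::IOPriority; names mirror those in the diff.
enum IOPriority { IO_LOW, IO_MID, IO_HIGH, IO_USER, IO_TOTAL };

// Hypothetical subset of WriteOptions with the defaults stated in the diff.
struct WriteOptionsSketch {
  WriteOptionsSketch() : disableWAL(false), rate_limiter_priority(IO_TOTAL) {}
  bool disableWAL;
  IOPriority rate_limiter_priority;
};

// Only IO_USER and IO_TOTAL are accepted for writes, per the option comment.
bool IsAllowedPriority(IOPriority priority) {
  return priority == IO_USER || priority == IO_TOTAL;
}

// True when a write with these options would have its automatic WAL flush
// charged to the rate limiter: a non-IO_TOTAL allowed priority, WAL enabled,
// and automatic (not manual) WAL flush.
bool AutoWalFlushIsCharged(const WriteOptionsSketch& write_options,
                           bool manual_wal_flush) {
  return write_options.rate_limiter_priority == IO_USER &&
         !write_options.disableWAL && !manual_wal_flush;
}
```

With the default options nothing is charged; setting `rate_limiter_priority` to `IO_USER` charges the flush only while the WAL is enabled and `manual_wal_flush` is false.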

@@ -1118,6 +1118,12 @@ DEFINE_bool(file_checksum, false,
             "When true use FileChecksumGenCrc32cFactory for "
             "file_checksum_gen_factory.");
+DEFINE_bool(rate_limit_auto_wal_flush, false,
+            "When true use Env::IO_USER priority level to charge internal rate "
+            "limiter for automatic WAL flush (`Options::manual_wal_flush` == "
+            "false) after the user "
+            "write operation");
 static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
     const char* ctype) {
   assert(ctype);
@@ -3100,6 +3106,8 @@ class Benchmark {
       write_options_.sync = true;
     }
     write_options_.disableWAL = FLAGS_disable_wal;
+    write_options_.rate_limiter_priority =
+        FLAGS_rate_limit_auto_wal_flush ? Env::IO_USER : Env::IO_TOTAL;
     read_options_ = ReadOptions(FLAGS_verify_checksum, true);
     read_options_.total_order_seek = FLAGS_total_order_seek;
     read_options_.prefix_same_as_start = FLAGS_prefix_same_as_start;
