diff --git a/HISTORY.md b/HISTORY.md index 52cba2724..03720b69f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,7 @@ ### Public API changes * Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect. * `options.compression_per_level` is dynamically changeable with `SetOptions()`. +* Added `WriteOptions::rate_limiter_priority`. When set to something other than `Env::IO_TOTAL`, the internal rate limiter (`DBOptions::rate_limiter`) will be charged at the specified priority for writes associated with the API to which the `WriteOptions` was provided. Currently the support covers automatic WAL flushes, which happen during live updates (`Put()`, `Write()`, `Delete()`, etc.) when `WriteOptions::disableWAL == false` and `DBOptions::manual_wal_flush == false`. ### Bug Fixes * Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread. diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index d1e95c75a..c28248708 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1769,8 +1769,12 @@ class DBImpl : public DB { WriteBatch* tmp_batch, size_t* write_with_wal, WriteBatch** to_be_cached_state); + // rate_limiter_priority is used to charge `DBOptions::rate_limiter` + // for automatic WAL flush (`Options::manual_wal_flush` == false) + // associated with this WriteToWAL IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, - uint64_t* log_used, uint64_t* log_size); + uint64_t* log_used, uint64_t* log_size, + Env::IOPriority rate_limiter_priority); IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, uint64_t* log_used, diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index ee79eb9f0..5dd8be655 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1729,7 +1729,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, WriteOptions 
write_options; uint64_t log_used, log_size; log::Writer* log_writer = impl->logs_.back().writer; - s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size); + s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size, + Env::IO_TOTAL); if (s.ok()) { // Need to fsync, otherwise it might get lost after a power reset. s = impl->FlushWAL(false); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index a5bd89a7e..61910ede4 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -132,6 +132,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::Corruption("Batch is nullptr!"); } else if (WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) { return Status::InvalidArgument("write batch must have timestamp(s) set"); + } else if (write_options.rate_limiter_priority != Env::IO_TOTAL && + write_options.rate_limiter_priority != Env::IO_USER) { + return Status::InvalidArgument( + "WriteOptions::rate_limiter_priority only allows " + "Env::IO_TOTAL and Env::IO_USER due to implementation constraints"); + } else if (write_options.rate_limiter_priority != Env::IO_TOTAL && + (write_options.disableWAL || manual_wal_flush_)) { + return Status::InvalidArgument( + "WriteOptions::rate_limiter_priority currently only supports " + "rate-limiting automatic WAL flush, which requires " + "`WriteOptions::disableWAL` and " + "`DBOptions::manual_wal_flush` both set to false"); } // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock // grabs but does not seem thread-safe. @@ -1147,7 +1159,8 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, // write thread. Otherwise this must be called holding log_write_mutex_. 
IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, - uint64_t* log_size) { + uint64_t* log_size, + Env::IOPriority rate_limiter_priority) { assert(log_size != nullptr); Slice log_entry = WriteBatchInternal::Contents(&merged_batch); *log_size = log_entry.size(); @@ -1162,7 +1175,7 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, if (UNLIKELY(needs_locking)) { log_write_mutex_.Lock(); } - IOStatus io_s = log_writer->AddRecord(log_entry); + IOStatus io_s = log_writer->AddRecord(log_entry, rate_limiter_priority); if (UNLIKELY(needs_locking)) { log_write_mutex_.Unlock(); @@ -1200,7 +1213,8 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, WriteBatchInternal::SetSequence(merged_batch, sequence); uint64_t log_size; - io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, + write_group.leader->rate_limiter_priority); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -1294,7 +1308,8 @@ IOStatus DBImpl::ConcurrentWriteToWAL( log::Writer* log_writer = logs_.back().writer; uint64_t log_size; - io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, + write_group.leader->rate_limiter_priority); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; diff --git a/db/db_rate_limiter_test.cc b/db/db_rate_limiter_test.cc index 7b200add1..f30af1974 100644 --- a/db/db_rate_limiter_test.cc +++ b/db/db_rate_limiter_test.cc @@ -3,18 +3,26 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). 
+#include + +#include +#include + #include "db/db_test_util.h" #include "port/stack_trace.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "test_util/testharness.h" #include "util/file_checksum_helper.h" namespace ROCKSDB_NAMESPACE { -class DBRateLimiterTest +class DBRateLimiterOnReadTest : public DBTestBase, public ::testing::WithParamInterface> { public: - DBRateLimiterTest() - : DBTestBase("db_rate_limiter_test", /*env_do_fsync=*/false), + explicit DBRateLimiterOnReadTest() + : DBTestBase("db_rate_limiter_on_read_test", /*env_do_fsync=*/false), use_direct_io_(std::get<0>(GetParam())), use_block_cache_(std::get<1>(GetParam())), use_readahead_(std::get<2>(GetParam())) {} @@ -89,20 +97,20 @@ std::string GetTestNameSuffix( } #ifndef ROCKSDB_LITE -INSTANTIATE_TEST_CASE_P(DBRateLimiterTest, DBRateLimiterTest, +INSTANTIATE_TEST_CASE_P(DBRateLimiterOnReadTest, DBRateLimiterOnReadTest, ::testing::Combine(::testing::Bool(), ::testing::Bool(), ::testing::Bool()), GetTestNameSuffix); #else // ROCKSDB_LITE // Cannot use direct I/O in lite mode. -INSTANTIATE_TEST_CASE_P(DBRateLimiterTest, DBRateLimiterTest, +INSTANTIATE_TEST_CASE_P(DBRateLimiterOnReadTest, DBRateLimiterOnReadTest, ::testing::Combine(::testing::Values(false), ::testing::Bool(), ::testing::Bool()), GetTestNameSuffix); #endif // ROCKSDB_LITE -TEST_P(DBRateLimiterTest, Get) { +TEST_P(DBRateLimiterOnReadTest, Get) { if (use_direct_io_ && !IsDirectIOSupported()) { return; } @@ -130,7 +138,7 @@ TEST_P(DBRateLimiterTest, Get) { } } -TEST_P(DBRateLimiterTest, NewMultiGet) { +TEST_P(DBRateLimiterOnReadTest, NewMultiGet) { // The new void-returning `MultiGet()` APIs use `MultiRead()`, which does not // yet support rate limiting. 
if (use_direct_io_ && !IsDirectIOSupported()) { @@ -161,7 +169,7 @@ TEST_P(DBRateLimiterTest, NewMultiGet) { ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); } -TEST_P(DBRateLimiterTest, OldMultiGet) { +TEST_P(DBRateLimiterOnReadTest, OldMultiGet) { // The old `vector`-returning `MultiGet()` APIs use `Read()`, which // supports rate limiting. if (use_direct_io_ && !IsDirectIOSupported()) { @@ -193,7 +201,7 @@ TEST_P(DBRateLimiterTest, OldMultiGet) { ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); } -TEST_P(DBRateLimiterTest, Iterator) { +TEST_P(DBRateLimiterOnReadTest, Iterator) { if (use_direct_io_ && !IsDirectIOSupported()) { return; } @@ -223,7 +231,7 @@ TEST_P(DBRateLimiterTest, Iterator) { #if !defined(ROCKSDB_LITE) -TEST_P(DBRateLimiterTest, VerifyChecksum) { +TEST_P(DBRateLimiterOnReadTest, VerifyChecksum) { if (use_direct_io_ && !IsDirectIOSupported()) { return; } @@ -237,7 +245,7 @@ TEST_P(DBRateLimiterTest, VerifyChecksum) { ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); } -TEST_P(DBRateLimiterTest, VerifyFileChecksums) { +TEST_P(DBRateLimiterOnReadTest, VerifyFileChecksums) { if (use_direct_io_ && !IsDirectIOSupported()) { return; } @@ -253,6 +261,182 @@ TEST_P(DBRateLimiterTest, VerifyFileChecksums) { #endif // !defined(ROCKSDB_LITE) +class DBRateLimiterOnWriteTest : public DBTestBase { + public: + explicit DBRateLimiterOnWriteTest() + : DBTestBase("db_rate_limiter_on_write_test", /*env_do_fsync=*/false) {} + + void Init() { + options_ = GetOptions(); + ASSERT_OK(TryReopenWithColumnFamilies({"default"}, options_)); + Random rnd(301); + for (int i = 0; i < kNumFiles; i++) { + ASSERT_OK(Put(0, kStartKey, rnd.RandomString(2))); + ASSERT_OK(Put(0, kEndKey, rnd.RandomString(2))); + ASSERT_OK(Flush(0)); + } + } + + Options GetOptions() { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.rate_limiter.reset(NewGenericRateLimiter( + 1 << 20 /* 
rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kWritesOnly)); + options.table_factory.reset( + NewBlockBasedTableFactory(BlockBasedTableOptions())); + return options; + } + + protected: + inline const static int64_t kNumFiles = 3; + inline const static std::string kStartKey = "a"; + inline const static std::string kEndKey = "b"; + Options options_; +}; + +TEST_F(DBRateLimiterOnWriteTest, Flush) { + std::int64_t prev_total_request = 0; + + Init(); + + std::int64_t actual_flush_request = + options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL) - + prev_total_request; + std::int64_t expected_flush_request = kNumFiles; + EXPECT_EQ(actual_flush_request, expected_flush_request); + EXPECT_EQ(actual_flush_request, + options_.rate_limiter->GetTotalRequests(Env::IO_HIGH)); +} + +TEST_F(DBRateLimiterOnWriteTest, Compact) { + Init(); + + // Pre-compaction: + // level-0 : `kNumFiles` SST files overlapping on [kStartKey, kEndKey] +#ifndef ROCKSDB_LITE + std::string files_per_level_pre_compaction = std::to_string(kNumFiles); + ASSERT_EQ(files_per_level_pre_compaction, FilesPerLevel(0 /* cf */)); +#endif // !ROCKSDB_LITE + + std::int64_t prev_total_request = + options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL); + ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_LOW)); + + Compact(kStartKey, kEndKey); + + std::int64_t actual_compaction_request = + options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL) - + prev_total_request; + + // Post-compaction: + // level-0 : 0 SST file + // level-1 : 1 SST file +#ifndef ROCKSDB_LITE + std::string files_per_level_post_compaction = "0,1"; + ASSERT_EQ(files_per_level_post_compaction, FilesPerLevel(0 /* cf */)); +#endif // !ROCKSDB_LITE + + std::int64_t expected_compaction_request = 1; + EXPECT_EQ(actual_compaction_request, expected_compaction_request); + EXPECT_EQ(actual_compaction_request, + options_.rate_limiter->GetTotalRequests(Env::IO_LOW)); +} + +class
DBRateLimiterOnWriteWALTest + : public DBRateLimiterOnWriteTest, + public ::testing::WithParamInterface> { + public: + static std::string GetTestNameSuffix( + ::testing::TestParamInfo> info) { + std::ostringstream oss; + if (std::get<0>(info.param)) { + oss << "DisableWAL"; + } else { + oss << "EnableWAL"; + } + if (std::get<1>(info.param)) { + oss << "_ManualWALFlush"; + } else { + oss << "_AutoWALFlush"; + } + if (std::get<2>(info.param) == Env::IO_USER) { + oss << "_RateLimitAutoWALFlush"; + } else if (std::get<2>(info.param) == Env::IO_TOTAL) { + oss << "_NoRateLimitAutoWALFlush"; + } else { + oss << "_RateLimitAutoWALFlushWithIncorrectPriority"; + } + return oss.str(); + } + + explicit DBRateLimiterOnWriteWALTest() + : disable_wal_(std::get<0>(GetParam())), + manual_wal_flush_(std::get<1>(GetParam())), + rate_limiter_priority_(std::get<2>(GetParam())) {} + + void Init() { + options_ = GetOptions(); + options_.manual_wal_flush = manual_wal_flush_; + Reopen(options_); + } + + WriteOptions GetWriteOptions() { + WriteOptions write_options; + write_options.disableWAL = disable_wal_; + write_options.rate_limiter_priority = rate_limiter_priority_; + return write_options; + } + + protected: + bool disable_wal_; + bool manual_wal_flush_; + Env::IOPriority rate_limiter_priority_; +}; + +INSTANTIATE_TEST_CASE_P( + DBRateLimiterOnWriteWALTest, DBRateLimiterOnWriteWALTest, + ::testing::Values(std::make_tuple(false, false, Env::IO_TOTAL), + std::make_tuple(false, false, Env::IO_USER), + std::make_tuple(false, false, Env::IO_HIGH), + std::make_tuple(false, true, Env::IO_USER), + std::make_tuple(true, false, Env::IO_USER)), + DBRateLimiterOnWriteWALTest::GetTestNameSuffix); + +TEST_P(DBRateLimiterOnWriteWALTest, AutoWalFlush) { + Init(); + + const bool no_rate_limit_auto_wal_flush = + (rate_limiter_priority_ == Env::IO_TOTAL); + const bool valid_arg = (rate_limiter_priority_ == Env::IO_USER && + !disable_wal_ && !manual_wal_flush_); + + std::int64_t prev_total_request = + 
options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL); + ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + + Status s = Put("foo", "v1", GetWriteOptions()); + + if (no_rate_limit_auto_wal_flush || valid_arg) { + EXPECT_TRUE(s.ok()); + } else { + EXPECT_TRUE(s.IsInvalidArgument()); + EXPECT_TRUE(s.ToString().find("WriteOptions::rate_limiter_priority") != + std::string::npos); + } + + std::int64_t actual_auto_wal_flush_request = + options_.rate_limiter->GetTotalRequests(Env::IO_TOTAL) - + prev_total_request; + std::int64_t expected_auto_wal_flush_request = valid_arg ? 1 : 0; + + EXPECT_EQ(actual_auto_wal_flush_request, expected_auto_wal_flush_request); + EXPECT_EQ(actual_auto_wal_flush_request, + options_.rate_limiter->GetTotalRequests(Env::IO_USER)); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/log_writer.cc b/db/log_writer.cc index 410c634cc..77b82950a 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -50,7 +50,8 @@ IOStatus Writer::Close() { return s; } -IOStatus Writer::AddRecord(const Slice& slice) { +IOStatus Writer::AddRecord(const Slice& slice, + Env::IOPriority rate_limiter_priority) { const char* ptr = slice.data(); size_t left = slice.size(); @@ -73,7 +74,8 @@ IOStatus Writer::AddRecord(const Slice& slice) { // kRecyclableHeaderSize being <= 11) assert(header_size <= 11); s = dest_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", - static_cast(leftover))); + static_cast(leftover)), + 0 /* crc32c_checksum */, rate_limiter_priority); if (!s.ok()) { break; } @@ -99,7 +101,7 @@ IOStatus Writer::AddRecord(const Slice& slice) { type = recycle_log_files_ ? 
kRecyclableMiddleType : kMiddleType; } - s = EmitPhysicalRecord(type, ptr, fragment_length); + s = EmitPhysicalRecord(type, ptr, fragment_length, rate_limiter_priority); ptr += fragment_length; left -= fragment_length; begin = false; @@ -107,7 +109,7 @@ IOStatus Writer::AddRecord(const Slice& slice) { if (s.ok()) { if (!manual_flush_) { - s = dest_->Flush(); + s = dest_->Flush(rate_limiter_priority); } } @@ -141,7 +143,8 @@ IOStatus Writer::AddCompressionTypeRecord() { bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); } -IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { +IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, + Env::IOPriority rate_limiter_priority) { assert(n <= 0xffff); // Must fit in two bytes size_t header_size; @@ -180,9 +183,10 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { EncodeFixed32(buf, crc); // Write the header and the payload - IOStatus s = dest_->Append(Slice(buf, header_size)); + IOStatus s = dest_->Append(Slice(buf, header_size), 0 /* crc32c_checksum */, + rate_limiter_priority); if (s.ok()) { - s = dest_->Append(Slice(ptr, n), payload_crc); + s = dest_->Append(Slice(ptr, n), payload_crc, rate_limiter_priority); } block_offset_ += header_size + n; return s; diff --git a/db/log_writer.h b/db/log_writer.h index 8f60049db..ec0e4788e 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -13,6 +13,7 @@ #include "db/log_format.h" #include "rocksdb/compression_type.h" +#include "rocksdb/env.h" #include "rocksdb/io_status.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" @@ -81,7 +82,8 @@ class Writer { ~Writer(); - IOStatus AddRecord(const Slice& slice); + IOStatus AddRecord(const Slice& slice, + Env::IOPriority rate_limiter_priority = Env::IO_TOTAL); IOStatus AddCompressionTypeRecord(); WritableFileWriter* file() { return dest_.get(); } @@ -106,7 +108,9 @@ class Writer { // record type stored in the header. 
uint32_t type_crc_[kMaxRecordType + 1]; - IOStatus EmitPhysicalRecord(RecordType type, const char* ptr, size_t length); + IOStatus EmitPhysicalRecord( + RecordType type, const char* ptr, size_t length, + Env::IOPriority rate_limiter_priority = Env::IO_TOTAL); // If true, it does not flush after each write. Instead it relies on the upper // layer to manually does the flush by calling ::WriteBuffer() diff --git a/db/write_thread.cc b/db/write_thread.cc index ac3a2f869..d59eba263 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -471,6 +471,11 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader, break; } + if (w->rate_limiter_priority != leader->rate_limiter_priority) { + // Do not mix writes with different rate limiter priorities. + break; + } + if (w->batch == nullptr) { // Do not include those writes with nullptr batch. Those are not writes, // those are something else. They want to be alone diff --git a/db/write_thread.h b/db/write_thread.h index ac2c8696d..af4d0967e 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -117,6 +117,7 @@ class WriteThread { bool sync; bool no_slowdown; bool disable_wal; + Env::IOPriority rate_limiter_priority; bool disable_memtable; size_t batch_cnt; // if non-zero, number of sub-batches in the write batch size_t protection_bytes_per_key; @@ -141,6 +142,7 @@ class WriteThread { sync(false), no_slowdown(false), disable_wal(false), + rate_limiter_priority(Env::IOPriority::IO_TOTAL), disable_memtable(false), batch_cnt(0), protection_bytes_per_key(0), @@ -163,6 +165,7 @@ class WriteThread { sync(write_options.sync), no_slowdown(write_options.no_slowdown), disable_wal(write_options.disableWAL), + rate_limiter_priority(write_options.rate_limiter_priority), disable_memtable(_disable_memtable), batch_cnt(_batch_cnt), protection_bytes_per_key(_batch->GetProtectionBytesPerKey()), diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 5f67b0b8e..137339f6c 100644 --- 
a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -176,6 +176,7 @@ DECLARE_int32(range_deletion_width); DECLARE_uint64(rate_limiter_bytes_per_sec); DECLARE_bool(rate_limit_bg_reads); DECLARE_bool(rate_limit_user_ops); +DECLARE_bool(rate_limit_auto_wal_flush); DECLARE_uint64(sst_file_manager_bytes_per_sec); DECLARE_uint64(sst_file_manager_bytes_per_truncate); DECLARE_bool(use_txn); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 4933b35a4..c973eeb55 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -550,6 +550,12 @@ DEFINE_bool(rate_limit_user_ops, false, "When true use Env::IO_USER priority level to charge internal rate " "limiter for reads associated with user operations."); +DEFINE_bool(rate_limit_auto_wal_flush, false, + "When true use Env::IO_USER priority level to charge internal rate " + "limiter for automatic WAL flush (`Options::manual_wal_flush` == " + "false) after the user " + "write operation."); + DEFINE_uint64(sst_file_manager_bytes_per_sec, 0, "Set `Options::sst_file_manager` to delete at this rate. By " "default the deletion rate is unbounded."); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index c30a069ec..c2c793b9f 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -485,6 +485,9 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, if (FLAGS_sync) { write_opts.sync = true; } + if (FLAGS_rate_limit_auto_wal_flush) { + write_opts.rate_limiter_priority = Env::IO_USER; + } char value[100]; int cf_idx = 0; Status s; @@ -640,6 +643,9 @@ void StressTest::OperateDb(ThreadState* thread) { read_opts.rate_limiter_priority = FLAGS_rate_limit_user_ops ? 
Env::IO_USER : Env::IO_TOTAL; WriteOptions write_opts; + if (FLAGS_rate_limit_auto_wal_flush) { + write_opts.rate_limiter_priority = Env::IO_USER; + } auto shared = thread->shared; char value[100]; std::string from_db; diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 02a71e30a..b8b51d81e 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -271,6 +271,9 @@ class NonBatchedOpsStressTest : public StressTest { Transaction* txn = nullptr; if (use_txn) { WriteOptions wo; + if (FLAGS_rate_limit_auto_wal_flush) { + wo.rate_limiter_priority = Env::IO_USER; + } Status s = NewTxn(wo, &txn); if (!s.ok()) { fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str()); diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index f345c6572..84be9b689 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -41,8 +41,8 @@ IOStatus WritableFileWriter::Create(const std::shared_ptr& fs, return io_s; } -IOStatus WritableFileWriter::Append(const Slice& data, - uint32_t crc32c_checksum) { +IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, + Env::IOPriority op_rate_limiter_priority) { const char* src = data.data(); size_t left = data.size(); IOStatus s; @@ -79,7 +79,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, // Flush only when buffered I/O if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) { if (buf_.CurrentSize() > 0) { - s = Flush(); + s = Flush(op_rate_limiter_priority); if (!s.ok()) { return s; } @@ -109,7 +109,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, src += appended; if (left > 0) { - s = Flush(); + s = Flush(op_rate_limiter_priority); if (!s.ok()) { break; } @@ -119,7 +119,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, } else { assert(buf_.CurrentSize() == 0); buffered_data_crc32c_checksum_ = crc32c_checksum; - s = WriteBufferedWithChecksum(src, 
left); + s = WriteBufferedWithChecksum(src, left, op_rate_limiter_priority); } } else { // In this case, either we do not need to do the data verification or @@ -139,7 +139,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, src += appended; if (left > 0) { - s = Flush(); + s = Flush(op_rate_limiter_priority); if (!s.ok()) { break; } @@ -150,9 +150,9 @@ IOStatus WritableFileWriter::Append(const Slice& data, assert(buf_.CurrentSize() == 0); if (perform_data_verification_ && buffered_data_with_checksum_) { buffered_data_crc32c_checksum_ = crc32c::Value(src, left); - s = WriteBufferedWithChecksum(src, left); + s = WriteBufferedWithChecksum(src, left, op_rate_limiter_priority); } else { - s = WriteBuffered(src, left); + s = WriteBuffered(src, left, op_rate_limiter_priority); } } } @@ -164,7 +164,8 @@ IOStatus WritableFileWriter::Append(const Slice& data, return s; } -IOStatus WritableFileWriter::Pad(const size_t pad_bytes) { +IOStatus WritableFileWriter::Pad(const size_t pad_bytes, + Env::IOPriority op_rate_limiter_priority) { assert(pad_bytes < kDefaultPageSize); size_t left = pad_bytes; size_t cap = buf_.Capacity() - buf_.CurrentSize(); @@ -178,7 +179,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes) { buf_.PadWith(append_bytes, 0); left -= append_bytes; if (left > 0) { - IOStatus s = Flush(); + IOStatus s = Flush(op_rate_limiter_priority); if (!s.ok()) { return s; } @@ -294,7 +295,7 @@ IOStatus WritableFileWriter::Close() { // write out the cached data to the OS cache or storage if direct I/O // enabled -IOStatus WritableFileWriter::Flush() { +IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { IOStatus s; TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2); @@ -303,17 +304,19 @@ IOStatus WritableFileWriter::Flush() { #ifndef ROCKSDB_LITE if (pending_sync_) { if (perform_data_verification_ && buffered_data_with_checksum_) { - s = WriteDirectWithChecksum(); + s = 
WriteDirectWithChecksum(op_rate_limiter_priority); } else { - s = WriteDirect(); + s = WriteDirect(op_rate_limiter_priority); } } #endif // !ROCKSDB_LITE } else { if (perform_data_verification_ && buffered_data_with_checksum_) { - s = WriteBufferedWithChecksum(buf_.BufferStart(), buf_.CurrentSize()); + s = WriteBufferedWithChecksum(buf_.BufferStart(), buf_.CurrentSize(), + op_rate_limiter_priority); } else { - s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); + s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize(), + op_rate_limiter_priority); } } if (!s.ok()) { @@ -479,7 +482,8 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { // This method writes to disk the specified data and makes use of the rate // limiter if available -IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { +IOStatus WritableFileWriter::WriteBuffered( + const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { IOStatus s; assert(!use_direct_io()); const char* src = data; @@ -489,10 +493,14 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { while (left > 0) { size_t allowed; - if (rate_limiter_ != nullptr) { - allowed = rate_limiter_->RequestToken( - left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_, - RateLimiter::OpType::kWrite); + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority); + if (rate_limiter_ != nullptr && + rate_limiter_priority_used != Env::IO_TOTAL) { + allowed = rate_limiter_->RequestToken(left, 0 /* alignment */, + rate_limiter_priority_used, stats_, + RateLimiter::OpType::kWrite); } else { allowed = left; } @@ -562,8 +570,8 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { return s; } -IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data, - size_t size) { +IOStatus 
WritableFileWriter::WriteBufferedWithChecksum( + const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { IOStatus s; assert(!use_direct_io()); assert(perform_data_verification_ && buffered_data_with_checksum_); @@ -577,12 +585,15 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data, // TODO: need to be improved since it sort of defeats the purpose of the rate // limiter size_t data_size = left; - if (rate_limiter_ != nullptr) { + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority); + if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { while (data_size > 0) { size_t tmp_size; - tmp_size = rate_limiter_->RequestToken( - data_size, buf_.Alignment(), writable_file_->GetIOPriority(), stats_, - RateLimiter::OpType::kWrite); + tmp_size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), + rate_limiter_priority_used, stats_, + RateLimiter::OpType::kWrite); data_size -= tmp_size; } } @@ -674,7 +685,8 @@ void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data, // only write on aligned // offsets. 
#ifndef ROCKSDB_LITE -IOStatus WritableFileWriter::WriteDirect() { +IOStatus WritableFileWriter::WriteDirect( + Env::IOPriority op_rate_limiter_priority) { assert(use_direct_io()); IOStatus s; const size_t alignment = buf_.Alignment(); @@ -701,7 +713,11 @@ IOStatus WritableFileWriter::WriteDirect() { while (left > 0) { // Check how much is allowed size_t size; - if (rate_limiter_ != nullptr) { + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority); + if (rate_limiter_ != nullptr && + rate_limiter_priority_used != Env::IO_TOTAL) { size = rate_limiter_->RequestToken(left, buf_.Alignment(), - writable_file_->GetIOPriority(), stats_, + rate_limiter_priority_used, stats_, RateLimiter::OpType::kWrite); @@ -762,7 +778,8 @@ IOStatus WritableFileWriter::WriteDirect() { return s; } -IOStatus WritableFileWriter::WriteDirectWithChecksum() { +IOStatus WritableFileWriter::WriteDirectWithChecksum( + Env::IOPriority op_rate_limiter_priority) { assert(use_direct_io()); assert(perform_data_verification_ && buffered_data_with_checksum_); IOStatus s; @@ -798,7 +815,10 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum() { // TODO: need to be improved since it sort of defeats the purpose of the rate // limiter size_t data_size = left; - if (rate_limiter_ != nullptr) { + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority); + if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { while (data_size > 0) { size_t size; size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), @@ -860,4 +880,18 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum() { return s; } #endif // !ROCKSDB_LITE +Env::IOPriority WritableFileWriter::DecideRateLimiterPriority( + Env::IOPriority writable_file_io_priority, +
op_rate_limiter_priority == Env::IO_TOTAL) { + return Env::IO_TOTAL; + } else if (writable_file_io_priority == Env::IO_TOTAL) { + return op_rate_limiter_priority; + } else if (op_rate_limiter_priority == Env::IO_TOTAL) { + return writable_file_io_priority; + } else { + return op_rate_limiter_priority; + } +} } // namespace ROCKSDB_NAMESPACE diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index ede71d218..bfc756388 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -234,11 +234,13 @@ class WritableFileWriter { // When this Append API is called, if the crc32c_checksum is not provided, we // will calculate the checksum internally. - IOStatus Append(const Slice& data, uint32_t crc32c_checksum = 0); + IOStatus Append(const Slice& data, uint32_t crc32c_checksum = 0, + Env::IOPriority op_rate_limiter_priority = Env::IO_TOTAL); - IOStatus Pad(const size_t pad_bytes); + IOStatus Pad(const size_t pad_bytes, + Env::IOPriority op_rate_limiter_priority = Env::IO_TOTAL); - IOStatus Flush(); + IOStatus Flush(Env::IOPriority op_rate_limiter_priority = Env::IO_TOTAL); IOStatus Close(); @@ -271,15 +273,21 @@ class WritableFileWriter { const char* GetFileChecksumFuncName() const; private: + static Env::IOPriority DecideRateLimiterPriority( + Env::IOPriority writable_file_io_priority, + Env::IOPriority op_rate_limiter_priority); + // Used when os buffering is OFF and we are writing // DMA such as in Direct I/O mode #ifndef ROCKSDB_LITE - IOStatus WriteDirect(); - IOStatus WriteDirectWithChecksum(); + IOStatus WriteDirect(Env::IOPriority op_rate_limiter_priority); + IOStatus WriteDirectWithChecksum(Env::IOPriority op_rate_limiter_priority); #endif // !ROCKSDB_LITE - // Normal write - IOStatus WriteBuffered(const char* data, size_t size); - IOStatus WriteBufferedWithChecksum(const char* data, size_t size); + // Normal write. 
+ IOStatus WriteBuffered(const char* data, size_t size, + Env::IOPriority op_rate_limiter_priority); + IOStatus WriteBufferedWithChecksum(const char* data, size_t size, + Env::IOPriority op_rate_limiter_priority); IOStatus RangeSync(uint64_t offset, uint64_t nbytes); IOStatus SyncInternal(bool use_fsync); }; diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 21a272a13..6966022aa 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -959,8 +959,16 @@ class WritableFile { // Use the returned alignment value to allocate // aligned buffer for Direct I/O virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; } + /* - * Change the priority in rate limiter if rate limiting is enabled. + * If rate limiting is enabled, change the file-granularity priority used in + * rate-limiting writes. + * + * In the presence of finer-granularity priority such as + * `WriteOptions::rate_limiter_priority`, this file-granularity priority may + * be overridden by a non-Env::IO_TOTAL finer-granularity priority and used as + * a fallback for Env::IO_TOTAL finer-granularity priority. + * * If rate limiting is not enabled, this call has no effect. */ virtual void SetIOPriority(Env::IOPriority pri) { io_priority_ = pri; } diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index 13c459ee8..201cee95a 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -1031,6 +1031,17 @@ class FSWritableFile { write_hint_ = hint; } + /* + * If rate limiting is enabled, change the file-granularity priority used in + * rate-limiting writes. + * + * In the presence of finer-granularity priority such as + * `WriteOptions::rate_limiter_priority`, this file-granularity priority may + * be overridden by a non-Env::IO_TOTAL finer-granularity priority and used as + * a fallback for Env::IO_TOTAL finer-granularity priority. + * + * If rate limiting is not enabled, this call has no effect. 
+ */ virtual void SetIOPriority(Env::IOPriority pri) { io_priority_ = pri; } virtual Env::IOPriority GetIOPriority() { return io_priority_; } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index edbd40b9d..44c30447f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1659,13 +1659,29 @@ struct WriteOptions { // Default: false bool memtable_insert_hint_per_batch; + // For writes associated with this option, charge the internal rate + // limiter (see `DBOptions::rate_limiter`) at the specified priority. The + // special value `Env::IO_TOTAL` disables charging the rate limiter. + // + // Currently the support covers automatic WAL flushes, which happen during + // live updates (`Put()`, `Write()`, `Delete()`, etc.) + // when `WriteOptions::disableWAL == false` + // and `DBOptions::manual_wal_flush == false`. + // + // Only `Env::IO_USER` and `Env::IO_TOTAL` are allowed + // due to implementation constraints. + // + // Default: `Env::IO_TOTAL` + Env::IOPriority rate_limiter_priority; + WriteOptions() : sync(false), disableWAL(false), ignore_missing_column_families(false), no_slowdown(false), low_pri(false), - memtable_insert_hint_per_batch(false) {} + memtable_insert_hint_per_batch(false), + rate_limiter_priority(Env::IO_TOTAL) {} }; // Options that control flush operations diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index dec6e7520..a405685a2 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1118,6 +1118,12 @@ DEFINE_bool(file_checksum, false, "When true use FileChecksumGenCrc32cFactory for " "file_checksum_gen_factory."); +DEFINE_bool(rate_limit_auto_wal_flush, false, + "When true use Env::IO_USER priority level to charge internal rate " + "limiter for automatic WAL flush (`Options::manual_wal_flush` == " + "false) after the user " + "write operation"); + static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( const char* ctype) { assert(ctype); @@ -3100,6 +3106,8 @@ class 
Benchmark { write_options_.sync = true; } write_options_.disableWAL = FLAGS_disable_wal; + write_options_.rate_limiter_priority = + FLAGS_rate_limit_auto_wal_flush ? Env::IO_USER : Env::IO_TOTAL; read_options_ = ReadOptions(FLAGS_verify_checksum, true); read_options_.total_order_seek = FLAGS_total_order_seek; read_options_.prefix_same_as_start = FLAGS_prefix_same_as_start;