From 05c678e1352916093e985412f4e755f4a28fce2c Mon Sep 17 00:00:00 2001 From: gitbw95 <95719937+gitbw95@users.noreply.github.com> Date: Wed, 18 May 2022 00:41:41 -0700 Subject: [PATCH] Set Write rate limiter priority dynamically and pass it to FS (#9988) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: ### Context: Background compactions and flush generate large reads and writes, and can be long running, especially for universal compaction. In some cases, this can impact foreground reads and writes by users. From the RocksDB perspective, there can be two kinds of rate limiters, the internal (native) one and the external one. - The internal (native) rate limiter is introduced in [the wiki](https://github.com/facebook/rocksdb/wiki/Rate-Limiter). Currently, only IO_LOW and IO_HIGH are used and they are set statically. - For the external rate limiter, in FSWritableFile functions, IOOptions is open for end users to set and get rate_limiter_priority for their own rate limiter. Currently, RocksDB doesn’t pass the rate_limiter_priority through IOOptions to the file system. ### Solution During the User Read, Flush write, Compaction read/write, the WriteController is used to determine whether DB writes are stalled or slowed down. The rate limiter priority (Env::IOPriority) can be determined accordingly. We decided to always pass the priority in IOOptions. What the file system does with it should be a contract between the user and the file system. We would like to set the rate limiter priority at file level, since the Flush/Compaction job level may be too coarse with multiple files and block IO level is too granular. 
**This PR is for the Write path.** The **Write:** dynamic priorities for different states are listed as follows: | State | Normal | Delayed | Stalled | | ----- | ------ | ------- | ------- | | Flush | IO_HIGH | IO_USER | IO_USER | | Compaction | IO_LOW | IO_USER | IO_USER | Flush and Compaction writes share the same call path through BlockBaseTableWriter, WritableFileWriter, and FSWritableFile. When a new FSWritableFile object is created, its io_priority_ can be set dynamically based on the state of the WriteController. In WritableFileWriter, before the call sites of FSWritableFile functions, WritableFileWriter::DecideRateLimiterPriority() determines the rate_limiter_priority. The options (IOOptions) argument of FSWritableFile functions will be updated with the rate_limiter_priority. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9988 Test Plan: Add unit tests. Reviewed By: anand1976 Differential Revision: D36395159 Pulled By: gitbw95 fbshipit-source-id: a7c82fc29759139a1a07ec46c37dbf7e753474cf --- HISTORY.md | 2 + db/compaction/compaction_job.cc | 15 ++- db/compaction/compaction_job.h | 6 ++ db/compaction/compaction_job_test.cc | 40 +++++++- db/flush_job.cc | 17 ++- db/flush_job.h | 4 + db/flush_job_test.cc | 66 ++++++++++++ db/write_controller.h | 2 +- file/writable_file_writer.cc | 114 +++++++++++++-------- file/writable_file_writer.h | 4 +- util/file_reader_writer_test.cc | 148 +++++++++++++++++++++++++++ 11 files changed, 367 insertions(+), 51 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 61cb7cd7a..7c4738c15 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,7 @@ * Fixed a bug where RocksDB could corrupt DBs with `avoid_flush_during_recovery == true` by removing valid WALs, leading to `Status::Corruption` with message like "SST file is ahead of WALs" when attempting to reopen. 
* Fixed a bug in async_io path where incorrect length of data is read by FilePrefetchBuffer if data is consumed from two populated buffers and request for more data is sent. * Fixed a CompactionFilter bug. Compaction filter used to use `Delete` to remove keys, even if the keys should be removed with `SingleDelete`. Mixing `Delete` and `SingleDelete` may cause undefined behavior. +* Fixed a bug in `WritableFileWriter::WriteDirect` and `WritableFileWriter::WriteDirectWithChecksum`. The rate_limiter_priority specified in ReadOptions was not passed to the RateLimiter when requesting a token. * Fixed a bug which might cause process crash when I/O error happens when reading an index block in MultiGet(). ### New Features @@ -29,6 +30,7 @@ ### Behavior changes * Enforce the existing contract of SingleDelete so that SingleDelete cannot be mixed with Delete because it leads to undefined behavior. Fix a number of unit tests that violate the contract but happen to pass. * ldb `--try_load_options` default to true if `--db` is specified and not creating a new DB, the user can still explicitly disable that by `--try_load_options=false` (or explicitly enable that by `--try_load_options`). +* During Flush write or Compaction write, the WriteController is used to determine whether DB writes are stalled or slowed down. The priority (Env::IOPriority) can then be determined accordingly and be passed in IOOptions to the file system. 
## 7.2.0 (04/15/2022) ### Bug Fixes diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index ed4687906..d6451828a 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -2285,7 +2285,7 @@ Status CompactionJob::OpenCompactionOutputFile( /*enable_hash=*/paranoid_file_checks_); } - writable_file->SetIOPriority(Env::IOPriority::IO_LOW); + writable_file->SetIOPriority(GetRateLimiterPriority()); writable_file->SetWriteLifeTimeHint(write_hint_); FileTypeSet tmp_set = db_options_.checksum_handoff_file_types; writable_file->SetPreallocationBlockSize(static_cast( @@ -2476,6 +2476,19 @@ std::string CompactionJob::GetTableFileName(uint64_t file_number) { file_number, compact_->compaction->output_path_id()); } +Env::IOPriority CompactionJob::GetRateLimiterPriority() { + if (versions_ && versions_->GetColumnFamilySet() && + versions_->GetColumnFamilySet()->write_controller()) { + WriteController* write_controller = + versions_->GetColumnFamilySet()->write_controller(); + if (write_controller->NeedsDelay() || write_controller->IsStopped()) { + return Env::IO_USER; + } + } + + return Env::IO_LOW; +} + #ifndef ROCKSDB_LITE std::string CompactionServiceCompactionJob::GetTableFileName( uint64_t file_number) { diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 24a77c679..7a804c74a 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -137,6 +137,8 @@ class CompactionJob { IOStatus io_status_; private: + friend class CompactionJobTestBase; + // Generates a histogram representing potential divisions of key ranges from // the input. It adds the starting and/or ending keys of certain input files // to the working set and then finds the approximate size of data in between @@ -234,6 +236,10 @@ class CompactionJob { // Get table file name in where it's outputting to, which should also be in // `output_directory_`. 
virtual std::string GetTableFileName(uint64_t file_number); + // The rate limiter priority (io_priority) is determined dynamically here. + // The Compaction Read and Write priorities are the same for different + // scenarios, such as write stalled. + Env::IOPriority GetRateLimiterPriority(); }; // CompactionServiceInput is used the pass compaction information between two diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index f9a2a2855..09079a74c 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -321,7 +321,8 @@ class CompactionJobTestBase : public testing::Test { const std::vector& snapshots = {}, SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber, int output_level = 1, bool verify = true, - uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber) { + uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber, + bool check_get_priority = false) { auto cfd = versions_->GetColumnFamilySet()->GetDefault(); size_t num_input_files = 0; @@ -390,6 +391,32 @@ class CompactionJobTestBase : public testing::Test { expected_oldest_blob_file_number); } } + + if (check_get_priority) { + CheckGetRateLimiterPriority(compaction_job); + } + } + + void CheckGetRateLimiterPriority(CompactionJob& compaction_job) { + // When the state from WriteController is normal. + ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_LOW); + + WriteController* write_controller = + compaction_job.versions_->GetColumnFamilySet()->write_controller(); + + { + // When the state from WriteController is Delayed. + std::unique_ptr delay_token = + write_controller->GetDelayToken(1000000); + ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER); + } + + { + // When the state from WriteController is Stopped. 
+ std::unique_ptr stop_token = + write_controller->GetStopToken(); + ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER); + } } std::shared_ptr env_guard_; @@ -1303,6 +1330,17 @@ TEST_F(CompactionJobTest, ResultSerialization) { } } +TEST_F(CompactionJobTest, GetRateLimiterPriority) { + NewDB(); + + auto expected_results = CreateTwoFiles(false); + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + auto files = cfd->current()->storage_info()->LevelFiles(0); + ASSERT_EQ(2U, files.size()); + RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, true, + kInvalidBlobFileNumber, true); +} + class CompactionJobTimestampTest : public CompactionJobTestBase { public: CompactionJobTimestampTest() diff --git a/db/flush_job.cc b/db/flush_job.cc index adb98fed5..65f9e2b0e 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -810,6 +810,7 @@ Status FlushJob::WriteLevel0Table() { { auto write_hint = cfd_->CalculateSSTWriteHint(0); + Env::IOPriority io_priority = GetRateLimiterPriorityForWrite(); db_mutex_->Unlock(); if (log_buffer_) { log_buffer_->FlushBufferToLog(); @@ -925,7 +926,7 @@ Status FlushJob::WriteLevel0Table() { snapshot_checker_, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), &io_s, io_tracer_, BlobFileCreationReason::kFlush, event_logger_, job_context_->job_id, - Env::IO_HIGH, &table_properties_, write_hint, full_history_ts_low, + io_priority, &table_properties_, write_hint, full_history_ts_low, blob_callback_, &num_input_entries, &memtable_payload_bytes, &memtable_garbage_bytes); // TODO: Cleanup io_status in BuildTable and table builders @@ -1032,6 +1033,19 @@ Status FlushJob::WriteLevel0Table() { return s; } +Env::IOPriority FlushJob::GetRateLimiterPriorityForWrite() { + if (versions_ && versions_->GetColumnFamilySet() && + versions_->GetColumnFamilySet()->write_controller()) { + WriteController* write_controller = + versions_->GetColumnFamilySet()->write_controller(); + if (write_controller->IsStopped() || 
write_controller->NeedsDelay()) { + return Env::IO_USER; + } + } + + return Env::IO_HIGH; +} + #ifndef ROCKSDB_LITE std::unique_ptr FlushJob::GetFlushJobInfo() const { db_mutex_->AssertHeld(); @@ -1064,7 +1078,6 @@ std::unique_ptr FlushJob::GetFlushJobInfo() const { } return info; } - #endif // !ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/db/flush_job.h b/db/flush_job.h index 76d5e34b6..7cfad1398 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -94,6 +94,8 @@ class FlushJob { #endif // !ROCKSDB_LITE private: + friend class FlushJobTest_GetRateLimiterPriorityForWrite_Test; + void ReportStartedFlush(); void ReportFlushInputSize(const autovector& mems); void RecordFlushIOStats(); @@ -121,6 +123,8 @@ class FlushJob { // process has not matured yet. Status MemPurge(); bool MemPurgeDecider(); + // The rate limiter priority (io_priority) is determined dynamically here. + Env::IOPriority GetRateLimiterPriorityForWrite(); #ifndef ROCKSDB_LITE std::unique_ptr GetFlushJobInfo() const; #endif // !ROCKSDB_LITE diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index e326db9e0..0f9c2a928 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -528,6 +528,72 @@ TEST_F(FlushJobTest, Snapshots) { job_context.Clean(); } +TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) { + // Prepare a FlushJob that flush MemTables of Single Column Family. 
+ const size_t num_mems = 2; + const size_t num_mems_to_flush = 1; + const size_t num_keys_per_table = 100; + JobContext job_context(0); + ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); + std::vector memtable_ids; + std::vector new_mems; + for (size_t i = 0; i != num_mems; ++i) { + MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), + kMaxSequenceNumber); + mem->SetID(i); + mem->Ref(); + new_mems.emplace_back(mem); + memtable_ids.push_back(mem->GetID()); + + for (size_t j = 0; j < num_keys_per_table; ++j) { + std::string key(std::to_string(j + i * num_keys_per_table)); + std::string value("value" + key); + ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, + key, value, nullptr /* kv_prot_info */)); + } + } + + autovector to_delete; + for (auto mem : new_mems) { + cfd->imm()->Add(mem, &to_delete); + } + + EventLogger event_logger(db_options_.info_log.get()); + SnapshotChecker* snapshot_checker = nullptr; // not relavant + + assert(memtable_ids.size() == num_mems); + uint64_t smallest_memtable_id = memtable_ids.front(); + uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1; + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_, + versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/); + + // When the state from WriteController is normal. + ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_HIGH); + + WriteController* write_controller = + flush_job.versions_->GetColumnFamilySet()->write_controller(); + + { + // When the state from WriteController is Delayed. 
+ std::unique_ptr delay_token = + write_controller->GetDelayToken(1000000); + ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_USER); + } + + { + // When the state from WriteController is Stopped. + std::unique_ptr stop_token = + write_controller->GetStopToken(); + ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_USER); + } +} + class FlushJobTimestampTest : public FlushJobTestBase { public: FlushJobTimestampTest() diff --git a/db/write_controller.h b/db/write_controller.h index 88bd1417f..c32b70b94 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -52,7 +52,7 @@ class WriteController { bool IsStopped() const; bool NeedsDelay() const { return total_delayed_.load() > 0; } bool NeedSpeedupCompaction() const { - return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0; + return IsStopped() || NeedsDelay() || total_compaction_pressure_.load() > 0; } // return how many microseconds the caller needs to sleep after the call // num_bytes: how many number of bytes to put into the DB. 
diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 84be9b689..87c08fed4 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -54,10 +54,14 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, UpdateFileChecksum(data); { + IOOptions io_options; + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority, + io_options.rate_limiter_priority); IOSTATS_TIMER_GUARD(prepare_write_nanos); TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite"); writable_file_->PrepareWrite(static_cast(GetFileSize()), left, - IOOptions(), nullptr); + io_options, nullptr); } // See whether we need to enlarge the buffer to avoid the flush @@ -211,6 +215,8 @@ IOStatus WritableFileWriter::Close() { s = Flush(); // flush cache to OS IOStatus interim; + IOOptions io_options; + io_options.rate_limiter_priority = writable_file_->GetIOPriority(); // In direct I/O mode we write whole pages so // we need to let the file know where data ends. 
if (use_direct_io()) { @@ -221,7 +227,7 @@ IOStatus WritableFileWriter::Close() { start_ts = FileOperationInfo::StartNow(); } #endif - interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr); + interim = writable_file_->Truncate(filesize_, io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); @@ -241,7 +247,7 @@ IOStatus WritableFileWriter::Close() { start_ts = FileOperationInfo::StartNow(); } #endif - interim = writable_file_->Fsync(IOOptions(), nullptr); + interim = writable_file_->Fsync(io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); @@ -267,7 +273,7 @@ IOStatus WritableFileWriter::Close() { start_ts = FileOperationInfo::StartNow(); } #endif - interim = writable_file_->Close(IOOptions(), nullptr); + interim = writable_file_->Close(io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); @@ -331,7 +337,11 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { start_ts = FileOperationInfo::StartNow(); } #endif - s = writable_file_->Flush(IOOptions(), nullptr); + IOOptions io_options; + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority, + io_options.rate_limiter_priority); + s = writable_file_->Flush(io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); @@ -428,17 +438,22 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { IOSTATS_TIMER_GUARD(fsync_nanos); TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); auto prev_perf_level = GetPerfLevel(); + IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_); + #ifndef ROCKSDB_LITE FileOperationInfo::StartTimePoint start_ts; if (ShouldNotifyListeners()) { start_ts = FileOperationInfo::StartNow(); } #endif + + IOOptions io_options; + 
io_options.rate_limiter_priority = writable_file_->GetIOPriority(); if (use_fsync) { - s = writable_file_->Fsync(IOOptions(), nullptr); + s = writable_file_->Fsync(io_options, nullptr); } else { - s = writable_file_->Sync(IOOptions(), nullptr); + s = writable_file_->Sync(io_options, nullptr); } #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { @@ -466,7 +481,9 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { start_ts = FileOperationInfo::StartNow(); } #endif - IOStatus s = writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr); + IOOptions io_options; + io_options.rate_limiter_priority = writable_file_->GetIOPriority(); + IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); @@ -490,19 +507,19 @@ IOStatus WritableFileWriter::WriteBuffered( size_t left = size; DataVerificationInfo v_info; char checksum_buf[sizeof(uint32_t)]; + IOOptions io_options; + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority, + io_options.rate_limiter_priority); while (left > 0) { - size_t allowed; - Env::IOPriority rate_limiter_priority_used = - WritableFileWriter::DecideRateLimiterPriority( - writable_file_->GetIOPriority(), op_rate_limiter_priority); + size_t allowed = left; if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { allowed = rate_limiter_->RequestToken(left, 0 /* alignment */, rate_limiter_priority_used, stats_, RateLimiter::OpType::kWrite); - } else { - allowed = left; } { @@ -511,7 +528,7 @@ IOStatus WritableFileWriter::WriteBuffered( #ifndef ROCKSDB_LITE FileOperationInfo::StartTimePoint start_ts; - uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr); + uint64_t old_size = writable_file_->GetFileSize(io_options, nullptr); if (ShouldNotifyListeners()) { start_ts = 
FileOperationInfo::StartNow(); old_size = next_write_offset_; @@ -524,10 +541,10 @@ IOStatus WritableFileWriter::WriteBuffered( if (perform_data_verification_) { Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); - s = writable_file_->Append(Slice(src, allowed), IOOptions(), v_info, + s = writable_file_->Append(Slice(src, allowed), io_options, v_info, nullptr); } else { - s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr); + s = writable_file_->Append(Slice(src, allowed), io_options, nullptr); } if (!s.ok()) { // If writable_file_->Append() failed, then the data may or may not @@ -579,15 +596,16 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( size_t left = size; DataVerificationInfo v_info; char checksum_buf[sizeof(uint32_t)]; - + IOOptions io_options; + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority, + io_options.rate_limiter_priority); // Check how much is allowed. Here, we loop until the rate limiter allows to // write the entire buffer. 
// TODO: need to be improved since it sort of defeats the purpose of the rate // limiter size_t data_size = left; - Env::IOPriority rate_limiter_priority_used = - WritableFileWriter::DecideRateLimiterPriority( - writable_file_->GetIOPriority(), op_rate_limiter_priority); if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { while (data_size > 0) { size_t tmp_size; @@ -604,7 +622,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( #ifndef ROCKSDB_LITE FileOperationInfo::StartTimePoint start_ts; - uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr); + uint64_t old_size = writable_file_->GetFileSize(io_options, nullptr); if (ShouldNotifyListeners()) { start_ts = FileOperationInfo::StartNow(); old_size = next_write_offset_; @@ -617,8 +635,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); - s = writable_file_->Append(Slice(src, left), IOOptions(), v_info, - nullptr); + s = writable_file_->Append(Slice(src, left), io_options, v_info, nullptr); SetPerfLevel(prev_perf_level); } #ifndef ROCKSDB_LITE @@ -709,20 +726,20 @@ IOStatus WritableFileWriter::WriteDirect( size_t left = buf_.CurrentSize(); DataVerificationInfo v_info; char checksum_buf[sizeof(uint32_t)]; + IOOptions io_options; + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority, + io_options.rate_limiter_priority); while (left > 0) { // Check how much is allowed - size_t size; - Env::IOPriority rate_limiter_priority_used = - WritableFileWriter::DecideRateLimiterPriority( - writable_file_->GetIOPriority(), op_rate_limiter_priority); + size_t size = left; if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { size = rate_limiter_->RequestToken(left, buf_.Alignment(), - writable_file_->GetIOPriority(), - stats_, 
RateLimiter::OpType::kWrite); - } else { - size = left; + rate_limiter_priority_used, stats_, + RateLimiter::OpType::kWrite); } { @@ -737,10 +754,10 @@ IOStatus WritableFileWriter::WriteDirect( Crc32cHandoffChecksumCalculation(src, size, checksum_buf); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); s = writable_file_->PositionedAppend(Slice(src, size), write_offset, - IOOptions(), v_info, nullptr); + io_options, v_info, nullptr); } else { s = writable_file_->PositionedAppend(Slice(src, size), write_offset, - IOOptions(), nullptr); + io_options, nullptr); } if (ShouldNotifyListeners()) { @@ -810,20 +827,22 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( DataVerificationInfo v_info; char checksum_buf[sizeof(uint32_t)]; + IOOptions io_options; + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority, + io_options.rate_limiter_priority); // Check how much is allowed. Here, we loop until the rate limiter allows to // write the entire buffer. 
// TODO: need to be improved since it sort of defeats the purpose of the rate // limiter size_t data_size = left; - Env::IOPriority rate_limiter_priority_used = - WritableFileWriter::DecideRateLimiterPriority( - writable_file_->GetIOPriority(), op_rate_limiter_priority); if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { while (data_size > 0) { size_t size; size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), - writable_file_->GetIOPriority(), - stats_, RateLimiter::OpType::kWrite); + rate_limiter_priority_used, stats_, + RateLimiter::OpType::kWrite); data_size -= size; } } @@ -839,7 +858,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); s = writable_file_->PositionedAppend(Slice(src, left), write_offset, - IOOptions(), v_info, nullptr); + io_options, v_info, nullptr); if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); @@ -882,16 +901,21 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( #endif // !ROCKSDB_LITE Env::IOPriority WritableFileWriter::DecideRateLimiterPriority( Env::IOPriority writable_file_io_priority, - Env::IOPriority op_rate_limiter_priority) { + Env::IOPriority op_rate_limiter_priority, + Env::IOPriority& iooptions_io_priority) { + Env::IOPriority rate_limiter_priority{Env::IO_TOTAL}; if (writable_file_io_priority == Env::IO_TOTAL && op_rate_limiter_priority == Env::IO_TOTAL) { - return Env::IO_TOTAL; + rate_limiter_priority = Env::IO_TOTAL; } else if (writable_file_io_priority == Env::IO_TOTAL) { - return op_rate_limiter_priority; + rate_limiter_priority = op_rate_limiter_priority; } else if (op_rate_limiter_priority == Env::IO_TOTAL) { - return writable_file_io_priority; + rate_limiter_priority = writable_file_io_priority; } else { - return op_rate_limiter_priority; + rate_limiter_priority = op_rate_limiter_priority; } + iooptions_io_priority = 
rate_limiter_priority; + return rate_limiter_priority; } + } // namespace ROCKSDB_NAMESPACE diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index f9f6e5bd0..a1391f976 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -277,9 +277,11 @@ class WritableFileWriter { const char* GetFileChecksumFuncName() const; private: + // Decide the Rate Limiter priority and update io_options.io_priority. static Env::IOPriority DecideRateLimiterPriority( Env::IOPriority writable_file_io_priority, - Env::IOPriority op_rate_limiter_priority); + Env::IOPriority op_rate_limiter_priority, + Env::IOPriority& iooptions_io_priority); // Used when os buffering is OFF and we are writing // DMA such as in Direct I/O mode diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index a0da9f8c4..5e222c5ed 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -901,6 +901,154 @@ TEST_F(DBWritableFileWriterTest, IOErrorNotification) { fwf->CheckCounters(1, 1); } #endif // ROCKSDB_LITE + +class WritableFileWriterIOPriorityTest : public testing::Test { + protected: + // This test is to check whether the rate limiter priority can be passed + // correctly from WritableFileWriter functions to FSWritableFile functions. + + void SetUp() override { + // When op_rate_limiter_priority parameter in WritableFileWriter functions + // is the default (Env::IO_TOTAL). 
+ std::unique_ptr wf{new FakeWF(Env::IO_HIGH)}; + FileOptions file_options; + writer_.reset(new WritableFileWriter(std::move(wf), "" /* don't care */, + file_options)); + } + + class FakeWF : public FSWritableFile { + public: + explicit FakeWF(Env::IOPriority io_priority) { SetIOPriority(io_priority); } + ~FakeWF() override {} + + IOStatus Append(const Slice& /*data*/, const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Append(const Slice& data, const IOOptions& options, + const DataVerificationInfo& /* verification_info */, + IODebugContext* dbg) override { + return Append(data, options, dbg); + } + IOStatus PositionedAppend(const Slice& /*data*/, uint64_t /*offset*/, + const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus PositionedAppend( + const Slice& /* data */, uint64_t /* offset */, + const IOOptions& options, + const DataVerificationInfo& /* verification_info */, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Truncate(uint64_t /*size*/, const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& options, IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& options, IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& options, IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Fsync(const IOOptions& options, IODebugContext* /*dbg*/) override { + 
EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + uint64_t GetFileSize(const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return 0; + } + void GetPreallocationStatus(size_t* /*block_size*/, + size_t* /*last_allocated_block*/) override {} + size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override { + return 0; + } + IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override { + return IOStatus::OK(); + } + + IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/, + const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/, + const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + + void PrepareWrite(size_t /*offset*/, size_t /*len*/, + const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + } + + bool IsSyncThreadSafe() const override { return true; } + }; + + std::unique_ptr writer_; +}; + +TEST_F(WritableFileWriterIOPriorityTest, Append) { + ASSERT_OK(writer_->Append(Slice("abc"))); +} + +TEST_F(WritableFileWriterIOPriorityTest, Pad) { ASSERT_OK(writer_->Pad(500)); } + +TEST_F(WritableFileWriterIOPriorityTest, Flush) { ASSERT_OK(writer_->Flush()); } + +TEST_F(WritableFileWriterIOPriorityTest, Close) { ASSERT_OK(writer_->Close()); } + +TEST_F(WritableFileWriterIOPriorityTest, Sync) { + ASSERT_OK(writer_->Sync(false)); + ASSERT_OK(writer_->Sync(true)); +} + +TEST_F(WritableFileWriterIOPriorityTest, SyncWithoutFlush) { + ASSERT_OK(writer_->SyncWithoutFlush(false)); + ASSERT_OK(writer_->SyncWithoutFlush(true)); +} + +TEST_F(WritableFileWriterIOPriorityTest, BasicOp) { + EnvOptions env_options; + 
env_options.bytes_per_sync = kMb; + std::unique_ptr wf(new FakeWF(Env::IO_HIGH)); + std::unique_ptr writer( + new WritableFileWriter(std::move(wf), "" /* don't care */, env_options)); + Random r(301); + Status s; + std::unique_ptr large_buf(new char[10 * kMb]); + for (int i = 0; i < 1000; i++) { + int skew_limit = (i < 700) ? 10 : 15; + uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100); + s = writer->Append(Slice(large_buf.get(), num)); + ASSERT_OK(s); + + // Flush in a chance of 1/10. + if (r.Uniform(10) == 0) { + s = writer->Flush(); + ASSERT_OK(s); + } + } + s = writer->Close(); + ASSERT_OK(s); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {