diff --git a/HISTORY.md b/HISTORY.md index 61cb7cd7a..7c4738c15 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,7 @@ * Fixed a bug where RocksDB could corrupt DBs with `avoid_flush_during_recovery == true` by removing valid WALs, leading to `Status::Corruption` with message like "SST file is ahead of WALs" when attempting to reopen. * Fixed a bug in async_io path where incorrect length of data is read by FilePrefetchBuffer if data is consumed from two populated buffers and request for more data is sent. * Fixed a CompactionFilter bug. Compaction filter used to use `Delete` to remove keys, even if the keys should be removed with `SingleDelete`. Mixing `Delete` and `SingleDelete` may cause undefined behavior. +* Fixed a bug in `WritableFileWriter::WriteDirect` and `WritableFileWriter::WriteDirectWithChecksum`. The rate_limiter_priority specified in ReadOptions was not passed to the RateLimiter when requesting a token. * Fixed a bug which might cause process crash when I/O error happens when reading an index block in MultiGet(). ### New Features @@ -29,6 +30,7 @@ ### Behavior changes * Enforce the existing contract of SingleDelete so that SingleDelete cannot be mixed with Delete because it leads to undefined behavior. Fix a number of unit tests that violate the contract but happen to pass. * ldb `--try_load_options` default to true if `--db` is specified and not creating a new DB, the user can still explicitly disable that by `--try_load_options=false` (or explicitly enable that by `--try_load_options`). +* During Flush write or Compaction write, the WriteController is used to determine whether DB writes are stalled or slowed down. The priority (Env::IOPriority) can then be determined accordingly and be passed in IOOptions to the file system. 
## 7.2.0 (04/15/2022) ### Bug Fixes diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index ed4687906..d6451828a 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -2285,7 +2285,7 @@ Status CompactionJob::OpenCompactionOutputFile( /*enable_hash=*/paranoid_file_checks_); } - writable_file->SetIOPriority(Env::IOPriority::IO_LOW); + writable_file->SetIOPriority(GetRateLimiterPriority()); writable_file->SetWriteLifeTimeHint(write_hint_); FileTypeSet tmp_set = db_options_.checksum_handoff_file_types; writable_file->SetPreallocationBlockSize(static_cast( @@ -2476,6 +2476,19 @@ std::string CompactionJob::GetTableFileName(uint64_t file_number) { file_number, compact_->compaction->output_path_id()); } +Env::IOPriority CompactionJob::GetRateLimiterPriority() { + if (versions_ && versions_->GetColumnFamilySet() && + versions_->GetColumnFamilySet()->write_controller()) { + WriteController* write_controller = + versions_->GetColumnFamilySet()->write_controller(); + if (write_controller->NeedsDelay() || write_controller->IsStopped()) { + return Env::IO_USER; + } + } + + return Env::IO_LOW; +} + #ifndef ROCKSDB_LITE std::string CompactionServiceCompactionJob::GetTableFileName( uint64_t file_number) { diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 24a77c679..7a804c74a 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -137,6 +137,8 @@ class CompactionJob { IOStatus io_status_; private: + friend class CompactionJobTestBase; + // Generates a histogram representing potential divisions of key ranges from // the input. It adds the starting and/or ending keys of certain input files // to the working set and then finds the approximate size of data in between @@ -234,6 +236,10 @@ class CompactionJob { // Get table file name in where it's outputting to, which should also be in // `output_directory_`. 
virtual std::string GetTableFileName(uint64_t file_number); + // The rate limiter priority (io_priority) is determined dynamically here. + // The Compaction Read and Write priorities are the same for different + // scenarios, such as write stalled. + Env::IOPriority GetRateLimiterPriority(); }; // CompactionServiceInput is used the pass compaction information between two diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index f9a2a2855..09079a74c 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -321,7 +321,8 @@ class CompactionJobTestBase : public testing::Test { const std::vector& snapshots = {}, SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber, int output_level = 1, bool verify = true, - uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber) { + uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber, + bool check_get_priority = false) { auto cfd = versions_->GetColumnFamilySet()->GetDefault(); size_t num_input_files = 0; @@ -390,6 +391,32 @@ class CompactionJobTestBase : public testing::Test { expected_oldest_blob_file_number); } } + + if (check_get_priority) { + CheckGetRateLimiterPriority(compaction_job); + } + } + + void CheckGetRateLimiterPriority(CompactionJob& compaction_job) { + // When the state from WriteController is normal. + ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_LOW); + + WriteController* write_controller = + compaction_job.versions_->GetColumnFamilySet()->write_controller(); + + { + // When the state from WriteController is Delayed. + std::unique_ptr delay_token = + write_controller->GetDelayToken(1000000); + ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER); + } + + { + // When the state from WriteController is Stopped. 
+ std::unique_ptr stop_token = + write_controller->GetStopToken(); + ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER); + } } std::shared_ptr env_guard_; @@ -1303,6 +1330,17 @@ TEST_F(CompactionJobTest, ResultSerialization) { } } +TEST_F(CompactionJobTest, GetRateLimiterPriority) { + NewDB(); + + auto expected_results = CreateTwoFiles(false); + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + auto files = cfd->current()->storage_info()->LevelFiles(0); + ASSERT_EQ(2U, files.size()); + RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, true, + kInvalidBlobFileNumber, true); +} + class CompactionJobTimestampTest : public CompactionJobTestBase { public: CompactionJobTimestampTest() diff --git a/db/flush_job.cc b/db/flush_job.cc index adb98fed5..65f9e2b0e 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -810,6 +810,7 @@ Status FlushJob::WriteLevel0Table() { { auto write_hint = cfd_->CalculateSSTWriteHint(0); + Env::IOPriority io_priority = GetRateLimiterPriorityForWrite(); db_mutex_->Unlock(); if (log_buffer_) { log_buffer_->FlushBufferToLog(); @@ -925,7 +926,7 @@ Status FlushJob::WriteLevel0Table() { snapshot_checker_, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), &io_s, io_tracer_, BlobFileCreationReason::kFlush, event_logger_, job_context_->job_id, - Env::IO_HIGH, &table_properties_, write_hint, full_history_ts_low, + io_priority, &table_properties_, write_hint, full_history_ts_low, blob_callback_, &num_input_entries, &memtable_payload_bytes, &memtable_garbage_bytes); // TODO: Cleanup io_status in BuildTable and table builders @@ -1032,6 +1033,19 @@ Status FlushJob::WriteLevel0Table() { return s; } +Env::IOPriority FlushJob::GetRateLimiterPriorityForWrite() { + if (versions_ && versions_->GetColumnFamilySet() && + versions_->GetColumnFamilySet()->write_controller()) { + WriteController* write_controller = + versions_->GetColumnFamilySet()->write_controller(); + if (write_controller->IsStopped() || 
write_controller->NeedsDelay()) { + return Env::IO_USER; + } + } + + return Env::IO_HIGH; +} + #ifndef ROCKSDB_LITE std::unique_ptr FlushJob::GetFlushJobInfo() const { db_mutex_->AssertHeld(); @@ -1064,7 +1078,6 @@ std::unique_ptr FlushJob::GetFlushJobInfo() const { } return info; } - #endif // !ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/db/flush_job.h b/db/flush_job.h index 76d5e34b6..7cfad1398 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -94,6 +94,8 @@ class FlushJob { #endif // !ROCKSDB_LITE private: + friend class FlushJobTest_GetRateLimiterPriorityForWrite_Test; + void ReportStartedFlush(); void ReportFlushInputSize(const autovector& mems); void RecordFlushIOStats(); @@ -121,6 +123,8 @@ class FlushJob { // process has not matured yet. Status MemPurge(); bool MemPurgeDecider(); + // The rate limiter priority (io_priority) is determined dynamically here. + Env::IOPriority GetRateLimiterPriorityForWrite(); #ifndef ROCKSDB_LITE std::unique_ptr GetFlushJobInfo() const; #endif // !ROCKSDB_LITE diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index e326db9e0..0f9c2a928 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -528,6 +528,72 @@ TEST_F(FlushJobTest, Snapshots) { job_context.Clean(); } +TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) { + // Prepare a FlushJob that flush MemTables of Single Column Family. 
+ const size_t num_mems = 2; + const size_t num_mems_to_flush = 1; + const size_t num_keys_per_table = 100; + JobContext job_context(0); + ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); + std::vector memtable_ids; + std::vector new_mems; + for (size_t i = 0; i != num_mems; ++i) { + MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), + kMaxSequenceNumber); + mem->SetID(i); + mem->Ref(); + new_mems.emplace_back(mem); + memtable_ids.push_back(mem->GetID()); + + for (size_t j = 0; j < num_keys_per_table; ++j) { + std::string key(std::to_string(j + i * num_keys_per_table)); + std::string value("value" + key); + ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, + key, value, nullptr /* kv_prot_info */)); + } + } + + autovector to_delete; + for (auto mem : new_mems) { + cfd->imm()->Add(mem, &to_delete); + } + + EventLogger event_logger(db_options_.info_log.get()); + SnapshotChecker* snapshot_checker = nullptr; // not relevant + + assert(memtable_ids.size() == num_mems); + uint64_t smallest_memtable_id = memtable_ids.front(); + uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1; + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_, + versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/); + + // When the state from WriteController is normal. + ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_HIGH); + + WriteController* write_controller = + flush_job.versions_->GetColumnFamilySet()->write_controller(); + + { + // When the state from WriteController is Delayed. 
+ std::unique_ptr delay_token = + write_controller->GetDelayToken(1000000); + ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_USER); + } + + { + // When the state from WriteController is Stopped. + std::unique_ptr stop_token = + write_controller->GetStopToken(); + ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_USER); + } +} + class FlushJobTimestampTest : public FlushJobTestBase { public: FlushJobTimestampTest() diff --git a/db/write_controller.h b/db/write_controller.h index 88bd1417f..c32b70b94 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -52,7 +52,7 @@ class WriteController { bool IsStopped() const; bool NeedsDelay() const { return total_delayed_.load() > 0; } bool NeedSpeedupCompaction() const { - return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0; + return IsStopped() || NeedsDelay() || total_compaction_pressure_.load() > 0; } // return how many microseconds the caller needs to sleep after the call // num_bytes: how many number of bytes to put into the DB. 
diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 84be9b689..87c08fed4 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -54,10 +54,14 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, UpdateFileChecksum(data); { + IOOptions io_options; + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority, + io_options.rate_limiter_priority); IOSTATS_TIMER_GUARD(prepare_write_nanos); TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite"); writable_file_->PrepareWrite(static_cast(GetFileSize()), left, - IOOptions(), nullptr); + io_options, nullptr); } // See whether we need to enlarge the buffer to avoid the flush @@ -211,6 +215,8 @@ IOStatus WritableFileWriter::Close() { s = Flush(); // flush cache to OS IOStatus interim; + IOOptions io_options; + io_options.rate_limiter_priority = writable_file_->GetIOPriority(); // In direct I/O mode we write whole pages so // we need to let the file know where data ends. 
if (use_direct_io()) { @@ -221,7 +227,7 @@ IOStatus WritableFileWriter::Close() { start_ts = FileOperationInfo::StartNow(); } #endif - interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr); + interim = writable_file_->Truncate(filesize_, io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); @@ -241,7 +247,7 @@ IOStatus WritableFileWriter::Close() { start_ts = FileOperationInfo::StartNow(); } #endif - interim = writable_file_->Fsync(IOOptions(), nullptr); + interim = writable_file_->Fsync(io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); @@ -267,7 +273,7 @@ IOStatus WritableFileWriter::Close() { start_ts = FileOperationInfo::StartNow(); } #endif - interim = writable_file_->Close(IOOptions(), nullptr); + interim = writable_file_->Close(io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); @@ -331,7 +337,11 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { start_ts = FileOperationInfo::StartNow(); } #endif - s = writable_file_->Flush(IOOptions(), nullptr); + IOOptions io_options; + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority, + io_options.rate_limiter_priority); + s = writable_file_->Flush(io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); @@ -428,17 +438,22 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { IOSTATS_TIMER_GUARD(fsync_nanos); TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); auto prev_perf_level = GetPerfLevel(); + IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_); + #ifndef ROCKSDB_LITE FileOperationInfo::StartTimePoint start_ts; if (ShouldNotifyListeners()) { start_ts = FileOperationInfo::StartNow(); } #endif + + IOOptions io_options; + 
io_options.rate_limiter_priority = writable_file_->GetIOPriority(); if (use_fsync) { - s = writable_file_->Fsync(IOOptions(), nullptr); + s = writable_file_->Fsync(io_options, nullptr); } else { - s = writable_file_->Sync(IOOptions(), nullptr); + s = writable_file_->Sync(io_options, nullptr); } #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { @@ -466,7 +481,9 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { start_ts = FileOperationInfo::StartNow(); } #endif - IOStatus s = writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr); + IOOptions io_options; + io_options.rate_limiter_priority = writable_file_->GetIOPriority(); + IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); @@ -490,19 +507,19 @@ IOStatus WritableFileWriter::WriteBuffered( size_t left = size; DataVerificationInfo v_info; char checksum_buf[sizeof(uint32_t)]; + IOOptions io_options; + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority, + io_options.rate_limiter_priority); while (left > 0) { - size_t allowed; - Env::IOPriority rate_limiter_priority_used = - WritableFileWriter::DecideRateLimiterPriority( - writable_file_->GetIOPriority(), op_rate_limiter_priority); + size_t allowed = left; if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { allowed = rate_limiter_->RequestToken(left, 0 /* alignment */, rate_limiter_priority_used, stats_, RateLimiter::OpType::kWrite); - } else { - allowed = left; } { @@ -511,7 +528,7 @@ IOStatus WritableFileWriter::WriteBuffered( #ifndef ROCKSDB_LITE FileOperationInfo::StartTimePoint start_ts; - uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr); + uint64_t old_size = writable_file_->GetFileSize(io_options, nullptr); if (ShouldNotifyListeners()) { start_ts = 
FileOperationInfo::StartNow(); old_size = next_write_offset_; @@ -524,10 +541,10 @@ IOStatus WritableFileWriter::WriteBuffered( if (perform_data_verification_) { Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); - s = writable_file_->Append(Slice(src, allowed), IOOptions(), v_info, + s = writable_file_->Append(Slice(src, allowed), io_options, v_info, nullptr); } else { - s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr); + s = writable_file_->Append(Slice(src, allowed), io_options, nullptr); } if (!s.ok()) { // If writable_file_->Append() failed, then the data may or may not @@ -579,15 +596,16 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( size_t left = size; DataVerificationInfo v_info; char checksum_buf[sizeof(uint32_t)]; - + IOOptions io_options; + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority, + io_options.rate_limiter_priority); // Check how much is allowed. Here, we loop until the rate limiter allows to // write the entire buffer. 
// TODO: need to be improved since it sort of defeats the purpose of the rate // limiter size_t data_size = left; - Env::IOPriority rate_limiter_priority_used = - WritableFileWriter::DecideRateLimiterPriority( - writable_file_->GetIOPriority(), op_rate_limiter_priority); if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { while (data_size > 0) { size_t tmp_size; @@ -604,7 +622,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( #ifndef ROCKSDB_LITE FileOperationInfo::StartTimePoint start_ts; - uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr); + uint64_t old_size = writable_file_->GetFileSize(io_options, nullptr); if (ShouldNotifyListeners()) { start_ts = FileOperationInfo::StartNow(); old_size = next_write_offset_; @@ -617,8 +635,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); - s = writable_file_->Append(Slice(src, left), IOOptions(), v_info, - nullptr); + s = writable_file_->Append(Slice(src, left), io_options, v_info, nullptr); SetPerfLevel(prev_perf_level); } #ifndef ROCKSDB_LITE @@ -709,20 +726,20 @@ IOStatus WritableFileWriter::WriteDirect( size_t left = buf_.CurrentSize(); DataVerificationInfo v_info; char checksum_buf[sizeof(uint32_t)]; + IOOptions io_options; + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority, + io_options.rate_limiter_priority); while (left > 0) { // Check how much is allowed - size_t size; - Env::IOPriority rate_limiter_priority_used = - WritableFileWriter::DecideRateLimiterPriority( - writable_file_->GetIOPriority(), op_rate_limiter_priority); + size_t size = left; if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { size = rate_limiter_->RequestToken(left, buf_.Alignment(), - writable_file_->GetIOPriority(), - stats_, 
RateLimiter::OpType::kWrite); - } else { - size = left; + rate_limiter_priority_used, stats_, + RateLimiter::OpType::kWrite); } { @@ -737,10 +754,10 @@ IOStatus WritableFileWriter::WriteDirect( Crc32cHandoffChecksumCalculation(src, size, checksum_buf); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); s = writable_file_->PositionedAppend(Slice(src, size), write_offset, - IOOptions(), v_info, nullptr); + io_options, v_info, nullptr); } else { s = writable_file_->PositionedAppend(Slice(src, size), write_offset, - IOOptions(), nullptr); + io_options, nullptr); } if (ShouldNotifyListeners()) { @@ -810,20 +827,22 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( DataVerificationInfo v_info; char checksum_buf[sizeof(uint32_t)]; + IOOptions io_options; + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority, + io_options.rate_limiter_priority); // Check how much is allowed. Here, we loop until the rate limiter allows to // write the entire buffer. 
// TODO: need to be improved since it sort of defeats the purpose of the rate // limiter size_t data_size = left; - Env::IOPriority rate_limiter_priority_used = - WritableFileWriter::DecideRateLimiterPriority( - writable_file_->GetIOPriority(), op_rate_limiter_priority); if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { while (data_size > 0) { size_t size; size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), - writable_file_->GetIOPriority(), - stats_, RateLimiter::OpType::kWrite); + rate_limiter_priority_used, stats_, + RateLimiter::OpType::kWrite); data_size -= size; } } @@ -839,7 +858,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); s = writable_file_->PositionedAppend(Slice(src, left), write_offset, - IOOptions(), v_info, nullptr); + io_options, v_info, nullptr); if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); @@ -882,16 +901,21 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( #endif // !ROCKSDB_LITE Env::IOPriority WritableFileWriter::DecideRateLimiterPriority( Env::IOPriority writable_file_io_priority, - Env::IOPriority op_rate_limiter_priority) { + Env::IOPriority op_rate_limiter_priority, + Env::IOPriority& iooptions_io_priority) { + Env::IOPriority rate_limiter_priority{Env::IO_TOTAL}; if (writable_file_io_priority == Env::IO_TOTAL && op_rate_limiter_priority == Env::IO_TOTAL) { - return Env::IO_TOTAL; + rate_limiter_priority = Env::IO_TOTAL; } else if (writable_file_io_priority == Env::IO_TOTAL) { - return op_rate_limiter_priority; + rate_limiter_priority = op_rate_limiter_priority; } else if (op_rate_limiter_priority == Env::IO_TOTAL) { - return writable_file_io_priority; + rate_limiter_priority = writable_file_io_priority; } else { - return op_rate_limiter_priority; + rate_limiter_priority = op_rate_limiter_priority; } + iooptions_io_priority = 
rate_limiter_priority; + return rate_limiter_priority; } + } // namespace ROCKSDB_NAMESPACE diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index f9f6e5bd0..a1391f976 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -277,9 +277,11 @@ class WritableFileWriter { const char* GetFileChecksumFuncName() const; private: + // Decide the rate limiter priority and write it to iooptions_io_priority. static Env::IOPriority DecideRateLimiterPriority( Env::IOPriority writable_file_io_priority, - Env::IOPriority op_rate_limiter_priority); + Env::IOPriority op_rate_limiter_priority, + Env::IOPriority& iooptions_io_priority); // Used when os buffering is OFF and we are writing // DMA such as in Direct I/O mode diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index a0da9f8c4..5e222c5ed 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -901,6 +901,154 @@ TEST_F(DBWritableFileWriterTest, IOErrorNotification) { fwf->CheckCounters(1, 1); } #endif // ROCKSDB_LITE + +class WritableFileWriterIOPriorityTest : public testing::Test { + protected: + // This test is to check whether the rate limiter priority can be passed + // correctly from WritableFileWriter functions to FSWritableFile functions. + + void SetUp() override { + // When op_rate_limiter_priority parameter in WritableFileWriter functions + // is the default (Env::IO_TOTAL). 
+ std::unique_ptr wf{new FakeWF(Env::IO_HIGH)}; + FileOptions file_options; + writer_.reset(new WritableFileWriter(std::move(wf), "" /* don't care */, + file_options)); + } + + class FakeWF : public FSWritableFile { + public: + explicit FakeWF(Env::IOPriority io_priority) { SetIOPriority(io_priority); } + ~FakeWF() override {} + + IOStatus Append(const Slice& /*data*/, const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Append(const Slice& data, const IOOptions& options, + const DataVerificationInfo& /* verification_info */, + IODebugContext* dbg) override { + return Append(data, options, dbg); + } + IOStatus PositionedAppend(const Slice& /*data*/, uint64_t /*offset*/, + const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus PositionedAppend( + const Slice& /* data */, uint64_t /* offset */, + const IOOptions& options, + const DataVerificationInfo& /* verification_info */, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Truncate(uint64_t /*size*/, const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& options, IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& options, IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& options, IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Fsync(const IOOptions& options, IODebugContext* /*dbg*/) override { + 
EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + uint64_t GetFileSize(const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return 0; + } + void GetPreallocationStatus(size_t* /*block_size*/, + size_t* /*last_allocated_block*/) override {} + size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override { + return 0; + } + IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override { + return IOStatus::OK(); + } + + IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/, + const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/, + const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + + void PrepareWrite(size_t /*offset*/, size_t /*len*/, + const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + } + + bool IsSyncThreadSafe() const override { return true; } + }; + + std::unique_ptr writer_; +}; + +TEST_F(WritableFileWriterIOPriorityTest, Append) { + ASSERT_OK(writer_->Append(Slice("abc"))); +} + +TEST_F(WritableFileWriterIOPriorityTest, Pad) { ASSERT_OK(writer_->Pad(500)); } + +TEST_F(WritableFileWriterIOPriorityTest, Flush) { ASSERT_OK(writer_->Flush()); } + +TEST_F(WritableFileWriterIOPriorityTest, Close) { ASSERT_OK(writer_->Close()); } + +TEST_F(WritableFileWriterIOPriorityTest, Sync) { + ASSERT_OK(writer_->Sync(false)); + ASSERT_OK(writer_->Sync(true)); +} + +TEST_F(WritableFileWriterIOPriorityTest, SyncWithoutFlush) { + ASSERT_OK(writer_->SyncWithoutFlush(false)); + ASSERT_OK(writer_->SyncWithoutFlush(true)); +} + +TEST_F(WritableFileWriterIOPriorityTest, BasicOp) { + EnvOptions env_options; + 
env_options.bytes_per_sync = kMb; + std::unique_ptr wf(new FakeWF(Env::IO_HIGH)); + std::unique_ptr writer( + new WritableFileWriter(std::move(wf), "" /* don't care */, env_options)); + Random r(301); + Status s; + std::unique_ptr large_buf(new char[10 * kMb]); + for (int i = 0; i < 1000; i++) { + int skew_limit = (i < 700) ? 10 : 15; + uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100); + s = writer->Append(Slice(large_buf.get(), num)); + ASSERT_OK(s); + + // Flush in a chance of 1/10. + if (r.Uniform(10) == 0) { + s = writer->Flush(); + ASSERT_OK(s); + } + } + s = writer->Close(); + ASSERT_OK(s); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {