Set Write rate limiter priority dynamically and pass it to FS (#9988)

Summary:
### Context:
Background compactions and flushes generate large reads and writes and can be long-running, especially for universal compaction. In some cases, this can impact foreground reads and writes by users.

From the RocksDB perspective, there can be two kinds of rate limiters, the internal (native) one and the external one.
- The internal (native) rate limiter is described in [the wiki](https://github.com/facebook/rocksdb/wiki/Rate-Limiter). Currently, only IO_LOW and IO_HIGH are used, and they are set statically (a usage sketch follows this list).
- For the external rate limiter, the IOOptions argument of the FSWritableFile functions lets end users set and get rate_limiter_priority for their own rate limiter. Currently, RocksDB doesn't pass the rate_limiter_priority through IOOptions to the file system.
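
As background, here is a minimal usage sketch (not part of this PR) of the internal rate limiter from the wiki above. `ConfigureInternalRateLimiter` is only an illustrative name; `Options::rate_limiter` and `NewGenericRateLimiter()` are the existing RocksDB APIs.

```cpp
// Attach a shared RateLimiter to the DB options; background flush/compaction
// I/O then requests tokens from it at the priorities discussed in this PR.
#include "rocksdb/options.h"
#include "rocksdb/rate_limiter.h"

void ConfigureInternalRateLimiter(ROCKSDB_NAMESPACE::Options* options) {
  // 64 MB/s budget shared by background writes; the other parameters keep
  // their defaults (refill period, fairness, kWritesOnly mode).
  options->rate_limiter.reset(
      ROCKSDB_NAMESPACE::NewGenericRateLimiter(64 << 20));
}
```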

### Solution
During user reads, flush writes, and compaction reads/writes, the WriteController is used to determine whether DB writes are stalled or slowed down. The rate limiter priority (Env::IOPriority) can be determined accordingly. We decided to always pass the priority in IOOptions; what the file system does with it should be a contract between the user and the file system. We would like to set the rate limiter priority at the file level, since the flush/compaction job level may be too coarse (a job spans multiple files) and the block IO level is too granular.
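
As an illustration of that contract, a minimal sketch (not part of this PR) of a file system that consumes the priority it receives. `PrioritizedWritableFile` and `ApplyQoS()` are hypothetical; only `FSWritableFileWrapper`, `IOOptions::rate_limiter_priority`, and `Env::IOPriority` are existing RocksDB types.

```cpp
// Hypothetical file-system side of the contract: RocksDB fills
// io_options.rate_limiter_priority, and the FileSystem decides what to do
// with it. ApplyQoS() is a placeholder for a user-defined policy.
#include "rocksdb/file_system.h"

namespace example {

class PrioritizedWritableFile
    : public ROCKSDB_NAMESPACE::FSWritableFileWrapper {
 public:
  explicit PrioritizedWritableFile(ROCKSDB_NAMESPACE::FSWritableFile* target)
      : FSWritableFileWrapper(target) {}

  using FSWritableFileWrapper::Append;

  ROCKSDB_NAMESPACE::IOStatus Append(
      const ROCKSDB_NAMESPACE::Slice& data,
      const ROCKSDB_NAMESPACE::IOOptions& options,
      ROCKSDB_NAMESPACE::IODebugContext* dbg) override {
    // With this PR, options.rate_limiter_priority carries IO_LOW / IO_HIGH /
    // IO_USER, chosen dynamically by RocksDB for flush and compaction writes.
    ApplyQoS(options.rate_limiter_priority);
    return target()->Append(data, options, dbg);
  }

 private:
  void ApplyQoS(ROCKSDB_NAMESPACE::Env::IOPriority /*pri*/) {
    // Placeholder: map the priority to the storage backend's own QoS classes.
  }
};

}  // namespace example
```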

**This PR is for the Write path.** The dynamic **Write** priorities for the different WriteController states are listed below (a minimal sketch of this mapping follows the table):

| State | Normal | Delayed | Stalled |
| ----- | ------ | ------- | ------- |
|  Flush | IO_HIGH | IO_USER | IO_USER |
|  Compaction | IO_LOW | IO_USER | IO_USER |
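
A minimal sketch of the table above. The real logic lives in the per-job methods shown in the diff below (`FlushJob::GetRateLimiterPriorityForWrite()` and `CompactionJob::GetRateLimiterPriority()`); `PickWritePriority` is only an illustrative name, not an API.

```cpp
// Sketch of the state -> priority mapping used for flush and compaction writes.
#include "db/write_controller.h"
#include "rocksdb/env.h"

namespace ROCKSDB_NAMESPACE {

Env::IOPriority PickWritePriority(const WriteController& wc, bool is_flush) {
  if (wc.IsStopped() || wc.NeedsDelay()) {
    return Env::IO_USER;  // Delayed or Stalled state
  }
  return is_flush ? Env::IO_HIGH : Env::IO_LOW;  // Normal state
}

}  // namespace ROCKSDB_NAMESPACE
```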

Flush and compaction writes share the same call path through BlockBasedTableBuilder, WritableFileWriter, and FSWritableFile. When a new FSWritableFile object is created, its io_priority_ is set dynamically based on the state of the WriteController. In WritableFileWriter, before each call site of an FSWritableFile function, WritableFileWriter::DecideRateLimiterPriority() determines the rate_limiter_priority, and the IOOptions argument passed to the FSWritableFile function is updated with it.
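
For reference, the precedence applied by `WritableFileWriter::DecideRateLimiterPriority()` (see the `file/writable_file_writer.cc` hunks below) reduces to the following sketch; `EffectivePriority` is an illustrative stand-in, not a RocksDB API.

```cpp
// A per-operation priority, when set, overrides the file-level priority;
// Env::IO_TOTAL means "not set", and if both are unset the write bypasses
// the rate limiter entirely.
#include "rocksdb/env.h"

ROCKSDB_NAMESPACE::Env::IOPriority EffectivePriority(
    ROCKSDB_NAMESPACE::Env::IOPriority file_pri,
    ROCKSDB_NAMESPACE::Env::IOPriority op_pri) {
  if (op_pri != ROCKSDB_NAMESPACE::Env::IO_TOTAL) {
    return op_pri;  // per-operation override wins
  }
  return file_pri;  // file-level priority, possibly IO_TOTAL (no limiting)
}
```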

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9988

Test Plan: Add unit tests.

Reviewed By: anand1976

Differential Revision: D36395159

Pulled By: gitbw95

fbshipit-source-id: a7c82fc29759139a1a07ec46c37dbf7e753474cf
Branch: main
Author: gitbw95 (committed by Facebook GitHub Bot)
Parent: b84e3363f5
Commit: 05c678e135
Changed files (lines changed):
1. HISTORY.md (2)
2. db/compaction/compaction_job.cc (15)
3. db/compaction/compaction_job.h (6)
4. db/compaction/compaction_job_test.cc (40)
5. db/flush_job.cc (17)
6. db/flush_job.h (4)
7. db/flush_job_test.cc (66)
8. db/write_controller.h (2)
9. file/writable_file_writer.cc (114)
10. file/writable_file_writer.h (4)
11. util/file_reader_writer_test.cc (148)

HISTORY.md
@@ -5,6 +5,7 @@
 * Fixed a bug where RocksDB could corrupt DBs with `avoid_flush_during_recovery == true` by removing valid WALs, leading to `Status::Corruption` with message like "SST file is ahead of WALs" when attempting to reopen.
 * Fixed a bug in async_io path where incorrect length of data is read by FilePrefetchBuffer if data is consumed from two populated buffers and request for more data is sent.
 * Fixed a CompactionFilter bug. Compaction filter used to use `Delete` to remove keys, even if the keys should be removed with `SingleDelete`. Mixing `Delete` and `SingleDelete` may cause undefined behavior.
+* Fixed a bug in `WritableFileWriter::WriteDirect` and `WritableFileWriter::WriteDirectWithChecksum`. The rate_limiter_priority specified in ReadOptions was not passed to the RateLimiter when requesting a token.
 * Fixed a bug which might cause process crash when I/O error happens when reading an index block in MultiGet().
 ### New Features
@@ -29,6 +30,7 @@
 ### Behavior changes
 * Enforce the existing contract of SingleDelete so that SingleDelete cannot be mixed with Delete because it leads to undefined behavior. Fix a number of unit tests that violate the contract but happen to pass.
 * ldb `--try_load_options` default to true if `--db` is specified and not creating a new DB, the user can still explicitly disable that by `--try_load_options=false` (or explicitly enable that by `--try_load_options`).
+* During Flush write or Compaction write, the WriteController is used to determine whether DB writes are stalled or slowed down. The priority (Env::IOPriority) can then be determined accordingly and be passed in IOOptions to the file system.
 ## 7.2.0 (04/15/2022)
 ### Bug Fixes

db/compaction/compaction_job.cc
@@ -2285,7 +2285,7 @@ Status CompactionJob::OpenCompactionOutputFile(
         /*enable_hash=*/paranoid_file_checks_);
   }
-  writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
+  writable_file->SetIOPriority(GetRateLimiterPriority());
   writable_file->SetWriteLifeTimeHint(write_hint_);
   FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
   writable_file->SetPreallocationBlockSize(static_cast<size_t>(
@@ -2476,6 +2476,19 @@ std::string CompactionJob::GetTableFileName(uint64_t file_number) {
                       file_number, compact_->compaction->output_path_id());
 }
 
+Env::IOPriority CompactionJob::GetRateLimiterPriority() {
+  if (versions_ && versions_->GetColumnFamilySet() &&
+      versions_->GetColumnFamilySet()->write_controller()) {
+    WriteController* write_controller =
+        versions_->GetColumnFamilySet()->write_controller();
+    if (write_controller->NeedsDelay() || write_controller->IsStopped()) {
+      return Env::IO_USER;
+    }
+  }
+  return Env::IO_LOW;
+}
+
 #ifndef ROCKSDB_LITE
 std::string CompactionServiceCompactionJob::GetTableFileName(
     uint64_t file_number) {

db/compaction/compaction_job.h
@@ -137,6 +137,8 @@ class CompactionJob {
   IOStatus io_status_;
 
  private:
+  friend class CompactionJobTestBase;
+
   // Generates a histogram representing potential divisions of key ranges from
   // the input. It adds the starting and/or ending keys of certain input files
   // to the working set and then finds the approximate size of data in between
@@ -234,6 +236,10 @@ class CompactionJob {
   // Get table file name in where it's outputting to, which should also be in
   // `output_directory_`.
   virtual std::string GetTableFileName(uint64_t file_number);
+  // The rate limiter priority (io_priority) is determined dynamically here.
+  // The Compaction Read and Write priorities are the same for different
+  // scenarios, such as write stalled.
+  Env::IOPriority GetRateLimiterPriority();
 };
 // CompactionServiceInput is used the pass compaction information between two

db/compaction/compaction_job_test.cc
@@ -321,7 +321,8 @@ class CompactionJobTestBase : public testing::Test {
       const std::vector<SequenceNumber>& snapshots = {},
       SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
       int output_level = 1, bool verify = true,
-      uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber) {
+      uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber,
+      bool check_get_priority = false) {
     auto cfd = versions_->GetColumnFamilySet()->GetDefault();
     size_t num_input_files = 0;
@@ -390,6 +391,32 @@ class CompactionJobTestBase : public testing::Test {
                   expected_oldest_blob_file_number);
       }
     }
+
+    if (check_get_priority) {
+      CheckGetRateLimiterPriority(compaction_job);
+    }
+  }
+
+  void CheckGetRateLimiterPriority(CompactionJob& compaction_job) {
+    // When the state from WriteController is normal.
+    ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_LOW);
+    WriteController* write_controller =
+        compaction_job.versions_->GetColumnFamilySet()->write_controller();
+    {
+      // When the state from WriteController is Delayed.
+      std::unique_ptr<WriteControllerToken> delay_token =
+          write_controller->GetDelayToken(1000000);
+      ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER);
+    }
+    {
+      // When the state from WriteController is Stopped.
+      std::unique_ptr<WriteControllerToken> stop_token =
+          write_controller->GetStopToken();
+      ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER);
+    }
   }
 
   std::shared_ptr<Env> env_guard_;
@@ -1303,6 +1330,17 @@ TEST_F(CompactionJobTest, ResultSerialization) {
   }
 }
 
+TEST_F(CompactionJobTest, GetRateLimiterPriority) {
+  NewDB();
+  auto expected_results = CreateTwoFiles(false);
+  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+  auto files = cfd->current()->storage_info()->LevelFiles(0);
+  ASSERT_EQ(2U, files.size());
+  RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, true,
+                kInvalidBlobFileNumber, true);
+}
+
 class CompactionJobTimestampTest : public CompactionJobTestBase {
  public:
   CompactionJobTimestampTest()

db/flush_job.cc
@@ -810,6 +810,7 @@ Status FlushJob::WriteLevel0Table() {
   {
     auto write_hint = cfd_->CalculateSSTWriteHint(0);
+    Env::IOPriority io_priority = GetRateLimiterPriorityForWrite();
     db_mutex_->Unlock();
     if (log_buffer_) {
       log_buffer_->FlushBufferToLog();
@@ -925,7 +926,7 @@ Status FlushJob::WriteLevel0Table() {
         snapshot_checker_, mutable_cf_options_.paranoid_file_checks,
         cfd_->internal_stats(), &io_s, io_tracer_,
         BlobFileCreationReason::kFlush, event_logger_, job_context_->job_id,
-        Env::IO_HIGH, &table_properties_, write_hint, full_history_ts_low,
+        io_priority, &table_properties_, write_hint, full_history_ts_low,
         blob_callback_, &num_input_entries, &memtable_payload_bytes,
         &memtable_garbage_bytes);
     // TODO: Cleanup io_status in BuildTable and table builders
@@ -1032,6 +1033,19 @@ Status FlushJob::WriteLevel0Table() {
   return s;
 }
 
+Env::IOPriority FlushJob::GetRateLimiterPriorityForWrite() {
+  if (versions_ && versions_->GetColumnFamilySet() &&
+      versions_->GetColumnFamilySet()->write_controller()) {
+    WriteController* write_controller =
+        versions_->GetColumnFamilySet()->write_controller();
+    if (write_controller->IsStopped() || write_controller->NeedsDelay()) {
+      return Env::IO_USER;
+    }
+  }
+  return Env::IO_HIGH;
+}
+
 #ifndef ROCKSDB_LITE
 std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
   db_mutex_->AssertHeld();
@@ -1064,7 +1078,6 @@ std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
   }
   return info;
 }
 #endif // !ROCKSDB_LITE
 } // namespace ROCKSDB_NAMESPACE

db/flush_job.h
@@ -94,6 +94,8 @@ class FlushJob {
 #endif // !ROCKSDB_LITE
 
  private:
+  friend class FlushJobTest_GetRateLimiterPriorityForWrite_Test;
+
   void ReportStartedFlush();
   void ReportFlushInputSize(const autovector<MemTable*>& mems);
   void RecordFlushIOStats();
@@ -121,6 +123,8 @@ class FlushJob {
   // process has not matured yet.
   Status MemPurge();
   bool MemPurgeDecider();
+  // The rate limiter priority (io_priority) is determined dynamically here.
+  Env::IOPriority GetRateLimiterPriorityForWrite();
 #ifndef ROCKSDB_LITE
   std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
 #endif // !ROCKSDB_LITE

db/flush_job_test.cc
@@ -528,6 +528,72 @@ TEST_F(FlushJobTest, Snapshots) {
   job_context.Clean();
 }
 
+TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) {
+  // Prepare a FlushJob that flush MemTables of Single Column Family.
+  const size_t num_mems = 2;
+  const size_t num_mems_to_flush = 1;
+  const size_t num_keys_per_table = 100;
+  JobContext job_context(0);
+  ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
+  std::vector<uint64_t> memtable_ids;
+  std::vector<MemTable*> new_mems;
+  for (size_t i = 0; i != num_mems; ++i) {
+    MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
+                                              kMaxSequenceNumber);
+    mem->SetID(i);
+    mem->Ref();
+    new_mems.emplace_back(mem);
+    memtable_ids.push_back(mem->GetID());
+
+    for (size_t j = 0; j < num_keys_per_table; ++j) {
+      std::string key(std::to_string(j + i * num_keys_per_table));
+      std::string value("value" + key);
+      ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue,
+                         key, value, nullptr /* kv_prot_info */));
+    }
+  }
+
+  autovector<MemTable*> to_delete;
+  for (auto mem : new_mems) {
+    cfd->imm()->Add(mem, &to_delete);
+  }
+
+  EventLogger event_logger(db_options_.info_log.get());
+  SnapshotChecker* snapshot_checker = nullptr;  // not relavant
+
+  assert(memtable_ids.size() == num_mems);
+  uint64_t smallest_memtable_id = memtable_ids.front();
+  uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
+  FlushJob flush_job(
+      dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
+      *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_,
+      versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
+      snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
+      db_options_.statistics.get(), &event_logger, true,
+      true /* sync_output_directory */, true /* write_manifest */,
+      Env::Priority::USER, nullptr /*IOTracer*/);
+
+  // When the state from WriteController is normal.
+  ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_HIGH);
+
+  WriteController* write_controller =
+      flush_job.versions_->GetColumnFamilySet()->write_controller();
+  {
+    // When the state from WriteController is Delayed.
+    std::unique_ptr<WriteControllerToken> delay_token =
+        write_controller->GetDelayToken(1000000);
+    ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_USER);
+  }
+  {
+    // When the state from WriteController is Stopped.
+    std::unique_ptr<WriteControllerToken> stop_token =
+        write_controller->GetStopToken();
+    ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_USER);
+  }
+}
+
 class FlushJobTimestampTest : public FlushJobTestBase {
  public:
   FlushJobTimestampTest()

db/write_controller.h
@@ -52,7 +52,7 @@ class WriteController {
   bool IsStopped() const;
   bool NeedsDelay() const { return total_delayed_.load() > 0; }
   bool NeedSpeedupCompaction() const {
-    return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0;
+    return IsStopped() || NeedsDelay() || total_compaction_pressure_.load() > 0;
   }
   // return how many microseconds the caller needs to sleep after the call
   // num_bytes: how many number of bytes to put into the DB.

file/writable_file_writer.cc
@@ -54,10 +54,14 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
   UpdateFileChecksum(data);
 
   {
+    IOOptions io_options;
+    WritableFileWriter::DecideRateLimiterPriority(
+        writable_file_->GetIOPriority(), op_rate_limiter_priority,
+        io_options.rate_limiter_priority);
     IOSTATS_TIMER_GUARD(prepare_write_nanos);
     TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
     writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
-                                 IOOptions(), nullptr);
+                                 io_options, nullptr);
   }
 
   // See whether we need to enlarge the buffer to avoid the flush
@@ -211,6 +215,8 @@ IOStatus WritableFileWriter::Close() {
   s = Flush();  // flush cache to OS
 
   IOStatus interim;
+  IOOptions io_options;
+  io_options.rate_limiter_priority = writable_file_->GetIOPriority();
   // In direct I/O mode we write whole pages so
   // we need to let the file know where data ends.
   if (use_direct_io()) {
@@ -221,7 +227,7 @@ IOStatus WritableFileWriter::Close() {
       start_ts = FileOperationInfo::StartNow();
     }
 #endif
-    interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
+    interim = writable_file_->Truncate(filesize_, io_options, nullptr);
 #ifndef ROCKSDB_LITE
     if (ShouldNotifyListeners()) {
       auto finish_ts = FileOperationInfo::FinishNow();
@@ -241,7 +247,7 @@ IOStatus WritableFileWriter::Close() {
       start_ts = FileOperationInfo::StartNow();
     }
 #endif
-    interim = writable_file_->Fsync(IOOptions(), nullptr);
+    interim = writable_file_->Fsync(io_options, nullptr);
 #ifndef ROCKSDB_LITE
     if (ShouldNotifyListeners()) {
       auto finish_ts = FileOperationInfo::FinishNow();
@@ -267,7 +273,7 @@ IOStatus WritableFileWriter::Close() {
     start_ts = FileOperationInfo::StartNow();
   }
 #endif
-  interim = writable_file_->Close(IOOptions(), nullptr);
+  interim = writable_file_->Close(io_options, nullptr);
 #ifndef ROCKSDB_LITE
   if (ShouldNotifyListeners()) {
     auto finish_ts = FileOperationInfo::FinishNow();
@@ -331,7 +337,11 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
       start_ts = FileOperationInfo::StartNow();
     }
 #endif
-    s = writable_file_->Flush(IOOptions(), nullptr);
+    IOOptions io_options;
+    WritableFileWriter::DecideRateLimiterPriority(
+        writable_file_->GetIOPriority(), op_rate_limiter_priority,
+        io_options.rate_limiter_priority);
+    s = writable_file_->Flush(io_options, nullptr);
 #ifndef ROCKSDB_LITE
     if (ShouldNotifyListeners()) {
       auto finish_ts = std::chrono::steady_clock::now();
@@ -428,17 +438,22 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
   IOSTATS_TIMER_GUARD(fsync_nanos);
   TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
   auto prev_perf_level = GetPerfLevel();
 
   IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
 
 #ifndef ROCKSDB_LITE
   FileOperationInfo::StartTimePoint start_ts;
   if (ShouldNotifyListeners()) {
     start_ts = FileOperationInfo::StartNow();
   }
 #endif
+
+  IOOptions io_options;
+  io_options.rate_limiter_priority = writable_file_->GetIOPriority();
+
   if (use_fsync) {
-    s = writable_file_->Fsync(IOOptions(), nullptr);
+    s = writable_file_->Fsync(io_options, nullptr);
   } else {
-    s = writable_file_->Sync(IOOptions(), nullptr);
+    s = writable_file_->Sync(io_options, nullptr);
   }
 #ifndef ROCKSDB_LITE
   if (ShouldNotifyListeners()) {
@@ -466,7 +481,9 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
     start_ts = FileOperationInfo::StartNow();
   }
 #endif
-  IOStatus s = writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
+  IOOptions io_options;
+  io_options.rate_limiter_priority = writable_file_->GetIOPriority();
+  IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr);
 #ifndef ROCKSDB_LITE
   if (ShouldNotifyListeners()) {
     auto finish_ts = std::chrono::steady_clock::now();
@@ -490,19 +507,19 @@ IOStatus WritableFileWriter::WriteBuffered(
   size_t left = size;
   DataVerificationInfo v_info;
   char checksum_buf[sizeof(uint32_t)];
+  IOOptions io_options;
+  Env::IOPriority rate_limiter_priority_used =
+      WritableFileWriter::DecideRateLimiterPriority(
+          writable_file_->GetIOPriority(), op_rate_limiter_priority,
+          io_options.rate_limiter_priority);
   while (left > 0) {
-    size_t allowed;
-    Env::IOPriority rate_limiter_priority_used =
-        WritableFileWriter::DecideRateLimiterPriority(
-            writable_file_->GetIOPriority(), op_rate_limiter_priority);
+    size_t allowed = left;
     if (rate_limiter_ != nullptr &&
         rate_limiter_priority_used != Env::IO_TOTAL) {
       allowed = rate_limiter_->RequestToken(left, 0 /* alignment */,
                                             rate_limiter_priority_used, stats_,
                                             RateLimiter::OpType::kWrite);
-    } else {
-      allowed = left;
     }
 
     {
@@ -511,7 +528,7 @@ IOStatus WritableFileWriter::WriteBuffered(
 #ifndef ROCKSDB_LITE
       FileOperationInfo::StartTimePoint start_ts;
-      uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
+      uint64_t old_size = writable_file_->GetFileSize(io_options, nullptr);
       if (ShouldNotifyListeners()) {
         start_ts = FileOperationInfo::StartNow();
         old_size = next_write_offset_;
@@ -524,10 +541,10 @@ IOStatus WritableFileWriter::WriteBuffered(
       if (perform_data_verification_) {
         Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf);
         v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
-        s = writable_file_->Append(Slice(src, allowed), IOOptions(), v_info,
+        s = writable_file_->Append(Slice(src, allowed), io_options, v_info,
                                    nullptr);
       } else {
-        s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
+        s = writable_file_->Append(Slice(src, allowed), io_options, nullptr);
       }
       if (!s.ok()) {
         // If writable_file_->Append() failed, then the data may or may not
@@ -579,15 +596,16 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
   size_t left = size;
   DataVerificationInfo v_info;
   char checksum_buf[sizeof(uint32_t)];
+  IOOptions io_options;
+  Env::IOPriority rate_limiter_priority_used =
+      WritableFileWriter::DecideRateLimiterPriority(
+          writable_file_->GetIOPriority(), op_rate_limiter_priority,
+          io_options.rate_limiter_priority);
   // Check how much is allowed. Here, we loop until the rate limiter allows to
   // write the entire buffer.
   // TODO: need to be improved since it sort of defeats the purpose of the rate
   // limiter
   size_t data_size = left;
-  Env::IOPriority rate_limiter_priority_used =
-      WritableFileWriter::DecideRateLimiterPriority(
-          writable_file_->GetIOPriority(), op_rate_limiter_priority);
   if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
     while (data_size > 0) {
       size_t tmp_size;
@@ -604,7 +622,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
 #ifndef ROCKSDB_LITE
     FileOperationInfo::StartTimePoint start_ts;
-    uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
+    uint64_t old_size = writable_file_->GetFileSize(io_options, nullptr);
     if (ShouldNotifyListeners()) {
       start_ts = FileOperationInfo::StartNow();
       old_size = next_write_offset_;
@@ -617,8 +635,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
     EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
     v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
-    s = writable_file_->Append(Slice(src, left), IOOptions(), v_info,
-                               nullptr);
+    s = writable_file_->Append(Slice(src, left), io_options, v_info, nullptr);
     SetPerfLevel(prev_perf_level);
   }
 #ifndef ROCKSDB_LITE
@@ -709,20 +726,20 @@ IOStatus WritableFileWriter::WriteDirect(
     size_t left = buf_.CurrentSize();
     DataVerificationInfo v_info;
     char checksum_buf[sizeof(uint32_t)];
+    IOOptions io_options;
+    Env::IOPriority rate_limiter_priority_used =
+        WritableFileWriter::DecideRateLimiterPriority(
+            writable_file_->GetIOPriority(), op_rate_limiter_priority,
+            io_options.rate_limiter_priority);
     while (left > 0) {
       // Check how much is allowed
-      size_t size;
-      Env::IOPriority rate_limiter_priority_used =
-          WritableFileWriter::DecideRateLimiterPriority(
-              writable_file_->GetIOPriority(), op_rate_limiter_priority);
+      size_t size = left;
       if (rate_limiter_ != nullptr &&
           rate_limiter_priority_used != Env::IO_TOTAL) {
         size = rate_limiter_->RequestToken(left, buf_.Alignment(),
-                                           writable_file_->GetIOPriority(),
-                                           stats_, RateLimiter::OpType::kWrite);
+                                           rate_limiter_priority_used, stats_,
+                                           RateLimiter::OpType::kWrite);
-      } else {
-        size = left;
       }
 
       {
@@ -737,10 +754,10 @@ IOStatus WritableFileWriter::WriteDirect(
         Crc32cHandoffChecksumCalculation(src, size, checksum_buf);
         v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
         s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
-                                             IOOptions(), v_info, nullptr);
+                                             io_options, v_info, nullptr);
       } else {
         s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
-                                             IOOptions(), nullptr);
+                                             io_options, nullptr);
       }
 
       if (ShouldNotifyListeners()) {
@@ -810,20 +827,22 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
   DataVerificationInfo v_info;
   char checksum_buf[sizeof(uint32_t)];
+  IOOptions io_options;
+  Env::IOPriority rate_limiter_priority_used =
+      WritableFileWriter::DecideRateLimiterPriority(
+          writable_file_->GetIOPriority(), op_rate_limiter_priority,
+          io_options.rate_limiter_priority);
   // Check how much is allowed. Here, we loop until the rate limiter allows to
   // write the entire buffer.
   // TODO: need to be improved since it sort of defeats the purpose of the rate
   // limiter
   size_t data_size = left;
-  Env::IOPriority rate_limiter_priority_used =
-      WritableFileWriter::DecideRateLimiterPriority(
-          writable_file_->GetIOPriority(), op_rate_limiter_priority);
   if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
     while (data_size > 0) {
       size_t size;
       size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
-                                         writable_file_->GetIOPriority(),
-                                         stats_, RateLimiter::OpType::kWrite);
+                                         rate_limiter_priority_used, stats_,
+                                         RateLimiter::OpType::kWrite);
       data_size -= size;
     }
   }
@@ -839,7 +858,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
     EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
     v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
     s = writable_file_->PositionedAppend(Slice(src, left), write_offset,
-                                         IOOptions(), v_info, nullptr);
+                                         io_options, v_info, nullptr);
 
     if (ShouldNotifyListeners()) {
       auto finish_ts = std::chrono::steady_clock::now();
@@ -882,16 +901,21 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
 #endif // !ROCKSDB_LITE
 
 Env::IOPriority WritableFileWriter::DecideRateLimiterPriority(
     Env::IOPriority writable_file_io_priority,
-    Env::IOPriority op_rate_limiter_priority) {
+    Env::IOPriority op_rate_limiter_priority,
+    Env::IOPriority& iooptions_io_priority) {
+  Env::IOPriority rate_limiter_priority{Env::IO_TOTAL};
   if (writable_file_io_priority == Env::IO_TOTAL &&
       op_rate_limiter_priority == Env::IO_TOTAL) {
-    return Env::IO_TOTAL;
+    rate_limiter_priority = Env::IO_TOTAL;
   } else if (writable_file_io_priority == Env::IO_TOTAL) {
-    return op_rate_limiter_priority;
+    rate_limiter_priority = op_rate_limiter_priority;
  } else if (op_rate_limiter_priority == Env::IO_TOTAL) {
-    return writable_file_io_priority;
+    rate_limiter_priority = writable_file_io_priority;
   } else {
-    return op_rate_limiter_priority;
+    rate_limiter_priority = op_rate_limiter_priority;
   }
+  iooptions_io_priority = rate_limiter_priority;
+  return rate_limiter_priority;
 }
 } // namespace ROCKSDB_NAMESPACE

file/writable_file_writer.h
@@ -277,9 +277,11 @@ class WritableFileWriter {
   const char* GetFileChecksumFuncName() const;
 
  private:
+  // Decide the Rate Limiter priority and update io_options.io_priority.
   static Env::IOPriority DecideRateLimiterPriority(
       Env::IOPriority writable_file_io_priority,
-      Env::IOPriority op_rate_limiter_priority);
+      Env::IOPriority op_rate_limiter_priority,
+      Env::IOPriority& iooptions_io_priority);
 
   // Used when os buffering is OFF and we are writing
   // DMA such as in Direct I/O mode

util/file_reader_writer_test.cc
@@ -901,6 +901,154 @@ TEST_F(DBWritableFileWriterTest, IOErrorNotification) {
   fwf->CheckCounters(1, 1);
 }
 #endif // ROCKSDB_LITE
 
+class WritableFileWriterIOPriorityTest : public testing::Test {
+ protected:
+  // This test is to check whether the rate limiter priority can be passed
+  // correctly from WritableFileWriter functions to FSWritableFile functions.
+  void SetUp() override {
+    // When op_rate_limiter_priority parameter in WritableFileWriter functions
+    // is the default (Env::IO_TOTAL).
+    std::unique_ptr<FakeWF> wf{new FakeWF(Env::IO_HIGH)};
+    FileOptions file_options;
+    writer_.reset(new WritableFileWriter(std::move(wf), "" /* don't care */,
+                                         file_options));
+  }
+
+  class FakeWF : public FSWritableFile {
+   public:
+    explicit FakeWF(Env::IOPriority io_priority) { SetIOPriority(io_priority); }
+    ~FakeWF() override {}
+
+    IOStatus Append(const Slice& /*data*/, const IOOptions& options,
+                    IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus Append(const Slice& data, const IOOptions& options,
+                    const DataVerificationInfo& /* verification_info */,
+                    IODebugContext* dbg) override {
+      return Append(data, options, dbg);
+    }
+    IOStatus PositionedAppend(const Slice& /*data*/, uint64_t /*offset*/,
+                              const IOOptions& options,
+                              IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus PositionedAppend(
+        const Slice& /* data */, uint64_t /* offset */,
+        const IOOptions& options,
+        const DataVerificationInfo& /* verification_info */,
+        IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus Truncate(uint64_t /*size*/, const IOOptions& options,
+                      IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus Close(const IOOptions& options, IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus Flush(const IOOptions& options, IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus Sync(const IOOptions& options, IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus Fsync(const IOOptions& options, IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    uint64_t GetFileSize(const IOOptions& options,
+                         IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return 0;
+    }
+    void GetPreallocationStatus(size_t* /*block_size*/,
+                                size_t* /*last_allocated_block*/) override {}
+    size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
+      return 0;
+    }
+    IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
+      return IOStatus::OK();
+    }
+    IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/,
+                      const IOOptions& options,
+                      IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/,
+                       const IOOptions& options,
+                       IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+
+    void PrepareWrite(size_t /*offset*/, size_t /*len*/,
+                      const IOOptions& options,
+                      IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+    }
+
+    bool IsSyncThreadSafe() const override { return true; }
+  };
+
+  std::unique_ptr<WritableFileWriter> writer_;
+};
+
+TEST_F(WritableFileWriterIOPriorityTest, Append) {
+  ASSERT_OK(writer_->Append(Slice("abc")));
+}
+
+TEST_F(WritableFileWriterIOPriorityTest, Pad) { ASSERT_OK(writer_->Pad(500)); }
+
+TEST_F(WritableFileWriterIOPriorityTest, Flush) { ASSERT_OK(writer_->Flush()); }
+
+TEST_F(WritableFileWriterIOPriorityTest, Close) { ASSERT_OK(writer_->Close()); }
+
+TEST_F(WritableFileWriterIOPriorityTest, Sync) {
+  ASSERT_OK(writer_->Sync(false));
+  ASSERT_OK(writer_->Sync(true));
+}
+
+TEST_F(WritableFileWriterIOPriorityTest, SyncWithoutFlush) {
+  ASSERT_OK(writer_->SyncWithoutFlush(false));
+  ASSERT_OK(writer_->SyncWithoutFlush(true));
+}
+
+TEST_F(WritableFileWriterIOPriorityTest, BasicOp) {
+  EnvOptions env_options;
+  env_options.bytes_per_sync = kMb;
+  std::unique_ptr<FakeWF> wf(new FakeWF(Env::IO_HIGH));
+  std::unique_ptr<WritableFileWriter> writer(
+      new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
+  Random r(301);
+  Status s;
+  std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
+  for (int i = 0; i < 1000; i++) {
+    int skew_limit = (i < 700) ? 10 : 15;
+    uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100);
+    s = writer->Append(Slice(large_buf.get(), num));
+    ASSERT_OK(s);
+
+    // Flush in a chance of 1/10.
+    if (r.Uniform(10) == 0) {
+      s = writer->Flush();
+      ASSERT_OK(s);
+    }
+  }
+  s = writer->Close();
+  ASSERT_OK(s);
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
