diff --git a/HISTORY.md b/HISTORY.md
index c541fad7d..cabf93b73 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -5,6 +5,7 @@
 ### New Features
 * Measure estimated number of reads per file. The information can be accessed through DB::GetColumnFamilyMetaData or "rocksdb.sstables" DB property.
+* RateLimiter support for throttling background reads, or throttling the sum of background reads and writes. This can give more predictable I/O usage when compaction reads more data than it writes, e.g., due to lots of deletions.
 
 ## 5.6.0 (06/06/2017)
 ### Public API Change
diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc
index 477fc7469..324fcd1da 100644
--- a/db/db_impl_write.cc
+++ b/db/db_impl_write.cc
@@ -769,8 +769,9 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
     // is that in case the write is heavy, low pri writes may never have
     // a chance to run. Now we guarantee we are still slowly making
     // progress.
-    write_controller_.low_pri_rate_limiter()->Request(my_batch->GetDataSize(),
-                                                      Env::IO_HIGH, nullptr);
+    write_controller_.low_pri_rate_limiter()->Request(
+        my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */,
+        RateLimiter::OpType::kWrite);
     }
   }
   return Status::OK();
diff --git a/db/db_test2.cc b/db/db_test2.cc
index 42403f2aa..b2b814961 100644
--- a/db/db_test2.cc
+++ b/db/db_test2.cc
@@ -2236,6 +2236,71 @@ TEST_F(DBTest2, LowPriWrite) {
   Put("", "", wo);
   ASSERT_EQ(1, rate_limit_count.load());
 }
+
+TEST_F(DBTest2, RateLimitedCompactionReads) {
+  // compaction input has 512KB data
+  const int kNumKeysPerFile = 128;
+  const int kBytesPerKey = 1024;
+  const int kNumL0Files = 4;
+
+  for (auto use_direct_io : {false, true}) {
+    if (use_direct_io && !IsDirectIOSupported()) {
+      continue;
+    }
+    Options options = CurrentOptions();
+    options.compression = kNoCompression;
+    options.level0_file_num_compaction_trigger = kNumL0Files;
+    options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+    options.new_table_reader_for_compaction_inputs = true;
+    // takes roughly one second, split into 100 x 10ms intervals. Each interval
+    // permits 5.12KB, which is smaller than the block size, so this test
+    // exercises the code for chunking reads.
+    options.rate_limiter.reset(NewGenericRateLimiter(
+        static_cast<int64_t>(kNumL0Files * kNumKeysPerFile *
+                             kBytesPerKey) /* rate_bytes_per_sec */,
+        10 * 1000 /* refill_period_us */, 10 /* fairness */,
+        RateLimiter::Mode::kReadsOnly));
+    options.use_direct_io_for_flush_and_compaction = use_direct_io;
+    BlockBasedTableOptions bbto;
+    bbto.block_size = 16384;
+    bbto.no_block_cache = true;
+    options.table_factory.reset(new BlockBasedTableFactory(bbto));
+    DestroyAndReopen(options);
+
+    for (int i = 0; i < kNumL0Files; ++i) {
+      for (int j = 0; j <= kNumKeysPerFile; ++j) {
+        ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
+      }
+      dbfull()->TEST_WaitForFlushMemTable();
+      ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
+    }
+    dbfull()->TEST_WaitForCompact();
+    ASSERT_EQ(0, NumTableFilesAtLevel(0));
+
+    ASSERT_EQ(0, options.rate_limiter->GetTotalBytesThrough(Env::IO_HIGH));
+    // should be slightly above 512KB due to non-data blocks read. Arbitrarily
+    // chose 1MB as the upper bound on the total bytes read.
+    size_t rate_limited_bytes =
+        options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW);
+    ASSERT_GE(
+        rate_limited_bytes,
+        static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
+    ASSERT_LT(
+        rate_limited_bytes,
+        static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files));
+
+    Iterator* iter = db_->NewIterator(ReadOptions());
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
+    }
+    delete iter;
+    // bytes read for user iterator shouldn't count against the rate limit.
+    ASSERT_EQ(rate_limited_bytes,
+              static_cast<size_t>(
+                  options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
+  }
+}
+
 }  // namespace rocksdb
 int main(int argc, char** argv) {
diff --git a/db/table_cache.cc b/db/table_cache.cc
index be5b9f038..287f3dea9 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -91,7 +91,8 @@ Status TableCache::GetTableReader(
     const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
     bool sequential_mode, size_t readahead, bool record_read_stats,
     HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
-    bool skip_filters, int level, bool prefetch_index_and_filter_in_cache) {
+    bool skip_filters, int level, bool prefetch_index_and_filter_in_cache,
+    bool for_compaction) {
   std::string fname =
       TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId());
   unique_ptr<RandomAccessFile> file;
@@ -109,7 +110,8 @@ Status TableCache::GetTableReader(
     std::unique_ptr<RandomAccessFileReader> file_reader(
         new RandomAccessFileReader(std::move(file), ioptions_.env,
                                    ioptions_.statistics, record_read_stats,
-                                   file_read_hist));
+                                   file_read_hist, ioptions_.rate_limiter,
+                                   for_compaction));
     s = ioptions_.table_factory->NewTableReader(
         TableReaderOptions(ioptions_, env_options, internal_comparator,
                            skip_filters, level),
@@ -205,7 +207,8 @@ InternalIterator* TableCache::NewIterator(
       s = GetTableReader(
           env_options, icomparator, fd, true /* sequential_mode */, readahead,
           !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
-          false /* skip_filters */, level);
+          false /* skip_filters */, level,
+          true /* prefetch_index_and_filter_in_cache */, for_compaction);
       if (s.ok()) {
         table_reader = table_reader_unique_ptr.release();
       }
diff --git a/db/table_cache.h b/db/table_cache.h
index 68dbfa74e..85adba510 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -136,7 +136,8 @@ class TableCache {
                         HistogramImpl* file_read_hist,
                         unique_ptr<TableReader>* table_reader,
                         bool skip_filters = false, int level = -1,
-                        bool prefetch_index_and_filter_in_cache = true);
+                        bool prefetch_index_and_filter_in_cache = true,
+                        bool for_compaction = false);
 
   const ImmutableCFOptions& ioptions_;
   const EnvOptions& env_options_;
diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h
index 1fec5e081..03976a9f6 100644
--- a/include/rocksdb/rate_limiter.h
+++ b/include/rocksdb/rate_limiter.h
@@ -18,22 +18,40 @@ namespace rocksdb {
 class RateLimiter {
  public:
+  enum class OpType {
+    // Limitation: we currently only invoke Request() with OpType::kRead for
+    // compactions when DBOptions::new_table_reader_for_compaction_inputs is set
+    kRead,
+    kWrite,
+  };
+  enum class Mode {
+    kReadsOnly,
+    kWritesOnly,
+    kAllIo,
+  };
+
+  // For API compatibility, default to rate-limiting writes only.
+  explicit RateLimiter(Mode mode = Mode::kWritesOnly) : mode_(mode) {}
+
   virtual ~RateLimiter() {}
 
   // This API allows user to dynamically change rate limiter's bytes per second.
// REQUIRED: bytes_per_second > 0 virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0; - // Request for token to write bytes. If this request can not be satisfied, - // the call is blocked. Caller is responsible to make sure + // Deprecated. New RateLimiter derived classes should override + // Request(const int64_t, const Env::IOPriority, Statistics*) or + // Request(const int64_t, const Env::IOPriority, Statistics*, OpType) + // instead. + // + // Request for token for bytes. If this request can not be satisfied, the call + // is blocked. Caller is responsible to make sure // bytes <= GetSingleBurstBytes() virtual void Request(const int64_t bytes, const Env::IOPriority pri) { - // Deprecated. New RateLimiter derived classes should override - // Request(const int64_t, const Env::IOPriority, Statistics*) instead. assert(false); } - // Request for token to write bytes and potentially update statistics. If this + // Request for token for bytes and potentially update statistics. If this // request can not be satisfied, the call is blocked. Caller is responsible to // make sure bytes <= GetSingleBurstBytes(). virtual void Request(const int64_t bytes, const Env::IOPriority pri, @@ -43,6 +61,25 @@ class RateLimiter { Request(bytes, pri); } + // Requests token to read or write bytes and potentially updates statistics. + // + // If this request can not be satisfied, the call is blocked. Caller is + // responsible to make sure bytes <= GetSingleBurstBytes(). + virtual void Request(const int64_t bytes, const Env::IOPriority pri, + Statistics* stats, OpType op_type) { + if (IsRateLimited(op_type)) { + Request(bytes, pri, stats); + } + } + + // Requests token to read or write bytes and potentially updates statistics. + // Takes into account GetSingleBurstBytes() and alignment (e.g., in case of + // direct I/O) to allocate an appropriate number of bytes, which may be less + // than the number of bytes requested. + virtual size_t RequestToken(size_t bytes, size_t alignment, + Env::IOPriority io_priority, Statistics* stats, + RateLimiter::OpType op_type); + // Max bytes can be granted in a single burst virtual int64_t GetSingleBurstBytes() const = 0; @@ -55,6 +92,22 @@ class RateLimiter { const Env::IOPriority pri = Env::IO_TOTAL) const = 0; virtual int64_t GetBytesPerSecond() const = 0; + + virtual bool IsRateLimited(OpType op_type) { + if ((mode_ == RateLimiter::Mode::kWritesOnly && + op_type == RateLimiter::OpType::kRead) || + (mode_ == RateLimiter::Mode::kReadsOnly && + op_type == RateLimiter::OpType::kWrite)) { + return false; + } + return true; + } + + protected: + Mode GetMode() { return mode_; } + + private: + const Mode mode_; }; // Create a RateLimiter object, which can be shared among RocksDB instances to @@ -75,9 +128,10 @@ class RateLimiter { // continuously. This fairness parameter grants low-pri requests permission by // 1/fairness chance even though high-pri requests exist to avoid starvation. // You should be good by leaving it at default 10. +// @mode: Mode indicates which types of operations count against the limit. 
extern RateLimiter* NewGenericRateLimiter( - int64_t rate_bytes_per_sec, - int64_t refill_period_us = 100 * 1000, - int32_t fairness = 10); + int64_t rate_bytes_per_sec, int64_t refill_period_us = 100 * 1000, + int32_t fairness = 10, + RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly); } // namespace rocksdb diff --git a/options/cf_options.cc b/options/cf_options.cc index 79e60abb5..d0c0a6f5d 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -45,6 +45,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, inplace_callback(cf_options.inplace_callback), info_log(db_options.info_log.get()), statistics(db_options.statistics.get()), + rate_limiter(db_options.rate_limiter.get()), env(db_options.env), allow_mmap_reads(db_options.allow_mmap_reads), allow_mmap_writes(db_options.allow_mmap_writes), diff --git a/options/cf_options.h b/options/cf_options.h index 397ee5d6f..b1e11d7bb 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -59,6 +59,8 @@ struct ImmutableCFOptions { Statistics* statistics; + RateLimiter* rate_limiter; + InfoLogLevel info_log_level; Env* env; diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 27adaf76b..6ea3fc47f 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -785,6 +785,9 @@ DEFINE_int32(rate_limit_delay_max_milliseconds, 1000, DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value."); +DEFINE_bool(rate_limit_bg_reads, false, + "Use options.rate_limiter on compaction reads"); + DEFINE_uint64( benchmark_write_rate_limit, 0, "If non-zero, db_bench will rate-limit the writes going into RocksDB. This " @@ -2579,8 +2582,9 @@ void VerifyDBFromDB(std::string& truth_db_name) { NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit)); } if (FLAGS_benchmark_read_rate_limit > 0) { - shared.read_rate_limiter.reset( - NewGenericRateLimiter(FLAGS_benchmark_read_rate_limit)); + shared.read_rate_limiter.reset(NewGenericRateLimiter( + FLAGS_benchmark_read_rate_limit, 100000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kReadsOnly)); } std::unique_ptr reporter_agent; @@ -3132,8 +3136,18 @@ void VerifyDBFromDB(std::string& truth_db_name) { options.enable_thread_tracking = true; } if (FLAGS_rate_limiter_bytes_per_sec > 0) { - options.rate_limiter.reset( - NewGenericRateLimiter(FLAGS_rate_limiter_bytes_per_sec)); + if (FLAGS_rate_limit_bg_reads && + !FLAGS_new_table_reader_for_compaction_inputs) { + fprintf(stderr, + "rate limit compaction reads must have " + "new_table_reader_for_compaction_inputs set\n"); + exit(1); + } + options.rate_limiter.reset(NewGenericRateLimiter( + FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, + FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly + : RateLimiter::Mode::kWritesOnly)); } #ifndef ROCKSDB_LITE @@ -3423,7 +3437,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (thread->shared->write_rate_limiter.get() != nullptr) { thread->shared->write_rate_limiter->Request( entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH, - nullptr /* stats */); + nullptr /* stats */, RateLimiter::OpType::kWrite); // Set time at which last op finished to Now() to hide latency and // sleep from rate limiter. Also, do the check once per batch, not // once per write. 
@@ -3833,7 +3847,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (thread->shared->read_rate_limiter.get() != nullptr && i % 1024 == 1023) { thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH, - nullptr /* stats */); + nullptr /* stats */, + RateLimiter::OpType::kRead); } } @@ -3865,7 +3880,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (thread->shared->read_rate_limiter.get() != nullptr && i % 1024 == 1023) { thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH, - nullptr /* stats */); + nullptr /* stats */, + RateLimiter::OpType::kRead); } } delete iter; @@ -3906,8 +3922,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { } } if (thread->shared->read_rate_limiter.get() != nullptr) { - thread->shared->read_rate_limiter->Request(100, Env::IO_HIGH, - nullptr /* stats */); + thread->shared->read_rate_limiter->Request( + 100, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead); } thread->stats.FinishedOps(nullptr, db, 100, kRead); @@ -3991,8 +4007,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (thread->shared->read_rate_limiter.get() != nullptr && read % 256 == 255) { - thread->shared->read_rate_limiter->Request(256, Env::IO_HIGH, - nullptr /* stats */); + thread->shared->read_rate_limiter->Request( + 256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead); } thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead); @@ -4048,7 +4064,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (thread->shared->read_rate_limiter.get() != nullptr && num_multireads % 256 == 255) { thread->shared->read_rate_limiter->Request( - 256 * entries_per_batch_, Env::IO_HIGH, nullptr /* stats */); + 256 * entries_per_batch_, Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kRead); } thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead); } @@ -4145,8 +4162,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (thread->shared->read_rate_limiter.get() != nullptr && read % 256 == 255) { - thread->shared->read_rate_limiter->Request(256, Env::IO_HIGH, - nullptr /* stats */); + thread->shared->read_rate_limiter->Request( + 256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead); } thread->stats.FinishedOps(&db_, db_.db, 1, kSeek); @@ -4294,7 +4311,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (FLAGS_benchmark_write_rate_limit > 0) { write_rate_limiter->Request( entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH, - nullptr /* stats */); + nullptr /* stats */, RateLimiter::OpType::kWrite); } } thread->stats.AddBytes(bytes); @@ -4965,8 +4982,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { found += key_found; if (thread->shared->read_rate_limiter.get() != nullptr) { - thread->shared->read_rate_limiter->Request(1, Env::IO_HIGH, - nullptr /* stats */); + thread->shared->read_rate_limiter->Request( + 1, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead); } } delete iter; @@ -5037,7 +5054,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (FLAGS_benchmark_write_rate_limit > 0) { write_rate_limiter->Request( entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH, - nullptr /* stats */); + nullptr /* stats */, RateLimiter::OpType::kWrite); } } } diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 44ff1bbda..ef7273ee8 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -245,7 +245,7 @@ DEFINE_uint64(subcompactions, 1, "Maximum number of subcompactions to divide L0-L1 compactions " "into."); -DEFINE_bool(allow_concurrent_memtable_write, 
true, +DEFINE_bool(allow_concurrent_memtable_write, false, "Allow multi-writers to update mem tables in parallel."); DEFINE_bool(enable_write_thread_adaptive_yield, true, @@ -326,6 +326,11 @@ DEFINE_double(max_bytes_for_level_multiplier, 2, DEFINE_int32(range_deletion_width, 10, "The width of the range deletion intervals."); +DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value."); + +DEFINE_bool(rate_limit_bg_reads, false, + "Use options.rate_limiter on compaction reads"); + // Temporarily disable this to allows it to detect new bugs DEFINE_int32(compact_files_one_in, 0, "If non-zero, then CompactFiles() will be called one for every N " @@ -2182,6 +2187,16 @@ class StressTest { FLAGS_allow_concurrent_memtable_write; options_.enable_write_thread_adaptive_yield = FLAGS_enable_write_thread_adaptive_yield; + if (FLAGS_rate_limiter_bytes_per_sec > 0) { + options_.rate_limiter.reset(NewGenericRateLimiter( + FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */, + 10 /* fairness */, + FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly + : RateLimiter::Mode::kWritesOnly)); + if (FLAGS_rate_limit_bg_reads) { + options_.new_table_reader_for_compaction_inputs = true; + } + } if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) { fprintf(stderr, diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index a29fe9715..b578b642a 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -84,22 +84,63 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, size_t alignment = file_->GetRequiredBufferAlignment(); size_t aligned_offset = TruncateToPageBoundary(alignment, offset); size_t offset_advance = offset - aligned_offset; - size_t size = Roundup(offset + n, alignment) - aligned_offset; - size_t r = 0; + size_t read_size = Roundup(offset + n, alignment) - aligned_offset; AlignedBuffer buf; buf.Alignment(alignment); - buf.AllocateNewBuffer(size); - Slice tmp; - s = file_->Read(aligned_offset, size, &tmp, buf.BufferStart()); - if (s.ok() && offset_advance < tmp.size()) { - buf.Size(tmp.size()); - r = buf.Read(scratch, offset_advance, - std::min(tmp.size() - offset_advance, n)); + buf.AllocateNewBuffer(read_size); + while (buf.CurrentSize() < read_size) { + size_t allowed; + if (rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken( + buf.Capacity() - buf.CurrentSize(), buf.Alignment(), + Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead); + } else { + assert(buf.CurrentSize() == 0); + allowed = read_size; + } + Slice tmp; + s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp, + buf.Destination()); + buf.Size(buf.CurrentSize() + tmp.size()); + if (!s.ok() || tmp.size() < allowed) { + break; + } } - *result = Slice(scratch, r); + size_t res_len = 0; + if (s.ok() && offset_advance < buf.CurrentSize()) { + res_len = buf.Read(scratch, offset_advance, + std::min(buf.CurrentSize() - offset_advance, n)); + } + *result = Slice(scratch, res_len); #endif // !ROCKSDB_LITE } else { - s = file_->Read(offset, n, result, scratch); + size_t pos = 0; + const char* res_scratch = nullptr; + while (pos < n) { + size_t allowed; + if (for_compaction_ && rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */, + Env::IOPriority::IO_LOW, stats_, + RateLimiter::OpType::kRead); + } else { + allowed = n; + } + Slice tmp_result; + s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos); + if (res_scratch == nullptr) { + // we can't simply 
use `scratch` because reads of mmap'd files return + // data in a different buffer. + res_scratch = tmp_result.data(); + } else { + // make sure chunks are inserted contiguously into `res_scratch`. + assert(tmp_result.data() == res_scratch + pos); + } + pos += tmp_result.size(); + if (!s.ok() || tmp_result.size() < allowed) { + break; + } + } + *result = Slice(res_scratch, s.ok() ? pos : 0); } IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); } @@ -319,25 +360,6 @@ Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { return writable_file_->RangeSync(offset, nbytes); } -size_t WritableFileWriter::RequestToken(size_t bytes, bool align) { - Env::IOPriority io_priority; - if (rate_limiter_ && (io_priority = writable_file_->GetIOPriority()) < - Env::IO_TOTAL) { - bytes = std::min( - bytes, static_cast(rate_limiter_->GetSingleBurstBytes())); - - if (align) { - // Here we may actually require more than burst and block - // but we can not write less than one page at a time on direct I/O - // thus we may want not to use ratelimiter - size_t alignment = buf_.Alignment(); - bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); - } - rate_limiter_->Request(bytes, io_priority, stats_); - } - return bytes; -} - // This method writes to disk the specified data and makes use of the rate // limiter if available Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { @@ -347,7 +369,14 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { size_t left = size; while (left > 0) { - size_t allowed = RequestToken(left, false); + size_t allowed; + if (rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken( + left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_, + RateLimiter::OpType::kWrite); + } else { + allowed = left; + } { IOSTATS_TIMER_GUARD(write_nanos); @@ -403,7 +432,14 @@ Status WritableFileWriter::WriteDirect() { while (left > 0) { // Check how much is allowed - size_t size = RequestToken(left, true); + size_t size; + if (rate_limiter_ != nullptr) { + size = rate_limiter_->RequestToken(left, buf_.Alignment(), + writable_file_->GetIOPriority(), + stats_, RateLimiter::OpType::kWrite); + } else { + size = left; + } { IOSTATS_TIMER_GUARD(write_nanos); diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 4a82e1ddc..b3c65c95c 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -13,6 +13,7 @@ #include #include "port/port.h" #include "rocksdb/env.h" +#include "rocksdb/rate_limiter.h" #include "util/aligned_buffer.h" namespace rocksdb { @@ -53,9 +54,6 @@ class SequentialFileReader { SequentialFile* file() { return file_.get(); } bool use_direct_io() const { return file_->use_direct_io(); } - - protected: - Status DirectRead(size_t n, Slice* result, char* scratch); }; class RandomAccessFileReader { @@ -65,29 +63,38 @@ class RandomAccessFileReader { Statistics* stats_; uint32_t hist_type_; HistogramImpl* file_read_hist_; + RateLimiter* rate_limiter_; + bool for_compaction_; public: explicit RandomAccessFileReader(std::unique_ptr&& raf, Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0, - HistogramImpl* file_read_hist = nullptr) + HistogramImpl* file_read_hist = nullptr, + RateLimiter* rate_limiter = nullptr, + bool for_compaction = false) : file_(std::move(raf)), env_(env), stats_(stats), hist_type_(hist_type), - file_read_hist_(file_read_hist) {} + file_read_hist_(file_read_hist), + rate_limiter_(rate_limiter), + for_compaction_(for_compaction) {} 
  RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
    *this = std::move(o);
  }

-  RandomAccessFileReader& operator=(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT{
+  RandomAccessFileReader& operator=(RandomAccessFileReader&& o)
+      ROCKSDB_NOEXCEPT {
    file_ = std::move(o.file_);
    env_ = std::move(o.env_);
    stats_ = std::move(o.stats_);
    hist_type_ = std::move(o.hist_type_);
    file_read_hist_ = std::move(o.file_read_hist_);
+    rate_limiter_ = std::move(o.rate_limiter_);
+    for_compaction_ = std::move(o.for_compaction_);
    return *this;
  }
@@ -103,10 +110,6 @@ class RandomAccessFileReader {
   RandomAccessFile* file() { return file_.get(); }
 
   bool use_direct_io() const { return file_->use_direct_io(); }
-
- protected:
-  Status DirectRead(uint64_t offset, size_t n, Slice* result,
-                    char* scratch) const;
 };
 
 // Use posix write to write data to a file.
@@ -187,7 +190,6 @@ class WritableFileWriter {
   // Normal write
   Status WriteBuffered(const char* data, size_t size);
   Status RangeSync(uint64_t offset, uint64_t nbytes);
-  size_t RequestToken(size_t bytes, bool align);
   Status SyncInternal(bool use_fsync);
 };
diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc
index 064764cb6..e8d4cdf87 100644
--- a/util/rate_limiter.cc
+++ b/util/rate_limiter.cc
@@ -13,10 +13,27 @@
 #include "monitoring/statistics.h"
 #include "port/port.h"
 #include "rocksdb/env.h"
+#include "util/aligned_buffer.h"
 #include "util/sync_point.h"
 
 namespace rocksdb {
 
+size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
+                                 Env::IOPriority io_priority, Statistics* stats,
+                                 RateLimiter::OpType op_type) {
+  if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) {
+    bytes = std::min(bytes, static_cast<size_t>(GetSingleBurstBytes()));
+
+    if (alignment > 0) {
+      // Here we may actually require more than burst and block
+      // but we can not write less than one page at a time on direct I/O
+      // thus we may want not to use ratelimiter
+      bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
+    }
+    Request(bytes, io_priority, stats, op_type);
+  }
+  return bytes;
+}
+
 // Pending request
 struct GenericRateLimiter::Req {
@@ -30,8 +47,9 @@ struct GenericRateLimiter::Req {
 
 GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
                                        int64_t refill_period_us,
-                                       int32_t fairness)
-    : refill_period_us_(refill_period_us),
+                                       int32_t fairness, RateLimiter::Mode mode)
+    : RateLimiter(mode),
+      refill_period_us_(refill_period_us),
       rate_bytes_per_sec_(rate_bytes_per_sec),
       refill_bytes_per_period_(
           CalculateRefillBytesPerPeriod(rate_bytes_per_sec)),
@@ -241,12 +259,14 @@ int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
 }
 
 RateLimiter* NewGenericRateLimiter(
-    int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) {
+    int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
+    int32_t fairness /* = 10 */,
+    RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */) {
   assert(rate_bytes_per_sec > 0);
   assert(refill_period_us > 0);
   assert(fairness > 0);
-  return new GenericRateLimiter(
-      rate_bytes_per_sec, refill_period_us, fairness);
+  return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
+                                mode);
 }
 
 }  // namespace rocksdb
diff --git a/util/rate_limiter.h b/util/rate_limiter.h
index 238be5c3d..106e25c59 100644
--- a/util/rate_limiter.h
+++ b/util/rate_limiter.h
@@ -24,8 +24,9 @@ namespace rocksdb {
 class GenericRateLimiter : public RateLimiter {
  public:
-  GenericRateLimiter(int64_t refill_bytes,
-                     int64_t refill_period_us, int32_t fairness);
+  GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us,
+                     int32_t fairness,
+                     RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly);
 
   virtual ~GenericRateLimiter();
diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc
index c20806ddf..4d4d0f03f 100644
--- a/util/rate_limiter_test.cc
+++ b/util/rate_limiter_test.cc
@@ -35,6 +35,30 @@ TEST_F(RateLimiterTest, StartStop) {
   std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(100, 100, 10));
 }
 
+TEST_F(RateLimiterTest, Modes) {
+  for (auto mode : {RateLimiter::Mode::kWritesOnly,
+                    RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {
+    GenericRateLimiter limiter(2000 /* rate_bytes_per_sec */,
+                               1000 * 1000 /* refill_period_us */,
+                               10 /* fairness */, mode);
+    limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
+                    RateLimiter::OpType::kRead);
+    if (mode == RateLimiter::Mode::kWritesOnly) {
+      ASSERT_EQ(0, limiter.GetTotalBytesThrough(Env::IO_HIGH));
+    } else {
+      ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
+    }
+
+    limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
+                    RateLimiter::OpType::kWrite);
+    if (mode == RateLimiter::Mode::kAllIo) {
+      ASSERT_EQ(2000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
+    } else {
+      ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
+    }
+  }
+}
+
 TEST_F(RateLimiterTest, Rate) {
   auto* env = Env::Default();
   struct Arg {
@@ -57,10 +81,11 @@ TEST_F(RateLimiterTest, Rate) {
     while (thread_env->NowMicros() < until) {
       for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) {
         arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
-                              Env::IO_HIGH, nullptr /* stats */);
+                              Env::IO_HIGH, nullptr /* stats */,
+                              RateLimiter::OpType::kWrite);
       }
       arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW,
-                            nullptr /* stats */);
+                            nullptr /* stats */, RateLimiter::OpType::kWrite);
     }
   };
 
@@ -113,7 +138,8 @@ TEST_F(RateLimiterTest, LimitChangeTest) {
   auto writer = [](void* p) {
     auto* arg = static_cast<Arg*>(p);
-    arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */);
+    arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */,
+                          RateLimiter::OpType::kWrite);
   };
 
   for (uint32_t i = 1; i <= 16; i <<= 1) {
diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc
index 233b34388..1377e94a6 100644
--- a/utilities/backupable/backupable_db.cc
+++ b/utilities/backupable/backupable_db.cc
@@ -1214,7 +1214,8 @@ Status BackupEngineImpl::CopyOrCreateFile(
     }
     s = dest_writer->Append(data);
     if (rate_limiter != nullptr) {
-      rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */);
+      rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
+                            RateLimiter::OpType::kWrite);
     }
     if (processed_buffer_size > options_.callback_trigger_interval_size) {
       processed_buffer_size -= options_.callback_trigger_interval_size;
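
Usage sketch (not part of the patch above): a minimal illustration of how the Mode parameter added in include/rocksdb/rate_limiter.h might be wired up together with DBOptions::new_table_reader_for_compaction_inputs, which the OpType::kRead comment notes is required for compaction reads to be charged against the limiter. The database path and byte rate below are placeholder values, not anything specified by this change.

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/rate_limiter.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Compaction inputs need their own table readers so the rate limiter can be
  // attached to their RandomAccessFileReader (see the table_cache.cc changes).
  options.new_table_reader_for_compaction_inputs = true;
  // Throttle background reads to ~4MB/s; Mode::kAllIo would instead throttle
  // the sum of background reads and writes against one budget.
  options.rate_limiter.reset(rocksdb::NewGenericRateLimiter(
      4 << 20 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */,
      10 /* fairness */, rocksdb::RateLimiter::Mode::kReadsOnly));

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/rate_limited_db", &db);
  if (!s.ok()) {
    return 1;
  }
  delete db;
  return 0;
}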