Call RateLimiter for compaction reads

Summary:
Allow users to rate limit background work based on read bytes, written bytes, or the sum of read and written bytes. We support these modes by extending the RateLimiter API, so no additional options are needed.
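
For example, redirecting the limit from compaction writes to compaction reads is just a construction-time choice. A minimal sketch (the 16MB/s budget below is illustrative, not from this patch):

    #include "rocksdb/options.h"
    #include "rocksdb/rate_limiter.h"

    rocksdb::Options options;
    // Charge compaction reads, rather than writes, against a 16MB/s budget.
    options.rate_limiter.reset(rocksdb::NewGenericRateLimiter(
        16 * 1024 * 1024 /* rate_bytes_per_sec */,
        100 * 1000 /* refill_period_us */, 10 /* fairness */,
        rocksdb::RateLimiter::Mode::kReadsOnly));
    // Compaction reads are only charged when compaction uses its own
    // table readers (see the check added to db_bench below).
    options.new_table_reader_for_compaction_inputs = true;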
Closes https://github.com/facebook/rocksdb/pull/2433

Differential Revision: D5216946

Pulled By: ajkr

fbshipit-source-id: aec57a8357dbb4bfde2003261094d786d94f724e
Andrew Kryczka 8 years ago committed by Facebook Github Bot
parent 91e2aa3ce2
commit c217e0b9c7
 HISTORY.md                             |   1
 db/db_impl_write.cc                    |   5
 db/db_test2.cc                         |  65
 db/table_cache.cc                      |   9
 db/table_cache.h                       |   3
 include/rocksdb/rate_limiter.h         |  70
 options/cf_options.cc                  |   1
 options/cf_options.h                   |   2
 tools/db_bench_tool.cc                 |  53
 tools/db_stress.cc                     |  17
 util/file_reader_writer.cc             | 100
 util/file_reader_writer.h              |  24
 util/rate_limiter.cc                   |  30
 util/rate_limiter.h                    |   5
 util/rate_limiter_test.cc              |  32
 utilities/backupable/backupable_db.cc  |   3
 16 files changed

@@ -5,6 +5,7 @@
### New Features
* Measure estimated number of reads per file. The information can be accessed through DB::GetColumnFamilyMetaData or "rocksdb.sstables" DB property.
* RateLimiter support for throttling background reads, or throttling the sum of background reads and writes. This can give more predictable I/O usage when compaction reads more data than it writes, e.g., due to lots of deletions.
## 5.6.0 (06/06/2017)
### Public API Change

@@ -769,8 +769,9 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
// is that in case the write is heavy, low pri writes may never have
// a chance to run. Now we guarantee we are still slowly making
// progress.
write_controller_.low_pri_rate_limiter()->Request(my_batch->GetDataSize(),
Env::IO_HIGH, nullptr);
write_controller_.low_pri_rate_limiter()->Request(
my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kWrite);
}
}
return Status::OK();

@@ -2236,6 +2236,71 @@ TEST_F(DBTest2, LowPriWrite) {
Put("", "", wo);
ASSERT_EQ(1, rate_limit_count.load());
}
TEST_F(DBTest2, RateLimitedCompactionReads) {
// compaction input has 512KB data
const int kNumKeysPerFile = 128;
const int kBytesPerKey = 1024;
const int kNumL0Files = 4;
for (auto use_direct_io : {false, true}) {
if (use_direct_io && !IsDirectIOSupported()) {
continue;
}
Options options = CurrentOptions();
options.compression = kNoCompression;
options.level0_file_num_compaction_trigger = kNumL0Files;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
options.new_table_reader_for_compaction_inputs = true;
// takes roughly one second, split into 100 x 10ms intervals. Each interval
// permits 5.12KB, which is smaller than the block size, so this test
// exercises the code for chunking reads.
options.rate_limiter.reset(NewGenericRateLimiter(
static_cast<int64_t>(kNumL0Files * kNumKeysPerFile *
kBytesPerKey) /* rate_bytes_per_sec */,
10 * 1000 /* refill_period_us */, 10 /* fairness */,
RateLimiter::Mode::kReadsOnly));
options.use_direct_io_for_flush_and_compaction = use_direct_io;
BlockBasedTableOptions bbto;
bbto.block_size = 16384;
bbto.no_block_cache = true;
options.table_factory.reset(new BlockBasedTableFactory(bbto));
DestroyAndReopen(options);
for (int i = 0; i < kNumL0Files; ++i) {
for (int j = 0; j <= kNumKeysPerFile; ++j) {
ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
}
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(0, options.rate_limiter->GetTotalBytesThrough(Env::IO_HIGH));
// should be slightly above 512KB due to non-data blocks read. Arbitrarily
// chose 1MB as the upper bound on the total bytes read.
size_t rate_limited_bytes =
options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW);
ASSERT_GE(
rate_limited_bytes,
static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
ASSERT_LT(
rate_limited_bytes,
static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files));
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
}
delete iter;
// bytes read for user iterator shouldn't count against the rate limit.
ASSERT_EQ(rate_limited_bytes,
static_cast<size_t>(
options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
}
}
} // namespace rocksdb
int main(int argc, char** argv) {

@@ -91,7 +91,8 @@ Status TableCache::GetTableReader(
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
bool sequential_mode, size_t readahead, bool record_read_stats,
HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
bool skip_filters, int level, bool prefetch_index_and_filter_in_cache) {
bool skip_filters, int level, bool prefetch_index_and_filter_in_cache,
bool for_compaction) {
std::string fname =
TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId());
unique_ptr<RandomAccessFile> file;
@@ -109,7 +110,8 @@ Status TableCache::GetTableReader(
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(std::move(file), ioptions_.env,
ioptions_.statistics, record_read_stats,
file_read_hist));
file_read_hist, ioptions_.rate_limiter,
for_compaction));
s = ioptions_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, env_options, internal_comparator,
skip_filters, level),
@@ -205,7 +207,8 @@ InternalIterator* TableCache::NewIterator(
s = GetTableReader(
env_options, icomparator, fd, true /* sequential_mode */, readahead,
!for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
false /* skip_filters */, level);
false /* skip_filters */, level,
true /* prefetch_index_and_filter_in_cache */, for_compaction);
if (s.ok()) {
table_reader = table_reader_unique_ptr.release();
}

@@ -136,7 +136,8 @@ class TableCache {
HistogramImpl* file_read_hist,
unique_ptr<TableReader>* table_reader,
bool skip_filters = false, int level = -1,
bool prefetch_index_and_filter_in_cache = true);
bool prefetch_index_and_filter_in_cache = true,
bool for_compaction = false);
const ImmutableCFOptions& ioptions_;
const EnvOptions& env_options_;

@@ -18,22 +18,40 @@ namespace rocksdb {
class RateLimiter {
public:
enum class OpType {
// Limitation: we currently only invoke Request() with OpType::kRead for
// compactions when DBOptions::new_table_reader_for_compaction_inputs is set
kRead,
kWrite,
};
enum class Mode {
kReadsOnly,
kWritesOnly,
kAllIo,
};
// For API compatibility, default to rate-limiting writes only.
explicit RateLimiter(Mode mode = Mode::kWritesOnly) : mode_(mode) {}
virtual ~RateLimiter() {}
// This API allows user to dynamically change rate limiter's bytes per second.
// REQUIRED: bytes_per_second > 0
virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0;
// Request for token to write bytes. If this request can not be satisfied,
// the call is blocked. Caller is responsible to make sure
// Deprecated. New RateLimiter derived classes should override
// Request(const int64_t, const Env::IOPriority, Statistics*) or
// Request(const int64_t, const Env::IOPriority, Statistics*, OpType)
// instead.
//
// Request for token for bytes. If this request can not be satisfied, the call
// is blocked. Caller is responsible to make sure
// bytes <= GetSingleBurstBytes()
virtual void Request(const int64_t bytes, const Env::IOPriority pri) {
// Deprecated. New RateLimiter derived classes should override
// Request(const int64_t, const Env::IOPriority, Statistics*) instead.
assert(false);
}
// Request for token to write bytes and potentially update statistics. If this
// Request for token for bytes and potentially update statistics. If this
// request can not be satisfied, the call is blocked. Caller is responsible to
// make sure bytes <= GetSingleBurstBytes().
virtual void Request(const int64_t bytes, const Env::IOPriority pri,
@@ -43,6 +61,25 @@ class RateLimiter {
Request(bytes, pri);
}
// Requests token to read or write bytes and potentially updates statistics.
//
// If this request can not be satisfied, the call is blocked. Caller is
// responsible to make sure bytes <= GetSingleBurstBytes().
virtual void Request(const int64_t bytes, const Env::IOPriority pri,
Statistics* stats, OpType op_type) {
if (IsRateLimited(op_type)) {
Request(bytes, pri, stats);
}
}
// Requests token to read or write bytes and potentially updates statistics.
// Takes into account GetSingleBurstBytes() and alignment (e.g., in case of
// direct I/O) to allocate an appropriate number of bytes, which may be less
// than the number of bytes requested.
virtual size_t RequestToken(size_t bytes, size_t alignment,
Env::IOPriority io_priority, Statistics* stats,
RateLimiter::OpType op_type);
// Max bytes can be granted in a single burst
virtual int64_t GetSingleBurstBytes() const = 0;
@@ -55,6 +92,22 @@ class RateLimiter {
const Env::IOPriority pri = Env::IO_TOTAL) const = 0;
virtual int64_t GetBytesPerSecond() const = 0;
virtual bool IsRateLimited(OpType op_type) {
if ((mode_ == RateLimiter::Mode::kWritesOnly &&
op_type == RateLimiter::OpType::kRead) ||
(mode_ == RateLimiter::Mode::kReadsOnly &&
op_type == RateLimiter::OpType::kWrite)) {
return false;
}
return true;
}
protected:
Mode GetMode() { return mode_; }
private:
const Mode mode_;
};
// Create a RateLimiter object, which can be shared among RocksDB instances to
@@ -75,9 +128,10 @@ class RateLimiter {
// continuously. This fairness parameter grants low-pri requests permission by
// 1/fairness chance even though high-pri requests exist to avoid starvation.
// You should be good by leaving it at default 10.
// @mode: Mode indicates which types of operations count against the limit.
extern RateLimiter* NewGenericRateLimiter(
int64_t rate_bytes_per_sec,
int64_t refill_period_us = 100 * 1000,
int32_t fairness = 10);
int64_t rate_bytes_per_sec, int64_t refill_period_us = 100 * 1000,
int32_t fairness = 10,
RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly);
} // namespace rocksdb
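
The overloads above are layered for backward compatibility: the new Request(bytes, pri, stats, op_type) filters by mode via IsRateLimited() and forwards to Request(bytes, pri, stats), whose default body forwards to the deprecated Request(bytes, pri). A pre-existing limiter that overrides only the old two-argument signature therefore keeps working at the new call sites. A hedged sketch of such a legacy subclass (the class is hypothetical, and it assumes the header's remaining pure virtuals, including GetTotalRequests(), as of this release):

    #include <atomic>
    #include <cstdint>
    #include "rocksdb/env.h"
    #include "rocksdb/rate_limiter.h"

    // Hypothetical pre-existing limiter: only the deprecated overload is
    // overridden. The default-constructed base keeps Mode::kWritesOnly, so
    // OpType::kRead requests are filtered out by IsRateLimited() before
    // they can reach this method.
    class LegacyLimiter : public rocksdb::RateLimiter {
     public:
      using RateLimiter::Request;  // keep the newer overloads visible
      void Request(const int64_t bytes,
                   const rocksdb::Env::IOPriority pri) override {
        total_bytes_ += bytes;  // stand-in for real token accounting
      }
      void SetBytesPerSecond(int64_t bytes_per_second) override {}
      int64_t GetSingleBurstBytes() const override { return 1 << 20; }
      int64_t GetTotalBytesThrough(
          const rocksdb::Env::IOPriority pri) const override {
        return total_bytes_.load();
      }
      int64_t GetTotalRequests(
          const rocksdb::Env::IOPriority pri) const override {
        return 0;  // not tracked in this sketch
      }
      int64_t GetBytesPerSecond() const override { return 0; }

     private:
      std::atomic<int64_t> total_bytes_{0};
    };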

@@ -45,6 +45,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
inplace_callback(cf_options.inplace_callback),
info_log(db_options.info_log.get()),
statistics(db_options.statistics.get()),
rate_limiter(db_options.rate_limiter.get()),
env(db_options.env),
allow_mmap_reads(db_options.allow_mmap_reads),
allow_mmap_writes(db_options.allow_mmap_writes),

@@ -59,6 +59,8 @@ struct ImmutableCFOptions {
Statistics* statistics;
RateLimiter* rate_limiter;
InfoLogLevel info_log_level;
Env* env;

@@ -785,6 +785,9 @@ DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");
DEFINE_bool(rate_limit_bg_reads, false,
"Use options.rate_limiter on compaction reads");
DEFINE_uint64(
benchmark_write_rate_limit, 0,
"If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
@@ -2579,8 +2582,9 @@ void VerifyDBFromDB(std::string& truth_db_name) {
NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
}
if (FLAGS_benchmark_read_rate_limit > 0) {
shared.read_rate_limiter.reset(
NewGenericRateLimiter(FLAGS_benchmark_read_rate_limit));
shared.read_rate_limiter.reset(NewGenericRateLimiter(
FLAGS_benchmark_read_rate_limit, 100000 /* refill_period_us */,
10 /* fairness */, RateLimiter::Mode::kReadsOnly));
}
std::unique_ptr<ReporterAgent> reporter_agent;
@@ -3132,8 +3136,18 @@ void VerifyDBFromDB(std::string& truth_db_name) {
options.enable_thread_tracking = true;
}
if (FLAGS_rate_limiter_bytes_per_sec > 0) {
options.rate_limiter.reset(
NewGenericRateLimiter(FLAGS_rate_limiter_bytes_per_sec));
if (FLAGS_rate_limit_bg_reads &&
!FLAGS_new_table_reader_for_compaction_inputs) {
fprintf(stderr,
"rate limit compaction reads must have "
"new_table_reader_for_compaction_inputs set\n");
exit(1);
}
options.rate_limiter.reset(NewGenericRateLimiter(
FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */,
10 /* fairness */,
FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
: RateLimiter::Mode::kWritesOnly));
}
#ifndef ROCKSDB_LITE
@@ -3423,7 +3437,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
if (thread->shared->write_rate_limiter.get() != nullptr) {
thread->shared->write_rate_limiter->Request(
entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
nullptr /* stats */);
nullptr /* stats */, RateLimiter::OpType::kWrite);
// Set time at which last op finished to Now() to hide latency and
// sleep from rate limiter. Also, do the check once per batch, not
// once per write.
@@ -3833,7 +3847,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
if (thread->shared->read_rate_limiter.get() != nullptr &&
i % 1024 == 1023) {
thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH,
nullptr /* stats */);
nullptr /* stats */,
RateLimiter::OpType::kRead);
}
}
@@ -3865,7 +3880,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
if (thread->shared->read_rate_limiter.get() != nullptr &&
i % 1024 == 1023) {
thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH,
nullptr /* stats */);
nullptr /* stats */,
RateLimiter::OpType::kRead);
}
}
delete iter;
@@ -3906,8 +3922,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
}
}
if (thread->shared->read_rate_limiter.get() != nullptr) {
thread->shared->read_rate_limiter->Request(100, Env::IO_HIGH,
nullptr /* stats */);
thread->shared->read_rate_limiter->Request(
100, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
}
thread->stats.FinishedOps(nullptr, db, 100, kRead);
@@ -3991,8 +4007,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
if (thread->shared->read_rate_limiter.get() != nullptr &&
read % 256 == 255) {
thread->shared->read_rate_limiter->Request(256, Env::IO_HIGH,
nullptr /* stats */);
thread->shared->read_rate_limiter->Request(
256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
@@ -4048,7 +4064,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
if (thread->shared->read_rate_limiter.get() != nullptr &&
num_multireads % 256 == 255) {
thread->shared->read_rate_limiter->Request(
256 * entries_per_batch_, Env::IO_HIGH, nullptr /* stats */);
256 * entries_per_batch_, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead);
}
@@ -4145,8 +4162,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
if (thread->shared->read_rate_limiter.get() != nullptr &&
read % 256 == 255) {
thread->shared->read_rate_limiter->Request(256, Env::IO_HIGH,
nullptr /* stats */);
thread->shared->read_rate_limiter->Request(
256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
}
thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
@@ -4294,7 +4311,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
if (FLAGS_benchmark_write_rate_limit > 0) {
write_rate_limiter->Request(
entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
nullptr /* stats */);
nullptr /* stats */, RateLimiter::OpType::kWrite);
}
}
thread->stats.AddBytes(bytes);
@@ -4965,8 +4982,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
found += key_found;
if (thread->shared->read_rate_limiter.get() != nullptr) {
thread->shared->read_rate_limiter->Request(1, Env::IO_HIGH,
nullptr /* stats */);
thread->shared->read_rate_limiter->Request(
1, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
}
}
delete iter;
@@ -5037,7 +5054,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
if (FLAGS_benchmark_write_rate_limit > 0) {
write_rate_limiter->Request(
entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
nullptr /* stats */);
nullptr /* stats */, RateLimiter::OpType::kWrite);
}
}
}

@@ -245,7 +245,7 @@ DEFINE_uint64(subcompactions, 1,
"Maximum number of subcompactions to divide L0-L1 compactions "
"into.");
DEFINE_bool(allow_concurrent_memtable_write, true,
DEFINE_bool(allow_concurrent_memtable_write, false,
"Allow multi-writers to update mem tables in parallel.");
DEFINE_bool(enable_write_thread_adaptive_yield, true,
@@ -326,6 +326,11 @@ DEFINE_double(max_bytes_for_level_multiplier, 2,
DEFINE_int32(range_deletion_width, 10,
"The width of the range deletion intervals.");
DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");
DEFINE_bool(rate_limit_bg_reads, false,
"Use options.rate_limiter on compaction reads");
// Temporarily disable this to allow it to detect new bugs
DEFINE_int32(compact_files_one_in, 0,
"If non-zero, then CompactFiles() will be called one for every N "
@@ -2182,6 +2187,16 @@ class StressTest {
FLAGS_allow_concurrent_memtable_write;
options_.enable_write_thread_adaptive_yield =
FLAGS_enable_write_thread_adaptive_yield;
if (FLAGS_rate_limiter_bytes_per_sec > 0) {
options_.rate_limiter.reset(NewGenericRateLimiter(
FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
10 /* fairness */,
FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
: RateLimiter::Mode::kWritesOnly));
if (FLAGS_rate_limit_bg_reads) {
options_.new_table_reader_for_compaction_inputs = true;
}
}
if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
fprintf(stderr,

@@ -84,22 +84,63 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
size_t alignment = file_->GetRequiredBufferAlignment();
size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
size_t offset_advance = offset - aligned_offset;
size_t size = Roundup(offset + n, alignment) - aligned_offset;
size_t r = 0;
size_t read_size = Roundup(offset + n, alignment) - aligned_offset;
AlignedBuffer buf;
buf.Alignment(alignment);
buf.AllocateNewBuffer(size);
Slice tmp;
s = file_->Read(aligned_offset, size, &tmp, buf.BufferStart());
if (s.ok() && offset_advance < tmp.size()) {
buf.Size(tmp.size());
r = buf.Read(scratch, offset_advance,
std::min(tmp.size() - offset_advance, n));
buf.AllocateNewBuffer(read_size);
while (buf.CurrentSize() < read_size) {
size_t allowed;
if (rate_limiter_ != nullptr) {
allowed = rate_limiter_->RequestToken(
buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
} else {
assert(buf.CurrentSize() == 0);
allowed = read_size;
}
Slice tmp;
s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp,
buf.Destination());
buf.Size(buf.CurrentSize() + tmp.size());
if (!s.ok() || tmp.size() < allowed) {
break;
}
}
*result = Slice(scratch, r);
size_t res_len = 0;
if (s.ok() && offset_advance < buf.CurrentSize()) {
res_len = buf.Read(scratch, offset_advance,
std::min(buf.CurrentSize() - offset_advance, n));
}
*result = Slice(scratch, res_len);
#endif // !ROCKSDB_LITE
} else {
s = file_->Read(offset, n, result, scratch);
size_t pos = 0;
const char* res_scratch = nullptr;
while (pos < n) {
size_t allowed;
if (for_compaction_ && rate_limiter_ != nullptr) {
allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
Env::IOPriority::IO_LOW, stats_,
RateLimiter::OpType::kRead);
} else {
allowed = n;
}
Slice tmp_result;
s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos);
if (res_scratch == nullptr) {
// we can't simply use `scratch` because reads of mmap'd files return
// data in a different buffer.
res_scratch = tmp_result.data();
} else {
// make sure chunks are inserted contiguously into `res_scratch`.
assert(tmp_result.data() == res_scratch + pos);
}
pos += tmp_result.size();
if (!s.ok() || tmp_result.size() < allowed) {
break;
}
}
*result = Slice(res_scratch, s.ok() ? pos : 0);
}
IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
}
@@ -319,25 +360,6 @@ Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
return writable_file_->RangeSync(offset, nbytes);
}
size_t WritableFileWriter::RequestToken(size_t bytes, bool align) {
Env::IOPriority io_priority;
if (rate_limiter_ && (io_priority = writable_file_->GetIOPriority()) <
Env::IO_TOTAL) {
bytes = std::min(
bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
if (align) {
// Here we may actually require more than burst and block
// but we can not write less than one page at a time on direct I/O
// thus we may want not to use ratelimiter
size_t alignment = buf_.Alignment();
bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
}
rate_limiter_->Request(bytes, io_priority, stats_);
}
return bytes;
}
// This method writes to disk the specified data and makes use of the rate
// limiter if available
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
@@ -347,7 +369,14 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
size_t left = size;
while (left > 0) {
size_t allowed = RequestToken(left, false);
size_t allowed;
if (rate_limiter_ != nullptr) {
allowed = rate_limiter_->RequestToken(
left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
RateLimiter::OpType::kWrite);
} else {
allowed = left;
}
{
IOSTATS_TIMER_GUARD(write_nanos);
@@ -403,7 +432,14 @@ Status WritableFileWriter::WriteDirect() {
while (left > 0) {
// Check how much is allowed
size_t size = RequestToken(left, true);
size_t size;
if (rate_limiter_ != nullptr) {
size = rate_limiter_->RequestToken(left, buf_.Alignment(),
writable_file_->GetIOPriority(),
stats_, RateLimiter::OpType::kWrite);
} else {
size = left;
}
{
IOSTATS_TIMER_GUARD(write_nanos);

@@ -13,6 +13,7 @@
#include <string>
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/rate_limiter.h"
#include "util/aligned_buffer.h"
namespace rocksdb {
@@ -53,9 +54,6 @@ class SequentialFileReader {
SequentialFile* file() { return file_.get(); }
bool use_direct_io() const { return file_->use_direct_io(); }
protected:
Status DirectRead(size_t n, Slice* result, char* scratch);
};
class RandomAccessFileReader {
@@ -65,29 +63,38 @@ class RandomAccessFileReader {
Statistics* stats_;
uint32_t hist_type_;
HistogramImpl* file_read_hist_;
RateLimiter* rate_limiter_;
bool for_compaction_;
public:
explicit RandomAccessFileReader(std::unique_ptr<RandomAccessFile>&& raf,
Env* env = nullptr,
Statistics* stats = nullptr,
uint32_t hist_type = 0,
HistogramImpl* file_read_hist = nullptr)
HistogramImpl* file_read_hist = nullptr,
RateLimiter* rate_limiter = nullptr,
bool for_compaction = false)
: file_(std::move(raf)),
env_(env),
stats_(stats),
hist_type_(hist_type),
file_read_hist_(file_read_hist) {}
file_read_hist_(file_read_hist),
rate_limiter_(rate_limiter),
for_compaction_(for_compaction) {}
RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
}
RandomAccessFileReader& operator=(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT{
RandomAccessFileReader& operator=(RandomAccessFileReader&& o)
ROCKSDB_NOEXCEPT {
file_ = std::move(o.file_);
env_ = std::move(o.env_);
stats_ = std::move(o.stats_);
hist_type_ = std::move(o.hist_type_);
file_read_hist_ = std::move(o.file_read_hist_);
rate_limiter_ = std::move(o.rate_limiter_);
for_compaction_ = std::move(o.for_compaction_);
return *this;
}
@@ -103,10 +110,6 @@ class RandomAccessFileReader {
RandomAccessFile* file() { return file_.get(); }
bool use_direct_io() const { return file_->use_direct_io(); }
protected:
Status DirectRead(uint64_t offset, size_t n, Slice* result,
char* scratch) const;
};
// Use posix write to write data to a file.
@@ -187,7 +190,6 @@ class WritableFileWriter {
// Normal write
Status WriteBuffered(const char* data, size_t size);
Status RangeSync(uint64_t offset, uint64_t nbytes);
size_t RequestToken(size_t bytes, bool align);
Status SyncInternal(bool use_fsync);
};

@@ -13,10 +13,27 @@
#include "monitoring/statistics.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "util/aligned_buffer.h"
#include "util/sync_point.h"
namespace rocksdb {
size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
Env::IOPriority io_priority, Statistics* stats,
RateLimiter::OpType op_type) {
if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) {
bytes = std::min(bytes, static_cast<size_t>(GetSingleBurstBytes()));
if (alignment > 0) {
// Here we may actually require more than burst and block
// but we can not write less than one page at a time on direct I/O
// thus we may want not to use ratelimiter
bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
}
Request(bytes, io_priority, stats, op_type);
}
return bytes;
}
// Pending request
struct GenericRateLimiter::Req {
@@ -30,8 +47,9 @@ struct GenericRateLimiter::Req {
GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
int64_t refill_period_us,
int32_t fairness)
: refill_period_us_(refill_period_us),
int32_t fairness, RateLimiter::Mode mode)
: RateLimiter(mode),
refill_period_us_(refill_period_us),
rate_bytes_per_sec_(rate_bytes_per_sec),
refill_bytes_per_period_(
CalculateRefillBytesPerPeriod(rate_bytes_per_sec)),
@@ -241,12 +259,14 @@ int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
}
RateLimiter* NewGenericRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) {
int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
int32_t fairness /* = 10 */,
RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */) {
assert(rate_bytes_per_sec > 0);
assert(refill_period_us > 0);
assert(fairness > 0);
return new GenericRateLimiter(
rate_bytes_per_sec, refill_period_us, fairness);
return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
mode);
}
} // namespace rocksdb
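
Note that RequestToken() may grant less than it was asked for: it clamps the request to GetSingleBurstBytes() and, when a nonzero alignment is given (direct I/O), rounds to page boundaries, so callers must loop until the full size is consumed. A minimal sketch of that caller pattern, mirroring the reader/writer loops above (RateLimitedCopy is hypothetical):

    #include <algorithm>
    #include <cstddef>
    #include "rocksdb/env.h"
    #include "rocksdb/rate_limiter.h"

    // Copies n bytes, charging each chunk to the limiter as a read. With
    // rate_bytes_per_sec = 1 << 20 and refill_period_us = 100 * 1000, a
    // single burst is roughly 100KB, so a 1MB copy takes about ten chunks.
    size_t RateLimitedCopy(rocksdb::RateLimiter* limiter, const char* src,
                           char* dst, size_t n) {
      size_t pos = 0;
      while (pos < n) {
        size_t allowed = limiter->RequestToken(
            n - pos, 0 /* alignment */, rocksdb::Env::IO_LOW,
            nullptr /* stats */, rocksdb::RateLimiter::OpType::kRead);
        std::copy(src + pos, src + pos + allowed, dst + pos);
        pos += allowed;
      }
      return pos;
    }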

@@ -24,8 +24,9 @@ namespace rocksdb {
class GenericRateLimiter : public RateLimiter {
public:
GenericRateLimiter(int64_t refill_bytes,
int64_t refill_period_us, int32_t fairness);
GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us,
int32_t fairness,
RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly);
virtual ~GenericRateLimiter();

@@ -35,6 +35,30 @@ TEST_F(RateLimiterTest, StartStop) {
std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(100, 100, 10));
}
TEST_F(RateLimiterTest, Modes) {
for (auto mode : {RateLimiter::Mode::kWritesOnly,
RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {
GenericRateLimiter limiter(2000 /* rate_bytes_per_sec */,
1000 * 1000 /* refill_period_us */,
10 /* fairness */, mode);
limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kRead);
if (mode == RateLimiter::Mode::kWritesOnly) {
ASSERT_EQ(0, limiter.GetTotalBytesThrough(Env::IO_HIGH));
} else {
ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
}
limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kWrite);
if (mode == RateLimiter::Mode::kAllIo) {
ASSERT_EQ(2000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
} else {
ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
}
}
}
TEST_F(RateLimiterTest, Rate) {
auto* env = Env::Default();
struct Arg {
@@ -57,10 +81,11 @@ TEST_F(RateLimiterTest, Rate) {
while (thread_env->NowMicros() < until) {
for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) {
arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
Env::IO_HIGH, nullptr /* stats */);
Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kWrite);
}
arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW,
nullptr /* stats */);
nullptr /* stats */, RateLimiter::OpType::kWrite);
}
};
@@ -113,7 +138,8 @@ TEST_F(RateLimiterTest, LimitChangeTest) {
auto writer = [](void* p) {
auto* arg = static_cast<Arg*>(p);
arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */);
arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */,
RateLimiter::OpType::kWrite);
};
for (uint32_t i = 1; i <= 16; i <<= 1) {

@@ -1214,7 +1214,8 @@ Status BackupEngineImpl::CopyOrCreateFile(
}
s = dest_writer->Append(data);
if (rate_limiter != nullptr) {
rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */);
rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kWrite);
}
if (processed_buffer_size > options_.callback_trigger_interval_size) {
processed_buffer_size -= options_.callback_trigger_interval_size;
