Statistic for how often rate limiter is drained

Summary:
This is the metric I plan to use for adaptive rate limiting. The statistics are updated only if the rate limiter is drained by flush or compaction. I believe (but am not certain) that this is the normal case.

The Statistics object is passed in RateLimiter::Request() to avoid requiring changes to client code, which would've been necessary if we passed it in the RateLimiter constructor.
Closes https://github.com/facebook/rocksdb/pull/1946

Differential Revision: D4646489

Pulled By: ajkr

fbshipit-source-id: d8e0161
main
Andrew Kryczka 8 years ago committed by Facebook Github Bot
parent 0ad5af42d0
commit 7c80a6d7d1
  1. 3
      db/builder.cc
  2. 4
      db/compaction_job.cc
  3. 18
      db/db_test.cc
  4. 17
      include/rocksdb/rate_limiter.h
  5. 4
      include/rocksdb/statistics.h
  6. 2
      util/file_reader_writer.cc
  7. 7
      util/file_reader_writer.h
  8. 5
      util/rate_limiter.cc
  9. 4
      util/rate_limiter.h
  10. 7
      util/rate_limiter_test.cc
  11. 2
      utilities/backupable/backupable_db.cc

@ -112,7 +112,8 @@ Status BuildTable(
}
file->SetIOPriority(io_priority);
file_writer.reset(new WritableFileWriter(std::move(file), env_options));
file_writer.reset(new WritableFileWriter(std::move(file), env_options,
ioptions.statistics));
builder = NewTableBuilder(
ioptions, internal_comparator, int_tbl_prop_collector_factories,

@ -1222,8 +1222,8 @@ Status CompactionJob::OpenCompactionOutputFile(
writable_file->SetIOPriority(Env::IO_LOW);
writable_file->SetPreallocationBlockSize(static_cast<size_t>(
sub_compact->compaction->OutputFilePreallocationSize()));
sub_compact->outfile.reset(
new WritableFileWriter(std::move(writable_file), env_options_));
sub_compact->outfile.reset(new WritableFileWriter(
std::move(writable_file), env_options_, db_options_.statistics.get()));
// If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where

@ -2781,6 +2781,7 @@ TEST_F(DBTest, RateLimitingTest) {
options.compression = kNoCompression;
options.create_if_missing = true;
options.env = env_;
options.statistics = rocksdb::CreateDBStatistics();
options.IncreaseParallelism(4);
DestroyAndReopen(options);
@ -2797,6 +2798,9 @@ TEST_F(DBTest, RateLimitingTest) {
}
uint64_t elapsed = env_->NowMicros() - start;
double raw_rate = env_->bytes_written_ * 1000000.0 / elapsed;
uint64_t rate_limiter_drains =
TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS);
ASSERT_EQ(0, rate_limiter_drains);
Close();
// # rate limiting with 0.7 x threshold
@ -2812,8 +2816,15 @@ TEST_F(DBTest, RateLimitingTest) {
Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo));
}
elapsed = env_->NowMicros() - start;
rate_limiter_drains =
TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS) -
rate_limiter_drains;
Close();
ASSERT_EQ(options.rate_limiter->GetTotalBytesThrough(), env_->bytes_written_);
// Most intervals should've been drained (interval time is 100ms, elapsed is
// micros)
ASSERT_GT(rate_limiter_drains, elapsed / 100000 / 2);
ASSERT_LE(rate_limiter_drains, elapsed / 100000);
double ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
fprintf(stderr, "write rate ratio = %.2lf, expected 0.7\n", ratio);
ASSERT_TRUE(ratio < 0.8);
@ -2831,8 +2842,15 @@ TEST_F(DBTest, RateLimitingTest) {
Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo));
}
elapsed = env_->NowMicros() - start;
rate_limiter_drains =
TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS) -
rate_limiter_drains;
Close();
ASSERT_EQ(options.rate_limiter->GetTotalBytesThrough(), env_->bytes_written_);
// Most intervals should've been drained (interval time is 100ms, elapsed is
// micros)
ASSERT_GT(rate_limiter_drains, elapsed / 100000 / 2);
ASSERT_LE(rate_limiter_drains, elapsed / 100000);
ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
fprintf(stderr, "write rate ratio = %.2lf, expected 0.5\n", ratio);
ASSERT_LT(ratio, 0.6);

@ -10,6 +10,7 @@
#pragma once
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
namespace rocksdb {
@ -24,7 +25,21 @@ class RateLimiter {
// Request for token to write bytes. If this request can not be satisfied,
// the call is blocked. Caller is responsible to make sure
// bytes <= GetSingleBurstBytes()
virtual void Request(const int64_t bytes, const Env::IOPriority pri) = 0;
virtual void Request(const int64_t bytes, const Env::IOPriority pri) {
// Deprecated. New RateLimiter derived classes should override
// Request(const int64_t, const Env::IOPriority, Statistics*) instead.
assert(false);
}
// Request for token to write bytes and potentially update statistics. If this
// request can not be satisfied, the call is blocked. Caller is responsible to
// make sure bytes <= GetSingleBurstBytes().
virtual void Request(const int64_t bytes, const Env::IOPriority pri,
Statistics* /* stats */) {
// For API compatibility, default implementation calls the older API in
// which statistics are unsupported.
Request(bytes, pri);
}
// Max bytes can be granted in a single burst
virtual int64_t GetSingleBurstBytes() const = 0;

@ -216,6 +216,9 @@ enum Tickers : uint32_t {
READ_AMP_ESTIMATE_USEFUL_BYTES, // Estimate of total bytes actually used.
READ_AMP_TOTAL_READ_BYTES, // Total size of loaded data blocks.
// Number of refill intervals where rate limiter's bytes are fully consumed.
NUMBER_RATE_LIMITER_DRAINS,
TICKER_ENUM_MAX
};
@ -318,6 +321,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{ROW_CACHE_MISS, "rocksdb.row.cache.miss"},
{READ_AMP_ESTIMATE_USEFUL_BYTES, "rocksdb.read.amp.estimate.useful.bytes"},
{READ_AMP_TOTAL_READ_BYTES, "rocksdb.read.amp.total.read.bytes"},
{NUMBER_RATE_LIMITER_DRAINS, "rocksdb.number.rate_limiter.drains"},
};
/**

@ -319,7 +319,7 @@ size_t WritableFileWriter::RequestToken(size_t bytes, bool align) {
size_t alignment = buf_.Alignment();
bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
}
rate_limiter_->Request(bytes, io_priority);
rate_limiter_->Request(bytes, io_priority, stats_);
}
return bytes;
}

@ -119,10 +119,11 @@ class WritableFileWriter {
uint64_t last_sync_size_;
uint64_t bytes_per_sync_;
RateLimiter* rate_limiter_;
Statistics* stats_;
public:
WritableFileWriter(std::unique_ptr<WritableFile>&& file,
const EnvOptions& options)
const EnvOptions& options, Statistics* stats = nullptr)
: writable_file_(std::move(file)),
buf_(),
max_buffer_size_(options.writable_file_max_buffer_size),
@ -132,8 +133,8 @@ class WritableFileWriter {
direct_io_(writable_file_->use_direct_io()),
last_sync_size_(0),
bytes_per_sync_(options.bytes_per_sync),
rate_limiter_(options.rate_limiter) {
rate_limiter_(options.rate_limiter),
stats_(stats) {
buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
}

@ -10,6 +10,7 @@
#include "util/rate_limiter.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "util/statistics.h"
#include "util/sync_point.h"
namespace rocksdb {
@ -70,7 +71,8 @@ void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
std::memory_order_relaxed);
}
void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
Statistics* stats) {
assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));
TEST_SYNC_POINT("GenericRateLimiter::Request");
MutexLock g(&request_mutex_);
@ -113,6 +115,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
timedout = true;
} else {
int64_t wait_until = env_->NowMicros() + delta;
RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
timedout = r.cv.TimedWait(wait_until);
}
} else {

@ -33,7 +33,9 @@ class GenericRateLimiter : public RateLimiter {
// Request for token to write bytes. If this request can not be satisfied,
// the call is blocked. Caller is responsible to make sure
// bytes <= GetSingleBurstBytes()
virtual void Request(const int64_t bytes, const Env::IOPriority pri) override;
using RateLimiter::Request;
virtual void Request(const int64_t bytes, const Env::IOPriority pri,
Statistics* stats) override;
virtual int64_t GetSingleBurstBytes() const override {
return refill_bytes_per_period_.load(std::memory_order_relaxed);

@ -55,9 +55,10 @@ TEST_F(RateLimiterTest, Rate) {
while (thread_env->NowMicros() < until) {
for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) {
arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
Env::IO_HIGH);
Env::IO_HIGH, nullptr /* stats */);
}
arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW);
arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW,
nullptr /* stats */);
}
};
@ -110,7 +111,7 @@ TEST_F(RateLimiterTest, LimitChangeTest) {
auto writer = [](void* p) {
auto* arg = static_cast<Arg*>(p);
arg->limiter->Request(arg->request_size, arg->pri);
arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */);
};
for (uint32_t i = 1; i <= 16; i <<= 1) {

@ -1219,7 +1219,7 @@ Status BackupEngineImpl::CopyOrCreateFile(
}
s = dest_writer->Append(data);
if (rate_limiter != nullptr) {
rate_limiter->Request(data.size(), Env::IO_LOW);
rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */);
}
if (processed_buffer_size > options_.callback_trigger_interval_size) {
processed_buffer_size -= options_.callback_trigger_interval_size;

Loading…
Cancel
Save