From 7c80a6d7d189d4414da7c2126e111ea71cf1504e Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Thu, 2 Mar 2017 17:40:24 -0800 Subject: [PATCH] Statistic for how often rate limiter is drained Summary: This is the metric I plan to use for adaptive rate limiting. The statistics are updated only if the rate limiter is drained by flush or compaction. I believe (but am not certain) that this is the normal case. The Statistics object is passed in RateLimiter::Request() to avoid requiring changes to client code, which would've been necessary if we passed it in the RateLimiter constructor. Closes https://github.com/facebook/rocksdb/pull/1946 Differential Revision: D4646489 Pulled By: ajkr fbshipit-source-id: d8e0161 --- db/builder.cc | 3 ++- db/compaction_job.cc | 4 ++-- db/db_test.cc | 18 ++++++++++++++++++ include/rocksdb/rate_limiter.h | 17 ++++++++++++++++- include/rocksdb/statistics.h | 4 ++++ util/file_reader_writer.cc | 2 +- util/file_reader_writer.h | 7 ++++--- util/rate_limiter.cc | 5 ++++- util/rate_limiter.h | 4 +++- util/rate_limiter_test.cc | 7 ++++--- utilities/backupable/backupable_db.cc | 2 +- 11 files changed, 59 insertions(+), 14 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 2fbc93862..5c7557720 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -112,7 +112,8 @@ Status BuildTable( } file->SetIOPriority(io_priority); - file_writer.reset(new WritableFileWriter(std::move(file), env_options)); + file_writer.reset(new WritableFileWriter(std::move(file), env_options, + ioptions.statistics)); builder = NewTableBuilder( ioptions, internal_comparator, int_tbl_prop_collector_factories, diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 8fc7e542c..986653ded 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1222,8 +1222,8 @@ Status CompactionJob::OpenCompactionOutputFile( writable_file->SetIOPriority(Env::IO_LOW); writable_file->SetPreallocationBlockSize(static_cast( sub_compact->compaction->OutputFilePreallocationSize())); - sub_compact->outfile.reset( - new WritableFileWriter(std::move(writable_file), env_options_)); + sub_compact->outfile.reset(new WritableFileWriter( + std::move(writable_file), env_options_, db_options_.statistics.get())); // If the Column family flag is to only optimize filters for hits, // we can skip creating filters if this is the bottommost_level where diff --git a/db/db_test.cc b/db/db_test.cc index 3b3870152..ee280223c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2781,6 +2781,7 @@ TEST_F(DBTest, RateLimitingTest) { options.compression = kNoCompression; options.create_if_missing = true; options.env = env_; + options.statistics = rocksdb::CreateDBStatistics(); options.IncreaseParallelism(4); DestroyAndReopen(options); @@ -2797,6 +2798,9 @@ TEST_F(DBTest, RateLimitingTest) { } uint64_t elapsed = env_->NowMicros() - start; double raw_rate = env_->bytes_written_ * 1000000.0 / elapsed; + uint64_t rate_limiter_drains = + TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS); + ASSERT_EQ(0, rate_limiter_drains); Close(); // # rate limiting with 0.7 x threshold @@ -2812,8 +2816,15 @@ TEST_F(DBTest, RateLimitingTest) { Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo)); } elapsed = env_->NowMicros() - start; + rate_limiter_drains = + TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS) - + rate_limiter_drains; Close(); ASSERT_EQ(options.rate_limiter->GetTotalBytesThrough(), env_->bytes_written_); + // Most intervals should've been drained (interval time is 100ms, elapsed is + // micros) + ASSERT_GT(rate_limiter_drains, elapsed / 100000 / 2); + ASSERT_LE(rate_limiter_drains, elapsed / 100000); double ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate; fprintf(stderr, "write rate ratio = %.2lf, expected 0.7\n", ratio); ASSERT_TRUE(ratio < 0.8); @@ -2831,8 +2842,15 @@ TEST_F(DBTest, RateLimitingTest) { Put(RandomString(&rnd, 32), RandomString(&rnd, (1 << 10) + 1), wo)); } elapsed = env_->NowMicros() - start; + rate_limiter_drains = + TestGetTickerCount(options, NUMBER_RATE_LIMITER_DRAINS) - + rate_limiter_drains; Close(); ASSERT_EQ(options.rate_limiter->GetTotalBytesThrough(), env_->bytes_written_); + // Most intervals should've been drained (interval time is 100ms, elapsed is + // micros) + ASSERT_GT(rate_limiter_drains, elapsed / 100000 / 2); + ASSERT_LE(rate_limiter_drains, elapsed / 100000); ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate; fprintf(stderr, "write rate ratio = %.2lf, expected 0.5\n", ratio); ASSERT_LT(ratio, 0.6); diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index b1bf3f427..432353210 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -10,6 +10,7 @@ #pragma once #include "rocksdb/env.h" +#include "rocksdb/statistics.h" namespace rocksdb { @@ -24,7 +25,21 @@ class RateLimiter { // Request for token to write bytes. If this request can not be satisfied, // the call is blocked. Caller is responsible to make sure // bytes <= GetSingleBurstBytes() - virtual void Request(const int64_t bytes, const Env::IOPriority pri) = 0; + virtual void Request(const int64_t bytes, const Env::IOPriority pri) { + // Deprecated. New RateLimiter derived classes should override + // Request(const int64_t, const Env::IOPriority, Statistics*) instead. + assert(false); + } + + // Request for token to write bytes and potentially update statistics. If this + // request can not be satisfied, the call is blocked. Caller is responsible to + // make sure bytes <= GetSingleBurstBytes(). + virtual void Request(const int64_t bytes, const Env::IOPriority pri, + Statistics* /* stats */) { + // For API compatibility, default implementation calls the older API in + // which statistics are unsupported. + Request(bytes, pri); + } // Max bytes can be granted in a single burst virtual int64_t GetSingleBurstBytes() const = 0; diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 0271dd1a4..a77b9817d 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -216,6 +216,9 @@ enum Tickers : uint32_t { READ_AMP_ESTIMATE_USEFUL_BYTES, // Estimate of total bytes actually used. READ_AMP_TOTAL_READ_BYTES, // Total size of loaded data blocks. + // Number of refill intervals where rate limiter's bytes are fully consumed. + NUMBER_RATE_LIMITER_DRAINS, + TICKER_ENUM_MAX }; @@ -318,6 +321,7 @@ const std::vector> TickersNameMap = { {ROW_CACHE_MISS, "rocksdb.row.cache.miss"}, {READ_AMP_ESTIMATE_USEFUL_BYTES, "rocksdb.read.amp.estimate.useful.bytes"}, {READ_AMP_TOTAL_READ_BYTES, "rocksdb.read.amp.total.read.bytes"}, + {NUMBER_RATE_LIMITER_DRAINS, "rocksdb.number.rate_limiter.drains"}, }; /** diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index edee46937..bcfe5cf22 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -319,7 +319,7 @@ size_t WritableFileWriter::RequestToken(size_t bytes, bool align) { size_t alignment = buf_.Alignment(); bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); } - rate_limiter_->Request(bytes, io_priority); + rate_limiter_->Request(bytes, io_priority, stats_); } return bytes; } diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 4d2b65dac..37f9813dd 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -119,10 +119,11 @@ class WritableFileWriter { uint64_t last_sync_size_; uint64_t bytes_per_sync_; RateLimiter* rate_limiter_; + Statistics* stats_; public: WritableFileWriter(std::unique_ptr&& file, - const EnvOptions& options) + const EnvOptions& options, Statistics* stats = nullptr) : writable_file_(std::move(file)), buf_(), max_buffer_size_(options.writable_file_max_buffer_size), @@ -132,8 +133,8 @@ class WritableFileWriter { direct_io_(writable_file_->use_direct_io()), last_sync_size_(0), bytes_per_sync_(options.bytes_per_sync), - rate_limiter_(options.rate_limiter) { - + rate_limiter_(options.rate_limiter), + stats_(stats) { buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); } diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index f06c550cd..d3b512e42 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -10,6 +10,7 @@ #include "util/rate_limiter.h" #include "port/port.h" #include "rocksdb/env.h" +#include "util/statistics.h" #include "util/sync_point.h" namespace rocksdb { @@ -70,7 +71,8 @@ void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { std::memory_order_relaxed); } -void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { +void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, + Statistics* stats) { assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed)); TEST_SYNC_POINT("GenericRateLimiter::Request"); MutexLock g(&request_mutex_); @@ -113,6 +115,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { timedout = true; } else { int64_t wait_until = env_->NowMicros() + delta; + RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); timedout = r.cv.TimedWait(wait_until); } } else { diff --git a/util/rate_limiter.h b/util/rate_limiter.h index 896dc595d..0ebeb9eac 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -33,7 +33,9 @@ class GenericRateLimiter : public RateLimiter { // Request for token to write bytes. If this request can not be satisfied, // the call is blocked. Caller is responsible to make sure // bytes <= GetSingleBurstBytes() - virtual void Request(const int64_t bytes, const Env::IOPriority pri) override; + using RateLimiter::Request; + virtual void Request(const int64_t bytes, const Env::IOPriority pri, + Statistics* stats) override; virtual int64_t GetSingleBurstBytes() const override { return refill_bytes_per_period_.load(std::memory_order_relaxed); diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index 0829f67e2..61d7c423b 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -55,9 +55,10 @@ TEST_F(RateLimiterTest, Rate) { while (thread_env->NowMicros() < until) { for (int i = 0; i < static_cast(r.Skewed(arg->burst) + 1); ++i) { arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, - Env::IO_HIGH); + Env::IO_HIGH, nullptr /* stats */); } - arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW); + arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW, + nullptr /* stats */); } }; @@ -110,7 +111,7 @@ TEST_F(RateLimiterTest, LimitChangeTest) { auto writer = [](void* p) { auto* arg = static_cast(p); - arg->limiter->Request(arg->request_size, arg->pri); + arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */); }; for (uint32_t i = 1; i <= 16; i <<= 1) { diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index ed5601d55..da55d04bf 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -1219,7 +1219,7 @@ Status BackupEngineImpl::CopyOrCreateFile( } s = dest_writer->Append(data); if (rate_limiter != nullptr) { - rate_limiter->Request(data.size(), Env::IO_LOW); + rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */); } if (processed_buffer_size > options_.callback_trigger_interval_size) { processed_buffer_size -= options_.callback_trigger_interval_size;