Add soft and hard rate limit support

Summary:
This diff adds support for both soft and hard rate limiting. The following changes are included:

1) Options.rate_limit is renamed to Options.hard_rate_limit.
2) Options.rate_limit_delay_milliseconds is renamed to Options.rate_limit_delay_max_milliseconds.
3) Options.soft_rate_limit is added.
4) If the maximum compaction score is > hard_rate_limit and rate_limit_delay_max_milliseconds == 0, then writes are delayed by 1 ms at a time until the max compaction score falls below hard_rate_limit.
5) If the max compaction score is > soft_rate_limit but <= hard_rate_limit, then writes are delayed by 0-1 ms depending on how close we are to hard_rate_limit.
6) Users can disable the hard limit in 4) by setting hard_rate_limit = 0. They can cap the total time a write is stalled by setting rate_limit_delay_max_milliseconds > 0. The old behavior is preserved by leaving soft_rate_limit = 0, which is the default (see the configuration sketch below).
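
For reference, a minimal sketch of how the renamed and new options might be configured after this change. The option names come from this diff; the database path and the specific limit values are purely illustrative.

    #include "leveldb/db.h"
    #include "leveldb/options.h"

    int main() {
      leveldb::Options options;
      options.create_if_missing = true;

      // Delay writes by 0-1 ms once any level's compaction score exceeds 1.5,
      // and by 1 ms at a time once it exceeds 2.5.
      options.soft_rate_limit = 1.5;                    // default 0 (disabled)
      options.hard_rate_limit = 2.5;                    // default 0; ignored when <= 1.0
      // Cap the total hard-limit stall per write at 100 ms (0 means no cap).
      options.rate_limit_delay_max_milliseconds = 100;  // default 1000

      // If soft_rate_limit > hard_rate_limit, SanitizeOptions() clamps
      // soft_rate_limit down to hard_rate_limit (see db/db_impl.cc below).

      leveldb::DB* db = nullptr;
      leveldb::Status s = leveldb::DB::Open(options, "/tmp/rate_limit_example", &db);
      if (!s.ok()) {
        return 1;
      }
      delete db;
      return 0;
    }

With these settings, writes start seeing sub-millisecond delays once the maximum compaction score passes 1.5, and 1 ms delays (up to 100 ms in total per write) once it passes 2.5.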

Test Plan:
make -j32 check
./db_stress

Reviewers: dhruba, haobo, MarkCallaghan

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D12003
Branch: main
Author: Jim Paton (11 years ago)
Commit: 1036537c94 (parent: cacd812fb2)
Changed files:

  1. db/db_bench.cc (31 lines changed)
  2. db/db_impl.cc (61 lines changed)
  3. db/db_impl.h (2 lines changed)
  4. db/db_test.cc (4 lines changed)
  5. include/leveldb/options.h (20 lines changed)
  6. util/options.cc (13 lines changed)

db/db_bench.cc
@@ -264,13 +264,16 @@ static int FLAGS_stats_interval = 0;
 // than 0.
 static int FLAGS_stats_per_interval = 0;
+static double FLAGS_soft_rate_limit = 0;
 // When not equal to 0 this make threads sleep at each stats
 // reporting interval until the compaction score for all levels is
 // less than or equal to this value.
-static double FLAGS_rate_limit = 0;
-// When FLAGS_rate_limit is set then this is the max time a put will be stalled.
-static int FLAGS_rate_limit_delay_milliseconds = 1000;
+static double FLAGS_hard_rate_limit = 0;
+// When FLAGS_hard_rate_limit is set then this is the max time a put will be
+// stalled.
+static int FLAGS_rate_limit_delay_max_milliseconds = 1000;
 // Control maximum bytes of overlaps in grandparent (i.e., level+2) before we
 // stop building a single file in a level->level+1 compaction.
@@ -1146,8 +1149,10 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
     options.disable_seek_compaction = FLAGS_disable_seek_compaction;
     options.delete_obsolete_files_period_micros =
       FLAGS_delete_obsolete_files_period_micros;
-    options.rate_limit = FLAGS_rate_limit;
-    options.rate_limit_delay_milliseconds = FLAGS_rate_limit_delay_milliseconds;
+    options.soft_rate_limit = FLAGS_soft_rate_limit;
+    options.hard_rate_limit = FLAGS_hard_rate_limit;
+    options.rate_limit_delay_max_milliseconds =
+      FLAGS_rate_limit_delay_max_milliseconds;
     options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
     options.max_grandparent_overlap_factor =
       FLAGS_max_grandparent_overlap_factor;
@@ -2039,7 +2044,8 @@ int main(int argc, char** argv) {
     } else if (sscanf(argv[i], "--min_write_buffer_number_to_merge=%d%c",
                &n, &junk) == 1) {
       FLAGS_min_write_buffer_number_to_merge = n;
-    } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) {
+    } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk)
+               == 1) {
       FLAGS_max_background_compactions = n;
     } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) {
       FLAGS_cache_size = l;
@@ -2173,13 +2179,16 @@ int main(int argc, char** argv) {
     } else if (sscanf(argv[i], "--stats_per_interval=%d%c", &n, &junk) == 1
                && (n == 0 || n == 1)) {
       FLAGS_stats_per_interval = n;
-    } else if (sscanf(argv[i], "--rate_limit=%lf%c", &d, &junk) == 1 &&
+    } else if (sscanf(argv[i], "--soft_rate_limit=%lf%c", &d, &junk) == 1 &&
+               d > 0.0) {
+      FLAGS_soft_rate_limit = d;
+    } else if (sscanf(argv[i], "--hard_rate_limit=%lf%c", &d, &junk) == 1 &&
                d > 1.0) {
-      FLAGS_rate_limit = d;
+      FLAGS_hard_rate_limit = d;
     } else if (sscanf(argv[i],
-               "--rate_limit_delay_milliseconds=%d%c", &n, &junk) == 1
-               && n > 0) {
-      FLAGS_rate_limit_delay_milliseconds = n;
+               "--rate_limit_delay_max_milliseconds=%d%c", &n, &junk) == 1
+               && n >= 0) {
+      FLAGS_rate_limit_delay_max_milliseconds = n;
     } else if (sscanf(argv[i], "--readonly=%d%c", &n, &junk) == 1 &&
                (n == 0 || n ==1 )) {
       FLAGS_read_only = n;

db/db_impl.cc
@@ -154,6 +154,9 @@ Options SanitizeOptions(const std::string& dbname,
   if (result.max_mem_compaction_level >= result.num_levels) {
     result.max_mem_compaction_level = result.num_levels - 1;
   }
+  if (result.soft_rate_limit > result.hard_rate_limit) {
+    result.soft_rate_limit = result.hard_rate_limit;
+  }
   return result;
 }
@@ -2417,31 +2420,29 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
 // This function computes the amount of time in microseconds by which a write
 // should be delayed based on the number of level-0 files according to the
 // following formula:
-//   if num_level_files < level0_slowdown_writes_trigger, return 0;
-//   if num_level_files >= level0_stop_writes_trigger, return 1000;
-//   otherwise, let r = (num_level_files - level0_slowdown) /
-//                      (level0_stop - level0_slowdown)
+//   if n < bottom, return 0;
+//   if n >= top, return 1000;
+//   otherwise, let r = (n - bottom) /
+//                      (top - bottom)
 //   and return r^2 * 1000.
 // The goal of this formula is to gradually increase the rate at which writes
 // are slowed. We also tried linear delay (r * 1000), but it seemed to do
 // slightly worse. There is no other particular reason for choosing quadratic.
-uint64_t DBImpl::SlowdownAmount(int num_level0_files) {
+uint64_t DBImpl::SlowdownAmount(int n, int top, int bottom) {
   uint64_t delay;
-  int stop_trigger = options_.level0_stop_writes_trigger;
-  int slowdown_trigger = options_.level0_slowdown_writes_trigger;
-  if (num_level0_files >= stop_trigger) {
+  if (n >= top) {
     delay = 1000;
   }
-  else if (num_level0_files < slowdown_trigger) {
+  else if (n < bottom) {
     delay = 0;
   }
   else {
     // If we are here, we know that:
-    //   slowdown_trigger <= num_level0_files < stop_trigger
+    //   level0_start_slowdown <= n < level0_slowdown
     // since the previous two conditions are false.
     float how_much =
       (float) (n - bottom) /
              (top - bottom);
     delay = how_much * how_much * 1000;
   }
   assert(delay <= 1000);
@@ -2454,7 +2455,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
   mutex_.AssertHeld();
   assert(!writers_.empty());
   bool allow_delay = !force;
-  bool allow_rate_limit_delay = !force;
+  bool allow_hard_rate_limit_delay = !force;
+  bool allow_soft_rate_limit_delay = !force;
   uint64_t rate_limit_delay_millis = 0;
   Status s;
   double score;
@@ -2478,7 +2480,11 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       uint64_t delayed;
       {
        StopWatch sw(env_, options_.statistics, STALL_L0_SLOWDOWN_COUNT);
-        env_->SleepForMicroseconds(SlowdownAmount(versions_->NumLevelFiles(0)));
+        env_->SleepForMicroseconds(
+          SlowdownAmount(versions_->NumLevelFiles(0),
+                         options_.level0_slowdown_writes_trigger,
+                         options_.level0_stop_writes_trigger)
+        );
        delayed = sw.ElapsedMicros();
      }
      RecordTick(options_.statistics, STALL_L0_SLOWDOWN_MICROS, delayed);
@@ -2527,9 +2533,9 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       stall_level0_num_files_ += stall;
       stall_level0_num_files_count_++;
     } else if (
-        allow_rate_limit_delay &&
-        options_.rate_limit > 1.0 &&
-        (score = versions_->MaxCompactionScore()) > options_.rate_limit) {
+        allow_hard_rate_limit_delay &&
+        options_.hard_rate_limit > 1.0 &&
+        (score = versions_->MaxCompactionScore()) > options_.hard_rate_limit) {
       // Delay a write when the compaction score for any level is too large.
       int max_level = versions_->MaxCompactionScoreLevel();
       mutex_.Unlock();
@@ -2545,14 +2551,29 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1);
       rate_limit_delay_millis += rate_limit;
       RecordTick(options_.statistics, RATE_LIMIT_DELAY_MILLIS, rate_limit);
-      if (rate_limit_delay_millis >=
-          (unsigned)options_.rate_limit_delay_milliseconds) {
-        allow_rate_limit_delay = false;
+      if (options_.rate_limit_delay_max_milliseconds > 0 &&
+          rate_limit_delay_millis >=
+          (unsigned)options_.rate_limit_delay_max_milliseconds) {
+        allow_hard_rate_limit_delay = false;
       }
       // Log(options_.info_log,
       //     "delaying write %llu usecs for rate limits with max score %.2f\n",
       //     (long long unsigned int)delayed, score);
       mutex_.Lock();
+    } else if (
+        allow_soft_rate_limit_delay &&
+        options_.soft_rate_limit > 0.0 &&
+        (score = versions_->MaxCompactionScore()) > options_.soft_rate_limit) {
+      // Delay a write when the compaction score for any level is too large.
+      // TODO: add statistics
+      mutex_.Unlock();
+      env_->SleepForMicroseconds(SlowdownAmount(
+        score,
+        options_.soft_rate_limit,
+        options_.hard_rate_limit)
+      );
+      allow_soft_rate_limit_delay = false;
+      mutex_.Lock();
     } else {
       // Attempt to switch to a new memtable and trigger compaction of old
       DelayLoggingAndReset();
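
To make the quadratic delay formula described in the SlowdownAmount() comment above concrete, here is a small standalone sketch with a worked example. The helper below is an illustrative copy of the formula as stated (no delay below bottom, the full 1000 microseconds at or above top, quadratic growth in between), not the DBImpl member itself, and the trigger values are made up for the example.

    #include <cassert>
    #include <cstdint>
    #include <cstdio>

    // Illustrative copy of the formula: r = (n - bottom) / (top - bottom),
    // clamped so the result is 0 below bottom and 1000 at or above top.
    static uint64_t SlowdownAmountSketch(double n, double bottom, double top) {
      uint64_t delay;
      if (n >= top) {
        delay = 1000;
      } else if (n < bottom) {
        delay = 0;
      } else {
        float how_much = (float)(n - bottom) / (top - bottom);
        delay = how_much * how_much * 1000;
      }
      assert(delay <= 1000);
      return delay;
    }

    int main() {
      // Soft-limit case: score 1.5 with soft_rate_limit 1.0 and hard_rate_limit
      // 2.0 gives r = (1.5 - 1.0) / (2.0 - 1.0) = 0.5, so 0.5^2 * 1000 = 250 us.
      std::printf("%llu\n",
                  (unsigned long long)SlowdownAmountSketch(1.5, 1.0, 2.0));

      // Level-0 case: 10 files with slowdown trigger 8 and stop trigger 12 also
      // gives r = 0.5, i.e. a 250 us delay.
      std::printf("%llu\n",
                  (unsigned long long)SlowdownAmountSketch(10, 8, 12));
      return 0;
    }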

db/db_impl.h
@@ -166,7 +166,7 @@ class DBImpl : public DB {
   Status WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
                           uint64_t* filenumber);
-  uint64_t SlowdownAmount(int num_level0_files);
+  uint64_t SlowdownAmount(int n, int top, int bottom);
   Status MakeRoomForWrite(bool force /* compact even if there is room? */);
   WriteBatch* BuildBatchGroup(Writer** last_writer);

db/db_test.cc
@@ -286,8 +286,8 @@ class DBTest {
         options.purge_redundant_kvs_while_flush = !options.purge_redundant_kvs_while_flush;
         break;
       case kPerfOptions:
-        options.rate_limit = 2.0;
-        options.rate_limit_delay_milliseconds = 2;
+        options.hard_rate_limit = 2.0;
+        options.rate_limit_delay_max_milliseconds = 2;
         // TODO -- test more options
         break;
       case kDeletesFilterFirst:

include/leveldb/options.h
@@ -355,12 +355,22 @@ struct Options {
   // Default: 1000
   size_t keep_log_file_num;
-  // Puts are delayed when any level has a compaction score that
-  // exceeds rate_limit. This is ignored when <= 1.0.
-  double rate_limit;
+  // Puts are delayed 0-1 ms when any level has a compaction score that exceeds
+  // soft_rate_limit. This is ignored when == 0.0.
+  // CONSTRAINT: soft_rate_limit <= hard_rate_limit. If this constraint does not
+  // hold, RocksDB will set soft_rate_limit = hard_rate_limit
+  // Default: 0 (disabled)
+  double soft_rate_limit;
+  // Puts are delayed 1ms at a time when any level has a compaction score that
+  // exceeds hard_rate_limit. This is ignored when <= 1.0.
+  // Default: 0 (disabled)
+  double hard_rate_limit;
-  // Max time a put will be stalled when rate_limit is enforced
-  unsigned int rate_limit_delay_milliseconds;
+  // Max time a put will be stalled when hard_rate_limit is enforced. If 0, then
+  // there is no limit.
+  // Default: 1000
+  unsigned int rate_limit_delay_max_milliseconds;
   // manifest file is rolled over on reaching this limit.
   // The older manifest file be deleted.

util/options.cc
@@ -56,8 +56,9 @@ Options::Options()
       max_log_file_size(0),
       log_file_time_to_roll(0),
       keep_log_file_num(1000),
-      rate_limit(0.0),
-      rate_limit_delay_milliseconds(1000),
+      soft_rate_limit(0.0),
+      hard_rate_limit(0.0),
+      rate_limit_delay_max_milliseconds(1000),
       max_manifest_file_size(std::numeric_limits<uint64_t>::max()),
       no_block_cache(false),
       table_cache_numshardbits(4),
@@ -181,10 +182,10 @@ Options::Dump(Logger* log) const
         delete_obsolete_files_period_micros);
     Log(log," Options.max_background_compactions: %d",
         max_background_compactions);
-    Log(log," Options.rate_limit: %.2f",
-        rate_limit);
-    Log(log," Options.rate_limit_delay_milliseconds: %d",
-        rate_limit_delay_milliseconds);
+    Log(log," Options.hard_rate_limit: %.2f",
+        hard_rate_limit);
+    Log(log," Options.rate_limit_delay_max_milliseconds: %d",
+        rate_limit_delay_max_milliseconds);
     Log(log," Options.disable_auto_compactions: %d",
         disable_auto_compactions);
     Log(log," Options.WAL_ttl_seconds: %ld",
