diff --git a/db/db_bench.cc b/db/db_bench.cc index a5c95732e..5f5eb506b 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -264,13 +264,16 @@ static int FLAGS_stats_interval = 0; // than 0. static int FLAGS_stats_per_interval = 0; +static double FLAGS_soft_rate_limit = 0; + // When not equal to 0 this make threads sleep at each stats // reporting interval until the compaction score for all levels is // less than or equal to this value. -static double FLAGS_rate_limit = 0; +static double FLAGS_hard_rate_limit = 0; -// When FLAGS_rate_limit is set then this is the max time a put will be stalled. -static int FLAGS_rate_limit_delay_milliseconds = 1000; +// When FLAGS_hard_rate_limit is set then this is the max time a put will be +// stalled. +static int FLAGS_rate_limit_delay_max_milliseconds = 1000; // Control maximum bytes of overlaps in grandparent (i.e., level+2) before we // stop building a single file in a level->level+1 compaction. @@ -1146,8 +1149,10 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") options.disable_seek_compaction = FLAGS_disable_seek_compaction; options.delete_obsolete_files_period_micros = FLAGS_delete_obsolete_files_period_micros; - options.rate_limit = FLAGS_rate_limit; - options.rate_limit_delay_milliseconds = FLAGS_rate_limit_delay_milliseconds; + options.soft_rate_limit = FLAGS_soft_rate_limit; + options.hard_rate_limit = FLAGS_hard_rate_limit; + options.rate_limit_delay_max_milliseconds = + FLAGS_rate_limit_delay_max_milliseconds; options.table_cache_numshardbits = FLAGS_table_cache_numshardbits; options.max_grandparent_overlap_factor = FLAGS_max_grandparent_overlap_factor; @@ -2039,7 +2044,8 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--min_write_buffer_number_to_merge=%d%c", &n, &junk) == 1) { FLAGS_min_write_buffer_number_to_merge = n; - } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { + } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) + == 1) { FLAGS_max_background_compactions = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { FLAGS_cache_size = l; @@ -2173,13 +2179,16 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--stats_per_interval=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { FLAGS_stats_per_interval = n; - } else if (sscanf(argv[i], "--rate_limit=%lf%c", &d, &junk) == 1 && + } else if (sscanf(argv[i], "--soft_rate_limit=%lf%c", &d, &junk) == 1 && + d > 0.0) { + FLAGS_soft_rate_limit = d; + } else if (sscanf(argv[i], "--hard_rate_limit=%lf%c", &d, &junk) == 1 && d > 1.0) { - FLAGS_rate_limit = d; + FLAGS_hard_rate_limit = d; } else if (sscanf(argv[i], - "--rate_limit_delay_milliseconds=%d%c", &n, &junk) == 1 - && n > 0) { - FLAGS_rate_limit_delay_milliseconds = n; + "--rate_limit_delay_max_milliseconds=%d%c", &n, &junk) == 1 + && n >= 0) { + FLAGS_rate_limit_delay_max_milliseconds = n; } else if (sscanf(argv[i], "--readonly=%d%c", &n, &junk) == 1 && (n == 0 || n ==1 )) { FLAGS_read_only = n; diff --git a/db/db_impl.cc b/db/db_impl.cc index 8be77d57e..af92d624f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -154,6 +154,9 @@ Options SanitizeOptions(const std::string& dbname, if (result.max_mem_compaction_level >= result.num_levels) { result.max_mem_compaction_level = result.num_levels - 1; } + if (result.soft_rate_limit > result.hard_rate_limit) { + result.soft_rate_limit = result.hard_rate_limit; + } return result; } @@ -2417,31 +2420,29 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { // This function computes the amount of time in microseconds by which a write // should be delayed based on the number of level-0 files according to the // following formula: -// if num_level_files < level0_slowdown_writes_trigger, return 0; -// if num_level_files >= level0_stop_writes_trigger, return 1000; -// otherwise, let r = (num_level_files - level0_slowdown) / -// (level0_stop - level0_slowdown) +// if n < bottom, return 0; +// if n >= top, return 1000; +// otherwise, let r = (n - bottom) / +// (top - bottom) // and return r^2 * 1000. // The goal of this formula is to gradually increase the rate at which writes // are slowed. We also tried linear delay (r * 1000), but it seemed to do // slightly worse. There is no other particular reason for choosing quadratic. -uint64_t DBImpl::SlowdownAmount(int num_level0_files) { +uint64_t DBImpl::SlowdownAmount(int n, int top, int bottom) { uint64_t delay; - int stop_trigger = options_.level0_stop_writes_trigger; - int slowdown_trigger = options_.level0_slowdown_writes_trigger; - if (num_level0_files >= stop_trigger) { + if (n >= top) { delay = 1000; } - else if (num_level0_files < slowdown_trigger) { + else if (n < bottom) { delay = 0; } else { // If we are here, we know that: - // slowdown_trigger <= num_level0_files < stop_trigger + // level0_start_slowdown <= n < level0_slowdown // since the previous two conditions are false. float how_much = - (float) (num_level0_files - slowdown_trigger) / - (stop_trigger - slowdown_trigger); + (float) (n - bottom) / + (top - bottom); delay = how_much * how_much * 1000; } assert(delay <= 1000); @@ -2454,7 +2455,8 @@ Status DBImpl::MakeRoomForWrite(bool force) { mutex_.AssertHeld(); assert(!writers_.empty()); bool allow_delay = !force; - bool allow_rate_limit_delay = !force; + bool allow_hard_rate_limit_delay = !force; + bool allow_soft_rate_limit_delay = !force; uint64_t rate_limit_delay_millis = 0; Status s; double score; @@ -2478,7 +2480,11 @@ Status DBImpl::MakeRoomForWrite(bool force) { uint64_t delayed; { StopWatch sw(env_, options_.statistics, STALL_L0_SLOWDOWN_COUNT); - env_->SleepForMicroseconds(SlowdownAmount(versions_->NumLevelFiles(0))); + env_->SleepForMicroseconds( + SlowdownAmount(versions_->NumLevelFiles(0), + options_.level0_slowdown_writes_trigger, + options_.level0_stop_writes_trigger) + ); delayed = sw.ElapsedMicros(); } RecordTick(options_.statistics, STALL_L0_SLOWDOWN_MICROS, delayed); @@ -2527,9 +2533,9 @@ Status DBImpl::MakeRoomForWrite(bool force) { stall_level0_num_files_ += stall; stall_level0_num_files_count_++; } else if ( - allow_rate_limit_delay && - options_.rate_limit > 1.0 && - (score = versions_->MaxCompactionScore()) > options_.rate_limit) { + allow_hard_rate_limit_delay && + options_.hard_rate_limit > 1.0 && + (score = versions_->MaxCompactionScore()) > options_.hard_rate_limit) { // Delay a write when the compaction score for any level is too large. int max_level = versions_->MaxCompactionScoreLevel(); mutex_.Unlock(); @@ -2545,14 +2551,29 @@ Status DBImpl::MakeRoomForWrite(bool force) { uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1); rate_limit_delay_millis += rate_limit; RecordTick(options_.statistics, RATE_LIMIT_DELAY_MILLIS, rate_limit); - if (rate_limit_delay_millis >= - (unsigned)options_.rate_limit_delay_milliseconds) { - allow_rate_limit_delay = false; + if (options_.rate_limit_delay_max_milliseconds > 0 && + rate_limit_delay_millis >= + (unsigned)options_.rate_limit_delay_max_milliseconds) { + allow_hard_rate_limit_delay = false; } // Log(options_.info_log, // "delaying write %llu usecs for rate limits with max score %.2f\n", // (long long unsigned int)delayed, score); mutex_.Lock(); + } else if ( + allow_soft_rate_limit_delay && + options_.soft_rate_limit > 0.0 && + (score = versions_->MaxCompactionScore()) > options_.soft_rate_limit) { + // Delay a write when the compaction score for any level is too large. + // TODO: add statistics + mutex_.Unlock(); + env_->SleepForMicroseconds(SlowdownAmount( + score, + options_.soft_rate_limit, + options_.hard_rate_limit) + ); + allow_soft_rate_limit_delay = false; + mutex_.Lock(); } else { // Attempt to switch to a new memtable and trigger compaction of old DelayLoggingAndReset(); diff --git a/db/db_impl.h b/db/db_impl.h index 333a86867..c7f90b3cc 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -166,7 +166,7 @@ class DBImpl : public DB { Status WriteLevel0Table(std::vector &mems, VersionEdit* edit, uint64_t* filenumber); - uint64_t SlowdownAmount(int num_level0_files); + uint64_t SlowdownAmount(int n, int top, int bottom); Status MakeRoomForWrite(bool force /* compact even if there is room? */); WriteBatch* BuildBatchGroup(Writer** last_writer); diff --git a/db/db_test.cc b/db/db_test.cc index 9a7bb2eb0..61fdae82a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -286,8 +286,8 @@ class DBTest { options.purge_redundant_kvs_while_flush = !options.purge_redundant_kvs_while_flush; break; case kPerfOptions: - options.rate_limit = 2.0; - options.rate_limit_delay_milliseconds = 2; + options.hard_rate_limit = 2.0; + options.rate_limit_delay_max_milliseconds = 2; // TODO -- test more options break; case kDeletesFilterFirst: diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 1bf0f6dda..1bf63c33f 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -355,12 +355,22 @@ struct Options { // Default: 1000 size_t keep_log_file_num; - // Puts are delayed when any level has a compaction score that - // exceeds rate_limit. This is ignored when <= 1.0. - double rate_limit; + // Puts are delayed 0-1 ms when any level has a compaction score that exceeds + // soft_rate_limit. This is ignored when == 0.0. + // CONSTRAINT: soft_rate_limit <= hard_rate_limit. If this constraint does not + // hold, RocksDB will set soft_rate_limit = hard_rate_limit + // Default: 0 (disabled) + double soft_rate_limit; + + // Puts are delayed 1ms at a time when any level has a compaction score that + // exceeds hard_rate_limit. This is ignored when <= 1.0. + // Default: 0 (disabled) + double hard_rate_limit; - // Max time a put will be stalled when rate_limit is enforced - unsigned int rate_limit_delay_milliseconds; + // Max time a put will be stalled when hard_rate_limit is enforced. If 0, then + // there is no limit. + // Default: 1000 + unsigned int rate_limit_delay_max_milliseconds; // manifest file is rolled over on reaching this limit. // The older manifest file be deleted. diff --git a/util/options.cc b/util/options.cc index 99cd1b094..5f41a1c15 100644 --- a/util/options.cc +++ b/util/options.cc @@ -56,8 +56,9 @@ Options::Options() max_log_file_size(0), log_file_time_to_roll(0), keep_log_file_num(1000), - rate_limit(0.0), - rate_limit_delay_milliseconds(1000), + soft_rate_limit(0.0), + hard_rate_limit(0.0), + rate_limit_delay_max_milliseconds(1000), max_manifest_file_size(std::numeric_limits::max()), no_block_cache(false), table_cache_numshardbits(4), @@ -181,10 +182,10 @@ Options::Dump(Logger* log) const delete_obsolete_files_period_micros); Log(log," Options.max_background_compactions: %d", max_background_compactions); - Log(log," Options.rate_limit: %.2f", - rate_limit); - Log(log," Options.rate_limit_delay_milliseconds: %d", - rate_limit_delay_milliseconds); + Log(log," Options.hard_rate_limit: %.2f", + hard_rate_limit); + Log(log," Options.rate_limit_delay_max_milliseconds: %d", + rate_limit_delay_max_milliseconds); Log(log," Options.disable_auto_compactions: %d", disable_auto_compactions); Log(log," Options.WAL_ttl_seconds: %ld",