diff --git a/db/db_bench.cc b/db/db_bench.cc index 626cc163b..c5e838176 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -186,6 +186,15 @@ static leveldb::Env* FLAGS_env = leveldb::Env::Default(); // than zero. When 0 the interval grows over time. static int FLAGS_stats_interval = 0; +// Reports additional stats per interval when this is greater +// than 0. +static int FLAGS_stats_per_interval = 0; + +// When greater than 1.0 this is passed to Options::rate_limit so that +// writes are delayed while the compaction score of any level exceeds +// this value. Values <= 1.0 leave the limit disabled. +static double FLAGS_rate_limit = 0; + extern bool useOsBuffer; extern bool useFsReadAhead; extern bool useMmapRead; @@ -336,9 +345,11 @@ class Stats { (done_ - last_report_done_) / ((now - last_report_finish_) / 1000000.0)); - std::string stats; - if (db && db->GetProperty("leveldb.stats", &stats)) - fprintf(stderr, stats.c_str()); + if (FLAGS_stats_per_interval) { + std::string stats; + if (db && db->GetProperty("leveldb.stats", &stats)) + fprintf(stderr, "%s", stats.c_str()); + } fflush(stderr); next_report_ += FLAGS_stats_interval; @@ -903,6 +914,7 @@ class Benchmark { options.disable_seek_compaction = FLAGS_disable_seek_compaction; options.delete_obsolete_files_period_micros = FLAGS_delete_obsolete_files_period_micros; + options.rate_limit = FLAGS_rate_limit; Status s = DB::Open(options, FLAGS_db, &db_); if (!s.ok()) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); @@ -1316,6 +1328,12 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--stats_interval=%d%c", &n, &junk) == 1 && n >= 0 && n < 2000000000) { FLAGS_stats_interval = n; + } else if (sscanf(argv[i], "--stats_per_interval=%d%c", &n, &junk) == 1 + && (n == 0 || n == 1)) { + FLAGS_stats_per_interval = n; + } else if (sscanf(argv[i], "--rate_limit=%lf%c", &d, &junk) == 1 && + d > 1.0) { + FLAGS_rate_limit = d; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/db/db_impl.cc b/db/db_impl.cc 
index 678acf057..cd4cb05f3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -184,6 +184,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) disable_delete_obsolete_files_(false), delete_obsolete_files_last_run_(0), stall_level0_slowdown_(0), stall_memtable_compaction_(0), stall_level0_num_files_(0), + stall_leveln_slowdown_(0), started_at_(options.env->NowMicros()) { @@ -1488,6 +1489,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { assert(!writers_.empty()); bool allow_delay = !force; Status s; + double score; while (true) { if (!bg_error_.ok()) { @@ -1528,6 +1530,18 @@ Status DBImpl::MakeRoomForWrite(bool force) { uint64_t t1 = env_->NowMicros(); bg_cv_.Wait(); stall_level0_num_files_ += env_->NowMicros() - t1; + } else if ( + allow_delay && + options_.rate_limit > 1.0 && + (score = versions_->MaxCompactionScore()) > options_.rate_limit) { + // Delay a write when the compaction score for any level is too large. + mutex_.Unlock(); + env_->SleepForMicroseconds(1000); + stall_leveln_slowdown_ += 1000; + allow_delay = false; // Do not delay a single write more than once + Log(options_.info_log, + "delaying write for rate limits with max score %.2f\n", score); + mutex_.Lock(); } else { // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); @@ -1640,10 +1654,11 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { snprintf(buf, sizeof(buf), "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, " - "%.3f memtable_compaction\n", + "%.3f memtable_compaction, %.3f leveln_slowdown\n", stall_level0_slowdown_ / 1000000.0, stall_level0_num_files_ / 1000000.0, - stall_memtable_compaction_ / 1000000.0); + stall_memtable_compaction_ / 1000000.0, + stall_leveln_slowdown_ / 1000000.0); value->append(buf); return true; diff --git a/db/db_impl.h b/db/db_impl.h index fcedea010..7264a9627 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -217,6 +217,7 @@ class DBImpl : public DB { uint64_t 
stall_level0_slowdown_; uint64_t stall_memtable_compaction_; uint64_t stall_level0_num_files_; + uint64_t stall_leveln_slowdown_; // Time at which this instance was started. const uint64_t started_at_; diff --git a/db/version_set.h b/db/version_set.h index 5295bd058..66611fdb8 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -235,6 +235,11 @@ class VersionSet { return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL); } + // Returns the maximum compaction score for levels 1 to max + double MaxCompactionScore() const { + return current_->compaction_score_; + } + // Add all files listed in any live version to *live. // May also mutate some internal state. void AddLiveFiles(std::set* live); diff --git a/include/leveldb/options.h b/include/leveldb/options.h index c090532bf..3b94d829f 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -253,6 +253,10 @@ struct Options { // log file. size_t max_log_file_size; + // Puts are delayed when any level has a compaction score that + // exceeds rate_limit. This is ignored when <= 1.0. + double rate_limit; + // Create an Options object with default values for all fields. Options(); diff --git a/util/options.cc b/util/options.cc index 0a2a70fc6..eb132877e 100644 --- a/util/options.cc +++ b/util/options.cc @@ -43,7 +43,8 @@ Options::Options() db_log_dir(""), disable_seek_compaction(false), max_log_file_size(0), - delete_obsolete_files_period_micros(0) { + delete_obsolete_files_period_micros(0), + rate_limit(0.0) { } void @@ -97,6 +98,8 @@ Options::Dump( disable_seek_compaction); Log(log," Options.delete_obsolete_files_period_micros: %ld", delete_obsolete_files_period_micros); + Log(log," Options.rate_limit: %.2f", + rate_limit); } // Options::Dump