diff --git a/db/db_bench.cc b/db/db_bench.cc
index 4f3450ba5..ce1ff7ab7 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -232,6 +232,9 @@ static int FLAGS_stats_per_interval = 0;
 // less than or equal to this value.
 static double FLAGS_rate_limit = 0;
 
+// When FLAGS_rate_limit is set then this is the max time a put will be stalled.
+static int FLAGS_rate_limit_delay_milliseconds = 1000;
+
 // Control maximum bytes of overlaps in grandparent (i.e., level+2) before we
 // stop building a single file in a level->level+1 compaction.
 static int FLAGS_max_grandparent_overlap_factor = 10;
@@ -1029,6 +1032,7 @@ unique_ptr GenerateKeyFromInt(int v)
     options.delete_obsolete_files_period_micros =
       FLAGS_delete_obsolete_files_period_micros;
     options.rate_limit = FLAGS_rate_limit;
+    options.rate_limit_delay_milliseconds = FLAGS_rate_limit_delay_milliseconds;
     options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
     options.max_grandparent_overlap_factor =
       FLAGS_max_grandparent_overlap_factor;
@@ -1652,6 +1656,10 @@ int main(int argc, char** argv) {
     } else if (sscanf(argv[i], "--rate_limit=%lf%c", &d, &junk) == 1 &&
                d > 1.0) {
       FLAGS_rate_limit = d;
+    } else if (sscanf(argv[i],
+               "--rate_limit_delay_milliseconds=%d%c", &n, &junk) == 1
+               && n > 0) {
+      FLAGS_rate_limit_delay_milliseconds = n;
     } else if (sscanf(argv[i], "--readonly=%d%c", &n, &junk) == 1 &&
                (n == 0 || n ==1 )) {
       FLAGS_read_only = n;
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 5cbe08236..540f30f54 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -159,7 +159,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       stall_level0_slowdown_(0),
       stall_memtable_compaction_(0),
       stall_level0_num_files_(0),
-      stall_leveln_slowdown_(0),
       started_at_(options.env->NowMicros()),
       flush_on_destroy_(false),
       delayed_writes_(0) {
@@ -167,6 +166,11 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
 
   env_->GetAbsolutePath(dbname, &db_absolute_path_);
   stats_ = new CompactionStats[options.num_levels];
+
+  stall_leveln_slowdown_.resize(options.num_levels);
+  for (int i = 0; i < options.num_levels; ++i)
+    stall_leveln_slowdown_[i] = 0;
+
   // Reserve ten files or so for other uses and give the rest to TableCache.
   const int table_cache_size = options_.max_open_files - 10;
   table_cache_.reset(new TableCache(dbname_, &options_, table_cache_size));
@@ -2042,6 +2046,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
   mutex_.AssertHeld();
   assert(!writers_.empty());
   bool allow_delay = !force;
+  bool allow_rate_limit_delay = !force;
+  uint64_t rate_limit_delay_millis = 0;
   Status s;
   double score;
 
@@ -2095,19 +2101,24 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       bg_cv_.Wait();
       stall_level0_num_files_ += env_->NowMicros() - t1;
     } else if (
-        allow_delay &&
+        allow_rate_limit_delay &&
         options_.rate_limit > 1.0 &&
         (score = versions_->MaxCompactionScore()) > options_.rate_limit) {
       // Delay a write when the compaction score for any level is too large.
+      int max_level = versions_->MaxCompactionScoreLevel();
       mutex_.Unlock();
       uint64_t t1 = env_->NowMicros();
       env_->SleepForMicroseconds(1000);
       uint64_t delayed = env_->NowMicros() - t1;
-      stall_leveln_slowdown_ += delayed;
-      allow_delay = false;  // Do not delay a single write more than once
-      Log(options_.info_log,
-          "delaying write %llu usecs for rate limits with max score %.2f\n",
-          (long long unsigned int)delayed, score);
+      stall_leveln_slowdown_[max_level] += delayed;
+      // Make sure the following value doesn't round to zero.
+      rate_limit_delay_millis += std::max((delayed / 1000), (uint64_t) 1);
+      if (rate_limit_delay_millis >= options_.rate_limit_delay_milliseconds) {
+        allow_rate_limit_delay = false;
+      }
+      // Log(options_.info_log,
+      //     "delaying write %llu usecs for rate limits with max score %.2f\n",
+      //     (long long unsigned int)delayed, score);
       mutex_.Lock();
     } else {
       // Attempt to switch to a new memtable and trigger compaction of old
@@ -2163,12 +2174,13 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
     uint64_t total_bytes = 0;
     uint64_t micros_up = env_->NowMicros() - started_at_;
     double seconds_up = micros_up / 1000000.0;
+    uint64_t total_slowdown = 0;
 
     // Pardon the long line but I think it is easier to read this way.
     snprintf(buf, sizeof(buf),
             " Compactions\n"
-            "Level Files Size(MB) Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count\n"
-            "------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
+            "Level Files Size(MB) Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall\n"
+            "----------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
             );
     value->append(buf);
     for (int level = 0; level < NumberLevels(); level++) {
@@ -2186,7 +2198,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
         total_bytes += bytes_read + stats_[level].bytes_written;
         snprintf(
             buf, sizeof(buf),
-            "%3d %8d %8.0f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d\n",
+            "%3d %8d %8.0f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f\n",
            level,
            files,
            versions_->NumLevelBytes(level) / 1048576.0,
@@ -2204,7 +2216,9 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
            stats_[level].files_in_levelnp1,
            stats_[level].files_out_levelnp1,
            stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1,
-           stats_[level].count);
+           stats_[level].count,
+           stall_leveln_slowdown_[level] / 1000000.0);
+        total_slowdown += stall_leveln_slowdown_[level];
         value->append(buf);
       }
     }
@@ -2227,7 +2241,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
        stall_level0_slowdown_ / 1000000.0,
        stall_level0_num_files_ / 1000000.0,
        stall_memtable_compaction_ / 1000000.0,
-       stall_leveln_slowdown_ / 1000000.0);
+       total_slowdown / 1000000.0);
     value->append(buf);
 
     return true;
diff --git a/db/db_impl.h b/db/db_impl.h
index 0e16c4e4a..1e9c3ae99 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -265,7 +265,7 @@ class DBImpl : public DB {
   uint64_t stall_level0_slowdown_;
   uint64_t stall_memtable_compaction_;
   uint64_t stall_level0_num_files_;
-  uint64_t stall_leveln_slowdown_;
+  std::vector<uint64_t> stall_leveln_slowdown_;
 
   // Time at which this instance was started.
   const uint64_t started_at_;
diff --git a/db/db_test.cc b/db/db_test.cc
index 180666909..97243d42c 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -212,6 +212,7 @@ class DBTest {
     kDBLogDir,
     kManifestFileSize,
     kCompactOnFlush,
+    kPerfOptions,
     kEnd
   };
   int option_config_;
@@ -271,6 +272,12 @@ class DBTest {
         options.max_manifest_file_size = 50; // 50 bytes
       case kCompactOnFlush:
        options.purge_redundant_kvs_while_flush = !options.purge_redundant_kvs_while_flush;
+        break;
+      case kPerfOptions:
+        options.rate_limit = 2.0;
+        options.rate_limit_delay_milliseconds = 2;
+        // TODO -- test more options
+        break;
       default:
         break;
     }
diff --git a/db/version_set.cc b/db/version_set.cc
index 6f7e2a2a6..dd25b8d4a 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1376,6 +1376,8 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
 
 void VersionSet::Finalize(Version* v) {
   double max_score = 0;
+  int max_score_level = 0;
+
   for (int level = 0; level < NumberLevels()-1; level++) {
     double score;
     if (level == 0) {
@@ -1421,6 +1423,7 @@
       }
       if (max_score < score) {
         max_score = score;
+        max_score_level = level;
       }
     }
     v->compaction_level_[level] = level;
@@ -1429,6 +1432,7 @@
 
   // update the max compaction score in levels 1 to n-1
   v->max_compaction_score_ = max_score;
+  v->max_compaction_score_level_ = max_score_level;
 
   // sort all the levels based on their score. Higher scores get listed
   // first. Use bubble sort because the number of entries are small.
diff --git a/db/version_set.h b/db/version_set.h
index 55cc1ad3d..f8ec71620 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -176,6 +176,7 @@ class Version {
   std::vector<double> compaction_score_;
   std::vector<int> compaction_level_;
   double max_compaction_score_; // max score in l1 to ln-1
+  int max_compaction_score_level_; // level on which max score occurs
 
   // The offset in the manifest file where this version is stored.
   uint64_t offset_manifest_file_;
@@ -315,6 +316,11 @@ class VersionSet {
     return current_->max_compaction_score_;
   }
 
+  // See field declaration
+  int MaxCompactionScoreLevel() const {
+    return current_->max_compaction_score_level_;
+  }
+
   // Add all files listed in any live version to *live.
   // May also mutate some internal state.
   void AddLiveFiles(std::set<uint64_t>* live);
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index 2a44f7e5d..5b37f4e50 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -324,6 +324,9 @@ struct Options {
   // exceeds rate_limit. This is ignored when <= 1.0.
   double rate_limit;
 
+  // Max time a put will be stalled when rate_limit is enforced
+  int rate_limit_delay_milliseconds;
+
   // manifest file is rolled over on reaching this limit.
   // The older manifest file be deleted.
   // The default value is MAX_INT so that roll-over does not take place.
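
For context (illustrative only, not part of the patch): the new Options::rate_limit_delay_milliseconds caps the total time a single write can be stalled once the max compaction score exceeds rate_limit. A minimal sketch of how a caller might set the two options together follows; the database path and the threshold values are made up for the example.

    // Illustrative sketch, not part of this patch; path and values are arbitrary.
    #include "leveldb/db.h"
    #include "leveldb/options.h"

    int main() {
      leveldb::Options options;
      options.create_if_missing = true;
      // Begin stalling writes once any level's compaction score exceeds 2.0 ...
      options.rate_limit = 2.0;
      // ... but never stall a single write for more than 200 ms in total.
      options.rate_limit_delay_milliseconds = 200;

      leveldb::DB* db = nullptr;
      leveldb::Status s =
          leveldb::DB::Open(options, "/tmp/rate_limit_example", &db);
      if (!s.ok()) return 1;
      s = db->Put(leveldb::WriteOptions(), "key", "value");
      delete db;
      return s.ok() ? 0 : 1;
    }

This mirrors the defaults set in util/options.cc below: rate_limit stays 0.0 (disabled) and rate_limit_delay_milliseconds defaults to 1000.
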
diff --git a/util/options.cc b/util/options.cc
index b68b2758c..8adcde9c9 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -51,6 +51,7 @@ Options::Options()
       log_file_time_to_roll(0),
       keep_log_file_num(1000),
       rate_limit(0.0),
+      rate_limit_delay_milliseconds(1000),
       max_manifest_file_size(std::numeric_limits<uint64_t>::max()),
       no_block_cache(false),
       table_cache_numshardbits(4),
@@ -146,6 +147,8 @@ Options::Dump(Logger* log) const
         max_background_compactions);
     Log(log," Options.rate_limit: %.2f",
         rate_limit);
+    Log(log," Options.rate_limit_delay_milliseconds: %d",
+        rate_limit_delay_milliseconds);
     Log(log," Options.compaction_filter_args: %p",
         compaction_filter_args);
     Log(log," Options.CompactionFilter: %p",
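
The behavioral core of the patch is the capping logic in DBImpl::MakeRoomForWrite(): previously a write was delayed at most once ("Do not delay a single write more than once"); now each ~1 ms stall is added to a per-write budget, rounded up so it never contributes zero, and once that budget reaches options_.rate_limit_delay_milliseconds further rate-limit stalls are skipped for that write. Below is a standalone sketch of that accumulation, under the stated simplification that a plain loop stands in for repeated passes through MakeRoomForWrite and the sleep is only simulated.

    // Standalone sketch of the per-write stall budget; not the real DBImpl code.
    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    int main() {
      const uint64_t rate_limit_delay_milliseconds = 5;  // Options value (patch default: 1000).
      uint64_t rate_limit_delay_millis = 0;              // Budget already spent on this write.
      bool allow_rate_limit_delay = true;

      // Each iteration models one pass through MakeRoomForWrite() in which the
      // max compaction score exceeded rate_limit and the write slept ~1000 usecs.
      for (int pass = 0; allow_rate_limit_delay && pass < 100; ++pass) {
        uint64_t delayed = 1000;  // usecs measured around the simulated 1 ms sleep.
        // Round up so a sub-millisecond sleep still counts against the budget.
        rate_limit_delay_millis += std::max(delayed / 1000, (uint64_t)1);
        if (rate_limit_delay_millis >= rate_limit_delay_milliseconds) {
          allow_rate_limit_delay = false;  // Budget exhausted: stop stalling this write.
        }
      }
      printf("write stalled for ~%llu ms in total\n",
             (unsigned long long)rate_limit_delay_millis);
      return 0;
    }

The per-level microseconds accumulated at the same spot (stall_leveln_slowdown_[max_level]) feed the new Ln-stall column in the compaction stats, which is why the member changes from a single uint64_t to a per-level std::vector.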