Adds DB::GetNextCompaction and then uses that for rate limiting db_bench

Summary:
Adds a method that returns the score for the next level that most
needs compaction. That method is then used by db_bench to rate limit threads.
Threads are put to sleep at the end of each stats interval until the score
is less than the limit. The limit is set via the --rate_limit=$double option.
The specified value must be > 1.0. Also adds the option --stats_per_interval
to enable additional metrics reported every stats interval.

Task ID: #

Blame Rev:

Test Plan:
run db_bench

Revert Plan:

Database Impact:

Memcache Impact:

Other Notes:

EImportant:

- begin *PUBLIC* platform impact section -
Bugzilla: #
- end platform impact -

Reviewers: dhruba

Reviewed By: dhruba

Differential Revision: https://reviews.facebook.net/D6243
main
Mark Callaghan 12 years ago
parent 8965c8d0b9
commit 70c42bf05f
  1. 24
      db/db_bench.cc
  2. 19
      db/db_impl.cc
  3. 1
      db/db_impl.h
  4. 5
      db/version_set.h
  5. 4
      include/leveldb/options.h
  6. 5
      util/options.cc

@ -186,6 +186,15 @@ static leveldb::Env* FLAGS_env = leveldb::Env::Default();
// than zero. When 0 the interval grows over time.
static int FLAGS_stats_interval = 0;
// Reports additional stats per interval when this is greater
// than 0.
static int FLAGS_stats_per_interval = 0;
// When not equal to 0 this make threads sleep at each stats
// reporting interval until the compaction score for all levels is
// less than or equal to this value.
static double FLAGS_rate_limit = 0;
extern bool useOsBuffer;
extern bool useFsReadAhead;
extern bool useMmapRead;
@ -336,9 +345,11 @@ class Stats {
(done_ - last_report_done_) /
((now - last_report_finish_) / 1000000.0));
std::string stats;
if (db && db->GetProperty("leveldb.stats", &stats))
fprintf(stderr, stats.c_str());
if (FLAGS_stats_per_interval) {
std::string stats;
if (db && db->GetProperty("leveldb.stats", &stats))
fprintf(stderr, stats.c_str());
}
fflush(stderr);
next_report_ += FLAGS_stats_interval;
@ -903,6 +914,7 @@ class Benchmark {
options.disable_seek_compaction = FLAGS_disable_seek_compaction;
options.delete_obsolete_files_period_micros =
FLAGS_delete_obsolete_files_period_micros;
options.rate_limit = FLAGS_rate_limit;
Status s = DB::Open(options, FLAGS_db, &db_);
if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
@ -1316,6 +1328,12 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--stats_interval=%d%c", &n, &junk) == 1 &&
n >= 0 && n < 2000000000) {
FLAGS_stats_interval = n;
} else if (sscanf(argv[i], "--stats_per_interval=%d%c", &n, &junk) == 1
&& (n == 0 || n == 1)) {
FLAGS_stats_per_interval = n;
} else if (sscanf(argv[i], "--rate_limit=%lf%c", &d, &junk) == 1 &&
d > 1.0) {
FLAGS_rate_limit = d;
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);

@ -184,6 +184,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
disable_delete_obsolete_files_(false),
delete_obsolete_files_last_run_(0),
stall_level0_slowdown_(0),
stall_leveln_slowdown_(0),
stall_memtable_compaction_(0),
stall_level0_num_files_(0),
started_at_(options.env->NowMicros()) {
@ -1488,6 +1489,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
double score;
while (true) {
if (!bg_error_.ok()) {
@ -1528,6 +1530,18 @@ Status DBImpl::MakeRoomForWrite(bool force) {
uint64_t t1 = env_->NowMicros();
bg_cv_.Wait();
stall_level0_num_files_ += env_->NowMicros() - t1;
} else if (
allow_delay &&
options_.rate_limit > 1.0 &&
(score = versions_->MaxCompactionScore()) > options_.rate_limit) {
// Delay a write when the compaction score for any level is too large.
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
stall_leveln_slowdown_ += 1000;
allow_delay = false; // Do not delay a single write more than once
Log(options_.info_log,
"delaying write for rate limits with max score %.2f\n", score);
mutex_.Lock();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
@ -1640,10 +1654,11 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
snprintf(buf, sizeof(buf),
"Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, "
"%.3f memtable_compaction\n",
"%.3f memtable_compaction, %.3f leveln_slowdown\n",
stall_level0_slowdown_ / 1000000.0,
stall_level0_num_files_ / 1000000.0,
stall_memtable_compaction_ / 1000000.0);
stall_memtable_compaction_ / 1000000.0,
stall_leveln_slowdown_ / 1000000.0);
value->append(buf);
return true;

@ -217,6 +217,7 @@ class DBImpl : public DB {
uint64_t stall_level0_slowdown_;
uint64_t stall_memtable_compaction_;
uint64_t stall_level0_num_files_;
uint64_t stall_leveln_slowdown_;
// Time at which this instance was started.
const uint64_t started_at_;

@ -235,6 +235,11 @@ class VersionSet {
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
}
// Returns the maxmimum compaction score for levels 1 to max
double MaxCompactionScore() const {
return current_->compaction_score_;
}
// Add all files listed in any live version to *live.
// May also mutate some internal state.
void AddLiveFiles(std::set<uint64_t>* live);

@ -253,6 +253,10 @@ struct Options {
// log file.
size_t max_log_file_size;
// Puts are delayed when any level has a compaction score that
// exceeds rate_limit. This is ignored when <= 1.0.
double rate_limit;
// Create an Options object with default values for all fields.
Options();

@ -43,7 +43,8 @@ Options::Options()
db_log_dir(""),
disable_seek_compaction(false),
max_log_file_size(0),
delete_obsolete_files_period_micros(0) {
delete_obsolete_files_period_micros(0),
rate_limit(0.0) {
}
void
@ -97,6 +98,8 @@ Options::Dump(
disable_seek_compaction);
Log(log," Options.delete_obsolete_files_period_micros: %ld",
delete_obsolete_files_period_micros);
Log(log," Options.rate_limit: %.2f",
rate_limit);
} // Options::Dump

Loading…
Cancel
Save