Add options.hard_pending_compaction_bytes_limit to stop writes if compaction lagging behind

Summary: Add an option to stop writes if compaction lefts behind. If estimated pending compaction bytes is more than threshold specified by options.hard_pending_compaction_bytes_liimt, writes will stop until compactions are cleared to under the threshold.

Test Plan: Add unit test DBTest.HardLimit

Reviewers: rven, kradhakrishnan, anthony, IslamAbdelRahman, yhchiang, igor

Reviewed By: igor

Subscribers: MarkCallaghan, leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D45999
main
sdong 9 years ago
parent 7143242d12
commit 5de807ac16
  1. 10
      db/column_family.cc
  2. 9
      db/db_bench.cc
  3. 47
      db/db_test.cc
  4. 30
      db/internal_stats.cc
  5. 2
      db/internal_stats.h
  6. 8
      include/rocksdb/options.h
  7. 7
      util/mutable_cf_options.cc
  8. 7
      util/mutable_cf_options.h
  9. 9
      util/options.cc
  10. 5
      util/options_helper.cc
  11. 3
      util/options_helper.h
  12. 8
      util/options_test.cc

@ -454,6 +454,16 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stopping writes because we have %d level-0 files", "[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), vstorage->l0_delay_trigger_count()); name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
vstorage->estimated_compaction_needed_bytes() >=
mutable_cf_options.hard_pending_compaction_bytes_limit) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(
InternalStats::HARD_PENDING_COMPACTION_BYTES_LIMIT, 1);
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stopping writes because estimated pending compaction "
"bytes exceed %" PRIu64,
name_.c_str(), vstorage->estimated_compaction_needed_bytes());
} else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 && } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
vstorage->l0_delay_trigger_count() >= vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_slowdown_writes_trigger) { mutable_cf_options.level0_slowdown_writes_trigger) {

@ -611,9 +611,10 @@ static bool ValidateRateLimit(const char* flagname, double value) {
} }
DEFINE_double(soft_rate_limit, 0.0, ""); DEFINE_double(soft_rate_limit, 0.0, "");
DEFINE_double(hard_rate_limit, 0.0, "When not equal to 0 this make threads " DEFINE_double(hard_rate_limit, 0.0, "DEPRECATED");
"sleep at each stats reporting interval until the compaction"
" score for all levels is less than or equal to this value."); DEFINE_uint64(hard_pending_compaction_bytes_limit, 128u * 1024 * 1024 * 1024,
"Stop writes if pending compaction bytes exceed this number");
DEFINE_uint64(delayed_write_rate, 2097152u, DEFINE_uint64(delayed_write_rate, 2097152u,
"Limited bytes allowed to DB when soft_rate_limit or " "Limited bytes allowed to DB when soft_rate_limit or "
@ -2431,6 +2432,8 @@ class Benchmark {
} }
options.soft_rate_limit = FLAGS_soft_rate_limit; options.soft_rate_limit = FLAGS_soft_rate_limit;
options.hard_rate_limit = FLAGS_hard_rate_limit; options.hard_rate_limit = FLAGS_hard_rate_limit;
options.hard_pending_compaction_bytes_limit =
FLAGS_hard_pending_compaction_bytes_limit;
options.delayed_write_rate = FLAGS_delayed_write_rate; options.delayed_write_rate = FLAGS_delayed_write_rate;
options.rate_limit_delay_max_milliseconds = options.rate_limit_delay_max_milliseconds =
FLAGS_rate_limit_delay_max_milliseconds; FLAGS_rate_limit_delay_max_milliseconds;

@ -8513,6 +8513,53 @@ TEST_F(DBTest, DelayedWriteRate) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_F(DBTest, HardLimit) {
Options options;
options.env = env_;
env_->SetBackgroundThreads(1, Env::LOW);
options = CurrentOptions(options);
options.max_write_buffer_number = 256;
options.write_buffer_size = 110 << 10; // 110KB
options.arena_block_size = 4 * 1024;
options.level0_file_num_compaction_trigger = 4;
options.level0_slowdown_writes_trigger = 999999;
options.level0_stop_writes_trigger = 999999;
options.hard_pending_compaction_bytes_limit = 800 << 10;
options.max_bytes_for_level_base = 10000000000u;
options.max_background_compactions = 1;
env_->SetBackgroundThreads(1, Env::LOW);
SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
CreateAndReopenWithCF({"pikachu"}, options);
std::atomic<int> callback_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack("DBImpl::DelayWrite:Wait",
[&](void* arg) {
callback_count.fetch_add(1);
sleeping_task_low.WakeUp();
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);
int key_idx = 0;
for (int num = 0; num < 5; num++) {
GenerateNewFile(&rnd, &key_idx, true);
}
ASSERT_EQ(0, callback_count.load());
for (int num = 0; num < 5; num++) {
GenerateNewFile(&rnd, &key_idx, true);
dbfull()->TEST_WaitForFlushMemTable();
}
ASSERT_GE(callback_count.load(), 1);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest, SoftLimit) { TEST_F(DBTest, SoftLimit) {
Options options; Options options;
options.env = env_; options.env = env_;

@ -669,8 +669,10 @@ void InternalStats::DumpCFStats(std::string* value) {
total_files += files; total_files += files;
total_files_being_compacted += files_being_compacted[level]; total_files_being_compacted += files_being_compacted[level];
if (comp_stats_[level].micros > 0 || files > 0) { if (comp_stats_[level].micros > 0 || files > 0) {
uint64_t stalls = level == 0 ? (cf_stats_count_[LEVEL0_SLOWDOWN_TOTAL] + uint64_t stalls =
level == 0 ? (cf_stats_count_[LEVEL0_SLOWDOWN_TOTAL] +
cf_stats_count_[LEVEL0_NUM_FILES_TOTAL] + cf_stats_count_[LEVEL0_NUM_FILES_TOTAL] +
cf_stats_count_[HARD_PENDING_COMPACTION_BYTES_LIMIT] +
cf_stats_count_[MEMTABLE_COMPACTION]) cf_stats_count_[MEMTABLE_COMPACTION])
: (stall_leveln_slowdown_count_soft_[level] + : (stall_leveln_slowdown_count_soft_[level] +
stall_leveln_slowdown_count_hard_[level]); stall_leveln_slowdown_count_hard_[level]);
@ -715,20 +717,28 @@ void InternalStats::DumpCFStats(std::string* value) {
curr_ingest / kGB, interval_ingest / kGB); curr_ingest / kGB, interval_ingest / kGB);
value->append(buf); value->append(buf);
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf), "Stalls(count): %" PRIu64
"Stalls(count): %" PRIu64 " level0_slowdown, " " level0_slowdown, "
"%" PRIu64 " level0_slowdown_with_compaction, " "%" PRIu64
"%" PRIu64 " level0_numfiles, " " level0_slowdown_with_compaction, "
"%" PRIu64 " level0_numfiles_with_compaction, " "%" PRIu64
"%" PRIu64 " memtable_compaction, " " level0_numfiles, "
"%" PRIu64 " leveln_slowdown_soft, " "%" PRIu64
" level0_numfiles_with_compaction, "
"%" PRIu64
" pending_compaction_bytes, "
"%" PRIu64
" memtable_compaction, "
"%" PRIu64
" leveln_slowdown_soft, "
"%" PRIu64 " leveln_slowdown_hard\n", "%" PRIu64 " leveln_slowdown_hard\n",
cf_stats_count_[LEVEL0_SLOWDOWN_TOTAL], cf_stats_count_[LEVEL0_SLOWDOWN_TOTAL],
cf_stats_count_[LEVEL0_SLOWDOWN_WITH_COMPACTION], cf_stats_count_[LEVEL0_SLOWDOWN_WITH_COMPACTION],
cf_stats_count_[LEVEL0_NUM_FILES_TOTAL], cf_stats_count_[LEVEL0_NUM_FILES_TOTAL],
cf_stats_count_[LEVEL0_NUM_FILES_WITH_COMPACTION], cf_stats_count_[LEVEL0_NUM_FILES_WITH_COMPACTION],
cf_stats_count_[MEMTABLE_COMPACTION], cf_stats_count_[HARD_PENDING_COMPACTION_BYTES_LIMIT],
total_slowdown_count_soft, total_slowdown_count_hard); cf_stats_count_[MEMTABLE_COMPACTION], total_slowdown_count_soft,
total_slowdown_count_hard);
value->append(buf); value->append(buf);
cf_stats_snapshot_.ingest_bytes = curr_ingest; cf_stats_snapshot_.ingest_bytes = curr_ingest;

@ -85,6 +85,7 @@ class InternalStats {
MEMTABLE_COMPACTION, MEMTABLE_COMPACTION,
LEVEL0_NUM_FILES_TOTAL, LEVEL0_NUM_FILES_TOTAL,
LEVEL0_NUM_FILES_WITH_COMPACTION, LEVEL0_NUM_FILES_WITH_COMPACTION,
HARD_PENDING_COMPACTION_BYTES_LIMIT,
WRITE_STALLS_ENUM_MAX, WRITE_STALLS_ENUM_MAX,
BYTES_FLUSHED, BYTES_FLUSHED,
INTERNAL_CF_STATS_ENUM_MAX, INTERNAL_CF_STATS_ENUM_MAX,
@ -357,6 +358,7 @@ class InternalStats {
MEMTABLE_COMPACTION, MEMTABLE_COMPACTION,
LEVEL0_NUM_FILES_TOTAL, LEVEL0_NUM_FILES_TOTAL,
LEVEL0_NUM_FILES_WITH_COMPACTION, LEVEL0_NUM_FILES_WITH_COMPACTION,
HARD_PENDING_COMPACTION_BYTES_LIMIT,
WRITE_STALLS_ENUM_MAX, WRITE_STALLS_ENUM_MAX,
BYTES_FLUSHED, BYTES_FLUSHED,
INTERNAL_CF_STATS_ENUM_MAX, INTERNAL_CF_STATS_ENUM_MAX,

@ -499,8 +499,6 @@ struct ColumnFamilyOptions {
// Puts are delayed to options.delayed_write_rate when any level has a // Puts are delayed to options.delayed_write_rate when any level has a
// compaction score that exceeds soft_rate_limit. This is ignored when == 0.0. // compaction score that exceeds soft_rate_limit. This is ignored when == 0.0.
// CONSTRAINT: soft_rate_limit <= hard_rate_limit. If this constraint does not
// hold, RocksDB will set soft_rate_limit = hard_rate_limit
// //
// Default: 0 (disabled) // Default: 0 (disabled)
// //
@ -510,6 +508,12 @@ struct ColumnFamilyOptions {
// DEPRECATED -- this options is no longer usde // DEPRECATED -- this options is no longer usde
double hard_rate_limit; double hard_rate_limit;
// All writes are stopped if estimated bytes needed to be compaction exceed
// this threshold.
//
// Default: 0 (disabled)
uint64_t hard_pending_compaction_bytes_limit;
// DEPRECATED -- this options is no longer used // DEPRECATED -- this options is no longer used
unsigned int rate_limit_delay_max_milliseconds; unsigned int rate_limit_delay_max_milliseconds;

@ -3,6 +3,8 @@
// LICENSE file in the root directory of this source tree. An additional grant // LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory. // of patent rights can be found in the PATENTS file in the same directory.
#include "util/mutable_cf_options.h"
#ifndef __STDC_FORMAT_MACROS #ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS
#endif #endif
@ -15,7 +17,6 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/immutable_options.h" #include "rocksdb/immutable_options.h"
#include "util/mutable_cf_options.h"
namespace rocksdb { namespace rocksdb {
@ -83,8 +84,8 @@ void MutableCFOptions::Dump(Logger* log) const {
disable_auto_compactions); disable_auto_compactions);
Log(log, " soft_rate_limit: %lf", Log(log, " soft_rate_limit: %lf",
soft_rate_limit); soft_rate_limit);
Log(log, " hard_rate_limit: %lf", Log(log, " hard_pending_compaction_bytes_limit: %" PRIu64,
hard_rate_limit); hard_pending_compaction_bytes_limit);
Log(log, " level0_file_num_compaction_trigger: %d", Log(log, " level0_file_num_compaction_trigger: %d",
level0_file_num_compaction_trigger); level0_file_num_compaction_trigger);
Log(log, " level0_slowdown_writes_trigger: %d", Log(log, " level0_slowdown_writes_trigger: %d",

@ -25,7 +25,8 @@ struct MutableCFOptions {
inplace_update_num_locks(options.inplace_update_num_locks), inplace_update_num_locks(options.inplace_update_num_locks),
disable_auto_compactions(options.disable_auto_compactions), disable_auto_compactions(options.disable_auto_compactions),
soft_rate_limit(options.soft_rate_limit), soft_rate_limit(options.soft_rate_limit),
hard_rate_limit(options.hard_rate_limit), hard_pending_compaction_bytes_limit(
options.hard_pending_compaction_bytes_limit),
level0_file_num_compaction_trigger( level0_file_num_compaction_trigger(
options.level0_file_num_compaction_trigger), options.level0_file_num_compaction_trigger),
level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger), level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger),
@ -61,7 +62,7 @@ struct MutableCFOptions {
inplace_update_num_locks(0), inplace_update_num_locks(0),
disable_auto_compactions(false), disable_auto_compactions(false),
soft_rate_limit(0), soft_rate_limit(0),
hard_rate_limit(0), hard_pending_compaction_bytes_limit(0),
level0_file_num_compaction_trigger(0), level0_file_num_compaction_trigger(0),
level0_slowdown_writes_trigger(0), level0_slowdown_writes_trigger(0),
level0_stop_writes_trigger(0), level0_stop_writes_trigger(0),
@ -112,7 +113,7 @@ struct MutableCFOptions {
// Compaction related options // Compaction related options
bool disable_auto_compactions; bool disable_auto_compactions;
double soft_rate_limit; double soft_rate_limit;
double hard_rate_limit; uint64_t hard_pending_compaction_bytes_limit;
int level0_file_num_compaction_trigger; int level0_file_num_compaction_trigger;
int level0_slowdown_writes_trigger; int level0_slowdown_writes_trigger;
int level0_stop_writes_trigger; int level0_stop_writes_trigger;

@ -102,7 +102,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
source_compaction_factor(1), source_compaction_factor(1),
max_grandparent_overlap_factor(10), max_grandparent_overlap_factor(10),
soft_rate_limit(0.0), soft_rate_limit(0.0),
hard_rate_limit(0.0), hard_pending_compaction_bytes_limit(0),
rate_limit_delay_max_milliseconds(1000), rate_limit_delay_max_milliseconds(1000),
arena_block_size(0), arena_block_size(0),
disable_auto_compactions(false), disable_auto_compactions(false),
@ -161,7 +161,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
source_compaction_factor(options.source_compaction_factor), source_compaction_factor(options.source_compaction_factor),
max_grandparent_overlap_factor(options.max_grandparent_overlap_factor), max_grandparent_overlap_factor(options.max_grandparent_overlap_factor),
soft_rate_limit(options.soft_rate_limit), soft_rate_limit(options.soft_rate_limit),
hard_rate_limit(options.hard_rate_limit), hard_pending_compaction_bytes_limit(
options.hard_pending_compaction_bytes_limit),
rate_limit_delay_max_milliseconds( rate_limit_delay_max_milliseconds(
options.rate_limit_delay_max_milliseconds), options.rate_limit_delay_max_milliseconds),
arena_block_size(options.arena_block_size), arena_block_size(options.arena_block_size),
@ -473,8 +474,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
arena_block_size); arena_block_size);
Warn(log, " Options.soft_rate_limit: %.2f", Warn(log, " Options.soft_rate_limit: %.2f",
soft_rate_limit); soft_rate_limit);
Warn(log, " Options.hard_rate_limit: %.2f", Warn(log, " Options.hard_pending_compaction_bytes_limit: %" PRIu64,
hard_rate_limit); hard_pending_compaction_bytes_limit);
Warn(log, " Options.rate_limit_delay_max_milliseconds: %u", Warn(log, " Options.rate_limit_delay_max_milliseconds: %u",
rate_limit_delay_max_milliseconds); rate_limit_delay_max_milliseconds);
Warn(log, " Options.disable_auto_compactions: %d", Warn(log, " Options.disable_auto_compactions: %d",

@ -281,8 +281,11 @@ bool ParseCompactionOptions(const std::string& name, const std::string& value,
new_options->disable_auto_compactions = ParseBoolean(name, value); new_options->disable_auto_compactions = ParseBoolean(name, value);
} else if (name == "soft_rate_limit") { } else if (name == "soft_rate_limit") {
new_options->soft_rate_limit = ParseDouble(value); new_options->soft_rate_limit = ParseDouble(value);
} else if (name == "hard_pending_compaction_bytes_limit") {
new_options->hard_pending_compaction_bytes_limit = ParseUint64(value);
} else if (name == "hard_rate_limit") { } else if (name == "hard_rate_limit") {
new_options->hard_rate_limit = ParseDouble(value); // Deprecated options but still leave it here to avoid older options
// strings can be consumed.
} else if (name == "level0_file_num_compaction_trigger") { } else if (name == "level0_file_num_compaction_trigger") {
new_options->level0_file_num_compaction_trigger = ParseInt(value); new_options->level0_file_num_compaction_trigger = ParseInt(value);
} else if (name == "level0_slowdown_writes_trigger") { } else if (name == "level0_slowdown_writes_trigger") {

@ -194,6 +194,9 @@ static std::unordered_map<std::string, OptionTypeInfo> cf_options_type_info = {
{"verify_checksums_in_compaction", {"verify_checksums_in_compaction",
{offsetof(struct ColumnFamilyOptions, verify_checksums_in_compaction), {offsetof(struct ColumnFamilyOptions, verify_checksums_in_compaction),
OptionType::kBoolean}}, OptionType::kBoolean}},
{"hard_pending_compaction_bytes_limit",
{offsetof(struct ColumnFamilyOptions, hard_pending_compaction_bytes_limit),
OptionType::kUInt64T}},
{"hard_rate_limit", {"hard_rate_limit",
{offsetof(struct ColumnFamilyOptions, hard_rate_limit), {offsetof(struct ColumnFamilyOptions, hard_rate_limit),
OptionType::kDouble}}, OptionType::kDouble}},

@ -122,6 +122,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"max_grandparent_overlap_factor", "21"}, {"max_grandparent_overlap_factor", "21"},
{"soft_rate_limit", "1.1"}, {"soft_rate_limit", "1.1"},
{"hard_rate_limit", "2.1"}, {"hard_rate_limit", "2.1"},
{"hard_pending_compaction_bytes_limit", "211"},
{"arena_block_size", "22"}, {"arena_block_size", "22"},
{"disable_auto_compactions", "true"}, {"disable_auto_compactions", "true"},
{"compaction_style", "kCompactionStyleLevel"}, {"compaction_style", "kCompactionStyleLevel"},
@ -214,7 +215,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.source_compaction_factor, 20); ASSERT_EQ(new_cf_opt.source_compaction_factor, 20);
ASSERT_EQ(new_cf_opt.max_grandparent_overlap_factor, 21); ASSERT_EQ(new_cf_opt.max_grandparent_overlap_factor, 21);
ASSERT_EQ(new_cf_opt.soft_rate_limit, 1.1); ASSERT_EQ(new_cf_opt.soft_rate_limit, 1.1);
ASSERT_EQ(new_cf_opt.hard_rate_limit, 2.1); ASSERT_EQ(new_cf_opt.hard_pending_compaction_bytes_limit, 211);
ASSERT_EQ(new_cf_opt.arena_block_size, 22U); ASSERT_EQ(new_cf_opt.arena_block_size, 22U);
ASSERT_EQ(new_cf_opt.disable_auto_compactions, true); ASSERT_EQ(new_cf_opt.disable_auto_compactions, true);
ASSERT_EQ(new_cf_opt.compaction_style, kCompactionStyleLevel); ASSERT_EQ(new_cf_opt.compaction_style, kCompactionStyleLevel);
@ -667,7 +668,8 @@ void VerifyColumnFamilyOptions(const ColumnFamilyOptions& base_opt,
new_opt.verify_checksums_in_compaction); new_opt.verify_checksums_in_compaction);
// double options // double options
VerifyDouble(base_opt.hard_rate_limit, new_opt.hard_rate_limit); ASSERT_EQ(base_opt.hard_pending_compaction_bytes_limit,
new_opt.hard_pending_compaction_bytes_limit);
VerifyDouble(base_opt.soft_rate_limit, new_opt.soft_rate_limit); VerifyDouble(base_opt.soft_rate_limit, new_opt.soft_rate_limit);
// int options // int options
@ -746,7 +748,6 @@ TEST_F(OptionsTest, ColumnFamilyOptionsSerialization) {
base_opt.verify_checksums_in_compaction = rnd.Uniform(2); base_opt.verify_checksums_in_compaction = rnd.Uniform(2);
// double options // double options
base_opt.hard_rate_limit = static_cast<double>(rnd.Uniform(10000)) / 13;
base_opt.soft_rate_limit = static_cast<double>(rnd.Uniform(10000)) / 13; base_opt.soft_rate_limit = static_cast<double>(rnd.Uniform(10000)) / 13;
// int options // int options
@ -782,6 +783,7 @@ TEST_F(OptionsTest, ColumnFamilyOptionsSerialization) {
static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX); static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX);
base_opt.max_sequential_skip_in_iterations = uint_max + rnd.Uniform(10000); base_opt.max_sequential_skip_in_iterations = uint_max + rnd.Uniform(10000);
base_opt.target_file_size_base = uint_max + rnd.Uniform(10000); base_opt.target_file_size_base = uint_max + rnd.Uniform(10000);
base_opt.hard_pending_compaction_bytes_limit = uint_max + rnd.Uniform(10000);
// unsigned int options // unsigned int options
base_opt.rate_limit_delay_max_milliseconds = rnd.Uniform(10000); base_opt.rate_limit_delay_max_milliseconds = rnd.Uniform(10000);

Loading…
Cancel
Save