Allowed delayed_write_rate option to be dynamically set.

Summary: Closes https://github.com/facebook/rocksdb/pull/1488

Differential Revision: D4157784

Pulled By: siying

fbshipit-source-id: f150081
main
Lijun Tang 8 years ago committed by Facebook Github Bot
parent 307a4e80c8
commit adb665e0bf
  1. 1
      HISTORY.md
  2. 15
      db/column_family.cc
  3. 4
      db/db_impl.cc
  4. 11
      db/db_options_test.cc
  5. 18
      db/write_controller.h
  6. 6
      tools/db_bench_tool.cc
  7. 1
      util/cf_options.cc
  8. 2
      util/cf_options.h
  9. 11
      util/db_options.cc
  10. 2
      util/db_options.h
  11. 2
      util/options_helper.cc
  12. 3
      util/options_helper.h

@ -2,6 +2,7 @@
## Unreleased ## Unreleased
### Public API Change ### Public API Change
* Options::max_bytes_for_level_multiplier is now a double along with all getters and setters. * Options::max_bytes_for_level_multiplier is now a double along with all getters and setters.
* Suppor dynamically change `delayed_write_rate` option via SetDBOptions().
### New Features ### New Features
* Add avoid_flush_during_shutdown option, which speeds up DB shutdown by not flushing unpersisted data (i.e. with disableWAL = true). Unpersisted data will be lost. The options is dynamically changeable. * Add avoid_flush_during_shutdown option, which speeds up DB shutdown by not flushing unpersisted data (i.e. with disableWAL = true). Unpersisted data will be lost. The options is dynamically changeable.

@ -489,11 +489,12 @@ const double kSlowdownRatio = 1.2;
namespace { namespace {
std::unique_ptr<WriteControllerToken> SetupDelay( std::unique_ptr<WriteControllerToken> SetupDelay(
uint64_t max_write_rate, WriteController* write_controller, WriteController* write_controller,
uint64_t compaction_needed_bytes, uint64_t prev_compaction_neeed_bytes, uint64_t compaction_needed_bytes, uint64_t prev_compaction_neeed_bytes,
bool auto_comapctions_disabled) { bool auto_comapctions_disabled) {
const uint64_t kMinWriteRate = 1024u; // Minimum write rate 1KB/s. const uint64_t kMinWriteRate = 1024u; // Minimum write rate 1KB/s.
uint64_t max_write_rate = write_controller->max_delayed_write_rate();
uint64_t write_rate = write_controller->delayed_write_rate(); uint64_t write_rate = write_controller->delayed_write_rate();
if (auto_comapctions_disabled) { if (auto_comapctions_disabled) {
@ -616,8 +617,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
imm()->NumNotFlushed() >= imm()->NumNotFlushed() >=
mutable_cf_options.max_write_buffer_number - 1) { mutable_cf_options.max_write_buffer_number - 1) {
write_controller_token_ = write_controller_token_ =
SetupDelay(ioptions_.delayed_write_rate, write_controller, SetupDelay(write_controller, compaction_needed_bytes,
compaction_needed_bytes, prev_compaction_needed_bytes_, prev_compaction_needed_bytes_,
mutable_cf_options.disable_auto_compactions); mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1); internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
@ -632,8 +633,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
vstorage->l0_delay_trigger_count() >= vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_slowdown_writes_trigger) { mutable_cf_options.level0_slowdown_writes_trigger) {
write_controller_token_ = write_controller_token_ =
SetupDelay(ioptions_.delayed_write_rate, write_controller, SetupDelay(write_controller, compaction_needed_bytes,
compaction_needed_bytes, prev_compaction_needed_bytes_, prev_compaction_needed_bytes_,
mutable_cf_options.disable_auto_compactions); mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1); internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1);
if (compaction_picker_->IsLevel0CompactionInProgress()) { if (compaction_picker_->IsLevel0CompactionInProgress()) {
@ -650,8 +651,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
vstorage->estimated_compaction_needed_bytes() >= vstorage->estimated_compaction_needed_bytes() >=
mutable_cf_options.soft_pending_compaction_bytes_limit) { mutable_cf_options.soft_pending_compaction_bytes_limit) {
write_controller_token_ = write_controller_token_ =
SetupDelay(ioptions_.delayed_write_rate, write_controller, SetupDelay(write_controller, compaction_needed_bytes,
compaction_needed_bytes, prev_compaction_needed_bytes_, prev_compaction_needed_bytes_,
mutable_cf_options.disable_auto_compactions); mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats( internal_stats_->AddCFStats(
InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1); InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1);

@ -322,7 +322,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
? immutable_db_options_.write_thread_max_yield_usec ? immutable_db_options_.write_thread_max_yield_usec
: 0, : 0,
immutable_db_options_.write_thread_slow_yield_usec), immutable_db_options_.write_thread_slow_yield_usec),
write_controller_(immutable_db_options_.delayed_write_rate), write_controller_(mutable_db_options_.delayed_write_rate),
last_batch_group_size_(0), last_batch_group_size_(0),
unscheduled_flushes_(0), unscheduled_flushes_(0),
unscheduled_compactions_(0), unscheduled_compactions_(0),
@ -2489,6 +2489,8 @@ Status DBImpl::SetDBOptions(
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();
} }
write_controller_.set_max_delayed_write_rate(new_options.delayed_write_rate);
mutable_db_options_ = new_options; mutable_db_options_ = new_options;
persist_options_status = PersistOptions(); persist_options_status = PersistOptions();

@ -278,6 +278,17 @@ TEST_F(DBOptionsTest, AvoidFlushDuringShutdown) {
ASSERT_EQ("", FilesPerLevel()); ASSERT_EQ("", FilesPerLevel());
} }
TEST_F(DBOptionsTest, SetDelayedWriteRateOption) {
Options options;
options.create_if_missing = true;
options.delayed_write_rate = 2 * 1024U * 1024U;
Reopen(options);
ASSERT_EQ(2 * 1024U * 1024U, dbfull()->TEST_write_controler().max_delayed_write_rate());
ASSERT_OK(dbfull()->SetDBOptions({{"delayed_write_rate", "20000"}}));
ASSERT_EQ(20000, dbfull()->TEST_write_controler().max_delayed_write_rate());
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace rocksdb } // namespace rocksdb

@ -26,7 +26,7 @@ class WriteController {
total_compaction_pressure_(0), total_compaction_pressure_(0),
bytes_left_(0), bytes_left_(0),
last_refill_time_(0) { last_refill_time_(0) {
set_delayed_write_rate(_delayed_write_rate); set_max_delayed_write_rate(_delayed_write_rate);
} }
~WriteController() = default; ~WriteController() = default;
@ -60,8 +60,21 @@ class WriteController {
} }
delayed_write_rate_ = write_rate; delayed_write_rate_ = write_rate;
} }
void set_max_delayed_write_rate(uint64_t write_rate) {
// avoid divide 0
if (write_rate == 0) {
write_rate = 1u;
}
max_delayed_write_rate_ = write_rate;
// update delayed_write_rate_ as well
delayed_write_rate_ = write_rate;
}
uint64_t delayed_write_rate() const { return delayed_write_rate_; } uint64_t delayed_write_rate() const { return delayed_write_rate_; }
uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
private: private:
friend class WriteControllerToken; friend class WriteControllerToken;
friend class StopWriteToken; friend class StopWriteToken;
@ -73,6 +86,9 @@ class WriteController {
int total_compaction_pressure_; int total_compaction_pressure_;
uint64_t bytes_left_; uint64_t bytes_left_;
uint64_t last_refill_time_; uint64_t last_refill_time_;
// write rate set when initialization or by `DBImpl::SetDBOptions`
uint64_t max_delayed_write_rate_;
// current write rate
uint64_t delayed_write_rate_; uint64_t delayed_write_rate_;
}; };

@ -3528,11 +3528,11 @@ class Benchmark {
} }
if (levelMeta.level == 0) { if (levelMeta.level == 0) {
for (auto& fileMeta : levelMeta.files) { for (auto& fileMeta : levelMeta.files) {
fprintf(stdout, "Level[%d]: %s(size: %lu bytes)\n", levelMeta.level, fprintf(stdout, "Level[%d]: %s(size: %" PRIu64 " bytes)\n",
fileMeta.name.c_str(), fileMeta.size); levelMeta.level, fileMeta.name.c_str(), fileMeta.size);
} }
} else { } else {
fprintf(stdout, "Level[%d]: %s - %s(total size: %lu bytes)\n", fprintf(stdout, "Level[%d]: %s - %s(total size: %" PRIi64 " bytes)\n",
levelMeta.level, levelMeta.files.front().name.c_str(), levelMeta.level, levelMeta.files.front().name.c_str(),
levelMeta.files.back().name.c_str(), levelMeta.size); levelMeta.files.back().name.c_str(), levelMeta.size);
} }

@ -43,7 +43,6 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
info_log(db_options.info_log.get()), info_log(db_options.info_log.get()),
statistics(db_options.statistics.get()), statistics(db_options.statistics.get()),
env(db_options.env), env(db_options.env),
delayed_write_rate(db_options.delayed_write_rate),
allow_mmap_reads(db_options.allow_mmap_reads), allow_mmap_reads(db_options.allow_mmap_reads),
allow_mmap_writes(db_options.allow_mmap_writes), allow_mmap_writes(db_options.allow_mmap_writes),
db_paths(db_options.db_paths), db_paths(db_options.db_paths),

@ -61,8 +61,6 @@ struct ImmutableCFOptions {
Env* env; Env* env;
uint64_t delayed_write_rate;
// Allow the OS to mmap file for reading sst tables. Default: false // Allow the OS to mmap file for reading sst tables. Default: false
bool allow_mmap_reads; bool allow_mmap_reads;

@ -74,7 +74,6 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
wal_bytes_per_sync(options.wal_bytes_per_sync), wal_bytes_per_sync(options.wal_bytes_per_sync),
listeners(options.listeners), listeners(options.listeners),
enable_thread_tracking(options.enable_thread_tracking), enable_thread_tracking(options.enable_thread_tracking),
delayed_write_rate(options.delayed_write_rate),
allow_concurrent_memtable_write(options.allow_concurrent_memtable_write), allow_concurrent_memtable_write(options.allow_concurrent_memtable_write),
enable_write_thread_adaptive_yield( enable_write_thread_adaptive_yield(
options.enable_write_thread_adaptive_yield), options.enable_write_thread_adaptive_yield),
@ -199,8 +198,6 @@ void ImmutableDBOptions::Dump(Logger* log) const {
wal_recovery_mode); wal_recovery_mode);
Header(log, " Options.enable_thread_tracking: %d", Header(log, " Options.enable_thread_tracking: %d",
enable_thread_tracking); enable_thread_tracking);
Header(log, " Options.delayed_write_rate : %" PRIu64,
delayed_write_rate);
Header(log, " Options.allow_concurrent_memtable_write: %d", Header(log, " Options.allow_concurrent_memtable_write: %d",
allow_concurrent_memtable_write); allow_concurrent_memtable_write);
Header(log, " Options.enable_write_thread_adaptive_yield: %d", Header(log, " Options.enable_write_thread_adaptive_yield: %d",
@ -226,12 +223,14 @@ void ImmutableDBOptions::Dump(Logger* log) const {
MutableDBOptions::MutableDBOptions() MutableDBOptions::MutableDBOptions()
: base_background_compactions(1), : base_background_compactions(1),
max_background_compactions(1), max_background_compactions(1),
avoid_flush_during_shutdown(false) {} avoid_flush_during_shutdown(false),
delayed_write_rate(2 * 1024U * 1024U) {}
MutableDBOptions::MutableDBOptions(const DBOptions& options) MutableDBOptions::MutableDBOptions(const DBOptions& options)
: base_background_compactions(options.base_background_compactions), : base_background_compactions(options.base_background_compactions),
max_background_compactions(options.max_background_compactions), max_background_compactions(options.max_background_compactions),
avoid_flush_during_shutdown(options.avoid_flush_during_shutdown) {} avoid_flush_during_shutdown(options.avoid_flush_during_shutdown),
delayed_write_rate(options.delayed_write_rate) {}
void MutableDBOptions::Dump(Logger* log) const { void MutableDBOptions::Dump(Logger* log) const {
Header(log, " Options.base_background_compactions: %d", Header(log, " Options.base_background_compactions: %d",
@ -240,6 +239,8 @@ void MutableDBOptions::Dump(Logger* log) const {
max_background_compactions); max_background_compactions);
Header(log, " Options.avoid_flush_during_shutdown: %d", Header(log, " Options.avoid_flush_during_shutdown: %d",
avoid_flush_during_shutdown); avoid_flush_during_shutdown);
Header(log, " Options.delayed_write_rate : %" PRIu64,
delayed_write_rate);
} }
} // namespace rocksdb } // namespace rocksdb

@ -68,7 +68,6 @@ struct ImmutableDBOptions {
uint64_t wal_bytes_per_sync; uint64_t wal_bytes_per_sync;
std::vector<std::shared_ptr<EventListener>> listeners; std::vector<std::shared_ptr<EventListener>> listeners;
bool enable_thread_tracking; bool enable_thread_tracking;
uint64_t delayed_write_rate;
bool allow_concurrent_memtable_write; bool allow_concurrent_memtable_write;
bool enable_write_thread_adaptive_yield; bool enable_write_thread_adaptive_yield;
uint64_t write_thread_max_yield_usec; uint64_t write_thread_max_yield_usec;
@ -95,6 +94,7 @@ struct MutableDBOptions {
int base_background_compactions; int base_background_compactions;
int max_background_compactions; int max_background_compactions;
bool avoid_flush_during_shutdown; bool avoid_flush_during_shutdown;
uint64_t delayed_write_rate;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -96,7 +96,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
options.wal_bytes_per_sync = immutable_db_options.wal_bytes_per_sync; options.wal_bytes_per_sync = immutable_db_options.wal_bytes_per_sync;
options.listeners = immutable_db_options.listeners; options.listeners = immutable_db_options.listeners;
options.enable_thread_tracking = immutable_db_options.enable_thread_tracking; options.enable_thread_tracking = immutable_db_options.enable_thread_tracking;
options.delayed_write_rate = immutable_db_options.delayed_write_rate; options.delayed_write_rate = mutable_db_options.delayed_write_rate;
options.allow_concurrent_memtable_write = options.allow_concurrent_memtable_write =
immutable_db_options.allow_concurrent_memtable_write; immutable_db_options.allow_concurrent_memtable_write;
options.enable_write_thread_adaptive_yield = options.enable_write_thread_adaptive_yield =

@ -298,7 +298,8 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
OptionVerificationType::kNormal, false, 0}}, OptionVerificationType::kNormal, false, 0}},
{"delayed_write_rate", {"delayed_write_rate",
{offsetof(struct DBOptions, delayed_write_rate), OptionType::kUInt64T, {offsetof(struct DBOptions, delayed_write_rate), OptionType::kUInt64T,
OptionVerificationType::kNormal, false, 0}}, OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, delayed_write_rate)}},
{"delete_obsolete_files_period_micros", {"delete_obsolete_files_period_micros",
{offsetof(struct DBOptions, delete_obsolete_files_period_micros), {offsetof(struct DBOptions, delete_obsolete_files_period_micros),
OptionType::kUInt64T, OptionVerificationType::kNormal, false, 0}}, OptionType::kUInt64T, OptionVerificationType::kNormal, false, 0}},

Loading…
Cancel
Save