diff --git a/HISTORY.md b/HISTORY.md index 5b144ff3a..ca117b273 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,6 +1,9 @@ # Rocksdb Change Log -## Unreleased +## Unreleased (will be released with 3.6) + +### Behavior changes +* We have refactored our system of stalling writes. Any stall-related statistics' meanings are changed. Instead of per-write stall counts, we now count stalls per-epoch, where epochs are periods between flushes and compactions. You'll find more information in our Tuning Perf Guide once we release RocksDB 3.6. ----- Past Releases ----- diff --git a/Makefile b/Makefile index c05d82af7..da85ae2fc 100644 --- a/Makefile +++ b/Makefile @@ -112,7 +112,8 @@ TESTS = \ version_edit_test \ version_set_test \ file_indexer_test \ - write_batch_test\ + write_batch_test \ + write_controller_test\ deletefile_test \ table_test \ thread_local_test \ @@ -427,6 +428,9 @@ reduce_levels_test: tools/reduce_levels_test.o $(LIBOBJECTS) $(TESTHARNESS) write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +write_controller_test: db/write_controller_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) db/write_controller_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) diff --git a/db/column_family.cc b/db/column_family.cc index 7e06c9bd7..eb2f21e9f 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -9,6 +9,11 @@ #include "db/column_family.h" +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include #include #include #include @@ -19,11 +24,42 @@ #include "db/internal_stats.h" #include "db/compaction_picker.h" #include "db/table_properties_collector.h" +#include "db/write_controller.h" #include "util/autovector.h" #include 
"util/hash_skiplist_rep.h" namespace rocksdb { +namespace { +// This function computes the amount of time in microseconds by which a write +// should be delayed based on the number of level-0 files according to the +// following formula: +// if n < bottom, return 0; +// if n >= top, return 1000; +// otherwise, let r = (n - bottom) / +// (top - bottom) +// and return r^2 * 1000. +// The goal of this formula is to gradually increase the rate at which writes +// are slowed. We also tried linear delay (r * 1000), but it seemed to do +// slightly worse. There is no other particular reason for choosing quadratic. +uint64_t SlowdownAmount(int n, double bottom, double top) { + uint64_t delay; + if (n >= top) { + delay = 1000; + } else if (n < bottom) { + delay = 0; + } else { + // If we are here, we know that: + // level0_start_slowdown <= n < level0_slowdown + // since the previous two conditions are false. + double how_much = static_cast<double>(n - bottom) / (top - bottom); + delay = std::max(how_much * how_much * 1000, 100.0); + } + assert(delay <= 1000); + return delay; +} +} // namespace + ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(ColumnFamilyData* cfd, DBImpl* db, port::Mutex* mutex) : cfd_(cfd), db_(db), mutex_(mutex) { @@ -197,7 +233,6 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, next_(nullptr), prev_(nullptr), log_number_(0), - need_slowdown_for_num_level0_files_(false), column_family_set_(column_family_set) { Ref(); @@ -278,31 +313,62 @@ ColumnFamilyData::~ColumnFamilyData() { } void ColumnFamilyData::RecalculateWriteStallConditions() { - need_wait_for_num_memtables_ = - (imm()->size() == options()->max_write_buffer_number - 1); - if (current_ != nullptr) { - need_wait_for_num_level0_files_ = - (current_->NumLevelFiles(0) >= options()->level0_stop_writes_trigger); - } else { - need_wait_for_num_level0_files_ = false; - } - - RecalculateWriteStallRateLimitsConditions(); -} - -void 
ColumnFamilyData::RecalculateWriteStallRateLimitsConditions() { - if (current_ != nullptr) { - exceeds_hard_rate_limit_ = - (options()->hard_rate_limit > 1.0 && - current_->MaxCompactionScore() > options()->hard_rate_limit); - - exceeds_soft_rate_limit_ = - (options()->soft_rate_limit > 0.0 && - current_->MaxCompactionScore() > options()->soft_rate_limit); - } else { - exceeds_hard_rate_limit_ = false; - exceeds_soft_rate_limit_ = false; + const double score = current_->MaxCompactionScore(); + const int max_level = current_->MaxCompactionScoreLevel(); + + auto write_controller = column_family_set_->write_controller_; + + if (imm()->size() == options_.max_write_buffer_number) { + write_controller_token_ = write_controller->GetStopToken(); + internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1); + Log(options_.info_log, + "[%s] Stopping writes because we have %d immutable memtables " + "(waiting for flush)", + name_.c_str(), imm()->size()); + } else if (options_.level0_slowdown_writes_trigger >= 0 && + current_->NumLevelFiles(0) >= + options_.level0_slowdown_writes_trigger) { + uint64_t slowdown = SlowdownAmount( + current_->NumLevelFiles(0), options_.level0_slowdown_writes_trigger, + options_.level0_stop_writes_trigger); + write_controller_token_ = write_controller->GetDelayToken(slowdown); + internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown); + Log(options_.info_log, + "[%s] Stalling writes because we have %d level-0 files (%" PRIu64 + "us)", + name_.c_str(), current_->NumLevelFiles(0), slowdown); + } else if (current_->NumLevelFiles(0) >= + options_.level0_stop_writes_trigger) { + write_controller_token_ = write_controller->GetStopToken(); + internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1); + Log(options_.info_log, + "[%s] Stopping writes because we have %d level-0 files", + name_.c_str(), current_->NumLevelFiles(0)); + } else if (options_.hard_rate_limit > 1.0 && + score > options_.hard_rate_limit) { + uint64_t 
kHardLimitSlowdown = 1000; + write_controller_token_ = + write_controller->GetDelayToken(kHardLimitSlowdown); + internal_stats_->RecordLevelNSlowdown(max_level, kHardLimitSlowdown, + false); + Log(options_.info_log, + "[%s] Stalling writes because we hit hard limit on level %d. " + "(%" PRIu64 "us)", + name_.c_str(), max_level, kHardLimitSlowdown); + } else if (options_.soft_rate_limit > 0.0 && + score > options_.soft_rate_limit) { + uint64_t slowdown = SlowdownAmount(score, options_.soft_rate_limit, + options_.hard_rate_limit); + write_controller_token_ = write_controller->GetDelayToken(slowdown); + internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true); + Log(options_.info_log, + "[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64 + "us)", + name_.c_str(), max_level, slowdown); + } else { + write_controller_token_.reset(); + } } } @@ -310,12 +376,7 @@ const EnvOptions* ColumnFamilyData::soptions() const { return &(column_family_set_->env_options_); } -void ColumnFamilyData::SetCurrent(Version* current) { - current_ = current; - need_slowdown_for_num_level0_files_ = - (options_.level0_slowdown_writes_trigger >= 0 && - current_->NumLevelFiles(0) >= options_.level0_slowdown_writes_trigger); -} +void ColumnFamilyData::SetCurrent(Version* current) { current_ = current; } void ColumnFamilyData::CreateNewMemtable() { assert(current_ != nullptr); @@ -328,7 +389,6 @@ void ColumnFamilyData::CreateNewMemtable() { Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) { auto result = compaction_picker_->PickCompaction(current_, log_buffer); - RecalculateWriteStallRateLimitsConditions(); return result; } @@ -464,16 +524,18 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() { ColumnFamilySet::ColumnFamilySet(const std::string& dbname, const DBOptions* db_options, const EnvOptions& env_options, - Cache* table_cache) + Cache* table_cache, + WriteController* write_controller) : max_column_family_(0), dummy_cfd_(new 
ColumnFamilyData(0, "", nullptr, nullptr, ColumnFamilyOptions(), db_options, - env_options_, nullptr)), + env_options, nullptr)), default_cfd_cache_(nullptr), db_name_(dbname), db_options_(db_options), env_options_(env_options), table_cache_(table_cache), + write_controller_(write_controller), spin_lock_(ATOMIC_FLAG_INIT) { // initialize linked list dummy_cfd_->prev_ = dummy_cfd_; diff --git a/db/column_family.h b/db/column_family.h index a68189d51..b5363fe30 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -19,6 +19,7 @@ #include "rocksdb/env.h" #include "db/memtable_list.h" #include "db/write_batch_internal.h" +#include "db/write_controller.h" #include "db/table_cache.h" #include "util/thread_local.h" @@ -156,6 +157,7 @@ class ColumnFamilyData { // can't drop default CF assert(id_ != 0); dropped_ = true; + write_controller_token_.reset(); } bool IsDropped() const { return dropped_; } @@ -225,35 +227,12 @@ class ColumnFamilyData { void ResetThreadLocalSuperVersions(); - // A Flag indicating whether write needs to slowdown because of there are - // too many number of level0 files. 
- bool NeedSlowdownForNumLevel0Files() const { - return need_slowdown_for_num_level0_files_; - } - - bool NeedWaitForNumLevel0Files() const { - return need_wait_for_num_level0_files_; - } - - bool NeedWaitForNumMemtables() const { - return need_wait_for_num_memtables_; - } - - bool ExceedsSoftRateLimit() const { - return exceeds_soft_rate_limit_; - } - - bool ExceedsHardRateLimit() const { - return exceeds_hard_rate_limit_; - } - private: friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, Version* dummy_versions, Cache* table_cache, const ColumnFamilyOptions& options, - const DBOptions* db_options, - const EnvOptions& env_options, + const DBOptions* db_options, const EnvOptions& env_options, ColumnFamilySet* column_family_set); // Recalculate some small conditions, which are changed only during @@ -262,7 +241,6 @@ class ColumnFamilyData { // DBImpl::MakeRoomForWrite function to decide, if it need to make // a write stall void RecalculateWriteStallConditions(); - void RecalculateWriteStallRateLimitsConditions(); uint32_t id_; const std::string name_; @@ -304,31 +282,13 @@ class ColumnFamilyData { // recovered from uint64_t log_number_; - // A flag indicating whether we should delay writes because - // we have too many level 0 files - bool need_slowdown_for_num_level0_files_; - - // These 4 variables are updated only after compaction, - // adding new memtable, flushing memtables to files - // and/or add recalculation of compaction score. - // That's why theirs values are cached in ColumnFamilyData. - // Recalculation is made by RecalculateWriteStallConditions and - // RecalculateWriteStallRateLimitsConditions function. 
They are used - // in DBImpl::MakeRoomForWrite function to decide, if it need - // to sleep during write operation - bool need_wait_for_num_memtables_; - - bool need_wait_for_num_level0_files_; - - bool exceeds_hard_rate_limit_; - - bool exceeds_soft_rate_limit_; - // An object that keeps all the compaction stats // and picks the next compaction std::unique_ptr compaction_picker_; ColumnFamilySet* column_family_set_; + + std::unique_ptr write_controller_token_; }; // ColumnFamilySet has interesting thread-safety requirements @@ -370,7 +330,8 @@ class ColumnFamilySet { }; ColumnFamilySet(const std::string& dbname, const DBOptions* db_options, - const EnvOptions& env_options, Cache* table_cache); + const EnvOptions& env_options, Cache* table_cache, + WriteController* write_controller); ~ColumnFamilySet(); ColumnFamilyData* GetDefault() const; @@ -425,6 +386,7 @@ class ColumnFamilySet { const DBOptions* const db_options_; const EnvOptions env_options_; Cache* table_cache_; + WriteController* write_controller_; std::atomic_flag spin_lock_; }; diff --git a/db/db_impl.cc b/db/db_impl.cc index b83d60f5e..cff2d5a20 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -344,7 +344,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) last_stats_dump_time_microsec_(0), default_interval_to_delete_obsolete_WAL_(600), flush_on_destroy_(false), - delayed_writes_(0), env_options_(options), bg_work_gate_closed_(false), refitting_level_(false), @@ -360,8 +359,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) NewLRUCache(table_cache_size, db_options_.table_cache_numshardbits, db_options_.table_cache_remove_scan_count_limit); - versions_.reset( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get())); + versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, + table_cache_.get(), &write_controller_)); column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); @@ -3988,6 +3987,10 
@@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { flush_column_family_if_log_file, total_log_size_, max_total_wal_size); } + if (write_controller_.IsStopped() || write_controller_.GetDelay() > 0) { + DelayWrite(expiration_time); + } + if (LIKELY(single_column_family_mode_)) { // fast path status = MakeRoomForWrite(default_cf_handle_->cfd(), @@ -4189,36 +4192,28 @@ void DBImpl::BuildBatchGroup(Writer** last_writer, } } -// This function computes the amount of time in microseconds by which a write -// should be delayed based on the number of level-0 files according to the -// following formula: -// if n < bottom, return 0; -// if n >= top, return 1000; -// otherwise, let r = (n - bottom) / -// (top - bottom) -// and return r^2 * 1000. -// The goal of this formula is to gradually increase the rate at which writes -// are slowed. We also tried linear delay (r * 1000), but it seemed to do -// slightly worse. There is no other particular reason for choosing quadratic. -uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) { - uint64_t delay; - if (n >= top) { - delay = 1000; - } - else if (n < bottom) { - delay = 0; - } - else { - // If we are here, we know that: - // level0_start_slowdown <= n < level0_slowdown - // since the previous two conditions are false. 
- double how_much = - (double) (n - bottom) / - (top - bottom); - delay = std::max(how_much * how_much * 1000, 100.0); - } - assert(delay <= 1000); - return delay; +// REQUIRES: mutex_ is held +// REQUIRES: this thread is currently at the front of the writer queue +void DBImpl::DelayWrite(uint64_t expiration_time) { + StopWatch sw(env_, stats_, WRITE_STALL); + bool has_timeout = (expiration_time > 0); + auto delay = write_controller_.GetDelay(); + if (write_controller_.IsStopped() == false && delay > 0) { + mutex_.Unlock(); + env_->SleepForMicroseconds(delay); + mutex_.Lock(); + } + + while (write_controller_.IsStopped()) { + if (has_timeout) { + bg_cv_.TimedWait(expiration_time); + if (env_->NowMicros() > expiration_time) { + break; + } + } else { + bg_cv_.Wait(); + } + } } // REQUIRES: mutex_ is held @@ -4228,16 +4223,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, uint64_t expiration_time) { mutex_.AssertHeld(); assert(!writers_.empty()); - bool allow_delay = true; - bool allow_hard_rate_limit_delay = true; - bool allow_soft_rate_limit_delay = true; - uint64_t rate_limit_delay_millis = 0; Status s; - double score; - // Once we schedule background work, we shouldn't schedule it again, since it - // might generate a tight feedback loop, constantly scheduling more background - // work, even if additional background work is not needed - bool schedule_background_work = true; bool has_timeout = (expiration_time > 0); while (true) { @@ -4248,111 +4234,9 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, } else if (has_timeout && env_->NowMicros() > expiration_time) { s = Status::TimedOut(); break; - } else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) { - // We are getting close to hitting a hard limit on the number of - // L0 files. Rather than delaying a single write by several - // seconds when we hit the hard limit, start delaying each - // individual write by 0-1ms to reduce latency variance. 
Also, - // this delay hands over some CPU to the compaction thread in - // case it is sharing the same core as the writer. - uint64_t slowdown = - SlowdownAmount(cfd->current()->NumLevelFiles(0), - cfd->options()->level0_slowdown_writes_trigger, - cfd->options()->level0_stop_writes_trigger); - mutex_.Unlock(); - uint64_t delayed; - { - StopWatch sw(env_, stats_, STALL_L0_SLOWDOWN_COUNT, &delayed); - env_->SleepForMicroseconds(slowdown); - } - RecordTick(stats_, STALL_L0_SLOWDOWN_MICROS, delayed); - allow_delay = false; // Do not delay a single write more than once - mutex_.Lock(); - cfd->internal_stats()->AddCFStats( - InternalStats::LEVEL0_SLOWDOWN, delayed); - delayed_writes_++; } else if (!cfd->mem()->ShouldFlush()) { // There is room in current memtable - if (allow_delay) { - DelayLoggingAndReset(); - } break; - } else if (cfd->NeedWaitForNumMemtables()) { - // We have filled up the current memtable, but the previous - // ones are still being flushed, so we wait. - DelayLoggingAndReset(); - Log(db_options_.info_log, "[%s] wait for memtable flush...\n", - cfd->GetName().c_str()); - if (schedule_background_work) { - MaybeScheduleFlushOrCompaction(); - schedule_background_work = false; - } - uint64_t stall; - { - StopWatch sw(env_, stats_, STALL_MEMTABLE_COMPACTION_COUNT, &stall); - if (!has_timeout) { - bg_cv_.Wait(); - } else { - bg_cv_.TimedWait(expiration_time); - } - } - RecordTick(stats_, STALL_MEMTABLE_COMPACTION_MICROS, stall); - cfd->internal_stats()->AddCFStats( - InternalStats::MEMTABLE_COMPACTION, stall); - } else if (cfd->NeedWaitForNumLevel0Files()) { - DelayLoggingAndReset(); - Log(db_options_.info_log, "[%s] wait for fewer level0 files...\n", - cfd->GetName().c_str()); - uint64_t stall; - { - StopWatch sw(env_, stats_, STALL_L0_NUM_FILES_COUNT, &stall); - if (!has_timeout) { - bg_cv_.Wait(); - } else { - bg_cv_.TimedWait(expiration_time); - } - } - RecordTick(stats_, STALL_L0_NUM_FILES_MICROS, stall); - cfd->internal_stats()->AddCFStats( - 
InternalStats::LEVEL0_NUM_FILES, stall); - } else if (allow_hard_rate_limit_delay && cfd->ExceedsHardRateLimit()) { - // Delay a write when the compaction score for any level is too large. - const int max_level = cfd->current()->MaxCompactionScoreLevel(); - score = cfd->current()->MaxCompactionScore(); - mutex_.Unlock(); - uint64_t delayed; - { - StopWatch sw(env_, stats_, HARD_RATE_LIMIT_DELAY_COUNT, &delayed); - env_->SleepForMicroseconds(1000); - } - // Make sure the following value doesn't round to zero. - uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1); - rate_limit_delay_millis += rate_limit; - RecordTick(stats_, RATE_LIMIT_DELAY_MILLIS, rate_limit); - if (cfd->options()->rate_limit_delay_max_milliseconds > 0 && - rate_limit_delay_millis >= - (unsigned)cfd->options()->rate_limit_delay_max_milliseconds) { - allow_hard_rate_limit_delay = false; - } - mutex_.Lock(); - cfd->internal_stats()->RecordLevelNSlowdown(max_level, delayed, false); - } else if (allow_soft_rate_limit_delay && cfd->ExceedsSoftRateLimit()) { - const int max_level = cfd->current()->MaxCompactionScoreLevel(); - score = cfd->current()->MaxCompactionScore(); - // Delay a write when the compaction score for any level is too large. 
- // TODO: add statistics - uint64_t slowdown = SlowdownAmount(score, cfd->options()->soft_rate_limit, - cfd->options()->hard_rate_limit); - uint64_t elapsed = 0; - mutex_.Unlock(); - { - StopWatch sw(env_, stats_, SOFT_RATE_LIMIT_DELAY_COUNT, &elapsed); - env_->SleepForMicroseconds(slowdown); - rate_limit_delay_millis += slowdown; - } - allow_soft_rate_limit_delay = false; - mutex_.Lock(); - cfd->internal_stats()->RecordLevelNSlowdown(max_level, elapsed, true); } else { s = SetNewMemtableAndNewLogFile(cfd, context); if (!s.ok()) { @@ -4383,7 +4267,6 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, mutex_.Unlock(); Status s; { - DelayLoggingAndReset(); if (creating_new_log) { s = env_->NewWritableFile( LogFileName(db_options_.wal_dir, new_log_number), @@ -4595,13 +4478,6 @@ void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family, } } -inline void DBImpl::DelayLoggingAndReset() { - if (delayed_writes_ > 0) { - Log(db_options_.info_log, "delayed %d write...\n", delayed_writes_); - delayed_writes_ = 0; - } -} - #ifndef ROCKSDB_LITE Status DBImpl::GetUpdatesSince( SequenceNumber seq, unique_ptr* iter, diff --git a/db/db_impl.h b/db/db_impl.h index 69fe2eaac..c2bb48597 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -32,6 +32,7 @@ #include "util/thread_local.h" #include "util/scoped_arena_iterator.h" #include "db/internal_stats.h" +#include "db/write_controller.h" namespace rocksdb { @@ -357,9 +358,6 @@ class DBImpl : public DB { Status WriteLevel0Table(ColumnFamilyData* cfd, autovector& mems, VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer); - - uint64_t SlowdownAmount(int n, double bottom, double top); - // Information kept for every waiting writer struct Writer { Status status; @@ -399,8 +397,9 @@ class DBImpl : public DB { // See also: BeginWrite void EndWrite(Writer* w, Writer* last_writer, Status status); - Status MakeRoomForWrite(ColumnFamilyData* cfd, - WriteContext* context, + void DelayWrite(uint64_t 
expiration_time); + + Status MakeRoomForWrite(ColumnFamilyData* cfd, WriteContext* context, uint64_t expiration_time); Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, @@ -557,6 +556,8 @@ class DBImpl : public DB { std::deque writers_; WriteBatch tmp_batch_; + WriteController write_controller_; + SnapshotList snapshots_; // cache for ReadFirstRecord() calls @@ -628,9 +629,6 @@ class DBImpl : public DB { static const uint64_t kNoTimeOut = std::numeric_limits::max(); std::string db_absolute_path_; - // count of the number of contiguous delaying writes - int delayed_writes_; - // The options to access storage files const EnvOptions env_options_; @@ -647,9 +645,6 @@ class DBImpl : public DB { DBImpl(const DBImpl&); void operator=(const DBImpl&); - // dump the delayed_writes_ to the log file and reset counter. - void DelayLoggingAndReset(); - // Return the earliest snapshot where seqno is visible. // Store the snapshot right before that, if any, in prev_snapshot inline SequenceNumber findEarliestVisibleSnapshot( diff --git a/db/log_and_apply_bench.cc b/db/log_and_apply_bench.cc index a5aa95017..60baeb5ec 100644 --- a/db/log_and_apply_bench.cc +++ b/db/log_and_apply_bench.cc @@ -21,6 +21,7 @@ std::string MakeKey(unsigned int num) { void BM_LogAndApply(int iters, int num_base_files) { VersionSet* vset; + WriteController wc; ColumnFamilyData* default_cfd; uint64_t fnum = 1; port::Mutex mu; @@ -47,7 +48,7 @@ void BM_LogAndApply(int iters, int num_base_files) { options.db_paths.emplace_back(dbname, 0); // The parameter of table cache is passed in as null, so any file I/O // operation is likely to fail. 
- vset = new VersionSet(dbname, &options, sopt, nullptr); + vset = new VersionSet(dbname, &options, sopt, nullptr, &wc); std::vector<ColumnFamilyDescriptor> dummy; dummy.push_back(ColumnFamilyDescriptor()); ASSERT_OK(vset->Recover(dummy)); @@ -69,6 +70,7 @@ vedit.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1); vset->LogAndApply(default_cfd, &vedit, &mu); } + delete vset; } BENCHMARK_NAMED_PARAM(BM_LogAndApply, 1000_iters_1_file, 1000, 1) diff --git a/db/version_set.cc b/db/version_set.cc index 82183a982..7e9393e3c 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1706,9 +1706,10 @@ class VersionSet::Builder { }; VersionSet::VersionSet(const std::string& dbname, const DBOptions* options, - const EnvOptions& storage_options, Cache* table_cache) + const EnvOptions& storage_options, Cache* table_cache, + WriteController* write_controller) : column_family_set_(new ColumnFamilySet(dbname, options, storage_options, - table_cache)), + table_cache, write_controller)), env_(options->env), dbname_(dbname), options_(options), @@ -2411,7 +2412,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, std::shared_ptr<Cache> tc(NewLRUCache( options->max_open_files - 10, options->table_cache_numshardbits, options->table_cache_remove_scan_count_limit)); - VersionSet versions(dbname, options, storage_options, tc.get()); + WriteController wc; + VersionSet versions(dbname, options, storage_options, tc.get(), &wc); Status status; std::vector<ColumnFamilyDescriptor> dummy; diff --git a/db/version_set.h b/db/version_set.h index e9747f839..bfb567036 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -34,6 +34,7 @@ #include "db/column_family.h" #include "db/log_reader.h" #include "db/file_indexer.h" +#include "db/write_controller.h" namespace rocksdb { @@ -321,8 +322,8 @@ class Version { // These are used to pick the best compaction level std::vector<double> compaction_score_; std::vector<int> compaction_level_; - double max_compaction_score_; // max score in l1 
to ln-1 - int max_compaction_score_level_; // level on which max score occurs + double max_compaction_score_ = 0.0; // max score in l1 to ln-1 + int max_compaction_score_level_ = 0; // level on which max score occurs // A version number that uniquely represents this version. This is // used for debugging and logging purposes only. @@ -357,7 +358,8 @@ class Version { class VersionSet { public: VersionSet(const std::string& dbname, const DBOptions* options, - const EnvOptions& storage_options, Cache* table_cache); + const EnvOptions& storage_options, Cache* table_cache, + WriteController* write_controller); ~VersionSet(); // Apply *edit to the current version to form a new descriptor that diff --git a/db/write_controller.cc b/db/write_controller.cc new file mode 100644 index 000000000..bb6f8ecf7 --- /dev/null +++ b/db/write_controller.cc @@ -0,0 +1,37 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#include "db/write_controller.h" + +#include <cassert> + +namespace rocksdb { + +std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() { + ++total_stopped_; + return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this)); +} + +std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken( + uint64_t delay_us) { + total_delay_us_ += delay_us; + return std::unique_ptr<WriteControllerToken>( + new DelayWriteToken(this, delay_us)); +} + +bool WriteController::IsStopped() const { return total_stopped_ > 0; } +uint64_t WriteController::GetDelay() const { return total_delay_us_; } + +StopWriteToken::~StopWriteToken() { + assert(controller_->total_stopped_ >= 1); + --controller_->total_stopped_; +} + +DelayWriteToken::~DelayWriteToken() { + assert(controller_->total_delay_us_ >= delay_us_); + controller_->total_delay_us_ -= delay_us_; +} + +} // namespace rocksdb diff --git a/db/write_controller.h b/db/write_controller.h new file mode 100644 index 000000000..4ed221df1 --- /dev/null +++ b/db/write_controller.h @@ -0,0 +1,78 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include <stdint.h> + +#include <memory> + +namespace rocksdb { + +class WriteControllerToken; + +// WriteController is controlling write stalls in our write code-path. Write +// stalls happen when compaction can't keep up with write rate. 
+// All of the methods here (including WriteControllerToken's destructors) need +// to be called while holding DB mutex +class WriteController { + public: + WriteController() : total_stopped_(0), total_delay_us_(0) {} + ~WriteController() = default; + + // When an actor (column family) requests a stop token, all writes will be + // stopped until the stop token is released (deleted) + std::unique_ptr<WriteControllerToken> GetStopToken(); + // When an actor (column family) requests a delay token, total delay for all + // writes will be increased by delay_us. The delay will last until delay token + // is released + std::unique_ptr<WriteControllerToken> GetDelayToken(uint64_t delay_us); + + // these two methods are querying the state of the WriteController + bool IsStopped() const; + uint64_t GetDelay() const; + + private: + friend class WriteControllerToken; + friend class StopWriteToken; + friend class DelayWriteToken; + + int total_stopped_; + uint64_t total_delay_us_; +}; + +class WriteControllerToken { + public: + explicit WriteControllerToken(WriteController* controller) + : controller_(controller) {} + virtual ~WriteControllerToken() = default; + + protected: + WriteController* controller_; + + private: + // no copying allowed + WriteControllerToken(const WriteControllerToken&) = delete; + void operator=(const WriteControllerToken&) = delete; +}; + +class StopWriteToken : public WriteControllerToken { + public: + explicit StopWriteToken(WriteController* controller) + : WriteControllerToken(controller) {} + ~StopWriteToken(); +}; + +class DelayWriteToken : public WriteControllerToken { + public: + DelayWriteToken(WriteController* controller, uint64_t delay_us) + : WriteControllerToken(controller), delay_us_(delay_us) {} + ~DelayWriteToken(); + + private: + uint64_t delay_us_; +}; + +} // namespace rocksdb diff --git a/db/write_controller_test.cc b/db/write_controller_test.cc new file mode 100644 index 000000000..1cec9658d --- /dev/null +++ b/db/write_controller_test.cc @@ -0,0 +1,40 @@ +// Copyright (c) 
2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +#include "db/write_controller.h" + +#include "util/testharness.h" + +namespace rocksdb { + +class WriteControllerTest {}; + +TEST(WriteControllerTest, SanityTest) { + WriteController controller; + auto stop_token_1 = controller.GetStopToken(); + auto stop_token_2 = controller.GetStopToken(); + + ASSERT_EQ(true, controller.IsStopped()); + stop_token_1.reset(); + ASSERT_EQ(true, controller.IsStopped()); + stop_token_2.reset(); + ASSERT_EQ(false, controller.IsStopped()); + + auto delay_token_1 = controller.GetDelayToken(5); + ASSERT_EQ(static_cast<uint64_t>(5), controller.GetDelay()); + auto delay_token_2 = controller.GetDelayToken(8); + ASSERT_EQ(static_cast<uint64_t>(13), controller.GetDelay()); + + delay_token_2.reset(); + ASSERT_EQ(static_cast<uint64_t>(5), controller.GetDelay()); + delay_token_1.reset(); + ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay()); + delay_token_1.reset(); + ASSERT_EQ(false, controller.IsStopped()); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index fbb3b6ddb..fc5e039a7 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -347,9 +347,7 @@ struct ColumnFamilyOptions { // Default: 0 (disabled) double hard_rate_limit; - // Max time a put will be stalled when hard_rate_limit is enforced. If 0, then - // there is no limit. - // Default: 1000 + // DEPRECATED -- this option is no longer used unsigned int rate_limit_delay_max_milliseconds; // size of one block in arena memory allocation. 
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index a7f2c1408..87ac321c9 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -212,7 +212,6 @@ enum Histograms : uint32_t { READ_BLOCK_COMPACTION_MICROS, READ_BLOCK_GET_MICROS, WRITE_RAW_BLOCK_MICROS, - STALL_L0_SLOWDOWN_COUNT, STALL_MEMTABLE_COMPACTION_COUNT, STALL_L0_NUM_FILES_COUNT, @@ -220,6 +219,7 @@ enum Histograms : uint32_t { SOFT_RATE_LIMIT_DELAY_COUNT, NUM_FILES_IN_SINGLE_COMPACTION, DB_SEEK, + WRITE_STALL, HISTOGRAM_ENUM_MAX, }; diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 53e15e0ba..9f00757b8 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -564,7 +564,8 @@ void ManifestDumpCommand::DoCommand() { // if VersionSet::DumpManifest() depends on any option done by // SanitizeOptions(), we need to initialize it manually. options.db_paths.emplace_back("dummy", 0); - VersionSet versions(dbname, &options, sopt, tc.get()); + WriteController wc; + VersionSet versions(dbname, &options, sopt, tc.get(), &wc); Status s = versions.DumpManifest(options, file, verbose_, is_key_hex_); if (!s.ok()) { printf("Error in processing file %s %s\n", manifestfile.c_str(), @@ -1089,7 +1090,8 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, NewLRUCache(opt.max_open_files - 10, opt.table_cache_numshardbits, opt.table_cache_remove_scan_count_limit)); const InternalKeyComparator cmp(opt.comparator); - VersionSet versions(db_path_, &opt, soptions, tc.get()); + WriteController wc; + VersionSet versions(db_path_, &opt, soptions, tc.get(), &wc); std::vector dummy; ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName, ColumnFamilyOptions(opt));