Push- instead of pull-model for managing Write stalls

Summary:
Introducing WriteController, which is a source of truth about per-DB write delays. Let's define an DB epoch as a period where there are no flushes and compactions (i.e. new epoch is started when flush or compaction finishes). Each epoch can either:
* proceed with all writes without delay
* delay all writes by fixed time
* stop all writes

The three modes are recomputed at each epoch change (flush, compaction), rather than on every write (which is currently the case).

When we have a lot of column families, our current pull behavior adds a big overhead, since we need to loop over every column family for every write. With new push model, overhead on Write code-path is minimal.

This is just the start. Next step is to also take care of stalls introduced by slow memtable flushes. The final goal is to eliminate function MakeRoomForWrite(), which currently needs to be called for every column family by every write.

Test Plan: make check for now. I'll add some unit tests later. Also, perf test.

Reviewers: dhruba, yhchiang, MarkCallaghan, sdong, ljin

Reviewed By: ljin

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D22791
main
Igor Canadi 11 years ago
parent 0af157f9bf
commit a2bb7c3c33
  1. 5
      HISTORY.md
  2. 6
      Makefile
  3. 130
      db/column_family.cc
  4. 54
      db/column_family.h
  5. 180
      db/db_impl.cc
  6. 17
      db/db_impl.h
  7. 4
      db/log_and_apply_bench.cc
  8. 8
      db/version_set.cc
  9. 8
      db/version_set.h
  10. 37
      db/write_controller.cc
  11. 78
      db/write_controller.h
  12. 40
      db/write_controller_test.cc
  13. 4
      include/rocksdb/options.h
  14. 2
      include/rocksdb/statistics.h
  15. 6
      util/ldb_cmd.cc

@ -1,6 +1,9 @@
# Rocksdb Change Log
## Unreleased
## Unreleased (will be released with 3.6)
### Behavior changes
* We have refactored our system of stalling writes. Any stall-related statistics' meanings are changed. Instead of per-write stall counts, we now count stalls per-epoch, where epochs are periods between flushes and compactions. You'll find more information in our Tuning Perf Guide once we release RocksDB 3.6.
----- Past Releases -----

@ -112,7 +112,8 @@ TESTS = \
version_edit_test \
version_set_test \
file_indexer_test \
write_batch_test\
write_batch_test \
write_controller_test\
deletefile_test \
table_test \
thread_local_test \
@ -427,6 +428,9 @@ reduce_levels_test: tools/reduce_levels_test.o $(LIBOBJECTS) $(TESTHARNESS)
write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
write_controller_test: db/write_controller_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/write_controller_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

@ -9,6 +9,11 @@
#include "db/column_family.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <vector>
#include <string>
#include <algorithm>
@ -19,11 +24,42 @@
#include "db/internal_stats.h"
#include "db/compaction_picker.h"
#include "db/table_properties_collector.h"
#include "db/write_controller.h"
#include "util/autovector.h"
#include "util/hash_skiplist_rep.h"
namespace rocksdb {
namespace {
// This function computes the amount of time in microseconds by which a write
// should be delayed based on the number of level-0 files according to the
// following formula:
// if n < bottom, return 0;
// if n >= top, return 1000;
// otherwise, let r = (n - bottom) /
// (top - bottom)
// and return r^2 * 1000.
// The goal of this formula is to gradually increase the rate at which writes
// are slowed. We also tried linear delay (r * 1000), but it seemed to do
// slightly worse. There is no other particular reason for choosing quadratic.
uint64_t SlowdownAmount(int n, double bottom, double top) {
uint64_t delay;
if (n >= top) {
delay = 1000;
} else if (n < bottom) {
delay = 0;
} else {
// If we are here, we know that:
// level0_start_slowdown <= n < level0_slowdown
// since the previous two conditions are false.
double how_much = static_cast<double>(n - bottom) / (top - bottom);
delay = std::max(how_much * how_much * 1000, 100.0);
}
assert(delay <= 1000);
return delay;
}
} // namespace
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(ColumnFamilyData* cfd,
DBImpl* db, port::Mutex* mutex)
: cfd_(cfd), db_(db), mutex_(mutex) {
@ -197,7 +233,6 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
next_(nullptr),
prev_(nullptr),
log_number_(0),
need_slowdown_for_num_level0_files_(false),
column_family_set_(column_family_set) {
Ref();
@ -278,31 +313,62 @@ ColumnFamilyData::~ColumnFamilyData() {
}
void ColumnFamilyData::RecalculateWriteStallConditions() {
need_wait_for_num_memtables_ =
(imm()->size() == options()->max_write_buffer_number - 1);
if (current_ != nullptr) {
need_wait_for_num_level0_files_ =
(current_->NumLevelFiles(0) >= options()->level0_stop_writes_trigger);
} else {
need_wait_for_num_level0_files_ = false;
}
RecalculateWriteStallRateLimitsConditions();
}
void ColumnFamilyData::RecalculateWriteStallRateLimitsConditions() {
if (current_ != nullptr) {
exceeds_hard_rate_limit_ =
(options()->hard_rate_limit > 1.0 &&
current_->MaxCompactionScore() > options()->hard_rate_limit);
exceeds_soft_rate_limit_ =
(options()->soft_rate_limit > 0.0 &&
current_->MaxCompactionScore() > options()->soft_rate_limit);
} else {
exceeds_hard_rate_limit_ = false;
exceeds_soft_rate_limit_ = false;
const double score = current_->MaxCompactionScore();
const int max_level = current_->MaxCompactionScoreLevel();
auto write_controller = column_family_set_->write_controller_;
if (imm()->size() == options_.max_write_buffer_number) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
Log(options_.info_log,
"[%s] Stopping writes because we have %d immutable memtables "
"(waiting for flush)",
name_.c_str(), imm()->size());
} else if (options_.level0_slowdown_writes_trigger >= 0 &&
current_->NumLevelFiles(0) >=
options_.level0_slowdown_writes_trigger) {
uint64_t slowdown = SlowdownAmount(
current_->NumLevelFiles(0), options_.level0_slowdown_writes_trigger,
options_.level0_stop_writes_trigger);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
Log(options_.info_log,
"[%s] Stalling writes because we have %d level-0 files (%" PRIu64
"us)",
name_.c_str(), current_->NumLevelFiles(0), slowdown);
} else if (current_->NumLevelFiles(0) >=
options_.level0_stop_writes_trigger) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
Log(options_.info_log,
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), current_->NumLevelFiles(0));
} else if (options_.hard_rate_limit > 1.0 &&
score > options_.hard_rate_limit) {
uint64_t kHardLimitSlowdown = 1000;
write_controller_token_ =
write_controller->GetDelayToken(kHardLimitSlowdown);
internal_stats_->RecordLevelNSlowdown(max_level, kHardLimitSlowdown,
false);
Log(options_.info_log,
"[%s] Stalling writes because we hit hard limit on level %d. "
"(%" PRIu64 "us)",
name_.c_str(), max_level, kHardLimitSlowdown);
} else if (options_.soft_rate_limit > 0.0 &&
score > options_.soft_rate_limit) {
uint64_t slowdown = SlowdownAmount(score, options_.soft_rate_limit,
options_.hard_rate_limit);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true);
Log(options_.info_log,
"[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
"us)",
name_.c_str(), max_level, slowdown);
} else {
write_controller_token_.reset();
}
}
}
@ -310,12 +376,7 @@ const EnvOptions* ColumnFamilyData::soptions() const {
return &(column_family_set_->env_options_);
}
void ColumnFamilyData::SetCurrent(Version* current) {
current_ = current;
need_slowdown_for_num_level0_files_ =
(options_.level0_slowdown_writes_trigger >= 0 &&
current_->NumLevelFiles(0) >= options_.level0_slowdown_writes_trigger);
}
void ColumnFamilyData::SetCurrent(Version* current) { current_ = current; }
void ColumnFamilyData::CreateNewMemtable() {
assert(current_ != nullptr);
@ -328,7 +389,6 @@ void ColumnFamilyData::CreateNewMemtable() {
Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) {
auto result = compaction_picker_->PickCompaction(current_, log_buffer);
RecalculateWriteStallRateLimitsConditions();
return result;
}
@ -464,16 +524,18 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() {
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const DBOptions* db_options,
const EnvOptions& env_options,
Cache* table_cache)
Cache* table_cache,
WriteController* write_controller)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr,
ColumnFamilyOptions(), db_options,
env_options_, nullptr)),
env_options, nullptr)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),
env_options_(env_options),
table_cache_(table_cache),
write_controller_(write_controller),
spin_lock_(ATOMIC_FLAG_INIT) {
// initialize linked list
dummy_cfd_->prev_ = dummy_cfd_;

@ -19,6 +19,7 @@
#include "rocksdb/env.h"
#include "db/memtable_list.h"
#include "db/write_batch_internal.h"
#include "db/write_controller.h"
#include "db/table_cache.h"
#include "util/thread_local.h"
@ -156,6 +157,7 @@ class ColumnFamilyData {
// can't drop default CF
assert(id_ != 0);
dropped_ = true;
write_controller_token_.reset();
}
bool IsDropped() const { return dropped_; }
@ -225,35 +227,12 @@ class ColumnFamilyData {
void ResetThreadLocalSuperVersions();
// A Flag indicating whether write needs to slowdown because of there are
// too many number of level0 files.
bool NeedSlowdownForNumLevel0Files() const {
return need_slowdown_for_num_level0_files_;
}
bool NeedWaitForNumLevel0Files() const {
return need_wait_for_num_level0_files_;
}
bool NeedWaitForNumMemtables() const {
return need_wait_for_num_memtables_;
}
bool ExceedsSoftRateLimit() const {
return exceeds_soft_rate_limit_;
}
bool ExceedsHardRateLimit() const {
return exceeds_hard_rate_limit_;
}
private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, Cache* table_cache,
const ColumnFamilyOptions& options,
const DBOptions* db_options,
const EnvOptions& env_options,
const DBOptions* db_options, const EnvOptions& env_options,
ColumnFamilySet* column_family_set);
// Recalculate some small conditions, which are changed only during
@ -262,7 +241,6 @@ class ColumnFamilyData {
// DBImpl::MakeRoomForWrite function to decide, if it need to make
// a write stall
void RecalculateWriteStallConditions();
void RecalculateWriteStallRateLimitsConditions();
uint32_t id_;
const std::string name_;
@ -304,31 +282,13 @@ class ColumnFamilyData {
// recovered from
uint64_t log_number_;
// A flag indicating whether we should delay writes because
// we have too many level 0 files
bool need_slowdown_for_num_level0_files_;
// These 4 variables are updated only after compaction,
// adding new memtable, flushing memtables to files
// and/or add recalculation of compaction score.
// That's why theirs values are cached in ColumnFamilyData.
// Recalculation is made by RecalculateWriteStallConditions and
// RecalculateWriteStallRateLimitsConditions function. They are used
// in DBImpl::MakeRoomForWrite function to decide, if it need
// to sleep during write operation
bool need_wait_for_num_memtables_;
bool need_wait_for_num_level0_files_;
bool exceeds_hard_rate_limit_;
bool exceeds_soft_rate_limit_;
// An object that keeps all the compaction stats
// and picks the next compaction
std::unique_ptr<CompactionPicker> compaction_picker_;
ColumnFamilySet* column_family_set_;
std::unique_ptr<WriteControllerToken> write_controller_token_;
};
// ColumnFamilySet has interesting thread-safety requirements
@ -370,7 +330,8 @@ class ColumnFamilySet {
};
ColumnFamilySet(const std::string& dbname, const DBOptions* db_options,
const EnvOptions& env_options, Cache* table_cache);
const EnvOptions& env_options, Cache* table_cache,
WriteController* write_controller);
~ColumnFamilySet();
ColumnFamilyData* GetDefault() const;
@ -425,6 +386,7 @@ class ColumnFamilySet {
const DBOptions* const db_options_;
const EnvOptions env_options_;
Cache* table_cache_;
WriteController* write_controller_;
std::atomic_flag spin_lock_;
};

@ -344,7 +344,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
last_stats_dump_time_microsec_(0),
default_interval_to_delete_obsolete_WAL_(600),
flush_on_destroy_(false),
delayed_writes_(0),
env_options_(options),
bg_work_gate_closed_(false),
refitting_level_(false),
@ -360,8 +359,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
NewLRUCache(table_cache_size, db_options_.table_cache_numshardbits,
db_options_.table_cache_remove_scan_count_limit);
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get()));
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_controller_));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
@ -3988,6 +3987,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
flush_column_family_if_log_file, total_log_size_, max_total_wal_size);
}
if (write_controller_.IsStopped() || write_controller_.GetDelay() > 0) {
DelayWrite(expiration_time);
}
if (LIKELY(single_column_family_mode_)) {
// fast path
status = MakeRoomForWrite(default_cf_handle_->cfd(),
@ -4189,36 +4192,28 @@ void DBImpl::BuildBatchGroup(Writer** last_writer,
}
}
// This function computes the amount of time in microseconds by which a write
// should be delayed based on the number of level-0 files according to the
// following formula:
// if n < bottom, return 0;
// if n >= top, return 1000;
// otherwise, let r = (n - bottom) /
// (top - bottom)
// and return r^2 * 1000.
// The goal of this formula is to gradually increase the rate at which writes
// are slowed. We also tried linear delay (r * 1000), but it seemed to do
// slightly worse. There is no other particular reason for choosing quadratic.
uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) {
uint64_t delay;
if (n >= top) {
delay = 1000;
}
else if (n < bottom) {
delay = 0;
}
else {
// If we are here, we know that:
// level0_start_slowdown <= n < level0_slowdown
// since the previous two conditions are false.
double how_much =
(double) (n - bottom) /
(top - bottom);
delay = std::max(how_much * how_much * 1000, 100.0);
}
assert(delay <= 1000);
return delay;
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
void DBImpl::DelayWrite(uint64_t expiration_time) {
StopWatch sw(env_, stats_, WRITE_STALL);
bool has_timeout = (expiration_time > 0);
auto delay = write_controller_.GetDelay();
if (write_controller_.IsStopped() == false && delay > 0) {
mutex_.Unlock();
env_->SleepForMicroseconds(delay);
mutex_.Lock();
}
while (write_controller_.IsStopped()) {
if (has_timeout) {
bg_cv_.TimedWait(expiration_time);
if (env_->NowMicros() > expiration_time) {
break;
}
} else {
bg_cv_.Wait();
}
}
}
// REQUIRES: mutex_ is held
@ -4228,16 +4223,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd,
uint64_t expiration_time) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = true;
bool allow_hard_rate_limit_delay = true;
bool allow_soft_rate_limit_delay = true;
uint64_t rate_limit_delay_millis = 0;
Status s;
double score;
// Once we schedule background work, we shouldn't schedule it again, since it
// might generate a tight feedback loop, constantly scheduling more background
// work, even if additional background work is not needed
bool schedule_background_work = true;
bool has_timeout = (expiration_time > 0);
while (true) {
@ -4248,111 +4234,9 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd,
} else if (has_timeout && env_->NowMicros() > expiration_time) {
s = Status::TimedOut();
break;
} else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 0-1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
uint64_t slowdown =
SlowdownAmount(cfd->current()->NumLevelFiles(0),
cfd->options()->level0_slowdown_writes_trigger,
cfd->options()->level0_stop_writes_trigger);
mutex_.Unlock();
uint64_t delayed;
{
StopWatch sw(env_, stats_, STALL_L0_SLOWDOWN_COUNT, &delayed);
env_->SleepForMicroseconds(slowdown);
}
RecordTick(stats_, STALL_L0_SLOWDOWN_MICROS, delayed);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
cfd->internal_stats()->AddCFStats(
InternalStats::LEVEL0_SLOWDOWN, delayed);
delayed_writes_++;
} else if (!cfd->mem()->ShouldFlush()) {
// There is room in current memtable
if (allow_delay) {
DelayLoggingAndReset();
}
break;
} else if (cfd->NeedWaitForNumMemtables()) {
// We have filled up the current memtable, but the previous
// ones are still being flushed, so we wait.
DelayLoggingAndReset();
Log(db_options_.info_log, "[%s] wait for memtable flush...\n",
cfd->GetName().c_str());
if (schedule_background_work) {
MaybeScheduleFlushOrCompaction();
schedule_background_work = false;
}
uint64_t stall;
{
StopWatch sw(env_, stats_, STALL_MEMTABLE_COMPACTION_COUNT, &stall);
if (!has_timeout) {
bg_cv_.Wait();
} else {
bg_cv_.TimedWait(expiration_time);
}
}
RecordTick(stats_, STALL_MEMTABLE_COMPACTION_MICROS, stall);
cfd->internal_stats()->AddCFStats(
InternalStats::MEMTABLE_COMPACTION, stall);
} else if (cfd->NeedWaitForNumLevel0Files()) {
DelayLoggingAndReset();
Log(db_options_.info_log, "[%s] wait for fewer level0 files...\n",
cfd->GetName().c_str());
uint64_t stall;
{
StopWatch sw(env_, stats_, STALL_L0_NUM_FILES_COUNT, &stall);
if (!has_timeout) {
bg_cv_.Wait();
} else {
bg_cv_.TimedWait(expiration_time);
}
}
RecordTick(stats_, STALL_L0_NUM_FILES_MICROS, stall);
cfd->internal_stats()->AddCFStats(
InternalStats::LEVEL0_NUM_FILES, stall);
} else if (allow_hard_rate_limit_delay && cfd->ExceedsHardRateLimit()) {
// Delay a write when the compaction score for any level is too large.
const int max_level = cfd->current()->MaxCompactionScoreLevel();
score = cfd->current()->MaxCompactionScore();
mutex_.Unlock();
uint64_t delayed;
{
StopWatch sw(env_, stats_, HARD_RATE_LIMIT_DELAY_COUNT, &delayed);
env_->SleepForMicroseconds(1000);
}
// Make sure the following value doesn't round to zero.
uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1);
rate_limit_delay_millis += rate_limit;
RecordTick(stats_, RATE_LIMIT_DELAY_MILLIS, rate_limit);
if (cfd->options()->rate_limit_delay_max_milliseconds > 0 &&
rate_limit_delay_millis >=
(unsigned)cfd->options()->rate_limit_delay_max_milliseconds) {
allow_hard_rate_limit_delay = false;
}
mutex_.Lock();
cfd->internal_stats()->RecordLevelNSlowdown(max_level, delayed, false);
} else if (allow_soft_rate_limit_delay && cfd->ExceedsSoftRateLimit()) {
const int max_level = cfd->current()->MaxCompactionScoreLevel();
score = cfd->current()->MaxCompactionScore();
// Delay a write when the compaction score for any level is too large.
// TODO: add statistics
uint64_t slowdown = SlowdownAmount(score, cfd->options()->soft_rate_limit,
cfd->options()->hard_rate_limit);
uint64_t elapsed = 0;
mutex_.Unlock();
{
StopWatch sw(env_, stats_, SOFT_RATE_LIMIT_DELAY_COUNT, &elapsed);
env_->SleepForMicroseconds(slowdown);
rate_limit_delay_millis += slowdown;
}
allow_soft_rate_limit_delay = false;
mutex_.Lock();
cfd->internal_stats()->RecordLevelNSlowdown(max_level, elapsed, true);
} else {
s = SetNewMemtableAndNewLogFile(cfd, context);
if (!s.ok()) {
@ -4383,7 +4267,6 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
mutex_.Unlock();
Status s;
{
DelayLoggingAndReset();
if (creating_new_log) {
s = env_->NewWritableFile(
LogFileName(db_options_.wal_dir, new_log_number),
@ -4595,13 +4478,6 @@ void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,
}
}
inline void DBImpl::DelayLoggingAndReset() {
if (delayed_writes_ > 0) {
Log(db_options_.info_log, "delayed %d write...\n", delayed_writes_);
delayed_writes_ = 0;
}
}
#ifndef ROCKSDB_LITE
Status DBImpl::GetUpdatesSince(
SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,

@ -32,6 +32,7 @@
#include "util/thread_local.h"
#include "util/scoped_arena_iterator.h"
#include "db/internal_stats.h"
#include "db/write_controller.h"
namespace rocksdb {
@ -357,9 +358,6 @@ class DBImpl : public DB {
Status WriteLevel0Table(ColumnFamilyData* cfd, autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber,
LogBuffer* log_buffer);
uint64_t SlowdownAmount(int n, double bottom, double top);
// Information kept for every waiting writer
struct Writer {
Status status;
@ -399,8 +397,9 @@ class DBImpl : public DB {
// See also: BeginWrite
void EndWrite(Writer* w, Writer* last_writer, Status status);
Status MakeRoomForWrite(ColumnFamilyData* cfd,
WriteContext* context,
void DelayWrite(uint64_t expiration_time);
Status MakeRoomForWrite(ColumnFamilyData* cfd, WriteContext* context,
uint64_t expiration_time);
Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
@ -557,6 +556,8 @@ class DBImpl : public DB {
std::deque<Writer*> writers_;
WriteBatch tmp_batch_;
WriteController write_controller_;
SnapshotList snapshots_;
// cache for ReadFirstRecord() calls
@ -628,9 +629,6 @@ class DBImpl : public DB {
static const uint64_t kNoTimeOut = std::numeric_limits<uint64_t>::max();
std::string db_absolute_path_;
// count of the number of contiguous delaying writes
int delayed_writes_;
// The options to access storage files
const EnvOptions env_options_;
@ -647,9 +645,6 @@ class DBImpl : public DB {
DBImpl(const DBImpl&);
void operator=(const DBImpl&);
// dump the delayed_writes_ to the log file and reset counter.
void DelayLoggingAndReset();
// Return the earliest snapshot where seqno is visible.
// Store the snapshot right before that, if any, in prev_snapshot
inline SequenceNumber findEarliestVisibleSnapshot(

@ -21,6 +21,7 @@ std::string MakeKey(unsigned int num) {
void BM_LogAndApply(int iters, int num_base_files) {
VersionSet* vset;
WriteController wc;
ColumnFamilyData* default_cfd;
uint64_t fnum = 1;
port::Mutex mu;
@ -47,7 +48,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
options.db_paths.emplace_back(dbname, 0);
// The parameter of table cache is passed in as null, so any file I/O
// operation is likely to fail.
vset = new VersionSet(dbname, &options, sopt, nullptr);
vset = new VersionSet(dbname, &options, sopt, nullptr, &wc);
std::vector<ColumnFamilyDescriptor> dummy;
dummy.push_back(ColumnFamilyDescriptor());
ASSERT_OK(vset->Recover(dummy));
@ -69,6 +70,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
vedit.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1);
vset->LogAndApply(default_cfd, &vedit, &mu);
}
delete vset;
}
BENCHMARK_NAMED_PARAM(BM_LogAndApply, 1000_iters_1_file, 1000, 1)

@ -1706,9 +1706,10 @@ class VersionSet::Builder {
};
VersionSet::VersionSet(const std::string& dbname, const DBOptions* options,
const EnvOptions& storage_options, Cache* table_cache)
const EnvOptions& storage_options, Cache* table_cache,
WriteController* write_controller)
: column_family_set_(new ColumnFamilySet(dbname, options, storage_options,
table_cache)),
table_cache, write_controller)),
env_(options->env),
dbname_(dbname),
options_(options),
@ -2411,7 +2412,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
std::shared_ptr<Cache> tc(NewLRUCache(
options->max_open_files - 10, options->table_cache_numshardbits,
options->table_cache_remove_scan_count_limit));
VersionSet versions(dbname, options, storage_options, tc.get());
WriteController wc;
VersionSet versions(dbname, options, storage_options, tc.get(), &wc);
Status status;
std::vector<ColumnFamilyDescriptor> dummy;

@ -34,6 +34,7 @@
#include "db/column_family.h"
#include "db/log_reader.h"
#include "db/file_indexer.h"
#include "db/write_controller.h"
namespace rocksdb {
@ -321,8 +322,8 @@ class Version {
// These are used to pick the best compaction level
std::vector<double> compaction_score_;
std::vector<int> compaction_level_;
double max_compaction_score_; // max score in l1 to ln-1
int max_compaction_score_level_; // level on which max score occurs
double max_compaction_score_ = 0.0; // max score in l1 to ln-1
int max_compaction_score_level_ = 0; // level on which max score occurs
// A version number that uniquely represents this version. This is
// used for debugging and logging purposes only.
@ -357,7 +358,8 @@ class Version {
class VersionSet {
public:
VersionSet(const std::string& dbname, const DBOptions* options,
const EnvOptions& storage_options, Cache* table_cache);
const EnvOptions& storage_options, Cache* table_cache,
WriteController* write_controller);
~VersionSet();
// Apply *edit to the current version to form a new descriptor that

@ -0,0 +1,37 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/write_controller.h"
#include <cassert>
namespace rocksdb {
std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
++total_stopped_;
return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
}
std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
uint64_t delay_us) {
total_delay_us_ += delay_us;
return std::unique_ptr<WriteControllerToken>(
new DelayWriteToken(this, delay_us));
}
bool WriteController::IsStopped() const { return total_stopped_ > 0; }
uint64_t WriteController::GetDelay() const { return total_delay_us_; }
StopWriteToken::~StopWriteToken() {
assert(controller_->total_stopped_ >= 1);
--controller_->total_stopped_;
}
DelayWriteToken::~DelayWriteToken() {
assert(controller_->total_delay_us_ >= delay_us_);
controller_->total_delay_us_ -= delay_us_;
}
} // namespace rocksdb

@ -0,0 +1,78 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <stdint.h>
#include <memory>
namespace rocksdb {
class WriteControllerToken;
// WriteController is controlling write stalls in our write code-path. Write
// stalls happen when compaction can't keep up with write rate.
// All of the methods here (including WriteControllerToken's destructors) need
// to be called while holding DB mutex
class WriteController {
public:
WriteController() : total_stopped_(0), total_delay_us_(0) {}
~WriteController() = default;
// When an actor (column family) requests a stop token, all writes will be
// stopped until the stop token is released (deleted)
std::unique_ptr<WriteControllerToken> GetStopToken();
// When an actor (column family) requests a delay token, total delay for all
// writes will be increased by delay_us. The delay will last until delay token
// is released
std::unique_ptr<WriteControllerToken> GetDelayToken(uint64_t delay_us);
// these two metods are querying the state of the WriteController
bool IsStopped() const;
uint64_t GetDelay() const;
private:
friend class WriteControllerToken;
friend class StopWriteToken;
friend class DelayWriteToken;
int total_stopped_;
uint64_t total_delay_us_;
};
class WriteControllerToken {
public:
explicit WriteControllerToken(WriteController* controller)
: controller_(controller) {}
virtual ~WriteControllerToken() = default;
protected:
WriteController* controller_;
private:
// no copying allowed
WriteControllerToken(const WriteControllerToken&) = delete;
void operator=(const WriteControllerToken&) = delete;
};
class StopWriteToken : public WriteControllerToken {
public:
explicit StopWriteToken(WriteController* controller)
: WriteControllerToken(controller) {}
~StopWriteToken();
};
class DelayWriteToken : public WriteControllerToken {
public:
DelayWriteToken(WriteController* controller, uint64_t delay_us)
: WriteControllerToken(controller), delay_us_(delay_us) {}
~DelayWriteToken();
private:
uint64_t delay_us_;
};
} // namespace rocksdb

@ -0,0 +1,40 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#include "db/write_controller.h"
#include "util/testharness.h"
namespace rocksdb {
class WriteControllerTest {};
TEST(WriteControllerTest, SanityTest) {
WriteController controller;
auto stop_token_1 = controller.GetStopToken();
auto stop_token_2 = controller.GetStopToken();
ASSERT_EQ(true, controller.IsStopped());
stop_token_1.reset();
ASSERT_EQ(true, controller.IsStopped());
stop_token_2.reset();
ASSERT_EQ(false, controller.IsStopped());
auto delay_token_1 = controller.GetDelayToken(5);
ASSERT_EQ(static_cast<uint64_t>(5), controller.GetDelay());
auto delay_token_2 = controller.GetDelayToken(8);
ASSERT_EQ(static_cast<uint64_t>(13), controller.GetDelay());
delay_token_2.reset();
ASSERT_EQ(static_cast<uint64_t>(5), controller.GetDelay());
delay_token_1.reset();
ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay());
delay_token_1.reset();
ASSERT_EQ(false, controller.IsStopped());
}
} // namespace rocksdb
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }

@ -347,9 +347,7 @@ struct ColumnFamilyOptions {
// Default: 0 (disabled)
double hard_rate_limit;
// Max time a put will be stalled when hard_rate_limit is enforced. If 0, then
// there is no limit.
// Default: 1000
// DEPRECATED -- this options is no longer used
unsigned int rate_limit_delay_max_milliseconds;
// size of one block in arena memory allocation.

@ -212,7 +212,6 @@ enum Histograms : uint32_t {
READ_BLOCK_COMPACTION_MICROS,
READ_BLOCK_GET_MICROS,
WRITE_RAW_BLOCK_MICROS,
STALL_L0_SLOWDOWN_COUNT,
STALL_MEMTABLE_COMPACTION_COUNT,
STALL_L0_NUM_FILES_COUNT,
@ -220,6 +219,7 @@ enum Histograms : uint32_t {
SOFT_RATE_LIMIT_DELAY_COUNT,
NUM_FILES_IN_SINGLE_COMPACTION,
DB_SEEK,
WRITE_STALL,
HISTOGRAM_ENUM_MAX,
};

@ -564,7 +564,8 @@ void ManifestDumpCommand::DoCommand() {
// if VersionSet::DumpManifest() depends on any option done by
// SanitizeOptions(), we need to initialize it manually.
options.db_paths.emplace_back("dummy", 0);
VersionSet versions(dbname, &options, sopt, tc.get());
WriteController wc;
VersionSet versions(dbname, &options, sopt, tc.get(), &wc);
Status s = versions.DumpManifest(options, file, verbose_, is_key_hex_);
if (!s.ok()) {
printf("Error in processing file %s %s\n", manifestfile.c_str(),
@ -1089,7 +1090,8 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
NewLRUCache(opt.max_open_files - 10, opt.table_cache_numshardbits,
opt.table_cache_remove_scan_count_limit));
const InternalKeyComparator cmp(opt.comparator);
VersionSet versions(db_path_, &opt, soptions, tc.get());
WriteController wc;
VersionSet versions(db_path_, &opt, soptions, tc.get(), &wc);
std::vector<ColumnFamilyDescriptor> dummy;
ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
ColumnFamilyOptions(opt));

Loading…
Cancel
Save