WriteOptions.low_pri which can throttle low pri writes if needed

Summary:
If ReadOptions.low_pri=true and compaction is behind, the write will either return immediate or be slowed down based on ReadOptions.no_slowdown.
Closes https://github.com/facebook/rocksdb/pull/2369

Differential Revision: D5127619

Pulled By: siying

fbshipit-source-id: d30e1cff515890af0eff32dfb869d2e4c9545eb0
main
Siying Dong 8 years ago committed by Facebook Github Bot
parent 26a8a80711
commit 52a7f38b19
  1. 1
      HISTORY.md
  2. 6
      db/column_family.cc
  3. 7
      db/db_impl.cc
  4. 5
      db/db_impl.h
  5. 38
      db/db_impl_write.cc
  6. 1
      db/db_test.cc
  7. 50
      db/db_test2.cc
  8. 8
      db/write_controller.cc
  9. 16
      db/write_controller.h
  10. 12
      include/rocksdb/options.h
  11. 2
      util/rate_limiter.cc

@ -10,6 +10,7 @@
* Change ticker/histogram statistics implementations to use core-local storage. This improves aggregation speed compared to our previous thread-local approach, particularly for applications with many threads.
* Users can pass a cache object to write buffer manager, so that they can cap memory usage for memtable and block cache using one single limit.
* Flush will be triggered when 7/8 of the limit introduced by write_buffer_manager or db_write_buffer_size is triggered, so that the hard threshold is hard to hit.
* Introduce WriteOptions.low_pri. If it is true, low priority writes will be throttled if the compaction is behind.
## 5.5.0 (05/17/2017)
### New Features

@ -760,6 +760,12 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
uint64_t write_rate = write_controller->delayed_write_rate();
write_controller->set_delayed_write_rate(static_cast<uint64_t>(
static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
// Set the low pri limit to be 1/4 the delayed write rate.
// Note we don't reset this value even after delay condition is relased.
// Low-pri rate will continue to apply if there is a compaction
// pressure.
write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
4);
}
}
prev_compaction_needed_bytes_ = compaction_needed_bytes;

@ -134,6 +134,8 @@ void DumpSupportInfo(Logger* logger) {
ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %d",
crc32c::IsFastCrc32Supported());
}
int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024;
} // namespace
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
@ -159,6 +161,11 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
write_thread_(immutable_db_options_),
write_controller_(mutable_db_options_.delayed_write_rate),
// Use delayed_write_rate as a base line to determine the initial
// low pri write rate limit. It may be adjusted later.
low_pri_write_rate_limiter_(NewGenericRateLimiter(std::min(
static_cast<int64_t>(mutable_db_options_.delayed_write_rate / 8),
kDefaultLowPriThrottledRate))),
last_batch_group_size_(0),
unscheduled_flushes_(0),
unscheduled_compactions_(0),

@ -722,6 +722,9 @@ class DBImpl : public DB {
// `num_bytes` going through.
Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);
Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
WriteBatch* my_batch);
Status ScheduleFlushes(WriteContext* context);
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
@ -941,6 +944,8 @@ class DBImpl : public DB {
WriteController write_controller_;
unique_ptr<RateLimiter> low_pri_write_rate_limiter_;
// Size of the last batch group. In slowdown mode, next write needs to
// sleep if it uses up the quota.
uint64_t last_batch_group_size_;

@ -66,13 +66,19 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return Status::Corruption("Batch is nullptr!");
}
Status status;
if (write_options.low_pri) {
status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
if (!status.ok()) {
return status;
}
}
if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable);
}
Status status;
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable);
@ -742,6 +748,34 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
return bg_error_;
}
Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
WriteBatch* my_batch) {
assert(write_options.low_pri);
// This is called outside the DB mutex. Although it is safe to make the call,
// the consistency condition is not guaranteed to hold. It's OK to live with
// it in this case.
// If we need to speed compaction, it means the compaction is left behind
// and we start to limit low pri writes to a limit.
if (write_controller_.NeedSpeedupCompaction()) {
if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) {
// For 2PC, we only rate limit prepare, not commit.
return Status::OK();
}
if (write_options.no_slowdown) {
return Status::Incomplete();
} else {
assert(my_batch != nullptr);
// Rate limit those writes. The reason that we don't completely wait
// is that in case the write is heavy, low pri writes may never have
// a chance to run. Now we guarantee we are still slowly making
// progress.
write_controller_.low_pri_rate_limiter()->Request(my_batch->GetDataSize(),
Env::IO_HIGH, nullptr);
}
}
return Status::OK();
}
Status DBImpl::ScheduleFlushes(WriteContext* context) {
ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {

@ -5155,7 +5155,6 @@ TEST_F(DBTest, PauseBackgroundWorkTest) {
// now it's done
ASSERT_TRUE(done.load());
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -2186,6 +2186,56 @@ TEST_F(DBTest2, MemtableOnlyIterator) {
ASSERT_EQ(1, count);
delete it;
}
TEST_F(DBTest2, LowPriWrite) {
Options options = CurrentOptions();
// Compaction pressure should trigger since 6 files
options.level0_file_num_compaction_trigger = 4;
options.level0_slowdown_writes_trigger = 12;
options.level0_stop_writes_trigger = 30;
options.delayed_write_rate = 8 * 1024 * 1024;
Reopen(options);
std::atomic<int> rate_limit_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"GenericRateLimiter::Request:1", [&](void* arg) {
rate_limit_count.fetch_add(1);
int64_t* rate_bytes_per_sec = static_cast<int64_t*>(arg);
ASSERT_EQ(1024 * 1024, *rate_bytes_per_sec);
});
// Block compaction
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DBTest.LowPriWrite:0", "DBImpl::BGWorkCompaction"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wo;
for (int i = 0; i < 6; i++) {
wo.low_pri = false;
Put("", "", wo);
wo.low_pri = true;
Put("", "", wo);
Flush();
}
ASSERT_EQ(0, rate_limit_count.load());
wo.low_pri = true;
Put("", "", wo);
ASSERT_EQ(1, rate_limit_count.load());
wo.low_pri = false;
Put("", "", wo);
ASSERT_EQ(1, rate_limit_count.load());
TEST_SYNC_POINT("DBTest.LowPriWrite:0");
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
dbfull()->TEST_WaitForCompact();
wo.low_pri = true;
Put("", "", wo);
ASSERT_EQ(1, rate_limit_count.load());
wo.low_pri = false;
Put("", "", wo);
ASSERT_EQ(1, rate_limit_count.load());
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -36,17 +36,19 @@ WriteController::GetCompactionPressureToken() {
new CompactionPressureToken(this));
}
bool WriteController::IsStopped() const { return total_stopped_ > 0; }
bool WriteController::IsStopped() const {
return total_stopped_.load(std::memory_order_relaxed) > 0;
}
// This is inside DB mutex, so we can't sleep and need to minimize
// frequency to get time.
// If it turns out to be a performance issue, we can redesign the thread
// synchronization model here.
// The function trust caller will sleep micros returned.
uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
if (total_stopped_ > 0) {
if (total_stopped_.load(std::memory_order_relaxed) > 0) {
return 0;
}
if (total_delayed_.load() == 0) {
if (total_delayed_.load(std::memory_order_relaxed) == 0) {
return 0;
}

@ -11,6 +11,7 @@
#include <atomic>
#include <memory>
#include "rocksdb/rate_limiter.h"
namespace rocksdb {
@ -23,12 +24,15 @@ class WriteControllerToken;
// to be called while holding DB mutex
class WriteController {
public:
explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u)
explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u,
int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
: total_stopped_(0),
total_delayed_(0),
total_compaction_pressure_(0),
bytes_left_(0),
last_refill_time_(0) {
last_refill_time_(0),
low_pri_rate_limiter_(
NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
set_max_delayed_write_rate(_delayed_write_rate);
}
~WriteController() = default;
@ -80,6 +84,8 @@ class WriteController {
uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
private:
uint64_t NowMicrosMonotonic(Env* env);
@ -88,15 +94,17 @@ class WriteController {
friend class DelayWriteToken;
friend class CompactionPressureToken;
int total_stopped_;
std::atomic<int> total_stopped_;
std::atomic<int> total_delayed_;
int total_compaction_pressure_;
std::atomic<int> total_compaction_pressure_;
uint64_t bytes_left_;
uint64_t last_refill_time_;
// write rate set when initialization or by `DBImpl::SetDBOptions`
uint64_t max_delayed_write_rate_;
// current write rate
uint64_t delayed_write_rate_;
std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
};
class WriteControllerToken {

@ -1079,11 +1079,21 @@ struct WriteOptions {
// immediately with Status::Incomplete().
bool no_slowdown;
// If true, this write request is of lower priority if compaction is
// behind. In this case, no_slowdown = true, the request will be cancelled
// immediately with Status::Incomplete() returned. Otherwise, it will be
// slowed down. The slowdown value is determined by RocksDB to guarantee
// it introduces minimum impacts to high priority writes.
//
// Default: false
bool low_pri;
WriteOptions()
: sync(false),
disableWAL(false),
ignore_missing_column_families(false),
no_slowdown(false) {}
no_slowdown(false),
low_pri(false) {}
};
// Options that control flush operations

@ -79,6 +79,8 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
Statistics* stats) {
assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));
TEST_SYNC_POINT("GenericRateLimiter::Request");
TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
&rate_bytes_per_sec_);
MutexLock g(&request_mutex_);
if (stop_) {
return;

Loading…
Cancel
Save