integrate rate limiter into rocksdb

Summary:
Add a rate_limiter option and plug the rate limiter into PosixWritableFile. The
rate limiter applies only to flush and compaction writes; WAL and MANIFEST
writes are excluded from this enforcement.
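
For reference, enabling the limiter from application code looks roughly like the
minimal sketch below. It is based on the new DBOptions::rate_limiter field and the
NewRateLimiter() factory exercised in db_test; the 10 MB/s budget and the database
path are arbitrary example values, and the include path assumes the
util/rate_limiter.h location used by this patch.

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"
    #include "util/rate_limiter.h"  // NewRateLimiter() lives here in this patch

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      // Budget ~10 MB/s (example value) for flush and compaction writes combined;
      // flush (Env::IO_HIGH) is served before compaction (Env::IO_LOW).
      options.rate_limiter.reset(rocksdb::NewRateLimiter(10 << 20));

      rocksdb::DB* db = nullptr;
      rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/rate_limited_db", &db);
      if (s.ok()) {
        // ... write workload; WAL and MANIFEST writes are not throttled ...
        delete db;
      }
      return 0;
    }

Internally the shared_ptr in DBOptions is passed down as a raw pointer through
EnvOptions, so only files that call SetIOPriority() (memtable flush and compaction
outputs) go through the limiter.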

Test Plan: db_test

Reviewers: igor, yhchiang, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D19425
Branch: main
Author: Lei Jin
Parent: 5ef1ba7ff5
Commit: 534357ca3a
 HISTORY.md                |  1
 db/builder.cc             |  4
 db/builder.h              |  4
 db/db_impl.cc             |  5
 db/db_test.cc             | 70
 include/rocksdb/env.h     | 20
 include/rocksdb/options.h | 18
 util/env.cc               |  1
 util/env_posix.cc         | 22
 util/options.cc           |  2
 10 files changed

--- a/HISTORY.md
+++ b/HISTORY.md
@@ -6,6 +6,7 @@
 * HashLinklist reduces performance outlier caused by skewed bucket by switching data in the bucket from linked list to skip list. Add parameter threshold_use_skiplist in NewHashLinkListRepFactory().
 * RocksDB is now able to reclaim storage space more effectively during the compaction process. This is done by compensating the size of each deletion entry by the 2X average value size, which makes compaction to be triggerred by deletion entries more easily.
 * Add TimeOut API to write. Now WriteOptions have a variable called timeout_hint_us. With timeout_hint_us set to non-zero, any write associated with this timeout_hint_us may be aborted when it runs longer than the specified timeout_hint_us, and it is guaranteed that any write completes earlier than the specified time-out will not be aborted due to the time-out condition.
+* Add a rate_limiter option, which controls total throughput of flush and compaction. The throughput is specified in bytes/sec. Flush always has precedence over compaction when available bandwidth is constrained.
 
 ## 3.2.0 (06/20/2014)

--- a/db/builder.cc
+++ b/db/builder.cc
@@ -40,7 +40,8 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
                   const InternalKeyComparator& internal_comparator,
                   const SequenceNumber newest_snapshot,
                   const SequenceNumber earliest_seqno_in_memtable,
-                  const CompressionType compression) {
+                  const CompressionType compression,
+                  const Env::IOPriority io_priority) {
   Status s;
   meta->fd.file_size = 0;
   meta->smallest_seqno = meta->largest_seqno = 0;
@@ -62,6 +63,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
     if (!s.ok()) {
       return s;
     }
+    file->SetIOPriority(io_priority);
     TableBuilder* builder =
         NewTableBuilder(options, internal_comparator, file.get(), compression);

--- a/db/builder.h
+++ b/db/builder.h
@@ -7,6 +7,7 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 #pragma once
 #include "rocksdb/comparator.h"
+#include "rocksdb/env.h"
 #include "rocksdb/status.h"
 #include "rocksdb/types.h"
 #include "rocksdb/options.h"
@@ -40,6 +41,7 @@ extern Status BuildTable(const std::string& dbname, Env* env,
                          const InternalKeyComparator& internal_comparator,
                          const SequenceNumber newest_snapshot,
                          const SequenceNumber earliest_seqno_in_memtable,
-                         const CompressionType compression);
+                         const CompressionType compression,
+                         const Env::IOPriority io_priority = Env::IO_HIGH);
 
 } // namespace rocksdb

--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1406,7 +1406,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
     s = BuildTable(dbname_, env_, *cfd->options(), storage_options_,
                    cfd->table_cache(), iter, &meta, cfd->internal_comparator(),
                    newest_snapshot, earliest_seqno_in_memtable,
-                   GetCompressionFlush(*cfd->options()));
+                   GetCompressionFlush(*cfd->options()), Env::IO_HIGH);
     LogFlush(options_.info_log);
     mutex_.Lock();
   }
@@ -1473,7 +1473,7 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
     s = BuildTable(dbname_, env_, *cfd->options(), storage_options_,
                    cfd->table_cache(), iter, &meta, cfd->internal_comparator(),
                    newest_snapshot, earliest_seqno_in_memtable,
-                   GetCompressionFlush(*cfd->options()));
+                   GetCompressionFlush(*cfd->options()), Env::IO_HIGH);
     LogFlush(options_.info_log);
     delete iter;
     Log(options_.info_log,
@@ -2385,6 +2385,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
   Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_);
   if (s.ok()) {
+    compact->outfile->SetIOPriority(Env::IO_LOW);
     compact->outfile->SetPreallocationBlockSize(
         compact->compaction->OutputFilePreallocationSize());

--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -27,6 +27,7 @@
 #include "rocksdb/slice.h"
 #include "rocksdb/slice_transform.h"
 #include "rocksdb/table.h"
+#include "rocksdb/options.h"
 #include "rocksdb/table_properties.h"
 #include "table/block_based_table_factory.h"
 #include "table/plain_table_factory.h"
@@ -35,6 +36,7 @@
 #include "utilities/merge_operators.h"
 #include "util/logging.h"
 #include "util/mutexlock.h"
+#include "util/rate_limiter.h"
 #include "util/statistics.h"
 #include "util/testharness.h"
 #include "util/sync_point.h"
@@ -135,6 +137,8 @@ class SpecialEnv : public EnvWrapper {
   anon::AtomicCounter sleep_counter_;
+  std::atomic<int64_t> bytes_written_;
+
   explicit SpecialEnv(Env* base) : EnvWrapper(base) {
     delay_sstable_sync_.Release_Store(nullptr);
     no_space_.Release_Store(nullptr);
@@ -144,6 +148,7 @@ class SpecialEnv : public EnvWrapper {
     manifest_sync_error_.Release_Store(nullptr);
     manifest_write_error_.Release_Store(nullptr);
     log_write_error_.Release_Store(nullptr);
+    bytes_written_ = 0;
   }
 
   Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
@@ -163,6 +168,7 @@ class SpecialEnv : public EnvWrapper {
         // Drop writes on the floor
         return Status::OK();
       } else {
+        env_->bytes_written_ += data.size();
         return base_->Append(data);
       }
     }
@@ -174,6 +180,9 @@ class SpecialEnv : public EnvWrapper {
       }
       return base_->Sync();
     }
+    void SetIOPriority(Env::IOPriority pri) {
+      base_->SetIOPriority(pri);
+    }
   };
 
   class ManifestFile : public WritableFile {
    private:
@@ -7124,6 +7133,67 @@ TEST(DBTest, MTRandomTimeoutTest) {
 }  // anonymous namespace
 
+TEST(DBTest, RateLimitingTest) {
+  Options options = CurrentOptions();
+  options.write_buffer_size = 1 << 20;          // 1MB
+  options.level0_file_num_compaction_trigger = 10;
+  options.target_file_size_base = 1 << 20;      // 1MB
+  options.max_bytes_for_level_base = 10 << 20;  // 10MB
+  options.compression = kNoCompression;
+  options.create_if_missing = true;
+  options.env = env_;
+  DestroyAndReopen(&options);
+
+  // # no rate limiting
+  Random rnd(301);
+  uint64_t start = env_->NowMicros();
+  // Write ~32M data
+  for (int64_t i = 0; i < (32 << 10); ++i) {
+    ASSERT_OK(Put(std::to_string(i), RandomString(&rnd, (1 << 10) + 1)));
+  }
+  uint64_t elapsed = env_->NowMicros() - start;
+  double raw_rate = env_->bytes_written_ * 1000000 / elapsed;
+  Close();
+
+  // # rate limiting with 0.7 x threshold
+  options.rate_limiter.reset(
+      NewRateLimiter(static_cast<int64_t>(0.7 * raw_rate)));
+  env_->bytes_written_ = 0;
+  DestroyAndReopen(&options);
+
+  start = env_->NowMicros();
+  // Write ~32M data
+  for (int64_t i = 0; i < (32 << 10); ++i) {
+    ASSERT_OK(Put(std::to_string(i), RandomString(&rnd, (1 << 10) + 1)));
+  }
+  elapsed = env_->NowMicros() - start;
+  Close();
+  ASSERT_TRUE(options.rate_limiter->GetTotalBytesThrough() ==
+              env_->bytes_written_);
+  double ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
+  fprintf(stderr, "write rate ratio = %.2lf, expected 0.7\n", ratio);
+  ASSERT_TRUE(ratio > 0.6 && ratio < 0.8);
+
+  // # rate limiting with half of the raw_rate
+  options.rate_limiter.reset(
+      NewRateLimiter(static_cast<int64_t>(raw_rate / 2)));
+  env_->bytes_written_ = 0;
+  DestroyAndReopen(&options);
+
+  start = env_->NowMicros();
+  // Write ~32M data
+  for (int64_t i = 0; i < (32 << 10); ++i) {
+    ASSERT_OK(Put(std::to_string(i), RandomString(&rnd, (1 << 10) + 1)));
+  }
+  elapsed = env_->NowMicros() - start;
+  Close();
+  ASSERT_TRUE(options.rate_limiter->GetTotalBytesThrough() ==
+              env_->bytes_written_);
+  ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
+  fprintf(stderr, "write rate ratio = %.2lf, expected 0.5\n", ratio);
+  ASSERT_TRUE(ratio > 0.4 && ratio < 0.6);
+}
+
 } // namespace rocksdb
 
 int main(int argc, char** argv) {

--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -35,6 +35,7 @@ class WritableFile;
 class RandomRWFile;
 class Directory;
 struct DBOptions;
+class RateLimiter;
 
 using std::unique_ptr;
 using std::shared_ptr;
@@ -74,6 +75,9 @@ struct EnvOptions {
   // write. By default, we set it to true for MANIFEST writes and false for
   // WAL writes
   bool fallocate_with_keep_size = true;
+
+  // If not nullptr, write rate limiting is enabled for flush and compaction
+  RateLimiter* rate_limiter = nullptr;
 };
 
 class Env {
@@ -379,7 +383,10 @@ class RandomAccessFile {
 // at a time to the file.
 class WritableFile {
  public:
-  WritableFile() : last_preallocated_block_(0), preallocation_block_size_(0) {
+  WritableFile()
+      : last_preallocated_block_(0),
+        preallocation_block_size_(0),
+        io_priority_(Env::IO_TOTAL) {
   }
 
   virtual ~WritableFile();
@@ -398,6 +405,14 @@ class WritableFile {
     return Sync();
   }
 
+  /*
+   * Change the priority in rate limiter if rate limiting is enabled.
+   * If rate limiting is not enabled, this call has no effect.
+   */
+  virtual void SetIOPriority(Env::IOPriority pri) {
+    io_priority_ = pri;
+  }
+
   /*
    * Get the size of valid data in the file.
    */
@@ -482,6 +497,9 @@ class WritableFile {
   // No copying allowed
   WritableFile(const WritableFile&);
   void operator=(const WritableFile&);
+
+ protected:
+  Env::IOPriority io_priority_;
 };
 
 // A file abstraction for random reading and writing.

--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -39,8 +39,7 @@ class Slice;
 class SliceTransform;
 class Statistics;
 class InternalKeyComparator;
-
-using std::shared_ptr;
+class RateLimiter;
 
 // DB contents are stored in a set of blocks, each of which holds a
 // sequence of key,value pairs. Each block may be compressed before
@@ -133,7 +132,7 @@ struct ColumnFamilyOptions {
   // for the first time. It's necessary to specify a merge operator when
   // openning the DB in this case.
   // Default: nullptr
-  shared_ptr<MergeOperator> merge_operator;
+  std::shared_ptr<MergeOperator> merge_operator;
 
   // A single CompactionFilter instance to call into during compaction.
   // Allows an application to modify/delete a key-value during background
@@ -206,12 +205,12 @@ struct ColumnFamilyOptions {
   // If non-NULL use the specified cache for blocks.
   // If NULL, rocksdb will automatically create and use an 8MB internal cache.
   // Default: nullptr
-  shared_ptr<Cache> block_cache;
+  std::shared_ptr<Cache> block_cache;
 
   // If non-NULL use the specified cache for compressed blocks.
   // If NULL, rocksdb will not use a compressed block cache.
   // Default: nullptr
-  shared_ptr<Cache> block_cache_compressed;
+  std::shared_ptr<Cache> block_cache_compressed;
 
   // Approximate size of user data packed per block. Note that the
   // block size specified here corresponds to uncompressed data. The
@@ -626,11 +625,16 @@ struct DBOptions {
   // Default: Env::Default()
   Env* env;
 
+  // Use to control write rate of flush and compaction. Flush has higher
+  // priority than compaction. Rate limiting is disabled if nullptr.
+  // Default: nullptr
+  std::shared_ptr<RateLimiter> rate_limiter;
+
   // Any internal progress/error information generated by the db will
   // be written to info_log if it is non-nullptr, or to a file stored
   // in the same directory as the DB contents if info_log is nullptr.
   // Default: nullptr
-  shared_ptr<Logger> info_log;
+  std::shared_ptr<Logger> info_log;
 
   InfoLogLevel info_log_level;
@@ -653,7 +657,7 @@ struct DBOptions {
   // If non-null, then we should collect metrics about database operations
   // Statistics objects should not be shared between DB instances as
   // it does not use any locks to prevent concurrent updates.
-  shared_ptr<Statistics> statistics;
+  std::shared_ptr<Statistics> statistics;
 
   // If true, then the contents of data files are not synced
   // to stable storage. Their contents remain in the OS buffers till the

--- a/util/env.cc
+++ b/util/env.cc
@@ -226,6 +226,7 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
   env_options->use_mmap_writes = options.allow_mmap_writes;
   env_options->set_fd_cloexec = options.is_fd_close_on_exec;
   env_options->bytes_per_sync = options.bytes_per_sync;
+  env_options->rate_limiter = options.rate_limiter.get();
 }
 }

--- a/util/env_posix.cc
+++ b/util/env_posix.cc
@@ -33,6 +33,8 @@
 #if defined(LEVELDB_PLATFORM_ANDROID)
 #include <sys/stat.h>
 #endif
+#include <signal.h>
+#include <algorithm>
 #include "rocksdb/env.h"
 #include "rocksdb/slice.h"
 #include "port/port.h"
@@ -41,7 +43,7 @@
 #include "util/posix_logger.h"
 #include "util/random.h"
 #include "util/iostats_context_imp.h"
-#include <signal.h>
+#include "util/rate_limiter.h"
 
 // Get nano time for mach systems
 #ifdef __MACH__
@@ -634,6 +636,7 @@ class PosixWritableFile : public WritableFile {
 #ifdef ROCKSDB_FALLOCATE_PRESENT
   bool fallocate_with_keep_size_;
 #endif
+  RateLimiter* rate_limiter_;
 
  public:
   PosixWritableFile(const std::string& fname, int fd, size_t capacity,
@@ -647,7 +650,8 @@ class PosixWritableFile : public WritableFile {
         pending_sync_(false),
         pending_fsync_(false),
         last_sync_size_(0),
-        bytes_per_sync_(options.bytes_per_sync) {
+        bytes_per_sync_(options.bytes_per_sync),
+        rate_limiter_(options.rate_limiter) {
 #ifdef ROCKSDB_FALLOCATE_PRESENT
     fallocate_with_keep_size_ = options.fallocate_with_keep_size;
 #endif
@@ -691,7 +695,7 @@ class PosixWritableFile : public WritableFile {
       cursize_ += left;
     } else {
       while (left != 0) {
-        ssize_t done = write(fd_, src, left);
+        ssize_t done = write(fd_, src, RequestToken(left));
         if (done < 0) {
           if (errno == EINTR) {
             continue;
@@ -742,7 +746,7 @@ class PosixWritableFile : public WritableFile {
     size_t left = cursize_;
     char* src = buf_.get();
     while (left != 0) {
-      ssize_t done = write(fd_, src, left);
+      ssize_t done = write(fd_, src, RequestToken(left));
      if (done < 0) {
         if (errno == EINTR) {
           continue;
@@ -838,6 +842,16 @@ class PosixWritableFile : public WritableFile {
     return GetUniqueIdFromFile(fd_, id, max_size);
   }
 #endif
+
+ private:
+  inline size_t RequestToken(size_t bytes) {
+    if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) {
+      bytes = std::min(bytes,
+          static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
+      rate_limiter_->Request(bytes, io_priority_);
+    }
+    return bytes;
+  }
 };
 
 class PosixRandomRWFile : public RandomRWFile {

--- a/util/options.cc
+++ b/util/options.cc
@@ -166,6 +166,7 @@ DBOptions::DBOptions()
       error_if_exists(false),
       paranoid_checks(true),
       env(Env::Default()),
+      rate_limiter(nullptr),
       info_log(nullptr),
      info_log_level(INFO_LEVEL),
       max_open_files(5000),
@@ -206,6 +207,7 @@ DBOptions::DBOptions(const Options& options)
       error_if_exists(options.error_if_exists),
       paranoid_checks(options.paranoid_checks),
       env(options.env),
+      rate_limiter(options.rate_limiter),
       info_log(options.info_log),
       info_log_level(options.info_log_level),
       max_open_files(options.max_open_files),
