diff --git a/HISTORY.md b/HISTORY.md index a4ef5d659..a334de234 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * HashLinklist reduces performance outlier caused by skewed bucket by switching data in the bucket from linked list to skip list. Add parameter threshold_use_skiplist in NewHashLinkListRepFactory(). * RocksDB is now able to reclaim storage space more effectively during the compaction process. This is done by compensating the size of each deletion entry by the 2X average value size, which makes compaction to be triggerred by deletion entries more easily. * Add TimeOut API to write. Now WriteOptions have a variable called timeout_hint_us. With timeout_hint_us set to non-zero, any write associated with this timeout_hint_us may be aborted when it runs longer than the specified timeout_hint_us, and it is guaranteed that any write completes earlier than the specified time-out will not be aborted due to the time-out condition. +* Add a rate_limiter option, which controls total throughput of flush and compaction. The throughput is specified in bytes/sec. Flush always has precedence over compaction when available bandwidth is constrained. ## 3.2.0 (06/20/2014) diff --git a/db/builder.cc b/db/builder.cc index 3be61bd10..c84e92ec0 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -40,7 +40,8 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, const InternalKeyComparator& internal_comparator, const SequenceNumber newest_snapshot, const SequenceNumber earliest_seqno_in_memtable, - const CompressionType compression) { + const CompressionType compression, + const Env::IOPriority io_priority) { Status s; meta->fd.file_size = 0; meta->smallest_seqno = meta->largest_seqno = 0; @@ -62,6 +63,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, if (!s.ok()) { return s; } + file->SetIOPriority(io_priority); TableBuilder* builder = NewTableBuilder(options, internal_comparator, file.get(), compression); diff --git a/db/builder.h b/db/builder.h index 68eb3fc6f..f57501abd 100644 --- a/db/builder.h +++ b/db/builder.h @@ -7,6 +7,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once #include "rocksdb/comparator.h" +#include "rocksdb/env.h" #include "rocksdb/status.h" #include "rocksdb/types.h" #include "rocksdb/options.h" @@ -40,6 +41,7 @@ extern Status BuildTable(const std::string& dbname, Env* env, const InternalKeyComparator& internal_comparator, const SequenceNumber newest_snapshot, const SequenceNumber earliest_seqno_in_memtable, - const CompressionType compression); + const CompressionType compression, + const Env::IOPriority io_priority = Env::IO_HIGH); } // namespace rocksdb diff --git a/db/db_impl.cc b/db/db_impl.cc index 6862f9a77..2b96cdee5 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1406,7 +1406,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, s = BuildTable(dbname_, env_, *cfd->options(), storage_options_, cfd->table_cache(), iter, &meta, cfd->internal_comparator(), newest_snapshot, earliest_seqno_in_memtable, - GetCompressionFlush(*cfd->options())); + GetCompressionFlush(*cfd->options()), Env::IO_HIGH); LogFlush(options_.info_log); mutex_.Lock(); } @@ -1473,7 +1473,7 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, s = BuildTable(dbname_, env_, *cfd->options(), storage_options_, cfd->table_cache(), iter, &meta, cfd->internal_comparator(), newest_snapshot, earliest_seqno_in_memtable, - GetCompressionFlush(*cfd->options())); + GetCompressionFlush(*cfd->options()), Env::IO_HIGH); LogFlush(options_.info_log); delete iter; Log(options_.info_log, @@ -2385,6 +2385,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_); if (s.ok()) { + compact->outfile->SetIOPriority(Env::IO_LOW); compact->outfile->SetPreallocationBlockSize( compact->compaction->OutputFilePreallocationSize()); diff --git a/db/db_test.cc b/db/db_test.cc index 025e04f24..23f6bc1e2 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -27,6 +27,7 @@ #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" #include "rocksdb/table.h" +#include "rocksdb/options.h" #include "rocksdb/table_properties.h" #include "table/block_based_table_factory.h" #include "table/plain_table_factory.h" @@ -35,6 +36,7 @@ #include "utilities/merge_operators.h" #include "util/logging.h" #include "util/mutexlock.h" +#include "util/rate_limiter.h" #include "util/statistics.h" #include "util/testharness.h" #include "util/sync_point.h" @@ -135,6 +137,8 @@ class SpecialEnv : public EnvWrapper { anon::AtomicCounter sleep_counter_; + std::atomic bytes_written_; + explicit SpecialEnv(Env* base) : EnvWrapper(base) { delay_sstable_sync_.Release_Store(nullptr); no_space_.Release_Store(nullptr); @@ -144,7 +148,8 @@ class SpecialEnv : public EnvWrapper { manifest_sync_error_.Release_Store(nullptr); manifest_write_error_.Release_Store(nullptr); log_write_error_.Release_Store(nullptr); - } + bytes_written_ = 0; + } Status NewWritableFile(const std::string& f, unique_ptr* r, const EnvOptions& soptions) { @@ -163,6 +168,7 @@ class SpecialEnv : public EnvWrapper { // Drop writes on the floor return Status::OK(); } else { + env_->bytes_written_ += data.size(); return base_->Append(data); } } @@ -174,6 +180,9 @@ class SpecialEnv : public EnvWrapper { } return base_->Sync(); } + void SetIOPriority(Env::IOPriority pri) { + base_->SetIOPriority(pri); + } }; class ManifestFile : public WritableFile { private: @@ -7124,6 +7133,67 @@ TEST(DBTest, MTRandomTimeoutTest) { } // anonymous namespace +TEST(DBTest, RateLimitingTest) { + Options options = CurrentOptions(); + options.write_buffer_size = 1 << 20; // 1MB + options.level0_file_num_compaction_trigger = 10; + options.target_file_size_base = 1 << 20; // 1MB + options.max_bytes_for_level_base = 10 << 20; // 10MB + options.compression = kNoCompression; + options.create_if_missing = true; + options.env = env_; + DestroyAndReopen(&options); + + // # no rate limiting + Random rnd(301); + uint64_t start = env_->NowMicros(); + // Write ~32M data + for (int64_t i = 0; i < (32 << 10); ++i) { + ASSERT_OK(Put(std::to_string(i), RandomString(&rnd, (1 << 10) + 1))); + } + uint64_t elapsed = env_->NowMicros() - start; + double raw_rate = env_->bytes_written_ * 1000000 / elapsed; + Close(); + + // # rate limiting with 0.7 x threshold + options.rate_limiter.reset( + NewRateLimiter(static_cast(0.7 * raw_rate))); + env_->bytes_written_ = 0; + DestroyAndReopen(&options); + + start = env_->NowMicros(); + // Write ~32M data + for (int64_t i = 0; i < (32 << 10); ++i) { + ASSERT_OK(Put(std::to_string(i), RandomString(&rnd, (1 << 10) + 1))); + } + elapsed = env_->NowMicros() - start; + Close(); + ASSERT_TRUE(options.rate_limiter->GetTotalBytesThrough() == + env_->bytes_written_); + double ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate; + fprintf(stderr, "write rate ratio = %.2lf, expected 0.7\n", ratio); + ASSERT_TRUE(ratio > 0.6 && ratio < 0.8); + + // # rate limiting with half of the raw_rate + options.rate_limiter.reset( + NewRateLimiter(static_cast(raw_rate / 2))); + env_->bytes_written_ = 0; + DestroyAndReopen(&options); + + start = env_->NowMicros(); + // Write ~32M data + for (int64_t i = 0; i < (32 << 10); ++i) { + ASSERT_OK(Put(std::to_string(i), RandomString(&rnd, (1 << 10) + 1))); + } + elapsed = env_->NowMicros() - start; + Close(); + ASSERT_TRUE(options.rate_limiter->GetTotalBytesThrough() == + env_->bytes_written_); + ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate; + fprintf(stderr, "write rate ratio = %.2lf, expected 0.5\n", ratio); + ASSERT_TRUE(ratio > 0.4 && ratio < 0.6); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 9698c66ae..fc4665d90 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -35,6 +35,7 @@ class WritableFile; class RandomRWFile; class Directory; struct DBOptions; +class RateLimiter; using std::unique_ptr; using std::shared_ptr; @@ -74,6 +75,9 @@ struct EnvOptions { // write. By default, we set it to true for MANIFEST writes and false for // WAL writes bool fallocate_with_keep_size = true; + + // If not nullptr, write rate limiting is enabled for flush and compaction + RateLimiter* rate_limiter = nullptr; }; class Env { @@ -379,7 +383,10 @@ class RandomAccessFile { // at a time to the file. class WritableFile { public: - WritableFile() : last_preallocated_block_(0), preallocation_block_size_ (0) { + WritableFile() + : last_preallocated_block_(0), + preallocation_block_size_(0), + io_priority_(Env::IO_TOTAL) { } virtual ~WritableFile(); @@ -398,6 +405,14 @@ class WritableFile { return Sync(); } + /* + * Change the priority in rate limiter if rate limiting is enabled. + * If rate limiting is not enabled, this call has no effect. + */ + virtual void SetIOPriority(Env::IOPriority pri) { + io_priority_ = pri; + } + /* * Get the size of valid data in the file. */ @@ -482,6 +497,9 @@ class WritableFile { // No copying allowed WritableFile(const WritableFile&); void operator=(const WritableFile&); + + protected: + Env::IOPriority io_priority_; }; // A file abstraction for random reading and writing. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index f1f807c15..dea14f2a7 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -39,8 +39,7 @@ class Slice; class SliceTransform; class Statistics; class InternalKeyComparator; - -using std::shared_ptr; +class RateLimiter; // DB contents are stored in a set of blocks, each of which holds a // sequence of key,value pairs. Each block may be compressed before @@ -133,7 +132,7 @@ struct ColumnFamilyOptions { // for the first time. It's necessary to specify a merge operator when // openning the DB in this case. // Default: nullptr - shared_ptr merge_operator; + std::shared_ptr merge_operator; // A single CompactionFilter instance to call into during compaction. // Allows an application to modify/delete a key-value during background @@ -206,12 +205,12 @@ struct ColumnFamilyOptions { // If non-NULL use the specified cache for blocks. // If NULL, rocksdb will automatically create and use an 8MB internal cache. // Default: nullptr - shared_ptr block_cache; + std::shared_ptr block_cache; // If non-NULL use the specified cache for compressed blocks. // If NULL, rocksdb will not use a compressed block cache. // Default: nullptr - shared_ptr block_cache_compressed; + std::shared_ptr block_cache_compressed; // Approximate size of user data packed per block. Note that the // block size specified here corresponds to uncompressed data. The @@ -626,11 +625,16 @@ struct DBOptions { // Default: Env::Default() Env* env; + // Use to control write rate of flush and compaction. Flush has higher + // priority than compaction. Rate limiting is disabled if nullptr. + // Default: nullptr + std::shared_ptr rate_limiter; + // Any internal progress/error information generated by the db will // be written to info_log if it is non-nullptr, or to a file stored // in the same directory as the DB contents if info_log is nullptr. // Default: nullptr - shared_ptr info_log; + std::shared_ptr info_log; InfoLogLevel info_log_level; @@ -653,7 +657,7 @@ struct DBOptions { // If non-null, then we should collect metrics about database operations // Statistics objects should not be shared between DB instances as // it does not use any locks to prevent concurrent updates. - shared_ptr statistics; + std::shared_ptr statistics; // If true, then the contents of data files are not synced // to stable storage. Their contents remain in the OS buffers till the diff --git a/util/env.cc b/util/env.cc index 1c0cae4c3..91ae0784b 100644 --- a/util/env.cc +++ b/util/env.cc @@ -226,6 +226,7 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) { env_options->use_mmap_writes = options.allow_mmap_writes; env_options->set_fd_cloexec = options.is_fd_close_on_exec; env_options->bytes_per_sync = options.bytes_per_sync; + env_options->rate_limiter = options.rate_limiter.get(); } } diff --git a/util/env_posix.cc b/util/env_posix.cc index a73ec6b0e..dc8696e55 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -33,6 +33,8 @@ #if defined(LEVELDB_PLATFORM_ANDROID) #include #endif +#include +#include #include "rocksdb/env.h" #include "rocksdb/slice.h" #include "port/port.h" @@ -41,7 +43,7 @@ #include "util/posix_logger.h" #include "util/random.h" #include "util/iostats_context_imp.h" -#include +#include "util/rate_limiter.h" // Get nano time for mach systems #ifdef __MACH__ @@ -634,6 +636,7 @@ class PosixWritableFile : public WritableFile { #ifdef ROCKSDB_FALLOCATE_PRESENT bool fallocate_with_keep_size_; #endif + RateLimiter* rate_limiter_; public: PosixWritableFile(const std::string& fname, int fd, size_t capacity, @@ -647,7 +650,8 @@ class PosixWritableFile : public WritableFile { pending_sync_(false), pending_fsync_(false), last_sync_size_(0), - bytes_per_sync_(options.bytes_per_sync) { + bytes_per_sync_(options.bytes_per_sync), + rate_limiter_(options.rate_limiter) { #ifdef ROCKSDB_FALLOCATE_PRESENT fallocate_with_keep_size_ = options.fallocate_with_keep_size; #endif @@ -691,7 +695,7 @@ class PosixWritableFile : public WritableFile { cursize_ += left; } else { while (left != 0) { - ssize_t done = write(fd_, src, left); + ssize_t done = write(fd_, src, RequestToken(left)); if (done < 0) { if (errno == EINTR) { continue; @@ -742,7 +746,7 @@ class PosixWritableFile : public WritableFile { size_t left = cursize_; char* src = buf_.get(); while (left != 0) { - ssize_t done = write(fd_, src, left); + ssize_t done = write(fd_, src, RequestToken(left)); if (done < 0) { if (errno == EINTR) { continue; @@ -838,6 +842,16 @@ class PosixWritableFile : public WritableFile { return GetUniqueIdFromFile(fd_, id, max_size); } #endif + + private: + inline size_t RequestToken(size_t bytes) { + if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) { + bytes = std::min(bytes, + static_cast(rate_limiter_->GetSingleBurstBytes())); + rate_limiter_->Request(bytes, io_priority_); + } + return bytes; + } }; class PosixRandomRWFile : public RandomRWFile { diff --git a/util/options.cc b/util/options.cc index 88d26fa01..e37fe9e74 100644 --- a/util/options.cc +++ b/util/options.cc @@ -166,6 +166,7 @@ DBOptions::DBOptions() error_if_exists(false), paranoid_checks(true), env(Env::Default()), + rate_limiter(nullptr), info_log(nullptr), info_log_level(INFO_LEVEL), max_open_files(5000), @@ -206,6 +207,7 @@ DBOptions::DBOptions(const Options& options) error_if_exists(options.error_if_exists), paranoid_checks(options.paranoid_checks), env(options.env), + rate_limiter(options.rate_limiter), info_log(options.info_log), info_log_level(options.info_log_level), max_open_files(options.max_open_files),