Allow SstFileWriter to use the rate limiter

Summary:
The default IO priority of WritableFiles is IO_TOTAL, meaning that
they will bypass the rate limiter if it's passed in the options.

This change allows to pass an io priority in construction, so that by
setting IO_LOW or IO_HIGH the rate limit will be honored.

It also fixes a minor bug: SstFileWriter's copy and move constructor
are not disabled and incorrect, as any copy/move will result in a
double free. Switching to unique_ptr makes the object correctly
movable and non-copyable as expected.

Also fix minor style inconsistencies.
Closes https://github.com/facebook/rocksdb/pull/2335

Differential Revision: D5113260

Pulled By: sagar0

fbshipit-source-id: e084236e7ff0b50a56cbeceaa9fedd5e210bf9f8
main
Giuseppe Ottaviano 8 years ago committed by Facebook Github Bot
parent 6cc9aef162
commit 69ec8356b2
  1. 8
      include/rocksdb/env.h
  2. 22
      include/rocksdb/sst_file_writer.h
  3. 31
      table/sst_file_writer.cc

@ -57,10 +57,10 @@ const size_t kDefaultPageSize = 4 * 1024;
// Options while opening a file to read/write // Options while opening a file to read/write
struct EnvOptions { struct EnvOptions {
// construct with default Options // Construct with default Options
EnvOptions(); EnvOptions();
// construct from Options // Construct from Options
explicit EnvOptions(const DBOptions& options); explicit EnvOptions(const DBOptions& options);
// If true, then use mmap to read data // If true, then use mmap to read data
@ -95,10 +95,10 @@ struct EnvOptions {
// WAL writes // WAL writes
bool fallocate_with_keep_size = true; bool fallocate_with_keep_size = true;
// See DBOPtions doc // See DBOptions doc
size_t compaction_readahead_size; size_t compaction_readahead_size;
// See DBOPtions doc // See DBOptions doc
size_t random_access_max_buffer_size; size_t random_access_max_buffer_size;
// See DBOptions doc // See DBOptions doc

@ -8,7 +8,10 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#pragma once #pragma once
#include <memory>
#include <string> #include <string>
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
@ -19,7 +22,7 @@ namespace rocksdb {
class Comparator; class Comparator;
// ExternalSstFileInfo include information about sst files created // ExternalSstFileInfo include information about sst files created
// using SstFileWriter // using SstFileWriter.
struct ExternalSstFileInfo { struct ExternalSstFileInfo {
ExternalSstFileInfo() {} ExternalSstFileInfo() {}
ExternalSstFileInfo(const std::string& _file_path, ExternalSstFileInfo(const std::string& _file_path,
@ -45,25 +48,28 @@ struct ExternalSstFileInfo {
}; };
// SstFileWriter is used to create sst files that can be added to database later // SstFileWriter is used to create sst files that can be added to database later
// All keys in files generated by SstFileWriter will have sequence number = 0 // All keys in files generated by SstFileWriter will have sequence number = 0.
class SstFileWriter { class SstFileWriter {
public: public:
// User can pass `column_family` to specify that the the generated file will // User can pass `column_family` to specify that the the generated file will
// be ingested into this column_family, note that passing nullptr means that // be ingested into this column_family, note that passing nullptr means that
// the column_family is unknown. // the column_family is unknown.
// If invalidate_page_cache is set to true, SstFileWriter will give the OS a // If invalidate_page_cache is set to true, SstFileWriter will give the OS a
// hint that this file pages is not needed everytime we write 1MB to the file // hint that this file pages is not needed everytime we write 1MB to the file.
// To use the rate limiter an io_priority smaller than IO_TOTAL can be passed.
SstFileWriter(const EnvOptions& env_options, const Options& options, SstFileWriter(const EnvOptions& env_options, const Options& options,
ColumnFamilyHandle* column_family = nullptr, ColumnFamilyHandle* column_family = nullptr,
bool invalidate_page_cache = true) bool invalidate_page_cache = true,
Env::IOPriority io_priority = Env::IOPriority::IO_TOTAL)
: SstFileWriter(env_options, options, options.comparator, column_family, : SstFileWriter(env_options, options, options.comparator, column_family,
invalidate_page_cache) {} invalidate_page_cache, io_priority) {}
// Deprecated API // Deprecated API
SstFileWriter(const EnvOptions& env_options, const Options& options, SstFileWriter(const EnvOptions& env_options, const Options& options,
const Comparator* user_comparator, const Comparator* user_comparator,
ColumnFamilyHandle* column_family = nullptr, ColumnFamilyHandle* column_family = nullptr,
bool invalidate_page_cache = true); bool invalidate_page_cache = true,
Env::IOPriority io_priority = Env::IOPriority::IO_TOTAL);
~SstFileWriter(); ~SstFileWriter();
@ -77,7 +83,7 @@ class SstFileWriter {
// Finalize writing to sst file and close file. // Finalize writing to sst file and close file.
// //
// An optional ExternalSstFileInfo pointer can be passed to the function // An optional ExternalSstFileInfo pointer can be passed to the function
// which will be populated with information about the created sst file // which will be populated with information about the created sst file.
Status Finish(ExternalSstFileInfo* file_info = nullptr); Status Finish(ExternalSstFileInfo* file_info = nullptr);
// Return the current file size. // Return the current file size.
@ -87,7 +93,7 @@ class SstFileWriter {
void InvalidatePageCache(bool closing); void InvalidatePageCache(bool closing);
struct Rep; struct Rep;
Rep* rep_; std::unique_ptr<Rep> rep_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -28,11 +28,12 @@ const size_t kFadviseTrigger = 1024 * 1024; // 1MB
struct SstFileWriter::Rep { struct SstFileWriter::Rep {
Rep(const EnvOptions& _env_options, const Options& options, Rep(const EnvOptions& _env_options, const Options& options,
const Comparator* _user_comparator, ColumnFamilyHandle* _cfh, Env::IOPriority _io_priority, const Comparator* _user_comparator,
bool _invalidate_page_cache) ColumnFamilyHandle* _cfh, bool _invalidate_page_cache)
: env_options(_env_options), : env_options(_env_options),
ioptions(options), ioptions(options),
mutable_cf_options(options), mutable_cf_options(options),
io_priority(_io_priority),
internal_comparator(_user_comparator), internal_comparator(_user_comparator),
cfh(_cfh), cfh(_cfh),
invalidate_page_cache(_invalidate_page_cache), invalidate_page_cache(_invalidate_page_cache),
@ -43,15 +44,16 @@ struct SstFileWriter::Rep {
EnvOptions env_options; EnvOptions env_options;
ImmutableCFOptions ioptions; ImmutableCFOptions ioptions;
MutableCFOptions mutable_cf_options; MutableCFOptions mutable_cf_options;
Env::IOPriority io_priority;
InternalKeyComparator internal_comparator; InternalKeyComparator internal_comparator;
ExternalSstFileInfo file_info; ExternalSstFileInfo file_info;
InternalKey ikey; InternalKey ikey;
std::string column_family_name; std::string column_family_name;
ColumnFamilyHandle* cfh; ColumnFamilyHandle* cfh;
// If true, We will give the OS a hint that this file pages is not needed // If true, We will give the OS a hint that this file pages is not needed
// everytime we write 1MB to the file // everytime we write 1MB to the file.
bool invalidate_page_cache; bool invalidate_page_cache;
// the size of the file during the last time we called Fadvise to remove // The size of the file during the last time we called Fadvise to remove
// cached pages from page cache. // cached pages from page cache.
uint64_t last_fadvise_size; uint64_t last_fadvise_size;
}; };
@ -60,9 +62,10 @@ SstFileWriter::SstFileWriter(const EnvOptions& env_options,
const Options& options, const Options& options,
const Comparator* user_comparator, const Comparator* user_comparator,
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
bool invalidate_page_cache) bool invalidate_page_cache,
: rep_(new Rep(env_options, options, user_comparator, column_family, Env::IOPriority io_priority)
invalidate_page_cache)) { : rep_(new Rep(env_options, options, io_priority, user_comparator,
column_family, invalidate_page_cache)) {
rep_->file_info.file_size = 0; rep_->file_info.file_size = 0;
} }
@ -72,12 +75,10 @@ SstFileWriter::~SstFileWriter() {
// abandon the builder. // abandon the builder.
rep_->builder->Abandon(); rep_->builder->Abandon();
} }
delete rep_;
} }
Status SstFileWriter::Open(const std::string& file_path) { Status SstFileWriter::Open(const std::string& file_path) {
Rep* r = rep_; Rep* r = rep_.get();
Status s; Status s;
std::unique_ptr<WritableFile> sst_file; std::unique_ptr<WritableFile> sst_file;
s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options); s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options);
@ -85,6 +86,8 @@ Status SstFileWriter::Open(const std::string& file_path) {
return s; return s;
} }
sst_file->SetIOPriority(r->io_priority);
CompressionType compression_type; CompressionType compression_type;
if (r->ioptions.bottommost_compression != kDisableCompressionOption) { if (r->ioptions.bottommost_compression != kDisableCompressionOption) {
compression_type = r->ioptions.bottommost_compression; compression_type = r->ioptions.bottommost_compression;
@ -146,7 +149,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
} }
Status SstFileWriter::Add(const Slice& user_key, const Slice& value) { Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
Rep* r = rep_; Rep* r = rep_.get();
if (!r->builder) { if (!r->builder) {
return Status::InvalidArgument("File is not opened"); return Status::InvalidArgument("File is not opened");
} }
@ -166,7 +169,7 @@ Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
ValueType::kTypeValue /* Put */); ValueType::kTypeValue /* Put */);
r->builder->Add(r->ikey.Encode(), value); r->builder->Add(r->ikey.Encode(), value);
// update file info // Update file info
r->file_info.num_entries++; r->file_info.num_entries++;
r->file_info.largest_key.assign(user_key.data(), user_key.size()); r->file_info.largest_key.assign(user_key.data(), user_key.size());
r->file_info.file_size = r->builder->FileSize(); r->file_info.file_size = r->builder->FileSize();
@ -177,7 +180,7 @@ Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
} }
Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) { Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
Rep* r = rep_; Rep* r = rep_.get();
if (!r->builder) { if (!r->builder) {
return Status::InvalidArgument("File is not opened"); return Status::InvalidArgument("File is not opened");
} }
@ -208,7 +211,7 @@ Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
} }
void SstFileWriter::InvalidatePageCache(bool closing) { void SstFileWriter::InvalidatePageCache(bool closing) {
Rep* r = rep_; Rep* r = rep_.get();
if (r->invalidate_page_cache == false) { if (r->invalidate_page_cache == false) {
// Fadvise disabled // Fadvise disabled
return; return;

Loading…
Cancel
Save