diff --git a/db/builder.cc b/db/builder.cc index bd695f1dd..065bbaf47 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -43,11 +43,12 @@ TableBuilder* NewTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, const bool skip_filters) { + const CompressionOptions& compression_opts, + const bool skip_filters, const bool skip_flush) { return ioptions.table_factory->NewTableBuilder( TableBuilderOptions(ioptions, internal_comparator, int_tbl_prop_collector_factories, compression_type, - compression_opts, skip_filters), + compression_opts, skip_filters, skip_flush), column_family_id, file); } @@ -86,7 +87,8 @@ Status BuildTable( builder = NewTableBuilder( ioptions, internal_comparator, int_tbl_prop_collector_factories, - column_family_id, file_writer.get(), compression, compression_opts); + column_family_id, file_writer.get(), compression, compression_opts, + false, env_options.skip_table_builder_flush); } MergeHelper merge(env, internal_comparator.user_comparator(), diff --git a/db/builder.h b/db/builder.h index cdafa4ab3..cf32f4b62 100644 --- a/db/builder.h +++ b/db/builder.h @@ -41,7 +41,8 @@ TableBuilder* NewTableBuilder( uint32_t column_family_id, WritableFileWriter* file, const CompressionType compression_type, const CompressionOptions& compression_opts, - const bool skip_filters = false); + const bool skip_filters = false, + const bool skip_flush = false); // Build a Table file from the contents of *iter. The generated file // will be named according to number specified in meta. 
On success, the rest of diff --git a/db/compaction_job.cc b/db/compaction_job.cc index ea052b84f..0028acacb 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -939,11 +939,12 @@ Status CompactionJob::OpenCompactionOutputFile( // data is going to be found bool skip_filters = cfd->ioptions()->optimize_filters_for_hits && bottommost_level_; + bool skip_flush = db_options_.skip_table_builder_flush; sub_compact->builder.reset(NewTableBuilder( *cfd->ioptions(), cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), sub_compact->outfile.get(), sub_compact->compaction->output_compression(), - cfd->ioptions()->compression_opts, skip_filters)); + cfd->ioptions()->compression_opts, skip_filters, skip_flush)); LogFlush(db_options_.info_log); return s; } diff --git a/db/db_bench.cc b/db/db_bench.cc index e3edc8396..0074056e3 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -376,6 +376,12 @@ DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size"); DEFINE_int32(random_access_max_buffer_size, 1024 * 1024, "Maximum windows randomaccess buffer size"); +DEFINE_int32(writable_file_max_buffer_size, 1024 * 1024, + "Maximum write buffer for Writeable File"); + +DEFINE_int32(skip_table_builder_flush, false, "Skip flushing block in " + "table builder "); + DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means" " use default settings."); DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. 
" @@ -2299,6 +2305,8 @@ class Benchmark { FLAGS_new_table_reader_for_compaction_inputs; options.compaction_readahead_size = FLAGS_compaction_readahead_size; options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size; + options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size; + options.skip_table_builder_flush = FLAGS_skip_table_builder_flush; options.statistics = dbstats; if (FLAGS_enable_io_prio) { FLAGS_env->LowerThreadPoolIOPriority(Env::LOW); diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 9e20a15aa..5f8e8c79c 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -94,6 +94,12 @@ struct EnvOptions { // See DBOPtions doc size_t random_access_max_buffer_size; + // See DBOptions doc + size_t writable_file_max_buffer_size = 1024 * 1024; + + // See DBOptions doc + bool skip_table_builder_flush = false; + // If not nullptr, write rate limiting is enabled for flush and compaction RateLimiter* rate_limiter = nullptr; }; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index acd0016ac..036e08cb7 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1089,6 +1089,27 @@ struct DBOptions { // Default: 1 Mb size_t random_access_max_buffer_size; + // This is the maximum buffer size that is used by WritableFileWriter. + // On Windows, we need to maintain an aligned buffer for writes. + // We allow the buffer to grow until it's size hits the limit. + // + // Default: 1024 * 1024 (1 MB) + size_t writable_file_max_buffer_size; + + // If true, block will not be explictly flushed to disk during building + // a SstTable. Instead, buffer in WritableFileWriter will take + // care of the flushing when it is full. + // + // On Windows, this option helps a lot when unbuffered I/O + // (allow_os_buffer = false) is used, since it avoids small + // unbuffered disk write. + // + // User may also adjust writable_file_max_buffer_size to optimize disk I/O + // size. 
+ // + // Default: false + bool skip_table_builder_flush; + // Use adaptive mutex, which spins in the user space before resorting // to kernel. This could reduce context switch when the mutex is not // heavily contended. However, if the mutex is hot, we could end up diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 402528a52..addbe1c94 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -465,6 +465,7 @@ struct BlockBasedTableBuilder::Rep { BlockHandle pending_handle; // Handle to add to index block std::string compressed_output; + bool skip_flush; std::unique_ptr<FlushBlockPolicy> flush_block_policy; std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors; @@ -476,7 +477,8 @@ struct BlockBasedTableBuilder::Rep { int_tbl_prop_collector_factories, uint32_t column_family_id, WritableFileWriter* f, const CompressionType _compression_type, - const CompressionOptions& _compression_opts, const bool skip_filters) + const CompressionOptions& _compression_opts, + const bool skip_filters, const bool skip_flush) : ioptions(_ioptions), table_options(table_opt), internal_comparator(icomparator), @@ -490,6 +492,7 @@ struct BlockBasedTableBuilder::Rep { compression_opts(_compression_opts), filter_block(skip_filters ? 
nullptr : CreateFilterBlockBuilder( _ioptions, table_options)), + skip_flush(skip_flush), flush_block_policy( table_options.flush_block_policy_factory->NewFlushBlockPolicy( table_options, data_block)) { @@ -512,7 +515,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, const bool skip_filters) { + const CompressionOptions& compression_opts, + const bool skip_filters, const bool skip_flush) { BlockBasedTableOptions sanitized_table_options(table_options); if (sanitized_table_options.format_version == 0 && sanitized_table_options.checksum != kCRC32c) { @@ -526,7 +530,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator, int_tbl_prop_collector_factories, column_family_id, file, - compression_type, compression_opts, skip_filters); + compression_type, compression_opts, skip_filters, skip_flush); if (rep_->filter_block != nullptr) { rep_->filter_block->StartBlock(0); @@ -552,11 +556,12 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0); } - auto should_flush = r->flush_block_policy->Update(key, value); - if (should_flush) { + auto should_seal = r->flush_block_policy->Update(key, value); + if (should_seal) { assert(!r->data_block.empty()); + SealDataBlock(); Flush(); - + // Add item to index block. // We do not emit the index entry for a block until we have seen the // first key for the next data block. 
This allows us to use shorter @@ -587,19 +592,23 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { } void BlockBasedTableBuilder::Flush() { + Rep* r = rep_; + if (ok() && !r->skip_flush) { + r->status = r->file->Flush(); + } +} + +void BlockBasedTableBuilder::SealDataBlock() { Rep* r = rep_; assert(!r->closed); if (!ok()) return; if (r->data_block.empty()) return; WriteBlock(&r->data_block, &r->pending_handle); - if (ok()) { - r->status = r->file->Flush(); - } if (r->filter_block != nullptr) { r->filter_block->StartBlock(r->offset); } r->props.data_size = r->offset; - ++r->props.num_data_blocks; + ++r->props.num_data_blocks; } void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, @@ -728,6 +737,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, Status BlockBasedTableBuilder::Finish() { Rep* r = rep_; bool empty_data_block = r->data_block.empty(); + SealDataBlock(); Flush(); assert(!r->closed); r->closed = true; diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 7dc93b754..2a40c4984 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -42,7 +42,8 @@ class BlockBasedTableBuilder : public TableBuilder { int_tbl_prop_collector_factories, uint32_t column_family_id, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, const bool skip_filters); + const CompressionOptions& compression_opts, + const bool skip_filters, const bool skip_flush); // REQUIRES: Either Finish() or Abandon() has been called. ~BlockBasedTableBuilder(); @@ -101,6 +102,8 @@ class BlockBasedTableBuilder : public TableBuilder { // REQUIRES: Finish(), Abandon() have not been called void Flush(); + void SealDataBlock(); + // Some compression libraries fail when the raw size is bigger than int. 
If // uncompressed size is bigger than kCompressionSizeLimit, don't compress it const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max(); diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 1631652dd..f95aa8f66 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -69,7 +69,8 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder( table_builder_options.int_tbl_prop_collector_factories, column_family_id, file, table_builder_options.compression_type, table_builder_options.compression_opts, - table_builder_options.skip_filters); + table_builder_options.skip_filters, + table_builder_options.skip_flush); return table_builder; } diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index 1c21a25f7..3dca61474 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -114,7 +114,8 @@ Status SstFileWriter::Open(const std::string& file_path) { TableBuilderOptions table_builder_options( r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories, - compression_type, r->ioptions.compression_opts, false); + compression_type, r->ioptions.compression_opts, false, + r->env_options.skip_table_builder_flush); r->file_writer.reset( new WritableFileWriter(std::move(sst_file), r->env_options)); r->builder.reset(r->ioptions.table_factory->NewTableBuilder( diff --git a/table/table_builder.h b/table/table_builder.h index 55a1077fa..32d0b0718 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -44,13 +44,15 @@ struct TableBuilderOptions { const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* _int_tbl_prop_collector_factories, CompressionType _compression_type, - const CompressionOptions& _compression_opts, bool _skip_filters) + const CompressionOptions& _compression_opts, + bool _skip_filters, bool _skip_flush) : ioptions(_ioptions), internal_comparator(_internal_comparator), int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories), compression_type(_compression_type), 
compression_opts(_compression_opts), - skip_filters(_skip_filters) {} + skip_filters(_skip_filters), + skip_flush(_skip_flush) {} const ImmutableCFOptions& ioptions; const InternalKeyComparator& internal_comparator; const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* @@ -58,6 +60,7 @@ struct TableBuilderOptions { CompressionType compression_type; const CompressionOptions& compression_opts; bool skip_filters = false; + bool skip_flush = false; }; // TableBuilder provides the interface used to build a Table diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index c4106e4b3..378dd410e 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -98,7 +98,8 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, tb = opts.table_factory->NewTableBuilder( TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories, CompressionType::kNoCompression, - CompressionOptions(), false), + CompressionOptions(), false, + env_options.skip_table_builder_flush), 0, file_writer.get()); } else { s = DB::Open(opts, dbname, &db); diff --git a/table/table_test.cc b/table/table_test.cc index 58607bbb2..d19554b65 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -273,7 +273,8 @@ class TableConstructor: public Constructor { builder.reset(ioptions.table_factory->NewTableBuilder( TableBuilderOptions(ioptions, internal_comparator, &int_tbl_prop_collector_factories, - options.compression, CompressionOptions(), false), + options.compression, CompressionOptions(), + false, options.skip_table_builder_flush), TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer_.get())); @@ -1845,7 +1846,8 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) { int_tbl_prop_collector_factories; std::unique_ptr<TableBuilder> builder(factory.NewTableBuilder( TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories, - kNoCompression, CompressionOptions(), false), + kNoCompression, CompressionOptions(), + false, options.skip_table_builder_flush), 
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer.get())); diff --git a/tools/sst_dump_test.cc b/tools/sst_dump_test.cc index b40a3346c..dce863ff0 100644 --- a/tools/sst_dump_test.cc +++ b/tools/sst_dump_test.cc @@ -61,7 +61,7 @@ void createSST(const std::string& file_name, tb.reset(opts.table_factory->NewTableBuilder( TableBuilderOptions(imoptions, ikc, &int_tbl_prop_collector_factories, CompressionType::kNoCompression, CompressionOptions(), - false), + false, env_options.skip_table_builder_flush), TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer.get())); diff --git a/tools/sst_dump_tool.cc b/tools/sst_dump_tool.cc index ae22da36d..b7abbcd98 100644 --- a/tools/sst_dump_tool.cc +++ b/tools/sst_dump_tool.cc @@ -177,7 +177,7 @@ int SstFileReader::ShowAllCompressionSizes(size_t block_size) { : CompressionType(i + 1)) { CompressionOptions compress_opt; TableBuilderOptions tb_opts(imoptions, ikc, &block_based_table_factories, i, - compress_opt, false); + compress_opt, false, false); uint64_t file_size = CalculateCompressedTableSize(tb_opts, block_size); fprintf(stdout, "Compression: %s", compress_type.find(i)->second); fprintf(stdout, " Size: %" PRIu64 "\n", file_size); diff --git a/util/env.cc b/util/env.cc index f6cc40893..1f4b9c2a3 100644 --- a/util/env.cc +++ b/util/env.cc @@ -296,6 +296,9 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) { env_options->random_access_max_buffer_size = options.random_access_max_buffer_size; env_options->rate_limiter = options.rate_limiter.get(); + env_options->writable_file_max_buffer_size = + options.writable_file_max_buffer_size; + env_options->skip_table_builder_flush = options.skip_table_builder_flush; env_options->allow_fallocate = options.allow_fallocate; } diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 050473bd2..f5c178896 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -21,10 +21,6 @@ 
namespace rocksdb { -namespace { - const size_t c_OneMb = (1 << 20); -} - Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { Status s = file_->Read(n, result, scratch); IOSTATS_ADD(bytes_read, result->size()); return s; } @@ -76,9 +72,9 @@ Status WritableFileWriter::Append(const Slice& data) { } } - if (buf_.Capacity() < c_OneMb) { + if (buf_.Capacity() < max_buffer_size_) { size_t desiredCapacity = buf_.Capacity() * 2; - desiredCapacity = std::min(desiredCapacity, c_OneMb); + desiredCapacity = std::min(desiredCapacity, max_buffer_size_); buf_.AllocateNewBuffer(desiredCapacity); } assert(buf_.CurrentSize() == 0); @@ -102,9 +98,9 @@ Status WritableFileWriter::Append(const Slice& data) { // We double the buffer here because // Flush calls do not keep up with the incoming bytes // This is the only place when buffer is changed with unbuffered I/O - if (buf_.Capacity() < c_OneMb) { + if (buf_.Capacity() < max_buffer_size_) { size_t desiredCapacity = buf_.Capacity() * 2; - desiredCapacity = std::min(desiredCapacity, c_OneMb); + desiredCapacity = std::min(desiredCapacity, max_buffer_size_); buf_.AllocateNewBuffer(desiredCapacity); } } @@ -156,7 +152,6 @@ Status WritableFileWriter::Close() { return s; } - // write out the cached data to the OS cache Status WritableFileWriter::Flush() { Status s; diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 9a076af56..720979099 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -93,6 +93,7 @@ class WritableFileWriter { private: std::unique_ptr<WritableFile> writable_file_; AlignedBuffer buf_; + size_t max_buffer_size_; // Actually written data size can be used for truncate // not counting padding data uint64_t filesize_; @@ -113,6 +114,7 @@ const EnvOptions& options) : writable_file_(std::move(file)), buf_(), + max_buffer_size_(options.writable_file_max_buffer_size), filesize_(0), next_write_offset_(0), pending_sync_(false), diff --git a/util/options.cc 
b/util/options.cc index 126aa2121..026f8d812 100644 --- a/util/options.cc +++ b/util/options.cc @@ -251,6 +251,8 @@ DBOptions::DBOptions() new_table_reader_for_compaction_inputs(false), compaction_readahead_size(0), random_access_max_buffer_size(1024 * 1024), + writable_file_max_buffer_size(1024 * 1024), + skip_table_builder_flush(false), use_adaptive_mutex(false), bytes_per_sync(0), wal_bytes_per_sync(0), @@ -313,6 +315,8 @@ DBOptions::DBOptions(const Options& options) options.new_table_reader_for_compaction_inputs), compaction_readahead_size(options.compaction_readahead_size), random_access_max_buffer_size(options.random_access_max_buffer_size), + writable_file_max_buffer_size(options.writable_file_max_buffer_size), + skip_table_builder_flush(options.skip_table_builder_flush), use_adaptive_mutex(options.use_adaptive_mutex), bytes_per_sync(options.bytes_per_sync), wal_bytes_per_sync(options.wal_bytes_per_sync), @@ -412,6 +416,13 @@ void DBOptions::Dump(Logger* log) const { " Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt "d", random_access_max_buffer_size); + Header(log, + " Options.writable_file_max_buffer_size: %" ROCKSDB_PRIszt + "d", + writable_file_max_buffer_size); + Header(log, + " Options.skip_table_builder_flush: %d", + skip_table_builder_flush); Header(log, " Options.use_adaptive_mutex: %d", use_adaptive_mutex); Header(log, " Options.rate_limiter: %p", diff --git a/util/options_helper.h b/util/options_helper.h index 6df914241..169de4e74 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -183,6 +183,12 @@ static std::unordered_map db_options_type_info = { {"random_access_max_buffer_size", {offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT, OptionVerificationType::kNormal}}, + {"writable_file_max_buffer_size", + {offsetof(struct DBOptions, writable_file_max_buffer_size), + OptionType::kSizeT, OptionVerificationType::kNormal}}, + {"skip_table_builder_flush", + {offsetof(struct DBOptions, 
skip_table_builder_flush), + OptionType::kBoolean, OptionVerificationType::kNormal}}, {"use_adaptive_mutex", {offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean, OptionVerificationType::kNormal}}, diff --git a/util/options_test.cc b/util/options_test.cc index 0f2bbf8b3..ac4a63582 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -340,6 +340,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"new_table_reader_for_compaction_inputs", "true"}, {"compaction_readahead_size", "100"}, {"random_access_max_buffer_size", "3145728"}, + {"writable_file_max_buffer_size", "314159"}, + {"skip_table_builder_flush", "true"}, {"bytes_per_sync", "47"}, {"wal_bytes_per_sync", "48"}, }; @@ -451,6 +453,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728); + ASSERT_EQ(new_db_opt.writable_file_max_buffer_size, 314159); + ASSERT_EQ(new_db_opt.skip_table_builder_flush, true); ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast<uint64_t>(47)); ASSERT_EQ(new_db_opt.wal_bytes_per_sync, static_cast<uint64_t>(48)); }