Move skip_table_builder_flush to BlockBasedTableOption

main
SherlockNoMad 9 years ago
parent 550af4ee68
commit ccc8c10c0c
  1. 8
      db/builder.cc
  2. 3
      db/builder.h
  3. 5
      db/c.cc
  4. 3
      db/compaction_job.cc
  5. 3
      db/db_bench.cc
  6. 3
      include/rocksdb/c.h
  7. 3
      include/rocksdb/env.h
  8. 13
      include/rocksdb/options.h
  9. 14
      include/rocksdb/table.h
  10. 26
      table/block_based_table_builder.cc
  11. 5
      table/block_based_table_builder.h
  12. 7
      table/block_based_table_factory.cc
  13. 3
      table/sst_file_writer.cc
  14. 7
      table/table_builder.h
  15. 3
      table/table_reader_bench.cc
  16. 6
      table/table_test.cc
  17. 2
      tools/sst_dump_test.cc
  18. 2
      tools/sst_dump_tool.cc
  19. 1
      util/env.cc
  20. 5
      util/options.cc
  21. 6
      util/options_helper.h
  22. 6
      util/options_test.cc

@ -43,12 +43,11 @@ TableBuilder* NewTableBuilder(
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters, const bool skip_flush) {
const CompressionOptions& compression_opts, const bool skip_filters) {
return ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, internal_comparator,
int_tbl_prop_collector_factories, compression_type,
compression_opts, skip_filters, skip_flush),
compression_opts, skip_filters),
column_family_id, file);
}
@ -87,8 +86,7 @@ Status BuildTable(
builder = NewTableBuilder(
ioptions, internal_comparator, int_tbl_prop_collector_factories,
column_family_id, file_writer.get(), compression, compression_opts,
false, env_options.skip_table_builder_flush);
column_family_id, file_writer.get(), compression, compression_opts);
}
MergeHelper merge(env, internal_comparator.user_comparator(),

@ -41,8 +41,7 @@ TableBuilder* NewTableBuilder(
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters = false,
const bool skip_flush = false);
const bool skip_filters = false);
// Build a Table file from the contents of *iter. The generated file
// will be named according to number specified in meta. On success, the rest of

@ -1288,6 +1288,11 @@ void rocksdb_block_based_options_set_cache_index_and_filter_blocks(
options->rep.cache_index_and_filter_blocks = v;
}
void rocksdb_block_based_options_set_skip_table_builder_flush(
rocksdb_block_based_table_options_t* options, unsigned char v) {
options->rep.skip_table_builder_flush = v;
}
void rocksdb_options_set_block_based_table_factory(
rocksdb_options_t *opt,
rocksdb_block_based_table_options_t* table_options) {

@ -939,12 +939,11 @@ Status CompactionJob::OpenCompactionOutputFile(
// data is going to be found
bool skip_filters =
cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
bool skip_flush = db_options_.skip_table_builder_flush;
sub_compact->builder.reset(NewTableBuilder(
*cfd->ioptions(), cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(),
sub_compact->outfile.get(), sub_compact->compaction->output_compression(),
cfd->ioptions()->compression_opts, skip_filters, skip_flush));
cfd->ioptions()->compression_opts, skip_filters));
LogFlush(db_options_.info_log);
return s;
}

@ -2306,7 +2306,6 @@ class Benchmark {
options.compaction_readahead_size = FLAGS_compaction_readahead_size;
options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size;
options.skip_table_builder_flush = FLAGS_skip_table_builder_flush;
options.statistics = dbstats;
if (FLAGS_enable_io_prio) {
FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
@ -2449,6 +2448,8 @@ class Benchmark {
block_based_options.block_size = FLAGS_block_size;
block_based_options.block_restart_interval = FLAGS_block_restart_interval;
block_based_options.filter_policy = filter_policy_;
block_based_options.skip_table_builder_flush =
FLAGS_skip_table_builder_flush;
block_based_options.format_version = 2;
options.table_factory.reset(
NewBlockBasedTableFactory(block_based_options));

@ -450,6 +450,9 @@ rocksdb_block_based_options_set_hash_index_allow_collision(
extern ROCKSDB_LIBRARY_API void
rocksdb_block_based_options_set_cache_index_and_filter_blocks(
rocksdb_block_based_table_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_block_based_options_set_skip_table_builder_flush(
rocksdb_block_based_table_options_t* options, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_block_based_table_factory(
rocksdb_options_t* opt, rocksdb_block_based_table_options_t* table_options);

@ -97,9 +97,6 @@ struct EnvOptions {
// See DBOptions doc
size_t writable_file_max_buffer_size = 1024 * 1024;
// See DBOptions doc
bool skip_table_builder_flush = false;
// If not nullptr, write rate limiting is enabled for flush and compaction
RateLimiter* rate_limiter = nullptr;
};

@ -1096,19 +1096,6 @@ struct DBOptions {
// Default: 1024 * 1024 (1 MB)
size_t writable_file_max_buffer_size;
// If true, block will not be explictly flushed to disk during building
// a SstTable. Instead, buffer in WritableFileWriter will take
// care of the flushing when it is full.
//
// On Windows, this option helps a lot when unbuffered I/O
// (allow_os_buffer = false) is used, since it avoids small
// unbuffered disk write.
//
// User may also adjust writable_file_max_buffer_size to optimize disk I/O
// size.
//
// Default: false
bool skip_table_builder_flush;
// Use adaptive mutex, which spins in the user space before resorting
// to kernel. This could reduce context switch when the mutex is not

@ -128,6 +128,20 @@ struct BlockBasedTableOptions {
// This must generally be true for gets to be efficient.
bool whole_key_filtering = true;
// If true, block will not be explictly flushed to disk during building
// a SstTable. Instead, buffer in WritableFileWriter will take
// care of the flushing when it is full.
//
// On Windows, this option helps a lot when unbuffered I/O
// (allow_os_buffer = false) is used, since it avoids small
// unbuffered disk write.
//
// User may also adjust writable_file_max_buffer_size to optimize disk I/O
// size.
//
// Default: false
bool skip_table_builder_flush = false;
// We currently have three versions:
// 0 -- This version is currently written out by all RocksDB's versions by
// default. Can be read by really old RocksDB's. Doesn't support changing

@ -465,7 +465,6 @@ struct BlockBasedTableBuilder::Rep {
BlockHandle pending_handle; // Handle to add to index block
std::string compressed_output;
bool skip_flush;
std::unique_ptr<FlushBlockPolicy> flush_block_policy;
std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
@ -477,8 +476,7 @@ struct BlockBasedTableBuilder::Rep {
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* f,
const CompressionType _compression_type,
const CompressionOptions& _compression_opts,
const bool skip_filters, const bool _skip_flush)
const CompressionOptions& _compression_opts, const bool skip_filters)
: ioptions(_ioptions),
table_options(table_opt),
internal_comparator(icomparator),
@ -492,7 +490,6 @@ struct BlockBasedTableBuilder::Rep {
compression_opts(_compression_opts),
filter_block(skip_filters ? nullptr : CreateFilterBlockBuilder(
_ioptions, table_options)),
skip_flush(_skip_flush),
flush_block_policy(
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
table_options, data_block)) {
@ -515,8 +512,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters, const bool skip_flush) {
const CompressionOptions& compression_opts, const bool skip_filters) {
BlockBasedTableOptions sanitized_table_options(table_options);
if (sanitized_table_options.format_version == 0 &&
sanitized_table_options.checksum != kCRC32c) {
@ -530,7 +526,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id, file,
compression_type, compression_opts, skip_filters, skip_flush);
compression_type, compression_opts, skip_filters);
if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
@ -556,10 +552,9 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
}
auto should_seal = r->flush_block_policy->Update(key, value);
if (should_seal) {
auto should_flush = r->flush_block_policy->Update(key, value);
if (should_flush) {
assert(!r->data_block.empty());
SealDataBlock();
Flush();
// Add item to index block.
@ -592,18 +587,14 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
}
void BlockBasedTableBuilder::Flush() {
Rep* r = rep_;
if (ok() && !r->skip_flush) {
r->status = r->file->Flush();
}
}
void BlockBasedTableBuilder::SealDataBlock() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
WriteBlock(&r->data_block, &r->pending_handle);
if (ok() && !r->table_options.skip_table_builder_flush) {
r->status = r->file->Flush();
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
}
@ -737,7 +728,6 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
Status BlockBasedTableBuilder::Finish() {
Rep* r = rep_;
bool empty_data_block = r->data_block.empty();
SealDataBlock();
Flush();
assert(!r->closed);
r->closed = true;

@ -42,8 +42,7 @@ class BlockBasedTableBuilder : public TableBuilder {
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters, const bool skip_flush);
const CompressionOptions& compression_opts, const bool skip_filters);
// REQUIRES: Either Finish() or Abandon() has been called.
~BlockBasedTableBuilder();
@ -102,8 +101,6 @@ class BlockBasedTableBuilder : public TableBuilder {
// REQUIRES: Finish(), Abandon() have not been called
void Flush();
void SealDataBlock();
// Some compression libraries fail when the raw size is bigger than int. If
// uncompressed size is bigger than kCompressionSizeLimit, don't compress it
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();

@ -69,8 +69,7 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder(
table_builder_options.int_tbl_prop_collector_factories, column_family_id,
file, table_builder_options.compression_type,
table_builder_options.compression_opts,
table_builder_options.skip_filters,
table_builder_options.skip_flush);
table_builder_options.skip_filters);
return table_builder;
}
@ -153,6 +152,10 @@ std::string BlockBasedTableFactory::GetPrintableTableOptions() const {
ret.append(buffer);
snprintf(buffer, kBufferSize, " whole_key_filtering: %d\n",
table_options_.whole_key_filtering);
ret.append(buffer);
snprintf(buffer, kBufferSize, " skip_table_builder_flush: %d\n",
table_options_.skip_table_builder_flush);
ret.append(buffer);
snprintf(buffer, kBufferSize, " format_version: %d\n",
table_options_.format_version);
ret.append(buffer);

@ -114,8 +114,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
TableBuilderOptions table_builder_options(
r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories,
compression_type, r->ioptions.compression_opts, false,
r->env_options.skip_table_builder_flush);
compression_type, r->ioptions.compression_opts, false);
r->file_writer.reset(
new WritableFileWriter(std::move(sst_file), r->env_options));
r->builder.reset(r->ioptions.table_factory->NewTableBuilder(

@ -44,15 +44,13 @@ struct TableBuilderOptions {
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
_int_tbl_prop_collector_factories,
CompressionType _compression_type,
const CompressionOptions& _compression_opts,
bool _skip_filters, bool _skip_flush)
const CompressionOptions& _compression_opts, bool _skip_filters)
: ioptions(_ioptions),
internal_comparator(_internal_comparator),
int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories),
compression_type(_compression_type),
compression_opts(_compression_opts),
skip_filters(_skip_filters),
skip_flush(_skip_flush) {}
skip_filters(_skip_filters) {}
const ImmutableCFOptions& ioptions;
const InternalKeyComparator& internal_comparator;
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
@ -60,7 +58,6 @@ struct TableBuilderOptions {
CompressionType compression_type;
const CompressionOptions& compression_opts;
bool skip_filters = false;
bool skip_flush = false;
};
// TableBuilder provides the interface used to build a Table

@ -98,8 +98,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
tb = opts.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories,
CompressionType::kNoCompression,
CompressionOptions(), false,
env_options.skip_table_builder_flush),
CompressionOptions(), false),
0, file_writer.get());
} else {
s = DB::Open(opts, dbname, &db);

@ -273,8 +273,7 @@ class TableConstructor: public Constructor {
builder.reset(ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, internal_comparator,
&int_tbl_prop_collector_factories,
options.compression, CompressionOptions(),
false, options.skip_table_builder_flush),
options.compression, CompressionOptions(), false),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer_.get()));
@ -1846,8 +1845,7 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) {
int_tbl_prop_collector_factories;
std::unique_ptr<TableBuilder> builder(factory.NewTableBuilder(
TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories,
kNoCompression, CompressionOptions(),
false, options.skip_table_builder_flush),
kNoCompression, CompressionOptions(), false),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));

@ -61,7 +61,7 @@ void createSST(const std::string& file_name,
tb.reset(opts.table_factory->NewTableBuilder(
TableBuilderOptions(imoptions, ikc, &int_tbl_prop_collector_factories,
CompressionType::kNoCompression, CompressionOptions(),
false, env_options.skip_table_builder_flush),
false),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));

@ -177,7 +177,7 @@ int SstFileReader::ShowAllCompressionSizes(size_t block_size) {
: CompressionType(i + 1)) {
CompressionOptions compress_opt;
TableBuilderOptions tb_opts(imoptions, ikc, &block_based_table_factories, i,
compress_opt, false, false);
compress_opt, false);
uint64_t file_size = CalculateCompressedTableSize(tb_opts, block_size);
fprintf(stdout, "Compression: %s", compress_type.find(i)->second);
fprintf(stdout, " Size: %" PRIu64 "\n", file_size);

@ -298,7 +298,6 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
env_options->rate_limiter = options.rate_limiter.get();
env_options->writable_file_max_buffer_size =
options.writable_file_max_buffer_size;
env_options->skip_table_builder_flush = options.skip_table_builder_flush;
env_options->allow_fallocate = options.allow_fallocate;
}

@ -252,7 +252,6 @@ DBOptions::DBOptions()
compaction_readahead_size(0),
random_access_max_buffer_size(1024 * 1024),
writable_file_max_buffer_size(1024 * 1024),
skip_table_builder_flush(false),
use_adaptive_mutex(false),
bytes_per_sync(0),
wal_bytes_per_sync(0),
@ -316,7 +315,6 @@ DBOptions::DBOptions(const Options& options)
compaction_readahead_size(options.compaction_readahead_size),
random_access_max_buffer_size(options.random_access_max_buffer_size),
writable_file_max_buffer_size(options.writable_file_max_buffer_size),
skip_table_builder_flush(options.skip_table_builder_flush),
use_adaptive_mutex(options.use_adaptive_mutex),
bytes_per_sync(options.bytes_per_sync),
wal_bytes_per_sync(options.wal_bytes_per_sync),
@ -420,9 +418,6 @@ void DBOptions::Dump(Logger* log) const {
" Options.writable_file_max_buffer_size: %" ROCKSDB_PRIszt
"d",
writable_file_max_buffer_size);
Header(log,
" Options.skip_table_builder_flush: %d",
skip_table_builder_flush);
Header(log, " Options.use_adaptive_mutex: %d",
use_adaptive_mutex);
Header(log, " Options.rate_limiter: %p",

@ -186,9 +186,6 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"writable_file_max_buffer_size",
{offsetof(struct DBOptions, writable_file_max_buffer_size),
OptionType::kSizeT, OptionVerificationType::kNormal}},
{"skip_table_builder_flush",
{offsetof(struct DBOptions, skip_table_builder_flush),
OptionType::kBoolean, OptionVerificationType::kNormal}},
{"use_adaptive_mutex",
{offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean,
OptionVerificationType::kNormal}},
@ -465,6 +462,9 @@ static std::unordered_map<std::string,
{"whole_key_filtering",
{offsetof(struct BlockBasedTableOptions, whole_key_filtering),
OptionType::kBoolean, OptionVerificationType::kNormal}},
{"skip_table_builder_flush",
{offsetof(struct BlockBasedTableOptions, skip_table_builder_flush),
OptionType::kBoolean, OptionVerificationType::kNormal}},
{"format_version",
{offsetof(struct BlockBasedTableOptions, format_version),
OptionType::kUInt32T, OptionVerificationType::kNormal}}};

@ -341,7 +341,6 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"compaction_readahead_size", "100"},
{"random_access_max_buffer_size", "3145728"},
{"writable_file_max_buffer_size", "314159"},
{"skip_table_builder_flush", "true"},
{"bytes_per_sync", "47"},
{"wal_bytes_per_sync", "48"},
};
@ -454,7 +453,6 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728);
ASSERT_EQ(new_db_opt.writable_file_max_buffer_size, 314159);
ASSERT_EQ(new_db_opt.skip_table_builder_flush, true);
ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast<uint64_t>(47));
ASSERT_EQ(new_db_opt.wal_bytes_per_sync, static_cast<uint64_t>(48));
}
@ -607,7 +605,8 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
"checksum=kxxHash;hash_index_allow_collision=1;no_block_cache=1;"
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
"block_size_deviation=8;block_restart_interval=4;"
"filter_policy=bloomfilter:4:true;whole_key_filtering=1",
"filter_policy=bloomfilter:4:true;whole_key_filtering=1;"
"skip_table_builder_flush=1",
&new_opt));
ASSERT_TRUE(new_opt.cache_index_and_filter_blocks);
ASSERT_EQ(new_opt.index_type, BlockBasedTableOptions::kHashSearch);
@ -622,6 +621,7 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
ASSERT_EQ(new_opt.block_size_deviation, 8);
ASSERT_EQ(new_opt.block_restart_interval, 4);
ASSERT_TRUE(new_opt.filter_policy != nullptr);
ASSERT_TRUE(new_opt.skip_table_builder_flush);
// unknown option
ASSERT_NOK(GetBlockBasedTableOptionsFromString(table_opt,

Loading…
Cancel
Save