Add initial set of options for integrated blob write path (#7280)

Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/7280

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D23195192

Pulled By: ltamasi

fbshipit-source-id: 743b382de391963e62ba86119e9fbd0233ea3b3a
main
Levi Tamasi 4 years ago committed by Facebook GitHub Bot
parent cc24ac14eb
commit b9bb59d49d
  1. 11
      db/column_family.cc
  2. 45
      include/rocksdb/advanced_options.h
  3. 28
      options/cf_options.cc
  4. 14
      options/cf_options.h
  5. 16
      options/options.cc
  6. 6
      options/options_helper.cc
  7. 4
      options/options_settable_test.cc
  8. 16
      options/options_test.cc
  9. 4
      test_util/testutil.cc

@ -12,6 +12,7 @@
#include <algorithm> #include <algorithm>
#include <cinttypes> #include <cinttypes>
#include <limits> #include <limits>
#include <sstream>
#include <string> #include <string>
#include <vector> #include <vector>
@ -148,6 +149,16 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
"should be nonzero if we're using zstd's dictionary generator."); "should be nonzero if we're using zstd's dictionary generator.");
} }
} }
if (!CompressionTypeSupported(cf_options.blob_compression_type)) {
std::ostringstream oss;
oss << "The specified blob compression type "
<< CompressionTypeToString(cf_options.blob_compression_type)
<< " is not available.";
return Status::InvalidArgument(oss.str());
}
return Status::OK(); return Status::OK();
} }

@ -717,6 +717,51 @@ struct AdvancedColumnFamilyOptions {
// data is left uncompressed (unless compression is also requested). // data is left uncompressed (unless compression is also requested).
uint64_t sample_for_compression = 0; uint64_t sample_for_compression = 0;
// UNDER CONSTRUCTION -- DO NOT USE
// When set, large values (blobs) are written to separate blob files, and
// only pointers to them are stored in SST files. This can reduce write
// amplification for large-value use cases at the cost of introducing a level
// of indirection for reads. See also the options min_blob_size,
// blob_file_size, and blob_compression_type below.
//
// Default: false
//
// Dynamically changeable through the SetOptions() API
bool enable_blob_files = false;
// UNDER CONSTRUCTION -- DO NOT USE
// The size of the smallest value to be stored separately in a blob file.
// Values which have an uncompressed size smaller than this threshold are
// stored alongside the keys in SST files in the usual fashion. A value of
// zero for this option means that all values are stored in blob files. Note
// that enable_blob_files has to be set in order for this option to have any
// effect.
//
// Default: 0
//
// Dynamically changeable through the SetOptions() API
uint64_t min_blob_size = 0;
// UNDER CONSTRUCTION -- DO NOT USE
// The size limit for blob files. When writing blob files, a new file is
// opened once this limit is reached. Note that enable_blob_files has to be
// set in order for this option to have any effect.
//
// Default: 256 MB
//
// Dynamically changeable through the SetOptions() API
uint64_t blob_file_size = 1ULL << 28;
// UNDER CONSTRUCTION -- DO NOT USE
// The compression algorithm to use for large values stored in blob files.
// Note that enable_blob_files has to be set in order for this option to have
// any effect.
//
// Default: no compression
//
// Dynamically changeable through the SetOptions() API
CompressionType blob_compression_type = kNoCompression;
// Create ColumnFamilyOptions with default values for all fields // Create ColumnFamilyOptions with default values for all fields
AdvancedColumnFamilyOptions(); AdvancedColumnFamilyOptions();
// Create ColumnFamilyOptions from Options // Create ColumnFamilyOptions from Options

@ -631,6 +631,24 @@ std::unordered_map<std::string, OptionTypeInfo>
OptionType::kUInt64T, OptionVerificationType::kNormal, OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable, OptionTypeFlags::kMutable,
offsetof(struct MutableCFOptions, sample_for_compression)}}, offsetof(struct MutableCFOptions, sample_for_compression)}},
{"enable_blob_files",
{offset_of(&ColumnFamilyOptions::enable_blob_files),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable,
offsetof(struct MutableCFOptions, enable_blob_files)}},
{"min_blob_size",
{offset_of(&ColumnFamilyOptions::min_blob_size), OptionType::kUInt64T,
OptionVerificationType::kNormal, OptionTypeFlags::kMutable,
offsetof(struct MutableCFOptions, min_blob_size)}},
{"blob_file_size",
{offset_of(&ColumnFamilyOptions::blob_file_size), OptionType::kUInt64T,
OptionVerificationType::kNormal, OptionTypeFlags::kMutable,
offsetof(struct MutableCFOptions, blob_file_size)}},
{"blob_compression_type",
{offset_of(&ColumnFamilyOptions::blob_compression_type),
OptionType::kCompressionType, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable,
offsetof(struct MutableCFOptions, blob_compression_type)}},
// The following properties were handled as special cases in ParseOption // The following properties were handled as special cases in ParseOption
// This means that the properties could be read from the options file // This means that the properties could be read from the options file
// but never written to the file or compared to each other. // but never written to the file or compared to each other.
@ -920,6 +938,16 @@ void MutableCFOptions::Dump(Logger* log) const {
compaction_options_fifo.max_table_files_size); compaction_options_fifo.max_table_files_size);
ROCKS_LOG_INFO(log, "compaction_options_fifo.allow_compaction : %d", ROCKS_LOG_INFO(log, "compaction_options_fifo.allow_compaction : %d",
compaction_options_fifo.allow_compaction); compaction_options_fifo.allow_compaction);
// Blob file related options
ROCKS_LOG_INFO(log, " enable_blob_files: %s",
enable_blob_files ? "true" : "false");
ROCKS_LOG_INFO(log, " min_blob_size: %" PRIu64,
min_blob_size);
ROCKS_LOG_INFO(log, " blob_file_size: %" PRIu64,
blob_file_size);
ROCKS_LOG_INFO(log, " blob_compression_type: %s",
CompressionTypeToString(blob_compression_type).c_str());
} }
MutableCFOptions::MutableCFOptions(const Options& options) MutableCFOptions::MutableCFOptions(const Options& options)

@ -155,6 +155,10 @@ struct MutableCFOptions {
options.max_bytes_for_level_multiplier_additional), options.max_bytes_for_level_multiplier_additional),
compaction_options_fifo(options.compaction_options_fifo), compaction_options_fifo(options.compaction_options_fifo),
compaction_options_universal(options.compaction_options_universal), compaction_options_universal(options.compaction_options_universal),
enable_blob_files(options.enable_blob_files),
min_blob_size(options.min_blob_size),
blob_file_size(options.blob_file_size),
blob_compression_type(options.blob_compression_type),
max_sequential_skip_in_iterations( max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations), options.max_sequential_skip_in_iterations),
paranoid_file_checks(options.paranoid_file_checks), paranoid_file_checks(options.paranoid_file_checks),
@ -192,6 +196,10 @@ struct MutableCFOptions {
ttl(0), ttl(0),
periodic_compaction_seconds(0), periodic_compaction_seconds(0),
compaction_options_fifo(), compaction_options_fifo(),
enable_blob_files(false),
min_blob_size(0),
blob_file_size(0),
blob_compression_type(kNoCompression),
max_sequential_skip_in_iterations(0), max_sequential_skip_in_iterations(0),
paranoid_file_checks(false), paranoid_file_checks(false),
report_bg_io_stats(false), report_bg_io_stats(false),
@ -247,6 +255,12 @@ struct MutableCFOptions {
CompactionOptionsFIFO compaction_options_fifo; CompactionOptionsFIFO compaction_options_fifo;
CompactionOptionsUniversal compaction_options_universal; CompactionOptionsUniversal compaction_options_universal;
// Blob file related options
bool enable_blob_files;
uint64_t min_blob_size;
uint64_t blob_file_size;
CompressionType blob_compression_type;
// Misc options // Misc options
uint64_t max_sequential_skip_in_iterations; uint64_t max_sequential_skip_in_iterations;
bool paranoid_file_checks; bool paranoid_file_checks;

@ -88,7 +88,11 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options)
report_bg_io_stats(options.report_bg_io_stats), report_bg_io_stats(options.report_bg_io_stats),
ttl(options.ttl), ttl(options.ttl),
periodic_compaction_seconds(options.periodic_compaction_seconds), periodic_compaction_seconds(options.periodic_compaction_seconds),
sample_for_compression(options.sample_for_compression) { sample_for_compression(options.sample_for_compression),
enable_blob_files(options.enable_blob_files),
min_blob_size(options.min_blob_size),
blob_file_size(options.blob_file_size),
blob_compression_type(options.blob_compression_type) {
assert(memtable_factory.get() != nullptr); assert(memtable_factory.get() != nullptr);
if (max_bytes_for_level_multiplier_additional.size() < if (max_bytes_for_level_multiplier_additional.size() <
static_cast<unsigned int>(num_levels)) { static_cast<unsigned int>(num_levels)) {
@ -369,6 +373,16 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log, ROCKS_LOG_HEADER(log,
" Options.periodic_compaction_seconds: %" PRIu64, " Options.periodic_compaction_seconds: %" PRIu64,
periodic_compaction_seconds); periodic_compaction_seconds);
ROCKS_LOG_HEADER(log, " Options.enable_blob_files: %s",
enable_blob_files ? "true" : "false");
ROCKS_LOG_HEADER(log,
" Options.min_blob_size: %" PRIu64,
min_blob_size);
ROCKS_LOG_HEADER(log,
" Options.blob_file_size: %" PRIu64,
blob_file_size);
ROCKS_LOG_HEADER(log, " Options.blob_compression_type: %s",
CompressionTypeToString(blob_compression_type).c_str());
} // ColumnFamilyOptions::Dump } // ColumnFamilyOptions::Dump
void Options::Dump(Logger* log) const { void Options::Dump(Logger* log) const {

@ -207,6 +207,12 @@ ColumnFamilyOptions BuildColumnFamilyOptions(
cf_opts.compaction_options_universal = cf_opts.compaction_options_universal =
mutable_cf_options.compaction_options_universal; mutable_cf_options.compaction_options_universal;
// Blob file related options
cf_opts.enable_blob_files = mutable_cf_options.enable_blob_files;
cf_opts.min_blob_size = mutable_cf_options.min_blob_size;
cf_opts.blob_file_size = mutable_cf_options.blob_file_size;
cf_opts.blob_compression_type = mutable_cf_options.blob_compression_type;
// Misc options // Misc options
cf_opts.max_sequential_skip_in_iterations = cf_opts.max_sequential_skip_in_iterations =
mutable_cf_options.max_sequential_skip_in_iterations; mutable_cf_options.max_sequential_skip_in_iterations;

@ -490,6 +490,10 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"ttl=60;" "ttl=60;"
"periodic_compaction_seconds=3600;" "periodic_compaction_seconds=3600;"
"sample_for_compression=0;" "sample_for_compression=0;"
"enable_blob_files=true;"
"min_blob_size=256;"
"blob_file_size=1000000;"
"blob_compression_type=kBZip2Compression;"
"compaction_options_fifo={max_table_files_size=3;allow_" "compaction_options_fifo={max_table_files_size=3;allow_"
"compaction=false;};", "compaction=false;};",
new_options)); new_options));

@ -97,6 +97,10 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"min_partial_merge_operands", "31"}, {"min_partial_merge_operands", "31"},
{"prefix_extractor", "fixed:31"}, {"prefix_extractor", "fixed:31"},
{"optimize_filters_for_hits", "true"}, {"optimize_filters_for_hits", "true"},
{"enable_blob_files", "true"},
{"min_blob_size", "1K"},
{"blob_file_size", "1G"},
{"blob_compression_type", "kZSTD"},
}; };
std::unordered_map<std::string, std::string> db_options_map = { std::unordered_map<std::string, std::string> db_options_map = {
@ -221,6 +225,10 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.optimize_filters_for_hits, true); ASSERT_EQ(new_cf_opt.optimize_filters_for_hits, true);
ASSERT_EQ(std::string(new_cf_opt.prefix_extractor->Name()), ASSERT_EQ(std::string(new_cf_opt.prefix_extractor->Name()),
"rocksdb.FixedPrefix.31"); "rocksdb.FixedPrefix.31");
ASSERT_EQ(new_cf_opt.enable_blob_files, true);
ASSERT_EQ(new_cf_opt.min_blob_size, 1ULL << 10);
ASSERT_EQ(new_cf_opt.blob_file_size, 1ULL << 30);
ASSERT_EQ(new_cf_opt.blob_compression_type, kZSTD);
cf_options_map["write_buffer_size"] = "hello"; cf_options_map["write_buffer_size"] = "hello";
ASSERT_NOK(GetColumnFamilyOptionsFromMap(exact, base_cf_opt, cf_options_map, ASSERT_NOK(GetColumnFamilyOptionsFromMap(exact, base_cf_opt, cf_options_map,
@ -1500,6 +1508,10 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
{"min_partial_merge_operands", "31"}, {"min_partial_merge_operands", "31"},
{"prefix_extractor", "fixed:31"}, {"prefix_extractor", "fixed:31"},
{"optimize_filters_for_hits", "true"}, {"optimize_filters_for_hits", "true"},
{"enable_blob_files", "true"},
{"min_blob_size", "1K"},
{"blob_file_size", "1G"},
{"blob_compression_type", "kZSTD"},
}; };
std::unordered_map<std::string, std::string> db_options_map = { std::unordered_map<std::string, std::string> db_options_map = {
@ -1616,6 +1628,10 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.optimize_filters_for_hits, true); ASSERT_EQ(new_cf_opt.optimize_filters_for_hits, true);
ASSERT_EQ(std::string(new_cf_opt.prefix_extractor->Name()), ASSERT_EQ(std::string(new_cf_opt.prefix_extractor->Name()),
"rocksdb.FixedPrefix.31"); "rocksdb.FixedPrefix.31");
ASSERT_EQ(new_cf_opt.enable_blob_files, true);
ASSERT_EQ(new_cf_opt.min_blob_size, 1ULL << 10);
ASSERT_EQ(new_cf_opt.blob_file_size, 1ULL << 30);
ASSERT_EQ(new_cf_opt.blob_compression_type, kZSTD);
cf_options_map["write_buffer_size"] = "hello"; cf_options_map["write_buffer_size"] = "hello";
ASSERT_NOK(GetColumnFamilyOptionsFromMap( ASSERT_NOK(GetColumnFamilyOptionsFromMap(

@ -368,6 +368,7 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options,
cf_opt->force_consistency_checks = rnd->Uniform(2); cf_opt->force_consistency_checks = rnd->Uniform(2);
cf_opt->compaction_options_fifo.allow_compaction = rnd->Uniform(2); cf_opt->compaction_options_fifo.allow_compaction = rnd->Uniform(2);
cf_opt->memtable_whole_key_filtering = rnd->Uniform(2); cf_opt->memtable_whole_key_filtering = rnd->Uniform(2);
cf_opt->enable_blob_files = rnd->Uniform(2);
// double options // double options
cf_opt->hard_rate_limit = static_cast<double>(rnd->Uniform(10000)) / 13; cf_opt->hard_rate_limit = static_cast<double>(rnd->Uniform(10000)) / 13;
@ -417,6 +418,8 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options,
cf_opt->target_file_size_base * rnd->Uniform(100); cf_opt->target_file_size_base * rnd->Uniform(100);
cf_opt->compaction_options_fifo.max_table_files_size = cf_opt->compaction_options_fifo.max_table_files_size =
uint_max + rnd->Uniform(10000); uint_max + rnd->Uniform(10000);
cf_opt->min_blob_size = uint_max + rnd->Uniform(10000);
cf_opt->blob_file_size = uint_max + rnd->Uniform(10000);
// unsigned int options // unsigned int options
cf_opt->rate_limit_delay_max_milliseconds = rnd->Uniform(10000); cf_opt->rate_limit_delay_max_milliseconds = rnd->Uniform(10000);
@ -435,6 +438,7 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, DBOptions& db_options,
cf_opt->compression = RandomCompressionType(rnd); cf_opt->compression = RandomCompressionType(rnd);
RandomCompressionTypeVector(cf_opt->num_levels, RandomCompressionTypeVector(cf_opt->num_levels,
&cf_opt->compression_per_level, rnd); &cf_opt->compression_per_level, rnd);
cf_opt->blob_compression_type = RandomCompressionType(rnd);
} }
bool IsDirectIOSupported(Env* env, const std::string& dir) { bool IsDirectIOSupported(Env* env, const std::string& dir) {

Loading…
Cancel
Save