Add option for WAL compression algorithm (#9432)

Summary:
Add an option to set the WAL compression algorithm - wal_compression.

TODO: WAL compression is not implemented and will only support zstd initially. Will be added in subsequent diffs.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9432

Reviewed By: pdillinger

Differential Revision: D33797275

Pulled By: sidroyc

fbshipit-source-id: 8db81d9c9cea5e2e4f1445d3aecad8106137b8e7
main
Siddhartha Roychowdhury 3 years ago committed by Facebook GitHub Bot
parent 11d7329503
commit c27ca23644
  1. 8
      db/c.cc
  2. 3
      db/c_test.c
  3. 8
      db/db_impl/db_impl_open.cc
  4. 4
      include/rocksdb/c.h
  5. 5
      include/rocksdb/options.h
  6. 7
      options/db_options.cc
  7. 1
      options/db_options.h
  8. 1
      options/options_helper.cc
  9. 1
      options/options_settable_test.cc
  10. 9
      tools/db_bench_tool.cc

@ -3533,6 +3533,14 @@ unsigned char rocksdb_options_get_manual_wal_flush(rocksdb_options_t* opt) {
return opt->rep.manual_wal_flush; return opt->rep.manual_wal_flush;
} }
void rocksdb_options_set_wal_compression(rocksdb_options_t* opt, int val) {
opt->rep.wal_compression = static_cast<CompressionType>(val);
}
int rocksdb_options_get_wal_compression(rocksdb_options_t* opt) {
return opt->rep.wal_compression;
}
rocksdb_ratelimiter_t* rocksdb_ratelimiter_create( rocksdb_ratelimiter_t* rocksdb_ratelimiter_create(
int64_t rate_bytes_per_sec, int64_t rate_bytes_per_sec,
int64_t refill_period_us, int64_t refill_period_us,

@ -1777,6 +1777,9 @@ int main(int argc, char** argv) {
rocksdb_options_set_manual_wal_flush(o, 1); rocksdb_options_set_manual_wal_flush(o, 1);
CheckCondition(1 == rocksdb_options_get_manual_wal_flush(o)); CheckCondition(1 == rocksdb_options_get_manual_wal_flush(o));
rocksdb_options_set_wal_compression(o, 1);
CheckCondition(1 == rocksdb_options_get_wal_compression(o));
/* Blob Options */ /* Blob Options */
rocksdb_options_set_enable_blob_files(o, 1); rocksdb_options_set_enable_blob_files(o, 1);
CheckCondition(1 == rocksdb_options_get_enable_blob_files(o)); CheckCondition(1 == rocksdb_options_get_enable_blob_files(o));

@ -194,6 +194,14 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
// Supported wal compression types
if (result.wal_compression != kNoCompression &&
result.wal_compression != kZSTD) {
result.wal_compression = kNoCompression;
ROCKS_LOG_WARN(result.info_log,
"wal_compression is disabled since only zstd is supported");
}
if (!result.paranoid_checks) { if (!result.paranoid_checks) {
result.skip_checking_sst_file_sizes_on_db_open = true; result.skip_checking_sst_file_sizes_on_db_open = true;
ROCKS_LOG_INFO(result.info_log, ROCKS_LOG_INFO(result.info_log,

@ -1445,6 +1445,10 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_manual_wal_flush(
rocksdb_options_t* opt, unsigned char); rocksdb_options_t* opt, unsigned char);
extern ROCKSDB_LIBRARY_API unsigned char rocksdb_options_get_manual_wal_flush( extern ROCKSDB_LIBRARY_API unsigned char rocksdb_options_get_manual_wal_flush(
rocksdb_options_t* opt); rocksdb_options_t* opt);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_wal_compression(
rocksdb_options_t* opt, int);
extern ROCKSDB_LIBRARY_API int rocksdb_options_get_wal_compression(
rocksdb_options_t* opt);
/* RateLimiter */ /* RateLimiter */
extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t* rocksdb_ratelimiter_create( extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t* rocksdb_ratelimiter_create(

@ -1228,6 +1228,11 @@ struct DBOptions {
// file. // file.
bool manual_wal_flush = false; bool manual_wal_flush = false;
// This feature is WORK IN PROGRESS
// If enabled WAL records will be compressed before they are written.
// Only zstd is supported.
CompressionType wal_compression = kNoCompression;
// If true, RocksDB supports flushing multiple column families and committing // If true, RocksDB supports flushing multiple column families and committing
// their results atomically to MANIFEST. Note that it is not // their results atomically to MANIFEST. Note that it is not
// necessary to set atomic_flush to true if WAL is always enabled since WAL // necessary to set atomic_flush to true if WAL is always enabled since WAL

@ -384,6 +384,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct ImmutableDBOptions, manual_wal_flush), {offsetof(struct ImmutableDBOptions, manual_wal_flush),
OptionType::kBoolean, OptionVerificationType::kNormal, OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}}, OptionTypeFlags::kNone}},
{"wal_compression",
{offsetof(struct ImmutableDBOptions, wal_compression),
OptionType::kCompressionType, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"seq_per_batch", {"seq_per_batch",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated, {0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kNone}}, OptionTypeFlags::kNone}},
@ -725,6 +729,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
preserve_deletes(options.preserve_deletes), preserve_deletes(options.preserve_deletes),
two_write_queues(options.two_write_queues), two_write_queues(options.two_write_queues),
manual_wal_flush(options.manual_wal_flush), manual_wal_flush(options.manual_wal_flush),
wal_compression(options.wal_compression),
atomic_flush(options.atomic_flush), atomic_flush(options.atomic_flush),
avoid_unnecessary_blocking_io(options.avoid_unnecessary_blocking_io), avoid_unnecessary_blocking_io(options.avoid_unnecessary_blocking_io),
persist_stats_to_disk(options.persist_stats_to_disk), persist_stats_to_disk(options.persist_stats_to_disk),
@ -891,6 +896,8 @@ void ImmutableDBOptions::Dump(Logger* log) const {
two_write_queues); two_write_queues);
ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d", ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d",
manual_wal_flush); manual_wal_flush);
ROCKS_LOG_HEADER(log, " Options.wal_compression: %d",
wal_compression);
ROCKS_LOG_HEADER(log, " Options.atomic_flush: %d", atomic_flush); ROCKS_LOG_HEADER(log, " Options.atomic_flush: %d", atomic_flush);
ROCKS_LOG_HEADER(log, ROCKS_LOG_HEADER(log,
" Options.avoid_unnecessary_blocking_io: %d", " Options.avoid_unnecessary_blocking_io: %d",

@ -87,6 +87,7 @@ struct ImmutableDBOptions {
bool preserve_deletes; bool preserve_deletes;
bool two_write_queues; bool two_write_queues;
bool manual_wal_flush; bool manual_wal_flush;
CompressionType wal_compression;
bool atomic_flush; bool atomic_flush;
bool avoid_unnecessary_blocking_io; bool avoid_unnecessary_blocking_io;
bool persist_stats_to_disk; bool persist_stats_to_disk;

@ -172,6 +172,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
immutable_db_options.preserve_deletes; immutable_db_options.preserve_deletes;
options.two_write_queues = immutable_db_options.two_write_queues; options.two_write_queues = immutable_db_options.two_write_queues;
options.manual_wal_flush = immutable_db_options.manual_wal_flush; options.manual_wal_flush = immutable_db_options.manual_wal_flush;
options.wal_compression = immutable_db_options.wal_compression;
options.atomic_flush = immutable_db_options.atomic_flush; options.atomic_flush = immutable_db_options.atomic_flush;
options.avoid_unnecessary_blocking_io = options.avoid_unnecessary_blocking_io =
immutable_db_options.avoid_unnecessary_blocking_io; immutable_db_options.avoid_unnecessary_blocking_io;

@ -334,6 +334,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"concurrent_prepare=false;" "concurrent_prepare=false;"
"two_write_queues=false;" "two_write_queues=false;"
"manual_wal_flush=false;" "manual_wal_flush=false;"
"wal_compression=kZSTD;"
"seq_per_batch=false;" "seq_per_batch=false;"
"atomic_flush=false;" "atomic_flush=false;"
"avoid_unnecessary_blocking_io=false;" "avoid_unnecessary_blocking_io=false;"

@ -762,6 +762,11 @@ DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
DEFINE_bool(manual_wal_flush, false, DEFINE_bool(manual_wal_flush, false,
"If true, buffer WAL until buffer is full or a manual FlushWAL()."); "If true, buffer WAL until buffer is full or a manual FlushWAL().");
DEFINE_string(wal_compression, "string",
"Algorithm to use for WAL compression. none to disable.");
static enum ROCKSDB_NAMESPACE::CompressionType FLAGS_wal_compression_e =
ROCKSDB_NAMESPACE::kNoCompression;
DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL"); DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL");
DEFINE_string(truth_db, "/dev/shm/truth_db/dbbench", DEFINE_string(truth_db, "/dev/shm/truth_db/dbbench",
@ -3998,6 +4003,7 @@ class Benchmark {
options.use_direct_io_for_flush_and_compaction = options.use_direct_io_for_flush_and_compaction =
FLAGS_use_direct_io_for_flush_and_compaction; FLAGS_use_direct_io_for_flush_and_compaction;
options.manual_wal_flush = FLAGS_manual_wal_flush; options.manual_wal_flush = FLAGS_manual_wal_flush;
options.wal_compression = FLAGS_wal_compression_e;
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
options.ttl = FLAGS_fifo_compaction_ttl; options.ttl = FLAGS_fifo_compaction_ttl;
options.compaction_options_fifo = CompactionOptionsFIFO( options.compaction_options_fifo = CompactionOptionsFIFO(
@ -8129,6 +8135,9 @@ int db_bench_tool(int argc, char** argv) {
FLAGS_compression_type_e = FLAGS_compression_type_e =
StringToCompressionType(FLAGS_compression_type.c_str()); StringToCompressionType(FLAGS_compression_type.c_str());
FLAGS_wal_compression_e =
StringToCompressionType(FLAGS_wal_compression.c_str());
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
// Stacked BlobDB // Stacked BlobDB
FLAGS_blob_db_compression_type_e = FLAGS_blob_db_compression_type_e =

Loading…
Cancel
Save