From 0288bdbc5379c4a2965e50b222a6f6b0e85cf376 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Tue, 2 Feb 2021 11:39:20 -0800 Subject: [PATCH] Add the integrated BlobDB to the stress/crash tests (#7900) Summary: The patch adds support for the options related to the new BlobDB implementation to `db_stress`, including support for dynamically adjusting them using `SetOptions` when `set_options_one_in` and a new flag `allow_setting_blob_options_dynamically` are specified. (The latter is used to prevent the options from being enabled when incompatible features are in use.) The patch also updates the `db_stress` help messages of the existing stacked BlobDB related options to clarify that they pertain to the old implementation. In addition, it adds the new BlobDB to the crash test script. In order to prevent a combinatorial explosion of jobs and still perform whitebox/blackbox testing (including under ASAN/TSAN/UBSAN), and to also test BlobDB in conjunction with atomic flush and transactions, the script sets the BlobDB options in 10% of normal/`cf_consistency`/`txn` crash test runs. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7900 Test Plan: Ran `make check` and `db_stress`/`db_crashtest.py` with various options. Reviewed By: jay-zhuang Differential Revision: D26094913 Pulled By: ltamasi fbshipit-source-id: c2ef3391a05e43a9687f24e297df05f4a5584814 --- db_stress_tool/db_stress_common.h | 11 +++++ db_stress_tool/db_stress_gflags.cc | 59 +++++++++++++++++----- db_stress_tool/db_stress_test_base.cc | 70 ++++++++++++++++++++++++++- db_stress_tool/db_stress_test_base.h | 2 + tools/db_crashtest.py | 26 +++++++++- 5 files changed, 154 insertions(+), 14 deletions(-) diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 686ee4969..5d51829ad 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -232,6 +232,7 @@ DECLARE_int32(get_property_one_in); DECLARE_string(file_checksum_impl); #ifndef ROCKSDB_LITE +// Options for StackableDB-based BlobDB DECLARE_bool(use_blob_db); DECLARE_uint64(blob_db_min_blob_size); DECLARE_uint64(blob_db_bytes_per_sync); @@ -239,6 +240,16 @@ DECLARE_uint64(blob_db_file_size); DECLARE_bool(blob_db_enable_gc); DECLARE_double(blob_db_gc_cutoff); #endif // !ROCKSDB_LITE + +// Options for integrated BlobDB +DECLARE_bool(allow_setting_blob_options_dynamically); +DECLARE_bool(enable_blob_files); +DECLARE_uint64(min_blob_size); +DECLARE_uint64(blob_file_size); +DECLARE_string(blob_compression_type); +DECLARE_bool(enable_blob_garbage_collection); +DECLARE_double(blob_garbage_collection_age_cutoff); + DECLARE_int32(approximate_size_one_in); DECLARE_bool(sync_fault_injection); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index becc4820f..d46fea322 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -325,33 +325,68 @@ DEFINE_bool(enable_write_thread_adaptive_yield, true, "Use a yielding spin loop for brief writer thread waits."); #ifndef ROCKSDB_LITE -// BlobDB Options -DEFINE_bool(use_blob_db, false, "Use BlobDB."); +// Options for StackableDB-based BlobDB +DEFINE_bool(use_blob_db, false, "[Stacked BlobDB] Use BlobDB."); -DEFINE_uint64(blob_db_min_blob_size, - ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().min_blob_size, - "Smallest blob to store in a file. Blobs smaller than this " - "will be inlined with the key in the LSM tree."); +DEFINE_uint64( + blob_db_min_blob_size, + ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().min_blob_size, + "[Stacked BlobDB] Smallest blob to store in a file. Blobs " + "smaller than this will be inlined with the key in the LSM tree."); -DEFINE_uint64(blob_db_bytes_per_sync, - ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().bytes_per_sync, - "Sync blob files once per every N bytes written."); +DEFINE_uint64( + blob_db_bytes_per_sync, + ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().bytes_per_sync, + "[Stacked BlobDB] Sync blob files once per every N bytes written."); DEFINE_uint64(blob_db_file_size, ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().blob_file_size, - "Target size of each blob file."); + "[Stacked BlobDB] Target size of each blob file."); DEFINE_bool( blob_db_enable_gc, ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().enable_garbage_collection, - "Enable BlobDB garbage collection."); + "[Stacked BlobDB] Enable BlobDB garbage collection."); DEFINE_double( blob_db_gc_cutoff, ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().garbage_collection_cutoff, - "Cutoff ratio for BlobDB garbage collection."); + "[Stacked BlobDB] Cutoff ratio for BlobDB garbage collection."); #endif // !ROCKSDB_LITE +// Options for integrated BlobDB +DEFINE_bool(allow_setting_blob_options_dynamically, false, + "[Integrated BlobDB] Allow setting blob options dynamically."); + +DEFINE_bool( + enable_blob_files, + ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions().enable_blob_files, + "[Integrated BlobDB] Enable writing large values to separate blob files."); + +DEFINE_uint64(min_blob_size, + ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions().min_blob_size, + "[Integrated BlobDB] The size of the smallest value to be stored " + "separately in a blob file."); + +DEFINE_uint64(blob_file_size, + ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions().blob_file_size, + "[Integrated BlobDB] The size limit for blob files."); + +DEFINE_string(blob_compression_type, "none", + "[Integrated BlobDB] The compression algorithm to use for large " + "values stored in blob files."); + +DEFINE_bool(enable_blob_garbage_collection, + ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions() + .enable_blob_garbage_collection, + "[Integrated BlobDB] Enable blob garbage collection."); + +DEFINE_double(blob_garbage_collection_age_cutoff, + ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions() + .blob_garbage_collection_age_cutoff, + "[Integrated BlobDB] The cutoff in terms of blob file age for " + "garbage collection."); + static const bool FLAGS_subcompactions_dummy __attribute__((__unused__)) = RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 52e879e24..e31da3a4e 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -108,6 +108,22 @@ std::shared_ptr StressTest::NewCache(size_t capacity) { } } +std::vector StressTest::GetBlobCompressionTags() { + std::vector compression_tags{"kNoCompression"}; + + if (Snappy_Supported()) { + compression_tags.emplace_back("kSnappyCompression"); + } + if (LZ4_Supported()) { + compression_tags.emplace_back("kLZ4Compression"); + } + if (ZSTD_Supported()) { + compression_tags.emplace_back("kZSTD"); + } + + return compression_tags; +} + bool StressTest::BuildOptionsTable() { if (FLAGS_set_options_one_in <= 0) { return true; @@ -186,6 +202,21 @@ bool StressTest::BuildOptionsTable() { {"max_sequential_skip_in_iterations", {"4", "8", "12"}}, }; + if (FLAGS_allow_setting_blob_options_dynamically) { + options_tbl.emplace("enable_blob_files", + std::vector{"false", "true"}); + options_tbl.emplace("min_blob_size", + std::vector{"0", "16", "256"}); + options_tbl.emplace("blob_file_size", + std::vector{"1M", "16M", "256M", "1G"}); + options_tbl.emplace("blob_compression_type", GetBlobCompressionTags()); + options_tbl.emplace("enable_blob_garbage_collection", + std::vector{"false", "true"}); + options_tbl.emplace( + "blob_garbage_collection_age_cutoff", + std::vector{"0.0", "0.25", "0.5", "0.75", "1.0"}); + } + options_table_ = std::move(options_tbl); for (const auto& iter : options_table_) { @@ -1869,7 +1900,7 @@ void StressTest::PrintEnv() const { fprintf(stdout, "TransactionDB : %s\n", FLAGS_use_txn ? "true" : "false"); #ifndef ROCKSDB_LITE - fprintf(stdout, "BlobDB : %s\n", + fprintf(stdout, "Stacked BlobDB : %s\n", FLAGS_use_blob_db ? "true" : "false"); #endif // !ROCKSDB_LITE fprintf(stdout, "Read only mode : %s\n", @@ -2083,6 +2114,17 @@ void StressTest::Open() { options_.file_checksum_gen_factory = GetFileChecksumImpl(FLAGS_file_checksum_impl); options_.track_and_verify_wals_in_manifest = true; + + // Integrated BlobDB + options_.enable_blob_files = FLAGS_enable_blob_files; + options_.min_blob_size = FLAGS_min_blob_size; + options_.blob_file_size = FLAGS_blob_file_size; + options_.blob_compression_type = + StringToCompressionType(FLAGS_blob_compression_type.c_str()); + options_.enable_blob_garbage_collection = + FLAGS_enable_blob_garbage_collection; + options_.blob_garbage_collection_age_cutoff = + FLAGS_blob_garbage_collection_age_cutoff; } else { #ifdef ROCKSDB_LITE fprintf(stderr, "--options_file not supported in lite mode\n"); @@ -2172,6 +2214,31 @@ void StressTest::Open() { options_.best_efforts_recovery = FLAGS_best_efforts_recovery; options_.paranoid_file_checks = FLAGS_paranoid_file_checks; + if ((options_.enable_blob_files || options_.enable_blob_garbage_collection || + FLAGS_allow_setting_blob_options_dynamically) && + (FLAGS_use_merge || FLAGS_enable_compaction_filter || + FLAGS_checkpoint_one_in > 0 || FLAGS_backup_one_in > 0 || + FLAGS_best_efforts_recovery)) { + fprintf( + stderr, + "Integrated BlobDB is currently incompatible with Merge, compaction " + "filters, checkpoints, backup/restore, and best-effort recovery\n"); + exit(1); + } + + if (options_.enable_blob_files) { + fprintf(stdout, + "Integrated BlobDB: blob files enabled, min blob size %" PRIu64 + ", blob file size %" PRIu64 ", blob compression type %s\n", + options_.min_blob_size, options_.blob_file_size, + CompressionTypeToString(options_.blob_compression_type).c_str()); + } + + if (options_.enable_blob_garbage_collection) { + fprintf(stdout, "Integrated BlobDB: blob GC enabled, cutoff %f\n", + options_.blob_garbage_collection_age_cutoff); + } + fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); Status s; @@ -2229,6 +2296,7 @@ void StressTest::Open() { options_.create_missing_column_families = true; if (!FLAGS_use_txn) { #ifndef ROCKSDB_LITE + // StackableDB-based BlobDB if (FLAGS_use_blob_db) { blob_db::BlobDBOptions blob_db_options; blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size; diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index 426af3bd0..8fc15def1 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -24,6 +24,8 @@ class StressTest { std::shared_ptr NewCache(size_t capacity); + static std::vector GetBlobCompressionTags(); + bool BuildOptionsTable(); void InitDb(); diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index f59524f9f..32a6abe1b 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -265,6 +265,23 @@ best_efforts_recovery_params = { "continuous_verification_interval": 0, } +blob_params = { + "allow_setting_blob_options_dynamically": 1, + # Enable blob files and GC with a 75% chance initially; note that they might still be + # enabled/disabled during the test via SetOptions + "enable_blob_files": lambda: random.choice([0] + [1] * 3), + "min_blob_size": lambda: random.choice([0, 16, 256]), + "blob_file_size": lambda: random.choice([1048576, 16777216, 268435456, 1073741824]), + "blob_compression_type": lambda: random.choice(["none", "snappy", "lz4", "zstd"]), + "enable_blob_garbage_collection": lambda: random.choice([0] + [1] * 3), + "blob_garbage_collection_age_cutoff": lambda: random.choice([0.0, 0.25, 0.5, 0.75, 1.0]), + # The following are currently incompatible with the integrated BlobDB + "use_merge": 0, + "enable_compaction_filter": 0, + "backup_one_in": 0, + "checkpoint_one_in": 0, +} + def finalize_and_sanitize(src_params): dest_params = dict([(k, v() if callable(v) else v) for (k, v) in src_params.items()]) @@ -356,6 +373,12 @@ def gen_cmd_params(args): if args.test_best_efforts_recovery: params.update(best_efforts_recovery_params) + # Best-effort recovery and BlobDB are currently incompatible. Test BE recovery + # if specified on the command line; otherwise, apply BlobDB related overrides + # with a 10% chance. + if not args.test_best_efforts_recovery and random.choice([0] * 9 + [1]) == 1: + params.update(blob_params) + for k, v in vars(args).items(): if v is not None: params[k] = v @@ -628,7 +651,8 @@ def main(): + list(whitebox_default_params.items()) + list(simple_default_params.items()) + list(blackbox_simple_default_params.items()) - + list(whitebox_simple_default_params.items())) + + list(whitebox_simple_default_params.items()) + + list(blob_params.items())) for k, v in all_params.items(): parser.add_argument("--" + k, type=type(v() if callable(v) else v))