From 3dc6ebaf7470969c937c1d29801ea5a5b69cf9e9 Mon Sep 17 00:00:00 2001 From: Gang Liao Date: Wed, 1 Jun 2022 19:40:26 -0700 Subject: [PATCH] Support specifying blob garbage collection parameters when CompactRange() (#10073) Summary: Garbage collection is generally controlled by the BlobDB configuration options `enable_blob_garbage_collection` and `blob_garbage_collection_age_cutoff`. However, there might be use cases where we would want to temporarily override these options while performing a manual compaction. (One use case would be doing a full key-space manual compaction with full=100% garbage collection age cutoff in order to minimize the space occupied by the database.) Our goal here is to make it possible to override the configured GC parameters when using the `CompactRange` API to perform manual compactions. This PR would involve: - Extending the `CompactRangeOptions` structure so clients can both force-enable and force-disable GC, as well as use a different cutoff than what's currently configured - Storing whether blob GC should actually be enabled during a certain manual compaction and the cutoff to use in the `Compaction` object (considering the above overrides) and passing it to `CompactionIterator` via `CompactionProxy` - Updating the BlobDB wiki to document the new options. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10073 Test Plan: Adding unit tests and adding the new options to the stress test tool. 
Reviewed By: ltamasi Differential Revision: D36848700 Pulled By: gangliao fbshipit-source-id: c878ef101d1c612429999f513453c319f75d78e9 --- HISTORY.md | 1 + db/compaction/compaction.cc | 22 +++++++-- db/compaction/compaction.h | 30 +++++++++--- db/compaction/compaction_iterator.h | 5 +- db/compaction/compaction_picker.cc | 12 +++-- db/db_compaction_test.cc | 68 +++++++++++++++++++++++++++ db_stress_tool/db_stress_test_base.cc | 16 +++++-- include/rocksdb/options.h | 24 ++++++++++ 8 files changed, 159 insertions(+), 19 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index d0470e292..5b8a409b0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -18,6 +18,7 @@ ### New Features * Add FileSystem::ReadAsync API in io_tracing +* Add blob garbage collection parameters `blob_garbage_collection_policy` and `blob_garbage_collection_age_cutoff` to both force-enable and force-disable GC, as well as selectively override age cutoff when using CompactRange. * Add an extra sanity check in `GetSortedWalFiles()` (also used by `GetLiveFilesStorageInfo()`, `BackupEngine`, and `Checkpoint`) to reduce risk of successfully created backup or checkpoint failing to open because of missing WAL file. 
### Behavior changes diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 4d5245443..3f91da9c6 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -214,7 +214,9 @@ Compaction::Compaction( CompressionOptions _compression_opts, Temperature _output_temperature, uint32_t _max_subcompactions, std::vector _grandparents, bool _manual_compaction, const std::string& _trim_ts, double _score, - bool _deletion_compaction, CompactionReason _compaction_reason) + bool _deletion_compaction, CompactionReason _compaction_reason, + BlobGarbageCollectionPolicy _blob_garbage_collection_policy, + double _blob_garbage_collection_age_cutoff) : input_vstorage_(vstorage), start_level_(_inputs[0].level), output_level_(_output_level), @@ -240,7 +242,19 @@ Compaction::Compaction( trim_ts_(_trim_ts), is_trivial_move_(false), compaction_reason_(_compaction_reason), - notify_on_compaction_completion_(false) { + notify_on_compaction_completion_(false), + enable_blob_garbage_collection_( + _blob_garbage_collection_policy == BlobGarbageCollectionPolicy::kForce + ? true + : (_blob_garbage_collection_policy == + BlobGarbageCollectionPolicy::kDisable + ? false + : mutable_cf_options()->enable_blob_garbage_collection)), + blob_garbage_collection_age_cutoff_( + _blob_garbage_collection_age_cutoff < 0 || + _blob_garbage_collection_age_cutoff > 1 + ? 
mutable_cf_options()->blob_garbage_collection_age_cutoff + : _blob_garbage_collection_age_cutoff) { MarkFilesBeingCompacted(true); if (is_manual_compaction_) { compaction_reason_ = CompactionReason::kManualCompaction; @@ -325,8 +339,8 @@ bool Compaction::IsTrivialMove() const { } if (!(start_level_ != output_level_ && num_input_levels() == 1 && - input(0, 0)->fd.GetPathId() == output_path_id() && - InputCompressionMatchesOutput())) { + input(0, 0)->fd.GetPathId() == output_path_id() && + InputCompressionMatchesOutput())) { return false; } diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index fc473e293..ad9ec470c 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -81,7 +81,10 @@ class Compaction { std::vector grandparents, bool manual_compaction = false, const std::string& trim_ts = "", double score = -1, bool deletion_compaction = false, - CompactionReason compaction_reason = CompactionReason::kUnknown); + CompactionReason compaction_reason = CompactionReason::kUnknown, + BlobGarbageCollectionPolicy blob_garbage_collection_policy = + BlobGarbageCollectionPolicy::kUseDefault, + double blob_garbage_collection_age_cutoff = -1); // No copying allowed Compaction(const Compaction&) = delete; @@ -306,6 +309,14 @@ class Compaction { uint32_t max_subcompactions() const { return max_subcompactions_; } + bool enable_blob_garbage_collection() const { + return enable_blob_garbage_collection_; + } + + double blob_garbage_collection_age_cutoff() const { + return blob_garbage_collection_age_cutoff_; + } + // start and end are sub compact range. Null if no boundary. // This is used to filter out some input files' ancester's time range. uint64_t MinInputFileOldestAncesterTime(const InternalKey* start, @@ -332,8 +343,9 @@ class Compaction { // Get the atomic file boundaries for all files in the compaction. 
Necessary // in order to avoid the scenario described in - // https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and plumb - // down appropriate key boundaries to RangeDelAggregator during compaction. + // https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and + // plumb down appropriate key boundaries to RangeDelAggregator during + // compaction. static std::vector PopulateWithAtomicBoundaries( VersionStorageInfo* vstorage, std::vector inputs); @@ -348,7 +360,7 @@ class Compaction { VersionStorageInfo* input_vstorage_; - const int start_level_; // the lowest level to be compacted + const int start_level_; // the lowest level to be compacted const int output_level_; // levels to which output files are stored uint64_t max_output_file_size_; uint64_t max_compaction_bytes_; @@ -359,7 +371,7 @@ class Compaction { VersionEdit edit_; const int number_levels_; ColumnFamilyData* cfd_; - Arena arena_; // Arena used to allocate space for file_levels_ + Arena arena_; // Arena used to allocate space for file_levels_ const uint32_t output_path_id_; CompressionType output_compression_; @@ -377,7 +389,7 @@ class Compaction { // State used to check for number of overlapping grandparent files // (grandparent == "output_level_ + 1") std::vector grandparents_; - const double score_; // score that was used to pick this compaction. + const double score_; // score that was used to pick this compaction. // Is this compaction creating a file in the bottom most level? const bool bottommost_level_; @@ -412,6 +424,12 @@ class Compaction { // Notify on compaction completion only if listener was notified on compaction // begin. bool notify_on_compaction_completion_; + + // Enable/disable garbage collection of blobs during compaction. + bool enable_blob_garbage_collection_; + + // Blob garbage collection age cutoff. + double blob_garbage_collection_age_cutoff_; }; // Return sum of sizes of all files in `files`. 
diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index dc994aa5f..024919150 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -142,12 +142,11 @@ class CompactionIterator { } bool enable_blob_garbage_collection() const override { - return compaction_->mutable_cf_options()->enable_blob_garbage_collection; + return compaction_->enable_blob_garbage_collection(); } double blob_garbage_collection_age_cutoff() const override { - return compaction_->mutable_cf_options() - ->blob_garbage_collection_age_cutoff; + return compaction_->blob_garbage_collection_age_cutoff(); } uint64_t blob_compaction_readahead_size() const override { diff --git a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc index 1f29004ae..07d241de9 100644 --- a/db/compaction/compaction_picker.cc +++ b/db/compaction/compaction_picker.cc @@ -640,7 +640,11 @@ Compaction* CompactionPicker::CompactRange( GetCompressionType(vstorage, mutable_cf_options, output_level, 1), GetCompressionOptions(mutable_cf_options, vstorage, output_level), Temperature::kUnknown, compact_range_options.max_subcompactions, - /* grandparents */ {}, /* is manual */ true, trim_ts); + /* grandparents */ {}, /* is manual */ true, trim_ts, /* score */ -1, + /* deletion_compaction */ false, CompactionReason::kUnknown, + compact_range_options.blob_garbage_collection_policy, + compact_range_options.blob_garbage_collection_age_cutoff); + RegisterCompaction(c); vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options); return c; @@ -818,8 +822,10 @@ Compaction* CompactionPicker::CompactRange( vstorage->base_level()), GetCompressionOptions(mutable_cf_options, vstorage, output_level), Temperature::kUnknown, compact_range_options.max_subcompactions, - std::move(grandparents), - /* is manual compaction */ true, trim_ts); + std::move(grandparents), /* is manual */ true, trim_ts, /* score */ -1, + /* deletion_compaction */ false, 
CompactionReason::kUnknown, + compact_range_options.blob_garbage_collection_policy, + compact_range_options.blob_garbage_collection_age_cutoff); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); RegisterCompaction(compaction); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index c49b3c257..d307cadbb 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -6387,6 +6387,74 @@ INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobGC, DBCompactionTestBlobGC, ::testing::Combine(::testing::Values(0.0, 0.5, 1.0), ::testing::Bool())); +TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGCOverrides) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.enable_blob_files = true; + options.blob_file_size = 32; // one blob per file + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0; + + DestroyAndReopen(options); + + for (int i = 0; i < 128; i += 2) { + ASSERT_OK(Put("key" + std::to_string(i), "value" + std::to_string(i))); + ASSERT_OK( + Put("key" + std::to_string(i + 1), "value" + std::to_string(i + 1))); + ASSERT_OK(Flush()); + } + + std::vector original_blob_files = GetBlobFileNumbers(); + ASSERT_EQ(original_blob_files.size(), 128); + + // Note: turning off enable_blob_files before the compaction results in + // garbage collected values getting inlined. 
+ ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}})); + + CompactRangeOptions cro; + cro.blob_garbage_collection_policy = BlobGarbageCollectionPolicy::kForce; + cro.blob_garbage_collection_age_cutoff = blob_gc_age_cutoff_; + + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + // Check that the GC stats are correct + { + VersionSet* const versions = dbfull()->GetVersionSet(); + assert(versions); + assert(versions->GetColumnFamilySet()); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + + ASSERT_GE(compaction_stats[1].bytes_read_blob, 0); + ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0); + } + + const size_t cutoff_index = static_cast( + cro.blob_garbage_collection_age_cutoff * original_blob_files.size()); + const size_t expected_num_files = original_blob_files.size() - cutoff_index; + + const std::vector new_blob_files = GetBlobFileNumbers(); + + ASSERT_EQ(new_blob_files.size(), expected_num_files); + + // Original blob files below the cutoff should be gone, original blob files + // at or above the cutoff should be still there + for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) { + ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]); + } + + for (size_t i = 0; i < 128; ++i) { + ASSERT_EQ(Get("key" + std::to_string(i)), "value" + std::to_string(i)); + } +} + TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) { Options options; options.env = env_; diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 8b80f4d21..d3e103b5f 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -681,7 +681,7 @@ void StressTest::OperateDb(ThreadState* thread) { #ifndef NDEBUG if 
(FLAGS_read_fault_one_in) { fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(), - FLAGS_read_fault_one_in); + FLAGS_read_fault_one_in); } #endif // NDEBUG if (FLAGS_write_fault_one_in) { @@ -2119,6 +2119,15 @@ void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key, static_cast(bottom_level_styles.size())]; cro.allow_write_stall = static_cast(thread->rand.Next() % 2); cro.max_subcompactions = static_cast(thread->rand.Next() % 4); + std::vector blob_gc_policies = { + BlobGarbageCollectionPolicy::kForce, + BlobGarbageCollectionPolicy::kDisable, + BlobGarbageCollectionPolicy::kUseDefault}; + cro.blob_garbage_collection_policy = + blob_gc_policies[thread->rand.Next() % + static_cast(blob_gc_policies.size())]; + cro.blob_garbage_collection_age_cutoff = + static_cast(thread->rand.Next() % 100) / 100.0; const Snapshot* pre_snapshot = nullptr; uint32_t pre_hash = 0; @@ -2322,7 +2331,8 @@ void StressTest::PrintEnv() const { fprintf(stdout, "Open metadata write fault one in:\n"); fprintf(stdout, " %d\n", FLAGS_open_metadata_write_fault_one_in); - fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection); + fprintf(stdout, "Sync fault injection : %d\n", + FLAGS_sync_fault_injection); fprintf(stdout, "Best efforts recovery : %d\n", static_cast(FLAGS_best_efforts_recovery)); fprintf(stdout, "Fail if OPTIONS file error: %d\n", @@ -2697,7 +2707,7 @@ void StressTest::Reopen(ThreadState* thread) { } assert(!write_prepared || bg_canceled); #else - (void) thread; + (void)thread; #endif for (auto cf : column_families_) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 2a9bebfa5..bc092b24d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1791,6 +1791,17 @@ enum class BottommostLevelCompaction { kForceOptimized, }; +// For manual compaction, we can configure if we want to skip/force garbage +// collection of blob files. 
+enum class BlobGarbageCollectionPolicy { + // Force blob file garbage collection. + kForce, + // Skip blob file garbage collection. + kDisable, + // Inherit blob file garbage collection policy from ColumnFamilyOptions. + kUseDefault, +}; + // CompactRangeOptions is used by CompactRange() call. struct CompactRangeOptions { // If true, no other compaction will run at the same time as this @@ -1823,6 +1834,19 @@ struct CompactRangeOptions { // Cancellation can be delayed waiting on automatic compactions when used // together with `exclusive_manual_compaction == true`. std::atomic* canceled = nullptr; + + // If set to kForce, RocksDB will override enable_blob_garbage_collection + // to true; if set to kDisable, RocksDB will override it to false, and + // kUseDefault leaves the setting in effect. This enables customers to both + // force-enable and force-disable GC when calling CompactRange. + BlobGarbageCollectionPolicy blob_garbage_collection_policy = + BlobGarbageCollectionPolicy::kUseDefault; + + // If set to < 0 or > 1, RocksDB leaves blob_garbage_collection_age_cutoff + // from ColumnFamilyOptions in effect. Otherwise, it will override the + // user-provided setting. This enables customers to selectively override the + // age cutoff. + double blob_garbage_collection_age_cutoff = -1; }; // IngestExternalFileOptions is used by IngestExternalFile()