diff --git a/HISTORY.md b/HISTORY.md index d0470e292..5b8a409b0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -18,6 +18,7 @@ ### New Features * Add FileSystem::ReadAsync API in io_tracing +* Add blob garbage collection parameters `blob_garbage_collection_policy` and `blob_garbage_collection_age_cutoff` to both force-enable and force-disable GC, as well as selectively override age cutoff when using CompactRange. * Add an extra sanity check in `GetSortedWalFiles()` (also used by `GetLiveFilesStorageInfo()`, `BackupEngine`, and `Checkpoint`) to reduce risk of successfully created backup or checkpoint failing to open because of missing WAL file. ### Behavior changes diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 4d5245443..3f91da9c6 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -214,7 +214,9 @@ Compaction::Compaction( CompressionOptions _compression_opts, Temperature _output_temperature, uint32_t _max_subcompactions, std::vector _grandparents, bool _manual_compaction, const std::string& _trim_ts, double _score, - bool _deletion_compaction, CompactionReason _compaction_reason) + bool _deletion_compaction, CompactionReason _compaction_reason, + BlobGarbageCollectionPolicy _blob_garbage_collection_policy, + double _blob_garbage_collection_age_cutoff) : input_vstorage_(vstorage), start_level_(_inputs[0].level), output_level_(_output_level), @@ -240,7 +242,19 @@ Compaction::Compaction( trim_ts_(_trim_ts), is_trivial_move_(false), compaction_reason_(_compaction_reason), - notify_on_compaction_completion_(false) { + notify_on_compaction_completion_(false), + enable_blob_garbage_collection_( + _blob_garbage_collection_policy == BlobGarbageCollectionPolicy::kForce + ? true + : (_blob_garbage_collection_policy == + BlobGarbageCollectionPolicy::kDisable + ? 
false + : mutable_cf_options()->enable_blob_garbage_collection)), + blob_garbage_collection_age_cutoff_( + _blob_garbage_collection_age_cutoff < 0 || + _blob_garbage_collection_age_cutoff > 1 + ? mutable_cf_options()->blob_garbage_collection_age_cutoff + : _blob_garbage_collection_age_cutoff) { MarkFilesBeingCompacted(true); if (is_manual_compaction_) { compaction_reason_ = CompactionReason::kManualCompaction; @@ -325,8 +339,8 @@ bool Compaction::IsTrivialMove() const { } if (!(start_level_ != output_level_ && num_input_levels() == 1 && - input(0, 0)->fd.GetPathId() == output_path_id() && - InputCompressionMatchesOutput())) { + input(0, 0)->fd.GetPathId() == output_path_id() && + InputCompressionMatchesOutput())) { return false; } diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index fc473e293..ad9ec470c 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -81,7 +81,10 @@ class Compaction { std::vector grandparents, bool manual_compaction = false, const std::string& trim_ts = "", double score = -1, bool deletion_compaction = false, - CompactionReason compaction_reason = CompactionReason::kUnknown); + CompactionReason compaction_reason = CompactionReason::kUnknown, + BlobGarbageCollectionPolicy blob_garbage_collection_policy = + BlobGarbageCollectionPolicy::kUseDefault, + double blob_garbage_collection_age_cutoff = -1); // No copying allowed Compaction(const Compaction&) = delete; @@ -306,6 +309,14 @@ class Compaction { uint32_t max_subcompactions() const { return max_subcompactions_; } + bool enable_blob_garbage_collection() const { + return enable_blob_garbage_collection_; + } + + double blob_garbage_collection_age_cutoff() const { + return blob_garbage_collection_age_cutoff_; + } + // start and end are sub compact range. Null if no boundary. // This is used to filter out some input files' ancester's time range. 
uint64_t MinInputFileOldestAncesterTime(const InternalKey* start, @@ -332,8 +343,9 @@ class Compaction { // Get the atomic file boundaries for all files in the compaction. Necessary // in order to avoid the scenario described in - // https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and plumb - // down appropriate key boundaries to RangeDelAggregator during compaction. + // https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and + // plumb down appropriate key boundaries to RangeDelAggregator during + // compaction. static std::vector PopulateWithAtomicBoundaries( VersionStorageInfo* vstorage, std::vector inputs); @@ -348,7 +360,7 @@ class Compaction { VersionStorageInfo* input_vstorage_; - const int start_level_; // the lowest level to be compacted + const int start_level_; // the lowest level to be compacted const int output_level_; // levels to which output files are stored uint64_t max_output_file_size_; uint64_t max_compaction_bytes_; @@ -359,7 +371,7 @@ class Compaction { VersionEdit edit_; const int number_levels_; ColumnFamilyData* cfd_; - Arena arena_; // Arena used to allocate space for file_levels_ + Arena arena_; // Arena used to allocate space for file_levels_ const uint32_t output_path_id_; CompressionType output_compression_; @@ -377,7 +389,7 @@ class Compaction { // State used to check for number of overlapping grandparent files // (grandparent == "output_level_ + 1") std::vector grandparents_; - const double score_; // score that was used to pick this compaction. + const double score_; // score that was used to pick this compaction. // Is this compaction creating a file in the bottom most level? const bool bottommost_level_; @@ -412,6 +424,12 @@ class Compaction { // Notify on compaction completion only if listener was notified on compaction // begin. bool notify_on_compaction_completion_; + + // Enable/disable GC collection for blobs during compaction. 
+ bool enable_blob_garbage_collection_; + + // Blob garbage collection age cutoff. + double blob_garbage_collection_age_cutoff_; }; // Return sum of sizes of all files in `files`. diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index dc994aa5f..024919150 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -142,12 +142,11 @@ class CompactionIterator { } bool enable_blob_garbage_collection() const override { - return compaction_->mutable_cf_options()->enable_blob_garbage_collection; + return compaction_->enable_blob_garbage_collection(); } double blob_garbage_collection_age_cutoff() const override { - return compaction_->mutable_cf_options() - ->blob_garbage_collection_age_cutoff; + return compaction_->blob_garbage_collection_age_cutoff(); } uint64_t blob_compaction_readahead_size() const override { diff --git a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc index 1f29004ae..07d241de9 100644 --- a/db/compaction/compaction_picker.cc +++ b/db/compaction/compaction_picker.cc @@ -640,7 +640,11 @@ Compaction* CompactionPicker::CompactRange( GetCompressionType(vstorage, mutable_cf_options, output_level, 1), GetCompressionOptions(mutable_cf_options, vstorage, output_level), Temperature::kUnknown, compact_range_options.max_subcompactions, - /* grandparents */ {}, /* is manual */ true, trim_ts); + /* grandparents */ {}, /* is manual */ true, trim_ts, /* score */ -1, + /* deletion_compaction */ false, CompactionReason::kUnknown, + compact_range_options.blob_garbage_collection_policy, + compact_range_options.blob_garbage_collection_age_cutoff); + RegisterCompaction(c); vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options); return c; @@ -818,8 +822,10 @@ Compaction* CompactionPicker::CompactRange( vstorage->base_level()), GetCompressionOptions(mutable_cf_options, vstorage, output_level), Temperature::kUnknown, compact_range_options.max_subcompactions, - 
std::move(grandparents), - /* is manual compaction */ true, trim_ts); + std::move(grandparents), /* is manual */ true, trim_ts, /* score */ -1, + /* deletion_compaction */ false, CompactionReason::kUnknown, + compact_range_options.blob_garbage_collection_policy, + compact_range_options.blob_garbage_collection_age_cutoff); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); RegisterCompaction(compaction); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index c49b3c257..d307cadbb 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -6387,6 +6387,74 @@ INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobGC, DBCompactionTestBlobGC, ::testing::Combine(::testing::Values(0.0, 0.5, 1.0), ::testing::Bool())); +TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGCOverrides) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.enable_blob_files = true; + options.blob_file_size = 32; // one blob per file + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0; + + DestroyAndReopen(options); + + for (int i = 0; i < 128; i += 2) { + ASSERT_OK(Put("key" + std::to_string(i), "value" + std::to_string(i))); + ASSERT_OK( + Put("key" + std::to_string(i + 1), "value" + std::to_string(i + 1))); + ASSERT_OK(Flush()); + } + + std::vector original_blob_files = GetBlobFileNumbers(); + ASSERT_EQ(original_blob_files.size(), 128); + + // Note: turning off enable_blob_files before the compaction results in + // garbage collected values getting inlined. 
+ ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}})); + + CompactRangeOptions cro; + cro.blob_garbage_collection_policy = BlobGarbageCollectionPolicy::kForce; + cro.blob_garbage_collection_age_cutoff = blob_gc_age_cutoff_; + + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + // Check that the GC stats are correct + { + VersionSet* const versions = dbfull()->GetVersionSet(); + assert(versions); + assert(versions->GetColumnFamilySet()); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + + ASSERT_GE(compaction_stats[1].bytes_read_blob, 0); + ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0); + } + + const size_t cutoff_index = static_cast( + cro.blob_garbage_collection_age_cutoff * original_blob_files.size()); + const size_t expected_num_files = original_blob_files.size() - cutoff_index; + + const std::vector new_blob_files = GetBlobFileNumbers(); + + ASSERT_EQ(new_blob_files.size(), expected_num_files); + + // Original blob files below the cutoff should be gone, original blob files + // at or above the cutoff should be still there + for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) { + ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]); + } + + for (size_t i = 0; i < 128; ++i) { + ASSERT_EQ(Get("key" + std::to_string(i)), "value" + std::to_string(i)); + } +} + TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) { Options options; options.env = env_; diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 8b80f4d21..d3e103b5f 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -681,7 +681,7 @@ void StressTest::OperateDb(ThreadState* thread) { #ifndef NDEBUG if 
(FLAGS_read_fault_one_in) { fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(), - FLAGS_read_fault_one_in); + FLAGS_read_fault_one_in); } #endif // NDEBUG if (FLAGS_write_fault_one_in) { @@ -2119,6 +2119,15 @@ void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key, static_cast(bottom_level_styles.size())]; cro.allow_write_stall = static_cast(thread->rand.Next() % 2); cro.max_subcompactions = static_cast(thread->rand.Next() % 4); + std::vector blob_gc_policies = { + BlobGarbageCollectionPolicy::kForce, + BlobGarbageCollectionPolicy::kDisable, + BlobGarbageCollectionPolicy::kUseDefault}; + cro.blob_garbage_collection_policy = + blob_gc_policies[thread->rand.Next() % + static_cast(blob_gc_policies.size())]; + cro.blob_garbage_collection_age_cutoff = + static_cast(thread->rand.Next() % 100) / 100.0; const Snapshot* pre_snapshot = nullptr; uint32_t pre_hash = 0; @@ -2322,7 +2331,8 @@ void StressTest::PrintEnv() const { fprintf(stdout, "Open metadata write fault one in:\n"); fprintf(stdout, " %d\n", FLAGS_open_metadata_write_fault_one_in); - fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection); + fprintf(stdout, "Sync fault injection : %d\n", + FLAGS_sync_fault_injection); fprintf(stdout, "Best efforts recovery : %d\n", static_cast(FLAGS_best_efforts_recovery)); fprintf(stdout, "Fail if OPTIONS file error: %d\n", @@ -2697,7 +2707,7 @@ void StressTest::Reopen(ThreadState* thread) { } assert(!write_prepared || bg_canceled); #else - (void) thread; + (void)thread; #endif for (auto cf : column_families_) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 2a9bebfa5..bc092b24d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1791,6 +1791,17 @@ enum class BottommostLevelCompaction { kForceOptimized, }; +// For manual compaction, we can configure if we want to skip/force garbage +// collection of blob files. 
+enum class BlobGarbageCollectionPolicy { + // Force blob file garbage collection. + kForce, + // Skip blob file garbage collection. + kDisable, + // Inherit blob file garbage collection policy from ColumnFamilyOptions. + kUseDefault, +}; + // CompactRangeOptions is used by CompactRange() call. struct CompactRangeOptions { // If true, no other compaction will run at the same time as this @@ -1823,6 +1834,19 @@ struct CompactRangeOptions { // Cancellation can be delayed waiting on automatic compactions when used // together with `exclusive_manual_compaction == true`. std::atomic* canceled = nullptr; + + // If set to kForce, RocksDB will override enable_blob_garbage_collection + // to true; if set to kDisable, RocksDB will override it to false, and + // kUseDefault leaves the setting from ColumnFamilyOptions in effect. This + // enables customers to both force-enable and force-disable GC when calling + // CompactRange. + BlobGarbageCollectionPolicy blob_garbage_collection_policy = + BlobGarbageCollectionPolicy::kUseDefault; + + // If set to < 0 or > 1, RocksDB leaves blob_garbage_collection_age_cutoff + // from ColumnFamilyOptions in effect. Otherwise, this value overrides the + // ColumnFamilyOptions setting. This enables customers to selectively + // override the age cutoff. + double blob_garbage_collection_age_cutoff = -1; }; // IngestExternalFileOptions is used by IngestExternalFile()