From 3ee6c9baec6574433440018ecf58c0b05dc8b2e3 Mon Sep 17 00:00:00 2001 From: zczhu <> Date: Mon, 6 Jun 2022 18:32:26 -0700 Subject: [PATCH] Consolidate manual_compaction_paused_ check (#10070) Summary: As pointed out by [https://github.com/facebook/rocksdb/pull/8351#discussion_r645765422](https://github.com/facebook/rocksdb/pull/8351#discussion_r645765422), check `manual_compaction_paused` and `manual_compaction_canceled` can be reduced by setting `*canceled` to be true in `DisableManualCompaction()` and `*canceled` to be false in the last time calling `EnableManualCompaction()`. Changed Tests: The origin `DBTest2.PausingManualCompaction1` uses a callback function to increase `manual_compaction_paused` and the origin CompactionJob/CompactionIterator with `manual_compaction_paused` can detect this. I changed the callback function so that it sets `*canceled` as true if `canceled` is not `nullptr` (to notify CompactionJob/CompactionIterator the compaction has been canceled). Pull Request resolved: https://github.com/facebook/rocksdb/pull/10070 Test Plan: This change does not introduce new features, but some slight difference in compaction implementation. Run the same manual compaction unit tests as before (e.g., PausingManualCompaction[1-4], CancelManualCompaction[1-2], CancelManualCompactionWithListener in db_test2, and db_compaction_test). Reviewed By: ajkr Differential Revision: D36949133 Pulled By: littlepig2013 fbshipit-source-id: c5dc4c956fbf8f624003a0f5ad2690240063a821 --- db/builder.cc | 7 +-- db/compaction/compaction_iterator.cc | 15 ++--- db/compaction/compaction_iterator.h | 73 +++++++++++------------ db/compaction/compaction_iterator_test.cc | 8 +-- db/compaction/compaction_job.cc | 27 ++++----- db/compaction/compaction_job.h | 8 +-- db/compaction/compaction_job_test.cc | 7 ++- db/db_impl/db_impl.h | 16 ++++- db/db_impl/db_impl_compaction_flush.cc | 34 +++++++---- db/db_impl/db_impl_secondary.cc | 4 +- db/db_test2.cc | 43 +++++++++---- db/flush_job.cc | 6 +- include/rocksdb/options.h | 7 +++ 13 files changed, 150 insertions(+), 105 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 2d2a86676..f0c24de6c 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -193,6 +193,7 @@ Status BuildTable( &blob_file_paths, blob_file_additions) : nullptr); + const std::atomic kManualCompactionCanceledFalse{false}; CompactionIterator c_iter( iter, tboptions.internal_comparator.user_comparator(), &merge, kMaxSequenceNumber, &snapshots, earliest_write_conflict_snapshot, @@ -201,11 +202,9 @@ Status BuildTable( true /* internal key corruption is not ok */, range_del_agg.get(), blob_file_builder.get(), ioptions.allow_data_in_errors, ioptions.enforce_single_del_contracts, + /*manual_compaction_canceled=*/kManualCompactionCanceledFalse, /*compaction=*/nullptr, compaction_filter.get(), - /*shutting_down=*/nullptr, - /*manual_compaction_paused=*/nullptr, - /*manual_compaction_canceled=*/nullptr, db_options.info_log, - full_history_ts_low); + /*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low); c_iter.SeekToFirst(); for (; c_iter.Valid(); c_iter.Next()) { diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 327f1f121..8d534cf7f 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -28,11 +28,10 @@ CompactionIterator::CompactionIterator( Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, - bool enforce_single_del_contracts, const Compaction* compaction, - const CompactionFilter* compaction_filter, + bool enforce_single_del_contracts, + const std::atomic& manual_compaction_canceled, + const Compaction* compaction, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, - const std::atomic* manual_compaction_paused, - const std::atomic* manual_compaction_canceled, const std::shared_ptr info_log, const std::string* full_history_ts_low) : CompactionIterator( @@ -40,10 +39,10 @@ CompactionIterator::CompactionIterator( earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, report_detailed_time, expect_valid_internal_key, range_del_agg, blob_file_builder, allow_data_in_errors, enforce_single_del_contracts, + manual_compaction_canceled, std::unique_ptr( compaction ? new RealCompaction(compaction) : nullptr), - compaction_filter, shutting_down, manual_compaction_paused, - manual_compaction_canceled, info_log, full_history_ts_low) {} + compaction_filter, shutting_down, info_log, full_history_ts_low) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -54,11 +53,10 @@ CompactionIterator::CompactionIterator( CompactionRangeDelAggregator* range_del_agg, BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, bool enforce_single_del_contracts, + const std::atomic& manual_compaction_canceled, std::unique_ptr compaction, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, - const std::atomic* manual_compaction_paused, - const std::atomic* manual_compaction_canceled, const std::shared_ptr info_log, const std::string* full_history_ts_low) : input_(input, cmp, @@ -78,7 +76,6 @@ CompactionIterator::CompactionIterator( compaction_(std::move(compaction)), compaction_filter_(compaction_filter), shutting_down_(shutting_down), - manual_compaction_paused_(manual_compaction_paused), manual_compaction_canceled_(manual_compaction_canceled), info_log_(info_log), allow_data_in_errors_(allow_data_in_errors), diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 024919150..acc6f417f 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -167,39 +167,42 @@ class CompactionIterator { const Compaction* compaction_; }; - CompactionIterator( - InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, - SequenceNumber last_sequence, std::vector* snapshots, - SequenceNumber earliest_write_conflict_snapshot, - SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, - Env* env, bool report_detailed_time, bool expect_valid_internal_key, - CompactionRangeDelAggregator* range_del_agg, - BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, - bool enforce_single_del_contracts, const Compaction* compaction = nullptr, - const CompactionFilter* compaction_filter = nullptr, - const std::atomic* shutting_down = nullptr, - const std::atomic* manual_compaction_paused = nullptr, - const std::atomic* manual_compaction_canceled = nullptr, - const std::shared_ptr info_log = nullptr, - const std::string* full_history_ts_low = nullptr); + CompactionIterator(InternalIterator* input, const Comparator* cmp, + MergeHelper* merge_helper, SequenceNumber last_sequence, + std::vector* snapshots, + SequenceNumber earliest_write_conflict_snapshot, + SequenceNumber job_snapshot, + const SnapshotChecker* snapshot_checker, Env* env, + bool report_detailed_time, bool expect_valid_internal_key, + CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, + bool allow_data_in_errors, + bool enforce_single_del_contracts, + const std::atomic& manual_compaction_canceled, + const Compaction* compaction = nullptr, + const CompactionFilter* compaction_filter = nullptr, + const std::atomic* shutting_down = nullptr, + const std::shared_ptr info_log = nullptr, + const std::string* full_history_ts_low = nullptr); // Constructor with custom CompactionProxy, used for tests. - CompactionIterator( - InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, - SequenceNumber last_sequence, std::vector* snapshots, - SequenceNumber earliest_write_conflict_snapshot, - SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, - Env* env, bool report_detailed_time, bool expect_valid_internal_key, - CompactionRangeDelAggregator* range_del_agg, - BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, - bool enforce_single_del_contracts, - std::unique_ptr compaction, - const CompactionFilter* compaction_filter = nullptr, - const std::atomic* shutting_down = nullptr, - const std::atomic* manual_compaction_paused = nullptr, - const std::atomic* manual_compaction_canceled = nullptr, - const std::shared_ptr info_log = nullptr, - const std::string* full_history_ts_low = nullptr); + CompactionIterator(InternalIterator* input, const Comparator* cmp, + MergeHelper* merge_helper, SequenceNumber last_sequence, + std::vector* snapshots, + SequenceNumber earliest_write_conflict_snapshot, + SequenceNumber job_snapshot, + const SnapshotChecker* snapshot_checker, Env* env, + bool report_detailed_time, bool expect_valid_internal_key, + CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, + bool allow_data_in_errors, + bool enforce_single_del_contracts, + const std::atomic& manual_compaction_canceled, + std::unique_ptr compaction, + const CompactionFilter* compaction_filter = nullptr, + const std::atomic* shutting_down = nullptr, + const std::shared_ptr info_log = nullptr, + const std::string* full_history_ts_low = nullptr); ~CompactionIterator(); @@ -320,8 +323,7 @@ class CompactionIterator { std::unique_ptr compaction_; const CompactionFilter* compaction_filter_; const std::atomic* shutting_down_; - const std::atomic* manual_compaction_paused_; - const std::atomic* manual_compaction_canceled_; + const std::atomic& manual_compaction_canceled_; bool bottommost_level_; bool valid_ = false; bool visible_at_tip_; @@ -426,10 +428,7 @@ class CompactionIterator { bool IsPausingManualCompaction() { // This is a best-effort facility, so memory_order_relaxed is sufficient. - return (manual_compaction_paused_ && - manual_compaction_paused_->load(std::memory_order_relaxed) > 0) || - (manual_compaction_canceled_ && - manual_compaction_canceled_->load(std::memory_order_relaxed)); + return manual_compaction_canceled_.load(std::memory_order_relaxed); } }; diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 79f598c62..a5d8d30c3 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -279,10 +279,9 @@ class CompactionIteratorTest : public testing::TestWithParam { snapshot_checker_.get(), Env::Default(), false /* report_detailed_time */, false, range_del_agg_.get(), nullptr /* blob_file_builder */, true /*allow_data_in_errors*/, - true /*enforce_single_del_contracts*/, std::move(compaction), filter, - &shutting_down_, - /*manual_compaction_paused=*/nullptr, - /*manual_compaction_canceled=*/nullptr, /*info_log=*/nullptr, + true /*enforce_single_del_contracts*/, + /*manual_compaction_canceled=*/kManualCompactionCanceledFalse_, + std::move(compaction), filter, &shutting_down_, /*info_log=*/nullptr, full_history_ts_low)); } @@ -341,6 +340,7 @@ class CompactionIteratorTest : public testing::TestWithParam { std::unique_ptr range_del_agg_; std::unique_ptr snapshot_checker_; std::atomic shutting_down_{false}; + const std::atomic kManualCompactionCanceledFalse_{false}; FakeCompaction* compaction_proxy_; }; diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index bf1895ddc..38b015bb6 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -429,8 +429,7 @@ CompactionJob::CompactionJob( bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats, Env::Priority thread_pri, const std::shared_ptr& io_tracer, - const std::atomic* manual_compaction_paused, - const std::atomic* manual_compaction_canceled, + const std::atomic& manual_compaction_canceled, const std::string& db_id, const std::string& db_session_id, std::string full_history_ts_low, std::string trim_ts, BlobFileCompletionCallback* blob_callback) @@ -456,7 +455,6 @@ CompactionJob::CompactionJob( fs_->OptimizeForCompactionTableRead(file_options, db_options_)), versions_(versions), shutting_down_(shutting_down), - manual_compaction_paused_(manual_compaction_paused), manual_compaction_canceled_(manual_compaction_canceled), db_directory_(db_directory), blob_output_directory_(blob_output_directory), @@ -1256,8 +1254,8 @@ void CompactionJob::NotifyOnSubcompactionBegin( if (shutting_down_->load(std::memory_order_acquire)) { return; } - if (c->is_manual_compaction() && manual_compaction_paused_ && - manual_compaction_paused_->load(std::memory_order_acquire) > 0) { + if (c->is_manual_compaction() && + manual_compaction_canceled_.load(std::memory_order_acquire)) { return; } @@ -1470,7 +1468,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { TEST_SYNC_POINT_CALLBACK( "CompactionJob::Run():PausingManualCompaction:1", reinterpret_cast( - const_cast*>(manual_compaction_paused_))); + const_cast*>(&manual_compaction_canceled_))); Status status; const std::string* const full_history_ts_low = @@ -1484,9 +1482,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), /*expect_valid_internal_key=*/true, &range_del_agg, blob_file_builder.get(), db_options_.allow_data_in_errors, - db_options_.enforce_single_del_contracts, sub_compact->compaction, - compaction_filter, shutting_down_, manual_compaction_paused_, - manual_compaction_canceled_, db_options_.info_log, full_history_ts_low)); + db_options_.enforce_single_del_contracts, manual_compaction_canceled_, + sub_compact->compaction, compaction_filter, shutting_down_, + db_options_.info_log, full_history_ts_low)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { @@ -1568,7 +1566,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { TEST_SYNC_POINT_CALLBACK( "CompactionJob::Run():PausingManualCompaction:2", reinterpret_cast( - const_cast*>(manual_compaction_paused_))); + const_cast*>(&manual_compaction_canceled_))); if (partitioner.get()) { last_key_for_partitioner.assign(c_iter->user_key().data_, c_iter->user_key().size_); @@ -1647,10 +1645,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { status = Status::ShutdownInProgress("Database shutdown"); } if ((status.ok() || status.IsColumnFamilyDropped()) && - ((manual_compaction_paused_ && - manual_compaction_paused_->load(std::memory_order_relaxed) > 0) || - (manual_compaction_canceled_ && - manual_compaction_canceled_->load(std::memory_order_relaxed)))) { + (manual_compaction_canceled_.load(std::memory_order_relaxed))) { status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); } if (status.ok()) { @@ -2527,7 +2522,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob( std::vector existing_snapshots, std::shared_ptr table_cache, EventLogger* event_logger, const std::string& dbname, const std::shared_ptr& io_tracer, - const std::atomic* manual_compaction_canceled, + const std::atomic& manual_compaction_canceled, const std::string& db_id, const std::string& db_session_id, const std::string& output_path, const CompactionServiceInput& compaction_service_input, @@ -2540,7 +2535,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob( compaction->mutable_cf_options()->paranoid_file_checks, compaction->mutable_cf_options()->report_bg_io_stats, dbname, &(compaction_service_result->stats), Env::Priority::USER, io_tracer, - nullptr, manual_compaction_canceled, db_id, db_session_id, + manual_compaction_canceled, db_id, db_session_id, compaction->column_family_data()->GetFullHistoryTsLow()), output_path_(output_path), compaction_input_(compaction_service_input), diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 17d6f1051..a8f0e4eeb 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -78,8 +78,7 @@ class CompactionJob { bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats, Env::Priority thread_pri, const std::shared_ptr& io_tracer, - const std::atomic* manual_compaction_paused = nullptr, - const std::atomic* manual_compaction_canceled = nullptr, + const std::atomic& manual_compaction_canceled, const std::string& db_id = "", const std::string& db_session_id = "", std::string full_history_ts_low = "", std::string trim_ts = "", BlobFileCompletionCallback* blob_callback = nullptr); @@ -195,8 +194,7 @@ class CompactionJob { FileOptions file_options_for_read_; VersionSet* versions_; const std::atomic* shutting_down_; - const std::atomic* manual_compaction_paused_; - const std::atomic* manual_compaction_canceled_; + const std::atomic& manual_compaction_canceled_; FSDirectory* db_directory_; FSDirectory* blob_output_directory_; InstrumentedMutex* db_mutex_; @@ -357,7 +355,7 @@ class CompactionServiceCompactionJob : private CompactionJob { std::vector existing_snapshots, std::shared_ptr table_cache, EventLogger* event_logger, const std::string& dbname, const std::shared_ptr& io_tracer, - const std::atomic* manual_compaction_canceled, + const std::atomic& manual_compaction_canceled, const std::string& db_id, const std::string& db_session_id, const std::string& output_path, const CompactionServiceInput& compaction_service_input, diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 403fd1877..a704226d4 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -353,6 +353,7 @@ class CompactionJobTestBase : public testing::Test { SnapshotChecker* snapshot_checker = nullptr; ASSERT_TRUE(full_history_ts_low_.empty() || ucmp_->timestamp_size() == full_history_ts_low_.size()); + const std::atomic kManualCompactionCanceledFalse{false}; CompactionJob compaction_job( 0, &compaction, db_options_, mutable_db_options_, env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr, @@ -360,9 +361,9 @@ class CompactionJobTestBase : public testing::Test { earliest_write_conflict_snapshot, snapshot_checker, nullptr, table_cache_, &event_logger, false, false, dbname_, &compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */, - /*manual_compaction_paused=*/nullptr, - /*manual_compaction_canceled=*/nullptr, env_->GenerateUniqueId(), - DBImpl::GenerateDbSessionId(nullptr), full_history_ts_low_); + /*manual_compaction_canceled=*/kManualCompactionCanceledFalse, + env_->GenerateUniqueId(), DBImpl::GenerateDbSessionId(nullptr), + full_history_ts_low_); VerifyInitializationOfCompactionJobStats(compaction_job_stats_); compaction_job.Prepare(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 880d30a91..4e192e831 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1216,6 +1216,9 @@ class DBImpl : public DB { InstrumentedMutex trace_mutex_; BlockCacheTracer block_cache_tracer_; + // constant false canceled flag, used when the compaction is not manual + const std::atomic kManualCompactionCanceledFalse_{false}; + // State below is protected by mutex_ // With two_write_queues enabled, some of the variables that accessed during // WriteToWAL need different synchronization: log_empty_, alive_log_files_, @@ -1603,7 +1606,11 @@ class DBImpl : public DB { output_path_id(_output_path_id), exclusive(_exclusive), disallow_trivial_move(_disallow_trivial_move), - canceled(_canceled) {} + canceled(_canceled ? *_canceled : canceled_internal_storage) {} + // When _canceled is not provided by ther user, we assign the reference of + // canceled_internal_storage to it to consolidate canceled and + // manual_compaction_paused since DisableManualCompaction() might be + // called ColumnFamilyData* cfd; int input_level; @@ -1620,7 +1627,12 @@ class DBImpl : public DB { InternalKey* manual_end = nullptr; // how far we are compacting InternalKey tmp_storage; // Used to keep track of compaction progress InternalKey tmp_storage1; // Used to keep track of compaction progress - std::atomic* canceled; // Compaction canceled by the user? + + // When the user provides a canceled pointer in CompactRangeOptions, the + // above varaibe is the reference of the user-provided + // `canceled`, otherwise, it is the reference of canceled_internal_storage + std::atomic canceled_internal_storage = false; + std::atomic& canceled; // Compaction canceled pointer reference }; struct PrepickedCompaction { // background compaction takes ownership of `compaction`. diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index f552310e8..c84a745a6 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1217,6 +1217,10 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options, // Perform CompactFiles TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2"); + TEST_SYNC_POINT_CALLBACK( + "TestCompactFiles:PausingManualCompaction:3", + reinterpret_cast( + const_cast*>(&manual_compaction_paused_))); { InstrumentedMutexLock l(&mutex_); @@ -1372,7 +1376,7 @@ Status DBImpl::CompactFilesImpl( c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, - &manual_compaction_paused_, nullptr, db_id_, db_session_id_, + kManualCompactionCanceledFalse_, db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(), &blob_callback_); @@ -1838,8 +1842,7 @@ Status DBImpl::RunManualCompaction( // and `CompactRangeOptions::canceled` might not work well together. while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0) { - if (manual_compaction_paused_ > 0 || - (manual.canceled != nullptr && *manual.canceled == true)) { + if (manual_compaction_paused_ > 0 || manual.canceled == true) { // Pretend the error came from compaction so the below cleanup/error // handling code can process it. manual.done = true; @@ -2376,10 +2379,18 @@ Status DBImpl::EnableAutoCompaction( return s; } +// NOTE: Calling DisableManualCompaction() may overwrite the +// user-provided canceled variable in CompactRangeOptions void DBImpl::DisableManualCompaction() { InstrumentedMutexLock l(&mutex_); manual_compaction_paused_.fetch_add(1, std::memory_order_release); + // Mark the canceled as true when the cancellation is triggered by + // manual_compaction_paused (may overwrite user-provided `canceled`) + for (const auto& manual_compaction : manual_compaction_dequeue_) { + manual_compaction->canceled = true; + } + // Wake up manual compactions waiting to start. bg_cv_.SignalAll(); @@ -2392,6 +2403,11 @@ void DBImpl::DisableManualCompaction() { } } +// NOTE: In contrast to DisableManualCompaction(), calling +// EnableManualCompaction() does NOT overwrite the user-provided *canceled +// variable to be false since there is NO CHANCE a canceled compaction +// is uncanceled. In other words, a canceled compaction must have been +// dropped out of the manual compaction queue, when we disable it. void DBImpl::EnableManualCompaction() { InstrumentedMutexLock l(&mutex_); assert(manual_compaction_paused_ > 0); @@ -3037,10 +3053,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (shutting_down_.load(std::memory_order_acquire)) { status = Status::ShutdownInProgress(); } else if (is_manual && - manual_compaction_paused_.load(std::memory_order_acquire) > 0) { - status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); - } else if (is_manual && manual_compaction->canceled && - manual_compaction->canceled->load(std::memory_order_acquire)) { + manual_compaction->canceled.load(std::memory_order_acquire)) { status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); } } else { @@ -3357,6 +3370,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, GetSnapshotContext(job_context, &snapshot_seqs, &earliest_write_conflict_snapshot, &snapshot_checker); assert(is_snapshot_supported_ || snapshots_.empty()); + CompactionJob compaction_job( job_context->job_id, c.get(), immutable_db_options_, mutable_db_options_, file_options_for_compaction_, versions_.get(), @@ -3368,9 +3382,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, thread_pri, io_tracer_, - is_manual ? &manual_compaction_paused_ : nullptr, - is_manual ? manual_compaction->canceled : nullptr, db_id_, - db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), + is_manual ? manual_compaction->canceled + : kManualCompactionCanceledFalse_, + db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(), &blob_callback_); compaction_job.Prepare(); diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index ab75819e4..55fd72acd 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -820,8 +820,8 @@ Status DBImplSecondary::CompactWithoutInstallation( file_options_for_compaction_, versions_.get(), &shutting_down_, &log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_, input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_, - options.canceled, input.db_id, db_session_id_, secondary_path_, input, - result); + options.canceled ? *options.canceled : kManualCompactionCanceledFalse_, + input.db_id, db_session_id_, secondary_path_, input, result); mutex_.Unlock(); s = compaction_job.Run(); diff --git a/db/db_test2.cc b/db/db_test2.cc index 52bc1a78d..493e01f35 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -3081,10 +3081,21 @@ TEST_F(DBTest2, PausingManualCompaction1) { int manual_compactions_paused = 0; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) { + auto canceled = static_cast*>(arg); + // CompactRange triggers manual compaction and cancel the compaction + // by set *canceled as true + if (canceled != nullptr) { + canceled->store(true, std::memory_order_release); + } + manual_compactions_paused += 1; + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "TestCompactFiles:PausingManualCompaction:3", [&](void* arg) { auto paused = static_cast*>(arg); + // CompactFiles() relies on manual_compactions_paused to + // determine if thie compaction should be paused or not ASSERT_EQ(0, paused->load(std::memory_order_acquire)); paused->fetch_add(1, std::memory_order_release); - manual_compactions_paused += 1; }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -3149,7 +3160,7 @@ TEST_F(DBTest2, PausingManualCompaction2) { Random rnd(301); for (int i = 0; i < 2; i++) { - // Generate a file containing 10 keys. + // Generate a file containing 100 keys. for (int j = 0; j < 100; j++) { ASSERT_OK(Put(Key(j), rnd.RandomString(50))); } @@ -3248,10 +3259,21 @@ TEST_F(DBTest2, PausingManualCompaction4) { int run_manual_compactions = 0; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) { + auto canceled = static_cast*>(arg); + // CompactRange triggers manual compaction and cancel the compaction + // by set *canceled as true + if (canceled != nullptr) { + canceled->store(true, std::memory_order_release); + } + run_manual_compactions++; + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "TestCompactFiles:PausingManualCompaction:3", [&](void* arg) { auto paused = static_cast*>(arg); + // CompactFiles() relies on manual_compactions_paused to + // determine if thie compaction should be paused or not ASSERT_EQ(0, paused->load(std::memory_order_acquire)); paused->fetch_add(1, std::memory_order_release); - run_manual_compactions++; }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -3266,7 +3288,6 @@ TEST_F(DBTest2, PausingManualCompaction4) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( "CompactionJob::Run():PausingManualCompaction:2"); - dbfull()->EnableManualCompaction(); ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr)); ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); #ifndef ROCKSDB_LITE @@ -3515,8 +3536,9 @@ TEST_F(DBTest2, CancelManualCompactionWithListener) { "CompactionJob::FinishCompactionOutputFile1", [&](void* /*arg*/) { running_compaction++; }); - // Case I: 1 Notify begin compaction, 2 DisableManualCompaction, 3 Compaction - // not run, 4 Notify compaction end. + // Case I: 1 Notify begin compaction, 2 Set *canceled as true to disable + // manual compaction in the callback function, 3 Compaction not run, + // 4 Notify compaction end. listener->code_ = Status::kIncomplete; listener->subcode_ = Status::SubCode::kManualCompactionPaused; @@ -3533,8 +3555,9 @@ TEST_F(DBTest2, CancelManualCompactionWithListener) { listener->num_compaction_started_ = 0; listener->num_compaction_ended_ = 0; - // Case II: 1 DisableManualCompaction, 2 Notify begin compaction (return - // without notifying), 3 Notify compaction end (return without notifying). + // Case II: 1 Set *canceled as true in the callback function to disable manual + // compaction, 2 Notify begin compaction (return without notifying), 3 Notify + // compaction end (return without notifying). ASSERT_TRUE(dbfull() ->CompactRange(compact_options, nullptr, nullptr) .IsManualCompactionPaused()); @@ -3545,8 +3568,8 @@ TEST_F(DBTest2, CancelManualCompactionWithListener) { ASSERT_EQ(running_compaction, 0); // Case III: 1 Notify begin compaction, 2 Compaction in between - // 3. DisableManualCompaction, , 4 Notify compaction end. - // compact_options.canceled->store(false, std::memory_order_release); + // 3. Set *canceled as true in the callback function to disable manual + // compaction, 4 Notify compaction end. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( "CompactionIterator:ProcessKV"); diff --git a/db/flush_job.cc b/db/flush_job.cc index 08d166de9..bd2fc9681 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -456,6 +456,7 @@ Status FlushJob::MemPurge() { snapshot_checker_); assert(job_context_); SequenceNumber job_snapshot_seq = job_context_->GetJobSnapshotSequence(); + const std::atomic kManualCompactionCanceledFalse{false}; CompactionIterator c_iter( iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge, kMaxSequenceNumber, &existing_snapshots_, @@ -464,10 +465,9 @@ Status FlushJob::MemPurge() { true /* internal key corruption is not ok */, range_del_agg.get(), nullptr, ioptions->allow_data_in_errors, ioptions->enforce_single_del_contracts, + /*manual_compaction_canceled=*/kManualCompactionCanceledFalse, /*compaction=*/nullptr, compaction_filter.get(), - /*shutting_down=*/nullptr, - /*manual_compaction_paused=*/nullptr, - /*manual_compaction_canceled=*/nullptr, ioptions->info_log, + /*shutting_down=*/nullptr, ioptions->info_log, &(cfd_->GetFullHistoryTsLow())); // Set earliest sequence number in the new memtable diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index bc092b24d..8f54d063f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1834,6 +1834,13 @@ struct CompactRangeOptions { // Cancellation can be delayed waiting on automatic compactions when used // together with `exclusive_manual_compaction == true`. std::atomic* canceled = nullptr; + // NOTE: Calling DisableManualCompaction() overwrites the uer-provided + // canceled variable in CompactRangeOptions. + // Typically, when CompactRange is being called in one thread (t1) with + // canceled = false, and DisableManualCompaction is being called in the + // other thread (t2), manual compaction is disabled normally, even if the + // compaction iterator may still scan a few items before *canceled is + // set to true // If set to kForce, RocksDB will override enable_blob_file_garbage_collection // to true; if set to kDisable, RocksDB will override it to false, and