From dc1c90c4e3dc2e32334b62adab017c7fb4dc203c Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Wed, 13 Apr 2022 13:28:09 -0700 Subject: [PATCH] Support canceling running RemoteCompaction on remote side (#9725) Summary: Add the ability to cancel remote compaction on the remote side by setting `OpenAndCompactOptions.canceled` to true. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9725 Test Plan: added unittest Reviewed By: ajkr Differential Revision: D35018800 Pulled By: jay-zhuang fbshipit-source-id: be3652f9645e0347df429e42a5614d5a9b3a1ec4 --- db/compaction/compaction_job.cc | 3 +- db/compaction/compaction_job.h | 1 + db/compaction/compaction_service_test.cc | 54 +++++++++++++++++++++++- db/db_impl/db_impl_secondary.cc | 31 ++++++++++---- db/db_impl/db_impl_secondary.h | 8 ++-- db/db_secondary_test.cc | 32 +++++++------- include/rocksdb/db.h | 6 +++ include/rocksdb/options.h | 5 +++ 8 files changed, 111 insertions(+), 29 deletions(-) diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 6a0630b97..e4ec263b5 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -2487,6 +2487,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob( std::vector existing_snapshots, std::shared_ptr table_cache, EventLogger* event_logger, const std::string& dbname, const std::shared_ptr& io_tracer, + const std::atomic<bool>* manual_compaction_canceled, const std::string& db_id, const std::string& db_session_id, const std::string& output_path, const CompactionServiceInput& compaction_service_input, @@ -2499,7 +2500,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob( compaction->mutable_cf_options()->paranoid_file_checks, compaction->mutable_cf_options()->report_bg_io_stats, dbname, &(compaction_service_result->stats), Env::Priority::USER, io_tracer, - nullptr, nullptr, db_id, db_session_id, + nullptr, manual_compaction_canceled, db_id, db_session_id, 
compaction->column_family_data()->GetFullHistoryTsLow()), output_path_(output_path), compaction_input_(compaction_service_input), diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 17b4feccd..8bec6fe85 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -343,6 +343,7 @@ class CompactionServiceCompactionJob : private CompactionJob { std::vector existing_snapshots, std::shared_ptr table_cache, EventLogger* event_logger, const std::string& dbname, const std::shared_ptr& io_tracer, + const std::atomic<bool>* manual_compaction_canceled, const std::string& db_id, const std::string& db_session_id, const std::string& output_path, const CompactionServiceInput& compaction_service_input, diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 9d4a19077..926f80782 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -78,8 +78,12 @@ class MyTestCompactionService : public CompactionService { options_override.listeners = listeners_; } + OpenAndCompactOptions options; + options.canceled = &canceled_; + Status s = DB::OpenAndCompact( - db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id), + options, db_path_, + db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id), compaction_input, compaction_service_result, options_override); if (is_override_wait_result_) { *compaction_service_result = override_wait_result_; @@ -118,6 +122,8 @@ class MyTestCompactionService : public CompactionService { is_override_wait_status_ = false; } + void SetCanceled(bool canceled) { canceled_ = canceled; } + private: InstrumentedMutex mutex_; std::atomic_int compaction_num_{0}; @@ -136,6 +142,7 @@ class MyTestCompactionService : public CompactionService { bool is_override_wait_result_ = false; std::string override_wait_result_; std::vector> listeners_; + std::atomic_bool canceled_{false}; }; class CompactionServiceTest : public 
DBTestBase { @@ -331,6 +338,51 @@ TEST_F(CompactionServiceTest, ManualCompaction) { VerifyTestData(); } +TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + ReopenWithCompactionService(&options); + GenerateTestData(); + + auto my_cs = GetCompactionService(); + + std::string start_str = Key(15); + std::string end_str = Key(45); + Slice start(start_str); + Slice end(end_str); + uint64_t comp_num = my_cs->GetCompactionNum(); + + // Test cancel compaction at the beginning + my_cs->SetCanceled(true); + auto s = db_->CompactRange(CompactRangeOptions(), &start, &end); + ASSERT_TRUE(s.IsIncomplete()); + // compaction number is not increased + ASSERT_GE(my_cs->GetCompactionNum(), comp_num); + VerifyTestData(); + + // Test cancel compaction in progress + ReopenWithCompactionService(&options); + GenerateTestData(); + my_cs = GetCompactionService(); + my_cs->SetCanceled(false); + + std::atomic_bool cancel_issued{false}; + SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Inprogress", + [&](void* /*arg*/) { + cancel_issued = true; + my_cs->SetCanceled(true); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + + s = db_->CompactRange(CompactRangeOptions(), &start, &end); + ASSERT_TRUE(s.IsIncomplete()); + ASSERT_TRUE(cancel_issued); + // compaction number is not increased + ASSERT_GE(my_cs->GetCompactionNum(), comp_num); + VerifyTestData(); +} + TEST_F(CompactionServiceTest, FailedToStart) { Options options = CurrentOptions(); options.disable_auto_compactions = true; diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 8f1051edb..d1a8709da 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -711,8 +711,11 @@ Status DB::OpenAsSecondary( } Status DBImplSecondary::CompactWithoutInstallation( - ColumnFamilyHandle* cfh, const CompactionServiceInput& input, - CompactionServiceResult* result) { + const 
OpenAndCompactOptions& options, ColumnFamilyHandle* cfh, + const CompactionServiceInput& input, CompactionServiceResult* result) { + if (options.canceled && options.canceled->load(std::memory_order_acquire)) { + return Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } InstrumentedMutexLock l(&mutex_); auto cfd = static_cast_with_check(cfh)->cfd(); if (!cfd) { @@ -774,7 +777,7 @@ Status DBImplSecondary::CompactWithoutInstallation( file_options_for_compaction_, versions_.get(), &shutting_down_, &log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_, input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_, - db_id_, db_session_id_, secondary_path_, input, result); + options.canceled, db_id_, db_session_id_, secondary_path_, input, result); mutex_.Unlock(); s = compaction_job.Run(); @@ -793,9 +796,13 @@ Status DBImplSecondary::CompactWithoutInstallation( } Status DB::OpenAndCompact( - const std::string& name, const std::string& output_directory, - const std::string& input, std::string* result, + const OpenAndCompactOptions& options, const std::string& name, + const std::string& output_directory, const std::string& input, + std::string* output, const CompactionServiceOptionsOverride& override_options) { + if (options.canceled && options.canceled->load(std::memory_order_acquire)) { + return Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } CompactionServiceInput compaction_input; Status s = CompactionServiceInput::Read(input, &compaction_input); if (!s.ok()) { @@ -849,10 +856,10 @@ Status DB::OpenAndCompact( CompactionServiceResult compaction_result; DBImplSecondary* db_secondary = static_cast_with_check(db); assert(handles.size() > 0); - s = db_secondary->CompactWithoutInstallation(handles[0], compaction_input, - &compaction_result); + s = db_secondary->CompactWithoutInstallation( + options, handles[0], compaction_input, &compaction_result); - Status serialization_status = compaction_result.Write(result); + Status 
serialization_status = compaction_result.Write(output); for (auto& handle : handles) { delete handle; @@ -864,6 +871,14 @@ Status DB::OpenAndCompact( return s; } +Status DB::OpenAndCompact( + const std::string& name, const std::string& output_directory, + const std::string& input, std::string* output, + const CompactionServiceOptionsOverride& override_options) { + return OpenAndCompact(OpenAndCompactOptions(), name, output_directory, input, + output, override_options); +} + #else // !ROCKSDB_LITE Status DB::OpenAsSecondary(const Options& /*options*/, diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index 5e5d03b67..fcc86cc87 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -228,10 +228,11 @@ class DBImplSecondary : public DBImpl { Status CheckConsistency() override; #ifndef NDEBUG - Status TEST_CompactWithoutInstallation(ColumnFamilyHandle* cfh, + Status TEST_CompactWithoutInstallation(const OpenAndCompactOptions& options, + ColumnFamilyHandle* cfh, const CompactionServiceInput& input, CompactionServiceResult* result) { - return CompactWithoutInstallation(cfh, input, result); + return CompactWithoutInstallation(options, cfh, input, result); } #endif // NDEBUG @@ -346,7 +347,8 @@ class DBImplSecondary : public DBImpl { // Run compaction without installation, the output files will be placed in the // secondary DB path. The LSM tree won't be changed, the secondary DB is still // in read-only mode. 
- Status CompactWithoutInstallation(ColumnFamilyHandle* cfh, + Status CompactWithoutInstallation(const OpenAndCompactOptions& options, + ColumnFamilyHandle* cfh, const CompactionServiceInput& input, CompactionServiceResult* result); diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc index 881fcc8c9..dd9d8954e 100644 --- a/db/db_secondary_test.cc +++ b/db/db_secondary_test.cc @@ -188,8 +188,8 @@ TEST_F(DBSecondaryTest, SimpleInternalCompaction) { auto cfh = db_secondary_->DefaultColumnFamily(); CompactionServiceResult result; - ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, - &result)); + ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input, &result)); ASSERT_EQ(result.output_files.size(), 1); InternalKey smallest, largest; @@ -248,8 +248,8 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) { OpenSecondary(options); auto cfh = db_secondary_->DefaultColumnFamily(); CompactionServiceResult result; - ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input1, - &result)); + ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input1, &result)); ASSERT_OK(result.status); // pick 2 files on level 1 for compaction, which has 6 overlap files on L2 @@ -261,8 +261,8 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) { } input2.output_level = 2; - ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2, - &result)); + ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input2, &result)); ASSERT_OK(result.status); CloseSecondary(); @@ -273,15 +273,15 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) { } OpenSecondary(options); cfh = db_secondary_->DefaultColumnFamily(); - Status s = db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2, - &result); + Status s = db_secondary_full()->TEST_CompactWithoutInstallation( + 
OpenAndCompactOptions(), cfh, input2, &result); ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_OK(result.status); // TODO: L0 -> L1 compaction should success, currently version is not built // if files is missing. - // ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, - // input1, &result)); + // ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(OpenAndCompactOptions(), + // cfh, input1, &result)); } TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) { @@ -319,8 +319,8 @@ TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) { auto cfh = db_secondary_->DefaultColumnFamily(); CompactionServiceResult result; - Status s = - db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result); + Status s = db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input, &result); ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_OK(result.status); } @@ -356,15 +356,15 @@ TEST_F(DBSecondaryTest, InternalCompactionMissingFiles) { auto cfh = db_secondary_->DefaultColumnFamily(); CompactionServiceResult result; - Status s = - db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result); + Status s = db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input, &result); ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_OK(result.status); input.input_files.erase(input.input_files.begin()); - ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, - &result)); + ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation( + OpenAndCompactOptions(), cfh, input, &result)); ASSERT_OK(result.status); } diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 2b920f0b0..2172f21cd 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -290,6 +290,12 @@ class DB { const std::string& input, std::string* output, const CompactionServiceOptionsOverride& override_options); + static Status OpenAndCompact( + const OpenAndCompactOptions& options, const 
std::string& name, + const std::string& output_directory, const std::string& input, + std::string* output, + const CompactionServiceOptionsOverride& override_options); + // Experimental and subject to change // Open DB and trim data newer than specified timestamp. // The trim_ts specified the user-defined timestamp trim bound. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 727a543df..fe76ff2cd 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1946,6 +1946,11 @@ struct CompactionServiceOptionsOverride { std::shared_ptr statistics = nullptr; }; +struct OpenAndCompactOptions { + // Allows cancellation of an in-progress compaction. + std::atomic<bool>* canceled = nullptr; +}; + #ifndef ROCKSDB_LITE struct LiveFilesStorageInfoOptions { // Whether to populate FileStorageInfo::file_checksum* or leave blank