From a0c569ee1d8f631e2fcda2e8728114326f7ecfdb Mon Sep 17 00:00:00 2001
From: Jay Zhuang
Date: Tue, 15 Feb 2022 17:59:31 -0800
Subject: [PATCH] Cancel manual compaction in thread-pool queue (#9557)

Summary:
Fix the issue that `DisableManualCompaction()` has to wait for a scheduled
manual compaction to start executing before it can cancel the job. When a
manual compaction that is still in the thread-pool queue is canceled, set the
job's `is_canceled` flag to true and clean up its resources.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9557

Test Plan: added a unittest that will hang without the change

Reviewed By: ajkr

Differential Revision: D34214910

Pulled By: jay-zhuang

fbshipit-source-id: 89dbaee78ddf26eb13ce862c2b15f4a098b36a78
---
 HISTORY.md                             |   1 +
 db/db_compaction_test.cc               | 117 ++++++++++++++++
 db/db_impl/db_impl.h                   |   1 +
 db/db_impl/db_impl_compaction_flush.cc | 181 ++++++++++++++-----------
 4 files changed, 222 insertions(+), 78 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index a0bbe59e0..adcf5d02f 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -8,6 +8,7 @@
 ### Performance Improvements
 * Mitigated the overhead of building the file location hash table used by the online LSM tree consistency checks, which can improve performance for certain workloads (see #9351).
 * Switched to using a sorted `std::vector` instead of `std::map` for storing the metadata objects for blob files, which can improve performance for certain workloads, especially when the number of blob files is high.
+* `DisableManualCompaction()` no longer has to wait for a scheduled manual compaction to start executing in the thread pool before it can cancel the job.
 
 ### Public API changes
 * Require C++17 compatible compiler (GCC >= 7, Clang >= 5, Visual Studio >= 2017). See #9388.

diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index 8943f6b9d..dce922e97 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -6797,6 +6797,123 @@ TEST_F(DBCompactionTest, FIFOWarm) {
   Destroy(options);
 }
 
+TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) {
+  const int kNumL0Files = 4;
+
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::RunManualCompaction:Scheduled",
+        "DBCompactionTest::DisableManualCompactionThreadQueueFull:"
+        "PreDisableManualCompaction"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+  Reopen(options);
+
+  // Block the compaction queue
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
+
+  // Generate files, but avoid triggering auto compaction
+  for (int i = 0; i < kNumL0Files / 2; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
+
+  port::Thread compact_thread([&]() {
+    CompactRangeOptions cro;
+    cro.exclusive_manual_compaction = true;
+    auto s = db_->CompactRange(cro, nullptr, nullptr);
+    ASSERT_TRUE(s.IsIncomplete());
+  });
+
+  TEST_SYNC_POINT(
+      "DBCompactionTest::DisableManualCompactionThreadQueueFull:"
+      "PreDisableManualCompaction");
+
+  // Generate more files to trigger auto compaction, which is scheduled after
+  // the manual compaction. We have to generate 4 more files because the
+  // existing files are already pending compaction.
+  for (int i = 0; i < kNumL0Files; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ(ToString(kNumL0Files + (kNumL0Files / 2)), FilesPerLevel(0));
+
+  db_->DisableManualCompaction();
+
+  // CompactRange should return before the compaction has the chance to run
+  compact_thread.join();
+
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+}
+
+TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {
+  const int kNumL0Files = 4;
+
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::RunManualCompaction:Scheduled",
+        "DBCompactionTest::DisableManualCompactionThreadQueueFull:"
+        "PreDisableManualCompaction"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+  Reopen(options);
+
+  // Block the compaction queue
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
+
+  // Generate files, but avoid triggering auto compaction
+  for (int i = 0; i < kNumL0Files / 2; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
+
+  port::Thread compact_thread([&]() {
+    CompactRangeOptions cro;
+    cro.exclusive_manual_compaction = true;
+    auto s = db_->CompactRange(cro, nullptr, nullptr);
+    ASSERT_TRUE(s.IsIncomplete());
+  });
+
+  TEST_SYNC_POINT(
+      "DBCompactionTest::DisableManualCompactionThreadQueueFull:"
+      "PreDisableManualCompaction");
+
+  // Generate more files to trigger auto compaction, which is scheduled after
+  // the manual compaction. We have to generate 4 more files because the
+  // existing files are already pending compaction.
+  for (int i = 0; i < kNumL0Files; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ(ToString(kNumL0Files + (kNumL0Files / 2)), FilesPerLevel(0));
+
+  db_->DisableManualCompaction();
+
+  // CompactRange should return before the compaction has the chance to run
+  compact_thread.join();
+
+  // Try to close the DB while the canceled manual compaction is still in the
+  // queue, and an auto-triggered compaction is also queued.
+  auto s = db_->Close();
+  ASSERT_OK(s);
+
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+}
+
 TEST_F(DBCompactionTest,
        DisableManualCompactionDoesNotWaitForDrainingAutomaticCompaction) {
   // When `CompactRangeOptions::exclusive_manual_compaction == true`, we wait

diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index af4b1dd77..4e25121c7 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1536,6 +1536,7 @@ class DBImpl : public DB {
     ManualCompactionState* manual_compaction_state;  // nullptr if non-manual
     // task limiter token is requested during compaction picking.
     std::unique_ptr<TaskLimiterToken> task_token;
+    bool is_canceled = false;
   };
 
   struct CompactionArg {

diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index e249b82d9..0e1864788 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -1760,7 +1760,7 @@ Status DBImpl::RunManualCompaction(
                input_level >= 0);
 
   InternalKey begin_storage, end_storage;
-  CompactionArg* ca;
+  CompactionArg* ca = nullptr;
 
   bool scheduled = false;
   bool manual_conflict = false;
@@ -1879,6 +1879,16 @@ Status DBImpl::RunManualCompaction(
         assert(!exclusive || !manual_conflict);
         // Running either this or some other manual compaction
         bg_cv_.Wait();
+        if (manual_compaction_paused_ > 0 && !manual.done &&
+            !manual.in_progress) {
+          manual.done = true;
+          manual.status =
+              Status::Incomplete(Status::SubCode::kManualCompactionPaused);
+          if (ca && ca->prepicked_compaction) {
+            ca->prepicked_compaction->is_canceled = true;
+          }
+          break;
+        }
         if (scheduled && manual.incomplete == true) {
           assert(!manual.in_progress);
           scheduled = false;
@@ -1915,6 +1925,7 @@ Status DBImpl::RunManualCompaction(
             &DBImpl::UnscheduleCompactionCallback);
       }
       scheduled = true;
+      TEST_SYNC_POINT("DBImpl::RunManualCompaction:Scheduled");
     }
   }
 
@@ -2840,91 +2851,106 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
 void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
                                       Env::Priority bg_thread_pri) {
   bool made_progress = false;
-  JobContext job_context(next_job_id_.fetch_add(1), true);
   TEST_SYNC_POINT("BackgroundCallCompaction:0");
   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                        immutable_db_options_.info_log.get());
   {
     InstrumentedMutexLock l(&mutex_);
 
-    // This call will unlock/lock the mutex to wait for current running
-    // IngestExternalFile() calls to finish.
-    WaitForIngestFile();
-
-    num_running_compactions_++;
-
-    std::unique_ptr<std::list<uint64_t>::iterator>
-        pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
-            CaptureCurrentFileNumberInPendingOutputs()));
-
-    assert((bg_thread_pri == Env::Priority::BOTTOM &&
-            bg_bottom_compaction_scheduled_) ||
-           (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
-    Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
-                                    prepicked_compaction, bg_thread_pri);
-    TEST_SYNC_POINT("BackgroundCallCompaction:1");
-    if (s.IsBusy()) {
-      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
-      mutex_.Unlock();
-      immutable_db_options_.clock->SleepForMicroseconds(
-          10000);  // prevent hot loop
-      mutex_.Lock();
-    } else if (!s.ok() && !s.IsShutdownInProgress() &&
-               !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
-      // Wait a little bit before retrying background compaction in
-      // case this is an environmental problem and we do not want to
-      // chew up resources for failed compactions for the duration of
-      // the problem.
- uint64_t error_cnt = - default_cf_internal_stats_->BumpAndGetBackgroundErrorCount(); - bg_cv_.SignalAll(); // In case a waiter can proceed despite the error - mutex_.Unlock(); - log_buffer.FlushBufferToLog(); - ROCKS_LOG_ERROR(immutable_db_options_.info_log, - "Waiting after background compaction error: %s, " - "Accumulated background error counts: %" PRIu64, - s.ToString().c_str(), error_cnt); - LogFlush(immutable_db_options_.info_log); - immutable_db_options_.clock->SleepForMicroseconds(1000000); - mutex_.Lock(); - } else if (s.IsManualCompactionPaused()) { - ManualCompactionState* m = prepicked_compaction->manual_compaction_state; - assert(m); - ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused", - m->cfd->GetName().c_str(), job_context.job_id); - } - - ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); - - // If compaction failed, we want to delete all temporary files that we might - // have created (they might not be all recorded in job_context in case of a - // failure). Thus, we force full scan in FindObsoleteFiles() - FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() && - !s.IsManualCompactionPaused() && - !s.IsColumnFamilyDropped() && - !s.IsBusy()); - TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"); - - // delete unnecessary files if any, this is done outside the mutex - if (job_context.HaveSomethingToClean() || - job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { - mutex_.Unlock(); - // Have to flush the info logs before bg_compaction_scheduled_-- - // because if bg_flush_scheduled_ becomes 0 and the lock is - // released, the deconstructor of DB can kick in and destroy all the - // states of DB so info_log might not be available after that point. - // It also applies to access other states that DB owns. - log_buffer.FlushBufferToLog(); - if (job_context.HaveSomethingToDelete()) { - PurgeObsoleteFiles(job_context); - TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles"); + if (prepicked_compaction && prepicked_compaction->is_canceled) { + assert(prepicked_compaction->compaction); + ROCKS_LOG_BUFFER(&log_buffer, "[%s] Skip canceled manual compaction job", + prepicked_compaction->compaction->column_family_data() + ->GetName() + .c_str()); + prepicked_compaction->compaction->ReleaseCompactionFiles( + Status::Incomplete(Status::SubCode::kManualCompactionPaused)); + delete prepicked_compaction->compaction; + } else { + JobContext job_context(next_job_id_.fetch_add(1), true); + // This call will unlock/lock the mutex to wait for current running + // IngestExternalFile() calls to finish. 
+      WaitForIngestFile();
+
+      num_running_compactions_++;
+
+      std::unique_ptr<std::list<uint64_t>::iterator>
+          pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
+              CaptureCurrentFileNumberInPendingOutputs()));
+
+      assert((bg_thread_pri == Env::Priority::BOTTOM &&
+              bg_bottom_compaction_scheduled_) ||
+             (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
+      Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
+                                      prepicked_compaction, bg_thread_pri);
+      TEST_SYNC_POINT("BackgroundCallCompaction:1");
+      if (s.IsBusy()) {
+        bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
+        mutex_.Unlock();
+        immutable_db_options_.clock->SleepForMicroseconds(
+            10000);  // prevent hot loop
+        mutex_.Lock();
+      } else if (!s.ok() && !s.IsShutdownInProgress() &&
+                 !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
+        // Wait a little bit before retrying background compaction in
+        // case this is an environmental problem and we do not want to
+        // chew up resources for failed compactions for the duration of
+        // the problem.
+        uint64_t error_cnt =
+            default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
+        bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
+        mutex_.Unlock();
+        log_buffer.FlushBufferToLog();
+        ROCKS_LOG_ERROR(immutable_db_options_.info_log,
+                        "Waiting after background compaction error: %s, "
+                        "Accumulated background error counts: %" PRIu64,
+                        s.ToString().c_str(), error_cnt);
+        LogFlush(immutable_db_options_.info_log);
+        immutable_db_options_.clock->SleepForMicroseconds(1000000);
+        mutex_.Lock();
+      } else if (s.IsManualCompactionPaused()) {
+        assert(prepicked_compaction);
+        ManualCompactionState* m =
+            prepicked_compaction->manual_compaction_state;
+        assert(m);
+        ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
+                         m->cfd->GetName().c_str(), job_context.job_id);
+      }
+
+      ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
+
+      // If compaction failed, we want to delete all temporary files that we
+      // might have created (they might not be all recorded in job_context in
+      // case of a failure). Thus, we force full scan in FindObsoleteFiles()
+      FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
+                                          !s.IsManualCompactionPaused() &&
+                                          !s.IsColumnFamilyDropped() &&
+                                          !s.IsBusy());
+      TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
+
+      // delete unnecessary files if any, this is done outside the mutex
+      if (job_context.HaveSomethingToClean() ||
+          job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
+        mutex_.Unlock();
+        // Have to flush the info logs before bg_compaction_scheduled_--
+        // because if bg_flush_scheduled_ becomes 0 and the lock is
+        // released, the destructor of DB can kick in and destroy all the
+        // states of DB so info_log might not be available after that point.
+        // It also applies to access other states that DB owns.
+ log_buffer.FlushBufferToLog(); + if (job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(job_context); + TEST_SYNC_POINT( + "DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles"); + } + job_context.Clean(); + mutex_.Lock(); } - job_context.Clean(); - mutex_.Lock(); + + assert(num_running_compactions_ > 0); + num_running_compactions_--; } - assert(num_running_compactions_ > 0); - num_running_compactions_--; if (bg_thread_pri == Env::Priority::LOW) { bg_compaction_scheduled_--; } else { @@ -2943,7 +2969,6 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, // must be done before we potentially signal the DB close process to // proceed below. prepicked_compaction->task_token.reset(); - ; } if (made_progress ||