diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 806bc548a..290e323e1 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2759,6 +2759,14 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, // See if there's more work to be done MaybeScheduleFlushOrCompaction(); + + if (prepicked_compaction != nullptr && + prepicked_compaction->task_token != nullptr) { + // Releasing task tokens affects the DB state, so must be done before we + // potentially signal the DB close process to proceed below. + prepicked_compaction->task_token->ReleaseOnce(); + } + if (made_progress || (bg_compaction_scheduled_ == 0 && bg_bottom_compaction_scheduled_ == 0) || diff --git a/util/concurrent_task_limiter_impl.cc b/util/concurrent_task_limiter_impl.cc index 2342677d8..efa01a17f 100644 --- a/util/concurrent_task_limiter_impl.cc +++ b/util/concurrent_task_limiter_impl.cc @@ -59,9 +59,14 @@ ConcurrentTaskLimiter* NewConcurrentTaskLimiter( return new ConcurrentTaskLimiterImpl(name, limit); } -TaskLimiterToken::~TaskLimiterToken() { - --limiter_->outstanding_tasks_; +void TaskLimiterToken::ReleaseOnce() { + if (!released_) { + --limiter_->outstanding_tasks_; + released_ = true; + } assert(limiter_->outstanding_tasks_ >= 0); } +TaskLimiterToken::~TaskLimiterToken() { ReleaseOnce(); } + } // namespace ROCKSDB_NAMESPACE diff --git a/util/concurrent_task_limiter_impl.h b/util/concurrent_task_limiter_impl.h index d8c1e03cb..0e6cf1466 100644 --- a/util/concurrent_task_limiter_impl.h +++ b/util/concurrent_task_limiter_impl.h @@ -53,11 +53,16 @@ class ConcurrentTaskLimiterImpl : public ConcurrentTaskLimiter { class TaskLimiterToken { public: explicit TaskLimiterToken(ConcurrentTaskLimiterImpl* limiter) - : limiter_(limiter) {} + : limiter_(limiter), released_(false) {} ~TaskLimiterToken(); + // Releases the token from the `ConcurrentTaskLimiterImpl` if not already + // released. + // Not thread-safe. + void ReleaseOnce(); private: ConcurrentTaskLimiterImpl* limiter_; + bool released_; // no copying allowed TaskLimiterToken(const TaskLimiterToken&) = delete;