diff --git a/HISTORY.md b/HISTORY.md index b3516ba44..f3ffe4cdb 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -10,6 +10,9 @@ ### Public API changes * Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect. +### Bug Fixes +* Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread. + ## 7.0.0 (02/20/2022) ### Bug Fixes * Fixed a major bug in which batched MultiGet could return old values for keys deleted by DeleteRange when memtable Bloom filter is enabled (memtable_prefix_bloom_size_ratio > 0). (The fix includes a substantial MultiGet performance improvement in the unusual case of both memtable_whole_key_filtering and prefix_extractor.) diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 6af4bcafd..94933e8f2 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -6951,7 +6951,7 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) { SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::RunManualCompaction:Scheduled", - "DBCompactionTest::DisableManualCompactionThreadQueueFull:" + "DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:" "PreDisableManualCompaction"}}); SyncPoint::GetInstance()->EnableProcessing(); @@ -6979,7 +6979,7 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) { }); TEST_SYNC_POINT( - "DBCompactionTest::DisableManualCompactionThreadQueueFull:" + "DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:" "PreDisableManualCompaction"); // Generate more files to trigger auto compaction which is scheduled after @@ -7006,6 +7006,63 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) { sleeping_task_low.WaitUntilDone(); } +TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { + const int kNumL0Files = 4; + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::RunManualCompaction:Scheduled", + "DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:" + "PreDisableManualCompaction"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = kNumL0Files; + Reopen(options); + + // Block compaction queue + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + Env::Priority::LOW); + + // generate files, but avoid trigger auto compaction + for (int i = 0; i < kNumL0Files / 2; i++) { + ASSERT_OK(Put(Key(1), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + ASSERT_OK(Flush()); + } + + port::Thread compact_thread([&]() { + CompactRangeOptions cro; + cro.exclusive_manual_compaction = true; + auto s = db_->CompactRange(cro, nullptr, nullptr); + ASSERT_TRUE(s.IsIncomplete()); + }); + + TEST_SYNC_POINT( + "DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:" + "PreDisableManualCompaction"); + + // Generate more files to trigger auto compaction which is scheduled after + // manual compaction. Has to generate 4 more files because existing files are + // pending compaction + for (int i = 0; i < kNumL0Files; i++) { + ASSERT_OK(Put(Key(1), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + ASSERT_OK(Flush()); + } + ASSERT_EQ(ToString(kNumL0Files + (kNumL0Files / 2)), FilesPerLevel(0)); + + // Close DB with manual compaction and auto triggered compaction in the queue. + auto s = db_->Close(); + ASSERT_OK(s); + + // manual compaction thread should return with Incomplete(). + compact_thread.join(); + + sleeping_task_low.WakeUp(); + sleeping_task_low.WaitUntilDone(); +} + TEST_F(DBCompactionTest, DisableManualCompactionDoesNotWaitForDrainingAutomaticCompaction) { // When `CompactRangeOptions::exclusive_manual_compaction == true`, we wait diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 0f00f33d0..27dd5abf0 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -539,10 +539,19 @@ Status DBImpl::CloseHelper() { // marker. After this we do a variant of the waiting and unschedule work // (to consider: moving all the waiting into CancelAllBackgroundWork(true)) CancelAllBackgroundWork(false); + + // Cancel manual compaction if there's any + if (HasPendingManualCompaction()) { + DisableManualCompaction(); + } mutex_.Lock(); - env_->UnSchedule(this, Env::Priority::BOTTOM); - env_->UnSchedule(this, Env::Priority::LOW); - env_->UnSchedule(this, Env::Priority::HIGH); + // Unschedule all tasks for this DB + for (uint8_t i = 0; i < static_cast(TaskType::kCount); i++) { + env_->UnSchedule(GetTaskTag(i), Env::Priority::BOTTOM); + env_->UnSchedule(GetTaskTag(i), Env::Priority::LOW); + env_->UnSchedule(GetTaskTag(i), Env::Priority::HIGH); + } + Status ret = Status::OK(); // Wait for background work to finish diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index c37a46c3e..d1e95c75a 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1539,7 +1539,6 @@ class DBImpl : public DB { ManualCompactionState* manual_compaction_state; // nullptr if non-manual // task limiter token is requested during compaction picking. std::unique_ptr task_token; - bool is_canceled = false; }; struct CompactionArg { @@ -1734,6 +1733,25 @@ class DBImpl : public DB { } } + // TaskType is used to identify tasks in thread-pool, currently only + // differentiate manual compaction, which could be unscheduled from the + // thread-pool. + enum class TaskType : uint8_t { + kDefault = 0, + kManualCompaction = 1, + kCount = 2, + }; + + // Task tag is used to identity tasks in thread-pool, which is + // dbImpl obj address + type + inline void* GetTaskTag(TaskType type) { + return GetTaskTag(static_cast(type)); + } + + inline void* GetTaskTag(uint8_t type) { + return static_cast(static_cast(this)) + type; + } + // REQUIRES: mutex locked and in write thread. void AssignAtomicFlushSeq(const autovector& cfds); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 0e1864788..05520d426 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1763,6 +1763,7 @@ Status DBImpl::RunManualCompaction( CompactionArg* ca = nullptr; bool scheduled = false; + Env::Priority thread_pool_priority = Env::Priority::TOTAL; bool manual_conflict = false; ManualCompactionState manual; manual.cfd = cfd; @@ -1884,9 +1885,9 @@ Status DBImpl::RunManualCompaction( manual.done = true; manual.status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); - if (ca && ca->prepicked_compaction) { - ca->prepicked_compaction->is_canceled = true; - } + assert(thread_pool_priority != Env::Priority::TOTAL); + env_->UnSchedule(GetTaskTag(TaskType::kManualCompaction), + thread_pool_priority); break; } if (scheduled && manual.incomplete == true) { @@ -1916,13 +1917,17 @@ Status DBImpl::RunManualCompaction( bg_bottom_compaction_scheduled_++; ca->compaction_pri_ = Env::Priority::BOTTOM; env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, - Env::Priority::BOTTOM, this, + Env::Priority::BOTTOM, + GetTaskTag(TaskType::kManualCompaction), &DBImpl::UnscheduleCompactionCallback); + thread_pool_priority = Env::Priority::BOTTOM; } else { bg_compaction_scheduled_++; ca->compaction_pri_ = Env::Priority::LOW; - env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, + env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, + GetTaskTag(TaskType::kManualCompaction), &DBImpl::UnscheduleCompactionCallback); + thread_pool_priority = Env::Priority::LOW; } scheduled = true; TEST_SYNC_POINT("DBImpl::RunManualCompaction:Scheduled"); @@ -1933,6 +1938,13 @@ Status DBImpl::RunManualCompaction( assert(!manual.in_progress); assert(HasPendingManualCompaction()); RemoveManualCompaction(&manual); + // if the manual job is unscheduled, try schedule other jobs in case there's + // any unscheduled compaction job which was blocked by exclusive manual + // compaction. + if (manual.status.IsIncomplete() && + manual.status.subcode() == Status::SubCode::kManualCompactionPaused) { + MaybeScheduleFlushOrCompaction(); + } bg_cv_.SignalAll(); return manual.status; } @@ -2661,6 +2673,8 @@ void DBImpl::UnscheduleCompactionCallback(void* arg) { delete reinterpret_cast(arg); if (ca.prepicked_compaction != nullptr) { if (ca.prepicked_compaction->compaction != nullptr) { + ca.prepicked_compaction->compaction->ReleaseCompactionFiles( + Status::Incomplete(Status::SubCode::kManualCompactionPaused)); delete ca.prepicked_compaction->compaction; } delete ca.prepicked_compaction; @@ -2851,106 +2865,93 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) { void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, Env::Priority bg_thread_pri) { bool made_progress = false; + JobContext job_context(next_job_id_.fetch_add(1), true); TEST_SYNC_POINT("BackgroundCallCompaction:0"); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log.get()); { InstrumentedMutexLock l(&mutex_); - if (prepicked_compaction && prepicked_compaction->is_canceled) { - assert(prepicked_compaction->compaction); - ROCKS_LOG_BUFFER(&log_buffer, "[%s] Skip canceled manual compaction job", - prepicked_compaction->compaction->column_family_data() - ->GetName() - .c_str()); - prepicked_compaction->compaction->ReleaseCompactionFiles( - Status::Incomplete(Status::SubCode::kManualCompactionPaused)); - delete prepicked_compaction->compaction; - } else { - JobContext job_context(next_job_id_.fetch_add(1), true); - // This call will unlock/lock the mutex to wait for current running - // IngestExternalFile() calls to finish. - WaitForIngestFile(); - - num_running_compactions_++; - - std::unique_ptr::iterator> - pending_outputs_inserted_elem(new std::list::iterator( - CaptureCurrentFileNumberInPendingOutputs())); - - assert((bg_thread_pri == Env::Priority::BOTTOM && - bg_bottom_compaction_scheduled_) || - (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_)); - Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer, - prepicked_compaction, bg_thread_pri); - TEST_SYNC_POINT("BackgroundCallCompaction:1"); - if (s.IsBusy()) { - bg_cv_.SignalAll(); // In case a waiter can proceed despite the error - mutex_.Unlock(); - immutable_db_options_.clock->SleepForMicroseconds( - 10000); // prevent hot loop - mutex_.Lock(); - } else if (!s.ok() && !s.IsShutdownInProgress() && - !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) { - // Wait a little bit before retrying background compaction in - // case this is an environmental problem and we do not want to - // chew up resources for failed compactions for the duration of - // the problem. - uint64_t error_cnt = - default_cf_internal_stats_->BumpAndGetBackgroundErrorCount(); - bg_cv_.SignalAll(); // In case a waiter can proceed despite the error - mutex_.Unlock(); - log_buffer.FlushBufferToLog(); - ROCKS_LOG_ERROR(immutable_db_options_.info_log, - "Waiting after background compaction error: %s, " - "Accumulated background error counts: %" PRIu64, - s.ToString().c_str(), error_cnt); - LogFlush(immutable_db_options_.info_log); - immutable_db_options_.clock->SleepForMicroseconds(1000000); - mutex_.Lock(); - } else if (s.IsManualCompactionPaused()) { - assert(prepicked_compaction); - ManualCompactionState* m = - prepicked_compaction->manual_compaction_state; - assert(m); - ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused", - m->cfd->GetName().c_str(), job_context.job_id); - } - - ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); - - // If compaction failed, we want to delete all temporary files that we - // might have created (they might not be all recorded in job_context in - // case of a failure). Thus, we force full scan in FindObsoleteFiles() - FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() && - !s.IsManualCompactionPaused() && - !s.IsColumnFamilyDropped() && - !s.IsBusy()); - TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"); - - // delete unnecessary files if any, this is done outside the mutex - if (job_context.HaveSomethingToClean() || - job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { - mutex_.Unlock(); - // Have to flush the info logs before bg_compaction_scheduled_-- - // because if bg_flush_scheduled_ becomes 0 and the lock is - // released, the deconstructor of DB can kick in and destroy all the - // states of DB so info_log might not be available after that point. - // It also applies to access other states that DB owns. - log_buffer.FlushBufferToLog(); - if (job_context.HaveSomethingToDelete()) { - PurgeObsoleteFiles(job_context); - TEST_SYNC_POINT( - "DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles"); - } - job_context.Clean(); - mutex_.Lock(); - } + // This call will unlock/lock the mutex to wait for current running + // IngestExternalFile() calls to finish. + WaitForIngestFile(); + + num_running_compactions_++; + + std::unique_ptr::iterator> + pending_outputs_inserted_elem(new std::list::iterator( + CaptureCurrentFileNumberInPendingOutputs())); + + assert((bg_thread_pri == Env::Priority::BOTTOM && + bg_bottom_compaction_scheduled_) || + (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_)); + Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer, + prepicked_compaction, bg_thread_pri); + TEST_SYNC_POINT("BackgroundCallCompaction:1"); + if (s.IsBusy()) { + bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + mutex_.Unlock(); + immutable_db_options_.clock->SleepForMicroseconds( + 10000); // prevent hot loop + mutex_.Lock(); + } else if (!s.ok() && !s.IsShutdownInProgress() && + !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) { + // Wait a little bit before retrying background compaction in + // case this is an environmental problem and we do not want to + // chew up resources for failed compactions for the duration of + // the problem. + uint64_t error_cnt = + default_cf_internal_stats_->BumpAndGetBackgroundErrorCount(); + bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + mutex_.Unlock(); + log_buffer.FlushBufferToLog(); + ROCKS_LOG_ERROR(immutable_db_options_.info_log, + "Waiting after background compaction error: %s, " + "Accumulated background error counts: %" PRIu64, + s.ToString().c_str(), error_cnt); + LogFlush(immutable_db_options_.info_log); + immutable_db_options_.clock->SleepForMicroseconds(1000000); + mutex_.Lock(); + } else if (s.IsManualCompactionPaused()) { + assert(prepicked_compaction); + ManualCompactionState* m = prepicked_compaction->manual_compaction_state; + assert(m); + ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused", + m->cfd->GetName().c_str(), job_context.job_id); + } + + ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); - assert(num_running_compactions_ > 0); - num_running_compactions_--; + // If compaction failed, we want to delete all temporary files that we + // might have created (they might not be all recorded in job_context in + // case of a failure). Thus, we force full scan in FindObsoleteFiles() + FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() && + !s.IsManualCompactionPaused() && + !s.IsColumnFamilyDropped() && + !s.IsBusy()); + TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"); + + // delete unnecessary files if any, this is done outside the mutex + if (job_context.HaveSomethingToClean() || + job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { + mutex_.Unlock(); + // Have to flush the info logs before bg_compaction_scheduled_-- + // because if bg_flush_scheduled_ becomes 0 and the lock is + // released, the deconstructor of DB can kick in and destroy all the + // states of DB so info_log might not be available after that point. + // It also applies to access other states that DB owns. + log_buffer.FlushBufferToLog(); + if (job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(job_context); + TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles"); + } + job_context.Clean(); + mutex_.Lock(); } + assert(num_running_compactions_ > 0); + num_running_compactions_--; + if (bg_thread_pri == Env::Priority::LOW) { bg_compaction_scheduled_--; } else {