From 4dff279b19e18eb88337c0f5c9320d2e58e92754 Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Sat, 12 Mar 2022 20:07:04 -0800 Subject: [PATCH] DisableManualCompaction may fail to cancel an unscheduled task (#9659) Summary: https://github.com/facebook/rocksdb/issues/9625 didn't change the unschedule condition which was waiting for the background thread to clean up the compaction. Make sure we only unschedule the task when it's scheduled. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9659 Reviewed By: ajkr Differential Revision: D34651820 Pulled By: jay-zhuang fbshipit-source-id: 23f42081b15ec8886cd81cbf131b116e0c74dc2f --- HISTORY.md | 2 + db/column_family.cc | 1 - db/compaction/compaction_picker_universal.cc | 1 - db/db_compaction_test.cc | 106 +++++++++++++++++ db/db_impl/db_impl.h | 31 +++-- db/db_impl/db_impl_compaction_flush.cc | 113 ++++++++++--------- 6 files changed, 188 insertions(+), 66 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 780eaac52..64137894d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,8 @@ * Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496) * Fixed a bug caused by race among flush, incoming writes and taking snapshots. Queries to snapshots created with these race condition can return incorrect result, e.g. resurfacing deleted data. * Fixed a bug that DB flush uses `options.compression` even `options.compression_per_level` is set. +* Fixed a bug that DisableManualCompaction may assert when disabling an unscheduled manual compaction. +* Fixed a potential timer crash when opening and closing the DB concurrently. ### Public API changes * Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect. 
diff --git a/db/column_family.cc b/db/column_family.cc index 4e8a82dac..e4fb0a77d 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -989,7 +989,6 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( GetL0ThresholdSpeedupCompaction( mutable_cf_options.level0_file_num_compaction_trigger, mutable_cf_options.level0_slowdown_writes_trigger)) { - fprintf(stdout, "JJJ2\n"); write_controller_token_ = write_controller->GetCompactionPressureToken(); ROCKS_LOG_INFO( diff --git a/db/compaction/compaction_picker_universal.cc b/db/compaction/compaction_picker_universal.cc index 5ffb25dba..5ca2c41ea 100644 --- a/db/compaction/compaction_picker_universal.cc +++ b/db/compaction/compaction_picker_universal.cc @@ -372,7 +372,6 @@ Compaction* UniversalCompactionBuilder::PickCompaction() { const int kLevel0 = 0; score_ = vstorage_->CompactionScore(kLevel0); sorted_runs_ = CalculateSortedRuns(*vstorage_); - fprintf(stdout, "JJJ1\n"); if (sorted_runs_.size() == 0 || (vstorage_->FilesMarkedForPeriodicCompaction().empty() && diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 94933e8f2..fd0eb66eb 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -6889,6 +6889,112 @@ TEST_F(DBCompactionTest, FIFOWarm) { Destroy(options); } +TEST_F(DBCompactionTest, DisableMultiManualCompaction) { + const int kNumL0Files = 10; + + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = kNumL0Files; + Reopen(options); + + // Generate 2 levels of file to make sure the manual compaction is not skipped + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), "value")); + if (i % 2) { + ASSERT_OK(Flush()); + } + } + MoveFilesToLevel(2); + + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), "value")); + if (i % 2) { + ASSERT_OK(Flush()); + } + } + MoveFilesToLevel(1); + + // Block compaction queue + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, 
&sleeping_task_low, + Env::Priority::LOW); + + port::Thread compact_thread1([&]() { + CompactRangeOptions cro; + cro.exclusive_manual_compaction = false; + std::string begin_str = Key(0); + std::string end_str = Key(3); + Slice b = begin_str; + Slice e = end_str; + auto s = db_->CompactRange(cro, &b, &e); + ASSERT_TRUE(s.IsIncomplete()); + }); + + port::Thread compact_thread2([&]() { + CompactRangeOptions cro; + cro.exclusive_manual_compaction = false; + std::string begin_str = Key(4); + std::string end_str = Key(7); + Slice b = begin_str; + Slice e = end_str; + auto s = db_->CompactRange(cro, &b, &e); + ASSERT_TRUE(s.IsIncomplete()); + }); + + // Disable manual compaction should cancel both manual compactions and both + // compaction should return incomplete. + db_->DisableManualCompaction(); + + compact_thread1.join(); + compact_thread2.join(); + + sleeping_task_low.WakeUp(); + sleeping_task_low.WaitUntilDone(); + ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); +} + +TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) { + const int kNumL0Files = 4; + + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = kNumL0Files; + Reopen(options); + + // generate files, but avoid trigger auto compaction + for (int i = 0; i < kNumL0Files / 2; i++) { + ASSERT_OK(Put(Key(1), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + ASSERT_OK(Flush()); + } + + // make sure the manual compaction background is started but not yet set the + // status to in_progress, then cancel the manual compaction, which should not + // result in segfault + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkCompaction", + "DBCompactionTest::DisableJustStartedManualCompaction:" + "PreDisableManualCompaction"}, + {"DBCompactionTest::DisableJustStartedManualCompaction:" + "ManualCompactionReturn", + "BackgroundCallCompaction:0"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + port::Thread compact_thread([&]() { + CompactRangeOptions cro; + 
cro.exclusive_manual_compaction = true; + auto s = db_->CompactRange(cro, nullptr, nullptr); + ASSERT_TRUE(s.IsIncomplete()); + TEST_SYNC_POINT( + "DBCompactionTest::DisableJustStartedManualCompaction:" + "ManualCompactionReturn"); + }); + TEST_SYNC_POINT( + "DBCompactionTest::DisableJustStartedManualCompaction:" + "PreDisableManualCompaction"); + db_->DisableManualCompaction(); + + compact_thread.join(); +} + TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) { const int kNumL0Files = 4; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 164d8ef3c..8e4b9d048 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1516,19 +1516,31 @@ class DBImpl : public DB { // Information for a manual compaction struct ManualCompactionState { + ManualCompactionState(ColumnFamilyData* _cfd, int _input_level, + int _output_level, uint32_t _output_path_id, + bool _exclusive, bool _disallow_trivial_move, + std::atomic* _canceled) + : cfd(_cfd), + input_level(_input_level), + output_level(_output_level), + output_path_id(_output_path_id), + exclusive(_exclusive), + disallow_trivial_move(_disallow_trivial_move), + canceled(_canceled) {} + ColumnFamilyData* cfd; int input_level; int output_level; uint32_t output_path_id; Status status; - bool done; - bool in_progress; // compaction request being processed? - bool incomplete; // only part of requested range compacted - bool exclusive; // current behavior of only one manual - bool disallow_trivial_move; // Force actual compaction to run - const InternalKey* begin; // nullptr means beginning of key range - const InternalKey* end; // nullptr means end of key range - InternalKey* manual_end; // how far we are compacting + bool done = false; + bool in_progress = false; // compaction request being processed? 
+ bool incomplete = false; // only part of requested range compacted + bool exclusive; // current behavior of only one manual + bool disallow_trivial_move; // Force actual compaction to run + const InternalKey* begin = nullptr; // nullptr means beginning of key range + const InternalKey* end = nullptr; // nullptr means end of key range + InternalKey* manual_end = nullptr; // how far we are compacting InternalKey tmp_storage; // Used to keep track of compaction progress InternalKey tmp_storage1; // Used to keep track of compaction progress std::atomic* canceled; // Compaction canceled by the user? @@ -1538,7 +1550,8 @@ class DBImpl : public DB { Compaction* compaction; // caller retains ownership of `manual_compaction_state` as it is reused // across background compactions. - ManualCompactionState* manual_compaction_state; // nullptr if non-manual + std::shared_ptr + manual_compaction_state; // nullptr if non-manual // task limiter token is requested during compaction picking. std::unique_ptr task_token; }; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 4dc01bc53..44da77f7e 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -285,7 +285,6 @@ Status DBImpl::FlushMemTableToOutputFile( assert(storage_info); VersionStorageInfo::LevelSummaryStorage tmp; - fprintf(stdout, "JJJ4\n"); ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", column_family_name.c_str(), storage_info->LevelSummary(&tmp)); @@ -731,7 +730,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( assert(storage_info); VersionStorageInfo::LevelSummaryStorage tmp; - fprintf(stdout, "JJJ3\n"); ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", column_family_name.c_str(), storage_info->LevelSummary(&tmp)); @@ -1800,34 +1798,27 @@ Status DBImpl::RunManualCompaction( bool scheduled = false; Env::Priority thread_pool_priority = Env::Priority::TOTAL; bool manual_conflict = false; - ManualCompactionState 
manual; - manual.cfd = cfd; - manual.input_level = input_level; - manual.output_level = output_level; - manual.output_path_id = compact_range_options.target_path_id; - manual.done = false; - manual.in_progress = false; - manual.incomplete = false; - manual.exclusive = exclusive; - manual.disallow_trivial_move = disallow_trivial_move; - manual.canceled = compact_range_options.canceled; + + auto manual = std::make_shared( + cfd, input_level, output_level, compact_range_options.target_path_id, + exclusive, disallow_trivial_move, compact_range_options.canceled); // For universal compaction, we enforce every manual compaction to compact // all files. if (begin == nullptr || cfd->ioptions()->compaction_style == kCompactionStyleUniversal || cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { - manual.begin = nullptr; + manual->begin = nullptr; } else { begin_storage.SetMinPossibleForUserKey(*begin); - manual.begin = &begin_storage; + manual->begin = &begin_storage; } if (end == nullptr || cfd->ioptions()->compaction_style == kCompactionStyleUniversal || cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { - manual.end = nullptr; + manual->end = nullptr; } else { end_storage.SetMaxPossibleForUserKey(*end); - manual.end = &end_storage; + manual->end = &end_storage; } TEST_SYNC_POINT("DBImpl::RunManualCompaction:0"); @@ -1839,10 +1830,10 @@ Status DBImpl::RunManualCompaction( // `DisableManualCompaction()` just waited for the manual compaction queue // to drain. So return immediately. 
TEST_SYNC_POINT("DBImpl::RunManualCompaction:PausedAtStart"); - manual.status = + manual->status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); - manual.done = true; - return manual.status; + manual->done = true; + return manual->status; } // When a manual compaction arrives, temporarily disable scheduling of @@ -1862,7 +1853,7 @@ Status DBImpl::RunManualCompaction( // However, only one of them will actually schedule compaction, while // others will wait on a condition variable until it completes. - AddManualCompaction(&manual); + AddManualCompaction(manual.get()); TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_); if (exclusive) { // Limitation: there's no way to wake up the below loop when user sets @@ -1871,11 +1862,11 @@ Status DBImpl::RunManualCompaction( while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0) { if (manual_compaction_paused_ > 0 || - (manual.canceled != nullptr && *manual.canceled == true)) { + (manual->canceled != nullptr && *manual->canceled == true)) { // Pretend the error came from compaction so the below cleanup/error // handling code can process it. - manual.done = true; - manual.status = + manual->done = true; + manual->status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); break; } @@ -1897,56 +1888,64 @@ Status DBImpl::RunManualCompaction( // We don't check bg_error_ here, because if we get the error in compaction, // the compaction will set manual.status to bg_error_ and set manual.done to // true. 
- while (!manual.done) { + while (!manual->done) { assert(HasPendingManualCompaction()); manual_conflict = false; Compaction* compaction = nullptr; - if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) || - scheduled || - (((manual.manual_end = &manual.tmp_storage1) != nullptr) && - ((compaction = manual.cfd->CompactRange( - *manual.cfd->GetLatestMutableCFOptions(), mutable_db_options_, - manual.input_level, manual.output_level, compact_range_options, - manual.begin, manual.end, &manual.manual_end, &manual_conflict, - max_file_num_to_ignore, trim_ts)) == nullptr && + if (ShouldntRunManualCompaction(manual.get()) || + (manual->in_progress == true) || scheduled || + (((manual->manual_end = &manual->tmp_storage1) != nullptr) && + ((compaction = manual->cfd->CompactRange( + *manual->cfd->GetLatestMutableCFOptions(), mutable_db_options_, + manual->input_level, manual->output_level, compact_range_options, + manual->begin, manual->end, &manual->manual_end, + &manual_conflict, max_file_num_to_ignore, trim_ts)) == nullptr && manual_conflict))) { // exclusive manual compactions should not see a conflict during // CompactRange assert(!exclusive || !manual_conflict); // Running either this or some other manual compaction bg_cv_.Wait(); - if (manual_compaction_paused_ > 0 && !manual.done && - !manual.in_progress) { - manual.done = true; - manual.status = + if (manual_compaction_paused_ > 0) { + manual->done = true; + manual->status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); - assert(thread_pool_priority != Env::Priority::TOTAL); - env_->UnSchedule(GetTaskTag(TaskType::kManualCompaction), - thread_pool_priority); + if (scheduled) { + assert(thread_pool_priority != Env::Priority::TOTAL); + auto unscheduled_task_num = env_->UnSchedule( + GetTaskTag(TaskType::kManualCompaction), thread_pool_priority); + if (unscheduled_task_num > 0) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "[%s] Unscheduled %d number of manual compactions 
from the " + "thread-pool", + cfd->GetName().c_str(), unscheduled_task_num); + } + } break; } - if (scheduled && manual.incomplete == true) { - assert(!manual.in_progress); + if (scheduled && manual->incomplete == true) { + assert(!manual->in_progress); scheduled = false; - manual.incomplete = false; + manual->incomplete = false; } } else if (!scheduled) { if (compaction == nullptr) { - manual.done = true; + manual->done = true; bg_cv_.SignalAll(); continue; } ca = new CompactionArg; ca->db = this; ca->prepicked_compaction = new PrepickedCompaction; - ca->prepicked_compaction->manual_compaction_state = &manual; + ca->prepicked_compaction->manual_compaction_state = manual; ca->prepicked_compaction->compaction = compaction; if (!RequestCompactionToken( cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) { // Don't throttle manual compaction, only count outstanding tasks. assert(false); } - manual.incomplete = false; + manual->incomplete = false; if (compaction->bottommost_level() && env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) { bg_bottom_compaction_scheduled_++; @@ -1970,18 +1969,18 @@ Status DBImpl::RunManualCompaction( } log_buffer.FlushBufferToLog(); - assert(!manual.in_progress); + assert(!manual->in_progress); assert(HasPendingManualCompaction()); - RemoveManualCompaction(&manual); + RemoveManualCompaction(manual.get()); // if the manual job is unscheduled, try schedule other jobs in case there's // any unscheduled compaction job which was blocked by exclusive manual // compaction. 
- if (manual.status.IsIncomplete() && - manual.status.subcode() == Status::SubCode::kManualCompactionPaused) { + if (manual->status.IsIncomplete() && + manual->status.subcode() == Status::SubCode::kManualCompactionPaused) { MaybeScheduleFlushOrCompaction(); } bg_cv_.SignalAll(); - return manual.status; + return manual->status; } void DBImpl::GenerateFlushRequest(const autovector& cfds, @@ -2949,7 +2948,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, mutex_.Lock(); } else if (s.IsManualCompactionPaused()) { assert(prepicked_compaction); - ManualCompactionState* m = prepicked_compaction->manual_compaction_state; + auto m = prepicked_compaction->manual_compaction_state; assert(m); ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused", m->cfd->GetName().c_str(), job_context.job_id); @@ -3031,7 +3030,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, LogBuffer* log_buffer, PrepickedCompaction* prepicked_compaction, Env::Priority thread_pri) { - ManualCompactionState* manual_compaction = + std::shared_ptr manual_compaction = prepicked_compaction == nullptr ? 
nullptr : prepicked_compaction->manual_compaction_state; @@ -3075,6 +3074,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (!status.ok()) { if (is_manual) { manual_compaction->status = status; + manual_compaction->status + .PermitUncheckedError(); // the manual compaction thread may exit + // first, which won't be able to check the + // status manual_compaction->done = true; manual_compaction->in_progress = false; manual_compaction = nullptr; @@ -3097,7 +3100,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // InternalKey* manual_end = &manual_end_storage; bool sfm_reserved_compact_space = false; if (is_manual) { - ManualCompactionState* m = manual_compaction; + auto m = manual_compaction; assert(m->in_progress); if (!c) { m->done = true; @@ -3481,7 +3484,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, c.reset(); if (is_manual) { - ManualCompactionState* m = manual_compaction; + auto m = manual_compaction; if (!status.ok()) { m->status = status; m->done = true;