From e6534900bd78e1a2342d67d9f9a382545fbeaf9b Mon Sep 17 00:00:00 2001 From: sherriiiliu <79488180+sherriiiliu@users.noreply.github.com> Date: Tue, 30 Mar 2021 18:34:11 -0700 Subject: [PATCH] Fix possible hang issue in ~DBImpl() when flush is scheduled in LOW pool (#8125) Summary: In DBImpl::CloseHelper, we wait for bg_compaction_scheduled_ and bg_flush_scheduled_ to drop to 0. Unschedule is called prior to cancel any unscheduled flushes/compactions. It is assumed that anything in the high priority is a flush, and anything in the low priority pool is a compaction. This assumption, however, is broken when the high-pri pool is full. As a result, bg_compaction_scheduled_ can go < 0 and bg_flush_scheduled_ will remain > 0 and DB can be in hang state. The fix is, we decrement the `bg_{flush,compaction,bottom_compaction}_scheduled_` inside the `Unschedule{Flush,Compaction,BottomCompaction}Callback()`s. DB `mutex_` will make the counts atomic in `Unschedule`. Related discussion: https://github.com/facebook/rocksdb/issues/7928 Pull Request resolved: https://github.com/facebook/rocksdb/pull/8125 Test Plan: Added new test case which hangs without the fix. Reviewed By: jay-zhuang Differential Revision: D27390043 Pulled By: ajkr fbshipit-source-id: 78a367fba9a59ac5607ad24bd1c46dc16d5ec110 --- HISTORY.md | 1 + db/db_flush_test.cc | 60 ++++++++++++++++++++++++++ db/db_impl/db_impl.cc | 12 ++---- db/db_impl/db_impl.h | 1 + db/db_impl/db_impl_compaction_flush.cc | 22 +++++++++- 5 files changed, 87 insertions(+), 9 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 9672d0196..2b4ded59f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ ### Bug Fixes * Use thread-safe `strerror_r()` to get error messages. +* Fixed a potential hang in shutdown for a DB whose `Env` has high-pri thread pool disabled (`Env::GetBackgroundThreads(Env::Priority::HIGH) == 0`) * Made BackupEngine thread-safe and added documentation comments to clarify what is safe for multiple BackupEngine objects accessing the same backup directory. ### Performance Improvements diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 6cd170fb9..e94901cbc 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -166,6 +166,66 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) { ASSERT_EQ(1, num_compactions); } +// Test when flush job is submitted to low priority thread pool and when DB is +// closed in the meanwhile, CloseHelper doesn't hang. +TEST_F(DBFlushTest, CloseDBWhenFlushInLowPri) { + Options options = CurrentOptions(); + options.max_background_flushes = 1; + options.max_total_wal_size = 8192; + + DestroyAndReopen(options); + CreateColumnFamilies({"cf1", "cf2"}, options); + + env_->SetBackgroundThreads(0, Env::HIGH); + env_->SetBackgroundThreads(1, Env::LOW); + test::SleepingBackgroundTask sleeping_task_low; + int num_flushes = 0; + + SyncPoint::GetInstance()->SetCallBack("DBImpl::BGWorkFlush", + [&](void* /*arg*/) { ++num_flushes; }); + + int num_low_flush_unscheduled = 0; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::UnscheduleLowFlushCallback", [&](void* /*arg*/) { + num_low_flush_unscheduled++; + // There should be one flush job in low pool that needs to be + // unscheduled + ASSERT_EQ(num_low_flush_unscheduled, 1); + }); + + int num_high_flush_unscheduled = 0; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::UnscheduleHighFlushCallback", [&](void* /*arg*/) { + num_high_flush_unscheduled++; + // There should be no flush job in high pool + ASSERT_EQ(num_high_flush_unscheduled, 0); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put(0, "key1", DummyString(8192))); + // Block thread so that flush cannot be run and can be removed from the queue + // when called Unschedule. + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + Env::Priority::LOW); + sleeping_task_low.WaitUntilSleeping(); + + // Trigger flush and flush job will be scheduled to LOW priority thread. + ASSERT_OK(Put(0, "key2", DummyString(8192))); + + // Close DB and flush job in low priority queue will be removed without + // running. + Close(); + sleeping_task_low.WakeUp(); + sleeping_task_low.WaitUntilDone(); + ASSERT_EQ(0, num_flushes); + + TryReopenWithColumnFamilies({"default", "cf1", "cf2"}, options); + ASSERT_OK(Put(0, "key3", DummyString(8192))); + ASSERT_OK(Flush(0)); + ASSERT_EQ(1, num_flushes); +} + TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) { Options options = CurrentOptions(); options.write_buffer_size = 100; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index aaa3b5125..7c883683e 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -522,15 +522,11 @@ Status DBImpl::CloseHelper() { // marker. After this we do a variant of the waiting and unschedule work // (to consider: moving all the waiting into CancelAllBackgroundWork(true)) CancelAllBackgroundWork(false); - int bottom_compactions_unscheduled = - env_->UnSchedule(this, Env::Priority::BOTTOM); - int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW); - int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH); - Status ret = Status::OK(); mutex_.Lock(); - bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled; - bg_compaction_scheduled_ -= compactions_unscheduled; - bg_flush_scheduled_ -= flushes_unscheduled; + env_->UnSchedule(this, Env::Priority::BOTTOM); + env_->UnSchedule(this, Env::Priority::LOW); + env_->UnSchedule(this, Env::Priority::HIGH); + Status ret = Status::OK(); // Wait for background work to finish while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 6a18cee8e..916f15195 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1414,6 +1414,7 @@ class DBImpl : public DB { DBImpl* db; // background compaction takes ownership of `prepicked_compaction`. PrepickedCompaction* prepicked_compaction; + Env::Priority compaction_pri_; }; // Initialize the built-in column family for persistent stats. Depending on diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 3d95eeaf7..a4c965766 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1741,6 +1741,7 @@ Status DBImpl::RunManualCompaction( } ca = new CompactionArg; ca->db = this; + ca->compaction_pri_ = Env::Priority::LOW; ca->prepicked_compaction = new PrepickedCompaction; ca->prepicked_compaction->manual_compaction_state = &manual; ca->prepicked_compaction->compaction = compaction; @@ -2272,6 +2273,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { unscheduled_compactions_ > 0) { CompactionArg* ca = new CompactionArg; ca->db = this; + ca->compaction_pri_ = Env::Priority::LOW; ca->prepicked_compaction = nullptr; bg_compaction_scheduled_++; unscheduled_compactions_--; @@ -2459,7 +2461,16 @@ void DBImpl::BGWorkPurge(void* db) { } void DBImpl::UnscheduleCompactionCallback(void* arg) { - CompactionArg ca = *(reinterpret_cast(arg)); + CompactionArg* ca_ptr = reinterpret_cast(arg); + Env::Priority compaction_pri = ca_ptr->compaction_pri_; + if (Env::Priority::BOTTOM == compaction_pri) { + // Decrement bg_bottom_compaction_scheduled_ if priority is BOTTOM + ca_ptr->db->bg_bottom_compaction_scheduled_--; + } else if (Env::Priority::LOW == compaction_pri) { + // Decrement bg_compaction_scheduled_ if priority is LOW + ca_ptr->db->bg_compaction_scheduled_--; + } + CompactionArg ca = *(ca_ptr); delete reinterpret_cast(arg); if (ca.prepicked_compaction != nullptr) { if (ca.prepicked_compaction->compaction != nullptr) { @@ -2471,6 +2482,14 @@ void DBImpl::UnscheduleCompactionCallback(void* arg) { } void DBImpl::UnscheduleFlushCallback(void* arg) { + // Decrement bg_flush_scheduled_ in flush callback + reinterpret_cast(arg)->db_->bg_flush_scheduled_--; + Env::Priority flush_pri = reinterpret_cast(arg)->thread_pri_; + if (Env::Priority::LOW == flush_pri) { + TEST_SYNC_POINT("DBImpl::UnscheduleLowFlushCallback"); + } else if (Env::Priority::HIGH == flush_pri) { + TEST_SYNC_POINT("DBImpl::UnscheduleHighFlushCallback"); + } delete reinterpret_cast(arg); TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback"); } @@ -3073,6 +3092,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool"); CompactionArg* ca = new CompactionArg; ca->db = this; + ca->compaction_pri_ = Env::Priority::BOTTOM; ca->prepicked_compaction = new PrepickedCompaction; ca->prepicked_compaction->compaction = c.release(); ca->prepicked_compaction->manual_compaction_state = nullptr;