Fix possible hang issue in ~DBImpl() when flush is scheduled in LOW pool (#8125)

Summary:
In DBImpl::CloseHelper, we wait for bg_compaction_scheduled_
and bg_flush_scheduled_ to drop to 0. Unschedule is called prior
to cancel any unscheduled flushes/compactions. It is assumed that
anything in the high priority is a flush, and anything in the low
priority pool is a compaction. This assumption, however, is broken when
the high-pri pool is full.
As a result, bg_compaction_scheduled_ can go < 0 and bg_flush_scheduled_
will remain > 0 and DB can be in hang state.
The fix is, we decrement the `bg_{flush,compaction,bottom_compaction}_scheduled_`
inside the `Unschedule{Flush,Compaction,BottomCompaction}Callback()`s. DB
`mutex_` will make the counts atomic in `Unschedule`.
Related discussion: https://github.com/facebook/rocksdb/issues/7928

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8125

Test Plan: Added new test case which hangs without the fix.

Reviewed By: jay-zhuang

Differential Revision: D27390043

Pulled By: ajkr

fbshipit-source-id: 78a367fba9a59ac5607ad24bd1c46dc16d5ec110
main
sherriiiliu 4 years ago committed by Facebook GitHub Bot
parent 9418403c4b
commit e6534900bd
  1. 1
      HISTORY.md
  2. 60
      db/db_flush_test.cc
  3. 12
      db/db_impl/db_impl.cc
  4. 1
      db/db_impl/db_impl.h
  5. 22
      db/db_impl/db_impl_compaction_flush.cc

@ -6,6 +6,7 @@
### Bug Fixes
* Use thread-safe `strerror_r()` to get error messages.
* Fixed a potential hang in shutdown for a DB whose `Env` has high-pri thread pool disabled (`Env::GetBackgroundThreads(Env::Priority::HIGH) == 0`)
* Made BackupEngine thread-safe and added documentation comments to clarify what is safe for multiple BackupEngine objects accessing the same backup directory.
### Performance Improvements

@ -166,6 +166,66 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
ASSERT_EQ(1, num_compactions);
}
// Test when flush job is submitted to low priority thread pool and when DB is
// closed in the meanwhile, CloseHelper doesn't hang.
TEST_F(DBFlushTest, CloseDBWhenFlushInLowPri) {
Options options = CurrentOptions();
options.max_background_flushes = 1;
options.max_total_wal_size = 8192;
DestroyAndReopen(options);
CreateColumnFamilies({"cf1", "cf2"}, options);
env_->SetBackgroundThreads(0, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW);
test::SleepingBackgroundTask sleeping_task_low;
int num_flushes = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::BGWorkFlush",
[&](void* /*arg*/) { ++num_flushes; });
int num_low_flush_unscheduled = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::UnscheduleLowFlushCallback", [&](void* /*arg*/) {
num_low_flush_unscheduled++;
// There should be one flush job in low pool that needs to be
// unscheduled
ASSERT_EQ(num_low_flush_unscheduled, 1);
});
int num_high_flush_unscheduled = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::UnscheduleHighFlushCallback", [&](void* /*arg*/) {
num_high_flush_unscheduled++;
// There should be no flush job in high pool
ASSERT_EQ(num_high_flush_unscheduled, 0);
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put(0, "key1", DummyString(8192)));
// Block thread so that flush cannot be run and can be removed from the queue
// when called Unschedule.
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
sleeping_task_low.WaitUntilSleeping();
// Trigger flush and flush job will be scheduled to LOW priority thread.
ASSERT_OK(Put(0, "key2", DummyString(8192)));
// Close DB and flush job in low priority queue will be removed without
// running.
Close();
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
ASSERT_EQ(0, num_flushes);
TryReopenWithColumnFamilies({"default", "cf1", "cf2"}, options);
ASSERT_OK(Put(0, "key3", DummyString(8192)));
ASSERT_OK(Flush(0));
ASSERT_EQ(1, num_flushes);
}
TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
Options options = CurrentOptions();
options.write_buffer_size = 100;

@ -522,15 +522,11 @@ Status DBImpl::CloseHelper() {
// marker. After this we do a variant of the waiting and unschedule work
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
CancelAllBackgroundWork(false);
int bottom_compactions_unscheduled =
env_->UnSchedule(this, Env::Priority::BOTTOM);
int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
Status ret = Status::OK();
mutex_.Lock();
bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
bg_compaction_scheduled_ -= compactions_unscheduled;
bg_flush_scheduled_ -= flushes_unscheduled;
env_->UnSchedule(this, Env::Priority::BOTTOM);
env_->UnSchedule(this, Env::Priority::LOW);
env_->UnSchedule(this, Env::Priority::HIGH);
Status ret = Status::OK();
// Wait for background work to finish
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||

@ -1414,6 +1414,7 @@ class DBImpl : public DB {
DBImpl* db;
// background compaction takes ownership of `prepicked_compaction`.
PrepickedCompaction* prepicked_compaction;
Env::Priority compaction_pri_;
};
// Initialize the built-in column family for persistent stats. Depending on

@ -1741,6 +1741,7 @@ Status DBImpl::RunManualCompaction(
}
ca = new CompactionArg;
ca->db = this;
ca->compaction_pri_ = Env::Priority::LOW;
ca->prepicked_compaction = new PrepickedCompaction;
ca->prepicked_compaction->manual_compaction_state = &manual;
ca->prepicked_compaction->compaction = compaction;
@ -2272,6 +2273,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
unscheduled_compactions_ > 0) {
CompactionArg* ca = new CompactionArg;
ca->db = this;
ca->compaction_pri_ = Env::Priority::LOW;
ca->prepicked_compaction = nullptr;
bg_compaction_scheduled_++;
unscheduled_compactions_--;
@ -2459,7 +2461,16 @@ void DBImpl::BGWorkPurge(void* db) {
}
void DBImpl::UnscheduleCompactionCallback(void* arg) {
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
CompactionArg* ca_ptr = reinterpret_cast<CompactionArg*>(arg);
Env::Priority compaction_pri = ca_ptr->compaction_pri_;
if (Env::Priority::BOTTOM == compaction_pri) {
// Decrement bg_bottom_compaction_scheduled_ if priority is BOTTOM
ca_ptr->db->bg_bottom_compaction_scheduled_--;
} else if (Env::Priority::LOW == compaction_pri) {
// Decrement bg_compaction_scheduled_ if priority is LOW
ca_ptr->db->bg_compaction_scheduled_--;
}
CompactionArg ca = *(ca_ptr);
delete reinterpret_cast<CompactionArg*>(arg);
if (ca.prepicked_compaction != nullptr) {
if (ca.prepicked_compaction->compaction != nullptr) {
@ -2471,6 +2482,14 @@ void DBImpl::UnscheduleCompactionCallback(void* arg) {
}
void DBImpl::UnscheduleFlushCallback(void* arg) {
// Decrement bg_flush_scheduled_ in flush callback
reinterpret_cast<FlushThreadArg*>(arg)->db_->bg_flush_scheduled_--;
Env::Priority flush_pri = reinterpret_cast<FlushThreadArg*>(arg)->thread_pri_;
if (Env::Priority::LOW == flush_pri) {
TEST_SYNC_POINT("DBImpl::UnscheduleLowFlushCallback");
} else if (Env::Priority::HIGH == flush_pri) {
TEST_SYNC_POINT("DBImpl::UnscheduleHighFlushCallback");
}
delete reinterpret_cast<FlushThreadArg*>(arg);
TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
}
@ -3073,6 +3092,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
CompactionArg* ca = new CompactionArg;
ca->db = this;
ca->compaction_pri_ = Env::Priority::BOTTOM;
ca->prepicked_compaction = new PrepickedCompaction;
ca->prepicked_compaction->compaction = c.release();
ca->prepicked_compaction->manual_compaction_state = nullptr;

Loading…
Cancel
Save