diff --git a/HISTORY.md b/HISTORY.md index 3f1e7c9ae..201e34402 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,4 +1,7 @@ # Rocksdb Change Log +## Unreleased +### Buf Fixes +* Fix a bug that can cause unnecessary bg thread to be scheduled(#6104). ## 6.6.0 (11/25/2019) ### Bug Fixes * Fix data corruption casued by output of intra-L0 compaction on ingested file not being placed in correct order in L0. diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 08a1d8d1b..ed3a12e9f 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -212,6 +212,30 @@ TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) { t.join(); } +TEST_F(DBFlushTest, ScheduleOnlyOneBgThread) { + Options options = CurrentOptions(); + Reopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + int called = 0; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg) { + ASSERT_NE(nullptr, arg); + auto unscheduled_flushes = *reinterpret_cast(arg); + ASSERT_EQ(0, unscheduled_flushes); + ++called; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("a", "foo")); + FlushOptions flush_opts; + ASSERT_OK(dbfull()->Flush(flush_opts)); + ASSERT_EQ(1, called); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + TEST_P(DBFlushDirectIOTest, DirectIO) { Options options; options.create_if_missing = true; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index b01fdbc96..b389e40f4 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1918,6 +1918,10 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { fta->thread_pri_ = Env::Priority::HIGH; env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this, &DBImpl::UnscheduleFlushCallback); + --unscheduled_flushes_; + TEST_SYNC_POINT_CALLBACK( + "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", + &unscheduled_flushes_); } // special case -- if high-pri (flush) thread pool is empty, then schedule @@ -1932,6 +1936,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { fta->thread_pri_ = Env::Priority::LOW; env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this, &DBImpl::UnscheduleFlushCallback); + --unscheduled_flushes_; } } @@ -2015,8 +2020,6 @@ ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() { DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() { assert(!flush_queue_.empty()); FlushRequest flush_req = flush_queue_.front(); - assert(unscheduled_flushes_ >= static_cast(flush_req.size())); - unscheduled_flushes_ -= static_cast(flush_req.size()); flush_queue_.pop_front(); // TODO: need to unset flush reason? return flush_req; @@ -2058,7 +2061,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, cfd->Ref(); cfd->SetFlushReason(flush_reason); } - unscheduled_flushes_ += static_cast(flush_req.size()); + ++unscheduled_flushes_; flush_queue_.push_back(flush_req); }