From 09fcf4fb6b9aad87fabdfc39d722f0aa1b43378f Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 27 Nov 2019 14:46:38 -0800 Subject: [PATCH] Fix a potential bug scheduling unnecessary threads (#6104) Summary: RocksDB should decrement the counter `unscheduled_flushes_` as soon as the bg thread is scheduled. Before this fix, the counter is decremented only when the bg thread starts and picks an element from the flush queue. This may cause more bg threads than necessary to be scheduled. Not a correctness issue, but may affect flush thread count. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6104 Test Plan: ``` make check ``` Differential Revision: D18735584 Pulled By: riversand963 fbshipit-source-id: d36272d4a08a494aeeab6200a3cff7a3d1a2dc10 --- HISTORY.md | 3 +++ db/db_flush_test.cc | 24 ++++++++++++++++++++++++ db/db_impl/db_impl_compaction_flush.cc | 9 ++++++--- 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 3f1e7c9ae..201e34402 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,4 +1,7 @@ # Rocksdb Change Log +## Unreleased +### Bug Fixes +* Fix a bug that can cause unnecessary bg threads to be scheduled (#6104). ## 6.6.0 (11/25/2019) ### Bug Fixes * Fix data corruption casued by output of intra-L0 compaction on ingested file not being placed in correct order in L0. 
diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 08a1d8d1b..ed3a12e9f 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -212,6 +212,30 @@ TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) { t.join(); } +TEST_F(DBFlushTest, ScheduleOnlyOneBgThread) { + Options options = CurrentOptions(); + Reopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + int called = 0; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg) { + ASSERT_NE(nullptr, arg); + auto unscheduled_flushes = *reinterpret_cast<int*>(arg); + ASSERT_EQ(0, unscheduled_flushes); + ++called; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("a", "foo")); + FlushOptions flush_opts; + ASSERT_OK(dbfull()->Flush(flush_opts)); + ASSERT_EQ(1, called); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + TEST_P(DBFlushDirectIOTest, DirectIO) { Options options; options.create_if_missing = true; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index b01fdbc96..b389e40f4 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1918,6 +1918,10 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { fta->thread_pri_ = Env::Priority::HIGH; env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this, &DBImpl::UnscheduleFlushCallback); + --unscheduled_flushes_; + TEST_SYNC_POINT_CALLBACK( + "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", + &unscheduled_flushes_); } // special case -- if high-pri (flush) thread pool is empty, then schedule @@ -1932,6 +1936,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { fta->thread_pri_ = Env::Priority::LOW; env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this, &DBImpl::UnscheduleFlushCallback); + --unscheduled_flushes_; } } @@ -2015,8 +2020,6 @@ 
ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() { DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() { assert(!flush_queue_.empty()); FlushRequest flush_req = flush_queue_.front(); - assert(unscheduled_flushes_ >= static_cast<int>(flush_req.size())); - unscheduled_flushes_ -= static_cast<int>(flush_req.size()); flush_queue_.pop_front(); // TODO: need to unset flush reason? return flush_req; @@ -2058,7 +2061,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, cfd->Ref(); cfd->SetFlushReason(flush_reason); } - unscheduled_flushes_ += static_cast<int>(flush_req.size()); + ++unscheduled_flushes_; flush_queue_.push_back(flush_req); }