Fix a potential bug scheduling unnecessary threads (#6104)

Summary:
RocksDB should decrement the counter `unscheduled_flushes_` as soon as the bg
thread is scheduled. Before this fix, the counter is decremented only when the
bg thread starts and picks an element from the flush queue. This may cause more
than necessary bg threads to be scheduled. Not a correctness issue, but may
affect flush thread count.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6104

Test Plan:
```
make check
```

Differential Revision: D18735584

Pulled By: riversand963

fbshipit-source-id: d36272d4a08a494aeeab6200a3cff7a3d1a2dc10
main
Yanqin Jin 5 years ago committed by Facebook Github Bot
parent f19faf7814
commit 09fcf4fb6b
  1. 3
      HISTORY.md
  2. 24
      db/db_flush_test.cc
  3. 9
      db/db_impl/db_impl_compaction_flush.cc

@ -1,4 +1,7 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased
### Buf Fixes
* Fix a bug that can cause unnecessary bg thread to be scheduled(#6104).
## 6.6.0 (11/25/2019) ## 6.6.0 (11/25/2019)
### Bug Fixes ### Bug Fixes
* Fix data corruption casued by output of intra-L0 compaction on ingested file not being placed in correct order in L0. * Fix data corruption casued by output of intra-L0 compaction on ingested file not being placed in correct order in L0.

@ -212,6 +212,30 @@ TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
t.join(); t.join();
} }
TEST_F(DBFlushTest, ScheduleOnlyOneBgThread) {
Options options = CurrentOptions();
Reopen(options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
int called = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto unscheduled_flushes = *reinterpret_cast<int*>(arg);
ASSERT_EQ(0, unscheduled_flushes);
++called;
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("a", "foo"));
FlushOptions flush_opts;
ASSERT_OK(dbfull()->Flush(flush_opts));
ASSERT_EQ(1, called);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(DBFlushDirectIOTest, DirectIO) { TEST_P(DBFlushDirectIOTest, DirectIO) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;

@ -1918,6 +1918,10 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
fta->thread_pri_ = Env::Priority::HIGH; fta->thread_pri_ = Env::Priority::HIGH;
env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this, env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
&DBImpl::UnscheduleFlushCallback); &DBImpl::UnscheduleFlushCallback);
--unscheduled_flushes_;
TEST_SYNC_POINT_CALLBACK(
"DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
&unscheduled_flushes_);
} }
// special case -- if high-pri (flush) thread pool is empty, then schedule // special case -- if high-pri (flush) thread pool is empty, then schedule
@ -1932,6 +1936,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
fta->thread_pri_ = Env::Priority::LOW; fta->thread_pri_ = Env::Priority::LOW;
env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this, env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
&DBImpl::UnscheduleFlushCallback); &DBImpl::UnscheduleFlushCallback);
--unscheduled_flushes_;
} }
} }
@ -2015,8 +2020,6 @@ ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() { DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
assert(!flush_queue_.empty()); assert(!flush_queue_.empty());
FlushRequest flush_req = flush_queue_.front(); FlushRequest flush_req = flush_queue_.front();
assert(unscheduled_flushes_ >= static_cast<int>(flush_req.size()));
unscheduled_flushes_ -= static_cast<int>(flush_req.size());
flush_queue_.pop_front(); flush_queue_.pop_front();
// TODO: need to unset flush reason? // TODO: need to unset flush reason?
return flush_req; return flush_req;
@ -2058,7 +2061,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
cfd->Ref(); cfd->Ref();
cfd->SetFlushReason(flush_reason); cfd->SetFlushReason(flush_reason);
} }
unscheduled_flushes_ += static_cast<int>(flush_req.size()); ++unscheduled_flushes_;
flush_queue_.push_back(flush_req); flush_queue_.push_back(flush_req);
} }

Loading…
Cancel
Save