From 38671c4d54f1508af4368c6617517143e1617b66 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Thu, 6 Dec 2012 16:12:48 -0800 Subject: [PATCH] Fix a race condition while processing tasks by background threads. Summary: Suppose you submit 100 background tasks one after another. The first enqueu task finds that the queue is empty and wakes up one worker thread. Now suppose that all remaining 99 work items are enqueued, they do not wake up any worker threads because the queue is already non-empty. This causes a situation when there are 99 tasks in the task queue but only one worker thread is processing a task while the remaining worker threads are waiting. The fix is to always wakeup one worker thread while enqueuing a task. I also added a check to count the number of elements in the queue to help in debugging. Test Plan: make clean check. Reviewers: chip Reviewed By: chip CC: leveldb Differential Revision: https://reviews.facebook.net/D7203 --- util/env_posix.cc | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/util/env_posix.cc b/util/env_posix.cc index fae7ac6e0..726e4ae0e 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -813,12 +813,14 @@ class PosixEnv : public Env { // Entry per Schedule() call struct BGItem { void* arg; void (*function)(void*); }; typedef std::deque BGQueue; + int queue_size_; // number of items in BGQueue BGQueue queue_; }; PosixEnv::PosixEnv() : page_size_(getpagesize()), started_bgthread_(0), - num_threads_(1) { + num_threads_(1), + queue_size_(0) { PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL)); bgthread_.resize(num_threads_); @@ -835,16 +837,14 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg) { fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]); } - // If the queue is currently empty, the background thread may currently be - // waiting. - if (queue_.empty()) { - PthreadCall("signal", pthread_cond_signal(&bgsignal_)); - } + // always wake up at least one waiting thread. + PthreadCall("signal", pthread_cond_signal(&bgsignal_)); // Add to priority queue queue_.push_back(BGItem()); queue_.back().function = function; queue_.back().arg = arg; + queue_size_++; PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } @@ -860,6 +860,7 @@ void PosixEnv::BGThread() { void (*function)(void*) = queue_.front().function; void* arg = queue_.front().arg; queue_.pop_front(); + queue_size_--; PthreadCall("unlock", pthread_mutex_unlock(&mu_)); (*function)(arg);