Fix a race condition while processing tasks by background threads.

Summary:
Suppose you submit 100 background tasks one after another. The first
enqueu task finds that the queue is empty and wakes up one worker thread.
Now suppose that all remaining 99 work items are enqueued, they do not
wake up any worker threads because the queue is already non-empty.
This causes a situation when there are 99 tasks in the task queue but
only one worker thread is processing a task while the remaining
worker threads are waiting.
The fix is to always wakeup one worker thread while enqueuing a task.

I also added a check to count the number of elements in the queue
to help in debugging.

Test Plan: make clean check.

Reviewers: chip

Reviewed By: chip

CC: leveldb

Differential Revision: https://reviews.facebook.net/D7203
main
Dhruba Borthakur 12 years ago
parent 768edfaaed
commit 38671c4d54
  1. 13
      util/env_posix.cc

@ -813,12 +813,14 @@ class PosixEnv : public Env {
// Entry per Schedule() call
struct BGItem { void* arg; void (*function)(void*); };
typedef std::deque<BGItem> BGQueue;
int queue_size_; // number of items in BGQueue
BGQueue queue_;
};
PosixEnv::PosixEnv() : page_size_(getpagesize()),
started_bgthread_(0),
num_threads_(1) {
num_threads_(1),
queue_size_(0) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
bgthread_.resize(num_threads_);
@ -835,16 +837,14 @@ void PosixEnv::Schedule(void (*function)(void*), void* arg) {
fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]);
}
// If the queue is currently empty, the background thread may currently be
// waiting.
if (queue_.empty()) {
PthreadCall("signal", pthread_cond_signal(&bgsignal_));
}
// always wake up at least one waiting thread.
PthreadCall("signal", pthread_cond_signal(&bgsignal_));
// Add to priority queue
queue_.push_back(BGItem());
queue_.back().function = function;
queue_.back().arg = arg;
queue_size_++;
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
@ -860,6 +860,7 @@ void PosixEnv::BGThread() {
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
queue_size_--;
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
(*function)(arg);

Loading…
Cancel
Save