commit
9c227923c6
@ -0,0 +1,250 @@ |
|||||||
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style license that can be
|
||||||
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||||
|
|
||||||
|
#include <atomic> |
||||||
|
#include "util/thread_posix.h" |
||||||
|
#include <unistd.h> |
||||||
|
#ifdef OS_LINUX |
||||||
|
#include <sys/syscall.h> |
||||||
|
#endif |
||||||
|
|
||||||
|
namespace rocksdb { |
||||||
|
|
||||||
|
void ThreadPool::PthreadCall(const char* label, int result) { |
||||||
|
if (result != 0) { |
||||||
|
fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); |
||||||
|
abort(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
ThreadPool::ThreadPool() |
||||||
|
: total_threads_limit_(1), |
||||||
|
bgthreads_(0), |
||||||
|
queue_(), |
||||||
|
queue_len_(0), |
||||||
|
exit_all_threads_(false), |
||||||
|
low_io_priority_(false), |
||||||
|
env_(nullptr) { |
||||||
|
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); |
||||||
|
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); |
||||||
|
} |
||||||
|
|
||||||
|
ThreadPool::~ThreadPool() { assert(bgthreads_.size() == 0U); } |
||||||
|
|
||||||
|
void ThreadPool::JoinAllThreads() { |
||||||
|
PthreadCall("lock", pthread_mutex_lock(&mu_)); |
||||||
|
assert(!exit_all_threads_); |
||||||
|
exit_all_threads_ = true; |
||||||
|
PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); |
||||||
|
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
||||||
|
for (const auto tid : bgthreads_) { |
||||||
|
pthread_join(tid, nullptr); |
||||||
|
} |
||||||
|
bgthreads_.clear(); |
||||||
|
} |
||||||
|
|
||||||
|
void ThreadPool::LowerIOPriority() { |
||||||
|
#ifdef OS_LINUX |
||||||
|
PthreadCall("lock", pthread_mutex_lock(&mu_)); |
||||||
|
low_io_priority_ = true; |
||||||
|
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
||||||
|
#endif |
||||||
|
} |
||||||
|
|
||||||
|
void ThreadPool::BGThread(size_t thread_id) { |
||||||
|
bool low_io_priority = false; |
||||||
|
while (true) { |
||||||
|
// Wait until there is an item that is ready to run
|
||||||
|
PthreadCall("lock", pthread_mutex_lock(&mu_)); |
||||||
|
// Stop waiting if the thread needs to do work or needs to terminate.
|
||||||
|
while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && |
||||||
|
(queue_.empty() || IsExcessiveThread(thread_id))) { |
||||||
|
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); |
||||||
|
} |
||||||
|
if (exit_all_threads_) { // mechanism to let BG threads exit safely
|
||||||
|
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
||||||
|
break; |
||||||
|
} |
||||||
|
if (IsLastExcessiveThread(thread_id)) { |
||||||
|
// Current thread is the last generated one and is excessive.
|
||||||
|
// We always terminate excessive thread in the reverse order of
|
||||||
|
// generation time.
|
||||||
|
auto terminating_thread = bgthreads_.back(); |
||||||
|
pthread_detach(terminating_thread); |
||||||
|
bgthreads_.pop_back(); |
||||||
|
if (HasExcessiveThread()) { |
||||||
|
// There is still at least more excessive thread to terminate.
|
||||||
|
WakeUpAllThreads(); |
||||||
|
} |
||||||
|
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
||||||
|
break; |
||||||
|
} |
||||||
|
void (*function)(void*) = queue_.front().function; |
||||||
|
void* arg = queue_.front().arg; |
||||||
|
queue_.pop_front(); |
||||||
|
queue_len_.store(static_cast<unsigned int>(queue_.size()), |
||||||
|
std::memory_order_relaxed); |
||||||
|
|
||||||
|
bool decrease_io_priority = (low_io_priority != low_io_priority_); |
||||||
|
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
||||||
|
|
||||||
|
#ifdef OS_LINUX |
||||||
|
if (decrease_io_priority) { |
||||||
|
#define IOPRIO_CLASS_SHIFT (13) |
||||||
|
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data) |
||||||
|
// Put schedule into IOPRIO_CLASS_IDLE class (lowest)
|
||||||
|
// These system calls only have an effect when used in conjunction
|
||||||
|
// with an I/O scheduler that supports I/O priorities. As at
|
||||||
|
// kernel 2.6.17 the only such scheduler is the Completely
|
||||||
|
// Fair Queuing (CFQ) I/O scheduler.
|
||||||
|
// To change scheduler:
|
||||||
|
// echo cfq > /sys/block/<device_name>/queue/schedule
|
||||||
|
// Tunables to consider:
|
||||||
|
// /sys/block/<device_name>/queue/slice_idle
|
||||||
|
// /sys/block/<device_name>/queue/slice_sync
|
||||||
|
syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS
|
||||||
|
0, // current thread
|
||||||
|
IOPRIO_PRIO_VALUE(3, 0)); |
||||||
|
low_io_priority = true; |
||||||
|
} |
||||||
|
#else |
||||||
|
(void)decrease_io_priority; // avoid 'unused variable' error
|
||||||
|
#endif |
||||||
|
(*function)(arg); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// Helper struct for passing arguments when creating threads.
|
||||||
|
struct BGThreadMetadata { |
||||||
|
ThreadPool* thread_pool_; |
||||||
|
size_t thread_id_; // Thread count in the thread.
|
||||||
|
explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) |
||||||
|
: thread_pool_(thread_pool), thread_id_(thread_id) {} |
||||||
|
}; |
||||||
|
|
||||||
|
static void* BGThreadWrapper(void* arg) { |
||||||
|
BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg); |
||||||
|
size_t thread_id = meta->thread_id_; |
||||||
|
ThreadPool* tp = meta->thread_pool_; |
||||||
|
#if ROCKSDB_USING_THREAD_STATUS |
||||||
|
// for thread-status
|
||||||
|
ThreadStatusUtil::RegisterThread( |
||||||
|
tp->GetHostEnv(), (tp->GetThreadPriority() == Env::Priority::HIGH |
||||||
|
? ThreadStatus::HIGH_PRIORITY |
||||||
|
: ThreadStatus::LOW_PRIORITY)); |
||||||
|
#endif |
||||||
|
delete meta; |
||||||
|
tp->BGThread(thread_id); |
||||||
|
#if ROCKSDB_USING_THREAD_STATUS |
||||||
|
ThreadStatusUtil::UnregisterThread(); |
||||||
|
#endif |
||||||
|
return nullptr; |
||||||
|
} |
||||||
|
|
||||||
|
void ThreadPool::WakeUpAllThreads() { |
||||||
|
PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); |
||||||
|
} |
||||||
|
|
||||||
|
void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) { |
||||||
|
PthreadCall("lock", pthread_mutex_lock(&mu_)); |
||||||
|
if (exit_all_threads_) { |
||||||
|
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
||||||
|
return; |
||||||
|
} |
||||||
|
if (num > total_threads_limit_ || |
||||||
|
(num < total_threads_limit_ && allow_reduce)) { |
||||||
|
total_threads_limit_ = std::max(1, num); |
||||||
|
WakeUpAllThreads(); |
||||||
|
StartBGThreads(); |
||||||
|
} |
||||||
|
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
||||||
|
} |
||||||
|
|
||||||
|
void ThreadPool::IncBackgroundThreadsIfNeeded(int num) { |
||||||
|
SetBackgroundThreadsInternal(num, false); |
||||||
|
} |
||||||
|
|
||||||
|
void ThreadPool::SetBackgroundThreads(int num) { |
||||||
|
SetBackgroundThreadsInternal(num, true); |
||||||
|
} |
||||||
|
|
||||||
|
void ThreadPool::StartBGThreads() { |
||||||
|
// Start background thread if necessary
|
||||||
|
while ((int)bgthreads_.size() < total_threads_limit_) { |
||||||
|
pthread_t t; |
||||||
|
PthreadCall("create thread", |
||||||
|
pthread_create(&t, nullptr, &BGThreadWrapper, |
||||||
|
new BGThreadMetadata(this, bgthreads_.size()))); |
||||||
|
|
||||||
|
// Set the thread name to aid debugging
|
||||||
|
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) |
||||||
|
#if __GLIBC_PREREQ(2, 12) |
||||||
|
char name_buf[16]; |
||||||
|
snprintf(name_buf, sizeof name_buf, "rocksdb:bg%" ROCKSDB_PRIszt, |
||||||
|
bgthreads_.size()); |
||||||
|
name_buf[sizeof name_buf - 1] = '\0'; |
||||||
|
pthread_setname_np(t, name_buf); |
||||||
|
#endif |
||||||
|
#endif |
||||||
|
|
||||||
|
bgthreads_.push_back(t); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag) { |
||||||
|
PthreadCall("lock", pthread_mutex_lock(&mu_)); |
||||||
|
|
||||||
|
if (exit_all_threads_) { |
||||||
|
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
||||||
|
return; |
||||||
|
} |
||||||
|
|
||||||
|
StartBGThreads(); |
||||||
|
|
||||||
|
// Add to priority queue
|
||||||
|
queue_.push_back(BGItem()); |
||||||
|
queue_.back().function = function; |
||||||
|
queue_.back().arg = arg; |
||||||
|
queue_.back().tag = tag; |
||||||
|
queue_len_.store(static_cast<unsigned int>(queue_.size()), |
||||||
|
std::memory_order_relaxed); |
||||||
|
|
||||||
|
if (!HasExcessiveThread()) { |
||||||
|
// Wake up at least one waiting thread.
|
||||||
|
PthreadCall("signal", pthread_cond_signal(&bgsignal_)); |
||||||
|
} else { |
||||||
|
// Need to wake up all threads to make sure the one woken
|
||||||
|
// up is not the one to terminate.
|
||||||
|
WakeUpAllThreads(); |
||||||
|
} |
||||||
|
|
||||||
|
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
||||||
|
} |
||||||
|
|
||||||
|
int ThreadPool::UnSchedule(void* arg) { |
||||||
|
int count = 0; |
||||||
|
PthreadCall("lock", pthread_mutex_lock(&mu_)); |
||||||
|
|
||||||
|
// Remove from priority queue
|
||||||
|
BGQueue::iterator it = queue_.begin(); |
||||||
|
while (it != queue_.end()) { |
||||||
|
if (arg == (*it).tag) { |
||||||
|
it = queue_.erase(it); |
||||||
|
count++; |
||||||
|
} else { |
||||||
|
it++; |
||||||
|
} |
||||||
|
} |
||||||
|
queue_len_.store(static_cast<unsigned int>(queue_.size()), |
||||||
|
std::memory_order_relaxed); |
||||||
|
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
||||||
|
return count; |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace rocksdb
|
@ -0,0 +1,86 @@ |
|||||||
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style license that can be
|
||||||
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||||
|
#pragma once |
||||||
|
#include "rocksdb/env.h" |
||||||
|
#include "util/thread_status_util.h" |
||||||
|
|
||||||
|
namespace rocksdb { |
||||||
|
|
||||||
|
class ThreadPool { |
||||||
|
public: |
||||||
|
ThreadPool(); |
||||||
|
~ThreadPool(); |
||||||
|
|
||||||
|
void JoinAllThreads(); |
||||||
|
void LowerIOPriority(); |
||||||
|
void BGThread(size_t thread_id); |
||||||
|
void WakeUpAllThreads(); |
||||||
|
void IncBackgroundThreadsIfNeeded(int num); |
||||||
|
void SetBackgroundThreads(int num); |
||||||
|
void StartBGThreads(); |
||||||
|
void Schedule(void (*function)(void* arg1), void* arg, void* tag); |
||||||
|
int UnSchedule(void* arg); |
||||||
|
|
||||||
|
unsigned int GetQueueLen() const { |
||||||
|
return queue_len_.load(std::memory_order_relaxed); |
||||||
|
} |
||||||
|
|
||||||
|
void SetHostEnv(Env* env) { env_ = env; } |
||||||
|
Env* GetHostEnv() { return env_; } |
||||||
|
|
||||||
|
// Return true if there is at least one thread needs to terminate.
|
||||||
|
bool HasExcessiveThread() { |
||||||
|
return static_cast<int>(bgthreads_.size()) > total_threads_limit_; |
||||||
|
} |
||||||
|
|
||||||
|
// Return true iff the current thread is the excessive thread to terminate.
|
||||||
|
// Always terminate the running thread that is added last, even if there are
|
||||||
|
// more than one thread to terminate.
|
||||||
|
bool IsLastExcessiveThread(size_t thread_id) { |
||||||
|
return HasExcessiveThread() && thread_id == bgthreads_.size() - 1; |
||||||
|
} |
||||||
|
|
||||||
|
// Is one of the threads to terminate.
|
||||||
|
bool IsExcessiveThread(size_t thread_id) { |
||||||
|
return static_cast<int>(thread_id) >= total_threads_limit_; |
||||||
|
} |
||||||
|
|
||||||
|
// Return the thread priority.
|
||||||
|
// This would allow its member-thread to know its priority.
|
||||||
|
Env::Priority GetThreadPriority() { return priority_; } |
||||||
|
|
||||||
|
// Set the thread priority.
|
||||||
|
void SetThreadPriority(Env::Priority priority) { priority_ = priority; } |
||||||
|
|
||||||
|
static void PthreadCall(const char* label, int result); |
||||||
|
|
||||||
|
private: |
||||||
|
// Entry per Schedule() call
|
||||||
|
struct BGItem { |
||||||
|
void* arg; |
||||||
|
void (*function)(void*); |
||||||
|
void* tag; |
||||||
|
}; |
||||||
|
typedef std::deque<BGItem> BGQueue; |
||||||
|
|
||||||
|
pthread_mutex_t mu_; |
||||||
|
pthread_cond_t bgsignal_; |
||||||
|
int total_threads_limit_; |
||||||
|
std::vector<pthread_t> bgthreads_; |
||||||
|
BGQueue queue_; |
||||||
|
std::atomic_uint queue_len_; // Queue length. Used for stats reporting
|
||||||
|
bool exit_all_threads_; |
||||||
|
bool low_io_priority_; |
||||||
|
Env::Priority priority_; |
||||||
|
Env* env_; |
||||||
|
|
||||||
|
void SetBackgroundThreadsInternal(int num, bool allow_reduce); |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace rocksdb
|
Loading…
Reference in new issue