rocksdb/util/thread_posix.cc

257 lines
8.1 KiB

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/thread_posix.h"
#include <atomic>
#include <unistd.h>
#ifdef OS_LINUX
#include <sys/syscall.h>
#endif
namespace rocksdb {
void ThreadPool::PthreadCall(const char* label, int result) {
if (result != 0) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
abort();
}
}
ThreadPool::ThreadPool()
: total_threads_limit_(1),
bgthreads_(0),
queue_(),
queue_len_(0),
exit_all_threads_(false),
low_io_priority_(false),
env_(nullptr) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
}
ThreadPool::~ThreadPool() { assert(bgthreads_.size() == 0U); }
void ThreadPool::JoinAllThreads() {
PthreadCall("lock", pthread_mutex_lock(&mu_));
assert(!exit_all_threads_);
exit_all_threads_ = true;
PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
for (const auto tid : bgthreads_) {
pthread_join(tid, nullptr);
}
bgthreads_.clear();
}
void ThreadPool::LowerIOPriority() {
#ifdef OS_LINUX
PthreadCall("lock", pthread_mutex_lock(&mu_));
low_io_priority_ = true;
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
#endif
}
void ThreadPool::BGThread(size_t thread_id) {
bool low_io_priority = false;
while (true) {
// Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_));
// Stop waiting if the thread needs to do work or needs to terminate.
while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
(queue_.empty() || IsExcessiveThread(thread_id))) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
}
if (exit_all_threads_) { // mechanism to let BG threads exit safely
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
break;
}
if (IsLastExcessiveThread(thread_id)) {
// Current thread is the last generated one and is excessive.
// We always terminate excessive thread in the reverse order of
// generation time.
auto terminating_thread = bgthreads_.back();
pthread_detach(terminating_thread);
bgthreads_.pop_back();
if (HasExcessiveThread()) {
// There is still at least more excessive thread to terminate.
WakeUpAllThreads();
}
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
break;
}
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
queue_len_.store(static_cast<unsigned int>(queue_.size()),
std::memory_order_relaxed);
bool decrease_io_priority = (low_io_priority != low_io_priority_);
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
#ifdef OS_LINUX
if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
// Put schedule into IOPRIO_CLASS_IDLE class (lowest)
// These system calls only have an effect when used in conjunction
// with an I/O scheduler that supports I/O priorities. As at
// kernel 2.6.17 the only such scheduler is the Completely
// Fair Queuing (CFQ) I/O scheduler.
// To change scheduler:
// echo cfq > /sys/block/<device_name>/queue/schedule
// Tunables to consider:
// /sys/block/<device_name>/queue/slice_idle
// /sys/block/<device_name>/queue/slice_sync
syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS
0, // current thread
IOPRIO_PRIO_VALUE(3, 0));
low_io_priority = true;
}
#else
(void)decrease_io_priority; // avoid 'unused variable' error
#endif
(*function)(arg);
}
}
// Helper struct for passing arguments when creating threads.
struct BGThreadMetadata {
ThreadPool* thread_pool_;
size_t thread_id_; // Thread count in the thread.
explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
: thread_pool_(thread_pool), thread_id_(thread_id) {}
};
static void* BGThreadWrapper(void* arg) {
BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
size_t thread_id = meta->thread_id_;
ThreadPool* tp = meta->thread_pool_;
#if ROCKSDB_USING_THREAD_STATUS
// for thread-status
ThreadStatusUtil::RegisterThread(
tp->GetHostEnv(), (tp->GetThreadPriority() == Env::Priority::HIGH
? ThreadStatus::HIGH_PRIORITY
: ThreadStatus::LOW_PRIORITY));
#endif
delete meta;
tp->BGThread(thread_id);
#if ROCKSDB_USING_THREAD_STATUS
ThreadStatusUtil::UnregisterThread();
#endif
return nullptr;
}
void ThreadPool::WakeUpAllThreads() {
PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
}
void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
if (exit_all_threads_) {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
return;
}
if (num > total_threads_limit_ ||
(num < total_threads_limit_ && allow_reduce)) {
total_threads_limit_ = std::max(1, num);
WakeUpAllThreads();
StartBGThreads();
}
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
void ThreadPool::IncBackgroundThreadsIfNeeded(int num) {
SetBackgroundThreadsInternal(num, false);
}
void ThreadPool::SetBackgroundThreads(int num) {
SetBackgroundThreadsInternal(num, true);
}
void ThreadPool::StartBGThreads() {
// Start background thread if necessary
while ((int)bgthreads_.size() < total_threads_limit_) {
pthread_t t;
PthreadCall("create thread",
pthread_create(&t, nullptr, &BGThreadWrapper,
new BGThreadMetadata(this, bgthreads_.size())));
// Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
char name_buf[16];
snprintf(name_buf, sizeof name_buf, "rocksdb:bg%" ROCKSDB_PRIszt,
bgthreads_.size());
name_buf[sizeof name_buf - 1] = '\0';
pthread_setname_np(t, name_buf);
#endif
#endif
bgthreads_.push_back(t);
}
}
void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag,
void (*unschedFunction)(void* arg)) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
if (exit_all_threads_) {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
return;
}
StartBGThreads();
// Add to priority queue
queue_.push_back(BGItem());
queue_.back().function = function;
queue_.back().arg = arg;
queue_.back().tag = tag;
queue_.back().unschedFunction = unschedFunction;
queue_len_.store(static_cast<unsigned int>(queue_.size()),
std::memory_order_relaxed);
if (!HasExcessiveThread()) {
// Wake up at least one waiting thread.
PthreadCall("signal", pthread_cond_signal(&bgsignal_));
} else {
// Need to wake up all threads to make sure the one woken
// up is not the one to terminate.
WakeUpAllThreads();
}
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
int ThreadPool::UnSchedule(void* arg) {
int count = 0;
PthreadCall("lock", pthread_mutex_lock(&mu_));
// Remove from priority queue
BGQueue::iterator it = queue_.begin();
while (it != queue_.end()) {
if (arg == (*it).tag) {
void (*unschedFunction)(void*) = (*it).unschedFunction;
void* arg1 = (*it).arg;
if (unschedFunction != nullptr) {
(*unschedFunction)(arg1);
}
it = queue_.erase(it);
count++;
} else {
it++;
}
}
queue_len_.store(static_cast<unsigned int>(queue_.size()),
std::memory_order_relaxed);
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
return count;
}
} // namespace rocksdb