From b2863017b1e7bf2c1632f14d4c5d0d359c2946c2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Javier=20Gonz=C3=A1lez?=
Date: Tue, 27 Oct 2015 13:03:43 +0100
Subject: [PATCH] Move posix threads into a library

Summary:
This patch moves all posix thread logic to a separate library.

The motivation is to allow other environments to easily reuse posix
threads. HDFS already wraps posix threads; this split would simplify
that code.

Test Plan:
No new functionality is added to the posix Env or the threading
library, so the current tests should suffice.
---
 src.mk               |   1 +
 util/env_posix.cc    | 310 +------------------------------------------
 util/thread_posix.cc | 250 ++++++++++++++++++++++++++++++++++
 util/thread_posix.h  |  86 ++++++++++++
 4 files changed, 343 insertions(+), 304 deletions(-)
 create mode 100644 util/thread_posix.cc
 create mode 100644 util/thread_posix.h

diff --git a/src.mk b/src.mk
index 05c084ac5..5e56c0f80 100644
--- a/src.mk
+++ b/src.mk
@@ -96,6 +96,7 @@ LIB_SOURCES = \
   util/env_hdfs.cc \
   util/env_posix.cc \
   util/io_posix.cc \
+  util/thread_posix.cc \
   util/file_util.cc \
   util/file_reader_writer.cc \
   util/filter_policy.cc \
diff --git a/util/env_posix.cc b/util/env_posix.cc
index b2268b428..877146c85 100644
--- a/util/env_posix.cc
+++ b/util/env_posix.cc
@@ -42,13 +42,13 @@
 #include "rocksdb/slice.h"
 #include "util/coding.h"
 #include "util/io_posix.h"
+#include "util/thread_posix.h"
 #include "util/iostats_context_imp.h"
 #include "util/logging.h"
 #include "util/posix_logger.h"
 #include "util/string_util.h"
 #include "util/sync_point.h"
 #include "util/thread_status_updater.h"
-#include "util/thread_status_util.h"
 
 #if !defined(TMPFS_MAGIC)
 #define TMPFS_MAGIC 0x01021994
@@ -110,13 +110,6 @@ static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
   return value;
 }
 
-void PthreadCall(const char* label, int result) {
-  if (result != 0) {
-    fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
-    abort();
-  }
-}
-
 class PosixFileLock : public FileLock {
  public:
  int fd_;
@@ -674,300 +667,9 @@ class PosixEnv : public Env {
 
   size_t page_size_;
-
-  class ThreadPool {
-   public:
-    ThreadPool()
-        : total_threads_limit_(1),
-          bgthreads_(0),
-          queue_(),
-          queue_len_(0),
-          exit_all_threads_(false),
-          low_io_priority_(false),
-          env_(nullptr) {
-      PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
-      PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
-    }
-
-    ~ThreadPool() {
-      assert(bgthreads_.size() == 0U);
-    }
-
-    void JoinAllThreads() {
-      PthreadCall("lock", pthread_mutex_lock(&mu_));
-      assert(!exit_all_threads_);
-      exit_all_threads_ = true;
-      PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
-      PthreadCall("unlock", pthread_mutex_unlock(&mu_));
-      for (const auto tid : bgthreads_) {
-        pthread_join(tid, nullptr);
-      }
-      bgthreads_.clear();
-    }
-
-    void SetHostEnv(Env* env) {
-      env_ = env;
-    }
-
-    void LowerIOPriority() {
-#ifdef OS_LINUX
-      PthreadCall("lock", pthread_mutex_lock(&mu_));
-      low_io_priority_ = true;
-      PthreadCall("unlock", pthread_mutex_unlock(&mu_));
-#endif
-    }
-
-    // Return true if there is at least one thread needs to terminate.
-    bool HasExcessiveThread() {
-      return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
-    }
-
-    // Return true iff the current thread is the excessive thread to terminate.
-    // Always terminate the running thread that is added last, even if there are
-    // more than one thread to terminate.
-    bool IsLastExcessiveThread(size_t thread_id) {
-      return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
-    }
-
-    // Is one of the threads to terminate.
-    bool IsExcessiveThread(size_t thread_id) {
-      return static_cast<int>(thread_id) >= total_threads_limit_;
-    }
-
-    // Return the thread priority.
-    // This would allow its member-thread to know its priority.
-    Env::Priority GetThreadPriority() {
-      return priority_;
-    }
-
-    // Set the thread priority.
-    void SetThreadPriority(Env::Priority priority) {
-      priority_ = priority;
-    }
-
-    void BGThread(size_t thread_id) {
-      bool low_io_priority = false;
-      while (true) {
-        // Wait until there is an item that is ready to run
-        PthreadCall("lock", pthread_mutex_lock(&mu_));
-        // Stop waiting if the thread needs to do work or needs to terminate.
-        while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
-               (queue_.empty() || IsExcessiveThread(thread_id))) {
-          PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
-        }
-        if (exit_all_threads_) {  // mechanism to let BG threads exit safely
-          PthreadCall("unlock", pthread_mutex_unlock(&mu_));
-          break;
-        }
-        if (IsLastExcessiveThread(thread_id)) {
-          // Current thread is the last generated one and is excessive.
-          // We always terminate excessive thread in the reverse order of
-          // generation time.
-          auto terminating_thread = bgthreads_.back();
-          pthread_detach(terminating_thread);
-          bgthreads_.pop_back();
-          if (HasExcessiveThread()) {
-            // There is still at least more excessive thread to terminate.
-            WakeUpAllThreads();
-          }
-          PthreadCall("unlock", pthread_mutex_unlock(&mu_));
-          break;
-        }
-        void (*function)(void*) = queue_.front().function;
-        void* arg = queue_.front().arg;
-        queue_.pop_front();
-        queue_len_.store(static_cast<unsigned int>(queue_.size()),
-                         std::memory_order_relaxed);
-
-        bool decrease_io_priority = (low_io_priority != low_io_priority_);
-        PthreadCall("unlock", pthread_mutex_unlock(&mu_));
-
-#ifdef OS_LINUX
-        if (decrease_io_priority) {
-          #define IOPRIO_CLASS_SHIFT (13)
-          #define IOPRIO_PRIO_VALUE(class, data) \
-              (((class) << IOPRIO_CLASS_SHIFT) | data)
-          // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
-          // These system calls only have an effect when used in conjunction
-          // with an I/O scheduler that supports I/O priorities. As at
-          // kernel 2.6.17 the only such scheduler is the Completely
-          // Fair Queuing (CFQ) I/O scheduler.
-          // To change scheduler:
-          //  echo cfq > /sys/block/<device_name>/queue/schedule
-          // Tunables to consider:
-          //  /sys/block/<device_name>/queue/slice_idle
-          //  /sys/block/<device_name>/queue/slice_sync
-          syscall(SYS_ioprio_set,
-                  1,  // IOPRIO_WHO_PROCESS
-                  0,  // current thread
-                  IOPRIO_PRIO_VALUE(3, 0));
-          low_io_priority = true;
-        }
-#else
-        (void)decrease_io_priority;  // avoid 'unused variable' error
-#endif
-        (*function)(arg);
-      }
-    }
-
-    // Helper struct for passing arguments when creating threads.
-    struct BGThreadMetadata {
-      ThreadPool* thread_pool_;
-      size_t thread_id_;  // Thread count in the thread.
-      explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
-          : thread_pool_(thread_pool), thread_id_(thread_id) {}
-    };
-
-    static void* BGThreadWrapper(void* arg) {
-      BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
-      size_t thread_id = meta->thread_id_;
-      ThreadPool* tp = meta->thread_pool_;
-#if ROCKSDB_USING_THREAD_STATUS
-      // for thread-status
-      ThreadStatusUtil::RegisterThread(tp->env_,
-          (tp->GetThreadPriority() == Env::Priority::HIGH ?
-               ThreadStatus::HIGH_PRIORITY :
-               ThreadStatus::LOW_PRIORITY));
-#endif
-      delete meta;
-      tp->BGThread(thread_id);
-#if ROCKSDB_USING_THREAD_STATUS
-      ThreadStatusUtil::UnregisterThread();
-#endif
-      return nullptr;
-    }
-
-    void WakeUpAllThreads() {
-      PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
-    }
-
-    void SetBackgroundThreadsInternal(int num, bool allow_reduce) {
-      PthreadCall("lock", pthread_mutex_lock(&mu_));
-      if (exit_all_threads_) {
-        PthreadCall("unlock", pthread_mutex_unlock(&mu_));
-        return;
-      }
-      if (num > total_threads_limit_ ||
-          (num < total_threads_limit_ && allow_reduce)) {
-        total_threads_limit_ = std::max(1, num);
-        WakeUpAllThreads();
-        StartBGThreads();
-      }
-      PthreadCall("unlock", pthread_mutex_unlock(&mu_));
-    }
-
-    void IncBackgroundThreadsIfNeeded(int num) {
-      SetBackgroundThreadsInternal(num, false);
-    }
-
-    void SetBackgroundThreads(int num) {
-      SetBackgroundThreadsInternal(num, true);
-    }
-
-    void StartBGThreads() {
-      // Start background thread if necessary
-      while ((int)bgthreads_.size() < total_threads_limit_) {
-        pthread_t t;
-        PthreadCall(
-            "create thread",
-            pthread_create(&t, nullptr, &ThreadPool::BGThreadWrapper,
-                           new BGThreadMetadata(this, bgthreads_.size())));
-
-        // Set the thread name to aid debugging
-#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
-#if __GLIBC_PREREQ(2, 12)
-        char name_buf[16];
-        snprintf(name_buf, sizeof name_buf, "rocksdb:bg%" ROCKSDB_PRIszt,
-                 bgthreads_.size());
-        name_buf[sizeof name_buf - 1] = '\0';
-        pthread_setname_np(t, name_buf);
-#endif
-#endif
-
-        bgthreads_.push_back(t);
-      }
-    }
-
-    void Schedule(void (*function)(void* arg1), void* arg, void* tag) {
-      PthreadCall("lock", pthread_mutex_lock(&mu_));
-
-      if (exit_all_threads_) {
-        PthreadCall("unlock", pthread_mutex_unlock(&mu_));
-        return;
-      }
-
-      StartBGThreads();
-
-      // Add to priority queue
-      queue_.push_back(BGItem());
-      queue_.back().function = function;
-      queue_.back().arg = arg;
-      queue_.back().tag = tag;
-      queue_len_.store(static_cast<unsigned int>(queue_.size()),
-                       std::memory_order_relaxed);
-
-      if (!HasExcessiveThread()) {
-        // Wake up at least one waiting thread.
-        PthreadCall("signal", pthread_cond_signal(&bgsignal_));
-      } else {
-        // Need to wake up all threads to make sure the one woken
-        // up is not the one to terminate.
-        WakeUpAllThreads();
-      }
-
-      PthreadCall("unlock", pthread_mutex_unlock(&mu_));
-    }
-
-    int UnSchedule(void* arg) {
-      int count = 0;
-      PthreadCall("lock", pthread_mutex_lock(&mu_));
-
-      // Remove from priority queue
-      BGQueue::iterator it = queue_.begin();
-      while (it != queue_.end()) {
-        if (arg == (*it).tag) {
-          it = queue_.erase(it);
-          count++;
-        } else {
-          it++;
-        }
-      }
-      queue_len_.store(static_cast<unsigned int>(queue_.size()),
-                       std::memory_order_relaxed);
-      PthreadCall("unlock", pthread_mutex_unlock(&mu_));
-      return count;
-    }
-
-    unsigned int GetQueueLen() const {
-      return queue_len_.load(std::memory_order_relaxed);
-    }
-
-   private:
-    // Entry per Schedule() call
-    struct BGItem {
-      void* arg;
-      void (*function)(void*);
-      void* tag;
-    };
-    typedef std::deque<BGItem> BGQueue;
-
-    pthread_mutex_t mu_;
-    pthread_cond_t bgsignal_;
-    int total_threads_limit_;
-    std::vector<pthread_t> bgthreads_;
-    BGQueue queue_;
-    std::atomic_uint queue_len_;  // Queue length. Used for stats reporting
-    bool exit_all_threads_;
-    bool low_io_priority_;
-    Env::Priority priority_;
-    Env* env_;
-  };
-
   std::vector<ThreadPool> thread_pools_;
-
   pthread_mutex_t mu_;
   std::vector<pthread_t> threads_to_join_;
-
 };
 
 PosixEnv::PosixEnv()
@@ -975,7 +677,7 @@ PosixEnv::PosixEnv()
       forceMmapOff(false),
       page_size_(getpagesize()),
       thread_pools_(Priority::TOTAL) {
-  PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
+  ThreadPool::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
   for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
     thread_pools_[pool_id].SetThreadPriority(
         static_cast<Env::Priority>(pool_id));
@@ -1017,11 +719,11 @@ void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
   StartThreadState* state = new StartThreadState;
   state->user_function = function;
   state->arg = arg;
-  PthreadCall("start thread",
-              pthread_create(&t, nullptr, &StartThreadWrapper, state));
-  PthreadCall("lock", pthread_mutex_lock(&mu_));
+  ThreadPool::PthreadCall(
+      "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
+  ThreadPool::PthreadCall("lock", pthread_mutex_lock(&mu_));
   threads_to_join_.push_back(t);
-  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+  ThreadPool::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
 }
 
 void PosixEnv::WaitForJoin() {
diff --git a/util/thread_posix.cc b/util/thread_posix.cc
new file mode 100644
index 000000000..c8c07e2a2
--- /dev/null
+++ b/util/thread_posix.cc
@@ -0,0 +1,250 @@
+// Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include <atomic>
+#include "util/thread_posix.h"
+#include <unistd.h>
+#ifdef OS_LINUX
+#include <sys/syscall.h>
+#endif
+
+namespace rocksdb {
+
+void ThreadPool::PthreadCall(const char* label, int result) {
+  if (result != 0) {
+    fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
+    abort();
+  }
+}
+
+ThreadPool::ThreadPool()
+    : total_threads_limit_(1),
+      bgthreads_(0),
+      queue_(),
+      queue_len_(0),
+      exit_all_threads_(false),
+      low_io_priority_(false),
+      env_(nullptr) {
+  PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
+  PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
+}
+
+ThreadPool::~ThreadPool() { assert(bgthreads_.size() == 0U); }
+
+void ThreadPool::JoinAllThreads() {
+  PthreadCall("lock", pthread_mutex_lock(&mu_));
+  assert(!exit_all_threads_);
+  exit_all_threads_ = true;
+  PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
+  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+  for (const auto tid : bgthreads_) {
+    pthread_join(tid, nullptr);
+  }
+  bgthreads_.clear();
+}
+
+void ThreadPool::LowerIOPriority() {
+#ifdef OS_LINUX
+  PthreadCall("lock", pthread_mutex_lock(&mu_));
+  low_io_priority_ = true;
+  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+#endif
+}
+
+void ThreadPool::BGThread(size_t thread_id) {
+  bool low_io_priority = false;
+  while (true) {
+    // Wait until there is an item that is ready to run
+    PthreadCall("lock", pthread_mutex_lock(&mu_));
+    // Stop waiting if the thread needs to do work or needs to terminate.
+    while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
+           (queue_.empty() || IsExcessiveThread(thread_id))) {
+      PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
+    }
+    if (exit_all_threads_) {  // mechanism to let BG threads exit safely
+      PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+      break;
+    }
+    if (IsLastExcessiveThread(thread_id)) {
+      // Current thread is the last generated one and is excessive.
+      // We always terminate excessive thread in the reverse order of
+      // generation time.
+      auto terminating_thread = bgthreads_.back();
+      pthread_detach(terminating_thread);
+      bgthreads_.pop_back();
+      if (HasExcessiveThread()) {
+        // There is still at least more excessive thread to terminate.
+        WakeUpAllThreads();
+      }
+      PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+      break;
+    }
+    void (*function)(void*) = queue_.front().function;
+    void* arg = queue_.front().arg;
+    queue_.pop_front();
+    queue_len_.store(static_cast<unsigned int>(queue_.size()),
+                     std::memory_order_relaxed);
+
+    bool decrease_io_priority = (low_io_priority != low_io_priority_);
+    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+
+#ifdef OS_LINUX
+    if (decrease_io_priority) {
+#define IOPRIO_CLASS_SHIFT (13)
+#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
+      // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
+      // These system calls only have an effect when used in conjunction
+      // with an I/O scheduler that supports I/O priorities. As at
+      // kernel 2.6.17 the only such scheduler is the Completely
+      // Fair Queuing (CFQ) I/O scheduler.
+      // To change scheduler:
+      //  echo cfq > /sys/block/<device_name>/queue/schedule
+      // Tunables to consider:
+      //  /sys/block/<device_name>/queue/slice_idle
+      //  /sys/block/<device_name>/queue/slice_sync
+      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
+              0,                  // current thread
+              IOPRIO_PRIO_VALUE(3, 0));
+      low_io_priority = true;
+    }
+#else
+    (void)decrease_io_priority;  // avoid 'unused variable' error
+#endif
+    (*function)(arg);
+  }
+}
+
+// Helper struct for passing arguments when creating threads.
+struct BGThreadMetadata {
+  ThreadPool* thread_pool_;
+  size_t thread_id_;  // Thread count in the thread.
+  explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
+      : thread_pool_(thread_pool), thread_id_(thread_id) {}
+};
+
+static void* BGThreadWrapper(void* arg) {
+  BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
+  size_t thread_id = meta->thread_id_;
+  ThreadPool* tp = meta->thread_pool_;
+#if ROCKSDB_USING_THREAD_STATUS
+  // for thread-status
+  ThreadStatusUtil::RegisterThread(
+      tp->GetHostEnv(), (tp->GetThreadPriority() == Env::Priority::HIGH
+                             ? ThreadStatus::HIGH_PRIORITY
+                             : ThreadStatus::LOW_PRIORITY));
+#endif
+  delete meta;
+  tp->BGThread(thread_id);
+#if ROCKSDB_USING_THREAD_STATUS
+  ThreadStatusUtil::UnregisterThread();
+#endif
+  return nullptr;
+}
+
+void ThreadPool::WakeUpAllThreads() {
+  PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
+}
+
+void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) {
+  PthreadCall("lock", pthread_mutex_lock(&mu_));
+  if (exit_all_threads_) {
+    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+    return;
+  }
+  if (num > total_threads_limit_ ||
+      (num < total_threads_limit_ && allow_reduce)) {
+    total_threads_limit_ = std::max(1, num);
+    WakeUpAllThreads();
+    StartBGThreads();
+  }
+  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+}
+
+void ThreadPool::IncBackgroundThreadsIfNeeded(int num) {
+  SetBackgroundThreadsInternal(num, false);
+}
+
+void ThreadPool::SetBackgroundThreads(int num) {
+  SetBackgroundThreadsInternal(num, true);
+}
+
+void ThreadPool::StartBGThreads() {
+  // Start background thread if necessary
+  while ((int)bgthreads_.size() < total_threads_limit_) {
+    pthread_t t;
+    PthreadCall("create thread",
+                pthread_create(&t, nullptr, &BGThreadWrapper,
+                               new BGThreadMetadata(this, bgthreads_.size())));
+
+// Set the thread name to aid debugging
+#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
+#if __GLIBC_PREREQ(2, 12)
+    char name_buf[16];
+    snprintf(name_buf, sizeof name_buf, "rocksdb:bg%" ROCKSDB_PRIszt,
+             bgthreads_.size());
+    name_buf[sizeof name_buf - 1] = '\0';
+    pthread_setname_np(t, name_buf);
+#endif
+#endif
+
+    bgthreads_.push_back(t);
+  }
+}
+
+void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag) {
+  PthreadCall("lock", pthread_mutex_lock(&mu_));
+
+  if (exit_all_threads_) {
+    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+    return;
+  }
+
+  StartBGThreads();
+
+  // Add to priority queue
+  queue_.push_back(BGItem());
+  queue_.back().function = function;
+  queue_.back().arg = arg;
+  queue_.back().tag = tag;
+  queue_len_.store(static_cast<unsigned int>(queue_.size()),
+                   std::memory_order_relaxed);
+
+  if (!HasExcessiveThread()) {
+    // Wake up at least one waiting thread.
+    PthreadCall("signal", pthread_cond_signal(&bgsignal_));
+  } else {
+    // Need to wake up all threads to make sure the one woken
+    // up is not the one to terminate.
+    WakeUpAllThreads();
+  }
+
+  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+}
+
+int ThreadPool::UnSchedule(void* arg) {
+  int count = 0;
+  PthreadCall("lock", pthread_mutex_lock(&mu_));
+
+  // Remove from priority queue
+  BGQueue::iterator it = queue_.begin();
+  while (it != queue_.end()) {
+    if (arg == (*it).tag) {
+      it = queue_.erase(it);
+      count++;
+    } else {
+      it++;
+    }
+  }
+  queue_len_.store(static_cast<unsigned int>(queue_.size()),
+                   std::memory_order_relaxed);
+  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+  return count;
+}
+
+}  // namespace rocksdb
diff --git a/util/thread_posix.h b/util/thread_posix.h
new file mode 100644
index 000000000..28db0d7e6
--- /dev/null
+++ b/util/thread_posix.h
@@ -0,0 +1,86 @@
+// Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#pragma once
+#include "rocksdb/env.h"
+#include "util/thread_status_util.h"
+
+namespace rocksdb {
+
+class ThreadPool {
+ public:
+  ThreadPool();
+  ~ThreadPool();
+
+  void JoinAllThreads();
+  void LowerIOPriority();
+  void BGThread(size_t thread_id);
+  void WakeUpAllThreads();
+  void IncBackgroundThreadsIfNeeded(int num);
+  void SetBackgroundThreads(int num);
+  void StartBGThreads();
+  void Schedule(void (*function)(void* arg1), void* arg, void* tag);
+  int UnSchedule(void* arg);
+
+  unsigned int GetQueueLen() const {
+    return queue_len_.load(std::memory_order_relaxed);
+  }
+
+  void SetHostEnv(Env* env) { env_ = env; }
+  Env* GetHostEnv() { return env_; }
+
+  // Return true if there is at least one thread needs to terminate.
+  bool HasExcessiveThread() {
+    return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
+  }
+
+  // Return true iff the current thread is the excessive thread to terminate.
+  // Always terminate the running thread that is added last, even if there are
+  // more than one thread to terminate.
+  bool IsLastExcessiveThread(size_t thread_id) {
+    return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
+  }
+
+  // Is one of the threads to terminate.
+  bool IsExcessiveThread(size_t thread_id) {
+    return static_cast<int>(thread_id) >= total_threads_limit_;
+  }
+
+  // Return the thread priority.
+  // This would allow its member-thread to know its priority.
+  Env::Priority GetThreadPriority() { return priority_; }
+
+  // Set the thread priority.
+  void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
+
+  static void PthreadCall(const char* label, int result);
+
+ private:
+  // Entry per Schedule() call
+  struct BGItem {
+    void* arg;
+    void (*function)(void*);
+    void* tag;
+  };
+  typedef std::deque<BGItem> BGQueue;
+
+  pthread_mutex_t mu_;
+  pthread_cond_t bgsignal_;
+  int total_threads_limit_;
+  std::vector<pthread_t> bgthreads_;
+  BGQueue queue_;
+  std::atomic_uint queue_len_;  // Queue length. Used for stats reporting
+  bool exit_all_threads_;
+  bool low_io_priority_;
+  Env::Priority priority_;
+  Env* env_;
+
+  void SetBackgroundThreadsInternal(int num, bool allow_reduce);
+};
+
+}  // namespace rocksdb
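Usage note (illustrative sketch, not part of the patch): once the thread logic lives in util/thread_posix.h, an environment other than PosixEnv can reuse the pool directly, which is the motivation stated in the summary. The snippet below only relies on the ThreadPool API introduced above; the MyWork struct and RunJob callback are hypothetical names invented for the example.

    #include "rocksdb/env.h"
    #include "util/thread_posix.h"

    namespace {

    // Hypothetical payload passed through the void* argument.
    struct MyWork {
      int job_id;
    };

    // Callback executed on one of the pool's background threads.
    void RunJob(void* arg) {
      MyWork* work = static_cast<MyWork*>(arg);
      // ... perform background work for work->job_id ...
      delete work;
    }

    }  // namespace

    int main() {
      rocksdb::ThreadPool pool;
      pool.SetThreadPriority(rocksdb::Env::Priority::LOW);
      pool.SetBackgroundThreads(2);  // grow the pool to two worker threads
      pool.Schedule(&RunJob, new MyWork{1}, /*tag=*/nullptr);
      pool.Schedule(&RunJob, new MyWork{2}, /*tag=*/nullptr);
      // UnSchedule(tag) would drop any still-queued items with a matching tag.
      // JoinAllThreads() signals the workers to exit and joins them; it must
      // run before the pool is destroyed, since the destructor asserts that
      // no background threads remain.
      pool.JoinAllThreads();
      return 0;
    }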