From aab91b8d8fec4c0e03595cbd3ad464eab8da39a3 Mon Sep 17 00:00:00 2001 From: Dmitri Smirnov Date: Thu, 12 May 2016 18:34:04 -0700 Subject: [PATCH] Use generic threadpool for Windows environment (#1120) Conditionally retrofit thread_posix for use with std::thread and reuse the same logic. Posix users continue using Posix interfaces. Enable XPRESS compression in test runs. Fix master introduced signed/unsigned mismatch. --- CMakeLists.txt | 1 + appveyor.yml | 4 +- port/win/env_win.cc | 252 +----------------------- src.mk | 2 +- util/env_posix.cc | 2 +- util/{thread_posix.cc => threadpool.cc} | 179 ++++++++++++++--- util/{thread_posix.h => threadpool.h} | 34 +++- 7 files changed, 182 insertions(+), 292 deletions(-) rename util/{thread_posix.cc => threadpool.cc} (69%) rename util/{thread_posix.h => threadpool.h} (81%) diff --git a/CMakeLists.txt b/CMakeLists.txt index bc34fe087..7face2c3d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -237,6 +237,7 @@ set(SOURCES util/testharness.cc util/testutil.cc util/thread_local.cc + util/threadpool.cc util/thread_status_impl.cc util/thread_status_updater.cc util/thread_status_util.cc diff --git a/appveyor.yml b/appveyor.yml index 4ee3ab417..b044b8a03 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -2,7 +2,7 @@ version: 1.0.{build} before_build: - md %APPVEYOR_BUILD_FOLDER%\build - cd %APPVEYOR_BUILD_FOLDER%\build -- cmake -G "Visual Studio 12 Win64" -DOPTDBG=1 .. +- cmake -G "Visual Studio 12 Win64" -DOPTDBG=1 -DXPRESS=1 .. - cd .. build: project: build\rocksdb.sln @@ -10,6 +10,6 @@ build: verbosity: minimal test: test_script: -- ps: build_tools\run_ci_db_test.ps1 -EnableRerun -Run db_test -Exclude DBTest.Randomized,DBTest.FileCreationRandomFailure -Concurrency 18 +- ps: build_tools\run_ci_db_test.ps1 -EnableRerun -Run db_test -Concurrency 10 - ps: build_tools\run_ci_db_test.ps1 -Run env_test -Concurrency 1 diff --git a/port/win/env_win.cc b/port/win/env_win.cc index fb323e22c..96f76b26e 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -32,6 +32,7 @@ #include "util/sync_point.h" #include "util/aligned_buffer.h" +#include "util/threadpool.h" #include "util/thread_status_updater.h" #include "util/thread_status_util.h" @@ -1834,257 +1835,6 @@ class WinEnv : public Env { bool SupportsFastAllocate(const std::string& /* path */) { return false; } - class ThreadPool { - public: - ThreadPool() - : total_threads_limit_(1), - bgthreads_(0), - queue_(), - queue_len_(0U), - exit_all_threads_(false), - low_io_priority_(false), - env_(nullptr) {} - - ~ThreadPool() { assert(bgthreads_.size() == 0U); } - - void JoinAllThreads() { - { - std::lock_guard lock(mu_); - assert(!exit_all_threads_); - exit_all_threads_ = true; - bgsignal_.notify_all(); - } - - for (std::thread& th : bgthreads_) { - th.join(); - } - - // Subject to assert in the __dtor - bgthreads_.clear(); - } - - void SetHostEnv(Env* env) { env_ = env; } - - // Return true if there is at least one thread needs to terminate. - bool HasExcessiveThread() const { - return bgthreads_.size() > total_threads_limit_; - } - - // Return true iff the current thread is the excessive thread to terminate. - // Always terminate the running thread that is added last, even if there are - // more than one thread to terminate. - bool IsLastExcessiveThread(size_t thread_id) const { - return HasExcessiveThread() && thread_id == bgthreads_.size() - 1; - } - - // Is one of the threads to terminate. - bool IsExcessiveThread(size_t thread_id) const { - return thread_id >= total_threads_limit_; - } - - // Return the thread priority. - // This would allow its member-thread to know its priority. - Env::Priority GetThreadPriority() { return priority_; } - - // Set the thread priority. - void SetThreadPriority(Env::Priority priority) { priority_ = priority; } - - void BGThread(size_t thread_id) { - while (true) { - // Wait until there is an item that is ready to run - std::unique_lock uniqueLock(mu_); - - // Stop waiting if the thread needs to do work or needs to terminate. - while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && - (queue_.empty() || IsExcessiveThread(thread_id))) { - bgsignal_.wait(uniqueLock); - } - - if (exit_all_threads_) { - // mechanism to let BG threads exit safely - uniqueLock.unlock(); - break; - } - - if (IsLastExcessiveThread(thread_id)) { - // Current thread is the last generated one and is excessive. - // We always terminate excessive thread in the reverse order of - // generation time. - std::thread& terminating_thread = bgthreads_.back(); - auto tid = terminating_thread.get_id(); - // Ensure that this thread is ours - assert(tid == std::this_thread::get_id()); - terminating_thread.detach(); - bgthreads_.pop_back(); - - if (HasExcessiveThread()) { - // There is still at least more excessive thread to terminate. - WakeUpAllThreads(); - } - - uniqueLock.unlock(); - - PrintThreadInfo(thread_id, gettid()); - break; - } - - void (*function)(void*) = queue_.front().function; - void* arg = queue_.front().arg; - queue_.pop_front(); - queue_len_.store(queue_.size(), std::memory_order_relaxed); - - uniqueLock.unlock(); - (*function)(arg); - } - } - - // Helper struct for passing arguments when creating threads. - struct BGThreadMetadata { - ThreadPool* thread_pool_; - size_t thread_id_; // Thread count in the thread. - - BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) - : thread_pool_(thread_pool), thread_id_(thread_id) {} - }; - - static void* BGThreadWrapper(void* arg) { - std::unique_ptr meta( - reinterpret_cast(arg)); - - size_t thread_id = meta->thread_id_; - ThreadPool* tp = meta->thread_pool_; - -#if ROCKSDB_USING_THREAD_STATUS - // for thread-status - ThreadStatusUtil::RegisterThread( - tp->env_, (tp->GetThreadPriority() == Env::Priority::HIGH - ? ThreadStatus::HIGH_PRIORITY - : ThreadStatus::LOW_PRIORITY)); -#endif - tp->BGThread(thread_id); -#if ROCKSDB_USING_THREAD_STATUS - ThreadStatusUtil::UnregisterThread(); -#endif - return nullptr; - } - - void WakeUpAllThreads() { bgsignal_.notify_all(); } - - void SetBackgroundThreadsInternal(size_t num, bool allow_reduce) { - std::lock_guard lg(mu_); - - if (exit_all_threads_) { - return; - } - - if (num > total_threads_limit_ || - (num < total_threads_limit_ && allow_reduce)) { - total_threads_limit_ = std::max(size_t(1), num); - WakeUpAllThreads(); - StartBGThreads(); - } - assert(total_threads_limit_ > 0); - } - - void IncBackgroundThreadsIfNeeded(int num) { - SetBackgroundThreadsInternal(num, false); - } - - void SetBackgroundThreads(int num) { - SetBackgroundThreadsInternal(num, true); - } - - void StartBGThreads() { - // Start background thread if necessary - while (bgthreads_.size() < total_threads_limit_) { - std::thread p_t(&ThreadPool::BGThreadWrapper, - new BGThreadMetadata(this, bgthreads_.size())); - bgthreads_.push_back(std::move(p_t)); - } - } - - void Schedule(void (*function)(void* arg1), void* arg, void* tag, - void (*unschedFunction)(void* arg)) { - std::lock_guard lg(mu_); - - if (exit_all_threads_) { - return; - } - - StartBGThreads(); - - // Add to priority queue - queue_.push_back(BGItem()); - queue_.back().function = function; - queue_.back().arg = arg; - queue_.back().tag = tag; - queue_.back().unschedFunction = unschedFunction; - queue_len_.store(queue_.size(), std::memory_order_relaxed); - - if (!HasExcessiveThread()) { - // Wake up at least one waiting thread. - bgsignal_.notify_one(); - } else { - // Need to wake up all threads to make sure the one woken - // up is not the one to terminate. - WakeUpAllThreads(); - } - } - - int UnSchedule(void* arg) { - int count = 0; - - std::lock_guard lg(mu_); - - // Remove from priority queue - BGQueue::iterator it = queue_.begin(); - while (it != queue_.end()) { - if (arg == (*it).tag) { - void (*unschedFunction)(void*) = (*it).unschedFunction; - void* arg1 = (*it).arg; - if (unschedFunction != nullptr) { - (*unschedFunction)(arg1); - } - it = queue_.erase(it); - count++; - } else { - ++it; - } - } - - queue_len_.store(queue_.size(), std::memory_order_relaxed); - - return count; - } - - unsigned int GetQueueLen() const { - return static_cast( - queue_len_.load(std::memory_order_relaxed)); - } - - private: - // Entry per Schedule() call - struct BGItem { - void* arg; - void (*function)(void*); - void* tag; - void (*unschedFunction)(void*); - }; - - typedef std::deque BGQueue; - - std::mutex mu_; - std::condition_variable bgsignal_; - size_t total_threads_limit_; - std::vector bgthreads_; - BGQueue queue_; - std::atomic_size_t queue_len_; // Queue length. Used for stats reporting - bool exit_all_threads_; - bool low_io_priority_; - Env::Priority priority_; - Env* env_; - }; - bool checkedDiskForMmap_; bool forceMmapOff; // do we override Env options? size_t page_size_; diff --git a/src.mk b/src.mk index 47bfe02a8..db6551241 100644 --- a/src.mk +++ b/src.mk @@ -101,7 +101,7 @@ LIB_SOURCES = \ util/env_hdfs.cc \ util/env_posix.cc \ util/io_posix.cc \ - util/thread_posix.cc \ + util/threadpool.cc \ util/transaction_test_util.cc \ util/sst_file_manager_impl.cc \ util/file_util.cc \ diff --git a/util/env_posix.cc b/util/env_posix.cc index 06de7a486..ff4ab9f21 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -42,7 +42,7 @@ #include "rocksdb/slice.h" #include "util/coding.h" #include "util/io_posix.h" -#include "util/thread_posix.h" +#include "util/threadpool.h" #include "util/iostats_context_imp.h" #include "util/logging.h" #include "util/posix_logger.h" diff --git a/util/thread_posix.cc b/util/threadpool.cc similarity index 69% rename from util/thread_posix.cc rename to util/threadpool.cc index f09abd54c..6279f1955 100644 --- a/util/thread_posix.cc +++ b/util/threadpool.cc @@ -7,11 +7,16 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "util/thread_posix.h" +#include "util/threadpool.h" #include -#include +#include + +#ifndef OS_WIN +# include +#endif + #ifdef OS_LINUX -#include +# include #endif namespace rocksdb { @@ -23,29 +28,128 @@ void ThreadPool::PthreadCall(const char* label, int result) { } } +namespace { +#ifdef ROCKSDB_STD_THREADPOOL + +struct Lock { + std::unique_lock ul_; + Lock(std::mutex& m) : ul_(m, std::defer_lock) {} +}; + +using Condition = std::condition_variable; + +inline +int MutexLock(Lock& mutex) { + mutex.ul_.lock(); + return 0; +} + +inline +int ConditionWait(Condition& condition, Lock& lock) { + condition.wait(lock.ul_); + return 0; +} + +inline +int ConditionSignalAll(Condition& condition) { + condition.notify_all(); + return 0; +} + +inline +int ConditionSignal(Condition& condition) { + condition.notify_one(); + return 0; +} + +inline +int MutexUnlock(Lock& mutex) { + mutex.ul_.unlock(); + return 0; +} + +inline +void ThreadJoin(std::thread& thread) { + thread.join(); +} + +inline +int ThreadDetach(std::thread& thread) { + thread.detach(); + return 0; +} + +#else + +using Lock = pthread_mutex_t&; +using Condition = pthread_cond_t&; + +inline +int MutexLock(Lock mutex) { + return pthread_mutex_lock(&mutex); +} + +inline +int ConditionWait(Condition condition, Lock lock) { + return pthread_cond_wait(&condition, &lock); +} + +inline +int ConditionSignalAll(Condition condition) { + return pthread_cond_broadcast(&condition); +} + +inline +int ConditionSignal(Condition condition) { + return pthread_cond_signal(&condition); +} + +inline +int MutexUnlock(Lock mutex) { + return pthread_mutex_unlock(&mutex); +} + +inline +void ThreadJoin(pthread_t& thread) { + pthread_join(thread, nullptr); +} + +inline +int ThreadDetach(pthread_t& thread) { + return pthread_detach(thread); +} +#endif +} + ThreadPool::ThreadPool() : total_threads_limit_(1), bgthreads_(0), queue_(), - queue_len_(0), + queue_len_(), exit_all_threads_(false), low_io_priority_(false), env_(nullptr) { +#ifndef ROCKSDB_STD_THREADPOOL PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); +#endif } ThreadPool::~ThreadPool() { assert(bgthreads_.size() == 0U); } void ThreadPool::JoinAllThreads() { - PthreadCall("lock", pthread_mutex_lock(&mu_)); + + Lock lock(mu_); + PthreadCall("lock", MutexLock(lock)); assert(!exit_all_threads_); exit_all_threads_ = true; - PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); - for (const auto tid : bgthreads_) { - pthread_join(tid, nullptr); + PthreadCall("signalall", ConditionSignalAll(bgsignal_)); + PthreadCall("unlock", MutexUnlock(lock)); + + for (auto& th : bgthreads_) { + ThreadJoin(th); } + bgthreads_.clear(); } @@ -60,31 +164,35 @@ void ThreadPool::LowerIOPriority() { void ThreadPool::BGThread(size_t thread_id) { bool low_io_priority = false; while (true) { - // Wait until there is an item that is ready to run - PthreadCall("lock", pthread_mutex_lock(&mu_)); +// Wait until there is an item that is ready to run + Lock uniqueLock(mu_); + PthreadCall("lock", MutexLock(uniqueLock)); // Stop waiting if the thread needs to do work or needs to terminate. while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && (queue_.empty() || IsExcessiveThread(thread_id))) { - PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); + PthreadCall("wait", ConditionWait(bgsignal_, uniqueLock)); } + if (exit_all_threads_) { // mechanism to let BG threads exit safely - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + PthreadCall("unlock", MutexUnlock(uniqueLock)); break; } + if (IsLastExcessiveThread(thread_id)) { // Current thread is the last generated one and is excessive. // We always terminate excessive thread in the reverse order of // generation time. - auto terminating_thread = bgthreads_.back(); - pthread_detach(terminating_thread); + auto& terminating_thread = bgthreads_.back(); + PthreadCall("detach", ThreadDetach(terminating_thread)); bgthreads_.pop_back(); if (HasExcessiveThread()) { // There is still at least more excessive thread to terminate. WakeUpAllThreads(); } - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + PthreadCall("unlock", MutexUnlock(uniqueLock)); break; } + void (*function)(void*) = queue_.front().function; void* arg = queue_.front().arg; queue_.pop_front(); @@ -92,7 +200,7 @@ void ThreadPool::BGThread(size_t thread_id) { std::memory_order_relaxed); bool decrease_io_priority = (low_io_priority != low_io_priority_); - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + PthreadCall("unlock", MutexUnlock(uniqueLock)); #ifdef OS_LINUX if (decrease_io_priority) { @@ -124,7 +232,7 @@ void ThreadPool::BGThread(size_t thread_id) { struct BGThreadMetadata { ThreadPool* thread_pool_; size_t thread_id_; // Thread count in the thread. - explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) + BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) : thread_pool_(thread_pool), thread_id_(thread_id) {} }; @@ -148,13 +256,14 @@ static void* BGThreadWrapper(void* arg) { } void ThreadPool::WakeUpAllThreads() { - PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); + PthreadCall("signalall", ConditionSignalAll(bgsignal_)); } void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) { - PthreadCall("lock", pthread_mutex_lock(&mu_)); + Lock lock(mu_); + PthreadCall("lock", MutexLock(lock)); if (exit_all_threads_) { - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + PthreadCall("unlock", MutexUnlock(lock)); return; } if (num > total_threads_limit_ || @@ -163,7 +272,7 @@ void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) { WakeUpAllThreads(); StartBGThreads(); } - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + PthreadCall("unlock", MutexUnlock(lock)); } void ThreadPool::IncBackgroundThreadsIfNeeded(int num) { @@ -177,11 +286,15 @@ void ThreadPool::SetBackgroundThreads(int num) { void ThreadPool::StartBGThreads() { // Start background thread if necessary while ((int)bgthreads_.size() < total_threads_limit_) { +#ifdef ROCKSDB_STD_THREADPOOL + std::thread p_t(&BGThreadWrapper, + new BGThreadMetadata(this, bgthreads_.size())); + bgthreads_.push_back(std::move(p_t)); +#else pthread_t t; PthreadCall("create thread", pthread_create(&t, nullptr, &BGThreadWrapper, new BGThreadMetadata(this, bgthreads_.size()))); - // Set the thread name to aid debugging #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) #if __GLIBC_PREREQ(2, 12) @@ -192,17 +305,19 @@ void ThreadPool::StartBGThreads() { pthread_setname_np(t, name_buf); #endif #endif - bgthreads_.push_back(t); +#endif } } void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag, void (*unschedFunction)(void* arg)) { - PthreadCall("lock", pthread_mutex_lock(&mu_)); + + Lock lock(mu_); + PthreadCall("lock", MutexLock(lock)); if (exit_all_threads_) { - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + PthreadCall("unlock", MutexUnlock(lock)); return; } @@ -219,19 +334,21 @@ void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag, if (!HasExcessiveThread()) { // Wake up at least one waiting thread. - PthreadCall("signal", pthread_cond_signal(&bgsignal_)); + PthreadCall("signal", ConditionSignal(bgsignal_)); } else { // Need to wake up all threads to make sure the one woken // up is not the one to terminate. WakeUpAllThreads(); } - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + PthreadCall("unlock", MutexUnlock(lock)); } int ThreadPool::UnSchedule(void* arg) { int count = 0; - PthreadCall("lock", pthread_mutex_lock(&mu_)); + + Lock lock(mu_); + PthreadCall("lock", MutexLock(lock)); // Remove from priority queue BGQueue::iterator it = queue_.begin(); @@ -245,12 +362,12 @@ int ThreadPool::UnSchedule(void* arg) { it = queue_.erase(it); count++; } else { - it++; + ++it; } } queue_len_.store(static_cast(queue_.size()), std::memory_order_relaxed); - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + PthreadCall("unlock", MutexUnlock(lock)); return count; } diff --git a/util/thread_posix.h b/util/threadpool.h similarity index 81% rename from util/thread_posix.h rename to util/threadpool.h index 96dfe1e1e..bc6b4c69e 100644 --- a/util/thread_posix.h +++ b/util/threadpool.h @@ -7,9 +7,23 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once + +#ifdef OS_WIN +# define ROCKSDB_STD_THREADPOOL +#endif + #include "rocksdb/env.h" #include "util/thread_status_util.h" +#ifdef ROCKSDB_STD_THREADPOOL +# include +# include +# include +#endif + +#include +#include + namespace rocksdb { class ThreadPool { @@ -33,28 +47,28 @@ class ThreadPool { } void SetHostEnv(Env* env) { env_ = env; } - Env* GetHostEnv() { return env_; } + Env* GetHostEnv() const { return env_; } // Return true if there is at least one thread needs to terminate. - bool HasExcessiveThread() { + bool HasExcessiveThread() const { return static_cast(bgthreads_.size()) > total_threads_limit_; } // Return true iff the current thread is the excessive thread to terminate. // Always terminate the running thread that is added last, even if there are // more than one thread to terminate. - bool IsLastExcessiveThread(size_t thread_id) { + bool IsLastExcessiveThread(size_t thread_id) const { return HasExcessiveThread() && thread_id == bgthreads_.size() - 1; } // Is one of the threads to terminate. - bool IsExcessiveThread(size_t thread_id) { + bool IsExcessiveThread(size_t thread_id) const { return static_cast(thread_id) >= total_threads_limit_; } // Return the thread priority. // This would allow its member-thread to know its priority. - Env::Priority GetThreadPriority() { return priority_; } + Env::Priority GetThreadPriority() const { return priority_; } // Set the thread priority. void SetThreadPriority(Env::Priority priority) { priority_ = priority; } @@ -69,12 +83,20 @@ class ThreadPool { void* tag; void (*unschedFunction)(void*); }; + typedef std::deque BGQueue; + int total_threads_limit_; + +#ifdef ROCKSDB_STD_THREADPOOL + std::mutex mu_; + std::condition_variable bgsignal_; + std::vector bgthreads_; +#else pthread_mutex_t mu_; pthread_cond_t bgsignal_; - int total_threads_limit_; std::vector bgthreads_; +#endif BGQueue queue_; std::atomic_uint queue_len_; // Queue length. Used for stats reporting bool exit_all_threads_;