From 45bab305f98d2233b66546f6de78d7a1dad7bc44 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Mon, 22 Dec 2014 12:20:17 -0800 Subject: [PATCH] Move GetThreadList() feature under Env. Summary: GetThreadList() feature depends on the thread creation and destruction, which is currently handled under Env. This patch moves GetThreadList() feature under Env to better manage the dependency of GetThreadList() feature on thread creation and destruction. Renamed ThreadStatusImpl to ThreadStatusUpdater. Add ThreadStatusUtil, which is a static class contains utility functions for ThreadStatusUpdater. Test Plan: run db_test, thread_list_test and db_bench and verify the life cycle of Env and ThreadStatusUpdater is properly managed. Reviewers: igor, sdong Reviewed By: sdong Subscribers: ljin, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D30057 --- db/db_impl.cc | 14 ++- db/db_impl_readonly.cc | 1 - db/db_test.cc | 18 ++-- include/rocksdb/db.h | 6 -- include/rocksdb/env.h | 36 ++++++- util/env_posix.cc | 45 ++++++-- util/thread_list_test.cc | 14 +-- ...tatus_impl.cc => thread_status_updater.cc} | 57 ++++------ ..._status_impl.h => thread_status_updater.h} | 45 ++++---- ...ebug.cc => thread_status_updater_debug.cc} | 4 +- util/thread_status_util.cc | 102 ++++++++++++++++++ util/thread_status_util.h | 93 ++++++++++++++++ utilities/compacted_db/compacted_db_impl.cc | 1 - 13 files changed, 338 insertions(+), 98 deletions(-) rename util/{thread_status_impl.cc => thread_status_updater.cc} (74%) rename util/{thread_status_impl.h => thread_status_updater.h} (84%) rename util/{thread_status_impl_debug.cc => thread_status_updater_debug.cc} (91%) create mode 100644 util/thread_status_util.cc create mode 100644 util/thread_status_util.h diff --git a/db/db_impl.cc b/db/db_impl.cc index 96778e44a..2bafc8f81 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -78,7 +78,8 @@ #include "util/stop_watch.h" #include "util/sync_point.h" #include "util/string_util.h" -#include "util/thread_status_impl.h" +#include "util/thread_status_updater.h" +#include "util/thread_status_util.h" namespace rocksdb { @@ -3844,30 +3845,27 @@ Status DestroyDB(const std::string& dbname, const Options& options) { } #if ROCKSDB_USING_THREAD_STATUS + void DBImpl::NewThreadStatusCfInfo( ColumnFamilyData* cfd) const { if (db_options_.enable_thread_tracking) { - ThreadStatusImpl::NewColumnFamilyInfo( - this, GetName(), cfd, cfd->GetName()); + ThreadStatusUtil::NewColumnFamilyInfo(this, cfd); } } void DBImpl::EraseThreadStatusCfInfo( ColumnFamilyData* cfd) const { if (db_options_.enable_thread_tracking) { - ThreadStatusImpl::EraseColumnFamilyInfo(cfd); + ThreadStatusUtil::EraseColumnFamilyInfo(cfd); } } void DBImpl::EraseThreadStatusDbInfo() const { if (db_options_.enable_thread_tracking) { - ThreadStatusImpl::EraseDatabaseInfo(this); + ThreadStatusUtil::EraseDatabaseInfo(this); } } -Status GetThreadList(std::vector* thread_list) { - return thread_local_status.GetThreadList(thread_list); -} #else void DBImpl::NewThreadStatusCfInfo( ColumnFamilyData* cfd) const { diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 8b0beb7e0..c1d61e377 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -10,7 +10,6 @@ #include "db/merge_context.h" #include "db/db_iter.h" #include "util/perf_context_imp.h" -#include "util/thread_status_impl.h" namespace rocksdb { diff --git a/db/db_test.cc b/db/db_test.cc index 7feb98808..cb2458954 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -51,7 +51,7 @@ #include "util/testutil.h" #include "util/mock_env.h" #include "util/string_util.h" -#include "util/thread_status_impl.h" +#include "util/thread_status_updater.h" namespace rocksdb { @@ -9418,7 +9418,7 @@ TEST(DBTest, GetThreadList) { TryReopen(options); std::vector thread_list; - Status s = GetThreadList(&thread_list); + Status s = env_->GetThreadList(&thread_list); for (int i = 0; i < 2; ++i) { // repeat the test with differet number of high / low priority threads @@ -9431,7 +9431,7 @@ TEST(DBTest, GetThreadList) { env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW); // Wait to ensure the all threads has been registered env_->SleepForMicroseconds(100000); - s = GetThreadList(&thread_list); + s = env_->GetThreadList(&thread_list); ASSERT_OK(s); unsigned int thread_type_counts[ThreadStatus::ThreadType::TOTAL]; memset(thread_type_counts, 0, sizeof(thread_type_counts)); @@ -9455,15 +9455,18 @@ TEST(DBTest, GetThreadList) { if (i == 0) { // repeat the test with multiple column families CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options); - ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true); + env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap( + handles_, true); } } db_->DropColumnFamily(handles_[2]); delete handles_[2]; handles_.erase(handles_.begin() + 2); - ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true); + env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap( + handles_, true); Close(); - ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true); + env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap( + handles_, true); } TEST(DBTest, DisableThreadList) { @@ -9473,7 +9476,8 @@ TEST(DBTest, DisableThreadList) { TryReopen(options); CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options); // Verify non of the column family info exists - ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, false); + env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap( + handles_, false); } #endif // ROCKSDB_USING_THREAD_STATUS diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 72878ff57..a8cb694b4 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -548,12 +548,6 @@ Status DestroyDB(const std::string& name, const Options& options); Status RepairDB(const std::string& dbname, const Options& options); #endif -#if ROCKSDB_USING_THREAD_STATUS -// Obtain the status of all rocksdb-related threads. -Status GetThreadList(std::vector* thread_list); -#endif - - } // namespace rocksdb #endif // STORAGE_ROCKSDB_INCLUDE_DB_H_ diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index aded546ca..8a96ef1e1 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -24,6 +24,7 @@ #include #include #include "rocksdb/status.h" +#include "rocksdb/thread_status.h" namespace rocksdb { @@ -37,6 +38,7 @@ class RandomRWFile; class Directory; struct DBOptions; class RateLimiter; +class ThreadStatusUpdater; using std::unique_ptr; using std::shared_ptr; @@ -83,7 +85,8 @@ struct EnvOptions { class Env { public: - Env() { } + Env() : thread_status_updater_(nullptr) {} + virtual ~Env(); // Return a default environment suitable for the current operating @@ -302,12 +305,34 @@ class Env { virtual EnvOptions OptimizeForManifestWrite(const EnvOptions& env_options) const; + // Returns the status of all threads that belong to the current Env. + virtual Status GetThreadList(std::vector* thread_list) { + return Status::NotSupported("Not supported."); + } + + // Returns the pointer to ThreadStatusUpdater. This function will be + // used in RocksDB internally to update thread status and supports + // GetThreadList(). + virtual ThreadStatusUpdater* GetThreadStatusUpdater() const { + return thread_status_updater_; + } + + protected: + // The pointer to an internal structure that will update the + // status of each thread. + ThreadStatusUpdater* thread_status_updater_; + private: // No copying allowed Env(const Env&); void operator=(const Env&); }; +// The factory function to construct a ThreadStatusUpdater. Any Env +// that supports GetThreadList() feature should call this function in its +// constructor to initialize thread_status_updater_. +ThreadStatusUpdater* CreateThreadStatusUpdater(); + // A file abstraction for reading sequentially through a file class SequentialFile { public: @@ -805,10 +830,19 @@ class EnvWrapper : public Env { void LowerThreadPoolIOPriority(Priority pool = LOW) override { target_->LowerThreadPoolIOPriority(pool); } + std::string TimeToString(uint64_t time) { return target_->TimeToString(time); } + Status GetThreadList(std::vector* thread_list) { + return target_->GetThreadList(thread_list); + } + + ThreadStatusUpdater* GetThreadStatusUpdater() const override { + return target_->GetThreadStatusUpdater(); + } + private: Env* target_; }; diff --git a/util/env_posix.cc b/util/env_posix.cc index da090ddf5..5bad58466 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -42,7 +42,8 @@ #include "util/random.h" #include "util/iostats_context_imp.h" #include "util/rate_limiter.h" -#include "util/thread_status_impl.h" +#include "util/thread_status_updater.h" +#include "util/thread_status_util.h" // Get nano time for mach systems #ifdef __MACH__ @@ -76,10 +77,6 @@ int rocksdb_kill_odds = 0; namespace rocksdb { -#if ROCKSDB_USING_THREAD_STATUS -extern ThreadStatusImpl thread_local_status; -#endif - namespace { // A wrapper for fadvise, if the platform doesn't support fadvise, @@ -92,6 +89,10 @@ int Fadvise(int fd, off_t offset, size_t len, int advice) { #endif } +ThreadStatusUpdater* CreateThreadStatusUpdater() { + return new ThreadStatusUpdater(); +} + // list of pathnames that are locked static std::set lockedFiles; static port::Mutex mutex_lockedFiles; @@ -1076,10 +1077,16 @@ class PosixEnv : public Env { public: PosixEnv(); - virtual ~PosixEnv(){ + virtual ~PosixEnv() { for (const auto tid : threads_to_join_) { pthread_join(tid, nullptr); } + for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { + thread_pools_[pool_id].JoinAllThreads(); + } + // All threads must be joined before the deletion of + // thread_status_updater_. + delete thread_status_updater_; } void SetFD_CLOEXEC(int fd, const EnvOptions* options) { @@ -1356,6 +1363,12 @@ class PosixEnv : public Env { return Status::OK(); } + virtual Status GetThreadList( + std::vector* thread_list) override { + assert(thread_status_updater_); + return thread_status_updater_->GetThreadList(thread_list); + } + static uint64_t gettid(pthread_t tid) { uint64_t thread_id = 0; memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); @@ -1534,12 +1547,17 @@ class PosixEnv : public Env { queue_(), queue_len_(0), exit_all_threads_(false), - low_io_priority_(false) { + low_io_priority_(false), + env_(nullptr) { PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); } ~ThreadPool() { + assert(bgthreads_.size() == 0U); + } + + void JoinAllThreads() { PthreadCall("lock", pthread_mutex_lock(&mu_)); assert(!exit_all_threads_); exit_all_threads_ = true; @@ -1548,6 +1566,11 @@ class PosixEnv : public Env { for (const auto tid : bgthreads_) { pthread_join(tid, nullptr); } + bgthreads_.clear(); + } + + void SetHostEnv(Env* env) { + env_ = env; } void LowerIOPriority() { @@ -1669,7 +1692,7 @@ class PosixEnv : public Env { ThreadPool* tp = meta->thread_pool_; #if ROCKSDB_USING_THREAD_STATUS // for thread-status - thread_local_status.SetThreadType( + ThreadStatusUtil::SetThreadType(tp->env_, (tp->GetThreadPriority() == Env::Priority::HIGH ? ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY : ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY)); @@ -1677,7 +1700,7 @@ class PosixEnv : public Env { delete meta; tp->BGThread(thread_id); #if ROCKSDB_USING_THREAD_STATUS - thread_local_status.UnregisterThread(); + ThreadStatusUtil::UnregisterThread(); #endif return nullptr; } @@ -1779,6 +1802,7 @@ class PosixEnv : public Env { bool exit_all_threads_; bool low_io_priority_; Env::Priority priority_; + Env* env_; }; std::vector thread_pools_; @@ -1796,7 +1820,10 @@ PosixEnv::PosixEnv() : checkedDiskForMmap_(false), for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { thread_pools_[pool_id].SetThreadPriority( static_cast(pool_id)); + // This allows later initializing the thread-local-env of each thread. + thread_pools_[pool_id].SetHostEnv(this); } + thread_status_updater_ = CreateThreadStatusUpdater(); } void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) { diff --git a/util/thread_list_test.cc b/util/thread_list_test.cc index b5ff60cc7..12ad14719 100644 --- a/util/thread_list_test.cc +++ b/util/thread_list_test.cc @@ -6,7 +6,7 @@ #include #include -#include "util/thread_status_impl.h" +#include "util/thread_status_updater.h" #include "util/testharness.h" #include "rocksdb/db.h" @@ -21,16 +21,16 @@ class SleepingBackgroundTask { : db_key_(db_key), db_name_(db_name), cf_key_(cf_key), cf_name_(cf_name), should_sleep_(true), sleeping_count_(0) { - ThreadStatusImpl::NewColumnFamilyInfo( + Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo( db_key_, db_name_, cf_key_, cf_name_); } ~SleepingBackgroundTask() { - ThreadStatusImpl::EraseDatabaseInfo(db_key_); + Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_); } void DoSleep() { - thread_local_status.SetColumnFamilyInfoKey(cf_key_); + Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_); std::unique_lock l(mutex_); sleeping_count_++; while (should_sleep_) { @@ -38,7 +38,7 @@ class SleepingBackgroundTask { } sleeping_count_--; bg_cv_.notify_all(); - thread_local_status.SetColumnFamilyInfoKey(0); + Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(0); } void WakeUp() { std::unique_lock l(mutex_); @@ -101,7 +101,7 @@ TEST(ThreadListTest, SimpleColumnFamilyInfoTest) { std::vector thread_list; // Verify the number of sleeping threads in each pool. - GetThreadList(&thread_list); + env->GetThreadList(&thread_list); int sleeping_count[ThreadStatus::ThreadType::TOTAL] = {0}; for (auto thread_status : thread_list) { if (thread_status.cf_name == "pikachu" && @@ -122,7 +122,7 @@ TEST(ThreadListTest, SimpleColumnFamilyInfoTest) { sleeping_task.WaitUntilDone(); // Verify none of the threads are sleeping - GetThreadList(&thread_list); + env->GetThreadList(&thread_list); for (int i = 0; i < ThreadStatus::ThreadType::TOTAL; ++i) { sleeping_count[i] = 0; } diff --git a/util/thread_status_impl.cc b/util/thread_status_updater.cc similarity index 74% rename from util/thread_status_impl.cc rename to util/thread_status_updater.cc index 35dc181e2..0a4336251 100644 --- a/util/thread_status_impl.cc +++ b/util/thread_status_updater.cc @@ -5,26 +5,15 @@ #include "port/likely.h" #include "util/mutexlock.h" -#include "util/thread_status_impl.h" +#include "util/thread_status_updater.h" namespace rocksdb { #if ROCKSDB_USING_THREAD_STATUS -__thread ThreadStatusData* ThreadStatusImpl::thread_status_data_ = nullptr; -std::mutex ThreadStatusImpl::thread_list_mutex_; -std::unordered_set ThreadStatusImpl::thread_data_set_; -std::unordered_map> - ThreadStatusImpl::cf_info_map_; -std::unordered_map> - ThreadStatusImpl::db_key_map_; -ThreadStatusImpl thread_local_status; +__thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr; -ThreadStatusImpl::~ThreadStatusImpl() { - assert(thread_data_set_.size() == 0); -} - -void ThreadStatusImpl::UnregisterThread() { +void ThreadStatusUpdater::UnregisterThread() { if (thread_status_data_ != nullptr) { std::lock_guard lck(thread_list_mutex_); thread_data_set_.erase(thread_status_data_); @@ -33,26 +22,26 @@ void ThreadStatusImpl::UnregisterThread() { } } -void ThreadStatusImpl::SetThreadType( +void ThreadStatusUpdater::SetThreadType( ThreadStatus::ThreadType ttype) { auto* data = InitAndGet(); data->thread_type.store(ttype, std::memory_order_relaxed); } -void ThreadStatusImpl::SetColumnFamilyInfoKey( +void ThreadStatusUpdater::SetColumnFamilyInfoKey( const void* cf_key) { auto* data = InitAndGet(); data->cf_key.store(cf_key, std::memory_order_relaxed); } -void ThreadStatusImpl::SetEventInfoPtr( +void ThreadStatusUpdater::SetEventInfoPtr( const ThreadEventInfo* event_info) { auto* data = InitAndGet(); data->event_info.store(event_info, std::memory_order_relaxed); } -Status ThreadStatusImpl::GetThreadList( - std::vector* thread_list) const { +Status ThreadStatusUpdater::GetThreadList( + std::vector* thread_list) { thread_list->clear(); std::vector> valid_list; @@ -90,7 +79,7 @@ Status ThreadStatusImpl::GetThreadList( return Status::OK(); } -ThreadStatusData* ThreadStatusImpl::InitAndGet() { +ThreadStatusData* ThreadStatusUpdater::InitAndGet() { if (UNLIKELY(thread_status_data_ == nullptr)) { thread_status_data_ = new ThreadStatusData(); thread_status_data_->thread_id = reinterpret_cast( @@ -101,7 +90,7 @@ ThreadStatusData* ThreadStatusImpl::InitAndGet() { return thread_status_data_; } -void ThreadStatusImpl::NewColumnFamilyInfo( +void ThreadStatusUpdater::NewColumnFamilyInfo( const void* db_key, const std::string& db_name, const void* cf_key, const std::string& cf_name) { std::lock_guard lck(thread_list_mutex_); @@ -111,7 +100,7 @@ void ThreadStatusImpl::NewColumnFamilyInfo( db_key_map_[db_key].insert(cf_key); } -void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) { +void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) { std::lock_guard lck(thread_list_mutex_); auto cf_pair = cf_info_map_.find(cf_key); assert(cf_pair != cf_info_map_.end()); @@ -132,7 +121,7 @@ void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) { assert(result); } -void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) { +void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) { std::lock_guard lck(thread_list_mutex_); auto db_pair = db_key_map_.find(db_key); if (UNLIKELY(db_pair == db_key_map_.end())) { @@ -154,41 +143,37 @@ void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) { #else -ThreadStatusImpl::~ThreadStatusImpl() { -} - -void ThreadStatusImpl::UnregisterThread() { +void ThreadStatusUpdater::UnregisterThread() { } -void ThreadStatusImpl::SetThreadType( +void ThreadStatusUpdater::SetThreadType( ThreadStatus::ThreadType ttype) { } -void ThreadStatusImpl::SetColumnFamilyInfoKey( +void ThreadStatusUpdater::SetColumnFamilyInfoKey( const void* cf_key) { } -void ThreadStatusImpl::SetEventInfoPtr( +void ThreadStatusUpdater::SetEventInfoPtr( const ThreadEventInfo* event_info) { } -Status ThreadStatusImpl::GetThreadList( - std::vector* thread_list) const { +Status ThreadStatusUpdater::GetThreadList( + std::vector* thread_list) { return Status::NotSupported( "GetThreadList is not supported in the current running environment."); } -void ThreadStatusImpl::NewColumnFamilyInfo( +void ThreadStatusUpdater::NewColumnFamilyInfo( const void* db_key, const std::string& db_name, const void* cf_key, const std::string& cf_name) { } -void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) { +void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) { } -void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) { +void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) { } -ThreadStatusImpl thread_local_status; #endif // ROCKSDB_USING_THREAD_STATUS } // namespace rocksdb diff --git a/util/thread_status_impl.h b/util/thread_status_updater.h similarity index 84% rename from util/thread_status_impl.h rename to util/thread_status_updater.h index a6e9a7e5b..e0434cd21 100644 --- a/util/thread_status_impl.h +++ b/util/thread_status_updater.h @@ -3,8 +3,7 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. // -// The implementation of ThreadStatus. It is implemented via combination -// of macros and thread-local variables. +// The implementation of ThreadStatus. // // Note that we make get and set access to ThreadStatusData lockless. // As a result, ThreadStatusData as a whole is not atomic. However, @@ -43,10 +42,7 @@ namespace rocksdb { class ColumnFamilyHandle; -// The mutable version of ThreadStatus. It has a static set maintaining -// the set of current registered threades. -// -// Note that it is suggested to call the above macros. +// The structure that keeps constant information about a column family. struct ConstantColumnFamilyInfo { #if ROCKSDB_USING_THREAD_STATUS public: @@ -61,6 +57,7 @@ struct ConstantColumnFamilyInfo { #endif // ROCKSDB_USING_THREAD_STATUS }; +// The structure that describes an event. struct ThreadEventInfo { #if ROCKSDB_USING_THREAD_STATUS public: @@ -84,13 +81,22 @@ struct ThreadStatusData { #endif // ROCKSDB_USING_THREAD_STATUS }; -class ThreadStatusImpl { +// The class that stores and updates the status of the current thread +// using a thread-local ThreadStatusData. +// +// In most of the case, you should use ThreadStatusUtil to update +// the status of the current thread instead of using ThreadSatusUpdater +// directly. +// +// @see ThreadStatusUtil +class ThreadStatusUpdater { public: - ThreadStatusImpl() {} + ThreadStatusUpdater() {} // Releases all ThreadStatusData of all active threads. - ~ThreadStatusImpl(); + virtual ~ThreadStatusUpdater() {} + // Unregister the current thread. void UnregisterThread(); // Set the thread type of the current thread. @@ -104,29 +110,30 @@ class ThreadStatusImpl { // its thread-local pointer of ThreadEventInfo to the correct entry. void SetEventInfoPtr(const ThreadEventInfo* event_info); + // Obtain the status of all active registered threads. Status GetThreadList( - std::vector* thread_list) const; + std::vector* thread_list); // Create an entry in the global ColumnFamilyInfo table for the // specified column family. This function should be called only // when the current thread does not hold db_mutex. - static void NewColumnFamilyInfo( + void NewColumnFamilyInfo( const void* db_key, const std::string& db_name, const void* cf_key, const std::string& cf_name); // Erase all ConstantColumnFamilyInfo that is associated with the // specified db instance. This function should be called only when // the current thread does not hold db_mutex. - static void EraseDatabaseInfo(const void* db_key); + void EraseDatabaseInfo(const void* db_key); // Erase the ConstantColumnFamilyInfo that is associated with the // specified ColumnFamilyData. This function should be called only // when the current thread does not hold db_mutex. - static void EraseColumnFamilyInfo(const void* cf_key); + void EraseColumnFamilyInfo(const void* cf_key); // Verifies whether the input ColumnFamilyHandles matches // the information stored in the current cf_info_map. - static void TEST_VerifyColumnFamilyInfoMap( + void TEST_VerifyColumnFamilyInfoMap( const std::vector& handles, bool check_exist); @@ -141,27 +148,25 @@ class ThreadStatusImpl { ThreadStatusData* InitAndGet(); // The mutex that protects cf_info_map and db_key_map. - static std::mutex thread_list_mutex_; + std::mutex thread_list_mutex_; // The current status data of all active threads. - static std::unordered_set thread_data_set_; + std::unordered_set thread_data_set_; // A global map that keeps the column family information. It is stored // globally instead of inside DB is to avoid the situation where DB is // closing while GetThreadList function already get the pointer to its // CopnstantColumnFamilyInfo. - static std::unordered_map< + std::unordered_map< const void*, std::unique_ptr> cf_info_map_; // A db_key to cf_key map that allows erasing elements in cf_info_map // associated to the same db_key faster. - static std::unordered_map< + std::unordered_map< const void*, std::unordered_set> db_key_map_; #else static ThreadStatusData* thread_status_data_; #endif // ROCKSDB_USING_THREAD_STATUS }; - -extern ThreadStatusImpl thread_local_status; } // namespace rocksdb diff --git a/util/thread_status_impl_debug.cc b/util/thread_status_updater_debug.cc similarity index 91% rename from util/thread_status_impl_debug.cc rename to util/thread_status_updater_debug.cc index 5489499d3..1f53e5fc1 100644 --- a/util/thread_status_impl_debug.cc +++ b/util/thread_status_updater_debug.cc @@ -5,12 +5,12 @@ #include -#include "util/thread_status_impl.h" +#include "util/thread_status_updater.h" #include "db/column_family.h" #if ROCKSDB_USING_THREAD_STATUS namespace rocksdb { -void ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap( +void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap( const std::vector& handles, bool check_exist) { std::unique_lock lock(thread_list_mutex_); diff --git a/util/thread_status_util.cc b/util/thread_status_util.cc new file mode 100644 index 000000000..c8767d9a8 --- /dev/null +++ b/util/thread_status_util.cc @@ -0,0 +1,102 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "rocksdb/env.h" +#include "util/thread_status_updater.h" +#include "util/thread_status_util.h" + +namespace rocksdb { + +#if ROCKSDB_USING_THREAD_STATUS +__thread ThreadStatusUpdater* + ThreadStatusUtil::thread_updater_local_cache_ = nullptr; +__thread bool ThreadStatusUtil::thread_updater_initialized_ = false; + +void ThreadStatusUtil::SetThreadType( + const Env* env, ThreadStatus::ThreadType thread_type) { + if (!MaybeInitThreadLocalUpdater(env)) { + return; + } + assert(thread_updater_local_cache_); + thread_updater_local_cache_->SetThreadType(thread_type); +} + +void ThreadStatusUtil::UnregisterThread() { + thread_updater_initialized_ = false; + if (thread_updater_local_cache_ != nullptr) { + thread_updater_local_cache_->UnregisterThread(); + thread_updater_local_cache_ = nullptr; + } +} + +void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) { + if (!MaybeInitThreadLocalUpdater(cfd->ioptions()->env)) { + return; + } + assert(thread_updater_local_cache_); + thread_updater_local_cache_->SetColumnFamilyInfoKey(cfd); +} + +void ThreadStatusUtil::NewColumnFamilyInfo( + const DB* db, const ColumnFamilyData* cfd) { + if (!MaybeInitThreadLocalUpdater(cfd->ioptions()->env)) { + return; + } + assert(thread_updater_local_cache_); + if (thread_updater_local_cache_) { + thread_updater_local_cache_->NewColumnFamilyInfo( + db, db->GetName(), cfd, cfd->GetName()); + } +} + +void ThreadStatusUtil::EraseColumnFamilyInfo( + const ColumnFamilyData* cfd) { + if (thread_updater_local_cache_ == nullptr) { + return; + } + thread_updater_local_cache_->EraseColumnFamilyInfo(cfd); +} + +void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) { + if (thread_updater_local_cache_ == nullptr) { + return; + } + thread_updater_local_cache_->EraseDatabaseInfo(db); +} + +bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) { + if (!thread_updater_initialized_ && env != nullptr) { + thread_updater_initialized_ = true; + thread_updater_local_cache_ = env->GetThreadStatusUpdater(); + } + return (thread_updater_local_cache_ != nullptr); +} + +#else + +ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = nullptr; +bool ThreadStatusUtil::thread_updater_initialized_ = false; + +bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) { + return false; +} + +void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) { +} + +void ThreadStatusUtil::NewColumnFamilyInfo( + const DB* db, const ColumnFamilyData* cfd) { +} + +void ThreadStatusUtil::EraseColumnFamilyInfo( + const ColumnFamilyData* cfd) { +} + +void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) { +} + +#endif // ROCKSDB_USING_THREAD_STATUS + +} // namespace rocksdb diff --git a/util/thread_status_util.h b/util/thread_status_util.h new file mode 100644 index 000000000..c583d5a5d --- /dev/null +++ b/util/thread_status_util.h @@ -0,0 +1,93 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include "db/column_family.h" +#include "rocksdb/env.h" +#include "rocksdb/thread_status.h" +#include "util/thread_status_updater.h" + +namespace rocksdb { + +// The static utility class for updating thread-local status. +// +// The thread-local status is updated via the thread-local cached +// pointer thread_updater_local_cache_. During each function call, +// when ThreadStatusUtil finds thread_updater_local_cache_ is +// left uninitialized (determined by thread_updater_initialized_), +// it will tries to initialize it using the return value of +// Env::GetThreadStatusUpdater(). When thread_updater_local_cache_ +// is initialized by a non-null pointer, each function call will +// then update the status of the current thread. Otherwise, +// all function calls to ThreadStatusUtil will be no-op. +class ThreadStatusUtil { + public: + // Set the thread type of the current thread. + static void SetThreadType( + const Env* env, ThreadStatus::ThreadType thread_type); + + // Unregister the current thread. + static void UnregisterThread(); + + // Create an entry in the global ColumnFamilyInfo table for the + // specified column family. This function should be called only + // when the current thread does not hold db_mutex. + static void NewColumnFamilyInfo( + const DB* db, const ColumnFamilyData* cfd); + + // Erase the ConstantColumnFamilyInfo that is associated with the + // specified ColumnFamilyData. This function should be called only + // when the current thread does not hold db_mutex. + static void EraseColumnFamilyInfo(const ColumnFamilyData* cfd); + + // Erase all ConstantColumnFamilyInfo that is associated with the + // specified db instance. This function should be called only when + // the current thread does not hold db_mutex. + static void EraseDatabaseInfo(const DB* db); + + // Update the thread status to indicate the current thread is doing + // something related to the specified column family. + static void SetColumnFamily(const ColumnFamilyData* cfd); + + protected: + // Initialize the thread-local ThreadStatusUpdater when it finds + // the cached value is nullptr. Returns true if it has cached + // a non-null pointer. + static bool MaybeInitThreadLocalUpdater(const Env* env); + +#if ROCKSDB_USING_THREAD_STATUS + // A boolean flag indicating whether thread_updater_local_cache_ + // is initialized. It is set to true when an Env uses any + // ThreadStatusUtil functions using the current thread other + // than UnregisterThread(). It will be set to false when + // UnregisterThread() is called. + // + // When this variable is set to true, thread_updater_local_cache_ + // will not be updated until this variable is again set to false + // in UnregisterThread(). + static __thread bool thread_updater_initialized_; + + // The thread-local cached ThreadStatusUpdater that caches the + // thread_status_updater_ of the first Env that uses any ThreadStatusUtil + // function other than UnregisterThread(). This variable will + // be cleared when UnregisterThread() is called. + // + // When this variable is set to a non-null pointer, then the status + // of the current thread will be updated when a function of + // ThreadStatusUtil is called. Otherwise, all functions of + // ThreadStatusUtil will be no-op. + // + // When thread_updater_initialized_ is set to true, this variable + // will not be updated until this thread_updater_initialized_ is + // again set to false in UnregisterThread(). + static __thread ThreadStatusUpdater* thread_updater_local_cache_; +#else + static bool thread_updater_initialized_; + static ThreadStatusUpdater* thread_updater_local_cache_; +#endif +}; + +} // namespace rocksdb diff --git a/utilities/compacted_db/compacted_db_impl.cc b/utilities/compacted_db/compacted_db_impl.cc index fd35698b4..3bd27e46a 100644 --- a/utilities/compacted_db/compacted_db_impl.cc +++ b/utilities/compacted_db/compacted_db_impl.cc @@ -8,7 +8,6 @@ #include "db/db_impl.h" #include "db/version_set.h" #include "table/get_context.h" -#include "util/thread_status_impl.h" namespace rocksdb {