diff --git a/db/db_impl.cc b/db/db_impl.cc index 96778e44a..2bafc8f81 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -78,7 +78,8 @@ #include "util/stop_watch.h" #include "util/sync_point.h" #include "util/string_util.h" -#include "util/thread_status_impl.h" +#include "util/thread_status_updater.h" +#include "util/thread_status_util.h" namespace rocksdb { @@ -3844,30 +3845,27 @@ Status DestroyDB(const std::string& dbname, const Options& options) { } #if ROCKSDB_USING_THREAD_STATUS + void DBImpl::NewThreadStatusCfInfo( ColumnFamilyData* cfd) const { if (db_options_.enable_thread_tracking) { - ThreadStatusImpl::NewColumnFamilyInfo( - this, GetName(), cfd, cfd->GetName()); + ThreadStatusUtil::NewColumnFamilyInfo(this, cfd); } } void DBImpl::EraseThreadStatusCfInfo( ColumnFamilyData* cfd) const { if (db_options_.enable_thread_tracking) { - ThreadStatusImpl::EraseColumnFamilyInfo(cfd); + ThreadStatusUtil::EraseColumnFamilyInfo(cfd); } } void DBImpl::EraseThreadStatusDbInfo() const { if (db_options_.enable_thread_tracking) { - ThreadStatusImpl::EraseDatabaseInfo(this); + ThreadStatusUtil::EraseDatabaseInfo(this); } } -Status GetThreadList(std::vector* thread_list) { - return thread_local_status.GetThreadList(thread_list); -} #else void DBImpl::NewThreadStatusCfInfo( ColumnFamilyData* cfd) const { diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 8b0beb7e0..c1d61e377 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -10,7 +10,6 @@ #include "db/merge_context.h" #include "db/db_iter.h" #include "util/perf_context_imp.h" -#include "util/thread_status_impl.h" namespace rocksdb { diff --git a/db/db_test.cc b/db/db_test.cc index 7feb98808..cb2458954 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -51,7 +51,7 @@ #include "util/testutil.h" #include "util/mock_env.h" #include "util/string_util.h" -#include "util/thread_status_impl.h" +#include "util/thread_status_updater.h" namespace rocksdb { @@ -9418,7 +9418,7 @@ TEST(DBTest, GetThreadList) { TryReopen(options); std::vector thread_list; - Status s = GetThreadList(&thread_list); + Status s = env_->GetThreadList(&thread_list); for (int i = 0; i < 2; ++i) { // repeat the test with differet number of high / low priority threads @@ -9431,7 +9431,7 @@ TEST(DBTest, GetThreadList) { env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW); // Wait to ensure the all threads has been registered env_->SleepForMicroseconds(100000); - s = GetThreadList(&thread_list); + s = env_->GetThreadList(&thread_list); ASSERT_OK(s); unsigned int thread_type_counts[ThreadStatus::ThreadType::TOTAL]; memset(thread_type_counts, 0, sizeof(thread_type_counts)); @@ -9455,15 +9455,18 @@ TEST(DBTest, GetThreadList) { if (i == 0) { // repeat the test with multiple column families CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options); - ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true); + env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap( + handles_, true); } } db_->DropColumnFamily(handles_[2]); delete handles_[2]; handles_.erase(handles_.begin() + 2); - ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true); + env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap( + handles_, true); Close(); - ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, true); + env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap( + handles_, true); } TEST(DBTest, DisableThreadList) { @@ -9473,7 +9476,8 @@ TEST(DBTest, DisableThreadList) { TryReopen(options); CreateAndReopenWithCF({"pikachu", "about-to-remove"}, options); // Verify non of the column family info exists - ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap(handles_, false); + env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap( + handles_, false); } #endif // ROCKSDB_USING_THREAD_STATUS diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 72878ff57..a8cb694b4 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -548,12 +548,6 @@ Status DestroyDB(const std::string& name, const Options& options); Status RepairDB(const std::string& dbname, const Options& options); #endif -#if ROCKSDB_USING_THREAD_STATUS -// Obtain the status of all rocksdb-related threads. -Status GetThreadList(std::vector* thread_list); -#endif - - } // namespace rocksdb #endif // STORAGE_ROCKSDB_INCLUDE_DB_H_ diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index aded546ca..8a96ef1e1 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -24,6 +24,7 @@ #include #include #include "rocksdb/status.h" +#include "rocksdb/thread_status.h" namespace rocksdb { @@ -37,6 +38,7 @@ class RandomRWFile; class Directory; struct DBOptions; class RateLimiter; +class ThreadStatusUpdater; using std::unique_ptr; using std::shared_ptr; @@ -83,7 +85,8 @@ struct EnvOptions { class Env { public: - Env() { } + Env() : thread_status_updater_(nullptr) {} + virtual ~Env(); // Return a default environment suitable for the current operating @@ -302,12 +305,34 @@ class Env { virtual EnvOptions OptimizeForManifestWrite(const EnvOptions& env_options) const; + // Returns the status of all threads that belong to the current Env. + virtual Status GetThreadList(std::vector* thread_list) { + return Status::NotSupported("Not supported."); + } + + // Returns the pointer to ThreadStatusUpdater. This function will be + // used in RocksDB internally to update thread status and supports + // GetThreadList(). + virtual ThreadStatusUpdater* GetThreadStatusUpdater() const { + return thread_status_updater_; + } + + protected: + // The pointer to an internal structure that will update the + // status of each thread. + ThreadStatusUpdater* thread_status_updater_; + private: // No copying allowed Env(const Env&); void operator=(const Env&); }; +// The factory function to construct a ThreadStatusUpdater. Any Env +// that supports GetThreadList() feature should call this function in its +// constructor to initialize thread_status_updater_. +ThreadStatusUpdater* CreateThreadStatusUpdater(); + // A file abstraction for reading sequentially through a file class SequentialFile { public: @@ -805,10 +830,19 @@ class EnvWrapper : public Env { void LowerThreadPoolIOPriority(Priority pool = LOW) override { target_->LowerThreadPoolIOPriority(pool); } + std::string TimeToString(uint64_t time) { return target_->TimeToString(time); } + Status GetThreadList(std::vector* thread_list) { + return target_->GetThreadList(thread_list); + } + + ThreadStatusUpdater* GetThreadStatusUpdater() const override { + return target_->GetThreadStatusUpdater(); + } + private: Env* target_; }; diff --git a/util/env_posix.cc b/util/env_posix.cc index da090ddf5..5bad58466 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -42,7 +42,8 @@ #include "util/random.h" #include "util/iostats_context_imp.h" #include "util/rate_limiter.h" -#include "util/thread_status_impl.h" +#include "util/thread_status_updater.h" +#include "util/thread_status_util.h" // Get nano time for mach systems #ifdef __MACH__ @@ -76,10 +77,6 @@ int rocksdb_kill_odds = 0; namespace rocksdb { -#if ROCKSDB_USING_THREAD_STATUS -extern ThreadStatusImpl thread_local_status; -#endif - namespace { // A wrapper for fadvise, if the platform doesn't support fadvise, @@ -92,6 +89,10 @@ int Fadvise(int fd, off_t offset, size_t len, int advice) { #endif } +ThreadStatusUpdater* CreateThreadStatusUpdater() { + return new ThreadStatusUpdater(); +} + // list of pathnames that are locked static std::set lockedFiles; static port::Mutex mutex_lockedFiles; @@ -1076,10 +1077,16 @@ class PosixEnv : public Env { public: PosixEnv(); - virtual ~PosixEnv(){ + virtual ~PosixEnv() { for (const auto tid : threads_to_join_) { pthread_join(tid, nullptr); } + for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { + thread_pools_[pool_id].JoinAllThreads(); + } + // All threads must be joined before the deletion of + // thread_status_updater_. + delete thread_status_updater_; } void SetFD_CLOEXEC(int fd, const EnvOptions* options) { @@ -1356,6 +1363,12 @@ class PosixEnv : public Env { return Status::OK(); } + virtual Status GetThreadList( + std::vector* thread_list) override { + assert(thread_status_updater_); + return thread_status_updater_->GetThreadList(thread_list); + } + static uint64_t gettid(pthread_t tid) { uint64_t thread_id = 0; memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); @@ -1534,12 +1547,17 @@ class PosixEnv : public Env { queue_(), queue_len_(0), exit_all_threads_(false), - low_io_priority_(false) { + low_io_priority_(false), + env_(nullptr) { PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); } ~ThreadPool() { + assert(bgthreads_.size() == 0U); + } + + void JoinAllThreads() { PthreadCall("lock", pthread_mutex_lock(&mu_)); assert(!exit_all_threads_); exit_all_threads_ = true; @@ -1548,6 +1566,11 @@ class PosixEnv : public Env { for (const auto tid : bgthreads_) { pthread_join(tid, nullptr); } + bgthreads_.clear(); + } + + void SetHostEnv(Env* env) { + env_ = env; } void LowerIOPriority() { @@ -1669,7 +1692,7 @@ class PosixEnv : public Env { ThreadPool* tp = meta->thread_pool_; #if ROCKSDB_USING_THREAD_STATUS // for thread-status - thread_local_status.SetThreadType( + ThreadStatusUtil::SetThreadType(tp->env_, (tp->GetThreadPriority() == Env::Priority::HIGH ? ThreadStatus::ThreadType::ROCKSDB_HIGH_PRIORITY : ThreadStatus::ThreadType::ROCKSDB_LOW_PRIORITY)); @@ -1677,7 +1700,7 @@ class PosixEnv : public Env { delete meta; tp->BGThread(thread_id); #if ROCKSDB_USING_THREAD_STATUS - thread_local_status.UnregisterThread(); + ThreadStatusUtil::UnregisterThread(); #endif return nullptr; } @@ -1779,6 +1802,7 @@ class PosixEnv : public Env { bool exit_all_threads_; bool low_io_priority_; Env::Priority priority_; + Env* env_; }; std::vector thread_pools_; @@ -1796,7 +1820,10 @@ PosixEnv::PosixEnv() : checkedDiskForMmap_(false), for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { thread_pools_[pool_id].SetThreadPriority( static_cast(pool_id)); + // This allows later initializing the thread-local-env of each thread. + thread_pools_[pool_id].SetHostEnv(this); } + thread_status_updater_ = CreateThreadStatusUpdater(); } void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) { diff --git a/util/thread_list_test.cc b/util/thread_list_test.cc index b5ff60cc7..12ad14719 100644 --- a/util/thread_list_test.cc +++ b/util/thread_list_test.cc @@ -6,7 +6,7 @@ #include #include -#include "util/thread_status_impl.h" +#include "util/thread_status_updater.h" #include "util/testharness.h" #include "rocksdb/db.h" @@ -21,16 +21,16 @@ class SleepingBackgroundTask { : db_key_(db_key), db_name_(db_name), cf_key_(cf_key), cf_name_(cf_name), should_sleep_(true), sleeping_count_(0) { - ThreadStatusImpl::NewColumnFamilyInfo( + Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo( db_key_, db_name_, cf_key_, cf_name_); } ~SleepingBackgroundTask() { - ThreadStatusImpl::EraseDatabaseInfo(db_key_); + Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_); } void DoSleep() { - thread_local_status.SetColumnFamilyInfoKey(cf_key_); + Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_); std::unique_lock l(mutex_); sleeping_count_++; while (should_sleep_) { @@ -38,7 +38,7 @@ class SleepingBackgroundTask { } sleeping_count_--; bg_cv_.notify_all(); - thread_local_status.SetColumnFamilyInfoKey(0); + Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(0); } void WakeUp() { std::unique_lock l(mutex_); @@ -101,7 +101,7 @@ TEST(ThreadListTest, SimpleColumnFamilyInfoTest) { std::vector thread_list; // Verify the number of sleeping threads in each pool. - GetThreadList(&thread_list); + env->GetThreadList(&thread_list); int sleeping_count[ThreadStatus::ThreadType::TOTAL] = {0}; for (auto thread_status : thread_list) { if (thread_status.cf_name == "pikachu" && @@ -122,7 +122,7 @@ TEST(ThreadListTest, SimpleColumnFamilyInfoTest) { sleeping_task.WaitUntilDone(); // Verify none of the threads are sleeping - GetThreadList(&thread_list); + env->GetThreadList(&thread_list); for (int i = 0; i < ThreadStatus::ThreadType::TOTAL; ++i) { sleeping_count[i] = 0; } diff --git a/util/thread_status_impl.cc b/util/thread_status_updater.cc similarity index 74% rename from util/thread_status_impl.cc rename to util/thread_status_updater.cc index 35dc181e2..0a4336251 100644 --- a/util/thread_status_impl.cc +++ b/util/thread_status_updater.cc @@ -5,26 +5,15 @@ #include "port/likely.h" #include "util/mutexlock.h" -#include "util/thread_status_impl.h" +#include "util/thread_status_updater.h" namespace rocksdb { #if ROCKSDB_USING_THREAD_STATUS -__thread ThreadStatusData* ThreadStatusImpl::thread_status_data_ = nullptr; -std::mutex ThreadStatusImpl::thread_list_mutex_; -std::unordered_set ThreadStatusImpl::thread_data_set_; -std::unordered_map> - ThreadStatusImpl::cf_info_map_; -std::unordered_map> - ThreadStatusImpl::db_key_map_; -ThreadStatusImpl thread_local_status; +__thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr; -ThreadStatusImpl::~ThreadStatusImpl() { - assert(thread_data_set_.size() == 0); -} - -void ThreadStatusImpl::UnregisterThread() { +void ThreadStatusUpdater::UnregisterThread() { if (thread_status_data_ != nullptr) { std::lock_guard lck(thread_list_mutex_); thread_data_set_.erase(thread_status_data_); @@ -33,26 +22,26 @@ void ThreadStatusImpl::UnregisterThread() { } } -void ThreadStatusImpl::SetThreadType( +void ThreadStatusUpdater::SetThreadType( ThreadStatus::ThreadType ttype) { auto* data = InitAndGet(); data->thread_type.store(ttype, std::memory_order_relaxed); } -void ThreadStatusImpl::SetColumnFamilyInfoKey( +void ThreadStatusUpdater::SetColumnFamilyInfoKey( const void* cf_key) { auto* data = InitAndGet(); data->cf_key.store(cf_key, std::memory_order_relaxed); } -void ThreadStatusImpl::SetEventInfoPtr( +void ThreadStatusUpdater::SetEventInfoPtr( const ThreadEventInfo* event_info) { auto* data = InitAndGet(); data->event_info.store(event_info, std::memory_order_relaxed); } -Status ThreadStatusImpl::GetThreadList( - std::vector* thread_list) const { +Status ThreadStatusUpdater::GetThreadList( + std::vector* thread_list) { thread_list->clear(); std::vector> valid_list; @@ -90,7 +79,7 @@ Status ThreadStatusImpl::GetThreadList( return Status::OK(); } -ThreadStatusData* ThreadStatusImpl::InitAndGet() { +ThreadStatusData* ThreadStatusUpdater::InitAndGet() { if (UNLIKELY(thread_status_data_ == nullptr)) { thread_status_data_ = new ThreadStatusData(); thread_status_data_->thread_id = reinterpret_cast( @@ -101,7 +90,7 @@ ThreadStatusData* ThreadStatusImpl::InitAndGet() { return thread_status_data_; } -void ThreadStatusImpl::NewColumnFamilyInfo( +void ThreadStatusUpdater::NewColumnFamilyInfo( const void* db_key, const std::string& db_name, const void* cf_key, const std::string& cf_name) { std::lock_guard lck(thread_list_mutex_); @@ -111,7 +100,7 @@ void ThreadStatusImpl::NewColumnFamilyInfo( db_key_map_[db_key].insert(cf_key); } -void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) { +void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) { std::lock_guard lck(thread_list_mutex_); auto cf_pair = cf_info_map_.find(cf_key); assert(cf_pair != cf_info_map_.end()); @@ -132,7 +121,7 @@ void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) { assert(result); } -void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) { +void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) { std::lock_guard lck(thread_list_mutex_); auto db_pair = db_key_map_.find(db_key); if (UNLIKELY(db_pair == db_key_map_.end())) { @@ -154,41 +143,37 @@ void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) { #else -ThreadStatusImpl::~ThreadStatusImpl() { -} - -void ThreadStatusImpl::UnregisterThread() { +void ThreadStatusUpdater::UnregisterThread() { } -void ThreadStatusImpl::SetThreadType( +void ThreadStatusUpdater::SetThreadType( ThreadStatus::ThreadType ttype) { } -void ThreadStatusImpl::SetColumnFamilyInfoKey( +void ThreadStatusUpdater::SetColumnFamilyInfoKey( const void* cf_key) { } -void ThreadStatusImpl::SetEventInfoPtr( +void ThreadStatusUpdater::SetEventInfoPtr( const ThreadEventInfo* event_info) { } -Status ThreadStatusImpl::GetThreadList( - std::vector* thread_list) const { +Status ThreadStatusUpdater::GetThreadList( + std::vector* thread_list) { return Status::NotSupported( "GetThreadList is not supported in the current running environment."); } -void ThreadStatusImpl::NewColumnFamilyInfo( +void ThreadStatusUpdater::NewColumnFamilyInfo( const void* db_key, const std::string& db_name, const void* cf_key, const std::string& cf_name) { } -void ThreadStatusImpl::EraseColumnFamilyInfo(const void* cf_key) { +void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) { } -void ThreadStatusImpl::EraseDatabaseInfo(const void* db_key) { +void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) { } -ThreadStatusImpl thread_local_status; #endif // ROCKSDB_USING_THREAD_STATUS } // namespace rocksdb diff --git a/util/thread_status_impl.h b/util/thread_status_updater.h similarity index 84% rename from util/thread_status_impl.h rename to util/thread_status_updater.h index a6e9a7e5b..e0434cd21 100644 --- a/util/thread_status_impl.h +++ b/util/thread_status_updater.h @@ -3,8 +3,7 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. // -// The implementation of ThreadStatus. It is implemented via combination -// of macros and thread-local variables. +// The implementation of ThreadStatus. // // Note that we make get and set access to ThreadStatusData lockless. // As a result, ThreadStatusData as a whole is not atomic. However, @@ -43,10 +42,7 @@ namespace rocksdb { class ColumnFamilyHandle; -// The mutable version of ThreadStatus. It has a static set maintaining -// the set of current registered threades. -// -// Note that it is suggested to call the above macros. +// The structure that keeps constant information about a column family. struct ConstantColumnFamilyInfo { #if ROCKSDB_USING_THREAD_STATUS public: @@ -61,6 +57,7 @@ struct ConstantColumnFamilyInfo { #endif // ROCKSDB_USING_THREAD_STATUS }; +// The structure that describes an event. struct ThreadEventInfo { #if ROCKSDB_USING_THREAD_STATUS public: @@ -84,13 +81,22 @@ struct ThreadStatusData { #endif // ROCKSDB_USING_THREAD_STATUS }; -class ThreadStatusImpl { +// The class that stores and updates the status of the current thread +// using a thread-local ThreadStatusData. +// +// In most of the case, you should use ThreadStatusUtil to update +// the status of the current thread instead of using ThreadSatusUpdater +// directly. +// +// @see ThreadStatusUtil +class ThreadStatusUpdater { public: - ThreadStatusImpl() {} + ThreadStatusUpdater() {} // Releases all ThreadStatusData of all active threads. - ~ThreadStatusImpl(); + virtual ~ThreadStatusUpdater() {} + // Unregister the current thread. void UnregisterThread(); // Set the thread type of the current thread. @@ -104,29 +110,30 @@ class ThreadStatusImpl { // its thread-local pointer of ThreadEventInfo to the correct entry. void SetEventInfoPtr(const ThreadEventInfo* event_info); + // Obtain the status of all active registered threads. Status GetThreadList( - std::vector* thread_list) const; + std::vector* thread_list); // Create an entry in the global ColumnFamilyInfo table for the // specified column family. This function should be called only // when the current thread does not hold db_mutex. - static void NewColumnFamilyInfo( + void NewColumnFamilyInfo( const void* db_key, const std::string& db_name, const void* cf_key, const std::string& cf_name); // Erase all ConstantColumnFamilyInfo that is associated with the // specified db instance. This function should be called only when // the current thread does not hold db_mutex. - static void EraseDatabaseInfo(const void* db_key); + void EraseDatabaseInfo(const void* db_key); // Erase the ConstantColumnFamilyInfo that is associated with the // specified ColumnFamilyData. This function should be called only // when the current thread does not hold db_mutex. - static void EraseColumnFamilyInfo(const void* cf_key); + void EraseColumnFamilyInfo(const void* cf_key); // Verifies whether the input ColumnFamilyHandles matches // the information stored in the current cf_info_map. - static void TEST_VerifyColumnFamilyInfoMap( + void TEST_VerifyColumnFamilyInfoMap( const std::vector& handles, bool check_exist); @@ -141,27 +148,25 @@ class ThreadStatusImpl { ThreadStatusData* InitAndGet(); // The mutex that protects cf_info_map and db_key_map. - static std::mutex thread_list_mutex_; + std::mutex thread_list_mutex_; // The current status data of all active threads. - static std::unordered_set thread_data_set_; + std::unordered_set thread_data_set_; // A global map that keeps the column family information. It is stored // globally instead of inside DB is to avoid the situation where DB is // closing while GetThreadList function already get the pointer to its // CopnstantColumnFamilyInfo. - static std::unordered_map< + std::unordered_map< const void*, std::unique_ptr> cf_info_map_; // A db_key to cf_key map that allows erasing elements in cf_info_map // associated to the same db_key faster. - static std::unordered_map< + std::unordered_map< const void*, std::unordered_set> db_key_map_; #else static ThreadStatusData* thread_status_data_; #endif // ROCKSDB_USING_THREAD_STATUS }; - -extern ThreadStatusImpl thread_local_status; } // namespace rocksdb diff --git a/util/thread_status_impl_debug.cc b/util/thread_status_updater_debug.cc similarity index 91% rename from util/thread_status_impl_debug.cc rename to util/thread_status_updater_debug.cc index 5489499d3..1f53e5fc1 100644 --- a/util/thread_status_impl_debug.cc +++ b/util/thread_status_updater_debug.cc @@ -5,12 +5,12 @@ #include -#include "util/thread_status_impl.h" +#include "util/thread_status_updater.h" #include "db/column_family.h" #if ROCKSDB_USING_THREAD_STATUS namespace rocksdb { -void ThreadStatusImpl::TEST_VerifyColumnFamilyInfoMap( +void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap( const std::vector& handles, bool check_exist) { std::unique_lock lock(thread_list_mutex_); diff --git a/util/thread_status_util.cc b/util/thread_status_util.cc new file mode 100644 index 000000000..c8767d9a8 --- /dev/null +++ b/util/thread_status_util.cc @@ -0,0 +1,102 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "rocksdb/env.h" +#include "util/thread_status_updater.h" +#include "util/thread_status_util.h" + +namespace rocksdb { + +#if ROCKSDB_USING_THREAD_STATUS +__thread ThreadStatusUpdater* + ThreadStatusUtil::thread_updater_local_cache_ = nullptr; +__thread bool ThreadStatusUtil::thread_updater_initialized_ = false; + +void ThreadStatusUtil::SetThreadType( + const Env* env, ThreadStatus::ThreadType thread_type) { + if (!MaybeInitThreadLocalUpdater(env)) { + return; + } + assert(thread_updater_local_cache_); + thread_updater_local_cache_->SetThreadType(thread_type); +} + +void ThreadStatusUtil::UnregisterThread() { + thread_updater_initialized_ = false; + if (thread_updater_local_cache_ != nullptr) { + thread_updater_local_cache_->UnregisterThread(); + thread_updater_local_cache_ = nullptr; + } +} + +void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) { + if (!MaybeInitThreadLocalUpdater(cfd->ioptions()->env)) { + return; + } + assert(thread_updater_local_cache_); + thread_updater_local_cache_->SetColumnFamilyInfoKey(cfd); +} + +void ThreadStatusUtil::NewColumnFamilyInfo( + const DB* db, const ColumnFamilyData* cfd) { + if (!MaybeInitThreadLocalUpdater(cfd->ioptions()->env)) { + return; + } + assert(thread_updater_local_cache_); + if (thread_updater_local_cache_) { + thread_updater_local_cache_->NewColumnFamilyInfo( + db, db->GetName(), cfd, cfd->GetName()); + } +} + +void ThreadStatusUtil::EraseColumnFamilyInfo( + const ColumnFamilyData* cfd) { + if (thread_updater_local_cache_ == nullptr) { + return; + } + thread_updater_local_cache_->EraseColumnFamilyInfo(cfd); +} + +void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) { + if (thread_updater_local_cache_ == nullptr) { + return; + } + thread_updater_local_cache_->EraseDatabaseInfo(db); +} + +bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) { + if (!thread_updater_initialized_ && env != nullptr) { + thread_updater_initialized_ = true; + thread_updater_local_cache_ = env->GetThreadStatusUpdater(); + } + return (thread_updater_local_cache_ != nullptr); +} + +#else + +ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = nullptr; +bool ThreadStatusUtil::thread_updater_initialized_ = false; + +bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) { + return false; +} + +void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) { +} + +void ThreadStatusUtil::NewColumnFamilyInfo( + const DB* db, const ColumnFamilyData* cfd) { +} + +void ThreadStatusUtil::EraseColumnFamilyInfo( + const ColumnFamilyData* cfd) { +} + +void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) { +} + +#endif // ROCKSDB_USING_THREAD_STATUS + +} // namespace rocksdb diff --git a/util/thread_status_util.h b/util/thread_status_util.h new file mode 100644 index 000000000..c583d5a5d --- /dev/null +++ b/util/thread_status_util.h @@ -0,0 +1,93 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include "db/column_family.h" +#include "rocksdb/env.h" +#include "rocksdb/thread_status.h" +#include "util/thread_status_updater.h" + +namespace rocksdb { + +// The static utility class for updating thread-local status. +// +// The thread-local status is updated via the thread-local cached +// pointer thread_updater_local_cache_. During each function call, +// when ThreadStatusUtil finds thread_updater_local_cache_ is +// left uninitialized (determined by thread_updater_initialized_), +// it will tries to initialize it using the return value of +// Env::GetThreadStatusUpdater(). When thread_updater_local_cache_ +// is initialized by a non-null pointer, each function call will +// then update the status of the current thread. Otherwise, +// all function calls to ThreadStatusUtil will be no-op. +class ThreadStatusUtil { + public: + // Set the thread type of the current thread. + static void SetThreadType( + const Env* env, ThreadStatus::ThreadType thread_type); + + // Unregister the current thread. + static void UnregisterThread(); + + // Create an entry in the global ColumnFamilyInfo table for the + // specified column family. This function should be called only + // when the current thread does not hold db_mutex. + static void NewColumnFamilyInfo( + const DB* db, const ColumnFamilyData* cfd); + + // Erase the ConstantColumnFamilyInfo that is associated with the + // specified ColumnFamilyData. This function should be called only + // when the current thread does not hold db_mutex. + static void EraseColumnFamilyInfo(const ColumnFamilyData* cfd); + + // Erase all ConstantColumnFamilyInfo that is associated with the + // specified db instance. This function should be called only when + // the current thread does not hold db_mutex. + static void EraseDatabaseInfo(const DB* db); + + // Update the thread status to indicate the current thread is doing + // something related to the specified column family. + static void SetColumnFamily(const ColumnFamilyData* cfd); + + protected: + // Initialize the thread-local ThreadStatusUpdater when it finds + // the cached value is nullptr. Returns true if it has cached + // a non-null pointer. + static bool MaybeInitThreadLocalUpdater(const Env* env); + +#if ROCKSDB_USING_THREAD_STATUS + // A boolean flag indicating whether thread_updater_local_cache_ + // is initialized. It is set to true when an Env uses any + // ThreadStatusUtil functions using the current thread other + // than UnregisterThread(). It will be set to false when + // UnregisterThread() is called. + // + // When this variable is set to true, thread_updater_local_cache_ + // will not be updated until this variable is again set to false + // in UnregisterThread(). + static __thread bool thread_updater_initialized_; + + // The thread-local cached ThreadStatusUpdater that caches the + // thread_status_updater_ of the first Env that uses any ThreadStatusUtil + // function other than UnregisterThread(). This variable will + // be cleared when UnregisterThread() is called. + // + // When this variable is set to a non-null pointer, then the status + // of the current thread will be updated when a function of + // ThreadStatusUtil is called. Otherwise, all functions of + // ThreadStatusUtil will be no-op. + // + // When thread_updater_initialized_ is set to true, this variable + // will not be updated until this thread_updater_initialized_ is + // again set to false in UnregisterThread(). + static __thread ThreadStatusUpdater* thread_updater_local_cache_; +#else + static bool thread_updater_initialized_; + static ThreadStatusUpdater* thread_updater_local_cache_; +#endif +}; + +} // namespace rocksdb diff --git a/utilities/compacted_db/compacted_db_impl.cc b/utilities/compacted_db/compacted_db_impl.cc index fd35698b4..3bd27e46a 100644 --- a/utilities/compacted_db/compacted_db_impl.cc +++ b/utilities/compacted_db/compacted_db_impl.cc @@ -8,7 +8,6 @@ #include "db/db_impl.h" #include "db/version_set.h" #include "table/get_context.h" -#include "util/thread_status_impl.h" namespace rocksdb {