diff --git a/util/env_posix.cc b/util/env_posix.cc index 77fb0ba44..8b1f04143 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1766,7 +1766,7 @@ class PosixEnv : public Env { ThreadPool* tp = meta->thread_pool_; #if ROCKSDB_USING_THREAD_STATUS // for thread-status - ThreadStatusUtil::SetThreadType(tp->env_, + ThreadStatusUtil::RegisterThread(tp->env_, (tp->GetThreadPriority() == Env::Priority::HIGH ? ThreadStatus::HIGH_PRIORITY : ThreadStatus::LOW_PRIORITY)); diff --git a/util/thread_status_updater.cc b/util/thread_status_updater.cc index 2dc15b429..2fd87cc89 100644 --- a/util/thread_status_updater.cc +++ b/util/thread_status_updater.cc @@ -15,6 +15,19 @@ namespace rocksdb { __thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr; +void ThreadStatusUpdater::RegisterThread( + ThreadStatus::ThreadType ttype, uint64_t thread_id) { + if (UNLIKELY(thread_status_data_ == nullptr)) { + thread_status_data_ = new ThreadStatusData(); + thread_status_data_->thread_type = ttype; + thread_status_data_->thread_id = thread_id; + std::lock_guard lck(thread_list_mutex_); + thread_data_set_.insert(thread_status_data_); + } + + ClearThreadOperationProperties(); +} + void ThreadStatusUpdater::UnregisterThread() { if (thread_status_data_ != nullptr) { std::lock_guard lck(thread_list_mutex_); @@ -24,18 +37,6 @@ void ThreadStatusUpdater::UnregisterThread() { } } -void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) { - auto* data = InitAndGet(); - data->thread_id.store(thread_id, std::memory_order_relaxed); -} - -void ThreadStatusUpdater::SetThreadType( - ThreadStatus::ThreadType ttype) { - auto* data = InitAndGet(); - data->thread_type.store(ttype, std::memory_order_relaxed); - ClearThreadOperationProperties(); -} - void ThreadStatusUpdater::ResetThreadStatus() { ClearThreadState(); ClearThreadOperation(); @@ -44,7 +45,10 @@ void ThreadStatusUpdater::ResetThreadStatus() { void ThreadStatusUpdater::SetColumnFamilyInfoKey( const void* cf_key) { - auto* data = InitAndGet(); + auto* data = Get(); + if (data == nullptr) { + return; + } // set the tracking flag based on whether cf_key is non-null or not. // If enable_thread_tracking is set to false, the input cf_key // would be nullptr. @@ -53,8 +57,8 @@ void ThreadStatusUpdater::SetColumnFamilyInfoKey( } const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() { - auto* data = InitAndGet(); - if (data->enable_tracking == false) { + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return nullptr; } return data->cf_key.load(std::memory_order_relaxed); @@ -62,9 +66,8 @@ const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() { void ThreadStatusUpdater::SetThreadOperation( const ThreadStatus::OperationType type) { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } // NOTE: Our practice here is to set all the thread operation properties @@ -82,9 +85,8 @@ void ThreadStatusUpdater::SetThreadOperation( void ThreadStatusUpdater::SetThreadOperationProperty( int i, uint64_t value) { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } data->op_properties[i].store(value, std::memory_order_relaxed); @@ -92,27 +94,24 @@ void ThreadStatusUpdater::SetThreadOperationProperty( void ThreadStatusUpdater::IncreaseThreadOperationProperty( int i, uint64_t delta) { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } data->op_properties[i].fetch_add(delta, std::memory_order_relaxed); } void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } data->op_start_time.store(start_time, std::memory_order_relaxed); } void ThreadStatusUpdater::ClearThreadOperation() { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN, @@ -123,9 +122,8 @@ void ThreadStatusUpdater::ClearThreadOperation() { } void ThreadStatusUpdater::ClearThreadOperationProperties() { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) { @@ -135,9 +133,8 @@ void ThreadStatusUpdater::ClearThreadOperationProperties() { ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage( ThreadStatus::OperationStage stage) { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return ThreadStatus::STAGE_UNKNOWN; } return data->operation_stage.exchange( @@ -146,18 +143,16 @@ ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage( void ThreadStatusUpdater::SetThreadState( const ThreadStatus::StateType type) { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } data->state_type.store(type, std::memory_order_relaxed); } void ThreadStatusUpdater::ClearThreadState() { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } data->state_type.store( @@ -222,11 +217,14 @@ Status ThreadStatusUpdater::GetThreadList( return Status::OK(); } -ThreadStatusData* ThreadStatusUpdater::InitAndGet() { - if (UNLIKELY(thread_status_data_ == nullptr)) { - thread_status_data_ = new ThreadStatusData(); - std::lock_guard lck(thread_list_mutex_); - thread_data_set_.insert(thread_status_data_); +ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() { + if (thread_status_data_ == nullptr) { + return nullptr; + } + if (!thread_status_data_->enable_tracking) { + assert(thread_status_data_->cf_key.load( + std::memory_order_relaxed) == nullptr); + return nullptr; } return thread_status_data_; } @@ -290,17 +288,14 @@ void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) { #else -void ThreadStatusUpdater::UnregisterThread() { +void ThreadStatusUpdater::RegisterThread( + ThreadStatus::ThreadType ttype, uint64_t thread_id) { } -void ThreadStatusUpdater::ResetThreadStatus() { -} - -void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) { +void ThreadStatusUpdater::UnregisterThread() { } -void ThreadStatusUpdater::SetThreadType( - ThreadStatus::ThreadType ttype) { +void ThreadStatusUpdater::ResetThreadStatus() { } void ThreadStatusUpdater::SetColumnFamilyInfoKey( diff --git a/util/thread_status_updater.h b/util/thread_status_updater.h index 5e7c2b894..218bba042 100644 --- a/util/thread_status_updater.h +++ b/util/thread_status_updater.h @@ -118,8 +118,8 @@ class ThreadStatusUpdater { // Set the id of the current thread. void SetThreadID(uint64_t thread_id); - // Set the thread type of the current thread. - void SetThreadType(ThreadStatus::ThreadType ttype); + // Register the current thread for tracking. + void RegisterThread(ThreadStatus::ThreadType ttype, uint64_t thread_id); // Update the column-family info of the current thread by setting // its thread-local pointer of ThreadStateInfo to the correct entry. @@ -198,9 +198,15 @@ class ThreadStatusUpdater { // The thread-local variable for storing thread status. static __thread ThreadStatusData* thread_status_data_; - // Obtain the pointer to the thread status data. It also performs - // initialization when necessary. - ThreadStatusData* InitAndGet(); + // Returns the pointer to the thread status data only when the + // thread status data is non-null and has enable_tracking == true. + ThreadStatusData* GetLocalThreadStatus(); + + // Directly returns the pointer to thread_status_data_ without + // checking whether enabling_tracking is true of not. + ThreadStatusData* Get() { + return thread_status_data_; + } // The mutex that protects cf_info_map and db_key_map. std::mutex thread_list_mutex_; diff --git a/util/thread_status_util.cc b/util/thread_status_util.cc index 116950e13..e67a8e4ef 100644 --- a/util/thread_status_util.cc +++ b/util/thread_status_util.cc @@ -15,14 +15,14 @@ __thread ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = nullptr; __thread bool ThreadStatusUtil::thread_updater_initialized_ = false; -void ThreadStatusUtil::SetThreadType( +void ThreadStatusUtil::RegisterThread( const Env* env, ThreadStatus::ThreadType thread_type) { if (!MaybeInitThreadLocalUpdater(env)) { return; } assert(thread_updater_local_cache_); - thread_updater_local_cache_->SetThreadID(env->GetThreadID()); - thread_updater_local_cache_->SetThreadType(thread_type); + thread_updater_local_cache_->RegisterThread( + thread_type, env->GetThreadID()); } void ThreadStatusUtil::UnregisterThread() { diff --git a/util/thread_status_util.h b/util/thread_status_util.h index ba0238d58..aa13a6c40 100644 --- a/util/thread_status_util.h +++ b/util/thread_status_util.h @@ -27,8 +27,8 @@ class ColumnFamilyData; // all function calls to ThreadStatusUtil will be no-op. class ThreadStatusUtil { public: - // Set the thread type of the current thread. - static void SetThreadType( + // Register the current thread for tracking. + static void RegisterThread( const Env* env, ThreadStatus::ThreadType thread_type); // Unregister the current thread.