Only initialize the ThreadStatusData when necessary.

Summary:
Before this patch, any function call to ThreadStatusUtil might automatically initialize and register the thread status data.  However, if it is the user-thread making this call, the allocated thread-status-data will never be released as such threads are not managed by rocksdb.

In this patch, I remove the automatic-initialization part.  Thread-status data is only initialized and uninitialized in Env during the thread creation and destruction.

Test Plan:
db_test
thread_list_test
listener_test

Reviewers: igor, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D40017
main
Yueh-Hsuan Chiang 9 years ago
parent 1a08d0beb5
commit 1369f015ee
  1. 2
      util/env_posix.cc
  2. 103
      util/thread_status_updater.cc
  3. 16
      util/thread_status_updater.h
  4. 6
      util/thread_status_util.cc
  5. 4
      util/thread_status_util.h

@ -1766,7 +1766,7 @@ class PosixEnv : public Env {
ThreadPool* tp = meta->thread_pool_;
#if ROCKSDB_USING_THREAD_STATUS
// for thread-status
ThreadStatusUtil::SetThreadType(tp->env_,
ThreadStatusUtil::RegisterThread(tp->env_,
(tp->GetThreadPriority() == Env::Priority::HIGH ?
ThreadStatus::HIGH_PRIORITY :
ThreadStatus::LOW_PRIORITY));

@ -15,6 +15,19 @@ namespace rocksdb {
__thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr;
void ThreadStatusUpdater::RegisterThread(
ThreadStatus::ThreadType ttype, uint64_t thread_id) {
if (UNLIKELY(thread_status_data_ == nullptr)) {
thread_status_data_ = new ThreadStatusData();
thread_status_data_->thread_type = ttype;
thread_status_data_->thread_id = thread_id;
std::lock_guard<std::mutex> lck(thread_list_mutex_);
thread_data_set_.insert(thread_status_data_);
}
ClearThreadOperationProperties();
}
void ThreadStatusUpdater::UnregisterThread() {
if (thread_status_data_ != nullptr) {
std::lock_guard<std::mutex> lck(thread_list_mutex_);
@ -24,18 +37,6 @@ void ThreadStatusUpdater::UnregisterThread() {
}
}
void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) {
auto* data = InitAndGet();
data->thread_id.store(thread_id, std::memory_order_relaxed);
}
void ThreadStatusUpdater::SetThreadType(
ThreadStatus::ThreadType ttype) {
auto* data = InitAndGet();
data->thread_type.store(ttype, std::memory_order_relaxed);
ClearThreadOperationProperties();
}
void ThreadStatusUpdater::ResetThreadStatus() {
ClearThreadState();
ClearThreadOperation();
@ -44,7 +45,10 @@ void ThreadStatusUpdater::ResetThreadStatus() {
void ThreadStatusUpdater::SetColumnFamilyInfoKey(
const void* cf_key) {
auto* data = InitAndGet();
auto* data = Get();
if (data == nullptr) {
return;
}
// set the tracking flag based on whether cf_key is non-null or not.
// If enable_thread_tracking is set to false, the input cf_key
// would be nullptr.
@ -53,8 +57,8 @@ void ThreadStatusUpdater::SetColumnFamilyInfoKey(
}
const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
auto* data = InitAndGet();
if (data->enable_tracking == false) {
auto* data = GetLocalThreadStatus();
if (data == nullptr) {
return nullptr;
}
return data->cf_key.load(std::memory_order_relaxed);
@ -62,9 +66,8 @@ const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
void ThreadStatusUpdater::SetThreadOperation(
const ThreadStatus::OperationType type) {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
auto* data = GetLocalThreadStatus();
if (data == nullptr) {
return;
}
// NOTE: Our practice here is to set all the thread operation properties
@ -82,9 +85,8 @@ void ThreadStatusUpdater::SetThreadOperation(
void ThreadStatusUpdater::SetThreadOperationProperty(
int i, uint64_t value) {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
auto* data = GetLocalThreadStatus();
if (data == nullptr) {
return;
}
data->op_properties[i].store(value, std::memory_order_relaxed);
@ -92,27 +94,24 @@ void ThreadStatusUpdater::SetThreadOperationProperty(
void ThreadStatusUpdater::IncreaseThreadOperationProperty(
int i, uint64_t delta) {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
auto* data = GetLocalThreadStatus();
if (data == nullptr) {
return;
}
data->op_properties[i].fetch_add(delta, std::memory_order_relaxed);
}
void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
auto* data = GetLocalThreadStatus();
if (data == nullptr) {
return;
}
data->op_start_time.store(start_time, std::memory_order_relaxed);
}
void ThreadStatusUpdater::ClearThreadOperation() {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
auto* data = GetLocalThreadStatus();
if (data == nullptr) {
return;
}
data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
@ -123,9 +122,8 @@ void ThreadStatusUpdater::ClearThreadOperation() {
}
void ThreadStatusUpdater::ClearThreadOperationProperties() {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
auto* data = GetLocalThreadStatus();
if (data == nullptr) {
return;
}
for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
@ -135,9 +133,8 @@ void ThreadStatusUpdater::ClearThreadOperationProperties() {
ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
ThreadStatus::OperationStage stage) {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
auto* data = GetLocalThreadStatus();
if (data == nullptr) {
return ThreadStatus::STAGE_UNKNOWN;
}
return data->operation_stage.exchange(
@ -146,18 +143,16 @@ ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
void ThreadStatusUpdater::SetThreadState(
const ThreadStatus::StateType type) {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
auto* data = GetLocalThreadStatus();
if (data == nullptr) {
return;
}
data->state_type.store(type, std::memory_order_relaxed);
}
void ThreadStatusUpdater::ClearThreadState() {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
auto* data = GetLocalThreadStatus();
if (data == nullptr) {
return;
}
data->state_type.store(
@ -222,11 +217,14 @@ Status ThreadStatusUpdater::GetThreadList(
return Status::OK();
}
ThreadStatusData* ThreadStatusUpdater::InitAndGet() {
if (UNLIKELY(thread_status_data_ == nullptr)) {
thread_status_data_ = new ThreadStatusData();
std::lock_guard<std::mutex> lck(thread_list_mutex_);
thread_data_set_.insert(thread_status_data_);
ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() {
if (thread_status_data_ == nullptr) {
return nullptr;
}
if (!thread_status_data_->enable_tracking) {
assert(thread_status_data_->cf_key.load(
std::memory_order_relaxed) == nullptr);
return nullptr;
}
return thread_status_data_;
}
@ -290,17 +288,14 @@ void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
#else
void ThreadStatusUpdater::UnregisterThread() {
void ThreadStatusUpdater::RegisterThread(
ThreadStatus::ThreadType ttype, uint64_t thread_id) {
}
void ThreadStatusUpdater::ResetThreadStatus() {
}
void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) {
void ThreadStatusUpdater::UnregisterThread() {
}
void ThreadStatusUpdater::SetThreadType(
ThreadStatus::ThreadType ttype) {
void ThreadStatusUpdater::ResetThreadStatus() {
}
void ThreadStatusUpdater::SetColumnFamilyInfoKey(

@ -118,8 +118,8 @@ class ThreadStatusUpdater {
// Set the id of the current thread.
void SetThreadID(uint64_t thread_id);
// Set the thread type of the current thread.
void SetThreadType(ThreadStatus::ThreadType ttype);
// Register the current thread for tracking.
void RegisterThread(ThreadStatus::ThreadType ttype, uint64_t thread_id);
// Update the column-family info of the current thread by setting
// its thread-local pointer of ThreadStateInfo to the correct entry.
@ -198,9 +198,15 @@ class ThreadStatusUpdater {
// The thread-local variable for storing thread status.
static __thread ThreadStatusData* thread_status_data_;
// Obtain the pointer to the thread status data. It also performs
// initialization when necessary.
ThreadStatusData* InitAndGet();
// Returns the pointer to the thread status data only when the
// thread status data is non-null and has enable_tracking == true.
ThreadStatusData* GetLocalThreadStatus();
// Directly returns the pointer to thread_status_data_ without
// checking whether enabling_tracking is true of not.
ThreadStatusData* Get() {
return thread_status_data_;
}
// The mutex that protects cf_info_map and db_key_map.
std::mutex thread_list_mutex_;

@ -15,14 +15,14 @@ __thread ThreadStatusUpdater*
ThreadStatusUtil::thread_updater_local_cache_ = nullptr;
__thread bool ThreadStatusUtil::thread_updater_initialized_ = false;
void ThreadStatusUtil::SetThreadType(
void ThreadStatusUtil::RegisterThread(
const Env* env, ThreadStatus::ThreadType thread_type) {
if (!MaybeInitThreadLocalUpdater(env)) {
return;
}
assert(thread_updater_local_cache_);
thread_updater_local_cache_->SetThreadID(env->GetThreadID());
thread_updater_local_cache_->SetThreadType(thread_type);
thread_updater_local_cache_->RegisterThread(
thread_type, env->GetThreadID());
}
void ThreadStatusUtil::UnregisterThread() {

@ -27,8 +27,8 @@ class ColumnFamilyData;
// all function calls to ThreadStatusUtil will be no-op.
class ThreadStatusUtil {
public:
// Set the thread type of the current thread.
static void SetThreadType(
// Register the current thread for tracking.
static void RegisterThread(
const Env* env, ThreadStatus::ThreadType thread_type);
// Unregister the current thread.

Loading…
Cancel
Save