Allow GetThreadList() to report the start time of the current operation.

Summary: Allow GetThreadList() to report the start time of the current operation.

Test Plan:
./db_bench --benchmarks=fillrandom --num=100000 --threads=40 \
  --max_background_compactions=10 --max_background_flushes=3 \
  --thread_status_per_interval=1000 --key_size=16 --value_size=1000 \
  --num_column_families=10

Sample output:
          ThreadID ThreadType                    cfName    Operation        OP_StartTime         State
   140338840797248   High Pri column_family_name_000003        Flush 2015/03/09-17:49:59
   140338844991552   High Pri column_family_name_000004        Flush 2015/03/09-17:49:59
   140338849185856    Low Pri
   140338983403584    Low Pri
   140339008569408    Low Pri
   140338861768768    Low Pri
   140338924683328    Low Pri
   140338899517504    Low Pri
   140338853380160    Low Pri
   140338882740288    Low Pri
   140338865963072   High Pri column_family_name_000006        Flush 2015/03/09-17:49:59
   140338954043456    Low Pri
   140338857574464    Low Pri

Reviewers: igor, rven, sdong

Reviewed By: sdong

Subscribers: lgalanis, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D34689
main
Yueh-Hsuan Chiang 10 years ago
parent 37921b4997
commit 89597bb66b
  1. 19
      db/db_bench.cc
  2. 11
      include/rocksdb/thread_status.h
  3. 11
      util/logging.cc
  4. 9
      util/thread_status_impl.cc
  5. 14
      util/thread_status_updater.cc
  6. 4
      util/thread_status_updater.h
  7. 9
      util/thread_status_util.cc

@ -924,15 +924,28 @@ class Stats {
std::vector<ThreadStatus> thread_list;
FLAGS_env->GetThreadList(&thread_list);
fprintf(stderr, "\n%18s %10s %25s %12s %12s\n",
"ThreadID", "ThreadType", "cfName", "Operation", "State");
fprintf(stderr, "\n%18s %10s %25s %12s %20s %13s %12s\n",
"ThreadID", "ThreadType", "cfName", "Operation",
"OP_StartTime ", "ElapsedTime", "State");
int64_t current_time = 0;
Env::Default()->GetCurrentTime(&current_time);
for (auto ts : thread_list) {
fprintf(stderr, "%18" PRIu64 " %10s %25s %12s %12s\n",
char elapsed_time[25];
if (ts.op_start_time != 0) {
AppendHumanMicros(
current_time - ts.op_start_time,
elapsed_time, 24);
} else {
elapsed_time[0] = 0;
}
fprintf(stderr, "%18" PRIu64 " %10s %25s %12s %20s %13s %12s\n",
ts.thread_id,
ThreadStatus::GetThreadTypeName(ts.thread_type).c_str(),
ts.cf_name.c_str(),
ThreadStatus::GetOperationName(ts.operation_type).c_str(),
ThreadStatus::TimeToString(ts.op_start_time).c_str(),
elapsed_time,
ThreadStatus::GetStateName(ts.state_type).c_str());
}
}

@ -62,11 +62,14 @@ struct ThreadStatus {
const std::string& _db_name,
const std::string& _cf_name,
const OperationType _operation_type,
const int64_t _op_start_time,
const StateType _state_type) :
thread_id(_id), thread_type(_thread_type),
db_name(_db_name),
cf_name(_cf_name),
operation_type(_operation_type), state_type(_state_type) {}
operation_type(_operation_type),
op_start_time(_op_start_time),
state_type(_state_type) {}
// An unique ID for the thread.
const uint64_t thread_id;
@ -88,6 +91,10 @@ struct ThreadStatus {
// The operation (high-level action) that the current thread is involved.
const OperationType operation_type;
// The start time of the current status in the form of seconds since the
// Epoch, 1970-01-01 00:00:00 (UTC).
const int64_t op_start_time;
// The state (lower-level action) that the current thread is involved.
const StateType state_type;
@ -99,6 +106,8 @@ struct ThreadStatus {
// Obtain the name of an operation given its type.
static const std::string& GetOperationName(OperationType op_type);
static const std::string TimeToString(int64_t op_start_time);
// Obtain the name of a state given its type.
static const std::string& GetStateName(StateType state_type);
};

@ -32,9 +32,18 @@ int AppendHumanMicros(uint64_t micros, char* output, int len) {
} else if (micros < 10000000) {
return snprintf(output, len, "%.3lf ms",
static_cast<double>(micros) / 1000);
} else {
} else if (micros < 1000000l * 60) {
return snprintf(output, len, "%.3lf sec",
static_cast<double>(micros) / 1000000);
} else if (micros < 1000000l * 60 * 60) {
return snprintf(output, len, "%02" PRIu64 ":%05.3f",
micros / 1000000 / 60,
static_cast<double>(micros % 60000000) / 1000000);
} else {
return snprintf(output, len, "%02" PRIu64 ":%02" PRIu64 ":%05.3f",
micros / 1000000 / 3600,
(micros / 1000000 / 60) % 60,
static_cast<double>(micros % 60000000) / 1000000);
}
}

@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.
//
#include "rocksdb/env.h"
#include "rocksdb/thread_status.h"
#include "util/thread_operation.h"
@ -27,6 +28,14 @@ const std::string& ThreadStatus::GetStateName(
return global_state_table[state_type].name;
}
const std::string ThreadStatus::TimeToString(
int64_t time) {
if (time == 0) {
return "";
}
return Env::Default()->TimeToString(time);
}
#else
const std::string& ThreadStatus::GetThreadTypeName(

@ -63,6 +63,15 @@ void ThreadStatusUpdater::SetThreadOperation(
data->operation_type.store(type, std::memory_order_relaxed);
}
void ThreadStatusUpdater::SetOperationStartTime(const int64_t start_time) {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return;
}
data->op_start_time.store(start_time, std::memory_order_relaxed);
}
void ThreadStatusUpdater::ClearThreadOperation() {
auto* data = InitAndGet();
if (!data->enable_tracking) {
@ -116,6 +125,7 @@ Status ThreadStatusUpdater::GetThreadList(
const std::string* cf_name = nullptr;
ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN;
ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN;
int64_t op_start_time = 0;
if (cf_info != nullptr) {
db_name = &cf_info->db_name;
cf_name = &cf_info->cf_name;
@ -123,6 +133,8 @@ Status ThreadStatusUpdater::GetThreadList(
std::memory_order_relaxed);
// display lower-level info only when higher-level info is available.
if (op_type != ThreadStatus::OP_UNKNOWN) {
op_start_time = thread_data->op_start_time.load(
std::memory_order_relaxed);
state_type = thread_data->state_type.load(
std::memory_order_relaxed);
}
@ -131,7 +143,7 @@ Status ThreadStatusUpdater::GetThreadList(
thread_data->thread_id, thread_type,
db_name ? *db_name : "",
cf_name ? *cf_name : "",
op_type, state_type);
op_type, op_start_time, state_type);
}
return Status::OK();

@ -68,6 +68,7 @@ struct ThreadStatusData {
thread_type.store(ThreadStatus::USER);
cf_key.store(nullptr);
operation_type.store(ThreadStatus::OP_UNKNOWN);
op_start_time.store(0);
state_type.store(ThreadStatus::STATE_UNKNOWN);
}
@ -85,6 +86,7 @@ struct ThreadStatusData {
std::atomic<ThreadStatus::ThreadType> thread_type;
std::atomic<const void*> cf_key;
std::atomic<ThreadStatus::OperationType> operation_type;
std::atomic<int64_t> op_start_time;
std::atomic<ThreadStatus::StateType> state_type;
#endif // ROCKSDB_USING_THREAD_STATUS
};
@ -124,6 +126,8 @@ class ThreadStatusUpdater {
// Update the thread operation of the current thread.
void SetThreadOperation(const ThreadStatus::OperationType type);
void SetOperationStartTime(const int64_t start_time);
// Clear thread operation of the current thread.
void ClearThreadOperation();

@ -54,6 +54,15 @@ void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType op) {
return;
}
if (op != ThreadStatus::OP_UNKNOWN) {
int64_t current_time = 0;
Env::Default()->GetCurrentTime(&current_time);
thread_updater_local_cache_->SetOperationStartTime(current_time);
} else {
// TDOO(yhchiang): we could report the time when we set operation to
// OP_UNKNOWN once the whole instrumentation has been done.
thread_updater_local_cache_->SetOperationStartTime(0);
}
thread_updater_local_cache_->SetThreadOperation(op);
}

Loading…
Cancel
Save