Add Env::GetThreadID(), which returns the ID of the current thread.

Summary:
Add Env::GetThreadID(), which returns the ID of the current thread.

In addition, make GetThreadList() and InfoLog use same unique ID for the same thread.

Test Plan:
db_test
listener_test

Reviewers: igor, rven, IslamAbdelRahman, kradhakrishnan, sdong

Reviewed By: sdong

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D39735
main
Yueh-Hsuan Chiang 10 years ago
parent 73faa3d41d
commit 3eddd1abe9
  1. 6
      db/db_impl.cc
  2. 59
      db/listener_test.cc
  3. 8
      hdfs/env_hdfs.h
  4. 13
      include/rocksdb/env.h
  5. 7
      util/env.cc
  6. 4
      util/env_posix.cc
  7. 20
      util/thread_status_updater.cc
  8. 9
      util/thread_status_updater.h
  9. 15
      util/thread_status_util.cc
  10. 2
      util/thread_status_util.h

@ -285,7 +285,6 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
}
DBImpl::~DBImpl() {
EraseThreadStatusDbInfo();
mutex_.Lock();
if (!shutting_down_.load(std::memory_order_acquire) && flush_on_destroy_) {
@ -316,6 +315,7 @@ DBImpl::~DBImpl() {
while (bg_compaction_scheduled_ || bg_flush_scheduled_) {
bg_cv_.Wait();
}
EraseThreadStatusDbInfo();
flush_scheduler_.Clear();
while (!flush_queue_.empty()) {
@ -1310,7 +1310,7 @@ void DBImpl::NotifyOnFlushCompleted(
// go to L0 in the future.
info.file_path = MakeTableFileName(db_options_.db_paths[0].path,
file_number);
info.thread_id = ThreadStatusUtil::GetThreadID();
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
@ -1621,7 +1621,7 @@ void DBImpl::NotifyOnCompactionCompleted(
CompactionJobInfo info;
info.cf_name = cfd->GetName();
info.status = st;
info.thread_id = ThreadStatusUtil::GetThreadID();
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
info.base_input_level = c->start_level();
info.output_level = c->output_level();

@ -156,6 +156,8 @@ class TestCompactionListener : public EventListener {
compacted_dbs_.push_back(db);
ASSERT_GT(ci.input_files.size(), 0U);
ASSERT_GT(ci.output_files.size(), 0U);
ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id);
ASSERT_GT(ci.thread_id, 0U);
}
std::vector<DB*> compacted_dbs_;
@ -177,7 +179,9 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
options.max_bytes_for_level_base = options.target_file_size_base * 2;
options.max_bytes_for_level_multiplier = 2;
options.compression = kNoCompression;
#if ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
options.level0_file_num_compaction_trigger = kNumL0Files;
TestCompactionListener* listener = new TestCompactionListener();
@ -211,6 +215,11 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
// This simple Listener can only handle one flush at a time.
class TestFlushListener : public EventListener {
public:
explicit TestFlushListener(Env* env) :
slowdown_count(0),
stop_count(0),
db_closed(false),
env_(env) {}
void OnTableFileCreated(
const TableFileCreationInfo& info) override {
// remember the info for later checking the FlushJobInfo.
@ -224,6 +233,25 @@ class TestFlushListener : public EventListener {
ASSERT_GT(info.table_properties.raw_value_size, 0U);
ASSERT_GT(info.table_properties.num_data_blocks, 0U);
ASSERT_GT(info.table_properties.num_entries, 0U);
#if ROCKSDB_USING_THREAD_STATUS
// Verify the id of the current thread that created this table
// file matches the id of any active flush or compaction thread.
uint64_t thread_id = env_->GetThreadID();
std::vector<ThreadStatus> thread_list;
ASSERT_OK(env_->GetThreadList(&thread_list));
bool found_match = false;
for (auto thread_status : thread_list) {
if (thread_status.operation_type == ThreadStatus::OP_FLUSH ||
thread_status.operation_type == ThreadStatus::OP_COMPACTION) {
if (thread_id == thread_status.thread_id) {
found_match = true;
break;
}
}
}
ASSERT_TRUE(found_match);
#endif // ROCKSDB_USING_THREAD_STATUS
}
void OnFlushCompleted(
@ -241,19 +269,29 @@ class TestFlushListener : public EventListener {
ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
ASSERT_GT(info.thread_id, 0U);
}
std::vector<std::string> flushed_column_family_names_;
std::vector<DB*> flushed_dbs_;
int slowdown_count;
int stop_count;
bool db_closing;
std::atomic_bool db_closed;
TableFileCreationInfo prev_fc_info_;
protected:
Env* env_;
};
TEST_F(EventListenerTest, OnSingleDBFlushTest) {
Options options;
options.write_buffer_size = 100000;
TestFlushListener* listener = new TestFlushListener();
#if ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
TestFlushListener* listener = new TestFlushListener(options.env);
options.listeners.emplace_back(listener);
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
@ -284,7 +322,10 @@ TEST_F(EventListenerTest, OnSingleDBFlushTest) {
TEST_F(EventListenerTest, MultiCF) {
Options options;
options.write_buffer_size = 100000;
TestFlushListener* listener = new TestFlushListener();
#if ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
TestFlushListener* listener = new TestFlushListener(options.env);
options.listeners.emplace_back(listener);
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
@ -312,18 +353,21 @@ TEST_F(EventListenerTest, MultiCF) {
}
TEST_F(EventListenerTest, MultiDBMultiListeners) {
Options options;
#if ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
std::vector<TestFlushListener*> listeners;
const int kNumDBs = 5;
const int kNumListeners = 10;
for (int i = 0; i < kNumListeners; ++i) {
listeners.emplace_back(new TestFlushListener());
listeners.emplace_back(new TestFlushListener(options.env));
}
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
"nikitich", "alyosha", "popovich"};
Options options;
options.create_if_missing = true;
for (int i = 0; i < kNumListeners; ++i) {
options.listeners.emplace_back(listeners[i]);
@ -374,6 +418,7 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) {
}
}
for (auto handles : vec_handles) {
for (auto h : handles) {
delete h;
@ -389,7 +434,10 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) {
TEST_F(EventListenerTest, DisableBGCompaction) {
Options options;
TestFlushListener* listener = new TestFlushListener();
#if ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
TestFlushListener* listener = new TestFlushListener(options.env);
const int kSlowdownTrigger = 5;
const int kStopTrigger = 10;
options.level0_slowdown_writes_trigger = kSlowdownTrigger;
@ -409,6 +457,7 @@ TEST_F(EventListenerTest, DisableBGCompaction) {
// keep writing until writes are forced to stop.
for (int i = 0; static_cast<int>(cf_meta.file_count) < kStopTrigger; ++i) {
Put(1, ToString(i), std::string(100000, 'x'), wopts);
db_->Flush(FlushOptions());
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
}
ASSERT_GE(listener->slowdown_count, kStopTrigger - kSlowdownTrigger);

@ -164,6 +164,10 @@ class HdfsEnv : public Env {
return (uint64_t)pthread_self();
}
virtual uint64_t GetThreadID() const override {
return HdfsEnv::gettid();
}
private:
std::string fsname_; // string of the form "hdfs://hostname:port/"
hdfsFS fileSys_; // a single FileSystem object for all files
@ -360,6 +364,10 @@ class HdfsEnv : public Env {
virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
}
virtual std::string TimeToString(uint64_t number) override { return ""; }
virtual uint64_t GetThreadID() const override {
return 0;
}
};
}

@ -17,12 +17,12 @@
#ifndef STORAGE_ROCKSDB_INCLUDE_ENV_H_
#define STORAGE_ROCKSDB_INCLUDE_ENV_H_
#include <stdint.h>
#include <cstdarg>
#include <string>
#include <memory>
#include <limits>
#include <memory>
#include <string>
#include <vector>
#include <stdint.h>
#include "rocksdb/status.h"
#include "rocksdb/thread_status.h"
@ -320,6 +320,9 @@ class Env {
return thread_status_updater_;
}
// Returns the ID of the current thread.
virtual uint64_t GetThreadID() const;
protected:
// The pointer to an internal structure that will update the
// status of each thread.
@ -876,6 +879,10 @@ class EnvWrapper : public Env {
return target_->GetThreadStatusUpdater();
}
uint64_t GetThreadID() const override {
return target_->GetThreadID();
}
private:
Env* target_;
};

@ -9,7 +9,9 @@
#include "rocksdb/env.h"
#include <thread>
#include <sys/time.h>
#include "rocksdb/options.h"
#include "util/arena.h"
#include "util/autovector.h"
@ -19,6 +21,11 @@ namespace rocksdb {
Env::~Env() {
}
uint64_t Env::GetThreadID() const {
std::hash<std::thread::id> hasher;
return hasher(std::this_thread::get_id());
}
SequentialFile::~SequentialFile() {
}

@ -1435,6 +1435,10 @@ class PosixEnv : public Env {
return gettid(tid);
}
virtual uint64_t GetThreadID() const {
return gettid(pthread_self());
}
virtual Status NewLogger(const std::string& fname,
shared_ptr<Logger>* result) override {
FILE* f;

@ -15,11 +15,6 @@ namespace rocksdb {
__thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr;
uint64_t ThreadStatusUpdater::GetThreadID() {
auto* data = InitAndGet();
return data->thread_id;
}
void ThreadStatusUpdater::UnregisterThread() {
if (thread_status_data_ != nullptr) {
std::lock_guard<std::mutex> lck(thread_list_mutex_);
@ -29,6 +24,11 @@ void ThreadStatusUpdater::UnregisterThread() {
}
}
void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) {
auto* data = InitAndGet();
data->thread_id.store(thread_id, std::memory_order_relaxed);
}
void ThreadStatusUpdater::SetThreadType(
ThreadStatus::ThreadType ttype) {
auto* data = InitAndGet();
@ -173,6 +173,8 @@ Status ThreadStatusUpdater::GetThreadList(
std::lock_guard<std::mutex> lck(thread_list_mutex_);
for (auto* thread_data : thread_data_set_) {
assert(thread_data);
auto thread_id = thread_data->thread_id.load(
std::memory_order_relaxed);
auto thread_type = thread_data->thread_type.load(
std::memory_order_relaxed);
// Since any change to cf_info_map requires thread_list_mutex,
@ -181,7 +183,6 @@ Status ThreadStatusUpdater::GetThreadList(
auto cf_key = thread_data->cf_key.load(
std::memory_order_relaxed);
auto iter = cf_info_map_.find(cf_key);
assert(cf_key == 0 || iter != cf_info_map_.end());
auto* cf_info = iter != cf_info_map_.end() ?
iter->second.get() : nullptr;
const std::string* db_name = nullptr;
@ -211,7 +212,7 @@ Status ThreadStatusUpdater::GetThreadList(
}
}
thread_list->emplace_back(
thread_data->thread_id, thread_type,
thread_id, thread_type,
db_name ? *db_name : "",
cf_name ? *cf_name : "",
op_type, op_elapsed_micros, op_stage, op_props,
@ -224,8 +225,6 @@ Status ThreadStatusUpdater::GetThreadList(
ThreadStatusData* ThreadStatusUpdater::InitAndGet() {
if (UNLIKELY(thread_status_data_ == nullptr)) {
thread_status_data_ = new ThreadStatusData();
thread_status_data_->thread_id = reinterpret_cast<uint64_t>(
thread_status_data_);
std::lock_guard<std::mutex> lck(thread_list_mutex_);
thread_data_set_.insert(thread_status_data_);
}
@ -297,8 +296,7 @@ void ThreadStatusUpdater::UnregisterThread() {
void ThreadStatusUpdater::ResetThreadStatus() {
}
uint64_t ThreadStatusUpdater::GetThreadID() {
return 0;
void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) {
}
void ThreadStatusUpdater::SetThreadType(

@ -64,7 +64,8 @@ struct ConstantColumnFamilyInfo {
// status of a thread using a set of atomic pointers.
struct ThreadStatusData {
#if ROCKSDB_USING_THREAD_STATUS
explicit ThreadStatusData() : thread_id(0), enable_tracking(false) {
explicit ThreadStatusData() : enable_tracking(false) {
thread_id.store(0);
thread_type.store(ThreadStatus::USER);
cf_key.store(nullptr);
operation_type.store(ThreadStatus::OP_UNKNOWN);
@ -72,8 +73,6 @@ struct ThreadStatusData {
state_type.store(ThreadStatus::STATE_UNKNOWN);
}
uint64_t thread_id;
// A flag to indicate whether the thread tracking is enabled
// in the current thread. This value will be updated based on whether
// the associated Options::enable_thread_tracking is set to true
@ -83,6 +82,7 @@ struct ThreadStatusData {
// will be no-op.
bool enable_tracking;
std::atomic<uint64_t> thread_id;
std::atomic<ThreadStatus::ThreadType> thread_type;
std::atomic<const void*> cf_key;
std::atomic<ThreadStatus::OperationType> operation_type;
@ -115,7 +115,8 @@ class ThreadStatusUpdater {
// ColumnFamilyInfoKey, ThreadOperation, and ThreadState.
void ResetThreadStatus();
uint64_t GetThreadID();
// Set the id of the current thread.
void SetThreadID(uint64_t thread_id);
// Set the thread type of the current thread.
void SetThreadType(ThreadStatus::ThreadType ttype);

@ -21,6 +21,7 @@ void ThreadStatusUtil::SetThreadType(
return;
}
assert(thread_updater_local_cache_);
thread_updater_local_cache_->SetThreadID(env->GetThreadID());
thread_updater_local_cache_->SetThreadType(thread_type);
}
@ -32,16 +33,6 @@ void ThreadStatusUtil::UnregisterThread() {
}
}
uint64_t ThreadStatusUtil::GetThreadID() {
if (thread_updater_local_cache_ == nullptr) {
// thread_updater_local_cache_ must be set in SetColumnFamily
// or other ThreadStatusUtil functions.
return 0;
}
return thread_updater_local_cache_->GetThreadID();
}
void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) {
if (!MaybeInitThreadLocalUpdater(cfd->ioptions()->env)) {
return;
@ -180,10 +171,6 @@ bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) {
return false;
}
uint64_t ThreadStatusUtil::GetThreadID() {
return 0;
}
void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) {
}

@ -27,8 +27,6 @@ class ColumnFamilyData;
// all function calls to ThreadStatusUtil will be no-op.
class ThreadStatusUtil {
public:
static uint64_t GetThreadID();
// Set the thread type of the current thread.
static void SetThreadType(
const Env* env, ThreadStatus::ThreadType thread_type);

Loading…
Cancel
Save