From c594b0e89db209d72914ba9d30351acecca59fa3 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Fri, 13 Mar 2015 10:45:40 -0700 Subject: [PATCH] Allow GetThreadList() to report operation stage. Summary: Allow GetThreadList() to report operation stage. Test Plan: ./thread_list_test ./db_bench --benchmarks=fillrandom --num=100000 --threads=40 \ --max_background_compactions=10 --max_background_flushes=3 \ --thread_status_per_interval=1000 --key_size=16 --value_size=1000 \ --num_column_families=10 export ROCKSDB_TESTS=ThreadStatus ./db_test Sample output ThreadID ThreadType cfName Operation OP_StartTime ElapsedTime Stage State 140116265861184 Low Pri 140116270055488 Low Pri 140116274249792 High Pri column_family_name_000005 Flush 2015/03/10-14:58:11 0 us FlushJob::WriteLevel0Table 140116400078912 Low Pri column_family_name_000004 Compaction 2015/03/10-14:58:11 0 us CompactionJob::FinishCompactionOutputFile 140116358135872 Low Pri column_family_name_000006 Compaction 2015/03/10-14:58:10 1 us CompactionJob::FinishCompactionOutputFile 140116341358656 Low Pri 140116295221312 High Pri default Flush 2015/03/10-14:58:11 0 us FlushJob::WriteLevel0Table 140116324581440 Low Pri column_family_name_000009 Compaction 2015/03/10-14:58:11 0 us CompactionJob::ProcessKeyValueCompaction 140116278444096 Low Pri 140116299415616 Low Pri column_family_name_000008 Compaction 2015/03/10-14:58:11 0 us CompactionJob::FinishCompactionOutputFile 140116291027008 High Pri column_family_name_000001 Flush 2015/03/10-14:58:11 0 us FlushJob::WriteLevel0Table 140116286832704 Low Pri column_family_name_000002 Compaction 2015/03/10-14:58:11 0 us CompactionJob::FinishCompactionOutputFile 140116282638400 Low Pri Reviewers: rven, igor, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D34683 --- db/compaction_job.cc | 30 +++++++++++++++++++++++------ db/compaction_job.h | 2 +- db/db_bench.cc | 7 ++++--- db/db_test.cc | 11 +++++++---- db/flush_job.cc | 22 ++++++++++++++------- db/flush_job.h | 3 ++- db/memtable_list.cc | 7 +++++++ include/rocksdb/thread_status.h | 26 +++++++++++++++++++++++++ util/thread_list_test.cc | 9 ++++++++- util/thread_operation.h | 34 +++++++++++++++++++++++++++++++++ util/thread_status_impl.cc | 11 +++++++++++ util/thread_status_updater.cc | 20 ++++++++++++++++++- util/thread_status_updater.h | 6 ++++++ util/thread_status_util.cc | 27 ++++++++++++++++++++++++++ util/thread_status_util.h | 16 ++++++++++++++++ 15 files changed, 207 insertions(+), 24 deletions(-) diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 6ae8a42ba..f67c56f5c 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -225,9 +225,22 @@ CompactionJob::CompactionJob( snapshots_(snapshots), is_snapshot_supported_(is_snapshot_supported), table_cache_(std::move(table_cache)), - yield_callback_(std::move(yield_callback)) {} + yield_callback_(std::move(yield_callback)) { + ThreadStatusUtil::SetColumnFamily( + compact_->compaction->column_family_data()); + ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); + TEST_SYNC_POINT("CompactionJob::CompationJob()"); +} + +CompactionJob::~CompactionJob() { + assert(compact_ == nullptr); + TEST_SYNC_POINT("CompactionJob::~CompactionJob()"); + ThreadStatusUtil::ResetThreadStatus(); +} void CompactionJob::Prepare() { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_COMPACTION_PREPARE); compact_->CleanupBatchBuffer(); compact_->CleanupMergedBuffer(); @@ -275,11 +288,10 @@ void CompactionJob::Prepare() { } Status CompactionJob::Run() { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_COMPACTION_RUN); log_buffer_->FlushBufferToLog(); ColumnFamilyData* cfd = compact_->compaction->column_family_data(); - ThreadStatusUtil::SetColumnFamily(cfd); - ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); - TEST_SYNC_POINT("CompactionJob::Run:Start"); const uint64_t start_micros = env_->NowMicros(); std::unique_ptr input( @@ -469,12 +481,12 @@ Status CompactionJob::Run() { RecordCompactionIOStats(); LogFlush(db_options_.info_log); - TEST_SYNC_POINT("CompactionJob::Run:End"); - ThreadStatusUtil::ResetThreadStatus(); return status; } void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_COMPACTION_INSTALL); db_mutex->AssertHeld(); ColumnFamilyData* cfd = compact_->compaction->column_family_data(); cfd->internal_stats()->AddCompactionStats( @@ -511,6 +523,8 @@ void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) { Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input, bool is_compaction_v2) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_COMPACTION_PROCESS_KV); size_t combined_idx = 0; Status status; std::string compaction_filter_value; @@ -849,6 +863,8 @@ void CompactionJob::CallCompactionFilterV2( if (compact_ == nullptr || compaction_filter_v2 == nullptr) { return; } + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_COMPACTION_FILTER_V2); // Assemble slice vectors for user keys and existing values. // We also keep track of our parsed internal key structs because @@ -907,6 +923,8 @@ void CompactionJob::CallCompactionFilterV2( } Status CompactionJob::FinishCompactionOutputFile(Iterator* input) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_COMPACTION_SYNC_FILE); assert(compact_ != nullptr); assert(compact_->outfile); assert(compact_->builder != nullptr); diff --git a/db/compaction_job.h b/db/compaction_job.h index 2efcf86fc..0db7b6f06 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -62,7 +62,7 @@ class CompactionJob { bool is_snapshot_supported, std::shared_ptr table_cache, std::function yield_callback); - ~CompactionJob() { assert(compact_ == nullptr); } + ~CompactionJob(); // no copy/move CompactionJob(CompactionJob&& job) = delete; diff --git a/db/db_bench.cc b/db/db_bench.cc index a79574a26..efa9b5944 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -926,9 +926,9 @@ class Stats { std::vector thread_list; FLAGS_env->GetThreadList(&thread_list); - fprintf(stderr, "\n%18s %10s %25s %12s %20s %13s %12s\n", + fprintf(stderr, "\n%18s %10s %25s %12s %20s %13s %45s %12s\n", "ThreadID", "ThreadType", "cfName", "Operation", - "OP_StartTime ", "ElapsedTime", "State"); + "OP_StartTime ", "ElapsedTime", "Stage", "State"); int64_t current_time = 0; Env::Default()->GetCurrentTime(¤t_time); @@ -941,13 +941,14 @@ class Stats { } else { elapsed_time[0] = 0; } - fprintf(stderr, "%18" PRIu64 " %10s %25s %12s %20s %13s %12s\n", + fprintf(stderr, "%18" PRIu64 " %10s %25s %12s %20s %13s %45s %12s\n", ts.thread_id, ThreadStatus::GetThreadTypeName(ts.thread_type).c_str(), ts.cf_name.c_str(), ThreadStatus::GetOperationName(ts.operation_type).c_str(), ThreadStatus::TimeToString(ts.op_start_time).c_str(), elapsed_time, + ThreadStatus::GetOperationStageName(ts.operation_stage).c_str(), ThreadStatus::GetStateName(ts.state_type).c_str()); } } diff --git a/db/db_test.cc b/db/db_test.cc index 877855610..b6b67b4fe 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -10107,8 +10107,8 @@ TEST(DBTest, ThreadStatusFlush) { options = CurrentOptions(options); rocksdb::SyncPoint::GetInstance()->LoadDependency({ - {"FlushJob::Run:Start", "DBTest::ThreadStatusFlush:1"}, - {"DBTest::ThreadStatusFlush:2", "FlushJob::Run:End"}, + {"FlushJob::FlushJob()", "DBTest::ThreadStatusFlush:1"}, + {"DBTest::ThreadStatusFlush:2", "FlushJob::~FlushJob()"}, }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); @@ -10151,8 +10151,10 @@ TEST(DBTest, ThreadStatusSingleCompaction) { options.level0_file_num_compaction_trigger = kNumL0Files; rocksdb::SyncPoint::GetInstance()->LoadDependency({ - {"CompactionJob::Run:Start", "DBTest::ThreadStatusSingleCompaction:1"}, - {"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run:End"}, + {"CompactionJob::CompationJob()", + "DBTest::ThreadStatusSingleCompaction:1"}, + {"DBTest::ThreadStatusSingleCompaction:2", + "CompactionJob::Run:~CompactionJob()"}, }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); @@ -10175,6 +10177,7 @@ TEST(DBTest, ThreadStatusSingleCompaction) { // If thread tracking is not enabled, compaction count should be 0. VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 0); } + // TODO(yhchiang): adding assert to verify each compaction stage. TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2"); // repeat the test with disabling thread tracking. diff --git a/db/flush_job.cc b/db/flush_job.cc index 44be45ab6..ab44aaf38 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -79,9 +79,21 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, output_file_directory_(output_file_directory), output_compression_(output_compression), stats_(stats), - event_logger_(event_logger) {} + event_logger_(event_logger) { + // Update the thread status to indicate flush. + ThreadStatusUtil::SetColumnFamily(cfd_); + ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH); + TEST_SYNC_POINT("FlushJob::FlushJob()"); +} + +FlushJob::~FlushJob() { + TEST_SYNC_POINT("FlushJob::~FlushJob()"); + ThreadStatusUtil::ResetThreadStatus(); +} Status FlushJob::Run(uint64_t* file_number) { + AutoThreadOperationStageUpdater stage_run( + ThreadStatus::STAGE_FLUSH_RUN); // Save the contents of the earliest memtable as a new Table uint64_t fn; autovector mems; @@ -92,10 +104,6 @@ Status FlushJob::Run(uint64_t* file_number) { return Status::OK(); } - // Update the thread status to indicate flush. - ThreadStatusUtil::SetColumnFamily(cfd_); - ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH); - TEST_SYNC_POINT("FlushJob::Run:Start"); // entries mems are (implicitly) sorted in ascending order by their created // time. We will use the first memtable's `edit` to keep the meta info for @@ -130,13 +138,13 @@ Status FlushJob::Run(uint64_t* file_number) { *file_number = fn; } - TEST_SYNC_POINT("FlushJob::Run:End"); - ThreadStatusUtil::ResetThreadStatus(); return s; } Status FlushJob::WriteLevel0Table(const autovector& mems, VersionEdit* edit, uint64_t* filenumber) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_FLUSH_WRITE_L0); db_mutex_->AssertHeld(); const uint64_t start_micros = db_options_.env->NowMicros(); FileMetaData meta; diff --git a/db/flush_job.h b/db/flush_job.h index 1526d673b..0534466d2 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -61,7 +61,8 @@ class FlushJob { LogBuffer* log_buffer, Directory* db_directory, Directory* output_file_directory, CompressionType output_compression, Statistics* stats, EventLogger* event_logger); - ~FlushJob() {} + + ~FlushJob(); Status Run(uint64_t* file_number = nullptr); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 44c069dd5..73c1e3d59 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -19,6 +19,7 @@ #include "table/merger.h" #include "util/coding.h" #include "util/log_buffer.h" +#include "util/thread_status_util.h" namespace rocksdb { @@ -127,6 +128,8 @@ bool MemTableList::IsFlushPending() const { // Returns the memtables that need to be flushed. void MemTableList::PickMemtablesToFlush(autovector* ret) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH); const auto& memlist = current_->memlist_; for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { MemTable* m = *it; @@ -145,6 +148,8 @@ void MemTableList::PickMemtablesToFlush(autovector* ret) { void MemTableList::RollbackMemtableFlush(const autovector& mems, uint64_t file_number) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_MEMTABLE_ROLLBACK); assert(!mems.empty()); // If the flush was not successful, then just reset state. @@ -167,6 +172,8 @@ Status MemTableList::InstallMemtableFlushResults( const autovector& mems, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); mu->AssertHeld(); // flush was sucessful diff --git a/include/rocksdb/thread_status.h b/include/rocksdb/thread_status.h index 254f924f0..c5e4d9dd1 100644 --- a/include/rocksdb/thread_status.h +++ b/include/rocksdb/thread_status.h @@ -48,6 +48,22 @@ struct ThreadStatus { NUM_OP_TYPES }; + enum OperationStage : int { + STAGE_UNKNOWN = 0, + STAGE_FLUSH_RUN, + STAGE_FLUSH_WRITE_L0, + STAGE_COMPACTION_PREPARE, + STAGE_COMPACTION_RUN, + STAGE_COMPACTION_PROCESS_KV, + STAGE_COMPACTION_FILTER_V2, + STAGE_COMPACTION_INSTALL, + STAGE_COMPACTION_SYNC_FILE, + STAGE_PICK_MEMTABLES_TO_FLUSH, + STAGE_MEMTABLE_ROLLBACK, + STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS, + NUM_OP_STAGES + }; + // The type used to refer to a thread state. // A state describes lower-level action of a thread // such as reading / writing a file or waiting for a mutex. @@ -63,12 +79,14 @@ struct ThreadStatus { const std::string& _cf_name, const OperationType _operation_type, const int64_t _op_start_time, + const OperationStage _operation_stage, const StateType _state_type) : thread_id(_id), thread_type(_thread_type), db_name(_db_name), cf_name(_cf_name), operation_type(_operation_type), op_start_time(_op_start_time), + operation_stage(_operation_stage), state_type(_state_type) {} // An unique ID for the thread. @@ -95,6 +113,10 @@ struct ThreadStatus { // Epoch, 1970-01-01 00:00:00 (UTC). const int64_t op_start_time; + // An integer showing the current stage where the thread is involved + // in the current operation. + const OperationStage operation_stage; + // The state (lower-level action) that the current thread is involved. const StateType state_type; @@ -108,6 +130,10 @@ struct ThreadStatus { static const std::string TimeToString(int64_t op_start_time); + // Obtain a human-readable string describing the specified operation stage. + static const std::string& GetOperationStageName( + OperationStage stage); + // Obtain the name of a state given its type. static const std::string& GetStateName(StateType state_type); }; diff --git a/util/thread_list_test.cc b/util/thread_list_test.cc index 737b78fe3..7a1bd22b8 100644 --- a/util/thread_list_test.cc +++ b/util/thread_list_test.cc @@ -94,7 +94,7 @@ class ThreadListTest { } }; -TEST(ThreadListTest, EventTables) { +TEST(ThreadListTest, GlobalTables) { // verify the global tables for operations and states are properly indexed. for (int type = 0; type != ThreadStatus::NUM_OP_TYPES; ++type) { ASSERT_EQ(global_operation_table[type].type, type); @@ -109,6 +109,13 @@ TEST(ThreadListTest, EventTables) { ThreadStatus::GetStateName( ThreadStatus::StateType(type))); } + + for (int stage = 0; stage != ThreadStatus::NUM_OP_STAGES; ++stage) { + ASSERT_EQ(global_op_stage_table[stage].stage, stage); + ASSERT_EQ(global_op_stage_table[stage].name, + ThreadStatus::GetOperationStageName( + ThreadStatus::OperationStage(stage))); + } } TEST(ThreadListTest, SimpleColumnFamilyInfoTest) { diff --git a/util/thread_operation.h b/util/thread_operation.h index 45521e227..a35a70780 100644 --- a/util/thread_operation.h +++ b/util/thread_operation.h @@ -41,6 +41,40 @@ static OperationInfo global_operation_table[] = { {ThreadStatus::OP_FLUSH, "Flush"} }; +struct OperationStageInfo { + const ThreadStatus::OperationStage stage; + const std::string name; +}; + +// A table maintains the mapping from stage type to stage string. +// Note that the string must be changed accordingly when the +// associated function name changed. +static OperationStageInfo global_op_stage_table[] = { + {ThreadStatus::STAGE_UNKNOWN, ""}, + {ThreadStatus::STAGE_FLUSH_RUN, + "FlushJob::Run"}, + {ThreadStatus::STAGE_FLUSH_WRITE_L0, + "FlushJob::WriteLevel0Table"}, + {ThreadStatus::STAGE_COMPACTION_PREPARE, + "CompactionJob::Prepare"}, + {ThreadStatus::STAGE_COMPACTION_RUN, + "CompactionJob::Run"}, + {ThreadStatus::STAGE_COMPACTION_PROCESS_KV, + "CompactionJob::ProcessKeyValueCompaction"}, + {ThreadStatus::STAGE_COMPACTION_FILTER_V2, + "CompactionJob::CallCompactionFilterV2"}, + {ThreadStatus::STAGE_COMPACTION_INSTALL, + "CompactionJob::Install"}, + {ThreadStatus::STAGE_COMPACTION_SYNC_FILE, + "CompactionJob::FinishCompactionOutputFile"}, + {ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH, + "MemTableList::PickMemtablesToFlush"}, + {ThreadStatus::STAGE_MEMTABLE_ROLLBACK, + "MemTableList::RollbackMemtableFlush"}, + {ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS, + "MemTableList::InstallMemtableFlushResults"}, +}; + // The structure that describes a state. struct StateInfo { const ThreadStatus::StateType type; diff --git a/util/thread_status_impl.cc b/util/thread_status_impl.cc index 007031106..6959f008e 100644 --- a/util/thread_status_impl.cc +++ b/util/thread_status_impl.cc @@ -23,6 +23,11 @@ const std::string& ThreadStatus::GetOperationName( return global_operation_table[op_type].name; } +const std::string& ThreadStatus::GetOperationStageName( + ThreadStatus::OperationStage stage) { + return global_op_stage_table[stage].name; +} + const std::string& ThreadStatus::GetStateName( ThreadStatus::StateType state_type) { return global_state_table[state_type].name; @@ -50,6 +55,12 @@ const std::string& ThreadStatus::GetOperationName( return dummy_str; } +const std::string& ThreadStatus::GetOperationStageName( + ThreadStatus::OperationStage stage) { + static std::string dummy_str = ""; + return dummy_str; +} + const std::string& ThreadStatus::GetStateName( ThreadStatus::StateType state_type) { static std::string dummy_str = ""; diff --git a/util/thread_status_updater.cc b/util/thread_status_updater.cc index 5fdfe24c0..401e01496 100644 --- a/util/thread_status_updater.cc +++ b/util/thread_status_updater.cc @@ -60,6 +60,8 @@ void ThreadStatusUpdater::SetThreadOperation( assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); return; } + data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN, + std::memory_order_relaxed); data->operation_type.store(type, std::memory_order_relaxed); } @@ -78,10 +80,23 @@ void ThreadStatusUpdater::ClearThreadOperation() { assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); return; } + data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN, + std::memory_order_relaxed); data->operation_type.store( ThreadStatus::OP_UNKNOWN, std::memory_order_relaxed); } +ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage( + ThreadStatus::OperationStage stage) { + auto* data = InitAndGet(); + if (!data->enable_tracking) { + assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + return ThreadStatus::STAGE_UNKNOWN; + } + return data->operation_stage.exchange( + stage, std::memory_order_relaxed); +} + void ThreadStatusUpdater::SetThreadState( const ThreadStatus::StateType type) { auto* data = InitAndGet(); @@ -124,6 +139,7 @@ Status ThreadStatusUpdater::GetThreadList( const std::string* db_name = nullptr; const std::string* cf_name = nullptr; ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN; + ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN; ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN; int64_t op_start_time = 0; if (cf_info != nullptr) { @@ -135,6 +151,8 @@ Status ThreadStatusUpdater::GetThreadList( if (op_type != ThreadStatus::OP_UNKNOWN) { op_start_time = thread_data->op_start_time.load( std::memory_order_relaxed); + op_stage = thread_data->operation_stage.load( + std::memory_order_relaxed); state_type = thread_data->state_type.load( std::memory_order_relaxed); } @@ -143,7 +161,7 @@ Status ThreadStatusUpdater::GetThreadList( thread_data->thread_id, thread_type, db_name ? *db_name : "", cf_name ? *cf_name : "", - op_type, op_start_time, state_type); + op_type, op_start_time, op_stage, state_type); } return Status::OK(); diff --git a/util/thread_status_updater.h b/util/thread_status_updater.h index 089aecc1f..4f657889c 100644 --- a/util/thread_status_updater.h +++ b/util/thread_status_updater.h @@ -87,6 +87,7 @@ struct ThreadStatusData { std::atomic cf_key; std::atomic operation_type; std::atomic op_start_time; + std::atomic operation_stage; std::atomic state_type; #endif // ROCKSDB_USING_THREAD_STATUS }; @@ -126,8 +127,13 @@ class ThreadStatusUpdater { // Update the thread operation of the current thread. void SetThreadOperation(const ThreadStatus::OperationType type); + // Set the start time of an operation. void SetOperationStartTime(const int64_t start_time); + // Update the thread operation stage of the current thread. + ThreadStatus::OperationStage SetThreadOperationStage( + const ThreadStatus::OperationStage stage); + // Clear thread operation of the current thread. void ClearThreadOperation(); diff --git a/util/thread_status_util.cc b/util/thread_status_util.cc index 818009627..3b98d6bca 100644 --- a/util/thread_status_util.cc +++ b/util/thread_status_util.cc @@ -66,6 +66,17 @@ void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType op) { thread_updater_local_cache_->SetThreadOperation(op); } +ThreadStatus::OperationStage ThreadStatusUtil::SetThreadOperationStage( + ThreadStatus::OperationStage stage) { + if (thread_updater_local_cache_ == nullptr) { + // thread_updater_local_cache_ must be set in SetColumnFamily + // or other ThreadStatusUtil functions. + return ThreadStatus::STAGE_UNKNOWN; + } + + return thread_updater_local_cache_->SetThreadOperationStage(stage); +} + void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) { if (thread_updater_local_cache_ == nullptr) { // thread_updater_local_cache_ must be set in SetColumnFamily @@ -118,6 +129,15 @@ bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) { return (thread_updater_local_cache_ != nullptr); } +AutoThreadOperationStageUpdater::AutoThreadOperationStageUpdater( + ThreadStatus::OperationStage stage) { + prev_stage_ = ThreadStatusUtil::SetThreadOperationStage(stage); +} + +AutoThreadOperationStageUpdater::~AutoThreadOperationStageUpdater() { + ThreadStatusUtil::SetThreadOperationStage(prev_stage_); +} + #else ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = nullptr; @@ -150,6 +170,13 @@ void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) { void ThreadStatusUtil::ResetThreadStatus() { } +AutoThreadOperationStageUpdater::AutoThreadOperationStageUpdater( + ThreadStatus::OperationStage stage) { +} + +AutoThreadOperationStageUpdater::~AutoThreadOperationStageUpdater() { +} + #endif // ROCKSDB_USING_THREAD_STATUS } // namespace rocksdb diff --git a/util/thread_status_util.h b/util/thread_status_util.h index 486e08b1d..769144e61 100644 --- a/util/thread_status_util.h +++ b/util/thread_status_util.h @@ -56,6 +56,9 @@ class ThreadStatusUtil { static void SetThreadOperation(ThreadStatus::OperationType type); + static ThreadStatus::OperationStage SetThreadOperationStage( + ThreadStatus::OperationStage stage); + static void SetThreadState(ThreadStatus::StateType type); static void ResetThreadStatus(); @@ -104,4 +107,17 @@ class ThreadStatusUtil { #endif }; +// A helper class for updating thread state. It will set the +// thread state according to the input parameter in its constructor +// and set the thread state to the previous state in its destructor. +class AutoThreadOperationStageUpdater { + public: + explicit AutoThreadOperationStageUpdater( + ThreadStatus::OperationStage stage); + ~AutoThreadOperationStageUpdater(); + + private: + ThreadStatus::OperationStage prev_stage_; +}; + } // namespace rocksdb