fix ThreadStatus for bottom-pri compaction threads

Summary:
added `ThreadType::BOTTOM_PRIORITY` which is used in the `ThreadStatus` object to indicate the thread is used for bottom-pri compactions. Previously there was a bug where we mislabeled such threads as `ThreadType::LOW_PRIORITY`.
Closes https://github.com/facebook/rocksdb/pull/3270

Differential Revision: D6559428

Pulled By: ajkr

fbshipit-source-id: 96b1a50a9c19492b1a5fd1b77cf7061a6f9f1d1c
main
Andrew Kryczka 7 years ago committed by Facebook Github Bot
parent b4d88d7128
commit 5a7e08468a
  1. 15
      db/db_test.cc
  2. 3
      include/rocksdb/thread_status.h
  3. 21
      monitoring/thread_status_impl.cc
  4. 24
      util/threadpool_imp.cc

@ -3488,12 +3488,16 @@ TEST_F(DBTest, GetThreadStatus) {
const int kTestCount = 3; const int kTestCount = 3;
const unsigned int kHighPriCounts[kTestCount] = {3, 2, 5}; const unsigned int kHighPriCounts[kTestCount] = {3, 2, 5};
const unsigned int kLowPriCounts[kTestCount] = {10, 15, 3}; const unsigned int kLowPriCounts[kTestCount] = {10, 15, 3};
const unsigned int kBottomPriCounts[kTestCount] = {2, 1, 4};
for (int test = 0; test < kTestCount; ++test) { for (int test = 0; test < kTestCount; ++test) {
// Change the number of threads in high / low priority pool. // Change the number of threads in high / low priority pool.
env_->SetBackgroundThreads(kHighPriCounts[test], Env::HIGH); env_->SetBackgroundThreads(kHighPriCounts[test], Env::HIGH);
env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW); env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW);
env_->SetBackgroundThreads(kBottomPriCounts[test], Env::BOTTOM);
// Wait to ensure the all threads has been registered // Wait to ensure the all threads has been registered
unsigned int thread_type_counts[ThreadStatus::NUM_THREAD_TYPES]; unsigned int thread_type_counts[ThreadStatus::NUM_THREAD_TYPES];
// TODO(ajkr): it'd be better if SetBackgroundThreads returned only after
// all threads have been registered.
// Try up to 60 seconds. // Try up to 60 seconds.
for (int num_try = 0; num_try < 60000; num_try++) { for (int num_try = 0; num_try < 60000; num_try++) {
env_->SleepForMicroseconds(1000); env_->SleepForMicroseconds(1000);
@ -3508,20 +3512,21 @@ TEST_F(DBTest, GetThreadStatus) {
if (thread_type_counts[ThreadStatus::HIGH_PRIORITY] == if (thread_type_counts[ThreadStatus::HIGH_PRIORITY] ==
kHighPriCounts[test] && kHighPriCounts[test] &&
thread_type_counts[ThreadStatus::LOW_PRIORITY] == thread_type_counts[ThreadStatus::LOW_PRIORITY] ==
kLowPriCounts[test]) { kLowPriCounts[test] &&
thread_type_counts[ThreadStatus::BOTTOM_PRIORITY] ==
kBottomPriCounts[test]) {
break; break;
} }
} }
// Verify the total number of threades
ASSERT_EQ(thread_type_counts[ThreadStatus::HIGH_PRIORITY] +
thread_type_counts[ThreadStatus::LOW_PRIORITY],
kHighPriCounts[test] + kLowPriCounts[test]);
// Verify the number of high-priority threads // Verify the number of high-priority threads
ASSERT_EQ(thread_type_counts[ThreadStatus::HIGH_PRIORITY], ASSERT_EQ(thread_type_counts[ThreadStatus::HIGH_PRIORITY],
kHighPriCounts[test]); kHighPriCounts[test]);
// Verify the number of low-priority threads // Verify the number of low-priority threads
ASSERT_EQ(thread_type_counts[ThreadStatus::LOW_PRIORITY], ASSERT_EQ(thread_type_counts[ThreadStatus::LOW_PRIORITY],
kLowPriCounts[test]); kLowPriCounts[test]);
// Verify the number of bottom-priority threads
ASSERT_EQ(thread_type_counts[ThreadStatus::BOTTOM_PRIORITY],
kBottomPriCounts[test]);
} }
if (i == 0) { if (i == 0) {
// repeat the test with multiple column families // repeat the test with multiple column families

@ -45,6 +45,7 @@ struct ThreadStatus {
HIGH_PRIORITY = 0, // RocksDB BG thread in high-pri thread pool HIGH_PRIORITY = 0, // RocksDB BG thread in high-pri thread pool
LOW_PRIORITY, // RocksDB BG thread in low-pri thread pool LOW_PRIORITY, // RocksDB BG thread in low-pri thread pool
USER, // User thread (Non-RocksDB BG thread) USER, // User thread (Non-RocksDB BG thread)
BOTTOM_PRIORITY, // RocksDB BG thread in bottom-pri thread pool
NUM_THREAD_TYPES NUM_THREAD_TYPES
}; };
@ -163,7 +164,7 @@ struct ThreadStatus {
// The followings are a set of utility functions for interpreting // The followings are a set of utility functions for interpreting
// the information of ThreadStatus // the information of ThreadStatus
static const std::string& GetThreadTypeName(ThreadType thread_type); static std::string GetThreadTypeName(ThreadType thread_type);
// Obtain the name of an operation given its type. // Obtain the name of an operation given its type.
static const std::string& GetOperationName(OperationType op_type); static const std::string& GetOperationName(OperationType op_type);

@ -14,14 +14,21 @@
namespace rocksdb { namespace rocksdb {
#ifdef ROCKSDB_USING_THREAD_STATUS #ifdef ROCKSDB_USING_THREAD_STATUS
const std::string& ThreadStatus::GetThreadTypeName( std::string ThreadStatus::GetThreadTypeName(
ThreadStatus::ThreadType thread_type) { ThreadStatus::ThreadType thread_type) {
static std::string thread_type_names[NUM_THREAD_TYPES + 1] = { switch (thread_type) {
"High Pri", "Low Pri", "User", "Unknown"}; case ThreadStatus::ThreadType::HIGH_PRIORITY:
if (thread_type < 0 || thread_type >= NUM_THREAD_TYPES) { return "High Pri";
return thread_type_names[NUM_THREAD_TYPES]; // "Unknown" case ThreadStatus::ThreadType::LOW_PRIORITY:
return "Low Pri";
case ThreadStatus::ThreadType::USER:
return "User";
case ThreadStatus::ThreadType::BOTTOM_PRIORITY:
return "Bottom Pri";
case ThreadStatus::ThreadType::NUM_THREAD_TYPES:
assert(false);
} }
return thread_type_names[thread_type]; return "Unknown";
} }
const std::string& ThreadStatus::GetOperationName( const std::string& ThreadStatus::GetOperationName(
@ -120,7 +127,7 @@ std::map<std::string, uint64_t>
#else #else
const std::string& ThreadStatus::GetThreadTypeName( std::string ThreadStatus::GetThreadTypeName(
ThreadStatus::ThreadType thread_type) { ThreadStatus::ThreadType thread_type) {
static std::string dummy_str = ""; static std::string dummy_str = "";
return dummy_str; return dummy_str;

@ -254,11 +254,25 @@ void* ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
size_t thread_id = meta->thread_id_; size_t thread_id = meta->thread_id_;
ThreadPoolImpl::Impl* tp = meta->thread_pool_; ThreadPoolImpl::Impl* tp = meta->thread_pool_;
#ifdef ROCKSDB_USING_THREAD_STATUS #ifdef ROCKSDB_USING_THREAD_STATUS
// for thread-status // initialize it because compiler isn't good enough to see we don't use it
ThreadStatusUtil::RegisterThread( // uninitialized
tp->GetHostEnv(), (tp->GetThreadPriority() == Env::Priority::HIGH ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES;
? ThreadStatus::HIGH_PRIORITY switch (tp->GetThreadPriority()) {
: ThreadStatus::LOW_PRIORITY)); case Env::Priority::HIGH:
thread_type = ThreadStatus::HIGH_PRIORITY;
break;
case Env::Priority::LOW:
thread_type = ThreadStatus::LOW_PRIORITY;
break;
case Env::Priority::BOTTOM:
thread_type = ThreadStatus::BOTTOM_PRIORITY;
break;
case Env::Priority::TOTAL:
assert(false);
return nullptr;
}
assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
#endif #endif
delete meta; delete meta;
tp->BGThread(thread_id); tp->BGThread(thread_id);

Loading…
Cancel
Save