diff --git a/db/compaction_job.cc b/db/compaction_job.cc index d836ccd30..7b786c116 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -50,6 +50,7 @@ #include "util/iostats_context_imp.h" #include "util/stop_watch.h" #include "util/sync_point.h" +#include "util/thread_status_util.h" namespace rocksdb { @@ -270,6 +271,11 @@ void CompactionJob::Prepare() { Status CompactionJob::Run() { log_buffer_->FlushBufferToLog(); ColumnFamilyData* cfd = compact_->compaction->column_family_data(); + ThreadStatusUtil::SetColumnFamily(cfd); + ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); +#ifndef NDEBUG + ThreadStatusUtil::TEST_OperationDelay(ThreadStatus::OP_COMPACTION); +#endif const uint64_t start_micros = env_->NowMicros(); std::unique_ptr input( @@ -459,6 +465,7 @@ Status CompactionJob::Run() { RecordCompactionIOStats(); LogFlush(db_options_.info_log); + ThreadStatusUtil::ResetThreadStatus(); return status; } diff --git a/db/db_impl.cc b/db/db_impl.cc index 4720742ae..f5d6d99f0 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2145,6 +2145,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, c->ReleaseCompactionFiles(status); *madeProgress = true; } else if (!is_manual && c->IsTrivialMove()) { + // Instrument for event update + // TODO(yhchiang): add op details for showing trivial-move. + ThreadStatusUtil::SetColumnFamily(c->column_family_data()); + ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); +#ifndef NDEBUG + ThreadStatusUtil::TEST_OperationDelay(ThreadStatus::OP_COMPACTION); +#endif + // Move file to next level assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); @@ -2171,6 +2179,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, c->column_family_data()->current()->storage_info()->LevelSummary(&tmp)); c->ReleaseCompactionFiles(status); *madeProgress = true; + + // Clear Instrument + ThreadStatusUtil::ResetThreadStatus(); } else { auto yield_callback = [&]() { return CallFlushDuringCompaction(c->column_family_data(), diff --git a/db/db_impl.h b/db/db_impl.h index de834a0fa..4664a3d60 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -234,6 +234,7 @@ class DBImpl : public DB { uint64_t TEST_max_total_in_memory_state() { return max_total_in_memory_state_; } + #endif // ROCKSDB_LITE // Returns the list of live files in 'live' and the list diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 65eaff6b3..db4c91ae5 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -10,6 +10,7 @@ #ifndef ROCKSDB_LITE #include "db/db_impl.h" +#include "util/thread_status_updater.h" namespace rocksdb { diff --git a/db/db_test.cc b/db/db_test.cc index 9fa2a40b5..7146f9585 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -52,7 +52,7 @@ #include "util/testutil.h" #include "util/mock_env.h" #include "util/string_util.h" -#include "util/thread_status_updater.h" +#include "util/thread_status_util.h" namespace rocksdb { @@ -9414,7 +9414,7 @@ TEST(DBTest, DynamicMemtableOptions) { } #if ROCKSDB_USING_THREAD_STATUS -TEST(DBTest, GetThreadList) { +TEST(DBTest, GetThreadStatus) { Options options; options.env = env_; options.enable_thread_tracking = true; @@ -9472,7 +9472,7 @@ TEST(DBTest, GetThreadList) { handles_, true); } -TEST(DBTest, DisableThreadList) { +TEST(DBTest, DisableThreadStatus) { Options options; options.env = env_; options.enable_thread_tracking = false; @@ -9482,6 +9482,146 @@ TEST(DBTest, DisableThreadList) { env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap( handles_, false); } + +TEST(DBTest, ThreadStatusSingleCompaction) { + const int kTestKeySize = 16; + const int kTestValueSize = 984; + const int kEntrySize = kTestKeySize + kTestValueSize; + const int kEntriesPerBuffer = 100; + Options options; + options.create_if_missing = true; + options.write_buffer_size = kEntrySize * kEntriesPerBuffer; + options.compaction_style = kCompactionStyleLevel; + options.target_file_size_base = options.write_buffer_size; + options.max_bytes_for_level_base = options.target_file_size_base * 2; + options.max_bytes_for_level_multiplier = 2; + options.compression = kNoCompression; + options = CurrentOptions(options); + options.env = env_; + options.enable_thread_tracking = true; + const int kNumL0Files = 4; + options.level0_file_num_compaction_trigger = kNumL0Files; + for (int tests = 0; tests < 2; ++tests) { + TryReopen(options); + // Each compaction will run at least 2 seconds, which allows + // the test to capture the status of compaction with fewer + // false alarm. + const int kCompactionDelayMicro = 2000000; + ThreadStatusUtil::TEST_SetOperationDelay( + ThreadStatus::OP_COMPACTION, kCompactionDelayMicro); + + Random rnd(301); + for (int key = kEntriesPerBuffer * kNumL0Files; key >= 0; --key) { + ASSERT_OK(Put(ToString(key), RandomString(&rnd, kTestValueSize))); + } + + // wait for compaction to be scheduled + env_->SleepForMicroseconds(500000); + + // check how many threads are doing compaction using GetThreadList + std::vector thread_list; + Status s = env_->GetThreadList(&thread_list); + ASSERT_OK(s); + int compaction_count = 0; + for (auto thread : thread_list) { + if (thread.operation_type == ThreadStatus::OP_COMPACTION) { + compaction_count++; + } + } + + if (options.enable_thread_tracking) { + // expecting one single L0 to L1 compaction + ASSERT_EQ(compaction_count, 1); + } else { + // If thread tracking is not enabled, compaction count should be 0. + ASSERT_EQ(compaction_count, 0); + } + + ThreadStatusUtil::TEST_SetOperationDelay( + ThreadStatus::OP_COMPACTION, 0); + + // repeat the test with disabling thread tracking. + options.enable_thread_tracking = false; + } +} + +TEST(DBTest, ThreadStatusMultipleCompaction) { + const int kTestKeySize = 16; + const int kTestValueSize = 984; + const int kEntrySize = kTestKeySize + kTestValueSize; + const int kEntriesPerBuffer = 10; + const int kNumL0Files = 4; + + const int kHighPriCount = 3; + const int kLowPriCount = 5; + env_->SetBackgroundThreads(kHighPriCount, Env::HIGH); + env_->SetBackgroundThreads(kLowPriCount, Env::LOW); + + Options options; + options.create_if_missing = true; + options.write_buffer_size = kEntrySize * kEntriesPerBuffer; + options.compaction_style = kCompactionStyleLevel; + options.target_file_size_base = options.write_buffer_size; + options.max_bytes_for_level_base = + options.target_file_size_base * kNumL0Files; + options.compression = kNoCompression; + options = CurrentOptions(options); + options.env = env_; + options.enable_thread_tracking = true; + options.level0_file_num_compaction_trigger = kNumL0Files; + options.max_bytes_for_level_multiplier = 2; + options.max_background_compactions = kLowPriCount; + + for (int tests = 0; tests < 2; ++tests) { + TryReopen(options); + Random rnd(301); + + int max_compaction_count = 0; + std::vector thread_list; + const int kCompactionDelayMicro = 10000; + ThreadStatusUtil::TEST_SetOperationDelay( + ThreadStatus::OP_COMPACTION, kCompactionDelayMicro); + + // Make rocksdb busy + int key = 0; + for (int file = 0; file < 64 * kNumL0Files; ++file) { + for (int k = 0; k < kEntriesPerBuffer; ++k) { + ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize))); + } + + // check how many threads are doing compaction using GetThreadList + int compaction_count = 0; + Status s = env_->GetThreadList(&thread_list); + for (auto thread : thread_list) { + if (thread.operation_type == ThreadStatus::OP_COMPACTION) { + compaction_count++; + } + } + + // Record the max number of compactions at a time. + if (max_compaction_count < compaction_count) { + max_compaction_count = compaction_count; + } + } + + if (options.enable_thread_tracking) { + // Expect rocksdb max-out the concurrent compaction jobs. + ASSERT_EQ(max_compaction_count, options.max_background_compactions); + } else { + // If thread tracking is not enabled, compaction count should be 0. + ASSERT_EQ(max_compaction_count, 0); + } + + // repeat the test with disabling thread tracking. + options.enable_thread_tracking = false; + } + + ThreadStatusUtil::TEST_SetOperationDelay( + ThreadStatus::OP_COMPACTION, 0); +} + + + #endif // ROCKSDB_USING_THREAD_STATUS TEST(DBTest, DynamicCompactionOptions) { diff --git a/util/thread_event_info.h b/util/thread_event_info.h new file mode 100644 index 000000000..28916deb4 --- /dev/null +++ b/util/thread_event_info.h @@ -0,0 +1,71 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file defines the structures for thread event and operation. +// Thread events are used to describe high level action of a +// thread such as doing compaction or flush, while thread operation +// are used to describe lower-level action such as reading / +// writing a file or waiting for a mutex. Events and operations +// are designed to be independent. Typically, a thread usually involves +// in one event and one operation at any specific point in time. + +#pragma once + +#include "include/rocksdb/thread_status.h" + +#include + +namespace rocksdb { + +#if ROCKSDB_USING_THREAD_STATUS + +// The structure that describes a major thread event. +struct EventInfo { + const ThreadStatus::EventType code; + const std::string name; +}; + +// The global event table. +// +// When updating a status of a thread, the pointer of the EventInfo +// of the current ThreadStatusData will be pointing to one of the +// rows in this global table. +// +// Note that it's not designed to be constant as in the future we +// might consider adding global count to the EventInfo. +static EventInfo global_event_table[] = { + {ThreadStatus::EVENT_UNKNOWN, ""}, + {ThreadStatus::EVENT_COMPACTION, "Compaction"}, + {ThreadStatus::EVENT_FLUSH, "Flush"} +}; + +// The structure that describes a operation. +struct OperationInfo { + const ThreadStatus::OperationType code; + const std::string name; +}; + +// The global operation table. +// +// When updating a status of a thread, the pointer of the OperationInfo +// of the current ThreadStatusData will be pointing to one of the +// rows in this global table. +static OperationInfo global_operation_table[] = { + {ThreadStatus::OPERATION_UNKNOWN, ""}, + {ThreadStatus::OPERATION_WRITE_FILE, "Writing SST file"}, + {ThreadStatus::OPERATION_READ_FILE, "Reaing SST file"}, + {ThreadStatus::OPERATION_WAIT_DB_MUTEX, "Waiting DB Mutex"} +}; + +#else + +struct EventInfo { +}; + +struct OperationInfo { +}; + +#endif // ROCKSDB_USING_THREAD_STATUS +} // namespace rocksdb diff --git a/util/thread_status_updater.cc b/util/thread_status_updater.cc index feb129885..25f7b1c5c 100644 --- a/util/thread_status_updater.cc +++ b/util/thread_status_updater.cc @@ -29,20 +29,46 @@ void ThreadStatusUpdater::SetThreadType( data->thread_type.store(ttype, std::memory_order_relaxed); } +void ThreadStatusUpdater::ResetThreadStatus() { + ClearThreadState(); + ClearThreadOperation(); + SetColumnFamilyInfoKey(nullptr); +} + void ThreadStatusUpdater::SetColumnFamilyInfoKey( const void* cf_key) { auto* data = InitAndGet(); + // set the tracking flag based on whether cf_key is non-null or not. + // If enable_thread_tracking is set to false, the input cf_key + // would be nullptr. + data->enable_tracking = (cf_key != nullptr); data->cf_key.store(cf_key, std::memory_order_relaxed); } +const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() { + auto* data = InitAndGet(); + if (data->enable_tracking == false) { + return nullptr; + } + return data->cf_key.load(std::memory_order_relaxed); +} + void ThreadStatusUpdater::SetThreadOperation( const ThreadStatus::OperationType type) { auto* data = InitAndGet(); + if (!data->enable_tracking) { + assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + return; + } data->operation_type.store(type, std::memory_order_relaxed); } void ThreadStatusUpdater::ClearThreadOperation() { auto* data = InitAndGet(); + if (!data->enable_tracking) { + assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + return; + } data->operation_type.store( ThreadStatus::OP_UNKNOWN, std::memory_order_relaxed); } @@ -50,11 +76,19 @@ void ThreadStatusUpdater::ClearThreadOperation() { void ThreadStatusUpdater::SetThreadState( const ThreadStatus::StateType type) { auto* data = InitAndGet(); + if (!data->enable_tracking) { + assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + return; + } data->state_type.store(type, std::memory_order_relaxed); } void ThreadStatusUpdater::ClearThreadState() { auto* data = InitAndGet(); + if (!data->enable_tracking) { + assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + return; + } data->state_type.store( ThreadStatus::STATE_UNKNOWN, std::memory_order_relaxed); } @@ -176,6 +210,9 @@ void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) { void ThreadStatusUpdater::UnregisterThread() { } +void ThreadStatusUpdater::ResetThreadStatus() { +} + void ThreadStatusUpdater::SetThreadType( ThreadStatus::ThreadType ttype) { } diff --git a/util/thread_status_updater.h b/util/thread_status_updater.h index c97102a96..5d4e55bb2 100644 --- a/util/thread_status_updater.h +++ b/util/thread_status_updater.h @@ -64,13 +64,24 @@ struct ConstantColumnFamilyInfo { // status of a thread using a set of atomic pointers. struct ThreadStatusData { #if ROCKSDB_USING_THREAD_STATUS - explicit ThreadStatusData() : thread_id(0) { + explicit ThreadStatusData() : thread_id(0), enable_tracking(false) { thread_type.store(ThreadStatus::USER); - cf_key.store(0); + cf_key.store(nullptr); operation_type.store(ThreadStatus::OP_UNKNOWN); state_type.store(ThreadStatus::STATE_UNKNOWN); } + uint64_t thread_id; + + // A flag to indicate whether the thread tracking is enabled + // in the current thread. This value will be updated based on whether + // the associated Options::enable_thread_tracking is set to true + // in ThreadStatusUtil::SetColumnFamily(). + // + // If set to false, then SetThreadOperation and SetThreadState + // will be no-op. + bool enable_tracking; + std::atomic thread_type; std::atomic cf_key; std::atomic operation_type; @@ -96,6 +107,10 @@ class ThreadStatusUpdater { // Unregister the current thread. void UnregisterThread(); + // Reset the status of the current thread. This includes resetting + // ColumnFamilyInfoKey, ThreadOperation, and ThreadState. + void ResetThreadStatus(); + // Set the thread type of the current thread. void SetThreadType(ThreadStatus::ThreadType ttype); @@ -103,6 +118,9 @@ class ThreadStatusUpdater { // its thread-local pointer of ThreadStateInfo to the correct entry. void SetColumnFamilyInfoKey(const void* cf_key); + // returns the column family info key. + const void* GetColumnFamilyInfoKey(); + // Update the thread operation of the current thread. void SetThreadOperation(const ThreadStatus::OperationType type); @@ -143,7 +161,6 @@ class ThreadStatusUpdater { bool check_exist); protected: - #if ROCKSDB_USING_THREAD_STATUS // The thread-local variable for storing thread status. static __thread ThreadStatusData* thread_status_data_; @@ -169,6 +186,7 @@ class ThreadStatusUpdater { // associated to the same db_key faster. std::unordered_map< const void*, std::unordered_set> db_key_map_; + #else static ThreadStatusData* thread_status_data_; #endif // ROCKSDB_USING_THREAD_STATUS diff --git a/util/thread_status_updater_debug.cc b/util/thread_status_updater_debug.cc index 1f53e5fc1..274f427d3 100644 --- a/util/thread_status_updater_debug.cc +++ b/util/thread_status_updater_debug.cc @@ -7,9 +7,11 @@ #include "util/thread_status_updater.h" #include "db/column_family.h" -#if ROCKSDB_USING_THREAD_STATUS namespace rocksdb { + +#ifndef NDEBUG +#if ROCKSDB_USING_THREAD_STATUS void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap( const std::vector& handles, bool check_exist) { @@ -29,5 +31,16 @@ void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap( } } } -} // namespace rocksdb + +#else + +void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap( + const std::vector& handles, + bool check_exist) { +} + #endif // ROCKSDB_USING_THREAD_STATUS +#endif // !NDEBUG + + +} // namespace rocksdb diff --git a/util/thread_status_util.cc b/util/thread_status_util.cc index c8767d9a8..970f79ae8 100644 --- a/util/thread_status_util.cc +++ b/util/thread_status_util.cc @@ -9,6 +9,7 @@ namespace rocksdb { + #if ROCKSDB_USING_THREAD_STATUS __thread ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = nullptr; @@ -36,7 +37,41 @@ void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) { return; } assert(thread_updater_local_cache_); - thread_updater_local_cache_->SetColumnFamilyInfoKey(cfd); + if (cfd != nullptr && cfd->options()->enable_thread_tracking) { + thread_updater_local_cache_->SetColumnFamilyInfoKey(cfd); + } else { + // When cfd == nullptr or enable_thread_tracking == false, we set + // ColumnFamilyInfoKey to nullptr, which makes SetThreadOperation + // and SetThreadState become no-op. + thread_updater_local_cache_->SetColumnFamilyInfoKey(nullptr); + } +} + +void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType op) { + if (thread_updater_local_cache_ == nullptr) { + // thread_updater_local_cache_ must be set in SetColumnFamily + // or other ThreadStatusUtil functions. + return; + } + + thread_updater_local_cache_->SetThreadOperation(op); +} + +void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) { + if (thread_updater_local_cache_ == nullptr) { + // thread_updater_local_cache_ must be set in SetColumnFamily + // or other ThreadStatusUtil functions. + return; + } + + thread_updater_local_cache_->SetThreadState(state); +} + +void ThreadStatusUtil::ResetThreadStatus() { + if (thread_updater_local_cache_ == nullptr) { + return; + } + thread_updater_local_cache_->ResetThreadStatus(); } void ThreadStatusUtil::NewColumnFamilyInfo( @@ -86,6 +121,12 @@ bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) { void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) { } +void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType op) { +} + +void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) { +} + void ThreadStatusUtil::NewColumnFamilyInfo( const DB* db, const ColumnFamilyData* cfd) { } @@ -97,6 +138,9 @@ void ThreadStatusUtil::EraseColumnFamilyInfo( void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) { } +void ThreadStatusUtil::ResetThreadStatus() { +} + #endif // ROCKSDB_USING_THREAD_STATUS } // namespace rocksdb diff --git a/util/thread_status_util.h b/util/thread_status_util.h index c583d5a5d..a8549e8ae 100644 --- a/util/thread_status_util.h +++ b/util/thread_status_util.h @@ -12,6 +12,7 @@ namespace rocksdb { + // The static utility class for updating thread-local status. // // The thread-local status is updated via the thread-local cached @@ -52,6 +53,22 @@ class ThreadStatusUtil { // something related to the specified column family. static void SetColumnFamily(const ColumnFamilyData* cfd); + static void SetThreadOperation(ThreadStatus::OperationType type); + + static void SetThreadState(ThreadStatus::StateType type); + + static void ResetThreadStatus(); + +#ifndef NDEBUG + static void TEST_SetOperationDelay( + const ThreadStatus::OperationType operation, int micro); + static void TEST_OperationDelay( + const ThreadStatus::OperationType operation); + static void TEST_SetStateDelay( + const ThreadStatus::StateType state, int micro); + static void TEST_StateDelay(const ThreadStatus::StateType state); +#endif + protected: // Initialize the thread-local ThreadStatusUpdater when it finds // the cached value is nullptr. Returns true if it has cached diff --git a/util/thread_status_util_debug.cc b/util/thread_status_util_debug.cc new file mode 100644 index 000000000..5378acaf8 --- /dev/null +++ b/util/thread_status_util_debug.cc @@ -0,0 +1,41 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "rocksdb/env.h" +#include "util/thread_status_updater.h" +#include "util/thread_status_util.h" + +namespace rocksdb { + +#ifndef NDEBUG +// the delay for debugging purpose. +static int operations_delay[ThreadStatus::NUM_OP_TYPES] ={0}; +static int states_delay[ThreadStatus::NUM_STATE_TYPES] = {0}; + +void ThreadStatusUtil::TEST_SetStateDelay( + const ThreadStatus::StateType state, int micro) { + states_delay[state] = micro; +} + +void ThreadStatusUtil::TEST_StateDelay( + const ThreadStatus::StateType state) { + Env::Default()->SleepForMicroseconds( + states_delay[state]); +} + +void ThreadStatusUtil::TEST_SetOperationDelay( + const ThreadStatus::OperationType operation, int micro) { + operations_delay[operation] = micro; +} + + +void ThreadStatusUtil::TEST_OperationDelay( + const ThreadStatus::OperationType operation) { + Env::Default()->SleepForMicroseconds( + operations_delay[operation]); +} +#endif // !NDEBUG + +} // namespace rocksdb