diff --git a/db/compaction.cc b/db/compaction.cc index d1cf85f01..f8f4cdbb5 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -140,7 +140,7 @@ bool Compaction::InputCompressionMatchesOutput() const { return true; } TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch"); - return false; + return matches; } bool Compaction::IsTrivialMove() const { diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 8abf2afa1..1575d8d2f 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -224,6 +224,7 @@ CompactionJob::CompactionJob( paranoid_file_checks_(paranoid_file_checks) { ThreadStatusUtil::SetColumnFamily(compact_->compaction->column_family_data()); ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); + ReportStartedCompaction(compaction); } CompactionJob::~CompactionJob() { @@ -231,6 +232,43 @@ CompactionJob::~CompactionJob() { ThreadStatusUtil::ResetThreadStatus(); } +void CompactionJob::ReportStartedCompaction( + Compaction* compaction) { + ThreadStatusUtil::SetColumnFamily( + compact_->compaction->column_family_data()); + + ThreadStatusUtil::SetThreadOperationProperty( + ThreadStatus::COMPACTION_JOB_ID, + job_id_); + + ThreadStatusUtil::SetThreadOperationProperty( + ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL, + (static_cast(compact_->compaction->start_level()) << 32) + + compact_->compaction->output_level()); + + ThreadStatusUtil::SetThreadOperationProperty( + ThreadStatus::COMPACTION_PROP_FLAGS, + compaction->IsManualCompaction() + + (compaction->IsDeletionCompaction() << 1) + + (compaction->IsTrivialMove() << 2)); + + ThreadStatusUtil::SetThreadOperationProperty( + ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES, + compaction->CalculateTotalInputSize()); + + IOSTATS_RESET(bytes_written); + IOSTATS_RESET(bytes_read); + ThreadStatusUtil::SetThreadOperationProperty( + ThreadStatus::COMPACTION_BYTES_WRITTEN, 0); + ThreadStatusUtil::SetThreadOperationProperty( + ThreadStatus::COMPACTION_BYTES_READ, 0); + + // Set the thread operation after operation properties + // to ensure GetThreadList() can always show them all together. + ThreadStatusUtil::SetThreadOperation( + ThreadStatus::OP_COMPACTION); +} + void CompactionJob::Prepare() { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_PREPARE); @@ -1100,8 +1138,12 @@ inline SequenceNumber CompactionJob::findEarliestVisibleSnapshot( void CompactionJob::RecordCompactionIOStats() { RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read)); + ThreadStatusUtil::IncreaseThreadOperationProperty( + ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read)); IOSTATS_RESET(bytes_read); RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written)); + ThreadStatusUtil::IncreaseThreadOperationProperty( + ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written)); IOSTATS_RESET(bytes_written); } diff --git a/db/compaction_job.h b/db/compaction_job.h index 1a64aeb60..d34e4bdad 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -77,6 +77,8 @@ class CompactionJob { InstrumentedMutex* db_mutex); private: + // update the thread status for starting a compaction. + void ReportStartedCompaction(Compaction* compaction); void AllocateCompactionOutputFileNumbers(); // Call compaction filter if is_compaction_v2 is not true. Then iterate // through input and compact the kv-pairs diff --git a/db/db_bench.cc b/db/db_bench.cc index dfe466e31..dee16878e 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -952,14 +952,14 @@ class Stats { std::vector thread_list; FLAGS_env->GetThreadList(&thread_list); - fprintf(stderr, "\n%18s %10s %25s %12s %12s %45s %12s\n", + fprintf(stderr, "\n%18s %10s %12s %20s %13s %45s %12s %s\n", "ThreadID", "ThreadType", "cfName", "Operation", - "ElapsedTime", "Stage", "State"); + "ElapsedTime", "Stage", "State", "OperationProperties"); int64_t current_time = 0; Env::Default()->GetCurrentTime(¤t_time); for (auto ts : thread_list) { - fprintf(stderr, "%18" PRIu64 " %10s %25s %12s %12s %45s %12s\n", + fprintf(stderr, "%18" PRIu64 " %10s %12s %20s %13s %45s %12s", ts.thread_id, ThreadStatus::GetThreadTypeName(ts.thread_type).c_str(), ts.cf_name.c_str(), @@ -967,6 +967,14 @@ class Stats { ThreadStatus::MicrosToString(ts.op_elapsed_micros).c_str(), ThreadStatus::GetOperationStageName(ts.operation_stage).c_str(), ThreadStatus::GetStateName(ts.state_type).c_str()); + + auto op_properties = ThreadStatus::InterpretOperationProperties( + ts.operation_type, ts.op_properties); + for (const auto& op_prop : op_properties) { + fprintf(stderr, " %s %" PRIu64" |", + op_prop.first.c_str(), op_prop.second); + } + fprintf(stderr, "\n"); } } diff --git a/db/db_test.cc b/db/db_test.cc index 1e7cbd243..4c7166eea 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -12519,7 +12519,10 @@ TEST_F(DBTest, CompressLevelCompaction) { ASSERT_EQ("1,4,8", FilesPerLevel(0)); ASSERT_EQ(matches, 12); - ASSERT_EQ(didnt_match, 8); + // Currently, the test relies on the number of calls to + // InputCompressionMatchesOutput() per compaction. + const int kCallsToInputCompressionMatch = 2; + ASSERT_EQ(didnt_match, 8 * kCallsToInputCompressionMatch); ASSERT_EQ(trivial_move, 12); ASSERT_EQ(non_trivial, 8); diff --git a/include/rocksdb/thread_status.h b/include/rocksdb/thread_status.h index 9767fa3bb..7bb304c2b 100644 --- a/include/rocksdb/thread_status.h +++ b/include/rocksdb/thread_status.h @@ -14,7 +14,10 @@ #pragma once #include +#include #include +#include +#include #ifndef ROCKSDB_USING_THREAD_STATUS #define ROCKSDB_USING_THREAD_STATUS \ @@ -64,6 +67,28 @@ struct ThreadStatus { NUM_OP_STAGES }; + // The maximum number of properties of an operation. + // This number should be set to the biggest NUM_XXX_PROPERTIES. + static const int kNumOperationProperties = 6; + + enum CompactionPropertyType : int { + COMPACTION_JOB_ID = 0, + COMPACTION_INPUT_OUTPUT_LEVEL, + COMPACTION_PROP_FLAGS, + COMPACTION_TOTAL_INPUT_BYTES, + COMPACTION_BYTES_READ, + COMPACTION_BYTES_WRITTEN, + NUM_COMPACTION_PROPERTIES + }; + + enum FlushPropertyType : int { + FLUSH_JOB_ID = 0, + FLUSH_BYTES_READ, + FLUSH_BYTES_REMAIN, + FLUSH_BYTES_WRITTEN, + NUM_FLUSH_PROPERTIES + }; + // The type used to refer to a thread state. // A state describes lower-level action of a thread // such as reading / writing a file or waiting for a mutex. @@ -80,6 +105,7 @@ struct ThreadStatus { const OperationType _operation_type, const uint64_t _op_elapsed_micros, const OperationStage _operation_stage, + const uint64_t _op_props[], const StateType _state_type) : thread_id(_id), thread_type(_thread_type), db_name(_db_name), @@ -87,7 +113,11 @@ struct ThreadStatus { operation_type(_operation_type), op_elapsed_micros(_op_elapsed_micros), operation_stage(_operation_stage), - state_type(_state_type) {} + state_type(_state_type) { + for (int i = 0; i < kNumOperationProperties; ++i) { + op_properties[i] = _op_props[i]; + } + } // An unique ID for the thread. const uint64_t thread_id; @@ -116,6 +146,11 @@ struct ThreadStatus { // in the current operation. const OperationStage operation_stage; + // A list of properties that describe some details about the current + // operation. Same field in op_properties[] might have different + // meanings for different operations. + uint64_t op_properties[kNumOperationProperties]; + // The state (lower-level action) that the current thread is involved. const StateType state_type; @@ -133,6 +168,17 @@ struct ThreadStatus { static const std::string& GetOperationStageName( OperationStage stage); + // Obtain the name of the "i"th operation property of the + // specified operation. + static const std::string& GetOperationPropertyName( + OperationType op_type, int i); + + // Translate the "i"th property of the specified operation given + // a property value. + static std::map + InterpretOperationProperties( + OperationType op_type, uint64_t* op_properties); + // Obtain the name of a state given its type. static const std::string& GetStateName(StateType state_type); }; diff --git a/util/thread_operation.h b/util/thread_operation.h index a35a70780..6df304203 100644 --- a/util/thread_operation.h +++ b/util/thread_operation.h @@ -91,6 +91,27 @@ static StateInfo global_state_table[] = { {ThreadStatus::STATE_MUTEX_WAIT, "Mutex Wait"}, }; +struct OperationProperty { + int code; + std::string name; +}; + +static OperationProperty compaction_operation_properties[] = { + {ThreadStatus::COMPACTION_JOB_ID, "JobID"}, + {ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL, "InputOutputLevel"}, + {ThreadStatus::COMPACTION_PROP_FLAGS, "Manual/Deletion/Trivial"}, + {ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES, "TotalInputBytes"}, + {ThreadStatus::COMPACTION_BYTES_READ, "BytesRead"}, + {ThreadStatus::COMPACTION_BYTES_WRITTEN, "BytesWritten"}, +}; + +static OperationProperty flush_operation_properties[] = { + {ThreadStatus::FLUSH_JOB_ID, "JobID"}, + {ThreadStatus::FLUSH_BYTES_READ, "BytesRead"}, + {ThreadStatus::FLUSH_BYTES_REMAIN, "BytesRemain"}, + {ThreadStatus::FLUSH_BYTES_WRITTEN, "BytesWritten"} +}; + #else struct OperationInfo { diff --git a/util/thread_status_impl.cc b/util/thread_status_impl.cc index 4725cbe49..a21f6b355 100644 --- a/util/thread_status_impl.cc +++ b/util/thread_status_impl.cc @@ -4,6 +4,8 @@ // of patent rights can be found in the PATENTS file in the same directory. // +#include + #include "rocksdb/env.h" #include "rocksdb/thread_status.h" #include "util/logging.h" @@ -44,6 +46,65 @@ const std::string ThreadStatus::MicrosToString(uint64_t micros) { return std::string(buffer); } +const std::string& ThreadStatus::GetOperationPropertyName( + ThreadStatus::OperationType op_type, int i) { + static const std::string empty_str = ""; + switch (op_type) { + case ThreadStatus::OP_COMPACTION: + if (i >= NUM_COMPACTION_PROPERTIES) { + return empty_str; + } + return compaction_operation_properties[i].name; + case ThreadStatus::OP_FLUSH: + if (i >= NUM_FLUSH_PROPERTIES) { + return empty_str; + } + return flush_operation_properties[i].name; + default: + return empty_str; + } +} + +std::map + ThreadStatus::InterpretOperationProperties( + ThreadStatus::OperationType op_type, uint64_t* op_properties) { + int num_properties; + switch (op_type) { + case OP_COMPACTION: + num_properties = NUM_COMPACTION_PROPERTIES; + break; + case OP_FLUSH: + num_properties = NUM_FLUSH_PROPERTIES; + break; + default: + num_properties = 0; + } + + std::map property_map; + for (int i = 0; i < num_properties; ++i) { + if (op_type == OP_COMPACTION && + i == COMPACTION_INPUT_OUTPUT_LEVEL) { + property_map.emplace( + "BaseInputLevel", op_properties[i] >> 32); + property_map.emplace( + "OutputLevel", op_properties[i] % (1LU << 32)); + } else if (op_type == OP_COMPACTION && + i == COMPACTION_PROP_FLAGS) { + property_map.emplace( + "IsManual", ((op_properties[i] & 2) >> 1)); + property_map.emplace( + "IsDeletion", ((op_properties[i] & 4) >> 2)); + property_map.emplace( + "IsTrivialMove", ((op_properties[i] & 8) >> 3)); + } else { + property_map.emplace( + GetOperationPropertyName(op_type, i), op_properties[i]); + } + } + return property_map; +} + + #else const std::string& ThreadStatus::GetThreadTypeName( @@ -76,5 +137,17 @@ const std::string ThreadStatus::MicrosToString( return dummy_str; } +const std::string& ThreadStatus::GetOperationPropertyName( + ThreadStatus::OperationType op_type, int i) { + static std::string dummy_str = ""; + return dummy_str; +} + +std::map + ThreadStatus::InterpretOperationProperties( + ThreadStatus::OperationType op_type, uint64_t* op_properties) { + return std::map(); +} + #endif // ROCKSDB_USING_THREAD_STATUS } // namespace rocksdb diff --git a/util/thread_status_updater.cc b/util/thread_status_updater.cc index bc9a65b05..31845ccb5 100644 --- a/util/thread_status_updater.cc +++ b/util/thread_status_updater.cc @@ -28,6 +28,7 @@ void ThreadStatusUpdater::SetThreadType( ThreadStatus::ThreadType ttype) { auto* data = InitAndGet(); data->thread_type.store(ttype, std::memory_order_relaxed); + ClearThreadOperationProperties(); } void ThreadStatusUpdater::ResetThreadStatus() { @@ -61,9 +62,37 @@ void ThreadStatusUpdater::SetThreadOperation( assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); return; } - data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN, - std::memory_order_relaxed); - data->operation_type.store(type, std::memory_order_relaxed); + // NOTE: Our practice here is to set all the thread operation properties + // and stage before we set thread operation, and thread operation + // will be set in std::memory_order_release. This is to ensure + // whenever a thread operation is not OP_UNKNOWN, we will always + // have a consistent information on its properties. + data->operation_type.store(type, std::memory_order_release); + if (type == ThreadStatus::OP_UNKNOWN) { + data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN, + std::memory_order_relaxed); + ClearThreadOperationProperties(); + } +} + +void ThreadStatusUpdater::SetThreadOperationProperty( + int i, uint64_t value) { + auto* data = InitAndGet(); + if (!data->enable_tracking) { + assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + return; + } + data->op_properties[i].store(value, std::memory_order_relaxed); +} + +void ThreadStatusUpdater::IncreaseThreadOperationProperty( + int i, uint64_t delta) { + auto* data = InitAndGet(); + if (!data->enable_tracking) { + assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + return; + } + data->op_properties[i].fetch_add(delta, std::memory_order_relaxed); } void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) { @@ -85,6 +114,18 @@ void ThreadStatusUpdater::ClearThreadOperation() { std::memory_order_relaxed); data->operation_type.store( ThreadStatus::OP_UNKNOWN, std::memory_order_relaxed); + ClearThreadOperationProperties(); +} + +void ThreadStatusUpdater::ClearThreadOperationProperties() { + auto* data = InitAndGet(); + if (!data->enable_tracking) { + assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + return; + } + for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) { + data->op_properties[i].store(0, std::memory_order_relaxed); + } } ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage( @@ -144,11 +185,12 @@ Status ThreadStatusUpdater::GetThreadList( ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN; ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN; uint64_t op_elapsed_micros = 0; + uint64_t op_props[ThreadStatus::kNumOperationProperties] = {0}; if (cf_info != nullptr) { db_name = &cf_info->db_name; cf_name = &cf_info->cf_name; op_type = thread_data->operation_type.load( - std::memory_order_relaxed); + std::memory_order_acquire); // display lower-level info only when higher-level info is available. if (op_type != ThreadStatus::OP_UNKNOWN) { op_elapsed_micros = now_micros - thread_data->op_start_time.load( @@ -157,13 +199,18 @@ Status ThreadStatusUpdater::GetThreadList( std::memory_order_relaxed); state_type = thread_data->state_type.load( std::memory_order_relaxed); + for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) { + op_props[i] = thread_data->op_properties[i].load( + std::memory_order_relaxed); + } } } thread_list->emplace_back( thread_data->thread_id, thread_type, db_name ? *db_name : "", cf_name ? *cf_name : "", - op_type, op_elapsed_micros, op_stage, state_type); + op_type, op_elapsed_micros, op_stage, op_props, + state_type); } return Status::OK(); @@ -284,5 +331,13 @@ void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) { void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) { } +void ThreadStatusUpdater::SetThreadOperationProperty( + int i, uint64_t value) { +} + +void ThreadStatusUpdater::IncreaseThreadOperationProperty( + int i, uint64_t delta) { +} + #endif // ROCKSDB_USING_THREAD_STATUS } // namespace rocksdb diff --git a/util/thread_status_updater.h b/util/thread_status_updater.h index cb6563b95..b511a8dfb 100644 --- a/util/thread_status_updater.h +++ b/util/thread_status_updater.h @@ -88,6 +88,7 @@ struct ThreadStatusData { std::atomic operation_type; std::atomic op_start_time; std::atomic operation_stage; + std::atomic op_properties[ThreadStatus::kNumOperationProperties]; std::atomic state_type; #endif // ROCKSDB_USING_THREAD_STATUS }; @@ -131,6 +132,21 @@ class ThreadStatusUpdater { // of micro-seconds since some fixed point in time. void SetOperationStartTime(const uint64_t start_time); + // Set the "i"th property of the current operation. + // + // NOTE: Our practice here is to set all the thread operation properties + // and stage before we set thread operation, and thread operation + // will be set in std::memory_order_release. This is to ensure + // whenever a thread operation is not OP_UNKNOWN, we will always + // have a consistent information on its properties. + void SetThreadOperationProperty( + int i, uint64_t value); + + // Increase the "i"th property of the current operation with + // the specified delta. + void IncreaseThreadOperationProperty( + int i, uint64_t delta); + // Update the thread operation stage of the current thread. ThreadStatus::OperationStage SetThreadOperationStage( const ThreadStatus::OperationStage stage); @@ -138,6 +154,9 @@ class ThreadStatusUpdater { // Clear thread operation of the current thread. void ClearThreadOperation(); + // Reset all thread-operation-properties to 0. + void ClearThreadOperationProperties(); + // Update the thread state of the current thread. void SetThreadState(const ThreadStatus::StateType type); diff --git a/util/thread_status_util.cc b/util/thread_status_util.cc index cb8f8e565..c498971e5 100644 --- a/util/thread_status_util.cc +++ b/util/thread_status_util.cc @@ -76,6 +76,30 @@ ThreadStatus::OperationStage ThreadStatusUtil::SetThreadOperationStage( return thread_updater_local_cache_->SetThreadOperationStage(stage); } +void ThreadStatusUtil::SetThreadOperationProperty( + int code, uint64_t value) { + if (thread_updater_local_cache_ == nullptr) { + // thread_updater_local_cache_ must be set in SetColumnFamily + // or other ThreadStatusUtil functions. + return; + } + + thread_updater_local_cache_->SetThreadOperationProperty( + code, value); +} + +void ThreadStatusUtil::IncreaseThreadOperationProperty( + int code, uint64_t delta) { + if (thread_updater_local_cache_ == nullptr) { + // thread_updater_local_cache_ must be set in SetColumnFamily + // or other ThreadStatusUtil functions. + return; + } + + thread_updater_local_cache_->IncreaseThreadOperationProperty( + code, delta); +} + void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) { if (thread_updater_local_cache_ == nullptr) { // thread_updater_local_cache_ must be set in SetColumnFamily @@ -152,6 +176,14 @@ void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) { void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType op) { } +void ThreadStatusUtil::SetThreadOperationProperty( + int code, uint64_t value) { +} + +void ThreadStatusUtil::IncreaseThreadOperationProperty( + int code, uint64_t delta) { +} + void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) { } diff --git a/util/thread_status_util.h b/util/thread_status_util.h index 1bfc80ba2..ba0238d58 100644 --- a/util/thread_status_util.h +++ b/util/thread_status_util.h @@ -59,6 +59,12 @@ class ThreadStatusUtil { static ThreadStatus::OperationStage SetThreadOperationStage( ThreadStatus::OperationStage stage); + static void SetThreadOperationProperty( + int code, uint64_t value); + + static void IncreaseThreadOperationProperty( + int code, uint64_t delta); + static void SetThreadState(ThreadStatus::StateType type); static void ResetThreadStatus();