diff --git a/db/builder.cc b/db/builder.cc index a3ed607bf..2a33bb008 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -21,6 +21,8 @@ #include "rocksdb/options.h" #include "rocksdb/table.h" #include "table/block_based_table_builder.h" +#include "util/iostats_context_imp.h" +#include "util/thread_status_util.h" #include "util/stop_watch.h" namespace rocksdb { @@ -52,6 +54,8 @@ Status BuildTable( const CompressionType compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, const Env::IOPriority io_priority, TableProperties* table_properties) { + // Reports the IOStats for flush for every following bytes. + const size_t kReportFlushIOStatsEvery = 1048576; Status s; meta->fd.file_size = 0; meta->smallest_seqno = meta->largest_seqno = 0; @@ -176,6 +180,13 @@ Status BuildTable( } } + if (io_priority == Env::IO_HIGH && + IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) { + ThreadStatusUtil::IncreaseThreadOperationProperty( + ThreadStatus::FLUSH_BYTES_WRITTEN, + IOSTATS(bytes_written)); + IOSTATS_RESET(bytes_written); + } if (!iterator_at_next) iter->Next(); } @@ -193,6 +204,13 @@ Status BuildTable( SequenceNumber seqno = GetInternalKeySeqno(key); meta->smallest_seqno = std::min(meta->smallest_seqno, seqno); meta->largest_seqno = std::max(meta->largest_seqno, seqno); + if (io_priority == Env::IO_HIGH && + IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) { + ThreadStatusUtil::IncreaseThreadOperationProperty( + ThreadStatus::FLUSH_BYTES_WRITTEN, + IOSTATS(bytes_written)); + IOSTATS_RESET(bytes_written); + } } } diff --git a/db/flush_job.cc b/db/flush_job.cc index c0745cc64..0f6c85f9b 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -82,8 +82,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, stats_(stats), event_logger_(event_logger) { // Update the thread status to indicate flush. - ThreadStatusUtil::SetColumnFamily(cfd_); - ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH); + ReportStartedFlush(); TEST_SYNC_POINT("FlushJob::FlushJob()"); } @@ -92,6 +91,31 @@ FlushJob::~FlushJob() { ThreadStatusUtil::ResetThreadStatus(); } +void FlushJob::ReportStartedFlush() { + ThreadStatusUtil::SetColumnFamily(cfd_); + ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH); + ThreadStatusUtil::SetThreadOperationProperty( + ThreadStatus::COMPACTION_JOB_ID, + job_context_->job_id); + IOSTATS_RESET(bytes_written); +} + +void FlushJob::ReportFlushInputSize(const autovector& mems) { + uint64_t input_size = 0; + for (auto* mem : mems) { + input_size += mem->ApproximateMemoryUsage(); + } + ThreadStatusUtil::IncreaseThreadOperationProperty( + ThreadStatus::FLUSH_BYTES_MEMTABLES, + input_size); +} + +void FlushJob::RecordFlushIOStats() { + ThreadStatusUtil::IncreaseThreadOperationProperty( + ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); + IOSTATS_RESET(bytes_written); +} + Status FlushJob::Run(uint64_t* file_number) { AutoThreadOperationStageUpdater stage_run( ThreadStatus::STAGE_FLUSH_RUN); @@ -105,6 +129,7 @@ Status FlushJob::Run(uint64_t* file_number) { return Status::OK(); } + ReportFlushInputSize(mems); // entries mems are (implicitly) sorted in ascending order by their created // time. We will use the first memtable's `edit` to keep the meta info for @@ -138,6 +163,7 @@ Status FlushJob::Run(uint64_t* file_number) { if (s.ok() && file_number != nullptr) { *file_number = fn; } + RecordFlushIOStats(); auto stream = event_logger_->LogToBuffer(log_buffer_); stream << "job" << job_context_->job_id << "event" diff --git a/db/flush_job.h b/db/flush_job.h index 0534466d2..c504b14fd 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -67,6 +67,9 @@ class FlushJob { Status Run(uint64_t* file_number = nullptr); private: + void ReportStartedFlush(); + void ReportFlushInputSize(const autovector& mems); + void RecordFlushIOStats(); Status WriteLevel0Table(const autovector& mems, VersionEdit* edit, uint64_t* filenumber); const std::string& dbname_; diff --git a/include/rocksdb/thread_status.h b/include/rocksdb/thread_status.h index e642284d4..4f67441b5 100644 --- a/include/rocksdb/thread_status.h +++ b/include/rocksdb/thread_status.h @@ -85,8 +85,7 @@ struct ThreadStatus { enum FlushPropertyType : int { FLUSH_JOB_ID = 0, - FLUSH_BYTES_READ, - FLUSH_BYTES_REMAIN, + FLUSH_BYTES_MEMTABLES, FLUSH_BYTES_WRITTEN, NUM_FLUSH_PROPERTIES }; diff --git a/util/thread_operation.h b/util/thread_operation.h index 6df304203..709e755a3 100644 --- a/util/thread_operation.h +++ b/util/thread_operation.h @@ -107,8 +107,7 @@ static OperationProperty compaction_operation_properties[] = { static OperationProperty flush_operation_properties[] = { {ThreadStatus::FLUSH_JOB_ID, "JobID"}, - {ThreadStatus::FLUSH_BYTES_READ, "BytesRead"}, - {ThreadStatus::FLUSH_BYTES_REMAIN, "BytesRemain"}, + {ThreadStatus::FLUSH_BYTES_MEMTABLES, "BytesMemtables"}, {ThreadStatus::FLUSH_BYTES_WRITTEN, "BytesWritten"} }; diff --git a/util/thread_status_impl.cc b/util/thread_status_impl.cc index 52dff91f2..6c72160b9 100644 --- a/util/thread_status_impl.cc +++ b/util/thread_status_impl.cc @@ -18,21 +18,33 @@ const std::string& ThreadStatus::GetThreadTypeName( ThreadStatus::ThreadType thread_type) { static std::string thread_type_names[NUM_THREAD_TYPES + 1] = { "High Pri", "Low Pri", "User", "Unknown"}; + if (thread_type < 0 || thread_type >= NUM_THREAD_TYPES) { + return thread_type_names[NUM_THREAD_TYPES]; // "Unknown" + } return thread_type_names[thread_type]; } const std::string& ThreadStatus::GetOperationName( ThreadStatus::OperationType op_type) { + if (op_type < 0 || op_type >= NUM_OP_TYPES) { + return global_operation_table[OP_UNKNOWN].name; + } return global_operation_table[op_type].name; } const std::string& ThreadStatus::GetOperationStageName( ThreadStatus::OperationStage stage) { + if (stage < 0 || stage >= NUM_OP_STAGES) { + return global_op_stage_table[STAGE_UNKNOWN].name; + } return global_op_stage_table[stage].name; } const std::string& ThreadStatus::GetStateName( ThreadStatus::StateType state_type) { + if (state_type < 0 || state_type >= NUM_STATE_TYPES) { + return global_state_table[STATE_UNKNOWN].name; + } return global_state_table[state_type].name; }