Allow GetThreadList to report Flush properties.

Summary:
Allow GetThreadList to report Flush properties, which includes:
* job id
* number of bytes that has been written since flush started.
* total size of input mem-tables

Test Plan:
./db_bench --threads=30 --num=1000000 --benchmarks=fillrandom --thread_status_per_interval=100 --value_size=1000

Sample output from db_bench which tracks same flush job

          ThreadID ThreadType       cfName            Operation   ElapsedTime                                         Stage        State OperationProperties
   140213879898240   High Pri      default                Flush       5789 us                    FlushJob::WriteLevel0Table              BytesMemtables 4112835 | BytesWritten 577104 | JobID 8 |

          ThreadID ThreadType       cfName            Operation   ElapsedTime                                         Stage        State OperationProperties
   140213879898240   High Pri      default                Flush     30.634 ms                    FlushJob::WriteLevel0Table              BytesMemtables 4112835 | BytesWritten 1734865 | JobID 8 |

Reviewers: rven, igor, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D38505
main
Yueh-Hsuan Chiang 10 years ago
parent a66f643e97
commit 3f0867c0fe
  1. 18
      db/builder.cc
  2. 30
      db/flush_job.cc
  3. 3
      db/flush_job.h
  4. 3
      include/rocksdb/thread_status.h
  5. 3
      util/thread_operation.h
  6. 12
      util/thread_status_impl.cc

@ -21,6 +21,8 @@
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "table/block_based_table_builder.h"
#include "util/iostats_context_imp.h"
#include "util/thread_status_util.h"
#include "util/stop_watch.h"
namespace rocksdb {
@ -52,6 +54,8 @@ Status BuildTable(
const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
const Env::IOPriority io_priority, TableProperties* table_properties) {
// Reports the IOStats for flush for every following bytes.
const size_t kReportFlushIOStatsEvery = 1048576;
Status s;
meta->fd.file_size = 0;
meta->smallest_seqno = meta->largest_seqno = 0;
@ -176,6 +180,13 @@ Status BuildTable(
}
}
if (io_priority == Env::IO_HIGH &&
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
ThreadStatusUtil::IncreaseThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_WRITTEN,
IOSTATS(bytes_written));
IOSTATS_RESET(bytes_written);
}
if (!iterator_at_next) iter->Next();
}
@ -193,6 +204,13 @@ Status BuildTable(
SequenceNumber seqno = GetInternalKeySeqno(key);
meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
meta->largest_seqno = std::max(meta->largest_seqno, seqno);
if (io_priority == Env::IO_HIGH &&
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
ThreadStatusUtil::IncreaseThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_WRITTEN,
IOSTATS(bytes_written));
IOSTATS_RESET(bytes_written);
}
}
}

@ -82,8 +82,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
stats_(stats),
event_logger_(event_logger) {
// Update the thread status to indicate flush.
ThreadStatusUtil::SetColumnFamily(cfd_);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
ReportStartedFlush();
TEST_SYNC_POINT("FlushJob::FlushJob()");
}
@ -92,6 +91,31 @@ FlushJob::~FlushJob() {
ThreadStatusUtil::ResetThreadStatus();
}
void FlushJob::ReportStartedFlush() {
ThreadStatusUtil::SetColumnFamily(cfd_);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::COMPACTION_JOB_ID,
job_context_->job_id);
IOSTATS_RESET(bytes_written);
}
void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
uint64_t input_size = 0;
for (auto* mem : mems) {
input_size += mem->ApproximateMemoryUsage();
}
ThreadStatusUtil::IncreaseThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_MEMTABLES,
input_size);
}
void FlushJob::RecordFlushIOStats() {
ThreadStatusUtil::IncreaseThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
IOSTATS_RESET(bytes_written);
}
Status FlushJob::Run(uint64_t* file_number) {
AutoThreadOperationStageUpdater stage_run(
ThreadStatus::STAGE_FLUSH_RUN);
@ -105,6 +129,7 @@ Status FlushJob::Run(uint64_t* file_number) {
return Status::OK();
}
ReportFlushInputSize(mems);
// entries mems are (implicitly) sorted in ascending order by their created
// time. We will use the first memtable's `edit` to keep the meta info for
@ -138,6 +163,7 @@ Status FlushJob::Run(uint64_t* file_number) {
if (s.ok() && file_number != nullptr) {
*file_number = fn;
}
RecordFlushIOStats();
auto stream = event_logger_->LogToBuffer(log_buffer_);
stream << "job" << job_context_->job_id << "event"

@ -67,6 +67,9 @@ class FlushJob {
Status Run(uint64_t* file_number = nullptr);
private:
void ReportStartedFlush();
void ReportFlushInputSize(const autovector<MemTable*>& mems);
void RecordFlushIOStats();
Status WriteLevel0Table(const autovector<MemTable*>& mems, VersionEdit* edit,
uint64_t* filenumber);
const std::string& dbname_;

@ -85,8 +85,7 @@ struct ThreadStatus {
enum FlushPropertyType : int {
FLUSH_JOB_ID = 0,
FLUSH_BYTES_READ,
FLUSH_BYTES_REMAIN,
FLUSH_BYTES_MEMTABLES,
FLUSH_BYTES_WRITTEN,
NUM_FLUSH_PROPERTIES
};

@ -107,8 +107,7 @@ static OperationProperty compaction_operation_properties[] = {
static OperationProperty flush_operation_properties[] = {
{ThreadStatus::FLUSH_JOB_ID, "JobID"},
{ThreadStatus::FLUSH_BYTES_READ, "BytesRead"},
{ThreadStatus::FLUSH_BYTES_REMAIN, "BytesRemain"},
{ThreadStatus::FLUSH_BYTES_MEMTABLES, "BytesMemtables"},
{ThreadStatus::FLUSH_BYTES_WRITTEN, "BytesWritten"}
};

@ -18,21 +18,33 @@ const std::string& ThreadStatus::GetThreadTypeName(
ThreadStatus::ThreadType thread_type) {
static std::string thread_type_names[NUM_THREAD_TYPES + 1] = {
"High Pri", "Low Pri", "User", "Unknown"};
if (thread_type < 0 || thread_type >= NUM_THREAD_TYPES) {
return thread_type_names[NUM_THREAD_TYPES]; // "Unknown"
}
return thread_type_names[thread_type];
}
const std::string& ThreadStatus::GetOperationName(
ThreadStatus::OperationType op_type) {
if (op_type < 0 || op_type >= NUM_OP_TYPES) {
return global_operation_table[OP_UNKNOWN].name;
}
return global_operation_table[op_type].name;
}
const std::string& ThreadStatus::GetOperationStageName(
ThreadStatus::OperationStage stage) {
if (stage < 0 || stage >= NUM_OP_STAGES) {
return global_op_stage_table[STAGE_UNKNOWN].name;
}
return global_op_stage_table[stage].name;
}
const std::string& ThreadStatus::GetStateName(
ThreadStatus::StateType state_type) {
if (state_type < 0 || state_type >= NUM_STATE_TYPES) {
return global_state_table[STATE_UNKNOWN].name;
}
return global_state_table[state_type].name;
}

Loading…
Cancel
Save