From 1bb4928da9fba1e88500bb39d5a556ccbb3d580e Mon Sep 17 00:00:00 2001
From: Igor Canadi <igor@fb.com>
Date: Mon, 27 Apr 2015 15:20:02 -0700
Subject: [PATCH] Include a bunch more events in EventLogger

Summary:
Added these events:
* Recovery start and finish, plus a file-creation event when recovery creates a file
* Trivial move
* Compaction start and finish, plus a file-creation event when compaction creates a file
* Flush start and finish

Also includes a small fix to EventLogger.

Also added the option ROCKSDB_PRINT_EVENTS_TO_STDOUT, which is useful when debugging. I've spent far too much time chasing LOG files.
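For a sense of the format, hand-written sample lines follow (field values invented for illustration; the keys match the call sites in this diff, but exact spacing and ordering come from JSONWritter):

  EVENT_LOG_v1 {"job": 12, "event": "flush_started", "num_memtables": 2, "num_entries": 14104, "num_deletes": 0, "memory_usage": 12582912}
  EVENT_LOG_v1 {"job": 12, "event": "table_file_creation", "file_number": 27, "file_size": 4812570}
  EVENT_LOG_v1 {"job": 13, "event": "compaction_started", "files_L0": [27, 24], "files_L1": [20], "score": 1.02, "input_data_size": 9437184}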
Still didn't get sst table properties into the JSON. They are written very deep in the stack; I'll address that in a separate diff.

TODO:
* Write a specification. Let's first use this for a while and figure out what data is good to put here; after that, we'll write the spec.
* Write tools that parse and analyze LOGs. This can be in python or go. Good intern task.
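For reviewers, a condensed sketch of the two ways a call site emits an event after this diff (job_id, log_buffer_, and vstorage stand in for whatever the caller has in scope):

  // Unbuffered: serialized and written to the info LOG (or to stdout when
  // compiled with -DROCKSDB_PRINT_EVENTS_TO_STDOUT) as the stream destructs.
  event_logger_->Log() << "job" << job_id << "event" << "flush_started";

  // Buffered: for call sites that already hold a LogBuffer (e.g. under the
  // DB mutex); the event is flushed together with the rest of the buffer.
  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_id << "event" << "compaction_finished";
  stream << "lsm_state";
  stream.StartArray();  // JSON array value: one file count per LSM level
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();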
Test Plan: Ran db_bench with ROCKSDB_PRINT_EVENTS_TO_STDOUT. Here's the output: https://phabricator.fb.com/P19811976

Reviewers: sdong, yhchiang, rven, MarkCallaghan, kradhakrishnan, anthony

Reviewed By: anthony

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D37521
---
 db/compaction.cc          | 10 ++++++
 db/compaction.h           |  2 ++
 db/compaction_job.cc      | 75 +++++++++++++++++++++++++++++----------
 db/compaction_job.h       |  6 +++-
 db/compaction_job_test.cc | 11 +++---
 db/db_impl.cc             | 40 +++++++++++++++++----
 db/db_impl.h              |  4 +--
 db/flush_job.cc           | 25 ++++++++++++-
 db/memtable_list_test.cc  |  4 +--
 util/event_logger.cc      | 16 +++++++--
 util/event_logger.h       | 20 ++++++++---
 11 files changed, 169 insertions(+), 44 deletions(-)

diff --git a/db/compaction.cc b/db/compaction.cc
index 20a8ba40d..2f202cf91 100644
--- a/db/compaction.cc
+++ b/db/compaction.cc
@@ -255,6 +255,16 @@ const char* Compaction::InputLevelSummary(
   return scratch->buffer;
 }
 
+uint64_t Compaction::CalculateTotalInputSize() const {
+  uint64_t size = 0;
+  for (auto& input_level : inputs_) {
+    for (auto f : input_level.files) {
+      size += f->fd.GetFileSize();
+    }
+  }
+  return size;
+}
+
 void Compaction::ReleaseCompactionFiles(Status status) {
   MarkFilesBeingCompacted(false);
   cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
diff --git a/db/compaction.h b/db/compaction.h
index 876f91c30..d31a76921 100644
--- a/db/compaction.h
+++ b/db/compaction.h
@@ -173,6 +173,8 @@ class Compaction {
 
   const char* InputLevelSummary(InputLevelSummaryBuffer* scratch) const;
 
+  uint64_t CalculateTotalInputSize() const;
+
   // In case of compaction error, reset the nextIndex that is used
   // to pick up the next file to be compacted from files_by_size_
   void ResetNextCompactionIndex();
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 00b27881c..3066e8f9a 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -49,6 +49,7 @@
 #include "util/perf_context_imp.h"
 #include "util/iostats_context_imp.h"
 #include "util/stop_watch.h"
+#include "util/string_util.h"
 #include "util/sync_point.h"
 #include "util/thread_status_util.h"
 
@@ -208,7 +209,7 @@ CompactionJob::CompactionJob(
     LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory,
     Statistics* stats, SnapshotList* snapshots, bool is_snapshot_supported,
     std::shared_ptr<Cache> table_cache,
-    std::function<uint64_t()> yield_callback)
+    std::function<uint64_t()> yield_callback, EventLogger* event_logger)
     : job_id_(job_id),
       compact_(new CompactionState(compaction)),
       compaction_stats_(1),
@@ -225,7 +226,8 @@ CompactionJob::CompactionJob(
       snapshots_(snapshots),
       is_snapshot_supported_(is_snapshot_supported),
       table_cache_(std::move(table_cache)),
-      yield_callback_(std::move(yield_callback)) {
+      yield_callback_(std::move(yield_callback)),
+      event_logger_(event_logger) {
   ThreadStatusUtil::SetColumnFamily(
       compact_->compaction->column_family_data());
   ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
@@ -242,22 +244,10 @@ void CompactionJob::Prepare() {
   compact_->CleanupBatchBuffer();
   compact_->CleanupMergedBuffer();
 
-  auto* compaction = compact_->compaction;
-  // Generate file_levels_ for compaction berfore making Iterator
-  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
+  ColumnFamilyData* cfd __attribute__((unused)) =
+      compact_->compaction->column_family_data();
   assert(cfd != nullptr);
 
-  {
-    Compaction::InputLevelSummaryBuffer inputs_summary;
-    LogToBuffer(log_buffer_, "[%s] [JOB %d] Compacting %s, score %.2f",
-                cfd->GetName().c_str(), job_id_,
-                compaction->InputLevelSummary(&inputs_summary),
-                compaction->score());
-  }
-  char scratch[2345];
-  compact_->compaction->Summary(scratch, sizeof(scratch));
-  LogToBuffer(log_buffer_, "[%s] Compaction start summary: %s\n",
-              cfd->GetName().c_str(), scratch);
   assert(cfd->current()->storage_info()->NumLevelFiles(
              compact_->compaction->level()) > 0);
 
@@ -291,6 +281,35 @@ Status CompactionJob::Run() {
   log_buffer_->FlushBufferToLog();
   ColumnFamilyData* cfd = compact_->compaction->column_family_data();
 
+  auto* compaction = compact_->compaction;
+  // Let's check if anything will get logged. Don't prepare all the info if
+  // we're not logging
+  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
+    Compaction::InputLevelSummaryBuffer inputs_summary;
+    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
+        "[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(),
+        job_id_, compaction->InputLevelSummary(&inputs_summary),
+        compaction->score());
+    char scratch[2345];
+    compact_->compaction->Summary(scratch, sizeof(scratch));
+    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
+        "[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch);
+    // build event logger report
+    auto stream = event_logger_->Log();
+    stream << "job" << job_id_ << "event"
+           << "compaction_started";
+    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
+      stream << ("files_L" + ToString(compaction->level(i)));
+      stream.StartArray();
+      for (auto f : *compaction->inputs(i)) {
+        stream << f->fd.GetNumber();
+      }
+      stream.EndArray();
+    }
+    stream << "score" << compaction->score() << "input_data_size"
+           << compaction->CalculateTotalInputSize();
+  }
+
   const uint64_t start_micros = env_->NowMicros();
   std::unique_ptr<Iterator> input(
       versions_->MakeInputIterator(compact_->compaction));
@@ -481,7 +500,6 @@ Status CompactionJob::Run() {
   if (compact_->num_input_records > compact_->num_output_records) {
     compaction_stats_.num_dropped_records +=
         compact_->num_input_records - compact_->num_output_records;
-    compact_->num_input_records = compact_->num_output_records = 0;
   }
 
   RecordCompactionIOStats();
@@ -503,14 +521,14 @@ void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
     *status = InstallCompactionResults(db_mutex);
   }
   VersionStorageInfo::LevelSummaryStorage tmp;
+  auto vstorage = cfd->current()->storage_info();
   const auto& stats = compaction_stats_;
   LogToBuffer(log_buffer_,
              "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
              "files in(%d, %d) out(%d) "
              "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
              "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
-             cfd->GetName().c_str(),
-             cfd->current()->storage_info()->LevelSummary(&tmp),
+             cfd->GetName().c_str(), vstorage->LevelSummary(&tmp),
              (stats.bytes_readn + stats.bytes_readnp1) /
                  static_cast<double>(stats.micros),
              stats.bytes_written / static_cast<double>(stats.micros),
@@ -524,6 +542,21 @@ void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
       status->ToString().c_str(), stats.num_input_records,
       stats.num_dropped_records);
 
+  auto stream = event_logger_->LogToBuffer(log_buffer_);
+  stream << "job" << job_id_ << "event"
+         << "compaction_finished"
+         << "output_level" << compact_->compaction->output_level()
+         << "num_output_files" << compact_->outputs.size()
+         << "total_output_size" << compact_->total_bytes << "num_input_records"
+         << compact_->num_input_records << "num_output_records"
+         << compact_->num_output_records;
+  stream << "lsm_state";
+  stream.StartArray();
+  for (int level = 0; level < vstorage->num_levels(); ++level) {
+    stream << vstorage->NumLevelFiles(level);
+  }
+  stream.EndArray();
+
   CleanupCompaction(*status);
 }
 
@@ -997,6 +1030,10 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
           " keys, %" PRIu64 " bytes",
           cfd->GetName().c_str(), job_id_, output_number, current_entries,
           current_bytes);
+      event_logger_->Log() << "job" << job_id_ << "event"
+                           << "table_file_creation"
+                           << "file_number" << output_number << "file_size"
+                           << current_bytes;
     }
   }
   return s;
diff --git a/db/compaction_job.h b/db/compaction_job.h
index 512db9f97..b7422725d 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -30,6 +30,7 @@
 #include "rocksdb/compaction_filter.h"
 #include "rocksdb/transaction_log.h"
 #include "util/autovector.h"
+#include "util/event_logger.h"
 #include "util/stop_watch.h"
 #include "util/thread_local.h"
 #include "util/scoped_arena_iterator.h"
@@ -60,7 +61,8 @@ class CompactionJob {
                 Directory* db_directory, Directory* output_directory,
                 Statistics* stats, SnapshotList* snapshot_list,
                 bool is_snapshot_supported, std::shared_ptr<Cache> table_cache,
-                std::function<uint64_t()> yield_callback);
+                std::function<uint64_t()> yield_callback,
+                EventLogger* event_logger);
 
   ~CompactionJob();
 
@@ -125,6 +127,8 @@ class CompactionJob {
 
   // yield callback
   std::function<uint64_t()> yield_callback_;
+
+  EventLogger* event_logger_;
 };
 
 }  // namespace rocksdb
diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc
index c5cbb5ad1..a0b6a4306 100644
--- a/db/compaction_job_test.cc
+++ b/db/compaction_job_test.cc
@@ -163,11 +163,12 @@ TEST_F(CompactionJobTest, Simple) {
   };
   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
   mutex_.Lock();
-  CompactionJob compaction_job(0, compaction.get(), db_options_,
-                               *cfd->GetLatestMutableCFOptions(), env_options_,
-                               versions_.get(), &shutting_down_, &log_buffer,
-                               nullptr, nullptr, nullptr, &snapshots, true,
-                               table_cache_, std::move(yield_callback));
+  EventLogger event_logger(db_options_.info_log.get());
+  CompactionJob compaction_job(
+      0, compaction.get(), db_options_, *cfd->GetLatestMutableCFOptions(),
+      env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr,
+      nullptr, nullptr, &snapshots, true, table_cache_,
+      std::move(yield_callback), &event_logger);
   compaction_job.Prepare();
   mutex_.Unlock();
   ASSERT_OK(compaction_job.Run());
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 1a1807d48..5fad92208 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -671,7 +671,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
         // evict from cache
         TableCache::Evict(table_cache_.get(), number);
         fname = TableFileName(db_options_.db_paths, number, path_id);
-        event_logger_.Log() << "event"
+        event_logger_.Log() << "job" << state.job_id << "event"
                             << "table_file_deletion"
                             << "file_number" << number;
       } else {
@@ -937,6 +937,18 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     edit.SetColumnFamily(cfd->GetID());
     version_edits.insert({cfd->GetID(), edit});
   }
+  int job_id = next_job_id_.fetch_add(1);
+  {
+    auto stream = event_logger_.Log();
+    stream << "job" << job_id << "event"
+           << "recovery_started";
+    stream << "log_files";
+    stream.StartArray();
+    for (auto log_number : log_numbers) {
+      stream << log_number;
+    }
+    stream.EndArray();
+  }
 
   for (auto log_number : log_numbers) {
     // The previous incarnation may not have written any MANIFEST
@@ -1016,7 +1028,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
         auto iter = version_edits.find(cfd->GetID());
         assert(iter != version_edits.end());
         VersionEdit* edit = &iter->second;
-        status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
+        status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
         if (!status.ok()) {
           // Reflect errors immediately so that conditions like full
           // file-systems cause the DB::Open() to fail.
@@ -1058,7 +1070,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
 
     // flush the final memtable (if non-empty)
     if (cfd->mem()->GetFirstSequenceNumber() != 0) {
-      status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
+      status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
       if (!status.ok()) {
         // Recovery failed
         break;
@@ -1087,11 +1099,14 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     }
   }
 
+  event_logger_.Log() << "job" << job_id << "event"
+                      << "recovery_finished";
+
   return status;
 }
 
-Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
-                                           VersionEdit* edit) {
+Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
+                                           MemTable* mem, VersionEdit* edit) {
   mutex_.AssertHeld();
   const uint64_t start_micros = env_->NowMicros();
   FileMetaData meta;
@@ -1129,6 +1144,10 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
       " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
       cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
       s.ToString().c_str());
+  event_logger_.Log() << "job" << job_id << "event"
+                      << "table_file_creation"
+                      << "file_number" << meta.fd.GetNumber() << "file_size"
+                      << meta.fd.GetFileSize();
   }
 
   ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
@@ -1453,7 +1472,7 @@ Status DBImpl::CompactFilesImpl(
       env_options_, versions_.get(), &shutting_down_, log_buffer,
       directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
       stats_, &snapshots_, is_snapshot_supported_, table_cache_,
-      std::move(yield_callback));
+      std::move(yield_callback), &event_logger_);
   compaction_job.Prepare();
 
   mutex_.Unlock();
@@ -2299,6 +2318,13 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     VersionStorageInfo::LevelSummaryStorage tmp;
     c->column_family_data()->internal_stats()->IncBytesMoved(
         c->level() + 1, f->fd.GetFileSize());
+    {
+      event_logger_.LogToBuffer(log_buffer)
+          << "job" << job_context->job_id << "event"
+          << "trivial_move"
+          << "destination_level" << c->level() + 1 << "file_number"
+          << f->fd.GetNumber() << "file_size" << f->fd.GetFileSize();
+    }
     LogToBuffer(
         log_buffer,
         "[%s] Moved #%" PRIu64 " to level-%d %" PRIu64 " bytes %s: %s\n",
@@ -2321,7 +2347,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
       env_options_, versions_.get(), &shutting_down_, log_buffer,
      directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
      stats_, &snapshots_, is_snapshot_supported_, table_cache_,
-      std::move(yield_callback));
+      std::move(yield_callback), &event_logger_);
   compaction_job.Prepare();
   mutex_.Unlock();
   status = compaction_job.Run();
diff --git a/db/db_impl.h b/db/db_impl.h
index cf7e7fa76..2a1f1b088 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -360,8 +360,8 @@ class DBImpl : public DB {
   // database is opened) and is heavyweight because it holds the mutex
   // for the entire period. The second method WriteLevel0Table supports
   // concurrent flush memtables to storage.
-  Status WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
-                                     VersionEdit* edit);
+  Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
+                                     MemTable* mem, VersionEdit* edit);
 
   Status DelayWrite(uint64_t expiration_time);
   Status ScheduleFlushes(WriteContext* context);
diff --git a/db/flush_job.cc b/db/flush_job.cc
index d8cb3eb04..351713e45 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -138,6 +138,17 @@ Status FlushJob::Run(uint64_t* file_number) {
     *file_number = fn;
   }
 
+  auto stream = event_logger_->LogToBuffer(log_buffer_);
+  stream << "job" << job_context_->job_id << "event"
+         << "flush_finished";
+  stream << "lsm_state";
+  stream.StartArray();
+  auto vstorage = cfd_->current()->storage_info();
+  for (int level = 0; level < vstorage->num_levels(); ++level) {
+    stream << vstorage->NumLevelFiles(level);
+  }
+  stream.EndArray();
+
   return s;
 }
 
@@ -166,12 +177,24 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
     ReadOptions ro;
     ro.total_order_seek = true;
     Arena arena;
+    uint64_t total_num_entries = 0, total_num_deletes = 0;
+    size_t total_memory_usage = 0;
     for (MemTable* m : mems) {
       Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
           "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
           cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
       memtables.push_back(m->NewIterator(ro, &arena));
+      total_num_entries += m->num_entries();
+      total_num_deletes += m->num_deletes();
+      total_memory_usage += m->ApproximateMemoryUsage();
     }
+
+    event_logger_->Log() << "job" << job_context_->job_id << "event"
+                         << "flush_started"
+                         << "num_memtables" << mems.size() << "num_entries"
+                         << total_num_entries << "num_deletes"
+                         << total_num_deletes << "memory_usage"
+                         << total_memory_usage;
     {
       ScopedArenaIterator iter(
           NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
@@ -195,7 +218,7 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
         "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s",
         cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber(),
         meta.fd.GetFileSize(), s.ToString().c_str());
-    event_logger_->Log() << "event"
+    event_logger_->Log() << "job" << job_context_->job_id << "event"
                          << "table_file_creation"
                          << "file_number" << meta.fd.GetNumber() << "file_size"
                          << meta.fd.GetFileSize();
diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc
index 22f1eb876..fc4e948ab 100644
--- a/db/memtable_list_test.cc
+++ b/db/memtable_list_test.cc
@@ -222,9 +222,9 @@ TEST_F(MemTableListTest, FlushPendingTest) {
 
   // Create some MemTables
   std::vector<MemTable*> tables;
+  MutableCFOptions mutable_cf_options(options, ioptions);
   for (int i = 0; i < num_tables; i++) {
-    MemTable* mem =
-        new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb);
+    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb);
     mem->Ref();
 
     std::string value;
diff --git a/util/event_logger.cc b/util/event_logger.cc
index e9e1fdc38..fdecb8e54 100644
--- a/util/event_logger.cc
+++ b/util/event_logger.cc
@@ -21,12 +21,24 @@ namespace rocksdb {
 const char* kEventLoggerPrefix = "EVENT_LOG_v1";
 
 EventLoggerStream::EventLoggerStream(Logger* logger)
-    : logger_(logger), json_writter_(nullptr) {}
+    : logger_(logger), log_buffer_(nullptr), json_writter_(nullptr) {}
+
+EventLoggerStream::EventLoggerStream(LogBuffer* log_buffer)
+    : logger_(nullptr), log_buffer_(log_buffer), json_writter_(nullptr) {}
 
 EventLoggerStream::~EventLoggerStream() {
   if (json_writter_) {
     json_writter_->EndObject();
-    Log(logger_, "%s %s", kEventLoggerPrefix, json_writter_->Get().c_str());
+#ifdef ROCKSDB_PRINT_EVENTS_TO_STDOUT
+    printf("%s\n", json_writter_->Get().c_str());
+#else
+    if (logger_) {
+      Log(logger_, "%s %s", kEventLoggerPrefix, json_writter_->Get().c_str());
+    } else if (log_buffer_) {
+      LogToBuffer(log_buffer_, "%s %s", kEventLoggerPrefix,
+                  json_writter_->Get().c_str());
+    }
+#endif
     delete json_writter_;
   }
 }
diff --git a/util/event_logger.h b/util/event_logger.h
index e1d94f323..9054a6225 100644
--- a/util/event_logger.h
+++ b/util/event_logger.h
@@ -11,6 +11,7 @@
 #include <string>
 
 #include "rocksdb/env.h"
+#include "util/log_buffer.h"
 
 namespace rocksdb {
 
@@ -56,11 +57,8 @@ class JSONWritter {
   }
 
   void StartArray() {
-    assert(state_ == kExpectKey);
+    assert(state_ == kExpectValue);
     state_ = kInArray;
-    if (!first_element_) {
-      stream_ << ", ";
-    }
     stream_ << "[";
     first_element_ = true;
   }
@@ -125,6 +123,12 @@ class EventLoggerStream {
     *json_writter_ << val;
     return *this;
  }
+
+  void StartArray() { json_writter_->StartArray(); }
+  void EndArray() { json_writter_->EndArray(); }
+  void StartObject() { json_writter_->StartObject(); }
+  void EndObject() { json_writter_->EndObject(); }
+
   ~EventLoggerStream();
 
  private:
@@ -138,7 +142,10 @@ class EventLoggerStream {
   }
   friend class EventLogger;
   explicit EventLoggerStream(Logger* logger);
-  Logger* logger_;
+  explicit EventLoggerStream(LogBuffer* log_buffer);
+  // exactly one is non-nullptr
+  Logger* const logger_;
+  LogBuffer* const log_buffer_;
   // ownership
   JSONWritter* json_writter_;
 };
@@ -151,6 +158,9 @@ class EventLogger {
  public:
   explicit EventLogger(Logger* logger) : logger_(logger) {}
   EventLoggerStream Log() { return EventLoggerStream(logger_); }
+  EventLoggerStream LogToBuffer(LogBuffer* log_buffer) {
+    return EventLoggerStream(log_buffer);
+  }
 
  private:
   Logger* logger_;