diff --git a/db/compaction.cc b/db/compaction.cc
index 20a8ba40d..2f202cf91 100644
--- a/db/compaction.cc
+++ b/db/compaction.cc
@@ -255,6 +255,16 @@ const char* Compaction::InputLevelSummary(
   return scratch->buffer;
 }
 
+uint64_t Compaction::CalculateTotalInputSize() const {
+  uint64_t size = 0;
+  for (auto& input_level : inputs_) {
+    for (auto f : input_level.files) {
+      size += f->fd.GetFileSize();
+    }
+  }
+  return size;
+}
+
 void Compaction::ReleaseCompactionFiles(Status status) {
   MarkFilesBeingCompacted(false);
   cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
diff --git a/db/compaction.h b/db/compaction.h
index 876f91c30..d31a76921 100644
--- a/db/compaction.h
+++ b/db/compaction.h
@@ -173,6 +173,8 @@ class Compaction {
 
   const char* InputLevelSummary(InputLevelSummaryBuffer* scratch) const;
 
+  uint64_t CalculateTotalInputSize() const;
+
   // In case of compaction error, reset the nextIndex that is used
   // to pick up the next file to be compacted from files_by_size_
   void ResetNextCompactionIndex();
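Editor's note, not part of the patch: CalculateTotalInputSize() sums the physical file sizes over every input level, not only the level that triggered the compaction; its result feeds the input_data_size field of the compaction_started event emitted in compaction_job.cc below. A minimal standalone sketch of the same aggregation, using hypothetical FileMeta/InputLevel stand-ins for RocksDB's FileMetaData and the inputs_ vector:

#include <cstdint>
#include <vector>

// Hypothetical stand-ins for RocksDB's FileMetaData and Compaction::inputs_.
struct FileMeta {
  uint64_t file_size;
};
struct InputLevel {
  std::vector<FileMeta> files;
};

// Same shape as Compaction::CalculateTotalInputSize(): walk every input
// level and sum the on-disk size of each file.
uint64_t CalculateTotalInputSize(const std::vector<InputLevel>& inputs) {
  uint64_t size = 0;
  for (const auto& input_level : inputs) {
    for (const auto& f : input_level.files) {
      size += f.file_size;
    }
  }
  return size;
}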
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 00b27881c..3066e8f9a 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -49,6 +49,7 @@
 #include "util/perf_context_imp.h"
 #include "util/iostats_context_imp.h"
 #include "util/stop_watch.h"
+#include "util/string_util.h"
 #include "util/sync_point.h"
 #include "util/thread_status_util.h"
 
@@ -208,7 +209,7 @@ CompactionJob::CompactionJob(
     LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory,
     Statistics* stats, SnapshotList* snapshots, bool is_snapshot_supported,
    std::shared_ptr<TableCache> table_cache,
-    std::function<uint64_t()> yield_callback)
+    std::function<uint64_t()> yield_callback, EventLogger* event_logger)
     : job_id_(job_id),
       compact_(new CompactionState(compaction)),
       compaction_stats_(1),
@@ -225,7 +226,8 @@ CompactionJob::CompactionJob(
       snapshots_(snapshots),
       is_snapshot_supported_(is_snapshot_supported),
       table_cache_(std::move(table_cache)),
-      yield_callback_(std::move(yield_callback)) {
+      yield_callback_(std::move(yield_callback)),
+      event_logger_(event_logger) {
   ThreadStatusUtil::SetColumnFamily(
       compact_->compaction->column_family_data());
   ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
@@ -242,22 +244,10 @@ void CompactionJob::Prepare() {
   compact_->CleanupBatchBuffer();
   compact_->CleanupMergedBuffer();
 
-  auto* compaction = compact_->compaction;
-  // Generate file_levels_ for compaction berfore making Iterator
-  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
+  ColumnFamilyData* cfd __attribute__((unused)) =
+      compact_->compaction->column_family_data();
   assert(cfd != nullptr);
 
-  {
-    Compaction::InputLevelSummaryBuffer inputs_summary;
-    LogToBuffer(log_buffer_, "[%s] [JOB %d] Compacting %s, score %.2f",
-                cfd->GetName().c_str(), job_id_,
-                compaction->InputLevelSummary(&inputs_summary),
-                compaction->score());
-  }
-  char scratch[2345];
-  compact_->compaction->Summary(scratch, sizeof(scratch));
-  LogToBuffer(log_buffer_, "[%s] Compaction start summary: %s\n",
-              cfd->GetName().c_str(), scratch);
 
   assert(cfd->current()->storage_info()->NumLevelFiles(
              compact_->compaction->level()) > 0);
@@ -291,6 +281,35 @@ Status CompactionJob::Run() {
   log_buffer_->FlushBufferToLog();
   ColumnFamilyData* cfd = compact_->compaction->column_family_data();
 
+  auto* compaction = compact_->compaction;
+  // Let's check if anything will get logged. Don't prepare all the info if
+  // we're not logging
+  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
+    Compaction::InputLevelSummaryBuffer inputs_summary;
+    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
+        "[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(),
+        job_id_, compaction->InputLevelSummary(&inputs_summary),
+        compaction->score());
+    char scratch[2345];
+    compact_->compaction->Summary(scratch, sizeof(scratch));
+    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
+        "[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch);
+    // build event logger report
+    auto stream = event_logger_->Log();
+    stream << "job" << job_id_ << "event"
+           << "compaction_started";
+    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
+      stream << ("files_L" + ToString(compaction->level(i)));
+      stream.StartArray();
+      for (auto f : *compaction->inputs(i)) {
+        stream << f->fd.GetNumber();
+      }
+      stream.EndArray();
+    }
+    stream << "score" << compaction->score() << "input_data_size"
+           << compaction->CalculateTotalInputSize();
+  }
+
   const uint64_t start_micros = env_->NowMicros();
   std::unique_ptr<Iterator> input(
       versions_->MakeInputIterator(compact_->compaction));
@@ -481,7 +500,6 @@ Status CompactionJob::Run() {
   if (compact_->num_input_records > compact_->num_output_records) {
     compaction_stats_.num_dropped_records +=
         compact_->num_input_records - compact_->num_output_records;
-    compact_->num_input_records = compact_->num_output_records = 0;
   }
 
   RecordCompactionIOStats();
@@ -503,14 +521,14 @@ void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
     *status = InstallCompactionResults(db_mutex);
   }
   VersionStorageInfo::LevelSummaryStorage tmp;
+  auto vstorage = cfd->current()->storage_info();
   const auto& stats = compaction_stats_;
   LogToBuffer(log_buffer_,
               "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
               "files in(%d, %d) out(%d) "
               "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
               "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
-              cfd->GetName().c_str(),
-              cfd->current()->storage_info()->LevelSummary(&tmp),
+              cfd->GetName().c_str(), vstorage->LevelSummary(&tmp),
               (stats.bytes_readn + stats.bytes_readnp1) /
                   static_cast<double>(stats.micros),
               stats.bytes_written / static_cast<double>(stats.micros),
@@ -524,6 +542,21 @@ void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
               status->ToString().c_str(), stats.num_input_records,
               stats.num_dropped_records);
 
+  auto stream = event_logger_->LogToBuffer(log_buffer_);
+  stream << "job" << job_id_ << "event"
+         << "compaction_finished"
+         << "output_level" << compact_->compaction->output_level()
+         << "num_output_files" << compact_->outputs.size()
+         << "total_output_size" << compact_->total_bytes << "num_input_records"
+         << compact_->num_input_records << "num_output_records"
+         << compact_->num_output_records;
+  stream << "lsm_state";
+  stream.StartArray();
+  for (int level = 0; level < vstorage->num_levels(); ++level) {
+    stream << vstorage->NumLevelFiles(level);
+  }
+  stream.EndArray();
+
   CleanupCompaction(*status);
 }
 
@@ -997,6 +1030,10 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
           " keys, %" PRIu64 " bytes",
           cfd->GetName().c_str(), job_id_, output_number, current_entries,
           current_bytes);
+      event_logger_->Log() << "job" << job_id_ << "event"
+                           << "table_file_creation"
+                           << "file_number" << output_number << "file_size"
+                           << current_bytes;
     }
   }
   return s;
 }
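For orientation, a hedged sketch of the stream API the compaction_started block above exercises (the job id, file numbers, score, and size here are invented): keys and values alternate, an array is opened with StartArray() right after its key, and the stream's destructor closes the JSON object and emits a single EVENT_LOG_v1 line.

#include "util/event_logger.h"

// Sketch only: mirrors the compaction_started block in CompactionJob::Run().
void LogCompactionStartedExample(rocksdb::EventLogger* event_logger) {
  auto stream = event_logger->Log();
  stream << "job" << 7 << "event"
         << "compaction_started";
  stream << "files_L0";  // key first...
  stream.StartArray();   // ...then the array as its value
  stream << 12 << 15;    // invented input file numbers
  stream.EndArray();
  stream << "score" << 1.5 << "input_data_size" << 4096;
}  // ~EventLoggerStream ends the JSON object and writes the log line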
diff --git a/db/compaction_job.h b/db/compaction_job.h
index 512db9f97..b7422725d 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -30,6 +30,7 @@
 #include "rocksdb/compaction_filter.h"
 #include "rocksdb/transaction_log.h"
 #include "util/autovector.h"
+#include "util/event_logger.h"
 #include "util/stop_watch.h"
 #include "util/thread_local.h"
 #include "util/scoped_arena_iterator.h"
@@ -60,7 +61,8 @@ class CompactionJob {
                 Directory* db_directory, Directory* output_directory,
                 Statistics* stats, SnapshotList* snapshot_list,
                 bool is_snapshot_supported,
                 std::shared_ptr<TableCache> table_cache,
-                std::function<uint64_t()> yield_callback);
+                std::function<uint64_t()> yield_callback,
+                EventLogger* event_logger);
 
   ~CompactionJob();
@@ -125,6 +127,8 @@ class CompactionJob {
   // yield callback
   std::function<uint64_t()> yield_callback_;
+
+  EventLogger* event_logger_;
 };
 
 }  // namespace rocksdb
diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc
index c5cbb5ad1..a0b6a4306 100644
--- a/db/compaction_job_test.cc
+++ b/db/compaction_job_test.cc
@@ -163,11 +163,12 @@ TEST_F(CompactionJobTest, Simple) {
   };
   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
   mutex_.Lock();
-  CompactionJob compaction_job(0, compaction.get(), db_options_,
-                               *cfd->GetLatestMutableCFOptions(), env_options_,
-                               versions_.get(), &shutting_down_, &log_buffer,
-                               nullptr, nullptr, nullptr, &snapshots, true,
-                               table_cache_, std::move(yield_callback));
+  EventLogger event_logger(db_options_.info_log.get());
+  CompactionJob compaction_job(
+      0, compaction.get(), db_options_, *cfd->GetLatestMutableCFOptions(),
+      env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr,
+      nullptr, nullptr, &snapshots, true, table_cache_,
+      std::move(yield_callback), &event_logger);
   compaction_job.Prepare();
   mutex_.Unlock();
   ASSERT_OK(compaction_job.Run());
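A wiring note, inferred from the &event_logger_ arguments here and in db_impl.cc below: the DB owns a single EventLogger and every flush/compaction job borrows a raw pointer to it, so events from all jobs funnel into one info log. A compressed sketch of that shape with hypothetical Db/Job classes (not real RocksDB types):

#include "util/event_logger.h"

namespace rocksdb {

// Hypothetical stand-in for CompactionJob/FlushJob: borrows the logger.
class Job {
 public:
  Job(int job_id, EventLogger* event_logger)
      : job_id_(job_id), event_logger_(event_logger) {}
  void Run() {
    // Every event is tagged with its job id, as in the patch.
    event_logger_->Log() << "job" << job_id_ << "event"
                         << "job_ran";  // invented event name
  }

 private:
  int job_id_;
  EventLogger* event_logger_;  // non-owning; the Db must outlive the Job
};

// Hypothetical stand-in for DBImpl: owns the logger for the DB's lifetime.
class Db {
 public:
  explicit Db(Logger* info_log) : event_logger_(info_log) {}
  void RunJob(int job_id) { Job(job_id, &event_logger_).Run(); }

 private:
  EventLogger event_logger_;  // one logger shared by all jobs
};

}  // namespace rocksdb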
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 1a1807d48..5fad92208 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -671,7 +671,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
         // evict from cache
         TableCache::Evict(table_cache_.get(), number);
         fname = TableFileName(db_options_.db_paths, number, path_id);
-        event_logger_.Log() << "event"
+        event_logger_.Log() << "job" << state.job_id << "event"
                             << "table_file_deletion"
                             << "file_number" << number;
       } else {
@@ -937,6 +937,18 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     edit.SetColumnFamily(cfd->GetID());
     version_edits.insert({cfd->GetID(), edit});
   }
+  int job_id = next_job_id_.fetch_add(1);
+  {
+    auto stream = event_logger_.Log();
+    stream << "job" << job_id << "event"
+           << "recovery_started";
+    stream << "log_files";
+    stream.StartArray();
+    for (auto log_number : log_numbers) {
+      stream << log_number;
+    }
+    stream.EndArray();
+  }
 
   for (auto log_number : log_numbers) {
     // The previous incarnation may not have written any MANIFEST
@@ -1016,7 +1028,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
         auto iter = version_edits.find(cfd->GetID());
         assert(iter != version_edits.end());
         VersionEdit* edit = &iter->second;
-        status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
+        status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
         if (!status.ok()) {
           // Reflect errors immediately so that conditions like full
           // file-systems cause the DB::Open() to fail.
@@ -1058,7 +1070,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
 
     // flush the final memtable (if non-empty)
     if (cfd->mem()->GetFirstSequenceNumber() != 0) {
-      status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
+      status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
       if (!status.ok()) {
         // Recovery failed
         break;
@@ -1087,11 +1099,14 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     }
   }
 
+  event_logger_.Log() << "job" << job_id << "event"
+                      << "recovery_finished";
+
   return status;
 }
 
-Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
-                                           VersionEdit* edit) {
+Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
+                                           MemTable* mem, VersionEdit* edit) {
   mutex_.AssertHeld();
   const uint64_t start_micros = env_->NowMicros();
   FileMetaData meta;
@@ -1129,6 +1144,10 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
         " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
         cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
         s.ToString().c_str());
+    event_logger_.Log() << "job" << job_id << "event"
+                        << "table_file_creation"
+                        << "file_number" << meta.fd.GetNumber() << "file_size"
+                        << meta.fd.GetFileSize();
   }
 
   ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
@@ -1453,7 +1472,7 @@ Status DBImpl::CompactFilesImpl(
       env_options_, versions_.get(), &shutting_down_, log_buffer,
       directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
       stats_, &snapshots_, is_snapshot_supported_, table_cache_,
-      std::move(yield_callback));
+      std::move(yield_callback), &event_logger_);
   compaction_job.Prepare();
 
   mutex_.Unlock();
@@ -2299,6 +2318,13 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     VersionStorageInfo::LevelSummaryStorage tmp;
     c->column_family_data()->internal_stats()->IncBytesMoved(
         c->level() + 1, f->fd.GetFileSize());
+    {
+      event_logger_.LogToBuffer(log_buffer)
+          << "job" << job_context->job_id << "event"
+          << "trivial_move"
+          << "destination_level" << c->level() + 1 << "file_number"
+          << f->fd.GetNumber() << "file_size" << f->fd.GetFileSize();
+    }
     LogToBuffer(
         log_buffer,
         "[%s] Moved #%" PRIu64 " to level-%d %" PRIu64 " bytes %s: %s\n",
@@ -2321,7 +2347,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
         env_options_, versions_.get(), &shutting_down_, log_buffer,
         directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
         stats_, &snapshots_, is_snapshot_supported_, table_cache_,
-        std::move(yield_callback));
+        std::move(yield_callback), &event_logger_);
     compaction_job.Prepare();
     mutex_.Unlock();
     status = compaction_job.Run();
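To make the recovery events concrete, an inferred example rather than captured RocksDB output: given job id 1 and WAL files 101 and 103, the blocks above should serialize to roughly the following two lines (modulo any fields the stream prepends on its own, and exact spacing from JSONWritter):

EVENT_LOG_v1 {"job": 1, "event": "recovery_started", "log_files": [101, 103]}
EVENT_LOG_v1 {"job": 1, "event": "recovery_finished"}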
diff --git a/db/db_impl.h b/db/db_impl.h
index cf7e7fa76..2a1f1b088 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -360,8 +360,8 @@ class DBImpl : public DB {
   // database is opened) and is heavyweight because it holds the mutex
   // for the entire period. The second method WriteLevel0Table supports
   // concurrent flush memtables to storage.
-  Status WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
-                                     VersionEdit* edit);
+  Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
+                                     MemTable* mem, VersionEdit* edit);
 
   Status DelayWrite(uint64_t expiration_time);
 
   Status ScheduleFlushes(WriteContext* context);
diff --git a/db/flush_job.cc b/db/flush_job.cc
index d8cb3eb04..351713e45 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -138,6 +138,17 @@ Status FlushJob::Run(uint64_t* file_number) {
     *file_number = fn;
   }
 
+  auto stream = event_logger_->LogToBuffer(log_buffer_);
+  stream << "job" << job_context_->job_id << "event"
+         << "flush_finished";
+  stream << "lsm_state";
+  stream.StartArray();
+  auto vstorage = cfd_->current()->storage_info();
+  for (int level = 0; level < vstorage->num_levels(); ++level) {
+    stream << vstorage->NumLevelFiles(level);
+  }
+  stream.EndArray();
+
   return s;
 }
 
@@ -166,12 +177,24 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
   ReadOptions ro;
   ro.total_order_seek = true;
   Arena arena;
+  uint64_t total_num_entries = 0, total_num_deletes = 0;
+  size_t total_memory_usage = 0;
   for (MemTable* m : mems) {
     Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
         "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
         cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
     memtables.push_back(m->NewIterator(ro, &arena));
+    total_num_entries += m->num_entries();
+    total_num_deletes += m->num_deletes();
+    total_memory_usage += m->ApproximateMemoryUsage();
   }
+
+  event_logger_->Log() << "job" << job_context_->job_id << "event"
+                       << "flush_started"
+                       << "num_memtables" << mems.size() << "num_entries"
+                       << total_num_entries << "num_deletes"
+                       << total_num_deletes << "memory_usage"
+                       << total_memory_usage;
   {
     ScopedArenaIterator iter(
         NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
@@ -195,7 +218,7 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
         "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s",
         cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber(),
         meta.fd.GetFileSize(), s.ToString().c_str());
-    event_logger_->Log() << "event"
+    event_logger_->Log() << "job" << job_context_->job_id << "event"
                          << "table_file_creation"
                          << "file_number" << meta.fd.GetNumber() << "file_size"
                          << meta.fd.GetFileSize();
diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc
index 22f1eb876..fc4e948ab 100644
--- a/db/memtable_list_test.cc
+++ b/db/memtable_list_test.cc
@@ -222,9 +222,9 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   // Create some MemTables
   std::vector<MemTable*> tables;
+  MutableCFOptions mutable_cf_options(options, ioptions);
   for (int i = 0; i < num_tables; i++) {
-    MemTable* mem =
-        new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb);
+    MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb);
     mem->Ref();
     std::string value;
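The flush_started counters are aggregated across every memtable picked for the flush before a single event is emitted. A self-contained model of that aggregation, with a hypothetical MemTableStats struct standing in for MemTable's num_entries()/num_deletes()/ApproximateMemoryUsage() accessors:

#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in exposing the three counters the patch sums.
struct MemTableStats {
  uint64_t num_entries;
  uint64_t num_deletes;
  size_t approximate_memory_usage;
};

struct FlushTotals {
  uint64_t entries = 0;
  uint64_t deletes = 0;
  size_t memory = 0;
};

// Mirrors FlushJob::WriteLevel0Table: totals cover all memtables in the
// flush, so one flush_started event describes the whole batch.
FlushTotals Aggregate(const std::vector<MemTableStats>& mems) {
  FlushTotals totals;
  for (const auto& m : mems) {
    totals.entries += m.num_entries;
    totals.deletes += m.num_deletes;
    totals.memory += m.approximate_memory_usage;
  }
  return totals;
}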
diff --git a/util/event_logger.cc b/util/event_logger.cc
index e9e1fdc38..fdecb8e54 100644
--- a/util/event_logger.cc
+++ b/util/event_logger.cc
@@ -21,12 +21,24 @@ namespace rocksdb {
 const char* kEventLoggerPrefix = "EVENT_LOG_v1";
 
 EventLoggerStream::EventLoggerStream(Logger* logger)
-    : logger_(logger), json_writter_(nullptr) {}
+    : logger_(logger), log_buffer_(nullptr), json_writter_(nullptr) {}
+
+EventLoggerStream::EventLoggerStream(LogBuffer* log_buffer)
+    : logger_(nullptr), log_buffer_(log_buffer), json_writter_(nullptr) {}
 
 EventLoggerStream::~EventLoggerStream() {
   if (json_writter_) {
     json_writter_->EndObject();
-    Log(logger_, "%s %s", kEventLoggerPrefix, json_writter_->Get().c_str());
+#ifdef ROCKSDB_PRINT_EVENTS_TO_STDOUT
+    printf("%s\n", json_writter_->Get().c_str());
+#else
+    if (logger_) {
+      Log(logger_, "%s %s", kEventLoggerPrefix, json_writter_->Get().c_str());
+    } else if (log_buffer_) {
+      LogToBuffer(log_buffer_, "%s %s", kEventLoggerPrefix,
+                  json_writter_->Get().c_str());
+    }
+#endif
     delete json_writter_;
   }
 }
diff --git a/util/event_logger.h b/util/event_logger.h
index e1d94f323..9054a6225 100644
--- a/util/event_logger.h
+++ b/util/event_logger.h
@@ -11,6 +11,7 @@
 #include <string>
 
 #include "rocksdb/env.h"
+#include "util/log_buffer.h"
 
 namespace rocksdb {
 
@@ -56,11 +57,8 @@ class JSONWritter {
   }
 
   void StartArray() {
-    assert(state_ == kExpectKey);
+    assert(state_ == kExpectValue);
     state_ = kInArray;
-    if (!first_element_) {
-      stream_ << ", ";
-    }
     stream_ << "[";
     first_element_ = true;
   }
@@ -125,6 +123,12 @@ class EventLoggerStream {
     *json_writter_ << val;
     return *this;
  }
+
+  void StartArray() { json_writter_->StartArray(); }
+  void EndArray() { json_writter_->EndArray(); }
+  void StartObject() { json_writter_->StartObject(); }
+  void EndObject() { json_writter_->EndObject(); }
+
   ~EventLoggerStream();
 
  private:
@@ -138,7 +142,10 @@ class EventLoggerStream {
   }
   friend class EventLogger;
   explicit EventLoggerStream(Logger* logger);
-  Logger* logger_;
+  explicit EventLoggerStream(LogBuffer* log_buffer);
+  // exactly one is non-nullptr
+  Logger* const logger_;
+  LogBuffer* const log_buffer_;
   // ownership
   JSONWritter* json_writter_;
 };
@@ -151,6 +158,9 @@ class EventLogger {
  public:
  explicit EventLogger(Logger* logger) : logger_(logger) {}
   EventLoggerStream Log() { return EventLoggerStream(logger_); }
+  EventLoggerStream LogToBuffer(LogBuffer* log_buffer) {
+    return EventLoggerStream(log_buffer);
+  }
 
  private:
   Logger* logger_;
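Why StartArray()'s assertion flips from kExpectKey to kExpectValue: callers now write the key first (for example "lsm_state" above) and then open the array as that key's value, so by the time StartArray() runs the writer is already past the key, and the old ", " separator logic is unnecessary because writing the key emitted it. A hedged end-to-end sketch of the buffered path added here, assuming a LogBuffer is at hand as in the flush/compaction jobs above (my_event is an invented name):

#include <vector>

#include "util/event_logger.h"
#include "util/log_buffer.h"

// Buffered event logging, as used by flush_finished/compaction_finished:
// the JSON line lands in the LogBuffer and reaches the info log only when
// the buffer is flushed (handy while holding the DB mutex).
void LogLsmStateExample(rocksdb::EventLogger* event_logger,
                        rocksdb::LogBuffer* log_buffer, int job_id,
                        const std::vector<int>& files_per_level) {
  auto stream = event_logger->LogToBuffer(log_buffer);
  stream << "job" << job_id << "event"
         << "my_event";
  stream << "lsm_state";  // key first...
  stream.StartArray();    // ...then the array as its value
  for (int num_files : files_per_level) {
    stream << num_files;
  }
  stream.EndArray();
}  // ~EventLoggerStream closes the object and calls LogToBuffer()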