Include bunch of more events into EventLogger

Summary:
Added these events:
* Recovery start, finish and also when recovery creates a file
* Trivial move
* Compaction start, finish and when compaction creates a file
* Flush start, finish

Also includes small fix to EventLogger

Also added option ROCKSDB_PRINT_EVENTS_TO_STDOUT which is useful when we debug things. I've spent far too much time chasing LOG files.

Still didn't get sst table properties in JSON. They are written very deeply into the stack. I'll address in separate diff.

TODO:
* Write specification. Let's first use this for a while and figure out what's good data to put here, too. After that we'll write spec
* Write tools that parse and analyze LOGs. This can be in python or go. Good intern task.

Test Plan: Ran db_bench with ROCKSDB_PRINT_EVENTS_TO_STDOUT. Here's the output: https://phabricator.fb.com/P19811976

Reviewers: sdong, yhchiang, rven, MarkCallaghan, kradhakrishnan, anthony

Reviewed By: anthony

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D37521
main
Igor Canadi 10 years ago
parent 3db81d535a
commit 1bb4928da9
  1. 10
      db/compaction.cc
  2. 2
      db/compaction.h
  3. 75
      db/compaction_job.cc
  4. 6
      db/compaction_job.h
  5. 11
      db/compaction_job_test.cc
  6. 40
      db/db_impl.cc
  7. 4
      db/db_impl.h
  8. 25
      db/flush_job.cc
  9. 4
      db/memtable_list_test.cc
  10. 16
      util/event_logger.cc
  11. 20
      util/event_logger.h

@ -255,6 +255,16 @@ const char* Compaction::InputLevelSummary(
return scratch->buffer;
}
uint64_t Compaction::CalculateTotalInputSize() const {
uint64_t size = 0;
for (auto& input_level : inputs_) {
for (auto f : input_level.files) {
size += f->fd.GetFileSize();
}
}
return size;
}
void Compaction::ReleaseCompactionFiles(Status status) {
MarkFilesBeingCompacted(false);
cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);

@ -173,6 +173,8 @@ class Compaction {
const char* InputLevelSummary(InputLevelSummaryBuffer* scratch) const;
uint64_t CalculateTotalInputSize() const;
// In case of compaction error, reset the nextIndex that is used
// to pick up the next file to be compacted from files_by_size_
void ResetNextCompactionIndex();

@ -49,6 +49,7 @@
#include "util/perf_context_imp.h"
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/thread_status_util.h"
@ -208,7 +209,7 @@ CompactionJob::CompactionJob(
LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory,
Statistics* stats, SnapshotList* snapshots, bool is_snapshot_supported,
std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback)
std::function<uint64_t()> yield_callback, EventLogger* event_logger)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_stats_(1),
@ -225,7 +226,8 @@ CompactionJob::CompactionJob(
snapshots_(snapshots),
is_snapshot_supported_(is_snapshot_supported),
table_cache_(std::move(table_cache)),
yield_callback_(std::move(yield_callback)) {
yield_callback_(std::move(yield_callback)),
event_logger_(event_logger) {
ThreadStatusUtil::SetColumnFamily(
compact_->compaction->column_family_data());
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
@ -242,22 +244,10 @@ void CompactionJob::Prepare() {
compact_->CleanupBatchBuffer();
compact_->CleanupMergedBuffer();
auto* compaction = compact_->compaction;
// Generate file_levels_ for compaction berfore making Iterator
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
ColumnFamilyData* cfd __attribute__((unused)) =
compact_->compaction->column_family_data();
assert(cfd != nullptr);
{
Compaction::InputLevelSummaryBuffer inputs_summary;
LogToBuffer(log_buffer_, "[%s] [JOB %d] Compacting %s, score %.2f",
cfd->GetName().c_str(), job_id_,
compaction->InputLevelSummary(&inputs_summary),
compaction->score());
}
char scratch[2345];
compact_->compaction->Summary(scratch, sizeof(scratch));
LogToBuffer(log_buffer_, "[%s] Compaction start summary: %s\n",
cfd->GetName().c_str(), scratch);
assert(cfd->current()->storage_info()->NumLevelFiles(
compact_->compaction->level()) > 0);
@ -291,6 +281,35 @@ Status CompactionJob::Run() {
log_buffer_->FlushBufferToLog();
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
auto* compaction = compact_->compaction;
// Let's check if anything will get logged. Don't prepare all the info if
// we're not logging
if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
Compaction::InputLevelSummaryBuffer inputs_summary;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(),
job_id_, compaction->InputLevelSummary(&inputs_summary),
compaction->score());
char scratch[2345];
compact_->compaction->Summary(scratch, sizeof(scratch));
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch);
// build event logger report
auto stream = event_logger_->Log();
stream << "job" << job_id_ << "event"
<< "compaction_started";
for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
stream << ("files_L" + ToString(compaction->level(i)));
stream.StartArray();
for (auto f : *compaction->inputs(i)) {
stream << f->fd.GetNumber();
}
stream.EndArray();
}
stream << "score" << compaction->score() << "input_data_size"
<< compaction->CalculateTotalInputSize();
}
const uint64_t start_micros = env_->NowMicros();
std::unique_ptr<Iterator> input(
versions_->MakeInputIterator(compact_->compaction));
@ -481,7 +500,6 @@ Status CompactionJob::Run() {
if (compact_->num_input_records > compact_->num_output_records) {
compaction_stats_.num_dropped_records +=
compact_->num_input_records - compact_->num_output_records;
compact_->num_input_records = compact_->num_output_records = 0;
}
RecordCompactionIOStats();
@ -503,14 +521,14 @@ void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
*status = InstallCompactionResults(db_mutex);
}
VersionStorageInfo::LevelSummaryStorage tmp;
auto vstorage = cfd->current()->storage_info();
const auto& stats = compaction_stats_;
LogToBuffer(log_buffer_,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
"files in(%d, %d) out(%d) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp),
cfd->GetName().c_str(), vstorage->LevelSummary(&tmp),
(stats.bytes_readn + stats.bytes_readnp1) /
static_cast<double>(stats.micros),
stats.bytes_written / static_cast<double>(stats.micros),
@ -524,6 +542,21 @@ void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
status->ToString().c_str(), stats.num_input_records,
stats.num_dropped_records);
auto stream = event_logger_->LogToBuffer(log_buffer_);
stream << "job" << job_id_ << "event"
<< "compaction_finished"
<< "output_level" << compact_->compaction->output_level()
<< "num_output_files" << compact_->outputs.size()
<< "total_output_size" << compact_->total_bytes << "num_input_records"
<< compact_->num_input_records << "num_output_records"
<< compact_->num_output_records;
stream << "lsm_state";
stream.StartArray();
for (int level = 0; level < vstorage->num_levels(); ++level) {
stream << vstorage->NumLevelFiles(level);
}
stream.EndArray();
CleanupCompaction(*status);
}
@ -997,6 +1030,10 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
" keys, %" PRIu64 " bytes",
cfd->GetName().c_str(), job_id_, output_number, current_entries,
current_bytes);
event_logger_->Log() << "job" << job_id_ << "event"
<< "table_file_creation"
<< "file_number" << output_number << "file_size"
<< current_bytes;
}
}
return s;

@ -30,6 +30,7 @@
#include "rocksdb/compaction_filter.h"
#include "rocksdb/transaction_log.h"
#include "util/autovector.h"
#include "util/event_logger.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"
#include "util/scoped_arena_iterator.h"
@ -60,7 +61,8 @@ class CompactionJob {
Directory* db_directory, Directory* output_directory,
Statistics* stats, SnapshotList* snapshot_list,
bool is_snapshot_supported, std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback);
std::function<uint64_t()> yield_callback,
EventLogger* event_logger);
~CompactionJob();
@ -125,6 +127,8 @@ class CompactionJob {
// yield callback
std::function<uint64_t()> yield_callback_;
EventLogger* event_logger_;
};
} // namespace rocksdb

@ -163,11 +163,12 @@ TEST_F(CompactionJobTest, Simple) {
};
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
mutex_.Lock();
CompactionJob compaction_job(0, compaction.get(), db_options_,
*cfd->GetLatestMutableCFOptions(), env_options_,
versions_.get(), &shutting_down_, &log_buffer,
nullptr, nullptr, nullptr, &snapshots, true,
table_cache_, std::move(yield_callback));
EventLogger event_logger(db_options_.info_log.get());
CompactionJob compaction_job(
0, compaction.get(), db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr,
nullptr, nullptr, &snapshots, true, table_cache_,
std::move(yield_callback), &event_logger);
compaction_job.Prepare();
mutex_.Unlock();
ASSERT_OK(compaction_job.Run());

@ -671,7 +671,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
// evict from cache
TableCache::Evict(table_cache_.get(), number);
fname = TableFileName(db_options_.db_paths, number, path_id);
event_logger_.Log() << "event"
event_logger_.Log() << "job" << state.job_id << "event"
<< "table_file_deletion"
<< "file_number" << number;
} else {
@ -937,6 +937,18 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
edit.SetColumnFamily(cfd->GetID());
version_edits.insert({cfd->GetID(), edit});
}
int job_id = next_job_id_.fetch_add(1);
{
auto stream = event_logger_.Log();
stream << "job" << job_id << "event"
<< "recovery_started";
stream << "log_files";
stream.StartArray();
for (auto log_number : log_numbers) {
stream << log_number;
}
stream.EndArray();
}
for (auto log_number : log_numbers) {
// The previous incarnation may not have written any MANIFEST
@ -1016,7 +1028,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
@ -1058,7 +1070,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// flush the final memtable (if non-empty)
if (cfd->mem()->GetFirstSequenceNumber() != 0) {
status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
if (!status.ok()) {
// Recovery failed
break;
@ -1087,11 +1099,14 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
}
event_logger_.Log() << "job" << job_id << "event"
<< "recovery_finished";
return status;
}
Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
VersionEdit* edit) {
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
@ -1129,6 +1144,10 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
" Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
s.ToString().c_str());
event_logger_.Log() << "job" << job_id << "event"
<< "table_file_creation"
<< "file_number" << meta.fd.GetNumber() << "file_size"
<< meta.fd.GetFileSize();
}
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
@ -1453,7 +1472,7 @@ Status DBImpl::CompactFilesImpl(
env_options_, versions_.get(), &shutting_down_, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
stats_, &snapshots_, is_snapshot_supported_, table_cache_,
std::move(yield_callback));
std::move(yield_callback), &event_logger_);
compaction_job.Prepare();
mutex_.Unlock();
@ -2299,6 +2318,13 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
VersionStorageInfo::LevelSummaryStorage tmp;
c->column_family_data()->internal_stats()->IncBytesMoved(
c->level() + 1, f->fd.GetFileSize());
{
event_logger_.LogToBuffer(log_buffer)
<< "job" << job_context->job_id << "event"
<< "trivial_move"
<< "destination_level" << c->level() + 1 << "file_number"
<< f->fd.GetNumber() << "file_size" << f->fd.GetFileSize();
}
LogToBuffer(
log_buffer,
"[%s] Moved #%" PRIu64 " to level-%d %" PRIu64 " bytes %s: %s\n",
@ -2321,7 +2347,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
env_options_, versions_.get(), &shutting_down_, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
stats_, &snapshots_, is_snapshot_supported_, table_cache_,
std::move(yield_callback));
std::move(yield_callback), &event_logger_);
compaction_job.Prepare();
mutex_.Unlock();
status = compaction_job.Run();

@ -360,8 +360,8 @@ class DBImpl : public DB {
// database is opened) and is heavyweight because it holds the mutex
// for the entire period. The second method WriteLevel0Table supports
// concurrent flush memtables to storage.
Status WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
VersionEdit* edit);
Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit);
Status DelayWrite(uint64_t expiration_time);
Status ScheduleFlushes(WriteContext* context);

@ -138,6 +138,17 @@ Status FlushJob::Run(uint64_t* file_number) {
*file_number = fn;
}
auto stream = event_logger_->LogToBuffer(log_buffer_);
stream << "job" << job_context_->job_id << "event"
<< "flush_finished";
stream << "lsm_state";
stream.StartArray();
auto vstorage = cfd_->current()->storage_info();
for (int level = 0; level < vstorage->num_levels(); ++level) {
stream << vstorage->NumLevelFiles(level);
}
stream.EndArray();
return s;
}
@ -166,12 +177,24 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
uint64_t total_num_entries = 0, total_num_deletes = 0;
size_t total_memory_usage = 0;
for (MemTable* m : mems) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
memtables.push_back(m->NewIterator(ro, &arena));
total_num_entries += m->num_entries();
total_num_deletes += m->num_deletes();
total_memory_usage += m->ApproximateMemoryUsage();
}
event_logger_->Log() << "job" << job_context_->job_id << "event"
<< "flush_started"
<< "num_memtables" << mems.size() << "num_entries"
<< total_num_entries << "num_deletes"
<< total_num_deletes << "memory_usage"
<< total_memory_usage;
{
ScopedArenaIterator iter(
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
@ -195,7 +218,7 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s",
cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber(),
meta.fd.GetFileSize(), s.ToString().c_str());
event_logger_->Log() << "event"
event_logger_->Log() << "job" << job_context_->job_id << "event"
<< "table_file_creation"
<< "file_number" << meta.fd.GetNumber() << "file_size"
<< meta.fd.GetFileSize();

@ -222,9 +222,9 @@ TEST_F(MemTableListTest, FlushPendingTest) {
// Create some MemTables
std::vector<MemTable*> tables;
MutableCFOptions mutable_cf_options(options, ioptions);
for (int i = 0; i < num_tables; i++) {
MemTable* mem =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb);
MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb);
mem->Ref();
std::string value;

@ -21,12 +21,24 @@ namespace rocksdb {
const char* kEventLoggerPrefix = "EVENT_LOG_v1";
EventLoggerStream::EventLoggerStream(Logger* logger)
: logger_(logger), json_writter_(nullptr) {}
: logger_(logger), log_buffer_(nullptr), json_writter_(nullptr) {}
EventLoggerStream::EventLoggerStream(LogBuffer* log_buffer)
: logger_(nullptr), log_buffer_(log_buffer), json_writter_(nullptr) {}
EventLoggerStream::~EventLoggerStream() {
if (json_writter_) {
json_writter_->EndObject();
Log(logger_, "%s %s", kEventLoggerPrefix, json_writter_->Get().c_str());
#ifdef ROCKSDB_PRINT_EVENTS_TO_STDOUT
printf("%s\n", json_writter_->Get().c_str());
#else
if (logger_) {
Log(logger_, "%s %s", kEventLoggerPrefix, json_writter_->Get().c_str());
} else if (log_buffer_) {
LogToBuffer(log_buffer_, "%s %s", kEventLoggerPrefix,
json_writter_->Get().c_str());
}
#endif
delete json_writter_;
}
}

@ -11,6 +11,7 @@
#include <chrono>
#include "rocksdb/env.h"
#include "util/log_buffer.h"
namespace rocksdb {
@ -56,11 +57,8 @@ class JSONWritter {
}
void StartArray() {
assert(state_ == kExpectKey);
assert(state_ == kExpectValue);
state_ = kInArray;
if (!first_element_) {
stream_ << ", ";
}
stream_ << "[";
first_element_ = true;
}
@ -125,6 +123,12 @@ class EventLoggerStream {
*json_writter_ << val;
return *this;
}
void StartArray() { json_writter_->StartArray(); }
void EndArray() { json_writter_->EndArray(); }
void StartObject() { json_writter_->StartObject(); }
void EndObject() { json_writter_->EndObject(); }
~EventLoggerStream();
private:
@ -138,7 +142,10 @@ class EventLoggerStream {
}
friend class EventLogger;
explicit EventLoggerStream(Logger* logger);
Logger* logger_;
explicit EventLoggerStream(LogBuffer* log_buffer);
// exactly one is non-nullptr
Logger* const logger_;
LogBuffer* const log_buffer_;
// ownership
JSONWritter* json_writter_;
};
@ -151,6 +158,9 @@ class EventLogger {
public:
explicit EventLogger(Logger* logger) : logger_(logger) {}
EventLoggerStream Log() { return EventLoggerStream(logger_); }
EventLoggerStream LogToBuffer(LogBuffer* log_buffer) {
return EventLoggerStream(log_buffer);
}
private:
Logger* logger_;

Loading…
Cancel
Save