diff --git a/Makefile b/Makefile index 94a994cba..2131a4114 100644 --- a/Makefile +++ b/Makefile @@ -228,6 +228,7 @@ TESTS = \ geodb_test \ rate_limiter_test \ options_test \ + event_logger_test \ cuckoo_table_builder_test \ cuckoo_table_reader_test \ cuckoo_table_db_test \ @@ -623,6 +624,9 @@ compact_files_test: db/compact_files_test.o $(LIBOBJECTS) $(TESTHARNESS) options_test: util/options_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +event_logger_test: util/event_logger_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + sst_dump_test: util/sst_dump_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/db_impl.cc b/db/db_impl.cc index 2450d825f..ca4b5a7db 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -199,9 +199,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) db_options_(SanitizeOptions(dbname, options)), stats_(db_options_.statistics.get()), db_lock_(nullptr), - mutex_(stats_, env_, - DB_MUTEX_WAIT_MICROS, - options.use_adaptive_mutex), + mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS, options.use_adaptive_mutex), shutting_down_(false), bg_cv_(&mutex_), logfile_number_(0), @@ -229,6 +227,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) #ifndef ROCKSDB_LITE wal_manager_(db_options_, env_options_), #endif // ROCKSDB_LITE + event_logger_(db_options_.info_log.get()), bg_work_gate_closed_(false), refitting_level_(false), opened_successfully_(false), @@ -652,6 +651,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { // evict from cache TableCache::Evict(table_cache_.get(), number); fname = TableFileName(db_options_.db_paths, number, path_id); + event_logger_.Log() << "event" + << "table_file_deletion" + << "file_number" << number; } else { fname = ((type == kLogFile) ? db_options_.wal_dir : dbname_) + "/" + to_delete; @@ -1140,7 +1142,8 @@ Status DBImpl::FlushMemTableToOutputFile( env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots_.GetNewest(), job_context, log_buffer, directories_.GetDbDir(), directories_.GetDataDir(0U), - GetCompressionFlush(*cfd->ioptions()), stats_); + GetCompressionFlush(*cfd->ioptions()), stats_, + &event_logger_); uint64_t file_number; Status s = flush_job.Run(&file_number); diff --git a/db/db_impl.h b/db/db_impl.h index c2faeed7f..b5d529bdf 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -32,6 +32,8 @@ #include "rocksdb/memtablerep.h" #include "rocksdb/transaction_log.h" #include "util/autovector.h" +#include "util/event_logger.h" +#include "util/hash.h" #include "util/stop_watch.h" #include "util/thread_local.h" #include "util/scoped_arena_iterator.h" @@ -599,6 +601,9 @@ class DBImpl : public DB { WalManager wal_manager_; #endif // ROCKSDB_LITE + // Unified interface for logging events + EventLogger event_logger_; + // A value of true temporarily disables scheduling of background work bool bg_work_gate_closed_; diff --git a/db/flush_job.cc b/db/flush_job.cc index 0f8dc58de..44be45ab6 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -40,6 +40,7 @@ #include "table/table_builder.h" #include "table/two_level_iterator.h" #include "util/coding.h" +#include "util/event_logger.h" #include "util/file_util.h" #include "util/logging.h" #include "util/log_buffer.h" @@ -61,7 +62,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, SequenceNumber newest_snapshot, JobContext* job_context, LogBuffer* log_buffer, Directory* db_directory, Directory* output_file_directory, - CompressionType output_compression, Statistics* stats) + CompressionType output_compression, Statistics* stats, + EventLogger* event_logger) : dbname_(dbname), cfd_(cfd), db_options_(db_options), @@ -76,7 +78,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, db_directory_(db_directory), output_file_directory_(output_file_directory), output_compression_(output_compression), - stats_(stats) {} + stats_(stats), + event_logger_(event_logger) {} Status FlushJob::Run(uint64_t* file_number) { // Save the contents of the earliest memtable as a new Table @@ -180,6 +183,10 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s", cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber(), meta.fd.GetFileSize(), s.ToString().c_str()); + event_logger_->Log() << "event" + << "table_file_creation" + << "file_number" << meta.fd.GetNumber() << "file_size" + << meta.fd.GetFileSize(); if (!db_options_.disableDataSync && output_file_directory_ != nullptr) { output_file_directory_->Fsync(); } diff --git a/db/flush_job.h b/db/flush_job.h index 40cdc5045..1526d673b 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -28,6 +28,7 @@ #include "rocksdb/memtablerep.h" #include "rocksdb/transaction_log.h" #include "util/autovector.h" +#include "util/event_logger.h" #include "util/instrumented_mutex.h" #include "util/stop_watch.h" #include "util/thread_local.h" @@ -59,7 +60,7 @@ class FlushJob { SequenceNumber newest_snapshot, JobContext* job_context, LogBuffer* log_buffer, Directory* db_directory, Directory* output_file_directory, CompressionType output_compression, - Statistics* stats); + Statistics* stats, EventLogger* event_logger); ~FlushJob() {} Status Run(uint64_t* file_number = nullptr); @@ -82,6 +83,7 @@ class FlushJob { Directory* output_file_directory_; CompressionType output_compression_; Statistics* stats_; + EventLogger* event_logger_; }; } // namespace rocksdb diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 15dd91675..72941ede0 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -83,11 +83,12 @@ class FlushJobTest { TEST(FlushJobTest, Empty) { JobContext job_context(0); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + EventLogger event_logger(db_options_.info_log.get()); FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, &shutting_down_, SequenceNumber(), &job_context, nullptr, nullptr, nullptr, - kNoCompression, nullptr); + kNoCompression, nullptr, &event_logger); ASSERT_OK(flush_job.Run()); job_context.Clean(); } @@ -107,11 +108,12 @@ TEST(FlushJobTest, NonEmpty) { } cfd->imm()->Add(new_mem); + EventLogger event_logger(db_options_.info_log.get()); FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, &shutting_down_, SequenceNumber(), &job_context, nullptr, nullptr, nullptr, - kNoCompression, nullptr); + kNoCompression, nullptr, &event_logger); mutex_.Lock(); ASSERT_OK(flush_job.Run()); mutex_.Unlock(); diff --git a/src.mk b/src.mk index 609c3dcdf..d783086f8 100644 --- a/src.mk +++ b/src.mk @@ -105,6 +105,7 @@ LIB_SOURCES = \ utilities/spatialdb/spatial_db.cc \ utilities/ttl/db_ttl_impl.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \ + util/event_logger.cc \ util/ldb_cmd.cc \ util/ldb_tool.cc \ util/log_buffer.cc \ @@ -209,6 +210,7 @@ TEST_BENCH_SOURCES = \ util/memenv_test.cc \ util/mock_env_test.cc \ util/options_test.cc \ + util/event_logger_test.cc \ util/rate_limiter_test.cc \ util/signal_test.cc \ util/slice_transform_test.cc \ diff --git a/util/event_logger.cc b/util/event_logger.cc new file mode 100644 index 000000000..a1b665075 --- /dev/null +++ b/util/event_logger.cc @@ -0,0 +1,35 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "util/event_logger.h" + +#include +#include +#include +#include +#include + +#include "util/string_util.h" + +namespace rocksdb { + +const char* kEventLoggerPrefix = "EVENT_LOG_v1"; + +EventLoggerStream::EventLoggerStream(Logger* logger) + : logger_(logger), json_writter_(nullptr) {} + +EventLoggerStream::~EventLoggerStream() { + if (json_writter_) { + json_writter_->EndObject(); + Log(logger_, "%s %s", kEventLoggerPrefix, json_writter_->Get().c_str()); + delete json_writter_; + } +} + +} // namespace rocksdb diff --git a/util/event_logger.h b/util/event_logger.h new file mode 100644 index 000000000..99916a230 --- /dev/null +++ b/util/event_logger.h @@ -0,0 +1,158 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include +#include +#include + +#include "rocksdb/env.h" + +namespace rocksdb { + +// JSONWritter doesn't support objects in arrays yet. There wasn't a need for +// that. +class JSONWritter { + public: + JSONWritter() : state_(kExpectKey), first_element_(true) { stream_ << "{"; } + + void AddKey(const std::string& key) { + assert(state_ == kExpectKey); + if (!first_element_) { + stream_ << ", "; + } + stream_ << "\"" << key << "\": "; + state_ = kExpectValue; + first_element_ = false; + } + + void AddValue(const char* value) { + assert(state_ == kExpectValue || state_ == kInArray); + if (state_ == kInArray && !first_element_) { + stream_ << ", "; + } + stream_ << "\"" << value << "\""; + if (state_ != kInArray) { + state_ = kExpectKey; + } + first_element_ = false; + } + + template + void AddValue(const T& value) { + assert(state_ == kExpectValue || state_ == kInArray); + if (state_ == kInArray && !first_element_) { + stream_ << ", "; + } + stream_ << value; + if (state_ != kInArray) { + state_ = kExpectKey; + } + first_element_ = false; + } + + void StartArray() { + assert(state_ == kExpectKey); + state_ = kInArray; + if (!first_element_) { + stream_ << ", "; + } + stream_ << "["; + first_element_ = true; + } + + void EndArray() { + assert(state_ == kInArray); + state_ = kExpectKey; + stream_ << "]"; + first_element_ = false; + } + + void StartObject() { + assert(state_ == kExpectValue); + stream_ << "{"; + first_element_ = true; + } + + void EndObject() { + assert(state_ == kExpectKey); + stream_ << "}"; + first_element_ = false; + } + + std::string Get() const { return stream_.str(); } + + JSONWritter& operator<<(const char* val) { + if (state_ == kExpectKey) { + AddKey(val); + } else { + AddValue(val); + } + return *this; + } + + JSONWritter& operator<<(const std::string& val) { + return *this << val.c_str(); + } + + template + JSONWritter& operator<<(const T& val) { + assert(state_ != kExpectKey); + AddValue(val); + return *this; + } + + private: + enum JSONWritterState { + kExpectKey, + kExpectValue, + kInArray, + }; + JSONWritterState state_; + bool first_element_; + std::ostringstream stream_; +}; + +class EventLoggerStream { + public: + template + EventLoggerStream& operator<<(const T& val) { + MakeStream(); + *json_writter_ << val; + return *this; + } + ~EventLoggerStream(); + + private: + void MakeStream() { + if (!json_writter_) { + json_writter_ = new JSONWritter(); + *this << "time_micros" + << std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + } + } + friend class EventLogger; + explicit EventLoggerStream(Logger* logger); + Logger* logger_; + // ownership + JSONWritter* json_writter_; +}; + +// here is an example of the output that will show up in the LOG: +// 2015/01/15-14:13:25.788019 1105ef000 EVENT_LOG_v1 {"time_micros": +// 1421360005788015, "event": "table_file_creation", "file_number": 12, +// "file_size": 1909699} +class EventLogger { + public: + explicit EventLogger(Logger* logger) : logger_(logger) {} + EventLoggerStream Log() { return EventLoggerStream(logger_); } + + private: + Logger* logger_; +}; + +} // namespace rocksdb diff --git a/util/event_logger_test.cc b/util/event_logger_test.cc new file mode 100644 index 000000000..0d7985f61 --- /dev/null +++ b/util/event_logger_test.cc @@ -0,0 +1,42 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include + +#include "util/event_logger.h" +#include "util/testharness.h" + +namespace rocksdb { + +class EventLoggerTest {}; + +class StringLogger : public Logger { + public: + using Logger::Logv; + virtual void Logv(const char* format, va_list ap) override { + vsnprintf(buffer_, sizeof(buffer_), format, ap); + } + char* buffer() { return buffer_; } + + private: + char buffer_[1000]; +}; + +TEST(EventLoggerTest, SimpleTest) { + StringLogger logger; + EventLogger event_logger(&logger); + event_logger.Log() << "id" << 5 << "event" + << "just_testing"; + std::string output(logger.buffer()); + ASSERT_TRUE(output.find("\"event\": \"just_testing\"") != std::string::npos); + ASSERT_TRUE(output.find("\"id\": 5") != std::string::npos); + ASSERT_TRUE(output.find("\"time_micros\"") != std::string::npos); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + return rocksdb::test::RunAllTests(); +}