From 66da4679830029bb344c747fe3d2a356d9a8a540 Mon Sep 17 00:00:00 2001 From: Haobo Xu Date: Sun, 9 Mar 2014 22:01:13 -0700 Subject: [PATCH] [RocksDB] LogBuffer Cleanup Summary: Moved LogBuffer class to an internal header. Removed some unneccesary indirection. Enabled log buffer for BackgroundCallFlush. Forced log buffer flush right after Unlock to improve time ordering of info log. Test Plan: make check; db_bench compare LOG output Reviewers: sdong Reviewed By: sdong CC: leveldb, igor Differential Revision: https://reviews.facebook.net/D16707 --- db/compaction_picker.cc | 3 +- db/db_impl.cc | 99 ++++++++++++++++++++++++----------------- db/db_impl.h | 12 +++-- db/version_set.h | 5 ++- include/rocksdb/env.h | 24 ---------- util/env.cc | 73 ------------------------------ util/log_buffer.cc | 67 ++++++++++++++++++++++++++++ util/log_buffer.h | 46 +++++++++++++++++++ 8 files changed, 185 insertions(+), 144 deletions(-) create mode 100644 util/log_buffer.cc create mode 100644 util/log_buffer.h diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 255d0cf84..e1abf3c45 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -10,6 +10,7 @@ #include "db/compaction_picker.h" #include +#include "util/log_buffer.h" #include "util/statistics.h" namespace rocksdb { @@ -702,7 +703,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( uint64_t sz = (candidate_size * (100L + ratio)) /100; if (sz < f->file_size) { break; - } + } if (options_->compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) { // Similar-size stopping rule: also check the last picked file isn't // far larger than the next candidate file. diff --git a/db/db_impl.cc b/db/db_impl.cc index 99cfc6e6c..deea5e080 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -57,6 +57,7 @@ #include "util/coding.h" #include "util/hash_skiplist_rep.h" #include "util/logging.h" +#include "util/log_buffer.h" #include "util/mutexlock.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" @@ -1192,7 +1193,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { Status DBImpl::WriteLevel0Table(autovector& mems, VersionEdit* edit, - uint64_t* filenumber) { + uint64_t* filenumber, + LogBuffer* log_buffer) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; @@ -1208,6 +1210,7 @@ Status DBImpl::WriteLevel0Table(autovector& mems, VersionEdit* edit, Status s; { mutex_.Unlock(); + log_buffer->FlushBufferToLog(); std::vector memtables; for (MemTable* m : mems) { Log(options_.info_log, @@ -1278,7 +1281,8 @@ Status DBImpl::WriteLevel0Table(autovector& mems, VersionEdit* edit, } Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, - DeletionState& deletion_state) { + DeletionState& deletion_state, + LogBuffer* log_buffer) { mutex_.AssertHeld(); assert(imm_.size() != 0); assert(imm_.IsFlushPending()); @@ -1288,7 +1292,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, autovector mems; imm_.PickMemtablesToFlush(&mems); if (mems.empty()) { - Log(options_.info_log, "Nothing in memstore to flush"); + LogToBuffer(log_buffer, "Nothing in memstore to flush"); return Status::OK(); } @@ -1311,7 +1315,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, } // This will release and re-acquire the mutex. - Status s = WriteLevel0Table(mems, edit, &file_number); + Status s = WriteLevel0Table(mems, edit, &file_number, log_buffer); if (s.ok() && shutting_down_.Acquire_Load()) { s = Status::ShutdownInProgress( @@ -1866,13 +1870,14 @@ void DBImpl::BGWorkCompaction(void* db) { } Status DBImpl::BackgroundFlush(bool* madeProgress, - DeletionState& deletion_state) { + DeletionState& deletion_state, + LogBuffer* log_buffer) { Status stat; while (stat.ok() && imm_.IsFlushPending()) { Log(options_.info_log, "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", options_.max_background_flushes - bg_flush_scheduled_); - stat = FlushMemTableToOutputFile(madeProgress, deletion_state); + stat = FlushMemTableToOutputFile(madeProgress, deletion_state, log_buffer); } return stat; } @@ -1881,41 +1886,48 @@ void DBImpl::BackgroundCallFlush() { bool madeProgress = false; DeletionState deletion_state(true); assert(bg_flush_scheduled_); - MutexLock l(&mutex_); - Status s; - if (!shutting_down_.Acquire_Load()) { - s = BackgroundFlush(&madeProgress, deletion_state); - if (!s.ok()) { - // Wait a little bit before retrying background compaction in - // case this is an environmental problem and we do not want to - // chew up resources for failed compactions for the duration of - // the problem. - bg_cv_.SignalAll(); // In case a waiter can proceed despite the error - Log(options_.info_log, "Waiting after background flush error: %s", - s.ToString().c_str()); + LogBuffer log_buffer(INFO, options_.info_log.get()); + { + MutexLock l(&mutex_); + + Status s; + if (!shutting_down_.Acquire_Load()) { + s = BackgroundFlush(&madeProgress, deletion_state, &log_buffer); + if (!s.ok()) { + // Wait a little bit before retrying background compaction in + // case this is an environmental problem and we do not want to + // chew up resources for failed compactions for the duration of + // the problem. + bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + Log(options_.info_log, "Waiting after background flush error: %s", + s.ToString().c_str()); + mutex_.Unlock(); + log_buffer.FlushBufferToLog(); + LogFlush(options_.info_log); + env_->SleepForMicroseconds(1000000); + mutex_.Lock(); + } + } + + // If !s.ok(), this means that Flush failed. In that case, we want + // to delete all obsolete files and we force FindObsoleteFiles() + FindObsoleteFiles(deletion_state, !s.ok()); + // delete unnecessary files if any, this is done outside the mutex + if (deletion_state.HaveSomethingToDelete()) { mutex_.Unlock(); - LogFlush(options_.info_log); - env_->SleepForMicroseconds(1000000); + log_buffer.FlushBufferToLog(); + PurgeObsoleteFiles(deletion_state); mutex_.Lock(); } - } - // If !s.ok(), this means that Flush failed. In that case, we want - // to delete all obsolete files and we force FindObsoleteFiles() - FindObsoleteFiles(deletion_state, !s.ok()); - // delete unnecessary files if any, this is done outside the mutex - if (deletion_state.HaveSomethingToDelete()) { - mutex_.Unlock(); - PurgeObsoleteFiles(deletion_state); - mutex_.Lock(); - } - - bg_flush_scheduled_--; - if (madeProgress) { - MaybeScheduleFlushOrCompaction(); + bg_flush_scheduled_--; + if (madeProgress) { + MaybeScheduleFlushOrCompaction(); + } + bg_cv_.SignalAll(); } - bg_cv_.SignalAll(); + log_buffer.FlushBufferToLog(); } @@ -1933,7 +1945,7 @@ void DBImpl::BackgroundCallCompaction() { DeletionState deletion_state(true); MaybeDumpStats(); - LogBuffer log_buffer(INFO, options_.info_log); + LogBuffer log_buffer(INFO, options_.info_log.get()); { MutexLock l(&mutex_); // Log(options_.info_log, "XXX BG Thread %llx process new work item", @@ -1949,6 +1961,7 @@ void DBImpl::BackgroundCallCompaction() { // the problem. bg_cv_.SignalAll(); // In case a waiter can proceed despite the error mutex_.Unlock(); + log_buffer.FlushBufferToLog(); Log(options_.info_log, "Waiting after background compaction error: %s", s.ToString().c_str()); LogFlush(options_.info_log); @@ -1966,6 +1979,7 @@ void DBImpl::BackgroundCallCompaction() { // delete unnecessary files if any, this is done outside the mutex if (deletion_state.HaveSomethingToDelete()) { mutex_.Unlock(); + log_buffer.FlushBufferToLog(); PurgeObsoleteFiles(deletion_state); mutex_.Lock(); } @@ -2005,7 +2019,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, "compaction slots " "available %d", options_.max_background_compactions - bg_compaction_scheduled_); - Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state); + Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state, + log_buffer); if (!stat.ok()) { if (is_manual) { manual_compaction_->status = stat; @@ -2067,7 +2082,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } else { MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel. CompactionState* compact = new CompactionState(c.get()); - status = DoCompactionWork(compact, deletion_state); + status = DoCompactionWork(compact, deletion_state, log_buffer); CleanupCompaction(compact, status); versions_->ReleaseCompactionFiles(c.get(), status); c->ReleaseInputs(); @@ -2336,7 +2351,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( } Status DBImpl::DoCompactionWork(CompactionState* compact, - DeletionState& deletion_state) { + DeletionState& deletion_state, + LogBuffer* log_buffer) { assert(compact); int64_t imm_micros = 0; // Micros spent doing imm_ compactions Log(options_.info_log, @@ -2379,6 +2395,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // Release mutex while we're actually doing the compaction work mutex_.Unlock(); + // flush log buffer immediately after releasing the mutex + log_buffer->FlushBufferToLog(); const uint64_t start_micros = env_->NowMicros(); unique_ptr input(versions_->MakeInputIterator(compact->compaction)); @@ -2412,10 +2430,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, LogFlush(options_.info_log); mutex_.Lock(); if (imm_.IsFlushPending()) { - FlushMemTableToOutputFile(nullptr, deletion_state); + FlushMemTableToOutputFile(nullptr, deletion_state, log_buffer); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } mutex_.Unlock(); + log_buffer->FlushBufferToLog(); imm_micros += (env_->NowMicros() - imm_start); } diff --git a/db/db_impl.h b/db/db_impl.h index 2877317cb..cde0b07f8 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -310,7 +310,8 @@ class DBImpl : public DB { // Flush the in-memory write buffer to storage. Switches to a new // log-file/memtable and writes a new descriptor iff successful. Status FlushMemTableToOutputFile(bool* madeProgress, - DeletionState& deletion_state); + DeletionState& deletion_state, + LogBuffer* log_buffer); Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, bool read_only); @@ -322,7 +323,8 @@ class DBImpl : public DB { // concurrent flush memtables to storage. Status WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit); Status WriteLevel0Table(autovector& mems, VersionEdit* edit, - uint64_t* filenumber); + uint64_t* filenumber, + LogBuffer* log_buffer); uint64_t SlowdownAmount(int n, double bottom, double top); // MakeRoomForWrite will return superversion_to_free through an arugment, @@ -350,10 +352,12 @@ class DBImpl : public DB { void BackgroundCallFlush(); Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer); - Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state); + Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state, + LogBuffer* log_buffer); void CleanupCompaction(CompactionState* compact, Status status); Status DoCompactionWork(CompactionState* compact, - DeletionState& deletion_state); + DeletionState& deletion_state, + LogBuffer* log_buffer); Status OpenCompactionOutputFile(CompactionState* compact); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); diff --git a/db/version_set.h b/db/version_set.h index a2c9ff039..317cfe353 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -38,12 +38,13 @@ namespace log { class Writer; } class Compaction; class CompactionPicker; class Iterator; +class LogBuffer; +class LookupKey; class MemTable; +class MergeContext; class TableCache; class Version; class VersionSet; -class MergeContext; -class LookupKey; // Return the smallest index i such that files[i]->largest >= key. // Return files.size() if there is no such file. diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index b5f78d1c3..c96a659fe 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -575,26 +575,6 @@ class Logger { InfoLogLevel log_level_; }; -// A class to buffer info log entries and flush them in the end. -class LogBuffer { - public: - // log_level: the log level for all the logs - // info_log: logger to write the logs to - LogBuffer(const InfoLogLevel log_level, const shared_ptr& info_log); - ~LogBuffer(); - - // Add a log entry to the buffer. - void AddLogToBuffer(const char* format, va_list ap); - - // Flush all buffered log to the info log. - void FlushBufferToLog() const; - - private: - struct Rep; - Rep* rep_; - const InfoLogLevel log_level_; - const shared_ptr& info_log_; -}; // Identifies a locked file. class FileLock { @@ -607,10 +587,6 @@ class FileLock { void operator=(const FileLock&); }; -// Add log to the LogBuffer for a delayed info logging. It can be used when -// we want to add some logs inside a mutex. -extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...); - extern void LogFlush(const shared_ptr& info_log); extern void Log(const InfoLogLevel log_level, diff --git a/util/env.cc b/util/env.cc index 36bc78d55..419c8145d 100644 --- a/util/env.cc +++ b/util/env.cc @@ -31,82 +31,9 @@ WritableFile::~WritableFile() { Logger::~Logger() { } -// One log entry with its timestamp -struct BufferedLog { - struct timeval now_tv; // Timestamp of the log - char message[1]; // Beginning of log message -}; - -struct LogBuffer::Rep { - Arena arena_; - autovector logs_; -}; - -// Lazily initialize Rep to avoid allocations when new log is added. -LogBuffer::LogBuffer(const InfoLogLevel log_level, - const shared_ptr& info_log) - : rep_(nullptr), log_level_(log_level), info_log_(info_log) {} - -LogBuffer::~LogBuffer() { delete rep_; } - -void LogBuffer::AddLogToBuffer(const char* format, va_list ap) { - if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) { - // Skip the level because of its level. - return; - } - if (rep_ == nullptr) { - rep_ = new Rep(); - } - - const size_t kLogSizeLimit = 512; - char* alloc_mem = rep_->arena_.AllocateAligned(kLogSizeLimit); - BufferedLog* buffered_log = new (alloc_mem) BufferedLog(); - char* p = buffered_log->message; - char* limit = alloc_mem + kLogSizeLimit - 1; - - // store the time - gettimeofday(&(buffered_log->now_tv), nullptr); - - // Print the message - if (p < limit) { - va_list backup_ap; - va_copy(backup_ap, ap); - p += vsnprintf(p, limit - p, format, backup_ap); - va_end(backup_ap); - } - - // Add '\0' to the end - *p = '\0'; - - rep_->logs_.push_back(buffered_log); -} - -void LogBuffer::FlushBufferToLog() const { - if (rep_ != nullptr) { - for (BufferedLog* log : rep_->logs_) { - const time_t seconds = log->now_tv.tv_sec; - struct tm t; - localtime_r(&seconds, &t); - Log(log_level_, info_log_, - "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s", - t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, - t.tm_sec, static_cast(log->now_tv.tv_usec), log->message); - } - } -} - FileLock::~FileLock() { } -void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) { - if (log_buffer != nullptr) { - va_list ap; - va_start(ap, format); - log_buffer->AddLogToBuffer(format, ap); - va_end(ap); - } -} - void LogFlush(Logger *info_log) { if (info_log) { info_log->Flush(); diff --git a/util/log_buffer.cc b/util/log_buffer.cc new file mode 100644 index 000000000..f27d62126 --- /dev/null +++ b/util/log_buffer.cc @@ -0,0 +1,67 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "util/log_buffer.h" + +#include + +namespace rocksdb { + +LogBuffer::LogBuffer(const InfoLogLevel log_level, + Logger*info_log) + : log_level_(log_level), info_log_(info_log) {} + +void LogBuffer::AddLogToBuffer(const char* format, va_list ap) { + if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) { + // Skip the level because of its level. + return; + } + + const size_t kLogSizeLimit = 512; + char* alloc_mem = arena_.AllocateAligned(kLogSizeLimit); + BufferedLog* buffered_log = new (alloc_mem) BufferedLog(); + char* p = buffered_log->message; + char* limit = alloc_mem + kLogSizeLimit - 1; + + // store the time + gettimeofday(&(buffered_log->now_tv), nullptr); + + // Print the message + if (p < limit) { + va_list backup_ap; + va_copy(backup_ap, ap); + p += vsnprintf(p, limit - p, format, backup_ap); + va_end(backup_ap); + } + + // Add '\0' to the end + *p = '\0'; + + logs_.push_back(buffered_log); +} + +void LogBuffer::FlushBufferToLog() { + for (BufferedLog* log : logs_) { + const time_t seconds = log->now_tv.tv_sec; + struct tm t; + localtime_r(&seconds, &t); + Log(log_level_, info_log_, + "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s", + t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, + t.tm_sec, static_cast(log->now_tv.tv_usec), log->message); + } + logs_.clear(); +} + +void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) { + if (log_buffer != nullptr) { + va_list ap; + va_start(ap, format); + log_buffer->AddLogToBuffer(format, ap); + va_end(ap); + } +} + +} // namespace rocksdb diff --git a/util/log_buffer.h b/util/log_buffer.h new file mode 100644 index 000000000..76503a084 --- /dev/null +++ b/util/log_buffer.h @@ -0,0 +1,46 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include "rocksdb/env.h" +#include "util/arena.h" +#include "util/autovector.h" + +namespace rocksdb { + +class Logger; + +// A class to buffer info log entries and flush them in the end. +class LogBuffer { + public: + // log_level: the log level for all the logs + // info_log: logger to write the logs to + LogBuffer(const InfoLogLevel log_level, Logger* info_log); + + // Add a log entry to the buffer. + void AddLogToBuffer(const char* format, va_list ap); + + // Flush all buffered log to the info log. + void FlushBufferToLog(); + + private: + // One log entry with its timestamp + struct BufferedLog { + struct timeval now_tv; // Timestamp of the log + char message[1]; // Beginning of log message + }; + + const InfoLogLevel log_level_; + Logger* info_log_; + Arena arena_; + autovector logs_; +}; + +// Add log to the LogBuffer for a delayed info logging. It can be used when +// we want to add some logs inside a mutex. +extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...); + +} // namespace rocksdb