diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 255d0cf84..e1abf3c45 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -10,6 +10,7 @@ #include "db/compaction_picker.h" #include +#include "util/log_buffer.h" #include "util/statistics.h" namespace rocksdb { @@ -702,7 +703,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( uint64_t sz = (candidate_size * (100L + ratio)) /100; if (sz < f->file_size) { break; - } + } if (options_->compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) { // Similar-size stopping rule: also check the last picked file isn't // far larger than the next candidate file. diff --git a/db/db_impl.cc b/db/db_impl.cc index 99cfc6e6c..deea5e080 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -57,6 +57,7 @@ #include "util/coding.h" #include "util/hash_skiplist_rep.h" #include "util/logging.h" +#include "util/log_buffer.h" #include "util/mutexlock.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" @@ -1192,7 +1193,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { Status DBImpl::WriteLevel0Table(autovector& mems, VersionEdit* edit, - uint64_t* filenumber) { + uint64_t* filenumber, + LogBuffer* log_buffer) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; @@ -1208,6 +1210,7 @@ Status DBImpl::WriteLevel0Table(autovector& mems, VersionEdit* edit, Status s; { mutex_.Unlock(); + log_buffer->FlushBufferToLog(); std::vector memtables; for (MemTable* m : mems) { Log(options_.info_log, @@ -1278,7 +1281,8 @@ Status DBImpl::WriteLevel0Table(autovector& mems, VersionEdit* edit, } Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, - DeletionState& deletion_state) { + DeletionState& deletion_state, + LogBuffer* log_buffer) { mutex_.AssertHeld(); assert(imm_.size() != 0); assert(imm_.IsFlushPending()); @@ -1288,7 +1292,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* 
madeProgress, autovector mems; imm_.PickMemtablesToFlush(&mems); if (mems.empty()) { - Log(options_.info_log, "Nothing in memstore to flush"); + LogToBuffer(log_buffer, "Nothing in memstore to flush"); return Status::OK(); } @@ -1311,7 +1315,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, } // This will release and re-acquire the mutex. - Status s = WriteLevel0Table(mems, edit, &file_number); + Status s = WriteLevel0Table(mems, edit, &file_number, log_buffer); if (s.ok() && shutting_down_.Acquire_Load()) { s = Status::ShutdownInProgress( @@ -1866,13 +1870,14 @@ void DBImpl::BGWorkCompaction(void* db) { } Status DBImpl::BackgroundFlush(bool* madeProgress, - DeletionState& deletion_state) { + DeletionState& deletion_state, + LogBuffer* log_buffer) { Status stat; while (stat.ok() && imm_.IsFlushPending()) { Log(options_.info_log, "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", options_.max_background_flushes - bg_flush_scheduled_); - stat = FlushMemTableToOutputFile(madeProgress, deletion_state); + stat = FlushMemTableToOutputFile(madeProgress, deletion_state, log_buffer); } return stat; } @@ -1881,41 +1886,48 @@ void DBImpl::BackgroundCallFlush() { bool madeProgress = false; DeletionState deletion_state(true); assert(bg_flush_scheduled_); - MutexLock l(&mutex_); - Status s; - if (!shutting_down_.Acquire_Load()) { - s = BackgroundFlush(&madeProgress, deletion_state); - if (!s.ok()) { - // Wait a little bit before retrying background compaction in - // case this is an environmental problem and we do not want to - // chew up resources for failed compactions for the duration of - // the problem. 
- bg_cv_.SignalAll(); // In case a waiter can proceed despite the error - Log(options_.info_log, "Waiting after background flush error: %s", - s.ToString().c_str()); + LogBuffer log_buffer(INFO, options_.info_log.get()); + { + MutexLock l(&mutex_); + + Status s; + if (!shutting_down_.Acquire_Load()) { + s = BackgroundFlush(&madeProgress, deletion_state, &log_buffer); + if (!s.ok()) { + // Wait a little bit before retrying background compaction in + // case this is an environmental problem and we do not want to + // chew up resources for failed compactions for the duration of + // the problem. + bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + Log(options_.info_log, "Waiting after background flush error: %s", + s.ToString().c_str()); + mutex_.Unlock(); + log_buffer.FlushBufferToLog(); + LogFlush(options_.info_log); + env_->SleepForMicroseconds(1000000); + mutex_.Lock(); + } + } + + // If !s.ok(), this means that Flush failed. In that case, we want + // to delete all obsolete files and we force FindObsoleteFiles() + FindObsoleteFiles(deletion_state, !s.ok()); + // delete unnecessary files if any, this is done outside the mutex + if (deletion_state.HaveSomethingToDelete()) { mutex_.Unlock(); - LogFlush(options_.info_log); - env_->SleepForMicroseconds(1000000); + log_buffer.FlushBufferToLog(); + PurgeObsoleteFiles(deletion_state); mutex_.Lock(); } - } - // If !s.ok(), this means that Flush failed. 
In that case, we want - // to delete all obsolete files and we force FindObsoleteFiles() - FindObsoleteFiles(deletion_state, !s.ok()); - // delete unnecessary files if any, this is done outside the mutex - if (deletion_state.HaveSomethingToDelete()) { - mutex_.Unlock(); - PurgeObsoleteFiles(deletion_state); - mutex_.Lock(); - } - - bg_flush_scheduled_--; - if (madeProgress) { - MaybeScheduleFlushOrCompaction(); + bg_flush_scheduled_--; + if (madeProgress) { + MaybeScheduleFlushOrCompaction(); + } + bg_cv_.SignalAll(); } - bg_cv_.SignalAll(); + log_buffer.FlushBufferToLog(); } @@ -1933,7 +1945,7 @@ void DBImpl::BackgroundCallCompaction() { DeletionState deletion_state(true); MaybeDumpStats(); - LogBuffer log_buffer(INFO, options_.info_log); + LogBuffer log_buffer(INFO, options_.info_log.get()); { MutexLock l(&mutex_); // Log(options_.info_log, "XXX BG Thread %llx process new work item", @@ -1949,6 +1961,7 @@ void DBImpl::BackgroundCallCompaction() { // the problem. bg_cv_.SignalAll(); // In case a waiter can proceed despite the error mutex_.Unlock(); + log_buffer.FlushBufferToLog(); Log(options_.info_log, "Waiting after background compaction error: %s", s.ToString().c_str()); LogFlush(options_.info_log); @@ -1966,6 +1979,7 @@ void DBImpl::BackgroundCallCompaction() { // delete unnecessary files if any, this is done outside the mutex if (deletion_state.HaveSomethingToDelete()) { mutex_.Unlock(); + log_buffer.FlushBufferToLog(); PurgeObsoleteFiles(deletion_state); mutex_.Lock(); } @@ -2005,7 +2019,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, "compaction slots " "available %d", options_.max_background_compactions - bg_compaction_scheduled_); - Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state); + Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state, + log_buffer); if (!stat.ok()) { if (is_manual) { manual_compaction_->status = stat; @@ -2067,7 +2082,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } else { 
MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel. CompactionState* compact = new CompactionState(c.get()); - status = DoCompactionWork(compact, deletion_state); + status = DoCompactionWork(compact, deletion_state, log_buffer); CleanupCompaction(compact, status); versions_->ReleaseCompactionFiles(c.get(), status); c->ReleaseInputs(); @@ -2336,7 +2351,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( } Status DBImpl::DoCompactionWork(CompactionState* compact, - DeletionState& deletion_state) { + DeletionState& deletion_state, + LogBuffer* log_buffer) { assert(compact); int64_t imm_micros = 0; // Micros spent doing imm_ compactions Log(options_.info_log, @@ -2379,6 +2395,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // Release mutex while we're actually doing the compaction work mutex_.Unlock(); + // flush log buffer immediately after releasing the mutex + log_buffer->FlushBufferToLog(); const uint64_t start_micros = env_->NowMicros(); unique_ptr input(versions_->MakeInputIterator(compact->compaction)); @@ -2412,10 +2430,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, LogFlush(options_.info_log); mutex_.Lock(); if (imm_.IsFlushPending()) { - FlushMemTableToOutputFile(nullptr, deletion_state); + FlushMemTableToOutputFile(nullptr, deletion_state, log_buffer); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } mutex_.Unlock(); + log_buffer->FlushBufferToLog(); imm_micros += (env_->NowMicros() - imm_start); } diff --git a/db/db_impl.h b/db/db_impl.h index 2877317cb..cde0b07f8 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -310,7 +310,8 @@ class DBImpl : public DB { // Flush the in-memory write buffer to storage. Switches to a new // log-file/memtable and writes a new descriptor iff successful. 
Status FlushMemTableToOutputFile(bool* madeProgress, - DeletionState& deletion_state); + DeletionState& deletion_state, + LogBuffer* log_buffer); Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, bool read_only); @@ -322,7 +323,8 @@ class DBImpl : public DB { // concurrent flush memtables to storage. Status WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit); Status WriteLevel0Table(autovector& mems, VersionEdit* edit, - uint64_t* filenumber); + uint64_t* filenumber, + LogBuffer* log_buffer); uint64_t SlowdownAmount(int n, double bottom, double top); // MakeRoomForWrite will return superversion_to_free through an arugment, @@ -350,10 +352,12 @@ class DBImpl : public DB { void BackgroundCallFlush(); Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer); - Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state); + Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state, + LogBuffer* log_buffer); void CleanupCompaction(CompactionState* compact, Status status); Status DoCompactionWork(CompactionState* compact, - DeletionState& deletion_state); + DeletionState& deletion_state, + LogBuffer* log_buffer); Status OpenCompactionOutputFile(CompactionState* compact); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); diff --git a/db/version_set.h b/db/version_set.h index a2c9ff039..317cfe353 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -38,12 +38,13 @@ namespace log { class Writer; } class Compaction; class CompactionPicker; class Iterator; +class LogBuffer; +class LookupKey; class MemTable; +class MergeContext; class TableCache; class Version; class VersionSet; -class MergeContext; -class LookupKey; // Return the smallest index i such that files[i]->largest >= key. // Return files.size() if there is no such file. 
diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index b5f78d1c3..c96a659fe 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -575,26 +575,6 @@ class Logger { InfoLogLevel log_level_; }; -// A class to buffer info log entries and flush them in the end. -class LogBuffer { - public: - // log_level: the log level for all the logs - // info_log: logger to write the logs to - LogBuffer(const InfoLogLevel log_level, const shared_ptr& info_log); - ~LogBuffer(); - - // Add a log entry to the buffer. - void AddLogToBuffer(const char* format, va_list ap); - - // Flush all buffered log to the info log. - void FlushBufferToLog() const; - - private: - struct Rep; - Rep* rep_; - const InfoLogLevel log_level_; - const shared_ptr& info_log_; -}; // Identifies a locked file. class FileLock { @@ -607,10 +587,6 @@ class FileLock { void operator=(const FileLock&); }; -// Add log to the LogBuffer for a delayed info logging. It can be used when -// we want to add some logs inside a mutex. -extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...); - extern void LogFlush(const shared_ptr& info_log); extern void Log(const InfoLogLevel log_level, diff --git a/util/env.cc b/util/env.cc index 36bc78d55..419c8145d 100644 --- a/util/env.cc +++ b/util/env.cc @@ -31,82 +31,9 @@ WritableFile::~WritableFile() { Logger::~Logger() { } -// One log entry with its timestamp -struct BufferedLog { - struct timeval now_tv; // Timestamp of the log - char message[1]; // Beginning of log message -}; - -struct LogBuffer::Rep { - Arena arena_; - autovector logs_; -}; - -// Lazily initialize Rep to avoid allocations when new log is added. 
-LogBuffer::LogBuffer(const InfoLogLevel log_level, - const shared_ptr& info_log) - : rep_(nullptr), log_level_(log_level), info_log_(info_log) {} - -LogBuffer::~LogBuffer() { delete rep_; } - -void LogBuffer::AddLogToBuffer(const char* format, va_list ap) { - if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) { - // Skip the level because of its level. - return; - } - if (rep_ == nullptr) { - rep_ = new Rep(); - } - - const size_t kLogSizeLimit = 512; - char* alloc_mem = rep_->arena_.AllocateAligned(kLogSizeLimit); - BufferedLog* buffered_log = new (alloc_mem) BufferedLog(); - char* p = buffered_log->message; - char* limit = alloc_mem + kLogSizeLimit - 1; - - // store the time - gettimeofday(&(buffered_log->now_tv), nullptr); - - // Print the message - if (p < limit) { - va_list backup_ap; - va_copy(backup_ap, ap); - p += vsnprintf(p, limit - p, format, backup_ap); - va_end(backup_ap); - } - - // Add '\0' to the end - *p = '\0'; - - rep_->logs_.push_back(buffered_log); -} - -void LogBuffer::FlushBufferToLog() const { - if (rep_ != nullptr) { - for (BufferedLog* log : rep_->logs_) { - const time_t seconds = log->now_tv.tv_sec; - struct tm t; - localtime_r(&seconds, &t); - Log(log_level_, info_log_, - "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s", - t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, - t.tm_sec, static_cast(log->now_tv.tv_usec), log->message); - } - } -} - FileLock::~FileLock() { } -void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) { - if (log_buffer != nullptr) { - va_list ap; - va_start(ap, format); - log_buffer->AddLogToBuffer(format, ap); - va_end(ap); - } -} - void LogFlush(Logger *info_log) { if (info_log) { info_log->Flush(); diff --git a/util/log_buffer.cc b/util/log_buffer.cc new file mode 100644 index 000000000..f27d62126 --- /dev/null +++ b/util/log_buffer.cc @@ -0,0 +1,67 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. 
+// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "util/log_buffer.h" + +#include + +namespace rocksdb { + +// Creates an empty buffer whose entries are all emitted at log_level. +// info_log is kept as a raw pointer and is not deleted by this class. +LogBuffer::LogBuffer(const InfoLogLevel log_level, + Logger* info_log) + : log_level_(log_level), info_log_(info_log) {} + +// Formats one printf-style message into a fixed kLogSizeLimit-byte entry +// taken from arena_, stamps it with the current time and queues it. Nothing +// is written to info_log_ here, and text that does not fit is truncated. +// Does nothing when info_log_ is null or log_level_ is below the level +// reported by info_log_. +void LogBuffer::AddLogToBuffer(const char* format, va_list ap) { + if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) { + // Skip the log because of its level. + return; + } + + const size_t kLogSizeLimit = 512; + char* alloc_mem = arena_.AllocateAligned(kLogSizeLimit); + BufferedLog* buffered_log = new (alloc_mem) BufferedLog(); + char* p = buffered_log->message; + // Last byte of the entry, kept free for the terminator. + char* limit = alloc_mem + kLogSizeLimit - 1; + + // store the time + gettimeofday(&(buffered_log->now_tv), nullptr); + + // Print the message + if (p < limit) { + va_list backup_ap; + va_copy(backup_ap, ap); + const int written = vsnprintf(p, limit - p, format, backup_ap); + va_end(backup_ap); + if (written >= limit - p) { + // vsnprintf returns the length the whole message would have needed, so + // after a truncation p must stop at the end of the entry. + p = limit; + } else if (written > 0) { + p += written; + } + // A negative result (output error) leaves p unchanged, which stores an + // empty message. + } + + // Add '\0' to the end + *p = '\0'; + + logs_.push_back(buffered_log); +} + +// Writes every queued entry to info_log_, each prefixed with the local time +// at which it was buffered, then empties the queue. Only logs_ is cleared; +// arena_ is not touched here. +void LogBuffer::FlushBufferToLog() { + for (BufferedLog* log : logs_) { + const time_t seconds = log->now_tv.tv_sec; + struct tm t; + localtime_r(&seconds, &t); + Log(log_level_, info_log_, + "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s", + t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, + t.tm_sec, static_cast(log->now_tv.tv_usec), log->message); + } + logs_.clear(); +} + +// Variadic front end for LogBuffer::AddLogToBuffer. A null log_buffer makes +// the call a no-op. +void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) { + if (log_buffer != nullptr) { + va_list ap; + va_start(ap, format); + log_buffer->AddLogToBuffer(format, ap); + va_end(ap); + } +} + +} // namespace rocksdb diff --git a/util/log_buffer.h b/util/log_buffer.h new file mode 100644 index 000000000..76503a084 --- /dev/null +++ b/util/log_buffer.h @@ -0,0 +1,46 @@ +// Copyright (c) 2014, Facebook, Inc.
All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include "rocksdb/env.h" +#include "util/arena.h" +#include "util/autovector.h" + +namespace rocksdb { + +class Logger; + +// Buffers info log entries in memory until FlushBufferToLog() is called. +class LogBuffer { + public: + // log_level: the level at which every buffered entry is later logged + // info_log: logger to write to; stored as a raw pointer, not deleted here + LogBuffer(const InfoLogLevel log_level, Logger* info_log); + + // Queue one printf-style message; nothing reaches info_log until a flush. + void AddLogToBuffer(const char* format, va_list ap); + + // Write all queued entries to info_log, then empty the queue. + void FlushBufferToLog(); + + private: + // One log entry with its timestamp + struct BufferedLog { + struct timeval now_tv; // Time at which the entry was buffered + char message[1]; // First byte of the message; the text continues past it + }; + + const InfoLogLevel log_level_; // Level used for every flushed entry + Logger* info_log_; // Destination logger; null disables buffering + Arena arena_; // Backing storage for the BufferedLog entries + autovector logs_; // Entries queued since the last flush +}; + +// printf-style helper that queues a message in log_buffer for delayed info +// logging, e.g. while holding a mutex. A null log_buffer is ignored. +extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...); + +} // namespace rocksdb