[RocksDB] LogBuffer Cleanup

Summary: Moved the LogBuffer class to an internal header, removed some unnecessary indirection, enabled the log buffer for BackgroundCallFlush, and forced a log buffer flush right after Unlock to improve the time ordering of the info log.
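
The pattern this change applies to the background flush and compaction paths, sketched below as a simplified standalone illustration (the MiniLogBuffer class and the main() harness are stand-ins invented for this example, not RocksDB code): messages produced while the DB mutex is held are only buffered, and are written to the info log right after the mutex is released, each stamped with the time it was originally generated.

// Simplified illustration of the buffered-logging pattern (not RocksDB code).
#include <chrono>
#include <cstdio>
#include <ctime>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

class MiniLogBuffer {
 public:
  // Record a message together with the time it was generated.
  void Add(std::string msg) {
    entries_.emplace_back(std::chrono::system_clock::now(), std::move(msg));
  }
  // Write all buffered messages to stderr, stamped with their original time.
  void Flush() {
    for (auto& e : entries_) {
      std::time_t t = std::chrono::system_clock::to_time_t(e.first);
      char buf[32];
      std::strftime(buf, sizeof(buf), "%Y/%m/%d-%H:%M:%S", std::localtime(&t));
      std::fprintf(stderr, "(Original Log Time %s) %s\n", buf, e.second.c_str());
    }
    entries_.clear();
  }

 private:
  std::vector<std::pair<std::chrono::system_clock::time_point, std::string>>
      entries_;
};

std::mutex mu;

void BackgroundWork() {
  MiniLogBuffer log_buffer;
  {
    std::lock_guard<std::mutex> l(mu);
    // While the mutex is held, only buffer; writing to the info log here
    // could block other threads and skew the log's apparent timing.
    log_buffer.Add("Nothing in memstore to flush");
  }
  // Flush right after the lock is released, as this commit does after
  // mutex_.Unlock(), so entries reach the log close to (and stamped with)
  // their real generation time.
  log_buffer.Flush();
}

int main() {
  BackgroundWork();
  return 0;
}

The actual implementation lives in util/log_buffer.h and util/log_buffer.cc below; DBImpl keeps a LogBuffer on the stack, passes a LogBuffer* down the flush and compaction call chains, and calls FlushBufferToLog() immediately after each mutex_.Unlock().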

Test Plan: make check; run db_bench and compare LOG output

Reviewers: sdong

Reviewed By: sdong

CC: leveldb, igor

Differential Revision: https://reviews.facebook.net/D16707
Branch: main
Author: Haobo Xu (11 years ago)
Commit: 66da467983
Parent: 04d2c26e17
Changed files (lines changed):
  1. db/compaction_picker.cc (1)
  2. db/db_impl.cc (99)
  3. db/db_impl.h (12)
  4. db/version_set.h (5)
  5. include/rocksdb/env.h (24)
  6. util/env.cc (73)
  7. util/log_buffer.cc (67)
  8. util/log_buffer.h (46)

db/compaction_picker.cc
@@ -10,6 +10,7 @@
 #include "db/compaction_picker.h"

 #include <limits>
+#include "util/log_buffer.h"
 #include "util/statistics.h"

 namespace rocksdb {

db/db_impl.cc
@@ -57,6 +57,7 @@
 #include "util/coding.h"
 #include "util/hash_skiplist_rep.h"
 #include "util/logging.h"
+#include "util/log_buffer.h"
 #include "util/mutexlock.h"
 #include "util/perf_context_imp.h"
 #include "util/stop_watch.h"
@@ -1192,7 +1193,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
 Status DBImpl::WriteLevel0Table(autovector<MemTable*>& mems, VersionEdit* edit,
-                                uint64_t* filenumber) {
+                                uint64_t* filenumber,
+                                LogBuffer* log_buffer) {
   mutex_.AssertHeld();
   const uint64_t start_micros = env_->NowMicros();
   FileMetaData meta;
@@ -1208,6 +1210,7 @@ Status DBImpl::WriteLevel0Table(autovector<MemTable*>& mems, VersionEdit* edit,
   Status s;
   {
     mutex_.Unlock();
+    log_buffer->FlushBufferToLog();
     std::vector<Iterator*> memtables;
     for (MemTable* m : mems) {
       Log(options_.info_log,
@ -1278,7 +1281,8 @@ Status DBImpl::WriteLevel0Table(autovector<MemTable*>& mems, VersionEdit* edit,
} }
Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
DeletionState& deletion_state) { DeletionState& deletion_state,
LogBuffer* log_buffer) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(imm_.size() != 0); assert(imm_.size() != 0);
assert(imm_.IsFlushPending()); assert(imm_.IsFlushPending());
@@ -1288,7 +1292,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
   autovector<MemTable*> mems;
   imm_.PickMemtablesToFlush(&mems);
   if (mems.empty()) {
-    Log(options_.info_log, "Nothing in memstore to flush");
+    LogToBuffer(log_buffer, "Nothing in memstore to flush");
     return Status::OK();
   }
@@ -1311,7 +1315,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
   }

   // This will release and re-acquire the mutex.
-  Status s = WriteLevel0Table(mems, edit, &file_number);
+  Status s = WriteLevel0Table(mems, edit, &file_number, log_buffer);

   if (s.ok() && shutting_down_.Acquire_Load()) {
     s = Status::ShutdownInProgress(
@@ -1866,13 +1870,14 @@ void DBImpl::BGWorkCompaction(void* db) {
 }

 Status DBImpl::BackgroundFlush(bool* madeProgress,
-                               DeletionState& deletion_state) {
+                               DeletionState& deletion_state,
+                               LogBuffer* log_buffer) {
   Status stat;
   while (stat.ok() && imm_.IsFlushPending()) {
     Log(options_.info_log,
         "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d",
         options_.max_background_flushes - bg_flush_scheduled_);
-    stat = FlushMemTableToOutputFile(madeProgress, deletion_state);
+    stat = FlushMemTableToOutputFile(madeProgress, deletion_state, log_buffer);
   }
   return stat;
 }
@@ -1881,41 +1886,48 @@ void DBImpl::BackgroundCallFlush() {
   bool madeProgress = false;
   DeletionState deletion_state(true);
   assert(bg_flush_scheduled_);
-  MutexLock l(&mutex_);
-
-  Status s;
-  if (!shutting_down_.Acquire_Load()) {
-    s = BackgroundFlush(&madeProgress, deletion_state);
-    if (!s.ok()) {
-      // Wait a little bit before retrying background compaction in
-      // case this is an environmental problem and we do not want to
-      // chew up resources for failed compactions for the duration of
-      // the problem.
-      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
-      Log(options_.info_log, "Waiting after background flush error: %s",
-          s.ToString().c_str());
-      mutex_.Unlock();
-      LogFlush(options_.info_log);
-      env_->SleepForMicroseconds(1000000);
-      mutex_.Lock();
-    }
-  }
-
-  // If !s.ok(), this means that Flush failed. In that case, we want
-  // to delete all obsolete files and we force FindObsoleteFiles()
-  FindObsoleteFiles(deletion_state, !s.ok());
-  // delete unnecessary files if any, this is done outside the mutex
-  if (deletion_state.HaveSomethingToDelete()) {
-    mutex_.Unlock();
-    PurgeObsoleteFiles(deletion_state);
-    mutex_.Lock();
-  }
-
-  bg_flush_scheduled_--;
-  if (madeProgress) {
-    MaybeScheduleFlushOrCompaction();
-  }
-  bg_cv_.SignalAll();
+  LogBuffer log_buffer(INFO, options_.info_log.get());
+  {
+    MutexLock l(&mutex_);
+
+    Status s;
+    if (!shutting_down_.Acquire_Load()) {
+      s = BackgroundFlush(&madeProgress, deletion_state, &log_buffer);
+      if (!s.ok()) {
+        // Wait a little bit before retrying background compaction in
+        // case this is an environmental problem and we do not want to
+        // chew up resources for failed compactions for the duration of
+        // the problem.
+        bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
+        Log(options_.info_log, "Waiting after background flush error: %s",
+            s.ToString().c_str());
+        mutex_.Unlock();
+        log_buffer.FlushBufferToLog();
+        LogFlush(options_.info_log);
+        env_->SleepForMicroseconds(1000000);
+        mutex_.Lock();
+      }
+    }
+
+    // If !s.ok(), this means that Flush failed. In that case, we want
+    // to delete all obsolete files and we force FindObsoleteFiles()
+    FindObsoleteFiles(deletion_state, !s.ok());
+    // delete unnecessary files if any, this is done outside the mutex
+    if (deletion_state.HaveSomethingToDelete()) {
+      mutex_.Unlock();
+      log_buffer.FlushBufferToLog();
+      PurgeObsoleteFiles(deletion_state);
+      mutex_.Lock();
+    }
+
+    bg_flush_scheduled_--;
+    if (madeProgress) {
+      MaybeScheduleFlushOrCompaction();
+    }
+    bg_cv_.SignalAll();
+  }
+  log_buffer.FlushBufferToLog();
 }
@@ -1933,7 +1945,7 @@ void DBImpl::BackgroundCallCompaction() {
   DeletionState deletion_state(true);

   MaybeDumpStats();
-  LogBuffer log_buffer(INFO, options_.info_log);
+  LogBuffer log_buffer(INFO, options_.info_log.get());
   {
     MutexLock l(&mutex_);
     // Log(options_.info_log, "XXX BG Thread %llx process new work item",
@@ -1949,6 +1961,7 @@ void DBImpl::BackgroundCallCompaction() {
       // the problem.
       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
       mutex_.Unlock();
+      log_buffer.FlushBufferToLog();
       Log(options_.info_log, "Waiting after background compaction error: %s",
           s.ToString().c_str());
       LogFlush(options_.info_log);
@@ -1966,6 +1979,7 @@ void DBImpl::BackgroundCallCompaction() {
     // delete unnecessary files if any, this is done outside the mutex
     if (deletion_state.HaveSomethingToDelete()) {
       mutex_.Unlock();
+      log_buffer.FlushBufferToLog();
       PurgeObsoleteFiles(deletion_state);
       mutex_.Lock();
     }
@@ -2005,7 +2019,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
         "compaction slots "
         "available %d",
         options_.max_background_compactions - bg_compaction_scheduled_);
-    Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state);
+    Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state,
+                                            log_buffer);
     if (!stat.ok()) {
       if (is_manual) {
         manual_compaction_->status = stat;
@@ -2067,7 +2082,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
   } else {
     MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
     CompactionState* compact = new CompactionState(c.get());
-    status = DoCompactionWork(compact, deletion_state);
+    status = DoCompactionWork(compact, deletion_state, log_buffer);
     CleanupCompaction(compact, status);
     versions_->ReleaseCompactionFiles(c.get(), status);
     c->ReleaseInputs();
@@ -2336,7 +2351,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
 }

 Status DBImpl::DoCompactionWork(CompactionState* compact,
-                                DeletionState& deletion_state) {
+                                DeletionState& deletion_state,
+                                LogBuffer* log_buffer) {
   assert(compact);
   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
   Log(options_.info_log,
@@ -2379,6 +2395,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
   // Release mutex while we're actually doing the compaction work
   mutex_.Unlock();
+  // flush log buffer immediately after releasing the mutex
+  log_buffer->FlushBufferToLog();

   const uint64_t start_micros = env_->NowMicros();
   unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
@@ -2412,10 +2430,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
       LogFlush(options_.info_log);
       mutex_.Lock();
       if (imm_.IsFlushPending()) {
-        FlushMemTableToOutputFile(nullptr, deletion_state);
+        FlushMemTableToOutputFile(nullptr, deletion_state, log_buffer);
         bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
       }
       mutex_.Unlock();
+      log_buffer->FlushBufferToLog();
       imm_micros += (env_->NowMicros() - imm_start);
     }

db/db_impl.h
@@ -310,7 +310,8 @@
   // Flush the in-memory write buffer to storage.  Switches to a new
   // log-file/memtable and writes a new descriptor iff successful.
   Status FlushMemTableToOutputFile(bool* madeProgress,
-                                   DeletionState& deletion_state);
+                                   DeletionState& deletion_state,
+                                   LogBuffer* log_buffer);
   Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
                         bool read_only);
@@ -322,7 +323,8 @@
   // concurrent flush memtables to storage.
   Status WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit);
   Status WriteLevel0Table(autovector<MemTable*>& mems, VersionEdit* edit,
-                          uint64_t* filenumber);
+                          uint64_t* filenumber,
+                          LogBuffer* log_buffer);

   uint64_t SlowdownAmount(int n, double bottom, double top);
   // MakeRoomForWrite will return superversion_to_free through an arugment,
@@ -350,10 +352,12 @@
   void BackgroundCallFlush();
   Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state,
                               LogBuffer* log_buffer);
-  Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state);
+  Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state,
+                         LogBuffer* log_buffer);
   void CleanupCompaction(CompactionState* compact, Status status);
   Status DoCompactionWork(CompactionState* compact,
-                          DeletionState& deletion_state);
+                          DeletionState& deletion_state,
+                          LogBuffer* log_buffer);
   Status OpenCompactionOutputFile(CompactionState* compact);
   Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);

db/version_set.h
@@ -38,12 +38,13 @@ namespace log { class Writer; }
 class Compaction;
 class CompactionPicker;
 class Iterator;
+class LogBuffer;
+class LookupKey;
 class MemTable;
+class MergeContext;
 class TableCache;
 class Version;
 class VersionSet;
-class MergeContext;
-class LookupKey;

 // Return the smallest index i such that files[i]->largest >= key.
 // Return files.size() if there is no such file.

include/rocksdb/env.h
@@ -575,26 +575,6 @@
   InfoLogLevel log_level_;
 };

-// A class to buffer info log entries and flush them in the end.
-class LogBuffer {
- public:
-  // log_level: the log level for all the logs
-  // info_log: logger to write the logs to
-  LogBuffer(const InfoLogLevel log_level, const shared_ptr<Logger>& info_log);
-  ~LogBuffer();
-
-  // Add a log entry to the buffer.
-  void AddLogToBuffer(const char* format, va_list ap);
-
-  // Flush all buffered log to the info log.
-  void FlushBufferToLog() const;
-
- private:
-  struct Rep;
-  Rep* rep_;
-  const InfoLogLevel log_level_;
-  const shared_ptr<Logger>& info_log_;
-};
-
 // Identifies a locked file.
 class FileLock {
@@ -607,10 +587,6 @@
   void operator=(const FileLock&);
 };

-// Add log to the LogBuffer for a delayed info logging. It can be used when
-// we want to add some logs inside a mutex.
-extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...);
-
 extern void LogFlush(const shared_ptr<Logger>& info_log);

 extern void Log(const InfoLogLevel log_level,

util/env.cc
@@ -31,82 +31,9 @@ WritableFile::~WritableFile() {
 Logger::~Logger() {
 }

-// One log entry with its timestamp
-struct BufferedLog {
-  struct timeval now_tv;  // Timestamp of the log
-  char message[1];        // Beginning of log message
-};
-
-struct LogBuffer::Rep {
-  Arena arena_;
-  autovector<BufferedLog*> logs_;
-};
-
-// Lazily initialize Rep to avoid allocations when new log is added.
-LogBuffer::LogBuffer(const InfoLogLevel log_level,
-                     const shared_ptr<Logger>& info_log)
-    : rep_(nullptr), log_level_(log_level), info_log_(info_log) {}
-
-LogBuffer::~LogBuffer() { delete rep_; }
-
-void LogBuffer::AddLogToBuffer(const char* format, va_list ap) {
-  if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) {
-    // Skip the level because of its level.
-    return;
-  }
-  if (rep_ == nullptr) {
-    rep_ = new Rep();
-  }
-  const size_t kLogSizeLimit = 512;
-  char* alloc_mem = rep_->arena_.AllocateAligned(kLogSizeLimit);
-  BufferedLog* buffered_log = new (alloc_mem) BufferedLog();
-  char* p = buffered_log->message;
-  char* limit = alloc_mem + kLogSizeLimit - 1;
-
-  // store the time
-  gettimeofday(&(buffered_log->now_tv), nullptr);
-
-  // Print the message
-  if (p < limit) {
-    va_list backup_ap;
-    va_copy(backup_ap, ap);
-    p += vsnprintf(p, limit - p, format, backup_ap);
-    va_end(backup_ap);
-  }
-
-  // Add '\0' to the end
-  *p = '\0';
-
-  rep_->logs_.push_back(buffered_log);
-}
-
-void LogBuffer::FlushBufferToLog() const {
-  if (rep_ != nullptr) {
-    for (BufferedLog* log : rep_->logs_) {
-      const time_t seconds = log->now_tv.tv_sec;
-      struct tm t;
-      localtime_r(&seconds, &t);
-      Log(log_level_, info_log_,
-          "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s",
-          t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min,
-          t.tm_sec, static_cast<int>(log->now_tv.tv_usec), log->message);
-    }
-  }
-}
-
 FileLock::~FileLock() {
 }

-void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) {
-  if (log_buffer != nullptr) {
-    va_list ap;
-    va_start(ap, format);
-    log_buffer->AddLogToBuffer(format, ap);
-    va_end(ap);
-  }
-}
-
 void LogFlush(Logger *info_log) {
   if (info_log) {
     info_log->Flush();

util/log_buffer.cc
@@ -0,0 +1,67 @@
// Copyright (c) 2014, Facebook, Inc.  All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "util/log_buffer.h"

#include <sys/time.h>

namespace rocksdb {

LogBuffer::LogBuffer(const InfoLogLevel log_level, Logger* info_log)
    : log_level_(log_level), info_log_(info_log) {}

void LogBuffer::AddLogToBuffer(const char* format, va_list ap) {
  if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) {
    // Skip the level because of its level.
    return;
  }

  const size_t kLogSizeLimit = 512;
  char* alloc_mem = arena_.AllocateAligned(kLogSizeLimit);
  BufferedLog* buffered_log = new (alloc_mem) BufferedLog();
  char* p = buffered_log->message;
  char* limit = alloc_mem + kLogSizeLimit - 1;

  // store the time
  gettimeofday(&(buffered_log->now_tv), nullptr);

  // Print the message
  if (p < limit) {
    va_list backup_ap;
    va_copy(backup_ap, ap);
    p += vsnprintf(p, limit - p, format, backup_ap);
    va_end(backup_ap);
  }

  // Add '\0' to the end
  *p = '\0';

  logs_.push_back(buffered_log);
}

void LogBuffer::FlushBufferToLog() {
  for (BufferedLog* log : logs_) {
    const time_t seconds = log->now_tv.tv_sec;
    struct tm t;
    localtime_r(&seconds, &t);
    Log(log_level_, info_log_,
        "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s",
        t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min,
        t.tm_sec, static_cast<int>(log->now_tv.tv_usec), log->message);
  }
  logs_.clear();
}

void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) {
  if (log_buffer != nullptr) {
    va_list ap;
    va_start(ap, format);
    log_buffer->AddLogToBuffer(format, ap);
    va_end(ap);
  }
}

}  // namespace rocksdb

util/log_buffer.h
@@ -0,0 +1,46 @@
// Copyright (c) 2014, Facebook, Inc.  All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#pragma once

#include "rocksdb/env.h"
#include "util/arena.h"
#include "util/autovector.h"

namespace rocksdb {

class Logger;

// A class to buffer info log entries and flush them in the end.
class LogBuffer {
 public:
  // log_level: the log level for all the logs
  // info_log: logger to write the logs to
  LogBuffer(const InfoLogLevel log_level, Logger* info_log);

  // Add a log entry to the buffer.
  void AddLogToBuffer(const char* format, va_list ap);

  // Flush all buffered log to the info log.
  void FlushBufferToLog();

 private:
  // One log entry with its timestamp
  struct BufferedLog {
    struct timeval now_tv;  // Timestamp of the log
    char message[1];        // Beginning of log message
  };

  const InfoLogLevel log_level_;
  Logger* info_log_;
  Arena arena_;
  autovector<BufferedLog*> logs_;
};

// Add log to the LogBuffer for a delayed info logging. It can be used when
// we want to add some logs inside a mutex.
extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...);

}  // namespace rocksdb
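
A minimal usage sketch of the relocated API, under the assumption that it is compiled inside the RocksDB source tree; the demo file path and the use of Env::Default()->NewLogger() to obtain a Logger are illustrative assumptions, not part of this change.

// Hypothetical harness (not a RocksDB test) exercising the internal header.
#include <memory>

#include "rocksdb/env.h"
#include "util/log_buffer.h"

int main() {
  std::shared_ptr<rocksdb::Logger> info_log;
  rocksdb::Status s =
      rocksdb::Env::Default()->NewLogger("/tmp/log_buffer_demo_LOG", &info_log);
  if (!s.ok()) {
    return 1;
  }

  // Entries are only buffered here; nothing reaches the info log yet.
  rocksdb::LogBuffer log_buffer(rocksdb::INFO, info_log.get());
  rocksdb::LogToBuffer(&log_buffer, "buffered entry %d", 1);
  rocksdb::LogToBuffer(&log_buffer, "buffered entry %d", 2);

  // On flush, each entry is written with an "(Original Log Time ...)" prefix
  // carrying the timestamp captured when it was buffered.
  log_buffer.FlushBufferToLog();
  return 0;
}

Callers such as DBImpl keep the LogBuffer on the stack, hand it down by pointer, and flush it once the mutex is released, which is exactly what the db_impl.cc hunks above do.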