Buffer info logs when picking compactions and write them out after releasing the mutex

Summary: Now while the background thread is picking compactions, it writes out multiple info_logs, especially for universal compaction, which introduces a chance of waiting log writing in mutex, which is bad. To remove this risk, write all those info logs to a buffer and flush it after releasing the mutex.

Test Plan:
make all check
check the log lines while running some tests that trigger compactions.

Reviewers: haobo, igor, dhruba

Reviewed By: dhruba

CC: i.am.jin.lei, dhruba, yhchiang, leveldb, nkg-

Differential Revision: https://reviews.facebook.net/D16515
main
sdong 11 years ago
parent 4405f3a000
commit ecb1ffa2a8
  1. 72
      db/compaction_picker.cc
  2. 16
      db/compaction_picker.h
  3. 40
      db/db_impl.cc
  4. 3
      db/db_impl.h
  5. 4
      db/version_set.cc
  6. 2
      db/version_set.h
  7. 23
      include/rocksdb/env.h
  8. 77
      util/env.cc

@ -367,7 +367,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
return c;
}
Compaction* LevelCompactionPicker::PickCompaction(Version* version) {
Compaction* LevelCompactionPicker::PickCompaction(Version* version,
LogBuffer* log_buffer) {
Compaction* c = nullptr;
int level = -1;
@ -544,32 +545,34 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
//
Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
LogBuffer* log_buffer) {
int level = 0;
double score = version->compaction_score_[0];
if ((version->files_[level].size() <
(unsigned int)options_->level0_file_num_compaction_trigger)) {
Log(options_->info_log, "Universal: nothing to do\n");
LogToBuffer(log_buffer, "Universal: nothing to do\n");
return nullptr;
}
Version::FileSummaryStorage tmp;
Log(options_->info_log, "Universal: candidate files(%zu): %s\n",
LogToBuffer(log_buffer, "Universal: candidate files(%zu): %s\n",
version->files_[level].size(),
version->LevelFileSummary(&tmp, 0));
// Check for size amplification first.
Compaction* c;
if ((c = PickCompactionUniversalSizeAmp(version, score)) != nullptr) {
Log(options_->info_log, "Universal: compacting for size amp\n");
if ((c = PickCompactionUniversalSizeAmp(version, score, log_buffer)) !=
nullptr) {
LogToBuffer(log_buffer, "Universal: compacting for size amp\n");
} else {
// Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios.
unsigned int ratio = options_->compaction_options_universal.size_ratio;
if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX)) != nullptr) {
Log(options_->info_log, "Universal: compacting for size ratio\n");
if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX,
log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "Universal: compacting for size ratio\n");
} else {
// Size amplification and file size ratios are within configured limits.
// If max read amplification is exceeding configured limits, then force
@ -577,7 +580,8 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
// the number of files to fewer than level0_file_num_compaction_trigger.
unsigned int num_files = version->files_[level].size() -
options_->level0_file_num_compaction_trigger;
if ((c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files)) != nullptr) {
if ((c = PickCompactionUniversalReadAmp(
version, score, UINT_MAX, num_files, log_buffer)) != nullptr) {
Log(options_->info_log, "Universal: compacting for file num\n");
}
}
@ -631,7 +635,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
//
Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
Version* version, double score, unsigned int ratio,
unsigned int max_number_of_files_to_compact) {
unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) {
int level = 0;
unsigned int min_merge_width =
@ -666,7 +670,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
candidate_count = 1;
break;
}
Log(options_->info_log,
LogToBuffer(log_buffer,
"Universal: file %lu[%d] being compacted, skipping",
(unsigned long)f->number, loop);
f = nullptr;
@ -676,7 +680,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// first candidate to be compacted.
uint64_t candidate_size = f != nullptr? f->file_size : 0;
if (f != nullptr) {
Log(options_->info_log, "Universal: Possible candidate file %lu[%d].",
LogToBuffer(log_buffer, "Universal: Possible candidate file %lu[%d].",
(unsigned long)f->number, loop);
}
@ -727,11 +731,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
i < loop + candidate_count && i < file_by_time.size(); i++) {
int index = file_by_time[i];
FileMetaData* f = version->files_[level][index];
Log(options_->info_log,
LogToBuffer(log_buffer,
"Universal: Skipping file %lu[%d] with size %lu %d\n",
(unsigned long)f->number,
i,
(unsigned long)f->file_size,
(unsigned long)f->number, i, (unsigned long)f->file_size,
f->being_compacted);
}
}
@ -766,10 +768,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
int index = file_by_time[i];
FileMetaData* f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n",
(unsigned long)f->number,
i,
(unsigned long)f->file_size);
LogToBuffer(log_buffer, "Universal: Picking file %lu[%d] with size %lu\n",
(unsigned long)f->number, i, (unsigned long)f->file_size);
}
return c;
}
@ -781,7 +781,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// min_merge_width and max_merge_width).
//
Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
Version* version, double score) {
Version* version, double score, LogBuffer* log_buffer) {
int level = 0;
// percentage flexibilty while reducing size amplification
@ -805,9 +805,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
start_index = loop; // Consider this as the first candidate.
break;
}
Log(options_->info_log, "Universal: skipping file %lu[%d] compacted %s",
(unsigned long)f->number,
loop,
LogToBuffer(log_buffer, "Universal: skipping file %lu[%d] compacted %s",
(unsigned long)f->number, loop,
" cannot be a candidate to reduce size amp.\n");
f = nullptr;
}
@ -815,10 +814,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
return nullptr; // no candidate files
}
Log(options_->info_log, "Universal: First candidate file %lu[%d] %s",
(unsigned long)f->number,
start_index,
" to reduce size amp.\n");
LogToBuffer(log_buffer, "Universal: First candidate file %lu[%d] %s",
(unsigned long)f->number, start_index, " to reduce size amp.\n");
// keep adding up all the remaining files
for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
@ -826,10 +823,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
int index = file_by_time[loop];
f = version->files_[level][index];
if (f->being_compacted) {
Log(options_->info_log,
"Universal: Possible candidate file %lu[%d] %s.",
(unsigned long)f->number,
loop,
LogToBuffer(
log_buffer, "Universal: Possible candidate file %lu[%d] %s.",
(unsigned long)f->number, loop,
" is already being compacted. No size amp reduction possible.\n");
return nullptr;
}
@ -846,14 +842,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
// size amplification = percentage of additional size
if (candidate_size * 100 < ratio * earliest_file_size) {
Log(options_->info_log,
LogToBuffer(log_buffer,
"Universal: size amp not needed. newer-files-total-size %lu "
"earliest-file-size %lu",
(unsigned long)candidate_size,
(unsigned long)earliest_file_size);
return nullptr;
} else {
Log(options_->info_log,
LogToBuffer(log_buffer,
"Universal: size amp needed. newer-files-total-size %lu "
"earliest-file-size %lu",
(unsigned long)candidate_size,
@ -871,11 +867,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
int index = file_by_time[loop];
f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
Log(options_->info_log,
LogToBuffer(log_buffer,
"Universal: size amp picking file %lu[%d] with size %lu",
(unsigned long)f->number,
index,
(unsigned long)f->file_size);
(unsigned long)f->number, index, (unsigned long)f->file_size);
}
return c;
}

@ -19,6 +19,7 @@
namespace rocksdb {
class LogBuffer;
class Compaction;
class Version;
@ -31,7 +32,8 @@ class CompactionPicker {
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
virtual Compaction* PickCompaction(Version* version) = 0;
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) = 0;
// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
@ -127,16 +129,19 @@ class UniversalCompactionPicker : public CompactionPicker {
UniversalCompactionPicker(const Options* options,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version) override;
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;
private:
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionUniversalReadAmp(Version* version, double score,
unsigned int ratio,
unsigned int num_files);
unsigned int num_files,
LogBuffer* log_buffer);
// Pick Universal compaction to limit space amplification.
Compaction* PickCompactionUniversalSizeAmp(Version* version, double score);
Compaction* PickCompactionUniversalSizeAmp(Version* version, double score,
LogBuffer* log_buffer);
};
class LevelCompactionPicker : public CompactionPicker {
@ -144,7 +149,8 @@ class LevelCompactionPicker : public CompactionPicker {
LevelCompactionPicker(const Options* options,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version) override;
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;
private:
// For the specfied level, pick a compaction.

@ -1924,22 +1924,24 @@ void DBImpl::BackgroundCallCompaction() {
DeletionState deletion_state(true);
MaybeDumpStats();
LogBuffer log_buffer(INFO, options_.info_log);
{
MutexLock l(&mutex_);
// Log(options_.info_log, "XXX BG Thread %llx process new work item", pthread_self());
// Log(options_.info_log, "XXX BG Thread %llx process new work item",
// pthread_self());
assert(bg_compaction_scheduled_);
Status s;
if (!shutting_down_.Acquire_Load()) {
s = BackgroundCompaction(&madeProgress, deletion_state);
s = BackgroundCompaction(&madeProgress, deletion_state, &log_buffer);
if (!s.ok()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
Log(options_.info_log, "Waiting after background compaction error: %s",
s.ToString().c_str());
mutex_.Unlock();
LogFlush(options_.info_log);
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
@ -1970,11 +1972,13 @@ void DBImpl::BackgroundCallCompaction() {
MaybeScheduleFlushOrCompaction();
}
bg_cv_.SignalAll();
}
log_buffer.FlushBufferToLog();
}
Status DBImpl::BackgroundCompaction(bool* madeProgress,
DeletionState& deletion_state) {
DeletionState& deletion_state,
LogBuffer* log_buffer) {
*madeProgress = false;
mutex_.AssertHeld();
@ -1987,8 +1991,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
// TODO: remove memtable flush from formal compaction
while (imm_.IsFlushPending()) {
Log(options_.info_log,
"BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots "
LogToBuffer(log_buffer,
"BackgroundCompaction doing FlushMemTableToOutputFile, "
"compaction slots "
"available %d",
options_.max_background_compactions - bg_compaction_scheduled_);
Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state);
@ -2014,24 +2019,24 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
if (!c) {
m->done = true;
}
Log(options_.info_log,
LogToBuffer(
log_buffer,
"Manual compaction from level-%d to level-%d from %s .. %s; will stop "
"at %s\n",
m->input_level,
m->output_level,
m->input_level, m->output_level,
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
((m->done || manual_end == nullptr)
? "(end)"
: manual_end->DebugString().c_str()));
} else if (!options_.disable_auto_compactions) {
c.reset(versions_->PickCompaction());
c.reset(versions_->PickCompaction(log_buffer));
}
Status status;
if (!c) {
// Nothing to do
Log(options_.info_log, "Compaction nothing to do");
LogToBuffer(log_buffer, "Compaction nothing to do");
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level
assert(c->num_input_files(0) == 1);
@ -2043,10 +2048,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get());
InstallSuperVersion(deletion_state);
Version::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
LogToBuffer(log_buffer, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(), versions_->current()->LevelSummary(&tmp));
status.ToString().c_str(),
versions_->current()->LevelSummary(&tmp));
versions_->ReleaseCompactionFiles(c.get(), status);
*madeProgress = true;
} else {
@ -2065,8 +2071,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
} else if (shutting_down_.Acquire_Load()) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
Log(WARN, options_.info_log, "Compaction error: %s",
status.ToString().c_str());
if (options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
}

@ -335,7 +335,8 @@ class DBImpl : public DB {
static void BGWorkFlush(void* db);
void BackgroundCallCompaction();
void BackgroundCallFlush();
Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state);
Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state,
LogBuffer* log_buffer);
Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state);
void CleanupCompaction(CompactionState* compact, Status status);
Status DoCompactionWork(CompactionState* compact,

@ -2165,8 +2165,8 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
}
}
Compaction* VersionSet::PickCompaction() {
return compaction_picker_->PickCompaction(current_);
Compaction* VersionSet::PickCompaction(LogBuffer* log_buffer) {
return compaction_picker_->PickCompaction(current_, log_buffer);
}
Compaction* VersionSet::CompactRange(int input_level, int output_level,

@ -365,7 +365,7 @@ class VersionSet {
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
Compaction* PickCompaction();
Compaction* PickCompaction(LogBuffer* log_buffer);
// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that

@ -575,6 +575,26 @@ class Logger {
InfoLogLevel log_level_;
};
// A class to buffer info log entries and flush them in the end.
class LogBuffer {
public:
// log_level: the log level for all the logs
// info_log: logger to write the logs to
LogBuffer(const InfoLogLevel log_level, const shared_ptr<Logger>& info_log);
~LogBuffer();
// Add a log entry to the buffer.
void AddLogToBuffer(const char* format, ...);
// Flush all buffered log to the info log.
void FlushBufferToLog() const;
private:
struct Rep;
Rep* rep_;
const InfoLogLevel log_level_;
const shared_ptr<Logger>& info_log_;
};
// Identifies a locked file.
class FileLock {
@ -587,6 +607,9 @@ class FileLock {
void operator=(const FileLock&);
};
// Add log to the LogBuffer for a delayed info logging. It can be used when
// we want to add some logs inside a mutex.
extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...);
extern void LogFlush(const shared_ptr<Logger>& info_log);

@ -8,7 +8,11 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/env.h"
#include <sys/time.h>
#include "rocksdb/options.h"
#include "util/arena.h"
#include "util/autovector.h"
namespace rocksdb {
@ -27,9 +31,82 @@ WritableFile::~WritableFile() {
Logger::~Logger() {
}
// One log entry with its timestamp
struct BufferedLog {
struct timeval now_tv; // Timestamp of the log
char message[1]; // Beginning of log message
};
struct LogBuffer::Rep {
Arena arena_;
autovector<BufferedLog*> logs_;
};
// Lazily initialize Rep to avoid allocations when new log is added.
LogBuffer::LogBuffer(const InfoLogLevel log_level,
const shared_ptr<Logger>& info_log)
: rep_(nullptr), log_level_(log_level), info_log_(info_log) {}
LogBuffer::~LogBuffer() { delete rep_; }
void LogBuffer::AddLogToBuffer(const char* format, ...) {
if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) {
// Skip the level because of its level.
return;
}
if (rep_ == nullptr) {
rep_ = new Rep();
}
const size_t kLogSizeLimit = 512;
char* alloc_mem = rep_->arena_.AllocateAligned(kLogSizeLimit);
BufferedLog* buffered_log = new (alloc_mem) BufferedLog();
char* p = buffered_log->message;
char* limit = alloc_mem + kLogSizeLimit - 1;
// store the time
gettimeofday(&(buffered_log->now_tv), nullptr);
// Print the message
if (p < limit) {
va_list ap;
va_start(ap, format);
p += vsnprintf(p, limit - p, format, ap);
va_end(ap);
}
// Add '\0' to the end
*p = '\0';
rep_->logs_.push_back(buffered_log);
}
void LogBuffer::FlushBufferToLog() const {
if (rep_ != nullptr) {
for (BufferedLog* log : rep_->logs_) {
const time_t seconds = log->now_tv.tv_sec;
struct tm t;
localtime_r(&seconds, &t);
Log(log_level_, info_log_,
"(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s",
t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min,
t.tm_sec, static_cast<int>(log->now_tv.tv_usec), log->message);
}
}
}
FileLock::~FileLock() {
}
void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) {
if (log_buffer != nullptr) {
va_list ap;
va_start(ap, format);
log_buffer->AddLogToBuffer(format);
va_end(ap);
}
}
void LogFlush(Logger *info_log) {
if (info_log) {
info_log->Flush();

Loading…
Cancel
Save