Buffer the info logs written while picking a compaction and flush them once
the DB mutex has been released.

Notes on this revision:
  * util/env.cc: LogToBuffer() expands its variadic arguments itself and hands
    the finished text to AddLogToBuffer() through "%s". Previously only the
    format string was forwarded, so every conversion read a missing argument.
  * util/env.cc: AddLogToBuffer() no longer advances its write pointer by the
    raw vsnprintf() result, which exceeds the space available when a message
    is truncated and is negative on error.
  * db/compaction_picker.cc: the "compacting for file num" message goes
    through LogToBuffer() like the other messages in PickCompaction().
  * include/rocksdb/env.h: LogBuffer keeps its own shared_ptr to the logger
    instead of a reference to the caller's, and declares its copy operations
    private because the destructor deletes rep_.
  * Line breaks, indentation and the template arguments inside <...> were
    lost in the copy this was rebuilt from. They have been restored from the
    hunk line counts and the surrounding code; check the context lines
    against the tree before applying. The post-image blob ids on the index
    lines predate the fixes above.

diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index d84027f0a..255d0cf84 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -367,7 +367,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
   return c;
 }
 
-Compaction* LevelCompactionPicker::PickCompaction(Version* version) {
+Compaction* LevelCompactionPicker::PickCompaction(Version* version,
+                                                  LogBuffer* log_buffer) {
   Compaction* c = nullptr;
   int level = -1;
 
@@ -544,32 +545,34 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
 // Universal style of compaction. Pick files that are contiguous in
 // time-range to compact.
 //
-Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
+Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
+                                                      LogBuffer* log_buffer) {
   int level = 0;
   double score = version->compaction_score_[0];
 
   if ((version->files_[level].size() <
       (unsigned int)options_->level0_file_num_compaction_trigger)) {
-    Log(options_->info_log, "Universal: nothing to do\n");
+    LogToBuffer(log_buffer, "Universal: nothing to do\n");
     return nullptr;
   }
   Version::FileSummaryStorage tmp;
-  Log(options_->info_log, "Universal: candidate files(%zu): %s\n",
-      version->files_[level].size(),
-      version->LevelFileSummary(&tmp, 0));
+  LogToBuffer(log_buffer, "Universal: candidate files(%zu): %s\n",
+              version->files_[level].size(),
+              version->LevelFileSummary(&tmp, 0));
 
   // Check for size amplification first.
   Compaction* c;
-  if ((c = PickCompactionUniversalSizeAmp(version, score)) != nullptr) {
-    Log(options_->info_log, "Universal: compacting for size amp\n");
+  if ((c = PickCompactionUniversalSizeAmp(version, score, log_buffer)) !=
+      nullptr) {
+    LogToBuffer(log_buffer, "Universal: compacting for size amp\n");
   } else {
-
     // Size amplification is within limits. Try reducing read
     // amplification while maintaining file size ratios.
     unsigned int ratio = options_->compaction_options_universal.size_ratio;
 
-    if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX)) != nullptr) {
-      Log(options_->info_log, "Universal: compacting for size ratio\n");
+    if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX,
+                                            log_buffer)) != nullptr) {
+      LogToBuffer(log_buffer, "Universal: compacting for size ratio\n");
     } else {
       // Size amplification and file size ratios are within configured limits.
       // If max read amplification is exceeding configured limits, then force
@@ -577,7 +580,8 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
       // the number of files to fewer than level0_file_num_compaction_trigger.
       unsigned int num_files = version->files_[level].size() -
                                options_->level0_file_num_compaction_trigger;
-      if ((c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files)) != nullptr) {
-        Log(options_->info_log, "Universal: compacting for file num\n");
+      if ((c = PickCompactionUniversalReadAmp(
+               version, score, UINT_MAX, num_files, log_buffer)) != nullptr) {
+        LogToBuffer(log_buffer, "Universal: compacting for file num\n");
       }
     }
@@ -631,7 +635,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
 //
 Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
     Version* version, double score, unsigned int ratio,
-    unsigned int max_number_of_files_to_compact) {
+    unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) {
   int level = 0;
 
   unsigned int min_merge_width =
@@ -666,9 +670,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
         candidate_count = 1;
         break;
       }
-      Log(options_->info_log,
-          "Universal: file %lu[%d] being compacted, skipping",
-          (unsigned long)f->number, loop);
+      LogToBuffer(log_buffer,
+                  "Universal: file %lu[%d] being compacted, skipping",
+                  (unsigned long)f->number, loop);
       f = nullptr;
     }
 
@@ -676,8 +680,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
     // first candidate to be compacted.
     uint64_t candidate_size = f != nullptr? f->file_size : 0;
     if (f != nullptr) {
-      Log(options_->info_log, "Universal: Possible candidate file %lu[%d].",
-          (unsigned long)f->number, loop);
+      LogToBuffer(log_buffer, "Universal: Possible candidate file %lu[%d].",
+                  (unsigned long)f->number, loop);
     }
 
     // Check if the suceeding files need compaction.
@@ -727,12 +731,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
            i < loop + candidate_count && i < file_by_time.size(); i++) {
         int index = file_by_time[i];
         FileMetaData* f = version->files_[level][index];
-        Log(options_->info_log,
-            "Universal: Skipping file %lu[%d] with size %lu %d\n",
-            (unsigned long)f->number,
-            i,
-            (unsigned long)f->file_size,
-            f->being_compacted);
+        LogToBuffer(log_buffer,
+                    "Universal: Skipping file %lu[%d] with size %lu %d\n",
+                    (unsigned long)f->number, i, (unsigned long)f->file_size,
+                    f->being_compacted);
       }
     }
   }
@@ -766,10 +768,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
     int index = file_by_time[i];
     FileMetaData* f = c->input_version_->files_[level][index];
     c->inputs_[0].push_back(f);
-    Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n",
-        (unsigned long)f->number,
-        i,
-        (unsigned long)f->file_size);
+    LogToBuffer(log_buffer, "Universal: Picking file %lu[%d] with size %lu\n",
+                (unsigned long)f->number, i, (unsigned long)f->file_size);
   }
   return c;
 }
@@ -781,7 +781,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
 // min_merge_width and max_merge_width).
 //
 Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
-    Version* version, double score) {
+    Version* version, double score, LogBuffer* log_buffer) {
   int level = 0;
 
   // percentage flexibilty while reducing size amplification
@@ -805,20 +805,17 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
       start_index = loop;  // Consider this as the first candidate.
       break;
     }
-    Log(options_->info_log, "Universal: skipping file %lu[%d] compacted %s",
-        (unsigned long)f->number,
-        loop,
-        " cannot be a candidate to reduce size amp.\n");
+    LogToBuffer(log_buffer, "Universal: skipping file %lu[%d] compacted %s",
+                (unsigned long)f->number, loop,
+                " cannot be a candidate to reduce size amp.\n");
     f = nullptr;
   }
   if (f == nullptr) {
     return nullptr; // no candidate files
   }
 
-  Log(options_->info_log, "Universal: First candidate file %lu[%d] %s",
-      (unsigned long)f->number,
-      start_index,
-      " to reduce size amp.\n");
+  LogToBuffer(log_buffer, "Universal: First candidate file %lu[%d] %s",
+              (unsigned long)f->number, start_index, " to reduce size amp.\n");
 
   // keep adding up all the remaining files
   for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
@@ -826,10 +823,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
     int index = file_by_time[loop];
     f = version->files_[level][index];
     if (f->being_compacted) {
-      Log(options_->info_log,
-          "Universal: Possible candidate file %lu[%d] %s.",
-          (unsigned long)f->number,
-          loop,
+      LogToBuffer(
+          log_buffer, "Universal: Possible candidate file %lu[%d] %s.",
+          (unsigned long)f->number, loop,
           " is already being compacted. No size amp reduction possible.\n");
       return nullptr;
     }
@@ -846,18 +842,18 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
 
   // size amplification = percentage of additional size
   if (candidate_size * 100 < ratio * earliest_file_size) {
-    Log(options_->info_log,
-        "Universal: size amp not needed. newer-files-total-size %lu "
-        "earliest-file-size %lu",
-        (unsigned long)candidate_size,
-        (unsigned long)earliest_file_size);
+    LogToBuffer(log_buffer,
+                "Universal: size amp not needed. newer-files-total-size %lu "
+                "earliest-file-size %lu",
+                (unsigned long)candidate_size,
+                (unsigned long)earliest_file_size);
     return nullptr;
   } else {
-    Log(options_->info_log,
-        "Universal: size amp needed. newer-files-total-size %lu "
-        "earliest-file-size %lu",
-        (unsigned long)candidate_size,
-        (unsigned long)earliest_file_size);
+    LogToBuffer(log_buffer,
+                "Universal: size amp needed. newer-files-total-size %lu "
+                "earliest-file-size %lu",
+                (unsigned long)candidate_size,
+                (unsigned long)earliest_file_size);
   }
   assert(start_index >= 0 && start_index < file_by_time.size() - 1);
 
@@ -871,11 +867,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
     int index = file_by_time[loop];
     f = c->input_version_->files_[level][index];
     c->inputs_[0].push_back(f);
-    Log(options_->info_log,
-        "Universal: size amp picking file %lu[%d] with size %lu",
-        (unsigned long)f->number,
-        index,
-        (unsigned long)f->file_size);
+    LogToBuffer(log_buffer,
+                "Universal: size amp picking file %lu[%d] with size %lu",
+                (unsigned long)f->number, index, (unsigned long)f->file_size);
   }
   return c;
 }
diff --git a/db/compaction_picker.h b/db/compaction_picker.h
index ee77cc4c7..4793b1ba4 100644
--- a/db/compaction_picker.h
+++ b/db/compaction_picker.h
@@ -19,6 +19,7 @@
 
 namespace rocksdb {
 
+class LogBuffer;
 class Compaction;
 class Version;
 
@@ -31,7 +32,8 @@ class CompactionPicker {
   // Returns nullptr if there is no compaction to be done.
   // Otherwise returns a pointer to a heap-allocated object that
   // describes the compaction.  Caller should delete the result.
-  virtual Compaction* PickCompaction(Version* version) = 0;
+  virtual Compaction* PickCompaction(Version* version,
+                                     LogBuffer* log_buffer) = 0;
 
   // Return a compaction object for compacting the range [begin,end] in
   // the specified level.  Returns nullptr if there is nothing in that
@@ -127,16 +129,19 @@ class UniversalCompactionPicker : public CompactionPicker {
   UniversalCompactionPicker(const Options* options,
                             const InternalKeyComparator* icmp)
       : CompactionPicker(options, icmp) {}
-  virtual Compaction* PickCompaction(Version* version) override;
+  virtual Compaction* PickCompaction(Version* version,
+                                     LogBuffer* log_buffer) override;
 
  private:
   // Pick Universal compaction to limit read amplification
   Compaction* PickCompactionUniversalReadAmp(Version* version, double score,
                                              unsigned int ratio,
-                                             unsigned int num_files);
+                                             unsigned int num_files,
+                                             LogBuffer* log_buffer);
 
   // Pick Universal compaction to limit space amplification.
-  Compaction* PickCompactionUniversalSizeAmp(Version* version, double score);
+  Compaction* PickCompactionUniversalSizeAmp(Version* version, double score,
+                                             LogBuffer* log_buffer);
 };
 
 class LevelCompactionPicker : public CompactionPicker {
@@ -144,7 +149,8 @@ class LevelCompactionPicker : public CompactionPicker {
   LevelCompactionPicker(const Options* options,
                         const InternalKeyComparator* icmp)
       : CompactionPicker(options, icmp) {}
-  virtual Compaction* PickCompaction(Version* version) override;
+  virtual Compaction* PickCompaction(Version* version,
+                                     LogBuffer* log_buffer) override;
 
  private:
   // For the specfied level, pick a compaction.
diff --git a/db/db_impl.cc b/db/db_impl.cc
index f9abd6073..d05424410 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1924,57 +1924,61 @@ void DBImpl::BackgroundCallCompaction() {
   DeletionState deletion_state(true);
 
   MaybeDumpStats();
+  LogBuffer log_buffer(INFO, options_.info_log);
+  {
+    MutexLock l(&mutex_);
+    // Log(options_.info_log, "XXX BG Thread %llx process new work item",
+    // pthread_self());
+    assert(bg_compaction_scheduled_);
+    Status s;
+    if (!shutting_down_.Acquire_Load()) {
+      s = BackgroundCompaction(&madeProgress, deletion_state, &log_buffer);
+      if (!s.ok()) {
+        // Wait a little bit before retrying background compaction in
+        // case this is an environmental problem and we do not want to
+        // chew up resources for failed compactions for the duration of
+        // the problem.
+        bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
+        mutex_.Unlock();
+        Log(options_.info_log, "Waiting after background compaction error: %s",
+            s.ToString().c_str());
+        LogFlush(options_.info_log);
+        env_->SleepForMicroseconds(1000000);
+        mutex_.Lock();
+      }
+    }
 
-  MutexLock l(&mutex_);
-  // Log(options_.info_log, "XXX BG Thread %llx process new work item", pthread_self());
-  assert(bg_compaction_scheduled_);
-  Status s;
-  if (!shutting_down_.Acquire_Load()) {
-    s = BackgroundCompaction(&madeProgress, deletion_state);
-    if (!s.ok()) {
-      // Wait a little bit before retrying background compaction in
-      // case this is an environmental problem and we do not want to
-      // chew up resources for failed compactions for the duration of
-      // the problem.
-      bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
-      Log(options_.info_log, "Waiting after background compaction error: %s",
-          s.ToString().c_str());
+    // If !s.ok(), this means that Compaction failed. In that case, we want
+    // to delete all obsolete files we might have created and we force
+    // FindObsoleteFiles(). This is because deletion_state does not catch
+    // all created files if compaction failed.
+    FindObsoleteFiles(deletion_state, !s.ok());
+
+    // delete unnecessary files if any, this is done outside the mutex
+    if (deletion_state.HaveSomethingToDelete()) {
       mutex_.Unlock();
-      LogFlush(options_.info_log);
-      env_->SleepForMicroseconds(1000000);
+      PurgeObsoleteFiles(deletion_state);
       mutex_.Lock();
     }
-  }
 
-  // If !s.ok(), this means that Compaction failed. In that case, we want
-  // to delete all obsolete files we might have created and we force
-  // FindObsoleteFiles(). This is because deletion_state does not catch
-  // all created files if compaction failed.
-  FindObsoleteFiles(deletion_state, !s.ok());
-
-  // delete unnecessary files if any, this is done outside the mutex
-  if (deletion_state.HaveSomethingToDelete()) {
-    mutex_.Unlock();
-    PurgeObsoleteFiles(deletion_state);
-    mutex_.Lock();
-  }
+    bg_compaction_scheduled_--;
 
-  bg_compaction_scheduled_--;
-
-  MaybeScheduleLogDBDeployStats();
+    MaybeScheduleLogDBDeployStats();
 
-  // Previous compaction may have produced too many files in a level,
-  // So reschedule another compaction if we made progress in the
-  // last compaction.
-  if (madeProgress) {
-    MaybeScheduleFlushOrCompaction();
+    // Previous compaction may have produced too many files in a level,
+    // So reschedule another compaction if we made progress in the
+    // last compaction.
+    if (madeProgress) {
+      MaybeScheduleFlushOrCompaction();
+    }
+    bg_cv_.SignalAll();
   }
-  bg_cv_.SignalAll();
-
+  log_buffer.FlushBufferToLog();
 }
 
 Status DBImpl::BackgroundCompaction(bool* madeProgress,
-    DeletionState& deletion_state) {
+                                    DeletionState& deletion_state,
+                                    LogBuffer* log_buffer) {
   *madeProgress = false;
   mutex_.AssertHeld();
 
@@ -1987,10 +1991,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
 
   // TODO: remove memtable flush from formal compaction
   while (imm_.IsFlushPending()) {
-    Log(options_.info_log,
-        "BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots "
-        "available %d",
-        options_.max_background_compactions - bg_compaction_scheduled_);
+    LogToBuffer(log_buffer,
+                "BackgroundCompaction doing FlushMemTableToOutputFile, "
+                "compaction slots "
+                "available %d",
+                options_.max_background_compactions - bg_compaction_scheduled_);
     Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state);
     if (!stat.ok()) {
       if (is_manual) {
@@ -2014,24 +2019,24 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
     if (!c) {
       m->done = true;
     }
-    Log(options_.info_log,
+    LogToBuffer(
+        log_buffer,
         "Manual compaction from level-%d to level-%d from %s .. %s; will stop "
         "at %s\n",
-        m->input_level,
-        m->output_level,
+        m->input_level, m->output_level,
         (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
         (m->end ? m->end->DebugString().c_str() : "(end)"),
         ((m->done || manual_end == nullptr)
              ? "(end)"
              : manual_end->DebugString().c_str()));
   } else if (!options_.disable_auto_compactions) {
-    c.reset(versions_->PickCompaction());
+    c.reset(versions_->PickCompaction(log_buffer));
   }
 
   Status status;
   if (!c) {
     // Nothing to do
-    Log(options_.info_log, "Compaction nothing to do");
+    LogToBuffer(log_buffer, "Compaction nothing to do");
   } else if (!is_manual && c->IsTrivialMove()) {
     // Move file to next level
     assert(c->num_input_files(0) == 1);
@@ -2043,10 +2048,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
     status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get());
     InstallSuperVersion(deletion_state);
     Version::LevelSummaryStorage tmp;
-    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
-        static_cast<unsigned long long>(f->number), c->level() + 1,
-        static_cast<unsigned long long>(f->file_size),
-        status.ToString().c_str(), versions_->current()->LevelSummary(&tmp));
+    LogToBuffer(log_buffer, "Moved #%lld to level-%d %lld bytes %s: %s\n",
+                static_cast<unsigned long long>(f->number), c->level() + 1,
+                static_cast<unsigned long long>(f->file_size),
+                status.ToString().c_str(),
+                versions_->current()->LevelSummary(&tmp));
     versions_->ReleaseCompactionFiles(c.get(), status);
     *madeProgress = true;
   } else {
@@ -2065,8 +2071,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
   } else if (shutting_down_.Acquire_Load()) {
     // Ignore compaction errors found during shutting down
   } else {
-    Log(options_.info_log,
-        "Compaction error: %s", status.ToString().c_str());
+    Log(WARN, options_.info_log, "Compaction error: %s",
+        status.ToString().c_str());
     if (options_.paranoid_checks && bg_error_.ok()) {
       bg_error_ = status;
     }
diff --git a/db/db_impl.h b/db/db_impl.h
index 49f264225..c27dac849 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -335,7 +335,8 @@ class DBImpl : public DB {
   static void BGWorkFlush(void* db);
   void BackgroundCallCompaction();
   void BackgroundCallFlush();
-  Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state);
+  Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state,
+                              LogBuffer* log_buffer);
   Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state);
   void CleanupCompaction(CompactionState* compact, Status status);
   Status DoCompactionWork(CompactionState* compact,
diff --git a/db/version_set.cc b/db/version_set.cc
index 4153c6e11..d47b043d2 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -2165,8 +2165,8 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
   }
 }
 
-Compaction* VersionSet::PickCompaction() {
-  return compaction_picker_->PickCompaction(current_);
+Compaction* VersionSet::PickCompaction(LogBuffer* log_buffer) {
+  return compaction_picker_->PickCompaction(current_, log_buffer);
 }
 
 Compaction* VersionSet::CompactRange(int input_level, int output_level,
diff --git a/db/version_set.h b/db/version_set.h
index 09b71abd4..a2c9ff039 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -365,7 +365,7 @@ class VersionSet {
   // Returns nullptr if there is no compaction to be done.
   // Otherwise returns a pointer to a heap-allocated object that
   // describes the compaction.  Caller should delete the result.
-  Compaction* PickCompaction();
+  Compaction* PickCompaction(LogBuffer* log_buffer);
 
   // Return a compaction object for compacting the range [begin,end] in
   // the specified level.  Returns nullptr if there is nothing in that
diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index 78fc86788..68ab1bb6a 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -575,6 +575,31 @@ class Logger {
 
   InfoLogLevel log_level_;
 };
+// A class to buffer info log entries and flush them in the end.
+class LogBuffer {
+ public:
+  // log_level: the log level for all the logs
+  // info_log:  logger to write the logs to
+  LogBuffer(const InfoLogLevel log_level, const shared_ptr<Logger>& info_log);
+  ~LogBuffer();
+
+  // Add a log entry to the buffer.
+  void AddLogToBuffer(const char* format, ...);
+
+  // Flush all buffered log to the info log.
+  void FlushBufferToLog() const;
+
+ private:
+  // No copying allowed: the destructor deletes rep_.
+  LogBuffer(const LogBuffer&);
+  void operator=(const LogBuffer&);
+
+  struct Rep;
+  Rep* rep_;
+  const InfoLogLevel log_level_;
+  // Owned copy: keeps the logger alive for as long as this buffer exists.
+  const shared_ptr<Logger> info_log_;
+};
 
 // Identifies a locked file.
 class FileLock {
@@ -587,6 +612,9 @@ class FileLock {
   void operator=(const FileLock&);
 };
 
+// Add log to the LogBuffer for a delayed info logging. It can be used when
+// we want to add some logs inside a mutex.
+extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...);
 
 extern void LogFlush(const shared_ptr<Logger>& info_log);
 
diff --git a/util/env.cc b/util/env.cc
index 1b812800c..9624c04ab 100644
--- a/util/env.cc
+++ b/util/env.cc
@@ -8,7 +8,11 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "rocksdb/env.h"
+
+#include <sys/time.h>
 #include "rocksdb/options.h"
+#include "util/arena.h"
+#include "util/autovector.h"
 
 namespace rocksdb {
 
@@ -27,9 +31,99 @@ WritableFile::~WritableFile() {
 Logger::~Logger() {
 }
 
+// One log entry with its timestamp
+struct BufferedLog {
+  struct timeval now_tv;  // Timestamp of the log
+  char message[1];        // Beginning of log message
+};
+
+struct LogBuffer::Rep {
+  Arena arena_;
+  autovector<BufferedLog*> logs_;
+};
+
+// Lazily initialize Rep to avoid allocations when new log is added.
+LogBuffer::LogBuffer(const InfoLogLevel log_level,
+                     const shared_ptr<Logger>& info_log)
+    : rep_(nullptr), log_level_(log_level), info_log_(info_log) {}
+
+LogBuffer::~LogBuffer() { delete rep_; }
+
+void LogBuffer::AddLogToBuffer(const char* format, ...) {
+  if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) {
+    // Skip the level because of its level.
+    return;
+  }
+  if (rep_ == nullptr) {
+    rep_ = new Rep();
+  }
+
+  const size_t kLogSizeLimit = 512;
+  char* alloc_mem = rep_->arena_.AllocateAligned(kLogSizeLimit);
+  BufferedLog* buffered_log = new (alloc_mem) BufferedLog();
+  char* p = buffered_log->message;
+  char* limit = alloc_mem + kLogSizeLimit - 1;
+
+  // store the time
+  gettimeofday(&(buffered_log->now_tv), nullptr);
+
+  // Print the message
+  if (p < limit) {
+    va_list ap;
+    va_start(ap, format);
+    const int written = vsnprintf(p, limit - p, format, ap);
+    va_end(ap);
+    // vsnprintf() returns the length the whole message would have had, not
+    // the number of bytes stored, so a truncated message must not advance p
+    // past the entry. A negative (error) result leaves the message empty.
+    if (written >= limit - p) {
+      p = limit - 1;
+    } else if (written > 0) {
+      p += written;
+    }
+  }
+
+  // Add '\0' to the end
+  *p = '\0';
+
+  rep_->logs_.push_back(buffered_log);
+}
+
+void LogBuffer::FlushBufferToLog() const {
+  if (rep_ != nullptr) {
+    for (BufferedLog* log : rep_->logs_) {
+      const time_t seconds = log->now_tv.tv_sec;
+      struct tm t;
+      localtime_r(&seconds, &t);
+      Log(log_level_, info_log_,
+          "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s",
+          t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min,
+          t.tm_sec, static_cast<int>(log->now_tv.tv_usec), log->message);
+    }
+  }
+}
+
 FileLock::~FileLock() {
 }
 
+void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) {
+  if (log_buffer != nullptr) {
+    // AddLogToBuffer() is variadic and cannot accept a va_list, so expand the
+    // arguments here and pass the finished text through "%s". 512 bytes is
+    // the size of a whole buffered entry, so nothing is cut short earlier
+    // than AddLogToBuffer() would cut it anyway.
+    char message[512];
+    va_list ap;
+    va_start(ap, format);
+    const int len = vsnprintf(message, sizeof(message), format, ap);
+    va_end(ap);
+    // On a formatting error the contents of message are unspecified.
+    if (len >= 0) {
+      log_buffer->AddLogToBuffer("%s", message);
+    }
+  }
+}
+
 void LogFlush(Logger *info_log) {
   if (info_log) {
     info_log->Flush();