From 4405f3a000f0efec23d49d193fff8a657401704d Mon Sep 17 00:00:00 2001
From: sdong
Date: Tue, 4 Mar 2014 18:10:14 -0800
Subject: [PATCH 1/3] Allow user to specify log level for info_log

Summary:
Currently, there is no easy way for the user to change the log level of
the info log. Add a parameter in Options to specify it, and make the
default level INFO. Also remove the [INFO] tag when the level is INFO,
to avoid causing a performance regression (adding the tag would mean
another mem-copy and string formatting for every pre-existing log call).

Test Plan:
make all check
manually check that the levels work as expected.

Reviewers: dhruba, yhchiang

Reviewed By: yhchiang

CC: dhruba, igor, i.am.jin.lei, ljin, haobo, leveldb

Differential Revision: https://reviews.facebook.net/D16563
---
 include/rocksdb/env.h     | 22 ++++++++++++++++------
 include/rocksdb/options.h |  3 +++
 util/auto_roll_logger.cc  |  8 ++++++--
 util/auto_roll_logger.h   |  2 +-
 util/env.cc               |  4 ++--
 util/options.cc           |  1 +
 6 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index 90461b822..78fc86788 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -513,7 +513,7 @@ class Directory {
   virtual Status Fsync() = 0;
 };
 
-enum InfoLogLevel {
+enum InfoLogLevel : unsigned char {
   DEBUG = 0,
   INFO,
   WARN,
@@ -526,7 +526,7 @@ enum InfoLogLevel {
 class Logger {
  public:
   enum { DO_NOT_SUPPORT_GET_LOG_FILE_SIZE = -1 };
-  explicit Logger(const InfoLogLevel log_level = InfoLogLevel::ERROR)
+  explicit Logger(const InfoLogLevel log_level = InfoLogLevel::INFO)
       : log_level_(log_level) {}
   virtual ~Logger();
 
@@ -543,10 +543,20 @@ class Logger {
     if (log_level < log_level_) {
       return;
     }
-    char new_format[500];
-    snprintf(new_format, sizeof(new_format) - 1, "[%s] %s",
-             kInfoLogLevelNames[log_level], format);
-    Logv(new_format, ap);
+
+    if (log_level == INFO) {
+      // Don't print a log level tag for INFO-level logs. This avoids an
+      // unexpected performance regression from adding the log level
+      // feature: all logs written before the feature existed are INFO
+      // level, and we don't want to add extra cost to them.
+      Logv(format, ap);
+    } else {
+      char new_format[500];
+      snprintf(new_format, sizeof(new_format) - 1, "[%s] %s",
+               kInfoLogLevelNames[log_level], format);
+      Logv(new_format, ap);
+    }
   }
   virtual size_t GetLogFileSize() const {
     return DO_NOT_SUPPORT_GET_LOG_FILE_SIZE;
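The Logv() change above is the core of this patch: a message below the
logger's configured level is dropped, an INFO message keeps its original
format string, and any other level gets a "[LEVEL] " prefix before
formatting. The following standalone sketch (not part of the patch; it
mirrors the enum and the name table the header already defines) shows the
same dispatch:

    #include <cstdarg>
    #include <cstdio>

    enum InfoLogLevel : unsigned char { DEBUG = 0, INFO, WARN, ERROR, FATAL };
    static const char* kInfoLogLevelNames[] = {"DEBUG", "INFO", "WARN",
                                               "ERROR", "FATAL"};

    // Same dispatch as Logger::Logv() above, writing to stdout instead.
    static void Logv(InfoLogLevel logger_level, InfoLogLevel msg_level,
                     const char* format, ...) {
      if (msg_level < logger_level) {
        return;  // Filtered: message is below the logger's level.
      }
      va_list ap;
      va_start(ap, format);
      if (msg_level == INFO) {
        vprintf(format, ap);  // No prefix, matching pre-patch output.
      } else {
        char new_format[500];
        snprintf(new_format, sizeof(new_format) - 1, "[%s] %s",
                 kInfoLogLevelNames[msg_level], format);
        vprintf(new_format, ap);
      }
      va_end(ap);
    }

    int main() {
      Logv(INFO, DEBUG, "dropped\n");             // below INFO: no output
      Logv(INFO, INFO, "put key %d\n", 42);       // "put key 42"
      Logv(INFO, WARN, "stall for %d ms\n", 10);  // "[WARN] stall for 10 ms"
      return 0;
    }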
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index ae88268c7..53ea97b14 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -24,6 +24,7 @@ class CompactionFilter;
 class CompactionFilterFactory;
 class Comparator;
 class Env;
+enum InfoLogLevel : unsigned char;
 class FilterPolicy;
 class Logger;
 class MergeOperator;
@@ -152,6 +153,8 @@ struct Options {
   // Default: nullptr
   shared_ptr<Logger> info_log;
 
+  InfoLogLevel info_log_level;
+
   // -------------------
   // Parameters that affect performance
 
diff --git a/util/auto_roll_logger.cc b/util/auto_roll_logger.cc
index 95f2fae0d..0a00a586e 100644
--- a/util/auto_roll_logger.cc
+++ b/util/auto_roll_logger.cc
@@ -88,7 +88,7 @@ Status CreateLoggerFromOptions(
     AutoRollLogger* result = new AutoRollLogger(
         env, dbname, db_log_dir,
         options.max_log_file_size,
-        options.log_file_time_to_roll);
+        options.log_file_time_to_roll, options.info_log_level);
     Status s = result->GetStatus();
     if (!s.ok()) {
       delete result;
@@ -101,7 +101,11 @@ Status CreateLoggerFromOptions(
     env->CreateDir(dbname);  // In case it does not exist
     env->RenameFile(fname, OldInfoLogFileName(dbname, env->NowMicros(),
                                               db_absolute_path, db_log_dir));
-    return env->NewLogger(fname, logger);
+    auto s = env->NewLogger(fname, logger);
+    if (logger->get() != nullptr) {
+      (*logger)->SetInfoLogLevel(options.info_log_level);
+    }
+    return s;
   }
 }
diff --git a/util/auto_roll_logger.h b/util/auto_roll_logger.h
index 882653688..5fd7a1472 100644
--- a/util/auto_roll_logger.h
+++ b/util/auto_roll_logger.h
@@ -19,7 +19,7 @@ class AutoRollLogger : public Logger {
   AutoRollLogger(Env* env, const std::string& dbname,
                  const std::string& db_log_dir, size_t log_max_size,
                  size_t log_file_time_to_roll,
-                 const InfoLogLevel log_level = InfoLogLevel::ERROR)
+                 const InfoLogLevel log_level = InfoLogLevel::INFO)
       : Logger(log_level),
         dbname_(dbname),
         db_log_dir_(db_log_dir),
diff --git a/util/env.cc b/util/env.cc
index f02dd4470..1b812800c 100644
--- a/util/env.cc
+++ b/util/env.cc
@@ -40,7 +40,7 @@ void Log(Logger* info_log, const char* format, ...) {
   if (info_log) {
     va_list ap;
     va_start(ap, format);
-    info_log->Logv(format, ap);
+    info_log->Logv(InfoLogLevel::INFO, format, ap);
     va_end(ap);
   }
 }
@@ -163,7 +163,7 @@ void Log(const shared_ptr<Logger>& info_log, const char* format, ...) {
   if (info_log) {
     va_list ap;
     va_start(ap, format);
-    info_log->Logv(format, ap);
+    info_log->Logv(InfoLogLevel::INFO, format, ap);
     va_end(ap);
   }
 }
diff --git a/util/options.cc b/util/options.cc
index bfafe81de..22e273a8c 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -38,6 +38,7 @@ Options::Options()
       paranoid_checks(false),
       env(Env::Default()),
       info_log(nullptr),
+      info_log_level(INFO),
       write_buffer_size(4<<20),
       max_write_buffer_number(2),
       min_write_buffer_number_to_merge(1),
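With this patch applied, a caller raises the logging threshold through the
new Options field. A hedged usage sketch (the database path and the choice
of WARN are arbitrary; Options and DB::Open() are the standard public API):

    #include <cassert>

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      // Only WARN and above reach the info log; INFO and DEBUG are dropped.
      options.info_log_level = rocksdb::WARN;

      rocksdb::DB* db = nullptr;
      rocksdb::Status s =
          rocksdb::DB::Open(options, "/tmp/info_log_level_demo", &db);
      assert(s.ok());
      delete db;  // Close the DB.
      return 0;
    }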
From ecb1ffa2a80accd6c396c93930e0249dd76903f4 Mon Sep 17 00:00:00 2001
From: sdong
Date: Tue, 4 Mar 2014 14:32:55 -0800
Subject: [PATCH 2/3] Buffer info logs when picking compactions and write them
 out after releasing the mutex

Summary:
Currently, while the background thread is picking compactions, it writes
out multiple info logs, especially for universal compaction. This creates
a risk of blocking on log writes while holding the DB mutex, which is bad.
To remove this risk, write all those info logs to a buffer and flush the
buffer after releasing the mutex.

Test Plan:
make all check
check the log lines while running some tests that trigger compactions.

Reviewers: haobo, igor, dhruba

Reviewed By: dhruba

CC: i.am.jin.lei, dhruba, yhchiang, leveldb, nkg-

Differential Revision: https://reviews.facebook.net/D16515
---
 db/compaction_picker.cc | 106 +++++++++++++++++------------------
 db/compaction_picker.h  |  16 ++++--
 db/db_impl.cc           | 116 +++++++++++++++++++++-------------------
 db/db_impl.h            |   3 +-
 db/version_set.cc       |   4 +-
 db/version_set.h        |   2 +-
 include/rocksdb/env.h   |  23 ++++++++
 util/env.cc             |  77 ++++++++++++++++++++++++++
 8 files changed, 227 insertions(+), 120 deletions(-)

diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index d84027f0a..255d0cf84 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -367,7 +367,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
   return c;
 }
 
-Compaction* LevelCompactionPicker::PickCompaction(Version* version) {
+Compaction* LevelCompactionPicker::PickCompaction(Version* version,
+                                                  LogBuffer* log_buffer) {
   Compaction* c = nullptr;
   int level = -1;
 
@@ -544,32 +545,34 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
 // Universal style of compaction. Pick files that are contiguous in
 // time-range to compact.
 //
-Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
+Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
+                                                      LogBuffer* log_buffer) {
   int level = 0;
   double score = version->compaction_score_[0];
 
   if ((version->files_[level].size() <
       (unsigned int)options_->level0_file_num_compaction_trigger)) {
-    Log(options_->info_log, "Universal: nothing to do\n");
+    LogToBuffer(log_buffer, "Universal: nothing to do\n");
     return nullptr;
   }
   Version::FileSummaryStorage tmp;
-  Log(options_->info_log, "Universal: candidate files(%zu): %s\n",
-      version->files_[level].size(),
-      version->LevelFileSummary(&tmp, 0));
+  LogToBuffer(log_buffer, "Universal: candidate files(%zu): %s\n",
+              version->files_[level].size(),
+              version->LevelFileSummary(&tmp, 0));
 
   // Check for size amplification first.
   Compaction* c;
-  if ((c = PickCompactionUniversalSizeAmp(version, score)) != nullptr) {
-    Log(options_->info_log, "Universal: compacting for size amp\n");
+  if ((c = PickCompactionUniversalSizeAmp(version, score, log_buffer)) !=
+      nullptr) {
+    LogToBuffer(log_buffer, "Universal: compacting for size amp\n");
   } else {
-
     // Size amplification is within limits. Try reducing read
     // amplification while maintaining file size ratios.
     unsigned int ratio = options_->compaction_options_universal.size_ratio;
 
-    if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX)) != nullptr) {
-      Log(options_->info_log, "Universal: compacting for size ratio\n");
+    if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX,
+                                            log_buffer)) != nullptr) {
+      LogToBuffer(log_buffer, "Universal: compacting for size ratio\n");
     } else {
       // Size amplification and file size ratios are within configured limits.
       // If max read amplification is exceeding configured limits, then force
       // compaction without looking at filesize ratios and try to reduce
       // the number of files to fewer than level0_file_num_compaction_trigger.
       unsigned int num_files = version->files_[level].size() -
           options_->level0_file_num_compaction_trigger;
-      if ((c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files)) != nullptr) {
+      if ((c = PickCompactionUniversalReadAmp(
+               version, score, UINT_MAX, num_files, log_buffer)) != nullptr) {
         Log(options_->info_log, "Universal: compacting for file num\n");
       }
     }
@@ -631,7 +635,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
 //
 Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
     Version* version, double score, unsigned int ratio,
-    unsigned int max_number_of_files_to_compact) {
+    unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) {
   int level = 0;
 
   unsigned int min_merge_width =
@@ -666,9 +670,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
         candidate_count = 1;
         break;
       }
-      Log(options_->info_log,
-          "Universal: file %lu[%d] being compacted, skipping",
-          (unsigned long)f->number, loop);
+      LogToBuffer(log_buffer,
+                  "Universal: file %lu[%d] being compacted, skipping",
+                  (unsigned long)f->number, loop);
       f = nullptr;
     }
 
@@ -676,8 +680,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
     // first candidate to be compacted.
     uint64_t candidate_size = f != nullptr? f->file_size : 0;
     if (f != nullptr) {
-      Log(options_->info_log, "Universal: Possible candidate file %lu[%d].",
-          (unsigned long)f->number, loop);
+      LogToBuffer(log_buffer, "Universal: Possible candidate file %lu[%d].",
+                  (unsigned long)f->number, loop);
     }
 
     // Check if the succeeding files need compaction.
@@ -727,12 +731,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
            i < loop + candidate_count && i < file_by_time.size(); i++) {
         int index = file_by_time[i];
         FileMetaData* f = version->files_[level][index];
-        Log(options_->info_log,
-            "Universal: Skipping file %lu[%d] with size %lu %d\n",
-            (unsigned long)f->number,
-            i,
-            (unsigned long)f->file_size,
-            f->being_compacted);
+        LogToBuffer(log_buffer,
+                    "Universal: Skipping file %lu[%d] with size %lu %d\n",
+                    (unsigned long)f->number, i, (unsigned long)f->file_size,
+                    f->being_compacted);
       }
     }
   }
@@ -766,10 +768,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
     int index = file_by_time[i];
     FileMetaData* f = c->input_version_->files_[level][index];
     c->inputs_[0].push_back(f);
-    Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n",
-        (unsigned long)f->number,
-        i,
-        (unsigned long)f->file_size);
+    LogToBuffer(log_buffer, "Universal: Picking file %lu[%d] with size %lu\n",
+                (unsigned long)f->number, i, (unsigned long)f->file_size);
   }
   return c;
 }
@@ -781,7 +781,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
 // min_merge_width and max_merge_width).
 //
 Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
-    Version* version, double score) {
+    Version* version, double score, LogBuffer* log_buffer) {
   int level = 0;
 
   // percentage flexibility while reducing size amplification
@@ -805,20 +805,17 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
       start_index = loop;         // Consider this as the first candidate.
       break;
     }
-    Log(options_->info_log, "Universal: skipping file %lu[%d] compacted %s",
-        (unsigned long)f->number,
-        loop,
-        " cannot be a candidate to reduce size amp.\n");
+    LogToBuffer(log_buffer, "Universal: skipping file %lu[%d] compacted %s",
+                (unsigned long)f->number, loop,
+                " cannot be a candidate to reduce size amp.\n");
     f = nullptr;
   }
   if (f == nullptr) {
     return nullptr;  // no candidate files
   }
 
-  Log(options_->info_log, "Universal: First candidate file %lu[%d] %s",
-      (unsigned long)f->number,
-      start_index,
-      " to reduce size amp.\n");
+  LogToBuffer(log_buffer, "Universal: First candidate file %lu[%d] %s",
+              (unsigned long)f->number, start_index, " to reduce size amp.\n");
 
   // keep adding up all the remaining files
   for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
        loop++) {
     int index = file_by_time[loop];
     f = version->files_[level][index];
     if (f->being_compacted) {
-      Log(options_->info_log,
-          "Universal: Possible candidate file %lu[%d] %s.",
-          (unsigned long)f->number,
-          loop,
+      LogToBuffer(
+          log_buffer, "Universal: Possible candidate file %lu[%d] %s.",
+          (unsigned long)f->number, loop,
           " is already being compacted. No size amp reduction possible.\n");
       return nullptr;
     }
@@ -846,18 +842,18 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
 
   // size amplification = percentage of additional size
   if (candidate_size * 100 < ratio * earliest_file_size) {
-    Log(options_->info_log,
-        "Universal: size amp not needed. newer-files-total-size %lu "
-        "earliest-file-size %lu",
-        (unsigned long)candidate_size,
-        (unsigned long)earliest_file_size);
+    LogToBuffer(log_buffer,
+                "Universal: size amp not needed. newer-files-total-size %lu "
+                "earliest-file-size %lu",
+                (unsigned long)candidate_size,
+                (unsigned long)earliest_file_size);
     return nullptr;
   } else {
-    Log(options_->info_log,
-        "Universal: size amp needed. newer-files-total-size %lu "
-        "earliest-file-size %lu",
-        (unsigned long)candidate_size,
-        (unsigned long)earliest_file_size);
+    LogToBuffer(log_buffer,
+                "Universal: size amp needed. newer-files-total-size %lu "
+                "earliest-file-size %lu",
+                (unsigned long)candidate_size,
+                (unsigned long)earliest_file_size);
   }
 
   assert(start_index >= 0 && start_index < file_by_time.size() - 1);
@@ -871,11 +867,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
     int index = file_by_time[loop];
     f = c->input_version_->files_[level][index];
     c->inputs_[0].push_back(f);
-    Log(options_->info_log,
-        "Universal: size amp picking file %lu[%d] with size %lu",
-        (unsigned long)f->number,
-        index,
-        (unsigned long)f->file_size);
+    LogToBuffer(log_buffer,
+                "Universal: size amp picking file %lu[%d] with size %lu",
+                (unsigned long)f->number, index, (unsigned long)f->file_size);
   }
   return c;
 }
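For the size-amp predicate in PickCompactionUniversalSizeAmp() above, a
small worked example may help: with a ratio (max size amplification
percent) of 25, newer files totaling 120 MB, and an earliest file of
400 MB, 120 * 100 = 12000 is not less than 25 * 400 = 10000, so newer data
amounts to 30% of the earliest file and a size-amp compaction is picked.
A sketch of the same arithmetic (all values illustrative):

    #include <cstdint>
    #include <cstdio>

    int main() {
      // 'ratio' plays the role of the configured size amplification limit.
      const uint64_t ratio = 25;
      const uint64_t earliest_file_size = uint64_t{400} << 20;  // 400 MB
      const uint64_t candidate_size = uint64_t{120} << 20;      // newer files

      // Same predicate as the code above.
      if (candidate_size * 100 < ratio * earliest_file_size) {
        std::printf("size amp not needed\n");
      } else {
        // 30% additional size exceeds the 25% limit.
        std::printf("size amp needed\n");
      }
      return 0;
    }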
diff --git a/db/compaction_picker.h b/db/compaction_picker.h
index ee77cc4c7..4793b1ba4 100644
--- a/db/compaction_picker.h
+++ b/db/compaction_picker.h
@@ -19,6 +19,7 @@
 
 namespace rocksdb {
 
+class LogBuffer;
 class Compaction;
 class Version;
 
@@ -31,7 +32,8 @@ class CompactionPicker {
   // Returns nullptr if there is no compaction to be done.
   // Otherwise returns a pointer to a heap-allocated object that
   // describes the compaction. Caller should delete the result.
-  virtual Compaction* PickCompaction(Version* version) = 0;
+  virtual Compaction* PickCompaction(Version* version,
+                                     LogBuffer* log_buffer) = 0;
 
   // Return a compaction object for compacting the range [begin,end] in
   // the specified level. Returns nullptr if there is nothing in that
@@ -127,16 +129,19 @@ class UniversalCompactionPicker : public CompactionPicker {
   UniversalCompactionPicker(const Options* options,
                             const InternalKeyComparator* icmp)
       : CompactionPicker(options, icmp) {}
-  virtual Compaction* PickCompaction(Version* version) override;
+  virtual Compaction* PickCompaction(Version* version,
+                                     LogBuffer* log_buffer) override;
 
  private:
   // Pick Universal compaction to limit read amplification
   Compaction* PickCompactionUniversalReadAmp(Version* version, double score,
                                              unsigned int ratio,
-                                             unsigned int num_files);
+                                             unsigned int num_files,
+                                             LogBuffer* log_buffer);
 
   // Pick Universal compaction to limit space amplification.
-  Compaction* PickCompactionUniversalSizeAmp(Version* version, double score);
+  Compaction* PickCompactionUniversalSizeAmp(Version* version, double score,
+                                             LogBuffer* log_buffer);
 };
 
 class LevelCompactionPicker : public CompactionPicker {
@@ -144,7 +149,8 @@ class LevelCompactionPicker : public CompactionPicker {
   LevelCompactionPicker(const Options* options,
                         const InternalKeyComparator* icmp)
       : CompactionPicker(options, icmp) {}
-  virtual Compaction* PickCompaction(Version* version) override;
+  virtual Compaction* PickCompaction(Version* version,
+                                     LogBuffer* log_buffer) override;
 
  private:
   // For the specified level, pick a compaction.
diff --git a/db/db_impl.cc b/db/db_impl.cc
index f9abd6073..d05424410 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1924,57 +1924,61 @@ void DBImpl::BackgroundCallCompaction() {
   DeletionState deletion_state(true);
   MaybeDumpStats();
+  LogBuffer log_buffer(INFO, options_.info_log);
+  {
+    MutexLock l(&mutex_);
+    // Log(options_.info_log, "XXX BG Thread %llx process new work item",
+    //     pthread_self());
+    assert(bg_compaction_scheduled_);
+    Status s;
+    if (!shutting_down_.Acquire_Load()) {
+      s = BackgroundCompaction(&madeProgress, deletion_state, &log_buffer);
+      if (!s.ok()) {
+        // Wait a little bit before retrying background compaction in
+        // case this is an environmental problem and we do not want to
+        // chew up resources for failed compactions for the duration of
+        // the problem.
+        bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
+        mutex_.Unlock();
+        Log(options_.info_log, "Waiting after background compaction error: %s",
+            s.ToString().c_str());
+        LogFlush(options_.info_log);
+        env_->SleepForMicroseconds(1000000);
+        mutex_.Lock();
+      }
+    }
 
-  MutexLock l(&mutex_);
-  // Log(options_.info_log, "XXX BG Thread %llx process new work item", pthread_self());
-  assert(bg_compaction_scheduled_);
-  Status s;
-  if (!shutting_down_.Acquire_Load()) {
-    s = BackgroundCompaction(&madeProgress, deletion_state);
-    if (!s.ok()) {
-      // Wait a little bit before retrying background compaction in
-      // case this is an environmental problem and we do not want to
-      // chew up resources for failed compactions for the duration of
-      // the problem.
-      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
-      Log(options_.info_log, "Waiting after background compaction error: %s",
-          s.ToString().c_str());
+    // If !s.ok(), this means that Compaction failed. In that case, we want
+    // to delete all obsolete files we might have created and we force
+    // FindObsoleteFiles(). This is because deletion_state does not catch
+    // all created files if compaction failed.
+    FindObsoleteFiles(deletion_state, !s.ok());
+
+    // delete unnecessary files if any, this is done outside the mutex
+    if (deletion_state.HaveSomethingToDelete()) {
       mutex_.Unlock();
-      LogFlush(options_.info_log);
-      env_->SleepForMicroseconds(1000000);
+      PurgeObsoleteFiles(deletion_state);
       mutex_.Lock();
     }
-  }
 
-  // If !s.ok(), this means that Compaction failed. In that case, we want
-  // to delete all obsolete files we might have created and we force
-  // FindObsoleteFiles(). This is because deletion_state does not catch
-  // all created files if compaction failed.
-  FindObsoleteFiles(deletion_state, !s.ok());
-
-  // delete unnecessary files if any, this is done outside the mutex
-  if (deletion_state.HaveSomethingToDelete()) {
-    mutex_.Unlock();
-    PurgeObsoleteFiles(deletion_state);
-    mutex_.Lock();
-  }
+    bg_compaction_scheduled_--;
 
-  bg_compaction_scheduled_--;
-
-  MaybeScheduleLogDBDeployStats();
+    MaybeScheduleLogDBDeployStats();
 
-  // Previous compaction may have produced too many files in a level,
-  // So reschedule another compaction if we made progress in the
-  // last compaction.
-  if (madeProgress) {
-    MaybeScheduleFlushOrCompaction();
+    // Previous compaction may have produced too many files in a level,
+    // So reschedule another compaction if we made progress in the
+    // last compaction.
+    if (madeProgress) {
+      MaybeScheduleFlushOrCompaction();
+    }
+    bg_cv_.SignalAll();
   }
-  bg_cv_.SignalAll();
-
+  log_buffer.FlushBufferToLog();
 }
 
 Status DBImpl::BackgroundCompaction(bool* madeProgress,
-                                    DeletionState& deletion_state) {
+                                    DeletionState& deletion_state,
+                                    LogBuffer* log_buffer) {
   *madeProgress = false;
   mutex_.AssertHeld();
 
@@ -1987,10 +1991,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
 
   // TODO: remove memtable flush from formal compaction
   while (imm_.IsFlushPending()) {
-    Log(options_.info_log,
-        "BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots "
-        "available %d",
-        options_.max_background_compactions - bg_compaction_scheduled_);
+    LogToBuffer(log_buffer,
+                "BackgroundCompaction doing FlushMemTableToOutputFile, "
+                "compaction slots "
+                "available %d",
+                options_.max_background_compactions - bg_compaction_scheduled_);
     Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state);
     if (!stat.ok()) {
       if (is_manual) {
@@ -2014,24 +2019,24 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
     if (!c) {
       m->done = true;
     }
-    Log(options_.info_log,
+    LogToBuffer(
+        log_buffer,
         "Manual compaction from level-%d to level-%d from %s .. %s; will stop "
         "at %s\n",
-        m->input_level,
-        m->output_level,
+        m->input_level, m->output_level,
         (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
         (m->end ? m->end->DebugString().c_str() : "(end)"),
"(end)" : manual_end->DebugString().c_str())); } else if (!options_.disable_auto_compactions) { - c.reset(versions_->PickCompaction()); + c.reset(versions_->PickCompaction(log_buffer)); } Status status; if (!c) { // Nothing to do - Log(options_.info_log, "Compaction nothing to do"); + LogToBuffer(log_buffer, "Compaction nothing to do"); } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level assert(c->num_input_files(0) == 1); @@ -2043,10 +2048,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get()); InstallSuperVersion(deletion_state); Version::LevelSummaryStorage tmp; - Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", - static_cast(f->number), c->level() + 1, - static_cast(f->file_size), - status.ToString().c_str(), versions_->current()->LevelSummary(&tmp)); + LogToBuffer(log_buffer, "Moved #%lld to level-%d %lld bytes %s: %s\n", + static_cast(f->number), c->level() + 1, + static_cast(f->file_size), + status.ToString().c_str(), + versions_->current()->LevelSummary(&tmp)); versions_->ReleaseCompactionFiles(c.get(), status); *madeProgress = true; } else { @@ -2065,8 +2071,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } else if (shutting_down_.Acquire_Load()) { // Ignore compaction errors found during shutting down } else { - Log(options_.info_log, - "Compaction error: %s", status.ToString().c_str()); + Log(WARN, options_.info_log, "Compaction error: %s", + status.ToString().c_str()); if (options_.paranoid_checks && bg_error_.ok()) { bg_error_ = status; } diff --git a/db/db_impl.h b/db/db_impl.h index 49f264225..c27dac849 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -335,7 +335,8 @@ class DBImpl : public DB { static void BGWorkFlush(void* db); void BackgroundCallCompaction(); void BackgroundCallFlush(); - Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state); + Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state, + LogBuffer* log_buffer); Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state); void CleanupCompaction(CompactionState* compact, Status status); Status DoCompactionWork(CompactionState* compact, diff --git a/db/version_set.cc b/db/version_set.cc index 4153c6e11..d47b043d2 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2165,8 +2165,8 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { } } -Compaction* VersionSet::PickCompaction() { - return compaction_picker_->PickCompaction(current_); +Compaction* VersionSet::PickCompaction(LogBuffer* log_buffer) { + return compaction_picker_->PickCompaction(current_, log_buffer); } Compaction* VersionSet::CompactRange(int input_level, int output_level, diff --git a/db/version_set.h b/db/version_set.h index 09b71abd4..a2c9ff039 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -365,7 +365,7 @@ class VersionSet { // Returns nullptr if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that // describes the compaction. Caller should delete the result. - Compaction* PickCompaction(); + Compaction* PickCompaction(LogBuffer* log_buffer); // Return a compaction object for compacting the range [begin,end] in // the specified level. 
diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index 78fc86788..68ab1bb6a 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -575,6 +575,26 @@ class Logger {
   InfoLogLevel log_level_;
 };
 
+// A class to buffer info log entries and flush them in the end.
+class LogBuffer {
+ public:
+  // log_level: the log level for all the logs
+  // info_log: logger to write the logs to
+  LogBuffer(const InfoLogLevel log_level, const shared_ptr<Logger>& info_log);
+  ~LogBuffer();
+
+  // Add a log entry to the buffer; ap holds the format's arguments.
+  void AddLogToBuffer(const char* format, va_list ap);
+
+  // Flush all buffered log to the info log.
+  void FlushBufferToLog() const;
+
+ private:
+  struct Rep;
+  Rep* rep_;
+  const InfoLogLevel log_level_;
+  const shared_ptr<Logger>& info_log_;
+};
 
 // Identifies a locked file.
 class FileLock {
@@ -587,6 +607,9 @@ class FileLock {
   void operator=(const FileLock&);
 };
 
+// Add log to the LogBuffer for delayed info logging. It can be used when
+// we want to add some logs inside a mutex.
+extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...);
 
 extern void LogFlush(const shared_ptr<Logger>& info_log);
 
diff --git a/util/env.cc b/util/env.cc
index 1b812800c..9624c04ab 100644
--- a/util/env.cc
+++ b/util/env.cc
@@ -8,7 +8,11 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "rocksdb/env.h"
+
+#include <sys/time.h>
 #include "rocksdb/options.h"
+#include "util/arena.h"
+#include "util/autovector.h"
 
 namespace rocksdb {
 
@@ -27,9 +31,82 @@ WritableFile::~WritableFile() {
 Logger::~Logger() {
 }
 
+// One log entry with its timestamp
+struct BufferedLog {
+  struct timeval now_tv;  // Timestamp of the log
+  char message[1];        // Beginning of log message
+};
+
+struct LogBuffer::Rep {
+  Arena arena_;
+  autovector<BufferedLog*> logs_;
+};
+
+// Lazily initialize Rep to avoid allocations when no log is added.
+LogBuffer::LogBuffer(const InfoLogLevel log_level,
+                     const shared_ptr<Logger>& info_log)
+    : rep_(nullptr), log_level_(log_level), info_log_(info_log) {}
+
+LogBuffer::~LogBuffer() { delete rep_; }
+
+void LogBuffer::AddLogToBuffer(const char* format, va_list ap) {
+  if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) {
+    // Skip this log: its level is below the logger's log level.
+    return;
+  }
+  if (rep_ == nullptr) {
+    rep_ = new Rep();
+  }
+
+  const size_t kLogSizeLimit = 512;
+  char* alloc_mem = rep_->arena_.AllocateAligned(kLogSizeLimit);
+  BufferedLog* buffered_log = new (alloc_mem) BufferedLog();
+  char* p = buffered_log->message;
+  char* limit = alloc_mem + kLogSizeLimit - 1;
+
+  // store the time
+  gettimeofday(&(buffered_log->now_tv), nullptr);
+
+  // Print the message
+  if (p < limit) {
+    p += vsnprintf(p, limit - p, format, ap);
+  }
+
+  // vsnprintf returns the full untruncated length, so clamp p before
+  // writing the terminator.
+  if (p > limit) {
+    p = limit;
+  }
+
+  // Add '\0' to the end
+  *p = '\0';
+
+  rep_->logs_.push_back(buffered_log);
+}
+
+void LogBuffer::FlushBufferToLog() const {
+  if (rep_ != nullptr) {
+    for (BufferedLog* log : rep_->logs_) {
+      const time_t seconds = log->now_tv.tv_sec;
+      struct tm t;
+      localtime_r(&seconds, &t);
+      Log(log_level_, info_log_,
+          "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s",
+          t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min,
+          t.tm_sec, static_cast<int>(log->now_tv.tv_usec), log->message);
+    }
+  }
+}
+
 FileLock::~FileLock() {
 }
 
+void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) {
+  if (log_buffer != nullptr) {
+    va_list ap;
+    va_start(ap, format);
+    log_buffer->AddLogToBuffer(format, ap);
+    va_end(ap);
+  }
+}
+
 void LogFlush(Logger *info_log) {
   if (info_log) {
     info_log->Flush();
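The shape this patch establishes in DBImpl::BackgroundCallCompaction() is:
append printf-style messages to a LogBuffer while holding the DB mutex,
then flush once the mutex is released, so log file I/O never happens
inside the critical section. Each flushed line carries the
"(Original Log Time ...)" prefix built from the gettimeofday() timestamp
captured at buffering time. A minimal sketch of that pattern (std::mutex
stands in for the port::Mutex DBImpl uses; the message is made up):

    #include <memory>
    #include <mutex>

    #include "rocksdb/env.h"

    void PickWorkUnderMutex(std::mutex* mu,
                            const std::shared_ptr<rocksdb::Logger>& info_log) {
      rocksdb::LogBuffer log_buffer(rocksdb::INFO, info_log);
      {
        std::lock_guard<std::mutex> guard(*mu);
        // ... decisions that must happen while holding the mutex ...
        rocksdb::LogToBuffer(&log_buffer, "picked %d files for compaction", 4);
      }
      // The write to the info log happens outside the critical section.
      log_buffer.FlushBufferToLog();
    }

    int main() {
      std::mutex mu;
      std::shared_ptr<rocksdb::Logger> info_log;  // null: lines are dropped
      PickWorkUnderMutex(&mu, info_log);
      return 0;
    }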
From abeee9f2cb6948ab95cc3645d613cc498ac3ff70 Mon Sep 17 00:00:00 2001
From: Kai Liu
Date: Wed, 5 Mar 2014 14:28:47 -0800
Subject: [PATCH 3/3] Make sure GetUniqueID related tests run on "regular"
 storage

Summary:
With the use of tmpfs or ramfs, unit tests related to GetUniqueID()
failed because of the failure from ioctl, which doesn't work with these
fancy file systems at all.

I fixed this issue and made sure all related tests run on "regular"
storage (disk or flash).

Test Plan: TEST_TMPDIR=/dev/shm make check -j32

Reviewers: igor, dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16593
---
 util/env_test.cc | 27 ++++++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git a/util/env_test.cc b/util/env_test.cc
index 828b49a0b..eb2829303 100644
--- a/util/env_test.cc
+++ b/util/env_test.cc
@@ -7,6 +7,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
+#include <sys/types.h>
 #include <iostream>
 #include <unordered_set>
 
@@ -191,6 +192,15 @@ bool IsSingleVarint(const std::string& s) {
 }
 
 #ifdef OS_LINUX
+// To make sure the Env::GetUniqueId() related tests work correctly, the files
+// should be stored on regular storage like "hard disk" or "flash device".
+// Otherwise we cannot get the correct id.
+//
+// The following function acts as a replacement of test::TmpDir(), which may
+// be customized by the user to be on a storage that doesn't work with
+// GetUniqueId().
+//
+// TODO(kailiu) This function still assumes /tmp/ resides on regular
+// storage.
 bool IsUniqueIDValid(const std::string& s) {
   return !s.empty() && !IsSingleVarint(s);
 }
 
 const size_t MAX_ID_SIZE = 100;
 char temp_id[MAX_ID_SIZE];
 
+std::string GetOnDiskTestDir() {
+  char base[100];
+  snprintf(base, sizeof(base), "/tmp/rocksdbtest-%d",
+           static_cast<int>(geteuid()));
+  // Directory may already exist
+  Env::Default()->CreateDirIfMissing(base);
+
+  return base;
+}
+
 // Only works in linux platforms
 TEST(EnvPosixTest, RandomAccessUniqueID) {
   // Create file.
   const EnvOptions soptions;
-  std::string fname = test::TmpDir() + "/" + "testfile";
+  std::string fname = GetOnDiskTestDir() + "/" + "testfile";
   unique_ptr<WritableFile> wfile;
   ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
 
@@ -261,7 +281,7 @@ TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) {
   // Create the files
   std::vector<std::string> fnames;
   for (int i = 0; i < 1000; ++i) {
-    fnames.push_back(test::TmpDir() + "/" + "testfile" + std::to_string(i));
+    fnames.push_back(GetOnDiskTestDir() + "/" + "testfile" +
+                     std::to_string(i));
 
     // Create file.
     unique_ptr<WritableFile> wfile;
@@ -294,7 +314,8 @@ TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) {
 // Only works in linux platforms
 TEST(EnvPosixTest, RandomAccessUniqueIDDeletes) {
   const EnvOptions soptions;
-  std::string fname = test::TmpDir() + "/" + "testfile";
+
+  std::string fname = GetOnDiskTestDir() + "/" + "testfile";
 
   // Check that after file is deleted we don't get same ID again in a new file.
   std::unordered_set<std::string> ids;
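The TODO in GetOnDiskTestDir() still stands: /tmp itself may be tmpfs on
some machines. One possible follow-up, sketched here rather than taken
from the patch, is to detect memory-backed filesystems with statfs(2);
the magic constants below are the TMPFS_MAGIC and RAMFS_MAGIC values from
linux/magic.h:

    #include <sys/vfs.h>

    #include <cstdio>

    // Values from linux/magic.h.
    static const unsigned long kTmpfsMagic = 0x01021994UL;
    static const unsigned long kRamfsMagic = 0x858458f6UL;

    // Returns true if 'path' is on tmpfs or ramfs, where ioctl-based
    // unique IDs are unavailable.
    static bool IsMemoryBackedFs(const char* path) {
      struct statfs buf;
      if (statfs(path, &buf) != 0) {
        return false;  // Cannot tell; assume regular storage.
      }
      return static_cast<unsigned long>(buf.f_type) == kTmpfsMagic ||
             static_cast<unsigned long>(buf.f_type) == kRamfsMagic;
    }

    int main() {
      std::printf("/tmp memory-backed: %s\n",
                  IsMemoryBackedFs("/tmp") ? "yes" : "no");
      return 0;
    }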