diff --git a/db/column_family.cc b/db/column_family.cc index 8d6c7d831..0aee751ca 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -270,8 +270,8 @@ void ColumnFamilyData::CreateNewMemtable() { mem_->Ref(); } -Compaction* ColumnFamilyData::PickCompaction() { - return compaction_picker_->PickCompaction(current_); +Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) { + return compaction_picker_->PickCompaction(current_, log_buffer); } Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level, diff --git a/db/column_family.h b/db/column_family.h index 8ee96f8c4..560e3dabd 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -147,7 +147,7 @@ class ColumnFamilyData { TableCache* table_cache() const { return table_cache_.get(); } // See documentation in compaction_picker.h - Compaction* PickCompaction(); + Compaction* PickCompaction(LogBuffer* log_buffer); Compaction* CompactRange(int input_level, int output_level, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end); diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 47cbc14aa..6308b9731 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -364,7 +364,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, return c; } -Compaction* LevelCompactionPicker::PickCompaction(Version* version) { +Compaction* LevelCompactionPicker::PickCompaction(Version* version, + LogBuffer* log_buffer) { Compaction* c = nullptr; int level = -1; @@ -541,31 +542,34 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, // Universal style of compaction. Pick files that are contiguous in // time-range to compact. // -Compaction* UniversalCompactionPicker::PickCompaction(Version* version) { +Compaction* UniversalCompactionPicker::PickCompaction(Version* version, + LogBuffer* log_buffer) { int level = 0; double score = version->compaction_score_[0]; if ((version->files_[level].size() < (unsigned int)options_->level0_file_num_compaction_trigger)) { - Log(logger_, "Universal: nothing to do\n"); + LogToBuffer(log_buffer, "Universal: nothing to do\n"); return nullptr; } Version::FileSummaryStorage tmp; - Log(logger_, "Universal: candidate files(%zu): %s\n", - version->files_[level].size(), version->LevelFileSummary(&tmp, 0)); + LogToBuffer(log_buffer, "Universal: candidate files(%zu): %s\n", + version->files_[level].size(), + version->LevelFileSummary(&tmp, 0)); // Check for size amplification first. Compaction* c; - if ((c = PickCompactionUniversalSizeAmp(version, score)) != nullptr) { - Log(logger_, "Universal: compacting for size amp\n"); + if ((c = PickCompactionUniversalSizeAmp(version, score, log_buffer)) != + nullptr) { + LogToBuffer(log_buffer, "Universal: compacting for size amp\n"); } else { - // Size amplification is within limits. Try reducing read // amplification while maintaining file size ratios. unsigned int ratio = options_->compaction_options_universal.size_ratio; - if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX)) != nullptr) { - Log(logger_, "Universal: compacting for size ratio\n"); + if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX, + log_buffer)) != nullptr) { + LogToBuffer(log_buffer, "Universal: compacting for size ratio\n"); } else { // Size amplification and file size ratios are within configured limits. // If max read amplification is exceeding configured limits, then force @@ -573,8 +577,9 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) { // the number of files to fewer than level0_file_num_compaction_trigger. unsigned int num_files = version->files_[level].size() - options_->level0_file_num_compaction_trigger; - if ((c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files)) != nullptr) { - Log(logger_, "Universal: compacting for file num\n"); + if ((c = PickCompactionUniversalReadAmp( + version, score, UINT_MAX, num_files, log_buffer)) != nullptr) { + LogToBuffer(log_buffer, "Universal: compacting for file num\n"); } } } @@ -623,7 +628,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) { // Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( Version* version, double score, unsigned int ratio, - unsigned int max_number_of_files_to_compact) { + unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) { int level = 0; unsigned int min_merge_width = @@ -658,8 +663,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( candidate_count = 1; break; } - Log(logger_, "Universal: file %lu[%d] being compacted, skipping", - (unsigned long)f->number, loop); + LogToBuffer(log_buffer, + "Universal: file %lu[%d] being compacted, skipping", + (unsigned long)f->number, loop); f = nullptr; } @@ -667,8 +673,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // first candidate to be compacted. uint64_t candidate_size = f != nullptr? f->file_size : 0; if (f != nullptr) { - Log(logger_, "Universal: Possible candidate file %lu[%d].", - (unsigned long)f->number, loop); + LogToBuffer(log_buffer, "Universal: Possible candidate file %lu[%d].", + (unsigned long)f->number, loop); } // Check if the suceeding files need compaction. @@ -718,9 +724,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( i < loop + candidate_count && i < file_by_time.size(); i++) { int index = file_by_time[i]; FileMetaData* f = version->files_[level][index]; - Log(logger_, "Universal: Skipping file %lu[%d] with size %lu %d\n", - (unsigned long)f->number, i, (unsigned long)f->file_size, - f->being_compacted); + LogToBuffer(log_buffer, + "Universal: Skipping file %lu[%d] with size %lu %d\n", + (unsigned long)f->number, i, (unsigned long)f->file_size, + f->being_compacted); } } } @@ -754,8 +761,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( int index = file_by_time[i]; FileMetaData* f = c->input_version_->files_[level][index]; c->inputs_[0].push_back(f); - Log(logger_, "Universal: Picking file %lu[%d] with size %lu\n", - (unsigned long)f->number, i, (unsigned long)f->file_size); + LogToBuffer(log_buffer, "Universal: Picking file %lu[%d] with size %lu\n", + (unsigned long)f->number, i, (unsigned long)f->file_size); } return c; } @@ -767,7 +774,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // min_merge_width and max_merge_width). // Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( - Version* version, double score) { + Version* version, double score, LogBuffer* log_buffer) { int level = 0; // percentage flexibilty while reducing size amplification @@ -791,17 +798,17 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( start_index = loop; // Consider this as the first candidate. break; } - Log(logger_, "Universal: skipping file %lu[%d] compacted %s", - (unsigned long)f->number, loop, - " cannot be a candidate to reduce size amp.\n"); + LogToBuffer(log_buffer, "Universal: skipping file %lu[%d] compacted %s", + (unsigned long)f->number, loop, + " cannot be a candidate to reduce size amp.\n"); f = nullptr; } if (f == nullptr) { return nullptr; // no candidate files } - Log(logger_, "Universal: First candidate file %lu[%d] %s", - (unsigned long)f->number, start_index, " to reduce size amp.\n"); + LogToBuffer(log_buffer, "Universal: First candidate file %lu[%d] %s", + (unsigned long)f->number, start_index, " to reduce size amp.\n"); // keep adding up all the remaining files for (unsigned int loop = start_index; loop < file_by_time.size() - 1; @@ -809,7 +816,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( int index = file_by_time[loop]; f = version->files_[level][index]; if (f->being_compacted) { - Log(logger_, "Universal: Possible candidate file %lu[%d] %s.", + LogToBuffer( + log_buffer, "Universal: Possible candidate file %lu[%d] %s.", (unsigned long)f->number, loop, " is already being compacted. No size amp reduction possible.\n"); return nullptr; @@ -827,16 +835,18 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( // size amplification = percentage of additional size if (candidate_size * 100 < ratio * earliest_file_size) { - Log(logger_, - "Universal: size amp not needed. newer-files-total-size %lu " - "earliest-file-size %lu", - (unsigned long)candidate_size, (unsigned long)earliest_file_size); + LogToBuffer(log_buffer, + "Universal: size amp not needed. newer-files-total-size %lu " + "earliest-file-size %lu", + (unsigned long)candidate_size, + (unsigned long)earliest_file_size); return nullptr; } else { - Log(logger_, - "Universal: size amp needed. newer-files-total-size %lu " - "earliest-file-size %lu", - (unsigned long)candidate_size, (unsigned long)earliest_file_size); + LogToBuffer(log_buffer, + "Universal: size amp needed. newer-files-total-size %lu " + "earliest-file-size %lu", + (unsigned long)candidate_size, + (unsigned long)earliest_file_size); } assert(start_index >= 0 && start_index < file_by_time.size() - 1); @@ -850,8 +860,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( int index = file_by_time[loop]; f = c->input_version_->files_[level][index]; c->inputs_[0].push_back(f); - Log(logger_, "Universal: size amp picking file %lu[%d] with size %lu", - (unsigned long)f->number, index, (unsigned long)f->file_size); + LogToBuffer(log_buffer, + "Universal: size amp picking file %lu[%d] with size %lu", + (unsigned long)f->number, index, (unsigned long)f->file_size); } return c; } diff --git a/db/compaction_picker.h b/db/compaction_picker.h index c11b64785..164d23b11 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -20,6 +20,7 @@ namespace rocksdb { +class LogBuffer; class Compaction; class Version; @@ -33,7 +34,8 @@ class CompactionPicker { // Returns nullptr if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that // describes the compaction. Caller should delete the result. - virtual Compaction* PickCompaction(Version* version) = 0; + virtual Compaction* PickCompaction(Version* version, + LogBuffer* log_buffer) = 0; // Return a compaction object for compacting the range [begin,end] in // the specified level. Returns nullptr if there is nothing in that @@ -131,16 +133,19 @@ class UniversalCompactionPicker : public CompactionPicker { UniversalCompactionPicker(const ColumnFamilyOptions* options, const InternalKeyComparator* icmp, Logger* logger) : CompactionPicker(options, icmp, logger) {} - virtual Compaction* PickCompaction(Version* version) override; + virtual Compaction* PickCompaction(Version* version, + LogBuffer* log_buffer) override; private: // Pick Universal compaction to limit read amplification Compaction* PickCompactionUniversalReadAmp(Version* version, double score, unsigned int ratio, - unsigned int num_files); + unsigned int num_files, + LogBuffer* log_buffer); // Pick Universal compaction to limit space amplification. - Compaction* PickCompactionUniversalSizeAmp(Version* version, double score); + Compaction* PickCompactionUniversalSizeAmp(Version* version, double score, + LogBuffer* log_buffer); }; class LevelCompactionPicker : public CompactionPicker { @@ -148,7 +153,8 @@ class LevelCompactionPicker : public CompactionPicker { LevelCompactionPicker(const ColumnFamilyOptions* options, const InternalKeyComparator* icmp, Logger* logger) : CompactionPicker(options, icmp, logger) {} - virtual Compaction* PickCompaction(Version* version) override; + virtual Compaction* PickCompaction(Version* version, + LogBuffer* log_buffer) override; private: // For the specfied level, pick a compaction. diff --git a/db/db_impl.cc b/db/db_impl.cc index 014c2fb20..cc15049cf 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1932,57 +1932,61 @@ void DBImpl::BackgroundCallCompaction() { DeletionState deletion_state(true); MaybeDumpStats(); + LogBuffer log_buffer(INFO, options_.info_log); + { + MutexLock l(&mutex_); + // Log(options_.info_log, "XXX BG Thread %llx process new work item", + // pthread_self()); + assert(bg_compaction_scheduled_); + Status s; + if (!shutting_down_.Acquire_Load()) { + s = BackgroundCompaction(&madeProgress, deletion_state, &log_buffer); + if (!s.ok()) { + // Wait a little bit before retrying background compaction in + // case this is an environmental problem and we do not want to + // chew up resources for failed compactions for the duration of + // the problem. + bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + mutex_.Unlock(); + Log(options_.info_log, "Waiting after background compaction error: %s", + s.ToString().c_str()); + LogFlush(options_.info_log); + env_->SleepForMicroseconds(1000000); + mutex_.Lock(); + } + } - MutexLock l(&mutex_); - // Log(options_.info_log, "XXX BG Thread %llx process new work item", pthread_self()); - assert(bg_compaction_scheduled_); - Status s; - if (!shutting_down_.Acquire_Load()) { - s = BackgroundCompaction(&madeProgress, deletion_state); - if (!s.ok()) { - // Wait a little bit before retrying background compaction in - // case this is an environmental problem and we do not want to - // chew up resources for failed compactions for the duration of - // the problem. - bg_cv_.SignalAll(); // In case a waiter can proceed despite the error - Log(options_.info_log, "Waiting after background compaction error: %s", - s.ToString().c_str()); + // If !s.ok(), this means that Compaction failed. In that case, we want + // to delete all obsolete files we might have created and we force + // FindObsoleteFiles(). This is because deletion_state does not catch + // all created files if compaction failed. + FindObsoleteFiles(deletion_state, !s.ok()); + + // delete unnecessary files if any, this is done outside the mutex + if (deletion_state.HaveSomethingToDelete()) { mutex_.Unlock(); - LogFlush(options_.info_log); - env_->SleepForMicroseconds(1000000); + PurgeObsoleteFiles(deletion_state); mutex_.Lock(); } - } - - // If !s.ok(), this means that Compaction failed. In that case, we want - // to delete all obsolete files we might have created and we force - // FindObsoleteFiles(). This is because deletion_state does not catch - // all created files if compaction failed. - FindObsoleteFiles(deletion_state, !s.ok()); - - // delete unnecessary files if any, this is done outside the mutex - if (deletion_state.HaveSomethingToDelete()) { - mutex_.Unlock(); - PurgeObsoleteFiles(deletion_state); - mutex_.Lock(); - } - bg_compaction_scheduled_--; + bg_compaction_scheduled_--; - MaybeScheduleLogDBDeployStats(); + MaybeScheduleLogDBDeployStats(); - // Previous compaction may have produced too many files in a level, - // So reschedule another compaction if we made progress in the - // last compaction. - if (madeProgress) { - MaybeScheduleFlushOrCompaction(); + // Previous compaction may have produced too many files in a level, + // So reschedule another compaction if we made progress in the + // last compaction. + if (madeProgress) { + MaybeScheduleFlushOrCompaction(); + } + bg_cv_.SignalAll(); } - bg_cv_.SignalAll(); - + log_buffer.FlushBufferToLog(); } Status DBImpl::BackgroundCompaction(bool* madeProgress, - DeletionState& deletion_state) { + DeletionState& deletion_state, + LogBuffer* log_buffer) { *madeProgress = false; mutex_.AssertHeld(); @@ -1999,11 +2003,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, if (!c) { m->done = true; } - Log(options_.info_log, + LogToBuffer( + log_buffer, "Manual compaction from level-%d to level-%d from %s .. %s; will stop " "at %s\n", - m->input_level, - m->output_level, + m->input_level, m->output_level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"), (m->end ? m->end->DebugString().c_str() : "(end)"), ((m->done || manual_end == nullptr) @@ -2013,7 +2017,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, // no need to refcount in iteration since it's always under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { if (!cfd->options()->disable_auto_compactions && !cfd->IsDropped()) { - c.reset(cfd->PickCompaction()); + c.reset(cfd->PickCompaction(log_buffer)); if (c != nullptr) { // update statistics MeasureTime(options_.statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION, @@ -2027,7 +2031,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, Status status; if (!c) { // Nothing to do - Log(options_.info_log, "Compaction nothing to do"); + LogToBuffer(log_buffer, "Compaction nothing to do"); } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level assert(c->num_input_files(0) == 1); @@ -2044,7 +2048,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } Version::LevelSummaryStorage tmp; - Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", + LogToBuffer(log_buffer, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast(f->number), c->level() + 1, static_cast(f->file_size), status.ToString().c_str(), c->input_version()->LevelSummary(&tmp)); @@ -2066,8 +2070,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } else if (shutting_down_.Acquire_Load()) { // Ignore compaction errors found during shutting down } else { - Log(options_.info_log, - "Compaction error: %s", status.ToString().c_str()); + Log(WARN, options_.info_log, "Compaction error: %s", + status.ToString().c_str()); if (options_.paranoid_checks && bg_error_.ok()) { bg_error_ = status; } diff --git a/db/db_impl.h b/db/db_impl.h index 128de8ed1..d36f6a41f 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -323,7 +323,8 @@ class DBImpl : public DB { static void BGWorkFlush(void* db); void BackgroundCallCompaction(); void BackgroundCallFlush(); - Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state); + Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state, + LogBuffer* log_buffer); Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state); void CleanupCompaction(CompactionState* compact, Status status); Status DoCompactionWork(CompactionState* compact, diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index c6e836957..4f323b1b7 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -513,7 +513,7 @@ class Directory { virtual Status Fsync() = 0; }; -enum InfoLogLevel { +enum InfoLogLevel : unsigned char { DEBUG = 0, INFO, WARN, @@ -526,7 +526,7 @@ enum InfoLogLevel { class Logger { public: enum { DO_NOT_SUPPORT_GET_LOG_FILE_SIZE = -1 }; - explicit Logger(const InfoLogLevel log_level = InfoLogLevel::ERROR) + explicit Logger(const InfoLogLevel log_level = InfoLogLevel::INFO) : log_level_(log_level) {} virtual ~Logger(); @@ -543,10 +543,20 @@ class Logger { if (log_level < log_level_) { return; } - char new_format[500]; - snprintf(new_format, sizeof(new_format) - 1, "[%s] %s", - kInfoLogLevelNames[log_level], format); - Logv(new_format, ap); + + if (log_level == INFO) { + // Doesn't print log level if it is INFO level. + // This is to avoid unexpected performance regression after we add + // the feature of log level. All the logs before we add the feature + // are INFO level. We don't want to add extra costs to those existing + // logging. + Logv(format, ap); + } else { + char new_format[500]; + snprintf(new_format, sizeof(new_format) - 1, "[%s] %s", + kInfoLogLevelNames[log_level], format); + Logv(new_format, ap); + } } virtual size_t GetLogFileSize() const { return DO_NOT_SUPPORT_GET_LOG_FILE_SIZE; @@ -565,6 +575,26 @@ class Logger { InfoLogLevel log_level_; }; +// A class to buffer info log entries and flush them in the end. +class LogBuffer { + public: + // log_level: the log level for all the logs + // info_log: logger to write the logs to + LogBuffer(const InfoLogLevel log_level, const shared_ptr& info_log); + ~LogBuffer(); + + // Add a log entry to the buffer. + void AddLogToBuffer(const char* format, ...); + + // Flush all buffered log to the info log. + void FlushBufferToLog() const; + + private: + struct Rep; + Rep* rep_; + const InfoLogLevel log_level_; + const shared_ptr& info_log_; +}; // Identifies a locked file. class FileLock { @@ -577,6 +607,9 @@ class FileLock { void operator=(const FileLock&); }; +// Add log to the LogBuffer for a delayed info logging. It can be used when +// we want to add some logs inside a mutex. +extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...); extern void LogFlush(const shared_ptr& info_log); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 3bbfc2f6a..8ca9a90a8 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -24,6 +24,7 @@ class CompactionFilter; class CompactionFilterFactory; class Comparator; class Env; +enum InfoLogLevel : unsigned char; class FilterPolicy; class Logger; class MergeOperator; @@ -532,6 +533,8 @@ struct DBOptions { // Default: nullptr shared_ptr info_log; + InfoLogLevel info_log_level; + // Number of open files that can be used by the DB. You may need to // increase this if your database has a large working set. Value -1 means // files opened are always kept open. You can estimate number of files based diff --git a/util/auto_roll_logger.cc b/util/auto_roll_logger.cc index c3d341590..76d410b26 100644 --- a/util/auto_roll_logger.cc +++ b/util/auto_roll_logger.cc @@ -88,7 +88,7 @@ Status CreateLoggerFromOptions( AutoRollLogger* result = new AutoRollLogger( env, dbname, db_log_dir, options.max_log_file_size, - options.log_file_time_to_roll); + options.log_file_time_to_roll, options.info_log_level); Status s = result->GetStatus(); if (!s.ok()) { delete result; @@ -101,7 +101,11 @@ Status CreateLoggerFromOptions( env->CreateDir(dbname); // In case it does not exist env->RenameFile(fname, OldInfoLogFileName(dbname, env->NowMicros(), db_absolute_path, db_log_dir)); - return env->NewLogger(fname, logger); + auto s = env->NewLogger(fname, logger); + if (logger->get() != nullptr) { + (*logger)->SetInfoLogLevel(options.info_log_level); + } + return s; } } diff --git a/util/auto_roll_logger.h b/util/auto_roll_logger.h index b0cf1bfce..4c755f2ab 100644 --- a/util/auto_roll_logger.h +++ b/util/auto_roll_logger.h @@ -19,7 +19,7 @@ class AutoRollLogger : public Logger { AutoRollLogger(Env* env, const std::string& dbname, const std::string& db_log_dir, size_t log_max_size, size_t log_file_time_to_roll, - const InfoLogLevel log_level = InfoLogLevel::ERROR) + const InfoLogLevel log_level = InfoLogLevel::INFO) : Logger(log_level), dbname_(dbname), db_log_dir_(db_log_dir), diff --git a/util/env.cc b/util/env.cc index 1d89e15ae..245767acb 100644 --- a/util/env.cc +++ b/util/env.cc @@ -8,7 +8,11 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "rocksdb/env.h" + +#include #include "rocksdb/options.h" +#include "util/arena.h" +#include "util/autovector.h" namespace rocksdb { @@ -27,9 +31,82 @@ WritableFile::~WritableFile() { Logger::~Logger() { } +// One log entry with its timestamp +struct BufferedLog { + struct timeval now_tv; // Timestamp of the log + char message[1]; // Beginning of log message +}; + +struct LogBuffer::Rep { + Arena arena_; + autovector logs_; +}; + +// Lazily initialize Rep to avoid allocations when new log is added. +LogBuffer::LogBuffer(const InfoLogLevel log_level, + const shared_ptr& info_log) + : rep_(nullptr), log_level_(log_level), info_log_(info_log) {} + +LogBuffer::~LogBuffer() { delete rep_; } + +void LogBuffer::AddLogToBuffer(const char* format, ...) { + if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) { + // Skip the level because of its level. + return; + } + if (rep_ == nullptr) { + rep_ = new Rep(); + } + + const size_t kLogSizeLimit = 512; + char* alloc_mem = rep_->arena_.AllocateAligned(kLogSizeLimit); + BufferedLog* buffered_log = new (alloc_mem) BufferedLog(); + char* p = buffered_log->message; + char* limit = alloc_mem + kLogSizeLimit - 1; + + // store the time + gettimeofday(&(buffered_log->now_tv), nullptr); + + // Print the message + if (p < limit) { + va_list ap; + va_start(ap, format); + p += vsnprintf(p, limit - p, format, ap); + va_end(ap); + } + + // Add '\0' to the end + *p = '\0'; + + rep_->logs_.push_back(buffered_log); +} + +void LogBuffer::FlushBufferToLog() const { + if (rep_ != nullptr) { + for (BufferedLog* log : rep_->logs_) { + const time_t seconds = log->now_tv.tv_sec; + struct tm t; + localtime_r(&seconds, &t); + Log(log_level_, info_log_, + "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s", + t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, + t.tm_sec, static_cast(log->now_tv.tv_usec), log->message); + } + } +} + FileLock::~FileLock() { } +void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) { + if (log_buffer != nullptr) { + va_list ap; + va_start(ap, format); + log_buffer->AddLogToBuffer(format); + va_end(ap); + } +} + void LogFlush(Logger *info_log) { if (info_log) { info_log->Flush(); @@ -40,7 +117,7 @@ void Log(Logger* info_log, const char* format, ...) { if (info_log) { va_list ap; va_start(ap, format); - info_log->Logv(format, ap); + info_log->Logv(InfoLogLevel::INFO, format, ap); va_end(ap); } } @@ -163,7 +240,7 @@ void Log(const shared_ptr& info_log, const char* format, ...) { if (info_log) { va_list ap; va_start(ap, format); - info_log->Logv(format, ap); + info_log->Logv(InfoLogLevel::INFO, format, ap); va_end(ap); } } diff --git a/util/env_test.cc b/util/env_test.cc index 828b49a0b..eb2829303 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -7,6 +7,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include #include #include @@ -191,6 +192,15 @@ bool IsSingleVarint(const std::string& s) { } #ifdef OS_LINUX +// To make sure the Env::GetUniqueId() related tests work correctly, The files +// should be stored in regular storage like "hard disk" or "flash device". +// Otherwise we cannot get the correct id. +// +// The following function act as the replacement of test::TmpDir() that may be +// customized by user to be on a storage that doesn't work with GetUniqueId(). +// +// TODO(kailiu) This function still assumes /tmp/ reside in regular +// storage system. bool IsUniqueIDValid(const std::string& s) { return !s.empty() && !IsSingleVarint(s); } @@ -198,11 +208,21 @@ bool IsUniqueIDValid(const std::string& s) { const size_t MAX_ID_SIZE = 100; char temp_id[MAX_ID_SIZE]; +std::string GetOnDiskTestDir() { + char base[100]; + snprintf(base, sizeof(base), "/tmp/rocksdbtest-%d", + static_cast(geteuid())); + // Directory may already exist + Env::Default()->CreateDirIfMissing(base); + + return base; +} + // Only works in linux platforms TEST(EnvPosixTest, RandomAccessUniqueID) { // Create file. const EnvOptions soptions; - std::string fname = test::TmpDir() + "/" + "testfile"; + std::string fname = GetOnDiskTestDir() + "/" + "testfile"; unique_ptr wfile; ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); @@ -261,7 +281,7 @@ TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) { // Create the files std::vector fnames; for (int i = 0; i < 1000; ++i) { - fnames.push_back(test::TmpDir() + "/" + "testfile" + std::to_string(i)); + fnames.push_back(GetOnDiskTestDir() + "/" + "testfile" + std::to_string(i)); // Create file. unique_ptr wfile; @@ -294,7 +314,8 @@ TEST(EnvPosixTest, RandomAccessUniqueIDConcurrent) { // Only works in linux platforms TEST(EnvPosixTest, RandomAccessUniqueIDDeletes) { const EnvOptions soptions; - std::string fname = test::TmpDir() + "/" + "testfile"; + + std::string fname = GetOnDiskTestDir() + "/" + "testfile"; // Check that after file is deleted we don't get same ID again in a new file. std::unordered_set ids; diff --git a/util/options.cc b/util/options.cc index 63bedb858..cfa952dfb 100644 --- a/util/options.cc +++ b/util/options.cc @@ -148,6 +148,7 @@ DBOptions::DBOptions() paranoid_checks(false), env(Env::Default()), info_log(nullptr), + info_log_level(INFO), max_open_files(1000), statistics(nullptr), disableDataSync(false), @@ -185,6 +186,7 @@ DBOptions::DBOptions(const Options& options) paranoid_checks(options.paranoid_checks), env(options.env), info_log(options.info_log), + info_log_level(options.info_log_level), max_open_files(options.max_open_files), statistics(options.statistics), disableDataSync(options.disableDataSync),