From 444cf88a56466c6206d71453b8fca6fae8a3cde2 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 7 Nov 2013 11:31:56 -0800 Subject: [PATCH] Flush the log outside of lock Summary: Added a new call LogFlush() that flushes the log contents to the OS buffers. We never call it with the lock held. We call it once for every Read/Write and often during the compaction/flush process, so the frequency should not be a problem. Test Plan: db_test Reviewers: dhruba, haobo, kailiu, emayanke Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D13935 --- db/db_impl.cc | 18 ++++++++++++++++++ db/version_set.cc | 1 + include/rocksdb/env.h | 7 +++++++ util/env.cc | 12 ++++++++++++ util/posix_logger.h | 19 ++++++++++++------- 5 files changed, 50 insertions(+), 7 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 6fd319698..ac1f84dc3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -311,6 +311,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) } last_log_ts = 0; + LogFlush(options_.info_log); } DBImpl::~DBImpl() { @@ -333,6 +334,7 @@ DBImpl::~DBImpl() { if (mem_ != nullptr) mem_->Unref(); imm_.UnrefAll(); + LogFlush(options_.info_log); } // Do not flush and close database elegantly. Simulate a crash. @@ -354,6 +356,7 @@ void DBImpl::TEST_Destroy_DBImpl() { bg_compaction_scheduled_ += LargeNumber; mutex_.Unlock(); + LogFlush(options_.info_log); // force release the lock file. 
if (db_lock_ != nullptr) { @@ -967,6 +970,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { table_cache_.get(), iter, &meta, user_comparator(), newest_snapshot, earliest_seqno_in_memtable, true); + LogFlush(options_.info_log); mutex_.Lock(); } @@ -1034,6 +1038,7 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, table_cache_.get(), iter, &meta, user_comparator(), newest_snapshot, earliest_seqno_in_memtable, enable_compression); + LogFlush(options_.info_log); mutex_.Lock(); } base->Unref(); @@ -1153,6 +1158,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress) { } mutex_.Unlock(); DeleteLogFile(log_num); + LogFlush(options_.info_log); mutex_.Lock(); } } @@ -1180,6 +1186,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end, if (reduce_level) { ReFitLevel(max_level_with_files, target_level); } + LogFlush(options_.info_log); } // return the same level if it cannot be moved @@ -1638,6 +1645,7 @@ void DBImpl::BackgroundCallFlush() { Log(options_.info_log, "Waiting after background flush error: %s", s.ToString().c_str()); mutex_.Unlock(); + LogFlush(options_.info_log); env_->SleepForMicroseconds(1000000); mutex_.Lock(); } @@ -1675,6 +1683,7 @@ void DBImpl::BackgroundCallCompaction() { Log(options_.info_log, "Waiting after background compaction error: %s", s.ToString().c_str()); mutex_.Unlock(); + LogFlush(options_.info_log); env_->SleepForMicroseconds(1000000); mutex_.Lock(); } @@ -1685,6 +1694,7 @@ void DBImpl::BackgroundCallCompaction() { mutex_.Unlock(); PurgeObsoleteFiles(deletion_state); EvictObsoleteFiles(deletion_state); + LogFlush(options_.info_log); mutex_.Lock(); } @@ -1905,6 +1915,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { compact->builder.reset( GetTableBuilder(options_, compact->outfile.get(), compression_type)); } + LogFlush(options_.info_log); return s; } @@ -2102,6 +2113,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // TODO: 
remove memtable flush from normal compaction work if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { const uint64_t imm_start = env_->NowMicros(); + LogFlush(options_.info_log); mutex_.Lock(); if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { FlushMemTableToOutputFile(); @@ -2395,6 +2407,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { stats.bytes_written += compact->outputs[i].file_size; } + LogFlush(options_.info_log); mutex_.Lock(); stats_[compact->compaction->output_level()].Add(stats); @@ -2479,6 +2492,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); mutex_.Unlock(); + LogFlush(options_.info_log); return internal_iter; } @@ -2553,6 +2567,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, current->Unref(); mutex_.Unlock(); + LogFlush(options_.info_log); // Note, tickers are atomic now - no lock protection needed any more. RecordTick(options_.statistics, NUMBER_KEYS_READ); RecordTick(options_.statistics, BYTES_READ, value->size()); @@ -2631,6 +2646,7 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, current->Unref(); mutex_.Unlock(); + LogFlush(options_.info_log); RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS); RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys); RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead); @@ -2770,6 +2786,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } SetTickerCount(options_.statistics, SEQUENCE_NUMBER, last_sequence); } + LogFlush(options_.info_log); mutex_.Lock(); if (status.ok()) { versions_->SetLastSequence(last_sequence); @@ -3358,6 +3375,7 @@ Status DBImpl::DeleteFile(std::string name) { FindObsoleteFiles(deletion_state); } } // lock released here + LogFlush(options_.info_log); if (status.ok()) { // remove files outside the db-lock diff --git a/db/version_set.cc b/db/version_set.cc index 
55293ca2e..1745922bb 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1318,6 +1318,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // find offset in manifest file where this version is stored. new_manifest_file_size = descriptor_log_->file()->GetFileSize(); + LogFlush(options_->info_log); mu->Lock(); // cache the manifest_file_size so that it can be used to rollover in the // next call to LogAndApply diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index c9b2befa5..824e61e58 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -497,6 +497,8 @@ class Logger { virtual size_t GetLogFileSize() const { return DO_NOT_SUPPORT_GET_LOG_FILE_SIZE; } + // Flush to the OS buffers + virtual void Flush() {} private: // No copying allowed @@ -516,6 +518,9 @@ class FileLock { void operator=(const FileLock&); }; + +extern void LogFlush(const shared_ptr<Logger>& info_log); + // Log the specified data to *info_log if info_log is non-nullptr. extern void Log(const shared_ptr<Logger>& info_log, const char* format, ...) # if defined(__GNUC__) || defined(__clang__) @@ -523,6 +528,8 @@ extern void Log(const shared_ptr<Logger>& info_log, const char* format, ...) # endif ; +extern void LogFlush(Logger *info_log); + extern void Log(Logger* info_log, const char* format, ...) # if defined(__GNUC__) || defined(__clang__) __attribute__((__format__ (__printf__, 2, 3))) diff --git a/util/env.cc b/util/env.cc index cf6273034..bd19d48eb 100644 --- a/util/env.cc +++ b/util/env.cc @@ -30,6 +30,12 @@ Logger::~Logger() { FileLock::~FileLock() { } +void LogFlush(Logger *info_log) { + if (info_log) { + info_log->Flush(); + } +} + void Log(Logger* info_log, const char* format, ...) { if (info_log) { va_list ap; @@ -39,6 +45,12 @@ void Log(Logger* info_log, const char* format, ...) { } } +void LogFlush(const shared_ptr<Logger>& info_log) { + if (info_log) { + info_log->Flush(); + } +} + void Log(const shared_ptr<Logger>& info_log, const char* format, ...) 
{ if (info_log) { va_list ap; diff --git a/util/posix_logger.h b/util/posix_logger.h index 482ec3786..0a09bd1eb 100644 --- a/util/posix_logger.h +++ b/util/posix_logger.h @@ -33,8 +33,8 @@ class PosixLogger : public Logger { uint64_t (*gettid_)(); // Return the thread id for the current thread std::atomic_size_t log_size_; int fd_; - const static uint64_t flush_every_seconds_ = 0; - uint64_t last_flush_micros_; + const static uint64_t flush_every_seconds_ = 5; + std::atomic_uint_fast64_t last_flush_micros_; Env* env_; public: PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env) : @@ -43,6 +43,10 @@ class PosixLogger : public Logger { virtual ~PosixLogger() { fclose(file_); } + virtual void Flush() { + fflush(file_); + last_flush_micros_ = env_->NowMicros(); + } virtual void Logv(const char* format, va_list ap) { const uint64_t thread_id = (*gettid_)(); @@ -122,13 +126,14 @@ class PosixLogger : public Logger { size_t sz = fwrite(base, 1, write_size, file_); assert(sz == write_size); if (sz > 0) { - if (env_->NowMicros() - last_flush_micros_ >= - flush_every_seconds_ * 1000000) { - fflush(file_); - last_flush_micros_ = env_->NowMicros(); - } log_size_ += write_size; } + uint64_t now_micros = static_cast<uint64_t>(now_tv.tv_sec) * 1000000 + + now_tv.tv_usec; + if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { + fflush(file_); + last_flush_micros_ = now_micros; + } if (base != buffer) { delete[] base; }