From cc2099803a1de4dab8aa748cb26b2650e740d197 Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 1 Aug 2022 14:37:18 -0700 Subject: [PATCH] Use EnvLogger instead of PosixLogger (#10436) Summary: EnvLogger was built to replace PosixLogger that supports multiple Envs. Make FileSystem use EnvLogger by default, remove Posix FS specific implementation and remove PosixLogger code, Some hacky changes are made to make sure iostats are not polluted by logging, in order to pass existing unit tests. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10436 Test Plan: Run db_bench and watch info log files. Reviewed By: anand1976 Differential Revision: D38259855 fbshipit-source-id: 67d65874bfba7a33535b6d0dd0ed92cbbc9888b8 --- HISTORY.md | 1 + env/env_posix.cc | 2 +- env/file_system.cc | 20 ++++ env/fs_posix.cc | 44 -------- include/rocksdb/file_system.h | 2 +- include/rocksdb/iostats_context.h | 7 ++ logging/auto_roll_logger_test.cc | 17 +-- logging/env_logger.h | 52 ++++++--- logging/posix_logger.h | 179 ------------------------------ monitoring/iostats_context_imp.h | 5 +- port/util_logger.h | 4 +- 11 files changed, 83 insertions(+), 250 deletions(-) delete mode 100644 logging/posix_logger.h diff --git a/HISTORY.md b/HISTORY.md index 991e30f2a..01a1c4d58 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -21,6 +21,7 @@ ### Behavior Change * Added checksum handshake during the copying of decompressed WAL fragment. This together with #9875, #10037, #10212, #10114 and #10319 provides end-to-end integrity protection for write batch during recovery. +* PosixLogger is removed and by default EnvLogger will be used for info logging. The behavior of the two loggers should be very similar when using the default Posix Env. ## 7.5.0 (07/15/2022) ### New Features diff --git a/env/env_posix.cc b/env/env_posix.cc index 1e648bc94..77f28e1f5 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -55,10 +55,10 @@ #include "env/composite_env_wrapper.h" #include "env/io_posix.h" -#include "logging/posix_logger.h" #include "monitoring/iostats_context_imp.h" #include "monitoring/thread_status_updater.h" #include "port/port.h" +#include "port/sys_time.h" #include "rocksdb/env.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" diff --git a/env/file_system.cc b/env/file_system.cc index e1cb19c01..ab5b3c450 100644 --- a/env/file_system.cc +++ b/env/file_system.cc @@ -10,6 +10,7 @@ #include "env/env_encryption_ctr.h" #include "env/fs_readonly.h" #include "env/mock_env.h" +#include "logging/env_logger.h" #include "options/db_options.h" #include "rocksdb/convenience.h" #include "rocksdb/utilities/customizable_util.h" @@ -115,6 +116,25 @@ IOStatus FileSystem::ReuseWritableFile(const std::string& fname, return NewWritableFile(fname, opts, result, dbg); } +IOStatus FileSystem::NewLogger(const std::string& fname, + const IOOptions& io_opts, + std::shared_ptr* result, + IODebugContext* dbg) { + FileOptions options; + options.io_options = io_opts; + // TODO: Tune the buffer size. + options.writable_file_max_buffer_size = 1024 * 1024; + std::unique_ptr writable_file; + const IOStatus status = NewWritableFile(fname, options, &writable_file, dbg); + if (!status.ok()) { + return status; + } + + *result = std::make_shared(std::move(writable_file), fname, + options, Env::Default()); + return IOStatus::OK(); +} + FileOptions FileSystem::OptimizeForLogRead( const FileOptions& file_options) const { FileOptions optimized_file_options(file_options); diff --git a/env/fs_posix.cc b/env/fs_posix.cc index 7dba5b81c..8a742da5d 100644 --- a/env/fs_posix.cc +++ b/env/fs_posix.cc @@ -48,7 +48,6 @@ #include "env/composite_env_wrapper.h" #include "env/io_posix.h" -#include "logging/posix_logger.h" #include "monitoring/iostats_context_imp.h" #include "monitoring/thread_status_updater.h" #include "port/lang.h" @@ -84,8 +83,6 @@ inline mode_t GetDBFileMode(bool allow_non_owner_access) { return allow_non_owner_access ? 0644 : 0600; } -static uint64_t gettid() { return Env::Default()->GetThreadID(); } - // list of pathnames that are locked // Only used for error message. struct LockHoldingInfo { @@ -555,47 +552,6 @@ class PosixFileSystem : public FileSystem { return IOStatus::OK(); } - IOStatus NewLogger(const std::string& fname, const IOOptions& /*opts*/, - std::shared_ptr* result, - IODebugContext* /*dbg*/) override { - FILE* f = nullptr; - int fd; - { - IOSTATS_TIMER_GUARD(open_nanos); - fd = open(fname.c_str(), - cloexec_flags(O_WRONLY | O_CREAT | O_TRUNC, nullptr), - GetDBFileMode(allow_non_owner_access_)); - if (fd != -1) { - f = fdopen(fd, - "w" -#ifdef __GLIBC_PREREQ -#if __GLIBC_PREREQ(2, 7) - "e" // glibc extension to enable O_CLOEXEC -#endif -#endif - ); - } - } - if (fd == -1) { - result->reset(); - return status_to_io_status( - IOError("when open a file for new logger", fname, errno)); - } - if (f == nullptr) { - close(fd); - result->reset(); - return status_to_io_status( - IOError("when fdopen a file for new logger", fname, errno)); - } else { -#ifdef ROCKSDB_FALLOCATE_PRESENT - fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024); -#endif - SetFD_CLOEXEC(fd, nullptr); - result->reset(new PosixLogger(f, &gettid, Env::Default())); - return IOStatus::OK(); - } - } - IOStatus FileExists(const std::string& fname, const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { int result = access(fname.c_str(), F_OK); diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index 45cde241d..ee8362eab 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -581,7 +581,7 @@ class FileSystem : public Customizable { // logger. virtual IOStatus NewLogger(const std::string& fname, const IOOptions& io_opts, std::shared_ptr* result, - IODebugContext* dbg) = 0; + IODebugContext* dbg); // Get full directory name for this db. virtual IOStatus GetAbsolutePath(const std::string& db_path, diff --git a/include/rocksdb/iostats_context.h b/include/rocksdb/iostats_context.h index d6741c32a..0437b80dc 100644 --- a/include/rocksdb/iostats_context.h +++ b/include/rocksdb/iostats_context.h @@ -76,6 +76,13 @@ struct IOStatsContext { uint64_t cpu_read_nanos; FileIOByTemperature file_io_stats_by_temperature; + + // It is not consistent that whether iostats follows PerfLevel.Timer counters + // follows it but BackupEngine relies on counter metrics to always be there. + // Here we create a backdoor option to disable some counters, so that some + // existing stats are not polluted by file operations, such as logging, by + // turning this off. + bool disable_iostats = false; }; // If RocksDB is compiled with -DNIOSTATS_CONTEXT, then a pointer to a global, diff --git a/logging/auto_roll_logger_test.cc b/logging/auto_roll_logger_test.cc index 89beaaf50..a3a89ad91 100644 --- a/logging/auto_roll_logger_test.cc +++ b/logging/auto_roll_logger_test.cc @@ -21,6 +21,7 @@ #include "db/db_test_util.h" #include "env/emulated_clock.h" +#include "logging/env_logger.h" #include "logging/logging.h" #include "port/port.h" #include "rocksdb/db.h" @@ -299,7 +300,7 @@ TEST_F(AutoRollLoggerTest, CompositeRollByTimeAndSizeLogger) { } #ifndef OS_WIN -// TODO: does not build for Windows because of PosixLogger use below. Need to +// TODO: does not build for Windows because of EnvLogger use below. Need to // port TEST_F(AutoRollLoggerTest, CreateLoggerFromOptions) { DBOptions options; @@ -311,7 +312,7 @@ TEST_F(AutoRollLoggerTest, CreateLoggerFromOptions) { // Normal logger ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger)); - ASSERT_TRUE(dynamic_cast(logger.get())); + ASSERT_TRUE(dynamic_cast(logger.get())); // Only roll by size InitTestDb(); @@ -467,20 +468,20 @@ TEST_F(AutoRollLoggerTest, LogFlushWhileRolling) { // (1) Need to pin the old logger before beginning the roll, as rolling grabs // the mutex, which would prevent us from accessing the old logger. This // also marks flush_thread with AutoRollLogger::Flush:PinnedLogger. - // (2) Need to reset logger during PosixLogger::Flush() to exercise a race + // (2) Need to reset logger during EnvLogger::Flush() to exercise a race // condition case, which is executing the flush with the pinned (old) // logger after auto-roll logger has cut over to a new logger. - // (3) PosixLogger::Flush() happens in both threads but its SyncPoints only + // (3) EnvLogger::Flush() happens in both threads but its SyncPoints only // are enabled in flush_thread (the one pinning the old logger). ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependencyAndMarkers( {{"AutoRollLogger::Flush:PinnedLogger", "AutoRollLoggerTest::LogFlushWhileRolling:PreRollAndPostThreadInit"}, - {"PosixLogger::Flush:Begin1", + {"EnvLogger::Flush:Begin1", "AutoRollLogger::ResetLogger:BeforeNewLogger"}, {"AutoRollLogger::ResetLogger:AfterNewLogger", - "PosixLogger::Flush:Begin2"}}, - {{"AutoRollLogger::Flush:PinnedLogger", "PosixLogger::Flush:Begin1"}, - {"AutoRollLogger::Flush:PinnedLogger", "PosixLogger::Flush:Begin2"}}); + "EnvLogger::Flush:Begin2"}}, + {{"AutoRollLogger::Flush:PinnedLogger", "EnvLogger::Flush:Begin1"}, + {"AutoRollLogger::Flush:PinnedLogger", "EnvLogger::Flush:Begin2"}}); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); flush_thread = port::Thread([&]() { auto_roll_logger->Flush(); }); diff --git a/logging/env_logger.h b/logging/env_logger.h index ac62c6f1a..b9099a834 100644 --- a/logging/env_logger.h +++ b/logging/env_logger.h @@ -12,13 +12,16 @@ #pragma once #include + #include #include -#include "port/sys_time.h" #include "file/writable_file_writer.h" #include "monitoring/iostats_context_imp.h" +#include "port/sys_time.h" #include "rocksdb/env.h" +#include "rocksdb/file_system.h" +#include "rocksdb/perf_level.h" #include "rocksdb/slice.h" #include "test_util/sync_point.h" #include "util/mutexlock.h" @@ -45,6 +48,29 @@ class EnvLogger : public Logger { } private: + // A guard to prepare file operations, such as mutex and skip + // I/O context. + class FileOpGuard { + public: + explicit FileOpGuard(EnvLogger& logger) + : logger_(logger), prev_perf_level_(GetPerfLevel()) { + // Preserve iostats not to pollute writes from user writes. We might + // need a better solution than this. + SetPerfLevel(PerfLevel::kDisable); + iostats_context.disable_iostats = true; + logger.mutex_.Lock(); + } + ~FileOpGuard() { + logger_.mutex_.Unlock(); + iostats_context.disable_iostats = false; + SetPerfLevel(prev_perf_level_); + } + + private: + EnvLogger& logger_; + PerfLevel prev_perf_level_; + }; + void FlushLocked() { mutex_.AssertHeld(); if (flush_pending_) { @@ -58,16 +84,15 @@ class EnvLogger : public Logger { TEST_SYNC_POINT("EnvLogger::Flush:Begin1"); TEST_SYNC_POINT("EnvLogger::Flush:Begin2"); - MutexLock l(&mutex_); + FileOpGuard guard(*this); FlushLocked(); } Status CloseImpl() override { return CloseHelper(); } Status CloseHelper() { - mutex_.Lock(); + FileOpGuard guard(*this); const auto close_status = file_.Close(); - mutex_.Unlock(); if (close_status.ok()) { return close_status; @@ -105,7 +130,7 @@ class EnvLogger : public Logger { const time_t seconds = now_tv.tv_sec; struct tm t; port::LocalTimeR(&seconds, &t); - p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", + p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llu ", t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec, static_cast(now_tv.tv_usec), static_cast(thread_id)); @@ -133,15 +158,16 @@ class EnvLogger : public Logger { } assert(p <= limit); - mutex_.Lock(); - // We will ignore any error returned by Append(). - file_.Append(Slice(base, p - base)).PermitUncheckedError(); - flush_pending_ = true; - const uint64_t now_micros = clock_->NowMicros(); - if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { - FlushLocked(); + { + FileOpGuard guard(*this); + // We will ignore any error returned by Append(). + file_.Append(Slice(base, p - base)).PermitUncheckedError(); + flush_pending_ = true; + const uint64_t now_micros = clock_->NowMicros(); + if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { + FlushLocked(); + } } - mutex_.Unlock(); if (base != buffer) { delete[] base; } diff --git a/logging/posix_logger.h b/logging/posix_logger.h deleted file mode 100644 index fa02dd752..000000000 --- a/logging/posix_logger.h +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). -// -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. -// -// Logger implementation that can be shared by all environments -// where enough posix functionality is available. - -#pragma once -#include -#include -#include "port/sys_time.h" -#include -#include - -#ifdef OS_LINUX -#ifndef FALLOC_FL_KEEP_SIZE -#include -#endif -#endif - -#include -#include "env/io_posix.h" -#include "monitoring/iostats_context_imp.h" -#include "rocksdb/env.h" -#include "test_util/sync_point.h" - -namespace ROCKSDB_NAMESPACE { - -class PosixLogger : public Logger { - private: - Status PosixCloseHelper() { - int ret; - - ret = fclose(file_); - if (ret) { - return IOError("Unable to close log file", "", ret); - } - return Status::OK(); - } - FILE* file_; - uint64_t (*gettid_)(); // Return the thread id for the current thread - std::atomic_size_t log_size_; - int fd_; - const static uint64_t flush_every_seconds_ = 5; - std::atomic_uint_fast64_t last_flush_micros_; - Env* env_; - std::atomic flush_pending_; - - protected: - virtual Status CloseImpl() override { return PosixCloseHelper(); } - - public: - PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env, - const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL) - : Logger(log_level), - file_(f), - gettid_(gettid), - log_size_(0), - fd_(fileno(f)), - last_flush_micros_(0), - env_(env), - flush_pending_(false) {} - virtual ~PosixLogger() { - if (!closed_) { - closed_ = true; - PosixCloseHelper().PermitUncheckedError(); - } - } - virtual void Flush() override { - TEST_SYNC_POINT("PosixLogger::Flush:Begin1"); - TEST_SYNC_POINT("PosixLogger::Flush:Begin2"); - if (flush_pending_) { - flush_pending_ = false; - fflush(file_); - } - last_flush_micros_ = env_->NowMicros(); - } - - using Logger::Logv; - virtual void Logv(const char* format, va_list ap) override { - IOSTATS_TIMER_GUARD(logger_nanos); - - const uint64_t thread_id = (*gettid_)(); - - // We try twice: the first time with a fixed-size stack allocated buffer, - // and the second time with a much larger dynamically allocated buffer. - char buffer[500]; - for (int iter = 0; iter < 2; iter++) { - char* base; - int bufsize; - if (iter == 0) { - bufsize = sizeof(buffer); - base = buffer; - } else { - bufsize = 65536; - base = new char[bufsize]; - } - char* p = base; - char* limit = base + bufsize; - - port::TimeVal now_tv; - port::GetTimeOfDay(&now_tv, nullptr); - const time_t seconds = now_tv.tv_sec; - struct tm t; - port::LocalTimeR(&seconds, &t); - p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llu ", - t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, - t.tm_min, t.tm_sec, static_cast(now_tv.tv_usec), - static_cast(thread_id)); - - // Print the message - if (p < limit) { - va_list backup_ap; - va_copy(backup_ap, ap); - p += vsnprintf(p, limit - p, format, backup_ap); - va_end(backup_ap); - } - - // Truncate to available space if necessary - if (p >= limit) { - if (iter == 0) { - continue; // Try again with larger buffer - } else { - p = limit - 1; - } - } - - // Add newline if necessary - if (p == base || p[-1] != '\n') { - *p++ = '\n'; - } - - assert(p <= limit); - const size_t write_size = p - base; - -#ifdef ROCKSDB_FALLOCATE_PRESENT - const int kDebugLogChunkSize = 128 * 1024; - - // If this write would cross a boundary of kDebugLogChunkSize - // space, pre-allocate more space to avoid overly large - // allocations from filesystem allocsize options. - const size_t log_size = log_size_; - const size_t last_allocation_chunk = - ((kDebugLogChunkSize - 1 + log_size) / kDebugLogChunkSize); - const size_t desired_allocation_chunk = - ((kDebugLogChunkSize - 1 + log_size + write_size) / - kDebugLogChunkSize); - if (last_allocation_chunk != desired_allocation_chunk) { - fallocate( - fd_, FALLOC_FL_KEEP_SIZE, 0, - static_cast(desired_allocation_chunk * kDebugLogChunkSize)); - } -#endif - - size_t sz = fwrite(base, 1, write_size, file_); - flush_pending_ = true; - if (sz > 0) { - log_size_ += write_size; - } - uint64_t now_micros = static_cast(now_tv.tv_sec) * 1000000 + - now_tv.tv_usec; - if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { - Flush(); - } - if (base != buffer) { - delete[] base; - } - break; - } - } - size_t GetLogFileSize() const override { return log_size_; } -}; - -} // namespace ROCKSDB_NAMESPACE diff --git a/monitoring/iostats_context_imp.h b/monitoring/iostats_context_imp.h index 7a3e7d33b..42b8a82fc 100644 --- a/monitoring/iostats_context_imp.h +++ b/monitoring/iostats_context_imp.h @@ -13,7 +13,10 @@ extern thread_local IOStatsContext iostats_context; } // namespace ROCKSDB_NAMESPACE // increment a specific counter by the specified value -#define IOSTATS_ADD(metric, value) (iostats_context.metric += value) +#define IOSTATS_ADD(metric, value) \ + if (!iostats_context.disable_iostats) { \ + iostats_context.metric += value; \ + } // reset a specific counter to zero #define IOSTATS_RESET(metric) (iostats_context.metric = 0) diff --git a/port/util_logger.h b/port/util_logger.h index d2d62a987..ce7e3a941 100644 --- a/port/util_logger.h +++ b/port/util_logger.h @@ -13,8 +13,6 @@ // porting to a new platform, see "port_example.h" for documentation // of what the new port_.h file must provide. -#if defined(ROCKSDB_PLATFORM_POSIX) -#include "logging/posix_logger.h" -#elif defined(OS_WIN) +#if defined(OS_WIN) #include "port/win/win_logger.h" #endif