Use EnvLogger instead of PosixLogger (#10436)

Summary:
EnvLogger was built to replace PosixLogger that supports multiple Envs. Make FileSystem use EnvLogger by default, remove Posix FS specific implementation and remove PosixLogger code,
Some hacky changes are made to make sure iostats are not polluted by logging, in order to pass existing unit tests.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10436

Test Plan: Run db_bench and watch info log files.

Reviewed By: anand1976

Differential Revision: D38259855

fbshipit-source-id: 67d65874bfba7a33535b6d0dd0ed92cbbc9888b8
main
sdong 2 years ago committed by Facebook GitHub Bot
parent e1b176d274
commit cc2099803a
  1. 1
      HISTORY.md
  2. 2
      env/env_posix.cc
  3. 20
      env/file_system.cc
  4. 44
      env/fs_posix.cc
  5. 2
      include/rocksdb/file_system.h
  6. 7
      include/rocksdb/iostats_context.h
  7. 17
      logging/auto_roll_logger_test.cc
  8. 40
      logging/env_logger.h
  9. 179
      logging/posix_logger.h
  10. 5
      monitoring/iostats_context_imp.h
  11. 4
      port/util_logger.h

@ -21,6 +21,7 @@
### Behavior Change ### Behavior Change
* Added checksum handshake during the copying of decompressed WAL fragment. This together with #9875, #10037, #10212, #10114 and #10319 provides end-to-end integrity protection for write batch during recovery. * Added checksum handshake during the copying of decompressed WAL fragment. This together with #9875, #10037, #10212, #10114 and #10319 provides end-to-end integrity protection for write batch during recovery.
* PosixLogger is removed and by default EnvLogger will be used for info logging. The behavior of the two loggers should be very similar when using the default Posix Env.
## 7.5.0 (07/15/2022) ## 7.5.0 (07/15/2022)
### New Features ### New Features

2
env/env_posix.cc vendored

@ -55,10 +55,10 @@
#include "env/composite_env_wrapper.h" #include "env/composite_env_wrapper.h"
#include "env/io_posix.h" #include "env/io_posix.h"
#include "logging/posix_logger.h"
#include "monitoring/iostats_context_imp.h" #include "monitoring/iostats_context_imp.h"
#include "monitoring/thread_status_updater.h" #include "monitoring/thread_status_updater.h"
#include "port/port.h" #include "port/port.h"
#include "port/sys_time.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"

20
env/file_system.cc vendored

@ -10,6 +10,7 @@
#include "env/env_encryption_ctr.h" #include "env/env_encryption_ctr.h"
#include "env/fs_readonly.h" #include "env/fs_readonly.h"
#include "env/mock_env.h" #include "env/mock_env.h"
#include "logging/env_logger.h"
#include "options/db_options.h" #include "options/db_options.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "rocksdb/utilities/customizable_util.h" #include "rocksdb/utilities/customizable_util.h"
@ -115,6 +116,25 @@ IOStatus FileSystem::ReuseWritableFile(const std::string& fname,
return NewWritableFile(fname, opts, result, dbg); return NewWritableFile(fname, opts, result, dbg);
} }
IOStatus FileSystem::NewLogger(const std::string& fname,
const IOOptions& io_opts,
std::shared_ptr<Logger>* result,
IODebugContext* dbg) {
FileOptions options;
options.io_options = io_opts;
// TODO: Tune the buffer size.
options.writable_file_max_buffer_size = 1024 * 1024;
std::unique_ptr<FSWritableFile> writable_file;
const IOStatus status = NewWritableFile(fname, options, &writable_file, dbg);
if (!status.ok()) {
return status;
}
*result = std::make_shared<EnvLogger>(std::move(writable_file), fname,
options, Env::Default());
return IOStatus::OK();
}
FileOptions FileSystem::OptimizeForLogRead( FileOptions FileSystem::OptimizeForLogRead(
const FileOptions& file_options) const { const FileOptions& file_options) const {
FileOptions optimized_file_options(file_options); FileOptions optimized_file_options(file_options);

44
env/fs_posix.cc vendored

@ -48,7 +48,6 @@
#include "env/composite_env_wrapper.h" #include "env/composite_env_wrapper.h"
#include "env/io_posix.h" #include "env/io_posix.h"
#include "logging/posix_logger.h"
#include "monitoring/iostats_context_imp.h" #include "monitoring/iostats_context_imp.h"
#include "monitoring/thread_status_updater.h" #include "monitoring/thread_status_updater.h"
#include "port/lang.h" #include "port/lang.h"
@ -84,8 +83,6 @@ inline mode_t GetDBFileMode(bool allow_non_owner_access) {
return allow_non_owner_access ? 0644 : 0600; return allow_non_owner_access ? 0644 : 0600;
} }
static uint64_t gettid() { return Env::Default()->GetThreadID(); }
// list of pathnames that are locked // list of pathnames that are locked
// Only used for error message. // Only used for error message.
struct LockHoldingInfo { struct LockHoldingInfo {
@ -555,47 +552,6 @@ class PosixFileSystem : public FileSystem {
return IOStatus::OK(); return IOStatus::OK();
} }
IOStatus NewLogger(const std::string& fname, const IOOptions& /*opts*/,
std::shared_ptr<Logger>* result,
IODebugContext* /*dbg*/) override {
FILE* f = nullptr;
int fd;
{
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(),
cloexec_flags(O_WRONLY | O_CREAT | O_TRUNC, nullptr),
GetDBFileMode(allow_non_owner_access_));
if (fd != -1) {
f = fdopen(fd,
"w"
#ifdef __GLIBC_PREREQ
#if __GLIBC_PREREQ(2, 7)
"e" // glibc extension to enable O_CLOEXEC
#endif
#endif
);
}
}
if (fd == -1) {
result->reset();
return status_to_io_status(
IOError("when open a file for new logger", fname, errno));
}
if (f == nullptr) {
close(fd);
result->reset();
return status_to_io_status(
IOError("when fdopen a file for new logger", fname, errno));
} else {
#ifdef ROCKSDB_FALLOCATE_PRESENT
fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
#endif
SetFD_CLOEXEC(fd, nullptr);
result->reset(new PosixLogger(f, &gettid, Env::Default()));
return IOStatus::OK();
}
}
IOStatus FileExists(const std::string& fname, const IOOptions& /*opts*/, IOStatus FileExists(const std::string& fname, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override { IODebugContext* /*dbg*/) override {
int result = access(fname.c_str(), F_OK); int result = access(fname.c_str(), F_OK);

@ -581,7 +581,7 @@ class FileSystem : public Customizable {
// logger. // logger.
virtual IOStatus NewLogger(const std::string& fname, const IOOptions& io_opts, virtual IOStatus NewLogger(const std::string& fname, const IOOptions& io_opts,
std::shared_ptr<Logger>* result, std::shared_ptr<Logger>* result,
IODebugContext* dbg) = 0; IODebugContext* dbg);
// Get full directory name for this db. // Get full directory name for this db.
virtual IOStatus GetAbsolutePath(const std::string& db_path, virtual IOStatus GetAbsolutePath(const std::string& db_path,

@ -76,6 +76,13 @@ struct IOStatsContext {
uint64_t cpu_read_nanos; uint64_t cpu_read_nanos;
FileIOByTemperature file_io_stats_by_temperature; FileIOByTemperature file_io_stats_by_temperature;
// It is not consistent that whether iostats follows PerfLevel.Timer counters
// follows it but BackupEngine relies on counter metrics to always be there.
// Here we create a backdoor option to disable some counters, so that some
// existing stats are not polluted by file operations, such as logging, by
// turning this off.
bool disable_iostats = false;
}; };
// If RocksDB is compiled with -DNIOSTATS_CONTEXT, then a pointer to a global, // If RocksDB is compiled with -DNIOSTATS_CONTEXT, then a pointer to a global,

@ -21,6 +21,7 @@
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "env/emulated_clock.h" #include "env/emulated_clock.h"
#include "logging/env_logger.h"
#include "logging/logging.h" #include "logging/logging.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -299,7 +300,7 @@ TEST_F(AutoRollLoggerTest, CompositeRollByTimeAndSizeLogger) {
} }
#ifndef OS_WIN #ifndef OS_WIN
// TODO: does not build for Windows because of PosixLogger use below. Need to // TODO: does not build for Windows because of EnvLogger use below. Need to
// port // port
TEST_F(AutoRollLoggerTest, CreateLoggerFromOptions) { TEST_F(AutoRollLoggerTest, CreateLoggerFromOptions) {
DBOptions options; DBOptions options;
@ -311,7 +312,7 @@ TEST_F(AutoRollLoggerTest, CreateLoggerFromOptions) {
// Normal logger // Normal logger
ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger)); ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger));
ASSERT_TRUE(dynamic_cast<PosixLogger*>(logger.get())); ASSERT_TRUE(dynamic_cast<EnvLogger*>(logger.get()));
// Only roll by size // Only roll by size
InitTestDb(); InitTestDb();
@ -467,20 +468,20 @@ TEST_F(AutoRollLoggerTest, LogFlushWhileRolling) {
// (1) Need to pin the old logger before beginning the roll, as rolling grabs // (1) Need to pin the old logger before beginning the roll, as rolling grabs
// the mutex, which would prevent us from accessing the old logger. This // the mutex, which would prevent us from accessing the old logger. This
// also marks flush_thread with AutoRollLogger::Flush:PinnedLogger. // also marks flush_thread with AutoRollLogger::Flush:PinnedLogger.
// (2) Need to reset logger during PosixLogger::Flush() to exercise a race // (2) Need to reset logger during EnvLogger::Flush() to exercise a race
// condition case, which is executing the flush with the pinned (old) // condition case, which is executing the flush with the pinned (old)
// logger after auto-roll logger has cut over to a new logger. // logger after auto-roll logger has cut over to a new logger.
// (3) PosixLogger::Flush() happens in both threads but its SyncPoints only // (3) EnvLogger::Flush() happens in both threads but its SyncPoints only
// are enabled in flush_thread (the one pinning the old logger). // are enabled in flush_thread (the one pinning the old logger).
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependencyAndMarkers( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
{{"AutoRollLogger::Flush:PinnedLogger", {{"AutoRollLogger::Flush:PinnedLogger",
"AutoRollLoggerTest::LogFlushWhileRolling:PreRollAndPostThreadInit"}, "AutoRollLoggerTest::LogFlushWhileRolling:PreRollAndPostThreadInit"},
{"PosixLogger::Flush:Begin1", {"EnvLogger::Flush:Begin1",
"AutoRollLogger::ResetLogger:BeforeNewLogger"}, "AutoRollLogger::ResetLogger:BeforeNewLogger"},
{"AutoRollLogger::ResetLogger:AfterNewLogger", {"AutoRollLogger::ResetLogger:AfterNewLogger",
"PosixLogger::Flush:Begin2"}}, "EnvLogger::Flush:Begin2"}},
{{"AutoRollLogger::Flush:PinnedLogger", "PosixLogger::Flush:Begin1"}, {{"AutoRollLogger::Flush:PinnedLogger", "EnvLogger::Flush:Begin1"},
{"AutoRollLogger::Flush:PinnedLogger", "PosixLogger::Flush:Begin2"}}); {"AutoRollLogger::Flush:PinnedLogger", "EnvLogger::Flush:Begin2"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
flush_thread = port::Thread([&]() { auto_roll_logger->Flush(); }); flush_thread = port::Thread([&]() { auto_roll_logger->Flush(); });

@ -12,13 +12,16 @@
#pragma once #pragma once
#include <time.h> #include <time.h>
#include <atomic> #include <atomic>
#include <memory> #include <memory>
#include "port/sys_time.h"
#include "file/writable_file_writer.h" #include "file/writable_file_writer.h"
#include "monitoring/iostats_context_imp.h" #include "monitoring/iostats_context_imp.h"
#include "port/sys_time.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/perf_level.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
@ -45,6 +48,29 @@ class EnvLogger : public Logger {
} }
private: private:
// A guard to prepare file operations, such as mutex and skip
// I/O context.
class FileOpGuard {
public:
explicit FileOpGuard(EnvLogger& logger)
: logger_(logger), prev_perf_level_(GetPerfLevel()) {
// Preserve iostats not to pollute writes from user writes. We might
// need a better solution than this.
SetPerfLevel(PerfLevel::kDisable);
iostats_context.disable_iostats = true;
logger.mutex_.Lock();
}
~FileOpGuard() {
logger_.mutex_.Unlock();
iostats_context.disable_iostats = false;
SetPerfLevel(prev_perf_level_);
}
private:
EnvLogger& logger_;
PerfLevel prev_perf_level_;
};
void FlushLocked() { void FlushLocked() {
mutex_.AssertHeld(); mutex_.AssertHeld();
if (flush_pending_) { if (flush_pending_) {
@ -58,16 +84,15 @@ class EnvLogger : public Logger {
TEST_SYNC_POINT("EnvLogger::Flush:Begin1"); TEST_SYNC_POINT("EnvLogger::Flush:Begin1");
TEST_SYNC_POINT("EnvLogger::Flush:Begin2"); TEST_SYNC_POINT("EnvLogger::Flush:Begin2");
MutexLock l(&mutex_); FileOpGuard guard(*this);
FlushLocked(); FlushLocked();
} }
Status CloseImpl() override { return CloseHelper(); } Status CloseImpl() override { return CloseHelper(); }
Status CloseHelper() { Status CloseHelper() {
mutex_.Lock(); FileOpGuard guard(*this);
const auto close_status = file_.Close(); const auto close_status = file_.Close();
mutex_.Unlock();
if (close_status.ok()) { if (close_status.ok()) {
return close_status; return close_status;
@ -105,7 +130,7 @@ class EnvLogger : public Logger {
const time_t seconds = now_tv.tv_sec; const time_t seconds = now_tv.tv_sec;
struct tm t; struct tm t;
port::LocalTimeR(&seconds, &t); port::LocalTimeR(&seconds, &t);
p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llu ",
t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec), t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec),
static_cast<long long unsigned int>(thread_id)); static_cast<long long unsigned int>(thread_id));
@ -133,7 +158,8 @@ class EnvLogger : public Logger {
} }
assert(p <= limit); assert(p <= limit);
mutex_.Lock(); {
FileOpGuard guard(*this);
// We will ignore any error returned by Append(). // We will ignore any error returned by Append().
file_.Append(Slice(base, p - base)).PermitUncheckedError(); file_.Append(Slice(base, p - base)).PermitUncheckedError();
flush_pending_ = true; flush_pending_ = true;
@ -141,7 +167,7 @@ class EnvLogger : public Logger {
if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
FlushLocked(); FlushLocked();
} }
mutex_.Unlock(); }
if (base != buffer) { if (base != buffer) {
delete[] base; delete[] base;
} }

@ -1,179 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// Logger implementation that can be shared by all environments
// where enough posix functionality is available.
#pragma once
#include <algorithm>
#include <stdio.h>
#include "port/sys_time.h"
#include <time.h>
#include <fcntl.h>
#ifdef OS_LINUX
#ifndef FALLOC_FL_KEEP_SIZE
#include <linux/falloc.h>
#endif
#endif
#include <atomic>
#include "env/io_posix.h"
#include "monitoring/iostats_context_imp.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
namespace ROCKSDB_NAMESPACE {
class PosixLogger : public Logger {
private:
Status PosixCloseHelper() {
int ret;
ret = fclose(file_);
if (ret) {
return IOError("Unable to close log file", "", ret);
}
return Status::OK();
}
FILE* file_;
uint64_t (*gettid_)(); // Return the thread id for the current thread
std::atomic_size_t log_size_;
int fd_;
const static uint64_t flush_every_seconds_ = 5;
std::atomic_uint_fast64_t last_flush_micros_;
Env* env_;
std::atomic<bool> flush_pending_;
protected:
virtual Status CloseImpl() override { return PosixCloseHelper(); }
public:
PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env,
const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)
: Logger(log_level),
file_(f),
gettid_(gettid),
log_size_(0),
fd_(fileno(f)),
last_flush_micros_(0),
env_(env),
flush_pending_(false) {}
virtual ~PosixLogger() {
if (!closed_) {
closed_ = true;
PosixCloseHelper().PermitUncheckedError();
}
}
virtual void Flush() override {
TEST_SYNC_POINT("PosixLogger::Flush:Begin1");
TEST_SYNC_POINT("PosixLogger::Flush:Begin2");
if (flush_pending_) {
flush_pending_ = false;
fflush(file_);
}
last_flush_micros_ = env_->NowMicros();
}
using Logger::Logv;
virtual void Logv(const char* format, va_list ap) override {
IOSTATS_TIMER_GUARD(logger_nanos);
const uint64_t thread_id = (*gettid_)();
// We try twice: the first time with a fixed-size stack allocated buffer,
// and the second time with a much larger dynamically allocated buffer.
char buffer[500];
for (int iter = 0; iter < 2; iter++) {
char* base;
int bufsize;
if (iter == 0) {
bufsize = sizeof(buffer);
base = buffer;
} else {
bufsize = 65536;
base = new char[bufsize];
}
char* p = base;
char* limit = base + bufsize;
port::TimeVal now_tv;
port::GetTimeOfDay(&now_tv, nullptr);
const time_t seconds = now_tv.tv_sec;
struct tm t;
port::LocalTimeR(&seconds, &t);
p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llu ",
t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec),
static_cast<long long unsigned int>(thread_id));
// Print the message
if (p < limit) {
va_list backup_ap;
va_copy(backup_ap, ap);
p += vsnprintf(p, limit - p, format, backup_ap);
va_end(backup_ap);
}
// Truncate to available space if necessary
if (p >= limit) {
if (iter == 0) {
continue; // Try again with larger buffer
} else {
p = limit - 1;
}
}
// Add newline if necessary
if (p == base || p[-1] != '\n') {
*p++ = '\n';
}
assert(p <= limit);
const size_t write_size = p - base;
#ifdef ROCKSDB_FALLOCATE_PRESENT
const int kDebugLogChunkSize = 128 * 1024;
// If this write would cross a boundary of kDebugLogChunkSize
// space, pre-allocate more space to avoid overly large
// allocations from filesystem allocsize options.
const size_t log_size = log_size_;
const size_t last_allocation_chunk =
((kDebugLogChunkSize - 1 + log_size) / kDebugLogChunkSize);
const size_t desired_allocation_chunk =
((kDebugLogChunkSize - 1 + log_size + write_size) /
kDebugLogChunkSize);
if (last_allocation_chunk != desired_allocation_chunk) {
fallocate(
fd_, FALLOC_FL_KEEP_SIZE, 0,
static_cast<off_t>(desired_allocation_chunk * kDebugLogChunkSize));
}
#endif
size_t sz = fwrite(base, 1, write_size, file_);
flush_pending_ = true;
if (sz > 0) {
log_size_ += write_size;
}
uint64_t now_micros = static_cast<uint64_t>(now_tv.tv_sec) * 1000000 +
now_tv.tv_usec;
if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
Flush();
}
if (base != buffer) {
delete[] base;
}
break;
}
}
size_t GetLogFileSize() const override { return log_size_; }
};
} // namespace ROCKSDB_NAMESPACE

@ -13,7 +13,10 @@ extern thread_local IOStatsContext iostats_context;
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
// increment a specific counter by the specified value // increment a specific counter by the specified value
#define IOSTATS_ADD(metric, value) (iostats_context.metric += value) #define IOSTATS_ADD(metric, value) \
if (!iostats_context.disable_iostats) { \
iostats_context.metric += value; \
}
// reset a specific counter to zero // reset a specific counter to zero
#define IOSTATS_RESET(metric) (iostats_context.metric = 0) #define IOSTATS_RESET(metric) (iostats_context.metric = 0)

@ -13,8 +13,6 @@
// porting to a new platform, see "port_example.h" for documentation // porting to a new platform, see "port_example.h" for documentation
// of what the new port_<platform>.h file must provide. // of what the new port_<platform>.h file must provide.
#if defined(ROCKSDB_PLATFORM_POSIX) #if defined(OS_WIN)
#include "logging/posix_logger.h"
#elif defined(OS_WIN)
#include "port/win/win_logger.h" #include "port/win/win_logger.h"
#endif #endif

Loading…
Cancel
Save