Implemented a file logger that uses WritableFileWriter (#5491)
Summary: Current PosixLogger performs IO operations using posix calls. Thus the current implementation will not work for non-posix env. Created a new logger class EnvLogger that uses env specific WritableFileWriter for IO operations. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5491 Test Plan: make check Differential Revision: D15909002 Pulled By: ggaurav28 fbshipit-source-id: 13a8105176e8e42db0c59798d48cb6a0dbccc965main
parent
f786b4a5b4
commit
60d8b19836
@ -0,0 +1,165 @@ |
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
//
|
||||
// Logger implementation that uses custom Env object for logging.
|
||||
|
||||
#pragma once |
||||
|
||||
#include <atomic> |
||||
#include <memory> |
||||
#include "port/sys_time.h" |
||||
#include <time.h> |
||||
|
||||
#include "monitoring/iostats_context_imp.h" |
||||
#include "rocksdb/env.h" |
||||
#include "rocksdb/slice.h" |
||||
#include "test_util/sync_point.h" |
||||
#include "util/file_reader_writer.h" |
||||
#include "util/mutexlock.h" |
||||
|
||||
namespace rocksdb { |
||||
|
||||
class EnvLogger : public Logger { |
||||
public: |
||||
EnvLogger(std::unique_ptr<WritableFile>&& writable_file, |
||||
const std::string& fname, const EnvOptions& options, Env* env, |
||||
InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL) |
||||
: Logger(log_level), |
||||
file_(std::move(writable_file), fname, options, env), |
||||
last_flush_micros_(0), |
||||
env_(env), |
||||
flush_pending_(false) {} |
||||
|
||||
~EnvLogger() { |
||||
if (!closed_) { |
||||
closed_ = true; |
||||
CloseHelper(); |
||||
} |
||||
} |
||||
|
||||
private: |
||||
void FlushLocked() { |
||||
mutex_.AssertHeld(); |
||||
if (flush_pending_) { |
||||
flush_pending_ = false; |
||||
file_.Flush(); |
||||
} |
||||
last_flush_micros_ = env_->NowMicros(); |
||||
} |
||||
|
||||
void Flush() override { |
||||
TEST_SYNC_POINT("EnvLogger::Flush:Begin1"); |
||||
TEST_SYNC_POINT("EnvLogger::Flush:Begin2"); |
||||
|
||||
MutexLock l(&mutex_); |
||||
FlushLocked(); |
||||
} |
||||
|
||||
Status CloseImpl() override { return CloseHelper(); } |
||||
|
||||
Status CloseHelper() { |
||||
mutex_.Lock(); |
||||
const auto close_status = file_.Close(); |
||||
mutex_.Unlock(); |
||||
|
||||
if (close_status.ok()) { |
||||
return close_status; |
||||
} |
||||
return Status::IOError("Close of log file failed with error:" + |
||||
(close_status.getState() |
||||
? std::string(close_status.getState()) |
||||
: std::string())); |
||||
} |
||||
|
||||
using Logger::Logv; |
||||
void Logv(const char* format, va_list ap) override { |
||||
IOSTATS_TIMER_GUARD(logger_nanos); |
||||
|
||||
const uint64_t thread_id = env_->GetThreadID(); |
||||
|
||||
// We try twice: the first time with a fixed-size stack allocated buffer,
|
||||
// and the second time with a much larger dynamically allocated buffer.
|
||||
char buffer[500]; |
||||
for (int iter = 0; iter < 2; iter++) { |
||||
char* base; |
||||
int bufsize; |
||||
if (iter == 0) { |
||||
bufsize = sizeof(buffer); |
||||
base = buffer; |
||||
} else { |
||||
bufsize = 65536; |
||||
base = new char[bufsize]; |
||||
} |
||||
char* p = base; |
||||
char* limit = base + bufsize; |
||||
|
||||
struct timeval now_tv; |
||||
gettimeofday(&now_tv, nullptr); |
||||
const time_t seconds = now_tv.tv_sec; |
||||
struct tm t; |
||||
localtime_r(&seconds, &t); |
||||
p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", |
||||
t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, |
||||
t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec), |
||||
static_cast<long long unsigned int>(thread_id)); |
||||
|
||||
// Print the message
|
||||
if (p < limit) { |
||||
va_list backup_ap; |
||||
va_copy(backup_ap, ap); |
||||
p += vsnprintf(p, limit - p, format, backup_ap); |
||||
va_end(backup_ap); |
||||
} |
||||
|
||||
// Truncate to available space if necessary
|
||||
if (p >= limit) { |
||||
if (iter == 0) { |
||||
continue; // Try again with larger buffer
|
||||
} else { |
||||
p = limit - 1; |
||||
} |
||||
} |
||||
|
||||
// Add newline if necessary
|
||||
if (p == base || p[-1] != '\n') { |
||||
*p++ = '\n'; |
||||
} |
||||
|
||||
assert(p <= limit); |
||||
mutex_.Lock(); |
||||
// We will ignore any error returned by Append().
|
||||
file_.Append(Slice(base, p - base)); |
||||
flush_pending_ = true; |
||||
const uint64_t now_micros = env_->NowMicros(); |
||||
if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { |
||||
FlushLocked(); |
||||
} |
||||
mutex_.Unlock(); |
||||
if (base != buffer) { |
||||
delete[] base; |
||||
} |
||||
break; |
||||
} |
||||
} |
||||
|
||||
size_t GetLogFileSize() const override { |
||||
MutexLock l(&mutex_); |
||||
return file_.GetFileSize(); |
||||
} |
||||
|
||||
private: |
||||
WritableFileWriter file_; |
||||
mutable port::Mutex mutex_; // Mutex to protect the shared variables below.
|
||||
const static uint64_t flush_every_seconds_ = 5; |
||||
std::atomic_uint_fast64_t last_flush_micros_; |
||||
Env* env_; |
||||
std::atomic<bool> flush_pending_; |
||||
}; |
||||
|
||||
} // namespace rocksdb
|
@ -0,0 +1,164 @@ |
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
//
|
||||
|
||||
#include "env/mock_env.h" |
||||
#include "logging/env_logger.h" |
||||
#include "test_util/testharness.h" |
||||
#include "test_util/testutil.h" |
||||
|
||||
namespace rocksdb { |
||||
|
||||
namespace { |
||||
// In this test we only want to Log some simple log message with
|
||||
// no format.
|
||||
void LogMessage(std::shared_ptr<Logger> logger, const std::string& message) { |
||||
Log(logger, "%s", message.c_str()); |
||||
} |
||||
|
||||
// Helper method to write the message num_times in the given logger.
|
||||
void WriteLogs(std::shared_ptr<Logger> logger, const std::string& message, |
||||
int num_times) { |
||||
for (int ii = 0; ii < num_times; ++ii) { |
||||
LogMessage(logger, message); |
||||
} |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
class EnvLoggerTest : public testing::Test { |
||||
public: |
||||
Env* env_; |
||||
|
||||
EnvLoggerTest() : env_(Env::Default()) {} |
||||
|
||||
~EnvLoggerTest() = default; |
||||
|
||||
std::shared_ptr<Logger> CreateLogger() { |
||||
std::shared_ptr<Logger> result; |
||||
assert(NewEnvLogger(kLogFile, env_, &result).ok()); |
||||
assert(result); |
||||
result->SetInfoLogLevel(InfoLogLevel::INFO_LEVEL); |
||||
return result; |
||||
} |
||||
|
||||
void DeleteLogFile() { |
||||
ASSERT_OK(env_->DeleteFile(kLogFile)); |
||||
} |
||||
|
||||
static const std::string kSampleMessage; |
||||
static const std::string kTestDir; |
||||
static const std::string kLogFile; |
||||
}; |
||||
|
||||
const std::string EnvLoggerTest::kSampleMessage = |
||||
"this is the message to be written to the log file!!"; |
||||
const std::string EnvLoggerTest::kLogFile = test::PerThreadDBPath("log_file"); |
||||
|
||||
TEST_F(EnvLoggerTest, EmptyLogFile) { |
||||
auto logger = CreateLogger(); |
||||
ASSERT_EQ(logger->Close(), Status::OK()); |
||||
|
||||
// Check the size of the log file.
|
||||
uint64_t file_size; |
||||
ASSERT_EQ(env_->GetFileSize(kLogFile, &file_size), Status::OK()); |
||||
ASSERT_EQ(file_size, 0); |
||||
DeleteLogFile(); |
||||
} |
||||
|
||||
TEST_F(EnvLoggerTest, LogMultipleLines) { |
||||
auto logger = CreateLogger(); |
||||
|
||||
// Write multiple lines.
|
||||
const int kNumIter = 10; |
||||
WriteLogs(logger, kSampleMessage, kNumIter); |
||||
|
||||
// Flush the logs.
|
||||
logger->Flush(); |
||||
ASSERT_EQ(logger->Close(), Status::OK()); |
||||
|
||||
// Validate whether the log file has 'kNumIter' number of lines.
|
||||
ASSERT_EQ(test::GetLinesCount(kLogFile, kSampleMessage), kNumIter); |
||||
DeleteLogFile(); |
||||
} |
||||
|
||||
TEST_F(EnvLoggerTest, Overwrite) { |
||||
{ |
||||
auto logger = CreateLogger(); |
||||
|
||||
// Write multiple lines.
|
||||
const int kNumIter = 10; |
||||
WriteLogs(logger, kSampleMessage, kNumIter); |
||||
|
||||
ASSERT_EQ(logger->Close(), Status::OK()); |
||||
|
||||
// Validate whether the log file has 'kNumIter' number of lines.
|
||||
ASSERT_EQ(test::GetLinesCount(kLogFile, kSampleMessage), kNumIter); |
||||
} |
||||
|
||||
// Now reopen the file again.
|
||||
{ |
||||
auto logger = CreateLogger(); |
||||
|
||||
// File should be empty.
|
||||
uint64_t file_size; |
||||
ASSERT_EQ(env_->GetFileSize(kLogFile, &file_size), Status::OK()); |
||||
ASSERT_EQ(file_size, 0); |
||||
ASSERT_EQ(logger->GetLogFileSize(), 0); |
||||
ASSERT_EQ(logger->Close(), Status::OK()); |
||||
} |
||||
DeleteLogFile(); |
||||
} |
||||
|
||||
TEST_F(EnvLoggerTest, Close) { |
||||
auto logger = CreateLogger(); |
||||
|
||||
// Write multiple lines.
|
||||
const int kNumIter = 10; |
||||
WriteLogs(logger, kSampleMessage, kNumIter); |
||||
|
||||
ASSERT_EQ(logger->Close(), Status::OK()); |
||||
|
||||
// Validate whether the log file has 'kNumIter' number of lines.
|
||||
ASSERT_EQ(test::GetLinesCount(kLogFile, kSampleMessage), kNumIter); |
||||
DeleteLogFile(); |
||||
} |
||||
|
||||
TEST_F(EnvLoggerTest, ConcurrentLogging) { |
||||
auto logger = CreateLogger(); |
||||
|
||||
const int kNumIter = 20; |
||||
std::function<void()> cb = [&]() { |
||||
WriteLogs(logger, kSampleMessage, kNumIter); |
||||
logger->Flush(); |
||||
}; |
||||
|
||||
// Write to the logs from multiple threads.
|
||||
std::vector<port::Thread> threads; |
||||
const int kNumThreads = 5; |
||||
// Create threads.
|
||||
for (int ii = 0; ii < kNumThreads; ++ii) { |
||||
threads.push_back(port::Thread(cb)); |
||||
} |
||||
|
||||
// Wait for them to complete.
|
||||
for (auto& th : threads) { |
||||
th.join(); |
||||
} |
||||
|
||||
ASSERT_EQ(logger->Close(), Status::OK()); |
||||
|
||||
// Verfiy the log file.
|
||||
ASSERT_EQ(test::GetLinesCount(kLogFile, kSampleMessage), |
||||
kNumIter * kNumThreads); |
||||
DeleteLogFile(); |
||||
} |
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
Loading…
Reference in new issue