Allow the logs to be purged by TTL.

Summary:
* Add a SplitByTTLLogger to enable this feature. In this diff I implemented generalized AutoSplitLoggerBase class to simplify the
development of such classes.
* Refactor the existing AutoSplitLogger and fix several bugs.

Test Plan:
* Added a unit tests for different types of "auto splitable" loggers individually.
* Tested the composited logger which allows the log files to be splitted by both TTL and log size.

Reviewers: heyongqiang, dhruba

Reviewed By: heyongqiang

CC: zshao, leveldb

Differential Revision: https://reviews.facebook.net/D8037
main
Kai Liu 12 years ago
parent 19012c2e28
commit b63aafce42
  1. 5
      Makefile
  2. 44
      db/db_impl.cc
  3. 3
      db/filename.cc
  4. 10
      include/leveldb/options.h
  5. 93
      util/auto_roll_logger.cc
  6. 90
      util/auto_roll_logger.h
  7. 305
      util/auto_roll_logger_test.cc
  8. 82
      util/auto_split_logger.h
  9. 4
      util/options.cc

@ -55,8 +55,10 @@ TESTS = \
version_set_test \
reduce_levels_test \
write_batch_test \
auto_roll_logger_test \
filelock_test
TOOLS = \
manifest_dump \
sst_dump \
@ -217,6 +219,9 @@ manifest_dump: tools/manifest_dump.o $(LIBOBJECTS)
filelock_test: util/filelock_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/filelock_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
auto_roll_logger_test: util/auto_roll_logger_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/auto_roll_logger_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
sst_dump: tools/sst_dump.o $(LIBOBJECTS)
$(CXX) tools/sst_dump.o $(LIBOBJECTS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)

@ -38,39 +38,12 @@
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/build_version.h"
#include "util/auto_split_logger.h"
#include "util/auto_roll_logger.h"
namespace leveldb {
void dumpLeveldbBuildVersion(Logger * log);
static Status NewLogger(const std::string& dbname,
const std::string& db_log_dir,
Env* env,
size_t max_log_file_size,
shared_ptr<Logger>* logger) {
std::string db_absolute_path;
env->GetAbsolutePath(dbname, &db_absolute_path);
if (max_log_file_size > 0) { // need to auto split the log file?
auto logger_ptr =
new AutoSplitLogger<Logger>(env, dbname, db_log_dir, max_log_file_size);
logger->reset(logger_ptr);
Status s = logger_ptr->GetStatus();
if (!s.ok()) {
logger->reset();
}
return s;
} else {
// Open a log file in the same directory as the db
env->CreateDir(dbname); // In case it does not exist
std::string fname = InfoLogFileName(dbname, db_absolute_path, db_log_dir);
env->RenameFile(fname, OldInfoLogFileName(dbname, env->NowMicros(),
db_absolute_path, db_log_dir));
return env->NewLogger(fname, logger);
}
}
// Information kept for every waiting writer
struct DBImpl::Writer {
Status status;
@ -148,8 +121,8 @@ Options SanitizeOptions(const std::string& dbname,
ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
ClipToRange(&result.block_size, 1<<10, 4<<20);
if (result.info_log == NULL) {
Status s = NewLogger(dbname, result.db_log_dir, src.env,
result.max_log_file_size, &result.info_log);
Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env,
result, &result.info_log);
if (!s.ok()) {
// No place suitable for logging
result.info_log = NULL;
@ -426,11 +399,14 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
}
// Delete old log files.
int old_log_file_count = old_log_files.size();
if (old_log_file_count >= KEEP_LOG_FILE_NUM &&
!options_.db_log_dir.empty()) {
size_t old_log_file_count = old_log_files.size();
// NOTE: Currently we only support log purge when options_.db_log_dir is
// located in `dbname` directory.
if (old_log_file_count >= options_.keep_log_file_num &&
options_.db_log_dir.empty()) {
std::sort(old_log_files.begin(), old_log_files.end());
for (int i = 0; i >= (old_log_file_count - KEEP_LOG_FILE_NUM); i++) {
size_t end = old_log_file_count - options_.keep_log_file_num;
for (int i = 0; i <= end; i++) {
std::string& to_delete = old_log_files.at(i);
// Log(options_.info_log, "Delete type=%d %s\n",
// int(kInfoLogFile), to_delete.c_str());

@ -145,7 +145,8 @@ bool ParseFileName(const std::string& fname,
*type = kInfoLogFile;
} else if (rest.starts_with("LOG.old.")) {
uint64_t ts_suffix;
rest.remove_prefix(sizeof("LOG.old."));
// sizeof also counts the trailing '\0'.
rest.remove_prefix(sizeof("LOG.old.") - 1);
if (!ConsumeDecimalNumber(&rest, &ts_suffix)) {
return false;
}

@ -310,6 +310,16 @@ struct Options {
// log file.
size_t max_log_file_size;
// Time for the info log file to roll (in seconds).
// If specified with non-zero value, log file will be rolled
// if it has been active longer than `log_file_time_to_roll`.
// Default: 0 (disabled)
size_t log_file_time_to_roll;
// Maximal info log files to be kept.
// Default: 1000
size_t keep_log_file_num;
// Puts are delayed when any level has a compaction score that
// exceeds rate_limit. This is ignored when <= 1.0.
double rate_limit;

@ -0,0 +1,93 @@
#include "util/auto_roll_logger.h"
using namespace std;
namespace leveldb {
// -- AutoRollLogger
Status AutoRollLogger::ResetLogger() {
status_ = env_->NewLogger(log_fname_, &logger_);
if (!status_.ok()) {
return status_;
}
if (logger_->GetLogFileSize() ==
(size_t)Logger::DO_NOT_SUPPORT_GET_LOG_FILE_SIZE) {
status_ = Status::NotSupported(
"The underlying logger doesn't support GetLogFileSize()");
}
if (status_.ok()) {
cached_now = static_cast<uint64_t>(env_->NowMicros() * 1e-6);
ctime_ = cached_now;
cached_now_access_count = 0;
}
return status_;
}
void AutoRollLogger::RollLogFile() {
std::string old_fname = OldInfoLogFileName(
dbname_, env_->NowMicros(), db_absolute_path_, db_log_dir_);
env_->RenameFile(log_fname_, old_fname);
}
void AutoRollLogger::Logv(const char* format, va_list ap) {
assert(GetStatus().ok());
if (kLogFileTimeToRoll > 0 && LogExpired()) {
RollLogFile();
ResetLogger();
}
logger_->Logv(format, ap);
if (kMaxLogFileSize > 0 && logger_->GetLogFileSize() > kMaxLogFileSize) {
RollLogFile();
ResetLogger();
}
}
bool AutoRollLogger::LogExpired() {
if (cached_now_access_count >= call_NowMicros_every_N_records_) {
cached_now = static_cast<uint64_t>(env_->NowMicros() * 1e-6);
cached_now_access_count = 0;
}
++cached_now_access_count;
return cached_now >= ctime_ + kLogFileTimeToRoll;
}
Status CreateLoggerFromOptions(
const std::string& dbname,
const std::string& db_log_dir,
Env* env,
const Options& options,
std::shared_ptr<Logger>* logger) {
std::string db_absolute_path;
env->GetAbsolutePath(dbname, &db_absolute_path);
std::string fname = InfoLogFileName(dbname, db_absolute_path, db_log_dir);
// Currently we only support roll by time-to-roll and log size
if (options.log_file_time_to_roll > 0 || options.max_log_file_size > 0) {
AutoRollLogger* result = new AutoRollLogger(
env, dbname, db_log_dir,
options.max_log_file_size,
options.log_file_time_to_roll);
Status s = result->GetStatus();
if (!s.ok()) {
delete result;
} else {
logger->reset(result);
}
return s;
} else {
// Open a log file in the same directory as the db
env->CreateDir(dbname); // In case it does not exist
env->RenameFile(fname, OldInfoLogFileName(dbname, env->NowMicros(),
db_absolute_path, db_log_dir));
return env->NewLogger(fname, logger);
}
}
} // namespace leveldb

@ -0,0 +1,90 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// Logger implementation that can be shared by all environments
// where enough posix functionality is available.
#ifndef STORAGE_LEVELDB_UTIL_AUTO_ROLL_LOGGER_H
#define STORAGE_LEVELDB_UTIL_AUTO_ROLL_LOGGER_H
#include "util/posix_logger.h"
#include "db/filename.h"
namespace leveldb {
// Rolls the log file by size and/or time
class AutoRollLogger : public Logger {
public:
AutoRollLogger(Env* env, const std::string& dbname,
const std::string& db_log_dir,
size_t log_max_size,
size_t log_file_time_to_roll):
dbname_(dbname),
db_log_dir_(db_log_dir),
env_(env),
status_(Status::OK()),
kMaxLogFileSize(log_max_size),
kLogFileTimeToRoll(log_file_time_to_roll),
cached_now(static_cast<uint64_t>(env_->NowMicros() * 1e-6)),
ctime_(cached_now),
cached_now_access_count(0),
call_NowMicros_every_N_records_(100) {
env->GetAbsolutePath(dbname, &db_absolute_path_);
log_fname_ = InfoLogFileName(dbname_, db_absolute_path_, db_log_dir_);
RollLogFile();
ResetLogger();
}
void Logv(const char* format, va_list ap);
// check if the logger has encountered any problem.
Status GetStatus() {
return status_;
}
size_t GetLogFileSize() const {
return logger_->GetLogFileSize();
}
virtual ~AutoRollLogger() {
}
void SetCallNowMicrosEveryNRecords(uint64_t call_NowMicros_every_N_records) {
call_NowMicros_every_N_records_ = call_NowMicros_every_N_records;
}
private:
bool LogExpired();
Status ResetLogger();
void RollLogFile();
std::string log_fname_; // Current active info log's file name.
std::string dbname_;
std::string db_log_dir_;
std::string db_absolute_path_;
Env* env_;
std::shared_ptr<Logger> logger_;
// current status of the logger
Status status_;
const size_t kMaxLogFileSize;
const size_t kLogFileTimeToRoll;
// to avoid frequent env->NowMicros() calls, we cached the current time
uint64_t cached_now;
uint64_t ctime_;
uint64_t cached_now_access_count;
uint64_t call_NowMicros_every_N_records_;
};
// Facade to craete logger automatically
Status CreateLoggerFromOptions(
const std::string& dbname,
const std::string& db_log_dir,
Env* env,
const Options& options,
std::shared_ptr<Logger>* logger);
} // namespace leveldb
#endif // STORAGE_LEVELDB_UTIL_AUTO_ROLL_LOGGER_H

@ -0,0 +1,305 @@
// Copyright (c) 2012 Facebook. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <string>
#include <cmath>
#include "util/testharness.h"
#include "util/auto_roll_logger.h"
#include "leveldb/db.h"
#include <sys/stat.h>
#include <errno.h>
#include <iostream>
using namespace std;
namespace leveldb {
// annoymous namespace for test facilities.
namespace {
// This is a fake logger that keeps counting
// the size of logged messages.
class MockLogger: public Logger {
public:
MockLogger(): log_size_(0) { }
// In our simple cases, we only use `format` parameter.
void Logv(const char* format, va_list ap) {
log_size_ += strlen(format);
}
virtual size_t GetLogFileSize() const {
return log_size_;
}
private:
size_t log_size_;
};
// A fake Env class that can create MockLogger instance.
class MockEnv: public EnvWrapper {
public:
static Env* MakeMockEnv() {
Env* target = Env::Default();
return new MockEnv(target);
}
~MockEnv() {
}
virtual Status NewLogger(const std::string& fname, shared_ptr<Logger>* result) {
result->reset(new MockLogger());
return Status::OK();
}
virtual Status RenameFile(const std::string& src,
const std::string& target) {
// do nothing
return Status::OK();
}
private:
MockEnv(Env* target): EnvWrapper(target), target_(target) { }
Env* target_;
};
} // end anonymous namespace
class AutoRollLoggerTest {
public:
static void InitTestDb() {
string deleteCmd = "rm -rf " + kTestDir;
system(deleteCmd.c_str());
Env::Default()->CreateDir(kTestDir);
}
void RollLogFileBySizeTest(AutoRollLogger* logger,
size_t log_max_size,
const string& log_message);
uint64_t RollLogFileByTimeTest(AutoRollLogger* logger,
size_t time,
const string& log_message);
static const string kSampleMessage;
static const string kTestDir;
static const string kLogFile;
static Env* env;
static Env* mockEnv;
};
const string AutoRollLoggerTest::kSampleMessage(
"this is the message to be written to the log file!!");
const string AutoRollLoggerTest::kTestDir(
test::TmpDir() + "/db_log_test");
const string AutoRollLoggerTest::kLogFile(
test::TmpDir() + "/db_log_test/LOG");
Env* AutoRollLoggerTest::mockEnv = MockEnv::MakeMockEnv();
Env* AutoRollLoggerTest::env = Env::Default();
void GetFileCreateTime(const std::string& fname, uint64_t* file_ctime) {
struct stat s;
if (stat(fname.c_str(), &s) != 0) {
*file_ctime = (uint64_t)0;
}
*file_ctime = static_cast<uint64_t>(s.st_ctime);
}
void AutoRollLoggerTest::RollLogFileBySizeTest(AutoRollLogger* logger,
size_t log_max_size,
const string& log_message) {
// measure the size of each message, which is supposed
// to be equal or greater than log_message.size()
Log(logger, log_message.c_str());
size_t message_size = logger->GetLogFileSize();
size_t current_log_size = message_size;
// Test the cases when the log file will not be rolled.
while (current_log_size + message_size < log_max_size) {
Log(logger, log_message.c_str());
current_log_size += message_size;
ASSERT_EQ(current_log_size, logger->GetLogFileSize());
}
// Now the log file will be rolled
Log(logger, log_message.c_str());
ASSERT_TRUE(0 == logger->GetLogFileSize());
}
uint64_t AutoRollLoggerTest::RollLogFileByTimeTest(
AutoRollLogger* logger, size_t time, const string& log_message) {
uint64_t expected_create_time;
uint64_t actual_create_time;
uint64_t total_log_size;
ASSERT_OK(env->GetFileSize(kLogFile, &total_log_size));
GetFileCreateTime(kLogFile, &expected_create_time);
logger->SetCallNowMicrosEveryNRecords(0);
// -- Write to the log for several times, which is supposed
// to be finished before time.
for (int i = 0; i < 10; ++i) {
Log(logger, log_message.c_str());
ASSERT_OK(logger->GetStatus());
// Make sure we always write to the same log file (by
// checking the create time);
GetFileCreateTime(kLogFile, &actual_create_time);
// Also make sure the log size is increasing.
ASSERT_EQ(expected_create_time, actual_create_time);
ASSERT_GT(logger->GetLogFileSize(), total_log_size);
total_log_size = logger->GetLogFileSize();
}
// -- Make the log file expire
sleep(time);
Log(logger, log_message.c_str());
// At this time, the new log file should be created.
GetFileCreateTime(kLogFile, &actual_create_time);
ASSERT_GT(actual_create_time, expected_create_time);
ASSERT_LT(logger->GetLogFileSize(), total_log_size);
expected_create_time = actual_create_time;
return expected_create_time;
}
TEST(AutoRollLoggerTest, RollLogFileBySize) {
size_t log_max_size = 1024;
AutoRollLogger* logger = new AutoRollLogger(
mockEnv, kTestDir, "", log_max_size, 0);
RollLogFileBySizeTest(logger, log_max_size,
kSampleMessage + ":RollLogFileBySize");
delete logger;
}
TEST(AutoRollLoggerTest, RollLogFileByTime) {
size_t time = 1;
size_t log_size = 1024 * 5;
InitTestDb();
// -- Test the existence of file during the server restart.
ASSERT_TRUE(!env->FileExists(kLogFile));
AutoRollLogger* logger = new AutoRollLogger(
Env::Default(), kTestDir, "", log_size, 1);
ASSERT_TRUE(env->FileExists(kLogFile));
RollLogFileByTimeTest(logger, time, kSampleMessage + ":RollLogFileByTime");
delete logger;
}
TEST(AutoRollLoggerTest,
OpenLogFilesMultipleTimesWithOptionLog_max_size) {
// If only 'log_max_size' options is specified, then every time
// when leveldb is restarted, a new empty log file will be created.
InitTestDb();
// WORKAROUND:
// avoid complier's complaint of "comparison between signed
// and unsigned integer expressions" because literal 0 is
// treated as "singed".
size_t kZero = 0;
size_t log_size = 1024;
AutoRollLogger* logger = new AutoRollLogger(
Env::Default(), kTestDir, "", log_size, 0);
Log(logger, kSampleMessage.c_str());
ASSERT_GT(logger->GetLogFileSize(), kZero);
delete logger;
// reopens the log file and an empty log file will be created.
logger = new AutoRollLogger(
Env::Default(), kTestDir, "", log_size, 0);
ASSERT_EQ(logger->GetLogFileSize(), kZero);
delete logger;
}
TEST(AutoRollLoggerTest, CompositeRollByTimeAndSizeLogger) {
size_t time = 1, log_max_size = 1024 * 5;
InitTestDb();
AutoRollLogger* logger = new AutoRollLogger(
Env::Default(), kTestDir, "", log_max_size, time);
// Test the ability to roll by size
RollLogFileBySizeTest(
logger, log_max_size,
kSampleMessage + ":CompositeRollByTimeAndSizeLogger");
// Test the ability to roll by Time
RollLogFileByTimeTest( logger, time,
kSampleMessage + ":CompositeRollByTimeAndSizeLogger");
}
TEST(AutoRollLoggerTest, CreateLoggerFromOptions) {
Options options;
shared_ptr<Logger> logger;
// Normal logger
ASSERT_OK(CreateLoggerFromOptions(kTestDir, "", env, options, &logger));
ASSERT_TRUE(dynamic_cast<PosixLogger*>(logger.get()) != NULL);
// Only roll by size
InitTestDb();
options.max_log_file_size = 1024;
ASSERT_OK(CreateLoggerFromOptions(kTestDir, "", env, options, &logger));
AutoRollLogger* auto_roll_logger =
dynamic_cast<AutoRollLogger*>(logger.get());
ASSERT_TRUE(auto_roll_logger != NULL);
RollLogFileBySizeTest(
auto_roll_logger, options.max_log_file_size,
kSampleMessage + ":CreateLoggerFromOptions - size");
// Only roll by Time
InitTestDb();
options.max_log_file_size = 0;
options.log_file_time_to_roll = 1;
ASSERT_OK(CreateLoggerFromOptions(kTestDir, "", env, options, &logger));
auto_roll_logger =
dynamic_cast<AutoRollLogger*>(logger.get());
RollLogFileByTimeTest(
auto_roll_logger, options.log_file_time_to_roll,
kSampleMessage + ":CreateLoggerFromOptions - time");
// roll by both Time and size
InitTestDb();
options.max_log_file_size = 1024 * 5;
options.log_file_time_to_roll = 1;
ASSERT_OK(CreateLoggerFromOptions(kTestDir, "", env, options, &logger));
auto_roll_logger =
dynamic_cast<AutoRollLogger*>(logger.get());
RollLogFileBySizeTest(
auto_roll_logger, options.max_log_file_size,
kSampleMessage + ":CreateLoggerFromOptions - both");
RollLogFileByTimeTest(
auto_roll_logger, options.log_file_time_to_roll,
kSampleMessage + ":CreateLoggerFromOptions - both");
}
int OldLogFileCount(const string& dir) {
std::vector<std::string> files;
Env::Default()->GetChildren(dir, &files);
int log_file_count = 0;
for (std::vector<std::string>::iterator it = files.begin();
it != files.end(); ++it) {
uint64_t create_time;
FileType type;
if (!ParseFileName(*it, &create_time, &type)) {
continue;
}
if (type == kInfoLogFile && create_time > 0) {
++log_file_count;
}
}
return log_file_count;
}
} // namespace leveldb
int main(int argc, char** argv) {
return leveldb::test::RunAllTests();
}

@ -1,82 +0,0 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// Logger implementation that can be shared by all environments
// where enough posix functionality is available.
#ifndef STORAGE_LEVELDB_UTIL_SPLIT_LOGGER_LOGGER_H_
#define STORAGE_LEVELDB_UTIL_SPLIT_LOGGER_LOGGER_H_
#include "util/posix_logger.h"
#include "db/filename.h"
namespace leveldb {
// AutoSplitLogger can automatically create a new log file
// if the file size exceeds the limit.
//
// The template parameter "UnderlyingLogger" can be any Logger class
// that has the method "GetLogFileSize()" and "ResetFile()"
template<class UnderlyingLogger>
class AutoSplitLogger : public Logger {
private:
std::string log_fname_; // Current active info log's file name.
std::string dbname_;
std::string db_log_dir_;
std::string db_absolute_path_;
Env* env_;
shared_ptr<UnderlyingLogger> logger_;
const size_t MAX_LOG_FILE_SIZE;
Status status_;
public:
AutoSplitLogger(Env* env, const std::string& dbname,
const std::string& db_log_dir, size_t log_max_size):
dbname_(dbname),
db_log_dir_(db_log_dir),
env_(env),
MAX_LOG_FILE_SIZE(log_max_size),
status_(Status::OK()) {
env->GetAbsolutePath(dbname, &db_absolute_path_);
log_fname_ = InfoLogFileName(dbname_, db_absolute_path_, db_log_dir_);
InitLogger();
}
virtual void Logv(const char* format, va_list ap) {
assert(GetStatus().ok());
logger_->Logv(format, ap);
// Check if the log file should be splitted.
if (logger_->GetLogFileSize() > MAX_LOG_FILE_SIZE) {
std::string old_fname = OldInfoLogFileName(
dbname_, env_->NowMicros(), db_absolute_path_, db_log_dir_);
env_->RenameFile(log_fname_, old_fname);
InitLogger();
}
}
// check if the logger has any problem.
Status GetStatus() {
return status_;
}
private:
Status InitLogger() {
status_ = env_->NewLogger(log_fname_, &logger_);
if (!status_.ok()) {
logger_ = NULL;
}
if (logger_->GetLogFileSize() ==
(size_t)Logger::DO_NOT_SUPPORT_GET_LOG_FILE_SIZE) {
status_ = Status::NotSupported(
"The underlying logger doesn't support GetLogFileSize()");
}
return status_;
}
}; // class AutoSplitLogger
} // namespace leveldb
#endif // STORAGE_LEVELDB_UTIL_SPLIT_LOGGER_LOGGER_H_

@ -48,6 +48,8 @@ Options::Options()
delete_obsolete_files_period_micros(0),
max_background_compactions(1),
max_log_file_size(0),
log_file_time_to_roll(0),
keep_log_file_num(1000),
rate_limit(0.0),
max_manifest_file_size(std::numeric_limits<uint64_t>::max()),
no_block_cache(false),
@ -95,6 +97,8 @@ Options::Dump(Logger* log) const
Log(log," Options.max_log_file_size: %ld", max_log_file_size);
Log(log,"Options.max_manifest_file_size: %ld",
max_manifest_file_size);
Log(log," Options.log_file_time_to_roll: %ld", log_file_time_to_roll);
Log(log," Options.keep_log_file_num: %ld", keep_log_file_num);
Log(log," Options.db_stats_log_interval: %d",
db_stats_log_interval);
Log(log," Options.compression_opts.window_bits: %d",

Loading…
Cancel
Save