[RocksDB] Fix PosixLogger and AutoRollLogger thread safety

Summary:
PosixLogger and AutoRollLogger do not seem to be thread safe.
For PosixLogger, log_size_ is not atomically updated.
For AutoRollLogger, the underlying logger_ might be deleted by
one thread while still being accessed by another.

Test Plan: make check

Reviewers: kailiu, dhruba, heyongqiang

Reviewed By: kailiu

CC: leveldb, zshao, sheki

Differential Revision: https://reviews.facebook.net/D9699
main
Haobo Xu 12 years ago
parent 15ccd10c7f
commit 839f6db77e
  1. 22
      util/auto_roll_logger.cc
  2. 7
      util/auto_roll_logger.h
  3. 6
      util/auto_roll_logger_test.cc
  4. 9
      util/posix_logger.h

@ -1,4 +1,5 @@
#include "util/auto_roll_logger.h" #include "util/auto_roll_logger.h"
#include "util/mutexlock.h"
using namespace std; using namespace std;
@ -35,17 +36,26 @@ void AutoRollLogger::RollLogFile() {
void AutoRollLogger::Logv(const char* format, va_list ap) { void AutoRollLogger::Logv(const char* format, va_list ap) {
assert(GetStatus().ok()); assert(GetStatus().ok());
if (kLogFileTimeToRoll > 0 && LogExpired()) { std::shared_ptr<Logger> logger;
{
MutexLock l(&mutex_);
if ((kLogFileTimeToRoll > 0 && LogExpired()) ||
(kMaxLogFileSize > 0 && logger_->GetLogFileSize() >= kMaxLogFileSize)) {
RollLogFile(); RollLogFile();
ResetLogger(); ResetLogger();
} }
logger_->Logv(format, ap); // pin down the current logger_ instance before releasing the mutex.
logger = logger_;
if (kMaxLogFileSize > 0 && logger_->GetLogFileSize() > kMaxLogFileSize) {
RollLogFile();
ResetLogger();
} }
// Another thread could have put a new Logger instance into logger_ by now.
// However, since logger is still hanging on to the previous instance
// (reference count is not zero), we don't have to worry about it being
// deleted while we are accessing it.
// Note that logv itself is not mutex protected to allow maximum concurrency,
// as thread safety should have been handled by the underlying logger.
logger->Logv(format, ap);
} }
bool AutoRollLogger::LogExpired() { bool AutoRollLogger::LogExpired() {

@ -8,8 +8,9 @@
#ifndef STORAGE_LEVELDB_UTIL_AUTO_ROLL_LOGGER_H #ifndef STORAGE_LEVELDB_UTIL_AUTO_ROLL_LOGGER_H
#define STORAGE_LEVELDB_UTIL_AUTO_ROLL_LOGGER_H #define STORAGE_LEVELDB_UTIL_AUTO_ROLL_LOGGER_H
#include "util/posix_logger.h"
#include "db/filename.h" #include "db/filename.h"
#include "port/port.h"
#include "util/posix_logger.h"
namespace leveldb { namespace leveldb {
@ -29,7 +30,8 @@ class AutoRollLogger : public Logger {
cached_now(static_cast<uint64_t>(env_->NowMicros() * 1e-6)), cached_now(static_cast<uint64_t>(env_->NowMicros() * 1e-6)),
ctime_(cached_now), ctime_(cached_now),
cached_now_access_count(0), cached_now_access_count(0),
call_NowMicros_every_N_records_(100) { call_NowMicros_every_N_records_(100),
mutex_() {
env->GetAbsolutePath(dbname, &db_absolute_path_); env->GetAbsolutePath(dbname, &db_absolute_path_);
log_fname_ = InfoLogFileName(dbname_, db_absolute_path_, db_log_dir_); log_fname_ = InfoLogFileName(dbname_, db_absolute_path_, db_log_dir_);
RollLogFile(); RollLogFile();
@ -75,6 +77,7 @@ class AutoRollLogger : public Logger {
uint64_t ctime_; uint64_t ctime_;
uint64_t cached_now_access_count; uint64_t cached_now_access_count;
uint64_t call_NowMicros_every_N_records_; uint64_t call_NowMicros_every_N_records_;
port::Mutex mutex_;
}; };
// Facade to craete logger automatically // Facade to craete logger automatically

@ -78,7 +78,11 @@ void AutoRollLoggerTest::RollLogFileBySizeTest(AutoRollLogger* logger,
// Now the log file will be rolled // Now the log file will be rolled
LogMessage(logger, log_message.c_str()); LogMessage(logger, log_message.c_str());
ASSERT_TRUE(0 == logger->GetLogFileSize()); // Since rotation is checked before actual logging, we need to
// trigger the rotation by logging another message.
LogMessage(logger, log_message.c_str());
ASSERT_TRUE(message_size == logger->GetLogFileSize());
} }
uint64_t AutoRollLoggerTest::RollLogFileByTimeTest( uint64_t AutoRollLoggerTest::RollLogFileByTimeTest(

@ -18,6 +18,7 @@
#include <linux/falloc.h> #include <linux/falloc.h>
#endif #endif
#include "leveldb/env.h" #include "leveldb/env.h"
#include <atomic>
namespace leveldb { namespace leveldb {
@ -27,8 +28,7 @@ class PosixLogger : public Logger {
private: private:
FILE* file_; FILE* file_;
uint64_t (*gettid_)(); // Return the thread id for the current thread uint64_t (*gettid_)(); // Return the thread id for the current thread
std::atomic_size_t log_size_;
size_t log_size_;
int fd_; int fd_;
public: public:
PosixLogger(FILE* f, uint64_t (*gettid)()) : PosixLogger(FILE* f, uint64_t (*gettid)()) :
@ -100,10 +100,11 @@ class PosixLogger : public Logger {
// If this write would cross a boundary of kDebugLogChunkSize // If this write would cross a boundary of kDebugLogChunkSize
// space, pre-allocate more space to avoid overly large // space, pre-allocate more space to avoid overly large
// allocations from filesystem allocsize options. // allocations from filesystem allocsize options.
const size_t log_size = log_size_;
const int last_allocation_chunk = const int last_allocation_chunk =
((kDebugLogChunkSize - 1 + log_size_) / kDebugLogChunkSize); ((kDebugLogChunkSize - 1 + log_size) / kDebugLogChunkSize);
const int desired_allocation_chunk = const int desired_allocation_chunk =
((kDebugLogChunkSize - 1 + log_size_ + write_size) / ((kDebugLogChunkSize - 1 + log_size + write_size) /
kDebugLogChunkSize); kDebugLogChunkSize);
if (last_allocation_chunk != desired_allocation_chunk) { if (last_allocation_chunk != desired_allocation_chunk) {
fallocate(fd_, FALLOC_FL_KEEP_SIZE, 0, fallocate(fd_, FALLOC_FL_KEEP_SIZE, 0,

Loading…
Cancel
Save