Flush the log outside of lock

Summary:
Added a new call, LogFlush(), that flushes the log contents to the OS buffers. We never call it with the lock held.

We call it once for every Read/Write and often during the compaction/flush process, so the flush frequency should not be a problem.

Test Plan: db_test

Reviewers: dhruba, haobo, kailiu, emayanke

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13935
main
Igor Canadi 11 years ago
parent fd2044883a
commit 444cf88a56
  1. 18
      db/db_impl.cc
  2. 1
      db/version_set.cc
  3. 7
      include/rocksdb/env.h
  4. 12
      util/env.cc
  5. 19
      util/posix_logger.h

@ -311,6 +311,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
} }
last_log_ts = 0; last_log_ts = 0;
LogFlush(options_.info_log);
} }
DBImpl::~DBImpl() { DBImpl::~DBImpl() {
@ -333,6 +334,7 @@ DBImpl::~DBImpl() {
if (mem_ != nullptr) mem_->Unref(); if (mem_ != nullptr) mem_->Unref();
imm_.UnrefAll(); imm_.UnrefAll();
LogFlush(options_.info_log);
} }
// Do not flush and close database elegantly. Simulate a crash. // Do not flush and close database elegantly. Simulate a crash.
@ -354,6 +356,7 @@ void DBImpl::TEST_Destroy_DBImpl() {
bg_compaction_scheduled_ += LargeNumber; bg_compaction_scheduled_ += LargeNumber;
mutex_.Unlock(); mutex_.Unlock();
LogFlush(options_.info_log);
// force release the lock file. // force release the lock file.
if (db_lock_ != nullptr) { if (db_lock_ != nullptr) {
@ -967,6 +970,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
table_cache_.get(), iter, &meta, table_cache_.get(), iter, &meta,
user_comparator(), newest_snapshot, user_comparator(), newest_snapshot,
earliest_seqno_in_memtable, true); earliest_seqno_in_memtable, true);
LogFlush(options_.info_log);
mutex_.Lock(); mutex_.Lock();
} }
@ -1034,6 +1038,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
table_cache_.get(), iter, &meta, table_cache_.get(), iter, &meta,
user_comparator(), newest_snapshot, user_comparator(), newest_snapshot,
earliest_seqno_in_memtable, enable_compression); earliest_seqno_in_memtable, enable_compression);
LogFlush(options_.info_log);
mutex_.Lock(); mutex_.Lock();
} }
base->Unref(); base->Unref();
@ -1153,6 +1158,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress) {
} }
mutex_.Unlock(); mutex_.Unlock();
DeleteLogFile(log_num); DeleteLogFile(log_num);
LogFlush(options_.info_log);
mutex_.Lock(); mutex_.Lock();
} }
} }
@ -1180,6 +1186,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end,
if (reduce_level) { if (reduce_level) {
ReFitLevel(max_level_with_files, target_level); ReFitLevel(max_level_with_files, target_level);
} }
LogFlush(options_.info_log);
} }
// return the same level if it cannot be moved // return the same level if it cannot be moved
@ -1638,6 +1645,7 @@ void DBImpl::BackgroundCallFlush() {
Log(options_.info_log, "Waiting after background flush error: %s", Log(options_.info_log, "Waiting after background flush error: %s",
s.ToString().c_str()); s.ToString().c_str());
mutex_.Unlock(); mutex_.Unlock();
LogFlush(options_.info_log);
env_->SleepForMicroseconds(1000000); env_->SleepForMicroseconds(1000000);
mutex_.Lock(); mutex_.Lock();
} }
@ -1675,6 +1683,7 @@ void DBImpl::BackgroundCallCompaction() {
Log(options_.info_log, "Waiting after background compaction error: %s", Log(options_.info_log, "Waiting after background compaction error: %s",
s.ToString().c_str()); s.ToString().c_str());
mutex_.Unlock(); mutex_.Unlock();
LogFlush(options_.info_log);
env_->SleepForMicroseconds(1000000); env_->SleepForMicroseconds(1000000);
mutex_.Lock(); mutex_.Lock();
} }
@ -1685,6 +1694,7 @@ void DBImpl::BackgroundCallCompaction() {
mutex_.Unlock(); mutex_.Unlock();
PurgeObsoleteFiles(deletion_state); PurgeObsoleteFiles(deletion_state);
EvictObsoleteFiles(deletion_state); EvictObsoleteFiles(deletion_state);
LogFlush(options_.info_log);
mutex_.Lock(); mutex_.Lock();
} }
@ -1905,6 +1915,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
compact->builder.reset( compact->builder.reset(
GetTableBuilder(options_, compact->outfile.get(), compression_type)); GetTableBuilder(options_, compact->outfile.get(), compression_type));
} }
LogFlush(options_.info_log);
return s; return s;
} }
@ -2102,6 +2113,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// TODO: remove memtable flush from normal compaction work // TODO: remove memtable flush from normal compaction work
if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros(); const uint64_t imm_start = env_->NowMicros();
LogFlush(options_.info_log);
mutex_.Lock(); mutex_.Lock();
if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
FlushMemTableToOutputFile(); FlushMemTableToOutputFile();
@ -2395,6 +2407,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
stats.bytes_written += compact->outputs[i].file_size; stats.bytes_written += compact->outputs[i].file_size;
} }
LogFlush(options_.info_log);
mutex_.Lock(); mutex_.Lock();
stats_[compact->compaction->output_level()].Add(stats); stats_[compact->compaction->output_level()].Add(stats);
@ -2479,6 +2492,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
mutex_.Unlock(); mutex_.Unlock();
LogFlush(options_.info_log);
return internal_iter; return internal_iter;
} }
@ -2553,6 +2567,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
current->Unref(); current->Unref();
mutex_.Unlock(); mutex_.Unlock();
LogFlush(options_.info_log);
// Note, tickers are atomic now - no lock protection needed any more. // Note, tickers are atomic now - no lock protection needed any more.
RecordTick(options_.statistics, NUMBER_KEYS_READ); RecordTick(options_.statistics, NUMBER_KEYS_READ);
RecordTick(options_.statistics, BYTES_READ, value->size()); RecordTick(options_.statistics, BYTES_READ, value->size());
@ -2631,6 +2646,7 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
current->Unref(); current->Unref();
mutex_.Unlock(); mutex_.Unlock();
LogFlush(options_.info_log);
RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS); RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS);
RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys); RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys);
RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead); RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead);
@ -2770,6 +2786,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
} }
SetTickerCount(options_.statistics, SEQUENCE_NUMBER, last_sequence); SetTickerCount(options_.statistics, SEQUENCE_NUMBER, last_sequence);
} }
LogFlush(options_.info_log);
mutex_.Lock(); mutex_.Lock();
if (status.ok()) { if (status.ok()) {
versions_->SetLastSequence(last_sequence); versions_->SetLastSequence(last_sequence);
@ -3358,6 +3375,7 @@ Status DBImpl::DeleteFile(std::string name) {
FindObsoleteFiles(deletion_state); FindObsoleteFiles(deletion_state);
} }
} // lock released here } // lock released here
LogFlush(options_.info_log);
if (status.ok()) { if (status.ok()) {
// remove files outside the db-lock // remove files outside the db-lock

@ -1318,6 +1318,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
// find offset in manifest file where this version is stored. // find offset in manifest file where this version is stored.
new_manifest_file_size = descriptor_log_->file()->GetFileSize(); new_manifest_file_size = descriptor_log_->file()->GetFileSize();
LogFlush(options_->info_log);
mu->Lock(); mu->Lock();
// cache the manifest_file_size so that it can be used to rollover in the // cache the manifest_file_size so that it can be used to rollover in the
// next call to LogAndApply // next call to LogAndApply

@ -497,6 +497,8 @@ class Logger {
virtual size_t GetLogFileSize() const { virtual size_t GetLogFileSize() const {
return DO_NOT_SUPPORT_GET_LOG_FILE_SIZE; return DO_NOT_SUPPORT_GET_LOG_FILE_SIZE;
} }
// Flush to the OS buffers
virtual void Flush() {}
private: private:
// No copying allowed // No copying allowed
@ -516,6 +518,9 @@ class FileLock {
void operator=(const FileLock&); void operator=(const FileLock&);
}; };
extern void LogFlush(const shared_ptr<Logger>& info_log);
// Log the specified data to *info_log if info_log is non-nullptr. // Log the specified data to *info_log if info_log is non-nullptr.
extern void Log(const shared_ptr<Logger>& info_log, const char* format, ...) extern void Log(const shared_ptr<Logger>& info_log, const char* format, ...)
# if defined(__GNUC__) || defined(__clang__) # if defined(__GNUC__) || defined(__clang__)
@ -523,6 +528,8 @@ extern void Log(const shared_ptr<Logger>& info_log, const char* format, ...)
# endif # endif
; ;
extern void LogFlush(Logger *info_log);
extern void Log(Logger* info_log, const char* format, ...) extern void Log(Logger* info_log, const char* format, ...)
# if defined(__GNUC__) || defined(__clang__) # if defined(__GNUC__) || defined(__clang__)
__attribute__((__format__ (__printf__, 2, 3))) __attribute__((__format__ (__printf__, 2, 3)))

@ -30,6 +30,12 @@ Logger::~Logger() {
FileLock::~FileLock() { FileLock::~FileLock() {
} }
// Flushes the given logger via Logger::Flush(). A null logger is
// silently ignored, so callers need not check before calling.
void LogFlush(Logger *info_log) {
  if (info_log == nullptr) {
    return;
  }
  info_log->Flush();
}
void Log(Logger* info_log, const char* format, ...) { void Log(Logger* info_log, const char* format, ...) {
if (info_log) { if (info_log) {
va_list ap; va_list ap;
@ -39,6 +45,12 @@ void Log(Logger* info_log, const char* format, ...) {
} }
} }
// shared_ptr overload: flushes the owned logger via Logger::Flush().
// An empty pointer is silently ignored.
void LogFlush(const shared_ptr<Logger>& info_log) {
  if (!info_log) {
    return;
  }
  info_log->Flush();
}
void Log(const shared_ptr<Logger>& info_log, const char* format, ...) { void Log(const shared_ptr<Logger>& info_log, const char* format, ...) {
if (info_log) { if (info_log) {
va_list ap; va_list ap;

@ -33,8 +33,8 @@ class PosixLogger : public Logger {
uint64_t (*gettid_)(); // Return the thread id for the current thread uint64_t (*gettid_)(); // Return the thread id for the current thread
std::atomic_size_t log_size_; std::atomic_size_t log_size_;
int fd_; int fd_;
const static uint64_t flush_every_seconds_ = 0; const static uint64_t flush_every_seconds_ = 5;
uint64_t last_flush_micros_; std::atomic_uint_fast64_t last_flush_micros_;
Env* env_; Env* env_;
public: public:
PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env) : PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env) :
@ -43,6 +43,10 @@ class PosixLogger : public Logger {
virtual ~PosixLogger() { virtual ~PosixLogger() {
fclose(file_); fclose(file_);
} }
// Flushes the stdio buffer of the log file and then records the time of
// this flush, which Logv consults for its periodic-flush check.
virtual void Flush() {
  fflush(file_);
  const uint64_t flushed_at_micros = env_->NowMicros();
  last_flush_micros_ = flushed_at_micros;
}
virtual void Logv(const char* format, va_list ap) { virtual void Logv(const char* format, va_list ap) {
const uint64_t thread_id = (*gettid_)(); const uint64_t thread_id = (*gettid_)();
@ -122,13 +126,14 @@ class PosixLogger : public Logger {
size_t sz = fwrite(base, 1, write_size, file_); size_t sz = fwrite(base, 1, write_size, file_);
assert(sz == write_size); assert(sz == write_size);
if (sz > 0) { if (sz > 0) {
if (env_->NowMicros() - last_flush_micros_ >=
flush_every_seconds_ * 1000000) {
fflush(file_);
last_flush_micros_ = env_->NowMicros();
}
log_size_ += write_size; log_size_ += write_size;
} }
uint64_t now_micros = static_cast<uint64_t>(now_tv.tv_sec) * 1000000 +
now_tv.tv_usec;
if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
fflush(file_);
last_flush_micros_ = now_micros;
}
if (base != buffer) { if (base != buffer) {
delete[] base; delete[] base;
} }

Loading…
Cancel
Save