From e6acb874cd994434d6364f55dd8fb22055e1d4a2 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 15 Apr 2014 09:57:25 -0700 Subject: [PATCH] Don't roll empty logs Summary: With multiple column families, especially when manual Flush is executed, we might roll the log file, although the current log file is empty (no data has been written to the log). After the diff, we won't create new log file if current is empty. Next, I will write an algorithm that will flush column families that reference old log files (i.e., that weren't flushed in a while) Test Plan: Added an unit test. Confirmed that unit test failes in master Reviewers: dhruba, haobo, ljin, sdong Reviewed By: ljin CC: leveldb Differential Revision: https://reviews.facebook.net/D17631 --- db/column_family_test.cc | 42 +++++++++++++++++++++++-- db/db_impl.cc | 67 +++++++++++++++++++++++----------------- db/db_impl.h | 1 + db/version_set.cc | 2 +- 4 files changed, 81 insertions(+), 31 deletions(-) diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 57acb1b5c..02236c966 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -29,12 +29,31 @@ std::string RandomString(Random* rnd, int len) { } } // anonymous namespace +// counts how many operations were performed +class EnvCounter : public EnvWrapper { + public: + explicit EnvCounter(Env* base) + : EnvWrapper(base), num_new_writable_file_(0) {} + int GetNumberOfNewWritableFileCalls() { + return num_new_writable_file_; + } + Status NewWritableFile(const std::string& f, unique_ptr* r, + const EnvOptions& soptions) { + ++num_new_writable_file_; + return EnvWrapper::NewWritableFile(f, r, soptions); + } + + private: + int num_new_writable_file_; +}; + class ColumnFamilyTest { public: ColumnFamilyTest() : rnd_(139) { - env_ = Env::Default(); + env_ = new EnvCounter(Env::Default()); dbname_ = test::TmpDir() + "/column_family_test"; db_options_.create_if_missing = true; + db_options_.env = env_; DestroyDB(dbname_, Options(db_options_, column_family_options_)); } @@ -299,7 +318,7 @@ class ColumnFamilyTest { DBOptions db_options_; std::string dbname_; DB* db_ = nullptr; - Env* env_; + EnvCounter* env_; Random rnd_; }; @@ -895,6 +914,25 @@ TEST(ColumnFamilyTest, ReadOnlyDBTest) { ASSERT_TRUE(!s.ok()); } +TEST(ColumnFamilyTest, DontRollEmptyLogs) { + Open(); + CreateColumnFamiliesAndReopen({"one", "two", "three", "four"}); + + for (int i = 0; i < handles_.size(); ++i) { + PutRandomData(i, 10, 100); + } + int num_writable_file_start = env_->GetNumberOfNewWritableFileCalls(); + // this will trigger the flushes + ASSERT_OK(db_->Write(WriteOptions(), nullptr)); + + for (int i = 0; i < 4; ++i) { + dbfull()->TEST_WaitForFlushMemTable(handles_[i]); + } + int total_new_writable_files = + env_->GetNumberOfNewWritableFileCalls() - num_writable_file_start; + ASSERT_EQ(total_new_writable_files, handles_.size() + 1); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index e02df44b5..c50eb159f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -349,6 +349,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) shutting_down_(nullptr), bg_cv_(&mutex_), logfile_number_(0), + log_empty_(true), default_cf_handle_(nullptr), tmp_batch_(), bg_schedule_needed_(false), @@ -3785,6 +3786,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1); RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_entry.size()); if (status.ok() && options.sync) { + log_empty_ = false; if (options_.use_fsync) { StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS); status = log_->file()->Fsync(); @@ -4057,57 +4059,66 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) { // Attempt to switch to a new memtable and trigger flush of old. // Do this without holding the dbmutex lock. assert(versions_->PrevLogNumber() == 0); - uint64_t new_log_number = versions_->NewFileNumber(); + bool creating_new_log = !log_empty_; + uint64_t new_log_number = + creating_new_log ? versions_->NewFileNumber() : logfile_number_; SuperVersion* new_superversion = nullptr; mutex_.Unlock(); { DelayLoggingAndReset(); - s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number), - &lfile, - env_->OptimizeForLogWrite(storage_options_)); + if (creating_new_log) { + s = env_->NewWritableFile( + LogFileName(options_.wal_dir, new_log_number), &lfile, + env_->OptimizeForLogWrite(storage_options_)); + if (s.ok()) { + // Our final size should be less than write_buffer_size + // (compression, etc) but err on the side of caution. + lfile->SetPreallocationBlockSize(1.1 * + cfd->options()->write_buffer_size); + new_log = new log::Writer(std::move(lfile)); + } + } + if (s.ok()) { - // Our final size should be less than write_buffer_size - // (compression, etc) but err on the side of caution. - lfile->SetPreallocationBlockSize(1.1 * - cfd->options()->write_buffer_size); - new_log = new log::Writer(std::move(lfile)); new_mem = new MemTable(cfd->internal_comparator(), *cfd->options()); new_superversion = new SuperVersion(); } - Log(options_.info_log, - "New memtable created with log file: #%lu\n", - (unsigned long)new_log_number); } mutex_.Lock(); if (!s.ok()) { + // how do we fail if we're not creating new log? + assert(creating_new_log); // Avoid chewing through file number space in a tight loop. versions_->ReuseFileNumber(new_log_number); assert(!new_mem); assert(!new_log); break; } - logfile_number_ = new_log_number; - assert(new_log != nullptr); - // TODO(icanadi) delete outside of mutex - delete log_.release(); - log_.reset(new_log); + if (creating_new_log) { + logfile_number_ = new_log_number; + assert(new_log != nullptr); + // TODO(icanadi) delete outside of mutex + delete log_.release(); + log_.reset(new_log); + log_empty_ = true; + alive_log_files_.push_back(logfile_number_); + for (auto cfd : *versions_->GetColumnFamilySet()) { + // all this is just optimization to delete logs that + // are no longer needed -- if CF is empty, that means it + // doesn't need that particular log to stay alive, so we just + // advance the log number. no need to persist this in the manifest + if (cfd->mem()->GetFirstSequenceNumber() == 0 && + cfd->imm()->size() == 0) { + cfd->SetLogNumber(logfile_number_); + } + } + } cfd->mem()->SetNextLogNumber(logfile_number_); cfd->imm()->Add(cfd->mem()); if (force) { cfd->imm()->FlushRequested(); } new_mem->Ref(); - alive_log_files_.push_back(logfile_number_); - for (auto cfd : *versions_->GetColumnFamilySet()) { - // all this is just optimization to delete logs that - // are no longer needed -- if CF is empty, that means it - // doesn't need that particular log to stay alive, so we just - // advance the log number. no need to persist this in the manifest - if (cfd->mem()->GetFirstSequenceNumber() == 0 && - cfd->imm()->size() == 0) { - cfd->SetLogNumber(logfile_number_); - } - } cfd->SetMemtable(new_mem); Log(options_.info_log, "[CF %" PRIu32 "] New memtable created with log file: #%lu\n", diff --git a/db/db_impl.h b/db/db_impl.h index c4efb919e..e0413a748 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -421,6 +421,7 @@ class DBImpl : public DB { port::CondVar bg_cv_; // Signalled when background work finishes uint64_t logfile_number_; unique_ptr log_; + bool log_empty_; ColumnFamilyHandleImpl* default_cf_handle_; unique_ptr column_family_memtables_; std::deque alive_log_files_; diff --git a/db/version_set.cc b/db/version_set.cc index 9cf63cb81..2c3121752 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1742,7 +1742,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, } } if (max_log_number_in_batch != 0) { - assert(column_family_data->GetLogNumber() < max_log_number_in_batch); + assert(column_family_data->GetLogNumber() <= max_log_number_in_batch); column_family_data->SetLogNumber(max_log_number_in_batch); } AppendVersion(column_family_data, v);