diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 57acb1b5c..02236c966 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -29,12 +29,31 @@ std::string RandomString(Random* rnd, int len) { } } // anonymous namespace +// counts how many operations were performed +class EnvCounter : public EnvWrapper { + public: + explicit EnvCounter(Env* base) + : EnvWrapper(base), num_new_writable_file_(0) {} + int GetNumberOfNewWritableFileCalls() { + return num_new_writable_file_; + } + Status NewWritableFile(const std::string& f, unique_ptr* r, + const EnvOptions& soptions) { + ++num_new_writable_file_; + return EnvWrapper::NewWritableFile(f, r, soptions); + } + + private: + int num_new_writable_file_; +}; + class ColumnFamilyTest { public: ColumnFamilyTest() : rnd_(139) { - env_ = Env::Default(); + env_ = new EnvCounter(Env::Default()); dbname_ = test::TmpDir() + "/column_family_test"; db_options_.create_if_missing = true; + db_options_.env = env_; DestroyDB(dbname_, Options(db_options_, column_family_options_)); } @@ -299,7 +318,7 @@ class ColumnFamilyTest { DBOptions db_options_; std::string dbname_; DB* db_ = nullptr; - Env* env_; + EnvCounter* env_; Random rnd_; }; @@ -895,6 +914,25 @@ TEST(ColumnFamilyTest, ReadOnlyDBTest) { ASSERT_TRUE(!s.ok()); } +TEST(ColumnFamilyTest, DontRollEmptyLogs) { + Open(); + CreateColumnFamiliesAndReopen({"one", "two", "three", "four"}); + + for (int i = 0; i < handles_.size(); ++i) { + PutRandomData(i, 10, 100); + } + int num_writable_file_start = env_->GetNumberOfNewWritableFileCalls(); + // this will trigger the flushes + ASSERT_OK(db_->Write(WriteOptions(), nullptr)); + + for (int i = 0; i < 4; ++i) { + dbfull()->TEST_WaitForFlushMemTable(handles_[i]); + } + int total_new_writable_files = + env_->GetNumberOfNewWritableFileCalls() - num_writable_file_start; + ASSERT_EQ(total_new_writable_files, handles_.size() + 1); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index e02df44b5..c50eb159f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -349,6 +349,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) shutting_down_(nullptr), bg_cv_(&mutex_), logfile_number_(0), + log_empty_(true), default_cf_handle_(nullptr), tmp_batch_(), bg_schedule_needed_(false), @@ -3785,6 +3786,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1); RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_entry.size()); if (status.ok() && options.sync) { + log_empty_ = false; if (options_.use_fsync) { StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS); status = log_->file()->Fsync(); @@ -4057,57 +4059,66 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) { // Attempt to switch to a new memtable and trigger flush of old. // Do this without holding the dbmutex lock. assert(versions_->PrevLogNumber() == 0); - uint64_t new_log_number = versions_->NewFileNumber(); + bool creating_new_log = !log_empty_; + uint64_t new_log_number = + creating_new_log ? versions_->NewFileNumber() : logfile_number_; SuperVersion* new_superversion = nullptr; mutex_.Unlock(); { DelayLoggingAndReset(); - s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number), - &lfile, - env_->OptimizeForLogWrite(storage_options_)); + if (creating_new_log) { + s = env_->NewWritableFile( + LogFileName(options_.wal_dir, new_log_number), &lfile, + env_->OptimizeForLogWrite(storage_options_)); + if (s.ok()) { + // Our final size should be less than write_buffer_size + // (compression, etc) but err on the side of caution. + lfile->SetPreallocationBlockSize(1.1 * + cfd->options()->write_buffer_size); + new_log = new log::Writer(std::move(lfile)); + } + } + if (s.ok()) { - // Our final size should be less than write_buffer_size - // (compression, etc) but err on the side of caution. - lfile->SetPreallocationBlockSize(1.1 * - cfd->options()->write_buffer_size); - new_log = new log::Writer(std::move(lfile)); new_mem = new MemTable(cfd->internal_comparator(), *cfd->options()); new_superversion = new SuperVersion(); } - Log(options_.info_log, - "New memtable created with log file: #%lu\n", - (unsigned long)new_log_number); } mutex_.Lock(); if (!s.ok()) { + // how do we fail if we're not creating new log? + assert(creating_new_log); // Avoid chewing through file number space in a tight loop. versions_->ReuseFileNumber(new_log_number); assert(!new_mem); assert(!new_log); break; } - logfile_number_ = new_log_number; - assert(new_log != nullptr); - // TODO(icanadi) delete outside of mutex - delete log_.release(); - log_.reset(new_log); + if (creating_new_log) { + logfile_number_ = new_log_number; + assert(new_log != nullptr); + // TODO(icanadi) delete outside of mutex + delete log_.release(); + log_.reset(new_log); + log_empty_ = true; + alive_log_files_.push_back(logfile_number_); + for (auto cfd : *versions_->GetColumnFamilySet()) { + // all this is just optimization to delete logs that + // are no longer needed -- if CF is empty, that means it + // doesn't need that particular log to stay alive, so we just + // advance the log number. no need to persist this in the manifest + if (cfd->mem()->GetFirstSequenceNumber() == 0 && + cfd->imm()->size() == 0) { + cfd->SetLogNumber(logfile_number_); + } + } + } cfd->mem()->SetNextLogNumber(logfile_number_); cfd->imm()->Add(cfd->mem()); if (force) { cfd->imm()->FlushRequested(); } new_mem->Ref(); - alive_log_files_.push_back(logfile_number_); - for (auto cfd : *versions_->GetColumnFamilySet()) { - // all this is just optimization to delete logs that - // are no longer needed -- if CF is empty, that means it - // doesn't need that particular log to stay alive, so we just - // advance the log number. no need to persist this in the manifest - if (cfd->mem()->GetFirstSequenceNumber() == 0 && - cfd->imm()->size() == 0) { - cfd->SetLogNumber(logfile_number_); - } - } cfd->SetMemtable(new_mem); Log(options_.info_log, "[CF %" PRIu32 "] New memtable created with log file: #%lu\n", diff --git a/db/db_impl.h b/db/db_impl.h index c4efb919e..e0413a748 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -421,6 +421,7 @@ class DBImpl : public DB { port::CondVar bg_cv_; // Signalled when background work finishes uint64_t logfile_number_; unique_ptr log_; + bool log_empty_; ColumnFamilyHandleImpl* default_cf_handle_; unique_ptr column_family_memtables_; std::deque alive_log_files_; diff --git a/db/version_set.cc b/db/version_set.cc index 9cf63cb81..2c3121752 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1742,7 +1742,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, } } if (max_log_number_in_batch != 0) { - assert(column_family_data->GetLogNumber() < max_log_number_in_batch); + assert(column_family_data->GetLogNumber() <= max_log_number_in_batch); column_family_data->SetLogNumber(max_log_number_in_batch); } AppendVersion(column_family_data, v);