diff --git a/db/db_impl.h b/db/db_impl.h index 8b434c118..11750a028 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -1678,6 +1678,9 @@ class DBImpl : public DB { size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; } + Status CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, + size_t preallocate_block_size, log::Writer** new_log); + // When set, we use a separate queue for writes that dont write to memtable. // In 2PC these are the writes at Prepare phase. const bool two_write_queues_; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index d32c64ab0..1bc69b491 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -1106,6 +1106,45 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, !kSeqPerBatch, kBatchPerTxn); } +Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, + size_t preallocate_block_size, log::Writer** new_log) { + Status s; + std::unique_ptr lfile; + + DBOptions db_options = + BuildDBOptions(immutable_db_options_, mutable_db_options_); + EnvOptions opt_env_options = + env_->OptimizeForLogWrite(env_options_, db_options); + std::string log_fname = + LogFileName(immutable_db_options_.wal_dir, log_file_num); + + if (recycle_log_number) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "reusing log %" PRIu64 " from recycle list\n", + recycle_log_number); + std::string old_log_fname = + LogFileName(immutable_db_options_.wal_dir, recycle_log_number); + s = env_->ReuseWritableFile(log_fname, old_log_fname, &lfile, + opt_env_options); + } else { + s = NewWritableFile(env_, log_fname, &lfile, opt_env_options); + } + + if (s.ok()) { + lfile->SetWriteLifeTimeHint(CalculateWALWriteHint()); + lfile->SetPreallocationBlockSize(preallocate_block_size); + + const auto& listeners = immutable_db_options_.listeners; + std::unique_ptr file_writer( + new WritableFileWriter(std::move(lfile), log_fname, opt_env_options, + env_, nullptr /* stats */, listeners)); + *new_log = new log::Writer(std::move(file_writer), log_file_num, + immutable_db_options_.recycle_log_file_num > 0, + immutable_db_options_.manual_wal_flush); + } + return s; +} + Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, const std::vector& column_families, std::vector* handles, DB** dbptr, @@ -1166,40 +1205,23 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, return s; } impl->mutex_.Lock(); - auto write_hint = impl->CalculateWALWriteHint(); // Handles create_if_missing, error_if_exists s = impl->Recover(column_families); if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); - std::unique_ptr lfile; - EnvOptions soptions(db_options); - EnvOptions opt_env_options = - impl->immutable_db_options_.env->OptimizeForLogWrite( - soptions, BuildDBOptions(impl->immutable_db_options_, - impl->mutable_db_options_)); - std::string log_fname = - LogFileName(impl->immutable_db_options_.wal_dir, new_log_number); - s = NewWritableFile(impl->immutable_db_options_.env, log_fname, &lfile, - opt_env_options); + log::Writer* new_log = nullptr; + const size_t preallocate_block_size = + impl->GetWalPreallocateBlockSize(max_write_buffer_size); + s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/, + preallocate_block_size, &new_log); if (s.ok()) { - lfile->SetWriteLifeTimeHint(write_hint); - lfile->SetPreallocationBlockSize( - impl->GetWalPreallocateBlockSize(max_write_buffer_size)); - { - InstrumentedMutexLock wl(&impl->log_write_mutex_); - impl->logfile_number_ = new_log_number; - const auto& listeners = impl->immutable_db_options_.listeners; - std::unique_ptr file_writer( - new WritableFileWriter(std::move(lfile), log_fname, opt_env_options, - impl->env_, nullptr /* stats */, listeners)); - impl->logs_.emplace_back( - new_log_number, - new log::Writer( - std::move(file_writer), new_log_number, - impl->immutable_db_options_.recycle_log_file_num > 0, - impl->immutable_db_options_.manual_wal_flush)); - } + InstrumentedMutexLock wl(&impl->log_write_mutex_); + impl->logfile_number_ = new_log_number; + assert(new_log != nullptr); + impl->logs_.emplace_back(new_log_number, new_log); + } + if (s.ok()) { // set column family handles for (auto cf : column_families) { auto cfd = diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 21a9378d2..3edec9ac5 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -1418,51 +1418,19 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { // Log this later after lock release. It may be outdated, e.g., if background // flush happens before logging, but that should be ok. int num_imm_unflushed = cfd->imm()->NumNotFlushed(); - DBOptions db_options = - BuildDBOptions(immutable_db_options_, mutable_db_options_); const auto preallocate_block_size = GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size); - auto write_hint = CalculateWALWriteHint(); mutex_.Unlock(); - { - std::string log_fname = - LogFileName(immutable_db_options_.wal_dir, new_log_number); - if (creating_new_log) { - EnvOptions opt_env_opt = - env_->OptimizeForLogWrite(env_options_, db_options); - if (recycle_log_number) { - ROCKS_LOG_INFO(immutable_db_options_.info_log, - "reusing log %" PRIu64 " from recycle list\n", - recycle_log_number); - std::string old_log_fname = - LogFileName(immutable_db_options_.wal_dir, recycle_log_number); - s = env_->ReuseWritableFile(log_fname, old_log_fname, &lfile, - opt_env_opt); - } else { - s = NewWritableFile(env_, log_fname, &lfile, opt_env_opt); - } - if (s.ok()) { - // Our final size should be less than write_buffer_size - // (compression, etc) but err on the side of caution. - - // use preallocate_block_size instead - // of calling GetWalPreallocateBlockSize() - lfile->SetPreallocationBlockSize(preallocate_block_size); - lfile->SetWriteLifeTimeHint(write_hint); - std::unique_ptr file_writer(new WritableFileWriter( - std::move(lfile), log_fname, opt_env_opt, env_, nullptr /* stats */, - immutable_db_options_.listeners)); - new_log = new log::Writer( - std::move(file_writer), new_log_number, - immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_); - } - } - - if (s.ok()) { - SequenceNumber seq = versions_->LastSequence(); - new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); - context->superversion_context.NewSuperVersion(); - } + if (creating_new_log) { + // TODO: Write buffer size passed in should be max of all CF's instead + // of mutable_cf_options.write_buffer_size. + s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size, + &new_log); + } + if (s.ok()) { + SequenceNumber seq = versions_->LastSequence(); + new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); + context->superversion_context.NewSuperVersion(); } ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] New memtable created with log file: #%" PRIu64