diff --git a/db/db_impl.cc b/db/db_impl.cc
index 7c57ace97..97970d6ef 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -871,6 +871,9 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
 
   std::vector<Iterator*> list;
   for (MemTable* m : mems) {
+    Log(options_.info_log,
+        "Flushing memtable with log file: %llu\n",
+        static_cast<unsigned long long>(m->GetLogNumber()));
     list.push_back(m->NewIterator());
   }
   Iterator* iter = NewMergingIterator(&internal_comparator_, &list[0],
@@ -964,11 +967,22 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress) {
   }
 
   // record the logfile_number_ before we release the mutex
+  // Entries in mems are (implicitly) sorted in ascending order of creation
+  // time. We will use the first memtable's `edit` to keep the meta info for
+  // this flush.
   MemTable* m = mems[0];
   VersionEdit* edit = m->GetEdits();
   edit->SetPrevLogNumber(0);
-  edit->SetLogNumber(m->GetNextLogNumber());  // Earlier logs no longer needed
-  auto to_delete = m->GetLogNumber();
+  // SetLogNumber(log_num) indicates that logs with a number smaller than
+  // log_num will no longer be picked up for recovery.
+  edit->SetLogNumber(
+      mems.back()->GetNextLogNumber()
+  );
+
+  std::vector<uint64_t> logs_to_delete;
+  for (auto mem : mems) {
+    logs_to_delete.push_back(mem->GetLogNumber());
+  }
 
   // This will release and re-acquire the mutex.
   Status s = WriteLevel0Table(mems, edit, &file_number);
@@ -994,12 +1008,17 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress) {
     // should store the file number in the shared state, and retry
     // However, for now, PurgeObsoleteFiles will take care of that
     // anyways.
-    if (options_.purge_log_after_memtable_flush &&
-        !disable_delete_obsolete_files_ &&
-        to_delete > 0) {
-      mutex_.Unlock();
-      DeleteLogFile(to_delete);
-      mutex_.Lock();
+    bool should_delete_log = options_.purge_log_after_memtable_flush &&
+        !disable_delete_obsolete_files_;
+    if (should_delete_log) {
+      for (auto log_num : logs_to_delete) {
+        if (log_num == 0) {
+          continue;
+        }
+        mutex_.Unlock();
+        DeleteLogFile(log_num);
+        mutex_.Lock();
+      }
     }
   }
   return s;
@@ -2874,9 +2893,12 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       if (force) {
         imm_.FlushRequested();
       }
-      mem_ = new MemTable(internal_comparator_, mem_rep_factory_,
-        NumberLevels(), options_);
+      mem_ = new MemTable(
+          internal_comparator_, mem_rep_factory_, NumberLevels(), options_);
       mem_->Ref();
+      Log(options_.info_log,
+          "New memtable created with log file: #%llu\n",
+          static_cast<unsigned long long>(logfile_number_));
       mem_->SetLogNumber(logfile_number_);
       force = false;   // Do not force another compaction if have room
       MaybeScheduleFlushOrCompaction();
@@ -3129,8 +3151,9 @@ Status DBImpl::DeleteFile(std::string name) {
   FileType type;
   if (!ParseFileName(name, &number, &type) ||
       (type != kTableFile)) {
-    Log(options_.info_log, "DeleteFile #%lld FAILED. Invalid file name\n",
-        static_cast<unsigned long long>(number));
+    Log(options_.info_log,
+        "DeleteFile #%llu FAILED. Invalid file name\n",
+        static_cast<unsigned long long>(number));
     return Status::InvalidArgument("Invalid file name");
   }
 
diff --git a/db/memtablelist.cc b/db/memtablelist.cc
index 5ff13090b..5f7c7c503 100644
--- a/db/memtablelist.cc
+++ b/db/memtablelist.cc
@@ -98,7 +98,6 @@ Status MemTableList::InstallMemtableFlushResults(
   // flush was sucessful
   bool first = true;
   for (MemTable* m : mems) {
-
     // All the edits are associated with the first memtable of this batch.
     assert(first || m->GetEdits()->NumEntries() == 0);
     first = false;
@@ -124,22 +123,22 @@ Status MemTableList::InstallMemtableFlushResults(
     if (!m->flush_completed_) {
       break;
     }
-    first = true;
 
-    Log(info_log,
-        "Level-0 commit table #%llu: started",
-        (unsigned long long)m->file_number_);
+    Log(info_log, "Level-0 commit table #%llu started",
+        static_cast<unsigned long long>(m->file_number_));
 
     // this can release and reacquire the mutex.
     s = vset->LogAndApply(&m->edit_, mu);
 
     // All the later memtables that have the same filenum
     // are part of the same batch. They can be committed now.
+    uint64_t mem_id = 1;  // 1-based index of the memtable in this batch.
     do {
       if (s.ok()) { // commit new state
-        Log(info_log, "Level-0 commit table #%llu: done %s",
-            (unsigned long long)m->file_number_,
-            first ? "": "bulk");
+        Log(info_log,
+            "Level-0 commit table #%llu: memtable #%llu done",
+            static_cast<unsigned long long>(m->file_number_),
+            static_cast<unsigned long long>(mem_id));
         memlist_.remove(m);
         assert(m->file_number_ > 0);
 
@@ -152,8 +151,10 @@ Status MemTableList::InstallMemtableFlushResults(
         size_--;
       } else {
         //commit failed. setup state so that we can flush again.
-        Log(info_log, "Level-0 commit table #%llu: failed",
-            (unsigned long long)m->file_number_);
+        Log(info_log,
+            "Level-0 commit table #%llu: memtable #%llu failed",
+            static_cast<unsigned long long>(m->file_number_),
+            static_cast<unsigned long long>(mem_id));
         m->flush_completed_ = false;
         m->flush_in_progress_ = false;
         m->edit_.Clear();
@@ -163,7 +164,7 @@ Status MemTableList::InstallMemtableFlushResults(
         imm_flush_needed.Release_Store((void *)1);
         s = Status::IOError("Unable to commit flushed memtable");
       }
-      first = false;
+      ++mem_id;
     } while (!memlist_.empty() && (m = memlist_.back()) &&
              m->file_number_ == file_number);
   }
diff --git a/db/memtablelist.h b/db/memtablelist.h
index 31b86571e..20ea9ecda 100644
--- a/db/memtablelist.h
+++ b/db/memtablelist.h
@@ -54,7 +54,8 @@ class MemTableList {
   // not yet started.
   bool IsFlushPending(int min_write_buffer_number_to_merge);
 
-  // Returns the earliest memtables that needs to be flushed.
+  // Returns the earliest memtables that need to be flushed. The returned
+  // memtables are guaranteed to be in ascending order of creation time.
   void PickMemtablesToFlush(std::vector<MemTable*>* mems);
 
   // Commit a successful flush in the manifest file
diff --git a/db/version_edit.h b/db/version_edit.h
index 01f3da287..196914e2b 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -34,9 +34,9 @@ struct FileMetaData {
 
 class VersionEdit {
  public:
-  /* implicit */ VersionEdit(int number_levels) :
-    number_levels_(number_levels) {
-      Clear();
+  explicit VersionEdit(int number_levels) :
+      number_levels_(number_levels) {
+    Clear();
   }
 
   ~VersionEdit() { }