diff --git a/db/db_bench.cc b/db/db_bench.cc
index b8b4c319c..fbc37be29 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -378,6 +378,9 @@ static enum RepFactory FLAGS_rep_factory;
 // The possible merge operators are defined in utilities/merge_operators.h
 static std::string FLAGS_merge_operator = "";
 
+static auto FLAGS_purge_log_after_memtable_flush =
+  leveldb::Options().purge_log_after_memtable_flush;
+
 namespace leveldb {
 
 // Helper for quickly generating random data.
@@ -1226,6 +1229,7 @@ class Benchmark {
         );
         break;
     }
+    options.purge_log_after_memtable_flush = FLAGS_purge_log_after_memtable_flush;
     if (FLAGS_max_bytes_for_level_multiplier_additional.size() > 0) {
       if (FLAGS_max_bytes_for_level_multiplier_additional.size() !=
           (unsigned int)FLAGS_num_levels) {
@@ -2518,6 +2522,9 @@ int main(int argc, char** argv) {
       FLAGS_filter_deletes = n;
     } else if (sscanf(argv[i], "--merge_operator=%s", buf) == 1) {
       FLAGS_merge_operator = buf;
+    } else if (sscanf(argv[i], "--purge_log_after_memtable_flush=%d%c", &n, &junk)
+        == 1 && (n == 0 || n == 1)) {
+      FLAGS_purge_log_after_memtable_flush = n;
     } else {
       fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
       exit(1);
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 69e153bdd..8124b18f3 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -420,6 +420,26 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) {
   deletion_state.prevlognumber = versions_->PrevLogNumber();
 }
 
+Status DBImpl::DeleteLogFile(uint64_t number) {
+  Status s;
+  auto filename = LogFileName(dbname_, number);
+  if (options_.WAL_ttl_seconds > 0) {
+    s = env_->RenameFile(filename,
+                         ArchivedLogFileName(dbname_, number));
+
+    if (!s.ok()) {
+      Log(options_.info_log, "RenameFile logfile #%lu FAILED", number);
+    }
+  } else {
+    s = env_->DeleteFile(filename);
+    if (!s.ok()) {
+      Log(options_.info_log, "Delete logfile #%lu FAILED", number);
+    }
+  }
+
+  return s;
+}
+
 // Diffs the files listed in filenames and those that do not
 // belong to live files are posibly removed. If the removed file
 // is a sst file, then it returns the file number in files_to_evict.
@@ -474,19 +494,9 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
           state.files_to_evict.push_back(number);
         }
         Log(options_.info_log, "Delete type=%d #%lu", int(type), number);
-        if (type == kLogFile && options_.WAL_ttl_seconds > 0) {
-          Status st = env_->RenameFile(
-            LogFileName(dbname_, number),
-            ArchivedLogFileName(dbname_, number)
-          );
 
-          if (!st.ok()) {
-            Log(
-              options_.info_log, "RenameFile type=%d #%lu FAILED",
-              int(type),
-              number
-            );
-          }
+        if (type == kLogFile) {
+          DeleteLogFile(number);
         } else {
           Status st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]);
           if (!st.ok()) {
@@ -499,7 +509,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
     }
   }
 
-  // Delete old log files.
+  // Delete old info log files.
   size_t old_log_file_count = old_log_files.size();
   // NOTE: Currently we only support log purge when options_.db_log_dir is
   // located in `dbname` directory.
@@ -904,7 +914,6 @@ Status DBImpl::CompactMemTable(bool* madeProgress) {
   }
 
   // Save the contents of the earliest memtable as a new Table
-  // This will release and re-acquire the mutex.
   uint64_t file_number;
   std::vector mems;
   imm_.PickMemtablesToFlush(&mems);
@@ -918,8 +927,10 @@ Status DBImpl::CompactMemTable(bool* madeProgress) {
   MemTable* m = mems[0];
   VersionEdit* edit = m->GetEdits();
   edit->SetPrevLogNumber(0);
-  edit->SetLogNumber(m->GetLogNumber());  // Earlier logs no longer needed
+  edit->SetLogNumber(m->GetNextLogNumber());  // Earlier logs no longer needed
+  auto to_delete = m->GetLogNumber();
 
+  // This will release and re-acquire the mutex.
   Status s = WriteLevel0Table(mems, edit, &file_number);
 
   if (s.ok() && shutting_down_.Acquire_Load()) {
@@ -937,10 +948,17 @@ Status DBImpl::CompactMemTable(bool* madeProgress) {
     if (madeProgress) {
       *madeProgress = 1;
     }
+
     MaybeScheduleLogDBDeployStats();
-    // we could have deleted obsolete files here, but it is not
-    // absolutely necessary because it could be also done as part
-    // of other background compaction
+    // TODO: if log deletion fails for any reason, we should probably
+    // store the file number in the shared state and retry. For now,
+    // PurgeObsoleteFiles will take care of that anyway. The mutex is
+    // released while DeleteLogFile() runs and re-acquired afterwards.
+    if (options_.purge_log_after_memtable_flush && to_delete > 0) {
+      mutex_.Unlock();
+      DeleteLogFile(to_delete);
+      mutex_.Lock();
+    }
   }
   return s;
 }
@@ -2743,7 +2761,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size);
       logfile_number_ = new_log_number;
       log_.reset(new log::Writer(std::move(lfile)));
-      mem_->SetLogNumber(logfile_number_);
+      mem_->SetNextLogNumber(logfile_number_);
       imm_.Add(mem_);
       if (force) {
         imm_.FlushRequested();
@@ -2751,6 +2769,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       mem_ = new MemTable(internal_comparator_, mem_rep_factory_,
         NumberLevels(), options_);
       mem_->Ref();
+      mem_->SetLogNumber(logfile_number_);
       force = false;   // Do not force another compaction if have room
       MaybeScheduleCompaction();
     }
@@ -3113,6 +3132,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
       s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
     }
     if (s.ok()) {
+      impl->mem_->SetLogNumber(impl->logfile_number_);
       impl->DeleteObsoleteFiles();
       impl->MaybeScheduleCompaction();
       impl->MaybeScheduleLogDBDeployStats();
diff --git a/db/db_impl.h b/db/db_impl.h
index d7e7f1d41..def204aa6 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -212,6 +212,8 @@ class DBImpl : public DB {
   // Removes the file listed in files_to_evict from the table_cache
   void EvictObsoleteFiles(DeletionState& deletion_state);
 
+  Status DeleteLogFile(uint64_t number);
+
   void PurgeObsoleteWALFiles();
 
   Status AppendSortedWalsOfType(const std::string& path,
diff --git a/db/memtable.cc b/db/memtable.cc
index eaa0777cd..255169da2 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -29,6 +29,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
       file_number_(0),
       edit_(numlevel),
       first_seqno_(0),
+      mem_next_logfile_number_(0),
       mem_logfile_number_(0) { }
 
 MemTable::~MemTable() {
diff --git a/db/memtable.h b/db/memtable.h
index e3b66f29f..9b3755cd0 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -92,6 +92,14 @@ class MemTable {
   // into the memtable
   SequenceNumber GetFirstSequenceNumber() { return first_seqno_; }
 
+  // Returns the number of the log file that is active when this memtable is
+  // about to be flushed to storage; earlier logs are not needed after that
+  uint64_t GetNextLogNumber() { return mem_next_logfile_number_; }
+
+  // Records the number of the log file that is active when this memtable is
+  // about to be flushed to storage
+  void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
+
   // Returns the logfile number that can be safely deleted when this
   // memstore is flushed to storage
   uint64_t GetLogNumber() { return mem_logfile_number_; }
@@ -127,6 +135,10 @@ class MemTable {
   SequenceNumber first_seqno_;
 
   // The log files earlier than this number can be deleted.
+  uint64_t mem_next_logfile_number_;
+
+  // The log file that backs this memtable (to be deleted once the
+  // memtable flush is done)
   uint64_t mem_logfile_number_;
 
   // No copying allowed
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 0c8bdde3a..c8fc6ea39 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -548,6 +548,11 @@ struct Options {
   // an application to modify/delete a key-value during background compaction.
   // Default: a factory that doesn't provide any object
   std::shared_ptr compaction_filter_factory;
+
+  // Purge the log file right after the corresponding memtable is flushed to
+  // a data file (it is renamed to the archive name if WAL_ttl_seconds > 0).
+  // Default: true
+  bool purge_log_after_memtable_flush;
 };
 
 //
diff --git a/util/options.cc b/util/options.cc
index 3ca71e5ff..39c83bc9e 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -85,7 +85,8 @@ Options::Options()
       memtable_factory(std::shared_ptr(new SkipListFactory)),
       compaction_filter_factory(
           std::shared_ptr(
-            new DefaultCompactionFilterFactory())) {
+            new DefaultCompactionFilterFactory())),
+      purge_log_after_memtable_flush(true) {
   assert(memtable_factory.get() != nullptr);
 }
 
@@ -242,6 +243,8 @@ Options::Dump(Logger* log) const
         compaction_options_universal.min_merge_width);
     Log(log," Options.compaction_options_universal.max_merge_width: %u",
         compaction_options_universal.max_merge_width);
+    Log(log," Options.purge_log_after_memtable_flush: %d",
+        purge_log_after_memtable_flush);
 }  // Options::Dump
 
 //