From 1510339e5257073af82f8c07e6fa1f2a9144e6aa Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 8 Nov 2013 15:23:46 -0800 Subject: [PATCH] Speed up FindObsoleteFiles Summary: Here's one solution we discussed on speeding up FindObsoleteFiles. Keep a set of all files in DBImpl and update the set every time we create a file. I probably missed few other spots where we create a file. It might speed things up a bit, but makes code uglier. I don't really like it. Much better approach would be to abstract all file handling to a separate class. Think of it as layer between DBImpl and Env. Having a separate class deal with file namings and deletion would benefit both code cleanliness (especially with huge DBImpl) and speed things up. It will take a huge effort to do this, though. Let's discuss offline today. Test Plan: Ran ./db_stress, verified that files are getting deleted Reviewers: dhruba, haobo, kailiu, emayanke Reviewed By: dhruba Differential Revision: https://reviews.facebook.net/D13827 --- db/db_filesnapshot.cc | 12 ++- db/db_impl.cc | 186 +++++++++++++++++--------------------- db/db_impl.h | 50 +++++++--- db/db_test.cc | 7 +- db/version_set.cc | 29 +++++- db/version_set.h | 4 + include/rocksdb/options.h | 3 +- util/options.cc | 2 +- 8 files changed, 169 insertions(+), 124 deletions(-) diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index dcbfebd7e..7b9c5ddeb 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -28,9 +28,15 @@ Status DBImpl::DisableFileDeletions() { } Status DBImpl::EnableFileDeletions() { - MutexLock l(&mutex_); - disable_delete_obsolete_files_ = false; - Log(options_.info_log, "File Deletions Enabled"); + DeletionState deletion_state; + { + MutexLock l(&mutex_); + disable_delete_obsolete_files_ = false; + Log(options_.info_log, "File Deletions Enabled"); + FindObsoleteFiles(deletion_state, true); + } + PurgeObsoleteFiles(deletion_state); + LogFlush(options_.info_log); return Status::OK(); } diff --git a/db/db_impl.cc b/db/db_impl.cc index cfd5eef5e..4eef17495 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -110,22 +110,6 @@ struct DBImpl::CompactionState { } }; -struct DBImpl::DeletionState { - - // the list of all live files that cannot be deleted - std::vector live; - - // a list of all siles that exists in the db directory - std::vector allfiles; - - // the current filenumber, lognumber and prevlognumber - // that corresponds to the set of files in 'live'. - uint64_t filenumber, lognumber, prevlognumber; - - // the list of all files to be evicted from the table cache - std::vector files_to_evict; -}; - // Fix user-supplied options to be reasonable template static void ClipToRange(T* ptr, V minvalue, V maxvalue) { @@ -451,9 +435,9 @@ void DBImpl::MaybeDumpStats() { } } -// Returns the list of live files in 'live' and the list +// Returns the list of live files in 'sstlive' and the list // of all files in the filesystem in 'allfiles'. -void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) { +void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, bool force) { mutex_.AssertHeld(); // if deletion is disabled, do nothing @@ -461,10 +445,15 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) { return; } + // store the current filenum, lognum, etc + deletion_state.manifest_file_number = versions_->ManifestFileNumber(); + deletion_state.log_number = versions_->LogNumber(); + deletion_state.prev_log_number = versions_->PrevLogNumber(); + // This method is costly when the number of files is large. // Do not allow it to trigger more often than once in // delete_obsolete_files_period_micros. - if (options_.delete_obsolete_files_period_micros != 0) { + if (!force && options_.delete_obsolete_files_period_micros != 0) { const uint64_t now_micros = env_->NowMicros(); if (delete_obsolete_files_last_run_ + options_.delete_obsolete_files_period_micros > now_micros) { @@ -475,9 +464,9 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) { // Make a list of all of the live files; set is slow, should not // be used. - deletion_state.live.assign(pending_outputs_.begin(), - pending_outputs_.end()); - versions_->AddLiveFiles(&deletion_state.live); + deletion_state.sstlive.assign(pending_outputs_.begin(), + pending_outputs_.end()); + versions_->AddLiveFiles(&deletion_state.sstlive); // set of all files in the directory env_->GetChildren(dbname_, &deletion_state.allfiles); // Ignore errors @@ -492,59 +481,51 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) { log_files.end() ); } - - // store the current filenum, lognum, etc - deletion_state.filenumber = versions_->ManifestFileNumber(); - deletion_state.lognumber = versions_->LogNumber(); - deletion_state.prevlognumber = versions_->PrevLogNumber(); -} - -Status DBImpl::DeleteLogFile(uint64_t number) { - Status s; - auto filename = LogFileName(options_.wal_dir, number); - if (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0) { - s = env_->RenameFile(filename, - ArchivedLogFileName(options_.wal_dir, number)); - - if (!s.ok()) { - Log(options_.info_log, "RenameFile logfile #%lu FAILED", number); - } - } else { - s = env_->DeleteFile(filename); - if(!s.ok()) { - Log(options_.info_log, "Delete logfile #%lu FAILED", number); - } - } - - return s; } // Diffs the files listed in filenames and those that do not -// belong to live files are posibly removed. If the removed file -// is a sst file, then it returns the file number in files_to_evict. +// belong to live files are posibly removed. Also, removes all the +// files in sstdeletefiles and logdeletefiles. // It is not necessary to hold the mutex when invoking this method. void DBImpl::PurgeObsoleteFiles(DeletionState& state) { + // if deletion is disabled, do nothing + if (disable_delete_obsolete_files_) { + return; + } + uint64_t number; FileType type; std::vector old_log_files; // Now, convert live list to an unordered set, WITHOUT mutex held; // set is slow. - std::unordered_set live_set(state.live.begin(), - state.live.end()); + std::unordered_set live_set(state.sstlive.begin(), + state.sstlive.end()); + + state.allfiles.reserve(state.allfiles.size() + state.sstdeletefiles.size()); + for (auto filenum : state.sstdeletefiles) { + state.allfiles.push_back(TableFileName("", filenum)); + } + + state.allfiles.reserve(state.allfiles.size() + state.logdeletefiles.size()); + for (auto filenum : state.logdeletefiles) { + if (filenum > 0) { + state.allfiles.push_back(LogFileName("", filenum)); + } + } for (size_t i = 0; i < state.allfiles.size(); i++) { if (ParseFileName(state.allfiles[i], &number, &type)) { bool keep = true; switch (type) { case kLogFile: - keep = ((number >= state.lognumber) || - (number == state.prevlognumber)); + keep = ((number >= state.log_number) || + (number == state.prev_log_number)); break; case kDescriptorFile: // Keep my manifest file, and any newer incarnations' // (in case there is a race that allows other incarnations) - keep = (number >= state.filenumber); + keep = (number >= state.manifest_file_number); break; case kTableFile: keep = (live_set.find(number) != live_set.end()); @@ -570,19 +551,25 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { if (!keep) { if (type == kTableFile) { - // record the files to be evicted from the cache - state.files_to_evict.push_back(number); + // evict from cache + table_cache_->Evict(number); } Log(options_.info_log, "Delete type=%d #%lu", int(type), number); - if (type == kLogFile) { - DeleteLogFile(number); + Status st; + if (type == kLogFile && (options_.WAL_ttl_seconds > 0 || + options_.WAL_size_limit_MB > 0)) { + st = env_->RenameFile(dbname_ + "/" + state.allfiles[i], + ArchivedLogFileName(options_.wal_dir, + number)); + if (!st.ok()) { + Log(options_.info_log, "RenameFile logfile #%lu FAILED", number); + } } else { - Status st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]); + st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]); if (!st.ok()) { - Log(options_.info_log, "Delete type=%d #%lld FAILED\n", - int(type), - static_cast(number)); + Log(options_.info_log, "Delete type=%d #%lu FAILED\n", + int(type), number); } } } @@ -605,20 +592,14 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { } } PurgeObsoleteWALFiles(); -} - -void DBImpl::EvictObsoleteFiles(DeletionState& state) { - for (unsigned int i = 0; i < state.files_to_evict.size(); i++) { - table_cache_->Evict(state.files_to_evict[i]); - } + LogFlush(options_.info_log); } void DBImpl::DeleteObsoleteFiles() { mutex_.AssertHeld(); DeletionState deletion_state; - FindObsoleteFiles(deletion_state); + FindObsoleteFiles(deletion_state, true); PurgeObsoleteFiles(deletion_state); - EvictObsoleteFiles(deletion_state); } // 1. Go through all archived files and @@ -1091,7 +1072,8 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, return s; } -Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress) { +Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, + DeletionState& deletion_state) { mutex_.AssertHeld(); assert(imm_.size() != 0); @@ -1149,22 +1131,13 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress) { } MaybeScheduleLogDBDeployStats(); - // TODO: if log deletion failed for any reason, we probably - // should store the file number in the shared state, and retry - // However, for now, PurgeObsoleteFiles will take care of that - // anyways. - bool should_delete_log = options_.purge_log_after_memtable_flush && - !disable_delete_obsolete_files_; - if (should_delete_log) { - for (auto log_num : logs_to_delete) { - if (log_num < 0) { - continue; - } - mutex_.Unlock(); - DeleteLogFile(log_num); - LogFlush(options_.info_log); - mutex_.Lock(); - } + + if (options_.purge_log_after_memtable_flush && + !disable_delete_obsolete_files_) { + // add to deletion state + deletion_state.logdeletefiles.insert(deletion_state.logdeletefiles.end(), + logs_to_delete.begin(), + logs_to_delete.end()); } } return s; @@ -1621,25 +1594,27 @@ void DBImpl::BGWorkCompaction(void* db) { reinterpret_cast(db)->BackgroundCallCompaction(); } -Status DBImpl::BackgroundFlush(bool* madeProgress) { +Status DBImpl::BackgroundFlush(bool* madeProgress, + DeletionState& deletion_state) { Status stat; while (stat.ok() && imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { Log(options_.info_log, "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", options_.max_background_flushes - bg_flush_scheduled_); - stat = FlushMemTableToOutputFile(madeProgress); + stat = FlushMemTableToOutputFile(madeProgress, deletion_state); } return stat; } void DBImpl::BackgroundCallFlush() { bool madeProgress = false; + DeletionState deletion_state; assert(bg_flush_scheduled_); MutexLock l(&mutex_); if (!shutting_down_.Acquire_Load()) { - Status s = BackgroundFlush(&madeProgress); + Status s = BackgroundFlush(&madeProgress, deletion_state); if (!s.ok()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to @@ -1652,9 +1627,18 @@ void DBImpl::BackgroundCallFlush() { LogFlush(options_.info_log); env_->SleepForMicroseconds(1000000); mutex_.Lock(); + // clean up all the files we might have created + FindObsoleteFiles(deletion_state, true); } } + // delete unnecessary files if any, this is done outside the mutex + if (deletion_state.HaveSomethingToDelete()) { + mutex_.Unlock(); + PurgeObsoleteFiles(deletion_state); + mutex_.Lock(); + } + bg_flush_scheduled_--; if (madeProgress) { MaybeScheduleFlushOrCompaction(); @@ -1690,17 +1674,16 @@ void DBImpl::BackgroundCallCompaction() { LogFlush(options_.info_log); env_->SleepForMicroseconds(1000000); mutex_.Lock(); + // clean up all the files we might have created + FindObsoleteFiles(deletion_state, true); } } // delete unnecessary files if any, this is done outside the mutex - if (!deletion_state.live.empty()) { + if (deletion_state.HaveSomethingToDelete()) { mutex_.Unlock(); PurgeObsoleteFiles(deletion_state); - EvictObsoleteFiles(deletion_state); - LogFlush(options_.info_log); mutex_.Lock(); - } bg_compaction_scheduled_--; @@ -1728,7 +1711,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, "BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots " "available %d", options_.max_background_compactions - bg_compaction_scheduled_); - Status stat = FlushMemTableToOutputFile(madeProgress); + Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state); if (!stat.ok()) { return stat; } @@ -1783,11 +1766,12 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } else { MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel. CompactionState* compact = new CompactionState(c.get()); - status = DoCompactionWork(compact); + status = DoCompactionWork(compact, deletion_state); CleanupCompaction(compact, status); versions_->ReleaseCompactionFiles(c.get(), status); c->ReleaseInputs(); - FindObsoleteFiles(deletion_state); + versions_->GetAndFreeObsoleteFiles(&deletion_state.sstdeletefiles); + FindObsoleteFiles(deletion_state, false); *madeProgress = true; } c.reset(); @@ -2044,7 +2028,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( return 0; } -Status DBImpl::DoCompactionWork(CompactionState* compact) { +Status DBImpl::DoCompactionWork(CompactionState* compact, + DeletionState& deletion_state) { assert(compact); int64_t imm_micros = 0; // Micros spent doing imm_ compactions Log(options_.info_log, @@ -2120,7 +2105,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { LogFlush(options_.info_log); mutex_.Lock(); if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { - FlushMemTableToOutputFile(); + FlushMemTableToOutputFile(nullptr, deletion_state); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } mutex_.Unlock(); @@ -3376,7 +3361,7 @@ Status DBImpl::DeleteFile(std::string name) { edit.DeleteFile(level, number); status = versions_->LogAndApply(&edit, &mutex_); if (status.ok()) { - FindObsoleteFiles(deletion_state); + versions_->GetAndFreeObsoleteFiles(&deletion_state.sstdeletefiles); } } // lock released here LogFlush(options_.info_log); @@ -3384,7 +3369,6 @@ Status DBImpl::DeleteFile(std::string name) { if (status.ok()) { // remove files outside the db-lock PurgeObsoleteFiles(deletion_state); - EvictObsoleteFiles(deletion_state); } return status; } diff --git a/db/db_impl.h b/db/db_impl.h index c70ec23d4..3c8788e13 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -69,6 +69,7 @@ class DBImpl : public DB { virtual Status Flush(const FlushOptions& options); virtual Status DisableFileDeletions(); virtual Status EnableFileDeletions(); + // All the returned filenames start with "/" virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size, bool flush_memtable = true); @@ -143,7 +144,6 @@ class DBImpl : public DB { friend class DB; struct CompactionState; struct Writer; - struct DeletionState; Status NewDB(); @@ -157,12 +157,37 @@ class DBImpl : public DB { const Status CreateArchivalDirectory(); + struct DeletionState { + inline bool HaveSomethingToDelete() const { + return allfiles.size() || sstdeletefiles.size() || logdeletefiles.size(); + } + + // a list of all files that we'll consider deleting + // (every once in a while this is filled up with all files + // in the DB directory) + std::vector allfiles; + + // the list of all live sst files that cannot be deleted + std::vector sstlive; + + // a list of sst files that we need to delete + std::vector sstdeletefiles; + + // a list of log files that we need to delete + std::vector logdeletefiles; + + // the current manifest_file_number, log_number and prev_log_number + // that corresponds to the set of files in 'live'. + uint64_t manifest_file_number, log_number, prev_log_number; + }; + // Delete any unneeded files and stale in-memory entries. void DeleteObsoleteFiles(); // Flush the in-memory write buffer to storage. Switches to a new // log-file/memtable and writes a new descriptor iff successful. - Status FlushMemTableToOutputFile(bool* madeProgress = nullptr); + Status FlushMemTableToOutputFile(bool* madeProgress, + DeletionState& deletion_state); Status RecoverLogFile(uint64_t log_number, VersionEdit* edit, @@ -198,9 +223,10 @@ class DBImpl : public DB { void BackgroundCallCompaction(); void BackgroundCallFlush(); Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state); - Status BackgroundFlush(bool* madeProgress); + Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state); void CleanupCompaction(CompactionState* compact, Status status); - Status DoCompactionWork(CompactionState* compact); + Status DoCompactionWork(CompactionState* compact, + DeletionState& deletion_state); Status OpenCompactionOutputFile(CompactionState* compact); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); @@ -208,21 +234,19 @@ class DBImpl : public DB { void AllocateCompactionOutputFileNumbers(CompactionState* compact); void ReleaseCompactionUnusedFileNumbers(CompactionState* compact); - // Returns the list of live files in 'live' and the list // of all files in the filesystem in 'allfiles'. - void FindObsoleteFiles(DeletionState& deletion_state); + // If force == false and the last call was less than + // options_.delete_obsolete_files_period_micros microseconds ago, + // it will not fill up the deletion_state + void FindObsoleteFiles(DeletionState& deletion_state, bool force); // Diffs the files listed in filenames and those that do not - // belong to live files are posibly removed. If the removed file - // is a sst file, then it returns the file number in files_to_evict. + // belong to live files are posibly removed. Also, removes all the + // files in sstdeletefiles and logdeletefiles. + // It is not necessary to hold the mutex when invoking this method. void PurgeObsoleteFiles(DeletionState& deletion_state); - // Removes the file listed in files_to_evict from the table_cache - void EvictObsoleteFiles(DeletionState& deletion_state); - - Status DeleteLogFile(uint64_t number); - void PurgeObsoleteWALFiles(); Status AppendSortedWalsOfType(const std::string& path, diff --git a/db/db_test.cc b/db/db_test.cc index f3be5a94f..5988abb68 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3616,8 +3616,11 @@ TEST(DBTest, SnapshotFiles) { ASSERT_EQ(system(mkdir.c_str()), 0); for (unsigned int i = 0; i < files.size(); i++) { - std::string src = dbname_ + "/" + files[i]; - std::string dest = snapdir + "/" + files[i]; + // our clients require that GetLiveFiles returns + // files with "/" as first character! + ASSERT_EQ(files[i][0], '/'); + std::string src = dbname_ + files[i]; + std::string dest = snapdir + files[i]; uint64_t size; ASSERT_OK(env_->GetFileSize(src, &size)); diff --git a/db/version_set.cc b/db/version_set.cc index 1745922bb..391bbb0ad 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -50,7 +50,7 @@ Version::~Version() { assert(f->refs > 0); f->refs--; if (f->refs <= 0) { - delete f; + vset_->obsolete_files_.push_back(f); } } } @@ -1161,6 +1161,7 @@ VersionSet::VersionSet(const std::string& dbname, VersionSet::~VersionSet() { current_->Unref(); assert(dummy_versions_.next_ == &dummy_versions_); // List must be empty + GetAndFreeObsoleteFiles(nullptr); delete[] compact_pointer_; delete[] max_file_size_; delete[] level_max_bytes_; @@ -1239,6 +1240,8 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, std::string new_manifest_file; uint64_t new_manifest_file_size = 0; Status s; + // we will need this if we are creating new manifest + uint64_t old_manifest_file_number = manifest_file_number_; // No need to perform this check if a new Manifest is being created anyways. if (!descriptor_log_ || @@ -1247,7 +1250,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, manifest_file_number_ = NewFileNumber(); // Change manifest file no. } - if (!descriptor_log_ || new_descriptor_log) { + if (new_descriptor_log) { new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); edit->SetNextFile(next_file_number_); } @@ -1313,6 +1316,15 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // new CURRENT file that points to it. if (s.ok() && !new_manifest_file.empty()) { s = SetCurrentFile(env_, dbname_, manifest_file_number_); + if (s.ok() && old_manifest_file_number < manifest_file_number_) { + // delete old manifest file + Log(options_->info_log, + "Deleting manifest %lu current manifest %lu\n", + old_manifest_file_number, manifest_file_number_); + // we don't care about an error here, PurgeObsoleteFiles will take care + // of it later + env_->DeleteFile(DescriptorFileName(dbname_, old_manifest_file_number)); + } } // find offset in manifest file where this version is stored. @@ -2849,6 +2861,19 @@ void VersionSet::GetLiveFilesMetaData( } } +void VersionSet::GetAndFreeObsoleteFiles(std::vector* files) { + if (files != nullptr) { + files->reserve(files->size() + obsolete_files_.size()); + } + for (size_t i = 0; i < obsolete_files_.size(); i++) { + if (files != nullptr) { + files->push_back(obsolete_files_[i]->number); + } + delete obsolete_files_[i]; + } + obsolete_files_.clear(); +} + Compaction* VersionSet::CompactRange( int level, const InternalKey* begin, diff --git a/db/version_set.h b/db/version_set.h index 4e1d7e9bb..541bd8122 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -431,6 +431,8 @@ class VersionSet { void GetLiveFilesMetaData( std::vector *metadata); + void GetAndFreeObsoleteFiles(std::vector* files); + private: class Builder; struct ManifestWriter; @@ -507,6 +509,8 @@ class VersionSet { // Save us the cost of checking file size twice in LogAndApply uint64_t last_observed_manifest_size_; + std::vector obsolete_files_; + // storage options for all reads and writes except compactions const EnvOptions& storage_options_; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 7bbbee41a..3e41af96d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -387,8 +387,7 @@ struct Options { bool disable_seek_compaction; // The periodicity when obsolete files get deleted. The default - // value is 0 which means that obsolete files get removed after - // every compaction run. + // value is 6 hours. uint64_t delete_obsolete_files_period_micros; // Maximum number of concurrent background jobs, submitted to diff --git a/util/options.cc b/util/options.cc index eae112e84..b54320391 100644 --- a/util/options.cc +++ b/util/options.cc @@ -62,7 +62,7 @@ Options::Options() db_log_dir(""), wal_dir(""), disable_seek_compaction(false), - delete_obsolete_files_period_micros(0), + delete_obsolete_files_period_micros(6 * 60 * 60 * 1000000UL), max_background_compactions(1), max_background_flushes(0), max_log_file_size(0),