From 0acc7388101c7f0c043d1dc961238d1d44ee9971 Mon Sep 17 00:00:00 2001
From: Igor Canadi
Date: Mon, 22 Dec 2014 12:04:45 +0100
Subject: [PATCH] Speed up FindObsoleteFiles()

Summary:
There are two versions of FindObsoleteFiles():
* full scan, which is executed every 6 hours (and it's terribly slow)
* no full scan, which is executed every time a background process finishes
  and an iterator is deleted

This diff optimizes the second case (no full scan). Here's what we do
before the diff:
* Get the list of obsolete files (files with ref==0). Some files in the
  obsolete_files set might actually be live.
* Get the list of live files to avoid deleting files that are live.
* Delete files that are in obsolete_files and not in live_files.

After this diff:
* The only files with ref==0 that are still live are files that have been
  part of a move compaction. Don't include moved files in obsolete_files.
* Get the list of obsolete files (which now excludes moved files).
* No need to get the list of live files, since all files in obsolete_files
  need to be deleted (see the sketch below).
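To make the before/after difference concrete, here is a minimal,
self-contained C++ sketch. It is illustrative only: the File struct and the
ObsoleteBefore()/ObsoleteAfter() helpers are invented for this example; in
the real code the corresponding pieces are FileMetaData::moved,
VersionSet::obsolete_files_ and DBImpl::FindObsoleteFiles() /
PurgeObsoleteFiles().

// Minimal sketch of the before/after obsolete-file logic described above.
// NOTE: File, ObsoleteBefore() and ObsoleteAfter() are invented for this
// illustration; they are not RocksDB types or functions.
#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

struct File {
  uint64_t number;  // file number, e.g. 000009.sst
  int refs;         // reference count held by Versions
  bool moved;       // new in this diff: file was part of a move compaction
};

// Before: a file with refs == 0 is only a deletion *candidate*, because a
// trivially moved file can be unreferenced in the old Version while still
// being live in the new one. So we also have to collect the live set and
// subtract it, which is expensive.
std::vector<uint64_t> ObsoleteBefore(const std::vector<File>& files,
                                     const std::set<uint64_t>& live) {
  std::vector<uint64_t> to_delete;
  for (const File& f : files) {
    if (f.refs == 0 && live.count(f.number) == 0) {
      to_delete.push_back(f.number);
    }
  }
  return to_delete;
}

// After: moved files are never put on the obsolete list in the first place,
// so everything with refs == 0 and !moved can be deleted without building
// the live set at all.
std::vector<uint64_t> ObsoleteAfter(const std::vector<File>& files) {
  std::vector<uint64_t> to_delete;
  for (const File& f : files) {
    if (f.refs == 0 && !f.moved) {
      to_delete.push_back(f.number);
    }
  }
  return to_delete;
}

int main() {
  // File 9 was trivially moved L1 -> L2: its old metadata has refs == 0 even
  // though the physical file is still live.
  std::vector<File> files = {{7, 0, false}, {9, 0, true}, {11, 1, false}};
  std::set<uint64_t> live = {9, 11};

  for (uint64_t n : ObsoleteBefore(files, live)) std::cout << n << " ";
  std::cout << "<- deletable before (live set required)" << std::endl;
  for (uint64_t n : ObsoleteAfter(files)) std::cout << n << " ";
  std::cout << "<- deletable after (no live set needed)" << std::endl;
  return 0;
}

Both variants delete the same files; the difference is that the second one
never has to build the live-file set, which is the expensive step when the
DB has hundreds of column families.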
Left here for backward compatibility"); namespace { enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { @@ -2008,8 +2007,6 @@ class Benchmark { options.compression_per_level[i] = FLAGS_compression_type_e; } } - options.delete_obsolete_files_period_micros = - FLAGS_delete_obsolete_files_period_micros; options.soft_rate_limit = FLAGS_soft_rate_limit; options.hard_rate_limit = FLAGS_hard_rate_limit; options.rate_limit_delay_max_milliseconds = diff --git a/db/db_impl.cc b/db/db_impl.cc index aff68ed45..0c3bd778d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -213,7 +213,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) bg_flush_scheduled_(0), manual_compaction_(nullptr), disable_delete_obsolete_files_(0), - delete_obsolete_files_last_run_(options.env->NowMicros()), + delete_obsolete_files_next_run_( + options.env->NowMicros() + + db_options_.delete_obsolete_files_period_micros), last_stats_dump_time_microsec_(0), flush_on_destroy_(false), env_options_(options), @@ -421,14 +423,17 @@ void DBImpl::MaybeDumpStats() { } } -// Returns the list of live files in 'sst_live' and the list -// of all files in the filesystem in 'candidate_files'. +// If it's doing full scan: +// * Returns the list of live files in 'full_scan_sst_live' and the list +// of all files in the filesystem in 'full_scan_candidate_files'. +// Otherwise, gets obsolete files from VersionSet. // no_full_scan = true -- never do the full scan using GetChildren() // force = false -- don't force the full scan, except every // db_options_.delete_obsolete_files_period_micros // force = true -- force the full scan void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, bool no_full_scan) { + // TODO(icanadi) clean up FindObsoleteFiles, no need to do full scans anymore mutex_.AssertHeld(); // if deletion is disabled, do nothing @@ -445,10 +450,10 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, doing_the_full_scan = true; } else { const uint64_t now_micros = env_->NowMicros(); - if (delete_obsolete_files_last_run_ + - db_options_.delete_obsolete_files_period_micros < now_micros) { + if (delete_obsolete_files_next_run_ < now_micros) { doing_the_full_scan = true; - delete_obsolete_files_last_run_ = now_micros; + delete_obsolete_files_next_run_ = + now_micros + db_options_.delete_obsolete_files_period_micros; } } @@ -462,13 +467,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, job_context->log_number = versions_->MinLogNumber(); job_context->prev_log_number = versions_->prev_log_number(); - if (!doing_the_full_scan && !job_context->HaveSomethingToDelete()) { - // avoid filling up sst_live if we're sure that we - // are not going to do the full scan and that we don't have - // anything to delete at the moment - return; - } - // don't delete live files if (pending_outputs_.size()) { job_context->min_pending_output = *pending_outputs_.begin(); @@ -476,11 +474,16 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // delete all of them job_context->min_pending_output = std::numeric_limits::max(); } - versions_->AddLiveFiles(&job_context->sst_live); if (doing_the_full_scan) { - for (uint32_t path_id = 0; - path_id < db_options_.db_paths.size(); path_id++) { + // Here we find all files in the DB directory and all the live files. In the + // DeleteObsoleteFiles(), we will calculate a set difference (all_files - + // live_files) and delete all files in that difference. 
+    // the full scan we don't need to get live files, because all files returned
+    // by GetObsoleteFiles() will be dead (and need to be deleted)
+    versions_->AddLiveFiles(&job_context->full_scan_sst_live);
+    for (uint32_t path_id = 0; path_id < db_options_.db_paths.size();
+         path_id++) {
       // set of all files in the directory. We'll exclude files that are still
       // alive in the subsequent processings.
       std::vector<std::string> files;
@@ -488,7 +491,8 @@
                           &files);  // Ignore errors
       for (std::string file : files) {
         // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
-        job_context->candidate_files.emplace_back("/" + file, path_id);
+        job_context->full_scan_candidate_files.emplace_back("/" + file,
+                                                            path_id);
       }
     }

@@ -497,7 +501,7 @@
       std::vector<std::string> log_files;
       env_->GetChildren(db_options_.wal_dir, &log_files);  // Ignore errors
       for (std::string log_file : log_files) {
-        job_context->candidate_files.emplace_back(log_file, 0);
+        job_context->full_scan_candidate_files.emplace_back(log_file, 0);
       }
     }
     // Add info log files in db_log_dir
@@ -506,7 +510,7 @@
       // Ignore errors
       env_->GetChildren(db_options_.db_log_dir, &info_log_files);
       for (std::string log_file : info_log_files) {
-        job_context->candidate_files.emplace_back(log_file, 0);
+        job_context->full_scan_candidate_files.emplace_back(log_file, 0);
       }
     }
   }
@@ -543,11 +547,11 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
   // Now, convert live list to an unordered map, WITHOUT mutex held;
   // set is slow.
   std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
-  for (const FileDescriptor& fd : state.sst_live) {
+  for (const FileDescriptor& fd : state.full_scan_sst_live) {
     sst_live_map[fd.GetNumber()] = &fd;
   }

-  auto candidate_files = state.candidate_files;
+  auto candidate_files = state.full_scan_candidate_files;
   candidate_files.reserve(candidate_files.size() +
                           state.sst_delete_files.size() +
                           state.log_delete_files.size());
@@ -1491,6 +1495,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
   VersionEdit edit;
   edit.SetColumnFamily(cfd->GetID());
   for (const auto& f : cfd->current()->storage_info()->LevelFiles(level)) {
+    f->moved = true;
     edit.DeleteFile(level, f->fd.GetNumber());
     edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
                  f->fd.GetFileSize(), f->smallest, f->largest,
@@ -2137,6 +2142,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     // Move file to next level
     assert(c->num_input_files(0) == 1);
     FileMetaData* f = c->input(0, 0);
+    f->moved = true;
     c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
     c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
                        f->fd.GetFileSize(), f->smallest, f->largest,
diff --git a/db/db_impl.h b/db/db_impl.h
index 7a3a7984d..de834a0fa 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -532,8 +532,8 @@ class DBImpl : public DB {
   // without any synchronization
   int disable_delete_obsolete_files_;

-  // last time when DeleteObsoleteFiles was invoked
-  uint64_t delete_obsolete_files_last_run_;
+  // next time when we should run DeleteObsoleteFiles with full scan
+  uint64_t delete_obsolete_files_next_run_;

   // last time stats were dumped to LOG
   std::atomic<uint64_t> last_stats_dump_time_microsec_;
diff --git a/db/db_test.cc b/db/db_test.cc
index 6c995e7a0..7feb98808 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -85,7 +85,7 @@ static bool LZ4HCCompressionSupported(const CompressionOptions &options) {
   return port::LZ4HC_Compress(options, in.data(), in.size(), &out);
 }

-static std::string RandomString(Random *rnd, int len) {
+static std::string RandomString(Random* rnd, int len) {
   std::string r;
   test::RandomString(rnd, len, &r);
   return r;
@@ -9993,6 +9993,36 @@ TEST(DBTest, DontDeletePendingOutputs) {
   Compact("a", "b");
 }

+TEST(DBTest, DontDeleteMovedFile) {
+  // This test triggers move compaction and verifies that the file is not
+  // deleted when it's part of move compaction
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.max_bytes_for_level_base = 1024 * 1024;  // 1 MB
+  options.level0_file_num_compaction_trigger =
+      2;  // trigger compaction when we have 2 files
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+  // Create two 1MB sst files
+  for (int i = 0; i < 2; ++i) {
+    // Create 1MB sst file
+    for (int j = 0; j < 100; ++j) {
+      ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
+    }
+    ASSERT_OK(Flush());
+  }
+  // this should execute both L0->L1 and L1->(move)->L2 compactions
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ("0,0,1", FilesPerLevel(0));
+
+  // If the moved file is actually deleted (the move-safeguard in
+  // Version::~Version() is not there), we get this failure:
+  // Corruption: Can't access /000009.sst
+  Reopen(options);
+}
+
 }  // namespace rocksdb

 int main(int argc, char** argv) {
diff --git a/db/job_context.h b/db/job_context.h
index 9b14d5995..01c868c03 100644
--- a/db/job_context.h
+++ b/db/job_context.h
@@ -20,7 +20,7 @@ class MemTable;

 struct JobContext {
   inline bool HaveSomethingToDelete() const {
-    return candidate_files.size() || sst_delete_files.size() ||
+    return full_scan_candidate_files.size() || sst_delete_files.size() ||
           log_delete_files.size() || new_superversion != nullptr ||
           superversions_to_free.size() > 0 || memtables_to_free.size() > 0;
   }
@@ -39,10 +39,12 @@
   // a list of all files that we'll consider deleting
   // (every once in a while this is filled up with all files
   // in the DB directory)
-  std::vector<CandidateFileInfo> candidate_files;
+  // (filled only if we're doing full scan)
+  std::vector<CandidateFileInfo> full_scan_candidate_files;

   // the list of all live sst files that cannot be deleted
-  std::vector<FileDescriptor> sst_live;
+  // (filled only if we're doing full scan)
+  std::vector<FileDescriptor> full_scan_sst_live;

   // a list of sst files that we need to delete
   std::vector<FileMetaData*> sst_delete_files;
diff --git a/db/version_builder.cc b/db/version_builder.cc
index ec7bb176a..e282e670c 100644
--- a/db/version_builder.cc
+++ b/db/version_builder.cc
@@ -206,6 +206,7 @@
       const int level = new_file.first;
       FileMetaData* f = new FileMetaData(new_file.second);
       f->refs = 1;
+      f->moved = false;

       assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
              levels_[level].added_files.end());
diff --git a/db/version_edit.h b/db/version_edit.h
index 86e315c11..35b894954 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -85,6 +85,10 @@ struct FileMetaData {
   bool init_stats_from_file;  // true if the data-entry stats of this file
                               // has initialized from file.

+  // Always false for new files. Set to true if the file was part of a move
+  // compaction. Can only be mutated by the compaction process, under the DB mutex.
+  bool moved;
+
   FileMetaData()
       : refs(0),
         being_compacted(false),
@@ -94,7 +98,8 @@
         num_deletions(0),
         raw_key_size(0),
         raw_value_size(0),
-        init_stats_from_file(false) {}
+        init_stats_from_file(false),
+        moved(false) {}
 };

 // A compressed copy of file meta data that just contain
diff --git a/db/version_set.cc b/db/version_set.cc
index f138c8232..0dbac7667 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -309,7 +309,13 @@ Version::~Version() {
        cfd_->table_cache()->ReleaseHandle(f->table_reader_handle);
        f->table_reader_handle = nullptr;
      }
-      vset_->obsolete_files_.push_back(f);
+      if (!f->moved) {
+        vset_->obsolete_files_.push_back(f);
+      } else {
+        // moved!
+        // TODO(icanadi) delete this outside of mutex
+        delete f;
+      }
     }
   }
 }