From ac3ae1df0b4858ea780d8e05d87a54006164a5b3 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Thu, 7 May 2020 09:29:21 -0700 Subject: [PATCH] Find/purge obsolete blob files (#6807) Summary: The patch extends `FindObsoleteFiles` and `PurgeObsoleteFiles` with support for blob files. The behavior is analogous to SST files: obsolete blob files are put on the "candidates for deletion" list, while live (and pending) files are preserved. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6807 Test Plan: `make check` Reviewed By: riversand963 Differential Revision: D21406249 Pulled By: ltamasi fbshipit-source-id: 1948f71c31927564b61e8af394f50ca3964880d9 --- db/db_impl/db_impl.h | 6 +- db/db_impl/db_impl_files.cc | 45 ++++++++++---- db/obsolete_files_test.cc | 119 ++++++++++++++++++++++++++++++++++++ file/filename.cc | 5 ++ file/filename.h | 2 + 5 files changed, 164 insertions(+), 13 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index fc3111ae7..c3edb9be5 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -971,6 +971,10 @@ class DBImpl : public DB { size_t TEST_EstimateInMemoryStatsHistorySize() const; VersionSet* TEST_GetVersionSet() const { return versions_.get(); } + + const std::unordered_set& TEST_GetFilesGrabbedForPurge() const { + return files_grabbed_for_purge_; + } #endif // NDEBUG protected: @@ -1937,7 +1941,7 @@ class DBImpl : public DB { std::unordered_map purge_files_; // A vector to store the file numbers that have been assigned to certain - // JobContext. Current implementation tracks ssts only. + // JobContext. Current implementation tracks table and blob files only. std::unordered_set files_grabbed_for_purge_; // A queue to store log writers to close diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index c1c1c5fa1..3998749f5 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -77,12 +77,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // Since job_context->min_pending_output is set, until file scan finishes, // mutex_ cannot be released. Otherwise, we might see no min_pending_output // here but later find newer generated unfinalized files while scanning. - if (!pending_outputs_.empty()) { - job_context->min_pending_output = *pending_outputs_.begin(); - } else { - // delete all of them - job_context->min_pending_output = std::numeric_limits::max(); - } + job_context->min_pending_output = MinObsoleteSstNumberToKeep(); // Get obsolete files. This function will also update the list of // pending files in VersionSet(). @@ -90,13 +85,18 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, &job_context->sst_delete_files, &job_context->blob_delete_files, &job_context->manifest_delete_files, job_context->min_pending_output); - // Mark the elements in job_context->sst_delete_files as grabbedForPurge - // so that other threads calling FindObsoleteFiles with full_scan=true - // will not add these files to candidate list for purge. + // Mark the elements in job_context->sst_delete_files and + // job_context->blob_delete_files as "grabbed for purge" so that other threads + // calling FindObsoleteFiles with full_scan=true will not add these files to + // candidate list for purge. for (const auto& sst_to_del : job_context->sst_delete_files) { MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber()); } + for (const auto& blob_file : job_context->blob_delete_files) { + MarkAsGrabbedForPurge(blob_file.GetBlobFileNumber()); + } + // store the current filenum, lognum, etc job_context->manifest_file_number = versions_->manifest_file_number(); job_context->pending_manifest_file_number = @@ -257,8 +257,11 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first, void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, const std::string& path_to_sync, FileType type, uint64_t number) { + TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl::BeforeDeletion", + const_cast(&fname)); + Status file_deletion_status; - if (type == kTableFile || type == kLogFile) { + if (type == kTableFile || type == kBlobFile || type == kLogFile) { file_deletion_status = DeleteDBFile(&immutable_db_options_, fname, path_to_sync, /*force_bg=*/false, /*force_fg=*/!wal_in_db_path_); @@ -307,13 +310,16 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // Now, convert lists to unordered sets, WITHOUT mutex held; set is slow. std::unordered_set sst_live_set(state.sst_live.begin(), state.sst_live.end()); + std::unordered_set blob_live_set(state.blob_live.begin(), + state.blob_live.end()); std::unordered_set log_recycle_files_set( state.log_recycle_files.begin(), state.log_recycle_files.end()); auto candidate_files = state.full_scan_candidate_files; candidate_files.reserve( candidate_files.size() + state.sst_delete_files.size() + - state.log_delete_files.size() + state.manifest_delete_files.size()); + state.blob_delete_files.size() + state.log_delete_files.size() + + state.manifest_delete_files.size()); // We may ignore the dbname when generating the file names. for (auto& file : state.sst_delete_files) { candidate_files.emplace_back( @@ -324,6 +330,11 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { file.DeleteMetadata(); } + for (const auto& blob_file : state.blob_delete_files) { + candidate_files.emplace_back(BlobFileName(blob_file.GetBlobFileNumber()), + blob_file.GetPath()); + } + for (auto file_num : state.log_delete_files) { if (file_num > 0) { candidate_files.emplace_back(LogFileName(file_num), @@ -416,6 +427,13 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { files_to_del.insert(number); } break; + case kBlobFile: + keep = number >= state.min_pending_output || + (blob_live_set.find(number) != blob_live_set.end()); + if (!keep) { + files_to_del.insert(number); + } + break; case kTempFile: // Any temp files that are currently being written to must // be recorded in pending_outputs_, which is inserted into "live". @@ -426,6 +444,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // TODO(yhchiang): carefully modify the third condition to safely // remove the temp options files. keep = (sst_live_set.find(number) != sst_live_set.end()) || + (blob_live_set.find(number) != blob_live_set.end()) || (number == state.pending_manifest_file_number) || (to_delete.find(kOptionsFileNamePrefix) != std::string::npos); break; @@ -448,7 +467,6 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { case kDBLockFile: case kIdentityFile: case kMetaDatabase: - case kBlobFile: keep = true; break; } @@ -464,6 +482,9 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { TableCache::Evict(table_cache_.get(), number); fname = MakeTableFileName(candidate_file.file_path, number); dir_to_sync = candidate_file.file_path; + } else if (type == kBlobFile) { + fname = BlobFileName(candidate_file.file_path, number); + dir_to_sync = candidate_file.file_path; } else { dir_to_sync = (type == kLogFile) ? immutable_db_options_.wal_dir : dbname_; diff --git a/db/obsolete_files_test.cc b/db/obsolete_files_test.cc index bf018a0e3..d467906b7 100644 --- a/db/obsolete_files_test.cc +++ b/db/obsolete_files_test.cc @@ -10,6 +10,7 @@ #ifndef ROCKSDB_LITE #include +#include #include #include #include @@ -193,6 +194,124 @@ TEST_F(ObsoleteFilesTest, DeleteObsoleteOptionsFile) { ASSERT_EQ(2, opts_file_count); } +TEST_F(ObsoleteFilesTest, BlobFiles) { + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + assert(versions->GetColumnFamilySet()); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + // Add a blob file that consists of nothing but garbage (and is thus obsolete) + // and another one that is live. + VersionEdit edit; + + constexpr uint64_t first_blob_file_number = 234; + constexpr uint64_t first_total_blob_count = 555; + constexpr uint64_t first_total_blob_bytes = 66666; + constexpr char first_checksum_method[] = "CRC32"; + constexpr char first_checksum_value[] = "3d87ff57"; + + edit.AddBlobFile(first_blob_file_number, first_total_blob_count, + first_total_blob_bytes, first_checksum_method, + first_checksum_value); + edit.AddBlobFileGarbage(first_blob_file_number, first_total_blob_count, + first_total_blob_bytes); + + constexpr uint64_t second_blob_file_number = 456; + constexpr uint64_t second_total_blob_count = 100; + constexpr uint64_t second_total_blob_bytes = 2000000; + constexpr char second_checksum_method[] = "CRC32B"; + constexpr char second_checksum_value[] = "6dbdf23a"; + edit.AddBlobFile(second_blob_file_number, second_total_blob_count, + second_total_blob_bytes, second_checksum_method, + second_checksum_value); + + dbfull()->TEST_LockMutex(); + Status s = versions->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, dbfull()->mutex()); + dbfull()->TEST_UnlockMutex(); + + ASSERT_OK(s); + + // Check for obsolete files and make sure the first blob file is picked up + // and grabbed for purge. The second blob file should be on the live list. + constexpr int job_id = 0; + JobContext job_context{job_id}; + + dbfull()->TEST_LockMutex(); + constexpr bool force_full_scan = false; + dbfull()->FindObsoleteFiles(&job_context, force_full_scan); + dbfull()->TEST_UnlockMutex(); + + ASSERT_TRUE(job_context.HaveSomethingToDelete()); + ASSERT_EQ(job_context.blob_delete_files.size(), 1); + ASSERT_EQ(job_context.blob_delete_files[0].GetBlobFileNumber(), + first_blob_file_number); + + const auto& files_grabbed_for_purge = + dbfull()->TEST_GetFilesGrabbedForPurge(); + ASSERT_NE(files_grabbed_for_purge.find(first_blob_file_number), + files_grabbed_for_purge.end()); + + ASSERT_EQ(job_context.blob_live.size(), 1); + ASSERT_EQ(job_context.blob_live[0], second_blob_file_number); + + // Hack the job context a bit by adding a few files to the full scan + // list and adjusting the pending file number. We add the two files + // above as well as two additional ones, where one is old + // and should be cleaned up, and the other is still pending. + assert(cfd->ioptions()); + assert(!cfd->ioptions()->cf_paths.empty()); + const std::string& path = cfd->ioptions()->cf_paths.front().path; + + constexpr uint64_t old_blob_file_number = 123; + constexpr uint64_t pending_blob_file_number = 567; + + job_context.full_scan_candidate_files.emplace_back( + BlobFileName(old_blob_file_number), path); + job_context.full_scan_candidate_files.emplace_back( + BlobFileName(first_blob_file_number), path); + job_context.full_scan_candidate_files.emplace_back( + BlobFileName(second_blob_file_number), path); + job_context.full_scan_candidate_files.emplace_back( + BlobFileName(pending_blob_file_number), path); + + job_context.min_pending_output = pending_blob_file_number; + + // Purge obsolete files and make sure we purge the old file and the first file + // (and keep the second file and the pending file). + std::vector deleted_files; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion", [&](void* arg) { + const std::string* file = static_cast(arg); + assert(file); + + constexpr char blob_extension[] = ".blob"; + + if (file->find(blob_extension) != std::string::npos) { + deleted_files.emplace_back(*file); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + dbfull()->PurgeObsoleteFiles(job_context); + job_context.Clean(); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_EQ(files_grabbed_for_purge.find(first_blob_file_number), + files_grabbed_for_purge.end()); + + std::sort(deleted_files.begin(), deleted_files.end()); + const std::vector expected_deleted_files{ + BlobFileName(path, old_blob_file_number), + BlobFileName(path, first_blob_file_number)}; + + ASSERT_EQ(deleted_files, expected_deleted_files); +} + } // namespace ROCKSDB_NAMESPACE #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS diff --git a/file/filename.cc b/file/filename.cc index 04783b899..968adbaa7 100644 --- a/file/filename.cc +++ b/file/filename.cc @@ -79,6 +79,11 @@ std::string LogFileName(uint64_t number) { return MakeFileName(number, "log"); } +std::string BlobFileName(uint64_t number) { + assert(number > 0); + return MakeFileName(number, kRocksDBBlobFileExt.c_str()); +} + std::string BlobFileName(const std::string& blobdirname, uint64_t number) { assert(number > 0); return MakeFileName(blobdirname, number, kRocksDBBlobFileExt.c_str()); diff --git a/file/filename.h b/file/filename.h index 59b55e556..f23723244 100644 --- a/file/filename.h +++ b/file/filename.h @@ -56,6 +56,8 @@ extern std::string LogFileName(const std::string& dbname, uint64_t number); extern std::string LogFileName(uint64_t number); +extern std::string BlobFileName(uint64_t number); + extern std::string BlobFileName(const std::string& bdirname, uint64_t number); extern std::string BlobFileName(const std::string& dbname,