From a00ddf1574017e278873d175b7d2cef8d373b337 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Mon, 4 May 2020 15:05:34 -0700 Subject: [PATCH] Expose the set of live blob files from Version/VersionSet (#6785) Summary: The patch adds logic that returns the set of live blob files from `Version::AddLiveFiles` and `VersionSet::AddLiveFiles` (in addition to live table files), and also cleans up the code a bit, for example, by exposing only the numbers of table files as opposed to the earlier `FileDescriptor`s that no clients used. Moreover, the patch extends the `GetLiveFiles` API so that it also exposes blob files in the current version. Similarly to https://github.com/facebook/rocksdb/pull/6755, this is a building block for identifying and purging obsolete blob files. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6785 Test Plan: `make check` Reviewed By: riversand963 Differential Revision: D21336210 Pulled By: ltamasi fbshipit-source-id: fc1aede8a49eacd03caafbc5f6f9ce43b6270821 --- db/db_filesnapshot.cc | 24 +++++++----- db/db_impl/db_impl.h | 2 + db/db_impl/db_impl_files.cc | 17 ++++---- db/db_test.cc | 36 +++++++++++++++++ db/job_context.h | 7 +++- db/version_set.cc | 68 +++++++++++++++++++++++++------- db/version_set.h | 12 ++++-- db/version_set_test.cc | 78 +++++++++++++++++++++++++++++++++++++ 8 files changed, 205 insertions(+), 39 deletions(-) diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index f0f22cb47..bb6199827 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -118,27 +118,33 @@ Status DBImpl::GetLiveFiles(std::vector& ret, } } - // Make a set of all of the live *.sst files - std::vector live; + // Make a set of all of the live table and blob files + std::vector live_table_files; + std::vector live_blob_files; for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { continue; } - cfd->current()->AddLiveFiles(&live); + cfd->current()->AddLiveFiles(&live_table_files, &live_blob_files); } ret.clear(); - ret.reserve(live.size() + 3); // *.sst + CURRENT + MANIFEST + OPTIONS + ret.reserve(live_table_files.size() + live_blob_files.size() + + 3); // for CURRENT + MANIFEST + OPTIONS // create names of the live files. The names are not absolute // paths, instead they are relative to dbname_; - for (const auto& live_file : live) { - ret.push_back(MakeTableFileName("", live_file.GetNumber())); + for (const auto& table_file_number : live_table_files) { + ret.emplace_back(MakeTableFileName("", table_file_number)); } - ret.push_back(CurrentFileName("")); - ret.push_back(DescriptorFileName("", versions_->manifest_file_number())); - ret.push_back(OptionsFileName("", versions_->options_file_number())); + for (const auto& blob_file_number : live_blob_files) { + ret.emplace_back(BlobFileName("", blob_file_number)); + } + + ret.emplace_back(CurrentFileName("")); + ret.emplace_back(DescriptorFileName("", versions_->manifest_file_number())); + ret.emplace_back(OptionsFileName("", versions_->options_file_number())); // find length of manifest file while holding the mutex lock *manifest_file_size = versions_->manifest_file_size(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 6a5bc23fa..fc3111ae7 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -969,6 +969,8 @@ class DBImpl : public DB { void TEST_WaitForPersistStatsRun(std::function callback) const; bool TEST_IsPersistentStatsEnabled() const; size_t TEST_EstimateInMemoryStatsHistorySize() const; + + VersionSet* TEST_GetVersionSet() const { return versions_.get(); } #endif // NDEBUG protected: diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index e6a8c5882..c1c1c5fa1 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -36,7 +36,7 @@ uint64_t DBImpl::MinObsoleteSstNumberToKeep() { return std::numeric_limits::max(); } -// * Returns the list of live files in 'sst_live' +// * Returns the list of live files in 'sst_live' and 'blob_live'. // If it's doing full scan: // * Returns the list of all files in the filesystem in // 'full_scan_candidate_files'. @@ -104,7 +104,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, job_context->log_number = MinLogNumberToKeep(); job_context->prev_log_number = versions_->prev_log_number(); - versions_->AddLiveFiles(&job_context->sst_live); + versions_->AddLiveFiles(&job_context->sst_live, &job_context->blob_live); if (doing_the_full_scan) { InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(), dbname_); @@ -304,12 +304,9 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // FindObsoleteFiles() should've populated this so nonzero assert(state.manifest_file_number != 0); - // Now, convert live list to an unordered map, WITHOUT mutex held; - // set is slow. - std::unordered_map sst_live_map; - for (const FileDescriptor& fd : state.sst_live) { - sst_live_map[fd.GetNumber()] = &fd; - } + // Now, convert lists to unordered sets, WITHOUT mutex held; set is slow. + std::unordered_set sst_live_set(state.sst_live.begin(), + state.sst_live.end()); std::unordered_set log_recycle_files_set( state.log_recycle_files.begin(), state.log_recycle_files.end()); @@ -413,7 +410,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { case kTableFile: // If the second condition is not there, this makes // DontDeletePendingOutputs fail - keep = (sst_live_map.find(number) != sst_live_map.end()) || + keep = (sst_live_set.find(number) != sst_live_set.end()) || number >= state.min_pending_output; if (!keep) { files_to_del.insert(number); @@ -428,7 +425,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // // TODO(yhchiang): carefully modify the third condition to safely // remove the temp options files. - keep = (sst_live_map.find(number) != sst_live_map.end()) || + keep = (sst_live_set.find(number) != sst_live_set.end()) || (number == state.pending_manifest_file_number) || (to_delete.find(kOptionsFileNamePrefix) != std::string::npos); break; diff --git a/db/db_test.cc b/db/db_test.cc index a7adf5826..72100981b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2309,6 +2309,42 @@ TEST_F(DBTest, ReadonlyDBGetLiveManifestSize) { Close(); } while (ChangeCompactOptions()); } + +TEST_F(DBTest, GetLiveBlobFiles) { + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + assert(versions->GetColumnFamilySet()); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + // Add a live blob file. + VersionEdit edit; + + constexpr uint64_t blob_file_number = 234; + constexpr uint64_t total_blob_count = 555; + constexpr uint64_t total_blob_bytes = 66666; + constexpr char checksum_method[] = "CRC32"; + constexpr char checksum_value[] = "3d87ff57"; + + edit.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes, + checksum_method, checksum_value); + + dbfull()->TEST_LockMutex(); + Status s = versions->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, dbfull()->mutex()); + dbfull()->TEST_UnlockMutex(); + + ASSERT_OK(s); + + // Make sure it appears in the results returned by GetLiveFiles. + uint64_t manifest_size = 0; + std::vector files; + ASSERT_OK(dbfull()->GetLiveFiles(files, &manifest_size)); + + ASSERT_FALSE(files.empty()); + ASSERT_EQ(files[0], BlobFileName("", blob_file_number)); +} #endif TEST_F(DBTest, PurgeInfoLogs) { diff --git a/db/job_context.h b/db/job_context.h index 87b46b9a7..d09937d11 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -141,11 +141,14 @@ struct JobContext { std::vector full_scan_candidate_files; // the list of all live sst files that cannot be deleted - std::vector sst_live; + std::vector sst_live; - // a list of sst files that we need to delete + // the list of sst files that we need to delete std::vector sst_delete_files; + // the list of all live blob files that cannot be deleted + std::vector blob_live; + // the list of blob files that we need to delete std::vector blob_delete_files; diff --git a/db/version_set.cc b/db/version_set.cc index 0d1687500..65f88cdc8 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3414,13 +3414,27 @@ bool VersionStorageInfo::RangeMightExistAfterSortedRun( return false; } -void Version::AddLiveFiles(std::vector* live) { - for (int level = 0; level < storage_info_.num_levels(); level++) { - const std::vector& files = storage_info_.files_[level]; - for (const auto& file : files) { - live->push_back(file->fd); +void Version::AddLiveFiles(std::vector* live_table_files, + std::vector* live_blob_files) const { + assert(live_table_files); + assert(live_blob_files); + + for (int level = 0; level < storage_info_.num_levels(); ++level) { + const auto& level_files = storage_info_.LevelFiles(level); + for (const auto& meta : level_files) { + assert(meta); + + live_table_files->emplace_back(meta->fd.GetNumber()); } } + + const auto& blob_files = storage_info_.GetBlobFiles(); + for (const auto& pair : blob_files) { + const auto& meta = pair.second; + assert(meta); + + live_blob_files->emplace_back(meta->GetBlobFileNumber()); + } } std::string Version::DebugString(bool hex, bool print_stats) const { @@ -5511,44 +5525,70 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f, v->GetMutableCFOptions().prefix_extractor.get()); } -void VersionSet::AddLiveFiles(std::vector* live_list) { +void VersionSet::AddLiveFiles(std::vector* live_table_files, + std::vector* live_blob_files) const { + assert(live_table_files); + assert(live_blob_files); + // pre-calculate space requirement - int64_t total_files = 0; + size_t total_table_files = 0; + size_t total_blob_files = 0; + + assert(column_family_set_); for (auto cfd : *column_family_set_) { + assert(cfd); + if (!cfd->initialized()) { continue; } - Version* dummy_versions = cfd->dummy_versions(); + + Version* const dummy_versions = cfd->dummy_versions(); + assert(dummy_versions); + for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) { + assert(v); + const auto* vstorage = v->storage_info(); - for (int level = 0; level < vstorage->num_levels(); level++) { - total_files += vstorage->LevelFiles(level).size(); + assert(vstorage); + + for (int level = 0; level < vstorage->num_levels(); ++level) { + total_table_files += vstorage->LevelFiles(level).size(); } + + total_blob_files += vstorage->GetBlobFiles().size(); } } // just one time extension to the right size - live_list->reserve(live_list->size() + static_cast(total_files)); + live_table_files->reserve(live_table_files->size() + total_table_files); + live_blob_files->reserve(live_blob_files->size() + total_blob_files); + assert(column_family_set_); for (auto cfd : *column_family_set_) { + assert(cfd); if (!cfd->initialized()) { continue; } + auto* current = cfd->current(); bool found_current = false; - Version* dummy_versions = cfd->dummy_versions(); + + Version* const dummy_versions = cfd->dummy_versions(); + assert(dummy_versions); + for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) { - v->AddLiveFiles(live_list); + v->AddLiveFiles(live_table_files, live_blob_files); if (v == current) { found_current = true; } } + if (!found_current && current != nullptr) { // Should never happen unless it is a bug. assert(false); - current->AddLiveFiles(live_list); + current->AddLiveFiles(live_table_files, live_blob_files); } } } diff --git a/db/version_set.h b/db/version_set.h index ccab5428b..ce518fb8a 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -629,8 +629,10 @@ class Version { // and return true. Otherwise, return false. bool Unref(); - // Add all files listed in the current version to *live. - void AddLiveFiles(std::vector* live); + // Add all files listed in the current version to *live_table_files and + // *live_blob_files. + void AddLiveFiles(std::vector* live_table_files, + std::vector* live_blob_files) const; // Return a human readable string that describes this version's contents. std::string DebugString(bool hex = false, bool print_stats = false) const; @@ -1047,8 +1049,10 @@ class VersionSet { const Compaction* c, RangeDelAggregator* range_del_agg, const FileOptions& file_options_compactions); - // Add all files listed in any live version to *live. - void AddLiveFiles(std::vector* live_list); + // Add all files listed in any live version to *live_table_files and + // *live_blob_files. Note that these lists may contain duplicates. + void AddLiveFiles(std::vector* live_table_files, + std::vector* live_blob_files) const; // Return the approximate size of data to be scanned for range [start, end) // in levels [start_level, end_level). If end_level == -1 it will search diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 5d30aad35..c335589e3 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -960,6 +960,84 @@ TEST_F(VersionSetTest, PersistBlobFileStateInNewManifest) { SyncPoint::GetInstance()->ClearAllCallBacks(); } +TEST_F(VersionSetTest, AddLiveBlobFiles) { + // Initialize the database and add a blob file. + NewDB(); + + assert(versions_); + assert(versions_->GetColumnFamilySet()); + + ColumnFamilyData* const cfd = versions_->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + VersionEdit first; + + constexpr uint64_t first_blob_file_number = 234; + constexpr uint64_t first_total_blob_count = 555; + constexpr uint64_t first_total_blob_bytes = 66666; + constexpr char first_checksum_method[] = "CRC32"; + constexpr char first_checksum_value[] = "3d87ff57"; + + first.AddBlobFile(first_blob_file_number, first_total_blob_count, + first_total_blob_bytes, first_checksum_method, + first_checksum_value); + + mutex_.Lock(); + Status s = versions_->LogAndApply(cfd, mutable_cf_options_, &first, &mutex_); + mutex_.Unlock(); + + ASSERT_OK(s); + + // Reference the version so it stays alive even after the following version + // edit. + Version* const version = cfd->current(); + assert(version); + + version->Ref(); + + // Get live files directly from version. + std::vector version_table_files; + std::vector version_blob_files; + + version->AddLiveFiles(&version_table_files, &version_blob_files); + + ASSERT_EQ(version_blob_files.size(), 1); + ASSERT_EQ(version_blob_files[0], first_blob_file_number); + + // Add another blob file. + VersionEdit second; + + constexpr uint64_t second_blob_file_number = 456; + constexpr uint64_t second_total_blob_count = 100; + constexpr uint64_t second_total_blob_bytes = 2000000; + constexpr char second_checksum_method[] = "CRC32B"; + constexpr char second_checksum_value[] = "6dbdf23a"; + second.AddBlobFile(second_blob_file_number, second_total_blob_count, + second_total_blob_bytes, second_checksum_method, + second_checksum_value); + + mutex_.Lock(); + s = versions_->LogAndApply(cfd, mutable_cf_options_, &second, &mutex_); + mutex_.Unlock(); + + ASSERT_OK(s); + + // Get all live files from version set. Note that the result contains + // duplicates. + std::vector all_table_files; + std::vector all_blob_files; + + versions_->AddLiveFiles(&all_table_files, &all_blob_files); + + ASSERT_EQ(all_blob_files.size(), 3); + ASSERT_EQ(all_blob_files[0], first_blob_file_number); + ASSERT_EQ(all_blob_files[1], first_blob_file_number); + ASSERT_EQ(all_blob_files[2], second_blob_file_number); + + // Clean up previous version. + version->Unref(); +} + TEST_F(VersionSetTest, ObsoleteBlobFile) { // Initialize the database and add a blob file (with no garbage just yet). NewDB();