diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 74de21463..e6a8c5882 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -86,9 +86,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // Get obsolete files. This function will also update the list of // pending files in VersionSet(). - versions_->GetObsoleteFiles(&job_context->sst_delete_files, - &job_context->manifest_delete_files, - job_context->min_pending_output); + versions_->GetObsoleteFiles( + &job_context->sst_delete_files, &job_context->blob_delete_files, + &job_context->manifest_delete_files, job_context->min_pending_output); // Mark the elements in job_context->sst_delete_files as grabbedForPurge // so that other threads calling FindObsoleteFiles with full_scan=true diff --git a/db/job_context.h b/db/job_context.h index 31ff26c3a..87b46b9a7 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -102,8 +102,9 @@ struct SuperVersionContext { struct JobContext { inline bool HaveSomethingToDelete() const { - return full_scan_candidate_files.size() || sst_delete_files.size() || - log_delete_files.size() || manifest_delete_files.size(); + return !(full_scan_candidate_files.empty() && sst_delete_files.empty() && + blob_delete_files.empty() && log_delete_files.empty() && + manifest_delete_files.empty()); } inline bool HaveSomethingToClean() const { @@ -145,6 +146,9 @@ struct JobContext { // a list of sst files that we need to delete std::vector sst_delete_files; + // the list of blob files that we need to delete + std::vector blob_delete_files; + // a list of log files that we need to delete std::vector log_delete_files; diff --git a/db/version_builder.cc b/db/version_builder.cc index 4b609c3b8..38e407e38 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -87,9 +87,10 @@ class VersionBuilder::Rep { }; const FileOptions& file_options_; - Logger* info_log_; + const ImmutableCFOptions* const ioptions_; TableCache* table_cache_; VersionStorageInfo* base_vstorage_; + VersionSet* version_set_; int num_levels_; LevelState* levels_; // Store states of levels larger than num_levels_. We do this instead of @@ -107,15 +108,18 @@ class VersionBuilder::Rep { std::map> changed_blob_files_; public: - Rep(const FileOptions& file_options, Logger* info_log, - TableCache* table_cache, - VersionStorageInfo* base_vstorage) + Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions, + TableCache* table_cache, VersionStorageInfo* base_vstorage, + VersionSet* version_set) : file_options_(file_options), - info_log_(info_log), + ioptions_(ioptions), table_cache_(table_cache), base_vstorage_(base_vstorage), + version_set_(version_set), num_levels_(base_vstorage->num_levels()), has_invalid_levels_(false) { + assert(ioptions_); + levels_ = new LevelState[num_levels_]; level_zero_cmp_.sort_method = FileComparator::kLevel0; level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0; @@ -388,11 +392,29 @@ class VersionBuilder::Rep { return Status::Corruption("VersionBuilder", oss.str()); } + // Note: we use C++11 for now but in C++14, this could be done in a more + // elegant way using generalized lambda capture. + VersionSet* const vs = version_set_; + const ImmutableCFOptions* const ioptions = ioptions_; + + auto deleter = [vs, ioptions](SharedBlobFileMetaData* shared_meta) { + if (vs) { + assert(ioptions); + assert(!ioptions->cf_paths.empty()); + assert(shared_meta); + + vs->AddObsoleteBlobFile(shared_meta->GetBlobFileNumber(), + ioptions->cf_paths.front().path); + } + + delete shared_meta; + }; + auto shared_meta = SharedBlobFileMetaData::Create( blob_file_number, blob_file_addition.GetTotalBlobCount(), blob_file_addition.GetTotalBlobBytes(), blob_file_addition.GetChecksumMethod(), - blob_file_addition.GetChecksumValue()); + blob_file_addition.GetChecksumValue(), deleter); constexpr uint64_t garbage_blob_count = 0; constexpr uint64_t garbage_blob_bytes = 0; @@ -738,16 +760,19 @@ class VersionBuilder::Rep { // f is to-be-deleted table file vstorage->RemoveCurrentStats(f); } else { - vstorage->AddFile(level, f, info_log_); + assert(ioptions_); + vstorage->AddFile(level, f, ioptions_->info_log); } } }; VersionBuilder::VersionBuilder(const FileOptions& file_options, + const ImmutableCFOptions* ioptions, TableCache* table_cache, VersionStorageInfo* base_vstorage, - Logger* info_log) - : rep_(new Rep(file_options, info_log, table_cache, base_vstorage)) {} + VersionSet* version_set) + : rep_(new Rep(file_options, ioptions, table_cache, base_vstorage, + version_set)) {} VersionBuilder::~VersionBuilder() = default; @@ -773,8 +798,9 @@ Status VersionBuilder::LoadTableHandlers( BaseReferencedVersionBuilder::BaseReferencedVersionBuilder( ColumnFamilyData* cfd) : version_builder_(new VersionBuilder( - cfd->current()->version_set()->file_options(), cfd->table_cache(), - cfd->current()->storage_info(), cfd->ioptions()->info_log)), + cfd->current()->version_set()->file_options(), cfd->ioptions(), + cfd->table_cache(), cfd->current()->storage_info(), + cfd->current()->version_set())), version_(cfd->current()) { version_->Ref(); } @@ -782,8 +808,8 @@ BaseReferencedVersionBuilder::BaseReferencedVersionBuilder( BaseReferencedVersionBuilder::BaseReferencedVersionBuilder( ColumnFamilyData* cfd, Version* v) : version_builder_(new VersionBuilder( - cfd->current()->version_set()->file_options(), cfd->table_cache(), - v->storage_info(), cfd->ioptions()->info_log)), + cfd->current()->version_set()->file_options(), cfd->ioptions(), + cfd->table_cache(), v->storage_info(), v->version_set())), version_(v) { assert(version_ != cfd->current()); } diff --git a/db/version_builder.h b/db/version_builder.h index 1c75fee6d..fc7abe888 100644 --- a/db/version_builder.h +++ b/db/version_builder.h @@ -16,12 +16,14 @@ namespace ROCKSDB_NAMESPACE { +struct ImmutableCFOptions; class TableCache; class VersionStorageInfo; class VersionEdit; struct FileMetaData; class InternalStats; class Version; +class VersionSet; class ColumnFamilyData; // A helper class so we can efficiently apply a whole sequence @@ -29,8 +31,9 @@ class ColumnFamilyData; // Versions that contain full copies of the intermediate state. class VersionBuilder { public: - VersionBuilder(const FileOptions& file_options, TableCache* table_cache, - VersionStorageInfo* base_vstorage, Logger* info_log = nullptr); + VersionBuilder(const FileOptions& file_options, + const ImmutableCFOptions* ioptions, TableCache* table_cache, + VersionStorageInfo* base_vstorage, VersionSet* version_set); ~VersionBuilder(); bool CheckConsistencyForNumLevels(); diff --git a/db/version_builder_test.cc b/db/version_builder_test.cc index a3b06e085..aae161aab 100644 --- a/db/version_builder_test.cc +++ b/db/version_builder_test.cc @@ -152,8 +152,11 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) { version_edit.DeleteFile(3, 27U); EnvOptions env_options; + constexpr TableCache* table_cache = nullptr; + constexpr VersionSet* version_set = nullptr; - VersionBuilder version_builder(env_options, nullptr, &vstorage_); + VersionBuilder version_builder(env_options, &ioptions_, table_cache, + &vstorage_, version_set); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false); @@ -190,8 +193,11 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) { version_edit.DeleteFile(0, 88U); EnvOptions env_options; + constexpr TableCache* table_cache = nullptr; + constexpr VersionSet* version_set = nullptr; - VersionBuilder version_builder(env_options, nullptr, &vstorage_); + VersionBuilder version_builder(env_options, &ioptions_, table_cache, + &vstorage_, version_set); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false); @@ -233,8 +239,11 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) { version_edit.DeleteFile(4, 8U); EnvOptions env_options; + constexpr TableCache* table_cache = nullptr; + constexpr VersionSet* version_set = nullptr; - VersionBuilder version_builder(env_options, nullptr, &vstorage_); + VersionBuilder version_builder(env_options, &ioptions_, table_cache, + &vstorage_, version_set); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false); @@ -279,8 +288,11 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) { kUnknownFileChecksumFuncName); EnvOptions env_options; + constexpr TableCache* table_cache = nullptr; + constexpr VersionSet* version_set = nullptr; - VersionBuilder version_builder(env_options, nullptr, &vstorage_); + VersionBuilder version_builder(env_options, &ioptions_, table_cache, + &vstorage_, version_set); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false); @@ -296,7 +308,12 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) { UpdateVersionStorageInfo(); EnvOptions env_options; - VersionBuilder version_builder(env_options, nullptr, &vstorage_); + constexpr TableCache* table_cache = nullptr; + constexpr VersionSet* version_set = nullptr; + + VersionBuilder version_builder(env_options, &ioptions_, table_cache, + &vstorage_, version_set); + VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false); @@ -353,7 +370,10 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) { TEST_F(VersionBuilderTest, ApplyBlobFileAddition) { EnvOptions env_options; constexpr TableCache* table_cache = nullptr; - VersionBuilder builder(env_options, table_cache, &vstorage_); + constexpr VersionSet* version_set = nullptr; + + VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_, + version_set); VersionEdit edit; @@ -406,7 +426,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileAdditionAlreadyInBase) { EnvOptions env_options; constexpr TableCache* table_cache = nullptr; - VersionBuilder builder(env_options, table_cache, &vstorage_); + constexpr VersionSet* version_set = nullptr; + + VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_, + version_set); VersionEdit edit; @@ -423,7 +446,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileAdditionAlreadyApplied) { EnvOptions env_options; constexpr TableCache* table_cache = nullptr; - VersionBuilder builder(env_options, table_cache, &vstorage_); + constexpr VersionSet* version_set = nullptr; + + VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_, + version_set); VersionEdit edit; @@ -463,7 +489,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileInBase) { EnvOptions env_options; constexpr TableCache* table_cache = nullptr; - VersionBuilder builder(env_options, table_cache, &vstorage_); + constexpr VersionSet* version_set = nullptr; + + VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_, + version_set); VersionEdit edit; @@ -505,7 +534,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileAdditionApplied) { EnvOptions env_options; constexpr TableCache* table_cache = nullptr; - VersionBuilder builder(env_options, table_cache, &vstorage_); + constexpr VersionSet* version_set = nullptr; + + VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_, + version_set); VersionEdit addition; @@ -558,7 +590,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileNotFound) { EnvOptions env_options; constexpr TableCache* table_cache = nullptr; - VersionBuilder builder(env_options, table_cache, &vstorage_); + constexpr VersionSet* version_set = nullptr; + + VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_, + version_set); VersionEdit edit; @@ -591,7 +626,10 @@ TEST_F(VersionBuilderTest, SaveBlobFilesTo) { EnvOptions env_options; constexpr TableCache* table_cache = nullptr; - VersionBuilder builder(env_options, table_cache, &vstorage_); + constexpr VersionSet* version_set = nullptr; + + VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_, + version_set); VersionEdit edit; @@ -681,7 +719,10 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFiles) { // new table file--blob file pair. EnvOptions env_options; constexpr TableCache* table_cache = nullptr; - VersionBuilder builder(env_options, table_cache, &vstorage_); + constexpr VersionSet* version_set = nullptr; + + VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_, + version_set); VersionEdit edit; @@ -739,7 +780,10 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesNotInVersion) { EnvOptions env_options; constexpr TableCache* table_cache = nullptr; - VersionBuilder builder(env_options, table_cache, &vstorage_); + constexpr VersionSet* version_set = nullptr; + + VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_, + version_set); // Save to a new version in order to trigger consistency checks. constexpr bool force_consistency_checks = true; @@ -776,7 +820,10 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesAllGarbage) { EnvOptions env_options; constexpr TableCache* table_cache = nullptr; - VersionBuilder builder(env_options, table_cache, &vstorage_); + constexpr VersionSet* version_set = nullptr; + + VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_, + version_set); // Save to a new version in order to trigger consistency checks. constexpr bool force_consistency_checks = true; diff --git a/db/version_set.cc b/db/version_set.cc index c2674dcae..4bfbbce91 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -5725,19 +5725,37 @@ void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { } void VersionSet::GetObsoleteFiles(std::vector* files, + std::vector* blob_files, std::vector* manifest_filenames, uint64_t min_pending_output) { + assert(files); + assert(blob_files); + assert(manifest_filenames); + assert(files->empty()); + assert(blob_files->empty()); assert(manifest_filenames->empty()); - obsolete_manifests_.swap(*manifest_filenames); + std::vector pending_files; for (auto& f : obsolete_files_) { if (f.metadata->fd.GetNumber() < min_pending_output) { - files->push_back(std::move(f)); + files->emplace_back(std::move(f)); } else { - pending_files.push_back(std::move(f)); + pending_files.emplace_back(std::move(f)); } } obsolete_files_.swap(pending_files); + + std::vector pending_blob_files; + for (auto& blob_file : obsolete_blob_files_) { + if (blob_file.GetBlobFileNumber() < min_pending_output) { + blob_files->emplace_back(std::move(blob_file)); + } else { + pending_blob_files.emplace_back(std::move(blob_file)); + } + } + obsolete_blob_files_.swap(pending_blob_files); + + obsolete_manifests_.swap(*manifest_filenames); } ColumnFamilyData* VersionSet::CreateColumnFamily( diff --git a/db/version_set.h b/db/version_set.h index dc9b26289..ccab5428b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -789,6 +789,19 @@ struct ObsoleteFileInfo { } }; +class ObsoleteBlobFileInfo { + public: + ObsoleteBlobFileInfo(uint64_t blob_file_number, std::string path) + : blob_file_number_(blob_file_number), path_(std::move(path)) {} + + uint64_t GetBlobFileNumber() const { return blob_file_number_; } + const std::string& GetPath() const { return path_; } + + private: + uint64_t blob_file_number_; + std::string path_; +}; + class BaseReferencedVersionBuilder; class AtomicGroupReadBuffer { @@ -1060,7 +1073,12 @@ class VersionSet { // This function doesn't support leveldb SST filenames void GetLiveFilesMetaData(std::vector *metadata); + void AddObsoleteBlobFile(uint64_t blob_file_number, std::string path) { + obsolete_blob_files_.emplace_back(blob_file_number, std::move(path)); + } + void GetObsoleteFiles(std::vector* files, + std::vector* blob_files, std::vector* manifest_filenames, uint64_t min_pending_output); @@ -1194,6 +1212,7 @@ class VersionSet { uint64_t manifest_file_size_; std::vector obsolete_files_; + std::vector obsolete_blob_files_; std::vector obsolete_manifests_; // env options for all reads and writes except compactions diff --git a/db/version_set_test.cc b/db/version_set_test.cc index a89d497cd..5d30aad35 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -960,6 +960,88 @@ TEST_F(VersionSetTest, PersistBlobFileStateInNewManifest) { SyncPoint::GetInstance()->ClearAllCallBacks(); } +TEST_F(VersionSetTest, ObsoleteBlobFile) { + // Initialize the database and add a blob file (with no garbage just yet). + NewDB(); + + VersionEdit addition; + + constexpr uint64_t blob_file_number = 234; + constexpr uint64_t total_blob_count = 555; + constexpr uint64_t total_blob_bytes = 66666; + constexpr char checksum_method[] = "CRC32"; + constexpr char checksum_value[] = "3d87ff57"; + + addition.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes, + checksum_method, checksum_value); + + assert(versions_); + assert(versions_->GetColumnFamilySet()); + + mutex_.Lock(); + Status s = + versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), + mutable_cf_options_, &addition, &mutex_); + mutex_.Unlock(); + + ASSERT_OK(s); + + // Mark the entire blob file garbage. + VersionEdit garbage; + + garbage.AddBlobFileGarbage(blob_file_number, total_blob_count, + total_blob_bytes); + + mutex_.Lock(); + s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), + mutable_cf_options_, &garbage, &mutex_); + mutex_.Unlock(); + + ASSERT_OK(s); + + // Make sure blob files from the pending number range are not returned + // as obsolete. + { + std::vector table_files; + std::vector blob_files; + std::vector manifest_files; + constexpr uint64_t min_pending_output = blob_file_number; + + versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files, + min_pending_output); + + ASSERT_TRUE(blob_files.empty()); + } + + // Make sure the blob file is returned as obsolete if it's not in the pending + // range. + { + std::vector table_files; + std::vector blob_files; + std::vector manifest_files; + constexpr uint64_t min_pending_output = blob_file_number + 1; + + versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files, + min_pending_output); + + ASSERT_EQ(blob_files.size(), 1); + ASSERT_EQ(blob_files[0].GetBlobFileNumber(), blob_file_number); + } + + // Make sure it's not returned a second time. + { + std::vector table_files; + std::vector blob_files; + std::vector manifest_files; + constexpr uint64_t min_pending_output = blob_file_number + 1; + + versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files, + min_pending_output); + + ASSERT_TRUE(blob_files.empty()); + } +} + class VersionSetAtomicGroupTest : public VersionSetTestBase, public testing::Test { public: