Keep track of obsolete blob files in VersionSet (#6755)

Summary:
The patch adds logic to keep track of obsolete blob files. A blob file becomes
obsolete when the last `shared_ptr` that points to the corresponding
`SharedBlobFileMetaData` object goes away, which, in turn, happens when the
last `Version` that contains the blob file is destroyed. Blob files that are no
longer needed are added to the obsolete list in `VersionSet` by a custom
`shared_ptr` deleter, which avoids unnecessary coupling between
`SharedBlobFileMetaData` and `VersionSet`. Obsolete blob files are returned by
`VersionSet::GetObsoleteFiles` and stored in `JobContext::blob_delete_files`.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6755

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D21233155

Pulled By: ltamasi

fbshipit-source-id: 47757e06fdc0127f27ed57f51abd27893d9a7b7a
main
Levi Tamasi 4 years ago committed by Facebook GitHub Bot
parent cf342464ca
commit fe238e5438
  1. 6
      db/db_impl/db_impl_files.cc
  2. 8
      db/job_context.h
  3. 52
      db/version_builder.cc
  4. 7
      db/version_builder.h
  5. 77
      db/version_builder_test.cc
  6. 24
      db/version_set.cc
  7. 19
      db/version_set.h
  8. 82
      db/version_set_test.cc

@ -86,9 +86,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// Get obsolete files. This function will also update the list of
// pending files in VersionSet().
versions_->GetObsoleteFiles(&job_context->sst_delete_files,
&job_context->manifest_delete_files,
job_context->min_pending_output);
versions_->GetObsoleteFiles(
&job_context->sst_delete_files, &job_context->blob_delete_files,
&job_context->manifest_delete_files, job_context->min_pending_output);
// Mark the elements in job_context->sst_delete_files as grabbedForPurge
// so that other threads calling FindObsoleteFiles with full_scan=true

@ -102,8 +102,9 @@ struct SuperVersionContext {
struct JobContext {
inline bool HaveSomethingToDelete() const {
return full_scan_candidate_files.size() || sst_delete_files.size() ||
log_delete_files.size() || manifest_delete_files.size();
return !(full_scan_candidate_files.empty() && sst_delete_files.empty() &&
blob_delete_files.empty() && log_delete_files.empty() &&
manifest_delete_files.empty());
}
inline bool HaveSomethingToClean() const {
@ -145,6 +146,9 @@ struct JobContext {
// a list of sst files that we need to delete
std::vector<ObsoleteFileInfo> sst_delete_files;
// the list of blob files that we need to delete
std::vector<ObsoleteBlobFileInfo> blob_delete_files;
// a list of log files that we need to delete
std::vector<uint64_t> log_delete_files;

@ -87,9 +87,10 @@ class VersionBuilder::Rep {
};
const FileOptions& file_options_;
Logger* info_log_;
const ImmutableCFOptions* const ioptions_;
TableCache* table_cache_;
VersionStorageInfo* base_vstorage_;
VersionSet* version_set_;
int num_levels_;
LevelState* levels_;
// Store states of levels larger than num_levels_. We do this instead of
@ -107,15 +108,18 @@ class VersionBuilder::Rep {
std::map<uint64_t, std::shared_ptr<BlobFileMetaData>> changed_blob_files_;
public:
Rep(const FileOptions& file_options, Logger* info_log,
TableCache* table_cache,
VersionStorageInfo* base_vstorage)
Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
TableCache* table_cache, VersionStorageInfo* base_vstorage,
VersionSet* version_set)
: file_options_(file_options),
info_log_(info_log),
ioptions_(ioptions),
table_cache_(table_cache),
base_vstorage_(base_vstorage),
version_set_(version_set),
num_levels_(base_vstorage->num_levels()),
has_invalid_levels_(false) {
assert(ioptions_);
levels_ = new LevelState[num_levels_];
level_zero_cmp_.sort_method = FileComparator::kLevel0;
level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
@ -388,11 +392,29 @@ class VersionBuilder::Rep {
return Status::Corruption("VersionBuilder", oss.str());
}
// Note: we use C++11 for now but in C++14, this could be done in a more
// elegant way using generalized lambda capture.
VersionSet* const vs = version_set_;
const ImmutableCFOptions* const ioptions = ioptions_;
auto deleter = [vs, ioptions](SharedBlobFileMetaData* shared_meta) {
if (vs) {
assert(ioptions);
assert(!ioptions->cf_paths.empty());
assert(shared_meta);
vs->AddObsoleteBlobFile(shared_meta->GetBlobFileNumber(),
ioptions->cf_paths.front().path);
}
delete shared_meta;
};
auto shared_meta = SharedBlobFileMetaData::Create(
blob_file_number, blob_file_addition.GetTotalBlobCount(),
blob_file_addition.GetTotalBlobBytes(),
blob_file_addition.GetChecksumMethod(),
blob_file_addition.GetChecksumValue());
blob_file_addition.GetChecksumValue(), deleter);
constexpr uint64_t garbage_blob_count = 0;
constexpr uint64_t garbage_blob_bytes = 0;
@ -738,16 +760,19 @@ class VersionBuilder::Rep {
// f is to-be-deleted table file
vstorage->RemoveCurrentStats(f);
} else {
vstorage->AddFile(level, f, info_log_);
assert(ioptions_);
vstorage->AddFile(level, f, ioptions_->info_log);
}
}
};
VersionBuilder::VersionBuilder(const FileOptions& file_options,
const ImmutableCFOptions* ioptions,
TableCache* table_cache,
VersionStorageInfo* base_vstorage,
Logger* info_log)
: rep_(new Rep(file_options, info_log, table_cache, base_vstorage)) {}
VersionSet* version_set)
: rep_(new Rep(file_options, ioptions, table_cache, base_vstorage,
version_set)) {}
VersionBuilder::~VersionBuilder() = default;
@ -773,8 +798,9 @@ Status VersionBuilder::LoadTableHandlers(
BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
ColumnFamilyData* cfd)
: version_builder_(new VersionBuilder(
cfd->current()->version_set()->file_options(), cfd->table_cache(),
cfd->current()->storage_info(), cfd->ioptions()->info_log)),
cfd->current()->version_set()->file_options(), cfd->ioptions(),
cfd->table_cache(), cfd->current()->storage_info(),
cfd->current()->version_set())),
version_(cfd->current()) {
version_->Ref();
}
@ -782,8 +808,8 @@ BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
ColumnFamilyData* cfd, Version* v)
: version_builder_(new VersionBuilder(
cfd->current()->version_set()->file_options(), cfd->table_cache(),
v->storage_info(), cfd->ioptions()->info_log)),
cfd->current()->version_set()->file_options(), cfd->ioptions(),
cfd->table_cache(), v->storage_info(), v->version_set())),
version_(v) {
assert(version_ != cfd->current());
}

@ -16,12 +16,14 @@
namespace ROCKSDB_NAMESPACE {
struct ImmutableCFOptions;
class TableCache;
class VersionStorageInfo;
class VersionEdit;
struct FileMetaData;
class InternalStats;
class Version;
class VersionSet;
class ColumnFamilyData;
// A helper class so we can efficiently apply a whole sequence
@ -29,8 +31,9 @@ class ColumnFamilyData;
// Versions that contain full copies of the intermediate state.
class VersionBuilder {
public:
VersionBuilder(const FileOptions& file_options, TableCache* table_cache,
VersionStorageInfo* base_vstorage, Logger* info_log = nullptr);
VersionBuilder(const FileOptions& file_options,
const ImmutableCFOptions* ioptions, TableCache* table_cache,
VersionStorageInfo* base_vstorage, VersionSet* version_set);
~VersionBuilder();
bool CheckConsistencyForNumLevels();

@ -152,8 +152,11 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) {
version_edit.DeleteFile(3, 27U);
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
constexpr VersionSet* version_set = nullptr;
VersionBuilder version_builder(env_options, nullptr, &vstorage_);
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
&vstorage_, version_set);
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, nullptr, false);
@ -190,8 +193,11 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) {
version_edit.DeleteFile(0, 88U);
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
constexpr VersionSet* version_set = nullptr;
VersionBuilder version_builder(env_options, nullptr, &vstorage_);
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
&vstorage_, version_set);
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, nullptr, false);
@ -233,8 +239,11 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) {
version_edit.DeleteFile(4, 8U);
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
constexpr VersionSet* version_set = nullptr;
VersionBuilder version_builder(env_options, nullptr, &vstorage_);
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
&vstorage_, version_set);
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, nullptr, false);
@ -279,8 +288,11 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) {
kUnknownFileChecksumFuncName);
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
constexpr VersionSet* version_set = nullptr;
VersionBuilder version_builder(env_options, nullptr, &vstorage_);
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
&vstorage_, version_set);
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, nullptr, false);
@ -296,7 +308,12 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) {
UpdateVersionStorageInfo();
EnvOptions env_options;
VersionBuilder version_builder(env_options, nullptr, &vstorage_);
constexpr TableCache* table_cache = nullptr;
constexpr VersionSet* version_set = nullptr;
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
&vstorage_, version_set);
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, nullptr, false);
@ -353,7 +370,10 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) {
TEST_F(VersionBuilderTest, ApplyBlobFileAddition) {
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
constexpr VersionSet* version_set = nullptr;
VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_,
version_set);
VersionEdit edit;
@ -406,7 +426,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileAdditionAlreadyInBase) {
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
constexpr VersionSet* version_set = nullptr;
VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_,
version_set);
VersionEdit edit;
@ -423,7 +446,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileAdditionAlreadyApplied) {
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
constexpr VersionSet* version_set = nullptr;
VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_,
version_set);
VersionEdit edit;
@ -463,7 +489,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileInBase) {
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
constexpr VersionSet* version_set = nullptr;
VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_,
version_set);
VersionEdit edit;
@ -505,7 +534,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileAdditionApplied) {
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
constexpr VersionSet* version_set = nullptr;
VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_,
version_set);
VersionEdit addition;
@ -558,7 +590,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileNotFound) {
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
constexpr VersionSet* version_set = nullptr;
VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_,
version_set);
VersionEdit edit;
@ -591,7 +626,10 @@ TEST_F(VersionBuilderTest, SaveBlobFilesTo) {
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
constexpr VersionSet* version_set = nullptr;
VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_,
version_set);
VersionEdit edit;
@ -681,7 +719,10 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFiles) {
// new table file--blob file pair.
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
constexpr VersionSet* version_set = nullptr;
VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_,
version_set);
VersionEdit edit;
@ -739,7 +780,10 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesNotInVersion) {
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
constexpr VersionSet* version_set = nullptr;
VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_,
version_set);
// Save to a new version in order to trigger consistency checks.
constexpr bool force_consistency_checks = true;
@ -776,7 +820,10 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesAllGarbage) {
EnvOptions env_options;
constexpr TableCache* table_cache = nullptr;
VersionBuilder builder(env_options, table_cache, &vstorage_);
constexpr VersionSet* version_set = nullptr;
VersionBuilder builder(env_options, &ioptions_, table_cache, &vstorage_,
version_set);
// Save to a new version in order to trigger consistency checks.
constexpr bool force_consistency_checks = true;

@ -5725,19 +5725,37 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
}
void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
std::vector<ObsoleteBlobFileInfo>* blob_files,
std::vector<std::string>* manifest_filenames,
uint64_t min_pending_output) {
assert(files);
assert(blob_files);
assert(manifest_filenames);
assert(files->empty());
assert(blob_files->empty());
assert(manifest_filenames->empty());
obsolete_manifests_.swap(*manifest_filenames);
std::vector<ObsoleteFileInfo> pending_files;
for (auto& f : obsolete_files_) {
if (f.metadata->fd.GetNumber() < min_pending_output) {
files->push_back(std::move(f));
files->emplace_back(std::move(f));
} else {
pending_files.push_back(std::move(f));
pending_files.emplace_back(std::move(f));
}
}
obsolete_files_.swap(pending_files);
std::vector<ObsoleteBlobFileInfo> pending_blob_files;
for (auto& blob_file : obsolete_blob_files_) {
if (blob_file.GetBlobFileNumber() < min_pending_output) {
blob_files->emplace_back(std::move(blob_file));
} else {
pending_blob_files.emplace_back(std::move(blob_file));
}
}
obsolete_blob_files_.swap(pending_blob_files);
obsolete_manifests_.swap(*manifest_filenames);
}
ColumnFamilyData* VersionSet::CreateColumnFamily(

@ -789,6 +789,19 @@ struct ObsoleteFileInfo {
}
};
// Immutable record describing one obsolete blob file: the file's number plus
// the path string that was handed in at construction time.
class ObsoleteBlobFileInfo {
 public:
  // `path` is taken by value and moved into the object, so callers may pass
  // either an lvalue (copied once) or an rvalue (moved).
  ObsoleteBlobFileInfo(uint64_t blob_file_number, std::string path)
      : number_{blob_file_number}, location_{std::move(path)} {}

  // Number of the blob file this record refers to.
  uint64_t GetBlobFileNumber() const { return number_; }

  // Path stored for this blob file; the reference stays valid for as long as
  // this object is alive.
  const std::string& GetPath() const { return location_; }

 private:
  uint64_t number_;
  std::string location_;
};
class BaseReferencedVersionBuilder;
class AtomicGroupReadBuffer {
@ -1060,7 +1073,12 @@ class VersionSet {
// This function doesn't support leveldb SST filenames
void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);
// Records a blob file as obsolete by appending a new entry, built from
// `blob_file_number` and `path`, to obsolete_blob_files_. `path` is taken by
// value and moved into the stored entry.
// NOTE(review): no locking is done here; presumably the caller provides any
// required synchronization -- confirm against the call sites.
void AddObsoleteBlobFile(uint64_t blob_file_number, std::string path) {
obsolete_blob_files_.emplace_back(blob_file_number, std::move(path));
}
void GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
std::vector<ObsoleteBlobFileInfo>* blob_files,
std::vector<std::string>* manifest_filenames,
uint64_t min_pending_output);
@ -1194,6 +1212,7 @@ class VersionSet {
uint64_t manifest_file_size_;
std::vector<ObsoleteFileInfo> obsolete_files_;
std::vector<ObsoleteBlobFileInfo> obsolete_blob_files_;
std::vector<std::string> obsolete_manifests_;
// env options for all reads and writes except compactions

@ -960,6 +960,88 @@ TEST_F(VersionSetTest, PersistBlobFileStateInNewManifest) {
SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Checks that a blob file which has been added and then marked entirely as
// garbage is handed out by VersionSet::GetObsoleteFiles exactly once, and only
// when its file number is below `min_pending_output`.
TEST_F(VersionSetTest, ObsoleteBlobFile) {
  // Initialize the database and add a blob file (with no garbage just yet).
  NewDB();

  constexpr uint64_t blob_file_number = 234;
  constexpr uint64_t total_blob_count = 555;
  constexpr uint64_t total_blob_bytes = 66666;
  constexpr char checksum_method[] = "CRC32";
  constexpr char checksum_value[] = "3d87ff57";

  VersionEdit addition;
  addition.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes,
                       checksum_method, checksum_value);

  assert(versions_);
  assert(versions_->GetColumnFamilySet());

  // Applies `edit` to the default column family with mutex_ held around the
  // LogAndApply call, and hands back the resulting status.
  const auto apply_to_default_cf = [this](VersionEdit* edit) {
    mutex_.Lock();
    Status status =
        versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
                               mutable_cf_options_, edit, &mutex_);
    mutex_.Unlock();
    return status;
  };

  // Asks the version set for obsolete files using the given threshold and
  // returns only the blob files it reported. The ASSERT_* checks stay in the
  // test body because they can only be used in functions returning void.
  const auto fetch_obsolete_blob_files = [this](uint64_t min_pending_output) {
    std::vector<ObsoleteFileInfo> table_files;
    std::vector<ObsoleteBlobFileInfo> blob_files;
    std::vector<std::string> manifest_files;
    versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files,
                                min_pending_output);
    return blob_files;
  };

  Status s = apply_to_default_cf(&addition);
  ASSERT_OK(s);

  // Mark the entire blob file garbage.
  VersionEdit garbage;
  garbage.AddBlobFileGarbage(blob_file_number, total_blob_count,
                             total_blob_bytes);

  s = apply_to_default_cf(&garbage);
  ASSERT_OK(s);

  // Make sure blob files from the pending number range are not returned
  // as obsolete.
  ASSERT_TRUE(fetch_obsolete_blob_files(blob_file_number).empty());

  // Make sure the blob file is returned as obsolete if it's not in the pending
  // range.
  {
    const auto blob_files = fetch_obsolete_blob_files(blob_file_number + 1);
    ASSERT_EQ(blob_files.size(), 1);
    ASSERT_EQ(blob_files[0].GetBlobFileNumber(), blob_file_number);
  }

  // Make sure it's not returned a second time.
  ASSERT_TRUE(fetch_obsolete_blob_files(blob_file_number + 1).empty());
}
class VersionSetAtomicGroupTest : public VersionSetTestBase,
public testing::Test {
public:

Loading…
Cancel
Save