Expose the set of live blob files from Version/VersionSet (#6785)

Summary:
The patch adds logic that returns the set of live blob files from
`Version::AddLiveFiles` and `VersionSet::AddLiveFiles` (in addition to
live table files), and also cleans up the code a bit, for example, by
exposing only the numbers of table files as opposed to the earlier
`FileDescriptor`s that no clients used. Moreover, the patch extends
the `GetLiveFiles` API so that it also exposes blob files in the current version.
Similarly to https://github.com/facebook/rocksdb/pull/6755,
this is a building block for identifying and purging obsolete blob files.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6785

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D21336210

Pulled By: ltamasi

fbshipit-source-id: fc1aede8a49eacd03caafbc5f6f9ce43b6270821
main
Levi Tamasi 5 years ago committed by Facebook GitHub Bot
parent 680c416348
commit a00ddf1574
  1. 24
      db/db_filesnapshot.cc
  2. 2
      db/db_impl/db_impl.h
  3. 17
      db/db_impl/db_impl_files.cc
  4. 36
      db/db_test.cc
  5. 7
      db/job_context.h
  6. 68
      db/version_set.cc
  7. 12
      db/version_set.h
  8. 78
      db/version_set_test.cc

@ -118,27 +118,33 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
} }
} }
// Make a set of all of the live *.sst files // Make a set of all of the live table and blob files
std::vector<FileDescriptor> live; std::vector<uint64_t> live_table_files;
std::vector<uint64_t> live_blob_files;
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) { if (cfd->IsDropped()) {
continue; continue;
} }
cfd->current()->AddLiveFiles(&live); cfd->current()->AddLiveFiles(&live_table_files, &live_blob_files);
} }
ret.clear(); ret.clear();
ret.reserve(live.size() + 3); // *.sst + CURRENT + MANIFEST + OPTIONS ret.reserve(live_table_files.size() + live_blob_files.size() +
3); // for CURRENT + MANIFEST + OPTIONS
// create names of the live files. The names are not absolute // create names of the live files. The names are not absolute
// paths, instead they are relative to dbname_; // paths, instead they are relative to dbname_;
for (const auto& live_file : live) { for (const auto& table_file_number : live_table_files) {
ret.push_back(MakeTableFileName("", live_file.GetNumber())); ret.emplace_back(MakeTableFileName("", table_file_number));
} }
ret.push_back(CurrentFileName("")); for (const auto& blob_file_number : live_blob_files) {
ret.push_back(DescriptorFileName("", versions_->manifest_file_number())); ret.emplace_back(BlobFileName("", blob_file_number));
ret.push_back(OptionsFileName("", versions_->options_file_number())); }
ret.emplace_back(CurrentFileName(""));
ret.emplace_back(DescriptorFileName("", versions_->manifest_file_number()));
ret.emplace_back(OptionsFileName("", versions_->options_file_number()));
// find length of manifest file while holding the mutex lock // find length of manifest file while holding the mutex lock
*manifest_file_size = versions_->manifest_file_size(); *manifest_file_size = versions_->manifest_file_size();

@ -969,6 +969,8 @@ class DBImpl : public DB {
void TEST_WaitForPersistStatsRun(std::function<void()> callback) const; void TEST_WaitForPersistStatsRun(std::function<void()> callback) const;
bool TEST_IsPersistentStatsEnabled() const; bool TEST_IsPersistentStatsEnabled() const;
size_t TEST_EstimateInMemoryStatsHistorySize() const; size_t TEST_EstimateInMemoryStatsHistorySize() const;
VersionSet* TEST_GetVersionSet() const { return versions_.get(); }
#endif // NDEBUG #endif // NDEBUG
protected: protected:

@ -36,7 +36,7 @@ uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
return std::numeric_limits<uint64_t>::max(); return std::numeric_limits<uint64_t>::max();
} }
// * Returns the list of live files in 'sst_live' // * Returns the list of live files in 'sst_live' and 'blob_live'.
// If it's doing full scan: // If it's doing full scan:
// * Returns the list of all files in the filesystem in // * Returns the list of all files in the filesystem in
// 'full_scan_candidate_files'. // 'full_scan_candidate_files'.
@ -104,7 +104,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
job_context->log_number = MinLogNumberToKeep(); job_context->log_number = MinLogNumberToKeep();
job_context->prev_log_number = versions_->prev_log_number(); job_context->prev_log_number = versions_->prev_log_number();
versions_->AddLiveFiles(&job_context->sst_live); versions_->AddLiveFiles(&job_context->sst_live, &job_context->blob_live);
if (doing_the_full_scan) { if (doing_the_full_scan) {
InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(), InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
dbname_); dbname_);
@ -304,12 +304,9 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// FindObsoleteFiles() should've populated this so nonzero // FindObsoleteFiles() should've populated this so nonzero
assert(state.manifest_file_number != 0); assert(state.manifest_file_number != 0);
// Now, convert live list to an unordered map, WITHOUT mutex held; // Now, convert lists to unordered sets, WITHOUT mutex held; set is slow.
// set is slow. std::unordered_set<uint64_t> sst_live_set(state.sst_live.begin(),
std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map; state.sst_live.end());
for (const FileDescriptor& fd : state.sst_live) {
sst_live_map[fd.GetNumber()] = &fd;
}
std::unordered_set<uint64_t> log_recycle_files_set( std::unordered_set<uint64_t> log_recycle_files_set(
state.log_recycle_files.begin(), state.log_recycle_files.end()); state.log_recycle_files.begin(), state.log_recycle_files.end());
@ -413,7 +410,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
case kTableFile: case kTableFile:
// If the second condition is not there, this makes // If the second condition is not there, this makes
// DontDeletePendingOutputs fail // DontDeletePendingOutputs fail
keep = (sst_live_map.find(number) != sst_live_map.end()) || keep = (sst_live_set.find(number) != sst_live_set.end()) ||
number >= state.min_pending_output; number >= state.min_pending_output;
if (!keep) { if (!keep) {
files_to_del.insert(number); files_to_del.insert(number);
@ -428,7 +425,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// //
// TODO(yhchiang): carefully modify the third condition to safely // TODO(yhchiang): carefully modify the third condition to safely
// remove the temp options files. // remove the temp options files.
keep = (sst_live_map.find(number) != sst_live_map.end()) || keep = (sst_live_set.find(number) != sst_live_set.end()) ||
(number == state.pending_manifest_file_number) || (number == state.pending_manifest_file_number) ||
(to_delete.find(kOptionsFileNamePrefix) != std::string::npos); (to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
break; break;

@ -2309,6 +2309,42 @@ TEST_F(DBTest, ReadonlyDBGetLiveManifestSize) {
Close(); Close();
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
TEST_F(DBTest, GetLiveBlobFiles) {
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
assert(versions->GetColumnFamilySet());
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
// Add a live blob file.
VersionEdit edit;
constexpr uint64_t blob_file_number = 234;
constexpr uint64_t total_blob_count = 555;
constexpr uint64_t total_blob_bytes = 66666;
constexpr char checksum_method[] = "CRC32";
constexpr char checksum_value[] = "3d87ff57";
edit.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes,
checksum_method, checksum_value);
dbfull()->TEST_LockMutex();
Status s = versions->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, dbfull()->mutex());
dbfull()->TEST_UnlockMutex();
ASSERT_OK(s);
// Make sure it appears in the results returned by GetLiveFiles.
uint64_t manifest_size = 0;
std::vector<std::string> files;
ASSERT_OK(dbfull()->GetLiveFiles(files, &manifest_size));
ASSERT_FALSE(files.empty());
ASSERT_EQ(files[0], BlobFileName("", blob_file_number));
}
#endif #endif
TEST_F(DBTest, PurgeInfoLogs) { TEST_F(DBTest, PurgeInfoLogs) {

@ -141,11 +141,14 @@ struct JobContext {
std::vector<CandidateFileInfo> full_scan_candidate_files; std::vector<CandidateFileInfo> full_scan_candidate_files;
// the list of all live sst files that cannot be deleted // the list of all live sst files that cannot be deleted
std::vector<FileDescriptor> sst_live; std::vector<uint64_t> sst_live;
// a list of sst files that we need to delete // the list of sst files that we need to delete
std::vector<ObsoleteFileInfo> sst_delete_files; std::vector<ObsoleteFileInfo> sst_delete_files;
// the list of all live blob files that cannot be deleted
std::vector<uint64_t> blob_live;
// the list of blob files that we need to delete // the list of blob files that we need to delete
std::vector<ObsoleteBlobFileInfo> blob_delete_files; std::vector<ObsoleteBlobFileInfo> blob_delete_files;

@ -3414,13 +3414,27 @@ bool VersionStorageInfo::RangeMightExistAfterSortedRun(
return false; return false;
} }
void Version::AddLiveFiles(std::vector<FileDescriptor>* live) { void Version::AddLiveFiles(std::vector<uint64_t>* live_table_files,
for (int level = 0; level < storage_info_.num_levels(); level++) { std::vector<uint64_t>* live_blob_files) const {
const std::vector<FileMetaData*>& files = storage_info_.files_[level]; assert(live_table_files);
for (const auto& file : files) { assert(live_blob_files);
live->push_back(file->fd);
for (int level = 0; level < storage_info_.num_levels(); ++level) {
const auto& level_files = storage_info_.LevelFiles(level);
for (const auto& meta : level_files) {
assert(meta);
live_table_files->emplace_back(meta->fd.GetNumber());
} }
} }
const auto& blob_files = storage_info_.GetBlobFiles();
for (const auto& pair : blob_files) {
const auto& meta = pair.second;
assert(meta);
live_blob_files->emplace_back(meta->GetBlobFileNumber());
}
} }
std::string Version::DebugString(bool hex, bool print_stats) const { std::string Version::DebugString(bool hex, bool print_stats) const {
@ -5511,44 +5525,70 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
v->GetMutableCFOptions().prefix_extractor.get()); v->GetMutableCFOptions().prefix_extractor.get());
} }
void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) { void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_table_files,
std::vector<uint64_t>* live_blob_files) const {
assert(live_table_files);
assert(live_blob_files);
// pre-calculate space requirement // pre-calculate space requirement
int64_t total_files = 0; size_t total_table_files = 0;
size_t total_blob_files = 0;
assert(column_family_set_);
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
assert(cfd);
if (!cfd->initialized()) { if (!cfd->initialized()) {
continue; continue;
} }
Version* dummy_versions = cfd->dummy_versions();
Version* const dummy_versions = cfd->dummy_versions();
assert(dummy_versions);
for (Version* v = dummy_versions->next_; v != dummy_versions; for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) { v = v->next_) {
assert(v);
const auto* vstorage = v->storage_info(); const auto* vstorage = v->storage_info();
for (int level = 0; level < vstorage->num_levels(); level++) { assert(vstorage);
total_files += vstorage->LevelFiles(level).size();
for (int level = 0; level < vstorage->num_levels(); ++level) {
total_table_files += vstorage->LevelFiles(level).size();
} }
total_blob_files += vstorage->GetBlobFiles().size();
} }
} }
// just one time extension to the right size // just one time extension to the right size
live_list->reserve(live_list->size() + static_cast<size_t>(total_files)); live_table_files->reserve(live_table_files->size() + total_table_files);
live_blob_files->reserve(live_blob_files->size() + total_blob_files);
assert(column_family_set_);
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
assert(cfd);
if (!cfd->initialized()) { if (!cfd->initialized()) {
continue; continue;
} }
auto* current = cfd->current(); auto* current = cfd->current();
bool found_current = false; bool found_current = false;
Version* dummy_versions = cfd->dummy_versions();
Version* const dummy_versions = cfd->dummy_versions();
assert(dummy_versions);
for (Version* v = dummy_versions->next_; v != dummy_versions; for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) { v = v->next_) {
v->AddLiveFiles(live_list); v->AddLiveFiles(live_table_files, live_blob_files);
if (v == current) { if (v == current) {
found_current = true; found_current = true;
} }
} }
if (!found_current && current != nullptr) { if (!found_current && current != nullptr) {
// Should never happen unless it is a bug. // Should never happen unless it is a bug.
assert(false); assert(false);
current->AddLiveFiles(live_list); current->AddLiveFiles(live_table_files, live_blob_files);
} }
} }
} }

@ -629,8 +629,10 @@ class Version {
// and return true. Otherwise, return false. // and return true. Otherwise, return false.
bool Unref(); bool Unref();
// Add all files listed in the current version to *live. // Add all files listed in the current version to *live_table_files and
void AddLiveFiles(std::vector<FileDescriptor>* live); // *live_blob_files.
void AddLiveFiles(std::vector<uint64_t>* live_table_files,
std::vector<uint64_t>* live_blob_files) const;
// Return a human readable string that describes this version's contents. // Return a human readable string that describes this version's contents.
std::string DebugString(bool hex = false, bool print_stats = false) const; std::string DebugString(bool hex = false, bool print_stats = false) const;
@ -1047,8 +1049,10 @@ class VersionSet {
const Compaction* c, RangeDelAggregator* range_del_agg, const Compaction* c, RangeDelAggregator* range_del_agg,
const FileOptions& file_options_compactions); const FileOptions& file_options_compactions);
// Add all files listed in any live version to *live. // Add all files listed in any live version to *live_table_files and
void AddLiveFiles(std::vector<FileDescriptor>* live_list); // *live_blob_files. Note that these lists may contain duplicates.
void AddLiveFiles(std::vector<uint64_t>* live_table_files,
std::vector<uint64_t>* live_blob_files) const;
// Return the approximate size of data to be scanned for range [start, end) // Return the approximate size of data to be scanned for range [start, end)
// in levels [start_level, end_level). If end_level == -1 it will search // in levels [start_level, end_level). If end_level == -1 it will search

@ -960,6 +960,84 @@ TEST_F(VersionSetTest, PersistBlobFileStateInNewManifest) {
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
} }
TEST_F(VersionSetTest, AddLiveBlobFiles) {
// Initialize the database and add a blob file.
NewDB();
assert(versions_);
assert(versions_->GetColumnFamilySet());
ColumnFamilyData* const cfd = versions_->GetColumnFamilySet()->GetDefault();
assert(cfd);
VersionEdit first;
constexpr uint64_t first_blob_file_number = 234;
constexpr uint64_t first_total_blob_count = 555;
constexpr uint64_t first_total_blob_bytes = 66666;
constexpr char first_checksum_method[] = "CRC32";
constexpr char first_checksum_value[] = "3d87ff57";
first.AddBlobFile(first_blob_file_number, first_total_blob_count,
first_total_blob_bytes, first_checksum_method,
first_checksum_value);
mutex_.Lock();
Status s = versions_->LogAndApply(cfd, mutable_cf_options_, &first, &mutex_);
mutex_.Unlock();
ASSERT_OK(s);
// Reference the version so it stays alive even after the following version
// edit.
Version* const version = cfd->current();
assert(version);
version->Ref();
// Get live files directly from version.
std::vector<uint64_t> version_table_files;
std::vector<uint64_t> version_blob_files;
version->AddLiveFiles(&version_table_files, &version_blob_files);
ASSERT_EQ(version_blob_files.size(), 1);
ASSERT_EQ(version_blob_files[0], first_blob_file_number);
// Add another blob file.
VersionEdit second;
constexpr uint64_t second_blob_file_number = 456;
constexpr uint64_t second_total_blob_count = 100;
constexpr uint64_t second_total_blob_bytes = 2000000;
constexpr char second_checksum_method[] = "CRC32B";
constexpr char second_checksum_value[] = "6dbdf23a";
second.AddBlobFile(second_blob_file_number, second_total_blob_count,
second_total_blob_bytes, second_checksum_method,
second_checksum_value);
mutex_.Lock();
s = versions_->LogAndApply(cfd, mutable_cf_options_, &second, &mutex_);
mutex_.Unlock();
ASSERT_OK(s);
// Get all live files from version set. Note that the result contains
// duplicates.
std::vector<uint64_t> all_table_files;
std::vector<uint64_t> all_blob_files;
versions_->AddLiveFiles(&all_table_files, &all_blob_files);
ASSERT_EQ(all_blob_files.size(), 3);
ASSERT_EQ(all_blob_files[0], first_blob_file_number);
ASSERT_EQ(all_blob_files[1], first_blob_file_number);
ASSERT_EQ(all_blob_files[2], second_blob_file_number);
// Clean up previous version.
version->Unref();
}
TEST_F(VersionSetTest, ObsoleteBlobFile) { TEST_F(VersionSetTest, ObsoleteBlobFile) {
// Initialize the database and add a blob file (with no garbage just yet). // Initialize the database and add a blob file (with no garbage just yet).
NewDB(); NewDB();

Loading…
Cancel
Save