Find/purge obsolete blob files (#6807)

Summary:
The patch extends `FindObsoleteFiles` and `PurgeObsoleteFiles` with
support for blob files. The behavior is analogous to SST files: obsolete
blob files are put on the "candidates for deletion" list, while live (and pending)
files are preserved.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6807

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D21406249

Pulled By: ltamasi

fbshipit-source-id: 1948f71c31927564b61e8af394f50ca3964880d9
main
Levi Tamasi 5 years ago committed by Facebook GitHub Bot
parent 1f20df2f38
commit ac3ae1df0b
  1. 6
      db/db_impl/db_impl.h
  2. 45
      db/db_impl/db_impl_files.cc
  3. 119
      db/obsolete_files_test.cc
  4. 5
      file/filename.cc
  5. 2
      file/filename.h

@ -971,6 +971,10 @@ class DBImpl : public DB {
size_t TEST_EstimateInMemoryStatsHistorySize() const;
VersionSet* TEST_GetVersionSet() const { return versions_.get(); }
const std::unordered_set<uint64_t>& TEST_GetFilesGrabbedForPurge() const {
return files_grabbed_for_purge_;
}
#endif // NDEBUG
protected:
@ -1937,7 +1941,7 @@ class DBImpl : public DB {
std::unordered_map<uint64_t, PurgeFileInfo> purge_files_;
// A vector to store the file numbers that have been assigned to certain
// JobContext. Current implementation tracks ssts only.
// JobContext. Current implementation tracks table and blob files only.
std::unordered_set<uint64_t> files_grabbed_for_purge_;
// A queue to store log writers to close

@ -77,12 +77,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// Since job_context->min_pending_output is set, until file scan finishes,
// mutex_ cannot be released. Otherwise, we might see no min_pending_output
// here but later find newer generated unfinalized files while scanning.
if (!pending_outputs_.empty()) {
job_context->min_pending_output = *pending_outputs_.begin();
} else {
// delete all of them
job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
}
job_context->min_pending_output = MinObsoleteSstNumberToKeep();
// Get obsolete files. This function will also update the list of
// pending files in VersionSet().
@ -90,13 +85,18 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
&job_context->sst_delete_files, &job_context->blob_delete_files,
&job_context->manifest_delete_files, job_context->min_pending_output);
// Mark the elements in job_context->sst_delete_files as grabbedForPurge
// so that other threads calling FindObsoleteFiles with full_scan=true
// will not add these files to candidate list for purge.
// Mark the elements in job_context->sst_delete_files and
// job_context->blob_delete_files as "grabbed for purge" so that other threads
// calling FindObsoleteFiles with full_scan=true will not add these files to
// candidate list for purge.
for (const auto& sst_to_del : job_context->sst_delete_files) {
MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber());
}
for (const auto& blob_file : job_context->blob_delete_files) {
MarkAsGrabbedForPurge(blob_file.GetBlobFileNumber());
}
// store the current filenum, lognum, etc
job_context->manifest_file_number = versions_->manifest_file_number();
job_context->pending_manifest_file_number =
@ -257,8 +257,11 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
const std::string& path_to_sync,
FileType type, uint64_t number) {
TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl::BeforeDeletion",
const_cast<std::string*>(&fname));
Status file_deletion_status;
if (type == kTableFile || type == kLogFile) {
if (type == kTableFile || type == kBlobFile || type == kLogFile) {
file_deletion_status =
DeleteDBFile(&immutable_db_options_, fname, path_to_sync,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path_);
@ -307,13 +310,16 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// Now, convert lists to unordered sets, WITHOUT mutex held; set is slow.
std::unordered_set<uint64_t> sst_live_set(state.sst_live.begin(),
state.sst_live.end());
std::unordered_set<uint64_t> blob_live_set(state.blob_live.begin(),
state.blob_live.end());
std::unordered_set<uint64_t> log_recycle_files_set(
state.log_recycle_files.begin(), state.log_recycle_files.end());
auto candidate_files = state.full_scan_candidate_files;
candidate_files.reserve(
candidate_files.size() + state.sst_delete_files.size() +
state.log_delete_files.size() + state.manifest_delete_files.size());
state.blob_delete_files.size() + state.log_delete_files.size() +
state.manifest_delete_files.size());
// We may ignore the dbname when generating the file names.
for (auto& file : state.sst_delete_files) {
candidate_files.emplace_back(
@ -324,6 +330,11 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
file.DeleteMetadata();
}
for (const auto& blob_file : state.blob_delete_files) {
candidate_files.emplace_back(BlobFileName(blob_file.GetBlobFileNumber()),
blob_file.GetPath());
}
for (auto file_num : state.log_delete_files) {
if (file_num > 0) {
candidate_files.emplace_back(LogFileName(file_num),
@ -416,6 +427,13 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
files_to_del.insert(number);
}
break;
case kBlobFile:
keep = number >= state.min_pending_output ||
(blob_live_set.find(number) != blob_live_set.end());
if (!keep) {
files_to_del.insert(number);
}
break;
case kTempFile:
// Any temp files that are currently being written to must
// be recorded in pending_outputs_, which is inserted into "live".
@ -426,6 +444,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// TODO(yhchiang): carefully modify the third condition to safely
// remove the temp options files.
keep = (sst_live_set.find(number) != sst_live_set.end()) ||
(blob_live_set.find(number) != blob_live_set.end()) ||
(number == state.pending_manifest_file_number) ||
(to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
break;
@ -448,7 +467,6 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
case kDBLockFile:
case kIdentityFile:
case kMetaDatabase:
case kBlobFile:
keep = true;
break;
}
@ -464,6 +482,9 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
TableCache::Evict(table_cache_.get(), number);
fname = MakeTableFileName(candidate_file.file_path, number);
dir_to_sync = candidate_file.file_path;
} else if (type == kBlobFile) {
fname = BlobFileName(candidate_file.file_path, number);
dir_to_sync = candidate_file.file_path;
} else {
dir_to_sync =
(type == kLogFile) ? immutable_db_options_.wal_dir : dbname_;

@ -10,6 +10,7 @@
#ifndef ROCKSDB_LITE
#include <stdlib.h>
#include <algorithm>
#include <map>
#include <string>
#include <vector>
@ -193,6 +194,124 @@ TEST_F(ObsoleteFilesTest, DeleteObsoleteOptionsFile) {
ASSERT_EQ(2, opts_file_count);
}
TEST_F(ObsoleteFilesTest, BlobFiles) {
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
assert(versions->GetColumnFamilySet());
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
// Add a blob file that consists of nothing but garbage (and is thus obsolete)
// and another one that is live.
VersionEdit edit;
constexpr uint64_t first_blob_file_number = 234;
constexpr uint64_t first_total_blob_count = 555;
constexpr uint64_t first_total_blob_bytes = 66666;
constexpr char first_checksum_method[] = "CRC32";
constexpr char first_checksum_value[] = "3d87ff57";
edit.AddBlobFile(first_blob_file_number, first_total_blob_count,
first_total_blob_bytes, first_checksum_method,
first_checksum_value);
edit.AddBlobFileGarbage(first_blob_file_number, first_total_blob_count,
first_total_blob_bytes);
constexpr uint64_t second_blob_file_number = 456;
constexpr uint64_t second_total_blob_count = 100;
constexpr uint64_t second_total_blob_bytes = 2000000;
constexpr char second_checksum_method[] = "CRC32B";
constexpr char second_checksum_value[] = "6dbdf23a";
edit.AddBlobFile(second_blob_file_number, second_total_blob_count,
second_total_blob_bytes, second_checksum_method,
second_checksum_value);
dbfull()->TEST_LockMutex();
Status s = versions->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, dbfull()->mutex());
dbfull()->TEST_UnlockMutex();
ASSERT_OK(s);
// Check for obsolete files and make sure the first blob file is picked up
// and grabbed for purge. The second blob file should be on the live list.
constexpr int job_id = 0;
JobContext job_context{job_id};
dbfull()->TEST_LockMutex();
constexpr bool force_full_scan = false;
dbfull()->FindObsoleteFiles(&job_context, force_full_scan);
dbfull()->TEST_UnlockMutex();
ASSERT_TRUE(job_context.HaveSomethingToDelete());
ASSERT_EQ(job_context.blob_delete_files.size(), 1);
ASSERT_EQ(job_context.blob_delete_files[0].GetBlobFileNumber(),
first_blob_file_number);
const auto& files_grabbed_for_purge =
dbfull()->TEST_GetFilesGrabbedForPurge();
ASSERT_NE(files_grabbed_for_purge.find(first_blob_file_number),
files_grabbed_for_purge.end());
ASSERT_EQ(job_context.blob_live.size(), 1);
ASSERT_EQ(job_context.blob_live[0], second_blob_file_number);
// Hack the job context a bit by adding a few files to the full scan
// list and adjusting the pending file number. We add the two files
// above as well as two additional ones, where one is old
// and should be cleaned up, and the other is still pending.
assert(cfd->ioptions());
assert(!cfd->ioptions()->cf_paths.empty());
const std::string& path = cfd->ioptions()->cf_paths.front().path;
constexpr uint64_t old_blob_file_number = 123;
constexpr uint64_t pending_blob_file_number = 567;
job_context.full_scan_candidate_files.emplace_back(
BlobFileName(old_blob_file_number), path);
job_context.full_scan_candidate_files.emplace_back(
BlobFileName(first_blob_file_number), path);
job_context.full_scan_candidate_files.emplace_back(
BlobFileName(second_blob_file_number), path);
job_context.full_scan_candidate_files.emplace_back(
BlobFileName(pending_blob_file_number), path);
job_context.min_pending_output = pending_blob_file_number;
// Purge obsolete files and make sure we purge the old file and the first file
// (and keep the second file and the pending file).
std::vector<std::string> deleted_files;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DeleteObsoleteFileImpl::BeforeDeletion", [&](void* arg) {
const std::string* file = static_cast<std::string*>(arg);
assert(file);
constexpr char blob_extension[] = ".blob";
if (file->find(blob_extension) != std::string::npos) {
deleted_files.emplace_back(*file);
}
});
SyncPoint::GetInstance()->EnableProcessing();
dbfull()->PurgeObsoleteFiles(job_context);
job_context.Clean();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_EQ(files_grabbed_for_purge.find(first_blob_file_number),
files_grabbed_for_purge.end());
std::sort(deleted_files.begin(), deleted_files.end());
const std::vector<std::string> expected_deleted_files{
BlobFileName(path, old_blob_file_number),
BlobFileName(path, first_blob_file_number)};
ASSERT_EQ(deleted_files, expected_deleted_files);
}
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

@ -79,6 +79,11 @@ std::string LogFileName(uint64_t number) {
return MakeFileName(number, "log");
}
std::string BlobFileName(uint64_t number) {
assert(number > 0);
return MakeFileName(number, kRocksDBBlobFileExt.c_str());
}
std::string BlobFileName(const std::string& blobdirname, uint64_t number) {
assert(number > 0);
return MakeFileName(blobdirname, number, kRocksDBBlobFileExt.c_str());

@ -56,6 +56,8 @@ extern std::string LogFileName(const std::string& dbname, uint64_t number);
extern std::string LogFileName(uint64_t number);
extern std::string BlobFileName(uint64_t number);
extern std::string BlobFileName(const std::string& bdirname, uint64_t number);
extern std::string BlobFileName(const std::string& dbname,

Loading…
Cancel
Save