BlobDB: Maintain mapping between blob files and SSTs (#6020)

Summary:
The patch adds logic to BlobDB to maintain the mapping between blob files
and SSTs for which the blob file in question is the oldest blob file referenced
by the SST file. The mapping is initialized during database open based on the
information retrieved using `GetLiveFilesMetaData`, and updated after
flushes/compactions based on the information received through the `EventListener`
interface (or, in the case of manual compactions issued through the `CompactFiles`
API, the `CompactionJobInfo` object).
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6020

Test Plan: Added a unit test; also tested using the BlobDB mode of `db_bench`.

Differential Revision: D18410508

Pulled By: ltamasi

fbshipit-source-id: dd9e778af781cfdb0d7056298c54ba9cebdd54a5
main
Levi Tamasi 5 years ago committed by Facebook Github Bot
parent aa63abf698
commit 8e7aa62813
  1. 22
      utilities/blob_db/blob_db.h
  2. 211
      utilities/blob_db/blob_db_impl.cc
  3. 49
      utilities/blob_db/blob_db_impl.h
  4. 22
      utilities/blob_db/blob_db_listener.h
  5. 105
      utilities/blob_db/blob_db_test.cc
  6. 24
      utilities/blob_db/blob_file.h

@ -204,6 +204,28 @@ class BlobDB : public StackableDB {
return NewIterator(options);
}
// CompactFiles overload without a column family argument. Declared pure
// virtual (`= 0`) here, so a concrete subclass has to supply the body; the
// column-family overload of this class forwards to it.
Status CompactFiles(
const CompactionOptions& compact_options,
const std::vector<std::string>& input_file_names, const int output_level,
const int output_path_id = -1,
std::vector<std::string>* const output_file_names = nullptr,
CompactionJobInfo* compaction_job_info = nullptr) override = 0;
// Column-family flavor of CompactFiles. Only the default column family is
// accepted: that case is forwarded, argument for argument, to the overload
// without a column family parameter. Any other handle yields NotSupported.
Status CompactFiles(
    const CompactionOptions& compact_options,
    ColumnFamilyHandle* column_family,
    const std::vector<std::string>& input_file_names, const int output_level,
    const int output_path_id = -1,
    std::vector<std::string>* const output_file_names = nullptr,
    CompactionJobInfo* compaction_job_info = nullptr) override {
  const bool is_default_cf = (column_family == DefaultColumnFamily());
  if (is_default_cf) {
    return CompactFiles(compact_options, input_file_names, output_level,
                        output_path_id, output_file_names,
                        compaction_job_info);
  }

  return Status::NotSupported(
      "Blob DB doesn't support non-default column family.");
}
using rocksdb::StackableDB::Close;
virtual Status Close() override = 0;

@ -141,6 +141,15 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
// compactions irrespective of the user set value.
cf_options_.periodic_compaction_seconds = 0;
// Temporarily disable compactions in the base DB during open; save the user
// defined value beforehand so we can restore it once BlobDB is initialized.
// Note: this is only needed if garbage collection is enabled.
const bool disable_auto_compactions = cf_options_.disable_auto_compactions;
if (bdb_options_.enable_garbage_collection) {
cf_options_.disable_auto_compactions = true;
}
Status s;
// Create info log.
@ -175,7 +184,9 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
}
// Update options
db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
db_options_.listeners.push_back(bdb_options_.enable_garbage_collection
? std::make_shared<BlobDBListenerGC>(this)
: std::make_shared<BlobDBListener>(this));
cf_options_.compaction_filter_factory.reset(
new BlobIndexCompactionFilterFactory(this, env_, statistics_));
@ -187,6 +198,26 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
}
db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
// Initialize SST file <-> oldest blob file mapping if garbage collection
// is enabled.
if (bdb_options_.enable_garbage_collection) {
std::vector<LiveFileMetaData> live_files;
db_->GetLiveFilesMetaData(&live_files);
InitializeBlobFileToSstMapping(live_files);
if (!disable_auto_compactions) {
s = db_->EnableAutoCompaction(*handles);
if (!s.ok()) {
ROCKS_LOG_ERROR(
db_options_.info_log,
"Failed to enable automatic compactions during open, status: %s",
s.ToString().c_str());
return s;
}
}
}
// Add trash files in blob dir to file delete scheduler.
SstFileManagerImpl* sfm = static_cast<SstFileManagerImpl*>(
db_impl_->immutable_db_options().sst_file_manager.get());
@ -302,6 +333,137 @@ Status BlobDBImpl::OpenAllBlobFiles() {
return s;
}
template <typename Linker>
void BlobDBImpl::LinkSstToBlobFileImpl(uint64_t sst_file_number,
uint64_t blob_file_number,
Linker linker) {
assert(bdb_options_.enable_garbage_collection);
assert(blob_file_number != kInvalidBlobFileNumber);
auto it = blob_files_.find(blob_file_number);
if (it == blob_files_.end()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Blob file %" PRIu64
" not found while trying to link "
"SST file %" PRIu64,
blob_file_number, sst_file_number);
return;
}
BlobFile* const blob_file = it->second.get();
assert(blob_file);
linker(blob_file, sst_file_number);
ROCKS_LOG_INFO(db_options_.info_log,
"Blob file %" PRIu64 " linked to SST file %" PRIu64,
blob_file_number, sst_file_number);
}
void BlobDBImpl::LinkSstToBlobFile(uint64_t sst_file_number,
uint64_t blob_file_number) {
auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
WriteLock file_lock(&blob_file->mutex_);
blob_file->LinkSstFile(sst_file);
};
LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
}
void BlobDBImpl::LinkSstToBlobFileNoLock(uint64_t sst_file_number,
uint64_t blob_file_number) {
auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
blob_file->LinkSstFile(sst_file);
};
LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
}
void BlobDBImpl::UnlinkSstFromBlobFile(uint64_t sst_file_number,
uint64_t blob_file_number) {
assert(bdb_options_.enable_garbage_collection);
assert(blob_file_number != kInvalidBlobFileNumber);
auto it = blob_files_.find(blob_file_number);
if (it == blob_files_.end()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Blob file %" PRIu64
" not found while trying to unlink "
"SST file %" PRIu64,
blob_file_number, sst_file_number);
return;
}
BlobFile* const blob_file = it->second.get();
assert(blob_file);
{
WriteLock file_lock(&blob_file->mutex_);
blob_file->UnlinkSstFile(sst_file_number);
}
ROCKS_LOG_INFO(db_options_.info_log,
"Blob file %" PRIu64 " unlinked from SST file %" PRIu64,
blob_file_number, sst_file_number);
}
// Builds the initial blob file <-> SST mapping from a list of live SST files.
// Every SST that carries a valid oldest blob file number is linked to that
// blob file through the non-locking link variant; SSTs without a blob file
// reference are skipped.
void BlobDBImpl::InitializeBlobFileToSstMapping(
    const std::vector<LiveFileMetaData>& live_files) {
  assert(bdb_options_.enable_garbage_collection);

  for (const auto& file_meta : live_files) {
    const uint64_t oldest_blob = file_meta.oldest_blob_file_number;
    if (oldest_blob != kInvalidBlobFileNumber) {
      const uint64_t sst_number = file_meta.file_number;
      LinkSstToBlobFileNoLock(sst_number, oldest_blob);
    }
  }
}
// Updates the blob file <-> SST mapping after a flush: when the flushed SST
// references a blob file, the SST is linked to it while mutex_ is held in
// read mode. A flush whose SST has no blob file reference changes nothing.
void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo& info) {
  assert(bdb_options_.enable_garbage_collection);

  const bool has_blob_reference =
      info.oldest_blob_file_number != kInvalidBlobFileNumber;
  if (has_blob_reference) {
    ReadLock guard(&mutex_);
    LinkSstToBlobFile(info.file_number, info.oldest_blob_file_number);
  }
}
// Updates the blob file <-> SST mapping after a compaction: compaction inputs
// are unlinked from their oldest blob files and outputs are linked to theirs.
// Files without a blob file reference are skipped. mutex_ is held in read
// mode for the whole update.
void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) {
  assert(bdb_options_.enable_garbage_collection);

  ReadLock guard(&mutex_);

  // Note: the same SST file may appear in both the input and the output
  // file list in case of a trivial move. We process the inputs first
  // to ensure the blob file still has a link after processing all updates.
  for (const auto& consumed : info.input_file_infos) {
    if (consumed.oldest_blob_file_number != kInvalidBlobFileNumber) {
      UnlinkSstFromBlobFile(consumed.file_number,
                            consumed.oldest_blob_file_number);
    }
  }

  for (const auto& produced : info.output_file_infos) {
    if (produced.oldest_blob_file_number != kInvalidBlobFileNumber) {
      LinkSstToBlobFile(produced.file_number,
                        produced.oldest_blob_file_number);
    }
  }
}
void BlobDBImpl::CloseRandomAccessLocked(
const std::shared_ptr<BlobFile>& bfile) {
bfile->CloseRandomAccessLocked();
@ -777,6 +939,34 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
return *compression_output;
}
// Runs CompactFiles on the underlying DB and, when garbage collection is
// enabled and the compaction succeeded, applies the resulting file changes to
// the blob file <-> SST mapping. Returns the status of the underlying call.
Status BlobDBImpl::CompactFiles(
    const CompactionOptions& compact_options,
    const std::vector<std::string>& input_file_names, const int output_level,
    const int output_path_id, std::vector<std::string>* const output_file_names,
    CompactionJobInfo* compaction_job_info) {
  const bool gc_enabled = bdb_options_.enable_garbage_collection;

  // Note: we need CompactionJobInfo to be able to track updates to the
  // blob file <-> SST mappings, so we provide one if the user hasn't,
  // assuming that GC is enabled.
  CompactionJobInfo fallback_info{};
  CompactionJobInfo* const job_info =
      (gc_enabled && compaction_job_info == nullptr) ? &fallback_info
                                                     : compaction_job_info;

  const Status status =
      db_->CompactFiles(compact_options, input_file_names, output_level,
                        output_path_id, output_file_names, job_info);

  if (status.ok() && gc_enabled) {
    assert(job_info);
    ProcessCompactionJobInfo(*job_info);
  }

  return status;
}
void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
ReadLock l(&mutex_);
@ -1902,6 +2092,11 @@ Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
return GetBlobValue(key, index_entry, value);
}
void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number) {
blob_files_[blob_file_number] = std::make_shared<BlobFile>(
this, blob_dir_, blob_file_number, db_options_.info_log.get());
}
std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
ReadLock l(&mutex_);
std::vector<std::shared_ptr<BlobFile>> blob_files;
@ -1948,6 +2143,20 @@ void BlobDBImpl::TEST_EvictExpiredFiles() {
}
// Test-only accessor: returns the value currently loaded from live_sst_size_.
uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
// Test-only hook: forwards `live_files` unchanged to
// InitializeBlobFileToSstMapping so tests can drive the mapping setup
// directly.
void BlobDBImpl::TEST_InitializeBlobFileToSstMapping(
const std::vector<LiveFileMetaData>& live_files) {
InitializeBlobFileToSstMapping(live_files);
}
// Test-only hook: forwards `info` unchanged to ProcessFlushJobInfo so tests
// can simulate a completed flush.
void BlobDBImpl::TEST_ProcessFlushJobInfo(const FlushJobInfo& info) {
ProcessFlushJobInfo(info);
}
// Test-only hook: forwards `info` unchanged to ProcessCompactionJobInfo so
// tests can simulate a completed compaction.
void BlobDBImpl::TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info) {
ProcessCompactionJobInfo(info);
}
#endif // !NDEBUG
} // namespace blob_db

@ -77,6 +77,8 @@ struct GCStats {
class BlobDBImpl : public BlobDB {
friend class BlobFile;
friend class BlobDBIterator;
friend class BlobDBListener;
friend class BlobDBListenerGC;
public:
// deletions check period
@ -148,6 +150,14 @@ class BlobDBImpl : public BlobDB {
Status PutUntil(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t expiration) override;
using BlobDB::CompactFiles;
Status CompactFiles(
const CompactionOptions& compact_options,
const std::vector<std::string>& input_file_names, const int output_level,
const int output_path_id = -1,
std::vector<std::string>* const output_file_names = nullptr,
CompactionJobInfo* compaction_job_info = nullptr) override;
BlobDBOptions GetBlobDBOptions() const override;
BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options,
@ -169,14 +179,14 @@ class BlobDBImpl : public BlobDB {
Status SyncBlobFiles() override;
void UpdateLiveSSTSize();
void GetCompactionContext(BlobCompactionContext* context);
#ifndef NDEBUG
Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value);
void TEST_AddDummyBlobFile(uint64_t blob_file_number);
std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
@ -199,6 +209,14 @@ class BlobDBImpl : public BlobDB {
uint64_t TEST_live_sst_size();
const std::string& TEST_blob_dir() const { return blob_dir_; }
void TEST_InitializeBlobFileToSstMapping(
const std::vector<LiveFileMetaData>& live_files);
void TEST_ProcessFlushJobInfo(const FlushJobInfo& info);
void TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info);
#endif // !NDEBUG
private:
@ -284,6 +302,33 @@ class BlobDBImpl : public BlobDB {
// Open all blob files found in blob_dir.
Status OpenAllBlobFiles();
// Link an SST to a blob file. Comes in locking and non-locking varieties
// (the latter is used during Open).
template <typename Linker>
void LinkSstToBlobFileImpl(uint64_t sst_file_number,
uint64_t blob_file_number, Linker linker);
void LinkSstToBlobFile(uint64_t sst_file_number, uint64_t blob_file_number);
void LinkSstToBlobFileNoLock(uint64_t sst_file_number,
uint64_t blob_file_number);
// Unlink an SST from a blob file.
void UnlinkSstFromBlobFile(uint64_t sst_file_number,
uint64_t blob_file_number);
// Initialize the mapping between blob files and SSTs during Open.
void InitializeBlobFileToSstMapping(
const std::vector<LiveFileMetaData>& live_files);
// Update the mapping between blob files and SSTs after a flush.
void ProcessFlushJobInfo(const FlushJobInfo& info);
// Update the mapping between blob files and SSTs after a compaction.
void ProcessCompactionJobInfo(const CompactionJobInfo& info);
void UpdateLiveSSTSize();
Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
std::shared_ptr<RandomAccessFileReader>* reader);

@ -37,10 +37,30 @@ class BlobDBListener : public EventListener {
blob_db_impl_->UpdateLiveSSTSize();
}
private:
protected:
BlobDBImpl* blob_db_impl_;
};
// Listener variant that additionally keeps the blob file <-> SST mapping up
// to date. Each callback first runs the BlobDBListener implementation of the
// same callback and then passes the job info on to BlobDBImpl.
class BlobDBListenerGC : public BlobDBListener {
public:
// `blob_db_impl` is handed to the BlobDBListener constructor; the callbacks
// below assert that blob_db_impl_ is non-null before using it.
explicit BlobDBListenerGC(BlobDBImpl* blob_db_impl)
: BlobDBListener(blob_db_impl) {}
// Flush completed: run the base-class handling, then let BlobDBImpl process
// the flush job info.
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
BlobDBListener::OnFlushCompleted(db, info);
assert(blob_db_impl_);
blob_db_impl_->ProcessFlushJobInfo(info);
}
// Compaction completed: run the base-class handling, then let BlobDBImpl
// process the compaction job info.
void OnCompactionCompleted(DB* db, const CompactionJobInfo& info) override {
BlobDBListener::OnCompactionCompleted(db, info);
assert(blob_db_impl_);
blob_db_impl_->ProcessCompactionJobInfo(info);
}
};
} // namespace blob_db
} // namespace rocksdb
#endif // !ROCKSDB_LITE

@ -1614,6 +1614,111 @@ TEST_F(BlobDBTest, DisableFileDeletions) {
}
}
// Verifies that the blob file <-> SST mapping is initialized from live file
// metadata and then kept up to date across simulated flushes and a simulated
// compaction (including a trivial move).
TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
  // The mapping code asserts that garbage collection is enabled.
  BlobDBOptions bdb_options;
  bdb_options.enable_garbage_collection = true;
  bdb_options.disable_background_tasks = true;

  Open(bdb_options);

  // Register some dummy blob files.
  blob_db_impl()->TEST_AddDummyBlobFile(1);
  blob_db_impl()->TEST_AddDummyBlobFile(2);
  blob_db_impl()->TEST_AddDummyBlobFile(3);
  blob_db_impl()->TEST_AddDummyBlobFile(4);
  blob_db_impl()->TEST_AddDummyBlobFile(5);

  // Initialize the blob <-> SST file mapping. First, add some SST files with
  // blob file references, then some without.
  std::vector<LiveFileMetaData> live_files;

  // SSTs 1..10 reference blob files 1..5 in round-robin order, so blob file
  // k ends up linked to SSTs k and k + 5.
  for (uint64_t i = 1; i <= 10; ++i) {
    LiveFileMetaData live_file;
    live_file.file_number = i;
    live_file.oldest_blob_file_number = ((i - 1) % 5) + 1;

    live_files.emplace_back(live_file);
  }

  // SSTs 11..20 leave oldest_blob_file_number untouched.
  for (uint64_t i = 11; i <= 20; ++i) {
    LiveFileMetaData live_file;
    live_file.file_number = i;

    live_files.emplace_back(live_file);
  }

  blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files);

  // Check that the blob <-> SST mappings have been correctly initialized.
  // The checks below index blob_files positionally, i.e. they expect entry i
  // to be blob file i + 1.
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();

  ASSERT_EQ(blob_files.size(), 5);

  {
    const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
        {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
    for (size_t i = 0; i < 5; ++i) {
      const auto& blob_file = blob_files[i];
      ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
    }
  }

  // Simulate a flush where the SST does not reference any blob files.
  // The mapping must stay exactly as it was.
  {
    FlushJobInfo info{};
    info.file_number = 21;

    blob_db_impl()->TEST_ProcessFlushJobInfo(info);

    const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
        {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
    for (size_t i = 0; i < 5; ++i) {
      const auto& blob_file = blob_files[i];
      ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
    }
  }

  // Simulate a flush where the SST references a blob file.
  // SST 22 becomes linked to blob file 5.
  {
    FlushJobInfo info{};
    info.file_number = 22;
    info.oldest_blob_file_number = 5;

    blob_db_impl()->TEST_ProcessFlushJobInfo(info);

    const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
        {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}};
    for (size_t i = 0; i < 5; ++i) {
      const auto& blob_file = blob_files[i];
      ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
    }
  }

  // Simulate a compaction. Some inputs and outputs have blob file references,
  // some don't. There is also a trivial move (which means the SST appears on
  // both the input and the output list).
  //
  // CompactionFileInfo is brace-initialized positionally as
  // {level, SST file number, oldest blob file number}: the {2, 23, 3} output
  // has to link SST 23 to blob file 3 to yield the expected {3, 8, 23}.
  {
    CompactionJobInfo info{};
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1});
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2});
    info.input_file_infos.emplace_back(
        CompactionFileInfo{1, 11, kInvalidBlobFileNumber});
    // Trivial move of SST 22, which the flush above linked to blob file 5.
    // It must name SST 22 / blob file 5 (not SST 5 / blob file 22): blob file
    // 22 is not registered, so the swapped form would only hit the
    // "blob file not found" path and never exercise the unlink/relink.
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5});
    info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3});
    info.output_file_infos.emplace_back(
        CompactionFileInfo{2, 24, kInvalidBlobFileNumber});
    info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});

    blob_db_impl()->TEST_ProcessCompactionJobInfo(info);

    // SSTs 1 and 2 are gone, SST 23 is new on blob file 3, and SST 22 is
    // still linked to blob file 5 after being unlinked and linked again.
    const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
        {6}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
    for (size_t i = 0; i < 5; ++i) {
      const auto& blob_file = blob_files[i];
      ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
    }
  }
}
TEST_F(BlobDBTest, ShutdownWait) {
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 100;

@ -7,6 +7,7 @@
#include <atomic>
#include <memory>
#include <unordered_set>
#include "file/random_access_file_reader.h"
#include "port/port.h"
@ -38,6 +39,10 @@ class BlobFile {
// after that
uint64_t file_number_;
// The file numbers of the SST files whose oldest blob file reference
// points to this blob file.
std::unordered_set<uint64_t> linked_sst_files_;
// Info log.
Logger* info_log_;
@ -116,6 +121,25 @@ class BlobFile {
// once the file is created, this never changes
uint64_t BlobFileNumber() const { return file_number_; }
// Get the set of SST files whose oldest blob file reference points to
// this file. Returns a const reference to the internal linked_sst_files_
// set rather than a copy.
const std::unordered_set<uint64_t>& GetLinkedSstFiles() const {
return linked_sst_files_;
}
// Record that the given SST file has this blob file as its oldest blob file
// reference. The SST file is expected not to be linked yet, which is checked
// with an assertion.
void LinkSstFile(uint64_t sst_file_number) {
  assert(linked_sst_files_.count(sst_file_number) == 0);

  linked_sst_files_.emplace(sst_file_number);
}
// Unlink an SST file whose oldest blob file reference points to this file.
// The SST file is expected to be currently linked, which is checked with an
// assertion. Removal is done by key rather than through an iterator: when
// assertions are compiled out and the SST file is unexpectedly absent, this
// is a harmless no-op, whereas erasing the end() iterator would be undefined
// behavior.
void UnlinkSstFile(uint64_t sst_file_number) {
  assert(linked_sst_files_.find(sst_file_number) != linked_sst_files_.end());

  linked_sst_files_.erase(sst_file_number);
}
// the following functions are atomic, and don't need
// read lock
uint64_t BlobCount() const {

Loading…
Cancel
Save