Blob DB: option to enable garbage collection

Summary:
Add an option to enable/disable auto garbage collection, where we keep counting how many keys have been evicted by either deletion or compaction and decide whether to garbage collect a blob file.

Default disable auto garbage collection for now since the whole logic is not fully tested and we plan to make major change to it.
Closes https://github.com/facebook/rocksdb/pull/3117

Differential Revision: D6224756

Pulled By: yiwu-arbug

fbshipit-source-id: cdf53bdccec96a4580a2b3a342110ad9e8864dfe
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 167ba599ec
commit f662f8f0b6
  1. 46
      utilities/blob_db/blob_db.cc
  2. 7
      utilities/blob_db/blob_db.h
  3. 45
      utilities/blob_db/blob_db_impl.cc

@ -57,12 +57,16 @@ Status BlobDB::OpenAndLoad(const Options& options,
{
MutexLock l(&listener_mutex);
all_blobdb_listeners.push_back(fblistener);
all_blobdb_listeners.push_back(ce_listener);
if (bdb_options.enable_garbage_collection) {
all_blobdb_listeners.push_back(ce_listener);
}
all_wal_filters.push_back(rw_filter);
}
changed_options->listeners.emplace_back(fblistener);
changed_options->listeners.emplace_back(ce_listener);
if (bdb_options.enable_garbage_collection) {
changed_options->listeners.emplace_back(ce_listener);
}
changed_options->wal_filter = rw_filter.get();
DBOptions db_options(*changed_options);
@ -71,7 +75,9 @@ Status BlobDB::OpenAndLoad(const Options& options,
BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options);
fblistener->SetImplPtr(bdb);
ce_listener->SetImplPtr(bdb);
if (bdb_options.enable_garbage_collection) {
ce_listener->SetImplPtr(bdb);
}
rw_filter->SetImplPtr(bdb);
Status s = bdb->OpenPhase1();
@ -124,20 +130,26 @@ Status BlobDB::Open(const DBOptions& db_options_input,
ReconcileWalFilter_t rw_filter = std::make_shared<BlobReconcileWalFilter>();
db_options.listeners.emplace_back(fblistener);
db_options.listeners.emplace_back(ce_listener);
if (bdb_options.enable_garbage_collection) {
db_options.listeners.emplace_back(ce_listener);
}
db_options.wal_filter = rw_filter.get();
{
MutexLock l(&listener_mutex);
all_blobdb_listeners.push_back(fblistener);
all_blobdb_listeners.push_back(ce_listener);
if (bdb_options.enable_garbage_collection) {
all_blobdb_listeners.push_back(ce_listener);
}
all_wal_filters.push_back(rw_filter);
}
// we need to open blob db first so that recovery can happen
BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options);
fblistener->SetImplPtr(bdb);
ce_listener->SetImplPtr(bdb);
if (bdb_options.enable_garbage_collection) {
ce_listener->SetImplPtr(bdb);
}
rw_filter->SetImplPtr(bdb);
s = bdb->OpenPhase1();
@ -172,25 +184,27 @@ Status BlobDB::Open(const DBOptions& db_options_input,
BlobDB::BlobDB(DB* db) : StackableDB(db) {}
void BlobDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s",
ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s",
blob_dir.c_str());
ROCKS_LOG_HEADER(log, " blob_db_options.path_relative: %d",
ROCKS_LOG_HEADER(log, " blob_db_options.path_relative: %d",
path_relative);
ROCKS_LOG_HEADER(log, " blob_db_options.is_fifo: %d",
ROCKS_LOG_HEADER(log, " blob_db_options.is_fifo: %d",
is_fifo);
ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir_size: %" PRIu64,
ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir_size: %" PRIu64,
blob_dir_size);
ROCKS_LOG_HEADER(log, " blob_db_options.ttl_range_secs: %" PRIu32,
ROCKS_LOG_HEADER(log, " blob_db_options.ttl_range_secs: %" PRIu32,
ttl_range_secs);
ROCKS_LOG_HEADER(log, " blob_db_options.bytes_per_sync: %" PRIu64,
ROCKS_LOG_HEADER(log, " blob_db_options.bytes_per_sync: %" PRIu64,
bytes_per_sync);
ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64,
ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64,
blob_file_size);
ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p",
ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p",
ttl_extractor.get());
ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d",
ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d",
static_cast<int>(compression));
ROCKS_LOG_HEADER(log, " blob_db_options.disable_background_tasks: %d",
ROCKS_LOG_HEADER(log, "blob_db_options.enable_garbage_collection: %d",
enable_garbage_collection);
ROCKS_LOG_HEADER(log, " blob_db_options.disable_background_tasks: %d",
disable_background_tasks);
}

@ -71,7 +71,12 @@ struct BlobDBOptions {
// what compression to use for Blob's
CompressionType compression = kNoCompression;
// Disable all background job.
// If enabled, blob DB periodically cleanup stale data by rewriting remaining
// live data in blob files to new files. If garbage collection is not enabled,
// blob files will be cleanup based on TTL.
bool enable_garbage_collection = false;
// Disable all background job. Used for test only.
bool disable_background_tasks = false;
void Dump(Logger* log) const;

@ -69,6 +69,7 @@ void EvictAllVersionsCompactionListener::InternalListener::OnCompaction(
int level, const Slice& key,
CompactionEventListener::CompactionListenerValueType value_type,
const Slice& existing_value, const SequenceNumber& sn, bool is_new) {
assert(impl_->bdb_options_.enable_garbage_collection);
if (!is_new &&
value_type ==
CompactionEventListener::CompactionListenerValueType::kValue) {
@ -213,12 +214,14 @@ void BlobDBImpl::StartBackgroundTasks() {
std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
tqueue_.add(kGCCheckPeriodMillisecs,
std::bind(&BlobDBImpl::RunGC, this, std::placeholders::_1));
tqueue_.add(
kDeleteCheckPeriodMillisecs,
std::bind(&BlobDBImpl::EvictDeletions, this, std::placeholders::_1));
tqueue_.add(
kDeleteCheckPeriodMillisecs,
std::bind(&BlobDBImpl::EvictCompacted, this, std::placeholders::_1));
if (bdb_options_.enable_garbage_collection) {
tqueue_.add(
kDeleteCheckPeriodMillisecs,
std::bind(&BlobDBImpl::EvictDeletions, this, std::placeholders::_1));
tqueue_.add(
kDeleteCheckPeriodMillisecs,
std::bind(&BlobDBImpl::EvictCompacted, this, std::placeholders::_1));
}
tqueue_.add(
kDeleteObsoleteFilesPeriodMillisecs,
std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
@ -659,8 +662,10 @@ Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) {
SequenceNumber lsn = db_impl_->GetLatestSequenceNumber();
Status s = db_->Delete(options, key);
// add deleted key to list of keys that have been deleted for book-keeping
delete_keys_q_.enqueue({DefaultColumnFamily(), key.ToString(), lsn});
if (bdb_options_.enable_garbage_collection) {
// add deleted key to list of keys that have been deleted for book-keeping
delete_keys_q_.enqueue({DefaultColumnFamily(), key.ToString(), lsn});
}
return s;
}
@ -780,11 +785,13 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
SequenceNumber sequence_;
};
// add deleted key to list of keys that have been deleted for book-keeping
DeleteBookkeeper delete_bookkeeper(this, current_seq);
updates->Iterate(&delete_bookkeeper);
if (bdb_options_.enable_garbage_collection) {
// add deleted key to list of keys that have been deleted for book-keeping
DeleteBookkeeper delete_bookkeeper(this, current_seq);
s = updates->Iterate(&delete_bookkeeper);
}
return Status::OK();
return s;
}
Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
@ -1318,6 +1325,7 @@ bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked(
bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
uint64_t blob_offset,
uint64_t blob_size) {
assert(bdb_options_.enable_garbage_collection);
(void)blob_offset;
std::shared_ptr<BlobFile> bfile;
{
@ -1340,6 +1348,7 @@ bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
}
bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) {
assert(bdb_options_.enable_garbage_collection);
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(index_entry);
if (!s.ok()) {
@ -1354,6 +1363,7 @@ bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) {
}
std::pair<bool, int64_t> BlobDBImpl::EvictCompacted(bool aborted) {
assert(bdb_options_.enable_garbage_collection);
if (aborted) return std::make_pair(false, -1);
override_packet_t packet;
@ -1377,6 +1387,7 @@ std::pair<bool, int64_t> BlobDBImpl::EvictCompacted(bool aborted) {
}
std::pair<bool, int64_t> BlobDBImpl::EvictDeletions(bool aborted) {
assert(bdb_options_.enable_garbage_collection);
if (aborted) return std::make_pair(false, -1);
ColumnFamilyHandle* last_cfh = nullptr;
@ -1882,10 +1893,12 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
ReadLock lockbfile_r(&bfile->mutex_);
if ((bfile->deleted_size_ * 100.0 / bfile->file_size_.load()) >
kPartialExpirationPercentage) {
*reason = "deleted simple blobs beyond threshold";
return true;
if (bdb_options_.enable_garbage_collection) {
if ((bfile->deleted_size_ * 100.0 / bfile->file_size_.load()) >
kPartialExpirationPercentage) {
*reason = "deleted simple blobs beyond threshold";
return true;
}
}
// if we haven't reached limits of disk space, don't DELETE

Loading…
Cancel
Save