Move out valid blobs from the oldest blob files during compaction (#6121)

Summary:
The patch adds logic that relocates live blobs from the oldest N non-TTL
blob files as they are encountered during compaction (assuming the BlobDB
configuration option `enable_garbage_collection` is `true`), where N is defined
as the number of immutable non-TTL blob files multiplied by the value of
a new BlobDB configuration option called `garbage_collection_cutoff`.
(The default value of this parameter is 0.25, that is, by default the valid blobs
residing in the oldest 25% of immutable non-TTL blob files are relocated.)
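
To make the cutoff arithmetic concrete, here is a hypothetical helper (a
sketch only; in the patch itself the selection happens in
BlobDBImpl::GetCompactionContext):

// Sketch of the cutoff computation; NumFilesUnderCutoff is an illustrative
// name, not part of the patch. With 128 immutable non-TTL blob files and the
// default cutoff of 0.25, the 32 oldest files are targeted.
size_t NumFilesUnderCutoff(size_t num_imm_non_ttl_files, double cutoff) {
  return static_cast<size_t>(cutoff * num_imm_non_ttl_files);
}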
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6121

Test Plan: Added unit test and tested using the BlobDB mode of `db_bench`.

Differential Revision: D18785357

Pulled By: ltamasi

fbshipit-source-id: 8c21c512a18fba777ec28765c88682bb1a5e694e
Branch: main
Author: Levi Tamasi (committed by Facebook Github Bot)
Parent: c2029f9716
Commit: 583c6953d8
Files changed:
  db/compaction/compaction_iterator.cc (59 lines changed)
  include/rocksdb/compaction_filter.h (9 lines changed)
  utilities/blob_db/blob_compaction_filter.cc (329 lines changed)
  utilities/blob_db/blob_compaction_filter.h (141 lines changed)
  utilities/blob_db/blob_db.h (10 lines changed)
  utilities/blob_db/blob_db_impl.cc (84 lines changed)
  utilities/blob_db/blob_db_impl.h (16 lines changed)
  utilities/blob_db/blob_db_test.cc (243 lines changed)
  utilities/blob_db/blob_file.h (6 lines changed)

@@ -639,28 +639,45 @@ void CompactionIterator::NextFromInput() {
}
void CompactionIterator::PrepareOutput() {
// Zeroing out the sequence number leads to better compression.
// If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno
// and the userkey differs from the last userkey in compaction
// then we can squash the seqno to zero.
//
// This is safe for TransactionDB write-conflict checking since transactions
// only care about sequence number larger than any active snapshots.
//
// Can we do the same for levels above bottom level as long as
// KeyNotExistsBeyondOutputLevel() returns true?
if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ && valid_ &&
IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
ROCKS_LOG_FATAL(info_log_,
"Unexpected key type %d for seq-zero optimization",
ikey_.type);
if (valid_) {
if (compaction_filter_ && ikey_.type == kTypeBlobIndex) {
const auto blob_decision = compaction_filter_->PrepareBlobOutput(
user_key(), value_, &compaction_filter_value_);
if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
status_ = Status::Corruption(
"Corrupted blob reference encountered during GC");
} else if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
status_ = Status::IOError("Could not relocate blob during GC");
} else if (blob_decision ==
CompactionFilter::BlobDecision::kChangeValue) {
value_ = compaction_filter_value_;
}
}
// Zeroing out the sequence number leads to better compression.
// If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno
// and the userkey differs from the last userkey in compaction
// then we can squash the seqno to zero.
//
// This is safe for TransactionDB write-conflict checking since transactions
// only care about sequence number larger than any active snapshots.
//
// Can we do the same for levels above bottom level as long as
// KeyNotExistsBeyondOutputLevel() returns true?
if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
ROCKS_LOG_FATAL(info_log_,
"Unexpected key type %d for seq-zero optimization",
ikey_.type);
}
ikey_.sequence = 0;
current_key_.UpdateInternalKey(0, ikey_.type);
}
ikey_.sequence = 0;
current_key_.UpdateInternalKey(0, ikey_.type);
}
}
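
Note the ordering in the new PrepareOutput(): the PrepareBlobOutput() hook runs
before the seqno-zeroing optimization, so a blob index rewritten during GC can
still have its sequence number squashed to zero in the same compaction (the
GarbageCollection unit test below relies on this when it zeroes out the
expected sequence numbers).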

@@ -45,6 +45,8 @@ class CompactionFilter {
kRemoveAndSkipUntil,
};
enum class BlobDecision { kKeep, kChangeValue, kCorruption, kIOError };
// Context information of a compaction run
struct Context {
// Does this compaction run include all data files
@@ -173,6 +175,13 @@ class CompactionFilter {
return Decision::kKeep;
}
// Internal (BlobDB) use only. Do not override in application code.
virtual BlobDecision PrepareBlobOutput(const Slice& /* key */,
const Slice& /* existing_value */,
std::string* /* new_value */) const {
return BlobDecision::kKeep;
}
// This function is deprecated. Snapshots will always be ignored for
// compaction filters, because we realized that not ignoring snapshots doesn't
// provide the guarantee we initially thought it would provide. Repeatable

@@ -11,102 +11,279 @@
namespace rocksdb {
namespace blob_db {
namespace {
// CompactionFilter to delete expired blob index from base DB.
class BlobIndexCompactionFilter : public CompactionFilter {
public:
BlobIndexCompactionFilter(BlobCompactionContext context,
uint64_t current_time, Statistics* statistics)
: context_(context),
current_time_(current_time),
statistics_(statistics) {}
~BlobIndexCompactionFilter() override {
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
}
const char* Name() const override { return "BlobIndexCompactionFilter"; }
// Filter expired blob indexes regardless of snapshots.
bool IgnoreSnapshots() const override { return true; }
Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type,
const Slice& value, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
if (value_type != kBlobIndex) {
return Decision::kKeep;
}
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(value);
if (!s.ok()) {
// Unable to decode blob index. Keeping the value.
return Decision::kKeep;
}
if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
// Expired
expired_count_++;
expired_size_ += key.size() + value.size();
return Decision::kRemove;
}
if (!blob_index.IsInlined() &&
blob_index.file_number() < context_.next_file_number &&
context_.current_blob_files.count(blob_index.file_number()) == 0) {
// Corresponding blob file gone. Could have been garbage collected or
// evicted by FIFO eviction.
CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2(
int /*level*/, const Slice& key, ValueType value_type, const Slice& value,
std::string* /*new_value*/, std::string* /*skip_until*/) const {
if (value_type != kBlobIndex) {
return Decision::kKeep;
}
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(value);
if (!s.ok()) {
// Unable to decode blob index. Keeping the value.
return Decision::kKeep;
}
if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
// Expired
expired_count_++;
expired_size_ += key.size() + value.size();
return Decision::kRemove;
}
if (!blob_index.IsInlined() &&
blob_index.file_number() < context_.next_file_number &&
context_.current_blob_files.count(blob_index.file_number()) == 0) {
// Corresponding blob file gone. Could have been garbage collected or
// evicted by FIFO eviction.
evicted_count_++;
evicted_size_ += key.size() + value.size();
return Decision::kRemove;
}
if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() &&
blob_index.expiration() < context_.evict_expiration_up_to) {
// Hack: the internal key is passed to BlobIndexCompactionFilter so that it
// can extract the sequence number.
ParsedInternalKey ikey;
bool ok = ParseInternalKey(key, &ikey);
// Remove keys that could have been removed by the last FIFO eviction.
// If we fail to parse the key, ignore it and continue.
if (ok && ikey.sequence < context_.fifo_eviction_seq) {
evicted_count_++;
evicted_size_ += key.size() + value.size();
return Decision::kRemove;
}
if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() &&
blob_index.expiration() < context_.evict_expiration_up_to) {
// Hack: the internal key is passed to BlobIndexCompactionFilter so that it
// can extract the sequence number.
ParsedInternalKey ikey;
bool ok = ParseInternalKey(key, &ikey);
// Remove keys that could have been removed by the last FIFO eviction.
// If we fail to parse the key, ignore it and continue.
if (ok && ikey.sequence < context_.fifo_eviction_seq) {
evicted_count_++;
evicted_size_ += key.size() + value.size();
return Decision::kRemove;
}
}
return Decision::kKeep;
}
return Decision::kKeep;
}
CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(
const Slice& key, const Slice& existing_value,
std::string* new_value) const {
assert(new_value);
const BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
(void)blob_db_impl;
assert(blob_db_impl);
assert(blob_db_impl->bdb_options_.enable_garbage_collection);
BlobIndex blob_index;
const Status s = blob_index.DecodeFrom(existing_value);
if (!s.ok()) {
return BlobDecision::kCorruption;
}
if (blob_index.IsInlined()) {
return BlobDecision::kKeep;
}
if (blob_index.HasTTL()) {
return BlobDecision::kKeep;
}
if (blob_index.file_number() >= context_gc_.cutoff_file_number) {
return BlobDecision::kKeep;
}
// Note: each compaction generates its own blob files, which, depending on the
// workload, might result in many small blob files. The total number of files
// is bounded though (determined by the number of compactions and the blob
// file size option).
if (!OpenNewBlobFileIfNeeded()) {
return BlobDecision::kIOError;
}
PinnableSlice blob;
CompressionType compression_type = kNoCompression;
if (!ReadBlobFromOldFile(key, blob_index, &blob, &compression_type)) {
return BlobDecision::kIOError;
}
uint64_t new_blob_file_number = 0;
uint64_t new_blob_offset = 0;
if (!WriteBlobToNewFile(key, blob, &new_blob_file_number, &new_blob_offset)) {
return BlobDecision::kIOError;
}
private:
BlobCompactionContext context_;
const uint64_t current_time_;
Statistics* statistics_;
// It is safe not to use std::atomic since the compaction filter, created
// by a compaction filter factory, will not be called from multiple threads.
mutable uint64_t expired_count_ = 0;
mutable uint64_t expired_size_ = 0;
mutable uint64_t evicted_count_ = 0;
mutable uint64_t evicted_size_ = 0;
};
if (!CloseAndRegisterNewBlobFileIfNeeded()) {
return BlobDecision::kIOError;
}
BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset,
blob.size(), compression_type);
return BlobDecision::kChangeValue;
}
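// In short, relocating a single blob proceeds as follows: decode the blob
// index; keep inlined blobs, TTL blobs, and anything at or above the cutoff
// file number as-is; lazily open a GC output file; read the raw blob from the
// old file; append it to the output file, rolling the file once it reaches
// blob_file_size; and finally re-encode the index to point to the new file
// number and offset.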
bool BlobIndexCompactionFilterGC::OpenNewBlobFileIfNeeded() const {
if (blob_file_) {
assert(writer_);
return true;
}
} // anonymous namespace
BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
assert(blob_db_impl);
const Status s = blob_db_impl->CreateBlobFileAndWriter(
/* has_ttl */ false, ExpirationRange(), "GC", &blob_file_, &writer_);
if (!s.ok()) {
ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
"Error opening new blob file during GC, status: %s",
s.ToString().c_str());
return false;
}
assert(blob_file_);
assert(writer_);
return true;
}
bool BlobIndexCompactionFilterGC::ReadBlobFromOldFile(
const Slice& key, const BlobIndex& blob_index, PinnableSlice* blob,
CompressionType* compression_type) const {
BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
assert(blob_db_impl);
const Status s = blob_db_impl->GetRawBlobFromFile(
key, blob_index.file_number(), blob_index.offset(), blob_index.size(),
blob, compression_type);
if (!s.ok()) {
ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
"Error reading blob during GC, key: %s (%s), status: %s",
key.ToString(/* output_hex */ true).c_str(),
blob_index.DebugString(/* output_hex */ true).c_str(),
s.ToString().c_str());
return false;
}
return true;
}
bool BlobIndexCompactionFilterGC::WriteBlobToNewFile(
const Slice& key, const Slice& blob, uint64_t* new_blob_file_number,
uint64_t* new_blob_offset) const {
assert(new_blob_file_number);
assert(new_blob_offset);
assert(blob_file_);
*new_blob_file_number = blob_file_->BlobFileNumber();
assert(writer_);
uint64_t new_key_offset = 0;
const Status s = writer_->AddRecord(key, blob, kNoExpiration, &new_key_offset,
new_blob_offset);
if (!s.ok()) {
const BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
assert(blob_db_impl);
ROCKS_LOG_ERROR(
blob_db_impl->db_options_.info_log,
"Error writing blob to new file %s during GC, key: %s, status: %s",
blob_file_->PathName().c_str(),
key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
return false;
}
const uint64_t new_size =
BlobLogRecord::kHeaderSize + key.size() + blob.size();
blob_file_->BlobRecordAdded(new_size);
BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
assert(blob_db_impl);
blob_db_impl->total_blob_size_ += new_size;
return true;
}
bool BlobIndexCompactionFilterGC::CloseAndRegisterNewBlobFileIfNeeded() const {
const BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
assert(blob_db_impl);
assert(blob_file_);
if (blob_file_->GetFileSize() < blob_db_impl->bdb_options_.blob_file_size) {
return true;
}
return CloseAndRegisterNewBlobFile();
}
bool BlobIndexCompactionFilterGC::CloseAndRegisterNewBlobFile() const {
BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
assert(blob_db_impl);
assert(blob_file_);
Status s;
{
WriteLock wl(&blob_db_impl->mutex_);
s = blob_db_impl->CloseBlobFile(blob_file_);
// Note: we delay registering the new blob file until it's closed to
// prevent FIFO eviction from processing it during the GC run.
blob_db_impl->RegisterBlobFile(blob_file_);
}
assert(blob_file_->Immutable());
if (!s.ok()) {
ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
"Error closing new blob file %s during GC, status: %s",
blob_file_->PathName().c_str(), s.ToString().c_str());
blob_file_.reset();
return false;
}
blob_file_.reset();
return true;
}
std::unique_ptr<CompactionFilter>
BlobIndexCompactionFilterFactory::CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) {
assert(env());
int64_t current_time = 0;
Status s = env_->GetCurrentTime(&current_time);
Status s = env()->GetCurrentTime(&current_time);
if (!s.ok()) {
return nullptr;
}
assert(current_time >= 0);
assert(blob_db_impl());
BlobCompactionContext context;
blob_db_impl_->GetCompactionContext(&context);
blob_db_impl()->GetCompactionContext(&context);
return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter(
context, static_cast<uint64_t>(current_time), statistics_));
std::move(context), current_time, statistics()));
}
std::unique_ptr<CompactionFilter>
BlobIndexCompactionFilterFactoryGC::CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) {
assert(env());
int64_t current_time = 0;
Status s = env()->GetCurrentTime(&current_time);
if (!s.ok()) {
return nullptr;
}
assert(current_time >= 0);
assert(blob_db_impl());
BlobCompactionContext context;
BlobCompactionContextGC context_gc;
blob_db_impl()->GetCompactionContext(&context, &context_gc);
return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilterGC(
std::move(context), std::move(context_gc), current_time, statistics()));
}
} // namespace blob_db

@@ -17,24 +17,113 @@ namespace rocksdb {
namespace blob_db {
struct BlobCompactionContext {
uint64_t next_file_number;
uint64_t next_file_number = 0;
std::unordered_set<uint64_t> current_blob_files;
SequenceNumber fifo_eviction_seq;
uint64_t evict_expiration_up_to;
SequenceNumber fifo_eviction_seq = 0;
uint64_t evict_expiration_up_to = 0;
};
class BlobIndexCompactionFilterFactory : public CompactionFilterFactory {
struct BlobCompactionContextGC {
BlobDBImpl* blob_db_impl = nullptr;
uint64_t cutoff_file_number = 0;
};
// Compaction filter that deletes expired blob indexes from the base DB.
// Comes in two varieties, one for the non-GC case and one for the GC case.
class BlobIndexCompactionFilterBase : public CompactionFilter {
public:
BlobIndexCompactionFilterFactory(BlobDBImpl* blob_db_impl, Env* env,
Statistics* statistics)
: blob_db_impl_(blob_db_impl), env_(env), statistics_(statistics) {}
BlobIndexCompactionFilterBase(BlobCompactionContext&& context,
uint64_t current_time, Statistics* statistics)
: context_(std::move(context)),
current_time_(current_time),
statistics_(statistics) {}
virtual const char* Name() const override {
return "BlobIndexCompactionFilterFactory";
~BlobIndexCompactionFilterBase() override {
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override;
// Filter expired blob indexes regardless of snapshots.
bool IgnoreSnapshots() const override { return true; }
Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type,
const Slice& value, std::string* /*new_value*/,
std::string* /*skip_until*/) const override;
private:
BlobCompactionContext context_;
const uint64_t current_time_;
Statistics* statistics_;
// It is safe not to use std::atomic since the compaction filter, created
// by a compaction filter factory, will not be called from multiple threads.
mutable uint64_t expired_count_ = 0;
mutable uint64_t expired_size_ = 0;
mutable uint64_t evicted_count_ = 0;
mutable uint64_t evicted_size_ = 0;
};
class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase {
public:
BlobIndexCompactionFilter(BlobCompactionContext&& context,
uint64_t current_time, Statistics* statistics)
: BlobIndexCompactionFilterBase(std::move(context), current_time,
statistics) {}
const char* Name() const override { return "BlobIndexCompactionFilter"; }
};
class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
public:
BlobIndexCompactionFilterGC(BlobCompactionContext&& context,
BlobCompactionContextGC&& context_gc,
uint64_t current_time, Statistics* statistics)
: BlobIndexCompactionFilterBase(std::move(context), current_time,
statistics),
context_gc_(std::move(context_gc)) {}
~BlobIndexCompactionFilterGC() override {
if (blob_file_) {
CloseAndRegisterNewBlobFile();
}
}
const char* Name() const override { return "BlobIndexCompactionFilterGC"; }
BlobDecision PrepareBlobOutput(const Slice& key, const Slice& existing_value,
std::string* new_value) const override;
private:
bool OpenNewBlobFileIfNeeded() const;
bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
PinnableSlice* blob,
CompressionType* compression_type) const;
bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
uint64_t* new_blob_file_number,
uint64_t* new_blob_offset) const;
bool CloseAndRegisterNewBlobFileIfNeeded() const;
bool CloseAndRegisterNewBlobFile() const;
private:
BlobCompactionContextGC context_gc_;
mutable std::shared_ptr<BlobFile> blob_file_;
mutable std::shared_ptr<Writer> writer_;
};
// Compaction filter factory; similarly to the filters above, it comes
// in two flavors, one that creates filters that support GC, and one
// that creates non-GC filters.
class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
public:
BlobIndexCompactionFilterFactoryBase(BlobDBImpl* blob_db_impl, Env* env,
Statistics* statistics)
: blob_db_impl_(blob_db_impl), env_(env), statistics_(statistics) {}
protected:
BlobDBImpl* blob_db_impl() const { return blob_db_impl_; }
Env* env() const { return env_; }
Statistics* statistics() const { return statistics_; }
private:
BlobDBImpl* blob_db_impl_;
@@ -42,6 +131,36 @@ class BlobIndexCompactionFilterFactory : public CompactionFilterFactory {
Statistics* statistics_;
};
class BlobIndexCompactionFilterFactory
: public BlobIndexCompactionFilterFactoryBase {
public:
BlobIndexCompactionFilterFactory(BlobDBImpl* blob_db_impl, Env* env,
Statistics* statistics)
: BlobIndexCompactionFilterFactoryBase(blob_db_impl, env, statistics) {}
const char* Name() const override {
return "BlobIndexCompactionFilterFactory";
}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override;
};
class BlobIndexCompactionFilterFactoryGC
: public BlobIndexCompactionFilterFactoryBase {
public:
BlobIndexCompactionFilterFactoryGC(BlobDBImpl* blob_db_impl, Env* env,
Statistics* statistics)
: BlobIndexCompactionFilterFactoryBase(blob_db_impl, env, statistics) {}
const char* Name() const override {
return "BlobIndexCompactionFilterFactoryGC";
}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override;
};
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

@@ -68,11 +68,15 @@ struct BlobDBOptions {
// what compression to use for Blob's
CompressionType compression = kNoCompression;
// If enabled, blob DB periodically cleanup stale data by rewriting remaining
// live data in blob files to new files. If garbage collection is not enabled,
// blob files will be cleanup based on TTL.
// If enabled, BlobDB cleans up stale blobs in non-TTL files during compaction
// by rewriting the remaining live blobs to new files.
bool enable_garbage_collection = false;
// The cutoff in terms of blob file age for garbage collection. Blobs in
// the oldest N non-TTL blob files will be rewritten when encountered during
// compaction, where N = garbage_collection_cutoff * number_of_non_TTL_files.
double garbage_collection_cutoff = 0.25;
// Disable all background job. Used for test only.
bool disable_background_tasks = false;
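
For reference, a minimal usage sketch enabling the new option (a hedged
illustration only: the function name and path below are hypothetical
placeholders, and error handling is elided; it assumes the BlobDB::Open entry
point declared in this header):

#include "utilities/blob_db/blob_db.h"

using rocksdb::blob_db::BlobDB;
using rocksdb::blob_db::BlobDBOptions;

// Hypothetical example: open a BlobDB instance with compaction-time GC on.
rocksdb::Status OpenBlobDBWithGC(BlobDB** blob_db) {
  rocksdb::Options options;
  options.create_if_missing = true;

  BlobDBOptions bdb_options;
  bdb_options.enable_garbage_collection = true;  // opt into compaction-time GC
  bdb_options.garbage_collection_cutoff = 0.25;  // oldest 25% of non-TTL files

  return BlobDB::Open(options, bdb_options, "/tmp/blob_db_example", blob_db);
}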

@@ -131,13 +131,22 @@ BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
assert(handles != nullptr);
assert(db_ == nullptr);
if (blob_dir_.empty()) {
return Status::NotSupported("No blob directory in options");
}
if (cf_options_.compaction_filter != nullptr ||
cf_options_.compaction_filter_factory != nullptr) {
return Status::NotSupported("Blob DB doesn't support compaction filter.");
}
if (bdb_options_.garbage_collection_cutoff < 0.0 ||
bdb_options_.garbage_collection_cutoff > 1.0) {
return Status::InvalidArgument(
"Garbage collection cutoff must be in the interval [0.0, 1.0]");
}
// BlobDB does not support Periodic Compactions. So disable periodic
// compactions irrespective of the user-set value.
cf_options_.periodic_compaction_seconds = 0;
@@ -185,11 +194,17 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
}
// Update options
db_options_.listeners.push_back(bdb_options_.enable_garbage_collection
? std::make_shared<BlobDBListenerGC>(this)
: std::make_shared<BlobDBListener>(this));
cf_options_.compaction_filter_factory.reset(
new BlobIndexCompactionFilterFactory(this, env_, statistics_));
if (bdb_options_.enable_garbage_collection) {
db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
cf_options_.compaction_filter_factory =
std::make_shared<BlobIndexCompactionFilterFactoryGC>(this, env_,
statistics_);
} else {
db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
cf_options_.compaction_filter_factory =
std::make_shared<BlobIndexCompactionFilterFactory>(this, env_,
statistics_);
}
// Open base db.
ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
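// Note: the GC flavor is fixed at open time. Open() installs either the GC
// listener and filter factory or their non-GC counterparts based on
// enable_garbage_collection, so nothing in this patch allows toggling the
// option on a live instance.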
@@ -607,6 +622,17 @@ std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
return blob_file;
}
void BlobDBImpl::RegisterBlobFile(std::shared_ptr<BlobFile> blob_file) {
const uint64_t blob_file_number = blob_file->BlobFileNumber();
auto it = blob_files_.lower_bound(blob_file_number);
assert(it == blob_files_.end() || it->first != blob_file_number);
blob_files_.insert(it,
std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
blob_file_number, std::move(blob_file)));
}
Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
std::string fpath(bfile->PathName());
std::unique_ptr<WritableFile> wfile;
@@ -767,8 +793,7 @@ Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
return s;
}
blob_files_.insert(std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
(*blob_file)->BlobFileNumber(), *blob_file));
RegisterBlobFile(*blob_file);
open_non_ttl_file_ = *blob_file;
return s;
@@ -814,8 +839,7 @@ Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
return s;
}
blob_files_.insert(std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
(*blob_file)->BlobFileNumber(), *blob_file));
RegisterBlobFile(*blob_file);
open_ttl_files_.insert(*blob_file);
return s;
@@ -1062,8 +1086,9 @@ Status BlobDBImpl::CompactFiles(
return s;
}
void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
ReadLock l(&mutex_);
void BlobDBImpl::GetCompactionContextCommon(
BlobCompactionContext* context) const {
assert(context);
context->next_file_number = next_file_number_.load();
context->current_blob_files.clear();
@@ -1074,6 +1099,33 @@ void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
context->evict_expiration_up_to = evict_expiration_up_to_;
}
void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
assert(context);
ReadLock l(&mutex_);
GetCompactionContextCommon(context);
}
void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
BlobCompactionContextGC* context_gc) {
assert(context);
assert(context_gc);
ReadLock l(&mutex_);
GetCompactionContextCommon(context);
context_gc->blob_db_impl = this;
if (!live_imm_non_ttl_blob_files_.empty()) {
auto it = live_imm_non_ttl_blob_files_.begin();
std::advance(it, bdb_options_.garbage_collection_cutoff *
live_imm_non_ttl_blob_files_.size());
context_gc->cutoff_file_number = it != live_imm_non_ttl_blob_files_.end()
? it->first
: std::numeric_limits<uint64_t>::max();
}
}
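// Worked example: with the default cutoff of 0.25 and 128 live immutable
// non-TTL blob files, std::advance moves the iterator 32 positions, so
// cutoff_file_number becomes the file number of the 33rd-oldest file, making
// the blobs in the 32 oldest files eligible for relocation. If the cutoff
// covers every live file, the iterator reaches end() and the cutoff is set to
// the maximum uint64_t, i.e. all non-TTL blob files are eligible.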
void BlobDBImpl::UpdateLiveSSTSize() {
uint64_t live_sst_size = 0;
bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
@@ -1199,11 +1251,8 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
return s;
}
// increment blob count
bfile->blob_count_++;
uint64_t size_put = headerbuf.size() + key.size() + value.size();
bfile->file_size_ += size_put;
bfile->BlobRecordAdded(size_put);
total_blob_size_ += size_put;
if (expiration == kNoExpiration) {
@@ -1573,7 +1622,10 @@ Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
assert(bfile);
assert(!bfile->Immutable());
assert(!bfile->Obsolete());
write_mutex_.AssertHeld();
if (bfile->HasTTL() || bfile == open_non_ttl_file_) {
write_mutex_.AssertHeld();
}
ROCKS_LOG_INFO(db_options_.info_log,
"Closing blob file %" PRIu64 ". Path: %s",

@@ -44,6 +44,7 @@ struct FlushJobInfo;
namespace blob_db {
struct BlobCompactionContext;
struct BlobCompactionContextGC;
class BlobDBImpl;
class BlobFile;
@@ -79,6 +80,7 @@ class BlobDBImpl : public BlobDB {
friend class BlobDBIterator;
friend class BlobDBListener;
friend class BlobDBListenerGC;
friend class BlobIndexCompactionFilterGC;
public:
// deletions check period
@@ -179,7 +181,13 @@ class BlobDBImpl : public BlobDB {
Status SyncBlobFiles() override;
// Common part of the two GetCompactionContext methods below.
// REQUIRES: read lock on mutex_
void GetCompactionContextCommon(BlobCompactionContext* context) const;
void GetCompactionContext(BlobCompactionContext* context);
void GetCompactionContext(BlobCompactionContext* context,
BlobCompactionContextGC* context_gc);
#ifndef NDEBUG
Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
@@ -247,7 +255,9 @@ class BlobDBImpl : public BlobDB {
// Close a file by appending a footer, and removes file from open files list.
// REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_
// and the blob file's mutex_.
// and the blob file's mutex_. If called on a blob file which is visible only
// to a single thread (like in the case of new files written during GC), the
// locks on write_mutex_ and the blob file's mutex_ can be avoided.
Status CloseBlobFile(std::shared_ptr<BlobFile> bfile);
// Close a file if its size exceeds blob_file_size
@@ -317,6 +327,10 @@ class BlobDBImpl : public BlobDB {
const ExpirationRange& expiration_range,
const std::string& reason);
// Register a new blob file.
// REQUIRES: write lock on mutex_.
void RegisterBlobFile(std::shared_ptr<BlobFile> blob_file);
// collect all the blob log files from the blob directory
Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);

@@ -8,8 +8,10 @@
#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <iomanip>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
@@ -35,10 +37,22 @@ class BlobDBTest : public testing::Test {
public:
const int kMaxBlobSize = 1 << 14;
struct BlobRecord {
std::string key;
std::string value;
uint64_t expiration = 0;
struct BlobIndexVersion {
BlobIndexVersion() = default;
BlobIndexVersion(std::string _user_key, uint64_t _file_number,
uint64_t _expiration, SequenceNumber _sequence,
ValueType _type)
: user_key(std::move(_user_key)),
file_number(_file_number),
expiration(_expiration),
sequence(_sequence),
type(_type) {}
std::string user_key;
uint64_t file_number = kInvalidBlobFileNumber;
uint64_t expiration = kNoExpiration;
SequenceNumber sequence = 0;
ValueType type = kTypeValue;
};
BlobDBTest()
@@ -230,6 +244,45 @@ class BlobDBTest : public testing::Test {
}
}
void VerifyBaseDBBlobIndex(
const std::map<std::string, BlobIndexVersion> &expected_versions) {
const size_t kMaxKeys = 10000;
std::vector<KeyVersion> versions;
ASSERT_OK(
GetAllKeyVersions(blob_db_->GetRootDB(), "", "", kMaxKeys, &versions));
ASSERT_EQ(versions.size(), expected_versions.size());
size_t i = 0;
for (const auto &expected_pair : expected_versions) {
const BlobIndexVersion &expected_version = expected_pair.second;
ASSERT_EQ(versions[i].user_key, expected_version.user_key);
ASSERT_EQ(versions[i].sequence, expected_version.sequence);
ASSERT_EQ(versions[i].type, expected_version.type);
if (versions[i].type != kTypeBlobIndex) {
ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
ASSERT_EQ(kNoExpiration, expected_version.expiration);
++i;
continue;
}
BlobIndex blob_index;
ASSERT_OK(blob_index.DecodeFrom(versions[i].value));
const uint64_t file_number = !blob_index.IsInlined()
? blob_index.file_number()
: kInvalidBlobFileNumber;
ASSERT_EQ(file_number, expected_version.file_number);
const uint64_t expiration =
blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
ASSERT_EQ(expiration, expected_version.expiration);
++i;
}
}
void InsertBlobs() {
WriteOptions wo;
std::string value;
@@ -1538,6 +1591,188 @@ TEST_F(BlobDBTest, FilterForFIFOEviction) {
VerifyDB(data_after_compact);
}
TEST_F(BlobDBTest, GarbageCollection) {
constexpr size_t kNumPuts = 1 << 10;
constexpr uint64_t kExpiration = 1000;
constexpr uint64_t kCompactTime = 500;
constexpr uint64_t kKeySize = 7; // "key" + 4 digits
constexpr uint64_t kSmallValueSize = 1 << 6;
constexpr uint64_t kLargeValueSize = 1 << 8;
constexpr uint64_t kMinBlobSize = 1 << 7;
static_assert(kSmallValueSize < kMinBlobSize, "");
static_assert(kLargeValueSize > kMinBlobSize, "");
constexpr size_t kBlobsPerFile = 8;
constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
constexpr uint64_t kBlobFileSize =
BlobLogHeader::kSize +
(BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;
BlobDBOptions bdb_options;
bdb_options.min_blob_size = kMinBlobSize;
bdb_options.blob_file_size = kBlobFileSize;
bdb_options.enable_garbage_collection = true;
bdb_options.garbage_collection_cutoff = 0.25;
bdb_options.disable_background_tasks = true;
Options options;
options.env = mock_env_.get();
Open(bdb_options, options);
std::map<std::string, std::string> data;
std::map<std::string, KeyVersion> blob_value_versions;
std::map<std::string, BlobIndexVersion> blob_index_versions;
Random rnd(301);
// Add a bunch of large non-TTL values. These will be written to non-TTL
// blob files and will be subject to GC.
for (size_t i = 0; i < kNumPuts; ++i) {
std::ostringstream oss;
oss << "key" << std::setw(4) << std::setfill('0') << i;
const std::string key(oss.str());
const std::string value(
test::RandomHumanReadableString(&rnd, kLargeValueSize));
const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
ASSERT_OK(Put(key, value));
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
data[key] = value;
blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
blob_index_versions[key] =
BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration,
sequence, kTypeBlobIndex);
}
// Add some small and/or TTL values that will be ignored during GC.
// First, add a large TTL value that will be written to its own TTL blob file.
{
const std::string key("key2000");
const std::string value(
test::RandomHumanReadableString(&rnd, kLargeValueSize));
const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
ASSERT_OK(PutUntil(key, value, kExpiration));
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
data[key] = value;
blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
blob_index_versions[key] =
BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
sequence, kTypeBlobIndex);
}
// Now add a small TTL value (which will be inlined).
{
const std::string key("key3000");
const std::string value(
test::RandomHumanReadableString(&rnd, kSmallValueSize));
const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
ASSERT_OK(PutUntil(key, value, kExpiration));
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
data[key] = value;
blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
blob_index_versions[key] = BlobIndexVersion(
key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
}
// Finally, add a small non-TTL value (which will be stored as a regular
// value).
{
const std::string key("key4000");
const std::string value(
test::RandomHumanReadableString(&rnd, kSmallValueSize));
const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
ASSERT_OK(Put(key, value));
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
data[key] = value;
blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
blob_index_versions[key] = BlobIndexVersion(
key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
}
VerifyDB(data);
VerifyBaseDB(blob_value_versions);
VerifyBaseDBBlobIndex(blob_index_versions);
// At this point, we should have 128 immutable non-TTL files with file numbers
// 1..128.
{
auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
for (size_t i = 0; i < kNumBlobFiles; ++i) {
ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
ASSERT_EQ(live_imm_files[i]->GetFileSize(),
kBlobFileSize + BlobLogFooter::kSize);
}
}
mock_env_->set_current_time(kCompactTime);
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(),
blob_db_->DefaultColumnFamily(), nullptr,
nullptr));
// We expect the data to remain the same and the blobs from the oldest N files
// to be moved to new files. Sequence numbers get zeroed out during the
// compaction.
VerifyDB(data);
for (auto &pair : blob_value_versions) {
KeyVersion &version = pair.second;
version.sequence = 0;
}
VerifyBaseDB(blob_value_versions);
const uint64_t cutoff = static_cast<uint64_t>(
bdb_options.garbage_collection_cutoff * kNumBlobFiles);
for (auto &pair : blob_index_versions) {
BlobIndexVersion &version = pair.second;
version.sequence = 0;
if (version.file_number == kInvalidBlobFileNumber) {
continue;
}
if (version.file_number > cutoff) {
continue;
}
version.file_number += kNumBlobFiles + 1;
}
VerifyBaseDBBlobIndex(blob_index_versions);
// At this point, we should have 128 immutable non-TTL files with file numbers
// 33..128 and 130..161. (129 was taken by the TTL blob file.)
{
auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
for (size_t i = 0; i < kNumBlobFiles; ++i) {
uint64_t expected_file_number = i + cutoff + 1;
if (expected_file_number > kNumBlobFiles) {
++expected_file_number;
}
ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
ASSERT_EQ(live_imm_files[i]->GetFileSize(),
kBlobFileSize + BlobLogFooter::kSize);
}
}
}
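// Where 130..161 comes from: the cutoff is 0.25 * 128 = 32 files, so
// 32 * 8 = 256 blobs get relocated. Each GC output file holds 8 large blobs
// (kBlobFileSize), so the compaction writes 32 new blob files; file number
// 129 was already taken by the TTL blob file, hence the new files are
// numbered 130..161.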
// File should be evicted after expiration.
TEST_F(BlobDBTest, EvictExpiredFile) {
BlobDBOptions bdb_options;

@@ -27,6 +27,7 @@ class BlobFile {
friend class BlobDBImpl;
friend struct BlobFileComparator;
friend struct BlobFileComparatorTTL;
friend class BlobIndexCompactionFilterGC;
private:
// access to parent
@@ -240,6 +241,11 @@ class BlobFile {
void SetFileSize(uint64_t fs) { file_size_ = fs; }
void SetBlobCount(uint64_t bc) { blob_count_ = bc; }
void BlobRecordAdded(uint64_t record_size) {
++blob_count_;
file_size_ += record_size;
}
};
} // namespace blob_db
} // namespace rocksdb
