From 583c6953d818d4e422bfca28dc16fd6cb7e724db Mon Sep 17 00:00:00 2001
From: Levi Tamasi
Date: Fri, 13 Dec 2019 10:11:03 -0800
Subject: [PATCH] Move out valid blobs from the oldest blob files during
 compaction (#6121)

Summary:
The patch adds logic that relocates live blobs from the oldest N non-TTL
blob files as they are encountered during compaction (assuming the BlobDB
configuration option `enable_garbage_collection` is `true`), where N is
defined as the number of immutable non-TTL blob files multiplied by the
value of a new BlobDB configuration option called
`garbage_collection_cutoff`. (The default value of this parameter is 0.25,
that is, by default the valid blobs residing in the oldest 25% of immutable
non-TTL blob files are relocated.)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6121

Test Plan: Added unit test and tested using the BlobDB mode of `db_bench`.

Differential Revision: D18785357

Pulled By: ltamasi

fbshipit-source-id: 8c21c512a18fba777ec28765c88682bb1a5e694e
---
 db/compaction/compaction_iterator.cc        |  59 ++--
 include/rocksdb/compaction_filter.h         |   9 +
 utilities/blob_db/blob_compaction_filter.cc | 329 +++++++++++++++-----
 utilities/blob_db/blob_compaction_filter.h  | 141 ++++++++-
 utilities/blob_db/blob_db.h                 |  10 +-
 utilities/blob_db/blob_db_impl.cc           |  84 ++++-
 utilities/blob_db/blob_db_impl.h            |  16 +-
 utilities/blob_db/blob_db_test.cc           | 243 ++++++++++++++-
 utilities/blob_db/blob_file.h               |   6 +
 9 files changed, 765 insertions(+), 132 deletions(-)

diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc
index 59097eec0..57bb92570 100644
--- a/db/compaction/compaction_iterator.cc
+++ b/db/compaction/compaction_iterator.cc
@@ -639,28 +639,45 @@ void CompactionIterator::NextFromInput() {
 }
 
 void CompactionIterator::PrepareOutput() {
-  // Zeroing out the sequence number leads to better compression.
-  // If this is the bottommost level (no files in lower levels)
-  // and the earliest snapshot is larger than this seqno
-  // and the userkey differs from the last userkey in compaction
-  // then we can squash the seqno to zero.
-  //
-  // This is safe for TransactionDB write-conflict checking since transactions
-  // only care about sequence number larger than any active snapshots.
-  //
-  // Can we do the same for levels above bottom level as long as
-  // KeyNotExistsBeyondOutputLevel() return true?
-  if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
-      ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ && valid_ &&
-      IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge) {
-    assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
-    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
-      ROCKS_LOG_FATAL(info_log_,
-                      "Unexpected key type %d for seq-zero optimization",
-                      ikey_.type);
+  if (valid_) {
+    if (compaction_filter_ && ikey_.type == kTypeBlobIndex) {
+      const auto blob_decision = compaction_filter_->PrepareBlobOutput(
+          user_key(), value_, &compaction_filter_value_);
+
+      if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
+        status_ = Status::Corruption(
+            "Corrupted blob reference encountered during GC");
+      } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
+        status_ = Status::IOError("Could not relocate blob during GC");
+      } else if (blob_decision ==
+                 CompactionFilter::BlobDecision::kChangeValue) {
+        value_ = compaction_filter_value_;
+      }
+    }
+
+    // Zeroing out the sequence number leads to better compression.
+    // If this is the bottommost level (no files in lower levels)
+    // and the earliest snapshot is larger than this seqno
+    // and the userkey differs from the last userkey in compaction
+    // then we can squash the seqno to zero.
+    //
+    // This is safe for TransactionDB write-conflict checking since transactions
+    // only care about sequence number larger than any active snapshots.
+    //
+    // Can we do the same for levels above bottom level as long as
+    // KeyNotExistsBeyondOutputLevel() returns true?
+    if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
+        ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
+        IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge) {
+      assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
+      if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
+        ROCKS_LOG_FATAL(info_log_,
+                        "Unexpected key type %d for seq-zero optimization",
+                        ikey_.type);
+      }
+      ikey_.sequence = 0;
+      current_key_.UpdateInternalKey(0, ikey_.type);
     }
-    ikey_.sequence = 0;
-    current_key_.UpdateInternalKey(0, ikey_.type);
   }
 }
diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h
index 5d476fb8e..1964091a9 100644
--- a/include/rocksdb/compaction_filter.h
+++ b/include/rocksdb/compaction_filter.h
@@ -45,6 +45,8 @@ class CompactionFilter {
     kRemoveAndSkipUntil,
   };
 
+  enum class BlobDecision { kKeep, kChangeValue, kCorruption, kIOError };
+
   // Context information of a compaction run
   struct Context {
     // Does this compaction run include all data files
@@ -173,6 +175,13 @@ class CompactionFilter {
     return Decision::kKeep;
   }
 
+  // Internal (BlobDB) use only. Do not override in application code.
+  virtual BlobDecision PrepareBlobOutput(const Slice& /* key */,
+                                         const Slice& /* existing_value */,
+                                         std::string* /* new_value */) const {
+    return BlobDecision::kKeep;
+  }
+
   // This function is deprecated. Snapshots will always be ignored for
   // compaction filters, because we realized that not ignoring snapshots doesn't
   // provide the guarantee we initially thought it would provide. Repeatable
diff --git a/utilities/blob_db/blob_compaction_filter.cc b/utilities/blob_db/blob_compaction_filter.cc
index f145d9a92..cb7adf64b 100644
--- a/utilities/blob_db/blob_compaction_filter.cc
+++ b/utilities/blob_db/blob_compaction_filter.cc
@@ -11,102 +11,279 @@ namespace rocksdb {
 namespace blob_db {
 
-namespace {
-
-// CompactionFilter to delete expired blob index from base DB.
-class BlobIndexCompactionFilter : public CompactionFilter {
- public:
-  BlobIndexCompactionFilter(BlobCompactionContext context,
-                            uint64_t current_time, Statistics* statistics)
-      : context_(context),
-        current_time_(current_time),
-        statistics_(statistics) {}
-
-  ~BlobIndexCompactionFilter() override {
-    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
-    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
-    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
-    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
-  }
-
-  const char* Name() const override { return "BlobIndexCompactionFilter"; }
-
-  // Filter expired blob indexes regardless of snapshots.
-  bool IgnoreSnapshots() const override { return true; }
-
-  Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type,
-                    const Slice& value, std::string* /*new_value*/,
-                    std::string* /*skip_until*/) const override {
-    if (value_type != kBlobIndex) {
-      return Decision::kKeep;
-    }
-    BlobIndex blob_index;
-    Status s = blob_index.DecodeFrom(value);
-    if (!s.ok()) {
-      // Unable to decode blob index. Keeping the value.
-      return Decision::kKeep;
-    }
-    if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
-      // Expired
-      expired_count_++;
-      expired_size_ += key.size() + value.size();
-      return Decision::kRemove;
-    }
-    if (!blob_index.IsInlined() &&
-        blob_index.file_number() < context_.next_file_number &&
-        context_.current_blob_files.count(blob_index.file_number()) == 0) {
-      // Corresponding blob file gone. Could have been garbage collected or
-      // evicted by FIFO eviction.
+CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2(
+    int /*level*/, const Slice& key, ValueType value_type, const Slice& value,
+    std::string* /*new_value*/, std::string* /*skip_until*/) const {
+  if (value_type != kBlobIndex) {
+    return Decision::kKeep;
+  }
+  BlobIndex blob_index;
+  Status s = blob_index.DecodeFrom(value);
+  if (!s.ok()) {
+    // Unable to decode blob index. Keeping the value.
+    return Decision::kKeep;
+  }
+  if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
+    // Expired
+    expired_count_++;
+    expired_size_ += key.size() + value.size();
+    return Decision::kRemove;
+  }
+  if (!blob_index.IsInlined() &&
+      blob_index.file_number() < context_.next_file_number &&
+      context_.current_blob_files.count(blob_index.file_number()) == 0) {
+    // Corresponding blob file gone. Could have been garbage collected or
+    // evicted by FIFO eviction.
+    evicted_count_++;
+    evicted_size_ += key.size() + value.size();
+    return Decision::kRemove;
+  }
+  if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() &&
+      blob_index.expiration() < context_.evict_expiration_up_to) {
+    // Hack: Internal key is passed to BlobIndexCompactionFilter for it to
+    // get the sequence number.
+    ParsedInternalKey ikey;
+    bool ok = ParseInternalKey(key, &ikey);
+    // Remove keys that could have been removed by the last FIFO eviction.
+    // If we get an error while parsing the key, ignore it and continue.
+    if (ok && ikey.sequence < context_.fifo_eviction_seq) {
       evicted_count_++;
       evicted_size_ += key.size() + value.size();
       return Decision::kRemove;
     }
-    if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() &&
-        blob_index.expiration() < context_.evict_expiration_up_to) {
-      // Hack: Internal key is passed to BlobIndexCompactionFilter for it to
-      // get sequence number.
-      ParsedInternalKey ikey;
-      bool ok = ParseInternalKey(key, &ikey);
-      // Remove keys that could have been remove by last FIFO eviction.
-      // If get error while parsing key, ignore and continue.
-      if (ok && ikey.sequence < context_.fifo_eviction_seq) {
-        evicted_count_++;
-        evicted_size_ += key.size() + value.size();
-        return Decision::kRemove;
-      }
-    }
-    return Decision::kKeep;
+  }
+  return Decision::kKeep;
+}
+
+CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(
+    const Slice& key, const Slice& existing_value,
+    std::string* new_value) const {
+  assert(new_value);
+
+  const BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
+  (void)blob_db_impl;
+
+  assert(blob_db_impl);
+  assert(blob_db_impl->bdb_options_.enable_garbage_collection);
+
+  BlobIndex blob_index;
+  const Status s = blob_index.DecodeFrom(existing_value);
+  if (!s.ok()) {
+    return BlobDecision::kCorruption;
+  }
+
+  if (blob_index.IsInlined()) {
+    return BlobDecision::kKeep;
+  }
+
+  if (blob_index.HasTTL()) {
+    return BlobDecision::kKeep;
+  }
+
+  if (blob_index.file_number() >= context_gc_.cutoff_file_number) {
+    return BlobDecision::kKeep;
+  }
+
+  // Note: each compaction generates its own blob files, which, depending on
+  // the workload, might result in many small blob files. The total number of
+  // files is bounded though (determined by the number of compactions and the
+  // blob file size option).
+  if (!OpenNewBlobFileIfNeeded()) {
+    return BlobDecision::kIOError;
+  }
+
+  PinnableSlice blob;
+  CompressionType compression_type = kNoCompression;
+  if (!ReadBlobFromOldFile(key, blob_index, &blob, &compression_type)) {
+    return BlobDecision::kIOError;
+  }
+
+  uint64_t new_blob_file_number = 0;
+  uint64_t new_blob_offset = 0;
+  if (!WriteBlobToNewFile(key, blob, &new_blob_file_number,
+                          &new_blob_offset)) {
+    return BlobDecision::kIOError;
+  }
 
- private:
-  BlobCompactionContext context_;
-  const uint64_t current_time_;
-  Statistics* statistics_;
-  // It is safe to not using std::atomic since the compaction filter, created
-  // from a compaction filter factroy, will not be called from multiple threads.
-  mutable uint64_t expired_count_ = 0;
-  mutable uint64_t expired_size_ = 0;
-  mutable uint64_t evicted_count_ = 0;
-  mutable uint64_t evicted_size_ = 0;
-};
+  if (!CloseAndRegisterNewBlobFileIfNeeded()) {
+    return BlobDecision::kIOError;
+  }
+
+  BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset,
+                        blob.size(), compression_type);
+
+  return BlobDecision::kChangeValue;
+}
+
+bool BlobIndexCompactionFilterGC::OpenNewBlobFileIfNeeded() const {
+  if (blob_file_) {
+    assert(writer_);
+    return true;
+  }
 
-}  // anonymous namespace
+  BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
+  assert(blob_db_impl);
+
+  const Status s = blob_db_impl->CreateBlobFileAndWriter(
+      /* has_ttl */ false, ExpirationRange(), "GC", &blob_file_, &writer_);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
+                    "Error opening new blob file during GC, status: %s",
+                    s.ToString().c_str());
+
+    return false;
+  }
+
+  assert(blob_file_);
+  assert(writer_);
+
+  return true;
+}
+
+bool BlobIndexCompactionFilterGC::ReadBlobFromOldFile(
+    const Slice& key, const BlobIndex& blob_index, PinnableSlice* blob,
+    CompressionType* compression_type) const {
+  BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
+  assert(blob_db_impl);
+
+  const Status s = blob_db_impl->GetRawBlobFromFile(
+      key, blob_index.file_number(), blob_index.offset(), blob_index.size(),
+      blob, compression_type);
+
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
+                    "Error reading blob during GC, key: %s (%s), status: %s",
+                    key.ToString(/* output_hex */ true).c_str(),
+                    blob_index.DebugString(/* output_hex */ true).c_str(),
+                    s.ToString().c_str());
+
+    return false;
+  }
+
+  return true;
+}
+
+bool BlobIndexCompactionFilterGC::WriteBlobToNewFile(
+    const Slice& key, const Slice& blob, uint64_t* new_blob_file_number,
+    uint64_t* new_blob_offset) const {
+  assert(new_blob_file_number);
+  assert(new_blob_offset);
+
+  assert(blob_file_);
+  *new_blob_file_number = blob_file_->BlobFileNumber();
+
+  assert(writer_);
+  uint64_t new_key_offset = 0;
+  const Status s = writer_->AddRecord(key, blob, kNoExpiration,
+                                      &new_key_offset, new_blob_offset);
+
+  if (!s.ok()) {
+    const BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
+    assert(blob_db_impl);
+
+    ROCKS_LOG_ERROR(
+        blob_db_impl->db_options_.info_log,
+        "Error writing blob to new file %s during GC, key: %s, status: %s",
+        blob_file_->PathName().c_str(),
+        key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
+    return false;
+  }
+
+  const uint64_t new_size =
+      BlobLogRecord::kHeaderSize + key.size() + blob.size();
+  blob_file_->BlobRecordAdded(new_size);
+
+  BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
+  assert(blob_db_impl);
+
+  blob_db_impl->total_blob_size_ += new_size;
+
+  return true;
+}
+
+bool BlobIndexCompactionFilterGC::CloseAndRegisterNewBlobFileIfNeeded() const {
+  const BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
+  assert(blob_db_impl);
+
+  assert(blob_file_);
+  if (blob_file_->GetFileSize() < blob_db_impl->bdb_options_.blob_file_size) {
+    return true;
+  }
+
+  return CloseAndRegisterNewBlobFile();
+}
+
+bool BlobIndexCompactionFilterGC::CloseAndRegisterNewBlobFile() const {
+  BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
+  assert(blob_db_impl);
+  assert(blob_file_);
+
+  Status s;
+
+  {
+    WriteLock wl(&blob_db_impl->mutex_);
+
+    s = blob_db_impl->CloseBlobFile(blob_file_);
+
+    // Note: we delay registering the new blob file until it's closed to
+    // prevent FIFO eviction from processing it during the GC run.
+    blob_db_impl->RegisterBlobFile(blob_file_);
+  }
+
+  assert(blob_file_->Immutable());
+
+  // Save the path before releasing the file so it can still be logged on
+  // error.
+  const std::string path = blob_file_->PathName();
+  blob_file_.reset();
+
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
+                    "Error closing new blob file %s during GC, status: %s",
+                    path.c_str(), s.ToString().c_str());
+
+    return false;
+  }
+
+  return true;
+}
 
 std::unique_ptr<CompactionFilter>
 BlobIndexCompactionFilterFactory::CreateCompactionFilter(
     const CompactionFilter::Context& /*context*/) {
+  assert(env());
+
   int64_t current_time = 0;
-  Status s = env_->GetCurrentTime(&current_time);
+  Status s = env()->GetCurrentTime(&current_time);
   if (!s.ok()) {
     return nullptr;
   }
   assert(current_time >= 0);
+
+  assert(blob_db_impl());
+
   BlobCompactionContext context;
-  blob_db_impl_->GetCompactionContext(&context);
+  blob_db_impl()->GetCompactionContext(&context);
 
   return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter(
-      context, static_cast<uint64_t>(current_time), statistics_));
+      std::move(context), current_time, statistics()));
+}
+
+std::unique_ptr<CompactionFilter>
+BlobIndexCompactionFilterFactoryGC::CreateCompactionFilter(
+    const CompactionFilter::Context& /*context*/) {
+  assert(env());
+
+  int64_t current_time = 0;
+  Status s = env()->GetCurrentTime(&current_time);
+  if (!s.ok()) {
+    return nullptr;
+  }
+  assert(current_time >= 0);
+
+  assert(blob_db_impl());
+
+  BlobCompactionContext context;
+  BlobCompactionContextGC context_gc;
+  blob_db_impl()->GetCompactionContext(&context, &context_gc);
+
+  return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilterGC(
+      std::move(context), std::move(context_gc), current_time, statistics()));
 }
 
 }  // namespace blob_db
diff --git a/utilities/blob_db/blob_compaction_filter.h b/utilities/blob_db/blob_compaction_filter.h
index 5555d6f72..32f44e516 100644
--- a/utilities/blob_db/blob_compaction_filter.h
+++ b/utilities/blob_db/blob_compaction_filter.h
@@ -17,24 +17,113 @@ namespace rocksdb {
 namespace blob_db {
 
 struct BlobCompactionContext {
-  uint64_t next_file_number;
+  uint64_t next_file_number = 0;
   std::unordered_set<uint64_t> current_blob_files;
-  SequenceNumber fifo_eviction_seq;
-  uint64_t evict_expiration_up_to;
+  SequenceNumber fifo_eviction_seq = 0;
+  uint64_t evict_expiration_up_to = 0;
 };
 
-class BlobIndexCompactionFilterFactory : public CompactionFilterFactory {
+struct BlobCompactionContextGC {
+  BlobDBImpl* blob_db_impl = nullptr;
+  uint64_t cutoff_file_number = 0;
+};
+
+// Compaction filter that deletes expired blob indexes from the base DB.
+// Comes in two varieties, one for the non-GC case and one for the GC case.
+class BlobIndexCompactionFilterBase : public CompactionFilter {
  public:
-  BlobIndexCompactionFilterFactory(BlobDBImpl* blob_db_impl, Env* env,
-                                   Statistics* statistics)
-      : blob_db_impl_(blob_db_impl), env_(env), statistics_(statistics) {}
+  BlobIndexCompactionFilterBase(BlobCompactionContext&& context,
+                                uint64_t current_time, Statistics* statistics)
+      : context_(std::move(context)),
+        current_time_(current_time),
+        statistics_(statistics) {}
 
-  virtual const char* Name() const override {
-    return "BlobIndexCompactionFilterFactory";
+  ~BlobIndexCompactionFilterBase() override {
+    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
+    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
+    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
+    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
   }
 
-  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-      const CompactionFilter::Context& /*context*/) override;
+  // Filter expired blob indexes regardless of snapshots.
+  bool IgnoreSnapshots() const override { return true; }
+
+  Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type,
+                    const Slice& value, std::string* /*new_value*/,
+                    std::string* /*skip_until*/) const override;
+
+ private:
+  BlobCompactionContext context_;
+  const uint64_t current_time_;
+  Statistics* statistics_;
+  // It is safe to not use std::atomic since the compaction filter, created
+  // from a compaction filter factory, will not be called from multiple
+  // threads.
+  mutable uint64_t expired_count_ = 0;
+  mutable uint64_t expired_size_ = 0;
+  mutable uint64_t evicted_count_ = 0;
+  mutable uint64_t evicted_size_ = 0;
+};
+
+class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase {
+ public:
+  BlobIndexCompactionFilter(BlobCompactionContext&& context,
+                            uint64_t current_time, Statistics* statistics)
+      : BlobIndexCompactionFilterBase(std::move(context), current_time,
+                                      statistics) {}
+
+  const char* Name() const override { return "BlobIndexCompactionFilter"; }
+};
+
+class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
+ public:
+  BlobIndexCompactionFilterGC(BlobCompactionContext&& context,
+                              BlobCompactionContextGC&& context_gc,
+                              uint64_t current_time, Statistics* statistics)
+      : BlobIndexCompactionFilterBase(std::move(context), current_time,
+                                      statistics),
+        context_gc_(std::move(context_gc)) {}
+
+  ~BlobIndexCompactionFilterGC() override {
+    if (blob_file_) {
+      CloseAndRegisterNewBlobFile();
+    }
+  }
+
+  const char* Name() const override { return "BlobIndexCompactionFilterGC"; }
+
+  BlobDecision PrepareBlobOutput(const Slice& key, const Slice& existing_value,
+                                 std::string* new_value) const override;
+
+ private:
+  bool OpenNewBlobFileIfNeeded() const;
+  bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
+                           PinnableSlice* blob,
+                           CompressionType* compression_type) const;
+  bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
+                          uint64_t* new_blob_file_number,
+                          uint64_t* new_blob_offset) const;
+  bool CloseAndRegisterNewBlobFileIfNeeded() const;
+  bool CloseAndRegisterNewBlobFile() const;
+
+ private:
+  BlobCompactionContextGC context_gc_;
+  mutable std::shared_ptr<BlobFile> blob_file_;
+  mutable std::shared_ptr<Writer> writer_;
+};
+
+// Compaction filter factory; similarly to the filters above, it comes
+// in two flavors, one that creates filters that support GC, and one
+// that creates non-GC filters.
+class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
+ public:
+  BlobIndexCompactionFilterFactoryBase(BlobDBImpl* blob_db_impl, Env* env,
+                                       Statistics* statistics)
+      : blob_db_impl_(blob_db_impl), env_(env), statistics_(statistics) {}
+
+ protected:
+  BlobDBImpl* blob_db_impl() const { return blob_db_impl_; }
+  Env* env() const { return env_; }
+  Statistics* statistics() const { return statistics_; }
 
  private:
   BlobDBImpl* blob_db_impl_;
@@ -42,6 +131,36 @@ class BlobIndexCompactionFilterFactory : public CompactionFilterFactory {
   Statistics* statistics_;
 };
 
+class BlobIndexCompactionFilterFactory
+    : public BlobIndexCompactionFilterFactoryBase {
+ public:
+  BlobIndexCompactionFilterFactory(BlobDBImpl* blob_db_impl, Env* env,
+                                   Statistics* statistics)
+      : BlobIndexCompactionFilterFactoryBase(blob_db_impl, env, statistics) {}
+
+  const char* Name() const override {
+    return "BlobIndexCompactionFilterFactory";
+  }
+
+  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+      const CompactionFilter::Context& /*context*/) override;
+};
+
+class BlobIndexCompactionFilterFactoryGC
+    : public BlobIndexCompactionFilterFactoryBase {
+ public:
+  BlobIndexCompactionFilterFactoryGC(BlobDBImpl* blob_db_impl, Env* env,
+                                     Statistics* statistics)
+      : BlobIndexCompactionFilterFactoryBase(blob_db_impl, env, statistics) {}
+
+  const char* Name() const override {
+    return "BlobIndexCompactionFilterFactoryGC";
+  }
+
+  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+      const CompactionFilter::Context& /*context*/) override;
+};
+
 }  // namespace blob_db
 }  // namespace rocksdb
 #endif  // ROCKSDB_LITE
diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h
index d6ebca2db..5a950134e 100644
--- a/utilities/blob_db/blob_db.h
+++ b/utilities/blob_db/blob_db.h
@@ -68,11 +68,15 @@ struct BlobDBOptions {
   // what compression to use for blobs
   CompressionType compression = kNoCompression;
 
-  // If enabled, blob DB periodically cleanup stale data by rewriting remaining
-  // live data in blob files to new files. If garbage collection is not enabled,
-  // blob files will be cleanup based on TTL.
+  // If enabled, BlobDB cleans up stale blobs in non-TTL files during compaction
+  // by rewriting the remaining live blobs to new files.
   bool enable_garbage_collection = false;
 
+  // The cutoff in terms of blob file age for garbage collection. Blobs in
+  // the oldest N non-TTL blob files will be rewritten when encountered during
+  // compaction, where N = garbage_collection_cutoff * number_of_non_TTL_files.
+  double garbage_collection_cutoff = 0.25;
+
   // Disable all background jobs. Used for tests only.
   bool disable_background_tasks = false;
 
diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc
index 0e10e647f..d0c85c48f 100644
--- a/utilities/blob_db/blob_db_impl.cc
+++ b/utilities/blob_db/blob_db_impl.cc
@@ -131,13 +131,22 @@ BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
 Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
   assert(handles != nullptr);
   assert(db_ == nullptr);
+
   if (blob_dir_.empty()) {
     return Status::NotSupported("No blob directory in options");
   }
+
   if (cf_options_.compaction_filter != nullptr ||
       cf_options_.compaction_filter_factory != nullptr) {
     return Status::NotSupported("Blob DB doesn't support compaction filter.");
   }
+
+  if (bdb_options_.garbage_collection_cutoff < 0.0 ||
+      bdb_options_.garbage_collection_cutoff > 1.0) {
+    return Status::InvalidArgument(
+        "Garbage collection cutoff must be in the interval [0.0, 1.0]");
+  }
+
   // BlobDB does not support Periodic Compactions. So disable periodic
   // compactions irrespective of the user set value.
   cf_options_.periodic_compaction_seconds = 0;
@@ -185,11 +194,17 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
   }
 
   // Update options
-  db_options_.listeners.push_back(bdb_options_.enable_garbage_collection
-                                      ? std::make_shared<BlobDBListenerGC>(this)
-                                      : std::make_shared<BlobDBListener>(this));
-  cf_options_.compaction_filter_factory.reset(
-      new BlobIndexCompactionFilterFactory(this, env_, statistics_));
+  if (bdb_options_.enable_garbage_collection) {
+    db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
+    cf_options_.compaction_filter_factory =
+        std::make_shared<BlobIndexCompactionFilterFactoryGC>(this, env_,
+                                                             statistics_);
+  } else {
+    db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
+    cf_options_.compaction_filter_factory =
+        std::make_shared<BlobIndexCompactionFilterFactory>(this, env_,
+                                                           statistics_);
+  }
 
   // Open base db.
   ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
@@ -607,6 +622,17 @@ std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
   return blob_file;
 }
 
+void BlobDBImpl::RegisterBlobFile(std::shared_ptr<BlobFile> blob_file) {
+  const uint64_t blob_file_number = blob_file->BlobFileNumber();
+
+  auto it = blob_files_.lower_bound(blob_file_number);
+  assert(it == blob_files_.end() || it->first != blob_file_number);
+
+  blob_files_.insert(it,
+                     std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
+                         blob_file_number, std::move(blob_file)));
+}
+
 Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
   std::string fpath(bfile->PathName());
   std::unique_ptr<WritableFile> wfile;
@@ -767,8 +793,7 @@ Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
     return s;
   }
 
-  blob_files_.insert(std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
-      (*blob_file)->BlobFileNumber(), *blob_file));
+  RegisterBlobFile(*blob_file);
 
   open_non_ttl_file_ = *blob_file;
   return s;
@@ -814,8 +839,7 @@ Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
     return s;
   }
 
-  blob_files_.insert(std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
-      (*blob_file)->BlobFileNumber(), *blob_file));
+  RegisterBlobFile(*blob_file);
 
   open_ttl_files_.insert(*blob_file);
   return s;
@@ -1062,8 +1086,9 @@ Status BlobDBImpl::CompactFiles(
   return s;
 }
 
-void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
-  ReadLock l(&mutex_);
+void BlobDBImpl::GetCompactionContextCommon(
+    BlobCompactionContext* context) const {
+  assert(context);
 
   context->next_file_number = next_file_number_.load();
   context->current_blob_files.clear();
@@ -1074,6 +1099,33 @@ void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
   context->evict_expiration_up_to = evict_expiration_up_to_;
 }
 
+void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
+  assert(context);
+
+  ReadLock l(&mutex_);
+  GetCompactionContextCommon(context);
+}
+
+void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
+                                      BlobCompactionContextGC* context_gc) {
+  assert(context);
+  assert(context_gc);
+
+  ReadLock l(&mutex_);
+  GetCompactionContextCommon(context);
+
+  context_gc->blob_db_impl = this;
+
+  if (!live_imm_non_ttl_blob_files_.empty()) {
+    auto it = live_imm_non_ttl_blob_files_.begin();
+    std::advance(it, bdb_options_.garbage_collection_cutoff *
+                         live_imm_non_ttl_blob_files_.size());
+    context_gc->cutoff_file_number = it != live_imm_non_ttl_blob_files_.end()
+                                         ? it->first
+                                         : std::numeric_limits<uint64_t>::max();
+  }
+}
+
 void BlobDBImpl::UpdateLiveSSTSize() {
   uint64_t live_sst_size = 0;
   bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
@@ -1199,11 +1251,8 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
     return s;
   }
 
-  // increment blob count
-  bfile->blob_count_++;
-
   uint64_t size_put = headerbuf.size() + key.size() + value.size();
-  bfile->file_size_ += size_put;
+  bfile->BlobRecordAdded(size_put);
   total_blob_size_ += size_put;
 
   if (expiration == kNoExpiration) {
@@ -1573,7 +1622,10 @@ Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
   assert(bfile);
   assert(!bfile->Immutable());
   assert(!bfile->Obsolete());
-  write_mutex_.AssertHeld();
+
+  if (bfile->HasTTL() || bfile == open_non_ttl_file_) {
+    write_mutex_.AssertHeld();
+  }
 
   ROCKS_LOG_INFO(db_options_.info_log,
                  "Closing blob file %" PRIu64 ". Path: %s",
Path: %s", diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 18aed63f8..aecdd36ed 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -44,6 +44,7 @@ struct FlushJobInfo; namespace blob_db { struct BlobCompactionContext; +struct BlobCompactionContextGC; class BlobDBImpl; class BlobFile; @@ -79,6 +80,7 @@ class BlobDBImpl : public BlobDB { friend class BlobDBIterator; friend class BlobDBListener; friend class BlobDBListenerGC; + friend class BlobIndexCompactionFilterGC; public: // deletions check period @@ -179,7 +181,13 @@ class BlobDBImpl : public BlobDB { Status SyncBlobFiles() override; + // Common part of the two GetCompactionContext methods below. + // REQUIRES: read lock on mutex_ + void GetCompactionContextCommon(BlobCompactionContext* context) const; + void GetCompactionContext(BlobCompactionContext* context); + void GetCompactionContext(BlobCompactionContext* context, + BlobCompactionContextGC* context_gc); #ifndef NDEBUG Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry, @@ -247,7 +255,9 @@ class BlobDBImpl : public BlobDB { // Close a file by appending a footer, and removes file from open files list. // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_ - // and the blob file's mutex_. + // and the blob file's mutex_. If called on a blob file which is visible only + // to a single thread (like in the case of new files written during GC), the + // locks on write_mutex_ and the blob file's mutex_ can be avoided. Status CloseBlobFile(std::shared_ptr bfile); // Close a file if its size exceeds blob_file_size @@ -317,6 +327,10 @@ class BlobDBImpl : public BlobDB { const ExpirationRange& expiration_range, const std::string& reason); + // Register a new blob file. + // REQUIRES: write lock on mutex_. 
+  void RegisterBlobFile(std::shared_ptr<BlobFile> blob_file);
+
   // collect all the blob log files from the blob directory
   Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);
 
diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc
index b2f9d3a5f..0bb207167 100644
--- a/utilities/blob_db/blob_db_test.cc
+++ b/utilities/blob_db/blob_db_test.cc
@@ -8,8 +8,10 @@
 #include <algorithm>
 #include <chrono>
 #include <cstdlib>
 #include <functional>
+#include <iomanip>
 #include <map>
 #include <memory>
+#include <sstream>
 #include <string>
 #include <vector>
 
@@ -35,10 +37,22 @@ class BlobDBTest : public testing::Test {
  public:
   const int kMaxBlobSize = 1 << 14;
 
-  struct BlobRecord {
-    std::string key;
-    std::string value;
-    uint64_t expiration = 0;
+  struct BlobIndexVersion {
+    BlobIndexVersion() = default;
+    BlobIndexVersion(std::string _user_key, uint64_t _file_number,
+                     uint64_t _expiration, SequenceNumber _sequence,
+                     ValueType _type)
+        : user_key(std::move(_user_key)),
+          file_number(_file_number),
+          expiration(_expiration),
+          sequence(_sequence),
+          type(_type) {}
+
+    std::string user_key;
+    uint64_t file_number = kInvalidBlobFileNumber;
+    uint64_t expiration = kNoExpiration;
+    SequenceNumber sequence = 0;
+    ValueType type = kTypeValue;
   };
 
   BlobDBTest()
@@ -230,6 +244,45 @@ class BlobDBTest : public testing::Test {
     }
   }
 
+  void VerifyBaseDBBlobIndex(
+      const std::map<std::string, BlobIndexVersion> &expected_versions) {
+    const size_t kMaxKeys = 10000;
+    std::vector<KeyVersion> versions;
+    ASSERT_OK(
+        GetAllKeyVersions(blob_db_->GetRootDB(), "", "", kMaxKeys, &versions));
+    ASSERT_EQ(versions.size(), expected_versions.size());
+
+    size_t i = 0;
+    for (const auto &expected_pair : expected_versions) {
+      const BlobIndexVersion &expected_version = expected_pair.second;
+
+      ASSERT_EQ(versions[i].user_key, expected_version.user_key);
+      ASSERT_EQ(versions[i].sequence, expected_version.sequence);
+      ASSERT_EQ(versions[i].type, expected_version.type);
+      if (versions[i].type != kTypeBlobIndex) {
+        ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
+        ASSERT_EQ(kNoExpiration, expected_version.expiration);
+
+        ++i;
+        continue;
+      }
+
+      BlobIndex blob_index;
+      ASSERT_OK(blob_index.DecodeFrom(versions[i].value));
+
+      const uint64_t file_number = !blob_index.IsInlined()
+                                       ? blob_index.file_number()
+                                       : kInvalidBlobFileNumber;
+      ASSERT_EQ(file_number, expected_version.file_number);
+
+      const uint64_t expiration =
+          blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
+      ASSERT_EQ(expiration, expected_version.expiration);
+
+      ++i;
+    }
+  }
+
   void InsertBlobs() {
     WriteOptions wo;
     std::string value;
@@ -1538,6 +1591,188 @@ TEST_F(BlobDBTest, FilterForFIFOEviction) {
   VerifyDB(data_after_compact);
 }
 
+TEST_F(BlobDBTest, GarbageCollection) {
+  constexpr size_t kNumPuts = 1 << 10;
+
+  constexpr uint64_t kExpiration = 1000;
+  constexpr uint64_t kCompactTime = 500;
+
+  constexpr uint64_t kKeySize = 7;  // "key" + 4 digits
+
+  constexpr uint64_t kSmallValueSize = 1 << 6;
+  constexpr uint64_t kLargeValueSize = 1 << 8;
+  constexpr uint64_t kMinBlobSize = 1 << 7;
+  static_assert(kSmallValueSize < kMinBlobSize, "");
+  static_assert(kLargeValueSize > kMinBlobSize, "");
+
+  constexpr size_t kBlobsPerFile = 8;
+  constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
+  constexpr uint64_t kBlobFileSize =
+      BlobLogHeader::kSize +
+      (BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;
+
+  BlobDBOptions bdb_options;
+  bdb_options.min_blob_size = kMinBlobSize;
+  bdb_options.blob_file_size = kBlobFileSize;
+  bdb_options.enable_garbage_collection = true;
+  bdb_options.garbage_collection_cutoff = 0.25;
+  bdb_options.disable_background_tasks = true;
+
+  Options options;
+  options.env = mock_env_.get();
+
+  Open(bdb_options, options);
+
+  std::map<std::string, std::string> data;
+  std::map<std::string, KeyVersion> blob_value_versions;
+  std::map<std::string, BlobIndexVersion> blob_index_versions;
+
+  Random rnd(301);
+
+  // Add a bunch of large non-TTL values. These will be written to non-TTL
+  // blob files and will be subject to GC.
+  for (size_t i = 0; i < kNumPuts; ++i) {
+    std::ostringstream oss;
+    oss << "key" << std::setw(4) << std::setfill('0') << i;
+
+    const std::string key(oss.str());
+    const std::string value(
+        test::RandomHumanReadableString(&rnd, kLargeValueSize));
+    const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+    ASSERT_OK(Put(key, value));
+    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+    data[key] = value;
+    blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
+    blob_index_versions[key] =
+        BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration,
+                         sequence, kTypeBlobIndex);
+  }
+
+  // Add some small and/or TTL values that will be ignored during GC.
+  // First, add a large TTL value that will be written to its own TTL blob
+  // file.
+  {
+    const std::string key("key2000");
+    const std::string value(
+        test::RandomHumanReadableString(&rnd, kLargeValueSize));
+    const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+    ASSERT_OK(PutUntil(key, value, kExpiration));
+    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+    data[key] = value;
+    blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
+    blob_index_versions[key] =
+        BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
+                         sequence, kTypeBlobIndex);
+  }
+
+  // Now add a small TTL value (which will be inlined).
+  {
+    const std::string key("key3000");
+    const std::string value(
+        test::RandomHumanReadableString(&rnd, kSmallValueSize));
+    const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+    ASSERT_OK(PutUntil(key, value, kExpiration));
+    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+    data[key] = value;
+    blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
+    blob_index_versions[key] = BlobIndexVersion(
+        key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
+  }
+
+  // Finally, add a small non-TTL value (which will be stored as a regular
+  // value).
+  {
+    const std::string key("key4000");
+    const std::string value(
+        test::RandomHumanReadableString(&rnd, kSmallValueSize));
+    const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+    ASSERT_OK(Put(key, value));
+    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+    data[key] = value;
+    blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
+    blob_index_versions[key] = BlobIndexVersion(
+        key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
+  }
+
+  VerifyDB(data);
+  VerifyBaseDB(blob_value_versions);
+  VerifyBaseDBBlobIndex(blob_index_versions);
+
+  // At this point, we should have 128 immutable non-TTL files with file
+  // numbers 1..128.
+  {
+    auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+    ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
+    for (size_t i = 0; i < kNumBlobFiles; ++i) {
+      ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
+      ASSERT_EQ(live_imm_files[i]->GetFileSize(),
+                kBlobFileSize + BlobLogFooter::kSize);
+    }
+  }
+
+  mock_env_->set_current_time(kCompactTime);
+
+  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(),
+                                   blob_db_->DefaultColumnFamily(), nullptr,
+                                   nullptr));
+
+  // We expect the data to remain the same and the blobs from the oldest N
+  // files to be moved to new files. Sequence numbers get zeroed out during
+  // the compaction.
+  VerifyDB(data);
+
+  for (auto &pair : blob_value_versions) {
+    KeyVersion &version = pair.second;
+    version.sequence = 0;
+  }
+
+  VerifyBaseDB(blob_value_versions);
+
+  const uint64_t cutoff = static_cast<uint64_t>(
+      bdb_options.garbage_collection_cutoff * kNumBlobFiles);
+  for (auto &pair : blob_index_versions) {
+    BlobIndexVersion &version = pair.second;
+
+    version.sequence = 0;
+
+    if (version.file_number == kInvalidBlobFileNumber) {
+      continue;
+    }
+
+    if (version.file_number > cutoff) {
+      continue;
+    }
+
+    version.file_number += kNumBlobFiles + 1;
+  }
+
+  VerifyBaseDBBlobIndex(blob_index_versions);
+
+  // At this point, we should have 128 immutable non-TTL files with file
+  // numbers 33..128 and 130..161. (129 was taken by the TTL blob file.)
+  {
+    auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+    ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
+    for (size_t i = 0; i < kNumBlobFiles; ++i) {
+      uint64_t expected_file_number = i + cutoff + 1;
+      if (expected_file_number > kNumBlobFiles) {
+        ++expected_file_number;
+      }
+
+      ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
+      ASSERT_EQ(live_imm_files[i]->GetFileSize(),
+                kBlobFileSize + BlobLogFooter::kSize);
+    }
+  }
+}
+
 // File should be evicted after expiration.
 TEST_F(BlobDBTest, EvictExpiredFile) {
   BlobDBOptions bdb_options;
diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h
index 41e86431f..37fc895fb 100644
--- a/utilities/blob_db/blob_file.h
+++ b/utilities/blob_db/blob_file.h
@@ -27,6 +27,7 @@ class BlobFile {
   friend class BlobDBImpl;
   friend struct BlobFileComparator;
   friend struct BlobFileComparatorTTL;
+  friend class BlobIndexCompactionFilterGC;
 
  private:
   // access to parent
@@ -240,6 +241,11 @@ class BlobFile {
 
   void SetFileSize(uint64_t fs) { file_size_ = fs; }
 
   void SetBlobCount(uint64_t bc) { blob_count_ = bc; }
+
+  void BlobRecordAdded(uint64_t record_size) {
+    ++blob_count_;
+    file_size_ += record_size;
+  }
 };
 }  // namespace blob_db
 }  // namespace rocksdb
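
Usage sketch (illustrative; not part of the patch). The snippet below shows how
the new options are intended to be used, assuming the existing BlobDB::Open()
entry point declared in utilities/blob_db/blob_db.h; the database path is
hypothetical.

  #include "utilities/blob_db/blob_db.h"

  int main() {
    rocksdb::Options options;
    options.create_if_missing = true;

    rocksdb::blob_db::BlobDBOptions bdb_options;
    // Relocate live blobs from the oldest 25% of the immutable non-TTL blob
    // files as they are encountered during compaction. With the 128 such
    // files from the unit test above, N = 0.25 * 128 = 32, so the cutoff file
    // number resolves to 33 and blobs in files 1..32 get rewritten. Open()
    // rejects cutoff values outside the interval [0.0, 1.0].
    bdb_options.enable_garbage_collection = true;
    bdb_options.garbage_collection_cutoff = 0.25;

    rocksdb::blob_db::BlobDB* blob_db = nullptr;
    rocksdb::Status s = rocksdb::blob_db::BlobDB::Open(
        options, bdb_options, "/tmp/blob_db_gc_demo", &blob_db);
    if (!s.ok()) {
      return 1;
    }

    // Writes and reads go through the usual BlobDB API; blob relocation
    // happens as a side effect of compactions (e.g. CompactRange).
    delete blob_db;
    return 0;
  }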