diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index be0931673..dfda5e71f 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -182,7 +182,8 @@ void CompactionIterator::Next() { void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until) { - if (compaction_filter_ != nullptr && ikey_.type == kTypeValue && + if (compaction_filter_ != nullptr && + (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex) && (visible_at_tip_ || ikey_.sequence > latest_snapshot_ || ignore_snapshots_)) { // If the user has specified a compaction filter and the sequence @@ -192,11 +193,13 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, CompactionFilter::Decision filter; compaction_filter_value_.clear(); compaction_filter_skip_until_.Clear(); + CompactionFilter::ValueType value_type = + ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue + : CompactionFilter::ValueType::kBlobIndex; { StopWatchNano timer(env_, true); filter = compaction_filter_->FilterV2( - compaction_->level(), ikey_.user_key, - CompactionFilter::ValueType::kValue, value_, + compaction_->level(), ikey_.user_key, value_type, value_, &compaction_filter_value_, compaction_filter_skip_until_.rep()); iter_stats_.total_filter_time += env_ != nullptr ? timer.ElapsedNanos() : 0; diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 9a8c0318c..64f61a35e 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -36,6 +36,7 @@ class CompactionFilter { enum ValueType { kValue, kMergeOperand, + kBlobIndex, // used internally by BlobDB. }; enum class Decision { @@ -171,6 +172,8 @@ class CompactionFilter { bool rv = FilterMergeOperand(level, key, existing_value); return rv ? Decision::kRemove : Decision::kKeep; } + case ValueType::kBlobIndex: + return Decision::kKeep; } assert(false); return Decision::kKeep; diff --git a/utilities/blob_db/blob_compaction_filter.h b/utilities/blob_db/blob_compaction_filter.h new file mode 100644 index 000000000..26cd188fe --- /dev/null +++ b/utilities/blob_db/blob_compaction_filter.h @@ -0,0 +1,78 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once +#ifndef ROCKSDB_LITE + +#include "rocksdb/compaction_filter.h" +#include "rocksdb/env.h" +#include "utilities/blob_db/blob_index.h" + +namespace rocksdb { +namespace blob_db { + +// CompactionFilter to delete expired blob index from base DB. +class BlobIndexCompactionFilter : public CompactionFilter { + public: + explicit BlobIndexCompactionFilter(uint64_t current_time) + : current_time_(current_time) {} + + virtual const char* Name() const override { + return "BlobIndexCompactionFilter"; + } + + // Filter expired blob indexes regardless of snapshots. + virtual bool IgnoreSnapshots() const override { return true; } + + virtual Decision FilterV2(int /*level*/, const Slice& /*key*/, + ValueType value_type, const Slice& value, + std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + if (value_type != kBlobIndex) { + return Decision::kKeep; + } + BlobIndex blob_index; + Status s = blob_index.DecodeFrom(value); + if (!s.ok()) { + // Unable to decode blob index. Keeping the value. + return Decision::kKeep; + } + if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) { + // Expired + return Decision::kRemove; + } + return Decision::kKeep; + } + + private: + const uint64_t current_time_; +}; + +class BlobIndexCompactionFilterFactory : public CompactionFilterFactory { + public: + explicit BlobIndexCompactionFilterFactory(Env* env) : env_(env) {} + + virtual const char* Name() const override { + return "BlobIndexCompactionFilterFactory"; + } + + virtual std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& /*context*/) override { + int64_t current_time = 0; + Status s = env_->GetCurrentTime(¤t_time); + if (!s.ok()) { + return nullptr; + } + assert(current_time >= 0); + return std::unique_ptr( + new BlobIndexCompactionFilter(static_cast(current_time))); + } + + private: + Env* env_; +}; + +} // namespace blob_db +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/utilities/blob_db/blob_db.cc b/utilities/blob_db/blob_db.cc index f763ced20..b278df77f 100644 --- a/utilities/blob_db/blob_db.cc +++ b/utilities/blob_db/blob_db.cc @@ -26,6 +26,7 @@ #include "table/block_builder.h" #include "util/file_reader_writer.h" #include "util/filename.h" +#include "utilities/blob_db/blob_compaction_filter.h" #include "utilities/blob_db/blob_db_impl.h" namespace rocksdb { @@ -45,6 +46,11 @@ Status BlobDB::OpenAndLoad(const Options& options, const BlobDBOptions& bdb_options, const std::string& dbname, BlobDB** blob_db, Options* changed_options) { + if (options.compaction_filter != nullptr || + options.compaction_filter_factory != nullptr) { + return Status::NotSupported("Blob DB doesn't support compaction filter."); + } + *changed_options = options; *blob_db = nullptr; @@ -63,6 +69,8 @@ Status BlobDB::OpenAndLoad(const Options& options, all_wal_filters.push_back(rw_filter); } + changed_options->compaction_filter_factory.reset( + new BlobIndexCompactionFilterFactory(options.env)); changed_options->listeners.emplace_back(fblistener); if (bdb_options.enable_garbage_collection) { changed_options->listeners.emplace_back(ce_listener); @@ -112,6 +120,11 @@ Status BlobDB::Open(const DBOptions& db_options_input, const std::vector& column_families, std::vector* handles, BlobDB** blob_db, bool no_base_db) { + if (column_families.size() != 1 || + column_families[0].name != kDefaultColumnFamilyName) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } *blob_db = nullptr; Status s; @@ -144,6 +157,15 @@ Status BlobDB::Open(const DBOptions& db_options_input, all_wal_filters.push_back(rw_filter); } + ColumnFamilyOptions cf_options(column_families[0].options); + if (cf_options.compaction_filter != nullptr || + cf_options.compaction_filter_factory != nullptr) { + return Status::NotSupported("Blob DB doesn't support compaction filter."); + } + cf_options.compaction_filter_factory.reset( + new BlobIndexCompactionFilterFactory(db_options.env)); + ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options); + // we need to open blob db first so that recovery can happen BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options); fblistener->SetImplPtr(bdb); @@ -164,7 +186,7 @@ Status BlobDB::Open(const DBOptions& db_options_input, } DB* db = nullptr; - s = DB::Open(db_options, dbname, column_families, handles, &db); + s = DB::Open(db_options, dbname, {cf_descriptor}, handles, &db); if (!s.ok()) { delete bdb; return s; diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 9cece1522..41c640f17 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -1727,6 +1727,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, GarbageCollectionWriteCallback callback(cfd, record.key, latest_seq); // If key has expired, remove it from base DB. + // TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter. + // We can just drop the blob record. if (no_relocation_ttl || (has_ttl && now >= record.expiration)) { gc_stats->num_deletes++; gc_stats->deleted_size += record.value_size; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 1c9493565..b036c0208 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -47,10 +47,15 @@ class BlobDBTest : public testing::Test { ~BlobDBTest() { Destroy(); } + Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(), + Options options = Options()) { + options.create_if_missing = true; + return BlobDB::Open(options, bdb_options, dbname_, &blob_db_); + } + void Open(BlobDBOptions bdb_options = BlobDBOptions(), Options options = Options()) { - options.create_if_missing = true; - ASSERT_OK(BlobDB::Open(options, bdb_options, dbname_, &blob_db_)); + ASSERT_OK(TryOpen(bdb_options, options)); } void Destroy() { @@ -79,6 +84,10 @@ class BlobDBTest : public testing::Test { } } + Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) { + return blob_db_->PutUntil(WriteOptions(), key, value, expiration); + } + void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd, std::map *data = nullptr) { int len = rnd->Next() % kMaxBlobSize + 1; @@ -1122,6 +1131,95 @@ TEST_F(BlobDBTest, InlineSmallValues) { ASSERT_EQ(last_ttl_seq, ttl_file->GetSequenceRange().second); } +TEST_F(BlobDBTest, CompactionFilterNotSupported) { + class TestCompactionFilter : public CompactionFilter { + virtual const char *Name() const { return "TestCompactionFilter"; } + }; + class TestCompactionFilterFactory : public CompactionFilterFactory { + virtual const char *Name() const { return "TestCompactionFilterFactory"; } + virtual std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context & /*context*/) { + return std::unique_ptr(new TestCompactionFilter()); + } + }; + for (int i = 0; i < 2; i++) { + Options options; + if (i == 0) { + options.compaction_filter = new TestCompactionFilter(); + } else { + options.compaction_filter_factory.reset( + new TestCompactionFilterFactory()); + } + ASSERT_TRUE(TryOpen(BlobDBOptions(), options).IsNotSupported()); + delete options.compaction_filter; + } +} + +TEST_F(BlobDBTest, FilterExpiredBlobIndex) { + constexpr size_t kNumKeys = 100; + constexpr size_t kNumPuts = 1000; + constexpr uint64_t kMaxExpiration = 1000; + constexpr uint64_t kCompactTime = 500; + constexpr uint64_t kMinBlobSize = 100; + Random rnd(301); + mock_env_->set_current_time(0); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = kMinBlobSize; + bdb_options.disable_background_tasks = true; + Options options; + options.env = mock_env_.get(); + Open(bdb_options, options); + + std::map data; + std::map data_after_compact; + for (size_t i = 0; i < kNumPuts; i++) { + bool is_small_value = rnd.Next() % 2; + bool has_ttl = rnd.Next() % 2; + uint64_t expiration = rnd.Next() % kMaxExpiration; + int len = is_small_value ? 10 : 200; + std::string key = "key" + ToString(rnd.Next() % kNumKeys); + std::string value = test::RandomHumanReadableString(&rnd, len); + if (!has_ttl) { + if (is_small_value) { + std::string blob_entry; + BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value); + // Fake blob index with TTL. See what it will do. + ASSERT_GT(kMinBlobSize, blob_entry.size()); + value = blob_entry; + } + ASSERT_OK(Put(key, value)); + data_after_compact[key] = value; + } else { + ASSERT_OK(PutUntil(key, value, expiration)); + if (expiration <= kCompactTime) { + data_after_compact.erase(key); + } else { + data_after_compact[key] = value; + } + } + data[key] = value; + } + VerifyDB(data); + + mock_env_->set_current_time(kCompactTime); + // Take a snapshot before compaction. Make sure expired blob indexes is + // filtered regardless of snapshot. + const Snapshot *snapshot = blob_db_->GetSnapshot(); + // Issue manual compaction to trigger compaction filter. + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), + blob_db_->DefaultColumnFamily(), nullptr, + nullptr)); + blob_db_->ReleaseSnapshot(snapshot); + // Verify expired blob index are filtered. + std::vector versions; + GetAllKeyVersions(blob_db_, "", "", &versions); + ASSERT_EQ(data_after_compact.size(), versions.size()); + for (auto &version : versions) { + ASSERT_TRUE(data_after_compact.count(version.user_key) > 0); + } + VerifyDB(data_after_compact); +} + } // namespace blob_db } // namespace rocksdb